from __future__ import annotations
import keyword as _keyword
import re
import re as _re
from collections.abc import Mapping, MutableMapping
from numbers import Number as _Number
import numpy as _numpy
from ..util.naming import parenthize
_ParameterRef_C_repr_txt = "P"
_DataRef_repr_txt = "X"
_null_ = "_"
_boolmatch = re.compile(r"^boolean\((.+)==(.+)\)$")
def _what_is(thing):
if isinstance(thing, (ParameterRef, DataRef)):
return repr(thing)
if isinstance(thing, (str, int, float)):
return f"{thing.__class__.__name__}({thing})"
if isinstance(thing, LinearComponent):
return f"{thing.__class__.__name__}({thing!r})"
return f"<{thing.__class__.__name__}>"
def _unsupported_operands(op, a, b):
return f"unsupported operands for {op}: '{_what_is(a)}' and '{_what_is(b)}'"
[docs]
class UnicodeRef(str):
"""
A common base class for all larch named reference types.
This class itself has no features and should not be instantiated.
Instead create :class:`ParameterRef` or :class:`DataRef` objects as needed.
"""
class Ref_Gen:
def __init__(self, kind):
self._kind = kind
def __getattr__(self, key):
return self._kind(key)
def __call__(self, arg):
return self._kind(str(arg))
def __getitem__(self, arg):
return self._kind(str(arg))
[docs]
class ParameterRef(UnicodeRef):
_precedence = 99
def __init__(self, *args):
self._formatting = None
def set_fmt(self, formatting):
self._formatting = formatting
return self
def __repr__(self):
if _re.match("[_A-Za-z][_a-zA-Z0-9]*$", self) and not _keyword.iskeyword(self):
return f"{_ParameterRef_C_repr_txt}.{self}"
else:
return f"{_ParameterRef_C_repr_txt}('{self}')"
def __eq__(self, other):
if isinstance(other, str) and not isinstance(other, DataRef):
if str(self) == str(other):
return True
return False
def __ne__(self, other):
return not self.__eq__(other)
def __hash__(self):
return super().__hash__()
def __pos__(self):
return self
def __add__(self, other):
if isinstance(self, ParameterRef):
if other == 0:
return self
if isinstance(other, (ParameterRef, LinearComponent, LinearFunction)):
return LinearComponent(param=str(self), data="1") + other
if isinstance(other, _Number):
from .linear_math import ParameterAdd
return ParameterAdd(self, other)
elif isinstance(other, ParameterRef):
if self == 0:
return other
if isinstance(self, (LinearComponent, LinearFunction)):
return self + LinearComponent(param=str(other), data="1")
if isinstance(self, _Number):
from .linear_math import ParameterAdd
return ParameterAdd(self, other)
return NotImplemented # raise NotImplementedError(f"{_what_is(self)} + {_what_is(other)}")
def __sub__(self, other):
if isinstance(self, ParameterRef):
if other == 0:
return self
if isinstance(other, (ParameterRef, LinearComponent, LinearFunction)):
return LinearComponent(param=str(self), data="1") - other
if isinstance(other, _Number):
from .linear_math import ParameterSubtract
return ParameterSubtract(self, other)
elif isinstance(other, ParameterRef):
if self == 0:
return other
if isinstance(self, (LinearComponent, LinearFunction)):
return self - LinearComponent(param=str(other), data="1")
if isinstance(self, _Number):
from .linear_math import ParameterSubtract
return ParameterSubtract(self, other)
return NotImplemented # raise NotImplementedError(f"{_what_is(self)} - {_what_is(other)}")
def __mul__(self, other):
if isinstance(self, ParameterRef):
if isinstance(other, DataRef):
return LinearComponent(param=str(self), data=str(other))
if isinstance(other, _Number):
# return LinearComponent(param=str(self), data=str(other))
return LinearComponent(param=str(self), data="1", scale=other)
if isinstance(other, ParameterRef):
if self == _null_:
return other
if other == _null_:
return self
from .linear_math import ParameterMultiply
return ParameterMultiply(self, other)
if isinstance(other, LinearComponent):
if self == _null_:
return other
if other.param == _null_:
return LinearComponent(
param=str(self), data=str(other.data), scale=other.scale
)
from .linear_math import ParameterMultiply
return ParameterMultiply(self, other)
if isinstance(other, LinearFunction):
return LinearFunction([self * c for c in other])
elif isinstance(other, ParameterRef):
if isinstance(self, DataRef):
return LinearComponent(param=str(other), data=str(self))
if isinstance(self, _Number):
return LinearComponent(param=str(other), data="1", scale=float(self))
return NotImplemented # raise NotImplementedError(f"{_what_is(self)} * {_what_is(other)}")
def __truediv__(self, other):
if isinstance(self, ParameterRef):
if isinstance(other, ParameterRef):
from .linear_math import ParameterDivide
return ParameterDivide(self, other)
elif isinstance(other, _Number):
return LinearComponent(param=str(self), data="1", scale=1 / other)
elif isinstance(other, DataRef):
return LinearComponent(param=str(self), data=str(1 / other), scale=1)
elif isinstance(other, ParameterRef):
return NotImplemented
return NotImplemented # raise NotImplementedError(f"{_what_is(self)} / {_what_is(other)}")
[docs]
def value(self, *args):
"""
Get the value of the parameter in a given model.
Parameters
----------
m : Model
The model from which to extract a parameter value.
Returns
-------
float
"""
s = str(self)
for m in args:
if isinstance(m, dict) and s in m:
return m[s]
else:
try:
return m.get_value(s)
except KeyError:
pass
raise KeyError(s)
[docs]
def string(self, m):
"""
Get the value of the parameter in a given model, as a formatted string.
Parameters
----------
m : Model
The model from which to extract a parameter value.
Returns
-------
str
"""
if self._formatting is None:
return f"{self.value(m):.3g}"
else:
return self._formatting.format(self.value(m))
[docs]
def valid(self, m):
"""
Check if this ParameterRef would give a value for a given model.
Parameters
----------
m : Model
The model from which to extract a parameter value.
Returns
-------
bool
False if the value method would raise an exception, and True otherwise.
"""
if str(self) in m:
return True
return False
def as_pmath(self):
from .linear_math import ParameterNoop
return ParameterNoop(self)
def __xml__(self, resolve_parameters=None, value_in_tooltips=True):
if resolve_parameters is not None:
if value_in_tooltips:
p_display = repr(self)
p_tooltip = self.string(resolve_parameters)
else:
p_display = self.string(resolve_parameters)
p_tooltip = repr(self)
else:
p_display = repr(self)
p_tooltip = "This is a Parameter"
from xmle import Elem
x = Elem("div")
if use_tooltips:
a_p = x.elem("div", attrib={"class": "tooltipped"}, text=p_display)
a_p.elem("span", attrib={"class": "tooltiptext"}, text=p_tooltip)
else:
a_p = x.elem("span", attrib={"class": "Larch_Parameter"}, text=p_display)
return x
[docs]
class DataRef(UnicodeRef):
def __repr__(self):
if _re.match("[_A-Za-z][_a-zA-Z0-9]*$", self) and not _keyword.iskeyword(self):
return f"{_DataRef_repr_txt}.{self}"
else:
return f"{_DataRef_repr_txt}('{self}')"
def __eq__(self, other):
if isinstance(other, str) and not isinstance(other, ParameterRef):
if str(self) == str(other):
return True
return False
def __ne__(self, other):
return not self.__eq__(other)
def __hash__(self):
return super().__hash__()
def __pos__(self):
return self
def __add__(self, other):
if isinstance(self, (DataRef, _Number)) and isinstance(
other, (DataRef, _Number)
):
if self == "0" or self == "0.0" or self == 0:
return other
if other == "0" or other == "0.0" or other == 0:
return self
# Double zero is trapped here as it is used to flag duplicate terms in a utility function.
if self == "00":
return DataRef(f"0+{parenthize(self)}")
if other == "00":
return DataRef(f"{parenthize(self)}+0")
return DataRef(f"{parenthize(self)}+{parenthize(other, True)}")
if isinstance(self, DataRef) and isinstance(
other, (ParameterRef, LinearComponent)
):
return P(_null_) * self + other
# Don't return NotImplemented just raise TypeError when adding a DataRef and a plain string.
# This will disallow the __radd__ method on the plain string.
if (
isinstance(self, (DataRef, _Number))
and isinstance(other, str)
and not isinstance(other, DataRef)
):
raise TypeError(_unsupported_operands("-", self, other))
if (
isinstance(other, (DataRef, _Number))
and isinstance(self, str)
and not isinstance(self, DataRef)
):
raise TypeError(_unsupported_operands("-", self, other))
return NotImplemented # raise NotImplementedError(f"{_what_is(self)} + {_what_is(other)}")
def __sub__(self, other):
if isinstance(self, (DataRef, _Number)) and isinstance(
other, (DataRef, _Number)
):
return DataRef(f"{parenthize(self)}-{parenthize(other, True)}")
if isinstance(self, DataRef) and isinstance(other, ParameterRef):
return P(_null_) * self - other
return NotImplemented # raise NotImplementedError(f"{_what_is(self)} - {_what_is(other)}")
def __mul__(self, other):
if isinstance(self, DataRef):
if isinstance(other, (DataRef, _Number)):
if self == "1" or self == "1.0" or self == 1:
return other
if other == "1" or other == "1.0" or other == 1:
return self
if isinstance(other, _Number):
return P(_null_) * other * self
if self == other and self[:8] == "boolean(" and self[-1:] == ")":
# Squaring a boolean does not change it
return self
if (
self[:8] == "boolean("
and self[-1:] == ")"
and other[:8] == "boolean("
and other[-1:] == ")"
):
# Check for two mutually exclusive conditions
boolmatch1 = _boolmatch.match(self)
if boolmatch1:
boolmatch2 = _boolmatch.match(other)
if boolmatch2:
if boolmatch1.group(1) == boolmatch2.group(1):
if boolmatch1.group(2) != boolmatch2.group(2):
return DataRef("0")
return DataRef(f"{parenthize(self)}*{parenthize(other, True)}")
if isinstance(other, ParameterRef):
return LinearComponent(param=str(other), data=str(self))
if isinstance(other, LinearComponent):
return LinearComponent(
param=str(other.param),
data=str(self * other.data),
scale=other.scale,
)
if isinstance(other, LinearFunction):
return LinearFunction([self * c for c in other])
elif isinstance(other, DataRef):
if isinstance(self, (DataRef, _Number)):
if self == "1" or self == "1.0" or self == 1:
return other
if other == "1" or other == "1.0" or other == 1:
return self
if isinstance(self, _Number):
return P(_null_) * self * other
return DataRef(f"{parenthize(self)}*{parenthize(other, True)}")
if isinstance(self, ParameterRef):
return LinearComponent(param=str(self), data=str(other))
if isinstance(self, LinearComponent):
return LinearComponent(
param=str(self.param), data=str(self.data * other)
)
if isinstance(self, LinearFunction):
return LinearFunction([c * other for c in self])
return NotImplemented # raise NotImplementedError(f"{_what_is(self)} * {_what_is(other)}")
def __rmul__(self, other):
if isinstance(self, DataRef):
if isinstance(other, (DataRef, _Number)):
if self == "1" or self == "1.0" or self == 1:
return other
if other == "1" or other == "1.0" or other == 1:
return self
if isinstance(other, _Number):
return P(_null_) * other * self
if self == other and self[:8] == "boolean(" and self[-1:] == ")":
# Squaring a boolean does not change it
return self
if (
self[:8] == "boolean("
and self[-1:] == ")"
and other[:8] == "boolean("
and other[-1:] == ")"
):
# Check for two mutually exclusive conditions
boolmatch1 = _boolmatch.match(self)
if boolmatch1:
boolmatch2 = _boolmatch.match(other)
if boolmatch2:
if boolmatch1.group(1) == boolmatch2.group(1):
if boolmatch1.group(2) != boolmatch2.group(2):
return DataRef("0")
return DataRef(f"{parenthize(other, True)}*{parenthize(self)}")
if isinstance(other, ParameterRef):
return LinearComponent(param=str(other), data=str(self))
if isinstance(other, LinearComponent):
return LinearComponent(
param=str(other.param),
data=str(other.data * self),
scale=other.scale,
)
if isinstance(other, LinearFunction):
return LinearFunction([self * c for c in other])
elif isinstance(other, DataRef):
if isinstance(self, (DataRef, _Number)):
if self == "1" or self == "1.0" or self == 1:
return other
if other == "1" or other == "1.0" or other == 1:
return self
if isinstance(self, _Number):
return P(_null_) * self * other
return DataRef(f"{parenthize(other, True)}*{parenthize(self)}")
if isinstance(self, ParameterRef):
return LinearComponent(param=str(self), data=str(other))
if isinstance(self, LinearComponent):
return LinearComponent(
param=str(self.param), data=str(other * self.data)
)
if isinstance(self, LinearFunction):
return LinearFunction([other * c for c in self])
return NotImplemented # raise NotImplementedError(f"{_what_is(self)} * {_what_is(other)}")
def __truediv__(self, other):
if isinstance(self, (DataRef, _Number)) and isinstance(
other, (DataRef, _Number)
):
return DataRef(f"{parenthize(self)}/{parenthize(other, True)}")
if isinstance(self, ParameterRef) and isinstance(other, (DataRef, _Number)):
return self * DataRef(f"1/{parenthize(other, True)}")
return NotImplemented # raise NotImplementedError(f"{_what_is(self)} / {_what_is(other)}")
def __rtruediv__(self, other):
if isinstance(self, DataRef) and isinstance(other, DataRef):
return DataRef(f"{parenthize(other, True)}/{parenthize(self)}")
if isinstance(self, DataRef) and isinstance(other, ParameterRef):
return LinearComponent(param=str(other), data=f"1/{self}")
if isinstance(self, DataRef) and isinstance(other, LinearComponent):
return LinearComponent(
param=str(other.param),
data=f"{parenthize(other.data, True)}/{parenthize(self)}",
scale=other.scale,
)
if isinstance(self, DataRef) and isinstance(other, _Number):
if other == 1:
return DataRef(f"1/{parenthize(self, True)}")
else:
return other * DataRef(f"1/{parenthize(self, True)}")
return NotImplemented
def __and__(self, other):
if isinstance(self, (DataRef, _Number)) and isinstance(
other, (DataRef, _Number)
):
return DataRef(f"{parenthize(self)}&{parenthize(other, True)}")
return NotImplemented # raise NotImplementedError(f"{_what_is(self)} & {_what_is(other)}")
def __or__(self, other):
if isinstance(self, (DataRef, _Number)) and isinstance(
other, (DataRef, _Number)
):
return DataRef(f"{parenthize(self)}|{parenthize(other, True)}")
return NotImplemented # raise NotImplementedError(f"{_what_is(self)} | {_what_is(other)}")
def __xor__(self, other):
if isinstance(self, (DataRef, _Number)) and isinstance(
other, (DataRef, _Number)
):
return DataRef(f"{parenthize(self)}^{parenthize(other, True)}")
return NotImplemented # raise NotImplementedError(f"{_what_is(self)} ^ {_what_is(other)}")
def __floordiv__(self, other):
if isinstance(self, (DataRef, _Number)) and isinstance(
other, (DataRef, _Number)
):
return DataRef(f"{parenthize(self)}//{parenthize(other, True)}")
return NotImplemented # raise NotImplementedError(f"{_what_is(self)} // {_what_is(other)}")
def __pow__(self, other, modulo):
if modulo is not None:
raise NotImplementedError(
f"no pow with modulo on {self.__class__.__name__}"
)
if isinstance(self, (DataRef, _Number)) and isinstance(
other, (DataRef, _Number)
):
return DataRef(f"{parenthize(self)}**{parenthize(other, True)}")
return NotImplemented # raise NotImplementedError(f"{_what_is(self)} ** {_what_is(other)}")
def __invert__(self):
return DataRef(f"~{parenthize(self, True)}")
def __neg__(self):
return DataRef(f"-{parenthize(self, True)}")
def eval(self, namespace=None, *, globals=None, **more_namespace):
import numpy
from ..util.common_functions import hard_sigmoid, piece
use_namespace = {
"exp": numpy.exp,
"log": numpy.log,
"log1p": numpy.log1p,
"fabs": numpy.fabs,
"sqrt": numpy.sqrt,
"absolute": numpy.absolute,
"isnan": numpy.isnan,
"isfinite": numpy.isfinite,
"logaddexp": numpy.logaddexp,
"fmin": numpy.fmin,
"fmax": numpy.fmax,
"nan_to_num": numpy.nan_to_num,
"piece": piece,
"hard_sigmoid": hard_sigmoid,
}
if namespace is not None:
use_namespace.update(namespace)
use_namespace.update(more_namespace)
return eval(self, globals, use_namespace)
use_tooltips = False
class LinearComponent:
def __init__(self, param: str, data: str = "1", scale: float = 1):
self._param = param
self._data = data
self._scale = scale
@property
def param(self):
return ParameterRef(self._param)
@property
def data(self):
return DataRef(self._data)
@property
def scale(self):
return self._scale
def to_dict(self):
out = {
"param": str(self._param),
"data": str(self._data),
}
if self.scale != 1:
out["scale"] = self.scale
return out
def __pos__(self):
return self
def __neg__(self):
return self.__class__(param=self._param, data=self._data, scale=-self._scale)
def __repr__(self):
try:
if self.scale == 1.0:
try:
data_is_1 = float(self.data) == 1
except Exception:
data_is_1 = False
if data_is_1:
return f"{self.param!r}"
else:
return f"{self.param!r} * {self.data!r}"
return f"{self.param!r} * {self.scale} * {self.data!r}"
except AttributeError:
return f"<{self.__class__.__name__} {id(self)} with error>"
def _str_exponentiate(self):
try:
if self.scale == 1.0:
try:
data_is_1 = float(self.data) == 1
except Exception:
data_is_1 = False
if data_is_1:
return f"exp({self.param!r})"
else:
return f"exp({self.param!r}) * {self.data!r}"
return f"exp({self.param!r}) * {self.scale} * {self.data!r}"
except AttributeError:
return f"<{self.__class__.__name__} {id(self)} with error>"
def __radd__(self, other):
if other == 0:
return self
else:
raise TypeError(
f"unsupported operand type(s) for +: {type(other)} and {type(self)}"
)
def __add__(self, other):
if isinstance(self, LinearComponent):
if other == () or other == 0:
return self
elif isinstance(other, LinearComponent):
if other.data == "0" or other.data == "0.0" or other.scale == 0:
return self
if self.data == "0" or self.data == "0.0" or self.scale == 0:
return other
return LinearFunction([self, other])
elif isinstance(other, LinearFunction):
return LinearFunction([self, *other])
elif isinstance(other, ParameterRef):
return self + LinearComponent(param=str(other))
elif isinstance(other, DataRef):
return self + LinearComponent(param=_null_, data=str(other))
else:
try:
return self.as_pmath() + other
except NotImplementedError:
pass
elif isinstance(other, LinearComponent):
if self == () or self == 0:
return other
elif isinstance(self, ParameterRef):
# return LinearFunction([LinearComponent(param=str(self)), other])
return LinearComponent(param=str(self)) + other
elif isinstance(self, DataRef):
return LinearComponent(param=_null_, data=str(self)) + other
elif isinstance(self, LinearFunction):
return (
LinearFunction(
[
*self,
]
)
+ other
)
else:
try:
return other + self.as_pmath()
except NotImplementedError:
pass
raise NotImplementedError(f"{_what_is(self)} + {_what_is(other)}")
def __sub__(self, other):
if isinstance(self, LinearComponent):
if other == () or other == 0:
return self
elif isinstance(other, LinearComponent):
return LinearFunction([self, -other])
elif isinstance(other, LinearFunction):
return LinearFunction([self, *(-other)])
elif isinstance(other, ParameterRef):
return LinearFunction([self, -LinearComponent(param=str(other))])
elif isinstance(other, ParameterRef):
return LinearFunction(
[self, -LinearComponent(param=_null_, data=str(other))]
)
else:
try:
return self.as_pmath() - other
except NotImplementedError:
pass
raise NotImplementedError(f"{_what_is(self)} + {_what_is(other)}")
def __mul__(self, other):
if isinstance(self, LinearComponent):
if isinstance(
other,
(
int,
float,
),
):
return self.__class__(
param=str(self.param),
data=str(self.data),
scale=self.scale * other,
)
if isinstance(other, (DataRef,)):
return self.__class__(
param=str(self.param),
data=str(self.data * other),
scale=self.scale,
)
if isinstance(other, (LinearComponent,)):
if self.param == _null_:
return self.__class__(
param=str(other.param),
data=str(self.data * other.data),
scale=self.scale * other.scale,
)
if other.param == _null_:
return self.__class__(
param=str(self.param),
data=str(self.data * other.data),
scale=self.scale * other.scale,
)
from .linear_math import ParameterMultiply
return ParameterMultiply(
self.as_pmath(),
other.as_pmath(),
)
if isinstance(other, ParameterRef):
if other == _null_:
return self
elif self.param == _null_:
return self.__class__(
param=str(other),
data=str(self.data),
scale=self.scale,
)
try:
return self.as_pmath() * other
except NotImplementedError:
pass
if isinstance(other, LinearComponent) and isinstance(self, _Number):
return other.__class__(
param=str(other.param),
data=str(other.data),
scale=other.scale * self,
)
raise NotImplementedError(f"{_what_is(self)} * {_what_is(other)}")
def __truediv__(self, other):
if isinstance(self, LinearComponent):
if isinstance(
other,
(
int,
float,
),
):
return self.__class__(
param=str(self.param),
data=str(self.data),
scale=self.scale / other,
)
elif isinstance(other, (DataRef,)):
return self.__class__(
param=str(self.param),
data=str(self.data / other),
scale=self.scale,
)
elif isinstance(other, (LinearComponent,)):
from .linear_math import ParameterDivide
return ParameterDivide(
self.as_pmath(),
other.as_pmath(),
)
else:
try:
return self.as_pmath() / other
except NotImplementedError:
pass
raise NotImplementedError(f"{_what_is(self)} / {_what_is(other)}")
def __iter__(self):
return iter(LinearFunction([self]))
def __eq__(self, other):
if isinstance(other, LinearFunction) and len(other) == 1:
other = other[0]
if not isinstance(other, LinearComponent):
return False
if self.param != other.param:
return False
if self.data != other.data:
return False
if self.scale != other.scale:
return False
return True
def __xml__(
self,
exponentiate_parameter=False,
resolve_parameters=None,
value_in_tooltips=True,
):
from xmle import Elem
if use_tooltips:
x = Elem("div")
# x << tooltipped_style()
if resolve_parameters is not None:
if exponentiate_parameter:
if value_in_tooltips:
p_tooltip = (
f"exp({self.param.string(resolve_parameters)}) "
f"= {self.param.value(resolve_parameters):.4g}"
)
p_display = f"{repr(self.param)}"
else:
p_display = f"{self.param.string(resolve_parameters)}"
p_tooltip = f"exp({repr(self.param)})"
else:
if value_in_tooltips:
p_tooltip = self.param.string(resolve_parameters)
p_display = repr(self.param)
else:
p_display = self.param.string(resolve_parameters)
p_tooltip = repr(self.param)
else:
p_display = repr(self.param)
p_tooltip = "This is a Parameter"
data_tail = " * "
try:
if float(self.data) == 1:
data_tail = ""
except Exception:
pass
if self.scale == 1.0:
if exponentiate_parameter:
x.elem("span", tail="exp(")
a_p = x.elem(
"div",
attrib={"class": "tooltipped"},
text=p_display,
tail=")" + data_tail,
)
a_p.elem("span", attrib={"class": "tooltiptext"}, text=p_tooltip)
else:
a_p = x.elem(
"div",
attrib={"class": "tooltipped"},
text=p_display,
tail=data_tail,
)
a_p.elem("span", attrib={"class": "tooltiptext"}, text=p_tooltip)
else:
if exponentiate_parameter:
x.elem("span", tail="exp(")
a_p = x.elem(
"div",
attrib={"class": "tooltipped"},
text=p_display,
tail=f" * {self.scale}){data_tail}",
)
a_p.elem("span", attrib={"class": "tooltiptext"}, text=p_tooltip)
else:
a_p = x.elem(
"div",
attrib={"class": "tooltipped"},
text=p_display,
tail=f" * {self.scale}{data_tail}",
)
a_p.elem("span", attrib={"class": "tooltiptext"}, text=p_tooltip)
if data_tail == " * ":
a_x = x.elem(
"div", attrib={"class": "tooltipped"}, text=repr(self.data)
)
a_x.elem("span", attrib={"class": "tooltiptext"}, text="This is Data")
else:
x = Elem("pre")
# x << tooltipped_style()
if resolve_parameters is not None:
if exponentiate_parameter:
if value_in_tooltips:
p_display = f"{repr(self.param)}"
else:
p_display = f"{self.param.string(resolve_parameters)}"
else:
if value_in_tooltips:
p_display = repr(self.param)
else:
p_display = self.param.string(resolve_parameters)
else:
p_display = repr(self.param)
data_tail = " * "
try:
if float(self.data) == 1:
data_tail = ""
except Exception:
pass
if self.scale == 1.0:
if exponentiate_parameter:
x.elem("span", tail="exp(")
a_p = x.elem(
"span",
attrib={"class": "LinearFunc_Param"},
text=p_display,
tail=")" + data_tail,
)
else:
a_p = x.elem(
"span",
attrib={"class": "LinearFunc_Param"},
text=p_display,
tail=data_tail,
)
else:
if exponentiate_parameter:
x.elem("span", tail="exp(")
a_p = x.elem(
"span",
attrib={"class": "LinearFunc_Param"},
text=p_display,
tail=f" * {self.scale}){data_tail}",
)
else:
a_p = x.elem(
"span",
attrib={"class": "LinearFunc_Param"},
text=p_display,
tail=f" * {self.scale}{data_tail}",
)
if data_tail == " * ":
a_x = x.elem(
"span", attrib={"class": "LinearFunc_Data"}, text=repr(self.data)
)
return x
def _repr_html_(self):
return self.__xml__().tostring()
def evaluate(self, p_getter, x_namespace=None, exp_params=False, **kwargs):
if self.data in x_namespace:
x = x_namespace[self.data]
elif self.data in kwargs:
x = kwargs[self.data]
else:
x = self.data.eval(namespace=x_namespace, **kwargs)
if exp_params:
return self.scale * _numpy.exp(p_getter(str(self.param))) * x
else:
return self.scale * p_getter(str(self.param)) * x
def __copy__(self):
return self.__class__(
param=str(self.param),
data=str(self._data),
scale=self.scale,
)
def as_pmath(self):
from .linear_math import ParameterMultiply, ParameterNoop
if self.data != "1":
try:
scale = float(self.data) * self.scale
except Exception:
pass
else:
if scale == 1:
return ParameterNoop(self.param)
else:
return ParameterMultiply(self.param, scale)
raise NotImplementedError("data is not 1")
if self.scale == 1:
return ParameterNoop(self.param)
else:
return ParameterMultiply(self.param, self.scale)
def _try_mangle(instance):
try:
instance.mangle()
except AttributeError:
pass # print(f"No Mangle L: {err}")
def _try_mangle_h(instance_holder):
try:
instance_holder._instance.mangle()
except AttributeError:
pass # print(f"No Mangle L2: {err}")
class LinearFunction:
_instance = None
def __init__(self, init=None):
self._func = list()
if init is not None and init != 0:
# Copy contents of init, stabilizes functionality of iadd
init_ = list(init)
for i in init_:
if isinstance(i, Mapping):
i = LinearComponent(**i)
if isinstance(i, LinearComponent):
self._func.append(i)
else:
raise TypeError(
f"members of {self.__class__.__name__} must be LinearComponent"
)
def set_instance(self, instance):
self._instance = instance
def __fresh(self, instance):
newself = LinearFunction()
newself._instance = instance
setattr(instance, self.private_name, newself)
return newself
def __get__(self, instance, owner):
# LinearFunction newself
if instance is None:
return self
try:
newself = getattr(instance, self.private_name)
except AttributeError:
newself = self.__fresh(instance)
if newself is None:
newself = self.__fresh(instance)
return newself
def __set__(self, instance, values):
try:
newself = getattr(instance, self.private_name)
except AttributeError:
newself = self.__fresh(instance)
if newself is None:
newself = self.__fresh(instance)
newself.__init__(values)
# _try_mangle_h(newself)
try:
newself._instance.mangle()
except AttributeError:
pass # print(f"No Mangle L2: {err}")
else:
pass # print(f"Yes Mangle L2: {newself._instance}")
def __delete__(self, instance):
try:
newself = getattr(instance, self.private_name)
except AttributeError:
newself = self.__fresh(instance)
newself.__init__()
newself._instance = instance
# _try_mangle_h(newself)
try:
newself._instance.mangle()
except AttributeError:
pass # print(f"No Mangle L2: {err}")
else:
pass # print(f"Yes Mangle L2: {newself._instance}")
def __set_name__(self, owner, name):
self.name = name
self.private_name = "_" + name
def __getitem__(self, item):
return self._func[item]
def __setitem__(self, key, value: LinearComponent):
self._func[key] = value
_try_mangle(self._instance)
def __delitem__(self, key):
del self._func[key]
_try_mangle(self._instance)
def remove_data(self, data):
"""
Remove all terms from this linear function with the given data.
This operation mutates this linear function in-place.
Parameters
----------
data : str
Name of data to remove, which must match exactly.
Returns
-------
self : LinearFunction
"""
i = len(self._func)
while i > 0:
i -= 1
if self._func[i].data == data:
del self._func[i]
return self
def remove_param(self, param):
"""
Remove all terms from this linear function with the given parameter.
This operation mutates this linear function in-place.
Parameters
----------
param : str
Name of parameter to remove
Returns
-------
self : LinearFunction
"""
i = len(self._func)
while i > 0:
i -= 1
if self._func[i].param == param:
del self._func[i]
return self
def __len__(self):
return len(self._func)
def insert(self, index, value: LinearComponent):
self._func.insert(index, value)
_try_mangle(self._instance)
def append(self, value: LinearComponent):
self._func.append(value)
_try_mangle(self._instance)
def extend(self, values):
for v in values:
if not isinstance(v, LinearComponent):
raise TypeError(f"cannot add type {type(v)} to LinearFunction")
self._func.extend(values)
_try_mangle(self._instance)
def __add__(self, other):
if isinstance(self, LinearFunction):
if other == () or other == 0:
return self
if isinstance(other, LinearFunction):
return self.__class__([*list(self), *list(other)])
if isinstance(other, ParameterRef):
other = LinearComponent(param=str(other))
if isinstance(other, LinearComponent):
result = self.__class__(self)
if not (other.data == "0" or other.data == "0.0" or other.scale == 0):
result.append(other)
return result
from .linear_math import ParameterAdd, _ParameterOp
if isinstance(other, (_ParameterOp, _Number)):
try:
return ParameterAdd(self.as_pmath(), other)
except NotImplementedError:
pass
if isinstance(other, LinearFunction):
if self == () or self == 0:
return other
from .linear_math import ParameterAdd, _ParameterOp
if isinstance(self, (_ParameterOp, _Number)):
try:
return ParameterAdd(self, other.as_pmath())
except NotImplementedError:
pass
raise NotImplementedError(f"{_what_is(self)} + {_what_is(other)}")
def __radd__(self, other):
if other == 0:
return self
else:
raise TypeError(
f"unsupported operand type(s) for +: {type(other)} and {type(self)}"
)
def __iadd__(self, other):
if isinstance(other, ParameterRef):
other = LinearComponent(param=str(other))
if other == () or other == 0:
return self
elif isinstance(other, LinearFunction):
self._func.extend(other)
_try_mangle(self._instance)
elif isinstance(other, LinearComponent):
self.append(other)
else:
raise TypeError(f"cannot add type {type(other)} to LinearFunction")
return self
def __pos__(self):
return self
def __neg__(self):
return self.__class__(-i for i in self)
def __sub__(self, other):
if isinstance(self, LinearFunction):
if other == () or other == 0:
return self
if isinstance(other, LinearFunction):
return self.__class__([*list(self), *list(-other)])
if isinstance(other, ParameterRef):
other = LinearComponent(param=str(other))
if isinstance(other, LinearComponent):
result = self.__class__(self)
result.append(-other)
return result
from .linear_math import ParameterSubtract, _ParameterOp
if isinstance(other, (_ParameterOp, _Number)):
try:
return ParameterSubtract(self.as_pmath(), other)
except NotImplementedError:
pass
if isinstance(other, LinearFunction):
from .linear_math import ParameterSubtract, _ParameterOp
if isinstance(self, (_ParameterOp, _Number)):
try:
return ParameterSubtract(self, other.as_pmath())
except NotImplementedError:
pass
raise NotImplementedError(f"{_what_is(self)} + {_what_is(other)}")
def __mul__(self, other):
from .linear_math import ParameterMultiply, _ParameterOp
if isinstance(self, LinearFunction) and isinstance(other, _Number):
trial = LinearFunction()
for component in self:
trial.append(component * other)
return trial
if isinstance(self, LinearFunction) and isinstance(
other, (ParameterRef, _ParameterOp)
):
if isinstance(other, ParameterRef):
if other == _null_:
return self
trial = LinearFunction()
for component in self:
if component.param == _null_:
trial.append(component * other)
else:
trial = None
break
if trial is not None:
return trial
return ParameterMultiply(self.as_pmath(), other)
if isinstance(self, LinearFunction) and isinstance(other, LinearFunction):
return sum(i * j for i in self for j in other)
if isinstance(other, LinearFunction) and isinstance(self, _Number):
trial = LinearFunction()
for component in other:
trial.append(self * component)
return trial
if isinstance(other, LinearFunction) and isinstance(
self, (ParameterRef, _ParameterOp)
):
if isinstance(self, ParameterRef):
if self == _null_:
return other
trial = LinearFunction()
for component in other:
if component.param == _null_:
trial.append(self * component)
else:
trial = None
break
if trial is not None:
return trial
return ParameterMultiply(self, other.as_pmath())
try:
trial = LinearFunction()
for component in self:
trial.append(component * other)
return trial
except NotImplementedError:
return NotImplemented
def __truediv__(self, other):
from .linear_math import ParameterDivide, _ParameterOp
if isinstance(self, LinearFunction) and isinstance(
other, (ParameterRef, _ParameterOp)
):
return ParameterDivide(self.as_pmath(), other)
if isinstance(other, LinearFunction) and isinstance(
self, (ParameterRef, _ParameterOp)
):
return ParameterDivide(self, other.as_pmath())
if isinstance(self, LinearFunction) and isinstance(other, _Number):
try:
return ParameterDivide(self.as_pmath(), other)
except NotImplementedError:
pass
if isinstance(other, (DataRef, _Number)):
return LinearFunction([i / other for i in self])
return NotImplemented
def __contains__(self, val):
if isinstance(val, ParameterRef):
for i in self:
if i.param == val:
return True
return False
if isinstance(val, DataRef):
for i in self:
if i.data == val:
return True
return False
raise TypeError(
"the searched for content must be of type ParameterRef or DataRef"
)
def _index_of(self, val):
if isinstance(val, ParameterRef):
for n, i in enumerate(self):
if i.param == val:
return n
raise KeyError("ParameterRef not found")
if isinstance(val, DataRef):
for n, i in enumerate(self):
if i.data == val:
return n
raise KeyError("DataRef not found")
raise TypeError(
"the searched for content must be of type ParameterRef or DataRef"
)
def reformat_param(self, container=None, pattern=None, repl=None, **kwargs):
"""
Transform all the parameters in the LinearFunction.
Parameters
----------
container : str
A format string, into which the previous parameters are formatted.
Use this to append things to the parameter names.
pattern : str
repl : str
Passed to `re.sub` with each existing parameter as the base string
to be searched.
Examples
--------
>>> from larch.roles import P,X
>>> f = P.InVehTime * X.IVTT + P.OutOfVehTime * X.OVTT
>>> f1 = f.reformat_param('{}_Suffix')
>>> str(f1)
'(P.InVehTime_Suffix * X.IVTT) + (P.OutOfVehTime_Suffix * X.OVTT)'
>>> f2 = f.reformat_param(pattern='(Veh)', repl='Vehicle')
>>> str(f2)
'(P.InVehicleTime * X.IVTT) + (P.OutOfVehicleTime * X.OVTT)'
"""
import re
r = self.__class__()
for i in self:
if pattern is None:
param = i.param
else:
if repl is None:
raise TypeError("must give repl with pattern")
param = re.sub(pattern, repl, i.param, **kwargs)
if container is None:
container = "{}"
r += LinearComponent(
data=str(i.data), param=container.format(param), scale=i.scale
)
return r
def reformat_data(self, container=None, pattern=None, repl=None, **kwargs):
"""
Transform all the data in the LinearFunction.
Parameters
----------
container : str
A format string, into which the previous data strings are formatted.
Use this to apply common global transforms to the data.
pattern : str
repl : str
Passed to `re.sub` with each existing data string as the base string
to be searched.
"""
import re
r = self.__class__()
for i in self:
if pattern is None:
data = i.data
else:
if repl is None:
raise TypeError("must give repl with pattern")
data = re.sub(pattern, repl, i.data, **kwargs)
if container is None:
container = "{}"
r += LinearComponent(
data=container.format(data), param=str(i.param), scale=i.scale
)
return r
def __code__(self):
return " + ".join(f"({repr(i)})" for i in self)
def __eq__(self, other):
if not isinstance(other, LinearFunction):
return False
if len(self) != len(other):
return False
for i, j in zip(self, other):
if i != j:
return False
return True
def __repr__(self):
if len(self):
result = " + ".join(repr(i) for i in self)
if len(result) < 80:
return result
else:
return " " + result.replace(" + ", "\n+ ")
return f"<Empty {self.__class__.__name__}>"
def __xml__(
self,
linebreaks=True,
lineprefix="",
exponentiate_parameters=False,
resolve_parameters=None,
value_in_tooltips=True,
):
from xmle import Elem
x = Elem("div" if use_tooltips else "pre", attrib={"class": "LinearFunc"})
for n, i in enumerate(self):
ix_ = list(
i.__xml__(
exponentiate_parameter=exponentiate_parameters,
resolve_parameters=resolve_parameters,
value_in_tooltips=value_in_tooltips,
)
)
if linebreaks:
if n > 0 or lineprefix:
ix_.insert(0, Elem("br", tail=lineprefix + " + "))
else:
if n < len(self) - 1:
if ix_[-1].tail is None:
ix_[-1].tail = " + "
else:
ix_[-1].tail += " + "
for ii in ix_:
x << ii
if len(self) == 0:
x << Elem("span", text=repr(self))
return x
def _repr_html_(self):
return self.__xml__().tostring()
def data(self, cls=None):
if cls is None:
return [_.data for _ in self]
else:
return [cls(_.data) for _ in self]
def evaluate(
self, param_source, x_namespace=None, exp_params=False, **more_x_namespace
):
"""
Evaluate the linear function in the context of some parameters and data.
Typically all of the data given will be scalar values (to compute a
scalar result) or a single data item will be a vector of possible
values (to get a vector result).
Parameters
----------
param_source : Model-like
The source of the current parameter values.
x_namespace : dict, optional
A namespace of data values.
exp_params : bool, default False
Whether to take the exponential of parameters (i.e. for
a quantity function).
**more_x_namespace : any
More data values
Returns
-------
numeric or array-like
"""
if hasattr(param_source, "pvalue") and callable(param_source.pvalue):
param_source = param_source.pvalue
return sum(
j.evaluate(
param_source,
x_namespace=x_namespace,
exp_params=exp_params,
**more_x_namespace,
)
for j in self
)
def value(self, *args):
return self.as_pmath().value(*args)
def copy(self):
result = self.__class__(self)
return result
def __deepcopy__(self, memodict):
result = self.__class__()
import copy
for i in self:
result.append(copy.deepcopy(i, memodict))
return result
def _linear_plot_2d_data(
self, p_getter, x_name, x_min, x_max, n_points=100, **other_namespace
):
import numpy
if hasattr(self, "plotting_namespace") and len(other_namespace) == 0:
other_namespace = self.plotting_namespace
x = numpy.linspace(x_min, x_max, n_points)
y = self.evaluate(p_getter, {x_name: x}, **other_namespace)
return x, y
def linear_plot_2d(
self,
p_getter,
x_name,
x_min,
x_max,
n_points=100,
*,
xlabel=None,
svg=True,
header=None,
**other_namespace,
):
# Delayed evaluation mode...
if p_getter is None:
return lambda x: self.linear_plot_2d(
x,
x_name,
x_min,
x_max,
n_points=n_points,
xlabel=xlabel,
svg=svg,
header=header,
**other_namespace,
)
# Active evaluation mode...
from .._optional import pyplot as plt
plt.clf()
x, y = self._linear_plot_2d_data(
p_getter, x_name, x_min, x_max, n_points, **other_namespace
)
if hasattr(self, "plotting_label"):
plt.plot(x, y, label=self.plotting_label)
else:
plt.plot(x, y)
if xlabel is None:
plt.xlabel(x_name)
else:
plt.xlabel(xlabel)
plt.tight_layout(pad=0.5)
from ..util.plotting import plot_as_svg_xhtml
if svg is True:
svg = {}
if svg or svg == {}:
if header is not None:
svg["header"] = header
return plot_as_svg_xhtml(plt, **svg)
else:
plt.show()
def _inplot_linear_plot_2d(
self,
plt,
p_getter,
x_name,
x_min,
x_max,
n_points=100,
*,
xlabel=None,
svg=True,
header=None,
**other_namespace,
):
# Delayed evaluation mode...
if p_getter is None:
return lambda x: self._inplot_linear_plot_2d(
plt,
x,
x_name,
x_min,
x_max,
n_points=n_points,
xlabel=xlabel,
svg=svg,
header=header,
**other_namespace,
)
# Active evaluation mode...
x, y = self._linear_plot_2d_data(
p_getter, x_name, x_min, x_max, n_points, **other_namespace
)
if hasattr(self, "plotting_label"):
plt.plot(x, y, label=self.plotting_label)
else:
plt.plot(x, y)
def total_ordering_increasing(self):
from toolz.itertoolz import sliding_window
snaps = []
for windows_size in range(2, len(self) - 1):
for sub_p in sliding_window(windows_size, self):
snaps.append(sub_p[0].param.lessthan(sub_p[-1].param))
return snaps
def as_pmath(self):
for i in self:
if i.data != "1":
raise NotImplementedError(f"{type(self)} has non-unit data")
from .linear_math import ParameterAdd
if len(self) == 0:
return 0
elif len(self) == 1:
return self[0].as_pmath()
else:
x = ParameterAdd(self[0].as_pmath(), self[1].as_pmath())
for i in self[2:]:
x = ParameterAdd(x, i.as_pmath())
return x
class DictOfAlts(MutableMapping):
_instance = None
def __set_name__(self, owner, name):
self.name = name
self.private_name = "_private_" + name
def __fresh(self, instance):
newself = DictOfAlts(
alts_validator=self._alts_validator,
default_value=self._default_value,
value_validator=self._value_validator,
)
newself._instance = instance
setattr(instance, self.private_name, newself)
return newself
def __init__(
self,
mapping=None,
alts_validator=None,
default_value=1,
value_validator=lambda i: True,
**kwargs,
):
self._map = {}
self._value_validator = value_validator
self._alts_validator = alts_validator
self._default_value = default_value
if mapping is None:
mapping = {}
for k, v in mapping.items():
if not self._value_validator(v):
raise ValueError(f"bad value [{k}]: {v}")
self._map[k] = v
for k, v in kwargs.items():
self._map[k] = v
def __get__(self, instance, instancetype):
"""
Get attribute of instance.
Parameters
----------
instance : Any
Instance of parent class that has `self` as a member.
instancetype : class
Class of `instance`.
"""
if instance is None:
return self
try:
newself = getattr(instance, self.private_name)
except AttributeError:
newself = self.__fresh(instance)
if newself is None:
newself = self.__fresh(instance)
return newself
def __set__(self, instance, value):
try:
newself = getattr(instance, self.private_name)
except AttributeError:
newself = self.__fresh(instance)
if newself is None:
newself = self.__fresh(instance)
newself.__init__(
value,
alts_validator=self._alts_validator,
default_value=self._default_value,
value_validator=self._value_validator,
)
try:
newself._instance.mangle()
except AttributeError:
pass
def __delete__(self, instance):
self.__set__(instance, None)
def __getitem__(self, k):
try:
return self._map[k]
except KeyError:
if self._alts_validator is None or self._alts_validator(k):
v = self._map[k] = self._default_value
return v
else:
raise
def __setitem__(self, k, v):
if not self._value_validator(v):
raise ValueError(f"bad value [{k}]: {v}")
existing_value = self[k]
if existing_value != v:
self._map[k] = v
try:
self._instance.mangle()
except AttributeError:
pass
def __delitem__(self, key):
del self._map[key]
_try_mangle(self._instance)
def __iter__(self):
return iter(self._map.keys())
def __len__(self):
return len(self._map)
# def keys(self):
# return self._map.keys()
#
# def items(self):
# return self._map.items()
#
# def values(self):
# return self._map.values()
def copy(self):
return type(self)(self)
def __repr__(self):
return f"{type(self).__name__}({repr(self._map)})"
def to_dict(self):
return dict(self)
class DictOfLinearFunction:
_instance = None
def __init__(self, mapping=None, alts_validator=None, **kwargs):
self._map = {}
if mapping is None:
mapping = {}
for k, v in mapping.items():
self._map[k] = LinearFunction(v)
for k, v in kwargs.items():
try:
self._map[k] = LinearFunction(v)
except Exception:
print(v)
print(type(v))
raise
self._alts_validator = alts_validator
# self._instance = None
def __fresh(self, instance):
newself = DictOfLinearFunction()
newself._instance = instance
setattr(instance, self.private_name, newself)
return newself
def __get__(self, instance, instancetype):
"""
Get attribute of instance.
Parameters
----------
instance : Any
Instance of parent class that has `self` as a member.
instancetype : class
Class of `instance`.
"""
if instance is None:
return self
try:
newself = getattr(instance, self.private_name)
except AttributeError:
newself = self.__fresh(instance)
if newself is None:
newself = self.__fresh(instance)
return newself
def __set__(self, instance, values):
try:
newself = getattr(instance, self.private_name)
except AttributeError:
newself = self.__fresh(instance)
if newself is None:
newself = self.__fresh(instance)
newself.__init__(values)
# _try_mangle_h(newself)
try:
newself._instance.mangle()
except AttributeError:
pass # print(f"No Mangle L2: {err}")
else:
pass # print(f"Yes Mangle L2: {newself._instance}")
def __delete__(self, instance):
self.__set__(instance, None)
def __set_name__(self, owner, name):
self.name = name
self.private_name = "_" + name
def set_alts_validator(self, av):
self._alts_validator = av
def __getitem__(self, k):
try:
v = self._map[k]
v.set_instance(self._instance)
return v
except KeyError:
if self._alts_validator is None or self._alts_validator(k):
v = self._map[k] = LinearFunction()
v.set_instance(self._instance)
return v
else:
raise
def __setitem__(self, k, v):
if isinstance(v, ParameterRef):
v = v * DataRef("1")
if isinstance(v, int) and v == 0:
v = LinearFunction()
elif isinstance(v, LinearComponent):
v = LinearFunction([v])
elif isinstance(v, list):
v = LinearFunction(v)
elif not isinstance(v, LinearFunction):
raise TypeError(
f"values in {self.__class__} can only have type LinearFunction, not {type(v)}"
)
if "with error" in repr(v):
raise ValueError("found error here")
v.set_instance(self._instance)
self._map[k] = v
_try_mangle(self._instance)
def __delitem__(self, key):
del self._map[key]
_try_mangle(self._instance)
def __iter__(self):
return iter(self._map.keys())
def __len__(self):
return len(self._map)
def keys(self):
return self._map.keys()
def items(self):
return self._map.items()
def values(self):
return self._map.values()
def copy(self):
return type(self)(self)
def __repr__(self):
return f"{type(self).__name__}({repr(self._map)})"
def __xml__(self):
from xmle import Elem
x = Elem("div")
t = x.elem("table", style="margin-top:1px;")
t.elem(
"caption",
text=f"<larch.{self.__class__.__name__}>",
style="caption-side:top;text-align:left;font-family:Roboto;font-weight:700;font-style:normal;font-size:100%;padding:0px;",
)
if len(self):
tr = t.elem("tr")
tr.elem("th", text="alt")
tr.elem("th", text="formula")
for k, v in self._map.items():
tr = t.elem("tr")
tr.elem("td", text=str(k))
try:
v_ = v.__xml__()
except AttributeError:
tr.elem("td", text=str(v), style="text-align:left;")
else:
tr.elem("td", style="text-align:left;") << v_
else:
tr = t.elem("tr")
tr.elem("td", text="<empty>")
return x
def _repr_html_(self):
return self.__xml__().tostring()
P = Ref_Gen(ParameterRef)
X = Ref_Gen(DataRef)