Source code for core.agent

from itertools import chain
from pathlib import Path

from numpy import prod
from sympy import Eq

from core.deserialiser import read_model_from_tex
from core.ecomod_utils import deriv_degree, spec_funcs, generate_symbols, span, eq2func, euler_mask, \
    transversality_mask, KKT_mask, latexify, add_subscript, is_substricted, is_spec_function, span_dict, gradient
from core.errors.RWErrors import TimeVariableNotFound, AnyPropertyNotFound, \
    DimensionCheckingFailed, ExtraVariableError, ObjectiveFunctionNotFound, NonSympyfiableError, NoSuchFlow
from core.logger import log
from core.market import Flow
from templates.pprint import AgentTemplateEngine, exec_tex
from core.sympyfier import ecomodify, ecomodify_
from core.utils import iterable_substract, timeit, set_equality, get_root_dir

from typing import Union, List, Dict

project_path = get_root_dir()


[docs]class AgentValidator(object): """ Validation class for Agent class. Provides Agent validation and correction to be processed by ECOMOD Core. Methods: Private: 1. __dimension_check Uses KV-storage gained from input Agent model with variables -> dimensions to proceed Dimension check in Agent equations and inequations (expressions). 2. __variable_completeness Checks if there are no extra initialized Agent variables in model, which are unused in model Expressions and if there are uninitialized variables in model. Open: 1. validate Provides full blackbox checks + emitent check Class variables: emitent = None: bool Shows that this Validation process is for Simple Agents -- with objective functional. """ emitent = None @staticmethod def __dimension_check(dim_dict: Union[List, Dict], exprs: List): """ Algorithm: 1. for e in exprs: 2. case when any Arg of Analytical Func then Arg -> SpecStack 3. e.subs(Arg, 1) 4. -> checker(e) 5. else 6. continue 7. for s in SpecStack: 8. -> checker(s) :param dim_dict: Union[List, Dict] -- KV-storage: variable -> dimension :param exprs: List[Expr, Eq, ...] -- List of Agent model expressions :return: None if no Errors else: Union[DimensionCheckingFailed, AnyError] """ from sympy import Symbol, Integral, Derivative, Eq from numpy.random import rand def checker(_expr, _casted_dict, _randomize_dict, _spec_stack): """ RDS Algorithm: 1. Generate random coefficients for each variable: Dict(rc). -> randomizeDict: variable[i] -> rc[i] * variable[i] 2. Replace Integrals and Differential to product and fraction respectively 3. Substitute expr variables with help of Dict(rc) :param _expr: [Expr, Eq, ...] -- Expression to be observed in dimension checking. :param _casted_dict: Dict -- additional internal representation :param _randomize_dict: Dict -- KV-storage variable -> rand_coef * variable. Special heuristic to proceed dimension check. :param _spec_stack: List[Expr] -- storage for Expr arguments inside Pure analytical functions. :return: bool -- True|False -- result of RDS (Randomized Dimension Substitution) """ # remove spec functions specs = _expr.find(lambda x: x.__class__ in spec_funcs()) _spec_stack.extend(specs) _expr = _expr.xreplace({s: 1 for s in specs}) # cast all to dummies _expr = _expr.replace(lambda x: x.is_Function or x.is_symbol, lambda x: Symbol(x.name)) # deal with Integrals and Derivatives _expr = _expr.replace(lambda x: isinstance(x, Integral), lambda x: x.args[0] * x.args[1][0]) _expr = _expr.replace(lambda x: isinstance(x, Derivative), lambda x: x.args[0] / x.args[1][0]) # subs # randomize all _expr = _expr.copy() _expr = _expr.subs(_randomize_dict) _expr = (_expr.lhs / _expr.rhs).subs(_casted_dict) return _expr.is_Number errors = [] spec_stack = [] random_coef = rand(len(dim_dict, )) casted_dict = {Symbol(kv[0].name): kv[1] * random_coef[i] for i, kv in enumerate(dim_dict.items())} randomize_dict = {Symbol(kv[0].name): Symbol(kv[0].name) * random_coef[i] for i, kv in enumerate(dim_dict.items())} for expr in exprs: expr_copy = expr.copy() if expr.rhs == 0: from sympy import exp spec_stack.append(exp(expr.lhs)) continue if not checker(expr, casted_dict, randomize_dict, spec_stack): errors.append(expr_copy) # spec stack check spec_stack = [Eq(ss.args[0], 1) for ss in spec_stack] for ss in spec_stack: if not checker(ss, casted_dict, randomize_dict, spec_stack): errors.append(ss) if errors: report_str = '\n'.join([i.__str__() for i in errors]) raise DimensionCheckingFailed(expr=report_str) @staticmethod def __variable_completeness(objectives, inequations, equations, functions, params): """ Simple variable existing check. ( .atoms method) :param objectives: Agent objectives functions :param inequations: Agent inequation boundaries :param equations: Agent equation boundaries :param functions: Agent phase variables, controls and exogenous constants (but those are functions of time) :param params: Agent not-functional variables :return: """ from sympy import Function, Number from sympy import Symbol # compatibility testing # func porting tests # ----------- UNCOMMENT ------------------------- fs_all_ = set(chain(*[eq.free_symbols.union([f.simplify() for f in eq.atoms(Function)]) for eq in equations + inequations + objectives])) test1 = iterable_substract(set([i.func for i in fs_all_ if i.func]), spec_funcs()) test2 = set([i.func for i in params + functions if i.func]) if not set_equality(test1, test2): print(test1, test2) raise NonSympyfiableError(err=f'{test1.__str__()} vs {test2.__str__()}') # args porting tests test1 = set( chain(*[i.args if prod([k.is_Function or k.is_symbol for k in i.args]) else i.atoms() for i in fs_all_])) test2 = set([i for i in params + functions]) numbersDOTtk = test1 - test2 # cicada meme if prod([issubclass(i.__class__, Number) for i in numbersDOTtk]) == 0: raise NonSympyfiableError(err=f'{test1.__str__()} vs {test2.__str__()}') # completeness completion_names = set(j.name for j in chain(*[i.atoms(Function).union(i.atoms(Symbol)) for i in fs_all_]) if not is_spec_function(j)) inited_names = set(j.name for j in functions + params) if completion_names != inited_names: raise ExtraVariableError(vars=completion_names - inited_names) # ---------------------UNCOMMENT-----------------
[docs] def validate(self, objectives, inequations, equations, functions, params, dim_dict): """ See __variable_completeness and __dimension_check docstrings. :param objectives: Agent objectives functions :param inequations: Agent inequation boundaries :param equations: Agent equation boundaries :param functions: Agent phase variables, controls and exogenous constants (but those are functions of time) :param params: Agent not-functional variables :param dim_dict: Union[List, Dict] -- KV-storage: variable -> dimension :return: Union[AnyError, NonSympyfiableError, ExtraVariableError, DimensionCheckingFailed] """ if self.emitent is False: if not objectives: raise ObjectiveFunctionNotFound() # STEP 1: check completeness self.__variable_completeness(objectives, inequations, equations, functions, params) # STEP 2: dimension check if dim_dict: self.__dimension_check(dim_dict, equations + inequations + objectives)
[docs]class AbstractAgent(AgentValidator): """ Core methods for Agent models. Methods: Constructors: 1. __init__: Init Agent from its parts: see method args. 2. read_from_tex: Parse Agent model from .tex file written in YAML (json-like) format. See examples at `/models/inputs/`. Private: 1. __generate_duals: Generate dual variables and functions due to Lagrange principum. Public: Additional: 1. Support system Methods that helps system to understand which variables are phase, which are controls etc. 1. time : extract time variable in agent model 2. time_horizon : extract time horizon boundaries from integral part of objective functions 3. transitions : extract first-order differential equations from all boundaries 4. phases : extract phase variables from all functions 5. external : extract exogenous constants from all functions 6. controls : extract control variables from all functions 7. variables : extract all model variable (not-functions) 8. expr : extract all expressions (with objectives) 9. boundaries : all equations and inequations (no differential equations) 10. constant_ineqs : inequations with no terminal values 11. diff_degree : Divide all expressions over differential equation degrees 12. validate : provide Agent model validation from subclass 2. Misc Basic class methods to provide comfort. 1. args : Args to re-init 2. kwargs : Kwargs to re-init 3. process : Pre-process model to be used in core methods 4. compress : Evaluate all Pontragin Principle conditions 5. dump : Process model to PDF file with optimal conditions (Pontryagin Principle) [compress + dump] 3. Core Methods that conduct Maximum Principle Conditions due to Lagrange principum. 1. Lagrangian : integral part of L 2. lagrangian : termination part of L 3. euler_equations : Euler-Lagrange equations 4. transversality_conditions : Transversality conditions 5. control_optimality : Control optimality conditions 6. KKT: Dual-feasibility and Complementary Slackness conditions """ emitent = False def __init__(self, name='', objectives=None, inequations=None, equations=None, functions=None, params=None, dim_dict=None): if dim_dict is None: dim_dict = {} if objectives is None: objectives = [] if inequations is None: inequations = [] if equations is None: equations = [] if functions is None: functions = [] if params is None: params = [] self.params = params self.functions = functions self.equations = equations self.inequations = inequations self.objectives = objectives self.dim_dict = dim_dict self.name = name # dual variables k:v where k = real phase or ineq constraint, v = dual self.lambdas = {} # for conjugate arbitrary constants self.duals = {} # for conjugate functions # linking part) self.links = [] self.processed = False @log(comment='Agent ready for analysis') def __generate_duals(self): """ Lambdas -- dual variables, conducted by count of Len(chain(objectives, boundaries, constant_ineqs)) Alphas -- dual functions, conducted by count of Len(chain(transitions, [inequations - constant_ineqs] + boundaries)) :return: self.lambdas -- List[Symbol] self.alphas -- List[Function] """ from sympy import Symbol, Function # step 1: create lambdas lambda_factor = self.objectives + self.boundaries + self.constant_ineqs lambdas_count = len(lambda_factor) self.lambdas = { (eq2func(lambda_factor[i]) if lambda_factor[i] not in self.objectives else lambda_factor[i].args[1]): v for i, v in enumerate(generate_symbols(tag='lambda', count=lambdas_count, cls=Symbol))} # step 2: create conjugates funcs (duals) alpha_factors = self.transitions + iterable_substract(self.inequations, self.constant_ineqs) + self.boundaries alpha_count = len(alpha_factors) self.duals = {eq2func(alpha_factors[i]): d(self.time) for i, d in enumerate(generate_symbols(tag='alpha', count=alpha_count, cls=Function))} @property def time(self): """ Extract time variable from: 1. Integral in objectives 2. Differential variable in boundaries :return: Union[Symbol, TimeVariableNotFound] """ # two ways: integration argument, under derivative from sympy import Integral # method 1 try: ret1 = self.objectives[0].find(Integral).pop().args[1][0] except: ret1 = None # method 2 try: from sympy import Derivative ret2 = [eq.find(Derivative).pop().args[1][0] for eq, deg in self.diff_degree(deg=1).items() if deg == 1][0] except: ret2 = None if not ret1 and not ret2: raise TimeVariableNotFound() return ret1 if ret1 is not None else ret2 @property def time_horizon(self): """ If time is extracted from objective integrals then collect integral boundaries. :return: Union[Tuple, AnyPropertyNotFound] """ from sympy import Integral # method 1 try: ret = self.objectives[0].find(Integral).pop().args[1][1:] return ret except: raise AnyPropertyNotFound(attr='Time Horizon') @property def transitions(self): """ Extract linear differential equations from model. :return: List[Eq] """ # linear Differential equations in model return [eq for eq in self.diff_degree(deg=1).keys()] @property def phases(self): """ Extract phase variables from model: functions which appears as arguments in Differential operators in transitions :return: Union[List[Symbol], AnyPropertyNotFound] """ from sympy import Derivative derivas = set(chain(*[d.find(Derivative) for d in [t for t in self.transitions]])) phases = [d.args[0] for d in derivas] indicator = prod([(p in self.functions) for p in phases]) if indicator != 1: missing_var = iterable_substract(phases, self.functions) missing_transition = iterable_substract(self.functions, phases) if missing_var: raise AnyPropertyNotFound(attr=missing_var) if missing_transition: raise AnyPropertyNotFound(attr=missing_transition) return phases @property def external(self): """ Extract exogenous functions, they must be substricted :return: List[Symbol] """ return [ext for ext in self.functions if (is_substricted(ext) and not is_substricted(ext, tag=self.name))] @property def controls(self): """ Extract all controls: controls = functions - external - phases :return: List[Symbol] """ return iterable_substract(self.functions, self.phases + self.external) @property def expr(self): """ All expressions in model :return: List[Expr] """ return self.equations + self.inequations + self.objectives @property def variables(self): """ All symbols in model :return: List[Symbol] """ return self.functions + self.params @property def boundaries(self): """ Expressions with these types: 1. Differential degree == 0 2. Constant inequalities :return: List[Expr] """ # Equality types (deg=0) and constant ineqs return [*self.diff_degree(deg=0).keys()] @property def constant_ineqs(self): """ Constant inequalities: those which contains only functions in terminant (constant) values. :return: """ return [i for i in self.inequations if self.time not in i.free_symbols] @property def Lagrangian(self): """ Intergal part of L :return: Expr """ from sympy import Integral return span_dict(self.duals) + span_dict( {k: v for k, v in self.lambdas.items() if k in [i.args[1] for i in self.objectives]}).replace( lambda x: isinstance(x, Integral), lambda x: x.args[0]) @property def lagrangian(self): """ Terminant part of L :return: Expr """ return span_dict({k: v for k, v in self.lambdas.items() if k not in [i.args[1] for i in self.objectives]}) @property def Hamiltonian_Gamkrelidze(self): """not tested yet""" from sympy import Symbol mu = Symbol('\mu') # return span_dict(self.duals) - mu * gradient(self.inequations) raise NotImplementedError('Not implemented yet') @property def kwargs(self): """ Kwargs, to re-init :return: """ return { 'name': self.name, 'objectives': self.objectives, 'inequations': self.inequations, 'equations': self.equations, 'functions': self.functions, 'params': self.params, 'dim_dict': self.dim_dict } @property def args(self): """ Args, to re-init :return: """ return self.objectives, self.inequations, self.equations, self.functions, self.params, self.dim_dict # ECOMOD CORE SOFT
[docs] def euler_equations(self): """ Euler Equations: d/dt L_{x'} = L_{x} :return: List[Eq] """ ret = [] for x in self.phases: ret.append(euler_mask(self.Lagrangian, x, self.time)) return ret
[docs] def transversality_conditions(self): """ Transversality conditions: L_{x'}(t_i) = (-1)^i l_{x(t_i)} :return: List[Eq] """ ret = [] for x in self.phases: # for all t_0, t_1 ret.extend(transversality_mask(self.Lagrangian, x, self.time, self.lagrangian, *self.time_horizon)) return ret
[docs] def control_optimality(self): """ Control optimality conditions, using smoothness of L function wrt control variables :return: List[Eq] """ from sympy import Eq return [Eq(self.Lagrangian.diff(c), 0) for c in self.controls]
[docs] def KKT(self): """ Dual feasibility and Complementary slackness conditions :return: List[Expr] """ _duals = {k: v for k, v in self.duals.items() if k not in [eq2func(e) for e in self.transitions]} _lambdas = {k: v for k, v in self.lambdas.items() if k not in [o.rhs for o in self.objectives]} return KKT_mask(_duals) + KKT_mask(_lambdas)
[docs] def control_boundaries(self): from sympy.core.numbers import Zero from sympy import Derivative return [i for i in self.equations + self.inequations if sum([issubclass(type(eq2func(i).diff(j)), Zero) for j in self.controls]) == 0 and not i.find(Derivative)]
[docs] def phase_boundaries(self): from sympy.core.numbers import Zero from sympy import Derivative return [i for i in self.equations + self.inequations if sum([issubclass(type(eq2func(i).diff(j)), Zero) for j in self.phases]) == 0 and not i.find(Derivative)]
[docs] def regularity_conditions(self): if len(self.phase_boundaries()) != 1: return AnyPropertyNotFound(attr='More than one phase boundary. Not supported.') return [Eq( span(gradient(self.phase_boundaries()[0], self.phases), [i.rhs for i in self.transitions]), 0 )]
[docs] def diff_degree(self, deg=None): """ Returns all expr with Derivative degree == deg. :param deg: int. Derivative degree of returning equations :return: Dict[Expr->deg] if None else Dict[Expr->`deg`] """ ret = {} for eq in self.equations: ret[eq] = deriv_degree(eq) # casting values to int ret = {k: int(v.__str__()) for k, v in ret.items()} if deg is not None: ret = [eq for eq, d in ret.items() if d == deg] return {k: deg for k in ret} return ret
[docs] @classmethod @log(comment='\nParsing new agent file.') def read_from_tex(cls, f): """ Contructor from .tex files. See Knowledge base to get guidelines. :param f: filename or fd :return: Agent """ from pathlib import Path name = Path(f).stem header, raw_model = read_model_from_tex(f) model = ecomodify_(raw_model) return cls(name, *model)
[docs] @log(comment='Agent ready for economic processing') def process(self, skip_validation=False): """ Process model to be used in core methods. :param skip_validation: bool -- If True validation process will be skipped :return: Union[None, AnyError] """ if not skip_validation: self.validate(*self.args) self.__generate_duals() self.processed = True
[docs] def compress(self, to_tex=False, headers=True): """ :param to_tex: bool. If True -- .tex file will be generated :param headers: If True -- .tex file will contain headers :return: Union[None, RenderedJinjaTemplate] """ ret = { "PHASES": latexify(self.phases, to_str=True), # because we render in one line "CONTROLS": latexify(self.controls, to_str=True), # same "INFOS": latexify(self.external, to_str=True), # same "EULERS": latexify(self.euler_equations()), "OPTIMAS": latexify(self.control_optimality()), "TRANSVERS": latexify(self.transversality_conditions()), "KKT": latexify(self.KKT()), # "GRC": latexify(self.regularity_conditions()), "DUALS": latexify([*self.duals.keys()], to_str=True), "DUALS_MAP": latexify([*self.duals.values()], to_str=True) } if not to_tex: return ret else: engine = AgentTemplateEngine() if headers: return engine.render(ret) return engine.render(ret).split(r'\begin{document}')[1].split(r'\end{document}')[0]
[docs] @log(comment='Dumping agent file') def dump(self, destination=None, is_absolute=False): """ Create .tex and PDF files with Agent optimal conditions. :param destination: filepath or fd. :return: PDF, .tex files saved in `destination` """ if not destination: destination = '.' engine = AgentTemplateEngine() engine.render(self.compress()) if is_absolute: tex_directorypath = Path(project_path) / destination / self.name else: tex_directorypath = Path(destination) / self.name tex_filepath = (tex_directorypath / self.name).with_suffix('.tex') # Due to bug with .absolute() method in Pathlib # tex_directorypath = Path(str(tex_directorypath.cwd()) + str(tex_directorypath)) # tex_filepath = Path(str(tex_filepath.cwd()) + str(tex_filepath)) engine.dump(tex_filepath) exec_tex(tex_filepath, tex_directorypath)
[docs]class LinkedAgent(AbstractAgent): """ Methods: Constructors: 1. __init__ Basic contructor 2. from_abstract Init LinkedAgent from Agent instance Private: 1. __merge_prepare Use Agent model output to gain tagged Agent model output. Public: 1. add_flow 2. delete_flow 3. print_flows """ emitent = False def __init__(self, *args): super().__init__(*args) self.flows = [] self.__merge_prepare() def __merge_prepare(self): """ Private method to tag all agent variables with agent name. All core methods are inherited from parent class. :return: """ # gaining tagged system merge_map = {symb: add_subscript(symb, self.name) for symb in self.phases + self.controls} merge_map_t0 = { f.subs(self.time, self.time_horizon[0]): add_subscript(f.subs(self.time, self.time_horizon[0]), self.name) for f in self.functions} merge_map_t1 = { f.subs(self.time, self.time_horizon[1]): add_subscript(f.subs(self.time, self.time_horizon[1]), self.name) for f in self.functions} merge_map = merge_map | merge_map_t0 | merge_map_t1 new_kwargs = {} for k, v in self.kwargs.items(): if k != 'name' and k != 'dim_dict': new_kwargs[k] = [expr.xreplace(merge_map) for expr in v] self.__dict__.update(new_kwargs)
[docs] def add_flow(self, flow: Flow): if flow.receiver != self and flow.producer != self: print('This flow do not affect this agent') else: self.flows.append(flow)
[docs] def delete_flow(self, flow: Flow): try: self.flows.remove(flow) except ValueError: raise NoSuchFlow(flow=flow.__str__(), agent=self.name)
[docs] def print_flows(self): return "\n".join([f'[{flow}]' for flow in self.flows])
[docs] @staticmethod def from_abstract(a: AbstractAgent): kwargs = a.kwargs return LinkedAgent(*kwargs.values())
[docs]def create_empty_agents(names, cls=AbstractAgent): """ Additional function to provide test cases in `scenarios/debug_scenarios.py` :param names: Agent tags separated by space ' '. :param cls: Default=AbstractAgent. Class for returned agents. :return: List[Union[AbstractAgent, LinkedAgent]] """ names = names.split(' ') return [cls(name) for name in names] if len(names) != 1 else cls(names)
if __name__ == "__main__": @timeit def main(): f = '../inputs/agent.tex' A = LinkedAgent.read_from_tex(f) A.process() B = LinkedAgent.from_abstract(create_empty_agents('B')) C = LinkedAgent.from_abstract(create_empty_agents('C')) A.add_flow(Flow(A, C, 3, 'rub')) B.add_flow(Flow(A, B, 6, 'tv')) print(A.__class__()) print(A.name) print(A.Lagrangian) print(A.lagrangian) print(A.euler_equations()) print(A.transversality_conditions()) print(A.control_optimality()) print(A.KKT()) print(A.print_flows()) print('Done')