## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ##
# SIMULATOR.py --- 'Plain' DEVS Model Simulator
# --------------------------------
# Copyright (c) 2000
# Jean-Sébastien BOLDUC
# Hans Vangheluwe
# McGill University (Montréal)
# --------------------------------
# Version 1.0 last modified: 01/11/01
## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ##
#
# GENERAL NOTES AND REMARKS:
#
# Atomic- and coupled-DEVS specifications adapted from:
# B.P. Zeigler, ''Theory of modeling and simulation: Integrating
# Discrete Event and Continuous Complex Dynamic Systems'',
# Academic Press, 2000
#
# Description of message passing mechanism (to be completed)
#
## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ##

## GLOBAL VARIABLES AND FUNCTIONS
## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ##

# Necessary to send COPIES of messages.
from copy import deepcopy

# The {\tt __VERBOSE__} switch enables ''verbose mode'' when set to true.
# Useful for debugging (similar to the {\tt #define _DEBUG_} convention in
# {\tt C++}).
__VERBOSE__ = 1

def Error(message = '', esc = 1):
    """Error-handling function: reports an error and exits interpreter if
    {\tt esc} evaluates to true.

    To be replaced later by exception-handling mechanism.
    """
    from sys import exit, stderr
    stderr.write("ERROR: %s\n" % message)
    if esc:
        exit(1)

## SIMULATOR CLASSES
## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ##

class AtomicSolver:
    """Simulator for atomic-DEVS.

    Atomic-DEVS can receive three types of messages: $(i,\,t)$, $(x,\,t)$ and
    $(*,\,t)$. The last is the only one that triggers another message-sending,
    namely, $(y,\,t)$ is sent back to the parent coupled-DEVS. This is
    actually implemented as a return value (to be completed).
    """

    ###
    def receive(self, aDEVS, msg):

        # For any received message, the time {\tt t} (time at which the message
        # is sent) is the second item in the list {\tt msg}.
        t = msg[1]

        # $(i,\,t)$ message --- sets origin of time at {\tt t}:
        if msg[0] == 0:
            aDEVS.timeLast = t - aDEVS.elapsed
            aDEVS.timeNext = aDEVS.timeLast + aDEVS.timeAdvance()

        # $(*,\,t)$ message --- triggers internal transition and returns
        # $(y,\,t)$ message for parent coupled-DEVS:
        elif msg[0] == 1:
            if t != aDEVS.timeNext:
                Error("Bad synchronization...", 1)

            # First call the output function, which (amongst other things)
            # rebuilds the output dictionary {\tt myOutput}:
            aDEVS.myOutput = {}
            aDEVS.outputFnc()

            # Call the internal transition function to update the DEVS' state,
            # and update time variables:
            aDEVS.state = aDEVS.intTransition()
            aDEVS.timeLast = t
            aDEVS.timeNext = aDEVS.timeLast + aDEVS.timeAdvance()
            aDEVS.elapsed = 0.

            if __VERBOSE__:
                print "\n\tINTERNAL TRANSITION: %s" % (aDEVS.myID)
                print "\t New State: %s" % (aDEVS.state)
                print "\t Output Port Configuration:"
                for I in range(0, len(aDEVS.OPorts) ):
                    if aDEVS.OPorts[I] in aDEVS.myOutput.keys():
                        print "\t port%d: %s at %s" % (I, aDEVS.myOutput[aDEVS.OPorts[I]], \
                            repr(aDEVS.myOutput[aDEVS.OPorts[I]]))
                    else:
                        print "\t port%d: None" % (I)
                print "\t Next scheduled internal transition at time %f" % aDEVS.timeNext

            # Return the DEVS' output to the parent coupled-DEVS (rather than
            # sending $(y,\,t)$ message).
            return aDEVS.myOutput

        # $(x,\,t)$ message --- triggers external transition, where $x$ is the
        # input dictionary to the DEVS:
        elif type(msg[0]) == type({}):
            if not(aDEVS.timeLast <= t <= aDEVS.timeNext):
                Error("Bad synchronization...", 1)

            aDEVS.myInput = msg[0]

            # Update elapsed time. This is necessary for the call to the
            # external transition function, which is used to update the DEVS'
            # state.
            aDEVS.elapsed = t - aDEVS.timeLast
            aDEVS.state = aDEVS.extTransition()

            # Update time variables:
            aDEVS.timeLast = t
            aDEVS.timeNext = aDEVS.timeLast + aDEVS.timeAdvance()
            aDEVS.elapsed = 0.

            if __VERBOSE__:
                print "\n\tEXTERNAL TRANSITION: %s" % (aDEVS.myID)
                print "\t Input Port Configuration:"
                for I in range(0, len(aDEVS.IPorts) ):
                    print "\t port%d: %s at %s" % (I, aDEVS.peek(aDEVS.IPorts[I]),\
                        repr(aDEVS.peek(aDEVS.IPorts[I])))
                print "\t New State: %s" % (aDEVS.state)
                print "\t Next scheduled internal transition at time %f" % aDEVS.timeNext

        else:
            Error("Unrecognized message", 1)

# =================================================================== #

class CoupledSolver:
    """Simulator (coordinator) for coupled-DEVS.

    Coupled-DEVS can receive the same three types of messages as atomic-DEVS,
    plus the $(y,\,t)$ message. The latter is implemented as a returned value
    rather than a message. This is possible because the $(y,\,t)$ message is
    always sent to a coupled-DEVS in response to its sending a $(*,\,t)$
    message. (This implementation makes it possible to easily distinguish
    $(y,\,t)$ from $(x,\,t)$ messages... to be completed)

    Description of {\tt eventList} and {\tt dStar} (to be completed)
    """

    ###
    def receive(self, cDEVS, msg):

        # For any received message, the time {\tt t} (time at which the message
        # is sent) is the second item in the list {\tt msg}.
        t = msg[1]

        # $(i,\,t)$ message --- sets origin of time at {\tt t}:
        if msg[0] == 0:
            # Rebuild event-list and update time variables, by sending the
            # initialization message to all the children of the coupled-DEVS.
            # Note that the event-list is not sorted here, but only when the
            # list of {\sl imminent children\/} is needed. Also note that the
            # large constant 100000 stands in for $+\infty$ as the starting
            # value of the running minimum {\tt timeNext}.
            cDEVS.timeLast = 0.
            cDEVS.timeNext = 100000
            cDEVS.eventList = []
            for d in cDEVS.componentSet:
                self.send(d, msg)
                cDEVS.eventList.append( [d.timeNext, d] )
                cDEVS.timeNext = min(cDEVS.timeNext, d.timeNext)
                cDEVS.timeLast = max(cDEVS.timeLast, d.timeLast)

        # $(*,\,t)$ message --- triggers internal transition and returns
        # $(y,\,t)$ message for parent coupled-DEVS:
        elif msg[0] == 1:
            if t != cDEVS.timeNext:
                Error("Bad synchronization...", 1)

            # Build the list {\tt immChildren} of {\sl imminent children\/}
            # based on the sorted event-list, and select the active child
            # {\tt dStar}. If there is more than one imminent child, the
            # coupled-DEVS {\tt select} function is used to decide the active
            # child.
            cDEVS.eventList.sort()
            immChildren = []
            for I in cDEVS.eventList:
                if I[0] == t:
                    immChildren.append(I[1])
                else:
                    break
            if len(immChildren) == 1:
                dStar = immChildren[0]
            else:
                dStar = cDEVS.select(immChildren)
                if __VERBOSE__:
                    print "\n\tCollision occurred in %s, involving:" % (cDEVS.myID)
                    for I in immChildren:
                        print " \t %s" % (I.myID)
                    print "\t select chooses %s" % (dStar.myID)

            # Send $(*,\,t)$ message to the active child, which returns (or
            # sends back) message $(y,\,t)$. In the present implementation,
            # just the sub-DEVS output dictionary $y$ is returned and stored in
            # {\tt Y}:
            Y = self.send(dStar, msg)

            # Scan the list of sub-DEVS to find those influenced by (connected
            # to) the active child. To these sub-DEVS a message $(x,\,t)$ is
            # sent. The generation of the input dictionary {\tt X} relies on
            # ports' {\tt inLine} attributes rather than on coupled-DEVS'
            # {\tt IC}. The coupled-DEVS time variables are also updated in
            # parallel. This piece of code, although functional, could be far
            # more elegant and efficient\dots
            cDEVS.timeLast = t
            cDEVS.timeNext = 100000
            cDEVS.eventList = []
            for d in cDEVS.componentSet:
                X = {}
                for p in d.IPorts:
                    for pi in p.inLine:
                        if pi in Y.keys():
                            X[p] = deepcopy(Y[pi]) # only if atomic DEVS?
                            break
                    # if p.inLine in Y.keys():
                    #   X[p] = Y[p.inLine]
                if X != {}:
                    self.send(d, [X, t])
                cDEVS.eventList.append( [d.timeNext, d] )
                cDEVS.timeNext = min(cDEVS.timeNext, d.timeNext)

            # Use same pattern as above to update coupled-DEVS output
            # dictionary (once again using ports' {\tt inLine} attributes
            # rather than coupled-DEVS' {\tt EOC}). The DEVS' output is then
            # returned to its parent coupled-DEVS (rather than being sent).
            cDEVS.myOutput = {}
            for p in cDEVS.OPorts:
                for pi in p.inLine:
                    if pi in Y.keys():
                        cDEVS.myOutput[p] = Y[pi]
                        break
                # if p.inLine in Y.keys():
                #   cDEVS.myOutput[p] = Y[p.inLine]

            return cDEVS.myOutput

        # $(x,\,t)$ message --- triggers external transition, where $x$ is the
        # input dictionary to the DEVS:
        elif type(msg[0]) == type({}):
            if not(cDEVS.timeLast <= t <= cDEVS.timeNext):
                Error("Bad synchronization...", 1)

            cDEVS.myInput = msg[0]

            # Send $(x,\,t)$ messages to those sub-DEVS influenced by the
            # coupled-DEVS input ports (ref. {\tt EIC}). This is essentially
            # the same code as above that also updates the coupled-DEVS' time
            # variables in parallel.
            cDEVS.timeLast = t
            cDEVS.timeNext = 100000
            cDEVS.eventList = []
            for d in cDEVS.componentSet:
                X = {}
                for p in d.IPorts:
                    for pi in p.inLine:
                        if pi in cDEVS.myInput.keys():
                            X[p] = cDEVS.myInput[pi]
                            break
                    # if p.inLine in cDEVS.myInput.keys():
                    #   X[p] = cDEVS.myInput[p.inLine]
                if X != {}:
                    self.send(d, [X, t])
                cDEVS.eventList.append( [d.timeNext, d] )
                cDEVS.timeNext = min(cDEVS.timeNext, d.timeNext)

        else:
            Error("Unrecognized message", 1)

# =================================================================== #

class Simulator(AtomicSolver, CoupledSolver):
    """Associates a hierarchical DEVS model with the simulation engine.
    """

    ###
    def __init__(self, model):
        """Constructor.

        {\tt model} is an instance of a valid hierarchical DEVS model. The
        constructor stores a local reference to this model and augments it with
        {\sl time variables\/} required by the simulator.
        """
        self.model = model
        self.augment(self.model)

    ###
    def augment(self, d):
        """Recursively augment model {\tt d} with {\sl time variables\/}.
        """

        # {\tt timeLast} holds the simulation time when the last event occurred
        # within the given DEVS, and {\tt timeNext} holds the scheduled time
        # for the next event.
        d.timeLast = d.timeNext = 0.
        if d.type() == 'COUPLED':
            # {\tt eventList} is the list of pairs $(tn_d,\,d)$, where $d$ is a
            # reference to a sub-model of the coupled-DEVS and $tn_d$ is $d$'s
            # time of next event.
            d.eventList = []
            for subd in d.componentSet:
                self.augment(subd)

    ###
    def send(self, d, msg):
        """Dispatch messages to the right method.
        """
        if d.type() == 'COUPLED':
            return CoupledSolver.receive(self, d, msg)
        elif d.type() == 'ATOMIC':
            return AtomicSolver.receive(self, d, msg)

    ###
    def simulate(self, T=100):
        """Simulate the model (Root-Coordinator).

        In this implementation, the simulation stops only when the simulation
        time (stored in {\tt clock}) reaches {\tt T} (starts at 0). Other means
        to stop the simulation ({\it e.g.}, when an atomic-DEVS enters a given
        state, or when a global variable reaches a predefined value) are yet to
        be implemented.
        """

        # Initialize the model --- set the simulation clock to 0.
        self.send(self.model, (0, 0))
        clock = self.model.timeNext

        # Main loop repeatedly sends $(*,\,t)$ messages to the model's root
        # DEVS.
        while clock <= T:
            print "\n", "* " * 10, "CLOCK: %f" % clock, "\n"
            self.send(self.model, (1, clock))
            if __VERBOSE__:
                print "\n\tROOT DEVS' OUTPUT PORT CONFIGURATION:"
                for I in range(0, len(self.model.OPorts) ):
                    if self.model.OPorts[I] in self.model.myOutput.keys():
                        print "\t port%d: %s at %s" % (I, \
                            self.model.myOutput[self.model.OPorts[I]], \
                            repr(self.model.myOutput[self.model.OPorts[I]]))
                    else:
                        print "\t port%d: None" % (I)
            clock = self.model.timeNext