# ====================================================================== # # FTPCS-maximum.py # Fall 2004 MZia SMustafiz # COMP 522 - Final Project # Fault tolerant pump control system # ====================================================================== # # Import code for model representation and simulation: from DEVS import * from Simulator import * # Import for a uniform random number generator from whrandom import * # ====================================================================== # ## ACTUAL METHANE READING GENERATOR ATOMIC DEVS # ====================================================================== # class actualRGeneratorState: def __init__(self,mode): self.mode = mode def __str__(self): return self.mode class actualRGenerator(AtomicDEVS): """ A class to generate the actual methane reading to be sent to the Methane Sensor coupled devs. An actual reading is sent every 2 seconds. """ actualMethReading = 0 actualCMReading = 0 actualAFthReading = 0 def __init__(self): AtomicDEVS.__init__(self) # initial state of the generator is IDLE. self.state = actualRGeneratorState(mode = 'IDLE') # one output port connected to the MethaneCDEVS self.amrOUT = self.addOutPort() def intTransition(self): if(self.state.mode == 'IDLE'): self.state.mode = 'GENERATING' actualRGenerator.actualMethReading = randint(0, 10) actualRGenerator.actualCOReading = randint(0, 10) actualRGenerator.actualAFReading = randint(0, 10) else: self.state.mode = 'IDLE' print("generator is in mode %s")%self.state.mode return self.state def ouputFnc(self): pass def timeAdvance(self): if(self.state.mode == 'IDLE'): return generation_time print("in idle mode, so next generation in 1.9") else: print("in gen mode, so go idle in 0 sec") return 0 # ====================================================================== # ## METHANE ATOMIC DEVS # ====================================================================== # class MethaneSensorState: """ A class to hold the state of the methane sensor. The sensor may either be READING the concentration of methane in the environment, or IDLE between readings. Readings must occur every 2 seconds. """ def __init__(self, mode): """ The methane sensor's initial state is IDLE """ self.mode = mode def __str__(self): return "\n mode = %s" % self.mode class MethaneSensor(AtomicDEVS): falseMethReading = 0 def __init__(self): # Always call parent class' constructor first: AtomicDEVS.__init__(self) # add one output port - to send the methane reading self.mrOUT = self.addOutPort() # set initial state of the methane sensor self.state = MethaneSensorState(mode = 'IDLE') def intTransition(self): if(self.state.mode == 'IDLE'): self.state.mode = 'READING' else: self.state.mode = 'IDLE' return self.state def outputFnc(self): if(self.state.mode == 'IDLE'): MethaneSensor.falseMethReading = randint (0,10) # the sensor has a 10% probability of failing noisy, #that is of providing an incorrect reading of the environment. decision = uniform (0,99) if(decision < 10.0): NFTRELIABILITY.append('F') self.poke(self.mrOUT, MethaneSensor.falseMethReading) if((MethaneSensor.falseMethReading < 7) and (actualRGenerator.actualMethReading >=7)): SAFETY.append('F') else: SAFETY.append('-') SAFTIME.append(S.model.timeNext -1) else: NFTRELIABILITY.append('-') self.poke(self.mrOUT, actualRGenerator.actualMethReading) SAFETY.append('-') SAFTIME.append(S.model.timeNext -1) def timeAdvance(self): if(self.state.mode == 'IDLE'): return methReadingTime else: return 0 # ====================================================================== # ## METHANE VOTER ATOMIC DEVS # ====================================================================== # class MethaneVoterState: """ A class to hold the state of the methane sensor voter. The sensor may either be VOTING on the concentration of methane in the environment, or IDLE between reception of readings. """ def __init__(self, mode): """ The methane sensor voter's initial state is IDLE """ self.mode = mode def __str__(self): return "\n mode = %s" % self.mode class MethaneVoter(AtomicDEVS): def __init__(self): # Always call parent class' constructor first: AtomicDEVS.__init__(self) # add an input port for each methane sensor reading received self.vIN1 = self.addInPort() self.vIN2 = self.addInPort() self.vIN3 = self.addInPort() # add one output port - to send the methane reading voted upon self.vOUT = self.addOutPort() # set initial state of the methane sensor self.state = 'IDLE' print("initial state of voter %s")%self.state # boolean to keep track of which sensors have sent their readings to the voter self.receivedR1 = False self.receivedR2 = False self.receivedR3 = False def intTransition(self): print("the inttran state is:") print self.state if(self.state == 'VOTING'): self.state = 'IDLE' print("voter is in IDLE state") return self.state def extTransition (self): print ("In external transition of methaneVoter") #received methane reading from sensor 1 mr1 = self.peek (self.vIN1) if(mr1 != None): self.mr1 = mr1 self.receivedR1 = True self.state = 'R1' print("in VOTER received reading 1") print("the reading is %d")%self.mr1 print self.state #received methane reading from sensor 2 mr2 = self.peek (self.vIN2) if(mr2 != None): self.mr2 = mr2 self.receivedR2 = True self.state = 'R2' print("in VOTER received reading 2") print("the reading is %d")%self.mr2 print self.state #received methane reading from sensor 3 mr3 = self.peek (self.vIN3) if(mr3 != None): self.mr3 = mr3 self.receivedR3 = True self.state = 'R3' print("in VOTER received reading 3") print("the reading is %d")%self.mr3 print self.state if((self.receivedR1 == True) and (self.receivedR2 == True) and (self.receivedR3 == True)): print("in voter, received all 3 reading") self.state = 'VOTING' print("so changed the state to %s")%self.state return self.state def outputFnc(self): if(self.state == 'VOTING'): print("i am going from voting to idle now...") print self.mr1 print self.mr2 print self.mr3 result = max (self.mr1, self.mr2, self.mr3) self.receivedR1 = False self.receivedR2 = False self.receivedR3 = False self.poke(self.vOUT, result) print("i voted on %s")%result actual_mr = actualRGenerator.actualMethReading if(result != actual_mr): RELIABILITY.append('F') RELTIME.append(S.model.timeNext -1) file2.write(str('F,')) file2.write(str(S.model.timeNext -1)) file2.write('\n') else: RELIABILITY.append('-') RELTIME.append(S.model.timeNext -1) file2.write(str('-,')) file2.write(str(S.model.timeNext -1)) file2.write('\n') if ((result < 7) and (actual_mr >=7)): VSAFETY.append('F') file1.write('F,') else: VSAFETY.append('-') file1.write('-,') VSAFTIME.append(S.model.timeNext-1) file1.write(str(S.model.timeNext -1)) file1.write('\n') def timeAdvance(self): print self.state if(self.state != 'VOTING'): print("voter in infinity") return INFINITY else: print("voter in zero") return voter_delay # ====================================================================== # ## METHANE COUPLED DEVS # ====================================================================== # class MethaneCDEVS(CoupledDEVS): """Methane Sensors Coupled-DEVS """ def __init__(self): # Always call parent class' constructor FIRST: CoupledDEVS.__init__(self) # PORTS: self.voteOUT = self.addOutPort() # SUB-MODELS: # Declaring three sub models of type MethaneSensor (for triple modular redundancy of sensors) and a voter sub model self.MS1 = self.addSubModel(MethaneSensor()) self.MS2 = self.addSubModel(MethaneSensor()) self.MS3 = self.addSubModel(MethaneSensor()) self.MV = self.addSubModel(MethaneVoter()) # COUPLINGS: self.connectPorts(self.MS1.mrOUT, self.MV.vIN1) # INTERNAL COUPLING self.connectPorts(self.MS2.mrOUT, self.MV.vIN2) # INTERNAL COUPLING self.connectPorts(self.MS3.mrOUT, self.MV.vIN3) # INTERNAL COUPLING self.connectPorts(self.MV.vOUT, self.voteOUT) # EXTERNAL OUTPUT COUPLING def select(self, immList): """Select function. Parameter 'imm' is a list of references to sub-models that have a scheduled internal transition occuring at the same time. """ return immList[-1] # ====================================================================== # ## AIRFLOW DEVS # ====================================================================== # class AirFlowSensorState: """ A class to hold the state of the airflow sensor. The sensor may either be READING the flow of air in the environment, or IDLE between readings. Readings must occur every 5 seconds. """ def __init__(self, mode): """ The airflow sensor's initial state is IDLE """ self.mode = mode def __str__(self): return "\n mode = %s" % self.mode class AirFlowSensor(AtomicDEVS): falseAFReading = 0 def __init__(self): AtomicDEVS.__init__(self) # add one output port - to send the airflow reading self.afOUT = self.addOutPort() # set initial state of the airflow sensor self.state = AirFlowSensorState(mode = 'IDLE') def intTransition(self): if(self.state.mode == 'IDLE'): self.state.mode = 'READING' else: self.state.mode = 'IDLE' return self.state def outputFnc(self): if(self.state.mode == 'IDLE'): #AirflowSensor.actualARReading = randint(0, 10) AirFlowSensor.falseAFReading = randint (0,10) # the sensor has a 12% probability of failing noisy, #that is of providing an incorrect reading of the environment. decision = uniform (0,99) if(decision < 12.0): self.poke(self.afOUT, AirFlowSensor.falseAFReading) else: self.poke(self.afOUT, actualRGenerator.actualAFReading) def timeAdvance(self): if(self.state.mode == 'IDLE'): return airflowReadingTime else: return 0 # ====================================================================== # ## AIRFLOW VOTER ATOMIC DEVS # ====================================================================== # class AirFlowVoterState: """ A class to hold the state of the airflow sensor voter. The sensor may either be VOTING on the level of air in the environment, or IDLE between reception of readings. """ def __init__(self, mode): """ The airflow sensor voter's initial state is IDLE """ self.mode = mode def __str__(self): return "\n mode = %s" % self.mode class AirFlowVoter(AtomicDEVS): def __init__(self): # Always call parent class' constructor first: AtomicDEVS.__init__(self) # add an input port for each airflow sensor reading received self.vIN1 = self.addInPort() self.vIN2 = self.addInPort() self.vIN3 = self.addInPort() # add one output port - to send the airflow reading voted upon self.vOUT = self.addOutPort() # set initial state of the airflow sensor self.state = 'IDLE' print("initial state of voter %s")%self.state # boolean to keep track of which sensors have sent their readings to the voter self.receivedR1 = False self.receivedR2 = False self.receivedR3 = False def intTransition(self): print("the inttran state is:") print self.state if(self.state == 'VOTING'): self.state = 'IDLE' print("voter is in IDLE state") return self.state def extTransition (self): print ("In external transition of airflowVoter") #received a.f. reading from sensor 1 af1 = self.peek (self.vIN1) if(af1 != None): self.af1 = af1 self.receivedR1 = True self.state = 'R1' print("in VOTER received reading 1") print("the reading is %d")%self.af1 print self.state #received a.f. reading from sensor 2 af2 = self.peek (self.vIN2) if(af2 != None): self.af2 = af2 self.receivedR2 = True self.state = 'R2' print("in VOTER received reading 2") print("the reading is %d")%self.af2 print self.state #received a.f. reading from sensor 3 af3 = self.peek (self.vIN3) if(af3 != None): self.af3 = af3 self.receivedR3 = True self.state = 'R3' print("in VOTER received reading 3") print("the reading is %d")%self.af3 print self.state if((self.receivedR1 == True) and (self.receivedR2 == True) and (self.receivedR3 == True)): print("in voter, received all 3 reading") self.state = 'VOTING' print("so changed the state to %s")%self.state return self.state def outputFnc(self): if(self.state == 'VOTING'): print("i am going from voting to idle now...") print self.af1 print self.af2 print self.af3 result = max (self.af1, self.af2, self.af3) self.receivedR1 = False self.receivedR2 = False self.receivedR3 = False self.poke(self.vOUT, result) print("i voted on %s")%result def timeAdvance(self): print self.state if(self.state != 'VOTING'): print("voter in infinity") return INFINITY else: print("voter in zero") return voter_delay # ====================================================================== # ## AIRFLOW COUPLED DEVS # ====================================================================== # class AirFlowCDEVS(CoupledDEVS): def __init__(self): # Always call parent class' constructor FIRST: CoupledDEVS.__init__(self) # PORTS: self.voteOUT = self.addOutPort() # SUB-MODELS: # Declaring three sub models of type AirFlowSensor (for triple modular redundancy of sensors) and a voter sub model self.AS1 = self.addSubModel(AirFlowSensor()) self.AS2 = self.addSubModel(AirFlowSensor()) self.AS3 = self.addSubModel(AirFlowSensor()) self.AV = self.addSubModel(AirFlowVoter()) # COUPLINGS: self.connectPorts(self.AS1.afOUT, self.AV.vIN1) # INTERNAL COUPLING self.connectPorts(self.AS2.afOUT, self.AV.vIN2) # INTERNAL COUPLING self.connectPorts(self.AS3.afOUT, self.AV.vIN3) # INTERNAL COUPLING self.connectPorts(self.AV.vOUT, self.voteOUT) # EXTERNAL OUTPUT COUPLING def select(self, immList): """Select function. Parameter 'imm' is a list of references to sub-models that have a scheduled internal transition occuring at the same time. """ return immList[-1] # ====================================================================== # ## CARBONMONOXIDE DEVS # ====================================================================== # class CarbonMonoxideSensorState: """ A class to hold the state of the carbon monoxide sensor. The sensor may either be READING the gaz level in the environment, or IDLE between readings. Readings must occur every 6 seconds. """ def __init__(self, mode): """ The carbon monoxide sensor's initial state is IDLE """ self.mode = mode def __str__(self): return "\n mode = %s" % self.mode class CarbonMonoxideSensor(AtomicDEVS): falseCMReading = 0 def __init__(self): AtomicDEVS.__init__(self) # add one output port - to send the carbon monoxide reading self.cmOUT = self.addOutPort() # set initial state of the carbon monoxide sensor self.state = CarbonMonoxideSensorState(mode = 'IDLE') def intTransition(self): if(self.state.mode == 'IDLE'): self.state.mode = 'READING' else: self.state.mode = 'IDLE' return self.state def outputFnc(self): if(self.state.mode == 'IDLE'): CarbonMonoxideSensor.falseCMReading = randint (0,10) # the sensor has a 9% probability of failing noisy, #that is of providing an incorrect reading of the environment. decision = uniform (0,99) if(decision < 9): self.poke(self.cmOUT, CarbonMonoxideSensor.falseCMReading) else: self.poke(self.cmOUT, actualRGenerator.actualCMReading) def timeAdvance(self): if(self.state.mode == 'IDLE'): return carbonReadingTime else: return 0 # ====================================================================== # ## CARBONMONOXIDE VOTER ATOMIC DEVS # ====================================================================== # class CarbonMonoxideVoterState: """ A class to hold the state of the carbon monoxide sensor voter. The sensor may either be VOTING on the level of air in the environment, or IDLE between reception of readings. """ def __init__(self, mode): """ The cm sensor voter's initial state is IDLE """ self.mode = mode def __str__(self): return "\n mode = %s" % self.mode class CarbonMonoxideVoter(AtomicDEVS): def __init__(self): # Always call parent class' constructor first: AtomicDEVS.__init__(self) # add an input port for each cm sensor reading received self.vIN1 = self.addInPort() self.vIN2 = self.addInPort() self.vIN3 = self.addInPort() # add one output port - to send the cm reading voted upon self.vOUT = self.addOutPort() # set initial state of the cm sensor self.state = 'IDLE' print("initial state of voter %s")%self.state # boolean to keep track of which sensors have sent their readings to the voter self.receivedR1 = False self.receivedR2 = False self.receivedR3 = False def intTransition(self): print("the inttran state is:") print self.state if(self.state == 'VOTING'): self.state = 'IDLE' print("voter is in IDLE state") return self.state def extTransition (self): print ("In external transition of airflowVoter") #received cm reading from sensor 1 cm1 = self.peek (self.vIN1) if(cm1 != None): self.cm1 = cm1 self.receivedR1 = True self.state = 'R1' print("in VOTER received reading 1") print("the reading is %d")%self.cm1 print self.state #received cm reading from sensor 2 cm2 = self.peek (self.vIN2) if(cm2 != None): self.cm2 = cm2 self.receivedR2 = True self.state = 'R2' print("in VOTER received reading 2") print("the reading is %d")%self.cm2 print self.state #received cm reading from sensor 3 cm3 = self.peek (self.vIN3) if(cm3 != None): self.cm3 = cm3 self.receivedR3 = True self.state = 'R3' print("in VOTER received reading 3") print("the reading is %d")%self.cm3 print self.state if((self.receivedR1 == True) and (self.receivedR2 == True) and (self.receivedR3 == True)): print("in voter, received all 3 reading") self.state = 'VOTING' print("so changed the state to %s")%self.state return self.state def outputFnc(self): if(self.state == 'VOTING'): print("i am going from voting to idle now...") print self.cm1 print self.cm2 print self.cm3 result = max (self.cm1, self.cm2, self.cm3) self.receivedR1 = False self.receivedR2 = False self.receivedR3 = False self.poke(self.vOUT, result) print("i voted on %s")%result def timeAdvance(self): print self.state if(self.state != 'VOTING'): print("voter in infinity") return INFINITY else: print("voter in zero") return voter_delay # ====================================================================== # ## CARBON MONOXIDE COUPLED DEVS # ====================================================================== # class CarbonMonoxideCDEVS(CoupledDEVS): """CarbonMonoxide Sensors Coupled-DEVS """ def __init__(self): # Always call parent class' constructor FIRST: CoupledDEVS.__init__(self) # PORTS: self.voteOUT = self.addOutPort() # SUB-MODELS: # Declaring three sub models of type CarbonMonoxideSensor (for triple modular redundancy of sensors) and a voter sub model self.CS1 = self.addSubModel(CarbonMonoxideSensor()) self.CS2 = self.addSubModel(CarbonMonoxideSensor()) self.CS3 = self.addSubModel(CarbonMonoxideSensor()) self.CV = self.addSubModel(CarbonMonoxideVoter()) # COUPLINGS: self.connectPorts(self.CS1.cmOUT, self.CV.vIN1) # INTERNAL COUPLING self.connectPorts(self.CS2.cmOUT, self.CV.vIN2) # INTERNAL COUPLING self.connectPorts(self.CS3.cmOUT, self.CV.vIN3) # INTERNAL COUPLING self.connectPorts(self.CV.vOUT, self.voteOUT) # EXTERNAL OUTPUT COUPLING def select(self, immList): """Select function. Parameter 'imm' is a list of references to sub-models that have a scheduled internal transition occuring at the same time. """ return immList[-1] # ====================================================================== # ## ENVMONITOR DEVS # ====================================================================== # class EnvMonitorState: """ A class to hold the state of an environment monitor. It may either be processing sensor readings ('PROCESSING'), responding to a query ('QUERYING') or doing nothing ('IDLE'). """ def __init__(self, mode): """ The monitor's initial state is IDLE. """ self.mode = mode def __str__(self): return self.mode class EnvMonitor(AtomicDEVS): """A class to simulate the behaviour of the environment monitor. It receives environmental readinds from the 3 env sensors: methane, ariflow, cardbon monoxide. It handles these readings by outputting the appropriate messages to the necessary controllers. """ def __init__(self): AtomicDEVS.__init__(self) # add 3 in ports self.methIN = self.addInPort() self.afIN = self.addInPort() self.cmIN = self.addInPort() self.q_recv = self.addInPort() # add 1 out port self.alarmOUT = self.addOutPort() self.q_sack = self.addOutPort() # set the state of the monitor self.state = EnvMonitorState(mode = 'IDLE') # variables that will keep track of whether the methane level # is CRT or NCRT (critical or not critical) for the sake of replying # to queries from the pump controller. self.methaneState = 'NCRT' # variable that will keep track of the message that needs to be output # when sensor readings are handled self.msgToBeSent = 'NONE' def intTransition(self): if((self.state.mode == 'PROCESSING') or (self.state.mode == 'QUERYING')): self.state.mode = 'IDLE' print("ENV: i am doing an internal transition, and i changed my state to idle") return self.state def extTransition(self): print("ENV: i am in external ") # received a methane reading methane_reading = self.peek(self.methIN) if(methane_reading != None): print("ENV: received none empty meth reading") if(methane_reading >= 7): # a methane level is critical when it reaches 7 or ges beyond self.methaneState = 'CRT' self.msgToBeSent = 'methane-critical' print("and changed my status to processing") self.state.mode = 'PROCESSING' # received an airflow reading airflow_reading = self.peek(self.afIN) if(airflow_reading != None): print("ENV: for af reading") if(airflow_reading <= 3): # an airflow level is critical when it is too low, 3 or below. self.msgToBeSent = 'airflow-critical' self.state.mode = 'PROCESSING' # received a carbon monoxide reading carbon_monoxide_reading = self.peek(self.cmIN) if(carbon_monoxide_reading != None): print("ENV: for cm reading") if(carbon_monoxide_reading >= 5): # a carbon monoxide reading is critical when it is 5 or above. self.msgToBeSent = 'carbon-critical' self.state.mode = 'PROCESSING' # received a methane query query = self.peek(self.q_recv) if(query != None): print("ENV: i received a query") self.state.mode = 'QUERYING' return self.state def outputFnc(self): if(self.state.mode == 'PROCESSING'): if(self.msgToBeSent != 'NONE'): # a message is only sent when there is a critical reading. self.poke(self.alarmOUT, self.msgToBeSent) self.msgToBeSent = 'NONE' print("ENV: sent methane critical alarm") else: pass elif(self.state.mode == 'QUERYING'): reply = QueryAck(self.methaneState) print self.methaneState self.poke(self.q_sack, reply) print("ENV: sent reply to query") print reply def timeAdvance(self): if((self.state.mode == 'PROCESSING') or (self.state.mode == 'QUERYING')): return monitor_delay else: return INFINITY # ====================================================================== # ## COMMUNICATION DEVS # ====================================================================== # class CommunicationState: """ A class to hold the state of a communication subsystem. It may either be sending alarms ('SEND-ALARM'), sending a query to the env monitor ('SEND-QUERY') or sending a query ack to the pump controller ('SEND-ACK'). When it completes either of these tasks, its state is 'IDLE'. """ def __init__(self, mode): """ The communication's initial state is IDLE. """ self.mode = mode def __str__(self): return self.mode class Communication(AtomicDEVS): """A class to simulate the communication channel between the env monitor, and the pump controller. """ def __init__(self): AtomicDEVS.__init__(self) print("COMM created") # add 3 in ports self.alarm_recv = self.addInPort() self.q_recv = self.addInPort() self.q_rack = self.addInPort() # add 3 out ports self.alarm_send_pc = self.addOutPort() self.alarm_send_hc = self.addOutPort() self.q_send = self.addOutPort() self.q_sack = self.addOutPort() # set initial state of the communication channel self.state = CommunicationState(mode = 'IDLE') # variable to ensure that a critical methane level will also be sent to the pump controller self.sendAlsoToPC = False # variables to keep track of the inputs to the communication subsystem # self.alarm = 'NONE' # self.query = Query() # self.ack = QueryAck('NONE') def intTransition(self): print("COMM: in int tr.") if((self.state.mode == 'SEND-ALARM') or (self.state.mode == 'SEND-QUERY') or (self.state.mode == 'SEND-ACK')): self.state.mode = 'IDLE' return self.state def extTransition(self): print("COMM: in extTransition") # received methane alarm self.alarm = self.peek(self.alarm_recv) if(self.alarm != None): print("COMM: received alarm") self.state.mode = 'SEND-ALARM' if(self.alarm == 'methane-critical'): self.sendAlsoToPC = True # received query from pump controller self.query = self.peek(self.q_recv) print("COMM: check if received query") if(self.query != None): print("COMM: received query") self.state.mode = 'SEND-QUERY' # received methane query ack self.ack = self.peek(self.q_rack) if(self.ack != None): print("COMM: received query response from env mon") self.state.mode = 'SEND-ACK' print("COMM: after ext tran, my state is") print self.state.mode return self.state def outputFnc(self): if(self.state.mode == 'SEND-ALARM'): self.poke(self.alarm_send_hc,self.alarm) print("COMM: sent alarm to hc ") if(self.sendAlsoToPC == True): self.poke(self.alarm_send_pc, self.alarm) self.sendAlsoToPc = False print("COMM: sent alarm to pc") elif(self.state.mode == 'SEND-QUERY'): print("COMM: sent query to env mon") self.poke(self.q_send, self.query) elif(self.state.mode == 'SEND-ACK'): print("COMM: sent query ack to pc") self.poke(self.q_sack, self.ack) def timeAdvance(self): print("COMM: in time ad") if((self.state.mode == 'SEND-ALARM') or (self.state.mode == 'SEND-QUERY') or (self.state.mode == 'SEND-ACK')): return communication_delay else: return INFINITY # ====================================================================== # ## HUMANCONTROLLER DEVS # ====================================================================== # class HumanController(AtomicDEVS): """A class to simulate the behaviour of a controller. In our implementation, the controller is a passive entity, and is stateless. It will receive external alarm events, but it will not respond to them. """ def __init__(self): AtomicDEVS.__init__(self) # add one input port self.alarmIN = self.addInPort() # There is only one passive state. self.state = 'PASSIVE' def extTransition(self): alarm = self.peek(self.alarmIN) return self.state def timeAdvance(Self): return INFINITY # ====================================================================== # ## WATER DEVS # ====================================================================== # class WaterState: """ A class to hold the state of the water level in the mine shaft. The level may either be HIGH or LOW. """ def __init__(self, level): """ The water's initial state is LOW """ self.level = level def __str__(self): return self.level class Water(AtomicDEVS): def __init__(self): AtomicDEVS.__init__(self) # add one output port - to send the water level to the PumpController self.wOUT = self.addOutPort() # set initial state of the water sensor self.state = WaterState(level = 'LOW') def getNextState(self): decision = randint(0,99) if(decision < 25.0): self.nextState = 'HIGH' else: self.nextState = 'LOW' return self.nextState def intTransition(self): self.state.level = self.nextState self.state.level = 'LOW' return self.state def outputFnc(self): nextState = self.getNextState() self.poke(self.wOUT, nextState) def timeAdvance(self): return waterReadingTime # ====================================================================== # ## PUMPCONTROLLER DEVS # ====================================================================== # class PumpControllerState: """ A class to hold the state of the pump controller. It may either be processing a water sensor reading and sending an operation to the pump ('PROCESSING-WATER'), processing a methane alarm ('PROCESSING-ALARM'), processing a query ack ('PROCESSING-ACK'), or doing nothing ('IDLE'). """ def __init__(self, mode): """ The pump controller's initial state is IDLE. """ self.mode = mode def __str__(self): return self.mode class PumpController(AtomicDEVS): """ A class to simulate the behaviour of the pump controller. It receives methane alarms from the EnvMonitor through the Communication channel. It handles these alarms bu outputting the apporciate operation to be performed by the Pump. """ def __init__(self): AtomicDEVS.__init__(self) # add 3 in ports self.meth_alarm = self.addInPort() self.wIN = self.addInPort() self.q_rack = self.addInPort() # add 2 out ports self.pump_op = self.addOutPort() self.q_send = self.addOutPort() # set initial state of the controller self.state = PumpControllerState(mode = 'IDLE') # the operation which needs to be sent to the pump self.pumpState = 'OFF' # boolean to keep track of whether there is a query to be sent or not. self.queryToBeSent = False def intTransition(self): if((self.state.mode == 'PROCESSING-ALARM') or (self.state.mode == 'PROCESSING-WATER') or (self.state.mode == 'PROCESSING-ACK')): self.state.mode = 'IDLE' return self.state def extTransition(self): print("external event occured in PUMP CONT.") # methane alarm received methAlarm = self.peek(self.meth_alarm) if(methAlarm != None): print("received methane alarm in PUMP CONT.") self.state.mode = 'PROCESSING-ALARM' self.pumpState = 'OFF' # water reading received from sensor waterReading = self.peek(self.wIN) if(waterReading != None): print("received water level in PUMP CONT.") self.state.mode = 'PROCESSING-WATER' if(waterReading == 'LOW'): self.pumpState = 'OFF' else: self.pumpState = 'READY' self.queryToBeSent = True # methane level acknoledgement received meth_ack = self.peek(self.q_rack) if(meth_ack != None): print("received methane ack in PUMP CONT.") self.state.mode = 'PROCESSING-ACK' if(meth_ack.query_result == 'CRT'): self.pumpState = 'OFF' else: self.pumpState = 'ON' return self.state def outputFnc(self): if(self.state.mode == 'PROCESSING-ALARM'): self.poke(self.pump_op, self.pumpState) elif(self.state.mode == 'PROCESSING-WATER'): self.poke(self.pump_op, self.pumpState) if(self.queryToBeSent == True): newQuery = Query() print("in pump cont. i am sending a query") self.poke(self.q_send, newQuery) self.queryToBeSent = False elif(self.state.mode == 'PROCESSING-ACK'): self.poke(self.pump_op, self.pumpState) def timeAdvance(self): if((self.state.mode == 'PROCESSING-ALARM') or (self.state.mode == 'PROCESSING-WATER') or (self.state.mode == 'PROCESSING-ACK')): return 0 else: return INFINITY # ====================================================================== # ## PUMP DEVS # ====================================================================== # class PumpState: """ A class to hold the state of a Pump. A pump can be in three modes: OFF, READY and RUNNING. """ def __init__(self, mode): """ The pump's initial state is OFF. """ self.mode = mode def __str__(self): return self.mode class Pump(AtomicDEVS): """ A class to represent the physical pump operation. """ def __init__(self): AtomicDEVS.__init__(self) # add one input port self.opIN = self.addInPort() # set initial state of the pump self.state = PumpState(mode = 'OFF') def intTransition(self): if (self.state.mode == 'OFF'): self.state.mode = 'READY' elif (self.state.mode == 'READY'): self.state.mode = 'ON' else: self.state.mode = 'OFF' return self.state def extTransition(self): newMode = self.peek(self.opIN) if(newMode != None): self.state.mode = newMode return self.state def outputFnc(self): pass def timeAdvance(self): return INFINITY # ====================================================================== # class Query: """ Query class has no attributes. """ def __init__(self): pass def __str__(self): return "(Querying)" # ====================================================================== # class QueryAck: """ QueryAck has one attribute indicates whether the methane level is critical or not. """ def __init__(self, methaneState): self.query_result = methaneState def __str__(self): return "Methane is %s" %(self.query_result) # ====================================================================== # class Root(CoupledDEVS): def __init__(self): """Constructor. """ # Always call parent class' constructor FIRST: CoupledDEVS.__init__(self) # create the real readings generator self.generator = self.addSubModel(actualRGenerator()) # Create the EnvSensors submodel self.methane_sensors = self.addSubModel(MethaneCDEVS()) self.airflow_sensors = self.addSubModel(AirFlowCDEVS()) self.carbon_monoxide_sensors = self.addSubModel(CarbonMonoxideCDEVS()) # Create the EnvMonitor submodel self.env_monitor = self.addSubModel(EnvMonitor()) # Create the Communication submodel self.communication = self.addSubModel(Communication()) # Create the HumanController submodel self.human_controller = self.addSubModel(HumanController()) # Create the Water submodel self.water = self.addSubModel(Water()) # Create the PumpController submodel self.pump_controller = self.addSubModel(PumpController()) # Create the Pump submodel self.pump = self.addSubModel(Pump()) # Connect the ports self.connectPorts(self.methane_sensors.voteOUT, self.env_monitor.methIN) self.connectPorts(self.airflow_sensors.voteOUT, self.env_monitor.afIN) self.connectPorts(self.carbon_monoxide_sensors.voteOUT, self.env_monitor.cmIN) self.connectPorts(self.env_monitor.alarmOUT, self.communication.alarm_recv) self.connectPorts(self.env_monitor.q_sack, self.communication.q_rack) self.connectPorts(self.communication.q_send, self.env_monitor.q_recv) self.connectPorts(self.communication.alarm_send_hc, self.human_controller.alarmIN) self.connectPorts(self.communication.alarm_send_pc, self.pump_controller.meth_alarm) self.connectPorts(self.communication.q_sack, self.pump_controller.q_rack) self.connectPorts(self.water.wOUT, self.pump_controller.wIN) self.connectPorts(self.pump_controller.q_send, self.communication.q_recv) self.connectPorts(self.pump_controller.pump_op, self.pump.opIN) # ====================================================================== # # non-functional requirements # we denote a failure of a requirements by an 'F', and a success by a '-' SAFETY = [] VSAFETY = [] SAFTIME = [] VSAFTIME = [] RELIABILITY = [] NFTRELIABILITY = [] RELTIME = [] file1 = open ('safety_ft_maxvoter.txt', 'w') file1.write ('\n*******SAFETY STATS generated by FTPCS.py********\n\n') file2 = open ('reliability_ft_maxvoter.txt', 'w') file2.write ('\n*******RELIABILITY STATS generated by FTPCS.py********\n\n') # time advance definitions INFINITY = 100000.0 methReadingTime = 2 # methane sensor must detect level of methane every 2 seconds. generation_time = 1.9 # generator of the actual methane reading occurs every 2 seconds airflowReadingTime = 5 # airflow sensor must detect flow of air every 5 seconds carbonReadingTime = 6 # cardonMonoxide sensor must detect level of gaz every 6 seconds. waterReadingTime = 10 # water sensor must detect level of water every 10 seconds monitor_delay = 0 # env monitor replies to a methane level query within this delay communication_delay = 1 # communication channel takes 1 second to send messages. voter_delay = 0 # each voter has a delay time of 0 sec # Simulate M = Root() S = Simulator(M) S.simulate(2000) print "\n*****Safety List (NO FT):******\n" print ("length of safety list = %d") %len(SAFETY) print SAFETY print SAFETY.count('F') print SAFTIME print "\n*****Safety List (WITH FT):******\n" print ("length of new safety list = %d") %len(VSAFETY) print VSAFETY vsc = VSAFETY.count('F') print vsc file1.write('\nTotal Readings: %d\n' %len(VSAFETY)) file1.write('\nFailure Readings: %d\n' %vsc) print VSAFTIME print "\n*****Reliability List (NO FT):******\n" print ("length of reliability list = %d") %len(NFTRELIABILITY) print NFTRELIABILITY.count('F') print NFTRELIABILITY print "\n*****Reliability List (WITH FT):******\n" print ("length of reliability list = %d") %len(RELIABILITY) rc = RELIABILITY.count('F') print rc print RELIABILITY file2.write('\nTotal Readings: %d\n' %len(RELIABILITY)) file2.write('\nFailure Readings: %d\n' %rc) file1.close() file2.close()