-
Notifications
You must be signed in to change notification settings - Fork 46
Expand file tree
/
Copy pathpyPlcWorker.py
More file actions
136 lines (127 loc) · 5.74 KB
/
pyPlcWorker.py
File metadata and controls
136 lines (127 loc) · 5.74 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
# Worker for the pyPLC
#
# Tested on
# - Windows10 with python 3.9 and
# - Raspbian with python 3.9
#
#------------------------------------------------------------
import pyPlcHomeplug
import fsmEvse
import fsmPev
from pyPlcModes import *
import addressManager
import time
import subprocess
import hardwareInterface
import connMgr
class pyPlcWorker():
    """Top-level worker of pyPLC.

    Owns the address manager, the connection manager, the homeplug layer,
    the hardware interface and, depending on the mode, the EVSE or the PEV
    state machine. The owner is expected to call mainfunction() cyclically
    and may forward key presses via handleUserAction().
    """

    def __init__(self, callbackAddToTrace=None, callbackShowStatus=None, mode=C_EVSE_MODE, isSimulationMode=0, callbackSoC=None):
        """Create the worker and all lower-level components.

        callbackAddToTrace: optional callable(s), receives each log line.
        callbackShowStatus: optional callable(s, selection), receives status texts.
        mode:               one of the C_*_MODE constants from pyPlcModes.
        isSimulationMode:   forwarded unchanged to pyPlcHomeplug.
        callbackSoC:        forwarded unchanged to fsmEvse.
        """
        print("initializing pyPlcWorker")
        self.nMainFunctionCalls = 0
        self.mode = mode
        self.strUserAction = ""
        self.addressManager = addressManager.addressManager()
        self.callbackAddToTrace = callbackAddToTrace
        self.callbackShowStatus = callbackShowStatus
        self.callbackSoC = callbackSoC
        self.oldAvlnStatus = 0
        self.isSimulationMode = isSimulationMode
        self.connMgr = connMgr.connMgr(self.workerAddToTrace, self.showStatus)
        self.hp = pyPlcHomeplug.pyPlcHomeplug(self.workerAddToTrace, self.showStatus, self.mode, self.addressManager, self.connMgr, self.isSimulationMode)
        self.hardwareInterface = hardwareInterface.hardwareInterface(self.workerAddToTrace, self.showStatus, self.hp)
        self.hp.printToUdp("pyPlcWorker init")
        # Find out the version number, using git.
        # see https://stackoverflow.com/questions/14989858/get-the-current-git-hash-in-a-python-script
        try:
            strLabel = str(subprocess.check_output(["git", "describe", "--tags"], text=True).strip())
        except (OSError, subprocess.SubprocessError, UnicodeDecodeError):
            # git missing, not a git checkout, no tags, or undecodable output.
            strLabel = "(unknown version. 'git describe --tags' failed.)"
        self.workerAddToTrace("[pyPlcWorker] Software version " + strLabel)
        if (self.mode == C_EVSE_MODE):
            self.evse = self._createEvse()
        if (self.mode == C_PEV_MODE):
            self.pev = self._createPev()

    def __del__(self):
        """Drop the PEV state machine when the worker is destroyed in PEV mode."""
        # getattr: __del__ also runs if __init__ failed before self.mode was set.
        if (getattr(self, "mode", None) == C_PEV_MODE):
            try:
                del self.pev
            except AttributeError:
                pass # no pev was created, nothing to clean up

    def _createEvse(self):
        # Single place that knows how an fsmEvse is constructed, so that
        # __init__ and handleUserAction cannot drift apart.
        return fsmEvse.fsmEvse(self.addressManager, self.workerAddToTrace, self.hardwareInterface, self.showStatus, self.callbackSoC)

    def _createPev(self):
        # Single place that knows how an fsmPev is constructed, so that
        # __init__ and handleUserAction cannot drift apart.
        return fsmPev.fsmPev(self.addressManager, self.connMgr, self.workerAddToTrace, self.hardwareInterface, self.showStatus)

    def workerAddToTrace(self, s):
        """Log the string s to the upper-level callback and to UDP."""
        # The central logging function. All logging messages from the different parts of the project
        # shall come here.
        #print("workerAddToTrace " + s)
        if (self.callbackAddToTrace is not None):
            self.callbackAddToTrace(s) # give the message to the upper level, eg for console log.
        # This method is handed to connMgr and pyPlcHomeplug before self.hp is
        # assigned in __init__, so tolerate calls that arrive that early.
        hp = getattr(self, "hp", None)
        if (hp is not None):
            hp.printToUdp(s) # give the message to the udp for remote logging.

    def showStatus(self, s, selection = "", strAuxInfo1="", strAuxInfo2=""):
        """Forward a status text to the upper-level callback and the hardware interface.

        s:           the status text.
        selection:   names the status field; "pevState" is additionally shown on the display.
        strAuxInfo1, strAuxInfo2: extra texts, passed through to the hardware interface.
        """
        if (self.callbackShowStatus is not None):
            self.callbackShowStatus(s, selection)
        if (selection == "pevState"):
            self.hardwareInterface.showOnDisplay(s, strAuxInfo1, strAuxInfo2)
        # NOTE(review): indentation was lost in the reviewed copy; the visualization
        # is assumed to run for every selection, not only for "pevState" - confirm.
        try:
            self.hardwareInterface.visualizeStatus(s, selection, strAuxInfo1, strAuxInfo2)
        except Exception:
            # Deliberately best-effort: a failing visualization must not
            # disturb the caller. KeyboardInterrupt/SystemExit still propagate.
            pass

    def handleTcpConnectionTrigger(self):
        """Re-initialize the PEV state machine once when the connection level reaches 50."""
        if (self.mode == C_PEV_MODE) and (self.connMgr.getConnectionLevel()==50) and (self.oldAvlnStatus==0):
            self.workerAddToTrace("[PLCWORKER] Network is established, ready for TCP.")
            self.oldAvlnStatus = 1 # remember that we already triggered for this connection
            self.pev.reInit()
            return
        if (self.connMgr.getConnectionLevel()<50):
            self.oldAvlnStatus = 0 # connection lost, re-arm the trigger

    def mainfunction(self):
        """Cyclic entry point: run the lower-level workers and the active state machine."""
        self.nMainFunctionCalls+=1
        #self.showStatus("pyPlcWorker loop " + str(self.nMainFunctionCalls))
        if (self.mode == C_PEV_MODE):
            self.connMgr.mainfunction()
        # NOTE(review): indentation was lost in the reviewed copy; the trigger is
        # called in every mode here because it checks for PEV mode itself - confirm.
        self.handleTcpConnectionTrigger()
        self.hp.mainfunction() # call the lower-level workers
        self.hardwareInterface.mainfunction()
        if (self.mode == C_EVSE_MODE):
            if (self.nMainFunctionCalls>8*33): # ugly. Wait with EVSE high level handling, until the modem restarted.
                self.evse.mainfunction() # call the evse state machine
        if (self.mode == C_PEV_MODE):
            self.pev.mainfunction() # call the pev state machine

    def handleUserAction(self, strAction):
        """React on a user key press.

        strAction: "P" switches to PEV mode, "E" to EVSE mode, "L" to listen
                   mode, "space" stops the charge process. Every action,
                   including unknown ones, is finally passed to hp.sendTestFrame().
        """
        self.strUserAction = strAction
        print("user action " + strAction)
        if (strAction == "P"):
            print("switching to PEV mode")
            self.mode = C_PEV_MODE
            if (hasattr(self, 'evse')):
                print("deleting evse")
                del self.evse
            self.hp.enterPevMode()
            if (not hasattr(self, 'pev')):
                print("creating pev")
                # Same construction as in __init__ (the argument list used here before did not match it).
                self.pev = self._createPev()
            # NOTE(review): assumed to run also for an already existing pev - confirm.
            self.pev.reInit()
        elif (strAction == "E"):
            print("switching to EVSE mode")
            self.mode = C_EVSE_MODE
            if (hasattr(self, 'pev')):
                print("deleting pev")
                del self.pev
            self.hp.enterEvseMode()
            if (not hasattr(self, 'evse')):
                print("creating fsmEvse")
                # Same construction as in __init__ (the argument list used here before did not match it).
                self.evse = self._createEvse()
            # NOTE(review): assumed to run also for an already existing evse - confirm.
            self.evse.reInit()
        elif (strAction == "L"):
            print("switching to LISTEN mode")
            self.mode = C_LISTEN_MODE
            self.hp.enterListenMode()
            if (hasattr(self, 'evse')):
                print("deleting evse")
                del self.evse
            if (hasattr(self, 'pev')):
                print("deleting pev")
                del self.pev
        elif (strAction == "space"):
            print("stopping the charge process")
            if (hasattr(self, 'pev')):
                self.pev.stopCharging()
            if (hasattr(self, 'evse')):
                self.evse.stopCharging()
        # self.addToTrace("UserAction " + strAction)
        self.hp.sendTestFrame(strAction)