1313from fysom import Fysom
1414
1515from instana .log import logger
16- from instana .util import get_default_gateway
16+ from instana .util import get_default_gateway , get_lock
1717from instana .util .process_discovery import Discovery
1818from instana .util .runtime import is_windows
1919from instana .version import VERSION
@@ -26,14 +26,16 @@ class TheMachine:
2626 RETRY_PERIOD = 30
2727 THREAD_NAME = "Instana Machine"
2828
29- warnedPeriodic = False
30-
3129 def __init__ (self , agent : "HostAgent" ) -> None :
3230 logger .debug ("Initializing host agent state machine" )
3331
32+ self ._lock = get_lock ()
33+ self ._warned_periodic = False
34+
3435 self .agent = agent
3536 self .fsm = Fysom (
3637 {
38+ "initial" : "*" ,
3739 "events" : [
3840 ("lookup" , "*" , "found" ),
3941 ("announce" , "found" , "announced" ),
@@ -42,7 +44,7 @@ def __init__(self, agent: "HostAgent") -> None:
4244 ],
4345 "callbacks" : {
4446 # Can add the following to debug
45- # "onchangestate": self.print_state_change,
47+ # "onchangestate": self.print_state_change,
4648 "onlookup" : self .lookup_agent_host ,
4749 "onannounce" : self .announce_sensor ,
4850 "onpending" : self .on_ready ,
@@ -51,17 +53,33 @@ def __init__(self, agent: "HostAgent") -> None:
5153 }
5254 )
5355
54- self .timer = threading .Timer (1 , self .fsm .lookup )
55- self .timer .daemon = True
56- self .timer .name = self .THREAD_NAME
57- self .timer .start ()
56+ with self ._lock :
57+ self .timer = threading .Timer (1 , self ._safe_fsm_lookup )
58+ self .timer .daemon = True
59+ self .timer .name = self .THREAD_NAME
60+ self .timer .start ()
5861
5962 @staticmethod
6063 def print_state_change (e : Any ) -> None :
6164 logger .debug (
6265 f"========= ({ os .getpid ()} #{ threading .current_thread ().name } ) FSM event: { e .event } , src: { e .src } , dst: { e .dst } =========="
6366 )
6467
68+ def _safe_fsm_lookup (self ) -> None :
69+ """Thread-safe wrapper for FSM lookup."""
70+ with self ._lock :
71+ self .fsm .lookup ()
72+
73+ def _safe_fsm_announce (self ) -> None :
74+ """Thread-safe wrapper for FSM announce."""
75+ with self ._lock :
76+ self .fsm .announce ()
77+
78+ def _safe_fsm_pending (self ) -> None :
79+ """Thread-safe wrapper for FSM pending."""
80+ with self ._lock :
81+ self .fsm .pending ()
82+
6583 def reset (self ) -> None :
6684 """
6785 reset is called to start from scratch in a process. It may be called on first boot or
@@ -73,14 +91,15 @@ def reset(self) -> None:
7391 :return: void
7492 """
7593 logger .debug ("State machine being reset. Will start a new announce cycle." )
76- self .fsm .lookup ()
94+ with self ._lock :
95+ self .fsm .lookup ()
7796
7897 def lookup_agent_host (self , e : Any ) -> bool :
7998 host = self .agent .options .agent_host
8099 port = self .agent .options .agent_port
81100
82101 if self .agent .is_agent_listening (host , port ):
83- self .fsm . announce ()
102+ self ._safe_fsm_announce ()
84103 return True
85104
86105 if os .path .exists ("/proc/" ):
@@ -89,14 +108,15 @@ def lookup_agent_host(self, e: Any) -> bool:
89108 if self .agent .is_agent_listening (host , port ):
90109 self .agent .options .agent_host = host
91110 self .agent .options .agent_port = port
92- self .fsm . announce ()
111+ self ._safe_fsm_announce ()
93112 return True
94113
95- if self .warnedPeriodic is False :
96- logger .info (
97- "Instana Host Agent couldn't be found. Will retry periodically..."
98- )
99- self .warnedPeriodic = True
114+ with self ._lock :
115+ if self ._warned_periodic is False :
116+ logger .info (
117+ "Instana Host Agent couldn't be found. Will retry periodically..."
118+ )
119+ self ._warned_periodic = True
100120
101121 self .schedule_retry (
102122 self .lookup_agent_host , e , f"{ self .THREAD_NAME } : agent_lookup"
@@ -143,17 +163,18 @@ def announce_sensor(self, e: Any) -> bool:
143163 return False
144164
145165 self .agent .set_from (payload )
146- self .fsm . pending ()
166+ self ._safe_fsm_pending ()
147167 logger .debug (
148168 f"Announced PID: { pid } (true PID: { self .agent .announce_data .pid } ). Waiting for Agent Ready..."
149169 )
150170 return True
151171
152172 def schedule_retry (self , fun : Callable , e : Any , name : str ) -> None :
153- self .timer = threading .Timer (self .RETRY_PERIOD , fun , [e ])
154- self .timer .daemon = True
155- self .timer .name = name
156- self .timer .start ()
173+ with self ._lock :
174+ self .timer = threading .Timer (self .RETRY_PERIOD , fun , [e ])
175+ self .timer .daemon = True
176+ self .timer .name = name
177+ self .timer .start ()
157178
158179 def on_ready (self , _ : Any ) -> None :
159180 self .agent .start ()
0 commit comments