-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtprocessing.py
More file actions
90 lines (73 loc) · 3.59 KB
/
tprocessing.py
File metadata and controls
90 lines (73 loc) · 3.59 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
#
# Written by: Carlos Bazaga
#
# Licensed under BSD 2-Clause
#
from multiprocessing import *
from tmonitor import Tmonitor
from tmanager import Tmanager
from tpool import Tpool
from tspy import Tspy
import threading as th
class Tprocess(Process):
    """
    Extension class for "multiprocessing.Process" which adds a monitoring mechanism via "Tspy".

    Includes an internal "Tspy" for processes and threads monitoring.
    It's designed for identical use as "Process" so it can be replaced in already working code.
    Replaces the original "run" method by "_run_with_spy" at construction time, so the wrapper
    is what gets executed in the new process. This allows subclassing this class the same way
    as "multiprocessing.Process".
    Accepts a predefinition of its "daemonic" behaviour.

    Method naming convention:
        Methods named normally ("get_report") are the ones to be called from calling
        processes, "Users", and so they are executed in the caller's scope.
        Methods with leading underscores ("_run_with_spy", "__get_report") are intended for
        internal use only and are executed inside the "Tprocess" process itself.
    """

    def __init__(self, group=None, target=None, name=None, args=(), kwargs=None, daemon=None):
        """
        Return a new "Tprocess".

        "group", "target", "name", "args" and "kwargs" are forwarded to "Process.__init__".
        "kwargs" defaults to an empty dict ("None" is accepted and means "no keyword arguments").
        "daemon", when not "None", is assigned to the process "daemon" flag.
        """
        if kwargs is None:  # Avoid a mutable default argument shared between calls.
            kwargs = {}
        Process.__init__(self, group=group, target=target, name=name, args=args, kwargs=kwargs)
        if daemon is not None:  # Set the process "daemonic" behaviour.
            self.daemon = daemon
        self.tspy = self  # Make the process its own "Tspy". This way the "get_report" method can be accessed.
        # Communication channel:
        self._in_queue = Queue()  # Queue for "Tspy" requests. Only the "Tprocess" process reads, any "Users" writes.
        self._out_queue = Queue()  # Queue for "Tspy" answers. Only the "Tprocess" process writes, any "Users" reads.
        self._lock = Lock()  # "Tspy" requests synchronization lock.
        self._enabled = True  # Flag the "Tprocess" as ready to receive "Tspy" requests.
        # "_enabled" is a plain attribute, so a change made inside the "Tprocess" process is
        # not seen by the "Users". This shared event carries the "no longer serving" state
        # across the process boundary.
        self._finished = Event()
        # Replace original "run" method by the new "_run_with_spy"
        self._run_func = self.run
        self.run = self._run_with_spy

    def _run_with_spy(self):
        """
        Run the original "run" function with the "Tspy" service active.

        Create a real "Tspy" inside the process, accessible as the "_tspy" attribute.
        Set a service thread that waits for "Tspy" requests.
        Run the original "run" function.
        After execution (even if "run" raised) flag the process as finished and close the
        queues, avoiding requests to stay unanswered.
        """
        # Create the real "Tspy" first, so a request served right after the thread starts
        # always finds "_tspy" in place.
        self._tspy = Tspy()
        # Set a service thread that waits for "Tspy" signal.
        reporter = th.Thread(target=self.__get_report, name='Tspy')
        reporter.daemon = True  # Set the thread as "Daemon".
        reporter.start()  # Start the service.
        try:
            # Run the original "run" function.
            self._run_func()
        finally:
            # Waiting for the requests lock avoids "Tprocess" to die suddenly while a request is being served.
            with self._lock:
                self._enabled = False  # Stops the service loop inside this process.
                self._finished.set()  # Tells the "Users" that no more requests will be answered.
            # Close the queues.
            self._in_queue.close()
            self._out_queue.close()

    def get_report(self):
        """
        Request and return the "Tspy" report.

        Sends a request through the requests queue and blocks until the "Tprocess" process
        answers. If the "Tprocess" has already flagged itself as finished (or is disabled),
        returns the placeholder "((name, str(self), None), [], [])" without sending anything.

        NOTE: the call blocks until an answer arrives, so it will wait if the process has
        not been started, or was killed before it could flag itself as finished.
        """
        with self._lock:
            if self._enabled and not self._finished.is_set():
                self._in_queue.put('This message awakes "__get_report"')
                return self._out_queue.get()
            return ((self.name, str(self), None), [], [])

    def __get_report(self):
        """Wait for the signal to perform a "Spying" and answer it with the "Tspy" report."""
        while self._enabled:
            self._in_queue.get()  # Blocks until a "User" sends a request.
            self._out_queue.put(self._tspy.get_report())