forked from sonamkshenoy/YACS
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathworker.py
More file actions
170 lines (109 loc) · 4.12 KB
/
worker.py
File metadata and controls
170 lines (109 loc) · 4.12 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
import sys
import json
import socket
import threading
import time
import datetime
import logging
# ----------------------------------
# Shared settings, kept inline here (the `from allConfigs import *` line is
# disabled, as is CONFIGFILE = "config.json").

# Hosts
MASTER_IP = "localhost"
WORKER_IP = "localhost"

# Master ports
# Listens to requests from the request generator and schedules them to workers.
MASTER_SCHEDULING_PORT = 5000
# Listens to updates from workers and executes reduce tasks once done.
MASTER_UPDATE_PORT = 5001

# Key names
MAINKEYINCONFIG = "workers"  # presumably the top-level key of the config file
PORTNUMBER = "portNumber"  # key under which a worker reports its port to the master
# ----------------------------------
"""
Worker configs:
1. WORKER_PORT
2. WORKER_CONFIG
3. WORKER_ID
4. WORKER_IP
"""
# THREAD 1: LISTENS FOR TASKS TO EXECUTE (ACTS AS SERVER)
def listenToTasks():
    """Accept connections on (WORKER_IP, WORKER_PORT) and start a thread per task.

    Each task is a JSON object carrying ``"task_id"`` and ``"duration"``; it is
    handed to ``executeTaskAndUpdateMaster`` on a new thread.

    TCP is a byte stream, so a single ``recv`` may contain several JSON
    objects or only part of one. Received text is therefore buffered and
    complete objects are decoded off the front of the buffer.

    This function loops forever and does not return.
    """
    decoder = json.JSONDecoder()
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as workersocket:
        workersocket.bind((WORKER_IP, WORKER_PORT))
        workersocket.listen()
        while True:
            mastersocket, _address = workersocket.accept()
            # `with` closes the connection even if handling a message raises.
            with mastersocket:
                pending = ""
                while True:
                    data = mastersocket.recv(4096).decode()
                    if not data:
                        # Empty read: the peer closed the connection.
                        break
                    pending += data
                    # Decode every complete JSON object currently buffered.
                    while True:
                        # raw_decode does not skip leading whitespace itself.
                        pending = pending.lstrip()
                        if not pending:
                            break
                        try:
                            task, end = decoder.raw_decode(pending)
                        except json.JSONDecodeError:
                            # Object not complete yet; wait for more data.
                            break
                        pending = pending[end:]
                        try:
                            taskid = task["task_id"]
                            duration = task["duration"]
                        except (KeyError, TypeError):
                            # Keep the listener alive on a bad message.
                            logging.error(
                                "Ignoring malformed task message: %r", task
                            )
                            continue
                        # Add to execution pool
                        t = threading.Thread(
                            target=executeTaskAndUpdateMaster,
                            args=(taskid, duration),
                        )
                        t.start()
                if pending:
                    logging.warning(
                        "Connection closed with %d undecoded characters left",
                        len(pending),
                    )
# THREAD 2: EXECUTES TASKS AND UPDATES MASTER ABOUT THIS
def executeTaskAndUpdateMaster(task_id, durationOfTask):
    """Simulate a task by sleeping, then report its completion to the master.

    Args:
        task_id: Identifier of the task; echoed back to the master under the
            ``"taskid"`` key and used in the log lines.
        durationOfTask: Number of seconds the task should take. A falsy value
            (``0``, ``None``) means no waiting at all.

    Logs a ``[START]`` line, sleeps for the duration, connects to
    ``(MASTER_IP, MASTER_UPDATE_PORT)``, logs a ``[FINISH]`` line with the
    elapsed time in milliseconds and sends
    ``{PORTNUMBER: WORKER_PORT, "taskid": task_id}`` as JSON.
    A failure to reach the master is logged rather than raised.
    """
    logging.info("[START] Task-{0} Started Execution".format(task_id))
    starttime = datetime.datetime.now()

    # Sleep in steps of at most one second. Testing `> 0` (instead of
    # truthiness) ends the loop for fractional or negative durations too,
    # which would otherwise step past zero and never stop.
    remaining = durationOfTask or 0
    while remaining > 0:
        time.sleep(min(1, remaining))
        remaining -= 1

    # Once duration of task done, update master
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.connect((MASTER_IP, MASTER_UPDATE_PORT))
            duration = datetime.datetime.now() - starttime
            message = json.dumps({PORTNUMBER: WORKER_PORT, "taskid": task_id})
            logging.info("[FINISH] TASK {0} Finished execution. Total duration - {1:.3f}".format(task_id, duration.total_seconds()*1000))
            # sendall keeps writing until the whole message is sent;
            # plain send may stop after only part of it.
            s.sendall(message.encode())
    except OSError as err:
        logging.error(
            "Task-{0}: could not update master: {1}".format(task_id, err)
        )
    # The thread ends here, which frees the slot.
"""
# THREAD 3: SENDS HEARTBEATS ABOUT NUMBER OF FREE SLOTS TO MASTER
def sendHeartbeats():
while(True):
print("Sending...")
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.connect((MASTER_IP, MASTER_UPDATE_PORT))
message = json.dumps({"typeTask": HEARTBEAT, "numFreeSlots": (WORKER_PORT, freeSlotsNum)})
s.send(message.encode())
time.sleep(0.001)
"""
# MAIN FUNCTION
if __name__ == "__main__":
    # Needed only by the script entry point, hence the local import.
    import os

    # Worker details come from the command line:
    #   python worker.py <port no.> <worker_id> [debug]
    # Make sure the user passes both the port number and the worker id.
    if len(sys.argv) < 3:
        print("Usage: python worker.py <port no.> <worker_id>", file=sys.stderr)
        sys.exit(-1)

    WORKER_PORT = int(sys.argv[1])
    WORKER_ID = int(sys.argv[2])

    # FileHandler does not create missing directories; without this the
    # worker fails at start-up with FileNotFoundError when logs/ is absent.
    os.makedirs("logs", exist_ok=True)
    logging.basicConfig(
        level=logging.INFO,
        format="(%(asctime)s) %(message)s",
        handlers=[
            logging.FileHandler("logs/worker_{0}.log".format(WORKER_ID)),
        ]
    )

    # debug -> whether logs should also be printed to the terminal.
    # Defaults to on; passing the literal string 'False' as the third
    # argument turns it off.
    debug = True
    if len(sys.argv) == 4:
        debug = sys.argv[3]
        if debug == 'False':
            debug = False
    if debug:
        logging.getLogger().addHandler(logging.StreamHandler())

    # Once details are fetched, start the thread that listens for tasks;
    # it starts a further thread for each task it receives.
    try:
        t1 = threading.Thread(target=listenToTasks)
        # t2 = threading.Thread(target = sendHeartbeats)
        t1.start()
        # t2.start()
        # No join: the listener loops forever, so there is nothing to wait for.
    except Exception as e:
        print("Error in starting thread: ", e)