forked from sonamkshenoy/YACS
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathrequests.py
More file actions
75 lines (63 loc) · 2.21 KB
/
requests.py
File metadata and controls
75 lines (63 loc) · 2.21 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
import json
import socket
import time
import sys
import random
import numpy as np
# ----------------------------------
# from allConfigs import *
# General variables
# File and variable names
# CONFIGFILE = "config.json"
MAINKEYINCONFIG = "workers"
# IPs and Ports
MASTER_SCHEDULING_PORT = 5000 # Port that listens to requests from request generator and schedules them to workers
MASTER_UPDATE_PORT = 5001 # Port that listens to updates from workers and executes reduce tasks once done
MASTER_IP = "localhost"
WORKER_IP = "localhost"
# Variables
PORTNUMBER = "portNumber"
# ----------------------------------
def create_job_request(job_id):
number_of_map_tasks=random.randrange(1,5)
number_of_reduce_tasks=random.randrange(1,3)
job_request={"job_id":job_id,"map_tasks":[],"reduce_tasks":[]}
for i in range(0,number_of_map_tasks):
map_task={"task_id":job_id+"_M"+str(i),"duration":random.randrange(1,5)}
job_request["map_tasks"].append(map_task)
for i in range(0,number_of_reduce_tasks):
reduce_task={"task_id":job_id+"_R"+str(i),"duration":random.randrange(1,5)}
job_request["reduce_tasks"].append(reduce_task)
return job_request
def send_request(job_request):
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.connect((MASTER_IP, MASTER_SCHEDULING_PORT))
message=json.dumps(job_request)
#send task
# print(message)
s.send(message.encode())
if __name__ == '__main__':
if(len(sys.argv)!=2):
print("Usage: python requests.py <number_of_requests>")
exit()
#get number of requests to be generated
number_of_requests=int(sys.argv[1])
arrivals = np.random.exponential(1, size=number_of_requests-1)
request_number=0
#send first request
current_time=last_request_time=time.time() # time 0
job_request=create_job_request(str(request_number))
print("interval: ",0, 'Job ID' ,job_request['job_id'])
send_request(job_request)
request_number+=1
while request_number<number_of_requests:
interval=arrivals[request_number-1]
while True:
if(time.time()-last_request_time>=interval):
break
time.sleep(0.01)
job_request=create_job_request(str(request_number))
print("interval: ",interval, 'Job ID' ,job_request['job_id'])
send_request(job_request)
last_request_time=time.time()
request_number+=1