-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathshared_test.py
More file actions
65 lines (55 loc) · 1.73 KB
/
shared_test.py
File metadata and controls
65 lines (55 loc) · 1.73 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
import ctypes
import socket
import netifaces as ni
import time
import multiprocessing
def main():
    """Ping the secondary and wait until the reader process reports a reply.

    Clears the module-level ``secondaryAlive`` flag, starts
    ``readDataSecondary`` in a child process, and polls the flag roughly
    every 0.2 s. A datagram is sent via ``sendDataSecondary`` on the first
    pass and again once more than 45 polls have gone by without an answer.
    When the flag becomes true the reader process is joined and the whole
    handshake is started over by calling ``main()`` again.
    """
    # NOTE(review): the block nesting below was reconstructed from statement
    # order; confirm the join/restart really belong after the polling loop.
    error_secondary_counter = 0
    start = 0  # 0 -> a ping is due on the next pass, 1 -> ping already sent
    secondaryAlive.value = False
    print(f"secondaryAlive.value:{secondaryAlive.value}")
    reading1process = multiprocessing.Process(target=readDataSecondary, args=(secondaryAlive,))
    reading1process.start()
    while not secondaryAlive.value:
        print(f"secondaryAlive.value while:{secondaryAlive.value}")
        if start == 0:
            # Presumably gives the reader process time to bind its socket
            # before the secondary can answer -- TODO confirm.
            time.sleep(1)
            sendDataSecondary()
        time.sleep(0.2)
        start = 1
        # Was `= +1`, which pinned the counter at 1 so the threshold below
        # could never be reached.
        error_secondary_counter += 1
        if error_secondary_counter > 45:
            # NOTE(review): the counter is not reset here, so once the
            # threshold is crossed every following pass re-sends the ping.
            start = 0
            print("Connection to secondary failed")
    reading1process.join()
    print("reading joined")
    # Restart the handshake. Each round adds a stack frame, so a very long
    # run will eventually hit the interpreter's recursion limit.
    main()
def sendDataSecondary():
    """Send the fixed datagram ``b"123"`` to UDP port 5000.

    The destination address is the first IPv4 address that ``netifaces``
    reports for the ``lo`` interface. Prints "Message sent" once the
    datagram has been handed to the socket.
    """
    UDP_IP = ni.ifaddresses('lo')[ni.AF_INET][0]['addr']
    UDP_PORT = 5000
    MESSAGE = b"123"
    # The context manager closes the socket on every path; the function is
    # called repeatedly, so an unclosed socket would leak one per call.
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:  # UDP
        sock.sendto(MESSAGE, (UDP_IP, UDP_PORT))
    # Report only after the send succeeded.
    print("Message sent")
def readDataSecondary(secondaryAlive):
    """Wait for a UDP datagram on 127.0.0.1:10000 and flag the secondary alive.

    Args:
        secondaryAlive: Object with a writable ``value`` attribute (the
            caller passes a shared ``multiprocessing.Value``). It is set to
            ``True`` when the first non-empty message arrives.

    Returns:
        None. Returns after the first non-empty message, or immediately if
        the socket cannot be bound (the failure is printed and the flag is
        left untouched).
    """
    UDP_IP = "127.0.0.1"
    UDP_PORT = 10000
    # The context manager guarantees the socket is closed on every exit path.
    with socket.socket(socket.AF_INET,  # Internet
                       socket.SOCK_DGRAM) as sock:  # UDP
        try:
            sock.bind((UDP_IP, UDP_PORT))
        except OSError as err:
            # Without a successful bind nothing can ever be received here,
            # so report the problem instead of blocking forever in silence.
            print(f"Could not bind {UDP_IP}:{UDP_PORT}: {err}")
            return
        while True:
            data, _addr = sock.recvfrom(1024)  # buffer size is 1024 bytes
            # Undecodable bytes must not kill the reader; they are replaced
            # so any non-empty payload still counts as a sign of life.
            incoming_message = data.decode(errors="replace")
            if incoming_message:
                print(f"incoming message: {incoming_message}")
                secondaryAlive.value = True
                return
# Shared boolean flag set by readDataSecondary when a message arrives.
# It stays at module level because main() reads it as a global.
secondaryAlive = multiprocessing.Value(ctypes.c_bool, False)

# Launch only when executed as a script: without the guard, importing this
# module (which the "spawn" start method does in every child process) would
# start another process on each import.
if __name__ == "__main__":
    p = multiprocessing.Process(target=main)
    p.start()
    p.join()
    print("End")