-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest_worker.py
More file actions
45 lines (35 loc) · 1.46 KB
/
Copy pathtest_worker.py
File metadata and controls
45 lines (35 loc) · 1.46 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
"""
test_worker.py
Unit test for worker.py. Connects directly to the worker's Unix Domain Socket
and validates that a sample image payload receives a well-formed response.
CSCI 599: Network Systems for Cloud Computing
University of Southern California
"""
import socket
import base64
# 创建一张极小的红底测试图片的 Base64
test_img_base64 = "/9j/4AAQSkZJRgABAQEASABIAAD/2wBDAP//////////////////////////////////////////////////////////////////////////////////////wgALCAABAAEBAREA/8QAFBABAAAAAAAAAAAAAAAAAAAAAP/aAAgBAQABPxA="
def test():
sock_path = "/tmp/test_worker.sock"
print("[Tester] 正在连接 Python Worker...")
try:
client = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
client.connect(sock_path)
except Exception as e:
print(f"[-] 连接失败: {e}. 请确保 worker 已经启动。")
return
print("[Tester] 连接成功!发送模拟 HTTP POST 请求...")
# 模拟 C++ 转发过来的带 HTTP 头的请求
request = f"POST /invoke HTTP/1.1\r\nContent-Length: {len(test_img_base64)}\r\n\r\n{test_img_base64}"
client.sendall(request.encode('utf-8'))
response = b""
while True:
chunk = client.recv(4096)
if not chunk:
break
response += chunk
print("\n[Tester] 收到处理结果!")
print(response.decode('utf-8')[:150] + "......(省略后续 Base64 数据)")
client.close()
if __name__ == "__main__":
test()