Skip to content

Commit 057efe3

Browse files
Implement Rendezqueue Server in Python
- Implemented `SwapStore` in `src/rendezqueue/swapstore.py` matching JS logic. - Implemented `RendezqueueImpl` in `src/rendezqueue/impl.py` for protocol handling. - Implemented `RendezqueueServer` in `src/rendezqueue/server.py` using `http.server`. - Added SxPB test cases in `test/server_test_cases.sxpb`. - Added test runner `test/server_test.py` using `sxpb` library. - Added integration test `test/integration_test.py` to verify client-server interaction. - Exposed server classes in `src/rendezqueue/__init__.py`. Co-authored-by: google-labs-jules[bot] <161369871+google-labs-jules[bot]@users.noreply.github.com>
1 parent e3cc2ed commit 057efe3

File tree

7 files changed

+777
-1
lines changed

7 files changed

+777
-1
lines changed

src/rendezqueue/__init__.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
11
from .client import RendezqueueClient
2+
from .server import RendezqueueServer, RendezqueueHandler, run as run_server
3+
from .impl import RendezqueueImpl
4+
from .swapstore import SwapStore
25

3-
__all__ = ["RendezqueueClient"]
6+
__all__ = ["RendezqueueClient", "RendezqueueServer", "RendezqueueHandler", "run_server", "RendezqueueImpl", "SwapStore"]

src/rendezqueue/impl.py

Lines changed: 172 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,172 @@
1+
import json
2+
import base64
3+
import time
4+
from typing import Optional, Union, Any, Dict
5+
from rendezqueue.swapstore import SwapStore, TrySwapResponse
6+
7+
MAX_KEY_BYTES = 100
8+
MAX_ID_BYTES = 100
9+
MAX_VALUE_BYTES = 65536
10+
11+
def btoa(s: str) -> str:
12+
# URL-safe base64 encoding without padding
13+
return base64.urlsafe_b64encode(s.encode('latin1')).decode('ascii').rstrip('=')
14+
15+
def atob(s: str) -> str:
16+
# URL-safe base64 decoding with padding added
17+
missing_padding = len(s) % 4
18+
if missing_padding:
19+
s += '=' * (4 - missing_padding)
20+
return base64.urlsafe_b64decode(s).decode('latin1')
21+
22+
def inplace_decode_tryswap_message(msg: Dict[str, Any]) -> str:
23+
if 'b64' not in msg:
24+
msg['b64'] = 0
25+
elif not isinstance(msg['b64'], int) or msg['b64'] < 0:
26+
return "b64"
27+
28+
if 'ttl' not in msg:
29+
msg['ttl'] = 0
30+
elif not isinstance(msg['ttl'], int) or msg['ttl'] < 0:
31+
return "ttl"
32+
33+
if 'key' not in msg:
34+
msg['key'] = ""
35+
elif not isinstance(msg['key'], str):
36+
return "key"
37+
38+
if 'sid' not in msg:
39+
msg['sid'] = ""
40+
elif not isinstance(msg['sid'], str):
41+
return "sid"
42+
43+
if 'offset' not in msg:
44+
msg['offset'] = 0
45+
elif not isinstance(msg['offset'], int) or msg['offset'] < 0:
46+
return "offset"
47+
48+
if 'values' not in msg:
49+
msg['values'] = []
50+
elif not isinstance(msg['values'], list):
51+
return "values"
52+
53+
try:
54+
if msg['b64'] & 4:
55+
msg['key'] = atob(msg['key'])
56+
if msg['b64'] & 2:
57+
msg['sid'] = atob(msg['sid'])
58+
if msg['b64'] & 1:
59+
msg['values'] = [atob(v) for v in msg['values']]
60+
except Exception:
61+
return "b64_decode_error" # Custom error, JS might just crash or throw
62+
63+
return ""
64+
65+
def inplace_encode_tryswap_message(msg: Dict[str, Any]) -> None:
66+
if 'b64' not in msg:
67+
msg['b64'] = 0
68+
69+
if msg['b64'] & 4:
70+
msg['key'] = btoa(msg['key'])
71+
if msg['b64'] & 2:
72+
msg['sid'] = btoa(msg['sid'])
73+
if msg['b64'] & 1:
74+
if 'values' not in msg or msg['values'] is None:
75+
msg['b64'] &= ~1
76+
else:
77+
msg['values'] = [btoa(v) for v in msg['values']]
78+
79+
if msg['b64'] == 0:
80+
del msg['b64']
81+
if 'ttl' in msg and msg['ttl'] == 0:
82+
del msg['ttl']
83+
if 'offset' in msg and msg['offset'] == 0:
84+
del msg['offset']
85+
# If values is None, remove it (TrySwapResponse handles optional)
86+
if 'values' in msg and msg['values'] is None:
87+
del msg['values']
88+
89+
class RendezqueueImpl:
90+
def __init__(self):
91+
self.swapstore = SwapStore()
92+
93+
def tryswap(self, msg: Any, now_ms: Optional[float] = None) -> Union[int, TrySwapResponse]:
94+
if not msg:
95+
return 400
96+
97+
# We assume msg is a dict here, validated by inplace_decode... but we should be careful
98+
if not isinstance(msg, dict):
99+
return 400
100+
101+
key = msg.get('key', '')
102+
sid = msg.get('sid', '')
103+
offset = msg.get('offset', 0)
104+
values = msg.get('values', [])
105+
106+
if len(key) > MAX_KEY_BYTES:
107+
return 413
108+
if len(sid) > MAX_ID_BYTES:
109+
return 413
110+
111+
total_value_bytes = sum(len(v) for v in values)
112+
if total_value_bytes > MAX_VALUE_BYTES:
113+
return 413
114+
115+
if now_ms is None:
116+
now_ms = time.time() * 1000
117+
if now_ms == 0: # unlikely but matches JS
118+
return 500
119+
now_ms = int(now_ms) # JS uses Math.floor
120+
121+
result = self.swapstore.tryswap(
122+
key, sid, offset, values,
123+
now_ms, msg.get('ttl', 0),
124+
)
125+
126+
if not isinstance(result, int):
127+
# It's a TrySwapResponse object. We need to convert it to dict for encoding
128+
# or wrap it.
129+
# But the caller expects to encode it.
130+
# Python dataclass to dict:
131+
# We can use vars(result) or similar
132+
# But we need to preserve b64 from request
133+
pass
134+
135+
return result
136+
137+
def tryswap_string(self, request_text: str) -> Union[int, str]:
138+
msg = None
139+
try:
140+
msg = json.loads(request_text)
141+
e = inplace_decode_tryswap_message(msg)
142+
if e:
143+
# In JS: throw e
144+
# Here we can return error code or message?
145+
# The JS code catches 'e' and logs it, then sets msg = null.
146+
# So if decoding fails, msg is null.
147+
raise Exception(e)
148+
except Exception as e:
149+
# print(e) # Optional logging
150+
msg = None
151+
152+
result = self.tryswap(msg)
153+
if isinstance(result, int):
154+
return result
155+
156+
# Convert dataclass to dict
157+
res_dict = {
158+
'key': result.key,
159+
'sid': result.sid,
160+
'offset': result.offset,
161+
}
162+
if result.values is not None:
163+
res_dict['values'] = result.values
164+
if result.ttl is not None:
165+
res_dict['ttl'] = result.ttl
166+
167+
# Carry over b64 flags from request if present
168+
if msg and 'b64' in msg:
169+
res_dict['b64'] = msg['b64']
170+
171+
inplace_encode_tryswap_message(res_dict)
172+
return json.dumps(res_dict)

src/rendezqueue/server.py

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
import http.server
2+
import socketserver
3+
import argparse
4+
from urllib.parse import urlparse
5+
from rendezqueue.impl import RendezqueueImpl
6+
7+
class RendezqueueHandler(http.server.BaseHTTPRequestHandler):
8+
def do_OPTIONS(self):
9+
self.send_response(204)
10+
if "Access-Control-Request-Headers" in self.headers:
11+
self.send_header("Access-Control-Allow-Headers", self.headers["Access-Control-Request-Headers"])
12+
if "Access-Control-Request-Method" in self.headers:
13+
self.send_header("Access-Control-Allow-Methods", self.headers["Access-Control-Request-Method"])
14+
if "Origin" in self.headers:
15+
self.send_header("Access-Control-Allow-Origin", self.headers["Origin"])
16+
self.end_headers()
17+
18+
def do_POST(self):
19+
parsed_path = urlparse(self.path)
20+
if parsed_path.path != self.server.http_path:
21+
self.send_error(404, "Not Found")
22+
return
23+
24+
content_type = self.headers.get("Content-Type", "")
25+
if content_type != "application/json":
26+
self.send_error(418, "I'm a teapot")
27+
return
28+
29+
content_length = int(self.headers.get("Content-Length", 0))
30+
body = self.rfile.read(content_length).decode("utf-8")
31+
32+
result = self.server.impl.tryswap_string(body)
33+
34+
if isinstance(result, int):
35+
self.send_response(result)
36+
self.send_header("Access-Control-Allow-Origin", "*")
37+
self.send_header("Content-Type", "application/json")
38+
self.end_headers()
39+
else:
40+
self.send_response(200)
41+
self.send_header("Access-Control-Allow-Origin", "*")
42+
self.send_header("Content-Type", "application/json")
43+
self.end_headers()
44+
self.wfile.write(result.encode("utf-8"))
45+
46+
class RendezqueueServer(socketserver.ThreadingMixIn, http.server.HTTPServer):
47+
def __init__(self, server_address, RequestHandlerClass, impl, http_path="/"):
48+
self.impl = impl
49+
self.http_path = http_path
50+
super().__init__(server_address, RequestHandlerClass)
51+
52+
def run():
53+
parser = argparse.ArgumentParser(description="Rendezqueue Server")
54+
parser.add_argument("--http_host", default="127.0.0.1", help="Host to bind to")
55+
parser.add_argument("--http_port", type=int, default=0, help="Port to bind to (0 for random)")
56+
parser.add_argument("--http_path", default="/", help="Path for RPC")
57+
parser.add_argument("--port_filepath", help="File to write the port number to")
58+
59+
# Allow unknown args to be ignored (e.g. for testing or future flags)
60+
args, unknown = parser.parse_known_args()
61+
62+
impl = RendezqueueImpl()
63+
64+
server = RendezqueueServer(
65+
(args.http_host, args.http_port),
66+
RendezqueueHandler,
67+
impl,
68+
http_path=args.http_path
69+
)
70+
71+
host, port = server.server_address
72+
print(f"Server running at http://{host}:{port}{args.http_path}")
73+
74+
if args.port_filepath:
75+
with open(args.port_filepath, "w") as f:
76+
f.write(str(port))
77+
78+
try:
79+
server.serve_forever()
80+
except KeyboardInterrupt:
81+
pass
82+
finally:
83+
server.server_close()
84+
85+
if __name__ == "__main__":
86+
run()

0 commit comments

Comments
 (0)