Skip to content

Commit b98413e

Browse files
feat: client and server
Co-authored-by: google-labs-jules[bot] <161369871+google-labs-jules[bot]@users.noreply.github.com>
1 parent 3988ffd commit b98413e

13 files changed

Lines changed: 1348 additions & 0 deletions

File tree

.github/workflows/ci.yaml

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
---
2+
name: CI
3+
on: [push, pull_request]
4+
5+
jobs:
6+
build:
7+
runs-on: ubuntu-latest
8+
strategy:
9+
fail-fast: false
10+
matrix:
11+
python-version: ["3.10", "3.11", "3.12"]
12+
13+
steps:
14+
- uses: actions/checkout@v6
15+
16+
- name: Set up PDM
17+
uses: pdm-project/setup-pdm@v4
18+
with:
19+
python-version: ${{ matrix.python-version }}
20+
21+
- name: Install dependencies
22+
run: pdm install
23+
24+
- name: Lint with PDM
25+
run: pdm run lint
26+
27+
- name: Test with PDM
28+
run: pdm run test

pyproject.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ pythonpath = ["src", "test"]
2828
testpaths = ["test"]
2929

3030
[project.scripts]
31+
rendezqueue-client = "rendezqueue.cli:main"
32+
rendezqueue-server = "rendezqueue.server:run"
3133

3234
[tool.pdm]
3335
package-dir = "src"

src/rendezqueue/__init__.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
from .client import RendezqueueClient
2+
3+
__all__ = ["RendezqueueClient"]

src/rendezqueue/cli.py

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
import argparse
2+
import sys
3+
import threading
4+
import time
5+
from typing import List
6+
from .client import RendezqueueClient
7+
8+
9+
def main() -> None:
10+
parser = argparse.ArgumentParser(description="Rendezqueue Client")
11+
parser.add_argument("--url", required=True, help="URL of the Rendezqueue server")
12+
parser.add_argument("--key", required=True, help="Session key")
13+
parser.add_argument("--hue", default="cli", help="Client hue (ID prefix)")
14+
15+
args = parser.parse_args()
16+
17+
stop_event = threading.Event()
18+
19+
def on_data(values: List[bytes]) -> None:
20+
for v in values:
21+
try:
22+
# Try to decode as utf-8, fallback to repr or raw bytes
23+
print(v.decode("utf-8"))
24+
except UnicodeDecodeError:
25+
print(v)
26+
sys.stdout.flush()
27+
28+
def on_error(e: Exception) -> None:
29+
print(f"Error: {e}", file=sys.stderr)
30+
31+
client = RendezqueueClient(
32+
url=args.url, key=args.key, hue=args.hue, on_data=on_data, on_error=on_error
33+
)
34+
35+
try:
36+
client.start()
37+
38+
# Reader thread
39+
def read_stdin() -> None:
40+
try:
41+
for line in sys.stdin:
42+
if stop_event.is_set():
43+
break
44+
# Strip newline
45+
msg = line.rstrip("\n")
46+
client.send(msg)
47+
except Exception:
48+
# stdin closed or error
49+
pass
50+
finally:
51+
pass
52+
53+
input_thread = threading.Thread(target=read_stdin, daemon=True)
54+
input_thread.start()
55+
56+
# Main loop to keep the main thread alive and monitor stop_event
57+
while not stop_event.is_set():
58+
time.sleep(0.1)
59+
60+
except KeyboardInterrupt:
61+
pass
62+
finally:
63+
stop_event.set()
64+
client.stop()
65+
66+
67+
if __name__ == "__main__":
68+
main()

src/rendezqueue/client.py

Lines changed: 177 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,177 @@
1+
import base64
2+
import json
3+
import logging
4+
import random
5+
import string
6+
import threading
7+
import time
8+
import urllib.request
9+
import urllib.error
10+
from typing import Callable, Optional, List, Dict, Any
11+
12+
logger = logging.getLogger(__name__)
13+
14+
15+
class RendezqueueClient:
16+
def __init__(
17+
self,
18+
url: str,
19+
key: str,
20+
hue: str,
21+
on_data: Optional[Callable[[List[bytes]], None]] = None,
22+
on_error: Optional[Callable[[Exception], None]] = None,
23+
poll_interval_ms: int = 2000,
24+
) -> None:
25+
self.url = url
26+
self.key = key
27+
self.hue = hue
28+
self.on_data = on_data
29+
self.on_error = on_error or (lambda e: logger.error(f"Rendezqueue error: {e}"))
30+
self.poll_interval_ms = poll_interval_ms
31+
32+
self.sid_counter = 1
33+
self.sid = self._generate_sid()
34+
self.offset = 0
35+
self.outgoing_queue: List[bytes] = []
36+
self.lock = threading.Lock()
37+
38+
self.is_polling = False
39+
self.is_stopped = True
40+
self.poll_thread: Optional[threading.Thread] = None
41+
42+
def _generate_sid(self) -> str:
43+
random_part = "".join(
44+
random.choices(string.ascii_lowercase + string.digits, k=7)
45+
)
46+
return f"{self.hue}-{self.sid_counter}-{random_part}"
47+
48+
def start(self) -> None:
49+
if not self.is_stopped:
50+
return
51+
self.is_stopped = False
52+
self.poll_thread = threading.Thread(target=self._poll_loop, daemon=True)
53+
self.poll_thread.start()
54+
55+
def stop(self) -> None:
56+
if self.is_stopped:
57+
return
58+
self.is_stopped = True
59+
60+
def send(self, value: Any) -> None:
61+
if isinstance(value, str):
62+
value = value.encode("utf-8")
63+
with self.lock:
64+
self.outgoing_queue.append(value)
65+
66+
def _start_new_session(self) -> None:
67+
with self.lock:
68+
self.sid_counter += 1
69+
self.sid = self._generate_sid()
70+
self.offset = 0
71+
72+
def _poll_loop(self) -> None:
73+
# Initial poll
74+
self._poll()
75+
while not self.is_stopped:
76+
time.sleep(self.poll_interval_ms / 1000.0)
77+
if self.is_stopped:
78+
break
79+
self._poll()
80+
81+
def _poll(self) -> None:
82+
if self.is_polling:
83+
return
84+
self.is_polling = True
85+
try:
86+
with self.lock:
87+
req_sid = self.sid
88+
req_offset = self.offset
89+
# Take snapshot of queue to send
90+
snapshot_queue = list(self.outgoing_queue)
91+
b64_values = [
92+
base64.b64encode(v).decode("ascii") for v in snapshot_queue
93+
]
94+
95+
request_body = {
96+
"key": self.key,
97+
"sid": req_sid,
98+
"offset": req_offset,
99+
"values": b64_values,
100+
"b64": 1,
101+
}
102+
103+
data = json.dumps(request_body).encode("utf-8")
104+
headers = {
105+
"Content-Type": "application/json",
106+
"User-Agent": "Rendezqueue-Python-Client/0.0.0",
107+
}
108+
req = urllib.request.Request(self.url, data=data, headers=headers)
109+
110+
try:
111+
with urllib.request.urlopen(req, timeout=10) as response:
112+
if response.status != 200:
113+
text = response.read().decode("utf-8")
114+
self.on_error(
115+
Exception(f"Server error: {response.status} {text}")
116+
)
117+
return
118+
119+
resp_body = response.read().decode("utf-8")
120+
msg = json.loads(resp_body)
121+
data = self._decode_response(msg)
122+
123+
received_values = data.get("values", [])
124+
# Session ended if we got values OR (server offset > 0 and no ttl/keepalive)
125+
session_has_ended = len(received_values) > 0 or (
126+
data.get("offset", 0) > 0 and "ttl" not in data
127+
)
128+
129+
with self.lock:
130+
if self.sid != req_sid:
131+
# Session changed, ignore response
132+
return
133+
134+
if session_has_ended:
135+
# Remove sent messages
136+
sent_count = len(snapshot_queue)
137+
del self.outgoing_queue[:sent_count]
138+
139+
# Note: self.offset is reset by _start_new_session anyway
140+
else:
141+
server_offset = data.get("offset", 0)
142+
accepted_count = server_offset - self.offset
143+
if accepted_count > 0:
144+
del self.outgoing_queue[:accepted_count]
145+
self.offset = server_offset
146+
147+
if session_has_ended:
148+
if self.on_data:
149+
self.on_data(received_values)
150+
self._start_new_session()
151+
152+
except urllib.error.HTTPError as e:
153+
text = e.read().decode("utf-8")
154+
self.on_error(Exception(f"Server error: {e.code} {text}"))
155+
except Exception as e:
156+
self.on_error(e)
157+
158+
except Exception as e:
159+
self.on_error(e)
160+
finally:
161+
self.is_polling = False
162+
163+
def _decode_response(self, msg: Dict[str, Any]) -> Dict[str, Any]:
164+
b64_flags = msg.get("b64", 0)
165+
if b64_flags & 4:
166+
msg["key"] = self._b64decode_padded(msg["key"]).decode("utf-8")
167+
if b64_flags & 2:
168+
msg["sid"] = self._b64decode_padded(msg["sid"]).decode("utf-8")
169+
if "values" in msg and (b64_flags & 1):
170+
msg["values"] = [self._b64decode_padded(v) for v in msg["values"]]
171+
return msg
172+
173+
def _b64decode_padded(self, s: str) -> bytes:
174+
missing_padding = len(s) % 4
175+
if missing_padding:
176+
s += "=" * (4 - missing_padding)
177+
return base64.b64decode(s, validate=False)

0 commit comments

Comments
 (0)