Skip to content

Commit 0c43e97

Browse files
gendeuxgoogle-labs-jules[bot]
authored andcommitted
feat: client and server
Co-authored-by: google-labs-jules[bot] <161369871+google-labs-jules[bot]@users.noreply.github.com>
1 parent 3988ffd commit 0c43e97

13 files changed

Lines changed: 1228 additions & 0 deletions

File tree

.github/workflows/ci.yaml

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
---
2+
name: CI
3+
on: [push, pull_request]
4+
5+
jobs:
6+
build:
7+
runs-on: ubuntu-latest
8+
strategy:
9+
fail-fast: false
10+
matrix:
11+
python-version: ["3.10", "3.11", "3.12"]
12+
13+
steps:
14+
- uses: actions/checkout@v6
15+
16+
- name: Set up PDM
17+
uses: pdm-project/setup-pdm@v4
18+
with:
19+
python-version: ${{ matrix.python-version }}
20+
21+
- name: Install dependencies
22+
run: pdm install
23+
24+
- name: Lint with PDM
25+
run: pdm run lint
26+
27+
- name: Test with PDM
28+
run: pdm run test

pyproject.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ pythonpath = ["src", "test"]
2828
testpaths = ["test"]
2929

3030
[project.scripts]
31+
rendezqueue-client = "rendezqueue.cli:main"
32+
rendezqueue-server = "rendezqueue.server:run"
3133

3234
[tool.pdm]
3335
package-dir = "src"

src/rendezqueue/__init__.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
from .client import RendezqueueClient
2+
3+
__all__ = ["RendezqueueClient"]

src/rendezqueue/cli.py

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
import argparse
2+
import sys
3+
import threading
4+
import time
5+
from typing import List
6+
from .client import RendezqueueClient
7+
8+
def main() -> None:
9+
parser = argparse.ArgumentParser(description="Rendezqueue Client")
10+
parser.add_argument("--url", required=True, help="URL of the Rendezqueue server")
11+
parser.add_argument("--key", required=True, help="Session key")
12+
parser.add_argument("--hue", default="cli", help="Client hue (ID prefix)")
13+
14+
args = parser.parse_args()
15+
16+
stop_event = threading.Event()
17+
18+
def on_data(values: List[bytes]) -> None:
19+
for v in values:
20+
try:
21+
# Try to decode as utf-8, fallback to repr or raw bytes
22+
print(v.decode('utf-8'))
23+
except UnicodeDecodeError:
24+
print(v)
25+
sys.stdout.flush()
26+
27+
def on_error(e: Exception) -> None:
28+
print(f"Error: {e}", file=sys.stderr)
29+
30+
client = RendezqueueClient(
31+
url=args.url,
32+
key=args.key,
33+
hue=args.hue,
34+
on_data=on_data,
35+
on_error=on_error
36+
)
37+
38+
try:
39+
client.start()
40+
41+
# Reader thread
42+
def read_stdin() -> None:
43+
try:
44+
for line in sys.stdin:
45+
if stop_event.is_set():
46+
break
47+
# Strip newline
48+
msg = line.rstrip('\n')
49+
client.send(msg)
50+
except Exception:
51+
# stdin closed or error
52+
pass
53+
finally:
54+
pass
55+
56+
input_thread = threading.Thread(target=read_stdin, daemon=True)
57+
input_thread.start()
58+
59+
# Main loop to keep the main thread alive and monitor stop_event
60+
while not stop_event.is_set():
61+
time.sleep(0.1)
62+
63+
except KeyboardInterrupt:
64+
pass
65+
finally:
66+
stop_event.set()
67+
client.stop()
68+
69+
if __name__ == "__main__":
70+
main()

src/rendezqueue/client.py

Lines changed: 160 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,160 @@
1+
import base64
2+
import json
3+
import logging
4+
import random
5+
import string
6+
import threading
7+
import time
8+
import urllib.request
9+
import urllib.error
10+
from typing import Callable, Optional, List, Dict, Any
11+
12+
logger = logging.getLogger(__name__)
13+
14+
class RendezqueueClient:
15+
def __init__(self, url: str, key: str, hue: str, on_data: Optional[Callable[[List[bytes]], None]] = None, on_error: Optional[Callable[[Exception], None]] = None, poll_interval_ms: int = 2000) -> None:
16+
self.url = url
17+
self.key = key
18+
self.hue = hue
19+
self.on_data = on_data
20+
self.on_error = on_error or (lambda e: logger.error(f"Rendezqueue error: {e}"))
21+
self.poll_interval_ms = poll_interval_ms
22+
23+
self.sid_counter = 1
24+
self.sid = self._generate_sid()
25+
self.offset = 0
26+
self.outgoing_queue: List[bytes] = []
27+
self.lock = threading.Lock()
28+
29+
self.is_polling = False
30+
self.is_stopped = True
31+
self.poll_thread: Optional[threading.Thread] = None
32+
33+
def _generate_sid(self) -> str:
34+
random_part = ''.join(random.choices(string.ascii_lowercase + string.digits, k=7))
35+
return f"{self.hue}-{self.sid_counter}-{random_part}"
36+
37+
def start(self) -> None:
38+
if not self.is_stopped:
39+
return
40+
self.is_stopped = False
41+
self.poll_thread = threading.Thread(target=self._poll_loop, daemon=True)
42+
self.poll_thread.start()
43+
44+
def stop(self) -> None:
45+
if self.is_stopped:
46+
return
47+
self.is_stopped = True
48+
49+
def send(self, value: Any) -> None:
50+
if isinstance(value, str):
51+
value = value.encode('utf-8')
52+
with self.lock:
53+
self.outgoing_queue.append(value)
54+
55+
def _start_new_session(self) -> None:
56+
with self.lock:
57+
self.sid_counter += 1
58+
self.sid = self._generate_sid()
59+
self.offset = 0
60+
61+
def _poll_loop(self) -> None:
62+
# Initial poll
63+
self._poll()
64+
while not self.is_stopped:
65+
time.sleep(self.poll_interval_ms / 1000.0)
66+
if self.is_stopped:
67+
break
68+
self._poll()
69+
70+
def _poll(self) -> None:
71+
if self.is_polling:
72+
return
73+
self.is_polling = True
74+
try:
75+
with self.lock:
76+
req_sid = self.sid
77+
req_offset = self.offset
78+
# Take snapshot of queue to send
79+
snapshot_queue = list(self.outgoing_queue)
80+
b64_values = [base64.b64encode(v).decode('ascii') for v in snapshot_queue]
81+
82+
request_body = {
83+
"key": self.key,
84+
"sid": req_sid,
85+
"offset": req_offset,
86+
"values": b64_values,
87+
"b64": 1,
88+
}
89+
90+
data = json.dumps(request_body).encode('utf-8')
91+
headers = {
92+
'Content-Type': 'application/json',
93+
'User-Agent': 'Rendezqueue-Python-Client/0.0.0',
94+
}
95+
req = urllib.request.Request(self.url, data=data, headers=headers)
96+
97+
try:
98+
with urllib.request.urlopen(req, timeout=10) as response:
99+
if response.status != 200:
100+
text = response.read().decode('utf-8')
101+
self.on_error(Exception(f"Server error: {response.status} {text}"))
102+
return
103+
104+
resp_body = response.read().decode('utf-8')
105+
msg = json.loads(resp_body)
106+
data = self._decode_response(msg)
107+
108+
received_values = data.get('values', [])
109+
# Session ended if we got values OR (server offset > 0 and no ttl/keepalive)
110+
session_has_ended = len(received_values) > 0 or (data.get('offset', 0) > 0 and 'ttl' not in data)
111+
112+
with self.lock:
113+
if self.sid != req_sid:
114+
# Session changed, ignore response
115+
return
116+
117+
if session_has_ended:
118+
# Remove sent messages
119+
sent_count = len(snapshot_queue)
120+
del self.outgoing_queue[:sent_count]
121+
122+
# Note: self.offset is reset by _start_new_session anyway
123+
else:
124+
server_offset = data.get('offset', 0)
125+
accepted_count = server_offset - self.offset
126+
if accepted_count > 0:
127+
del self.outgoing_queue[:accepted_count]
128+
self.offset = server_offset
129+
130+
if session_has_ended:
131+
if self.on_data:
132+
self.on_data(received_values)
133+
self._start_new_session()
134+
135+
except urllib.error.HTTPError as e:
136+
text = e.read().decode('utf-8')
137+
self.on_error(Exception(f"Server error: {e.code} {text}"))
138+
except Exception as e:
139+
self.on_error(e)
140+
141+
except Exception as e:
142+
self.on_error(e)
143+
finally:
144+
self.is_polling = False
145+
146+
def _decode_response(self, msg: Dict[str, Any]) -> Dict[str, Any]:
147+
b64_flags = msg.get('b64', 0)
148+
if b64_flags & 4:
149+
msg['key'] = self._b64decode_padded(msg['key']).decode('utf-8')
150+
if b64_flags & 2:
151+
msg['sid'] = self._b64decode_padded(msg['sid']).decode('utf-8')
152+
if 'values' in msg and (b64_flags & 1):
153+
msg['values'] = [self._b64decode_padded(v) for v in msg['values']]
154+
return msg
155+
156+
def _b64decode_padded(self, s: str) -> bytes:
157+
missing_padding = len(s) % 4
158+
if missing_padding:
159+
s += '=' * (4 - missing_padding)
160+
return base64.b64decode(s, validate=False)

0 commit comments

Comments
 (0)