-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathstreaming_client.py
More file actions
65 lines (50 loc) · 2.06 KB
/
streaming_client.py
File metadata and controls
65 lines (50 loc) · 2.06 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
from httpx import AsyncClient
import asyncio
import websockets
import json
import msgpack
import os
# from httpx_ws import aconnect_ws
# from httpx_ws.transport import ASGIWebSocketTransport
# async def stream_listener(path):
# nonlocal ac
# subprotocols = ["application/octet-stream"]
# async with aconnect_ws(
# f"http://localhost/stream/{path}?cursor=1", ac,
# subprotocols=subprotocols,
# ) as ws:
# for i in range(3):
# message = await ws.receive_bytes()
# data = np.frombuffer(message, dtype=np.int64)
# print(f"client received data {path = }, {data=}")
# await asyncio.sleep(1)
# ac = AsyncClient(
# transport=ASGIWebSocketTransport(app=app), base_url="http://localhost"
# )
# Get base URL from environment variable, default to localhost
REDIS_WS_API_URL = os.getenv("REDIS_WS_API_URL", "localhost:8000")
client = AsyncClient(base_url=f"http://{REDIS_WS_API_URL}")
print("streaming_client starting")
async def get_live():
result = await client.get("/stream/live")
return result.json()
async def stream_node(node_id: str, envelope_format="json"):
# Create WebSocket URL from base URL
websocket_url = f"ws://{REDIS_WS_API_URL}/stream/single/{node_id}?envelope_format={envelope_format}&seq_num=1"
async with websockets.connect(websocket_url) as websocket:
print(f"Connected to {websocket_url}")
try:
while True:
message = await websocket.recv()
if isinstance(message, bytes) and envelope_format == "msgpack":
message = msgpack.unpackb(message)
print(f"Received Msgpack: {message}")
else:
print(f"Received JSON: {json.loads(message)}")
except websockets.exceptions.ConnectionClosed as e:
print(f"Connection closed {e}")
async def main():
result = await get_live()
print(result)
await stream_node("481980", envelope_format="msgpack")
asyncio.run(main())