-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathqueue_redis.py
More file actions
84 lines (66 loc) · 2.29 KB
/
queue_redis.py
File metadata and controls
84 lines (66 loc) · 2.29 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
import os
import json
import hmac
import hashlib
from datetime import datetime, timezone
import redis
# Redis connection URL, read from the REDIS_URL environment variable;
# falls back to "redis://redis:6379/0" when the variable is unset.
REDIS_URL = os.getenv("REDIS_URL", "redis://redis:6379/0").strip()
# HMAC key used to sign queue envelopes. An empty value disables signing:
# _wrap/_unwrap then pass payloads through unchanged.
QUEUE_SIGNING_SECRET = os.getenv("QUEUE_SIGNING_SECRET", "").strip()
# Module-level Redis client shared by every helper in this module.
# NOTE(review): decode_responses=True is expected to make replies `str`
# rather than `bytes` -- confirm against the installed redis-py version.
r = redis.from_url(REDIS_URL, decode_responses=True)
def _jdump(obj) -> str:
return json.dumps(obj, sort_keys=True, separators=(",", ":"), ensure_ascii=False)
def _now_iso() -> str:
return datetime.now(timezone.utc).isoformat()
def _hmac_sig(data: str) -> str:
    """Return the hex HMAC-SHA256 digest of *data*.

    The key is the module-level ``QUEUE_SIGNING_SECRET``; both key and
    message are encoded as UTF-8 before hashing.
    """
    key = QUEUE_SIGNING_SECRET.encode("utf-8")
    message = data.encode("utf-8")
    mac = hmac.new(key, message, hashlib.sha256)
    return mac.hexdigest()
def _wrap(payload: dict) -> dict:
    """Wrap *payload* in a signed envelope.

    Envelope format::

        { "v":1, "ts":"...", "payload":{...}, "sig":"..." }

    The signature covers the canonical JSON of the envelope without the
    ``sig`` field. When ``QUEUE_SIGNING_SECRET`` is empty, signing is
    disabled and *payload* is returned unchanged.
    """
    if QUEUE_SIGNING_SECRET:
        envelope = {"v": 1, "ts": _now_iso(), "payload": payload}
        # The right-hand side is evaluated first, so the signed JSON does
        # not yet contain the "sig" key.
        envelope["sig"] = _hmac_sig(_jdump(envelope))
        return envelope
    return payload  # signing disabled
def _unwrap(obj: dict) -> dict | None:
    """Verify a signed envelope and return its payload.

    Args:
        obj: The JSON-decoded queue message.

    Returns:
        * When ``QUEUE_SIGNING_SECRET`` is empty (signing disabled), *obj*
          is returned as-is with no shape check.
        * Otherwise the envelope's ``payload`` dict, if *obj* is a dict
          carrying ``payload``, ``sig`` and ``ts`` and the signature matches.
        * ``None`` for anything malformed or tampered.

    Never raises on attacker-controlled envelope content: a signature or
    body that cannot be encoded/compared is treated as tampered.
    """
    if not QUEUE_SIGNING_SECRET:
        return obj

    # must be envelope
    if not isinstance(obj, dict) or "payload" not in obj or "sig" not in obj or "ts" not in obj:
        return None

    sig = str(obj.get("sig") or "")
    unsigned = {"v": obj.get("v", 1), "ts": obj.get("ts"), "payload": obj.get("payload")}
    try:
        expected = _hmac_sig(_jdump(unsigned))
    except UnicodeEncodeError:
        # Lone surrogates (e.g. a JSON "\ud800" escape) cannot be UTF-8
        # encoded for hashing; no validly signed message can contain them.
        return None

    # Compare as bytes: hmac.compare_digest raises TypeError when given a
    # str containing non-ASCII characters, which a forged "sig" could
    # trigger. errors="replace" keeps the encode itself from raising; the
    # replacement byte can never appear in a hex digest, so it cannot match.
    expected_bytes = expected.encode("utf-8")
    sig_bytes = sig.encode("utf-8", errors="replace")
    if not hmac.compare_digest(expected_bytes, sig_bytes):
        return None

    payload = obj.get("payload")
    return payload if isinstance(payload, dict) else None
def qpush(queue_name: str, payload: dict) -> None:
    """Append *payload* to the tail of the Redis list *queue_name*.

    The payload is wrapped by ``_wrap`` (signed when signing is enabled)
    and stored as canonical JSON.
    """
    serialized = _jdump(_wrap(payload))
    r.rpush(queue_name, serialized)
def qpop(queue_name: str, timeout: int = 0) -> dict | None:
    """Pop the next message from the head of *queue_name*.

    Uses BLPOP when ``timeout > 0``, otherwise a non-blocking LPOP.

    Returns:
        The dict payload, or ``None`` if the queue is empty or the message
        is invalid JSON or fails envelope verification.
    """
    raw = None
    if timeout and timeout > 0:
        popped = r.blpop(queue_name, timeout=timeout)
        if popped:
            # BLPOP yields a (key, value) pair; only the value is needed.
            _key, raw = popped
    else:
        raw = r.lpop(queue_name)

    if raw is None:
        return None

    try:
        decoded = json.loads(raw)
    except Exception:
        # Best-effort consumer: undecodable messages are dropped.
        return None
    return _unwrap(decoded)
# --- Compatibility wrapper for sentinel_api enqueue path ---
def get_queue_redis():
    """Return the module-level Redis client ``r`` used by this module."""
    client = r
    return client