Skip to content

Commit f14eea9

Browse files
committed
Try and make graphql connection more robust by reconnecting if the connection sliently dies.
Refactor _listen_to_see to make it less complexe
1 parent e2cc4a0 commit f14eea9

1 file changed

Lines changed: 107 additions & 48 deletions

File tree

pyhilo/graphql.py

Lines changed: 107 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -620,6 +620,95 @@ async def subscribe_to_location_updated(
620620
location_hilo_id,
621621
)
622622

623+
# Seconds without any SSE event before treating the connection as stalled.
624+
# Hilo's server sends periodic keepalive comments; 120 s is a safe margin.
625+
_SSE_KEEPALIVE_TIMEOUT = 120
626+
# Reconnection back-off: starts at _BACKOFF_BASE, doubles each failure, caps at _BACKOFF_MAX.
627+
_BACKOFF_BASE = 5
628+
_BACKOFF_MAX = 300 # 5 minutes
629+
630+
def _parse_sse_message(self, sse: Any) -> Optional[Dict[str, Any]]:
631+
"""Parse raw SSE event data; returns None if it should be skipped."""
632+
if not sse.data:
633+
return None
634+
try:
635+
result: Dict[str, Any] = json.loads(sse.data)
636+
return result
637+
except json.JSONDecodeError:
638+
return None
639+
640+
def _handle_sse_message(
641+
self,
642+
data: Dict[str, Any],
643+
handler: Callable[[Dict[str, Any]], str],
644+
callback: Optional[Callable[[str], None]],
645+
) -> tuple:
646+
"""Dispatch a parsed SSE message. Returns (retry_apq, had_success)."""
647+
if "errors" in data:
648+
if any(
649+
e.get("message") == "PersistedQueryNotFound" for e in data["errors"]
650+
):
651+
return True, False
652+
LOG.error("GraphQL Subscription Errors: %s", data["errors"])
653+
return False, False
654+
if "data" in data:
655+
LOG.debug("Received subscription result %s", data["data"])
656+
handler_result = handler(data["data"])
657+
if callback:
658+
callback(handler_result)
659+
return False, True
660+
return False, False
661+
662+
async def _drain_sse_events(
663+
self,
664+
event_source: Any,
665+
handler: Callable[[Dict[str, Any]], str],
666+
callback: Optional[Callable[[str], None]],
667+
) -> tuple:
668+
"""Read events until the stream closes. Returns (retry_apq, had_success)."""
669+
had_success = False
670+
sse_iter = event_source.aiter_sse()
671+
while True:
672+
try:
673+
sse = await asyncio.wait_for(
674+
sse_iter.__anext__(),
675+
timeout=self._SSE_KEEPALIVE_TIMEOUT,
676+
)
677+
except StopAsyncIteration:
678+
break
679+
except asyncio.TimeoutError:
680+
LOG.warning(
681+
"SSE keepalive timeout (%ss without data), reconnecting...",
682+
self._SSE_KEEPALIVE_TIMEOUT,
683+
)
684+
raise
685+
data = self._parse_sse_message(sse)
686+
if data is None:
687+
continue
688+
retry, success = self._handle_sse_message(data, handler, callback)
689+
if retry:
690+
return True, had_success
691+
if success:
692+
had_success = True
693+
return False, had_success
694+
695+
async def _run_sse_connection(
696+
self,
697+
url: str,
698+
headers: Dict[str, str],
699+
payload: Dict[str, Any],
700+
handler: Callable[[Dict[str, Any]], str],
701+
callback: Optional[Callable[[str], None]],
702+
) -> tuple:
703+
"""Open one SSE connection and drain it. Returns (retry_apq, had_success)."""
704+
async with httpx.AsyncClient(
705+
http2=True, timeout=None, verify=self._ssl_context
706+
) as client:
707+
async with aconnect_sse(
708+
client, "POST", url, json=payload, headers=headers
709+
) as event_source:
710+
return await self._drain_sse_events(event_source, handler, callback)
711+
623712
async def _listen_to_sse(
624713
self,
625714
query: str,
@@ -630,70 +719,40 @@ async def _listen_to_sse(
630719
) -> None:
631720
query_hash = hashlib.sha256(query.encode("utf-8")).hexdigest()
632721
payload: Dict[str, Any] = {
633-
"extensions": {
634-
"persistedQuery": {
635-
"version": 1,
636-
"sha256Hash": query_hash,
637-
}
638-
},
722+
"extensions": {"persistedQuery": {"version": 1, "sha256Hash": query_hash}},
639723
"variables": variables,
640724
}
725+
backoff_delay = self._BACKOFF_BASE
641726

642727
while True:
643728
try:
644729
access_token = await self._get_access_token()
645730
url = f"https://{PLATFORM_HOST}/api/digital-twin/v3/graphql"
646731
headers = {"Authorization": f"Bearer {access_token}"}
647-
648-
retry_with_full_query = False
649-
650-
async with httpx.AsyncClient(
651-
http2=True, timeout=None, verify=self._ssl_context
652-
) as client:
653-
async with aconnect_sse(
654-
client, "POST", url, json=payload, headers=headers
655-
) as event_source:
656-
async for sse in event_source.aiter_sse():
657-
if not sse.data:
658-
continue
659-
try:
660-
data = json.loads(sse.data)
661-
except json.JSONDecodeError:
662-
continue
663-
664-
if "errors" in data:
665-
if any(
666-
e.get("message") == "PersistedQueryNotFound"
667-
for e in data["errors"]
668-
):
669-
retry_with_full_query = True
670-
break
671-
LOG.error(
672-
"GraphQL Subscription Errors: %s", data["errors"]
673-
)
674-
continue
675-
676-
if "data" in data:
677-
LOG.debug(
678-
"Received subscription result %s", data["data"]
679-
)
680-
handler_result = handler(data["data"])
681-
if callback:
682-
callback(handler_result)
683-
684-
if retry_with_full_query:
732+
retry_apq, had_success = await self._run_sse_connection(
733+
url, headers, payload, handler, callback
734+
)
735+
if had_success:
736+
backoff_delay = self._BACKOFF_BASE
737+
if retry_apq:
685738
payload["query"] = query
686739
continue
740+
# Server closed cleanly — brief pause before reconnecting.
741+
LOG.debug(
742+
"SSE connection closed by server. Reconnecting in 5 seconds..."
743+
)
744+
await asyncio.sleep(5)
687745

688746
except Exception as e:
689747
LOG.debug(
690-
"Subscription connection lost: %s. Reconnecting in 5 seconds...", e
748+
"Subscription connection lost: %s. Reconnecting in %s seconds...",
749+
e,
750+
backoff_delay,
691751
)
692-
await asyncio.sleep(5)
693-
# Reset payload to APQ only on reconnect
752+
await asyncio.sleep(backoff_delay)
753+
backoff_delay = min(backoff_delay * 2, self._BACKOFF_MAX)
694754
if "query" in payload:
695755
del payload["query"]
696-
697756
if location_hilo_id:
698757
try:
699758
await self.call_get_location_query(location_hilo_id)

0 commit comments

Comments
 (0)