Skip to content

Commit ffcb022

Browse files
jkebingerclaude
andauthored
Add SSE watchdog and improve connection error handling (#20)
* Add SSE watchdog for connection health monitoring Adds a watchdog thread that monitors SSE connection health by tracking when data (including keepalives) is received. If no data is received for 120 seconds (configurable), the watchdog: 1. Logs a warning 2. Polls the checkpoint API for fresh config data 3. Closes the SSE client to force reconnection This helps detect and recover from stuck SSE connections that may not trigger normal timeout/error handling (e.g., proxy issues, half-open connections). Additional improvements: - Changed except Exception to except BaseException to catch GeneratorExit and other BaseException subclasses that could silently kill the thread - Added logging when streaming loop exits (with shutdown reason) - Fixed backoff logging to show actual sleep time instead of pre-doubled value - Removed dead code (ConfigSDK.sse_client was never assigned) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com> * Fix 401/403 handling in SSE streaming loop The previous code caught UnauthorizedException which is never raised by raise_for_status(). Instead, HTTPError is raised. This change: - Catches HTTPError and inspects response.status_code for 401/403 - Removes dead UnauthorizedException catch block - Adds specific tests for 401 and 403 responses Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com> * Ensure streaming starts even when checkpoint loading fails If load_checkpoint() fails (no data found or unexpected exception), streaming would never start because finish_init() was never called. This fix starts streaming as a fallback when checkpoint loading fails, but does NOT call finish_init() - this preserves the timeout behavior where get() blocks until timeout if no data is available. - Start streaming when CDN and cache both fail to load - Start streaming when unexpected exception occurs - Do NOT start streaming on UnauthorizedException (handled separately) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com> * Bump version to 1.2.0 and add dev runner script - Bump version to 1.2.0 for SSE watchdog and error handling improvements - Add dev_runner.py for observing SDK behavior during development Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com> * Pin Poetry version to 1.8.5 in CI workflows Poetry 2.x installation was failing in GitHub Actions. Pin to 1.8.5 for stability. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com> --------- Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
1 parent 245b9f2 commit ffcb022

File tree

11 files changed

+801
-56
lines changed

11 files changed

+801
-56
lines changed

.github/workflows/ci.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ jobs:
3434
- name: Install Poetry
3535
uses: snok/install-poetry@v1
3636
with:
37+
version: "1.8.5"
3738
virtualenvs-create: true
3839

3940
- name: Install dependencies

.github/workflows/publish.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ jobs:
4747
if: steps.version-check.outputs.skip == 'false'
4848
uses: snok/install-poetry@v1
4949
with:
50+
version: "1.8.5"
5051
virtualenvs-create: true
5152

5253
- name: Install dependencies

dev_runner.py

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
#!/usr/bin/env python3
2+
"""
3+
Development runner to observe SDK behavior including SSE streaming and watchdog.
4+
5+
Usage:
6+
REFORGE_SDK_KEY=your-key python dev_runner.py
7+
8+
Or set a specific config key to watch:
9+
REFORGE_SDK_KEY=your-key python dev_runner.py my.config.key
10+
"""
11+
12+
import logging
13+
import sys
14+
import time
15+
import os
16+
17+
from sdk_reforge import ReforgeSDK, Options
18+
19+
20+
def setup_logging() -> None:
21+
"""Configure logging to show SDK internals."""
22+
root_logger = logging.getLogger()
23+
handler = logging.StreamHandler(sys.stdout)
24+
handler.setFormatter(
25+
logging.Formatter(
26+
"%(asctime)s [%(levelname)s] %(name)s: %(message)s",
27+
datefmt="%H:%M:%S",
28+
)
29+
)
30+
root_logger.addHandler(handler)
31+
32+
# Set root to DEBUG to see everything
33+
root_logger.setLevel(logging.DEBUG)
34+
35+
# Quiet down noisy libraries
36+
logging.getLogger("urllib3").setLevel(logging.WARNING)
37+
logging.getLogger("requests").setLevel(logging.WARNING)
38+
39+
40+
def main() -> None:
41+
setup_logging()
42+
43+
sdk_key = os.environ.get("REFORGE_SDK_KEY")
44+
if not sdk_key:
45+
print("Error: REFORGE_SDK_KEY environment variable not set")
46+
print("Usage: REFORGE_SDK_KEY=your-key python dev_runner.py [config.key]")
47+
sys.exit(1)
48+
49+
# Optional: config key to watch
50+
watch_key = sys.argv[1] if len(sys.argv) > 1 else None
51+
52+
print(f"Starting SDK with key: {sdk_key[:10]}...")
53+
print(f"Watching config key: {watch_key or '(none)'}")
54+
print("Press Ctrl+C to stop\n")
55+
print("=" * 60)
56+
57+
options = Options(
58+
sdk_key=sdk_key,
59+
connection_timeout_seconds=10,
60+
)
61+
62+
sdk = ReforgeSDK(options)
63+
config_sdk = sdk.config_sdk()
64+
65+
print("=" * 60)
66+
print("SDK initialized, entering main loop...")
67+
print("=" * 60 + "\n")
68+
69+
try:
70+
iteration = 0
71+
while True:
72+
iteration += 1
73+
74+
status_parts = [
75+
f"[{iteration}]",
76+
f"initialized={config_sdk.is_ready()}",
77+
f"hwm={config_sdk.highwater_mark()}",
78+
]
79+
80+
if watch_key:
81+
try:
82+
value = config_sdk.get(watch_key, default="<not found>")
83+
status_parts.append(f"{watch_key}={value!r}")
84+
except Exception as e:
85+
status_parts.append(f"{watch_key}=<error: {e}>")
86+
87+
print(" | ".join(status_parts))
88+
time.sleep(5)
89+
90+
except KeyboardInterrupt:
91+
print("\n\nShutting down...")
92+
93+
sdk.close()
94+
print("Done.")
95+
96+
97+
if __name__ == "__main__":
98+
main()

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[tool.poetry]
22
name = "sdk-reforge"
3-
version = "1.1.1"
3+
version = "1.2.0"
44
description = "Python sdk for Reforge Feature Flags and Config as a Service: https://www.reforge.com"
55
license = "MIT"
66
authors = ["Michael Berkowitz <michael.berkowitz@gmail.com>", "James Kebinger <james.kebinger@reforge.com>"]

sdk_reforge/VERSION

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
1.1.1
1+
1.2.0

sdk_reforge/_sse_connection_manager.py

Lines changed: 82 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,20 @@
11
import base64
22
import time
3-
from typing import Optional, Callable
3+
from typing import Optional, Callable, TYPE_CHECKING
44

55
import sseclient # type: ignore
66
from requests import Response
7+
from requests.exceptions import HTTPError
78

89
from sdk_reforge._internal_logging import InternalLogger
9-
from sdk_reforge._requests import ApiClient, UnauthorizedException
10+
from sdk_reforge._requests import ApiClient
11+
from sdk_reforge._sse_watchdog import WatchdogResponseWrapper
1012
import prefab_pb2 as Prefab
1113
from sdk_reforge.config_sdk_interface import ConfigSDKInterface
1214

15+
if TYPE_CHECKING:
16+
from sdk_reforge._sse_watchdog import SSEWatchdog
17+
1318
SHORT_CONNECTION_THRESHOLD = 2 # seconds
1419
CONSECUTIVE_SHORT_CONNECTION_LIMIT = 2 # times
1520
MIN_BACKOFF_TIME = 1 # seconds
@@ -29,68 +34,98 @@ def __init__(
2934
api_client: ApiClient,
3035
config_client: ConfigSDKInterface,
3136
urls: list[str],
37+
watchdog: Optional["SSEWatchdog"] = None,
3238
):
3339
self.api_client = api_client
3440
self.config_client = config_client
3541
self.sse_client: Optional[sseclient.SSEClient] = None
3642
self.timing = Timing()
3743
self.urls = urls
44+
self.watchdog = watchdog
3845

3946
def streaming_loop(self) -> None:
4047
too_short_connection_count = 0
4148
backoff_time = MIN_BACKOFF_TIME
4249

43-
while self.config_client.continue_connection_processing():
44-
try:
45-
logger.debug("Starting streaming connection")
46-
headers = {
47-
"Last-Event-ID": f"{self.config_client.highwater_mark()}",
48-
"accept": "text/event-stream",
49-
}
50-
response = self.api_client.resilient_request(
51-
"/api/v2/sse/config",
52-
headers=headers,
53-
stream=True,
54-
auth=("authuser", self.config_client.options.api_key),
55-
timeout=(5, 60),
56-
hosts=self.urls,
57-
)
58-
response.raise_for_status()
59-
if response.ok:
60-
elapsed_time = self.timing.time_execution(
61-
lambda: self.process_response(response)
50+
try:
51+
while self.config_client.continue_connection_processing():
52+
try:
53+
logger.debug("Starting streaming connection")
54+
headers = {
55+
"Last-Event-ID": f"{self.config_client.highwater_mark()}",
56+
"accept": "text/event-stream",
57+
}
58+
response = self.api_client.resilient_request(
59+
"/api/v2/sse/config",
60+
headers=headers,
61+
stream=True,
62+
auth=("authuser", self.config_client.options.api_key),
63+
timeout=(5, 60),
64+
hosts=self.urls,
6265
)
63-
if elapsed_time < SHORT_CONNECTION_THRESHOLD:
64-
too_short_connection_count += 1
65-
if (
66-
too_short_connection_count
67-
>= CONSECUTIVE_SHORT_CONNECTION_LIMIT
68-
):
69-
raise TooQuickConnectionException()
70-
else:
71-
too_short_connection_count = 0
72-
backoff_time = MIN_BACKOFF_TIME
73-
time.sleep(backoff_time)
74-
except UnauthorizedException:
75-
self.config_client.handle_unauthorized_response()
76-
except TooQuickConnectionException as e:
77-
logger.debug(f"Connection ended quickly: {str(e)}. Will apply backoff.")
78-
backoff_time = min(backoff_time * 2, MAX_BACKOFF_TIME)
79-
time.sleep(backoff_time)
80-
except Exception as e:
81-
if not self.config_client.is_shutting_down():
82-
logger.warning(
83-
f"Streaming connection error: {str(e)}, Will retry in {backoff_time} seconds"
66+
response.raise_for_status()
67+
if response.ok:
68+
elapsed_time = self.timing.time_execution(
69+
lambda: self.process_response(response)
70+
)
71+
if elapsed_time < SHORT_CONNECTION_THRESHOLD:
72+
too_short_connection_count += 1
73+
if (
74+
too_short_connection_count
75+
>= CONSECUTIVE_SHORT_CONNECTION_LIMIT
76+
):
77+
raise TooQuickConnectionException()
78+
else:
79+
too_short_connection_count = 0
80+
backoff_time = MIN_BACKOFF_TIME
81+
time.sleep(backoff_time)
82+
except TooQuickConnectionException as e:
83+
logger.debug(
84+
f"Connection ended quickly: {str(e)}. Will apply backoff."
8485
)
8586
backoff_time = min(backoff_time * 2, MAX_BACKOFF_TIME)
8687
time.sleep(backoff_time)
87-
88-
"""
89-
Hand off a successful response here for processing
90-
"""
88+
except HTTPError as e:
89+
# Check for unauthorized (401/403) responses
90+
if e.response is not None and e.response.status_code in (401, 403):
91+
logger.warning(
92+
f"Received {e.response.status_code} response, stopping SSE"
93+
)
94+
self.config_client.handle_unauthorized_response()
95+
else:
96+
if not self.config_client.is_shutting_down():
97+
backoff_time = min(backoff_time * 2, MAX_BACKOFF_TIME)
98+
logger.warning(
99+
f"Streaming connection error ({type(e).__name__}): {str(e)}, "
100+
f"Will retry in {backoff_time} seconds"
101+
)
102+
time.sleep(backoff_time)
103+
except BaseException as e:
104+
# Re-raise system exceptions that should terminate the thread
105+
if isinstance(e, (KeyboardInterrupt, SystemExit)):
106+
raise
107+
if not self.config_client.is_shutting_down():
108+
backoff_time = min(backoff_time * 2, MAX_BACKOFF_TIME)
109+
logger.warning(
110+
f"Streaming connection error ({type(e).__name__}): {str(e)}, "
111+
f"Will retry in {backoff_time} seconds"
112+
)
113+
time.sleep(backoff_time)
114+
finally:
115+
logger.info(
116+
f"Streaming loop exited "
117+
f"(shutdown={self.config_client.is_shutting_down()})"
118+
)
91119

92120
def process_response(self, response: Response) -> None:
93-
self.sse_client = sseclient.SSEClient(response)
121+
"""Hand off a successful response here for processing."""
122+
# Wrap response to track data received for watchdog
123+
if self.watchdog:
124+
wrapped_response = WatchdogResponseWrapper(response, self.watchdog.touch)
125+
self.sse_client = sseclient.SSEClient(wrapped_response)
126+
else:
127+
self.sse_client = sseclient.SSEClient(response)
128+
94129
if self.sse_client is not None:
95130
for event in self.sse_client.events():
96131
if self.config_client.is_shutting_down():

0 commit comments

Comments
 (0)