Skip to content

Commit 0640b72

Browse files
committed
fix: honor server-directed FDv1 Fallback Directive in initializer phase
Previously the X-LD-FD-Fallback response header was honored only in the synchronizer phase, and even there a payload arriving alongside the directive was discarded because the streaming processor halted before applying it. Per the updated FDv2 Data System spec, the directive must be honored in both the initializer and synchronizer phases, must take precedence over the section 1.2 failover algorithm, and must let any accompanying payload be applied before the SDK switches terminally to the FDv1 Fallback Synchronizer. Carry the signal explicitly on the public initializer/synchronizer result types -- Basis.fallback_to_fdv1 and Update.fallback_to_fdv1 -- so callers cannot silently drop it. Update.revert_to_fdv1 is renamed to Update.fallback_to_fdv1 to match the spec terminology.
1 parent 1c700b4 commit 0640b72

8 files changed

Lines changed: 475 additions & 65 deletions

File tree

ldclient/impl/datasourcev2/polling.py

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ def sync(self, ss: SelectorStore) -> Generator[Update, None, None]:
139139
yield Update(
140140
state=DataSourceState.OFF,
141141
error=error_info,
142-
revert_to_fdv1=True,
142+
fallback_to_fdv1=True,
143143
environment_id=envid,
144144
)
145145
break
@@ -168,6 +168,17 @@ def sync(self, ss: SelectorStore) -> Generator[Update, None, None]:
168168
message=result.error,
169169
)
170170

171+
# Even a non-HTTP error (e.g. malformed JSON) can carry the fallback
172+
# header. If so, halt rather than retrying the FDv2 endpoint.
173+
if fallback:
174+
yield Update(
175+
state=DataSourceState.OFF,
176+
error=error_info,
177+
fallback_to_fdv1=True,
178+
environment_id=envid,
179+
)
180+
break
181+
171182
yield Update(
172183
state=DataSourceState.INTERRUPTED,
173184
error=error_info,
@@ -179,7 +190,7 @@ def sync(self, ss: SelectorStore) -> Generator[Update, None, None]:
179190
state=DataSourceState.VALID,
180191
change_set=change_set,
181192
environment_id=headers.get(_LD_ENVID_HEADER),
182-
revert_to_fdv1=headers.get(_LD_FD_FALLBACK_HEADER) == 'true'
193+
fallback_to_fdv1=headers.get(_LD_FD_FALLBACK_HEADER) == 'true'
183194
)
184195

185196
if self._interrupt_event.wait(self._poll_interval):
@@ -204,13 +215,18 @@ def _poll(self, ss: SelectorStore) -> BasisResult:
204215
if is_http_error_recoverable(status_code):
205216
log.warning(http_error_message_result)
206217

218+
# Forward any response headers so callers (e.g. FDv2 datasystem)
219+
# can read the X-LD-FD-Fallback directive even on error.
207220
return _Fail(
208-
error=http_error_message_result, exception=result.exception
221+
error=http_error_message_result,
222+
exception=result.exception,
223+
headers=result.headers,
209224
)
210225

211226
return _Fail(
212227
error=result.error or "Failed to request payload",
213228
exception=result.exception,
229+
headers=result.headers,
214230
)
215231

216232
(change_set, headers) = result.value
@@ -223,6 +239,7 @@ def _poll(self, ss: SelectorStore) -> BasisResult:
223239
change_set=change_set,
224240
persist=change_set.selector.is_defined(),
225241
environment_id=env_id,
242+
fallback_to_fdv1=headers.get(_LD_FD_FALLBACK_HEADER) == 'true',
226243
)
227244

228245
return _Success(value=basis)

ldclient/impl/datasourcev2/streaming.py

Lines changed: 24 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,11 @@ def sync(self, ss: SelectorStore) -> Generator[Update, None, None]:
166166
self._connection_attempt_start_time = time()
167167

168168
envid = None
169+
# fallback_requested is set when a Start action carries
170+
# X-LD-FD-Fallback: true. We finish applying the current payload
171+
# before halting, so consumers can serve the server-provided data
172+
# while FDv1 takes over.
173+
fallback_requested = False
169174
for action in self._sse.all:
170175
if isinstance(action, Fault):
171176
# If the SSE client detects the stream has closed, then it will
@@ -186,17 +191,9 @@ def sync(self, ss: SelectorStore) -> Generator[Update, None, None]:
186191
continue
187192

188193
if isinstance(action, Start) and action.headers is not None:
189-
fallback = action.headers.get(_LD_FD_FALLBACK_HEADER) == 'true'
190194
envid = action.headers.get(_LD_ENVID_HEADER, envid)
191-
192-
if fallback:
193-
self._record_stream_init(True)
194-
yield Update(
195-
state=DataSourceState.OFF,
196-
revert_to_fdv1=True,
197-
environment_id=envid,
198-
)
199-
break
195+
if action.headers.get(_LD_FD_FALLBACK_HEADER) == 'true':
196+
fallback_requested = True
200197

201198
if not isinstance(action, Event):
202199
continue
@@ -206,6 +203,18 @@ def sync(self, ss: SelectorStore) -> Generator[Update, None, None]:
206203
if update is not None:
207204
self._record_stream_init(False)
208205
self._connection_attempt_start_time = None
206+
if fallback_requested:
207+
# Decorate the completed update with the fallback signal,
208+
# then halt — the consumer will switch to FDv1.
209+
update = Update(
210+
state=update.state,
211+
change_set=update.change_set,
212+
error=update.error,
213+
fallback_to_fdv1=True,
214+
environment_id=update.environment_id,
215+
)
216+
yield update
217+
break
209218
yield update
210219
except json.decoder.JSONDecodeError as e:
211220
log.info(
@@ -229,7 +238,7 @@ def sync(self, ss: SelectorStore) -> Generator[Update, None, None]:
229238
error=DataSourceErrorInfo(
230239
DataSourceErrorKind.UNKNOWN, 0, time(), str(e)
231240
),
232-
revert_to_fdv1=False,
241+
fallback_to_fdv1=False,
233242
environment_id=envid,
234243
)
235244

@@ -353,7 +362,7 @@ def _handle_error(self, error: Exception, envid: Optional[str]) -> Tuple[Optiona
353362
error=DataSourceErrorInfo(
354363
DataSourceErrorKind.INVALID_DATA, 0, time(), str(error)
355364
),
356-
revert_to_fdv1=False,
365+
fallback_to_fdv1=False,
357366
environment_id=envid,
358367
)
359368
return (update, True)
@@ -377,7 +386,7 @@ def _handle_error(self, error: Exception, envid: Optional[str]) -> Tuple[Optiona
377386
update = Update(
378387
state=DataSourceState.OFF,
379388
error=error_info,
380-
revert_to_fdv1=True,
389+
fallback_to_fdv1=True,
381390
environment_id=envid,
382391
)
383392
self.stop()
@@ -394,7 +403,7 @@ def _handle_error(self, error: Exception, envid: Optional[str]) -> Tuple[Optiona
394403
else DataSourceState.OFF
395404
),
396405
error=error_info,
397-
revert_to_fdv1=False,
406+
fallback_to_fdv1=False,
398407
environment_id=envid,
399408
)
400409

@@ -416,7 +425,7 @@ def _handle_error(self, error: Exception, envid: Optional[str]) -> Tuple[Optiona
416425
error=DataSourceErrorInfo(
417426
DataSourceErrorKind.UNKNOWN, 0, time(), str(error)
418427
),
419-
revert_to_fdv1=False,
428+
fallback_to_fdv1=False,
420429
environment_id=envid,
421430
)
422431
# no stacktrace here because, for a typical connection error, it'll

ldclient/impl/datasystem/fdv2.py

Lines changed: 72 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,10 @@
1818
from ldclient.impl.listeners import Listeners
1919
from ldclient.impl.repeating_task import RepeatingTask
2020
from ldclient.impl.rwlock import ReadWriteLock
21-
from ldclient.impl.util import _Fail, log
21+
from ldclient.impl.util import _LD_FD_FALLBACK_HEADER, _Fail, log
2222
from ldclient.interfaces import (
2323
DataSourceErrorInfo,
24+
DataSourceErrorKind,
2425
DataSourceState,
2526
DataSourceStatus,
2627
DataSourceStatusProvider,
@@ -276,7 +277,7 @@ class ConditionDirective(str, Enum):
276277

277278
FDV1 = "fdv1"
278279
"""
279-
FDV1 suggests that we should immediately revert to the FDv1 fallback synchronizer.
280+
FDV1 suggests that we should immediately fall back to the FDv1 Fallback Synchronizer.
280281
"""
281282

282283

@@ -403,7 +404,26 @@ def _run_main_loop(self, set_on_ready: Event):
403404
)
404405

405406
# Run initializers first
406-
self._run_initializers(set_on_ready)
407+
fallback_requested = self._run_initializers(set_on_ready)
408+
409+
# If an initializer asked the SDK to fall back to FDv1, halt the
410+
# configured FDv2 chain and switch terminally to the FDv1 Fallback
411+
# Synchronizer (or transition to OFF if none is configured).
412+
if fallback_requested:
413+
if self._fdv1_fallback_synchronizer_builder is not None:
414+
log.warning("Falling back to FDv1 protocol")
415+
self._synchronizers = [self._fdv1_fallback_synchronizer_builder]
416+
else:
417+
log.warning(
418+
"Initializer requested FDv1 fallback but none configured"
419+
)
420+
self._synchronizers = []
421+
self._data_source_status_provider.update_status(
422+
DataSourceState.OFF,
423+
self._data_source_status_provider.status.error,
424+
)
425+
set_on_ready.set()
426+
return
407427

408428
# Run synchronizers
409429
self._run_synchronizers(set_on_ready)
@@ -414,14 +434,22 @@ def _run_main_loop(self, set_on_ready: Event):
414434
if not set_on_ready.is_set():
415435
set_on_ready.set()
416436

417-
def _run_initializers(self, set_on_ready: Event):
418-
"""Run initializers to get initial data."""
437+
def _run_initializers(self, set_on_ready: Event) -> bool:
438+
"""
439+
Run initializers to get initial data.
440+
441+
Returns True when an initializer requested the FDv1 Fallback Directive
442+
(via the X-LD-FD-Fallback response header). When that happens, any
443+
accompanying payload is applied first so evaluations can serve the
444+
server-provided data while the FDv1 synchronizer spins up; the caller
445+
is then responsible for switching to the FDv1 Fallback Synchronizer.
446+
"""
419447
if self._data_system_config.initializers is None:
420-
return
448+
return False
421449

422450
for initializer_builder in self._data_system_config.initializers:
423451
if self._stop_event.is_set():
424-
return
452+
return False
425453

426454
try:
427455
initializer = initializer_builder.build(self._config)
@@ -431,6 +459,25 @@ def _run_initializers(self, set_on_ready: Event):
431459

432460
if isinstance(basis_result, _Fail):
433461
log.warning("Initializer %s failed: %s", initializer.name, basis_result.error)
462+
# An error response can still carry the FDv1 fallback directive.
463+
if basis_result.headers is not None and \
464+
basis_result.headers.get(_LD_FD_FALLBACK_HEADER) == 'true':
465+
log.warning(
466+
"Initializer %s requested fallback to FDv1 protocol",
467+
initializer.name,
468+
)
469+
# Surface the underlying error on the status so
470+
# programmatic monitors can see why FDv2 shut down.
471+
self._data_source_status_provider.update_status(
472+
DataSourceState.INITIALIZING,
473+
DataSourceErrorInfo(
474+
kind=DataSourceErrorKind.UNKNOWN,
475+
status_code=0,
476+
time=time.time(),
477+
message=basis_result.error,
478+
),
479+
)
480+
return True
434481
continue
435482

436483
basis = basis_result.value
@@ -442,9 +489,19 @@ def _run_initializers(self, set_on_ready: Event):
442489
# Set ready event if and only if a selector is defined for the changeset
443490
if basis.change_set.selector.is_defined():
444491
set_on_ready.set()
445-
return
492+
493+
if basis.fallback_to_fdv1:
494+
log.warning(
495+
"Initializer %s requested fallback to FDv1 protocol",
496+
initializer.name,
497+
)
498+
return True
499+
500+
if basis.change_set.selector.is_defined():
501+
return False
446502
except Exception as e:
447503
log.error("Initializer failed with exception: %s", e)
504+
return False
448505

449506
def _run_synchronizers(self, set_on_ready: Event):
450507
"""Run synchronizers to keep data up-to-date."""
@@ -476,12 +533,12 @@ def synchronizer_loop(self: 'FDv2'):
476533

477534
if directive == ConditionDirective.FDV1:
478535
# Abandon all synchronizers and use only fdv1 fallback
479-
log.info("Reverting to FDv1 fallback synchronizer")
536+
log.warning("Falling back to FDv1 protocol")
480537
if self._fdv1_fallback_synchronizer_builder is not None:
481538
synchronizers_list = [self._fdv1_fallback_synchronizer_builder]
482539
current_index = 0
483540
else:
484-
log.warning("No FDv1 fallback synchronizer available")
541+
log.warning("Synchronizer requested FDv1 fallback but none configured")
485542
synchronizers_list = []
486543
self._data_source_status_provider.update_status(
487544
DataSourceState.OFF,
@@ -608,8 +665,11 @@ def reader(self: 'FDv2'):
608665
# Update status
609666
self._data_source_status_provider.update_status(update.state, update.error)
610667

611-
# Check if we should revert to FDv1 immediately
612-
if update.revert_to_fdv1:
668+
# Check if we should fall back to FDv1 immediately. fallback_to_fdv1
669+
# may ride along on a Valid update (payload + directive in the same
670+
# response), in which case the ChangeSet has already been applied
671+
# above before we hand off.
672+
if update.fallback_to_fdv1:
613673
return ConditionDirective.FDV1
614674

615675
# Check for OFF state indicating permanent failure

ldclient/interfaces.py

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1366,6 +1366,13 @@ class Basis:
13661366
change_set: ChangeSet
13671367
persist: bool
13681368
environment_id: Optional[str] = None
1369+
fallback_to_fdv1: bool = False
1370+
"""
1371+
Indicates that the LaunchDarkly server has directed the SDK to fall
1372+
back to the FDv1 Fallback Synchronizer (via the X-LD-FD-Fallback
1373+
response header). When True, callers must apply ``change_set`` first
1374+
and then terminally switch to the FDv1 Fallback Synchronizer.
1375+
"""
13691376

13701377

13711378
class ChangeSetBuilder:
@@ -1647,7 +1654,14 @@ class Update:
16471654
state: DataSourceState
16481655
change_set: Optional[ChangeSet] = None
16491656
error: Optional[DataSourceErrorInfo] = None
1650-
revert_to_fdv1: bool = False
1657+
fallback_to_fdv1: bool = False
1658+
"""
1659+
Indicates that the LaunchDarkly server has directed the SDK to fall
1660+
back to the FDv1 Fallback Synchronizer (via the X-LD-FD-Fallback
1661+
response header). When True, callers must apply any accompanying
1662+
``change_set`` first and then terminally switch to the FDv1 Fallback
1663+
Synchronizer. The flag may ride along on Valid or Off updates.
1664+
"""
16511665
environment_id: Optional[str] = None
16521666

16531667

0 commit comments

Comments
 (0)