From 41f10f16ca612bb3f61fa3976f636f7cd4dc9c9f Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Wed, 3 Dec 2025 23:17:59 +0000 Subject: [PATCH 1/2] fix(concurrent): use last slice for client-side incremental filtering The _get_concurrent_state() method was using slices[0] (the earliest slice) instead of slices[-1] (the most recent slice) when extracting the cursor value from partitioned state. Since merge_intervals() sorts slices in ascending order by (START_KEY, END_KEY), using the first slice caused should_be_synced() to use the earliest timestamp as the lower bound, admitting ALL records instead of only new records. This bug affected all low-code connectors using DatetimeBasedCursor with is_client_side_incremental: true, including Notion's pages, databases, comments, and blocks streams. Fixes: airbytehq/oncall#8854 Co-Authored-By: unknown <> --- airbyte_cdk/sources/streams/concurrent/cursor.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/airbyte_cdk/sources/streams/concurrent/cursor.py b/airbyte_cdk/sources/streams/concurrent/cursor.py index e3a487183..e76799af8 100644 --- a/airbyte_cdk/sources/streams/concurrent/cursor.py +++ b/airbyte_cdk/sources/streams/concurrent/cursor.py @@ -233,12 +233,14 @@ def _get_concurrent_state( value_from_partitioned_state = None if slices_from_partitioned_state: - # We assume here that the slices have been already merged - first_slice = slices_from_partitioned_state[0] + # We assume here that the slices have been already merged. + # After merging, slices are sorted in ascending order by (START_KEY, END_KEY), + # so the last slice contains the most recent cursor value for client-side filtering. + last_slice = slices_from_partitioned_state[-1] value_from_partitioned_state = ( - first_slice[self._connector_state_converter.MOST_RECENT_RECORD_KEY] - if self._connector_state_converter.MOST_RECENT_RECORD_KEY in first_slice - else first_slice[self._connector_state_converter.END_KEY] + last_slice[self._connector_state_converter.MOST_RECENT_RECORD_KEY] + if self._connector_state_converter.MOST_RECENT_RECORD_KEY in last_slice + else last_slice[self._connector_state_converter.END_KEY] ) return ( value_from_partitioned_state From ba87ec2c6d21f3b24c41eb47aa274c5bb4a76307 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Wed, 3 Dec 2025 23:20:37 +0000 Subject: [PATCH 2/2] test: update test to verify last slice is used for client-side incremental filtering Updated test_given_partitioned_state_with_multiple_slices_when_should_be_synced to verify that the LAST slice's end value is used for filtering, not the first. This reflects the correct behavior for client-side incremental filtering. Co-Authored-By: unknown <> --- .../sources/streams/concurrent/test_cursor.py | 27 ++++++++++++++----- 1 file changed, 20 insertions(+), 7 deletions(-) diff --git a/unit_tests/sources/streams/concurrent/test_cursor.py b/unit_tests/sources/streams/concurrent/test_cursor.py index 34c92800d..372724a81 100644 --- a/unit_tests/sources/streams/concurrent/test_cursor.py +++ b/unit_tests/sources/streams/concurrent/test_cursor.py @@ -1344,16 +1344,17 @@ def test_given_partitioned_state_with_one_slice_without_most_recent_cursor_value ) -def test_given_partitioned_state_with_multiple_slices_when_should_be_synced_then_use_upper_boundary_of_first_slice_to_filter(): +def test_given_partitioned_state_with_multiple_slices_when_should_be_synced_then_use_upper_boundary_of_last_slice_to_filter(): first_slice_end = 5 second_slice_start = first_slice_end + 10 + second_slice_end = first_slice_end + 100 cursor = ConcurrentCursor( _A_STREAM_NAME, _A_STREAM_NAMESPACE, { "slices": [ {"end": first_slice_end, "start": 0}, - {"end": first_slice_end + 100, "start": second_slice_start}, + {"end": second_slice_end, "start": second_slice_start}, ], "state_type": "date-range", }, @@ -1367,23 +1368,35 @@ def test_given_partitioned_state_with_multiple_slices_when_should_be_synced_then _NO_LOOKBACK_WINDOW, ) + # Records before the last slice's end should NOT be synced (already processed) assert ( cursor.should_be_synced( - Record(data={_A_CURSOR_FIELD_KEY: first_slice_end - 1}, stream_name="test_stream") + Record(data={_A_CURSOR_FIELD_KEY: first_slice_end}, stream_name="test_stream") ) == False ) assert ( cursor.should_be_synced( - Record(data={_A_CURSOR_FIELD_KEY: first_slice_end}, stream_name="test_stream") + Record(data={_A_CURSOR_FIELD_KEY: second_slice_start}, stream_name="test_stream") + ) + == False + ) + assert ( + cursor.should_be_synced( + Record(data={_A_CURSOR_FIELD_KEY: second_slice_end - 1}, stream_name="test_stream") + ) + == False + ) + # Records at or after the last slice's end should be synced + assert ( + cursor.should_be_synced( + Record(data={_A_CURSOR_FIELD_KEY: second_slice_end}, stream_name="test_stream") ) == True ) - # even if this is within a boundary that has been synced, we don't take any chance and we sync it - # anyway in most cases, it shouldn't be pulled because we query for specific slice boundaries to the API assert ( cursor.should_be_synced( - Record(data={_A_CURSOR_FIELD_KEY: second_slice_start}, stream_name="test_stream") + Record(data={_A_CURSOR_FIELD_KEY: second_slice_end + 1}, stream_name="test_stream") ) == True )