diff --git a/airbyte_cdk/sources/streams/concurrent/cursor.py b/airbyte_cdk/sources/streams/concurrent/cursor.py
index e3a487183..e76799af8 100644
--- a/airbyte_cdk/sources/streams/concurrent/cursor.py
+++ b/airbyte_cdk/sources/streams/concurrent/cursor.py
@@ -233,12 +233,14 @@ def _get_concurrent_state(
 
         value_from_partitioned_state = None
         if slices_from_partitioned_state:
-            # We assume here that the slices have been already merged
-            first_slice = slices_from_partitioned_state[0]
+            # We assume here that the slices have been already merged.
+            # After merging, slices are sorted in ascending order by (START_KEY, END_KEY),
+            # so the last slice contains the most recent cursor value for client-side filtering.
+            last_slice = slices_from_partitioned_state[-1]
             value_from_partitioned_state = (
-                first_slice[self._connector_state_converter.MOST_RECENT_RECORD_KEY]
-                if self._connector_state_converter.MOST_RECENT_RECORD_KEY in first_slice
-                else first_slice[self._connector_state_converter.END_KEY]
+                last_slice[self._connector_state_converter.MOST_RECENT_RECORD_KEY]
+                if self._connector_state_converter.MOST_RECENT_RECORD_KEY in last_slice
+                else last_slice[self._connector_state_converter.END_KEY]
             )
         return (
             value_from_partitioned_state
diff --git a/unit_tests/sources/streams/concurrent/test_cursor.py b/unit_tests/sources/streams/concurrent/test_cursor.py
index 34c92800d..372724a81 100644
--- a/unit_tests/sources/streams/concurrent/test_cursor.py
+++ b/unit_tests/sources/streams/concurrent/test_cursor.py
@@ -1344,16 +1344,17 @@ def test_given_partitioned_state_with_one_slice_without_most_recent_cursor_value
     )
 
 
-def test_given_partitioned_state_with_multiple_slices_when_should_be_synced_then_use_upper_boundary_of_first_slice_to_filter():
+def test_given_partitioned_state_with_multiple_slices_when_should_be_synced_then_use_upper_boundary_of_last_slice_to_filter():
     first_slice_end = 5
     second_slice_start = first_slice_end + 10
+    second_slice_end = first_slice_end + 100
     cursor = ConcurrentCursor(
         _A_STREAM_NAME,
         _A_STREAM_NAMESPACE,
         {
             "slices": [
                 {"end": first_slice_end, "start": 0},
-                {"end": first_slice_end + 100, "start": second_slice_start},
+                {"end": second_slice_end, "start": second_slice_start},
             ],
             "state_type": "date-range",
         },
@@ -1367,23 +1368,35 @@ def test_given_partitioned_state_with_multiple_slices_when_should_be_synced_then
         _NO_LOOKBACK_WINDOW,
     )
 
+    # Records before the last slice's end should NOT be synced (already processed)
     assert (
         cursor.should_be_synced(
-            Record(data={_A_CURSOR_FIELD_KEY: first_slice_end - 1}, stream_name="test_stream")
+            Record(data={_A_CURSOR_FIELD_KEY: first_slice_end}, stream_name="test_stream")
         )
         == False
     )
     assert (
         cursor.should_be_synced(
-            Record(data={_A_CURSOR_FIELD_KEY: first_slice_end}, stream_name="test_stream")
+            Record(data={_A_CURSOR_FIELD_KEY: second_slice_start}, stream_name="test_stream")
+        )
+        == False
+    )
+    assert (
+        cursor.should_be_synced(
+            Record(data={_A_CURSOR_FIELD_KEY: second_slice_end - 1}, stream_name="test_stream")
+        )
+        == False
+    )
+    # Records at or after the last slice's end should be synced
+    assert (
+        cursor.should_be_synced(
+            Record(data={_A_CURSOR_FIELD_KEY: second_slice_end}, stream_name="test_stream")
         )
         == True
     )
-    # even if this is within a boundary that has been synced, we don't take any chance and we sync it
-    # anyway in most cases, it shouldn't be pulled because we query for specific slice boundaries to the API
     assert (
         cursor.should_be_synced(
-            Record(data={_A_CURSOR_FIELD_KEY: second_slice_start}, stream_name="test_stream")
+            Record(data={_A_CURSOR_FIELD_KEY: second_slice_end + 1}, stream_name="test_stream")
         )
         == True
     )