Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions airbyte_cdk/sources/streams/concurrent/cursor.py
Original file line number Diff line number Diff line change
Expand Up @@ -233,12 +233,14 @@ def _get_concurrent_state(

value_from_partitioned_state = None
if slices_from_partitioned_state:
# We assume here that the slices have been already merged
first_slice = slices_from_partitioned_state[0]
# We assume here that the slices have been already merged.
# After merging, slices are sorted in ascending order by (START_KEY, END_KEY),
# so the last slice contains the most recent cursor value for client-side filtering.
last_slice = slices_from_partitioned_state[-1]
value_from_partitioned_state = (
first_slice[self._connector_state_converter.MOST_RECENT_RECORD_KEY]
if self._connector_state_converter.MOST_RECENT_RECORD_KEY in first_slice
else first_slice[self._connector_state_converter.END_KEY]
last_slice[self._connector_state_converter.MOST_RECENT_RECORD_KEY]
if self._connector_state_converter.MOST_RECENT_RECORD_KEY in last_slice
else last_slice[self._connector_state_converter.END_KEY]
)
return (
value_from_partitioned_state
Expand Down
27 changes: 20 additions & 7 deletions unit_tests/sources/streams/concurrent/test_cursor.py
Original file line number Diff line number Diff line change
Expand Up @@ -1344,16 +1344,17 @@ def test_given_partitioned_state_with_one_slice_without_most_recent_cursor_value
)


def test_given_partitioned_state_with_multiple_slices_when_should_be_synced_then_use_upper_boundary_of_first_slice_to_filter():
def test_given_partitioned_state_with_multiple_slices_when_should_be_synced_then_use_upper_boundary_of_last_slice_to_filter():
first_slice_end = 5
second_slice_start = first_slice_end + 10
second_slice_end = first_slice_end + 100
cursor = ConcurrentCursor(
_A_STREAM_NAME,
_A_STREAM_NAMESPACE,
{
"slices": [
{"end": first_slice_end, "start": 0},
{"end": first_slice_end + 100, "start": second_slice_start},
{"end": second_slice_end, "start": second_slice_start},
],
"state_type": "date-range",
},
Expand All @@ -1367,23 +1368,35 @@ def test_given_partitioned_state_with_multiple_slices_when_should_be_synced_then
_NO_LOOKBACK_WINDOW,
)

# Records before the last slice's end should NOT be synced (already processed)
assert (
cursor.should_be_synced(
Record(data={_A_CURSOR_FIELD_KEY: first_slice_end - 1}, stream_name="test_stream")
Record(data={_A_CURSOR_FIELD_KEY: first_slice_end}, stream_name="test_stream")
)
== False
)
assert (
cursor.should_be_synced(
Record(data={_A_CURSOR_FIELD_KEY: first_slice_end}, stream_name="test_stream")
Record(data={_A_CURSOR_FIELD_KEY: second_slice_start}, stream_name="test_stream")
)
== False
)
assert (
cursor.should_be_synced(
Record(data={_A_CURSOR_FIELD_KEY: second_slice_end - 1}, stream_name="test_stream")
)
== False
)
# Records at or after the last slice's end should be synced
assert (
cursor.should_be_synced(
Record(data={_A_CURSOR_FIELD_KEY: second_slice_end}, stream_name="test_stream")
)
== True
)
# even if this is within a boundary that has already been synced, we don't take any chances and we
# sync it anyway. In most cases it shouldn't be pulled at all, because we query the API for specific slice boundaries
assert (
cursor.should_be_synced(
Record(data={_A_CURSOR_FIELD_KEY: second_slice_start}, stream_name="test_stream")
Record(data={_A_CURSOR_FIELD_KEY: second_slice_end + 1}, stream_name="test_stream")
)
== True
)
Loading