-
Notifications
You must be signed in to change notification settings - Fork 31
fix(concurrent): use last slice for client-side incremental filtering #861
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
fix(concurrent): use last slice for client-side incremental filtering #861
Conversation
The _get_concurrent_state() method was using slices[0] (the earliest slice) instead of slices[-1] (the most recent slice) when extracting the cursor value from partitioned state. Since merge_intervals() sorts slices in ascending order by (START_KEY, END_KEY), using the first slice caused should_be_synced() to use the earliest timestamp as the lower bound, admitting ALL records instead of only new records. This bug affected all low-code connectors using DatetimeBasedCursor with is_client_side_incremental: true, including Notion's pages, databases, comments, and blocks streams. Fixes: airbytehq/oncall#8854 Co-Authored-By: unknown <>
Original prompt from API User |
🤖 Devin AI EngineerI'll be helping with this pull request! Here's what you should know: ✅ I will automatically:
Note: I can only respond to comments from users who have write access to this repository. ⚙️ Control Options:
|
👋 Greetings, Airbyte Team Member!Here are some helpful tips and reminders for your convenience. Testing This CDK VersionYou can test this version of the CDK using the following: # Run the CLI from this branch:
uvx 'git+https://github.com/airbytehq/airbyte-python-cdk.git@devin/1764803847-fix-client-side-incremental-state#egg=airbyte-python-cdk[dev]' --help
# Update a connector to use the CDK from this branch ref:
cd airbyte-integrations/connectors/source-example
poe use-cdk-branch devin/1764803847-fix-client-side-incremental-stateHelpful ResourcesPR Slash CommandsAirbyte Maintainers can execute the following slash commands on your PR:
|
…ental filtering Updated test_given_partitioned_state_with_multiple_slices_when_should_be_synced to verify that the LAST slice's end value is used for filtering, not the first. This reflects the correct behavior for client-side incremental filtering. Co-Authored-By: unknown <>
PyTest Results (Fast)3 818 tests ±0 3 806 ✅ ±0 6m 30s ⏱️ +15s Results for commit ba87ec2. ± Comparison against base commit daf7d48. This pull request removes 1 and adds 1 tests. Note that renamed tests count towards both. |
PyTest Results (Full)3 821 tests ±0 3 808 ✅ - 1 10m 59s ⏱️ +6s For more details on these failures, see this check. Results for commit ba87ec2. ± Comparison against base commit daf7d48. This pull request removes 1 and adds 1 tests. Note that renamed tests count towards both. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
This PR fixes a critical bug in the concurrent cursor's client-side incremental filtering logic. The bug caused incremental syncs to behave like full refreshes by using the earliest slice instead of the most recent slice when determining which records to sync.
Key Changes:
- Fixed
ConcurrentCursor._get_concurrent_state()to useslices[-1](most recent) instead ofslices[0](earliest) for extracting cursor values - Updated test to correctly validate that only records at or after the last slice's end are synced
- Added clarifying comments about slice ordering guarantees from
merge_intervals()
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated no comments.
| File | Description |
|---|---|
| airbyte_cdk/sources/streams/concurrent/cursor.py | Fixed cursor value extraction to use last slice instead of first slice, with added documentation about slice ordering |
| unit_tests/sources/streams/concurrent/test_cursor.py | Updated test name, logic, and assertions to correctly validate filtering behavior with multiple slices |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Summary
Fixes a bug where
ConcurrentCursor._get_concurrent_state()was usingslices[0](the earliest slice) instead ofslices[-1](the most recent slice) when extracting the cursor value from partitioned state.Since
merge_intervals()sorts slices in ascending order by(START_KEY, END_KEY), using the first slice causedshould_be_synced()to use the earliest timestamp as the lower bound, admitting ALL records instead of only new records. This made incremental syncs behave like full refreshes.Impact: This bug affected all low-code connectors using
DatetimeBasedCursorwithis_client_side_incremental: true, including Notion'spages,databases,comments, andblocksstreams.Fixes: airbytehq/oncall#8854
Review & Testing Checklist for Human
merge_intervals()always produces slices sorted in ascending order by(START_KEY, END_KEY)- the fix relies on this invariant_get_concurrent_state()return value is also used instream_slices()for gap detection - verify this change doesn't break slice generation logicRecommended test plan:
pagesstream in incremental modeUpdates since last revision
test_given_partitioned_state_with_multiple_slices_when_should_be_synced_then_use_upper_boundary_of_last_slice_to_filterto verify the fixNotes