@devin-ai-integration
Contributor

@devin-ai-integration devin-ai-integration bot commented Dec 2, 2025

feat(cdk): Add RecordExpander component for nested array extraction

Summary

This PR adds a new RecordExpander component to the CDK that extracts items from nested array fields and emits each item as a separate record. It is needed to fix the Stripe invoice_line_items stream: the events endpoint returns invoice objects with nested lines.data arrays, but the stream should emit each line item as its own record.

Key changes:

  • New RecordExpander class in airbyte_cdk/sources/declarative/expanders/
  • Integration with DpathExtractor via optional record_expander parameter
  • Support for wildcard paths (e.g., ["sections", "*", "items"])
  • Optional remain_original_record flag to embed parent record context
  • Schema updates and auto-generated models
  • 13 new unit tests covering expansion scenarios (29 total tests in file)

Example usage:

extractor:
  type: DpathExtractor
  field_path: ["data", "object"]
  record_expander:
    type: RecordExpander
    expand_records_from_field: ["lines", "data"]
    remain_original_record: true
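
As a rough illustration of what this configuration is meant to do, the sketch below expands a Stripe-like invoice in plain Python. It is not the CDK implementation: `expand_record` and the `original_record` key are hypothetical names chosen for this example, since the PR text does not say under which key the parent record is embedded. Skipping non-mapping items is also an assumption.

```python
from typing import Any, Dict, Iterator, List


def expand_record(
    record: Dict[str, Any],
    expand_path: List[str],
    remain_original_record: bool = False,
    parent_key: str = "original_record",  # hypothetical key name, not confirmed by the PR
) -> Iterator[Dict[str, Any]]:
    """Yield one record per item of the nested list found at expand_path."""
    node: Any = record
    for key in expand_path:
        if not isinstance(node, dict) or key not in node:
            return  # path is missing: nothing to expand
        node = node[key]
    if not isinstance(node, list):
        return  # the path does not point at a list
    for item in node:
        if not isinstance(item, dict):
            continue  # assumption: only mapping items become records
        expanded = dict(item)  # shallow copy so the source item is not mutated
        if remain_original_record:
            expanded[parent_key] = record
        yield expanded


# A record as it might look after field_path ["data", "object"] has been applied.
invoice = {"id": "in_1", "lines": {"data": [{"id": "il_1"}, {"id": "il_2"}]}}
line_items = list(expand_record(invoice, ["lines", "data"], remain_original_record=True))
```

Each element of `line_items` is one line item carrying a reference to the invoice it came from, which is the shape the invoice_line_items stream needs.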

Review & Testing Checklist for Human

This is a YELLOW risk PR (medium confidence). Please verify:

  • Type safety with Any annotation: The code annotates extracted as Any on lines 73 and 86 of record_expander.py to work around MyPy's handling of the dpath library, whose functions are typed as returning object. Review whether this is acceptable or whether we need a different approach (e.g., type: ignore comments).

  • Variable naming clarity: The code uses parent_record = record to preserve the original record, then reuses record as a loop variable in the wildcard branch (line 74). While this works correctly, verify this naming pattern is clear enough or if we should use different variable names.

  • Wildcard path behavior: Test with real nested data structures to ensure wildcard matching works correctly. The logic iterates extracted values and only processes items that are lists (line 75: if isinstance(record, list)). Verify this handles all edge cases properly.

  • Backward compatibility: Verify existing connectors without record_expander still work correctly (the parameter is optional and defaults to None).

  • End-to-end testing: This PR only includes unit tests. The real-world behavior needs to be verified with the Stripe connector in the companion PR (fix(source-stripe): Fix invoice_line_items incremental stream to emit line items instead of invoices (do not merge) airbyte#70294).

Recommended test plan:

  1. Run existing CDK tests to ensure no regressions: poetry run pytest unit_tests/sources/declarative/extractors/test_dpath_extractor.py
  2. Test with Stripe connector (separate PR) to verify end-to-end behavior
  3. Try edge cases: empty arrays, missing paths, non-list values, wildcard with no matches, deeply nested paths

Notes

…thExtractor

- Add optional expand_records_from_field parameter to extract items from nested arrays
- Add optional remain_original_record parameter to preserve parent record context
- Implement _expand_record method to handle array expansion logic
- Add comprehensive unit tests covering all edge cases
- Maintain backward compatibility with existing functionality

Co-Authored-By: unknown <>
@devin-ai-integration
Contributor Author

Original prompt from API User
Comment from @DanyloGL: /ai-triage

IMPORTANT: The user will expect a response posted back to the PR. You should post exactly one comment back to the respective issue PR. If the user requested a code change or PR, your comment should contain a link to the PR. Assume the user has no access to your session or conversation thread unless/until you respond back to them.

Issue #8683 by @jnr0790: Python L3: Stripe - Missing data in `invoice_line_items` stream

Issue URL: https://github.com/airbytehq/oncall/issues/8683

Please use playbook macro: !issue_triage

PLAYBOOK_md:
# `/ai-triage` Slash Command Playbook

You are AI Triage Devin, an expert at analyzing Airbyte-related issues and providing actionable insights. You are responding to a GitHub slash command request. After reading the provided context, you should post a comment to confirm you understand the request and stating what your next steps will be, along with a link to your session. Once your triage and analysis is complete, update your comment with the full results of your triage. Collapse all of your comments under expandable sections.

IMPORTANT: Expect that your user has no access to the session and cannot talk with you directly. Do not wait for feedback or confirmation on any action.

## Context

You are analyzing the issue provided to you above. You will need to pull comment history on this issue to ensure you have full context.

## Your Task: Static Analysis and Triage

1. **Issue Analysis and Confirmation**: Read the complete issue content including all comments for full context.
   - **Post an initial comment immediately** (within 1-2 minutes) to confirm you understand the assignment and that you are looking into it. Include your session URL.
   - If you are missing any critical information or context (e.g., workspace UUID, connector version, error logs, reproduction steps, customer environment details), include in your initial comment a request for additional context. (Do not block waiting for a... (9078 chars truncated...)

@devin-ai-integration
Contributor Author

🤖 Devin AI Engineer

I'll be helping with this pull request! Here's what you should know:

✅ I will automatically:

  • Address comments on this PR. Add '(aside)' to your comment to have me ignore it.
  • Look at CI failures and help fix them

Note: I can only respond to comments from users who have write access to this repository.

⚙️ Control Options:

  • Disable automatic comment and CI monitoring

@github-actions

github-actions bot commented Dec 2, 2025

👋 Greetings, Airbyte Team Member!

Here are some helpful tips and reminders for your convenience.

Testing This CDK Version

You can test this version of the CDK using the following:

# Run the CLI from this branch:
uvx 'git+https://github.com/airbytehq/airbyte-python-cdk.git@devin/1764690419-dpath-extractor-expansion#egg=airbyte-python-cdk[dev]' --help

# Update a connector to use the CDK from this branch ref:
cd airbyte-integrations/connectors/source-example
poe use-cdk-branch devin/1764690419-dpath-extractor-expansion

Helpful Resources

PR Slash Commands

Airbyte Maintainers can execute the following slash commands on your PR:

  • /autofix - Fixes most formatting and linting issues
  • /poetry-lock - Updates poetry.lock file
  • /test - Runs connector tests with the updated CDK
  • /prerelease - Triggers a prerelease publish with default arguments
  • /poe build - Regenerate git-committed build artifacts, such as the pydantic models which are generated from the manifest JSON schema in YAML.
  • /poe <command> - Runs any poe command in the CDK environment


@github-actions github-actions bot added the enhancement New feature or request label Dec 2, 2025
@github-actions

github-actions bot commented Dec 2, 2025

PyTest Results (Fast)

Tests: 3 826 total (+13), 3 814 passed ✅ (+13), 12 skipped 💤 (±0), 0 failed ❌ (±0)
Suites: 1 (±0)
Files: 1 (±0)
Duration: 6m 14s ⏱️ (-17s)

Results for commit 5b0c0d5. ± Comparison against base commit 80b7668.

♻️ This comment has been updated with latest results.

@github-actions

github-actions bot commented Dec 2, 2025

PyTest Results (Full)

Tests: 3 829 total (+13), 3 817 passed ✅ (+13), 12 skipped 💤 (±0), 0 failed ❌ (±0)
Suites: 1 (±0)
Files: 1 (±0)
Duration: 10m 54s ⏱️ (-2s)

Results for commit 5b0c0d5. ± Comparison against base commit 80b7668.

♻️ This comment has been updated with latest results.

- Create new RecordExpander class in airbyte_cdk/sources/declarative/expanders/
- Move expand_records_from_field and remain_original_record parameters from DpathExtractor to RecordExpander
- Update DpathExtractor to accept optional record_expander attribute
- Register RecordExpander in manifest component transformer
- Update unit tests to use new RecordExpander class structure
- All 24 tests passing, MyPy and Ruff checks passing

This refactoring improves separation of concerns by isolating record expansion logic into a dedicated component.

Co-Authored-By: unknown <>
- Add RecordExpander definition to declarative_component_schema.yaml
- Add record_expander property to DpathExtractor schema
- Update create_dpath_extractor in model_to_component_factory.py to handle record_expander
- Auto-generate models from schema using poetry run poe build
- All 24 tests passing

This completes the schema registration for RecordExpander component, enabling
YAML manifests to properly instantiate RecordExpander when used with DpathExtractor.

Co-Authored-By: unknown <>
@devin-ai-integration devin-ai-integration bot changed the title feat: Add expand_records_from_field and remain_original_record to DpathExtractor (do not merge) feat: Add RecordExpander component for nested array extraction Dec 2, 2025
Apply cleaner logic using 'yield from' consistently:
- When extracted is a list without record_expander, use 'yield from extracted'
- Check 'if not self.record_expander' instead of nested if/else
- Remove unnecessary 'yield from []' for empty case

All 24 tests passing. Suggested by @DanyloGL.

Co-Authored-By: unknown <>
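
The control flow described in this commit could look roughly like the sketch below. It is an assumption about the shape of the logic, not a copy of DpathExtractor: `emit_records` is a hypothetical free function, and the expander is modelled as a plain callable rather than a RecordExpander instance.

```python
from typing import Any, Callable, Iterable, Iterator, Optional

# Hypothetical stand-in for a RecordExpander: takes one record, returns expanded records.
Expander = Callable[[Any], Iterable[Any]]


def emit_records(extracted: Any, record_expander: Optional[Expander] = None) -> Iterator[Any]:
    """Emit extracted records, optionally passing each one through an expander."""
    # Normalise to a list: a list is used as-is, a single truthy value is wrapped,
    # and an empty or missing value produces no records.
    if isinstance(extracted, list):
        records = extracted
    elif extracted:
        records = [extracted]
    else:
        records = []

    if not record_expander:
        # No expander configured: behave exactly as before.
        yield from records
        return

    for record in records:
        yield from record_expander(record)
```

Because `record_expander` defaults to `None`, callers that never pass it take the first branch and see unchanged output, which is the backward-compatibility property the PR description asks reviewers to verify.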
Changes:
- Add back 'else: yield from []' in DpathExtractor for explicit empty case
- Update RecordExpander to return nothing when expand_records_from_field path doesn't exist or isn't a list
- Update unit tests to expect no records instead of original record when expansion fails

This makes RecordExpander stricter: it only emits records when successfully expanding a list.
For Stripe invoice_line_items, this ensures we only emit line items, not invoice objects.

All 24 tests passing. Requested by @DanyloGL.

Co-Authored-By: unknown <>
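
A minimal sketch of the stricter behaviour, under the assumption that a missing path surfaces as a KeyError. `get_path` is a hypothetical stand-in for the path lookup and is written in plain Python so the example runs without dpath; `expand_strict` is likewise an illustrative name.

```python
from typing import Any, Dict, Iterator, List


def get_path(record: Dict[str, Any], path: List[str]) -> Any:
    """Stand-in path lookup: raises KeyError when any segment is missing."""
    node: Any = record
    for key in path:
        if not isinstance(node, dict) or key not in node:
            raise KeyError("/".join(path))
        node = node[key]
    return node


def expand_strict(record: Dict[str, Any], path: List[str]) -> Iterator[Any]:
    """Emit the items of the list at path, or nothing if expansion is not possible."""
    try:
        nested = get_path(record, path)
    except KeyError:
        return  # missing path: emit nothing, not the original record
    if not isinstance(nested, list):
        return  # non-list value: emit nothing
    yield from nested  # an empty list naturally yields no records
```

With this shape, an invoice without line items contributes no records at all, so the stream never mixes invoice objects with line items.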
Changes:
1. Remove TypeError from exception handler (only catch KeyError per dpath.get docs)
2. Add wildcard (*) support to RecordExpander for matching multiple arrays
3. Update docstring and schema to document wildcard support
4. Add 5 new unit tests for wildcard expansion scenarios
5. Regenerate models from updated schema

When wildcards are used, RecordExpander:
- Uses dpath.values() to find all matches
- Filters for list-valued matches only
- Expands items from all matched lists
- Returns nothing if no list matches found

All 29 tests passing. Requested by @DanyloGL.

Co-Authored-By: unknown <>
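
The four wildcard steps above can be sketched as follows. To keep the example self-contained, `match_values` is a hypothetical pure-Python stand-in for the glob lookup that the commit performs with dpath.values(); it only supports literal keys and a bare "*" segment, which is an assumption made for illustration.

```python
from typing import Any, Iterator, List


def match_values(node: Any, path: List[str]) -> Iterator[Any]:
    """Yield every value reachable via path; "*" matches any dict key or list index."""
    if not path:
        yield node
        return
    head, rest = path[0], path[1:]
    if head == "*":
        if isinstance(node, dict):
            children = list(node.values())
        elif isinstance(node, list):
            children = node
        else:
            children = []
        for child in children:
            yield from match_values(child, rest)
    elif isinstance(node, dict) and head in node:
        yield from match_values(node[head], rest)


def expand_wildcard(record: Any, path: List[str]) -> Iterator[Any]:
    """Expand items from every list-valued match; non-list matches are ignored."""
    for match in match_values(record, path):
        if isinstance(match, list):
            yield from match


page = {"sections": [{"items": [1, 2]}, {"items": "not a list"}, {"items": [3]}]}
items = list(expand_wildcard(page, ["sections", "*", "items"]))
```

Here the second section matches the path but holds a string, so it is skipped, and the items of the first and third sections are emitted in order.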
MyPy was complaining that dpath.values() and dpath.get() return 'object' type.
Added cast(Iterable[Any], ...) for dpath.values() and cast(Any, ...) for dpath.get()
to satisfy MyPy type checking while maintaining runtime behavior.

All 29 tests passing. MyPy check now passes.

Co-Authored-By: unknown <>
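
For readers unfamiliar with the pattern, here is a small sketch of how `typing.cast` quiets a type checker without changing runtime behaviour. `untyped_lookup` is a hypothetical function that plays the role of a library call annotated as returning `object`; it is not part of dpath.

```python
from typing import Any, Dict, Iterable, List, cast


def untyped_lookup(data: Dict[str, Any], key: str) -> object:
    """Stand-in for a library call whose return annotation is `object`."""
    return data.get(key)


def items_of(data: Dict[str, Any], key: str) -> List[Any]:
    # Without the cast, a type checker rejects iterating over `object`.
    # cast() performs no conversion or validation at runtime; it returns its argument as-is.
    values = cast(Iterable[Any], untyped_lookup(data, key))
    return list(values)


# cast() never checks the value, so a wrong claim goes unnoticed until the value is used.
still_a_string = cast(int, "not an int")
```

The trade-off is that the cast is a promise to the checker rather than a guarantee, which is why the code still needs runtime checks such as isinstance before treating a match as a list.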
Unified the wildcard and non-wildcard branches by collecting all arrays
to process into a single list, then using one common loop for expansion.
This eliminates the duplicated item iteration and record expansion logic.

All 29 tests passing. MyPy check passes.

Co-Authored-By: unknown <>
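
One way to picture the unification, assuming the lookup has already happened: a wildcard lookup produces many matches, a plain lookup produces one value, and both are normalised into a list of arrays before a single expansion loop runs. `expand_unified` and its `has_wildcard` flag are hypothetical names for this sketch.

```python
from typing import Any, Iterator, List


def expand_unified(extracted: Any, has_wildcard: bool) -> Iterator[Any]:
    """Expand items from one value (plain path) or many matches (wildcard path)."""
    # Step 1: normalise both cases into a list of candidate values.
    candidates: List[Any] = list(extracted) if has_wildcard else [extracted]

    # Step 2: keep only the candidates that are actually lists.
    arrays = [candidate for candidate in candidates if isinstance(candidate, list)]

    # Step 3: one shared loop performs the expansion for both cases.
    for array in arrays:
        yield from array
```

The branch-specific code shrinks to the single line that builds `candidates`, so any later change to how items are emitted only has to be made in one loop.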
@devin-ai-integration devin-ai-integration bot changed the title feat: Add RecordExpander component for nested array extraction feat(cdk): Add RecordExpander component for nested array extraction Dec 2, 2025
Changes per Daryna's feedback:
1. Removed isinstance(m, list) filter - now checking inside loop
2. Renamed 'matches' to 'extracted'
3. Removed type casts - using 'extracted: Any' instead
4. Renamed 'nested_array' to 'record' (loop var), using 'parent_record' for original
5. Removed 'if not nested_array:' check (redundant with for loop)

All 29 tests passing. MyPy check passes.

Co-Authored-By: unknown <>
