From d077d99ed026111f9c8782ec706db5f3abc08cd9 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Tue, 2 Dec 2025 15:49:37 +0000 Subject: [PATCH 01/12] feat: Add expand_records_from_field and remain_original_record to DpathExtractor - Add optional expand_records_from_field parameter to extract items from nested arrays - Add optional remain_original_record parameter to preserve parent record context - Implement _expand_record method to handle array expansion logic - Add comprehensive unit tests covering all edge cases - Maintain backward compatibility with existing functionality Co-Authored-By: unknown <> --- .../declarative/extractors/dpath_extractor.py | 67 ++++++++- .../extractors/test_dpath_extractor.py | 134 ++++++++++++++++++ 2 files changed, 198 insertions(+), 3 deletions(-) diff --git a/airbyte_cdk/sources/declarative/extractors/dpath_extractor.py b/airbyte_cdk/sources/declarative/extractors/dpath_extractor.py index 9c97773e3..4c5f32b25 100644 --- a/airbyte_cdk/sources/declarative/extractors/dpath_extractor.py +++ b/airbyte_cdk/sources/declarative/extractors/dpath_extractor.py @@ -3,7 +3,7 @@ # from dataclasses import InitVar, dataclass, field -from typing import Any, Iterable, List, Mapping, MutableMapping, Union +from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Union import dpath import requests @@ -24,6 +24,12 @@ class DpathExtractor(RecordExtractor): If the field path points to an empty object, an empty array is returned. If the field path points to a non-existing path, an empty array is returned. + Optionally, records can be expanded by extracting items from a nested array field. + When expand_records_from_field is configured, each extracted record is expanded by + extracting items from the specified nested array path and emitting each item as a + separate record. If remain_original_record is True, each expanded record will include + the original parent record in an "original_record" field. + Examples of instantiating this transform: ``` extractor: @@ -47,16 +53,32 @@ class DpathExtractor(RecordExtractor): field_path: [] ``` + ``` + extractor: + type: DpathExtractor + field_path: + - "data" + - "object" + expand_records_from_field: + - "lines" + - "data" + remain_original_record: true + ``` + Attributes: field_path (Union[InterpolatedString, str]): Path to the field that should be extracted config (Config): The user-provided configuration as specified by the source's spec decoder (Decoder): The decoder responsible to transfom the response in a Mapping + expand_records_from_field (Optional[List[Union[InterpolatedString, str]]]): Path to a nested array field within each extracted record. If provided, items from this array will be extracted and emitted as separate records. + remain_original_record (bool): If True and expand_records_from_field is set, each expanded record will include the original parent record in an "original_record" field. Defaults to False. """ field_path: List[Union[InterpolatedString, str]] config: Config parameters: InitVar[Mapping[str, Any]] decoder: Decoder = field(default_factory=lambda: JsonDecoder(parameters={})) + expand_records_from_field: Optional[List[Union[InterpolatedString, str]]] = None + remain_original_record: bool = False def __post_init__(self, parameters: Mapping[str, Any]) -> None: self._field_path = [ @@ -67,6 +89,44 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None: self._field_path[path_index] = InterpolatedString.create( self.field_path[path_index], parameters=parameters ) + + if self.expand_records_from_field: + self._expand_path = [ + InterpolatedString.create(path, parameters=parameters) + for path in self.expand_records_from_field + ] + else: + self._expand_path = None + + def _expand_record(self, record: MutableMapping[Any, Any]) -> Iterable[MutableMapping[Any, Any]]: + """Expand a record by extracting items from a nested array field.""" + if not self._expand_path: + yield record + return + + expand_path = [path.eval(self.config) for path in self._expand_path] + + try: + nested_array = dpath.get(record, expand_path) + except (KeyError, TypeError): + yield record + return + + if not isinstance(nested_array, list): + yield record + return + + if len(nested_array) == 0: + return + + for item in nested_array: + if isinstance(item, dict): + expanded_record = dict(item) + if self.remain_original_record: + expanded_record["original_record"] = record + yield expanded_record + else: + yield item def extract_records(self, response: requests.Response) -> Iterable[MutableMapping[Any, Any]]: for body in self.decoder.decode(response): @@ -79,8 +139,9 @@ def extract_records(self, response: requests.Response) -> Iterable[MutableMappin else: extracted = dpath.get(body, path, default=[]) # type: ignore # extracted will be a MutableMapping, given input data structure if isinstance(extracted, list): - yield from extracted + for record in extracted: + yield from self._expand_record(record) elif extracted: - yield extracted + yield from self._expand_record(extracted) else: yield from [] diff --git a/unit_tests/sources/declarative/extractors/test_dpath_extractor.py b/unit_tests/sources/declarative/extractors/test_dpath_extractor.py index 05e586592..4d0160f14 100644 --- a/unit_tests/sources/declarative/extractors/test_dpath_extractor.py +++ b/unit_tests/sources/declarative/extractors/test_dpath_extractor.py @@ -121,3 +121,137 @@ def test_dpath_extractor(field_path: List, decoder: Decoder, body, expected_reco actual_records = list(extractor.extract_records(response)) assert actual_records == expected_records + + +@pytest.mark.parametrize( + "field_path, expand_records_from_field, remain_original_record, body, expected_records", + [ + ( + ["data", "object"], + ["lines", "data"], + False, + { + "data": { + "object": { + "id": "in_123", + "created": 1234567890, + "lines": { + "data": [ + {"id": "il_1", "amount": 100}, + {"id": "il_2", "amount": 200}, + ] + }, + } + } + }, + [ + {"id": "il_1", "amount": 100}, + {"id": "il_2", "amount": 200}, + ], + ), + ( + ["data", "object"], + ["lines", "data"], + True, + { + "data": { + "object": { + "id": "in_123", + "created": 1234567890, + "lines": { + "data": [ + {"id": "il_1", "amount": 100}, + ] + }, + } + } + }, + [ + { + "id": "il_1", + "amount": 100, + "original_record": { + "id": "in_123", + "created": 1234567890, + "lines": {"data": [{"id": "il_1", "amount": 100}]}, + }, + }, + ], + ), + ( + ["data"], + ["items"], + False, + {"data": {"id": "parent_1", "items": []}}, + [], + ), + ( + ["data"], + ["items"], + False, + {"data": {"id": "parent_1"}}, + [{"id": "parent_1"}], + ), + ( + ["data"], + ["items"], + False, + {"data": {"id": "parent_1", "items": "not_an_array"}}, + [{"id": "parent_1", "items": "not_an_array"}], + ), + ( + ["data"], + ["nested", "array"], + False, + {"data": {"id": "parent_1", "nested": {"array": [{"id": "child_1"}, {"id": "child_2"}]}}}, + [{"id": "child_1"}, {"id": "child_2"}], + ), + ( + ["data"], + ["items"], + False, + {"data": {"id": "parent_1", "items": [1, 2, "string", {"id": "dict_item"}]}}, + [1, 2, "string", {"id": "dict_item"}], + ), + ( + [], + ["items"], + False, + [ + {"id": "parent_1", "items": [{"id": "child_1"}]}, + {"id": "parent_2", "items": [{"id": "child_2"}, {"id": "child_3"}]}, + ], + [{"id": "child_1"}, {"id": "child_2"}, {"id": "child_3"}], + ), + ], + ids=[ + "test_expand_nested_array", + "test_expand_with_original_record", + "test_expand_empty_array_yields_nothing", + "test_expand_missing_path_yields_original", + "test_expand_non_array_yields_original", + "test_expand_deeply_nested_path", + "test_expand_mixed_types_in_array", + "test_expand_multiple_parent_records", + ], +) +def test_dpath_extractor_with_expansion( + field_path: List, + expand_records_from_field: List, + remain_original_record: bool, + body, + expected_records: List, +): + extractor = DpathExtractor( + field_path=field_path, + config=config, + decoder=decoder_json, + parameters=parameters, + expand_records_from_field=expand_records_from_field, + remain_original_record=remain_original_record, + ) + + response = create_response(body) + actual_records = list(extractor.extract_records(response)) + + assert actual_records == expected_records From 81d46301f934052b3037845f027bc978f4c38674 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Tue, 2 Dec 2025 15:54:20 +0000 Subject: [PATCH 02/12] style: Fix ruff formatting in dpath_extractor.py Co-Authored-By: unknown <> --- .../declarative/extractors/dpath_extractor.py | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/airbyte_cdk/sources/declarative/extractors/dpath_extractor.py b/airbyte_cdk/sources/declarative/extractors/dpath_extractor.py index 4c5f32b25..e4ad2e640 100644 --- a/airbyte_cdk/sources/declarative/extractors/dpath_extractor.py +++ b/airbyte_cdk/sources/declarative/extractors/dpath_extractor.py @@ -89,36 +89,38 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None: self._field_path[path_index] = InterpolatedString.create( self.field_path[path_index], parameters=parameters ) - + if self.expand_records_from_field: self._expand_path = [ - InterpolatedString.create(path, parameters=parameters) + InterpolatedString.create(path, parameters=parameters) for path in self.expand_records_from_field ] else: self._expand_path = None - def _expand_record(self, record: MutableMapping[Any, Any]) -> Iterable[MutableMapping[Any, Any]]: + def _expand_record( + self, record: MutableMapping[Any, Any] + ) -> Iterable[MutableMapping[Any, Any]]: """Expand a record by extracting items from a nested array field.""" if not self._expand_path: yield record return - + expand_path = [path.eval(self.config) for path in self._expand_path] - + try: nested_array = dpath.get(record, expand_path) except (KeyError, TypeError): yield record return - + if not isinstance(nested_array, list): yield record return - + if len(nested_array) == 0: return - + for item in nested_array: if isinstance(item, dict): expanded_record = dict(item) From c7ac5f2cac489ad0c031f9e70da7bccbfbf89afb Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Tue, 2 Dec 2025 15:55:39 +0000 Subject: [PATCH 03/12] style: Fix ruff formatting in test_dpath_extractor.py Co-Authored-By: unknown <> --- .../sources/declarative/extractors/test_dpath_extractor.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/unit_tests/sources/declarative/extractors/test_dpath_extractor.py b/unit_tests/sources/declarative/extractors/test_dpath_extractor.py index 4d0160f14..de4fd8bf1 100644 --- a/unit_tests/sources/declarative/extractors/test_dpath_extractor.py +++ b/unit_tests/sources/declarative/extractors/test_dpath_extractor.py @@ -203,7 +203,12 @@ def test_dpath_extractor(field_path: List, decoder: Decoder, body, expected_reco ["data"], ["nested", "array"], False, - {"data": {"id": "parent_1", "nested": {"array": [{"id": "child_1"}, {"id": "child_2"}]}}}, + { + "data": { + "id": "parent_1", + "nested": {"array": [{"id": "child_1"}, {"id": "child_2"}]}, + } + }, [{"id": "child_1"}, {"id": "child_2"}], ), ( From 24c8ac96edcfc8d90cec2af08843c6298379b302 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Tue, 2 Dec 2025 15:59:32 +0000 Subject: [PATCH 04/12] fix: Add type annotation for _expand_path to fix MyPy error Co-Authored-By: unknown <> --- airbyte_cdk/sources/declarative/extractors/dpath_extractor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte_cdk/sources/declarative/extractors/dpath_extractor.py b/airbyte_cdk/sources/declarative/extractors/dpath_extractor.py index e4ad2e640..cb3fe455a 100644 --- a/airbyte_cdk/sources/declarative/extractors/dpath_extractor.py +++ b/airbyte_cdk/sources/declarative/extractors/dpath_extractor.py @@ -91,7 +91,7 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None: ) if self.expand_records_from_field: - self._expand_path = [ + self._expand_path: Optional[List[InterpolatedString]] = [ InterpolatedString.create(path, parameters=parameters) for path in self.expand_records_from_field ] From 91690f45c8954b7c10074b1f360b1253735f699b Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Tue, 2 Dec 2025 16:21:55 +0000 Subject: [PATCH 05/12] refactor: Extract record expansion logic into RecordExpander class - Create new RecordExpander class in airbyte_cdk/sources/declarative/expanders/ - Move expand_records_from_field and remain_original_record parameters from DpathExtractor to RecordExpander - Update DpathExtractor to accept optional record_expander attribute - Register RecordExpander in manifest component transformer - Update unit tests to use new RecordExpander class structure - All 24 tests passing, MyPy and Ruff checks passing This refactoring improves separation of concerns by isolating record expansion logic into a dedicated component. Co-Authored-By: unknown <> --- .../sources/declarative/expanders/__init__.py | 7 ++ .../declarative/expanders/record_expander.py | 78 +++++++++++++++++++ .../declarative/extractors/dpath_extractor.py | 76 +++++------------- .../parsers/manifest_component_transformer.py | 1 + .../extractors/test_dpath_extractor.py | 10 ++- 5 files changed, 115 insertions(+), 57 deletions(-) create mode 100644 airbyte_cdk/sources/declarative/expanders/__init__.py create mode 100644 airbyte_cdk/sources/declarative/expanders/record_expander.py diff --git a/airbyte_cdk/sources/declarative/expanders/__init__.py b/airbyte_cdk/sources/declarative/expanders/__init__.py new file mode 100644 index 000000000..c89197fc3 --- /dev/null +++ b/airbyte_cdk/sources/declarative/expanders/__init__.py @@ -0,0 +1,7 @@ +# +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. +# + +from airbyte_cdk.sources.declarative.expanders.record_expander import RecordExpander + +__all__ = ["RecordExpander"] diff --git a/airbyte_cdk/sources/declarative/expanders/record_expander.py b/airbyte_cdk/sources/declarative/expanders/record_expander.py new file mode 100644 index 000000000..d766ad920 --- /dev/null +++ b/airbyte_cdk/sources/declarative/expanders/record_expander.py @@ -0,0 +1,78 @@ +# +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. +# + +from dataclasses import InitVar, dataclass +from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Union + +import dpath + +from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString +from airbyte_cdk.sources.types import Config + + +@dataclass +class RecordExpander: + """ + Expands records by extracting items from a nested array field. + + When configured, this component extracts items from a specified nested array path + within each record and emits each item as a separate record. Optionally, the original + parent record can be embedded in each expanded item for context preservation. + + Examples of instantiating this component: + ``` + record_expander: + type: RecordExpander + expand_records_from_field: + - "lines" + - "data" + remain_original_record: true + ``` + + Attributes: + expand_records_from_field (List[Union[InterpolatedString, str]]): Path to a nested array field within each record. Items from this array will be extracted and emitted as separate records. + remain_original_record (bool): If True, each expanded record will include the original parent record in an "original_record" field. Defaults to False. + config (Config): The user-provided configuration as specified by the source's spec + """ + + expand_records_from_field: List[Union[InterpolatedString, str]] + config: Config + parameters: InitVar[Mapping[str, Any]] + remain_original_record: bool = False + + def __post_init__(self, parameters: Mapping[str, Any]) -> None: + self._expand_path: Optional[List[InterpolatedString]] = [ + InterpolatedString.create(path, parameters=parameters) + for path in self.expand_records_from_field + ] + + def expand_record(self, record: MutableMapping[Any, Any]) -> Iterable[MutableMapping[Any, Any]]: + """Expand a record by extracting items from a nested array field.""" + if not self._expand_path: + yield record + return + + expand_path = [path.eval(self.config) for path in self._expand_path] + + try: + nested_array = dpath.get(record, expand_path) + except (KeyError, TypeError): + yield record + return + + if not isinstance(nested_array, list): + yield record + return + + if len(nested_array) == 0: + return + + for item in nested_array: + if isinstance(item, dict): + expanded_record = dict(item) + if self.remain_original_record: + expanded_record["original_record"] = record + yield expanded_record + else: + yield item diff --git a/airbyte_cdk/sources/declarative/extractors/dpath_extractor.py b/airbyte_cdk/sources/declarative/extractors/dpath_extractor.py index cb3fe455a..ca9ec9b44 100644 --- a/airbyte_cdk/sources/declarative/extractors/dpath_extractor.py +++ b/airbyte_cdk/sources/declarative/extractors/dpath_extractor.py @@ -9,6 +9,7 @@ import requests from airbyte_cdk.sources.declarative.decoders import Decoder, JsonDecoder +from airbyte_cdk.sources.declarative.expanders.record_expander import RecordExpander from airbyte_cdk.sources.declarative.extractors.record_extractor import RecordExtractor from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString from airbyte_cdk.sources.types import Config @@ -24,11 +25,10 @@ class DpathExtractor(RecordExtractor): If the field path points to an empty object, an empty array is returned. If the field path points to a non-existing path, an empty array is returned. - Optionally, records can be expanded by extracting items from a nested array field. - When expand_records_from_field is configured, each extracted record is expanded by - extracting items from the specified nested array path and emitting each item as a - separate record. If remain_original_record is True, each expanded record will include - the original parent record in an "original_record" field. + Optionally, records can be expanded by providing a RecordExpander component. + When record_expander is configured, each extracted record is passed through the + expander which extracts items from nested array fields and emits each item as a + separate record. Examples of instantiating this transform: ``` @@ -59,26 +59,26 @@ class DpathExtractor(RecordExtractor): field_path: - "data" - "object" - expand_records_from_field: - - "lines" - - "data" - remain_original_record: true + record_expander: + type: RecordExpander + expand_records_from_field: + - "lines" + - "data" + remain_original_record: true ``` Attributes: field_path (Union[InterpolatedString, str]): Path to the field that should be extracted config (Config): The user-provided configuration as specified by the source's spec decoder (Decoder): The decoder responsible to transfom the response in a Mapping - expand_records_from_field (Optional[List[Union[InterpolatedString, str]]]): Path to a nested array field within each extracted record. If provided, items from this array will be extracted and emitted as separate records. - remain_original_record (bool): If True and expand_records_from_field is set, each expanded record will include the original parent record in an "original_record" field. Defaults to False. + record_expander (Optional[RecordExpander]): Optional component to expand records by extracting items from nested array fields """ field_path: List[Union[InterpolatedString, str]] config: Config parameters: InitVar[Mapping[str, Any]] decoder: Decoder = field(default_factory=lambda: JsonDecoder(parameters={})) - expand_records_from_field: Optional[List[Union[InterpolatedString, str]]] = None - remain_original_record: bool = False + record_expander: Optional[RecordExpander] = None def __post_init__(self, parameters: Mapping[str, Any]) -> None: self._field_path = [ @@ -90,46 +90,6 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None: self.field_path[path_index], parameters=parameters ) - if self.expand_records_from_field: - self._expand_path: Optional[List[InterpolatedString]] = [ - InterpolatedString.create(path, parameters=parameters) - for path in self.expand_records_from_field - ] - else: - self._expand_path = None - - def _expand_record( - self, record: MutableMapping[Any, Any] - ) -> Iterable[MutableMapping[Any, Any]]: - """Expand a record by extracting items from a nested array field.""" - if not self._expand_path: - yield record - return - - expand_path = [path.eval(self.config) for path in self._expand_path] - - try: - nested_array = dpath.get(record, expand_path) - except (KeyError, TypeError): - yield record - return - - if not isinstance(nested_array, list): - yield record - return - - if len(nested_array) == 0: - return - - for item in nested_array: - if isinstance(item, dict): - expanded_record = dict(item) - if self.remain_original_record: - expanded_record["original_record"] = record - yield expanded_record - else: - yield item - def extract_records(self, response: requests.Response) -> Iterable[MutableMapping[Any, Any]]: for body in self.decoder.decode(response): if len(self._field_path) == 0: @@ -142,8 +102,14 @@ def extract_records(self, response: requests.Response) -> Iterable[MutableMappin extracted = dpath.get(body, path, default=[]) # type: ignore # extracted will be a MutableMapping, given input data structure if isinstance(extracted, list): for record in extracted: - yield from self._expand_record(record) + if self.record_expander: + yield from self.record_expander.expand_record(record) + else: + yield record elif extracted: - yield from self._expand_record(extracted) + if self.record_expander: + yield from self.record_expander.expand_record(extracted) + else: + yield extracted else: yield from [] diff --git a/airbyte_cdk/sources/declarative/parsers/manifest_component_transformer.py b/airbyte_cdk/sources/declarative/parsers/manifest_component_transformer.py index 3ed86bf06..00b1a18ff 100644 --- a/airbyte_cdk/sources/declarative/parsers/manifest_component_transformer.py +++ b/airbyte_cdk/sources/declarative/parsers/manifest_component_transformer.py @@ -42,6 +42,7 @@ "DefaultPaginator.page_size_option": "RequestOption", # DpathExtractor "DpathExtractor.decoder": "JsonDecoder", + "DpathExtractor.record_expander": "RecordExpander", # HttpRequester "HttpRequester.error_handler": "DefaultErrorHandler", # ListPartitionRouter diff --git a/unit_tests/sources/declarative/extractors/test_dpath_extractor.py b/unit_tests/sources/declarative/extractors/test_dpath_extractor.py index de4fd8bf1..998c3d4f2 100644 --- a/unit_tests/sources/declarative/extractors/test_dpath_extractor.py +++ b/unit_tests/sources/declarative/extractors/test_dpath_extractor.py @@ -14,6 +14,7 @@ IterableDecoder, JsonDecoder, ) +from airbyte_cdk.sources.declarative.expanders.record_expander import RecordExpander from airbyte_cdk.sources.declarative.extractors.dpath_extractor import DpathExtractor config = {"field": "record_array"} @@ -247,13 +248,18 @@ def test_dpath_extractor_with_expansion( body, expected_records: List, ): + record_expander = RecordExpander( + expand_records_from_field=expand_records_from_field, + config=config, + parameters=parameters, + remain_original_record=remain_original_record, + ) extractor = DpathExtractor( field_path=field_path, config=config, decoder=decoder_json, parameters=parameters, - expand_records_from_field=expand_records_from_field, - remain_original_record=remain_original_record, + record_expander=record_expander, ) response = create_response(body) From c0351383a6b45cf68711399b7c14eb1fddd70eec Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Tue, 2 Dec 2025 16:35:00 +0000 Subject: [PATCH 06/12] feat: Add RecordExpander to declarative component schema - Add RecordExpander definition to declarative_component_schema.yaml - Add record_expander property to DpathExtractor schema - Update create_dpath_extractor in model_to_component_factory.py to handle record_expander - Auto-generate models from schema using poetry run poe build - All 24 tests passing This completes the schema registration for RecordExpander component, enabling YAML manifests to properly instantiate RecordExpander when used with DpathExtractor. Co-Authored-By: unknown <> --- .../declarative_component_schema.yaml | 35 +++++++++++++ .../models/declarative_component_schema.py | 51 +++++++++++++------ .../parsers/model_to_component_factory.py | 9 ++++ 3 files changed, 80 insertions(+), 15 deletions(-) diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index 153a5d105..e6ed9752f 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -1912,6 +1912,10 @@ definitions: - ["data", "records"] - ["data", "{{ parameters.name }}"] - ["data", "*", "record"] + record_expander: + title: Record Expander + description: Optional component to expand records by extracting items from nested array fields. + "$ref": "#/definitions/RecordExpander" $parameters: type: object additionalProperties: true @@ -1928,6 +1932,37 @@ definitions: $parameters: type: object additionalProperties: true + RecordExpander: + title: Record Expander + description: Expands records by extracting items from a nested array field. When configured, this component extracts items from a specified nested array path within each record and emits each item as a separate record. Optionally, the original parent record can be embedded in each expanded item for context preservation. + type: object + required: + - type + - expand_records_from_field + properties: + type: + type: string + enum: [RecordExpander] + expand_records_from_field: + title: Expand Records From Field + description: Path to a nested array field within each record. Items from this array will be extracted and emitted as separate records. + type: array + items: + type: string + interpolation_context: + - config + examples: + - ["lines", "data"] + - ["items"] + - ["nested", "array"] + remain_original_record: + title: Remain Original Record + description: If true, each expanded record will include the original parent record in an "original_record" field. Defaults to false. + type: boolean + default: false + $parameters: + type: object + additionalProperties: true ExponentialBackoffStrategy: title: Exponential Backoff description: Backoff strategy with an exponential backoff interval. The interval is defined as factor * 2^attempt_count. diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index 0f5c0f1f9..2c17be403 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -482,24 +482,24 @@ class Config: ) -class DpathExtractor(BaseModel): - type: Literal["DpathExtractor"] - field_path: List[str] = Field( - ..., - description='List of potentially nested fields describing the full path of the field to extract. Use "*" to extract all values from an array. See more info in the [docs](https://docs.airbyte.com/connector-development/config-based/understanding-the-yaml-file/record-selector).', - examples=[ - ["data"], - ["data", "records"], - ["data", "{{ parameters.name }}"], - ["data", "*", "record"], - ], - title="Field Path", - ) +class ResponseToFileExtractor(BaseModel): + type: Literal["ResponseToFileExtractor"] parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") -class ResponseToFileExtractor(BaseModel): - type: Literal["ResponseToFileExtractor"] +class RecordExpander(BaseModel): + type: Literal["RecordExpander"] + expand_records_from_field: List[str] = Field( + ..., + description="Path to a nested array field within each record. Items from this array will be extracted and emitted as separate records.", + examples=[["lines", "data"], ["items"], ["nested", "array"]], + title="Expand Records From Field", + ) + remain_original_record: Optional[bool] = Field( + False, + description='If true, each expanded record will include the original parent record in an "original_record" field. Defaults to false.', + title="Remain Original Record", + ) parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") @@ -2034,6 +2034,27 @@ class DefaultPaginator(BaseModel): parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") +class DpathExtractor(BaseModel): + type: Literal["DpathExtractor"] + field_path: List[str] = Field( + ..., + description='List of potentially nested fields describing the full path of the field to extract. Use "*" to extract all values from an array. See more info in the [docs](https://docs.airbyte.com/connector-development/config-based/understanding-the-yaml-file/record-selector).', + examples=[ + ["data"], + ["data", "records"], + ["data", "{{ parameters.name }}"], + ["data", "*", "record"], + ], + title="Field Path", + ) + record_expander: Optional[RecordExpander] = Field( + None, + description="Optional component to expand records by extracting items from nested array fields.", + title="Record Expander", + ) + parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") + + class SessionTokenRequestApiKeyAuthenticator(BaseModel): type: Literal["ApiKey"] inject_into: RequestOption = Field( diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 867c93a22..f71968258 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -2314,11 +2314,20 @@ def create_dpath_extractor( else: decoder_to_use = JsonDecoder(parameters={}) model_field_path: List[Union[InterpolatedString, str]] = [x for x in model.field_path] + + record_expander = None + if hasattr(model, "record_expander") and model.record_expander: + record_expander = self._create_component_from_model( + model=model.record_expander, + config=config, + ) + return DpathExtractor( decoder=decoder_to_use, field_path=model_field_path, config=config, parameters=model.parameters or {}, + record_expander=record_expander, ) @staticmethod From b04e174a1fbf876e9146ce507a2a6fe88f764198 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Tue, 2 Dec 2025 16:50:17 +0000 Subject: [PATCH 07/12] refactor: Clean up DpathExtractor extract_records logic Apply cleaner logic using 'yield from' consistently: - When extracted is a list without record_expander, use 'yield from extracted' - Check 'if not self.record_expander' instead of nested if/else - Remove unnecessary 'yield from []' for empty case All 24 tests passing. Suggested by @DanyloGL. Co-Authored-By: unknown <> --- .../sources/declarative/extractors/dpath_extractor.py | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/airbyte_cdk/sources/declarative/extractors/dpath_extractor.py b/airbyte_cdk/sources/declarative/extractors/dpath_extractor.py index ca9ec9b44..0189421c4 100644 --- a/airbyte_cdk/sources/declarative/extractors/dpath_extractor.py +++ b/airbyte_cdk/sources/declarative/extractors/dpath_extractor.py @@ -101,15 +101,13 @@ def extract_records(self, response: requests.Response) -> Iterable[MutableMappin else: extracted = dpath.get(body, path, default=[]) # type: ignore # extracted will be a MutableMapping, given input data structure if isinstance(extracted, list): - for record in extracted: - if self.record_expander: + if not self.record_expander: + yield from extracted + else: + for record in extracted: yield from self.record_expander.expand_record(record) - else: - yield record elif extracted: if self.record_expander: yield from self.record_expander.expand_record(extracted) else: yield extracted - else: - yield from [] From c8a2643a2a4ab6641c1543af43892fb2e22d6484 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Tue, 2 Dec 2025 16:58:39 +0000 Subject: [PATCH 08/12] fix: Update RecordExpander to return nothing when path doesn't exist Changes: - Add back 'else: yield from []' in DpathExtractor for explicit empty case - Update RecordExpander to return nothing when expand_records_from_field path doesn't exist or isn't a list - Update unit tests to expect no records instead of original record when expansion fails This makes RecordExpander stricter: it only emits records when successfully expanding a list. For Stripe invoice_line_items, this ensures we only emit line items, not invoice objects. All 24 tests passing. Requested by @DanyloGL. Co-Authored-By: unknown <> --- .../sources/declarative/expanders/record_expander.py | 2 -- .../sources/declarative/extractors/dpath_extractor.py | 2 ++ .../declarative/extractors/test_dpath_extractor.py | 8 ++++---- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/airbyte_cdk/sources/declarative/expanders/record_expander.py b/airbyte_cdk/sources/declarative/expanders/record_expander.py index d766ad920..f753270bc 100644 --- a/airbyte_cdk/sources/declarative/expanders/record_expander.py +++ b/airbyte_cdk/sources/declarative/expanders/record_expander.py @@ -58,11 +58,9 @@ def expand_record(self, record: MutableMapping[Any, Any]) -> Iterable[MutableMap try: nested_array = dpath.get(record, expand_path) except (KeyError, TypeError): - yield record return if not isinstance(nested_array, list): - yield record return if len(nested_array) == 0: diff --git a/airbyte_cdk/sources/declarative/extractors/dpath_extractor.py b/airbyte_cdk/sources/declarative/extractors/dpath_extractor.py index 0189421c4..1d8831056 100644 --- a/airbyte_cdk/sources/declarative/extractors/dpath_extractor.py +++ b/airbyte_cdk/sources/declarative/extractors/dpath_extractor.py @@ -111,3 +111,5 @@ def extract_records(self, response: requests.Response) -> Iterable[MutableMappin yield from self.record_expander.expand_record(extracted) else: yield extracted + else: + yield from [] diff --git a/unit_tests/sources/declarative/extractors/test_dpath_extractor.py b/unit_tests/sources/declarative/extractors/test_dpath_extractor.py index 998c3d4f2..35d3946ed 100644 --- a/unit_tests/sources/declarative/extractors/test_dpath_extractor.py +++ b/unit_tests/sources/declarative/extractors/test_dpath_extractor.py @@ -191,14 +191,14 @@ def test_dpath_extractor(field_path: List, decoder: Decoder, body, expected_reco ["items"], False, {"data": {"id": "parent_1"}}, - [{"id": "parent_1"}], + [], ), ( ["data"], ["items"], False, {"data": {"id": "parent_1", "items": "not_an_array"}}, - [{"id": "parent_1", "items": "not_an_array"}], + [], ), ( ["data"], @@ -234,8 +234,8 @@ def test_dpath_extractor(field_path: List, decoder: Decoder, body, expected_reco "test_expand_nested_array", "test_expand_with_original_record", "test_expand_empty_array_yields_nothing", - "test_expand_missing_path_yields_original", - "test_expand_non_array_yields_original", + "test_expand_missing_path_yields_nothing", + "test_expand_non_array_yields_nothing", "test_expand_deeply_nested_path", "test_expand_mixed_types_in_array", "test_expand_multiple_parent_records", From c6a9d05c8a2add98aba05f9d55ca15a54e81b7b1 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Tue, 2 Dec 2025 17:08:52 +0000 Subject: [PATCH 09/12] feat: Add wildcard support to RecordExpander and remove TypeError Changes: 1. Remove TypeError from exception handler (only catch KeyError per dpath.get docs) 2. Add wildcard (*) support to RecordExpander for matching multiple arrays 3. Update docstring and schema to document wildcard support 4. Add 5 new unit tests for wildcard expansion scenarios 5. Regenerate models from updated schema When wildcards are used, RecordExpander: - Uses dpath.values() to find all matches - Filters for list-valued matches only - Expands items from all matched lists - Returns nothing if no list matches found All 29 tests passing. Requested by @DanyloGL. Co-Authored-By: unknown <> --- .../declarative_component_schema.yaml | 5 +- .../declarative/expanders/record_expander.py | 71 +++++++++++----- .../models/declarative_component_schema.py | 9 ++- .../extractors/test_dpath_extractor.py | 81 +++++++++++++++++++ 4 files changed, 142 insertions(+), 24 deletions(-) diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index e6ed9752f..09bf07da1 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -1934,7 +1934,7 @@ definitions: additionalProperties: true RecordExpander: title: Record Expander - description: Expands records by extracting items from a nested array field. When configured, this component extracts items from a specified nested array path within each record and emits each item as a separate record. Optionally, the original parent record can be embedded in each expanded item for context preservation. + description: Expands records by extracting items from a nested array field. When configured, this component extracts items from a specified nested array path within each record and emits each item as a separate record. Optionally, the original parent record can be embedded in each expanded item for context preservation. Supports wildcards (*) for matching multiple arrays. type: object required: - type @@ -1945,7 +1945,7 @@ definitions: enum: [RecordExpander] expand_records_from_field: title: Expand Records From Field - description: Path to a nested array field within each record. Items from this array will be extracted and emitted as separate records. + description: Path to a nested array field within each record. Items from this array will be extracted and emitted as separate records. Supports wildcards (*) for matching multiple arrays. type: array items: type: string @@ -1955,6 +1955,7 @@ definitions: - ["lines", "data"] - ["items"] - ["nested", "array"] + - ["sections", "*", "items"] remain_original_record: title: Remain Original Record description: If true, each expanded record will include the original parent record in an "original_record" field. Defaults to false. diff --git a/airbyte_cdk/sources/declarative/expanders/record_expander.py b/airbyte_cdk/sources/declarative/expanders/record_expander.py index f753270bc..12ab8cd1f 100644 --- a/airbyte_cdk/sources/declarative/expanders/record_expander.py +++ b/airbyte_cdk/sources/declarative/expanders/record_expander.py @@ -20,6 +20,9 @@ class RecordExpander: within each record and emits each item as a separate record. Optionally, the original parent record can be embedded in each expanded item for context preservation. + The expand_records_from_field path supports wildcards (*) for matching multiple arrays. + When wildcards are used, items from all matched arrays are extracted and emitted. + Examples of instantiating this component: ``` record_expander: @@ -30,8 +33,18 @@ class RecordExpander: remain_original_record: true ``` + ``` + record_expander: + type: RecordExpander + expand_records_from_field: + - "sections" + - "*" + - "items" + remain_original_record: false + ``` + Attributes: - expand_records_from_field (List[Union[InterpolatedString, str]]): Path to a nested array field within each record. Items from this array will be extracted and emitted as separate records. + expand_records_from_field (List[Union[InterpolatedString, str]]): Path to a nested array field within each record. Items from this array will be extracted and emitted as separate records. Supports wildcards (*). remain_original_record (bool): If True, each expanded record will include the original parent record in an "original_record" field. Defaults to False. config (Config): The user-provided configuration as specified by the source's spec """ @@ -55,22 +68,40 @@ def expand_record(self, record: MutableMapping[Any, Any]) -> Iterable[MutableMap expand_path = [path.eval(self.config) for path in self._expand_path] - try: - nested_array = dpath.get(record, expand_path) - except (KeyError, TypeError): - return - - if not isinstance(nested_array, list): - return - - if len(nested_array) == 0: - return - - for item in nested_array: - if isinstance(item, dict): - expanded_record = dict(item) - if self.remain_original_record: - expanded_record["original_record"] = record - yield expanded_record - else: - yield item + if "*" in expand_path: + matches = dpath.values(record, expand_path) + list_nodes = [m for m in matches if isinstance(m, list)] + if not list_nodes: + return + + for nested_array in list_nodes: + if len(nested_array) == 0: + continue + for item in nested_array: + if isinstance(item, dict): + expanded_record = dict(item) + if self.remain_original_record: + expanded_record["original_record"] = record + yield expanded_record + else: + yield item + else: + try: + nested_array = dpath.get(record, expand_path) + except KeyError: + return + + if not isinstance(nested_array, list): + return + + if len(nested_array) == 0: + return + + for item in nested_array: + if isinstance(item, dict): + expanded_record = dict(item) + if self.remain_original_record: + expanded_record["original_record"] = record + yield expanded_record + else: + yield item diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index 2c17be403..da1cc9474 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -491,8 +491,13 @@ class RecordExpander(BaseModel): type: Literal["RecordExpander"] expand_records_from_field: List[str] = Field( ..., - description="Path to a nested array field within each record. Items from this array will be extracted and emitted as separate records.", - examples=[["lines", "data"], ["items"], ["nested", "array"]], + description="Path to a nested array field within each record. Items from this array will be extracted and emitted as separate records. Supports wildcards (*) for matching multiple arrays.", + examples=[ + ["lines", "data"], + ["items"], + ["nested", "array"], + ["sections", "*", "items"], + ], title="Expand Records From Field", ) remain_original_record: Optional[bool] = Field( diff --git a/unit_tests/sources/declarative/extractors/test_dpath_extractor.py b/unit_tests/sources/declarative/extractors/test_dpath_extractor.py index 35d3946ed..affd03216 100644 --- a/unit_tests/sources/declarative/extractors/test_dpath_extractor.py +++ b/unit_tests/sources/declarative/extractors/test_dpath_extractor.py @@ -229,6 +229,82 @@ def test_dpath_extractor(field_path: List, decoder: Decoder, body, expected_reco ], [{"id": "child_1"}, {"id": "child_2"}, {"id": "child_3"}], ), + ( + ["data"], + ["sections", "*", "items"], + False, + { + "data": { + "sections": [ + {"name": "section1", "items": [{"id": "item_1"}, {"id": "item_2"}]}, + {"name": "section2", "items": [{"id": "item_3"}]}, + ] + } + }, + [{"id": "item_1"}, {"id": "item_2"}, {"id": "item_3"}], + ), + ( + ["data"], + ["sections", "*", "items"], + True, + { + "data": { + "sections": [ + {"name": "section1", "items": [{"id": "item_1"}]}, + ] + } + }, + [ + { + "id": "item_1", + "original_record": { + "sections": [ + {"name": "section1", "items": [{"id": "item_1"}]}, + ] + }, + } + ], + ), + ( + ["data"], + ["sections", "*", "items"], + False, + { + "data": { + "sections": [ + {"name": "section1", "items": []}, + {"name": "section2", "items": []}, + ] + } + }, + [], + ), + ( + ["data"], + ["sections", "*", "items"], + False, + { + "data": { + "sections": [ + {"name": "section1"}, + {"name": "section2", "items": "not_an_array"}, + ] + } + }, + [], + ), + ( + ["data"], + ["*", "items"], + False, + { + "data": { + "group1": {"items": [{"id": "item_1"}]}, + "group2": {"items": [{"id": "item_2"}, {"id": "item_3"}]}, + } + }, + [{"id": "item_1"}, {"id": "item_2"}, {"id": "item_3"}], + ), ], ids=[ "test_expand_nested_array", @@ -239,6 +315,11 @@ def test_dpath_extractor(field_path: List, decoder: Decoder, body, expected_reco "test_expand_deeply_nested_path", "test_expand_mixed_types_in_array", "test_expand_multiple_parent_records", + "test_expand_wildcard_multiple_lists", + "test_expand_wildcard_with_original_record", + "test_expand_wildcard_all_empty_arrays", + "test_expand_wildcard_no_list_matches", + "test_expand_wildcard_dict_values", ], ) def test_dpath_extractor_with_expansion( From c6448e5de85815db6c52baa8555a73aba7ff0a25 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Tue, 2 Dec 2025 17:12:59 +0000 Subject: [PATCH 10/12] fix: Add type casts for dpath.values and dpath.get to fix MyPy errors MyPy was complaining that dpath.values() and dpath.get() return 'object' type. Added cast(Iterable[Any], ...) for dpath.values() and cast(Any, ...) for dpath.get() to satisfy MyPy type checking while maintaining runtime behavior. All 29 tests passing. MyPy check now passes. Co-Authored-By: unknown <> --- .../sources/declarative/expanders/record_expander.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/airbyte_cdk/sources/declarative/expanders/record_expander.py b/airbyte_cdk/sources/declarative/expanders/record_expander.py index 12ab8cd1f..ea3c49c17 100644 --- a/airbyte_cdk/sources/declarative/expanders/record_expander.py +++ b/airbyte_cdk/sources/declarative/expanders/record_expander.py @@ -3,7 +3,7 @@ # from dataclasses import InitVar, dataclass -from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Union +from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Union, cast import dpath @@ -69,7 +69,7 @@ def expand_record(self, record: MutableMapping[Any, Any]) -> Iterable[MutableMap expand_path = [path.eval(self.config) for path in self._expand_path] if "*" in expand_path: - matches = dpath.values(record, expand_path) + matches = cast(Iterable[Any], dpath.values(record, expand_path)) list_nodes = [m for m in matches if isinstance(m, list)] if not list_nodes: return @@ -87,7 +87,7 @@ def expand_record(self, record: MutableMapping[Any, Any]) -> Iterable[MutableMap yield item else: try: - nested_array = dpath.get(record, expand_path) + nested_array = cast(Any, dpath.get(record, expand_path)) except KeyError: return From 6afe47423de5006813e98c56c276e5a80b7e0619 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Tue, 2 Dec 2025 17:15:17 +0000 Subject: [PATCH 11/12] refactor: Eliminate code duplication in expand_record method Unified the wildcard and non-wildcard branches by collecting all arrays to process into a single list, then using one common loop for expansion. This eliminates the duplicated item iteration and record expansion logic. All 29 tests passing. MyPy check passes. Co-Authored-By: unknown <> --- .../declarative/expanders/record_expander.py | 32 +++++++------------ 1 file changed, 12 insertions(+), 20 deletions(-) diff --git a/airbyte_cdk/sources/declarative/expanders/record_expander.py b/airbyte_cdk/sources/declarative/expanders/record_expander.py index ea3c49c17..fa12c7306 100644 --- a/airbyte_cdk/sources/declarative/expanders/record_expander.py +++ b/airbyte_cdk/sources/declarative/expanders/record_expander.py @@ -68,35 +68,27 @@ def expand_record(self, record: MutableMapping[Any, Any]) -> Iterable[MutableMap expand_path = [path.eval(self.config) for path in self._expand_path] + arrays: List[List[Any]] = [] + if "*" in expand_path: matches = cast(Iterable[Any], dpath.values(record, expand_path)) - list_nodes = [m for m in matches if isinstance(m, list)] - if not list_nodes: - return - - for nested_array in list_nodes: - if len(nested_array) == 0: - continue - for item in nested_array: - if isinstance(item, dict): - expanded_record = dict(item) - if self.remain_original_record: - expanded_record["original_record"] = record - yield expanded_record - else: - yield item + arrays = [m for m in matches if isinstance(m, list)] else: try: - nested_array = cast(Any, dpath.get(record, expand_path)) + nested = cast(Any, dpath.get(record, expand_path)) except KeyError: return - - if not isinstance(nested_array, list): + if isinstance(nested, list): + arrays = [nested] + else: return - if len(nested_array) == 0: - return + if not arrays: + return + for nested_array in arrays: + if not nested_array: + continue for item in nested_array: if isinstance(item, dict): expanded_record = dict(item) From 5b0c0d5e3d4c650c0bf1f738f87255d7db3cd373 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Tue, 2 Dec 2025 17:22:43 +0000 Subject: [PATCH 12/12] refactor: Simplify expand_record per code review feedback Changes per Daryna's feedback: 1. Removed isinstance(m, list) filter - now checking inside loop 2. Renamed 'matches' to 'extracted' 3. Removed type casts - using 'extracted: Any' instead 4. Renamed 'nested_array' to 'record' (loop var), using 'parent_record' for original 5. Removed 'if not nested_array:' check (redundant with for loop) All 29 tests passing. MyPy check passes. Co-Authored-By: unknown <> --- .../declarative/expanders/record_expander.py | 35 +++++++++---------- 1 file changed, 17 insertions(+), 18 deletions(-) diff --git a/airbyte_cdk/sources/declarative/expanders/record_expander.py b/airbyte_cdk/sources/declarative/expanders/record_expander.py index fa12c7306..b4579c385 100644 --- a/airbyte_cdk/sources/declarative/expanders/record_expander.py +++ b/airbyte_cdk/sources/declarative/expanders/record_expander.py @@ -3,7 +3,7 @@ # from dataclasses import InitVar, dataclass -from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Union, cast +from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Union import dpath @@ -66,34 +66,33 @@ def expand_record(self, record: MutableMapping[Any, Any]) -> Iterable[MutableMap yield record return + parent_record = record expand_path = [path.eval(self.config) for path in self._expand_path] - arrays: List[List[Any]] = [] - if "*" in expand_path: - matches = cast(Iterable[Any], dpath.values(record, expand_path)) - arrays = [m for m in matches if isinstance(m, list)] + extracted: Any = dpath.values(parent_record, expand_path) + for record in extracted: + if isinstance(record, list): + for item in record: + if isinstance(item, dict): + expanded_record = dict(item) + if self.remain_original_record: + expanded_record["original_record"] = parent_record + yield expanded_record + else: + yield item else: try: - nested = cast(Any, dpath.get(record, expand_path)) + extracted = dpath.get(parent_record, expand_path) except KeyError: return - if isinstance(nested, list): - arrays = [nested] - else: + if not isinstance(extracted, list): return - - if not arrays: - return - - for nested_array in arrays: - if not nested_array: - continue - for item in nested_array: + for item in extracted: if isinstance(item, dict): expanded_record = dict(item) if self.remain_original_record: - expanded_record["original_record"] = record + expanded_record["original_record"] = parent_record yield expanded_record else: yield item