Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions airbyte_cdk/sources/declarative/declarative_component_schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1912,6 +1912,10 @@ definitions:
- ["data", "records"]
- ["data", "{{ parameters.name }}"]
- ["data", "*", "record"]
record_expander:
title: Record Expander
description: Optional component to expand records by extracting items from nested array fields.
"$ref": "#/definitions/RecordExpander"
$parameters:
type: object
additionalProperties: true
Expand All @@ -1928,6 +1932,38 @@ definitions:
$parameters:
type: object
additionalProperties: true
RecordExpander:
title: Record Expander
description: Expands records by extracting items from a nested array field. When configured, this component extracts items from a specified nested array path within each record and emits each item as a separate record. Optionally, the original parent record can be embedded in each expanded item for context preservation. Supports wildcards (*) for matching multiple arrays.
type: object
required:
- type
- expand_records_from_field
properties:
type:
type: string
enum: [RecordExpander]
expand_records_from_field:
title: Expand Records From Field
description: Path to a nested array field within each record. Items from this array will be extracted and emitted as separate records. Supports wildcards (*) for matching multiple arrays.
type: array
items:
type: string
interpolation_context:
- config
examples:
- ["lines", "data"]
- ["items"]
- ["nested", "array"]
- ["sections", "*", "items"]
remain_original_record:
title: Remain Original Record
description: If true, each expanded record will include the original parent record in an "original_record" field. Defaults to false.
type: boolean
default: false
$parameters:
type: object
additionalProperties: true
ExponentialBackoffStrategy:
title: Exponential Backoff
description: Backoff strategy with an exponential backoff interval. The interval is defined as factor * 2^attempt_count.
Expand Down
7 changes: 7 additions & 0 deletions airbyte_cdk/sources/declarative/expanders/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from airbyte_cdk.sources.declarative.expanders.record_expander import RecordExpander

__all__ = ["RecordExpander"]
98 changes: 98 additions & 0 deletions airbyte_cdk/sources/declarative/expanders/record_expander.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from dataclasses import InitVar, dataclass
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Union

import dpath

from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
from airbyte_cdk.sources.types import Config


@dataclass
class RecordExpander:
"""
Expands records by extracting items from a nested array field.

When configured, this component extracts items from a specified nested array path
within each record and emits each item as a separate record. Optionally, the original
parent record can be embedded in each expanded item for context preservation.

The expand_records_from_field path supports wildcards (*) for matching multiple arrays.
When wildcards are used, items from all matched arrays are extracted and emitted.

Examples of instantiating this component:
```
record_expander:
type: RecordExpander
expand_records_from_field:
- "lines"
- "data"
remain_original_record: true
```

```
record_expander:
type: RecordExpander
expand_records_from_field:
- "sections"
- "*"
- "items"
remain_original_record: false
```

Attributes:
expand_records_from_field (List[Union[InterpolatedString, str]]): Path to a nested array field within each record. Items from this array will be extracted and emitted as separate records. Supports wildcards (*).
remain_original_record (bool): If True, each expanded record will include the original parent record in an "original_record" field. Defaults to False.
config (Config): The user-provided configuration as specified by the source's spec
"""

expand_records_from_field: List[Union[InterpolatedString, str]]
config: Config
parameters: InitVar[Mapping[str, Any]]
remain_original_record: bool = False

def __post_init__(self, parameters: Mapping[str, Any]) -> None:
self._expand_path: Optional[List[InterpolatedString]] = [
InterpolatedString.create(path, parameters=parameters)
for path in self.expand_records_from_field
]

def expand_record(self, record: MutableMapping[Any, Any]) -> Iterable[MutableMapping[Any, Any]]:
"""Expand a record by extracting items from a nested array field."""
if not self._expand_path:
yield record
return

parent_record = record
expand_path = [path.eval(self.config) for path in self._expand_path]

if "*" in expand_path:
extracted: Any = dpath.values(parent_record, expand_path)
for record in extracted:
if isinstance(record, list):
for item in record:
if isinstance(item, dict):
expanded_record = dict(item)
if self.remain_original_record:
expanded_record["original_record"] = parent_record
yield expanded_record
else:
yield item
else:
try:
extracted = dpath.get(parent_record, expand_path)
except KeyError:
return
if not isinstance(extracted, list):
return
for item in extracted:
if isinstance(item, dict):
expanded_record = dict(item)
if self.remain_original_record:
expanded_record["original_record"] = parent_record
yield expanded_record
else:
yield item
35 changes: 32 additions & 3 deletions airbyte_cdk/sources/declarative/extractors/dpath_extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@
#

from dataclasses import InitVar, dataclass, field
from typing import Any, Iterable, List, Mapping, MutableMapping, Union
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Union

import dpath
import requests

from airbyte_cdk.sources.declarative.decoders import Decoder, JsonDecoder
from airbyte_cdk.sources.declarative.expanders.record_expander import RecordExpander
from airbyte_cdk.sources.declarative.extractors.record_extractor import RecordExtractor
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
from airbyte_cdk.sources.types import Config
Expand All @@ -24,6 +25,11 @@ class DpathExtractor(RecordExtractor):
If the field path points to an empty object, an empty array is returned.
If the field path points to a non-existing path, an empty array is returned.

Optionally, records can be expanded by providing a RecordExpander component.
When record_expander is configured, each extracted record is passed through the
expander which extracts items from nested array fields and emits each item as a
separate record.

Examples of instantiating this transform:
```
extractor:
Expand All @@ -47,16 +53,32 @@ class DpathExtractor(RecordExtractor):
field_path: []
```

```
extractor:
type: DpathExtractor
field_path:
- "data"
- "object"
record_expander:
type: RecordExpander
expand_records_from_field:
- "lines"
- "data"
remain_original_record: true
```

Attributes:
field_path (Union[InterpolatedString, str]): Path to the field that should be extracted
config (Config): The user-provided configuration as specified by the source's spec
decoder (Decoder): The decoder responsible to transfom the response in a Mapping
record_expander (Optional[RecordExpander]): Optional component to expand records by extracting items from nested array fields
"""

field_path: List[Union[InterpolatedString, str]]
config: Config
parameters: InitVar[Mapping[str, Any]]
decoder: Decoder = field(default_factory=lambda: JsonDecoder(parameters={}))
record_expander: Optional[RecordExpander] = None

def __post_init__(self, parameters: Mapping[str, Any]) -> None:
self._field_path = [
Expand All @@ -79,8 +101,15 @@ def extract_records(self, response: requests.Response) -> Iterable[MutableMappin
else:
extracted = dpath.get(body, path, default=[]) # type: ignore # extracted will be a MutableMapping, given input data structure
if isinstance(extracted, list):
yield from extracted
if not self.record_expander:
yield from extracted
else:
for record in extracted:
yield from self.record_expander.expand_record(record)
elif extracted:
yield extracted
if self.record_expander:
yield from self.record_expander.expand_record(extracted)
else:
yield extracted
else:
yield from []
Original file line number Diff line number Diff line change
Expand Up @@ -482,24 +482,29 @@ class Config:
)


class DpathExtractor(BaseModel):
type: Literal["DpathExtractor"]
field_path: List[str] = Field(
class ResponseToFileExtractor(BaseModel):
type: Literal["ResponseToFileExtractor"]
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class RecordExpander(BaseModel):
type: Literal["RecordExpander"]
expand_records_from_field: List[str] = Field(
...,
description='List of potentially nested fields describing the full path of the field to extract. Use "*" to extract all values from an array. See more info in the [docs](https://docs.airbyte.com/connector-development/config-based/understanding-the-yaml-file/record-selector).',
description="Path to a nested array field within each record. Items from this array will be extracted and emitted as separate records. Supports wildcards (*) for matching multiple arrays.",
examples=[
["data"],
["data", "records"],
["data", "{{ parameters.name }}"],
["data", "*", "record"],
["lines", "data"],
["items"],
["nested", "array"],
["sections", "*", "items"],
],
title="Field Path",
title="Expand Records From Field",
)
remain_original_record: Optional[bool] = Field(
False,
description='If true, each expanded record will include the original parent record in an "original_record" field. Defaults to false.',
title="Remain Original Record",
)
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class ResponseToFileExtractor(BaseModel):
type: Literal["ResponseToFileExtractor"]
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


Expand Down Expand Up @@ -2034,6 +2039,27 @@ class DefaultPaginator(BaseModel):
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class DpathExtractor(BaseModel):
type: Literal["DpathExtractor"]
field_path: List[str] = Field(
...,
description='List of potentially nested fields describing the full path of the field to extract. Use "*" to extract all values from an array. See more info in the [docs](https://docs.airbyte.com/connector-development/config-based/understanding-the-yaml-file/record-selector).',
examples=[
["data"],
["data", "records"],
["data", "{{ parameters.name }}"],
["data", "*", "record"],
],
title="Field Path",
)
record_expander: Optional[RecordExpander] = Field(
None,
description="Optional component to expand records by extracting items from nested array fields.",
title="Record Expander",
)
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class SessionTokenRequestApiKeyAuthenticator(BaseModel):
type: Literal["ApiKey"]
inject_into: RequestOption = Field(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
"DefaultPaginator.page_size_option": "RequestOption",
# DpathExtractor
"DpathExtractor.decoder": "JsonDecoder",
"DpathExtractor.record_expander": "RecordExpander",
# HttpRequester
"HttpRequester.error_handler": "DefaultErrorHandler",
# ListPartitionRouter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2314,11 +2314,20 @@ def create_dpath_extractor(
else:
decoder_to_use = JsonDecoder(parameters={})
model_field_path: List[Union[InterpolatedString, str]] = [x for x in model.field_path]

record_expander = None
if hasattr(model, "record_expander") and model.record_expander:
record_expander = self._create_component_from_model(
model=model.record_expander,
config=config,
)

return DpathExtractor(
decoder=decoder_to_use,
field_path=model_field_path,
config=config,
parameters=model.parameters or {},
record_expander=record_expander,
)

@staticmethod
Expand Down
Loading
Loading