From 38d13d30af02cc4a47d771e9cbba85791c1ea7bf Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Wed, 12 Nov 2025 19:45:47 +0000 Subject: [PATCH 01/16] feat: Add fetch_one/fetch_record functionality to SimpleRetriever Implements Phase 1 of GitHub issue airbytehq/airbyte-python-cdk#833 Changes: - Add fetch_one() method to SimpleRetriever for fetching single records by PK - Add fetch_record() base method to Stream class - Implement fetch_record() in DeclarativeStream to delegate to retriever - Add fetch_record() helper method to AbstractSource - Add comprehensive unit tests for fetch_one functionality The implementation uses convention-based path construction (appending PK value to base path) and supports both simple string PKs and composite dict PKs. Handles 404 responses gracefully by returning None. Co-Authored-By: AJ Steers --- .../sources/declarative/declarative_stream.py | 23 ++ airbyte_cdk/sources/abstract_source.py | 31 +++ .../retrievers/simple_retriever.py | 117 +++++++++++ airbyte_cdk/sources/streams/core.py | 20 ++ .../retrievers/test_simple_retriever.py | 197 ++++++++++++++++++ 5 files changed, 388 insertions(+) diff --git a/airbyte_cdk/legacy/sources/declarative/declarative_stream.py b/airbyte_cdk/legacy/sources/declarative/declarative_stream.py index 89935fda8..d0d5539af 100644 --- a/airbyte_cdk/legacy/sources/declarative/declarative_stream.py +++ b/airbyte_cdk/legacy/sources/declarative/declarative_stream.py @@ -202,6 +202,29 @@ def get_cursor(self) -> Optional[Cursor]: return self.retriever.cursor return None + def fetch_record(self, pk_value: Any) -> Optional[Mapping[str, Any]]: + """ + Fetch a single record by primary key value. + + Args: + pk_value: The primary key value. Can be: + - str: For simple single-field primary keys (e.g., "123") + - Mapping[str, Any]: For composite primary keys (e.g., {"company_id": "123", "property": "status"}) + + Returns: + The fetched record as a dict, or None if not found + + Raises: + NotImplementedError: If the stream's retriever doesn't support fetching individual records + """ + if not isinstance(self.retriever, SimpleRetriever): + raise NotImplementedError( + f"Stream {self.name} does not support fetching individual records. " + "Only streams with SimpleRetriever currently support this operation." + ) + + return self.retriever.fetch_one(pk_value=pk_value, records_schema=self.get_json_schema()) + def _get_checkpoint_reader( self, logger: logging.Logger, diff --git a/airbyte_cdk/sources/abstract_source.py b/airbyte_cdk/sources/abstract_source.py index ab9ee48b8..e9afee696 100644 --- a/airbyte_cdk/sources/abstract_source.py +++ b/airbyte_cdk/sources/abstract_source.py @@ -324,3 +324,34 @@ def stop_sync_on_stream_failure(self) -> bool: on the first error seen and emit a single error trace message for that stream. """ return False + + def fetch_record( + self, stream_name: str, pk_value: Any, config: Mapping[str, Any] + ) -> Optional[Mapping[str, Any]]: + """ + Fetch a single record from a stream by primary key. + + Args: + stream_name: Name of the stream to fetch from + pk_value: Primary key value to fetch. Can be: + - str: For simple single-field primary keys (e.g., "123") + - Mapping[str, Any]: For composite primary keys (e.g., {"company_id": "123", "property": "status"}) + config: Source configuration + + Returns: + The fetched record as a dict, or None if not found + + Raises: + ValueError: If the stream name is not found in the source + NotImplementedError: If the stream doesn't support fetching individual records + """ + stream_instances = {s.name: s for s in self.streams(config)} + stream = stream_instances.get(stream_name) + + if not stream: + raise ValueError( + f"Stream '{stream_name}' not found in source. " + f"Available streams: {', '.join(stream_instances.keys())}" + ) + + return stream.fetch_record(pk_value) diff --git a/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py b/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py index a30574107..a9d44d499 100644 --- a/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py +++ b/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py @@ -626,6 +626,123 @@ def _to_partition_key(to_serialize: Any) -> str: # separators have changed in Python 3.4. To avoid being impacted by further change, we explicitly specify our own value return json.dumps(to_serialize, indent=None, separators=(",", ":"), sort_keys=True) + def fetch_one( + self, + pk_value: Union[str, Mapping[str, Any]], + records_schema: Mapping[str, Any], + ) -> Optional[Mapping[str, Any]]: + """ + Fetch a single record by primary key value. + + This method constructs a path by appending the primary key value to the base path + and sends a GET request to fetch a single record. It's designed for REST APIs that + follow the convention: GET /resource/{id} + + Args: + pk_value: The primary key value to fetch. Can be: + - str: For simple single-field primary keys (e.g., "123") + - Mapping[str, Any]: For composite primary keys (e.g., {"company_id": "123", "property": "status"}) + records_schema: JSON schema describing the record structure + + Returns: + The fetched record as a dict, or None if not found (404 response) + + Raises: + Exception: For non-404 HTTP errors (propagated from requester's error handling) + + Example: + record = retriever.fetch_one("123", schema) + + record = retriever.fetch_one({"company_id": "123", "property": "status"}, schema) + + Note: + This implementation uses convention-based path construction (Option B from design). (important-comment) + For simple PKs: appends /{pk_value} to base path (important-comment) + For composite PKs: appends /{value1}/{value2}/... in key order (important-comment) + + Alternative approaches that could be implemented in the future: (important-comment) + - Path template interpolation: Use a configurable template like "{base_path}/{id}" (important-comment) + See: https://github.com/airbytehq/airbyte-python-cdk/issues/833#phase-1a (important-comment) + - Field path configuration: Allow specifying which response field contains the record (important-comment) + for APIs that wrap single records in envelopes like {"data": {...}} (important-comment) + """ + # Get the base path from the requester + base_path = self.requester.get_path( + stream_state={}, + stream_slice=StreamSlice(partition={}, cursor_slice={}), + next_page_token=None, + ) + + if isinstance(pk_value, str): + fetch_path = f"{base_path}/{pk_value}".replace("//", "/") + elif isinstance(pk_value, Mapping): + sorted_values = [str(pk_value[key]) for key in sorted(pk_value.keys())] + pk_path_segment = "/".join(sorted_values) + fetch_path = f"{base_path}/{pk_path_segment}".replace("//", "/") + else: + raise ValueError(f"pk_value must be a string or dict, got {type(pk_value).__name__}") + + stream_slice = StreamSlice(partition={}, cursor_slice={}) + + try: + response = self.requester.send_request( + path=fetch_path, + stream_state={}, + stream_slice=stream_slice, + next_page_token=None, + request_headers=self._request_headers( + stream_state={}, + stream_slice=stream_slice, + next_page_token=None, + ), + request_params=self._request_params( + stream_state={}, + stream_slice=stream_slice, + next_page_token=None, + ), + request_body_data=self._request_body_data( + stream_state={}, + stream_slice=stream_slice, + next_page_token=None, + ), + request_body_json=self._request_body_json( + stream_state={}, + stream_slice=stream_slice, + next_page_token=None, + ), + log_formatter=self.log_formatter, + ) + except Exception as e: + # Check if this is a 404 (record not found) - return None + if hasattr(e, "response") and hasattr(e.response, "status_code"): + if e.response.status_code == 404: + return None + raise + + if not response: + return None + + records = list( + self._parse_response( + response=response, + stream_state={}, + records_schema=records_schema, + stream_slice=stream_slice, + next_page_token=None, + ) + ) + + # Return the first record if found, None otherwise + if records: + first_record = records[0] + if isinstance(first_record, Record): + return dict(first_record.data) + elif isinstance(first_record, Mapping): + return dict(first_record) + else: + return None + return None + def _deep_merge( target: MutableMapping[str, Any], source: Union[Record, MutableMapping[str, Any]] diff --git a/airbyte_cdk/sources/streams/core.py b/airbyte_cdk/sources/streams/core.py index 6cc5c8b5d..ce6b1c887 100644 --- a/airbyte_cdk/sources/streams/core.py +++ b/airbyte_cdk/sources/streams/core.py @@ -463,6 +463,26 @@ def get_cursor(self) -> Optional[Cursor]: """ return self.cursor + def fetch_record(self, pk_value: Any) -> Optional[Mapping[str, Any]]: + """ + Fetch a single record by primary key value. + + Args: + pk_value: The primary key value. Can be: + - str: For simple single-field primary keys (e.g., "123") + - Mapping[str, Any]: For composite primary keys (e.g., {"company_id": "123", "property": "status"}) + + Returns: + The fetched record as a dict, or None if not found + + Raises: + NotImplementedError: If the stream doesn't support fetching individual records + """ + raise NotImplementedError( + f"Stream {self.name} does not support fetching individual records. " + "Only declarative streams with SimpleRetriever currently support this operation." + ) + def _get_checkpoint_reader( self, logger: logging.Logger, diff --git a/unit_tests/sources/declarative/retrievers/test_simple_retriever.py b/unit_tests/sources/declarative/retrievers/test_simple_retriever.py index 5caec9a34..b20981296 100644 --- a/unit_tests/sources/declarative/retrievers/test_simple_retriever.py +++ b/unit_tests/sources/declarative/retrievers/test_simple_retriever.py @@ -1644,3 +1644,200 @@ def _mock_paginator(): paginator.get_request_body_data.__name__ = "get_request_body_data" paginator.get_request_body_json.__name__ = "get_request_body_json" return paginator + + +def test_fetch_one_simple_pk(): + """Test fetch_one with a simple string primary key.""" + requester = MagicMock() + requester.get_path.return_value = "posts" + + response = requests.Response() + response.status_code = 200 + response._content = json.dumps({"id": "123", "title": "Test Post"}).encode("utf-8") + + requester.send_request.return_value = response + + record_selector = MagicMock() + record_selector.select_records.return_value = [ + Record(data={"id": "123", "title": "Test Post"}, stream_name="posts", associated_slice=None) + ] + + retriever = SimpleRetriever( + name="posts", + primary_key="id", + requester=requester, + record_selector=record_selector, + parameters={}, + config={}, + ) + + result = retriever.fetch_one("123", records_schema={}) + + assert result == {"id": "123", "title": "Test Post"} + requester.send_request.assert_called_once() + call_kwargs = requester.send_request.call_args[1] + assert call_kwargs["path"] == "posts/123" + + +def test_fetch_one_composite_pk(): + """Test fetch_one with a composite primary key (dict).""" + requester = MagicMock() + requester.get_path.return_value = "companies" + + response = requests.Response() + response.status_code = 200 + response._content = json.dumps( + {"company_id": "123", "property": "status", "value": "active"} + ).encode("utf-8") + + requester.send_request.return_value = response + + record_selector = MagicMock() + record_selector.select_records.return_value = [ + Record( + data={"company_id": "123", "property": "status", "value": "active"}, + stream_name="companies", + associated_slice=None, + ) + ] + + retriever = SimpleRetriever( + name="companies", + primary_key=["company_id", "property"], + requester=requester, + record_selector=record_selector, + parameters={}, + config={}, + ) + + result = retriever.fetch_one({"company_id": "123", "property": "status"}, records_schema={}) + + assert result == {"company_id": "123", "property": "status", "value": "active"} + requester.send_request.assert_called_once() + call_kwargs = requester.send_request.call_args[1] + assert call_kwargs["path"] == "companies/123/status" + + +def test_fetch_one_not_found(): + """Test fetch_one returns None when record is not found (404).""" + requester = MagicMock() + requester.get_path.return_value = "posts" + + error = Exception("Not found") + error.response = MagicMock() + error.response.status_code = 404 + requester.send_request.side_effect = error + + record_selector = MagicMock() + + retriever = SimpleRetriever( + name="posts", + primary_key="id", + requester=requester, + record_selector=record_selector, + parameters={}, + config={}, + ) + + result = retriever.fetch_one("999", records_schema={}) + + assert result is None + + +def test_fetch_one_server_error(): + """Test fetch_one propagates non-404 errors.""" + requester = MagicMock() + requester.get_path.return_value = "posts" + + error = Exception("Server error") + error.response = MagicMock() + error.response.status_code = 500 + requester.send_request.side_effect = error + + record_selector = MagicMock() + + retriever = SimpleRetriever( + name="posts", + primary_key="id", + requester=requester, + record_selector=record_selector, + parameters={}, + config={}, + ) + + with pytest.raises(Exception) as exc_info: + retriever.fetch_one("123", records_schema={}) + + assert "Server error" in str(exc_info.value) + + +def test_fetch_one_invalid_pk_type(): + """Test fetch_one raises ValueError for invalid pk_value type.""" + requester = MagicMock() + requester.get_path.return_value = "posts" + + record_selector = MagicMock() + + retriever = SimpleRetriever( + name="posts", + primary_key="id", + requester=requester, + record_selector=record_selector, + parameters={}, + config={}, + ) + + with pytest.raises(ValueError) as exc_info: + retriever.fetch_one(123, records_schema={}) + + assert "pk_value must be a string or dict" in str(exc_info.value) + + +def test_fetch_one_no_response(): + """Test fetch_one returns None when response is None.""" + requester = MagicMock() + requester.get_path.return_value = "posts" + requester.send_request.return_value = None + + record_selector = MagicMock() + + retriever = SimpleRetriever( + name="posts", + primary_key="id", + requester=requester, + record_selector=record_selector, + parameters={}, + config={}, + ) + + result = retriever.fetch_one("123", records_schema={}) + + assert result is None + + +def test_fetch_one_empty_records(): + """Test fetch_one returns None when no records are returned.""" + requester = MagicMock() + requester.get_path.return_value = "posts" + + response = requests.Response() + response.status_code = 200 + response._content = json.dumps({}).encode("utf-8") + + requester.send_request.return_value = response + + record_selector = MagicMock() + record_selector.select_records.return_value = [] + + retriever = SimpleRetriever( + name="posts", + primary_key="id", + requester=requester, + record_selector=record_selector, + parameters={}, + config={}, + ) + + result = retriever.fetch_one("123", records_schema={}) + + assert result is None From 052eb812c101fdab90c851f9e885b25401aff69e Mon Sep 17 00:00:00 2001 From: "Aaron (AJ) Steers" Date: Wed, 12 Nov 2025 12:05:26 -0800 Subject: [PATCH 02/16] update type hints --- .../legacy/sources/declarative/declarative_stream.py | 5 ++++- airbyte_cdk/sources/abstract_source.py | 11 +++++++---- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/airbyte_cdk/legacy/sources/declarative/declarative_stream.py b/airbyte_cdk/legacy/sources/declarative/declarative_stream.py index d0d5539af..1a5a2d230 100644 --- a/airbyte_cdk/legacy/sources/declarative/declarative_stream.py +++ b/airbyte_cdk/legacy/sources/declarative/declarative_stream.py @@ -202,7 +202,10 @@ def get_cursor(self) -> Optional[Cursor]: return self.retriever.cursor return None - def fetch_record(self, pk_value: Any) -> Optional[Mapping[str, Any]]: + def fetch_record( + self, + pk_value: Any | Mapping[str, Any], + ) -> Optional[Mapping[str, Any]]: """ Fetch a single record by primary key value. diff --git a/airbyte_cdk/sources/abstract_source.py b/airbyte_cdk/sources/abstract_source.py index e9afee696..4771315e5 100644 --- a/airbyte_cdk/sources/abstract_source.py +++ b/airbyte_cdk/sources/abstract_source.py @@ -198,9 +198,9 @@ def read( logger.info(timer.report()) if len(stream_name_to_exception) > 0: - error_message = generate_failed_streams_error_message( - {key: [value] for key, value in stream_name_to_exception.items()} - ) + error_message = generate_failed_streams_error_message({ + key: [value] for key, value in stream_name_to_exception.items() + }) logger.info(error_message) # We still raise at least one exception when a stream raises an exception because the platform currently relies # on a non-zero exit code to determine if a sync attempt has failed. We also raise the exception as a config_error @@ -326,7 +326,10 @@ def stop_sync_on_stream_failure(self) -> bool: return False def fetch_record( - self, stream_name: str, pk_value: Any, config: Mapping[str, Any] + self, + stream_name: str, + pk_value: Any | Mapping[str, Any], + config: Mapping[str, Any], ) -> Optional[Mapping[str, Any]]: """ Fetch a single record from a stream by primary key. From c5818276de0b6ec9662091b2ca29b481e199525c Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Wed, 12 Nov 2025 20:33:15 +0000 Subject: [PATCH 03/16] Change fetch_one to raise RecordNotFoundException instead of returning None for 404s Co-Authored-By: AJ Steers --- airbyte_cdk/sources/declarative/exceptions.py | 4 ++++ .../retrievers/simple_retriever.py | 24 ++++++++++++------- .../retrievers/test_simple_retriever.py | 22 ++++++++++------- 3 files changed, 33 insertions(+), 17 deletions(-) diff --git a/airbyte_cdk/sources/declarative/exceptions.py b/airbyte_cdk/sources/declarative/exceptions.py index ca67c6a55..0d4ad6a59 100644 --- a/airbyte_cdk/sources/declarative/exceptions.py +++ b/airbyte_cdk/sources/declarative/exceptions.py @@ -7,3 +7,7 @@ class ReadException(Exception): """ Raise when there is an error reading data from an API Source """ + + +class RecordNotFoundException(ReadException): + """Raised when a requested record is not found (e.g., 404 response).""" diff --git a/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py b/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py index a9d44d499..96e55bfda 100644 --- a/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py +++ b/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py @@ -26,6 +26,7 @@ from airbyte_cdk.legacy.sources.declarative.incremental import ResumableFullRefreshCursor from airbyte_cdk.legacy.sources.declarative.incremental.declarative_cursor import DeclarativeCursor from airbyte_cdk.models import AirbyteMessage +from airbyte_cdk.sources.declarative.exceptions import RecordNotFoundException from airbyte_cdk.sources.declarative.extractors.http_selector import HttpSelector from airbyte_cdk.sources.declarative.interpolation import InterpolatedString from airbyte_cdk.sources.declarative.partition_routers.single_partition_router import ( @@ -713,14 +714,17 @@ def fetch_one( log_formatter=self.log_formatter, ) except Exception as e: - # Check if this is a 404 (record not found) - return None - if hasattr(e, "response") and hasattr(e.response, "status_code"): - if e.response.status_code == 404: - return None + # Check if this is a 404 (record not found) - raise RecordNotFoundException + if "404" in str(e) or (hasattr(e, "response") and hasattr(e.response, "status_code") and e.response.status_code == 404): + raise RecordNotFoundException( + f"Record with primary key {pk_value} not found" + ) from e raise if not response: - return None + raise RecordNotFoundException( + f"Record with primary key {pk_value} not found (no response)" + ) records = list( self._parse_response( @@ -732,7 +736,7 @@ def fetch_one( ) ) - # Return the first record if found, None otherwise + # Return the first record if found, raise RecordNotFoundException otherwise if records: first_record = records[0] if isinstance(first_record, Record): @@ -740,8 +744,12 @@ def fetch_one( elif isinstance(first_record, Mapping): return dict(first_record) else: - return None - return None + raise RecordNotFoundException( + f"Record with primary key {pk_value} not found (invalid record type)" + ) + raise RecordNotFoundException( + f"Record with primary key {pk_value} not found (empty response)" + ) def _deep_merge( diff --git a/unit_tests/sources/declarative/retrievers/test_simple_retriever.py b/unit_tests/sources/declarative/retrievers/test_simple_retriever.py index b20981296..abf1d6449 100644 --- a/unit_tests/sources/declarative/retrievers/test_simple_retriever.py +++ b/unit_tests/sources/declarative/retrievers/test_simple_retriever.py @@ -24,6 +24,7 @@ ) from airbyte_cdk.sources.declarative.auth.declarative_authenticator import NoAuth from airbyte_cdk.sources.declarative.decoders import JsonDecoder +from airbyte_cdk.sources.declarative.exceptions import RecordNotFoundException from airbyte_cdk.sources.declarative.extractors import DpathExtractor, HttpSelector, RecordSelector from airbyte_cdk.sources.declarative.partition_routers import SinglePartitionRouter from airbyte_cdk.sources.declarative.requesters.paginators import DefaultPaginator, Paginator @@ -1719,7 +1720,7 @@ def test_fetch_one_composite_pk(): def test_fetch_one_not_found(): - """Test fetch_one returns None when record is not found (404).""" + """Test fetch_one raises RecordNotFoundException when record is not found (404).""" requester = MagicMock() requester.get_path.return_value = "posts" @@ -1739,9 +1740,10 @@ def test_fetch_one_not_found(): config={}, ) - result = retriever.fetch_one("999", records_schema={}) + with pytest.raises(RecordNotFoundException) as exc_info: + retriever.fetch_one("999", records_schema={}) - assert result is None + assert "999" in str(exc_info.value) def test_fetch_one_server_error(): @@ -1794,7 +1796,7 @@ def test_fetch_one_invalid_pk_type(): def test_fetch_one_no_response(): - """Test fetch_one returns None when response is None.""" + """Test fetch_one raises RecordNotFoundException when response is None.""" requester = MagicMock() requester.get_path.return_value = "posts" requester.send_request.return_value = None @@ -1810,13 +1812,14 @@ def test_fetch_one_no_response(): config={}, ) - result = retriever.fetch_one("123", records_schema={}) + with pytest.raises(RecordNotFoundException) as exc_info: + retriever.fetch_one("123", records_schema={}) - assert result is None + assert "123" in str(exc_info.value) def test_fetch_one_empty_records(): - """Test fetch_one returns None when no records are returned.""" + """Test fetch_one raises RecordNotFoundException when no records are returned.""" requester = MagicMock() requester.get_path.return_value = "posts" @@ -1838,6 +1841,7 @@ def test_fetch_one_empty_records(): config={}, ) - result = retriever.fetch_one("123", records_schema={}) + with pytest.raises(RecordNotFoundException) as exc_info: + retriever.fetch_one("123", records_schema={}) - assert result is None + assert "123" in str(exc_info.value) From bab27813b64e69aa593c3614752a900178aa4867 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Wed, 12 Nov 2025 20:34:22 +0000 Subject: [PATCH 04/16] Fix ruff format issues Co-Authored-By: AJ Steers --- airbyte_cdk/sources/abstract_source.py | 6 +++--- .../sources/declarative/retrievers/simple_retriever.py | 6 +++++- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/airbyte_cdk/sources/abstract_source.py b/airbyte_cdk/sources/abstract_source.py index 4771315e5..cc3ff737a 100644 --- a/airbyte_cdk/sources/abstract_source.py +++ b/airbyte_cdk/sources/abstract_source.py @@ -198,9 +198,9 @@ def read( logger.info(timer.report()) if len(stream_name_to_exception) > 0: - error_message = generate_failed_streams_error_message({ - key: [value] for key, value in stream_name_to_exception.items() - }) + error_message = generate_failed_streams_error_message( + {key: [value] for key, value in stream_name_to_exception.items()} + ) logger.info(error_message) # We still raise at least one exception when a stream raises an exception because the platform currently relies # on a non-zero exit code to determine if a sync attempt has failed. We also raise the exception as a config_error diff --git a/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py b/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py index 96e55bfda..f465beb4e 100644 --- a/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py +++ b/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py @@ -715,7 +715,11 @@ def fetch_one( ) except Exception as e: # Check if this is a 404 (record not found) - raise RecordNotFoundException - if "404" in str(e) or (hasattr(e, "response") and hasattr(e.response, "status_code") and e.response.status_code == 404): + if "404" in str(e) or ( + hasattr(e, "response") + and hasattr(e.response, "status_code") + and e.response.status_code == 404 + ): raise RecordNotFoundException( f"Record with primary key {pk_value} not found" ) from e From a2e8e55db12f4a0f7e2e8166de8a95dc0dc0208c Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Wed, 12 Nov 2025 21:05:48 +0000 Subject: [PATCH 05/16] Improve path construction robustness in fetch_one - Use rstrip('/') on base_path and lstrip('/') on pk segments - More robust than replace('//', '/') for handling edge cases - Addresses Copilot review feedback Co-Authored-By: AJ Steers --- .../sources/declarative/retrievers/simple_retriever.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py b/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py index f465beb4e..f9f48d096 100644 --- a/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py +++ b/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py @@ -675,11 +675,11 @@ def fetch_one( ) if isinstance(pk_value, str): - fetch_path = f"{base_path}/{pk_value}".replace("//", "/") + fetch_path = f"{base_path.rstrip('/')}/{str(pk_value).lstrip('/')}" elif isinstance(pk_value, Mapping): - sorted_values = [str(pk_value[key]) for key in sorted(pk_value.keys())] + sorted_values = [str(pk_value[key]).lstrip("/") for key in sorted(pk_value.keys())] pk_path_segment = "/".join(sorted_values) - fetch_path = f"{base_path}/{pk_path_segment}".replace("//", "/") + fetch_path = f"{base_path.rstrip('/')}/{pk_path_segment}" else: raise ValueError(f"pk_value must be a string or dict, got {type(pk_value).__name__}") From c0719e2047f4a752ebfc67fb7f2895d13b773493 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Wed, 12 Nov 2025 23:35:34 +0000 Subject: [PATCH 06/16] refactor: Move fetch_record to concurrent declarative classes only - Remove fetch_record from AbstractSource (legacy, not supported) - Remove fetch_record from Stream base class (legacy, not supported) - Remove fetch_record from DeclarativeStream (legacy, not supported) - Add fetch_record to ConcurrentDeclarativeSource (primary implementation) - Add fetch_record to AbstractStream as concrete method (raises NotImplementedError) - Keep SimpleRetriever.fetch_one as underlying mechanism This refactor ensures fetch_record is ONLY available for declarative concurrent connectors (PyAirbyte use case), not legacy sources. Co-Authored-By: AJ Steers --- .../sources/declarative/declarative_stream.py | 26 ------- airbyte_cdk/sources/abstract_source.py | 34 -------- .../concurrent_declarative_source.py | 77 +++++++++++++++++++ .../streams/concurrent/abstract_stream.py | 21 +++++ airbyte_cdk/sources/streams/core.py | 20 ----- 5 files changed, 98 insertions(+), 80 deletions(-) diff --git a/airbyte_cdk/legacy/sources/declarative/declarative_stream.py b/airbyte_cdk/legacy/sources/declarative/declarative_stream.py index 1a5a2d230..89935fda8 100644 --- a/airbyte_cdk/legacy/sources/declarative/declarative_stream.py +++ b/airbyte_cdk/legacy/sources/declarative/declarative_stream.py @@ -202,32 +202,6 @@ def get_cursor(self) -> Optional[Cursor]: return self.retriever.cursor return None - def fetch_record( - self, - pk_value: Any | Mapping[str, Any], - ) -> Optional[Mapping[str, Any]]: - """ - Fetch a single record by primary key value. - - Args: - pk_value: The primary key value. Can be: - - str: For simple single-field primary keys (e.g., "123") - - Mapping[str, Any]: For composite primary keys (e.g., {"company_id": "123", "property": "status"}) - - Returns: - The fetched record as a dict, or None if not found - - Raises: - NotImplementedError: If the stream's retriever doesn't support fetching individual records - """ - if not isinstance(self.retriever, SimpleRetriever): - raise NotImplementedError( - f"Stream {self.name} does not support fetching individual records. " - "Only streams with SimpleRetriever currently support this operation." - ) - - return self.retriever.fetch_one(pk_value=pk_value, records_schema=self.get_json_schema()) - def _get_checkpoint_reader( self, logger: logging.Logger, diff --git a/airbyte_cdk/sources/abstract_source.py b/airbyte_cdk/sources/abstract_source.py index cc3ff737a..ab9ee48b8 100644 --- a/airbyte_cdk/sources/abstract_source.py +++ b/airbyte_cdk/sources/abstract_source.py @@ -324,37 +324,3 @@ def stop_sync_on_stream_failure(self) -> bool: on the first error seen and emit a single error trace message for that stream. """ return False - - def fetch_record( - self, - stream_name: str, - pk_value: Any | Mapping[str, Any], - config: Mapping[str, Any], - ) -> Optional[Mapping[str, Any]]: - """ - Fetch a single record from a stream by primary key. - - Args: - stream_name: Name of the stream to fetch from - pk_value: Primary key value to fetch. Can be: - - str: For simple single-field primary keys (e.g., "123") - - Mapping[str, Any]: For composite primary keys (e.g., {"company_id": "123", "property": "status"}) - config: Source configuration - - Returns: - The fetched record as a dict, or None if not found - - Raises: - ValueError: If the stream name is not found in the source - NotImplementedError: If the stream doesn't support fetching individual records - """ - stream_instances = {s.name: s for s in self.streams(config)} - stream = stream_instances.get(stream_name) - - if not stream: - raise ValueError( - f"Stream '{stream_name}' not found in source. " - f"Available streams: {', '.join(stream_instances.keys())}" - ) - - return stream.fetch_record(pk_value) diff --git a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py index 781bb64d1..fbc9053ec 100644 --- a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py +++ b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py @@ -507,6 +507,83 @@ def check(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteCon return AirbyteConnectionStatus(status=Status.FAILED, message=repr(error)) return AirbyteConnectionStatus(status=Status.SUCCEEDED) + def fetch_record( + self, + stream_name: str, + pk_value: Any, + config: Optional[Mapping[str, Any]] = None, + ) -> Mapping[str, Any]: + """ + Fetch a single record from a stream by primary key. + + Args: + stream_name: Name of the stream to fetch from + pk_value: Primary key value to fetch. Can be: + - str: For simple single-field primary keys (e.g., "123") + - Mapping[str, Any]: For composite primary keys (e.g., {"company_id": "123", "property": "status"}) + config: Source configuration (optional, uses instance config if not provided) + + Returns: + The fetched record as a dict + + Raises: + ValueError: If the stream name is not found in the source + NotImplementedError: If the stream doesn't support fetching individual records + RecordNotFoundException: If the record is not found (404 response) + """ + from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( + DeclarativeStream as DeclarativeStreamModel, + ) + from airbyte_cdk.sources.declarative.retrievers import SimpleRetriever + from airbyte_cdk.sources.declarative.schema import DefaultSchemaLoader + + config = config or self._config + + stream_configs = self._stream_configs(self._source_config) + self.dynamic_streams + + stream_config = None + for config_item in stream_configs: + if config_item.get("name") == stream_name: + stream_config = config_item + break + + if not stream_config: + available_streams = [c.get("name") for c in stream_configs] + raise ValueError( + f"Stream '{stream_name}' not found in source. " + f"Available streams: {', '.join(available_streams)}" + ) + + retriever = self._constructor.create_component( + DeclarativeStreamModel, + stream_config, + config, + emit_connector_builder_messages=self._emit_connector_builder_messages, + ).retriever + + if not isinstance(retriever, SimpleRetriever): + raise NotImplementedError( + f"Stream '{stream_name}' does not support fetching individual records. " + "Only streams with SimpleRetriever currently support this operation." + ) + + schema_loader_config = stream_config.get("schema_loader") + if schema_loader_config: + schema_loader = self._constructor.create_component( + type(schema_loader_config), + schema_loader_config, + config, + ) + else: + options = stream_config.get("parameters", {}) + if "name" not in options: + options["name"] = stream_name + schema_loader = DefaultSchemaLoader(config=config, parameters=options) + + return retriever.fetch_one( + pk_value=pk_value, records_schema=schema_loader.get_json_schema() + ) + @property def dynamic_streams(self) -> List[Dict[str, Any]]: return self._dynamic_stream_configs( diff --git a/airbyte_cdk/sources/streams/concurrent/abstract_stream.py b/airbyte_cdk/sources/streams/concurrent/abstract_stream.py index 667d088ab..39b94ee33 100644 --- a/airbyte_cdk/sources/streams/concurrent/abstract_stream.py +++ b/airbyte_cdk/sources/streams/concurrent/abstract_stream.py @@ -90,3 +90,24 @@ def check_availability(self) -> StreamAvailability: """ :return: If the stream is available and if not, why """ + + def fetch_record(self, pk_value: Any) -> Mapping[str, Any]: + """ + Fetch a single record by primary key value. + + Args: + pk_value: The primary key value. Can be: + - str: For simple single-field primary keys (e.g., "123") + - Mapping[str, Any]: For composite primary keys (e.g., {"company_id": "123", "property": "status"}) + + Returns: + The fetched record as a dict + + Raises: + NotImplementedError: If the stream doesn't support fetching individual records + RecordNotFoundException: If the record is not found (404 response) + """ + raise NotImplementedError( + f"Stream {self.name} does not support fetching individual records. " + "Only declarative streams with SimpleRetriever currently support this operation." + ) diff --git a/airbyte_cdk/sources/streams/core.py b/airbyte_cdk/sources/streams/core.py index ce6b1c887..6cc5c8b5d 100644 --- a/airbyte_cdk/sources/streams/core.py +++ b/airbyte_cdk/sources/streams/core.py @@ -463,26 +463,6 @@ def get_cursor(self) -> Optional[Cursor]: """ return self.cursor - def fetch_record(self, pk_value: Any) -> Optional[Mapping[str, Any]]: - """ - Fetch a single record by primary key value. - - Args: - pk_value: The primary key value. Can be: - - str: For simple single-field primary keys (e.g., "123") - - Mapping[str, Any]: For composite primary keys (e.g., {"company_id": "123", "property": "status"}) - - Returns: - The fetched record as a dict, or None if not found - - Raises: - NotImplementedError: If the stream doesn't support fetching individual records - """ - raise NotImplementedError( - f"Stream {self.name} does not support fetching individual records. " - "Only declarative streams with SimpleRetriever currently support this operation." - ) - def _get_checkpoint_reader( self, logger: logging.Logger, From 38f6dc318946b6c62ca01291654fab37bb482b44 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Thu, 13 Nov 2025 00:54:45 +0000 Subject: [PATCH 07/16] fix: Address PR review comments - Update SimpleRetriever.fetch_one docstring to reflect RecordNotFoundException behavior - Move inline imports to top-level in ConcurrentDeclarativeSource - Simplify schema building logic using declarative_stream.get_json_schema() - Remove unused DefaultSchemaLoader import Addresses feedback from @aaronsteers on PR #846 Co-Authored-By: AJ Steers --- .../concurrent_declarative_source.py | 30 ++++--------------- .../retrievers/simple_retriever.py | 11 +++---- 2 files changed, 12 insertions(+), 29 deletions(-) diff --git a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py index fbc9053ec..ea3e486fd 100644 --- a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py +++ b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py @@ -77,6 +77,7 @@ ModelToComponentFactory, ) from airbyte_cdk.sources.declarative.resolvers import COMPONENTS_RESOLVER_TYPE_MAPPING +from airbyte_cdk.sources.declarative.retrievers import SimpleRetriever from airbyte_cdk.sources.declarative.spec.spec import Spec from airbyte_cdk.sources.declarative.types import Config, ConnectionDefinition from airbyte_cdk.sources.message.concurrent_repository import ConcurrentMessageRepository @@ -531,12 +532,6 @@ def fetch_record( NotImplementedError: If the stream doesn't support fetching individual records RecordNotFoundException: If the record is not found (404 response) """ - from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( - DeclarativeStream as DeclarativeStreamModel, - ) - from airbyte_cdk.sources.declarative.retrievers import SimpleRetriever - from airbyte_cdk.sources.declarative.schema import DefaultSchemaLoader - config = config or self._config stream_configs = self._stream_configs(self._source_config) + self.dynamic_streams @@ -554,34 +549,21 @@ def fetch_record( f"Available streams: {', '.join(available_streams)}" ) - retriever = self._constructor.create_component( + declarative_stream = self._constructor.create_component( DeclarativeStreamModel, stream_config, config, emit_connector_builder_messages=self._emit_connector_builder_messages, - ).retriever + ) - if not isinstance(retriever, SimpleRetriever): + if not isinstance(declarative_stream.retriever, SimpleRetriever): raise NotImplementedError( f"Stream '{stream_name}' does not support fetching individual records. " "Only streams with SimpleRetriever currently support this operation." ) - schema_loader_config = stream_config.get("schema_loader") - if schema_loader_config: - schema_loader = self._constructor.create_component( - type(schema_loader_config), - schema_loader_config, - config, - ) - else: - options = stream_config.get("parameters", {}) - if "name" not in options: - options["name"] = stream_name - schema_loader = DefaultSchemaLoader(config=config, parameters=options) - - return retriever.fetch_one( - pk_value=pk_value, records_schema=schema_loader.get_json_schema() + return declarative_stream.retriever.fetch_one( + pk_value=pk_value, records_schema=declarative_stream.get_json_schema() ) @property diff --git a/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py b/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py index f9f48d096..547f57160 100644 --- a/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py +++ b/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py @@ -631,9 +631,8 @@ def fetch_one( self, pk_value: Union[str, Mapping[str, Any]], records_schema: Mapping[str, Any], - ) -> Optional[Mapping[str, Any]]: - """ - Fetch a single record by primary key value. + ) -> Mapping[str, Any]: + """Fetch a single record by primary key value. This method constructs a path by appending the primary key value to the base path and sends a GET request to fetch a single record. It's designed for REST APIs that @@ -646,10 +645,12 @@ def fetch_one( records_schema: JSON schema describing the record structure Returns: - The fetched record as a dict, or None if not found (404 response) + The fetched record as a dict. Raises: - Exception: For non-404 HTTP errors (propagated from requester's error handling) + RecordNotFoundException: If the record is not found (404 response). + ValueError: If pk_value is not a string or dict. + Exception: For non-404 HTTP errors (propagated from requester's error handling). Example: record = retriever.fetch_one("123", schema) From 0a715435f486d025118829528da1b0890bca42c9 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Thu, 13 Nov 2025 00:55:49 +0000 Subject: [PATCH 08/16] fix: Simplify error handling in fetch_one Remove manual 404 checking since send_request() already handles errors internally. If send_request() returns without raising, we have valid data. Errors are now propagated naturally from the requester's error handler. Addresses feedback from @aaronsteers on PR #846 Co-Authored-By: AJ Steers --- .../retrievers/simple_retriever.py | 64 ++++++++----------- 1 file changed, 26 insertions(+), 38 deletions(-) diff --git a/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py b/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py index 547f57160..989b45517 100644 --- a/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py +++ b/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py @@ -648,9 +648,9 @@ def fetch_one( The fetched record as a dict. Raises: - RecordNotFoundException: If the record is not found (404 response). + RecordNotFoundException: If the record is not found (empty response). ValueError: If pk_value is not a string or dict. - Exception: For non-404 HTTP errors (propagated from requester's error handling). + Exception: HTTP errors are propagated from requester's error handling. Example: record = retriever.fetch_one("123", schema) @@ -686,45 +686,33 @@ def fetch_one( stream_slice = StreamSlice(partition={}, cursor_slice={}) - try: - response = self.requester.send_request( - path=fetch_path, + response = self.requester.send_request( + path=fetch_path, + stream_state={}, + stream_slice=stream_slice, + next_page_token=None, + request_headers=self._request_headers( stream_state={}, stream_slice=stream_slice, next_page_token=None, - request_headers=self._request_headers( - stream_state={}, - stream_slice=stream_slice, - next_page_token=None, - ), - request_params=self._request_params( - stream_state={}, - stream_slice=stream_slice, - next_page_token=None, - ), - request_body_data=self._request_body_data( - stream_state={}, - stream_slice=stream_slice, - next_page_token=None, - ), - request_body_json=self._request_body_json( - stream_state={}, - stream_slice=stream_slice, - next_page_token=None, - ), - log_formatter=self.log_formatter, - ) - except Exception as e: - # Check if this is a 404 (record not found) - raise RecordNotFoundException - if "404" in str(e) or ( - hasattr(e, "response") - and hasattr(e.response, "status_code") - and e.response.status_code == 404 - ): - raise RecordNotFoundException( - f"Record with primary key {pk_value} not found" - ) from e - raise + ), + request_params=self._request_params( + stream_state={}, + stream_slice=stream_slice, + next_page_token=None, + ), + request_body_data=self._request_body_data( + stream_state={}, + stream_slice=stream_slice, + next_page_token=None, + ), + request_body_json=self._request_body_json( + stream_state={}, + stream_slice=stream_slice, + next_page_token=None, + ), + log_formatter=self.log_formatter, + ) if not response: raise RecordNotFoundException( From 626848b1ce21cd1cbe4a76401d46e557b98933dd Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Thu, 13 Nov 2025 01:02:33 +0000 Subject: [PATCH 09/16] fix: Address PR review comments - add type hints and clarifying comments - Add type annotation to response variable: requests.Response | None - Add comment explaining response can be None when error handler returns IGNORE - Add comment explaining empty StreamSlice for single-record fetch - Update docstring to clarify 404s propagate as exceptions (not RecordNotFoundException) - RecordNotFoundException is only raised for empty/ignored responses or no parsed records Addresses comments from @aaronsteers on PR #846 Co-Authored-By: AJ Steers --- .../sources/declarative/retrievers/simple_retriever.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py b/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py index 989b45517..048df1c03 100644 --- a/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py +++ b/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py @@ -648,9 +648,9 @@ def fetch_one( The fetched record as a dict. Raises: - RecordNotFoundException: If the record is not found (empty response). + RecordNotFoundException: If the response is empty/ignored or parsing yields no records. ValueError: If pk_value is not a string or dict. - Exception: HTTP errors are propagated from requester's error handling. + Exception: HTTP errors (including 404) are propagated from requester's error handling. Example: record = retriever.fetch_one("123", schema) @@ -684,9 +684,11 @@ def fetch_one( else: raise ValueError(f"pk_value must be a string or dict, got {type(pk_value).__name__}") + # Single-record fetch doesn't involve partitioning, so we pass an empty StreamSlice stream_slice = StreamSlice(partition={}, cursor_slice={}) - response = self.requester.send_request( + # send_request() may return None when the error handler chooses to IGNORE a response + response: requests.Response | None = self.requester.send_request( path=fetch_path, stream_state={}, stream_slice=stream_slice, From bf83bf16787d726216b301224db4986f4010cddf Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Thu, 13 Nov 2025 01:05:50 +0000 Subject: [PATCH 10/16] refactor: Use iterator instead of list for fetch_one record parsing - Add type hints: records_iter: Iterable[Record] and first_record: Record | None - Replace list(...) with next(iter(...), None) to avoid materializing entire iterable - Remove unnecessary Mapping branch since _parse_response returns Iterable[Record] - Simplify logic from 27 lines to 18 lines This is more efficient (only fetches first record) and has clearer type hints. Addresses feedback from @aaronsteers on PR #846 Co-Authored-By: AJ Steers --- .../retrievers/simple_retriever.py | 35 +++++++------------ 1 file changed, 13 insertions(+), 22 deletions(-) diff --git a/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py b/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py index 048df1c03..c5e4417f0 100644 --- a/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py +++ b/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py @@ -721,30 +721,21 @@ def fetch_one( f"Record with primary key {pk_value} not found (no response)" ) - records = list( - self._parse_response( - response=response, - stream_state={}, - records_schema=records_schema, - stream_slice=stream_slice, - next_page_token=None, - ) + records_iter: Iterable[Record] = self._parse_response( + response=response, + stream_state={}, + records_schema=records_schema, + stream_slice=stream_slice, + next_page_token=None, ) - # Return the first record if found, raise RecordNotFoundException otherwise - if records: - first_record = records[0] - if isinstance(first_record, Record): - return dict(first_record.data) - elif isinstance(first_record, Mapping): - return dict(first_record) - else: - raise RecordNotFoundException( - f"Record with primary key {pk_value} not found (invalid record type)" - ) - raise RecordNotFoundException( - f"Record with primary key {pk_value} not found (empty response)" - ) + first_record: Record | None = next(iter(records_iter), None) + if not first_record: + raise RecordNotFoundException( + f"Record with primary key {pk_value} not found (empty response)" + ) + + return dict(first_record.data) def _deep_merge( From 691927b146a460014808f5a0a1fd7c96cf2f8405 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Thu, 13 Nov 2025 01:46:52 +0000 Subject: [PATCH 11/16] refactor: Simplify fetch_one/fetch_record to only accept str for pk_value - Update SimpleRetriever.fetch_one signature: pk_value now str only (not Union[str, Mapping]) - Rename stream_slice to empty_stream_slice for clarity - Remove composite key handling logic (isinstance checks, dict path construction) - Update ConcurrentDeclarativeSource.fetch_record signature to accept str only - Update AbstractStream.fetch_record signature to accept str only - Remove test_fetch_one_composite_pk test (no longer supported) - Update test_fetch_one_invalid_pk_type to reflect new behavior - Simplify docstrings to remove composite key documentation This simplification makes the implementation cleaner and easier to maintain. Composite key support can be added later when there's a concrete use case. Addresses feedback from @aaronsteers on PR #846 Co-Authored-By: AJ Steers --- .../concurrent_declarative_source.py | 8 ++-- .../retrievers/simple_retriever.py | 42 ++++++----------- .../streams/concurrent/abstract_stream.py | 8 ++-- .../retrievers/test_simple_retriever.py | 47 ++----------------- 4 files changed, 23 insertions(+), 82 deletions(-) diff --git a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py index ea3e486fd..e146cacc3 100644 --- a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py +++ b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py @@ -511,7 +511,7 @@ def check(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteCon def fetch_record( self, stream_name: str, - pk_value: Any, + pk_value: str, config: Optional[Mapping[str, Any]] = None, ) -> Mapping[str, Any]: """ @@ -519,9 +519,7 @@ def fetch_record( Args: stream_name: Name of the stream to fetch from - pk_value: Primary key value to fetch. Can be: - - str: For simple single-field primary keys (e.g., "123") - - Mapping[str, Any]: For composite primary keys (e.g., {"company_id": "123", "property": "status"}) + pk_value: Primary key value to fetch as a string (e.g., "123") config: Source configuration (optional, uses instance config if not provided) Returns: @@ -530,7 +528,7 @@ def fetch_record( Raises: ValueError: If the stream name is not found in the source NotImplementedError: If the stream doesn't support fetching individual records - RecordNotFoundException: If the record is not found (404 response) + RecordNotFoundException: If the record is not found """ config = config or self._config diff --git a/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py b/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py index c5e4417f0..3878515e8 100644 --- a/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py +++ b/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py @@ -629,7 +629,7 @@ def _to_partition_key(to_serialize: Any) -> str: def fetch_one( self, - pk_value: Union[str, Mapping[str, Any]], + pk_value: str, records_schema: Mapping[str, Any], ) -> Mapping[str, Any]: """Fetch a single record by primary key value. @@ -639,9 +639,7 @@ def fetch_one( follow the convention: GET /resource/{id} Args: - pk_value: The primary key value to fetch. Can be: - - str: For simple single-field primary keys (e.g., "123") - - Mapping[str, Any]: For composite primary keys (e.g., {"company_id": "123", "property": "status"}) + pk_value: The primary key value to fetch as a string (e.g., "123") records_schema: JSON schema describing the record structure Returns: @@ -649,18 +647,13 @@ def fetch_one( Raises: RecordNotFoundException: If the response is empty/ignored or parsing yields no records. - ValueError: If pk_value is not a string or dict. Exception: HTTP errors (including 404) are propagated from requester's error handling. Example: record = retriever.fetch_one("123", schema) - record = retriever.fetch_one({"company_id": "123", "property": "status"}, schema) - Note: - This implementation uses convention-based path construction (Option B from design). (important-comment) - For simple PKs: appends /{pk_value} to base path (important-comment) - For composite PKs: appends /{value1}/{value2}/... in key order (important-comment) + This implementation uses convention-based path construction, appending /{pk_value} to the base path. (important-comment) Alternative approaches that could be implemented in the future: (important-comment) - Path template interpolation: Use a configurable template like "{base_path}/{id}" (important-comment) @@ -668,49 +661,42 @@ def fetch_one( - Field path configuration: Allow specifying which response field contains the record (important-comment) for APIs that wrap single records in envelopes like {"data": {...}} (important-comment) """ + # Single-record fetch doesn't involve partitioning, so we pass an empty StreamSlice + empty_stream_slice = StreamSlice(partition={}, cursor_slice={}) + # Get the base path from the requester base_path = self.requester.get_path( stream_state={}, - stream_slice=StreamSlice(partition={}, cursor_slice={}), + stream_slice=empty_stream_slice, next_page_token=None, ) - if isinstance(pk_value, str): - fetch_path = f"{base_path.rstrip('/')}/{str(pk_value).lstrip('/')}" - elif isinstance(pk_value, Mapping): - sorted_values = [str(pk_value[key]).lstrip("/") for key in sorted(pk_value.keys())] - pk_path_segment = "/".join(sorted_values) - fetch_path = f"{base_path.rstrip('/')}/{pk_path_segment}" - else: - raise ValueError(f"pk_value must be a string or dict, got {type(pk_value).__name__}") - - # Single-record fetch doesn't involve partitioning, so we pass an empty StreamSlice - stream_slice = StreamSlice(partition={}, cursor_slice={}) + fetch_path = f"{base_path.rstrip('/')}/{pk_value.lstrip('/')}" # send_request() may return None when the error handler chooses to IGNORE a response response: requests.Response | None = self.requester.send_request( path=fetch_path, stream_state={}, - stream_slice=stream_slice, + stream_slice=empty_stream_slice, next_page_token=None, request_headers=self._request_headers( stream_state={}, - stream_slice=stream_slice, + stream_slice=empty_stream_slice, next_page_token=None, ), request_params=self._request_params( stream_state={}, - stream_slice=stream_slice, + stream_slice=empty_stream_slice, next_page_token=None, ), request_body_data=self._request_body_data( stream_state={}, - stream_slice=stream_slice, + stream_slice=empty_stream_slice, next_page_token=None, ), request_body_json=self._request_body_json( stream_state={}, - stream_slice=stream_slice, + stream_slice=empty_stream_slice, next_page_token=None, ), log_formatter=self.log_formatter, @@ -725,7 +711,7 @@ def fetch_one( response=response, stream_state={}, records_schema=records_schema, - stream_slice=stream_slice, + stream_slice=empty_stream_slice, next_page_token=None, ) diff --git a/airbyte_cdk/sources/streams/concurrent/abstract_stream.py b/airbyte_cdk/sources/streams/concurrent/abstract_stream.py index 39b94ee33..c2d633a4a 100644 --- a/airbyte_cdk/sources/streams/concurrent/abstract_stream.py +++ b/airbyte_cdk/sources/streams/concurrent/abstract_stream.py @@ -91,21 +91,19 @@ def check_availability(self) -> StreamAvailability: :return: If the stream is available and if not, why """ - def fetch_record(self, pk_value: Any) -> Mapping[str, Any]: + def fetch_record(self, pk_value: str) -> Mapping[str, Any]: """ Fetch a single record by primary key value. Args: - pk_value: The primary key value. Can be: - - str: For simple single-field primary keys (e.g., "123") - - Mapping[str, Any]: For composite primary keys (e.g., {"company_id": "123", "property": "status"}) + pk_value: The primary key value as a string (e.g., "123") Returns: The fetched record as a dict Raises: NotImplementedError: If the stream doesn't support fetching individual records - RecordNotFoundException: If the record is not found (404 response) + RecordNotFoundException: If the record is not found """ raise NotImplementedError( f"Stream {self.name} does not support fetching individual records. " diff --git a/unit_tests/sources/declarative/retrievers/test_simple_retriever.py b/unit_tests/sources/declarative/retrievers/test_simple_retriever.py index abf1d6449..157e2eb5f 100644 --- a/unit_tests/sources/declarative/retrievers/test_simple_retriever.py +++ b/unit_tests/sources/declarative/retrievers/test_simple_retriever.py @@ -1680,45 +1680,6 @@ def test_fetch_one_simple_pk(): assert call_kwargs["path"] == "posts/123" -def test_fetch_one_composite_pk(): - """Test fetch_one with a composite primary key (dict).""" - requester = MagicMock() - requester.get_path.return_value = "companies" - - response = requests.Response() - response.status_code = 200 - response._content = json.dumps( - {"company_id": "123", "property": "status", "value": "active"} - ).encode("utf-8") - - requester.send_request.return_value = response - - record_selector = MagicMock() - record_selector.select_records.return_value = [ - Record( - data={"company_id": "123", "property": "status", "value": "active"}, - stream_name="companies", - associated_slice=None, - ) - ] - - retriever = SimpleRetriever( - name="companies", - primary_key=["company_id", "property"], - requester=requester, - record_selector=record_selector, - parameters={}, - config={}, - ) - - result = retriever.fetch_one({"company_id": "123", "property": "status"}, records_schema={}) - - assert result == {"company_id": "123", "property": "status", "value": "active"} - requester.send_request.assert_called_once() - call_kwargs = requester.send_request.call_args[1] - assert call_kwargs["path"] == "companies/123/status" - - def test_fetch_one_not_found(): """Test fetch_one raises RecordNotFoundException when record is not found (404).""" requester = MagicMock() @@ -1774,7 +1735,7 @@ def test_fetch_one_server_error(): def test_fetch_one_invalid_pk_type(): - """Test fetch_one raises ValueError for invalid pk_value type.""" + """Test fetch_one with non-string pk_value (should fail type checking but test runtime behavior).""" requester = MagicMock() requester.get_path.return_value = "posts" @@ -1789,10 +1750,8 @@ def test_fetch_one_invalid_pk_type(): config={}, ) - with pytest.raises(ValueError) as exc_info: - retriever.fetch_one(123, records_schema={}) - - assert "pk_value must be a string or dict" in str(exc_info.value) + with pytest.raises(AttributeError): + retriever.fetch_one(123, records_schema={}) # type: ignore def test_fetch_one_no_response(): From 8a523900a02a604da1ba1be16ac757fb0e7ac53c Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Thu, 13 Nov 2025 01:56:21 +0000 Subject: [PATCH 12/16] refactor: Rename fetch_one to _fetch_one to mark as internal API The _fetch_one method on SimpleRetriever is now prefixed with underscore to indicate it's not part of the public API. External callers should use Stream.fetch_record() or Source.fetch_record() instead. Addresses feedback from @aaronsteers on PR #846 Co-Authored-By: AJ Steers --- .../sources/declarative/concurrent_declarative_source.py | 2 +- .../sources/declarative/retrievers/simple_retriever.py | 2 +- .../sources/declarative/retrievers/test_simple_retriever.py | 6 +++--- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py index e146cacc3..90fcd96e8 100644 --- a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py +++ b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py @@ -560,7 +560,7 @@ def fetch_record( "Only streams with SimpleRetriever currently support this operation." ) - return declarative_stream.retriever.fetch_one( + return declarative_stream.retriever._fetch_one( pk_value=pk_value, records_schema=declarative_stream.get_json_schema() ) diff --git a/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py b/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py index 3878515e8..e2a53b269 100644 --- a/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py +++ b/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py @@ -627,7 +627,7 @@ def _to_partition_key(to_serialize: Any) -> str: # separators have changed in Python 3.4. To avoid being impacted by further change, we explicitly specify our own value return json.dumps(to_serialize, indent=None, separators=(",", ":"), sort_keys=True) - def fetch_one( + def _fetch_one( self, pk_value: str, records_schema: Mapping[str, Any], diff --git a/unit_tests/sources/declarative/retrievers/test_simple_retriever.py b/unit_tests/sources/declarative/retrievers/test_simple_retriever.py index 157e2eb5f..1fa44cb10 100644 --- a/unit_tests/sources/declarative/retrievers/test_simple_retriever.py +++ b/unit_tests/sources/declarative/retrievers/test_simple_retriever.py @@ -1672,7 +1672,7 @@ def test_fetch_one_simple_pk(): config={}, ) - result = retriever.fetch_one("123", records_schema={}) + result = retriever._fetch_one("123", records_schema={}) assert result == {"id": "123", "title": "Test Post"} requester.send_request.assert_called_once() @@ -1702,7 +1702,7 @@ def test_fetch_one_not_found(): ) with pytest.raises(RecordNotFoundException) as exc_info: - retriever.fetch_one("999", records_schema={}) + retriever._fetch_one("999", records_schema={}) assert "999" in str(exc_info.value) @@ -1751,7 +1751,7 @@ def test_fetch_one_invalid_pk_type(): ) with pytest.raises(AttributeError): - retriever.fetch_one(123, records_schema={}) # type: ignore + retriever._fetch_one(123, records_schema={}) # type: ignore def test_fetch_one_no_response(): From 4386a351d46eb9c633208e006c4cacc4d96a50f9 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Thu, 13 Nov 2025 23:18:55 +0000 Subject: [PATCH 13/16] fix: Address PR feedback from brianjlai on fetch_one implementation - Make fetch_one public by removing underscore prefix (was _fetch_one) - Fix extractor/selector logic to handle single-object responses - Most REST APIs return single objects for GET /resource/{id} - Now tries extractor first, then falls back to response.json() - Add test for single-object response pattern - Update all test references from _fetch_one to fetch_one Addresses comments from airbytehq/airbyte-python-cdk#846 Co-Authored-By: AJ Steers --- .../concurrent_declarative_source.py | 2 +- .../retrievers/simple_retriever.py | 21 +++++--- .../retrievers/test_simple_retriever.py | 51 +++++++++++++++---- 3 files changed, 56 insertions(+), 18 deletions(-) diff --git a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py index 90fcd96e8..e146cacc3 100644 --- a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py +++ b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py @@ -560,7 +560,7 @@ def fetch_record( "Only streams with SimpleRetriever currently support this operation." ) - return declarative_stream.retriever._fetch_one( + return declarative_stream.retriever.fetch_one( pk_value=pk_value, records_schema=declarative_stream.get_json_schema() ) diff --git a/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py b/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py index e2a53b269..24de7439b 100644 --- a/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py +++ b/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py @@ -627,7 +627,7 @@ def _to_partition_key(to_serialize: Any) -> str: # separators have changed in Python 3.4. To avoid being impacted by further change, we explicitly specify our own value return json.dumps(to_serialize, indent=None, separators=(",", ":"), sort_keys=True) - def _fetch_one( + def fetch_one( self, pk_value: str, records_schema: Mapping[str, Any], @@ -716,12 +716,19 @@ def _fetch_one( ) first_record: Record | None = next(iter(records_iter), None) - if not first_record: - raise RecordNotFoundException( - f"Record with primary key {pk_value} not found (empty response)" - ) - - return dict(first_record.data) + if first_record: + return dict(first_record.data) + + try: + response_body = response.json() + if isinstance(response_body, dict) and response_body: + return response_body + except Exception: + pass + + raise RecordNotFoundException( + f"Record with primary key {pk_value} not found (empty response)" + ) def _deep_merge( diff --git a/unit_tests/sources/declarative/retrievers/test_simple_retriever.py b/unit_tests/sources/declarative/retrievers/test_simple_retriever.py index 1fa44cb10..65804b1ad 100644 --- a/unit_tests/sources/declarative/retrievers/test_simple_retriever.py +++ b/unit_tests/sources/declarative/retrievers/test_simple_retriever.py @@ -1647,7 +1647,7 @@ def _mock_paginator(): return paginator -def test_fetch_one_simple_pk(): +def testfetch_one_simple_pk(): """Test fetch_one with a simple string primary key.""" requester = MagicMock() requester.get_path.return_value = "posts" @@ -1672,7 +1672,7 @@ def test_fetch_one_simple_pk(): config={}, ) - result = retriever._fetch_one("123", records_schema={}) + result = retriever.fetch_one("123", records_schema={}) assert result == {"id": "123", "title": "Test Post"} requester.send_request.assert_called_once() @@ -1680,7 +1680,7 @@ def test_fetch_one_simple_pk(): assert call_kwargs["path"] == "posts/123" -def test_fetch_one_not_found(): +def testfetch_one_not_found(): """Test fetch_one raises RecordNotFoundException when record is not found (404).""" requester = MagicMock() requester.get_path.return_value = "posts" @@ -1702,12 +1702,12 @@ def test_fetch_one_not_found(): ) with pytest.raises(RecordNotFoundException) as exc_info: - retriever._fetch_one("999", records_schema={}) + retriever.fetch_one("999", records_schema={}) assert "999" in str(exc_info.value) -def test_fetch_one_server_error(): +def testfetch_one_server_error(): """Test fetch_one propagates non-404 errors.""" requester = MagicMock() requester.get_path.return_value = "posts" @@ -1734,7 +1734,7 @@ def test_fetch_one_server_error(): assert "Server error" in str(exc_info.value) -def test_fetch_one_invalid_pk_type(): +def testfetch_one_invalid_pk_type(): """Test fetch_one with non-string pk_value (should fail type checking but test runtime behavior).""" requester = MagicMock() requester.get_path.return_value = "posts" @@ -1751,10 +1751,10 @@ def test_fetch_one_invalid_pk_type(): ) with pytest.raises(AttributeError): - retriever._fetch_one(123, records_schema={}) # type: ignore + retriever.fetch_one(123, records_schema={}) # type: ignore -def test_fetch_one_no_response(): +def testfetch_one_no_response(): """Test fetch_one raises RecordNotFoundException when response is None.""" requester = MagicMock() requester.get_path.return_value = "posts" @@ -1777,8 +1777,8 @@ def test_fetch_one_no_response(): assert "123" in str(exc_info.value) -def test_fetch_one_empty_records(): - """Test fetch_one raises RecordNotFoundException when no records are returned.""" +def testfetch_one_empty_records(): + """Test fetch_one raises RecordNotFoundException when response is truly empty.""" requester = MagicMock() requester.get_path.return_value = "posts" @@ -1804,3 +1804,34 @@ def test_fetch_one_empty_records(): retriever.fetch_one("123", records_schema={}) assert "123" in str(exc_info.value) + + +def testfetch_one_single_object_response(): + """Test fetch_one handles single object responses (most common pattern for GET /resource/{id}).""" + requester = MagicMock() + requester.get_path.return_value = "posts" + + response = requests.Response() + response.status_code = 200 + response._content = json.dumps({"id": "123", "title": "Test Post"}).encode("utf-8") + + requester.send_request.return_value = response + + record_selector = MagicMock() + record_selector.select_records.return_value = [] + + retriever = SimpleRetriever( + name="posts", + primary_key="id", + requester=requester, + record_selector=record_selector, + parameters={}, + config={}, + ) + + result = retriever.fetch_one("123", records_schema={}) + + assert result == {"id": "123", "title": "Test Post"} + requester.send_request.assert_called_once() + call_kwargs = requester.send_request.call_args[1] + assert call_kwargs["path"] == "posts/123" From 28b92861936219d100ec9806f1ece650e7f4fa3f Mon Sep 17 00:00:00 2001 From: "Aaron (\"AJ\") Steers" Date: Thu, 13 Nov 2025 15:45:21 -0800 Subject: [PATCH 14/16] Update unit_tests/sources/declarative/retrievers/test_simple_retriever.py Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com> --- .../sources/declarative/retrievers/test_simple_retriever.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/unit_tests/sources/declarative/retrievers/test_simple_retriever.py b/unit_tests/sources/declarative/retrievers/test_simple_retriever.py index 65804b1ad..1418351d0 100644 --- a/unit_tests/sources/declarative/retrievers/test_simple_retriever.py +++ b/unit_tests/sources/declarative/retrievers/test_simple_retriever.py @@ -1777,7 +1777,7 @@ def testfetch_one_no_response(): assert "123" in str(exc_info.value) -def testfetch_one_empty_records(): +def test_fetch_one_empty_records(): """Test fetch_one raises RecordNotFoundException when response is truly empty.""" requester = MagicMock() requester.get_path.return_value = "posts" @@ -1805,7 +1805,6 @@ def testfetch_one_empty_records(): assert "123" in str(exc_info.value) - def testfetch_one_single_object_response(): """Test fetch_one handles single object responses (most common pattern for GET /resource/{id}).""" requester = MagicMock() From dfeeec229a0f8f8bbc5fbaf5bc1ad35760398512 Mon Sep 17 00:00:00 2001 From: "Aaron (\"AJ\") Steers" Date: Thu, 13 Nov 2025 15:47:44 -0800 Subject: [PATCH 15/16] Update unit_tests/sources/declarative/retrievers/test_simple_retriever.py Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com> --- .../sources/declarative/retrievers/test_simple_retriever.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/unit_tests/sources/declarative/retrievers/test_simple_retriever.py b/unit_tests/sources/declarative/retrievers/test_simple_retriever.py index 1418351d0..9855206d1 100644 --- a/unit_tests/sources/declarative/retrievers/test_simple_retriever.py +++ b/unit_tests/sources/declarative/retrievers/test_simple_retriever.py @@ -1680,7 +1680,7 @@ def testfetch_one_simple_pk(): assert call_kwargs["path"] == "posts/123" -def testfetch_one_not_found(): +def test_fetch_one_not_found(): """Test fetch_one raises RecordNotFoundException when record is not found (404).""" requester = MagicMock() requester.get_path.return_value = "posts" @@ -1706,7 +1706,6 @@ def testfetch_one_not_found(): assert "999" in str(exc_info.value) - def testfetch_one_server_error(): """Test fetch_one propagates non-404 errors.""" requester = MagicMock() From 82add1be50f99af010b5c53230c9d3cbc0c19f48 Mon Sep 17 00:00:00 2001 From: "Aaron (\"AJ\") Steers" Date: Thu, 13 Nov 2025 16:21:12 -0800 Subject: [PATCH 16/16] Update unit_tests/sources/declarative/retrievers/test_simple_retriever.py Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com> --- .../sources/declarative/retrievers/test_simple_retriever.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/unit_tests/sources/declarative/retrievers/test_simple_retriever.py b/unit_tests/sources/declarative/retrievers/test_simple_retriever.py index 9855206d1..12d72ad9b 100644 --- a/unit_tests/sources/declarative/retrievers/test_simple_retriever.py +++ b/unit_tests/sources/declarative/retrievers/test_simple_retriever.py @@ -1706,7 +1706,7 @@ def test_fetch_one_not_found(): assert "999" in str(exc_info.value) -def testfetch_one_server_error(): +def test_fetch_one_server_error(): """Test fetch_one propagates non-404 errors.""" requester = MagicMock() requester.get_path.return_value = "posts" @@ -1732,7 +1732,6 @@ def testfetch_one_server_error(): assert "Server error" in str(exc_info.value) - def testfetch_one_invalid_pk_type(): """Test fetch_one with non-string pk_value (should fail type checking but test runtime behavior).""" requester = MagicMock()