From 1d4e6a7badb1b15df118d20a3606a678840961d5 Mon Sep 17 00:00:00 2001 From: "maxime.c" Date: Wed, 12 Nov 2025 10:16:33 -0500 Subject: [PATCH 1/2] enable more logging --- .../concurrent_source/concurrent_source.py | 30 +++++++++++-------- .../streams/concurrent/partition_enqueuer.py | 8 +++++ .../streams/concurrent/partition_reader.py | 6 ++++ .../sources/streams/http/http_client.py | 27 +++++------------ 4 files changed, 39 insertions(+), 32 deletions(-) diff --git a/airbyte_cdk/sources/concurrent_source/concurrent_source.py b/airbyte_cdk/sources/concurrent_source/concurrent_source.py index de2d93523..b8f1b640a 100644 --- a/airbyte_cdk/sources/concurrent_source/concurrent_source.py +++ b/airbyte_cdk/sources/concurrent_source/concurrent_source.py @@ -4,7 +4,7 @@ import concurrent import logging -from queue import Queue +from queue import Queue, Empty from typing import Iterable, Iterator, List, Optional from airbyte_cdk.models import AirbyteMessage @@ -143,17 +143,23 @@ def _consume_from_queue( queue: Queue[QueueItem], concurrent_stream_processor: ConcurrentReadProcessor, ) -> Iterable[AirbyteMessage]: - while airbyte_message_or_record_or_exception := queue.get(): - yield from self._handle_item( - airbyte_message_or_record_or_exception, - concurrent_stream_processor, - ) - # In the event that a partition raises an exception, anything remaining in - # the queue will be missed because is_done() can raise an exception and exit - # out of this loop before remaining items are consumed - if queue.empty() and concurrent_stream_processor.is_done(): - # all partitions were generated and processed. we're done here - break + done = False + while not done: + try: + while airbyte_message_or_record_or_exception := queue.get(block=True, timeout=60.0 * 5): + yield from self._handle_item( + airbyte_message_or_record_or_exception, + concurrent_stream_processor, + ) + # In the event that a partition raises an exception, anything remaining in + # the queue will be missed because is_done() can raise an exception and exit + # out of this loop before remaining items are consumed + if queue.empty() and concurrent_stream_processor.is_done(): + # all partitions were generated and processed. we're done here + done = True + break + except Empty: + self._logger.info("No result from the queue for the past 5 minutes. Will try again...") def _handle_item( self, diff --git a/airbyte_cdk/sources/streams/concurrent/partition_enqueuer.py b/airbyte_cdk/sources/streams/concurrent/partition_enqueuer.py index a4dd81f29..0d765f89f 100644 --- a/airbyte_cdk/sources/streams/concurrent/partition_enqueuer.py +++ b/airbyte_cdk/sources/streams/concurrent/partition_enqueuer.py @@ -1,6 +1,7 @@ # # Copyright (c) 2023 Airbyte, Inc., all rights reserved. # +import logging import time from queue import Queue @@ -12,6 +13,8 @@ from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream from airbyte_cdk.sources.streams.concurrent.partitions.types import QueueItem +LOGGER = logging.getLogger(f"airbyte.PartitionEnqueuer") + class PartitionEnqueuer: """ @@ -42,7 +45,9 @@ def generate_partitions(self, stream: AbstractStream) -> None: This method is meant to be called in a separate thread. """ + try: + LOGGER.info(f"Starting partition generation for stream {stream.name}") for partition in stream.generate_partitions(): # Adding partitions to the queue generates futures. To avoid having too many futures, we throttle here. We understand that # we might add more futures than the limit by throttling in the threads while it is the main thread that actual adds the @@ -58,7 +63,10 @@ def generate_partitions(self, stream: AbstractStream) -> None: while self._thread_pool_manager.prune_to_validate_has_reached_futures_limit(): time.sleep(self._sleep_time_in_seconds) self._queue.put(partition) + + LOGGER.info(f"Partition generation complete for stream {stream.name}") self._queue.put(PartitionGenerationCompletedSentinel(stream)) except Exception as e: + LOGGER.info(f"Error during partition generation for stream {stream.name}") self._queue.put(StreamThreadException(e, stream.name)) self._queue.put(PartitionGenerationCompletedSentinel(stream)) diff --git a/airbyte_cdk/sources/streams/concurrent/partition_reader.py b/airbyte_cdk/sources/streams/concurrent/partition_reader.py index 0edc5056a..3a9500579 100644 --- a/airbyte_cdk/sources/streams/concurrent/partition_reader.py +++ b/airbyte_cdk/sources/streams/concurrent/partition_reader.py @@ -14,6 +14,8 @@ ) from airbyte_cdk.sources.utils.slice_logger import SliceLogger +LOGGER = logging.getLogger(f"airbyte.PartitionReader") + # Since moving all the connector builder workflow to the concurrent CDK which required correct ordering # of grouping log messages onto the main write thread using the ConcurrentMessageRepository, this @@ -73,6 +75,7 @@ def process_partition(self, partition: Partition, cursor: Cursor) -> None: :return: None """ try: + LOGGER.info(f"Starting to read from stream {partition.stream_name()} and partition {partition.to_slice()}") if self._partition_logger: self._partition_logger.log(partition) @@ -80,7 +83,10 @@ def process_partition(self, partition: Partition, cursor: Cursor) -> None: self._queue.put(record) cursor.observe(record) cursor.close_partition(partition) + + LOGGER.info(f"Reading complete for stream {partition.stream_name()} and partition {partition.to_slice()}") self._queue.put(PartitionCompleteSentinel(partition, self._IS_SUCCESSFUL)) except Exception as e: + LOGGER.info(f"Error while reading from {partition.stream_name()} and partition {partition.to_slice()}") self._queue.put(StreamThreadException(e, partition.stream_name())) self._queue.put(PartitionCompleteSentinel(partition, not self._IS_SUCCESSFUL)) diff --git a/airbyte_cdk/sources/streams/http/http_client.py b/airbyte_cdk/sources/streams/http/http_client.py index e9fc5add2..2dfa76570 100644 --- a/airbyte_cdk/sources/streams/http/http_client.py +++ b/airbyte_cdk/sources/streams/http/http_client.py @@ -5,6 +5,7 @@ import logging import os import urllib +import uuid from pathlib import Path from typing import Any, Callable, Dict, List, Mapping, Optional, Tuple, Union @@ -329,9 +330,9 @@ def _send( if hasattr(self._session, "auth") and isinstance(self._session.auth, AuthBase): self._session.auth(request) - self._logger.debug( - "Making outbound API request", - extra={"headers": request.headers, "url": request.url, "request_body": request.body}, + request_id = str(uuid.uuid4()) + self._logger.info( + f"[{request_id}] Making outbound API request to {request.url}", ) response: Optional[requests.Response] = None @@ -346,23 +347,9 @@ def _send( response if response is not None else exc ) - # Evaluation of response.text can be heavy, for example, if streaming a large response - # Do it only in debug mode - if self._logger.isEnabledFor(logging.DEBUG) and response is not None: - if request_kwargs.get("stream"): - self._logger.debug( - "Receiving response, but not logging it as the response is streamed", - extra={"headers": response.headers, "status": response.status_code}, - ) - else: - self._logger.debug( - "Receiving response", - extra={ - "headers": response.headers, - "status": response.status_code, - "body": response.text, - }, - ) + self._logger.info( + f"[{request_id}] Receiving response from {request.url}" + f" with exception {type(exc)}" if exc else "" + ) # Request/response logging for declarative cdk if ( From 4aed10df6fcf4dac37c8690fb8338c2cd08dc68a Mon Sep 17 00:00:00 2001 From: "maxime.c" Date: Wed, 12 Nov 2025 10:26:48 -0500 Subject: [PATCH 2/2] fix --- airbyte_cdk/sources/streams/http/http_client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte_cdk/sources/streams/http/http_client.py b/airbyte_cdk/sources/streams/http/http_client.py index 2dfa76570..344aa9c1c 100644 --- a/airbyte_cdk/sources/streams/http/http_client.py +++ b/airbyte_cdk/sources/streams/http/http_client.py @@ -348,7 +348,7 @@ def _send( ) self._logger.info( - f"[{request_id}] Receiving response from {request.url}" + f" with exception {type(exc)}" if exc else "" + f"[{request_id}] Receiving response from {request.url}{f' with exception {type(exc)}' if exc else ''}" ) # Request/response logging for declarative cdk