30 changes: 18 additions & 12 deletions airbyte_cdk/sources/concurrent_source/concurrent_source.py

@@ -4,7 +4,7 @@
 
 import concurrent
 import logging
-from queue import Queue
+from queue import Queue, Empty
 from typing import Iterable, Iterator, List, Optional
 
 from airbyte_cdk.models import AirbyteMessage
@@ -143,17 +143,23 @@ def _consume_from_queue(
         queue: Queue[QueueItem],
         concurrent_stream_processor: ConcurrentReadProcessor,
     ) -> Iterable[AirbyteMessage]:
-        while airbyte_message_or_record_or_exception := queue.get():
-            yield from self._handle_item(
-                airbyte_message_or_record_or_exception,
-                concurrent_stream_processor,
-            )
-            # In the event that a partition raises an exception, anything remaining in
-            # the queue will be missed because is_done() can raise an exception and exit
-            # out of this loop before remaining items are consumed
-            if queue.empty() and concurrent_stream_processor.is_done():
-                # all partitions were generated and processed. we're done here
-                break
+        done = False
+        while not done:
+            try:
+                while airbyte_message_or_record_or_exception := queue.get(block=True, timeout=60.0 * 5):
+                    yield from self._handle_item(
+                        airbyte_message_or_record_or_exception,
+                        concurrent_stream_processor,
+                    )
+                    # In the event that a partition raises an exception, anything remaining in
+                    # the queue will be missed because is_done() can raise an exception and exit
+                    # out of this loop before remaining items are consumed
+                    if queue.empty() and concurrent_stream_processor.is_done():
+                        # all partitions were generated and processed. we're done here
+                        done = True
+                        break
+            except Empty:
+                self._logger.info("No result from the queue for the past 5 minutes. Will try again...")
 
     def _handle_item(
         self,
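The new consumption loop blocks on queue.get with a five-minute timeout, so a stalled producer surfaces as a periodic log line instead of a silent hang. A minimal, self-contained sketch of the same pattern outside the CDK (SENTINEL, produce, and consume are illustrative names, not CDK APIs):

import threading
from queue import Empty, Queue

SENTINEL = object()  # stands in for the CDK's completion sentinels

def produce(queue: Queue) -> None:
    for item in ("record-1", "record-2", "record-3"):
        queue.put(item)
    queue.put(SENTINEL)  # always signal completion so the consumer can stop

def consume(queue: Queue) -> None:
    done = False
    while not done:
        try:
            # Block for up to 5 seconds; queue.Empty is raised on timeout.
            item = queue.get(block=True, timeout=5.0)
            if item is SENTINEL:
                done = True
            else:
                print(f"processed {item}")
        except Empty:
            print("No result from the queue; will try again...")

work_queue: Queue = Queue()
threading.Thread(target=produce, args=(work_queue,), daemon=True).start()
consume(work_queue)

The done flag is what lets the consumer distinguish "timed out, go around again" from "finished, exit both loops", which is exactly the role it plays in _consume_from_queue above.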
8 changes: 8 additions & 0 deletions airbyte_cdk/sources/streams/concurrent/partition_enqueuer.py

@@ -1,6 +1,7 @@
 #
 # Copyright (c) 2023 Airbyte, Inc., all rights reserved.
 #
+import logging
 import time
 from queue import Queue
 
@@ -12,6 +13,8 @@
 from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream
 from airbyte_cdk.sources.streams.concurrent.partitions.types import QueueItem
 
+LOGGER = logging.getLogger("airbyte.PartitionEnqueuer")
+
 
 class PartitionEnqueuer:
     """
@@ -42,7 +45,9 @@ def generate_partitions(self, stream: AbstractStream) -> None:
 
         This method is meant to be called in a separate thread.
         """
+
         try:
+            LOGGER.info(f"Starting partition generation for stream {stream.name}")
             for partition in stream.generate_partitions():
                 # Adding partitions to the queue generates futures. To avoid having too many futures, we throttle here. We understand that
                 # we might add more futures than the limit by throttling in the threads while it is the main thread that actual adds the
@@ -58,7 +63,10 @@
                 while self._thread_pool_manager.prune_to_validate_has_reached_futures_limit():
                     time.sleep(self._sleep_time_in_seconds)
                 self._queue.put(partition)
+
+            LOGGER.info(f"Partition generation complete for stream {stream.name}")
             self._queue.put(PartitionGenerationCompletedSentinel(stream))
         except Exception as e:
+            LOGGER.info(f"Error during partition generation for stream {stream.name}")
             self._queue.put(StreamThreadException(e, stream.name))
             self._queue.put(PartitionGenerationCompletedSentinel(stream))
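The error path here encodes the threading contract this PR relies on: a worker thread never lets an exception escape; it wraps the exception, puts it on the shared queue, and still emits the completion sentinel so the consumer's accounting of finished workers stays balanced. A stripped-down sketch of that contract under assumed names (WorkResult, worker, and DONE are hypothetical, not CDK classes):

import threading
from dataclasses import dataclass
from queue import Queue
from typing import Optional

DONE = object()  # completion sentinel, emitted on success and on failure

@dataclass
class WorkResult:
    # Hypothetical queue item: carries either a value or a captured exception.
    value: Optional[str] = None
    error: Optional[Exception] = None

def worker(queue: Queue, should_fail: bool) -> None:
    try:
        if should_fail:
            raise RuntimeError("partition generation failed")
        queue.put(WorkResult(value="partition-1"))
    except Exception as exc:
        queue.put(WorkResult(error=exc))  # surface the error to the main thread
    finally:
        queue.put(DONE)  # the consumer always sees exactly one DONE per worker

result_queue: Queue = Queue()
threading.Thread(target=worker, args=(result_queue, True)).start()
while (item := result_queue.get()) is not DONE:
    print(f"error: {item.error}" if item.error else f"value: {item.value}")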
6 changes: 6 additions & 0 deletions airbyte_cdk/sources/streams/concurrent/partition_reader.py

@@ -14,6 +14,8 @@
 )
 from airbyte_cdk.sources.utils.slice_logger import SliceLogger
 
+LOGGER = logging.getLogger("airbyte.PartitionReader")
+
 
 # Since moving all the connector builder workflow to the concurrent CDK which required correct ordering
 # of grouping log messages onto the main write thread using the ConcurrentMessageRepository, this
@@ -73,14 +75,18 @@ def process_partition(self, partition: Partition, cursor: Cursor) -> None:
         :return: None
         """
         try:
+            LOGGER.info(f"Starting to read from stream {partition.stream_name()} and partition {partition.to_slice()}")
             if self._partition_logger:
                 self._partition_logger.log(partition)
 
             for record in partition.read():
                 self._queue.put(record)
                 cursor.observe(record)
             cursor.close_partition(partition)
+
+            LOGGER.info(f"Reading complete for stream {partition.stream_name()} and partition {partition.to_slice()}")
             self._queue.put(PartitionCompleteSentinel(partition, self._IS_SUCCESSFUL))
         except Exception as e:
+            LOGGER.info(f"Error while reading from {partition.stream_name()} and partition {partition.to_slice()}")
             self._queue.put(StreamThreadException(e, partition.stream_name()))
             self._queue.put(PartitionCompleteSentinel(partition, not self._IS_SUCCESSFUL))
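The read loop above keeps a strict ordering: each record is put on the queue, then observed by the cursor, and close_partition runs only after the loop completes, so cursor state is not checkpointed ahead of records that were actually emitted. A toy illustration of that ordering, with a stand-in Cursor rather than the CDK's cursor interface:

from queue import Queue
from typing import Iterable, Optional

class Cursor:
    # Stand-in cursor: remembers the highest value seen, checkpoints on close.
    def __init__(self) -> None:
        self._high_water_mark: Optional[int] = None

    def observe(self, record: int) -> None:
        if self._high_water_mark is None or record > self._high_water_mark:
            self._high_water_mark = record

    def close_partition(self) -> None:
        print(f"checkpointing at {self._high_water_mark}")

def process_partition(records: Iterable[int], queue: Queue, cursor: Cursor) -> None:
    for record in records:
        queue.put(record)       # emit the record first...
        cursor.observe(record)  # ...then advance cursor state
    cursor.close_partition()    # checkpoint only after the whole partition is read

process_partition([1, 3, 2], Queue(), Cursor())  # prints "checkpointing at 3"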
27 changes: 7 additions & 20 deletions airbyte_cdk/sources/streams/http/http_client.py

@@ -5,6 +5,7 @@
 import logging
 import os
 import urllib
+import uuid
 from pathlib import Path
 from typing import Any, Callable, Dict, List, Mapping, Optional, Tuple, Union
 
@@ -329,9 +330,9 @@ def _send(
         if hasattr(self._session, "auth") and isinstance(self._session.auth, AuthBase):
             self._session.auth(request)
 
-        self._logger.debug(
-            "Making outbound API request",
-            extra={"headers": request.headers, "url": request.url, "request_body": request.body},
+        request_id = str(uuid.uuid4())
+        self._logger.info(
+            f"[{request_id}] Making outbound API request to {request.url}",
         )
 
         response: Optional[requests.Response] = None
@@ -346,23 +347,9 @@ def _send(
                 response if response is not None else exc
             )
 
-        # Evaluation of response.text can be heavy, for example, if streaming a large response
-        # Do it only in debug mode
-        if self._logger.isEnabledFor(logging.DEBUG) and response is not None:
-            if request_kwargs.get("stream"):
-                self._logger.debug(
-                    "Receiving response, but not logging it as the response is streamed",
-                    extra={"headers": response.headers, "status": response.status_code},
-                )
-            else:
-                self._logger.debug(
-                    "Receiving response",
-                    extra={
-                        "headers": response.headers,
-                        "status": response.status_code,
-                        "body": response.text,
-                    },
-                )
+        self._logger.info(
+            f"[{request_id}] Receiving response from {request.url}{f' with exception {type(exc)}' if exc else ''}"
+        )
 
         # Request/response logging for declarative cdk
         if (
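Replacing the header/body debug dumps with a per-request UUID trades payload detail for correlation: the same id is stamped on the request line and the response line, which is what makes the two pairable when many threads interleave their logs. A minimal sketch of the pattern outside the CDK (fetch and the URL are illustrative):

import logging
import uuid
from typing import Optional

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("airbyte.HttpClient")

def fetch(url: str) -> None:
    # One id per request; both log lines carry it so they can be paired later.
    request_id = str(uuid.uuid4())
    logger.info(f"[{request_id}] Making outbound API request to {url}")
    exc: Optional[Exception] = None
    try:
        pass  # perform the request here, e.g. requests.get(url)
    except Exception as caught:
        exc = caught
    logger.info(
        f"[{request_id}] Receiving response from {url}"
        f"{f' with exception {type(exc)}' if exc else ''}"
    )

fetch("https://api.example.com/records")

Dropping the extra fields with headers and bodies also means payloads are no longer logged at info level; the declarative request/response logging path kept below still covers payload inspection.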