Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions docs/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -1074,8 +1074,10 @@ addition to the properties dictated by the underlying librdkafka C library:
commit error, or None on success. *list(TopicPartition)* is the list of partitions with their committed
offsets or per-partition errors.

* ``logger=logging.Handler`` kwarg: forward logs from the Kafka client to the
provided ``logging.Handler`` instance.
* ``logger=logging.Logger`` kwarg: forward logs from the Kafka client to the
provided ``logging.Logger`` instance.
The client calls ``logger.log(level, msg, *args)`` internally, which matches
the ``logging.Logger`` interface (not ``logging.Handler``).
To avoid spontaneous calls from non-Python threads the log messages
will only be forwarded when ``client.poll()`` or ``producer.flush()`` are called.
For example:
Expand Down
25 changes: 23 additions & 2 deletions src/confluent_kafka/cimpl.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ maintenance burden and get type hints directly from the implementation.
"""

import builtins
import logging
from typing import Any, Callable, Dict, List, Optional, Tuple, Union, overload

from typing_extensions import Literal, Self
Expand Down Expand Up @@ -325,6 +326,8 @@ class Producer:
self,
config: Dict[str, Any],
/,
*,
logger: Optional[logging.Logger] = None,
**kwargs: Any
) -> None:
"""
Expand All @@ -333,6 +336,10 @@ class Producer:

Args:
config: Configuration dictionary.
logger: Optional ``logging.Logger`` instance to forward Kafka client
log messages to. The client calls ``logger.log(level, msg, *args)``
internally. Log messages are only forwarded when
``producer.poll()`` or ``producer.flush()`` are called.
**kwargs: Additional config as keyword args (overrides dict values).

Example:
Expand All @@ -341,11 +348,15 @@ class Producer:
"""
...
@overload
def __init__(self, **config: Any) -> None:
def __init__(self, *, logger: Optional[logging.Logger] = None, **config: Any) -> None:
"""
Create Producer with keyword arguments only.

Args:
logger: Optional ``logging.Logger`` instance to forward Kafka client
log messages to. The client calls ``logger.log(level, msg, *args)``
internally. Log messages are only forwarded when
``producer.poll()`` or ``producer.flush()`` are called.
**config: Configuration as keyword args.
Note: Use underscores (bootstrap_servers) not dots (bootstrap.servers) in kwargs.

Expand Down Expand Up @@ -408,6 +419,8 @@ class Consumer:
self,
config: dict[str, Any],
/,
*,
logger: Optional[logging.Logger] = None,
**kwargs: Any
) -> None:
"""
Expand All @@ -416,6 +429,10 @@ class Consumer:

Args:
config: Configuration dictionary. Must include 'group.id'.
logger: Optional ``logging.Logger`` instance to forward Kafka client
log messages to. The client calls ``logger.log(level, msg, *args)``
internally. Log messages are only forwarded when
``consumer.poll()`` is called.
**kwargs: Additional config as keyword args (overrides dict values).

Example:
Expand All @@ -424,11 +441,15 @@ class Consumer:
"""
...
@overload
def __init__(self, **config: Any) -> None:
def __init__(self, *, logger: Optional[logging.Logger] = None, **config: Any) -> None:
"""
Create Consumer with keyword arguments only.

Args:
logger: Optional ``logging.Logger`` instance to forward Kafka client
log messages to. The client calls ``logger.log(level, msg, *args)``
internally. Log messages are only forwarded when
``consumer.poll()`` is called.
**config: Configuration as keyword args. Must include group_id.
Note: Use underscores (group_id) not dots (group.id) in kwargs.

Expand Down