Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

This is a beta release, notes to change to full release later on in changelog. Changes included are:

- Enforced type hinting for all interaces
- Enforced type hinting for all interfaces
- Handle OAuth Token Refreshes
- Added black and isort linting rules and enforcement to codebase
- Fix support for wrapped Avro unions
Expand Down Expand Up @@ -50,7 +50,7 @@ Starting with __confluent-kafka-python 2.12.0__, the next generation consumer gr

**Note:** The new consumer group protocol defined in [KIP-848](https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol) is not enabled by default. There are few contract change associated with the new protocol and might cause breaking changes. `group.protocol` configuration property dictates whether to use the new `consumer` protocol or older `classic` protocol. It defaults to `classic` if not provided.

### AsyncIO Producer (experimental)
### AsyncIO Producer
Introduces beta class `AIOProducer` for asynchronous message production in asyncio applications.

#### Added
Expand Down
8 changes: 4 additions & 4 deletions DEVELOPER.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ uv sync --extra dev --extra tests
## Project layout

- `src/confluent_kafka/` — core sync client APIs
- `src/confluent_kafka/experimental/aio/` — AsyncIO Producer/Consumer (first-class asyncio, not generated)
- `src/confluent_kafka/aio/` — AsyncIO Producer/Consumer (first-class asyncio, not generated)
- `src/confluent_kafka/schema_registry/` — Schema Registry clients and serdes
- `tests/` — unit and integration tests (including async producer tests)
- `examples/` — runnable samples (includes asyncio example)
Expand Down Expand Up @@ -120,14 +120,14 @@ python3 tools/unasync.py --check

If you make any changes to the async code (in `src/confluent_kafka/schema_registry/_async` and `tests/integration/schema_registry/_async`), you **must** run this script to generate the sync counterparts (in `src/confluent_kafka/schema_registry/_sync` and `tests/integration/schema_registry/_sync`). Otherwise, this script will be run in CI with the `--check` flag and fail the build.

Note: The AsyncIO Producer/Consumer under `src/confluent_kafka/experimental/aio/` are first-class asyncio implementations and are not generated using `unasync`.
Note: The AsyncIO Producer/Consumer under `src/confluent_kafka/aio/` are first-class asyncio implementations and are not generated using `unasync`.

## AsyncIO Producer development (AIOProducer)

Source:

- `src/confluent_kafka/experimental/aio/producer/_AIOProducer.py` (public async API)
- Internal modules in `src/confluent_kafka/experimental/aio/producer/` and helpers in `src/confluent_kafka/experimental/aio/_common.py`
- `src/confluent_kafka/aio/producer/_AIOProducer.py` (public async API)
- Internal modules in `src/confluent_kafka/aio/producer/` and helpers in `src/confluent_kafka/aio/_common.py`

For a complete usage example, see [`examples/asyncio_example.py`](examples/asyncio_example.py).

Expand Down
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ Unlike the basic Apache Kafka Python client, `confluent-kafka-python` provides:

- **High Performance & Reliability**: Built on [`librdkafka`](https://github.com/confluentinc/librdkafka), the battle-tested C client for Apache Kafka, ensuring maximum throughput, low latency, and stability. The client is supported by Confluent and is trusted in mission-critical production environments.
- **Comprehensive Kafka Support**: Full support for the Kafka protocol, transactions, and administration APIs.
- **Experimental; AsyncIO Producer**: An experimental fully asynchronous producer (`AIOProducer`) for seamless integration with modern Python applications using `asyncio`.
- **AsyncIO Producer**: A fully asynchronous producer (`AIOProducer`) for seamless integration with modern Python applications using `asyncio`.
- **Seamless Schema Registry Integration**: Synchronous and asynchronous clients for Confluent Schema Registry to handle schema management and serialization (Avro, Protobuf, JSON Schema).
- **Improved Error Handling**: Detailed, context-aware error messages and exceptions to speed up debugging and troubleshooting.
- **[Confluent Cloud] Automatic Zone Detection**: Producers automatically connect to brokers in the same availability zone, reducing latency and data transfer costs without requiring manual configuration.
Expand All @@ -54,13 +54,13 @@ Additional examples can be found in the [examples](examples) directory or the [c
Also see the [Python client docs](https://docs.confluent.io/kafka-clients/python/current/overview.html) and the [API reference](https://docs.confluent.io/kafka-clients/python/current/).

Finally, the [tests](tests) are useful as a reference for example usage.
### AsyncIO Producer (experimental)
### AsyncIO Producer

Use the AsyncIO `Producer` inside async applications to avoid blocking the event loop.

```python
import asyncio
from confluent_kafka.experimental.aio import AIOProducer
from confluent_kafka.aio import AIOProducer

async def main():
p = AIOProducer({"bootstrap.servers": "mybroker"})
Expand Down Expand Up @@ -177,7 +177,7 @@ producer.flush()
Use the `AsyncSchemaRegistryClient` and `Async` serializers with `AIOProducer` and `AIOConsumer`. The configuration is the same as the synchronous client.

```python
from confluent_kafka.experimental.aio import AIOProducer
from confluent_kafka.aio import AIOProducer
from confluent_kafka.schema_registry import AsyncSchemaRegistryClient
from confluent_kafka.schema_registry._async.avro import AsyncAvroSerializer

Expand Down
8 changes: 4 additions & 4 deletions aio_producer_simple_diagram.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,9 @@ The `AIOProducer` implements a multi-component architecture designed for high-pe

### Source Code Location

- **Main Implementation**: `src/confluent_kafka/experimental/aio/producer/_AIOProducer.py`
- **Supporting Modules**: `src/confluent_kafka/experimental/aio/producer/` directory
- **Common Utilities**: `src/confluent_kafka/experimental/aio/_common.py`
- **Main Implementation**: `src/confluent_kafka/aio/producer/_AIOProducer.py`
- **Supporting Modules**: `src/confluent_kafka/aio/producer/` directory
- **Common Utilities**: `src/confluent_kafka/aio/_common.py`

### Design Principles

Expand All @@ -103,4 +103,4 @@ Unlike the synchronous `Producer` which uses polling-based callbacks, the `AIOPr

- [AsyncIO Producer Usage Examples](examples/asyncio_example.py) - Comprehensive usage patterns and best practices.
- [AsyncIO Producer Development Guide](DEVELOPER.md#asyncio-producer-development-aioproducer) - Implementation details for contributors.
- [Main README AsyncIO Section](README.md#asyncio-producer-experimental) - Getting started with AsyncIO producer.
- [Main README AsyncIO Section](README.md#asyncio-produce) - Getting started with AsyncIO producer.
12 changes: 6 additions & 6 deletions examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ The scripts in this directory provide various examples of using the Confluent Py

## AsyncIO Examples

- [asyncio_example.py](asyncio_example.py): Experimental comprehensive AsyncIO example demonstrating both AIOProducer and AIOConsumer with transactional operations, batched async produce, proper event loop integration, signal handling, and async callback patterns.
- [asyncio_example.py](asyncio_example.py): Comprehensive AsyncIO example demonstrating both AIOProducer and AIOConsumer with transactional operations, batched async produce, proper event loop integration, signal handling, and async callback patterns.
- [asyncio_avro_producer.py](asyncio_avro_producer.py): Minimal AsyncIO Avro producer using `AsyncSchemaRegistryClient` and `AsyncAvroSerializer` (supports Confluent Cloud using `--sr-api-key`/`--sr-api-secret`).

**Architecture:** For implementation details and component design, see the [AIOProducer Architecture Overview](../aio_producer_simple_diagram.md).
Expand All @@ -25,7 +25,7 @@ The AsyncIO producer works seamlessly with popular Python web frameworks:

```python
from fastapi import FastAPI
from confluent_kafka.experimental.aio import AIOProducer
from confluent_kafka.aio import AIOProducer

app = FastAPI()
producer = None
Expand All @@ -46,7 +46,7 @@ async def create_event(data: dict):

```python
from aiohttp import web
from confluent_kafka.experimental.aio import AIOProducer
from confluent_kafka.aio import AIOProducer

async def init_app():
app = web.Application()
Expand All @@ -67,7 +67,7 @@ For more details, see [Integrating Apache Kafka With Python Asyncio Web Applicat
The AsyncIO producer and consumer work seamlessly with async Schema Registry serializers:

```python
from confluent_kafka.experimental.aio import AIOProducer
from confluent_kafka.aio import AIOProducer
from confluent_kafka.schema_registry import AsyncSchemaRegistryClient
from confluent_kafka.schema_registry._async.avro import AsyncAvroSerializer

Expand Down Expand Up @@ -164,15 +164,15 @@ producer_conf = {
producer = Producer(producer_conf)
```

### Asynchronous usage (Experimental AsyncIO)
### Asynchronous usage

Use async serializers with `AIOProducer` and `AIOConsumer`. Note that you must
instantiate the serializer and then call it to serialize the data *before*
producing.

```python
# From examples/README.md
from confluent_kafka.experimental.aio import AIOProducer
from confluent_kafka.aio import AIOProducer
from confluent_kafka.schema_registry import AsyncSchemaRegistryClient
from confluent_kafka.schema_registry._async.avro import AsyncAvroSerializer

Expand Down