feat(drivers): add async Redpanda driver using aiokafka #1485
phenobarbital merged 6 commits into master
Conversation
Reviewer's Guide

Adds a new async Redpanda (Kafka-compatible) driver built on aiokafka, wires it into the documented list of supported backends, and provides async tests that validate basic connect/publish/consume behavior via mocked aiokafka classes.

Sequence diagram for publishing a message with the new redpanda driver:

```mermaid
sequenceDiagram
    actor Developer
    participant RedpandaDriver as redpanda
    participant AIOKafkaProducer
    participant RedpandaCluster as Redpanda_Kafka
    Developer->>RedpandaDriver: execute(message, topic, key, headers, ...)
    alt Producer not initialized
        RedpandaDriver->>RedpandaDriver: _bootstrap_servers()
        RedpandaDriver->>RedpandaDriver: _load_aiokafka()
        RedpandaDriver->>AIOKafkaProducer: __init__(bootstrap_servers, client_id, loop, ...)
        RedpandaDriver->>AIOKafkaProducer: start()
        AIOKafkaProducer-->>RedpandaDriver: producer_ready
    end
    RedpandaDriver->>RedpandaDriver: _encode_value(message)
    RedpandaDriver->>RedpandaDriver: _encode_value(key)
    RedpandaDriver->>AIOKafkaProducer: send_and_wait(topic, payload, key, partition, headers, timestamp_ms)
    AIOKafkaProducer->>RedpandaCluster: append_record(topic, partition, payload)
    RedpandaCluster-->>AIOKafkaProducer: RecordMetadata(topic, partition, offset, timestamp)
    AIOKafkaProducer-->>RedpandaDriver: RecordMetadata
    RedpandaDriver->>RedpandaDriver: _serializer(result_dict, None)
    RedpandaDriver-->>Developer: (result, error=None)
```
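In the flow above, the `RecordMetadata` returned by `send_and_wait` is flattened into a plain dict before being passed to `_serializer`. A minimal sketch of that mapping step (the `Metadata` namedtuple below is a stand-in for aiokafka's `RecordMetadata`, not the real class):

```python
from collections import namedtuple

# Stand-in for aiokafka's RecordMetadata; only the four attributes
# the driver reads are modeled here.
Metadata = namedtuple("Metadata", "topic partition offset timestamp")

def metadata_to_dict(md) -> dict:
    """Flatten producer metadata into the result dict handed to _serializer."""
    return {
        "topic": md.topic,
        "partition": md.partition,
        "offset": md.offset,
        "timestamp": md.timestamp,
    }
```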
Sequence diagram for consuming messages with the new redpanda driver:

```mermaid
sequenceDiagram
    actor Developer
    participant RedpandaDriver as redpanda
    participant AIOKafkaConsumer
    participant RedpandaCluster as Redpanda_Kafka
    Developer->>RedpandaDriver: query(topic_or_sentence, timeout_ms, max_records, ...)
    RedpandaDriver->>RedpandaDriver: _load_aiokafka()
    RedpandaDriver->>AIOKafkaConsumer: __init__(topic, bootstrap_servers, group_id, client_id, ...)
    RedpandaDriver->>AIOKafkaConsumer: start()
    AIOKafkaConsumer-->>RedpandaDriver: consumer_ready
    RedpandaDriver->>AIOKafkaConsumer: getmany(timeout_ms, max_records)
    AIOKafkaConsumer->>RedpandaCluster: fetch_batches(topic, partitions)
    RedpandaCluster-->>AIOKafkaConsumer: records_by_partition
    AIOKafkaConsumer-->>RedpandaDriver: records dict
    RedpandaDriver->>RedpandaDriver: iterate records, decode key/value
    RedpandaDriver->>RedpandaDriver: _serializer(messages_list, None)
    RedpandaDriver-->>Developer: (messages, error=None)
    RedpandaDriver->>AIOKafkaConsumer: stop()
    AIOKafkaConsumer-->>RedpandaDriver: stopped
```
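The "iterate records, decode key/value" step operates on the dict that `getmany` returns, keyed by partition with a list of records per partition. A hedged sketch of that flattening, keeping keys and values as raw bytes (the record object here only needs the attributes shown, not aiokafka's full `ConsumerRecord`):

```python
def flatten_records(records: dict) -> list:
    """Flatten getmany()'s {partition: [record, ...]} dict into message dicts.

    Keys and values are passed through as-is (bytes or None); decoding is
    deliberately left to the caller, since payloads may be arbitrary binary.
    """
    messages = []
    for _, batch in records.items():
        for msg in batch:
            messages.append({
                "topic": msg.topic,
                "partition": msg.partition,
                "offset": msg.offset,
                "timestamp": msg.timestamp,
                "key": msg.key,
                "value": msg.value,
            })
    return messages
```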
Class diagram for the new async redpanda driver:

```mermaid
classDiagram
    class BaseDriver {
        <<abstract>>
        string _provider
        string _syntax
        string _dsn_template
        string _dsn
        dict _params
        any _connection
        bool _connected
        any _loop
        any _prepared
        +__init__(dsn, loop, params, kwargs)
        +_serializer(data, error)
    }
    class redpanda {
        <<driver>>
        string _provider
        string _syntax
        string _dsn_template
        string _topic
        string _group_id
        string _client_id
        AIOKafkaProducer _producer
        AIOKafkaAdminClient _admin
        +__init__(dsn, loop, params, topic, group_id, client_id, kwargs)
        +connection(kwargs) async
        +close() async
        +use(database) async
        +prepare(sentence) async
        +execute(sentence, args, kwargs) async
        +execute_many(sentence, args) async
        +query(sentence, kwargs) async
        +queryrow(sentence) async
        +fetch_all(sentence, kwargs) async
        +fetch_one(sentence) async
        -_load_aiokafka()
        -_bootstrap_servers()
        -_encode_value(value)
    }
    class AIOKafkaProducer {
        +__init__(loop, bootstrap_servers, client_id, kwargs)
        +start() async
        +stop() async
        +send_and_wait(topic, value, key, partition, headers, timestamp_ms) async
    }
    class AIOKafkaConsumer {
        +__init__(topic, loop, bootstrap_servers, group_id, client_id, auto_offset_reset, enable_auto_commit, kwargs)
        +start() async
        +stop() async
        +getmany(timeout_ms, max_records) async
    }
    class AIOKafkaAdminClient {
        +__init__(bootstrap_servers, client_id, loop, kwargs)
        +close() async
    }
    BaseDriver <|-- redpanda
    redpanda --> AIOKafkaProducer : uses
    redpanda --> AIOKafkaConsumer : uses
    redpanda --> AIOKafkaAdminClient : uses
```
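The `_load_aiokafka()` hook in the diagram suggests a lazy-import pattern: defer the `aiokafka` import until first use so the dependency stays optional. A generic sketch of that pattern (the error type and message here are illustrative, not the driver's actual `DriverError`):

```python
import importlib

def lazy_import(module_name: str, install_hint: str):
    """Import a module on first use; fail with a clear install instruction."""
    try:
        return importlib.import_module(module_name)
    except ImportError as err:
        raise RuntimeError(
            f"Missing dependency {module_name!r}; install it with: {install_hint}"
        ) from err
```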
Hey - I've found 1 issue, and left some high-level feedback:

- The `_bootstrap_servers` method uses `self._dsn` directly, which will include the scheme when instantiated via a standard DSN (e.g. `redpanda://host:port`); consider parsing the DSN and/or consistently using `_dsn_template` so the bootstrap servers string is always in `<host>:<port>` form.
- In `query`, keys and values are always decoded as UTF-8 strings, which will fail or corrupt data for non-text/binary payloads; consider returning raw bytes (or making decoding configurable) to avoid unexpected `UnicodeDecodeError` and preserve arbitrary message contents.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- The `_bootstrap_servers` method uses `self._dsn` directly, which will include the scheme when instantiated via a standard DSN (e.g. `redpanda://host:port`); consider parsing the DSN and/or consistently using `_dsn_template` so the bootstrap servers string is always in `<host>:<port>` form.
- In `query`, keys and values are always decoded as UTF-8 strings, which will fail or corrupt data for non-text/binary payloads; consider returning raw bytes (or making decoding configurable) to avoid unexpected `UnicodeDecodeError` and preserve arbitrary message contents.
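The first of these comments can be addressed by parsing the DSN with `urllib.parse` so the scheme never leaks into the bootstrap string. A hedged sketch (the function name and fallback default are assumptions, not the driver's actual API):

```python
from urllib.parse import urlparse

def bootstrap_servers_from_dsn(dsn: str, default: str = "localhost:9092") -> str:
    """Reduce a DSN like redpanda://host:port to plain <host>:<port>."""
    if not dsn:
        return default
    parsed = urlparse(dsn)
    if parsed.hostname:
        return f"{parsed.hostname}:{parsed.port or 9092}"
    # No scheme present: assume the DSN is already in <host>:<port> form.
    return dsn
```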
## Individual Comments
### Comment 1
<location> `asyncdb/drivers/redpanda.py:161-168` </location>
<code_context>
+ messages = []
+ for _, batch in records.items():
+ for msg in batch:
+ messages.append(
+ {
+ "topic": msg.topic,
+ "partition": msg.partition,
+ "offset": msg.offset,
+ "timestamp": msg.timestamp,
+ "key": msg.key.decode("utf-8") if msg.key else None,
+ "value": msg.value.decode("utf-8") if msg.value else None,
+ }
+ )
</code_context>
<issue_to_address>
**suggestion (bug_risk):** Unconditional UTF-8 decoding of keys/values may fail for arbitrary binary messages.
Decoding `msg.key` and `msg.value` as UTF-8 will raise `UnicodeDecodeError` for non-text or differently encoded payloads, which are valid in Kafka/Redpanda. If arbitrary or binary data is possible, consider returning bytes or making decoding configurable to avoid these runtime failures.
Suggested implementation:
```python
records = await consumer.getmany(timeout_ms=timeout_ms, max_records=max_records)
messages = []
for _, batch in records.items():
for msg in batch:
messages.append(
{
"topic": msg.topic,
"partition": msg.partition,
"offset": msg.offset,
"timestamp": msg.timestamp,
"key": msg.key,
"value": msg.value,
}
)
```
If callers expect text instead of bytes, consider:
1) Documenting that `key` and `value` are returned as `bytes` and should be decoded by the caller using the appropriate encoding, or
2) Adding an optional parameter (e.g. `decode: bool = False` and `encoding: str = "utf-8"`) to this method and, when enabled, decoding with `errors="replace"` to avoid runtime failures for arbitrary binary payloads.
</issue_to_address>
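Option 2 above can be sketched as a small helper: bytes by default, opt-in decoding with `errors="replace"` so malformed input degrades instead of raising (names here are illustrative, not the driver's actual API):

```python
from typing import Optional, Union

def decode_field(raw: Optional[bytes], decode: bool = False,
                 encoding: str = "utf-8") -> Union[bytes, str, None]:
    """Return raw bytes by default; decode lossily only when explicitly asked."""
    if raw is None or not decode:
        return raw
    return raw.decode(encoding, errors="replace")
```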
Pull request overview
This PR adds a new asynchronous driver for Redpanda (Kafka-compatible messaging) to AsyncDB, using aiokafka as the underlying client library. The driver implements the standard AsyncDB interface with producer/consumer operations mapped to execute/query methods, and includes basic test coverage using mocked aiokafka components.
Changes:
- Added redpanda driver (`asyncdb/drivers/redpanda.py`) with support for publishing and consuming messages through the AsyncDB interface
- Added unit tests (`tests/test_redpanda.py`) with mocked aiokafka classes to test connection, publish, and consume operations
- Updated README.md to list Redpanda as a supported backend with its driver alias
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 11 comments.
| File | Description |
|---|---|
| asyncdb/drivers/redpanda.py | New driver implementation mapping AsyncDB methods to AIOKafkaProducer/Consumer operations with lazy dependency loading |
| tests/test_redpanda.py | Unit tests using mock aiokafka classes to verify connection, execute (publish), query (consume), and queryrow operations |
| README.md | Documentation updates adding Redpanda to supported databases list and driver aliases section |
```python
servers = kwargs.pop("bootstrap_servers", self._bootstrap_servers())
try:
    self._producer = AIOKafkaProducer(
        loop=self._loop,
```

The `loop` parameter is deprecated in aiokafka version 0.8.0+ (released in 2022). Modern versions of aiokafka automatically use the current event loop. Passing the `loop` parameter explicitly may cause deprecation warnings or errors in recent versions. Consider removing the `loop` parameter or making it conditional based on the aiokafka version.
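One way to make the `loop` argument conditional is a small version gate; a sketch comparing against the 0.8.0 threshold noted above (in the driver this would be fed `aiokafka.__version__`):

```python
def should_pass_loop(version: str) -> bool:
    """True only for aiokafka < 0.8.0, where the loop kwarg was still accepted."""
    parts = []
    for piece in version.split(".")[:3]:
        digits = "".join(ch for ch in piece if ch.isdigit())
        parts.append(int(digits) if digits else 0)
    return tuple(parts) < (0, 8, 0)
```

When it returns `False`, the producer/consumer would simply be constructed without the `loop` kwarg.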
```python
def _encode_value(self, value: Any) -> bytes:
    if value is None:
        raise DriverError("Cannot publish an empty message")
    if isinstance(value, bytes):
        return value
    if isinstance(value, str):
        return value.encode("utf-8")
    return json.dumps(value).encode("utf-8")


async def execute(self, sentence: Any, *args, **kwargs) -> Optional[Any]:
    if not self._producer:
        await self.connection()
    topic = kwargs.pop("topic", self._topic)
    if not topic:
        raise DriverError("No topic selected. Use use(<topic>) or pass topic=<topic>.")

    key = kwargs.pop("key", None)
    partition = kwargs.pop("partition", None)
    headers = kwargs.pop("headers", None)
    timestamp_ms = kwargs.pop("timestamp_ms", None)

    payload = self._encode_value(sentence)
    encoded_key = self._encode_value(key) if key is not None else None
```

The `_encode_value` method raises a `DriverError` when `value` is `None`, but on line 107 it's called with `key`, which can be `None`. This would produce the error "Cannot publish an empty message" even though a `None` key is valid in Kafka/Redpanda (only the value is required). The condition on line 107 correctly checks `if key is not None` before encoding, but if someone mistakenly passes `key=None` to `_encode_value`, the error message would be misleading. Consider either allowing `None` in `_encode_value` with a clearer error message, or adding a separate `_encode_key` method.
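The suggested split into a dedicated key encoder could look like this sketch (standalone functions, with `ValueError` standing in for the driver's `DriverError` to keep the example self-contained):

```python
import json
from typing import Any, Optional

def encode_value(value: Any) -> bytes:
    """Payloads are mandatory: refuse None with a payload-specific message."""
    if value is None:
        raise ValueError("Cannot publish an empty message")
    if isinstance(value, bytes):
        return value
    if isinstance(value, str):
        return value.encode("utf-8")
    return json.dumps(value).encode("utf-8")

def encode_key(key: Optional[Any]) -> Optional[bytes]:
    """Keys are optional in Kafka/Redpanda: None passes through untouched."""
    return None if key is None else encode_value(key)
```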
```python
"key": msg.key.decode("utf-8") if msg.key else None,
"value": msg.value.decode("utf-8") if msg.value else None,
```

The decode calls on lines 167-168 will raise a `UnicodeDecodeError` if `msg.key` or `msg.value` contain binary data that's not valid UTF-8. Consider adding error handling for decode errors, or providing a configurable decoder. This is especially important for Kafka/Redpanda, which can carry arbitrary binary data; Avro or Protobuf serialized messages would fail here, for example.
```python
async def test_redpanda_query_and_queryrow(patch_aiokafka):
    db = AsyncDB("redpanda", params={"host": "127.0.0.1", "port": 9092, "topic": "events"})

    records, error = await db.query("events")
    assert error is None
    assert len(records) == 1
    assert records[0]["key"] == "user-1"

    row, error = await db.queryrow("events")
    assert error is None
    assert row["value"] == '{"event":"created"}'
```

The test doesn't establish a connection before calling `query` and `queryrow`. While the `query` method creates its own consumer and doesn't require a prior connection, it would be more consistent with other test patterns (e.g. `test_redis.py`, `test_mongo.py`) to explicitly call `await db.connection()` at the start of the test, even though it's not strictly necessary for consume operations. This also exercises the `connection` and `close` methods.
```python
    except Exception as err:
        raise DriverError(f"Error closing Redpanda connection: {err}") from err


async def use(self, database: str):
```

The `use` method on line 79 only sets `self._topic` and performs no awaitable work, despite being declared `async`. If `use()` needs to perform any validation or initialization of the topic (e.g. checking that it exists, creating it), those operations cannot happen in the current implementation. Consider either making `use()` perform actual async operations (such as checking topic existence via the admin client), or removing the `async` keyword if it's truly synchronous. Most other drivers' `use()` methods in the codebase are async and perform actual operations.

Suggested change:

```python
async def use(self, database: str):
    await self.connection()
```
```python
response, error = await self.execute(message, *args)
if error:
    return await self._serializer(results, error)
results.append(response)
return await self._serializer(results, None)
```

The error handling on line 131 is unreachable. The `execute` method only returns `[result, None]` on success or raises `DriverError` on failure; it never returns a non-`None` error in the tuple, so the `if error` check will always be false. Either remove this dead code, or modify `execute` to catch exceptions and return them via `_serializer` as `[None, error]` instead of raising them.

Suggested change:

```python
response = await self.execute(message, *args)
results.append(response)
return await self._serializer(results, None)
```
```python
max_records = kwargs.pop("max_records", 100)
consumer = AIOKafkaConsumer(
    topic,
    loop=self._loop,
```

The `loop` parameter is deprecated in aiokafka version 0.8.0+ (released in 2022). Modern versions of aiokafka automatically use the current event loop. Passing the `loop` parameter explicitly may cause deprecation warnings or errors in recent versions. Consider removing the `loop` parameter or making it conditional based on the aiokafka version.
@copilot open a new pull request to apply changes based on this feedback
```python
messages, error = await self.query(sentence, max_records=1)
if error:
    return await self._serializer(None, error)
row = messages[0] if messages else None
return await self._serializer(row, None)


fetch_one = queryrow
```

The error handling on lines 181-182 is unreachable. The `query` method only returns `[messages, None]` on success or raises `DriverError` on failure; it never returns a non-`None` error in the tuple, so the `if error` check will always be false. Either remove this dead code, or modify `query` to catch exceptions and return them via `_serializer` as `[None, error]` instead of raising them.

Suggested change:

```python
messages, _ = await self.query(sentence, max_records=1)
row = messages[0] if messages else None
return await self._serializer(row, None)


fetch_one = queryrow
```
```python
messages, error = await self.query(sentence, max_records=1)
if error:
    return await self._serializer(None, error)
```

The `queryrow` method assumes `query` returns a `(messages, error)` tuple and checks for `error` on line 181. However, the `query` method raises `DriverError` on failure (line 173) rather than returning it in a tuple, so if `query` fails, an uncaught exception propagates instead of being handled, and the error handling on lines 181-182 is unreachable. Either `query` should catch exceptions and return them via `_serializer`, or `queryrow` should drop the error check (since exceptions will propagate anyway).

Suggested change:

```python
messages, _ = await self.query(sentence, max_records=1)
```
@copilot open a new pull request to apply changes based on the comments in this thread

@phenobarbital I've opened a new pull request, #1486, to work on those changes. Once the pull request is ready, I'll request review from you.

@phenobarbital I've opened a new pull request, #1487, to work on those changes. Once the pull request is ready, I'll request review from you.
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
[WIP] WIP Address feedback on async Redpanda driver using aiokafka
[WIP] Add async Redpanda driver using aiokafka
…r-redpanda feat(drivers): add async Redpanda driver using aiokafka
Motivation
Description
- Added `asyncdb/drivers/redpanda.py` implementing a `redpanda` `BaseDriver` with `connection`, `close`, `use`, `prepare`, `execute`, `execute_many`, `query`, and `queryrow` mapped to `AIOKafkaProducer`/`AIOKafkaConsumer` operations and message encoding/decoding.
- The driver lazily loads `aiokafka` and raises a clear `DriverError` with install instructions when the dependency is missing.
- Added `tests/test_redpanda.py`, which exercises connection, publish, and consume flows using mocked `aiokafka` producer/consumer/admin classes, so tests do not require a running Redpanda cluster.
- Updated `README.md` to list Redpanda in supported backends and added the `redpanda` driver alias to the driver-name section.

Testing

- Ran `python -m py_compile asyncdb/drivers/redpanda.py tests/test_redpanda.py`, which succeeded.
- Ran `pytest -q tests/test_redpanda.py`, which could not run due to repository pytest/tox configuration issues (`tox.ini` duplicate `exclude` entry).
- Ran `pytest -q -c /dev/null tests/test_redpanda.py`, which failed in this environment because `pytest_asyncio` is not installed.
- `asyncdb.exceptions.exceptions` is not built/importable, but the driver logic exercised with mocked aiokafka classes behaved as expected in local checks.

Codex Task
Summary by Sourcery
Add an asynchronous Redpanda (Kafka-compatible) driver based on aiokafka and cover it with basic producer/consumer tests and documentation updates.
New Features:
- Add an async `redpanda` driver built on aiokafka, exposing publish and consume operations through the standard AsyncDB interface.

Enhancements:

Documentation:
- List Redpanda among the supported backends and document the `redpanda` driver alias in the README.

Tests:
- Add tests covering connect, publish, and consume flows using mocked aiokafka classes.