Commit 3d5c462

initialize the kafka loader with some tests
1 parent 3f2b905 commit 3d5c462

7 files changed, +2917 −12 lines

.gitignore

Lines changed: 88 additions & 0 deletions
@@ -0,0 +1,88 @@
+# Byte-compiled / optimized / DLL files
+__pycache__/
+*.py[cod]
+*$py.class
+
+# C extensions
+*.so
+
+# Distribution / packaging
+.Python
+build/
+develop-eggs/
+dist/
+downloads/
+eggs/
+.eggs/
+lib/
+lib64/
+parts/
+sdist/
+var/
+wheels/
+share/python-wheels/
+*.egg-info/
+.installed.cfg
+*.egg
+MANIFEST
+
+# PyInstaller
+*.manifest
+*.spec
+
+# Unit test / coverage reports
+htmlcov/
+.tox/
+.nox/
+.coverage
+.coverage.*
+.cache
+nosetests.xml
+coverage.xml
+*.cover
+*.py,cover
+.hypothesis/
+.pytest_cache/
+cover/
+
+# Environments
+.env
+.venv
+env/
+venv/
+ENV/
+env.bak/
+venv.bak/
+
+# IDEs
+.vscode/
+.idea/
+*.swp
+*.swo
+*~
+.DS_Store
+
+# Jupyter Notebook
+.ipynb_checkpoints
+
+# pyenv
+.python-version
+
+# mypy
+.mypy_cache/
+.dmypy.json
+dmypy.json
+
+# Pyre type checker
+.pyre/
+
+# pytype static type analyzer
+.pytype/
+
+# Cython debug symbols
+cython_debug/
+
+# Project specific
+*.log
+.env.local
+.env.*.local

pyproject.toml

Lines changed: 11 additions & 6 deletions
@@ -65,14 +65,19 @@ lmdb = [
     "lmdb>=1.4.0",
 ]
 
+kafka = [
+    "kafka-python>=2.2.15",
+]
+
 all_loaders = [
-    "psycopg2-binary>=2.9.0", # PostgreSQL
-    "redis>=4.5.0", # Redis
-    "deltalake>=1.0.2", # Delta Lake (consistent version)
+    "psycopg2-binary>=2.9.0",            # PostgreSQL
+    "redis>=4.5.0",                      # Redis
+    "deltalake>=1.0.2",                  # Delta Lake (consistent version)
     "pyiceberg[sql-sqlite]>=0.10.0",  # Apache Iceberg
-    "pydantic>=2.0,<2.12", # PyIceberg 0.10.0 compatibility
-    "snowflake-connector-python>=3.5.0", # Snowflake
-    "lmdb>=1.4.0", # LMDB
+    "pydantic>=2.0,<2.12",               # PyIceberg 0.10.0 compatibility
+    "snowflake-connector-python>=3.5.0", # Snowflake
+    "lmdb>=1.4.0",                       # LMDB
+    "kafka-python>=2.2.15",
 ]
 
 test = [
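
With this change, Kafka support can be installed on its own via the new kafka extra or pulled in with everything else via all_loaders — e.g. pip install '<package>[kafka]', where the actual distribution name comes from the [project] table, which is not shown in this diff.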

src/amp/loaders/implementations/__init__.py

Lines changed: 7 additions & 6 deletions
@@ -21,7 +21,7 @@
 
 try:
     from .iceberg_loader import IcebergLoader
-except ImportError:
+except Exception:
     IcebergLoader = None
 
 try:
@@ -34,11 +34,10 @@
 except ImportError:
     LMDBLoader = None
 
-# Add any other loaders here
-# try:
-#     from .snowflake_loader import SnowflakeLoader
-# except ImportError:
-#     SnowflakeLoader = None
+try:
+    from .kafka_loader import KafkaLoader
+except ImportError:
+    KafkaLoader = None
 
 __all__ = []
 
@@ -55,3 +54,5 @@
     __all__.append('SnowflakeLoader')
 if LMDBLoader:
     __all__.append('LMDBLoader')
+if KafkaLoader:
+    __all__.append('KafkaLoader')
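
Because each loader import is wrapped in try/except and the name is bound to None on failure, callers can feature-detect optional backends instead of assuming they are installed. A minimal sketch (assuming the package is importable as amp; the error message is illustrative):

    from amp.loaders import implementations

    if getattr(implementations, 'KafkaLoader', None) is not None:
        loader_cls = implementations.KafkaLoader  # kafka-python is installed
    else:
        raise RuntimeError("Kafka support not installed; install the 'kafka' extra")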
src/amp/loaders/implementations/kafka_loader.py

Lines changed: 66 additions & 0 deletions
@@ -0,0 +1,66 @@
+import json
+from dataclasses import dataclass
+from typing import Any, Dict, List, Optional
+
+import pyarrow as pa
+from kafka import KafkaProducer
+
+from ...streaming.types import BlockRange
+from ..base import DataLoader, LoadMode
+
+
+@dataclass
+class KafkaConfig:
+    bootstrap_servers: str
+    client_id: str = 'amp-kafka-loader'
+    key_field: Optional[str] = 'id'
+
+    def __post_init__(self):
+        pass
+
+
+class KafkaLoader(DataLoader[KafkaConfig]):
+    SUPPORTED_MODES = {LoadMode.APPEND}
+    REQUIRES_SCHEMA_MATCH = False
+    SUPPORTS_TRANSACTIONS = False
+
+    def __init__(self, config: Dict[str, Any]) -> None:
+        super().__init__(config)
+        self._producer = None
+
+    def _get_required_config_fields(self) -> list[str]:
+        return ['bootstrap_servers']
+
+    def connect(self) -> None:
+        try:
+            self._producer = KafkaProducer(
+                bootstrap_servers=self.config.bootstrap_servers,
+                client_id=self.config.client_id,
+                value_serializer=lambda x: json.dumps(x, default=str).encode('utf-8'),
+            )
+
+            connected = self._producer.bootstrap_connected()  # True if a bootstrap broker was reached
+            self.logger.info(f'Connection status: {connected}')
+            self.logger.info(f'Connected to Kafka at {self.config.bootstrap_servers}')
+            self.logger.info(f'Client ID: {self.config.client_id}')
+
+            self._is_connected = True
+
+        except Exception as e:
+            self.logger.error(f'Failed to connect to Kafka: {e}')
+            raise
+
+    def disconnect(self) -> None:
+        if self._producer:
+            self._producer.flush()
+            self._producer.close()
+            self._producer = None
+
+        self._is_connected = False
+        self.logger.info('Disconnected from Kafka')
+
+    def _load_batch_impl(self, batch: pa.RecordBatch, table_name: str, **kwargs) -> int:
+        pass
+
+    def _handle_reorg(self, invalidation_ranges: List[BlockRange], table_name: str) -> None:
+        pass
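
The two stubbed methods leave batch publishing and reorg handling for a later commit. As a minimal sketch only — not part of this commit, and assuming table_name doubles as the Kafka topic while key_field selects the message key — _load_batch_impl could publish one JSON message per row:

    # Hypothetical sketch, not in this commit.
    def _load_batch_impl(self, batch: pa.RecordBatch, table_name: str, **kwargs) -> int:
        rows = batch.to_pylist()  # one dict per record
        for row in rows:
            key = None
            if self.config.key_field and self.config.key_field in row:
                key = str(row[self.config.key_field]).encode('utf-8')
            # the value_serializer configured in connect() handles JSON encoding
            self._producer.send(table_name, value=row, key=key)
        self._producer.flush()  # block until all sends are acknowledged
        return len(rows)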

tests/conftest.py

Lines changed: 42 additions & 0 deletions
@@ -34,6 +34,7 @@
 try:
     from testcontainers.postgres import PostgresContainer
     from testcontainers.redis import RedisContainer
+    from testcontainers.kafka import KafkaContainer
 
     TESTCONTAINERS_AVAILABLE = True
 except ImportError:
@@ -201,6 +202,46 @@ def redis_test_config(request):
     return request.getfixturevalue('redis_config')
 
 
+@pytest.fixture(scope='session')
+def kafka_container():
+    """Kafka container for integration tests"""
+    if not TESTCONTAINERS_AVAILABLE:
+        pytest.skip('Testcontainers not available')
+
+    container = KafkaContainer(image='confluentinc/cp-kafka:7.6.0')
+    container.start()
+
+    time.sleep(10)
+
+    yield container
+
+    container.stop()
+
+
+@pytest.fixture(scope='session')
+def kafka_config():
+    """Kafka configuration from environment or defaults"""
+    return {
+        'bootstrap_servers': os.getenv('KAFKA_BOOTSTRAP_SERVERS', 'localhost:9092'),
+        'client_id': 'amp-test-client',
+    }
+
+
+@pytest.fixture(scope='session')
+def kafka_test_config(request):
+    """Kafka configuration from testcontainer or environment"""
+    if TESTCONTAINERS_AVAILABLE and USE_TESTCONTAINERS:
+        kafka_container = request.getfixturevalue('kafka_container')
+        bootstrap_servers = kafka_container.get_bootstrap_server()
+
+        return {
+            'bootstrap_servers': bootstrap_servers,
+            'client_id': 'amp-test-client',
+        }
+    else:
+        return request.getfixturevalue('kafka_config')
+
+
 @pytest.fixture(scope='session')
 def delta_test_env():
     """Create Delta Lake test environment for the session"""
@@ -408,6 +449,7 @@ def pytest_configure(config):
     config.addinivalue_line('markers', 'performance: Performance and benchmark tests')
     config.addinivalue_line('markers', 'postgresql: Tests requiring PostgreSQL')
     config.addinivalue_line('markers', 'redis: Tests requiring Redis')
+    config.addinivalue_line('markers', 'kafka: Tests requiring Apache Kafka')
     config.addinivalue_line('markers', 'delta_lake: Tests requiring Delta Lake')
     config.addinivalue_line('markers', 'iceberg: Tests requiring Apache Iceberg')
     config.addinivalue_line('markers', 'snowflake: Tests requiring Snowflake')
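
With the marker registered, the new tests can be selected with pytest -m kafka (or excluded with -m 'not kafka'). The USE_TESTCONTAINERS flag decides whether kafka_test_config spins up a throwaway broker via the kafka_container fixture or points at an existing one through KAFKA_BOOTSTRAP_SERVERS.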
Lines changed: 31 additions & 0 deletions
@@ -0,0 +1,31 @@
+import pytest
+
+try:
+    from src.amp.loaders.implementations.kafka_loader import KafkaLoader
+except ImportError:
+    pytest.skip('amp modules not available', allow_module_level=True)
+
+
+@pytest.mark.integration
+@pytest.mark.kafka
+class TestKafkaLoaderIntegration:
+
+    def test_loader_connection(self, kafka_test_config):
+        loader = KafkaLoader(kafka_test_config)
+
+        loader.connect()
+        assert loader._is_connected
+        assert loader._producer is not None
+
+        loader.disconnect()
+        assert not loader._is_connected
+        assert loader._producer is None
+
+    def test_context_manager(self, kafka_test_config):
+        loader = KafkaLoader(kafka_test_config)
+
+        with loader:
+            assert loader._is_connected
+            assert loader._producer is not None
+
+        assert not loader._is_connected
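
Once _load_batch_impl is implemented, a natural follow-up test — sketched here under the same fixtures; not part of this commit, and the topic name is made up — would round-trip a batch through the broker:

    # Hypothetical follow-up test, not in this commit.
    import json

    import pyarrow as pa
    from kafka import KafkaConsumer

    def test_load_batch_round_trip(kafka_test_config):
        batch = pa.RecordBatch.from_pylist([{'id': 1, 'value': 'a'}])
        with KafkaLoader(kafka_test_config) as loader:
            loader._load_batch_impl(batch, 'amp-test-topic')

        consumer = KafkaConsumer(
            'amp-test-topic',
            bootstrap_servers=kafka_test_config['bootstrap_servers'],
            auto_offset_reset='earliest',
            consumer_timeout_ms=5000,  # stop iterating after 5s of silence
            value_deserializer=lambda b: json.loads(b.decode('utf-8')),
        )
        messages = [m.value for m in consumer]
        assert {'id': 1, 'value': 'a'} in messages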
