Skip to content

Commit 06e48af

Browse files
committed
use transactions in kafka loader
1 parent d9f4851 commit 06e48af

File tree

1 file changed

+42
-27
lines changed

1 file changed

+42
-27
lines changed

src/amp/loaders/implementations/kafka_loader.py

Lines changed: 42 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ def __post_init__(self):
2222
class KafkaLoader(DataLoader[KafkaConfig]):
2323
SUPPORTED_MODES = {LoadMode.APPEND}
2424
REQUIRES_SCHEMA_MATCH = False
25-
SUPPORTS_TRANSACTIONS = False
25+
SUPPORTS_TRANSACTIONS = True
2626

2727
def __init__(self, config: Dict[str, Any], label_manager=None) -> None:
2828
super().__init__(config, label_manager)
@@ -37,8 +37,11 @@ def connect(self) -> None:
3737
bootstrap_servers=self.config.bootstrap_servers,
3838
client_id=self.config.client_id,
3939
value_serializer=lambda x: json.dumps(x, default=str).encode('utf-8'),
40+
transactional_id=f'{self.config.client_id}-txn',
4041
)
4142

43+
self._producer.init_transactions()
44+
4245
metadata = self._producer.bootstrap_connected()
4346
self.logger.info(f'Connection status: {metadata}')
4447
self.logger.info(f'Connected to Kafka at {self.config.bootstrap_servers}')
@@ -52,7 +55,6 @@ def connect(self) -> None:
5255

5356
def disconnect(self) -> None:
5457
if self._producer:
55-
self._producer.flush()
5658
self._producer.close()
5759
self._producer = None
5860

@@ -73,17 +75,23 @@ def _load_batch_impl(self, batch: pa.RecordBatch, table_name: str, **kwargs) ->
7375
if num_rows == 0:
7476
return 0
7577

76-
for i in range(num_rows):
77-
row = {field: values[i] for field, values in data_dict.items()}
78-
row['_type'] = 'data'
78+
self._producer.begin_transaction()
79+
try:
80+
for i in range(num_rows):
81+
row = {field: values[i] for field, values in data_dict.items()}
82+
row['_type'] = 'data'
7983

80-
key = self._extract_message_key(row)
84+
key = self._extract_message_key(row)
8185

82-
self._producer.send(topic=table_name, key=key, value=row)
86+
self._producer.send(topic=table_name, key=key, value=row)
8387

84-
self._producer.flush()
88+
self._producer.commit_transaction()
89+
self.logger.debug(f'Committed transaction with {num_rows} messages to topic {table_name}')
8590

86-
self.logger.debug(f'Sent {num_rows} messages to topic {table_name}')
91+
except Exception as e:
92+
self._producer.abort_transaction()
93+
self.logger.error(f'Transaction aborted due to error: {e}')
94+
raise
8795

8896
return num_rows
8997

@@ -111,22 +119,29 @@ def _handle_reorg(self, invalidation_ranges: List[BlockRange], table_name: str)
111119
self.logger.warning('Producer not connected, skipping reorg handling')
112120
return
113121

114-
for invalidation_range in invalidation_ranges:
115-
reorg_message = {
116-
'_type': 'reorg',
117-
'network': invalidation_range.network,
118-
'start_block': invalidation_range.start,
119-
'end_block': invalidation_range.end,
120-
}
121-
122-
self._producer.send(
123-
topic=table_name, key=f'reorg:{invalidation_range.network}'.encode('utf-8'), value=reorg_message
124-
)
125-
126-
self.logger.info(
127-
f'Sent reorg event to {table_name}: '
128-
f'{invalidation_range.network} blocks {invalidation_range.start}-{invalidation_range.end}'
129-
)
122+
self._producer.begin_transaction()
123+
try:
124+
for invalidation_range in invalidation_ranges:
125+
reorg_message = {
126+
'_type': 'reorg',
127+
'network': invalidation_range.network,
128+
'start_block': invalidation_range.start,
129+
'end_block': invalidation_range.end,
130+
}
131+
132+
self._producer.send(
133+
topic=table_name, key=f'reorg:{invalidation_range.network}'.encode('utf-8'), value=reorg_message
134+
)
135+
136+
self.logger.info(
137+
f'Sent reorg event to {table_name}: '
138+
f'{invalidation_range.network} blocks {invalidation_range.start}-{invalidation_range.end}'
139+
)
140+
141+
self._producer.commit_transaction()
142+
self.logger.info(f'Committed {len(invalidation_ranges)} reorg events to {table_name}')
130143

131-
self._producer.flush()
132-
self.logger.info(f'Flushed {len(invalidation_ranges)} reorg events to {table_name}')
144+
except Exception as e:
145+
self._producer.abort_transaction()
146+
self.logger.error(f'Reorg transaction aborted due to error: {e}')
147+
raise

0 commit comments

Comments
 (0)