
Commit d9f4851

Refactor kafka streaming loader and fix Resume watermark
1 parent 2b66d22 commit d9f4851

File tree

6 files changed: +119 -215 lines changed

apps/kafka_streaming_loader.py

Lines changed: 76 additions & 139 deletions
@@ -1,187 +1,124 @@
 #!/usr/bin/env python3
-"""
-Kafka streaming loader with label joining.
-Continuously loads ERC20 transfers to Kafka with token metadata.
-"""
+"""Stream data to Kafka with resume watermark support."""

 import argparse
-import json
+import logging
 import os
-import time
 from pathlib import Path

 from amp.client import Client
 from amp.loaders.types import LabelJoinConfig
-from kafka import KafkaConsumer
-
-
-def consume_messages(kafka_brokers: str, topic: str, max_messages: int = 10):
-    """Consume and print messages from Kafka topic for testing."""
-    print(f'\n{"=" * 60}')
-    print('Consuming messages from Kafka')
-    print(f'{"=" * 60}\n')
-    print(f'Topic: {topic}')
-    print(f'Brokers: {kafka_brokers}')
-    print(f'Max messages: {max_messages}\n')
-
-    consumer = KafkaConsumer(
-        topic,
-        bootstrap_servers=kafka_brokers,
-        auto_offset_reset='earliest',
-        value_deserializer=lambda x: json.loads(x.decode('utf-8')),
-        consumer_timeout_ms=5000,
-        group_id='kafka-streaming-loader-consumer',
-        enable_auto_commit=True,
-    )
+from amp.streaming import BlockRange, ResumeWatermark
+
+
+def get_block_hash(client: Client, raw_dataset: str, block_num: int) -> str:
+    """Get block hash from dataset.blocks table."""
+    query = f'SELECT hash FROM {raw_dataset}.blocks WHERE block_num = {block_num} LIMIT 1'
+    result = client.get_sql(query, read_all=True)
+    hash_val = result.to_pydict()['hash'][0]
+    return '0x' + hash_val.hex() if isinstance(hash_val, bytes) else hash_val

-    def format_address(addr):
-        """Convert binary address to hex string."""
-        if addr is None:
-            return None
-        if isinstance(addr, str):
-            return addr
-        if isinstance(addr, bytes):
-            return '0x' + addr.hex()
-        return addr
-
-    msg_count = 0
-    for message in consumer:
-        msg_count += 1
-        data = message.value
-        print(f'Message {msg_count}:')
-        print(f' block_num: {data.get("block_num")}')
-        print(f' token_address: {format_address(data.get("token_address"))}')
-        print(f' symbol: {data.get("symbol")}')
-        print(f' name: {data.get("name")}')
-        print(f' decimals: {data.get("decimals")}')
-        print(f' value: {data.get("value")}')
-        print(f' from_address: {format_address(data.get("from_address"))}')
-        print(f' to_address: {format_address(data.get("to_address"))}')
-        print()
-
-        if msg_count >= max_messages:
-            break
-
-    consumer.close()
-    print(f'Consumed {msg_count} messages from Kafka topic "{topic}"')
+
+def get_latest_block(client: Client, raw_dataset: str) -> int:
+    """Get latest block number from dataset.blocks table."""
+    query = f'SELECT block_num FROM {raw_dataset}.blocks ORDER BY block_num DESC LIMIT 1'
+    result = client.get_sql(query, read_all=True)
+    return result.to_pydict()['block_num'][0]
+
+
+def create_watermark(client: Client, raw_dataset: str, network: str, start_block: int) -> ResumeWatermark:
+    """Create a resume watermark for the given start block."""
+    watermark_block = start_block - 1
+    watermark_hash = get_block_hash(client, raw_dataset, watermark_block)
+    return ResumeWatermark(
+        ranges=[BlockRange(network=network, start=watermark_block, end=watermark_block, hash=watermark_hash)]
+    )


 def main(
+    amp_server: str,
     kafka_brokers: str,
     topic: str,
-    label_csv: str,
-    amp_server: str,
     query_file: str,
-    consume_mode: bool = False,
-    consume_max: int = 10,
+    raw_dataset: str,
+    network: str,
+    start_block: int = None,
+    label_csv: str = None,
 ):
-    if consume_mode:
-        consume_messages(kafka_brokers, topic, consume_max)
-        return
-
-    print(f'Connecting to Amp server: {amp_server}')
     client = Client(amp_server)
+    print(f'Connected to {amp_server}')

-    label_path = Path(label_csv)
-    if not label_path.exists():
-        raise FileNotFoundError(f'Label CSV not found: {label_csv}')
+    if label_csv and Path(label_csv).exists():
+        client.configure_label('tokens', label_csv)
+        print(f'Loaded {len(client.label_manager.get_label("tokens"))} labels from {label_csv}')
+        label_config = LabelJoinConfig(
+            label_name='tokens', label_key_column='token_address', stream_key_column='token_address'
+        )
+    else:
+        label_config = None

-    client.configure_label('tokens', str(label_path))
-    print(f'Loaded {len(client.label_manager.get_label("tokens"))} tokens from {label_csv}')
+    client.configure_connection('kafka', 'kafka', {'bootstrap_servers': kafka_brokers, 'client_id': 'amp-kafka-loader'})

-    kafka_config = {
-        'bootstrap_servers': kafka_brokers,
-        'client_id': 'amp-kafka-loader',
-    }
-    client.configure_connection('kafka', 'kafka', kafka_config)
+    with open(query_file) as f:
+        query = f.read()

-    query_path = Path(query_file)
-    if not query_path.exists():
-        raise FileNotFoundError(f'Query file not found: {query_file}')
+    if start_block is None:
+        start_block = get_latest_block(client, raw_dataset) - 10

-    with open(query_path) as f:
-        query = f.read()
+    print(f'Starting from block {start_block}')

-    label_config = LabelJoinConfig(
-        label_name='tokens',
-        label_key_column='token_address',
-        stream_key_column='token_address',
-    )
+    resume_watermark = create_watermark(client, raw_dataset, network, start_block) if start_block > 0 else None
+    if resume_watermark:
+        print(f'Watermark: {resume_watermark.to_json()}')

-    print(f'Starting Kafka streaming loader')
-    print(f'Kafka brokers: {kafka_brokers}')
-    print(f'Topic: {topic}')
-    print('Press Ctrl+C to stop\n')
+    print(f'Streaming to Kafka: {kafka_brokers} -> {topic}\n')

-    total_rows = 0
     batch_count = 0
-
     for result in client.sql(query).load(
-        connection='kafka',
-        destination=topic,
-        stream=True,
-        label_config=label_config,
+        'kafka', topic, stream=True, label_config=label_config, resume_watermark=resume_watermark
     ):
         if result.success:
-            total_rows += result.rows_loaded
             batch_count += 1
-            print(f'Batch {batch_count}: {result.rows_loaded} rows in {result.duration:.2f}s (total: {total_rows})')
+            if batch_count == 1 and result.metadata:
+                print(f'First batch: {result.metadata.get("block_ranges")}\n')
+            print(f'Batch {batch_count}: {result.rows_loaded} rows in {result.duration:.2f}s')
         else:
             print(f'Error: {result.error}')


 if __name__ == '__main__':
-    parser = argparse.ArgumentParser(description='Stream ERC20 transfers to Kafka with token labels')
-    parser.add_argument(
-        '--kafka-brokers',
-        default=os.getenv('KAFKA_BOOTSTRAP_SERVERS', 'localhost:9092'),
-        help='Kafka bootstrap servers (default: localhost:9092 or KAFKA_BOOTSTRAP_SERVERS env var)',
-    )
-    parser.add_argument('--topic', default='erc20_transfers', help='Kafka topic name (default: erc20_transfers)')
+    parser = argparse.ArgumentParser(description='Stream data to Kafka with resume watermark')
+    parser.add_argument('--amp-server', default=os.getenv('AMP_SERVER_URL', 'grpc://127.0.0.1:1602'))
+    parser.add_argument('--kafka-brokers', default='localhost:9092')
+    parser.add_argument('--topic', required=True)
+    parser.add_argument('--query-file', required=True)
     parser.add_argument(
-        '--label-csv',
-        default='data/eth_mainnet_token_metadata.csv',
-        help='Path to token metadata CSV (default: data/eth_mainnet_token_metadata.csv)',
+        '--raw-dataset', required=True, help='Dataset name for the raw dataset of the chain (e.g., anvil, eth_firehose)'
     )
-    parser.add_argument(
-        '--amp-server',
-        default=os.getenv('AMP_SERVER_URL', 'grpc://34.27.238.174:80'),
-        help='Amp server URL (default: grpc://34.27.238.174:80 or AMP_SERVER_URL env var)',
-    )
-    parser.add_argument(
-        '--query-file',
-        default='apps/queries/erc20_transfers.sql',
-        help='Path to SQL query file (default: apps/queries/erc20_transfers.sql)',
-    )
-    parser.add_argument(
-        '--consume',
-        action='store_true',
-        help='Consume mode: read and print messages from Kafka topic (for testing)',
-    )
-    parser.add_argument(
-        '--consume-max',
-        type=int,
-        default=10,
-        help='Maximum messages to consume in consume mode (default: 10)',
-    )
-
+    parser.add_argument('--network', default='anvil')
+    parser.add_argument('--start-block', type=int, help='Start from specific block (default: latest - 10)')
+    parser.add_argument('--label-csv', help='Optional CSV for label joining')
+    parser.add_argument('--log-level', choices=['DEBUG', 'INFO', 'WARNING', 'ERROR'])
     args = parser.parse_args()

+    if args.log_level:
+        logging.basicConfig(
+            level=getattr(logging, args.log_level), format='%(asctime)s [%(name)s] %(levelname)s: %(message)s'
+        )
+
     try:
         main(
+            amp_server=args.amp_server,
             kafka_brokers=args.kafka_brokers,
             topic=args.topic,
-            label_csv=args.label_csv,
-            amp_server=args.amp_server,
             query_file=args.query_file,
-            consume_mode=args.consume,
-            consume_max=args.consume_max,
+            raw_dataset=args.raw_dataset,
+            network=args.network,
+            start_block=args.start_block,
+            label_csv=args.label_csv,
         )
     except KeyboardInterrupt:
-        print('\n\nInterrupted by user')
+        print('\n\nStopped by user')
     except Exception as e:
-        print(f'\n\nError: {e}')
-        import traceback
-
-        traceback.print_exc()
+        print(f'\nError: {e}')
         raise
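
For orientation, here is a minimal end-to-end sketch of the resume flow the refactored loader now uses, assuming the amp client API exactly as it appears in the diff above; the server URL, network, dataset, topic, query, and block numbers are illustrative placeholders, not values from this commit:

from amp.client import Client
from amp.streaming import BlockRange, ResumeWatermark

client = Client('grpc://127.0.0.1:1602')  # placeholder server URL
client.configure_connection('kafka', 'kafka', {'bootstrap_servers': 'localhost:9092'})

# Resume one block before the desired start, pinning that block's hash as the watermark.
start_block = 1200  # placeholder
row = client.get_sql('SELECT hash FROM anvil.blocks WHERE block_num = 1199 LIMIT 1', read_all=True)
hash_val = row.to_pydict()['hash'][0]
watermark_hash = '0x' + hash_val.hex() if isinstance(hash_val, bytes) else hash_val
watermark = ResumeWatermark(
    ranges=[BlockRange(network='anvil', start=1199, end=1199, hash=watermark_hash)]
)

# Stream query results into the Kafka topic, picking up after the watermarked block.
for result in client.sql('SELECT block_num, tx_hash FROM anvil.logs').load(
    'kafka', 'anvil_logs', stream=True, resume_watermark=watermark
):
    if result.success:
        print(f'{result.rows_loaded} rows in {result.duration:.2f}s')
    else:
        print(f'Error: {result.error}')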

apps/queries/anvil_logs.sql

Lines changed: 7 additions & 0 deletions
@@ -0,0 +1,7 @@
+SELECT
+    block_num,
+    tx_hash,
+    log_index,
+    address,
+    topic0
+FROM anvil.logs

apps/queries/erc20_transfers_streaming.sql

Lines changed: 0 additions & 43 deletions
This file was deleted.

apps/test_kafka_query.py

Lines changed: 5 additions & 5 deletions
@@ -3,7 +3,11 @@
 Test ERC20 query with label joining
 """

+import json
 import os
+import time
+
+from kafka import KafkaConsumer

 from amp.client import Client
 from amp.loaders.types import LabelJoinConfig
@@ -76,10 +80,6 @@
 print('Reading back from Kafka')
 print('=' * 60)

-from kafka import KafkaConsumer
-import json
-import time
-
 time.sleep(1)

 consumer = KafkaConsumer(
@@ -90,7 +90,7 @@
     consumer_timeout_ms=5000,
 )

-print(f'\nConsuming messages from topic "erc20_transfers":\n')
+print('\nConsuming messages from topic "erc20_transfers":\n')
 msg_count = 0
 for message in consumer:
     msg_count += 1
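
The loader's old --consume test mode is gone in this commit, so a quick read-back of the topic now has to live outside the loader. A throwaway consumer in the spirit of the deleted consume_messages helper still does the job; this is a sketch using kafka-python, with topic and broker values as placeholders:

import json

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'erc20_transfers',  # placeholder topic
    bootstrap_servers='localhost:9092',
    auto_offset_reset='earliest',
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
    consumer_timeout_ms=5000,  # give up after 5s with no new messages
)
for count, message in enumerate(consumer, start=1):
    print(f'Message {count}: {message.value}')
    if count >= 10:
        break
consumer.close()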
