-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
114 lines (93 loc) · 3.76 KB
/
main.py
File metadata and controls
114 lines (93 loc) · 3.76 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
#!/usr/bin/env python3
import json
import logging
import time
from confluent_kafka import Consumer, KafkaError, KafkaException, Producer
from telegraf import router as telegraf_router, counters, flush_stats
log = logging.getLogger(__name__)
# Log line layout: timestamp, level, process:thread -> logger name: message.
LOG_FORMAT = '%(asctime)-15s %(levelname)s %(processName)s:%(threadName)s->%(name)s: %(message)s'
# Seconds between maintenance passes in main_loop() (commit offsets, flush
# producer, recompute consumer lag, emit stats).
FLUSH_INTERVAL = 8
# NOTE(review): MAX_BUFFER_LEN is not referenced anywhere in this file —
# presumably consumed by the telegraf module or dead; confirm before removing.
MAX_BUFFER_LEN = 5000
def message_handler(message, router, kafka_producer):
    """Decode one Kafka message as UTF-8 JSON and hand it to *router*.

    Returns True when the message is fully dealt with (empty payload,
    unparseable payload, or successfully routed); a falsy return from
    *router* propagates out so the caller can back off and retry.
    """
    raw = message.value()
    # Empty payloads carry nothing to route; treat as handled.
    if not raw:
        return True
    try:
        # Bind the parsed payload to its own name instead of shadowing the
        # `message` parameter (the original rebinding made the Kafka message
        # object unreachable after this point).
        payload = json.loads(raw.decode('utf-8'))
    except (ValueError, UnicodeDecodeError):
        # json.JSONDecodeError is a ValueError subclass, but a payload that
        # is not valid UTF-8 raises UnicodeDecodeError, which ValueError
        # alone did not catch — such messages previously escaped the handler
        # instead of being skipped like other garbage.
        log.debug('Error parsing message: {}'.format(raw))
        return True
    return router(payload, kafka_producer)
def main_loop(kafka_consumer, kafka_producer, router):
    """Poll Kafka forever, routing each message via message_handler().

    Every FLUSH_INTERVAL seconds a maintenance pass commits consumer
    offsets, flushes the producer, recomputes per-partition consumer lag
    into counters['total_lag'], and emits stats via flush_stats().
    Exits only on KeyboardInterrupt; all other exceptions are logged and
    the loop continues.
    """
    # NOTE(review): `running` is never set to False anywhere in this
    # function, so the loop terminates only via the KeyboardInterrupt
    # return below.
    running = True
    log.info('Starting main loop...')
    last_flush = time.time()
    # partition id -> lag observed on the previous maintenance pass, used
    # only to log the lag delta per partition.
    previous_lag = {}
    while running:
        now = time.time()
        if now - last_flush >= FLUSH_INTERVAL:
            log.debug('After {:.02f} seconds, flushed {} points to postgres with {} errors'.format(now - last_flush, counters['inserts'], counters['errors']))
            # Commit consumed offsets and drain any queued produced
            # messages before measuring lag, so the numbers reflect the
            # just-flushed state.
            kafka_consumer.commit()
            kafka_producer.flush()
            total_lag = 0
            my_positions = kafka_consumer.position(kafka_consumer.assignment())
            for part in my_positions:
                try:
                    # cached=False forces a broker round-trip for fresh
                    # watermarks; `low` is fetched but unused.
                    low, high = kafka_consumer.get_watermark_offsets(part, cached=False)
                except KafkaException as e:
                    log.error('{}'.format(e.args[0].str()))
                    continue
                if part.partition not in previous_lag:
                    previous_lag[part.partition] = 0
                # Offsets <= 0 are sentinel values (no valid position yet),
                # so skip lag computation for those partitions.
                if part.offset > 0:
                    lag = high - part.offset
                    log.debug(' - partition {} lag {} ({})'.format(part.partition, lag, lag - previous_lag[part.partition]))
                    total_lag += lag
                    previous_lag[part.partition] = lag
            counters['total_lag'] = total_lag
            flush_stats(now - last_flush)
            last_flush = now
        try:
            msg = kafka_consumer.poll(timeout=0.2)
            if msg is None:
                # Poll timed out with nothing to consume.
                continue
            elif msg.error() and msg.error().code() != KafkaError._PARTITION_EOF:
                # Partition-EOF is benign (caught up); log anything else.
                log.error(msg.error())
                continue
            else:
                # A falsy return from the handler means "retry later":
                # back off one second and re-deliver the same message.
                while not message_handler(msg, router, kafka_producer):
                    time.sleep(1)
        except KeyboardInterrupt:
            log.info('CTRL-C detected, exiting...')
            return
        except Exception:
            # Top-level boundary: log with traceback and keep consuming.
            log.exception('Exception in main loop')
def assign_cb(consumer_, partitions):
    """Rebalance callback: log the id of every partition just assigned."""
    log.info("Partitions reassigned: ")
    for assigned in partitions:
        log.info(" - {}".format(assigned.partition))
def main_process(brokers, topic_name: str, message_router):
    """Build a Kafka consumer/producer pair, run the main loop on
    *topic_name*, then tear the consumer down when the loop returns.

    The same config dict is handed to both Consumer and Producer;
    producer-irrelevant keys (group.id, offset reset, etc.) are simply
    ignored on the producer side.
    """
    client_conf = {
        'bootstrap.servers': brokers,
        'group.id': 'kafkapost',
        'default.topic.config': {'auto.offset.reset': 'smallest'},
        'auto.commit.enable': False,  # offsets committed manually in main_loop
        'api.version.request': True,
        'partition.assignment.strategy': 'roundrobin',
        'compression.codec': 'snappy'
    }
    consumer = Consumer(client_conf)
    consumer.subscribe([topic_name], on_assign=assign_cb)
    log.info('Subscribed to Kafka topic')
    producer = Producer(client_conf)
    main_loop(consumer, producer, message_router)
    log.debug('Unsubscribing from topic...')
    consumer.unsubscribe()
    log.debug('Closing Kafka consumer....')
    consumer.close()
def main():
    """Configure logging and start the Kafka->postgres bridge."""
    logging.basicConfig(level=logging.DEBUG, format=LOG_FORMAT)
    # Quiet the chatty HTTP client libraries; everything else stays DEBUG.
    for noisy in ('requests', 'urllib3'):
        logging.getLogger(noisy).setLevel(logging.WARNING)
    main_process('bf11:9092,bf5:9092', 'telegraf_json', telegraf_router)


if __name__ == "__main__":
    main()