
Commit b8f486b

Improve logging for better debugging of named pipes (and less noise in logs)
1 parent 18a4427 commit b8f486b

2 files changed: +10 -5 lines changed


netflowcollector.py

Lines changed: 5 additions & 0 deletions
@@ -9,10 +9,13 @@
 
 from colors import color
 
+
 # python-netflow-v9-softflowd expects main.py to be the main entrypoint, but we only need
 # get_export_packets() iterator:
 sys.path.append(os.path.dirname(os.path.realpath(__file__)) + '/pynetflow')
 from pynetflow.main import get_export_packets
+# disable DEBUG logging on NetFlow collector library:
+logging.getLogger('pynetflow.main').setLevel(logging.INFO)
 
 
 logging.basicConfig(format='%(asctime)s.%(msecs)03d | %(levelname)s | %(message)s',
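
Note: the setLevel() call added above raises the threshold only for the bundled pynetflow library's logger, so its per-packet DEBUG output stops cluttering the logs while this application's own logger stays verbose. A minimal standalone sketch of that stdlib logging technique; the logger name 'pynetflow.main' and the format string come from this diff, while the datefmt, level, and 'netflowcollector' logger name are illustrative assumptions:

import logging

# configure verbose logging for our own code (datefmt/level chosen for the sketch):
logging.basicConfig(format='%(asctime)s.%(msecs)03d | %(levelname)s | %(message)s',
                    datefmt='%Y-%m-%d %H:%M:%S',
                    level=logging.DEBUG)
log = logging.getLogger("netflowcollector")

# raise the threshold only for the chatty third-party logger; its DEBUG records
# are dropped, but INFO and above still propagate to the root handler:
logging.getLogger('pynetflow.main').setLevel(logging.INFO)

log.debug("still visible")                                     # emitted
logging.getLogger('pynetflow.main').debug("per-packet noise")  # suppressed
logging.getLogger('pynetflow.main').info("still visible")      # emitted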
@@ -55,6 +58,7 @@ def process_netflow(netflow_port, named_pipe_filename):
 entry = {
 "ts": ts,
 "client": client_ip,
+"seq": export.header.sequence,
 "flows": [{
 "IN_BYTES": data["IN_BYTES"],
 "PROTOCOL": data["PROTOCOL"],
@@ -69,6 +73,7 @@ def process_netflow(netflow_port, named_pipe_filename):
 }
 line = json.dumps(entry).encode() + b'\n'
 fp.write(line)
+log.debug(f"Wrote seq [{export.header.sequence}] from client [{client_ip}], ts [{ts}], n flows: [{len(flows_data)}]")
 line = None
 except Exception as ex:
 log.exception(f"Exception: {str(ex)}")
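
For context, the collector hands records to the writer as newline-delimited JSON over a named pipe; the new "seq" field carries the NetFlow export header sequence number so each write can be traced end to end. A rough sketch of that handoff under those assumptions (only "ts", "client", "seq", "flows", and export.header.sequence appear in this diff; the function names, pipe path, and reader loop are hypothetical, not the repository's actual code):

import json

def write_entry(fp, ts, client_ip, export, flows_data):
    # collector side: one JSON object per line, mirroring the entry built above;
    # "seq" is the NetFlow export header sequence number added in this commit
    entry = {
        "ts": ts,
        "client": client_ip,
        "seq": export.header.sequence,
        "flows": flows_data,
    }
    fp.write(json.dumps(entry).encode() + b'\n')

def read_entries(pipe_path):
    # writer side: the named pipe delivers newline-delimited JSON records,
    # so each line decodes back into one record dict
    with open(pipe_path, 'rb') as fp:
        for line in fp:
            yield json.loads(line)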

netflowwriter.py

Lines changed: 5 additions & 5 deletions
@@ -47,15 +47,15 @@ def write_record(j):
 with db.cursor() as c:
 # first save the flow record:
 ts = datetime.utcfromtimestamp(j['ts'])
-log.info(f"Received record: {ts} from {j['client']}")
+log.info(f"Received record [{j['seq']}]: {ts} from {j['client']}")
 c.execute(f"INSERT INTO {DB_PREFIX}records (ts, client_ip) VALUES (%s, %s) RETURNING seq;", (ts, j['client'],))
-record_seq = c.fetchone()[0]
+record_db_seq = c.fetchone()[0]
 
 # then save each of the flows within the record, but use execute_values() to perform bulk insert:
-def _get_data(record_seq, flows):
+def _get_data(record_db_seq, flows):
 for flow in flows:
 yield (
-record_seq,
+record_db_seq,
 flow.get('IN_BYTES'),
 flow.get('PROTOCOL'),
 flow.get('DIRECTION'),
@@ -66,7 +66,7 @@ def _get_data(record_seq, flows):
 flow.get('IPV4_DST_ADDR'),
 flow.get('IPV4_SRC_ADDR'),
 )
-data_iterator = _get_data(record_seq, j['flows'])
+data_iterator = _get_data(record_db_seq, j['flows'])
 psycopg2.extras.execute_values(
 c,
 f"INSERT INTO {DB_PREFIX}flows (record, IN_BYTES, PROTOCOL, DIRECTION, L4_DST_PORT, L4_SRC_PORT, INPUT_SNMP, OUTPUT_SNMP, IPV4_DST_ADDR, IPV4_SRC_ADDR) VALUES %s",
