@@ -41,7 +41,7 @@ def jobs(self):
4141 # yield job_id, intervals, NetFlowBot.perform_job, job_params
4242
4343 # mock the jobs for now: (until frontend is done)
44- job_id = 'traffic_in '
44+ job_id = '1min '
4545 intervals = [60 ]
4646 job_params = {
4747 "job_id" : job_id ,
@@ -75,7 +75,7 @@ def jobs(self):
7575 }
7676 yield job_id , intervals , NetFlowBot .perform_job , job_params
7777
78- job_id = 'daily '
78+ job_id = '24h '
7979 intervals = [3600 * 24 ]
8080 job_params = {
8181 "job_id" : job_id ,
@@ -122,35 +122,40 @@ def perform_job(*args, **job_params):
122122 # }
123123 # https://www.cisco.com/en/US/technologies/tk648/tk362/technologies_white_paper09186a00800a3db9.html#wp9001622
124124
125- affecting_intervals , = args
125+ job_id = job_params [ "job_id" ]
126126 values = []
127127 entity_info = job_params ["entity_info" ]
128+ minute_ago = datetime .now () - timedelta (minutes = 1 )
128129
129- if 60 in affecting_intervals :
130+ if job_id == '1min' :
130131 output_path_prefix = f'entity.{ entity_info ["entity_id" ]} .netflow.traffic_in'
131132
132- minute_ago = datetime .now () - timedelta (minutes = 1 )
133133 two_minutes_ago = minute_ago - timedelta (minutes = 1 )
134134
135135 # Traffic in and out: (per interface)
136136 values .extend (NetFlowBot .get_values_traffic_in (output_path_prefix , two_minutes_ago , minute_ago ))
137137 values .extend (NetFlowBot .get_values_traffic_out (output_path_prefix , two_minutes_ago , minute_ago ))
138+ # output_path_prefix = f'entity.{entity_info["entity_id"]}.netflow'
138139 values .extend (NetFlowBot .get_top_N_IPs (output_path_prefix , two_minutes_ago , minute_ago , 18 , is_direction_in = True ))
139140 values .extend (NetFlowBot .get_top_N_IPs (output_path_prefix , two_minutes_ago , minute_ago , 18 , is_direction_in = False ))
141+ values .extend (NetFlowBot .get_top_N_protocols (output_path_prefix , two_minutes_ago , minute_ago , 18 , is_direction_in = True ))
142+ values .extend (NetFlowBot .get_top_N_protocols (output_path_prefix , two_minutes_ago , minute_ago , 18 , is_direction_in = False ))
140143
141144 # every hour, collect stats for the whole hour:
142- if 3600 in affecting_intervals :
145+ elif job_id == '1h' :
143146 output_path_prefix_1hour = f'entity.{ entity_info ["entity_id" ]} .netflow.traffic.in.1hour'
144147 hour_ago = minute_ago - timedelta (hours = 1 )
145148 values .extend (NetFlowBot .get_top_N_IPs (output_path_prefix_1hour , hour_ago , minute_ago , 18 , is_direction_in = True ))
146149 values .extend (NetFlowBot .get_top_N_IPs (output_path_prefix_1hour , hour_ago , minute_ago , 18 , is_direction_in = False ))
147150
148151 # every 24h, also collect stats for the whole day:
149- if 3600 * 24 in affecting_intervals :
152+ elif job_id == '24h' :
150153 output_path_prefix_1day = f'entity.{ entity_info ["entity_id" ]} .netflow.traffic.in.1day'
151154 day_ago = minute_ago - timedelta (days = 1 )
152155 values .extend (NetFlowBot .get_top_N_IPs (output_path_prefix_1day , day_ago , minute_ago , 18 , is_direction_in = True ))
153156 values .extend (NetFlowBot .get_top_N_IPs (output_path_prefix_1day , day_ago , minute_ago , 18 , is_direction_in = False ))
157+ values .extend (NetFlowBot .get_top_N_protocols (output_path_prefix_1day , day_ago , minute_ago , 18 , is_direction_in = True ))
158+ values .extend (NetFlowBot .get_top_N_protocols (output_path_prefix_1day , day_ago , minute_ago , 18 , is_direction_in = False ))
154159
155160 if not values :
156161 log .warning ("No values found to be sent to Grafolean" )
@@ -228,7 +233,7 @@ def get_top_N_IPs(output_path_prefix, from_time, to_time, interface_index, is_di
228233 # TODO: missing check for IP: r.client_ip = %s AND
229234 c .execute (f"""
230235 SELECT
231- f.IPV4_DST_ADDR ,
236+ f.IPV4_ { 'SRC' if is_direction_in else 'DST' } _ADDR ,
232237 sum(f.IN_BYTES) "traffic"
233238 FROM
234239 netflow_records "r",
@@ -240,14 +245,12 @@ def get_top_N_IPs(output_path_prefix, from_time, to_time, interface_index, is_di
240245 f.{ 'INPUT_SNMP' if is_direction_in else 'OUTPUT_SNMP' } = %s AND
241246 f.DIRECTION = { '0' if is_direction_in else '1' }
242247 GROUP BY
243- f.IPV4_DST_ADDR
248+ f.IPV4_ { 'SRC' if is_direction_in else 'DST' } _ADDR
244249 ORDER BY
245250 traffic desc
246251 LIMIT 10;
247252 """ , (from_time , to_time , interface_index ,))
248253
249- #SELECT f.data->'IPV4_DST_ADDR', sum((f.data->'IN_BYTES')::integer) "traffic" FROM netflow_records "r", netflow_flows "f" WHERE r.ts >= now() - interval '1 minute' AND r.seq = f.record AND (f.data->'INPUT_SNMP')::integer = 18 AND (f.data->'DIRECTION')::integer = '0' GROUP BY f.data->'IPV4_DST_ADDR' ORDER BY traffic desc LIMIT 10;
250-
251254 values = []
252255 for top_ip , traffic_bytes in c .fetchall ():
253256 output_path = f"{ output_path_prefix } .topip.{ 'in' if is_direction_in else 'out' } .{ interface_index } .if{ interface_index } .{ top_ip } "
@@ -257,6 +260,39 @@ def get_top_N_IPs(output_path_prefix, from_time, to_time, interface_index, is_di
257260 })
258261 return values
259262
263+ @staticmethod
264+ def get_top_N_protocols (output_path_prefix , from_time , to_time , interface_index , is_direction_in = True ):
265+ with db .cursor () as c :
266+ # TODO: missing check for IP: r.client_ip = %s AND
267+ c .execute (f"""
268+ SELECT
269+ f.PROTOCOL,
270+ sum(f.IN_BYTES) "traffic"
271+ FROM
272+ netflow_records "r",
273+ netflow_flows "f"
274+ WHERE
275+ r.ts >= %s AND
276+ r.ts < %s AND
277+ r.seq = f.record AND
278+ f.{ 'INPUT_SNMP' if is_direction_in else 'OUTPUT_SNMP' } = %s AND
279+ f.DIRECTION = { '0' if is_direction_in else '1' }
280+ GROUP BY
281+ f.PROTOCOL
282+ ORDER BY
283+ traffic desc
284+ LIMIT 10;
285+ """ , (from_time , to_time , interface_index ,))
286+
287+ values = []
288+ for protocol , traffic_bytes in c .fetchall ():
289+ output_path = f"{ output_path_prefix } .topproto.{ 'in' if is_direction_in else 'out' } .{ interface_index } .if{ interface_index } .{ protocol } .{ PROTOCOLS [protocol ]} "
290+ values .append ({
291+ 'p' : output_path ,
292+ 'v' : traffic_bytes / 60. , # Bps
293+ })
294+ return values
295+
260296
261297def wait_for_grafolean (backend_url ):
262298 while True :
0 commit comments