-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathf2b-collate-subnets.py
More file actions
executable file
·492 lines (414 loc) · 25.2 KB
/
f2b-collate-subnets.py
File metadata and controls
executable file
·492 lines (414 loc) · 25.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
#!/usr/bin/env python3
# Fail2Ban Collate Subnets
# By: Gavin Botica
# Version 1.4.7
# This script aggregates banned IPs into subnets.
# This can dramatically lower the number of individual bans, thereby reducing resource usage by Fail2ban & iptables
# The script searches all IPs currently banned by Fail2Ban and collates them into subnets whose banned-IP count meets or exceeds the threshold.
# When using --collate-bans, the script unbans ALL banned IPs in the identified subnets and then bans the subnets.
import argparse
import subprocess
import ipaddress
import logging
import json
import sys
import os
import random
import glob
from typing import List, Dict, Tuple, Set
# ANSI color codes used for console output
RESET = "\033[0m"
BOLD = "\033[1m"
RED = "\033[91m"  # For errors
GREEN = "\033[92m"  # For success messages
YELLOW = "\033[93m"  # For warnings, prompts, and important notes
CYAN = "\033[96m"  # For subnet details and targets
WHITE = "\033[97m"  # For general bold text and headers

# Configure logging
# script_dir is the directory containing this script.
script_dir = os.path.dirname(os.path.abspath(__file__))
log_file = '/var/log/fail2ban-collate-subnets.log'
_LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'
try:
    logging.basicConfig(filename=log_file, level=logging.INFO, format=_LOG_FORMAT)
except OSError as _log_error:
    # /var/log is usually writable by root only. Opening the log file must not
    # abort the script at import time, so fall back to a log file next to the
    # script and, failing that, to warnings on stderr.
    _preferred_log_file = log_file
    log_file = os.path.join(script_dir, 'fail2ban-collate-subnets.log')
    try:
        logging.basicConfig(filename=log_file, level=logging.INFO, format=_LOG_FORMAT)
    except OSError:
        log_file = None
        logging.basicConfig(stream=sys.stderr, level=logging.WARNING, format=_LOG_FORMAT)
    print(
        f"{YELLOW}Warning: cannot write log file {_preferred_log_file} ({_log_error}); "
        f"logging to {log_file or 'stderr'} instead.{RESET}",
        file=sys.stderr,
    )
def get_all_banned_ips_from_fail2ban() -> Dict[str, List[str]]:
    """
    Calls 'sudo fail2ban-client banned' to get all currently banned IPs
    across all jails and returns the data as a dictionary {jail_name: [ip1, ip2, ...]}.

    Returns an empty dictionary when the command output does not start with '['.
    Exits the process with status 1 when the command cannot be started, when it
    returns a non-zero exit status, or when its output cannot be parsed.
    """
    try:
        result = subprocess.run(
            ['sudo', 'fail2ban-client', 'banned'],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            universal_newlines=True,
            check=True
        )
    except subprocess.CalledProcessError as e:
        print(f"{RED}Error executing fail2ban-client: {e.stderr.strip()}{RESET}")
        print(f"{YELLOW}Please ensure you have 'sudo' privileges and Fail2Ban is running.{RESET}")
        logging.error(f"Error executing fail2ban-client: {e.stderr.strip()}")
        sys.exit(1)
    except OSError as e:
        # Raised when the executable itself cannot be launched (e.g. 'sudo' is
        # not installed); report it like any other failure instead of a traceback.
        print(f"{RED}Error executing fail2ban-client: {e}{RESET}")
        print(f"{YELLOW}Please ensure 'sudo' and 'fail2ban-client' are installed and on your PATH.{RESET}")
        logging.error(f"Error executing fail2ban-client: {e}")
        sys.exit(1)

    output_str = result.stdout.strip()
    if not output_str.startswith('['):
        print(f"{RED}Error: fail2ban-client output format unexpected. Is Fail2Ban running and do you have sudo access?{RESET}")
        logging.error(f"fail2ban-client output format unexpected: {output_str[:200]}...")
        return {}

    # The output uses single-quoted strings; swap them for double quotes so the
    # text can be parsed as JSON.
    json_str = output_str.replace("'", "\"")
    try:
        # Expected shape: a list of single-item dictionaries,
        # e.g. [{'jail1': [...]}, {'jail2': [...]}]
        data = json.loads(json_str)
    except json.JSONDecodeError as e:
        print(f"{RED}Error parsing fail2ban-client output: {e}{RESET}")
        print(f"{YELLOW}Raw output was: {output_str[:200]}...{RESET}")
        logging.error(f"Error parsing fail2ban-client output: {e}. Raw output: {output_str[:200]}...")
        sys.exit(1)

    # Flatten the list of dictionaries into a single dictionary
    banned_ips_by_jail = {}
    for jail_dict in data:
        banned_ips_by_jail.update(jail_dict)
    return banned_ips_by_jail
def flatten_banned_ips(banned_ips_by_jail: Dict[str, List[str]]) -> Tuple[Dict[str, str], List[str]]:
    """
    Inverts {jail_name: [entry, ...]} into {ip: jail_name}.

    Entries that do not parse as a single IP address (for example subnets in
    CIDR notation) are left out of the mapping and returned separately,
    without duplicates. When the same IP appears under several jails, the
    jail seen last wins.
    """
    ip_to_jail = {}
    rejected = set()  # set, so a repeated entry is reported only once
    for jail_name in banned_ips_by_jail:
        for candidate in banned_ips_by_jail[jail_name]:
            try:
                # Only validating here; the parsed object is not kept.
                ipaddress.ip_address(candidate)
            except ValueError:
                rejected.add(candidate)
            else:
                ip_to_jail[candidate] = jail_name
    return ip_to_jail, list(rejected)
def subnet_aggregation(ips_flat: Dict[str, str], subnet_mask: int) -> Tuple[Dict[str, int], List[str]]:
    """
    Counts how many of the given IPs fall into each subnet of size /subnet_mask.

    Returns a pair: {network_address: count} and the list of IPs for which a
    network could not be built with the given mask (each one is logged as a
    warning).
    """
    counts = {}
    rejected = []
    for address in ips_flat:
        try:
            # strict=False lets a host address stand in for its network.
            net = ipaddress.ip_network(f"{address}/{subnet_mask}", strict=False)
        except ValueError as e:
            rejected.append(address)
            logging.warning(f"Skipping invalid IP '{address}' during subnet aggregation: {e}")
        else:
            key = str(net.network_address)
            if key in counts:
                counts[key] += 1
            else:
                counts[key] = 1
    return counts, rejected
def get_ips_in_subnet(target_subnet: str, subnet_mask: int, all_ips_flat: Dict[str, str]) -> Dict[str, str]:
    """
    Selects from the {ip: jail} mapping those IPs that lie inside
    target_subnet/subnet_mask.

    Returns the matching {ip: jail} entries, or an empty dictionary when the
    target subnet itself is invalid (which is logged as an error). Entries
    that are not valid IP addresses are logged and skipped.
    """
    cidr = f"{target_subnet}/{subnet_mask}"
    try:
        target_net = ipaddress.ip_network(cidr, strict=False)
    except ValueError as e:
        logging.error(f"Invalid target subnet '{cidr}': {e}")
        return {}

    members = {}
    for candidate, jail_name in all_ips_flat.items():
        try:
            inside = ipaddress.ip_address(candidate) in target_net
        except ValueError:
            # Not expected when the mapping was validated beforehand; kept for robustness.
            logging.warning(f"Skipping invalid IP '{candidate}' during subnet check.")
            continue
        if inside:
            members[candidate] = jail_name
    return members
def clean_old_rollback_files(script_dir: str, max_files_to_keep: int = 10):
    """
    Prunes rollback files in script_dir down to the max_files_to_keep newest ones.

    Files matching 'f2b-collate-subnets-rollback_*.txt' are ranked by
    modification time and the oldest surplus files are removed. Files whose
    modification time cannot be read are logged and left alone; removal
    failures are printed and logged without stopping the cleanup.
    """
    candidates = glob.glob(os.path.join(script_dir, "f2b-collate-subnets-rollback_*.txt"))
    if len(candidates) <= max_files_to_keep:
        return

    dated = []
    for path in candidates:
        try:
            dated.append((os.path.getmtime(path), path))
        except OSError as e:
            logging.warning(f"Could not get modification time for {path}: {e}. Skipping.")
    dated.sort()  # oldest first

    surplus = len(dated) - max_files_to_keep
    if surplus <= 0:
        return
    for _, stale_path in dated[:surplus]:
        try:
            os.remove(stale_path)
            logging.info(f"Removed old rollback file: {stale_path}")
        except OSError as e:
            print(f"{RED}Error removing old rollback file {stale_path}: {e}{RESET}")
            logging.error(f"Error removing old rollback file {stale_path}: {e}")
def _resolve_subnet_jail(ban_jail, subnet_ips):
    """Return ban_jail if given, else the jail of the first IP in subnet_ips, else None."""
    if ban_jail:
        return ban_jail
    first_ip = next(iter(subnet_ips), None)
    return subnet_ips[first_ip] if first_ip else None


def manage_bans(sorted_subnets_counts: Dict[str, int], all_ips_flat: Dict[str, str], threshold: int, collate_bans: bool, subnet_mask: int, ban_jail: str, test_backup: bool, non_interactive: bool) -> Tuple[int, int, Dict[str, str], Set[str]]:
    """
    Manages the banning and unbanning of IPs and subnets.

    Prints every subnet whose banned-IP count is at least `threshold`, writes a
    rollback file when `collate_bans` or `test_backup` is set, and, when
    `collate_bans` is set (after confirmation unless `non_interactive`), bans
    the qualifying subnets and then unbans the individual IPs they cover.

    Args:
        sorted_subnets_counts: {network_address: banned_ip_count}, in display order.
        all_ips_flat: {ip: jail} for every individually banned IP.
        threshold: minimum count for a subnet to qualify.
        collate_bans: perform the ban/unban actions.
        subnet_mask: prefix length applied to every subnet.
        ban_jail: jail to ban subnets in; when empty, the jail of the first
            banned IP found in the subnet is used.
        test_backup: write the rollback file without performing any actions.
        non_interactive: skip the confirmation prompt.

    Returns:
        (total addresses in qualifying subnets, number of banned IPs inside
        them, {ip: jail} of those IPs, set of qualifying subnets in CIDR form).
        If the user declines the prompt, the last two are empty.
    """
    total_potential_ips_in_found_subnets = 0
    ips_to_unban_collected = {}  # {ip: jail} for all IPs in qualifying subnets
    found_subnets_for_action = {}  # {subnet_address: {ip: jail}} for qualifying subnets
    qualifying_subnets = set()  # CIDR strings of qualifying subnets

    for subnet_address, count in sorted_subnets_counts.items():
        if count < threshold:
            continue
        subnet_cidr = f"{subnet_address}/{subnet_mask}"
        print(f"{CYAN}{count:<15}{subnet_cidr}{RESET}")
        qualifying_subnets.add(subnet_cidr)
        total_potential_ips_in_found_subnets += ipaddress.ip_network(subnet_cidr).num_addresses
        # All banned IPs within this subnet, taken from the in-memory mapping
        current_subnet_ips = get_ips_in_subnet(subnet_address, subnet_mask, all_ips_flat)
        ips_to_unban_collected.update(current_subnet_ips)
        found_subnets_for_action[subnet_address] = current_subnet_ips
    total_banned_ips_in_found_subnets = len(ips_to_unban_collected)

    # --- Prepare rollback entries ---
    # Undoing an unban means banning the IP again.
    rollback_entries = [("ban", ip, jail) for ip, jail in ips_to_unban_collected.items()]
    # Undoing a subnet ban means unbanning the subnet. Iterating the dict
    # (insertion order) keeps the rollback file order deterministic.
    for subnet_address, subnet_ips in found_subnets_for_action.items():
        subnet_cidr = f"{subnet_address}/{subnet_mask}"
        target_jail_for_subnet = _resolve_subnet_jail(ban_jail, subnet_ips)
        if target_jail_for_subnet:
            rollback_entries.append(("unban", subnet_cidr, target_jail_for_subnet))
        else:
            logging.warning(f"Could not determine jail for subnet {subnet_cidr} for rollback file. Skipping this entry.")

    # --- Create rollback file (triggered by collate_bans or test_backup) ---
    if collate_bans or test_backup:
        random_number = random.randint(100000, 999999)  # 6-digit id used by --rollback
        rollback_filename = os.path.join(script_dir, f"f2b-collate-subnets-rollback_{random_number}.txt")
        try:
            with open(rollback_filename, 'w') as f:
                for action, target, jail in rollback_entries:
                    f.write(f"{action} {target} {jail}\n")  # Format: action target jail
            print(f"{GREEN}Rollback file saved to: {rollback_filename}{RESET}")
            logging.info(f"Rollback file saved to: {rollback_filename}")
            clean_old_rollback_files(script_dir, max_files_to_keep=10)
        except IOError as e:
            print(f"{RED}Error saving rollback file {rollback_filename}: {e}{RESET}")
            logging.error(f"Error saving rollback file {rollback_filename}: {e}")

    if collate_bans:
        if not non_interactive:
            available_ips = total_potential_ips_in_found_subnets - total_banned_ips_in_found_subnets
            print(f"{YELLOW}Unbanning all found IPs, then banning these subnets will block {total_potential_ips_in_found_subnets} IPs, including {available_ips} IPs not already banned - are you sure?{RESET}")
            confirmation = input(f"{YELLOW}Type 'y' to continue: {RESET}").lower()
            if confirmation != 'y':
                print(f"{YELLOW}Subnet banning aborted.{RESET}")
                return total_potential_ips_in_found_subnets, total_banned_ips_in_found_subnets, {}, set()
        else:
            print(f"{YELLOW}Non-interactive mode: Proceeding with collate-bans.{RESET}")
            logging.info("Non-interactive mode: Proceeding with collate-bans.")

        # Ban the subnets first so the covered IPs are never left unblocked.
        banned_subnet_addresses = []
        for subnet_address, subnet_ips in found_subnets_for_action.items():
            subnet_cidr = f"{subnet_address}/{subnet_mask}"
            target_jail = _resolve_subnet_jail(ban_jail, subnet_ips)
            if not target_jail:
                print(f"{YELLOW}No jail specified and no IPs found in subnet {subnet_cidr} to infer jail. Skipping ban.{RESET}")
                logging.warning(f"No jail specified and no IPs found in subnet {subnet_cidr} to infer jail. Skipping ban.")
                continue
            try:
                subprocess.run(['sudo', 'fail2ban-client', 'set', target_jail, 'banip', subnet_cidr], check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)
            except subprocess.CalledProcessError as e:
                print(f"{RED}Error banning subnet {subnet_cidr}: {e.stderr.strip()}{RESET}")
                logging.error(f"Error banning subnet {subnet_cidr}: {e.stderr.strip()}")
                continue
            banned_subnet_addresses.append(subnet_address)
            print(f"{GREEN}Banned subnet: {subnet_cidr} in jail: {target_jail}{RESET}")
            logging.info(f"Banned subnet: {subnet_cidr} in jail: {target_jail}")

        # Only unban IPs whose covering subnet ban succeeded; otherwise a
        # failed or skipped subnet ban would leave those IPs unblocked.
        unbannable_count = sum(len(found_subnets_for_action[s]) for s in banned_subnet_addresses)
        kept_count = total_banned_ips_in_found_subnets - unbannable_count
        if kept_count:
            print(f"{YELLOW}Keeping {kept_count} individual IP bans because their subnet could not be banned.{RESET}")
            logging.warning(f"Keeping {kept_count} individual IP bans because their subnet could not be banned.")
        for subnet_address in banned_subnet_addresses:
            for ip, jail in found_subnets_for_action[subnet_address].items():
                try:
                    subprocess.run(['sudo', 'fail2ban-client', 'set', jail, 'unbanip', ip], check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)
                    print(f"{GREEN}Unbanned IP: {ip} from jail: {jail}{RESET}")
                    logging.info(f"Unbanned IP: {ip} from jail: {jail}")
                except subprocess.CalledProcessError as e:
                    print(f"{RED}Error unbanning IP {ip} from jail {jail}: {e.stderr.strip()}{RESET}")
                    logging.error(f"Error unbanning IP {ip} from jail {jail}: {e.stderr.strip()}")

    return total_potential_ips_in_found_subnets, total_banned_ips_in_found_subnets, ips_to_unban_collected, qualifying_subnets
def perform_rollback(rollback_file: str, non_interactive: bool):
    """
    Reads the rollback file and executes fail2ban-client commands to revert changes.

    Each non-empty line must have the form '<action> <target> <jail>' where
    action is 'ban' or 'unban'. Lines with any other shape or action are
    reported and skipped, so the file can never trigger a fail2ban-client
    command other than banip/unbanip.

    Args:
        rollback_file: path of the rollback file to replay.
        non_interactive: when True, skip the confirmation prompt.

    Exits the process with status 1 if the file is missing or unreadable, and
    with status 0 if it holds no valid actions or the user declines.
    """
    allowed_actions = ('ban', 'unban')

    print(f"\n{BOLD}{WHITE}Attempting to perform rollback using file: {rollback_file}{RESET}")
    logging.info(f"Starting rollback from file: {rollback_file}")
    if not os.path.exists(rollback_file):
        print(f"{RED}Error: Rollback file '{rollback_file}' not found.{RESET}")
        logging.error(f"Rollback file not found: {rollback_file}")
        sys.exit(1)

    rollback_actions = []
    try:
        with open(rollback_file, 'r') as f:
            for line_num, line in enumerate(f, 1):
                line = line.strip()
                if not line:
                    continue
                # Split on any run of whitespace into: action, value, jailname
                parts = line.split(None, 2)
                if len(parts) != 3:
                    print(f"{YELLOW}Warning: Skipping malformed line {line_num} in rollback file: '{line}'{RESET}")
                    logging.warning(f"Skipping malformed line {line_num} in rollback file: '{line}'")
                    continue
                action, value, jailname = parts
                if action not in allowed_actions:
                    # 'ip' is appended to the action below, so an unchecked
                    # action could invoke some other '<something>ip' command.
                    print(f"{YELLOW}Warning: Skipping line {line_num} with unknown action '{action}' in rollback file: '{line}'{RESET}")
                    logging.warning(f"Skipping line {line_num} with unknown action '{action}' in rollback file: '{line}'")
                    continue
                rollback_actions.append((action, value, jailname))
    except IOError as e:
        print(f"{RED}Error reading rollback file '{rollback_file}': {e}{RESET}")
        logging.error(f"Error reading rollback file '{rollback_file}': {e}")
        sys.exit(1)

    if not rollback_actions:
        print(f"{YELLOW}No valid rollback actions found in the file. Exiting.{RESET}")
        logging.info("No valid rollback actions found in the file.")
        sys.exit(0)

    print(f"\n{BOLD}{WHITE}Found {len(rollback_actions)} rollback actions. Review carefully before proceeding:{RESET}")
    for i, (action, value, jailname) in enumerate(rollback_actions, 1):
        # WHITE for action, CYAN for target, YELLOW for jail
        print(f" {BOLD}{i}. Action: {WHITE}{action:<5}{RESET} Target: {CYAN}{value:<20}{RESET} Jail: {YELLOW}{jailname}{RESET}")

    if not non_interactive:
        confirmation = input(f"\n{YELLOW}Type 'y' to confirm and execute rollback: {RESET}").lower()
        if confirmation != 'y':
            print(f"{YELLOW}Rollback aborted by user.{RESET}")
            logging.info("Rollback aborted by user.")
            sys.exit(0)
    else:
        print(f"{YELLOW}Non-interactive mode: Proceeding with rollback.{RESET}")
        logging.info("Non-interactive mode: Proceeding with rollback.")

    print(f"\n{BOLD}{WHITE}Executing rollback actions...{RESET}")
    for action, value, jailname in rollback_actions:
        try:
            # Append 'ip' to the action for fail2ban-client, i.e. "banip" or "unbanip"
            f2b_action = action + 'ip'
            command = ['sudo', 'fail2ban-client', 'set', jailname, f2b_action, value]
            subprocess.run(command, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)
            print(f"{GREEN}Successfully executed: {action} {value} in {jailname}{RESET}")
            logging.info(f"Rollback success: {action} {value} in {jailname}")
        except subprocess.CalledProcessError as e:
            print(f"{RED}Error executing rollback command for '{action} {value} in {jailname}': {e.stderr.strip()}{RESET}")
            logging.error(f"Rollback error: {action} {value} in {jailname}: {e.stderr.strip()}")
        except Exception as e:
            # Keep going so one failing entry does not block the remaining ones.
            print(f"{RED}An unexpected error occurred during rollback for '{action} {value} in {jailname}': {e}{RESET}")
            logging.error(f"Unexpected error during rollback for '{action} {value} in {jailname}': {e}")

    print(f"\n{GREEN}Rollback process completed.{RESET}")
    logging.info("Rollback process completed.")
def main():
    """
    Command-line entry point.

    With --rollback, replays the numbered rollback file and exits. Otherwise
    fetches the banned IPs, groups them into subnets, hands the qualifying
    subnets to manage_bans (which acts on them only with --collate-bans) and
    prints a summary, including a dry-run report when no changes were requested.
    """
    parser = argparse.ArgumentParser(description="Fail2Ban Collate Subnets")
    # Help text states the real default (50).
    parser.add_argument("--threshold", type=int, default=50, dest="threshold", help="Minimum number of IPs per subnet to trigger collate (default: 50)")
    parser.add_argument("--sortby", choices=['ip', 'count'], default='count', help="Sort by IP or count (default: count)")
    parser.add_argument("--collate-bans", "-c", action="store_true", help="Unban IPs found in subnets (where threshold is exceeded), then ban the subnets")
    parser.add_argument("--subnet-mask", type=int, default=24, help="Subnet mask (default: 24)")
    parser.add_argument("--ban-jail", type=str, default=None, help="Jail to ban subnets in. If empty, will attempt to use the first jail the IPs are banned in.")
    parser.add_argument("--list-ips", action="store_true", help="Display the full lists of IPs to unban and subnets to ban in dry run mode.")
    parser.add_argument("--test-backup", action="store_true", help="FOR TESTING ONLY: Create the rollback file without performing ban/unban actions.")
    parser.add_argument("--rollback", type=str, metavar="ROLLBACK_NUMBER", help="Perform a rollback using the specified file (by 6-digit number).")
    parser.add_argument("--non-interactive", "-y", action="store_true", help="Run in non-interactive mode, automatically confirming actions.")
    args = parser.parse_args()

    if args.rollback:
        try:
            rollback_number = int(args.rollback)
            if not (100000 <= rollback_number <= 999999):
                raise ValueError("Rollback number must be a 6-digit integer.")
            rollback_filename = os.path.join(script_dir, f"f2b-collate-subnets-rollback_{rollback_number}.txt")
            perform_rollback(rollback_filename, args.non_interactive)
            sys.exit(0)  # Exit after rollback is performed
        except ValueError as e:
            print(f"{RED}Error: Invalid rollback number provided. {e}{RESET}")
            logging.error(f"Invalid rollback number provided: {args.rollback}. {e}")
            sys.exit(1)

    print(f"\n{BOLD}{WHITE}IP count Subnet{RESET}")

    # Step 1: Get all banned IPs from Fail2Ban once
    banned_ips_by_jail = get_all_banned_ips_from_fail2ban()
    if not banned_ips_by_jail:
        print(f"{YELLOW}No jails or banned IPs found. Exiting.{RESET}")
        sys.exit(0)

    # Step 2: Flatten the list of IPs and collect skipped ones
    all_ips_flat, skipped_subnets_from_f2b = flatten_banned_ips(banned_ips_by_jail)

    # Step 3: Aggregate IPs into subnets and count them
    subnets_counts, invalid_ips_from_aggregation = subnet_aggregation(all_ips_flat, args.subnet_mask)

    # Sort subnets
    if args.sortby == 'ip':
        def _address_order(item):
            # Order numerically rather than as text, so 2.0.0.0 precedes
            # 10.0.0.0; grouping by version keeps IPv4/IPv6 comparable.
            address = ipaddress.ip_address(item[0])
            return (address.version, int(address))
        sorted_subnets_counts = dict(sorted(subnets_counts.items(), key=_address_order))
    else:
        sorted_subnets_counts = dict(sorted(subnets_counts.items(), key=lambda item: item[1], reverse=True))

    # Step 4: Manage bans and collect IPs to unban (this prints the main list of subnets)
    total_potential_ips_in_found_subnets, total_banned_ips_in_found_subnets, ips_to_unban_collected, qualifying_subnets = manage_bans(
        sorted_subnets_counts, all_ips_flat, args.threshold, args.collate_bans, args.subnet_mask, args.ban_jail, args.test_backup, args.non_interactive
    )

    # --- Skipped entries, each paired with the jail it was found in ---
    skipped_entries_with_jails: List[Tuple[str, str]] = []
    # These were valid IPs, so their jail is recorded in all_ips_flat
    for ip_str in invalid_ips_from_aggregation:
        skipped_entries_with_jails.append((ip_str, all_ips_flat.get(ip_str, "Unknown")))
    # These were not valid IPs, so look their jail up in the raw per-jail data.
    # dict.fromkeys drops duplicates while preserving order.
    for entry in dict.fromkeys(skipped_subnets_from_f2b):
        found_jail = "Unknown"
        for jail_name, ips_list in banned_ips_by_jail.items():
            if entry in ips_list:
                found_jail = jail_name
                break
        skipped_entries_with_jails.append((entry, found_jail))
    # Sort by the entry string (IP or subnet) for consistent output
    skipped_entries_with_jails.sort(key=lambda x: x[0])

    if skipped_entries_with_jails:
        print(f"\n{BOLD}{YELLOW}Skipped (existing subnets or invalid entries):{RESET}")
        max_entry_len = max(len(entry) for entry, _ in skipped_entries_with_jails)
        for entry, jail in skipped_entries_with_jails:
            print(f" {CYAN}{entry:<{max_entry_len}}{RESET} Jail: {YELLOW}{jail}{RESET}")

    # Addresses per subnet for the given mask (computed for a 32-bit address space)
    ips_per_single_subnet = 2**(32 - args.subnet_mask)
    print(f"\n{BOLD}{WHITE}** Total currently banned IPs (all jails): {len(all_ips_flat)}{RESET}")
    print(f"{BOLD}{WHITE}** Total IPs in found subnets: {total_potential_ips_in_found_subnets} ({ips_per_single_subnet} per subnet){RESET}")
    print(f"{BOLD}{WHITE}** Total currently banned IPs (where number of IPs exceed threshold of {args.threshold}) included in subnets: {total_banned_ips_in_found_subnets}{RESET}")
    print(f"{BOLD}{WHITE}** Remaining \"innocent\" IPs in these subnets: {total_potential_ips_in_found_subnets - total_banned_ips_in_found_subnets}{RESET}")

    if not args.collate_bans:
        print(f"\n{BOLD}{YELLOW}** This is a dry run - no changes made{RESET}")

        # Subnets to ban output
        num_subnets_to_ban = len(qualifying_subnets)
        print(f"\n{BOLD}{WHITE}** Subnets to ban: {num_subnets_to_ban}{RESET}")
        if args.list_ips:
            if qualifying_subnets:
                for subnet in sorted(qualifying_subnets):
                    print(f" {CYAN}{subnet}{RESET}")
            else:
                print(f" {WHITE}No subnets identified for banning.{RESET}")

        # IPs to unban output
        num_ips_to_unban = len(ips_to_unban_collected)
        print(f"{BOLD}{WHITE}** IPs to unban: {num_ips_to_unban}{RESET}")
        if args.list_ips:
            if ips_to_unban_collected:
                max_ip_len = max(len(ip) for ip in ips_to_unban_collected)
                for ip, jail in ips_to_unban_collected.items():
                    print(f" {CYAN}{ip:<{max_ip_len}}{RESET} Jail: {YELLOW}{jail}{RESET}")
                    logging.info(f"IP to unban: {ip} Jail: {jail}")
            else:
                print(f" {WHITE}No IPs identified for unbanning in qualifying subnets.{RESET}")
    print()
# Run the command-line interface only when this file is executed as a script.
if __name__ == "__main__":
    main()