-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathlog_filter.py
More file actions
92 lines (75 loc) · 2.95 KB
/
log_filter.py
File metadata and controls
92 lines (75 loc) · 2.95 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
"""
Log Filter Module
Handles filtering of log entries by level, time, source, and regex patterns.
"""
import re
from datetime import datetime
from typing import List, Optional, Set
from log_parser import LogEntry, LogLevel
class LogFilter:
"""Filter for log entries"""
def __init__(self):
self.levels: Set[LogLevel] = set()
self.sources: Set[str] = set()
self.start_time: Optional[datetime] = None
self.end_time: Optional[datetime] = None
self.regex_pattern: Optional[str] = None
self.compiled_regex: Optional[re.Pattern] = None
self.show_anomalies_only: bool = False
def set_levels(self, levels: List[LogLevel]) -> None:
"""Set log levels to filter"""
self.levels = set(levels)
def set_sources(self, sources: List[str]) -> None:
"""Set sources to filter"""
self.sources = set(sources)
def set_time_range(self, start: Optional[datetime], end: Optional[datetime]) -> None:
"""Set time range to filter"""
self.start_time = start
self.end_time = end
def set_regex(self, pattern: str) -> None:
"""Set regex pattern for message filtering"""
self.regex_pattern = pattern
try:
if pattern:
self.compiled_regex = re.compile(pattern, re.IGNORECASE)
else:
self.compiled_regex = None
except re.error:
self.compiled_regex = None
def set_anomalies_only(self, enabled: bool) -> None:
"""Set whether to show only anomalies"""
self.show_anomalies_only = enabled
def matches(self, entry: LogEntry) -> bool:
"""Check if log entry matches filter criteria"""
# Level filter
if self.levels and entry.level not in self.levels:
return False
# Source filter
if self.sources and entry.source not in self.sources:
return False
# Time range filter
if entry.timestamp:
if self.start_time and entry.timestamp < self.start_time:
return False
if self.end_time and entry.timestamp > self.end_time:
return False
# Regex filter
if self.compiled_regex:
if not self.compiled_regex.search(entry.message):
return False
# Anomaly filter
if self.show_anomalies_only and not entry.is_anomaly:
return False
return True
def filter_entries(self, entries: List[LogEntry]) -> List[LogEntry]:
"""Filter log entries based on criteria"""
return [entry for entry in entries if self.matches(entry)]
def clear(self) -> None:
"""Clear all filters"""
self.levels.clear()
self.sources.clear()
self.start_time = None
self.end_time = None
self.regex_pattern = None
self.compiled_regex = None
self.show_anomalies_only = False