-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathlog_statistics.py
More file actions
77 lines (61 loc) · 2.52 KB
/
log_statistics.py
File metadata and controls
77 lines (61 loc) · 2.52 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
"""
Log Statistics Module
Calculates statistics and metrics from log entries.
"""
from collections import Counter, defaultdict
from datetime import datetime
from typing import Any, Dict, List

from log_parser import LogEntry, LogLevel
# strftime pattern used to bucket entry timestamps by hour: the minutes field
# is a literal "00", so every timestamp within the same hour yields the same key.
HOURLY_FORMAT = '%Y-%m-%d %H:00'
class LogStatistics:
    """Aggregate counts and summary metrics over a list of log entries.

    All counters are computed eagerly, once, when the instance is created.

    Attributes:
        entries: The entries passed to the constructor (stored by reference).
        total_entries: Number of entries.
        level_counts: ``Counter`` keyed by each entry's ``level``.
        source_counts: ``Counter`` keyed by each entry's ``source``; entries
            with a falsy source are not counted.
        hourly_counts: ``defaultdict(int)`` keyed by the entry timestamp
            formatted with ``HOURLY_FORMAT``; entries with a falsy timestamp
            are not counted.
        anomaly_count: Number of entries whose ``is_anomaly`` is truthy.
    """

    def __init__(self, entries: List["LogEntry"]) -> None:
        """Store *entries* and compute every statistic immediately.

        Args:
            entries: Log entries to analyse. Each entry is read for its
                ``level``, ``source``, ``timestamp`` and ``is_anomaly``
                attributes.
        """
        self.entries = entries
        self._calculate()

    def _calculate(self) -> None:
        """Reset all counters and rebuild them in one pass over the entries."""
        self.total_entries = len(self.entries)
        self.level_counts = Counter()
        self.source_counts = Counter()
        self.hourly_counts = defaultdict(int)
        self.anomaly_count = 0

        for entry in self.entries:
            # Every entry contributes to the per-level tally.
            self.level_counts[entry.level] += 1

            # Entries without a source are left out of the source tally.
            if entry.source:
                self.source_counts[entry.source] += 1

            # Bucket by hour; entries without a timestamp are skipped.
            if entry.timestamp:
                hour_key = entry.timestamp.strftime(HOURLY_FORMAT)
                self.hourly_counts[hour_key] += 1

            if entry.is_anomaly:
                self.anomaly_count += 1

    def get_level_distribution(self) -> Dict[str, int]:
        """Return the number of entries per level.

        Returns:
            A dict mapping each observed level's ``.value`` to its count.
        """
        return {level.value: count for level, count in self.level_counts.items()}

    def get_top_sources(self, n: int = 10) -> List[tuple]:
        """Return the *n* most frequent sources.

        Args:
            n: Maximum number of sources to return.

        Returns:
            ``(source, count)`` tuples ordered from most to least frequent.
        """
        return self.source_counts.most_common(n)

    def get_timeline_data(self) -> Dict[str, int]:
        """Return the hourly entry counts as a plain dict sorted by hour key."""
        return dict(sorted(self.hourly_counts.items()))

    def get_error_rate(self) -> float:
        """Return the percentage of entries at ERROR or CRITICAL level.

        Returns:
            A value between 0 and 100; ``0.0`` when there are no entries.
        """
        # Guard against division by zero for an empty entry list.
        if self.total_entries == 0:
            return 0.0
        error_count = (
            self.level_counts[LogLevel.ERROR]
            + self.level_counts[LogLevel.CRITICAL]
        )
        return (error_count / self.total_entries) * 100

    def get_summary(self) -> Dict[str, Any]:
        """Return the headline statistics as a single dict.

        Returns:
            A dict with the keys ``total_entries``, ``level_distribution``,
            ``error_rate``, ``anomaly_count``, ``unique_sources`` and
            ``top_sources`` (the five most frequent sources).
        """
        return {
            'total_entries': self.total_entries,
            'level_distribution': self.get_level_distribution(),
            'error_rate': self.get_error_rate(),
            'anomaly_count': self.anomaly_count,
            'unique_sources': len(self.source_counts),
            'top_sources': self.get_top_sources(5),
        }