added attacklog_correlator #106
Please add:
Certain Apache/Nginx lines are silently dropped.
parse_json_line_timestamp() only captures string timestamps and misses epoch values.
To make such bugs easier to find, I suggest counting unparsable lines and reporting something like "skipped N lines across M files due to unparseable timestamps". Is that feasible?
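A rough sketch of what I have in mind, not a drop-in patch. The field name `timestamp`, the milliseconds heuristic, and the helpers `count_skipped()` and `summarize_skips()` are my assumptions; only the function name `parse_json_line_timestamp()` comes from the PR.

```python
from __future__ import annotations

import json
from collections import Counter
from datetime import datetime, timezone
from typing import Iterable


def parse_json_line_timestamp(line: str, field: str = "timestamp") -> datetime | None:
    """Return an aware datetime, or None if no timestamp can be parsed."""
    try:
        value = json.loads(line).get(field)
    except (json.JSONDecodeError, AttributeError):
        return None  # not JSON, or JSON that is not an object
    if isinstance(value, bool):
        return None  # bool is an int subclass, but never a timestamp
    if isinstance(value, (int, float)):
        # Assumption: epoch seconds; very large values are treated as milliseconds.
        seconds = value / 1000.0 if value > 1e11 else float(value)
        try:
            return datetime.fromtimestamp(seconds, tz=timezone.utc)
        except (OverflowError, OSError, ValueError):
            return None
    if isinstance(value, str):
        try:
            parsed = datetime.fromisoformat(value)
        except ValueError:
            return None
        # Assumption: naive timestamps are interpreted as UTC.
        return parsed if parsed.tzinfo else parsed.replace(tzinfo=timezone.utc)
    return None


def count_skipped(lines: Iterable[tuple[str, str]]) -> Counter:
    """Count unparsable lines per source file from (source_file, line) pairs."""
    skipped: Counter = Counter()
    for source_file, line in lines:
        if parse_json_line_timestamp(line) is None:
            skipped[source_file] += 1
    return skipped


def summarize_skips(skipped: Counter) -> str:
    total = sum(skipped.values())
    return f"skipped {total} lines across {len(skipped)} files due to unparseable timestamps"
```

Printing the summary once at the end of a run (only when the total is non-zero) would keep the output quiet in the normal case.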
    def get_zone(name: str) -> ZoneInfo:
        try:
            return ZoneInfo(name)
        except Exception:
This should log a warning if that happens.
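Something along these lines, as a sketch only. The diff does not show what the `except` branch returns, so the UTC fallback (and the widened return type that comes with it) is my assumption.

```python
import logging
from datetime import timezone, tzinfo
from zoneinfo import ZoneInfo

logger = logging.getLogger(__name__)


def get_zone(name: str) -> tzinfo:
    try:
        return ZoneInfo(name)
    except Exception as exc:
        # Assumption: fall back to UTC; the real fallback is not visible in the diff.
        logger.warning("Unknown timezone %r (%s); falling back to UTC", name, exc)
        return timezone.utc
```

With this, a typo in a timezone name shows up in the log instead of silently shifting every timestamp.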
        end = line_number + radius
        out: list[str] = []

    def _sanitize_preview_text(value: str) -> str:
Is this the same as _ui_sanitize?
        return attacks


    def run_journalctl_export(path: Path) -> list[str]:
Better to ask for permission here (i.e. check whether journalctl is there)? Otherwise this is a silent error: was the journal file skipped because of a format issue, or because journalctl isn't installed?
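A possible shape for that check, sketched under assumptions: the `binary` parameter is hypothetical (added so the lookup can be exercised without journalctl installed), and the exact journalctl arguments the PR uses are not visible in the diff, so the ones below are only an example.

```python
import logging
import shutil
import subprocess
from pathlib import Path

logger = logging.getLogger(__name__)


def run_journalctl_export(path: Path, binary: str = "journalctl") -> list[str]:
    executable = shutil.which(binary)
    if executable is None:
        # Distinguish "tool missing" from "file unreadable" in the log.
        logger.warning("%s not found on PATH; skipping journal file %s", binary, path)
        return []
    result = subprocess.run(
        [executable, "--file", str(path), "--output", "short-iso", "--no-pager"],
        capture_output=True,
        text=True,
        check=False,
    )
    if result.returncode != 0:
        logger.warning("%s could not read %s: %s", binary, path, result.stderr.strip())
        return []
    return result.stdout.splitlines()
```

The two warnings make it clear from the log which of the two failure modes occurred.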
    lexical_terms = extract_attack_lexical_terms(attack)
    exact_hits = [tok for tok in lexical_terms if tok not in LEXICAL_STOP_TOKENS and tok in event_text]
    if exact_hits:
        val = min(80.0, 10.0 * len(set(exact_hits)))

    overlap_terms = sorted((set(lexical_terms) - set(exact_hits)) & event_terms)
    if overlap_terms:
        val = min(40.0, 6.0 * len(overlap_terms))

    matched_total = len(set(binary_hits)) + len(set(exact_hits)) + len(set(overlap_terms))
    if matched_total >= 3:
        bonus = min(25.0, 5.0 * matched_total)
Magic numbers.
Consider giving them meaningful names somewhere; then it's also easier to tinker with if one wants to change the scoring system:
# Binary name match: highest weight signal, executable names are strong evidence
SCORE_BINARY_BASE = 40.0 # awarded for any binary match at all
SCORE_BINARY_PER_HIT = 25.0 # per additional distinct binary matched
SCORE_BINARY_CAP = 120.0 # caps at 4 binaries (base + 3 additional hits)
# Exact token match: attack tokens appearing literally in the log line
SCORE_EXACT_TOKEN_PER_HIT = 10.0 # per distinct token
SCORE_EXACT_TOKEN_CAP = 80.0 # caps at 8 tokens
# Token overlap: tokens present in both after tokenization
SCORE_OVERLAP_PER_TERM = 6.0 # per overlapping term
SCORE_OVERLAP_CAP = 40.0 # caps at ~7 terms
# Multi-signal bonus: rewards correlations where several independent signals agree
SCORE_MULTI_SIGNAL_THRESHOLD = 3 # minimum combined hits to qualify
SCORE_MULTI_SIGNAL_PER_HIT = 5.0 # per combined hit, once the threshold is met
SCORE_MULTI_SIGNAL_CAP = 25.0 # caps at 5 combined hits
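For illustration, here is how the exact-token, overlap, and multi-signal parts could read with the named constants. This is a sketch: the function name `lexical_score()` is hypothetical, summing the three parts is my assumption (the diff only shows `val` and `bonus` being computed), and the binary score is left out because its formula is not visible in the diff.

```python
SCORE_EXACT_TOKEN_PER_HIT = 10.0
SCORE_EXACT_TOKEN_CAP = 80.0
SCORE_OVERLAP_PER_TERM = 6.0
SCORE_OVERLAP_CAP = 40.0
SCORE_MULTI_SIGNAL_THRESHOLD = 3
SCORE_MULTI_SIGNAL_PER_HIT = 5.0
SCORE_MULTI_SIGNAL_CAP = 25.0


def lexical_score(binary_hits: list[str], exact_hits: list[str], overlap_terms: list[str]) -> float:
    """Combine the lexical signals; assumes the parts are simply added up."""
    score = 0.0
    if exact_hits:
        score += min(SCORE_EXACT_TOKEN_CAP, SCORE_EXACT_TOKEN_PER_HIT * len(set(exact_hits)))
    if overlap_terms:
        score += min(SCORE_OVERLAP_CAP, SCORE_OVERLAP_PER_TERM * len(overlap_terms))
    matched_total = len(set(binary_hits)) + len(set(exact_hits)) + len(set(overlap_terms))
    if matched_total >= SCORE_MULTI_SIGNAL_THRESHOLD:
        score += min(SCORE_MULTI_SIGNAL_CAP, SCORE_MULTI_SIGNAL_PER_HIT * matched_total)
    return score
```

Changing the weighting then means editing one block of constants rather than hunting for literals in the scoring code.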
    cmd: str
    source_file: str
    line_number: int
    raw: dict[str, Any]
Is this never used again? It gets assigned in Attack(), but when the attack is built from the raw output, raw={} is assigned?
    cmd=str(row.get("cmd", row.get("attack_cmd", ""))),
    source_file=str(row.get("source_file", row.get("attack_file", ""))),
    line_number=int(row.get("line_number", row.get("attack_line", 0)) or 0),
    raw={},
Is this intentional? The raw data is saved in Attack() earlier.
        )
        return top

    def _main(self, stdscr) -> None:
This is somewhat long; maybe extract the rendering of each pane into separate functions?
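Roughly what I mean, as a sketch with made-up names. The class `CorrelatorUI`, the two panes, and the `_render_*` helpers are hypothetical; the real `_main()` presumably also runs an input loop, which is omitted here, so this draws a single frame only.

```python
class CorrelatorUI:
    """Hypothetical stand-in for the class that owns _main()."""

    def __init__(self, attacks: list[str], events: list[str]) -> None:
        self.attacks = attacks
        self.events = events

    def _render_attack_pane(self, win, top: int) -> int:
        """Draw the attack list starting at row `top`; return the next free row."""
        win.addstr(top, 0, "Attacks")
        for offset, attack in enumerate(self.attacks, start=1):
            win.addstr(top + offset, 2, attack)
        return top + len(self.attacks) + 1

    def _render_event_pane(self, win, top: int) -> int:
        """Draw the event list starting at row `top`; return the next free row."""
        win.addstr(top, 0, "Events")
        for offset, event in enumerate(self.events, start=1):
            win.addstr(top + offset, 2, event)
        return top + len(self.events) + 1

    def _main(self, stdscr) -> None:
        # _main only orchestrates; each pane owns its own drawing code.
        stdscr.erase()
        next_row = self._render_attack_pane(stdscr, 0)
        self._render_event_pane(stdscr, next_row + 1)
        stdscr.refresh()
```

Each helper only needs an object with `addstr()`, so the panes can be checked with a fake window instead of a real terminal; in the tool itself `_main` would still be launched via `curses.wrapper()`.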
        return None


    ndefault = object()
added attacklog_correlator.py to scripts/ and adapted .gitignore for its working files