-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathaudit.py
More file actions
106 lines (92 loc) · 3.75 KB
/
audit.py
File metadata and controls
106 lines (92 loc) · 3.75 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
#!/usr/bin/env python3
# Forensic audit log — JSON Lines format
# jaimefg1888
import getpass
import json
import logging
import os
import socket
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Generator
def _safe_username() -> str:
"""Return the current username without raising in daemon / Docker environments.
``os.getlogin()`` raises ``OSError`` when there is no controlling
terminal (typical in Docker, CI pipelines, or systemd services).
The function falls back to ``getpass.getuser()`` and finally to
well-known environment variables.
"""
try:
return os.getlogin()
except OSError:
pass
try:
return getpass.getuser()
except Exception:
pass
return os.environ.get("USER", os.environ.get("USERNAME", "unknown"))
class AuditLogger:
    """Append-only forensic audit logger that writes JSON Lines records.

    Every call to :meth:`log_wipe_operation` appends exactly one
    newline-terminated JSON object to the configured file, and
    :meth:`get_logs` streams the records back one at a time so
    arbitrarily large logs never have to fit in RAM.

    Args:
        log_path: Destination file for the audit log. Created on first
            write if it does not exist.
    """

    def __init__(self, log_path: Path = Path("madara_audit.jsonl")) -> None:
        # Target file for appended records; parent directories are not created.
        self.log_path = log_path

    def log_wipe_operation(
        self,
        file_path: Path,
        file_size: int,
        sha256_before: str,
        standard_used: str,
        result: dict[str, Any],
    ) -> None:
        """Append one JSON record describing a completed wipe operation.

        Args:
            file_path: Absolute path of the file that was wiped.
            file_size: Size of the file in bytes, captured before wiping.
            sha256_before: Hex-encoded SHA-256 digest of the file contents
                before the wipe started.
            standard_used: Sanitization standard identifier (e.g. ``"clear"``).
            result: Dictionary returned by the wipe engine. Expected keys:
                ``passes_completed``, ``verified``, ``duration``,
                ``success``, ``error``, ``strategy``.
        """
        try:
            entry: dict[str, Any] = {
                "timestamp": datetime.now(timezone.utc).isoformat(),
                "file": str(file_path.absolute()),
                "size_bytes": file_size,
                "sha256_before": sha256_before,
                "standard": standard_used,
                "passes": result.get("passes_completed", 0),
                "verified": result.get("verified", False),
                "duration_sec": result.get("duration", 0.0),
                "user": _safe_username(),
                "hostname": socket.gethostname(),
                "success": result.get("success", False),
                "error": result.get("error"),
                "strategy": result.get("strategy", "Unknown"),
            }
            with self.log_path.open("a", encoding="utf-8") as sink:
                sink.write(json.dumps(entry) + "\n")
        except Exception as exc:
            # Auditing must never abort the wipe itself; record the failure
            # and carry on.
            logging.error("No se pudo escribir en el log de auditoría: %s", exc)

    def get_logs(self) -> Generator[dict[str, Any], None, None]:
        """Yield parsed log records one at a time.

        Reads the log file lazily so arbitrarily large audit logs can be
        processed without exhausting available memory.

        Yields:
            One dictionary per valid JSON line in the log file.
        """
        if not self.log_path.exists():
            return
        with self.log_path.open("r", encoding="utf-8") as source:
            for raw in source:
                try:
                    parsed = json.loads(raw)
                except json.JSONDecodeError:
                    # Skip corrupted or partially-written lines rather
                    # than aborting the whole scan.
                    continue
                yield parsed