-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathhashing.py
More file actions
99 lines (74 loc) · 2.62 KB
/
hashing.py
File metadata and controls
99 lines (74 loc) · 2.62 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
"""File hashing utilities for memory dump integrity verification"""
import hashlib
import asyncio
from pathlib import Path
from typing import Dict, List
from database import ForensicsDatabase
async def calculate_hashes(file_path: Path,
algorithms: List[str] = None) -> Dict[str, str]:
"""
Calculate multiple hashes for a file asynchronously
Args:
file_path: Path to file to hash
algorithms: List of hash algorithms (default: ['md5', 'sha1', 'sha256'])
Returns:
Dictionary of algorithm -> hexdigest mapping
Example:
{'md5': 'd41d8cd...', 'sha1': 'da39a3e...', 'sha256': 'e3b0c44...'}
"""
if algorithms is None:
algorithms = ['md5', 'sha1', 'sha256']
# Create hashers for each algorithm
hashers = {alg: hashlib.new(alg) for alg in algorithms}
# Use 8MB chunks for performance with large memory dumps
chunk_size = 8 * 1024 * 1024
# Run hash calculation in executor to avoid blocking
def _calculate():
with open(file_path, 'rb') as f:
while True:
chunk = f.read(chunk_size)
if not chunk:
break
for hasher in hashers.values():
hasher.update(chunk)
return {alg: hasher.hexdigest() for alg, hasher in hashers.items()}
# Run in thread pool to avoid blocking event loop
loop = asyncio.get_event_loop()
result = await loop.run_in_executor(None, _calculate)
return result
async def get_or_calculate_hashes(db: ForensicsDatabase, dump_id: str,
dump_path: Path) -> Dict[str, str]:
"""
Get cached hashes or calculate if not available
Args:
db: Database instance
dump_id: Dump identifier
dump_path: Path to dump file
Returns:
Dictionary of hashes
"""
# Check database cache first
cached_hashes = await db.get_dump_hashes(dump_id)
if cached_hashes:
return cached_hashes
# Calculate hashes
hashes = await calculate_hashes(dump_path)
# Store in database
await db.store_dump_hashes(dump_id, hashes)
return hashes
def format_hashes(hashes: Dict[str, str]) -> str:
"""
Format hashes for display
Args:
hashes: Dictionary of algorithm -> digest mapping
Returns:
Formatted markdown string
"""
result = "**File Hashes:**\n"
if 'md5' in hashes:
result += f"- MD5: {hashes['md5']}\n"
if 'sha1' in hashes:
result += f"- SHA1: {hashes['sha1']}\n"
if 'sha256' in hashes:
result += f"- SHA256: {hashes['sha256']}\n"
return result