diff --git a/misc/python-client/client.py b/misc/python-client/client.py index d51d063c..f89cf8f5 100644 --- a/misc/python-client/client.py +++ b/misc/python-client/client.py @@ -23,13 +23,15 @@ def __init__(self, gatekeeper=True, source=None, timeout=60, - chunk=32768): + chunk=32768, + metadata=None): self.server = server self.cert = cert self.gatekeeper = gatekeeper self.source = source self.timeout = timeout self.chunk = chunk + self.metadata = metadata or {} def __ScanFileRequest(self, filename): """ Generates a ScanFileRequest message defined in Strelka's Frontend protobuf @@ -43,7 +45,9 @@ def __ScanFileRequest(self, filename): client='strelka-python', source=self.source, gatekeeper=self.gatekeeper) - attributes = strelka_pb2.Attributes(filename=filename) + attributes = strelka_pb2.Attributes(filename=filename, + metadata=self.metadata, + yaraFilename=self.yaraFilename) with open(filename, 'rb') as f: while True: chunk = f.read(self.chunk) @@ -97,10 +101,17 @@ def main(): parser.add_argument('-f', '--file', required=True, help='file to submit for scanning') + parser.add_argument('-p', '--password', + default=None, + help='password for decrypting protected files (e.g. encrypted PDFs)') args = parser.parse_args() + metadata = {} + if args.password: + metadata['password'] = args.password client = StrelkaFrontend(server=args.server, cert=args.cert, - gatekeeper=False) + gatekeeper=False, + metadata=metadata) result = client.ScanFile(args.file) with open(args.log, 'a', encoding='utf-8') as f: f.write('\n.'.join(result)+'\n') diff --git a/src/go/cmd/strelka-frontend/main.go b/src/go/cmd/strelka-frontend/main.go index 5a1cb99f..1afc2272 100644 --- a/src/go/cmd/strelka-frontend/main.go +++ b/src/go/cmd/strelka-frontend/main.go @@ -125,6 +125,7 @@ func (s *server) ScanFile(stream strelka.Frontend_ScanFileServer) error { keye := fmt.Sprintf("event:%v", id) keyy := fmt.Sprintf("yara:%v", id) keyo := fmt.Sprintf("yara_cache_key:%s", id) + keym := fmt.Sprintf("metadata:%v", id) var attr *strelka.Attributes var req *strelka.Request @@ -156,6 +157,15 @@ func (s *server) ScanFile(stream strelka.Frontend_ScanFileServer) error { p.Set(stream.Context(), keyo, attr.YaraCacheKey, time.Until(deadline)) } + if len(attr.GetMetadata()) > 0 { + // We want to store the metadata in a separate key, because it's not part of the file data. + // This is to avoid the metadata being included in the file data hash, which would cause the file + // to be hashed differently if the metadata changes. + if metadataJSON, err := json.Marshal(attr.GetMetadata()); err == nil { + p.SetNX(stream.Context(), keym, metadataJSON, time.Until(deadline)) + } + } + p.RPush(stream.Context(), keyd, in.Data) p.ExpireAt(stream.Context(), keyd, deadline) diff --git a/src/python/bin/strelka-backend b/src/python/bin/strelka-backend index 4d2c204f..26c0ad94 100644 --- a/src/python/bin/strelka-backend +++ b/src/python/bin/strelka-backend @@ -437,6 +437,14 @@ class Backend(object): legacy_yara_data = self.coordinator.get(f'yara:{root_id}') # backcompat end_pop_data_time = datetime.now() + request_metadata = {} + raw_metadata = self.coordinator.get(f'metadata:{root_id}') + if raw_metadata: + try: + request_metadata = json.loads(raw_metadata) + except Exception: + logging.exception('failed to parse request metadata') + start_taste_time = datetime.now() file.add_flavors({'mime': self.taste_mime(data)}) file.add_flavors({'yara': self.taste_yara(data)}) @@ -492,6 +500,8 @@ class Backend(object): try: options = scanner.get('options', {}) options['strelka_id'] = root_id + if request_metadata: + options['metadata'] = request_metadata if name == 'ScanYara': start_yara_retrieval = datetime.now() yara_load_took_ms = 0 diff --git a/src/python/strelka/scanners/scan_pdf.py b/src/python/strelka/scanners/scan_pdf.py index 679eb442..aa399a08 100644 --- a/src/python/strelka/scanners/scan_pdf.py +++ b/src/python/strelka/scanners/scan_pdf.py @@ -24,6 +24,7 @@ def init(self): def scan(self, data, file, options, expire_at): extract_text = options.get("extract_text", False) file_limit = options.get("limit", 2000) + password = options.get('metadata', {}).get('password', None) self.event["total"] = {"objects": 0, "extracted": 0} extracted_objects = set() @@ -33,6 +34,8 @@ def scan(self, data, file, options, expire_at): try: if pdf_to_png: doc = fitz.open(stream=data, filetype='pdf') + if password and doc.is_encrypted: + doc.authenticate(password) for i in range(0, min(3, doc.page_count)): png_data = doc.get_page_pixmap(i, dpi=150).tobytes('png') @@ -56,8 +59,18 @@ def scan(self, data, file, options, expire_at): try: with io.BytesIO(data) as pdf_io: - # Open file as with PyMuPDF as file object pdf_reader = fitz.open(stream=pdf_io, filetype="pdf") + if pdf_reader.is_encrypted: + if password: + auth_result = pdf_reader.authenticate(password) + if auth_result: + self.flags.append("decrypted_with_password") + else: + self.flags.append("password_auth_failed") + return + else: + self.flags.append("encrypted_pdf_no_password") + return no_object_extraction = options.get('no_object_extraction', False) diff --git a/src/python/strelka/tests/fixtures/test.pdf b/src/python/strelka/tests/fixtures/test.pdf new file mode 100644 index 00000000..7a6fcb75 Binary files /dev/null and b/src/python/strelka/tests/fixtures/test.pdf differ diff --git a/src/python/strelka/tests/fixtures/test_encrypted.pdf b/src/python/strelka/tests/fixtures/test_encrypted.pdf new file mode 100644 index 00000000..8eef4905 Binary files /dev/null and b/src/python/strelka/tests/fixtures/test_encrypted.pdf differ diff --git a/src/python/strelka/tests/test_scan_pdf.py b/src/python/strelka/tests/test_scan_pdf.py new file mode 100644 index 00000000..5b8fda54 --- /dev/null +++ b/src/python/strelka/tests/test_scan_pdf.py @@ -0,0 +1,157 @@ +""" +Tests for ScanPdf scanner, focusing on password-protected PDF handling. + +Run with: + PYTHONPATH=src/python uv run pytest src/python/strelka/tests/test_scan_pdf.py -v +""" +import time +from pathlib import Path +from unittest.mock import MagicMock + +import pytest + +from strelka.scanners.scan_pdf import ScanPdf +from strelka.strelka import File + + +FIXTURES = Path(__file__).parent / "fixtures" +BACKEND_CFG = {"limits": {"scanner": 30}} +EXPIRE_AT = int(time.time()) + 300 + + +@pytest.fixture +def scanner(): + coordinator = MagicMock() + coordinator.pipeline.return_value = MagicMock() + return ScanPdf(BACKEND_CFG, coordinator) + + +@pytest.fixture +def normal_pdf(): + with open(FIXTURES / "test.pdf", "rb") as f: + return f.read() + + +@pytest.fixture +def encrypted_pdf(): + with open(FIXTURES / "test_encrypted.pdf", "rb") as f: + return f.read() + + +class TestScanPdfUnencrypted: + def test_no_encryption_flags_without_password(self, scanner, normal_pdf): + """Unencrypted PDF produces no password-related flags.""" + file = File(name="test.pdf") + _, result = scanner.scan_wrapper(normal_pdf, file, {}, EXPIRE_AT) + event = result["pdf"] + + assert "decrypted_with_password" not in event["flags"] + assert "encrypted_pdf_no_password" not in event["flags"] + assert "password_auth_failed" not in event["flags"] + assert "pdf_load_error" not in event["flags"] + + def test_no_encryption_flags_with_password(self, scanner, normal_pdf): + """Unencrypted PDF with an unnecessary password ignores it.""" + file = File(name="test.pdf") + options = {"metadata": {"password": "unnecessary"}} + _, result = scanner.scan_wrapper(normal_pdf, file, options, EXPIRE_AT) + event = result["pdf"] + + assert "decrypted_with_password" not in event["flags"] + assert "encrypted_pdf_no_password" not in event["flags"] + assert "password_auth_failed" not in event["flags"] + assert "pdf_load_error" not in event["flags"] + + def test_produces_totals(self, scanner, normal_pdf): + """Unencrypted PDF initializes the total counters.""" + file = File(name="test.pdf") + _, result = scanner.scan_wrapper(normal_pdf, file, {}, EXPIRE_AT) + event = result["pdf"] + + assert "total" in event + assert "objects" in event["total"] + assert "extracted" in event["total"] + + def test_extracts_uris(self, scanner, normal_pdf): + """Unencrypted PDF extracts annotated URIs.""" + file = File(name="test.pdf") + _, result = scanner.scan_wrapper(normal_pdf, file, {}, EXPIRE_AT) + event = result["pdf"] + + assert "https://example.com/test" in event.get("annotated_uris", []) + + +class TestScanPdfEncrypted: + def test_correct_password_decrypts(self, scanner, encrypted_pdf): + """Encrypted PDF with correct password sets decrypted flag.""" + file = File(name="test_encrypted.pdf") + options = {"metadata": {"password": "test_password"}} + _, result = scanner.scan_wrapper(encrypted_pdf, file, options, EXPIRE_AT) + event = result["pdf"] + + assert "decrypted_with_password" in event["flags"] + assert "password_auth_failed" not in event["flags"] + assert "encrypted_pdf_no_password" not in event["flags"] + assert "pdf_load_error" not in event["flags"] + + def test_correct_password_produces_totals(self, scanner, encrypted_pdf): + """Encrypted PDF decrypted with correct password initializes totals.""" + file = File(name="test_encrypted.pdf") + options = {"metadata": {"password": "test_password"}} + _, result = scanner.scan_wrapper(encrypted_pdf, file, options, EXPIRE_AT) + event = result["pdf"] + + assert "total" in event + assert event["total"]["extracted"] >= 0 + + def test_correct_password_extracts_uris(self, scanner, encrypted_pdf): + """Encrypted PDF with correct password extracts URIs.""" + file = File(name="test_encrypted.pdf") + options = {"metadata": {"password": "test_password"}} + _, result = scanner.scan_wrapper(encrypted_pdf, file, options, EXPIRE_AT) + event = result["pdf"] + + assert "https://example.com/encrypted" in event.get("annotated_uris", []) + + def test_wrong_password_flags_failure(self, scanner, encrypted_pdf): + """Encrypted PDF with wrong password flags auth failure.""" + file = File(name="test_encrypted.pdf") + options = {"metadata": {"password": "wrong_password"}} + _, result = scanner.scan_wrapper(encrypted_pdf, file, options, EXPIRE_AT) + event = result["pdf"] + + assert "password_auth_failed" in event["flags"] + assert "decrypted_with_password" not in event["flags"] + + def test_wrong_password_no_extraction(self, scanner, encrypted_pdf): + """Encrypted PDF with wrong password does not extract objects.""" + file = File(name="test_encrypted.pdf") + options = {"metadata": {"password": "wrong_password"}} + files, result = scanner.scan_wrapper(encrypted_pdf, file, options, EXPIRE_AT) + + assert len(files) == 0 + + def test_no_password_flags_missing(self, scanner, encrypted_pdf): + """Encrypted PDF with no password flags appropriately.""" + file = File(name="test_encrypted.pdf") + _, result = scanner.scan_wrapper(encrypted_pdf, file, {}, EXPIRE_AT) + event = result["pdf"] + + assert "encrypted_pdf_no_password" in event["flags"] + assert "decrypted_with_password" not in event["flags"] + + def test_no_password_no_extraction(self, scanner, encrypted_pdf): + """Encrypted PDF with no password does not extract objects.""" + file = File(name="test_encrypted.pdf") + files, result = scanner.scan_wrapper(encrypted_pdf, file, {}, EXPIRE_AT) + + assert len(files) == 0 + + def test_metadata_without_password_key(self, scanner, encrypted_pdf): + """Encrypted PDF with metadata but no password key flags missing.""" + file = File(name="test_encrypted.pdf") + options = {"metadata": {"other_key": "value"}} + _, result = scanner.scan_wrapper(encrypted_pdf, file, options, EXPIRE_AT) + event = result["pdf"] + + assert "encrypted_pdf_no_password" in event["flags"]