Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 14 additions & 3 deletions misc/python-client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,15 @@ def __init__(self,
gatekeeper=True,
source=None,
timeout=60,
chunk=32768):
chunk=32768,
metadata=None):
self.server = server
self.cert = cert
self.gatekeeper = gatekeeper
self.source = source
self.timeout = timeout
self.chunk = chunk
self.metadata = metadata or {}

def __ScanFileRequest(self, filename):
""" Generates a ScanFileRequest message defined in Strelka's Frontend protobuf
Expand All @@ -43,7 +45,9 @@ def __ScanFileRequest(self, filename):
client='strelka-python',
source=self.source,
gatekeeper=self.gatekeeper)
attributes = strelka_pb2.Attributes(filename=filename)
attributes = strelka_pb2.Attributes(filename=filename,
metadata=self.metadata,
yaraFilename=self.yaraFilename)
with open(filename, 'rb') as f:
while True:
chunk = f.read(self.chunk)
Expand Down Expand Up @@ -97,10 +101,17 @@ def main():
parser.add_argument('-f', '--file',
required=True,
help='file to submit for scanning')
parser.add_argument('-p', '--password',
default=None,
help='password for decrypting protected files (e.g. encrypted PDFs)')
args = parser.parse_args()
metadata = {}
if args.password:
metadata['password'] = args.password
client = StrelkaFrontend(server=args.server,
cert=args.cert,
gatekeeper=False)
gatekeeper=False,
metadata=metadata)
result = client.ScanFile(args.file)
with open(args.log, 'a', encoding='utf-8') as f:
f.write('\n.'.join(result)+'\n')
Expand Down
10 changes: 10 additions & 0 deletions src/go/cmd/strelka-frontend/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ func (s *server) ScanFile(stream strelka.Frontend_ScanFileServer) error {
keye := fmt.Sprintf("event:%v", id)
keyy := fmt.Sprintf("yara:%v", id)
keyo := fmt.Sprintf("yara_cache_key:%s", id)
keym := fmt.Sprintf("metadata:%v", id)

var attr *strelka.Attributes
var req *strelka.Request
Expand Down Expand Up @@ -156,6 +157,15 @@ func (s *server) ScanFile(stream strelka.Frontend_ScanFileServer) error {
p.Set(stream.Context(), keyo, attr.YaraCacheKey, time.Until(deadline))
}

if len(attr.GetMetadata()) > 0 {
// We want to store the metadata in a separate key, because it's not part of the file data.
// This is to avoid the metadata being included in the file data hash, which would cause the file
// to be hashed differently if the metadata changes.
if metadataJSON, err := json.Marshal(attr.GetMetadata()); err == nil {
p.SetNX(stream.Context(), keym, metadataJSON, time.Until(deadline))
}
}

p.RPush(stream.Context(), keyd, in.Data)
p.ExpireAt(stream.Context(), keyd, deadline)

Expand Down
10 changes: 10 additions & 0 deletions src/python/bin/strelka-backend
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,14 @@ class Backend(object):
legacy_yara_data = self.coordinator.get(f'yara:{root_id}') # backcompat
end_pop_data_time = datetime.now()

request_metadata = {}
raw_metadata = self.coordinator.get(f'metadata:{root_id}')
if raw_metadata:
try:
request_metadata = json.loads(raw_metadata)
except Exception:
logging.exception('failed to parse request metadata')

start_taste_time = datetime.now()
file.add_flavors({'mime': self.taste_mime(data)})
file.add_flavors({'yara': self.taste_yara(data)})
Expand Down Expand Up @@ -492,6 +500,8 @@ class Backend(object):
try:
options = scanner.get('options', {})
options['strelka_id'] = root_id
if request_metadata:
options['metadata'] = request_metadata
if name == 'ScanYara':
start_yara_retrieval = datetime.now()
yara_load_took_ms = 0
Expand Down
15 changes: 14 additions & 1 deletion src/python/strelka/scanners/scan_pdf.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ def init(self):
def scan(self, data, file, options, expire_at):
extract_text = options.get("extract_text", False)
file_limit = options.get("limit", 2000)
password = options.get('metadata', {}).get('password', None)

self.event["total"] = {"objects": 0, "extracted": 0}
extracted_objects = set()
Expand All @@ -33,6 +34,8 @@ def scan(self, data, file, options, expire_at):
try:
if pdf_to_png:
doc = fitz.open(stream=data, filetype='pdf')
if password and doc.is_encrypted:
doc.authenticate(password)

for i in range(0, min(3, doc.page_count)):
png_data = doc.get_page_pixmap(i, dpi=150).tobytes('png')
Expand All @@ -56,8 +59,18 @@ def scan(self, data, file, options, expire_at):
try:
with io.BytesIO(data) as pdf_io:

# Open the PDF with PyMuPDF from the in-memory file object
pdf_reader = fitz.open(stream=pdf_io, filetype="pdf")
if pdf_reader.is_encrypted:
if password:
auth_result = pdf_reader.authenticate(password)
if auth_result:
self.flags.append("decrypted_with_password")
else:
self.flags.append("password_auth_failed")
return
else:
self.flags.append("encrypted_pdf_no_password")
return

no_object_extraction = options.get('no_object_extraction', False)

Expand Down
Binary file added src/python/strelka/tests/fixtures/test.pdf
Binary file not shown.
Binary file not shown.
157 changes: 157 additions & 0 deletions src/python/strelka/tests/test_scan_pdf.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
"""
Tests for ScanPdf scanner, focusing on password-protected PDF handling.

Run with:
PYTHONPATH=src/python uv run pytest src/python/strelka/tests/test_scan_pdf.py -v
"""
import time
from pathlib import Path
from unittest.mock import MagicMock

import pytest

from strelka.scanners.scan_pdf import ScanPdf
from strelka.strelka import File


# Directory, next to this test module, that holds the PDF fixture files.
FIXTURES = Path(__file__).parent / "fixtures"
# Backend configuration handed to the ScanPdf constructor in the `scanner` fixture.
BACKEND_CFG = {"limits": {"scanner": 30}}
# Expiry value passed to scan_wrapper; computed once at import time as
# "now + 300" using the epoch seconds returned by time.time().
EXPIRE_AT = int(time.time()) + 300


@pytest.fixture
def scanner():
    """Return a ScanPdf built from BACKEND_CFG and a mocked coordinator."""
    # The coordinator is fully mocked; its pipeline() call yields another mock.
    mock_coordinator = MagicMock()
    mock_pipeline = MagicMock()
    mock_coordinator.pipeline.return_value = mock_pipeline
    return ScanPdf(BACKEND_CFG, mock_coordinator)


@pytest.fixture
def normal_pdf():
    """Return the raw bytes of the unencrypted fixture, ``test.pdf``."""
    pdf_path = FIXTURES / "test.pdf"
    return pdf_path.read_bytes()


@pytest.fixture
def encrypted_pdf():
    """Return the raw bytes of the encrypted fixture, ``test_encrypted.pdf``."""
    pdf_path = FIXTURES / "test_encrypted.pdf"
    return pdf_path.read_bytes()


class TestScanPdfUnencrypted:
    """Scans of the unencrypted fixture, with and without a supplied password."""

    def test_no_encryption_flags_without_password(self, scanner, normal_pdf):
        """A plain PDF scanned with no options raises none of the checked flags."""
        _, result = scanner.scan_wrapper(
            normal_pdf, File(name="test.pdf"), {}, EXPIRE_AT
        )
        flags = result["pdf"]["flags"]

        for unexpected in (
            "decrypted_with_password",
            "encrypted_pdf_no_password",
            "password_auth_failed",
            "pdf_load_error",
        ):
            assert unexpected not in flags

    def test_no_encryption_flags_with_password(self, scanner, normal_pdf):
        """A password supplied for a plain PDF raises none of the checked flags."""
        scan_options = {"metadata": {"password": "unnecessary"}}
        _, result = scanner.scan_wrapper(
            normal_pdf, File(name="test.pdf"), scan_options, EXPIRE_AT
        )
        flags = result["pdf"]["flags"]

        for unexpected in (
            "decrypted_with_password",
            "encrypted_pdf_no_password",
            "password_auth_failed",
            "pdf_load_error",
        ):
            assert unexpected not in flags

    def test_produces_totals(self, scanner, normal_pdf):
        """The event for a plain PDF carries the ``total`` counters."""
        _, result = scanner.scan_wrapper(
            normal_pdf, File(name="test.pdf"), {}, EXPIRE_AT
        )
        pdf_event = result["pdf"]

        assert "total" in pdf_event
        totals = pdf_event["total"]
        assert "objects" in totals
        assert "extracted" in totals

    def test_extracts_uris(self, scanner, normal_pdf):
        """The fixture's annotated URI shows up in the event."""
        _, result = scanner.scan_wrapper(
            normal_pdf, File(name="test.pdf"), {}, EXPIRE_AT
        )
        found_uris = result["pdf"].get("annotated_uris", [])

        assert "https://example.com/test" in found_uris


class TestScanPdfEncrypted:
    """Scans of the encrypted fixture with a right, wrong, or missing password."""

    def test_correct_password_decrypts(self, scanner, encrypted_pdf):
        """The right password yields the decrypted flag and no failure flags."""
        scan_options = {"metadata": {"password": "test_password"}}
        _, result = scanner.scan_wrapper(
            encrypted_pdf, File(name="test_encrypted.pdf"), scan_options, EXPIRE_AT
        )
        flags = result["pdf"]["flags"]

        assert "decrypted_with_password" in flags
        for unexpected in (
            "password_auth_failed",
            "encrypted_pdf_no_password",
            "pdf_load_error",
        ):
            assert unexpected not in flags

    def test_correct_password_produces_totals(self, scanner, encrypted_pdf):
        """The right password still produces the ``total`` counters."""
        scan_options = {"metadata": {"password": "test_password"}}
        _, result = scanner.scan_wrapper(
            encrypted_pdf, File(name="test_encrypted.pdf"), scan_options, EXPIRE_AT
        )
        pdf_event = result["pdf"]

        assert "total" in pdf_event
        extracted_count = pdf_event["total"]["extracted"]
        assert extracted_count >= 0

    def test_correct_password_extracts_uris(self, scanner, encrypted_pdf):
        """The right password exposes the fixture's annotated URI."""
        scan_options = {"metadata": {"password": "test_password"}}
        _, result = scanner.scan_wrapper(
            encrypted_pdf, File(name="test_encrypted.pdf"), scan_options, EXPIRE_AT
        )
        found_uris = result["pdf"].get("annotated_uris", [])

        assert "https://example.com/encrypted" in found_uris

    def test_wrong_password_flags_failure(self, scanner, encrypted_pdf):
        """A wrong password is reported as an authentication failure."""
        scan_options = {"metadata": {"password": "wrong_password"}}
        _, result = scanner.scan_wrapper(
            encrypted_pdf, File(name="test_encrypted.pdf"), scan_options, EXPIRE_AT
        )
        flags = result["pdf"]["flags"]

        assert "password_auth_failed" in flags
        assert "decrypted_with_password" not in flags

    def test_wrong_password_no_extraction(self, scanner, encrypted_pdf):
        """A wrong password results in no extracted files."""
        scan_options = {"metadata": {"password": "wrong_password"}}
        files, _ = scanner.scan_wrapper(
            encrypted_pdf, File(name="test_encrypted.pdf"), scan_options, EXPIRE_AT
        )

        assert len(files) == 0

    def test_no_password_flags_missing(self, scanner, encrypted_pdf):
        """With no password supplied, the missing-password flag is set."""
        _, result = scanner.scan_wrapper(
            encrypted_pdf, File(name="test_encrypted.pdf"), {}, EXPIRE_AT
        )
        flags = result["pdf"]["flags"]

        assert "encrypted_pdf_no_password" in flags
        assert "decrypted_with_password" not in flags

    def test_no_password_no_extraction(self, scanner, encrypted_pdf):
        """With no password supplied, no files are extracted."""
        files, _ = scanner.scan_wrapper(
            encrypted_pdf, File(name="test_encrypted.pdf"), {}, EXPIRE_AT
        )

        assert len(files) == 0

    def test_metadata_without_password_key(self, scanner, encrypted_pdf):
        """Metadata lacking a ``password`` key is treated as no password."""
        scan_options = {"metadata": {"other_key": "value"}}
        _, result = scanner.scan_wrapper(
            encrypted_pdf, File(name="test_encrypted.pdf"), scan_options, EXPIRE_AT
        )
        flags = result["pdf"]["flags"]

        assert "encrypted_pdf_no_password" in flags