Skip to content

You #71

@velaandre

Description

@velaandre

import requests
import json
import numpy as np
import os
import time
from cryptography.exceptions import InvalidSignature
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import padding
import statistics
import concurrent.futures
import math
import random

--- CLIENT CONFIGURATION (Mirroring Server's Expected Values) ---

MAX_FACEMAP_SIZE_SERVER_EXPECTED = 1024 * 1024 # Server's max facemap size
EXPECTED_SIGNATURE_SIZE_SERVER = 256 # Server's expected signature size (e.g., RSA 2048)
MAX_TOTAL_REQUEST_SIZE_SERVER_EXPECTED = 2 * 1024 * 1024 # Server's app.config['MAX_CONTENT_LENGTH']

Expected server response time range

MIN_SERVER_RESPONSE_TIME = 1.0
MAX_SERVER_RESPONSE_TIME = 1.2

Server's rate limit parameters (for client-side estimation/testing)

SERVER_RATE_LIMIT_COUNT = 20
SERVER_RATE_LIMIT_WINDOW = 60 # seconds

--- Utility Functions ---

def load_facemap_and_signature(facemap_path: str, signature_path: str, public_key_path: str) -> tuple[bytearray, bytes, bytes]:
"""
Loads facemap data, signature, and public key from specified file paths.
"""
if not os.path.exists(facemap_path):
raise FileNotFoundError(f"The facemap file '{facemap_path}' was not found.")
if not os.path.exists(signature_path):
raise FileNotFoundError(f"The signature file '{signature_path}' was not found.")
if not os.path.exists(public_key_path):
raise FileNotFoundError(f"The public key file '{public_key_path}' was not found.")
try:
with open(facemap_path, "rb") as f:
facemap_data = bytearray(f.read())
with open(signature_path, "rb") as f:
signature = f.read()
with open(public_key_path, "rb") as f:
public_key_pem = f.read()
return facemap_data, signature, public_key_pem
except IOError as e:
raise IOError(f"Error reading file: {e}")

def verify_local_signature(facemap_data: bytes, signature: bytes, public_key_pem: bytes) -> bool:
"""
Locally verifies the signature using the provided public key.
This is for testing your understanding of the signature, not for the attack itself.
"""
try:
public_key = serialization.load_pem_public_key(public_key_pem)
public_key.verify(signature, facemap_data, padding.PKCS1v15(), hashes.SHA256())
return True
except InvalidSignature:
return False
except Exception as e:
return False

def prepare_request_data(original_facemap: bytearray, original_signature: bytes, strategy: str, **kwargs) -> tuple[dict, str, dict]:
"""
Prepares the request data for requests.post.
Returns (files_dict, description_string, request_kwargs).
request_kwargs can include headers, data, or other specific settings for requests.post.
"""
files_dict = {}
description = ""
request_kwargs = {}

if strategy == "valid_input":
    files_dict = {
        'facemap': ('facemap.bin', original_facemap, 'application/octet-stream'),
        'signature': ('signature.bin', original_signature, 'application/octet-stream')
    }
    description = "Sending original valid facemap and signature (Expected OK)."

# --- Tampering to trigger signature/hash check ---
elif strategy == "tamper_facemap_random_bytes":
    tampered_facemap = bytearray(original_facemap)
    start_index = kwargs.get("start_index", max(0, len(tampered_facemap) // 4))
    end_index = kwargs.get("end_index", min(len(tampered_facemap), len(tampered_facemap) // 2))
    for i in range(start_index, end_index):
        tampered_facemap[i] = np.random.randint(0, 256)
    files_dict = {
        'facemap': ('facemap.bin', tampered_facemap, 'application/octet-stream'),
        'signature': ('signature.bin', original_signature, 'application/octet-stream')
    }
    description = f"Tampering facemap with random bytes ({start_index}-{end_index}), original signature (Expected Invalid)."

elif strategy == "tamper_signature_random_bytes":
    tampered_signature_list = list(original_signature)
    idx_to_change = np.random.randint(0, len(tampered_signature_list))
    byte_to_change = np.random.randint(0, 256)
    tampered_signature_list[idx_to_change] = byte_to_change
    tampered_signature = bytes(tampered_signature_list)
    files_dict = {
        'facemap': ('facemap.bin', original_facemap, 'application/octet-stream'),
        'signature': ('signature.bin', tampered_signature, 'application/octet-stream')
    }
    description = f"Original facemap, tampering signature with random byte at index {idx_to_change} (Expected Invalid)."

# --- Size Limit Tests (Facemap - caught by Flask or custom validation) ---
elif strategy == "facemap_at_max_size_server_expected":
    large_facemap = original_facemap[:MAX_FACEMAP_SIZE_SERVER_EXPECTED].ljust(MAX_FACEMAP_SIZE_SERVER_EXPECTED, b'\x00')
    files_dict = {
        'facemap': ('facemap.bin', large_facemap, 'application/octet-stream'),
        'signature': ('signature.bin', original_signature, 'application/octet-stream')
    }
    description = f"Sending facemap exactly at {MAX_FACEMAP_SIZE_SERVER_EXPECTED} bytes (Expected Invalid Sig if not pre-signed)."

elif strategy == "facemap_just_over_max_size":
    oversized_facemap = original_facemap + os.urandom(100)
    if len(oversized_facemap) < MAX_FACEMAP_SIZE_SERVER_EXPECTED + 10:
         oversized_facemap = original_facemap[:MAX_FACEMAP_SIZE_SERVER_EXPECTED] + os.urandom(100)
    files_dict = {
        'facemap': ('facemap.bin', oversized_facemap, 'application/octet-stream'),
        'signature': ('signature.bin', original_signature, 'application/octet-stream')
    }
    description = f"Sending facemap just over {MAX_FACEMAP_SIZE_SERVER_EXPECTED} bytes ({len(oversized_facemap)} bytes) (Expected Invalid)."

elif strategy == "facemap_empty":
    files_dict = {
        'facemap': ('facemap.bin', b'', 'application/octet-stream'),
        'signature': ('signature.bin', original_signature, 'application/octet-stream')
    }
    description = "Sending empty facemap (Expected Invalid)."

# --- Size Limit Tests (Signature - caught by custom validation) ---
elif strategy == "signature_too_short":
    short_signature = original_signature[:EXPECTED_SIGNATURE_SIZE_SERVER // 2]
    files_dict = {
        'facemap': ('facemap.bin', original_facemap, 'application/octet-stream'),
        'signature': ('signature.bin', short_signature, 'application/octet-stream')
    }
    description = f"Sending signature too short ({len(short_signature)} bytes) (Expected Invalid)."

elif strategy == "signature_too_long":
    long_signature = original_signature + os.urandom(EXPECTED_SIGNATURE_SIZE_SERVER // 2)
    files_dict = {
        'facemap': ('facemap.bin', original_facemap, 'application/octet-stream'),
        'signature': ('signature.bin', long_signature, 'application/octet-stream')
    }
    description = f"Sending signature too long ({len(long_signature)} bytes) (Expected Invalid)."

# --- Multipart/Request Parsing Malformations ---
elif strategy == "missing_facemap_file":
    files_dict = {
        'signature': ('signature.bin', original_signature, 'application/octet-stream')
    }
    description = "Sending request with missing 'facemap' file part (Expected Invalid)."

elif strategy == "missing_signature_file":
    files_dict = {
        'facemap': ('facemap.bin', original_facemap, 'application/octet-stream')
    }
    description = "Sending request with missing 'signature' file part (Expected Invalid)."

elif strategy == "no_files_at_all":
    files_dict = {}
    description = "Sending request with no file parts (Expected Invalid)."

elif strategy == "invalid_mime_type_facemap":
    files_dict = {
        'facemap': ('facemap.txt', original_facemap, 'text/plain'),
        'signature': ('signature.bin', original_signature, 'application/octet-stream')
    }
    description = "Sending facemap with wrong MIME type (text/plain) (Expected Invalid)."

elif strategy == "invalid_mime_type_signature":
    files_dict = {
        'facemap': ('facemap.bin', original_facemap, 'application/octet-stream'),
        'signature': ('signature.jpg', original_signature, 'image/jpeg')
    }
    description = "Sending signature with wrong MIME type (image/jpeg) (Expected Invalid)."

elif strategy == "no_multipart_content_type":
    request_kwargs['data'] = b"This is not multipart data."
    request_kwargs['headers'] = {'Content-Type': 'application/x-www-form-urlencoded'}
    description = "Sending request without multipart/form-data Content-Type (Expected Invalid/BadRequest)."
    files_dict = None

# --- Total Request Size Limit Test (app.config['MAX_CONTENT_LENGTH']) ---
elif strategy == "total_request_too_large":
    large_facemap = os.urandom(MAX_TOTAL_REQUEST_SIZE_SERVER_EXPECTED + 100)
    files_dict = {
        'facemap': ('facemap_large.bin', large_facemap, 'application/octet-stream'),
        'signature': ('signature.bin', original_signature, 'application/octet-stream')
    }
    description = f"Sending total request size > {MAX_TOTAL_REQUEST_SIZE_SERVER_EXPECTED} bytes (Expected RequestEntityTooLarge)."

# --- Resource Consumption / DoS Probing (within allowed sizes) ---
elif strategy == "large_facemap_all_zeros":
    large_zeros_facemap = bytearray(b'\x00' * MAX_FACEMAP_SIZE_SERVER_EXPECTED)
    files_dict = {
        'facemap': ('facemap.bin', large_zeros_facemap, 'application/octet-stream'),
        'signature': ('signature.bin', original_signature, 'application/octet-stream')
    }
    description = f"Sending large facemap ({len(large_zeros_facemap)}B) of all zeros (Expected Invalid Sig)."

elif strategy == "large_facemap_high_entropy":
    large_random_facemap = os.urandom(MAX_FACEMAP_SIZE_SERVER_EXPECTED)
    files_dict = {
        'facemap': ('facemap.bin', large_random_facemap, 'application/octet-stream'),
        'signature': ('signature.bin', original_signature, 'application/octet-stream')
    }
    description = f"Sending large facemap ({len(large_random_facemap)}B) of high entropy data (Expected Invalid Sig)."

else:
    raise ValueError(f"Unknown strategy: '{strategy}'")

return files_dict, description, request_kwargs

def send_to_server(files_data: dict, url: str, headers: dict = None, **kwargs) -> tuple[requests.Response, float]:
"""
Sends the request data to the specified server URL and measures response time.
Returns (response_object, response_time_in_seconds).
"""
try:
start_time = time.perf_counter()
if files_data is not None:
response = requests.post(url, files=files_data, headers=headers or {}, timeout=30, verify=False, **kwargs)
else:
response = requests.post(url, headers=headers or {}, timeout=30, verify=False, **kwargs)
end_time = time.perf_counter()
response_time = end_time - start_time
response.raise_for_status()
return response, response_time
except requests.exceptions.RequestException as e:
end_time = time.perf_counter()
response_time = end_time - start_time
return None, response_time

def run_test_case(test_params, original_facemap, original_signature, server_url):
strategy_name = test_params["strategy"]
files_to_send, description, request_kwargs = prepare_request_data(original_facemap, original_signature, **test_params)

print(f"\n--- Test Case: {description} ---")

resp, duration = send_to_server(files_to_send, server_url, **request_kwargs)

status_code = resp.status_code if resp else "N/A"
response_text = resp.text if resp else "N/A (Request Error)"

print(f"Status: {status_code}")
print(f"Response: {response_text}")
print(f"Request Duration: {duration:.4f} seconds (Expected: {MIN_SERVER_RESPONSE_TIME:.1f}-{MAX_SERVER_RESPONSE_TIME:.1f}s)")
return strategy_name, duration, status_code, response_text

if name == "main":
# --- Configuration ---
facemap_path = "facemap.bin"
signature_path = "signature.bin"
public_key_path = "public_key.pem"
server_url = "https://127.0.0.1:8443/facemap/verify"

requests.packages.urllib3.disable_warnings(requests.packages.urllib3.exceptions.InsecureRequestWarning)

print(f"--- Starting Server Hardening Verification and DoS Probing ---")

# --- Load Facemap, Signature, and Public Key ---
original_facemap = None
original_signature = None
public_key_pem = None
try:
    original_facemap, original_signature, public_key_pem = load_facemap_and_signature(facemap_path, signature_path, public_key_path)
    print(f"Successfully loaded facemap ({len(original_facemap)} bytes), signature ({len(original_signature)} bytes), and public key.")

    if verify_local_signature(original_facemap, original_signature, public_key_pem):
        print("Local verification of original facemap and signature PASSED (Good baseline).")
    else:
        print("Local verification of original facemap and signature FAILED. Please check your files.")
        print("Remember to use a private key to sign a valid facemap.bin to get a valid signature.bin for testing.")
        exit(1)

except (FileNotFoundError, IOError) as e:
    print(f"Failed to load required files: {e}")
    print("Please ensure 'facemap.bin', 'signature.bin', and 'public_key.pem' exist and are accessible.")
    print("Refer to script comments for how to generate dummy files if needed.")
    exit(1)

# --- Individual Test Cases for Server Defenses and Robustness ---
test_cases = [
    {"strategy": "valid_input"},
    {"strategy": "tamper_facemap_random_bytes"},
    {"strategy": "tamper_signature_random_bytes"},
    {"strategy": "facemap_at_max_size_server_expected"},
    {"strategy": "facemap_just_over_max_size"},
    {"strategy": "facemap_empty"},
    {"strategy": "signature_too_short"},
    {"strategy": "signature_too_long"},
    {"strategy": "missing_facemap_file"},
    {"strategy": "missing_signature_file"},
    {"strategy": "no_files_at_all"},
    {"strategy": "invalid_mime_type_facemap"},
    {"strategy": "invalid_mime_type_signature"},
    {"strategy": "no_multipart_content_type"}, # Now handled by BadRequest error handler
    {"strategy": "total_request_too_large"},    # Now handled by RequestEntityTooLarge error handler
    {"strategy": "large_facemap_all_zeros"},
    {"strategy": "large_facemap_high_entropy"},
]

all_test_results = []

for test_params in test_cases:
    strategy_name, duration, status_code, response_text = run_test_case(test_params, original_facemap, original_signature, server_url)
    all_test_results.append({
        "strategy": strategy_name,
        "duration": duration,
        "status_code": status_code,
        "response_text": response_text
    })

# --- Concurrent Stress Test and Rate Limit Verification ---
print("\n--- Starting Concurrent Stress Test (Verifying Rate Limit & Stability) ---")
num_concurrent_requests_burst = SERVER_RATE_LIMIT_COUNT * 3 # Send 3x the rate limit to ensure it's hit
num_burst_iterations = 5          

concurrent_durations = []
concurrent_statuses = []

for i in range(num_burst_iterations):
    print(f"\nConcurrent Burst {i+1}/{num_burst_iterations} (Sending {num_concurrent_requests_burst} requests)...")
    with concurrent.futures.ThreadPoolExecutor(max_workers=num_concurrent_requests_burst) as executor:
        # Use a simple invalid input (tampered facemap) to hit core validation quickly
        futures = [executor.submit(send_to_server, *prepare_request_data(original_facemap, original_signature, "tamper_facemap_random_bytes")[:2], server_url) for _ in range(num_concurrent_requests_burst)]
        
        for k, future in enumerate(concurrent.futures.as_completed(futures)):
            try:
                resp, duration = future.result()
                concurrent_durations.append(duration)
                concurrent_statuses.append(resp.status_code if resp else "N/A")
            except Exception as exc:
                concurrent_durations.append(float('inf'))
                concurrent_statuses.append("ERROR_CLIENT_SIDE")
                pass

    # Wait a bit between bursts to allow rate limit to reset for some IPs, but not entirely for others
    # This helps test the sliding window more effectively
    time.sleep(SERVER_RATE_LIMIT_WINDOW / (SERVER_RATE_LIMIT_COUNT / 2))

# --- Final Timing and Result Analysis ---
print("\n--- Final Analysis of All Test Results ---")

durations_by_status = {}
for result in all_test_results:
    status = result["status_code"]
    durations_by_status.setdefault(status, []).append(result["duration"])

print("\nIndividual Test Case Durations and Responses:")
for result in all_test_results:
    print(f"  Strategy: {result['strategy']}")
    print(f"    Status: {result['status_code']}, Duration: {result['duration']:.4f}s")
    print(f"    Response: {result['response_text']}")

print("\nSummary of Durations by Status Code (Individual Tests):")
for status, times in durations_by_status.items():
    if times:
        min_t = min(times)
        max_t = max(times)
        avg_t = statistics.mean(times)
        stdev_t = statistics.stdev(times) if len(times) > 1 else 0.0
        print(f"  Status {status}: (Count: {len(times)})")
        print(f"    Min: {min_t:.4f}s, Max: {max_t:.4f}s, Avg: {avg_t:.4f}s, StdDev: {stdev_t:.4f}s")
    else:
        print(f"  Status {status}: No requests for this status.")

print("\nConcurrent Stress Test Durations and Statuses:")
if concurrent_durations:
    finite_durations = [d for d in concurrent_durations if d != float('inf')]
    if finite_durations:
        min_c = min(finite_durations)
        max_c = max(finite_durations)
        avg_c = statistics.mean(finite_durations)
        stdev_c = statistics.stdev(finite_durations) if len(finite_durations) > 1 else 0.0
        print(f"  Total requests sent: {len(concurrent_durations)}")
        print(f"  Successful/Timed-in requests: {len(finite_durations)}")
        print(f"  Min: {min_c:.4f}s, Max: {max_c:.4f}s, Avg: {avg_c:.4f}s, StdDev: {stdev_c:.4f}s")
        
        within_range_count = sum(1 for d in finite_durations if MIN_SERVER_RESPONSE_TIME <= d <= MAX_SERVER_RESPONSE_TIME)
        print(f"  Requests within {MIN_SERVER_RESPONSE_TIME:.1f}-{MAX_SERVER_RESPONSE_TIME:.1f}s range: {within_range_count}/{len(finite_durations)} ({within_range_count/len(finite_durations)*100:.2f}%)")
    else:
        print("  No successful/timed-in concurrent requests.")

    from collections import Counter
    status_counts = Counter(concurrent_statuses)
    print("  Status Code Distribution:")
    for status, count in status_counts.items():
        print(f"    {status}: {count} ({count/len(concurrent_statuses)*100:.2f}%)")
else:
    print("  No concurrent requests completed.")


print("\n--- Server Hardening Verification and DoS Probing Finished ---")
print("\nKey observations to look for:")
print(f"- **Timing Consistency:** Are ALL response durations (success and error, including 429s) consistently within the expected {MIN_SERVER_R

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions