From a5a5a21e9fb051aa525a00804d5ab6fd8a19beaf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ricardo=20J=2E=20Rodr=C3=ADguez?= Date: Sat, 27 Jun 2026 15:02:34 +0200 Subject: [PATCH] feat(emu,frida): capture Windows TEB and seed x86 segment base Frida's CpuContext carries no segment base, so every Frida-acquired slice had fs/gs base = 0 and peb = None, and the Unicorn emulator could not run x86 SEH prologues that read the TEB via fs:[0]. Capture side (frida_bridge): on Windows, resolve each thread's TEB via OpenThread + NtQueryInformationThread(ThreadBasicInformation) and record it as gs_base (x64) / fs_base (ia32). Adds version-robust export resolution. Emulator side (emu/engine): in 32-bit mode Unicorn ignores UC_X86_REG_FS_BASE/GS_BASE, so install a synthetic GDT (flat ring-3 data descriptor) for each captured base and load the matching selector; expose it through segment_base()/peb_address(). x64 already honored gs_base directly. Tests for the x86 GDT path (fs:[0x30] -> PEB), the no-base no-regression case, and segment-base flow through the Frida bridge. README updated. Co-Authored-By: Claude Opus 4.8 (1M context) --- README.md | 6 ++ src/memslicer/acquirer/frida_bridge.py | 74 ++++++++++++++++++++++ src/memslicer/emu/engine.py | 88 ++++++++++++++++++++++++-- tests/test_emu.py | 71 +++++++++++++++++++++ tests/test_frida_bridge.py | 31 +++++++++ 5 files changed, 266 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index 8d1df2c..cb2074c 100644 --- a/README.md +++ b/README.md @@ -366,6 +366,12 @@ emu.segment_base("gs") # captured GS base (TEB on Windows x64) emu.peb_address() # follow gs:[0x60] / fs:[0x30] to the Windows PEB ``` +On Windows the Frida backend recovers each thread's TEB (via +`NtQueryInformationThread`) and records it as `gs_base` (x64) / `fs_base` (x86), +since the CPU context alone carries no segment base. On x86 the emulator installs +a synthetic GDT for the captured base (32-bit `FS`/`GS` can't be set as a +register), so SEH prologues that read `fs:[0]` emulate correctly. + ### Symbolic execution (angr) Load a slice into [angr](https://angr.io) for symbolic execution from the exact diff --git a/src/memslicer/acquirer/frida_bridge.py b/src/memslicer/acquirer/frida_bridge.py index ee7abe2..54af722 100644 --- a/src/memslicer/acquirer/frida_bridge.py +++ b/src/memslicer/acquirer/frida_bridge.py @@ -14,6 +14,69 @@ # Frida JS script for RPC exports _FRIDA_SCRIPT = """\ +// Resolve an export across Frida API generations (instance vs. legacy static). +function _resolveExport(mod, name) { + try { + var m = Process.findModuleByName(mod); + if (m) { + if (typeof m.findExportByName === 'function') { + var p = m.findExportByName(name); + if (p) return p; + } + if (typeof m.getExportByName === 'function') { + try { return m.getExportByName(name); } catch (e) {} + } + } + } catch (e) {} + try { + if (typeof Module.findExportByName === 'function') { + var p2 = Module.findExportByName(mod, name); + if (p2) return p2; + } + } catch (e) {} + try { + if (typeof Module.getExportByName === 'function') { + return Module.getExportByName(mod, name); + } + } catch (e) {} + return null; +} + +// Build a per-thread TEB resolver on Windows. Frida's CpuContext carries no +// segment base, so the TEB/PEB and TLS anchor is otherwise lost; we recover it +// via NtQueryInformationThread(ThreadBasicInformation).TebBaseAddress. Returns +// null on non-Windows or when the needed exports are unavailable. +function _makeTebResolver() { + if (Process.platform !== 'windows') return null; + var pOpen = _resolveExport('kernel32.dll', 'OpenThread'); + var pQuery = _resolveExport('ntdll.dll', 'NtQueryInformationThread'); + var pClose = _resolveExport('kernel32.dll', 'CloseHandle'); + if (!pOpen || !pQuery || !pClose) return null; + var OpenThread = new NativeFunction(pOpen, 'pointer', ['uint32', 'int', 'uint32']); + var NtQueryInformationThread = new NativeFunction( + pQuery, 'int', ['pointer', 'int', 'pointer', 'uint32', 'pointer']); + var CloseHandle = new NativeFunction(pClose, 'int', ['pointer']); + var THREAD_QUERY_INFORMATION = 0x0040; + var ThreadBasicInformation = 0; + var psize = Process.pointerSize; + // THREAD_BASIC_INFORMATION: NTSTATUS ExitStatus; PVOID TebBaseAddress; ... + // TebBaseAddress sits at offset == pointerSize (NTSTATUS padded to align). + var bufLen = (psize === 8) ? 48 : 28; + return function (tid) { + var h = OpenThread(THREAD_QUERY_INFORMATION, 0, tid); + if (h.isNull()) return null; + try { + var buf = Memory.alloc(bufLen); + var status = NtQueryInformationThread( + h, ThreadBasicInformation, buf, bufLen, NULL); + if (status !== 0) return null; + return buf.add(psize).readPointer(); + } finally { + CloseHandle(h); + } + }; +} + rpc.exports = { enumerateRanges: function(prot) { return Process.enumerateRanges(prot); @@ -50,6 +113,11 @@ "x22","x23","x24","x25","x26","x27","x28"] }; var names = REG_NAMES[Process.arch] || []; + // CpuContext exposes no segment base; recover the TEB on Windows and + // surface it as gs_base (x64) / fs_base (ia32) so the emulator can seed + // segment-relative (TEB/PEB, TLS) access. + var tebResolver = _makeTebResolver(); + var segBaseReg = (Process.arch === 'x64') ? 'gs_base' : 'fs_base'; return Process.enumerateThreads().map(function(t) { var ctx = {}; var raw = t.context || {}; @@ -70,6 +138,12 @@ if (v === undefined || v === null) continue; try { ctx[k] = v.toString(); } catch (e) { ctx[k] = String(v); } } + if (tebResolver && ctx[segBaseReg] === undefined) { + try { + var teb = tebResolver(t.id); + if (teb && !teb.isNull()) ctx[segBaseReg] = teb.toString(); + } catch (e) {} + } return {id: t.id, state: t.state, context: ctx}; }); }, diff --git a/src/memslicer/emu/engine.py b/src/memslicer/emu/engine.py index ec07427..de6bfa9 100644 --- a/src/memslicer/emu/engine.py +++ b/src/memslicer/emu/engine.py @@ -59,6 +59,23 @@ def _arch_table(): } +def _gdt_descriptor(base: int, limit: int, access: int, gran: int) -> bytes: + """Pack an 8-byte legacy GDT segment descriptor. + + *access* is the access byte (e.g. ``0xf2`` = present, ring-3, data RW) and + *gran* the 4-bit granularity/flags nibble (e.g. ``0xc`` = page-granular, + 32-bit). Used to give 32-bit ``fs``/``gs`` a base Unicorn won't accept via + ``UC_X86_REG_*_BASE``. + """ + desc = limit & 0xffff + desc |= (base & 0xffffff) << 16 + desc |= (access & 0xff) << 40 + desc |= ((limit >> 16) & 0xf) << 48 + desc |= (gran & 0xf) << 52 + desc |= ((base >> 24) & 0xff) << 56 + return desc.to_bytes(8, "little") + + def _coalesce(spans: list[tuple[int, int]]) -> list[tuple[int, int]]: """Merge [start, end) spans (already page-aligned) that touch or overlap.""" out: list[tuple[int, int]] = [] @@ -105,6 +122,8 @@ def __init__(self, image: SliceImage, thread: "int | EmuThread | None" = None): self.cs = capstone.Cs(cs_arch, cs_mode) self._until = (1 << self.bits) - 1 + self._mapped: list[tuple[int, int]] = [] # coalesced mapped spans + self._seg_bases: dict[str, int] = {} # x86 fs/gs base (GDT-seeded) self._map_memory() self._seed_registers() @@ -143,12 +162,26 @@ def _map_memory(self) -> None: lo = r.base & ~(_UC_PAGE - 1) hi = (r.base + r.size + _UC_PAGE - 1) & ~(_UC_PAGE - 1) spans.append((lo, hi)) - for lo, hi in _coalesce(spans): + self._mapped = _coalesce(spans) + for lo, hi in self._mapped: self.uc.mem_map(lo, hi - lo) for r in self.image.regions: for paddr, data in r.pages.items(): self.uc.mem_write(paddr, data) + def _find_free_page(self, size: int) -> int: + """Return a page-aligned address with *size* bytes free of mapped spans.""" + size = (size + _UC_PAGE - 1) & ~(_UC_PAGE - 1) + limit = 1 << self.bits + candidates = [(hi + _UC_PAGE - 1) & ~(_UC_PAGE - 1) for _, hi in self._mapped] + candidates.append(_UC_PAGE) + for c in sorted(candidates): + if c + size > limit: + continue + if not any(c < hi and c + size > lo for lo, hi in self._mapped): + return c + raise EmuError("no free address space for synthetic GDT") + def _reg_const(self, name: str): mod = getattr(self._U, self._const_mod) return getattr(mod, self._reg_prefix + name.upper(), None) @@ -162,8 +195,14 @@ def _resolve_thread(self, spec) -> "EmuThread | None": def _seed_registers(self) -> None: thread = self.thread if thread is None: + self._seg_bases = {} return + is_x86 = self.image.arch == ArchType.x86 for reg in thread.registers: + # In 32-bit mode Unicorn ignores UC_X86_REG_FS_BASE/GS_BASE (no-op); + # those bases are installed via a synthetic GDT in _seed_x86_segments. + if is_x86 and reg.name.lower().endswith("_base"): + continue const = self._reg_const(reg.name) if const is None: continue @@ -173,6 +212,43 @@ def _seed_registers(self) -> None: self.uc.reg_write(const, reg.value) except (self._U.UcError, OverflowError, TypeError): continue # a register width this engine build can't accept + if is_x86: + self._seed_x86_segments() + + def _seed_x86_segments(self) -> None: + """Install a synthetic GDT so 32-bit ``fs:``/``gs:`` accesses resolve. + + In 32-bit mode the segment base lives in a descriptor, not a register + Unicorn will honor. For each captured ``fs_base``/``gs_base`` we add a + flat ring-3 data descriptor and load the matching selector, so TEB/PEB- + and TLS-relative reads (e.g. the CRT SEH prologue's ``mov eax, fs:[0]``) + work during emulation. No-op when the slice carried no segment base. + """ + bases: dict[str, int] = {} + for reg in (self.thread.registers if self.thread else []): + n = reg.name.lower() + if n in ("fs_base", "gs_base") and reg.value: + bases[n[:2]] = reg.value + self._seg_bases = bases + if not bases: + return + order = [s for s in ("fs", "gs") if s in bases] + descriptors = [b"\x00" * 8] # mandatory null descriptor + selectors: dict[str, int] = {} + for idx, seg in enumerate(order, start=1): + descriptors.append( + _gdt_descriptor(bases[seg], 0xfffff, access=0xf2, gran=0xc) + ) + selectors[seg] = (idx << 3) | 3 # GDT index, TI=0, RPL=3 + gdt = b"".join(descriptors) + gdt_base = self._find_free_page(len(gdt)) + self.uc.mem_map(gdt_base, _UC_PAGE) + self.uc.mem_write(gdt_base, gdt) + self._mapped = _coalesce(self._mapped + [(gdt_base, gdt_base + _UC_PAGE)]) + xc = getattr(self._U, self._const_mod) + self.uc.reg_write(xc.UC_X86_REG_GDTR, (0, gdt_base, len(gdt) - 1, 0)) + for seg, sel in selectors.items(): + self.uc.reg_write(self._reg_const(seg), sel) def switch_thread(self, thread: "int | EmuThread | None") -> "EmuThread | None": """Re-seed the CPU from another captured thread (by tid or EmuThread). @@ -226,9 +302,13 @@ def read_mem(self, addr: int, size: int) -> bytes: return bytes(self.uc.mem_read(addr, size)) def segment_base(self, seg: str) -> int | None: - """Return the captured ``fs_base`` / ``gs_base`` (x86), or None if the - slice didn't capture it. These anchor TEB/PEB and TLS access.""" - const = self._reg_const(f"{seg.lower()}_base") + """Return the captured ``fs_base`` / ``gs_base``, or None if the slice + didn't capture it. These anchor TEB/PEB and TLS access.""" + seg = seg.lower() + if self.image.arch == ArchType.x86: + # 32-bit base lives in the synthetic GDT, not a readable MSR. + return self._seg_bases.get(seg) + const = self._reg_const(f"{seg}_base") if const is None: return None return self.uc.reg_read(const) diff --git a/tests/test_emu.py b/tests/test_emu.py index 40ee963..fb4f86c 100644 --- a/tests/test_emu.py +++ b/tests/test_emu.py @@ -462,3 +462,74 @@ def test_emulator_peb_address(tmp_path): assert emu.peb_address() == peb # resolved gs:[0x60] +# ---- x86 (32-bit) segment base via synthetic GDT ---- + +def _write_x86_slice(path, code, regs, *, extra_regions=()): + """Write a minimal 32-bit Windows slice: code + stack + thread context, + plus any *extra_regions* as (base, page_bytes) pairs (e.g. a TEB page).""" + page = code + b"\x90" * (PS - len(code)) + cap = ((1 << CapBit.MemoryRegions) | (1 << CapBit.ProcessIdentity) + | (1 << CapBit.ThreadContexts)) + hdr = FileHeader(os_type=OSType.Windows, arch_type=ArchType.x86, + pid=1, cap_bitmap=cap) + with open(path, "wb") as f: + w = MSLWriter(f, hdr, CompAlgo.NONE) + w.write_process_identity(ProcessIdentity(exe_path="C:\\a.exe")) + w.write_memory_region(MemoryRegion( + base_addr=CODE_VA, region_size=PS, protection=0b101, + region_type=RegionType.Image, page_size=PS, + page_states=[PageState.CAPTURED], page_data_chunks=[page])) + w.write_memory_region(MemoryRegion( + base_addr=STACK_VA, region_size=PS, protection=0b011, + region_type=RegionType.Stack, page_size=PS, + page_states=[PageState.CAPTURED], page_data_chunks=[b"\x00" * PS])) + for base, data in extra_regions: + w.write_memory_region(MemoryRegion( + base_addr=base, region_size=PS, protection=0b011, + region_type=RegionType.Stack, page_size=PS, + page_states=[PageState.CAPTURED], page_data_chunks=[data])) + w.write_thread_context(ThreadContext( + thread_id=1, flags=THREAD_FLAG_CURRENT, state=ThreadState.Stopped, + name="main", registers=regs)) + w.finalize() + + +def test_emulator_seeds_x86_fs_base_via_gdt(tmp_path): + pytest.importorskip("unicorn") + pytest.importorskip("capstone") + from memslicer.emu.engine import MSLEmulator + + TEB = 0x60000000 + peb = 0x001a0000 + teb = bytearray(b"\x00" * PS) + teb[0x30:0x34] = peb.to_bytes(4, "little") # PEB pointer at fs:[0x30] + code = bytes.fromhex("64a130000000") # mov eax, fs:[0x30] + p = tmp_path / "x86fs.msl" + _write_x86_slice(p, code, [ + ThreadRegister("eip", CODE_VA.to_bytes(4, "little"), REG_FLAG_PC), + ThreadRegister("esp", (STACK_VA + 0xf00).to_bytes(4, "little"), REG_FLAG_SP), + ThreadRegister("fs_base", TEB.to_bytes(4, "little"), 0), + ], extra_regions=[(TEB, bytes(teb))]) + emu = MSLEmulator(load_slice(str(p))) + assert emu.segment_base("fs") == TEB # GDT-seeded base + assert emu.peb_address() == peb # fs:[0x30] resolved + emu.step() # mov eax, fs:[0x30] + assert emu.read_reg("eax") == peb # segment-relative read worked + + +def test_emulator_x86_without_fs_base_still_runs(tmp_path): + pytest.importorskip("unicorn") + pytest.importorskip("capstone") + from memslicer.emu.engine import MSLEmulator + + p = tmp_path / "x86plain.msl" + _write_x86_slice(p, b"\x90", [ # nop + ThreadRegister("eip", CODE_VA.to_bytes(4, "little"), REG_FLAG_PC), + ThreadRegister("esp", (STACK_VA + 0xf00).to_bytes(4, "little"), REG_FLAG_SP), + ]) + emu = MSLEmulator(load_slice(str(p))) + assert emu.segment_base("fs") is None # no GDT installed + assert emu.peb_address() is None + assert emu.step().ok # no regression + + diff --git a/tests/test_frida_bridge.py b/tests/test_frida_bridge.py index 8ebde04..c392901 100644 --- a/tests/test_frida_bridge.py +++ b/tests/test_frida_bridge.py @@ -185,6 +185,37 @@ def test_enumerate_modules_converts(self): assert modules[1].size == 0x1000 +class TestFridaBridgeEnumerateThreads: + """Tests for FridaBridge.enumerate_threads().""" + + def test_enumerate_threads_surfaces_segment_base(self): + """A segment base injected by the JS agent (gs_base/fs_base, from the + Windows TEB) flows through to a RegisterValue with the arch width.""" + bridge = _make_bridge(target=1234) + _device, api = _setup_device_and_api(bridge) + + api.enumerate_threads.return_value = [ + { + "id": 7, + "state": "stopped", + "context": { + "rip": "0x401000", + "rsp": "0x7fff0000", + "gs_base": "0x7ff7fffdb000", + }, + }, + ] + + threads = bridge.enumerate_threads() + + assert len(threads) == 1 + regs = {r.name: r for r in threads[0].registers} + assert "gs_base" in regs + assert regs["gs_base"].value == 0x7ff7fffdb000 + assert regs["gs_base"].size == 8 # x64 GPR width + assert regs["gs_base"].role == "" # not pc/sp/fp/flags + + class TestFridaBridgeReadMemory: """Tests for FridaBridge.read_memory()."""