Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ This release includes Ghidra PyGhidra support, performance improvements, depende
### New Features

- ghidra: support PyGhidra @mike-hunhoff #2788
- ghidra: support analyzing existing Ghidra projects via .gpr:program input syntax @saniyafatima07 #3066
- vmray: extract number features from whitelisted void_ptr parameters (hKey, hKeyRoot) @adeboyedn #2835

### Breaking Changes
Expand Down
36 changes: 24 additions & 12 deletions capa/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,7 @@ def get_extractor(
should_save_workspace=False,
disable_progress=False,
sample_path: Optional[Path] = None,
ghidra_program_path: Optional[str] = None,
) -> FeatureExtractor:
"""
raises:
Expand Down Expand Up @@ -436,25 +437,35 @@ def get_extractor(

import tempfile

tmpdir = tempfile.TemporaryDirectory()

project_cm = pyghidra.open_project(tmpdir.name, "CapaProject", create=True)
tmpdir = None
if ghidra_program_path:
project_path = input_path
project_cm = pyghidra.open_project(str(project_path.parent), project_path.stem, create=False)
else:
tmpdir = tempfile.TemporaryDirectory()
project_cm = pyghidra.open_project(tmpdir.name, "CapaProject", create=True)
project = project_cm.__enter__()
try:
from ghidra.util.task import TaskMonitor

monitor = TaskMonitor.DUMMY

# Import file
loader = pyghidra.program_loader().project(project).source(str(input_path)).name(input_path.name)
with loader.load() as load_results:
load_results.save(monitor)
if ghidra_program_path:
program_name = (
ghidra_program_path if ghidra_program_path.startswith("/") else f"/{ghidra_program_path}"
)
program, consumer = pyghidra.consume_program(project, program_name)
else:
# Import file
loader = pyghidra.program_loader().project(project).source(str(input_path)).name(input_path.name)
with loader.load() as load_results:
load_results.save(monitor)

# Open program
program, consumer = pyghidra.consume_program(project, "/" + input_path.name)
# Open program
program, consumer = pyghidra.consume_program(project, "/" + input_path.name)

# Analyze
pyghidra.analyze(program, monitor)
# Analyze
pyghidra.analyze(program, monitor)

from ghidra.program.flatapi import FlatProgramAPI

Expand All @@ -479,7 +490,8 @@ def __exit__(self, exc_type, exc_val, exc_tb):

except Exception:
project_cm.__exit__(None, None, None)
tmpdir.cleanup()
if tmpdir:
tmpdir.cleanup()
raise

import capa.features.extractors.ghidra.extractor
Expand Down
97 changes: 81 additions & 16 deletions capa/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import textwrap
import contextlib
from types import TracebackType
from typing import Optional, TypedDict
from typing import Tuple, Optional, TypedDict
from pathlib import Path

import colorama
Expand Down Expand Up @@ -398,6 +398,40 @@ def __init__(self, status_code: int):
self.status_code = status_code


def parse_ghidra_project_path(input_path: Path | str) -> Optional[Tuple[Path, str]]:
"""
Parse Ghidra project syntax: /path/to/project.gpr:folder/program

Detects existing Ghidra project format in a case-insensitive manner.

Returns:
tuple of (project_path: Path, program_path: str) if format is detected
None if not in Ghidra project format

Raises:
ValueError: if format is malformed (for example, empty project or program path)
"""
input_str = str(input_path)

idx = input_str.lower().find(".gpr:")
if idx == -1:
return None

project_path_str = input_str[: idx + 4]
program_path = input_str[idx + 5 :].strip()

if not project_path_str or not program_path:
raise ValueError(
f"Invalid Ghidra project syntax: {input_str}\nExpected format: /path/to/project.gpr:folder/program"
)

project_path = Path(project_path_str)
if project_path.suffix.lower() != ".gpr":
raise ValueError(f"Project path must end with .gpr: {project_path}")

return project_path, program_path


def handle_common_args(args):
"""
handle the global config specified by `install_common_args`,
Expand Down Expand Up @@ -849,7 +883,12 @@ def get_signatures_from_cli(args, input_format: str, backend: str) -> list[Path]
raise ShouldExitError(E_INVALID_SIG) from e


def get_extractor_from_cli(args, input_format: str, backend: str) -> FeatureExtractor:
def get_extractor_from_cli(
args,
input_format: str,
backend: str,
ghidra_program_path: Optional[str] = None,
) -> FeatureExtractor:
"""
args:
args: The parsed command line arguments from `install_common_args`.
Expand All @@ -871,7 +910,7 @@ def get_extractor_from_cli(args, input_format: str, backend: str) -> FeatureExtr

os_ = get_os_from_cli(args, backend)
sample_path = get_sample_path_from_cli(args, backend)
extractor_filters = get_extractor_filters_from_cli(args, input_format)
extractor_filters = get_extractor_filters_from_cli(args, input_format, backend)

logger.debug("format: %s", input_format)
logger.debug("backend: %s", backend)
Expand All @@ -886,6 +925,7 @@ def get_extractor_from_cli(args, input_format: str, backend: str) -> FeatureExtr
should_save_workspace=should_save_workspace,
disable_progress=args.quiet or args.debug,
sample_path=sample_path,
ghidra_program_path=ghidra_program_path,
)
return apply_extractor_filters(extractor, extractor_filters)
except UnsupportedFormatError as e:
Expand All @@ -909,12 +949,12 @@ def get_extractor_from_cli(args, input_format: str, backend: str) -> FeatureExtr
raise ShouldExitError(E_CORRUPT_FILE) from e


def get_extractor_filters_from_cli(args, input_format) -> FilterConfig:
def get_extractor_filters_from_cli(args, input_format, backend: Optional[str] = None) -> FilterConfig:
if not hasattr(args, "restrict_to_processes") and not hasattr(args, "restrict_to_functions"):
# no processes or function filters were installed in the args
return {}

if input_format in STATIC_FORMATS:
if backend == BACKEND_GHIDRA or input_format in STATIC_FORMATS:
if args.restrict_to_processes:
raise InvalidArgument("Cannot filter processes with static analysis.")
return {"functions": {int(addr, 0) for addr in args.restrict_to_functions}}
Expand Down Expand Up @@ -1001,8 +1041,22 @@ def main(argv: Optional[list[str]] = None):

try:
handle_common_args(args)
ghidra_info = parse_ghidra_project_path(args.input_file)
if ghidra_info:
project_path, program_path = ghidra_info

if args.backend not in (BACKEND_AUTO, BACKEND_GHIDRA):
raise ShouldExitError(E_INVALID_INPUT_FORMAT)

args.input_file = project_path
args.ghidra_program = program_path
args.backend = BACKEND_GHIDRA

ensure_input_exists_from_cli(args)
input_format = get_input_format_from_cli(args)
if ghidra_info:
input_format = FORMAT_AUTO
else:
input_format = get_input_format_from_cli(args)
except ShouldExitError as e:
return e.status_code

Expand All @@ -1025,20 +1079,31 @@ def main(argv: Optional[list[str]] = None):
rules: RuleSet = get_rules_from_cli(args)

found_limitation = False
file_extractors = get_file_extractors_from_cli(args, input_format)
if input_format in STATIC_FORMATS:
# only static extractors have file limitations
found_limitation = find_static_limitations_from_cli(args, rules, file_extractors)
if input_format in DYNAMIC_FORMATS:
found_limitation = find_dynamic_limitations_from_cli(args, rules, file_extractors)
if ghidra_info:
file_extractors = []
else:
file_extractors = get_file_extractors_from_cli(args, input_format)
if input_format in STATIC_FORMATS:
# only static extractors have file limitations
found_limitation = find_static_limitations_from_cli(args, rules, file_extractors)
if input_format in DYNAMIC_FORMATS:
found_limitation = find_dynamic_limitations_from_cli(args, rules, file_extractors)

backend = get_backend_from_cli(args, input_format)
sample_path = get_sample_path_from_cli(args, backend)
if sample_path is None:
if ghidra_info:
os_ = "unknown"
else:
os_ = capa.loader.get_os(sample_path)
extractor: FeatureExtractor = get_extractor_from_cli(args, input_format, backend)
sample_path = get_sample_path_from_cli(args, backend)
if sample_path is None:
os_ = "unknown"
else:
os_ = capa.loader.get_os(sample_path)
extractor: FeatureExtractor = get_extractor_from_cli(
args,
input_format,
backend,
ghidra_program_path=getattr(args, "ghidra_program", None),
)
except ShouldExitError as e:
return e.status_code

Expand Down
25 changes: 25 additions & 0 deletions tests/test_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import textwrap
from pathlib import Path

import pytest
import fixtures

import capa.main
Expand Down Expand Up @@ -352,3 +353,27 @@ def test_main_cape_gzip():
/ "./data/dynamic/cape/v2.2/0000a65749f5902c4d82ffa701198038f0b4870b00a27cfca109f8f933476d82.json.gz"
)
assert capa.main.main([path]) == 0


def test_parse_ghidra_project_path():
    """parse_ghidra_project_path splits `<project>.gpr:<program>` inputs and ignores other paths."""
    parse = capa.main.parse_ghidra_project_path

    # the extension is matched case-insensitively, and the original casing is preserved.
    for project in ("/tmp/project.gpr", "/tmp/project.GPR"):
        parsed = parse(f"{project}:folder/program")
        assert parsed == (Path(project), "folder/program")

    # an input without the `.gpr:` separator is not a Ghidra project reference.
    assert parse("/tmp/project.exe") is None


def test_parse_ghidra_project_path_invalid():
    """a project reference with nothing after the `:` separator is rejected with ValueError."""
    missing_program = "/tmp/project.gpr:"
    with pytest.raises(ValueError):
        capa.main.parse_ghidra_project_path(missing_program)


def test_main_ghidra_project_path_invalid_backend(tmp_path):
    """requesting a non-Ghidra backend for a `.gpr:program` input exits with E_INVALID_INPUT_FORMAT."""
    # an empty placeholder .gpr file is written so the input path exists on disk.
    gpr_file = tmp_path / "project.gpr"
    gpr_file.write_text("")

    argv = [f"{gpr_file}:folder/program", "-b", "vivisect"]
    status = capa.main.main(argv)

    assert status == capa.main.E_INVALID_INPUT_FORMAT
Loading