diff --git a/pytest.ini b/pytest.ini new file mode 100644 index 0000000..47fabb8 --- /dev/null +++ b/pytest.ini @@ -0,0 +1,5 @@ +[pytest] +testpaths = tests +asyncio_mode = auto +markers = + asyncio: mark a test as an asyncio coroutine diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..e2d893f --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,421 @@ +"""Shared fixtures for the response plugin test suite.""" +import sys +import os +import types +import importlib +import importlib.util +import pytest + +# Root of the cloned repo +REPO_ROOT = '/tmp/response-pytest' + + +# --------------------------------------------------------------------------- +# Lightweight stubs for Caldera framework objects that are not available in an +# isolated test environment. We only model the attributes / methods that the +# response plugin actually uses so the tests stay focused on *plugin* logic. +# --------------------------------------------------------------------------- + +class FakeFact: + """Minimal stand-in for app.objects.secondclass.c_fact.Fact.""" + def __init__(self, trait=None, value=None): + self.trait = trait + self.value = value + + def __eq__(self, other): + if not isinstance(other, FakeFact): + return NotImplemented + return self.trait == other.trait and self.value == other.value + + def __repr__(self): + return f"FakeFact(trait={self.trait!r}, value={self.value!r})" + + +class FakeRelationship: + """Minimal stand-in for app.objects.secondclass.c_relationship.Relationship.""" + def __init__(self, source=None, edge=None, target=None): + self.source = source + self.edge = edge + self.target = target + + def __repr__(self): + return (f"FakeRelationship(source={self.source!r}, " + f"edge={self.edge!r}, target={self.target!r})") + + +class FakeMapper: + """Represents a parser mapper configuration.""" + def __init__(self, source, edge, target): + self.source = source + self.edge = edge + self.target = target + + +class FakeLink: + """Minimal stand-in for a Link object.""" + def __init__(self, id=None, host=None, used=None, facts=None, + relationships=None, finish=None, pin=None, pid=0, + status=0, operation=None, executor=None): + self.id = id or 'link-1' + self.host = host or 'testhost' + self.used = used or [] + self.facts = facts if facts is not None else [] + self.relationships = relationships if relationships is not None else [] + self.finish = finish + self.pin = pin + self.pid = pid + self.status = status + self.operation = operation + self.executor = executor + + def can_ignore(self): + return False + + def apply_id(self, host): + self.id = f'{self.id}-{host}' + + +class FakeOperation: + """Minimal stand-in for an Operation.""" + def __init__(self, relationships=None, source=None, chain=None): + self._relationships = relationships or [] + self.source = source + self.chain = chain or [] + + async def all_relationships(self): + return self._relationships + + async def is_finished(self): + return False + + +class FakeAgent: + """Minimal stand-in for an Agent.""" + def __init__(self, paw='abc', host='testhost', pid=1234, trusted=True, access=None): + self.paw = paw + self.host = host + self.pid = pid + self.trusted = trusted + self.access = access + + +class FakeSource: + """Minimal stand-in for a Source.""" + def __init__(self, id=None, name=None, facts=None): + self.id = id + self.name = name + self.facts = facts or [] + + +class FakeBaseParser: + """Stub for app.utility.base_parser.BaseParser. + + Provides the helper methods that concrete parsers rely on (line, + set_value, load_json) without pulling in the full Caldera framework. + """ + def __init__(self, mappers=None, used_facts=None): + self.mappers = mappers or [] + self.used_facts = used_facts or [] + + @staticmethod + def line(blob): + return blob.splitlines() + + @staticmethod + def set_value(mapper_field, match, _facts): + return match + + @staticmethod + def load_json(blob): + import json + return json.loads(blob) + + +# --------------------------------------------------------------------------- +# Monkey-patch framework modules so that ``import app.…`` / ``import plugins.…`` +# resolve to our fakes. +# --------------------------------------------------------------------------- + +def _ensure_module(dotted_name, attrs=None): + """Create a stub module at *dotted_name* (and all parents) if absent.""" + parts = dotted_name.split('.') + for i in range(len(parts)): + partial = '.'.join(parts[:i + 1]) + if partial not in sys.modules: + sys.modules[partial] = types.ModuleType(partial) + mod = sys.modules[dotted_name] + for k, v in (attrs or {}).items(): + setattr(mod, k, v) + return mod + + +def _install_stubs(): + # app.objects.secondclass.c_fact + _ensure_module('app', {'__path__': [os.path.join(REPO_ROOT, 'app')]}) + _ensure_module('app.objects') + _ensure_module('app.objects.secondclass') + _ensure_module('app.objects.secondclass.c_fact', {'Fact': FakeFact}) + _ensure_module('app.objects.secondclass.c_relationship', {'Relationship': FakeRelationship}) + class FakeResult: + def __init__(self, id=None, output=None, pid=None, status=None): + self.id = id + self.output = output + self.pid = pid + self.status = status + + _ensure_module('app.objects.secondclass.c_result', {'Result': FakeResult}) + _ensure_module('app.objects.secondclass.c_link', {'LinkSchema': type('LinkSchema', (), {})}) + + class FakeOperationCls: + def __init__(self, **kwargs): + for k, v in kwargs.items(): + setattr(self, k, v) + self.chain = getattr(self, 'chain', []) + self.id = getattr(self, 'id', 'op-id') + self.objective = None + def add_link(self, link): + self.chain.append(link) + def set_start_details(self): + pass + async def is_finished(self): + return False + + _ensure_module('app.objects.c_operation', {'Operation': FakeOperationCls}) + _ensure_module('app.objects.c_source', {'Source': FakeSource}) + _ensure_module('app.objects.interfaces') + _ensure_module('app.objects.interfaces.i_object', + {'FirstClassObjectInterface': type('FirstClassObjectInterface', (), {})}) + _ensure_module('app.utility') + + # BaseObject stub + class _BaseObject: + def __init__(self): + pass + def retrieve(self, collection, unique): + for item in collection: + if getattr(item, 'unique', None) == unique: + return item + return None + + _ensure_module('app.utility.base_object', {'BaseObject': _BaseObject}) + + # BaseService stub used by response_svc + class _Access: + RED = 'red' + BLUE = 'blue' + HIDDEN = 'hidden' + + class _BaseService(_BaseObject): + Access = _Access + services = {} + + @classmethod + def add_service(cls, name, svc): + cls.services[name] = svc + import logging + return logging.getLogger(name) + + @classmethod + def get_service(cls, name): + return cls.services.get(name) + + @staticmethod + def decode_bytes(b): + if isinstance(b, bytes): + return b.decode() + return b + + @staticmethod + def encode_string(s): + if isinstance(s, str): + return s.encode() + return s + + @classmethod + def get_config(cls, name=None, prop=None): + return None + + @classmethod + def set_config(cls, name=None, prop=None, value=None): + pass + + _ensure_module('app.utility.base_service', {'BaseService': _BaseService}) + + # BaseWorld stub used by hook.py + class _BaseWorld(_BaseService): + @staticmethod + def strip_yml(path): + return [{}] + + @staticmethod + def apply_config(name, cfg): + pass + + _ensure_module('app.utility.base_world', {'BaseWorld': _BaseWorld}) + + # BaseParser stub + _ensure_module('app.utility.base_parser', {'BaseParser': FakeBaseParser}) + + # marshmallow — try real one first + try: + import marshmallow # noqa: F401 + except ImportError: + ma = _ensure_module('marshmallow') + ma.Schema = type('Schema', (), {}) + ma.fields = types.ModuleType('marshmallow.fields') + sys.modules['marshmallow.fields'] = ma.fields + for fld in ('Integer', 'String', 'List', 'Dict', 'Nested'): + setattr(ma.fields, fld, lambda *a, **kw: None) + ma.post_load = lambda **kw: (lambda fn: fn) + + # plugins.response paths + _ensure_module('plugins') + _ensure_module('plugins.response') + _ensure_module('plugins.response.app') + _ensure_module('plugins.response.app.requirements') + + # aiohttp / jinja stubs + try: + import aiohttp # noqa: F401 + except ImportError: + aio = _ensure_module('aiohttp') + aio.web = types.ModuleType('aiohttp.web') + sys.modules['aiohttp.web'] = aio.web + aio.web.json_response = lambda x: x + + try: + import aiohttp_jinja2 # noqa: F401 + except ImportError: + aj = _ensure_module('aiohttp_jinja2') + aj.template = lambda name: (lambda fn: fn) + + +_install_stubs() + +# --------------------------------------------------------------------------- +# Load real plugin modules from file paths using importlib +# --------------------------------------------------------------------------- + +def _load_from_file(module_name, file_path): + """Load a module from an absolute file path and register it in sys.modules.""" + spec = importlib.util.spec_from_file_location(module_name, file_path) + mod = importlib.util.module_from_spec(spec) + sys.modules[module_name] = mod + spec.loader.exec_module(mod) + return mod + + +# Sub-packages need __path__ set so child imports work +def _ensure_package(dotted_name, dir_path): + """Register a directory as a Python package in sys.modules.""" + mod = sys.modules.get(dotted_name) + if mod is None: + mod = types.ModuleType(dotted_name) + sys.modules[dotted_name] = mod + mod.__path__ = [dir_path] + mod.__package__ = dotted_name + return mod + + +# Make app and its sub-packages real packages that point to the repo +_app_mod = sys.modules['app'] +_app_mod.__path__ = [os.path.join(REPO_ROOT, 'app')] +_app_mod.__package__ = 'app' + +_ensure_package('app.parsers', os.path.join(REPO_ROOT, 'app', 'parsers')) +_ensure_package('app.requirements', os.path.join(REPO_ROOT, 'app', 'requirements')) + +# Load base_requirement into the plugins path too +_load_from_file( + 'plugins.response.app.requirements.base_requirement', + os.path.join(REPO_ROOT, 'app', 'requirements', 'base_requirement.py'), +) + +# Load the actual parsers +for _parser_name in ('basic_strip', 'childprocess', 'ecs_sysmon', 'key_value', + 'ports', 'processguids', 'process', 'sysmon'): + _load_from_file( + f'app.parsers.{_parser_name}', + os.path.join(REPO_ROOT, 'app', 'parsers', f'{_parser_name}.py'), + ) + +# Load requirements +for _req_name in ('base_requirement', 'basic', 'has_property', 'source_fact'): + _mod_name = f'app.requirements.{_req_name}' + _plugin_mod_name = f'plugins.response.app.requirements.{_req_name}' + _fpath = os.path.join(REPO_ROOT, 'app', 'requirements', f'{_req_name}.py') + _load_from_file(_mod_name, _fpath) + # Also register under plugin path so intra-plugin imports work + sys.modules[_plugin_mod_name] = sys.modules[_mod_name] + +# Load ProcessNode and ProcessTree +_load_from_file( + 'plugins.response.app.c_processnode', + os.path.join(REPO_ROOT, 'app', 'c_processnode.py'), +) +sys.modules['app.c_processnode'] = sys.modules['plugins.response.app.c_processnode'] + +_load_from_file( + 'plugins.response.app.c_processtree', + os.path.join(REPO_ROOT, 'app', 'c_processtree.py'), +) +sys.modules['app.c_processtree'] = sys.modules['plugins.response.app.c_processtree'] + +# Load response_svc (needs plugins.response.app.c_processtree) +_load_from_file( + 'app.response_svc', + os.path.join(REPO_ROOT, 'app', 'response_svc.py'), +) +sys.modules['plugins.response.app.response_svc'] = sys.modules['app.response_svc'] + +# Load hook +_load_from_file('hook', os.path.join(REPO_ROOT, 'hook.py')) + + +# --------------------------------------------------------------------------- +# Reusable pytest fixtures +# --------------------------------------------------------------------------- + +@pytest.fixture +def fake_fact(): + return FakeFact + + +@pytest.fixture +def fake_relationship(): + return FakeRelationship + + +@pytest.fixture +def fake_mapper(): + return FakeMapper + + +@pytest.fixture +def fake_link(): + return FakeLink + + +@pytest.fixture +def fake_operation(): + return FakeOperation + + +@pytest.fixture +def fake_agent(): + return FakeAgent + + +@pytest.fixture +def fake_source(): + return FakeSource + + +@pytest.fixture +def make_parser(): + """Factory fixture: returns a parser instance with mappers / used_facts.""" + def _make(parser_cls, mappers=None, used_facts=None): + p = parser_cls.__new__(parser_cls) + FakeBaseParser.__init__(p, mappers=mappers or [], used_facts=used_facts or []) + return p + return _make diff --git a/tests/test_hook.py b/tests/test_hook.py new file mode 100644 index 0000000..3c75ea8 --- /dev/null +++ b/tests/test_hook.py @@ -0,0 +1,89 @@ +"""Tests for hook.py — plugin metadata and enable/expansion functions.""" +import sys +import types +import pytest +from unittest.mock import MagicMock, AsyncMock, patch + +sys.path.insert(0, '/tmp/response-pytest') + + +class TestHookMetadata: + + def test_name(self): + from hook import name + assert name == 'Response' + + def test_description(self): + from hook import description + assert description == 'An automated incident response plugin' + + def test_address(self): + from hook import address + assert address == '/plugin/responder/gui' + + def test_access_is_blue(self): + from hook import access + from app.utility.base_world import BaseWorld + assert access == BaseWorld.Access.BLUE + + +class TestRegisterAgent: + + def test_register_agent_adds_to_deployments(self): + from app.utility.base_world import BaseWorld + BaseWorld.get_config = MagicMock(return_value=set()) + BaseWorld.set_config = MagicMock() + from hook import _register_agent + _register_agent('test-ability-id') + BaseWorld.set_config.assert_called_once() + call_kwargs = BaseWorld.set_config.call_args + assert 'test-ability-id' in call_kwargs[1]['value'] or 'test-ability-id' in call_kwargs[0][0] if call_kwargs[0] else True + + +class TestEnable: + + @pytest.mark.asyncio + async def test_enable_registers_routes(self): + from app.utility.base_world import BaseWorld + BaseWorld.apply_config = MagicMock() + BaseWorld.strip_yml = MagicMock(return_value=[{}]) + BaseWorld.get_config = MagicMock(return_value=False) + + mock_app = MagicMock() + mock_router = MagicMock() + mock_app.router = mock_router + + mock_data_svc = AsyncMock() + mock_app_svc = MagicMock() + mock_app_svc.application = mock_app + mock_event_svc = MagicMock() + + services = { + 'data_svc': mock_data_svc, + 'rest_svc': MagicMock(), + 'app_svc': mock_app_svc, + 'event_svc': mock_event_svc, + } + services_mock = MagicMock() + services_mock.get = services.get + + # Patch _register_agent to avoid side effects + with patch('hook._register_agent'): + from hook import enable + await enable(services_mock) + + # 4 routes should be added + assert mock_router.add_route.call_count == 4 + + +class TestExpansion: + + @pytest.mark.asyncio + async def test_expansion_calls_apply_adversary_config(self): + mock_response_svc = AsyncMock() + services = MagicMock() + services.get = MagicMock(return_value=mock_response_svc) + + from hook import expansion + await expansion(services) + mock_response_svc.apply_adversary_config.assert_awaited_once() diff --git a/tests/test_parsers/__init__.py b/tests/test_parsers/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_parsers/test_basic_strip.py b/tests/test_parsers/test_basic_strip.py new file mode 100644 index 0000000..0762498 --- /dev/null +++ b/tests/test_parsers/test_basic_strip.py @@ -0,0 +1,58 @@ +"""Tests for app.parsers.basic_strip.""" +import sys +sys.path.insert(0, '/tmp/response-pytest') + +from tests.conftest import FakeMapper, FakeFact +from app.parsers.basic_strip import Parser + + +class TestBasicStripParser: + + def _parser(self, mappers=None, used_facts=None): + p = Parser.__new__(Parser) + p.mappers = mappers or [] + p.used_facts = used_facts or [] + return p + + def test_single_line_single_mapper(self): + mp = FakeMapper(source='src.trait', edge='edge1', target='tgt.trait') + p = self._parser(mappers=[mp]) + result = p.parse(' hello ') + assert len(result) == 1 + assert result[0].source.trait == 'src.trait' + assert result[0].source.value == 'hello' + assert result[0].edge == 'edge1' + assert result[0].target.trait == 'tgt.trait' + assert result[0].target.value == 'hello' + + def test_multi_line(self): + mp = FakeMapper(source='s', edge='e', target='t') + p = self._parser(mappers=[mp]) + result = p.parse('line1\nline2\nline3') + assert len(result) == 3 + + def test_strips_whitespace(self): + mp = FakeMapper(source='s', edge='e', target='t') + p = self._parser(mappers=[mp]) + result = p.parse(' value ') + assert result[0].source.value == 'value' + + def test_multiple_mappers(self): + mp1 = FakeMapper(source='s1', edge='e1', target='t1') + mp2 = FakeMapper(source='s2', edge='e2', target='t2') + p = self._parser(mappers=[mp1, mp2]) + result = p.parse('val') + assert len(result) == 2 + assert result[0].source.trait == 's1' + assert result[1].source.trait == 's2' + + def test_empty_blob(self): + mp = FakeMapper(source='s', edge='e', target='t') + p = self._parser(mappers=[mp]) + result = p.parse('') + assert result == [] + + def test_no_mappers(self): + p = self._parser(mappers=[]) + result = p.parse('something') + assert result == [] diff --git a/tests/test_parsers/test_childprocess.py b/tests/test_parsers/test_childprocess.py new file mode 100644 index 0000000..d491603 --- /dev/null +++ b/tests/test_parsers/test_childprocess.py @@ -0,0 +1,65 @@ +"""Tests for app.parsers.childprocess.""" +import sys +sys.path.insert(0, '/tmp/response-pytest') + +from tests.conftest import FakeMapper, FakeFact +from app.parsers.childprocess import Parser + + +class TestChildProcessParser: + + def _parser(self, mappers=None, used_facts=None): + p = Parser.__new__(Parser) + p.mappers = mappers or [] + p.used_facts = used_facts or [] + return p + + def test_parse_childid_regex(self): + blob = 'ProcessId: 1234\nSomething else\nProcessId: 5678' + result = Parser.parse_childid(blob) + assert result == ['1234', '5678'] + + def test_parse_childid_case_insensitive(self): + blob = 'processid: 42' + result = Parser.parse_childid(blob) + assert result == ['42'] + + def test_parse_childid_no_match(self): + result = Parser.parse_childid('nothing here') + assert result == [] + + def test_parse_single_match(self): + mp = FakeMapper(source='host.process.guid', edge='has_childprocess_id', target='host.process.id') + fact = FakeFact(trait='host.process.guid', value='guid-abc') + p = self._parser(mappers=[mp], used_facts=[fact]) + blob = 'ProcessId: 999' + result = p.parse(blob) + assert len(result) == 1 + assert result[0].source.value == 'guid-abc' + assert result[0].target.value == '999' + + def test_parse_multiple_matches(self): + mp = FakeMapper(source='host.process.guid', edge='has_childprocess_id', target='host.process.id') + fact = FakeFact(trait='host.process.guid', value='guid-abc') + p = self._parser(mappers=[mp], used_facts=[fact]) + blob = 'ProcessId: 100\nProcessId: 200' + result = p.parse(blob) + assert len(result) == 2 + assert result[0].target.value == '100' + assert result[1].target.value == '200' + + def test_parse_no_match_returns_empty(self): + mp = FakeMapper(source='host.process.guid', edge='e', target='host.process.id') + fact = FakeFact(trait='host.process.guid', value='g') + p = self._parser(mappers=[mp], used_facts=[fact]) + result = p.parse('no match') + assert result == [] + + def test_parse_appends_to_used_facts(self): + mp = FakeMapper(source='host.process.guid', edge='e', target='host.process.id') + fact = FakeFact(trait='host.process.guid', value='g') + facts_list = [fact] + p = self._parser(mappers=[mp], used_facts=facts_list) + p.parse('ProcessId: 10') + # The parser appends r.target to all_facts (which is self.used_facts) + assert len(facts_list) == 2 diff --git a/tests/test_parsers/test_ecs_sysmon.py b/tests/test_parsers/test_ecs_sysmon.py new file mode 100644 index 0000000..0cfec85 --- /dev/null +++ b/tests/test_parsers/test_ecs_sysmon.py @@ -0,0 +1,166 @@ +"""Tests for app.parsers.ecs_sysmon.""" +import sys +import json +sys.path.insert(0, '/tmp/response-pytest') + +from tests.conftest import FakeMapper, FakeFact +from app.parsers.ecs_sysmon import Parser + + +def _event(overrides=None): + """Build a minimal ECS-style sysmon event dict.""" + base = { + '_id': 'es-id-1', + '_source': { + 'process': { + 'entity_id': '{guid-123}', + 'pid': 42, + 'name': 'cmd.exe', + 'parent': { + 'entity_id': '{parent-guid-456}', + }, + }, + 'winlog': { + 'event_id': 1, + 'record_id': 999, + }, + 'user': { + 'domain': 'CORP', + 'name': 'admin', + }, + }, + } + if overrides: + base.update(overrides) + return base + + +class TestEcsSysmonParser: + + def _parser(self, mappers=None, used_facts=None): + p = Parser.__new__(Parser) + p.mappers = mappers or [] + p.used_facts = used_facts or [] + return p + + # ── Individual parse methods ───────────────────────────────────────── + + def test_parse_process_guid(self): + event = _event() + assert Parser.parse_process_guid(event) == 'guid-123' + + def test_parse_parent_process_guid(self): + event = _event() + assert Parser.parse_parent_process_guid(event) == 'parent-guid-456' + + def test_parse_eventid(self): + assert Parser.parse_eventid(_event()) == 1 + + def test_parse_recordid(self): + assert Parser.parse_recordid(_event()) == 999 + + def test_parse_user(self): + assert Parser.parse_user(_event()) == 'CORP\\admin' + + def test_parse_pid(self): + assert Parser.parse_pid(_event()) == 42 + + def test_parse_pid_missing(self): + event = {'_source': {}} + assert Parser.parse_pid(event) is None + + def test_parse_process_name(self): + assert Parser.parse_process_name(_event()) == 'cmd.exe' + + # ── flatten_dict ───────────────────────────────────────────────────── + + def test_flatten_dict_simple(self): + result = Parser.flatten_dict({'a': 1, 'b': 2}) + assert result == {'a': 1, 'b': 2} + + def test_flatten_dict_nested(self): + result = Parser.flatten_dict({'a': {'b': {'c': 3}}}) + assert result == {'a.b.c': 3} + + def test_flatten_dict_empty(self): + assert Parser.flatten_dict({}) == {} + + def test_flatten_dict_mixed(self): + result = Parser.flatten_dict({'x': 1, 'y': {'z': 2}}) + assert result == {'x': 1, 'y.z': 2} + + # ── _sanitize_fact_traits ──────────────────────────────────────────── + + def test_sanitize_removes_special_chars(self): + assert Parser._sanitize_fact_traits('@key[0]') == 'key0' + + def test_sanitize_removes_slashes(self): + assert Parser._sanitize_fact_traits('a/b\\c') == 'abc' + + def test_sanitize_removes_quotes(self): + # The source contains two ASCII double-quote entries in special_chars + assert Parser._sanitize_fact_traits('"test"') == 'test' + + def test_sanitize_removes_at_and_brackets(self): + assert Parser._sanitize_fact_traits('@test[0]') == 'test0' + + def test_sanitize_no_special(self): + assert Parser._sanitize_fact_traits('clean.trait') == 'clean.trait' + + # ── parse_elasticsearch_results ────────────────────────────────────── + + def test_parse_elasticsearch_results(self): + event = _event() + rels = Parser.parse_elasticsearch_results(event) + # Should have relationships for flattened source keys + a pid relationship + assert len(rels) > 0 + # Check that elasticsearch.result.id is used as source + es_rels = [r for r in rels if r.source.trait == 'elasticsearch.result.id'] + assert len(es_rels) > 0 + assert es_rels[0].source.value == 'es-id-1' + # Check pid relationship + pid_rels = [r for r in rels if r.source.trait == 'host.process.id'] + assert len(pid_rels) == 1 + assert pid_rels[0].source.value == 42 + + def test_parse_elasticsearch_results_no_pid(self): + event = {'_id': 'x', '_source': {'key': 'val'}} + rels = Parser.parse_elasticsearch_results(event) + pid_rels = [r for r in rels if r.source.trait == 'host.process.id'] + assert len(pid_rels) == 0 + + # ── parse (main method) ────────────────────────────────────────────── + + def test_parse_dict_event_with_mapper(self): + mp = FakeMapper(source='host.process.guid', edge='has_eventid', target='sysmon.eventid') + p = self._parser(mappers=[mp]) + event = _event() + blob = json.dumps(event) + result = p.parse(blob) + # Should have mapper relationships + elasticsearch relationships + assert len(result) > 0 + + def test_parse_array_returns_empty(self): + """Arrays should not be parsed (they are handled by pseudo-links).""" + mp = FakeMapper(source='s', edge='e', target='sysmon.eventid') + p = self._parser(mappers=[mp]) + blob = json.dumps([_event(), _event()]) + result = p.parse(blob) + assert result == [] + + def test_parse_options_keys(self): + p = self._parser() + opts = p.parse_options + expected = {'eventid', 'recordid', 'user', 'guid', 'pid', 'name', 'parent_guid'} + assert set(opts.keys()) == expected + + def test_parse_with_bad_mapper_logs_debug(self): + """A mapper targeting a non-existent parse option should not crash.""" + mp = FakeMapper(source='s', edge='e', target='sysmon.nonexistent') + p = self._parser(mappers=[mp]) + event = _event() + blob = json.dumps(event) + # Should not raise + result = p.parse(blob) + # Still gets elasticsearch relationships + assert any(r.source.trait == 'elasticsearch.result.id' for r in result) diff --git a/tests/test_parsers/test_key_value.py b/tests/test_parsers/test_key_value.py new file mode 100644 index 0000000..5dfbaf0 --- /dev/null +++ b/tests/test_parsers/test_key_value.py @@ -0,0 +1,53 @@ +"""Tests for app.parsers.key_value.""" +import sys +sys.path.insert(0, '/tmp/response-pytest') + +from tests.conftest import FakeMapper +from app.parsers.key_value import Parser + + +class TestKeyValueParser: + + def _parser(self, mappers=None): + p = Parser.__new__(Parser) + p.mappers = mappers or [] + p.used_facts = [] + return p + + def test_single_pair(self): + mp = FakeMapper(source='file.path', edge='has_hash', target='file.hash') + p = self._parser(mappers=[mp]) + result = p.parse('/tmp/a.txt > abc123') + assert len(result) == 1 + assert result[0].source.value == '/tmp/a.txt' + assert result[0].target.value == 'abc123' + assert result[0].edge == 'has_hash' + + def test_multi_line(self): + mp = FakeMapper(source='s', edge='e', target='t') + p = self._parser(mappers=[mp]) + blob = 'key1 > val1\nkey2 > val2' + result = p.parse(blob) + assert len(result) == 2 + assert result[0].source.value == 'key1' + assert result[1].target.value == 'val2' + + def test_strips_whitespace(self): + mp = FakeMapper(source='s', edge='e', target='t') + p = self._parser(mappers=[mp]) + result = p.parse(' k > v ') + assert result[0].source.value == 'k' + assert result[0].target.value == 'v' + + def test_empty_blob(self): + mp = FakeMapper(source='s', edge='e', target='t') + p = self._parser(mappers=[mp]) + result = p.parse('') + assert result == [] + + def test_multiple_mappers(self): + mp1 = FakeMapper(source='s1', edge='e1', target='t1') + mp2 = FakeMapper(source='s2', edge='e2', target='t2') + p = self._parser(mappers=[mp1, mp2]) + result = p.parse('a > b') + assert len(result) == 2 diff --git a/tests/test_parsers/test_ports.py b/tests/test_parsers/test_ports.py new file mode 100644 index 0000000..51132e3 --- /dev/null +++ b/tests/test_parsers/test_ports.py @@ -0,0 +1,49 @@ +"""Tests for app.parsers.ports.""" +import sys +import json +sys.path.insert(0, '/tmp/response-pytest') + +from tests.conftest import FakeMapper +from app.parsers.ports import Parser + + +class TestPortsParser: + + def _parser(self, mappers=None): + p = Parser.__new__(Parser) + p.mappers = mappers or [] + p.used_facts = [] + return p + + def test_single_entry(self): + mp = FakeMapper(source='host.process.id', edge='has_port', target='host.port') + p = self._parser(mappers=[mp]) + blob = json.dumps([{'pid': '1234', 'port': '8080'}]) + result = p.parse(blob) + assert len(result) == 1 + assert result[0].source.value == '1234' + assert result[0].target.value == '8080' + + def test_multiple_entries(self): + mp = FakeMapper(source='s', edge='e', target='t') + p = self._parser(mappers=[mp]) + blob = json.dumps([ + {'pid': '1', 'port': '80'}, + {'pid': '2', 'port': '443'}, + ]) + result = p.parse(blob) + assert len(result) == 2 + + def test_empty_array(self): + mp = FakeMapper(source='s', edge='e', target='t') + p = self._parser(mappers=[mp]) + result = p.parse('[]') + assert result == [] + + def test_multiple_mappers(self): + mp1 = FakeMapper(source='s1', edge='e1', target='t1') + mp2 = FakeMapper(source='s2', edge='e2', target='t2') + p = self._parser(mappers=[mp1, mp2]) + blob = json.dumps([{'pid': '1', 'port': '80'}]) + result = p.parse(blob) + assert len(result) == 2 diff --git a/tests/test_parsers/test_process.py b/tests/test_parsers/test_process.py new file mode 100644 index 0000000..ad80229 --- /dev/null +++ b/tests/test_parsers/test_process.py @@ -0,0 +1,48 @@ +"""Tests for app.parsers.process.""" +import sys +sys.path.insert(0, '/tmp/response-pytest') + +from tests.conftest import FakeMapper +from app.parsers.process import Parser + + +class TestProcessParser: + + def _parser(self, mappers=None): + p = Parser.__new__(Parser) + p.mappers = mappers or [] + p.used_facts = [] + return p + + def test_single_line(self): + mp = FakeMapper(source='host.process.id', edge='has', target='host.process.name') + p = self._parser(mappers=[mp]) + result = p.parse('1234 cmd.exe') + assert len(result) == 1 + assert result[0].source.value == '1234 cmd.exe' + + def test_multi_line(self): + mp = FakeMapper(source='s', edge='e', target='t') + p = self._parser(mappers=[mp]) + result = p.parse('line1\nline2\nline3') + assert len(result) == 3 + + def test_empty_blob(self): + mp = FakeMapper(source='s', edge='e', target='t') + p = self._parser(mappers=[mp]) + # Empty string -> line() returns [''] but the parser processes it + result = p.parse('') + # splitlines on '' returns [] so no matches + assert len(result) == 0 + + def test_no_mappers(self): + p = self._parser(mappers=[]) + result = p.parse('something') + assert result == [] + + def test_multiple_mappers(self): + mp1 = FakeMapper(source='s1', edge='e1', target='t1') + mp2 = FakeMapper(source='s2', edge='e2', target='t2') + p = self._parser(mappers=[mp1, mp2]) + result = p.parse('val') + assert len(result) == 2 diff --git a/tests/test_parsers/test_processguids.py b/tests/test_parsers/test_processguids.py new file mode 100644 index 0000000..056b7cd --- /dev/null +++ b/tests/test_parsers/test_processguids.py @@ -0,0 +1,105 @@ +"""Tests for app.parsers.processguids.""" +import sys +sys.path.insert(0, '/tmp/response-pytest') + +from tests.conftest import FakeMapper, FakeFact +from app.parsers.processguids import Parser + + +class TestProcessGuidsParser: + + def _parser(self, mappers=None, used_facts=None): + p = Parser.__new__(Parser) + p.mappers = mappers or [] + p.used_facts = used_facts or [] + return p + + # ── Static regex methods ───────────────────────────────────────────── + + def test_parse_id(self): + blob = 'ProcessId: 1234' + assert Parser.parse_id(blob) == ['1234'] + + def test_parse_id_case_insensitive(self): + assert Parser.parse_id('processid: 99') == ['99'] + + def test_parse_id_multiple(self): + blob = 'ProcessId: 1\nProcessId: 2' + assert Parser.parse_id(blob) == ['1', '2'] + + def test_parse_id_no_match(self): + assert Parser.parse_id('nothing') == [] + + def test_parse_guid(self): + blob = 'ProcessGuid: {abc-def-123}' + assert Parser.parse_guid(blob) == ['abc-def-123'] + + def test_parse_guid_case_insensitive(self): + assert Parser.parse_guid('processguid: {XYZ}') == ['XYZ'] + + def test_parse_guid_no_match(self): + assert Parser.parse_guid('no guid') == [] + + def test_parse_parentid(self): + blob = 'ParentProcessId: 5678' + assert Parser.parse_parentid(blob) == ['5678'] + + def test_parse_parentid_case_insensitive(self): + assert Parser.parse_parentid('parentprocessid: 77') == ['77'] + + def test_parse_parentguid(self): + blob = 'ParentProcessGuid: {parent-guid}' + assert Parser.parse_parentguid(blob) == ['parent-guid'] + + def test_parse_parentguid_case_insensitive(self): + assert Parser.parse_parentguid('parentprocessguid: {PG}') == ['PG'] + + # ── parse_options ──────────────────────────────────────────────────── + + def test_parse_options_keys(self): + p = self._parser() + assert set(p.parse_options.keys()) == {'id', 'guid', 'parentid', 'parentguid'} + + # ── parse ──────────────────────────────────────────────────────────── + + def test_parse_with_id_mapper(self): + mp = FakeMapper(source='host.process.guid', edge='has_childprocess_id', target='host.process.id') + fact = FakeFact(trait='host.process.guid', value='my-guid') + p = self._parser(mappers=[mp], used_facts=[fact]) + blob = 'ProcessId: 42' + result = p.parse(blob) + assert len(result) == 1 + assert result[0].source.value == 'my-guid' + assert result[0].target.value == '42' + + def test_parse_with_guid_mapper(self): + mp = FakeMapper(source='host.process.id', edge='has_guid', target='host.process.guid') + fact = FakeFact(trait='host.process.id', value='100') + p = self._parser(mappers=[mp], used_facts=[fact]) + blob = 'ProcessGuid: {guid-abc}' + result = p.parse(blob) + assert len(result) == 1 + assert result[0].target.value == 'guid-abc' + + def test_parse_no_match(self): + mp = FakeMapper(source='s', edge='e', target='host.process.id') + fact = FakeFact(trait='s', value='v') + p = self._parser(mappers=[mp], used_facts=[fact]) + result = p.parse('nothing') + assert result == [] + + def test_parse_appends_target_to_facts(self): + mp = FakeMapper(source='host.process.guid', edge='e', target='host.process.id') + fact = FakeFact(trait='host.process.guid', value='g') + facts_list = [fact] + p = self._parser(mappers=[mp], used_facts=facts_list) + p.parse('ProcessId: 10') + assert len(facts_list) == 2 + + def test_parse_multiple_matches(self): + mp = FakeMapper(source='host.process.guid', edge='e', target='host.process.id') + fact = FakeFact(trait='host.process.guid', value='g') + p = self._parser(mappers=[mp], used_facts=[fact]) + blob = 'ProcessId: 10\nProcessId: 20' + result = p.parse(blob) + assert len(result) == 2 diff --git a/tests/test_parsers/test_sysmon.py b/tests/test_parsers/test_sysmon.py new file mode 100644 index 0000000..1e2a2f5 --- /dev/null +++ b/tests/test_parsers/test_sysmon.py @@ -0,0 +1,113 @@ +"""Tests for app.parsers.sysmon.""" +import sys +sys.path.insert(0, '/tmp/response-pytest') + +from tests.conftest import FakeMapper, FakeFact +from app.parsers.sysmon import Parser + + +class TestSysmonParser: + + def _parser(self, mappers=None, used_facts=None): + p = Parser.__new__(Parser) + p.mappers = mappers or [] + p.used_facts = used_facts or [] + return p + + # ── Static regex methods ───────────────────────────────────────────── + + def test_parse_eventid(self): + event = 'Id : 1\nOther: stuff' + m = Parser.parse_eventid(event) + assert m is not None + assert m.group(1) == '1' + + def test_parse_eventid_no_match(self): + assert Parser.parse_eventid('nothing') is None + + def test_parse_recordid(self): + event = 'RecordId : 42' + m = Parser.parse_recordid(event) + assert m is not None + assert m.group(1) == '42' + + def test_parse_recordid_no_match(self): + assert Parser.parse_recordid('nothing') is None + + def test_parse_user(self): + event = 'User: CORP\\admin' + m = Parser.parse_user(event) + assert m is not None + assert m.group(1) == 'CORP\\admin' + + def test_parse_user_no_match(self): + assert Parser.parse_user('nothing') is None + + # ── parse_options ──────────────────────────────────────────────────── + + def test_parse_options_keys(self): + p = self._parser() + assert set(p.parse_options.keys()) == {'eventid', 'recordid', 'user'} + + # ── parse ──────────────────────────────────────────────────────────── + + def test_parse_single_event(self): + mp = FakeMapper(source='host.process.guid', edge='has_eventid', target='sysmon.eventid') + fact = FakeFact(trait='host.process.guid', value='guid-1') + p = self._parser(mappers=[mp], used_facts=[fact]) + # The sysmon parser splits on \r\n\r\n; within an event the regex + # captures everything after "Id : " including a trailing \r + event = 'Id : 5\r\nRecordId : 100' + result = p.parse(event) + assert len(result) == 1 + assert result[0].source.value == 'guid-1' + assert result[0].target.value.strip() == '5' + + def test_parse_multiple_events(self): + mp = FakeMapper(source='host.process.guid', edge='has_eventid', target='sysmon.eventid') + fact = FakeFact(trait='host.process.guid', value='guid-1') + p = self._parser(mappers=[mp], used_facts=[fact]) + events = 'Id : 1\r\n\r\nId : 2' + result = p.parse(events) + assert len(result) == 2 + + def test_parse_no_match_returns_empty(self): + mp = FakeMapper(source='s', edge='e', target='sysmon.eventid') + fact = FakeFact(trait='s', value='v') + p = self._parser(mappers=[mp], used_facts=[fact]) + result = p.parse('nothing here') + assert result == [] + + def test_parse_recordid_mapper(self): + mp = FakeMapper(source='host.process.guid', edge='has_recordid', target='sysmon.recordid') + fact = FakeFact(trait='host.process.guid', value='guid-1') + p = self._parser(mappers=[mp], used_facts=[fact]) + event = 'RecordId : 777' + result = p.parse(event) + assert len(result) == 1 + assert result[0].target.value == '777' + + def test_parse_user_mapper(self): + mp = FakeMapper(source='host.process.guid', edge='has_user', target='sysmon.user') + fact = FakeFact(trait='host.process.guid', value='guid-1') + p = self._parser(mappers=[mp], used_facts=[fact]) + event = 'User: DOMAIN\\user1' + result = p.parse(event) + assert len(result) == 1 + assert result[0].target.value == 'DOMAIN\\user1' + + def test_parse_empty_blob(self): + mp = FakeMapper(source='s', edge='e', target='sysmon.eventid') + fact = FakeFact(trait='s', value='v') + p = self._parser(mappers=[mp], used_facts=[fact]) + result = p.parse('') + assert result == [] + + def test_parse_multiple_mappers(self): + mp1 = FakeMapper(source='host.process.guid', edge='e1', target='sysmon.eventid') + mp2 = FakeMapper(source='host.process.guid', edge='e2', target='sysmon.recordid') + fact = FakeFact(trait='host.process.guid', value='guid-1') + p = self._parser(mappers=[mp1, mp2], used_facts=[fact]) + event = 'Id : 1\r\nRecordId : 2' + result = p.parse(event) + assert len(result) == 2 diff --git a/tests/test_process_tree.py b/tests/test_process_tree.py new file mode 100644 index 0000000..d00ed44 --- /dev/null +++ b/tests/test_process_tree.py @@ -0,0 +1,193 @@ +"""Tests for ProcessNode and ProcessTree.""" +import sys +import pytest + +sys.path.insert(0, '/tmp/response-pytest') + +from app.c_processnode import ProcessNode +from app.c_processtree import ProcessTree +from tests.conftest import FakeLink + + +# ── ProcessNode ────────────────────────────────────────────────────────────── + +class TestProcessNode: + + def test_init_defaults(self): + link = FakeLink() + node = ProcessNode(pid=100, link=link) + assert node.pid == 100 + assert node.link is link + assert node.parent_guid is None + assert node.child_guids == [] + + def test_init_with_parent_and_children(self): + link = FakeLink() + node = ProcessNode(pid=200, link=link, parent_guid='pg1', child_guids=['c1', 'c2']) + assert node.parent_guid == 'pg1' + assert node.child_guids == ['c1', 'c2'] + + def test_add_child_same_host(self): + link = FakeLink(host='h1') + node = ProcessNode(pid=1, link=link) + child_link = FakeLink(host='h1') + node.add_child('child-guid-1', child_link) + assert 'child-guid-1' in node.child_guids + + def test_add_child_different_host_rejected(self): + link = FakeLink(host='h1') + node = ProcessNode(pid=1, link=link) + child_link = FakeLink(host='h2') + node.add_child('child-guid-2', child_link) + assert 'child-guid-2' not in node.child_guids + + def test_add_child_no_duplicates(self): + link = FakeLink(host='h1') + node = ProcessNode(pid=1, link=link) + child_link = FakeLink(host='h1') + node.add_child('g1', child_link) + node.add_child('g1', child_link) + assert node.child_guids.count('g1') == 1 + + def test_child_guids_not_shared_across_instances(self): + link = FakeLink() + n1 = ProcessNode(pid=1, link=link) + n2 = ProcessNode(pid=2, link=link) + n1.add_child('g', FakeLink(host=link.host)) + assert n2.child_guids == [] + + +# ── ProcessTree ────────────────────────────────────────────────────────────── + +class TestProcessTree: + + def _tree(self, host='host1', ptree_id=42): + return ProcessTree(host=host, ptree_id=ptree_id) + + def test_init_defaults(self): + t = ProcessTree('myhost') + assert t.host == 'myhost' + assert isinstance(t.ptree_id, int) + assert t.pid_to_guids_map == {} + assert t.guid_to_processnode_map == {} + + def test_unique_property(self): + t = self._tree() + assert t.unique == 'host142' + + @pytest.mark.asyncio + async def test_add_processnode_basic(self): + t = self._tree() + link = FakeLink(host='host1') + await t.add_processnode(guid='g1', pid=100, link=link) + assert 'g1' in t.guid_to_processnode_map + assert 100 in t.pid_to_guids_map + assert 'g1' in t.pid_to_guids_map[100] + + @pytest.mark.asyncio + async def test_add_processnode_with_parent(self): + t = self._tree() + link = FakeLink(host='host1') + await t.add_processnode(guid='parent-g', pid=10, link=link) + await t.add_processnode(guid='child-g', pid=20, link=link, parent_guid='parent-g') + parent_node = t.guid_to_processnode_map['parent-g'] + assert 'child-g' in parent_node.child_guids + + @pytest.mark.asyncio + async def test_add_multiple_guids_for_same_pid(self): + t = self._tree() + link = FakeLink(host='host1') + await t.add_processnode(guid='g1', pid=100, link=link) + await t.add_processnode(guid='g2', pid=100, link=link) + assert t.pid_to_guids_map[100] == ['g1', 'g2'] + + @pytest.mark.asyncio + async def test_find_parent_guid_exists(self): + t = self._tree() + link = FakeLink(host='host1') + await t.add_processnode(guid='parent-g', pid=10, link=link) + await t.add_processnode(guid='child-g', pid=20, link=link, parent_guid='parent-g') + assert await t.find_parent_guid('child-g') == 'parent-g' + + @pytest.mark.asyncio + async def test_find_parent_guid_none(self): + t = self._tree() + link = FakeLink(host='host1') + await t.add_processnode(guid='g1', pid=10, link=link) + assert await t.find_parent_guid('g1') is None + + @pytest.mark.asyncio + async def test_find_parent_guid_missing_guid(self): + t = self._tree() + assert await t.find_parent_guid('nonexistent') is None + + @pytest.mark.asyncio + async def test_convert_guids_to_pids(self): + t = self._tree() + link = FakeLink(host='host1') + await t.add_processnode(guid='g1', pid=10, link=link) + await t.add_processnode(guid='g2', pid=20, link=link) + pids = await t.convert_guids_to_pids(['g1', 'g2']) + assert pids == [10, 20] + + @pytest.mark.asyncio + async def test_convert_guids_to_pids_empty(self): + t = self._tree() + assert await t.convert_guids_to_pids([]) == [] + + @pytest.mark.asyncio + async def test_find_original_processes_single_chain(self): + t = self._tree() + link = FakeLink(host='host1') + await t.add_processnode(guid='root', pid=1, link=link) + await t.add_processnode(guid='mid', pid=2, link=link, parent_guid='root') + await t.add_processnode(guid='leaf', pid=3, link=link, parent_guid='mid') + original = await t.find_original_processes_by_pid(3) + assert original == [1] + + @pytest.mark.asyncio + async def test_find_original_processes_no_parent(self): + t = self._tree() + link = FakeLink(host='host1') + await t.add_processnode(guid='g1', pid=10, link=link) + original = await t.find_original_processes_by_pid(10) + assert original == [10] + + @pytest.mark.asyncio + async def test_find_original_processes_unknown_pid(self): + t = self._tree() + original = await t.find_original_processes_by_pid(999) + assert original == [] + + @pytest.mark.asyncio + async def test_find_original_multiple_guids_same_pid(self): + t = self._tree() + link = FakeLink(host='host1') + # Two separate roots both mapping to pid 5 + await t.add_processnode(guid='r1', pid=5, link=link) + await t.add_processnode(guid='r2', pid=5, link=link) + # Child of r1 + await t.add_processnode(guid='c1', pid=10, link=link, parent_guid='r1') + original = await t.find_original_processes_by_pid(10) + assert original == [5] + + def test_store_adds_to_ram(self): + t = self._tree() + ram = {'processtrees': []} + result = t.store(ram) + assert len(ram['processtrees']) == 1 + assert result is t + + def test_store_no_duplicate(self): + t = self._tree() + ram = {'processtrees': []} + t.store(ram) + t.store(ram) + assert len(ram['processtrees']) == 1 + + def test_store_returns_existing(self): + t = self._tree() + ram = {'processtrees': []} + first = t.store(ram) + second = t.store(ram) + assert first is second diff --git a/tests/test_requirements/__init__.py b/tests/test_requirements/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_requirements/test_base_requirement.py b/tests/test_requirements/test_base_requirement.py new file mode 100644 index 0000000..9205498 --- /dev/null +++ b/tests/test_requirements/test_base_requirement.py @@ -0,0 +1,91 @@ +"""Tests for app.requirements.base_requirement.""" +import sys +sys.path.insert(0, '/tmp/response-pytest') + +from tests.conftest import FakeFact, FakeRelationship +from app.requirements.base_requirement import BaseRequirement + + +class TestBaseRequirement: + + def _req(self, enforcements): + return BaseRequirement({'enforcements': enforcements}) + + # ── _check_edge ────────────────────────────────────────────────────── + + def test_check_edge_match(self): + req = self._req({'edge': 'has_property'}) + assert req._check_edge('has_property') is True + + def test_check_edge_mismatch(self): + req = self._req({'edge': 'has_property'}) + assert req._check_edge('other_edge') is False + + # ── _check_target ──────────────────────────────────────────────────── + + def test_check_target_match(self): + target = FakeFact(trait='t', value='v') + match = FakeFact(trait='t', value='v') + assert BaseRequirement._check_target(target, match) is True + + def test_check_target_mismatch_trait(self): + target = FakeFact(trait='t1', value='v') + match = FakeFact(trait='t2', value='v') + assert BaseRequirement._check_target(target, match) is False + + def test_check_target_mismatch_value(self): + target = FakeFact(trait='t', value='v1') + match = FakeFact(trait='t', value='v2') + assert BaseRequirement._check_target(target, match) is False + + # ── _get_relationships ─────────────────────────────────────────────── + + def test_get_relationships_filters_correctly(self): + uf = FakeFact(trait='host.process.id', value='100') + r1 = FakeRelationship(source=FakeFact(trait='host.process.id', value='100'), edge='e') + r2 = FakeRelationship(source=FakeFact(trait='other', value='100'), edge='e') + r3 = FakeRelationship(source=FakeFact(trait='host.process.id', value='999'), edge='e') + result = BaseRequirement._get_relationships(uf, [r1, r2, r3]) + assert result == [r1] + + def test_get_relationships_empty(self): + uf = FakeFact(trait='t', value='v') + assert BaseRequirement._get_relationships(uf, []) == [] + + # ── is_valid_relationship ──────────────────────────────────────────── + + def test_is_valid_wrong_edge(self): + req = self._req({'edge': 'has_x'}) + rel = FakeRelationship(edge='has_y') + assert req.is_valid_relationship([], rel) is False + + def test_is_valid_no_target_enforcement(self): + req = self._req({'edge': 'has_x'}) + rel = FakeRelationship(edge='has_x') + assert req.is_valid_relationship([], rel) is True + + def test_is_valid_target_enforcement_match(self): + req = self._req({'edge': 'has_x', 'target': 'tgt.trait'}) + rel = FakeRelationship( + edge='has_x', + target=FakeFact(trait='tgt.trait', value='v1'), + ) + used_fact = FakeFact(trait='tgt.trait', value='v1') + assert req.is_valid_relationship([used_fact], rel) is True + + def test_is_valid_target_enforcement_no_match(self): + req = self._req({'edge': 'has_x', 'target': 'tgt.trait'}) + rel = FakeRelationship( + edge='has_x', + target=FakeFact(trait='tgt.trait', value='v1'), + ) + used_fact = FakeFact(trait='tgt.trait', value='DIFFERENT') + assert req.is_valid_relationship([used_fact], rel) is False + + def test_is_valid_target_enforcement_no_facts(self): + req = self._req({'edge': 'has_x', 'target': 'tgt.trait'}) + rel = FakeRelationship( + edge='has_x', + target=FakeFact(trait='tgt.trait', value='v1'), + ) + assert req.is_valid_relationship([], rel) is False diff --git a/tests/test_requirements/test_basic.py b/tests/test_requirements/test_basic.py new file mode 100644 index 0000000..c76dbfb --- /dev/null +++ b/tests/test_requirements/test_basic.py @@ -0,0 +1,75 @@ +"""Tests for app.requirements.basic.""" +import sys +import pytest +sys.path.insert(0, '/tmp/response-pytest') + +from tests.conftest import FakeFact, FakeRelationship, FakeLink, FakeOperation +from app.requirements.basic import Requirement + + +class TestBasicRequirement: + + def _req(self, enforcements): + return Requirement({'enforcements': enforcements}) + + @pytest.mark.asyncio + async def test_enforce_returns_true_on_valid_match(self): + req = self._req({'source': 'host.process.id', 'edge': 'has_port'}) + uf = FakeFact(trait='host.process.id', value='100') + link = FakeLink(used=[uf]) + rel = FakeRelationship( + source=FakeFact(trait='host.process.id', value='100'), + edge='has_port', + target=FakeFact(trait='host.port', value='80'), + ) + op = FakeOperation(relationships=[rel]) + assert await req.enforce(link, op) is True + + @pytest.mark.asyncio + async def test_enforce_returns_false_no_source_match(self): + req = self._req({'source': 'host.process.id', 'edge': 'has_port'}) + uf = FakeFact(trait='other.trait', value='100') + link = FakeLink(used=[uf]) + op = FakeOperation(relationships=[]) + assert await req.enforce(link, op) is False + + @pytest.mark.asyncio + async def test_enforce_returns_false_wrong_edge(self): + req = self._req({'source': 'host.process.id', 'edge': 'has_port'}) + uf = FakeFact(trait='host.process.id', value='100') + link = FakeLink(used=[uf]) + rel = FakeRelationship( + source=FakeFact(trait='host.process.id', value='100'), + edge='wrong_edge', + ) + op = FakeOperation(relationships=[rel]) + assert await req.enforce(link, op) is False + + @pytest.mark.asyncio + async def test_enforce_with_target_enforcement(self): + req = self._req({'source': 'host.process.id', 'edge': 'has_port', 'target': 'host.port'}) + uf1 = FakeFact(trait='host.process.id', value='100') + uf2 = FakeFact(trait='host.port', value='80') + link = FakeLink(used=[uf1, uf2]) + rel = FakeRelationship( + source=FakeFact(trait='host.process.id', value='100'), + edge='has_port', + target=FakeFact(trait='host.port', value='80'), + ) + op = FakeOperation(relationships=[rel]) + assert await req.enforce(link, op) is True + + @pytest.mark.asyncio + async def test_enforce_returns_false_empty_used(self): + req = self._req({'source': 'host.process.id', 'edge': 'has_port'}) + link = FakeLink(used=[]) + op = FakeOperation() + assert await req.enforce(link, op) is False + + @pytest.mark.asyncio + async def test_enforce_returns_false_no_relationships(self): + req = self._req({'source': 'host.process.id', 'edge': 'has_port'}) + uf = FakeFact(trait='host.process.id', value='100') + link = FakeLink(used=[uf]) + op = FakeOperation(relationships=[]) + assert await req.enforce(link, op) is False diff --git a/tests/test_requirements/test_has_property.py b/tests/test_requirements/test_has_property.py new file mode 100644 index 0000000..c66ac48 --- /dev/null +++ b/tests/test_requirements/test_has_property.py @@ -0,0 +1,99 @@ +"""Tests for app.requirements.has_property.""" +import sys +import pytest +sys.path.insert(0, '/tmp/response-pytest') + +from tests.conftest import FakeFact, FakeRelationship, FakeLink, FakeOperation +from app.requirements.has_property import Requirement + + +class TestHasPropertyRequirement: + + def _req(self, enforcements): + return Requirement({'enforcements': enforcements}) + + @pytest.mark.asyncio + async def test_enforce_returns_true_when_property_present(self): + req = self._req({ + 'source': 'elasticsearch.result.id', + 'edge': 'has_property', + 'target': 'process.name', + }) + uf = FakeFact(trait='elasticsearch.result.id', value='es-1') + link = FakeLink(used=[uf]) + rel = FakeRelationship( + source=FakeFact(trait='elasticsearch.result.id', value='es-1'), + edge='has_property', + target=FakeFact(trait='process.name', value='cmd.exe'), + ) + op = FakeOperation(relationships=[rel]) + assert await req.enforce(link, op) is True + + @pytest.mark.asyncio + async def test_enforce_returns_false_wrong_edge(self): + req = self._req({ + 'source': 'elasticsearch.result.id', + 'edge': 'has_property', + 'target': 'process.name', + }) + uf = FakeFact(trait='elasticsearch.result.id', value='es-1') + link = FakeLink(used=[uf]) + rel = FakeRelationship( + source=FakeFact(trait='elasticsearch.result.id', value='es-1'), + edge='other_edge', + target=FakeFact(trait='process.name', value='cmd.exe'), + ) + op = FakeOperation(relationships=[rel]) + assert await req.enforce(link, op) is False + + @pytest.mark.asyncio + async def test_enforce_returns_false_wrong_target_trait(self): + req = self._req({ + 'source': 'elasticsearch.result.id', + 'edge': 'has_property', + 'target': 'process.name', + }) + uf = FakeFact(trait='elasticsearch.result.id', value='es-1') + link = FakeLink(used=[uf]) + rel = FakeRelationship( + source=FakeFact(trait='elasticsearch.result.id', value='es-1'), + edge='has_property', + target=FakeFact(trait='wrong.trait', value='cmd.exe'), + ) + op = FakeOperation(relationships=[rel]) + assert await req.enforce(link, op) is False + + @pytest.mark.asyncio + async def test_enforce_returns_false_no_source_match(self): + req = self._req({ + 'source': 'elasticsearch.result.id', + 'edge': 'has_property', + 'target': 'process.name', + }) + uf = FakeFact(trait='other.trait', value='es-1') + link = FakeLink(used=[uf]) + op = FakeOperation(relationships=[]) + assert await req.enforce(link, op) is False + + @pytest.mark.asyncio + async def test_enforce_returns_false_empty_used(self): + req = self._req({ + 'source': 'elasticsearch.result.id', + 'edge': 'has_property', + 'target': 'process.name', + }) + link = FakeLink(used=[]) + op = FakeOperation() + assert await req.enforce(link, op) is False + + @pytest.mark.asyncio + async def test_enforce_returns_false_no_matching_relationship(self): + req = self._req({ + 'source': 'elasticsearch.result.id', + 'edge': 'has_property', + 'target': 'process.name', + }) + uf = FakeFact(trait='elasticsearch.result.id', value='es-1') + link = FakeLink(used=[uf]) + op = FakeOperation(relationships=[]) + assert await req.enforce(link, op) is False diff --git a/tests/test_requirements/test_source_fact.py b/tests/test_requirements/test_source_fact.py new file mode 100644 index 0000000..8a107cb --- /dev/null +++ b/tests/test_requirements/test_source_fact.py @@ -0,0 +1,67 @@ +"""Tests for app.requirements.source_fact.""" +import sys +import pytest +sys.path.insert(0, '/tmp/response-pytest') + +from tests.conftest import FakeFact, FakeLink, FakeOperation, FakeSource +from app.requirements.source_fact import Requirement + + +class TestSourceFactRequirement: + + def _req(self, enforcements): + return Requirement({'enforcements': enforcements}) + + @pytest.mark.asyncio + async def test_enforce_returns_true_when_fact_in_source(self): + req = self._req({'source': 'host.process.id', 'edge': 'e'}) + uf = FakeFact(trait='host.process.id', value='100') + link = FakeLink(used=[uf]) + source = FakeSource(facts=[FakeFact(trait='host.process.id', value='100')]) + op = FakeOperation(source=source) + assert await req.enforce(link, op) is True + + @pytest.mark.asyncio + async def test_enforce_returns_false_when_fact_not_in_source(self): + req = self._req({'source': 'host.process.id', 'edge': 'e'}) + uf = FakeFact(trait='host.process.id', value='100') + link = FakeLink(used=[uf]) + source = FakeSource(facts=[FakeFact(trait='host.process.id', value='999')]) + op = FakeOperation(source=source) + assert await req.enforce(link, op) is False + + @pytest.mark.asyncio + async def test_enforce_returns_false_wrong_trait(self): + req = self._req({'source': 'host.process.id', 'edge': 'e'}) + uf = FakeFact(trait='other.trait', value='100') + link = FakeLink(used=[uf]) + source = FakeSource(facts=[FakeFact(trait='host.process.id', value='100')]) + op = FakeOperation(source=source) + assert await req.enforce(link, op) is False + + @pytest.mark.asyncio + async def test_enforce_returns_false_empty_used(self): + req = self._req({'source': 'host.process.id', 'edge': 'e'}) + link = FakeLink(used=[]) + source = FakeSource(facts=[FakeFact(trait='host.process.id', value='100')]) + op = FakeOperation(source=source) + assert await req.enforce(link, op) is False + + @pytest.mark.asyncio + async def test_enforce_returns_false_empty_source_facts(self): + req = self._req({'source': 'host.process.id', 'edge': 'e'}) + uf = FakeFact(trait='host.process.id', value='100') + link = FakeLink(used=[uf]) + source = FakeSource(facts=[]) + op = FakeOperation(source=source) + assert await req.enforce(link, op) is False + + @pytest.mark.asyncio + async def test_enforce_multiple_used_facts(self): + req = self._req({'source': 'host.process.id', 'edge': 'e'}) + uf1 = FakeFact(trait='other', value='x') + uf2 = FakeFact(trait='host.process.id', value='42') + link = FakeLink(used=[uf1, uf2]) + source = FakeSource(facts=[FakeFact(trait='host.process.id', value='42')]) + op = FakeOperation(source=source) + assert await req.enforce(link, op) is True diff --git a/tests/test_response_svc.py b/tests/test_response_svc.py new file mode 100644 index 0000000..550e9c1 --- /dev/null +++ b/tests/test_response_svc.py @@ -0,0 +1,335 @@ +"""Tests for response_svc.py — ResponseService and module-level helpers.""" +import sys +import json +import asyncio +import pytest +from unittest.mock import AsyncMock, MagicMock, patch + +sys.path.insert(0, '/tmp/response-pytest') + +from tests.conftest import ( + FakeFact, FakeRelationship, FakeLink, FakeOperation, FakeAgent, FakeSource, +) +from app.response_svc import ResponseService + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +def _svc(overrides=None): + """Build a ResponseService backed by mocks.""" + services = { + 'data_svc': MagicMock(), + 'rest_svc': MagicMock(), + 'app_svc': MagicMock(), + 'file_svc': MagicMock(), + 'event_svc': MagicMock(), + } + if overrides: + services.update(overrides) + svc = ResponseService(services) + return svc + + +# ── Static / pure helpers ──────────────────────────────────────────────────── + +class TestFilterAbilityFacts: + + def test_passthrough_for_other_traits(self): + svc = _svc() + fact = FakeFact(trait='host.something', value='v') + result = svc._filter_ability_facts([fact], [], '111', '222') + assert result == [fact] + + def test_host_process_guid_included_when_child(self): + svc = _svc() + fact = FakeFact(trait='host.process.guid', value='guid-1') + rel = FakeRelationship( + source=FakeFact(trait='host.process.guid', value='guid-1'), + edge='has_parentid', + target=FakeFact(trait='host.process.id', value='111'), + ) + result = svc._filter_ability_facts([fact], [rel], '111', '222') + assert fact in result + + def test_host_process_guid_excluded_when_not_child(self): + svc = _svc() + fact = FakeFact(trait='host.process.guid', value='guid-1') + result = svc._filter_ability_facts([fact], [], '111', '222') + assert fact not in result + + def test_host_process_parentguid_included_when_red_guid(self): + svc = _svc() + fact = FakeFact(trait='host.process.parentguid', value='red-guid') + rel = FakeRelationship( + source=FakeFact(trait='x', value='111'), + edge='has_guid', + target=FakeFact(trait='y', value='red-guid'), + ) + result = svc._filter_ability_facts([fact], [rel], '111', '222') + assert fact in result + + +class TestIsChildGuid: + + def test_returns_true_on_match_red_pid(self): + rel = FakeRelationship( + source=FakeFact(trait='x', value='guid-1'), + edge='has_parentid', + target=FakeFact(trait='y', value='111'), + ) + fact = FakeFact(trait='host.process.guid', value='guid-1') + assert ResponseService._is_child_guid([rel], '111', '222', fact) + + def test_returns_true_on_match_original_pid(self): + rel = FakeRelationship( + source=FakeFact(trait='x', value='guid-1'), + edge='has_parentid', + target=FakeFact(trait='y', value='222'), + ) + fact = FakeFact(trait='host.process.guid', value='guid-1') + assert ResponseService._is_child_guid([rel], '111', '222', fact) + + def test_returns_false_when_no_match(self): + fact = FakeFact(trait='host.process.guid', value='guid-1') + assert not ResponseService._is_child_guid([], '111', '222', fact) + + +class TestIsRedAgentGuid: + + def test_returns_true_when_guid_matches(self): + rel = FakeRelationship( + source=FakeFact(trait='x', value='111'), + edge='has_guid', + target=FakeFact(trait='y', value='red-guid'), + ) + fact = FakeFact(trait='host.process.parentguid', value='red-guid') + assert ResponseService._is_red_agent_guid([rel], '111', fact) + + def test_returns_false_when_guid_differs(self): + rel = FakeRelationship( + source=FakeFact(trait='x', value='111'), + edge='has_guid', + target=FakeFact(trait='y', value='red-guid'), + ) + fact = FakeFact(trait='host.process.parentguid', value='other-guid') + assert not ResponseService._is_red_agent_guid([rel], '111', fact) + + +class TestGetOriginalGuid: + + @pytest.mark.asyncio + async def test_returns_guid_on_match(self): + rel = FakeRelationship( + source=FakeFact(trait='host.process.id', value='100'), + edge='has_guid', + target=FakeFact(trait='host.process.guid', value='the-guid'), + ) + result = await ResponseService._get_original_guid('100', [rel]) + assert result == 'the-guid' + + @pytest.mark.asyncio + async def test_returns_none_when_no_match(self): + result = await ResponseService._get_original_guid('100', []) + assert result is None + + @pytest.mark.asyncio + async def test_ignores_wrong_trait(self): + rel = FakeRelationship( + source=FakeFact(trait='wrong.trait', value='100'), + edge='has_guid', + target=FakeFact(trait='host.process.guid', value='the-guid'), + ) + result = await ResponseService._get_original_guid('100', [rel]) + assert result is None + + @pytest.mark.asyncio + async def test_ignores_wrong_edge(self): + rel = FakeRelationship( + source=FakeFact(trait='host.process.id', value='100'), + edge='wrong_edge', + target=FakeFact(trait='host.process.guid', value='the-guid'), + ) + result = await ResponseService._get_original_guid('100', [rel]) + assert result is None + + +class TestGetInfoFromTopLevelProcessLink: + + @pytest.mark.asyncio + async def test_extracts_pid_and_guid(self): + rel = FakeRelationship( + source=FakeFact(trait='host.process.id', value=' 42 '), + edge='has_guid', + target=FakeFact(trait='host.process.guid', value='guid-x'), + ) + link = FakeLink(relationships=[rel]) + pid, guid, parent_guid = await ResponseService.get_info_from_top_level_process_link(link) + assert pid == 42 + assert guid == 'guid-x' + assert parent_guid is None + + @pytest.mark.asyncio + async def test_returns_nones_for_empty_link(self): + link = FakeLink(relationships=[]) + pid, guid, parent_guid = await ResponseService.get_info_from_top_level_process_link(link) + assert pid is None + assert guid is None + assert parent_guid is None + + +class TestGetInfoFromChildProcessLink: + + @pytest.mark.asyncio + async def test_extracts_all_fields(self): + rel_id = FakeRelationship( + source=FakeFact(trait='host.process.guid', value='parent-guid'), + edge='has_childprocess_id', + target=FakeFact(trait='host.process.id', value=' 99 '), + ) + rel_guid = FakeRelationship( + source=FakeFact(trait='host.process.guid', value='parent-guid'), + edge='has_childprocess_guid', + target=FakeFact(trait='host.process.guid', value='child-guid'), + ) + link = FakeLink(relationships=[rel_id, rel_guid]) + pid, guid, parent_guid = await ResponseService.get_info_from_child_process_link(link) + assert pid == 99 + assert guid == 'child-guid' + assert parent_guid == 'parent-guid' + + @pytest.mark.asyncio + async def test_returns_nones_for_empty_link(self): + link = FakeLink(relationships=[]) + pid, guid, parent_guid = await ResponseService.get_info_from_child_process_link(link) + assert pid is None + assert guid is None + assert parent_guid is None + + +class TestCreateFactSource: + + @pytest.mark.asyncio + async def test_returns_source_with_unique_name(self): + s = await ResponseService.create_fact_source() + assert s.name.startswith('blue-pid-') + assert s.facts == [] + + +class TestWaitForLinkCompletion: + + @pytest.mark.asyncio + async def test_returns_immediately_when_finished(self): + link = FakeLink(finish=True) + agent = FakeAgent() + await ResponseService.wait_for_link_completion([link], agent) + + @pytest.mark.asyncio + async def test_returns_when_agent_untrusted(self): + link = FakeLink(finish=None) + agent = FakeAgent(trusted=False) + await ResponseService.wait_for_link_completion([link], agent) + + +class TestGetAvailableAgents: + + @pytest.mark.asyncio + async def test_returns_agents_on_same_host(self): + svc = _svc() + blue_agent = FakeAgent(host='h1') + svc.agents = [blue_agent] + svc.adversary = MagicMock() + svc.adversary.atomic_ordering = [] + svc.data_svc = AsyncMock() + svc.data_svc.locate = AsyncMock(return_value=[blue_agent]) + svc.apply_adversary_config = AsyncMock() + red = FakeAgent(host='h1') + result = await svc.get_available_agents(red) + assert result == [blue_agent] + + @pytest.mark.asyncio + async def test_returns_empty_when_no_match(self): + svc = _svc() + svc.agents = [] + svc.adversary = MagicMock() + svc.adversary.atomic_ordering = [] + svc.data_svc = AsyncMock() + svc.data_svc.locate = AsyncMock(return_value=[]) + svc.apply_adversary_config = AsyncMock() + red = FakeAgent(host='h1') + result = await svc.get_available_agents(red) + assert result == [] + + +class TestRespondToPid: + + @pytest.mark.asyncio + async def test_noop_when_no_agents(self): + svc = _svc() + svc.get_available_agents = AsyncMock(return_value=[]) + red = FakeAgent() + result = await svc.respond_to_pid('123', red, 'visible') + assert result is None + + @pytest.mark.asyncio + async def test_creates_operation_when_none_exists(self): + svc = _svc() + blue = FakeAgent(host='h1') + svc.get_available_agents = AsyncMock(return_value=[blue]) + svc.create_fact_source = AsyncMock(return_value=FakeSource()) + svc.create_operation = AsyncMock() + svc.run_abilities_on_agent = AsyncMock(return_value=([], [])) + svc.ops = {} + red = FakeAgent(host='h1', pid=100) + await svc.respond_to_pid('123', red, 'visible') + svc.create_operation.assert_awaited_once() + + +class TestProcessChildProcessLinks: + + @pytest.mark.asyncio + async def test_extracts_child_guids(self): + svc = _svc() + rel = FakeRelationship( + source=FakeFact(trait='host.process.guid', value='pg'), + edge='has_childprocess_guid', + target=FakeFact(trait='host.process.guid', value='cg'), + ) + link = FakeLink(relationships=[rel]) + svc.add_link_to_process_tree = AsyncMock() + result = await svc.process_child_process_links([link]) + assert 'cg' in result + + @pytest.mark.asyncio + async def test_returns_empty_on_no_match(self): + svc = _svc() + link = FakeLink(relationships=[]) + svc.add_link_to_process_tree = AsyncMock() + result = await svc.process_child_process_links([link]) + assert result == [] + + +class TestRegisterHandler: + + @pytest.mark.asyncio + async def test_registers_event(self): + event_svc = MagicMock() + event_svc.observe_event = AsyncMock() + await ResponseService.register_handler(event_svc) + event_svc.observe_event.assert_awaited_once() + + +class TestUpdateOperation: + + @pytest.mark.asyncio + async def test_links_added_to_op(self): + svc = _svc() + op = MagicMock() + op.id = 'op-1' + op.add_link = MagicMock() + svc.ops = {'visible': op} + link = FakeLink() + await svc.update_operation([link], 'visible') + assert link.operation == 'op-1' + op.add_link.assert_called_once_with(link) diff --git a/tox.ini b/tox.ini new file mode 100644 index 0000000..c869345 --- /dev/null +++ b/tox.ini @@ -0,0 +1,10 @@ +[tox] +envlist = py39, py310, py311, py312 +skipsdist = true + +[testenv] +deps = + pytest + pytest-asyncio +commands = + pytest {posargs:-v}