-
Notifications
You must be signed in to change notification settings - Fork 44
Expand file tree
/
Copy pathconftest.py
More file actions
185 lines (142 loc) · 6.33 KB
/
conftest.py
File metadata and controls
185 lines (142 loc) · 6.33 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
"""Shared fixtures and mock services for sandcat plugin tests."""
import os
import sys
import shutil
import tempfile
from unittest.mock import AsyncMock, MagicMock
import pytest
# ---------------------------------------------------------------------------
# Fake caldera service stubs so that imports from app.utility.base_service,
# app.utility.base_world, and app.service.auth_svc never touch real code.
# ---------------------------------------------------------------------------
class _FakeBaseService:
"""Minimal stand-in for app.utility.base_service.BaseService."""
def create_logger(self, name):
import logging
return logging.getLogger(name)
class _FakeBaseWorld:
"""Minimal stand-in for app.utility.base_world.BaseWorld."""
_config = {}
@classmethod
def get_config(cls, prop=None, name=None):
return cls._config.get(prop, '')
@classmethod
def set_config(cls, prop, value):
cls._config[prop] = value
@classmethod
def clear_config(cls):
cls._config.clear()
def _noop_decorator(fn):
return fn
def _for_all_public_methods(decorator):
def wrapper(cls):
return cls
return wrapper
# ---------------------------------------------------------------------------
# Session-scoped autouse fixture: inject fake modules into sys.modules once
# per test session so sandcat code can be imported without a real Caldera
# installation present.
# ---------------------------------------------------------------------------
@pytest.fixture(scope='session', autouse=True)
def _inject_fake_modules():
"""Populate sys.modules with thin stubs for Caldera framework modules."""
repo_root = os.path.dirname(os.path.abspath(__file__))
# Build fake module stubs
base_service_mod = type(sys)('app.utility.base_service')
base_service_mod.BaseService = _FakeBaseService
base_world_mod = type(sys)('app.utility.base_world')
base_world_mod.BaseWorld = _FakeBaseWorld
auth_svc_mod = type(sys)('app.service.auth_svc')
auth_svc_mod.check_authorization = _noop_decorator
auth_svc_mod.for_all_public_methods = _for_all_public_methods
jinja2_mod = type(sys)('aiohttp_jinja2')
jinja2_mod.template = lambda name: (lambda fn: fn)
# Ensure parent packages exist as proper namespace stubs with real paths
pkg_paths = {
'app': [os.path.join(repo_root, 'app')],
'app.utility': [os.path.join(repo_root, 'app', 'utility')],
'app.service': [],
'app.extensions': [os.path.join(repo_root, 'app', 'extensions')],
'app.extensions.contact': [os.path.join(repo_root, 'app', 'extensions', 'contact')],
'app.extensions.donut': [os.path.join(repo_root, 'app', 'extensions', 'donut')],
'app.extensions.execute': [os.path.join(repo_root, 'app', 'extensions', 'execute')],
'app.extensions.execute.native': [os.path.join(repo_root, 'app', 'extensions', 'execute', 'native')],
'app.extensions.execute.shellcode': [os.path.join(repo_root, 'app', 'extensions', 'execute', 'shellcode')],
'app.extensions.execute.shells': [os.path.join(repo_root, 'app', 'extensions', 'execute', 'shells')],
'app.extensions.proxy': [os.path.join(repo_root, 'app', 'extensions', 'proxy')],
'app.extensions.shared': [os.path.join(repo_root, 'app', 'extensions', 'shared')],
}
for pkg, paths in pkg_paths.items():
if pkg not in sys.modules:
mod = type(sys)(pkg)
mod.__path__ = paths
sys.modules[pkg] = mod
sys.modules['app.utility.base_service'] = base_service_mod
sys.modules['app.utility.base_world'] = base_world_mod
sys.modules['app.service.auth_svc'] = auth_svc_mod
sys.modules['aiohttp_jinja2'] = jinja2_mod
# Make the plugin package importable as plugins.sandcat
if 'plugins' not in sys.modules:
plugins_mod = type(sys)('plugins')
plugins_mod.__path__ = [os.path.dirname(repo_root)]
sys.modules['plugins'] = plugins_mod
if 'plugins.sandcat' not in sys.modules:
sandcat_mod = type(sys)('plugins.sandcat')
sandcat_mod.__path__ = [repo_root]
sys.modules['plugins.sandcat'] = sandcat_mod
if 'plugins.sandcat.app' not in sys.modules:
ps_app = type(sys)('plugins.sandcat.app')
ps_app.__path__ = [os.path.join(repo_root, 'app')]
sys.modules['plugins.sandcat.app'] = ps_app
if 'plugins.sandcat.app.utility' not in sys.modules:
ps_util = type(sys)('plugins.sandcat.app.utility')
ps_util.__path__ = [os.path.join(repo_root, 'app', 'utility')]
sys.modules['plugins.sandcat.app.utility'] = ps_util
# Add repo root to sys.path so plain `from app.…` works for extension files
if repo_root not in sys.path:
sys.path.insert(0, repo_root)
yield
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@pytest.fixture
def fake_base_world():
"""Provide and auto-clear the FakeBaseWorld config store."""
_FakeBaseWorld.clear_config()
yield _FakeBaseWorld
_FakeBaseWorld.clear_config()
@pytest.fixture
def mock_services():
"""Return a dict of mock caldera services suitable for SandService / SandGuiApi."""
file_svc = MagicMock()
file_svc.add_special_payload = AsyncMock()
file_svc.find_file_path = AsyncMock(return_value=('/fake', '/fake/sandcat.go'))
file_svc.compile_go = AsyncMock()
file_svc.sanitize_ldflag_value = MagicMock(side_effect=lambda p, v: v)
file_svc.log = MagicMock()
data_svc = MagicMock()
data_svc.locate = AsyncMock(return_value=[])
contact_svc = MagicMock()
contact_svc.contacts = []
app_svc = MagicMock()
app_svc.retrieve_compiled_file = AsyncMock(return_value=('/path', 'sandcat-linux'))
app_svc.application = MagicMock()
auth_svc = MagicMock()
return dict(
file_svc=file_svc,
data_svc=data_svc,
contact_svc=contact_svc,
app_svc=app_svc,
auth_svc=auth_svc,
)
@pytest.fixture
def sand_svc(mock_services):
"""Instantiate a SandService backed by mock services."""
from app.sand_svc import SandService
return SandService(mock_services)
@pytest.fixture
def tmp_dir():
"""Provide a temporary directory, cleaned up afterwards."""
d = tempfile.mkdtemp()
yield d
shutil.rmtree(d, ignore_errors=True)