Skip to content

Commit 7e80280

Browse files
author
mnbplus
committed
feat(parser): FastAPI explicit source annotations as taint sources
Extend _extract_source_params to recognise FastAPI explicit input constructors as user-controlled taint sources, in addition to Depends(): Query(), Body(), Header(), Path(), Form(), Cookie(), File(), UploadFile Changes: - Add _FASTAPI_SOURCE_CALLS frozenset with all known source constructors and their fully-qualified / starlette variants - Add _is_fastapi_source_call() with alias resolution via import_map - Update _extract_source_params to call both _is_depends_call and _is_fastapi_source_call for positional and keyword-only args - Add 10 tests covering Query/Body/Header/Path/Form/Cookie/File, aliased imports, sanitizer suppression, and mixed Depends+Query All 10 new tests pass; full suite (288 tests) passes with zero regression.
1 parent 0f065ef commit 7e80280

2 files changed

Lines changed: 415 additions & 10 deletions

File tree

pyaegis/core/parser.py

Lines changed: 81 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -176,23 +176,58 @@ def _extract_decorators(
176176

177177
return decorator_names, routes
178178

179+
# FastAPI explicit source constructors — all represent user-controlled input
180+
_FASTAPI_SOURCE_CALLS = frozenset(
181+
{
182+
"Query",
183+
"Body",
184+
"Header",
185+
"Path",
186+
"Form",
187+
"Cookie",
188+
"File",
189+
"UploadFile",
190+
# Fully-qualified variants (fastapi.Query, etc.)
191+
"fastapi.Query",
192+
"fastapi.Body",
193+
"fastapi.Header",
194+
"fastapi.Path",
195+
"fastapi.Form",
196+
"fastapi.Cookie",
197+
"fastapi.File",
198+
"fastapi.UploadFile",
199+
# starlette aliases
200+
"starlette.datastructures.UploadFile",
201+
}
202+
)
203+
179204
def _extract_source_params(
180205
self,
181206
func_node: ast.AST,
182207
import_map: Optional[Dict[str, str]] = None,
183208
) -> List[str]:
184209
"""Extract parameters that should be treated as tainted sources.
185210
186-
Currently focuses on FastAPI-style dependency injection:
187-
def endpoint(
188-
user_id: str,
189-
user=Depends(get_user),
190-
token: str = Depends(auth),
191-
)
211+
Recognises two categories of FastAPI tainted parameters:
212+
213+
1. Dependency injection via ``Depends()``:
214+
def endpoint(user=Depends(get_user), token: str = Depends(auth))
215+
216+
2. Explicit FastAPI source annotations (user-controlled input):
217+
def endpoint(
218+
q: str = Query(...),
219+
body: MyModel = Body(...),
220+
x_token: str = Header(None),
221+
item_id: int = Path(...),
222+
username: str = Form(...),
223+
session: str = Cookie(None),
224+
file: bytes = File(...),
225+
upload: UploadFile = File(...),
226+
)
192227
193228
We treat the *parameter name* as a taint source when its default
194-
value is a call to fastapi.Depends / starlette.Depends (directly or
195-
via imported alias).
229+
value is a call to fastapi.Depends / starlette.Depends OR one of
230+
the FastAPI explicit source constructors (directly or via alias).
196231
"""
197232
if not isinstance(func_node, (ast.FunctionDef, ast.AsyncFunctionDef)):
198233
return []
@@ -212,7 +247,9 @@ def endpoint(
212247
for param, default in zip(params, defaults):
213248
if default is None:
214249
continue
215-
if self._is_depends_call(default, import_map):
250+
if self._is_depends_call(
251+
default, import_map
252+
) or self._is_fastapi_source_call(default, import_map):
216253
out.append(param.arg)
217254

218255
# keyword-only parameters
@@ -221,7 +258,9 @@ def endpoint(
221258
for param, default in zip(kwonlyargs, kw_defaults):
222259
if default is None:
223260
continue
224-
if self._is_depends_call(default, import_map):
261+
if self._is_depends_call(
262+
default, import_map
263+
) or self._is_fastapi_source_call(default, import_map):
225264
out.append(param.arg)
226265

227266
return out
@@ -249,6 +288,38 @@ def _is_depends_call(
249288

250289
return False
251290

291+
def _is_fastapi_source_call(
292+
self,
293+
node: ast.AST,
294+
import_map: Optional[Dict[str, str]] = None,
295+
) -> bool:
296+
"""Return True if node is a FastAPI explicit source constructor.
297+
298+
Matches calls like ``Query(...)``, ``Body(...)``, ``Header(None)``,
299+
``Path(...)``, ``Form(...)``, ``Cookie(None)``, ``File(...)``,
300+
and their fully-qualified / aliased forms.
301+
"""
302+
if not isinstance(node, ast.Call):
303+
return False
304+
fn_name = self._get_full_name(node.func)
305+
if not fn_name:
306+
return False
307+
308+
if fn_name in self._FASTAPI_SOURCE_CALLS:
309+
return True
310+
311+
# Resolve import alias, e.g. ``from fastapi import Query as Q``
312+
if import_map:
313+
resolved = import_map.get(fn_name, "")
314+
if resolved in self._FASTAPI_SOURCE_CALLS:
315+
return True
316+
# Also match suffix: fastapi.Query -> ends with known name
317+
short = resolved.split(".")[-1] if resolved else ""
318+
if short and f"fastapi.{short}" in self._FASTAPI_SOURCE_CALLS:
319+
return True
320+
321+
return False
322+
252323
def _extract_import_aliases(self, tree: ast.AST) -> Dict[str, str]:
253324
"""Extract import aliases from a module AST.
254325

0 commit comments

Comments
 (0)