11"""Security utilities."""
22
33import os
4+ from pathlib import Path
45
56import structlog
67from fastapi import HTTPException , status
78
89logger = structlog .get_logger ()
910
1011
11- def validate_path (path : str , base_path : str ) -> str :
12+ def resolve_path_within_base (path : str , base_path : str ) -> str :
1213 """
13- Validate that a path is safe and within the base path .
14- Prevents directory traversal attacks .
14+ Resolve a user-provided path within a base directory .
15+ Prevents directory traversal and symlink escapes .
1516
1617 Args:
17- path: The path to validate (can be absolute or relative)
18+ path: The path to resolve (can be absolute or relative)
1819 base_path: The allowed base directory
1920
2021 Returns:
@@ -24,36 +25,71 @@ def validate_path(path: str, base_path: str) -> str:
2425 HTTPException: If path is invalid or outside base_path
2526 """
2627 try :
27- # Normalize base path to absolute path
28- abs_base_path = os .path .abspath (base_path )
29-
30- # Handle relative paths by joining with base_path
31- if not os .path .isabs (path ):
32- abs_path = os .path .abspath (os .path .join (abs_base_path , path ))
33- else :
34- abs_path = os .path .abspath (path )
35-
36- # Check if path is within base_path
37- # os.path.commonpath returns the longest common sub-path
38- # If valid, commonpath should be equal to base_path
39- if os .path .commonpath ([abs_base_path , abs_path ]) != abs_base_path :
28+ if path is None or not str (path ).strip ():
29+ raise HTTPException (
30+ status_code = status .HTTP_400_BAD_REQUEST ,
31+ detail = "Invalid path: Path is required" ,
32+ )
33+ if "\x00 " in str (path ):
34+ raise HTTPException (
35+ status_code = status .HTTP_400_BAD_REQUEST ,
36+ detail = "Invalid path: Null byte detected" ,
37+ )
38+
39+ abs_base_path = Path (base_path ).resolve ()
40+ candidate_path = Path (path )
41+ if not candidate_path .is_absolute ():
42+ candidate_path = abs_base_path / candidate_path
43+
44+ resolved_path = candidate_path .resolve (strict = False )
45+
46+ if not resolved_path .is_relative_to (abs_base_path ):
4047 logger .warning (
4148 "Path traversal attempt detected" ,
4249 path = path ,
43- resolved_path = abs_path ,
44- base_path = abs_base_path ,
50+ resolved_path = str ( resolved_path ) ,
51+ base_path = str ( abs_base_path ) ,
4552 )
4653 raise HTTPException (
4754 status_code = status .HTTP_400_BAD_REQUEST ,
4855 detail = "Invalid path: Path traversal detected" ,
4956 )
5057
51- return abs_path
58+ return str ( resolved_path )
5259
5360 except HTTPException :
5461 raise
5562 except Exception as e :
5663 logger .error ("Path validation error" , error = str (e ), path = path )
5764 raise HTTPException (
58- status_code = status .HTTP_400_BAD_REQUEST , detail = f"Invalid path: { str (e )} "
65+ status_code = status .HTTP_400_BAD_REQUEST ,
66+ detail = "Invalid path: Path validation failed" ,
5967 )
68+
69+
70+ def ensure_directory_within_base (
71+ path : str , base_path : str , * , allow_subpaths : bool = True
72+ ) -> str :
73+ """Validate a path within base_path and create the directory."""
74+ path_value = os .fspath (path )
75+ if not allow_subpaths :
76+ if os .path .isabs (path_value ):
77+ raise HTTPException (
78+ status_code = status .HTTP_400_BAD_REQUEST ,
79+ detail = "Invalid path: Absolute paths are not allowed" ,
80+ )
81+ for sep in (os .path .sep , os .path .altsep ):
82+ if sep and sep in path_value :
83+ raise HTTPException (
84+ status_code = status .HTTP_400_BAD_REQUEST ,
85+ detail = "Invalid path: Path separators are not allowed" ,
86+ )
87+
88+ resolved_path = resolve_path_within_base (path_value , base_path )
89+ os .makedirs (resolved_path , exist_ok = True )
90+ return resolved_path
91+
92+
93+ def validate_path (path : str , base_path : str ) -> str :
94+ """Backward-compatible wrapper for path resolution."""
95+ return resolve_path_within_base (path , base_path )
0 commit comments