diff --git a/cycode/cli/apps/mcp/mcp_command.py b/cycode/cli/apps/mcp/mcp_command.py
index 0dcef968..b9989ce2 100644
--- a/cycode/cli/apps/mcp/mcp_command.py
+++ b/cycode/cli/apps/mcp/mcp_command.py
@@ -2,13 +2,14 @@
 import json
 import logging
 import os
+import shutil
 import sys
 import tempfile
 import uuid
-from pathlib import Path
 from typing import Annotated, Any
 
 import typer
+from pathvalidate import sanitize_filepath
 from pydantic import Field
 
 from cycode.cli.cli_types import McpTransportOption, ScanTypeOption
@@ -26,7 +27,7 @@
 
 _logger = get_logger('Cycode MCP')
 
-_DEFAULT_RUN_COMMAND_TIMEOUT = 5 * 60
+_DEFAULT_RUN_COMMAND_TIMEOUT = 10 * 60
 
 _FILES_TOOL_FIELD = Field(description='Files to scan, mapping file paths to their content')
 
@@ -91,48 +92,76 @@ async def _run_cycode_command(*args: str, timeout: int = _DEFAULT_RUN_COMMAND_TI
         return {'error': f'Failed to run command: {e!s}'}
 
 
-def _create_temp_files(files_content: dict[str, str]) -> list[str]:
-    """Create temporary files from content and return their paths."""
-    temp_dir = tempfile.mkdtemp(prefix='cycode_mcp_')
-    temp_files = []
+def _sanitize_file_path(file_path: str) -> str:
+    """Sanitize file path to prevent path traversal and other security issues.
 
-    _logger.debug('Creating temporary files in directory: %s', temp_dir)
+    Args:
+        file_path: The file path to sanitize
 
-    for file_path, content in files_content.items():
-        safe_filename = f'{_gen_random_id()}_{Path(file_path).name}'
-        temp_file_path = os.path.join(temp_dir, safe_filename)
+    Returns:
+        Sanitized file path safe for use in temporary directory
 
-        os.makedirs(os.path.dirname(temp_file_path), exist_ok=True)
+    Raises:
+        ValueError: If the path is invalid or potentially dangerous
+    """
+    if not file_path or not isinstance(file_path, str):
+        raise ValueError('File path must be a non-empty string')
 
-        _logger.debug('Creating temp file: %s', temp_file_path)
-        with open(temp_file_path, 'w', encoding='UTF-8') as f:
-            f.write(content)
+    return sanitize_filepath(file_path, platform='auto', validate_after_sanitize=True)
 
-        temp_files.append(temp_file_path)
 
-    return temp_files
+class _TempFilesManager:
+    """Context manager for creating and cleaning up temporary files.
 
+    Creates a temporary directory structure that preserves original file paths
+    inside a call_id as a suffix. Automatically cleans up all files and directories
+    when exiting the context.
+    """
 
-def _cleanup_temp_files(temp_files: list[str]) -> None:
-    """Clean up temporary files and directories."""
+    def __init__(self, files_content: dict[str, str], call_id: str) -> None:
+        self.files_content = files_content
+        self.call_id = call_id
+        self.temp_base_dir = None
+        self.temp_files = []
 
-    temp_dirs = set()
-    for temp_file in temp_files:
-        try:
-            if os.path.exists(temp_file):
-                _logger.debug('Removing temp file: %s', temp_file)
-                os.remove(temp_file)
-                temp_dirs.add(os.path.dirname(temp_file))
-        except OSError as e:
-            _logger.warning('Failed to remove temp file %s: %s', temp_file, e)
-
-    for temp_dir in temp_dirs:
-        try:
-            if os.path.exists(temp_dir) and not os.listdir(temp_dir):
-                _logger.debug('Removing temp directory: %s', temp_dir)
-                os.rmdir(temp_dir)
-        except OSError as e:
-            _logger.warning('Failed to remove temp directory %s: %s', temp_dir, e)
+    def __enter__(self) -> list[str]:
+        self.temp_base_dir = tempfile.mkdtemp(prefix='cycode_mcp_', suffix=self.call_id)
+        _logger.debug('Creating temporary files in directory: %s', self.temp_base_dir)
+
+        for file_path, content in self.files_content.items():
+            try:
+                sanitized_path = _sanitize_file_path(file_path)
+                temp_file_path = os.path.join(self.temp_base_dir, sanitized_path)
+
+                # Ensure the normalized path is still within our temp directory
+                normalized_temp_path = os.path.normpath(temp_file_path)
+                normalized_base_path = os.path.normpath(self.temp_base_dir)
+                if not normalized_temp_path.startswith(normalized_base_path + os.sep):
+                    raise ValueError(f'Path escapes temporary directory: {file_path}')
+
+                os.makedirs(os.path.dirname(temp_file_path), exist_ok=True)
+
+                _logger.debug('Creating temp file: %s (from: %s)', temp_file_path, file_path)
+                with open(temp_file_path, 'w', encoding='UTF-8') as f:
+                    f.write(content)
+
+                self.temp_files.append(temp_file_path)
+            except ValueError as e:
+                _logger.error('Invalid file path rejected: %s - %s', file_path, str(e))
+                continue
+            except Exception as e:
+                _logger.error('Failed to create temp file for %s: %s', file_path, str(e))
+                continue
+
+        if not self.temp_files:
+            raise ValueError('No valid files provided after sanitization')
+
+        return self.temp_files
+
+    def __exit__(self, *_) -> None:
+        if self.temp_base_dir and os.path.exists(self.temp_base_dir):
+            _logger.debug('Removing temp directory recursively: %s', self.temp_base_dir)
+            shutil.rmtree(self.temp_base_dir, ignore_errors=True)
 
 
 async def _run_cycode_scan(scan_type: ScanTypeOption, temp_files: list[str]) -> dict[str, Any]:
@@ -153,21 +182,36 @@ async def _cycode_scan_tool(scan_type: ScanTypeOption, files: dict[str, str] = _
         _logger.error('No files provided for scan')
         return json.dumps({'error': 'No files provided'})
 
-    temp_files = _create_temp_files(files)
-
     try:
-        _logger.info(
-            'Running Cycode scan, %s',
-            {'scan_type': scan_type, 'files_count': len(temp_files), 'call_id': _tool_call_id},
-        )
-        result = await _run_cycode_scan(scan_type, temp_files)
-        _logger.info('Scan completed, %s', {'scan_type': scan_type, 'call_id': _tool_call_id})
-        return json.dumps(result, indent=2)
+        with _TempFilesManager(files, _tool_call_id) as temp_files:
+            original_count = len(files)
+            processed_count = len(temp_files)
+
+            if processed_count < original_count:
+                _logger.warning(
+                    'Some files were rejected during sanitization, %s',
+                    {
+                        'scan_type': scan_type,
+                        'original_count': original_count,
+                        'processed_count': processed_count,
+                        'call_id': _tool_call_id,
+                    },
+                )
+
+            _logger.info(
+                'Running Cycode scan, %s',
+                {'scan_type': scan_type, 'files_count': processed_count, 'call_id': _tool_call_id},
+            )
+            result = await _run_cycode_scan(scan_type, temp_files)
+
+            _logger.info('Scan completed, %s', {'scan_type': scan_type, 'call_id': _tool_call_id})
+            return json.dumps(result, indent=2)
+    except ValueError as e:
+        _logger.error('Invalid input files, %s', {'scan_type': scan_type, 'call_id': _tool_call_id, 'error': str(e)})
+        return json.dumps({'error': f'Invalid input files: {e!s}'}, indent=2)
     except Exception as e:
         _logger.error('Scan failed, %s', {'scan_type': scan_type, 'call_id': _tool_call_id, 'error': str(e)})
         return json.dumps({'error': f'Scan failed: {e!s}'}, indent=2)
-    finally:
-        _cleanup_temp_files(temp_files)
 
 
 async def cycode_secret_scan(files: dict[str, str] = _FILES_TOOL_FIELD) -> str:
@@ -197,6 +241,10 @@ async def cycode_sca_scan(files: dict[str, str] = _FILES_TOOL_FIELD) -> str:
     - verify software supply chain security
     - review package.json, requirements.txt, pom.xml and other dependency files
 
+    Important:
+        You must also include lock files (like package-lock.json, Pipfile.lock, etc.) to get accurate results.
+        You must provide manifest and lock files together.
+
     Args:
         files: Dictionary mapping file paths to their content
 
diff --git a/poetry.lock b/poetry.lock
index cdaff5cf..67c78b46 100644
--- a/poetry.lock
+++ b/poetry.lock
@@ -644,6 +644,23 @@ files = [
     {file = "patch-ng-1.18.1.tar.gz", hash = "sha256:52fd46ee46f6c8667692682c1fd7134edc65a2d2d084ebec1d295a6087fc0291"},
 ]
 
+[[package]]
+name = "pathvalidate"
+version = "3.3.1"
+description = "pathvalidate is a Python library to sanitize/validate a string such as filenames/file-paths/etc."
+optional = false
+python-versions = ">=3.9"
+groups = ["main"]
+files = [
+    {file = "pathvalidate-3.3.1-py3-none-any.whl", hash = "sha256:5263baab691f8e1af96092fa5137ee17df5bdfbd6cff1fcac4d6ef4bc2e1735f"},
+    {file = "pathvalidate-3.3.1.tar.gz", hash = "sha256:b18c07212bfead624345bb8e1d6141cdcf15a39736994ea0b94035ad2b1ba177"},
+]
+
+[package.extras]
+docs = ["Sphinx (>=2.4)", "sphinx_rtd_theme (>=1.2.2)", "urllib3 (<2)"]
+readme = ["path (>=13,<18)", "readmemaker (>=1.2.0)"]
+test = ["Faker (>=1.0.8)", "allpairspy (>=2)", "click (>=6.2)", "pytest (>=6.0.1)", "pytest-md-report (>=0.6.2)"]
+
 [[package]]
 name = "pefile"
 version = "2024.8.26"
@@ -1541,4 +1558,4 @@ type = ["pytest-mypy"]
 [metadata]
 lock-version = "2.1"
 python-versions = ">=3.9,<3.14"
-content-hash = "2a401929c8b999931e32f020bd794e9dc3716647bacc79afa70b7791ab86ce00"
+content-hash = "a32a53bea8963df5ec2c1a0db09804b8e9466e523488326c20b3f6dd21dee6d2"
diff --git a/pyproject.toml b/pyproject.toml
index 022d8206..8fce7854 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -44,6 +44,7 @@ typer = "^0.15.3"
 tenacity = ">=9.0.0,<9.1.0"
 mcp = { version = ">=1.9.3,<2.0.0", markers = "python_version >= '3.10'" }
 pydantic = ">=2.11.5,<3.0.0"
+pathvalidate = ">=3.3.1,<4.0.0"
 
 [tool.poetry.group.test.dependencies]
 mock = ">=4.0.3,<4.1.0"
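For context, the snippet below is a minimal standalone sketch (not part of the patch) of the sanitize-then-contain pattern that _TempFilesManager.__enter__ applies to each incoming path: sanitize with pathvalidate, join onto the temp base directory, then reject anything whose normalized form escapes that base. The helper name write_scan_input and the demo paths are illustrative only; pathvalidate is assumed to be installed.

# Standalone sketch (not part of the patch above): the sanitize-then-contain
# pattern used by _TempFilesManager.__enter__, shown in isolation.
# `write_scan_input` is a hypothetical helper name for this demo.
import os
import tempfile

from pathvalidate import sanitize_filepath


def write_scan_input(base_dir: str, file_path: str, content: str) -> str:
    """Write content under base_dir, rejecting any path that escapes it."""
    sanitized = sanitize_filepath(file_path, platform='auto', validate_after_sanitize=True)
    candidate = os.path.normpath(os.path.join(base_dir, sanitized))

    # Containment check: the normalized path must stay inside base_dir.
    if not candidate.startswith(os.path.normpath(base_dir) + os.sep):
        raise ValueError(f'Path escapes base directory: {file_path}')

    os.makedirs(os.path.dirname(candidate), exist_ok=True)
    with open(candidate, 'w', encoding='UTF-8') as f:
        f.write(content)
    return candidate


if __name__ == '__main__':
    base = tempfile.mkdtemp(prefix='cycode_mcp_demo_')
    print(write_scan_input(base, 'src/app/config.py', 'token = "..."'))  # stays inside base
    try:
        write_scan_input(base, '../../etc/passwd', 'x')  # traversal attempt is rejected
    except ValueError as e:
        print(e)

Note that even if sanitization leaves a traversal sequence or an absolute path intact, the containment check still rejects it, which mirrors why the patch keeps both steps rather than relying on sanitize_filepath alone.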