diff --git a/.github/copilot-instructions.md b/.github/copilot-instructions.md new file mode 100644 index 00000000..b3a02d9a --- /dev/null +++ b/.github/copilot-instructions.md @@ -0,0 +1,70 @@ +# Copilot Instructions for hackingBuddyGPT + +## Project Summary + +hackingBuddyGPT is a research-driven Python framework that helps security researchers and penetration testers use Large Language Models (LLMs) to automate and experiment with security testing, especially privilege escalation and web/API pentesting. It supports both local shell and SSH connections to targets, and is designed for rapid prototyping of new agent-based use cases. **Warning:** This tool executes real commands on live systems—use only in safe, isolated environments. + +## Tech Stack +- **Language:** Python 3.10+ +- **Core dependencies:** See `pyproject.toml` (notable: `fabric`, `requests`, `pydantic`, `pytest`) +- **CLI Entrypoint:** `wintermute` (see `src/hackingBuddyGPT/cli/wintermute.py`) +- **Web viewer:** Optional, for log viewing (`wintermute Viewer`) +- **RAG/Knowledge base:** Markdown files in `rag/` +- **Container/VM orchestration:** Bash scripts in `scripts/`, Ansible playbooks (`tasks.yaml`) + +## Project Structure +- `src/hackingBuddyGPT/` — Main Python package + - `cli/` — CLI entrypoint (`wintermute.py`) + - `capabilities/` — Modular agent actions (e.g., SSH, HTTP, note-taking) + - `usecases/` — Agent logic for each use case (Linux privesc, web, API, etc.) + - `utils/` — Shared helpers (LLM, logging, config, prompt generation) +- `tests/` — Pytest-based unit and integration tests +- `scripts/` — Setup, orchestration, and run scripts for Mac, Codespaces, and containers +- `rag/` — Markdown knowledge base for RAG (GTFOBins, HackTricks) +- `docs/` — Minimal, see https://docs.hackingbuddy.ai for full docs + +## Setup & Usage +- **Python:** Use 3.10+ (see `pyproject.toml`). +- **Install:** + ```bash + python -m venv venv + source venv/bin/activate + pip install -e . + ``` +- **Run:** + - List use cases: `python src/hackingBuddyGPT/cli/wintermute.py` + - Example: `python src/hackingBuddyGPT/cli/wintermute.py LinuxPrivesc --llm.api_key=... --conn=ssh ...` + - See `README.md`, `MAC.md`, `CODESPACES.md` for platform-specific instructions. +- **Testing:** `pip install '.[testing]' && pytest` +- **Linting:** `ruff` (config in `pyproject.toml`) +- **Container/VM setup:** Use scripts in `scripts/` (see comments in each script for prerequisites and usage). + +## Coding Guidelines +- Follow PEP8 and use `ruff` for linting (see `[tool.ruff]` in `pyproject.toml`). +- Use type hints and docstrings for all public functions/classes. +- Place new agent logic in `usecases/`, new capabilities in `capabilities/`. +- Prefer composition (capabilities, helpers) over inheritance. +- Use the logging utilities in `utils/logging.py`. +- Document all new scripts and major changes in the `README.md` or relevant `.md` files. +- Mark all workarounds or hacks with `HACK`, `TODO`, or `FIXME`. + +## Existing Tools & Resources +- **Documentation:** https://docs.hackingbuddy.ai +- **Community/Support:** Discord link in `README.md` +- **Security Policy:** See `SECURITY.md` +- **Code of Conduct:** See `CODE_OF_CONDUCT.md` +- **Contribution Guide:** See `CONTRIBUTING.md` +- **Citations:** See `CITATION.cff` +- **Benchmarks:** https://github.com/ipa-lab/benchmark-privesc-linux + +## Tips to Minimize Bash/Build Failures +- Always use the provided scripts for environment/container setup; do not run ad-hoc commands unless necessary. +- Ensure Bash version 4+ (Mac: install via Homebrew). +- Use virtual environments for Python dependencies. +- For Codespaces/Mac, follow the step-by-step guides in `CODESPACES.md` and `MAC.md`. +- Never expose the web viewer to the public internet. +- Always set API keys and credentials in `.env` or as prompted by scripts. +- For RAG, add new markdown files to the appropriate `rag/` subfolder. + +--- +For further details, see the `README.md` and https://docs.hackingbuddy.ai. When in doubt, prefer existing patterns and scripts over inventing new ones. diff --git a/.gitignore b/.gitignore index 04fa677a..bf9123bf 100644 --- a/.gitignore +++ b/.gitignore @@ -26,9 +26,10 @@ scripts/mac_ansible_id_rsa scripts/mac_ansible_id_rsa.pub .aider* -src/hackingBuddyGPT/usecases/web_api_testing/documentation/openapi_spec/ src/hackingBuddyGPT/usecases/web_api_testing/documentation/reports/ src/hackingBuddyGPT/usecases/web_api_testing/retrieve_spotify_token.py config/my_configs/* config/configs/* -config/configs/ \ No newline at end of file +config/configs/ + +src/hackingBuddyGPT/usecases/web_api_documentation/openapi_spec/ diff --git a/src/hackingBuddyGPT/usecases/__init__.py b/src/hackingBuddyGPT/usecases/__init__.py index 394a8be8..85d045ef 100644 --- a/src/hackingBuddyGPT/usecases/__init__.py +++ b/src/hackingBuddyGPT/usecases/__init__.py @@ -1,4 +1,5 @@ from .web import * +from .web_api_documentation import * from .web_api_testing import * from .viewer import * from .minimal_linux_privesc import * diff --git a/src/hackingBuddyGPT/usecases/web_api_documentation/__init__.py b/src/hackingBuddyGPT/usecases/web_api_documentation/__init__.py new file mode 100644 index 00000000..0d9c41de --- /dev/null +++ b/src/hackingBuddyGPT/usecases/web_api_documentation/__init__.py @@ -0,0 +1 @@ +from .simple_openapi_documentation import SimpleWebAPIDocumentation \ No newline at end of file diff --git a/src/hackingBuddyGPT/usecases/web_api_testing/utils/evaluator.py b/src/hackingBuddyGPT/usecases/web_api_documentation/evaluator.py similarity index 99% rename from src/hackingBuddyGPT/usecases/web_api_testing/utils/evaluator.py rename to src/hackingBuddyGPT/usecases/web_api_documentation/evaluator.py index acc7205d..a0d57078 100644 --- a/src/hackingBuddyGPT/usecases/web_api_testing/utils/evaluator.py +++ b/src/hackingBuddyGPT/usecases/web_api_documentation/evaluator.py @@ -1,7 +1,6 @@ import copy -from itertools import chain -from hackingBuddyGPT.usecases.web_api_testing.documentation.pattern_matcher import PatternMatcher +from hackingBuddyGPT.utils.web_api.pattern_matcher import PatternMatcher class Evaluator: diff --git a/src/hackingBuddyGPT/usecases/web_api_testing/documentation/openapi_specification_handler.py b/src/hackingBuddyGPT/usecases/web_api_documentation/openapi_specification_handler.py similarity index 75% rename from src/hackingBuddyGPT/usecases/web_api_testing/documentation/openapi_specification_handler.py rename to src/hackingBuddyGPT/usecases/web_api_documentation/openapi_specification_handler.py index 6a5df36a..13fa35f2 100644 --- a/src/hackingBuddyGPT/usecases/web_api_testing/documentation/openapi_specification_handler.py +++ b/src/hackingBuddyGPT/usecases/web_api_documentation/openapi_specification_handler.py @@ -1,13 +1,15 @@ +import copy +import json import os import re from collections import defaultdict from datetime import datetime +from typing import Any, Dict, Optional, Tuple import yaml from hackingBuddyGPT.capabilities.yamlFile import YAMLFile -from hackingBuddyGPT.usecases.web_api_testing.documentation.pattern_matcher import PatternMatcher +from hackingBuddyGPT.utils.web_api.pattern_matcher import PatternMatcher from hackingBuddyGPT.utils.prompt_generation.information import PromptStrategy -from hackingBuddyGPT.usecases.web_api_testing.response_processing import ResponseHandler -from hackingBuddyGPT.usecases.web_api_testing.utils import LLMHandler +from hackingBuddyGPT.utils.web_api.llm_handler import LLMHandler class OpenAPISpecificationHandler(object): @@ -15,7 +17,6 @@ class OpenAPISpecificationHandler(object): Handles the generation and updating of an OpenAPI specification document based on dynamic API responses. Attributes: - response_handler (object): An instance of the response handler for processing API responses. schemas (dict): A dictionary to store API schemas. filename (str): The filename for the OpenAPI specification file. openapi_spec (dict): The OpenAPI specification document structure. @@ -26,18 +27,16 @@ class OpenAPISpecificationHandler(object): _capabilities (dict): A dictionary to store capabilities related to YAML file handling. """ - def __init__(self, llm_handler: LLMHandler, response_handler: ResponseHandler, strategy: PromptStrategy, url: str, + def __init__(self, llm_handler: LLMHandler, strategy: PromptStrategy, url: str, description: str, name: str) -> None: """ Initializes the handler with a template OpenAPI specification. Args: llm_handler (object): An instance of the LLM handler for interacting with the LLM. - response_handler (object): An instance of the response handler for processing API responses. strategy (PromptStrategy): An instance of the PromptStrategy class. """ self.unsuccessful_methods = {} - self.response_handler = response_handler self.schemas = {} self.query_params = {} self.endpoint_methods = {} @@ -103,6 +102,143 @@ def is_partial_match(self, element, string_list): return False + def parse_http_response_to_openapi_example( + self, openapi_spec: Dict[str, Any], http_response: str, path: str, method: str + ) -> Tuple[Optional[Dict[str, Any]], Optional[str], Dict[str, Any]]: + """ + Parses an HTTP response to generate an OpenAPI example. + + Args: + openapi_spec (Dict[str, Any]): The OpenAPI specification to update. + http_response (str): The HTTP response to parse. + path (str): The API path. + method (str): The HTTP method. + + Returns: + Tuple[Optional[Dict[str, Any]], Optional[str], Dict[str, Any]]: A tuple containing the entry dictionary, reference, and updated OpenAPI specification. + """ + + headers, body = http_response.split("\r\n\r\n", 1) + try: + body_dict = json.loads(body) + except json.decoder.JSONDecodeError: + return None, None, openapi_spec + + reference, object_name, openapi_spec = self.parse_http_response_to_schema(openapi_spec, body_dict, path) + entry_dict = {} + old_body_dict = copy.deepcopy(body_dict) + + if len(body_dict) == 1 and "data" not in body_dict: + entry_dict["id"] = body_dict + self.llm_handler._add_created_object(entry_dict, object_name) + else: + if "data" in body_dict: + body_dict = body_dict["data"] + if isinstance(body_dict, list) and len(body_dict) > 0: + body_dict = body_dict[0] + if isinstance(body_dict, list): + for entry in body_dict: + key = entry.get("title") or entry.get("name") or entry.get("id") + entry_dict[key] = {"value": entry} + self.llm_handler._add_created_object(entry_dict[key], object_name) + if len(entry_dict) > 3: + break + + + if isinstance(body_dict, list) and len(body_dict) > 0: + body_dict = body_dict[0] + if isinstance(body_dict, list): + + for entry in body_dict: + key = entry.get("title") or entry.get("name") or entry.get("id") + entry_dict[key] = entry + self.llm_handler._add_created_object(entry_dict[key], object_name) + if len(entry_dict) > 3: + break + else: + if isinstance(body_dict, list) and len(body_dict) == 0: + entry_dict = "" + elif isinstance(body_dict, dict) and "data" in body_dict.keys(): + entry_dict = body_dict["data"] + if isinstance(entry_dict, list) and len(entry_dict) > 0: + entry_dict = entry_dict[0] + else: + entry_dict= body_dict + self.llm_handler._add_created_object(entry_dict, object_name) + if isinstance(old_body_dict, dict) and len(old_body_dict.keys()) > 0 and "data" in old_body_dict.keys() and isinstance(old_body_dict, dict) \ + and isinstance(entry_dict, dict): + old_body_dict.pop("data") + entry_dict = {**entry_dict, **old_body_dict} + + + return entry_dict, reference, openapi_spec + + def parse_http_response_to_schema( + self, openapi_spec: Dict[str, Any], body_dict: Dict[str, Any], path: str + ) -> Tuple[str, str, Dict[str, Any]]: + """ + Parses an HTTP response body to generate an OpenAPI schema. + + Args: + openapi_spec (Dict[str, Any]): The OpenAPI specification to update. + body_dict (Dict[str, Any]): The HTTP response body as a dictionary or list. + path (str): The API path. + + Returns: + Tuple[str, str, Dict[str, Any]]: A tuple containing the reference, object name, and updated OpenAPI specification. + """ + if "/" not in path: + return None, None, openapi_spec + + object_name = path.split("/")[1].capitalize().rstrip("s") + properties_dict = {} + + # Handle different structures of `body_dict` + if isinstance(body_dict, dict): + for key, value in body_dict.items(): + # If it's a nested dictionary, extract keys recursively + properties_dict = self.extract_keys(key, value, properties_dict) + + elif isinstance(body_dict, list) and len(body_dict) > 0: + first_item = body_dict[0] + if isinstance(first_item, dict): + for key, value in first_item.items(): + properties_dict = self.extract_keys(key, value, properties_dict) + + # Create the schema object for this response + object_dict = {"type": "object", "properties": properties_dict} + + # Add the schema to OpenAPI spec if not already present + if object_name not in openapi_spec["components"]["schemas"]: + openapi_spec["components"]["schemas"][object_name] = object_dict + + reference = f"#/components/schemas/{object_name}" + return reference, object_name, openapi_spec + + def extract_keys(self, key: str, value: Any, properties_dict: Dict[str, Any]) -> Dict[str, Any]: + """ + Extracts and formats the keys and values from a dictionary to generate OpenAPI properties. + + Args: + key (str): The key in the dictionary. + value (Any): The value associated with the key. + properties_dict (Dict[str, Any]): The dictionary to store the extracted properties. + + Returns: + Dict[str, Any]: The updated properties dictionary. + """ + if key == "id": + properties_dict[key] = { + "type": str(type(value).__name__), + "format": "uuid", + "example": str(value), + } + else: + properties_dict[key] = {"type": str(type(value).__name__), "example": str(value)} + + return properties_dict + + def update_openapi_spec(self, resp, result, prompt_engineer): """ Updates the OpenAPI specification based on the API response provided. @@ -156,7 +292,7 @@ def update_openapi_spec(self, resp, result, prompt_engineer): return list(self.openapi_spec["endpoints"].keys()) # Parse the response into OpenAPI example and reference - example, reference, self.openapi_spec = self.response_handler.parse_http_response_to_openapi_example( + example, reference, self.openapi_spec = self.parse_http_response_to_openapi_example( self.openapi_spec, result, path, method ) diff --git a/src/hackingBuddyGPT/usecases/web_api_testing/simple_openapi_documentation.py b/src/hackingBuddyGPT/usecases/web_api_documentation/simple_openapi_documentation.py similarity index 91% rename from src/hackingBuddyGPT/usecases/web_api_testing/simple_openapi_documentation.py rename to src/hackingBuddyGPT/usecases/web_api_documentation/simple_openapi_documentation.py index 5e8cc21c..77f6a367 100644 --- a/src/hackingBuddyGPT/usecases/web_api_testing/simple_openapi_documentation.py +++ b/src/hackingBuddyGPT/usecases/web_api_documentation/simple_openapi_documentation.py @@ -1,3 +1,5 @@ +import json +from logging import config import os from dataclasses import field @@ -6,17 +8,18 @@ from hackingBuddyGPT.capabilities.http_request import HTTPRequest from hackingBuddyGPT.capabilities.record_note import RecordNote from hackingBuddyGPT.usecases.base import AutonomousUseCase, use_case -from hackingBuddyGPT.usecases.web_api_testing.documentation.openapi_specification_handler import \ +from hackingBuddyGPT.usecases.web_api_documentation.openapi_specification_handler import \ OpenAPISpecificationHandler from hackingBuddyGPT.utils.capability_manager import CapabilityManager -from hackingBuddyGPT.utils.prompt_generation import PromptGenerationHelper +from hackingBuddyGPT.utils.logging import Logger, log_param +from hackingBuddyGPT.utils.prompt_generation.information.prompt_information import PromptStrategy +from hackingBuddyGPT.utils.prompt_generation.prompt_generation_helper import PromptGenerationHelper from hackingBuddyGPT.utils.prompt_generation.information import PromptContext from hackingBuddyGPT.utils.prompt_generation.prompt_engineer import PromptEngineer -from hackingBuddyGPT.usecases.web_api_testing.response_processing.response_handler import ResponseHandler -from hackingBuddyGPT.usecases.web_api_testing.utils import LLMHandler -from hackingBuddyGPT.usecases.web_api_testing.utils.configuration_handler import ConfigurationHandler -from hackingBuddyGPT.usecases.web_api_testing.utils.custom_datatypes import Context, Prompt -from hackingBuddyGPT.usecases.web_api_testing.utils.evaluator import Evaluator +from hackingBuddyGPT.utils.web_api.response_handler import ResponseHandler +from hackingBuddyGPT.utils.web_api.llm_handler import LLMHandler +from hackingBuddyGPT.utils.web_api.custom_datatypes import Context, Prompt +from hackingBuddyGPT.usecases.web_api_documentation.evaluator import Evaluator from hackingBuddyGPT.utils.configurable import parameter from hackingBuddyGPT.utils.openai.openai_lib import OpenAILib @@ -30,7 +33,6 @@ class SimpleWebAPIDocumentation(AutonomousUseCase): llm (OpenAILib): The language model interface used for prompt execution. _prompt_history (Prompt): Internal history of prompts exchanged with the LLM. _context (Context): Context information used by capabilities (e.g., notes). - _capabilities (Dict[str, Capability]): Dictionary of active tool capabilities (HTTP requests, notes, etc.). config_path (str): Path to the configuration file for the API under test. strategy_string (str): Serialized string representing the documentation strategy to apply. _http_method_description (str): Description for identifying HTTP methods in responses. @@ -41,6 +43,7 @@ class SimpleWebAPIDocumentation(AutonomousUseCase): all_steps_done (bool): Flag to indicate whether the full documentation process is complete. """ llm: OpenAILib = None + log: Logger = log_param _prompt_history: Prompt = field(default_factory=list) _context: Context = field(default_factory=lambda: {"notes": list()}) _capabilities: CapabilityManager = None @@ -76,6 +79,15 @@ class SimpleWebAPIDocumentation(AutonomousUseCase): def get_name(self) -> str: return self.__class__.__name__ + + def get_strategy(self, strategy_string): + + strategies = { + "cot": PromptStrategy.CHAIN_OF_THOUGHT, + "tot": PromptStrategy.TREE_OF_THOUGHT, + "icl": PromptStrategy.IN_CONTEXT + } + return strategies.get(strategy_string, PromptStrategy.IN_CONTEXT) def init(self): """Initialize the agent with configurations, capabilities, and handlers.""" @@ -84,15 +96,24 @@ def init(self): self.found_all_http_methods = False self.all_steps_done = False + # load config file + self.strategy = self.get_strategy(self.strategy_string) - config_handler = ConfigurationHandler(self.config_path, self.strategy_string) - config, self.strategy = config_handler.load() - token, self.host, description, self._correct_endpoints, query_params = config_handler._extract_config_values(config) + """Loads JSON configuration from the specified path.""" + if not os.path.exists(self.config_path): + raise FileNotFoundError(f"Configuration file not found at {self.config_path}") + with open(self.config_path, 'r') as file: + config = json.load(file) + token = config.get("token") + self.host = config.get("host") + description = config.get("description") + self._correct_endpoints = config.get("correct_endpoints", {}) + query_params = config.get("query_params", {}) self.categorized_endpoints = self.categorize_endpoints(self._correct_endpoints, query_params) # setup capabilities - self._capabilities.init() + self._capabilities = CapabilityManager(self.log) self._capabilities.add_capability(HTTPRequest(self.host)) self._capabilities.add_capability(RecordNote(self._context["notes"])) @@ -160,7 +181,7 @@ def _initialize_handlers(self, config, description, token, name, initial_prompt) self._response_handler = ResponseHandler(llm_handler=self._llm_handler, prompt_context=self._prompt_context, prompt_helper=self.prompt_helper, config=config) self._documentation_handler = OpenAPISpecificationHandler( - self._llm_handler, self._response_handler, self.strategy, self.host, description, name + self._llm_handler, self.strategy, self.host, description, name ) self._prompt_history.append(initial_prompt) diff --git a/src/hackingBuddyGPT/usecases/web_api_testing/__init__.py b/src/hackingBuddyGPT/usecases/web_api_testing/__init__.py index 42edb2bd..116bef02 100644 --- a/src/hackingBuddyGPT/usecases/web_api_testing/__init__.py +++ b/src/hackingBuddyGPT/usecases/web_api_testing/__init__.py @@ -1,5 +1 @@ -from .simple_openapi_documentation import SimpleWebAPIDocumentation -from .simple_web_api_testing import SimpleWebAPITesting -from . import response_processing -from . import documentation -from . import testing +from .simple_web_api_testing import SimpleWebAPITesting \ No newline at end of file diff --git a/src/hackingBuddyGPT/usecases/web_api_testing/documentation/__init__.py b/src/hackingBuddyGPT/usecases/web_api_testing/documentation/__init__.py deleted file mode 100644 index 3038bb3b..00000000 --- a/src/hackingBuddyGPT/usecases/web_api_testing/documentation/__init__.py +++ /dev/null @@ -1,2 +0,0 @@ -from .openapi_specification_handler import OpenAPISpecificationHandler -from .report_handler import ReportHandler diff --git a/src/hackingBuddyGPT/usecases/web_api_testing/documentation/report_handler.py b/src/hackingBuddyGPT/usecases/web_api_testing/report_handler.py similarity index 100% rename from src/hackingBuddyGPT/usecases/web_api_testing/documentation/report_handler.py rename to src/hackingBuddyGPT/usecases/web_api_testing/report_handler.py diff --git a/src/hackingBuddyGPT/usecases/web_api_testing/response_processing/__init__.py b/src/hackingBuddyGPT/usecases/web_api_testing/response_processing/__init__.py deleted file mode 100644 index 4f1206eb..00000000 --- a/src/hackingBuddyGPT/usecases/web_api_testing/response_processing/__init__.py +++ /dev/null @@ -1,4 +0,0 @@ -from .response_analyzer import ResponseAnalyzer -from .response_handler import ResponseHandler - -# from .response_analyzer_with_llm import ResponseAnalyzerWithLLM diff --git a/src/hackingBuddyGPT/usecases/web_api_testing/response_processing/response_analyzer.py b/src/hackingBuddyGPT/usecases/web_api_testing/response_processing/response_analyzer.py deleted file mode 100644 index ff9fa4ca..00000000 --- a/src/hackingBuddyGPT/usecases/web_api_testing/response_processing/response_analyzer.py +++ /dev/null @@ -1,310 +0,0 @@ -import json -import re -from typing import Any, Dict, Optional, Tuple - -from hackingBuddyGPT.utils.prompt_generation.information import PromptPurpose - - -class ResponseAnalyzer: - """ - A class to parse and analyze HTTP responses based on different purposes, such as - authentication/authorization checks and input validation. - - Attributes: - purpose (Optional[PromptPurpose]): The specific purpose for analyzing the HTTP response. It determines - which analysis method will be applied. - """ - - def __init__(self, purpose: Optional[PromptPurpose] = None) -> None: - """ - Initializes the ResponseAnalyzer with an optional purpose. - - Args: - purpose (Optional[PromptPurpose]): The purpose for analyzing the HTTP response. Default is None. - """ - self.purpose: Optional[PromptPurpose] = purpose - - def set_purpose(self, purpose: PromptPurpose) -> None: - """ - Sets the purpose for analyzing the HTTP response. - - Args: - purpose (PromptPurpose): The specific purpose for analyzing the HTTP response. - """ - self.purpose = purpose - - def parse_http_response(self, raw_response: str) -> Tuple[Optional[int], Dict[str, str], str]: - """ - Parses the raw HTTP response string into its components: status line, headers, and body. - - Args: - raw_response (str): The raw HTTP response string to parse. - - Returns: - Tuple[Optional[int], Dict[str, str], str]: A tuple containing the status code (int), headers (dict), and body (str). - """ - header_body_split = raw_response.split("\r\n\r\n", 1) - header_lines = header_body_split[0].split("\n") - body = header_body_split[1] if len(header_body_split) > 1 else "" - - if body != {} and bool(body and not body.isspace()): - body = json.loads(body)[0] - - if body == "": - for line in header_lines: - if line.startswith("{") or line.startswith("["): - body = line - body = json.loads(body) - - status_line = header_lines[0].strip() - headers = { - key.strip(): value.strip() - for key, value in (line.split(":", 1) for line in header_lines[1:] if ":" in line) - } - - match = re.match(r"HTTP/1\.1 (\d{3}) (.*)", status_line) - status_code = int(match.group(1)) if match else None - - return status_code, headers, body - - def analyze_response(self, raw_response: str) -> Optional[Dict[str, Any]]: - """ - Parses the HTTP response and analyzes it based on the set purpose. - - Args: - raw_response (str): The raw HTTP response string to parse and analyze. - - Returns: - Optional[Dict[str, Any]]: The analysis results based on the purpose. - """ - status_code, headers, body = self.parse_http_response(raw_response) - return self.analyze_parsed_response(status_code, headers, body) - - def analyze_parsed_response( - self, status_code: Optional[int], headers: Dict[str, str], body: str - ) -> Optional[Dict[str, Any]]: - """ - Analyzes the parsed HTTP response based on the purpose, invoking the appropriate method. - - Args: - status_code (Optional[int]): The HTTP status code. - headers (Dict[str, str]): The HTTP headers. - body (str): The HTTP response body. - - Returns: - Optional[Dict[str, Any]]: The analysis results based on the purpose. - """ - analysis_methods = { - PromptPurpose.AUTHENTICATION: self.analyze_authentication_authorization( - status_code, headers, body - ), - PromptPurpose.INPUT_VALIDATION: self.analyze_input_validation(status_code, headers, body), - } - return analysis_methods.get(self.purpose) - - def analyze_authentication_authorization( - self, status_code: Optional[int], headers: Dict[str, str], body: str - ) -> Dict[str, Any]: - """ - Analyzes the HTTP response with a focus on authentication and authorization. - - Args: - status_code (Optional[int]): The HTTP status code. - headers (Dict[str, str]): The HTTP headers. - body (str): The HTTP response body. - - Returns: - Dict[str, Any]: The analysis results focused on authentication and authorization. - """ - analysis = { - "status_code": status_code, - "authentication_status": ( - "Authenticated" - if status_code == 200 - else "Not Authenticated or Not Authorized" - if status_code in [401, 403] - else "Unknown" - ), - "auth_headers_present": any( - header in headers for header in ["Authorization", "Set-Cookie", "WWW-Authenticate"] - ), - "rate_limiting": { - "X-Ratelimit-Limit": headers.get("X-Ratelimit-Limit"), - "X-Ratelimit-Remaining": headers.get("X-Ratelimit-Remaining"), - "X-Ratelimit-Reset": headers.get("X-Ratelimit-Reset"), - }, - "content_body": "Empty" if body == {} else body, - } - return analysis - - def analyze_input_validation( - self, status_code: Optional[int], headers: Dict[str, str], body: str - ) -> Dict[str, Any]: - """ - Analyzes the HTTP response with a focus on input validation. - - Args: - status_code (Optional[int]): The HTTP status code. - headers (Dict[str, str]): The HTTP headers. - body (str): The HTTP response body. - - Returns: - Dict[str, Any]: The analysis results focused on input validation. - """ - analysis = { - "status_code": status_code, - "response_body": "Empty" if body == {} else body, - "is_valid_response": self.is_valid_input_response(status_code, body), - "security_headers_present": any(key in headers for key in ["X-Content-Type-Options", "X-Ratelimit-Limit"]), - } - return analysis - - def is_valid_input_response(self, status_code: Optional[int], body: str) -> str: - """ - Determines if the HTTP response is valid based on the status code and body content. - - Args: - status_code (Optional[int]): The HTTP status code. - body (str): The HTTP response body. - - Returns: - str: The validity status ("Valid", "Invalid", "Error", or "Unexpected"). - """ - if status_code == 200: - return "Valid" - elif status_code == 400: - return "Invalid" - elif status_code in [401, 403, 404, 500]: - return "Error" - else: - return "Unexpected" - - def document_findings( - self, - status_code: Optional[int], - headers: Dict[str, str], - body: str, - expected_behavior: str, - actual_behavior: str, - ) -> Dict[str, Any]: - """ - Documents the findings from the analysis, comparing expected and actual behavior. - - Args: - status_code (Optional[int]): The HTTP status code. - headers (Dict[str, str]): The HTTP headers. - body (str): The HTTP response body. - expected_behavior (str): The expected behavior of the API. - actual_behavior (str): The actual behavior observed. - - Returns: - Dict[str, Any]: A dictionary containing the documented findings. - """ - document = { - "Status Code": status_code, - "Headers": headers, - "Response Body": body.strip(), - "Expected Behavior": expected_behavior, - "Actual Behavior": actual_behavior, - } - - return document - - def report_issues(self, document: Dict[str, Any]) -> None: - """ - Reports any discrepancies found during analysis, suggesting improvements where necessary. - - Args: - document (Dict[str, Any]): The documented findings to be reported. - """ - print("Reporting Issues:") - if document["Expected Behavior"] != document["Actual Behavior"]: - print("Issue Found:") - print(f"Expected: {document['Expected Behavior']}") - print(f"Actual: {document['Actual Behavior']}") - print("Suggestion: Improve input validation, clearer error messages, or enhanced security measures.") - else: - print("No issues found in this test case.") - print("-" * 50) - - def print_analysis(self, analysis: Dict[str, Any]) -> str: - """ - Prints the analysis results in a structured and readable format. - - Args: - analysis (Dict[str, Any]): The analysis results to be printed. - - Returns: - str: A formatted string representing the analysis results. - """ - fields_to_print = { - "HTTP Status Code": analysis.get("status_code"), - "Response Body": analysis.get("response_body"), - "Content Body": analysis.get("content_body"), - "Valid Response": analysis.get("is_valid_response"), - "Authentication Status": analysis.get("authentication_status"), - "Security Headers Present": "Yes" if analysis.get("security_headers_present") else "No", - } - analysis_str = "\n" - - for label, value in fields_to_print.items(): - if label == "Content Body": - if value is not None: - analysis_str += f"{label}: {fields_to_print['Content Body']}" - else: - if value is not None: - analysis_str += f"{label}: {value}\n" - - if "rate_limiting" in analysis: - analysis_str += "Rate Limiting Information:\n" - - for key, value in analysis["rate_limiting"].items(): - analysis_str += f" {key}: {value}\n" - - analysis_str += "-" * 50 - return analysis_str - - -if __name__ == "__main__": - # Example HTTP response to parse - raw_http_response = """HTTP/1.1 404 Not Found - Date: Fri, 16 Aug 2024 10:01:19 GMT - Content-Type: application/json; charset=utf-8 - Content-Length: 2 - Connection: keep-alive - Report-To: {"group":"heroku-nel","max_age":3600,"endpoints":[{"url":"https://nel.heroku.com/reports?ts=1723802269&sid=e11707d5-02a7-43ef-b45e-2cf4d2036f7d&s=dkvm744qehjJmab8kgf%2BGuZA8g%2FCCIkfoYc1UdYuZMc%3D"}]} - Reporting-Endpoints: heroku-nel=https://nel.heroku.com/reports?ts=1723802269&sid=e11707d5-02a7-43ef-b45e-2cf4d2036f7d&s=dkvm744qehjJmab8kgf%2BGuZA8g%2FCCIkfoYc1UdYuZMc%3D - Nel: {"report_to":"heroku-nel","max_age":3600,"success_fraction":0.005,"failure_fraction":0.05,"response_headers":["Via"]} - X-Powered-By: Express - X-Ratelimit-Limit: 1000 - X-Ratelimit-Remaining: 999 - X-Ratelimit-Reset: 1723802321 - Vary: Origin, Accept-Encoding - Access-Control-Allow-Credentials: true - Cache-Control: max-age=43200 - Pragma: no-cache - Expires: -1 - X-Content-Type-Options: nosniff - Etag: W/"2-vyGp6PvFo4RvsFtPoIWeCReyIC8" - Via: 1.1 vegur - CF-Cache-Status: HIT - Age: 210 - Server: cloudflare - CF-RAY: 8b40951728d9c289-VIE - alt-svc: h3=":443"; ma=86400 - - {}""" - response_analyzer = ResponseAnalyzer() - response_analyzer.purpose = PromptPurpose.AUTHENTICATION_AUTHORIZATION - # Parse and analyze the HTTP response - analysis = response_analyzer.analyze_response(raw_http_response) - - # Print the analysis results - response_analyzer.print_analysis(analysis) - response_analyzer = ResponseAnalyzer() - response_analyzer.purpose = PromptPurpose.INPUT_VALIDATION - # Parse and analyze the HTTP response - analysis = response_analyzer.analyze_response(raw_http_response) - - # Print the analysis results - print(response_analyzer.print_analysis(analysis)) diff --git a/src/hackingBuddyGPT/usecases/web_api_testing/simple_web_api_testing.py b/src/hackingBuddyGPT/usecases/web_api_testing/simple_web_api_testing.py index 59f6120b..0652e45c 100644 --- a/src/hackingBuddyGPT/usecases/web_api_testing/simple_web_api_testing.py +++ b/src/hackingBuddyGPT/usecases/web_api_testing/simple_web_api_testing.py @@ -14,20 +14,20 @@ from hackingBuddyGPT.capabilities.record_note import RecordNote from hackingBuddyGPT.usecases.base import AutonomousUseCase, use_case from hackingBuddyGPT.utils.capability_manager import CapabilityManager -from hackingBuddyGPT.utils.prompt_generation import PromptGenerationHelper +from hackingBuddyGPT.utils.prompt_generation.information.prompt_information import PromptStrategy +from hackingBuddyGPT.utils.prompt_generation.prompt_generation_helper import PromptGenerationHelper from hackingBuddyGPT.utils.prompt_generation.information import PenTestingInformation from hackingBuddyGPT.utils.prompt_generation.information import PromptPurpose from hackingBuddyGPT.utils.openapi.openapi_parser import OpenAPISpecificationParser -from hackingBuddyGPT.usecases.web_api_testing.documentation.report_handler import ReportHandler +from hackingBuddyGPT.usecases.web_api_testing.report_handler import ReportHandler from hackingBuddyGPT.utils.prompt_generation.information import PromptContext from hackingBuddyGPT.utils.prompt_generation.prompt_engineer import PromptEngineer -from hackingBuddyGPT.usecases.web_api_testing.response_processing.response_analyzer_with_llm import \ +from hackingBuddyGPT.utils.web_api.response_analyzer_with_llm import \ ResponseAnalyzerWithLLM -from hackingBuddyGPT.usecases.web_api_testing.response_processing.response_handler import ResponseHandler -from hackingBuddyGPT.usecases.web_api_testing.testing.test_handler import GenerationTestHandler -from hackingBuddyGPT.usecases.web_api_testing.utils.configuration_handler import ConfigurationHandler -from hackingBuddyGPT.usecases.web_api_testing.utils.custom_datatypes import Context, Prompt -from hackingBuddyGPT.usecases.web_api_testing.utils.llm_handler import LLMHandler +from hackingBuddyGPT.utils.web_api.response_handler import ResponseHandler +from hackingBuddyGPT.usecases.web_api_testing.test_handler import GenerationTestHandler +from hackingBuddyGPT.utils.web_api.custom_datatypes import Context, Prompt +from hackingBuddyGPT.utils.web_api.llm_handler import LLMHandler from hackingBuddyGPT.utils import tool_message from hackingBuddyGPT.utils.configurable import parameter from hackingBuddyGPT.utils.openai.openai_lib import OpenAILib @@ -73,12 +73,32 @@ class SimpleWebAPITesting(AutonomousUseCase): _capabilities: CapabilityManager = None _all_test_cases_run: bool = False + def get_strategy(self, strategy_string): + + strategies = { + "cot": PromptStrategy.CHAIN_OF_THOUGHT, + "tot": PromptStrategy.TREE_OF_THOUGHT, + "icl": PromptStrategy.IN_CONTEXT + } + return strategies.get(strategy_string, PromptStrategy.IN_CONTEXT) + def init(self): super().init() - configuration_handler = ConfigurationHandler(self.config_path, self.strategy_string) - self.config, self.strategy = configuration_handler.load() - self.token, self.host, self.description, self.correct_endpoints, self.query_params = configuration_handler._extract_config_values( - self.config) + + # load config file + self.strategy = self.get_strategy(self.strategy_string) + + """Loads JSON configuration from the specified path.""" + if not os.path.exists(self.config_path): + raise FileNotFoundError(f"Configuration file not found at {self.config_path}") + with open(self.config_path, 'r') as file: + self.config = json.load(file) + self.token = self.config.get("token") + self.host = self.config.get("host") + self.description = self.config.get("description") + self.correct_endpoints = self.config.get("correct_endpoints", {}) + self.query_params = self.config.get("query_params", {}) + self._load_openapi_specification() self._setup_environment() self._setup_handlers() diff --git a/src/hackingBuddyGPT/usecases/web_api_testing/testing/test_handler.py b/src/hackingBuddyGPT/usecases/web_api_testing/test_handler.py similarity index 100% rename from src/hackingBuddyGPT/usecases/web_api_testing/testing/test_handler.py rename to src/hackingBuddyGPT/usecases/web_api_testing/test_handler.py diff --git a/src/hackingBuddyGPT/usecases/web_api_testing/testing/__init__.py b/src/hackingBuddyGPT/usecases/web_api_testing/testing/__init__.py deleted file mode 100644 index be3b5ebc..00000000 --- a/src/hackingBuddyGPT/usecases/web_api_testing/testing/__init__.py +++ /dev/null @@ -1 +0,0 @@ -from .test_handler import GenerationTestHandler diff --git a/src/hackingBuddyGPT/usecases/web_api_testing/utils/__init__.py b/src/hackingBuddyGPT/usecases/web_api_testing/utils/__init__.py deleted file mode 100644 index 92159799..00000000 --- a/src/hackingBuddyGPT/usecases/web_api_testing/utils/__init__.py +++ /dev/null @@ -1,2 +0,0 @@ -from .custom_datatypes import Context, Prompt -from .llm_handler import LLMHandler diff --git a/src/hackingBuddyGPT/usecases/web_api_testing/utils/configuration_handler.py b/src/hackingBuddyGPT/usecases/web_api_testing/utils/configuration_handler.py deleted file mode 100644 index 05cadd33..00000000 --- a/src/hackingBuddyGPT/usecases/web_api_testing/utils/configuration_handler.py +++ /dev/null @@ -1,59 +0,0 @@ -import json -import os - -from hackingBuddyGPT.utils.prompt_generation.information import PromptStrategy - - -class ConfigurationHandler(object): - - def __init__(self, config_file, strategy_string=None): - self.config_path = config_file - self.strategy_string = strategy_string - - def load(self, strategy_string=None): - if self.config_path != "": - if self.config_path != "": - current_file_path = os.path.dirname(os.path.abspath(__file__)) - self.config_path = os.path.join(current_file_path, "configs", self.config_path) - config = self._load_config() - - if "spotify" in self.config_path: - os.environ['SPOTIPY_CLIENT_ID'] = config['client_id'] - os.environ['SPOTIPY_CLIENT_SECRET'] = config['client_secret'] - os.environ['SPOTIPY_REDIRECT_URI'] = config['redirect_uri'] - - return config, self.get_strategy(strategy_string) - - def get_strategy(self, strategy_string=None): - - strategies = { - "cot": PromptStrategy.CHAIN_OF_THOUGHT, - "tot": PromptStrategy.TREE_OF_THOUGHT, - "icl": PromptStrategy.IN_CONTEXT - } - if strategy_string: - return strategies.get(strategy_string, PromptStrategy.IN_CONTEXT) - - return strategies.get(self.strategy_string, PromptStrategy.IN_CONTEXT) - - def _load_config(self, config_path=None): - if config_path is None: - config_path = self.config_path - """Loads JSON configuration from the specified path.""" - if not os.path.exists(config_path): - raise FileNotFoundError(f"Configuration file not found at {config_path}") - with open(config_path, 'r') as file: - return json.load(file) - - - - - def _extract_config_values(self, config): - token = config.get("token") - host = config.get("host") - description = config.get("description") - correct_endpoints = config.get("correct_endpoints", {}) - query_params = config.get("query_params", {}) - return token, host, description, correct_endpoints, query_params - - diff --git a/src/hackingBuddyGPT/usecases/web_api_testing/utils/documentation_handler.py b/src/hackingBuddyGPT/usecases/web_api_testing/utils/documentation_handler.py deleted file mode 100644 index 32aa8317..00000000 --- a/src/hackingBuddyGPT/usecases/web_api_testing/utils/documentation_handler.py +++ /dev/null @@ -1,129 +0,0 @@ -import os -import yaml -from datetime import datetime -from hackingBuddyGPT.capabilities.yamlFile import YAMLFile - - -class DocumentationHandler: - """ - Handles the generation and updating of an OpenAPI specification document based on dynamic API responses. - - Attributes: - response_handler (object): An instance of the response handler for processing API responses. - schemas (dict): A dictionary to store API schemas. - filename (str): The filename for the OpenAPI specification file. - openapi_spec (dict): The OpenAPI specification document structure. - llm_handler (object): An instance of the LLM handler for interacting with the LLM. - api_key (str): The API key for accessing the LLM. - file_path (str): The path to the directory where the OpenAPI specification file will be stored. - file (str): The complete path to the OpenAPI specification file. - _capabilities (dict): A dictionary to store capabilities related to YAML file handling. - """ - - def __init__(self, llm_handler, response_handler): - """ - Initializes the handler with a template OpenAPI specification. - - Args: - llm_handler (object): An instance of the LLM handler for interacting with the LLM. - response_handler (object): An instance of the response handler for processing API responses. - """ - self.response_handler = response_handler - self.schemas = {} - self.filename = f"openapi_spec_{datetime.now().strftime('%Y-%m-%d_%H-%M-%S')}.yaml" - self.openapi_spec = { - "openapi": "3.0.0", - "info": { - "title": "Generated API Documentation", - "version": "1.0", - "description": "Automatically generated description of the API." - }, - "servers": [{"url": "https://localhost:8080"}], - "endpoints": {}, - "components": {"schemas": {}} - } - self.llm_handler = llm_handler - self.api_key = llm_handler.llm.api_key - current_path = os.path.dirname(os.path.abspath(__file__)) - self.file_path = os.path.join(current_path, "openapi_spec") - self.file = os.path.join(self.file_path, self.filename) - self._capabilities = { - "yaml": YAMLFile() - } - - def update_openapi_spec(self, resp, result): - """ - Updates the OpenAPI specification based on the API response provided. - - Args: - resp (object): The response object containing details like the path and method which should be documented. - result (str): The result of the API call. - """ - request = resp.action - - if request.__class__.__name__ == 'RecordNote': # TODO: check why isinstance does not work - self.check_openapi_spec(resp) - if request.__class__.__name__ == 'HTTPRequest': - path = request.path - method = request.method - # Ensure that path and method are not None and method has no numeric characters - if path and method: - # Initialize the path if not already present - if path not in self.openapi_spec['endpoints']: - self.openapi_spec['endpoints'][path] = {} - # Update the method description within the path - example, reference, self.openapi_spec = self.response_handler.parse_http_response_to_openapi_example( - self.openapi_spec, result, path, method) - if example is not None or reference is not None: - self.openapi_spec['endpoints'][path][method.lower()] = { - "summary": f"{method} operation on {path}", - "responses": { - "200": { - "description": "Successful response", - "content": { - "application/json": { - "schema": { - "$ref": reference - }, - "examples": example - } - } - } - } - } - - def write_openapi_to_yaml(self): - """ - Writes the updated OpenAPI specification to a YAML file with a timestamped filename. - """ - try: - # Prepare data to be written to YAML - openapi_data = { - "openapi": self.openapi_spec["openapi"], - "info": self.openapi_spec["info"], - "servers": self.openapi_spec["servers"], - "components": self.openapi_spec["components"], - "paths": self.openapi_spec["endpoints"] - } - - # Create directory if it doesn't exist and generate the timestamped filename - os.makedirs(self.file_path, exist_ok=True) - - # Write to YAML file - with open(self.file, 'w') as yaml_file: - yaml.dump(openapi_data, yaml_file, allow_unicode=True, default_flow_style=False) - print(f"OpenAPI specification written to {self.filename}.") - except Exception as e: - raise Exception(f"Error writing YAML file: {e}") - - def check_openapi_spec(self, note): - """ - Uses OpenAI's GPT model to generate a complete OpenAPI specification based on a natural language description. - - Args: - note (object): The note object containing the description of the API. - """ - description = self.response_handler.extract_description(note) - from hackingBuddyGPT.usecases.web_api_testing.documentation.parsing.yaml_assistant import YamlFileAssistant - yaml_file_assistant = YamlFileAssistant(self.file_path, self.llm_handler) - yaml_file_assistant.run(description) diff --git a/src/hackingBuddyGPT/utils/openapi/__init__.py b/src/hackingBuddyGPT/utils/openapi/__init__.py index 1dc8cc54..9595117f 100644 --- a/src/hackingBuddyGPT/utils/openapi/__init__.py +++ b/src/hackingBuddyGPT/utils/openapi/__init__.py @@ -1,3 +1 @@ -from .openapi_converter import OpenAPISpecificationConverter from .openapi_parser import OpenAPISpecificationParser -from .yaml_assistant import YamlFileAssistant diff --git a/src/hackingBuddyGPT/utils/openapi/openapi_converter.py b/src/hackingBuddyGPT/utils/openapi/openapi_converter.py deleted file mode 100644 index 0f23465d..00000000 --- a/src/hackingBuddyGPT/utils/openapi/openapi_converter.py +++ /dev/null @@ -1,161 +0,0 @@ -import json -import os.path - -import yaml - - -class OpenAPISpecificationConverter: - """ - OpenAPISpecificationConverter is a class for converting OpenAPI specification files between YAML and JSON formats. - - Attributes: - base_directory (str): The base directory for the output files. - """ - - def __init__(self, base_directory): - """ - Initializes the OpenAPISpecificationConverter with the specified base directory. - - Args: - base_directory (str): The base directory for the output files. - """ - self.base_directory = base_directory - - def convert_file(self, input_filepath, output_directory, input_type, output_type): - """ - Converts files between YAML and JSON formats. - - Args: - input_filepath (str): The path to the input file. - output_directory (str): The subdirectory for the output files. - input_type (str): The type of the input file ('yaml' or 'json'). - output_type (str): The type of the output file ('json' or 'yaml'). - - Returns: - str: The path to the converted output file, or None if an error occurred. - """ - try: - filename = os.path.basename(input_filepath) - output_filename = filename.replace(f".{input_type}", f".{output_type}") - output_path = os.path.join(self.base_directory, output_directory, output_filename) - - os.makedirs(os.path.dirname(output_path), exist_ok=True) - - with open(input_filepath, "r") as infile: - if input_type == "yaml": - content = yaml.safe_load(infile) - else: - content = json.load(infile) - - with open(output_path, "w") as outfile: - if output_type == "yaml": - yaml.dump(content, outfile, allow_unicode=True, default_flow_style=False) - else: - json.dump(content, outfile, indent=2) - - print(f"Successfully converted {input_filepath} to {output_filename}") - return output_path - - except Exception as e: - print(f"Error converting {input_filepath}: {e}") - return None - - def yaml_to_json(self, yaml_filepath): - """ - Converts a YAML file to a JSON file. - - Args: - yaml_filepath (str): The path to the YAML file to be converted. - - Returns: - str: The path to the converted JSON file, or None if an error occurred. - """ - return self.convert_file(yaml_filepath, "json", "yaml", "json") - - def json_to_yaml(self, json_filepath): - """ - Converts a JSON file to a YAML file. - - Args: - json_filepath (str): The path to the JSON file to be converted. - - Returns: - str: The path to the converted YAML file, or None if an error occurred. - """ - return self.convert_file(json_filepath, "yaml", "json", "yaml") - - def extract_openapi_info(self, openapi_spec_file, output_path=""): - """ - Extracts relevant information from an OpenAPI specification and writes it to a JSON file. - - Args: - openapi_spec (dict): The OpenAPI specification loaded as a dictionary. - output_file_path (str): Path to save the extracted information in JSON format. - - Returns: - dict: The extracted information saved in JSON format. - """ - openapi_spec = json.load(open(openapi_spec_file)) - - # Extract the API description and host URL - description = openapi_spec.get("info", {}).get("description", "No description provided.") - host = openapi_spec.get("servers", [{}])[0].get("url", "No host URL provided.") - - # Extract correct endpoints and query parameters - correct_endpoints = [] - query_params = {} - - for path, path_item in openapi_spec.get("paths", {}).items(): - correct_endpoints.append(path) - # Collect query parameters for each endpoint - endpoint_query_params = [] - for method, operation in path_item.items(): - if isinstance(operation, dict): - if "parameters" in operation.keys(): - parameters = operation.get("parameters", []) - for param in parameters: - if param.get("in") == "query": - endpoint_query_params.append(param.get("name")) - - if endpoint_query_params: - query_params[path] = endpoint_query_params - - # Create the final output structure - extracted_info = { - "token": "your_api_token_here", - "host": host, - "description": description, - "correct_endpoints": correct_endpoints, - "query_params": query_params - } - filename = os.path.basename(openapi_spec_file) - filename = filename.replace("_oas", "_config") - base_name, _ = os.path.splitext(filename) - output_filename = f"{base_name}.json" - output_path = os.path.join(output_path, output_filename) - - os.makedirs(os.path.dirname(output_path), exist_ok=True) - - # Write to JSON file - with open(output_path, 'w') as json_file: - json.dump(extracted_info, json_file, indent=2) - print(f'output path:{output_path}') - - return extracted_info - - -# Usage example -if __name__ == "__main__": - # yaml_input = "src/hackingBuddyGPT/usecases/web_api_testing/configs/test_config.json/hard/coincap_oas.json" - - converter = OpenAPISpecificationConverter("converted_files") - ## Convert YAML to JSON - # json_file = converter.yaml_to_json(yaml_input) - # - ## Convert JSON to YAML - # if json_file: - # converter.json_to_yaml(json_file) - - openapi_path = "/home/diana/Desktop/masterthesis/00/hackingBuddyGPT/tests/test_files/oas/fakeapi_oas.json" - converter.extract_openapi_info(openapi_path, - output_path="/home/diana/Desktop/masterthesis/00/hackingBuddyGPT/tests/test_files") diff --git a/src/hackingBuddyGPT/utils/openapi/yaml_assistant.py b/src/hackingBuddyGPT/utils/openapi/yaml_assistant.py deleted file mode 100644 index 40f0805e..00000000 --- a/src/hackingBuddyGPT/utils/openapi/yaml_assistant.py +++ /dev/null @@ -1,88 +0,0 @@ -from openai import OpenAI - - -class YamlFileAssistant: - """ - YamlFileAssistant is a class designed to interact with a YAML file using OpenAI's API. - - Attributes: - yaml_file (str): The path to the YAML file that the assistant will analyze. - client (OpenAI): The OpenAI client used to interact with the OpenAI API. - """ - - def __init__(self, yaml_file: str, client: OpenAI): - """ - Initializes the YamlFileAssistant with a specified YAML file and OpenAI client. - - Args: - yaml_file (str): The path to the YAML file to be analyzed. - client (OpenAI): The OpenAI client used to interact with the OpenAI API. - """ - self.yaml_file: str = yaml_file - self.client: OpenAI = client - - def run(self, recorded_note: str) -> None: - """ - Runs the assistant to analyze the YAML file based on a recorded note. - - This method would typically interact with OpenAI's API to create an assistant, - upload the YAML file, analyze its contents, and generate responses. However, the - actual implementation is currently commented out. - - Args: - recorded_note (str): A string containing the note or instructions for analysis. - - Note: - The current implementation is commented out and serves as a placeholder for - integrating with OpenAI's API. Uncomment and modify the code as needed. - """ - assistant = self.client.beta.assistants.create( - name="Yaml File Analysis Assistant", - instructions="You are an OpenAPI specification analyst. Use your knowledge to check " - f"if the following information is contained in the provided yaml file. Information: {recorded_note}", - model="gpt-4o", - tools=[{"type": "file_search"}], - ) - - # Create a vector store called "Financial Statements" - vector_store = self.client.beta.vector_stores.create(name="Financial Statements") - - # Ready the files for upload to OpenAI - file_streams = [open(self.yaml_file, "rb")] - - # Use the upload and poll SDK helper to upload the files, add them to the vector store, - # and poll the status of the file batch for completion. - file_batch = self.client.beta.vector_stores.file_batches.upload_and_poll( - vector_store_id=vector_store.id, files=file_streams - ) - - # You can print the status and the file counts of the batch to see the result of this operation. - print(file_batch.status) - print(file_batch.file_counts) - - assistant = self.client.beta.assistants.update( - assistant_id=assistant.id, - tool_resources={"file_search": {"vector_store_ids": [vector_store.id]}}, - ) - - # Upload the user-provided file to OpenAI - message_file = self.client.files.create( - file=open("edgar/aapl-10k.pdf", "rb"), purpose="assistants" - ) - - # Create a thread and attach the file to the message - thread = self.client.beta.threads.create( - messages=[ - { - "role": "user", - "content": "How many shares of AAPL were outstanding at the end of October 2023?", - # Attach the new file to the message. - "attachments": [ - {"file_id": message_file.id, "tools": [{"type": "file_search"}]} - ], - } - ] - ) - - # The thread now has a vector store with that file in its tool resources. - print(thread.tool_resources.file_search) \ No newline at end of file diff --git a/src/hackingBuddyGPT/utils/prompt_generation/__init__.py b/src/hackingBuddyGPT/utils/prompt_generation/__init__.py index 72c52a57..e69de29b 100644 --- a/src/hackingBuddyGPT/utils/prompt_generation/__init__.py +++ b/src/hackingBuddyGPT/utils/prompt_generation/__init__.py @@ -1,2 +0,0 @@ -from .prompt_engineer import PromptEngineer -from .prompt_generation_helper import PromptGenerationHelper diff --git a/src/hackingBuddyGPT/utils/prompt_generation/prompts/task_planning/tree_of_thought_prompt.py b/src/hackingBuddyGPT/utils/prompt_generation/prompts/task_planning/tree_of_thought_prompt.py index 0944b614..579097f1 100644 --- a/src/hackingBuddyGPT/utils/prompt_generation/prompts/task_planning/tree_of_thought_prompt.py +++ b/src/hackingBuddyGPT/utils/prompt_generation/prompts/task_planning/tree_of_thought_prompt.py @@ -8,7 +8,7 @@ from hackingBuddyGPT.utils.prompt_generation.prompts.task_planning import ( TaskPlanningPrompt, ) -from hackingBuddyGPT.usecases.web_api_testing.utils.custom_datatypes import Prompt +from hackingBuddyGPT.utils.web_api.custom_datatypes import Prompt class TreeOfThoughtPrompt(TaskPlanningPrompt): diff --git a/src/hackingBuddyGPT/usecases/web_api_testing/utils/custom_datatypes.py b/src/hackingBuddyGPT/utils/web_api/custom_datatypes.py similarity index 100% rename from src/hackingBuddyGPT/usecases/web_api_testing/utils/custom_datatypes.py rename to src/hackingBuddyGPT/utils/web_api/custom_datatypes.py diff --git a/src/hackingBuddyGPT/usecases/web_api_testing/utils/llm_handler.py b/src/hackingBuddyGPT/utils/web_api/llm_handler.py similarity index 100% rename from src/hackingBuddyGPT/usecases/web_api_testing/utils/llm_handler.py rename to src/hackingBuddyGPT/utils/web_api/llm_handler.py diff --git a/src/hackingBuddyGPT/usecases/web_api_testing/documentation/pattern_matcher.py b/src/hackingBuddyGPT/utils/web_api/pattern_matcher.py similarity index 100% rename from src/hackingBuddyGPT/usecases/web_api_testing/documentation/pattern_matcher.py rename to src/hackingBuddyGPT/utils/web_api/pattern_matcher.py diff --git a/src/hackingBuddyGPT/usecases/web_api_testing/response_processing/response_analyzer_with_llm.py b/src/hackingBuddyGPT/utils/web_api/response_analyzer_with_llm.py similarity index 99% rename from src/hackingBuddyGPT/usecases/web_api_testing/response_processing/response_analyzer_with_llm.py rename to src/hackingBuddyGPT/utils/web_api/response_analyzer_with_llm.py index 88c25715..9067b177 100644 --- a/src/hackingBuddyGPT/usecases/web_api_testing/response_processing/response_analyzer_with_llm.py +++ b/src/hackingBuddyGPT/utils/web_api/response_analyzer_with_llm.py @@ -10,7 +10,7 @@ from hackingBuddyGPT.utils.prompt_generation.information import ( PromptPurpose, ) -from hackingBuddyGPT.usecases.web_api_testing.utils import LLMHandler +from hackingBuddyGPT.utils.web_api.llm_handler import LLMHandler from hackingBuddyGPT.utils import tool_message diff --git a/src/hackingBuddyGPT/usecases/web_api_testing/response_processing/response_handler.py b/src/hackingBuddyGPT/utils/web_api/response_handler.py similarity index 85% rename from src/hackingBuddyGPT/usecases/web_api_testing/response_processing/response_handler.py rename to src/hackingBuddyGPT/utils/web_api/response_handler.py index 9a7fe09f..bfdd57c3 100644 --- a/src/hackingBuddyGPT/usecases/web_api_testing/response_processing/response_handler.py +++ b/src/hackingBuddyGPT/utils/web_api/response_handler.py @@ -1,26 +1,21 @@ -import copy import json import re from collections import Counter from itertools import cycle -from typing import Any, Dict, Optional, Tuple +from typing import Any, Dict, Optional import random from urllib.parse import urlencode import pydantic_core from bs4 import BeautifulSoup from rich.panel import Panel -from hackingBuddyGPT.usecases.web_api_testing.documentation.pattern_matcher import PatternMatcher -from hackingBuddyGPT.utils.prompt_generation import PromptGenerationHelper +from hackingBuddyGPT.utils.web_api.pattern_matcher import PatternMatcher +from hackingBuddyGPT.utils.prompt_generation.prompt_generation_helper import PromptGenerationHelper from hackingBuddyGPT.utils.prompt_generation.information import PromptContext -from hackingBuddyGPT.utils.prompt_generation.information import ( - PenTestingInformation, -) -from hackingBuddyGPT.usecases.web_api_testing.response_processing.response_analyzer_with_llm import ( - ResponseAnalyzerWithLLM, -) -from hackingBuddyGPT.usecases.web_api_testing.utils import LLMHandler -from hackingBuddyGPT.usecases.web_api_testing.utils.custom_datatypes import Prompt +from hackingBuddyGPT.utils.prompt_generation.information import PenTestingInformation +from hackingBuddyGPT.utils.web_api.response_analyzer_with_llm import ResponseAnalyzerWithLLM +from hackingBuddyGPT.utils.web_api.llm_handler import LLMHandler +from hackingBuddyGPT.utils.web_api.custom_datatypes import Prompt from hackingBuddyGPT.utils import tool_message @@ -195,77 +190,6 @@ def extract_response_example(self, html_content: str) -> Optional[Dict[str, Any] return json.loads(result_text) return None - def parse_http_response_to_openapi_example( - self, openapi_spec: Dict[str, Any], http_response: str, path: str, method: str - ) -> Tuple[Optional[Dict[str, Any]], Optional[str], Dict[str, Any]]: - """ - Parses an HTTP response to generate an OpenAPI example. - - Args: - openapi_spec (Dict[str, Any]): The OpenAPI specification to update. - http_response (str): The HTTP response to parse. - path (str): The API path. - method (str): The HTTP method. - - Returns: - Tuple[Optional[Dict[str, Any]], Optional[str], Dict[str, Any]]: A tuple containing the entry dictionary, reference, and updated OpenAPI specification. - """ - - headers, body = http_response.split("\r\n\r\n", 1) - try: - body_dict = json.loads(body) - except json.decoder.JSONDecodeError: - return None, None, openapi_spec - - reference, object_name, openapi_spec = self.parse_http_response_to_schema(openapi_spec, body_dict, path) - entry_dict = {} - old_body_dict = copy.deepcopy(body_dict) - - if len(body_dict) == 1 and "data" not in body_dict: - entry_dict["id"] = body_dict - self.llm_handler._add_created_object(entry_dict, object_name) - else: - if "data" in body_dict: - body_dict = body_dict["data"] - if isinstance(body_dict, list) and len(body_dict) > 0: - body_dict = body_dict[0] - if isinstance(body_dict, list): - for entry in body_dict: - key = entry.get("title") or entry.get("name") or entry.get("id") - entry_dict[key] = {"value": entry} - self.llm_handler._add_created_object(entry_dict[key], object_name) - if len(entry_dict) > 3: - break - - - if isinstance(body_dict, list) and len(body_dict) > 0: - body_dict = body_dict[0] - if isinstance(body_dict, list): - - for entry in body_dict: - key = entry.get("title") or entry.get("name") or entry.get("id") - entry_dict[key] = entry - self.llm_handler._add_created_object(entry_dict[key], object_name) - if len(entry_dict) > 3: - break - else: - if isinstance(body_dict, list) and len(body_dict) == 0: - entry_dict = "" - elif isinstance(body_dict, dict) and "data" in body_dict.keys(): - entry_dict = body_dict["data"] - if isinstance(entry_dict, list) and len(entry_dict) > 0: - entry_dict = entry_dict[0] - else: - entry_dict= body_dict - self.llm_handler._add_created_object(entry_dict, object_name) - if isinstance(old_body_dict, dict) and len(old_body_dict.keys()) > 0 and "data" in old_body_dict.keys() and isinstance(old_body_dict, dict) \ - and isinstance(entry_dict, dict): - old_body_dict.pop("data") - entry_dict = {**entry_dict, **old_body_dict} - - - return entry_dict, reference, openapi_spec - def extract_description(self, note: Any) -> str: """ Extracts the description from a note. @@ -278,48 +202,6 @@ def extract_description(self, note: Any) -> str: """ return note.action.content - def parse_http_response_to_schema( - self, openapi_spec: Dict[str, Any], body_dict: Dict[str, Any], path: str - ) -> Tuple[str, str, Dict[str, Any]]: - """ - Parses an HTTP response body to generate an OpenAPI schema. - - Args: - openapi_spec (Dict[str, Any]): The OpenAPI specification to update. - body_dict (Dict[str, Any]): The HTTP response body as a dictionary or list. - path (str): The API path. - - Returns: - Tuple[str, str, Dict[str, Any]]: A tuple containing the reference, object name, and updated OpenAPI specification. - """ - if "/" not in path: - return None, None, openapi_spec - - object_name = path.split("/")[1].capitalize().rstrip("s") - properties_dict = {} - - # Handle different structures of `body_dict` - if isinstance(body_dict, dict): - for key, value in body_dict.items(): - # If it's a nested dictionary, extract keys recursively - properties_dict = self.extract_keys(key, value, properties_dict) - - elif isinstance(body_dict, list) and len(body_dict) > 0: - first_item = body_dict[0] - if isinstance(first_item, dict): - for key, value in first_item.items(): - properties_dict = self.extract_keys(key, value, properties_dict) - - # Create the schema object for this response - object_dict = {"type": "object", "properties": properties_dict} - - # Add the schema to OpenAPI spec if not already present - if object_name not in openapi_spec["components"]["schemas"]: - openapi_spec["components"]["schemas"][object_name] = object_dict - - reference = f"#/components/schemas/{object_name}" - return reference, object_name, openapi_spec - def read_yaml_to_string(self, filepath: str) -> Optional[str]: """ Reads a YAML file and returns its contents as a string. @@ -365,29 +247,6 @@ def extract_endpoints(self, note: str) -> Dict[str, list]: return required_endpoints - def extract_keys(self, key: str, value: Any, properties_dict: Dict[str, Any]) -> Dict[str, Any]: - """ - Extracts and formats the keys and values from a dictionary to generate OpenAPI properties. - - Args: - key (str): The key in the dictionary. - value (Any): The value associated with the key. - properties_dict (Dict[str, Any]): The dictionary to store the extracted properties. - - Returns: - Dict[str, Any]: The updated properties dictionary. - """ - if key == "id": - properties_dict[key] = { - "type": str(type(value).__name__), - "format": "uuid", - "example": str(value), - } - else: - properties_dict[key] = {"type": str(type(value).__name__), "example": str(value)} - - return properties_dict - def evaluate_result(self, result: Any, prompt_history: Prompt, analysis_context: Any) -> Any: """ Evaluates the result using the LLM-based response analyzer. diff --git a/tests/test_response_analyzer_with_llm.py b/tests/test_response_analyzer_with_llm.py index d384edaf..84b7a999 100644 --- a/tests/test_response_analyzer_with_llm.py +++ b/tests/test_response_analyzer_with_llm.py @@ -1,7 +1,7 @@ import unittest from unittest.mock import MagicMock -from hackingBuddyGPT.usecases.web_api_testing.response_processing.response_analyzer_with_llm import ResponseAnalyzerWithLLM +from hackingBuddyGPT.utils.web_api.response_analyzer_with_llm import ResponseAnalyzerWithLLM from hackingBuddyGPT.utils.prompt_generation.information import PromptPurpose diff --git a/tests/test_response_handler.py b/tests/test_response_handler.py index a4f72c87..57e4d2c2 100644 --- a/tests/test_response_handler.py +++ b/tests/test_response_handler.py @@ -4,7 +4,7 @@ from hackingBuddyGPT.utils.prompt_generation import PromptGenerationHelper from hackingBuddyGPT.utils.prompt_generation.information import PromptContext -from hackingBuddyGPT.usecases.web_api_testing.response_processing.response_handler import ( +from hackingBuddyGPT.utils.web_api.response_handler import ( ResponseHandler, ) from hackingBuddyGPT.usecases.web_api_testing.utils import LLMHandler diff --git a/tests/test_web_api_documentation.py b/tests/test_web_api_documentation.py index c2b951f6..d517bc00 100644 --- a/tests/test_web_api_documentation.py +++ b/tests/test_web_api_documentation.py @@ -3,7 +3,7 @@ from unittest.mock import MagicMock, patch from hackingBuddyGPT.utils.logging import LocalLogger -from hackingBuddyGPT.usecases.web_api_testing.simple_openapi_documentation import ( +from hackingBuddyGPT.usecases.web_api_documentation.simple_openapi_documentation import ( SimpleWebAPIDocumentation, SimpleWebAPIDocumentationUseCase, )