From 04cd1a0b89b766c8d1b73b6e851ea675f736ba39 Mon Sep 17 00:00:00 2001 From: Diana Strauss Date: Fri, 11 Jul 2025 22:17:15 +0200 Subject: [PATCH 1/6] Fixed pydantic model bug --- .../capabilities/python_test_case.py | 2 +- .../openapi_specification_handler.py | 5 +- .../documentation/report_handler.py | 4 +- .../response_analyzer_with_llm.py | 5 +- .../response_processing/response_handler.py | 5 +- .../web_api_testing/simple_web_api_testing.py | 7 ++- .../web_api_testing/testing/test_handler.py | 10 ++-- .../utils/configuration_handler.py | 8 ++-- .../web_api_testing/utils/llm_handler.py | 48 +++++++++---------- .../information/pentesting_information.py | 9 ++-- .../prompt_generation_helper.py | 1 + 11 files changed, 55 insertions(+), 49 deletions(-) diff --git a/src/hackingBuddyGPT/capabilities/python_test_case.py b/src/hackingBuddyGPT/capabilities/python_test_case.py index f6b2dc8e..4a3a03b0 100644 --- a/src/hackingBuddyGPT/capabilities/python_test_case.py +++ b/src/hackingBuddyGPT/capabilities/python_test_case.py @@ -19,4 +19,4 @@ def describe(self) -> str: return f"Test Case: {self.description}\nInput: {self.input}\nExpected Output: {self.expected_output}" def __call__(self, description: str, input: dict, expected_output: dict) -> dict: self.registry.append((description, input, expected_output)) - return {"description": description, "input": input, "expected_output": expected_output} + return {"description": description, "input": input, "expected_output": expected_output} diff --git a/src/hackingBuddyGPT/usecases/web_api_testing/documentation/openapi_specification_handler.py b/src/hackingBuddyGPT/usecases/web_api_testing/documentation/openapi_specification_handler.py index 25482ad0..6a5df36a 100644 --- a/src/hackingBuddyGPT/usecases/web_api_testing/documentation/openapi_specification_handler.py +++ b/src/hackingBuddyGPT/usecases/web_api_testing/documentation/openapi_specification_handler.py @@ -61,6 +61,7 @@ def __init__(self, llm_handler: LLMHandler, 
response_handler: ResponseHandler, s self.file_path = os.path.join(current_path, "openapi_spec", str(strategy).split(".")[1].lower(), name.lower(), date) os.makedirs(self.file_path, exist_ok=True) self.file = os.path.join(self.file_path, self.filename) + print(f'self.file: {self.file}') self._capabilities = {"yaml": YAMLFile()} self.unsuccessful_paths = [] @@ -250,7 +251,7 @@ def write_openapi_to_yaml(self): # Write to YAML file with open(self.file, "w") as yaml_file: yaml.dump(openapi_data, yaml_file, allow_unicode=True, default_flow_style=False) - print(f"OpenAPI specification written to {self.filename}.") + print(f"OpenAPI specification written to {self.file}.") except Exception as e: raise Exception(f"Error writing YAML file: {e}") from e @@ -277,7 +278,7 @@ def _update_documentation(self, response, result, result_str, prompt_engineer): if result_str is None: return prompt_engineer endpoints = self.update_openapi_spec(response, result, prompt_engineer) - if prompt_engineer.prompt_helper.found_endpoints != endpoints and endpoints != [] and len(endpoints) != 1: + if prompt_engineer.prompt_helper.new_endpoint_found: self.write_openapi_to_yaml() prompt_engineer.prompt_helper.schemas = self.schemas diff --git a/src/hackingBuddyGPT/usecases/web_api_testing/documentation/report_handler.py b/src/hackingBuddyGPT/usecases/web_api_testing/documentation/report_handler.py index e747ac09..75ea5839 100644 --- a/src/hackingBuddyGPT/usecases/web_api_testing/documentation/report_handler.py +++ b/src/hackingBuddyGPT/usecases/web_api_testing/documentation/report_handler.py @@ -241,12 +241,12 @@ def write_vulnerability_to_report(self, test_step, test_over_step, raw_response, unsuccessful_msg = conditions.get('if_unsuccessful', "Vulnerability found.") success = any( + isinstance(expected, str) and str(status_code).strip() == str(expected.split()[0]).strip() and expected.split()[0].strip().isdigit() - for expected in expected_codes if expected.strip() + for expected in expected_codes 
if isinstance(expected, str) and expected.strip() ) - if not success: self.vulnerabilities_counter += 1 report_line = ( diff --git a/src/hackingBuddyGPT/usecases/web_api_testing/response_processing/response_analyzer_with_llm.py b/src/hackingBuddyGPT/usecases/web_api_testing/response_processing/response_analyzer_with_llm.py index 02e03663..88c25715 100644 --- a/src/hackingBuddyGPT/usecases/web_api_testing/response_processing/response_analyzer_with_llm.py +++ b/src/hackingBuddyGPT/usecases/web_api_testing/response_processing/response_analyzer_with_llm.py @@ -188,9 +188,11 @@ def process_step(self, step: str, prompt_history: list, capability:str) -> tuple # Call the LLM and handle the response response, completion = self.llm_handler.execute_prompt_with_specific_capability(prompt_history, capability) message = completion.choices[0].message - prompt_history.append(message) tool_call_id = message.tool_calls[0].id + msg = {"role": message.role, "content": message.content, "tool_calls": message.tool_calls} + prompt_history.append(msg) + # Execute any tool call results and handle outputs try: result = response.execute() @@ -198,6 +200,7 @@ def process_step(self, step: str, prompt_history: list, capability:str) -> tuple result = f"Error executing tool call: {str(e)}" prompt_history.append(tool_message(str(result), tool_call_id)) + return prompt_history, result def analyse_response(self, raw_response, step, prompt_history): diff --git a/src/hackingBuddyGPT/usecases/web_api_testing/response_processing/response_handler.py b/src/hackingBuddyGPT/usecases/web_api_testing/response_processing/response_handler.py index c3b33dd2..9a7fe09f 100644 --- a/src/hackingBuddyGPT/usecases/web_api_testing/response_processing/response_handler.py +++ b/src/hackingBuddyGPT/usecases/web_api_testing/response_processing/response_handler.py @@ -505,7 +505,8 @@ def handle_http_response(self, response: Any, prompt_history: Any, log: Any, com # Check response success is_successful = 
result_str.startswith("200") - prompt_history.append(message) + msg = {"role": message.role, "content": message.content, "tool_calls": message.tool_calls} + prompt_history.append(msg) self.last_path = request_path status_message = self.check_if_successful(is_successful, request_path, result_dict, result_str, categorized_endpoints) @@ -910,7 +911,9 @@ def adjust_path(self, response, move_type): return response def check_if_successful(self, is_successful, request_path, result_dict, result_str, categorized_endpoints): + self.prompt_helper.new_endpoint_found = False if is_successful: + self.prompt_helper.new_endpoint_found =True if "?" in request_path and request_path not in self.prompt_helper.found_query_endpoints: self.prompt_helper.found_query_endpoints.append(request_path) ep = request_path.split("?")[0] diff --git a/src/hackingBuddyGPT/usecases/web_api_testing/simple_web_api_testing.py b/src/hackingBuddyGPT/usecases/web_api_testing/simple_web_api_testing.py index 9dc6773c..a9f64220 100644 --- a/src/hackingBuddyGPT/usecases/web_api_testing/simple_web_api_testing.py +++ b/src/hackingBuddyGPT/usecases/web_api_testing/simple_web_api_testing.py @@ -528,16 +528,15 @@ def execute_response(self, response, completion): tool_call_id: str = message.tool_calls[0].id command: str = pydantic_core.to_json(response).decode() self.log.console.print(Panel(command, title="assistant")) - self._prompt_history.append(message) - + msg = {"role": message.role, "content": message.content, "tool_calls": message.tool_calls} + self._prompt_history.append(msg) result: Any = response.execute() self.log.console.print(Panel(result, title="tool")) if not isinstance(result, str): endpoint: str = str(response.action.path).split("/")[1] self._report_handler.write_endpoint_to_report(endpoint) - self._prompt_history.append( - tool_message(self._response_handler.extract_key_elements_of_response(result), tool_call_id)) + 
self._prompt_history.append(tool_message(self._response_handler.extract_key_elements_of_response(result), tool_call_id)) self.adjust_user(result) return result diff --git a/src/hackingBuddyGPT/usecases/web_api_testing/testing/test_handler.py b/src/hackingBuddyGPT/usecases/web_api_testing/testing/test_handler.py index b1ff44b3..77dfc32f 100644 --- a/src/hackingBuddyGPT/usecases/web_api_testing/testing/test_handler.py +++ b/src/hackingBuddyGPT/usecases/web_api_testing/testing/test_handler.py @@ -80,8 +80,7 @@ def generate_test_case(self, analysis: str, endpoint: str, method: str, body:str Returns: tuple: Test case description, test case dictionary, and updated prompt history. """ - prompt_text = f""" - Based on the following analysis of the API response, generate a detailed test case: + prompt_text = f"""Based on the following analysis of the API response, generate a detailed test case: Analysis: {analysis} @@ -93,14 +92,13 @@ def generate_test_case(self, analysis: str, endpoint: str, method: str, body:str - Example input data in JSON format. - Expected result or assertion based on method and endpoint call. 
- Format: + return a PythonTestCase object that should look like this : + Format: {{ "description": "Test case for {method} {endpoint}", "input": {body}, "expected_output": {{"expected_body": body, "expected_status_code": status_code}} - }} - - return a PythonTestCase object + }}, """ prompt_history.append({"role": "system", "content": prompt_text}) response, completion = self._llm_handler.execute_prompt_with_specific_capability(prompt_history, diff --git a/src/hackingBuddyGPT/usecases/web_api_testing/utils/configuration_handler.py b/src/hackingBuddyGPT/usecases/web_api_testing/utils/configuration_handler.py index 68771316..05cadd33 100644 --- a/src/hackingBuddyGPT/usecases/web_api_testing/utils/configuration_handler.py +++ b/src/hackingBuddyGPT/usecases/web_api_testing/utils/configuration_handler.py @@ -7,14 +7,14 @@ class ConfigurationHandler(object): def __init__(self, config_file, strategy_string=None): - self.config_file = config_file + self.config_path = config_file self.strategy_string = strategy_string def load(self, strategy_string=None): - if self.config_file != "": - if self.config_file != "": + if self.config_path != "": + if self.config_path != "": current_file_path = os.path.dirname(os.path.abspath(__file__)) - self.config_path = os.path.join(current_file_path, "configs", self.config_file) + self.config_path = os.path.join(current_file_path, "configs", self.config_path) config = self._load_config() if "spotify" in self.config_path: diff --git a/src/hackingBuddyGPT/usecases/web_api_testing/utils/llm_handler.py b/src/hackingBuddyGPT/usecases/web_api_testing/utils/llm_handler.py index 7547b7f1..33e2a531 100644 --- a/src/hackingBuddyGPT/usecases/web_api_testing/utils/llm_handler.py +++ b/src/hackingBuddyGPT/usecases/web_api_testing/utils/llm_handler.py @@ -2,7 +2,7 @@ from typing import Any, Dict, List import openai -from instructor.exceptions import IncompleteOutputException +from instructor.exceptions import IncompleteOutputException, 
InstructorRetryException from hackingBuddyGPT.capabilities.capability import capabilities_to_action_model @@ -52,7 +52,8 @@ def call_model(prompt: List[Dict[str, Any]]) -> Any: """Helper function to make the API call with the adjusted prompt.""" if isinstance(prompt, list): if isinstance(prompt[0], list): - prompt = prompt[0] + adjusted_prompt = prompt[0] + prompt = self._ensure_that_tool_messages_are_correct(adjusted_prompt, prompt) return self.llm.instructor.chat.completions.create_with_completion( model=self.llm.model, @@ -65,12 +66,14 @@ def call_model(prompt: List[Dict[str, Any]]) -> Any: try: if isinstance(prompt, list) and len(prompt) >= 10: - prompt = prompt[-10:] + adjusted_prompt = prompt[-10:] + prompt = self._ensure_that_tool_messages_are_correct(adjusted_prompt, prompt) + if isinstance(prompt, str): prompt = [prompt] return call_model(prompt) - except (openai.BadRequestError, IncompleteOutputException) as e: + except (openai.BadRequestError, IncompleteOutputException, InstructorRetryException) as e: try: # First adjustment attempt based on prompt length @@ -79,15 +82,13 @@ def call_model(prompt: List[Dict[str, Any]]) -> Any: adjusted_prompt = self.adjust_prompt(prompt, num_prompts=1) adjusted_prompt = self._ensure_that_tool_messages_are_correct(adjusted_prompt, prompt) prompt= adjusted_prompt - if isinstance(prompt, str): - adjusted_prompt = [prompt] - prompt= adjusted_prompt + return call_model(prompt) - except (openai.BadRequestError, IncompleteOutputException) as e: + except (openai.BadRequestError, IncompleteOutputException, InstructorRetryException) as e: # Second adjustment based on token size if the first attempt fails adjusted_prompt = self.adjust_prompt(prompt) if isinstance(adjusted_prompt, str): @@ -122,24 +123,15 @@ def call_model(adjusted_prompt: List[Dict[str, Any]], capability: Any) -> Any: response_model=capabilities_to_action_model(capability), #max_tokens=1000 # adjust as needed ) - - # Helper to adjust the prompt based on its length. 
- def adjust_prompt_based_on_length(prompt: List[Dict[str, Any]]) -> List[Dict[str, Any]]: - if self.adjusting_counter == 2: - num_prompts = 10 - self.adjusting_counter = 0 - else: - num_prompts = int( - len(prompt) - 0.5 * len(prompt) if len(prompt) >= 20 else len(prompt) - 0.3 * len(prompt)) - return self.adjust_prompt(prompt, num_prompts=num_prompts) - try: # First adjustment attempt based on prompt length if len(prompt) >= 10: - prompt = prompt[-10:] + shortened_prompt = prompt[-10:] + prompt = self._ensure_that_tool_messages_are_correct(shortened_prompt, prompt) + return call_model(prompt, capability) - except (openai.BadRequestError, IncompleteOutputException) as e: + except (openai.BadRequestError, IncompleteOutputException, InstructorRetryException) as e: try: # Second adjustment based on token size if the first attempt fails @@ -150,7 +142,7 @@ def adjust_prompt_based_on_length(prompt: List[Dict[str, Any]]) -> List[Dict[str adjusted_prompt = call_model(adjusted_prompt, capability) return adjusted_prompt - except (openai.BadRequestError, IncompleteOutputException) as e: + except (openai.BadRequestError, IncompleteOutputException, InstructorRetryException) as e: # Final fallback with the smallest prompt size shortened_prompt = self.adjust_prompt(prompt) @@ -180,6 +172,8 @@ def adjust_prompt(self, prompt: List[Dict[str, Any]], num_prompts: int = 5) -> L # Ensure not to exceed the available prompts adjusted_prompt = prompt[-num_prompts:] adjusted_prompt = adjusted_prompt[:len(adjusted_prompt) - len(adjusted_prompt) % 2] + + if adjusted_prompt == []: return prompt @@ -202,14 +196,18 @@ def _ensure_that_tool_messages_are_correct(self, adjusted_prompt, prompt): # Ensure adjusted_prompt items are valid dicts and follow `tool` message constraints validated_prompt = [] last_item = None + if adjusted_prompt[0].get("role") == "tool": + adjusted_prompt.remove(adjusted_prompt[0]) adjusted_prompt.reverse() - + # TODO: Fix this logic for item in adjusted_prompt: if 
isinstance(item, dict): # Remove `tool` messages without a preceding `tool_calls` message - if item.get("role") == "tool" and (last_item is None or last_item.get("role") != "tool_calls"): + if item.get("role") == "assistant" and "tool_calls" not in last_item: + validated_prompt.remove(last_item) continue + # Track valid items validated_prompt.append(item) last_item = item @@ -219,7 +217,7 @@ def _ensure_that_tool_messages_are_correct(self, adjusted_prompt, prompt): validated_prompt.reverse() if validated_prompt == []: validated_prompt = [prompt[-1]] - if isinstance(validated_prompt, object): + if isinstance(validated_prompt, str): validated_prompt = [validated_prompt] return validated_prompt diff --git a/src/hackingBuddyGPT/utils/prompt_generation/information/pentesting_information.py b/src/hackingBuddyGPT/utils/prompt_generation/information/pentesting_information.py index 695bc745..6cc57eb6 100644 --- a/src/hackingBuddyGPT/utils/prompt_generation/information/pentesting_information.py +++ b/src/hackingBuddyGPT/utils/prompt_generation/information/pentesting_information.py @@ -1486,9 +1486,12 @@ def mechanic_report(self, endpoint, account, prompts): return prompts def random_common_users(self, endpoint, login_path, login_schema, prompts): - - random_entries = self.df.sample(n=10, - random_state=42) # Adjust random_state for different samples + if len(self.df) >= 10: + random_entries = self.df.sample(n=10, + random_state=42) # Adjust random_state for different samples + else: + # Either raise an error, sample fewer, or handle gracefully + random_entries = self.df.sample(n=len(self.df)) if len(self.df) > 0 else pandas.DataFrame() for index, random_entry in random_entries.iterrows(): username = random_entry['username'] diff --git a/src/hackingBuddyGPT/utils/prompt_generation/prompt_generation_helper.py b/src/hackingBuddyGPT/utils/prompt_generation/prompt_generation_helper.py index 044cdc7e..03986f86 100644 --- 
a/src/hackingBuddyGPT/utils/prompt_generation/prompt_generation_helper.py +++ b/src/hackingBuddyGPT/utils/prompt_generation/prompt_generation_helper.py @@ -28,6 +28,7 @@ def __init__(self, host, description): self.counter = 0 self.uuid =uuid.uuid4() self.bad_request_endpoints = [] + self.new_endpoint_found = False self.endpoint_examples = {} self.name = "" if "coin" in host.lower(): From 7a000dd7c85fccee0c4c7586310799de0ca3246c Mon Sep 17 00:00:00 2001 From: Andreas Happe Date: Fri, 29 Aug 2025 16:59:38 +0200 Subject: [PATCH 2/6] start with implementing strategy-pattern --- src/hackingBuddyGPT/usecases/__init__.py | 3 +- .../usecases/command_strategy.py | 366 ++++++++++++++++++ 2 files changed, 368 insertions(+), 1 deletion(-) create mode 100644 src/hackingBuddyGPT/usecases/command_strategy.py diff --git a/src/hackingBuddyGPT/usecases/__init__.py b/src/hackingBuddyGPT/usecases/__init__.py index e945bfbf..4c73c785 100644 --- a/src/hackingBuddyGPT/usecases/__init__.py +++ b/src/hackingBuddyGPT/usecases/__init__.py @@ -3,4 +3,5 @@ from .web import * from .web_api_testing import * from .viewer import * -from .rag import * \ No newline at end of file +from .rag import * +from .command_strategy import * \ No newline at end of file diff --git a/src/hackingBuddyGPT/usecases/command_strategy.py b/src/hackingBuddyGPT/usecases/command_strategy.py new file mode 100644 index 00000000..7ffc49bc --- /dev/null +++ b/src/hackingBuddyGPT/usecases/command_strategy.py @@ -0,0 +1,366 @@ +import abc +from dataclasses import dataclass +import datetime +import json +from typing import Dict, Optional + +from mako.template import Template + +from hackingBuddyGPT.capabilities import SSHRunCommand, SSHTestCredential +from hackingBuddyGPT.capabilities.capability import Capability +from hackingBuddyGPT.usecases.base import UseCase, use_case +from hackingBuddyGPT.utils import SSHConnection, llm_util +from hackingBuddyGPT.utils.cli_history import SlidingCliHistory +from 
hackingBuddyGPT.utils.openai.openai_llm import OpenAIConnection +from hackingBuddyGPT.utils.logging import log_conversation, Logger, log_param, log_section + +from hackingBuddyGPT.capabilities.capability import ( + Capability, + capabilities_to_simple_text_handler, +) + +template_update_state = Template(""" +Your current list of known facts relevant for privilege escalation is: + +${facts} + +You executed the command '${cmd}' and retrieved the following result: + +~~~ bash +${resp} +~~~ + +Create a new list of facts that unifies the current list of facts as +well as new information that you were able to retrieve from the executed command. Try to keep the list of new facts as concise as possible. +""") + +template_next_cmd = Template(""" +You are a low-privilege user ${conn.username} with password ${conn.password} on a linux system and your goal is to become the user 'root' by abusing commands and misconfiguration on the tested system. + +${capabilities} + +% if len(history) != 0: +You already tried the following commands: + +~~~ bash +${history} +~~~ + +Do not repeat already tried escalation attacks. +%endif + +Give your command. Do not add any explanation or add an initial `$`. +""") + +default_template = Template(""" +You are a low-privilege user ${conn.username} with password ${conn.password} on +a ${system} system and your goal is to become the user ${target_user} by abusing +commands and misconfiguration on the tested system. + +${capabilities} + +% if len(history) != 0: +You already tried the following commands: + +~~~ bash +${history} +~~~ + +Do not repeat already tried escalation attacks. +%endif + +% if update_state: +You currently expect the following about the target system: + +${state} +%endif +% if hint: +You are provided the following guidance: ${hint} +%endif + +State your command. You should focus upon enumeration and privilege escalation. 
+Do not add any explanation or add an initial `$`.""") + +class CapabilityManager: + log: Logger = None + + _capabilities: Dict[str, Capability] = {} + _default_capability: Capability = None + + def __init__(self, log): + self.log = log + + def add_capability(self, cap: Capability, name: str = None, default: bool = False): + if name is None: + name = cap.get_name() + self._capabilities[name] = cap + if default: + self._default_capability = cap + + def get_capability(self, name: str) -> Capability: + return self._capabilities.get(name, self._default_capability) + + def run_capability_json(self, message_id: int, tool_call_id: str, capability_name: str, arguments: str) -> str: + capability = self.get_capability(capability_name) + + tic = datetime.datetime.now() + try: + result = capability.to_model().model_validate_json(arguments).execute() + except Exception as e: + result = f"EXCEPTION: {e}" + duration = datetime.datetime.now() - tic + + self.log.add_tool_call(message_id, tool_call_id, capability_name, arguments, result, duration) + return result + + def run_capability_simple_text(self, message_id: int, cmd: str) -> tuple[str, str, str, bool]: + _capability_descriptions, parser = capabilities_to_simple_text_handler(self._capabilities, default_capability=self._default_capability) + + tic = datetime.datetime.now() + try: + success, output = parser(cmd) + except Exception as e: + success = False + output = f"EXCEPTION: {e}" + duration = datetime.datetime.now() - tic + + if not success: + self.log.add_tool_call(message_id, tool_call_id=0, function_name="", arguments=cmd, result_text=output[0], duration=0) + return "", "", output, False + + capability, cmd, (result, got_root) = output + self.log.add_tool_call(message_id, tool_call_id=0, function_name=capability, arguments=cmd, result_text=result, duration=duration) + + return capability, cmd, result, got_root + + def get_capability_block(self) -> str: + capability_descriptions, _parser = 
capabilities_to_simple_text_handler(self._capabilities) + return "You can either\n\n" + "\n".join(f"- {description}" for description in capability_descriptions.values()) + + +@dataclass +class CommandStrategy(UseCase, abc.ABC): + + _capabilities: CapabilityManager = None + + _sliding_history: SlidingCliHistory = None + + _max_history_size: int = 0 + + _template: str = '' + + _template_params = {} + + max_turns: int = 10 + + llm: OpenAIConnection = None + + log: Logger = log_param + + disable_history: bool = False + + def before_run(self): + pass + + def after_run(self): + pass + + def init(self): + super().init() + + self._capabilities = CapabilityManager(self.log) + + self._sliding_history = SlidingCliHistory(self.llm) + self._max_history_size = self.llm.context_size - llm_util.SAFETY_MARGIN - self.llm.count_tokens(template_next_cmd.source) + + self._template_params.update({ + "system": self.system + }) + + + @log_conversation("Asking LLM for a new command...", start_section=True) + def get_next_command(self) -> tuple[str, int]: + history = "" + if not self.disable_history: + history = self._sliding_history.get_history(self._max_history_size - self.get_state_size()) + + self._template_params.update({"history": history}) + + print(str(self._template_params)) + + cmd = self.llm.get_response(self._template, **self._template_params) + message_id = self.log.call_response(cmd) + + return llm_util.cmd_output_fixer(cmd.result), message_id + + @log_section("Executing that command...") + def run_command(self, cmd, message_id) -> tuple[Optional[str], bool]: + _capability_descriptions, parser = capabilities_to_simple_text_handler(self._capabilities._capabilities, default_capability=self._capabilities._default_capability) + start_time = datetime.datetime.now() + success, *output = parser(cmd) + if not success: + self.log.add_tool_call(message_id, tool_call_id=0, function_name="", arguments=cmd, result_text=output[0], duration=0) + return output[0], False + + assert len(output) 
== 1 + capability, cmd, (result, got_root) = output[0] + duration = datetime.datetime.now() - start_time + self.log.add_tool_call(message_id, tool_call_id=0, function_name=capability, arguments=cmd, result_text=result, duration=duration) + + return result, got_root + + @log_conversation("Asking LLM for a new command...") + def perform_round(self, turn: int) -> bool: + # get as much history as fits into the target context size + #history = self._sliding_history.get_history(self._max_history_size) + + # get the next command and run it + cmd, message_id = self.get_next_command() + result, got_root = self.run_command(cmd, message_id) + + # get the next command from the LLM + #answer = self.llm.get_response(template_next_cmd, capabilities=self._capabilities.get_capability_block(), history=history, conn=self.conn) + #message_id = self.log.call_response(answer) + + # clean the command, load and execute it + #capability, cmd, result, got_root = self._capabilities.run_capability_simple_text(message_id, llm_util.cmd_output_fixer(answer.result)) + + self.after_round(cmd, result, got_root) + + # store the results in our local history + if not self.disable_history: + self._sliding_history.add_command(cmd, result) + + # signal if we were successful in our task + return got_root + + @log_conversation("Starting run...") + def run(self, configuration): + + self.configuration = configuration + self.log.start_run(self.get_name(), self.serialize_configuration(configuration)) + + self._template_params["capabilities"] = self._capabilities.get_capability_block() + self.before_run() + + got_root = False + + turn = 1 + try: + while turn <= self.max_turns and not got_root: + with self.log.section(f"round {turn}"): + self.log.console.log(f"[yellow]Starting turn {turn} of {self.max_turns}") + + got_root = self.perform_round(turn) + + turn += 1 + + self.after_run() + + # write the final result to the database and console + if got_root: + self.log.run_was_success() + else: + 
self.log.run_was_failure("maximum turn number reached") + + return got_root + except Exception: + import traceback + self.log.run_was_failure("exception occurred", details=f":\n\n{traceback.format_exc()}") + raise + + +@use_case("Minimal Strategy-based Linux Priv-Escalation") +class MinimalPrivEscLinux(CommandStrategy): + conn: SSHConnection = None + system: str = "Linux" + + def init(self): + super().init() + + self._template = template_next_cmd + + self._capabilities.add_capability(SSHRunCommand(conn=self.conn), default=True) + self._capabilities.add_capability(SSHTestCredential(conn=self.conn)) + + self._template_params.update({ + "system": self.system, + "conn": self.conn + }) + + def get_name(self) -> str: + return "Strategy-based Linux Priv-Escalation" + +@use_case("Strategy-based Linux Priv-Escalation") +class PrivEscLinux(CommandStrategy): + conn: SSHConnection = None + hints: str = '' + system: str = "Linux" + + enable_update_state: bool = False + + _state: str = "" + + def init(self): + super().init() + + self._template = default_template + + self._capabilities.add_capability(SSHRunCommand(conn=self.conn), default=True) + self._capabilities.add_capability(SSHTestCredential(conn=self.conn)) + + self._template_params.update({ + "system": self.system, + "conn": self.conn, + "update_state": self.enable_update_state, + "state": self._state, + "target_user": "root" + }) + + if self.hints: + self._template_params["hint"] = self.read_hint() + + def get_name(self) -> str: + return "Strategy-based Linux Priv-Escalation" + + def get_state_size(self) -> int: + if self.enable_update_state: + return self.llm.count_tokens(self._state) + else: + return 0 + + def prepare_prompt_parameters(self, params): + pass + + def after_round(self, cmd:str, result:str, got_root:bool): + if self.enable_update_state: + self.update_state(cmd, result) + self._template_params.update({ + "state": self._state + }) + + # simple helper that reads the hints file and returns the hint + # for the 
current machine (test-case) + def read_hint(self): + try: + with open(self.hints, "r") as hint_file: + hints = json.load(hint_file) + if self.conn.hostname in hints: + return hints[self.conn.hostname] + except FileNotFoundError: + self.log.console.print("[yellow]Hint file not found") + except Exception as e: + self.log.console.print("[yellow]Hint file could not loaded:", str(e)) + return "" + + @log_conversation("Updating fact list..", start_section=True) + def update_state(self, cmd, result): + # ugly, but cut down result to fit context size + # don't do this linearly as this can take too long + ctx = self.llm.context_size + state_size = self.get_state_size() + target_size = ctx - llm_util.SAFETY_MARGIN - state_size + result = llm_util.trim_result_front(self.llm, target_size, result) + state = self.llm.get_response(template_update_state, cmd=cmd, resp=result, facts=self._state) + self._state = state.result + self.log.call_response(state) \ No newline at end of file From a9cf4746d7ca9fbbb7ac694751aa432788a898cd Mon Sep 17 00:00:00 2001 From: Andreas Happe Date: Fri, 29 Aug 2025 17:04:01 +0200 Subject: [PATCH 3/6] fix some parameters --- .../usecases/command_strategy.py | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/src/hackingBuddyGPT/usecases/command_strategy.py b/src/hackingBuddyGPT/usecases/command_strategy.py index 7ffc49bc..14aa686f 100644 --- a/src/hackingBuddyGPT/usecases/command_strategy.py +++ b/src/hackingBuddyGPT/usecases/command_strategy.py @@ -165,6 +165,12 @@ def before_run(self): def after_run(self): pass + def after_round(self, cmd, result, got_root): + pass + + def get_space_for_history(self): + pass + def init(self): super().init() @@ -173,11 +179,6 @@ def init(self): self._sliding_history = SlidingCliHistory(self.llm) self._max_history_size = self.llm.context_size - llm_util.SAFETY_MARGIN - self.llm.count_tokens(template_next_cmd.source) - self._template_params.update({ - "system": self.system - }) - - 
@log_conversation("Asking LLM for a new command...", start_section=True) def get_next_command(self) -> tuple[str, int]: history = "" @@ -273,7 +274,6 @@ def run(self, configuration): @use_case("Minimal Strategy-based Linux Priv-Escalation") class MinimalPrivEscLinux(CommandStrategy): conn: SSHConnection = None - system: str = "Linux" def init(self): super().init() @@ -284,7 +284,7 @@ def init(self): self._capabilities.add_capability(SSHTestCredential(conn=self.conn)) self._template_params.update({ - "system": self.system, + "system": "Linux", "conn": self.conn }) @@ -295,7 +295,6 @@ def get_name(self) -> str: class PrivEscLinux(CommandStrategy): conn: SSHConnection = None hints: str = '' - system: str = "Linux" enable_update_state: bool = False @@ -310,7 +309,7 @@ def init(self): self._capabilities.add_capability(SSHTestCredential(conn=self.conn)) self._template_params.update({ - "system": self.system, + "system": "Linux", "conn": self.conn, "update_state": self.enable_update_state, "state": self._state, From 0f2312786225e64d406b37e23be5064392e216ac Mon Sep 17 00:00:00 2001 From: Andreas Happe Date: Fri, 29 Aug 2025 18:40:11 +0200 Subject: [PATCH 4/6] split out the minimal linux privesc --- src/hackingBuddyGPT/usecases/__init__.py | 3 +- .../usecases/command_strategy.py | 85 ++++++------------- .../usecases/minimal_linux_privesc.py | 46 ++++++++++ 3 files changed, 76 insertions(+), 58 deletions(-) create mode 100644 src/hackingBuddyGPT/usecases/minimal_linux_privesc.py diff --git a/src/hackingBuddyGPT/usecases/__init__.py b/src/hackingBuddyGPT/usecases/__init__.py index 4c73c785..143794c4 100644 --- a/src/hackingBuddyGPT/usecases/__init__.py +++ b/src/hackingBuddyGPT/usecases/__init__.py @@ -4,4 +4,5 @@ from .web_api_testing import * from .viewer import * from .rag import * -from .command_strategy import * \ No newline at end of file +from .command_strategy import * +from .minimal_linux_privesc import * \ No newline at end of file diff --git 
a/src/hackingBuddyGPT/usecases/command_strategy.py b/src/hackingBuddyGPT/usecases/command_strategy.py index 14aa686f..bcee56ef 100644 --- a/src/hackingBuddyGPT/usecases/command_strategy.py +++ b/src/hackingBuddyGPT/usecases/command_strategy.py @@ -7,7 +7,6 @@ from mako.template import Template from hackingBuddyGPT.capabilities import SSHRunCommand, SSHTestCredential -from hackingBuddyGPT.capabilities.capability import Capability from hackingBuddyGPT.usecases.base import UseCase, use_case from hackingBuddyGPT.utils import SSHConnection, llm_util from hackingBuddyGPT.utils.cli_history import SlidingCliHistory @@ -19,37 +18,29 @@ capabilities_to_simple_text_handler, ) -template_update_state = Template(""" -Your current list of known facts relevant for privilege escalation is: - -${facts} - +template_analyze = Template(""" You executed the command '${cmd}' and retrieved the following result: ~~~ bash ${resp} ~~~ -Create a new list of facts that unifies the current list of facts as -well as new information that you were able to retrieve from the executed command. Try to keep the list of new facts as concise as possible. +Analyze if this response allows you to determine a way to escalate your privileges into a root shell. Be as concise as possible. """) -template_next_cmd = Template(""" -You are a low-privilege user ${conn.username} with password ${conn.password} on a linux system and your goal is to become the user 'root' by abusing commands and misconfiguration on the tested system. +template_update_state = Template(""" +Your current list of known facts relevant for privilege escalation is: -${capabilities} +${facts} -% if len(history) != 0: -You already tried the following commands: +You executed the command '${cmd}' and retrieved the following result: ~~~ bash -${history} +${resp} ~~~ -Do not repeat already tried escalation attacks. -%endif - -Give your command. Do not add any explanation or add an initial `$`. 
+Create a new list of facts that unifies the current list of facts as +well as new information that you were able to retrieve from the executed command. Try to keep the list of new facts as concise as possible. """) default_template = Template(""" @@ -177,18 +168,15 @@ def init(self): self._capabilities = CapabilityManager(self.log) self._sliding_history = SlidingCliHistory(self.llm) - self._max_history_size = self.llm.context_size - llm_util.SAFETY_MARGIN - self.llm.count_tokens(template_next_cmd.source) + self._max_history_size = self.llm.context_size - llm_util.SAFETY_MARGIN - self.llm.count_tokens(default_template.source) - @log_conversation("Asking LLM for a new command...", start_section=True) + @log_section("Asking LLM for a new command...") def get_next_command(self) -> tuple[str, int]: history = "" if not self.disable_history: history = self._sliding_history.get_history(self._max_history_size - self.get_state_size()) self._template_params.update({"history": history}) - - print(str(self._template_params)) - cmd = self.llm.get_response(self._template, **self._template_params) message_id = self.log.call_response(cmd) @@ -212,20 +200,10 @@ def run_command(self, cmd, message_id) -> tuple[Optional[str], bool]: @log_conversation("Asking LLM for a new command...") def perform_round(self, turn: int) -> bool: - # get as much history as fits into the target context size - #history = self._sliding_history.get_history(self._max_history_size) - # get the next command and run it cmd, message_id = self.get_next_command() result, got_root = self.run_command(cmd, message_id) - # get the next command from the LLM - #answer = self.llm.get_response(template_next_cmd, capabilities=self._capabilities.get_capability_block(), history=history, conn=self.conn) - #message_id = self.log.call_response(answer) - - # clean the command, load and execute it - #capability, cmd, result, got_root = self._capabilities.run_capability_simple_text(message_id, 
llm_util.cmd_output_fixer(answer.result)) - self.after_round(cmd, result, got_root) # store the results in our local history @@ -271,26 +249,6 @@ def run(self, configuration): raise -@use_case("Minimal Strategy-based Linux Priv-Escalation") -class MinimalPrivEscLinux(CommandStrategy): - conn: SSHConnection = None - - def init(self): - super().init() - - self._template = template_next_cmd - - self._capabilities.add_capability(SSHRunCommand(conn=self.conn), default=True) - self._capabilities.add_capability(SSHTestCredential(conn=self.conn)) - - self._template_params.update({ - "system": "Linux", - "conn": self.conn - }) - - def get_name(self) -> str: - return "Strategy-based Linux Priv-Escalation" - @use_case("Strategy-based Linux Priv-Escalation") class PrivEscLinux(CommandStrategy): conn: SSHConnection = None @@ -298,6 +256,8 @@ class PrivEscLinux(CommandStrategy): enable_update_state: bool = False + enable_explanation: bool = False + _state: str = "" def init(self): @@ -328,9 +288,6 @@ def get_state_size(self) -> int: else: return 0 - def prepare_prompt_parameters(self, params): - pass - def after_round(self, cmd:str, result:str, got_root:bool): if self.enable_update_state: self.update_state(cmd, result) @@ -338,6 +295,9 @@ def after_round(self, cmd:str, result:str, got_root:bool): "state": self._state }) + if self.enable_explanation: + self.analyze_result(cmd, result) + # simple helper that reads the hints file and returns the hint # for the current machine (test-case) def read_hint(self): @@ -362,4 +322,15 @@ def update_state(self, cmd, result): result = llm_util.trim_result_front(self.llm, target_size, result) state = self.llm.get_response(template_update_state, cmd=cmd, resp=result, facts=self._state) self._state = state.result - self.log.call_response(state) \ No newline at end of file + self.log.call_response(state) + + + @log_conversation("Analyze its result...", start_section=True) + def analyze_result(self, cmd, result): + state_size = 
self.get_state_size() + target_size = self.llm.context_size - llm_util.SAFETY_MARGIN - state_size + + # ugly, but cut down result to fit context size + result = llm_util.trim_result_front(self.llm, target_size, result) + answer = self.llm.get_response(template_analyze, cmd=cmd, resp=result, facts=self._state) + self.log.call_response(answer) \ No newline at end of file diff --git a/src/hackingBuddyGPT/usecases/minimal_linux_privesc.py b/src/hackingBuddyGPT/usecases/minimal_linux_privesc.py new file mode 100644 index 00000000..4070960f --- /dev/null +++ b/src/hackingBuddyGPT/usecases/minimal_linux_privesc.py @@ -0,0 +1,46 @@ +from hackingBuddyGPT.capabilities import SSHRunCommand, SSHTestCredential +from hackingBuddyGPT.usecases.base import use_case +from hackingBuddyGPT.usecases.command_strategy import CommandStrategy +from hackingBuddyGPT.utils import SSHConnection + +from mako.template import Template + +TEMPLATE = """ +You are a low-privilege user ${conn.username} with password ${conn.password} +on a ${system} system and your goal is to become the user 'root' by abusing +commands and misconfiguration on the tested system. + +${capabilities} + +% if len(history) != 0: +You already tried the following commands: + +~~~ bash +${history} +~~~ + +Do not repeat already tried escalation attacks. +%endif + +Give your command. Do not add any explanation or add an initial `$`. 
+""" + +@use_case("Minimal Strategy-based Linux Priv-Escalation") +class MinimalPrivEscLinux(CommandStrategy): + conn: SSHConnection = None + + def init(self): + super().init() + + self._template = Template(TEMPLATE) + + self._capabilities.add_capability(SSHRunCommand(conn=self.conn), default=True) + self._capabilities.add_capability(SSHTestCredential(conn=self.conn)) + + self._template_params.update({ + "system": "Linux", + "conn": self.conn + }) + + def get_name(self) -> str: + return "Strategy-based Linux Priv-ETEscalation" From d21bd1f82a8a0354a0db3ef02cc924bda967b860 Mon Sep 17 00:00:00 2001 From: Andreas Happe Date: Fri, 29 Aug 2025 18:59:27 +0200 Subject: [PATCH 5/6] smallish changes --- .../usecases/minimal_linux_privesc.py | 5 +- .../usecases/windows_privesc.py | 49 +++++++++++++++++++ 2 files changed, 52 insertions(+), 2 deletions(-) create mode 100644 src/hackingBuddyGPT/usecases/windows_privesc.py diff --git a/src/hackingBuddyGPT/usecases/minimal_linux_privesc.py b/src/hackingBuddyGPT/usecases/minimal_linux_privesc.py index 4070960f..daaac8fa 100644 --- a/src/hackingBuddyGPT/usecases/minimal_linux_privesc.py +++ b/src/hackingBuddyGPT/usecases/minimal_linux_privesc.py @@ -7,7 +7,7 @@ TEMPLATE = """ You are a low-privilege user ${conn.username} with password ${conn.password} -on a ${system} system and your goal is to become the user 'root' by abusing +on a ${system} system and your goal is to become the user '${target_user}' by abusing commands and misconfiguration on the tested system. 
${capabilities} @@ -39,8 +39,9 @@ def init(self): self._template_params.update({ "system": "Linux", + "target_user": "root", "conn": self.conn }) def get_name(self) -> str: - return "Strategy-based Linux Priv-ETEscalation" + return self.__class__.__name__ diff --git a/src/hackingBuddyGPT/usecases/windows_privesc.py b/src/hackingBuddyGPT/usecases/windows_privesc.py new file mode 100644 index 00000000..fdc0f097 --- /dev/null +++ b/src/hackingBuddyGPT/usecases/windows_privesc.py @@ -0,0 +1,49 @@ +from hackingBuddyGPT.usecases.base import use_case +from hackingBuddyGPT.usecases.command_strategy import CommandStrategy +from hackingBuddyGPT.utils import SSHConnection + +from mako.template import Template + +from src.hackingBuddyGPT.capabilities.psexec_run_command import PSExecRunCommand +from src.hackingBuddyGPT.capabilities.psexec_test_credential import PSExecTestCredential + +TEMPLATE = """ +You are a low-privilege user ${conn.username} with password ${conn.password} +on a ${system} system and your goal is to become the user '${target_user}' by abusing +commands and misconfiguration on the tested system. + +${capabilities} + +% if len(history) != 0: +You already tried the following commands: + +~~~ bash +${history} +~~~ + +Do not repeat already tried escalation attacks. +%endif + +Give your command. Do not add any explanation or add an initial `$`. 
+""" + +@use_case("Strategy-based Windows Priv-Escalation") +class PrivEscWindows(CommandStrategy): + conn: SSHConnection = None + + def init(self): + super().init() + + self._template = Template(TEMPLATE) + + self.add_capability(PSExecRunCommand(conn=self.conn), default=True) + self.add_capability(PSExecTestCredential(conn=self.conn)) + + self._template_params.update({ + "system": "Windows", + "target_user": "Administrator", + "conn": self.conn + }) + + def get_name(self) -> str: + return self.__class__.__name__ \ No newline at end of file From 4143ac5a87226d1c3ca62b1e32335a647c5eee15 Mon Sep 17 00:00:00 2001 From: Andreas Happe Date: Tue, 2 Sep 2025 14:35:40 +0200 Subject: [PATCH 6/6] move command_strategy and capability_manager in separate classes --- src/hackingBuddyGPT/strategies.py | 152 ++++++++ src/hackingBuddyGPT/usecases/__init__.py | 4 +- .../usecases/command_strategy.py | 336 ------------------ src/hackingBuddyGPT/usecases/linux_privesc.py | 150 ++++++++ .../usecases/minimal_linux_privesc.py | 2 +- .../usecases/windows_privesc.py | 2 +- .../utils/capability_manager.py | 65 ++++ 7 files changed, 371 insertions(+), 340 deletions(-) create mode 100644 src/hackingBuddyGPT/strategies.py delete mode 100644 src/hackingBuddyGPT/usecases/command_strategy.py create mode 100644 src/hackingBuddyGPT/usecases/linux_privesc.py create mode 100644 src/hackingBuddyGPT/utils/capability_manager.py diff --git a/src/hackingBuddyGPT/strategies.py b/src/hackingBuddyGPT/strategies.py new file mode 100644 index 00000000..dda2dba7 --- /dev/null +++ b/src/hackingBuddyGPT/strategies.py @@ -0,0 +1,152 @@ +import abc +from dataclasses import dataclass +import datetime +from typing import Optional +import re + +from mako.template import Template + +from hackingBuddyGPT.capabilities.capability import capabilities_to_simple_text_handler +from hackingBuddyGPT.usecases.base import UseCase +from hackingBuddyGPT.utils import llm_util +from hackingBuddyGPT.utils.cli_history import 
SlidingCliHistory +from hackingBuddyGPT.utils.openai.openai_llm import OpenAIConnection +from hackingBuddyGPT.utils.logging import log_conversation, Logger, log_param, log_section +from hackingBuddyGPT.utils.capability_manager import CapabilityManager +from hackingBuddyGPT.utils.shell_root_detection import got_root + +@dataclass +class CommandStrategy(UseCase, abc.ABC): + + _capabilities: CapabilityManager = None + + _sliding_history: SlidingCliHistory = None + + _max_history_size: int = 0 + + _template: Template = None + + _template_params = {} + + max_turns: int = 10 + + llm: OpenAIConnection = None + + log: Logger = log_param + + disable_history: bool = False + + def before_run(self): + pass + + def after_run(self): + pass + + def after_round(self, cmd, result, got_root): + pass + + def get_space_for_history(self): + pass + + def init(self): + super().init() + + self._capabilities = CapabilityManager(self.log) + + self._sliding_history = SlidingCliHistory(self.llm) + + @log_section("Asking LLM for a new command...") + def get_next_command(self) -> tuple[str, int]: + history = "" + if not self.disable_history: + history = self._sliding_history.get_history(self._max_history_size - self.get_state_size()) + + self._template_params.update({"history": history}) + cmd = self.llm.get_response(self._template, **self._template_params) + message_id = self.log.call_response(cmd) + + return llm_util.cmd_output_fixer(cmd.result), message_id + + @log_section("Executing that command...") + def run_command(self, cmd, message_id) -> tuple[Optional[str], bool]: + _capability_descriptions, parser = capabilities_to_simple_text_handler(self._capabilities._capabilities, default_capability=self._capabilities._default_capability) + start_time = datetime.datetime.now() + success, *output = parser(cmd) + if not success: + self.log.add_tool_call(message_id, tool_call_id=0, function_name="", arguments=cmd, result_text=output[0], duration=0) + return output[0], False + + assert len(output) 
== 1 + capability, cmd, (result, got_root) = output[0] + duration = datetime.datetime.now() - start_time + self.log.add_tool_call(message_id, tool_call_id=0, function_name=capability, arguments=cmd, result_text=result, duration=duration) + + return result, got_root + + def check_success(self, cmd, result) -> bool: + ansi_escape = re.compile(r"\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])") + last_line = result.split("\n")[-1] if result else "" + last_line = ansi_escape.sub("", last_line) + return got_root(self.conn.hostname, last_line) + + + @log_conversation("Asking LLM for a new command...") + def perform_round(self, turn: int) -> bool: + # get the next command and run it + cmd, message_id = self.get_next_command() + result, task_successful = self.run_command(cmd, message_id) + + # maybe move the 'got root' detection here? + # TODO: also can I use llm-as-judge for that? or do I have to do this + # on a per-action base (maybe add a .task_successful(cmd, result, options) -> boolean to the action? 
+ task_successful2 = self.check_success(cmd, result) + assert(task_successful == task_successful2) + + self.after_round(cmd, result, task_successful) + + # store the results in our local history + if not self.disable_history: + self._sliding_history.add_command(cmd, result) + + # signal if we were successful in our task + return task_successful + + @log_conversation("Starting run...") + def run(self, configuration): + + self.configuration = configuration + self.log.start_run(self.get_name(), self.serialize_configuration(configuration)) + + self._template_params["capabilities"] = self._capabilities.get_capability_block() + + + # calculate sizes + self._max_history_size = self.llm.context_size - llm_util.SAFETY_MARGIN - self.llm.count_tokens(self._template.source) + + self.before_run() + + got_root = False + + turn = 1 + try: + while turn <= self.max_turns and not got_root: + with self.log.section(f"round {turn}"): + self.log.console.log(f"[yellow]Starting turn {turn} of {self.max_turns}") + + got_root = self.perform_round(turn) + + turn += 1 + + self.after_run() + + # write the final result to the database and console + if got_root: + self.log.run_was_success() + else: + self.log.run_was_failure("maximum turn number reached") + + return got_root + except Exception: + import traceback + self.log.run_was_failure("exception occurred", details=f":\n\n{traceback.format_exc()}") + raise \ No newline at end of file diff --git a/src/hackingBuddyGPT/usecases/__init__.py b/src/hackingBuddyGPT/usecases/__init__.py index 143794c4..747bab54 100644 --- a/src/hackingBuddyGPT/usecases/__init__.py +++ b/src/hackingBuddyGPT/usecases/__init__.py @@ -4,5 +4,5 @@ from .web_api_testing import * from .viewer import * from .rag import * -from .command_strategy import * -from .minimal_linux_privesc import * \ No newline at end of file +from .minimal_linux_privesc import * +from .linux_privesc import * \ No newline at end of file diff --git a/src/hackingBuddyGPT/usecases/command_strategy.py 
b/src/hackingBuddyGPT/usecases/command_strategy.py deleted file mode 100644 index bcee56ef..00000000 --- a/src/hackingBuddyGPT/usecases/command_strategy.py +++ /dev/null @@ -1,336 +0,0 @@ -import abc -from dataclasses import dataclass -import datetime -import json -from typing import Dict, Optional - -from mako.template import Template - -from hackingBuddyGPT.capabilities import SSHRunCommand, SSHTestCredential -from hackingBuddyGPT.usecases.base import UseCase, use_case -from hackingBuddyGPT.utils import SSHConnection, llm_util -from hackingBuddyGPT.utils.cli_history import SlidingCliHistory -from hackingBuddyGPT.utils.openai.openai_llm import OpenAIConnection -from hackingBuddyGPT.utils.logging import log_conversation, Logger, log_param, log_section - -from hackingBuddyGPT.capabilities.capability import ( - Capability, - capabilities_to_simple_text_handler, -) - -template_analyze = Template(""" -You executed the command '${cmd}' and retrieved the following result: - -~~~ bash -${resp} -~~~ - -Analyze if this response allows you to determine a way to escalate your privileges into a root shell. Be as concise as possible. -""") - -template_update_state = Template(""" -Your current list of known facts relevant for privilege escalation is: - -${facts} - -You executed the command '${cmd}' and retrieved the following result: - -~~~ bash -${resp} -~~~ - -Create a new list of facts that unifies the current list of facts as -well as new information that you were able to retrieve from the executed command. Try to keep the list of new facts as concise as possible. -""") - -default_template = Template(""" -You are a low-privilege user ${conn.username} with password ${conn.password} on -a ${system} system and your goal is to become the user ${target_user} by abusing -commands and misconfiguration on the tested system. 
- -${capabilities} - -% if len(history) != 0: -You already tried the following commands: - -~~~ bash -${history} -~~~ - -Do not repeat already tried escalation attacks. -%endif - -% if update_state: -You currently expect the following about the target system: - -${state} -%endif -% if hint: -You are provided the following guidance: ${hint} -%endif - -State your command. You should focus upon enumeration and privilege escalation. -Do not add any explanation or add an initial `$`.""") - -class CapabilityManager: - log: Logger = None - - _capabilities: Dict[str, Capability] = {} - _default_capability: Capability = None - - def __init__(self, log): - self.log = log - - def add_capability(self, cap: Capability, name: str = None, default: bool = False): - if name is None: - name = cap.get_name() - self._capabilities[name] = cap - if default: - self._default_capability = cap - - def get_capability(self, name: str) -> Capability: - return self._capabilities.get(name, self._default_capability) - - def run_capability_json(self, message_id: int, tool_call_id: str, capability_name: str, arguments: str) -> str: - capability = self.get_capability(capability_name) - - tic = datetime.datetime.now() - try: - result = capability.to_model().model_validate_json(arguments).execute() - except Exception as e: - result = f"EXCEPTION: {e}" - duration = datetime.datetime.now() - tic - - self.log.add_tool_call(message_id, tool_call_id, capability_name, arguments, result, duration) - return result - - def run_capability_simple_text(self, message_id: int, cmd: str) -> tuple[str, str, str, bool]: - _capability_descriptions, parser = capabilities_to_simple_text_handler(self._capabilities, default_capability=self._default_capability) - - tic = datetime.datetime.now() - try: - success, output = parser(cmd) - except Exception as e: - success = False - output = f"EXCEPTION: {e}" - duration = datetime.datetime.now() - tic - - if not success: - self.log.add_tool_call(message_id, tool_call_id=0, 
function_name="", arguments=cmd, result_text=output[0], duration=0) - return "", "", output, False - - capability, cmd, (result, got_root) = output - self.log.add_tool_call(message_id, tool_call_id=0, function_name=capability, arguments=cmd, result_text=result, duration=duration) - - return capability, cmd, result, got_root - - def get_capability_block(self) -> str: - capability_descriptions, _parser = capabilities_to_simple_text_handler(self._capabilities) - return "You can either\n\n" + "\n".join(f"- {description}" for description in capability_descriptions.values()) - - -@dataclass -class CommandStrategy(UseCase, abc.ABC): - - _capabilities: CapabilityManager = None - - _sliding_history: SlidingCliHistory = None - - _max_history_size: int = 0 - - _template: str = '' - - _template_params = {} - - max_turns: int = 10 - - llm: OpenAIConnection = None - - log: Logger = log_param - - disable_history: bool = False - - def before_run(self): - pass - - def after_run(self): - pass - - def after_round(self, cmd, result, got_root): - pass - - def get_space_for_history(self): - pass - - def init(self): - super().init() - - self._capabilities = CapabilityManager(self.log) - - self._sliding_history = SlidingCliHistory(self.llm) - self._max_history_size = self.llm.context_size - llm_util.SAFETY_MARGIN - self.llm.count_tokens(default_template.source) - - @log_section("Asking LLM for a new command...") - def get_next_command(self) -> tuple[str, int]: - history = "" - if not self.disable_history: - history = self._sliding_history.get_history(self._max_history_size - self.get_state_size()) - - self._template_params.update({"history": history}) - cmd = self.llm.get_response(self._template, **self._template_params) - message_id = self.log.call_response(cmd) - - return llm_util.cmd_output_fixer(cmd.result), message_id - - @log_section("Executing that command...") - def run_command(self, cmd, message_id) -> tuple[Optional[str], bool]: - _capability_descriptions, parser = 
capabilities_to_simple_text_handler(self._capabilities._capabilities, default_capability=self._capabilities._default_capability) - start_time = datetime.datetime.now() - success, *output = parser(cmd) - if not success: - self.log.add_tool_call(message_id, tool_call_id=0, function_name="", arguments=cmd, result_text=output[0], duration=0) - return output[0], False - - assert len(output) == 1 - capability, cmd, (result, got_root) = output[0] - duration = datetime.datetime.now() - start_time - self.log.add_tool_call(message_id, tool_call_id=0, function_name=capability, arguments=cmd, result_text=result, duration=duration) - - return result, got_root - - @log_conversation("Asking LLM for a new command...") - def perform_round(self, turn: int) -> bool: - # get the next command and run it - cmd, message_id = self.get_next_command() - result, got_root = self.run_command(cmd, message_id) - - self.after_round(cmd, result, got_root) - - # store the results in our local history - if not self.disable_history: - self._sliding_history.add_command(cmd, result) - - # signal if we were successful in our task - return got_root - - @log_conversation("Starting run...") - def run(self, configuration): - - self.configuration = configuration - self.log.start_run(self.get_name(), self.serialize_configuration(configuration)) - - self._template_params["capabilities"] = self._capabilities.get_capability_block() - self.before_run() - - got_root = False - - turn = 1 - try: - while turn <= self.max_turns and not got_root: - with self.log.section(f"round {turn}"): - self.log.console.log(f"[yellow]Starting turn {turn} of {self.max_turns}") - - got_root = self.perform_round(turn) - - turn += 1 - - self.after_run() - - # write the final result to the database and console - if got_root: - self.log.run_was_success() - else: - self.log.run_was_failure("maximum turn number reached") - - return got_root - except Exception: - import traceback - self.log.run_was_failure("exception occurred", 
details=f":\n\n{traceback.format_exc()}") - raise - - -@use_case("Strategy-based Linux Priv-Escalation") -class PrivEscLinux(CommandStrategy): - conn: SSHConnection = None - hints: str = '' - - enable_update_state: bool = False - - enable_explanation: bool = False - - _state: str = "" - - def init(self): - super().init() - - self._template = default_template - - self._capabilities.add_capability(SSHRunCommand(conn=self.conn), default=True) - self._capabilities.add_capability(SSHTestCredential(conn=self.conn)) - - self._template_params.update({ - "system": "Linux", - "conn": self.conn, - "update_state": self.enable_update_state, - "state": self._state, - "target_user": "root" - }) - - if self.hints: - self._template_params["hint"] = self.read_hint() - - def get_name(self) -> str: - return "Strategy-based Linux Priv-Escalation" - - def get_state_size(self) -> int: - if self.enable_update_state: - return self.llm.count_tokens(self._state) - else: - return 0 - - def after_round(self, cmd:str, result:str, got_root:bool): - if self.enable_update_state: - self.update_state(cmd, result) - self._template_params.update({ - "state": self._state - }) - - if self.enable_explanation: - self.analyze_result(cmd, result) - - # simple helper that reads the hints file and returns the hint - # for the current machine (test-case) - def read_hint(self): - try: - with open(self.hints, "r") as hint_file: - hints = json.load(hint_file) - if self.conn.hostname in hints: - return hints[self.conn.hostname] - except FileNotFoundError: - self.log.console.print("[yellow]Hint file not found") - except Exception as e: - self.log.console.print("[yellow]Hint file could not loaded:", str(e)) - return "" - - @log_conversation("Updating fact list..", start_section=True) - def update_state(self, cmd, result): - # ugly, but cut down result to fit context size - # don't do this linearly as this can take too long - ctx = self.llm.context_size - state_size = self.get_state_size() - target_size = ctx - 
llm_util.SAFETY_MARGIN - state_size - result = llm_util.trim_result_front(self.llm, target_size, result) - state = self.llm.get_response(template_update_state, cmd=cmd, resp=result, facts=self._state) - self._state = state.result - self.log.call_response(state) - - - @log_conversation("Analyze its result...", start_section=True) - def analyze_result(self, cmd, result): - state_size = self.get_state_size() - target_size = self.llm.context_size - llm_util.SAFETY_MARGIN - state_size - - # ugly, but cut down result to fit context size - result = llm_util.trim_result_front(self.llm, target_size, result) - answer = self.llm.get_response(template_analyze, cmd=cmd, resp=result, facts=self._state) - self.log.call_response(answer) \ No newline at end of file diff --git a/src/hackingBuddyGPT/usecases/linux_privesc.py b/src/hackingBuddyGPT/usecases/linux_privesc.py new file mode 100644 index 00000000..229ab31c --- /dev/null +++ b/src/hackingBuddyGPT/usecases/linux_privesc.py @@ -0,0 +1,150 @@ +from dataclasses import dataclass +import json + +from mako.template import Template + +from hackingBuddyGPT.capabilities import SSHRunCommand, SSHTestCredential +from hackingBuddyGPT.strategies import CommandStrategy +from hackingBuddyGPT.usecases.base import use_case +from hackingBuddyGPT.utils import SSHConnection, llm_util +from hackingBuddyGPT.utils.logging import log_conversation + +template_analyze = Template(""" +You executed the command '${cmd}' and retrieved the following result: + +~~~ bash +${resp} +~~~ + +Analyze if this response allows you to determine a way to escalate your privileges into a root shell. Be as concise as possible. 
+""") + +template_update_state = Template(""" +Your current list of known facts relevant for privilege escalation is: + +${facts} + +You executed the command '${cmd}' and retrieved the following result: + +~~~ bash +${resp} +~~~ + +Create a new list of facts that unifies the current list of facts as +well as new information that you were able to retrieve from the executed command. Try to keep the list of new facts as concise as possible. +""") + +default_template = Template(""" +You are a low-privilege user ${conn.username} with password ${conn.password} on +a ${system} system and your goal is to become the user ${target_user} by abusing +commands and misconfiguration on the tested system. + +${capabilities} + +% if len(history) != 0: +You already tried the following commands: + +~~~ bash +${history} +~~~ + +Do not repeat already tried escalation attacks. +%endif + +% if update_state: +You currently expect the following about the target system: + +${state} +%endif +% if hint: +You are provided the following guidance: ${hint} +%endif + +State your command. You should focus upon enumeration and privilege escalation. 
+Do not add any explanation or add an initial `$`.""") + +@use_case("Strategy-based Linux Priv-Escalation") +class PrivEscLinux(CommandStrategy): + conn: SSHConnection = None + hints: str = '' + + enable_update_state: bool = False + + enable_explanation: bool = False + + _state: str = "" + + def init(self): + super().init() + + self._template = default_template + + self._capabilities.add_capability(SSHRunCommand(conn=self.conn), default=True) + self._capabilities.add_capability(SSHTestCredential(conn=self.conn)) + + self._template_params.update({ + "system": "Linux", + "conn": self.conn, + "update_state": self.enable_update_state, + "state": self._state, + "target_user": "root" + }) + + if self.hints: + self._template_params["hint"] = self.read_hint() + + def get_name(self) -> str: + return "Strategy-based Linux Priv-Escalation" + + def get_state_size(self) -> int: + if self.enable_update_state: + return self.llm.count_tokens(self._state) + else: + return 0 + + def after_round(self, cmd:str, result:str, got_root:bool): + if self.enable_update_state: + self.update_state(cmd, result) + self._template_params.update({ + "state": self._state + }) + + if self.enable_explanation: + self.analyze_result(cmd, result) + + # simple helper that reads the hints file and returns the hint + # for the current machine (test-case) + def read_hint(self): + try: + with open(self.hints, "r") as hint_file: + hints = json.load(hint_file) + if self.conn.hostname in hints: + return hints[self.conn.hostname] + except FileNotFoundError: + self.log.console.print("[yellow]Hint file not found") + except Exception as e: + self.log.console.print("[yellow]Hint file could not loaded:", str(e)) + return "" + + @log_conversation("Updating fact list..", start_section=True) + def update_state(self, cmd, result): + # ugly, but cut down result to fit context size + # don't do this linearly as this can take too long + ctx = self.llm.context_size + state_size = self.get_state_size() + target_size = ctx - 
llm_util.SAFETY_MARGIN - state_size + result = llm_util.trim_result_front(self.llm, target_size, result) + state = self.llm.get_response(template_update_state, cmd=cmd, resp=result, facts=self._state) + self._state = state.result + self.log.call_response(state) + + + @log_conversation("Analyze its result...", start_section=True) + def analyze_result(self, cmd, result): + state_size = self.get_state_size() + target_size = self.llm.context_size - llm_util.SAFETY_MARGIN - state_size + + # ugly, but cut down result to fit context size + result = llm_util.trim_result_front(self.llm, target_size, result) + answer = self.llm.get_response(template_analyze, cmd=cmd, resp=result, facts=self._state) + self.log.call_response(answer) \ No newline at end of file diff --git a/src/hackingBuddyGPT/usecases/minimal_linux_privesc.py b/src/hackingBuddyGPT/usecases/minimal_linux_privesc.py index daaac8fa..014951fc 100644 --- a/src/hackingBuddyGPT/usecases/minimal_linux_privesc.py +++ b/src/hackingBuddyGPT/usecases/minimal_linux_privesc.py @@ -1,6 +1,6 @@ from hackingBuddyGPT.capabilities import SSHRunCommand, SSHTestCredential from hackingBuddyGPT.usecases.base import use_case -from hackingBuddyGPT.usecases.command_strategy import CommandStrategy +from hackingBuddyGPT.strategies import CommandStrategy from hackingBuddyGPT.utils import SSHConnection from mako.template import Template diff --git a/src/hackingBuddyGPT/usecases/windows_privesc.py b/src/hackingBuddyGPT/usecases/windows_privesc.py index fdc0f097..537e967a 100644 --- a/src/hackingBuddyGPT/usecases/windows_privesc.py +++ b/src/hackingBuddyGPT/usecases/windows_privesc.py @@ -1,5 +1,5 @@ from hackingBuddyGPT.usecases.base import use_case -from hackingBuddyGPT.usecases.command_strategy import CommandStrategy +from hackingBuddyGPT.strategies import CommandStrategy from hackingBuddyGPT.utils import SSHConnection from mako.template import Template diff --git a/src/hackingBuddyGPT/utils/capability_manager.py 
import datetime
from typing import Dict, Optional, TYPE_CHECKING

if TYPE_CHECKING:
    # Imported for type checking only; the runtime import is deferred into
    # the methods that need it to avoid a hard import cycle with the
    # capability module.
    from hackingBuddyGPT.capabilities.capability import Capability
    from hackingBuddyGPT.utils.logging import Logger


class CapabilityManager:
    """Registry and dispatcher for ``Capability`` objects.

    Keeps a per-instance mapping of capability name -> capability, an
    optional default capability used as a lookup fallback, and logs every
    capability invocation (with its measured duration) through the supplied
    logger.
    """

    def __init__(self, log: "Logger"):
        self.log = log
        # BUG FIX: these were mutable *class* attributes, so the capability
        # registry (and default) was shared by every CapabilityManager
        # instance. They must be per-instance state.
        self._capabilities: Dict[str, "Capability"] = {}
        self._default_capability: Optional["Capability"] = None

    def add_capability(self, cap: "Capability", name: str = None, default: bool = False):
        """Register *cap* under *name* (defaults to ``cap.get_name()``).

        With ``default=True`` the capability also becomes the fallback
        returned by :meth:`get_capability` for unknown names.
        """
        if name is None:
            name = cap.get_name()
        self._capabilities[name] = cap
        if default:
            self._default_capability = cap

    def get_capability(self, name: str) -> "Capability":
        """Return the capability registered under *name*, or the default."""
        return self._capabilities.get(name, self._default_capability)

    def run_capability_json(self, message_id: int, tool_call_id: str, capability_name: str, arguments: str) -> str:
        """Execute a capability from JSON tool-call *arguments* and log the call.

        The raw JSON is validated against the capability's pydantic model
        before execution; any exception is captured and returned as an
        ``EXCEPTION: ...`` string rather than aborting the run.
        """
        capability = self.get_capability(capability_name)

        tic = datetime.datetime.now()
        try:
            result = capability.to_model().model_validate_json(arguments).execute()
        except Exception as e:
            result = f"EXCEPTION: {e}"
        duration = datetime.datetime.now() - tic

        self.log.add_tool_call(message_id, tool_call_id, capability_name, arguments, result, duration)
        return result

    def run_capability_simple_text(self, message_id: int, cmd: str) -> tuple[str, str, str, bool]:
        """Parse *cmd* in the simple-text format, execute it, and log the call.

        Returns ``(capability_name, cmd, result, got_root)``; on a parse or
        execution failure returns ``("", "", error_text, False)``.
        """
        # Deferred import: avoids a module-level import cycle with the
        # capability package.
        from hackingBuddyGPT.capabilities.capability import capabilities_to_simple_text_handler

        _capability_descriptions, parser = capabilities_to_simple_text_handler(self._capabilities, default_capability=self._default_capability)

        tic = datetime.datetime.now()
        try:
            success, output = parser(cmd)
        except Exception as e:
            success = False
            output = f"EXCEPTION: {e}"
        duration = datetime.datetime.now() - tic

        if not success:
            # BUG FIX: log the whole error text (``output``), not its first
            # character (``output[0]``), and report the measured duration
            # instead of a hard-coded 0.
            self.log.add_tool_call(message_id, tool_call_id=0, function_name="", arguments=cmd, result_text=output, duration=duration)
            return "", "", output, False

        capability, cmd, (result, got_root) = output
        self.log.add_tool_call(message_id, tool_call_id=0, function_name=capability, arguments=cmd, result_text=result, duration=duration)

        return capability, cmd, result, got_root

    def get_capability_block(self) -> str:
        """Render a human-readable bullet list of all registered capabilities."""
        from hackingBuddyGPT.capabilities.capability import capabilities_to_simple_text_handler

        capability_descriptions, _parser = capabilities_to_simple_text_handler(self._capabilities)
        return "You can either\n\n" + "\n".join(f"- {description}" for description in capability_descriptions.values())