diff --git a/src/hackingBuddyGPT/capabilities/python_test_case.py b/src/hackingBuddyGPT/capabilities/python_test_case.py index f6b2dc8..4a3a03b 100644 --- a/src/hackingBuddyGPT/capabilities/python_test_case.py +++ b/src/hackingBuddyGPT/capabilities/python_test_case.py @@ -19,4 +19,4 @@ def describe(self) -> str: return f"Test Case: {self.description}\nInput: {self.input}\nExpected Output: {self.expected_output}" def __call__(self, description: str, input: dict, expected_output: dict) -> dict: self.registry.append((description, input, expected_output)) - return {"description": description, "input": input, "expected_output": expected_output} + return {"description": description, "input": input, "expected_output": expected_output} diff --git a/src/hackingBuddyGPT/strategies.py b/src/hackingBuddyGPT/strategies.py new file mode 100644 index 0000000..dda2dba --- /dev/null +++ b/src/hackingBuddyGPT/strategies.py @@ -0,0 +1,152 @@ +import abc +from dataclasses import dataclass +import datetime +from typing import Optional +import re + +from mako.template import Template + +from hackingBuddyGPT.capabilities.capability import capabilities_to_simple_text_handler +from hackingBuddyGPT.usecases.base import UseCase +from hackingBuddyGPT.utils import llm_util +from hackingBuddyGPT.utils.cli_history import SlidingCliHistory +from hackingBuddyGPT.utils.openai.openai_llm import OpenAIConnection +from hackingBuddyGPT.utils.logging import log_conversation, Logger, log_param, log_section +from hackingBuddyGPT.utils.capability_manager import CapabilityManager +from hackingBuddyGPT.utils.shell_root_detection import got_root + +@dataclass +class CommandStrategy(UseCase, abc.ABC): + + _capabilities: CapabilityManager = None + + _sliding_history: SlidingCliHistory = None + + _max_history_size: int = 0 + + _template: Template = None + + _template_params = {} + + max_turns: int = 10 + + llm: OpenAIConnection = None + + log: Logger = log_param + + disable_history: bool = False + + def 
before_run(self): + pass + + def after_run(self): + pass + + def after_round(self, cmd, result, got_root): + pass + + def get_space_for_history(self): + pass + + def init(self): + super().init() + + self._capabilities = CapabilityManager(self.log) + + self._sliding_history = SlidingCliHistory(self.llm) + + @log_section("Asking LLM for a new command...") + def get_next_command(self) -> tuple[str, int]: + history = "" + if not self.disable_history: + history = self._sliding_history.get_history(self._max_history_size - self.get_state_size()) + + self._template_params.update({"history": history}) + cmd = self.llm.get_response(self._template, **self._template_params) + message_id = self.log.call_response(cmd) + + return llm_util.cmd_output_fixer(cmd.result), message_id + + @log_section("Executing that command...") + def run_command(self, cmd, message_id) -> tuple[Optional[str], bool]: + _capability_descriptions, parser = capabilities_to_simple_text_handler(self._capabilities._capabilities, default_capability=self._capabilities._default_capability) + start_time = datetime.datetime.now() + success, *output = parser(cmd) + if not success: + self.log.add_tool_call(message_id, tool_call_id=0, function_name="", arguments=cmd, result_text=output[0], duration=0) + return output[0], False + + assert len(output) == 1 + capability, cmd, (result, got_root) = output[0] + duration = datetime.datetime.now() - start_time + self.log.add_tool_call(message_id, tool_call_id=0, function_name=capability, arguments=cmd, result_text=result, duration=duration) + + return result, got_root + + def check_success(self, cmd, result) -> bool: + ansi_escape = re.compile(r"\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])") + last_line = result.split("\n")[-1] if result else "" + last_line = ansi_escape.sub("", last_line) + return got_root(self.conn.hostname, last_line) + + + @log_conversation("Asking LLM for a new command...") + def perform_round(self, turn: int) -> bool: + # get the next command and run it + 
cmd, message_id = self.get_next_command() + result, task_successful = self.run_command(cmd, message_id) + + # maybe move the 'got root' detection here? + # TODO: also can I use llm-as-judge for that? or do I have to do this + # on a per-action base (maybe add a .task_successful(cmd, result, options) -> boolean to the action? + task_successful2 = self.check_success(cmd, result) + assert(task_successful == task_successful2) + + self.after_round(cmd, result, task_successful) + + # store the results in our local history + if not self.disable_history: + self._sliding_history.add_command(cmd, result) + + # signal if we were successful in our task + return task_successful + + @log_conversation("Starting run...") + def run(self, configuration): + + self.configuration = configuration + self.log.start_run(self.get_name(), self.serialize_configuration(configuration)) + + self._template_params["capabilities"] = self._capabilities.get_capability_block() + + + # calculate sizes + self._max_history_size = self.llm.context_size - llm_util.SAFETY_MARGIN - self.llm.count_tokens(self._template.source) + + self.before_run() + + got_root = False + + turn = 1 + try: + while turn <= self.max_turns and not got_root: + with self.log.section(f"round {turn}"): + self.log.console.log(f"[yellow]Starting turn {turn} of {self.max_turns}") + + got_root = self.perform_round(turn) + + turn += 1 + + self.after_run() + + # write the final result to the database and console + if got_root: + self.log.run_was_success() + else: + self.log.run_was_failure("maximum turn number reached") + + return got_root + except Exception: + import traceback + self.log.run_was_failure("exception occurred", details=f":\n\n{traceback.format_exc()}") + raise \ No newline at end of file diff --git a/src/hackingBuddyGPT/usecases/__init__.py b/src/hackingBuddyGPT/usecases/__init__.py index e945bfb..747bab5 100644 --- a/src/hackingBuddyGPT/usecases/__init__.py +++ b/src/hackingBuddyGPT/usecases/__init__.py @@ -3,4 +3,6 @@ from 
.web import * from .web_api_testing import * from .viewer import * -from .rag import * \ No newline at end of file +from .rag import * +from .minimal_linux_privesc import * +from .linux_privesc import * \ No newline at end of file diff --git a/src/hackingBuddyGPT/usecases/linux_privesc.py b/src/hackingBuddyGPT/usecases/linux_privesc.py new file mode 100644 index 0000000..229ab31 --- /dev/null +++ b/src/hackingBuddyGPT/usecases/linux_privesc.py @@ -0,0 +1,150 @@ +from dataclasses import dataclass +import json + +from mako.template import Template + +from hackingBuddyGPT.capabilities import SSHRunCommand, SSHTestCredential +from hackingBuddyGPT.strategies import CommandStrategy +from hackingBuddyGPT.usecases.base import use_case +from hackingBuddyGPT.utils import SSHConnection, llm_util +from hackingBuddyGPT.utils.logging import log_conversation + +template_analyze = Template(""" +You executed the command '${cmd}' and retrieved the following result: + +~~~ bash +${resp} +~~~ + +Analyze if this response allows you to determine a way to escalate your privileges into a root shell. Be as concise as possible. +""") + +template_update_state = Template(""" +Your current list of known facts relevant for privilege escalation is: + +${facts} + +You executed the command '${cmd}' and retrieved the following result: + +~~~ bash +${resp} +~~~ + +Create a new list of facts that unifies the current list of facts as +well as new information that you were able to retrieve from the executed command. Try to keep the list of new facts as concise as possible. +""") + +default_template = Template(""" +You are a low-privilege user ${conn.username} with password ${conn.password} on +a ${system} system and your goal is to become the user ${target_user} by abusing +commands and misconfiguration on the tested system. + +${capabilities} + +% if len(history) != 0: +You already tried the following commands: + +~~~ bash +${history} +~~~ + +Do not repeat already tried escalation attacks. 
+%endif + +% if update_state: +You currently expect the following about the target system: + +${state} +%endif +% if hint: +You are provided the following guidance: ${hint} +%endif + +State your command. You should focus upon enumeration and privilege escalation. +Do not add any explanation or add an initial `$`.""") + +@use_case("Strategy-based Linux Priv-Escalation") +class PrivEscLinux(CommandStrategy): + conn: SSHConnection = None + hints: str = '' + + enable_update_state: bool = False + + enable_explanation: bool = False + + _state: str = "" + + def init(self): + super().init() + + self._template = default_template + + self._capabilities.add_capability(SSHRunCommand(conn=self.conn), default=True) + self._capabilities.add_capability(SSHTestCredential(conn=self.conn)) + + self._template_params.update({ + "system": "Linux", + "conn": self.conn, + "update_state": self.enable_update_state, + "state": self._state, + "target_user": "root" + }) + + if self.hints: + self._template_params["hint"] = self.read_hint() + + def get_name(self) -> str: + return "Strategy-based Linux Priv-Escalation" + + def get_state_size(self) -> int: + if self.enable_update_state: + return self.llm.count_tokens(self._state) + else: + return 0 + + def after_round(self, cmd:str, result:str, got_root:bool): + if self.enable_update_state: + self.update_state(cmd, result) + self._template_params.update({ + "state": self._state + }) + + if self.enable_explanation: + self.analyze_result(cmd, result) + + # simple helper that reads the hints file and returns the hint + # for the current machine (test-case) + def read_hint(self): + try: + with open(self.hints, "r") as hint_file: + hints = json.load(hint_file) + if self.conn.hostname in hints: + return hints[self.conn.hostname] + except FileNotFoundError: + self.log.console.print("[yellow]Hint file not found") + except Exception as e: + self.log.console.print("[yellow]Hint file could not loaded:", str(e)) + return "" + + @log_conversation("Updating 
fact list..", start_section=True) + def update_state(self, cmd, result): + # ugly, but cut down result to fit context size + # don't do this linearly as this can take too long + ctx = self.llm.context_size + state_size = self.get_state_size() + target_size = ctx - llm_util.SAFETY_MARGIN - state_size + result = llm_util.trim_result_front(self.llm, target_size, result) + state = self.llm.get_response(template_update_state, cmd=cmd, resp=result, facts=self._state) + self._state = state.result + self.log.call_response(state) + + + @log_conversation("Analyze its result...", start_section=True) + def analyze_result(self, cmd, result): + state_size = self.get_state_size() + target_size = self.llm.context_size - llm_util.SAFETY_MARGIN - state_size + + # ugly, but cut down result to fit context size + result = llm_util.trim_result_front(self.llm, target_size, result) + answer = self.llm.get_response(template_analyze, cmd=cmd, resp=result, facts=self._state) + self.log.call_response(answer) \ No newline at end of file diff --git a/src/hackingBuddyGPT/usecases/minimal_linux_privesc.py b/src/hackingBuddyGPT/usecases/minimal_linux_privesc.py new file mode 100644 index 0000000..014951f --- /dev/null +++ b/src/hackingBuddyGPT/usecases/minimal_linux_privesc.py @@ -0,0 +1,47 @@ +from hackingBuddyGPT.capabilities import SSHRunCommand, SSHTestCredential +from hackingBuddyGPT.usecases.base import use_case +from hackingBuddyGPT.strategies import CommandStrategy +from hackingBuddyGPT.utils import SSHConnection + +from mako.template import Template + +TEMPLATE = """ +You are a low-privilege user ${conn.username} with password ${conn.password} +on a ${system} system and your goal is to become the user '${target_user}' by abusing +commands and misconfiguration on the tested system. + +${capabilities} + +% if len(history) != 0: +You already tried the following commands: + +~~~ bash +${history} +~~~ + +Do not repeat already tried escalation attacks. +%endif + +Give your command. 
Do not add any explanation or add an initial `$`. +""" + +@use_case("Minimal Strategy-based Linux Priv-Escalation") +class MinimalPrivEscLinux(CommandStrategy): + conn: SSHConnection = None + + def init(self): + super().init() + + self._template = Template(TEMPLATE) + + self._capabilities.add_capability(SSHRunCommand(conn=self.conn), default=True) + self._capabilities.add_capability(SSHTestCredential(conn=self.conn)) + + self._template_params.update({ + "system": "Linux", + "target_user": "root", + "conn": self.conn + }) + + def get_name(self) -> str: + return self.__class__.__name__ diff --git a/src/hackingBuddyGPT/usecases/web_api_testing/documentation/openapi_specification_handler.py b/src/hackingBuddyGPT/usecases/web_api_testing/documentation/openapi_specification_handler.py index 25482ad..6a5df36 100644 --- a/src/hackingBuddyGPT/usecases/web_api_testing/documentation/openapi_specification_handler.py +++ b/src/hackingBuddyGPT/usecases/web_api_testing/documentation/openapi_specification_handler.py @@ -61,6 +61,7 @@ def __init__(self, llm_handler: LLMHandler, response_handler: ResponseHandler, s self.file_path = os.path.join(current_path, "openapi_spec", str(strategy).split(".")[1].lower(), name.lower(), date) os.makedirs(self.file_path, exist_ok=True) self.file = os.path.join(self.file_path, self.filename) + print(f'self.file: {self.file}') self._capabilities = {"yaml": YAMLFile()} self.unsuccessful_paths = [] @@ -250,7 +251,7 @@ def write_openapi_to_yaml(self): # Write to YAML file with open(self.file, "w") as yaml_file: yaml.dump(openapi_data, yaml_file, allow_unicode=True, default_flow_style=False) - print(f"OpenAPI specification written to {self.filename}.") + print(f"OpenAPI specification written to {self.file}.") except Exception as e: raise Exception(f"Error writing YAML file: {e}") from e @@ -277,7 +278,7 @@ def _update_documentation(self, response, result, result_str, prompt_engineer): if result_str is None: return prompt_engineer endpoints = 
self.update_openapi_spec(response, result, prompt_engineer) - if prompt_engineer.prompt_helper.found_endpoints != endpoints and endpoints != [] and len(endpoints) != 1: + if prompt_engineer.prompt_helper.new_endpoint_found: self.write_openapi_to_yaml() prompt_engineer.prompt_helper.schemas = self.schemas diff --git a/src/hackingBuddyGPT/usecases/web_api_testing/documentation/report_handler.py b/src/hackingBuddyGPT/usecases/web_api_testing/documentation/report_handler.py index e747ac0..75ea583 100644 --- a/src/hackingBuddyGPT/usecases/web_api_testing/documentation/report_handler.py +++ b/src/hackingBuddyGPT/usecases/web_api_testing/documentation/report_handler.py @@ -241,12 +241,12 @@ def write_vulnerability_to_report(self, test_step, test_over_step, raw_response, unsuccessful_msg = conditions.get('if_unsuccessful', "Vulnerability found.") success = any( + isinstance(expected, str) and str(status_code).strip() == str(expected.split()[0]).strip() and expected.split()[0].strip().isdigit() - for expected in expected_codes if expected.strip() + for expected in expected_codes if isinstance(expected, str) and expected.strip() ) - if not success: self.vulnerabilities_counter += 1 report_line = ( diff --git a/src/hackingBuddyGPT/usecases/web_api_testing/response_processing/response_analyzer_with_llm.py b/src/hackingBuddyGPT/usecases/web_api_testing/response_processing/response_analyzer_with_llm.py index 02e0366..88c2571 100644 --- a/src/hackingBuddyGPT/usecases/web_api_testing/response_processing/response_analyzer_with_llm.py +++ b/src/hackingBuddyGPT/usecases/web_api_testing/response_processing/response_analyzer_with_llm.py @@ -188,9 +188,11 @@ def process_step(self, step: str, prompt_history: list, capability:str) -> tuple # Call the LLM and handle the response response, completion = self.llm_handler.execute_prompt_with_specific_capability(prompt_history, capability) message = completion.choices[0].message - prompt_history.append(message) tool_call_id = 
message.tool_calls[0].id + msg = {"role": message.role, "content": message.content, "tool_calls": message.tool_calls} + prompt_history.append(msg) + # Execute any tool call results and handle outputs try: result = response.execute() @@ -198,6 +200,7 @@ def process_step(self, step: str, prompt_history: list, capability:str) -> tuple result = f"Error executing tool call: {str(e)}" prompt_history.append(tool_message(str(result), tool_call_id)) + return prompt_history, result def analyse_response(self, raw_response, step, prompt_history): diff --git a/src/hackingBuddyGPT/usecases/web_api_testing/response_processing/response_handler.py b/src/hackingBuddyGPT/usecases/web_api_testing/response_processing/response_handler.py index c3b33dd..9a7fe09 100644 --- a/src/hackingBuddyGPT/usecases/web_api_testing/response_processing/response_handler.py +++ b/src/hackingBuddyGPT/usecases/web_api_testing/response_processing/response_handler.py @@ -505,7 +505,8 @@ def handle_http_response(self, response: Any, prompt_history: Any, log: Any, com # Check response success is_successful = result_str.startswith("200") - prompt_history.append(message) + msg = {"role": message.role, "content": message.content, "tool_calls": message.tool_calls} + prompt_history.append(msg) self.last_path = request_path status_message = self.check_if_successful(is_successful, request_path, result_dict, result_str, categorized_endpoints) @@ -910,7 +911,9 @@ def adjust_path(self, response, move_type): return response def check_if_successful(self, is_successful, request_path, result_dict, result_str, categorized_endpoints): + self.prompt_helper.new_endpoint_found = False if is_successful: + self.prompt_helper.new_endpoint_found =True if "?" 
in request_path and request_path not in self.prompt_helper.found_query_endpoints: self.prompt_helper.found_query_endpoints.append(request_path) ep = request_path.split("?")[0] diff --git a/src/hackingBuddyGPT/usecases/web_api_testing/simple_web_api_testing.py b/src/hackingBuddyGPT/usecases/web_api_testing/simple_web_api_testing.py index 9dc6773..a9f6422 100644 --- a/src/hackingBuddyGPT/usecases/web_api_testing/simple_web_api_testing.py +++ b/src/hackingBuddyGPT/usecases/web_api_testing/simple_web_api_testing.py @@ -528,16 +528,15 @@ def execute_response(self, response, completion): tool_call_id: str = message.tool_calls[0].id command: str = pydantic_core.to_json(response).decode() self.log.console.print(Panel(command, title="assistant")) - self._prompt_history.append(message) - + msg = {"role": message.role, "content": message.content, "tool_calls": message.tool_calls} + self._prompt_history.append(msg) result: Any = response.execute() self.log.console.print(Panel(result, title="tool")) if not isinstance(result, str): endpoint: str = str(response.action.path).split("/")[1] self._report_handler.write_endpoint_to_report(endpoint) - self._prompt_history.append( - tool_message(self._response_handler.extract_key_elements_of_response(result), tool_call_id)) + self._prompt_history.append(tool_message(self._response_handler.extract_key_elements_of_response(result), tool_call_id)) self.adjust_user(result) return result diff --git a/src/hackingBuddyGPT/usecases/web_api_testing/testing/test_handler.py b/src/hackingBuddyGPT/usecases/web_api_testing/testing/test_handler.py index b1ff44b..77dfc32 100644 --- a/src/hackingBuddyGPT/usecases/web_api_testing/testing/test_handler.py +++ b/src/hackingBuddyGPT/usecases/web_api_testing/testing/test_handler.py @@ -80,8 +80,7 @@ def generate_test_case(self, analysis: str, endpoint: str, method: str, body:str Returns: tuple: Test case description, test case dictionary, and updated prompt history. 
""" - prompt_text = f""" - Based on the following analysis of the API response, generate a detailed test case: + prompt_text = f"""Based on the following analysis of the API response, generate a detailed test case: Analysis: {analysis} @@ -93,14 +92,13 @@ def generate_test_case(self, analysis: str, endpoint: str, method: str, body:str - Example input data in JSON format. - Expected result or assertion based on method and endpoint call. - Format: + return a PythonTestCase object that should look like this : + Format: {{ "description": "Test case for {method} {endpoint}", "input": {body}, "expected_output": {{"expected_body": body, "expected_status_code": status_code}} - }} - - return a PythonTestCase object + }}, """ prompt_history.append({"role": "system", "content": prompt_text}) response, completion = self._llm_handler.execute_prompt_with_specific_capability(prompt_history, diff --git a/src/hackingBuddyGPT/usecases/web_api_testing/utils/configuration_handler.py b/src/hackingBuddyGPT/usecases/web_api_testing/utils/configuration_handler.py index 6877131..05cadd3 100644 --- a/src/hackingBuddyGPT/usecases/web_api_testing/utils/configuration_handler.py +++ b/src/hackingBuddyGPT/usecases/web_api_testing/utils/configuration_handler.py @@ -7,14 +7,14 @@ class ConfigurationHandler(object): def __init__(self, config_file, strategy_string=None): - self.config_file = config_file + self.config_path = config_file self.strategy_string = strategy_string def load(self, strategy_string=None): - if self.config_file != "": - if self.config_file != "": + if self.config_path != "": + if self.config_path != "": current_file_path = os.path.dirname(os.path.abspath(__file__)) - self.config_path = os.path.join(current_file_path, "configs", self.config_file) + self.config_path = os.path.join(current_file_path, "configs", self.config_path) config = self._load_config() if "spotify" in self.config_path: diff --git a/src/hackingBuddyGPT/usecases/web_api_testing/utils/llm_handler.py 
b/src/hackingBuddyGPT/usecases/web_api_testing/utils/llm_handler.py index 7547b7f..33e2a53 100644 --- a/src/hackingBuddyGPT/usecases/web_api_testing/utils/llm_handler.py +++ b/src/hackingBuddyGPT/usecases/web_api_testing/utils/llm_handler.py @@ -2,7 +2,7 @@ from typing import Any, Dict, List import openai -from instructor.exceptions import IncompleteOutputException +from instructor.exceptions import IncompleteOutputException, InstructorRetryException from hackingBuddyGPT.capabilities.capability import capabilities_to_action_model @@ -52,7 +52,8 @@ def call_model(prompt: List[Dict[str, Any]]) -> Any: """Helper function to make the API call with the adjusted prompt.""" if isinstance(prompt, list): if isinstance(prompt[0], list): - prompt = prompt[0] + adjusted_prompt = prompt[0] + prompt = self._ensure_that_tool_messages_are_correct(adjusted_prompt, prompt) return self.llm.instructor.chat.completions.create_with_completion( model=self.llm.model, @@ -65,12 +66,14 @@ def call_model(prompt: List[Dict[str, Any]]) -> Any: try: if isinstance(prompt, list) and len(prompt) >= 10: - prompt = prompt[-10:] + adjusted_prompt = prompt[-10:] + prompt = self._ensure_that_tool_messages_are_correct(adjusted_prompt, prompt) + if isinstance(prompt, str): prompt = [prompt] return call_model(prompt) - except (openai.BadRequestError, IncompleteOutputException) as e: + except (openai.BadRequestError, IncompleteOutputException, InstructorRetryException) as e: try: # First adjustment attempt based on prompt length @@ -79,15 +82,13 @@ def call_model(prompt: List[Dict[str, Any]]) -> Any: adjusted_prompt = self.adjust_prompt(prompt, num_prompts=1) adjusted_prompt = self._ensure_that_tool_messages_are_correct(adjusted_prompt, prompt) prompt= adjusted_prompt - if isinstance(prompt, str): - adjusted_prompt = [prompt] - prompt= adjusted_prompt + return call_model(prompt) - except (openai.BadRequestError, IncompleteOutputException) as e: + except (openai.BadRequestError, IncompleteOutputException, 
InstructorRetryException) as e: # Second adjustment based on token size if the first attempt fails adjusted_prompt = self.adjust_prompt(prompt) if isinstance(adjusted_prompt, str): @@ -122,24 +123,15 @@ def call_model(adjusted_prompt: List[Dict[str, Any]], capability: Any) -> Any: response_model=capabilities_to_action_model(capability), #max_tokens=1000 # adjust as needed ) - - # Helper to adjust the prompt based on its length. - def adjust_prompt_based_on_length(prompt: List[Dict[str, Any]]) -> List[Dict[str, Any]]: - if self.adjusting_counter == 2: - num_prompts = 10 - self.adjusting_counter = 0 - else: - num_prompts = int( - len(prompt) - 0.5 * len(prompt) if len(prompt) >= 20 else len(prompt) - 0.3 * len(prompt)) - return self.adjust_prompt(prompt, num_prompts=num_prompts) - try: # First adjustment attempt based on prompt length if len(prompt) >= 10: - prompt = prompt[-10:] + shortened_prompt = prompt[-10:] + prompt = self._ensure_that_tool_messages_are_correct(shortened_prompt, prompt) + return call_model(prompt, capability) - except (openai.BadRequestError, IncompleteOutputException) as e: + except (openai.BadRequestError, IncompleteOutputException, InstructorRetryException) as e: try: # Second adjustment based on token size if the first attempt fails @@ -150,7 +142,7 @@ def adjust_prompt_based_on_length(prompt: List[Dict[str, Any]]) -> List[Dict[str adjusted_prompt = call_model(adjusted_prompt, capability) return adjusted_prompt - except (openai.BadRequestError, IncompleteOutputException) as e: + except (openai.BadRequestError, IncompleteOutputException, InstructorRetryException) as e: # Final fallback with the smallest prompt size shortened_prompt = self.adjust_prompt(prompt) @@ -180,6 +172,8 @@ def adjust_prompt(self, prompt: List[Dict[str, Any]], num_prompts: int = 5) -> L # Ensure not to exceed the available prompts adjusted_prompt = prompt[-num_prompts:] adjusted_prompt = adjusted_prompt[:len(adjusted_prompt) - len(adjusted_prompt) % 2] + + if 
adjusted_prompt == []: return prompt @@ -202,14 +196,18 @@ def _ensure_that_tool_messages_are_correct(self, adjusted_prompt, prompt): # Ensure adjusted_prompt items are valid dicts and follow `tool` message constraints validated_prompt = [] last_item = None + if adjusted_prompt[0].get("role") == "tool": + adjusted_prompt.remove(adjusted_prompt[0]) adjusted_prompt.reverse() - + # TODO: Fix this logic for item in adjusted_prompt: if isinstance(item, dict): # Remove `tool` messages without a preceding `tool_calls` message - if item.get("role") == "tool" and (last_item is None or last_item.get("role") != "tool_calls"): + if item.get("role") == "assistant" and "tool_calls" not in last_item: + validated_prompt.remove(last_item) continue + # Track valid items validated_prompt.append(item) last_item = item @@ -219,7 +217,7 @@ def _ensure_that_tool_messages_are_correct(self, adjusted_prompt, prompt): validated_prompt.reverse() if validated_prompt == []: validated_prompt = [prompt[-1]] - if isinstance(validated_prompt, object): + if isinstance(validated_prompt, str): validated_prompt = [validated_prompt] return validated_prompt diff --git a/src/hackingBuddyGPT/usecases/windows_privesc.py b/src/hackingBuddyGPT/usecases/windows_privesc.py new file mode 100644 index 0000000..537e967 --- /dev/null +++ b/src/hackingBuddyGPT/usecases/windows_privesc.py @@ -0,0 +1,49 @@ +from hackingBuddyGPT.usecases.base import use_case +from hackingBuddyGPT.strategies import CommandStrategy +from hackingBuddyGPT.utils import SSHConnection + +from mako.template import Template + +from hackingBuddyGPT.capabilities.psexec_run_command import PSExecRunCommand +from hackingBuddyGPT.capabilities.psexec_test_credential import PSExecTestCredential + +TEMPLATE = """ +You are a low-privilege user ${conn.username} with password ${conn.password} +on a ${system} system and your goal is to become the user '${target_user}' by abusing +commands and misconfiguration on the tested system.
+ +${capabilities} + +% if len(history) != 0: +You already tried the following commands: + +~~~ bash +${history} +~~~ + +Do not repeat already tried escalation attacks. +%endif + +Give your command. Do not add any explanation or add an initial `$`. +""" + +@use_case("Strategy-based Windows Priv-Escalation") +class PrivEscWindows(CommandStrategy): + conn: SSHConnection = None + + def init(self): + super().init() + + self._template = Template(TEMPLATE) + + self._capabilities.add_capability(PSExecRunCommand(conn=self.conn), default=True) + self._capabilities.add_capability(PSExecTestCredential(conn=self.conn)) + + self._template_params.update({ + "system": "Windows", + "target_user": "Administrator", + "conn": self.conn + }) + + def get_name(self) -> str: + return self.__class__.__name__ \ No newline at end of file diff --git a/src/hackingBuddyGPT/utils/capability_manager.py b/src/hackingBuddyGPT/utils/capability_manager.py new file mode 100644 index 0000000..da86ccf --- /dev/null +++ b/src/hackingBuddyGPT/utils/capability_manager.py @@ -0,0 +1,65 @@ +import datetime +from typing import Dict +from hackingBuddyGPT.utils.logging import Logger + +from hackingBuddyGPT.capabilities.capability import ( + Capability, + capabilities_to_simple_text_handler, +) + +class CapabilityManager: + log: Logger = None + + _capabilities: Dict[str, Capability] = {} + _default_capability: Capability = None + + def __init__(self, log): + self.log = log + + def add_capability(self, cap: Capability, name: str = None, default: bool = False): + if name is None: + name = cap.get_name() + self._capabilities[name] = cap + if default: + self._default_capability = cap + + def get_capability(self, name: str) -> Capability: + return self._capabilities.get(name, self._default_capability) + + def run_capability_json(self, message_id: int, tool_call_id: str, capability_name: str, arguments: str) -> str: + capability = self.get_capability(capability_name) + + tic = datetime.datetime.now() + try: + result = 
capability.to_model().model_validate_json(arguments).execute() + except Exception as e: + result = f"EXCEPTION: {e}" + duration = datetime.datetime.now() - tic + + self.log.add_tool_call(message_id, tool_call_id, capability_name, arguments, result, duration) + return result + + def run_capability_simple_text(self, message_id: int, cmd: str) -> tuple[str, str, str, bool]: + _capability_descriptions, parser = capabilities_to_simple_text_handler(self._capabilities, default_capability=self._default_capability) + + tic = datetime.datetime.now() + try: + success, output = parser(cmd) + except Exception as e: + success = False + output = f"EXCEPTION: {e}" + duration = datetime.datetime.now() - tic + + if not success: + self.log.add_tool_call(message_id, tool_call_id=0, function_name="", arguments=cmd, result_text=output, duration=0) + return "", "", output, False + + capability, cmd, (result, got_root) = output + self.log.add_tool_call(message_id, tool_call_id=0, function_name=capability, arguments=cmd, result_text=result, duration=duration) + + return capability, cmd, result, got_root + + def get_capability_block(self) -> str: + capability_descriptions, _parser = capabilities_to_simple_text_handler(self._capabilities) + return "You can either\n\n" + "\n".join(f"- {description}" for description in capability_descriptions.values()) + diff --git a/src/hackingBuddyGPT/utils/prompt_generation/information/pentesting_information.py b/src/hackingBuddyGPT/utils/prompt_generation/information/pentesting_information.py index 695bc74..6cc57eb 100644 --- a/src/hackingBuddyGPT/utils/prompt_generation/information/pentesting_information.py +++ b/src/hackingBuddyGPT/utils/prompt_generation/information/pentesting_information.py @@ -1486,9 +1486,12 @@ def mechanic_report(self, endpoint, account, prompts): return prompts def random_common_users(self, endpoint, login_path, login_schema, prompts): - - random_entries = self.df.sample(n=10, - random_state=42) # Adjust random_state for
different samples + if len(self.df) >= 10: + random_entries = self.df.sample(n=10, + random_state=42) # Adjust random_state for different samples + else: + # Either raise an error, sample fewer, or handle gracefully + random_entries = self.df.sample(n=len(self.df)) if len(self.df) > 0 else pandas.DataFrame() for index, random_entry in random_entries.iterrows(): username = random_entry['username'] diff --git a/src/hackingBuddyGPT/utils/prompt_generation/prompt_generation_helper.py b/src/hackingBuddyGPT/utils/prompt_generation/prompt_generation_helper.py index 044cdc7..03986f8 100644 --- a/src/hackingBuddyGPT/utils/prompt_generation/prompt_generation_helper.py +++ b/src/hackingBuddyGPT/utils/prompt_generation/prompt_generation_helper.py @@ -28,6 +28,7 @@ def __init__(self, host, description): self.counter = 0 self.uuid =uuid.uuid4() self.bad_request_endpoints = [] + self.new_endpoint_found = False self.endpoint_examples = {} self.name = "" if "coin" in host.lower():