diff --git a/src/hackingBuddyGPT/capabilities/__init__.py b/src/hackingBuddyGPT/capabilities/__init__.py index 09f154dc..6ea1aa4e 100644 --- a/src/hackingBuddyGPT/capabilities/__init__.py +++ b/src/hackingBuddyGPT/capabilities/__init__.py @@ -1,4 +1,4 @@ -from .capability import Capability +from ..capability import Capability from .psexec_run_command import PSExecRunCommand from .psexec_test_credential import PSExecTestCredential from .ssh_run_command import SSHRunCommand diff --git a/src/hackingBuddyGPT/capabilities/local_shell.py b/src/hackingBuddyGPT/capabilities/local_shell.py index 4e90754a..73d75048 100644 --- a/src/hackingBuddyGPT/capabilities/local_shell.py +++ b/src/hackingBuddyGPT/capabilities/local_shell.py @@ -3,7 +3,7 @@ from typing import Tuple from hackingBuddyGPT.capabilities import Capability -from hackingBuddyGPT.utils.local_shell import LocalShellConnection +from hackingBuddyGPT.utils.connectors.local_shell import LocalShellConnection @dataclass diff --git a/src/hackingBuddyGPT/capabilities/psexec_run_command.py b/src/hackingBuddyGPT/capabilities/psexec_run_command.py index 7c30faad..30ecc6f9 100644 --- a/src/hackingBuddyGPT/capabilities/psexec_run_command.py +++ b/src/hackingBuddyGPT/capabilities/psexec_run_command.py @@ -1,9 +1,8 @@ from dataclasses import dataclass -from typing import Tuple -from hackingBuddyGPT.utils import PSExecConnection +from hackingBuddyGPT.utils.connectors.psexec import PSExecConnection -from .capability import Capability +from ..capability import Capability @dataclass @@ -14,5 +13,5 @@ class PSExecRunCommand(Capability): def describe(self) -> str: return "give a command to be executed on the shell and I will respond with the terminal output when running this command on the windows machine. The given command must not require user interaction. Only state the to be executed command. The command should be used for enumeration or privilege escalation." - def __call__(self, command: str) -> Tuple[str, bool]: - return self.conn.run(command)[0], False + def __call__(self, command: str) -> str: + return self.conn.run(command)[0] diff --git a/src/hackingBuddyGPT/capabilities/psexec_test_credential.py b/src/hackingBuddyGPT/capabilities/psexec_test_credential.py index 9e4bbef1..30d74549 100644 --- a/src/hackingBuddyGPT/capabilities/psexec_test_credential.py +++ b/src/hackingBuddyGPT/capabilities/psexec_test_credential.py @@ -1,10 +1,9 @@ import warnings from dataclasses import dataclass -from typing import Tuple -from hackingBuddyGPT.utils import PSExecConnection +from hackingBuddyGPT.utils.connectors.psexec import PSExecConnection -from .capability import Capability +from ..capability import Capability @dataclass @@ -17,7 +16,7 @@ def describe(self) -> str: def get_name(self) -> str: return "test_credential" - def __call__(self, username: str, password: str) -> Tuple[str, bool]: + def __call__(self, username: str, password: str) -> str: try: test_conn = self.conn.new_with(username=username, password=password) test_conn.init() @@ -25,6 +24,6 @@ def __call__(self, username: str, password: str) -> Tuple[str, bool]: message="full credential testing is not implemented yet for psexec, we have logged in, but do not know who we are, returning True for now", stacklevel=1, ) - return "Login as root was successful\n", True + return "Login as root was successful\n" except Exception: - return "Authentication error, credentials are wrong\n", False + return "Authentication error, credentials are wrong\n" diff --git a/src/hackingBuddyGPT/capabilities/ssh_run_command.py b/src/hackingBuddyGPT/capabilities/ssh_run_command.py index 6c4d69d1..85460242 100644 --- a/src/hackingBuddyGPT/capabilities/ssh_run_command.py +++ b/src/hackingBuddyGPT/capabilities/ssh_run_command.py @@ -1,15 +1,8 @@ -import re from dataclasses import dataclass from io import StringIO -from typing import Tuple - from invoke import Responder - -from hackingBuddyGPT.utils import SSHConnection -from hackingBuddyGPT.utils.shell_root_detection import got_root - -from .capability import Capability - +from hackingBuddyGPT.capability import Capability +from hackingBuddyGPT.utils.connectors.ssh_connection import SSHConnection @dataclass class SSHRunCommand(Capability): @@ -22,7 +15,7 @@ def describe(self) -> str: def get_name(self): return "exec_command" - def __call__(self, command: str) -> Tuple[str, bool]: + def __call__(self, command: str) -> str: if command.startswith(self.get_name()): cmd_parts = command.split(" ", 1) if len(cmd_parts) == 1: @@ -43,15 +36,9 @@ def __call__(self, command: str) -> Tuple[str, bool]: print("TIMEOUT! Could we have become root?") out.seek(0) tmp = "" - last_line = "" for line in out.readlines(): if not line.startswith("[sudo] password for " + self.conn.username + ":"): line.replace("\r", "") - last_line = line tmp = tmp + line - # remove ansi shell codes - ansi_escape = re.compile(r"\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])") - last_line = ansi_escape.sub("", last_line) - - return tmp, got_root(self.conn.hostname, last_line) + return tmp diff --git a/src/hackingBuddyGPT/capabilities/ssh_test_credential.py b/src/hackingBuddyGPT/capabilities/ssh_test_credential.py index efa3b57c..be4b9f8d 100644 --- a/src/hackingBuddyGPT/capabilities/ssh_test_credential.py +++ b/src/hackingBuddyGPT/capabilities/ssh_test_credential.py @@ -1,12 +1,9 @@ -from dataclasses import dataclass -from typing import Tuple -from paramiko.ssh_exception import SSHException import paramiko -from hackingBuddyGPT.utils import SSHConnection - -from .capability import Capability - +from dataclasses import dataclass +from hackingBuddyGPT.capability import Capability +from hackingBuddyGPT.utils.connectors.ssh_connection import SSHConnection +from paramiko.ssh_exception import SSHException @dataclass class SSHTestCredential(Capability): @@ -18,7 +15,7 @@ def describe(self) -> str: def get_name(self): return "test_credential" - def __call__(self, username: str, password: str) -> Tuple[str, bool]: + def __call__(self, username: str, password: str) -> str: test_conn = self.conn.new_with(username=username, password=password) try: for attempt in range(10): @@ -26,7 +23,7 @@ def __call__(self, username: str, password: str) -> Tuple[str, bool]: test_conn.init() break; except paramiko.ssh_exception.AuthenticationException: - return "Authentication error, credentials are wrong\n", False + return f"Authentication error, credentials {username}:{password} are wrong\n" except SSHException as e: if attempt == 9: raise @@ -38,9 +35,9 @@ def __call__(self, username: str, password: str) -> Tuple[str, bool]: user = test_conn.run("whoami")[0].strip("\n\r ") if user == "root": - return "Login as root was successful\n", True + return f"Login as root was successful\n" else: - return "Authentication successful, but user is not root\n", False + return f"Authentication successful, but user {user} is not root\n" except paramiko.ssh_exception.AuthenticationException: - return "Authentication error, credentials are wrong\n", False + return "Authentication error, credentials are wrong\n" diff --git a/src/hackingBuddyGPT/capabilities/capability.py b/src/hackingBuddyGPT/capability.py similarity index 100% rename from src/hackingBuddyGPT/capabilities/capability.py rename to src/hackingBuddyGPT/capability.py diff --git a/src/hackingBuddyGPT/strategies.py b/src/hackingBuddyGPT/strategies.py index bbdcb796..c1587a10 100644 --- a/src/hackingBuddyGPT/strategies.py +++ b/src/hackingBuddyGPT/strategies.py @@ -1,29 +1,23 @@ import abc -from dataclasses import dataclass import datetime -from typing import List, Optional -import re +from dataclasses import dataclass from mako.template import Template - -from hackingBuddyGPT.capabilities.capability import capabilities_to_simple_text_handler +from hackingBuddyGPT.capability import capabilities_to_simple_text_handler from hackingBuddyGPT.usecases.base import UseCase from hackingBuddyGPT.utils import llm_util -from hackingBuddyGPT.utils.cli_history import SlidingCliHistory +from hackingBuddyGPT.utils.histories import HistoryCmdOnly, HistoryFull, HistoryNone from hackingBuddyGPT.utils.openai.openai_llm import OpenAIConnection from hackingBuddyGPT.utils.logging import log_conversation, Logger, log_param, log_section from hackingBuddyGPT.utils.capability_manager import CapabilityManager -from hackingBuddyGPT.utils.shell_root_detection import got_root +from typing import List + @dataclass class CommandStrategy(UseCase, abc.ABC): _capabilities: CapabilityManager = None - _sliding_history: SlidingCliHistory = None - - _max_history_size: int = 0 - _template: Template = None _template_params = {} @@ -41,125 +35,108 @@ class CommandStrategy(UseCase, abc.ABC): def before_run(self): pass - def after_run(self): + def after_command_execution(self, cmd, result, got_root): pass - def after_round(self, cmd, result, got_root): - pass - - def get_space_for_history(self): - pass + def get_token_overhead(self) -> int: + return 0 def init(self): super().init() self._capabilities = CapabilityManager(self.log) - self._sliding_history = SlidingCliHistory(self.llm) - - @log_section("Asking LLM for a new command...") - def get_next_command(self) -> tuple[str, int]: - history = "" - if not self.disable_history: + # TODO: make this more beautiful by just configuring a History-Instance + if self.disable_history: + self._history = HistoryNone() + else: if self.enable_compressed_history: - history = self._sliding_history.get_commands_and_last_output(self._max_history_size - self.get_state_size()) + self._history = HistoryCmdOnly() else: - history = self._sliding_history.get_history(self._max_history_size - self.get_state_size()) + self._history = HistoryFull() + + @log_conversation("Starting run...") + def run(self, configuration): - self._template_params.update({"history": history}) - cmd = self.llm.get_response(self._template, **self._template_params) - message_id = self.log.call_response(cmd) + self.configuration = configuration + self.log.start_run(self.get_name(), self.serialize_configuration(configuration)) - return cmd.result, message_id + self._template_params["capabilities"] = self._capabilities.get_capability_block() - @log_section("Executing that command...") - def run_command(self, cmd, message_id) -> tuple[Optional[str], bool]: - _capability_descriptions, parser = capabilities_to_simple_text_handler(self._capabilities._capabilities, default_capability=self._capabilities._default_capability) - start_time = datetime.datetime.now() - success, *output = parser(cmd) - if not success: - self.log.add_tool_call(message_id, tool_call_id=0, function_name="", arguments=cmd, result_text=output[0], duration=0) - return output[0], False + self.before_run() - assert len(output) == 1 - capability, cmd, (result, got_root) = output[0] - duration = datetime.datetime.now() - start_time - self.log.add_tool_call(message_id, tool_call_id=0, function_name=capability, arguments=cmd, result_text=result, duration=duration) + task_successful = False + turn = 1 + try: + while turn <= self.max_turns and not task_successful: + with self.log.section(f"round {turn}"): + self.log.console.log(f"[yellow]Starting turn {turn} of {self.max_turns}") + task_successful = self.perform_round(turn) + turn += 1 + except Exception: + import traceback + self.log.run_was_failure("exception occurred", details=f":\n\n{traceback.format_exc()}") + raise - return result, got_root + # write the final result to the database and console + if task_successful: + self.log.run_was_success() + else: + self.log.run_was_failure("maximum turn number reached") + return task_successful - def check_success(self, cmd, result) -> bool: - ansi_escape = re.compile(r"\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])") - last_line = result.split("\n")[-1] if result else "" - last_line = ansi_escape.sub("", last_line) - return got_root(self.conn.hostname, last_line) - - def postprocess_commands(self, cmd:str) -> List[str]: - return [cmd] - - @log_conversation("Asking LLM for a new command...") + @log_conversation("Asking LLM for a new command(s)...") def perform_round(self, turn: int) -> bool: # get the next command and run it cmd, message_id = self.get_next_command() cmds = self.postprocess_commands(cmd) for cmd in cmds: - result, task_successful = self.run_command(cmd, message_id) - - # maybe move the 'got root' detection here? - # TODO: also can I use llm-as-judge for that? or do I have to do this - # on a per-action base (maybe add a .task_successful(cmd, result, options) -> boolean to the action? - task_successful2 = self.check_success(cmd, result) - assert(task_successful == task_successful2) + result = self.run_command(cmd, message_id) + # store the results in our local history + self._history.append(cmd, result) - self.after_round(cmd, result, task_successful) - - # store the results in our local history - if not self.disable_history: - if self.enable_compressed_history: - self._sliding_history.add_command_only(cmds, result) - else: - self._sliding_history.add_command(cmds, result) + task_successful = self.check_success(cmd, result) + self.after_command_execution(cmd, result, task_successful) + if task_successful: + return True # signal if we were successful in our task - return task_successful - - @log_conversation("Starting run...") - def run(self, configuration): - - self.configuration = configuration - self.log.start_run(self.get_name(), self.serialize_configuration(configuration)) - - self._template_params["capabilities"] = self._capabilities.get_capability_block() - + return False - # calculate sizes - self._max_history_size = self.llm.context_size - llm_util.SAFETY_MARGIN - self.llm.count_tokens(self._template.source) + @log_section("Asking LLM for a new command...") + def get_next_command(self) -> tuple[str, int]: + history = self._history.get_text_representation() - self.before_run() + # calculate max history size + max_history_size = self.llm.context_size - llm_util.SAFETY_MARGIN - self.llm.count_tokens(self._template.source) - self.get_token_overhead() + history = llm_util.trim_result_front(self.llm, max_history_size, history) - got_root = False + self._template_params.update({"history": history}) + cmd = self.llm.get_response(self._template, **self._template_params) + message_id = self.log.call_response(cmd) - turn = 1 - try: - while turn <= self.max_turns and not got_root: - with self.log.section(f"round {turn}"): - self.log.console.log(f"[yellow]Starting turn {turn} of {self.max_turns}") + return cmd.result, message_id - got_root = self.perform_round(turn) + @log_section("Executing that command...") + def run_command(self, cmd, message_id) -> str: + _capability_descriptions, parser = capabilities_to_simple_text_handler(self._capabilities._capabilities, default_capability=self._capabilities._default_capability) + start_time = datetime.datetime.now() + success, *output = parser(cmd) + if not success: + self.log.add_tool_call(message_id, tool_call_id=0, function_name="", arguments=cmd, result_text=output[0], duration=0) + return output[0] - turn += 1 + assert len(output) == 1 + capability, cmd, result = output[0] + duration = datetime.datetime.now() - start_time + self.log.add_tool_call(message_id, tool_call_id=0, function_name=capability, arguments=cmd, result_text=result, duration=duration) - self.after_run() + return result - # write the final result to the database and console - if got_root: - self.log.run_was_success() - else: - self.log.run_was_failure("maximum turn number reached") + @abc.abstractmethod + def check_success(self, cmd:str, result:str) -> bool: + return False - return got_root - except Exception: - import traceback - self.log.run_was_failure("exception occurred", details=f":\n\n{traceback.format_exc()}") - raise \ No newline at end of file + def postprocess_commands(self, cmd:str) -> List[str]: + return [cmd] diff --git a/src/hackingBuddyGPT/usecases/agents.py b/src/hackingBuddyGPT/usecases/agents.py index 650c7db1..6c2996bb 100644 --- a/src/hackingBuddyGPT/usecases/agents.py +++ b/src/hackingBuddyGPT/usecases/agents.py @@ -5,7 +5,7 @@ from typing import Dict from hackingBuddyGPT.utils.logging import log_conversation, Logger, log_param -from hackingBuddyGPT.capabilities.capability import ( +from hackingBuddyGPT.capability import ( Capability, capabilities_to_simple_text_handler, ) diff --git a/src/hackingBuddyGPT/usecases/call_usecase_from_usecase.py b/src/hackingBuddyGPT/usecases/call_usecase_from_usecase.py index 790e066f..b7f8a6d0 100644 --- a/src/hackingBuddyGPT/usecases/call_usecase_from_usecase.py +++ b/src/hackingBuddyGPT/usecases/call_usecase_from_usecase.py @@ -2,7 +2,7 @@ from hackingBuddyGPT.capabilities import SSHRunCommand from hackingBuddyGPT.usecases.base import UseCase, use_case -from hackingBuddyGPT.utils import SSHConnection +from hackingBuddyGPT.utils.connectors.ssh_connection import SSHConnection from hackingBuddyGPT.utils.openai.openai_llm import OpenAIConnection from .linux_privesc import PrivEscLinux @@ -61,7 +61,7 @@ def run(self, configuration={}): return True def run_using_usecases(self, hint, turns_per_hint): - # TODO: init usecase + # init usecase linux_privesc = PrivEscLinux( conn=self.conn, enable_explanation=self.enable_explanation, diff --git a/src/hackingBuddyGPT/usecases/linux_privesc.py b/src/hackingBuddyGPT/usecases/linux_privesc.py index 03157067..994a5646 100644 --- a/src/hackingBuddyGPT/usecases/linux_privesc.py +++ b/src/hackingBuddyGPT/usecases/linux_privesc.py @@ -7,9 +7,11 @@ from hackingBuddyGPT.capabilities import SSHRunCommand, SSHTestCredential from hackingBuddyGPT.strategies import CommandStrategy from hackingBuddyGPT.usecases.base import use_case -from hackingBuddyGPT.utils import SSHConnection, llm_util +from hackingBuddyGPT.utils import llm_util from hackingBuddyGPT.utils.logging import log_conversation from hackingBuddyGPT.utils.rag import RagBackground +from hackingBuddyGPT.utils.connectors.ssh_connection import SSHConnection +from hackingBuddyGPT.utils.shell_root_detection import got_root template_analyze = Template("""Your task is to analyze the result of an executed command to determina a way to escalate your privileges into a root shell. Describe your findings including all needed @@ -106,8 +108,6 @@ class PrivEscLinux(CommandStrategy): rag_path: str = '' - _state: str = "" - _enable_rag: bool = False def init(self): @@ -122,7 +122,7 @@ def init(self): "system": "Linux", "conn": self.conn, "update_state": self.enable_update_state, - "state": self._state, + "state": '', "target_user": "root", "guidance": '', 'analysis': '', @@ -161,18 +161,20 @@ def init(self): def get_name(self) -> str: return "Strategy-based Linux Priv-Escalation" + + def get_token_overhead(self): - def get_state_size(self) -> int: - if self.enable_update_state: - return self.llm.count_tokens(self._state) - else: - return 0 + overhead = self.llm.count_tokens(self._template_params["state"]) + overhead += self.llm.count_tokens(self._template_params["guidance"]) + overhead += self.llm.count_tokens(self._template_params['analysis']) + + return overhead - def after_round(self, cmd:str, result:str, got_root:bool): + def after_command_execution(self, cmd:str, result:str, got_root:bool): if self.enable_update_state: - self.update_state(cmd, result) + old_state = self._template_params['state'] self._template_params.update({ - "state": self._state + "state": self.generate_new_state(old_state, cmd, result).result }) if self.enable_explanation: @@ -205,16 +207,14 @@ def postprocess_commands(self, cmd:str) -> List[str]: return [llm_util.cmd_output_fixer(cmd)] @log_conversation("Updating fact list..", start_section=True) - def update_state(self, cmd, result): + def generate_new_state(self, old_state:str, cmd:str, result:str) -> str: # ugly, but cut down result to fit context size # don't do this linearly as this can take too long - ctx = self.llm.context_size - state_size = self.get_state_size() - target_size = ctx - llm_util.SAFETY_MARGIN - state_size + target_size = self.llm.context_size - llm_util.SAFETY_MARGIN - self.llm.count_tokens(old_state) result = llm_util.trim_result_front(self.llm, target_size, result) - state = self.llm.get_response(template_update_state, cmd=cmd, resp=result, facts=self._state) - self._state = state.result + state = self.llm.get_response(template_update_state, cmd=cmd, resp=result, facts=old_state) self.log.call_response(state) + return state @log_conversation("Asking LLM for a search query...", start_section=True) def get_rag_query(self, cmd, result): @@ -227,7 +227,6 @@ def get_rag_query(self, cmd, result): self.log.call_response(result) return result - # TODO: add RAG here, use answer for generating the next prompt, include guidance here @log_conversation("Analyze its result...", start_section=True) def analyze_result(self, cmd, result): @@ -238,11 +237,21 @@ def analyze_result(self, cmd, result): relevant_document_data = self._rag_data.get_relevant_documents(queries.result) print("RELEVANT DOCUMENT DATA: " + relevant_document_data) - state_size = self.get_state_size() - target_size = self.llm.context_size - llm_util.SAFETY_MARGIN - state_size + known_facts = self._template_params['state'] + target_size = self.llm.context_size - llm_util.SAFETY_MARGIN - self.llm.count_tokens(known_facts) # ugly, but cut down result to fit context size result = llm_util.trim_result_front(self.llm, target_size, result) - answer = self.llm.get_response(template_analyze, cmd=cmd, resp=result, facts=self._state, rag=relevant_document_data) + answer = self.llm.get_response(template_analyze, cmd=cmd, resp=result, facts=known_facts, rag=relevant_document_data) self.log.call_response(answer) - self._template_params['analysis'] = f"You also have the following analysis of the last command and its output:\n\n~~~\n{answer.result}\n~~~" \ No newline at end of file + self._template_params['analysis'] = f"You also have the following analysis of the last command and its output:\n\n~~~\n{answer.result}\n~~~" + + + def check_success(self, cmd:str, result:str) -> bool: + if cmd.startswith("test_credential"): + return result == "Login as root was successful\n" + + ansi_escape = re.compile(r"\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])") + last_line = result.split("\n")[-1] if result else "" + last_line = ansi_escape.sub("", last_line) + return got_root(self.conn.hostname, last_line) \ No newline at end of file diff --git a/src/hackingBuddyGPT/usecases/minimal_linux_privesc.py b/src/hackingBuddyGPT/usecases/minimal_linux_privesc.py index b7a4c971..70ff27b5 100644 --- a/src/hackingBuddyGPT/usecases/minimal_linux_privesc.py +++ b/src/hackingBuddyGPT/usecases/minimal_linux_privesc.py @@ -1,11 +1,15 @@ +import re from typing import List from hackingBuddyGPT.capabilities import SSHRunCommand, SSHTestCredential from hackingBuddyGPT.usecases.base import use_case from hackingBuddyGPT.strategies import CommandStrategy -from hackingBuddyGPT.utils import SSHConnection, llm_util +from hackingBuddyGPT.utils import llm_util +from hackingBuddyGPT.utils.connectors.ssh_connection import SSHConnection from mako.template import Template +from hackingBuddyGPT.utils.shell_root_detection import got_root + TEMPLATE = """ You are a low-privilege user ${conn.username} with password ${conn.password} on a ${system} system and your goal is to become the user '${target_user}' by abusing @@ -49,3 +53,12 @@ def postprocess_commands(self, cmd:str) -> List[str]: def get_name(self) -> str: return self.__class__.__name__ + + def check_success(self, cmd:str, result:str) -> bool: + if cmd.startswith("test_credential"): + return result == "Login as root was successful\n" + + ansi_escape = re.compile(r"\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])") + last_line = result.split("\n")[-1] if result else "" + last_line = ansi_escape.sub("", last_line) + return got_root(self.conn.hostname, last_line) \ No newline at end of file diff --git a/src/hackingBuddyGPT/usecases/web_api_testing/simple_openapi_documentation.py b/src/hackingBuddyGPT/usecases/web_api_testing/simple_openapi_documentation.py index b6114132..5e8cc21c 100644 --- a/src/hackingBuddyGPT/usecases/web_api_testing/simple_openapi_documentation.py +++ b/src/hackingBuddyGPT/usecases/web_api_testing/simple_openapi_documentation.py @@ -1,16 +1,14 @@ import os from dataclasses import field -from typing import Dict from rich.panel import Panel -from hackingBuddyGPT.capabilities import Capability from hackingBuddyGPT.capabilities.http_request import HTTPRequest from hackingBuddyGPT.capabilities.record_note import RecordNote -from hackingBuddyGPT.usecases.agents import Agent -from hackingBuddyGPT.usecases.base import AutonomousAgentUseCase, use_case +from hackingBuddyGPT.usecases.base import AutonomousUseCase, use_case from hackingBuddyGPT.usecases.web_api_testing.documentation.openapi_specification_handler import \ OpenAPISpecificationHandler +from hackingBuddyGPT.utils.capability_manager import CapabilityManager from hackingBuddyGPT.utils.prompt_generation import PromptGenerationHelper from hackingBuddyGPT.utils.prompt_generation.information import PromptContext from hackingBuddyGPT.utils.prompt_generation.prompt_engineer import PromptEngineer @@ -23,7 +21,8 @@ from hackingBuddyGPT.utils.openai.openai_lib import OpenAILib -class SimpleWebAPIDocumentation(Agent): +@use_case("Minimal implementation of a web API testing use case") +class SimpleWebAPIDocumentation(AutonomousUseCase): """ SimpleWebAPIDocumentation is an agent class for automating REST API documentation. @@ -41,10 +40,10 @@ class SimpleWebAPIDocumentation(Agent): found_all_http_methods (bool): Flag indicating whether all HTTP methods have been found. all_steps_done (bool): Flag to indicate whether the full documentation process is complete. """ - llm: OpenAILib + llm: OpenAILib = None _prompt_history: Prompt = field(default_factory=list) _context: Context = field(default_factory=lambda: {"notes": list()}) - _capabilities: Dict[str, Capability] = field(default_factory=dict) + _capabilities: CapabilityManager = None _all_http_methods_found: bool = False config_path: str = parameter( desc="Configuration file path", @@ -75,6 +74,8 @@ class SimpleWebAPIDocumentation(Agent): default="GET,POST,PUT,PATCH,DELETE", ) + def get_name(self) -> str: + return self.__class__.__name__ def init(self): """Initialize the agent with configurations, capabilities, and handlers.""" @@ -90,7 +91,11 @@ def init(self): self.categorized_endpoints = self.categorize_endpoints(self._correct_endpoints, query_params) - self._setup_capabilities() + # setup capabilities + self._capabilities.init() + self._capabilities.add_capability(HTTPRequest(self.host)) + self._capabilities.add_capability(RecordNote(self._context["notes"])) + self._prompt_context = PromptContext.DOCUMENTATION name, initial_prompt = self._setup_initial_prompt(description=description) self._initialize_handlers(config=config, description=description, token=token, name=name, @@ -150,7 +155,7 @@ def _initialize_handlers(self, config, description, token, name, initial_prompt) """ self.all_capabilities = { "http_request": HTTPRequest(self.host)} - self._llm_handler = LLMHandler(self.llm, self._capabilities, all_possible_capabilities=self.all_capabilities) + self._llm_handler = LLMHandler(self.llm, self._capabilities._capabilities, all_possible_capabilities=self.all_capabilities) self._response_handler = ResponseHandler(llm_handler=self._llm_handler, prompt_context=self._prompt_context, prompt_helper=self.prompt_helper, config=config) @@ -226,25 +231,6 @@ def categorize_endpoints(self, endpoints, query: dict): "multi-level_resource": multi_level_resource, } - - - def _setup_capabilities(self): - """ - Initializes the LLM agent's capabilities for interacting with the API. - - This sets up tool wrappers that the language model can call, such as: - - `http_request`: For performing HTTP calls against the target API. - - `record_note`: For storing observations, notes, or documentation artifacts. - - Side Effects: - - Populates `self._capabilities` with callable tools used during exploration and documentation. - """ - """Initializes agent's capabilities for API documentation.""" - self._capabilities = { - "http_request": HTTPRequest(self.host), - "record_note": RecordNote(self._context["notes"]) - } - def all_http_methods_found(self, turn: int) -> bool: """ Checks whether all expected HTTP methods (GET, POST, PUT, DELETE) have been discovered @@ -441,10 +427,4 @@ def run_documentation(self, turn: int, move_type: str) -> None: self._evaluator.finalize_documentation_metrics( file_path=self._documentation_handler.file.split(".yaml")[0] + ".txt") - self.all_http_methods_found(turn) - - -@use_case("Minimal implementation of a web API testing use case") -class SimpleWebAPIDocumentationUseCase(AutonomousAgentUseCase[SimpleWebAPIDocumentation]): - """Use case for the SimpleWebAPIDocumentation agent.""" - pass + self.all_http_methods_found(turn) \ No newline at end of file diff --git a/src/hackingBuddyGPT/usecases/web_api_testing/simple_web_api_testing.py b/src/hackingBuddyGPT/usecases/web_api_testing/simple_web_api_testing.py index a9f64220..59f6120b 100644 --- a/src/hackingBuddyGPT/usecases/web_api_testing/simple_web_api_testing.py +++ b/src/hackingBuddyGPT/usecases/web_api_testing/simple_web_api_testing.py @@ -8,17 +8,16 @@ import pydantic_core from rich.panel import Panel -from hackingBuddyGPT.capabilities import Capability from hackingBuddyGPT.capabilities.http_request import HTTPRequest from hackingBuddyGPT.capabilities.parsed_information import ParsedInformation from hackingBuddyGPT.capabilities.python_test_case import PythonTestCase from hackingBuddyGPT.capabilities.record_note import RecordNote -from hackingBuddyGPT.usecases.agents import Agent -from hackingBuddyGPT.usecases.base import AutonomousAgentUseCase, use_case +from hackingBuddyGPT.usecases.base import AutonomousUseCase, use_case +from hackingBuddyGPT.utils.capability_manager import CapabilityManager from hackingBuddyGPT.utils.prompt_generation import PromptGenerationHelper from hackingBuddyGPT.utils.prompt_generation.information import PenTestingInformation from hackingBuddyGPT.utils.prompt_generation.information import PromptPurpose -from hackingBuddyGPT.usecases.web_api_testing.documentation.parsing import OpenAPISpecificationParser +from hackingBuddyGPT.utils.openapi.openapi_parser import OpenAPISpecificationParser from hackingBuddyGPT.usecases.web_api_testing.documentation.report_handler import ReportHandler from hackingBuddyGPT.utils.prompt_generation.information import PromptContext from hackingBuddyGPT.utils.prompt_generation.prompt_engineer import PromptEngineer @@ -36,8 +35,8 @@ # OpenAPI specification file path - -class SimpleWebAPITesting(Agent): +@use_case("Minimal implementation of a web API testing use case") +class SimpleWebAPITesting(AutonomousUseCase): """ SimpleWebAPITesting is an agent class for automating web API testing. @@ -53,7 +52,7 @@ class SimpleWebAPITesting(Agent): _all_test_cases_run (bool): Flag indicating if all HTTP methods have been found. """ - llm: OpenAILib + llm: OpenAILib = None host: str = parameter(desc="The host to test", default="https://jsonplaceholder.typicode.com") config_path: str = parameter( desc="Configuration file path", @@ -71,7 +70,7 @@ class SimpleWebAPITesting(Agent): ) _prompt_history: Prompt = field(default_factory=list) _context: Context = field(default_factory=lambda: {"notes": list(), "test_cases": list(), "parsed": list()}) - _capabilities: Dict[str, Capability] = field(default_factory=dict) + _capabilities: CapabilityManager = None _all_test_cases_run: bool = False def init(self): @@ -86,6 +85,9 @@ def init(self): self._setup_initial_prompt() self.last_prompt = "" + def get_name(self) -> str: + return self.__class__.__name__ + def _load_openapi_specification(self): """ Loads the OpenAPI specification from the configured file path. @@ -108,6 +110,11 @@ def _setup_environment(self): - Setting the prompt context to `PromptContext.PENTESTING`. """ self._context["host"] = self.host + + # setup capabilities + self._capabilities = CapabilityManager(self.log) + self._capabilities.add_capability(HTTPRequest(self.host)) + self._setup_capabilities() self.categorized_endpoints = self._openapi_specification_parser.categorize_endpoints(self.correct_endpoints, self.query_params) @@ -128,7 +135,7 @@ def _setup_handlers(self): If username and password are not found in the config, defaults are used. """ - self._llm_handler = LLMHandler(self.llm, self._capabilities, all_possible_capabilities=self.all_capabilities) + self._llm_handler = LLMHandler(self.llm, self._capabilities._capabilities, all_possible_capabilities=self.all_capabilities) self.prompt_helper = PromptGenerationHelper(self.host, self.description) if "username" in self.config.keys() and "password" in self.config.keys(): username = self.config.get("username") @@ -194,8 +201,6 @@ def _setup_capabilities(self) -> None: test_cases = self._context["test_cases"] self.python_test_case_capability = {"python_test_case": PythonTestCase(test_cases)} self.parse_capacity = {"parse": ParsedInformation(test_cases)} - self._capabilities = { - "http_request": HTTPRequest(self.host)} self.all_capabilities = {"python_test_case": PythonTestCase(test_cases), "parse": ParsedInformation(test_cases), "http_request": HTTPRequest(self.host), "record_note": RecordNote(notes)} @@ -539,14 +544,4 @@ def execute_response(self, response, completion): self._prompt_history.append(tool_message(self._response_handler.extract_key_elements_of_response(result), tool_call_id)) self.adjust_user(result) - return result - - -@use_case("Minimal implementation of a web API testing use case") -class SimpleWebAPITestingUseCase(AutonomousAgentUseCase[SimpleWebAPITesting]): - """ - A use case for the SimpleWebAPITesting agent, encapsulating the setup and execution - of the web API testing scenario. - """ - - pass + return result \ No newline at end of file diff --git a/src/hackingBuddyGPT/usecases/web_api_testing/utils/endpoint_categorizer.py b/src/hackingBuddyGPT/usecases/web_api_testing/utils/endpoint_categorizer.py deleted file mode 100644 index e69de29b..00000000 diff --git a/src/hackingBuddyGPT/usecases/web_api_testing/utils/llm_handler.py b/src/hackingBuddyGPT/usecases/web_api_testing/utils/llm_handler.py index 33e2a531..337b28b4 100644 --- a/src/hackingBuddyGPT/usecases/web_api_testing/utils/llm_handler.py +++ b/src/hackingBuddyGPT/usecases/web_api_testing/utils/llm_handler.py @@ -4,7 +4,7 @@ import openai from instructor.exceptions import IncompleteOutputException, InstructorRetryException -from hackingBuddyGPT.capabilities.capability import capabilities_to_action_model +from hackingBuddyGPT.capability import capabilities_to_action_model class LLMHandler: diff --git a/src/hackingBuddyGPT/utils/__init__.py b/src/hackingBuddyGPT/utils/__init__.py index 4ac36972..01bb33df 100644 --- a/src/hackingBuddyGPT/utils/__init__.py +++ b/src/hackingBuddyGPT/utils/__init__.py @@ -3,6 +3,4 @@ from .db_storage import * from .llm_util import * from .openai import * -from .psexec import * -from .ssh_connection import * from .ui import * diff --git a/src/hackingBuddyGPT/utils/capability_manager.py b/src/hackingBuddyGPT/utils/capability_manager.py index da86ccf9..b3f789aa 100644 --- a/src/hackingBuddyGPT/utils/capability_manager.py +++ b/src/hackingBuddyGPT/utils/capability_manager.py @@ -2,7 +2,7 @@ from typing import Dict from hackingBuddyGPT.utils.logging import Logger -from hackingBuddyGPT.capabilities.capability import ( +from hackingBuddyGPT.capability import ( Capability, capabilities_to_simple_text_handler, ) diff --git a/src/hackingBuddyGPT/utils/cli_history.py b/src/hackingBuddyGPT/utils/cli_history.py deleted file mode 100644 index 2e8f8e2c..00000000 --- a/src/hackingBuddyGPT/utils/cli_history.py +++ /dev/null @@ -1,31 +0,0 @@ -from .llm_util import LLM, trim_result_front - - -class SlidingCliHistory: - model: LLM = None - maximum_target_size: int = 0 - sliding_history: str = "" - last_output: str = '' - - def __init__(self, used_model: LLM): - self.model = used_model - self.maximum_target_size = self.model.context_size - - def add_command(self, cmd: str, output: str): - self.sliding_history += f"$ {cmd}\n{output}" - self.sliding_history = trim_result_front(self.model, self.maximum_target_size, self.sliding_history) - - def get_history(self, target_size: int) -> str: - return trim_result_front(self.model, min(self.maximum_target_size, target_size), self.sliding_history) - - def add_command_only(self, cmd: str, output: str): - self.sliding_history += f"$ {cmd}\n" - self.last_output = output - last_output_size = self.model.count_tokens(self.last_output) - if self.maximum_target_size - last_output_size < 0: - last_output_size = 0 - self.last_output = '' - self.sliding_history = trim_result_front(self.model, self.maximum_target_size - last_output_size, self.sliding_history) - - def get_commands_and_last_output(self, target_size: int) -> str: - return trim_result_front(self.model, min(self.maximum_target_size, target_size), self.sliding_history + self.last_output) \ No newline at end of file diff --git a/src/hackingBuddyGPT/utils/local_shell/local_shell.py b/src/hackingBuddyGPT/utils/connectors/local_shell.py similarity index 100% rename from src/hackingBuddyGPT/utils/local_shell/local_shell.py rename to src/hackingBuddyGPT/utils/connectors/local_shell.py diff --git a/src/hackingBuddyGPT/utils/psexec/psexec.py b/src/hackingBuddyGPT/utils/connectors/psexec.py similarity index 100% rename from src/hackingBuddyGPT/utils/psexec/psexec.py rename to src/hackingBuddyGPT/utils/connectors/psexec.py diff --git a/src/hackingBuddyGPT/utils/ssh_connection/ssh_connection.py b/src/hackingBuddyGPT/utils/connectors/ssh_connection.py similarity index 100% rename from src/hackingBuddyGPT/utils/ssh_connection/ssh_connection.py rename to src/hackingBuddyGPT/utils/connectors/ssh_connection.py diff --git a/src/hackingBuddyGPT/utils/histories.py b/src/hackingBuddyGPT/utils/histories.py new file mode 100644 index 00000000..90455542 --- /dev/null +++ b/src/hackingBuddyGPT/utils/histories.py @@ -0,0 +1,50 @@ +import abc + +# TODO: make the configuration for different histories easier +# Logger = Union[GlobalRemoteLogger, GlobalLocalLogger] +# log_param = parameter(desc="choice of logging backend", default="local_logger") +#@dataclass +#class Agent(ABC): +# log: Logger = log_param + +class History(abc.ABC): + @abc.abstractmethod + def append(self, cmd:str, result:str): + pass + + @abc.abstractmethod + def get_text_representation(self) -> str: + pass + +class HistoryNone(History): + def append(self, cmd: str, result: str): + pass + + def get_text_representation(self) -> str: + return "" + +class HistoryFull(History): + + history = [] + + def __init__(self): + self.history = [] + + def append(self, cmd: str, result: str): + self.history.append((cmd, result)) + + def get_text_representation(self) -> str: + return "\n".join(f"${cmd}\n {result}\n" for cmd, result in self.history) + +class HistoryCmdOnly(History): + + history = [] + + def __init__(self): + self.history = [] + + def append(self, cmd: str, result: str): + self.history.append(cmd) + + def get_text_representation(self) -> str: + return "\n".join(f"${cmd}\n" for cmd in self.history) \ No newline at end of file diff --git a/src/hackingBuddyGPT/utils/local_shell/__init__.py b/src/hackingBuddyGPT/utils/local_shell/__init__.py deleted file mode 100644 index 93e07699..00000000 --- a/src/hackingBuddyGPT/utils/local_shell/__init__.py +++ /dev/null @@ -1,3 +0,0 @@ -from .local_shell import LocalShellConnection - -__all__ = ["LocalShellConnection"] diff --git a/src/hackingBuddyGPT/utils/openai/openai_lib.py b/src/hackingBuddyGPT/utils/openai/openai_lib.py index 64e1b366..0048caaa 100644 --- a/src/hackingBuddyGPT/utils/openai/openai_lib.py +++ b/src/hackingBuddyGPT/utils/openai/openai_lib.py @@ -18,7 +18,7 @@ from rich.console import Console from hackingBuddyGPT.capabilities import Capability -from hackingBuddyGPT.capabilities.capability import capabilities_to_tools +from hackingBuddyGPT.capability import capabilities_to_tools from hackingBuddyGPT.utils import LLM, LLMResult, configurable from hackingBuddyGPT.utils.configurable import parameter diff --git a/src/hackingBuddyGPT/usecases/web_api_testing/documentation/parsing/__init__.py b/src/hackingBuddyGPT/utils/openapi/__init__.py similarity index 100% rename from src/hackingBuddyGPT/usecases/web_api_testing/documentation/parsing/__init__.py rename to src/hackingBuddyGPT/utils/openapi/__init__.py diff --git a/src/hackingBuddyGPT/usecases/web_api_testing/documentation/parsing/openapi_converter.py b/src/hackingBuddyGPT/utils/openapi/openapi_converter.py similarity index 100% rename from src/hackingBuddyGPT/usecases/web_api_testing/documentation/parsing/openapi_converter.py rename to src/hackingBuddyGPT/utils/openapi/openapi_converter.py diff --git a/src/hackingBuddyGPT/usecases/web_api_testing/documentation/parsing/openapi_parser.py b/src/hackingBuddyGPT/utils/openapi/openapi_parser.py similarity index 100% rename from src/hackingBuddyGPT/usecases/web_api_testing/documentation/parsing/openapi_parser.py rename to src/hackingBuddyGPT/utils/openapi/openapi_parser.py diff --git a/src/hackingBuddyGPT/usecases/web_api_testing/documentation/parsing/yaml_assistant.py b/src/hackingBuddyGPT/utils/openapi/yaml_assistant.py similarity index 98% rename from src/hackingBuddyGPT/usecases/web_api_testing/documentation/parsing/yaml_assistant.py rename to src/hackingBuddyGPT/utils/openapi/yaml_assistant.py index 667cf710..40f0805e 100644 --- a/src/hackingBuddyGPT/usecases/web_api_testing/documentation/parsing/yaml_assistant.py +++ b/src/hackingBuddyGPT/utils/openapi/yaml_assistant.py @@ -36,7 +36,6 @@ def run(self, recorded_note: str) -> None: The current implementation is commented out and serves as a placeholder for integrating with OpenAI's API. Uncomment and modify the code as needed. """ - """ assistant = self.client.beta.assistants.create( name="Yaml File Analysis Assistant", instructions="You are an OpenAPI specification analyst. Use your knowledge to check " @@ -86,5 +85,4 @@ def run(self, recorded_note: str) -> None: ) # The thread now has a vector store with that file in its tool resources. - print(thread.tool_resources.file_search) - """ + print(thread.tool_resources.file_search) \ No newline at end of file diff --git a/src/hackingBuddyGPT/utils/prompt_generation/information/pentesting_information.py b/src/hackingBuddyGPT/utils/prompt_generation/information/pentesting_information.py index 6cc57eb6..39ada3c5 100644 --- a/src/hackingBuddyGPT/utils/prompt_generation/information/pentesting_information.py +++ b/src/hackingBuddyGPT/utils/prompt_generation/information/pentesting_information.py @@ -9,7 +9,7 @@ import pandas -from hackingBuddyGPT.usecases.web_api_testing.documentation.parsing import OpenAPISpecificationParser +from hackingBuddyGPT.utils.openapi.openapi_parser import OpenAPISpecificationParser from hackingBuddyGPT.utils.prompt_generation.information.prompt_information import ( PromptPurpose, ) diff --git a/src/hackingBuddyGPT/utils/psexec/__init__.py b/src/hackingBuddyGPT/utils/psexec/__init__.py deleted file mode 100644 index 51a3b367..00000000 --- a/src/hackingBuddyGPT/utils/psexec/__init__.py +++ /dev/null @@ -1,3 +0,0 @@ -from .psexec import PSExecConnection - -__all__ = ["PSExecConnection"] diff --git a/src/hackingBuddyGPT/utils/ssh_connection/__init__.py b/src/hackingBuddyGPT/utils/ssh_connection/__init__.py deleted file mode 100644 index 25febf9a..00000000 --- a/src/hackingBuddyGPT/utils/ssh_connection/__init__.py +++ /dev/null @@ -1,3 +0,0 @@ -from .ssh_connection import SSHConnection - -__all__ = ["SSHConnection"]