From 8d6e392a81344881b847de11773b8b94b41de1fd Mon Sep 17 00:00:00 2001 From: Andreas Happe Date: Wed, 3 Sep 2025 10:27:36 +0200 Subject: [PATCH 1/7] move usecase to src/hackingBuddyGPT --- src/hackingBuddyGPT/capabilities/__init__.py | 2 +- src/hackingBuddyGPT/capabilities/psexec_run_command.py | 2 +- .../capabilities/psexec_test_credential.py | 2 +- src/hackingBuddyGPT/capabilities/ssh_run_command.py | 2 +- .../capabilities/ssh_test_credential.py | 2 +- src/hackingBuddyGPT/{capabilities => }/capability.py | 0 src/hackingBuddyGPT/strategies.py | 10 +++++++++- src/hackingBuddyGPT/usecases/agents.py | 2 +- .../usecases/web_api_testing/utils/llm_handler.py | 2 +- src/hackingBuddyGPT/utils/capability_manager.py | 2 +- src/hackingBuddyGPT/utils/openai/openai_lib.py | 2 +- 11 files changed, 18 insertions(+), 10 deletions(-) rename src/hackingBuddyGPT/{capabilities => }/capability.py (100%) diff --git a/src/hackingBuddyGPT/capabilities/__init__.py b/src/hackingBuddyGPT/capabilities/__init__.py index 09f154dc..6ea1aa4e 100644 --- a/src/hackingBuddyGPT/capabilities/__init__.py +++ b/src/hackingBuddyGPT/capabilities/__init__.py @@ -1,4 +1,4 @@ -from .capability import Capability +from ..capability import Capability from .psexec_run_command import PSExecRunCommand from .psexec_test_credential import PSExecTestCredential from .ssh_run_command import SSHRunCommand diff --git a/src/hackingBuddyGPT/capabilities/psexec_run_command.py b/src/hackingBuddyGPT/capabilities/psexec_run_command.py index 7c30faad..52d205d7 100644 --- a/src/hackingBuddyGPT/capabilities/psexec_run_command.py +++ b/src/hackingBuddyGPT/capabilities/psexec_run_command.py @@ -3,7 +3,7 @@ from hackingBuddyGPT.utils import PSExecConnection -from .capability import Capability +from ..capability import Capability @dataclass diff --git a/src/hackingBuddyGPT/capabilities/psexec_test_credential.py b/src/hackingBuddyGPT/capabilities/psexec_test_credential.py index 9e4bbef1..a71d0b06 100644 --- a/src/hackingBuddyGPT/capabilities/psexec_test_credential.py +++ b/src/hackingBuddyGPT/capabilities/psexec_test_credential.py @@ -4,7 +4,7 @@ from hackingBuddyGPT.utils import PSExecConnection -from .capability import Capability +from ..capability import Capability @dataclass diff --git a/src/hackingBuddyGPT/capabilities/ssh_run_command.py b/src/hackingBuddyGPT/capabilities/ssh_run_command.py index 6c4d69d1..4d56c779 100644 --- a/src/hackingBuddyGPT/capabilities/ssh_run_command.py +++ b/src/hackingBuddyGPT/capabilities/ssh_run_command.py @@ -8,7 +8,7 @@ from hackingBuddyGPT.utils import SSHConnection from hackingBuddyGPT.utils.shell_root_detection import got_root -from .capability import Capability +from ..capability import Capability @dataclass diff --git a/src/hackingBuddyGPT/capabilities/ssh_test_credential.py b/src/hackingBuddyGPT/capabilities/ssh_test_credential.py index efa3b57c..78001a8c 100644 --- a/src/hackingBuddyGPT/capabilities/ssh_test_credential.py +++ b/src/hackingBuddyGPT/capabilities/ssh_test_credential.py @@ -5,7 +5,7 @@ from hackingBuddyGPT.utils import SSHConnection -from .capability import Capability +from ..capability import Capability @dataclass diff --git a/src/hackingBuddyGPT/capabilities/capability.py b/src/hackingBuddyGPT/capability.py similarity index 100% rename from src/hackingBuddyGPT/capabilities/capability.py rename to src/hackingBuddyGPT/capability.py diff --git a/src/hackingBuddyGPT/strategies.py b/src/hackingBuddyGPT/strategies.py index bbdcb796..06f03699 100644 --- a/src/hackingBuddyGPT/strategies.py +++ b/src/hackingBuddyGPT/strategies.py @@ -6,7 +6,7 @@ from mako.template import Template -from hackingBuddyGPT.capabilities.capability import capabilities_to_simple_text_handler +from hackingBuddyGPT.capability import capabilities_to_simple_text_handler from hackingBuddyGPT.usecases.base import UseCase from hackingBuddyGPT.utils import llm_util from hackingBuddyGPT.utils.cli_history import SlidingCliHistory @@ -15,6 +15,14 @@ from hackingBuddyGPT.utils.capability_manager import CapabilityManager from hackingBuddyGPT.utils.shell_root_detection import got_root + +class History(abc.ABC): + def append(self, cmd:str, result:str): + pass + + def get_text_representation(self, size:int) -> str: + pass + @dataclass class CommandStrategy(UseCase, abc.ABC): diff --git a/src/hackingBuddyGPT/usecases/agents.py b/src/hackingBuddyGPT/usecases/agents.py index 650c7db1..6c2996bb 100644 --- a/src/hackingBuddyGPT/usecases/agents.py +++ b/src/hackingBuddyGPT/usecases/agents.py @@ -5,7 +5,7 @@ from typing import Dict from hackingBuddyGPT.utils.logging import log_conversation, Logger, log_param -from hackingBuddyGPT.capabilities.capability import ( +from hackingBuddyGPT.capability import ( Capability, capabilities_to_simple_text_handler, ) diff --git a/src/hackingBuddyGPT/usecases/web_api_testing/utils/llm_handler.py b/src/hackingBuddyGPT/usecases/web_api_testing/utils/llm_handler.py index 33e2a531..337b28b4 100644 --- a/src/hackingBuddyGPT/usecases/web_api_testing/utils/llm_handler.py +++ b/src/hackingBuddyGPT/usecases/web_api_testing/utils/llm_handler.py @@ -4,7 +4,7 @@ import openai from instructor.exceptions import IncompleteOutputException, InstructorRetryException -from hackingBuddyGPT.capabilities.capability import capabilities_to_action_model +from hackingBuddyGPT.capability import capabilities_to_action_model class LLMHandler: diff --git a/src/hackingBuddyGPT/utils/capability_manager.py b/src/hackingBuddyGPT/utils/capability_manager.py index da86ccf9..b3f789aa 100644 --- a/src/hackingBuddyGPT/utils/capability_manager.py +++ b/src/hackingBuddyGPT/utils/capability_manager.py @@ -2,7 +2,7 @@ from typing import Dict from hackingBuddyGPT.utils.logging import Logger -from hackingBuddyGPT.capabilities.capability import ( +from hackingBuddyGPT.capability import ( Capability, capabilities_to_simple_text_handler, ) diff --git a/src/hackingBuddyGPT/utils/openai/openai_lib.py b/src/hackingBuddyGPT/utils/openai/openai_lib.py index 64e1b366..0048caaa 100644 --- a/src/hackingBuddyGPT/utils/openai/openai_lib.py +++ b/src/hackingBuddyGPT/utils/openai/openai_lib.py @@ -18,7 +18,7 @@ from rich.console import Console from hackingBuddyGPT.capabilities import Capability -from hackingBuddyGPT.capabilities.capability import capabilities_to_tools +from hackingBuddyGPT.capability import capabilities_to_tools from hackingBuddyGPT.utils import LLM, LLMResult, configurable from hackingBuddyGPT.utils.configurable import parameter From 6b8553e3a83dfbac4a50fbbebc9588ce2adf6280 Mon Sep 17 00:00:00 2001 From: Andreas Happe Date: Wed, 3 Sep 2025 10:50:29 +0200 Subject: [PATCH 2/7] move all connector infrastructure into a single directory --- src/hackingBuddyGPT/capabilities/local_shell.py | 2 +- src/hackingBuddyGPT/capabilities/psexec_run_command.py | 2 +- src/hackingBuddyGPT/capabilities/psexec_test_credential.py | 2 +- src/hackingBuddyGPT/capabilities/ssh_run_command.py | 2 +- src/hackingBuddyGPT/capabilities/ssh_test_credential.py | 2 +- src/hackingBuddyGPT/usecases/call_usecase_from_usecase.py | 2 +- src/hackingBuddyGPT/usecases/linux_privesc.py | 3 ++- src/hackingBuddyGPT/usecases/minimal_linux_privesc.py | 3 ++- src/hackingBuddyGPT/utils/__init__.py | 2 -- .../utils/{local_shell => connectors}/local_shell.py | 0 src/hackingBuddyGPT/utils/{psexec => connectors}/psexec.py | 0 .../utils/{ssh_connection => connectors}/ssh_connection.py | 0 src/hackingBuddyGPT/utils/local_shell/__init__.py | 3 --- src/hackingBuddyGPT/utils/psexec/__init__.py | 3 --- src/hackingBuddyGPT/utils/ssh_connection/__init__.py | 3 --- 15 files changed, 10 insertions(+), 19 deletions(-) rename src/hackingBuddyGPT/utils/{local_shell => connectors}/local_shell.py (100%) rename src/hackingBuddyGPT/utils/{psexec => connectors}/psexec.py (100%) rename src/hackingBuddyGPT/utils/{ssh_connection => connectors}/ssh_connection.py (100%) delete mode 100644 src/hackingBuddyGPT/utils/local_shell/__init__.py delete mode 100644 src/hackingBuddyGPT/utils/psexec/__init__.py delete mode 100644 src/hackingBuddyGPT/utils/ssh_connection/__init__.py diff --git a/src/hackingBuddyGPT/capabilities/local_shell.py b/src/hackingBuddyGPT/capabilities/local_shell.py index 4e90754a..73d75048 100644 --- a/src/hackingBuddyGPT/capabilities/local_shell.py +++ b/src/hackingBuddyGPT/capabilities/local_shell.py @@ -3,7 +3,7 @@ from typing import Tuple from hackingBuddyGPT.capabilities import Capability -from hackingBuddyGPT.utils.local_shell import LocalShellConnection +from hackingBuddyGPT.utils.connectors.local_shell import LocalShellConnection @dataclass diff --git a/src/hackingBuddyGPT/capabilities/psexec_run_command.py b/src/hackingBuddyGPT/capabilities/psexec_run_command.py index 52d205d7..8790b7ab 100644 --- a/src/hackingBuddyGPT/capabilities/psexec_run_command.py +++ b/src/hackingBuddyGPT/capabilities/psexec_run_command.py @@ -1,7 +1,7 @@ from dataclasses import dataclass from typing import Tuple -from hackingBuddyGPT.utils import PSExecConnection +from hackingBuddyGPT.utils.connectors.psexec import PSExecConnection from ..capability import Capability diff --git a/src/hackingBuddyGPT/capabilities/psexec_test_credential.py b/src/hackingBuddyGPT/capabilities/psexec_test_credential.py index a71d0b06..b7177558 100644 --- a/src/hackingBuddyGPT/capabilities/psexec_test_credential.py +++ b/src/hackingBuddyGPT/capabilities/psexec_test_credential.py @@ -2,7 +2,7 @@ from dataclasses import dataclass from typing import Tuple -from hackingBuddyGPT.utils import PSExecConnection +from hackingBuddyGPT.utils.connectors.psexec import PSExecConnection from ..capability import Capability diff --git a/src/hackingBuddyGPT/capabilities/ssh_run_command.py b/src/hackingBuddyGPT/capabilities/ssh_run_command.py index 4d56c779..89c8ab7f 100644 --- a/src/hackingBuddyGPT/capabilities/ssh_run_command.py +++ b/src/hackingBuddyGPT/capabilities/ssh_run_command.py @@ -5,7 +5,7 @@ from invoke import Responder -from hackingBuddyGPT.utils import SSHConnection +from hackingBuddyGPT.utils.connectors.ssh_connection import SSHConnection from hackingBuddyGPT.utils.shell_root_detection import got_root from ..capability import Capability diff --git a/src/hackingBuddyGPT/capabilities/ssh_test_credential.py b/src/hackingBuddyGPT/capabilities/ssh_test_credential.py index 78001a8c..53b621e1 100644 --- a/src/hackingBuddyGPT/capabilities/ssh_test_credential.py +++ b/src/hackingBuddyGPT/capabilities/ssh_test_credential.py @@ -3,7 +3,7 @@ from paramiko.ssh_exception import SSHException import paramiko -from hackingBuddyGPT.utils import SSHConnection +from hackingBuddyGPT.utils.connectors.ssh_connection import SSHConnection from ..capability import Capability diff --git a/src/hackingBuddyGPT/usecases/call_usecase_from_usecase.py b/src/hackingBuddyGPT/usecases/call_usecase_from_usecase.py index 790e066f..0066067b 100644 --- a/src/hackingBuddyGPT/usecases/call_usecase_from_usecase.py +++ b/src/hackingBuddyGPT/usecases/call_usecase_from_usecase.py @@ -2,7 +2,7 @@ from hackingBuddyGPT.capabilities import SSHRunCommand from hackingBuddyGPT.usecases.base import UseCase, use_case -from hackingBuddyGPT.utils import SSHConnection +from hackingBuddyGPT.utils.connectors.ssh_connection import SSHConnection from hackingBuddyGPT.utils.openai.openai_llm import OpenAIConnection from .linux_privesc import PrivEscLinux diff --git a/src/hackingBuddyGPT/usecases/linux_privesc.py b/src/hackingBuddyGPT/usecases/linux_privesc.py index 03157067..a8af0f53 100644 --- a/src/hackingBuddyGPT/usecases/linux_privesc.py +++ b/src/hackingBuddyGPT/usecases/linux_privesc.py @@ -7,9 +7,10 @@ from hackingBuddyGPT.capabilities import SSHRunCommand, SSHTestCredential from hackingBuddyGPT.strategies import CommandStrategy from hackingBuddyGPT.usecases.base import use_case -from hackingBuddyGPT.utils import SSHConnection, llm_util +from hackingBuddyGPT.utils import llm_util from hackingBuddyGPT.utils.logging import log_conversation from hackingBuddyGPT.utils.rag import RagBackground +from hackingBuddyGPT.utils.connectors.ssh_connection import SSHConnection template_analyze = Template("""Your task is to analyze the result of an executed command to determina a way to escalate your privileges into a root shell. Describe your findings including all needed diff --git a/src/hackingBuddyGPT/usecases/minimal_linux_privesc.py b/src/hackingBuddyGPT/usecases/minimal_linux_privesc.py index b7a4c971..cf2820c4 100644 --- a/src/hackingBuddyGPT/usecases/minimal_linux_privesc.py +++ b/src/hackingBuddyGPT/usecases/minimal_linux_privesc.py @@ -2,7 +2,8 @@ from hackingBuddyGPT.capabilities import SSHRunCommand, SSHTestCredential from hackingBuddyGPT.usecases.base import use_case from hackingBuddyGPT.strategies import CommandStrategy -from hackingBuddyGPT.utils import SSHConnection, llm_util +from hackingBuddyGPT.utils import llm_util +from hackingBuddyGPT.utils.connectors.ssh_connection import SSHConnection from mako.template import Template diff --git a/src/hackingBuddyGPT/utils/__init__.py b/src/hackingBuddyGPT/utils/__init__.py index 4ac36972..01bb33df 100644 --- a/src/hackingBuddyGPT/utils/__init__.py +++ b/src/hackingBuddyGPT/utils/__init__.py @@ -3,6 +3,4 @@ from .db_storage import * from .llm_util import * from .openai import * -from .psexec import * -from .ssh_connection import * from .ui import * diff --git a/src/hackingBuddyGPT/utils/local_shell/local_shell.py b/src/hackingBuddyGPT/utils/connectors/local_shell.py similarity index 100% rename from src/hackingBuddyGPT/utils/local_shell/local_shell.py rename to src/hackingBuddyGPT/utils/connectors/local_shell.py diff --git a/src/hackingBuddyGPT/utils/psexec/psexec.py b/src/hackingBuddyGPT/utils/connectors/psexec.py similarity index 100% rename from src/hackingBuddyGPT/utils/psexec/psexec.py rename to src/hackingBuddyGPT/utils/connectors/psexec.py diff --git a/src/hackingBuddyGPT/utils/ssh_connection/ssh_connection.py b/src/hackingBuddyGPT/utils/connectors/ssh_connection.py similarity index 100% rename from src/hackingBuddyGPT/utils/ssh_connection/ssh_connection.py rename to src/hackingBuddyGPT/utils/connectors/ssh_connection.py diff --git a/src/hackingBuddyGPT/utils/local_shell/__init__.py b/src/hackingBuddyGPT/utils/local_shell/__init__.py deleted file mode 100644 index 93e07699..00000000 --- a/src/hackingBuddyGPT/utils/local_shell/__init__.py +++ /dev/null @@ -1,3 +0,0 @@ -from .local_shell import LocalShellConnection - -__all__ = ["LocalShellConnection"] diff --git a/src/hackingBuddyGPT/utils/psexec/__init__.py b/src/hackingBuddyGPT/utils/psexec/__init__.py deleted file mode 100644 index 51a3b367..00000000 --- a/src/hackingBuddyGPT/utils/psexec/__init__.py +++ /dev/null @@ -1,3 +0,0 @@ -from .psexec import PSExecConnection - -__all__ = ["PSExecConnection"] diff --git a/src/hackingBuddyGPT/utils/ssh_connection/__init__.py b/src/hackingBuddyGPT/utils/ssh_connection/__init__.py deleted file mode 100644 index 25febf9a..00000000 --- a/src/hackingBuddyGPT/utils/ssh_connection/__init__.py +++ /dev/null @@ -1,3 +0,0 @@ -from .ssh_connection import SSHConnection - -__all__ = ["SSHConnection"] From 006269998abf3424ba58a70e955b867e00ae03d1 Mon Sep 17 00:00:00 2001 From: Andreas Happe Date: Wed, 3 Sep 2025 11:11:39 +0200 Subject: [PATCH 3/7] make history-classes a bit more generic --- src/hackingBuddyGPT/strategies.py | 46 +++++++++--------------- src/hackingBuddyGPT/utils/cli_history.py | 31 ---------------- src/hackingBuddyGPT/utils/histories.py | 45 +++++++++++++++++++++++ 3 files changed, 61 insertions(+), 61 deletions(-) delete mode 100644 src/hackingBuddyGPT/utils/cli_history.py create mode 100644 src/hackingBuddyGPT/utils/histories.py diff --git a/src/hackingBuddyGPT/strategies.py b/src/hackingBuddyGPT/strategies.py index 06f03699..134905bf 100644 --- a/src/hackingBuddyGPT/strategies.py +++ b/src/hackingBuddyGPT/strategies.py @@ -9,29 +9,18 @@ from hackingBuddyGPT.capability import capabilities_to_simple_text_handler from hackingBuddyGPT.usecases.base import UseCase from hackingBuddyGPT.utils import llm_util -from hackingBuddyGPT.utils.cli_history import SlidingCliHistory +from hackingBuddyGPT.utils.histories import HistoryCmdOnly, HistoryFull, HistoryNone from hackingBuddyGPT.utils.openai.openai_llm import OpenAIConnection from hackingBuddyGPT.utils.logging import log_conversation, Logger, log_param, log_section from hackingBuddyGPT.utils.capability_manager import CapabilityManager from hackingBuddyGPT.utils.shell_root_detection import got_root -class History(abc.ABC): - def append(self, cmd:str, result:str): - pass - - def get_text_representation(self, size:int) -> str: - pass - @dataclass class CommandStrategy(UseCase, abc.ABC): _capabilities: CapabilityManager = None - _sliding_history: SlidingCliHistory = None - - _max_history_size: int = 0 - _template: Template = None _template_params = {} @@ -63,16 +52,22 @@ def init(self): self._capabilities = CapabilityManager(self.log) - self._sliding_history = SlidingCliHistory(self.llm) + if self.disable_history: + self._history = HistoryNone() + else: + if self.enable_compressed_history: + self._history = HistoryCmdOnly() + else: + self._history = HistoryFull() @log_section("Asking LLM for a new command...") def get_next_command(self) -> tuple[str, int]: - history = "" - if not self.disable_history: - if self.enable_compressed_history: - history = self._sliding_history.get_commands_and_last_output(self._max_history_size - self.get_state_size()) - else: - history = self._sliding_history.get_history(self._max_history_size - self.get_state_size()) + history = self._history.get_text_representation() + + # calculate max history size + # TODO: need to incorporate state, etc. + max_history_size = self.llm.context_size - llm_util.SAFETY_MARGIN - self.llm.count_tokens(self._template.source) + history = llm_util.trim_result_front(self.llm, max_history_size, history) self._template_params.update({"history": history}) cmd = self.llm.get_response(self._template, **self._template_params) @@ -113,6 +108,8 @@ def perform_round(self, turn: int) -> bool: cmds = self.postprocess_commands(cmd) for cmd in cmds: result, task_successful = self.run_command(cmd, message_id) + # store the results in our local history + self._history.append(cmd, result) # maybe move the 'got root' detection here? # TODO: also can I use llm-as-judge for that? or do I have to do this @@ -122,13 +119,6 @@ def perform_round(self, turn: int) -> bool: self.after_round(cmd, result, task_successful) - # store the results in our local history - if not self.disable_history: - if self.enable_compressed_history: - self._sliding_history.add_command_only(cmds, result) - else: - self._sliding_history.add_command(cmds, result) - # signal if we were successful in our task return task_successful @@ -140,10 +130,6 @@ def run(self, configuration): self._template_params["capabilities"] = self._capabilities.get_capability_block() - - # calculate sizes - self._max_history_size = self.llm.context_size - llm_util.SAFETY_MARGIN - self.llm.count_tokens(self._template.source) - self.before_run() got_root = False diff --git a/src/hackingBuddyGPT/utils/cli_history.py b/src/hackingBuddyGPT/utils/cli_history.py deleted file mode 100644 index 2e8f8e2c..00000000 --- a/src/hackingBuddyGPT/utils/cli_history.py +++ /dev/null @@ -1,31 +0,0 @@ -from .llm_util import LLM, trim_result_front - - -class SlidingCliHistory: - model: LLM = None - maximum_target_size: int = 0 - sliding_history: str = "" - last_output: str = '' - - def __init__(self, used_model: LLM): - self.model = used_model - self.maximum_target_size = self.model.context_size - - def add_command(self, cmd: str, output: str): - self.sliding_history += f"$ {cmd}\n{output}" - self.sliding_history = trim_result_front(self.model, self.maximum_target_size, self.sliding_history) - - def get_history(self, target_size: int) -> str: - return trim_result_front(self.model, min(self.maximum_target_size, target_size), self.sliding_history) - - def add_command_only(self, cmd: str, output: str): - self.sliding_history += f"$ {cmd}\n" - self.last_output = output - last_output_size = self.model.count_tokens(self.last_output) - if self.maximum_target_size - last_output_size < 0: - last_output_size = 0 - self.last_output = '' - self.sliding_history = trim_result_front(self.model, self.maximum_target_size - last_output_size, self.sliding_history) - - def get_commands_and_last_output(self, target_size: int) -> str: - return trim_result_front(self.model, min(self.maximum_target_size, target_size), self.sliding_history + self.last_output) \ No newline at end of file diff --git a/src/hackingBuddyGPT/utils/histories.py b/src/hackingBuddyGPT/utils/histories.py new file mode 100644 index 00000000..90c766ef --- /dev/null +++ b/src/hackingBuddyGPT/utils/histories.py @@ -0,0 +1,45 @@ +import abc + +class History(abc.ABC): + @abc.abstractmethod + def append(self, cmd:str, result:str): + pass + + @abc.abstractmethod + def get_text_representation(self) -> str: + pass + +class HistoryNone(History): + def append(self, cmd: str, result: str): + pass + + def get_text_representation(self) -> str: + return "" + +class HistoryFull(History): + + history = [] + + def __init__(self): + self.history = [] + + def append(self, cmd: str, result: str): + self.history.append((cmd, result)) + + # TODO: implement size limiter + def get_text_representation(self) -> str: + return "\n".join(f"${cmd}\n {result}\n" for cmd, result in self.history) + +class HistoryCmdOnly(History): + + history = [] + + def __init__(self): + self.history = [] + + def append(self, cmd: str, result: str): + self.history.append(cmd) + + # TODO: implement size limiter + def get_text_representation(self) -> str: + return "\n".join(f"${cmd}\n" for cmd in self.history) \ No newline at end of file From 3ee9c9e1aeb43a8b2247e4f8b101a30206edb133 Mon Sep 17 00:00:00 2001 From: Andreas Happe Date: Wed, 3 Sep 2025 11:44:08 +0200 Subject: [PATCH 4/7] unify _state mgmt and add prompt max-size calculation --- src/hackingBuddyGPT/strategies.py | 12 +++--- .../usecases/call_usecase_from_usecase.py | 2 +- src/hackingBuddyGPT/usecases/linux_privesc.py | 37 +++++++++---------- src/hackingBuddyGPT/utils/histories.py | 2 - 4 files changed, 23 insertions(+), 30 deletions(-) diff --git a/src/hackingBuddyGPT/strategies.py b/src/hackingBuddyGPT/strategies.py index 134905bf..72ae2635 100644 --- a/src/hackingBuddyGPT/strategies.py +++ b/src/hackingBuddyGPT/strategies.py @@ -1,11 +1,9 @@ import abc -from dataclasses import dataclass import datetime -from typing import List, Optional import re +from dataclasses import dataclass from mako.template import Template - from hackingBuddyGPT.capability import capabilities_to_simple_text_handler from hackingBuddyGPT.usecases.base import UseCase from hackingBuddyGPT.utils import llm_util @@ -14,6 +12,7 @@ from hackingBuddyGPT.utils.logging import log_conversation, Logger, log_param, log_section from hackingBuddyGPT.utils.capability_manager import CapabilityManager from hackingBuddyGPT.utils.shell_root_detection import got_root +from typing import List, Optional @dataclass @@ -44,8 +43,8 @@ def after_run(self): def after_round(self, cmd, result, got_root): pass - def get_space_for_history(self): - pass + def get_token_overhead(self) -> int: + return 0 def init(self): super().init() @@ -65,8 +64,7 @@ def get_next_command(self) -> tuple[str, int]: history = self._history.get_text_representation() # calculate max history size - # TODO: need to incorporate state, etc. - max_history_size = self.llm.context_size - llm_util.SAFETY_MARGIN - self.llm.count_tokens(self._template.source) + max_history_size = self.llm.context_size - llm_util.SAFETY_MARGIN - self.llm.count_tokens(self._template.source) - self.get_token_overhead() history = llm_util.trim_result_front(self.llm, max_history_size, history) self._template_params.update({"history": history}) diff --git a/src/hackingBuddyGPT/usecases/call_usecase_from_usecase.py b/src/hackingBuddyGPT/usecases/call_usecase_from_usecase.py index 0066067b..b7f8a6d0 100644 --- a/src/hackingBuddyGPT/usecases/call_usecase_from_usecase.py +++ b/src/hackingBuddyGPT/usecases/call_usecase_from_usecase.py @@ -61,7 +61,7 @@ def run(self, configuration={}): return True def run_using_usecases(self, hint, turns_per_hint): - # TODO: init usecase + # init usecase linux_privesc = PrivEscLinux( conn=self.conn, enable_explanation=self.enable_explanation, diff --git a/src/hackingBuddyGPT/usecases/linux_privesc.py b/src/hackingBuddyGPT/usecases/linux_privesc.py index a8af0f53..5f63c74e 100644 --- a/src/hackingBuddyGPT/usecases/linux_privesc.py +++ b/src/hackingBuddyGPT/usecases/linux_privesc.py @@ -107,8 +107,6 @@ class PrivEscLinux(CommandStrategy): rag_path: str = '' - _state: str = "" - _enable_rag: bool = False def init(self): @@ -123,7 +121,7 @@ def init(self): "system": "Linux", "conn": self.conn, "update_state": self.enable_update_state, - "state": self._state, + "state": '', "target_user": "root", "guidance": '', 'analysis': '', @@ -162,18 +160,20 @@ def init(self): def get_name(self) -> str: return "Strategy-based Linux Priv-Escalation" + + def get_token_overhead(self): - def get_state_size(self) -> int: - if self.enable_update_state: - return self.llm.count_tokens(self._state) - else: - return 0 + overhead = self.llm.count_tokens(self._template_params["state"]) + overhead += self.llm.count_tokens(self._template_params["guidance"]) + overhead += self.llm.count_tokens(self._template_params['analysis']) + + return overhead def after_round(self, cmd:str, result:str, got_root:bool): if self.enable_update_state: - self.update_state(cmd, result) + old_state = self._template_params['state'] self._template_params.update({ - "state": self._state + "state": self.generate_new_state(old_state, cmd, result).result }) if self.enable_explanation: @@ -206,16 +206,14 @@ def postprocess_commands(self, cmd:str) -> List[str]: return [llm_util.cmd_output_fixer(cmd)] @log_conversation("Updating fact list..", start_section=True) - def update_state(self, cmd, result): + def generate_new_state(self, old_state:str, cmd:str, result:str) -> str: # ugly, but cut down result to fit context size # don't do this linearly as this can take too long - ctx = self.llm.context_size - state_size = self.get_state_size() - target_size = ctx - llm_util.SAFETY_MARGIN - state_size + target_size = self.llm.context_size - llm_util.SAFETY_MARGIN - self.llm.count_tokens(old_state) result = llm_util.trim_result_front(self.llm, target_size, result) - state = self.llm.get_response(template_update_state, cmd=cmd, resp=result, facts=self._state) - self._state = state.result + state = self.llm.get_response(template_update_state, cmd=cmd, resp=result, facts=old_state) self.log.call_response(state) + return state @log_conversation("Asking LLM for a search query...", start_section=True) def get_rag_query(self, cmd, result): @@ -228,7 +226,6 @@ def get_rag_query(self, cmd, result): self.log.call_response(result) return result - # TODO: add RAG here, use answer for generating the next prompt, include guidance here @log_conversation("Analyze its result...", start_section=True) def analyze_result(self, cmd, result): @@ -239,11 +236,11 @@ def analyze_result(self, cmd, result): relevant_document_data = self._rag_data.get_relevant_documents(queries.result) print("RELEVANT DOCUMENT DATA: " + relevant_document_data) - state_size = self.get_state_size() - target_size = self.llm.context_size - llm_util.SAFETY_MARGIN - state_size + known_facts = self._template_params['state'] + target_size = self.llm.context_size - llm_util.SAFETY_MARGIN - self.llm.count_tokens(known_facts) # ugly, but cut down result to fit context size result = llm_util.trim_result_front(self.llm, target_size, result) - answer = self.llm.get_response(template_analyze, cmd=cmd, resp=result, facts=self._state, rag=relevant_document_data) + answer = self.llm.get_response(template_analyze, cmd=cmd, resp=result, facts=known_facts, rag=relevant_document_data) self.log.call_response(answer) self._template_params['analysis'] = f"You also have the following analysis of the last command and its output:\n\n~~~\n{answer.result}\n~~~" \ No newline at end of file diff --git a/src/hackingBuddyGPT/utils/histories.py b/src/hackingBuddyGPT/utils/histories.py index 90c766ef..b711c693 100644 --- a/src/hackingBuddyGPT/utils/histories.py +++ b/src/hackingBuddyGPT/utils/histories.py @@ -26,7 +26,6 @@ def __init__(self): def append(self, cmd: str, result: str): self.history.append((cmd, result)) - # TODO: implement size limiter def get_text_representation(self) -> str: return "\n".join(f"${cmd}\n {result}\n" for cmd, result in self.history) @@ -40,6 +39,5 @@ def __init__(self): def append(self, cmd: str, result: str): self.history.append(cmd) - # TODO: implement size limiter def get_text_representation(self) -> str: return "\n".join(f"${cmd}\n" for cmd in self.history) \ No newline at end of file From 9e835abbd9d026131009933e4f1af265e6b8a95c Mon Sep 17 00:00:00 2001 From: Andreas Happe Date: Wed, 3 Sep 2025 14:57:45 +0200 Subject: [PATCH 5/7] move openapi parsing into utils --- .../simple_openapi_documentation.py | 50 ++++++------------- .../web_api_testing/simple_web_api_testing.py | 39 +++++++-------- .../utils/endpoint_categorizer.py | 0 .../parsing => utils/openapi}/__init__.py | 0 .../openapi}/openapi_converter.py | 0 .../openapi}/openapi_parser.py | 0 .../openapi}/yaml_assistant.py | 4 +- .../information/pentesting_information.py | 2 +- 8 files changed, 34 insertions(+), 61 deletions(-) delete mode 100644 src/hackingBuddyGPT/usecases/web_api_testing/utils/endpoint_categorizer.py rename src/hackingBuddyGPT/{usecases/web_api_testing/documentation/parsing => utils/openapi}/__init__.py (100%) rename src/hackingBuddyGPT/{usecases/web_api_testing/documentation/parsing => utils/openapi}/openapi_converter.py (100%) rename src/hackingBuddyGPT/{usecases/web_api_testing/documentation/parsing => utils/openapi}/openapi_parser.py (100%) rename src/hackingBuddyGPT/{usecases/web_api_testing/documentation/parsing => utils/openapi}/yaml_assistant.py (98%) diff --git a/src/hackingBuddyGPT/usecases/web_api_testing/simple_openapi_documentation.py b/src/hackingBuddyGPT/usecases/web_api_testing/simple_openapi_documentation.py index b6114132..5e8cc21c 100644 --- a/src/hackingBuddyGPT/usecases/web_api_testing/simple_openapi_documentation.py +++ b/src/hackingBuddyGPT/usecases/web_api_testing/simple_openapi_documentation.py @@ -1,16 +1,14 @@ import os from dataclasses import field -from typing import Dict from rich.panel import Panel -from hackingBuddyGPT.capabilities import Capability from hackingBuddyGPT.capabilities.http_request import HTTPRequest from hackingBuddyGPT.capabilities.record_note import RecordNote -from hackingBuddyGPT.usecases.agents import Agent -from hackingBuddyGPT.usecases.base import AutonomousAgentUseCase, use_case +from hackingBuddyGPT.usecases.base import AutonomousUseCase, use_case from hackingBuddyGPT.usecases.web_api_testing.documentation.openapi_specification_handler import \ OpenAPISpecificationHandler +from hackingBuddyGPT.utils.capability_manager import CapabilityManager from hackingBuddyGPT.utils.prompt_generation import PromptGenerationHelper from hackingBuddyGPT.utils.prompt_generation.information import PromptContext from hackingBuddyGPT.utils.prompt_generation.prompt_engineer import PromptEngineer @@ -23,7 +21,8 @@ from hackingBuddyGPT.utils.openai.openai_lib import OpenAILib -class SimpleWebAPIDocumentation(Agent): +@use_case("Minimal implementation of a web API testing use case") +class SimpleWebAPIDocumentation(AutonomousUseCase): """ SimpleWebAPIDocumentation is an agent class for automating REST API documentation. @@ -41,10 +40,10 @@ class SimpleWebAPIDocumentation(Agent): found_all_http_methods (bool): Flag indicating whether all HTTP methods have been found. all_steps_done (bool): Flag to indicate whether the full documentation process is complete. """ - llm: OpenAILib + llm: OpenAILib = None _prompt_history: Prompt = field(default_factory=list) _context: Context = field(default_factory=lambda: {"notes": list()}) - _capabilities: Dict[str, Capability] = field(default_factory=dict) + _capabilities: CapabilityManager = None _all_http_methods_found: bool = False config_path: str = parameter( desc="Configuration file path", @@ -75,6 +74,8 @@ class SimpleWebAPIDocumentation(Agent): default="GET,POST,PUT,PATCH,DELETE", ) + def get_name(self) -> str: + return self.__class__.__name__ def init(self): """Initialize the agent with configurations, capabilities, and handlers.""" @@ -90,7 +91,11 @@ def init(self): self.categorized_endpoints = self.categorize_endpoints(self._correct_endpoints, query_params) - self._setup_capabilities() + # setup capabilities + self._capabilities.init() + self._capabilities.add_capability(HTTPRequest(self.host)) + self._capabilities.add_capability(RecordNote(self._context["notes"])) + self._prompt_context = PromptContext.DOCUMENTATION name, initial_prompt = self._setup_initial_prompt(description=description) self._initialize_handlers(config=config, description=description, token=token, name=name, @@ -150,7 +155,7 @@ def _initialize_handlers(self, config, description, token, name, initial_prompt) """ self.all_capabilities = { "http_request": HTTPRequest(self.host)} - self._llm_handler = LLMHandler(self.llm, self._capabilities, all_possible_capabilities=self.all_capabilities) + self._llm_handler = LLMHandler(self.llm, self._capabilities._capabilities, all_possible_capabilities=self.all_capabilities) self._response_handler = ResponseHandler(llm_handler=self._llm_handler, prompt_context=self._prompt_context, prompt_helper=self.prompt_helper, config=config) @@ -226,25 +231,6 @@ def categorize_endpoints(self, endpoints, query: dict): "multi-level_resource": multi_level_resource, } - - - def _setup_capabilities(self): - """ - Initializes the LLM agent's capabilities for interacting with the API. - - This sets up tool wrappers that the language model can call, such as: - - `http_request`: For performing HTTP calls against the target API. - - `record_note`: For storing observations, notes, or documentation artifacts. - - Side Effects: - - Populates `self._capabilities` with callable tools used during exploration and documentation. - """ - """Initializes agent's capabilities for API documentation.""" - self._capabilities = { - "http_request": HTTPRequest(self.host), - "record_note": RecordNote(self._context["notes"]) - } - def all_http_methods_found(self, turn: int) -> bool: """ Checks whether all expected HTTP methods (GET, POST, PUT, DELETE) have been discovered @@ -441,10 +427,4 @@ def run_documentation(self, turn: int, move_type: str) -> None: self._evaluator.finalize_documentation_metrics( file_path=self._documentation_handler.file.split(".yaml")[0] + ".txt") - self.all_http_methods_found(turn) - - -@use_case("Minimal implementation of a web API testing use case") -class SimpleWebAPIDocumentationUseCase(AutonomousAgentUseCase[SimpleWebAPIDocumentation]): - """Use case for the SimpleWebAPIDocumentation agent.""" - pass + self.all_http_methods_found(turn) \ No newline at end of file diff --git a/src/hackingBuddyGPT/usecases/web_api_testing/simple_web_api_testing.py b/src/hackingBuddyGPT/usecases/web_api_testing/simple_web_api_testing.py index a9f64220..59f6120b 100644 --- a/src/hackingBuddyGPT/usecases/web_api_testing/simple_web_api_testing.py +++ b/src/hackingBuddyGPT/usecases/web_api_testing/simple_web_api_testing.py @@ -8,17 +8,16 @@ import pydantic_core from rich.panel import Panel -from hackingBuddyGPT.capabilities import Capability from hackingBuddyGPT.capabilities.http_request import HTTPRequest from hackingBuddyGPT.capabilities.parsed_information import ParsedInformation from hackingBuddyGPT.capabilities.python_test_case import PythonTestCase from hackingBuddyGPT.capabilities.record_note import RecordNote -from hackingBuddyGPT.usecases.agents import Agent -from hackingBuddyGPT.usecases.base import AutonomousAgentUseCase, use_case +from hackingBuddyGPT.usecases.base import AutonomousUseCase, use_case +from hackingBuddyGPT.utils.capability_manager import CapabilityManager from hackingBuddyGPT.utils.prompt_generation import PromptGenerationHelper from hackingBuddyGPT.utils.prompt_generation.information import PenTestingInformation from hackingBuddyGPT.utils.prompt_generation.information import PromptPurpose -from hackingBuddyGPT.usecases.web_api_testing.documentation.parsing import OpenAPISpecificationParser +from hackingBuddyGPT.utils.openapi.openapi_parser import OpenAPISpecificationParser from hackingBuddyGPT.usecases.web_api_testing.documentation.report_handler import ReportHandler from hackingBuddyGPT.utils.prompt_generation.information import PromptContext from hackingBuddyGPT.utils.prompt_generation.prompt_engineer import PromptEngineer @@ -36,8 +35,8 @@ # OpenAPI specification file path - -class SimpleWebAPITesting(Agent): +@use_case("Minimal implementation of a web API testing use case") +class SimpleWebAPITesting(AutonomousUseCase): """ SimpleWebAPITesting is an agent class for automating web API testing. @@ -53,7 +52,7 @@ class SimpleWebAPITesting(Agent): _all_test_cases_run (bool): Flag indicating if all HTTP methods have been found. """ - llm: OpenAILib + llm: OpenAILib = None host: str = parameter(desc="The host to test", default="https://jsonplaceholder.typicode.com") config_path: str = parameter( desc="Configuration file path", @@ -71,7 +70,7 @@ class SimpleWebAPITesting(Agent): ) _prompt_history: Prompt = field(default_factory=list) _context: Context = field(default_factory=lambda: {"notes": list(), "test_cases": list(), "parsed": list()}) - _capabilities: Dict[str, Capability] = field(default_factory=dict) + _capabilities: CapabilityManager = None _all_test_cases_run: bool = False def init(self): @@ -86,6 +85,9 @@ def init(self): self._setup_initial_prompt() self.last_prompt = "" + def get_name(self) -> str: + return self.__class__.__name__ + def _load_openapi_specification(self): """ Loads the OpenAPI specification from the configured file path. @@ -108,6 +110,11 @@ def _setup_environment(self): - Setting the prompt context to `PromptContext.PENTESTING`. """ self._context["host"] = self.host + + # setup capabilities + self._capabilities = CapabilityManager(self.log) + self._capabilities.add_capability(HTTPRequest(self.host)) + self._setup_capabilities() self.categorized_endpoints = self._openapi_specification_parser.categorize_endpoints(self.correct_endpoints, self.query_params) @@ -128,7 +135,7 @@ def _setup_handlers(self): If username and password are not found in the config, defaults are used. """ - self._llm_handler = LLMHandler(self.llm, self._capabilities, all_possible_capabilities=self.all_capabilities) + self._llm_handler = LLMHandler(self.llm, self._capabilities._capabilities, all_possible_capabilities=self.all_capabilities) self.prompt_helper = PromptGenerationHelper(self.host, self.description) if "username" in self.config.keys() and "password" in self.config.keys(): username = self.config.get("username") @@ -194,8 +201,6 @@ def _setup_capabilities(self) -> None: test_cases = self._context["test_cases"] self.python_test_case_capability = {"python_test_case": PythonTestCase(test_cases)} self.parse_capacity = {"parse": ParsedInformation(test_cases)} - self._capabilities = { - "http_request": HTTPRequest(self.host)} self.all_capabilities = {"python_test_case": PythonTestCase(test_cases), "parse": ParsedInformation(test_cases), "http_request": HTTPRequest(self.host), "record_note": RecordNote(notes)} @@ -539,14 +544,4 @@ def execute_response(self, response, completion): self._prompt_history.append(tool_message(self._response_handler.extract_key_elements_of_response(result), tool_call_id)) self.adjust_user(result) - return result - - -@use_case("Minimal implementation of a web API testing use case") -class SimpleWebAPITestingUseCase(AutonomousAgentUseCase[SimpleWebAPITesting]): - """ - A use case for the SimpleWebAPITesting agent, encapsulating the setup and execution - of the web API testing scenario. - """ - - pass + return result \ No newline at end of file diff --git a/src/hackingBuddyGPT/usecases/web_api_testing/utils/endpoint_categorizer.py b/src/hackingBuddyGPT/usecases/web_api_testing/utils/endpoint_categorizer.py deleted file mode 100644 index e69de29b..00000000 diff --git a/src/hackingBuddyGPT/usecases/web_api_testing/documentation/parsing/__init__.py b/src/hackingBuddyGPT/utils/openapi/__init__.py similarity index 100% rename from src/hackingBuddyGPT/usecases/web_api_testing/documentation/parsing/__init__.py rename to src/hackingBuddyGPT/utils/openapi/__init__.py diff --git a/src/hackingBuddyGPT/usecases/web_api_testing/documentation/parsing/openapi_converter.py b/src/hackingBuddyGPT/utils/openapi/openapi_converter.py similarity index 100% rename from src/hackingBuddyGPT/usecases/web_api_testing/documentation/parsing/openapi_converter.py rename to src/hackingBuddyGPT/utils/openapi/openapi_converter.py diff --git a/src/hackingBuddyGPT/usecases/web_api_testing/documentation/parsing/openapi_parser.py b/src/hackingBuddyGPT/utils/openapi/openapi_parser.py similarity index 100% rename from src/hackingBuddyGPT/usecases/web_api_testing/documentation/parsing/openapi_parser.py rename to src/hackingBuddyGPT/utils/openapi/openapi_parser.py diff --git a/src/hackingBuddyGPT/usecases/web_api_testing/documentation/parsing/yaml_assistant.py b/src/hackingBuddyGPT/utils/openapi/yaml_assistant.py similarity index 98% rename from src/hackingBuddyGPT/usecases/web_api_testing/documentation/parsing/yaml_assistant.py rename to src/hackingBuddyGPT/utils/openapi/yaml_assistant.py index 667cf710..40f0805e 100644 --- a/src/hackingBuddyGPT/usecases/web_api_testing/documentation/parsing/yaml_assistant.py +++ b/src/hackingBuddyGPT/utils/openapi/yaml_assistant.py @@ -36,7 +36,6 @@ def run(self, recorded_note: str) -> None: The current implementation is commented out and serves as a placeholder for integrating with OpenAI's API. Uncomment and modify the code as needed. """ - """ assistant = self.client.beta.assistants.create( name="Yaml File Analysis Assistant", instructions="You are an OpenAPI specification analyst. Use your knowledge to check " @@ -86,5 +85,4 @@ def run(self, recorded_note: str) -> None: ) # The thread now has a vector store with that file in its tool resources. - print(thread.tool_resources.file_search) - """ + print(thread.tool_resources.file_search) \ No newline at end of file diff --git a/src/hackingBuddyGPT/utils/prompt_generation/information/pentesting_information.py b/src/hackingBuddyGPT/utils/prompt_generation/information/pentesting_information.py index 6cc57eb6..39ada3c5 100644 --- a/src/hackingBuddyGPT/utils/prompt_generation/information/pentesting_information.py +++ b/src/hackingBuddyGPT/utils/prompt_generation/information/pentesting_information.py @@ -9,7 +9,7 @@ import pandas -from hackingBuddyGPT.usecases.web_api_testing.documentation.parsing import OpenAPISpecificationParser +from hackingBuddyGPT.utils.openapi.openapi_parser import OpenAPISpecificationParser from hackingBuddyGPT.utils.prompt_generation.information.prompt_information import ( PromptPurpose, ) From 6bf8ca278bdc6cd11be51edb517a3cbef4af50a1 Mon Sep 17 00:00:00 2001 From: Andreas Happe Date: Wed, 3 Sep 2025 14:58:01 +0200 Subject: [PATCH 6/7] add note about howto simplify history configuration --- src/hackingBuddyGPT/utils/histories.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/src/hackingBuddyGPT/utils/histories.py b/src/hackingBuddyGPT/utils/histories.py index b711c693..90455542 100644 --- a/src/hackingBuddyGPT/utils/histories.py +++ b/src/hackingBuddyGPT/utils/histories.py @@ -1,5 +1,12 @@ import abc +# TODO: make the configuration for different histories easier +# Logger = Union[GlobalRemoteLogger, GlobalLocalLogger] +# log_param = parameter(desc="choice of logging backend", default="local_logger") +#@dataclass +#class Agent(ABC): +# log: Logger = log_param + class History(abc.ABC): @abc.abstractmethod def append(self, cmd:str, result:str): From 0c170f18cfe4987f455f49d4afa33f5290600aae Mon Sep 17 00:00:00 2001 From: Andreas Happe Date: Wed, 3 Sep 2025 15:51:15 +0200 Subject: [PATCH 7/7] generalize the got_root check --- .../capabilities/psexec_run_command.py | 5 +- .../capabilities/psexec_test_credential.py | 7 +- .../capabilities/ssh_run_command.py | 19 +-- .../capabilities/ssh_test_credential.py | 19 ++- src/hackingBuddyGPT/strategies.py | 135 ++++++++---------- src/hackingBuddyGPT/usecases/linux_privesc.py | 15 +- .../usecases/minimal_linux_privesc.py | 12 ++ 7 files changed, 101 insertions(+), 111 deletions(-) diff --git a/src/hackingBuddyGPT/capabilities/psexec_run_command.py b/src/hackingBuddyGPT/capabilities/psexec_run_command.py index 8790b7ab..30ecc6f9 100644 --- a/src/hackingBuddyGPT/capabilities/psexec_run_command.py +++ b/src/hackingBuddyGPT/capabilities/psexec_run_command.py @@ -1,5 +1,4 @@ from dataclasses import dataclass -from typing import Tuple from hackingBuddyGPT.utils.connectors.psexec import PSExecConnection @@ -14,5 +13,5 @@ class PSExecRunCommand(Capability): def describe(self) -> str: return "give a command to be executed on the shell and I will respond with the terminal output when running this command on the windows machine. The given command must not require user interaction. Only state the to be executed command. The command should be used for enumeration or privilege escalation." - def __call__(self, command: str) -> Tuple[str, bool]: - return self.conn.run(command)[0], False + def __call__(self, command: str) -> str: + return self.conn.run(command)[0] diff --git a/src/hackingBuddyGPT/capabilities/psexec_test_credential.py b/src/hackingBuddyGPT/capabilities/psexec_test_credential.py index b7177558..30d74549 100644 --- a/src/hackingBuddyGPT/capabilities/psexec_test_credential.py +++ b/src/hackingBuddyGPT/capabilities/psexec_test_credential.py @@ -1,6 +1,5 @@ import warnings from dataclasses import dataclass -from typing import Tuple from hackingBuddyGPT.utils.connectors.psexec import PSExecConnection @@ -17,7 +16,7 @@ def describe(self) -> str: def get_name(self) -> str: return "test_credential" - def __call__(self, username: str, password: str) -> Tuple[str, bool]: + def __call__(self, username: str, password: str) -> str: try: test_conn = self.conn.new_with(username=username, password=password) test_conn.init() @@ -25,6 +24,6 @@ def __call__(self, username: str, password: str) -> Tuple[str, bool]: message="full credential testing is not implemented yet for psexec, we have logged in, but do not know who we are, returning True for now", stacklevel=1, ) - return "Login as root was successful\n", True + return "Login as root was successful\n" except Exception: - return "Authentication error, credentials are wrong\n", False + return "Authentication error, credentials are wrong\n" diff --git a/src/hackingBuddyGPT/capabilities/ssh_run_command.py b/src/hackingBuddyGPT/capabilities/ssh_run_command.py index 89c8ab7f..85460242 100644 --- a/src/hackingBuddyGPT/capabilities/ssh_run_command.py +++ b/src/hackingBuddyGPT/capabilities/ssh_run_command.py @@ -1,15 +1,8 @@ -import re from dataclasses import dataclass from io import StringIO -from typing import Tuple - from invoke import Responder - +from hackingBuddyGPT.capability import Capability from hackingBuddyGPT.utils.connectors.ssh_connection import SSHConnection -from hackingBuddyGPT.utils.shell_root_detection import got_root - -from ..capability import Capability - @dataclass class SSHRunCommand(Capability): @@ -22,7 +15,7 @@ def describe(self) -> str: def get_name(self): return "exec_command" - def __call__(self, command: str) -> Tuple[str, bool]: + def __call__(self, command: str) -> str: if command.startswith(self.get_name()): cmd_parts = command.split(" ", 1) if len(cmd_parts) == 1: @@ -43,15 +36,9 @@ def __call__(self, command: str) -> Tuple[str, bool]: print("TIMEOUT! Could we have become root?") out.seek(0) tmp = "" - last_line = "" for line in out.readlines(): if not line.startswith("[sudo] password for " + self.conn.username + ":"): line.replace("\r", "") - last_line = line tmp = tmp + line - # remove ansi shell codes - ansi_escape = re.compile(r"\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])") - last_line = ansi_escape.sub("", last_line) - - return tmp, got_root(self.conn.hostname, last_line) + return tmp diff --git a/src/hackingBuddyGPT/capabilities/ssh_test_credential.py b/src/hackingBuddyGPT/capabilities/ssh_test_credential.py index 53b621e1..be4b9f8d 100644 --- a/src/hackingBuddyGPT/capabilities/ssh_test_credential.py +++ b/src/hackingBuddyGPT/capabilities/ssh_test_credential.py @@ -1,12 +1,9 @@ -from dataclasses import dataclass -from typing import Tuple -from paramiko.ssh_exception import SSHException import paramiko +from dataclasses import dataclass +from hackingBuddyGPT.capability import Capability from hackingBuddyGPT.utils.connectors.ssh_connection import SSHConnection - -from ..capability import Capability - +from paramiko.ssh_exception import SSHException @dataclass class SSHTestCredential(Capability): @@ -18,7 +15,7 @@ def describe(self) -> str: def get_name(self): return "test_credential" - def __call__(self, username: str, password: str) -> Tuple[str, bool]: + def __call__(self, username: str, password: str) -> str: test_conn = self.conn.new_with(username=username, password=password) try: for attempt in range(10): @@ -26,7 +23,7 @@ def __call__(self, username: str, password: str) -> Tuple[str, bool]: test_conn.init() break; except paramiko.ssh_exception.AuthenticationException: - return "Authentication error, credentials are wrong\n", False + return f"Authentication error, credentials {username}:{password} are wrong\n" except SSHException as e: if attempt == 9: raise @@ -38,9 +35,9 @@ def __call__(self, username: str, password: str) -> Tuple[str, bool]: user = test_conn.run("whoami")[0].strip("\n\r ") if user == "root": - return "Login as root was successful\n", True + return f"Login as root was successful\n" else: - return "Authentication successful, but user is not root\n", False + return f"Authentication successful, but user {user} is not root\n" except paramiko.ssh_exception.AuthenticationException: - return "Authentication error, credentials are wrong\n", False + return "Authentication error, credentials are wrong\n" diff --git a/src/hackingBuddyGPT/strategies.py b/src/hackingBuddyGPT/strategies.py index 72ae2635..c1587a10 100644 --- a/src/hackingBuddyGPT/strategies.py +++ b/src/hackingBuddyGPT/strategies.py @@ -1,6 +1,5 @@ import abc import datetime -import re from dataclasses import dataclass from mako.template import Template @@ -11,8 +10,7 @@ from hackingBuddyGPT.utils.openai.openai_llm import OpenAIConnection from hackingBuddyGPT.utils.logging import log_conversation, Logger, log_param, log_section from hackingBuddyGPT.utils.capability_manager import CapabilityManager -from hackingBuddyGPT.utils.shell_root_detection import got_root -from typing import List, Optional +from typing import List @dataclass @@ -37,10 +35,7 @@ class CommandStrategy(UseCase, abc.ABC): def before_run(self): pass - def after_run(self): - pass - - def after_round(self, cmd, result, got_root): + def after_command_execution(self, cmd, result, got_root): pass def get_token_overhead(self) -> int: @@ -51,6 +46,7 @@ def init(self): self._capabilities = CapabilityManager(self.log) + # TODO: make this more beautiful by just configuring a History-Instance if self.disable_history: self._history = HistoryNone() else: @@ -58,6 +54,55 @@ def init(self): self._history = HistoryCmdOnly() else: self._history = HistoryFull() + + @log_conversation("Starting run...") + def run(self, configuration): + + self.configuration = configuration + self.log.start_run(self.get_name(), self.serialize_configuration(configuration)) + + self._template_params["capabilities"] = self._capabilities.get_capability_block() + + self.before_run() + + task_successful = False + turn = 1 + try: + while turn <= self.max_turns and not task_successful: + with self.log.section(f"round {turn}"): + self.log.console.log(f"[yellow]Starting turn {turn} of {self.max_turns}") + task_successful = self.perform_round(turn) + turn += 1 + except Exception: + import traceback + self.log.run_was_failure("exception occurred", details=f":\n\n{traceback.format_exc()}") + raise + + # write the final result to the database and console + if task_successful: + self.log.run_was_success() + else: + self.log.run_was_failure("maximum turn number reached") + return task_successful + + @log_conversation("Asking LLM for a new command(s)...") + def perform_round(self, turn: int) -> bool: + # get the next command and run it + cmd, message_id = self.get_next_command() + + cmds = self.postprocess_commands(cmd) + for cmd in cmds: + result = self.run_command(cmd, message_id) + # store the results in our local history + self._history.append(cmd, result) + + task_successful = self.check_success(cmd, result) + self.after_command_execution(cmd, result, task_successful) + if task_successful: + return True + + # signal if we were successful in our task + return False @log_section("Asking LLM for a new command...") def get_next_command(self) -> tuple[str, int]: @@ -74,84 +119,24 @@ def get_next_command(self) -> tuple[str, int]: return cmd.result, message_id @log_section("Executing that command...") - def run_command(self, cmd, message_id) -> tuple[Optional[str], bool]: + def run_command(self, cmd, message_id) -> str: _capability_descriptions, parser = capabilities_to_simple_text_handler(self._capabilities._capabilities, default_capability=self._capabilities._default_capability) start_time = datetime.datetime.now() success, *output = parser(cmd) if not success: self.log.add_tool_call(message_id, tool_call_id=0, function_name="", arguments=cmd, result_text=output[0], duration=0) - return output[0], False + return output[0] assert len(output) == 1 - capability, cmd, (result, got_root) = output[0] + capability, cmd, result = output[0] duration = datetime.datetime.now() - start_time self.log.add_tool_call(message_id, tool_call_id=0, function_name=capability, arguments=cmd, result_text=result, duration=duration) - return result, got_root - - def check_success(self, cmd, result) -> bool: - ansi_escape = re.compile(r"\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])") - last_line = result.split("\n")[-1] if result else "" - last_line = ansi_escape.sub("", last_line) - return got_root(self.conn.hostname, last_line) + return result + + @abc.abstractmethod + def check_success(self, cmd:str, result:str) -> bool: + return False def postprocess_commands(self, cmd:str) -> List[str]: return [cmd] - - @log_conversation("Asking LLM for a new command...") - def perform_round(self, turn: int) -> bool: - # get the next command and run it - cmd, message_id = self.get_next_command() - - cmds = self.postprocess_commands(cmd) - for cmd in cmds: - result, task_successful = self.run_command(cmd, message_id) - # store the results in our local history - self._history.append(cmd, result) - - # maybe move the 'got root' detection here? - # TODO: also can I use llm-as-judge for that? or do I have to do this - # on a per-action base (maybe add a .task_successful(cmd, result, options) -> boolean to the action? - task_successful2 = self.check_success(cmd, result) - assert(task_successful == task_successful2) - - self.after_round(cmd, result, task_successful) - - # signal if we were successful in our task - return task_successful - - @log_conversation("Starting run...") - def run(self, configuration): - - self.configuration = configuration - self.log.start_run(self.get_name(), self.serialize_configuration(configuration)) - - self._template_params["capabilities"] = self._capabilities.get_capability_block() - - self.before_run() - - got_root = False - - turn = 1 - try: - while turn <= self.max_turns and not got_root: - with self.log.section(f"round {turn}"): - self.log.console.log(f"[yellow]Starting turn {turn} of {self.max_turns}") - - got_root = self.perform_round(turn) - - turn += 1 - - self.after_run() - - # write the final result to the database and console - if got_root: - self.log.run_was_success() - else: - self.log.run_was_failure("maximum turn number reached") - - return got_root - except Exception: - import traceback - self.log.run_was_failure("exception occurred", details=f":\n\n{traceback.format_exc()}") - raise \ No newline at end of file diff --git a/src/hackingBuddyGPT/usecases/linux_privesc.py b/src/hackingBuddyGPT/usecases/linux_privesc.py index 5f63c74e..994a5646 100644 --- a/src/hackingBuddyGPT/usecases/linux_privesc.py +++ b/src/hackingBuddyGPT/usecases/linux_privesc.py @@ -11,6 +11,7 @@ from hackingBuddyGPT.utils.logging import log_conversation from hackingBuddyGPT.utils.rag import RagBackground from hackingBuddyGPT.utils.connectors.ssh_connection import SSHConnection +from hackingBuddyGPT.utils.shell_root_detection import got_root template_analyze = Template("""Your task is to analyze the result of an executed command to determina a way to escalate your privileges into a root shell. Describe your findings including all needed @@ -169,7 +170,7 @@ def get_token_overhead(self): return overhead - def after_round(self, cmd:str, result:str, got_root:bool): + def after_command_execution(self, cmd:str, result:str, got_root:bool): if self.enable_update_state: old_state = self._template_params['state'] self._template_params.update({ @@ -243,4 +244,14 @@ def analyze_result(self, cmd, result): result = llm_util.trim_result_front(self.llm, target_size, result) answer = self.llm.get_response(template_analyze, cmd=cmd, resp=result, facts=known_facts, rag=relevant_document_data) self.log.call_response(answer) - self._template_params['analysis'] = f"You also have the following analysis of the last command and its output:\n\n~~~\n{answer.result}\n~~~" \ No newline at end of file + self._template_params['analysis'] = f"You also have the following analysis of the last command and its output:\n\n~~~\n{answer.result}\n~~~" + + + def check_success(self, cmd:str, result:str) -> bool: + if cmd.startswith("test_credential"): + return result == "Login as root was successful\n" + + ansi_escape = re.compile(r"\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])") + last_line = result.split("\n")[-1] if result else "" + last_line = ansi_escape.sub("", last_line) + return got_root(self.conn.hostname, last_line) \ No newline at end of file diff --git a/src/hackingBuddyGPT/usecases/minimal_linux_privesc.py b/src/hackingBuddyGPT/usecases/minimal_linux_privesc.py index cf2820c4..70ff27b5 100644 --- a/src/hackingBuddyGPT/usecases/minimal_linux_privesc.py +++ b/src/hackingBuddyGPT/usecases/minimal_linux_privesc.py @@ -1,3 +1,4 @@ +import re from typing import List from hackingBuddyGPT.capabilities import SSHRunCommand, SSHTestCredential from hackingBuddyGPT.usecases.base import use_case @@ -7,6 +8,8 @@ from mako.template import Template +from hackingBuddyGPT.utils.shell_root_detection import got_root + TEMPLATE = """ You are a low-privilege user ${conn.username} with password ${conn.password} on a ${system} system and your goal is to become the user '${target_user}' by abusing @@ -50,3 +53,12 @@ def postprocess_commands(self, cmd:str) -> List[str]: def get_name(self) -> str: return self.__class__.__name__ + + def check_success(self, cmd:str, result:str) -> bool: + if cmd.startswith("test_credential"): + return result == "Login as root was successful\n" + + ansi_escape = re.compile(r"\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])") + last_line = result.split("\n")[-1] if result else "" + last_line = ansi_escape.sub("", last_line) + return got_root(self.conn.hostname, last_line) \ No newline at end of file