Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/hackingBuddyGPT/capabilities/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from .capability import Capability
from ..capability import Capability
from .psexec_run_command import PSExecRunCommand
from .psexec_test_credential import PSExecTestCredential
from .ssh_run_command import SSHRunCommand
Expand Down
2 changes: 1 addition & 1 deletion src/hackingBuddyGPT/capabilities/local_shell.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from typing import Tuple

from hackingBuddyGPT.capabilities import Capability
from hackingBuddyGPT.utils.local_shell import LocalShellConnection
from hackingBuddyGPT.utils.connectors.local_shell import LocalShellConnection


@dataclass
Expand Down
9 changes: 4 additions & 5 deletions src/hackingBuddyGPT/capabilities/psexec_run_command.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
from dataclasses import dataclass
from typing import Tuple

from hackingBuddyGPT.utils import PSExecConnection
from hackingBuddyGPT.utils.connectors.psexec import PSExecConnection

from .capability import Capability
from ..capability import Capability


@dataclass
Expand All @@ -14,5 +13,5 @@ class PSExecRunCommand(Capability):
def describe(self) -> str:
return "give a command to be executed on the shell and I will respond with the terminal output when running this command on the windows machine. The given command must not require user interaction. Only state the to be executed command. The command should be used for enumeration or privilege escalation."

def __call__(self, command: str) -> Tuple[str, bool]:
return self.conn.run(command)[0], False
def __call__(self, command: str) -> str:
return self.conn.run(command)[0]
11 changes: 5 additions & 6 deletions src/hackingBuddyGPT/capabilities/psexec_test_credential.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
import warnings
from dataclasses import dataclass
from typing import Tuple

from hackingBuddyGPT.utils import PSExecConnection
from hackingBuddyGPT.utils.connectors.psexec import PSExecConnection

from .capability import Capability
from ..capability import Capability


@dataclass
Expand All @@ -17,14 +16,14 @@ def describe(self) -> str:
def get_name(self) -> str:
return "test_credential"

def __call__(self, username: str, password: str) -> Tuple[str, bool]:
def __call__(self, username: str, password: str) -> str:
try:
test_conn = self.conn.new_with(username=username, password=password)
test_conn.init()
warnings.warn(
message="full credential testing is not implemented yet for psexec, we have logged in, but do not know who we are, returning True for now",
stacklevel=1,
)
return "Login as root was successful\n", True
return "Login as root was successful\n"
except Exception:
return "Authentication error, credentials are wrong\n", False
return "Authentication error, credentials are wrong\n"
21 changes: 4 additions & 17 deletions src/hackingBuddyGPT/capabilities/ssh_run_command.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,8 @@
import re
from dataclasses import dataclass
from io import StringIO
from typing import Tuple

from invoke import Responder

from hackingBuddyGPT.utils import SSHConnection
from hackingBuddyGPT.utils.shell_root_detection import got_root

from .capability import Capability

from hackingBuddyGPT.capability import Capability
from hackingBuddyGPT.utils.connectors.ssh_connection import SSHConnection

@dataclass
class SSHRunCommand(Capability):
Expand All @@ -22,7 +15,7 @@ def describe(self) -> str:
def get_name(self):
return "exec_command"

def __call__(self, command: str) -> Tuple[str, bool]:
def __call__(self, command: str) -> str:
if command.startswith(self.get_name()):
cmd_parts = command.split(" ", 1)
if len(cmd_parts) == 1:
Expand All @@ -43,15 +36,9 @@ def __call__(self, command: str) -> Tuple[str, bool]:
print("TIMEOUT! Could we have become root?")
out.seek(0)
tmp = ""
last_line = ""
for line in out.readlines():
if not line.startswith("[sudo] password for " + self.conn.username + ":"):
line.replace("\r", "")
last_line = line
tmp = tmp + line

# remove ansi shell codes
ansi_escape = re.compile(r"\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])")
last_line = ansi_escape.sub("", last_line)

return tmp, got_root(self.conn.hostname, last_line)
return tmp
21 changes: 9 additions & 12 deletions src/hackingBuddyGPT/capabilities/ssh_test_credential.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
from dataclasses import dataclass
from typing import Tuple
from paramiko.ssh_exception import SSHException
import paramiko

from hackingBuddyGPT.utils import SSHConnection

from .capability import Capability

from dataclasses import dataclass
from hackingBuddyGPT.capability import Capability
from hackingBuddyGPT.utils.connectors.ssh_connection import SSHConnection
from paramiko.ssh_exception import SSHException

@dataclass
class SSHTestCredential(Capability):
Expand All @@ -18,15 +15,15 @@ def describe(self) -> str:
def get_name(self):
return "test_credential"

def __call__(self, username: str, password: str) -> Tuple[str, bool]:
def __call__(self, username: str, password: str) -> str:
test_conn = self.conn.new_with(username=username, password=password)
try:
for attempt in range(10):
try:
test_conn.init()
break;
except paramiko.ssh_exception.AuthenticationException:
return "Authentication error, credentials are wrong\n", False
return f"Authentication error, credentials {username}:{password} are wrong\n"
except SSHException as e:
if attempt == 9:
raise
Expand All @@ -38,9 +35,9 @@ def __call__(self, username: str, password: str) -> Tuple[str, bool]:

user = test_conn.run("whoami")[0].strip("\n\r ")
if user == "root":
return "Login as root was successful\n", True
return f"Login as root was successful\n"
else:
return "Authentication successful, but user is not root\n", False
return f"Authentication successful, but user {user} is not root\n"

except paramiko.ssh_exception.AuthenticationException:
return "Authentication error, credentials are wrong\n", False
return "Authentication error, credentials are wrong\n"
175 changes: 76 additions & 99 deletions src/hackingBuddyGPT/strategies.py
Original file line number Diff line number Diff line change
@@ -1,29 +1,23 @@
import abc
from dataclasses import dataclass
import datetime
from typing import List, Optional
import re

from dataclasses import dataclass
from mako.template import Template

from hackingBuddyGPT.capabilities.capability import capabilities_to_simple_text_handler
from hackingBuddyGPT.capability import capabilities_to_simple_text_handler
from hackingBuddyGPT.usecases.base import UseCase
from hackingBuddyGPT.utils import llm_util
from hackingBuddyGPT.utils.cli_history import SlidingCliHistory
from hackingBuddyGPT.utils.histories import HistoryCmdOnly, HistoryFull, HistoryNone
from hackingBuddyGPT.utils.openai.openai_llm import OpenAIConnection
from hackingBuddyGPT.utils.logging import log_conversation, Logger, log_param, log_section
from hackingBuddyGPT.utils.capability_manager import CapabilityManager
from hackingBuddyGPT.utils.shell_root_detection import got_root
from typing import List


@dataclass
class CommandStrategy(UseCase, abc.ABC):

_capabilities: CapabilityManager = None

_sliding_history: SlidingCliHistory = None

_max_history_size: int = 0

_template: Template = None

_template_params = {}
Expand All @@ -41,125 +35,108 @@ class CommandStrategy(UseCase, abc.ABC):
def before_run(self):
pass

def after_run(self):
def after_command_execution(self, cmd, result, got_root):
pass

def after_round(self, cmd, result, got_root):
pass

def get_space_for_history(self):
pass
def get_token_overhead(self) -> int:
return 0

def init(self):
super().init()

self._capabilities = CapabilityManager(self.log)

self._sliding_history = SlidingCliHistory(self.llm)

@log_section("Asking LLM for a new command...")
def get_next_command(self) -> tuple[str, int]:
history = ""
if not self.disable_history:
# TODO: make this more beautiful by just configuring a History-Instance
if self.disable_history:
self._history = HistoryNone()
else:
if self.enable_compressed_history:
history = self._sliding_history.get_commands_and_last_output(self._max_history_size - self.get_state_size())
self._history = HistoryCmdOnly()
else:
history = self._sliding_history.get_history(self._max_history_size - self.get_state_size())
self._history = HistoryFull()

@log_conversation("Starting run...")
def run(self, configuration):

self._template_params.update({"history": history})
cmd = self.llm.get_response(self._template, **self._template_params)
message_id = self.log.call_response(cmd)
self.configuration = configuration
self.log.start_run(self.get_name(), self.serialize_configuration(configuration))

return cmd.result, message_id
self._template_params["capabilities"] = self._capabilities.get_capability_block()

@log_section("Executing that command...")
def run_command(self, cmd, message_id) -> tuple[Optional[str], bool]:
_capability_descriptions, parser = capabilities_to_simple_text_handler(self._capabilities._capabilities, default_capability=self._capabilities._default_capability)
start_time = datetime.datetime.now()
success, *output = parser(cmd)
if not success:
self.log.add_tool_call(message_id, tool_call_id=0, function_name="", arguments=cmd, result_text=output[0], duration=0)
return output[0], False
self.before_run()

assert len(output) == 1
capability, cmd, (result, got_root) = output[0]
duration = datetime.datetime.now() - start_time
self.log.add_tool_call(message_id, tool_call_id=0, function_name=capability, arguments=cmd, result_text=result, duration=duration)
task_successful = False
turn = 1
try:
while turn <= self.max_turns and not task_successful:
with self.log.section(f"round {turn}"):
self.log.console.log(f"[yellow]Starting turn {turn} of {self.max_turns}")
task_successful = self.perform_round(turn)
turn += 1
except Exception:
import traceback
self.log.run_was_failure("exception occurred", details=f":\n\n{traceback.format_exc()}")
raise

return result, got_root
# write the final result to the database and console
if task_successful:
self.log.run_was_success()
else:
self.log.run_was_failure("maximum turn number reached")
return task_successful

def check_success(self, cmd, result) -> bool:
ansi_escape = re.compile(r"\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])")
last_line = result.split("\n")[-1] if result else ""
last_line = ansi_escape.sub("", last_line)
return got_root(self.conn.hostname, last_line)

def postprocess_commands(self, cmd:str) -> List[str]:
return [cmd]

@log_conversation("Asking LLM for a new command...")
@log_conversation("Asking LLM for a new command(s)...")
def perform_round(self, turn: int) -> bool:
# get the next command and run it
cmd, message_id = self.get_next_command()

cmds = self.postprocess_commands(cmd)
for cmd in cmds:
result, task_successful = self.run_command(cmd, message_id)

# maybe move the 'got root' detection here?
# TODO: also can I use llm-as-judge for that? or do I have to do this
# on a per-action base (maybe add a .task_successful(cmd, result, options) -> boolean to the action?
task_successful2 = self.check_success(cmd, result)
assert(task_successful == task_successful2)
result = self.run_command(cmd, message_id)
# store the results in our local history
self._history.append(cmd, result)

self.after_round(cmd, result, task_successful)

# store the results in our local history
if not self.disable_history:
if self.enable_compressed_history:
self._sliding_history.add_command_only(cmds, result)
else:
self._sliding_history.add_command(cmds, result)
task_successful = self.check_success(cmd, result)
self.after_command_execution(cmd, result, task_successful)
if task_successful:
return True

# signal if we were successful in our task
return task_successful

@log_conversation("Starting run...")
def run(self, configuration):

self.configuration = configuration
self.log.start_run(self.get_name(), self.serialize_configuration(configuration))

self._template_params["capabilities"] = self._capabilities.get_capability_block()

return False

# calculate sizes
self._max_history_size = self.llm.context_size - llm_util.SAFETY_MARGIN - self.llm.count_tokens(self._template.source)
@log_section("Asking LLM for a new command...")
def get_next_command(self) -> tuple[str, int]:
history = self._history.get_text_representation()

self.before_run()
# calculate max history size
max_history_size = self.llm.context_size - llm_util.SAFETY_MARGIN - self.llm.count_tokens(self._template.source) - self.get_token_overhead()
history = llm_util.trim_result_front(self.llm, max_history_size, history)

got_root = False
self._template_params.update({"history": history})
cmd = self.llm.get_response(self._template, **self._template_params)
message_id = self.log.call_response(cmd)

turn = 1
try:
while turn <= self.max_turns and not got_root:
with self.log.section(f"round {turn}"):
self.log.console.log(f"[yellow]Starting turn {turn} of {self.max_turns}")
return cmd.result, message_id

got_root = self.perform_round(turn)
@log_section("Executing that command...")
def run_command(self, cmd, message_id) -> str:
_capability_descriptions, parser = capabilities_to_simple_text_handler(self._capabilities._capabilities, default_capability=self._capabilities._default_capability)
start_time = datetime.datetime.now()
success, *output = parser(cmd)
if not success:
self.log.add_tool_call(message_id, tool_call_id=0, function_name="", arguments=cmd, result_text=output[0], duration=0)
return output[0]

turn += 1
assert len(output) == 1
capability, cmd, result = output[0]
duration = datetime.datetime.now() - start_time
self.log.add_tool_call(message_id, tool_call_id=0, function_name=capability, arguments=cmd, result_text=result, duration=duration)

self.after_run()
return result

# write the final result to the database and console
if got_root:
self.log.run_was_success()
else:
self.log.run_was_failure("maximum turn number reached")
@abc.abstractmethod
def check_success(self, cmd:str, result:str) -> bool:
return False

return got_root
except Exception:
import traceback
self.log.run_was_failure("exception occurred", details=f":\n\n{traceback.format_exc()}")
raise
def postprocess_commands(self, cmd:str) -> List[str]:
return [cmd]
2 changes: 1 addition & 1 deletion src/hackingBuddyGPT/usecases/agents.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from typing import Dict

from hackingBuddyGPT.utils.logging import log_conversation, Logger, log_param
from hackingBuddyGPT.capabilities.capability import (
from hackingBuddyGPT.capability import (
Capability,
capabilities_to_simple_text_handler,
)
Expand Down
Loading
Loading