diff --git a/examples/01_standalone_sdk/32_configurable_security_policy.py b/examples/01_standalone_sdk/32_configurable_security_policy.py new file mode 100644 index 0000000000..9433ad3e3b --- /dev/null +++ b/examples/01_standalone_sdk/32_configurable_security_policy.py @@ -0,0 +1,156 @@ +"""OpenHands Agent SDK — Configurable Security Policy Example + +This example demonstrates how to use a custom security policy template +with an agent. Security policies define risk assessment guidelines that +help agents evaluate the safety of their actions. + +By default, agents use the built-in security_policy.j2 template. This +example shows how to: +1. Use the default security policy +2. Provide a custom security policy template embedded in the script +3. Apply the custom policy to guide agent behavior +""" + +import os +import tempfile +from pathlib import Path + +from pydantic import SecretStr + +from openhands.sdk import ( + LLM, + Agent, + Conversation, + Event, + LLMConvertibleEvent, + get_logger, +) +from openhands.sdk.tool import Tool +from openhands.tools.file_editor import FileEditorTool +from openhands.tools.terminal import TerminalTool + + +logger = get_logger(__name__) + +# Define a custom security policy template inline +CUSTOM_SECURITY_POLICY = ( + "# 🔐 Custom Security Risk Policy\n" + "When using tools that support the security_risk parameter, assess the " + "safety risk of your actions:\n" + "\n" + "- **LOW**: Safe read-only actions.\n" + " - Viewing files, calculations, documentation.\n" + "- **MEDIUM**: Moderate container-scoped actions.\n" + " - File modifications, package installations.\n" + "- **HIGH**: Potentially dangerous actions.\n" + " - Network access, system modifications, data exfiltration.\n" + "\n" + "**Custom Rules**\n" + "- Always prioritize user data safety.\n" + "- Escalate to **HIGH** for any external data transmission.\n" +) + +# Configure LLM +api_key = os.getenv("LLM_API_KEY") +assert api_key is not None, "LLM_API_KEY environment variable is not set." +model = os.getenv("LLM_MODEL", "anthropic/claude-sonnet-4-5-20250929") +base_url = os.getenv("LLM_BASE_URL") +llm = LLM( + usage_id="agent", + model=model, + base_url=base_url, + api_key=SecretStr(api_key), +) + +# Tools +cwd = os.getcwd() +tools = [ + Tool(name=TerminalTool.name), + Tool(name=FileEditorTool.name), +] + +# Example 1: Agent with default security policy +print("=" * 100) +print("Example 1: Agent with default security policy") +print("=" * 100) +default_agent = Agent(llm=llm, tools=tools) +print(f"Security policy filename: {default_agent.security_policy_filename}") +print("\nDefault security policy is embedded in the agent's system message.") + +# Example 2: Agent with custom security policy +print("\n" + "=" * 100) +print("Example 2: Agent with custom security policy") +print("=" * 100) + +# Create a temporary file for the custom security policy +with tempfile.NamedTemporaryFile( + mode="w", suffix=".j2", delete=False, encoding="utf-8" +) as temp_file: + temp_file.write(CUSTOM_SECURITY_POLICY) + custom_policy_path = temp_file.name + +try: + # Create agent with custom security policy (using absolute path) + custom_agent = Agent( + llm=llm, + tools=tools, + security_policy_filename=custom_policy_path, + ) + print(f"Security policy filename: {custom_agent.security_policy_filename}") + print("\nCustom security policy loaded from temporary file.") + + # Verify the custom policy is in the system message + system_message = custom_agent.system_message + if "Custom Security Risk Policy" in system_message: + print("✓ Custom security policy successfully embedded in system message.") + else: + print("✗ Custom security policy not found in system message.") + + # Run a conversation with the custom agent + print("\n" + "=" * 100) + print("Running conversation with custom security policy") + print("=" * 100) + + llm_messages = [] # collect raw LLM messages + + def conversation_callback(event: Event): + if isinstance(event, LLMConvertibleEvent): + llm_messages.append(event.to_llm_message()) + + conversation = Conversation( + agent=custom_agent, + callbacks=[conversation_callback], + workspace=".", + ) + + conversation.send_message( + "Please create a simple Python script named hello.py that prints " + "'Hello, World!'. Make sure to follow security best practices." + ) + conversation.run() + + print("\n" + "=" * 100) + print("Conversation finished.") + print(f"Total LLM messages: {len(llm_messages)}") + print("=" * 100) + + # Report cost + cost = conversation.conversation_stats.get_combined_metrics().accumulated_cost + print(f"EXAMPLE_COST: {cost}") + +finally: + # Clean up temporary file + Path(custom_policy_path).unlink(missing_ok=True) + +print("\n" + "=" * 100) +print("Example Summary") +print("=" * 100) +print("This example demonstrated:") +print("1. Using the default security policy (security_policy.j2)") +print("2. Creating a custom security policy template") +print("3. Applying the custom policy via security_policy_filename parameter") +print("4. Running a conversation with the custom security policy") +print( + "\nYou can customize security policies to match your organization's " + "specific requirements." +) diff --git a/openhands-sdk/openhands/sdk/agent/base.py b/openhands-sdk/openhands/sdk/agent/base.py index 072cd3c1b5..6cc47ab197 100644 --- a/openhands-sdk/openhands/sdk/agent/base.py +++ b/openhands-sdk/openhands/sdk/agent/base.py @@ -121,6 +121,15 @@ class AgentBase(DiscriminatedUnionMixin, ABC): "- An absolute path (e.g., '/path/to/custom_prompt.j2')" ), ) + security_policy_filename: str = Field( + default="security_policy.j2", + description=( + "Security policy template filename. Can be either:\n" + "- A relative filename (e.g., 'security_policy.j2') loaded from the " + "agent's prompts directory\n" + "- An absolute path (e.g., '/path/to/custom_security_policy.j2')" + ), + ) system_prompt_kwargs: dict[str, object] = Field( default_factory=dict, description="Optional kwargs to pass to the system prompt Jinja2 template.", @@ -165,6 +174,8 @@ def name(self) -> str: def system_message(self) -> str: """Compute system message on-demand to maintain statelessness.""" template_kwargs = dict(self.system_prompt_kwargs) + # Add security_policy_filename to template kwargs + template_kwargs["security_policy_filename"] = self.security_policy_filename template_kwargs.setdefault("model_name", self.llm.model) if ( "model_family" not in template_kwargs diff --git a/openhands-sdk/openhands/sdk/agent/prompts/system_prompt.j2 b/openhands-sdk/openhands/sdk/agent/prompts/system_prompt.j2 index 04689c9981..a79a9ba2ed 100644 --- a/openhands-sdk/openhands/sdk/agent/prompts/system_prompt.j2 +++ b/openhands-sdk/openhands/sdk/agent/prompts/system_prompt.j2 @@ -74,7 +74,7 @@ You are OpenHands agent, a helpful AI assistant that can interact with a compute -{% include 'security_policy.j2' %} +{% include security_policy_filename %} {% if llm_security_analyzer %} diff --git a/openhands-sdk/openhands/sdk/context/prompts/prompt.py b/openhands-sdk/openhands/sdk/context/prompts/prompt.py index 10acfb1fba..d255cd3354 100644 --- a/openhands-sdk/openhands/sdk/context/prompts/prompt.py +++ b/openhands-sdk/openhands/sdk/context/prompts/prompt.py @@ -4,7 +4,45 @@ import sys from functools import lru_cache -from jinja2 import Environment, FileSystemBytecodeCache, FileSystemLoader, Template +from jinja2 import ( + BaseLoader, + Environment, + FileSystemBytecodeCache, + Template, + TemplateNotFound, +) + + +class FlexibleFileSystemLoader(BaseLoader): + """A Jinja2 loader that supports both relative paths (within a base directory) + and absolute paths anywhere on the filesystem. + """ + + def __init__(self, searchpath: str): + self.searchpath = os.path.abspath(searchpath) + + def get_source(self, environment, template): # noqa: ARG002 + # If template is an absolute path, use it directly + if os.path.isabs(template): + path = template + else: + # Otherwise, look for it in the searchpath + path = os.path.join(self.searchpath, template) + + if not os.path.exists(path): + raise TemplateNotFound(template) + + mtime = os.path.getmtime(path) + with open(path, encoding="utf-8") as f: + source = f.read() + + def uptodate(): + try: + return os.path.getmtime(path) == mtime + except OSError: + return False + + return source, path, uptodate def refine(text: str) -> str: @@ -27,7 +65,7 @@ def _get_env(prompt_dir: str) -> Environment: os.makedirs(cache_folder, exist_ok=True) bcc = FileSystemBytecodeCache(directory=cache_folder) env = Environment( - loader=FileSystemLoader(prompt_dir), + loader=FlexibleFileSystemLoader(prompt_dir), bytecode_cache=bcc, autoescape=False, ) diff --git a/tests/sdk/agent/test_security_policy_integration.py b/tests/sdk/agent/test_security_policy_integration.py index d4c905fe7e..bbe3cee72f 100644 --- a/tests/sdk/agent/test_security_policy_integration.py +++ b/tests/sdk/agent/test_security_policy_integration.py @@ -1,5 +1,8 @@ -"""Test that the security policy is properly integrated into the agent system prompt.""" +"""Test configurable security policy functionality.""" +import shutil +import tempfile +from pathlib import Path from unittest.mock import patch from litellm import ChatCompletionMessageToolCall @@ -18,8 +21,7 @@ def test_security_policy_in_system_message(): - """Test that the security policy is included in the agent's system message.""" - # Create a minimal agent configuration + """Test that security policy is included in system message.""" agent = Agent( llm=LLM( usage_id="test-llm", @@ -28,11 +30,9 @@ def test_security_policy_in_system_message(): base_url="http://test", ) ) - - # Get the system message system_message = agent.system_message - # Verify that the security policy content is included + # Verify that security policy section is present assert "🔐 Security Policy" in system_message assert "OK to do without Explicit User Consent" in system_message assert "Do only with Explicit User Consent" in system_message @@ -58,6 +58,67 @@ def test_security_policy_in_system_message(): assert "Use APIs to work with GitHub or other platforms" in system_message +def test_custom_security_policy_in_system_message(): + """Test that custom security policy filename is used in system message.""" + # Create a temporary directory for test files + with tempfile.TemporaryDirectory() as temp_dir: + # Create a custom policy file with distinctive content + custom_policy_path = Path(temp_dir) / "custom_policy.j2" + custom_policy_content = ( + "# 🔐 Custom Test Security Policy\n" + "This is a custom security policy for testing.\n" + "- **CUSTOM_RULE**: Always test custom policies." + ) + custom_policy_path.write_text(custom_policy_content) + + # Copy required template files to temp directory + original_prompt_dir = ( + Path(__file__).parent.parent.parent.parent + / "openhands-sdk" + / "openhands" + / "sdk" + / "agent" + / "prompts" + ) + + # Copy system_prompt.j2 + system_prompt_path = Path(temp_dir) / "system_prompt.j2" + original_system_prompt = original_prompt_dir / "system_prompt.j2" + shutil.copy2(original_system_prompt, system_prompt_path) + + # Copy security_risk_assessment.j2 + security_risk_assessment_path = Path(temp_dir) / "security_risk_assessment.j2" + original_security_risk_assessment = ( + original_prompt_dir / "security_risk_assessment.j2" + ) + shutil.copy2(original_security_risk_assessment, security_risk_assessment_path) + + # Copy self_documentation.j2 + self_documentation_path = Path(temp_dir) / "self_documentation.j2" + original_self_documentation = original_prompt_dir / "self_documentation.j2" + shutil.copy2(original_self_documentation, self_documentation_path) + + # Create agent with custom security policy using absolute paths for both + agent = Agent( + llm=LLM( + usage_id="test-llm", + model="test-model", + api_key=SecretStr("test-key"), + base_url="http://test", + ), + system_prompt_filename=str(system_prompt_path), + security_policy_filename=str(custom_policy_path), + ) + + # Get system message - this should include our custom policy + system_message = agent.system_message + + # Verify that custom policy content appears in system message + assert "Custom Test Security Policy" in system_message + assert "CUSTOM_RULE" in system_message + assert "Always test custom policies" in system_message + + def test_security_policy_template_rendering(): """Test that the security policy template renders correctly.""" diff --git a/tests/sdk/context/test_prompt_absolute_path.py b/tests/sdk/context/test_prompt_absolute_path.py index c2a6ca776a..7f39c0c13d 100644 --- a/tests/sdk/context/test_prompt_absolute_path.py +++ b/tests/sdk/context/test_prompt_absolute_path.py @@ -25,6 +25,7 @@ def test_render_template_with_relative_path(): prompt_dir=agent_prompts_dir, template_name="system_prompt.j2", cli_mode=False, + security_policy_filename="security_policy.j2", ) # Verify result is a non-empty string