diff --git a/contributing/samples/agent_governance/README.md b/contributing/samples/agent_governance/README.md new file mode 100644 index 0000000..22fef09 --- /dev/null +++ b/contributing/samples/agent_governance/README.md @@ -0,0 +1,80 @@ +# Agent Governance Toolkit plugin for Google ADK + +An ADK plugin that enforces policy-as-code rules before tool execution using the +[Agent Governance Toolkit](https://github.com/microsoft/agent-governance-toolkit) +(MIT licensed). + +## Install + +```bash +pip install google-adk-community agentmesh-platform +``` + +## Usage + +```python +from pathlib import Path +from google.adk.agents import Agent +from google.adk.runners import Runner +from google.adk_community.plugins import AgentGovernancePlugin + +plugin = AgentGovernancePlugin( + policy_dir=Path(__file__).parent / "policies", + agent_did="did:mesh:my-agent", +) + +agent = Agent( + name="governed-agent", + model="gemini-2.0-flash", + tools=[my_tool], +) + +runner = Runner(agent=agent, plugins=[plugin], app_name="my-app") +``` + +## Policy example (`policies/default.yaml`) + +```yaml +apiVersion: governance.toolkit/v1 +name: adk-agent-policy +rules: + - name: block-dangerous-tools + condition: "action in ['shell_exec', 'file_delete']" + action: deny + - name: rate-limit-api-calls + condition: "action == 'api_call'" + action: allow + limit: "100/hour" +default_action: allow +``` + +## How it works + +The plugin extends `google.adk.plugins.BasePlugin` and implements +`before_tool_callback`. When a tool call is denied by policy, the callback +returns a dict response that short-circuits execution (per the ADK plugin +contract). Allowed calls return `None`, letting the tool proceed normally. + +## Fail-closed by default + +If `agentmesh-platform` is not installed, the plugin raises `ImportError` +at construction time. Pass `fail_open=True` to degrade gracefully instead +(all calls pass through with a logged warning). + +## Strict mode + +By default, the plugin skips policy files that fail to parse and logs a +warning. Pass `strict=True` to raise a `RuntimeError` instead, which is +recommended when every policy file is security-critical: + +```python +plugin = AgentGovernancePlugin( + policy_dir=Path(__file__).parent / "policies", + strict=True, # abort if any policy fails to load +) +``` + +## Links + +- [Agent Governance Toolkit](https://github.com/microsoft/agent-governance-toolkit) +- [ADK Plugin docs](https://google.github.io/adk-docs/plugins/) diff --git a/contributing/samples/agent_governance/main.py b/contributing/samples/agent_governance/main.py new file mode 100644 index 0000000..68f0688 --- /dev/null +++ b/contributing/samples/agent_governance/main.py @@ -0,0 +1,48 @@ +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Example: Google ADK agent with Agent Governance Toolkit policy enforcement. + +Demonstrates: +1. Loading YAML governance policies +2. Evaluating policies before tool calls via the ADK plugin lifecycle +3. Producing tamper-evident audit trails +""" + +from pathlib import Path + +from google.adk.agents import Agent +from google.adk.runners import Runner +from google.adk_community.plugins import AgentGovernancePlugin + + +def create_governed_runner() -> Runner: + """Create an ADK runner with governance controls.""" + plugin = AgentGovernancePlugin( + policy_dir=Path(__file__).parent / "policies", + agent_did="did:mesh:adk-demo-agent", + ) + + agent = Agent( + name="governed-research-agent", + model="gemini-2.0-flash", + instruction="You are a research assistant with governance controls.", + ) + + return Runner(agent=agent, plugins=[plugin], app_name="governed-demo") + + +if __name__ == "__main__": + runner = create_governed_runner() + print(f"Runner created with governance plugin enabled.") diff --git a/contributing/samples/agent_governance/policies/default.yaml b/contributing/samples/agent_governance/policies/default.yaml new file mode 100644 index 0000000..7ef6ea3 --- /dev/null +++ b/contributing/samples/agent_governance/policies/default.yaml @@ -0,0 +1,25 @@ +apiVersion: governance.toolkit/v1 +name: adk-demo-policy +description: Example governance policy for Google ADK agents +rules: + - name: block-shell-execution + condition: "action in ['shell_exec', 'code_exec', 'file_delete']" + action: deny + description: Block dangerous system-level tool calls + priority: 100 + + - name: rate-limit-api-calls + condition: "action == 'api_call'" + action: allow + limit: "100/hour" + description: Rate limit external API calls + priority: 50 + + - name: require-approval-for-payments + condition: "action == 'process_payment'" + action: require_approval + approvers: ["admin@example.com"] + description: Payment actions require human approval + priority: 90 + +default_action: allow diff --git a/src/google/adk_community/plugins/__init__.py b/src/google/adk_community/plugins/__init__.py new file mode 100644 index 0000000..ab61116 --- /dev/null +++ b/src/google/adk_community/plugins/__init__.py @@ -0,0 +1,19 @@ +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from google.adk_community.plugins.agent_governance_plugin import ( + AgentGovernancePlugin, +) + +__all__ = ["AgentGovernancePlugin"] diff --git a/src/google/adk_community/plugins/agent_governance_plugin.py b/src/google/adk_community/plugins/agent_governance_plugin.py new file mode 100644 index 0000000..05baae3 --- /dev/null +++ b/src/google/adk_community/plugins/agent_governance_plugin.py @@ -0,0 +1,185 @@ +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""ADK plugin for Agent Governance Toolkit policy enforcement. + +Evaluates policy-as-code rules before tool execution using the Agent +Governance Toolkit (https://github.com/microsoft/agent-governance-toolkit). +Denied tool calls are short-circuited with a policy violation response. + +Requires: ``pip install agentmesh-platform`` +""" + +from __future__ import annotations + +import asyncio +import functools +import logging +from pathlib import Path +from typing import Any, Optional + +from google.adk.plugins.base_plugin import BasePlugin +from google.adk.tools.base_tool import BaseTool +from google.adk.tools.tool_context import ToolContext + +logger = logging.getLogger(__name__) + + +class _GovernanceUnavailableError(ImportError): + """Raised when agentmesh-platform is not installed and fail_open=False.""" + + +class AgentGovernancePlugin(BasePlugin): + """ADK plugin that enforces governance policies before tool execution. + + Uses the Agent Governance Toolkit to evaluate YAML/OPA/Cedar policies. + When a tool call is denied by policy, the plugin returns a dict response + that short-circuits tool execution (per the ADK plugin contract). + + Args: + policy_dir: Absolute or relative path to the directory containing + ``*.yaml`` policy files. Resolved relative to the caller's + working directory. Must be provided explicitly. + agent_did: Decentralized identifier for the agent. + fail_open: If ``True``, tool calls proceed when ``agentmesh-platform`` + is not installed (logs a warning). If ``False`` (default), raises + ``ImportError`` at construction time. + strict: If ``True``, raises on policy load errors instead of + skipping the file. Useful when every policy file is + security-critical and a partial load is unacceptable. + Defaults to ``False``. + + Raises: + ImportError: If ``agentmesh-platform`` is not installed and + ``fail_open`` is False. + RuntimeError: If ``strict`` is True and a policy file fails to load. + + Example:: + + from google.adk_community.plugins import AgentGovernancePlugin + + plugin = AgentGovernancePlugin( + policy_dir=Path(__file__).parent / "policies", + ) + runner = Runner(agent=my_agent, plugins=[plugin], ...) + """ + + def __init__( + self, + policy_dir: str | Path, + agent_did: str = "did:mesh:adk-agent", + fail_open: bool = False, + strict: bool = False, + ) -> None: + super().__init__(name="agent_governance") + self._policy_dir = Path(policy_dir).resolve() + self._agent_did = agent_did + self._engine = None + self._audit = None + self._setup(fail_open=fail_open, strict=strict) + + def _setup(self, *, fail_open: bool, strict: bool) -> None: + """Initialize AGT policy engine and audit service.""" + try: + from agentmesh.governance.policy import PolicyEngine + from agentmesh.services.audit import AuditService + + self._engine = PolicyEngine() + self._audit = AuditService() + + if self._policy_dir.exists(): + for f in sorted(self._policy_dir.glob("*.yaml")): + try: + self._engine.load_yaml(f.read_text()) + logger.info("Loaded policy: %s", f.name) + except Exception as exc: + if strict: + raise RuntimeError( + f"Failed to load policy {f.name}: {exc}" + ) from exc + logger.warning("Skipped %s: %s", f.name, exc) + else: + logger.warning( + "Policy directory does not exist: %s", self._policy_dir + ) + + logger.info( + "AgentGovernancePlugin initialized (agent=%s, policies=%s)", + self._agent_did, + self._policy_dir, + ) + except ImportError: + if not fail_open: + raise _GovernanceUnavailableError( + "agentmesh-platform is required for governance enforcement. " + "Install with: pip install agentmesh-platform" + ) + logger.warning( + "agentmesh-platform not installed; governance checks disabled. " + "Install with: pip install agentmesh-platform" + ) + + async def before_tool_callback( + self, + *, + tool: BaseTool, + tool_args: dict[str, Any], + tool_context: ToolContext, + ) -> Optional[dict]: + """Evaluate governance policy before a tool call. + + Returns ``None`` to allow the tool to proceed, or a dict response + to short-circuit execution when the policy denies the call. + """ + if self._engine is None: + return None + + context = { + "action": tool.name, + "tool_args": tool_args, + } + + loop = asyncio.get_running_loop() + result = await loop.run_in_executor( + None, + functools.partial( + self._engine.evaluate, + agent_did=self._agent_did, + context=context, + ), + ) + + if self._audit: + self._audit.log_policy_decision( + agent_did=self._agent_did, + action=tool.name, + decision=result.action, + policy_name=result.policy_name or "", + data={"tool_args": tool_args, "reason": result.reason}, + ) + + if not result.allowed: + logger.warning( + "Policy denied tool '%s': %s (rule: %s)", + tool.name, + result.reason, + result.matched_rule, + ) + return { + "error": "policy_denied", + "reason": result.reason, + "matched_rule": result.matched_rule, + } + + return None diff --git a/tests/plugins/__init__.py b/tests/plugins/__init__.py new file mode 100644 index 0000000..58d482e --- /dev/null +++ b/tests/plugins/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/tests/plugins/test_agent_governance_plugin.py b/tests/plugins/test_agent_governance_plugin.py new file mode 100644 index 0000000..2e4c27e --- /dev/null +++ b/tests/plugins/test_agent_governance_plugin.py @@ -0,0 +1,379 @@ +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Tests for AgentGovernancePlugin. + +These tests mock the agentmesh-platform dependency so they run without +it installed, verifying the plugin's integration with the ADK plugin API. +""" + +from __future__ import annotations + +import sys +import types as stdlib_types +from pathlib import Path +from typing import Any +from unittest.mock import patch + +import pytest + +from google.adk_community.plugins.agent_governance_plugin import ( + AgentGovernancePlugin, +) + + +# --------------------------------------------------------------------------- +# Mock fixtures for agentmesh-platform +# --------------------------------------------------------------------------- + + +class _FakePolicyResult: + """Mimics agentmesh.governance.policy.PolicyResult.""" + + def __init__( + self, + allowed: bool, + action: str = "allow", + reason: str = "", + policy_name: str = "", + matched_rule: str = "", + ): + self.allowed = allowed + self.action = action + self.reason = reason + self.policy_name = policy_name + self.matched_rule = matched_rule + + +class _FakePolicyEngine: + """Mimics agentmesh.governance.policy.PolicyEngine.""" + + def __init__(self): + self._policies = [] + + def load_yaml(self, text: str) -> None: + self._policies.append(text) + + def evaluate(self, agent_did: str, context: dict) -> _FakePolicyResult: + action_name = context.get("action", "") + if "shell" in action_name or "delete" in action_name: + return _FakePolicyResult( + allowed=False, + action="deny", + reason=f"Blocked by policy: {action_name}", + matched_rule="block-dangerous-tools", + ) + return _FakePolicyResult( + allowed=True, action="allow", reason="Policy check passed" + ) + + +class _FakeAuditEntry: + entry_id = "audit-001" + + +class _FakeAuditService: + """Mimics agentmesh.services.audit.AuditService.""" + + def __init__(self): + self.entries = [] + + def log_policy_decision(self, **kwargs) -> _FakeAuditEntry: + self.entries.append(kwargs) + return _FakeAuditEntry() + + +def _install_fake_agentmesh(): + """Install fake agentmesh modules in sys.modules.""" + agentmesh = stdlib_types.ModuleType("agentmesh") + governance = stdlib_types.ModuleType("agentmesh.governance") + policy = stdlib_types.ModuleType("agentmesh.governance.policy") + services = stdlib_types.ModuleType("agentmesh.services") + audit = stdlib_types.ModuleType("agentmesh.services.audit") + + policy.PolicyEngine = _FakePolicyEngine + audit.AuditService = _FakeAuditService + + agentmesh.governance = governance + governance.policy = policy + agentmesh.services = services + services.audit = audit + + sys.modules["agentmesh"] = agentmesh + sys.modules["agentmesh.governance"] = governance + sys.modules["agentmesh.governance.policy"] = policy + sys.modules["agentmesh.services"] = services + sys.modules["agentmesh.services.audit"] = audit + + +def _uninstall_fake_agentmesh(): + """Remove fake agentmesh modules from sys.modules.""" + for key in list(sys.modules): + if key.startswith("agentmesh"): + del sys.modules[key] + + +# --------------------------------------------------------------------------- +# Fake ADK types for testing +# --------------------------------------------------------------------------- + + +class _FakeBaseTool: + def __init__(self, name: str): + self.name = name + + +class _FakeToolContext: + pass + + +# --------------------------------------------------------------------------- +# Tests +# --------------------------------------------------------------------------- + + +@pytest.fixture(autouse=True) +def _clean_agentmesh(): + """Ensure agentmesh mocks are cleaned between tests.""" + yield + _uninstall_fake_agentmesh() + + +@pytest.fixture() +def policy_dir(tmp_path: Path) -> Path: + """Create a temporary policy directory with a sample policy.""" + policies = tmp_path / "policies" + policies.mkdir() + (policies / "default.yaml").write_text( + """\ +apiVersion: governance.toolkit/v1 +name: test-policy +rules: + - name: block-dangerous-tools + condition: "action in ['shell_exec', 'file_delete']" + action: deny +default_action: allow +""" + ) + return policies + + +class TestAgentGovernancePlugin: + """Tests for AgentGovernancePlugin construction and behavior.""" + + def test_raises_import_error_when_agentmesh_missing(self, tmp_path: Path): + """Plugin raises ImportError by default when agentmesh is missing.""" + # Simulate agentmesh not being importable by patching _setup + with patch.object( + AgentGovernancePlugin, + "_setup", + side_effect=ImportError("agentmesh-platform is required"), + ): + with pytest.raises(ImportError, match="agentmesh-platform"): + AgentGovernancePlugin(policy_dir=tmp_path) + + def test_fail_open_mode_allows_construction(self, tmp_path: Path): + """With fail_open=True, plugin degrades when engine returns no policies.""" + plugin = AgentGovernancePlugin(policy_dir=tmp_path, fail_open=True) + assert plugin.name == "agent_governance" + + def test_construction_with_agentmesh(self, policy_dir: Path): + """Plugin loads policies when agentmesh is available.""" + _install_fake_agentmesh() + plugin = AgentGovernancePlugin(policy_dir=policy_dir) + assert plugin._engine is not None + assert plugin._audit is not None + assert len(plugin._engine._policies) == 1 + + def test_plugin_name(self, policy_dir: Path): + """Plugin registers with the correct name.""" + _install_fake_agentmesh() + plugin = AgentGovernancePlugin(policy_dir=policy_dir) + assert plugin.name == "agent_governance" + + @pytest.mark.asyncio + async def test_allows_safe_tool_call(self, policy_dir: Path): + """Plugin returns None for allowed tool calls.""" + _install_fake_agentmesh() + plugin = AgentGovernancePlugin(policy_dir=policy_dir) + + result = await plugin.before_tool_callback( + tool=_FakeBaseTool("web_search"), + tool_args={"query": "test"}, + tool_context=_FakeToolContext(), + ) + assert result is None + + @pytest.mark.asyncio + async def test_blocks_denied_tool_call(self, policy_dir: Path): + """Plugin returns error dict for denied tool calls.""" + _install_fake_agentmesh() + plugin = AgentGovernancePlugin(policy_dir=policy_dir) + + result = await plugin.before_tool_callback( + tool=_FakeBaseTool("shell_exec"), + tool_args={"cmd": "rm -rf /"}, + tool_context=_FakeToolContext(), + ) + assert result is not None + assert result["error"] == "policy_denied" + assert "shell_exec" in result["reason"] + assert result["matched_rule"] == "block-dangerous-tools" + + @pytest.mark.asyncio + async def test_blocks_delete_tool(self, policy_dir: Path): + """Plugin denies file_delete actions.""" + _install_fake_agentmesh() + plugin = AgentGovernancePlugin(policy_dir=policy_dir) + + result = await plugin.before_tool_callback( + tool=_FakeBaseTool("file_delete"), + tool_args={"path": "/etc/passwd"}, + tool_context=_FakeToolContext(), + ) + assert result is not None + assert result["error"] == "policy_denied" + + @pytest.mark.asyncio + async def test_audit_logs_decisions(self, policy_dir: Path): + """Plugin logs policy decisions to the audit service.""" + _install_fake_agentmesh() + plugin = AgentGovernancePlugin(policy_dir=policy_dir) + + await plugin.before_tool_callback( + tool=_FakeBaseTool("web_search"), + tool_args={}, + tool_context=_FakeToolContext(), + ) + await plugin.before_tool_callback( + tool=_FakeBaseTool("shell_exec"), + tool_args={}, + tool_context=_FakeToolContext(), + ) + assert len(plugin._audit.entries) == 2 + assert plugin._audit.entries[0]["action"] == "web_search" + assert plugin._audit.entries[0]["decision"] == "allow" + assert plugin._audit.entries[1]["action"] == "shell_exec" + assert plugin._audit.entries[1]["decision"] == "deny" + + @pytest.mark.asyncio + async def test_fail_open_allows_all_when_no_engine(self, tmp_path: Path): + """With fail_open=True and no engine, all calls pass through.""" + plugin = AgentGovernancePlugin(policy_dir=tmp_path, fail_open=True) + # Simulate no engine (as if agentmesh wasn't installed) + plugin._engine = None + + result = await plugin.before_tool_callback( + tool=_FakeBaseTool("shell_exec"), + tool_args={"cmd": "rm -rf /"}, + tool_context=_FakeToolContext(), + ) + assert result is None + + def test_missing_policy_dir_still_constructs(self, tmp_path: Path): + """Plugin constructs even when policy_dir doesn't exist.""" + _install_fake_agentmesh() + plugin = AgentGovernancePlugin( + policy_dir=tmp_path / "nonexistent" + ) + assert plugin._engine is not None + + def test_custom_agent_did(self, policy_dir: Path): + """Plugin uses custom agent_did in evaluations.""" + _install_fake_agentmesh() + plugin = AgentGovernancePlugin( + policy_dir=policy_dir, + agent_did="did:mesh:custom-agent", + ) + assert plugin._agent_did == "did:mesh:custom-agent" + + def test_strict_mode_raises_on_bad_policy(self, tmp_path: Path): + """With strict=True, plugin raises RuntimeError on policy load failure.""" + _install_fake_agentmesh() + + # Patch _FakePolicyEngine.load_yaml to raise on bad content + original_load = _FakePolicyEngine.load_yaml + + def failing_load(self, text): + if "INVALID" in text: + raise ValueError("invalid YAML syntax") + original_load(self, text) + + _FakePolicyEngine.load_yaml = failing_load + + policies = tmp_path / "policies" + policies.mkdir() + (policies / "bad.yaml").write_text("INVALID policy content") + + try: + with pytest.raises(RuntimeError, match="Failed to load policy"): + AgentGovernancePlugin(policy_dir=policies, strict=True) + finally: + _FakePolicyEngine.load_yaml = original_load + + def test_non_strict_skips_bad_policy(self, tmp_path: Path): + """Without strict mode, bad policies are skipped with a warning.""" + _install_fake_agentmesh() + + original_load = _FakePolicyEngine.load_yaml + + def failing_load(self, text): + if "INVALID" in text: + raise ValueError("invalid YAML syntax") + original_load(self, text) + + _FakePolicyEngine.load_yaml = failing_load + + policies = tmp_path / "policies" + policies.mkdir() + (policies / "good.yaml").write_text("valid policy") + (policies / "bad.yaml").write_text("INVALID policy content") + + try: + plugin = AgentGovernancePlugin(policy_dir=policies) + # Good policy loaded, bad one skipped + assert len(plugin._engine._policies) == 1 + finally: + _FakePolicyEngine.load_yaml = original_load + + @pytest.mark.asyncio + async def test_evaluate_runs_in_thread_pool(self, policy_dir: Path): + """Policy evaluation is offloaded to a thread pool executor.""" + import asyncio + + _install_fake_agentmesh() + plugin = AgentGovernancePlugin(policy_dir=policy_dir) + + # Track which thread evaluate() runs on + import threading + + eval_thread = None + original_evaluate = plugin._engine.evaluate + + def tracking_evaluate(**kwargs): + nonlocal eval_thread + eval_thread = threading.current_thread() + return original_evaluate(**kwargs) + + plugin._engine.evaluate = tracking_evaluate + main_thread = threading.current_thread() + + await plugin.before_tool_callback( + tool=_FakeBaseTool("web_search"), + tool_args={"query": "test"}, + tool_context=_FakeToolContext(), + ) + + assert eval_thread is not None + assert eval_thread != main_thread