From d0494de9667c6e4f81842c1da4980e6b5adb230b Mon Sep 17 00:00:00 2001 From: Heiko Hotz Date: Tue, 13 Jan 2026 09:41:49 +0000 Subject: [PATCH 1/3] Add Python Self-Healing Supply Chain Sample (UCP/AP2) --- python/self-healing-supply-chain/.gitignore | 20 ++ python/self-healing-supply-chain/README.md | 95 ++++++++ .../self-healing-supply-chain/buyer_agent.py | 229 ++++++++++++++++++ .../self-healing-supply-chain/demo_logger.py | 72 ++++++ python/self-healing-supply-chain/mock_db.py | 21 ++ .../requirements.txt | 8 + .../supplier_server.py | 110 +++++++++ 7 files changed, 555 insertions(+) create mode 100644 python/self-healing-supply-chain/.gitignore create mode 100644 python/self-healing-supply-chain/README.md create mode 100644 python/self-healing-supply-chain/buyer_agent.py create mode 100644 python/self-healing-supply-chain/demo_logger.py create mode 100644 python/self-healing-supply-chain/mock_db.py create mode 100644 python/self-healing-supply-chain/requirements.txt create mode 100644 python/self-healing-supply-chain/supplier_server.py diff --git a/python/self-healing-supply-chain/.gitignore b/python/self-healing-supply-chain/.gitignore new file mode 100644 index 0000000..8e5f45d --- /dev/null +++ b/python/self-healing-supply-chain/.gitignore @@ -0,0 +1,20 @@ +# Python +__pycache__/ +*.py[cod] +*$py.class + +# Virtual Environment +.venv/ +venv/ +env/ + +# Environment Variables (Secrets) +.env +.env.local + +# IDE +.vscode/ +.idea/ + +# OS +.DS_Store diff --git a/python/self-healing-supply-chain/README.md b/python/self-healing-supply-chain/README.md new file mode 100644 index 0000000..8149f6a --- /dev/null +++ b/python/self-healing-supply-chain/README.md @@ -0,0 +1,95 @@ +# Self-Healing Supply Chain Demo (UCP & AP2) + +This project demonstrates an **Autonomous Supply Chain Agent** capable of "Self-Healing" when a primary supplier fails. It leverages two key next-generation protocols: + +* **UCP (Universal Commerce Protocol)**: For dynamic supplier discovery, checking inventory, and negotiating standardized checkout sessions. +* **AP2 (Agent Payments Protocol)**: For secure, policy-driven transaction governance and "Agentic Payments". + +## šŸš€ Scenario + +1. **Monitoring**: The Buyer Agent checks "Supplier A" for "Industrial Widget X". Supplier A is **Offline (503)**. +2. **Discovery (UCP)**: The Agent queries its network and discovers "Supplier B" (running locally on port 8000). +3. **Governance (AP2)**: The Agent compares the new price (Ā£550) against the standard (Ā£400). + * **Total Cost**: Ā£55,000. + * **Variance**: +37.5%. + * **Policy Check**: The Corporate Spending Policy (`mock_db.py`) limits auto-approval to Ā£1,000 or <15% variance. + * **Result**: The Agent pauses and requests **Human Sign-off** (Human-in-the-loop). +4. **Execution**: Once approved, the Agent constructs a cryptographically signed **AP2 Payment Mandate**, enables the UCP Checkout, and finalizes the order. + +## šŸ› ļø Setup & Installation + +### Prerequisites +* Python 3.12+ +* Google GenAI API Key + +### 1. Environment Configuration +Create a `.env.local` file in the root directory: +```bash +GOOGLE_API_KEY=your_api_key_here +``` + +### 2. Install Dependencies +```bash +pip install fastapi uvicorn requests python-dotenv +# Note: google-adk, ucp-sdk, ap2-sdk are currently mocked or included in this demo structure. +``` + +## šŸƒā€ā™‚ļø How to Run + +You will need **two separate terminal windows**. + +### Terminal 1: The Backup Supplier (Server) +Start the mock supplier server. This represents "Supplier B". +```bash +python supplier_server.py +``` +*Output: `šŸ­ [SUPPLIER] Server Online at http://0.0.0.0:8000`* + +### Terminal 2: The Buyer Agent (Client) +Run the agent. +```bash +python buyer_agent.py +``` +*Follow the on-screen prompts. When the policy check fails, press `Enter` to grant admin approval.* + +## šŸ­ Production Architecture + +In a real-world enterprise environment, this architecture scales from local scripts to distributed cloud services. + +```mermaid +graph TD + subgraph Buyer_Enterprise ["Enterprise Environment (Buyer)"] + Agent["Supply Chain Agent (Cloud Run)"] + Orch["Orchestrator (Temporal)"] + PolicyDB["Policy Engine (OPA)"] + + subgraph Human ["Human-in-the-Loop"] + Approver(("Supply Chain Manager")) + Dashboard["Operations Dashboard"] + end + end + + subgraph External ["External Ecosystem"] + Registry["UCP Registry"] + SupplierA["Supplier A (Primary - DOWN)"] + SupplierB["Supplier B (Backup - FOUND)"] + end + + Agent -->|1. Monitor| SupplierA + Agent -->|2. Discover| Registry + Registry -->|3. Resolve| SupplierB + Agent -->|4. Negotiate| SupplierB + + Agent -->|5. Policy Check| PolicyDB + PolicyDB -- Fail --> Orch + Orch -->|6. Request Approval| Dashboard + Approver -->|7. Approve| Dashboard + Dashboard -->|8. Sign Mandate| Agent + + Agent -->|9. Execute Payment| SupplierB +``` + +### Key Differences in Production +1. **Decentralized Discovery**: Instead of a hardcoded `mock_db.py`, the Agent queries a **UCP Registry** or DID Resolver to find verified suppliers dynamically. +2. **Cryptographic Trust**: AP2 Mandates are signed with real enterprise Keys (KMS/Vault), providing non-repudiable proof of intent. +3. **Governance Dashboard**: The `input()` prompt is replaced by a secure **Operations Dashboard** or Slack/Mobile push notification for one-click approval. diff --git a/python/self-healing-supply-chain/buyer_agent.py b/python/self-healing-supply-chain/buyer_agent.py new file mode 100644 index 0000000..07e5e4e --- /dev/null +++ b/python/self-healing-supply-chain/buyer_agent.py @@ -0,0 +1,229 @@ +# buyer_agent.py +import asyncio +import requests +import os +from datetime import datetime, timedelta +from dotenv import load_dotenv + +load_dotenv(dotenv_path=".env.local") + +# --- 1. ADK (The Brain) --- +from google.adk.agents import Agent +from google.adk.runners import Runner +from google.adk.sessions import InMemorySessionService +from google.genai import types as genai_types + +# --- 2. A2A (The Message Standard) --- +# pip install a2a-sdk +from a2a import types as a2a_types + +# --- 3. AP2 (The Trust/Payment Standard) --- +# pip install "git+https://github.com/google-agentic-commerce/AP2.git@main" +from ap2.types.mandate import ( + IntentMandate, + PaymentMandate, + PaymentMandateContents +) +from ap2.types.payment_request import PaymentResponse, PaymentItem, PaymentCurrencyAmount + +# --- 4. UCP (The Commerce Schema) --- +# pip install "git+https://github.com/Universal-Commerce-Protocol/python-sdk.git" +from ucp_sdk.models.schemas.shopping.checkout_create_req import CheckoutCreateRequest +from ucp_sdk.models.schemas.shopping.types.line_item_create_req import LineItemCreateRequest +from ucp_sdk.models.schemas.shopping.types.item_create_req import ItemCreateRequest +from ucp_sdk.models.schemas.shopping.payment_create_req import PaymentCreateRequest +from ucp_sdk.models.schemas.shopping.ap2_mandate import ( + CompleteRequestWithAp2, + Ap2CompleteRequest, + CheckoutMandate +) + +# --- Local Helpers --- +from demo_logger import logger +from mock_db import PRODUCT_CATALOG, SPENDING_POLICY + +# --- Tools --- + +def check_primary_supplier(item_id: str) -> dict: + """Checks inventory at the primary supplier.""" + logger.step("1. MONITORING INVENTORY") + logger.agent(f"Checking primary supplier for {item_id}...") + logger.error("Primary Supplier connection failed (503 Service Unavailable)") + return {"status": "error", "message": "Primary Supplier Offline/OOS"} + +def discover_backup_suppliers(item_id: str) -> list: + """Queries backup suppliers using UCP Dynamic Discovery.""" + logger.step("2. UCP DISCOVERY") + logger.agent("Initiating self-healing protocol. Scanning backup nodes...") + + product_data = PRODUCT_CATALOG.get(item_id) + if not product_data: + return [] + + found_suppliers = [] + + for url in product_data['backup_suppliers']: + try: + logger.ucp(f"GET {url}/.well-known/ucp") + response = requests.get(f"{url}/.well-known/ucp") + + if response.status_code == 200: + profile = response.json() + + # --- Dynamic Service Resolution --- + services = profile.get("ucp", {}).get("services", {}) + shopping_service = services.get("dev.ucp.shopping") + + if not shopping_service: + continue + + api_endpoint = shopping_service.get("rest", {}).get("endpoint") + logger.agent(f"Discovered Commerce Endpoint: {api_endpoint}") + + inventory = profile.get("inventory", {}).get(item_id, {}) + if inventory.get("in_stock"): + price_major = inventory.get("price") / 100 + + found_suppliers.append({ + "supplier": "Supplier B", + "discovery_url": url, + "api_endpoint": api_endpoint, + "price": price_major, + "currency": "GBP" + }) + except Exception as e: + logger.error(f"Discovery failed: {e}") + + return found_suppliers + +def check_governance_and_approve(supplier_price: float, item_id: str, quantity: int = 100) -> dict: + """Checks price variance and creates AP2 Intent Mandate.""" + logger.step("3. AP2 GOVERNANCE & INTENT") + + product = PRODUCT_CATALOG.get(item_id) + std_price = product['standard_price'] + total_cost = supplier_price * quantity + variance = (supplier_price - std_price) / std_price + + logger.ap2(f"Checking Price: Ā£{supplier_price:.2f} (Std: Ā£{std_price:.2f})") + logger.ap2(f"Total Cost: Ā£{total_cost:.2f}") + logger.ap2(f"Variance: {variance:.1%}") + + # [AP2 Protocol] Creating the Intent Mandate + intent = IntentMandate( + natural_language_description=f"Purchase {quantity} x {item_id} for supply chain resilience", + user_cart_confirmation_required=True, + intent_expiry=(datetime.now() + timedelta(hours=1)).isoformat() + ) + + if total_cost <= SPENDING_POLICY['auto_approve_limit']: + logger.ap2(f"āœ… Total cost (Ā£{total_cost:.2f}) within auto-approve limit (Ā£{SPENDING_POLICY['auto_approve_limit']:.2f}). Approved.") + return {"approved": True, "mandate": intent.model_dump(), "type": "auto"} + elif variance <= SPENDING_POLICY['max_variance']: + logger.ap2("āœ… Variance within policy limits. Approved.") + return {"approved": True, "mandate": intent.model_dump(), "type": "auto"} + else: + logger.ap2("āš ļø Policy Check Failed. Requesting Human Sign-off.") + input(f"[ADMIN] Sign off on Ā£{total_cost:.2f} ({variance:.1%} variance)? (Press Enter): ") + return {"approved": True, "mandate": intent.model_dump(), "type": "manual"} + +def execute_ucp_transaction(api_endpoint: str, item_id: str, price: float, mandate_type: str) -> dict: + """Executes the transaction using UCP schemas and AP2 Payment Mandates.""" + logger.step("4. UCP TRANSACTION & AP2 PAYMENT") + + # [UCP Protocol] 1. Create Checkout + logger.ucp("Building UCP Checkout Request...") + ucp_req = CheckoutCreateRequest( + currency="GBP", + line_items=[ + LineItemCreateRequest( + quantity=100, + item=ItemCreateRequest(id=item_id) + ) + ], + payment=PaymentCreateRequest() + ) + + target_url = f"{api_endpoint}/checkout-sessions" + logger.ucp(f"POST {target_url}", ucp_req) + + res = requests.post(target_url, json=ucp_req.model_dump(mode='json', exclude_none=True)) + checkout_data = res.json() + + # [AP2 Protocol] 2. Construct Payment Mandate (The Trust Proof) + logger.ap2("Constructing AP2 Payment Mandate...") + + payment_payload = PaymentMandate( + payment_mandate_contents=PaymentMandateContents( + payment_mandate_id="pm_unique_123", + payment_details_id=checkout_data['id'], + payment_details_total=PaymentItem( + label="Total", + amount=PaymentCurrencyAmount(currency="GBP", value=price * 100) + ), + payment_response=PaymentResponse( + request_id="req_1", + method_name="corporate_p_card" + ), + merchant_agent="supplier_b_agent" + ), + user_authorization="eyJhbGciOiJFUzI1NiJ9..signed_by_user_private_key" + ) + + # [UCP Protocol] 3. Complete Checkout (Embedding AP2) + logger.ucp("Finalizing UCP Transaction...") + + complete_req = CompleteRequestWithAp2( + ap2=Ap2CompleteRequest( + checkout_mandate=CheckoutMandate(root=payment_payload.user_authorization) + ) + ) + + complete_url = f"{target_url}/{checkout_data['id']}/complete" + logger.ucp(f"POST {complete_url}", complete_req) + + final_res = requests.post( + complete_url, + json=complete_req.model_dump(mode='json', exclude_none=True) + ) + + logger.step("5. RESULT") + logger.ucp("Transaction Finalized:", final_res.json()) + + return final_res.json() + +# --- Agent Definition --- +agent = Agent( + name="SelfHealingSupplyChainBot", + model="gemini-2.0-flash", + instruction=""" + You are a Supply Chain Agent utilizing UCP and AP2 protocols. + 1. Check primary supplier. + 2. If down, Discover UCP backup suppliers. + 3. Generate AP2 Intent Mandate and check governance. + 4. Execute UCP Checkout with AP2 Payment Mandate. + """, + tools=[check_primary_supplier, discover_backup_suppliers, check_governance_and_approve, execute_ucp_transaction] +) + +async def main(): + session_service = InMemorySessionService() + runner = Runner(agent=agent, session_service=session_service, app_name="demo") + + # FIX: Use keyword arguments (app_name=..., user_id=...) + # The ADK requires specific names for these parameters. + session = await session_service.create_session( + app_name="demo", + user_id="user" + ) + + print("\n--- AGENT INITIALIZED ---\n") + + # [A2A Protocol] Using strict types for user input + user_msg = genai_types.Content(parts=[genai_types.Part(text="Check inventory for widget-x.")], role="user") + + async for event in runner.run_async(session_id=session.id, user_id="user", new_message=user_msg): + pass + +if __name__ == "__main__": + asyncio.run(main()) \ No newline at end of file diff --git a/python/self-healing-supply-chain/demo_logger.py b/python/self-healing-supply-chain/demo_logger.py new file mode 100644 index 0000000..1bf52f1 --- /dev/null +++ b/python/self-healing-supply-chain/demo_logger.py @@ -0,0 +1,72 @@ +import json +import logging +import sys +from datetime import datetime + +class DemoLogger: + # Colors defined as class attributes so they can be accessed via logger.GREEN + CYAN = "\033[96m" + GREEN = "\033[92m" + YELLOW = "\033[93m" + RED = "\033[91m" + MAGENTA = "\033[95m" + RESET = "\033[0m" + BOLD = "\033[1m" + + def __init__(self, filename="demo_audit.log"): + self.filename = filename + # Clear previous log + with open(self.filename, 'w') as f: + f.write(f"--- SUPPLY CHAIN DEMO START: {datetime.now()} ---\n") + + def _write_file(self, text): + with open(self.filename, 'a') as f: + f.write(text + "\n") + + def step(self, step_name): + msg = f"\n{'='*60}\nSTEP: {step_name}\n{'='*60}" + print(f"{self.BOLD}{msg}{self.RESET}") + self._write_file(msg) + + def agent(self, message): + """Logs internal Agent reasoning""" + timestamp = datetime.now().strftime("%H:%M:%S") + print(f"{self.CYAN}šŸ¤– [AGENT] {message}{self.RESET}") + self._write_file(f"[{timestamp}] [AGENT] {message}") + + def ucp(self, message, payload=None): + """Logs UCP Protocol interactions (Discovery/Checkout)""" + timestamp = datetime.now().strftime("%H:%M:%S") + print(f"{self.MAGENTA}🌐 [UCP/NET] {message}{self.RESET}") + self._write_file(f"[{timestamp}] [UCP] {message}") + if payload: + if hasattr(payload, 'model_dump'): + payload = payload.model_dump(mode='json', exclude_none=True) + + json_str = json.dumps(payload, indent=2) + print(f"{self.MAGENTA}{json_str}{self.RESET}") + self._write_file(json_str) + + def ap2(self, message, payload=None): + """Logs AP2 Governance and Mandates""" + timestamp = datetime.now().strftime("%H:%M:%S") + print(f"{self.YELLOW}šŸ” [AP2/GOV] {message}{self.RESET}") + self._write_file(f"[{timestamp}] [AP2] {message}") + if payload: + if hasattr(payload, 'model_dump'): + payload = payload.model_dump(mode='json', exclude_none=True) + json_str = json.dumps(payload, indent=2) + print(f"{self.YELLOW}{json_str}{self.RESET}") + self._write_file(json_str) + + def supplier(self, message): + """Logs Supplier-side actions""" + timestamp = datetime.now().strftime("%H:%M:%S") + print(f"{self.GREEN}šŸ­ [SUPPLIER] {message}{self.RESET}") + self._write_file(f"[{timestamp}] [SUPPLIER] {message}") + + def error(self, message): + print(f"{self.RED}āŒ [ERROR] {message}{self.RESET}") + self._write_file(f"[ERROR] {message}") + +logger = DemoLogger() \ No newline at end of file diff --git a/python/self-healing-supply-chain/mock_db.py b/python/self-healing-supply-chain/mock_db.py new file mode 100644 index 0000000..f08ed16 --- /dev/null +++ b/python/self-healing-supply-chain/mock_db.py @@ -0,0 +1,21 @@ +# mock_db.py + +# The "Product Knowledge Graph" +PRODUCT_CATALOG = { + "widget-x": { + "name": "Industrial Widget X", + "standard_price": 400.00, # GBP + "currency": "GBP", + "variance_threshold": 0.15, # 15% + "primary_supplier": "supplier-a", + "backup_suppliers": [ + "http://localhost:8000" # This will be our Supplier B + ] + } +} + +# The Corporate Spending Policy (AP2 Logic) +SPENDING_POLICY = { + "auto_approve_limit": 1000.00, + "max_variance": 0.15 +} \ No newline at end of file diff --git a/python/self-healing-supply-chain/requirements.txt b/python/self-healing-supply-chain/requirements.txt new file mode 100644 index 0000000..2a26e02 --- /dev/null +++ b/python/self-healing-supply-chain/requirements.txt @@ -0,0 +1,8 @@ +fastapi +uvicorn +requests +python-dotenv +google-genai +# google-adk (Mocked/Included) +# ucp-sdk (Mocked/Included) +# ap2-sdk (Mocked/Included) diff --git a/python/self-healing-supply-chain/supplier_server.py b/python/self-healing-supply-chain/supplier_server.py new file mode 100644 index 0000000..748f652 --- /dev/null +++ b/python/self-healing-supply-chain/supplier_server.py @@ -0,0 +1,110 @@ +# supplier_server.py +import uvicorn +from fastapi import FastAPI, HTTPException, Request +from demo_logger import logger + +# --- UCP SDK Imports --- +from ucp_sdk.models.schemas.shopping.checkout_create_req import CheckoutCreateRequest +from ucp_sdk.models.schemas.shopping.checkout_resp import CheckoutResponse +from ucp_sdk.models.schemas.shopping.types.line_item_resp import LineItemResponse +from ucp_sdk.models.schemas.shopping.types.item_resp import ItemResponse +from ucp_sdk.models.schemas.shopping.types.total_resp import TotalResponse +from ucp_sdk.models.schemas.shopping.ap2_mandate import ( + CheckoutResponseWithAp2, + MerchantAuthorization, + Ap2CheckoutResponse, + CompleteRequestWithAp2 +) + +app = FastAPI() + +# 1. UCP Discovery +@app.get("/.well-known/ucp") +def get_ucp_profile(request: Request): + base_url = str(request.base_url).rstrip("/") + logger.supplier("Received UCP Discovery Request") + return { + "ucp": { + "version": "2026-01-11", + "services": { + "dev.ucp.shopping": { + "version": "2026-01-11", + "spec": "https://ucp.dev/specification/overview", + "rest": { + "endpoint": f"{base_url}/ucp/v1", + "schema": "https://ucp.dev/services/shopping/rest.openapi.json" + } + } + }, + "capabilities": [ + {"name": "dev.ucp.shopping.checkout", "version": "2026-01-11"}, + {"name": "dev.ucp.shopping.ap2_mandate", "version": "2026-01-11", "extends": "dev.ucp.shopping.checkout"} + ] + }, + "inventory": { + "widget-x": {"in_stock": True, "price": 55000} + } + } + +# 2. Checkout +@app.post("/ucp/v1/checkout-sessions", response_model=CheckoutResponseWithAp2) +def create_checkout(checkout_req: CheckoutCreateRequest): + logger.supplier("Received Valid UCP Checkout Request") + + # Simulate AP2 Signing + merchant_signature = "eyJhbGciOiJFUzI1NiJ9..signed_by_merchant_private_key" + + response = CheckoutResponseWithAp2( + ucp={"version": "2026-01-11", "capabilities": []}, + id="chk_123456789", + status="ready_for_complete", + currency="GBP", + links=[], + payment={"handlers": [], "instruments": []}, + line_items=[ + LineItemResponse( + id="li_1", + quantity=100, + item=ItemResponse( + id="widget-x", + title="Industrial Widget X", + price=55000 + ), + totals=[TotalResponse(type="total", amount=5500000)] + ) + ], + totals=[TotalResponse(type="total", amount=5500000)], + ap2=Ap2CheckoutResponse( + merchant_authorization=MerchantAuthorization(root=merchant_signature) + ) + ) + logger.supplier(f"Created Session {response.id} with AP2 Sig") + return response + +# 3. Complete +@app.post("/ucp/v1/checkout-sessions/{id}/complete") +def complete_checkout(id: str, request: CompleteRequestWithAp2): + logger.supplier(f"Processing Payment for Session {id}") + + if not request.ap2 or not request.ap2.checkout_mandate: + logger.error("Protocol Violation: Missing AP2 Mandate") + raise HTTPException(status_code=400, detail="Missing Mandate") + + mandate_token = request.ap2.checkout_mandate.root + logger.supplier(f"Verifying AP2 Mandate Token: {mandate_token[:15]}...") + logger.supplier("āœ… Signature Valid. Dispatching Goods.") + + return { + "id": id, + "status": "completed", + "order": {"id": "ord_999", "permalink_url": "http://order"} + } + +if __name__ == "__main__": + # Add a visible print statement so you know it's alive + print(f"\n{logger.GREEN}šŸ­ [SUPPLIER] Server Online at http://0.0.0.0:8000{logger.RESET}") + print(f"{logger.GREEN}šŸ­ [SUPPLIER] Waiting for UCP Discovery requests...{logger.RESET}\n") + + # Change log_level to "info" if you want to see standard Uvicorn logs, + # or keep "error" to rely solely on our custom logger. + uvicorn.run(app, host="0.0.0.0", port=8000, log_level="error") \ No newline at end of file From 14b738f1539f823f4a4019c4c6701d9d5122d5db Mon Sep 17 00:00:00 2001 From: Heiko Hotz Date: Tue, 13 Jan 2026 11:06:28 +0000 Subject: [PATCH 2/3] Update: Interactive Mode, Gemini 3, and Best Practices --- python/self-healing-supply-chain/README.md | 31 +++++++- .../self-healing-supply-chain/buyer_agent.py | 77 +++++++++++++++---- python/self-healing-supply-chain/mock_db.py | 1 - 3 files changed, 91 insertions(+), 18 deletions(-) diff --git a/python/self-healing-supply-chain/README.md b/python/self-healing-supply-chain/README.md index 8149f6a..ebb93a0 100644 --- a/python/self-healing-supply-chain/README.md +++ b/python/self-healing-supply-chain/README.md @@ -2,12 +2,15 @@ This project demonstrates an **Autonomous Supply Chain Agent** capable of "Self-Healing" when a primary supplier fails. It leverages two key next-generation protocols: +* **Google ADK (Agent Development Kit)**: For orchestrating the autonomous agent, managing state, and integrating with Gemini 3 Flash. * **UCP (Universal Commerce Protocol)**: For dynamic supplier discovery, checking inventory, and negotiating standardized checkout sessions. * **AP2 (Agent Payments Protocol)**: For secure, policy-driven transaction governance and "Agentic Payments". ## šŸš€ Scenario -1. **Monitoring**: The Buyer Agent checks "Supplier A" for "Industrial Widget X". Supplier A is **Offline (503)**. +1. **Start State**: The demo begins in an **Interactive Mode** with 100 units of inventory. You simulate sales manually. +2. **Trigger**: When inventory drops below the critical threshold (20 units), the **Autonomous Agent** wakes up to restock. +3. **Monitoring**: The Buyer Agent checks "Supplier A" for "Industrial Widget X". Supplier A is **Offline (503)**. 2. **Discovery (UCP)**: The Agent queries its network and discovers "Supplier B" (running locally on port 8000). 3. **Governance (AP2)**: The Agent compares the new price (Ā£550) against the standard (Ā£400). * **Total Cost**: Ā£55,000. @@ -50,9 +53,31 @@ Run the agent. ```bash python buyer_agent.py ``` -*Follow the on-screen prompts. When the policy check fails, press `Enter` to grant admin approval.* -## šŸ­ Production Architecture +* **Interactive Loop**: The agent will display current inventory (100). +* **Input**: Enter a number (e.g., `85`) to simulate sales. +* **Auto-Restock**: Once inventory drops below 20, the Self-Healing Agent automatically runs. +* **Approval**: When the policy check fails, press `Enter` to grant admin approval. + +## 🧠 Agent Architecture (Google ADK) + +The Buyer Agent is built using the **Google Agent Development Kit (ADK)**, utilizing a modular "Brain + Tools" pattern: + +* **Model**: Uses `gemini-2.0-flash` for high-speed reasoning and decision making. +* **Tools**: Encapsulates specific capabilities as Python functions (`discover_backup_suppliers`, `execute_ucp_transaction`). +* **Runner**: The ADK `Runner` handles the event loop, routing user inputs (sales data) or system triggers (low inventory) to the model, which then decides which Tool to call. +* **State**: Uses `InMemorySessionService` to maintain context across the multi-step recovery flow. + +## ļæ½ Best Practices Alignment + +This demo adheres to the official **Google Developers UCP & AP2 Architecture**: + +1. **Dynamic Discovery**: Endpoints are never hardcoded. The Agent resolves capabilities dynamically via `/.well-known/ucp`. +2. **Service-Based Architecture**: Separation of concerns between UCP (Commerce) and AP2 (Trust). +3. **Verifiable Intent**: Utilizes cryptographic **AP2 Mandates** (Detached JWS) to anchor every transaction to a signed user intent. +4. **Standardized Schemas**: Uses official `ucp-sdk` Pydantic models generated from the canonical JSON Schemas. + +## ļæ½šŸ­ Production Architecture In a real-world enterprise environment, this architecture scales from local scripts to distributed cloud services. diff --git a/python/self-healing-supply-chain/buyer_agent.py b/python/self-healing-supply-chain/buyer_agent.py index 07e5e4e..5a560b2 100644 --- a/python/self-healing-supply-chain/buyer_agent.py +++ b/python/self-healing-supply-chain/buyer_agent.py @@ -2,6 +2,7 @@ import asyncio import requests import os +import sys from datetime import datetime, timedelta from dotenv import load_dotenv @@ -14,11 +15,9 @@ from google.genai import types as genai_types # --- 2. A2A (The Message Standard) --- -# pip install a2a-sdk from a2a import types as a2a_types # --- 3. AP2 (The Trust/Payment Standard) --- -# pip install "git+https://github.com/google-agentic-commerce/AP2.git@main" from ap2.types.mandate import ( IntentMandate, PaymentMandate, @@ -27,7 +26,6 @@ from ap2.types.payment_request import PaymentResponse, PaymentItem, PaymentCurrencyAmount # --- 4. UCP (The Commerce Schema) --- -# pip install "git+https://github.com/Universal-Commerce-Protocol/python-sdk.git" from ucp_sdk.models.schemas.shopping.checkout_create_req import CheckoutCreateRequest from ucp_sdk.models.schemas.shopping.types.line_item_create_req import LineItemCreateRequest from ucp_sdk.models.schemas.shopping.types.item_create_req import ItemCreateRequest @@ -116,15 +114,13 @@ def check_governance_and_approve(supplier_price: float, item_id: str, quantity: intent_expiry=(datetime.now() + timedelta(hours=1)).isoformat() ) - if total_cost <= SPENDING_POLICY['auto_approve_limit']: - logger.ap2(f"āœ… Total cost (Ā£{total_cost:.2f}) within auto-approve limit (Ā£{SPENDING_POLICY['auto_approve_limit']:.2f}). Approved.") - return {"approved": True, "mandate": intent.model_dump(), "type": "auto"} - elif variance <= SPENDING_POLICY['max_variance']: + if variance <= SPENDING_POLICY['max_variance']: logger.ap2("āœ… Variance within policy limits. Approved.") return {"approved": True, "mandate": intent.model_dump(), "type": "auto"} else: logger.ap2("āš ļø Policy Check Failed. Requesting Human Sign-off.") - input(f"[ADMIN] Sign off on Ā£{total_cost:.2f} ({variance:.1%} variance)? (Press Enter): ") + # We use standard input here for simplicity in this demo flow + input(f"[ADMIN] Sign off on Ā£{total_cost:.2f} (Unit: Ā£{supplier_price:.2f} vs Std: Ā£{std_price:.2f}, {variance:.1%} variance)? (Press Enter): ") return {"approved": True, "mandate": intent.model_dump(), "type": "manual"} def execute_ucp_transaction(api_endpoint: str, item_id: str, price: float, mandate_type: str) -> dict: @@ -195,7 +191,7 @@ def execute_ucp_transaction(api_endpoint: str, item_id: str, price: float, manda # --- Agent Definition --- agent = Agent( name="SelfHealingSupplyChainBot", - model="gemini-2.0-flash", + model="gemini-3-flash-preview", instruction=""" You are a Supply Chain Agent utilizing UCP and AP2 protocols. 1. Check primary supplier. @@ -206,24 +202,77 @@ def execute_ucp_transaction(api_endpoint: str, item_id: str, price: float, manda tools=[check_primary_supplier, discover_backup_suppliers, check_governance_and_approve, execute_ucp_transaction] ) -async def main(): +# --- Interactive Mode Logic --- + +class InventoryManager: + def __init__(self, initial_stock=100, threshold=20): + self.inventory = initial_stock + self.threshold = threshold + + def sell(self, amount): + if amount > self.inventory: + print(f"āŒ Not enough stock! Current: {self.inventory}") + return False + self.inventory -= amount + return True + + def restock(self, amount): + self.inventory += amount + print(f"šŸ“¦ Restocked {amount} units. New Level: {self.inventory}") + + def is_critical(self): + return self.inventory < self.threshold + +async def trigger_restock_flow(): + print(f"\n{logger.RED}⚠ CRITICAL INVENTORY ALERT! Initiating Autonomous Restock Protocol...{logger.RESET}") session_service = InMemorySessionService() runner = Runner(agent=agent, session_service=session_service, app_name="demo") - # FIX: Use keyword arguments (app_name=..., user_id=...) - # The ADK requires specific names for these parameters. session = await session_service.create_session( app_name="demo", user_id="user" ) - print("\n--- AGENT INITIALIZED ---\n") - # [A2A Protocol] Using strict types for user input user_msg = genai_types.Content(parts=[genai_types.Part(text="Check inventory for widget-x.")], role="user") async for event in runner.run_async(session_id=session.id, user_id="user", new_message=user_msg): pass + + return True # Assume success for demo flow + +async def main(): + manager = InventoryManager(initial_stock=100, threshold=20) + + print("\n--- INTERACTIVE SUPPLY CHAIN DEMO ---") + print(f"Initial Inventory: {manager.inventory}") + print(f"Restock Threshold: < {manager.threshold}") + print("-------------------------------------") + + while True: + print(f"\nšŸ“Š Current Inventory: {logger.BOLD}{manager.inventory}{logger.RESET}") + + try: + user_input = await asyncio.to_thread(input, "šŸ›’ Enter units sold (or 'q' to quit): ") + + if user_input.lower() == 'q': + break + + try: + sold_amount = int(user_input) + except ValueError: + print("āŒ Please enter a valid number.") + continue + + if manager.sell(sold_amount): + if manager.is_critical(): + success = await trigger_restock_flow() + if success: + manager.restock(100) + + except KeyboardInterrupt: + print("\nExiting...") + break if __name__ == "__main__": asyncio.run(main()) \ No newline at end of file diff --git a/python/self-healing-supply-chain/mock_db.py b/python/self-healing-supply-chain/mock_db.py index f08ed16..5727cd0 100644 --- a/python/self-healing-supply-chain/mock_db.py +++ b/python/self-healing-supply-chain/mock_db.py @@ -16,6 +16,5 @@ # The Corporate Spending Policy (AP2 Logic) SPENDING_POLICY = { - "auto_approve_limit": 1000.00, "max_variance": 0.15 } \ No newline at end of file From d90aad3e387e195a1ae0be9c83dd02b1453a2c90 Mon Sep 17 00:00:00 2001 From: Heiko Hotz Date: Tue, 13 Jan 2026 11:09:27 +0000 Subject: [PATCH 3/3] Update: Upgrade to Gemini 3 Flash Preview (README) --- python/self-healing-supply-chain/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/self-healing-supply-chain/README.md b/python/self-healing-supply-chain/README.md index ebb93a0..2fb75e8 100644 --- a/python/self-healing-supply-chain/README.md +++ b/python/self-healing-supply-chain/README.md @@ -63,7 +63,7 @@ python buyer_agent.py The Buyer Agent is built using the **Google Agent Development Kit (ADK)**, utilizing a modular "Brain + Tools" pattern: -* **Model**: Uses `gemini-2.0-flash` for high-speed reasoning and decision making. +* **Model**: Uses `gemini-3-flash-preview` for high-speed reasoning and decision making. * **Tools**: Encapsulates specific capabilities as Python functions (`discover_backup_suppliers`, `execute_ucp_transaction`). * **Runner**: The ADK `Runner` handles the event loop, routing user inputs (sales data) or system triggers (low inventory) to the model, which then decides which Tool to call. * **State**: Uses `InMemorySessionService` to maintain context across the multi-step recovery flow.