from contextlib import asynccontextmanager
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from openexec.models import ExecutionRequest
from openexec.receipts import verify_receipt
from openexec.engine import execute
from openexec.approval_validator import ApprovalError
from openexec.db import init_db
import os
import datetime
# Service version string, reported by the "/" and "/version" endpoints.
VERSION = "0.1.10"


@asynccontextmanager
async def lifespan(application):
    """Application lifespan hook: initialise the database before serving.

    ``init_db()`` is called once on startup. Nothing follows the ``yield``,
    so no teardown work is performed on shutdown. The ``application``
    argument is accepted for the lifespan protocol but is not used.
    """
    # Startup phase: run database initialisation before requests are handled.
    init_db()
    # Hand control back to the framework for the lifetime of the process.
    yield


# ASGI application object; every route below is registered on it.
app = FastAPI(lifespan=lifespan)
@app.get("/")
async def root():
    """Return a static service banner with the service name and version."""
    banner = {"service": "OpenExec", "status": "running", "version": VERSION}
    return banner
@app.get("/health")
async def health():
    """Report the execution mode and allow-list configuration.

    Everything in the response is derived from environment variables read in
    this handler (``OPENEXEC_MODE``, ``OPENEXEC_ALLOWED_ACTIONS``); the
    execution engine itself is not queried, so this is the *configured*
    posture as seen from this module.

    Returns:
        A dict with ``status``, ``exec_mode``, ``signature_verification`` and
        ``restriction``, plus either ``allow_list`` (when an allow-list is
        set) or ``warning`` (when it is not).
    """
    # NOTE(review): no authentication dependency is visible on this route, and
    # the response states whether signature verification is on and which
    # actions are allowed. Confirm the endpoint is only reachable by operators
    # or monitoring, or trim the detail returned to anonymous callers.

    # An unset OPENEXEC_MODE falls back to "demo", which is reported below as
    # signature verification "disabled".
    mode = os.getenv("OPENEXEC_MODE", "demo")
    raw_actions = os.getenv("OPENEXEC_ALLOWED_ACTIONS", "")
    # Any non-empty value counts as "restricted", even one that yields no
    # entries after splitting (e.g. only commas or whitespace).
    has_allow_list = bool(raw_actions)

    report = {
        "status": "healthy",
        "exec_mode": mode,
        # "enabled" is reported only for the exact mode name "clawshield".
        "signature_verification": "enabled" if mode == "clawshield" else "disabled",
        "restriction": "restricted" if has_allow_list else "open",
    }

    if has_allow_list:
        # Comma-separated list; surrounding whitespace and empty items dropped.
        report["allow_list"] = [
            entry.strip() for entry in raw_actions.split(",") if entry.strip()
        ]
    else:
        report["warning"] = "No execution allow-list configured"
    return report
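@app.get("/version")
def version():
    """Return the service version and the current UTC time.

    Returns:
        A dict with ``version`` (the module-level ``VERSION``) and
        ``timestamp``, an ISO-8601 string of the current UTC time without a
        UTC offset suffix.
    """
    # datetime.utcnow() is deprecated as of Python 3.12. Take an aware UTC
    # "now" and drop tzinfo so the serialized string keeps the exact format
    # existing clients already receive (no "+00:00" suffix).
    now_utc = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
    return {
        "version": VERSION,
        "timestamp": now_utc.isoformat(),
    }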
@app.get("/ready")
def ready():
    """Readiness probe that always answers ``{"ready": True}``.

    The value is a constant: no dependency (database, engine, configuration)
    is checked here, so a positive answer only shows that the process is
    serving requests.
    """
    readiness = {"ready": True}
    return readiness
@app.post("/execute")
def execute_action(request: ExecutionRequest):
    """Run an execution request through the engine and return its result.

    Args:
        request: The parsed request body, passed unchanged to ``execute``.

    Returns:
        ``model_dump()`` of the object returned by ``execute``.

    Raises:
        HTTPException: 403 when ``ApprovalError`` is raised, 400 when
            ``ValueError`` is raised. Other exception types are not caught
            here.
    """
    # NOTE(review): no authentication dependency is visible on this route;
    # presumably approval checking happens inside execute() (it can raise
    # ApprovalError) - confirm that is the only gate intended.
    try:
        return execute(request).model_dump()
    except (ApprovalError, ValueError) as exc:
        # ApprovalError is tested first so it maps to 403 even if it is a
        # ValueError subclass, matching the original handler order.
        status = 403 if isinstance(exc, ApprovalError) else 400
        # The exception text is returned verbatim to the caller.
        # NOTE(review): confirm these messages never carry key material,
        # file paths or other internals of the approval validator.
        raise HTTPException(status_code=status, detail=str(exc))
class ReceiptVerifyRequest(BaseModel):
    """Request body for ``POST /receipts/verify``.

    The three fields are passed, in this order, to ``verify_receipt`` by the
    endpoint handler.
    """

    # NOTE(review): all fields are plain ``str`` with no length or format
    # constraint declared here; confirm request-size limits are enforced
    # somewhere in front of this model.

    # Identifier of the execution the receipt is claimed to belong to.
    exec_id: str
    # Result value the receipt is checked against.
    result: str
    # The receipt to verify.
    receipt: str
@app.post("/receipts/verify")
def verify_receipt_endpoint(req: ReceiptVerifyRequest):
    """Check a receipt against an execution id and result.

    Args:
        req: Body carrying ``exec_id``, ``result`` and ``receipt``.

    Returns:
        ``{"valid": <value returned by verify_receipt>}``.
    """
    # NOTE(review): the comparison itself lives in openexec.receipts, which
    # is not visible here. Confirm it compares in constant time (for example
    # via hmac.compare_digest), since this route accepts caller-supplied
    # receipts and no authentication dependency is visible on it.
    return {"valid": verify_receipt(req.exec_id, req.result, req.receipt)}