-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathopenai_integration.py
More file actions
129 lines (104 loc) · 4.21 KB
/
openai_integration.py
File metadata and controls
129 lines (104 loc) · 4.21 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
"""Integrating Cycles with the OpenAI Python SDK.
Guards OpenAI chat completion calls with budget reservations,
caps-aware token limiting, and accurate cost tracking.
Requirements:
pip install runcycles openai
Environment variables:
CYCLES_BASE_URL - Cycles server URL (default: http://localhost:7878)
CYCLES_API_KEY - Cycles API key
CYCLES_TENANT - Tenant identifier
OPENAI_API_KEY - OpenAI API key
"""
import os
from openai import OpenAI
from runcycles import (
CyclesClient,
CyclesConfig,
CyclesMetrics,
cycles,
get_cycles_context,
set_default_client,
)
# ---------------------------------------------------------------------------
# 1. Configure Cycles
# ---------------------------------------------------------------------------
# Build the Cycles client from environment variables (demo-friendly fallbacks
# are used when a variable is unset) and install it as the process default
# via set_default_client, so the @cycles decorator below can use it.
config = CyclesConfig(
    base_url=os.environ.get("CYCLES_BASE_URL", "http://localhost:7878"),
    api_key=os.environ.get("CYCLES_API_KEY", "your-api-key"),
    tenant=os.environ.get("CYCLES_TENANT", "acme"),
    app="openai-example",  # application identifier reported to Cycles
)
cycles_client = CyclesClient(config)
set_default_client(cycles_client)
# ---------------------------------------------------------------------------
# 2. Configure OpenAI
# ---------------------------------------------------------------------------
openai_client = OpenAI()  # reads OPENAI_API_KEY from env
# Approximate per-token pricing in USD microcents (1 USD = 100_000_000 microcents).
# Adjust these for the model you use.
PRICE_PER_INPUT_TOKEN = 250  # $2.50 / 1M tokens → 250 microcents / token
PRICE_PER_OUTPUT_TOKEN = 1_000  # $10.00 / 1M tokens → 1000 microcents / token


def estimate_cost(prompt: str, max_tokens: int = 1024) -> int:
    """Return a worst-case cost estimate, in USD microcents, for one call.

    Doubles the whitespace-word count of *prompt* as a crude stand-in for
    the real tokenizer and assumes the full ``max_tokens`` output budget
    is consumed.
    """
    approx_input_tokens = 2 * len(prompt.split())
    return (
        approx_input_tokens * PRICE_PER_INPUT_TOKEN
        + max_tokens * PRICE_PER_OUTPUT_TOKEN
    )


def actual_cost(result: dict) -> int:
    """Return the real cost, in USD microcents, from the response usage dict."""
    usage = result["usage"]
    input_part = usage["prompt_tokens"] * PRICE_PER_INPUT_TOKEN
    output_part = usage["completion_tokens"] * PRICE_PER_OUTPUT_TOKEN
    return input_part + output_part
# ---------------------------------------------------------------------------
# 3. Budget-guarded OpenAI call
# ---------------------------------------------------------------------------
@cycles(
    estimate=lambda prompt, **kw: estimate_cost(prompt, kw.get("max_tokens", 1024)),
    actual=actual_cost,
    action_kind="llm.completion",
    action_name="gpt-4o",
    unit="USD_MICROCENTS",
    ttl_ms=60_000,
)
def chat_completion(prompt: str, max_tokens: int = 1024) -> dict:
    """Call the OpenAI chat completions API under Cycles budget protection.

    Returns a dict with the generated ``content`` and a ``usage`` sub-dict
    holding ``prompt_tokens`` and ``completion_tokens``.
    """
    context = get_cycles_context()
    # Honor a caps-imposed ceiling on output tokens, if the budget
    # authority has set one for this action.
    if context and context.has_caps() and context.caps.max_tokens:
        max_tokens = min(max_tokens, context.caps.max_tokens)

    response = openai_client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt}],
        max_tokens=max_tokens,
    )
    prompt_tokens = response.usage.prompt_tokens
    completion_tokens = response.usage.completion_tokens

    # Attach detailed metrics so Cycles can record them for observability.
    if context:
        context.metrics = CyclesMetrics(
            tokens_input=prompt_tokens,
            tokens_output=completion_tokens,
            latency_ms=None,  # set if you measure wall-clock time
            model_version=response.model,
        )

    return {
        "content": response.choices[0].message.content,
        "usage": {
            "prompt_tokens": prompt_tokens,
            "completion_tokens": completion_tokens,
        },
    }
# ---------------------------------------------------------------------------
# 4. Run it
# ---------------------------------------------------------------------------
def main() -> None:
    """Run one budget-guarded completion, falling back if the budget is spent."""
    from runcycles import BudgetExceededError

    try:
        result = chat_completion("Explain what budget authority means in three sentences.")
    except BudgetExceededError:
        print("Budget exhausted — falling back to cached response.")
    else:
        print(f"Response: {result['content']}")
        print(f"Tokens used: {result['usage']}")


if __name__ == "__main__":
    main()