Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
198 changes: 126 additions & 72 deletions src/blaxel/core/sandbox/default/interpreter.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
from __future__ import annotations

import asyncio
import json
import logging
from typing import Any, Callable, Dict

import httpx

from ...client.models import Sandbox
from ...common.settings import settings
from ..types import SandboxCreateConfiguration
from .sandbox import SandboxInstance

Expand Down Expand Up @@ -220,51 +222,76 @@ async def run_code(
write=write_timeout,
pool=pool_timeout,
)
async with client.stream(
"POST",
"/port/8888/execute",
json=body,
timeout=timeout_cfg,
) as response:
if response.status_code >= 400:
try:
body_text = await response.aread()
body_text = body_text.decode(errors="ignore")
except Exception:
body_text = "<unavailable>"
req = getattr(response, "request", None)
method = getattr(req, "method", "UNKNOWN") if req else "UNKNOWN"
url = str(getattr(req, "url", "UNKNOWN")) if req else "UNKNOWN"
reason = getattr(response, "reason_phrase", "")
details = (
"Execution failed\n"
f"- method: {method}\n- url: {url}\n- status: {response.status_code} {reason}\n"
f"- response-headers: {dict(response.headers)}\n- body:\n{body_text}"
)
self.logger.debug(details)
raise RuntimeError(details)

async for line in response.aiter_lines():
if not line:
continue
try:
decoded = line
except Exception:
decoded = str(line)
try:
self._parse_output(
execution,
decoded,
on_stdout=on_stdout,
on_stderr=on_stderr,
on_result=on_result,
on_error=on_error,

max_retries = settings.sandbox_read_retries
base_delay = 0.5 # seconds, matching backend guidance
max_delay = 30.0

for attempt in range(max_retries):
async with client.stream(
"POST",
"/port/8888/execute",
json=body,
timeout=timeout_cfg,
) as response:
if response.status_code >= 400:
try:
body_bytes = await response.aread()
body_text = body_bytes.decode(errors="ignore")
except Exception:
body_bytes = b""
body_text = "<unavailable>"

# Retry on transient sandbox errors
is_transient = response.status_code in (502, 503, 504) or (
response.status_code == 404
and (b"unavailable" in body_bytes.lower() or b"workload" in body_bytes.lower())
)
if is_transient and attempt < max_retries - 1:
delay = min(base_delay * (2**attempt), max_delay)
self.logger.debug(
f"Transient error (status {response.status_code}), "
f"retrying in {delay:.1f}s (attempt {attempt + 1}/{max_retries})"
)
await asyncio.sleep(delay)
continue

req = getattr(response, "request", None)
method = getattr(req, "method", "UNKNOWN") if req else "UNKNOWN"
url = str(getattr(req, "url", "UNKNOWN")) if req else "UNKNOWN"
reason = getattr(response, "reason_phrase", "")
details = (
"Execution failed\n"
f"- method: {method}\n- url: {url}\n- status: {response.status_code} {reason}\n"
f"- response-headers: {dict(response.headers)}\n- body:\n{body_text}"
)
except json.JSONDecodeError:
# Fallback: treat as stdout text-only message
execution.logs.stdout.append(decoded)
if on_stdout:
on_stdout(CodeInterpreter.OutputMessage(decoded, None, False))
self.logger.debug(details)
raise RuntimeError(details)

async for line in response.aiter_lines():
if not line:
continue
try:
decoded = line
except Exception:
decoded = str(line)
try:
self._parse_output(
execution,
decoded,
on_stdout=on_stdout,
on_stderr=on_stderr,
on_result=on_result,
on_error=on_error,
)
except json.JSONDecodeError:
# Fallback: treat as stdout text-only message
execution.logs.stdout.append(decoded)
if on_stdout:
on_stdout(CodeInterpreter.OutputMessage(decoded, None, False))

# Success — break out of retry loop
break

return execution

Expand All @@ -281,32 +308,59 @@ async def create_code_context(
data["cwd"] = cwd

client = self.process.get_client()
response = await client.post(
"/port/8888/contexts",
json=data,
timeout=request_timeout or 10.0,
)
try:
# Always read response body first
body_bytes = await response.aread()

if response.status_code >= 400:
try:
body_text = body_bytes.decode("utf-8", errors="ignore")
except Exception:
body_text = "<unavailable>"
method = getattr(response.request, "method", "UNKNOWN")
url = str(getattr(response.request, "url", "UNKNOWN"))
reason = getattr(response, "reason_phrase", "")
details = (
"Create context failed\n"
f"- method: {method}\n- url: {url}\n- status: {response.status_code} {reason}\n"
f"- response-headers: {dict(response.headers)}\n- body:\n{body_text}"
)
self.logger.debug(details)
raise RuntimeError(details)

data = json.loads(body_bytes)
return CodeInterpreter.Context.from_json(data)
finally:
await response.aclose()
max_retries = settings.sandbox_read_retries
base_delay = 0.5
max_delay = 30.0

for attempt in range(max_retries):
response = await client.post(
"/port/8888/contexts",
json=data,
timeout=request_timeout or 10.0,
)
try:
# Always read response body first
body_bytes = await response.aread()

if response.status_code >= 400:
try:
body_text = body_bytes.decode("utf-8", errors="ignore")
except Exception:
body_text = "<unavailable>"

# Retry on transient sandbox errors
is_transient = response.status_code in (502, 503, 504) or (
response.status_code == 404
and (
b"unavailable" in body_bytes.lower()
or b"workload" in body_bytes.lower()
)
)
if is_transient and attempt < max_retries - 1:
delay = min(base_delay * (2**attempt), max_delay)
self.logger.debug(
f"Transient error (status {response.status_code}), "
f"retrying in {delay:.1f}s (attempt {attempt + 1}/{max_retries})"
)
await asyncio.sleep(delay)
continue

method = getattr(response.request, "method", "UNKNOWN")
url = str(getattr(response.request, "url", "UNKNOWN"))
reason = getattr(response, "reason_phrase", "")
details = (
"Create context failed\n"
f"- method: {method}\n- url: {url}\n- status: {response.status_code} {reason}\n"
f"- response-headers: {dict(response.headers)}\n- body:\n{body_text}"
)
self.logger.debug(details)
raise RuntimeError(details)

result_data = json.loads(body_bytes)
return CodeInterpreter.Context.from_json(result_data)
finally:
await response.aclose()

# Should not reach here, but satisfy type checker
raise RuntimeError("Exhausted retries for create_code_context")
Loading