Skip to content

Commit f0e4080

Browse files
authored
Merge pull request #14 from onyx-dot-app/kube-stream-2
feat: Stream output in kubernetes
2 parents e587cf1 + 44b91df commit f0e4080

2 files changed

Lines changed: 449 additions & 1 deletion

File tree

code-interpreter/app/services/executor_kubernetes.py

Lines changed: 103 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,9 @@
3232
EntryKind,
3333
ExecutionResult,
3434
HealthCheck,
35+
StreamChunk,
36+
StreamEvent,
37+
StreamResult,
3538
WorkspaceEntry,
3639
wrap_last_line_interactive,
3740
)
@@ -61,7 +64,7 @@ class _KubeExecContext:
6164
"""Holds the live pod and exec stream for the duration of an execution."""
6265

6366
pod_name: str
64-
exec_resp: Any # kubernetes WSClient
67+
exec_resp: ws_client.WSClient
6568
start: float
6669

6770

@@ -552,6 +555,52 @@ def execute_python(
552555
files=workspace_snapshot,
553556
)
554557

558+
def execute_python_streaming(
559+
self,
560+
*,
561+
code: str,
562+
stdin: str | None,
563+
timeout_ms: int,
564+
max_output_bytes: int,
565+
cpu_time_limit_sec: int | None = None,
566+
memory_limit_mb: int | None = None,
567+
files: Sequence[tuple[str, bytes]] | None = None,
568+
last_line_interactive: bool = True,
569+
) -> Generator[StreamEvent, None, None]:
570+
"""Execute Python code and yield output chunks as they arrive.
571+
572+
Yields StreamChunk events during execution, then a single StreamResult
573+
at the end containing exit_code, timing, and workspace files.
574+
"""
575+
with self._run_in_pod(
576+
code=code,
577+
cpu_time_limit_sec=cpu_time_limit_sec,
578+
memory_limit_mb=memory_limit_mb,
579+
files=files,
580+
last_line_interactive=last_line_interactive,
581+
) as ctx:
582+
if stdin:
583+
logger.debug("Writing stdin to Python process")
584+
ctx.exec_resp.write_stdin(stdin)
585+
586+
deadline = time.time() + (timeout_ms / 1000.0)
587+
exit_code, timed_out = yield from _stream_kube_output(
588+
ctx.exec_resp, deadline, max_output_bytes
589+
)
590+
591+
if timed_out:
592+
self._kill_python_process(ctx.pod_name)
593+
594+
workspace_snapshot = self._extract_workspace_snapshot(ctx.pod_name)
595+
596+
duration_ms = int((time.perf_counter() - ctx.start) * 1000)
597+
yield StreamResult(
598+
exit_code=exit_code if not timed_out else None,
599+
timed_out=timed_out,
600+
duration_ms=duration_ms,
601+
files=workspace_snapshot,
602+
)
603+
555604
def _validate_relative_path(self, path_str: str) -> Path:
556605
path = Path(path_str)
557606
if path.is_absolute():
@@ -569,3 +618,56 @@ def _validate_relative_path(self, path_str: str) -> Path:
569618
raise ValueError("File path must not be empty.")
570619

571620
return Path(*sanitized_parts)
621+
622+
623+
def _stream_kube_output(
624+
exec_resp: ws_client.WSClient,
625+
deadline: float,
626+
max_output_bytes: int,
627+
) -> Generator[StreamChunk, None, tuple[int | None, bool]]:
628+
"""Read stdout/stderr from a Kubernetes exec stream and yield StreamChunk events.
629+
630+
Returns a (exit_code, timed_out) tuple.
631+
"""
632+
stdout_bytes = 0
633+
stderr_bytes = 0
634+
exit_code: int | None = None
635+
timed_out = False
636+
637+
while exec_resp.is_open():
638+
remaining = deadline - time.time()
639+
if remaining <= 0:
640+
timed_out = True
641+
break
642+
643+
exec_resp.update(timeout=min(remaining, 1))
644+
645+
if exec_resp.peek_stdout():
646+
text: str = exec_resp.read_stdout()
647+
raw = text.encode("utf-8")
648+
if stdout_bytes < max_output_bytes:
649+
allowed = max_output_bytes - stdout_bytes
650+
if len(raw) > allowed:
651+
text = raw[:allowed].decode("utf-8", errors="ignore")
652+
if text:
653+
yield StreamChunk(stream="stdout", data=text)
654+
stdout_bytes += len(raw)
655+
656+
if exec_resp.peek_stderr():
657+
text = exec_resp.read_stderr()
658+
raw = text.encode("utf-8")
659+
if stderr_bytes < max_output_bytes:
660+
allowed = max_output_bytes - stderr_bytes
661+
if len(raw) > allowed:
662+
text = raw[:allowed].decode("utf-8", errors="ignore")
663+
if text:
664+
yield StreamChunk(stream="stderr", data=text)
665+
stderr_bytes += len(raw)
666+
667+
error: str = exec_resp.read_channel(ws_client.ERROR_CHANNEL)
668+
if error:
669+
exit_code = _parse_exit_code(error)
670+
break
671+
672+
exec_resp.close()
673+
return exit_code, timed_out

0 commit comments

Comments
 (0)