Skip to content

Commit 8b5d58e

Browse files
committed
add stuff
1 parent ba32083 commit 8b5d58e

2 files changed

Lines changed: 347 additions & 1 deletion

File tree

code-interpreter/app/services/executor_kubernetes.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -554,7 +554,7 @@ def execute_python(
554554
duration_ms=duration_ms,
555555
files=workspace_snapshot,
556556
)
557-
557+
558558
def execute_python_streaming(
559559
self,
560560
*,
Lines changed: 346 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,346 @@
1+
"""Tests for KubernetesExecutor.execute_python_streaming.
2+
3+
Unit tests that mock the Kubernetes API to exercise the streaming
4+
execution path without requiring a real cluster.
5+
"""
6+
7+
from __future__ import annotations
8+
9+
import base64
10+
import io
11+
import tarfile
12+
from typing import Any
13+
from unittest.mock import MagicMock, patch
14+
15+
import pytest
16+
17+
from app.services.executor_base import StreamChunk, StreamEvent, StreamResult
18+
from app.services.executor_kubernetes import KubernetesExecutor
19+
20+
# ---------------------------------------------------------------------------
21+
# Fixtures & helpers
22+
# ---------------------------------------------------------------------------
23+
24+
25+
@pytest.fixture()
def executor() -> KubernetesExecutor:
    """Create a KubernetesExecutor bypassing __init__ (no cluster needed)."""
    # A pod that reports itself Running so readiness polling succeeds.
    running_pod = MagicMock()
    running_pod.status.phase = "Running"

    kube = KubernetesExecutor.__new__(KubernetesExecutor)
    kube.v1 = MagicMock()
    kube.v1.read_namespaced_pod.return_value = running_pod
    kube.namespace = "test"
    kube.image = "test:latest"
    kube.service_account = ""
    return kube
37+
38+
39+
class FakeExecResp:
40+
"""Simulates a Kubernetes WebSocket exec stream.
41+
42+
Delivers *stdout_chunks* and *stderr_chunks* one per loop iteration.
43+
Once both queues are drained, the *exit_status* is returned on the
44+
error channel, causing the reader to break out of the loop.
45+
"""
46+
47+
def __init__(
48+
self,
49+
stdout_chunks: list[str] | None = None,
50+
stderr_chunks: list[str] | None = None,
51+
exit_status: str = "{'status': 'Success'}",
52+
) -> None:
53+
self._stdout = list(stdout_chunks or [])
54+
self._stderr = list(stderr_chunks or [])
55+
self._exit_status = exit_status
56+
self._closed = False
57+
self._exit_delivered = False
58+
self.stdin_writes: list[Any] = []
59+
60+
def is_open(self) -> bool:
61+
if self._closed:
62+
return False
63+
return bool(self._stdout or self._stderr or not self._exit_delivered)
64+
65+
def update(self, timeout: float = 1) -> None: # noqa: ARG002
66+
pass
67+
68+
def peek_stdout(self) -> bool:
69+
return bool(self._stdout)
70+
71+
def read_stdout(self) -> str:
72+
return self._stdout.pop(0)
73+
74+
def peek_stderr(self) -> bool:
75+
return bool(self._stderr)
76+
77+
def read_stderr(self) -> str:
78+
return self._stderr.pop(0)
79+
80+
def read_channel(self, channel: int) -> str: # noqa: ARG002
81+
if not self._stdout and not self._stderr and not self._exit_delivered:
82+
self._exit_delivered = True
83+
return self._exit_status
84+
return ""
85+
86+
def write_stdin(self, data: str | bytes) -> None:
87+
self.stdin_writes.append(data)
88+
89+
def close(self) -> None:
90+
self._closed = True
91+
92+
93+
def _make_tar_mock() -> MagicMock:
94+
"""Mock for the tar-upload exec stream (succeeds immediately)."""
95+
resp = MagicMock()
96+
resp.is_open.side_effect = [True, False]
97+
resp.peek_stdout.return_value = False
98+
resp.peek_stderr.return_value = False
99+
resp.read_channel.return_value = "{'status': 'Success'}"
100+
return resp
101+
102+
103+
def _make_snapshot_mock(files: dict[str, bytes] | None = None) -> MagicMock:
104+
"""Mock for workspace-snapshot exec stream."""
105+
resp = MagicMock()
106+
if not files:
107+
resp.is_open.side_effect = [False]
108+
resp.peek_stdout.return_value = False
109+
resp.peek_stderr.return_value = False
110+
return resp
111+
112+
buf = io.BytesIO()
113+
with tarfile.open(fileobj=buf, mode="w") as tar:
114+
for name, content in files.items():
115+
info = tarfile.TarInfo(name=name)
116+
info.size = len(content)
117+
tar.addfile(info, io.BytesIO(content))
118+
b64 = base64.b64encode(buf.getvalue()).decode("ascii")
119+
120+
resp.is_open.side_effect = [True, False]
121+
resp.peek_stdout.side_effect = [True, False]
122+
resp.read_stdout.return_value = b64
123+
resp.peek_stderr.return_value = False
124+
return resp
125+
126+
127+
def _run_streaming(
    executor: KubernetesExecutor,
    exec_resp: FakeExecResp,
    *,
    extra_stream_mocks: list[Any] | None = None,
    snapshot_files: dict[str, bytes] | None = None,
    **kwargs: object,
) -> list[StreamEvent]:
    """Drive execute_python_streaming with mocked Kubernetes streams.

    The patched ``stream.stream`` hands back, in order: the tar-upload
    stream, *exec_resp*, any *extra_stream_mocks* (e.g. a kill exec),
    and finally the workspace-snapshot stream.
    """
    # Keyword overrides win over the baseline call arguments.
    call_kwargs: dict[str, Any] = {
        "code": "print('hello')",
        "stdin": None,
        "timeout_ms": 5000,
        "max_output_bytes": 65536,
        **kwargs,
    }

    stream_sequence: list[Any] = [_make_tar_mock(), exec_resp]
    stream_sequence += extra_stream_mocks or []
    stream_sequence.append(_make_snapshot_mock(snapshot_files))

    with patch("app.services.executor_kubernetes.stream.stream") as mock_stream:
        mock_stream.side_effect = stream_sequence
        return list(executor.execute_python_streaming(**call_kwargs))
152+
153+
154+
def _chunks(events: list[StreamEvent]) -> list[StreamChunk]:
    """Filter *events* down to the StreamChunk entries, order preserved."""
    return [event for event in events if isinstance(event, StreamChunk)]
156+
157+
158+
def _result(events: list[StreamEvent]) -> StreamResult:
    """Return the single StreamResult in *events*, asserting uniqueness."""
    results = [event for event in events if isinstance(event, StreamResult)]
    assert len(results) == 1, f"Expected exactly 1 StreamResult, got {len(results)}"
    return results[0]
162+
163+
164+
# ---------------------------------------------------------------------------
165+
# Tests
166+
# ---------------------------------------------------------------------------
167+
168+
169+
def test_streaming_yields_stdout_chunks(executor: KubernetesExecutor) -> None:
    """A single stdout payload surfaces as exactly one stdout chunk."""
    events = _run_streaming(executor, FakeExecResp(stdout_chunks=["hello\n"]))

    assert _chunks(events) == [StreamChunk(stream="stdout", data="hello\n")]

    final = _result(events)
    assert final.exit_code == 0
    assert final.timed_out is False
    assert final.duration_ms >= 0
180+
181+
182+
def test_streaming_yields_stderr_chunks(executor: KubernetesExecutor) -> None:
    """A stderr payload comes through as a stderr-tagged chunk."""
    events = _run_streaming(executor, FakeExecResp(stderr_chunks=["oops\n"]))

    assert _chunks(events) == [StreamChunk(stream="stderr", data="oops\n")]
188+
189+
190+
def test_streaming_multiple_stdout_chunks(executor: KubernetesExecutor) -> None:
    """Each queued stdout payload becomes its own chunk, in order."""
    events = _run_streaming(executor, FakeExecResp(stdout_chunks=["line1\n", "line2\n"]))

    chunks = _chunks(events)
    assert [c.data for c in chunks] == ["line1\n", "line2\n"]
    assert {c.stream for c in chunks} == {"stdout"}
198+
199+
200+
def test_streaming_mixed_stdout_and_stderr(executor: KubernetesExecutor) -> None:
    """stdout and stderr chunks are both delivered, correctly tagged."""
    events = _run_streaming(
        executor, FakeExecResp(stdout_chunks=["out\n"], stderr_chunks=["err\n"])
    )

    chunks = _chunks(events)
    assert [c.data for c in chunks if c.stream == "stdout"] == ["out\n"]
    assert [c.data for c in chunks if c.stream == "stderr"] == ["err\n"]
213+
214+
215+
def test_streaming_nonzero_exit_code(executor: KubernetesExecutor) -> None:
    """A Failure exit status surfaces its exitCode on the StreamResult."""
    failure = FakeExecResp(
        stderr_chunks=["error!\n"],
        exit_status="{'status': 'Failure', 'details': {'exitCode': 1}}",
    )
    events = _run_streaming(executor, failure)

    result = _result(events)
    assert result.exit_code == 1
    assert result.timed_out is False
227+
228+
229+
def test_streaming_timeout(executor: KubernetesExecutor) -> None:
    """timeout_ms=0 guarantees immediate timeout."""
    events = _run_streaming(
        executor,
        FakeExecResp(),
        extra_stream_mocks=[MagicMock()],  # _kill_python_process
        timeout_ms=0,
    )

    # Nothing streamed; the result marks the run as timed out.
    assert not _chunks(events)
    result = _result(events)
    assert result.exit_code is None
    assert result.timed_out is True
242+
243+
244+
def test_streaming_timeout_calls_kill(executor: KubernetesExecutor) -> None:
    """Verify _kill_python_process is invoked on timeout."""
    with patch("app.services.executor_kubernetes.stream.stream") as mock_stream:
        mock_stream.side_effect = [
            _make_tar_mock(),
            FakeExecResp(),
            MagicMock(),  # kill
            _make_snapshot_mock(),
        ]
        list(
            executor.execute_python_streaming(
                code="import time; time.sleep(999)",
                stdin=None,
                timeout_ms=0,
                max_output_bytes=65536,
            )
        )

    # Exactly one exec call should have carried the pkill command.
    kill_calls = [
        call
        for call in mock_stream.call_args_list
        if call.kwargs.get("command") == ["pkill", "-9", "python"]
    ]
    assert len(kill_calls) == 1
270+
271+
272+
def test_streaming_truncates_stdout(executor: KubernetesExecutor) -> None:
    """A single chunk exceeding the byte budget is truncated."""
    events = _run_streaming(
        executor,
        FakeExecResp(stdout_chunks=["hello world"]),
        max_output_bytes=5,
    )

    assert _chunks(events) == [StreamChunk(stream="stdout", data="hello")]
284+
285+
286+
def test_streaming_suppresses_chunks_past_limit(
    executor: KubernetesExecutor,
) -> None:
    """Once the byte budget is exhausted, further chunks are not yielded."""
    events = _run_streaming(
        executor,
        FakeExecResp(stdout_chunks=["aaa", "bbb"]),
        max_output_bytes=3,
    )

    # Only the first chunk fits; the second is dropped entirely.
    assert [c.data for c in _chunks(events)] == ["aaa"]
299+
300+
301+
def test_streaming_forwards_stdin(executor: KubernetesExecutor) -> None:
    """stdin text is written through to the exec stream."""
    fake = FakeExecResp(stdout_chunks=["echoed\n"])
    _run_streaming(executor, fake, stdin="input data")

    assert fake.stdin_writes == ["input data"]
306+
307+
308+
def test_streaming_includes_workspace_files(
    executor: KubernetesExecutor,
) -> None:
    """Files captured by the workspace snapshot land on the StreamResult."""
    events = _run_streaming(
        executor,
        FakeExecResp(),
        snapshot_files={"output.txt": b"file content"},
    )

    # Exactly one file, matching what the snapshot mock produced.
    (workspace_file,) = _result(events).files
    assert workspace_file.path == "output.txt"
    assert workspace_file.content == b"file content"
321+
322+
323+
def test_streaming_cleans_up_pod(executor: KubernetesExecutor) -> None:
    """Pod is deleted via _cleanup_pod regardless of outcome."""
    _ = _run_streaming(executor, FakeExecResp())

    executor.v1.delete_namespaced_pod.assert_called_once()
328+
329+
330+
def test_streaming_empty_output(executor: KubernetesExecutor) -> None:
    """No output still produces a clean zero-exit StreamResult."""
    events = _run_streaming(executor, FakeExecResp())

    assert not _chunks(events)
    result = _result(events)
    assert result.exit_code == 0
    assert result.timed_out is False
337+
338+
339+
def test_streaming_always_ends_with_result(
    executor: KubernetesExecutor,
) -> None:
    """The last event yielded must always be a StreamResult."""
    events = _run_streaming(executor, FakeExecResp(stdout_chunks=["data\n"]))

    assert events
    assert isinstance(events[-1], StreamResult)

0 commit comments

Comments
 (0)