From 68d5f6c76f90f527c8fa2eea317c71f19b3569c6 Mon Sep 17 00:00:00 2001 From: RC-CHN <1051989940@qq.com> Date: Thu, 22 Jan 2026 14:58:55 +0800 Subject: [PATCH 1/7] test(bay): refactor tests to use container isolation and pytest structure Reorganize Bay test suite with proper pytest conventions: - Add `fresh_ship()` context manager for automatic container cleanup - Convert test_bay_api.py to use isolated containers per test - Create pytest-based e2e tests in tests/e2e/test_bay_api.py - Add unit tests for memory and disk utilities in tests/unit/ - Add test scripts for Docker, Podman (host/container modes) - Add Kubernetes test deployment configs and test script - Organize tests into unit, e2e, and integration categories - Add conftest.py with common fixtures and pytest markers --- pkgs/bay/test_bay_api.py | 890 ------------------ pkgs/bay/tests/README.md | 98 ++ pkgs/bay/tests/conftest.py | 71 +- pkgs/bay/tests/e2e/__init__.py | 1 + pkgs/bay/tests/e2e/test_bay_api.py | 486 ++++++++++ .../bay/{ => tests}/k8s/k8s-deploy-local.yaml | 2 +- pkgs/bay/{ => tests}/k8s/k8s-deploy.yaml | 2 +- pkgs/bay/tests/k8s/storageclass-retain.yaml | 10 + .../tests/scripts/test_docker_container.sh | 46 + pkgs/bay/tests/scripts/test_docker_host.sh | 54 ++ .../{k8s => tests/scripts}/test_kubernetes.sh | 20 +- .../tests/scripts/test_podman_container.sh | 73 ++ pkgs/bay/tests/scripts/test_podman_host.sh | 58 ++ pkgs/bay/tests/unit/__init__.py | 1 + pkgs/bay/tests/unit/test_utils.py | 180 ++++ plans/test-container-isolation.md | 271 ++++++ 16 files changed, 1360 insertions(+), 903 deletions(-) delete mode 100644 pkgs/bay/test_bay_api.py create mode 100644 pkgs/bay/tests/README.md create mode 100644 pkgs/bay/tests/e2e/__init__.py create mode 100644 pkgs/bay/tests/e2e/test_bay_api.py rename pkgs/bay/{ => tests}/k8s/k8s-deploy-local.yaml (98%) rename pkgs/bay/{ => tests}/k8s/k8s-deploy.yaml (98%) create mode 100644 pkgs/bay/tests/k8s/storageclass-retain.yaml create mode 100755 pkgs/bay/tests/scripts/test_docker_container.sh create mode 100755 pkgs/bay/tests/scripts/test_docker_host.sh rename pkgs/bay/{k8s => tests/scripts}/test_kubernetes.sh (93%) create mode 100755 pkgs/bay/tests/scripts/test_podman_container.sh create mode 100755 pkgs/bay/tests/scripts/test_podman_host.sh create mode 100644 pkgs/bay/tests/unit/__init__.py create mode 100644 pkgs/bay/tests/unit/test_utils.py create mode 100644 plans/test-container-isolation.md diff --git a/pkgs/bay/test_bay_api.py b/pkgs/bay/test_bay_api.py deleted file mode 100644 index 0b62725..0000000 --- a/pkgs/bay/test_bay_api.py +++ /dev/null @@ -1,890 +0,0 @@ -#!/usr/bin/env python3 -""" -Bay API 功能测试脚本(更全面) - -使用方法: - # 确保 Bay 服务正在运行 - uv run python test_bay_api.py -""" - -from __future__ import annotations - -import io -import json -import os -import sys -import time -import uuid -from typing import Any, Dict, Optional - -import requests - -# 配置 - 支持从环境变量覆盖 -BAY_URL = os.getenv("BAY_URL", "http://localhost:8156") -ACCESS_TOKEN = os.getenv("BAY_ACCESS_TOKEN", "secret-token") -SESSION_ID = str(uuid.uuid4()) - -AUTH_HEADERS = { - "Authorization": f"Bearer {ACCESS_TOKEN}", - "X-SESSION-ID": SESSION_ID, -} - - -def print_section(title: str) -> None: - print("\n" + "=" * 70) - print(title) - print("=" * 70) - - -def request_json(resp: requests.Response) -> Dict[str, Any] | list[Any] | str: - try: - return resp.json() - except Exception: - return resp.text - - -def check_status( - resp: requests.Response, - expected: int | list[int], - success_msg: str, - fail_msg: str, -) -> bool: - if 
isinstance(expected, int): - expected_list = [expected] - else: - expected_list = expected - - ok = resp.status_code in expected_list - print(f"状态码: {resp.status_code}") - print(f"响应: {request_json(resp)}") - if ok: - print(f"✅ {success_msg}") - else: - print(f"❌ {fail_msg}") - return ok - - -def test_memory_utils() -> bool: - print_section("测试 0: 内存工具函数单元测试") - try: - # 导入工具函数 - # 注意:这里假设脚本在 pkgs/bay 目录下运行,或者 PYTHONPATH 设置正确 - sys.path.append(os.getcwd()) - from app.drivers.core.utils import ( - parse_memory_string, - parse_and_enforce_minimum_memory, - ) - from app.drivers.kubernetes.utils import normalize_memory_for_k8s - - # 1. 测试 parse_memory_string - print("1. 测试 parse_memory_string...") - assert parse_memory_string("512Mi") == 536870912 - assert parse_memory_string("512m") == 536870912 - assert parse_memory_string("1Gi") == 1073741824 - assert parse_memory_string("1g") == 1073741824 - print("✅ parse_memory_string 通过") - - # 2. 测试 parse_and_enforce_minimum_memory - print("2. 测试 parse_and_enforce_minimum_memory...") - # 64Mi < 128Mi,应该被修正 - assert parse_and_enforce_minimum_memory("64Mi") == 134217728 - # 512Mi > 128Mi,应该保持不变 - assert parse_and_enforce_minimum_memory("512Mi") == 536870912 - print("✅ parse_and_enforce_minimum_memory 通过") - - # 3. 测试 normalize_memory_for_k8s - print("3. 测试 normalize_memory_for_k8s...") - - # 3.1 Docker 风格单位(kb/mb/gb 和 k/m/g,大小写混合) - # 注意:值必须 >= 128 MiB 才不会触发最小值限制 - # KB -> Ki (需要足够大的值,131072KB = 128Mi) - assert normalize_memory_for_k8s("256000KB") == "256000Ki" - assert normalize_memory_for_k8s("256000kb") == "256000Ki" - # MB/mb -> Mi (256MB > 128Mi) - assert normalize_memory_for_k8s("256MB") == "256Mi" - assert normalize_memory_for_k8s("256mb") == "256Mi" - # GB/gb -> Gi - assert normalize_memory_for_k8s("2GB") == "2Gi" - assert normalize_memory_for_k8s("2gb") == "2Gi" - # 简写 k/m/g -> Ki/Mi/Gi - assert normalize_memory_for_k8s("256000K") == "256000Ki" - assert normalize_memory_for_k8s("256000k") == "256000Ki" - assert normalize_memory_for_k8s("256M") == "256Mi" - assert normalize_memory_for_k8s("256m") == "256Mi" - assert normalize_memory_for_k8s("2G") == "2Gi" - assert normalize_memory_for_k8s("2g") == "2Gi" - - # 3.2 已经是 K8s 单位,保持原样(大小写敏感) - # 256Mi > 128Mi,不会触发最小值限制 - assert normalize_memory_for_k8s("256Mi") == "256Mi" - assert normalize_memory_for_k8s("256MI") == "256MI" - assert normalize_memory_for_k8s("1Gi") == "1Gi" - assert normalize_memory_for_k8s("1GI") == "1GI" - - # 3.3 纯字节数字(无单位)应原样透传(>= 128Mi = 134217728 字节) - assert normalize_memory_for_k8s("134217728") == "134217728" - assert normalize_memory_for_k8s("536870912") == "536870912" - - # 3.4 小的值,转换后低于最小值时应提升到最小内存(128Mi -> 134217728 字节) - # 64Mi < 128Mi,应该被提升 - assert normalize_memory_for_k8s("64Mi") == "134217728" - # 64m(Docker 的 MB 单位)在转换后也应被提升到最小值 - assert normalize_memory_for_k8s("64m") == "134217728" - # 1k(1024 字节)也应被提升 - assert normalize_memory_for_k8s("1k") == "134217728" - # 小的 Ki 值也应被提升 - assert normalize_memory_for_k8s("512Ki") == "134217728" - - # 3.5 空字符串处理 - assert normalize_memory_for_k8s("") == "" - - print("✅ normalize_memory_for_k8s 通过") - - return True - except ImportError: - print("⚠️ 无法导入应用模块,跳过单元测试(可能不在项目根目录下运行)") - return True - except AssertionError as e: - print(f"❌ 断言失败: {e}") - return False - except Exception as e: - print(f"❌ 测试出错: {e}") - return False - - -def test_disk_utils() -> bool: - print_section("测试 0.5: 磁盘工具函数单元测试") - try: - sys.path.append(os.getcwd()) - from app.config import settings - from app.models import ShipSpec - from app.drivers.core.utils 
import ( - parse_disk_string, - parse_and_enforce_minimum_disk, - MIN_DISK_BYTES, - ) - from app.drivers.kubernetes.utils import normalize_disk_for_k8s - - # 1. 测试配置中的默认值 - print("1. 测试默认磁盘配置...") - print(f" default_ship_disk: {settings.default_ship_disk}") - assert settings.default_ship_disk == "1Gi", f"Expected '1Gi', got '{settings.default_ship_disk}'" - print("✅ 默认磁盘配置正确") - - # 2. 测试 parse_disk_string - print("2. 测试 parse_disk_string...") - # K8s 风格单位 - assert parse_disk_string("1Gi") == 1073741824 - assert parse_disk_string("512Mi") == 536870912 - # Docker 风格单位 - assert parse_disk_string("1g") == 1073741824 - assert parse_disk_string("512m") == 536870912 - assert parse_disk_string("10G") == 10737418240 - # 纯字节 - assert parse_disk_string("1073741824") == 1073741824 - print("✅ parse_disk_string 通过") - - # 3. 测试 parse_and_enforce_minimum_disk - print("3. 测试 parse_and_enforce_minimum_disk...") - # 50Mi < 100Mi,应该被修正到 100Mi - assert parse_and_enforce_minimum_disk("50Mi") == MIN_DISK_BYTES - # 1Gi > 100Mi,应该保持不变 - assert parse_and_enforce_minimum_disk("1Gi") == 1073741824 - print("✅ parse_and_enforce_minimum_disk 通过") - - # 4. 测试 normalize_disk_for_k8s - print("4. 测试 normalize_disk_for_k8s...") - - # 4.1 Docker 风格单位转换为 K8s 风格 - assert normalize_disk_for_k8s("1g") == "1Gi" - assert normalize_disk_for_k8s("10G") == "10Gi" - assert normalize_disk_for_k8s("512m") == "512Mi" - assert normalize_disk_for_k8s("1024M") == "1024Mi" - - # 4.2 已经是 K8s 单位,保持原样 - assert normalize_disk_for_k8s("1Gi") == "1Gi" - assert normalize_disk_for_k8s("512Mi") == "512Mi" - - # 4.3 纯字节数字应原样透传(>= 100Mi) - assert normalize_disk_for_k8s("1073741824") == "1073741824" - - # 4.4 小的值应被提升到最小值(100Mi = 104857600 字节) - assert normalize_disk_for_k8s("50Mi") == str(MIN_DISK_BYTES) - assert normalize_disk_for_k8s("50m") == str(MIN_DISK_BYTES) - - # 4.5 空字符串处理 - assert normalize_disk_for_k8s("") == "" - - print("✅ normalize_disk_for_k8s 通过") - - # 5. 测试 ShipSpec 包含 disk 字段 - print("5. 
测试 ShipSpec disk 字段...") - spec = ShipSpec(cpus=2.0, memory="512m", disk="5Gi") - assert spec.disk == "5Gi" - # 测试 disk 字段可选 - spec_no_disk = ShipSpec(cpus=1.0, memory="256m") - assert spec_no_disk.disk is None - print("✅ ShipSpec disk 字段通过") - - return True - except ImportError as e: - print(f"⚠️ 无法导入应用模块,跳过磁盘工具单元测试: {e}") - return True - except AssertionError as e: - print(f"❌ 断言失败: {e}") - return False - except Exception as e: - print(f"❌ 测试出错: {e}") - import traceback - traceback.print_exc() - return False - - -def test_health() -> bool: - print_section("测试 1: /health 健康检查") - try: - resp = requests.get(f"{BAY_URL}/health", timeout=5) - return check_status(resp, 200, "健康检查通过", "健康检查失败") - except Exception as exc: - print(f"❌ 请求失败: {exc}") - return False - - -def test_root() -> bool: - print_section("测试 2: / 根路由") - try: - resp = requests.get(f"{BAY_URL}/", timeout=5) - return check_status(resp, 200, "根路由可用", "根路由失败") - except Exception as exc: - print(f"❌ 请求失败: {exc}") - return False - - -def test_stat() -> bool: - print_section("测试 3: /stat 版本信息") - try: - resp = requests.get(f"{BAY_URL}/stat", timeout=5) - return check_status(resp, 200, "统计信息可用", "统计信息失败") - except Exception as exc: - print(f"❌ 请求失败: {exc}") - return False - - -def test_auth_required() -> bool: - print_section("测试 4: 认证校验") - try: - resp = requests.get(f"{BAY_URL}/ships", timeout=5) - ok = check_status( - resp, - [401, 403], - "未授权访问被拒绝", - "未授权访问未被拒绝", - ) - if not ok: - return False - if resp.status_code == 403: - detail = "" - try: - payload = resp.json() - detail = payload.get("detail", "") if isinstance(payload, dict) else "" - except Exception: - detail = "" - print(f"提示: 当前未携带 token 时返回 403, detail={detail!r}") - return True - except Exception as exc: - print(f"❌ 请求失败: {exc}") - return False - - -def test_list_ships() -> bool: - print_section("测试 5: /ships 列出 Ships") - try: - resp = requests.get(f"{BAY_URL}/ships", headers=AUTH_HEADERS, timeout=5) - return check_status(resp, 200, "列出 ships 成功", "列出 ships 失败") - except Exception as exc: - print(f"❌ 请求失败: {exc}") - return False - - -def test_create_ship_invalid_payload() -> bool: - print_section("测试 6: /ship 创建 Ship(非法参数)") - try: - payload = {"ttl": 0, "max_session_num": 0} - resp = requests.post( - f"{BAY_URL}/ship", - headers={**AUTH_HEADERS, "Content-Type": "application/json"}, - json=payload, - timeout=10, - ) - return check_status(resp, 422, "非法参数被拒绝", "非法参数未被拒绝") - except Exception as exc: - print(f"❌ 请求失败: {exc}") - return False - - -def test_create_ship_with_small_memory() -> bool: - print_section("测试 6.5: /ship 创建 Ship(极小内存自动修正)") - try: - # 尝试使用 1m 内存 (1 MiB),如果没有修正,容器必死无疑 - payload = { - "ttl": 60, - "max_session_num": 1, - "spec": {"cpus": 0.1, "memory": "1m"}, - } - print(f"请求载荷: {payload}") - resp = requests.post( - f"{BAY_URL}/ship", - headers={**AUTH_HEADERS, "Content-Type": "application/json"}, - json=payload, - timeout=120, - ) - - if not check_status( - resp, - 201, - "极小内存 Ship 创建成功(说明已被修正)", - "极小内存 Ship 创建失败", - ): - return False - - data = resp.json() - ship_id = data.get("id") - print(f"Ship ID: {ship_id}") - - # 清理 - print(f"清理 Ship {ship_id}...") - requests.delete(f"{BAY_URL}/ship/{ship_id}", headers=AUTH_HEADERS, timeout=30) - - return True - except Exception as exc: - print(f"❌ 请求失败: {exc}") - return False - - -def test_create_ship_with_disk() -> bool: - print_section("测试 6.6: /ship 创建 Ship(带磁盘限制)") - try: - # 创建带有 disk 参数的 Ship - payload = { - "ttl": 60, - "max_session_num": 1, - "spec": {"cpus": 0.5, "memory": "256m", "disk": 
"2Gi"}, - } - print(f"请求载荷: {payload}") - resp = requests.post( - f"{BAY_URL}/ship", - headers={**AUTH_HEADERS, "Content-Type": "application/json"}, - json=payload, - timeout=120, - ) - - if not check_status( - resp, - 201, - "带磁盘限制的 Ship 创建成功", - "带磁盘限制的 Ship 创建失败", - ): - return False - - data = resp.json() - ship_id = data.get("id") - print(f"Ship ID: {ship_id}") - - # 清理 - print(f"清理 Ship {ship_id}...") - requests.delete(f"{BAY_URL}/ship/{ship_id}", headers=AUTH_HEADERS, timeout=30) - - return True - except Exception as exc: - print(f"❌ 请求失败: {exc}") - return False - - -def test_create_ship() -> Optional[str]: - print_section("测试 7: /ship 创建 Ship") - try: - payload = { - "ttl": 300, - "max_session_num": 2, - "spec": {"cpus": 0.5, "memory": "512m"}, - } - print(f"请求载荷: {payload}") - resp = requests.post( - f"{BAY_URL}/ship", - headers={**AUTH_HEADERS, "Content-Type": "application/json"}, - json=payload, - timeout=120, - ) - if not check_status(resp, 201, "Ship 创建成功", "Ship 创建失败"): - return None - data = resp.json() - ship_id = data.get("id") - print(f"Ship ID: {ship_id}") - return ship_id - except Exception as exc: - print(f"❌ 请求失败: {exc}") - return None - - -def test_get_ship_not_found() -> bool: - print_section("测试 8: /ship/{id} 获取不存在 Ship") - try: - resp = requests.get( - f"{BAY_URL}/ship/not-exists-id", - headers=AUTH_HEADERS, - timeout=5, - ) - return check_status(resp, 404, "不存在 ship 返回 404", "不存在 ship 未返回 404") - except Exception as exc: - print(f"❌ 请求失败: {exc}") - return False - - -def test_get_ship(ship_id: str) -> bool: - print_section(f"测试 9: /ship/{ship_id} 获取 Ship 信息") - try: - resp = requests.get( - f"{BAY_URL}/ship/{ship_id}", headers=AUTH_HEADERS, timeout=5 - ) - return check_status(resp, 200, "获取 ship 信息成功", "获取 ship 信息失败") - except Exception as exc: - print(f"❌ 请求失败: {exc}") - return False - - -def test_exec_shell(ship_id: str) -> bool: - print_section(f"测试 10: /ship/{ship_id}/exec 执行 Shell") - try: - payload = {"type": "shell/exec", "payload": {"command": "echo Bay"}} - resp = requests.post( - f"{BAY_URL}/ship/{ship_id}/exec", - headers={**AUTH_HEADERS, "Content-Type": "application/json"}, - json=payload, - timeout=30, - ) - return check_status(resp, 200, "Shell 命令执行成功", "Shell 命令执行失败") - except Exception as exc: - print(f"❌ 请求失败: {exc}") - return False - - -def test_exec_invalid_type(ship_id: str) -> bool: - print_section(f"测试 11: /ship/{ship_id}/exec 非法操作类型") - try: - payload = {"type": "unknown/exec", "payload": {"foo": "bar"}} - resp = requests.post( - f"{BAY_URL}/ship/{ship_id}/exec", - headers={**AUTH_HEADERS, "Content-Type": "application/json"}, - json=payload, - timeout=30, - ) - return check_status(resp, 400, "非法操作被拒绝", "非法操作未被拒绝") - except Exception as exc: - print(f"❌ 请求失败: {exc}") - return False - - -def test_extend_ttl(ship_id: str) -> bool: - print_section(f"测试 12: /ship/{ship_id}/extend-ttl") - try: - payload = {"ttl": 600} - resp = requests.post( - f"{BAY_URL}/ship/{ship_id}/extend-ttl", - headers={**AUTH_HEADERS, "Content-Type": "application/json"}, - json=payload, - timeout=10, - ) - return check_status(resp, 200, "TTL 扩展成功", "TTL 扩展失败") - except Exception as exc: - print(f"❌ 请求失败: {exc}") - return False - - -def test_get_logs(ship_id: str) -> bool: - print_section(f"测试 13: /ship/logs/{ship_id} 获取日志") - try: - resp = requests.get( - f"{BAY_URL}/ship/logs/{ship_id}", headers=AUTH_HEADERS, timeout=10 - ) - return check_status(resp, 200, "获取日志成功", "获取日志失败") - except Exception as exc: - print(f"❌ 请求失败: {exc}") - return False - - -def 
test_upload_download(ship_id: str) -> bool: - print_section(f"测试 14: /ship/{ship_id}/upload + download") - try: - content = b"hello from bay upload" - file_obj = io.BytesIO(content) - files = {"file": ("hello.txt", file_obj, "text/plain")} - session_prefix = SESSION_ID.split("-")[0] - workspace_path = f"/home/ship_{session_prefix}/workspace/hello.txt" - data = {"file_path": workspace_path} - upload_resp = requests.post( - f"{BAY_URL}/ship/{ship_id}/upload", - headers=AUTH_HEADERS, - files=files, - data=data, - timeout=30, - ) - if not check_status(upload_resp, 200, "上传成功", "上传失败"): - return False - - download_resp = requests.get( - f"{BAY_URL}/ship/{ship_id}/download", - headers=AUTH_HEADERS, - params={"file_path": workspace_path}, - timeout=30, - ) - if download_resp.status_code != 200: - print(f"状态码: {download_resp.status_code}") - print(f"响应: {request_json(download_resp)}") - print("❌ 下载失败") - return False - - if download_resp.content != content: - print("❌ 下载内容不匹配") - return False - - print("✅ 下载成功且内容匹配") - return True - except Exception as exc: - print(f"❌ 请求失败: {exc}") - return False - - -def test_data_persistence() -> bool: - """ - 测试数据持久化:验证容器停止后重新启动时数据是否保留。 - - 测试流程: - 1. 创建 Ship - 2. 通过文件系统操作写入测试文件 - 3. 删除 Ship(停止容器,但保留数据目录/PVC) - 4. 使用相同 Session ID 重新创建 Ship(应该恢复之前的 Ship) - 5. 验证测试文件仍然存在且内容正确 - """ - print_section("测试 17: 数据持久化(容器重启后数据保留)") - - # 使用一个固定的 Session ID 以便测试恢复功能 - persistence_session_id = f"persistence-test-{uuid.uuid4().hex[:8]}" - persistence_headers = { - "Authorization": f"Bearer {ACCESS_TOKEN}", - "X-SESSION-ID": persistence_session_id, - } - - test_filename = "persistence_test.txt" - test_content = f"Persistence test content - {uuid.uuid4().hex}" - ship_id = None - - try: - # Step 1: 创建 Ship - print("\n步骤 1: 创建 Ship...") - payload = { - "ttl": 120, - "max_session_num": 1, - "spec": {"cpus": 0.5, "memory": "256m"}, - } - resp = requests.post( - f"{BAY_URL}/ship", - headers={**persistence_headers, "Content-Type": "application/json"}, - json=payload, - timeout=120, - ) - if resp.status_code != 201: - print(f"❌ 创建 Ship 失败: {resp.status_code} - {request_json(resp)}") - return False - - data = resp.json() - ship_id = data.get("id") - print(f"✅ Ship 创建成功: {ship_id}") - - # 等待 Ship 完全就绪 - print("等待 Ship 完全就绪...") - time.sleep(3) - - # Step 2: 写入测试文件 - print(f"\n步骤 2: 写入测试文件 {test_filename}...") - exec_payload = { - "type": "fs/write_file", - "payload": { - "path": test_filename, - "content": test_content, - }, - } - resp = requests.post( - f"{BAY_URL}/ship/{ship_id}/exec", - headers={**persistence_headers, "Content-Type": "application/json"}, - json=exec_payload, - timeout=30, - ) - if resp.status_code != 200: - print(f"❌ 写入文件失败: {resp.status_code} - {request_json(resp)}") - return False - print(f"✅ 文件写入成功") - - # Step 3: 验证文件已写入 - print(f"\n步骤 3: 验证文件已写入...") - exec_payload = { - "type": "fs/read_file", - "payload": {"path": test_filename}, - } - resp = requests.post( - f"{BAY_URL}/ship/{ship_id}/exec", - headers={**persistence_headers, "Content-Type": "application/json"}, - json=exec_payload, - timeout=30, - ) - if resp.status_code != 200: - print(f"❌ 读取文件失败: {resp.status_code} - {request_json(resp)}") - return False - exec_result = resp.json() - # ExecResponse 格式: {"success": true, "data": {"content": "...", "path": "...", "size": ...}} - read_data = exec_result.get("data", {}) - actual_content = read_data.get("content") - if actual_content != test_content: - print(f"❌ 文件内容不匹配: 期望 {test_content!r}, 实际 {actual_content!r}") - print(f" 完整响应: {exec_result}") - return 
False - print(f"✅ 文件内容验证成功") - - # Step 4: 删除 Ship - print(f"\n步骤 4: 删除 Ship {ship_id}...") - resp = requests.delete( - f"{BAY_URL}/ship/{ship_id}", - headers=persistence_headers, - timeout=30, - ) - if resp.status_code != 204: - print(f"❌ 删除 Ship 失败: {resp.status_code} - {request_json(resp)}") - return False - print(f"✅ Ship 删除成功") - - # 等待容器完全停止 - print("等待容器停止...") - time.sleep(3) - - # 调试信息:检查 PVC 状态(仅在 Kubernetes 模式下有意义) - print("\n调试: 检查 PVC 状态...") - try: - import subprocess - # 查看 PVC - result = subprocess.run( - ["kubectl", "get", "pvc", "-n", "shipyard", "-o", "wide"], - capture_output=True, - text=True, - timeout=10, - ) - print(f"PVC 状态:\n{result.stdout}") - if result.stderr: - print(f"kubectl stderr: {result.stderr}") - - # 查看 PVC 详细信息 - pvc_name = f"ship-{ship_id}" - result = subprocess.run( - ["kubectl", "get", "pvc", pvc_name, "-n", "shipyard", "-o", "yaml"], - capture_output=True, - text=True, - timeout=10, - ) - if result.returncode == 0: - # 解析 StorageClass - import re - sc_match = re.search(r'storageClassName:\s*(\S+)', result.stdout) - if sc_match: - print(f"✅ PVC {pvc_name} 使用的 StorageClass: {sc_match.group(1)}") - else: - print(f"⚠️ 无法从 PVC 中找到 StorageClass") - else: - print(f"⚠️ 无法获取 PVC {pvc_name} 详情: {result.stderr}") - except FileNotFoundError: - print("⚠️ kubectl 不可用,跳过 PVC 检查(可能不是 Kubernetes 模式)") - except Exception as e: - print(f"⚠️ 检查 PVC 时出错: {e}") - - # Step 5: 使用相同 Session ID 重新创建 Ship - print(f"\n步骤 5: 使用相同 Session ID 重新创建 Ship...") - payload = { - "ttl": 120, - "max_session_num": 1, - "spec": {"cpus": 0.5, "memory": "256m"}, - } - resp = requests.post( - f"{BAY_URL}/ship", - headers={**persistence_headers, "Content-Type": "application/json"}, - json=payload, - timeout=120, - ) - if resp.status_code != 201: - print(f"❌ 重新创建 Ship 失败: {resp.status_code} - {request_json(resp)}") - return False - - data = resp.json() - new_ship_id = data.get("id") - print(f"✅ Ship 恢复成功: {new_ship_id}") - - # 验证是否是同一个 Ship 被恢复 - if new_ship_id == ship_id: - print(f"✅ 确认是同一个 Ship 被恢复 (ID: {ship_id})") - else: - print(f"⚠️ 创建了新的 Ship (新 ID: {new_ship_id}, 旧 ID: {ship_id})") - # 更新 ship_id 用于后续清理 - ship_id = new_ship_id - - # 等待 Ship 完全就绪 - print("等待 Ship 完全就绪...") - time.sleep(3) - - # Step 6: 验证文件仍然存在 - print(f"\n步骤 6: 验证文件 {test_filename} 仍然存在...") - exec_payload = { - "type": "fs/read_file", - "payload": {"path": test_filename}, - } - resp = requests.post( - f"{BAY_URL}/ship/{ship_id}/exec", - headers={**persistence_headers, "Content-Type": "application/json"}, - json=exec_payload, - timeout=30, - ) - if resp.status_code != 200: - print(f"❌ 读取文件失败: {resp.status_code} - {request_json(resp)}") - print("❌ 数据持久化测试失败:文件未被保留") - return False - - exec_result = resp.json() - # ExecResponse 格式: {"success": true, "data": {"content": "...", "path": "...", "size": ...}} - read_data = exec_result.get("data", {}) - actual_content = read_data.get("content") - if actual_content != test_content: - print(f"❌ 文件内容不匹配: 期望 {test_content!r}, 实际 {actual_content!r}") - print(f" 完整响应: {exec_result}") - print("❌ 数据持久化测试失败:文件内容不一致") - return False - - print(f"✅ 文件内容验证成功") - print("\n" + "=" * 50) - print("✅ 数据持久化测试通过!") - print("=" * 50) - return True - - except Exception as exc: - print(f"❌ 测试过程中出错: {exc}") - import traceback - traceback.print_exc() - return False - - finally: - # 清理:删除测试 Ship - if ship_id: - print(f"\n清理: 删除 Ship {ship_id}...") - try: - requests.delete( - f"{BAY_URL}/ship/{ship_id}", - headers=persistence_headers, - timeout=30, - ) - print("✅ 清理完成") - except Exception as e: - 
print(f"⚠️ 清理时出错: {e}") - - -def test_delete_ship(ship_id: str) -> bool: - print_section(f"测试 15: /ship/{ship_id} 删除 Ship") - try: - resp = requests.delete( - f"{BAY_URL}/ship/{ship_id}", headers=AUTH_HEADERS, timeout=30 - ) - return check_status(resp, 204, "Ship 删除成功", "Ship 删除失败") - except Exception as exc: - print(f"❌ 请求失败: {exc}") - return False - - -def test_delete_ship_not_found(ship_id: str) -> bool: - print_section(f"测试 16: /ship/{ship_id} 删除不存在 Ship") - try: - resp = requests.delete( - f"{BAY_URL}/ship/{ship_id}", headers=AUTH_HEADERS, timeout=10 - ) - return check_status(resp, 404, "重复删除返回 404", "重复删除未返回 404") - except Exception as exc: - print(f"❌ 请求失败: {exc}") - return False - - -def main() -> None: - print("Bay API 功能测试(全面版)") - print("=" * 70) - print(f"服务地址: {BAY_URL}") - print(f"Session ID: {SESSION_ID}") - print() - - # 单元测试 - if not test_memory_utils(): - print("\n内存单元测试失败,退出测试") - sys.exit(1) - - if not test_disk_utils(): - print("\n磁盘单元测试失败,退出测试") - sys.exit(1) - - # 基础健康与信息 - if not test_health(): - print("\n服务未运行,退出测试") - sys.exit(1) - test_root() - test_stat() - - # 认证与列表 - test_auth_required() - test_list_ships() - - # 非法参数 - test_create_ship_invalid_payload() - - # 需要 Docker 和 ship 镜像 - print("\n" + "-" * 70) - print("以下测试需要 Docker 和 ship 镜像") - print("-" * 70) - - test_get_ship_not_found() - - # 测试小内存自动修正 - test_create_ship_with_small_memory() - - # 测试带磁盘限制的 Ship 创建 - test_create_ship_with_disk() - - ship_id = test_create_ship() - if not ship_id: - print("\n跳过需要 Ship 的测试(可能没有 ship 镜像)") - return - - # 等待 ship 就绪 - print("\n等待 2 秒让 ship 完全就绪...") - time.sleep(2) - - test_get_ship(ship_id) - test_exec_shell(ship_id) - test_exec_invalid_type(ship_id) - test_extend_ttl(ship_id) - test_get_logs(ship_id) - test_upload_download(ship_id) - test_delete_ship(ship_id) - test_delete_ship_not_found(ship_id) - - # 数据持久化测试(独立的 Ship 生命周期) - print("\n" + "-" * 70) - print("数据持久化测试") - print("-" * 70) - test_data_persistence() - - print("\n" + "=" * 70) - print("测试完成!") - print("=" * 70) - - -if __name__ == "__main__": - main() diff --git a/pkgs/bay/tests/README.md b/pkgs/bay/tests/README.md new file mode 100644 index 0000000..b086f29 --- /dev/null +++ b/pkgs/bay/tests/README.md @@ -0,0 +1,98 @@ +# Bay 测试 + +本目录包含 Bay 服务的所有测试。 + +## 目录结构 + +``` +tests/ +├── conftest.py # pytest 配置和通用 fixtures +├── unit/ # 单元测试(不需要外部依赖) +│ └── test_utils.py # 工具函数测试 +├── e2e/ # 端到端测试(需要 Bay 服务运行) +│ └── test_bay_api.py # API 功能测试 +├── integration/ # 集成测试(需要 Docker 环境) +│ └── test_integration.py +├── k8s/ # Kubernetes 测试配置 +│ ├── k8s-deploy.yaml +│ └── storageclass-retain.yaml +└── scripts/ # 测试脚本 + ├── test_docker_container.sh + ├── test_docker_host.sh + ├── test_podman_container.sh + ├── test_podman_host.sh + └── test_kubernetes.sh +``` + +## 运行测试 + +### 单元测试 + +单元测试不需要任何外部依赖,可以直接运行: + +```bash +cd pkgs/bay +python -m pytest tests/unit/ -v +``` + +### E2E 测试 + +E2E 测试需要 Bay 服务运行。可以通过以下方式运行: + +1. 确保 Bay 服务正在运行(默认 http://localhost:8156) +2. 
运行测试: + +```bash +cd pkgs/bay +python -m pytest tests/e2e/ -v +``` + +可通过环境变量配置: +- `BAY_URL`: Bay 服务地址(默认 http://localhost:8156) +- `BAY_ACCESS_TOKEN`: 访问令牌(默认 secret-token) + +### 集成测试 + +集成测试需要 Docker 环境,会自动构建和启动 Bay 容器: + +```bash +cd pkgs/bay +python -m pytest tests/integration/ -v +``` + +### 使用测试脚本 + +测试脚本位于 `tests/scripts/` 目录下,用于在不同环境中运行完整的测试流程: + +```bash +# Docker 容器模式测试 +./tests/scripts/test_docker_container.sh + +# Docker 主机模式测试 +./tests/scripts/test_docker_host.sh + +# Podman 容器模式测试 +./tests/scripts/test_podman_container.sh + +# Podman 主机模式测试 +./tests/scripts/test_podman_host.sh + +# Kubernetes 模式测试 +./tests/scripts/test_kubernetes.sh [命令] [集群类型] +``` + +## pytest 标记 + +测试使用以下标记进行分类: + +- `@pytest.mark.unit`: 单元测试 +- `@pytest.mark.e2e`: 端到端测试 +- `@pytest.mark.integration`: 集成测试 + +可以使用 `-m` 参数运行特定类型的测试: + +```bash +python -m pytest -m unit # 仅运行单元测试 +python -m pytest -m e2e # 仅运行 E2E 测试 +python -m pytest -m integration # 仅运行集成测试 +``` diff --git a/pkgs/bay/tests/conftest.py b/pkgs/bay/tests/conftest.py index 94db570..5eeb2f9 100644 --- a/pkgs/bay/tests/conftest.py +++ b/pkgs/bay/tests/conftest.py @@ -1,9 +1,74 @@ -import pytest -import docker +""" +Bay 测试配置 + +包含通用的 pytest fixtures 和配置。 +""" + +import os import time -import requests +import uuid from pathlib import Path +import docker +import pytest +import requests + + +# ============================================================================ +# pytest 标记注册 +# ============================================================================ + +def pytest_configure(config): + """注册自定义标记""" + config.addinivalue_line( + "markers", "e2e: 端到端测试,需要 Bay 服务运行" + ) + config.addinivalue_line( + "markers", "integration: 集成测试,需要 Docker 环境" + ) + config.addinivalue_line( + "markers", "unit: 单元测试,不需要外部依赖" + ) + + +# ============================================================================ +# 通用配置 +# ============================================================================ + +# 配置 - 支持从环境变量覆盖 +BAY_URL = os.getenv("BAY_URL", "http://localhost:8156") +ACCESS_TOKEN = os.getenv("BAY_ACCESS_TOKEN", "secret-token") + + +def get_auth_headers(session_id: str | None = None) -> dict[str, str]: + """获取认证请求头""" + sid = session_id or str(uuid.uuid4()) + return { + "Authorization": f"Bearer {ACCESS_TOKEN}", + "X-SESSION-ID": sid, + } + + +# ============================================================================ +# 通用 fixtures +# ============================================================================ + +@pytest.fixture(scope="module") +def bay_url() -> str: + """返回 Bay 服务 URL""" + return BAY_URL + + +@pytest.fixture(scope="module") +def auth_headers() -> dict[str, str]: + """返回认证请求头""" + return get_auth_headers() + + +# ============================================================================ +# Docker/Integration 测试 fixtures +# ============================================================================ + @pytest.fixture(scope="session") def docker_client(): diff --git a/pkgs/bay/tests/e2e/__init__.py b/pkgs/bay/tests/e2e/__init__.py new file mode 100644 index 0000000..3f64167 --- /dev/null +++ b/pkgs/bay/tests/e2e/__init__.py @@ -0,0 +1 @@ +# Bay E2E 测试 diff --git a/pkgs/bay/tests/e2e/test_bay_api.py b/pkgs/bay/tests/e2e/test_bay_api.py new file mode 100644 index 0000000..f91d84c --- /dev/null +++ b/pkgs/bay/tests/e2e/test_bay_api.py @@ -0,0 +1,486 @@ +""" +Bay API E2E 测试 + +需要 Bay 服务运行,测试完整的 API 功能。 + +使用方法: + # 确保 Bay 服务正在运行 + # 设置环境变量(可选): + # BAY_URL=http://localhost:8156 + # BAY_ACCESS_TOKEN=secret-token + + pytest tests/e2e/ -v 
+""" + +from __future__ import annotations + +import io +import os +import time +import uuid +from contextlib import contextmanager +from typing import Any, Generator + +import pytest +import requests + + +# 配置 - 支持从环境变量覆盖 +BAY_URL = os.getenv("BAY_URL", "http://localhost:8156") +ACCESS_TOKEN = os.getenv("BAY_ACCESS_TOKEN", "secret-token") + + +def get_auth_headers(session_id: str | None = None) -> dict[str, str]: + """获取认证请求头""" + sid = session_id or str(uuid.uuid4()) + return { + "Authorization": f"Bearer {ACCESS_TOKEN}", + "X-SESSION-ID": sid, + } + + +@contextmanager +def fresh_ship( + session_id: str | None = None, + ttl: int = 60, + cpus: float = 0.5, + memory: str = "256m", + disk: str | None = None, +) -> Generator[tuple[str, dict], None, None]: + """ + 创建独立的 Ship 容器用于单个测试,测试结束后自动清理。 + + Yields: + tuple[ship_id, headers]: Ship ID 和请求头 + """ + # 生成唯一的 session ID + test_session_id = session_id or f"test-{uuid.uuid4().hex[:12]}" + headers = get_auth_headers(test_session_id) + + ship_id = None + try: + # 创建 Ship + spec: dict[str, Any] = {"cpus": cpus, "memory": memory} + if disk: + spec["disk"] = disk + payload = {"ttl": ttl, "max_session_num": 1, "spec": spec} + + resp = requests.post( + f"{BAY_URL}/ship", + headers={**headers, "Content-Type": "application/json"}, + json=payload, + timeout=120, + ) + if resp.status_code != 201: + raise RuntimeError(f"创建 Ship 失败: {resp.status_code} - {resp.text}") + + data = resp.json() + ship_id = data.get("id") + + # 等待容器就绪 + time.sleep(2) + + yield ship_id, headers + + finally: + # 清理容器 + if ship_id: + try: + requests.delete( + f"{BAY_URL}/ship/{ship_id}", + headers=headers, + timeout=30, + ) + except Exception: + pass # 清理失败不影响测试结果 + + +@pytest.fixture(scope="module") +def bay_url() -> str: + """返回 Bay 服务 URL""" + return BAY_URL + + +@pytest.fixture(scope="module") +def auth_headers() -> dict[str, str]: + """返回认证请求头""" + return get_auth_headers() + + +@pytest.mark.e2e +class TestHealthAndBasicEndpoints: + """阶段 1: 健康检查和基础端点测试(无容器)""" + + def test_health(self, bay_url): + """/health 健康检查""" + resp = requests.get(f"{bay_url}/health", timeout=5) + assert resp.status_code == 200, f"健康检查失败: {resp.text}" + + def test_root(self, bay_url): + """/ 根路由""" + resp = requests.get(f"{bay_url}/", timeout=5) + assert resp.status_code == 200, f"根路由失败: {resp.text}" + + def test_stat(self, bay_url): + """/stat 版本信息""" + resp = requests.get(f"{bay_url}/stat", timeout=5) + assert resp.status_code == 200, f"统计信息失败: {resp.text}" + + +@pytest.mark.e2e +class TestAuthentication: + """阶段 2: 认证测试""" + + def test_auth_required(self, bay_url): + """未授权访问应被拒绝""" + resp = requests.get(f"{bay_url}/ships", timeout=5) + assert resp.status_code in [401, 403], f"未授权访问未被拒绝: {resp.status_code}" + + def test_list_ships_with_auth(self, bay_url, auth_headers): + """/ships 列出 Ships(需要认证)""" + resp = requests.get(f"{bay_url}/ships", headers=auth_headers, timeout=5) + assert resp.status_code == 200, f"列出 ships 失败: {resp.text}" + + +@pytest.mark.e2e +class TestShipCreation: + """阶段 3: Ship 创建相关测试""" + + def test_create_ship_invalid_payload(self, bay_url, auth_headers): + """/ship 创建 Ship(非法参数)""" + payload = {"ttl": 0, "max_session_num": 0} + resp = requests.post( + f"{bay_url}/ship", + headers={**auth_headers, "Content-Type": "application/json"}, + json=payload, + timeout=10, + ) + assert resp.status_code == 422, f"非法参数未被拒绝: {resp.status_code}" + + def test_create_ship_with_small_memory(self, bay_url, auth_headers): + """创建 Ship(极小内存自动修正)""" + # 使用 1m 内存 (1 MiB),如果没有修正,容器必死无疑 + payload = { + 
"ttl": 60, + "max_session_num": 1, + "spec": {"cpus": 0.1, "memory": "1m"}, + } + resp = requests.post( + f"{bay_url}/ship", + headers={**auth_headers, "Content-Type": "application/json"}, + json=payload, + timeout=120, + ) + assert resp.status_code == 201, f"极小内存 Ship 创建失败: {resp.text}" + + data = resp.json() + ship_id = data.get("id") + + # 清理 + requests.delete(f"{bay_url}/ship/{ship_id}", headers=auth_headers, timeout=30) + + def test_create_ship_with_disk(self, bay_url, auth_headers): + """创建 Ship(带磁盘限制)""" + payload = { + "ttl": 60, + "max_session_num": 1, + "spec": {"cpus": 0.5, "memory": "256m", "disk": "2Gi"}, + } + resp = requests.post( + f"{bay_url}/ship", + headers={**auth_headers, "Content-Type": "application/json"}, + json=payload, + timeout=120, + ) + assert resp.status_code == 201, f"带磁盘限制的 Ship 创建失败: {resp.text}" + + data = resp.json() + ship_id = data.get("id") + + # 清理 + requests.delete(f"{bay_url}/ship/{ship_id}", headers=auth_headers, timeout=30) + + +@pytest.mark.e2e +class TestShipOperations: + """阶段 4: Ship 操作测试(每个测试使用独立容器)""" + + def test_create_and_get_ship(self, bay_url): + """创建和获取 Ship""" + with fresh_ship(ttl=60) as (ship_id, headers): + # 验证可以获取 Ship 信息 + resp = requests.get( + f"{bay_url}/ship/{ship_id}", headers=headers, timeout=5 + ) + assert resp.status_code == 200, f"获取 ship 信息失败: {resp.text}" + + def test_get_ship_not_found(self, bay_url, auth_headers): + """获取不存在的 Ship""" + resp = requests.get( + f"{bay_url}/ship/not-exists-id", + headers=auth_headers, + timeout=5, + ) + assert resp.status_code == 404, f"不存在 ship 未返回 404: {resp.status_code}" + + def test_exec_shell(self, bay_url): + """执行 Shell 命令""" + with fresh_ship() as (ship_id, headers): + payload = {"type": "shell/exec", "payload": {"command": "echo Bay"}} + resp = requests.post( + f"{bay_url}/ship/{ship_id}/exec", + headers={**headers, "Content-Type": "application/json"}, + json=payload, + timeout=30, + ) + assert resp.status_code == 200, f"Shell 命令执行失败: {resp.text}" + + def test_exec_invalid_type(self, bay_url): + """非法操作类型""" + with fresh_ship() as (ship_id, headers): + payload = {"type": "unknown/exec", "payload": {"foo": "bar"}} + resp = requests.post( + f"{bay_url}/ship/{ship_id}/exec", + headers={**headers, "Content-Type": "application/json"}, + json=payload, + timeout=30, + ) + assert resp.status_code == 400, f"非法操作未被拒绝: {resp.status_code}" + + def test_extend_ttl(self, bay_url): + """扩展 TTL""" + with fresh_ship() as (ship_id, headers): + payload = {"ttl": 600} + resp = requests.post( + f"{bay_url}/ship/{ship_id}/extend-ttl", + headers={**headers, "Content-Type": "application/json"}, + json=payload, + timeout=10, + ) + assert resp.status_code == 200, f"TTL 扩展失败: {resp.text}" + + def test_get_logs(self, bay_url): + """获取日志""" + with fresh_ship() as (ship_id, headers): + resp = requests.get( + f"{bay_url}/ship/logs/{ship_id}", headers=headers, timeout=10 + ) + assert resp.status_code == 200, f"获取日志失败: {resp.text}" + + def test_upload_download(self, bay_url): + """上传和下载文件""" + with fresh_ship() as (ship_id, headers): + content = b"hello from bay upload" + file_obj = io.BytesIO(content) + files = {"file": ("hello.txt", file_obj, "text/plain")} + # 使用相对路径,会自动相对于 workspace 目录解析 + workspace_path = "hello.txt" + data = {"file_path": workspace_path} + + upload_resp = requests.post( + f"{bay_url}/ship/{ship_id}/upload", + headers=headers, + files=files, + data=data, + timeout=30, + ) + assert upload_resp.status_code == 200, f"上传失败: {upload_resp.text}" + + download_resp = requests.get( + 
f"{bay_url}/ship/{ship_id}/download", + headers=headers, + params={"file_path": workspace_path}, + timeout=30, + ) + assert download_resp.status_code == 200, f"下载失败: {download_resp.status_code}" + assert download_resp.content == content, "下载内容不匹配" + + +@pytest.mark.e2e +class TestShipDeletion: + """阶段 5: Ship 删除测试""" + + def test_delete_ship_not_found(self, bay_url): + """删除不存在的 Ship(创建-删除-再删除)""" + # 创建独立的 session + test_session_id = f"delete-test-{uuid.uuid4().hex[:8]}" + headers = get_auth_headers(test_session_id) + + # 创建 Ship + spec: dict[str, Any] = {"cpus": 0.5, "memory": "256m"} + payload = {"ttl": 60, "max_session_num": 1, "spec": spec} + + resp = requests.post( + f"{bay_url}/ship", + headers={**headers, "Content-Type": "application/json"}, + json=payload, + timeout=120, + ) + assert resp.status_code == 201, f"创建 Ship 失败: {resp.status_code}" + + data = resp.json() + ship_id = data.get("id") + + # 等待容器就绪 + time.sleep(2) + + # 第一次删除,应该成功 + resp = requests.delete( + f"{bay_url}/ship/{ship_id}", + headers=headers, + timeout=30, + ) + assert resp.status_code == 204, f"第一次删除失败: {resp.status_code}" + + # 第二次删除,应该返回 404 + resp = requests.delete( + f"{bay_url}/ship/{ship_id}", + headers=headers, + timeout=10, + ) + assert resp.status_code == 404, f"重复删除未返回 404: {resp.status_code}" + + +@pytest.mark.e2e +class TestDataPersistence: + """阶段 6: 数据持久化测试 + + 验证容器停止后重新启动时数据是否保留。 + 注意:此测试有特定的执行顺序要求,必须按步骤执行。 + """ + + def test_data_persistence(self, bay_url): + """ + 测试数据持久化完整流程: + 1. 创建 Ship + 2. 写入测试文件 + 3. 删除 Ship + 4. 使用相同 Session ID 重新创建 Ship + 5. 验证测试文件仍然存在 + """ + # 使用一个固定的 Session ID 以便测试恢复功能 + persistence_session_id = f"persistence-test-{uuid.uuid4().hex[:8]}" + persistence_headers = get_auth_headers(persistence_session_id) + + test_filename = "persistence_test.txt" + test_content = f"Persistence test content - {uuid.uuid4().hex}" + ship_id = None + + try: + # Step 1: 创建 Ship + payload = { + "ttl": 120, + "max_session_num": 1, + "spec": {"cpus": 0.5, "memory": "256m"}, + } + resp = requests.post( + f"{bay_url}/ship", + headers={**persistence_headers, "Content-Type": "application/json"}, + json=payload, + timeout=120, + ) + assert resp.status_code == 201, f"创建 Ship 失败: {resp.status_code} - {resp.text}" + + data = resp.json() + ship_id = data.get("id") + + # 等待 Ship 完全就绪 + time.sleep(3) + + # Step 2: 写入测试文件 + exec_payload = { + "type": "fs/write_file", + "payload": { + "path": test_filename, + "content": test_content, + }, + } + resp = requests.post( + f"{bay_url}/ship/{ship_id}/exec", + headers={**persistence_headers, "Content-Type": "application/json"}, + json=exec_payload, + timeout=30, + ) + assert resp.status_code == 200, f"写入文件失败: {resp.status_code} - {resp.text}" + + # Step 3: 验证文件已写入 + exec_payload = { + "type": "fs/read_file", + "payload": {"path": test_filename}, + } + resp = requests.post( + f"{bay_url}/ship/{ship_id}/exec", + headers={**persistence_headers, "Content-Type": "application/json"}, + json=exec_payload, + timeout=30, + ) + assert resp.status_code == 200, f"读取文件失败: {resp.status_code} - {resp.text}" + exec_result = resp.json() + read_data = exec_result.get("data", {}) + actual_content = read_data.get("content") + assert actual_content == test_content, f"文件内容不匹配: 期望 {test_content!r}, 实际 {actual_content!r}" + + # Step 4: 删除 Ship + resp = requests.delete( + f"{bay_url}/ship/{ship_id}", + headers=persistence_headers, + timeout=30, + ) + assert resp.status_code == 204, f"删除 Ship 失败: {resp.status_code} - {resp.text}" + + # 等待容器完全停止 + time.sleep(3) + + # Step 5: 使用相同 Session ID 
重新创建 Ship + payload = { + "ttl": 120, + "max_session_num": 1, + "spec": {"cpus": 0.5, "memory": "256m"}, + } + resp = requests.post( + f"{bay_url}/ship", + headers={**persistence_headers, "Content-Type": "application/json"}, + json=payload, + timeout=120, + ) + assert resp.status_code == 201, f"重新创建 Ship 失败: {resp.status_code} - {resp.text}" + + data = resp.json() + new_ship_id = data.get("id") + + # 更新 ship_id 用于后续清理 + if new_ship_id != ship_id: + ship_id = new_ship_id + + # 等待 Ship 完全就绪 + time.sleep(3) + + # Step 6: 验证文件仍然存在 + exec_payload = { + "type": "fs/read_file", + "payload": {"path": test_filename}, + } + resp = requests.post( + f"{bay_url}/ship/{ship_id}/exec", + headers={**persistence_headers, "Content-Type": "application/json"}, + json=exec_payload, + timeout=30, + ) + assert resp.status_code == 200, f"读取文件失败(持久化验证): {resp.status_code} - {resp.text}" + + exec_result = resp.json() + read_data = exec_result.get("data", {}) + actual_content = read_data.get("content") + assert actual_content == test_content, f"持久化失败:文件内容不匹配: 期望 {test_content!r}, 实际 {actual_content!r}" + + finally: + # 清理:删除测试 Ship + if ship_id: + try: + requests.delete( + f"{bay_url}/ship/{ship_id}", + headers=persistence_headers, + timeout=30, + ) + except Exception: + pass diff --git a/pkgs/bay/k8s/k8s-deploy-local.yaml b/pkgs/bay/tests/k8s/k8s-deploy-local.yaml similarity index 98% rename from pkgs/bay/k8s/k8s-deploy-local.yaml rename to pkgs/bay/tests/k8s/k8s-deploy-local.yaml index f1d49b8..ec3cb96 100644 --- a/pkgs/bay/k8s/k8s-deploy-local.yaml +++ b/pkgs/bay/tests/k8s/k8s-deploy-local.yaml @@ -1,5 +1,5 @@ --- -# Shipyard Kubernetes Deployment +# Shipyard Kubernetes Deployment (测试用) # # 用于在本地 K8s 集群(如 kind, minikube, k3s)中测试 Kubernetes 驱动 # diff --git a/pkgs/bay/k8s/k8s-deploy.yaml b/pkgs/bay/tests/k8s/k8s-deploy.yaml similarity index 98% rename from pkgs/bay/k8s/k8s-deploy.yaml rename to pkgs/bay/tests/k8s/k8s-deploy.yaml index 96bfb1d..b92d87b 100644 --- a/pkgs/bay/k8s/k8s-deploy.yaml +++ b/pkgs/bay/tests/k8s/k8s-deploy.yaml @@ -1,5 +1,5 @@ --- -# Shipyard Kubernetes Deployment +# Shipyard Kubernetes Deployment (测试用) # # 用于在本地 K8s 集群(如 kind, minikube, k3s)中测试 Kubernetes 驱动 # diff --git a/pkgs/bay/tests/k8s/storageclass-retain.yaml b/pkgs/bay/tests/k8s/storageclass-retain.yaml new file mode 100644 index 0000000..8b41d48 --- /dev/null +++ b/pkgs/bay/tests/k8s/storageclass-retain.yaml @@ -0,0 +1,10 @@ +# StorageClass with Retain policy for Ship data persistence (测试用) +# This ensures PV data is preserved when PVC is deleted +apiVersion: storage.k8s.io/v1 +kind: StorageClass +metadata: + name: ship-hostpath-retain +provisioner: docker.io/hostpath +reclaimPolicy: Retain +volumeBindingMode: Immediate +allowVolumeExpansion: false diff --git a/pkgs/bay/tests/scripts/test_docker_container.sh b/pkgs/bay/tests/scripts/test_docker_container.sh new file mode 100755 index 0000000..67d3632 --- /dev/null +++ b/pkgs/bay/tests/scripts/test_docker_container.sh @@ -0,0 +1,46 @@ +#!/usr/bin/env bash +set -euo pipefail + +SCRIPT_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +PROJECT_ROOT=$(cd "${SCRIPT_DIR}/../.." 
&& pwd) + +cd "${PROJECT_ROOT}" + +echo "==> [Docker Container] 检查 Docker 服务" +docker info >/dev/null + +echo "==> [Docker Container] 本地构建 Bay 镜像 (soulter/shipyard-bay:latest)" +docker build -t soulter/shipyard-bay:latest "${PROJECT_ROOT}" + +echo "==> [Docker Container] 本地构建 Ship 镜像 (soulter/shipyard-ship:latest)" +docker build -t soulter/shipyard-ship:latest "${PROJECT_ROOT}/../ship" + +echo "==> [Docker Container] 创建网络 shipyard_network (如不存在)" +docker network inspect shipyard_network >/dev/null 2>&1 || docker network create shipyard_network + +mkdir -p "${PROJECT_ROOT}/data/shipyard/bay_data" + +export PWD="${PROJECT_ROOT}" + +echo "==> [Docker Container] 启动 docker-compose" +docker compose -f docker-compose.yml up -d + +echo "==> [Docker Container] 等待健康检查" +for i in {1..30}; do + if curl -fsS "http://127.0.0.1:8156/health" >/dev/null; then + echo "==> [Docker Container] Bay 已就绪" + break + fi + sleep 1 + if [[ "$i" -eq 30 ]]; then + echo "❌ [Docker Container] Bay 启动超时" + docker compose -f docker-compose.yml logs + exit 1 + fi +done + +echo "==> [Docker Container] 运行 E2E 测试" +python -m pytest tests/e2e/ -v + +echo "==> [Docker Container] 关闭 docker-compose" +docker compose -f docker-compose.yml down diff --git a/pkgs/bay/tests/scripts/test_docker_host.sh b/pkgs/bay/tests/scripts/test_docker_host.sh new file mode 100755 index 0000000..3a4b2a3 --- /dev/null +++ b/pkgs/bay/tests/scripts/test_docker_host.sh @@ -0,0 +1,54 @@ +#!/usr/bin/env bash +set -euo pipefail + +SCRIPT_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +PROJECT_ROOT=$(cd "${SCRIPT_DIR}/../.." && pwd) + +cd "${PROJECT_ROOT}" + +echo "==> [Docker Host] 检查 Docker 服务" +docker info >/dev/null + +echo "==> [Docker Host] 构建 Ship 镜像 (ship:latest)" +docker build -t ship:latest "${PROJECT_ROOT}/../ship" + +echo "==> [Docker Host] 创建网络 shipyard_network (如不存在)" +docker network inspect shipyard_network >/dev/null 2>&1 || docker network create shipyard_network + +mkdir -p "${PROJECT_ROOT}/data" + +export ACCESS_TOKEN="secret-token" +export DATABASE_URL="sqlite+aiosqlite:///./data/bay_test.db" +export CONTAINER_DRIVER="docker-host" +export DOCKER_IMAGE="ship:latest" +export DOCKER_NETWORK="shipyard_network" +export SHIP_DATA_DIR="${PROJECT_ROOT}/data/shipyard/ship_mnt_data" + +BAY_PID="" +cleanup() { + if [[ -n "${BAY_PID}" ]] && kill -0 "${BAY_PID}" >/dev/null 2>&1; then + echo "==> [Docker Host] 停止 Bay (PID=${BAY_PID})" + kill "${BAY_PID}" || true + fi +} +trap cleanup EXIT + +echo "==> [Docker Host] 启动 Bay (python run.py)" +python run.py & +BAY_PID=$! + +echo "==> [Docker Host] 等待健康检查" +for i in {1..30}; do + if curl -fsS "http://127.0.0.1:8156/health" >/dev/null; then + echo "==> [Docker Host] Bay 已就绪" + break + fi + sleep 1 + if [[ "$i" -eq 30 ]]; then + echo "❌ [Docker Host] Bay 启动超时" + exit 1 + fi +done + +echo "==> [Docker Host] 运行 E2E 测试" +python -m pytest tests/e2e/ -v diff --git a/pkgs/bay/k8s/test_kubernetes.sh b/pkgs/bay/tests/scripts/test_kubernetes.sh similarity index 93% rename from pkgs/bay/k8s/test_kubernetes.sh rename to pkgs/bay/tests/scripts/test_kubernetes.sh index 5fe4f17..8af8c38 100755 --- a/pkgs/bay/k8s/test_kubernetes.sh +++ b/pkgs/bay/tests/scripts/test_kubernetes.sh @@ -3,7 +3,7 @@ # # 此脚本用于在本地 K8s 集群中测试 Kubernetes 驱动 # 支持: Docker Desktop Kubernetes, kind, minikube, k3d -# 使用 test_bay_api.py 进行 API 测试 +# 使用 pytest tests/e2e/ 进行 API 测试 # # 使用方法: # ./test_kubernetes.sh [命令] [集群类型] @@ -14,8 +14,10 @@ set -e SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" -PROJECT_ROOT="$(cd "$SCRIPT_DIR/.." 
&& pwd)" +PROJECT_ROOT="$(cd "$SCRIPT_DIR/../.." && pwd)" SHIP_DIR="$(cd "$PROJECT_ROOT/../ship" && pwd)" +TESTS_DIR="$PROJECT_ROOT/tests" +K8S_DIR="$TESTS_DIR/k8s" # 默认参数 COMMAND="${1:-all}" @@ -32,6 +34,8 @@ echo "命令: $COMMAND" echo "集群类型: $CLUSTER_TYPE" echo "Bay 目录: $PROJECT_ROOT" echo "Ship 目录: $SHIP_DIR" +echo "K8s 配置目录: $K8S_DIR" +echo "Tests 目录: $TESTS_DIR" echo "" # 检查必要工具 @@ -131,12 +135,12 @@ load_images() { # 生成使用本地镜像的 YAML generate_local_yaml() { - local output_file="$SCRIPT_DIR/k8s-deploy-local.yaml" + local output_file="$K8S_DIR/k8s-deploy-local.yaml" sed -e "s|soulter/shipyard-bay:latest|$BAY_IMAGE|g" \ -e "s|soulter/shipyard-ship:latest|$SHIP_IMAGE|g" \ -e "s|imagePullPolicy: IfNotPresent|imagePullPolicy: Never|g" \ - "$SCRIPT_DIR/k8s-deploy.yaml" > "$output_file" + "$K8S_DIR/k8s-deploy.yaml" > "$output_file" echo "$output_file" } @@ -148,7 +152,7 @@ deploy() { # 先创建 StorageClass(如果不存在) echo " 创建 StorageClass..." - kubectl apply -f "$SCRIPT_DIR/deploy/06-storageclass-retain.yaml" || true + kubectl apply -f "$K8S_DIR/storageclass-retain.yaml" || true # 生成本地 YAML local yaml_file @@ -210,9 +214,9 @@ run_tests() { # 运行测试脚本 echo "" - echo " 运行 test_bay_api.py..." + echo " 运行 pytest tests/e2e/..." cd "$PROJECT_ROOT" - uv run python test_bay_api.py || true + python -m pytest tests/e2e/ -v || true # 清理端口转发 kill $PF_PID 2>/dev/null || true @@ -249,7 +253,7 @@ cleanup() { kubectl delete clusterrolebinding shipyard-bay-namespace-reader --ignore-not-found=true # 删除生成的本地 YAML - rm -f "$SCRIPT_DIR/k8s-deploy-local.yaml" + rm -f "$K8S_DIR/k8s-deploy-local.yaml" echo "✅ 清理完成" } diff --git a/pkgs/bay/tests/scripts/test_podman_container.sh b/pkgs/bay/tests/scripts/test_podman_container.sh new file mode 100755 index 0000000..c2156da --- /dev/null +++ b/pkgs/bay/tests/scripts/test_podman_container.sh @@ -0,0 +1,73 @@ +#!/usr/bin/env bash +set -euo pipefail + +SCRIPT_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +PROJECT_ROOT=$(cd "${SCRIPT_DIR}/../.." 
&& pwd) + +cd "${PROJECT_ROOT}" + +export HTTP_PROXY="http://192.168.44.2:10820" +export HTTPS_PROXY="http://192.168.44.2:10820" +export NO_PROXY="127.0.0.1,localhost" + +echo "==> [Podman Container] 检查 Podman 服务" +podman info >/dev/null + +echo "==> [Podman Container] 本地构建 Bay 镜像 (localhost/soulter/shipyard-bay:latest)" +podman build -t localhost/soulter/shipyard-bay:latest "${PROJECT_ROOT}" + +echo "==> [Podman Container] 本地构建 Ship 镜像 (docker.io/soulter/shipyard-ship:latest)" +podman build -t docker.io/soulter/shipyard-ship:latest "${PROJECT_ROOT}/../ship" + +echo "==> [Podman Container] 创建网络 bay_shipyard (如不存在)" +podman network exists bay_shipyard || podman network create bay_shipyard + +HOST_SHIP_DATA_DIR="${PROJECT_ROOT}/data/shipyard/ship_mnt_data" +mkdir -p "${PROJECT_ROOT}/data/shipyard/bay_data" "${HOST_SHIP_DATA_DIR}" + +BAY_CONTAINER_NAME="shipyard-bay-podman" + +cleanup() { + if podman ps -a --format "{{.Names}}" | grep -q "^${BAY_CONTAINER_NAME}$"; then + echo "==> [Podman Container] 清理 Bay 容器" + podman rm -f "${BAY_CONTAINER_NAME}" || true + fi +} +trap cleanup EXIT + +cleanup + +echo "==> [Podman Container] 启动 Bay 容器" +podman run -d --name "${BAY_CONTAINER_NAME}" \ + -p 8156:8156 \ + -e PORT=8156 \ + -e DATABASE_URL=sqlite+aiosqlite:///./data/bay.db \ + -e ACCESS_TOKEN=secret-token \ + -e MAX_SHIP_NUM=10 \ + -e BEHAVIOR_AFTER_MAX_SHIP=reject \ + -e CONTAINER_DRIVER=podman \ + -e DOCKER_IMAGE=docker.io/soulter/shipyard-ship:latest \ + -e DOCKER_NETWORK=bay_shipyard \ + -e SHIP_DATA_DIR="${HOST_SHIP_DATA_DIR}" \ + -v "${PROJECT_ROOT}/data/shipyard/bay_data:/app/data" \ + -v "${HOST_SHIP_DATA_DIR}:${HOST_SHIP_DATA_DIR}" \ + -v "$XDG_RUNTIME_DIR/podman/podman.sock:/var/run/podman/podman.sock" \ + --network bay_shipyard \ + localhost/soulter/shipyard-bay:latest + +echo "==> [Podman Container] 等待健康检查" +for i in {1..30}; do + if curl -fsS "http://127.0.0.1:8156/health" >/dev/null; then + echo "==> [Podman Container] Bay 已就绪" + break + fi + sleep 1 + if [[ "$i" -eq 30 ]]; then + echo "❌ [Podman Container] Bay 启动超时" + podman logs "${BAY_CONTAINER_NAME}" || true + exit 1 + fi +done + +echo "==> [Podman Container] 运行 E2E 测试" +python -m pytest tests/e2e/ -v diff --git a/pkgs/bay/tests/scripts/test_podman_host.sh b/pkgs/bay/tests/scripts/test_podman_host.sh new file mode 100755 index 0000000..f8dd6b9 --- /dev/null +++ b/pkgs/bay/tests/scripts/test_podman_host.sh @@ -0,0 +1,58 @@ +#!/usr/bin/env bash +set -euo pipefail + +SCRIPT_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +PROJECT_ROOT=$(cd "${SCRIPT_DIR}/../.." 
&& pwd)
+
+cd "${PROJECT_ROOT}"
+
+export HTTP_PROXY="http://192.168.44.2:10820"
+export HTTPS_PROXY="http://192.168.44.2:10820"
+export NO_PROXY="127.0.0.1,localhost"
+
+echo "==> [Podman Host] 检查 Podman 服务"
+podman info >/dev/null
+
+echo "==> [Podman Host] 构建 Ship 镜像 (docker.io/soulter/shipyard-ship:latest)"
+podman build -t docker.io/soulter/shipyard-ship:latest "${PROJECT_ROOT}/../ship"
+
+echo "==> [Podman Host] 创建网络 bay_shipyard (如不存在)"
+podman network exists bay_shipyard || podman network create bay_shipyard
+
+mkdir -p "${PROJECT_ROOT}/data"
+
+export ACCESS_TOKEN="secret-token"
+export DATABASE_URL="sqlite+aiosqlite:///./data/bay_test.db"
+export CONTAINER_DRIVER="podman-host"
+export DOCKER_IMAGE="docker.io/soulter/shipyard-ship:latest"
+export DOCKER_NETWORK="bay_shipyard"
+export SHIP_DATA_DIR="${PROJECT_ROOT}/data/shipyard/ship_mnt_data"
+
+BAY_PID=""
+cleanup() {
+    if [[ -n "${BAY_PID}" ]] && kill -0 "${BAY_PID}" >/dev/null 2>&1; then
+        echo "==> [Podman Host] 停止 Bay (PID=${BAY_PID})"
+        kill "${BAY_PID}" || true
+    fi
+}
+trap cleanup EXIT
+
+echo "==> [Podman Host] 启动 Bay (python run.py)"
+python run.py &
+BAY_PID=$!
+
+echo "==> [Podman Host] 等待健康检查"
+for i in {1..30}; do
+    if curl -fsS "http://127.0.0.1:8156/health" >/dev/null; then
+        echo "==> [Podman Host] Bay 已就绪"
+        break
+    fi
+    sleep 1
+    if [[ "$i" -eq 30 ]]; then
+        echo "❌ [Podman Host] Bay 启动超时"
+        exit 1
+    fi
+done
+
+echo "==> [Podman Host] 运行 E2E 测试"
+python -m pytest tests/e2e/ -v
diff --git a/pkgs/bay/tests/unit/__init__.py b/pkgs/bay/tests/unit/__init__.py
new file mode 100644
index 0000000..78a0261
--- /dev/null
+++ b/pkgs/bay/tests/unit/__init__.py
@@ -0,0 +1 @@
+# Bay 单元测试
diff --git a/pkgs/bay/tests/unit/test_utils.py b/pkgs/bay/tests/unit/test_utils.py
new file mode 100644
index 0000000..4b28cfb
--- /dev/null
+++ b/pkgs/bay/tests/unit/test_utils.py
@@ -0,0 +1,183 @@
+"""
+单元测试:工具函数测试
+
+测试 memory 和 disk 相关的工具函数,不需要 Bay 服务运行。
+"""
+
+import pytest
+
+# 将本模块所有测试标记为 unit,使 README 和 conftest.py 中描述的 pytest -m unit 过滤生效
+pytestmark = pytest.mark.unit
+
+
+class TestMemoryUtils:
+    """内存工具函数单元测试"""
+
+    def test_parse_memory_string(self):
+        """测试 parse_memory_string 函数"""
+        from app.drivers.core.utils import parse_memory_string
+
+        # K8s 风格单位
+        assert parse_memory_string("512Mi") == 536870912
+        assert parse_memory_string("1Gi") == 1073741824
+
+        # Docker 风格单位
+        assert parse_memory_string("512m") == 536870912
+        assert parse_memory_string("1g") == 1073741824
+
+    def test_parse_and_enforce_minimum_memory(self):
+        """测试 parse_and_enforce_minimum_memory 函数"""
+        from app.drivers.core.utils import parse_and_enforce_minimum_memory
+
+        # 64Mi < 128Mi,应该被修正
+        assert parse_and_enforce_minimum_memory("64Mi") == 134217728
+        # 512Mi > 128Mi,应该保持不变
+        assert parse_and_enforce_minimum_memory("512Mi") == 536870912
+
+    def test_normalize_memory_for_k8s_docker_style(self):
+        """测试 Docker 风格单位转换为 K8s 风格"""
+        from app.drivers.kubernetes.utils import normalize_memory_for_k8s
+
+        # KB/kb -> Ki (需要足够大的值,131072KB = 128Mi)
+        assert normalize_memory_for_k8s("256000KB") == "256000Ki"
+        assert normalize_memory_for_k8s("256000kb") == "256000Ki"
+
+        # MB/mb -> Mi (256MB > 128Mi)
+        assert normalize_memory_for_k8s("256MB") == "256Mi"
+        assert normalize_memory_for_k8s("256mb") == "256Mi"
+
+        # GB/gb -> Gi
+        assert normalize_memory_for_k8s("2GB") == "2Gi"
+        assert normalize_memory_for_k8s("2gb") == "2Gi"
+
+        # 简写 k/m/g -> Ki/Mi/Gi
+        assert normalize_memory_for_k8s("256000K") == "256000Ki"
+        assert normalize_memory_for_k8s("256000k") == "256000Ki"
+        assert normalize_memory_for_k8s("256M") == "256Mi"
+        assert normalize_memory_for_k8s("256m") == 
"256Mi" + assert normalize_memory_for_k8s("2G") == "2Gi" + assert normalize_memory_for_k8s("2g") == "2Gi" + + def test_normalize_memory_for_k8s_already_k8s_style(self): + """测试已经是 K8s 单位时保持原样""" + from app.drivers.kubernetes.utils import normalize_memory_for_k8s + + # 256Mi > 128Mi,不会触发最小值限制 + assert normalize_memory_for_k8s("256Mi") == "256Mi" + assert normalize_memory_for_k8s("256MI") == "256MI" + assert normalize_memory_for_k8s("1Gi") == "1Gi" + assert normalize_memory_for_k8s("1GI") == "1GI" + + def test_normalize_memory_for_k8s_raw_bytes(self): + """测试纯字节数字原样透传""" + from app.drivers.kubernetes.utils import normalize_memory_for_k8s + + # >= 128Mi = 134217728 字节 + assert normalize_memory_for_k8s("134217728") == "134217728" + assert normalize_memory_for_k8s("536870912") == "536870912" + + def test_normalize_memory_for_k8s_minimum_enforcement(self): + """测试小的值应被提升到最小内存""" + from app.drivers.kubernetes.utils import normalize_memory_for_k8s + + # 64Mi < 128Mi,应该被提升 + assert normalize_memory_for_k8s("64Mi") == "134217728" + # 64m(Docker 的 MB 单位)在转换后也应被提升到最小值 + assert normalize_memory_for_k8s("64m") == "134217728" + # 1k(1024 字节)也应被提升 + assert normalize_memory_for_k8s("1k") == "134217728" + # 小的 Ki 值也应被提升 + assert normalize_memory_for_k8s("512Ki") == "134217728" + + def test_normalize_memory_for_k8s_empty_string(self): + """测试空字符串处理""" + from app.drivers.kubernetes.utils import normalize_memory_for_k8s + + assert normalize_memory_for_k8s("") == "" + + +class TestDiskUtils: + """磁盘工具函数单元测试""" + + def test_default_disk_config(self): + """测试默认磁盘配置""" + from app.config import settings + + assert settings.default_ship_disk == "1Gi", f"Expected '1Gi', got '{settings.default_ship_disk}'" + + def test_parse_disk_string(self): + """测试 parse_disk_string 函数""" + from app.drivers.core.utils import parse_disk_string + + # K8s 风格单位 + assert parse_disk_string("1Gi") == 1073741824 + assert parse_disk_string("512Mi") == 536870912 + + # Docker 风格单位 + assert parse_disk_string("1g") == 1073741824 + assert parse_disk_string("512m") == 536870912 + assert parse_disk_string("10G") == 10737418240 + + # 纯字节 + assert parse_disk_string("1073741824") == 1073741824 + + def test_parse_and_enforce_minimum_disk(self): + """测试 parse_and_enforce_minimum_disk 函数""" + from app.drivers.core.utils import ( + parse_and_enforce_minimum_disk, + MIN_DISK_BYTES, + ) + + # 50Mi < 100Mi,应该被修正到 100Mi + assert parse_and_enforce_minimum_disk("50Mi") == MIN_DISK_BYTES + # 1Gi > 100Mi,应该保持不变 + assert parse_and_enforce_minimum_disk("1Gi") == 1073741824 + + def test_normalize_disk_for_k8s_docker_style(self): + """测试 Docker 风格单位转换为 K8s 风格""" + from app.drivers.kubernetes.utils import normalize_disk_for_k8s + + assert normalize_disk_for_k8s("1g") == "1Gi" + assert normalize_disk_for_k8s("10G") == "10Gi" + assert normalize_disk_for_k8s("512m") == "512Mi" + assert normalize_disk_for_k8s("1024M") == "1024Mi" + + def test_normalize_disk_for_k8s_already_k8s_style(self): + """测试已经是 K8s 单位时保持原样""" + from app.drivers.kubernetes.utils import normalize_disk_for_k8s + + assert normalize_disk_for_k8s("1Gi") == "1Gi" + assert normalize_disk_for_k8s("512Mi") == "512Mi" + + def test_normalize_disk_for_k8s_raw_bytes(self): + """测试纯字节数字应原样透传""" + from app.drivers.kubernetes.utils import normalize_disk_for_k8s + + # >= 100Mi + assert normalize_disk_for_k8s("1073741824") == "1073741824" + + def test_normalize_disk_for_k8s_minimum_enforcement(self): + """测试小的值应被提升到最小值""" + from app.drivers.core.utils import MIN_DISK_BYTES + from app.drivers.kubernetes.utils import 
normalize_disk_for_k8s + + # 100Mi = 104857600 字节 + assert normalize_disk_for_k8s("50Mi") == str(MIN_DISK_BYTES) + assert normalize_disk_for_k8s("50m") == str(MIN_DISK_BYTES) + + def test_normalize_disk_for_k8s_empty_string(self): + """测试空字符串处理""" + from app.drivers.kubernetes.utils import normalize_disk_for_k8s + + assert normalize_disk_for_k8s("") == "" + + def test_ship_spec_disk_field(self): + """测试 ShipSpec 包含 disk 字段""" + from app.models import ShipSpec + + spec = ShipSpec(cpus=2.0, memory="512m", disk="5Gi") + assert spec.disk == "5Gi" + + # 测试 disk 字段可选 + spec_no_disk = ShipSpec(cpus=1.0, memory="256m") + assert spec_no_disk.disk is None diff --git a/plans/test-container-isolation.md b/plans/test-container-isolation.md new file mode 100644 index 0000000..339d8e7 --- /dev/null +++ b/plans/test-container-isolation.md @@ -0,0 +1,271 @@ +# 测试容器隔离改造方案 + +## 目标 + +将 `pkgs/bay/test_bay_api.py` 中的测试改造为:**每个需要容器的测试使用独立的容器**,测试结束后自动清理。 + +## 当前问题 + +当前代码在 `main()` 函数中创建一个共享的 Ship 容器,然后多个测试复用这个容器: +- `test_get_ship(ship_id)` +- `test_exec_shell(ship_id)` +- `test_exec_invalid_type(ship_id)` +- `test_extend_ttl(ship_id)` +- `test_get_logs(ship_id)` +- `test_upload_download(ship_id)` +- `test_delete_ship(ship_id)` + +这导致测试之间存在隐式依赖,一个测试的副作用可能影响后续测试。 + +## 改造方案 + +### 1. 添加辅助基础设施 + +创建一个上下文管理器 `fresh_ship()` 用于自动创建和清理容器: + +```python +from contextlib import contextmanager +from typing import Generator + +@contextmanager +def fresh_ship( + session_id: str | None = None, + ttl: int = 60, + cpus: float = 0.5, + memory: str = "256m", + disk: str | None = None, +) -> Generator[tuple[str, dict], None, None]: + """ + 创建独立的 Ship 容器用于单个测试,测试结束后自动清理。 + + Yields: + tuple[ship_id, headers]: Ship ID 和请求头 + """ + # 生成唯一的 session ID + test_session_id = session_id or f"test-{uuid.uuid4().hex[:12]}" + headers = { + "Authorization": f"Bearer {ACCESS_TOKEN}", + "X-SESSION-ID": test_session_id, + } + + ship_id = None + try: + # 创建 Ship + spec = {"cpus": cpus, "memory": memory} + if disk: + spec["disk"] = disk + payload = {"ttl": ttl, "max_session_num": 1, "spec": spec} + + resp = requests.post( + f"{BAY_URL}/ship", + headers={**headers, "Content-Type": "application/json"}, + json=payload, + timeout=120, + ) + if resp.status_code != 201: + raise RuntimeError(f"创建 Ship 失败: {resp.status_code} - {resp.text}") + + data = resp.json() + ship_id = data.get("id") + + # 等待容器就绪 + time.sleep(2) + + yield ship_id, headers + + finally: + # 清理容器 + if ship_id: + try: + requests.delete( + f"{BAY_URL}/ship/{ship_id}", + headers=headers, + timeout=30, + ) + except Exception: + pass # 清理失败不影响测试结果 +``` + +### 2. 
测试分类与改造 + +#### 第一类:不需要容器的测试(无需修改) + +| 测试函数 | 说明 | +|---------|------| +| `test_memory_utils()` | 本地单元测试 | +| `test_disk_utils()` | 本地单元测试 | +| `test_health()` | API 测试,无需容器 | +| `test_root()` | API 测试,无需容器 | +| `test_stat()` | API 测试,无需容器 | +| `test_auth_required()` | API 测试,无需容器 | +| `test_list_ships()` | API 测试,无需容器 | +| `test_create_ship_invalid_payload()` | API 测试,无需容器 | +| `test_get_ship_not_found()` | API 测试,无需容器 | + +#### 第二类:已经使用独立容器的测试(无需修改) + +| 测试函数 | 说明 | +|---------|------| +| `test_create_ship_with_small_memory()` | 已自包含 ✅ | +| `test_create_ship_with_disk()` | 已自包含 ✅ | +| `test_data_persistence()` | 已自包含 ✅ | + +#### 第三类:需要改造为独立容器的测试 + +| 测试函数 | 改造方式 | +|---------|---------| +| `test_create_ship()` | 改为 `test_create_and_get_ship()` - 创建、验证、删除 | +| `test_get_ship()` | 合并到 `test_create_and_get_ship()` | +| `test_exec_shell()` | 改为自包含,使用 `fresh_ship()` | +| `test_exec_invalid_type()` | 改为自包含,使用 `fresh_ship()` | +| `test_extend_ttl()` | 改为自包含,使用 `fresh_ship()` | +| `test_get_logs()` | 改为自包含,使用 `fresh_ship()` | +| `test_upload_download()` | 改为自包含,使用 `fresh_ship()` | +| `test_delete_ship()` | 合并到每个测试的清理逻辑中 | +| `test_delete_ship_not_found()` | 改为独立测试,创建后删除两次 | + +### 3. 改造后的测试函数示例 + +#### test_exec_shell 改造 + +```python +def test_exec_shell() -> bool: + print_section("测试: 执行 Shell 命令") + try: + with fresh_ship() as (ship_id, headers): + print(f"使用独立 Ship: {ship_id}") + payload = {"type": "shell/exec", "payload": {"command": "echo Bay"}} + resp = requests.post( + f"{BAY_URL}/ship/{ship_id}/exec", + headers={**headers, "Content-Type": "application/json"}, + json=payload, + timeout=30, + ) + return check_status(resp, 200, "Shell 命令执行成功", "Shell 命令执行失败") + except Exception as exc: + print(f"❌ 请求失败: {exc}") + return False +``` + +#### test_delete_ship_not_found 改造 + +```python +def test_delete_ship_not_found() -> bool: + print_section("测试: 删除不存在的 Ship") + try: + with fresh_ship() as (ship_id, headers): + # 先删除一次 + resp = requests.delete( + f"{BAY_URL}/ship/{ship_id}", + headers=headers, + timeout=30, + ) + if resp.status_code != 204: + print(f"❌ 第一次删除失败: {resp.status_code}") + return False + + # 再删除一次,应该返回 404 + resp = requests.delete( + f"{BAY_URL}/ship/{ship_id}", + headers=headers, + timeout=10, + ) + return check_status(resp, 404, "重复删除返回 404", "重复删除未返回 404") + except Exception as exc: + print(f"❌ 请求失败: {exc}") + return False +``` + +### 4. 
改造后的 main() 函数 + +```python +def main() -> None: + print("Bay API 功能测试(容器隔离版)") + print("=" * 70) + print(f"服务地址: {BAY_URL}") + print() + + # ===== 第一阶段:本地单元测试 ===== + print("\n" + "=" * 70) + print("阶段 1: 本地单元测试") + print("=" * 70) + + if not test_memory_utils(): + print("\n内存单元测试失败,退出测试") + sys.exit(1) + if not test_disk_utils(): + print("\n磁盘单元测试失败,退出测试") + sys.exit(1) + + # ===== 第二阶段:无容器 API 测试 ===== + print("\n" + "=" * 70) + print("阶段 2: 无容器 API 测试") + print("=" * 70) + + if not test_health(): + print("\n服务未运行,退出测试") + sys.exit(1) + test_root() + test_stat() + test_auth_required() + test_list_ships() + test_create_ship_invalid_payload() + test_get_ship_not_found() + + # ===== 第三阶段:独立容器测试 ===== + print("\n" + "=" * 70) + print("阶段 3: 独立容器测试(每个测试使用独立容器)") + print("=" * 70) + + test_create_ship_with_small_memory() + test_create_ship_with_disk() + test_create_and_get_ship() + test_exec_shell() + test_exec_invalid_type() + test_extend_ttl() + test_get_logs() + test_upload_download() + test_delete_ship_not_found() + + # ===== 第四阶段:特殊生命周期测试 ===== + print("\n" + "=" * 70) + print("阶段 4: 特殊生命周期测试") + print("=" * 70) + + test_data_persistence() + + print("\n" + "=" * 70) + print("测试完成!") + print("=" * 70) +``` + +## 实施计划 + +- [ ] 添加 `fresh_ship()` 上下文管理器 +- [ ] 创建 `test_create_and_get_ship()` 合并创建和获取测试 +- [ ] 改造 `test_exec_shell()` 使用独立容器 +- [ ] 改造 `test_exec_invalid_type()` 使用独立容器 +- [ ] 改造 `test_extend_ttl()` 使用独立容器 +- [ ] 改造 `test_get_logs()` 使用独立容器 +- [ ] 改造 `test_upload_download()` 使用独立容器 +- [ ] 改造 `test_delete_ship_not_found()` 使用独立容器 +- [ ] 更新 `main()` 函数移除共享容器逻辑 +- [ ] 删除不再需要的 `test_create_ship()`, `test_get_ship()`, `test_delete_ship()` 函数 + +## 优势 + +1. **完全隔离** - 每个测试使用独立容器,不会相互影响 +2. **更健壮** - 单个测试失败不会导致后续测试全部失败 +3. **可并行** - 理论上可以并行运行所有独立容器测试 +4. **易于调试** - 每个测试的容器状态是确定的 + +## 潜在问题 + +1. **运行时间增加** - 每个测试都需要创建和销毁容器,总时间会增加 +2. **资源消耗** - 同时运行多个测试时可能消耗更多资源 + +## 缓解措施 + +- 可以添加 `--share-container` 参数用于快速开发测试场景 +- 容器创建可以使用较小的资源配置(0.5 CPU, 256m 内存) +- TTL 设置较短(60秒)以便快速回收 From 5fb9adb9891070ba582c6ec8a779925969586de2 Mon Sep 17 00:00:00 2001 From: RC-CHN <1051989940@qq.com> Date: Thu, 22 Jan 2026 15:41:07 +0800 Subject: [PATCH 2/7] fix(ship): refresh ship after session increment Add logging for ship exec responses and extend e2e tests to cover filesystem, ipython execution, and multi-session reuse. 
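
For reviewers, a minimal sketch of why the session-increment fix re-binds the return value; `db_service` and the field names follow the service.py hunk below, while the wrapper function itself is illustrative only:

```python
# Illustrative sketch, not part of the patch. It assumes, as service.py below
# relies on, that increment_ship_session_count returns the refreshed Ship row.
async def attach_session(db_service, available_ship, session_ship):
    await db_service.create_session_ship(session_ship)
    # Previously the return value was discarded, so `available_ship` kept a
    # stale current_session_num and the later TTL recalculation and API
    # response were built from outdated in-memory state.
    available_ship = await db_service.increment_ship_session_count(available_ship.id)
    return available_ship
```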
--- pkgs/bay/app/services/ship/http_client.py | 1 + pkgs/bay/app/services/ship/service.py | 4 +- pkgs/bay/tests/e2e/test_bay_api.py | 187 ++++++++++++++++++++++ 3 files changed, 190 insertions(+), 2 deletions(-) diff --git a/pkgs/bay/app/services/ship/http_client.py b/pkgs/bay/app/services/ship/http_client.py index b6252e4..443a48a 100644 --- a/pkgs/bay/app/services/ship/http_client.py +++ b/pkgs/bay/app/services/ship/http_client.py @@ -84,6 +84,7 @@ async def forward_request_to_ship( ) as response: if response.status == 200: data = await response.json() + logger.info(f"Ship exec response for {request.type}: {data}") return ExecResponse(success=True, data=data) else: error_text = await response.text() diff --git a/pkgs/bay/app/services/ship/service.py b/pkgs/bay/app/services/ship/service.py index 14cc74f..ab69407 100644 --- a/pkgs/bay/app/services/ship/service.py +++ b/pkgs/bay/app/services/ship/service.py @@ -114,7 +114,7 @@ async def create_ship(self, request: CreateShipRequest, session_id: str) -> Ship initial_ttl=request.ttl, ) await db_service.create_session_ship(session_ship) - await db_service.increment_ship_session_count(available_ship.id) + available_ship = await db_service.increment_ship_session_count(available_ship.id) # Recalculate ship's TTL based on all sessions' expiration times await self._recalculate_and_schedule_cleanup(available_ship.id) @@ -177,7 +177,7 @@ async def create_ship(self, request: CreateShipRequest, session_id: str) -> Ship initial_ttl=request.ttl, ) await db_service.create_session_ship(session_ship) - await db_service.increment_ship_session_count(ship.id) + ship = await db_service.increment_ship_session_count(ship.id) # Schedule TTL cleanup await self._schedule_cleanup(ship.id, ship.ttl) diff --git a/pkgs/bay/tests/e2e/test_bay_api.py b/pkgs/bay/tests/e2e/test_bay_api.py index f91d84c..63bdb76 100644 --- a/pkgs/bay/tests/e2e/test_bay_api.py +++ b/pkgs/bay/tests/e2e/test_bay_api.py @@ -295,6 +295,127 @@ def test_upload_download(self, bay_url): assert download_resp.status_code == 200, f"下载失败: {download_resp.status_code}" assert download_resp.content == content, "下载内容不匹配" + def test_filesystem_operations(self, bay_url): + """文件系统操作测试(创建、读取、写入、列表、删除)""" + with fresh_ship() as (ship_id, headers): + headers_with_content_type = {**headers, "Content-Type": "application/json"} + + # 1. 创建文件 + create_file_data = { + "type": "fs/create_file", + "payload": {"path": "test_file.txt", "content": "Hello, World!"}, + } + resp = requests.post( + f"{bay_url}/ship/{ship_id}/exec", + headers=headers_with_content_type, + json=create_file_data, + timeout=30, + ) + assert resp.status_code == 200, f"创建文件失败: {resp.text}" + result = resp.json() + assert result.get("success") is True, f"创建文件操作失败: {result}" + + # 2. 读取文件 + read_file_data = { + "type": "fs/read_file", + "payload": {"path": "test_file.txt"}, + } + resp = requests.post( + f"{bay_url}/ship/{ship_id}/exec", + headers=headers_with_content_type, + json=read_file_data, + timeout=30, + ) + assert resp.status_code == 200, f"读取文件失败: {resp.text}" + result = resp.json() + assert result.get("success") is True, f"读取文件操作失败: {result}" + assert result["data"]["content"] == "Hello, World!", "文件内容不匹配" + + # 3. 
写入文件 + write_file_data = { + "type": "fs/write_file", + "payload": {"path": "test_file.txt", "content": "Updated content!"}, + } + resp = requests.post( + f"{bay_url}/ship/{ship_id}/exec", + headers=headers_with_content_type, + json=write_file_data, + timeout=30, + ) + assert resp.status_code == 200, f"写入文件失败: {resp.text}" + result = resp.json() + assert result.get("success") is True, f"写入文件操作失败: {result}" + + # 4. 列表目录 + list_dir_data = { + "type": "fs/list_dir", + "payload": {"path": "./"}, + } + resp = requests.post( + f"{bay_url}/ship/{ship_id}/exec", + headers=headers_with_content_type, + json=list_dir_data, + timeout=30, + ) + assert resp.status_code == 200, f"列表目录失败: {resp.text}" + result = resp.json() + assert result.get("success") is True, f"列表目录操作失败: {result}" + + # 5. 删除文件 + delete_file_data = { + "type": "fs/delete_file", + "payload": {"path": "test_file.txt"}, + } + resp = requests.post( + f"{bay_url}/ship/{ship_id}/exec", + headers=headers_with_content_type, + json=delete_file_data, + timeout=30, + ) + assert resp.status_code == 200, f"删除文件失败: {resp.text}" + result = resp.json() + assert result.get("success") is True, f"删除文件操作失败: {result}" + + def test_ipython_operations(self, bay_url): + """IPython 操作测试""" + with fresh_ship() as (ship_id, headers): + headers_with_content_type = {**headers, "Content-Type": "application/json"} + + # 1. 执行简单 Python 代码 + ipython_data = { + "type": "ipython/exec", + "payload": {"code": "x = 5 + 3\nprint(f'Result: {x}')", "timeout": 10}, + } + resp = requests.post( + f"{bay_url}/ship/{ship_id}/exec", + headers=headers_with_content_type, + json=ipython_data, + timeout=30, + ) + assert resp.status_code == 200, f"IPython 执行失败: {resp.text}" + result = resp.json() + assert result.get("success") is True, f"IPython 操作失败: {result}" + assert "Result: 8" in result["data"]["output"]["text"], f"IPython 输出不匹配: {result}" + + # 2. 
执行带 import 的代码 + import_data = { + "type": "ipython/exec", + "payload": { + "code": "import math\nresult = math.sqrt(16)\nprint(f'Square root of 16 is {result}')", + "timeout": 10, + }, + } + resp = requests.post( + f"{bay_url}/ship/{ship_id}/exec", + headers=headers_with_content_type, + json=import_data, + timeout=30, + ) + assert resp.status_code == 200, f"IPython import 执行失败: {resp.text}" + result = resp.json() + assert result.get("success") is True, f"IPython import 操作失败: {result}" + assert "Square root of 16 is 4.0" in result["data"]["output"]["text"], f"IPython import 输出不匹配: {result}" + @pytest.mark.e2e class TestShipDeletion: @@ -341,6 +462,72 @@ def test_delete_ship_not_found(self, bay_url): assert resp.status_code == 404, f"重复删除未返回 404: {resp.status_code}" +@pytest.mark.e2e +class TestMultipleSessions: + """阶段 5.5: 多会话复用测试""" + + def test_multiple_sessions(self, bay_url): + """测试 Ship 多会话复用功能 + + 创建一个 max_session_num=2 的 Ship,然后用不同的 session ID 访问, + 验证是否可以复用同一个 Ship。 + """ + # 创建第一个 session + session_id_1 = f"multi-session-test-{uuid.uuid4().hex[:8]}" + headers_1 = get_auth_headers(session_id_1) + + # 创建 Ship with max_session_num = 2 + payload = { + "ttl": 600, + "max_session_num": 2, + "spec": {"cpus": 0.5, "memory": "256m"}, + } + + resp = requests.post( + f"{bay_url}/ship", + headers={**headers_1, "Content-Type": "application/json"}, + json=payload, + timeout=120, + ) + assert resp.status_code == 201, f"创建 Ship 失败: {resp.status_code}" + + ship_data = resp.json() + ship_id = ship_data.get("id") + assert ship_data.get("current_session_num") == 1, "初始会话数应为 1" + + try: + # 等待 Ship 就绪 + time.sleep(3) + + # 用第二个 session ID 请求 + session_id_2 = f"multi-session-test-{uuid.uuid4().hex[:8]}" + headers_2 = get_auth_headers(session_id_2) + + resp = requests.post( + f"{bay_url}/ship", + headers={**headers_2, "Content-Type": "application/json"}, + json=payload, + timeout=120, + ) + # 应该成功创建或复用 + assert resp.status_code == 201, f"第二个会话请求失败: {resp.status_code}" + + reused_ship = resp.json() + # 记录是否复用了同一个 Ship + if reused_ship.get("id") == ship_id: + # 复用了同一个 Ship + assert reused_ship.get("current_session_num") == 2, "复用后会话数应为 2" + # 如果是新 Ship 也是可以接受的行为 + + finally: + # 清理 + requests.delete( + f"{bay_url}/ship/{ship_id}", + headers=headers_1, + timeout=30, + ) + + @pytest.mark.e2e class TestDataPersistence: """阶段 6: 数据持久化测试 From f086ed7f686cb94a5426dc7d3381f5c324260545 Mon Sep 17 00:00:00 2001 From: RC-CHN <1051989940@qq.com> Date: Thu, 22 Jan 2026 16:06:52 +0800 Subject: [PATCH 3/7] ci: add github actions workflow Add CI pipeline configuration to run unit and E2E tests for bay and ship packages. Include additional improvements for bay: - Sanitize ship execution logs to prevent data exposure by creating a response summary and moving full logs to debug level. - Add unit tests for invalid memory string parsing scenarios. 
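
For reference, a standalone sketch of the summarization rule the http_client.py hunk below applies: a small whitelist of scalar fields is kept, and everything else is reduced to a length, size, or type name. The helper name here is hypothetical and only mirrors the inline logic:

```python
# Standalone illustration of the log-sanitizing rule; not an exported API.
from typing import Any

_SAFE_KEYS = {"status", "exit_code", "execution_count", "success", "error"}

def summarize_exec_response(data: Any) -> dict:
    """Keep whitelisted scalar fields, reduce the rest to sizes/types."""
    if not isinstance(data, dict):
        return {"type": type(data).__name__}
    summary = {k: v for k, v in data.items() if k in _SAFE_KEYS}
    for k, v in data.items():
        if k in summary:
            continue
        if isinstance(v, str):
            summary[f"{k}_len"] = len(v)
        elif isinstance(v, (list, dict)):
            summary[f"{k}_size"] = len(v)
        else:
            summary[f"{k}_type"] = type(v).__name__
    return summary
```

With this rule, a response such as `{"success": True, "output": "<4096 chars>"}` is logged at INFO as `{"success": True, "output_len": 4096}`, while the full payload only appears at DEBUG.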
--- .github/workflows/ci.yml | 102 ++++++++++++++++++++++ pkgs/bay/app/services/ship/http_client.py | 26 +++++- pkgs/bay/tests/unit/test_utils.py | 19 ++++ 3 files changed, 146 insertions(+), 1 deletion(-) create mode 100644 .github/workflows/ci.yml diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 0000000..ba8f9a3 --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,102 @@ +name: CI + +on: + push: + branches: + - main + - master + paths: + - 'pkgs/bay/**' + - 'pkgs/ship/**' + - '.github/workflows/ci.yml' + pull_request: + branches: + - main + - master + paths: + - 'pkgs/bay/**' + - 'pkgs/ship/**' + - '.github/workflows/ci.yml' + +jobs: + test-bay: + runs-on: ubuntu-latest + defaults: + run: + working-directory: pkgs/bay + + steps: + - name: Checkout code + uses: actions/checkout@v4 + + - name: Install uv + uses: astral-sh/setup-uv@v5 + + - name: Set up Python + uses: actions/setup-python@v5 + with: + python-version-file: "pkgs/bay/pyproject.toml" + + - name: Install dependencies + run: uv sync --all-extras --dev + + - name: Run tests + run: uv run pytest tests/unit + + e2e-bay: + runs-on: ubuntu-latest + defaults: + run: + working-directory: pkgs/bay + + steps: + - name: Checkout code + uses: actions/checkout@v4 + + - name: Install uv + uses: astral-sh/setup-uv@v5 + + - name: Set up Python + uses: actions/setup-python@v5 + with: + python-version-file: "pkgs/bay/pyproject.toml" + + - name: Install dependencies + run: uv sync --all-extras --dev + + - name: Run E2E tests (Docker Container) + run: | + chmod +x tests/scripts/test_docker_container.sh + ./tests/scripts/test_docker_container.sh + + test-ship: + runs-on: ubuntu-latest + defaults: + run: + working-directory: pkgs/ship + + steps: + - name: Checkout code + uses: actions/checkout@v4 + + - name: Install uv + uses: astral-sh/setup-uv@v5 + + - name: Set up Python + uses: actions/setup-python@v5 + with: + python-version-file: "pkgs/ship/pyproject.toml" + + - name: Install dependencies + run: uv sync --all-extras --dev + + # Assuming ship has tests, if not this step might fail or do nothing. + # Based on file list, ship has tests/integration but maybe not unit tests yet. + # We'll try to run pytest if tests directory exists. 
+ - name: Run tests + run: | + if [ -d "tests" ]; then + uv run pytest tests + else + echo "No tests directory found for ship" + fi diff --git a/pkgs/bay/app/services/ship/http_client.py b/pkgs/bay/app/services/ship/http_client.py index 443a48a..4a43182 100644 --- a/pkgs/bay/app/services/ship/http_client.py +++ b/pkgs/bay/app/services/ship/http_client.py @@ -84,7 +84,31 @@ async def forward_request_to_ship( ) as response: if response.status == 200: data = await response.json() - logger.info(f"Ship exec response for {request.type}: {data}") + + # Log full response at DEBUG level + logger.debug(f"Full ship exec response for {request.type}: {data}") + + # Create summary for INFO level to avoid noise and data exposure + summary = {} + if isinstance(data, dict): + # Whitelist specific safe fields + for k in ["status", "exit_code", "execution_count", "success", "error"]: + if k in data: + summary[k] = data[k] + + # Summarize other fields + for k, v in data.items(): + if k not in summary: + if isinstance(v, str): + summary[f"{k}_len"] = len(v) + elif isinstance(v, (list, dict)): + summary[f"{k}_size"] = len(v) + else: + summary[f"{k}_type"] = type(v).__name__ + else: + summary = {"type": type(data).__name__} + + logger.info(f"Ship exec response for {request.type}: {summary}") return ExecResponse(success=True, data=data) else: error_text = await response.text() diff --git a/pkgs/bay/tests/unit/test_utils.py b/pkgs/bay/tests/unit/test_utils.py index 4b28cfb..f2d30ec 100644 --- a/pkgs/bay/tests/unit/test_utils.py +++ b/pkgs/bay/tests/unit/test_utils.py @@ -22,6 +22,25 @@ def test_parse_memory_string(self): assert parse_memory_string("512m") == 536870912 assert parse_memory_string("1g") == 1073741824 + def test_parse_memory_string_invalid(self): + """测试 parse_memory_string 函数处理无效输入""" + from app.drivers.core.utils import parse_memory_string + + # 无效格式 + with pytest.raises(ValueError): + parse_memory_string("abc") + + with pytest.raises(ValueError): + parse_memory_string("10XZ") + + # 负数 + with pytest.raises(ValueError): + parse_memory_string("-512Mi") + + # None + with pytest.raises(AttributeError): + parse_memory_string(None) + def test_parse_and_enforce_minimum_memory(self): """测试 parse_and_enforce_minimum_memory 函数""" from app.drivers.core.utils import parse_and_enforce_minimum_memory From 3a4cb45e9697282ef5e33a6118abc60768622709 Mon Sep 17 00:00:00 2001 From: RC-CHN <1051989940@qq.com> Date: Thu, 22 Jan 2026 16:10:25 +0800 Subject: [PATCH 4/7] ci: set python version to 3.13 --- .github/workflows/ci.yml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index ba8f9a3..69d8672 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -35,7 +35,7 @@ jobs: - name: Set up Python uses: actions/setup-python@v5 with: - python-version-file: "pkgs/bay/pyproject.toml" + python-version: "3.13" - name: Install dependencies run: uv sync --all-extras --dev @@ -59,7 +59,7 @@ jobs: - name: Set up Python uses: actions/setup-python@v5 with: - python-version-file: "pkgs/bay/pyproject.toml" + python-version: "3.13" - name: Install dependencies run: uv sync --all-extras --dev @@ -85,7 +85,7 @@ jobs: - name: Set up Python uses: actions/setup-python@v5 with: - python-version-file: "pkgs/ship/pyproject.toml" + python-version: "3.13" - name: Install dependencies run: uv sync --all-extras --dev From 63728be08723d2f743c4327ead568778000f748a Mon Sep 17 00:00:00 2001 From: RC-CHN <1051989940@qq.com> Date: Thu, 22 Jan 2026 16:21:47 +0800 
Subject: [PATCH 5/7] ci: run pytest via uv and set PYTHONPATH --- .github/workflows/ci.yml | 4 +++- pkgs/bay/tests/scripts/test_docker_container.sh | 2 +- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 69d8672..288de41 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -41,7 +41,9 @@ jobs: run: uv sync --all-extras --dev - name: Run tests - run: uv run pytest tests/unit + env: + PYTHONPATH: . + run: uv run python -m pytest tests/unit e2e-bay: runs-on: ubuntu-latest diff --git a/pkgs/bay/tests/scripts/test_docker_container.sh b/pkgs/bay/tests/scripts/test_docker_container.sh index 67d3632..c092251 100755 --- a/pkgs/bay/tests/scripts/test_docker_container.sh +++ b/pkgs/bay/tests/scripts/test_docker_container.sh @@ -40,7 +40,7 @@ for i in {1..30}; do done echo "==> [Docker Container] 运行 E2E 测试" -python -m pytest tests/e2e/ -v +uv run python -m pytest tests/e2e/ -v echo "==> [Docker Container] 关闭 docker-compose" docker compose -f docker-compose.yml down From a18271b83720e05d0ea5b7d886d6bfe943157ca9 Mon Sep 17 00:00:00 2001 From: RC-CHN <1051989940@qq.com> Date: Fri, 23 Jan 2026 09:02:45 +0800 Subject: [PATCH 6/7] feat(ship): add background process management and improve shell execution - Add background process registry to track processes per session - Implement process listing endpoint GET /processes - Fix environment variable passing in sudo commands - Set kernel working directory via os.chdir in IPython init - Add Python version file (.python-version) - Fix integration test NO_PROXY handling and startup error flow --- pkgs/ship/.python-version | 1 + pkgs/ship/app/components/ipython.py | 4 +- pkgs/ship/app/components/shell.py | 152 ++++------------- pkgs/ship/app/components/user_manager.py | 153 ++++++++++++++++-- .../tests/integration/test_integration.py | 17 +- 5 files changed, 191 insertions(+), 136 deletions(-) create mode 100644 pkgs/ship/.python-version diff --git a/pkgs/ship/.python-version b/pkgs/ship/.python-version new file mode 100644 index 0000000..24ee5b1 --- /dev/null +++ b/pkgs/ship/.python-version @@ -0,0 +1 @@ +3.13 diff --git a/pkgs/ship/app/components/ipython.py b/pkgs/ship/app/components/ipython.py index 05eb52a..445673b 100644 --- a/pkgs/ship/app/components/ipython.py +++ b/pkgs/ship/app/components/ipython.py @@ -60,11 +60,13 @@ async def _set_kernel_working_directory(km: AsyncKernelManager, working_dir: str kc = km.client() try: # 执行初始化代码,包括中文字体配置 - init_code = """ + init_code = f""" import matplotlib.pyplot as plt import matplotlib.font_manager as fm import shutil, os +os.chdir({working_dir!r}) + cache_dir = os.path.expanduser("~/.cache/matplotlib") if os.path.exists(cache_dir): shutil.rmtree(cache_dir) diff --git a/pkgs/ship/app/components/shell.py b/pkgs/ship/app/components/shell.py index 7c4351c..7f9b0aa 100644 --- a/pkgs/ship/app/components/shell.py +++ b/pkgs/ship/app/components/shell.py @@ -1,11 +1,6 @@ -import asyncio -import os -import signal -from typing import Dict, Optional, Any +from typing import Dict, Optional, List from fastapi import APIRouter, HTTPException, Header from pydantic import BaseModel -from pathlib import Path -from ..workspace import get_session_workspace from .user_manager import run_as_user router = APIRouter() @@ -26,133 +21,19 @@ class ExecuteShellResponse(BaseModel): stdout: str = "" stderr: str = "" pid: Optional[int] = None - # process_id: Optional[str] = None # 用于后台进程 + process_id: Optional[str] = None # 用于后台进程 error: Optional[str] = None 
class ProcessInfo(BaseModel): - # process_id: str + process_id: str pid: int command: str status: str -def generate_process_id() -> str: - """生成进程ID""" - import uuid - - return str(uuid.uuid4())[:8] - - -async def run_command( - session_id: str, - command: str, - cwd: Optional[str] = None, - env: Optional[Dict[str, str]] = None, - timeout: Optional[int] = None, - shell: bool = True, - background: bool = False, -) -> Dict[str, Any]: - """执行shell命令""" - - # 准备环境变量 - process_env = os.environ.copy() - if env: - process_env.update(env) - - # 准备工作目录。如果未指定,使用 session 工作目录 - if cwd is None: - working_dir = await get_session_workspace(session_id) - else: - # 相对路径相对于 session 工作目录解析 - if not os.path.isabs(cwd): - working_dir = await get_session_workspace(session_id) / cwd - else: - working_dir = Path(cwd) - - if not working_dir.exists(): - raise ValueError(f"Working directory does not exist: {working_dir}") - - try: - if shell: - # 使用shell模式 - process = await asyncio.create_subprocess_shell( - command, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE, - cwd=str(working_dir), - env=process_env, - ) - else: - # 分割命令参数 - args = command.split() - process = await asyncio.create_subprocess_exec( - *args, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE, - cwd=str(working_dir), - env=process_env, - ) - - if background: - # 后台进程 - process_id = generate_process_id() - - return { - "success": True, - "pid": process.pid, - "process_id": process_id, - "return_code": None, - "stdout": "", - "stderr": "", - "error": None, - } - else: - # 前台进程,等待完成 - try: - stdout, stderr = await asyncio.wait_for( - process.communicate(), timeout=timeout - ) - - return { - "success": process.returncode == 0, - "return_code": process.returncode, - "stdout": stdout.decode("utf-8", errors="replace"), - "stderr": stderr.decode("utf-8", errors="replace"), - "pid": process.pid, - "process_id": None, - "error": None, - } - - except asyncio.TimeoutError: - # 超时,终止进程 - try: - process.terminate() - await asyncio.wait_for(process.wait(), timeout=5) - except asyncio.TimeoutError: - process.kill() - await process.wait() - - return { - "success": False, - "return_code": -signal.SIGTERM, - "stdout": "", - "stderr": "", - "pid": process.pid, - "process_id": None, - "error": f"Command timed out after {timeout} seconds", - } - - except Exception as e: - return { - "success": False, - "return_code": None, - "stdout": "", - "stderr": "", - "pid": None, - "process_id": None, - "error": str(e), - } +class ProcessListResponse(BaseModel): + processes: List[ProcessInfo] @router.post("/exec", response_model=ExecuteShellResponse) @@ -162,7 +43,7 @@ async def execute_shell_command( """执行Shell命令""" try: result = await run_as_user( - username=x_session_id, + session_id=x_session_id, command=request.command, cwd=request.cwd, env=request.env, @@ -177,3 +58,24 @@ async def execute_shell_command( raise HTTPException( status_code=500, detail=f"Failed to execute command: {str(e)}" ) + + +@router.get("/processes", response_model=ProcessListResponse) +async def list_background_processes( + x_session_id: str = Header(..., alias="X-SESSION-ID") +): + """获取当前会话的后台进程列表""" + from .user_manager import get_session_background_processes + + processes = get_session_background_processes(x_session_id) + return ProcessListResponse( + processes=[ + ProcessInfo( + process_id=p["process_id"], + pid=p["pid"], + command=p["command"], + status=p["status"], + ) + for p in processes + ] + ) diff --git a/pkgs/ship/app/components/user_manager.py 
b/pkgs/ship/app/components/user_manager.py index c6efca8..121bbf2 100644 --- a/pkgs/ship/app/components/user_manager.py +++ b/pkgs/ship/app/components/user_manager.py @@ -13,9 +13,10 @@ import shutil import shlex import json +import uuid from dataclasses import dataclass from pathlib import Path -from typing import Dict, Optional +from typing import Dict, Optional, List from fastapi import HTTPException logger = logging.getLogger(__name__) @@ -23,6 +24,9 @@ # 存储session到用户名的映射 session_users: Dict[str, str] = {} +# 后台进程注册表:session_id -> {process_id -> BackgroundProcessEntry} +_background_processes: Dict[str, Dict[str, "BackgroundProcessEntry"]] = {} + # 用户ID范围(从10000开始,避免与系统用户冲突) USER_ID_START = 10000 USER_ID_COUNTER = USER_ID_START @@ -44,9 +48,83 @@ class ProcessResult: stderr: str return_code: Optional[int] = None pid: Optional[int] = None + process_id: Optional[str] = None error: Optional[str] = None +class BackgroundProcessEntry: + """后台进程条目""" + + def __init__( + self, + process_id: str, + pid: int, + command: str, + process: asyncio.subprocess.Process, + ): + self.process_id = process_id + self.pid = pid + self.command = command + self.process = process + + @property + def status(self) -> str: + """获取进程状态""" + if self.process.returncode is None: + return "running" + elif self.process.returncode == 0: + return "completed" + else: + return "failed" + + +def generate_process_id() -> str: + """生成进程ID""" + return str(uuid.uuid4())[:8] + + +def register_background_process( + session_id: str, + process_id: str, + pid: int, + command: str, + process: asyncio.subprocess.Process, +) -> None: + """注册后台进程""" + if session_id not in _background_processes: + _background_processes[session_id] = {} + _background_processes[session_id][process_id] = BackgroundProcessEntry( + process_id=process_id, + pid=pid, + command=command, + process=process, + ) + logger.info( + "Registered background process: session=%s process_id=%s pid=%s", + session_id, + process_id, + pid, + ) + + +def get_session_background_processes(session_id: str) -> List[Dict]: + """获取指定 session 的所有后台进程""" + if session_id not in _background_processes: + return [] + + processes = [] + for entry in _background_processes[session_id].values(): + processes.append( + { + "process_id": entry.process_id, + "pid": entry.pid, + "command": entry.command, + "status": entry.status, + } + ) + return processes + + def save_session_users(): """保存 session 到用户的映射关系到磁盘""" try: @@ -399,7 +477,7 @@ async def get_or_create_session_user(session_id: str) -> str: async def run_as_user( - username: str, + session_id: str, command: str, cwd: Optional[str] = None, env: Optional[Dict[str, str]] = None, @@ -409,7 +487,7 @@ async def run_as_user( ) -> ProcessResult: """以指定用户身份运行命令""" try: - username = await get_or_create_session_user(username) + username = await get_or_create_session_user(session_id) user_info = await UserManager.get_user_info(username) user_home = user_info["home_dir"] @@ -443,27 +521,56 @@ async def run_as_user( # 使用 sudo 切换用户执行命令 # sudo_command = f"sudo -u {username} -H bash -c 'cd {working_dir} && {command}'" + env_args = [] + if env: + for key, value in env.items(): + env_args.append(f"{key}={value}") + if shell: - process = await asyncio.create_subprocess_exec( + sudo_args = [ "sudo", "-u", username, "-H", - "bash", - "-lc", - f"cd {shlex.quote(str(working_dir))} && {command}", + ] + if env_args: + sudo_args.extend(["env", *env_args]) + sudo_args.extend( + [ + "bash", + "-lc", + f"cd {shlex.quote(str(working_dir))} && {command}", + ] + ) + 
logger.debug( + "Shell exec args: %s env_keys=%s", + sudo_args, + list(env.keys()) if env else [], + ) + process = await asyncio.create_subprocess_exec( + *sudo_args, env=process_env, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) else: args = shlex.split(command) - process = await asyncio.create_subprocess_exec( + sudo_args = [ "sudo", "-u", username, "-H", - *args, + ] + if env_args: + sudo_args.extend(["env", *env_args]) + sudo_args.extend(args) + logger.debug( + "Exec args: %s env_keys=%s", + sudo_args, + list(env.keys()) if env else [], + ) + process = await asyncio.create_subprocess_exec( + *sudo_args, env=process_env, cwd=str(working_dir), stdout=asyncio.subprocess.PIPE, @@ -471,12 +578,30 @@ async def run_as_user( ) if background: + process_id = generate_process_id() + # 注册后台进程(使用原始的 session_id,即调用时传入的 username 参数值) + # 注意:run_as_user 的第一个参数实际上是 session_id + register_background_process( + session_id=session_id, + process_id=process_id, + pid=process.pid, + command=command, + process=process, + ) + logger.info( + "Background shell exec started: user=%s pid=%s process_id=%s cmd=%s", + username, + process.pid, + process_id, + command, + ) return ProcessResult( success=True, return_code=0, stdout="", stderr="", pid=process.pid, + process_id=process_id, ) else: try: @@ -489,6 +614,7 @@ async def run_as_user( stdout=stdout.decode().strip(), stderr=stderr.decode().strip(), pid=process.pid, + process_id=None, ) except asyncio.TimeoutError: process.kill() @@ -499,10 +625,18 @@ async def run_as_user( stdout="", stderr="", pid=process.pid, + process_id=None, error="Command timed out", ) except Exception as e: + logger.exception( + "Shell exec failed: session_id=%s cmd=%s cwd=%s env_keys=%s", + session_id, + command, + cwd, + list(env.keys()) if env else [], + ) return ProcessResult( success=False, return_code=-1, @@ -510,4 +644,5 @@ async def run_as_user( stderr="", error=str(e), pid=None, + process_id=None, ) diff --git a/pkgs/ship/tests/integration/test_integration.py b/pkgs/ship/tests/integration/test_integration.py index daca93b..be59f42 100644 --- a/pkgs/ship/tests/integration/test_integration.py +++ b/pkgs/ship/tests/integration/test_integration.py @@ -1,3 +1,4 @@ +import os import pytest import docker import requests @@ -5,6 +6,12 @@ import uuid from pathlib import Path +existing_no_proxy = os.environ.get("NO_PROXY", "") +if "127.0.0.1" not in existing_no_proxy and "localhost" not in existing_no_proxy: + os.environ["NO_PROXY"] = ( + f"{existing_no_proxy},127.0.0.1,localhost".strip(",") + ) + @pytest.fixture(scope="session") def docker_client(): @@ -63,6 +70,7 @@ def ship_container(docker_client, ship_image): # 等待服务启动 max_retries = 30 + startup_error = None for i in range(max_retries): try: response = requests.get(f"{base_url}/health", timeout=5) @@ -77,7 +85,14 @@ def ship_container(docker_client, ship_image): # 打印容器日志用于调试 logs = container.logs().decode("utf-8") print(f"Container logs:\n{logs}") - raise RuntimeError("Ship container failed to start") + startup_error = RuntimeError("Ship container failed to start") + + if startup_error: + try: + container.stop(timeout=10) + except Exception: + pass + raise startup_error yield {"container": container, "base_url": base_url, "port": host_port} From 77324212b6cdc437d165ab74c9cb44837f4d7ff9 Mon Sep 17 00:00:00 2001 From: RC-CHN <1051989940@qq.com> Date: Fri, 23 Jan 2026 09:25:11 +0800 Subject: [PATCH 7/7] refactor(ship): use cwd parameter for kernel working directory setup - Set kernel working directory via 
start_kernel(cwd=...) instead of dynamic os.chdir() execution for better reliability - Extract matplotlib font configuration to static constant - Rename function to _init_kernel_matplotlib for clarity - Add error handling for image cleanup in integration tests --- pkgs/ship/app/components/ipython.py | 36 +++++++++++-------- .../tests/integration/test_integration.py | 6 +++- 2 files changed, 26 insertions(+), 16 deletions(-) diff --git a/pkgs/ship/app/components/ipython.py b/pkgs/ship/app/components/ipython.py index 445673b..ef29cfe 100644 --- a/pkgs/ship/app/components/ipython.py +++ b/pkgs/ship/app/components/ipython.py @@ -38,13 +38,14 @@ async def get_or_create_kernel(session_id: str) -> AsyncKernelManager: # 创建会话工作目录 workspace_dir = await get_session_workspace(session_id) - # 创建新的内核管理器 + # 创建新的内核管理器,在启动时设置工作目录 km: AsyncKernelManager = AsyncKernelManager() - await km.start_kernel() + # 通过 cwd 参数在启动时设置工作目录,避免动态代码执行 + await km.start_kernel(cwd=str(workspace_dir)) kernel_managers[session_id] = km - # 在内核启动后立即设置工作目录 - await _set_kernel_working_directory(km, str(workspace_dir)) + # 执行静态初始化代码(字体配置等) + await _init_kernel_matplotlib(km) return kernel_managers[session_id] @@ -55,18 +56,12 @@ async def ensure_kernel_running(km: AsyncKernelManager): await km.start_kernel() -async def _set_kernel_working_directory(km: AsyncKernelManager, working_dir: str): - """设置内核的工作目录""" - kc = km.client() - try: - # 执行初始化代码,包括中文字体配置 - init_code = f""" +# 静态初始化代码(matplotlib 字体配置等,不包含任何动态内容) +_KERNEL_INIT_CODE = """ import matplotlib.pyplot as plt import matplotlib.font_manager as fm import shutil, os -os.chdir({working_dir!r}) - cache_dir = os.path.expanduser("~/.cache/matplotlib") if os.path.exists(cache_dir): shutil.rmtree(cache_dir) @@ -77,10 +72,21 @@ async def _set_kernel_working_directory(km: AsyncKernelManager, working_dir: str plt.rcParams['font.family'] = font_prop.get_name() plt.rcParams['axes.unicode_minus'] = False """ - kc.execute(init_code, silent=True, store_history=False) + +async def _init_kernel_matplotlib(km: AsyncKernelManager): + """初始化内核的 matplotlib 配置 + + 执行静态初始化代码来配置中文字体等。 + 工作目录已在 start_kernel(cwd=...) 时设置。 + """ + kc = km.client() + try: + # 执行静态初始化代码(不包含任何动态内容) + kc.execute(_KERNEL_INIT_CODE, silent=True, store_history=False) + # 等待执行完成 - timeout = 10 # 增加超时时间以便字体配置完成 + timeout = 10 while True: try: msg = await asyncio.wait_for(kc.get_iopub_msg(), timeout=timeout) @@ -93,7 +99,7 @@ async def _set_kernel_working_directory(km: AsyncKernelManager, working_dir: str break except Exception as e: - print(f"Warning: Failed to set working directory: {e}") + print(f"Warning: Failed to initialize matplotlib: {e}") async def execute_code_in_kernel( diff --git a/pkgs/ship/tests/integration/test_integration.py b/pkgs/ship/tests/integration/test_integration.py index be59f42..2f32997 100644 --- a/pkgs/ship/tests/integration/test_integration.py +++ b/pkgs/ship/tests/integration/test_integration.py @@ -44,7 +44,11 @@ def ship_image(docker_client): # 测试完成后清理镜像 print("Cleaning up ship image...") - docker_client.images.remove(image.id, force=True) + try: + docker_client.images.remove(image.id, force=True) + except docker.errors.ImageNotFound: + # 镜像可能已经被其他进程删除 + print(f"Image {image.id} already removed, skipping cleanup") @pytest.fixture(scope="session")
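
Reviewer note: a self-contained sketch of the kernel-startup pattern PATCH 7/7 switches to, where the working directory is pinned via `start_kernel(cwd=...)` instead of injecting an `os.chdir()` call into the kernel. It assumes only that `jupyter_client` is installed; it is an illustration, not Ship code:

```python
# Illustration only: working directory fixed at kernel launch time.
import asyncio
from jupyter_client.manager import AsyncKernelManager

async def demo(workspace_dir: str) -> None:
    km = AsyncKernelManager()
    await km.start_kernel(cwd=workspace_dir)  # no dynamic chdir code needed
    kc = km.client()
    kc.start_channels()
    try:
        await kc.wait_for_ready(timeout=30)
        kc.execute("import os; print(os.getcwd())", silent=False)
    finally:
        kc.stop_channels()
        await km.shutdown_kernel()

# asyncio.run(demo("/tmp"))
```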