diff --git a/api-service/Dockerfile.minimal b/api-service/Dockerfile.minimal new file mode 100644 index 0000000..09dfa18 --- /dev/null +++ b/api-service/Dockerfile.minimal @@ -0,0 +1,5 @@ +FROM alpine:3.20 +RUN apk add --no-cache ca-certificates tzdata socat tar +COPY api-service-linux-amd64 /usr/bin/api-service +EXPOSE 8080 +ENTRYPOINT ["/usr/bin/api-service"] diff --git a/api-service/controller/env_instance.go b/api-service/controller/env_instance.go index 29d3f8d..0db53de 100644 --- a/api-service/controller/env_instance.go +++ b/api-service/controller/env_instance.go @@ -129,6 +129,8 @@ func (ctrl *EnvInstanceController) CreateEnvInstance(c *gin.Context) { } envInstance.Env = backendEnv + log.Infof("Created instance %s for env %s [labels=%v, owner=%s]", envInstance.ID, req.EnvName, req.Labels, req.Owner) + // Set owner from DeployConfig if available (controller stores it in pod labels but doesn't return it) if backendEnv.DeployConfig != nil { if ownerValue, ok := backendEnv.DeployConfig["owner"]; ok { diff --git a/api-service/docs/experiment-admission-design.md b/api-service/docs/experiment-admission-design.md new file mode 100644 index 0000000..978212a --- /dev/null +++ b/api-service/docs/experiment-admission-design.md @@ -0,0 +1,265 @@ +# Experiment-Level Resource Admission Control — 方案设计与比较 + +## 问题背景 + +当前 AEnvironment 沙箱集群提供的是 **env instance 级别的 API**。上层 RL 训练实验和评测实验在多个 step 中批量创建 env instance,用完后释放,呈现锯齿波形 (sawtooth pattern): + +```text +实验A: ████░░░░████░░░░████░░░░ +实验B: ████░░░░████░░░░████ +实验C: ████░░░░████░░░░ + ↑ 三个波峰重叠 → 资源耗尽 → 全部崩溃 +``` + +当多个实验的波峰重叠时,触发 huse-scheduler 的**全局**水位限制 (`ClusterResourceUtil >= ClusterHealthyWaterMark`),导致**所有实验同时失败**,爆炸半径过大。 + +**核心需求**: 实验间资源隔离 — 先到的实验保障资源,后到的实验在资源不足时被优雅拒绝,而非所有实验一起崩溃。 + +--- + +## 方案一:显式 Experiment API(注册制) + +### 设计概述 + +新增 experiment 级别的 API,实验在使用沙箱之前必须先注册,声明资源 quota。系统预留资源后,保证已注册实验始终有资源可用。 + +```text +┌─────────────────────────────────────────────────┐ +│ 实验生命周期 │ +│ │ +│ POST /experiment │ +│ { "experiment_id": "rl-train-001", │ +│ "quota": { "cpu": 8000, "instances": 4 } } │ +│ → 201: 注册成功,资源已预留 │ +│ → 429: 集群资源不足,无法注册 │ +│ │ +│ POST /env-instance │ +│ { "labels": {"experiment": "rl-train-001"} } │ +│ → 200: 在已预留 quota 内创建 │ +│ → 403: 超出该实验 quota 上限 │ +│ │ +│ DELETE /experiment/rl-train-001 │ +│ → 200: 释放预留资源 │ +└─────────────────────────────────────────────────┘ +``` + +### 数据模型 + +```go +type Experiment struct { + ID string `json:"id"` + Quota ResourceQuota `json:"quota"` + Status string `json:"status"` // "active" | "draining" | "terminated" + CurrentUsed ResourceUsage `json:"current_used"` + CreatedAt time.Time `json:"created_at"` + Owner string `json:"owner"` + Labels map[string]string `json:"labels"` +} + +type ResourceQuota struct { + MaxInstances int `json:"max_instances"` + CPUMilli int64 `json:"cpu_milli"` + MemoryMB int64 `json:"memory_mb"` +} +``` + +### 准入流程 + +```text +POST /experiment (注册) + → 检查: cluster_total - Σ(已注册实验 quota) >= 请求 quota? + → YES: 预留资源,返回 201 + → NO: 返回 429 "Insufficient cluster capacity" + +POST /env-instance (创建实例) + → 中间件检查: 该实验 current_used < quota? + → YES: 允许创建 + → NO: 返回 403 "Experiment quota exceeded" + +DELETE /experiment (去注册) + → 标记 status = "draining" + → 等待所有 instance 释放 (或强制清理) + → 释放预留资源 +``` + +### 存储需求 + +需要持久化实验注册信息(Redis 或 DB),因为 api-service 重启后需恢复预留状态。 + +--- + +## 方案二:隐式 Label 追踪(无注册) + +### 方案二概述 + +不新增 experiment API。api-service 根据 `CreateEnvInstance` 请求中携带的 `experiment` label 自动识别实验,通过历史 instance 峰值来计算预留资源,在资源不足时只拒绝新加入的实验。 + +```text +┌──────────────────────────────────────────────────────┐ +│ 准入决策流程 (ExperimentAdmission middleware) │ +│ │ +│ POST /env-instance │ +│ { "labels": {"experiment": "rl-train-001"} } │ +│ │ +│ 1. 提取 experiment label │ +│ 2. 是否已知实验? (有活跃 instance) │ +│ → YES: 直接放行 (先来先得保护) │ +│ → NO: 进入准入检查 │ +│ 3. 计算: reserved = Σ(各实验历史峰值 × 单实例CPU) │ +│ available = cluster_total - reserved │ +│ 4. available > per_instance_cpu ? │ +│ → YES: 放行新实验 │ +│ → NO: 返回 429 拒绝 │ +└──────────────────────────────────────────────────────┘ +``` + +### 数据模型(已实现) + +```go +type ExperimentState struct { + FirstSeen time.Time + CurrentCount int // 当前活跃 instance 数 + PeakCount int // 滑动窗口内峰值 + PeakSamples []PeakSample // 峰值采样环形缓冲区 +} +``` + +### 核心公式 + +```text +reserved_capacity = Σ (各活跃实验的历史峰值 instance 数 × 单实例 CPU) +available_for_new = cluster_total - reserved_capacity +新实验准入条件: available_for_new > per_instance_cpu +``` + +### 去注册机制 + +通过**滑动窗口衰减**自动去注册: + +- 实验无活跃 instance 后,历史峰值采样随时间过期 +- 窗口(默认 15m)内所有采样过期后,实验记录自动删除 +- 预留资源随之释放,新实验可进入 + +### 方案二存储需求 + +纯内存状态,无持久化依赖。api-service 重启后通过 `ListEnvInstances` 在 ~5m 内自动重建。 + +--- + +## 方案对比 + +| 维度 | 方案一:显式 API | 方案二:隐式追踪 | +|------|-----------------|-----------------| +| **上层改造** | 需要改造。SWEAgent 等调用方需新增 register/deregister 调用 | **零改造**。只需 label 中携带 experiment 字段(已支持) | +| **Quota 精确性** | 精确。用户声明多少就预留多少 | 近似。基于历史峰值推断,首次创建时无历史数据 | +| **资源利用率** | **较低**。预留即锁定,即使实验处于 inference 低谷期资源也不释放 | **较高**。预留跟随实际用量的峰值,低谷期其他实验可使用 | +| **去注册** | 需要调用方主动 DELETE,否则资源泄漏 | **自动**。滑动窗口衰减,无需人工干预 | +| **资源泄漏风险** | **高**。调用方 crash / 忘记去注册 → 永久占用 | **低**。自动衰减机制保证最终释放 | +| **Quota 评估负担** | **高**。用户需要预估峰值实例数,估大浪费,估小不够 | **无**。系统自动追踪 | +| **首次创建保护** | 注册时即保护 | 首次创建时无历史数据,靠 fail-open 放行 | +| **多副本一致性** | 需要**共享存储**(Redis/DB)同步实验注册状态 | 每个副本独立状态,保守估计(可接受) | +| **实现复杂度** | **高**。新增 API + 持久化 + 去注册超时清理 + SDK 适配 | **低**。中间件 + 内存状态,~200 行核心代码 | +| **API 兼容性** | 破坏性变更:新增必须调用的 API | **完全兼容**:现有 API 不变 | +| **Feature gate** | 需要,切换复杂 | 简单 `--admission-enabled` 开关 | +| **故障模式** | 注册服务不可用 → 新实验无法创建 | scheduler 不可用 → fail-open 放行所有请求 | + +--- + +## 关键场景分析 + +### 场景 1:正常运行 — 3 个实验交错运行 + +| | 方案一 | 方案二 | +|---|---|---| +| 效果 | 三个实验各自在 quota 内运行 | 三个实验被自动识别,各自峰值被追踪 | +| 利用率 | 低:每个实验预留的 quota 在低谷期闲置 | 高:低谷期资源可被其他实验的突发使用 | + +### 场景 2:资源紧张 — 第 4 个实验尝试加入 + +| | 方案一 | 方案二 | +|---|---|---| +| 效果 | 注册时直接拒绝 (429) | 创建第一个 instance 时拒绝 (429) | +| 时机 | 更早(注册阶段) | 稍晚(首次创建阶段) | +| 上层感知 | 需处理注册失败 | 需处理创建失败(已有错误处理路径) | + +### 场景 3:实验异常退出(调用方 crash) + +| | 方案一 | 方案二 | +|---|---|---| +| 效果 | **资源泄漏**,需额外超时清理机制 | 15m 窗口后自动释放,instance 由 TTL cleanup 回收 | +| 恢复时间 | 取决于超时清理间隔 | 等于滑动窗口 (15m) | + +### 场景 4:api-service 重启 + +| | 方案一 | 方案二 | +|---|---|---| +| 恢复 | 从 Redis/DB 读取注册状态,立即可用 | ~5m 内通过 ListEnvInstances 重建,期间 fail-open | +| 风险 | 存储不可用时无法恢复 | 短暂窗口内可能过度放行 | + +### 场景 5:用户 Quota 评估不准 + +| | 方案一 | 方案二 | +|---|---|---| +| 估大 | 资源浪费,其他实验无法注册 | N/A | +| 估小 | 实验运行中超额被拒,需重新注册 | N/A | +| 自适应 | 不支持 | **自动适应**实际使用模式 | + +--- + +## 综合评估 + +### 方案一的优势场景 + +- **合规/企业级多租户**:需要严格的资源隔离和 quota 审计 +- **资源计费**:按预留 quota 计费的商业模式 +- **高确定性**:对 SLA 要求极高,不允许任何概率性行为 + +### 方案二的优势场景 + +- **内部 RL 训练平台**(当前场景):快速迭代,上层不想改代码 +- **资源利用率敏感**:集群资源有限,不允许空预留 +- **实验行为不可预测**:无法准确预估 quota +- **快速上线**:零上层改造,feature gate 即开即关 + +--- + +## 推荐 + +对于当前 RL 训练场景,**方案二(隐式追踪)更适合**,原因: + +1. **零改造成本** — SWEAgent 等上层系统不需要任何修改,已经通过 label 携带 experiment 信息 +2. **资源利用率高** — RL 实验的锯齿波特征意味着方案一会有大量空闲预留 +3. **自适应** — 不需要用户评估 quota,系统自动追踪实际使用模式 +4. **容错性好** — 实验 crash 后自动释放,不存在资源泄漏 +5. **已部分实现** — 核心代码已就绪,feature gate 可控 + +### 可选增强(渐进演进到混合模式) + +方案二可以渐进增强为"混合模式",在不破坏现有行为的前提下叠加方案一的能力: + +- 新增**可选的** `POST /experiment` 注册 API,允许预声明 quota +- 有注册的实验:按声明的 quota 预留(方案一行为) +- 无注册的实验:按历史峰值追踪(方案二行为) +- 这样既保持向后兼容,又为有高 SLA 需求的实验提供确定性保障 + +--- + +## 附录:方案二已实现组件 + +| 文件 | 说明 | +|------|------| +| `service/experiment_admission.go` | 核心准入控制服务(峰值追踪、集群资源轮询、准入决策) | +| `service/experiment_admission_test.go` | 13 个单元测试(准入逻辑、滑动窗口、并发安全、容错降级) | +| `middleware/experiment_admission.go` | Gin 中间件(提取 experiment label、调用准入检查) | +| `metrics/admission_metrics.go` | Prometheus 指标(admission_total、reserved_capacity、experiment_count、peak_instances) | +| `main.go` | CLI flags 和组件接线(`--admission-enabled`、`--scheduler-addr`、`--per-instance-cpu`、`--peak-window`) | + +### 启用方式 + +```bash +api-service \ + --admission-enabled \ + --scheduler-addr=http://huse-scheduler-0:14457 \ + --per-instance-cpu=2000 \ + --peak-window=15m +``` diff --git a/api-service/docs/experiment-admission-detailed-design.md b/api-service/docs/experiment-admission-detailed-design.md new file mode 100644 index 0000000..64c538a --- /dev/null +++ b/api-service/docs/experiment-admission-detailed-design.md @@ -0,0 +1,519 @@ +# Experiment-Level Resource Admission Control — 详细设计方案 + +> 基于方案二(隐式 Label 追踪)+ 二级限流增强 + +## 1. 问题与目标 + +### 1.1 问题 + +RL 训练实验在多个 step 中批量创建 env instance,呈锯齿波形。多个实验波峰重叠时,触发 huse-scheduler 全局水位限制,导致**所有实验同时失败**。 + +```text +实验A: ████░░░░████░░░░████ +实验B: ████░░░░████░░░░ +实验C: ████░░░░████ + ↑ 波峰重叠 → 全局崩溃 +``` + +### 1.2 目标 + +- **先来先得保障**:已有实验的资源不受新实验影响 +- **二级限流**:集群利用率超过水位时,逐级收紧准入 +- **元数据规范化**:强制要求携带可配置的标签字段 +- **全功能可选**:feature gate 控制,默认关闭,零影响现有行为 + +--- + +## 2. 优先级模型 + +所有 `POST /env-instance` 请求根据标签分为三个优先级: + +| 优先级 | 名称 | 条件 | 准入策略 | +|--------|------|------|----------| +| **P0** | 已知实验 | experiment label 存在,且该实验已有活跃 instance | **始终放行**(核心保障) | +| **P1** | 新实验 | experiment label 存在,但该实验无活跃 instance 记录 | 集群利用率 < watermark **且** 预留容量有余量时放行 | +| **P2** | 无标签 | 缺少必需 label(可配置,默认要求 `experiment`) | **直接拒绝** (429) | + +### 决策流程图 + +```text +POST /env-instance 到达 + │ + ▼ +[提取 labels] + │ + ├── 缺少必需 label? ──YES──→ 429 "Missing required labels: experiment" (P2) + │ + ▼ +[查找 experiment 状态] + │ + ├── 已知实验? (有活跃 instance) ──YES──→ ✅ ALLOW (P0) + │ + ▼ +[新实验准入检查] + │ + ├── cluster_used / cluster_total >= watermark? + │ ──YES──→ 429 "Cluster utilization above watermark" (P1 rejected) + │ + ├── cluster_total - reserved_capacity <= per_instance_cpu? + │ ──YES──→ 429 "Insufficient capacity for new experiment" (P1 rejected) + │ + └── ✅ ALLOW (P1 admitted) +``` + +### P1 双重门控说明 + +新实验(P1)必须同时通过两道检查: + +1. **水位检查** `cluster_used / cluster_total < watermark` + - 基于 scheduler 返回的**实际集群利用率** + - 防止在集群已经繁忙时放入新实验 + - 捕获非沙箱负载(系统 pod、其他租户)导致的资源紧张 + +2. **预留容量检查** `cluster_total - reserved_capacity > per_instance_cpu` + - 基于已追踪实验的**历史峰值预留** + - 确保为每个已有实验保留足够的峰值资源 + - 即使实际利用率暂时较低(实验处于低谷),也不会过度承诺 + +两者互补:水位检查反映集群**当前真实状态**,预留容量检查反映**前瞻性承诺**。 + +--- + +## 3. 数据源 + +### 3.1 集群资源(来自 faas-api-service 统一接口) + +#### 架构背景 + +huse-scheduler 是**分区级组件**,每个 scheduler 实例只管理一个分区的节点子集: + +```text +faas-api-service ──gRPC──→ huse-coordinator ──路由──→ huse-scheduler-0 (分区 A) + ├──→ huse-scheduler-1 (分区 B) + └──→ huse-scheduler-2 (分区 C) +``` + +- 每个 scheduler 暴露 `:14457/clusterresource` 返回**该分区**的资源状态 +- huse-coordinator 已经周期性轮询每个 scheduler 的 `/clusterresource` 并缓存在 `SchedulerCache.State.ClusterResource` +- **不能直接轮询单个 scheduler**,否则只看到一个分区的数据 + +#### 解决方案:faas-api-service 新增统一聚合接口 + +在 faas-api-service(faas-apiserver)中新增 HTTP 接口,聚合所有 scheduler 分区的资源数据: + +```json +// GET http://faas-api-service:8233/hapis/faas.hcs.io/v1/clusterinfo +→ { + "success": true, + "data": { + "totalCPU": 600000, // 所有分区 TotalCPU 之和 + "usedCPU": 420000, // 所有分区 UsedCPU 之和 + "freeCPU": 180000, // 所有分区 FreeCPU 之和 + "totalMemory": 1200000, // 所有分区 TotalMemory 之和 + "usedMemory": 840000, + "freeMemory": 360000, + "partitions": [ // 可选:每分区明细 + {"name": "scheduler-0", "totalCPU": 200000, "usedCPU": 140000, "healthy": true}, + {"name": "scheduler-1", "totalCPU": 200000, "usedCPU": 140000, "healthy": true}, + {"name": "scheduler-2", "totalCPU": 200000, "usedCPU": 140000, "healthy": true} + ], + "healthyPartitions": 3, + "totalPartitions": 3 + } + } +``` + +**实现方式**:faas-api-service 通过 gRPC 连接 huse-coordinator,coordinator 已持有所有 scheduler 的 `SchedulerCache`。新增接口从 coordinator 获取(或 faas-api-service 自身缓存)所有分区的聚合资源。 + +#### api-service 侧 + +- api-service 的 `ExperimentAdmission` 轮询 faas-api-service 的 `/clusterinfo` 聚合接口(而非直接连 scheduler) +- 轮询间隔:10s +- 用途:计算 `utilization = UsedCPU / TotalCPU`(全局聚合值) +- 容错:faas-api-service 不可达时保留最后已知值,无数据时 fail-open +- 仅使用健康分区的聚合数据(`healthyPartitions > 0`) + +### 3.2 实验实例数(来自 ListEnvInstances) + +- 数据源:现有 `startUnifiedPeriodicTask` 每 5m 调用 `ListEnvInstances("")` +- 按 `labels["experiment"]` 分组统计活跃 instance 数 +- 排除 `Terminated` / `Failed` 状态的 instance +- 滑动窗口(默认 15m)内追踪每个实验的峰值 instance 数 + +--- + +## 4. 核心数据结构 + +### 4.1 ExperimentAdmission(增强后) + +```go +type ExperimentAdmission struct { + mu sync.RWMutex + experiments map[string]*ExperimentState + + // Cluster resource (from scheduler polling) + clusterTotal int64 // TotalCPU (milli) + clusterUsed int64 // UsedCPU (milli) + hasClusterData bool + + // Configuration + perInstanceCPU int64 // per-instance CPU (milli), default 2000 + peakWindow time.Duration // sliding window, default 15m + watermark float64 // utilization threshold, default 0.7 + requiredLabels []string // required labels, default ["experiment"] + clusterInfoEndpoint string // faas-api-service /clusterinfo URL + pollInterval time.Duration // default 10s + + httpClient *http.Client +} +``` + +### 4.2 ExperimentState(不变) + +```go +type ExperimentState struct { + FirstSeen time.Time + CurrentCount int + PeakCount int + PeakSamples []PeakSample +} +``` + +### 4.3 准入结果 + +```go +type AdmissionResult struct { + Allowed bool + Reason string + Tier string // "p0_known", "p1_new", "p2_unlabeled" +} +``` + +--- + +## 5. 核心公式 + +```text +utilization = cluster_used / cluster_total (实际利用率) +reserved = Σ (experiment.PeakCount × perInstanceCPU) (已承诺容量) +available = cluster_total - reserved (可分配余量) + +P1 准入条件: utilization < watermark AND available > perInstanceCPU +P0 准入条件: always true +P2 准入条件: always false (缺少必需 label) +``` + +--- + +## 6. 配置参数 + +| Flag | 类型 | 默认值 | 说明 | +|------|------|--------|------| +| `--admission-enabled` | bool | `false` | 功能总开关。关闭时所有请求直接放行,零额外开销 | +| `--scheduler-addr` | string | `""` | 集群资源 API 地址。FaaS 模式下指向 faas-api-service(如 `http://faas-api-service:8233`),复用 `--schedule-addr` 即可 | +| `--per-instance-cpu` | int64 | `2000` | 单实例 CPU(milli-cores),匹配 RL 沙箱规格 | +| `--peak-window` | duration | `15m` | 峰值滑动窗口,覆盖 3-5 个 RL 推理-执行周期 | +| `--admission-watermark` | float64 | `0.7` | 集群利用率阈值,超过则拒绝新实验 | +| `--admission-required-labels` | string | `"experiment"` | 逗号分隔的必需 label 列表,缺少任一则拒绝 | + +### 配置示例 + +```bash +# 基本启用(FaaS 模式,复用 --schedule-addr 指向 faas-api-service) +api-service --admission-enabled --schedule-addr=http://faas-api-service:8233 + +# 严格模式:要求 experiment+owner,60% 水位即收紧 +api-service \ + --admission-enabled \ + --schedule-addr=http://faas-api-service:8233 \ + --admission-watermark=0.6 \ + --admission-required-labels=experiment,owner \ + --per-instance-cpu=2000 \ + --peak-window=15m + +# 禁用(默认):完全不影响现有行为 +api-service # admission-enabled 默认 false +``` + +--- + +## 7. 中间件位置 + +在现有中间件链中的位置不变: + +```text +POST /env-instance + → AuthTokenMiddleware (身份认证) + → InstanceLimitMiddleware (Token 级别的实例配额) + → ExperimentAdmissionMiddleware ← 准入控制 + → RateLimit (全局 QPS 限流) + → CreateEnvInstance (业务逻辑) +``` + +### 中间件行为 + +```go +func ExperimentAdmissionMiddleware(admission *ExperimentAdmission) gin.HandlerFunc { + return func(c *gin.Context) { + // Feature gate + if admission == nil { + c.Next() + return + } + + // 1. Peek request body, extract labels + labels := extractLabelsFromRequest(c) + + // 2. Check required labels (P2 gate) + if missing := admission.CheckRequiredLabels(labels); len(missing) > 0 { + // 429, reject, metric p2_rejected + return + } + + // 3. Admission decision (P0/P1) + result := admission.ShouldAdmit(labels["experiment"]) + if !result.Allowed { + // 429, reject, metric by tier + return + } + + // metric allowed by tier + c.Next() + } +} +``` + +--- + +## 8. 滑动窗口与自动去注册 + +### 8.1 峰值追踪 + +每次 `ListEnvInstances` 返回时(每 5m): + +```text +对每个 experiment: + 1. 统计活跃 instance 数 → currentCount + 2. 记录 PeakSample{now, currentCount} + 3. 淘汰窗口外的旧采样 + 4. 重新计算 peakCount = max(窗口内所有采样) +``` + +### 8.2 自动去注册 + +```text +实验释放所有 instance 后: + → 不再产生新的非零采样 + → 旧采样逐渐过期 (15m 窗口) + → peakCount 衰减到 0 + → 实验记录自动删除 + → 预留资源释放,新实验可进入 +``` + +时序示意: + +```text +t=0 实验结束,所有 instance 释放 +t=5m UpdateExperimentCounts: currentCount=0, 窗口内仍有旧峰值采样 +t=10m UpdateExperimentCounts: currentCount=0, 旧采样继续过期 +t=15m UpdateExperimentCounts: 所有采样过期, peakCount=0 → 删除实验记录 +``` + +--- + +## 9. 容错设计 + +| 故障场景 | 行为 | 原因 | +|----------|------|------| +| faas-api-service 不可达 | 保留最后已知的 cluster_used/cluster_total,继续使用 | stale data 优于无 data | +| 部分 scheduler 分区不健康 | faas-api-service 仅聚合健康分区数据,返回中包含健康分区数 | 局部故障不影响准入决策 | +| 首次启动无数据 | fail-open,所有请求放行 | 避免冷启动阻塞所有业务 | +| api-service 重启 | ~5m 内通过 ListEnvInstances 自动重建实验状态,期间 fail-open | 无持久化依赖 | +| ListEnvInstances 失败 | 保持上一次的实验状态不变 | 已有行为 | +| 多副本独立状态 | 每个副本独立追踪,是全局的下界估计 | scheduler 全局水位是最终安全网 | + +--- + +## 10. 可观测性 + +### 10.1 Prometheus 指标 + +| 指标名 | 类型 | 标签 | 说明 | +|--------|------|------|------| +| `aenv_api_experiment_admission_total` | Counter | `decision={allowed,rejected}`, `tier={p0_known,p1_new,p2_unlabeled}` | 各优先级的准入决策计数 | +| `aenv_api_experiment_admission_watermark_ratio` | Gauge | — | 当前集群利用率 (`used/total`) | +| `aenv_api_experiment_reserved_capacity` | Gauge | — | 总预留 CPU (milli) | +| `aenv_api_experiment_count` | Gauge | — | 活跃实验数 | +| `aenv_api_experiment_peak_instances` | GaugeVec | `experiment` | 每实验的滑动窗口峰值 | + +### 10.2 日志 + +```text +# P2 拒绝 +WARN Experiment admission: rejected request missing required labels [experiment] (tier=p2) + +# P1 新实验被水位拒绝 +WARN Experiment admission: rejected new experiment "rl-train-005" + cluster_utilization=0.73 watermark=0.70 (tier=p1, reason=watermark) + +# P1 新实验被容量拒绝 +WARN Experiment admission: rejected new experiment "rl-train-005" + reserved=180000 available=20000 required=2000 (tier=p1, reason=capacity) + +# P0 已知实验放行 +DEBUG Experiment admission: allowed known experiment "rl-train-001" (tier=p0) +``` + +### 10.3 Debug 端点 + +`GET /metrics` 中包含上述 Prometheus 指标。`ExperimentAdmission.GetMetrics()` 返回完整内存状态,可用于排查。 + +--- + +## 11. 边界情况 + +| 场景 | 处理 | +|------|------| +| experiment label 为空字符串 `""` | 视为缺少 label → P2 拒绝 | +| 同一 experiment ID 不同 owner | 合并为同一实验追踪(experiment 是分组 key) | +| 单实验独占整个集群 | P0 始终放行,不限制已有实验(这是设计意图) | +| watermark 设为 1.0 | 等效于禁用水位检查,仅保留预留容量检查 | +| watermark 设为 0.0 | 永远拒绝新实验(仅已知实验可运行) | +| required-labels 为空 | 不做 label 检查,所有请求至少为 P1 | +| api-service 多副本状态不一致 | 每个副本的峰值追踪是全局的下界,保守方向安全;scheduler 水位是共享的真实值 | +| 部分 scheduler 分区不健康 | faas-api-service 仅聚合健康分区数据,不健康分区的资源不计入 total/used | +| 新增/缩减 scheduler 分区 | faas-api-service 通过 coordinator 自动发现,无需 api-service 侧配置变更 | + +--- + +## 12. 实现变更清单 + +### 12.1 修改文件 + +#### api-service 变更 + +| 文件 | 变更 | +|------|------| +| `service/experiment_admission.go` | 新增 `watermark`、`requiredLabels` 字段;`ShouldAdmit` 增加水位检查和 tier 返回;新增 `CheckRequiredLabels` 方法;`pollClusterResource` 改为调用 faas-api-service `/clusterinfo` 聚合接口 | +| `service/experiment_admission_test.go` | 新增水位门控测试、required labels 测试、tier 分级测试 | +| `middleware/experiment_admission.go` | 中间件增加 required labels 检查,metric 区分 tier | +| `metrics/admission_metrics.go` | `ExperimentAdmissionTotal` 增加 `tier` 标签;新增 `ExperimentAdmissionWatermarkRatio` gauge | +| `main.go` | 新增 `--admission-watermark`、`--admission-required-labels` flag;`schedulerEndpoint` 改为复用 `--schedule-addr`(指向 faas-api-service) | + +#### faas-api-service (faas-apiserver) 侧 + +| 文件 | 变更 | +|------|------| +| `pkg/httpserver/server.go` | 注册新路由 `GET /hapis/faas.hcs.io/v1/clusterinfo` | +| `pkg/controller/clusterinfo.go` | 新增 controller,调用 service 层获取聚合资源数据 | +| `pkg/service/clusterinfo.go` | 新增 service,通过 coordinator gRPC 或本地 scheduler cache 聚合所有分区的 `ClusterResource` | + +> faas-api-service 已通过 `--huse-scheduler-addr` 连接 huse-coordinator。coordinator 的 `SchedulerCache` 已持有所有分区的 `ClusterResource`。新接口仅需将这些数据 sum 聚合后返回。 + +### 12.2 不修改 + +| 文件 | 原因 | +|------|------| +| api-service/controller/* | 业务逻辑不变,准入由中间件完成 | +| huse-scheduler | 每个分区已暴露 `/clusterresource`,无需改动 | +| huse-coordinator | 已周期性轮询 scheduler 资源,无需改动 | +| SDK (aenv) | 无任何改动需求 | +| SWEAgent | 无任何改动需求(已携带 experiment label) | + +--- + +## 13. 实现步骤 + +```text +Phase A: faas-api-service 新增 /clusterinfo 聚合接口 + +Step A1: 新增 pkg/service/clusterinfo.go + - 通过 coordinator 连接获取所有 SchedulerCache 的 ClusterResource + - 聚合(sum)所有健康分区的 TotalCPU/UsedCPU/FreeCPU/Memory + - 返回聚合结果 + 每分区明细 + 健康分区数 + +Step A2: 新增 pkg/controller/clusterinfo.go + - GET handler,调用 service 层 + - 返回标准 JSON 响应 + +Step A3: 更新 pkg/httpserver/server.go + - 注册路由 GET /hapis/faas.hcs.io/v1/clusterinfo + +Step A4: 构建验证 + cd faas-apiserver && go build ./... && go test ./... + +Phase B: api-service 准入控制增强 + +Step B1: 更新 service/experiment_admission.go + - 添加 watermark, requiredLabels 配置 + - ShouldAdmit 增加水位检查和 tier 返回 (AdmissionResult) + - 新增 CheckRequiredLabels 方法 + - pollClusterResource 改为调用 faas-api-service /clusterinfo + +Step B2: 更新 service/experiment_admission_test.go + - 新增 TestWatermarkBlocksNewExperiment + - 新增 TestWatermarkAllowsKnownExperiment + - 新增 TestRequiredLabelsMissing + - 新增 TestAdmissionTierClassification + +Step B3: 更新 middleware/experiment_admission.go + - 提取完整 labels map(不仅是 experiment) + - 先检查 required labels + - 调用 ShouldAdmit,按 tier 记录 metrics + +Step B4: 更新 metrics/admission_metrics.go + - ExperimentAdmissionTotal 增加 tier 标签维度 + - 新增 ExperimentAdmissionWatermarkRatio gauge + +Step B5: 更新 main.go + - 新增 flag:--admission-watermark, --admission-required-labels + - clusterinfo endpoint 复用 --schedule-addr + +Step B6: 构建验证 + go build ./... && go vet ./... && go test ./service/ ./middleware/ -v +``` + +--- + +## 14. 部署与回滚 + +### 14.1 灰度上线 + +```bash +# Phase 0: 先部署 faas-api-service 新版(含 /clusterinfo 接口),验证接口可用 +curl http://faas-api-service:8233/hapis/faas.hcs.io/v1/clusterinfo + +# Phase 1: 仅开启,watermark=1.0(等效只检查 required labels + 预留容量) +--admission-enabled --admission-watermark=1.0 --admission-required-labels=experiment + +# Phase 2: 收紧水位到 0.8,观察 metric +--admission-enabled --admission-watermark=0.8 + +# Phase 3: 目标水位 0.7 +--admission-enabled --admission-watermark=0.7 + +# Phase 4: 如需要,增加 owner 要求 +--admission-required-labels=experiment,owner +``` + +### 14.2 回滚 + +```bash +# 去掉 --admission-enabled 即可,或设为 false +# 效果:中间件检测到 admission==nil,直接 c.Next(),零额外开销 +``` + +### 14.3 监控告警 + +```yaml +# 新实验被拒率过高(可能 watermark 过低) +alert: ExperimentAdmissionRejectRate +expr: rate(aenv_api_experiment_admission_total{decision="rejected",tier="p1_new"}[5m]) + / rate(aenv_api_experiment_admission_total{tier="p1_new"}[5m]) > 0.5 +for: 10m + +# 无标签请求仍然存在(上层未适配) +alert: UnlabeledExperimentRequests +expr: rate(aenv_api_experiment_admission_total{tier="p2_unlabeled"}[5m]) > 0 +for: 30m +``` diff --git a/api-service/main.go b/api-service/main.go index d8862a0..3531470 100644 --- a/api-service/main.go +++ b/api-service/main.go @@ -25,6 +25,8 @@ import ( log "github.com/sirupsen/logrus" + "strings" + "api-service/controller" "api-service/metrics" "api-service/middleware" @@ -48,6 +50,13 @@ var ( tokenCacheMaxEntries int tokenCacheTTLMinutes int cleanupInterval string + // Experiment admission control + admissionEnabled bool + schedulerHTTPAddr string + perInstanceCPU int64 + peakWindow string + admissionWatermark float64 + admissionRequiredLabels string ) func init() { @@ -63,6 +72,13 @@ func init() { pflag.StringVar(&redisAddr, "redis-addr", "", "Redis address (host:port)") pflag.StringVar(&redisPassword, "redis-password", "", "Redis password") pflag.StringVar(&cleanupInterval, "cleanup-interval", "5m", "Cleanup service interval (e.g., 5m, 1h)") + + pflag.BoolVar(&admissionEnabled, "admission-enabled", false, "Enable experiment-level resource admission control") + pflag.StringVar(&schedulerHTTPAddr, "scheduler-addr", "", "faas-api-service HTTP API address for cluster info polling (e.g., http://faas-api-service:8233)") + pflag.Int64Var(&perInstanceCPU, "per-instance-cpu", 2000, "CPU per instance in milli-cores for admission calculation") + pflag.StringVar(&peakWindow, "peak-window", "15m", "Sliding window duration for experiment peak instance count") + pflag.Float64Var(&admissionWatermark, "admission-watermark", 0.7, "Cluster utilization watermark threshold (0.0-1.0) for new experiment admission") + pflag.StringVar(&admissionRequiredLabels, "admission-required-labels", "experiment", "Comma-separated list of required labels for admission (default: experiment)") } func healthChecker(c *gin.Context) { @@ -110,10 +126,31 @@ func main() { envInstanceController := controller.NewEnvInstanceController(scheduleClient, backendClient, redisClient) + // Experiment admission control + var experimentAdmission *service.ExperimentAdmission + if admissionEnabled && schedulerHTTPAddr != "" { + pw, err := time.ParseDuration(peakWindow) + if err != nil { + log.Fatalf("Invalid peak-window duration: %v", err) + } + var requiredLabels []string + for _, l := range strings.Split(admissionRequiredLabels, ",") { + l = strings.TrimSpace(l) + if l != "" { + requiredLabels = append(requiredLabels, l) + } + } + experimentAdmission = service.NewExperimentAdmission(schedulerHTTPAddr, perInstanceCPU, pw, admissionWatermark, requiredLabels) + go experimentAdmission.StartClusterResourcePoller() + log.Infof("Experiment admission control enabled (scheduler=%s, per-instance-cpu=%d, peak-window=%s, watermark=%.2f, required-labels=%v)", + schedulerHTTPAddr, perInstanceCPU, peakWindow, admissionWatermark, requiredLabels) + } + // Main route configuration mainRouter.POST("/env-instance", middleware.AuthTokenMiddleware(tokenEnabled, backendClient), middleware.InstanceLimitMiddleware(redisClient), + middleware.ExperimentAdmissionMiddleware(experimentAdmission), middleware.RateLimit(qps), envInstanceController.CreateEnvInstance) mainRouter.GET("/env-instance/:id/list", middleware.AuthTokenMiddleware(tokenEnabled, backendClient), envInstanceController.ListEnvInstances) @@ -175,8 +212,9 @@ func main() { // across cleanup and metrics collection, reducing redundant requests to meta-service. if scheduleType == "faas" { if faasClient, ok := scheduleClient.(*service.FaaSClient); ok { + cleanManager.WithRecordDeleter(faasClient) metricsCollector := metrics.NewCollector(faasClient, interval) - go startUnifiedPeriodicTask(scheduleClient, cleanManager, metricsCollector, interval) + go startUnifiedPeriodicTask(scheduleClient, cleanManager, metricsCollector, experimentAdmission, interval) } else { go cleanManager.Start() } @@ -195,6 +233,7 @@ func startUnifiedPeriodicTask( envInstanceService service.EnvInstanceService, cleanManager *service.AEnvCleanManager, metricsCollector *metrics.Collector, + experimentAdmission *service.ExperimentAdmission, interval time.Duration, ) { // Random jitter to stagger tickers across replicas @@ -203,6 +242,7 @@ func startUnifiedPeriodicTask( time.Sleep(jitter) runOnce := func() { + log.Infof("Unified periodic task: starting run cycle") envInstances, err := envInstanceService.ListEnvInstances("") if err != nil { log.Warnf("Unified periodic task: failed to list instances: %v", err) @@ -212,6 +252,9 @@ func startUnifiedPeriodicTask( // Feed the same data to both consumers cleanManager.CleanupFromInstances(envInstances) metricsCollector.CollectFromEnvInstances(envInstances) + if experimentAdmission != nil { + experimentAdmission.UpdateExperimentCounts(envInstances) + } } runOnce() diff --git a/api-service/metrics/admission_metrics.go b/api-service/metrics/admission_metrics.go new file mode 100644 index 0000000..83c45c0 --- /dev/null +++ b/api-service/metrics/admission_metrics.go @@ -0,0 +1,89 @@ +/* +Copyright 2025. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package metrics + +import ( + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +var ( + // ExperimentAdmissionTotal counts admission decisions by tier. + ExperimentAdmissionTotal = promauto.NewCounterVec( + prometheus.CounterOpts{ + Subsystem: subsystem, + Name: "experiment_admission_total", + Help: "Total experiment admission decisions", + }, + []string{"decision", "tier"}, // decision: "allowed"|"rejected", tier: "p0_known"|"p1_new"|"p2_unlabeled" + ) + + // ExperimentReservedCapacity tracks total reserved CPU in milli-cores. + ExperimentReservedCapacity = promauto.NewGauge( + prometheus.GaugeOpts{ + Subsystem: subsystem, + Name: "experiment_reserved_capacity", + Help: "Total reserved CPU capacity across active experiments (milli-cores)", + }, + ) + + // ExperimentCount tracks number of active experiments. + ExperimentCount = promauto.NewGauge( + prometheus.GaugeOpts{ + Subsystem: subsystem, + Name: "experiment_count", + Help: "Number of currently active experiments", + }, + ) + + // ExperimentPeakInstances tracks per-experiment peak instance count. + ExperimentPeakInstances = promauto.NewGaugeVec( + prometheus.GaugeOpts{ + Subsystem: subsystem, + Name: "experiment_peak_instances", + Help: "Peak instance count per experiment in sliding window", + }, + []string{"experiment"}, + ) + + // ClusterTotalCPU tracks total cluster CPU in milli-cores. + ClusterTotalCPU = promauto.NewGauge( + prometheus.GaugeOpts{ + Subsystem: subsystem, + Name: "cluster_total_cpu", + Help: "Total cluster CPU capacity (milli-cores)", + }, + ) + + // ClusterUsedCPU tracks used cluster CPU in milli-cores. + ClusterUsedCPU = promauto.NewGauge( + prometheus.GaugeOpts{ + Subsystem: subsystem, + Name: "cluster_used_cpu", + Help: "Used cluster CPU (milli-cores)", + }, + ) + + // ClusterUtilization tracks cluster CPU utilization ratio. + ClusterUtilization = promauto.NewGauge( + prometheus.GaugeOpts{ + Subsystem: subsystem, + Name: "cluster_utilization", + Help: "Cluster CPU utilization ratio (0.0-1.0)", + }, + ) +) diff --git a/api-service/middleware/experiment_admission.go b/api-service/middleware/experiment_admission.go new file mode 100644 index 0000000..575d0d8 --- /dev/null +++ b/api-service/middleware/experiment_admission.go @@ -0,0 +1,136 @@ +/* +Copyright 2025. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package middleware + +import ( + "api-service/metrics" + "api-service/service" + "bytes" + "encoding/json" + "fmt" + "io" + "strings" + + backendmodels "envhub/models" + + "github.com/gin-gonic/gin" + log "github.com/sirupsen/logrus" +) + +// experimentRequest is a minimal struct to extract labels from the request body. +type experimentRequest struct { + Labels map[string]string `json:"labels"` +} + +// createResponse is a minimal struct to extract instance ID from the response. +type createResponse struct { + Success bool `json:"success"` + Data struct { + ID string `json:"id"` + } `json:"data"` +} + +// ExperimentAdmissionMiddleware checks whether a new experiment should be admitted +// based on cluster resource availability, watermark, and required labels. +// +// Tier logic: +// - P2 (unlabeled): missing required labels → reject 400 +// - P0 (known): existing experiment → always allow +// - P1 (new): new experiment → watermark + capacity gate +// +// After successful creation, registers the instance → experiment mapping +// so the periodic task can correctly track per-experiment counts even when +// FaaS backend doesn't return user labels. +func ExperimentAdmissionMiddleware(admission *service.ExperimentAdmission) gin.HandlerFunc { + return func(c *gin.Context) { + // Feature gate: if admission is nil (disabled), pass through + if admission == nil { + c.Next() + return + } + + labels := extractLabelsFromRequest(c) + + // P2 gate: check required labels + missing := admission.CheckRequiredLabels(labels) + if len(missing) > 0 { + reason := fmt.Sprintf("Experiment admission denied: missing required labels [%s]", strings.Join(missing, ", ")) + metrics.InstanceOpsTotal.WithLabelValues("create", "", "admission_rejected").Inc() + metrics.ExperimentAdmissionTotal.WithLabelValues("rejected", "p2_unlabeled").Inc() + backendmodels.JSONErrorWithMessage(c, 400, reason) + c.Abort() + return + } + + experiment := labels["experiment"] + result := admission.ShouldAdmitWithResult(experiment) + + if !result.Allowed { + metrics.InstanceOpsTotal.WithLabelValues("create", "", "admission_rejected").Inc() + metrics.ExperimentAdmissionTotal.WithLabelValues("rejected", result.Tier).Inc() + backendmodels.JSONErrorWithMessage(c, 429, result.Reason) + c.Abort() + return + } + + metrics.ExperimentAdmissionTotal.WithLabelValues("allowed", result.Tier).Inc() + + // Wrap response writer to capture the response body + rw := &responseCapture{ResponseWriter: c.Writer, body: &bytes.Buffer{}} + c.Writer = rw + + c.Next() + + // After handler: if creation succeeded, register instance → experiment + if c.Writer.Status() == 200 { + var resp createResponse + if err := json.Unmarshal(rw.body.Bytes(), &resp); err == nil && resp.Success && resp.Data.ID != "" { + admission.RegisterInstance(resp.Data.ID, experiment) + log.Infof("Experiment admission: registered instance %s → experiment %q", resp.Data.ID, experiment) + } + } + } +} + +// responseCapture wraps gin.ResponseWriter to capture the response body. +type responseCapture struct { + gin.ResponseWriter + body *bytes.Buffer +} + +func (rc *responseCapture) Write(b []byte) (int, error) { + rc.body.Write(b) + return rc.ResponseWriter.Write(b) +} + +// extractLabelsFromRequest peeks at the request body to extract labels +// without consuming the body, so downstream handlers can still read it. +func extractLabelsFromRequest(c *gin.Context) map[string]string { + body, err := io.ReadAll(c.Request.Body) + if err != nil { + return nil + } + // Restore the body for downstream handlers + c.Request.Body = io.NopCloser(bytes.NewBuffer(body)) + + var req experimentRequest + if err := json.Unmarshal(body, &req); err != nil { + return nil + } + + return req.Labels +} diff --git a/api-service/middleware/logging.go b/api-service/middleware/logging.go index 44833fb..a3eaa69 100644 --- a/api-service/middleware/logging.go +++ b/api-service/middleware/logging.go @@ -88,8 +88,8 @@ func (w ResponseWriter) WriteString(s string) (int, error) { func LoggingMiddleware() gin.HandlerFunc { return func(c *gin.Context) { - // Filter health check path, don't log - if c.Request.URL.Path == "/health" { + // Filter noisy paths, don't log + if c.Request.URL.Path == "/health" || c.Request.URL.Path == "/metrics" { c.Next() return } diff --git a/api-service/service/cleanup_service.go b/api-service/service/cleanup_service.go index 3a3003a..7b6ce9d 100644 --- a/api-service/service/cleanup_service.go +++ b/api-service/service/cleanup_service.go @@ -19,13 +19,23 @@ package service import ( "api-service/models" "context" + "fmt" + "strings" "time" log "github.com/sirupsen/logrus" ) +// RecordDeleter can delete only the metadata record of an instance +// without contacting the runtime node. Use as a fallback when the +// node gateway is unreachable. +type RecordDeleter interface { + DeleteInstanceRecord(id string) error +} + type AEnvCleanManager struct { envInstanceService EnvInstanceService + recordDeleter RecordDeleter // optional, nil for non-faas backends interval time.Duration ctx context.Context @@ -52,6 +62,12 @@ func NewAEnvCleanManager(envInstanceService EnvInstanceService, duration time.Du return AEnvCleanManager } +// WithRecordDeleter sets an optional record-only deleter for fallback cleanup. +func (cm *AEnvCleanManager) WithRecordDeleter(rd RecordDeleter) *AEnvCleanManager { + cm.recordDeleter = rd + return cm +} + // WithMetrics sets the metrics functions for the clean manager func (cm *AEnvCleanManager) WithMetrics(incrementSuccess, incrementFailure func()) *AEnvCleanManager { cm.incrementCleanupSuccess = incrementSuccess @@ -114,22 +130,52 @@ func (cm *AEnvCleanManager) CleanupFromInstances(envInstances []*models.EnvInsta // Check if TTL is set and has expired if cm.isExpired(instance) { - log.Infof("Instance %s has expired (TTL: %s), deleting...", instance.ID, instance.TTL) + instanceInfo := formatInstanceInfo(instance) + log.Infof("Instance %s has expired (TTL: %s), deleting... %s", instance.ID, instance.TTL, instanceInfo) err := cm.envInstanceService.DeleteEnvInstance(instance.ID) if err != nil { - log.Errorf("Failed to delete expired instance %s: %v", instance.ID, err) + // If the error indicates the node gateway is unreachable, + // fall back to record-only deletion to prevent infinite retry loops. + if cm.recordDeleter != nil && isNodeUnreachable(err) { + log.Warnf("Node unreachable for instance %s, falling back to record-only deletion. %s err: %v", instance.ID, instanceInfo, err) + if recordErr := cm.recordDeleter.DeleteInstanceRecord(instance.ID); recordErr != nil { + log.Errorf("Failed to delete record for instance %s: %v %s", instance.ID, recordErr, instanceInfo) + cm.incrementCleanupFailure() + continue + } + deletedCount++ + cm.incrementCleanupSuccess() + log.Infof("Successfully deleted record for unreachable instance %s %s", instance.ID, instanceInfo) + continue + } + + log.Errorf("Failed to delete expired instance %s: %v %s", instance.ID, err, instanceInfo) cm.incrementCleanupFailure() continue } deletedCount++ cm.incrementCleanupSuccess() - log.Infof("Successfully deleted expired instance %s", instance.ID) + log.Infof("Successfully deleted expired instance %s %s", instance.ID, instanceInfo) } } log.Infof("TTL-based cleanup task completed. Deleted %d expired instances", deletedCount) } +// isNodeUnreachable checks if an error indicates that the node gateway is unreachable +func isNodeUnreachable(err error) bool { + msg := err.Error() + return strings.Contains(msg, "connection refused") || + strings.Contains(msg, "no such host") || + strings.Contains(msg, "i/o timeout") || + strings.Contains(msg, "connect: network is unreachable") +} + +// formatInstanceInfo returns a human-readable string with instance labels and creation time +func formatInstanceInfo(instance *models.EnvInstance) string { + return fmt.Sprintf("[labels=%v, created_at=%s]", instance.Labels, instance.CreatedAt) +} + // isExpired checks if an environment instance has expired based on its TTL and creation time func (cm *AEnvCleanManager) isExpired(instance *models.EnvInstance) bool { // If TTL is not set, consider it as non-expiring diff --git a/api-service/service/experiment_admission.go b/api-service/service/experiment_admission.go new file mode 100644 index 0000000..db3ffc8 --- /dev/null +++ b/api-service/service/experiment_admission.go @@ -0,0 +1,437 @@ +/* +Copyright 2025. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package service + +import ( + "api-service/metrics" + "api-service/models" + "encoding/json" + "fmt" + "net/http" + "sync" + "time" + + log "github.com/sirupsen/logrus" +) + +// ExperimentAdmission implements first-come-first-served resource protection. +// Earlier experiments are guaranteed resources (historical peak instance count); +// only the newest experiments get rejected when resources are tight. +type ExperimentAdmission struct { + mu sync.RWMutex + experiments map[string]*ExperimentState + clusterTotal int64 // total CPU in milli-cores + clusterUsed int64 // used CPU in milli-cores + hasClusterData bool // whether we have received at least one cluster resource update + + // instanceExperiments maps instanceID → experiment name. + // FaaS backend doesn't return user labels in ListInstances, so we track + // the experiment assignment internally when instances are created. + instanceExperiments map[string]string + + perInstanceCPU int64 // CPU per instance in milli-cores + peakWindow time.Duration // sliding window for peak calculation + schedulerEndpoint string // faas-api-service HTTP API base URL (aggregated cluster info) + pollInterval time.Duration // cluster resource poll interval + watermark float64 // cluster utilization threshold (0.0-1.0), default 0.7 + requiredLabels []string // labels that must be present, default ["experiment"] + + httpClient *http.Client + pollFailCount int // consecutive poll failures (for log suppression) +} + +// AdmissionResult contains the outcome of an admission decision. +type AdmissionResult struct { + Allowed bool + Reason string + Tier string // "p0_known", "p1_new", "p2_unlabeled" +} + +// ExperimentState tracks per-experiment instance counts and peak history. +type ExperimentState struct { + FirstSeen time.Time + CurrentCount int + PeakCount int + PeakSamples []PeakSample // ring buffer for sliding window +} + +// PeakSample records an instance count observation at a point in time. +type PeakSample struct { + Timestamp time.Time + Count int +} + +// clusterInfoData matches the "data" field of faas-api-service /clusterinfo response. +type clusterInfoData struct { + TotalCPU int64 `json:"totalCPU"` + UsedCPU int64 `json:"usedCPU"` + FreeCPU int64 `json:"freeCPU"` + TotalMemory int64 `json:"totalMemory"` + UsedMemory int64 `json:"usedMemory"` + FreeMemory int64 `json:"freeMemory"` + HealthyPartitions int `json:"healthyPartitions"` + TotalPartitions int `json:"totalPartitions"` +} + +// clusterInfoResponse matches the faas-api-service /clusterinfo JSON response. +type clusterInfoResponse struct { + Success bool `json:"success"` + Data clusterInfoData `json:"data"` +} + +// NewExperimentAdmission creates a new admission controller. +func NewExperimentAdmission(schedulerEndpoint string, perInstanceCPU int64, peakWindow time.Duration, watermark float64, requiredLabels []string) *ExperimentAdmission { + if watermark <= 0 || watermark > 1.0 { + watermark = 0.7 + } + if len(requiredLabels) == 0 { + requiredLabels = []string{"experiment"} + } + return &ExperimentAdmission{ + experiments: make(map[string]*ExperimentState), + instanceExperiments: make(map[string]string), + perInstanceCPU: perInstanceCPU, + peakWindow: peakWindow, + schedulerEndpoint: schedulerEndpoint, + pollInterval: 10 * time.Second, + watermark: watermark, + requiredLabels: requiredLabels, + httpClient: &http.Client{ + Timeout: 5 * time.Second, + }, + } +} + +// StartClusterResourcePoller runs a blocking loop that polls the scheduler +// for cluster resource data. Should be called in a goroutine. +func (ea *ExperimentAdmission) StartClusterResourcePoller() { + log.Infof("Experiment admission: starting cluster info poller (faas-api-service=%s, interval=%v)", ea.schedulerEndpoint, ea.pollInterval) + + // Poll immediately on start + ea.pollClusterResource() + + ticker := time.NewTicker(ea.pollInterval) + defer ticker.Stop() + for range ticker.C { + ea.pollClusterResource() + } +} + +func (ea *ExperimentAdmission) pollClusterResource() { + url := ea.schedulerEndpoint + "/hapis/faas.hcs.io/v1/clusterinfo" + resp, err := ea.httpClient.Get(url) + if err != nil { + ea.logPollFailure("failed to poll cluster info from faas-api-service: %v", err) + return + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + ea.logPollFailure("faas-api-service returned status %d", resp.StatusCode) + return + } + + var cr clusterInfoResponse + if err := json.NewDecoder(resp.Body).Decode(&cr); err != nil { + ea.logPollFailure("failed to decode cluster info: %v", err) + return + } + + if !cr.Success { + ea.logPollFailure("faas-api-service returned success=false") + return + } + + if cr.Data.HealthyPartitions == 0 && cr.Data.TotalPartitions > 0 { + ea.logPollFailure("no healthy scheduler partitions (%d total)", cr.Data.TotalPartitions) + return + } + + ea.mu.Lock() + ea.clusterTotal = cr.Data.TotalCPU + ea.clusterUsed = cr.Data.UsedCPU + ea.hasClusterData = true + ea.pollFailCount = 0 + ea.mu.Unlock() + + // Update Prometheus gauges + metrics.ClusterTotalCPU.Set(float64(cr.Data.TotalCPU)) + metrics.ClusterUsedCPU.Set(float64(cr.Data.UsedCPU)) + if cr.Data.TotalCPU > 0 { + metrics.ClusterUtilization.Set(float64(cr.Data.UsedCPU) / float64(cr.Data.TotalCPU)) + } + + log.Debugf("Experiment admission: cluster resource updated (total=%d, used=%d, partitions=%d/%d)", + cr.Data.TotalCPU, cr.Data.UsedCPU, cr.Data.HealthyPartitions, cr.Data.TotalPartitions) +} + +// logPollFailure logs poll failures with exponential backoff suppression. +// Logs on 1st, 6th, 60th, 360th failure, etc. to avoid log spam. +func (ea *ExperimentAdmission) logPollFailure(format string, args ...interface{}) { + ea.mu.Lock() + ea.pollFailCount++ + count := ea.pollFailCount + ea.mu.Unlock() + + if count == 1 || count == 6 || count%360 == 0 { + msg := fmt.Sprintf(format, args...) + log.Warnf("Experiment admission: %s (failure #%d, suppressing repeated warnings)", msg, count) + } +} + +// RegisterInstance records the experiment assignment for an instance. +// Called by middleware after a successful CreateEnvInstance. +func (ea *ExperimentAdmission) RegisterInstance(instanceID, experiment string) { + if experiment == "" { + experiment = "default" + } + ea.mu.Lock() + defer ea.mu.Unlock() + ea.instanceExperiments[instanceID] = experiment + log.Debugf("Experiment admission: registered instance %s → experiment %s (total tracked: %d)", + instanceID, experiment, len(ea.instanceExperiments)) +} + +// UnregisterInstance removes the experiment assignment for an instance. +func (ea *ExperimentAdmission) UnregisterInstance(instanceID string) { + ea.mu.Lock() + defer ea.mu.Unlock() + delete(ea.instanceExperiments, instanceID) +} + +// UpdateExperimentCounts updates per-experiment instance counts from the +// periodic ListEnvInstances result. Called from startUnifiedPeriodicTask. +// Uses the internal instanceExperiments map to resolve experiment labels, +// since FaaS backend doesn't return user labels in ListInstances. +func (ea *ExperimentAdmission) UpdateExperimentCounts(instances []*models.EnvInstance) { + ea.mu.Lock() + defer ea.mu.Unlock() + + // Count instances per experiment + counts := make(map[string]int) + activeInstanceIDs := make(map[string]bool) + for _, inst := range instances { + if inst.Status == "Terminated" || inst.Status == "Failed" { + continue + } + activeInstanceIDs[inst.ID] = true + // Try internal map first (for FaaS mode where labels aren't returned) + exp := ea.getInstanceExperimentLocked(inst) + counts[exp]++ + } + + now := time.Now() + + // Update existing experiments and add new ones + for exp, count := range counts { + state, exists := ea.experiments[exp] + if !exists { + state = &ExperimentState{ + FirstSeen: now, + } + ea.experiments[exp] = state + } + state.CurrentCount = count + state.PeakSamples = append(state.PeakSamples, PeakSample{ + Timestamp: now, + Count: count, + }) + + // Evict samples outside the sliding window + ea.evictOldSamples(state, now) + + // Recalculate peak from remaining samples + state.PeakCount = ea.calculatePeak(state) + } + + // Remove experiments with zero active instances + for exp, state := range ea.experiments { + if _, active := counts[exp]; !active { + state.CurrentCount = 0 + ea.evictOldSamples(state, now) + state.PeakCount = ea.calculatePeak(state) + + // Remove if all historical samples have expired (peak decayed to 0) + if len(state.PeakSamples) == 0 || state.PeakCount == 0 { + delete(ea.experiments, exp) + } + } + } + + // Clean up stale entries in instanceExperiments (instances no longer active) + for id := range ea.instanceExperiments { + if !activeInstanceIDs[id] { + delete(ea.instanceExperiments, id) + } + } + + // Update Prometheus gauges + metrics.ExperimentCount.Set(float64(len(ea.experiments))) + metrics.ExperimentReservedCapacity.Set(float64(ea.reservedCapacity())) + for exp, state := range ea.experiments { + metrics.ExperimentPeakInstances.WithLabelValues(exp).Set(float64(state.PeakCount)) + } +} + +// getInstanceExperimentLocked resolves the experiment name for an instance. +// Must be called with ea.mu held. +func (ea *ExperimentAdmission) getInstanceExperimentLocked(inst *models.EnvInstance) string { + if exp, ok := ea.instanceExperiments[inst.ID]; ok { + return exp + } + if exp := inst.Labels["experiment"]; exp != "" { + return exp + } + return "default" +} + +func (ea *ExperimentAdmission) evictOldSamples(state *ExperimentState, now time.Time) { + cutoff := now.Add(-ea.peakWindow) + i := 0 + for i < len(state.PeakSamples) && state.PeakSamples[i].Timestamp.Before(cutoff) { + i++ + } + if i > 0 { + state.PeakSamples = state.PeakSamples[i:] + } +} + +func (ea *ExperimentAdmission) calculatePeak(state *ExperimentState) int { + peak := 0 + for _, s := range state.PeakSamples { + if s.Count > peak { + peak = s.Count + } + } + return peak +} + +// CheckRequiredLabels checks whether the given labels contain all required labels. +// Returns missing label names. Empty return means all labels are present. +func (ea *ExperimentAdmission) CheckRequiredLabels(labels map[string]string) []string { + var missing []string + for _, required := range ea.requiredLabels { + if labels[required] == "" { + missing = append(missing, required) + } + } + return missing +} + +// ShouldAdmit is a convenience wrapper returning (bool, string). +// Use ShouldAdmitWithResult for full tier information. +func (ea *ExperimentAdmission) ShouldAdmit(experimentID string) (bool, string) { + result := ea.ShouldAdmitWithResult(experimentID) + return result.Allowed, result.Reason +} + +// ShouldAdmitWithResult decides whether a CreateEnvInstance request should be admitted. +// Returns an AdmissionResult with tier classification: +// - p0_known: existing experiment, always admitted +// - p1_new: new experiment, subject to watermark + capacity check +// - p2_unlabeled: handled by middleware (CheckRequiredLabels) +func (ea *ExperimentAdmission) ShouldAdmitWithResult(experimentID string) AdmissionResult { + if experimentID == "" { + experimentID = "default" + } + + ea.mu.RLock() + defer ea.mu.RUnlock() + + // Fail-open: no cluster data yet (startup, scheduler unreachable) + if !ea.hasClusterData { + return AdmissionResult{Allowed: true, Tier: "p1_new"} + } + + // P0: Known experiment — always admit + if _, exists := ea.experiments[experimentID]; exists { + return AdmissionResult{Allowed: true, Tier: "p0_known"} + } + + // P1: New experiment — dual gate: watermark + reserved capacity + + // Gate 1: Cluster utilization watermark + if ea.clusterTotal > 0 { + utilization := float64(ea.clusterUsed) / float64(ea.clusterTotal) + if utilization >= ea.watermark { + return AdmissionResult{ + Allowed: false, + Tier: "p1_new", + Reason: fmt.Sprintf( + "Experiment admission denied: cluster utilization %.1f%% exceeds watermark %.1f%% for new experiment %q "+ + "(total=%d, used=%d milli-CPU)", + utilization*100, ea.watermark*100, experimentID, ea.clusterTotal, ea.clusterUsed, + ), + } + } + } + + // Gate 2: Reserved capacity check + reserved := ea.reservedCapacity() + available := ea.clusterTotal - reserved + if available > ea.perInstanceCPU { + return AdmissionResult{Allowed: true, Tier: "p1_new"} + } + + return AdmissionResult{ + Allowed: false, + Tier: "p1_new", + Reason: fmt.Sprintf( + "Experiment admission denied: insufficient cluster capacity for new experiment %q "+ + "(total=%d, reserved=%d, available=%d, required=%d milli-CPU)", + experimentID, ea.clusterTotal, reserved, available, ea.perInstanceCPU, + ), + } +} + +// reservedCapacity returns the total CPU reserved by all active experiments. +// Must be called with ea.mu held (at least RLock). +func (ea *ExperimentAdmission) reservedCapacity() int64 { + var total int64 + for _, state := range ea.experiments { + total += int64(state.PeakCount) * ea.perInstanceCPU + } + return total +} + +// GetMetrics returns current admission state for debugging/observability. +func (ea *ExperimentAdmission) GetMetrics() map[string]interface{} { + ea.mu.RLock() + defer ea.mu.RUnlock() + + experiments := make(map[string]interface{}) + for exp, state := range ea.experiments { + experiments[exp] = map[string]interface{}{ + "current_count": state.CurrentCount, + "peak_count": state.PeakCount, + "first_seen": state.FirstSeen, + "samples": len(state.PeakSamples), + } + } + + return map[string]interface{}{ + "cluster_total": ea.clusterTotal, + "cluster_used": ea.clusterUsed, + "has_cluster_data": ea.hasClusterData, + "per_instance_cpu": ea.perInstanceCPU, + "reserved_capacity": ea.reservedCapacity(), + "watermark": ea.watermark, + "required_labels": ea.requiredLabels, + "experiment_count": len(ea.experiments), + "experiments": experiments, + } +} diff --git a/api-service/service/experiment_admission_test.go b/api-service/service/experiment_admission_test.go new file mode 100644 index 0000000..f037257 --- /dev/null +++ b/api-service/service/experiment_admission_test.go @@ -0,0 +1,444 @@ +/* +Copyright 2025. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package service + +import ( + "api-service/models" + "encoding/json" + "fmt" + "net/http" + "net/http/httptest" + "sync" + "testing" + "time" +) + +func newTestAdmission(totalCPU, perInstanceCPU int64, peakWindow time.Duration) *ExperimentAdmission { + ea := NewExperimentAdmission("http://localhost:14457", perInstanceCPU, peakWindow, 0.7, []string{"experiment"}) + ea.mu.Lock() + ea.clusterTotal = totalCPU + ea.hasClusterData = true + ea.mu.Unlock() + return ea +} + +func makeInstances(experiments map[string]int) []*models.EnvInstance { + var instances []*models.EnvInstance + id := 0 + for exp, count := range experiments { + for i := 0; i < count; i++ { + id++ + labels := map[string]string{"experiment": exp} + instances = append(instances, &models.EnvInstance{ + ID: fmt.Sprintf("inst-%d", id), + Status: "Running", + Labels: labels, + }) + } + } + return instances +} + +// Need fmt for makeInstances +func init() {} + +func TestKnownExperimentAlwaysAdmitted(t *testing.T) { + ea := newTestAdmission(10000, 2000, 15*time.Minute) + + // Register experiment "exp-1" with instances + instances := makeInstances(map[string]int{"exp-1": 5}) + ea.UpdateExperimentCounts(instances) + + // Known experiment should always be admitted, even if cluster is "full" + ea.mu.Lock() + ea.clusterTotal = 1 // artificially tiny cluster + ea.mu.Unlock() + + allowed, reason := ea.ShouldAdmit("exp-1") + if !allowed { + t.Errorf("Expected known experiment to be admitted, got rejected: %s", reason) + } +} + +func TestNewExperimentAdmittedWhenCapacityAvailable(t *testing.T) { + ea := newTestAdmission(20000, 2000, 15*time.Minute) + + // Register experiment "exp-1" with 3 instances (peak=3, reserved=6000) + instances := makeInstances(map[string]int{"exp-1": 3}) + ea.UpdateExperimentCounts(instances) + + // New experiment "exp-2": available = 20000 - 6000 = 14000 > 2000 + allowed, reason := ea.ShouldAdmit("exp-2") + if !allowed { + t.Errorf("Expected new experiment to be admitted, got rejected: %s", reason) + } +} + +func TestNewExperimentRejectedWhenCapacityInsufficient(t *testing.T) { + ea := newTestAdmission(10000, 2000, 15*time.Minute) + + // Register experiment "exp-1" with 5 instances (peak=5, reserved=10000) + instances := makeInstances(map[string]int{"exp-1": 5}) + ea.UpdateExperimentCounts(instances) + + // New experiment "exp-2": available = 10000 - 10000 = 0, not > 2000 + allowed, _ := ea.ShouldAdmit("exp-2") + if allowed { + t.Error("Expected new experiment to be rejected when capacity is insufficient") + } +} + +func TestMultipleExperimentsReserveCapacity(t *testing.T) { + ea := newTestAdmission(20000, 2000, 15*time.Minute) + + // Two experiments: exp-1=3, exp-2=4, reserved = (3+4)*2000 = 14000 + instances := makeInstances(map[string]int{"exp-1": 3, "exp-2": 4}) + ea.UpdateExperimentCounts(instances) + + // New experiment "exp-3": available = 20000 - 14000 = 6000 > 2000 → admit + allowed, _ := ea.ShouldAdmit("exp-3") + if !allowed { + t.Error("Expected exp-3 to be admitted with 6000 available") + } + + // Now add more to exp-2 to fill cluster: exp-1=3, exp-2=7, reserved = (3+7)*2000 = 20000 + instances = makeInstances(map[string]int{"exp-1": 3, "exp-2": 7}) + ea.UpdateExperimentCounts(instances) + + // New experiment "exp-3": available = 20000 - 20000 = 0, not > 2000 → reject + allowed, _ = ea.ShouldAdmit("exp-3") + if allowed { + t.Error("Expected exp-3 to be rejected when cluster is fully reserved") + } +} + +func TestPeakSlidingWindowEviction(t *testing.T) { + peakWindow := 100 * time.Millisecond + ea := newTestAdmission(10000, 2000, peakWindow) + + // Record a high count + instances := makeInstances(map[string]int{"exp-1": 5}) + ea.UpdateExperimentCounts(instances) + + ea.mu.RLock() + peak := ea.experiments["exp-1"].PeakCount + ea.mu.RUnlock() + if peak != 5 { + t.Errorf("Expected peak=5, got %d", peak) + } + + // Wait for the window to expire + time.Sleep(peakWindow + 50*time.Millisecond) + + // Update with lower count — old peak sample should be evicted + instances = makeInstances(map[string]int{"exp-1": 2}) + ea.UpdateExperimentCounts(instances) + + ea.mu.RLock() + peak = ea.experiments["exp-1"].PeakCount + ea.mu.RUnlock() + if peak != 2 { + t.Errorf("Expected peak=2 after window eviction, got %d", peak) + } +} + +func TestExperimentRemovedAfterWindowExpires(t *testing.T) { + peakWindow := 100 * time.Millisecond + ea := newTestAdmission(10000, 2000, peakWindow) + + instances := makeInstances(map[string]int{"exp-1": 3}) + ea.UpdateExperimentCounts(instances) + + // Wait for window to expire + time.Sleep(peakWindow + 50*time.Millisecond) + + // Update with no instances for exp-1 + ea.UpdateExperimentCounts([]*models.EnvInstance{}) + + ea.mu.RLock() + _, exists := ea.experiments["exp-1"] + ea.mu.RUnlock() + if exists { + t.Error("Expected exp-1 to be removed after all samples expired") + } +} + +func TestFailOpenWhenNoClusterData(t *testing.T) { + ea := NewExperimentAdmission("http://localhost:14457", 2000, 15*time.Minute, 0.7, []string{"experiment"}) + // hasClusterData is false by default + + allowed, _ := ea.ShouldAdmit("new-experiment") + if !allowed { + t.Error("Expected fail-open when no cluster data available") + } +} + +func TestEmptyExperimentIDTreatedAsDefault(t *testing.T) { + ea := newTestAdmission(10000, 2000, 15*time.Minute) + + allowed, _ := ea.ShouldAdmit("") + if !allowed { + t.Error("Expected empty experiment ID to be admitted (treated as default)") + } +} + +func TestTerminatedInstancesNotCounted(t *testing.T) { + ea := newTestAdmission(10000, 2000, 15*time.Minute) + + instances := []*models.EnvInstance{ + {ID: "inst-1", Status: "Running", Labels: map[string]string{"experiment": "exp-1"}}, + {ID: "inst-2", Status: "Terminated", Labels: map[string]string{"experiment": "exp-1"}}, + {ID: "inst-3", Status: "Failed", Labels: map[string]string{"experiment": "exp-1"}}, + {ID: "inst-4", Status: "Running", Labels: map[string]string{"experiment": "exp-1"}}, + } + ea.UpdateExperimentCounts(instances) + + ea.mu.RLock() + count := ea.experiments["exp-1"].CurrentCount + peak := ea.experiments["exp-1"].PeakCount + ea.mu.RUnlock() + + if count != 2 { + t.Errorf("Expected current count=2 (excluding terminated/failed), got %d", count) + } + if peak != 2 { + t.Errorf("Expected peak=2, got %d", peak) + } +} + +func TestConcurrentAccess(t *testing.T) { + ea := newTestAdmission(100000, 2000, 15*time.Minute) + + instances := makeInstances(map[string]int{"exp-1": 10, "exp-2": 5}) + ea.UpdateExperimentCounts(instances) + + var wg sync.WaitGroup + for i := 0; i < 100; i++ { + wg.Add(1) + go func(i int) { + defer wg.Done() + ea.ShouldAdmit("exp-1") + ea.ShouldAdmit("new-exp") + ea.GetMetrics() + }(i) + } + + // Concurrent updates + for i := 0; i < 10; i++ { + wg.Add(1) + go func() { + defer wg.Done() + ea.UpdateExperimentCounts(instances) + }() + } + + wg.Wait() +} + +func TestClusterResourcePoller(t *testing.T) { + // Create a mock faas-api-service HTTP server + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/hapis/faas.hcs.io/v1/clusterinfo" { + http.NotFound(w, r) + return + } + resp := clusterInfoResponse{ + Success: true, + Data: clusterInfoData{ + TotalCPU: 50000, + UsedCPU: 20000, + FreeCPU: 30000, + TotalMemory: 128000, + UsedMemory: 64000, + FreeMemory: 64000, + HealthyPartitions: 2, + TotalPartitions: 2, + }, + } + _ = json.NewEncoder(w).Encode(resp) + })) + defer server.Close() + + ea := NewExperimentAdmission(server.URL, 2000, 15*time.Minute, 0.7, []string{"experiment"}) + ea.pollClusterResource() + + ea.mu.RLock() + defer ea.mu.RUnlock() + + if !ea.hasClusterData { + t.Error("Expected hasClusterData=true after successful poll") + } + if ea.clusterTotal != 50000 { + t.Errorf("Expected clusterTotal=50000, got %d", ea.clusterTotal) + } + if ea.clusterUsed != 20000 { + t.Errorf("Expected clusterUsed=20000, got %d", ea.clusterUsed) + } +} + +func TestClusterResourcePollerFailureGraceful(t *testing.T) { + // Create a server that returns errors + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusInternalServerError) + })) + defer server.Close() + + ea := NewExperimentAdmission(server.URL, 2000, 15*time.Minute, 0.7, []string{"experiment"}) + + // Set initial data + ea.mu.Lock() + ea.clusterTotal = 30000 + ea.hasClusterData = true + ea.mu.Unlock() + + // Poll should fail but keep existing data + ea.pollClusterResource() + + ea.mu.RLock() + defer ea.mu.RUnlock() + + if ea.clusterTotal != 30000 { + t.Errorf("Expected clusterTotal to remain 30000 after poll failure, got %d", ea.clusterTotal) + } +} + +func TestGetMetrics(t *testing.T) { + ea := newTestAdmission(20000, 2000, 15*time.Minute) + + instances := makeInstances(map[string]int{"exp-1": 3, "exp-2": 5}) + ea.UpdateExperimentCounts(instances) + + m := ea.GetMetrics() + + if m["cluster_total"].(int64) != 20000 { + t.Errorf("Expected cluster_total=20000, got %v", m["cluster_total"]) + } + if m["experiment_count"].(int) != 2 { + t.Errorf("Expected experiment_count=2, got %v", m["experiment_count"]) + } + if m["reserved_capacity"].(int64) != (3+5)*2000 { + t.Errorf("Expected reserved_capacity=16000, got %v", m["reserved_capacity"]) + } +} + +func TestWatermarkRejectsNewExperiment(t *testing.T) { + ea := newTestAdmission(100000, 2000, 15*time.Minute) + // Set watermark to 0.5 + ea.mu.Lock() + ea.watermark = 0.5 + ea.clusterUsed = 60000 // 60% utilization, above 50% watermark + ea.mu.Unlock() + + // Register exp-1 so it's known + instances := makeInstances(map[string]int{"exp-1": 3}) + ea.UpdateExperimentCounts(instances) + + // P0: known experiment should still be admitted + result := ea.ShouldAdmitWithResult("exp-1") + if !result.Allowed || result.Tier != "p0_known" { + t.Errorf("Expected P0 known experiment to be admitted, got allowed=%v tier=%s", result.Allowed, result.Tier) + } + + // P1: new experiment should be rejected due to watermark + result = ea.ShouldAdmitWithResult("exp-new") + if result.Allowed { + t.Error("Expected new experiment to be rejected when utilization exceeds watermark") + } + if result.Tier != "p1_new" { + t.Errorf("Expected tier=p1_new, got %s", result.Tier) + } +} + +func TestWatermarkPassesWhenUtilizationLow(t *testing.T) { + ea := newTestAdmission(100000, 2000, 15*time.Minute) + ea.mu.Lock() + ea.watermark = 0.7 + ea.clusterUsed = 50000 // 50% utilization, below 70% watermark + ea.mu.Unlock() + + result := ea.ShouldAdmitWithResult("new-exp") + if !result.Allowed { + t.Errorf("Expected new experiment to be admitted when utilization below watermark, reason: %s", result.Reason) + } + if result.Tier != "p1_new" { + t.Errorf("Expected tier=p1_new, got %s", result.Tier) + } +} + +func TestCheckRequiredLabels(t *testing.T) { + ea := NewExperimentAdmission("http://localhost", 2000, 15*time.Minute, 0.7, []string{"experiment", "team"}) + + // All present + missing := ea.CheckRequiredLabels(map[string]string{"experiment": "exp-1", "team": "ml"}) + if len(missing) != 0 { + t.Errorf("Expected no missing labels, got %v", missing) + } + + // One missing + missing = ea.CheckRequiredLabels(map[string]string{"experiment": "exp-1"}) + if len(missing) != 1 || missing[0] != "team" { + t.Errorf("Expected missing=[team], got %v", missing) + } + + // All missing + missing = ea.CheckRequiredLabels(map[string]string{}) + if len(missing) != 2 { + t.Errorf("Expected 2 missing labels, got %d", len(missing)) + } + + // Nil labels + missing = ea.CheckRequiredLabels(nil) + if len(missing) != 2 { + t.Errorf("Expected 2 missing labels for nil map, got %d", len(missing)) + } +} + +func TestTierClassification(t *testing.T) { + ea := newTestAdmission(100000, 2000, 15*time.Minute) + ea.mu.Lock() + ea.watermark = 0.7 + ea.clusterUsed = 10000 + ea.mu.Unlock() + + instances := makeInstances(map[string]int{"exp-1": 5}) + ea.UpdateExperimentCounts(instances) + + // P0: known + result := ea.ShouldAdmitWithResult("exp-1") + if result.Tier != "p0_known" { + t.Errorf("Expected tier=p0_known for known experiment, got %s", result.Tier) + } + + // P1: new + result = ea.ShouldAdmitWithResult("exp-new") + if result.Tier != "p1_new" { + t.Errorf("Expected tier=p1_new for new experiment, got %s", result.Tier) + } +} + +func TestDefaultWatermarkAndLabels(t *testing.T) { + // Invalid watermark should default to 0.7 + ea := NewExperimentAdmission("http://localhost", 2000, 15*time.Minute, 0, nil) + ea.mu.RLock() + if ea.watermark != 0.7 { + t.Errorf("Expected default watermark=0.7, got %f", ea.watermark) + } + if len(ea.requiredLabels) != 1 || ea.requiredLabels[0] != "experiment" { + t.Errorf("Expected default requiredLabels=[experiment], got %v", ea.requiredLabels) + } + ea.mu.RUnlock() +} diff --git a/api-service/service/faas_client.go b/api-service/service/faas_client.go index d0acc05..27749a4 100644 --- a/api-service/service/faas_client.go +++ b/api-service/service/faas_client.go @@ -407,6 +407,25 @@ func (c *FaaSClient) DeleteInstance(name string) error { return nil } +// DeleteInstanceRecord deletes only the metadata record of an instance +// without contacting the runtime node gateway. This is used as a fallback +// when the node is unreachable. +func (c *FaaSClient) DeleteInstanceRecord(name string) error { + uri := fmt.Sprintf("/hapis/faas.hcs.io/v1/instances/%s/record", name) + + resp := &faas_model.APIResponse{} + err := c.client.Delete(uri).Do().Into(resp) + if err != nil { + return fmt.Errorf("failed to delete instance record %s: %w", name, err) + } + + if !resp.Success { + return fmt.Errorf("failed to delete instance record %s: %s", name, resp.ErrorMessage) + } + + return nil +} + // --- Utility functions --- // convertStatus converts model.InstanceStatus to models.EnvInstanceStatus.String()