Skip to content

Commit 9f6b171

Browse files
authored
Auto-standby (#183)
* Add Linux auto-standby controller and e2e coverage * Expose auto-standby policy in Stainless config * Address auto-standby review feedback * Default-skip compression standby integration test * Clarify auto-standby update metadata handling * Wire auto-standby controller into API app * Make auto-standby event-driven and observable * Add periodic auto-standby snapshot sync * Fix auto-standby review follow-ups * Format auto-standby files for CI * Add scope for auto-standby status route * Consolidate instance lifecycle subscriptions * Add config for lifecycle event buffer size * Fix Linux conntrack event test encoding * Clear stale auto-standby runtime on ineligible updates * Preserve auto-standby runtime on lifecycle events * Clone snapshot policy in metadata copies * Harden auto-standby startup and reconnects * Add auto-standby timer and lifecycle tests
1 parent 76a8772 commit 9f6b171

47 files changed

Lines changed: 4793 additions & 279 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

cmd/api/api/api.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package api
22

33
import (
44
"github.com/kernel/hypeman/cmd/api/config"
5+
"github.com/kernel/hypeman/lib/autostandby"
56
"github.com/kernel/hypeman/lib/builds"
67
"github.com/kernel/hypeman/lib/devices"
78
"github.com/kernel/hypeman/lib/guestmemory"
@@ -27,6 +28,7 @@ type ApiService struct {
2728
BuildManager builds.Manager
2829
ResourceManager *resources.Manager
2930
GuestMemoryController guestmemory.Controller
31+
AutoStandbyController *autostandby.Controller
3032
VMMetricsManager *vm_metrics.Manager
3133
}
3234

@@ -44,6 +46,7 @@ func New(
4446
buildManager builds.Manager,
4547
resourceManager *resources.Manager,
4648
guestMemoryController guestmemory.Controller,
49+
autoStandbyController *autostandby.Controller,
4750
vmMetricsManager *vm_metrics.Manager,
4851
) *ApiService {
4952
return &ApiService{
@@ -57,6 +60,7 @@ func New(
5760
BuildManager: buildManager,
5861
ResourceManager: resourceManager,
5962
GuestMemoryController: guestMemoryController,
63+
AutoStandbyController: autoStandbyController,
6064
VMMetricsManager: vmMetricsManager,
6165
}
6266
}

cmd/api/api/auto_standby.go

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
package api
2+
3+
import (
4+
"fmt"
5+
6+
"github.com/kernel/hypeman/lib/autostandby"
7+
"github.com/kernel/hypeman/lib/oapi"
8+
"github.com/samber/lo"
9+
)
10+
11+
func toDomainAutoStandbyPolicy(policy *oapi.AutoStandbyPolicy) (*autostandby.Policy, error) {
12+
if policy == nil {
13+
return nil, nil
14+
}
15+
16+
out := &autostandby.Policy{}
17+
if policy.Enabled != nil {
18+
out.Enabled = *policy.Enabled
19+
}
20+
if policy.IdleTimeout != nil {
21+
out.IdleTimeout = *policy.IdleTimeout
22+
}
23+
if policy.IgnoreSourceCidrs != nil {
24+
out.IgnoreSourceCIDRs = append([]string(nil), (*policy.IgnoreSourceCidrs)...)
25+
}
26+
if policy.IgnoreDestinationPorts != nil {
27+
out.IgnoreDestinationPorts = make([]uint16, 0, len(*policy.IgnoreDestinationPorts))
28+
for _, port := range *policy.IgnoreDestinationPorts {
29+
if port < 1 || port > 65535 {
30+
return nil, fmt.Errorf("auto_standby.ignore_destination_ports must be between 1 and 65535")
31+
}
32+
out.IgnoreDestinationPorts = append(out.IgnoreDestinationPorts, uint16(port))
33+
}
34+
}
35+
36+
return out, nil
37+
}
38+
39+
func toOAPIAutoStandbyPolicy(policy *autostandby.Policy) *oapi.AutoStandbyPolicy {
40+
if policy == nil {
41+
return nil
42+
}
43+
44+
out := &oapi.AutoStandbyPolicy{
45+
Enabled: lo.ToPtr(policy.Enabled),
46+
}
47+
if policy.IdleTimeout != "" {
48+
out.IdleTimeout = lo.ToPtr(policy.IdleTimeout)
49+
}
50+
if len(policy.IgnoreSourceCIDRs) > 0 {
51+
out.IgnoreSourceCidrs = lo.ToPtr(append([]string(nil), policy.IgnoreSourceCIDRs...))
52+
}
53+
if len(policy.IgnoreDestinationPorts) > 0 {
54+
ports := make([]int, 0, len(policy.IgnoreDestinationPorts))
55+
for _, port := range policy.IgnoreDestinationPorts {
56+
ports = append(ports, int(port))
57+
}
58+
out.IgnoreDestinationPorts = &ports
59+
}
60+
61+
return out
62+
}

cmd/api/api/auto_standby_status.go

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
package api
2+
3+
import (
	"context"
	"errors"

	"github.com/kernel/hypeman/lib/autostandby"
	"github.com/kernel/hypeman/lib/instances"
	"github.com/kernel/hypeman/lib/logger"
	"github.com/kernel/hypeman/lib/oapi"
	"github.com/samber/lo"
)
12+
13+
func (s *ApiService) GetAutoStandbyStatus(ctx context.Context, request oapi.GetAutoStandbyStatusRequestObject) (oapi.GetAutoStandbyStatusResponseObject, error) {
14+
log := logger.FromContext(ctx)
15+
16+
inst, err := s.InstanceManager.GetInstance(ctx, request.Id)
17+
if err != nil {
18+
if err == instances.ErrNotFound || err == instances.ErrAmbiguousName {
19+
return oapi.GetAutoStandbyStatus404JSONResponse{
20+
Code: "not_found",
21+
Message: "instance not found",
22+
}, nil
23+
}
24+
log.ErrorContext(ctx, "failed to resolve instance for auto-standby status", "instance_id", request.Id, "error", err)
25+
return oapi.GetAutoStandbyStatus500JSONResponse{
26+
Code: "internal_error",
27+
Message: "failed to load instance",
28+
}, nil
29+
}
30+
31+
snapshot := autostandby.StatusSnapshot{
32+
Supported: false,
33+
Configured: inst.AutoStandby != nil,
34+
Enabled: inst.AutoStandby != nil && inst.AutoStandby.Enabled,
35+
TrackingMode: "conntrack_events_v4_tcp",
36+
Status: autostandby.StatusUnsupported,
37+
Reason: autostandby.ReasonUnsupportedPlatform,
38+
}
39+
if s.AutoStandbyController != nil {
40+
snapshot = s.AutoStandbyController.Describe(instanceToAutoStandby(*inst))
41+
}
42+
43+
return oapi.GetAutoStandbyStatus200JSONResponse(toOAPIAutoStandbyStatus(snapshot)), nil
44+
}
45+
46+
func instanceToAutoStandby(inst instances.Instance) autostandby.Instance {
47+
return autostandby.Instance{
48+
ID: inst.Id,
49+
Name: inst.Name,
50+
State: string(inst.State),
51+
NetworkEnabled: inst.NetworkEnabled,
52+
IP: inst.IP,
53+
HasVGPU: inst.GPUProfile != "" || inst.GPUMdevUUID != "",
54+
AutoStandby: inst.AutoStandby,
55+
}
56+
}
57+
58+
func toOAPIAutoStandbyStatus(status autostandby.StatusSnapshot) oapi.AutoStandbyStatus {
59+
out := oapi.AutoStandbyStatus{
60+
ActiveInboundConnections: status.ActiveInboundCount,
61+
Configured: status.Configured,
62+
Eligible: status.Eligible,
63+
Enabled: status.Enabled,
64+
Reason: oapi.AutoStandbyStatusReason(status.Reason),
65+
Status: oapi.AutoStandbyStatusStatus(status.Status),
66+
Supported: status.Supported,
67+
TrackingMode: status.TrackingMode,
68+
}
69+
if status.IdleTimeout != "" {
70+
out.IdleTimeout = lo.ToPtr(status.IdleTimeout)
71+
}
72+
out.IdleSince = status.IdleSince
73+
out.LastInboundActivityAt = status.LastInboundActivityAt
74+
out.NextStandbyAt = status.NextStandbyAt
75+
if status.CountdownRemaining != nil {
76+
out.CountdownRemaining = lo.ToPtr(status.CountdownRemaining.String())
77+
}
78+
return out
79+
}
Lines changed: 170 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,170 @@
1+
package api
2+
3+
import (
4+
"context"
5+
"net/netip"
6+
"testing"
7+
"time"
8+
9+
"github.com/kernel/hypeman/lib/autostandby"
10+
"github.com/kernel/hypeman/lib/instances"
11+
"github.com/kernel/hypeman/lib/oapi"
12+
"github.com/stretchr/testify/assert"
13+
"github.com/stretchr/testify/require"
14+
)
15+
16+
type captureStatusManager struct {
17+
instances.Manager
18+
instance *instances.Instance
19+
err error
20+
}
21+
22+
func (m *captureStatusManager) GetInstance(context.Context, string) (*instances.Instance, error) {
23+
if m.err != nil {
24+
return nil, m.err
25+
}
26+
return m.instance, nil
27+
}
28+
29+
type statusStore struct {
30+
instances []autostandby.Instance
31+
runtime map[string]*autostandby.Runtime
32+
events chan autostandby.InstanceEvent
33+
}
34+
35+
func (s *statusStore) ListInstances(context.Context) ([]autostandby.Instance, error) {
36+
return append([]autostandby.Instance(nil), s.instances...), nil
37+
}
38+
39+
func (s *statusStore) StandbyInstance(context.Context, string) error { return nil }
40+
41+
func (s *statusStore) SetRuntime(_ context.Context, id string, runtime *autostandby.Runtime) error {
42+
if s.runtime == nil {
43+
s.runtime = make(map[string]*autostandby.Runtime)
44+
}
45+
s.runtime[id] = runtime
46+
return nil
47+
}
48+
49+
func (s *statusStore) SubscribeInstanceEvents() (<-chan autostandby.InstanceEvent, func(), error) {
50+
if s.events == nil {
51+
s.events = make(chan autostandby.InstanceEvent)
52+
}
53+
return s.events, func() {}, nil
54+
}
55+
56+
type statusConnectionSource struct {
57+
connections []autostandby.Connection
58+
}
59+
60+
func (s *statusConnectionSource) ListConnections(context.Context) ([]autostandby.Connection, error) {
61+
return append([]autostandby.Connection(nil), s.connections...), nil
62+
}
63+
64+
func (s *statusConnectionSource) OpenStream(context.Context) (autostandby.ConnectionStream, error) {
65+
return &statusConnectionStream{
66+
events: make(chan autostandby.ConnectionEvent),
67+
errs: make(chan error),
68+
}, nil
69+
}
70+
71+
type statusConnectionStream struct {
72+
events chan autostandby.ConnectionEvent
73+
errs chan error
74+
}
75+
76+
func (s *statusConnectionStream) Events() <-chan autostandby.ConnectionEvent { return s.events }
77+
78+
func (s *statusConnectionStream) Errors() <-chan error { return s.errs }
79+
80+
func (s *statusConnectionStream) Close() error { return nil }
81+
82+
func TestGetAutoStandbyStatusUnsupportedWithoutController(t *testing.T) {
83+
t.Parallel()
84+
85+
base := newTestService(t)
86+
base.InstanceManager = &captureStatusManager{
87+
Manager: base.InstanceManager,
88+
instance: &instances.Instance{
89+
StoredMetadata: instances.StoredMetadata{
90+
Id: "inst-1",
91+
Name: "inst-1",
92+
NetworkEnabled: true,
93+
IP: "192.168.100.10",
94+
AutoStandby: &autostandby.Policy{Enabled: true, IdleTimeout: "5m"},
95+
},
96+
State: instances.StateRunning,
97+
},
98+
}
99+
100+
resp, err := base.GetAutoStandbyStatus(ctx(), oapi.GetAutoStandbyStatusRequestObject{Id: "inst-1"})
101+
require.NoError(t, err)
102+
103+
statusResp, ok := resp.(oapi.GetAutoStandbyStatus200JSONResponse)
104+
require.True(t, ok)
105+
assert.False(t, statusResp.Supported)
106+
assert.Equal(t, oapi.AutoStandbyStatusStatusUnsupported, statusResp.Status)
107+
assert.Equal(t, oapi.AutoStandbyStatusReasonUnsupportedPlatform, statusResp.Reason)
108+
}
109+
110+
func TestGetAutoStandbyStatusActive(t *testing.T) {
111+
t.Parallel()
112+
113+
inst := &instances.Instance{
114+
StoredMetadata: instances.StoredMetadata{
115+
Id: "inst-2",
116+
Name: "inst-2",
117+
NetworkEnabled: true,
118+
IP: "192.168.100.20",
119+
AutoStandby: &autostandby.Policy{Enabled: true, IdleTimeout: "5m"},
120+
},
121+
State: instances.StateRunning,
122+
}
123+
124+
now := time.Date(2026, 4, 6, 12, 0, 0, 0, time.UTC)
125+
store := &statusStore{
126+
instances: []autostandby.Instance{{
127+
ID: "inst-2",
128+
Name: "inst-2",
129+
State: autostandby.StateRunning,
130+
NetworkEnabled: true,
131+
IP: "192.168.100.20",
132+
AutoStandby: &autostandby.Policy{Enabled: true, IdleTimeout: "5m"},
133+
}},
134+
}
135+
source := &statusConnectionSource{connections: []autostandby.Connection{{
136+
OriginalSourceIP: mustStatusAddr("1.2.3.4"),
137+
OriginalSourcePort: 51234,
138+
OriginalDestinationIP: mustStatusAddr("192.168.100.20"),
139+
OriginalDestinationPort: 8080,
140+
TCPState: autostandby.TCPStateEstablished,
141+
}}}
142+
controller := autostandby.NewController(store, source, autostandby.ControllerOptions{
143+
Now: func() time.Time { return now },
144+
})
145+
require.NoError(t, controller.Run(withCanceledContext(t)))
146+
147+
base := newTestService(t)
148+
base.InstanceManager = &captureStatusManager{Manager: base.InstanceManager, instance: inst}
149+
base.AutoStandbyController = controller
150+
151+
resp, err := base.GetAutoStandbyStatus(ctx(), oapi.GetAutoStandbyStatusRequestObject{Id: "inst-2"})
152+
require.NoError(t, err)
153+
154+
statusResp, ok := resp.(oapi.GetAutoStandbyStatus200JSONResponse)
155+
require.True(t, ok)
156+
assert.True(t, statusResp.Supported)
157+
assert.Equal(t, oapi.AutoStandbyStatusStatusActive, statusResp.Status)
158+
assert.Equal(t, 1, statusResp.ActiveInboundConnections)
159+
}
160+
161+
// withCanceledContext returns a context derived from context.Background that
// has already been canceled by the time it is returned.
func withCanceledContext(t *testing.T) context.Context {
	t.Helper()

	canceled, stop := context.WithCancel(context.Background())
	stop()

	return canceled
}
167+
168+
// mustStatusAddr parses raw as an IP address for use in test fixtures and
// panics with the parse error when raw is not a valid address.
func mustStatusAddr(raw string) netip.Addr {
	addr, err := netip.ParseAddr(raw)
	if err != nil {
		panic(err)
	}
	return addr
}

cmd/api/api/instances.go

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -278,6 +278,13 @@ func (s *ApiService) CreateInstance(ctx context.Context, request oapi.CreateInst
278278
if request.Body.Cmd != nil {
279279
cmd = *request.Body.Cmd
280280
}
281+
autoStandby, err := toDomainAutoStandbyPolicy(request.Body.AutoStandby)
282+
if err != nil {
283+
return oapi.CreateInstance400JSONResponse{
284+
Code: "invalid_auto_standby",
285+
Message: err.Error(),
286+
}, nil
287+
}
281288

282289
domainReq := instances.CreateInstanceRequest{
283290
Name: request.Body.Name,
@@ -302,6 +309,7 @@ func (s *ApiService) CreateInstance(ctx context.Context, request oapi.CreateInst
302309
Cmd: cmd,
303310
SkipKernelHeaders: request.Body.SkipKernelHeaders != nil && *request.Body.SkipKernelHeaders,
304311
SkipGuestAgent: request.Body.SkipGuestAgent != nil && *request.Body.SkipGuestAgent,
312+
AutoStandby: autoStandby,
305313
}
306314
if request.Body.SnapshotPolicy != nil {
307315
snapshotPolicy, err := toInstanceSnapshotPolicy(*request.Body.SnapshotPolicy)
@@ -924,9 +932,17 @@ func (s *ApiService) UpdateInstance(ctx context.Context, request oapi.UpdateInst
924932
if request.Body.Env != nil {
925933
env = *request.Body.Env
926934
}
935+
autoStandby, err := toDomainAutoStandbyPolicy(request.Body.AutoStandby)
936+
if err != nil {
937+
return oapi.UpdateInstance400JSONResponse{
938+
Code: "invalid_auto_standby",
939+
Message: err.Error(),
940+
}, nil
941+
}
927942

928943
result, err := s.InstanceManager.UpdateInstance(ctx, inst.Id, instances.UpdateInstanceRequest{
929-
Env: env,
944+
Env: env,
945+
AutoStandby: autoStandby,
930946
})
931947
if err != nil {
932948
switch {
@@ -1057,6 +1073,7 @@ func instanceToOAPI(inst instances.Instance) oapi.Instance {
10571073
oapiPolicy := toOAPISnapshotPolicy(*inst.SnapshotPolicy)
10581074
oapiInst.SnapshotPolicy = &oapiPolicy
10591075
}
1076+
oapiInst.AutoStandby = toOAPIAutoStandbyPolicy(inst.AutoStandby)
10601077

10611078
// Convert volume attachments
10621079
if len(inst.Volumes) > 0 {

0 commit comments

Comments
 (0)