diff --git a/deployments/helm/fleet-intelligence-agent/README.md b/deployments/helm/fleet-intelligence-agent/README.md index 1c28a645..2a5a32c6 100644 --- a/deployments/helm/fleet-intelligence-agent/README.md +++ b/deployments/helm/fleet-intelligence-agent/README.md @@ -51,6 +51,8 @@ Common values (defaults from `values.yaml`): | `enroll.tokenSecretName` | `""` | Secret name for enrollment token. | | `enroll.tokenSecretKey` | `token` | Secret key for enrollment token. | | `enroll.tokenValue` | `""` | Inline token value (optional). | +| `enroll.nodeGroup` | `(not set)` | Optional `--node-group` enroll flag. Omit key to omit flag, set `""` to clear stored value. | +| `enroll.computeZone` | `(not set)` | Optional `--compute-zone` enroll flag. Omit key to omit flag, set `""` to clear stored value. | | `enroll.securityContext.runAsUser` | `0` | Run enrollment init as root. | | `ports.http` | `15133` | HTTP port. | | `resources.requests.cpu` | `100m` | CPU request. | @@ -70,6 +72,7 @@ Common values (defaults from `values.yaml`): `enroll.enabled` and `enroll.unenroll` are mutually exclusive; do not set both to `true`. Set `enroll.force=true` to append `--force` to `fleetint enroll`. +Set `enroll.nodeGroup` / `enroll.computeZone` to pass optional enrollment metadata via the init container command. See `docs/install-helm.md` for the enrollment flow and secret creation steps. 
diff --git a/deployments/helm/fleet-intelligence-agent/templates/daemonset.yaml b/deployments/helm/fleet-intelligence-agent/templates/daemonset.yaml index 2900555e..8c059f83 100644 --- a/deployments/helm/fleet-intelligence-agent/templates/daemonset.yaml +++ b/deployments/helm/fleet-intelligence-agent/templates/daemonset.yaml @@ -74,6 +74,12 @@ spec: {{- if .Values.enroll.force }} --force {{- end }} + {{- if hasKey .Values.enroll "nodeGroup" }} + --node-group {{ .Values.enroll.nodeGroup | quote }} + {{- end }} + {{- if hasKey .Values.enroll "computeZone" }} + --compute-zone {{ .Values.enroll.computeZone | quote }} + {{- end }} securityContext: {{- toYaml .Values.securityContext | nindent 12 }} env: diff --git a/deployments/helm/fleet-intelligence-agent/values.yaml b/deployments/helm/fleet-intelligence-agent/values.yaml index 71834352..747bc608 100644 --- a/deployments/helm/fleet-intelligence-agent/values.yaml +++ b/deployments/helm/fleet-intelligence-agent/values.yaml @@ -61,6 +61,13 @@ enroll: tokenSecretName: "" tokenSecretKey: "token" tokenValue: "" + # Optional enrollment metadata: + # - omit key to preserve existing stored value + # - set non-empty string to overwrite + # - set empty string ("") to clear + # + # nodeGroup: "prod-a" + # computeZone: "us-east-1c" ports: http: 15133 diff --git a/docs/install-helm.md b/docs/install-helm.md index 3165dbca..c7aac70b 100644 --- a/docs/install-helm.md +++ b/docs/install-helm.md @@ -75,6 +75,23 @@ helm upgrade fleet-intelligence-agent oci://ghcr.io/nvidia/charts/fleet-intellig --set enroll.tokenSecretName="$ENROLL_TOKEN_SECRET_NAME" ``` +Optional: include node metadata during automatic enrollment: + +```bash +helm upgrade fleet-intelligence-agent oci://ghcr.io/nvidia/charts/fleet-intelligence-agent \ + --version "$CHART_VERSION" \ + --namespace "$NS" \ + --set enroll.enabled=true \ + --set enroll.endpoint="$ENROLL_ENDPOINT" \ + --set enroll.tokenSecretName="$ENROLL_TOKEN_SECRET_NAME" \ + --set-string 
enroll.nodeGroup="prod-a" \ + --set-string enroll.computeZone="us-east-1c" +``` + +Notes: +- Omit `enroll.nodeGroup` / `enroll.computeZone` keys to omit the flags and preserve existing stored values. +- Set either value to an empty string to clear it (for example: `--set-string enroll.nodeGroup=""`). + Upgrade (no enrollment): ```bash diff --git a/internal/exporter/exporter.go b/internal/exporter/exporter.go index b9f6ec98..52e67d2d 100644 --- a/internal/exporter/exporter.go +++ b/internal/exporter/exporter.go @@ -116,6 +116,16 @@ func (e *healthExporter) Start() error { log.Logger.Infow("Starting health exporter") + // In offline mode, emit one export immediately so short runs don't exit + // before the first ticker cycle and leave an empty output directory. + if e.options.config.OfflineMode { + if err := e.export(); err != nil { + log.Logger.Errorw("Initial offline export failed", "error", err) + } else { + e.lastExport = time.Now().UTC() + } + } + // Start the health export ticker go func() { ticker := time.NewTicker(e.options.config.Interval.Duration) diff --git a/internal/exporter/exporter_test.go b/internal/exporter/exporter_test.go index 8739eae1..8bc470ab 100644 --- a/internal/exporter/exporter_test.go +++ b/internal/exporter/exporter_test.go @@ -284,6 +284,85 @@ func TestStart(t *testing.T) { require.NoError(t, err) }) + t.Run("offline mode performs initial export before first tick", func(t *testing.T) { + tmpDir := t.TempDir() + cfg := &config.HealthExporterConfig{ + Interval: metav1.Duration{Duration: 1 * time.Hour}, + Timeout: metav1.Duration{Duration: 30 * time.Second}, + OfflineMode: true, + OutputPath: tmpDir, + OutputFormat: "json", + } + + exporter, err := New(ctx, WithConfig(cfg), WithMachineID("test-machine-id")) + require.NoError(t, err) + require.NotNil(t, exporter) + + err = exporter.Start() + require.NoError(t, err) + + // Initial export should be written immediately, without waiting for ticker. 
+ time.Sleep(100 * time.Millisecond) + entries, err := os.ReadDir(tmpDir) + require.NoError(t, err) + assert.Greater(t, len(entries), 0, "Expected offline files from initial export") + + err = exporter.Stop() + require.NoError(t, err) + }) + + t.Run("offline initial export includes collected data", func(t *testing.T) { + tmpDir := t.TempDir() + cfg := &config.HealthExporterConfig{ + Interval: metav1.Duration{Duration: 1 * time.Hour}, + Timeout: metav1.Duration{Duration: 30 * time.Second}, + OfflineMode: true, + OutputPath: tmpDir, + OutputFormat: "json", + } + + exporter, err := New(ctx, WithConfig(cfg), WithMachineID("test-machine-id")) + require.NoError(t, err) + require.NotNil(t, exporter) + + he := exporter.(*healthExporter) + mockCollector := &MockCollector{} + mockCollector.On("Collect", mock.Anything).Return(&collector.HealthData{ + MachineID: "test-machine", + Timestamp: time.Now(), + Metrics: []pkgmetrics.Metric{ + { + Name: "test_metric", + Value: 42.0, + UnixMilliseconds: time.Now().UnixMilli(), + }, + }, + }, nil).Once() + he.collector = mockCollector + + err = exporter.Start() + require.NoError(t, err) + + entries, err := os.ReadDir(tmpDir) + require.NoError(t, err) + require.Greater(t, len(entries), 0, "Expected offline files from initial export") + + allContent := "" + for _, entry := range entries { + fullPath := filepath.Join(tmpDir, entry.Name()) + content, readErr := os.ReadFile(fullPath) + require.NoError(t, readErr) + allContent += string(content) + } + assert.Contains(t, allContent, "test_metric") + assert.Contains(t, allContent, "machine.id") + + mockCollector.AssertExpectations(t) + + err = exporter.Stop() + require.NoError(t, err) + }) + } // TestStop tests the Stop function diff --git a/internal/server/server.go b/internal/server/server.go index 791ace43..b3d69174 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -89,6 +89,12 @@ type Server struct { // signal handler in run.go both call Stop(), so without this guard 
// waitForWaitGroup blocks until wg's counter reaches zero or timeout elapses.
//
// It returns true when every goroutine tracked by wg finished in time and
// false when the timeout expired first. A non-positive timeout means "wait
// with no deadline", and a nil wg is treated as already complete.
//
// When the timeout fires, the helper goroutine started here remains parked in
// wg.Wait until the counter eventually reaches zero, so a false result means
// "still running", not "abandoned".
func waitForWaitGroup(wg *sync.WaitGroup, timeout time.Duration) bool {
	if wg == nil {
		return true
	}
	if timeout <= 0 {
		// No deadline requested: wait inline instead of paying for a
		// goroutine and channel that nothing races against.
		wg.Wait()
		return true
	}

	done := make(chan struct{})
	go func() {
		defer close(done)
		wg.Wait()
	}()

	// An explicit timer can be released as soon as the wait group completes,
	// rather than leaving a time.After timer pending for the whole timeout.
	timer := time.NewTimer(timeout)
	defer timer.Stop()

	select {
	case <-done:
		return true
	case <-timer.C:
		return false
	}
}
log.Logger.Errorw("inventory loop manager exited", "error", err) } @@ -497,7 +526,9 @@ func (s *Server) startAttestationLoop(ctx context.Context, cfg *config.Config) { }, ) + s.loopWG.Add(1) go func() { + defer s.loopWG.Done() if err := manager.Run(ctx); err != nil && !errors.Is(err, context.Canceled) { log.Logger.Errorw("attestation loop exited", "error", err) } @@ -516,6 +547,15 @@ func (s *Server) GetHealthExporter() exporter.Exporter { // signal handler in run.go both invoke it. func (s *Server) Stop() { s.stopOnce.Do(func() { + // Signal inventory/attestation loops to stop and wait for graceful exit + // before we close dependencies they may still be using. + if s.loopCancel != nil { + s.loopCancel() + } + if !waitForWaitGroup(&s.loopWG, 10*time.Second) { + log.Logger.Warnw("timed out waiting for background loops to stop") + } + // Gracefully shut down the HTTP server so in-flight requests complete // before we close databases and components underneath them. if s.srv != nil { diff --git a/internal/server/server_test.go b/internal/server/server_test.go index f00d1504..010e4ff9 100644 --- a/internal/server/server_test.go +++ b/internal/server/server_test.go @@ -20,6 +20,7 @@ import ( "net" "os" "path/filepath" + "sync" "testing" "time" @@ -158,6 +159,26 @@ func TestGetInventorySyncTimeout(t *testing.T) { } } +func TestWaitForWaitGroup(t *testing.T) { + t.Run("returns true when waitgroup completes", func(t *testing.T) { + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + time.Sleep(10 * time.Millisecond) + }() + assert.True(t, waitForWaitGroup(&wg, 200*time.Millisecond)) + }) + + t.Run("returns false on timeout", func(t *testing.T) { + var wg sync.WaitGroup + wg.Add(1) + assert.False(t, waitForWaitGroup(&wg, 10*time.Millisecond)) + wg.Done() + assert.True(t, waitForWaitGroup(&wg, 100*time.Millisecond)) + }) +} + func TestGetAttestationSettings(t *testing.T) { tests := []struct { name string @@ -439,6 +460,24 @@ func TestServerStop(t *testing.T) 
{ } } +func TestServerStopCancelsBackgroundLoops(t *testing.T) { + loopCtx, loopCancel := context.WithCancel(context.Background()) + s := &Server{ + loopCtx: loopCtx, + loopCancel: loopCancel, + } + s.loopWG.Add(1) + go func() { + defer s.loopWG.Done() + <-loopCtx.Done() + }() + + s.Stop() + + assert.ErrorIs(t, loopCtx.Err(), context.Canceled) + assert.True(t, waitForWaitGroup(&s.loopWG, 100*time.Millisecond)) +} + // TestServerStopWithDatabases tests Stop with actual database connections. func TestServerStopWithDatabases(t *testing.T) { ctx := context.Background()