Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions deployments/helm/fleet-intelligence-agent/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ Common values (defaults from `values.yaml`):
| `enroll.tokenSecretName` | `""` | Secret name for enrollment token. |
| `enroll.tokenSecretKey` | `token` | Secret key for enrollment token. |
| `enroll.tokenValue` | `""` | Inline token value (optional). |
| `enroll.nodeGroup` | `(not set)` | Optional `--node-group` enroll flag. Omit key to omit flag, set `""` to clear stored value. |
| `enroll.computeZone` | `(not set)` | Optional `--compute-zone` enroll flag. Omit key to omit flag, set `""` to clear stored value. |
| `enroll.securityContext.runAsUser` | `0` | Run enrollment init as root. |
| `ports.http` | `15133` | HTTP port. |
| `resources.requests.cpu` | `100m` | CPU request. |
Expand All @@ -70,6 +72,7 @@ Common values (defaults from `values.yaml`):

`enroll.enabled` and `enroll.unenroll` are mutually exclusive; do not set both to `true`.
Set `enroll.force=true` to append `--force` to `fleetint enroll`.
Set `enroll.nodeGroup` / `enroll.computeZone` to pass optional enrollment metadata via the init container command.

See `docs/install-helm.md` for the enrollment flow and secret creation steps.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,12 @@ spec:
{{- if .Values.enroll.force }}
--force
{{- end }}
{{- if hasKey .Values.enroll "nodeGroup" }}
--node-group {{ .Values.enroll.nodeGroup | quote }}
{{- end }}
{{- if hasKey .Values.enroll "computeZone" }}
--compute-zone {{ .Values.enroll.computeZone | quote }}
{{- end }}
securityContext:
{{- toYaml .Values.securityContext | nindent 12 }}
env:
Expand Down
7 changes: 7 additions & 0 deletions deployments/helm/fleet-intelligence-agent/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,13 @@ enroll:
tokenSecretName: ""
tokenSecretKey: "token"
tokenValue: ""
# Optional enrollment metadata:
# - omit key to preserve existing stored value
# - set non-empty string to overwrite
# - set empty string ("") to clear
#
# nodeGroup: "prod-a"
# computeZone: "us-east-1c"

ports:
http: 15133
Expand Down
17 changes: 17 additions & 0 deletions docs/install-helm.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,23 @@ helm upgrade fleet-intelligence-agent oci://ghcr.io/nvidia/charts/fleet-intellig
--set enroll.tokenSecretName="$ENROLL_TOKEN_SECRET_NAME"
```

Optional: include node metadata during automatic enrollment:

```bash
helm upgrade fleet-intelligence-agent oci://ghcr.io/nvidia/charts/fleet-intelligence-agent \
--version "$CHART_VERSION" \
--namespace "$NS" \
--set enroll.enabled=true \
--set enroll.endpoint="$ENROLL_ENDPOINT" \
--set enroll.tokenSecretName="$ENROLL_TOKEN_SECRET_NAME" \
--set-string enroll.nodeGroup="prod-a" \
--set-string enroll.computeZone="us-east-1c"
```

Notes:
- Omit `enroll.nodeGroup` / `enroll.computeZone` keys to omit the flags and preserve existing stored values.
- Set either value to an empty string to clear it (for example: `--set-string enroll.nodeGroup=""`).

Upgrade (no enrollment):

```bash
Expand Down
10 changes: 10 additions & 0 deletions internal/exporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,16 @@ func (e *healthExporter) Start() error {

log.Logger.Infow("Starting health exporter")

// In offline mode, emit one export immediately so short runs don't exit
Comment thread
ambermingxin marked this conversation as resolved.
// before the first ticker cycle and leave an empty output directory.
if e.options.config.OfflineMode {
if err := e.export(); err != nil {
log.Logger.Errorw("Initial offline export failed", "error", err)
} else {
e.lastExport = time.Now().UTC()
}
}

// Start the health export ticker
go func() {
ticker := time.NewTicker(e.options.config.Interval.Duration)
Expand Down
79 changes: 79 additions & 0 deletions internal/exporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,85 @@ func TestStart(t *testing.T) {
require.NoError(t, err)
})

t.Run("offline mode performs initial export before first tick", func(t *testing.T) {
tmpDir := t.TempDir()
cfg := &config.HealthExporterConfig{
Interval: metav1.Duration{Duration: 1 * time.Hour},
Timeout: metav1.Duration{Duration: 30 * time.Second},
OfflineMode: true,
OutputPath: tmpDir,
OutputFormat: "json",
}

exporter, err := New(ctx, WithConfig(cfg), WithMachineID("test-machine-id"))
require.NoError(t, err)
require.NotNil(t, exporter)

err = exporter.Start()
require.NoError(t, err)

// Initial export should be written immediately, without waiting for ticker.
time.Sleep(100 * time.Millisecond)
entries, err := os.ReadDir(tmpDir)
require.NoError(t, err)
assert.Greater(t, len(entries), 0, "Expected offline files from initial export")

err = exporter.Stop()
require.NoError(t, err)
})

t.Run("offline initial export includes collected data", func(t *testing.T) {
tmpDir := t.TempDir()
cfg := &config.HealthExporterConfig{
Interval: metav1.Duration{Duration: 1 * time.Hour},
Timeout: metav1.Duration{Duration: 30 * time.Second},
OfflineMode: true,
OutputPath: tmpDir,
OutputFormat: "json",
}

exporter, err := New(ctx, WithConfig(cfg), WithMachineID("test-machine-id"))
require.NoError(t, err)
require.NotNil(t, exporter)

he := exporter.(*healthExporter)
mockCollector := &MockCollector{}
mockCollector.On("Collect", mock.Anything).Return(&collector.HealthData{
MachineID: "test-machine",
Timestamp: time.Now(),
Metrics: []pkgmetrics.Metric{
{
Name: "test_metric",
Value: 42.0,
UnixMilliseconds: time.Now().UnixMilli(),
},
},
}, nil).Once()
he.collector = mockCollector

err = exporter.Start()
require.NoError(t, err)

entries, err := os.ReadDir(tmpDir)
require.NoError(t, err)
require.Greater(t, len(entries), 0, "Expected offline files from initial export")

allContent := ""
for _, entry := range entries {
fullPath := filepath.Join(tmpDir, entry.Name())
content, readErr := os.ReadFile(fullPath)
require.NoError(t, readErr)
allContent += string(content)
}
assert.Contains(t, allContent, "test_metric")
assert.Contains(t, allContent, "machine.id")

mockCollector.AssertExpectations(t)

err = exporter.Stop()
require.NoError(t, err)
})

}

// TestStop tests the Stop function
Expand Down
44 changes: 42 additions & 2 deletions internal/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,12 @@ type Server struct {
// signal handler in run.go both call Stop(), so without this guard
// components, databases, and the health exporter would be closed twice.
stopOnce sync.Once
loopWG sync.WaitGroup

// loopCtx/loopCancel control background inventory and attestation goroutines.
// Stop() cancels this context and waits on loopWG for graceful shutdown.
loopCtx context.Context
loopCancel context.CancelFunc

machineID string
}
Expand Down Expand Up @@ -210,6 +216,24 @@ func getAttestationTimeout(cfg *config.Config) time.Duration {
return config.DefaultAttestationTimeout
}

func waitForWaitGroup(wg *sync.WaitGroup, timeout time.Duration) bool {
done := make(chan struct{})
go func() {
wg.Wait()
close(done)
}()
if timeout <= 0 {
<-done
return true
}
select {
case <-done:
return true
case <-time.After(timeout):
return false
}
}

// shouldEnableComponent determines if a component should be enabled based on configuration
func shouldEnableComponent(name string, enabledByDefault bool, config *config.Config) bool {
shouldEnable := enabledByDefault
Expand All @@ -235,11 +259,14 @@ func New(ctx context.Context, auditLogger log.AuditLogger, config *config.Config
return nil, err
}

loopCtx, loopCancel := context.WithCancel(ctx)
s := &Server{
auditLogger: auditLogger,
dbRW: dbRW,
dbRO: dbRO,
config: config,
loopCtx: loopCtx,
loopCancel: loopCancel,
}
defer func() {
if retErr != nil {
Expand Down Expand Up @@ -383,8 +410,8 @@ func New(ctx context.Context, auditLogger log.AuditLogger, config *config.Config
}
}

s.startInventoryLoop(ctx, config, nvmlInstance, dcgmGPUIndexes)
s.startAttestationLoop(ctx, config)
s.startInventoryLoop(loopCtx, config, nvmlInstance, dcgmGPUIndexes)
s.startAttestationLoop(loopCtx, config)

// Create and start health exporter with all dependencies if enabled
if config.HealthExporter != nil {
Expand Down Expand Up @@ -462,7 +489,9 @@ func (s *Server) startInventoryLoop(
StartupJitter: inventory.DefaultStartupJitter,
})

s.loopWG.Add(1)
go func() {
defer s.loopWG.Done()
if err := manager.Run(ctx); err != nil && !errors.Is(err, context.Canceled) {
log.Logger.Errorw("inventory loop manager exited", "error", err)
}
Expand Down Expand Up @@ -497,7 +526,9 @@ func (s *Server) startAttestationLoop(ctx context.Context, cfg *config.Config) {
},
)

s.loopWG.Add(1)
go func() {
defer s.loopWG.Done()
if err := manager.Run(ctx); err != nil && !errors.Is(err, context.Canceled) {
log.Logger.Errorw("attestation loop exited", "error", err)
}
Expand All @@ -516,6 +547,15 @@ func (s *Server) GetHealthExporter() exporter.Exporter {
// signal handler in run.go both invoke it.
func (s *Server) Stop() {
s.stopOnce.Do(func() {
// Signal inventory/attestation loops to stop and wait for graceful exit
// before we close dependencies they may still be using.
if s.loopCancel != nil {
s.loopCancel()
}
if !waitForWaitGroup(&s.loopWG, 10*time.Second) {
log.Logger.Warnw("timed out waiting for background loops to stop")
}

// Gracefully shut down the HTTP server so in-flight requests complete
// before we close databases and components underneath them.
if s.srv != nil {
Expand Down
39 changes: 39 additions & 0 deletions internal/server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"net"
"os"
"path/filepath"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -158,6 +159,26 @@ func TestGetInventorySyncTimeout(t *testing.T) {
}
}

func TestWaitForWaitGroup(t *testing.T) {
t.Run("returns true when waitgroup completes", func(t *testing.T) {
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
time.Sleep(10 * time.Millisecond)
}()
assert.True(t, waitForWaitGroup(&wg, 200*time.Millisecond))
})

t.Run("returns false on timeout", func(t *testing.T) {
var wg sync.WaitGroup
wg.Add(1)
assert.False(t, waitForWaitGroup(&wg, 10*time.Millisecond))
wg.Done()
assert.True(t, waitForWaitGroup(&wg, 100*time.Millisecond))
})
}

func TestGetAttestationSettings(t *testing.T) {
tests := []struct {
name string
Expand Down Expand Up @@ -439,6 +460,24 @@ func TestServerStop(t *testing.T) {
}
}

func TestServerStopCancelsBackgroundLoops(t *testing.T) {
loopCtx, loopCancel := context.WithCancel(context.Background())
s := &Server{
loopCtx: loopCtx,
loopCancel: loopCancel,
}
s.loopWG.Add(1)
go func() {
defer s.loopWG.Done()
<-loopCtx.Done()
}()

s.Stop()

assert.ErrorIs(t, loopCtx.Err(), context.Canceled)
assert.True(t, waitForWaitGroup(&s.loopWG, 100*time.Millisecond))
}

// TestServerStopWithDatabases tests Stop with actual database connections.
func TestServerStopWithDatabases(t *testing.T) {
ctx := context.Background()
Expand Down
Loading