diff --git a/internal/exporter/collector/collector.go b/internal/exporter/collector/collector.go index 79957242..bc9249e8 100644 --- a/internal/exporter/collector/collector.go +++ b/internal/exporter/collector/collector.go @@ -28,12 +28,15 @@ import ( "github.com/NVIDIA/fleet-intelligence-sdk/pkg/eventstore" "github.com/NVIDIA/fleet-intelligence-sdk/pkg/log" pkgmetrics "github.com/NVIDIA/fleet-intelligence-sdk/pkg/metrics" + nvidianvml "github.com/NVIDIA/fleet-intelligence-sdk/pkg/nvidia-query/nvml" "github.com/google/uuid" "github.com/NVIDIA/fleet-intelligence-agent/internal/config" "github.com/NVIDIA/fleet-intelligence-agent/internal/machineinfo" ) +const initialMachineInfoWait = 5 * time.Second + // GenerateCollectionID generates a unique identifier for a data collection cycle func GenerateCollectionID() string { bytes := make([]byte, 16) @@ -65,12 +68,27 @@ type Collector interface { // collector implements the Collector interface type collector struct { - config *config.HealthExporterConfig - metricsStore pkgmetrics.Store - eventStore eventstore.Store - componentsRegistry components.Registry - machineID string // Agent's stable identity from server initialization - dcgmGPUIndexes map[string]string // UUID → DCGM device ID override for GPU indices + config *config.HealthExporterConfig + metricsStore pkgmetrics.Store + eventStore eventstore.Store + componentsRegistry components.Registry + machineID string // Agent's stable identity from server initialization + dcgmGPUIndexes map[string]string // UUID → DCGM device ID override for GPU indices + machineInfoProvider machineInfoProvider +} + +type collectorOptions struct { + nvmlInstance nvidianvml.Instance +} + +// Option configures optional collector dependencies. +type Option func(*collectorOptions) + +// WithNVMLInstance enables cached machine-info collection for health exports. +func WithNVMLInstance(nvmlInstance nvidianvml.Instance) Option { + return func(o *collectorOptions) { + o.nvmlInstance = nvmlInstance + } } // New creates a new health data collector @@ -81,14 +99,31 @@ func New( componentsRegistry components.Registry, machineID string, dcgmGPUIndexes map[string]string, + opts ...Option, ) Collector { + var collectorOpts collectorOptions + for _, opt := range opts { + opt(&collectorOpts) + } + + var provider machineInfoProvider + if cfg != nil && cfg.IncludeMachineInfo && collectorOpts.nvmlInstance != nil { + var machineInfoOpts []machineinfo.MachineInfoOption + if len(dcgmGPUIndexes) > 0 { + machineInfoOpts = append(machineInfoOpts, machineinfo.WithDCGMGPUIndexes(dcgmGPUIndexes)) + } + provider = newCachedMachineInfoProvider(collectorOpts.nvmlInstance, 0, machineInfoOpts...) + provider.RefreshAsync(context.Background()) + } + return &collector{ - config: cfg, - metricsStore: metricsStore, - eventStore: eventStore, - componentsRegistry: componentsRegistry, - machineID: machineID, - dcgmGPUIndexes: dcgmGPUIndexes, + config: cfg, + metricsStore: metricsStore, + eventStore: eventStore, + componentsRegistry: componentsRegistry, + machineID: machineID, + dcgmGPUIndexes: dcgmGPUIndexes, + machineInfoProvider: provider, } } @@ -110,6 +145,11 @@ func (c *collector) Collect(ctx context.Context) (*HealthData, error) { GPUUUIDToIndex: cloneStringMap(c.dcgmGPUIndexes), } + // Collect machine info if enabled. The converter only exports selected fields. + if c.config.IncludeMachineInfo { + c.collectMachineInfo(ctx, data) + } + // Collect metrics if enabled if c.config.IncludeMetrics { if err := c.collectMetrics(ctx, data); err != nil { @@ -134,6 +174,23 @@ func (c *collector) Collect(ctx context.Context) (*HealthData, error) { return data, nil } +// collectMachineInfo reads cached machine info and triggers a best-effort refresh. +func (c *collector) collectMachineInfo(ctx context.Context, data *HealthData) { + if c.machineInfoProvider == nil { + return + } + + if _, ok := c.machineInfoProvider.Get(); !ok { + c.machineInfoProvider.WaitForInitialRefresh(ctx, initialMachineInfoWait) + } + + if machineInfo, ok := c.machineInfoProvider.Get(); ok { + data.MachineInfo = machineInfo + } + + c.machineInfoProvider.RefreshAsync(ctx) +} + // collectMetrics collects metrics data from the metrics store func (c *collector) collectMetrics(ctx context.Context, data *HealthData) error { if c.metricsStore == nil { diff --git a/internal/exporter/collector/collector_test.go b/internal/exporter/collector/collector_test.go index 493ce082..65450d9a 100644 --- a/internal/exporter/collector/collector_test.go +++ b/internal/exporter/collector/collector_test.go @@ -134,6 +134,28 @@ func TestCollector_CollectMachineInfo_NoNVML(t *testing.T) { assert.Nil(t, data.MachineInfo, "MachineInfo should be nil without NVML") } +func TestCollector_CollectMachineInfo_UsesCachedProvider(t *testing.T) { + info := &machineinfo.MachineInfo{ + GPUInfo: &apiv1.MachineGPUInfo{ + GPUs: []apiv1.MachineGPUInstance{{UUID: "GPU-123", GPUIndex: "0"}}, + }, + } + provider := &fakeMachineInfoProvider{ + info: info, + ok: true, + } + data := &HealthData{} + c := &collector{ + machineInfoProvider: provider, + } + + c.collectMachineInfo(context.Background(), data) + + assert.Same(t, info, data.MachineInfo) + assert.True(t, provider.refreshed) + assert.False(t, provider.waited) +} + func TestCachedMachineInfoProvider_DeduplicatesConcurrentRefresh(t *testing.T) { originalGetMachineInfo := getMachineInfo defer func() { @@ -603,6 +625,26 @@ func (m *mockEventStore) Bucket(name string, opts ...eventstore.OpOption) (event return nil, nil } +type fakeMachineInfoProvider struct { + info *machineinfo.MachineInfo + ok bool + refreshed bool + waited bool +} + +func (f *fakeMachineInfoProvider) Get() (*machineinfo.MachineInfo, bool) { + return f.info, f.ok +} + +func (f *fakeMachineInfoProvider) RefreshAsync(parent context.Context) { + f.refreshed = true +} + +func (f *fakeMachineInfoProvider) WaitForInitialRefresh(ctx context.Context, maxWait time.Duration) bool { + f.waited = true + return f.ok +} + type mockComponent struct { name string events []apiv1.Event diff --git a/internal/exporter/converter/otlp.go b/internal/exporter/converter/otlp.go index 08c22cbd..131fb642 100644 --- a/internal/exporter/converter/otlp.go +++ b/internal/exporter/converter/otlp.go @@ -127,6 +127,17 @@ func (c *otlpConverter) createOTLPResource(data *collector.HealthData) *resource }) } + if data.MachineInfo != nil && data.MachineInfo.GPUInfo != nil && len(data.MachineInfo.GPUInfo.GPUs) > 0 { + if gpus, err := json.Marshal(data.MachineInfo.GPUInfo.GPUs); err == nil { + attributes = append(attributes, &commonv1.KeyValue{ + Key: "gpuInfo.gpus", + Value: &commonv1.AnyValue{ + Value: &commonv1.AnyValue_StringValue{StringValue: string(gpus)}, + }, + }) + } + } + return &resourcev1.Resource{ Attributes: attributes, } diff --git a/internal/exporter/converter/otlp_test.go b/internal/exporter/converter/otlp_test.go index 92d2f192..5823b5b1 100644 --- a/internal/exporter/converter/otlp_test.go +++ b/internal/exporter/converter/otlp_test.go @@ -773,6 +773,49 @@ func TestOTLPConverter_ResourceAttributes(t *testing.T) { assert.Equal(t, "test-machine-123", attrMap["machine.id"]) } +func TestOTLPConverter_ResourceAttributes_IncludesOnlyGPUInfoGPUs(t *testing.T) { + data := &collector.HealthData{ + Timestamp: time.Now(), + MachineID: "test-machine-123", + MachineInfo: &machineinfo.MachineInfo{ + GPUInfo: &apiv1.MachineGPUInfo{ + Product: "NVIDIA-H100", + Manufacturer: "NVIDIA", + Architecture: "hopper", + Memory: "85899345920", + GPUs: []apiv1.MachineGPUInstance{ + { + UUID: "GPU-123", + GPUIndex: "0", + BusID: "0000:01:00.0", + SN: "serial-123", + MinorID: "0", + BoardID: 7, + VBIOSVersion: "96.00.68.00.01", + ChassisSN: "chassis-123", + }, + }, + }, + }, + } + + converter := NewOTLPConverter() + otlpData := converter.Convert(data) + + attrs := otlpData.Metrics.ResourceMetrics[0].Resource.Attributes + gpus := findAttribute(t, attrs, "gpuInfo.gpus").GetStringValue() + assert.JSONEq(t, `[{"uuid":"GPU-123","gpuIndex":"0","busID":"0000:01:00.0","sn":"serial-123","minorID":"0","boardID":7,"vbiosVersion":"96.00.68.00.01","chassisSN":"chassis-123"}]`, gpus) + + for _, attr := range attrs { + assert.NotContains(t, []string{ + "gpuInfo.product", + "gpuInfo.manufacturer", + "gpuInfo.architecture", + "gpuInfo.memory", + }, attr.Key) + } +} + func TestOTLPConverter_Interface(t *testing.T) { // Verify otlpConverter implements OTLPConverter interface var _ OTLPConverter = (*otlpConverter)(nil) diff --git a/internal/exporter/exporter.go b/internal/exporter/exporter.go index 59605203..ce0bd5e9 100644 --- a/internal/exporter/exporter.go +++ b/internal/exporter/exporter.go @@ -82,6 +82,7 @@ func New(ctx context.Context, opts ...ExporterOption) (Exporter, error) { options.componentsRegistry, options.machineID, options.dcgmGPUIndexes, + collector.WithNVMLInstance(options.nvmlInstance), ) otlpConverter := converter.NewOTLPConverter() diff --git a/internal/exporter/options.go b/internal/exporter/options.go index 159cfe83..e470d82f 100644 --- a/internal/exporter/options.go +++ b/internal/exporter/options.go @@ -24,6 +24,7 @@ import ( "github.com/NVIDIA/fleet-intelligence-sdk/components" "github.com/NVIDIA/fleet-intelligence-sdk/pkg/eventstore" pkgmetrics "github.com/NVIDIA/fleet-intelligence-sdk/pkg/metrics" + nvidianvml "github.com/NVIDIA/fleet-intelligence-sdk/pkg/nvidia-query/nvml" "github.com/NVIDIA/fleet-intelligence-agent/internal/config" ) @@ -40,6 +41,7 @@ type exporterOptions struct { metricsStore pkgmetrics.Store eventStore eventstore.Store componentsRegistry components.Registry + nvmlInstance nvidianvml.Instance httpClient *http.Client timeout time.Duration dbRW *sql.DB // Read-write database connection @@ -84,6 +86,14 @@ func WithComponentsRegistry(registry components.Registry) ExporterOption { } } +// WithNVMLInstance sets the NVML instance used for cached machine-info collection. +func WithNVMLInstance(instance nvidianvml.Instance) ExporterOption { + return func(c *exporterOptions) error { + c.nvmlInstance = instance + return nil + } +} + // WithHTTPClient sets a custom HTTP client func WithHTTPClient(client *http.Client) ExporterOption { return func(c *exporterOptions) error { diff --git a/internal/server/server.go b/internal/server/server.go index d01dee4c..791ace43 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -395,6 +395,7 @@ func New(ctx context.Context, auditLogger log.AuditLogger, config *config.Config exporter.WithMetricsStore(metricsSQLiteStore), exporter.WithEventStore(eventStore), exporter.WithComponentsRegistry(s.componentsRegistry), + exporter.WithNVMLInstance(nvmlInstance), exporter.WithDatabaseConnections(dbRW, dbRO), exporter.WithMachineID(machineID), exporter.WithDCGMGPUIndexes(dcgmGPUIndexes),