Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 69 additions & 12 deletions internal/exporter/collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,15 @@ import (
"github.com/NVIDIA/fleet-intelligence-sdk/pkg/eventstore"
"github.com/NVIDIA/fleet-intelligence-sdk/pkg/log"
pkgmetrics "github.com/NVIDIA/fleet-intelligence-sdk/pkg/metrics"
nvidianvml "github.com/NVIDIA/fleet-intelligence-sdk/pkg/nvidia-query/nvml"
"github.com/google/uuid"

"github.com/NVIDIA/fleet-intelligence-agent/internal/config"
"github.com/NVIDIA/fleet-intelligence-agent/internal/machineinfo"
)

const initialMachineInfoWait = 5 * time.Second

// GenerateCollectionID generates a unique identifier for a data collection cycle
func GenerateCollectionID() string {
bytes := make([]byte, 16)
Expand Down Expand Up @@ -65,12 +68,27 @@ type Collector interface {

// collector implements the Collector interface
type collector struct {
config *config.HealthExporterConfig
metricsStore pkgmetrics.Store
eventStore eventstore.Store
componentsRegistry components.Registry
machineID string // Agent's stable identity from server initialization
dcgmGPUIndexes map[string]string // UUID → DCGM device ID override for GPU indices
config *config.HealthExporterConfig
metricsStore pkgmetrics.Store
eventStore eventstore.Store
componentsRegistry components.Registry
machineID string // Agent's stable identity from server initialization
dcgmGPUIndexes map[string]string // UUID → DCGM device ID override for GPU indices
machineInfoProvider machineInfoProvider
}

type collectorOptions struct {
nvmlInstance nvidianvml.Instance
}

// Option configures optional collector dependencies.
type Option func(*collectorOptions)

// WithNVMLInstance enables cached machine-info collection for health exports.
func WithNVMLInstance(nvmlInstance nvidianvml.Instance) Option {
return func(o *collectorOptions) {
o.nvmlInstance = nvmlInstance
}
}

// New creates a new health data collector
Expand All @@ -81,14 +99,31 @@ func New(
componentsRegistry components.Registry,
machineID string,
dcgmGPUIndexes map[string]string,
opts ...Option,
) Collector {
var collectorOpts collectorOptions
for _, opt := range opts {
opt(&collectorOpts)
}

var provider machineInfoProvider
if cfg != nil && cfg.IncludeMachineInfo && collectorOpts.nvmlInstance != nil {
var machineInfoOpts []machineinfo.MachineInfoOption
if len(dcgmGPUIndexes) > 0 {
machineInfoOpts = append(machineInfoOpts, machineinfo.WithDCGMGPUIndexes(dcgmGPUIndexes))
}
provider = newCachedMachineInfoProvider(collectorOpts.nvmlInstance, 0, machineInfoOpts...)
provider.RefreshAsync(context.Background())
}

return &collector{
config: cfg,
metricsStore: metricsStore,
eventStore: eventStore,
componentsRegistry: componentsRegistry,
machineID: machineID,
dcgmGPUIndexes: dcgmGPUIndexes,
config: cfg,
metricsStore: metricsStore,
eventStore: eventStore,
componentsRegistry: componentsRegistry,
machineID: machineID,
dcgmGPUIndexes: dcgmGPUIndexes,
machineInfoProvider: provider,
}
}

Expand All @@ -110,6 +145,11 @@ func (c *collector) Collect(ctx context.Context) (*HealthData, error) {
GPUUUIDToIndex: cloneStringMap(c.dcgmGPUIndexes),
}

// Collect machine info if enabled. The converter only exports selected fields.
if c.config.IncludeMachineInfo {
c.collectMachineInfo(ctx, data)
}

// Collect metrics if enabled
if c.config.IncludeMetrics {
if err := c.collectMetrics(ctx, data); err != nil {
Expand All @@ -134,6 +174,23 @@ func (c *collector) Collect(ctx context.Context) (*HealthData, error) {
return data, nil
}

// collectMachineInfo reads cached machine info and triggers a best-effort refresh.
func (c *collector) collectMachineInfo(ctx context.Context, data *HealthData) {
if c.machineInfoProvider == nil {
return
}

if _, ok := c.machineInfoProvider.Get(); !ok {
c.machineInfoProvider.WaitForInitialRefresh(ctx, initialMachineInfoWait)
}

if machineInfo, ok := c.machineInfoProvider.Get(); ok {
data.MachineInfo = machineInfo
}

c.machineInfoProvider.RefreshAsync(ctx)
}

// collectMetrics collects metrics data from the metrics store
func (c *collector) collectMetrics(ctx context.Context, data *HealthData) error {
if c.metricsStore == nil {
Expand Down
42 changes: 42 additions & 0 deletions internal/exporter/collector/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,28 @@ func TestCollector_CollectMachineInfo_NoNVML(t *testing.T) {
assert.Nil(t, data.MachineInfo, "MachineInfo should be nil without NVML")
}

func TestCollector_CollectMachineInfo_UsesCachedProvider(t *testing.T) {
info := &machineinfo.MachineInfo{
GPUInfo: &apiv1.MachineGPUInfo{
GPUs: []apiv1.MachineGPUInstance{{UUID: "GPU-123", GPUIndex: "0"}},
},
}
provider := &fakeMachineInfoProvider{
info: info,
ok: true,
}
data := &HealthData{}
c := &collector{
machineInfoProvider: provider,
}

c.collectMachineInfo(context.Background(), data)

assert.Same(t, info, data.MachineInfo)
assert.True(t, provider.refreshed)
assert.False(t, provider.waited)
}

func TestCachedMachineInfoProvider_DeduplicatesConcurrentRefresh(t *testing.T) {
originalGetMachineInfo := getMachineInfo
defer func() {
Expand Down Expand Up @@ -603,6 +625,26 @@ func (m *mockEventStore) Bucket(name string, opts ...eventstore.OpOption) (event
return nil, nil
}

type fakeMachineInfoProvider struct {
info *machineinfo.MachineInfo
ok bool
refreshed bool
waited bool
}

func (f *fakeMachineInfoProvider) Get() (*machineinfo.MachineInfo, bool) {
return f.info, f.ok
}

func (f *fakeMachineInfoProvider) RefreshAsync(parent context.Context) {
f.refreshed = true
}

func (f *fakeMachineInfoProvider) WaitForInitialRefresh(ctx context.Context, maxWait time.Duration) bool {
f.waited = true
return f.ok
}

type mockComponent struct {
name string
events []apiv1.Event
Expand Down
11 changes: 11 additions & 0 deletions internal/exporter/converter/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,17 @@ func (c *otlpConverter) createOTLPResource(data *collector.HealthData) *resource
})
}

if data.MachineInfo != nil && data.MachineInfo.GPUInfo != nil && len(data.MachineInfo.GPUInfo.GPUs) > 0 {
if gpus, err := json.Marshal(data.MachineInfo.GPUInfo.GPUs); err == nil {
attributes = append(attributes, &commonv1.KeyValue{
Key: "gpuInfo.gpus",
Value: &commonv1.AnyValue{
Value: &commonv1.AnyValue_StringValue{StringValue: string(gpus)},
},
})
}
}

return &resourcev1.Resource{
Attributes: attributes,
}
Expand Down
43 changes: 43 additions & 0 deletions internal/exporter/converter/otlp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -773,6 +773,49 @@ func TestOTLPConverter_ResourceAttributes(t *testing.T) {
assert.Equal(t, "test-machine-123", attrMap["machine.id"])
}

func TestOTLPConverter_ResourceAttributes_IncludesOnlyGPUInfoGPUs(t *testing.T) {
data := &collector.HealthData{
Timestamp: time.Now(),
MachineID: "test-machine-123",
MachineInfo: &machineinfo.MachineInfo{
GPUInfo: &apiv1.MachineGPUInfo{
Product: "NVIDIA-H100",
Manufacturer: "NVIDIA",
Architecture: "hopper",
Memory: "85899345920",
GPUs: []apiv1.MachineGPUInstance{
{
UUID: "GPU-123",
GPUIndex: "0",
BusID: "0000:01:00.0",
SN: "serial-123",
MinorID: "0",
BoardID: 7,
VBIOSVersion: "96.00.68.00.01",
ChassisSN: "chassis-123",
},
},
},
},
}

converter := NewOTLPConverter()
otlpData := converter.Convert(data)

attrs := otlpData.Metrics.ResourceMetrics[0].Resource.Attributes
gpus := findAttribute(t, attrs, "gpuInfo.gpus").GetStringValue()
assert.JSONEq(t, `[{"uuid":"GPU-123","gpuIndex":"0","busID":"0000:01:00.0","sn":"serial-123","minorID":"0","boardID":7,"vbiosVersion":"96.00.68.00.01","chassisSN":"chassis-123"}]`, gpus)

for _, attr := range attrs {
assert.NotContains(t, []string{
"gpuInfo.product",
"gpuInfo.manufacturer",
"gpuInfo.architecture",
"gpuInfo.memory",
}, attr.Key)
}
}

func TestOTLPConverter_Interface(t *testing.T) {
// Verify otlpConverter implements OTLPConverter interface
var _ OTLPConverter = (*otlpConverter)(nil)
Expand Down
1 change: 1 addition & 0 deletions internal/exporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ func New(ctx context.Context, opts ...ExporterOption) (Exporter, error) {
options.componentsRegistry,
options.machineID,
options.dcgmGPUIndexes,
collector.WithNVMLInstance(options.nvmlInstance),
)

otlpConverter := converter.NewOTLPConverter()
Expand Down
10 changes: 10 additions & 0 deletions internal/exporter/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/NVIDIA/fleet-intelligence-sdk/components"
"github.com/NVIDIA/fleet-intelligence-sdk/pkg/eventstore"
pkgmetrics "github.com/NVIDIA/fleet-intelligence-sdk/pkg/metrics"
nvidianvml "github.com/NVIDIA/fleet-intelligence-sdk/pkg/nvidia-query/nvml"

"github.com/NVIDIA/fleet-intelligence-agent/internal/config"
)
Expand All @@ -40,6 +41,7 @@ type exporterOptions struct {
metricsStore pkgmetrics.Store
eventStore eventstore.Store
componentsRegistry components.Registry
nvmlInstance nvidianvml.Instance
httpClient *http.Client
timeout time.Duration
dbRW *sql.DB // Read-write database connection
Expand Down Expand Up @@ -84,6 +86,14 @@ func WithComponentsRegistry(registry components.Registry) ExporterOption {
}
}

// WithNVMLInstance sets the NVML instance used for cached machine-info collection.
func WithNVMLInstance(instance nvidianvml.Instance) ExporterOption {
return func(c *exporterOptions) error {
c.nvmlInstance = instance
return nil
}
}

// WithHTTPClient sets a custom HTTP client
func WithHTTPClient(client *http.Client) ExporterOption {
return func(c *exporterOptions) error {
Expand Down
1 change: 1 addition & 0 deletions internal/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,7 @@ func New(ctx context.Context, auditLogger log.AuditLogger, config *config.Config
exporter.WithMetricsStore(metricsSQLiteStore),
exporter.WithEventStore(eventStore),
exporter.WithComponentsRegistry(s.componentsRegistry),
exporter.WithNVMLInstance(nvmlInstance),
exporter.WithDatabaseConnections(dbRW, dbRO),
exporter.WithMachineID(machineID),
exporter.WithDCGMGPUIndexes(dcgmGPUIndexes),
Expand Down
Loading