From da468c30a50852684174562cf320c05876e534f1 Mon Sep 17 00:00:00 2001 From: Jingxiang Zhang Date: Tue, 19 May 2026 15:24:58 -0700 Subject: [PATCH 1/3] fix: preserve counter metric types in OTLP Signed-off-by: Jingxiang Zhang --- internal/exporter/converter/otlp.go | 55 +++-- internal/exporter/converter/otlp_test.go | 54 +++++ .../accelerator/nvidia/dcgm/mem/metrics.go | 40 ++-- .../accelerator/nvidia/dcgm/nvlink/metrics.go | 60 ++--- .../accelerator/nvidia/dcgm/pcie/metrics.go | 4 +- .../accelerator/nvidia/dcgm/power/metrics.go | 12 +- .../nvidia/dcgm/thermal/metrics.go | 5 +- .../accelerator/nvidia/infiniband/metrics.go | 4 +- .../components/network/ethernet/metrics.go | 32 +-- .../pkg/metrics/scraper/prometheus.go | 2 + .../pkg/metrics/scraper/prometheus_test.go | 5 + .../pkg/metrics/settable_const_metric.go | 217 ++++++++++++++++++ .../pkg/metrics/settable_const_metric_test.go | 79 +++++++ .../pkg/metrics/store/sqlite.go | 67 +++++- .../pkg/metrics/store/sqlite_test.go | 45 ++++ .../pkg/metrics/types.go | 9 + 16 files changed, 584 insertions(+), 106 deletions(-) create mode 100644 third_party/fleet-intelligence-sdk/pkg/metrics/settable_const_metric.go create mode 100644 third_party/fleet-intelligence-sdk/pkg/metrics/settable_const_metric_test.go diff --git a/internal/exporter/converter/otlp.go b/internal/exporter/converter/otlp.go index 08c22cbd..5ddde529 100644 --- a/internal/exporter/converter/otlp.go +++ b/internal/exporter/converter/otlp.go @@ -25,6 +25,7 @@ import ( "time" apiv1 "github.com/NVIDIA/fleet-intelligence-sdk/api/v1" + pkgmetrics "github.com/NVIDIA/fleet-intelligence-sdk/pkg/metrics" commonv1 "go.opentelemetry.io/proto/otlp/common/v1" logsv1 "go.opentelemetry.io/proto/otlp/logs/v1" metricsv1 "go.opentelemetry.io/proto/otlp/metrics/v1" @@ -152,25 +153,7 @@ func (c *otlpConverter) convertMetricsToOTLP(data *collector.HealthData) []*metr // Convert regular metrics if available if len(data.Metrics) > 0 { for _, metric := range data.Metrics { - otlpMetric := &metricsv1.Metric{ - Name: metric.Name, - Description: fmt.Sprintf("Metric from component %s", metric.Component), - Unit: "1", - Data: &metricsv1.Metric_Gauge{ - Gauge: &metricsv1.Gauge{ - DataPoints: []*metricsv1.NumberDataPoint{ - { - TimeUnixNano: uint64(metric.UnixMilliseconds) * 1_000_000, - Value: &metricsv1.NumberDataPoint_AsDouble{ - AsDouble: metric.Value, - }, - Attributes: c.convertLabelsToOTLPAttributes(metric.Labels, gpuUUIDToIndex), - }, - }, - }, - }, - } - otlpMetrics = append(otlpMetrics, otlpMetric) + otlpMetrics = append(otlpMetrics, c.convertMetricToOTLP(metric, gpuUUIDToIndex)) } } @@ -236,6 +219,40 @@ func (c *otlpConverter) convertMetricsToOTLP(data *collector.HealthData) []*metr return otlpMetrics } +func (c *otlpConverter) convertMetricToOTLP(metric pkgmetrics.Metric, gpuUUIDToIndex map[string]string) *metricsv1.Metric { + dataPoint := &metricsv1.NumberDataPoint{ + TimeUnixNano: uint64(metric.UnixMilliseconds) * 1_000_000, + Value: &metricsv1.NumberDataPoint_AsDouble{ + AsDouble: metric.Value, + }, + Attributes: c.convertLabelsToOTLPAttributes(metric.Labels, gpuUUIDToIndex), + } + + otlpMetric := &metricsv1.Metric{ + Name: metric.Name, + Description: fmt.Sprintf("Metric from component %s", metric.Component), + Unit: "1", + } + + if metric.Type == pkgmetrics.MetricTypeCounter { + otlpMetric.Data = &metricsv1.Metric_Sum{ + Sum: &metricsv1.Sum{ + AggregationTemporality: metricsv1.AggregationTemporality_AGGREGATION_TEMPORALITY_CUMULATIVE, + IsMonotonic: true, + DataPoints: []*metricsv1.NumberDataPoint{dataPoint}, + }, + } + return otlpMetric + } + + otlpMetric.Data = &metricsv1.Metric_Gauge{ + Gauge: &metricsv1.Gauge{ + DataPoints: []*metricsv1.NumberDataPoint{dataPoint}, + }, + } + return otlpMetric +} + // convertLabelsToOTLPAttributes converts metric labels to OTLP attributes. // If the labels contain a "uuid" key but no "gpu" key, it enriches the // attributes with the GPU index looked up from the machine info mapping. diff --git a/internal/exporter/converter/otlp_test.go b/internal/exporter/converter/otlp_test.go index 92d2f192..6b960483 100644 --- a/internal/exporter/converter/otlp_test.go +++ b/internal/exporter/converter/otlp_test.go @@ -99,6 +99,51 @@ func TestOTLPConverter_Convert_WithMetrics(t *testing.T) { assert.Contains(t, metrics[0].Description, "gpu") } +func TestOTLPConverter_Convert_CounterMetricsBecomeCumulativeSums(t *testing.T) { + data := &collector.HealthData{ + Timestamp: time.Now(), + MachineID: "test-machine", + Metrics: metrics.Metrics{ + { + Component: "gpu", + Name: "dcgm_fi_dev_pcie_replay_counter", + Type: metrics.MetricTypeCounter, + UnixMilliseconds: 1699200000000, + Value: 42, + Labels: map[string]string{"uuid": "GPU-0", "gpu": "0"}, + }, + { + Component: "gpu", + Name: "dcgm_fi_dev_gpu_temp", + Type: metrics.MetricTypeGauge, + UnixMilliseconds: 1699200001000, + Value: 65, + Labels: map[string]string{"uuid": "GPU-0", "gpu": "0"}, + }, + }, + } + + converter := NewOTLPConverter() + otlpData := converter.Convert(data) + + convertedMetrics := otlpData.Metrics.ResourceMetrics[0].ScopeMetrics[0].Metrics + counterMetric := findOTLPMetric(convertedMetrics, "dcgm_fi_dev_pcie_replay_counter") + require.NotNil(t, counterMetric) + sum := counterMetric.GetSum() + require.NotNil(t, sum) + assert.True(t, sum.IsMonotonic) + assert.Equal(t, metricsv1.AggregationTemporality_AGGREGATION_TEMPORALITY_CUMULATIVE, sum.AggregationTemporality) + require.Len(t, sum.DataPoints, 1) + assert.Equal(t, 42.0, sum.DataPoints[0].GetAsDouble()) + + gaugeMetric := findOTLPMetric(convertedMetrics, "dcgm_fi_dev_gpu_temp") + require.NotNil(t, gaugeMetric) + gauge := gaugeMetric.GetGauge() + require.NotNil(t, gauge) + require.Len(t, gauge.DataPoints, 1) + assert.Equal(t, 65.0, gauge.DataPoints[0].GetAsDouble()) +} + func TestOTLPConverter_Convert_WithEvents(t *testing.T) { data := &collector.HealthData{ Timestamp: time.Now(), @@ -745,6 +790,15 @@ func TestOTLPConverter_UpMetric(t *testing.T) { assert.Empty(t, point.Attributes) } +func findOTLPMetric(metrics []*metricsv1.Metric, name string) *metricsv1.Metric { + for _, metric := range metrics { + if metric.Name == name { + return metric + } + } + return nil +} + func TestOTLPConverter_ResourceAttributes(t *testing.T) { data := &collector.HealthData{ Timestamp: time.Now(), diff --git a/third_party/fleet-intelligence-sdk/components/accelerator/nvidia/dcgm/mem/metrics.go b/third_party/fleet-intelligence-sdk/components/accelerator/nvidia/dcgm/mem/metrics.go index 2cbf8184..e09a7b1d 100644 --- a/third_party/fleet-intelligence-sdk/components/accelerator/nvidia/dcgm/mem/metrics.go +++ b/third_party/fleet-intelligence-sdk/components/accelerator/nvidia/dcgm/mem/metrics.go @@ -83,8 +83,8 @@ var ( []string{pkgmetrics.MetricComponentLabelKey, "uuid", "gpu"}, ).MustCurryWith(componentLabel) - metricDCGMFIDevUncorrectableRemappedRows = prometheus.NewGaugeVec( - prometheus.GaugeOpts{ + metricDCGMFIDevUncorrectableRemappedRows = pkgmetrics.NewSettableCounterVec( + prometheus.CounterOpts{ Namespace: "", Subsystem: "", Name: "dcgm_fi_dev_uncorrectable_remapped_rows", @@ -93,8 +93,8 @@ var ( []string{pkgmetrics.MetricComponentLabelKey, "uuid", "gpu"}, ).MustCurryWith(componentLabel) - metricDCGMFIDevCorrectableRemappedRows = prometheus.NewGaugeVec( - prometheus.GaugeOpts{ + metricDCGMFIDevCorrectableRemappedRows = pkgmetrics.NewSettableCounterVec( + prometheus.CounterOpts{ Namespace: "", Subsystem: "", Name: "dcgm_fi_dev_correctable_remapped_rows", @@ -133,8 +133,8 @@ var ( []string{pkgmetrics.MetricComponentLabelKey, "uuid", "gpu"}, ).MustCurryWith(componentLabel) - metricDCGMFIDevECCSBEVolTotal = prometheus.NewGaugeVec( - prometheus.GaugeOpts{ + metricDCGMFIDevECCSBEVolTotal = pkgmetrics.NewSettableCounterVec( + prometheus.CounterOpts{ Namespace: "", Subsystem: "", Name: "dcgm_fi_dev_ecc_sbe_vol_total", @@ -143,8 +143,8 @@ var ( []string{pkgmetrics.MetricComponentLabelKey, "uuid", "gpu"}, ).MustCurryWith(componentLabel) - metricDCGMFIDevECCDBEVolTotal = prometheus.NewGaugeVec( - prometheus.GaugeOpts{ + metricDCGMFIDevECCDBEVolTotal = pkgmetrics.NewSettableCounterVec( + prometheus.CounterOpts{ Namespace: "", Subsystem: "", Name: "dcgm_fi_dev_ecc_dbe_vol_total", @@ -153,8 +153,8 @@ var ( []string{pkgmetrics.MetricComponentLabelKey, "uuid", "gpu"}, ).MustCurryWith(componentLabel) - metricDCGMFIDevECCSBEAggTotal = prometheus.NewGaugeVec( - prometheus.GaugeOpts{ + metricDCGMFIDevECCSBEAggTotal = pkgmetrics.NewSettableCounterVec( + prometheus.CounterOpts{ Namespace: "", Subsystem: "", Name: "dcgm_fi_dev_ecc_sbe_agg_total", @@ -163,8 +163,8 @@ var ( []string{pkgmetrics.MetricComponentLabelKey, "uuid", "gpu"}, ).MustCurryWith(componentLabel) - metricDCGMFIDevECCDBAggTotal = prometheus.NewGaugeVec( - prometheus.GaugeOpts{ + metricDCGMFIDevECCDBAggTotal = pkgmetrics.NewSettableCounterVec( + prometheus.CounterOpts{ Namespace: "", Subsystem: "", Name: "dcgm_fi_dev_ecc_dbe_agg_total", @@ -173,32 +173,32 @@ var ( []string{pkgmetrics.MetricComponentLabelKey, "uuid", "gpu"}, ).MustCurryWith(componentLabel) - metricDCGMFIDevECCSBEVolDev = prometheus.NewGaugeVec( - prometheus.GaugeOpts{ + metricDCGMFIDevECCSBEVolDev = pkgmetrics.NewSettableCounterVec( + prometheus.CounterOpts{ Name: "dcgm_fi_dev_ecc_sbe_vol_dev", Help: "Device memory single bit volatile ECC errors.", }, []string{pkgmetrics.MetricComponentLabelKey, "uuid", "gpu"}, ).MustCurryWith(componentLabel) - metricDCGMFIDevECCDBEVolDev = prometheus.NewGaugeVec( - prometheus.GaugeOpts{ + metricDCGMFIDevECCDBEVolDev = pkgmetrics.NewSettableCounterVec( + prometheus.CounterOpts{ Name: "dcgm_fi_dev_ecc_dbe_vol_dev", Help: "Device memory double bit volatile ECC errors.", }, []string{pkgmetrics.MetricComponentLabelKey, "uuid", "gpu"}, ).MustCurryWith(componentLabel) - metricDCGMFIDevECCSBEAggDev = prometheus.NewGaugeVec( - prometheus.GaugeOpts{ + metricDCGMFIDevECCSBEAggDev = pkgmetrics.NewSettableCounterVec( + prometheus.CounterOpts{ Name: "dcgm_fi_dev_ecc_sbe_agg_dev", Help: "Device memory single bit aggregate (persistent) ECC errors. Note: monotonically increasing.", }, []string{pkgmetrics.MetricComponentLabelKey, "uuid", "gpu"}, ).MustCurryWith(componentLabel) - metricDCGMFIDevECCDBEAggDev = prometheus.NewGaugeVec( - prometheus.GaugeOpts{ + metricDCGMFIDevECCDBEAggDev = pkgmetrics.NewSettableCounterVec( + prometheus.CounterOpts{ Name: "dcgm_fi_dev_ecc_dbe_agg_dev", Help: "Device memory double bit aggregate (persistent) ECC errors. Note: monotonically increasing.", }, diff --git a/third_party/fleet-intelligence-sdk/components/accelerator/nvidia/dcgm/nvlink/metrics.go b/third_party/fleet-intelligence-sdk/components/accelerator/nvidia/dcgm/nvlink/metrics.go index 4107f3f5..be4ed8b1 100644 --- a/third_party/fleet-intelligence-sdk/components/accelerator/nvidia/dcgm/nvlink/metrics.go +++ b/third_party/fleet-intelligence-sdk/components/accelerator/nvidia/dcgm/nvlink/metrics.go @@ -61,8 +61,8 @@ var ( []string{pkgmetrics.MetricComponentLabelKey, "uuid", "gpu"}, ).MustCurryWith(componentLabel) - metricDCGMFIDevNvlinkErrorDLCrc = prometheus.NewGaugeVec( - prometheus.GaugeOpts{ + metricDCGMFIDevNvlinkErrorDLCrc = pkgmetrics.NewSettableCounterVec( + prometheus.CounterOpts{ Namespace: "", Subsystem: "", Name: "dcgm_fi_dev_nvlink_error_dl_crc", @@ -71,8 +71,8 @@ var ( []string{pkgmetrics.MetricComponentLabelKey, "uuid", "gpu"}, ).MustCurryWith(componentLabel) - metricDCGMFIDevNvlinkErrorDLRecovery = prometheus.NewGaugeVec( - prometheus.GaugeOpts{ + metricDCGMFIDevNvlinkErrorDLRecovery = pkgmetrics.NewSettableCounterVec( + prometheus.CounterOpts{ Namespace: "", Subsystem: "", Name: "dcgm_fi_dev_nvlink_error_dl_recovery", @@ -81,8 +81,8 @@ var ( []string{pkgmetrics.MetricComponentLabelKey, "uuid", "gpu"}, ).MustCurryWith(componentLabel) - metricDCGMFIDevNvlinkErrorDLReplay = prometheus.NewGaugeVec( - prometheus.GaugeOpts{ + metricDCGMFIDevNvlinkErrorDLReplay = pkgmetrics.NewSettableCounterVec( + prometheus.CounterOpts{ Namespace: "", Subsystem: "", Name: "dcgm_fi_dev_nvlink_error_dl_replay", @@ -91,8 +91,8 @@ var ( []string{pkgmetrics.MetricComponentLabelKey, "uuid", "gpu"}, ).MustCurryWith(componentLabel) - metricDCGMFIDevNvlinkCountLinkRecoverySuccessfulEvents = prometheus.NewGaugeVec( - prometheus.GaugeOpts{ + metricDCGMFIDevNvlinkCountLinkRecoverySuccessfulEvents = pkgmetrics.NewSettableCounterVec( + prometheus.CounterOpts{ Namespace: "", Subsystem: "", Name: "dcgm_fi_dev_nvlink_count_link_recovery_successful_events", @@ -101,8 +101,8 @@ var ( []string{pkgmetrics.MetricComponentLabelKey, "uuid", "gpu"}, ).MustCurryWith(componentLabel) - metricDCGMFIDevNvlinkCountLinkRecoveryFailedEvents = prometheus.NewGaugeVec( - prometheus.GaugeOpts{ + metricDCGMFIDevNvlinkCountLinkRecoveryFailedEvents = pkgmetrics.NewSettableCounterVec( + prometheus.CounterOpts{ Namespace: "", Subsystem: "", Name: "dcgm_fi_dev_nvlink_count_link_recovery_failed_events", @@ -115,40 +115,40 @@ var ( prometheus.GaugeOpts{Name: "dcgm_fi_dev_fabric_manager_status", Help: "The status of the fabric manager - a value from dcgmFabricManagerStatus_t"}, []string{pkgmetrics.MetricComponentLabelKey, "uuid", "gpu"}, ).MustCurryWith(componentLabel) - metricDCGMFIDevC2CLinkErrorReplay = prometheus.NewGaugeVec( - prometheus.GaugeOpts{Name: "dcgm_fi_dev_c2c_link_error_replay", Help: "C2C Link Replay Error Counter"}, + metricDCGMFIDevC2CLinkErrorReplay = pkgmetrics.NewSettableCounterVec( + prometheus.CounterOpts{Name: "dcgm_fi_dev_c2c_link_error_replay", Help: "C2C Link Replay Error Counter"}, []string{pkgmetrics.MetricComponentLabelKey, "uuid", "gpu"}, ).MustCurryWith(componentLabel) - metricDCGMFIDevNvlinkCountRxGeneralErrors = prometheus.NewGaugeVec( - prometheus.GaugeOpts{Name: "dcgm_fi_dev_nvlink_count_rx_general_errors", Help: "Total number of packets Rx with header mismatch"}, + metricDCGMFIDevNvlinkCountRxGeneralErrors = pkgmetrics.NewSettableCounterVec( + prometheus.CounterOpts{Name: "dcgm_fi_dev_nvlink_count_rx_general_errors", Help: "Total number of packets Rx with header mismatch"}, []string{pkgmetrics.MetricComponentLabelKey, "uuid", "gpu"}, ).MustCurryWith(componentLabel) - metricDCGMFIDevNvlinkCountRxErrors = prometheus.NewGaugeVec( - prometheus.GaugeOpts{Name: "dcgm_fi_dev_nvlink_count_rx_errors", Help: "Total number of packets with errors Rx on a link"}, + metricDCGMFIDevNvlinkCountRxErrors = pkgmetrics.NewSettableCounterVec( + prometheus.CounterOpts{Name: "dcgm_fi_dev_nvlink_count_rx_errors", Help: "Total number of packets with errors Rx on a link"}, []string{pkgmetrics.MetricComponentLabelKey, "uuid", "gpu"}, ).MustCurryWith(componentLabel) - metricDCGMFIDevNvlinkCountRxMalformedPacketErrors = prometheus.NewGaugeVec( - prometheus.GaugeOpts{Name: "dcgm_fi_dev_nvlink_count_rx_malformed_packet_errors", Help: "Number of packets Rx on a link where packets are malformed"}, + metricDCGMFIDevNvlinkCountRxMalformedPacketErrors = pkgmetrics.NewSettableCounterVec( + prometheus.CounterOpts{Name: "dcgm_fi_dev_nvlink_count_rx_malformed_packet_errors", Help: "Number of packets Rx on a link where packets are malformed"}, []string{pkgmetrics.MetricComponentLabelKey, "uuid", "gpu"}, ).MustCurryWith(componentLabel) - metricDCGMFIDevNvlinkCountRxRemoteErrors = prometheus.NewGaugeVec( - prometheus.GaugeOpts{Name: "dcgm_fi_dev_nvlink_count_rx_remote_errors", Help: "Total number of packets Rx - stomp/EBP marker"}, + metricDCGMFIDevNvlinkCountRxRemoteErrors = pkgmetrics.NewSettableCounterVec( + prometheus.CounterOpts{Name: "dcgm_fi_dev_nvlink_count_rx_remote_errors", Help: "Total number of packets Rx - stomp/EBP marker"}, []string{pkgmetrics.MetricComponentLabelKey, "uuid", "gpu"}, ).MustCurryWith(componentLabel) - metricDCGMFIDevNvlinkCountRxSymbolErrors = prometheus.NewGaugeVec( - prometheus.GaugeOpts{Name: "dcgm_fi_dev_nvlink_count_rx_symbol_errors", Help: "Number of errors in rx symbols"}, + metricDCGMFIDevNvlinkCountRxSymbolErrors = pkgmetrics.NewSettableCounterVec( + prometheus.CounterOpts{Name: "dcgm_fi_dev_nvlink_count_rx_symbol_errors", Help: "Number of errors in rx symbols"}, []string{pkgmetrics.MetricComponentLabelKey, "uuid", "gpu"}, ).MustCurryWith(componentLabel) - metricDCGMFIDevNvlinkCountRxBufferOverrunErrors = prometheus.NewGaugeVec( - prometheus.GaugeOpts{Name: "dcgm_fi_dev_nvlink_count_rx_buffer_overrun_errors", Help: "Number of packets that were discarded on Rx due to buffer overrun"}, + metricDCGMFIDevNvlinkCountRxBufferOverrunErrors = pkgmetrics.NewSettableCounterVec( + prometheus.CounterOpts{Name: "dcgm_fi_dev_nvlink_count_rx_buffer_overrun_errors", Help: "Number of packets that were discarded on Rx due to buffer overrun"}, []string{pkgmetrics.MetricComponentLabelKey, "uuid", "gpu"}, ).MustCurryWith(componentLabel) - metricDCGMFIDevNvlinkCountLocalLinkIntegrityErrors = prometheus.NewGaugeVec( - prometheus.GaugeOpts{Name: "dcgm_fi_dev_nvlink_count_local_link_integrity_errors", Help: "Total number of times that the count of local errors exceeded a threshold"}, + metricDCGMFIDevNvlinkCountLocalLinkIntegrityErrors = pkgmetrics.NewSettableCounterVec( + prometheus.CounterOpts{Name: "dcgm_fi_dev_nvlink_count_local_link_integrity_errors", Help: "Total number of times that the count of local errors exceeded a threshold"}, []string{pkgmetrics.MetricComponentLabelKey, "uuid", "gpu"}, ).MustCurryWith(componentLabel) - metricDCGMFIDevNvlinkCountEffectiveErrors = prometheus.NewGaugeVec( - prometheus.GaugeOpts{Name: "dcgm_fi_dev_nvlink_count_effective_errors", Help: "Sum of the number of errors in each Nvlink packet"}, + metricDCGMFIDevNvlinkCountEffectiveErrors = pkgmetrics.NewSettableCounterVec( + prometheus.CounterOpts{Name: "dcgm_fi_dev_nvlink_count_effective_errors", Help: "Sum of the number of errors in each Nvlink packet"}, []string{pkgmetrics.MetricComponentLabelKey, "uuid", "gpu"}, ).MustCurryWith(componentLabel) metricDCGMFIDevNvlinkCountEffectiveBERFloat = prometheus.NewGaugeVec( @@ -159,8 +159,8 @@ var ( prometheus.GaugeOpts{Name: "dcgm_fi_dev_nvlink_count_symbol_ber_float", Help: "BER for symbol errors - decoded float value"}, []string{pkgmetrics.MetricComponentLabelKey, "uuid", "gpu"}, ).MustCurryWith(componentLabel) - metricDCGMFIDevNvlinkCountTxDiscards = prometheus.NewGaugeVec( - prometheus.GaugeOpts{Name: "dcgm_fi_dev_nvlink_count_tx_discards", Help: "Total number of tx error packets that were discarded"}, + metricDCGMFIDevNvlinkCountTxDiscards = pkgmetrics.NewSettableCounterVec( + prometheus.CounterOpts{Name: "dcgm_fi_dev_nvlink_count_tx_discards", Help: "Total number of tx error packets that were discarded"}, []string{pkgmetrics.MetricComponentLabelKey, "uuid", "gpu"}, ).MustCurryWith(componentLabel) ) diff --git a/third_party/fleet-intelligence-sdk/components/accelerator/nvidia/dcgm/pcie/metrics.go b/third_party/fleet-intelligence-sdk/components/accelerator/nvidia/dcgm/pcie/metrics.go index 6decc753..ebd2b8d3 100644 --- a/third_party/fleet-intelligence-sdk/components/accelerator/nvidia/dcgm/pcie/metrics.go +++ b/third_party/fleet-intelligence-sdk/components/accelerator/nvidia/dcgm/pcie/metrics.go @@ -33,8 +33,8 @@ var ( pkgmetrics.MetricComponentLabelKey: Name, } - metricDCGMFIDevPCIeReplayCounter = prometheus.NewGaugeVec( - prometheus.GaugeOpts{ + metricDCGMFIDevPCIeReplayCounter = pkgmetrics.NewSettableCounterVec( + prometheus.CounterOpts{ Namespace: "", Subsystem: "", Name: "dcgm_fi_dev_pcie_replay_counter", diff --git a/third_party/fleet-intelligence-sdk/components/accelerator/nvidia/dcgm/power/metrics.go b/third_party/fleet-intelligence-sdk/components/accelerator/nvidia/dcgm/power/metrics.go index edc79bd1..d386de03 100644 --- a/third_party/fleet-intelligence-sdk/components/accelerator/nvidia/dcgm/power/metrics.go +++ b/third_party/fleet-intelligence-sdk/components/accelerator/nvidia/dcgm/power/metrics.go @@ -57,18 +57,18 @@ var ( []string{pkgmetrics.MetricComponentLabelKey, "uuid", "gpu"}, ).MustCurryWith(componentLabel) - metricDCGMFIDevPowerViolation = prometheus.NewGaugeVec( - prometheus.GaugeOpts{Name: "dcgm_fi_dev_power_violation", Help: "Power Violation time in ns"}, + metricDCGMFIDevPowerViolation = pkgmetrics.NewSettableCounterVec( + prometheus.CounterOpts{Name: "dcgm_fi_dev_power_violation", Help: "Power Violation time in ns"}, []string{pkgmetrics.MetricComponentLabelKey, "uuid", "gpu"}, ).MustCurryWith(componentLabel) - metricDCGMFIDevReliabilityViolation = prometheus.NewGaugeVec( - prometheus.GaugeOpts{Name: "dcgm_fi_dev_reliability_violation", Help: "Reliability violation limit"}, + metricDCGMFIDevReliabilityViolation = pkgmetrics.NewSettableCounterVec( + prometheus.CounterOpts{Name: "dcgm_fi_dev_reliability_violation", Help: "Reliability violation limit"}, []string{pkgmetrics.MetricComponentLabelKey, "uuid", "gpu"}, ).MustCurryWith(componentLabel) - metricDCGMFIDevBoardLimitViolation = prometheus.NewGaugeVec( - prometheus.GaugeOpts{Name: "dcgm_fi_dev_board_limit_violation", Help: "Board violation limit"}, + metricDCGMFIDevBoardLimitViolation = pkgmetrics.NewSettableCounterVec( + prometheus.CounterOpts{Name: "dcgm_fi_dev_board_limit_violation", Help: "Board violation limit"}, []string{pkgmetrics.MetricComponentLabelKey, "uuid", "gpu"}, ).MustCurryWith(componentLabel) ) diff --git a/third_party/fleet-intelligence-sdk/components/accelerator/nvidia/dcgm/thermal/metrics.go b/third_party/fleet-intelligence-sdk/components/accelerator/nvidia/dcgm/thermal/metrics.go index d70a5492..c711ba8e 100644 --- a/third_party/fleet-intelligence-sdk/components/accelerator/nvidia/dcgm/thermal/metrics.go +++ b/third_party/fleet-intelligence-sdk/components/accelerator/nvidia/dcgm/thermal/metrics.go @@ -67,8 +67,8 @@ var ( []string{pkgmetrics.MetricComponentLabelKey, "uuid", "gpu"}, ).MustCurryWith(componentLabel) - metricDCGMFIDevThermalViolation = prometheus.NewGaugeVec( - prometheus.GaugeOpts{Name: "dcgm_fi_dev_thermal_violation", Help: "Thermal Violation time in ns"}, + metricDCGMFIDevThermalViolation = pkgmetrics.NewSettableCounterVec( + prometheus.CounterOpts{Name: "dcgm_fi_dev_thermal_violation", Help: "Thermal Violation time in ns"}, []string{pkgmetrics.MetricComponentLabelKey, "uuid", "gpu"}, ).MustCurryWith(componentLabel) @@ -76,7 +76,6 @@ var ( prometheus.GaugeOpts{Name: "dcgm_fi_dev_gpu_temp_limit", Help: "Thermal margin temperature (distance to nearest slowdown threshold) for this GPU"}, []string{pkgmetrics.MetricComponentLabelKey, "uuid", "gpu"}, ).MustCurryWith(componentLabel) - ) func init() { diff --git a/third_party/fleet-intelligence-sdk/components/accelerator/nvidia/infiniband/metrics.go b/third_party/fleet-intelligence-sdk/components/accelerator/nvidia/infiniband/metrics.go index f940fcc8..b65a5c28 100644 --- a/third_party/fleet-intelligence-sdk/components/accelerator/nvidia/infiniband/metrics.go +++ b/third_party/fleet-intelligence-sdk/components/accelerator/nvidia/infiniband/metrics.go @@ -16,8 +16,8 @@ var ( pkgmetrics.MetricComponentLabelKey: Name, } - metricIbLinkedDowned = prometheus.NewGaugeVec( - prometheus.GaugeOpts{ + metricIbLinkedDowned = pkgmetrics.NewSettableCounterVec( + prometheus.CounterOpts{ Namespace: "", Subsystem: SubSystem, Name: "link_downed", diff --git a/third_party/fleet-intelligence-sdk/components/network/ethernet/metrics.go b/third_party/fleet-intelligence-sdk/components/network/ethernet/metrics.go index 26f5f8e6..a95ce502 100644 --- a/third_party/fleet-intelligence-sdk/components/network/ethernet/metrics.go +++ b/third_party/fleet-intelligence-sdk/components/network/ethernet/metrics.go @@ -16,8 +16,8 @@ var ( pkgmetrics.MetricComponentLabelKey: Name, } - metricRxBytes = prometheus.NewGaugeVec( - prometheus.GaugeOpts{ + metricRxBytes = pkgmetrics.NewSettableCounterVec( + prometheus.CounterOpts{ Namespace: "", Subsystem: SubSystem, Name: "rx_bytes", @@ -26,8 +26,8 @@ var ( []string{pkgmetrics.MetricComponentLabelKey, "interface"}, ).MustCurryWith(componentLabel) - metricRxPackets = prometheus.NewGaugeVec( - prometheus.GaugeOpts{ + metricRxPackets = pkgmetrics.NewSettableCounterVec( + prometheus.CounterOpts{ Namespace: "", Subsystem: SubSystem, Name: "rx_packets", @@ -36,8 +36,8 @@ var ( []string{pkgmetrics.MetricComponentLabelKey, "interface"}, ).MustCurryWith(componentLabel) - metricRxErrors = prometheus.NewGaugeVec( - prometheus.GaugeOpts{ + metricRxErrors = pkgmetrics.NewSettableCounterVec( + prometheus.CounterOpts{ Namespace: "", Subsystem: SubSystem, Name: "rx_errors", @@ -46,8 +46,8 @@ var ( []string{pkgmetrics.MetricComponentLabelKey, "interface"}, ).MustCurryWith(componentLabel) - metricRxDropped = prometheus.NewGaugeVec( - prometheus.GaugeOpts{ + metricRxDropped = pkgmetrics.NewSettableCounterVec( + prometheus.CounterOpts{ Namespace: "", Subsystem: SubSystem, Name: "rx_dropped", @@ -56,8 +56,8 @@ var ( []string{pkgmetrics.MetricComponentLabelKey, "interface"}, ).MustCurryWith(componentLabel) - metricTxBytes = prometheus.NewGaugeVec( - prometheus.GaugeOpts{ + metricTxBytes = pkgmetrics.NewSettableCounterVec( + prometheus.CounterOpts{ Namespace: "", Subsystem: SubSystem, Name: "tx_bytes", @@ -66,8 +66,8 @@ var ( []string{pkgmetrics.MetricComponentLabelKey, "interface"}, ).MustCurryWith(componentLabel) - metricTxPackets = prometheus.NewGaugeVec( - prometheus.GaugeOpts{ + metricTxPackets = pkgmetrics.NewSettableCounterVec( + prometheus.CounterOpts{ Namespace: "", Subsystem: SubSystem, Name: "tx_packets", @@ -76,8 +76,8 @@ var ( []string{pkgmetrics.MetricComponentLabelKey, "interface"}, ).MustCurryWith(componentLabel) - metricTxErrors = prometheus.NewGaugeVec( - prometheus.GaugeOpts{ + metricTxErrors = pkgmetrics.NewSettableCounterVec( + prometheus.CounterOpts{ Namespace: "", Subsystem: SubSystem, Name: "tx_errors", @@ -86,8 +86,8 @@ var ( []string{pkgmetrics.MetricComponentLabelKey, "interface"}, ).MustCurryWith(componentLabel) - metricTxDropped = prometheus.NewGaugeVec( - prometheus.GaugeOpts{ + metricTxDropped = pkgmetrics.NewSettableCounterVec( + prometheus.CounterOpts{ Namespace: "", Subsystem: SubSystem, Name: "tx_dropped", diff --git a/third_party/fleet-intelligence-sdk/pkg/metrics/scraper/prometheus.go b/third_party/fleet-intelligence-sdk/pkg/metrics/scraper/prometheus.go index 6236367f..be6e7886 100644 --- a/third_party/fleet-intelligence-sdk/pkg/metrics/scraper/prometheus.go +++ b/third_party/fleet-intelligence-sdk/pkg/metrics/scraper/prometheus.go @@ -71,8 +71,10 @@ func (s *promScraper) Scrape(_ context.Context) (pkgmetrics.Metrics, error) { // for now, only support counter and gauge switch { case mtRaw.GetCounter() != nil: + m.Type = pkgmetrics.MetricTypeCounter m.Value = mtRaw.GetCounter().GetValue() case mtRaw.GetGauge() != nil: + m.Type = pkgmetrics.MetricTypeGauge m.Value = mtRaw.GetGauge().GetValue() } diff --git a/third_party/fleet-intelligence-sdk/pkg/metrics/scraper/prometheus_test.go b/third_party/fleet-intelligence-sdk/pkg/metrics/scraper/prometheus_test.go index 5ccd49f0..13b37f08 100644 --- a/third_party/fleet-intelligence-sdk/pkg/metrics/scraper/prometheus_test.go +++ b/third_party/fleet-intelligence-sdk/pkg/metrics/scraper/prometheus_test.go @@ -89,16 +89,19 @@ func TestPrometheusScraper(t *testing.T) { require.Equal(t, "component-1", ms[0].Component) require.Equal(t, "sqlite_insert_update_total", ms[0].Name) + require.Equal(t, pkgmetrics.MetricTypeCounter, ms[0].Type) require.Nil(t, ms[0].Labels, "Expected no labels for the first metric") require.Equal(t, float64(1), ms[0].Value) require.Equal(t, "component-1", ms[1].Component) require.Equal(t, "test_current_celsius", ms[1].Name) + require.Equal(t, pkgmetrics.MetricTypeGauge, ms[1].Type) require.Equal(t, "GPU-0", ms[1].Labels["label_uuid"]) require.Equal(t, float64(100), ms[1].Value) require.Equal(t, "component-1", ms[2].Component) require.Equal(t, "test_slowdown_used_percent", ms[2].Name) + require.Equal(t, pkgmetrics.MetricTypeGauge, ms[2].Type) require.Equal(t, "GPU-0", ms[2].Labels["label_uuid"]) require.Equal(t, float64(98), ms[2].Value) } @@ -263,10 +266,12 @@ func TestPrometheusScraper_MultipleMetricTypes(t *testing.T) { for _, m := range ms { if m.Name == "test_operations_total" && m.Component == "component1" { foundCounter = true + require.Equal(t, pkgmetrics.MetricTypeCounter, m.Type) require.Equal(t, float64(1), m.Value) } if m.Name == "test_resources_utilization" && m.Component == "component1" { foundGauge = true + require.Equal(t, pkgmetrics.MetricTypeGauge, m.Type) if labels := m.Labels; labels != nil { require.Equal(t, "resource1", labels["label_uuid"]) } diff --git a/third_party/fleet-intelligence-sdk/pkg/metrics/settable_const_metric.go b/third_party/fleet-intelligence-sdk/pkg/metrics/settable_const_metric.go new file mode 100644 index 00000000..09972590 --- /dev/null +++ b/third_party/fleet-intelligence-sdk/pkg/metrics/settable_const_metric.go @@ -0,0 +1,217 @@ +// Copyright 2024 Lepton AI Inc +// Source: https://github.com/leptonai/gpud + +package metrics + +import ( + "fmt" + "sort" + "strings" + "sync" + + "github.com/prometheus/client_golang/prometheus" +) + +// SettableConstMetricVec stores externally-collected absolute values and emits +// them as constant Prometheus metrics with the configured ValueType. +type SettableConstMetricVec struct { + opts prometheus.Opts + desc *prometheus.Desc + valueType prometheus.ValueType + variableLabels []string + constLabels prometheus.Labels + + mu sync.RWMutex + samples map[string]settableConstMetricSample +} + +type settableConstMetricSample struct { + labelValues []string + value float64 +} + +// NewSettableCounterVec creates a collector for counters whose absolute values +// are read from an external source. +func NewSettableCounterVec(opts prometheus.CounterOpts, variableLabels []string) *SettableConstMetricVec { + return newSettableConstMetricVec(prometheus.Opts(opts), prometheus.CounterValue, variableLabels, nil) +} + +func newSettableConstMetricVec(opts prometheus.Opts, valueType prometheus.ValueType, variableLabels []string, constLabels prometheus.Labels) *SettableConstMetricVec { + copiedVariableLabels := append([]string(nil), variableLabels...) + copiedConstLabels := mergeLabels(opts.ConstLabels, constLabels) + + return &SettableConstMetricVec{ + opts: opts, + desc: newConstMetricDesc(opts, copiedVariableLabels, copiedConstLabels), + valueType: valueType, + variableLabels: copiedVariableLabels, + constLabels: copiedConstLabels, + samples: make(map[string]settableConstMetricSample), + } +} + +// MustCurryWith returns a collector with the provided labels emitted as constant +// labels. It panics if any provided label is not one of the variable labels. +func (v *SettableConstMetricVec) MustCurryWith(labels prometheus.Labels) *SettableConstMetricVec { + if v == nil { + panic("nil SettableConstMetricVec") + } + + curriedLabels := copyLabels(v.constLabels) + if curriedLabels == nil { + curriedLabels = make(prometheus.Labels, len(labels)) + } + for name, value := range labels { + if !containsLabel(v.variableLabels, name) { + panic(fmt.Sprintf("unknown label %q", name)) + } + curriedLabels[name] = value + } + + var remainingLabels []string + for _, name := range v.variableLabels { + if _, ok := labels[name]; ok { + continue + } + remainingLabels = append(remainingLabels, name) + } + + return newSettableConstMetricVec(v.opts, v.valueType, remainingLabels, curriedLabels) +} + +// With returns a handle for setting one labeled metric sample. +func (v *SettableConstMetricVec) With(labels prometheus.Labels) *SettableConstMetric { + if v == nil { + panic("nil SettableConstMetricVec") + } + + labelValues := make([]string, 0, len(v.variableLabels)) + for _, name := range v.variableLabels { + value, ok := labels[name] + if !ok { + panic(fmt.Sprintf("missing label %q", name)) + } + labelValues = append(labelValues, value) + } + for name := range labels { + if !containsLabel(v.variableLabels, name) { + panic(fmt.Sprintf("unknown label %q", name)) + } + } + + return &SettableConstMetric{ + vec: v, + labelValues: labelValues, + key: strings.Join(labelValues, "\xff"), + } +} + +// Delete removes the sample for the provided label set. +func (v *SettableConstMetricVec) Delete(labels prometheus.Labels) bool { + metric := v.With(labels) + + v.mu.Lock() + defer v.mu.Unlock() + + if _, ok := v.samples[metric.key]; !ok { + return false + } + delete(v.samples, metric.key) + return true +} + +// Reset removes all samples from the collector. +func (v *SettableConstMetricVec) Reset() { + v.mu.Lock() + defer v.mu.Unlock() + v.samples = make(map[string]settableConstMetricSample) +} + +// Describe implements prometheus.Collector. +func (v *SettableConstMetricVec) Describe(ch chan<- *prometheus.Desc) { + ch <- v.desc +} + +// Collect implements prometheus.Collector. +func (v *SettableConstMetricVec) Collect(ch chan<- prometheus.Metric) { + v.mu.RLock() + keys := make([]string, 0, len(v.samples)) + for key := range v.samples { + keys = append(keys, key) + } + sort.Strings(keys) + + samples := make([]settableConstMetricSample, 0, len(keys)) + for _, key := range keys { + samples = append(samples, v.samples[key]) + } + v.mu.RUnlock() + + for _, sample := range samples { + metric, err := prometheus.NewConstMetric(v.desc, v.valueType, sample.value, sample.labelValues...) + if err != nil { + ch <- prometheus.NewInvalidMetric(v.desc, err) + continue + } + ch <- metric + } +} + +// SettableConstMetric is a handle for one labeled sample. +type SettableConstMetric struct { + vec *SettableConstMetricVec + labelValues []string + key string +} + +// Set stores the current absolute value for this sample. +func (m *SettableConstMetric) Set(value float64) { + m.vec.mu.Lock() + defer m.vec.mu.Unlock() + m.vec.samples[m.key] = settableConstMetricSample{ + labelValues: append([]string(nil), m.labelValues...), + value: value, + } +} + +func newConstMetricDesc(opts prometheus.Opts, variableLabels []string, constLabels prometheus.Labels) *prometheus.Desc { + return prometheus.NewDesc( + prometheus.BuildFQName(opts.Namespace, opts.Subsystem, opts.Name), + opts.Help, + variableLabels, + constLabels, + ) +} + +func containsLabel(labels []string, name string) bool { + for _, label := range labels { + if label == name { + return true + } + } + return false +} + +func copyLabels(labels prometheus.Labels) prometheus.Labels { + if labels == nil { + return nil + } + copied := make(prometheus.Labels, len(labels)) + for name, value := range labels { + copied[name] = value + } + return copied +} + +func mergeLabels(labelSets ...prometheus.Labels) prometheus.Labels { + var merged prometheus.Labels + for _, labels := range labelSets { + for name, value := range labels { + if merged == nil { + merged = make(prometheus.Labels) + } + merged[name] = value + } + } + return merged +} diff --git a/third_party/fleet-intelligence-sdk/pkg/metrics/settable_const_metric_test.go b/third_party/fleet-intelligence-sdk/pkg/metrics/settable_const_metric_test.go new file mode 100644 index 00000000..b5434a38 --- /dev/null +++ b/third_party/fleet-intelligence-sdk/pkg/metrics/settable_const_metric_test.go @@ -0,0 +1,79 @@ +// Copyright 2024 Lepton AI Inc +// Source: https://github.com/leptonai/gpud + +package metrics + +import ( + "testing" + + "github.com/prometheus/client_golang/prometheus" + dto "github.com/prometheus/client_model/go" +) + +func TestSettableCounterVecEmitsCounterMetric(t *testing.T) { + registry := prometheus.NewRegistry() + counter := NewSettableCounterVec( + prometheus.CounterOpts{ + Name: "external_absolute_total", + Help: "external absolute counter", + }, + []string{MetricComponentLabelKey, "device"}, + ).MustCurryWith(prometheus.Labels{MetricComponentLabelKey: "component1"}) + + if err := registry.Register(counter); err != nil { + t.Fatalf("Register() failed: %v", err) + } + + counter.With(prometheus.Labels{"device": "gpu0"}).Set(42) + + families, err := registry.Gather() + if err != nil { + t.Fatalf("Gather() failed: %v", err) + } + + family := findMetricFamily(families, "external_absolute_total") + if family == nil { + t.Fatalf("metric family was not gathered") + } + if family.GetType() != dto.MetricType_COUNTER { + t.Fatalf("metric type = %v, want %v", family.GetType(), dto.MetricType_COUNTER) + } + if got := family.GetMetric()[0].GetCounter().GetValue(); got != 42 { + t.Fatalf("counter value = %v, want 42", got) + } +} + +func TestSettableCounterVecResetAndDelete(t *testing.T) { + counter := NewSettableCounterVec( + prometheus.CounterOpts{ + Name: "resettable_external_absolute_total", + Help: "external absolute counter", + }, + []string{"device"}, + ) + + labels := prometheus.Labels{"device": "gpu0"} + counter.With(labels).Set(42) + + if !counter.Delete(labels) { + t.Fatalf("Delete() = false, want true") + } + if counter.Delete(labels) { + t.Fatalf("Delete() after delete = true, want false") + } + + counter.With(labels).Set(43) + counter.Reset() + if counter.Delete(labels) { + t.Fatalf("Delete() after reset = true, want false") + } +} + +func findMetricFamily(families []*dto.MetricFamily, name string) *dto.MetricFamily { + for _, family := range families { + if family.GetName() == name { + return family + } + } + return nil +} diff --git a/third_party/fleet-intelligence-sdk/pkg/metrics/store/sqlite.go b/third_party/fleet-intelligence-sdk/pkg/metrics/store/sqlite.go index fc378219..b8c67363 100644 --- a/third_party/fleet-intelligence-sdk/pkg/metrics/store/sqlite.go +++ b/third_party/fleet-intelligence-sdk/pkg/metrics/store/sqlite.go @@ -43,6 +43,9 @@ const ( // columnMetricValue represents the numeric value of the metric. columnMetricValue = "metric_value" + + // columnMetricType represents the Prometheus metric type. + columnMetricType = "metric_type" ) // TODO: drop the old table "gpud_metrics" @@ -99,13 +102,17 @@ CREATE TABLE IF NOT EXISTS %s ( %s TEXT NOT NULL, %s TEXT, %s REAL NOT NULL, + %s TEXT NOT NULL DEFAULT '%s', PRIMARY KEY (%s, %s, %s, %s) ) WITHOUT ROWID;`, table, - columnUnixMilliseconds, columnComponentName, columnMetricName, columnMetricLabels, columnMetricValue, // columns + columnUnixMilliseconds, columnComponentName, columnMetricName, columnMetricLabels, columnMetricValue, columnMetricType, pkgmetrics.MetricTypeGauge, // columns columnUnixMilliseconds, columnComponentName, columnMetricName, columnMetricLabels, // primary keys )) - return err + if err != nil { + return err + } + return ensureMetricTypeColumn(ctx, dbRW, table) } func insert(ctx context.Context, dbRW *sql.DB, table string, ms ...pkgmetrics.Metric) error { @@ -129,23 +136,24 @@ func insert(ctx context.Context, dbRW *sql.DB, table string, ms ...pkgmetrics.Me // Build the query with placeholders for all metrics query := fmt.Sprintf( - "INSERT OR REPLACE INTO %s (%s, %s, %s, %s, %s) VALUES ", + "INSERT OR REPLACE INTO %s (%s, %s, %s, %s, %s, %s) VALUES ", table, columnUnixMilliseconds, columnComponentName, columnMetricName, columnMetricLabels, columnMetricValue, + columnMetricType, ) // Create proper placeholders with commas between value sets placeholders := make([]string, len(ms)) for i := range placeholders { - placeholders[i] = "(?, ?, ?, ?, ?)" + placeholders[i] = "(?, ?, ?, ?, ?, ?)" } query += strings.Join(placeholders, ", ") - args := make([]interface{}, 0, len(ms)*5) + args := make([]interface{}, 0, len(ms)*6) for _, m := range ms { labels := "" if len(m.Labels) > 0 { @@ -155,7 +163,11 @@ func insert(ctx context.Context, dbRW *sql.DB, table string, ms ...pkgmetrics.Me } labels = string(b) } - args = append(args, m.UnixMilliseconds, m.Component, m.Name, labels, m.Value) + metricType := m.Type + if metricType == "" { + metricType = pkgmetrics.MetricTypeGauge + } + args = append(args, m.UnixMilliseconds, m.Component, m.Name, labels, m.Value, metricType) } log.Logger.Infow("inserting metrics", "metrics", len(ms)) @@ -205,7 +217,7 @@ func read(ctx context.Context, dbRO *sql.DB, table string, opts ...pkgmetrics.Op whereStatement = fmt.Sprintf("WHERE %s", whereStatement) } - query := fmt.Sprintf(`SELECT %s, %s, %s, %s, %s + query := fmt.Sprintf(`SELECT %s, %s, %s, %s, %s, %s FROM %s `, columnUnixMilliseconds, @@ -213,6 +225,7 @@ FROM %s columnMetricName, columnMetricLabels, columnMetricValue, + columnMetricType, table, ) if whereStatement != "" { @@ -240,9 +253,12 @@ FROM %s for queryRows.Next() { m := pkgmetrics.Metric{} var labels sql.NullString - if err := queryRows.Scan(&m.UnixMilliseconds, &m.Component, &m.Name, &labels, &m.Value); err != nil { + if err := queryRows.Scan(&m.UnixMilliseconds, &m.Component, &m.Name, &labels, &m.Value, &m.Type); err != nil { return nil, err } + if m.Type == "" { + m.Type = pkgmetrics.MetricTypeGauge + } if labels.Valid && labels.String != "" { lm := make(map[string]string, 0) if err := json.Unmarshal([]byte(labels.String), &lm); err != nil { @@ -258,6 +274,41 @@ FROM %s return rows, nil } +func ensureMetricTypeColumn(ctx context.Context, dbRW *sql.DB, table string) error { + rows, err := dbRW.QueryContext(ctx, fmt.Sprintf("PRAGMA table_info(%s);", table)) + if err != nil { + return err + } + defer func() { + _ = rows.Close() + }() + + for rows.Next() { + var cid int + var name, columnType string + var notNull int + var defaultValue sql.NullString + var pk int + if err := rows.Scan(&cid, &name, &columnType, ¬Null, &defaultValue, &pk); err != nil { + return err + } + if name == columnMetricType { + return nil + } + } + if err := rows.Err(); err != nil { + return err + } + + _, err = dbRW.ExecContext(ctx, fmt.Sprintf( + "ALTER TABLE %s ADD COLUMN %s TEXT NOT NULL DEFAULT '%s';", + table, + columnMetricType, + pkgmetrics.MetricTypeGauge, + )) + return err +} + // purge purges the data for the corresponding component that is older // than the given time. func purge(ctx context.Context, dbRW *sql.DB, table string, before time.Time) (int, error) { diff --git a/third_party/fleet-intelligence-sdk/pkg/metrics/store/sqlite_test.go b/third_party/fleet-intelligence-sdk/pkg/metrics/store/sqlite_test.go index 90593f35..68699916 100644 --- a/third_party/fleet-intelligence-sdk/pkg/metrics/store/sqlite_test.go +++ b/third_party/fleet-intelligence-sdk/pkg/metrics/store/sqlite_test.go @@ -34,6 +34,45 @@ func TestSQLiteNewStore(t *testing.T) { assert.Equal(t, ErrEmptyTableName, err) } +func TestSQLiteNewStore_AddsMetricTypeColumnToExistingTable(t *testing.T) { + dbRW, dbRO, cleanup := pkgsqlite.OpenTestDB(t) + defer cleanup() + + ctx := context.Background() + table := "test_metrics_legacy" + _, err := dbRW.ExecContext(ctx, fmt.Sprintf(` +CREATE TABLE %s ( + %s INTEGER NOT NULL, + %s TEXT NOT NULL, + %s TEXT NOT NULL, + %s TEXT, + %s REAL NOT NULL, + PRIMARY KEY (%s, %s, %s, %s) +) WITHOUT ROWID;`, + table, + columnUnixMilliseconds, columnComponentName, columnMetricName, columnMetricLabels, columnMetricValue, + columnUnixMilliseconds, columnComponentName, columnMetricName, columnMetricLabels, + )) + require.NoError(t, err) + + store, err := NewSQLiteStore(ctx, dbRW, dbRO, table) + require.NoError(t, err) + + err = store.Record(ctx, pkgmetrics.Metric{ + UnixMilliseconds: time.Now().UnixMilli(), + Component: "test-component", + Name: "counter_metric", + Type: pkgmetrics.MetricTypeCounter, + Value: 10, + }) + require.NoError(t, err) + + results, err := store.Read(ctx) + require.NoError(t, err) + require.Len(t, results, 1) + assert.Equal(t, pkgmetrics.MetricTypeCounter, results[0].Type) +} + func TestSQLiteStore_Record(t *testing.T) { // Setup test database dbRW, dbRO, cleanup := pkgsqlite.OpenTestDB(t) @@ -52,6 +91,7 @@ func TestSQLiteStore_Record(t *testing.T) { UnixMilliseconds: now, Component: "test-component", Name: "metric1", + Type: pkgmetrics.MetricTypeCounter, Value: 42.0, }, { @@ -90,6 +130,7 @@ func TestSQLiteStore_Record(t *testing.T) { UnixMilliseconds: now, Component: "test-component", Name: "metric1", + Type: pkgmetrics.MetricTypeCounter, Value: 99.9, } err = store.Record(ctx, updatedMetric) @@ -102,8 +143,12 @@ func TestSQLiteStore_Record(t *testing.T) { for _, m := range results { if m.Component == "test-component" && m.Name == "metric1" { assert.Equal(t, 99.9, m.Value) + assert.Equal(t, pkgmetrics.MetricTypeCounter, m.Type) found = true } + if m.Component == "test-component" && m.Name == "metric2" { + assert.Equal(t, pkgmetrics.MetricTypeGauge, m.Type) + } } assert.True(t, found, "Updated metric not found in results") } diff --git a/third_party/fleet-intelligence-sdk/pkg/metrics/types.go b/third_party/fleet-intelligence-sdk/pkg/metrics/types.go index f04ba4d7..759efc40 100644 --- a/third_party/fleet-intelligence-sdk/pkg/metrics/types.go +++ b/third_party/fleet-intelligence-sdk/pkg/metrics/types.go @@ -11,6 +11,13 @@ import ( // MetricComponentLabelKey is the key for the component of the metric. const MetricComponentLabelKey = "gpud_component" +type MetricType string + +const ( + MetricTypeGauge MetricType = "gauge" + MetricTypeCounter MetricType = "counter" +) + // Metric represents a metric row in the database table. type Metric struct { // UnixMilliseconds represents the Unix timestamp of the metric. @@ -19,6 +26,8 @@ type Metric struct { Component string `json:"component"` // Name represents the name of the metric. Name string `json:"name"` + // Type represents the Prometheus metric type. + Type MetricType `json:"type,omitempty"` // Value represents the numeric value of the metric. Value float64 `json:"value"` From 0b61d83e870686ddb927aae8b00ffb9f15b268db Mon Sep 17 00:00:00 2001 From: Jingxiang Zhang Date: Tue, 19 May 2026 15:30:04 -0700 Subject: [PATCH 2/3] fix: omit OTLP metric units Signed-off-by: Jingxiang Zhang --- internal/exporter/converter/otlp.go | 3 --- internal/exporter/converter/otlp_test.go | 4 +++- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/internal/exporter/converter/otlp.go b/internal/exporter/converter/otlp.go index 5ddde529..ba5e512f 100644 --- a/internal/exporter/converter/otlp.go +++ b/internal/exporter/converter/otlp.go @@ -161,7 +161,6 @@ func (c *otlpConverter) convertMetricsToOTLP(data *collector.HealthData) []*metr summaryMetric := &metricsv1.Metric{ Name: "fleetint_agent_collection_summary", Description: "Summary of Fleet Intelligence data collection including counts of metrics, events, and components", - Unit: "1", Data: &metricsv1.Metric_Gauge{ Gauge: &metricsv1.Gauge{ DataPoints: []*metricsv1.NumberDataPoint{ @@ -200,7 +199,6 @@ func (c *otlpConverter) convertMetricsToOTLP(data *collector.HealthData) []*metr upMetric := &metricsv1.Metric{ Name: "fleetint_agent_up", Description: "Fleet Intelligence agent liveness. A value of 1 indicates the agent was running when telemetry was exported.", - Unit: "1", Data: &metricsv1.Metric_Gauge{ Gauge: &metricsv1.Gauge{ DataPoints: []*metricsv1.NumberDataPoint{ @@ -231,7 +229,6 @@ func (c *otlpConverter) convertMetricToOTLP(metric pkgmetrics.Metric, gpuUUIDToI otlpMetric := &metricsv1.Metric{ Name: metric.Name, Description: fmt.Sprintf("Metric from component %s", metric.Component), - Unit: "1", } if metric.Type == pkgmetrics.MetricTypeCounter { diff --git a/internal/exporter/converter/otlp_test.go b/internal/exporter/converter/otlp_test.go index 6b960483..80588680 100644 --- a/internal/exporter/converter/otlp_test.go +++ b/internal/exporter/converter/otlp_test.go @@ -129,6 +129,7 @@ func TestOTLPConverter_Convert_CounterMetricsBecomeCumulativeSums(t *testing.T) convertedMetrics := otlpData.Metrics.ResourceMetrics[0].ScopeMetrics[0].Metrics counterMetric := findOTLPMetric(convertedMetrics, "dcgm_fi_dev_pcie_replay_counter") require.NotNil(t, counterMetric) + assert.Empty(t, counterMetric.Unit) sum := counterMetric.GetSum() require.NotNil(t, sum) assert.True(t, sum.IsMonotonic) @@ -138,6 +139,7 @@ func TestOTLPConverter_Convert_CounterMetricsBecomeCumulativeSums(t *testing.T) gaugeMetric := findOTLPMetric(convertedMetrics, "dcgm_fi_dev_gpu_temp") require.NotNil(t, gaugeMetric) + assert.Empty(t, gaugeMetric.Unit) gauge := gaugeMetric.GetGauge() require.NotNil(t, gauge) require.Len(t, gauge.DataPoints, 1) @@ -778,7 +780,7 @@ func TestOTLPConverter_UpMetric(t *testing.T) { } require.NotNil(t, upMetric, "Should have fleetint_agent_up metric") - assert.Equal(t, "1", upMetric.Unit) + assert.Empty(t, upMetric.Unit) assert.Contains(t, upMetric.Description, "liveness") gauge := upMetric.Data.(*metricsv1.Metric_Gauge).Gauge From 18d01251b97b675de951c7da89412921f51f04a3 Mon Sep 17 00:00:00 2001 From: Jingxiang Zhang Date: Wed, 20 May 2026 10:54:18 -0700 Subject: [PATCH 3/3] fix: address metric review feedback Signed-off-by: Jingxiang Zhang --- .../pkg/metrics/scraper/prometheus.go | 2 + .../pkg/metrics/scraper/prometheus_test.go | 49 +++++++++++++++++-- .../pkg/metrics/store/sqlite.go | 22 ++++++++- .../pkg/metrics/store/sqlite_test.go | 10 ++++ 4 files changed, 78 insertions(+), 5 deletions(-) diff --git a/third_party/fleet-intelligence-sdk/pkg/metrics/scraper/prometheus.go b/third_party/fleet-intelligence-sdk/pkg/metrics/scraper/prometheus.go index be6e7886..96cca597 100644 --- a/third_party/fleet-intelligence-sdk/pkg/metrics/scraper/prometheus.go +++ b/third_party/fleet-intelligence-sdk/pkg/metrics/scraper/prometheus.go @@ -76,6 +76,8 @@ func (s *promScraper) Scrape(_ context.Context) (pkgmetrics.Metrics, error) { case mtRaw.GetGauge() != nil: m.Type = pkgmetrics.MetricTypeGauge m.Value = mtRaw.GetGauge().GetValue() + default: + continue } ms = append(ms, m) diff --git a/third_party/fleet-intelligence-sdk/pkg/metrics/scraper/prometheus_test.go b/third_party/fleet-intelligence-sdk/pkg/metrics/scraper/prometheus_test.go index 13b37f08..577af352 100644 --- a/third_party/fleet-intelligence-sdk/pkg/metrics/scraper/prometheus_test.go +++ b/third_party/fleet-intelligence-sdk/pkg/metrics/scraper/prometheus_test.go @@ -115,6 +115,14 @@ func (m *mockGathererWithError) Gather() ([]*dto.MetricFamily, error) { return nil, m.err } +type mockGatherer struct { + metricFamilies []*dto.MetricFamily +} + +func (m *mockGatherer) Gather() ([]*dto.MetricFamily, error) { + return m.metricFamilies, nil +} + func TestPrometheusScraper_GatherError(t *testing.T) { t.Parallel() @@ -137,6 +145,39 @@ func TestPrometheusScraper_GatherError(t *testing.T) { require.Nil(t, metrics) } +func TestPrometheusScraper_SkipsUnsupportedMetricKinds(t *testing.T) { + t.Parallel() + + scraper := &promScraper{ + gatherer: &mockGatherer{ + metricFamilies: []*dto.MetricFamily{ + { + Name: stringPtr("test_summary"), + Type: dto.MetricType_SUMMARY.Enum(), + Metric: []*dto.Metric{ + { + Label: []*dto.LabelPair{ + { + Name: stringPtr(pkgmetrics.MetricComponentLabelKey), + Value: stringPtr("component-1"), + }, + }, + Summary: &dto.Summary{}, + }, + }, + }, + }, + }, + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + metrics, err := scraper.Scrape(ctx) + require.NoError(t, err) + require.Empty(t, metrics) +} + func TestPrometheusScraper_NilGatherer(t *testing.T) { t.Parallel() @@ -154,6 +195,10 @@ func TestPrometheusScraper_NilGatherer(t *testing.T) { require.Nil(t, metrics) } +func stringPtr(s string) *string { + return &s +} + func TestPrometheusScraper_NilScraper(t *testing.T) { t.Parallel() @@ -257,9 +302,7 @@ func TestPrometheusScraper_MultipleMetricTypes(t *testing.T) { require.NoError(t, err) require.NotEmpty(t, ms) - // Should have more than 4 metrics because histograms and summaries - // generate multiple underlying metrics - require.True(t, len(ms) >= 4, "Expected at least 4 metrics, got %d", len(ms)) + require.Len(t, ms, 2, "Only counter and gauge metrics should be returned") // Verify the counter and gauge are included var foundCounter, foundGauge bool diff --git a/third_party/fleet-intelligence-sdk/pkg/metrics/store/sqlite.go b/third_party/fleet-intelligence-sdk/pkg/metrics/store/sqlite.go index b8c67363..d0518e73 100644 --- a/third_party/fleet-intelligence-sdk/pkg/metrics/store/sqlite.go +++ b/third_party/fleet-intelligence-sdk/pkg/metrics/store/sqlite.go @@ -10,6 +10,7 @@ import ( "encoding/json" "errors" "fmt" + "regexp" "strings" "time" @@ -61,6 +62,8 @@ var ( var _ pkgmetrics.Store = &sqliteStore{} +var sqliteIdentifierRE = regexp.MustCompile(`^[A-Za-z_][A-Za-z0-9_]*$`) + type sqliteStore struct { dbRW *sql.DB dbRO *sql.DB @@ -275,7 +278,12 @@ FROM %s } func ensureMetricTypeColumn(ctx context.Context, dbRW *sql.DB, table string) error { - rows, err := dbRW.QueryContext(ctx, fmt.Sprintf("PRAGMA table_info(%s);", table)) + quotedTable, err := quoteSQLiteIdentifier(table) + if err != nil { + return err + } + + rows, err := dbRW.QueryContext(ctx, fmt.Sprintf("PRAGMA table_info(%s);", quotedTable)) if err != nil { return err } @@ -302,13 +310,23 @@ func ensureMetricTypeColumn(ctx context.Context, dbRW *sql.DB, table string) err _, err = dbRW.ExecContext(ctx, fmt.Sprintf( "ALTER TABLE %s ADD COLUMN %s TEXT NOT NULL DEFAULT '%s';", - table, + quotedTable, columnMetricType, pkgmetrics.MetricTypeGauge, )) return err } +func quoteSQLiteIdentifier(name string) (string, error) { + if name == "" { + return "", ErrEmptyTableName + } + if !sqliteIdentifierRE.MatchString(name) { + return "", fmt.Errorf("invalid sqlite identifier %q", name) + } + return `"` + name + `"`, nil +} + // purge purges the data for the corresponding component that is older // than the given time. func purge(ctx context.Context, dbRW *sql.DB, table string, before time.Time) (int, error) { diff --git a/third_party/fleet-intelligence-sdk/pkg/metrics/store/sqlite_test.go b/third_party/fleet-intelligence-sdk/pkg/metrics/store/sqlite_test.go index 68699916..392e4931 100644 --- a/third_party/fleet-intelligence-sdk/pkg/metrics/store/sqlite_test.go +++ b/third_party/fleet-intelligence-sdk/pkg/metrics/store/sqlite_test.go @@ -73,6 +73,16 @@ CREATE TABLE %s ( assert.Equal(t, pkgmetrics.MetricTypeCounter, results[0].Type) } +func TestEnsureMetricTypeColumnRejectsInvalidTableName(t *testing.T) { + dbRW, _, cleanup := pkgsqlite.OpenTestDB(t) + defer cleanup() + + ctx := context.Background() + err := ensureMetricTypeColumn(ctx, dbRW, `test_metrics; DROP TABLE test_metrics;`) + require.Error(t, err) + assert.Contains(t, err.Error(), "invalid sqlite identifier") +} + func TestSQLiteStore_Record(t *testing.T) { // Setup test database dbRW, dbRO, cleanup := pkgsqlite.OpenTestDB(t)