From 5a84a3cdde928e71d55cf6c97932105fbbca3795 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Monteiro?= Date: Sun, 27 Jul 2025 15:59:15 +0100 Subject: [PATCH 1/3] Update docker image to go 1.22 --- Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Dockerfile b/Dockerfile index 031d3087..427ff6a7 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -FROM golang:1.18 AS builder +FROM golang:1.22 AS builder ENV USER=app ENV UID=10001 From a0c530a66092975d112aeeb4ce56d8cf2dd55615 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Monteiro?= Date: Sun, 27 Jul 2025 15:59:59 +0100 Subject: [PATCH 2/3] Add row filtering for show stats --- config/metric_config.go | 20 ++++ query.go | 215 +++++++++++++++++++++++++++++++++++++++- 2 files changed, 231 insertions(+), 4 deletions(-) diff --git a/config/metric_config.go b/config/metric_config.go index 67a9b2ec..6f842ad5 100644 --- a/config/metric_config.go +++ b/config/metric_config.go @@ -24,6 +24,11 @@ type MetricConfig struct { StaticValue *float64 `yaml:"static_value,omitempty"` TimestampValue string `yaml:"timestamp_value,omitempty"` // optional column name containing a valid timestamp value + // SHOW STATS filtering and transformation features + RowFilters []RowFilter `yaml:"row_filters,omitempty"` // filter rows post-query + ColumnFilters []string `yaml:"column_filters,omitempty"` // include only these columns + LagCalculations []LagCalculation `yaml:"lag_calculations,omitempty"` // calculate time lag for timestamp fields + valueType prometheus.ValueType // TypeString converted to prometheus.ValueType query *QueryConfig // QueryConfig resolved from QueryRef or generated from Query @@ -31,6 +36,21 @@ type MetricConfig struct { XXX map[string]any `yaml:",inline" json:"-"` } +// RowFilter defines conditions to filter rows after query execution +type RowFilter struct { + Column string `yaml:"column"` // column name to filter on + Operator string `yaml:"operator"` // "equals", "in", 
"not_in", "contains", "not_equals" + Value string `yaml:"value,omitempty"` // single value for equals/not_equals/contains + Values []string `yaml:"values,omitempty"` // multiple values for in/not_in +} + +// LagCalculation defines how to calculate time lag from timestamp fields +type LagCalculation struct { + SourceColumn string `yaml:"source_column"` // column containing the timestamp (e.g., "high_value") + OutputColumn string `yaml:"output_column"` // new column name for the lag value (e.g., "lag_seconds") + TimestampFormat string `yaml:"timestamp_format,omitempty"` // format of timestamp, defaults to Trino format +} + // ValueType returns the metric type, converted to a prometheus.ValueType. func (m *MetricConfig) ValueType() prometheus.ValueType { return m.valueType diff --git a/query.go b/query.go index fed479ed..dc16a419 100644 --- a/query.go +++ b/query.go @@ -5,6 +5,7 @@ import ( "database/sql" "fmt" "log/slog" + "strings" "time" "github.com/burningalchemist/sql_exporter/config" @@ -41,14 +42,35 @@ func NewQuery(logContext string, qc *config.QueryConfig, metricFamilies ...*Metr columnTypes := make(columnTypeMap) for _, mf := range metricFamilies { + // Create a map of output columns created by transformations + transformedColumns := make(map[string]bool) + for _, lagCalc := range mf.config.LagCalculations { + transformedColumns[lagCalc.OutputColumn] = true + // Add source columns to columnTypes since they're needed from SQL + // Use columnTypeKey since timestamp values are strings, not numbers + if err := setColumnType(logContext, lagCalc.SourceColumn, columnTypeKey, columnTypes); err != nil { + return nil, err + } + } + + // Add columns used in row filters + for _, filter := range mf.config.RowFilters { + if err := setColumnType(logContext, filter.Column, columnTypeKey, columnTypes); err != nil { + return nil, err + } + } + for _, kcol := range mf.config.KeyLabels { if err := setColumnType(logContext, kcol, columnTypeKey, columnTypes); err != nil { return 
nil, err } } for _, vcol := range mf.config.Values { - if err := setColumnType(logContext, vcol, columnTypeValue, columnTypes); err != nil { - return nil, err + // Skip columns that are created by transformations + if !transformedColumns[vcol] { + if err := setColumnType(logContext, vcol, columnTypeValue, columnTypes); err != nil { + return nil, err + } } } if mf.config.TimestampValue != "" { @@ -64,6 +86,14 @@ func NewQuery(logContext string, qc *config.QueryConfig, metricFamilies ...*Metr columnTypes: columnTypes, logContext: logContext, } + + // Debug logging to see what columns we're expecting + expectedColumns := make([]string, 0, len(columnTypes)) + for col := range columnTypes { + expectedColumns = append(expectedColumns, col) + } + slog.Debug("Expected columns from SQL", "logContext", logContext, "columns", expectedColumns) + return &q, nil } @@ -82,11 +112,14 @@ func setColumnType(logContext, columnName string, ctype columnType, columnTypes // Collect is the equivalent of prometheus.Collector.Collect() but takes a context to run in and a database to run on. 
func (q *Query) Collect(ctx context.Context, conn *sql.DB, ch chan<- Metric) { + collectStart := time.Now() + if ctx.Err() != nil { ch <- NewInvalidMetric(errors.Wrap(q.logContext, ctx.Err())) return } + rows, err := q.run(ctx, conn) if err != nil { ch <- NewInvalidMetric(err) @@ -103,19 +136,48 @@ func (q *Query) Collect(ctx context.Context, conn *sql.DB, ch chan<- Metric) { ch <- NewInvalidMetric(err) return } + + totalRowsProcessed := 0 + totalRowsFiltered := 0 + metricsGenerated := 0 + for rows.Next() { + totalRowsProcessed++ + row, err := q.scanRow(rows, dest) if err != nil { ch <- NewInvalidMetric(err) continue } + + // Apply row filtering and transformations for each metric family for _, mf := range q.metricFamilies { - mf.Collect(row, ch) + // Apply row filters - skip row if it doesn't match + if !q.shouldIncludeRow(row, mf.config) { + totalRowsFiltered++ + continue + } + + // Apply lag calculations and other transformations + transformedRow := q.applyTransformations(row, mf.config) + + mf.Collect(transformedRow, ch) + metricsGenerated++ } } + if err1 := rows.Err(); err1 != nil { ch <- NewInvalidMetric(errors.Wrap(q.logContext, err1)) } + + // Log performance summary + slog.Debug("Query collection completed", + "logContext", q.logContext, + "duration_ms", time.Since(collectStart).Milliseconds(), + "rows_processed", totalRowsProcessed, + "rows_filtered", totalRowsFiltered, + "metrics_generated", metricsGenerated, + ) } // run executes the query on the provided database, in the provided context. 
@@ -230,3 +291,149 @@ func (q *Query) scanRow(rows *sql.Rows, dest []any) (map[string]any, errors.With } return result, nil } + +// shouldIncludeRow checks if a row matches the configured row filters +func (q *Query) shouldIncludeRow(row map[string]any, metric *config.MetricConfig) bool { + for _, filter := range metric.RowFilters { + if !q.applyRowFilter(row, filter) { + return false + } + } + return true +} + +// applyRowFilter applies a single row filter to determine if row should be included +func (q *Query) applyRowFilter(row map[string]any, filter config.RowFilter) bool { + value, exists := row[filter.Column] + if !exists { + return false + } + + // Handle sql.NullString, sql.NullFloat64, sql.NullTime types from updated codebase + var valueStr string + switch v := value.(type) { + case sql.NullString: + if !v.Valid { + return false + } + valueStr = v.String + case sql.NullFloat64: + if !v.Valid { + return false + } + valueStr = fmt.Sprintf("%v", v.Float64) + case sql.NullTime: + if !v.Valid { + return false + } + valueStr = v.Time.Format("2006-01-02 15:04:05.000 UTC") + default: + valueStr = fmt.Sprintf("%v", value) + } + + switch filter.Operator { + case "equals": + return valueStr == filter.Value + case "not_equals": + return valueStr != filter.Value + case "in": + for _, v := range filter.Values { + if valueStr == v { + return true + } + } + return false + case "not_in": + for _, v := range filter.Values { + if valueStr == v { + return false + } + } + return true + case "contains": + return strings.Contains(valueStr, filter.Value) + default: + slog.Warn("Unknown filter operator", "operator", filter.Operator) + return true + } +} + +// applyTransformations applies configured transformations like lag calculations to a row +func (q *Query) applyTransformations(row map[string]any, metric *config.MetricConfig) map[string]any { + result := make(map[string]any) + + // Copy original row data + for k, v := range row { + result[k] = v + } + + // Apply lag 
calculations + for _, lagCalc := range metric.LagCalculations { + if sourceValue, exists := row[lagCalc.SourceColumn]; exists { + lagSeconds := q.calculateLag(sourceValue, lagCalc.TimestampFormat) + // Create a sql.NullFloat64 to match the expected type system + result[lagCalc.OutputColumn] = sql.NullFloat64{Float64: lagSeconds, Valid: lagSeconds != 0} + } + } + + // Apply column filtering if specified + if len(metric.ColumnFilters) > 0 { + filtered := make(map[string]any) + for _, column := range metric.ColumnFilters { + if value, exists := result[column]; exists { + filtered[column] = value + } + } + return filtered + } + + return result +} + +// calculateLag calculates the lag in seconds between a timestamp and current time +func (q *Query) calculateLag(timestampValue any, format string) float64 { + if timestampValue == nil { + return 0 + } + + var timestampStr string + + // Handle different timestamp value types from the updated codebase + switch v := timestampValue.(type) { + case sql.NullString: + if !v.Valid { + return 0 + } + timestampStr = v.String + case sql.NullTime: + if !v.Valid { + return 0 + } + // Calculate lag directly from time.Time + return time.Since(v.Time).Seconds() + case string: + timestampStr = v + default: + timestampStr = fmt.Sprintf("%v", timestampValue) + } + + if timestampStr == "" { + return 0 + } + + // Default format for Trino timestamps + if format == "" { + format = "2006-01-02 15:04:05.000 UTC" + } + + // Parse the timestamp + parsedTime, err := time.Parse(format, timestampStr) + if err != nil { + slog.Warn("Failed to parse timestamp for lag calculation", "timestamp", timestampStr, "format", format, "error", err) + return 0 + } + + // Calculate lag in seconds + lag := time.Since(parsedTime).Seconds() + return lag +} From caa901fc1fbab183bfc7c58a631531a35c8435fb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Monteiro?= Date: Sun, 27 Jul 2025 18:04:53 +0100 Subject: [PATCH 3/3] fmt --- config/metric_config.go | 14 
+++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/config/metric_config.go b/config/metric_config.go index 6f842ad5..ab1e761a 100644 --- a/config/metric_config.go +++ b/config/metric_config.go @@ -38,17 +38,17 @@ type MetricConfig struct { // RowFilter defines conditions to filter rows after query execution type RowFilter struct { - Column string `yaml:"column"` // column name to filter on - Operator string `yaml:"operator"` // "equals", "in", "not_in", "contains", "not_equals" - Value string `yaml:"value,omitempty"` // single value for equals/not_equals/contains - Values []string `yaml:"values,omitempty"` // multiple values for in/not_in + Column string `yaml:"column"` // column name to filter on + Operator string `yaml:"operator"` // "equals", "in", "not_in", "contains", "not_equals" + Value string `yaml:"value,omitempty"` // single value for equals/not_equals/contains + Values []string `yaml:"values,omitempty"` // multiple values for in/not_in } // LagCalculation defines how to calculate time lag from timestamp fields type LagCalculation struct { - SourceColumn string `yaml:"source_column"` // column containing the timestamp (e.g., "high_value") - OutputColumn string `yaml:"output_column"` // new column name for the lag value (e.g., "lag_seconds") - TimestampFormat string `yaml:"timestamp_format,omitempty"` // format of timestamp, defaults to Trino format + SourceColumn string `yaml:"source_column"` // column containing the timestamp (e.g., "high_value") + OutputColumn string `yaml:"output_column"` // new column name for the lag value (e.g., "lag_seconds") + TimestampFormat string `yaml:"timestamp_format,omitempty"` // format of timestamp, defaults to Trino format } // ValueType returns the metric type, converted to a prometheus.ValueType.