diff --git a/e2e/segments_test.go b/e2e/segments_test.go index abbf718..ffb06ff 100644 --- a/e2e/segments_test.go +++ b/e2e/segments_test.go @@ -352,4 +352,50 @@ func TestSegmentDetectors(t *testing.T) { segments[0].StartTime.Format(time.RFC3339), fromMidTrip1.Format(time.RFC3339)) assert.False(t, segments[1].StartedBeforeRange, "Trip 2 should have StartedBeforeRange=false") }) + + // Excessive idling: insert engine speed (RPM) in idle range for a contiguous period + idleStart := baseTime.Add(48 * time.Hour) + idleDurationSec := 15 * 60 // 15 minutes + t.Run("StaticRpm", func(t *testing.T) { + idleSignals := generateIdleRpmSignals(idleStart, idleDurationSec) + insertTestSignals(t, conn, idleSignals) + + fromIdle := idleStart.Add(-1 * time.Hour) + toIdle := idleStart.Add(time.Duration(idleDurationSec)*time.Second + 1*time.Hour) + + detector := ch.NewStaticRpmDetector(conn) + segments, err := detector.DetectSegments(ctx, testTokenID, fromIdle, toIdle, nil) + require.NoError(t, err) + + require.Len(t, segments, 1, "Expected 1 static RPM (idling) segment") + seg := segments[0] + assert.False(t, seg.IsOngoing) + assert.NotNil(t, seg.EndTime) + // minSegmentDurationSeconds default is 240 (4 min); we have 15 min of idle + assert.GreaterOrEqual(t, seg.DurationSeconds, int32(240)) + t.Logf("Idling segment: %s - %v (duration: %ds)", + seg.StartTime.Format(time.RFC3339), seg.EndTime, seg.DurationSeconds) + }) +} + +// generateIdleRpmSignals creates powertrainCombustionEngineSpeed signals in idle range (e.g. 800 rpm) +// at 10s intervals for the given duration so 60s windows have enough samples and max(rpm) <= 1000. +func generateIdleRpmSignals(startTime time.Time, durationSeconds int) []testSignal { + const engineSpeedName = "powertrainCombustionEngineSpeed" + const idleRpm = 800.0 + signals := []testSignal{} + for offset := 0; offset < durationSeconds; offset += 10 { + ts := startTime.Add(time.Duration(offset) * time.Second) + signals = append(signals, testSignal{ + TokenID: testTokenID, + Timestamp: ts, + Name: engineSpeedName, + ValueNumber: idleRpm, + ValueString: "", + Source: testSource, + Producer: testProducer, + CloudEventID: fmt.Sprintf("idle-%s-%d", ts.Format("150405"), offset), + }) + } + return signals } diff --git a/internal/graph/generated.go b/internal/graph/generated.go index f649b37..ecb5644 100644 --- a/internal/graph/generated.go +++ b/internal/graph/generated.go @@ -3078,6 +3078,12 @@ input EventFilter { Best alternative when ignition signal is unavailable - same accuracy, same speed as frequency analysis. """ changePointDetection + + """ + Static RPM: Segments are contiguous periods where engine RPM remains in idle range. + Uses repeated windows of idle RPM (e.g. powertrainCombustionEngineSpeed <= maxIdleRpm) merged like trips. + """ + staticRpm } extend type Query { @@ -3119,13 +3125,18 @@ input SegmentConfig { minSegmentDurationSeconds: Int = 240 """ - [frequencyAnalysis only] Minimum signal count per window for activity detection. - Higher values = more conservative (filters parked telemetry better). - Lower values = more sensitive (works for sparse signal vehicles). - Default: 10 (tuned to match ignition detection accuracy) - Min: 1, Max: 3600 + [frequencyAnalysis] Minimum signal count per window for activity detection. + [staticRpm] Minimum samples per window to consider it idle (same semantics). + Higher values = more conservative. Lower values = more sensitive. + Default: 10, Min: 1, Max: 3600 """ signalCountThreshold: Int = 10 + + """ + [staticRpm only] Upper bound for idle RPM. Windows with max(RPM) <= this are considered idle. + Default: 1500, Min: 300, Max: 3000 + """ + maxIdleRpm: Int = 1500 } type Segment { @@ -21985,8 +21996,11 @@ func (ec *executionContext) unmarshalInputSegmentConfig(ctx context.Context, obj if _, present := asMap["signalCountThreshold"]; !present { asMap["signalCountThreshold"] = 10 } + if _, present := asMap["maxIdleRpm"]; !present { + asMap["maxIdleRpm"] = 1500 + } - fieldsInOrder := [...]string{"minIdleSeconds", "minSegmentDurationSeconds", "signalCountThreshold"} + fieldsInOrder := [...]string{"minIdleSeconds", "minSegmentDurationSeconds", "signalCountThreshold", "maxIdleRpm"} for _, k := range fieldsInOrder { v, ok := asMap[k] if !ok { @@ -22014,6 +22028,13 @@ func (ec *executionContext) unmarshalInputSegmentConfig(ctx context.Context, obj return it, err } it.SignalCountThreshold = data + case "maxIdleRpm": + ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("maxIdleRpm")) + data, err := ec.unmarshalOInt2áš–int(ctx, v) + if err != nil { + return it, err + } + it.MaxIdleRpm = data } } diff --git a/internal/graph/model/models_gen.go b/internal/graph/model/models_gen.go index 92f1384..2878971 100644 --- a/internal/graph/model/models_gen.go +++ b/internal/graph/model/models_gen.go @@ -167,12 +167,14 @@ type SegmentConfig struct { // Filters very short segments (testing, engine cycling). // Default: 240 (4 minutes), Min: 60, Max: 3600 MinSegmentDurationSeconds *int `json:"minSegmentDurationSeconds,omitempty"` - // [frequencyAnalysis only] Minimum signal count per window for activity detection. - // Higher values = more conservative (filters parked telemetry better). - // Lower values = more sensitive (works for sparse signal vehicles). - // Default: 10 (tuned to match ignition detection accuracy) - // Min: 1, Max: 3600 + // [frequencyAnalysis] Minimum signal count per window for activity detection. + // [staticRpm] Minimum samples per window to consider it idle (same semantics). + // Higher values = more conservative. Lower values = more sensitive. + // Default: 10, Min: 1, Max: 3600 SignalCountThreshold *int `json:"signalCountThreshold,omitempty"` + // [staticRpm only] Upper bound for idle RPM. Windows with max(RPM) <= this are considered idle. + // Default: 1500, Min: 300, Max: 3000 + MaxIdleRpm *int `json:"maxIdleRpm,omitempty"` } type SignalCollection struct { @@ -626,17 +628,21 @@ const ( // Excellent noise resistance with 100% accuracy match to ignition baseline. // Best alternative when ignition signal is unavailable - same accuracy, same speed as frequency analysis. DetectionMechanismChangePointDetection DetectionMechanism = "changePointDetection" + // Static RPM: Segments are contiguous periods where engine RPM remains in idle range. + // Uses repeated windows of idle RPM (e.g. powertrainCombustionEngineSpeed <= maxIdleRpm) merged like trips. + DetectionMechanismStaticRpm DetectionMechanism = "staticRpm" ) var AllDetectionMechanism = []DetectionMechanism{ DetectionMechanismIgnitionDetection, DetectionMechanismFrequencyAnalysis, DetectionMechanismChangePointDetection, + DetectionMechanismStaticRpm, } func (e DetectionMechanism) IsValid() bool { switch e { - case DetectionMechanismIgnitionDetection, DetectionMechanismFrequencyAnalysis, DetectionMechanismChangePointDetection: + case DetectionMechanismIgnitionDetection, DetectionMechanismFrequencyAnalysis, DetectionMechanismChangePointDetection, DetectionMechanismStaticRpm: return true } return false diff --git a/internal/repositories/segments.go b/internal/repositories/segments.go index 8e6edb5..9e74810 100644 --- a/internal/repositories/segments.go +++ b/internal/repositories/segments.go @@ -39,8 +39,9 @@ func validateSegmentArgs(tokenID int, from, to time.Time) error { return nil } -// validateSegmentConfig validates the segment configuration parameters -func validateSegmentConfig(config *model.SegmentConfig) error { +// validateSegmentConfig validates the segment configuration parameters. +// When mechanism is staticRpm, also validates idling-specific fields. +func validateSegmentConfig(config *model.SegmentConfig, mechanism model.DetectionMechanism) error { if config == nil { return nil } @@ -63,6 +64,14 @@ func validateSegmentConfig(config *model.SegmentConfig) error { } } + if mechanism == model.DetectionMechanismStaticRpm { + if config.MaxIdleRpm != nil { + if *config.MaxIdleRpm < 300 || *config.MaxIdleRpm > 3000 { + return fmt.Errorf("maxIdleRpm must be between 300 and 3000") + } + } + } + return nil } @@ -73,7 +82,7 @@ func (r *Repository) GetSegments(ctx context.Context, tokenID int, from, to time return nil, errorhandler.NewBadRequestError(ctx, err) } - if err := validateSegmentConfig(config); err != nil { + if err := validateSegmentConfig(config, mechanism); err != nil { return nil, errorhandler.NewBadRequestError(ctx, err) } diff --git a/internal/repositories/validate_test.go b/internal/repositories/validate_test.go index 3853fd4..5206fb2 100644 --- a/internal/repositories/validate_test.go +++ b/internal/repositories/validate_test.go @@ -90,3 +90,35 @@ func TestValidateSegmentArgs(t *testing.T) { require.Error(t, err) }) } + +func TestValidateSegmentConfig(t *testing.T) { + validConfig := &model.SegmentConfig{} + otherMechanism := model.DetectionMechanismIgnitionDetection + idlingMechanism := model.DetectionMechanismStaticRpm + + t.Run("nil config", func(t *testing.T) { + require.NoError(t, validateSegmentConfig(nil, otherMechanism)) + require.NoError(t, validateSegmentConfig(nil, idlingMechanism)) + }) + + t.Run("valid config other mechanism", func(t *testing.T) { + require.NoError(t, validateSegmentConfig(validConfig, otherMechanism)) + }) + + t.Run("valid config staticRpm with idling fields", func(t *testing.T) { + cfg := &model.SegmentConfig{ + MaxIdleRpm: ptr(1000), + SignalCountThreshold: ptr(5), + } + require.NoError(t, validateSegmentConfig(cfg, idlingMechanism)) + }) + + t.Run("staticRpm maxIdleRpm out of range", func(t *testing.T) { + cfg := &model.SegmentConfig{MaxIdleRpm: ptr(100)} + require.Error(t, validateSegmentConfig(cfg, idlingMechanism)) + cfg.MaxIdleRpm = ptr(4000) + require.Error(t, validateSegmentConfig(cfg, idlingMechanism)) + }) +} + +func ptr(i int) *int { return &i } diff --git a/internal/service/ch/segments.go b/internal/service/ch/segments.go index 1b60124..bf76916 100644 --- a/internal/service/ch/segments.go +++ b/internal/service/ch/segments.go @@ -35,6 +35,8 @@ func (s *Service) GetSegments( detector = &FrequencyDetector{conn: s.conn} case model.DetectionMechanismChangePointDetection: detector = &ChangePointDetector{conn: s.conn} + case model.DetectionMechanismStaticRpm: + detector = &StaticRpmDetector{conn: s.conn} default: return nil, fmt.Errorf("unknown detection mechanism: %s", mechanism) } diff --git a/internal/service/ch/static_rpm_detector.go b/internal/service/ch/static_rpm_detector.go new file mode 100644 index 0000000..b725a94 --- /dev/null +++ b/internal/service/ch/static_rpm_detector.go @@ -0,0 +1,132 @@ +package ch + +import ( + "context" + "fmt" + "time" + + "github.com/ClickHouse/clickhouse-go/v2" + "github.com/DIMO-Network/telemetry-api/internal/graph/model" +) + +const ( + defaultIdleWindowSizeSeconds = 60 // 1 minute windows (same as frequency detector) + defaultMaxIdleRpm = 1500 + engineSpeedSignalName = "powertrainCombustionEngineSpeed" // fixed; not configurable + defaultSignalCountThresholdIdle = 2 // powertrainCombustionEngineSpeed ~2/min; min samples per 1min window + defaultMinIdleRpmForEngineRunning = 0 // min_rpm > this to exclude engine-off +) + +// StaticRpmDetector detects segments where engine RPM remains in idle range (static/low RPM). +// Uses repeated windows of idle RPM merged like trips. +type StaticRpmDetector struct { + conn clickhouse.Conn +} + +// NewStaticRpmDetector creates a new StaticRpmDetector with the given connection. +func NewStaticRpmDetector(conn clickhouse.Conn) *StaticRpmDetector { + return &StaticRpmDetector{conn: conn} +} + +// DetectSegments implements idle-RPM-based segment detection. +func (d *StaticRpmDetector) DetectSegments( + ctx context.Context, + tokenID uint32, + from, to time.Time, + config *model.SegmentConfig, +) ([]*Segment, error) { + maxGap := defaultMinIdleSeconds + minDuration := defaultMinSegmentDurationSeconds + maxIdleRpm := defaultMaxIdleRpm + signalThreshold := defaultSignalCountThresholdIdle + + if config != nil { + if config.MinIdleSeconds != nil { + maxGap = *config.MinIdleSeconds + } + if config.MinSegmentDurationSeconds != nil { + minDuration = *config.MinSegmentDurationSeconds + } + if config.MaxIdleRpm != nil { + maxIdleRpm = *config.MaxIdleRpm + } + if config.SignalCountThreshold != nil { + signalThreshold = *config.SignalCountThreshold + } + } + + windowSize := defaultIdleWindowSizeSeconds + lookbackFrom := from.Add(-time.Duration(maxGap) * time.Second) + windows, err := d.getIdleWindows(ctx, tokenID, lookbackFrom, to, windowSize, maxIdleRpm, signalThreshold) + if err != nil { + return nil, fmt.Errorf("failed to get idle windows: %w", err) + } + + if len(windows) == 0 { + return nil, nil + } + + segments := mergeWindowsIntoSegments(tokenID, windows, from, to, maxGap, minDuration) + return segments, nil +} + +// getIdleWindows returns time windows where engine speed is in idle band (0 < rpm <= maxIdleRpm). +// Uses signal FINAL; groups by window and keeps windows with sample_count >= signalThreshold and max(rpm) <= maxIdleRpm and min(rpm) > 0. +func (d *StaticRpmDetector) getIdleWindows( + ctx context.Context, + tokenID uint32, + from, to time.Time, + windowSizeSeconds int, + maxIdleRpm int, + signalThreshold int, +) ([]ActiveWindow, error) { + query := ` +SELECT + toStartOfInterval(timestamp, INTERVAL ? second) AS window_start, + toStartOfInterval(timestamp, INTERVAL ? second) + INTERVAL ? second AS window_end, + count() AS signal_count, + uniq(name) AS distinct_signal_count +FROM signal FINAL +PREWHERE token_id = ? +WHERE name = ? + AND timestamp >= ? + AND timestamp < ? +GROUP BY window_start +HAVING signal_count >= ? AND max(value_number) <= ? AND min(value_number) > ? +ORDER BY window_start` + + rows, err := d.conn.Query(ctx, query, + windowSizeSeconds, windowSizeSeconds, windowSizeSeconds, + tokenID, engineSpeedSignalName, from, to, + signalThreshold, maxIdleRpm, defaultMinIdleRpmForEngineRunning) + if err != nil { + return nil, fmt.Errorf("failed querying idle windows: %w", err) + } + defer func() { _ = rows.Close() }() + + expectedWindows := int(to.Sub(from).Seconds()) / windowSizeSeconds + if expectedWindows <= 0 { + expectedWindows = 1 + } + windows := make([]ActiveWindow, 0, expectedWindows) + + for rows.Next() { + var w ActiveWindow + err := rows.Scan(&w.WindowStart, &w.WindowEnd, &w.SignalCount, &w.DistinctSignalCount) + if err != nil { + return nil, fmt.Errorf("failed scanning idle window row: %w", err) + } + windows = append(windows, w) + } + + if rows.Err() != nil { + return nil, fmt.Errorf("idle window row error: %w", rows.Err()) + } + + return windows, nil +} + +// GetMechanismName returns the name of this detection mechanism. +func (d *StaticRpmDetector) GetMechanismName() string { + return "staticRpm" +} diff --git a/internal/service/ch/static_rpm_detector_test.go b/internal/service/ch/static_rpm_detector_test.go new file mode 100644 index 0000000..9bb7230 --- /dev/null +++ b/internal/service/ch/static_rpm_detector_test.go @@ -0,0 +1,91 @@ +package ch + +import ( + "testing" + "time" + + "github.com/DIMO-Network/telemetry-api/internal/graph/model" + "github.com/stretchr/testify/require" +) + +func TestStaticRpmDetector_GetMechanismName(t *testing.T) { + d := &StaticRpmDetector{} + require.Equal(t, "staticRpm", d.GetMechanismName()) +} + +func TestStaticRpmDetector_DetectSegments_ConfigDefaults(t *testing.T) { + // Config: static RPM uses SignalCountThreshold (same as frequency), maxIdleRpm; engine speed signal name is fixed. + _ = model.DetectionMechanismStaticRpm + maxRpm := 900 + threshold := 5 + config := &model.SegmentConfig{ + MaxIdleRpm: &maxRpm, + SignalCountThreshold: &threshold, + MinIdleSeconds: ptr(600), + MinSegmentDurationSeconds: ptr(300), + } + require.NotNil(t, config) + require.Equal(t, 900, *config.MaxIdleRpm) + require.Equal(t, 5, *config.SignalCountThreshold) +} + +func TestStaticRpmDetector_IdleWindowsMergeIntoOneSegment(t *testing.T) { + // Static RPM detector uses mergeWindowsIntoSegments with idle windows; merge logic is shared with frequency detector. + // Verify that a run of consecutive idle windows produces one segment when gap and duration are satisfied. + now := time.Now() + from := now.Add(-30 * time.Minute) + to := now.Add(-5 * time.Minute) + tokenID := uint32(1) + maxGap := 300 + minDuration := 60 + + // Consecutive 1-minute idle windows; last window ends before query 'to' so segment is completed (not ongoing) + endWindows := to.Add(-10 * time.Minute) // last window ends 10 min before query end + windows := make([]ActiveWindow, 0, 15) + for s := from; s.Before(endWindows); s = s.Add(time.Minute) { + windows = append(windows, ActiveWindow{ + WindowStart: s, + WindowEnd: s.Add(time.Minute), + SignalCount: 5, + DistinctSignalCount: 1, + }) + } + segments := mergeWindowsIntoSegments(tokenID, windows, from, to, maxGap, minDuration) + require.Len(t, segments, 1) + require.False(t, segments[0].IsOngoing) + require.NotNil(t, segments[0].EndTime) + require.True(t, segments[0].DurationSeconds >= int32(minDuration)) +} + +func TestStaticRpmDetector_TwoIdleBlocksProduceTwoSegments(t *testing.T) { + now := time.Now() + from := now.Add(-2 * time.Hour) + to := now.Add(-10 * time.Minute) + tokenID := uint32(1) + maxGap := 300 // 5 min + minDuration := 120 // 2 min + + // Block 1: 2 hours ago, 3 minutes of idle windows + block1Start := from + block1End := from.Add(3 * time.Minute) + // Block 2: 30 min ago, 3 minutes of idle windows (gap between block1 and block2 > maxGap) + block2Start := to.Add(-30 * time.Minute) + block2End := block2Start.Add(3 * time.Minute) + + windows := []ActiveWindow{ + {WindowStart: block1Start, WindowEnd: block1Start.Add(time.Minute), SignalCount: 5, DistinctSignalCount: 1}, + {WindowStart: block1Start.Add(time.Minute), WindowEnd: block1End, SignalCount: 5, DistinctSignalCount: 1}, + {WindowStart: block1End, WindowEnd: block1End.Add(time.Minute), SignalCount: 5, DistinctSignalCount: 1}, + {WindowStart: block2Start, WindowEnd: block2Start.Add(time.Minute), SignalCount: 5, DistinctSignalCount: 1}, + {WindowStart: block2Start.Add(time.Minute), WindowEnd: block2End, SignalCount: 5, DistinctSignalCount: 1}, + {WindowStart: block2End, WindowEnd: block2End.Add(time.Minute), SignalCount: 5, DistinctSignalCount: 1}, + } + segments := mergeWindowsIntoSegments(tokenID, windows, from, to, maxGap, minDuration) + require.Len(t, segments, 2) + require.False(t, segments[0].IsOngoing) + require.False(t, segments[1].IsOngoing) + require.True(t, segments[0].StartTime.Equal(block1Start)) + require.True(t, segments[1].StartTime.Equal(block2Start)) +} + +func ptr(i int) *int { return &i } diff --git a/schema/segments.graphqls b/schema/segments.graphqls index d76f968..1853d84 100644 --- a/schema/segments.graphqls +++ b/schema/segments.graphqls @@ -19,6 +19,12 @@ enum DetectionMechanism { Best alternative when ignition signal is unavailable - same accuracy, same speed as frequency analysis. """ changePointDetection + + """ + Static RPM: Segments are contiguous periods where engine RPM remains in idle range. + Uses repeated windows of idle RPM (e.g. powertrainCombustionEngineSpeed <= maxIdleRpm) merged like trips. + """ + staticRpm } extend type Query { @@ -60,13 +66,18 @@ input SegmentConfig { minSegmentDurationSeconds: Int = 240 """ - [frequencyAnalysis only] Minimum signal count per window for activity detection. - Higher values = more conservative (filters parked telemetry better). - Lower values = more sensitive (works for sparse signal vehicles). - Default: 10 (tuned to match ignition detection accuracy) - Min: 1, Max: 3600 + [frequencyAnalysis] Minimum signal count per window for activity detection. + [staticRpm] Minimum samples per window to consider it idle (same semantics). + Higher values = more conservative. Lower values = more sensitive. + Default: 10, Min: 1, Max: 3600 """ signalCountThreshold: Int = 10 + + """ + [staticRpm only] Upper bound for idle RPM. Windows with max(RPM) <= this are considered idle. + Default: 1500, Min: 300, Max: 3000 + """ + maxIdleRpm: Int = 1500 } type Segment {