Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,6 @@ settings.yaml
__debug_bin*
*.code-workspace
.history/
.sample-data/
internal/service/ch/detector_queries_test.go
sample_data/
e2e/local_data_test.go
e2e/clickhouse_container_test.go
17 changes: 17 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,23 @@ docker: dep
@docker build -f ./Dockerfile . -t dimozone/$(BIN_NAME):$(VER_CUT)
@docker tag dimozone/$(BIN_NAME):$(VER_CUT) dimozone/$(BIN_NAME):latest

# Build multi-arch (amd64 + arm64) and push with a random tag. Does not trigger GitHub workflows.
# Requires: docker buildx, docker login. Run from repo root.
docker-push-multiarch:
$(eval TAG := dev-$(shell openssl rand -hex 6))
@echo "Building and pushing dimozone/$(BIN_NAME):$(TAG) (linux/amd64, linux/arm64)"
@docker buildx build --platform linux/amd64,linux/arm64 -f ./Dockerfile --push -t dimozone/$(BIN_NAME):$(TAG) .
@echo "Pushed dimozone/$(BIN_NAME):$(TAG)"

# Same as docker-push-multiarch but using podman (manifest list + manifest push --all).
podman-push-multiarch:
$(eval TAG := dev-$(shell openssl rand -hex 6))
$(eval IMAGE := dimozone/$(BIN_NAME):$(TAG))
@echo "Building and pushing $(IMAGE) (linux/amd64, linux/arm64)"
@podman build --platform linux/amd64,linux/arm64 --manifest $(IMAGE) -f ./Dockerfile .
@podman manifest push --all $(IMAGE) docker://$(IMAGE)
@echo "Pushed $(IMAGE)"

gqlgen: ## Generate gqlgen code.
@go tool gqlgen generate

Expand Down
2 changes: 1 addition & 1 deletion charts/telemetry-api/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ replicaCount: 1
image:
repository: dimozone/telemetry-api
pullPolicy: IfNotPresent
tag: 69c2cd4
tag: df47a59
imagePullSecrets: []
nameOverride: ''
fullnameOverride: ''
Expand Down
104 changes: 104 additions & 0 deletions e2e/clickhouse_container_test.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
package e2e_test

import (
"bufio"
"context"
"encoding/csv"
"encoding/json"
"fmt"
"os"
"strconv"
"testing"
"time"

chconfig "github.com/DIMO-Network/clickhouse-infra/pkg/connect/config"
"github.com/DIMO-Network/clickhouse-infra/pkg/container"
Expand All @@ -12,6 +18,8 @@ import (
"github.com/stretchr/testify/require"
)

const loadBatchSize = 5000

func setupClickhouseContainer(t *testing.T) *container.Container {
t.Helper()
ctx := context.Background()
Expand Down Expand Up @@ -67,3 +75,99 @@ func insertEvent(t *testing.T, ch *container.Container, events []vss.Event) {
err = batch.Send()
require.NoError(t, err, "Failed to send batch")
}

// LoadSampleDataInto loads signal and event CSVs into the ClickHouse container.
func LoadSampleDataInto(t *testing.T, ch *container.Container, signalPath, eventPath string) {
t.Helper()

sf, err := os.Open(signalPath)
require.NoError(t, err)
defer func() { _ = sf.Close() }()
br := bufio.NewReader(sf)
if peek, _ := br.Peek(3); len(peek) == 3 && peek[0] == 0xef && peek[1] == 0xbb && peek[2] == 0xbf {
_, _ = br.Discard(3)
}
sr := csv.NewReader(br)
sr.FieldsPerRecord = -1
sigRows, err := sr.ReadAll()
require.NoError(t, err)
if len(sigRows) < 2 {
t.Fatalf("signal CSV has no data rows")
}
signals := make([]vss.Signal, 0, len(sigRows)-1)
for _, row := range sigRows[1:] {
if len(row) < 9 {
continue
}
tokenID, _ := strconv.ParseUint(row[0], 10, 32)
ts, _ := time.Parse("2006-01-02 15:04:05.000000", row[1])
var loc vss.Location
if len(row[8]) > 0 {
_ = json.Unmarshal([]byte(row[8]), &loc)
}
valNum, _ := strconv.ParseFloat(row[6], 64)
signals = append(signals, vss.Signal{
TokenID: uint32(tokenID),
Timestamp: ts,
Name: row[2],
Source: row[3],
Producer: row[4],
CloudEventID: row[5],
ValueNumber: valNum,
ValueString: row[7],
ValueLocation: loc,
})
}
for i := 0; i < len(signals); i += loadBatchSize {
end := i + loadBatchSize
if end > len(signals) {
end = len(signals)
}
insertSignal(t, ch, signals[i:end])
}

ef, err := os.Open(eventPath)
require.NoError(t, err)
defer func() { _ = ef.Close() }()
erBr := bufio.NewReader(ef)
if peek, _ := erBr.Peek(3); len(peek) == 3 && peek[0] == 0xef && peek[1] == 0xbb && peek[2] == 0xbf {
_, _ = erBr.Discard(3)
}
er := csv.NewReader(erBr)
er.FieldsPerRecord = -1
evRows, err := er.ReadAll()
require.NoError(t, err)
if len(evRows) < 2 {
t.Fatalf("event CSV has no data rows")
}
events := make([]vss.Event, 0, len(evRows)-1)
for _, row := range evRows[1:] {
if len(row) < 9 {
continue
}
ts, _ := time.Parse("2006-01-02 15:04:05.000000", row[5])
durNs, _ := strconv.ParseUint(row[6], 10, 64)
var tags []string
if len(row[8]) > 0 {
_ = json.Unmarshal([]byte(row[8]), &tags)
}
events = append(events, vss.Event{
Subject: row[0],
Source: row[1],
Producer: row[2],
CloudEventID: row[3],
Name: row[4],
Timestamp: ts,
DurationNs: durNs,
Metadata: row[7],
Tags: tags,
})
}
for i := 0; i < len(events); i += loadBatchSize {
end := i + loadBatchSize
if end > len(events) {
end = len(events)
}
insertEvent(t, ch, events[i:end])
}
}
6 changes: 3 additions & 3 deletions e2e/permission_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func TestPermission(t *testing.T) {
to: "2023-01-02T00:00:00Z"
mechanism: ignitionDetection
) {
startTime
start { timestamp }
}
}`,
permissions: []string{tokenclaims.PermissionGetLocationHistory, tokenclaims.PermissionGetNonLocationHistory},
Expand All @@ -147,7 +147,7 @@ func TestPermission(t *testing.T) {
to: "2023-01-02T00:00:00Z"
mechanism: ignitionDetection
) {
startTime
start { timestamp }
}
}`,
permissions: []string{tokenclaims.PermissionGetNonLocationHistory},
Expand All @@ -163,7 +163,7 @@ func TestPermission(t *testing.T) {
to: "2023-01-02T00:00:00Z"
mechanism: ignitionDetection
) {
startTime
start { timestamp }
}
}`,
permissions: []string{tokenclaims.PermissionGetLocationHistory},
Expand Down
52 changes: 49 additions & 3 deletions e2e/segments_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,9 @@ const (
testTokenID = uint32(12345)
testSource = "test-source"
testProducer = "test-producer"
// tripDuration must be > 150 seconds (minSegmentDurationSeconds default)
tripDuration = 180 // 3 minutes
// tripGap must be > 600 seconds (minIdleSeconds default) to ensure separate trips
// tripDuration must be >= 240s (defaultMinSegmentDurationSeconds in detectors)
tripDuration = 300 // 5 minutes
// tripGap must be > 300s (defaultMinIdleSeconds) to ensure separate trips
tripGap = 720 // 12 minutes
)

Expand Down Expand Up @@ -352,4 +352,50 @@ func TestSegmentDetectors(t *testing.T) {
segments[0].StartTime.Format(time.RFC3339), fromMidTrip1.Format(time.RFC3339))
assert.False(t, segments[1].StartedBeforeRange, "Trip 2 should have StartedBeforeRange=false")
})

// Excessive idling: insert engine speed (RPM) in idle range for a contiguous period
idleStart := baseTime.Add(48 * time.Hour)
idleDurationSec := 15 * 60 // 15 minutes
t.Run("StaticRpm", func(t *testing.T) {
idleSignals := generateIdleRpmSignals(idleStart, idleDurationSec)
insertTestSignals(t, conn, idleSignals)

fromIdle := idleStart.Add(-1 * time.Hour)
toIdle := idleStart.Add(time.Duration(idleDurationSec)*time.Second + 1*time.Hour)

detector := ch.NewStaticRpmDetector(conn)
segments, err := detector.DetectSegments(ctx, testTokenID, fromIdle, toIdle, nil)
require.NoError(t, err)

require.Len(t, segments, 1, "Expected 1 static RPM (idling) segment")
seg := segments[0]
assert.False(t, seg.IsOngoing)
assert.NotNil(t, seg.EndTime)
// minSegmentDurationSeconds default is 240 (4 min); we have 15 min of idle
assert.GreaterOrEqual(t, seg.DurationSeconds, int32(240))
t.Logf("Idling segment: %s - %v (duration: %ds)",
seg.StartTime.Format(time.RFC3339), seg.EndTime, seg.DurationSeconds)
})
}

// generateIdleRpmSignals creates powertrainCombustionEngineSpeed signals in idle range (e.g. 800 rpm)
// at 10s intervals for the given duration so 60s windows have enough samples and max(rpm) <= 1000.
func generateIdleRpmSignals(startTime time.Time, durationSeconds int) []testSignal {
const engineSpeedName = "powertrainCombustionEngineSpeed"
const idleRpm = 800.0
signals := []testSignal{}
for offset := 0; offset < durationSeconds; offset += 10 {
ts := startTime.Add(time.Duration(offset) * time.Second)
signals = append(signals, testSignal{
TokenID: testTokenID,
Timestamp: ts,
Name: engineSpeedName,
ValueNumber: idleRpm,
ValueString: "",
Source: testSource,
Producer: testProducer,
CloudEventID: fmt.Sprintf("idle-%s-%d", ts.Format("150405"), offset),
})
}
return signals
}
Loading