Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 10 additions & 24 deletions e2e/approximate_location_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,32 +22,18 @@ func TestApproximateLocation(t *testing.T) {
endLoc := h3.LatLng{Lat: 40.73899538333504, Lng: -73.99386110247163}
signals := []vss.Signal{
{
Source: ch.SourceTranslations["smartcar"][0],
Timestamp: locationTime.Add(-time.Hour * 24),
Name: vss.FieldCurrentLocationLatitude,
ValueNumber: startLoc.Lat,
TokenID: 39718,
Source: ch.SourceTranslations["smartcar"][0],
Timestamp: locationTime.Add(-time.Hour * 24),
Name: "currentLocation",
ValueLocation: vss.Location{Latitude: startLoc.Lat, Longitude: startLoc.Lng},
TokenID: 39718,
},
{
Source: ch.SourceTranslations["smartcar"][0],
Timestamp: locationTime.Add(-time.Hour * 24),
Name: vss.FieldCurrentLocationLongitude,
ValueNumber: startLoc.Lng,
TokenID: 39718,
},
{
Source: ch.SourceTranslations["smartcar"][0],
Timestamp: locationTime,
Name: vss.FieldCurrentLocationLatitude,
ValueNumber: endLoc.Lat,
TokenID: 39718,
},
{
Source: ch.SourceTranslations["smartcar"][0],
Timestamp: locationTime,
Name: vss.FieldCurrentLocationLongitude,
ValueNumber: endLoc.Lng,
TokenID: 39718,
Source: ch.SourceTranslations["smartcar"][0],
Timestamp: locationTime,
Name: "currentLocation",
ValueLocation: vss.Location{Latitude: endLoc.Lat, Longitude: endLoc.Lng},
TokenID: 39718,
},
}

Expand Down
10 changes: 6 additions & 4 deletions internal/graph/arguments.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ import (
"github.com/DIMO-Network/telemetry-api/internal/graph/model"
)

// signalLocationType is the name of the GraphQL SignalLocation type.
const signalLocationType = "SignalLocation"

// aggregationArgsFromContext creates an aggregated signals arguments from the context and the provided arguments.
func aggregationArgsFromContext(ctx context.Context, tokenID int, interval string, from time.Time, to time.Time, filter *model.SignalFilter) (*model.AggregatedSignalArgs, error) {
// 1h 1s
Expand Down Expand Up @@ -102,10 +105,9 @@ func latestArgsFromContext(ctx context.Context, tokenID int, filter *model.Signa
continue
}

if field.Name == model.ApproximateLatField || field.Name == model.ApproximateLongField {
latestArgs.SignalNames[vss.FieldCurrentLocationLatitude] = struct{}{}
latestArgs.SignalNames[vss.FieldCurrentLocationLongitude] = struct{}{}
} else if field.Definition.Type.Name() == "SignalLocation" {
if field.Name == model.ApproximateLatField || field.Name == model.ApproximateLongField || field.Name == vss.FieldCurrentLocationLatitude || field.Name == vss.FieldCurrentLocationLongitude {
latestArgs.LocationSignalNames[vss.FieldCurrentLocationCoordinates] = struct{}{}
} else if field.Definition.Type.Name() == signalLocationType {
latestArgs.LocationSignalNames[field.Name] = struct{}{}
} else {
latestArgs.SignalNames[field.Name] = struct{}{}
Expand Down
11 changes: 11 additions & 0 deletions internal/repositories/repositories.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,17 @@ func (r *Repository) GetSignalLatest(ctx context.Context, latestArgs *model.Late
coll.LastSeen = &signal.Timestamp
continue
}
if signal.Name == vss.FieldCurrentLocationCoordinates {
coll.CurrentLocationLatitude = &model.SignalFloat{
Timestamp: signal.Timestamp,
Value: signal.ValueLocation.Latitude,
}
coll.CurrentLocationLongitude = &model.SignalFloat{
Timestamp: signal.Timestamp,
Value: signal.ValueLocation.Longitude,
}
continue
}
model.SetCollectionField(coll, signal)
}
setApproximateLocationInCollection(coll)
Expand Down
79 changes: 42 additions & 37 deletions internal/service/ch/queries.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,17 +53,6 @@ const (
latestTimestamp = "max(" + vss.TimestampCol + ") as ts"
)

// Aggregation functions for float signals.
const (
avgGroup = "avg(" + vss.ValueNumberCol + ")"
randFloatGroup = "groupArraySample(1, %d)(" + vss.ValueNumberCol + ")[1]"
minGroup = "min(" + vss.ValueNumberCol + ")"
maxGroup = "max(" + vss.ValueNumberCol + ")"
medGroup = "median(" + vss.ValueNumberCol + ")"
firstFloatGroup = "argMin(" + vss.ValueNumberCol + ", " + vss.TimestampCol + ")"
lastFloatGroup = "argMax(" + vss.ValueNumberCol + ", " + vss.TimestampCol + ")"
)

// Aggregation functions for string signals.
const (
randStringGroup = "groupArraySample(1, %d)(" + vss.ValueStringCol + ")[1]"
Expand Down Expand Up @@ -164,20 +153,32 @@ func selectInterval(microSeconds int64, origin time.Time) qm.QueryMod {
vss.TimestampCol, microSeconds, origin.UnixMicro(), IntervalGroup))
}

func getFloatSelectExpr(field string) string {
switch field {
case vss.FieldCurrentLocationLatitude:
return vss.ValueLocationCol + ".latitude"
case vss.FieldCurrentLocationLongitude:
return vss.ValueLocationCol + ".longitude"
default:
return vss.ValueNumberCol
}
}

func selectNumberAggs(numberAggs []model.FloatSignalArgs, appLocAggs map[model.FloatAggregation]struct{}) qm.QueryMod {
if len(numberAggs) == 0 && len(appLocAggs) == 0 {
return qm.Select("NULL AS " + AggNumberCol)
}
// Add a CASE statement for each name and its corresponding aggregation function
caseStmts := make([]string, 0, len(numberAggs)+2*len(appLocAggs))
for i, agg := range numberAggs {
caseStmts = append(caseStmts, fmt.Sprintf("WHEN %s = %d AND %s = %d THEN %s", signalTypeCol, FloatType, signalIndexCol, i, getFloatAggFunc(agg.Agg)))
caseStmts = append(caseStmts, fmt.Sprintf("WHEN %s = %d AND %s = %d THEN %s", signalTypeCol, FloatType, signalIndexCol, i, getFloatAggFunc(agg.Agg, getFloatSelectExpr(agg.Name))))
}
for i, agg := range model.AllFloatAggregation {
if _, ok := appLocAggs[agg]; ok {
caseStmts = append(caseStmts,
fmt.Sprintf("WHEN %s = %d AND %s = %d THEN %s", signalTypeCol, AppLocType, signalIndexCol, 2*i, getFloatAggFunc(agg)),
fmt.Sprintf("WHEN %s = %d AND %s = %d THEN %s", signalTypeCol, AppLocType, signalIndexCol, 2*i+1, getFloatAggFunc(agg)))
fmt.Sprintf("WHEN %s = %d AND %s = %d THEN %s", signalTypeCol, AppLocType, signalIndexCol, 2*i, getFloatAggFunc(agg, getFloatSelectExpr(vss.FieldCurrentLocationLatitude))),
fmt.Sprintf("WHEN %s = %d AND %s = %d THEN %s", signalTypeCol, AppLocType, signalIndexCol, 2*i+1, getFloatAggFunc(agg, getFloatSelectExpr(vss.FieldCurrentLocationLongitude))),
)
}
}
caseStmt := fmt.Sprintf("CASE %s ELSE NULL END AS %s", strings.Join(caseStmts, " "), AggNumberCol)
Expand Down Expand Up @@ -211,24 +212,24 @@ func selectLocationAggs(stringAggs []model.LocationSignalArgs) qm.QueryMod {
}

// returns a string representation of the aggregation function based on the aggregation type.
func getFloatAggFunc(aggType model.FloatAggregation) string {
aggStr := avgGroup
func getFloatAggFunc(aggType model.FloatAggregation, selectExpr string) string {
aggStr := "avg(" + selectExpr + ")"
switch aggType {
case model.FloatAggregationAvg:
aggStr = avgGroup
aggStr = "avg(" + selectExpr + ")"
case model.FloatAggregationRand:
seed := time.Now().UnixMilli()
aggStr = fmt.Sprintf(randFloatGroup, seed)
aggStr = fmt.Sprintf("groupArraySample(1, %d)("+selectExpr+")[1]", seed)
case model.FloatAggregationMin:
aggStr = minGroup
aggStr = "min(" + selectExpr + ")"
case model.FloatAggregationMax:
aggStr = maxGroup
aggStr = "max(" + selectExpr + ")"
case model.FloatAggregationMed:
aggStr = medGroup
aggStr = "median(" + selectExpr + ")"
case model.FloatAggregationFirst:
aggStr = firstFloatGroup
aggStr = "argMin(" + selectExpr + ", " + vss.TimestampCol + ")"
case model.FloatAggregationLast:
aggStr = lastFloatGroup
aggStr = "argMax(" + selectExpr + ", " + vss.TimestampCol + ")"
}
return aggStr
}
Expand Down Expand Up @@ -420,16 +421,20 @@ func getAggQuery(aggArgs *model.AggregatedSignalArgs) (string, []any, error) {
// You can see the alternatives in the issue and they are ugly.
valuesArgs := make([]string, 0, numAggs)
for i, agg := range aggArgs.FloatArgs {
valuesArgs = append(valuesArgs, aggTableEntry(FloatType, i, agg.Name))
name := agg.Name
if name == vss.FieldCurrentLocationLatitude || name == vss.FieldCurrentLocationLongitude {
name = vss.FieldCurrentLocationCoordinates
}
valuesArgs = append(valuesArgs, aggTableEntry(FloatType, i, name))
}
for i, agg := range aggArgs.StringArgs {
valuesArgs = append(valuesArgs, aggTableEntry(StringType, i, agg.Name))
}
for i, agg := range model.AllFloatAggregation {
if _, ok := aggArgs.ApproxLocArgs[agg]; ok {
valuesArgs = append(valuesArgs,
aggTableEntry(AppLocType, 2*i, vss.FieldCurrentLocationLatitude),
aggTableEntry(AppLocType, 2*i+1, vss.FieldCurrentLocationLongitude))
aggTableEntry(AppLocType, 2*i, vss.FieldCurrentLocationCoordinates+".latitude"),
aggTableEntry(AppLocType, 2*i+1, vss.FieldCurrentLocationCoordinates+".longitude"))
}
}
for i, agg := range aggArgs.LocationArgs {
Expand All @@ -447,7 +452,7 @@ func getAggQuery(aggArgs *model.AggregatedSignalArgs) (string, []any, error) {
fieldFilters := []qm.QueryMod{
qmhelper.Where(signalIndexCol, qmhelper.EQ, i),
}
fieldFilters = append(fieldFilters, buildFloatConditionList(agg.Filter)...)
fieldFilters = append(fieldFilters, buildFloatConditionList(agg.Filter, getFloatSelectExpr(agg.Name))...)

// It's okay to also use Or2 for the first entry: it's simply ignored.
innerFloatFilters = append(innerFloatFilters, qm.Or2(qm.Expr(fieldFilters...)))
Expand Down Expand Up @@ -517,41 +522,41 @@ func getAggQuery(aggArgs *model.AggregatedSignalArgs) (string, []any, error) {
return stmt, args, nil
}

func buildFloatConditionList(fil *model.SignalFloatFilter) []qm.QueryMod {
func buildFloatConditionList(fil *model.SignalFloatFilter, selectExpr string) []qm.QueryMod {
if fil == nil {
return nil
}

var mods []qm.QueryMod

if fil.Eq != nil {
mods = append(mods, qmhelper.Where(vss.ValueNumberCol, qmhelper.EQ, *fil.Eq))
mods = append(mods, qmhelper.Where(selectExpr, qmhelper.EQ, *fil.Eq))
}
if fil.Neq != nil {
mods = append(mods, qmhelper.Where(vss.ValueNumberCol, qmhelper.NEQ, *fil.Neq))
mods = append(mods, qmhelper.Where(selectExpr, qmhelper.NEQ, *fil.Neq))
}
if fil.Gt != nil {
mods = append(mods, qmhelper.Where(vss.ValueNumberCol, qmhelper.GT, *fil.Gt))
mods = append(mods, qmhelper.Where(selectExpr, qmhelper.GT, *fil.Gt))
}
if fil.Lt != nil {
mods = append(mods, qmhelper.Where(vss.ValueNumberCol, qmhelper.LT, *fil.Lt))
mods = append(mods, qmhelper.Where(selectExpr, qmhelper.LT, *fil.Lt))
}
if fil.Gte != nil {
mods = append(mods, qmhelper.Where(vss.ValueNumberCol, qmhelper.GTE, *fil.Gte))
mods = append(mods, qmhelper.Where(selectExpr, qmhelper.GTE, *fil.Gte))
}
if fil.Lte != nil {
mods = append(mods, qmhelper.Where(vss.ValueNumberCol, qmhelper.LTE, *fil.Lte))
mods = append(mods, qmhelper.Where(selectExpr, qmhelper.LTE, *fil.Lte))
}
if len(fil.NotIn) != 0 {
mods = append(mods, qm.WhereNotIn(vss.ValueNumberCol+" NOT IN ?", fil.NotIn))
mods = append(mods, qm.WhereNotIn(selectExpr+" NOT IN ?", fil.NotIn))
}
if len(fil.In) != 0 {
mods = append(mods, qm.WhereIn(vss.ValueNumberCol+" IN ?", fil.In))
mods = append(mods, qm.WhereIn(selectExpr+" IN ?", fil.In))
}

var orMods []qm.QueryMod
for _, cond := range fil.Or {
clauseMods := buildFloatConditionList(cond)
clauseMods := buildFloatConditionList(cond, selectExpr)
if len(clauseMods) != 0 {
orMods = append(orMods, qm.Or2(qm.Expr(clauseMods...)))
}
Expand Down
Loading