Skip to content

Commit dd5a656

Browse files
committed
Merge commit '467ac65'
2 parents 3560cd6 + 467ac65 commit dd5a656

14 files changed

Lines changed: 309 additions & 103 deletions

File tree

.golangci.yml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,5 +3,12 @@ version: "2"
33
linters:
44
enable:
55
- sqlclosecheck
6+
settings:
7+
errcheck:
8+
exclude-functions:
9+
- fmt.Fprintf
10+
- (io.ReadCloser).Close
11+
- (*sql.Rows).Close
12+
- (*sqlx.Rows).Close
613
run:
714
timeout: 30m

internal/dbutils/collect.go

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
package dbutils
2+
3+
import (
4+
"database/sql"
5+
"errors"
6+
)
7+
8+
var ErrTooManyRows = errors.New("too many rows returned")
9+
10+
// RowToFunc is a function that scans or otherwise converts row to a T.
11+
type RowToFunc[T any] func(row *sql.Rows) (T, error)
12+
13+
// AppendRows iterates through rows, calling fn for each row, and appending the results into a slice of T.
14+
//
15+
// This function closes the rows automatically on return.
16+
func AppendRows[T any, S ~[]T](slice S, rows *sql.Rows, fn RowToFunc[T]) (S, error) {
17+
defer func() {
18+
_ = rows.Close()
19+
}()
20+
21+
for rows.Next() {
22+
value, err := fn(rows)
23+
if err != nil {
24+
return nil, err
25+
}
26+
slice = append(slice, value)
27+
}
28+
29+
if err := rows.Err(); err != nil {
30+
return nil, err
31+
}
32+
33+
return slice, nil
34+
}
35+
36+
// CollectRows iterates through rows, calling fn for each row, and collecting the results into a slice of T.
37+
//
38+
// This function closes the rows automatically on return.
39+
func CollectRows[T any](rows *sql.Rows, fn RowToFunc[T]) ([]T, error) {
40+
return AppendRows([]T{}, rows, fn)
41+
}
42+
43+
// CollectOneRow calls fn for the first row in rows and returns the result. If no rows are found returns an error where errors.Is(ErrNoRows) is true.
44+
// CollectOneRow is to CollectRows as QueryRow is to Query.
45+
//
46+
// This function closes the rows automatically on return.
47+
func CollectOneRow[T any](rows *sql.Rows, fn RowToFunc[T]) (T, error) {
48+
defer func() {
49+
_ = rows.Close()
50+
}()
51+
52+
var value T
53+
var err error
54+
55+
if !rows.Next() {
56+
if err = rows.Err(); err != nil {
57+
return value, err
58+
}
59+
return value, sql.ErrNoRows
60+
}
61+
62+
value, err = fn(rows)
63+
if err != nil {
64+
return value, err
65+
}
66+
67+
_ = rows.Close()
68+
return value, rows.Err()
69+
}
70+
71+
// CollectExactlyOneRow calls fn for the first row in rows and returns the result.
72+
// - If no rows are found returns an error where errors.Is(ErrNoRows) is true.
73+
// - If more than 1 row is found returns an error where errors.Is(ErrTooManyRows) is true.
74+
//
75+
// This function closes the rows automatically on return.
76+
func CollectExactlyOneRow[T any](rows *sql.Rows, fn RowToFunc[T]) (T, error) {
77+
defer func() {
78+
_ = rows.Close()
79+
}()
80+
81+
var (
82+
err error
83+
value T
84+
)
85+
86+
if !rows.Next() {
87+
if err = rows.Err(); err != nil {
88+
return value, err
89+
}
90+
91+
return value, sql.ErrNoRows
92+
}
93+
94+
value, err = fn(rows)
95+
if err != nil {
96+
return value, err
97+
}
98+
99+
if rows.Next() {
100+
var zero T
101+
102+
return zero, ErrTooManyRows
103+
}
104+
105+
return value, rows.Err()
106+
}

internal/pagination/pagination.go

Lines changed: 58 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111

1212
sq "github.com/Masterminds/squirrel"
1313
"github.com/fxamacker/cbor/v2"
14+
"github.com/samber/lo"
1415
)
1516

1617
const (
@@ -89,10 +90,6 @@ type Cursor[T any] struct {
8990
func CreatePageT[T1 any, T2 any](data []T1, cursor Cursor[T2]) Page[T1] {
9091
var cursorString string
9192

92-
// TODO: Small bug, at this point the data has already been Limited and the data will always be equal to the Limit or smaller
93-
// this means that a cursor will be made if the set is exactly equal to the limit but in reality, there is no more data
94-
// meaning a new cursor is sent that won't result in any data
95-
// https://github.com/sensorbucket/SensorBucket/issues/82
9693
if len(data) >= int(cursor.Limit) {
9794
cursorString = EncodeCursor(cursor)
9895
}
@@ -137,24 +134,54 @@ func multiColumnCompare(columns []whereCol) sq.Sqlizer {
137134
if len(columns) == 0 {
138135
return nil
139136
}
140-
clause := sq.Or{}
141-
for i := 0; i < len(columns); i++ {
142-
and := sq.And{}
143-
for j := 0; j <= i; j++ {
144-
col := columns[j]
145-
if j == i {
146-
if col.order == "ASC" {
147-
and = append(and, sq.Gt{col.column: col.value})
148-
} else {
149-
and = append(and, sq.Lt{col.column: col.value})
150-
}
151-
continue
152-
}
153-
and = append(and, sq.Eq{col.column: col.value})
137+
138+
// Determine the comparison operator based on the first column's order.
139+
// This function assumes all columns in the cursor are sorted in the same direction (all ASC or all DESC).
140+
// If mixed orders are present, the simple tuple comparison `(col1, col2) OP (val1, val2)` is semantically incorrect.
141+
op := ""
142+
switch columns[0].order {
143+
case "ASC":
144+
op = ">"
145+
case "DESC":
146+
op = "<"
147+
default:
148+
// This panic indicates an internal inconsistency: `whereCol.order` should only be "ASC" or "DESC".
149+
panic(
150+
fmt.Sprintf(
151+
"multiColumnCompare: invalid order type %q encountered for column %q. Expected 'ASC' or 'DESC'.",
152+
columns[0].order,
153+
columns[0].column,
154+
),
155+
)
156+
}
157+
158+
// Validate that all columns have the same sorting order.
159+
// This is a strict requirement for the simple tuple comparison syntax used here.
160+
for i := 1; i < len(columns); i++ {
161+
if columns[i].order != columns[0].order {
162+
// This panic indicates a misconfiguration in the pagination tags or an invalid cursor.
163+
// Mixed ASC/DESC orders require a more complex WHERE clause (e.g., `(A > X) OR (A = X AND B < Y)`).
164+
panic(
165+
"multiColumnCompare: mixed ASC/DESC orders detected in pagination columns. This function supports only consistent ordering for tuple comparison.",
166+
)
154167
}
155-
clause = append(clause, and)
156168
}
157-
return clause
169+
170+
placeholders := make([]string, len(columns))
171+
for i := range columns {
172+
placeholders[i] = "?"
173+
}
174+
columnNames := lo.Map(columns, func(col whereCol, _ int) string { return col.column })
175+
columnValues := lo.Map(columns, func(col whereCol, _ int) any { return col.value })
176+
177+
return sq.Expr(
178+
fmt.Sprintf(
179+
"(%s) %s (%s)",
180+
strings.Join(columnNames, ", "),
181+
op,
182+
strings.Join(placeholders, ","),
183+
),
184+
columnValues...)
158185
}
159186

160187
func columnAlias(name string) string {
@@ -166,19 +193,14 @@ func columnAlias(name string) string {
166193
return "paginated_" + name
167194
}
168195

169-
func ApplyNoLimit[T any](q sq.SelectBuilder, c Cursor[T]) (sq.SelectBuilder, error) {
170-
c.Limit = 0
171-
return Apply(q, c)
172-
}
173-
174196
func Apply[T any](q sq.SelectBuilder, c Cursor[T]) (sq.SelectBuilder, error) {
175197
if c.Limit > 0 {
176198
q = q.Limit(c.Limit)
177199
}
178200
rt := reflect.TypeOf(c.Columns)
179201
rv := reflect.ValueOf(c.Columns)
180202
columns := []whereCol{}
181-
for ix := 0; ix < rt.NumField(); ix++ {
203+
for ix := range rt.NumField() {
182204
rf := rt.Field(ix)
183205
if !rf.IsExported() {
184206
continue
@@ -191,12 +213,20 @@ func Apply[T any](q sq.SelectBuilder, c Cursor[T]) (sq.SelectBuilder, error) {
191213

192214
tagParts := strings.Split(tag, ",")
193215
if len(tagParts) != 2 {
194-
return q, fmt.Errorf("invalid pagination tag on struct %s, for field %s", rt.Name(), rf.Name)
216+
return q, fmt.Errorf(
217+
"invalid pagination tag on struct %s, for field %s",
218+
rt.Name(),
219+
rf.Name,
220+
)
195221
}
196222

197223
column, order := tagParts[0], strings.ToUpper(tagParts[1])
198224
if order != "ASC" && order != "DESC" {
199-
return q, fmt.Errorf("invalid order in pagination tag on struct %s, for field %s", rt.Name(), rf.Name)
225+
return q, fmt.Errorf(
226+
"invalid order in pagination tag on struct %s, for field %s",
227+
rt.Name(),
228+
rf.Name,
229+
)
200230
}
201231
q = q.OrderBy(column + " " + order).Column(column + " AS " + columnAlias(column))
202232

pkg/auth/permissions.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -79,8 +79,8 @@ var stringPermissionMap = lo.SliceToMap(allPermissions, func(item Permission) (s
7979
return string(item), item
8080
})
8181

82-
func (this Permissions) Fulfills(that Permissions) error {
83-
_, missing := lo.Difference(this, that)
82+
func (have Permissions) Fulfills(want Permissions) error {
83+
_, missing := lo.Difference(have, want)
8484
if len(missing) > 0 {
8585
return fmt.Errorf("missing: %v", missing)
8686
}

pkg/mq/amqp_connection.go

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,12 @@ func (c *AMQPConnection) Start() {
5757
c.usersLock.Unlock()
5858
if c.connection != nil {
5959
c.state = AMQP_DISCONNECTED
60-
c.connection.Close()
60+
if err := c.connection.Close(); err != nil {
61+
log.Printf(
62+
"AMQPConnection close returned an error but we're treating this as closed anyways: %s\n",
63+
err.Error(),
64+
)
65+
}
6166
}
6267
log.Println("AMQPConnection stopped")
6368
}()
@@ -109,7 +114,12 @@ func (c *AMQPConnection) Start() {
109114

110115
// Disconnected, so close to be sure
111116
log.Printf("AMQPConnection disconnected\n")
112-
c.connection.Close()
117+
if err := c.connection.Close(); err != nil {
118+
log.Printf(
119+
"AMQPConnection close returned an error but we're treating this as closed anyways: %s\n",
120+
err.Error(),
121+
)
122+
}
113123
}
114124
}
115125

services/core/devices/infra/query_builder.go

Lines changed: 35 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,13 @@ package deviceinfra
22

33
import (
44
"context"
5+
"database/sql"
56
"fmt"
67

78
sq "github.com/Masterminds/squirrel"
89
"github.com/jmoiron/sqlx"
910

11+
"sensorbucket.nl/sensorbucket/internal/dbutils"
1012
"sensorbucket.nl/sensorbucket/internal/pagination"
1113
"sensorbucket.nl/sensorbucket/pkg/auth"
1214
"sensorbucket.nl/sensorbucket/services/core/devices"
@@ -75,7 +77,10 @@ func (b deviceQueryBuilder) WithinRange(r devices.RangeFilter) deviceQueryBuilde
7577
return b
7678
}
7779

78-
func (b deviceQueryBuilder) Query(ctx context.Context, db *sqlx.DB) (*pagination.Page[devices.Device], error) {
80+
func (b deviceQueryBuilder) Query(
81+
ctx context.Context,
82+
db *sqlx.DB,
83+
) (*pagination.Page[devices.Device], error) {
7984
if b.err != nil {
8085
return nil, b.err
8186
}
@@ -92,7 +97,10 @@ func (b deviceQueryBuilder) Query(ctx context.Context, db *sqlx.DB) (*pagination
9297
}
9398
if len(b.filters.Sensor) > 0 {
9499
// Update query here
95-
subQ, subArgs, err := sq.Select("DISTINCT sensor.device_id").From("sensors sensor").Where(sq.Eq{"sensor.id": b.filters.Sensor}).ToSql()
100+
subQ, subArgs, err := sq.Select("DISTINCT sensor.device_id").
101+
From("sensors sensor").
102+
Where(sq.Eq{"sensor.id": b.filters.Sensor}).
103+
ToSql()
96104
if err != nil {
97105
return nil, err
98106
}
@@ -113,30 +121,31 @@ func (b deviceQueryBuilder) Query(ctx context.Context, db *sqlx.DB) (*pagination
113121
if err != nil {
114122
return nil, err
115123
}
116-
defer rows.Close()
117-
118-
deviceModels := []DeviceModel{}
119-
for rows.Next() {
120-
var model DeviceModel
121-
err := rows.Scan(
122-
&model.ID,
123-
&model.Code,
124-
&model.Description,
125-
&model.TenantID,
126-
&model.Properties,
127-
&model.LocationDescription,
128-
&model.Longitude,
129-
&model.Latitude,
130-
&model.Altitude,
131-
&model.State,
132-
&model.CreatedAt,
133-
&b.cursor.Columns.CreatedAt,
134-
&b.cursor.Columns.ID,
135-
)
136-
if err != nil {
137-
return nil, err
138-
}
139-
deviceModels = append(deviceModels, model)
124+
125+
deviceModels, err := dbutils.CollectRows(
126+
rows,
127+
func(row *sql.Rows) (DeviceModel, error) {
128+
var model DeviceModel
129+
err := rows.Scan(
130+
&model.ID,
131+
&model.Code,
132+
&model.Description,
133+
&model.TenantID,
134+
&model.Properties,
135+
&model.LocationDescription,
136+
&model.Longitude,
137+
&model.Latitude,
138+
&model.Altitude,
139+
&model.State,
140+
&model.CreatedAt,
141+
&b.cursor.Columns.CreatedAt,
142+
&b.cursor.Columns.ID,
143+
)
144+
return model, err
145+
},
146+
)
147+
if err != nil {
148+
return nil, fmt.Errorf("devices query builder collecting rows: %w", err)
140149
}
141150

142151
ids := make([]int64, len(deviceModels))

services/core/featuresofinterest/feature_of_interest.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ type CreateFeatureOfInterestOpts struct {
2929

3030
func NewFeatureOfInterest(opts CreateFeatureOfInterestOpts) (*FeatureOfInterest, error) {
3131
if opts.Name == "" || opts.TenantID == 0 {
32-
return nil, fmt.Errorf("In NewFeatureOfInterest: missing required Name or TenantID")
32+
return nil, fmt.Errorf("in NewFeatureOfInterest: missing required Name or TenantID")
3333
}
3434

3535
var foi FeatureOfInterest

0 commit comments

Comments
 (0)