Skip to content

Commit 08cdcc4

Browse files
authored
Merge pull request #110 from xmidt-org/deviceid
update to latest wrpkafka
2 parents 33d07b3 + 9a63da4 commit 08cdcc4

8 files changed

Lines changed: 107 additions & 25 deletions

File tree

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ require (
1919
github.com/twmb/franz-go/plugin/kprom v1.3.0
2020
github.com/xmidt-org/httpaux v0.4.2
2121
github.com/xmidt-org/wrp-go/v5 v5.4.0
22-
github.com/xmidt-org/wrpkafka v0.1.1
22+
github.com/xmidt-org/wrpkafka v0.1.2
2323
go.uber.org/fx v1.24.0
2424
gopkg.in/dealancer/validate.v2 v2.1.0
2525
)

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -219,8 +219,8 @@ github.com/xmidt-org/wrp-go/v3 v3.7.0 h1:m9ghdq79Zzb0WjomUJ02rzFpI0RK8KTjArYpNIw
219219
github.com/xmidt-org/wrp-go/v3 v3.7.0/go.mod h1:eyMj+q/7LQ4SU6Z3s6VOwuTVSh6/DJBb2soBGBFSung=
220220
github.com/xmidt-org/wrp-go/v5 v5.4.0 h1:9bOO8e3uR+qek9uTmpZRahVJHbz6FM0Y7dLleLZ3MNY=
221221
github.com/xmidt-org/wrp-go/v5 v5.4.0/go.mod h1:SQlOnkDG5HnaziOf83g+/38GQfE2mIInPWP+Wdw3/FE=
222-
github.com/xmidt-org/wrpkafka v0.1.1 h1:Cgm4v1AdU79ocJrKef4VJrWdUIn8/oUeb3cuVPyqrm0=
223-
github.com/xmidt-org/wrpkafka v0.1.1/go.mod h1:OrQDfmzIOESfLCPBQDbC6iOT7pFZLpvxrbL50tQ79v4=
222+
github.com/xmidt-org/wrpkafka v0.1.2 h1:6sKSujPhi40YWgSWwnImuDNw4tnjZHoRD7KsJtiWnPg=
223+
github.com/xmidt-org/wrpkafka v0.1.2/go.mod h1:OrQDfmzIOESfLCPBQDbC6iOT7pFZLpvxrbL50tQ79v4=
224224
github.com/yusufpapurcu/wmi v1.2.4 h1:zFUKzehAFReQwLys1b/iSMl+JQGSCSjtVqQn9bBrPo0=
225225
github.com/yusufpapurcu/wmi v1.2.4/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0=
226226
go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ64=

integration_tests/splitter.yaml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,18 @@ producer((replace)):
2929
topic_routes:
3030
- pattern: "*"
3131
topic: "default-events"
32+
hash_key: "source" # Extract device ID from source field
3233
- pattern: "device-status"
3334
topic: "device-status-events"
35+
hash_key: "source" # Extract device ID from source field
36+
37+
buckets:
38+
possible_buckets:
39+
- name: "bucket-0"
40+
threshold: 1.0
41+
target_bucket: "bucket-0"
42+
partition_key_type: "source" # Extract device ID from source field for partitioning
43+
missing_partition_key_action: "include"
3444

3545
consumer((replace)):
3646
brokers:

internal/app/app_test.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -405,15 +405,19 @@ func TestProvideConsumer(t *testing.T) {
405405
{
406406
Topic: "wrp-events",
407407
Pattern: "*",
408+
HashKey: "source",
408409
},
409410
},
410411
}
411412

413+
routes, err := pubConfig.ToWRPKafkaRoutes()
414+
require.NoError(t, err, "Setup should convert routes successfully")
415+
412416
pub, err := publisher.New(
413417
publisher.WithLogEmitter(logEmitter),
414418
publisher.WithMetricsEmitter(metricEmitter),
415419
publisher.WithBrokers(pubConfig.Brokers),
416-
publisher.WithTopicRoutes(pubConfig.ToWRPKafkaRoutes()...),
420+
publisher.WithTopicRoutes(routes...),
417421
)
418422
require.NoError(t, err, "Setup should create publisher successfully")
419423
return pub

internal/app/default-config.yaml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ buckets:
4343
# List of all bucket names (e.g. "bucket-0", "bucket-1", "bucket-2")
4444
possible_buckets:
4545
- name: "bucket-0"
46-
threshold: 0.5
46+
threshold: 1.0
4747

4848
# Required: Target bucket name to route messages to
4949
target_bucket: "bucket-0"
@@ -54,6 +54,7 @@ producer:
5454
topic_routes:
5555
- pattern: "*"
5656
topic: "default-events"
57+
hash_key: "metadata/hw-deviceid" # Use hardware device ID from metadata for partitioning
5758
brokers:
5859
restart_on_config_change: false
5960
target_region: us-east-1

internal/app/publisher.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,10 @@ func providePublisher(in PublisherIn) (PublisherOut, error) {
3333
cfg := in.Config
3434

3535
// Convert config routes to wrpkafka routes
36-
wrpRoutes := cfg.ToWRPKafkaRoutes()
36+
wrpRoutes, err := cfg.ToWRPKafkaRoutes()
37+
if err != nil {
38+
return PublisherOut{}, fmt.Errorf("failed to convert topic routes: %w", err)
39+
}
3740

3841
// Build options from configuration - validation is handled by the option functions
3942
opts := []publisher.Option{

internal/publisher/config.go

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
package publisher
55

66
import (
7+
"fmt"
78
"time"
89

910
"github.com/xmidt-org/wrpkafka"
@@ -43,24 +44,35 @@ type Brokers struct {
4344
type TopicRoute struct {
4445
Topic string
4546
Pattern string
47+
HashKey string
4648
}
4749

4850
// ToWRPKafkaRoute converts a TopicRoute to a wrpkafka.TopicRoute
49-
func (tr TopicRoute) ToWRPKafkaRoute() wrpkafka.TopicRoute {
51+
func (tr TopicRoute) ToWRPKafkaRoute() (wrpkafka.TopicRoute, error) {
52+
hashKey, err := wrpkafka.ParseHashKey(tr.HashKey)
53+
if err != nil {
54+
return wrpkafka.TopicRoute{}, fmt.Errorf("failed to parse hash key %q: %w", tr.HashKey, err)
55+
}
56+
5057
route := wrpkafka.TopicRoute{
5158
Topic: tr.Topic,
5259
Pattern: wrpkafka.Pattern(tr.Pattern),
60+
HashKey: hashKey,
5361
}
54-
return route
62+
return route, nil
5563
}
5664

5765
// ToWRPKafkaRoutes converts all TopicRoutes to wrpkafka.TopicRoute slice
58-
func (c Config) ToWRPKafkaRoutes() []wrpkafka.TopicRoute {
66+
func (c Config) ToWRPKafkaRoutes() ([]wrpkafka.TopicRoute, error) {
5967
routes := make([]wrpkafka.TopicRoute, len(c.TopicRoutes))
6068
for i, route := range c.TopicRoutes {
61-
routes[i] = route.ToWRPKafkaRoute()
69+
wrpRoute, err := route.ToWRPKafkaRoute()
70+
if err != nil {
71+
return nil, fmt.Errorf("failed to convert route %d: %w", i, err)
72+
}
73+
routes[i] = wrpRoute
6274
}
63-
return routes
75+
return routes, nil
6476
}
6577

6678
// SASLConfig represents SASL authentication configuration

internal/publisher/config_test.go

Lines changed: 66 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -46,17 +46,20 @@ func (suite *ConfigTestSuite) TestTopicRoute_ToWRPKafkaRoute() {
4646
name string
4747
topicRoute TopicRoute
4848
expected wrpkafka.TopicRoute
49+
expectError bool
4950
description string
5051
}{
5152
{
5253
name: "simple_route",
5354
topicRoute: TopicRoute{
5455
Topic: "events",
5556
Pattern: "event:.*",
57+
HashKey: "source",
5658
},
5759
expected: wrpkafka.TopicRoute{
5860
Topic: "events",
5961
Pattern: wrpkafka.Pattern("event:.*"),
62+
HashKey: wrpkafka.HashKey{Name: wrpkafka.HashKeySource},
6063
},
6164
description: "Should convert TopicRoute to wrpkafka.TopicRoute correctly",
6265
},
@@ -65,31 +68,64 @@ func (suite *ConfigTestSuite) TestTopicRoute_ToWRPKafkaRoute() {
6568
topicRoute: TopicRoute{
6669
Topic: "commands",
6770
Pattern: "mac:.*/command",
71+
HashKey: "metadata/hw-deviceid",
6872
},
6973
expected: wrpkafka.TopicRoute{
7074
Topic: "commands",
7175
Pattern: wrpkafka.Pattern("mac:.*/command"),
76+
HashKey: wrpkafka.HashKey{Name: wrpkafka.HashKeyMetadata, MetadataField: "hw-deviceid"},
7277
},
73-
description: "Should handle complex routing patterns",
78+
description: "Should handle complex routing patterns with metadata hash key",
7479
},
7580
{
7681
name: "wildcard_route",
7782
topicRoute: TopicRoute{
7883
Topic: "all-messages",
7984
Pattern: ".*",
85+
HashKey: "none",
8086
},
8187
expected: wrpkafka.TopicRoute{
8288
Topic: "all-messages",
8389
Pattern: wrpkafka.Pattern(".*"),
90+
HashKey: wrpkafka.HashKey{Name: wrpkafka.HashKeyNone},
91+
},
92+
description: "Should handle wildcard patterns with no hash key",
93+
},
94+
{
95+
name: "route_with_default_hash_key",
96+
topicRoute: TopicRoute{
97+
Topic: "events",
98+
Pattern: "event:.*",
99+
HashKey: "", // Empty should default to metadata/hw-deviceid
100+
},
101+
expected: wrpkafka.TopicRoute{
102+
Topic: "events",
103+
Pattern: wrpkafka.Pattern("event:.*"),
104+
HashKey: wrpkafka.HashKey{Name: wrpkafka.HashKeyMetadata, MetadataField: "hw-deviceid"},
105+
},
106+
description: "Should default to metadata/hw-deviceid when hash_key is empty",
107+
},
108+
{
109+
name: "invalid_hash_key",
110+
topicRoute: TopicRoute{
111+
Topic: "events",
112+
Pattern: "event:.*",
113+
HashKey: "invalid",
84114
},
85-
description: "Should handle wildcard patterns",
115+
expectError: true,
116+
description: "Should return error for invalid hash key",
86117
},
87118
}
88119

89120
for _, tt := range tests {
90121
suite.Run(tt.name, func() {
91-
result := tt.topicRoute.ToWRPKafkaRoute()
92-
suite.Equal(tt.expected, result, tt.description)
122+
result, err := tt.topicRoute.ToWRPKafkaRoute()
123+
if tt.expectError {
124+
suite.Error(err, tt.description)
125+
} else {
126+
suite.NoError(err, tt.description)
127+
suite.Equal(tt.expected, result, tt.description)
128+
}
93129
})
94130
}
95131
}
@@ -100,33 +136,34 @@ func (suite *ConfigTestSuite) TestConfig_ToWRPKafkaRoutes() {
100136
name string
101137
config Config
102138
expected []wrpkafka.TopicRoute
139+
expectError bool
103140
description string
104141
}{
105142
{
106143
name: "single_route",
107144
config: Config{
108145
TopicRoutes: []TopicRoute{
109-
{Topic: "events", Pattern: "event:.*"},
146+
{Topic: "events", Pattern: "event:.*", HashKey: "source"},
110147
},
111148
},
112149
expected: []wrpkafka.TopicRoute{
113-
{Topic: "events", Pattern: wrpkafka.Pattern("event:.*")},
150+
{Topic: "events", Pattern: wrpkafka.Pattern("event:.*"), HashKey: wrpkafka.HashKey{Name: wrpkafka.HashKeySource}},
114151
},
115152
description: "Should convert single route correctly",
116153
},
117154
{
118155
name: "multiple_routes",
119156
config: Config{
120157
TopicRoutes: []TopicRoute{
121-
{Topic: "events", Pattern: "event:.*"},
122-
{Topic: "commands", Pattern: "mac:.*/command"},
123-
{Topic: "responses", Pattern: ".*response.*"},
158+
{Topic: "events", Pattern: "event:.*", HashKey: "source"},
159+
{Topic: "commands", Pattern: "mac:.*/command", HashKey: "metadata/hw-deviceid"},
160+
{Topic: "responses", Pattern: ".*response.*", HashKey: "none"},
124161
},
125162
},
126163
expected: []wrpkafka.TopicRoute{
127-
{Topic: "events", Pattern: wrpkafka.Pattern("event:.*")},
128-
{Topic: "commands", Pattern: wrpkafka.Pattern("mac:.*/command")},
129-
{Topic: "responses", Pattern: wrpkafka.Pattern(".*response.*")},
164+
{Topic: "events", Pattern: wrpkafka.Pattern("event:.*"), HashKey: wrpkafka.HashKey{Name: wrpkafka.HashKeySource}},
165+
{Topic: "commands", Pattern: wrpkafka.Pattern("mac:.*/command"), HashKey: wrpkafka.HashKey{Name: wrpkafka.HashKeyMetadata, MetadataField: "hw-deviceid"}},
166+
{Topic: "responses", Pattern: wrpkafka.Pattern(".*response.*"), HashKey: wrpkafka.HashKey{Name: wrpkafka.HashKeyNone}},
130167
},
131168
description: "Should convert multiple routes correctly",
132169
},
@@ -138,12 +175,27 @@ func (suite *ConfigTestSuite) TestConfig_ToWRPKafkaRoutes() {
138175
expected: []wrpkafka.TopicRoute{},
139176
description: "Should handle empty routes slice",
140177
},
178+
{
179+
name: "route_with_invalid_hash_key",
180+
config: Config{
181+
TopicRoutes: []TopicRoute{
182+
{Topic: "events", Pattern: "event:.*", HashKey: "invalid"},
183+
},
184+
},
185+
expectError: true,
186+
description: "Should return error for invalid hash key",
187+
},
141188
}
142189

143190
for _, tt := range tests {
144191
suite.Run(tt.name, func() {
145-
result := tt.config.ToWRPKafkaRoutes()
146-
suite.Equal(tt.expected, result, tt.description)
192+
result, err := tt.config.ToWRPKafkaRoutes()
193+
if tt.expectError {
194+
suite.Error(err, tt.description)
195+
} else {
196+
suite.NoError(err, tt.description)
197+
suite.Equal(tt.expected, result, tt.description)
198+
}
147199
})
148200
}
149201
}

0 commit comments

Comments
 (0)