Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ require (
github.com/twmb/franz-go/plugin/kprom v1.3.0
github.com/xmidt-org/httpaux v0.4.2
github.com/xmidt-org/wrp-go/v5 v5.4.0
github.com/xmidt-org/wrpkafka v0.1.1
github.com/xmidt-org/wrpkafka v0.1.2
go.uber.org/fx v1.24.0
gopkg.in/dealancer/validate.v2 v2.1.0
)
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -219,8 +219,8 @@ github.com/xmidt-org/wrp-go/v3 v3.7.0 h1:m9ghdq79Zzb0WjomUJ02rzFpI0RK8KTjArYpNIw
github.com/xmidt-org/wrp-go/v3 v3.7.0/go.mod h1:eyMj+q/7LQ4SU6Z3s6VOwuTVSh6/DJBb2soBGBFSung=
github.com/xmidt-org/wrp-go/v5 v5.4.0 h1:9bOO8e3uR+qek9uTmpZRahVJHbz6FM0Y7dLleLZ3MNY=
github.com/xmidt-org/wrp-go/v5 v5.4.0/go.mod h1:SQlOnkDG5HnaziOf83g+/38GQfE2mIInPWP+Wdw3/FE=
github.com/xmidt-org/wrpkafka v0.1.1 h1:Cgm4v1AdU79ocJrKef4VJrWdUIn8/oUeb3cuVPyqrm0=
github.com/xmidt-org/wrpkafka v0.1.1/go.mod h1:OrQDfmzIOESfLCPBQDbC6iOT7pFZLpvxrbL50tQ79v4=
github.com/xmidt-org/wrpkafka v0.1.2 h1:6sKSujPhi40YWgSWwnImuDNw4tnjZHoRD7KsJtiWnPg=
github.com/xmidt-org/wrpkafka v0.1.2/go.mod h1:OrQDfmzIOESfLCPBQDbC6iOT7pFZLpvxrbL50tQ79v4=
github.com/yusufpapurcu/wmi v1.2.4 h1:zFUKzehAFReQwLys1b/iSMl+JQGSCSjtVqQn9bBrPo0=
github.com/yusufpapurcu/wmi v1.2.4/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0=
go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ64=
Expand Down
10 changes: 10 additions & 0 deletions integration_tests/splitter.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,18 @@ producer((replace)):
topic_routes:
- pattern: "*"
topic: "default-events"
hash_key: "source" # Extract device ID from source field
- pattern: "device-status"
topic: "device-status-events"
hash_key: "source" # Extract device ID from source field

buckets:
possible_buckets:
- name: "bucket-0"
threshold: 1.0
target_bucket: "bucket-0"
partition_key_type: "source" # Extract device ID from source field for partitioning
missing_partition_key_action: "include"

consumer((replace)):
brokers:
Expand Down
6 changes: 5 additions & 1 deletion internal/app/app_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -405,15 +405,19 @@ func TestProvideConsumer(t *testing.T) {
{
Topic: "wrp-events",
Pattern: "*",
HashKey: "source",
},
},
}

routes, err := pubConfig.ToWRPKafkaRoutes()
require.NoError(t, err, "Setup should convert routes successfully")

pub, err := publisher.New(
publisher.WithLogEmitter(logEmitter),
publisher.WithMetricsEmitter(metricEmitter),
publisher.WithBrokers(pubConfig.Brokers),
publisher.WithTopicRoutes(pubConfig.ToWRPKafkaRoutes()...),
publisher.WithTopicRoutes(routes...),
)
require.NoError(t, err, "Setup should create publisher successfully")
return pub
Expand Down
3 changes: 2 additions & 1 deletion internal/app/default-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ buckets:
# List of all bucket names (e.g. "bucket-0", "bucket-1", "bucket-2")
possible_buckets:
- name: "bucket-0"
threshold: 0.5
threshold: 1.0

# Required: Target bucket name to route messages to
target_bucket: "bucket-0"
Expand All @@ -54,6 +54,7 @@ producer:
topic_routes:
- pattern: "*"
topic: "default-events"
hash_key: "metadata/hw-deviceid" # Use hardware device ID from metadata for partitioning
brokers:
restart_on_config_change: false
target_region: us-east-1
Expand Down
5 changes: 4 additions & 1 deletion internal/app/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,10 @@ func providePublisher(in PublisherIn) (PublisherOut, error) {
cfg := in.Config

// Convert config routes to wrpkafka routes
wrpRoutes := cfg.ToWRPKafkaRoutes()
wrpRoutes, err := cfg.ToWRPKafkaRoutes()
if err != nil {
return PublisherOut{}, fmt.Errorf("failed to convert topic routes: %w", err)
}

// Build options from configuration - validation is handled by the option functions
opts := []publisher.Option{
Expand Down
22 changes: 17 additions & 5 deletions internal/publisher/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package publisher

import (
"fmt"
"time"

"github.com/xmidt-org/wrpkafka"
Expand Down Expand Up @@ -43,24 +44,35 @@ type Brokers struct {
type TopicRoute struct {
Topic string
Pattern string
HashKey string
}

// ToWRPKafkaRoute converts a TopicRoute to a wrpkafka.TopicRoute
func (tr TopicRoute) ToWRPKafkaRoute() wrpkafka.TopicRoute {
func (tr TopicRoute) ToWRPKafkaRoute() (wrpkafka.TopicRoute, error) {
hashKey, err := wrpkafka.ParseHashKey(tr.HashKey)
if err != nil {
return wrpkafka.TopicRoute{}, fmt.Errorf("failed to parse hash key %q: %w", tr.HashKey, err)
}

route := wrpkafka.TopicRoute{
Topic: tr.Topic,
Pattern: wrpkafka.Pattern(tr.Pattern),
HashKey: hashKey,
}
return route
return route, nil
}

// ToWRPKafkaRoutes converts all TopicRoutes to wrpkafka.TopicRoute slice
func (c Config) ToWRPKafkaRoutes() []wrpkafka.TopicRoute {
func (c Config) ToWRPKafkaRoutes() ([]wrpkafka.TopicRoute, error) {
routes := make([]wrpkafka.TopicRoute, len(c.TopicRoutes))
for i, route := range c.TopicRoutes {
routes[i] = route.ToWRPKafkaRoute()
wrpRoute, err := route.ToWRPKafkaRoute()
if err != nil {
return nil, fmt.Errorf("failed to convert route %d: %w", i, err)
}
routes[i] = wrpRoute
}
return routes
return routes, nil
}

// SASLConfig represents SASL authentication configuration
Expand Down
80 changes: 66 additions & 14 deletions internal/publisher/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,17 +46,20 @@ func (suite *ConfigTestSuite) TestTopicRoute_ToWRPKafkaRoute() {
name string
topicRoute TopicRoute
expected wrpkafka.TopicRoute
expectError bool
description string
}{
{
name: "simple_route",
topicRoute: TopicRoute{
Topic: "events",
Pattern: "event:.*",
HashKey: "source",
},
expected: wrpkafka.TopicRoute{
Topic: "events",
Pattern: wrpkafka.Pattern("event:.*"),
HashKey: wrpkafka.HashKey{Name: wrpkafka.HashKeySource},
},
description: "Should convert TopicRoute to wrpkafka.TopicRoute correctly",
},
Expand All @@ -65,31 +68,64 @@ func (suite *ConfigTestSuite) TestTopicRoute_ToWRPKafkaRoute() {
topicRoute: TopicRoute{
Topic: "commands",
Pattern: "mac:.*/command",
HashKey: "metadata/hw-deviceid",
},
expected: wrpkafka.TopicRoute{
Topic: "commands",
Pattern: wrpkafka.Pattern("mac:.*/command"),
HashKey: wrpkafka.HashKey{Name: wrpkafka.HashKeyMetadata, MetadataField: "hw-deviceid"},
},
description: "Should handle complex routing patterns",
description: "Should handle complex routing patterns with metadata hash key",
},
{
name: "wildcard_route",
topicRoute: TopicRoute{
Topic: "all-messages",
Pattern: ".*",
HashKey: "none",
},
expected: wrpkafka.TopicRoute{
Topic: "all-messages",
Pattern: wrpkafka.Pattern(".*"),
HashKey: wrpkafka.HashKey{Name: wrpkafka.HashKeyNone},
},
description: "Should handle wildcard patterns with no hash key",
},
{
name: "route_with_default_hash_key",
topicRoute: TopicRoute{
Topic: "events",
Pattern: "event:.*",
HashKey: "", // Empty should default to metadata/hw-deviceid
},
expected: wrpkafka.TopicRoute{
Topic: "events",
Pattern: wrpkafka.Pattern("event:.*"),
HashKey: wrpkafka.HashKey{Name: wrpkafka.HashKeyMetadata, MetadataField: "hw-deviceid"},
},
description: "Should default to metadata/hw-deviceid when hash_key is empty",
},
{
name: "invalid_hash_key",
topicRoute: TopicRoute{
Topic: "events",
Pattern: "event:.*",
HashKey: "invalid",
},
description: "Should handle wildcard patterns",
expectError: true,
description: "Should return error for invalid hash key",
},
}

for _, tt := range tests {
suite.Run(tt.name, func() {
result := tt.topicRoute.ToWRPKafkaRoute()
suite.Equal(tt.expected, result, tt.description)
result, err := tt.topicRoute.ToWRPKafkaRoute()
if tt.expectError {
suite.Error(err, tt.description)
} else {
suite.NoError(err, tt.description)
suite.Equal(tt.expected, result, tt.description)
}
})
}
}
Expand All @@ -100,33 +136,34 @@ func (suite *ConfigTestSuite) TestConfig_ToWRPKafkaRoutes() {
name string
config Config
expected []wrpkafka.TopicRoute
expectError bool
description string
}{
{
name: "single_route",
config: Config{
TopicRoutes: []TopicRoute{
{Topic: "events", Pattern: "event:.*"},
{Topic: "events", Pattern: "event:.*", HashKey: "source"},
},
},
expected: []wrpkafka.TopicRoute{
{Topic: "events", Pattern: wrpkafka.Pattern("event:.*")},
{Topic: "events", Pattern: wrpkafka.Pattern("event:.*"), HashKey: wrpkafka.HashKey{Name: wrpkafka.HashKeySource}},
},
description: "Should convert single route correctly",
},
{
name: "multiple_routes",
config: Config{
TopicRoutes: []TopicRoute{
{Topic: "events", Pattern: "event:.*"},
{Topic: "commands", Pattern: "mac:.*/command"},
{Topic: "responses", Pattern: ".*response.*"},
{Topic: "events", Pattern: "event:.*", HashKey: "source"},
{Topic: "commands", Pattern: "mac:.*/command", HashKey: "metadata/hw-deviceid"},
{Topic: "responses", Pattern: ".*response.*", HashKey: "none"},
},
},
expected: []wrpkafka.TopicRoute{
{Topic: "events", Pattern: wrpkafka.Pattern("event:.*")},
{Topic: "commands", Pattern: wrpkafka.Pattern("mac:.*/command")},
{Topic: "responses", Pattern: wrpkafka.Pattern(".*response.*")},
{Topic: "events", Pattern: wrpkafka.Pattern("event:.*"), HashKey: wrpkafka.HashKey{Name: wrpkafka.HashKeySource}},
{Topic: "commands", Pattern: wrpkafka.Pattern("mac:.*/command"), HashKey: wrpkafka.HashKey{Name: wrpkafka.HashKeyMetadata, MetadataField: "hw-deviceid"}},
{Topic: "responses", Pattern: wrpkafka.Pattern(".*response.*"), HashKey: wrpkafka.HashKey{Name: wrpkafka.HashKeyNone}},
},
description: "Should convert multiple routes correctly",
},
Expand All @@ -138,12 +175,27 @@ func (suite *ConfigTestSuite) TestConfig_ToWRPKafkaRoutes() {
expected: []wrpkafka.TopicRoute{},
description: "Should handle empty routes slice",
},
{
name: "route_with_invalid_hash_key",
config: Config{
TopicRoutes: []TopicRoute{
{Topic: "events", Pattern: "event:.*", HashKey: "invalid"},
},
},
expectError: true,
description: "Should return error for invalid hash key",
},
}

for _, tt := range tests {
suite.Run(tt.name, func() {
result := tt.config.ToWRPKafkaRoutes()
suite.Equal(tt.expected, result, tt.description)
result, err := tt.config.ToWRPKafkaRoutes()
if tt.expectError {
suite.Error(err, tt.description)
} else {
suite.NoError(err, tt.description)
suite.Equal(tt.expected, result, tt.description)
}
})
}
}
Expand Down
Loading