Add kafka connectors (advanced users only) #363
base: main
Conversation
📝 Walkthrough
This pull request introduces a complete Kafka connector implementation for Adiom's dsync, adding both source and destination connectors with plan generation, stateful cursor management, and streaming capabilities. It includes the Sarama Kafka client library and integrates the new connectors into the CLI flag system.
Sequence Diagram(s)
```mermaid
sequenceDiagram
    participant Client
    participant Kafka as Kafka Broker
    participant Source as Kafka Source<br/>Connector
    participant Consumer as Sarama<br/>Consumer
    Client->>Source: GeneratePlan(namespaces)
    Source->>Consumer: Get partition offsets
    Consumer->>Kafka: FetchMetadata(topics)
    Kafka-->>Consumer: Partition list & offsets
    Consumer-->>Source: Offsets per partition
    Source-->>Client: UpdatesPartition entries<br/>(with encoded Cursor)
    Note over Client,Source: Streaming Phase
    Client->>Source: StreamUpdates(Cursor,<br/>namespaces)
    Source->>Consumer: Consume from<br/>partition @ offset
    Consumer->>Kafka: FetchMessages(topic,<br/>partition, offset)
    Kafka-->>Consumer: ConsumerMessage batch
    Consumer-->>Source: Message stream
    Source->>Source: Decode Update via<br/>messageToUpdate
    Source->>Source: Filter by namespace
    Source-->>Client: Update + Namespace +<br/>NextCursor
    Note over Source: Repeat until<br/>cancellation
```

```mermaid
sequenceDiagram
    participant Client
    participant Dest as Kafka Destination<br/>Connector
    participant Producer as Sarama<br/>Producer
    participant Kafka as Kafka Broker
    Client->>Dest: WriteUpdates(updates)
    Dest->>Dest: Marshal Update to<br/>protobuf
    Dest->>Dest: Extract namespace from<br/>Update metadata
    Dest->>Dest: Build message key<br/>from update IDs
    Dest->>Dest: Map namespace to<br/>topic (or use default)
    Dest->>Producer: Send(topic, key,<br/>protobuf value)
    Producer->>Kafka: ProduceMessage
    Kafka-->>Producer: Ack
    Producer-->>Dest: Confirmation
    Dest-->>Client: WriteUpdatesResponse
```
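The cursor that threads through both diagrams is only visible indirectly in this PR's snippets (`DecodeStreamCursor`, `cursor.Topic`, `cursor.Partition`, `cursor.Offset`). As a minimal sketch of that shape — the struct name, JSON encoding, and import layout here are assumptions for illustration, not the PR's actual implementation:

```go
package kafka

import "encoding/json"

// StreamCursor mirrors the fields the review snippets below read
// (cursor.Topic, cursor.Partition, cursor.Offset). The JSON encoding
// is an assumption; the PR's DecodeStreamCursor may use another format.
type StreamCursor struct {
	Topic     string `json:"topic"`
	Partition int32  `json:"partition"`
	Offset    int64  `json:"offset"`
}

// EncodeStreamCursor serializes a cursor for embedding in a plan entry.
func EncodeStreamCursor(c StreamCursor) ([]byte, error) {
	return json.Marshal(c)
}

// decodeStreamCursorSketch is the inverse, named to avoid implying it is
// the PR's DecodeStreamCursor.
func decodeStreamCursorSketch(b []byte) (StreamCursor, error) {
	var c StreamCursor
	err := json.Unmarshal(b, &c)
	return c, err
}
```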
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
🚥 Pre-merge checks: ✅ 2 passed | ❌ 1 failed (1 warning)
Important — Action Needed: IP Allowlist Update
If your organization protects your Git platform with IP whitelisting, please add the new CodeRabbit IP address to your allowlist:
Reviews will stop working after February 8, 2026 if the new IP is not added to your allowlist. Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.
Actionable comments posted: 4
🤖 Fix all issues with AI agents
In `@connectors/kafka/dest.go`:
- Around line 119-154: The WriteUpdates handler (destConn.WriteUpdates)
currently may construct sarama.ProducerMessage with an empty Topic when
d.defaultTopic is empty and ns isn't in d.namespaceToTopic; add validation
before creating messages to ensure topic is non-empty (check the resolved topic
variable after choosing namespaceToTopic or defaultTopic) and return a clear
connect.NewError(connect.CodeInvalidArgument or CodeInternal) if it's empty so
SendMessages is never called with a ProducerMessage lacking Topic; update any
error branches to include context (e.g., mentioning namespace and that no topic
was configured) to help debugging.
In `@go.mod`:
- Line 36: Update the Weaviate dependency to a patched release by replacing the
current module version "github.com/weaviate/weaviate v1.31.3" with v1.31.19 or
later in go.mod, then run the Go toolchain (e.g., go get
github.com/weaviate/weaviate@v1.31.19 and go mod tidy) and run tests/build to
validate; if you cannot upgrade immediately, apply the temporary mitigations
mentioned: disable backup modules and/or set the environment variable
REPLICA_MOVEMENT_ENABLED=false until the dependency is updated.
In `@internal/app/options/connectorflags.go`:
- Around line 681-699: Fail fast when kafka-src has no topics/mappings and when
data-type is invalid: check that topics or topicMappings (variables topics and
topicMappings) are not both empty and return an error if so, and validate the
string lookup into adiomv1.DataType_value before casting to adiomv1.DataType
(variable dataType) — if the key is missing, return a descriptive error rather
than silently defaulting to zero; keep building tm from topicMappings and topics
as before but perform these validations early in the function handling connector
flags (referencing topics, topicMappings, tm, dataType, and
adiomv1.DataType_value).
- Around line 632-638: The code silently maps the CLI value
c.String("data-type") via adiomv1.DataType_value which yields the zero enum for
unknown strings; change the logic around dataType so you first lookup the raw
string (e.g., s := c.String("data-type")), check presence in
adiomv1.DataType_value (ok := adiomv1.DataType_value[s]) and if not present
return/propagate an explicit error indicating an invalid data-type rather than
constructing adiomv1.DataType(0); only convert to adiomv1.DataType when the
lookup succeeds (assigning to dataType) so invalid CLI input is rejected with a
clear error.
🧹 Nitpick comments (2)
connectors/kafka/kafka.go (2)
186-254: Advance the cursor even when namespace‑filtered messages are skipped.
Skipping sends for filtered/empty namespaces means checkpoints don't advance; restarts will replay those messages and can stall progress if nothing matches. Consider emitting a heartbeat response carrying NextCursor when a message is skipped.
Proposed adjustment
```diff
-		if _, ok := namespaceMap[ns]; !ok && len(namespaceMap) > 0 {
-			continue
-		}
-		if ns == "" {
-			continue
-		}
+		if _, ok := namespaceMap[ns]; (!ok && len(namespaceMap) > 0) || ns == "" {
+			if err := s.Send(&adiomv1.StreamLSNResponse{
+				NextCursor: encodedNextCursor,
+				Lsn:        lsn,
+			}); err != nil {
+				if errors.Is(err, context.Canceled) {
+					return nil
+				}
+				return connect.NewError(connect.CodeInternal, err)
+			}
+			continue
+		}
```
256-322: Advance the cursor when updates are filtered or nil.
If a message is skipped (namespace filter) or messageToUpdate returns nil, the cursor never advances, so a restart replays those messages. Consider sending a StreamUpdatesResponse with only NextCursor (no updates) to move the checkpoint forward.
Proposed adjustment
```diff
-		if _, ok := namespaceMap[namespace]; !ok && len(namespaceMap) > 0 {
-			continue
-		}
-		if update != nil {
-			// TODO: maybe batch it up
-			if err := s.Send(&adiomv1.StreamUpdatesResponse{
-				Updates:    []*adiomv1.Update{update},
-				Namespace:  namespace,
-				NextCursor: encodedNextCursor,
-			}); err != nil {
-				if errors.Is(err, context.Canceled) {
-					return nil
-				}
-				return connect.NewError(connect.CodeInternal, err)
-			}
-		}
+		if _, ok := namespaceMap[namespace]; !ok && len(namespaceMap) > 0 {
+			if err := s.Send(&adiomv1.StreamUpdatesResponse{
+				Namespace:  namespace,
+				NextCursor: encodedNextCursor,
+			}); err != nil {
+				if errors.Is(err, context.Canceled) {
+					return nil
+				}
+				return connect.NewError(connect.CodeInternal, err)
+			}
+			continue
+		}
+		if update == nil {
+			if err := s.Send(&adiomv1.StreamUpdatesResponse{
+				Namespace:  namespace,
+				NextCursor: encodedNextCursor,
+			}); err != nil {
+				if errors.Is(err, context.Canceled) {
+					return nil
+				}
+				return connect.NewError(connect.CodeInternal, err)
+			}
+			continue
+		}
+		// TODO: maybe batch it up
+		if err := s.Send(&adiomv1.StreamUpdatesResponse{
+			Updates:    []*adiomv1.Update{update},
+			Namespace:  namespace,
+			NextCursor: encodedNextCursor,
+		}); err != nil {
+			if errors.Is(err, context.Canceled) {
+				return nil
+			}
+			return connect.NewError(connect.CodeInternal, err)
+		}
```
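Both adjustments repeat the same cursor-only send. A hedged sketch of that pattern as a shared helper — the helper name is illustrative and the generated-proto import path is an assumption, not taken from the PR:

```go
package kafka

import (
	"context"
	"errors"

	"connectrpc.com/connect"

	// Import path for the generated protos is assumed, not confirmed by the diff.
	adiomv1 "github.com/adiom-data/dsync/gen/adiom/v1"
)

// sendCursorHeartbeat emits a StreamUpdatesResponse carrying only NextCursor
// so the checkpoint advances past filtered or nil updates. done=true tells
// the caller to return err (nil on client cancellation).
func sendCursorHeartbeat(s *connect.ServerStream[adiomv1.StreamUpdatesResponse], namespace string, nextCursor []byte) (done bool, err error) {
	if sendErr := s.Send(&adiomv1.StreamUpdatesResponse{
		Namespace:  namespace,
		NextCursor: nextCursor,
	}); sendErr != nil {
		if errors.Is(sendErr, context.Canceled) {
			return true, nil // client went away; treat as clean shutdown
		}
		return true, connect.NewError(connect.CodeInternal, sendErr)
	}
	return false, nil
}
```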
connectors/kafka/dest.go (reviewed hunk):
```go
func key(id []*adiomv1.BsonValue) []byte {
	var res []byte
	for _, part := range id {
		res = append(res, part.GetData()...)
	}
	return res
}

// WriteUpdates implements [adiomv1connect.ConnectorServiceHandler].
func (d *destConn) WriteUpdates(ctx context.Context, r *connect.Request[adiomv1.WriteUpdatesRequest]) (*connect.Response[adiomv1.WriteUpdatesResponse], error) {
	topic := d.defaultTopic
	ns := r.Msg.GetNamespace()
	header := []sarama.RecordHeader{{Key: []byte("ns"), Value: []byte(ns)}}
	if namespaceTopic, ok := d.namespaceToTopic[ns]; ok {
		topic = namespaceTopic
	}
	var messages []*sarama.ProducerMessage
	for _, update := range r.Msg.GetUpdates() {
		b, err := proto.Marshal(update)
		if err != nil {
			return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("err marshalling proto: %w", err))
		}
		msg := &sarama.ProducerMessage{
			Topic:     topic,
			Key:       sarama.ByteEncoder(key(update.GetId())),
			Value:     sarama.ByteEncoder(b),
			Headers:   header,
			Partition: -1,
		}
		messages = append(messages, msg)
	}
	if err := d.producer.SendMessages(messages); err != nil {
		return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("err sending message: %w", err))
	}
	return connect.NewResponse(&adiomv1.WriteUpdatesResponse{}), nil
}
```
🧩 Analysis chain
🌐 Web query:
Sarama ProducerMessage Topic empty string requirement error
💡 Result:
In Sarama (Go Kafka client), a producer message must have a non-empty topic name. If you send a ProducerMessage with Topic: "" (or you end up calling internal metadata refresh with an empty topic), Sarama will fail with an “empty topic” / ErrInvalidTopic-style error because it explicitly rejects empty topic strings. [1], [2]
Fix: ensure every produced message sets ProducerMessage.Topic to a real topic name (Sarama does not infer a default topic for you):
```go
msg := &sarama.ProducerMessage{
	Topic: "my_topic",
	Value: sarama.StringEncoder("hello"),
}
_, _, err := producer.SendMessage(msg)
```
Also verify any config/env/templating you use to build the topic isn't producing "" at runtime (e.g., missing env var, empty string after trim, etc.). [1]
Sources:
[1] Sarama SyncProducer example using ProducerMessage{Topic: "my_topic", ...} (Go package docs) (pkg.go.dev)
[2] Sarama RefreshMetadata rejects empty topic and returns ErrInvalidTopic (Sarama client code) (gerrit.opencord.org)
Citations:
- 1: https://pkg.go.dev/github.com/ContextLogic/sarama?utm_source=openai
- 2: https://gerrit.opencord.org/plugins/gitiles/voltha-go/%2B/4a2a53ed5757f6293fa41be2d5d67edbaeab5b56/vendor/github.com/Shopify/sarama/client.go?utm_source=openai
🏁 Script executed:
```sh
# Search for how defaultTopic and namespaceToTopic are initialized
rg -A 3 -B 3 "defaultTopic|namespaceToTopic" connectors/kafka/
```
Repository: adiom-data/dsync
Length of output: 1865
🏁 Script executed:
```sh
# Look for other topic validation patterns in the codebase
rg "topic.*=.*\"\"" connectors/kafka/ -A 3
```
Repository: adiom-data/dsync
Length of output: 42
🏁 Script executed:
```sh
# Check if there's any topic validation or initialization code
rg "Topic" connectors/kafka/dest.go -B 2 -A 2
```
Repository: adiom-data/dsync
Length of output: 1234
Add validation to ensure topic is configured before publishing.
Sarama's ProducerMessage requires a non-empty Topic field. If defaultTopic is empty and the namespace is not mapped in namespaceToTopic, the topic variable remains empty, causing a runtime error when SendMessages() is called. Add a guard to fail fast with a clear error message.
Proposed fix
```diff
 topic := d.defaultTopic
 ns := r.Msg.GetNamespace()
 header := []sarama.RecordHeader{{Key: []byte("ns"), Value: []byte(ns)}}
 if namespaceTopic, ok := d.namespaceToTopic[ns]; ok {
 	topic = namespaceTopic
 }
+if topic == "" {
+	return nil, connect.NewError(connect.CodeInvalidArgument, errors.New("kafka topic is required"))
+}
```
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```go
func key(id []*adiomv1.BsonValue) []byte {
	var res []byte
	for _, part := range id {
		res = append(res, part.GetData()...)
	}
	return res
}

// WriteUpdates implements [adiomv1connect.ConnectorServiceHandler].
func (d *destConn) WriteUpdates(ctx context.Context, r *connect.Request[adiomv1.WriteUpdatesRequest]) (*connect.Response[adiomv1.WriteUpdatesResponse], error) {
	topic := d.defaultTopic
	ns := r.Msg.GetNamespace()
	header := []sarama.RecordHeader{{Key: []byte("ns"), Value: []byte(ns)}}
	if namespaceTopic, ok := d.namespaceToTopic[ns]; ok {
		topic = namespaceTopic
	}
	if topic == "" {
		return nil, connect.NewError(connect.CodeInvalidArgument, errors.New("kafka topic is required"))
	}
	var messages []*sarama.ProducerMessage
	for _, update := range r.Msg.GetUpdates() {
		b, err := proto.Marshal(update)
		if err != nil {
			return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("err marshalling proto: %w", err))
		}
		msg := &sarama.ProducerMessage{
			Topic:     topic,
			Key:       sarama.ByteEncoder(key(update.GetId())),
			Value:     sarama.ByteEncoder(b),
			Headers:   header,
			Partition: -1,
		}
		messages = append(messages, msg)
	}
	if err := d.producer.SendMessages(messages); err != nil {
		return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("err sending message: %w", err))
	}
	return connect.NewResponse(&adiomv1.WriteUpdatesResponse{}), nil
}
```
🤖 Prompt for AI Agents
In `@connectors/kafka/dest.go` around lines 119 - 154, The WriteUpdates handler
(destConn.WriteUpdates) currently may construct sarama.ProducerMessage with an
empty Topic when d.defaultTopic is empty and ns isn't in d.namespaceToTopic; add
validation before creating messages to ensure topic is non-empty (check the
resolved topic variable after choosing namespaceToTopic or defaultTopic) and
return a clear connect.NewError(connect.CodeInvalidArgument or CodeInternal) if
it's empty so SendMessages is never called with a ProducerMessage lacking Topic;
update any error branches to include context (e.g., mentioning namespace and
that no topic was configured) to help debugging.
go.mod:
```
	github.com/stretchr/testify v1.11.1
	github.com/tryvium-travels/memongo v0.12.0
	github.com/urfave/cli/v2 v2.27.7
	github.com/weaviate/weaviate v1.31.3
```
🧩 Analysis chain
🌐 Web query:
GO-2025-4237 GO-2025-4238 Weaviate path traversal vulnerability patched version
💡 Result:
For both GO-2025-4237 (CVE-2025-67818, Backup ZipSlip) and GO-2025-4238 (CVE-2025-67819, Shard Movement API), the first patched Weaviate releases are:
- v1.30.20 (fixes all v1.30.0–v1.30.19)
- v1.31.19 (fixes all v1.31.0-rc.0–v1.31.18)
- v1.32.16 (fixes all v1.32.0-rc.0–v1.32.15)
- v1.33.4 (fixes all v1.33.0-rc.0–v1.33.3)
Sources: Go vuln entries / OSV for GO-2025-4237 and GO-2025-4238 [1] [2].
🌐 Web query:
GHSA-7v39-2hx7-7c43 GHSA-hmmh-292h-3364 Weaviate security advisory
💡 Result:
Weaviate security advisories
GHSA-7v39-2hx7-7c43 (CVE-2025-67818) — Backup restore "ZipSlip" path traversal
- Impact: An attacker who can insert data can craft backup entry names with absolute paths or ../ traversal so that, when a backup is restored, files may be created/overwritten outside the restore root (within the Weaviate process privileges). [1]
- Affected: < 1.30.20; >= 1.31.0-rc.0, < 1.31.19; >= 1.32.0-rc.0, < 1.32.16; >= 1.33.0-rc.0, < 1.33.4. [1]
- Fix: Upgrade to 1.30.20 / 1.31.19 / 1.32.16 / 1.33.4 (as applicable). [1]
GHSA-hmmh-292h-3364 (CVE-2025-67819) — Shard Movement API path traversal (arbitrary file read)
- Impact: Due to missing validation of a fileName field in shard transfer logic, an attacker who can call GetFile (while a shard is in "Pause file activity" state and the FileReplicationService is reachable) may read arbitrary files accessible to the Weaviate service process. [2]
- Affected: >= 1.30.0, < 1.30.20; >= 1.31.0-rc.0, < 1.31.19; >= 1.32.0-rc.0, < 1.32.16; >= 1.33.0-rc.0, < 1.33.4. [2]
- Fix: Upgrade to 1.30.20 / 1.31.19 / 1.32.16 / 1.33.4 (as applicable). [2]
Practical mitigations (if you can't upgrade immediately)
- Restrict who can trigger backup restore and who can access shard movement / replication endpoints; ensure the FileReplicationService is not reachable from untrusted networks. [1][2]
Sources: GitHub Advisory Database pages for GHSA-7v39-2hx7-7c43 and GHSA-hmmh-292h-3364. [1][2]
🌐 Web query:
Weaviate v1.31.3 security vulnerabilities path traversal
💡 Result:
Weaviate v1.31.3 is affected by the two known path traversal issues disclosed in December 2025 (because it is < 1.31.19):
- CVE-2025-67818 (High, CVSS 7.2) – Backup restore ZipSlip/path traversal: during backup restore, crafted paths (e.g., ../../.. or absolute paths, including via symlinks) can escape the intended restore directory and create/overwrite arbitrary files within the Weaviate process' privileges. Fixed in 1.31.19. [1][2]
- CVE-2025-67819 (Medium, CVSS 4.9) – Shard Movement "GetFile" path traversal: insufficient validation of fileName can allow an attacker who can call GetFile (under specific shard state / service reachability conditions) to read arbitrary files accessible to the Weaviate process. Fixed in 1.31.19. [1][3]
Remediation (recommended): upgrade from 1.31.3 → 1.31.19+. [2][3]
Mitigations (if you can't upgrade immediately): disable backup modules (remove backup* from enabled_modules) and/or disable the shard movement API by setting REPLICA_MOVEMENT_ENABLED=false. [1]
Sources
Sources
[1] Weaviate security release (Nov 7, 2025) – CVE-2025-67818 / CVE-2025-67819 details and mitigations: (weaviate.io)
[2] GitLab Advisory DB – CVE-2025-67818 affected/fixed versions (fix: 1.31.19): (advisories.gitlab.com)
[3] GitLab Advisory DB – CVE-2025-67819 affected/fixed versions (fix: 1.31.19): (advisories.gitlab.com)
Citations:
- 1: https://weaviate.io/blog/weaviate-security-release-november-2025
- 2: https://advisories.gitlab.com/pkg/golang/github.com/weaviate/weaviate/CVE-2025-67818/?utm_source=openai
- 3: https://advisories.gitlab.com/pkg/golang/github.com/weaviate/weaviate/CVE-2025-67819/?utm_source=openai
Upgrade Weaviate to v1.31.19 or later to fix known path-traversal vulnerabilities.
v1.31.3 is affected by two path-traversal vulnerabilities (one high, one medium severity):
- CVE-2025-67818 (CVSS 7.2): Backup restore ZipSlip allows arbitrary file creation/overwriting within process privileges
- CVE-2025-67819 (CVSS 4.9): Shard Movement API missing validation allows arbitrary file read
Upgrade to v1.31.19 or later to patch both issues. If immediate upgrade is not feasible, disable backup modules and/or set REPLICA_MOVEMENT_ENABLED=false as temporary mitigations.
🧰 Tools
🪛 OSV Scanner (2.3.2)
[HIGH] 36-36: github.com/weaviate/weaviate 1.31.3: Weaviate OSS has a Path Traversal Vulnerability via Backup ZipSlip in github.com/weaviate/weaviate
(GO-2025-4237)
[HIGH] 36-36: github.com/weaviate/weaviate 1.31.3: Weaviate OSS has path traversal vulnerability via the Shard Movement API in github.com/weaviate/weaviate
(GO-2025-4238)
[HIGH] 36-36: github.com/weaviate/weaviate 1.31.3: Weaviate OSS has a Path Traversal Vulnerability via Backup ZipSlip
[HIGH] 36-36: github.com/weaviate/weaviate 1.31.3: Weaviate OSS has path traversal vulnerability via the Shard Movement API
🤖 Prompt for AI Agents
In `@go.mod` at line 36, Update the Weaviate dependency to a patched release by
replacing the current module version "github.com/weaviate/weaviate v1.31.3" with
v1.31.19 or later in go.mod, then run the Go toolchain (e.g., go get
github.com/weaviate/weaviate@v1.31.19 and go mod tidy) and run tests/build to
validate; if you cannot upgrade immediately, apply the temporary mitigations
mentioned: disable backup modules and/or set the environment variable
REPLICA_MOVEMENT_ENABLED=false until the dependency is updated.
| brokers := c.StringSlice("brokers") | ||
| defaultTopic := c.String("default-topic") | ||
| namespaceTopic := c.StringSlice("namespace-topic") | ||
| user := c.String("sasl-user") | ||
| password := c.String("sasl-password") | ||
| dataType := adiomv1.DataType(adiomv1.DataType_value[c.String("data-type")]) | ||
|
|
Validate data-type for kafka‑dst instead of silently defaulting.
DataType_value[...] returns zero for unknown strings, which can mask invalid CLI input. Consider validating and returning an explicit error.
Proposed fix
```diff
-dataType := adiomv1.DataType(adiomv1.DataType_value[c.String("data-type")])
+dataTypeStr := c.String("data-type")
+dataTypeVal, ok := adiomv1.DataType_value[dataTypeStr]
+if !ok {
+	return nil, fmt.Errorf("invalid data-type %q", dataTypeStr)
+}
+dataType := adiomv1.DataType(dataTypeVal)
```
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| brokers := c.StringSlice("brokers") | |
| defaultTopic := c.String("default-topic") | |
| namespaceTopic := c.StringSlice("namespace-topic") | |
| user := c.String("sasl-user") | |
| password := c.String("sasl-password") | |
| dataType := adiomv1.DataType(adiomv1.DataType_value[c.String("data-type")]) | |
| brokers := c.StringSlice("brokers") | |
| defaultTopic := c.String("default-topic") | |
| namespaceTopic := c.StringSlice("namespace-topic") | |
| user := c.String("sasl-user") | |
| password := c.String("sasl-password") | |
| dataTypeStr := c.String("data-type") | |
| dataTypeVal, ok := adiomv1.DataType_value[dataTypeStr] | |
| if !ok { | |
| return nil, fmt.Errorf("invalid data-type %q", dataTypeStr) | |
| } | |
| dataType := adiomv1.DataType(dataTypeVal) |
🤖 Prompt for AI Agents
In `@internal/app/options/connectorflags.go` around lines 632 - 638, The code
silently maps the CLI value c.String("data-type") via adiomv1.DataType_value
which yields the zero enum for unknown strings; change the logic around dataType
so you first lookup the raw string (e.g., s := c.String("data-type")), check
presence in adiomv1.DataType_value (ok := adiomv1.DataType_value[s]) and if not
present return/propagate an explicit error indicating an invalid data-type
rather than constructing adiomv1.DataType(0); only convert to adiomv1.DataType
when the lookup succeeds (assigning to dataType) so invalid CLI input is
rejected with a clear error.
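Since protoc-gen-go emits a name→value map for every enum, this validation generalizes cleanly. A small sketch of the suggested lookup as a reusable helper — the function name and error text are illustrative, not from the PR:

```go
// parseDataType validates a CLI-supplied enum name against the generated
// protobuf name→value map before converting, so unknown strings fail loudly
// instead of silently becoming the zero enum value.
func parseDataType(s string) (adiomv1.DataType, error) {
	v, ok := adiomv1.DataType_value[s]
	if !ok {
		return 0, fmt.Errorf("invalid data-type %q", s)
	}
	return adiomv1.DataType(v), nil
}
```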
internal/app/options/connectorflags.go (reviewed hunk):
```go
brokers := c.StringSlice(prefixDash + "brokers")
topics := c.StringSlice(prefixDash + "topics")
topicMappings := c.StringSlice(prefixDash + "topic-mappings")
user := c.String(prefixDash + "sasl-user")
password := c.String(prefixDash + "sasl-password")
kafkaOffset := c.Int64(prefixDash + "offset")
dataType := adiomv1.DataType(adiomv1.DataType_value[c.String(prefixDash+"data-type")])

tm := map[string][]string{}
for _, topic := range topics {
	tm[topic] = nil
}
for _, m := range topicMappings {
	topic, ns, ok := strings.Cut(m, ":")
	if !ok {
		return nil, fmt.Errorf("invalid topic mapping %v", m)
	}
	tm[topic] = append(tm[topic], ns)
}
```
Fail fast on invalid data‑type and empty topic configuration for kafka‑src.
If no topics or mappings are provided, the source plan is empty; and invalid data-type silently defaults to zero.
Proposed fix
```diff
-dataType := adiomv1.DataType(adiomv1.DataType_value[c.String(prefixDash+"data-type")])
+dataTypeStr := c.String(prefixDash + "data-type")
+dataTypeVal, ok := adiomv1.DataType_value[dataTypeStr]
+if !ok {
+	return nil, fmt.Errorf("invalid data-type %q", dataTypeStr)
+}
+dataType := adiomv1.DataType(dataTypeVal)
+
+if len(topics) == 0 && len(topicMappings) == 0 {
+	return nil, fmt.Errorf("at least one topic or topic-mapping is required")
+}
```
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```go
brokers := c.StringSlice(prefixDash + "brokers")
topics := c.StringSlice(prefixDash + "topics")
topicMappings := c.StringSlice(prefixDash + "topic-mappings")
user := c.String(prefixDash + "sasl-user")
password := c.String(prefixDash + "sasl-password")
kafkaOffset := c.Int64(prefixDash + "offset")
dataTypeStr := c.String(prefixDash + "data-type")
dataTypeVal, ok := adiomv1.DataType_value[dataTypeStr]
if !ok {
	return nil, fmt.Errorf("invalid data-type %q", dataTypeStr)
}
dataType := adiomv1.DataType(dataTypeVal)

if len(topics) == 0 && len(topicMappings) == 0 {
	return nil, fmt.Errorf("at least one topic or topic-mapping is required")
}

tm := map[string][]string{}
for _, topic := range topics {
	tm[topic] = nil
}
for _, m := range topicMappings {
	topic, ns, ok := strings.Cut(m, ":")
	if !ok {
		return nil, fmt.Errorf("invalid topic mapping %v", m)
	}
	tm[topic] = append(tm[topic], ns)
}
```
🤖 Prompt for AI Agents
In `@internal/app/options/connectorflags.go` around lines 681 - 699, Fail fast
when kafka-src has no topics/mappings and when data-type is invalid: check that
topics or topicMappings (variables topics and topicMappings) are not both empty
and return an error if so, and validate the string lookup into
adiomv1.DataType_value before casting to adiomv1.DataType (variable dataType) —
if the key is missing, return a descriptive error rather than silently
defaulting to zero; keep building tm from topicMappings and topics as before but
perform these validations early in the function handling connector flags
(referencing topics, topicMappings, tm, dataType, and adiomv1.DataType_value).
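For clarity, a worked example of the mapping parse above, with hypothetical flag values:

```go
// Given --topics t2 --topic-mappings t1:db.coll1 --topic-mappings t1:db.coll2,
// the parse yields one entry per topic. A nil value (from a bare --topics
// entry) appears to mean no explicit namespace mapping for that topic.
topics := []string{"t2"}
topicMappings := []string{"t1:db.coll1", "t1:db.coll2"}

tm := map[string][]string{}
for _, topic := range topics {
	tm[topic] = nil
}
for _, m := range topicMappings {
	topic, ns, _ := strings.Cut(m, ":") // ok-check elided; see code above
	tm[topic] = append(tm[topic], ns)
}
// tm == map[string][]string{"t1": {"db.coll1", "db.coll2"}, "t2": nil}
```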
Actionable comments posted: 2
🤖 Fix all issues with AI agents
In `@connectors/mongo/conn.go`:
- Around line 192-196: When delegating GeneratePlan to the Kafka connector (the
call to c.settings.KafkaConn.GeneratePlan inside the same method that currently
wraps errors as connect.NewError(connect.CodeInternal, err)), preserve original
connect.Error codes by detecting if the returned error is a *connect.Error and
returning it directly; only wrap non-*connect.Error errors into
connect.NewError(connect.CodeInternal, err). Mirror the behavior used by
StreamLSN and StreamUpdates when they delegate errors to the underlying
connector.
In `@internal/app/options/connectorflags.go`:
- Around line 686-711: The ParseKafkaSrcFlags function must validate that the
brokers slice is non-empty and return a clear error if it is empty; add a check
after brokers := c.StringSlice(prefixDash + "brokers") (and after computing
prefixDash) that returns an error like "missing required brokers for kafka
source" when len(brokers) == 0, so callers fail fast before calling
kafka.NewKafkaConn.
connectors/mongo/conn.go (Outdated)
```go
if c.settings.KafkaConn != nil {
	kafkaPlanResp, err := c.settings.KafkaConn.GeneratePlan(ctx, r)
	if err != nil {
		return nil, connect.NewError(connect.CodeInternal, err)
	}
```
🧩 Analysis chain
🏁 Script executed:
```sh
# First, find the file and examine the relevant code
find . -name "conn.go" -path "*/mongo/*" | head -5
```
Repository: adiom-data/dsync
Length of output: 86
🏁 Script executed:
```sh
# Check the file size and read the relevant section
if [ -f "connectors/mongo/conn.go" ]; then
  wc -l connectors/mongo/conn.go
  sed -n '185,205p' connectors/mongo/conn.go
fi
```
Repository: adiom-data/dsync
Length of output: 892
🏁 Script executed:
```sh
# Check imports at the top of the file to see what packages are used
if [ -f "connectors/mongo/conn.go" ]; then
  head -30 connectors/mongo/conn.go
fi
```
Repository: adiom-data/dsync
Length of output: 780
🏁 Script executed:
```sh
# Look for GeneratePlan definition and other error handling patterns
rg "GeneratePlan" --type go -B 2 -A 5
```
Repository: adiom-data/dsync
Length of output: 43210
🏁 Script executed:
```sh
# Check if errors.As is used elsewhere in the file
rg "errors\.As" connectors/mongo/conn.go
```
Repository: adiom-data/dsync
Length of output: 132
🏁 Script executed:
```sh
# Check the KafkaConn interface definition to understand what GeneratePlan returns
rg "KafkaConn" --type go -A 3 | head -50
```
Repository: adiom-data/dsync
Length of output: 2326
🏁 Script executed:
```sh
# Look for the connect.Error type and how it's used in the codebase
rg "connect\.Error" --type go | head -20
```
Repository: adiom-data/dsync
Length of output: 153
🏁 Script executed:
```sh
# Check if there are similar patterns of wrapping errors that should preserve types
rg "NewError.*CodeInternal" --type go -B 2 | head -40
```
Repository: adiom-data/dsync
Length of output: 2376
🏁 Script executed:
```sh
# Check if there are other places delegating to KafkaConn and how they handle errors
rg "KafkaConn\." connectors/mongo/conn.go -B 1 -A 3
```
Repository: adiom-data/dsync
Length of output: 549
🏁 Script executed:
```sh
# Check if there are similar error delegation patterns elsewhere
rg "\.GeneratePlan\(" --type go -A 2 | grep -A 2 "CodeInternal"
```
Repository: adiom-data/dsync
Length of output: 264
Preserve Kafka connector error codes when delegating GeneratePlan.
Wrapping the error as CodeInternal hides meaningful statuses like InvalidArgument from the Kafka connector. Return the original *connect.Error when present, similar to how StreamLSN and StreamUpdates delegate directly without wrapping.
🔧 Suggested fix
```diff
-kafkaPlanResp, err := c.settings.KafkaConn.GeneratePlan(ctx, r)
-if err != nil {
-	return nil, connect.NewError(connect.CodeInternal, err)
-}
+kafkaPlanResp, err := c.settings.KafkaConn.GeneratePlan(ctx, r)
+if err != nil {
+	var cerr *connect.Error
+	if errors.As(err, &cerr) {
+		return nil, cerr
+	}
+	return nil, connect.NewError(connect.CodeInternal, err)
+}
```
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```go
if c.settings.KafkaConn != nil {
	kafkaPlanResp, err := c.settings.KafkaConn.GeneratePlan(ctx, r)
	if err != nil {
		var cerr *connect.Error
		if errors.As(err, &cerr) {
			return nil, cerr
		}
		return nil, connect.NewError(connect.CodeInternal, err)
	}
```
🤖 Prompt for AI Agents
In `@connectors/mongo/conn.go` around lines 192 - 196, When delegating
GeneratePlan to the Kafka connector (the call to
c.settings.KafkaConn.GeneratePlan inside the same method that currently wraps
errors as connect.NewError(connect.CodeInternal, err)), preserve original
connect.Error codes by detecting if the returned error is a *connect.Error and
returning it directly; only wrap non-*connect.Error errors into
connect.NewError(connect.CodeInternal, err). Mirror the behavior used by
StreamLSN and StreamUpdates when they delegate errors to the underlying
connector.
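A hedged sketch of this preserve-or-wrap pattern as a shared helper — the name is illustrative, and the repo may already have an equivalent:

```go
// wrapInternal returns an existing *connect.Error unchanged so its code
// (e.g., InvalidArgument) survives delegation, and wraps anything else
// as CodeInternal.
func wrapInternal(err error) error {
	var cerr *connect.Error
	if errors.As(err, &cerr) {
		return cerr
	}
	return connect.NewError(connect.CodeInternal, err)
}

// Usage at the delegation site:
//   if err != nil { return nil, wrapInternal(err) }
```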
internal/app/options/connectorflags.go (reviewed hunk):
```go
func ParseKafkaSrcFlags(prefix string, c *cli.Context) (adiomv1connect.ConnectorServiceHandler, error) {
	prefixDash := prefix
	if prefix != "" {
		prefixDash = prefix + "-"
	}

	brokers := c.StringSlice(prefixDash + "brokers")
	topics := c.StringSlice(prefixDash + "topics")
	topicMappings := c.StringSlice(prefixDash + "topic-mappings")
	user := c.String(prefixDash + "sasl-user")
	password := c.String(prefixDash + "sasl-password")
	kafkaOffset := c.Int64(prefixDash + "offset")
	dataType := adiomv1.DataType(adiomv1.DataType_value[c.String(prefixDash+"data-type")])

	tm := map[string][]string{}
	for _, topic := range topics {
		tm[topic] = nil
	}
	for _, m := range topicMappings {
		topic, ns, ok := strings.Cut(m, ":")
		if !ok {
			return nil, fmt.Errorf("invalid topic mapping %v", m)
		}
		tm[topic] = append(tm[topic], ns)
	}
	return kafka.NewKafkaConn(brokers, tm, kafka.DsyncMessageToUpdate, kafka.DsyncMessageToNamespace, user, password, kafkaOffset, dataType), nil
```
Validate brokers is provided for Kafka source.
If brokers is empty, the connector will fail later with a less actionable error; fail fast in flag parsing.
🔧 Suggested fix
```diff
 brokers := c.StringSlice(prefixDash + "brokers")
+if len(brokers) == 0 {
+	return nil, fmt.Errorf("at least one broker is required")
+}
```
🤖 Prompt for AI Agents
In `@internal/app/options/connectorflags.go` around lines 686 - 711, The
ParseKafkaSrcFlags function must validate that the brokers slice is non-empty
and return a clear error if it is empty; add a check after brokers :=
c.StringSlice(prefixDash + "brokers") (and after computing prefixDash) that
returns an error like "missing required brokers for kafka source" when
len(brokers) == 0, so callers fail fast before calling kafka.NewKafkaConn.
Actionable comments posted: 2
🤖 Fix all issues with AI agents
In `@connectors/kafka/kafka.go`:
- Around line 194-207: After calling DecodeStreamCursor (which returns cursor),
validate the decoded cursor before attempting to consume: ensure cursor.Topic is
not empty (and optionally validate cursor.Partition/Offset if relevant) and if
invalid return connect.NewError(connect.CodeInvalidArgument, ...) with a clear
message; update both occurrences that call DecodeStreamCursor (the block around
ConsumePartition and the similar block later) to perform this check and fail
fast rather than letting sarama produce an internal error.
In `@internal/app/options/connectorflags.go`:
- Around line 632-646: Validate Kafka destination flags early by returning an
error if no brokers are provided or if neither a default topic nor any
namespace-topic mappings exist: after reading brokers, defaultTopic and
namespaceTopic (and after parsing namespaceTopic into tm), check len(brokers) ==
0 and len(defaultTopic) == 0 && len(tm) == 0 and return a descriptive error
(e.g., "kafka-dst requires at least one broker and either a default-topic or
namespace-topic mappings"). Keep this logic next to the existing variables
(brokers, defaultTopic, namespaceTopic, tm) so flag parsing fails fast before
proceeding to Sarama/WriteUpdates.
connectors/kafka/kafka.go (reviewed hunk):
```go
cursor, err := DecodeStreamCursor(r.Msg.GetCursor())
if err != nil {
	return connect.NewError(connect.CodeInternal, err)
}

cfg := k.NewConfig()
consumer, err := sarama.NewConsumer(k.brokers, cfg)
if err != nil {
	return connect.NewError(connect.CodeInternal, err)
}
defer consumer.Close()

pc, err := consumer.ConsumePartition(cursor.Topic, cursor.Partition, cursor.Offset)
if err != nil {
```
Validate decoded cursor before consuming a partition.
An empty cursor yields an empty topic and a confusing internal error; fail fast with InvalidArgument.
🛠️ Proposed fix
```diff
 cursor, err := DecodeStreamCursor(r.Msg.GetCursor())
 if err != nil {
 	return connect.NewError(connect.CodeInternal, err)
 }
+if cursor.Topic == "" {
+	return connect.NewError(connect.CodeInvalidArgument, errors.New("cursor topic is required"))
+}
@@
 cursor, err := DecodeStreamCursor(r.Msg.GetCursor())
 if err != nil {
 	return connect.NewError(connect.CodeInternal, err)
 }
+if cursor.Topic == "" {
+	return connect.NewError(connect.CodeInvalidArgument, errors.New("cursor topic is required"))
+}
```
Also applies to: 264-277
🤖 Prompt for AI Agents
In `@connectors/kafka/kafka.go` around lines 194 - 207, After calling
DecodeStreamCursor (which returns cursor), validate the decoded cursor before
attempting to consume: ensure cursor.Topic is not empty (and optionally validate
cursor.Partition/Offset if relevant) and if invalid return
connect.NewError(connect.CodeInvalidArgument, ...) with a clear message; update
both occurrences that call DecodeStreamCursor (the block around ConsumePartition
and the similar block later) to perform this check and fail fast rather than
letting sarama produce an internal error.
| brokers := c.StringSlice("brokers") | ||
| defaultTopic := c.String("default-topic") | ||
| namespaceTopic := c.StringSlice("namespace-topic") | ||
| user := c.String("sasl-user") | ||
| password := c.String("sasl-password") | ||
| dataType := adiomv1.DataType(adiomv1.DataType_value[c.String("data-type")]) | ||
|
|
||
| tm := map[string]string{} | ||
| for _, m := range namespaceTopic { | ||
| ns, topic, ok := strings.Cut(m, ":") | ||
| if !ok { | ||
| return nil, fmt.Errorf("invalid topic mapping %v", m) | ||
| } | ||
| tm[ns] = topic | ||
| } |
Fail fast when brokers or topics are missing for kafka‑dst.
It’s clearer to validate this in flag parsing than to fail deeper in Sarama/WriteUpdates.
✅ Proposed fix
```diff
 brokers := c.StringSlice("brokers")
 defaultTopic := c.String("default-topic")
 namespaceTopic := c.StringSlice("namespace-topic")
 user := c.String("sasl-user")
 password := c.String("sasl-password")
+if len(brokers) == 0 {
+	return nil, fmt.Errorf("at least one broker is required")
+}
+if defaultTopic == "" && len(namespaceTopic) == 0 {
+	return nil, fmt.Errorf("default-topic or namespace-topic is required")
+}
```
🤖 Prompt for AI Agents
In `@internal/app/options/connectorflags.go` around lines 632 - 646, Validate
Kafka destination flags early by returning an error if no brokers are provided
or if neither a default topic nor any namespace-topic mappings exist: after
reading brokers, defaultTopic and namespaceTopic (and after parsing
namespaceTopic into tm), check len(brokers) == 0 and len(defaultTopic) == 0 &&
len(tm) == 0 and return a descriptive error (e.g., "kafka-dst requires at least
one broker and either a default-topic or namespace-topic mappings"). Keep this
logic next to the existing variables (brokers, defaultTopic, namespaceTopic, tm)
so flag parsing fails fast before proceeding to Sarama/WriteUpdates.
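To make the destination-side resolution concrete, a small sketch under assumed values (the map contents and topic names are hypothetical):

```go
// Topic resolution as WriteUpdates performs it: a namespace-specific mapping
// wins over the default topic, and an empty result is rejected up front.
namespaceToTopic := map[string]string{"db.coll": "coll-topic"}
defaultTopic := "fallback"

resolve := func(ns string) (string, error) {
	topic := defaultTopic
	if t, ok := namespaceToTopic[ns]; ok {
		topic = t
	}
	if topic == "" {
		return "", errors.New("kafka topic is required")
	}
	return topic, nil
}
// resolve("db.coll") == "coll-topic"; resolve("other") == "fallback"
```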
Summary by CodeRabbit