feat: chat channel routing for broker plugins #283
Code Review
This pull request implements chat channel routing by adding Channel and ThreadID fields to StructuredMessage and store.Message, allowing targeted delivery through specific broker plugins instead of fanning out to all. It updates the FanOutBroker to handle channel-aware routing, adds CLI flags to target channels, and introduces a new hub API endpoint to list registered channels. The review feedback identifies a critical bug where default messages from the Web UI or CLI (which default to the "web" channel) will fail to publish because "web" is not a registered broker plugin. Additionally, the reviewer notes missing validation for outbound messages in handleAgentOutboundMessage, warns that channel flags are silently ignored in local mode, and suggests adding a test case for "web" channel routing.
if msg.Channel != "" {
	var matched bool
	for _, nb := range f.brokers {
Critical Bug: Web UI and Default CLI Messages Will Fail to Publish
In handleAgentMessage, if a message is sent from the Web UI or the CLI without an explicit --channel flag, the channel is automatically defaulted to "web".
However, since "web" is not a registered broker plugin, FanOutBroker.Publish() will fail to find a matching broker and return a no broker registered for channel "web" error. This completely breaks message delivery from the Web UI and the default CLI.
To fix this, we should initialize matched to true if the channel is "web" or "inprocess", as these are internal/local-only channels that are handled by the InProcessBroker and do not require an external broker plugin.
if msg.Channel != "" {
	var matched bool
	if msg.Channel == "web" || msg.Channel == InProcessBrokerName {
		matched = true
	}
	for _, nb := range f.brokers {
// Route through broker when available; otherwise persist and publish
Missing Validation for Outbound Messages
In handleAgentOutboundMessage, the incoming req.Channel and req.ThreadID are copied directly into the structured message and published/persisted without validation.
If an agent sends an outbound message with an invalid channel name (e.g., containing spaces or exceeding the maximum length) or a ThreadID without a Channel, it will bypass the validation rules defined in StructuredMessage.Validate().
We should validate the structuredMsg before publishing or persisting it, returning a ValidationError if it fails.
if err := structuredMsg.Validate(); err != nil {
	ValidationError(w, "invalid structured message: "+err.Error(), nil)
	return
}
// Route through broker when available; otherwise persist and publish

// Validate --thread-id requires --channel
if msgThreadID != "" && msgChannel == "" {
	return fmt.Errorf("--thread-id requires --channel to be set")
}
Usability: --channel and --thread-id are Silently Ignored in Local Mode
When running scion message in local mode (Hub disabled), the CLI directly delivers the message via tmux using plain text. In this mode, --channel and --thread-id are silently ignored, which can mislead users into thinking their message is being routed to a specific channel.
We should validate that --channel and --thread-id require Hub mode, similar to how --notify and --wake are validated. Since hubCtx is resolved later in the command execution, this check should be added right before the local mode execution path (around where --wake is checked).
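A minimal sketch of the check described above, assuming a hypothetical helper validateHubOnlyFlags and a plain boolean hubEnabled standing in for the resolved hubCtx. Neither name comes from the PR, and the real command's error wording may differ.

```go
package main

import (
	"errors"
	"fmt"
)

// validateHubOnlyFlags is a hypothetical helper, not part of the PR. It
// rejects routing flags that would have no effect when the message is
// delivered locally via tmux.
func validateHubOnlyFlags(hubEnabled bool, channel, threadID string) error {
	// Same rule as the existing check in the diff: a thread needs a channel.
	if threadID != "" && channel == "" {
		return errors.New("--thread-id requires --channel to be set")
	}
	if hubEnabled {
		return nil
	}
	// Local mode: any channel (and therefore any thread) is an error.
	if channel != "" {
		return errors.New("--channel requires Hub mode")
	}
	return nil
}

func main() {
	fmt.Println(validateHubOnlyFlags(false, "telegram", ""))
	fmt.Println(validateHubOnlyFlags(true, "telegram", "42"))
}
```

Failing early here keeps the local tmux path unchanged while making the ignored flags visible to the user.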
	if !strings.Contains(err.Error(), "telegram") {
		t.Fatalf("error should mention channel name, got: %v", err)
	}
}
Add Test Case for "web" Channel Routing
To prevent regressions and verify that the "web" channel is correctly routed to the InProcessBroker without returning an error or fanning out to other brokers, we should add a dedicated test case.
}

func TestFanOutBroker_ChannelRoutingWeb(t *testing.T) {
	inproc := newStubBroker()
	telegram := newStubBroker()
	fan := NewFanOutBroker([]NamedBroker{
		{Name: InProcessBrokerName, Broker: inproc},
		{Name: "telegram", Broker: telegram},
	}, slog.Default())
	msg := messages.NewInstruction("user:alice", "agent:bot", "hello")
	msg.Channel = "web"
	if err := fan.Publish(context.Background(), "test.topic", msg); err != nil {
		t.Fatalf("unexpected error: %v", err)
	}
	inproc.mu.Lock()
	if len(inproc.published) != 1 {
		t.Errorf("inprocess broker: expected 1 message, got %d", len(inproc.published))
	}
	inproc.mu.Unlock()
	telegram.mu.Lock()
	if len(telegram.published) != 0 {
		t.Errorf("telegram broker: expected 0 messages, got %d", len(telegram.published))
	}
	telegram.mu.Unlock()
}
39847ca to cddae16
/gemini review
Code Review
This pull request implements chat channel routing, allowing messages to be directed to specific broker plugins (such as Telegram or Google Chat) and maintaining thread context using new Channel and ThreadID fields. It updates the core message schemas, the CLI, the Hub API, and the event bus to support channel-aware routing. Feedback on the implementation highlights a critical bug in FanOutEventBus.Publish where targeting the 'inprocess' channel directly results in a 'no broker registered' error, alongside a performance concern regarding synchronous publishing blocking the caller on slow external RPC calls.
if msg.Channel != "" {
	var matched bool
	for _, nb := range f.buses {
		if nb.Name == InProcessBusName {
			if err := nb.Bus.Publish(ctx, topic, msg); err != nil {
				return fmt.Errorf("inprocess bus publish failed: %w", err)
			}
			continue
		}
		if nb.Name == msg.Channel {
			matched = true
			if err := nb.Bus.Publish(ctx, topic, msg); err != nil {
				return fmt.Errorf("channel %q publish failed: %w", msg.Channel, err)
			}
		}
	}
	if !matched {
		return fmt.Errorf("no broker registered for channel %q", msg.Channel)
	}
	return nil
}
Publishing channel-specific messages synchronously can block the caller (such as HTTP handlers) on slow external RPC calls (e.g., Telegram/GChat plugins). Additionally, if msg.Channel is set to "inprocess", the continue statement inside the loop will bypass setting matched = true, causing the publish to incorrectly fail with a "no broker registered" error.
Refactoring this to publish concurrently using a sync.WaitGroup (similar to the default fan-out path) resolves both the blocking issue and the "inprocess" channel bug.
if msg.Channel != "" {
	var matched bool
	var wg sync.WaitGroup
	var targets []NamedEventBus
	for _, nb := range f.buses {
		if nb.Name == InProcessBusName {
			targets = append(targets, nb)
			if msg.Channel == InProcessBusName {
				matched = true
			}
		} else if nb.Name == msg.Channel {
			matched = true
			targets = append(targets, nb)
		}
	}
	var errs []error
	if len(targets) > 0 {
		errs = make([]error, len(targets))
		for i, nb := range targets {
			wg.Add(1)
			go func(idx int, b NamedEventBus) {
				defer wg.Done()
				if err := b.Bus.Publish(ctx, topic, msg); err != nil {
					f.log.Error("channel publish failed",
						"bus", b.Name, "topic", topic, "error", err)
					if !b.Observer {
						errs[idx] = err
					}
				}
			}(i, nb)
		}
		wg.Wait()
	}
	if !matched {
		return fmt.Errorf("no broker registered for channel %q", msg.Channel)
	}
	return errors.Join(errs...)
}
/gemini review
Code Review
This pull request implements chat channel routing by adding Channel and ThreadID fields to StructuredMessage and store.Message, allowing targeted delivery of messages to specific broker plugins instead of fanning out to all. It updates the Telegram and Chat App plugins to support these fields, adds a new hub API endpoint and CLI subcommand to list available channels, and introduces channel-aware routing in FanOutEventBus. Feedback on the changes highlights two issues: first, channel-targeted routing in FanOutEventBus fails to respect the Observer flag, meaning observer channel publish errors could unexpectedly bubble up; second, the handleAgentOutboundMessage handler lacks validation for the newly constructed StructuredMessage, potentially allowing malformed messages to bypass the new validation rules.
if inproc != nil {
	wg.Add(1)
	go func() {
		defer wg.Done()
		if err := inproc.Bus.Publish(ctx, topic, msg); err != nil {
			errs[0] = fmt.Errorf("inprocess bus publish failed: %w", err)
		}
	}()
}
wg.Add(1)
go func() {
	defer wg.Done()
	if err := target.Bus.Publish(ctx, topic, msg); err != nil {
		errs[1] = fmt.Errorf("channel %q publish failed: %w", msg.Channel, err)
	}
}()
When performing channel-targeted routing, the Publish method does not respect the Observer flag of the target event bus. Observer channels are designed to be fire-and-forget, meaning their publish errors should be logged but not returned to the caller (as implemented in the general fan-out path). Currently, if the targeted channel is an observer and its publish fails, the error is returned, which can cause the entire publish operation to fail unexpectedly.
if inproc != nil {
	wg.Add(1)
	go func() {
		defer wg.Done()
		if err := inproc.Bus.Publish(ctx, topic, msg); err != nil {
			f.log.Error("inprocess publish failed", "topic", topic, "error", err)
			if !inproc.Observer {
				errs[0] = fmt.Errorf("inprocess bus publish failed: %w", err)
			}
		}
	}()
}
wg.Add(1)
go func() {
	defer wg.Done()
	if err := target.Bus.Publish(ctx, topic, msg); err != nil {
		f.log.Error("channel publish failed", "channel", msg.Channel, "topic", topic, "error", err)
		if !target.Observer {
			errs[1] = fmt.Errorf("channel %q publish failed: %w", msg.Channel, err)
		}
	}
}()

	Attachments: req.Attachments,
	Visibility:  req.Visibility,
	Metadata:    req.Metadata,
	Channel:     req.Channel,
	ThreadID:    req.ThreadID,
}
The handleAgentOutboundMessage handler does not validate the newly constructed StructuredMessage before publishing it. Since new validation rules have been introduced (e.g., ThreadID requiring Channel, and Channel name format constraints), it is important to call structuredMsg.Validate() here to prevent malformed messages from being published or persisted.
	Attachments: req.Attachments,
	Visibility:  req.Visibility,
	Metadata:    req.Metadata,
	Channel:     req.Channel,
	ThreadID:    req.ThreadID,
}
if err := structuredMsg.Validate(); err != nil {
	ValidationError(w, err.Error(), nil)
	return
}

Add channel-aware message routing so broker plugins (Telegram, Google Chat, etc.) can tag messages with a channel name and optional thread ID. The FanOutEventBus routes channel-tagged messages only to the matching bus and the in-process bus, while untagged messages fan out to all buses as before. Includes CLI --channel/--thread-id flags, a /message-channels API endpoint, and client library support.
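The routing rule in this commit message can be sketched as a pure target-selection function. This is an illustration only: selectTargets and inProcessName are hypothetical names, buses are reduced to their names, and the value "inprocess" is assumed from the later commit text.

```go
package main

import "fmt"

// inProcessName is the assumed name of the in-process bus.
const inProcessName = "inprocess"

// selectTargets returns the names of the buses a message should reach.
// An empty channel fans out to every bus; a named channel reaches the
// in-process bus plus the single bus whose name matches the channel.
func selectTargets(busNames []string, channel string) ([]string, error) {
	if channel == "" {
		// Untagged message: copy the full list so callers cannot mutate it.
		return append([]string(nil), busNames...), nil
	}
	var targets []string
	matched := false
	for _, name := range busNames {
		switch name {
		case inProcessName:
			// Always included for local dispatch, but never counts as a match.
			targets = append(targets, name)
		case channel:
			matched = true
			targets = append(targets, name)
		}
	}
	if !matched {
		return nil, fmt.Errorf("no broker registered for channel %q", channel)
	}
	return targets, nil
}

func main() {
	buses := []string{"inprocess", "telegram", "gchat"}
	for _, ch := range []string{"", "telegram", "slack"} {
		targets, err := selectTargets(buses, ch)
		fmt.Printf("channel=%q targets=%v err=%v\n", ch, targets, err)
	}
}
```

Separating selection from publishing makes the unmatched-channel case easy to test without stub buses.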
…rocess channel
- Channel-targeted messages now publish to InProcessBus and the matched channel concurrently (via goroutines), preventing slow external RPC calls from blocking HTTP handlers
- Reject msg.Channel="inprocess" with a clear error: inprocess is a reserved internal bus, not a user-visible channel
- Unmatched channels now fail fast before publishing to any bus
- Add test for reserved inprocess channel rejection
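A rough sketch of the behavior listed in this commit, under simplifying assumptions: buses are modeled as a map of hypothetical publishFunc values, the context argument is omitted, and only the channel-targeted path is shown. It is not the PR's FanOutEventBus code.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// publishFunc stands in for a bus's Publish method in this sketch.
type publishFunc func(topic, body string) error

// publishToChannel resolves the target before publishing anything, so a
// reserved or unknown channel fails fast, then publishes to the target
// and the in-process bus concurrently.
func publishToChannel(buses map[string]publishFunc, channel, topic, body string) error {
	if channel == "inprocess" {
		return errors.New(`channel "inprocess" is reserved for internal use`)
	}
	target, ok := buses[channel]
	if !ok {
		return fmt.Errorf("no broker registered for channel %q", channel)
	}
	fns := []publishFunc{target}
	if inproc, ok := buses["inprocess"]; ok {
		fns = append(fns, inproc)
	}
	// One error slot per goroutine, so no mutex is needed.
	errs := make([]error, len(fns))
	var wg sync.WaitGroup
	for i, fn := range fns {
		wg.Add(1)
		go func(i int, fn publishFunc) {
			defer wg.Done()
			errs[i] = fn(topic, body)
		}(i, fn)
	}
	wg.Wait()
	// errors.Join drops nil entries and returns nil if none remain.
	return errors.Join(errs...)
}

func main() {
	ok := func(topic, body string) error { return nil }
	buses := map[string]publishFunc{"inprocess": ok, "telegram": ok}
	fmt.Println(publishToChannel(buses, "telegram", "test.topic", "hello"))
	fmt.Println(publishToChannel(buses, "slack", "test.topic", "hello"))
}
```

Because both publishes run in parallel, the caller waits roughly as long as the slower of the two rather than their sum.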
…utbound messages
- Channel-targeted publish now checks target.Observer: observer channel errors are logged but not returned, consistent with the fan-out path
- Add StructuredMessage.Validate() call in handleAgentOutboundMessage before publishing, catching invalid Channel/ThreadID combinations
- Add test for observer error suppression in channel-targeted path
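For illustration, the two validation rules mentioned in this thread (ThreadID requires Channel, and a channel name format check) might look like the sketch below. The message type, the validate method, and especially the channelPattern regular expression are assumptions; the PR's actual format constraint is not shown on this page.

```go
package main

import (
	"errors"
	"fmt"
	"regexp"
)

// channelPattern is an ASSUMED format: 1 to 64 lowercase letters, digits,
// hyphens, or underscores. The real rule lives in StructuredMessage.Validate().
var channelPattern = regexp.MustCompile(`^[a-z0-9_-]{1,64}$`)

// message carries only the two fields relevant to channel routing.
type message struct {
	Channel  string
	ThreadID string
}

// validate rejects a thread without a channel and malformed channel names.
func (m message) validate() error {
	if m.ThreadID != "" && m.Channel == "" {
		return errors.New("thread_id requires channel")
	}
	if m.Channel != "" && !channelPattern.MatchString(m.Channel) {
		return fmt.Errorf("invalid channel name %q", m.Channel)
	}
	return nil
}

func main() {
	for _, m := range []message{
		{Channel: "telegram", ThreadID: "42"},
		{ThreadID: "42"},
		{Channel: "not a channel"},
	} {
		fmt.Printf("%+v -> %v\n", m, m.validate())
	}
}
```

Calling a check like this in the outbound handler, before publish or persist, gives outbound messages the same guarantees as inbound ones.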
Summary
Implements chat channel routing for the message broker system, allowing messages to be directed to specific broker plugins instead of fanning out to all of them. Closes ptone#113.
- Added Channel and ThreadID fields to StructuredMessage, deliveryMessage, and store.Message with validation (channel format, thread requires channel)
- FanOutBroker.Publish() now routes to the matching broker plugin when Channel is set, while preserving fan-out behavior when empty. InProcessBroker always receives for local dispatch. Errors on unmatched channels.
- Telegram sets Channel: "telegram" and ThreadID from forum topics. Chat-app sets Channel: "gchat". Web UI defaults to Channel: "web". Broker-log logs channel/thread fields.
- Added --channel and --thread-id flags to scion message. Added scion message channels subcommand and GET /api/v1/message-channels endpoint for channel discovery.
Design doc
See .design/chat-channel-routing.md for full design rationale and phased implementation plan.
Commits
- ae49d05: Core message schema (Channel/ThreadID fields + validation + tests)
- 8c6e467: Channel-aware routing in FanOutBroker (+ 4 routing tests)
- 1e7e1f0: Inbound channel tagging (telegram, gchat, web, broker-log)
- cb0952b: CLI flags, message-channels API, channels subcommand
Test plan
- (pkg/messages, pkg/broker, pkg/hub, pkg/store, pkg/hubclient)
- go build ./... compiles cleanly
- scion message channels lists registered broker plugins
- scion message agent:x "msg" --channel telegram routes only to telegram plugin
- channel: "telegram" in delivery
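As a rough illustration of the channel discovery endpoint named in the summary, the sketch below serves a list of registered bus names over GET. The response shape ({"channels": [...]}), the exclusion of the in-process bus, and the names channelsHandler and fetch are all assumptions made for this example; the PR's actual handler and schema are not shown on this page.

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"net/http/httptest"
	"sort"
)

// channelsResponse is an assumed response shape for the sketch.
type channelsResponse struct {
	Channels []string `json:"channels"`
}

// channelsHandler lists bus names as channels, hiding the internal
// in-process bus (assumed to be named "inprocess").
func channelsHandler(busNames []string) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		if r.Method != http.MethodGet {
			http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
			return
		}
		channels := []string{} // non-nil so an empty list encodes as [] rather than null
		for _, name := range busNames {
			if name != "inprocess" {
				channels = append(channels, name)
			}
		}
		sort.Strings(channels)
		w.Header().Set("Content-Type", "application/json")
		json.NewEncoder(w).Encode(channelsResponse{Channels: channels})
	}
}

// fetch exercises a handler in memory and returns the status and body.
func fetch(h http.Handler, method string) (int, string) {
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, httptest.NewRequest(method, "/api/v1/message-channels", nil))
	return rec.Code, rec.Body.String()
}

func main() {
	h := channelsHandler([]string{"inprocess", "telegram", "gchat"})
	code, body := fetch(h, http.MethodGet)
	fmt.Printf("%d %s", code, body)
}
```

A client such as the scion message channels subcommand could decode this list to show users which values --channel accepts.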