Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions cmd/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -536,6 +536,13 @@ func runGateway() {
// Channel manager
channelMgr := channels.NewManager(msgBus)

// Wire WhatsApp group lister for HTTP API
if channelInstancesH != nil {
adapter := &waGroupListerAdapter{mgr: channelMgr}
channelInstancesH.SetWhatsAppGroupLister(adapter)
channelInstancesH.SetWhatsAppGroupRefresher(adapter)
}

// Wire channel sender + tenant checker on message tool (now that channelMgr exists)
if t, ok := toolsReg.Get("message"); ok {
if cs, ok := t.(tools.ChannelSenderAware); ok {
Expand Down
40 changes: 40 additions & 0 deletions cmd/gateway_channels_setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/nextlevelbuilder/goclaw/internal/config"
"github.com/nextlevelbuilder/goclaw/internal/gateway"
"github.com/nextlevelbuilder/goclaw/internal/gateway/methods"
httpapi "github.com/nextlevelbuilder/goclaw/internal/http"
"github.com/nextlevelbuilder/goclaw/internal/store"
"github.com/nextlevelbuilder/goclaw/pkg/protocol"
)
Expand Down Expand Up @@ -262,3 +263,42 @@ func wireChannelEventSubscribers(
})
}
}

// waGroupListerAdapter adapts the channels.Manager to the http.WhatsAppGroupLister interface.
// This avoids the HTTP package depending on the channels package.
type waGroupListerAdapter struct {
mgr *channels.Manager
}

func (a *waGroupListerAdapter) ListWhatsAppGroups(channelName string) []httpapi.WhatsAppGroupInfo {
ch, ok := a.mgr.GetChannel(channelName)
if !ok {
return nil
}
provider, ok := ch.(interface{ GetCachedGroupsRaw() []whatsapp.WAGroupDiscovery })
if !ok {
return nil
}
raw := provider.GetCachedGroupsRaw()
result := make([]httpapi.WhatsAppGroupInfo, len(raw))
for i, g := range raw {
result[i] = httpapi.WhatsAppGroupInfo{
JID: g.JID,
Name: g.Name,
MemberCount: g.MemberCount,
}
}
return result
}

func (a *waGroupListerAdapter) RefreshWhatsAppGroups(channelName string) {
ch, ok := a.mgr.GetChannel(channelName)
if !ok {
return
}
provider, ok := ch.(interface{ RefreshGroups() })
if !ok {
return
}
provider.RefreshGroups()
}
6 changes: 2 additions & 4 deletions cmd/gateway_consumer_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,16 +129,14 @@ func handleSubagentAnnounce(
// Enqueue into producer-consumer queue using tenant-scoped key from routing.
q, isProcessor := enqueueSubagentAnnounce(queueKey, entry)
if isProcessor {
deps.BgWg.Add(1)
go func() {
defer deps.BgWg.Done()
deps.BgWg.Go(func() {
defer safego.Recover(nil, "component", "subagent_announce_loop", "session", sessionKey)

// Fetch live roster for merged announce context.
roster := deps.SubagentMgr.RosterForParent(parentAgent)

processSubagentAnnounceLoop(ctx, q, routing, roster, deps.SubagentMgr, deps.Sched, deps.MsgBus, deps.Cfg)
}()
})
}

return true
Expand Down
123 changes: 123 additions & 0 deletions internal/channels/whatsapp/commands.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
package whatsapp

import (
"fmt"
"log/slog"
"strings"

"go.mau.fi/whatsmeow/proto/waE2E"
"go.mau.fi/whatsmeow/types"

"github.com/nextlevelbuilder/goclaw/internal/bus"
)

const (
// Text command triggers (WhatsApp has no native slash commands).
cmdMenu = "#menu"
cmdHelp = "#help"
cmdStop = "#stop"
cmdStopAll = "#stopall"
cmdReset = "#reset"
)

// isCommand checks if the given text is a recognized command trigger.
// Commands use "#" prefix. Case-insensitive, matches first whitespace-delimited token.
func isCommand(text string) bool {
if len(text) == 0 || text[0] != '#' {
return false
}
token := strings.ToLower(strings.SplitN(text, " ", 2)[0])
switch token {
case cmdMenu, cmdHelp, cmdStop, cmdStopAll, cmdReset:
return true
default:
return false
}
}

// handleCommand processes a recognized text command (e.g. "#menu", "#stop").
// Returns true if the message was fully handled and should not reach the agent pipeline.
func (c *Channel) handleCommand(text, senderID, chatID, peerKind string) bool {
token := strings.ToLower(strings.SplitN(text, " ", 2)[0])

switch token {
case cmdMenu, cmdHelp:
chatJID, err := types.ParseJID(chatID)
if err != nil {
slog.Warn("whatsapp: menu: invalid chat JID", "chat_id", chatID, "error", err)
return true
}
c.sendMenu(chatJID)
return true

case cmdStop:
c.publishCommand("stop", senderID, chatID, peerKind)
return true

case cmdStopAll:
c.publishCommand("stopall", senderID, chatID, peerKind)
return true

case cmdReset:
c.publishCommand("reset", senderID, chatID, peerKind)
chatJID, err := types.ParseJID(chatID)
if err == nil {
c.sendTextMessage(chatJID, "Conversation history has been reset.")
}
return true

default:
return false
}
}

// sendMenu sends a text-based command menu.
// WhatsApp linked devices (whatsmeow) cannot send interactive ListMessage —
// WhatsApp's server silently drops them. Text-based commands are the reliable approach.
func (c *Channel) sendMenu(chatJID types.JID) {
menuText := "*GoClaw Menu*\n\n" +
"#menu / #help — Show this menu\n" +
"#stop — Stop current running task\n" +
"#stopall — Stop all running tasks\n" +
"#reset — Reset conversation history\n\n" +
"Just send a message to chat with the AI."
c.sendTextMessage(chatJID, menuText)
}

// sendTextMessage sends a plain text WhatsApp message.
func (c *Channel) sendTextMessage(chatJID types.JID, text string) {
if c.client == nil || !c.client.IsConnected() {
slog.Warn("whatsapp: cannot send text, not connected")
return
}
msg := &waE2E.Message{
Conversation: new(text),
}
if _, err := c.client.SendMessage(c.ctx, chatJID, msg); err != nil {
slog.Warn("whatsapp: failed to send text message", "error", err)
}
}

// publishCommand publishes a command as an InboundMessage with command metadata,
// following the same pattern as Telegram's /stop, /stopall, /reset handlers.
// The gateway consumer (handleResetCommand, handleStopCommand) processes these.
func (c *Channel) publishCommand(command, senderID, chatID, peerKind string) {
userID := senderID
if idx := strings.IndexByte(senderID, '|'); idx > 0 {
userID = senderID[:idx]
}

c.Bus().PublishInbound(bus.InboundMessage{
Channel: c.Name(),
SenderID: senderID,
ChatID: chatID,
Content: fmt.Sprintf("/%s", command),
PeerKind: peerKind,
AgentID: c.AgentID(),
UserID: userID,
TenantID: c.TenantID(),
Metadata: map[string]string{
"command": command,
},
})
}
2 changes: 2 additions & 0 deletions internal/channels/whatsapp/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ type whatsappInstanceConfig struct {
HistoryLimit int `json:"history_limit,omitempty"`
AllowFrom []string `json:"allow_from,omitempty"`
BlockReply *bool `json:"block_reply,omitempty"`
Groups map[string]*config.WhatsAppGroupConfig `json:"groups,omitempty"`
}

// FactoryWithDB returns a ChannelFactory with DB access for whatsmeow auth state.
Expand Down Expand Up @@ -58,6 +59,7 @@ func FactoryWithDB(db *sql.DB, pendingStore store.PendingMessageStore, dialect s
RequireMention: ic.RequireMention,
HistoryLimit: ic.HistoryLimit,
BlockReply: ic.BlockReply,
Groups: ic.Groups,
}
// DB instances default to "pairing" for groups (secure by default).
if waCfg.GroupPolicy == "" {
Expand Down
17 changes: 16 additions & 1 deletion internal/channels/whatsapp/inbound.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,13 @@ func (c *Channel) handleIncomingMessage(evt *events.Message) {
slog.Info("whatsapp group message rejected by policy", "sender_id", senderID, "chat_id", chatID, "policy", c.config.GroupPolicy)
return
}
// Per-group enabled check.
if gc, ok := c.config.Groups[chatID]; ok && gc != nil {
if gc.Enabled != nil && !*gc.Enabled {
slog.Debug("whatsapp group disabled by config", "chat_id", chatID)
return
}
}
}

if !c.IsAllowed(senderID) {
Expand Down Expand Up @@ -158,6 +165,14 @@ func (c *Channel) handleIncomingMessage(evt *events.Message) {
userID = senderID[:idx]
}

// Resolve agent: per-group override or instance default.
agentID := c.AgentID()
if peerKind == "group" {
if gc, ok := c.config.Groups[chatID]; ok && gc != nil && gc.AgentID != "" {
agentID = gc.AgentID
}
}

c.Bus().PublishInbound(bus.InboundMessage{
Channel: c.Name(),
SenderID: senderID,
Expand All @@ -166,7 +181,7 @@ func (c *Channel) handleIncomingMessage(evt *events.Message) {
Media: mediaFiles,
PeerKind: peerKind,
UserID: userID,
AgentID: c.AgentID(),
AgentID: agentID,
TenantID: c.TenantID(),
Metadata: metadata,
})
Expand Down
13 changes: 6 additions & 7 deletions internal/channels/whatsapp/mention_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"go.mau.fi/whatsmeow/proto/waE2E"
"go.mau.fi/whatsmeow/types"
"go.mau.fi/whatsmeow/types/events"
"google.golang.org/protobuf/proto"
)

func TestIsMentioned(t *testing.T) {
Expand All @@ -15,7 +14,7 @@ func TestIsMentioned(t *testing.T) {
return &events.Message{
Message: &waE2E.Message{
ExtendedTextMessage: &waE2E.ExtendedTextMessage{
Text: proto.String("hello @bot"),
Text: new("hello @bot"),
ContextInfo: &waE2E.ContextInfo{
MentionedJID: mentionedJIDs,
},
Expand All @@ -25,11 +24,11 @@ func TestIsMentioned(t *testing.T) {
}

tests := []struct {
name string
myJID string // bot's phone JID
myLID string // bot's LID
name string
myJID string // bot's phone JID
myLID string // bot's LID
mentions []string
want bool
want bool
}{
{
name: "mentioned by phone JID",
Expand Down Expand Up @@ -115,7 +114,7 @@ func TestIsMentioned(t *testing.T) {
// Plain conversation message — no extended text.
evt = &events.Message{
Message: &waE2E.Message{
Conversation: proto.String("hello"),
Conversation: new("hello"),
},
}
} else {
Expand Down
25 changes: 12 additions & 13 deletions internal/channels/whatsapp/outbound.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"go.mau.fi/whatsmeow"
"go.mau.fi/whatsmeow/proto/waE2E"
"go.mau.fi/whatsmeow/types"
"google.golang.org/protobuf/proto"

"github.com/nextlevelbuilder/goclaw/internal/bus"
)
Expand Down Expand Up @@ -61,7 +60,7 @@ func (c *Channel) Send(_ context.Context, msg bus.OutboundMessage) error {
chunks := chunkText(formatted, maxMessageLen)
for _, chunk := range chunks {
waMsg := &waE2E.Message{
Conversation: proto.String(chunk),
Conversation: new(chunk),
}
if _, err := c.client.SendMessage(c.ctx, chatJID, waMsg); err != nil {
return fmt.Errorf("send whatsapp message: %w", err)
Expand Down Expand Up @@ -90,14 +89,14 @@ func (c *Channel) buildMediaMessage(data []byte, mime, caption string) (*waE2E.M
}
return &waE2E.Message{
ImageMessage: &waE2E.ImageMessage{
Caption: proto.String(caption),
Mimetype: proto.String(mime),
Caption: new(caption),
Mimetype: new(mime),
URL: &uploaded.URL,
DirectPath: &uploaded.DirectPath,
MediaKey: uploaded.MediaKey,
FileEncSHA256: uploaded.FileEncSHA256,
FileSHA256: uploaded.FileSHA256,
FileLength: proto.Uint64(uint64(len(data))),
FileLength: new(uint64(len(data))),
},
}, nil

Expand All @@ -108,14 +107,14 @@ func (c *Channel) buildMediaMessage(data []byte, mime, caption string) (*waE2E.M
}
return &waE2E.Message{
VideoMessage: &waE2E.VideoMessage{
Caption: proto.String(caption),
Mimetype: proto.String(mime),
Caption: new(caption),
Mimetype: new(mime),
URL: &uploaded.URL,
DirectPath: &uploaded.DirectPath,
MediaKey: uploaded.MediaKey,
FileEncSHA256: uploaded.FileEncSHA256,
FileSHA256: uploaded.FileSHA256,
FileLength: proto.Uint64(uint64(len(data))),
FileLength: new(uint64(len(data))),
},
}, nil

Expand All @@ -126,13 +125,13 @@ func (c *Channel) buildMediaMessage(data []byte, mime, caption string) (*waE2E.M
}
return &waE2E.Message{
AudioMessage: &waE2E.AudioMessage{
Mimetype: proto.String(mime),
Mimetype: new(mime),
URL: &uploaded.URL,
DirectPath: &uploaded.DirectPath,
MediaKey: uploaded.MediaKey,
FileEncSHA256: uploaded.FileEncSHA256,
FileSHA256: uploaded.FileSHA256,
FileLength: proto.Uint64(uint64(len(data))),
FileLength: new(uint64(len(data))),
},
}, nil

Expand All @@ -143,14 +142,14 @@ func (c *Channel) buildMediaMessage(data []byte, mime, caption string) (*waE2E.M
}
return &waE2E.Message{
DocumentMessage: &waE2E.DocumentMessage{
Caption: proto.String(caption),
Mimetype: proto.String(mime),
Caption: new(caption),
Mimetype: new(mime),
URL: &uploaded.URL,
DirectPath: &uploaded.DirectPath,
MediaKey: uploaded.MediaKey,
FileEncSHA256: uploaded.FileEncSHA256,
FileSHA256: uploaded.FileSHA256,
FileLength: proto.Uint64(uint64(len(data))),
FileLength: new(uint64(len(data))),
},
}, nil
}
Expand Down
Loading
Loading