diff --git a/README.md b/README.md index e221a92..5a8cf2e 100644 --- a/README.md +++ b/README.md @@ -14,10 +14,10 @@ Mu is a collection of apps for everyday use. While other platforms monetize your - **Home** - Your personalized dashboard - **Blog** - Thoughtful microblogging -- **Chat** - Discuss topics with AI +- **Chat** - Discuss topics with AI (Web) or federated XMPP chat - **News** - RSS feeds with AI summaries - **Video** - Watch YouTube without ads -- **Mail** - Private messaging & email +- **Mail** - Private messaging & email with SMTP - **Wallet** - Credits and crypto payments Mu runs as a single Go binary on your own server or use the hosted version at [mu.xyz](https://mu.xyz). @@ -31,7 +31,9 @@ Mu runs as a single Go binary on your own server or use the hosted version at [m - [x] Chat - Discussion rooms - [x] News - RSS news feed - [x] Video - YouTube search -- [x] Mail - Private messaging +- [x] Mail - Private messaging +- [x] SMTP - Email server for federation +- [x] XMPP - Chat server for federation - [x] Wallet - Crypto payments - [ ] Services - Marketplace, etc @@ -188,6 +190,7 @@ Full documentation is available in the [docs](docs/) folder and at `/docs` on an **Features** - [Messaging](docs/MESSAGING_SYSTEM.md) - Email and messaging setup +- [XMPP Chat](docs/XMPP_CHAT.md) - Federated chat with XMPP - [Wallet & Credits](docs/WALLET_AND_CREDITS.md) - Credit system for metered usage **Reference** diff --git a/app/status.go b/app/status.go index ec90bf5..9ef166d 100644 --- a/app/status.go +++ b/app/status.go @@ -65,6 +65,9 @@ type MemoryStatus struct { // DKIMStatusFunc is set by main to avoid import cycle var DKIMStatusFunc func() (enabled bool, domain, selector string) +// XMPPStatusFunc is set by main to avoid import cycle +var XMPPStatusFunc func() map[string]interface{} + // StatusHandler handles the /status endpoint func StatusHandler(w http.ResponseWriter, r *http.Request) { // Quick health check endpoint @@ -121,6 +124,31 @@ func buildStatus() StatusResponse { Details: fmt.Sprintf("Port %s", smtpPort), }) + // Check XMPP server + if XMPPStatusFunc != nil { + xmppStatus := XMPPStatusFunc() + enabled, ok := xmppStatus["enabled"].(bool) + if !ok { + enabled = false + } + details := "Not enabled" + if enabled { + domain, domainOk := xmppStatus["domain"].(string) + port, portOk := xmppStatus["port"].(string) + sessions, sessionsOk := xmppStatus["sessions"].(int) + if domainOk && portOk && sessionsOk { + details = fmt.Sprintf("%s:%s (%d sessions)", domain, port, sessions) + } else { + details = "Configuration error" + } + } + services = append(services, StatusCheck{ + Name: "XMPP Server", + Status: enabled, + Details: details, + }) + } + // Check LLM provider llmProvider, llmConfigured := checkLLMConfig() services = append(services, StatusCheck{ diff --git a/chat/chat.go b/chat/chat.go index 222b6be..069abd0 100644 --- a/chat/chat.go +++ b/chat/chat.go @@ -105,6 +105,39 @@ type Client struct { var rooms = make(map[string]*Room) var roomsMutex sync.RWMutex +// ChatMessage represents a direct message between users +type ChatMessage struct { + ID string `json:"id"` + From string `json:"from"` // Sender username + FromID string `json:"from_id"` // Sender account ID + To string `json:"to"` // Recipient username + ToID string `json:"to_id"` // Recipient account ID + Body string `json:"body"` + Read bool `json:"read"` + ReplyTo string `json:"reply_to"` // ID of message this is replying to + ThreadID string `json:"thread_id"` // Root message ID for O(1) thread grouping + CreatedAt time.Time `json:"created_at"` +} + +// ChatThread represents a conversation thread +type ChatThread struct { + Root *ChatMessage + Messages []*ChatMessage + Latest *ChatMessage + HasUnread bool +} + +// ChatInbox organizes messages by thread for a user +type ChatInbox struct { + Threads map[string]*ChatThread // threadID -> Thread + UnreadCount int // Cached unread message count +} + +// stored direct messages +var chatMessages []*ChatMessage +var chatInboxes map[string]*ChatInbox +var chatMessagesMutex sync.RWMutex + // saveRoomMessages persists room messages to disk func saveRoomMessages(roomID string, messages []RoomMessage) { filename := "room_" + strings.ReplaceAll(roomID, "/", "_") + ".json" @@ -1006,6 +1039,9 @@ func Load() { } } + // Load chat messages + loadChatMessages() + // Subscribe to summary generation requests summaryRequestSub := data.Subscribe(data.EventGenerateSummary) go func() { @@ -1217,6 +1253,14 @@ func generateSummaries() { } func Handler(w http.ResponseWriter, r *http.Request) { + // Check mode parameter - "messages" for direct messaging, default is AI chat + mode := r.URL.Query().Get("mode") + + if mode == "messages" { + handleMessagesMode(w, r) + return + } + // Check if this is a room-based chat (e.g., /chat?id=post_123) roomID := r.URL.Query().Get("id") @@ -1331,6 +1375,13 @@ func handleGetChat(w http.ResponseWriter, r *http.Request, roomID string) { roomJSON, _ := json.Marshal(roomData) tmpl := app.RenderHTMLForRequest("Chat", "Chat with AI", fmt.Sprintf(Template, topicTabs), r) + + // Add a link to messages mode + messagesLink := `
+

💬 New: Direct Messaging - Send messages to other users or chat with @micro (AI assistant)

+
` + + tmpl = strings.Replace(tmpl, `
`, messagesLink+`
`, 1) tmpl = strings.Replace(tmpl, "", fmt.Sprintf(``, summariesJSON, roomJSON), 1) w.Write([]byte(tmpl)) @@ -1642,3 +1693,255 @@ func cleanupIdleRooms() { } } } + +// Chat Messaging Functions (Direct Messages) + +// loadChatMessages loads chat messages from disk +func loadChatMessages() { +b, err := data.LoadFile("chat_messages.json") +if err != nil { +chatMessages = []*ChatMessage{} +chatInboxes = make(map[string]*ChatInbox) +return +} + +if err := json.Unmarshal(b, &chatMessages); err != nil { +chatMessages = []*ChatMessage{} +chatInboxes = make(map[string]*ChatInbox) +return +} + +app.Log("chat", "Loaded %d chat messages", len(chatMessages)) +fixChatThreading() +rebuildChatInboxes() +} + +// saveChatMessages saves chat messages to disk +func saveChatMessages() error { +chatMessagesMutex.RLock() +defer chatMessagesMutex.RUnlock() + +b, err := json.Marshal(chatMessages) +if err != nil { +return err +} + +return data.SaveFile("chat_messages.json", string(b)) +} + +// fixChatThreading repairs broken threading relationships +func fixChatThreading() { +fixed := 0 + +for _, msg := range chatMessages { +if msg.ReplyTo == "" { +continue +} + +if getChatMessageUnlocked(msg.ReplyTo) == nil { +app.Log("chat", "Message %s has missing parent %s - marking as root", msg.ID, msg.ReplyTo) +msg.ReplyTo = "" +fixed++ +} +} + +for _, msg := range chatMessages { +threadID := computeChatThreadID(msg) +if msg.ThreadID != threadID { +msg.ThreadID = threadID +fixed++ +} +} + +if fixed > 0 { +app.Log("chat", "Fixed threading for %d messages", fixed) +saveChatMessages() +} +} + +// computeChatThreadID walks up the chain to find the root message ID +func computeChatThreadID(msg *ChatMessage) string { +if msg.ReplyTo == "" { +return msg.ID +} + +visited := make(map[string]bool) +current := msg +for current.ReplyTo != "" && !visited[current.ID] { +visited[current.ID] = true +parent := getChatMessageUnlocked(current.ReplyTo) +if parent == nil { +return current.ID +} +current = parent +} + +return current.ID +} + +// getChatMessageUnlocked returns message by ID (caller must hold lock) +func getChatMessageUnlocked(id string) *ChatMessage { +for _, m := range chatMessages { +if m.ID == id { +return m +} +} +return nil +} + +// rebuildChatInboxes builds inbox structures from messages +func rebuildChatInboxes() { +chatInboxes = make(map[string]*ChatInbox) + +for _, msg := range chatMessages { +// Add to sender's inbox (sent messages) +if _, exists := chatInboxes[msg.FromID]; !exists { +chatInboxes[msg.FromID] = &ChatInbox{ +Threads: make(map[string]*ChatThread), +UnreadCount: 0, +} +} + +// Add to recipient's inbox (received messages) +if _, exists := chatInboxes[msg.ToID]; !exists { +chatInboxes[msg.ToID] = &ChatInbox{ +Threads: make(map[string]*ChatThread), +UnreadCount: 0, +} +} + +addChatMessageToInbox(chatInboxes[msg.FromID], msg, msg.FromID) +addChatMessageToInbox(chatInboxes[msg.ToID], msg, msg.ToID) +} +} + +// addChatMessageToInbox adds a message to an inbox +func addChatMessageToInbox(inbox *ChatInbox, msg *ChatMessage, userID string) { +threadID := msg.ThreadID +if threadID == "" { +threadID = computeChatThreadID(msg) +if threadID == "" { +threadID = msg.ID +} +} + +isUnread := !msg.Read && msg.ToID == userID +thread := inbox.Threads[threadID] +if thread == nil { +rootMsg := getChatMessageUnlocked(threadID) +if rootMsg == nil { +rootMsg = msg +} +thread = &ChatThread{ +Root: rootMsg, +Messages: []*ChatMessage{msg}, +Latest: msg, +HasUnread: isUnread, +} +inbox.Threads[threadID] = thread +if isUnread { +inbox.UnreadCount++ +} +} else { +thread.Messages = append(thread.Messages, msg) +if msg.CreatedAt.After(thread.Latest.CreatedAt) { +thread.Latest = msg +} +if isUnread { +thread.HasUnread = true +inbox.UnreadCount++ +} +} +} + +// SendChatMessage creates and stores a new chat message +func SendChatMessage(fromName, fromID, toName, toID, body, replyTo string) error { +chatMessagesMutex.Lock() +defer chatMessagesMutex.Unlock() + +msg := &ChatMessage{ +ID: fmt.Sprintf("%d", time.Now().UnixNano()), +From: fromName, +FromID: fromID, +To: toName, +ToID: toID, +Body: body, +Read: false, +ReplyTo: replyTo, +CreatedAt: time.Now(), +} + +// Compute thread ID +if replyTo != "" { +parent := getChatMessageUnlocked(replyTo) +if parent != nil { +msg.ThreadID = computeChatThreadID(parent) +} else { +msg.ThreadID = msg.ID +} +} else { +msg.ThreadID = msg.ID +} + +chatMessages = append(chatMessages, msg) + +// Update inboxes +if chatInboxes[fromID] == nil { +chatInboxes[fromID] = &ChatInbox{ +Threads: make(map[string]*ChatThread), +UnreadCount: 0, +} +} +if chatInboxes[toID] == nil { +chatInboxes[toID] = &ChatInbox{ +Threads: make(map[string]*ChatThread), +UnreadCount: 0, +} +} + +addChatMessageToInbox(chatInboxes[fromID], msg, fromID) +addChatMessageToInbox(chatInboxes[toID], msg, toID) + +app.Log("chat", "Sent message from %s to %s", fromName, toName) + +return saveChatMessages() +} + +// GetChatInbox returns the inbox for a user +func GetChatInbox(userID string) *ChatInbox { +chatMessagesMutex.RLock() +defer chatMessagesMutex.RUnlock() + +inbox := chatInboxes[userID] +if inbox == nil { +return &ChatInbox{ +Threads: make(map[string]*ChatThread), +UnreadCount: 0, +} +} +return inbox +} + +// MarkChatMessageAsRead marks a message as read +func MarkChatMessageAsRead(msgID, userID string) error { +chatMessagesMutex.Lock() +defer chatMessagesMutex.Unlock() + +for _, msg := range chatMessages { +if msg.ID == msgID && msg.ToID == userID && !msg.Read { +msg.Read = true + +// Update inbox unread count +if inbox := chatInboxes[userID]; inbox != nil { +inbox.UnreadCount-- +if inbox.UnreadCount < 0 { +inbox.UnreadCount = 0 +} +} + +return saveChatMessages() +} +} + +return nil +} diff --git a/chat/messages.go b/chat/messages.go new file mode 100644 index 0000000..23beba1 --- /dev/null +++ b/chat/messages.go @@ -0,0 +1,307 @@ +package chat + +import ( + "fmt" + "html" + "net/http" + "sort" + "strings" + + "mu/app" + "mu/auth" +) + +// handleMessagesMode handles direct messaging UI and logic +func handleMessagesMode(w http.ResponseWriter, r *http.Request) { + _, acc, err := auth.RequireSession(r) + if err != nil { + app.Unauthorized(w, r) + return + } + + // Handle POST - send message + if r.Method == "POST" { + if err := r.ParseForm(); err != nil { + app.BadRequest(w, r, "Failed to parse form") + return + } + + to := strings.TrimSpace(r.FormValue("to")) + body := strings.TrimSpace(r.FormValue("body")) + replyTo := strings.TrimSpace(r.FormValue("reply_to")) + + if to == "" || body == "" { + http.Error(w, "Recipient and message are required", http.StatusBadRequest) + return + } + + // Check if recipient is @micro (AI assistant) + if to == "micro" || to == "@micro" { + // This is handled by the AI chat - redirect + http.Redirect(w, r, "/chat", http.StatusSeeOther) + return + } + + // Look up recipient + toAcc, err := auth.GetAccountByName(to) + if err != nil { + http.Error(w, "Recipient not found", http.StatusNotFound) + return + } + + // Send message + if err := SendChatMessage(acc.Name, acc.ID, toAcc.Name, toAcc.ID, body, replyTo); err != nil { + http.Error(w, "Failed to send message", http.StatusInternalServerError) + return + } + + // Redirect to thread if replying, otherwise to inbox + threadID := r.URL.Query().Get("id") + if threadID != "" { + http.Redirect(w, r, "/chat?mode=messages&id="+threadID, http.StatusSeeOther) + } else if replyTo != "" { + chatMessagesMutex.RLock() + parentMsg := getChatMessageUnlocked(replyTo) + chatMessagesMutex.RUnlock() + if parentMsg != nil { + http.Redirect(w, r, "/chat?mode=messages&id="+parentMsg.ThreadID, http.StatusSeeOther) + } else { + http.Redirect(w, r, "/chat?mode=messages", http.StatusSeeOther) + } + } else { + http.Redirect(w, r, "/chat?mode=messages", http.StatusSeeOther) + } + return + } + + // Handle GET - show inbox or thread + msgID := r.URL.Query().Get("id") + compose := r.URL.Query().Get("compose") + + if msgID != "" { + // Show thread + renderChatThread(w, r, msgID, acc) + return + } + + if compose != "" { + // Show compose form + renderChatCompose(w, r, acc) + return + } + + // Show inbox + renderChatInbox(w, r, acc) +} + +// renderChatInbox renders the chat inbox with conversations +func renderChatInbox(w http.ResponseWriter, r *http.Request, acc *auth.Account) { + inbox := GetChatInbox(acc.ID) + + // Get all threads and sort by latest message time + type threadInfo struct { + thread *ChatThread + id string + } + var threads []threadInfo + for id, thread := range inbox.Threads { + threads = append(threads, threadInfo{thread: thread, id: id}) + } + + sort.Slice(threads, func(i, j int) bool { + return threads[i].thread.Latest.CreatedAt.After(threads[j].thread.Latest.CreatedAt) + }) + + // Render thread previews + var items []string + for _, t := range threads { + thread := t.thread + root := thread.Root + latest := thread.Latest + + // Determine who we're chatting with + var otherUser string + if root.FromID == acc.ID { + otherUser = root.To + } else { + otherUser = root.From + } + + // Build preview + unreadMarker := "" + if thread.HasUnread { + unreadMarker = `` + } + + preview := latest.Body + if len(preview) > 100 { + preview = preview[:100] + "..." + } + preview = html.EscapeString(preview) + + timeAgo := app.TimeAgo(latest.CreatedAt) + + item := fmt.Sprintf(` +
+
+ %s%s +
+
+
%s
+ %s +
+
+ `, root.ID, html.EscapeString(otherUser), unreadMarker, preview, timeAgo) + items = append(items, item) + } + + content := ` +
+ New Chat + AI Chat +
+ ` + + if len(items) == 0 { + content += `
+

No conversations yet.

+

Start a new chat or try chatting with @micro (AI assistant)

+
` + } else { + content += `
` + strings.Join(items, "\n") + `
` + } + + htmlContent := app.RenderHTMLForRequest("Chat Messages", "Direct messages", content, r) + w.Write([]byte(htmlContent)) +} + +// renderChatCompose renders the new message compose form +func renderChatCompose(w http.ResponseWriter, r *http.Request, acc *auth.Account) { + to := r.URL.Query().Get("to") + + content := fmt.Sprintf(` +
+ ← Back to Inbox +
+
+

New Chat

+
+
+ + + Tip: Type 'micro' to chat with the AI assistant +
+
+ + +
+ +
+
+ `, html.EscapeString(to)) + + htmlContent := app.RenderHTMLForRequest("New Chat", "Compose message", content, r) + w.Write([]byte(htmlContent)) +} + +// renderChatThread renders a conversation thread +func renderChatThread(w http.ResponseWriter, r *http.Request, threadID string, acc *auth.Account) { + chatMessagesMutex.RLock() + + // Get all messages in thread + var threadMessages []*ChatMessage + for _, msg := range chatMessages { + if msg.ThreadID == threadID && (msg.FromID == acc.ID || msg.ToID == acc.ID) { + threadMessages = append(threadMessages, msg) + } + } + + if len(threadMessages) == 0 { + chatMessagesMutex.RUnlock() + http.Error(w, "Thread not found", http.StatusNotFound) + return + } + + // Sort by created time + sort.Slice(threadMessages, func(i, j int) bool { + return threadMessages[i].CreatedAt.Before(threadMessages[j].CreatedAt) + }) + + // Determine who we're chatting with + firstMsg := threadMessages[0] + var otherUser string + var otherUserID string + if firstMsg.FromID == acc.ID { + otherUser = firstMsg.To + otherUserID = firstMsg.ToID + } else { + otherUser = firstMsg.From + otherUserID = firstMsg.FromID + } + + chatMessagesMutex.RUnlock() + + // Mark all unread messages as read + for _, msg := range threadMessages { + if msg.ToID == acc.ID && !msg.Read { + MarkChatMessageAsRead(msg.ID, acc.ID) + } + } + + // Render messages + var messageHTML []string + for _, msg := range threadMessages { + isSent := msg.FromID == acc.ID + + sender := msg.From + if isSent { + sender = "You" + } + + body := html.EscapeString(msg.Body) + body = strings.ReplaceAll(body, "\n", "
") + + timeStr := msg.CreatedAt.Format("Jan 2, 3:04 PM") + + borderColor := "#28a745" + if isSent { + borderColor = "#007bff" + } + + msgHTML := fmt.Sprintf(` +
+
%s
+
%s
+
%s
+
+ `, borderColor, borderColor, sender, body, timeStr) + messageHTML = append(messageHTML, msgHTML) + } + + content := fmt.Sprintf(` +
+ ← Back to Inbox +

Chat with %s

+
+
+ %s +
+
+
+ + +
+ + +
+ +
+
+ `, html.EscapeString(otherUser), strings.Join(messageHTML, "\n"), threadID, otherUserID, threadMessages[len(threadMessages)-1].ID) + + htmlContent := app.RenderHTMLForRequest("Chat Thread", "Conversation with "+otherUser, content, r) + w.Write([]byte(htmlContent)) +} diff --git a/chat/xmpp.go b/chat/xmpp.go new file mode 100644 index 0000000..055d458 --- /dev/null +++ b/chat/xmpp.go @@ -0,0 +1,565 @@ +package chat + +import ( + "context" + "encoding/base64" + "encoding/xml" + "fmt" + "io" + "log" + "net" + "os" + "strings" + "sync" + "time" + + "mu/app" + "mu/auth" + + "golang.org/x/crypto/bcrypt" +) + +// XMPP server implementation for chat federation +// Similar to mail/SMTP, this provides decentralized chat capability +// Implements core XMPP (RFC 6120, 6121, 6122) + +const ( + xmppNamespace = "jabber:client" + xmppStreamNamespace = "http://etherx.jabber.org/streams" + xmppSASLNamespace = "urn:ietf:params:xml:ns:xmpp-sasl" + xmppBindNamespace = "urn:ietf:params:xml:ns:xmpp-bind" +) + +// XMPPServer represents the XMPP server +type XMPPServer struct { + Domain string + Port string + listener net.Listener + sessions map[string]*XMPPSession + mutex sync.RWMutex + ctx context.Context + cancel context.CancelFunc +} + +// XMPPSession represents a client connection +type XMPPSession struct { + conn net.Conn + jid string // Full JID (user@domain/resource) + username string + resource string + domain string + authorized bool + encoder *xml.Encoder + decoder *xml.Decoder + mutex sync.Mutex +} + +// XMPP stream elements +type StreamStart struct { + XMLName xml.Name `xml:"http://etherx.jabber.org/streams stream"` + From string `xml:"from,attr,omitempty"` + To string `xml:"to,attr,omitempty"` + ID string `xml:"id,attr,omitempty"` + Version string `xml:"version,attr,omitempty"` + Lang string `xml:"xml:lang,attr,omitempty"` +} + +type StreamFeatures struct { + XMLName xml.Name `xml:"stream:features"` + Mechanisms []string `xml:"mechanisms>mechanism,omitempty"` + Bind *struct{} `xml:"bind,omitempty"` + Session *struct{} `xml:"session,omitempty"` +} + +type SASLAuth struct { + XMLName xml.Name `xml:"urn:ietf:params:xml:ns:xmpp-sasl auth"` + Mechanism string `xml:"mechanism,attr"` + Value string `xml:",chardata"` +} + +type SASLSuccess struct { + XMLName xml.Name `xml:"urn:ietf:params:xml:ns:xmpp-sasl success"` +} + +type SASLFailure struct { + XMLName xml.Name `xml:"urn:ietf:params:xml:ns:xmpp-sasl failure"` + Reason string `xml:",innerxml"` +} + +type IQBind struct { + XMLName xml.Name `xml:"urn:ietf:params:xml:ns:xmpp-bind bind"` + Resource string `xml:"resource,omitempty"` + JID string `xml:"jid,omitempty"` +} + +type IQ struct { + XMLName xml.Name `xml:"iq"` + Type string `xml:"type,attr"` + ID string `xml:"id,attr,omitempty"` + From string `xml:"from,attr,omitempty"` + To string `xml:"to,attr,omitempty"` + Bind *IQBind `xml:"bind,omitempty"` + Error *struct { + Type string `xml:"type,attr"` + Text string `xml:",innerxml"` + } `xml:"error,omitempty"` +} + +type Message struct { + XMLName xml.Name `xml:"message"` + Type string `xml:"type,attr,omitempty"` + From string `xml:"from,attr,omitempty"` + To string `xml:"to,attr,omitempty"` + ID string `xml:"id,attr,omitempty"` + Body string `xml:"body,omitempty"` +} + +type Presence struct { + XMLName xml.Name `xml:"presence"` + Type string `xml:"type,attr,omitempty"` + From string `xml:"from,attr,omitempty"` + To string `xml:"to,attr,omitempty"` + Show string `xml:"show,omitempty"` + Status string `xml:"status,omitempty"` +} + +// NewXMPPServer creates a new XMPP server instance +func NewXMPPServer(domain, port string) *XMPPServer { + ctx, cancel := context.WithCancel(context.Background()) + return &XMPPServer{ + Domain: domain, + Port: port, + sessions: make(map[string]*XMPPSession), + ctx: ctx, + cancel: cancel, + } +} + +// Start begins listening for XMPP connections +func (s *XMPPServer) Start() error { + addr := ":" + s.Port + listener, err := net.Listen("tcp", addr) + if err != nil { + return fmt.Errorf("failed to start XMPP server: %v", err) + } + + s.listener = listener + app.Log("xmpp", "XMPP server listening on %s (domain: %s)", addr, s.Domain) + + // Accept connections + go s.acceptConnections() + + return nil +} + +// acceptConnections handles incoming connections +func (s *XMPPServer) acceptConnections() { + for { + select { + case <-s.ctx.Done(): + return + default: + conn, err := s.listener.Accept() + if err != nil { + select { + case <-s.ctx.Done(): + return + default: + app.Log("xmpp", "Error accepting connection: %v", err) + continue + } + } + + // Handle each connection in a goroutine + go s.handleConnection(conn) + } + } +} + +// handleConnection processes a single XMPP client connection +func (s *XMPPServer) handleConnection(conn net.Conn) { + defer conn.Close() + + session := &XMPPSession{ + conn: conn, + domain: s.Domain, + encoder: xml.NewEncoder(conn), + decoder: xml.NewDecoder(conn), + } + + remoteAddr := conn.RemoteAddr().String() + app.Log("xmpp", "New connection from %s", remoteAddr) + + // Initial stream negotiation + if err := s.handleStreamNegotiation(session); err != nil { + app.Log("xmpp", "Stream negotiation failed: %v", err) + return + } + + // Main stanza processing loop + s.handleStanzas(session) +} + +// handleStreamNegotiation performs initial XMPP stream setup +func (s *XMPPServer) handleStreamNegotiation(session *XMPPSession) error { + // Read opening stream tag + var streamStart StreamStart + if err := session.decoder.Decode(&streamStart); err != nil { + return fmt.Errorf("failed to read stream start: %v", err) + } + + // Send stream response + streamID := generateStreamID() + response := fmt.Sprintf(` +`, s.Domain, streamID) + + if _, err := session.conn.Write([]byte(response)); err != nil { + return fmt.Errorf("failed to send stream header: %v", err) + } + + // Send stream features + features := StreamFeatures{ + Mechanisms: []string{"PLAIN"}, + Bind: &struct{}{}, + Session: &struct{}{}, + } + + if err := session.encoder.Encode(&features); err != nil { + return fmt.Errorf("failed to send features: %v", err) + } + + return nil +} + +// handleStanzas processes incoming XMPP stanzas +func (s *XMPPServer) handleStanzas(session *XMPPSession) { + for { + // Read next token + token, err := session.decoder.Token() + if err != nil { + if err != io.EOF { + app.Log("xmpp", "Error reading token: %v", err) + } + return + } + + switch t := token.(type) { + case xml.StartElement: + switch t.Name.Local { + case "auth": + s.handleAuth(session) + case "iq": + s.handleIQ(session, t) + case "message": + s.handleMessage(session, t) + case "presence": + s.handlePresence(session, t) + } + case xml.EndElement: + if t.Name.Local == "stream" { + return + } + } + } +} + +// handleAuth processes SASL authentication +func (s *XMPPServer) handleAuth(session *XMPPSession) { + var authStanza SASLAuth + if err := session.decoder.DecodeElement(&authStanza, nil); err != nil { + app.Log("xmpp", "Failed to decode auth: %v", err) + if err := session.encoder.Encode(&SASLFailure{Reason: "malformed-request"}); err != nil { + app.Log("xmpp", "Failed to send auth failure: %v", err) + } + return + } + + // For PLAIN mechanism, decode credentials + if authStanza.Mechanism == "PLAIN" { + // PLAIN format: \0username\0password (base64 encoded) + // Decode the base64 auth value + decoded, err := base64.StdEncoding.DecodeString(authStanza.Value) + if err != nil { + app.Log("xmpp", "Failed to decode auth credentials: %v", err) + if err := session.encoder.Encode(&SASLFailure{Reason: "malformed-request"}); err != nil { + app.Log("xmpp", "Failed to send auth failure: %v", err) + } + return + } + + // Parse PLAIN SASL format: [authzid]\0username\0password + parts := strings.Split(string(decoded), "\x00") + if len(parts) < 3 { + app.Log("xmpp", "Invalid PLAIN SASL format") + if err := session.encoder.Encode(&SASLFailure{Reason: "invalid-authzid"}); err != nil { + app.Log("xmpp", "Failed to send auth failure: %v", err) + } + return + } + + username := parts[1] + password := parts[2] + + // Verify credentials against auth system + acc, err := auth.GetAccountByName(username) + if err != nil { + app.Log("xmpp", "Authentication failed for user %s: user not found", username) + if err := session.encoder.Encode(&SASLFailure{Reason: "not-authorized"}); err != nil { + app.Log("xmpp", "Failed to send auth failure: %v", err) + } + return + } + + // Verify password using bcrypt + if err := bcrypt.CompareHashAndPassword([]byte(acc.Secret), []byte(password)); err != nil { + app.Log("xmpp", "Authentication failed for user %s: invalid password", username) + if err := session.encoder.Encode(&SASLFailure{Reason: "not-authorized"}); err != nil { + app.Log("xmpp", "Failed to send auth failure: %v", err) + } + return + } + + // Authentication successful + session.authorized = true + session.username = username + app.Log("xmpp", "User %s authenticated successfully", username) + + // Send success + if err := session.encoder.Encode(&SASLSuccess{}); err != nil { + app.Log("xmpp", "Failed to send auth success: %v", err) + return + } + + // Client will restart stream after successful auth + } else { + if err := session.encoder.Encode(&SASLFailure{Reason: "invalid-mechanism"}); err != nil { + app.Log("xmpp", "Failed to send auth failure: %v", err) + } + } +} + +// handleIQ processes IQ (Info/Query) stanzas +func (s *XMPPServer) handleIQ(session *XMPPSession, start xml.StartElement) { + var iq IQ + if err := session.decoder.DecodeElement(&iq, &start); err != nil { + app.Log("xmpp", "Failed to decode IQ: %v", err) + return + } + + // Handle resource binding + if iq.Type == "set" && iq.Bind != nil { + resource := iq.Bind.Resource + if resource == "" { + resource = "mu-" + generateStreamID()[:8] + } + + session.resource = resource + session.jid = fmt.Sprintf("%s@%s/%s", session.username, s.Domain, resource) + + // Store session + s.mutex.Lock() + s.sessions[session.jid] = session + s.mutex.Unlock() + + // Send bind result + result := IQ{ + Type: "result", + ID: iq.ID, + Bind: &IQBind{ + JID: session.jid, + }, + } + + if err := session.encoder.Encode(&result); err != nil { + app.Log("xmpp", "Failed to send bind result: %v", err) + } + + app.Log("xmpp", "User bound to JID: %s", session.jid) + } +} + +// handleMessage processes message stanzas +func (s *XMPPServer) handleMessage(session *XMPPSession, start xml.StartElement) { + var msg Message + if err := session.decoder.DecodeElement(&msg, &start); err != nil { + app.Log("xmpp", "Failed to decode message: %v", err) + return + } + + // Set from if not already set + if msg.From == "" { + msg.From = session.jid + } + + app.Log("xmpp", "Message from %s to %s: %s", msg.From, msg.To, msg.Body) + + // Route message to recipient + if msg.To != "" { + s.routeMessage(&msg) + } +} + +// handlePresence processes presence stanzas +func (s *XMPPServer) handlePresence(session *XMPPSession, start xml.StartElement) { + var pres Presence + if err := session.decoder.DecodeElement(&pres, &start); err != nil { + app.Log("xmpp", "Failed to decode presence: %v", err) + return + } + + if pres.From == "" { + pres.From = session.jid + } + + app.Log("xmpp", "Presence from %s: %s", pres.From, pres.Type) + + // Update user presence in auth system + if session.username != "" { + if account, err := auth.GetAccountByName(session.username); err == nil { + auth.UpdatePresence(account.ID) + } + } + + // Broadcast presence to other sessions + s.broadcastPresence(&pres) +} + +// routeMessage delivers a message to the recipient +func (s *XMPPServer) routeMessage(msg *Message) { + // Extract recipient JID + recipientJID := msg.To + + // Check if recipient is local or remote + parts := strings.Split(recipientJID, "@") + if len(parts) < 2 { + app.Log("xmpp", "Invalid recipient JID: %s", recipientJID) + return + } + + domain := strings.Split(parts[1], "/")[0] + + if domain == s.Domain { + // Local delivery + s.mutex.RLock() + session, exists := s.sessions[recipientJID] + s.mutex.RUnlock() + + if exists { + session.mutex.Lock() + defer session.mutex.Unlock() + + if err := session.encoder.Encode(msg); err != nil { + app.Log("xmpp", "Failed to deliver message: %v", err) + } else { + app.Log("xmpp", "Message delivered to %s", recipientJID) + } + } else { + // Store offline message (would integrate with mail system) + app.Log("xmpp", "User %s offline, message would be stored", recipientJID) + } + } else { + // Remote delivery via S2S (Server-to-Server) + // For now, log that we'd relay it + app.Log("xmpp", "Would relay message to remote domain: %s", domain) + } +} + +// broadcastPresence sends presence to all sessions +func (s *XMPPServer) broadcastPresence(pres *Presence) { + s.mutex.RLock() + defer s.mutex.RUnlock() + + for jid, session := range s.sessions { + if jid != pres.From { + session.mutex.Lock() + if err := session.encoder.Encode(pres); err != nil { + app.Log("xmpp", "Failed to broadcast presence to %s: %v", jid, err) + } + session.mutex.Unlock() + } + } +} + +// generateStreamID creates a unique stream identifier +func generateStreamID() string { + return fmt.Sprintf("%d", time.Now().UnixNano()) +} + +// Stop gracefully shuts down the XMPP server +func (s *XMPPServer) Stop() error { + app.Log("xmpp", "Shutting down XMPP server") + s.cancel() + + if s.listener != nil { + return s.listener.Close() + } + + return nil +} + +// Global XMPP server instance +var xmppServer *XMPPServer + +// StartXMPPServer initializes and starts the XMPP server +func StartXMPPServer() error { + // Get configuration from environment + domain := os.Getenv("XMPP_DOMAIN") + if domain == "" { + domain = "localhost" // Default domain + } + + port := os.Getenv("XMPP_PORT") + if port == "" { + port = "5222" // Standard XMPP client-to-server port + } + + // Create and start server + xmppServer = NewXMPPServer(domain, port) + + // Start in goroutine + go func() { + if err := xmppServer.Start(); err != nil { + log.Printf("XMPP server error: %v", err) + } + }() + + return nil +} + +// StartXMPPServerIfEnabled starts the XMPP server if configured +func StartXMPPServerIfEnabled() bool { + // Check if XMPP is enabled + enabled := os.Getenv("XMPP_ENABLED") + if enabled == "" || enabled == "false" || enabled == "0" { + app.Log("xmpp", "XMPP server disabled (set XMPP_ENABLED=true to enable)") + return false + } + + if err := StartXMPPServer(); err != nil { + app.Log("xmpp", "Failed to start XMPP server: %v", err) + return false + } + + return true +} + +// GetXMPPStatus returns the XMPP server status for health checks +func GetXMPPStatus() map[string]interface{} { + status := map[string]interface{}{ + "enabled": false, + } + + if xmppServer != nil { + xmppServer.mutex.RLock() + sessionCount := len(xmppServer.sessions) + xmppServer.mutex.RUnlock() + + status["enabled"] = true + status["domain"] = xmppServer.Domain + status["port"] = xmppServer.Port + status["sessions"] = sessionCount + } + + return status +} diff --git a/chat/xmpp_test.go b/chat/xmpp_test.go new file mode 100644 index 0000000..d70b764 --- /dev/null +++ b/chat/xmpp_test.go @@ -0,0 +1,104 @@ +package chat + +import ( + "testing" + "time" +) + +// TestNewXMPPServer tests server initialization +func TestNewXMPPServer(t *testing.T) { + server := NewXMPPServer("test.example.com", "5222") + + if server == nil { + t.Fatal("Expected server to be created, got nil") + } + + if server.Domain != "test.example.com" { + t.Errorf("Expected domain 'test.example.com', got '%s'", server.Domain) + } + + if server.Port != "5222" { + t.Errorf("Expected port '5222', got '%s'", server.Port) + } + + if server.sessions == nil { + t.Error("Expected sessions map to be initialized") + } + + if len(server.sessions) != 0 { + t.Errorf("Expected 0 sessions initially, got %d", len(server.sessions)) + } +} + +// TestGenerateStreamID tests stream ID generation +func TestGenerateStreamID(t *testing.T) { + id1 := generateStreamID() + if id1 == "" { + t.Error("Expected non-empty stream ID") + } + + // Wait a bit to ensure different timestamp + time.Sleep(1 * time.Millisecond) + + id2 := generateStreamID() + if id2 == "" { + t.Error("Expected non-empty stream ID") + } + + if id1 == id2 { + t.Error("Expected different stream IDs for different calls") + } +} + +// TestGetXMPPStatus tests status retrieval +func TestGetXMPPStatus(t *testing.T) { + // Test when server is nil (not started) + status := GetXMPPStatus() + + if status["enabled"] != false { + t.Error("Expected enabled to be false when server is nil") + } + + // Create a server instance + xmppServer = NewXMPPServer("test.example.com", "5222") + + status = GetXMPPStatus() + + if status["enabled"] != true { + t.Error("Expected enabled to be true when server exists") + } + + if status["domain"] != "test.example.com" { + t.Errorf("Expected domain 'test.example.com', got '%v'", status["domain"]) + } + + if status["port"] != "5222" { + t.Errorf("Expected port '5222', got '%v'", status["port"]) + } + + if status["sessions"] != 0 { + t.Errorf("Expected 0 sessions, got '%v'", status["sessions"]) + } + + // Clean up + xmppServer = nil +} + +// TestXMPPServerStop tests graceful shutdown +func TestXMPPServerStop(t *testing.T) { + server := NewXMPPServer("test.example.com", "5222") + + // Stop should not error even if listener is nil + err := server.Stop() + if err != nil { + t.Errorf("Expected no error on stop with nil listener, got %v", err) + } + + // Check that context is cancelled + select { + case <-server.ctx.Done(): + // Context cancelled as expected + case <-time.After(100 * time.Millisecond): + t.Error("Expected context to be cancelled after Stop()") + } +} diff --git a/docs/ENVIRONMENT_VARIABLES.md b/docs/ENVIRONMENT_VARIABLES.md index be1a635..59705b4 100644 --- a/docs/ENVIRONMENT_VARIABLES.md +++ b/docs/ENVIRONMENT_VARIABLES.md @@ -60,6 +60,28 @@ export MAIL_SELECTOR="default" # Default: default - DKIM signing enables automatically if keys exist at `~/.mu/keys/dkim.key` - External email costs credits (SMTP delivery cost) +## XMPP Chat Configuration + +Mu includes an XMPP server for federated chat, similar to how SMTP enables federated email. + +```bash +# Enable XMPP server (disabled by default) +export XMPP_ENABLED="true" # Default: false + +# Domain for XMPP addresses (JIDs) +export XMPP_DOMAIN="chat.yourdomain.com" # Default: localhost + +# XMPP client-to-server port +export XMPP_PORT="5222" # Default: 5222 (standard XMPP port) +``` + +**Notes:** +- XMPP is disabled by default - set `XMPP_ENABLED=true` to enable +- Users can connect with any XMPP client (Conversations, Gajim, etc.) +- Provides federated chat like email federation via SMTP +- See [XMPP Chat documentation](XMPP_CHAT.md) for setup guide +- Requires DNS SRV records for federation + ## Payment Configuration (Optional) Enable donations to support your instance. All variables are optional - leave empty for a free instance. @@ -154,6 +176,9 @@ export MAIL_SELECTOR="default" | `MAIL_PORT` | `2525` | Port for messaging server (SMTP protocol, use 25 for production) | | `MAIL_DOMAIN` | `localhost` | Your domain for message addresses | | `MAIL_SELECTOR` | `default` | DKIM selector for DNS lookup | +| `XMPP_ENABLED` | `false` | Enable XMPP chat server | +| `XMPP_DOMAIN` | `localhost` | Domain for XMPP chat addresses (JIDs) | +| `XMPP_PORT` | `5222` | Port for XMPP client-to-server connections | | `DONATION_URL` | - | Payment link for one-time donations (optional) | | `SUPPORT_URL` | - | Community/support link like Discord (optional) | | `WALLET_SEED` | - | BIP39 mnemonic for HD wallet (auto-generated if not set) | @@ -292,6 +317,7 @@ docker run -d \ | Vector Search | Ollama with `nomic-embed-text` model (`MODEL_API_URL`) | | Video | `YOUTUBE_API_KEY` | | Messaging | `MAIL_PORT`, `MAIL_DOMAIN` (optional: `MAIL_SELECTOR` for DKIM) | +| XMPP Chat | `XMPP_ENABLED=true`, `XMPP_DOMAIN` (optional: `XMPP_PORT`) | | Donations | `DONATION_URL` (optional: `SUPPORT_URL`) | | Payments | `WALLET_SEED` or auto-generated in `~/.mu/keys/wallet.seed` | diff --git a/docs/XMPP_CHAT.md b/docs/XMPP_CHAT.md new file mode 100644 index 0000000..785e2a7 --- /dev/null +++ b/docs/XMPP_CHAT.md @@ -0,0 +1,207 @@ +# XMPP Chat Federation + +Mu includes an XMPP (Extensible Messaging and Presence Protocol) server that provides federated chat capabilities, similar to how SMTP provides federated email. + +## Overview + +Just like the mail system uses SMTP for decentralized email, Mu can use XMPP for decentralized chat. This provides: + +- **Federation**: Users can communicate across different Mu instances +- **Standard Protocol**: Compatible with existing XMPP clients (Conversations, Gajim, etc.) +- **Autonomy**: No reliance on centralized chat platforms +- **Privacy**: Self-hosted chat infrastructure + +## Configuration + +The XMPP server is disabled by default. To enable it, set the following environment variables: + +```bash +# Enable XMPP server +export XMPP_ENABLED=true + +# Set your domain (required for federation) +export XMPP_DOMAIN=chat.yourdomain.com + +# Set the port (optional, defaults to 5222) +export XMPP_PORT=5222 +``` + +## DNS Configuration + +For federation to work, you'll need to configure DNS SRV records: + +``` +_xmpp-client._tcp.yourdomain.com. 86400 IN SRV 5 0 5222 chat.yourdomain.com. +_xmpp-server._tcp.yourdomain.com. 86400 IN SRV 5 0 5269 chat.yourdomain.com. +``` + +## Usage + +### Connecting with XMPP Clients + +Users can connect to your Mu instance using any XMPP-compatible client: + +**Connection Details:** +- **Username**: Your Mu username +- **Domain**: Your XMPP_DOMAIN +- **Port**: 5222 (default) +- **JID Format**: username@yourdomain.com + +**Recommended Clients:** +- **Mobile**: Conversations (Android), Siskin (iOS) +- **Desktop**: Gajim (Linux/Windows), Beagle IM (macOS) +- **Web**: Converse.js + +### Authentication + +The XMPP server integrates with Mu's authentication system. Users authenticate with their Mu credentials using SASL PLAIN authentication. + +## Features + +### Current Implementation + +- **Client-to-Server (C2S)**: Users can connect and send/receive messages +- **Basic Authentication**: SASL PLAIN mechanism +- **Presence**: Online/offline status tracking +- **Resource Binding**: Multiple devices per user +- **Message Routing**: Local message delivery + +### Planned Features + +- **Server-to-Server (S2S)**: Federation with other XMPP servers +- **Message History**: Persistent chat storage (MAM - Message Archive Management) +- **Multi-User Chat (MUC)**: Group chat rooms +- **File Transfer**: Share files between users +- **End-to-End Encryption**: OMEMO support +- **Push Notifications**: Mobile push via XEP-0357 + +## Architecture + +The XMPP server follows the same pattern as the SMTP server: + +``` +chat/ +├── chat.go # Web-based chat interface +├── xmpp.go # XMPP server implementation +└── prompts.json # Chat prompts +``` + +Like SMTP, the XMPP server: +- Runs in a separate goroutine +- Listens on a dedicated port (5222 by default) +- Integrates with Mu's authentication system +- Provides autonomy and federation + +## Status Monitoring + +The XMPP server status is visible on the `/status` page: + +```json +{ + "services": [ + { + "name": "XMPP Server", + "status": true, + "details": "chat.yourdomain.com:5222 (3 sessions)" + } + ] +} +``` + +## Security Considerations + +### Current Implementation + +- SASL PLAIN authentication (credentials sent in plaintext) +- No TLS encryption yet + +### Production Recommendations + +1. **Use TLS**: Add STARTTLS support for encrypted connections +2. **Strong Authentication**: Implement SCRAM-SHA-256 in addition to PLAIN +3. **Rate Limiting**: Implement connection and message rate limits +4. **Spam Prevention**: Add anti-spam measures +5. **Monitoring**: Track failed authentication attempts + +## Comparison with SMTP + +| Feature | SMTP (Mail) | XMPP (Chat) | +|---------|-------------|-------------| +| Protocol | RFC 5321 | RFC 6120 | +| Port | 2525/587 | 5222 | +| Federation | Yes | Yes | +| Real-time | No | Yes | +| Offline Delivery | Yes | Planned | +| Encryption | DKIM/SPF | TLS/OMEMO | + +## Example Use Cases + +### 1. Self-Hosted Chat +Run your own chat server without depending on Discord, Slack, or WhatsApp. + +### 2. Federated Communities +Connect multiple Mu instances for a distributed community. + +### 3. Privacy-Focused Messaging +Chat with end-to-end encryption on your own infrastructure. + +### 4. Integration with Existing Tools +Use existing XMPP clients and bots with your Mu instance. + +## Troubleshooting + +### Server Won't Start + +Check logs for errors: +```bash +# Look for XMPP server logs +mu --serve | grep xmpp +``` + +Common issues: +- Port 5222 already in use +- Incorrect XMPP_DOMAIN configuration +- Missing permissions to bind to port + +### Can't Connect from Client + +Verify configuration: +1. Check XMPP_ENABLED is set to true +2. Verify XMPP_DOMAIN matches your setup +3. Ensure port 5222 is accessible (firewall rules) +4. Check DNS SRV records are configured + +### Messages Not Delivering + +- Ensure both users are connected +- Check server logs for routing errors +- Verify JID format (user@domain/resource) + +## Future Development + +The XMPP implementation is currently minimal, providing basic chat functionality. Future enhancements include: + +1. **Complete S2S Implementation**: Full federation with other XMPP servers +2. **XEP Compliance**: Implement more XMPP Extension Protocols +3. **Message Archive Management**: Persistent message history +4. **Group Chat**: Multi-user chat rooms (MUC) +5. **Modern Features**: Reactions, typing indicators, read receipts +6. **Mobile Support**: Push notifications for offline users + +## References + +- [RFC 6120](https://tools.ietf.org/html/rfc6120) - XMPP Core +- [RFC 6121](https://tools.ietf.org/html/rfc6121) - XMPP Instant Messaging +- [XMPP Standards Foundation](https://xmpp.org/) +- [XEPs](https://xmpp.org/extensions/) - XMPP Extension Protocols + +## Contributing + +The XMPP implementation is a work in progress. Contributions welcome for: +- S2S federation +- Additional XEP implementations +- TLS/STARTTLS support +- Enhanced authentication mechanisms +- Testing and documentation + +See [CONTRIBUTING.md](../CONTRIBUTING.md) for guidelines. diff --git a/main.go b/main.go index 90afc14..79d45cd 100644 --- a/main.go +++ b/main.go @@ -185,6 +185,7 @@ func main() { // status page - public health check app.DKIMStatusFunc = mail.DKIMStatus + app.XMPPStatusFunc = chat.GetXMPPStatus http.HandleFunc("/status", app.StatusHandler) // documentation @@ -340,6 +341,9 @@ func main() { // Start SMTP server if enabled (disabled by default) mail.StartSMTPServerIfEnabled() + // Start XMPP server if enabled (disabled by default) + chat.StartXMPPServerIfEnabled() + // Log initial memory usage var m runtime.MemStats runtime.ReadMemStats(&m)