From b9d0e18b74f786b5b5cea11d54aeeed3e65009ed Mon Sep 17 00:00:00 2001 From: Wang Hanbin Date: Fri, 6 Mar 2026 09:06:32 +0800 Subject: [PATCH 1/2] feat(channels): add DingTalk Stream mode adapter Adds a WebSocket-based DingTalk Stream channel adapter as an alternative to the existing webhook-based DingTalk adapter. DingTalk Stream Mode uses a long-lived WebSocket connection to the DingTalk Gateway, eliminating the need for a public webhook endpoint. Changes: - `openfang-types`: add `DingTalkStreamConfig` struct and wire into `ChannelsConfig` alongside the existing `DingTalkConfig` - `openfang-channels`: implement `DingTalkStreamAdapter` (WebSocket connection management, ping/pong, token refresh, send via batchSend API) - `openfang-api`: register `dingtalk_stream` in the channel registry, `is_channel_configured`, and `channel_config_values` - `openfang-api`: wire adapter startup in `channel_bridge.rs` - `openfang-cli`: add `dingtalk_stream` entry to the TUI channels list Configuration: ```toml [channels.dingtalk_stream] app_key_env = "DINGTALK_APP_KEY" # Enterprise Internal App Key app_secret_env = "DINGTALK_APP_SECRET" # Enterprise Internal App Secret robot_code_env = "DINGTALK_ROBOT_CODE" # optional, defaults to app_key ``` Requires an Enterprise Internal App in the DingTalk Open Platform with Stream Mode enabled. No public endpoint needed. Made-with: Cursor --- crates/openfang-api/src/channel_bridge.rs | 17 +- crates/openfang-api/src/routes.rs | 17 + .../openfang-channels/src/dingtalk_stream.rs | 607 ++++++++++++++++++ crates/openfang-channels/src/lib.rs | 1 + .../openfang-cli/src/tui/screens/channels.rs | 9 +- crates/openfang-types/src/config.rs | 57 +- 6 files changed, 705 insertions(+), 3 deletions(-) create mode 100644 crates/openfang-channels/src/dingtalk_stream.rs diff --git a/crates/openfang-api/src/channel_bridge.rs b/crates/openfang-api/src/channel_bridge.rs index b72180670..32d8be8cb 100644 --- a/crates/openfang-api/src/channel_bridge.rs +++ b/crates/openfang-api/src/channel_bridge.rs @@ -43,6 +43,7 @@ use openfang_channels::webex::WebexAdapter; // Wave 5 use async_trait::async_trait; use openfang_channels::dingtalk::DingTalkAdapter; +use openfang_channels::dingtalk_stream::DingTalkStreamAdapter; use openfang_channels::discourse::DiscourseAdapter; use openfang_channels::gitter::GitterAdapter; use openfang_channels::gotify::GotifyAdapter; @@ -727,6 +728,7 @@ impl ChannelBridgeHandle for KernelBridgeAdapter { // Wave 5 "mumble" => channels.mumble.as_ref().map(|c| c.overrides.clone()), "dingtalk" => channels.dingtalk.as_ref().map(|c| c.overrides.clone()), + "dingtalk_stream" => channels.dingtalk_stream.as_ref().map(|c| c.overrides.clone()), "discourse" => channels.discourse.as_ref().map(|c| c.overrides.clone()), "gitter" => channels.gitter.as_ref().map(|c| c.overrides.clone()), "ntfy" => channels.ntfy.as_ref().map(|c| c.overrides.clone()), @@ -1003,6 +1005,7 @@ pub async fn start_channel_bridge_with_config( // Wave 5 || config.mumble.is_some() || config.dingtalk.is_some() + || config.dingtalk_stream.is_some() || config.discourse.is_some() || config.gitter.is_some() || config.ntfy.is_some() @@ -1436,7 +1439,7 @@ pub async fn start_channel_bridge_with_config( } } - // DingTalk + // DingTalk (webhook mode) if let Some(ref dt_config) = config.dingtalk { if let Some(token) = read_token(&dt_config.access_token_env, "DingTalk") { let secret = read_token(&dt_config.secret_env, "DingTalk (secret)").unwrap_or_default(); @@ -1445,6 +1448,18 @@ pub async fn start_channel_bridge_with_config( } } + // DingTalk (stream mode) + if let Some(ref ds_config) = config.dingtalk_stream { + if let Some(app_key) = read_token(&ds_config.app_key_env, "DingTalk Stream (app_key)") { + if let Some(app_secret) = read_token(&ds_config.app_secret_env, "DingTalk Stream (app_secret)") { + let robot_code = read_token(&ds_config.robot_code_env, "DingTalk Stream (robot_code)") + .unwrap_or_else(|| app_key.clone()); + let adapter = Arc::new(DingTalkStreamAdapter::new(app_key, app_secret, robot_code)); + adapters.push((adapter, ds_config.default_agent.clone())); + } + } + } + // Discourse if let Some(ref dc_config) = config.discourse { if let Some(api_key) = read_token(&dc_config.api_key_env, "Discourse") { diff --git a/crates/openfang-api/src/routes.rs b/crates/openfang-api/src/routes.rs index 89353f39d..bea31ce8f 100644 --- a/crates/openfang-api/src/routes.rs +++ b/crates/openfang-api/src/routes.rs @@ -1515,6 +1515,21 @@ const CHANNEL_REGISTRY: &[ChannelMeta] = &[ setup_steps: &["Create a robot in your DingTalk group", "Copy the token and signing secret", "Paste them below"], config_template: "[channels.dingtalk]\naccess_token_env = \"DINGTALK_ACCESS_TOKEN\"\nsecret_env = \"DINGTALK_SECRET\"", }, + ChannelMeta { + name: "dingtalk_stream", display_name: "DingTalk Stream", icon: "DS", + description: "DingTalk Stream Mode (WebSocket long-connection)", + category: "enterprise", difficulty: "Easy", setup_time: "~5 min", + quick_setup: "Create an Enterprise Internal App with Stream Mode enabled", + setup_type: "form", + fields: &[ + ChannelField { key: "app_key_env", label: "App Key", field_type: FieldType::Secret, env_var: Some("DINGTALK_APP_KEY"), required: true, placeholder: "ding...", advanced: false }, + ChannelField { key: "app_secret_env", label: "App Secret", field_type: FieldType::Secret, env_var: Some("DINGTALK_APP_SECRET"), required: true, placeholder: "uAn4...", advanced: false }, + ChannelField { key: "robot_code_env", label: "Robot Code", field_type: FieldType::Text, env_var: Some("DINGTALK_ROBOT_CODE"), required: false, placeholder: "ding... (same as App Key)", advanced: true }, + ChannelField { key: "default_agent", label: "Default Agent", field_type: FieldType::Text, env_var: None, required: false, placeholder: "assistant", advanced: true }, + ], + setup_steps: &["Create an Enterprise Internal App in DingTalk Open Platform", "Enable Stream Mode in the app settings", "Add robot capability and configure permissions", "Copy App Key and App Secret below"], + config_template: "[channels.dingtalk_stream]\napp_key_env = \"DINGTALK_APP_KEY\"\napp_secret_env = \"DINGTALK_APP_SECRET\"", + }, ChannelMeta { name: "pumble", display_name: "Pumble", icon: "PB", description: "Pumble bot adapter", @@ -1804,6 +1819,7 @@ fn is_channel_configured(config: &openfang_types::config::ChannelsConfig, name: "webex" => config.webex.is_some(), "feishu" => config.feishu.is_some(), "dingtalk" => config.dingtalk.is_some(), + "dingtalk_stream" => config.dingtalk_stream.is_some(), "pumble" => config.pumble.is_some(), "flock" => config.flock.is_some(), "twist" => config.twist.is_some(), @@ -1926,6 +1942,7 @@ fn channel_config_values( "twist" => config.twist.as_ref().and_then(|c| serde_json::to_value(c).ok()), "mumble" => config.mumble.as_ref().and_then(|c| serde_json::to_value(c).ok()), "dingtalk" => config.dingtalk.as_ref().and_then(|c| serde_json::to_value(c).ok()), + "dingtalk_stream" => config.dingtalk_stream.as_ref().and_then(|c| serde_json::to_value(c).ok()), "discourse" => config.discourse.as_ref().and_then(|c| serde_json::to_value(c).ok()), "gitter" => config.gitter.as_ref().and_then(|c| serde_json::to_value(c).ok()), "ntfy" => config.ntfy.as_ref().and_then(|c| serde_json::to_value(c).ok()), diff --git a/crates/openfang-channels/src/dingtalk_stream.rs b/crates/openfang-channels/src/dingtalk_stream.rs new file mode 100644 index 000000000..c8abf114e --- /dev/null +++ b/crates/openfang-channels/src/dingtalk_stream.rs @@ -0,0 +1,607 @@ +//! DingTalk Stream channel adapter. +//! +//! Uses DingTalk Stream Mode (WebSocket long-connection) instead of the +//! legacy webhook approach. The webhook adapter in `dingtalk.rs` is preserved +//! for backwards compatibility. +//! +//! Protocol: +//! 1. POST /v1.0/oauth2/accessToken → get access token +//! 2. POST /v1.0/gateway/connections/open → get WebSocket URL +//! 3. Connect via WebSocket, handle ping/pong and EVENT messages +//! 4. Outbound: POST /v1.0/robot/oToMessages/batchSend + +use crate::types::{ + split_message, ChannelAdapter, ChannelContent, ChannelMessage, ChannelType, ChannelUser, +}; +use async_trait::async_trait; +use chrono::Utc; +use futures::{SinkExt, Stream, StreamExt}; +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; +use std::pin::Pin; +use std::sync::{Arc, Mutex}; +use std::time::Duration; +use tokio::sync::{mpsc, watch}; +use tokio_tungstenite::{connect_async, tungstenite::Message}; +use tracing::{error, info, warn}; + +const API_BASE: &str = "https://api.dingtalk.com"; +const MAX_MESSAGE_LEN: usize = 20000; + +// ─── Adapter ───────────────────────────────────────────────────────────────── + +pub struct DingTalkStreamAdapter { + app_key: String, + app_secret: String, + robot_code: String, + client: reqwest::Client, + token_cache: Arc>, + shutdown_tx: Arc>, + shutdown_rx: watch::Receiver, +} + +impl DingTalkStreamAdapter { + pub fn new(app_key: String, app_secret: String, robot_code: String) -> Self { + let (shutdown_tx, shutdown_rx) = watch::channel(false); + Self { + app_key, + app_secret, + robot_code, + client: reqwest::Client::new(), + token_cache: Arc::new(Mutex::new(TokenCache::default())), + shutdown_tx: Arc::new(shutdown_tx), + shutdown_rx, + } + } + + async fn get_token(&self) -> Result> { + let now = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH)? + .as_secs(); + { + let c = self.token_cache.lock().unwrap(); + if !c.token.is_empty() && c.expire_at > now + 300 { + return Ok(c.token.clone()); + } + } + + let resp: serde_json::Value = self + .client + .post(format!("{API_BASE}/v1.0/oauth2/accessToken")) + .json(&serde_json::json!({ + "appKey": self.app_key, + "appSecret": self.app_secret, + })) + .send() + .await? + .error_for_status()? + .json() + .await?; + + let token = resp["accessToken"] + .as_str() + .ok_or("missing accessToken")? + .to_string(); + let expire_in = resp["expireIn"].as_u64().unwrap_or(7200); + + { + let mut c = self.token_cache.lock().unwrap(); + c.token = token.clone(); + c.expire_at = now + expire_in; + } + Ok(token) + } + + async fn send_to_ids( + &self, + user_ids: &[&str], + content: ChannelContent, + ) -> Result<(), Box> { + let token = self.get_token().await.map_err(|e| -> Box { e })?; + + let (msg_key, msg_param) = match &content { + ChannelContent::Text(t) => ( + "sampleText", + serde_json::json!({ "content": t }).to_string(), + ), + _ => ( + "sampleText", + serde_json::json!({ "content": "(unsupported content type)" }).to_string(), + ), + }; + + let text = match &content { + ChannelContent::Text(t) => t.as_str(), + _ => "(unsupported)", + }; + let chunks = split_message(text, MAX_MESSAGE_LEN); + + for chunk in &chunks { + let param = serde_json::json!({ "content": chunk }).to_string(); + let body = serde_json::json!({ + "robotCode": self.robot_code, + "userIds": user_ids, + "msgKey": msg_key, + "msgParam": param, + }); + + let resp = self + .client + .post(format!("{API_BASE}/v1.0/robot/oToMessages/batchSend")) + .header("x-acs-dingtalk-access-token", &token) + .json(&body) + .send() + .await?; + + if !resp.status().is_success() { + let status = resp.status(); + let err_body = resp.text().await.unwrap_or_default(); + return Err(format!("DingTalk batchSend error {status}: {err_body}").into()); + } + + if chunks.len() > 1 { + tokio::time::sleep(Duration::from_millis(200)).await; + } + } + Ok(()) + } +} + +#[async_trait] +impl ChannelAdapter for DingTalkStreamAdapter { + fn name(&self) -> &str { + "dingtalk_stream" + } + + fn channel_type(&self) -> ChannelType { + ChannelType::Custom("dingtalk_stream".to_string()) + } + + async fn start( + &self, + ) -> Result + Send>>, Box> + { + let (tx, rx) = mpsc::channel::(256); + let app_key = self.app_key.clone(); + let app_secret = self.app_secret.clone(); + let client = self.client.clone(); + let token_cache = Arc::clone(&self.token_cache); + let mut shutdown_rx = self.shutdown_rx.clone(); + + info!("DingTalk Stream adapter starting WebSocket connection"); + + tokio::spawn(async move { + let mut attempt: u32 = 0; + + loop { + if *shutdown_rx.borrow() { + info!("DingTalk Stream: shutdown requested"); + break; + } + + // 1. Get access token + let token = match get_access_token(&client, &app_key, &app_secret, &token_cache) + .await + { + Ok(t) => t, + Err(e) => { + warn!("DingTalk Stream: token fetch failed: {e}"); + attempt += 1; + tokio::time::sleep(backoff(attempt)).await; + continue; + } + }; + + // 2. Get WebSocket endpoint + let ws_url = + match get_ws_endpoint(&client, &app_key, &app_secret, &token).await { + Ok(u) => u, + Err(e) => { + warn!("DingTalk Stream: endpoint fetch failed: {e}"); + attempt += 1; + tokio::time::sleep(backoff(attempt)).await; + continue; + } + }; + + info!( + "DingTalk Stream: connecting to {}...", + &ws_url[..ws_url.len().min(60)] + ); + + // 3. Connect + let ws_stream = match connect_async(&ws_url).await { + Ok((ws, _)) => ws, + Err(e) => { + warn!("DingTalk Stream: WS connect failed: {e}"); + attempt += 1; + tokio::time::sleep(backoff(attempt)).await; + continue; + } + }; + + info!("DingTalk Stream: connected"); + attempt = 0; + let (mut sink, mut source) = ws_stream.split(); + + // 4. Message loop + loop { + tokio::select! { + _ = shutdown_rx.changed() => { + if *shutdown_rx.borrow() { + info!("DingTalk Stream: graceful shutdown"); + return; + } + } + msg = source.next() => { + match msg { + None => { warn!("DingTalk Stream: connection closed"); break; } + Some(Err(e)) => { warn!("DingTalk Stream: WS error: {e}"); break; } + Some(Ok(Message::Text(text))) => { + handle_frame(&text, &mut sink, &tx).await; + } + Some(Ok(Message::Ping(d))) => { let _ = sink.send(Message::Pong(d)).await; } + Some(Ok(Message::Close(_))) => { info!("DingTalk Stream: close frame"); break; } + _ => {} + } + } + } + } + + // Reconnect + attempt += 1; + let delay = backoff(attempt); + info!("DingTalk Stream: reconnecting in {delay:?}"); + tokio::time::sleep(delay).await; + } + }); + + Ok(Box::pin(tokio_stream::wrappers::ReceiverStream::new(rx))) + } + + async fn send( + &self, + user: &ChannelUser, + content: ChannelContent, + ) -> Result<(), Box> { + let uid = &user.platform_id; + if uid.is_empty() { + return Err("DingTalk Stream: no platform_id to reply to".into()); + } + self.send_to_ids(&[uid.as_str()], content).await + } + + async fn send_typing(&self, _user: &ChannelUser) -> Result<(), Box> { + Ok(()) + } + + async fn stop(&self) -> Result<(), Box> { + let _ = self.shutdown_tx.send(true); + Ok(()) + } +} + +// ─── Token helpers ─────────────────────────────────────────────────────────── + +struct TokenCache { + token: String, + expire_at: u64, +} + +impl Default for TokenCache { + fn default() -> Self { + Self { + token: String::new(), + expire_at: 0, + } + } +} + +async fn get_access_token( + http: &reqwest::Client, + app_key: &str, + app_secret: &str, + cache: &Arc>, +) -> Result> { + let now = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH)? + .as_secs(); + { + let c = cache.lock().unwrap(); + if !c.token.is_empty() && c.expire_at > now + 300 { + return Ok(c.token.clone()); + } + } + + let resp: serde_json::Value = http + .post(format!("{API_BASE}/v1.0/oauth2/accessToken")) + .json(&serde_json::json!({ "appKey": app_key, "appSecret": app_secret })) + .send() + .await? + .error_for_status()? + .json() + .await?; + + let token = resp["accessToken"] + .as_str() + .ok_or("missing accessToken")? + .to_string(); + let expire_in = resp["expireIn"].as_u64().unwrap_or(7200); + { + let mut c = cache.lock().unwrap(); + c.token = token.clone(); + c.expire_at = now + expire_in; + } + Ok(token) +} + +// ─── Gateway / WebSocket helpers ───────────────────────────────────────────── + +#[derive(Serialize)] +struct OpenConnectionRequest<'a> { + #[serde(rename = "clientId")] + client_id: &'a str, + #[serde(rename = "clientSecret")] + client_secret: &'a str, + subscriptions: Vec, + ua: &'a str, + #[serde(rename = "localIp")] + local_ip: &'a str, +} + +#[derive(Serialize)] +struct SubItem { + #[serde(rename = "type")] + sub_type: String, + topic: String, +} + +#[derive(Deserialize)] +struct OpenConnectionResponse { + endpoint: String, + ticket: String, +} + +async fn get_ws_endpoint( + http: &reqwest::Client, + app_key: &str, + app_secret: &str, + token: &str, +) -> Result> { + let body = OpenConnectionRequest { + client_id: app_key, + client_secret: app_secret, + subscriptions: vec![SubItem { + sub_type: "CALLBACK".to_string(), + topic: "/v1.0/im/bot/messages/get".to_string(), + }], + ua: "openfang/0.3", + local_ip: "", + }; + let resp: OpenConnectionResponse = http + .post(format!("{API_BASE}/v1.0/gateway/connections/open")) + .header("x-acs-dingtalk-access-token", token) + .json(&body) + .send() + .await? + .error_for_status()? + .json() + .await?; + let sep = if resp.endpoint.contains('?') { "&" } else { "?" }; + Ok(format!("{}{}ticket={}", resp.endpoint, sep, resp.ticket)) +} + +// ─── Frame handling ────────────────────────────────────────────────────────── + +#[derive(Deserialize)] +struct ProtoFrame { + #[serde(rename = "type")] + msg_type: String, + headers: ProtoHeaders, + #[serde(default)] + data: serde_json::Value, +} + +#[derive(Deserialize)] +struct ProtoHeaders { + #[serde(rename = "messageId", default)] + message_id: String, + #[serde(default)] + topic: String, +} + +#[derive(Serialize)] +struct AckReply { + code: u32, + headers: AckHeaders, + message: String, + data: String, +} + +#[derive(Serialize)] +struct AckHeaders { + #[serde(rename = "contentType")] + content_type: String, + #[serde(rename = "messageId")] + message_id: String, + topic: String, +} + +fn make_ack(message_id: &str, topic: &str) -> String { + serde_json::to_string(&AckReply { + code: 200, + headers: AckHeaders { + content_type: "application/json".to_string(), + message_id: message_id.to_string(), + topic: topic.to_string(), + }, + message: "OK".to_string(), + data: String::new(), + }) + .unwrap_or_default() +} + +#[derive(Deserialize)] +struct CallbackPayload { + #[serde(rename = "msgtype", default)] + msg_type: String, + #[serde(default)] + text: Option, + #[serde(rename = "senderStaffId", default)] + sender_staff_id: String, + #[serde(rename = "senderId", default)] + sender_id: String, + #[serde(rename = "senderNick", default)] + sender_nick: String, + #[serde(rename = "conversationId", default)] + conversation_id: String, + #[serde(rename = "conversationType", default)] + conversation_type: String, + #[serde(rename = "messageId", default)] + message_id: String, +} + +#[derive(Deserialize)] +struct TextContent { + content: String, +} + +async fn handle_frame(text: &str, sink: &mut S, tx: &mpsc::Sender) +where + S: SinkExt + Unpin, + >::Error: std::fmt::Display, +{ + let frame: ProtoFrame = match serde_json::from_str(text) { + Ok(f) => f, + Err(e) => { + warn!("DingTalk Stream: bad frame: {e}"); + return; + } + }; + + let mid = &frame.headers.message_id; + let topic = &frame.headers.topic; + + match frame.msg_type.as_str() { + "SYSTEM" if topic == "ping" => { + let _ = sink.send(Message::Text(make_ack(mid, "pong"))).await; + } + "CALLBACK" | "EVENT" => { + let data_str = frame.data.to_string(); + // Try direct parse, then try unwrapping double-encoded string + let cb: Option = serde_json::from_str(&data_str) + .ok() + .or_else(|| { + serde_json::from_str::(&data_str) + .ok() + .and_then(|s| serde_json::from_str(&s).ok()) + }); + + if let Some(cb) = cb { + if cb.msg_type == "text" { + if let Some(ref tc) = cb.text { + let trimmed = tc.content.trim().to_string(); + if !trimmed.is_empty() { + let content = if trimmed.starts_with('/') { + let parts: Vec<&str> = trimmed.splitn(2, ' ').collect(); + let cmd = parts[0].trim_start_matches('/'); + let args: Vec = parts + .get(1) + .map(|a| { + a.split_whitespace().map(String::from).collect() + }) + .unwrap_or_default(); + ChannelContent::Command { + name: cmd.to_string(), + args, + } + } else { + ChannelContent::Text(trimmed) + }; + + let mut meta = HashMap::new(); + meta.insert( + "conversation_id".to_string(), + serde_json::Value::String(cb.conversation_id), + ); + + let uid = if cb.sender_staff_id.is_empty() { + cb.sender_id + } else { + cb.sender_staff_id + }; + + let msg = ChannelMessage { + channel: ChannelType::Custom("dingtalk_stream".to_string()), + platform_message_id: cb.message_id, + sender: ChannelUser { + platform_id: uid, + display_name: cb.sender_nick, + openfang_user: None, + }, + content, + target_agent: None, + timestamp: Utc::now(), + is_group: cb.conversation_type == "2", + thread_id: None, + metadata: meta, + }; + + if tx.send(msg).await.is_err() { + error!("DingTalk Stream: channel receiver dropped"); + } + } + } + } + } + + let _ = sink.send(Message::Text(make_ack(mid, topic))).await; + } + _ => { + let _ = sink.send(Message::Text(make_ack(mid, topic))).await; + } + } +} + +fn backoff(attempt: u32) -> Duration { + let ms = (1000u64 * 2u64.saturating_pow(attempt.min(6))).min(60_000); + Duration::from_millis(ms) +} + +// ─── Tests ─────────────────────────────────────────────────────────────────── + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn adapter_creation() { + let a = DingTalkStreamAdapter::new("k".into(), "s".into(), "r".into()); + assert_eq!(a.name(), "dingtalk_stream"); + assert_eq!( + a.channel_type(), + ChannelType::Custom("dingtalk_stream".to_string()) + ); + } + + #[test] + fn backoff_doubles() { + assert_eq!(backoff(0), Duration::from_millis(1000)); + assert_eq!(backoff(1), Duration::from_millis(2000)); + assert_eq!(backoff(2), Duration::from_millis(4000)); + } + + #[test] + fn backoff_capped() { + assert_eq!(backoff(10), Duration::from_millis(60_000)); + assert_eq!(backoff(20), Duration::from_millis(60_000)); + } + + #[test] + fn make_ack_valid_json() { + let ack = make_ack("msg1", "topic1"); + let v: serde_json::Value = serde_json::from_str(&ack).unwrap(); + assert_eq!(v["code"], 200); + assert_eq!(v["headers"]["messageId"], "msg1"); + } +} diff --git a/crates/openfang-channels/src/lib.rs b/crates/openfang-channels/src/lib.rs index 978c202e7..97db72e1a 100644 --- a/crates/openfang-channels/src/lib.rs +++ b/crates/openfang-channels/src/lib.rs @@ -43,6 +43,7 @@ pub mod twist; pub mod webex; // Wave 5 — Niche & differentiating channels pub mod dingtalk; +pub mod dingtalk_stream; pub mod discourse; pub mod gitter; pub mod gotify; diff --git a/crates/openfang-cli/src/tui/screens/channels.rs b/crates/openfang-cli/src/tui/screens/channels.rs index 8bcf0508a..aa51b432e 100644 --- a/crates/openfang-cli/src/tui/screens/channels.rs +++ b/crates/openfang-cli/src/tui/screens/channels.rs @@ -200,7 +200,14 @@ const CHANNEL_DEFS: &[ChannelDef] = &[ display_name: "DingTalk", category: "Enterprise", env_vars: &["DINGTALK_ACCESS_TOKEN", "DINGTALK_SECRET"], - description: "DingTalk Robot API adapter", + description: "DingTalk Robot API adapter (webhook mode)", + }, + ChannelDef { + name: "dingtalk_stream", + display_name: "DingTalk Stream", + category: "Enterprise", + env_vars: &["DINGTALK_APP_KEY", "DINGTALK_APP_SECRET", "DINGTALK_ROBOT_CODE"], + description: "DingTalk Stream Mode (WebSocket long-connection)", }, ChannelDef { name: "pumble", diff --git a/crates/openfang-types/src/config.rs b/crates/openfang-types/src/config.rs index 617c09bfc..0bd5b061f 100644 --- a/crates/openfang-types/src/config.rs +++ b/crates/openfang-types/src/config.rs @@ -1507,8 +1507,10 @@ pub struct ChannelsConfig { // Wave 5 — Niche & differentiating channels /// Mumble text chat configuration (None = disabled). pub mumble: Option, - /// DingTalk robot configuration (None = disabled). + /// DingTalk robot configuration — webhook mode (None = disabled). pub dingtalk: Option, + /// DingTalk Stream mode — long-lived WebSocket (None = disabled). + pub dingtalk_stream: Option, /// Discourse forum configuration (None = disabled). pub discourse: Option, /// Gitter streaming configuration (None = disabled). @@ -2566,6 +2568,39 @@ impl Default for DingTalkConfig { } } +/// DingTalk Stream channel adapter configuration. +/// +/// Uses the DingTalk Stream Mode (WebSocket long-connection) instead of +/// the legacy webhook approach. Requires an Enterprise Internal App with +/// Stream Mode enabled in the DingTalk Open Platform console. +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(default)] +pub struct DingTalkStreamConfig { + /// Env var holding the App Key (client_id). + pub app_key_env: String, + /// Env var holding the App Secret (client_secret). + pub app_secret_env: String, + /// Robot code for outbound batchSend (often same as app_key). + pub robot_code_env: String, + /// Default agent name to route messages to. + pub default_agent: Option, + /// Per-channel behavior overrides. + #[serde(default)] + pub overrides: ChannelOverrides, +} + +impl Default for DingTalkStreamConfig { + fn default() -> Self { + Self { + app_key_env: "DINGTALK_APP_KEY".to_string(), + app_secret_env: "DINGTALK_APP_SECRET".to_string(), + robot_code_env: "DINGTALK_ROBOT_CODE".to_string(), + default_agent: None, + overrides: ChannelOverrides::default(), + } + } +} + /// Discourse forum channel adapter configuration. #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(default)] @@ -3081,6 +3116,26 @@ impl KernelConfig { )); } } + if let Some(ref ds) = self.channels.dingtalk_stream { + if std::env::var(&ds.app_key_env) + .unwrap_or_default() + .is_empty() + { + warnings.push(format!( + "DingTalk Stream configured but {} is not set", + ds.app_key_env + )); + } + if std::env::var(&ds.app_secret_env) + .unwrap_or_default() + .is_empty() + { + warnings.push(format!( + "DingTalk Stream configured but {} is not set", + ds.app_secret_env + )); + } + } if let Some(ref dc) = self.channels.discourse { if std::env::var(&dc.api_key_env) .unwrap_or_default() From 9846ddf3474bac7a36b3fbbb220dacd3317e969c Mon Sep 17 00:00:00 2001 From: Wang Hanbin Date: Fri, 6 Mar 2026 14:32:11 +0800 Subject: [PATCH 2/2] fix(api): correct ToolUse/ToolResult correlation for session display MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ToolUse and ToolResult live in separate messages (Assistant then User) per Anthropic protocol. The old code tried to attach results to tools in the same message, so ToolResult was silently discarded. Two-pass approach: build tool_use_id→(msg_idx,tool_idx) map, then when processing ToolResult blocks, attach to the correct ToolUse in a previous message. Also adds 'input' field and increases result preview to 2000 chars. Made-with: Cursor --- crates/openfang-api/src/routes.rs | 108 ++++++++++++++++++------------ 1 file changed, 66 insertions(+), 42 deletions(-) diff --git a/crates/openfang-api/src/routes.rs b/crates/openfang-api/src/routes.rs index bea31ce8f..ae8847f4f 100644 --- a/crates/openfang-api/src/routes.rs +++ b/crates/openfang-api/src/routes.rs @@ -390,60 +390,84 @@ pub async fn get_agent_session( match state.kernel.memory.get_session(entry.session_id) { Ok(Some(session)) => { - let messages: Vec = session - .messages - .iter() - .filter_map(|m| { - let mut tools: Vec = Vec::new(); - let content = match &m.content { - openfang_types::message::MessageContent::Text(t) => t.clone(), - openfang_types::message::MessageContent::Blocks(blocks) => { - // Extract human-readable text and tool info from blocks - let mut texts = Vec::new(); - for b in blocks { - match b { - openfang_types::message::ContentBlock::Text { text } => { - texts.push(text.clone()); - } - openfang_types::message::ContentBlock::Image { .. } => { - texts.push("[Image]".to_string()); - } - openfang_types::message::ContentBlock::ToolUse { - name, .. - } => { - tools.push(serde_json::json!({ - "name": name, - "running": false, - "expanded": false, - })); - } - openfang_types::message::ContentBlock::ToolResult { - content: result, - is_error, - .. - } => { - // Attach result to the most recent tool without a result - if let Some(last_tool) = tools.last_mut() { + // ToolUse and ToolResult live in separate messages (Assistant then User). + // Two-pass: first build per-message tools/content, then attach results + // back to the correct ToolUse using the stable tool_use_id. + let mut tool_use_map: std::collections::HashMap = + std::collections::HashMap::new(); + let mut all_roles: Vec = Vec::new(); + let mut all_contents: Vec = Vec::new(); + let mut all_tools: Vec> = Vec::new(); + + for m in &session.messages { + let mut tools: Vec = Vec::new(); + let content = match &m.content { + openfang_types::message::MessageContent::Text(t) => t.clone(), + openfang_types::message::MessageContent::Blocks(blocks) => { + let mut texts = Vec::new(); + for b in blocks { + match b { + openfang_types::message::ContentBlock::Text { text } => { + texts.push(text.clone()); + } + openfang_types::message::ContentBlock::Image { .. } => { + texts.push("[Image]".to_string()); + } + openfang_types::message::ContentBlock::ToolUse { + id, + name, + input, + } => { + let msg_idx = all_tools.len(); + let tool_idx = tools.len(); + tool_use_map.insert(id.clone(), (msg_idx, tool_idx)); + tools.push(serde_json::json!({ + "name": name, + "input": serde_json::to_string(input).unwrap_or_default(), + "running": false, + "expanded": false, + })); + } + openfang_types::message::ContentBlock::ToolResult { + tool_use_id, + content: result, + is_error, + .. + } => { + // Attach result to the ToolUse in a previous message + if let Some(&(mi, ti)) = tool_use_map.get(tool_use_id) { + if mi < all_tools.len() { let preview: String = - result.chars().take(300).collect(); - last_tool["result"] = + result.chars().take(2000).collect(); + all_tools[mi][ti]["result"] = serde_json::Value::String(preview); - last_tool["is_error"] = + all_tools[mi][ti]["is_error"] = serde_json::Value::Bool(*is_error); } } - _ => {} } + _ => {} } - texts.join("\n") } - }; - // Skip messages that are purely tool results (User role with only ToolResult blocks) + texts.join("\n") + } + }; + all_roles.push(format!("{:?}", m.role)); + all_contents.push(content); + all_tools.push(tools); + } + + let messages: Vec = all_roles + .into_iter() + .zip(all_contents) + .zip(all_tools) + .filter_map(|((role, content), tools)| { + // Skip User messages that consist entirely of ToolResult blocks if content.is_empty() && tools.is_empty() { return None; } let mut msg = serde_json::json!({ - "role": format!("{:?}", m.role), + "role": role, "content": content, }); if !tools.is_empty() {