Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion crates/openfang-api/src/channel_bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ use openfang_channels::webex::WebexAdapter;
// Wave 5
use async_trait::async_trait;
use openfang_channels::dingtalk::DingTalkAdapter;
use openfang_channels::dingtalk_stream::DingTalkStreamAdapter;
use openfang_channels::discourse::DiscourseAdapter;
use openfang_channels::gitter::GitterAdapter;
use openfang_channels::gotify::GotifyAdapter;
Expand Down Expand Up @@ -727,6 +728,7 @@ impl ChannelBridgeHandle for KernelBridgeAdapter {
// Wave 5
"mumble" => channels.mumble.as_ref().map(|c| c.overrides.clone()),
"dingtalk" => channels.dingtalk.as_ref().map(|c| c.overrides.clone()),
"dingtalk_stream" => channels.dingtalk_stream.as_ref().map(|c| c.overrides.clone()),
"discourse" => channels.discourse.as_ref().map(|c| c.overrides.clone()),
"gitter" => channels.gitter.as_ref().map(|c| c.overrides.clone()),
"ntfy" => channels.ntfy.as_ref().map(|c| c.overrides.clone()),
Expand Down Expand Up @@ -1003,6 +1005,7 @@ pub async fn start_channel_bridge_with_config(
// Wave 5
|| config.mumble.is_some()
|| config.dingtalk.is_some()
|| config.dingtalk_stream.is_some()
|| config.discourse.is_some()
|| config.gitter.is_some()
|| config.ntfy.is_some()
Expand Down Expand Up @@ -1436,7 +1439,7 @@ pub async fn start_channel_bridge_with_config(
}
}

// DingTalk
// DingTalk (webhook mode)
if let Some(ref dt_config) = config.dingtalk {
if let Some(token) = read_token(&dt_config.access_token_env, "DingTalk") {
let secret = read_token(&dt_config.secret_env, "DingTalk (secret)").unwrap_or_default();
Expand All @@ -1445,6 +1448,18 @@ pub async fn start_channel_bridge_with_config(
}
}

// DingTalk (stream mode)
if let Some(ref ds_config) = config.dingtalk_stream {
if let Some(app_key) = read_token(&ds_config.app_key_env, "DingTalk Stream (app_key)") {
if let Some(app_secret) = read_token(&ds_config.app_secret_env, "DingTalk Stream (app_secret)") {
let robot_code = read_token(&ds_config.robot_code_env, "DingTalk Stream (robot_code)")
.unwrap_or_else(|| app_key.clone());
let adapter = Arc::new(DingTalkStreamAdapter::new(app_key, app_secret, robot_code));
adapters.push((adapter, ds_config.default_agent.clone()));
}
}
}

// Discourse
if let Some(ref dc_config) = config.discourse {
if let Some(api_key) = read_token(&dc_config.api_key_env, "Discourse") {
Expand Down
125 changes: 83 additions & 42 deletions crates/openfang-api/src/routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -390,60 +390,84 @@ pub async fn get_agent_session(

match state.kernel.memory.get_session(entry.session_id) {
Ok(Some(session)) => {
let messages: Vec<serde_json::Value> = session
.messages
.iter()
.filter_map(|m| {
let mut tools: Vec<serde_json::Value> = Vec::new();
let content = match &m.content {
openfang_types::message::MessageContent::Text(t) => t.clone(),
openfang_types::message::MessageContent::Blocks(blocks) => {
// Extract human-readable text and tool info from blocks
let mut texts = Vec::new();
for b in blocks {
match b {
openfang_types::message::ContentBlock::Text { text } => {
texts.push(text.clone());
}
openfang_types::message::ContentBlock::Image { .. } => {
texts.push("[Image]".to_string());
}
openfang_types::message::ContentBlock::ToolUse {
name, ..
} => {
tools.push(serde_json::json!({
"name": name,
"running": false,
"expanded": false,
}));
}
openfang_types::message::ContentBlock::ToolResult {
content: result,
is_error,
..
} => {
// Attach result to the most recent tool without a result
if let Some(last_tool) = tools.last_mut() {
// ToolUse and ToolResult live in separate messages (Assistant then User).
// Two-pass: first build per-message tools/content, then attach results
// back to the correct ToolUse using the stable tool_use_id.
let mut tool_use_map: std::collections::HashMap<String, (usize, usize)> =
std::collections::HashMap::new();
let mut all_roles: Vec<String> = Vec::new();
let mut all_contents: Vec<String> = Vec::new();
let mut all_tools: Vec<Vec<serde_json::Value>> = Vec::new();

for m in &session.messages {
let mut tools: Vec<serde_json::Value> = Vec::new();
let content = match &m.content {
openfang_types::message::MessageContent::Text(t) => t.clone(),
openfang_types::message::MessageContent::Blocks(blocks) => {
let mut texts = Vec::new();
for b in blocks {
match b {
openfang_types::message::ContentBlock::Text { text } => {
texts.push(text.clone());
}
openfang_types::message::ContentBlock::Image { .. } => {
texts.push("[Image]".to_string());
}
openfang_types::message::ContentBlock::ToolUse {
id,
name,
input,
} => {
let msg_idx = all_tools.len();
let tool_idx = tools.len();
tool_use_map.insert(id.clone(), (msg_idx, tool_idx));
tools.push(serde_json::json!({
"name": name,
"input": serde_json::to_string(input).unwrap_or_default(),
"running": false,
"expanded": false,
}));
}
openfang_types::message::ContentBlock::ToolResult {
tool_use_id,
content: result,
is_error,
..
} => {
// Attach result to the ToolUse in a previous message
if let Some(&(mi, ti)) = tool_use_map.get(tool_use_id) {
if mi < all_tools.len() {
let preview: String =
result.chars().take(300).collect();
last_tool["result"] =
result.chars().take(2000).collect();
all_tools[mi][ti]["result"] =
serde_json::Value::String(preview);
last_tool["is_error"] =
all_tools[mi][ti]["is_error"] =
serde_json::Value::Bool(*is_error);
}
}
_ => {}
}
_ => {}
}
texts.join("\n")
}
};
// Skip messages that are purely tool results (User role with only ToolResult blocks)
texts.join("\n")
}
};
all_roles.push(format!("{:?}", m.role));
all_contents.push(content);
all_tools.push(tools);
}

let messages: Vec<serde_json::Value> = all_roles
.into_iter()
.zip(all_contents)
.zip(all_tools)
.filter_map(|((role, content), tools)| {
// Skip User messages that consist entirely of ToolResult blocks
if content.is_empty() && tools.is_empty() {
return None;
}
let mut msg = serde_json::json!({
"role": format!("{:?}", m.role),
"role": role,
"content": content,
});
if !tools.is_empty() {
Expand Down Expand Up @@ -1515,6 +1539,21 @@ const CHANNEL_REGISTRY: &[ChannelMeta] = &[
setup_steps: &["Create a robot in your DingTalk group", "Copy the token and signing secret", "Paste them below"],
config_template: "[channels.dingtalk]\naccess_token_env = \"DINGTALK_ACCESS_TOKEN\"\nsecret_env = \"DINGTALK_SECRET\"",
},
ChannelMeta {
name: "dingtalk_stream", display_name: "DingTalk Stream", icon: "DS",
description: "DingTalk Stream Mode (WebSocket long-connection)",
category: "enterprise", difficulty: "Easy", setup_time: "~5 min",
quick_setup: "Create an Enterprise Internal App with Stream Mode enabled",
setup_type: "form",
fields: &[
ChannelField { key: "app_key_env", label: "App Key", field_type: FieldType::Secret, env_var: Some("DINGTALK_APP_KEY"), required: true, placeholder: "ding...", advanced: false },
ChannelField { key: "app_secret_env", label: "App Secret", field_type: FieldType::Secret, env_var: Some("DINGTALK_APP_SECRET"), required: true, placeholder: "uAn4...", advanced: false },
ChannelField { key: "robot_code_env", label: "Robot Code", field_type: FieldType::Text, env_var: Some("DINGTALK_ROBOT_CODE"), required: false, placeholder: "ding... (same as App Key)", advanced: true },
ChannelField { key: "default_agent", label: "Default Agent", field_type: FieldType::Text, env_var: None, required: false, placeholder: "assistant", advanced: true },
],
setup_steps: &["Create an Enterprise Internal App in DingTalk Open Platform", "Enable Stream Mode in the app settings", "Add robot capability and configure permissions", "Copy App Key and App Secret below"],
config_template: "[channels.dingtalk_stream]\napp_key_env = \"DINGTALK_APP_KEY\"\napp_secret_env = \"DINGTALK_APP_SECRET\"",
},
ChannelMeta {
name: "pumble", display_name: "Pumble", icon: "PB",
description: "Pumble bot adapter",
Expand Down Expand Up @@ -1804,6 +1843,7 @@ fn is_channel_configured(config: &openfang_types::config::ChannelsConfig, name:
"webex" => config.webex.is_some(),
"feishu" => config.feishu.is_some(),
"dingtalk" => config.dingtalk.is_some(),
"dingtalk_stream" => config.dingtalk_stream.is_some(),
"pumble" => config.pumble.is_some(),
"flock" => config.flock.is_some(),
"twist" => config.twist.is_some(),
Expand Down Expand Up @@ -1926,6 +1966,7 @@ fn channel_config_values(
"twist" => config.twist.as_ref().and_then(|c| serde_json::to_value(c).ok()),
"mumble" => config.mumble.as_ref().and_then(|c| serde_json::to_value(c).ok()),
"dingtalk" => config.dingtalk.as_ref().and_then(|c| serde_json::to_value(c).ok()),
"dingtalk_stream" => config.dingtalk_stream.as_ref().and_then(|c| serde_json::to_value(c).ok()),
"discourse" => config.discourse.as_ref().and_then(|c| serde_json::to_value(c).ok()),
"gitter" => config.gitter.as_ref().and_then(|c| serde_json::to_value(c).ok()),
"ntfy" => config.ntfy.as_ref().and_then(|c| serde_json::to_value(c).ok()),
Expand Down
Loading