Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 69 additions & 45 deletions crates/openfang-api/src/routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -390,60 +390,84 @@ pub async fn get_agent_session(

match state.kernel.memory.get_session(entry.session_id) {
Ok(Some(session)) => {
let messages: Vec<serde_json::Value> = session
.messages
.iter()
.filter_map(|m| {
let mut tools: Vec<serde_json::Value> = Vec::new();
let content = match &m.content {
openfang_types::message::MessageContent::Text(t) => t.clone(),
openfang_types::message::MessageContent::Blocks(blocks) => {
// Extract human-readable text and tool info from blocks
let mut texts = Vec::new();
for b in blocks {
match b {
openfang_types::message::ContentBlock::Text { text } => {
texts.push(text.clone());
}
openfang_types::message::ContentBlock::Image { .. } => {
texts.push("[Image]".to_string());
}
openfang_types::message::ContentBlock::ToolUse {
name, ..
} => {
tools.push(serde_json::json!({
"name": name,
"running": false,
"expanded": false,
}));
}
openfang_types::message::ContentBlock::ToolResult {
content: result,
is_error,
..
} => {
// Attach result to the most recent tool without a result
if let Some(last_tool) = tools.last_mut() {
// ToolUse and ToolResult live in separate messages (Assistant then User).
// Two-pass: first build per-message tools/content, then attach results
// back to the correct ToolUse using the stable tool_use_id.
let mut tool_use_map: std::collections::HashMap<String, (usize, usize)> =
std::collections::HashMap::new();
let mut all_roles: Vec<String> = Vec::new();
let mut all_contents: Vec<String> = Vec::new();
let mut all_tools: Vec<Vec<serde_json::Value>> = Vec::new();

for m in &session.messages {
let mut tools: Vec<serde_json::Value> = Vec::new();
let content = match &m.content {
openfang_types::message::MessageContent::Text(t) => t.clone(),
openfang_types::message::MessageContent::Blocks(blocks) => {
let mut texts = Vec::new();
for b in blocks {
match b {
openfang_types::message::ContentBlock::Text { text } => {
texts.push(text.clone());
}
openfang_types::message::ContentBlock::Image { .. } => {
texts.push("[Image]".to_string());
}
openfang_types::message::ContentBlock::ToolUse {
id,
name,
input,
} => {
let msg_idx = all_tools.len();
let tool_idx = tools.len();
tool_use_map.insert(id.clone(), (msg_idx, tool_idx));
tools.push(serde_json::json!({
"name": name,
"input": serde_json::to_string(input).unwrap_or_default(),
"running": false,
"expanded": false,
}));
}
openfang_types::message::ContentBlock::ToolResult {
tool_use_id,
content: result,
is_error,
..
} => {
// Attach result to the ToolUse in a previous message
if let Some(&(mi, ti)) = tool_use_map.get(tool_use_id) {
if mi < all_tools.len() {
let preview: String =
result.chars().take(300).collect();
last_tool["result"] =
result.chars().take(2000).collect();
all_tools[mi][ti]["result"] =
serde_json::Value::String(preview);
last_tool["is_error"] =
all_tools[mi][ti]["is_error"] =
serde_json::Value::Bool(*is_error);
}
}
_ => {}
}
_ => {}
}
texts.join("\n")
}
};
// Skip messages that are purely tool results (User role with only ToolResult blocks)
texts.join("\n")
}
};
all_roles.push(format!("{:?}", m.role));
all_contents.push(content);
all_tools.push(tools);
}

let messages: Vec<serde_json::Value> = all_roles
.into_iter()
.zip(all_contents)
.zip(all_tools)
.filter_map(|((role, content), tools)| {
// Skip User messages that consist entirely of ToolResult blocks
if content.is_empty() && tools.is_empty() {
return None;
}
let mut msg = serde_json::json!({
"role": format!("{:?}", m.role),
"role": role,
"content": content,
});
if !tools.is_empty() {
Expand Down Expand Up @@ -5601,7 +5625,7 @@ pub async fn a2a_send_task(
let task = openfang_runtime::a2a::A2aTask {
id: task_id.clone(),
session_id: session_id.clone(),
status: openfang_runtime::a2a::A2aTaskStatus::Working,
status: openfang_runtime::a2a::A2aTaskStatus::Working.into(),
messages: vec![openfang_runtime::a2a::A2aMessage {
role: "user".to_string(),
parts: vec![openfang_runtime::a2a::A2aPart::Text {
Expand Down Expand Up @@ -5710,10 +5734,10 @@ pub async fn a2a_list_external_agents(State(state): State<Arc<AppState>>) -> imp
.unwrap_or_else(|e| e.into_inner());
let items: Vec<serde_json::Value> = agents
.iter()
.map(|(url, card)| {
.map(|(_, card)| {
serde_json::json!({
"name": card.name,
"url": url,
"url": card.url,
"description": card.description,
"skills": card.skills,
"version": card.version,
Expand Down
4 changes: 2 additions & 2 deletions crates/openfang-kernel/src/kernel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5229,7 +5229,7 @@ impl KernelHandle for OpenFangKernel {
.unwrap_or_else(|e| e.into_inner());
agents
.iter()
.map(|(url, card)| (card.name.clone(), url.clone()))
.map(|(_, card)| (card.name.clone(), card.url.clone()))
.collect()
}

Expand All @@ -5242,7 +5242,7 @@ impl KernelHandle for OpenFangKernel {
agents
.iter()
.find(|(_, card)| card.name.to_lowercase() == name_lower)
.map(|(url, _)| url.clone())
.map(|(_, card)| card.url.clone())
}

async fn send_channel_message(
Expand Down
90 changes: 73 additions & 17 deletions crates/openfang-runtime/src/a2a.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,8 @@ pub struct A2aTask {
/// Optional session identifier for conversation continuity.
#[serde(default)]
pub session_id: Option<String>,
/// Current task status.
pub status: A2aTaskStatus,
/// Current task status (may be object or string per A2A spec).
pub status: A2aTaskStatusWrapper,
/// Messages exchanged during the task.
#[serde(default)]
pub messages: Vec<A2aMessage>,
Expand All @@ -97,6 +97,49 @@ pub struct A2aTask {
pub artifacts: Vec<A2aArtifact>,
}

/// Wrapper to handle both string and object forms of A2A task status.
///
/// Some implementations return `{"state": "completed", "message": null}`,
/// others return the enum directly.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(untagged)]
pub enum A2aTaskStatusWrapper {
/// Object form: `{"state": "completed", "message": null}`
Object(A2aTaskStatusObject),
/// String/enum form: `"completed"`
Enum(A2aTaskStatus),
}

impl A2aTaskStatusWrapper {
/// Resolve to the inner status enum.
pub fn state(&self) -> &A2aTaskStatus {
match self {
A2aTaskStatusWrapper::Object(o) => &o.state,
A2aTaskStatusWrapper::Enum(s) => s,
}
}
}

impl From<A2aTaskStatus> for A2aTaskStatusWrapper {
fn from(s: A2aTaskStatus) -> Self {
A2aTaskStatusWrapper::Enum(s)
}
}

impl PartialEq<A2aTaskStatus> for A2aTaskStatusWrapper {
fn eq(&self, other: &A2aTaskStatus) -> bool {
self.state() == other
}
}

/// Object form of A2A task status returned by some implementations.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct A2aTaskStatusObject {
pub state: A2aTaskStatus,
#[serde(default)]
pub message: Option<serde_json::Value>,
}

/// A2A task status.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
Expand Down Expand Up @@ -147,9 +190,22 @@ pub enum A2aPart {
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct A2aArtifact {
/// Artifact name.
pub name: String,
#[serde(default)]
pub name: Option<String>,
/// Artifact description.
#[serde(default)]
pub description: Option<String>,
/// Artifact content parts.
pub parts: Vec<A2aPart>,
/// Artifact metadata.
#[serde(default)]
pub metadata: Option<serde_json::Value>,
/// Artifact index.
#[serde(default)]
pub index: Option<u32>,
/// Whether this is the last chunk.
#[serde(default)]
pub last_chunk: Option<bool>,
}

// ---------------------------------------------------------------------------
Expand Down Expand Up @@ -185,7 +241,7 @@ impl A2aTaskStore {
.iter()
.filter(|(_, t)| {
matches!(
t.status,
t.status.state(),
A2aTaskStatus::Completed | A2aTaskStatus::Failed | A2aTaskStatus::Cancelled
)
})
Expand All @@ -211,7 +267,7 @@ impl A2aTaskStore {
pub fn update_status(&self, task_id: &str, status: A2aTaskStatus) -> bool {
let mut tasks = self.tasks.lock().unwrap_or_else(|e| e.into_inner());
if let Some(task) = tasks.get_mut(task_id) {
task.status = status;
task.status = status.into();
true
} else {
false
Expand All @@ -224,7 +280,7 @@ impl A2aTaskStore {
if let Some(task) = tasks.get_mut(task_id) {
task.messages.push(response);
task.artifacts.extend(artifacts);
task.status = A2aTaskStatus::Completed;
task.status = A2aTaskStatus::Completed.into();
}
}

Expand All @@ -233,7 +289,7 @@ impl A2aTaskStore {
let mut tasks = self.tasks.lock().unwrap_or_else(|e| e.into_inner());
if let Some(task) = tasks.get_mut(task_id) {
task.messages.push(error_message);
task.status = A2aTaskStatus::Failed;
task.status = A2aTaskStatus::Failed.into();
}
}

Expand Down Expand Up @@ -490,33 +546,33 @@ mod tests {
let task = A2aTask {
id: "task-1".to_string(),
session_id: None,
status: A2aTaskStatus::Submitted,
status: A2aTaskStatus::Submitted.into(),
messages: vec![],
artifacts: vec![],
};
assert_eq!(task.status, A2aTaskStatus::Submitted);

// Simulate progression
let working = A2aTask {
status: A2aTaskStatus::Working,
status: A2aTaskStatus::Working.into(),
..task.clone()
};
assert_eq!(working.status, A2aTaskStatus::Working);

let completed = A2aTask {
status: A2aTaskStatus::Completed,
status: A2aTaskStatus::Completed.into(),
..task.clone()
};
assert_eq!(completed.status, A2aTaskStatus::Completed);

let cancelled = A2aTask {
status: A2aTaskStatus::Cancelled,
status: A2aTaskStatus::Cancelled.into(),
..task.clone()
};
assert_eq!(cancelled.status, A2aTaskStatus::Cancelled);

let failed = A2aTask {
status: A2aTaskStatus::Failed,
status: A2aTaskStatus::Failed.into(),
..task
};
assert_eq!(failed.status, A2aTaskStatus::Failed);
Expand Down Expand Up @@ -554,7 +610,7 @@ mod tests {
let task = A2aTask {
id: "t-1".to_string(),
session_id: None,
status: A2aTaskStatus::Working,
status: A2aTaskStatus::Working.into(),
messages: vec![],
artifacts: vec![],
};
Expand All @@ -571,7 +627,7 @@ mod tests {
let task = A2aTask {
id: "t-2".to_string(),
session_id: None,
status: A2aTaskStatus::Working,
status: A2aTaskStatus::Working.into(),
messages: vec![],
artifacts: vec![],
};
Expand Down Expand Up @@ -599,7 +655,7 @@ mod tests {
let task = A2aTask {
id: "t-3".to_string(),
session_id: None,
status: A2aTaskStatus::Working,
status: A2aTaskStatus::Working.into(),
messages: vec![],
artifacts: vec![],
};
Expand All @@ -618,7 +674,7 @@ mod tests {
let task = A2aTask {
id: format!("t-{i}"),
session_id: None,
status: A2aTaskStatus::Completed,
status: A2aTaskStatus::Completed.into(),
messages: vec![],
artifacts: vec![],
};
Expand All @@ -630,7 +686,7 @@ mod tests {
let task = A2aTask {
id: "t-2".to_string(),
session_id: None,
status: A2aTaskStatus::Working,
status: A2aTaskStatus::Working.into(),
messages: vec![],
artifacts: vec![],
};
Expand Down