From cd2c9686278858048d0617c6961f62089c1c371d Mon Sep 17 00:00:00 2001 From: M3gA-Mind Date: Thu, 21 May 2026 19:59:40 +0530 Subject: [PATCH 1/2] feat(composio): add Linear as a native memory provider Adds LinearProvider under src/openhuman/composio/providers/linear/, joining gmail/notion/slack/clickup as the fifth toolkit with periodic Memory Tree ingest. Syncs issues assigned to the connected user via LINEAR_LIST_LINEAR_ISSUES, with cursor-based incremental updates, daily budget enforcement, and per-item dedup. Migrates LINEAR_CURATED catalog from catalogs_productivity.rs into the provider module (consistent with gmail/notion/clickup), adds LINEAR_LIST_LINEAR_USERS to the curated surface for viewer-id resolution, and registers the provider in init_default_providers. Closes #2400 --- src/openhuman/composio/providers/catalogs.rs | 2 +- .../providers/catalogs_productivity.rs | 88 ---- .../composio/providers/descriptions.rs | 2 +- .../composio/providers/linear/mod.rs | 13 + .../composio/providers/linear/provider.rs | 417 ++++++++++++++++++ .../composio/providers/linear/sync.rs | 300 +++++++++++++ .../composio/providers/linear/tests.rs | 172 ++++++++ .../composio/providers/linear/tools.rs | 90 ++++ src/openhuman/composio/providers/mod.rs | 35 +- src/openhuman/composio/providers/registry.rs | 1 + 10 files changed, 1028 insertions(+), 92 deletions(-) create mode 100644 src/openhuman/composio/providers/linear/mod.rs create mode 100644 src/openhuman/composio/providers/linear/provider.rs create mode 100644 src/openhuman/composio/providers/linear/sync.rs create mode 100644 src/openhuman/composio/providers/linear/tests.rs create mode 100644 src/openhuman/composio/providers/linear/tools.rs diff --git a/src/openhuman/composio/providers/catalogs.rs b/src/openhuman/composio/providers/catalogs.rs index 329584bc3..3b58206db 100644 --- a/src/openhuman/composio/providers/catalogs.rs +++ b/src/openhuman/composio/providers/catalogs.rs @@ -27,6 +27,6 @@ pub use super::catalogs_messaging::{ DISCORD_CURATED, MICROSOFT_TEAMS_CURATED, SLACK_CURATED, TELEGRAM_CURATED, WHATSAPP_CURATED, }; pub use super::catalogs_productivity::{ - ASANA_CURATED, DROPBOX_CURATED, JIRA_CURATED, LINEAR_CURATED, OUTLOOK_CURATED, TRELLO_CURATED, + ASANA_CURATED, DROPBOX_CURATED, JIRA_CURATED, OUTLOOK_CURATED, TRELLO_CURATED, }; pub use super::catalogs_social_media::{SPOTIFY_CURATED, TWITTER_CURATED, YOUTUBE_CURATED}; diff --git a/src/openhuman/composio/providers/catalogs_productivity.rs b/src/openhuman/composio/providers/catalogs_productivity.rs index 4b911fd07..a599404ec 100644 --- a/src/openhuman/composio/providers/catalogs_productivity.rs +++ b/src/openhuman/composio/providers/catalogs_productivity.rs @@ -99,94 +99,6 @@ pub const OUTLOOK_CURATED: &[CuratedTool] = &[ }, ]; -// ── linear ────────────────────────────────────────────────────────── -pub const LINEAR_CURATED: &[CuratedTool] = &[ - CuratedTool { - slug: "LINEAR_LIST_LINEAR_ISSUES", - scope: ToolScope::Read, - }, - CuratedTool { - slug: "LINEAR_GET_LINEAR_ISSUE", - scope: ToolScope::Read, - }, - CuratedTool { - slug: "LINEAR_LIST_LINEAR_TEAMS", - scope: ToolScope::Read, - }, - CuratedTool { - slug: "LINEAR_LIST_LINEAR_PROJECTS", - scope: ToolScope::Read, - }, - CuratedTool { - slug: "LINEAR_LIST_LINEAR_STATES", - scope: ToolScope::Read, - }, - CuratedTool { - slug: "LINEAR_SEARCH_ISSUES", - scope: ToolScope::Read, - }, - CuratedTool { - slug: "LINEAR_GET_CYCLES_BY_TEAM_ID", - scope: ToolScope::Read, - }, - CuratedTool { - slug: "LINEAR_LIST_LINEAR_USERS", - scope: ToolScope::Read, - }, - CuratedTool { - slug: "LINEAR_LIST_LINEAR_LABELS", - scope: ToolScope::Read, - }, - CuratedTool { - slug: "LINEAR_GET_LINEAR_PROJECT", - scope: ToolScope::Read, - }, - CuratedTool { - slug: "LINEAR_CREATE_LINEAR_ISSUE", - scope: ToolScope::Write, - }, - CuratedTool { - slug: "LINEAR_UPDATE_ISSUE", - scope: ToolScope::Write, - }, - CuratedTool { - slug: "LINEAR_CREATE_LINEAR_COMMENT", - scope: ToolScope::Write, - }, - CuratedTool { - slug: "LINEAR_CREATE_ATTACHMENT", - scope: ToolScope::Write, - }, - CuratedTool { - slug: "LINEAR_CREATE_LINEAR_PROJECT", - scope: ToolScope::Write, - }, - CuratedTool { - slug: "LINEAR_CREATE_LINEAR_LABEL", - scope: ToolScope::Write, - }, - CuratedTool { - slug: "LINEAR_UPDATE_LINEAR_COMMENT", - scope: ToolScope::Write, - }, - CuratedTool { - slug: "LINEAR_CREATE_ISSUE_RELATION", - scope: ToolScope::Write, - }, - CuratedTool { - slug: "LINEAR_UPDATE_LINEAR_PROJECT", - scope: ToolScope::Write, - }, - CuratedTool { - slug: "LINEAR_DELETE_LINEAR_ISSUE", - scope: ToolScope::Admin, - }, - CuratedTool { - slug: "LINEAR_REMOVE_ISSUE_LABEL", - scope: ToolScope::Admin, - }, -]; - // ── jira ──────────────────────────────────────────────────────────── pub const JIRA_CURATED: &[CuratedTool] = &[ CuratedTool { diff --git a/src/openhuman/composio/providers/descriptions.rs b/src/openhuman/composio/providers/descriptions.rs index 1077cc03d..99fa0e623 100644 --- a/src/openhuman/composio/providers/descriptions.rs +++ b/src/openhuman/composio/providers/descriptions.rs @@ -20,7 +20,7 @@ pub fn toolkit_description(slug: &str) -> &'static str { "google_sheets" => "Read, write, and manage Google Sheets spreadsheets", "outlook" => "Send, read, and manage emails in Microsoft Outlook", "microsoft_teams" => "Send messages and manage channels in Microsoft Teams", - "linear" => "Create and manage issues, projects, and cycles in Linear", + "linear" => "Create and manage issues, projects, and cycles in Linear; sync assigned issues into Memory Tree", "jira" => "Create and manage issues, projects, and sprints in Jira", "trello" => "Create and manage cards, lists, and boards in Trello", "asana" => "Create and manage tasks, projects, and sections in Asana", diff --git a/src/openhuman/composio/providers/linear/mod.rs b/src/openhuman/composio/providers/linear/mod.rs new file mode 100644 index 000000000..d744dbe6a --- /dev/null +++ b/src/openhuman/composio/providers/linear/mod.rs @@ -0,0 +1,13 @@ +//! Linear Composio provider — incremental Memory Tree ingest for +//! issues assigned to the connected user. +//! +//! Issue: #2400. + +mod provider; +mod sync; +#[cfg(test)] +mod tests; +pub mod tools; + +pub use provider::LinearProvider; +pub use tools::LINEAR_CURATED; diff --git a/src/openhuman/composio/providers/linear/provider.rs b/src/openhuman/composio/providers/linear/provider.rs new file mode 100644 index 000000000..b59ef3ce3 --- /dev/null +++ b/src/openhuman/composio/providers/linear/provider.rs @@ -0,0 +1,417 @@ +//! Linear provider — incremental sync of issues assigned to the +//! authenticated user, with per-item persistence into the Memory Tree. +//! +//! On each sync pass: +//! +//! 1. Load persistent [`SyncState`] from the KV store. +//! 2. Check the daily request budget — bail early if exhausted. +//! 3. Resolve the viewer ID via `LINEAR_LIST_LINEAR_USERS { isMe: true }`. +//! 4. Page through `LINEAR_LIST_LINEAR_ISSUES` filtered to the viewer as +//! assignee, ordered by `updatedAt` descending. Stop early once we hit +//! issues older than the cursor or a page without a next-page cursor. +//! 5. For each issue, persist as a single memory document if it's new +//! *or* edited since the last sync. +//! 6. Advance the cursor to the newest `updatedAt` seen and save. +//! +//! Privacy posture: we only pull issues the user is assigned to, never +//! the whole workspace's issue graph. This mirrors the +//! "fetch-what-the-user-sees" model `gmail` / `notion` already follow +//! and avoids accidentally ingesting other teammates' private issues. + +use async_trait::async_trait; +use serde_json::json; + +use super::sync; +use crate::openhuman::composio::providers::sync_state::{persist_single_item, SyncState}; +use crate::openhuman::composio::providers::{ + pick_str, ComposioProvider, CuratedTool, ProviderContext, ProviderUserProfile, SyncOutcome, + SyncReason, +}; + +const ACTION_LIST_USERS: &str = "LINEAR_LIST_LINEAR_USERS"; +const ACTION_LIST_ISSUES: &str = "LINEAR_LIST_LINEAR_ISSUES"; + +/// Page size per API call. We use a small window on steady-state syncs +/// to keep response sizes bounded. +const PAGE_SIZE: u64 = 50; + +/// Larger initial-sync page size so the first backfill catches up faster. +const INITIAL_PAGE_SIZE: u64 = 100; + +/// Maximum pages per sync pass before yielding. Caps initial backfill +/// churn — anything beyond this rolls over to the next sync interval. +const MAX_PAGES_PER_SYNC: u32 = 20; + +/// Paths for extracting a Linear issue's unique ID. +const ISSUE_ID_PATHS: &[&str] = &["id", "data.id", "identifier", "data.identifier"]; + +pub struct LinearProvider; + +impl LinearProvider { + pub fn new() -> Self { + Self + } +} + +impl Default for LinearProvider { + fn default() -> Self { + Self::new() + } +} + +#[async_trait] +impl ComposioProvider for LinearProvider { + fn toolkit_slug(&self) -> &'static str { + "linear" + } + + fn curated_tools(&self) -> Option<&'static [CuratedTool]> { + Some(super::tools::LINEAR_CURATED) + } + + fn sync_interval_secs(&self) -> Option { + // 30 minutes — same cadence as ClickUp/Notion. Linear issues change + // more slowly than chat but faster than email. + Some(30 * 60) + } + + async fn fetch_user_profile( + &self, + ctx: &ProviderContext, + ) -> Result { + tracing::debug!( + connection_id = ?ctx.connection_id, + "[composio:linear] fetch_user_profile via {ACTION_LIST_USERS}" + ); + + let resp = ctx + .execute(ACTION_LIST_USERS, Some(json!({ "isMe": true }))) + .await + .map_err(|e| format!("[composio:linear] {ACTION_LIST_USERS} failed: {e:#}"))?; + + if !resp.successful { + let err = resp + .error + .clone() + .unwrap_or_else(|| "provider reported failure".to_string()); + return Err(format!("[composio:linear] {ACTION_LIST_USERS}: {err}")); + } + + let data = &resp.data; + let viewer = sync::extract_viewer(data); + let viewer_ref = viewer.as_ref().unwrap_or(data); + + let display_name = pick_str(viewer_ref, &["name", "data.name", "displayName"]); + let email = pick_str(viewer_ref, &["email", "data.email"]); + let username = pick_str(viewer_ref, &["id", "data.id"]); + let avatar_url = pick_str(viewer_ref, &["avatarUrl", "data.avatarUrl"]); + let profile_url = pick_str(viewer_ref, &["url", "data.url"]); + + Ok(ProviderUserProfile { + toolkit: "linear".to_string(), + connection_id: ctx.connection_id.clone(), + display_name, + email, + username, + avatar_url, + profile_url, + extras: data.clone(), + }) + } + + async fn sync(&self, ctx: &ProviderContext, reason: SyncReason) -> Result { + let started_at_ms = sync::now_ms(); + let connection_id = ctx + .connection_id + .clone() + .unwrap_or_else(|| "default".to_string()); + + tracing::info!( + connection_id = %connection_id, + reason = reason.as_str(), + "[composio:linear] incremental sync starting" + ); + + // ── Step 1: load persistent sync state ────────────────────── + let Some(memory) = ctx.memory_client() else { + return Err("[composio:linear] memory client not ready".to_string()); + }; + let mut state = SyncState::load(&memory, "linear", &connection_id).await?; + + // ── Step 2: check daily budget ────────────────────────────── + if state.budget_exhausted() { + tracing::info!( + connection_id = %connection_id, + "[composio:linear] daily request budget exhausted, skipping sync" + ); + return Ok(SyncOutcome { + toolkit: "linear".to_string(), + connection_id: Some(connection_id), + reason: reason.as_str().to_string(), + items_ingested: 0, + started_at_ms, + finished_at_ms: sync::now_ms(), + summary: "linear sync skipped: daily budget exhausted".to_string(), + details: json!({ "budget_exhausted": true }), + }); + } + + // ── Step 3: resolve the authenticated user's ID ───────────── + let viewer_id = match self.resolve_viewer_id(ctx, &mut state).await { + Ok(id) => id, + Err(e) => { + let _ = state.save(&memory).await; + return Err(e); + } + }; + + // Re-check budget after the viewer-id probe. + if state.budget_exhausted() { + tracing::info!( + connection_id = %connection_id, + "[composio:linear] budget exhausted after viewer-id probe, skipping sync" + ); + state.save(&memory).await?; + return Ok(SyncOutcome { + toolkit: "linear".to_string(), + connection_id: Some(connection_id), + reason: reason.as_str().to_string(), + items_ingested: 0, + started_at_ms, + finished_at_ms: sync::now_ms(), + summary: "linear sync skipped: daily budget exhausted after viewer-id probe" + .to_string(), + details: json!({ "budget_exhausted": true, "viewer_id_resolved": true }), + }); + } + + // ── Step 4: paginated incremental fetch ────────────────────── + let page_size = match reason { + SyncReason::ConnectionCreated => INITIAL_PAGE_SIZE, + _ => PAGE_SIZE, + }; + + let mut total_fetched: usize = 0; + let mut total_persisted: usize = 0; + let mut newest_updated: Option = None; + let mut after_cursor: Option = None; + let mut hit_cursor_boundary = false; + + for page_num in 0..MAX_PAGES_PER_SYNC { + if state.budget_exhausted() { + tracing::info!( + page = page_num, + "[composio:linear] budget exhausted mid-sync, stopping pagination" + ); + break; + } + + let mut args = json!({ + "assigneeId": viewer_id, + "first": page_size, + "orderBy": "updatedAt", + }); + + if let Some(ref cursor) = after_cursor { + args["after"] = json!(cursor); + } + + let resp = ctx + .execute(ACTION_LIST_ISSUES, Some(args)) + .await + .map_err(|e| { + format!("[composio:linear] {ACTION_LIST_ISSUES} page={page_num}: {e:#}") + })?; + + state.record_requests(1); + + if !resp.successful { + let err = resp + .error + .clone() + .unwrap_or_else(|| "provider reported failure".to_string()); + let _ = state.save(&memory).await; + return Err(format!( + "[composio:linear] {ACTION_LIST_ISSUES} page={page_num}: {err}" + )); + } + + let issues = sync::extract_issues(&resp.data); + total_fetched += issues.len(); + + if issues.is_empty() { + tracing::debug!( + page = page_num, + "[composio:linear] empty page, stopping pagination" + ); + break; + } + + // ── Per-item dedup + persist ───────────────────────────── + for issue in &issues { + let Some(issue_id) = + crate::openhuman::composio::providers::sync_state::extract_item_id( + issue, + ISSUE_ID_PATHS, + ) + else { + tracing::debug!("[composio:linear] issue missing ID, skipping"); + continue; + }; + + let updated = sync::extract_issue_updated(issue); + + // Track newest `updatedAt` for cursor advancement. + if let Some(ref ts) = updated { + if newest_updated.as_ref().is_none_or(|existing| ts > existing) { + newest_updated = Some(ts.clone()); + } + } + + // Composite (issue_id, updatedAt) key so re-edited + // issues are re-persisted on the next sync. + let sync_key = match &updated { + Some(ts) => format!("{issue_id}@{ts}"), + None => issue_id.clone(), + }; + + // If `updatedAt` is at or older than our cursor *and* + // we already synced this key, the rest of the page is + // by definition older — stop early. + if let (Some(ref cursor), Some(ref ts)) = (&state.cursor, &updated) { + if ts <= cursor && state.is_synced(&sync_key) { + hit_cursor_boundary = true; + continue; + } + } + + if state.is_synced(&sync_key) { + continue; + } + + let title_text = sync::extract_issue_title(issue) + .unwrap_or_else(|| format!("Linear issue {issue_id}")); + let doc_id = format!("composio-linear-issue-{issue_id}"); + let title = format!("Linear: {title_text}"); + + match persist_single_item( + &memory, + "linear", + &doc_id, + &title, + issue, + "linear", + ctx.connection_id.as_deref(), + ) + .await + { + Ok(_) => { + state.mark_synced(&sync_key); + total_persisted += 1; + } + Err(e) => { + tracing::warn!( + issue_id = %issue_id, + error = %e, + "[composio:linear] failed to persist issue (continuing)" + ); + } + } + } + + if hit_cursor_boundary { + tracing::debug!( + page = page_num, + "[composio:linear] reached cursor boundary, stopping pagination" + ); + break; + } + + // Advance to the next page using Linear's cursor-based pagination. + match sync::extract_pagination_cursor(&resp.data) { + Some(next_cursor) => { + after_cursor = Some(next_cursor); + } + None => { + tracing::debug!( + page = page_num, + "[composio:linear] no next page cursor, end of results" + ); + break; + } + } + } + + // ── Step 5: advance cursor and save state ──────────────────── + if let Some(new_cursor) = newest_updated { + state.advance_cursor(&new_cursor); + } + state.set_last_sync_at_ms(sync::now_ms()); + state.save(&memory).await?; + + let finished_at_ms = sync::now_ms(); + let summary = format!( + "linear sync ({reason}): fetched {total_fetched}, persisted {total_persisted} new, \ + budget remaining {remaining}", + reason = reason.as_str(), + remaining = state.budget_remaining(), + ); + tracing::info!( + connection_id = %connection_id, + elapsed_ms = finished_at_ms.saturating_sub(started_at_ms), + total_fetched, + total_persisted, + budget_remaining = state.budget_remaining(), + "[composio:linear] incremental sync complete" + ); + + Ok(SyncOutcome { + toolkit: "linear".to_string(), + connection_id: Some(connection_id), + reason: reason.as_str().to_string(), + items_ingested: total_persisted, + started_at_ms, + finished_at_ms, + summary, + details: json!({ + "issues_fetched": total_fetched, + "issues_persisted": total_persisted, + "budget_remaining": state.budget_remaining(), + "cursor": state.cursor, + "synced_ids_total": state.synced_ids.len(), + }), + }) + } +} + +impl LinearProvider { + /// Look up (and budget-record) the authenticated viewer's ID. + /// + /// The ID is stable for the connection's lifetime. We re-fetch on + /// every sync rather than caching it in `SyncState` because (a) the + /// call is cheap, (b) it implicitly validates that the OAuth + /// connection is still good before we start paginating. + async fn resolve_viewer_id( + &self, + ctx: &ProviderContext, + state: &mut SyncState, + ) -> Result { + let resp = ctx + .execute(ACTION_LIST_USERS, Some(json!({ "isMe": true }))) + .await + .map_err(|e| format!("[composio:linear] {ACTION_LIST_USERS} failed: {e:#}"))?; + state.record_requests(1); + + if !resp.successful { + let err = resp + .error + .clone() + .unwrap_or_else(|| "provider reported failure".to_string()); + return Err(format!("[composio:linear] {ACTION_LIST_USERS}: {err}")); + } + + sync::extract_viewer_id(&resp.data).ok_or_else(|| { + "[composio:linear] LINEAR_LIST_LINEAR_USERS returned no viewer id".to_string() + }) + } +} diff --git a/src/openhuman/composio/providers/linear/sync.rs b/src/openhuman/composio/providers/linear/sync.rs new file mode 100644 index 000000000..83aa117ba --- /dev/null +++ b/src/openhuman/composio/providers/linear/sync.rs @@ -0,0 +1,300 @@ +//! Linear sync helpers — result extraction, issue-title extraction, +//! viewer identity, cursor extraction, and time utilities. +//! +//! Linear's GraphQL API (and therefore Composio's wrapping of it) returns +//! connection-style lists (`{ nodes: [...], pageInfo: {...} }`) at the top +//! level or nested under `data`. The functions here walk the union of +//! common shapes so the provider does not have to branch per Composio +//! envelope variant. + +use serde_json::Value; + +use crate::openhuman::composio::providers::pick_str; + +/// Walk the Composio response envelope for Linear issue list results. +/// +/// Linear's list endpoints return `{ nodes: [...] }` or +/// `{ issues: { nodes: [...] } }` shapes; Composio may re-wrap the +/// upstream payload under `data` or `data.data`. We probe each shape +/// in order and return the first array we find. +pub(crate) fn extract_issues(data: &Value) -> Vec { + let candidates = [ + data.pointer("/data/nodes"), + data.pointer("/nodes"), + data.pointer("/data/data/nodes"), + data.pointer("/data/issues/nodes"), + data.pointer("/data/results"), + data.pointer("/results"), + data.pointer("/data/items"), + data.pointer("/items"), + ]; + for cand in candidates.into_iter().flatten() { + if let Some(arr) = cand.as_array() { + return arr.clone(); + } + } + Vec::new() +} + +/// Extract a human-readable title from a Linear issue object. +/// +/// Linear issues store the name at `title` (or `data.title` after +/// Composio envelope wrapping). Falls back to `name` / `identifier` +/// so the chunk remains identifiable even for unusual response shapes. +pub(crate) fn extract_issue_title(issue: &Value) -> Option { + pick_str( + issue, + &[ + "title", + "data.title", + "name", + "data.name", + "identifier", + "data.identifier", + ], + ) +} + +/// Extract a stable cursor timestamp from a Linear issue object. +/// +/// Linear uses ISO-8601 strings for timestamps (`updatedAt`). We keep +/// the value as a string so lexicographic comparison against the stored +/// cursor is valid. +pub(crate) fn extract_issue_updated(issue: &Value) -> Option { + pick_str( + issue, + &[ + "updatedAt", + "data.updatedAt", + "updated_at", + "data.updated_at", + ], + ) +} + +/// Extract the viewer (authenticated user) object from a +/// `LINEAR_LIST_LINEAR_USERS { isMe: true }` response. +/// +/// Linear's GraphQL viewer endpoint returns `{ nodes: [{ id, email, … }] }`. +/// Composio may wrap this under `data` or `data.data`. We probe each +/// shape and return the first element of the nodes array, falling back +/// to the payload itself if it looks like a direct user object (has +/// `id` or `email`). +pub(crate) fn extract_viewer(data: &Value) -> Option { + let array_candidates = [ + data.pointer("/data/nodes"), + data.pointer("/nodes"), + data.pointer("/data/data/nodes"), + data.pointer("/data/users/nodes"), + ]; + for cand in array_candidates.into_iter().flatten() { + if let Some(arr) = cand.as_array() { + if let Some(first) = arr.first() { + return Some(first.clone()); + } + } + } + // Fallback: if the payload itself looks like a user object, return it. + if data.get("id").is_some() || data.get("email").is_some() { + return Some(data.clone()); + } + None +} + +/// Extract the viewer's ID string from a `LINEAR_LIST_LINEAR_USERS` +/// response. Returns `None` if the payload does not contain a +/// recognizable user ID. +pub(crate) fn extract_viewer_id(data: &Value) -> Option { + let viewer = extract_viewer(data)?; + pick_str(&viewer, &["id", "data.id"]) +} + +/// Extract a pagination cursor from a Linear connection `pageInfo` block. +/// +/// Returns `Some(endCursor)` only when `hasNextPage` is `true`; +/// `None` when the last page has been reached or when the envelope does +/// not carry `pageInfo` at all. +pub(crate) fn extract_pagination_cursor(data: &Value) -> Option { + let page_info_candidates = [ + data.pointer("/data/pageInfo"), + data.pointer("/pageInfo"), + data.pointer("/data/data/pageInfo"), + data.pointer("/data/issues/pageInfo"), + ]; + for cand in page_info_candidates.into_iter().flatten() { + let has_next = cand + .get("hasNextPage") + .and_then(|v| v.as_bool()) + .unwrap_or(false); + if has_next { + if let Some(cursor) = cand.get("endCursor").and_then(|v| v.as_str()) { + let trimmed = cursor.trim(); + if !trimmed.is_empty() { + return Some(trimmed.to_string()); + } + } + } + } + None +} + +/// Current wall-clock time in milliseconds since the UNIX epoch. +pub(crate) fn now_ms() -> u64 { + use std::time::{SystemTime, UNIX_EPOCH}; + SystemTime::now() + .duration_since(UNIX_EPOCH) + .map(|d| d.as_millis() as u64) + .unwrap_or(0) +} + +#[cfg(test)] +mod tests { + use super::*; + use serde_json::json; + + // ── extract_issues ─────────────────────────────────────────────── + + #[test] + fn extract_issues_from_data_nodes() { + let data = json!({ "data": { "nodes": [{"id": "i1"}, {"id": "i2"}] } }); + assert_eq!(extract_issues(&data).len(), 2); + } + + #[test] + fn extract_issues_from_top_level_nodes() { + let data = json!({ "nodes": [{"id": "i3"}] }); + assert_eq!(extract_issues(&data).len(), 1); + } + + #[test] + fn extract_issues_from_data_issues_nodes() { + let data = json!({ "data": { "issues": { "nodes": [{"id": "i4"}, {"id": "i5"}, {"id": "i6"}] } } }); + assert_eq!(extract_issues(&data).len(), 3); + } + + #[test] + fn extract_issues_from_results() { + let data = json!({ "results": [{"id": "i7"}] }); + assert_eq!(extract_issues(&data).len(), 1); + } + + #[test] + fn extract_issues_empty_when_missing() { + let data = json!({ "foo": "bar" }); + assert!(extract_issues(&data).is_empty()); + } + + // ── extract_issue_title ────────────────────────────────────────── + + #[test] + fn extract_issue_title_from_title_field() { + let issue = json!({ "id": "i1", "title": "Fix the login bug" }); + assert_eq!( + extract_issue_title(&issue), + Some("Fix the login bug".into()) + ); + } + + #[test] + fn extract_issue_title_falls_back_to_wrapped_data() { + let issue = json!({ "data": { "title": "Wrapped issue" } }); + assert_eq!(extract_issue_title(&issue), Some("Wrapped issue".into())); + } + + #[test] + fn extract_issue_title_falls_back_to_identifier() { + let issue = json!({ "identifier": "ENG-42" }); + assert_eq!(extract_issue_title(&issue), Some("ENG-42".into())); + } + + // ── extract_issue_updated ──────────────────────────────────────── + + #[test] + fn extract_issue_updated_from_updated_at() { + let issue = json!({ "updatedAt": "2026-03-01T12:00:00.000Z" }); + assert_eq!( + extract_issue_updated(&issue), + Some("2026-03-01T12:00:00.000Z".to_string()) + ); + } + + #[test] + fn extract_issue_updated_falls_back_to_snake_case() { + let issue = json!({ "data": { "updated_at": "2026-01-15T08:30:00.000Z" } }); + assert_eq!( + extract_issue_updated(&issue), + Some("2026-01-15T08:30:00.000Z".to_string()) + ); + } + + // ── extract_viewer ─────────────────────────────────────────────── + + #[test] + fn extract_viewer_from_data_nodes() { + let data = json!({ "data": { "nodes": [{ "id": "usr_1", "email": "a@b.com" }] } }); + let v = extract_viewer(&data).expect("should find viewer"); + assert_eq!(v["id"], "usr_1"); + } + + #[test] + fn extract_viewer_from_top_level_nodes() { + let data = json!({ "nodes": [{ "id": "usr_2" }] }); + let v = extract_viewer(&data).expect("should find viewer"); + assert_eq!(v["id"], "usr_2"); + } + + #[test] + fn extract_viewer_fallback_direct_object() { + let data = json!({ "id": "usr_direct", "name": "Direct User" }); + let v = extract_viewer(&data).expect("should return direct object"); + assert_eq!(v["id"], "usr_direct"); + } + + #[test] + fn extract_viewer_returns_none_when_absent() { + let data = json!({ "foo": "bar" }); + assert!(extract_viewer(&data).is_none()); + } + + // ── extract_pagination_cursor ──────────────────────────────────── + + #[test] + fn extract_pagination_cursor_returns_cursor_when_has_next_page() { + let data = json!({ + "data": { + "pageInfo": { + "hasNextPage": true, + "endCursor": "cursor_abc" + } + } + }); + assert_eq!( + extract_pagination_cursor(&data), + Some("cursor_abc".to_string()) + ); + } + + #[test] + fn extract_pagination_cursor_returns_none_when_last_page() { + let data = json!({ + "pageInfo": { + "hasNextPage": false, + "endCursor": "cursor_xyz" + } + }); + assert!(extract_pagination_cursor(&data).is_none()); + } + + #[test] + fn extract_pagination_cursor_returns_none_when_absent() { + let data = json!({ "nodes": [{"id": "i1"}] }); + assert!(extract_pagination_cursor(&data).is_none()); + } + + // ── now_ms ─────────────────────────────────────────────────────── + + #[test] + fn now_ms_returns_nonzero() { + assert!(now_ms() > 0); + } +} diff --git a/src/openhuman/composio/providers/linear/tests.rs b/src/openhuman/composio/providers/linear/tests.rs new file mode 100644 index 000000000..14fd25d22 --- /dev/null +++ b/src/openhuman/composio/providers/linear/tests.rs @@ -0,0 +1,172 @@ +//! Unit tests for the Linear provider. + +use super::sync::{ + extract_issue_title, extract_issue_updated, extract_issues, extract_pagination_cursor, + extract_viewer, extract_viewer_id, +}; +use super::LinearProvider; +use crate::openhuman::composio::providers::ComposioProvider; +use serde_json::json; + +// ── extract_issues ─────────────────────────────────────────────────── + +#[test] +fn extract_issues_walks_common_shapes() { + let v1 = json!({ "data": { "nodes": [{"id": "i1"}] } }); + let v2 = json!({ "nodes": [{"id": "i2"}, {"id": "i3"}] }); + let v3 = json!({ "data": { "issues": { "nodes": [{"id": "i4"}] } } }); + let v4 = json!({ "foo": "bar" }); + assert_eq!(extract_issues(&v1).len(), 1); + assert_eq!(extract_issues(&v2).len(), 2); + assert_eq!(extract_issues(&v3).len(), 1); + assert_eq!(extract_issues(&v4).len(), 0); +} + +// ── extract_issue_title ────────────────────────────────────────────── + +#[test] +fn extract_issue_title_finds_title_field() { + let issue = json!({ "id": "i1", "title": "Fix the login bug" }); + assert_eq!( + extract_issue_title(&issue), + Some("Fix the login bug".into()) + ); +} + +#[test] +fn extract_issue_title_falls_back_to_wrapped_data() { + let issue = json!({ "data": { "title": "Wrapped issue" } }); + assert_eq!(extract_issue_title(&issue), Some("Wrapped issue".into())); +} + +#[test] +fn extract_issue_title_falls_back_to_identifier() { + let issue = json!({ "identifier": "ENG-99" }); + assert_eq!(extract_issue_title(&issue), Some("ENG-99".into())); +} + +// ── extract_issue_updated ──────────────────────────────────────────── + +#[test] +fn extract_issue_updated_handles_camel_case() { + let issue = json!({ "updatedAt": "2026-03-01T12:00:00.000Z" }); + assert_eq!( + extract_issue_updated(&issue), + Some("2026-03-01T12:00:00.000Z".to_string()) + ); +} + +#[test] +fn extract_issue_updated_handles_wrapped_data() { + let issue = json!({ "data": { "updatedAt": "2026-01-15T08:30:00.000Z" } }); + assert_eq!( + extract_issue_updated(&issue), + Some("2026-01-15T08:30:00.000Z".to_string()) + ); +} + +// ── extract_viewer ─────────────────────────────────────────────────── + +#[test] +fn extract_viewer_finds_first_node() { + let data = json!({ "data": { "nodes": [{ "id": "usr_1", "email": "a@b.com" }] } }); + let v = extract_viewer(&data).expect("viewer found"); + assert_eq!(v["id"], "usr_1"); +} + +#[test] +fn extract_viewer_from_top_level_nodes() { + let data = json!({ "nodes": [{ "id": "usr_2" }] }); + let v = extract_viewer(&data).expect("viewer found"); + assert_eq!(v["id"], "usr_2"); +} + +#[test] +fn extract_viewer_fallback_direct_object() { + let data = json!({ "id": "usr_direct", "name": "Alice" }); + let v = extract_viewer(&data).expect("viewer found"); + assert_eq!(v["id"], "usr_direct"); +} + +#[test] +fn extract_viewer_returns_none_when_absent() { + let data = json!({ "foo": "bar" }); + assert!(extract_viewer(&data).is_none()); +} + +// ── extract_pagination_cursor ──────────────────────────────────────── + +#[test] +fn extract_pagination_cursor_returns_cursor_on_has_next_page() { + let data = json!({ + "data": { + "pageInfo": { "hasNextPage": true, "endCursor": "abc123" } + } + }); + assert_eq!(extract_pagination_cursor(&data), Some("abc123".to_string())); +} + +#[test] +fn extract_pagination_cursor_returns_none_on_last_page() { + let data = json!({ + "pageInfo": { "hasNextPage": false, "endCursor": "xyz" } + }); + assert!(extract_pagination_cursor(&data).is_none()); +} + +#[test] +fn extract_pagination_cursor_returns_none_when_absent() { + let data = json!({ "nodes": [{"id": "i1"}] }); + assert!(extract_pagination_cursor(&data).is_none()); +} + +// ── extract_viewer_id ──────────────────────────────────────────────── + +#[test] +fn extract_viewer_id_from_data_nodes() { + let data = json!({ "data": { "nodes": [{ "id": "usr_abc" }] } }); + assert_eq!(extract_viewer_id(&data), Some("usr_abc".to_string())); +} + +#[test] +fn extract_viewer_id_returns_none_when_absent() { + let data = json!({ "foo": "bar" }); + assert!(extract_viewer_id(&data).is_none()); +} + +// ── provider metadata ──────────────────────────────────────────────── + +#[test] +fn provider_metadata_is_stable() { + let p = LinearProvider::new(); + assert_eq!(p.toolkit_slug(), "linear"); + assert_eq!(p.sync_interval_secs(), Some(30 * 60)); + assert!(p.curated_tools().is_some()); +} + +#[test] +fn curated_tools_contains_core_sync_surface() { + let p = LinearProvider::new(); + let curated = p.curated_tools().expect("LINEAR_CURATED is registered"); + let slugs: Vec<&str> = curated.iter().map(|t| t.slug).collect(); + assert!( + slugs.contains(&"LINEAR_LIST_LINEAR_USERS"), + "LINEAR_LIST_LINEAR_USERS must be in curated catalog" + ); + assert!( + slugs.contains(&"LINEAR_LIST_LINEAR_ISSUES"), + "LINEAR_LIST_LINEAR_ISSUES must be in curated catalog" + ); +} + +#[test] +fn default_impl_matches_new() { + let a = LinearProvider::new(); + let b = LinearProvider::default(); + assert_eq!(a.toolkit_slug(), b.toolkit_slug()); + assert_eq!(a.sync_interval_secs(), b.sync_interval_secs()); + assert_eq!( + a.curated_tools().map(<[_]>::len), + b.curated_tools().map(<[_]>::len), + ); +} diff --git a/src/openhuman/composio/providers/linear/tools.rs b/src/openhuman/composio/providers/linear/tools.rs new file mode 100644 index 000000000..014a6b18c --- /dev/null +++ b/src/openhuman/composio/providers/linear/tools.rs @@ -0,0 +1,90 @@ +//! Curated catalog of Linear Composio actions. + +use crate::openhuman::composio::providers::tool_scope::{CuratedTool, ToolScope}; + +pub const LINEAR_CURATED: &[CuratedTool] = &[ + CuratedTool { + slug: "LINEAR_LIST_LINEAR_USERS", + scope: ToolScope::Read, + }, + CuratedTool { + slug: "LINEAR_LIST_LINEAR_ISSUES", + scope: ToolScope::Read, + }, + CuratedTool { + slug: "LINEAR_GET_LINEAR_ISSUE", + scope: ToolScope::Read, + }, + CuratedTool { + slug: "LINEAR_SEARCH_ISSUES", + scope: ToolScope::Read, + }, + CuratedTool { + slug: "LINEAR_LIST_LINEAR_TEAMS", + scope: ToolScope::Read, + }, + CuratedTool { + slug: "LINEAR_LIST_LINEAR_PROJECTS", + scope: ToolScope::Read, + }, + CuratedTool { + slug: "LINEAR_GET_LINEAR_PROJECT", + scope: ToolScope::Read, + }, + CuratedTool { + slug: "LINEAR_LIST_LINEAR_STATES", + scope: ToolScope::Read, + }, + CuratedTool { + slug: "LINEAR_GET_CYCLES_BY_TEAM_ID", + scope: ToolScope::Read, + }, + CuratedTool { + slug: "LINEAR_LIST_LINEAR_LABELS", + scope: ToolScope::Read, + }, + CuratedTool { + slug: "LINEAR_CREATE_LINEAR_ISSUE", + scope: ToolScope::Write, + }, + CuratedTool { + slug: "LINEAR_UPDATE_ISSUE", + scope: ToolScope::Write, + }, + CuratedTool { + slug: "LINEAR_CREATE_LINEAR_COMMENT", + scope: ToolScope::Write, + }, + CuratedTool { + slug: "LINEAR_UPDATE_LINEAR_COMMENT", + scope: ToolScope::Write, + }, + CuratedTool { + slug: "LINEAR_CREATE_ATTACHMENT", + scope: ToolScope::Write, + }, + CuratedTool { + slug: "LINEAR_CREATE_ISSUE_RELATION", + scope: ToolScope::Write, + }, + CuratedTool { + slug: "LINEAR_CREATE_LINEAR_PROJECT", + scope: ToolScope::Write, + }, + CuratedTool { + slug: "LINEAR_UPDATE_LINEAR_PROJECT", + scope: ToolScope::Write, + }, + CuratedTool { + slug: "LINEAR_CREATE_LINEAR_LABEL", + scope: ToolScope::Write, + }, + CuratedTool { + slug: "LINEAR_DELETE_LINEAR_ISSUE", + scope: ToolScope::Admin, + }, + CuratedTool { + slug: "LINEAR_REMOVE_ISSUE_LABEL", + scope: ToolScope::Admin, + }, +]; diff --git a/src/openhuman/composio/providers/mod.rs b/src/openhuman/composio/providers/mod.rs index 59b0524b0..1d4b71975 100644 --- a/src/openhuman/composio/providers/mod.rs +++ b/src/openhuman/composio/providers/mod.rs @@ -49,6 +49,7 @@ pub mod catalogs_social_media; pub mod clickup; pub mod github; pub mod gmail; +pub mod linear; pub mod notion; pub mod profile; pub mod profile_md; @@ -95,13 +96,14 @@ fn native_provider_sync_interval(toolkit: &str) -> Option { "notion" => Some(notion::NotionProvider::new().sync_interval_secs()), "slack" => Some(slack::SlackProvider::new().sync_interval_secs()), "clickup" => Some(clickup::ClickUpProvider::new().sync_interval_secs()), + "linear" => Some(linear::LinearProvider::new().sync_interval_secs()), _ => None, } .flatten() } fn has_native_provider(toolkit: &str) -> bool { - matches!(toolkit, "gmail" | "notion" | "slack" | "clickup") + matches!(toolkit, "gmail" | "notion" | "slack" | "clickup" | "linear") } /// Static overview of the Composio integrations supported by this core build. @@ -191,7 +193,7 @@ pub fn catalog_for_toolkit(toolkit: &str) -> Option<&'static [CuratedTool]> { "outlook" => Some(catalogs::OUTLOOK_CURATED), // MICROSOFT_TEAMS_* slugs extract to "microsoft" via toolkit_from_slug. "microsoft" | "microsoft_teams" => Some(catalogs::MICROSOFT_TEAMS_CURATED), - "linear" => Some(catalogs::LINEAR_CURATED), + "linear" => Some(linear::LINEAR_CURATED), "jira" => Some(catalogs::JIRA_CURATED), "trello" => Some(catalogs::TRELLO_CURATED), "asana" => Some(catalogs::ASANA_CURATED), @@ -354,6 +356,35 @@ mod tests { assert!(clickup.memory_ingest); } + #[test] + fn capability_matrix_includes_linear_as_native_memory_provider() { + // Locks in the per-issue #2400 registration: a Linear row must + // appear in the capability matrix with the same native-provider + // flags Gmail/Notion/Slack/ClickUp already carry (`memory_ingest`, + // `periodic_sync`, non-zero `sync_interval_secs`). If a future + // change drops one of the four registration touchpoints + // (CAPABILITY_TOOLKITS, has_native_provider, + // native_provider_sync_interval, catalog_for_toolkit) this test + // fails loud rather than silently degrading the provider to + // catalog-only status. + let matrix = capability_matrix(); + let linear = matrix + .iter() + .find(|entry| entry.toolkit == "linear") + .expect("linear capability row"); + assert!(linear.native_provider, "linear must be native"); + assert!(linear.curated_tools, "linear must have a curated catalog"); + assert!( + linear.curated_tool_count > 0, + "linear catalog must be non-empty" + ); + assert!(linear.user_profile); + assert!(linear.initial_sync); + assert!(linear.periodic_sync); + assert_eq!(linear.sync_interval_secs, Some(30 * 60)); + assert!(linear.memory_ingest); + } + #[test] fn toolkit_description_known_slugs_are_distinct_and_non_empty() { let known = [ diff --git a/src/openhuman/composio/providers/registry.rs b/src/openhuman/composio/providers/registry.rs index 3f8e3d2ca..bfcee26ac 100644 --- a/src/openhuman/composio/providers/registry.rs +++ b/src/openhuman/composio/providers/registry.rs @@ -80,6 +80,7 @@ pub fn all_providers() -> Vec { pub fn init_default_providers() { register_provider(Arc::new(super::clickup::ClickUpProvider::new())); register_provider(Arc::new(super::gmail::GmailProvider::new())); + register_provider(Arc::new(super::linear::LinearProvider::new())); register_provider(Arc::new(super::notion::NotionProvider::new())); register_provider(Arc::new(super::slack::SlackProvider::new())); tracing::info!( From 86acfb0683af5008d5679bd7a3f9e6ba6cbfd0ca Mon Sep 17 00:00:00 2001 From: M3gA-Mind Date: Thu, 21 May 2026 22:40:17 +0530 Subject: [PATCH 2/2] fix(composio): address CodeRabbit review on Linear provider - Use &viewer_id in json! macro inside pagination loop to avoid String move on first iteration (CodeRabbit critical, line 213) - Track had_persist_failures and gate cursor advancement on zero failures; emit a warn log when keeping cursor for retry, preventing permanently skipped issues (CodeRabbit major, line 319) --- src/openhuman/composio/providers/linear/provider.rs | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/src/openhuman/composio/providers/linear/provider.rs b/src/openhuman/composio/providers/linear/provider.rs index b59ef3ce3..d3225e95b 100644 --- a/src/openhuman/composio/providers/linear/provider.rs +++ b/src/openhuman/composio/providers/linear/provider.rs @@ -193,6 +193,7 @@ impl ComposioProvider for LinearProvider { let mut total_fetched: usize = 0; let mut total_persisted: usize = 0; + let mut had_persist_failures = false; let mut newest_updated: Option = None; let mut after_cursor: Option = None; let mut hit_cursor_boundary = false; @@ -207,7 +208,7 @@ impl ComposioProvider for LinearProvider { } let mut args = json!({ - "assigneeId": viewer_id, + "assigneeId": &viewer_id, "first": page_size, "orderBy": "updatedAt", }); @@ -310,6 +311,7 @@ impl ComposioProvider for LinearProvider { total_persisted += 1; } Err(e) => { + had_persist_failures = true; tracing::warn!( issue_id = %issue_id, error = %e, @@ -343,7 +345,11 @@ impl ComposioProvider for LinearProvider { } // ── Step 5: advance cursor and save state ──────────────────── - if let Some(new_cursor) = newest_updated { + if had_persist_failures { + tracing::warn!( + "[composio:linear] persist failures seen; keeping previous cursor for retry" + ); + } else if let Some(new_cursor) = newest_updated { state.advance_cursor(&new_cursor); } state.set_last_sync_at_ms(sync::now_ms());