Skip to content

Latest commit

 

History

History
111 lines (86 loc) · 3.48 KB

File metadata and controls

111 lines (86 loc) · 3.48 KB

FastIntercomMCP: Smart Sync State Management

Problem Statement

Currently, FastIntercomMCP users have no visibility into data freshness, leading to:

  1. Silent failures - Users get results but don't know if they're missing recent conversations
  2. Performance inconsistency - Background syncs randomly block requests (10+ second delays)
  3. Poor UX - No feedback about whether data is complete for requested timeframes

Proposed Solution: 3-State Sync Management

Implement intelligent sync state checking that categorizes data freshness relative to user queries:

State 1: Stale 🔴

When: last_sync < query_start_date Action: Auto-trigger sync, show "Syncing..." message, retry when complete UX: "Data is stale - syncing latest conversations..."

State 2: Partial 🟡

When: query_start_date <= last_sync < query_end_date Action: Proceed with warning about incomplete data UX: "Analysis includes conversations up to 2024-12-20 14:30 - may be missing recent conversations"

State 3: Fresh 🟢

When: last_sync >= (query_end_date - 5 minutes) Action: Proceed normally without warnings UX: Silent - optimal user experience

Implementation Details

Core Components

  1. Enhanced Sync Status API
def get_sync_status() -> Dict[str, Any]:
    """Return detailed sync metadata."""
    return {
        "last_sync": datetime,
        "total_conversations": int,
        "sync_in_progress": bool,
        "sync_coverage": {
            "start_date": datetime,
            "end_date": datetime
        }
    }
  1. Sync State Checker
def check_sync_state(start_date: datetime, end_date: datetime) -> SyncState:
    """Determine if data is fresh enough for query timeframe."""
    # Implementation of 3-state logic
  1. Non-blocking Background Sync
async def trigger_background_sync():
    """Truly async sync that doesn't block queries."""
    # Use asyncio.create_task() properly
    # Implement sync queue to prevent duplicate syncs

API Changes

Before:

search_conversations(query, limit) -> List[Conversation]

After:

search_conversations(query, limit) -> Dict[str, Any]:
    return {
        "conversations": List[Conversation],
        "sync_info": {
            "state": "fresh|partial|stale", 
            "last_sync": datetime,
            "message": Optional[str],
            "data_complete": bool
        }
    }

Benefits

  1. Transparent UX - Users know exactly what data they're getting
  2. Better Performance - No more blocking background syncs
  3. Automatic Recovery - Stale data triggers smart sync-and-retry
  4. Developer Friendly - Clear APIs for handling different data states

Implementation Priority

  • High Priority: Non-blocking background sync (fixes performance regression)
  • Medium Priority: Sync state checking and warnings
  • Nice to Have: Sync queue management and progress indicators

Backwards Compatibility

  • Existing APIs remain unchanged if sync_info is ignored
  • New functionality is opt-in via response structure
  • No breaking changes to current workflows

Real-World Impact

This enhancement would solve issues like:

  • 10-second delays from background syncs blocking requests
  • Users getting incomplete results without knowing why
  • Poor experience for time-sensitive queries ("show me today's issues")

Would you like to implement this enhancement? I can provide a working implementation or contribute via PR.