Skip to content

Implement Channel-based Message Routing System #63

@thep2p

Description

@thep2p

Overview

The Go implementation uses a channel-based system for routing different types of messages to appropriate processors. This provides clean separation of concerns and enables modular protocol handling. The Rust implementation needs a similar channel abstraction for organizing message flows.

Background

Reference: The Go implementation defines channels such as TestChannel and Consensus in its network layer.

Channels provide:

  • Message type segregation
  • Protocol modularity
  • Independent message processing
  • Clear routing semantics

Requirements

1. Channel Definition and Registry

use std::fmt;
use strum::{EnumString, Display, EnumIter};

/// Well-known channels for skip graph communication.
///
/// Each variant names one logical message flow. The strum derives give the enum a
/// string form (`Display` / `EnumString`) and an iterator over its variants
/// (`EnumIter`); `serialize_all = "kebab-case"` makes those string forms kebab-case.
///
/// NOTE(review): strum's `EnumIter` documents that data-carrying variants are produced
/// with `Default::default()` payloads, so iterating presumably yields `Custom(0)`, and
/// `Display`/`EnumString` presumably do not round-trip the `u16` id of `Custom` —
/// confirm against the strum version in use.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EnumString, Display, EnumIter)]
#[strum(serialize_all = "kebab-case")]
pub enum Channel {
    /// Test channel for development and testing
    Test,

    /// Skip graph search operations
    Search,

    /// Node join operations
    Join,

    /// Node leave operations
    Leave,

    /// Neighbor update messages
    NeighborUpdate,

    /// Heartbeat and health checks
    Heartbeat,

    /// Consensus protocol messages
    Consensus,

    /// Data synchronization
    Sync,

    /// Transaction propagation
    Transaction,

    /// Custom application channels, distinguished by a 16-bit identifier
    Custom(u16),
}

impl Channel {
    /// Returns every well-known (non-[`Channel::Custom`]) channel in declaration order.
    ///
    /// `EnumIter` materialises data-carrying variants with `Default::default()`, so a
    /// plain `Channel::iter()` would also yield the placeholder `Channel::Custom(0)`.
    /// Custom channels are application-defined rather than standard, so they are
    /// filtered out here.
    pub fn standard_channels() -> Vec<Channel> {
        use strum::IntoEnumIterator;
        Channel::iter()
            .filter(|channel| !matches!(channel, Channel::Custom(_)))
            .collect()
    }

    /// Returns `true` for the channels treated as system channels:
    /// `Join`, `Leave`, `NeighborUpdate` and `Heartbeat`.
    pub fn is_system(&self) -> bool {
        matches!(
            self,
            Channel::Join | Channel::Leave | Channel::NeighborUpdate | Channel::Heartbeat
        )
    }

    /// Returns the priority of this channel (higher = more important).
    ///
    /// The match is intentionally exhaustive (no `_` arm) so that adding a new variant
    /// forces an explicit priority decision at compile time.
    pub fn priority(&self) -> u8 {
        match self {
            Channel::Heartbeat => 10,
            Channel::Join | Channel::Leave => 8,
            Channel::NeighborUpdate => 7,
            Channel::Search | Channel::Consensus => 5,
            Channel::Sync => 4,
            Channel::Transaction => 3,
            Channel::Custom(_) => 2,
            Channel::Test => 1,
        }
    }
}

2. Channel Configuration

/// Tunable settings that apply to a single channel.
#[derive(Debug, Clone)]
pub struct ChannelConfig {
    /// Upper bound on the size of one message on this channel
    /// (the default is documented as 1MB, i.e. the unit is bytes).
    pub max_message_size: usize,

    /// How long a message stays valid, in seconds.
    pub message_ttl: u64,

    /// Turns message deduplication on or off.
    pub enable_dedup: bool,

    /// Capacity of the buffer holding incoming messages.
    pub buffer_size: usize,

    /// Optional rate limit in messages per second; `None` means unlimited.
    pub rate_limit: Option<u32>,

    /// Whether messages on this channel must be authenticated.
    pub require_auth: bool,
}

impl Default for ChannelConfig {
    /// Baseline settings: 1MB messages, 60 s TTL, a 1000-slot buffer, and
    /// deduplication, rate limiting and authentication all switched off.
    fn default() -> Self {
        const ONE_MEGABYTE: usize = 1 << 20;
        const DEFAULT_TTL_SECS: u64 = 60;
        const DEFAULT_BUFFER_SLOTS: usize = 1000;

        ChannelConfig {
            max_message_size: ONE_MEGABYTE,
            message_ttl: DEFAULT_TTL_SECS,
            buffer_size: DEFAULT_BUFFER_SLOTS,
            rate_limit: None,
            enable_dedup: false,
            require_auth: false,
        }
    }
}

/// Channel-specific configurations.
///
/// Holds explicit overrides for selected channels plus a fallback configuration that
/// is returned for every channel without an override.
pub struct ChannelRegistry {
    /// Explicit per-channel overrides.
    configs: HashMap<Channel, ChannelConfig>,

    /// Fallback for channels that have no entry in `configs`. It is owned by the
    /// registry so that `get_config` can hand out a reference tied to `&self` instead
    /// of a reference to a temporary (which does not compile).
    default_config: ChannelConfig,
}

impl ChannelRegistry {
    /// Builds a registry pre-populated with overrides for `Heartbeat` and
    /// `Transaction`; every other channel falls back to `ChannelConfig::default()`.
    pub fn new() -> Self {
        let mut configs = HashMap::new();

        // Channels whose traffic profile differs from the defaults get explicit overrides.
        // Heartbeats: small, short-lived messages with a small buffer.
        configs.insert(
            Channel::Heartbeat,
            ChannelConfig {
                max_message_size: 1024,
                message_ttl: 10,
                buffer_size: 100,
                ..Default::default()
            },
        );

        // Transactions: large payloads, deduplicated and rate limited.
        configs.insert(
            Channel::Transaction,
            ChannelConfig {
                max_message_size: 10 * 1024 * 1024, // 10MB
                enable_dedup: true,
                rate_limit: Some(1000),
                ..Default::default()
            },
        );

        Self {
            configs,
            default_config: ChannelConfig::default(),
        }
    }

    /// Returns the configuration for `channel`, or the registry's default
    /// configuration when the channel has no explicit override.
    pub fn get_config(&self, channel: Channel) -> &ChannelConfig {
        self.configs.get(&channel).unwrap_or(&self.default_config)
    }
}

impl Default for ChannelRegistry {
    /// Equivalent to [`ChannelRegistry::new`].
    fn default() -> Self {
        Self::new()
    }
}

3. Channel Router

use tokio::sync::mpsc;
use std::sync::Arc;

/// Routes messages to appropriate channel processors.
///
/// All shared state sits behind `Arc` so it can be handed to spawned tasks; the maps
/// and the metrics are additionally wrapped in an async `RwLock`.
pub struct ChannelRouter {
    /// The processor registered for each channel.
    processors: Arc<RwLock<HashMap<Channel, Arc<dyn MessageProcessor>>>>,
    /// Sending half of each channel's message queue, keyed by channel.
    channel_queues: Arc<RwLock<HashMap<Channel, mpsc::Sender<RoutedMessage>>>>,
    /// Source of per-channel configuration (buffer size, size limit, rate limit).
    registry: Arc<ChannelRegistry>,
    /// Routing/processing counters.
    metrics: Arc<RwLock<ChannelMetrics>>,
}

impl ChannelRouter {
    pub fn new(registry: Arc<ChannelRegistry>) -> Self {
        Self {
            processors: Arc::new(RwLock::new(HashMap::new())),
            channel_queues: Arc::new(RwLock::new(HashMap::new())),
            registry,
            metrics: Arc::new(RwLock::new(ChannelMetrics::default())),
        }
    }
    
    /// Register a processor for a channel
    pub async fn register_processor(
        &self,
        channel: Channel,
        processor: Arc<dyn MessageProcessor>,
    ) -> Result<(), ChannelError> {
        let mut processors = self.processors.write().await;
        
        if processors.contains_key(&channel) {
            return Err(ChannelError::ProcessorAlreadyRegistered(channel));
        }
        
        // Create channel queue
        let config = self.registry.get_config(channel);
        let (tx, mut rx) = mpsc::channel(config.buffer_size);
        
        // Spawn processor task
        let processor_clone = processor.clone();
        let metrics = self.metrics.clone();
        
        tokio::spawn(async move {
            while let Some(msg) = rx.recv().await {
                // Update metrics
                metrics.write().await.messages_processed += 1;
                
                // Process message
                processor_clone.process_incoming_message(
                    msg.channel,
                    msg.origin,
                    msg.message,
                ).await;
            }
        });
        
        processors.insert(channel, processor);
        self.channel_queues.write().await.insert(channel, tx);
        
        Ok(())
    }
    
    /// Route an incoming message
    pub async fn route_message(
        &self,
        channel: Channel,
        origin: Identifier,
        message: Box<dyn Message>,
    ) -> Result<(), ChannelError> {
        // Check channel config
        let config = self.registry.get_config(channel);
        
        // Validate message size
        if message.size() > config.max_message_size {
            return Err(ChannelError::MessageTooLarge);
        }
        
        // Apply rate limiting if configured
        if let Some(limit) = config.rate_limit {
            self.check_rate_limit(channel, limit).await?;
        }
        
        // Get channel queue
        let queues = self.channel_queues.read().await;
        let queue = queues.get(&channel)
            .ok_or(ChannelError::NoProcessor(channel))?;
        
        // Route message
        let routed = RoutedMessage {
            channel,
            origin,
            message,
            timestamp: SystemTime::now(),
        };
        
        queue.send(routed).await
            .map_err(|_| ChannelError::QueueFull(channel))?;
        
        // Update metrics
        self.metrics.write().await.messages_routed += 1;
        
        Ok(())
    }
}

4. Channel-aware Network Implementation

impl NetworkImpl {
    /// Process incoming network message
    async fn process_incoming(&self, data: Vec<u8>, from: SocketAddr) {
        // Deserialize envelope
        let envelope: MessageEnvelope = match bincode::deserialize(&data) {
            Ok(e) => e,
            Err(e) => {
                log::warn!("Failed to deserialize message from {}: {}", from, e);
                return;
            }
        };
        
        // Extract channel from envelope
        let channel = match Channel::from_str(&envelope.channel) {
            Ok(c) => c,
            Err(e) => {
                log::warn!("Unknown channel '{}' from {}", envelope.channel, from);
                return;
            }
        };
        
        // Route to appropriate processor
        if let Err(e) = self.router.route_message(
            channel,
            envelope.sender,
            Box::new(envelope.payload),
        ).await {
            log::warn!("Failed to route message on channel {:?}: {}", channel, e);
        }
    }
}

5. Channel Metrics and Monitoring

/// Router-wide counters plus a per-channel breakdown.
///
/// `Clone` is required because the router's accessors return owned snapshots of the
/// metrics (they call `.clone()` / `.cloned()` on the locked value).
#[derive(Debug, Default, Clone)]
pub struct ChannelMetrics {
    /// Number of messages routed.
    pub messages_routed: u64,
    /// Number of messages processed.
    pub messages_processed: u64,
    /// Number of messages dropped.
    pub messages_dropped: u64,
    /// Number of bytes received.
    pub bytes_received: u64,
    /// Number of bytes sent.
    pub bytes_sent: u64,
    /// Statistics broken down by channel.
    pub per_channel: HashMap<Channel, ChannelStats>,
}

/// Statistics for a single channel.
///
/// `Clone` is required both by `ChannelMetrics: Clone` and by accessors that return
/// an owned copy of one channel's statistics.
#[derive(Debug, Default, Clone)]
pub struct ChannelStats {
    /// Number of messages counted for the channel.
    pub message_count: u64,
    /// Number of bytes counted for the channel.
    pub byte_count: u64,
    /// Number of errors counted for the channel.
    pub error_count: u64,
    /// Time of the most recent activity, or `None` if there has been none.
    pub last_activity: Option<SystemTime>,
    /// Average latency (milliseconds, going by the field name).
    pub average_latency_ms: f64,
}

impl ChannelRouter {
    /// Returns an owned snapshot of the router's metrics for monitoring.
    pub async fn get_metrics(&self) -> ChannelMetrics {
        let snapshot = self.metrics.read().await;
        (*snapshot).clone()
    }

    /// Returns an owned copy of the statistics recorded for `channel`, or `None`
    /// when the channel has no entry yet.
    pub async fn get_channel_stats(&self, channel: Channel) -> Option<ChannelStats> {
        let snapshot = self.metrics.read().await;
        snapshot.per_channel.get(&channel).cloned()
    }
}

6. Usage Example

/// Wires the skip graph engines into the network's channel router.
///
/// Registration order is Search, Join, Leave, NeighborUpdate, Heartbeat; the first
/// failing registration aborts the setup and its error is returned.
async fn setup_skip_graph_channels(network: Arc<NetworkImpl>) -> Result<(), Box<dyn Error>> {
    let router = network.get_router();

    // Search
    let search = Arc::new(SearchEngine::new());
    router.register_processor(Channel::Search, search).await?;

    // Join and Leave are both served by one shared membership engine.
    let membership = Arc::new(MembershipEngine::new());
    router
        .register_processor(Channel::Join, membership.clone())
        .await?;
    router
        .register_processor(Channel::Leave, membership)
        .await?;

    // Neighbor updates
    let neighbors = Arc::new(NeighborEngine::new());
    router
        .register_processor(Channel::NeighborUpdate, neighbors)
        .await?;

    // Heartbeats
    let health = Arc::new(HealthEngine::new());
    router
        .register_processor(Channel::Heartbeat, health)
        .await?;

    Ok(())
}

/// Sends a search request to `target` through `conduit`.
///
/// The conduit is expected to be bound to the Search channel already, so no channel
/// is named here. Fails if encoding the request or sending the message fails.
async fn send_search_request(
    conduit: Arc<dyn Conduit>,
    target: Identifier,
    request: SearchRequest,
) -> Result<(), Box<dyn Error>> {
    let encoded = request.encode()?;
    conduit.send(target, Message::new(encoded)).await?;
    Ok(())
}

Benefits

  • Modularity: Clean separation between protocol components
  • Type Safety: Channels are strongly typed
  • Performance: Per-channel configuration and optimization
  • Monitoring: Channel-specific metrics and statistics
  • Security: Channel-specific authentication and rate limiting

Testing Requirements

  • Test channel registration
  • Test message routing
  • Test rate limiting
  • Test channel priorities
  • Test metrics collection
  • Test error handling

Dependencies

  • strum (enum utilities)
  • tokio (async channels)
  • bincode (serialization)

Priority

Medium - Important for clean architecture

Related Issues

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions