diff --git a/.gitignore b/.gitignore index b07033a86..641f4f03b 100644 --- a/.gitignore +++ b/.gitignore @@ -45,5 +45,3 @@ cobertura.xml # Build scripts artifacts *.log -/dash-spv-ffi/peer_reputation.json -/dash-spv/peer_reputation.json diff --git a/dash-spv/src/network/manager.rs b/dash-spv/src/network/manager.rs index c0dc87ff2..a268f8b71 100644 --- a/dash-spv/src/network/manager.rs +++ b/dash-spv/src/network/manager.rs @@ -22,12 +22,12 @@ use crate::error::{NetworkError, NetworkResult, SpvError as Error}; use crate::network::addrv2::AddrV2Handler; use crate::network::constants::*; use crate::network::discovery::DnsDiscovery; -use crate::network::persist::PeerStore; use crate::network::pool::PeerPool; use crate::network::reputation::{ misbehavior_scores, positive_scores, PeerReputationManager, ReputationAware, }; use crate::network::{HandshakeManager, NetworkManager, Peer}; +use crate::storage::{PeerStorage, PersistentPeerStorage, PersistentStorage}; use crate::types::PeerInfo; /// Peer network manager @@ -39,7 +39,7 @@ pub struct PeerNetworkManager { /// AddrV2 handler addrv2_handler: Arc, /// Peer persistence - peer_store: Arc, + peer_store: Arc, /// Peer reputation manager reputation_manager: Arc, /// Network type @@ -80,23 +80,12 @@ impl PeerNetworkManager { let discovery = DnsDiscovery::new().await?; let data_dir = config.storage_path.clone().unwrap_or_else(|| PathBuf::from(".")); - let peer_store = PeerStore::new(config.network, data_dir.clone()); - let reputation_manager = Arc::new(PeerReputationManager::new()); - - // Load reputation data if available - let reputation_path = data_dir.join("peer_reputation.json"); + let peer_store = PersistentPeerStorage::open(data_dir.clone()).await?; - // Ensure the directory exists before attempting to load - if let Some(parent_dir) = reputation_path.parent() { - if !parent_dir.exists() { - if let Err(e) = std::fs::create_dir_all(parent_dir) { - log::warn!("Failed to create directory for reputation data: {}", e); - } - } - } + let reputation_manager = Arc::new(PeerReputationManager::new()); - if let Err(e) = reputation_manager.load_from_storage(&reputation_path).await { + if let Err(e) = reputation_manager.load_from_storage(&peer_store).await { log::warn!("Failed to load peer reputation data: {}", e); } @@ -595,7 +584,6 @@ impl PeerNetworkManager { let reputation_manager = self.reputation_manager.clone(); let peer_search_started = self.peer_search_started.clone(); let initial_peers = self.initial_peers.clone(); - let data_dir = self.data_dir.clone(); let connected_peer_count = self.connected_peer_count.clone(); // Check if we're in exclusive mode (explicit flag or peers configured) @@ -750,8 +738,7 @@ impl PeerNetworkManager { } // Save reputation data periodically - let storage_path = data_dir.join("peer_reputation.json"); - if let Err(e) = reputation_manager.save_to_storage(&storage_path).await { + if let Err(e) = reputation_manager.save_to_storage(&peer_store).await { log::warn!("Failed to save reputation data: {}", e); } } @@ -1025,8 +1012,7 @@ impl PeerNetworkManager { } // Save reputation data before shutdown - let reputation_path = self.data_dir.join("peer_reputation.json"); - if let Err(e) = self.reputation_manager.save_to_storage(&reputation_path).await { + if let Err(e) = self.reputation_manager.save_to_storage(&self.peer_store).await { log::warn!("Failed to save reputation data on shutdown: {}", e); } diff --git a/dash-spv/src/network/mod.rs b/dash-spv/src/network/mod.rs index 89e8bde78..ff427d57a 100644 --- a/dash-spv/src/network/mod.rs +++ b/dash-spv/src/network/mod.rs @@ -6,7 +6,6 @@ pub mod discovery; pub mod handshake; pub mod manager; pub mod peer; -pub mod persist; pub mod pool; pub mod reputation; diff --git a/dash-spv/src/network/persist.rs b/dash-spv/src/network/persist.rs deleted file mode 100644 index 814eedeff..000000000 --- a/dash-spv/src/network/persist.rs +++ /dev/null @@ -1,159 +0,0 @@ -//! Peer persistence for saving and loading known peers - -use dashcore::Network; -use serde::{Deserialize, Serialize}; -use std::path::PathBuf; - -use crate::error::{SpvError as Error, StorageError}; -use crate::storage::io::atomic_write; - -/// Peer persistence for saving and loading known peer addresses -pub struct PeerStore { - network: Network, - path: PathBuf, -} - -#[derive(Serialize, Deserialize)] -struct SavedPeers { - version: u32, - network: String, - peers: Vec, -} - -#[derive(Serialize, Deserialize)] -struct SavedPeer { - address: String, - services: u64, - last_seen: u64, -} - -impl PeerStore { - /// Create a new peer store for the given network - pub fn new(network: Network, data_dir: PathBuf) -> Self { - let filename = format!("peers_{}.json", network); - let path = data_dir.join(filename); - - Self { - network, - path, - } - } - - /// Save peers to disk - pub async fn save_peers( - &self, - peers: &[dashcore::network::address::AddrV2Message], - ) -> Result<(), Error> { - let saved = SavedPeers { - version: 1, - network: format!("{:?}", self.network), - peers: peers - .iter() - .filter_map(|p| { - p.socket_addr().ok().map(|addr| SavedPeer { - address: addr.to_string(), - services: p.services.as_u64(), - last_seen: p.time as u64, - }) - }) - .collect(), - }; - - let json = serde_json::to_string_pretty(&saved) - .map_err(|e| Error::Storage(StorageError::Serialization(e.to_string())))?; - - atomic_write(&self.path, json.as_bytes()).await.map_err(Error::Storage)?; - - log::debug!("Saved {} peers to {:?}", saved.peers.len(), self.path); - Ok(()) - } - - /// Load peers from disk - pub async fn load_peers(&self) -> Result, Error> { - match tokio::fs::read_to_string(&self.path).await { - Ok(json) => { - let saved: SavedPeers = serde_json::from_str(&json).map_err(|e| { - Error::Storage(StorageError::Corruption(format!( - "Failed to parse peers file: {}", - e - ))) - })?; - - // Verify network matches - if saved.network != format!("{:?}", self.network) { - return Err(Error::Storage(StorageError::Corruption(format!( - "Peers file is for network {} but we are on {:?}", - saved.network, self.network - )))); - } - - let addresses: Vec<_> = - saved.peers.iter().filter_map(|p| p.address.parse().ok()).collect(); - - log::info!("Loaded {} peers from {:?}", addresses.len(), self.path); - Ok(addresses) - } - Err(e) if e.kind() == std::io::ErrorKind::NotFound => { - log::debug!("No saved peers file found at {:?}", self.path); - Ok(vec![]) - } - Err(e) => Err(Error::Storage(StorageError::ReadFailed(e.to_string()))), - } - } - - /// Delete the peers file - pub async fn clear(&self) -> Result<(), Error> { - match tokio::fs::remove_file(&self.path).await { - Ok(_) => { - log::info!("Cleared peer store at {:?}", self.path); - Ok(()) - } - Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()), - Err(e) => Err(Error::Storage(StorageError::WriteFailed(e.to_string()))), - } - } -} - -#[cfg(test)] -mod tests { - use super::*; - use dashcore::network::address::{AddrV2, AddrV2Message}; - use dashcore::network::constants::ServiceFlags; - use tempfile::TempDir; - - #[tokio::test] - async fn test_peer_store_save_load() { - let temp_dir = TempDir::new().expect("Failed to create temporary directory for test"); - let store = PeerStore::new(Network::Dash, temp_dir.path().to_path_buf()); - - // Create test peer messages - let addr: std::net::SocketAddr = - "192.168.1.1:9999".parse().expect("Failed to parse test address"); - let msg = AddrV2Message { - time: 1234567890, - services: ServiceFlags::from(1), - addr: AddrV2::Ipv4( - addr.ip().to_string().parse().expect("Failed to parse IPv4 address"), - ), - port: addr.port(), - }; - - // Save peers - store.save_peers(&[msg]).await.expect("Failed to save peers in test"); - - // Load peers - let loaded = store.load_peers().await.expect("Failed to load peers in test"); - assert_eq!(loaded.len(), 1); - assert_eq!(loaded[0], addr); - } - - #[tokio::test] - async fn test_peer_store_empty() { - let temp_dir = TempDir::new().expect("Failed to create temporary directory for test"); - let store = PeerStore::new(Network::Testnet, temp_dir.path().to_path_buf()); - - // Load from non-existent file - let loaded = store.load_peers().await.expect("Failed to load peers from empty store"); - assert!(loaded.is_empty()); - } -} diff --git a/dash-spv/src/network/reputation.rs b/dash-spv/src/network/reputation.rs index 87e6666f3..2c01e45e6 100644 --- a/dash-spv/src/network/reputation.rs +++ b/dash-spv/src/network/reputation.rs @@ -5,17 +5,14 @@ //! implements automatic banning for excessive misbehavior, and provides reputation //! decay over time for recovery. -use serde::{Deserialize, Serialize}; +use serde::{Deserialize, Deserializer, Serialize}; use std::collections::HashMap; use std::net::SocketAddr; use std::sync::Arc; use std::time::{Duration, Instant}; use tokio::sync::RwLock; -use crate::storage::io::atomic_write; - -/// Maximum misbehavior score before a peer is banned -const MAX_MISBEHAVIOR_SCORE: i32 = 100; +use crate::storage::{PeerStorage, PersistentPeerStorage}; /// Misbehavior score thresholds for different violations pub mod misbehavior_scores { @@ -80,22 +77,79 @@ const DECAY_INTERVAL: Duration = Duration::from_secs(60 * 60); // 1 hour /// Amount to decay reputation score per interval const DECAY_AMOUNT: i32 = 5; +/// Maximum misbehavior score before a peer is banned +const MAX_MISBEHAVIOR_SCORE: i32 = 100; + /// Minimum score (most positive reputation) -const MIN_SCORE: i32 = -50; +const MIN_MISBEHAVIOR_SCORE: i32 = -50; + +const MAX_BAN_COUNT: u32 = 1000; + +const MAX_ACTION_COUNT: u64 = 1_000_000; + +fn default_instant() -> Instant { + Instant::now() +} + +fn clamp_peer_score<'de, D>(deserializer: D) -> Result +where + D: Deserializer<'de>, +{ + let mut v = i32::deserialize(deserializer)?; + + if v < MIN_MISBEHAVIOR_SCORE { + log::warn!("Peer has invalid score {v}, clamping to min {MIN_MISBEHAVIOR_SCORE}"); + v = MIN_MISBEHAVIOR_SCORE + } else if v > MAX_MISBEHAVIOR_SCORE { + log::warn!("Peer has invalid score {v}, clamping to max {MAX_MISBEHAVIOR_SCORE}"); + v = MAX_MISBEHAVIOR_SCORE + } + + Ok(v) +} + +fn clamp_peer_ban_count<'de, D>(deserializer: D) -> Result +where + D: Deserializer<'de>, +{ + let mut v = u32::deserialize(deserializer)?; + + if v > MAX_BAN_COUNT { + log::warn!("Peer has excessive ban count {v}, clamping to {MAX_BAN_COUNT}"); + v = MAX_BAN_COUNT + } + + Ok(v) +} + +fn clamp_peer_connection_attempts<'de, D>(deserializer: D) -> Result +where + D: Deserializer<'de>, +{ + let mut v = u64::deserialize(deserializer)?; + + v = v.min(MAX_ACTION_COUNT); + + Ok(v) +} /// Peer reputation entry -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Serialize, Deserialize)] pub struct PeerReputation { /// Current misbehavior score + #[serde(deserialize_with = "clamp_peer_score")] pub score: i32, /// Number of times this peer has been banned + #[serde(deserialize_with = "clamp_peer_ban_count")] pub ban_count: u32, /// Time when the peer was banned (if currently banned) + #[serde(skip)] pub banned_until: Option, /// Last time the reputation was updated + #[serde(skip, default = "default_instant")] pub last_update: Instant, /// Total number of positive actions @@ -105,33 +159,24 @@ pub struct PeerReputation { pub negative_actions: u64, /// Connection count + #[serde(deserialize_with = "clamp_peer_connection_attempts")] pub connection_attempts: u64, /// Successful connection count pub successful_connections: u64, /// Last connection time + #[serde(skip)] pub last_connection: Option, } -// Custom serialization for PeerReputation -#[derive(Serialize, Deserialize)] -struct SerializedPeerReputation { - score: i32, - ban_count: u32, - positive_actions: u64, - negative_actions: u64, - connection_attempts: u64, - successful_connections: u64, -} - impl Default for PeerReputation { fn default() -> Self { Self { score: 0, ban_count: 0, banned_until: None, - last_update: Instant::now(), + last_update: default_instant(), positive_actions: 0, negative_actions: 0, connection_attempts: 0, @@ -171,7 +216,7 @@ impl PeerReputation { // Cap at a reasonable maximum to avoid excessive decay let intervals_i32 = intervals.min(i32::MAX as u64) as i32; let decay = intervals_i32.saturating_mul(DECAY_AMOUNT); - self.score = (self.score - decay).max(MIN_SCORE); + self.score = (self.score - decay).max(MIN_MISBEHAVIOR_SCORE); self.last_update = now; } @@ -235,7 +280,7 @@ impl PeerReputationManager { // Update score let old_score = reputation.score; reputation.score = - (reputation.score + score_change).clamp(MIN_SCORE, MAX_MISBEHAVIOR_SCORE); + (reputation.score + score_change).clamp(MIN_MISBEHAVIOR_SCORE, MAX_MISBEHAVIOR_SCORE); // Track positive/negative actions if score_change > 0 { @@ -406,114 +451,40 @@ impl PeerReputationManager { } /// Save reputation data to persistent storage - pub async fn save_to_storage(&self, path: &std::path::Path) -> std::io::Result<()> { + pub async fn save_to_storage(&self, storage: &PersistentPeerStorage) -> std::io::Result<()> { let reputations = self.reputations.read().await; - // Convert to serializable format - let data: Vec<(SocketAddr, SerializedPeerReputation)> = reputations - .iter() - .map(|(addr, rep)| { - let serialized = SerializedPeerReputation { - score: rep.score, - ban_count: rep.ban_count, - positive_actions: rep.positive_actions, - negative_actions: rep.negative_actions, - connection_attempts: rep.connection_attempts, - successful_connections: rep.successful_connections, - }; - (*addr, serialized) - }) - .collect(); - - let json = serde_json::to_string_pretty(&data)?; - atomic_write(path, json.as_bytes()).await.map_err(std::io::Error::other) + storage.save_peers_reputation(&reputations).await.map_err(std::io::Error::other) } /// Load reputation data from persistent storage - pub async fn load_from_storage(&self, path: &std::path::Path) -> std::io::Result<()> { - if !path.exists() { - return Ok(()); - } - - let json = tokio::fs::read_to_string(path).await?; - let data: Vec<(SocketAddr, SerializedPeerReputation)> = serde_json::from_str(&json)?; + pub async fn load_from_storage(&self, storage: &PersistentPeerStorage) -> std::io::Result<()> { + let data = storage.load_peers_reputation().await.map_err(std::io::Error::other)?; let mut reputations = self.reputations.write().await; let mut loaded_count = 0; let mut skipped_count = 0; - for (addr, serialized) in data { - // Validate score is within expected range - let score = if serialized.score < MIN_SCORE { - log::warn!( - "Peer {} has invalid score {} (below minimum), clamping to {}", - addr, - serialized.score, - MIN_SCORE - ); - MIN_SCORE - } else if serialized.score > MAX_MISBEHAVIOR_SCORE { - log::warn!( - "Peer {} has invalid score {} (above maximum), clamping to {}", - addr, - serialized.score, - MAX_MISBEHAVIOR_SCORE - ); - MAX_MISBEHAVIOR_SCORE - } else { - serialized.score - }; - - // Validate ban count is reasonable (max 1000 bans) - const MAX_BAN_COUNT: u32 = 1000; - let ban_count = if serialized.ban_count > MAX_BAN_COUNT { - log::warn!( - "Peer {} has excessive ban count {}, clamping to {}", - addr, - serialized.ban_count, - MAX_BAN_COUNT - ); - MAX_BAN_COUNT - } else { - serialized.ban_count - }; - - // Validate action counts are reasonable (max 1 million actions) - const MAX_ACTION_COUNT: u64 = 1_000_000; - let positive_actions = serialized.positive_actions.min(MAX_ACTION_COUNT); - let negative_actions = serialized.negative_actions.min(MAX_ACTION_COUNT); - let connection_attempts = serialized.connection_attempts.min(MAX_ACTION_COUNT); - let successful_connections = serialized.successful_connections.min(MAX_ACTION_COUNT); - + for (addr, mut reputation) in data { // Validate successful connections don't exceed attempts - let successful_connections = successful_connections.min(connection_attempts); + reputation.successful_connections = + reputation.successful_connections.min(reputation.connection_attempts); // Skip entry if data appears corrupted - if positive_actions == MAX_ACTION_COUNT || negative_actions == MAX_ACTION_COUNT { + if reputation.positive_actions > MAX_ACTION_COUNT + || reputation.negative_actions > MAX_ACTION_COUNT + { log::warn!("Skipping peer {} with potentially corrupted action counts", addr); skipped_count += 1; continue; } - let rep = PeerReputation { - score, - ban_count, - banned_until: None, - last_update: Instant::now(), - positive_actions, - negative_actions, - connection_attempts, - successful_connections, - last_connection: None, - }; - // Apply initial decay based on ban count - let mut rep = rep; - if rep.ban_count > 0 { - rep.score = rep.score.max(50); // Start with higher score for previously banned peers + if reputation.ban_count > 0 { + reputation.score = reputation.score.max(50); // Start with higher score for previously banned peers } - reputations.insert(addr, rep); + reputations.insert(addr, reputation); loaded_count += 1; } diff --git a/dash-spv/src/network/reputation_tests.rs b/dash-spv/src/network/reputation_tests.rs index 82c8453af..8ab6dffc1 100644 --- a/dash-spv/src/network/reputation_tests.rs +++ b/dash-spv/src/network/reputation_tests.rs @@ -2,6 +2,8 @@ #[cfg(test)] mod tests { + use crate::storage::PersistentStorage; + use super::super::*; use std::net::SocketAddr; @@ -61,11 +63,14 @@ mod tests { manager.update_reputation(peer2, 50, "Bad peer").await; // Save and load - let temp_file = tempfile::NamedTempFile::new().unwrap(); - manager.save_to_storage(temp_file.path()).await.unwrap(); + let temp_dir = tempfile::TempDir::new().unwrap(); + let peer_storage = PersistentPeerStorage::open(temp_dir.path()) + .await + .expect("Failed to open PersistentPeerStorage"); + manager.save_to_storage(&peer_storage).await.unwrap(); let new_manager = PeerReputationManager::new(); - new_manager.load_from_storage(temp_file.path()).await.unwrap(); + new_manager.load_from_storage(&peer_storage).await.unwrap(); // Verify scores were preserved assert_eq!(new_manager.get_score(&peer1).await, -10); diff --git a/dash-spv/src/storage/mod.rs b/dash-spv/src/storage/mod.rs index 79a61bc8b..8a052bbe3 100644 --- a/dash-spv/src/storage/mod.rs +++ b/dash-spv/src/storage/mod.rs @@ -1,15 +1,15 @@ //! Storage abstraction for the Dash SPV client. -pub(crate) mod io; - pub mod types; mod blocks; mod chainstate; mod filters; +mod io; mod lockfile; mod masternode; mod metadata; +mod peers; mod segments; mod transactions; @@ -40,6 +40,7 @@ pub use crate::storage::filters::FilterHeaderStorage; pub use crate::storage::filters::FilterStorage; pub use crate::storage::masternode::MasternodeStateStorage; pub use crate::storage::metadata::MetadataStorage; +pub use crate::storage::peers::{PeerStorage, PersistentPeerStorage}; pub use crate::storage::transactions::TransactionStorage; pub use types::*; @@ -85,6 +86,7 @@ pub struct DiskStorageManager { metadata: Arc>, chainstate: Arc>, masternodestate: Arc>, + peers: Arc>, // Background worker worker_handle: Option>, @@ -127,6 +129,7 @@ impl DiskStorageManager { masternodestate: Arc::new(RwLock::new( PersistentMasternodeStateStorage::open(&storage_path).await?, )), + peers: Arc::new(RwLock::new(PersistentPeerStorage::open(&storage_path).await?)), worker_handle: None, @@ -154,6 +157,7 @@ impl DiskStorageManager { let transactions = Arc::clone(&self.transactions); let metadata = Arc::clone(&self.metadata); let chainstate = Arc::clone(&self.chainstate); + let peers = Arc::clone(&self.peers); let storage_path = self.storage_path.clone(); @@ -169,6 +173,7 @@ impl DiskStorageManager { let _ = transactions.write().await.persist(&storage_path).await; let _ = metadata.write().await.persist(&storage_path).await; let _ = chainstate.write().await.persist(&storage_path).await; + let _ = peers.write().await.persist(&storage_path).await; } }); @@ -191,6 +196,7 @@ impl DiskStorageManager { let _ = self.transactions.write().await.persist(storage_path).await; let _ = self.metadata.write().await.persist(storage_path).await; let _ = self.chainstate.write().await.persist(storage_path).await; + let _ = self.peers.write().await.persist(storage_path).await; } } diff --git a/dash-spv/src/storage/peers.rs b/dash-spv/src/storage/peers.rs new file mode 100644 index 000000000..63e2a3dc3 --- /dev/null +++ b/dash-spv/src/storage/peers.rs @@ -0,0 +1,206 @@ +use std::{ + collections::HashMap, + fs::{self, File}, + io::BufReader, + net::SocketAddr, + path::PathBuf, +}; + +use async_trait::async_trait; +use dashcore::{ + consensus::{encode, Decodable, Encodable}, + network::address::AddrV2Message, +}; + +use crate::{ + error::StorageResult, + network::reputation::PeerReputation, + storage::{io::atomic_write, PersistentStorage}, + StorageError, +}; + +#[async_trait] +pub trait PeerStorage { + async fn save_peers( + &self, + peers: &[dashcore::network::address::AddrV2Message], + ) -> StorageResult<()>; + + async fn load_peers(&self) -> StorageResult>; + + async fn save_peers_reputation( + &self, + reputations: &HashMap, + ) -> StorageResult<()>; + + async fn load_peers_reputation(&self) -> StorageResult>; +} + +pub struct PersistentPeerStorage { + storage_path: PathBuf, +} + +impl PersistentPeerStorage { + const FOLDER_NAME: &str = "peers"; + + fn peers_data_file(&self) -> PathBuf { + self.storage_path.join("peers.dat") + } + + fn peers_reputation_file(&self) -> PathBuf { + self.storage_path.join("reputations.json") + } +} + +#[async_trait] +impl PersistentStorage for PersistentPeerStorage { + async fn open(storage_path: impl Into + Send) -> StorageResult { + let storage_path = storage_path.into(); + + Ok(PersistentPeerStorage { + storage_path: storage_path.join(Self::FOLDER_NAME), + }) + } + + async fn persist(&mut self, _storage_path: impl Into + Send) -> StorageResult<()> { + // Current implementation persists data everytime data is stored + Ok(()) + } +} + +#[async_trait] +impl PeerStorage for PersistentPeerStorage { + async fn save_peers( + &self, + peers: &[dashcore::network::address::AddrV2Message], + ) -> StorageResult<()> { + let peers_file = self.peers_data_file(); + + if let Err(e) = fs::create_dir_all(peers_file.parent().unwrap()) { + return Err(StorageError::WriteFailed(format!("Failed to persist peers: {}", e))); + } + + let mut buffer = Vec::new(); + + for item in peers.iter() { + item.consensus_encode(&mut buffer) + .map_err(|e| StorageError::WriteFailed(format!("Failed to encode peer: {}", e)))?; + } + + let peers_file_parent = peers_file + .parent() + .ok_or(StorageError::NotFound("peers_file doesn't have a parent".to_string()))?; + + tokio::fs::create_dir_all(peers_file_parent).await?; + + atomic_write(&peers_file, &buffer).await?; + + Ok(()) + } + + async fn load_peers(&self) -> StorageResult> { + let peers_file = self.peers_data_file(); + + if !peers_file.exists() { + return Ok(Vec::new()); + }; + + let file = File::open(&peers_file)?; + let mut reader = BufReader::new(file); + let mut peers = Vec::new(); + + loop { + match AddrV2Message::consensus_decode(&mut reader) { + Ok(peer) => peers.push(peer), + Err(encode::Error::Io(ref e)) if e.kind() == std::io::ErrorKind::UnexpectedEof => { + break + } + Err(e) => { + return Err(StorageError::ReadFailed(format!("Failed to decode peer: {e}"))) + } + } + } + + let peers = peers.into_iter().filter_map(|p| p.socket_addr().ok()).collect(); + + Ok(peers) + } + + async fn save_peers_reputation( + &self, + reputations: &HashMap, + ) -> StorageResult<()> { + let reputation_file = self.peers_reputation_file(); + + let json = serde_json::to_string_pretty(reputations).map_err(|e| { + StorageError::Serialization(format!("Failed to serialize peers reputations: {e}")) + })?; + + let reputation_file_parent = reputation_file + .parent() + .ok_or(StorageError::NotFound("reputation_file doesn't have a parent".to_string()))?; + + tokio::fs::create_dir_all(reputation_file_parent).await?; + + atomic_write(&reputation_file, json.as_bytes()).await + } + + async fn load_peers_reputation(&self) -> StorageResult> { + let reputation_file = self.peers_reputation_file(); + + if !reputation_file.exists() { + return Ok(HashMap::new()); + } + + let json = tokio::fs::read_to_string(reputation_file).await?; + serde_json::from_str(&json).map_err(|e| { + StorageError::ReadFailed(format!("Failed to deserialize peers reputations: {e}")) + }) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use dashcore::network::address::{AddrV2, AddrV2Message}; + use dashcore::network::constants::ServiceFlags; + use tempfile::TempDir; + + #[tokio::test] + async fn test_persistent_peer_storage_save_load() { + let temp_dir = TempDir::new().expect("Failed to create temporary directory for test"); + let store = PersistentPeerStorage::open(temp_dir.path()) + .await + .expect("Failed to open persistent peer storage"); + + // Create test peer messages + let addr: std::net::SocketAddr = + "192.168.1.1:9999".parse().expect("Failed to parse test address"); + let msg = AddrV2Message { + time: 1234567890, + services: ServiceFlags::from(1), + addr: AddrV2::Ipv4( + addr.ip().to_string().parse().expect("Failed to parse IPv4 address"), + ), + port: addr.port(), + }; + + store.save_peers(&[msg]).await.expect("Failed to save peers in test"); + + let loaded = store.load_peers().await.expect("Failed to load peers in test"); + assert_eq!(loaded.len(), 1); + assert_eq!(loaded[0], addr); + } + + #[tokio::test] + async fn test_persistent_peer_storage_empty() { + let temp_dir = TempDir::new().expect("Failed to create temporary directory for test"); + let store = PersistentPeerStorage::open(temp_dir.path()) + .await + .expect("Failed to open persistent peer storage"); + + // Load from non-existent file + let loaded = store.load_peers().await.expect("Failed to load peers from empty store"); + assert!(loaded.is_empty()); + } +} diff --git a/dash-spv/tests/handshake_test.rs b/dash-spv/tests/handshake_test.rs index d8cb6579f..0f5125992 100644 --- a/dash-spv/tests/handshake_test.rs +++ b/dash-spv/tests/handshake_test.rs @@ -72,7 +72,8 @@ async fn test_handshake_timeout() { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn test_network_manager_creation() { - let config = ClientConfig::new(Network::Dash); + let temp_dir = tempfile::TempDir::new().expect("Failed to create temporary directory"); + let config = ClientConfig::new(Network::Dash).with_storage_path(temp_dir.path().to_path_buf()); let network = PeerNetworkManager::new(&config).await; assert!(network.is_ok(), "Network manager creation should succeed"); diff --git a/dash-spv/tests/header_sync_test.rs b/dash-spv/tests/header_sync_test.rs index 3edf0aa94..668426dd5 100644 --- a/dash-spv/tests/header_sync_test.rs +++ b/dash-spv/tests/header_sync_test.rs @@ -261,9 +261,11 @@ async fn test_header_sync_performance() { #[tokio::test] async fn test_header_sync_with_client_integration() { let _ = env_logger::try_init(); + let temp_dir = tempfile::TempDir::new().expect("Failed to create temporary directory"); // Test header sync integration with the full client let config = ClientConfig::new(Network::Dash) + .with_storage_path(temp_dir.path().to_path_buf()) .with_validation_mode(ValidationMode::Basic) .with_connection_timeout(Duration::from_secs(10)); diff --git a/dash-spv/tests/wallet_integration_test.rs b/dash-spv/tests/wallet_integration_test.rs index 4109e7cbf..d61f0f2cb 100644 --- a/dash-spv/tests/wallet_integration_test.rs +++ b/dash-spv/tests/wallet_integration_test.rs @@ -15,7 +15,11 @@ use key_wallet_manager::wallet_manager::WalletManager; /// Create a test SPV client with memory storage for integration testing. async fn create_test_client( ) -> DashSpvClient, PeerNetworkManager, DiskStorageManager> { - let config = ClientConfig::testnet().without_filters().without_masternodes(); + let temp_dir = tempfile::TempDir::new().expect("Failed to create temporary directory"); + let config = ClientConfig::testnet() + .without_filters() + .without_masternodes() + .with_storage_path(temp_dir.path().to_path_buf()); // Create network manager let network_manager = PeerNetworkManager::new(&config).await.unwrap();