From a20d78428a8c7505098984c5c8c20ee57e881884 Mon Sep 17 00:00:00 2001 From: Ash Manning Date: Fri, 7 Nov 2025 22:48:40 +0800 Subject: [PATCH] P2P: always send/expect network magic bytes --- lib/net/mod.rs | 6 ++- lib/net/peer/error.rs | 4 ++ lib/net/peer/message.rs | 22 +++++++++- lib/net/peer/mod.rs | 97 +++++++++++++++++++++++++++++------------ lib/net/peer/task.rs | 7 ++- 5 files changed, 105 insertions(+), 31 deletions(-) diff --git a/lib/net/mod.rs b/lib/net/mod.rs index 0c64dea..3672a7d 100644 --- a/lib/net/mod.rs +++ b/lib/net/mod.rs @@ -186,6 +186,7 @@ const fn seed_node_addrs(network: Network) -> &'static [SocketAddr] { pub struct Net { pub server: Endpoint, archive: Archive, + network: Network, state: State, active_peers: Arc>>, // None indicates that the stream has ended @@ -280,6 +281,7 @@ impl Net { let connection_ctxt = PeerConnectionCtxt { env, archive: self.archive.clone(), + network: self.network, state: self.state.clone(), }; @@ -358,6 +360,7 @@ impl Net { let net = Net { server, archive, + network, state, active_peers, peer_info_tx, @@ -430,7 +433,7 @@ impl Net { remote_address, } })?; - Connection::from(raw_conn) + Connection::new(raw_conn, self.network) } None => { tracing::debug!("server endpoint closed"); @@ -462,6 +465,7 @@ impl Net { let connection_ctxt = PeerConnectionCtxt { env, archive: self.archive.clone(), + network: self.network, state: self.state.clone(), }; let (connection_handle, info_rx) = diff --git a/lib/net/peer/error.rs b/lib/net/peer/error.rs index 9680627..d369611 100644 --- a/lib/net/peer/error.rs +++ b/lib/net/peer/error.rs @@ -71,10 +71,14 @@ pub(in crate::net::peer) mod connection { #[derive(Debug, Error)] pub enum Receive { + #[error("received incorrect magic: {}", hex::encode(.0))] + BadMagic(crate::net::peer::message::MagicBytes), #[error("bincode error")] Bincode(#[from] bincode::Error), #[error("connection error")] Connection(#[from] quinn::ConnectionError), + #[error("failed to read magic bytes")] + ReadMagic(#[source] quinn::ReadExactError), #[error("read to end error")] ReadToEnd(#[from] quinn::ReadToEndError), } diff --git a/lib/net/peer/message.rs b/lib/net/peer/message.rs index 2f7401b..47e0a3f 100644 --- a/lib/net/peer/message.rs +++ b/lib/net/peer/message.rs @@ -7,9 +7,29 @@ use serde::{Deserialize, Serialize}; use crate::{ net::peer::{PeerState, PeerStateId}, - types::{AuthorizedTransaction, BlockHash, Body, Header, Tip, Txid}, + types::{ + AuthorizedTransaction, BlockHash, Body, Header, Network, Tip, Txid, + }, }; +pub const MAGIC_BYTES_LEN: usize = 4; + +pub type MagicBytes = [u8; MAGIC_BYTES_LEN]; + +pub const fn magic_bytes(network: Network) -> MagicBytes { + // First 4 bytes are the US-TTY (LSB Right) Baudot–Murray code for "THNDR". + // Rightmost bits of the 4th byte is the network identifier. + let b0 = 0b1000_0101; + let b1 = 0b0001_1000; + let b2 = 0b1001_0101; + let mut b3 = 0b0000_0000; + match network { + Network::Regtest => (), + Network::Signet => b3 |= 0b0000_0001, + } + [b0, b1, b2, b3] +} + #[derive(BorshSerialize, Clone, Debug, Deserialize, Serialize)] pub struct Heartbeat(pub PeerState); diff --git a/lib/net/peer/mod.rs b/lib/net/peer/mod.rs index 650d1bd..6be0cb2 100644 --- a/lib/net/peer/mod.rs +++ b/lib/net/peer/mod.rs @@ -18,7 +18,7 @@ use tokio::{spawn, task::JoinHandle, time::Duration}; use crate::{ archive::Archive, state::State, - types::{AuthorizedTransaction, Hash, Tip, Version, hash, schema}, + types::{AuthorizedTransaction, Hash, Network, Tip, Version, hash, schema}, }; mod channel_pool; @@ -140,6 +140,7 @@ where #[derive(Clone)] pub struct Connection { pub(in crate::net) inner: quinn::Connection, + pub network: Network, } impl Connection { @@ -154,14 +155,25 @@ impl Connection { self.inner.remote_address() } - pub async fn new( + pub fn new(connection: quinn::Connection, network: Network) -> Self { + Self { + inner: connection, + network, + } + } + + pub async fn from_connecting( connecting: quinn::Connecting, + network: Network, ) -> Result { let addr = connecting.remote_address(); tracing::trace!(%addr, "connecting to peer"); let connection = connecting.await?; tracing::info!(%addr, "connected successfully to peer"); - Ok(Self { inner: connection }) + Ok(Self { + inner: connection, + network, + }) } async fn receive_request( @@ -170,6 +182,15 @@ impl Connection { { let (tx, mut rx) = self.inner.accept_bi().await?; tracing::trace!(recv_id = %rx.id(), "Receiving request"); + let mut magic_bytes = [0u8; message::MAGIC_BYTES_LEN]; + rx.read_exact(&mut magic_bytes) + .await + .map_err(error::connection::Receive::ReadMagic)?; + if magic_bytes != message::magic_bytes(self.network) { + return Err( + error::connection::Receive::BadMagic(magic_bytes).into() + ); + } let msg_bytes = rx.read_to_end(Connection::READ_REQUEST_LIMIT).await?; let msg: RequestMessage = bincode::deserialize(&msg_bytes)?; tracing::trace!( @@ -191,8 +212,9 @@ impl Connection { "Sending heartbeat" ); let message = RequestMessageRef::from(heartbeat); - let message = bincode::serialize(&message)?; - send.write_all(&message).await.map_err(|err| { + let mut message_buf = message::magic_bytes(self.network).to_vec(); + bincode::serialize_into::<&mut Vec<_>, _>(&mut message_buf, &message)?; + send.write_all(&message_buf).await.map_err(|err| { error::connection::Send::Write { stream_id: send.id(), source: err, @@ -203,10 +225,20 @@ impl Connection { } async fn receive_response( + network: Network, mut recv: RecvStream, read_response_limit: NonZeroUsize, ) -> ResponseResult { tracing::trace!(recv_id = %recv.id(), "Receiving response"); + let mut magic_bytes = [0u8; message::MAGIC_BYTES_LEN]; + recv.read_exact(&mut magic_bytes) + .await + .map_err(error::connection::Receive::ReadMagic)?; + if magic_bytes != message::magic_bytes(network) { + return Err( + error::connection::Receive::BadMagic(magic_bytes).into() + ); + } let response_bytes = recv.read_to_end(read_response_limit.get()).await?; let response: ResponseMessage = bincode::deserialize(&response_bytes)?; @@ -230,18 +262,25 @@ impl Connection { "Sending request" ); let message = RequestMessageRef::from(request); - let message = bincode::serialize(&message)?; - send.write_all(&message).await.map_err(|err| { + let mut message_buf = message::magic_bytes(self.network).to_vec(); + bincode::serialize_into::<&mut Vec<_>, _>(&mut message_buf, &message)?; + send.write_all(&message_buf).await.map_err(|err| { error::connection::Send::Write { stream_id: send.id(), source: err, } })?; send.finish()?; - Ok(Self::receive_response(recv, read_response_limit).await) + Ok( + Self::receive_response(self.network, recv, read_response_limit) + .await, + ) } + // Send a pre-serialized response, where the response does not include + // magic bytes async fn send_serialized_response( + network: Network, mut response_tx: SendStream, serialized_response: &[u8], ) -> Result<(), error::connection::SendResponse> { @@ -249,21 +288,26 @@ impl Connection { send_id = %response_tx.id(), "Sending response" ); - response_tx - .write_all(serialized_response) - .await - .map_err(|err| { - { - error::connection::Send::Write { - stream_id: response_tx.id(), - source: err, - } + async { + response_tx + .write_all(&message::magic_bytes(network)) + .await?; + response_tx.write_all(serialized_response).await + } + .await + .map_err(|err| { + { + error::connection::Send::Write { + stream_id: response_tx.id(), + source: err, } - .into() - }) + } + .into() + }) } async fn send_response( + network: Network, mut response_tx: SendStream, response: ResponseMessage, ) -> Result<(), error::connection::SendResponse> { @@ -272,8 +316,9 @@ impl Connection { send_id = %response_tx.id(), "Sending response" ); - let response_bytes = bincode::serialize(&response)?; - response_tx.write_all(&response_bytes).await.map_err(|err| { + let mut message_buf = message::magic_bytes(network).to_vec(); + bincode::serialize_into::<&mut Vec<_>, _>(&mut message_buf, &response)?; + response_tx.write_all(&message_buf).await.map_err(|err| { { error::connection::Send::Write { stream_id: response_tx.id(), @@ -285,15 +330,10 @@ impl Connection { } } -impl From for Connection { - fn from(inner: quinn::Connection) -> Self { - Self { inner } - } -} - pub struct ConnectionContext { pub env: sneed::Env, pub archive: Archive, + pub network: Network, pub state: State, } @@ -427,7 +467,8 @@ pub fn connect( let status_repr = status_repr.clone(); let info_tx = info_tx.clone(); move || async move { - let connection = Connection::new(connecting).await?; + let connection = + Connection::from_connecting(connecting, ctxt.network).await?; status_repr.store( PeerConnectionStatus::Connected.as_repr(), atomic::Ordering::SeqCst, diff --git a/lib/net/peer/task.rs b/lib/net/peer/task.rs index bd180f9..acd9d5c 100644 --- a/lib/net/peer/task.rs +++ b/lib/net/peer/task.rs @@ -609,7 +609,8 @@ impl ConnectionTask { } (_, _) => ResponseMessage::NoBlock { block_hash }, }; - let () = Connection::send_response(response_tx, resp).await?; + let () = + Connection::send_response(ctxt.network, response_tx, resp).await?; Ok(()) } @@ -661,6 +662,7 @@ impl ConnectionTask { match validate_tx_result { Err(err) => { Connection::send_response( + ctxt.network, response_tx, ResponseMessage::TransactionRejected(txid), ) @@ -669,6 +671,7 @@ impl ConnectionTask { } Ok(_) => { Connection::send_response( + ctxt.network, response_tx, ResponseMessage::TransactionAccepted(txid), ) @@ -844,8 +847,10 @@ impl ConnectionTask { serialized_response, response_tx, }) => { + let network = ctxt.network; self.mailbox_tx.send_response_spawner.spawn(async move { Connection::send_serialized_response( + network, response_tx, &serialized_response, )