diff --git a/Cargo.lock b/Cargo.lock index 5b6a5a2a..0357b94b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5604,6 +5604,7 @@ dependencies = [ "libp2p", "pluto-build-proto", "pluto-cluster", + "pluto-crypto", "pluto-eth2api", "pluto-eth2util", "pluto-p2p", diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index 594b2d18..7e0d7539 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -27,6 +27,7 @@ thiserror.workspace = true tokio.workspace = true tokio-util.workspace = true tracing.workspace = true +pluto-crypto.workspace = true pluto-eth2util.workspace = true pluto-ssz.workspace = true ssz.workspace = true @@ -44,6 +45,7 @@ prost-types.workspace = true hex.workspace = true chrono.workspace = true test-case.workspace = true +pluto-crypto.workspace = true pluto-eth2util.workspace = true pluto-cluster.workspace = true pluto-p2p.workspace = true diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs index 5b44a216..5e69855f 100644 --- a/crates/core/src/lib.rs +++ b/crates/core/src/lib.rs @@ -29,6 +29,9 @@ pub mod parsigdb; /// DutyDB — in-memory store for unsigned duty data. pub mod dutydb; +/// SigAgg — threshold BLS signature aggregation. +pub mod sigagg; + mod parsigex_codec; // SSZ codec operates on compile-time-constant byte sizes and offsets. // Arithmetic is bounded and casts from `usize` to `u32` are safe because all diff --git a/crates/core/src/sigagg.rs b/crates/core/src/sigagg.rs new file mode 100644 index 00000000..53d5bd60 --- /dev/null +++ b/crates/core/src/sigagg.rs @@ -0,0 +1,847 @@ +//! SigAgg aggregates threshold partial BLS signatures into a single aggregated +//! signature ready to be broadcast to the beacon chain. + +use std::{collections::HashMap, future::Future, pin::Pin, sync::Arc}; + +use pluto_crypto::{blst_impl::BlstImpl, tbls::Tbls}; +use tracing::{debug, info_span}; + +use crate::{ + signeddata::{SignedDataError, VersionedAttestation}, + types::{Duty, ParSignedData, PubKey, Signature, SignedData}, +}; + +/// Error type for sigagg. +#[derive(Debug, thiserror::Error)] +pub enum SigAggError { + /// Threshold must be a positive integer. + #[error("invalid threshold")] + InvalidThreshold, + + /// Aggregate was called with an empty per-validator map. + #[error("empty partial signed data set")] + EmptySet, + + /// A validator entry has fewer partial signatures than the threshold. + #[error("validator {pubkey}: require threshold signatures")] + RequireThresholdSignatures { + /// The validator public key. + pubkey: PubKey, + }, + + /// After deduplicating by share index, fewer distinct signatures remain + /// than the threshold. + #[error("validator {pubkey}: number of partial signatures less than threshold")] + InsufficientDistinctSignatures { + /// The validator public key. + pubkey: PubKey, + }, + + /// Failed to extract the BLS signature bytes from a partial signed data. + #[error("validator {pubkey}: signature from core: {source}")] + SignatureFromCore { + /// The validator public key. + pubkey: PubKey, + /// The underlying error. + #[source] + source: SignedDataError, + }, + + /// Failed to inject the aggregated signature into the output signed data. + #[error("validator {pubkey}: set signature: {source}")] + SetSignature { + /// The validator public key. + pubkey: PubKey, + /// The underlying error. + #[source] + source: SignedDataError, + }, + + /// BLS threshold aggregation failed. + #[error("validator {pubkey}: threshold aggregate: {source}")] + ThresholdAggregate { + /// The validator public key. + pubkey: PubKey, + /// The underlying error. + #[source] + source: pluto_crypto::types::Error, + }, + + /// Share index does not fit in a u8. + #[error("validator {pubkey}: invalid share index: {idx}")] + InvalidShareIndex { + /// The validator public key. + pubkey: PubKey, + /// The out-of-range index value. + idx: u64, + }, +} + +/// Convenience alias for [`std::result::Result`] with [`SigAggError`]. +pub type Result = std::result::Result; + +/// Per-duty output: one aggregated [`SignedData`] per validator public key. +pub type AggSignedDataSet = HashMap>; + +/// Callback invoked after a successful threshold aggregation for a duty. +pub type AggSub = Arc< + dyn Fn(&Duty, &AggSignedDataSet) -> Pin> + Send>> + + Send + + Sync + + 'static, +>; + +/// Verify callback — checks the aggregated signature against the beacon chain. +pub type VerifyFn = Arc< + dyn Fn(&PubKey, &dyn SignedData) -> Pin> + Send>> + + Send + + Sync + + 'static, +>; + +/// Aggregates threshold partial BLS signatures into a single aggregated +/// signature per validator. +pub struct Aggregator { + threshold: u64, + verify_fn: VerifyFn, + subs: Vec, +} + +impl Aggregator { + /// Creates a new `Aggregator`. + /// + /// Returns an error if `threshold` is zero. + pub fn new(threshold: u64, verify_fn: VerifyFn) -> Result { + if threshold == 0 { + return Err(SigAggError::InvalidThreshold); + } + + Ok(Self { + threshold, + verify_fn, + subs: Vec::new(), + }) + } + + /// Registers a callback for aggregated signed duty data. + pub fn subscribe(&mut self, sub: AggSub) { + self.subs.push(sub); + } + + /// Aggregates the partially signed duty data for the set of DVs and + /// notifies all subscribers. + /// + /// If aggregation fails for any validator the entire call returns that + /// error immediately — no partial results are emitted. + pub async fn aggregate( + &self, + duty: &Duty, + set: &HashMap>, + ) -> Result<()> { + if set.is_empty() { + return Err(SigAggError::EmptySet); + } + + let mut output = AggSignedDataSet::new(); + + for (pubkey, par_sigs) in set { + let signed = self.aggregate_one(pubkey, par_sigs).await?; + output.insert(*pubkey, signed); + } + + debug!("Threshold aggregated partial signatures"); + + for sub in &self.subs { + sub(duty, &output).await?; + } + + Ok(()) + } + + async fn aggregate_one( + &self, + pubkey: &PubKey, + par_sigs: &[ParSignedData], + ) -> Result> { + if (par_sigs.len() as u64) < self.threshold { + return Err(SigAggError::RequireThresholdSignatures { pubkey: *pubkey }); + } + + // Deduplicate by share index; last writer wins (matches Go behaviour). + let mut bls_sigs: HashMap = HashMap::new(); + for par_sig in par_sigs { + let sig = + par_sig + .signed_data + .signature() + .map_err(|e| SigAggError::SignatureFromCore { + pubkey: *pubkey, + source: e, + })?; + bls_sigs.insert(par_sig.share_idx, sig); + } + + if (bls_sigs.len() as u64) < self.threshold { + return Err(SigAggError::InsufficientDistinctSignatures { pubkey: *pubkey }); + } + + let bls_map: HashMap = bls_sigs + .iter() + .map(|(idx, sig)| { + let idx_u8 = u8::try_from(*idx).map_err(|_| SigAggError::InvalidShareIndex { + pubkey: *pubkey, + idx: *idx, + })?; + Ok((idx_u8, *sig.as_ref())) + }) + .collect::>()?; + + let agg_bytes = info_span!("tbls.ThresholdAggregate") + .in_scope(|| BlstImpl.threshold_aggregate(&bls_map)) + .map_err(|e| SigAggError::ThresholdAggregate { + pubkey: *pubkey, + source: e, + })?; + + // Prefer a VersionedAttestation that has validator_index set — the local VC + // includes it, peers don't. Falling back to parSigs[0] is fine for all other + // duty types, and for attestations where no parSig carries a validator_index. + // All parSigs share the same unsigned payload (guaranteed by consensus), so + // any one works as a template. + let template = par_sigs + .iter() + .find_map(|ps| { + let att = ps + .signed_data + .as_any() + .downcast_ref::()?; + att.0.validator_index?; // return an error if validator_index is not set + Some(ps.signed_data.as_ref()) + }) + .unwrap_or_else(|| par_sigs[0].signed_data.as_ref()); + + let agg_signed = template + .set_signature_boxed(Signature::new(agg_bytes)) + .map_err(|e| SigAggError::SetSignature { + pubkey: *pubkey, + source: e, + })?; + + (self.verify_fn)(pubkey, agg_signed.as_ref()).await?; + + Ok(agg_signed) + } +} + +/// Returns a [`VerifyFn`] that verifies the aggregated signature against the +/// beacon chain. +/// +/// TODO: implement once `Eth2SignedData` and beacon-client verification are +/// ported (`core::types` has a placeholder — see types.rs TODO for +/// `Eth2SignedData`). For now callers can use a no-op or BLS-only verifier. +pub fn new_verifier() -> VerifyFn { + Arc::new(|_, _| Box::pin(async { Ok(()) })) +} + +#[cfg(test)] +mod tests { + use std::{fs, sync::Mutex}; + + use pluto_crypto::{blst_impl::BlstImpl, tbls::Tbls}; + + use super::*; + use crate::{ + signeddata::{ + SignedDataError, SignedRandao, SignedVoluntaryExit, VersionedSignedProposal, + VersionedSignedValidatorRegistration, + }, + types::Signature as CoreSig, + }; + + fn noop_verify() -> VerifyFn { + Arc::new(|_, _| Box::pin(async { Ok(()) })) + } + + #[derive(Debug, Clone, PartialEq, Eq)] + struct MockSignedData { + sig: [u8; 96], + } + + impl SignedData for MockSignedData { + fn signature(&self) -> std::result::Result { + Ok(CoreSig::new(self.sig)) + } + + fn set_signature(&self, sig: CoreSig) -> std::result::Result + where + Self: Sized, + { + Ok(Self { sig: *sig.as_ref() }) + } + + fn set_signature_boxed( + &self, + signature: CoreSig, + ) -> std::result::Result, SignedDataError> { + Ok(Box::new(self.set_signature(signature)?)) + } + + fn message_root(&self) -> std::result::Result<[u8; 32], SignedDataError> { + Ok([0u8; 32]) + } + } + + #[derive(Debug, Clone, PartialEq, Eq)] + struct FailSignatureMock; + + impl SignedData for FailSignatureMock { + fn signature(&self) -> std::result::Result { + Err(SignedDataError::UnknownType) + } + + fn set_signature(&self, _: CoreSig) -> std::result::Result + where + Self: Sized, + { + Ok(Self) + } + + fn set_signature_boxed( + &self, + sig: CoreSig, + ) -> std::result::Result, SignedDataError> { + Ok(Box::new(self.set_signature(sig)?)) + } + + fn message_root(&self) -> std::result::Result<[u8; 32], SignedDataError> { + Ok([0u8; 32]) + } + } + + #[derive(Debug, Clone, PartialEq, Eq)] + struct FailSetSignatureMock { + sig: [u8; 96], + } + + impl SignedData for FailSetSignatureMock { + fn signature(&self) -> std::result::Result { + Ok(CoreSig::new(self.sig)) + } + + fn set_signature(&self, _: CoreSig) -> std::result::Result + where + Self: Sized, + { + Err(SignedDataError::UnknownType) + } + + fn set_signature_boxed( + &self, + _: CoreSig, + ) -> std::result::Result, SignedDataError> { + Err(SignedDataError::UnknownType) + } + + fn message_root(&self) -> std::result::Result<[u8; 32], SignedDataError> { + Ok([0u8; 32]) + } + } + + fn mock_par_sigs(count: usize, share_idx: u64) -> Vec { + (0..count) + .map(|_| ParSignedData::new(MockSignedData { sig: [0u8; 96] }, share_idx)) + .collect() + } + + struct BLSContext { + pubkey: [u8; 48], + sigs: Vec<(u8, [u8; 96])>, + expected_agg: [u8; 96], + } + + fn make_bls_context() -> BLSContext { + const THRESHOLD: u64 = 3; + const PEERS: u8 = 4; + const MSG: [u8; 32] = [42u8; 32]; + + let tbls = BlstImpl; + let mut rng = rand::thread_rng(); + let secret = tbls.generate_secret_key(&mut rng).unwrap(); + let pubkey = tbls.secret_to_public_key(&secret).unwrap(); + let shares = tbls + .threshold_split(&secret, PEERS, u8::try_from(THRESHOLD).unwrap()) + .unwrap(); + + let mut bls_map: HashMap = HashMap::new(); + let mut sigs = Vec::new(); + for (share_idx, share) in &shares { + let sig = tbls.sign(share, &MSG).unwrap(); + bls_map.insert(*share_idx, sig); + sigs.push((*share_idx, sig)); + } + + BLSContext { + pubkey, + sigs, + expected_agg: tbls.threshold_aggregate(&bls_map).unwrap(), + } + } + + async fn assert_aggregates( + pubkey: [u8; 48], + par_sigs: Vec, + expected_agg: [u8; 96], + duty: &Duty, + ) { + let received: Arc>> = Arc::new(Mutex::new(None)); + let received_clone = received.clone(); + + let mut agg = Aggregator::new(3, noop_verify()).unwrap(); + agg.subscribe(Arc::new(move |_, set: &AggSignedDataSet| { + let received_clone = received_clone.clone(); + let sig = set.values().next().unwrap().signature().unwrap(); + Box::pin(async move { + *received_clone.lock().unwrap() = Some(sig); + Ok(()) + }) + })); + + let mut set = HashMap::new(); + set.insert(PubKey::new(pubkey), par_sigs); + agg.aggregate(duty, &set).await.unwrap(); + + let received_sig = received.lock().unwrap().take().unwrap(); + assert_eq!(*received_sig.as_ref(), expected_agg); + } + + async fn run_aggregation_test(template: &dyn SignedData, duty: &Duty) { + let ctx = make_bls_context(); + let par_sigs = ctx + .sigs + .iter() + .map(|(idx, sig)| { + let signed = template.set_signature_boxed(CoreSig::new(*sig)).unwrap(); + ParSignedData::new_boxed(signed, u64::from(*idx)) + }) + .collect(); + assert_aggregates(ctx.pubkey, par_sigs, ctx.expected_agg, duty).await; + } + + #[test] + fn invalid_threshold() { + let result = Aggregator::new(0, noop_verify()); + let Err(err) = result else { + panic!("expected error") + }; + assert!(matches!(err, SigAggError::InvalidThreshold)); + assert_eq!(err.to_string(), "invalid threshold"); + } + + #[tokio::test] + async fn require_threshold_signatures() { + let agg = Aggregator::new(3, noop_verify()).unwrap(); + let mut set = HashMap::new(); + set.insert(PubKey::new([0u8; 48]), vec![]); + let err = agg + .aggregate(&Duty::new_attester_duty(1.into()), &set) + .await + .unwrap_err(); + assert!(matches!( + err, + SigAggError::RequireThresholdSignatures { .. } + )); + assert_eq!( + err.to_string(), + format!( + "validator 0x{}: require threshold signatures", + "00".repeat(48) + ) + ); + } + + #[tokio::test] + async fn aggregate_attester() { + let ctx = make_bls_context(); + let par_sigs = ctx + .sigs + .iter() + .map(|(idx, sig)| ParSignedData::new(MockSignedData { sig: *sig }, u64::from(*idx))) + .collect(); + assert_aggregates( + ctx.pubkey, + par_sigs, + ctx.expected_agg, + &Duty::new_attester_duty(1.into()), + ) + .await; + } + + #[tokio::test] + async fn insufficient_distinct_signatures() { + // 4 parSigs all with the same share_idx → deduplicates to 1, below threshold 3. + let agg = Aggregator::new(3, noop_verify()).unwrap(); + let mut set = HashMap::new(); + set.insert(PubKey::new([0u8; 48]), mock_par_sigs(4, 0)); + let err = agg + .aggregate(&Duty::new_attester_duty(1.into()), &set) + .await + .unwrap_err(); + assert!(matches!( + err, + SigAggError::InsufficientDistinctSignatures { .. } + )); + assert_eq!( + err.to_string(), + format!( + "validator 0x{}: number of partial signatures less than threshold", + "00".repeat(48) + ) + ); + } + + #[tokio::test] + async fn empty_set() { + let agg = Aggregator::new(3, noop_verify()).unwrap(); + let err = agg + .aggregate(&Duty::new_attester_duty(1.into()), &HashMap::new()) + .await + .unwrap_err(); + assert!(matches!(err, SigAggError::EmptySet)); + assert_eq!(err.to_string(), "empty partial signed data set"); + } + + #[tokio::test] + async fn multiple_subscribers_all_notified() { + const THRESHOLD: u64 = 3; + const PEERS: u8 = 4; + + let tbls = BlstImpl; + let mut rng = rand::thread_rng(); + + let secret = tbls.generate_secret_key(&mut rng).unwrap(); + let pubkey = tbls.secret_to_public_key(&secret).unwrap(); + let shares = tbls + .threshold_split(&secret, PEERS, u8::try_from(THRESHOLD).unwrap()) + .unwrap(); + + let msg = [7u8; 32]; + let mut par_sigs = Vec::new(); + for (share_idx, share) in &shares { + let sig = tbls.sign(share, &msg).unwrap(); + par_sigs.push(ParSignedData::new( + MockSignedData { sig }, + u64::from(*share_idx), + )); + } + + let mut agg = Aggregator::new(THRESHOLD, noop_verify()).unwrap(); + + let count: Arc> = Arc::new(Mutex::new(0)); + + for _ in 0..3 { + let count = count.clone(); + agg.subscribe(Arc::new(move |_, _| { + let count = count.clone(); + Box::pin(async move { + *count.lock().unwrap() += 1; + Ok(()) + }) + })); + } + + let mut set = HashMap::new(); + set.insert(PubKey::new(pubkey), par_sigs); + agg.aggregate(&Duty::new_attester_duty(1.into()), &set) + .await + .unwrap(); + + assert_eq!(*count.lock().unwrap(), 3); + } + + #[tokio::test] + async fn deduplication_succeeds() { + // 5 parSigs with 4 distinct share indices (one duplicate) at threshold 3 → + // success. + let ctx = make_bls_context(); + let mut par_sigs: Vec = ctx + .sigs + .iter() + .map(|(idx, sig)| ParSignedData::new(MockSignedData { sig: *sig }, u64::from(*idx))) + .collect(); + + // Add a duplicate of the first share — last writer wins, same sig so result + // identical. + let (first_idx, first_sig) = ctx.sigs[0]; + par_sigs.push(ParSignedData::new( + MockSignedData { sig: first_sig }, + u64::from(first_idx), + )); + + assert_aggregates( + ctx.pubkey, + par_sigs, + ctx.expected_agg, + &Duty::new_attester_duty(1.into()), + ) + .await; + } + + fn fixture_path(name: &str) -> std::path::PathBuf { + std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR")) + .join("testdata") + .join("signeddata") + .join(name) + } + + #[tokio::test] + async fn aggregate_randao() { + let json = fs::read_to_string(fixture_path( + "TestJSONSerialisation_SignedRandao.json.golden", + )) + .unwrap(); + let template: SignedRandao = serde_json::from_str(&json).unwrap(); + run_aggregation_test(&template, &Duty::new_randao_duty(1.into())).await; + } + + #[tokio::test] + async fn aggregate_exit() { + let json = fs::read_to_string(fixture_path( + "TestJSONSerialisation_SignedVoluntaryExit.json.golden", + )) + .unwrap(); + let template: SignedVoluntaryExit = serde_json::from_str(&json).unwrap(); + run_aggregation_test(&template, &Duty::new_voluntary_exit_duty(1.into())).await; + } + + #[tokio::test] + async fn aggregate_proposer() { + let json = fs::read_to_string(fixture_path( + "TestJSONSerialisation_VersionedSignedProposal.json.golden", + )) + .unwrap(); + let template: VersionedSignedProposal = serde_json::from_str(&json).unwrap(); + run_aggregation_test(&template, &Duty::new_proposer_duty(1.into())).await; + } + + #[tokio::test] + async fn aggregate_builder_proposer() { + let json = fs::read_to_string(fixture_path( + "TestJSONSerialisation_VersionedSignedProposal.json#01.golden", + )) + .unwrap(); + let template: VersionedSignedProposal = serde_json::from_str(&json).unwrap(); + run_aggregation_test(&template, &Duty::new_builder_proposer_duty(1.into())).await; + } + + #[tokio::test] + async fn aggregate_builder_registration() { + let json = fs::read_to_string(fixture_path("VersionedSignedValidatorRegistration.v1.json")) + .unwrap(); + let template: VersionedSignedValidatorRegistration = serde_json::from_str(&json).unwrap(); + run_aggregation_test(&template, &Duty::new_builder_registration_duty(1.into())).await; + } + + #[tokio::test] + async fn multiple_validators() { + // Two independent validators aggregated in a single aggregate() call. + const THRESHOLD: u64 = 3; + const PEERS: u8 = 4; + + let tbls = BlstImpl; + let mut rng = rand::thread_rng(); + let msg = [55u8; 32]; + + let mut agg_set: HashMap> = HashMap::new(); + let mut expected: HashMap = HashMap::new(); + + for _ in 0..2 { + let secret = tbls.generate_secret_key(&mut rng).unwrap(); + let pubkey_bytes = tbls.secret_to_public_key(&secret).unwrap(); + let shares = tbls + .threshold_split(&secret, PEERS, u8::try_from(THRESHOLD).unwrap()) + .unwrap(); + + let mut par_sigs = Vec::new(); + let mut bls_map: HashMap = HashMap::new(); + for (share_idx, share) in &shares { + let sig = tbls.sign(share, &msg).unwrap(); + bls_map.insert(*share_idx, sig); + par_sigs.push(ParSignedData::new( + MockSignedData { sig }, + u64::from(*share_idx), + )); + } + + let agg_sig = tbls.threshold_aggregate(&bls_map).unwrap(); + let pubkey = PubKey::new(pubkey_bytes); + expected.insert(pubkey, agg_sig); + agg_set.insert(pubkey, par_sigs); + } + + let received: Arc>> = Arc::new(Mutex::new(HashMap::new())); + let received_clone = received.clone(); + + let mut agg = Aggregator::new(THRESHOLD, noop_verify()).unwrap(); + agg.subscribe(Arc::new(move |_, set: &AggSignedDataSet| { + let received_clone = received_clone.clone(); + let sigs: HashMap = set + .iter() + .map(|(k, v)| (*k, v.signature().unwrap())) + .collect(); + Box::pin(async move { + received_clone.lock().unwrap().extend(sigs); + Ok(()) + }) + })); + + agg.aggregate(&Duty::new_attester_duty(1.into()), &agg_set) + .await + .unwrap(); + + let received = received.lock().unwrap(); + assert_eq!(received.len(), 2); + for (pubkey, exp_bytes) in &expected { + let got = &received[pubkey]; + assert_eq!(*got.as_ref(), *exp_bytes); + } + } + + #[tokio::test] + async fn verify_fn_error() { + let ctx = make_bls_context(); + let par_sigs: Vec = ctx + .sigs + .iter() + .map(|(idx, sig)| ParSignedData::new(MockSignedData { sig: *sig }, u64::from(*idx))) + .collect(); + + let fail_verify: VerifyFn = + Arc::new(|_, _| Box::pin(async { Err(SigAggError::InvalidThreshold) })); + let agg = Aggregator::new(3, fail_verify).unwrap(); + let mut set = HashMap::new(); + set.insert(PubKey::new(ctx.pubkey), par_sigs); + let err = agg + .aggregate(&Duty::new_attester_duty(1.into()), &set) + .await + .unwrap_err(); + assert!(matches!(err, SigAggError::InvalidThreshold)); + } + + #[tokio::test] + async fn subscriber_error() { + let ctx = make_bls_context(); + let par_sigs: Vec = ctx + .sigs + .iter() + .map(|(idx, sig)| ParSignedData::new(MockSignedData { sig: *sig }, u64::from(*idx))) + .collect(); + + let mut agg = Aggregator::new(3, noop_verify()).unwrap(); + agg.subscribe(Arc::new(|_, _| { + Box::pin(async { Err(SigAggError::InvalidThreshold) }) + })); + let mut set = HashMap::new(); + set.insert(PubKey::new(ctx.pubkey), par_sigs); + let err = agg + .aggregate(&Duty::new_attester_duty(1.into()), &set) + .await + .unwrap_err(); + assert!(matches!(err, SigAggError::InvalidThreshold)); + } + + #[tokio::test] + async fn signature_from_core_error() { + let agg = Aggregator::new(3, noop_verify()).unwrap(); + let par_sigs: Vec = (0..3u64) + .map(|i| ParSignedData::new(FailSignatureMock, i)) + .collect(); + let mut set = HashMap::new(); + set.insert(PubKey::new([1u8; 48]), par_sigs); + let err = agg + .aggregate(&Duty::new_attester_duty(1.into()), &set) + .await + .unwrap_err(); + assert!(matches!(err, SigAggError::SignatureFromCore { .. })); + } + + #[tokio::test] + async fn set_signature_error() { + let ctx = make_bls_context(); + let par_sigs: Vec = ctx + .sigs + .iter() + .map(|(idx, sig)| { + ParSignedData::new(FailSetSignatureMock { sig: *sig }, u64::from(*idx)) + }) + .collect(); + + let agg = Aggregator::new(3, noop_verify()).unwrap(); + let mut set = HashMap::new(); + set.insert(PubKey::new(ctx.pubkey), par_sigs); + let err = agg + .aggregate(&Duty::new_attester_duty(1.into()), &set) + .await + .unwrap_err(); + assert!(matches!(err, SigAggError::SetSignature { .. })); + } + + #[tokio::test] + async fn versioned_attestation_validator_index_preference() { + let json = fs::read_to_string(fixture_path( + "TestJSONSerialisation_VersionedAttestation.json.golden", + )) + .unwrap(); + let with_idx: VersionedAttestation = serde_json::from_str(&json).unwrap(); + assert!( + with_idx.0.validator_index.is_some(), + "fixture must carry validator_index" + ); + + let mut inner_no_idx = with_idx.0.clone(); + inner_no_idx.validator_index = None; + let without_idx = VersionedAttestation::new(inner_no_idx).unwrap(); + + let ctx = make_bls_context(); + // First par_sig has no validator_index; second has it — template must prefer + // the latter. + let par_sigs: Vec = ctx + .sigs + .iter() + .enumerate() + .map(|(i, (idx, sig))| { + let template: &dyn SignedData = if i == 0 { &without_idx } else { &with_idx }; + let signed = template.set_signature_boxed(CoreSig::new(*sig)).unwrap(); + ParSignedData::new_boxed(signed, u64::from(*idx)) + }) + .collect(); + + let captured: Arc>>> = Arc::new(Mutex::new(None)); + let captured_clone = captured.clone(); + + let mut agg = Aggregator::new(3, noop_verify()).unwrap(); + agg.subscribe(Arc::new(move |_, set: &AggSignedDataSet| { + let captured_clone = captured_clone.clone(); + let output = set.values().next().unwrap().clone(); + Box::pin(async move { + *captured_clone.lock().unwrap() = Some(output); + Ok(()) + }) + })); + + let mut set = HashMap::new(); + set.insert(PubKey::new(ctx.pubkey), par_sigs); + agg.aggregate(&Duty::new_attester_duty(1.into()), &set) + .await + .unwrap(); + + let output = captured.lock().unwrap().take().unwrap(); + let att = output + .as_any() + .downcast_ref::() + .expect("output must be VersionedAttestation"); + assert!( + att.0.validator_index.is_some(), + "output must preserve validator_index from template" + ); + } +} diff --git a/crates/core/src/signeddata.rs b/crates/core/src/signeddata.rs index f08b2145..288ebca3 100644 --- a/crates/core/src/signeddata.rs +++ b/crates/core/src/signeddata.rs @@ -151,6 +151,13 @@ impl SignedData for Signature { Ok(signature) } + fn set_signature_boxed( + &self, + signature: Signature, + ) -> Result, SignedDataError> { + Ok(Box::new(self.set_signature(signature)?)) + } + fn message_root(&self) -> Result<[u8; 32], SignedDataError> { Err(SignedDataError::UnsupportedSignatureMessageRoot) } @@ -259,6 +266,13 @@ impl SignedData for VersionedSignedProposal { Ok(out) } + fn set_signature_boxed( + &self, + signature: Signature, + ) -> Result, SignedDataError> { + Ok(Box::new(self.set_signature(signature)?)) + } + fn message_root(&self) -> Result<[u8; 32], SignedDataError> { let proposal = &self.0; if proposal.version == versioned::DataVersion::Unknown { @@ -397,6 +411,13 @@ impl SignedData for Attestation { Ok(out) } + fn set_signature_boxed( + &self, + signature: Signature, + ) -> Result, SignedDataError> { + Ok(Box::new(self.set_signature(signature)?)) + } + fn message_root(&self) -> Result<[u8; 32], SignedDataError> { Ok(hash_root(&self.0.data)) } @@ -475,6 +496,13 @@ impl SignedData for VersionedAttestation { Ok(out) } + fn set_signature_boxed( + &self, + signature: Signature, + ) -> Result, SignedDataError> { + Ok(Box::new(self.set_signature(signature)?)) + } + fn message_root(&self) -> Result<[u8; 32], SignedDataError> { let version = self.0.version; if version == versioned::DataVersion::Unknown { @@ -590,6 +618,13 @@ impl SignedData for SignedVoluntaryExit { Ok(out) } + fn set_signature_boxed( + &self, + signature: Signature, + ) -> Result, SignedDataError> { + Ok(Box::new(self.set_signature(signature)?)) + } + fn message_root(&self) -> Result<[u8; 32], SignedDataError> { Ok(self.0.message_root()) } @@ -672,6 +707,13 @@ impl SignedData for VersionedSignedValidatorRegistration { Ok(out) } + fn set_signature_boxed( + &self, + signature: Signature, + ) -> Result, SignedDataError> { + Ok(Box::new(self.set_signature(signature)?)) + } + fn message_root(&self) -> Result<[u8; 32], SignedDataError> { match self.0.version { versioned::BuilderVersion::V1 => self @@ -748,6 +790,13 @@ impl SignedData for SignedRandao { Ok(out) } + fn set_signature_boxed( + &self, + signature: Signature, + ) -> Result, SignedDataError> { + Ok(Box::new(self.set_signature(signature)?)) + } + fn message_root(&self) -> Result<[u8; 32], SignedDataError> { Ok(self.0.message_root()) } @@ -791,6 +840,13 @@ impl SignedData for BeaconCommitteeSelection { Ok(out) } + fn set_signature_boxed( + &self, + signature: Signature, + ) -> Result, SignedDataError> { + Ok(Box::new(self.set_signature(signature)?)) + } + fn message_root(&self) -> Result<[u8; 32], SignedDataError> { Ok(self.0.message_root()) } @@ -827,6 +883,13 @@ impl SignedData for SyncCommitteeSelection { Ok(out) } + fn set_signature_boxed( + &self, + signature: Signature, + ) -> Result, SignedDataError> { + Ok(Box::new(self.set_signature(signature)?)) + } + fn message_root(&self) -> Result<[u8; 32], SignedDataError> { Ok(self.0.message_root()) } @@ -863,6 +926,13 @@ impl SignedData for SignedAggregateAndProof { Ok(out) } + fn set_signature_boxed( + &self, + signature: Signature, + ) -> Result, SignedDataError> { + Ok(Box::new(self.set_signature(signature)?)) + } + fn message_root(&self) -> Result<[u8; 32], SignedDataError> { Ok(hash_root(&self.0.message)) } @@ -943,6 +1013,13 @@ impl SignedData for VersionedSignedAggregateAndProof { Ok(out) } + fn set_signature_boxed( + &self, + signature: Signature, + ) -> Result, SignedDataError> { + Ok(Box::new(self.set_signature(signature)?)) + } + fn message_root(&self) -> Result<[u8; 32], SignedDataError> { let version = self.0.version; if version == versioned::DataVersion::Unknown { @@ -1029,6 +1106,13 @@ impl SignedData for SignedSyncMessage { Ok(out) } + fn set_signature_boxed( + &self, + signature: Signature, + ) -> Result, SignedDataError> { + Ok(Box::new(self.set_signature(signature)?)) + } + fn message_root(&self) -> Result<[u8; 32], SignedDataError> { Ok(self.0.message_root()) } @@ -1065,6 +1149,13 @@ impl SignedData for SyncContributionAndProof { Ok(out) } + fn set_signature_boxed( + &self, + signature: Signature, + ) -> Result, SignedDataError> { + Ok(Box::new(self.set_signature(signature)?)) + } + fn message_root(&self) -> Result<[u8; 32], SignedDataError> { Ok(self.0.selection_proof_message_root()) } @@ -1101,6 +1192,13 @@ impl SignedData for SignedSyncContributionAndProof { Ok(out) } + fn set_signature_boxed( + &self, + signature: Signature, + ) -> Result, SignedDataError> { + Ok(Box::new(self.set_signature(signature)?)) + } + fn message_root(&self) -> Result<[u8; 32], SignedDataError> { Ok(self.0.message_root()) } diff --git a/crates/core/src/types.rs b/crates/core/src/types.rs index 8d971f7d..02ab1919 100644 --- a/crates/core/src/types.rs +++ b/crates/core/src/types.rs @@ -546,6 +546,12 @@ pub trait SignedData: Any + DynClone + DynEq + StdDebug + Send + Sync { where Self: Sized; + /// Object-safe equivalent of [`SignedData::set_signature`]. + fn set_signature_boxed( + &self, + signature: Signature, + ) -> Result, SignedDataError>; + /// message_root returns the message root for the unsigned data. fn message_root(&self) -> Result<[u8; 32], SignedDataError>; } @@ -1028,6 +1034,13 @@ mod tests { Ok(self.clone()) } + fn set_signature_boxed( + &self, + signature: Signature, + ) -> Result, SignedDataError> { + Ok(Box::new(self.set_signature(signature)?)) + } + fn message_root(&self) -> Result<[u8; 32], SignedDataError> { Ok([42u8; 32]) }