Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 6 additions & 26 deletions rs/consensus/dkg/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,7 @@ use ic_interfaces::{
use ic_interfaces_registry::RegistryClient;
use ic_interfaces_state_manager::StateReader;
use ic_logger::{ReplicaLogger, error, info};
use ic_metrics::{
MetricsRegistry,
buckets::{decimal_buckets, linear_buckets},
};
use ic_metrics::MetricsRegistry;
use ic_replicated_state::ReplicatedState;
use ic_types::{
Height, NodeId, ReplicaVersion, SubnetId,
Expand All @@ -25,20 +22,21 @@ use ic_types::{
threshold_sig::ni_dkg::{NiDkgId, NiDkgTargetSubnet, config::NiDkgConfig},
},
};
use prometheus::Histogram;
use rayon::prelude::*;
use std::{
collections::BTreeMap,
sync::{Arc, Mutex},
};

pub mod dkg_key_manager;
pub mod metrics;
pub mod payload_builder;
pub mod payload_validator;
pub(crate) mod remote;

use crate::remote::{build_callback_id_config_map, merge_configs};
pub use crate::utils::get_vetkey_public_keys;
use metrics::DkgClientMetrics;

#[cfg(test)]
mod test_utils;
Expand All @@ -61,11 +59,6 @@ const MAX_REMOTE_DKG_ATTEMPTS: u32 = 5;
/// Generic error string for failed remote DKG requests.
const REMOTE_DKG_REPEATED_FAILURE_ERROR: &str = "Attempts to run this DKG repeatedly failed";

struct Metrics {
on_state_change_duration: Histogram,
on_state_change_processed: Histogram,
}

/// `DkgImpl` is responsible for holding DKG dependencies and for responding to
/// changes in the consensus and DKG pool.
pub struct DkgImpl {
Expand All @@ -77,7 +70,7 @@ pub struct DkgImpl {
consensus_cache: Arc<dyn ConsensusPoolCache>,
dkg_key_manager: Arc<Mutex<DkgKeyManager>>,
logger: ReplicaLogger,
metrics: Metrics,
metrics: DkgClientMetrics,
}

impl DkgImpl {
Expand All @@ -102,21 +95,7 @@ impl DkgImpl {
consensus_cache,
dkg_key_manager,
logger,
metrics: Metrics {
on_state_change_duration: metrics_registry.histogram(
"consensus_dkg_on_state_change_duration_seconds",
"The time it took to execute on_state_change(), in seconds",
// 0.1ms, 0.2ms, 0.5ms, 1ms, 2ms, 5ms, 10ms, 20ms, 50ms, 100ms, 200ms, 500ms,
// 1s, 2s, 5s, 10s, 20s, 50s, 100s, 200s, 500s
decimal_buckets(-4, 2),
),
on_state_change_processed: metrics_registry.histogram(
"consensus_dkg_on_state_change_processed",
"Number of entries processed by on_state_change()",
// 0 - 100
linear_buckets(0.0, 1.0, 100),
),
},
metrics: DkgClientMetrics::new(metrics_registry),
}
}

Expand Down Expand Up @@ -2040,6 +2019,7 @@ mod tests {
&parent,
callback_id_map,
&no_op_logger(),
None,
)
.unwrap();

Expand Down
136 changes: 136 additions & 0 deletions rs/consensus/dkg/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
use ic_metrics::{
MetricsRegistry,
buckets::{decimal_buckets, linear_buckets},
};
use ic_types::{
consensus::BlockPayload,
crypto::threshold_sig::ni_dkg::{NiDkgTag, NiDkgTargetSubnet},
};
use prometheus::{Histogram, HistogramTimer, HistogramVec, IntCounterVec};
use std::collections::BTreeMap;

/// Metrics recorded by the DKG client (`DkgImpl`) while reacting to pool changes.
pub(crate) struct DkgClientMetrics {
    /// Wall-clock duration of a single `on_state_change()` call, in seconds.
    pub(crate) on_state_change_duration: Histogram,
    /// Number of entries processed by a single `on_state_change()` call.
    pub(crate) on_state_change_processed: Histogram,
}

impl DkgClientMetrics {
    /// Registers all DKG client metrics with the given registry and returns the handles.
    pub(crate) fn new(metrics_registry: MetricsRegistry) -> Self {
        let duration = metrics_registry.histogram(
            "consensus_dkg_on_state_change_duration_seconds",
            "The time it took to execute on_state_change(), in seconds",
            // 0.1ms, 0.2ms, 0.5ms, 1ms, 2ms, 5ms, 10ms, 20ms, 50ms, 100ms, 200ms, 500ms,
            // 1s, 2s, 5s, 10s, 20s, 50s, 100s, 200s, 500s
            decimal_buckets(-4, 2),
        );
        let processed = metrics_registry.histogram(
            "consensus_dkg_on_state_change_processed",
            "Number of entries processed by on_state_change()",
            // 0 - 100
            linear_buckets(0.0, 1.0, 100),
        );
        Self {
            on_state_change_duration: duration,
            on_state_change_processed: processed,
        }
    }
}

/// Metrics recorded while creating (and validating) DKG payloads.
pub struct DkgPayloadMetrics {
    payload_errors: IntCounterVec,
    pub(crate) payload_duration: HistogramVec,
}

impl DkgPayloadMetrics {
    /// Registers the payload metrics with the given registry and returns the handles.
    pub fn new(metrics_registry: MetricsRegistry) -> Self {
        let errors = metrics_registry.int_counter_vec(
            "dkg_payload_errors",
            "DKG payload related errors",
            &["type"],
        );
        let duration = metrics_registry.histogram_vec(
            "dkg_payload_creation_seconds",
            "Time taken to create a DKG payload, in seconds",
            decimal_buckets(-4, 2),
            &["type"],
        );
        Self {
            payload_errors: errors,
            payload_duration: duration,
        }
    }

    /// Increments the payload-error counter for the given error label.
    pub fn payload_errors_inc(&self, label: &str) {
        self.payload_errors.with_label_values(&[label]).inc();
    }

    /// Starts a timer that records into `payload_duration` under `label` when dropped.
    pub(crate) fn payload_creation_timer(&self, label: &str) -> HistogramTimer {
        let histogram = self.payload_duration.with_label_values(&[label]);
        histogram.start_timer()
    }
}

/// Convenience extension so call sites holding an `Option<&DkgPayloadMetrics>`
/// (metrics are optional, e.g. absent during validation) can record metrics
/// without unwrapping at every call site.
pub(crate) trait DkgPayloadMetricsOptionExt {
    /// Increments the error counter for `label`, if metrics are present.
    fn payload_errors_inc(self, label: &str);
    /// Starts a creation timer for `label`, if metrics are present.
    fn payload_creation_timer(self, label: &str) -> Option<HistogramTimer>;
}

impl DkgPayloadMetricsOptionExt for Option<&DkgPayloadMetrics> {
    fn payload_errors_inc(self, label: &str) {
        if let Some(payload_metrics) = self {
            payload_metrics.payload_errors_inc(label);
        }
    }

    fn payload_creation_timer(self, label: &str) -> Option<HistogramTimer> {
        self.map(|payload_metrics| payload_metrics.payload_creation_timer(label))
    }
}

/// Statistics extracted from a DKG block payload (see the `From<&BlockPayload>` impl).
pub struct DkgPayloadStats {
    // Number of entries in the summary payload's `initial_dkg_attempts` map;
    // `None` when the payload is a data payload.
    pub remote_dkg_attempts_map_size: Option<usize>,
    // Sum of all attempt counts in `initial_dkg_attempts`; `None` for data payloads.
    pub remote_dkg_attempts_map_sum: Option<u64>,
    // Number of dealings in a data payload, keyed by (DKG tag, target subnet);
    // empty for summary payloads.
    pub dealings_included: BTreeMap<(NiDkgTag, NiDkgTargetSubnet), usize>,
    // Number of transcripts for remote subnets in a data payload, keyed by DKG tag;
    // empty for summary payloads.
    pub remote_transcripts_delivered: BTreeMap<NiDkgTag, usize>,
}

impl From<&BlockPayload> for DkgPayloadStats {
fn from(payload: &BlockPayload) -> Self {
let mut dealings_included = BTreeMap::new();
let mut remote_transcripts_delivered = BTreeMap::new();
let (remote_dkg_attempts_map_size, remote_dkg_attempts_map_sum) = match payload {
BlockPayload::Summary(summary_payload) => (
Some(summary_payload.dkg.initial_dkg_attempts.len()),
Some(
summary_payload
.dkg
.initial_dkg_attempts
.values()
.map(|attempts| *attempts as u64)
.sum(),
),
),
BlockPayload::Data(data_payload) => {
for message in &data_payload.dkg.messages {
let dkg_id = &message.content.dkg_id;
dealings_included
.entry((dkg_id.dkg_tag.clone(), dkg_id.target_subnet))
.and_modify(|count| *count += 1)
.or_insert(1);
}

for (dkg_id, _, _) in &data_payload.dkg.transcripts_for_remote_subnets {
remote_transcripts_delivered
.entry(dkg_id.dkg_tag.clone())
.and_modify(|count| *count += 1)
.or_insert(1);
}

(None, None)
}
};
Self {
remote_dkg_attempts_map_size,
remote_dkg_attempts_map_sum,
dealings_included,
remote_transcripts_delivered,
}
}
}
21 changes: 21 additions & 0 deletions rs/consensus/dkg/src/payload_builder.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::{
MAX_EARLY_REMOTE_TRANSCRIPTS, MAX_REMOTE_DKGS_PER_INTERVAL,
metrics::{DkgPayloadMetrics, DkgPayloadMetricsOptionExt},
remote::{
ConfigResult, build_callback_id_config_map, get_updated_remote_dkg_attempts, merge_configs,
},
Expand Down Expand Up @@ -56,6 +57,7 @@ pub fn create_payload(
validation_context: &ValidationContext,
logger: ReplicaLogger,
max_dealings_per_block: usize,
dkg_payload_metrics: Option<&DkgPayloadMetrics>,
) -> Result<DkgPayload, DkgPayloadCreationError> {
let height = parent.height.increment();
// Get the last summary from the chain.
Expand All @@ -78,6 +80,7 @@ pub fn create_payload(
state_reader,
validation_context,
logger,
dkg_payload_metrics,
)
.map(DkgPayload::Summary)
} else {
Expand All @@ -96,11 +99,13 @@ pub fn create_payload(
state_reader,
validation_context,
logger,
dkg_payload_metrics,
)
.map(DkgPayload::Data)
}
}

#[allow(clippy::too_many_arguments)]
fn create_data_payload(
this_subnet_id: SubnetId,
registry_client: &dyn RegistryClient,
Expand All @@ -114,7 +119,10 @@ fn create_data_payload(
state_reader: &dyn StateReader<State = ReplicatedState>,
validation_context: &ValidationContext,
logger: ReplicaLogger,
dkg_payload_metrics: Option<&DkgPayloadMetrics>,
) -> Result<DkgDataPayload, DkgPayloadCreationError> {
let _timer = dkg_payload_metrics.payload_creation_timer("data");

// Get all existing dealer ids from the chain.
let dealers_from_chain = utils::get_dealers_from_chain(pool_reader, parent);

Expand Down Expand Up @@ -148,6 +156,7 @@ fn create_data_payload(
parent,
remote_config_results,
&logger,
dkg_payload_metrics,
)?;

if !remote_dkg_transcripts.is_empty() {
Expand All @@ -174,6 +183,7 @@ pub(crate) fn create_early_remote_transcripts(
parent: &Block,
callback_id_map: BTreeMap<CallbackId, ConfigResult>,
logger: &ReplicaLogger,
dkg_payload_metrics: Option<&DkgPayloadMetrics>,
) -> Result<Vec<(NiDkgId, CallbackId, Result<NiDkgTranscript, String>)>, DkgPayloadCreationError> {
// Since this function is relatively expensive, we simply return if there are no outstanding DKG contexts
if callback_id_map.is_empty() {
Expand All @@ -194,6 +204,7 @@ pub(crate) fn create_early_remote_transcripts(
for (dkg_id, err) in errs {
// Skip requests for which we already have a transcript on chain.
if !completed_dkgs.contains(&dkg_id) {
dkg_payload_metrics.payload_errors_inc("remote_config_creation_failed");
error!(
logger,
"Failed to create remote transcript config for dkg id {:?} at height {}: {}",
Expand Down Expand Up @@ -230,6 +241,8 @@ pub(crate) fn create_early_remote_transcripts(
// For each config, try to build the necessary (dkg_id, callback_id, transcript_result) triple
for config in configs.iter() {
let dealings = all_dealings.remove(config.dkg_id()).unwrap_or_else(|| {
dkg_payload_metrics
.payload_errors_inc("remote_dealings_missing_after_capacity_check");
error!(
logger,
"We checked that all configs have enough dealings above. This is a bug."
Expand All @@ -248,6 +261,7 @@ pub(crate) fn create_early_remote_transcripts(
// Note that we handled the reproducible error case of not having enough dealings
// already beforehand.
Err(err) if err.is_reproducible() => {
dkg_payload_metrics.payload_errors_inc("remote_transcript_reproducible_error");
// Including the error in the payload will cause the context to receive
// a reject response.
let error_message = format!(
Expand All @@ -260,6 +274,7 @@ pub(crate) fn create_early_remote_transcripts(
Err(error_message)
}
Err(err) => {
dkg_payload_metrics.payload_errors_inc("remote_transcript_transient_error");
// Return on transient crypto errors
return Err(DkgPayloadCreationError::DkgCreateTranscriptError(err));
}
Expand Down Expand Up @@ -408,7 +423,10 @@ pub(super) fn create_summary_payload(
state_reader: &dyn StateReader<State = ReplicatedState>,
validation_context: &ValidationContext,
logger: ReplicaLogger,
dkg_payload_metrics: Option<&DkgPayloadMetrics>,
) -> Result<DkgSummary, DkgPayloadCreationError> {
let _timer = dkg_payload_metrics.payload_creation_timer("summary");

let (mut all_dealings, completed_dkgs) = utils::get_dkg_dealings(pool_reader, parent);
let mut next_transcripts = BTreeMap::new();
// Try to create transcripts from the last round.
Expand All @@ -431,12 +449,14 @@ pub(super) fn create_summary_payload(
}
}
Err(err) if err.is_reproducible() => {
dkg_payload_metrics.payload_errors_inc("summary_transcript_reproducible_error");
warn!(
logger,
"Failed to create transcript for dkg id {:?}: {:?}", dkg_id, err
);
}
Err(err) => {
dkg_payload_metrics.payload_errors_inc("summary_transcript_transient_error");
return Err(DkgPayloadCreationError::DkgCreateTranscriptError(err));
}
};
Expand Down Expand Up @@ -1263,6 +1283,7 @@ mod tests {
time: UNIX_EPOCH,
},
no_op_logger(),
None,
)
.unwrap()
};
Expand Down
2 changes: 2 additions & 0 deletions rs/consensus/dkg/src/payload_validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ pub fn validate_payload(
state_reader,
validation_context,
log.clone(),
None,
)?;
if summary_payload.dkg != expected_summary {
return Err(InvalidDkgPayloadReason::MismatchedDkgSummary(
Expand Down Expand Up @@ -230,6 +231,7 @@ fn validate_dealings_payload(
parent,
remote_config_results,
log,
None,
)?;

if dealings.transcripts_for_remote_subnets != expected_transcripts {
Expand Down
2 changes: 1 addition & 1 deletion rs/consensus/idkg/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ impl IDkgPayloadMetrics {
.inc();
}

pub(crate) fn payload_errors_inc(&self, label: &str) {
pub fn payload_errors_inc(&self, label: &str) {
self.payload_errors.with_label_values(&[label]).inc();
}
}
Expand Down
Loading
Loading