diff --git a/Cargo.lock b/Cargo.lock index 16bcb0f104fd..afd98f78d1de 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10303,7 +10303,12 @@ name = "ic-https-outcalls-pricing" version = "0.9.0" dependencies = [ "ic-config", + "ic-logger", + "ic-metrics", "ic-types", + "ic-types-cycles", + "prometheus", + "slog", ] [[package]] diff --git a/rs/consensus/src/consensus/batch_delivery.rs b/rs/consensus/src/consensus/batch_delivery.rs index 7df26b0668e1..a0ab0d455e3c 100644 --- a/rs/consensus/src/consensus/batch_delivery.rs +++ b/rs/consensus/src/consensus/batch_delivery.rs @@ -26,8 +26,8 @@ use ic_protobuf::{ use ic_types::{ Height, PrincipalId, SubnetId, batch::{ - Batch, BatchContent, BatchMessages, BatchSummary, BlockmakerMetrics, ChainKeyData, - ConsensusResponse, + Batch, BatchContent, BatchMessages, BatchSummary, BlockmakerMetrics, CanisterHttpRefunds, + ChainKeyData, ConsensusResponse, }, consensus::{ Block, BlockPayload, HasVersion, @@ -210,7 +210,8 @@ pub(crate) fn deliver_batches_with_result_processor( idkg_pre_signatures, nidkg_ids, }; - let consensus_responses = generate_responses_to_subnet_calls(&block, &mut batch_stats, log); + let (consensus_responses, refunds) = + generate_responses_to_subnet_calls(&block, &mut batch_stats, log); // This flag can only be true, if we've called deliver_batches with a height // limit. In this case we also want to have a checkpoint for that last height. let persist_batch = Some(height) == max_batch_height_to_deliver; @@ -220,6 +221,7 @@ pub(crate) fn deliver_batches_with_result_processor( batch_messages: BatchMessages::default(), chain_key_data, consensus_responses, + refunds, requires_full_state_hash, }, BlockPayload::Data(data_payload) => { @@ -236,6 +238,7 @@ pub(crate) fn deliver_batches_with_result_processor( .unwrap_or_default(), chain_key_data, consensus_responses, + refunds, requires_full_state_hash, } } @@ -312,8 +315,9 @@ fn generate_responses_to_subnet_calls( block: &Block, stats: &mut BatchStats, log: &ReplicaLogger, -) -> Vec { +) -> (Vec, CanisterHttpRefunds) { let mut consensus_responses = Vec::new(); + let mut refunds = CanisterHttpRefunds::default(); match block.payload.as_ref() { BlockPayload::Summary(summary_payload) => { info!( @@ -337,9 +341,11 @@ fn generate_responses_to_subnet_calls( .append(&mut generate_responses_to_initial_dealings_calls(payload)); } - let (mut http_responses, http_stats) = + let (mut http_responses, mut http_refunds, http_stats) = CanisterHttpPayloadBuilderImpl::into_messages(&data_payload.batch.canister_http); consensus_responses.append(&mut http_responses); + refunds.initial.append(&mut http_refunds.initial); + refunds.asynchronous.append(&mut http_refunds.asynchronous); stats.canister_http = http_stats; let mut chain_key_responses = @@ -347,7 +353,7 @@ fn generate_responses_to_subnet_calls( consensus_responses.append(&mut chain_key_responses); } } - consensus_responses + (consensus_responses, refunds) } #[derive(Debug, Clone, PartialEq, Eq)] @@ -735,7 +741,7 @@ mod tests { ); let mut batch_stats = BatchStats::new(Height::from(1)); - let responses = + let (responses, _refunds) = generate_responses_to_subnet_calls(&block, &mut batch_stats, &no_op_logger()); assert_eq!( diff --git a/rs/determinism_test/src/lib.rs b/rs/determinism_test/src/lib.rs index 5576130535e1..077ef14ab4db 100644 --- a/rs/determinism_test/src/lib.rs +++ b/rs/determinism_test/src/lib.rs @@ -14,7 +14,7 @@ use ic_state_manager::StateManagerImpl; use ic_test_utilities_types::messages::SignedIngressBuilder; use ic_types::{ CanisterId, CryptoHashOfState, Randomness, RegistryVersion, ReplicaVersion, - batch::{Batch, BatchContent, BatchMessages, BlockmakerMetrics}, + batch::{Batch, BatchContent, BatchMessages, BlockmakerMetrics, CanisterHttpRefunds}, ingress::{IngressState, IngressStatus, WasmResult}, messages::{MessageId, SignedIngress}, time::UNIX_EPOCH, @@ -33,6 +33,7 @@ fn build_batch(message_routing: &dyn MessageRouting, msgs: Vec) - }, chain_key_data: Default::default(), consensus_responses: vec![], + refunds: CanisterHttpRefunds::default(), requires_full_state_hash: false, }, randomness: Randomness::from([0; 32]), @@ -51,6 +52,7 @@ fn build_batch_with_full_state_hash(message_routing: &dyn MessageRouting) -> Bat batch_messages: BatchMessages::default(), chain_key_data: Default::default(), consensus_responses: vec![], + refunds: CanisterHttpRefunds::default(), requires_full_state_hash: true, }, randomness: Randomness::from([0; 32]), diff --git a/rs/execution_environment/src/execution_environment.rs b/rs/execution_environment/src/execution_environment.rs index f25a304c1b1a..1a6c2670ae98 100644 --- a/rs/execution_environment/src/execution_environment.rs +++ b/rs/execution_environment/src/execution_environment.rs @@ -693,11 +693,11 @@ impl ExecutionEnvironment { let cost_schedule = state.get_own_cost_schedule(); let mut msg = match msg { - SubnetMessage::Response(response) => { + SubnetMessage::ConsensusResponse(response) => { let context = state .metadata .subnet_call_context_manager - .retrieve_context(response.originator_reply_callback, &self.log); + .retrieve_context(response.callback, &self.log); return match context { None => (state, ExecuteSubnetMessageResultType::Finished), Some(context) => { @@ -718,7 +718,7 @@ impl ExecutionEnvironment { context.max_response_bytes, registry_settings.subnet_size, cost_schedule, - NumBytes::from(response.payload_size_bytes()), + response.payload.size_bytes(), ); self.metrics.observe_http_outcall_price_change( @@ -737,7 +737,7 @@ impl ExecutionEnvironment { info!( self.log, "Canister Http request with payload_size {}, max_response_size {}, subnet_size {}, reply_callback_id {}, sender {}, process_id {}", - response.payload_size_bytes().get(), + response.payload.size_bytes().get(), max_response_size, registry_settings.subnet_size, context.request.sender_reply_callback, @@ -749,7 +749,7 @@ impl ExecutionEnvironment { self.metrics.observe_subnet_message( &request.method_name, time_elapsed.as_secs_f64(), - &match &response.response_payload { + &match &response.payload { Payload::Data(_) => Ok(()), Payload::Reject(_) => Err(ErrorCode::CanisterRejectedMessage), }, @@ -758,7 +758,7 @@ impl ExecutionEnvironment { if let ( SubnetCallContext::SignWithThreshold(threshold_context), Payload::Data(_), - ) = (&context, &response.response_payload) + ) = (&context, &response.payload) { *state .metadata @@ -768,13 +768,27 @@ impl ExecutionEnvironment { .or_default() += 1; } + // Refund the cycles left unspent upfront (`request.payment`). + // For HTTP outcalls, additionally refund the per-replica + // shares that the messaging layer accumulated into the + // request context's `refund_status` before execution. For + // non-HTTP responses there is no such accumulation, so this + // preserves the previous behavior. + let http_refund = match &context { + SubnetCallContext::CanisterHttpRequest(context) => { + context.refund_status.refunded_cycles + } + _ => Cycles::new(0), + }; + let refund = request.payment + http_refund; + state.push_subnet_output_response( Response { originator: request.sender, respondent: CanisterId::from(self.own_subnet_id), originator_reply_callback: request.sender_reply_callback, - refund: request.payment, - response_payload: response.response_payload.clone(), + refund, + response_payload: response.payload.clone(), deadline: request.deadline, } .into(), diff --git a/rs/execution_environment/src/execution_environment_metrics.rs b/rs/execution_environment/src/execution_environment_metrics.rs index f4e4836576b3..0b9a4a415585 100644 --- a/rs/execution_environment/src/execution_environment_metrics.rs +++ b/rs/execution_environment/src/execution_environment_metrics.rs @@ -8,8 +8,8 @@ use ic_metrics::MetricsRegistry; use ic_metrics::buckets::{decimal_buckets, decimal_buckets_with_zero}; use ic_replicated_state::metadata_state::subnet_call_context_manager::InstallCodeCallId; use ic_types::CanisterId; +use ic_types::batch::ConsensusResponse; use ic_types::canister_http::{CanisterHttpRequestContext, MAX_CANISTER_HTTP_RESPONSE_BYTES}; -use ic_types::messages::Response; use ic_types_cycles::NominalCycles; use prometheus::{Histogram, HistogramVec, IntCounter}; use std::str::FromStr; @@ -232,7 +232,7 @@ impl ExecutionEnvironmentMetrics { pub(crate) fn observe_http_outcall_request( &self, context: &CanisterHttpRequestContext, - response: &Response, + response: &ConsensusResponse, ) { self.http_outcalls_metrics .request_size @@ -250,7 +250,7 @@ impl ExecutionEnvironmentMetrics { self.http_outcalls_metrics .payload_size - .observe(response.payload_size_bytes().get() as f64); + .observe(response.payload.size_bytes().get() as f64); } pub(crate) fn observe_http_outcall_price_change( diff --git a/rs/execution_environment/src/scheduler.rs b/rs/execution_environment/src/scheduler.rs index 3eda6c33c653..281e63de0068 100644 --- a/rs/execution_environment/src/scheduler.rs +++ b/rs/execution_environment/src/scheduler.rs @@ -41,13 +41,13 @@ use ic_replicated_state::{ }; use ic_types::batch::ChainKeyData; use ic_types::ingress::{IngressState, IngressStatus}; -use ic_types::messages::{Ingress, MessageId, NO_DEADLINE, Response, SubnetMessage}; +use ic_types::messages::{Ingress, MessageId, SubnetMessage}; use ic_types::{ CanisterId, CanisterLog, ComputeAllocation, DEFAULT_AGGREGATE_LOG_MEMORY_LIMIT, ExecutionRound, MemoryAllocation, NumBytes, NumInstructions, NumMessages, NumSlices, Randomness, ReplicaVersion, Time, }; -use ic_types_cycles::{CanisterCyclesCostSchedule, Cycles}; +use ic_types_cycles::CanisterCyclesCostSchedule; use more_asserts::{debug_assert_ge, debug_assert_le, debug_assert_lt}; use std::cell::RefCell; use std::collections::{BTreeSet, VecDeque}; @@ -1306,20 +1306,7 @@ impl Scheduler for SchedulerImpl { // state. That can be changed in the future as we optimize scheduling. while let Some(response) = state.consensus_queue.pop() { let (new_state, _) = self.execute_subnet_message( - // Wrap the callback ID and payload into a Response, to make it easier for - // `execute_subnet_message()` to deal with. All other fields will be ignored by - // `execute_subnet_message()`. - SubnetMessage::Response( - Response { - originator: CanisterId::ic_00(), - respondent: CanisterId::ic_00(), - originator_reply_callback: response.callback, - refund: Cycles::zero(), - response_payload: response.payload, - deadline: NO_DEADLINE, - } - .into(), - ), + SubnetMessage::ConsensusResponse(Arc::new(response)), state, &mut csprng, current_round, @@ -1855,7 +1842,7 @@ fn can_execute_subnet_msg( let maybe_method = match msg { SubnetMessage::Ingress(ingress) => Ic00Method::from_str(ingress.method_name.as_str()).ok(), SubnetMessage::Request(request) => Ic00Method::from_str(request.method_name.as_str()).ok(), - SubnetMessage::Response { .. } => None, + SubnetMessage::ConsensusResponse { .. } => None, }; let Some(method) = maybe_method else { // If this is a response or the method name is not valid, execute the message. @@ -1907,7 +1894,7 @@ fn get_instruction_limits_for_subnet_message( // for install code in which case the default limits are overriden. let default_limits = InstructionLimits::new(NumInstructions::new(0), NumInstructions::new(0)); let method_name = match &msg { - SubnetMessage::Response { .. } => { + SubnetMessage::ConsensusResponse { .. } => { return default_limits; } SubnetMessage::Ingress(ingress) => &ingress.method_name, diff --git a/rs/https_outcalls/client/src/client.rs b/rs/https_outcalls/client/src/client.rs index 2c10c9859079..421dc980ea44 100644 --- a/rs/https_outcalls/client/src/client.rs +++ b/rs/https_outcalls/client/src/client.rs @@ -65,6 +65,7 @@ pub struct CanisterHttpAdapterClientImpl { rx: Receiver<(CanisterHttpResponse, CanisterHttpPaymentReceipt)>, query_service: TransformExecutionService, metrics: Metrics, + pricing_factory: PricingFactory, log: ReplicaLogger, } @@ -79,6 +80,7 @@ impl CanisterHttpAdapterClientImpl { ) -> Self { let (tx, rx) = channel(inflight_requests); let metrics = Metrics::new(&metrics_registry); + let pricing_factory = PricingFactory::new(&metrics_registry, log.clone()); Self { rt_handle, grpc_channel, @@ -86,6 +88,7 @@ impl CanisterHttpAdapterClientImpl { rx, query_service, metrics, + pricing_factory, log, } } @@ -121,6 +124,7 @@ impl NonBlockingChannel for CanisterHttpAdapterClientImpl { let mut http_adapter_client = HttpsOutcallsServiceClient::new(self.grpc_channel.clone()); let query_handler = self.query_service.clone(); let metrics = self.metrics.clone(); + let pricing_factory = self.pricing_factory.clone(); let log = self.log.clone(); // Spawn an async task that sends the canister http request to the adapter and awaits the response. @@ -133,7 +137,7 @@ impl NonBlockingChannel for CanisterHttpAdapterClientImpl { socks_proxy_addrs, } = canister_http_request; - let mut budget = PricingFactory::new_tracker(&request_context); + let mut budget = pricing_factory.new_tracker(&request_context); let request_size = request_context.variable_parts_size(); let CanisterHttpRequestContext { @@ -262,7 +266,12 @@ impl NonBlockingChannel for CanisterHttpAdapterClientImpl { } .await; - // Create the payment receipt after all processing is complete. + // Account for the final (post-transform) response that will be + // handed back to the caller, then create the payment receipt after + // all processing is complete. + if let Ok(resp) = &payload { + let _ = budget.subtract_transformed_response_usage(NumBytes::from(resp.len() as u64)); + } let receipt = budget.create_payment_receipt(); permit.send(( @@ -654,6 +663,7 @@ mod tests { replication: Replication::FullyReplicated, pricing_version: PricingVersion::Legacy, refund_status: RefundStatus::default(), + subnet_size: 13, }, socks_proxy_addrs: vec![], } diff --git a/rs/https_outcalls/consensus/BUILD.bazel b/rs/https_outcalls/consensus/BUILD.bazel index 02fa215eb284..35cfad0182f6 100644 --- a/rs/https_outcalls/consensus/BUILD.bazel +++ b/rs/https_outcalls/consensus/BUILD.bazel @@ -113,6 +113,7 @@ rust_bench( "//rs/test_utilities/registry", "//rs/test_utilities/state", "//rs/test_utilities/types", + "//rs/types/cycles", "//rs/types/types", "@crate_index//:criterion", ], diff --git a/rs/https_outcalls/consensus/benches/payload_validation.rs b/rs/https_outcalls/consensus/benches/payload_validation.rs index 3e45e797b1e3..fa75cd14fbd6 100644 --- a/rs/https_outcalls/consensus/benches/payload_validation.rs +++ b/rs/https_outcalls/consensus/benches/payload_validation.rs @@ -364,6 +364,7 @@ impl<'a> PayloadAssembler<'a> { metadata, signatures, }, + initial_refund: ic_types_cycles::Cycles::zero(), }); self.contexts.push(( CallbackId::new(callback_id), @@ -387,6 +388,7 @@ impl<'a> PayloadAssembler<'a> { metadata, signatures, }, + initial_refund: ic_types_cycles::Cycles::zero(), }); self.contexts.push(( CallbackId::new(callback_id), @@ -433,6 +435,7 @@ impl<'a> PayloadAssembler<'a> { flexible_responses.push(FlexibleCanisterHttpResponses { callback_id: CallbackId::new(callback_id), responses: entries, + initial_refund: ic_types_cycles::Cycles::zero(), }); self.contexts.push(( CallbackId::new(callback_id), @@ -492,6 +495,7 @@ fn request_context(replication: Replication) -> CanisterHttpRequestContext { replication, pricing_version: PricingVersion::Legacy, refund_status: RefundStatus::default(), + subnet_size: 4, } } diff --git a/rs/https_outcalls/consensus/src/gossip.rs b/rs/https_outcalls/consensus/src/gossip.rs index 818fd4b61ae3..907a239d0e7e 100644 --- a/rs/https_outcalls/consensus/src/gossip.rs +++ b/rs/https_outcalls/consensus/src/gossip.rs @@ -51,10 +51,20 @@ impl BouncerFactory let latest_state = self.state_reader.get_latest_state(); let subnet_call_context_manger = &latest_state.get_ref().metadata.subnet_call_context_manager; + // Accept shares both for active request contexts and for contexts + // whose responses have already been delivered to execution. The + // latter are kept around (until they time out) so that replicas + // responding late can still be refunded, which requires their + // shares to keep being fetched. let known_request_ids: BTreeSet<_> = subnet_call_context_manger .canister_http_request_contexts - .iter() - .map(|item| *item.0) + .keys() + .chain( + subnet_call_context_manger + .delivered_canister_http_request_contexts + .keys(), + ) + .copied() .collect(); let next_callback_id = subnet_call_context_manger.next_callback_id(); (known_request_ids, next_callback_id) @@ -78,7 +88,8 @@ impl BouncerFactory CallbackId::from(next_callback_id.get() + MAX_NUMBER_OF_REQUESTS_AHEAD); // The https outcalls share should be fetched in two cases: - // - The Id of the share is part of the state which means it is active. + // - The Id of the share is part of the state, which means it is either an active request + // or a request whose response was already delivered but is retained for late refunds. // - The callback Id is higher than the next callback Id (the next callback Id is the Id used next in execution round), but // not higher that `MAX_NUMBER_OF_REQUESTS_AHEAD`. // Receiving an callback Id higher is possible because the priority fn is updated periodically (every 3s) with the latest state diff --git a/rs/https_outcalls/consensus/src/payload_builder.rs b/rs/https_outcalls/consensus/src/payload_builder.rs index fe4203e45852..b52756bb0edb 100644 --- a/rs/https_outcalls/consensus/src/payload_builder.rs +++ b/rs/https_outcalls/consensus/src/payload_builder.rs @@ -42,9 +42,9 @@ use ic_replicated_state::ReplicatedState; use ic_types::{ CountBytes, Height, NodeId, NumBytes, SubnetId, batch::{ - CanisterHttpPayload, ConsensusResponse, FlexibleCanisterHttpError, - FlexibleCanisterHttpResponseWithProof, FlexibleCanisterHttpResponses, - MAX_CANISTER_HTTP_PAYLOAD_SIZE, ValidationContext, + CanisterHttpInitialRefund, CanisterHttpPayload, CanisterHttpRefunds, ConsensusResponse, + FlexibleCanisterHttpError, FlexibleCanisterHttpResponseWithProof, + FlexibleCanisterHttpResponses, MAX_CANISTER_HTTP_PAYLOAD_SIZE, ValidationContext, }, canister_http::{ CANISTER_HTTP_MAX_RESPONSES_PER_BLOCK, CANISTER_HTTP_TIMEOUT_INTERVAL, @@ -56,7 +56,7 @@ use ic_types::{ registry::RegistryClientError, }; use std::{ - collections::{BTreeMap, HashSet}, + collections::{BTreeMap, BTreeSet, HashSet}, sync::{Arc, RwLock}, }; @@ -265,9 +265,12 @@ impl CanisterHttpPayloadBuilderImpl { }; match &request.replication { Replication::FullyReplicated => { - if let Some(response) = - find_fully_replicated_response(grouped_shares, threshold, &*pool_access) - { + if let Some(response) = find_fully_replicated_response( + grouped_shares, + threshold, + request.subnet_size as u32, + &*pool_access, + ) { let candidate_size = response.count_bytes(); let size = NumBytes::new((accumulated_size + candidate_size) as u64); if size < max_payload_size { @@ -299,6 +302,7 @@ impl CanisterHttpPayloadBuilderImpl { if let Some(response) = find_non_replicated_response( grouped_shares, designated_node_id, + request.subnet_size as u32, &*pool_access, ) { let candidate_size = response.count_bytes(); @@ -322,6 +326,7 @@ impl CanisterHttpPayloadBuilderImpl { *max_responses, accumulated_size, max_payload_size, + request.subnet_size as u32, &*pool_access, ) { FlexibleFindResult::OkResponses(group, group_size) => { @@ -526,6 +531,21 @@ impl CanisterHttpPayloadBuilderImpl { ) .map_err(CanisterHttpPayloadValidationError::InvalidArtifact)?; } + + // The collective initial refund must match the value recomputed from + // the request context's subnet size and the signed per-replica receipts. + let computed_refund = utils::fully_replicated_initial_refund( + &response.proof, + request_context.subnet_size as u32, + ); + if response.initial_refund != computed_refund { + return invalid_artifact(InvalidCanisterHttpPayloadReason::InitialRefundMismatch { + callback_id, + payload_refund: response.initial_refund, + computed_refund, + }); + } + // Reconstruct the per-signer shares from the response proof. reconstructed_shares.extend(utils::reconstruct_individual_shares(&response.proof)); } @@ -656,6 +676,20 @@ impl CanisterHttpPayloadBuilderImpl { } } + // The collective initial refund must match the value recomputed from + // the request context's subnet size and the signed per-replica receipts. + let computed_refund = utils::flexible_initial_refund( + group.responses.iter().map(|r| &r.proof), + context.subnet_size as u32, + ); + if group.initial_refund != computed_refund { + return invalid_artifact(InvalidCanisterHttpPayloadReason::InitialRefundMismatch { + callback_id, + payload_refund: group.initial_refund, + computed_refund, + }); + } + // Defer signature verification. sig_inputs.extend(response_share_sig_inputs( group.responses.iter().map(|r| &r.proof), @@ -699,7 +733,9 @@ impl CanisterHttpPayloadBuilderImpl { } } FlexibleCanisterHttpError::TooManyRejects { - reject_responses, .. + reject_responses, + initial_refund, + .. } => { let mut seen_signers = HashSet::new(); @@ -734,6 +770,22 @@ impl CanisterHttpPayloadBuilderImpl { ); } + // The collective initial refund must match the value recomputed + // from the request context's subnet size and the signed receipts. + let computed_refund = utils::flexible_initial_refund( + reject_responses.iter().map(|r| &r.proof), + context.subnet_size as u32, + ); + if *initial_refund != computed_refund { + return invalid_artifact( + InvalidCanisterHttpPayloadReason::InitialRefundMismatch { + callback_id, + payload_refund: *initial_refund, + computed_refund, + }, + ); + } + // Defer signature verification. sig_inputs.extend(response_share_sig_inputs( reject_responses.iter().map(|r| &r.proof), @@ -915,77 +967,142 @@ impl BatchPayloadBuilder for CanisterHttpPayloadBuilderImpl { } } -impl IntoMessages<(Vec, CanisterHttpBatchStats)> - for CanisterHttpPayloadBuilderImpl +impl + IntoMessages<( + Vec, + CanisterHttpRefunds, + CanisterHttpBatchStats, + )> for CanisterHttpPayloadBuilderImpl { - fn into_messages(payload: &[u8]) -> (Vec, CanisterHttpBatchStats) { + fn into_messages( + payload: &[u8], + ) -> ( + Vec, + CanisterHttpRefunds, + CanisterHttpBatchStats, + ) { let mut stats = CanisterHttpBatchStats::default(); let messages = bytes_to_payload(payload) .expect("Failed to parse a payload that was already validated"); - let responses = messages.responses.into_iter().map(|response| { + let mut consensus_responses = Vec::new(); + let mut refunds = CanisterHttpRefunds::default(); + + // Fully-replicated (and non-replicated) responses: deliver the + // collective initial refund that was computed during payload building + // and validated during payload validation. Divergence and timeout + // responses carry no refund. + for response in messages.responses { if response.proof.signatures.len() == 1 { stats.single_signature_responses += 1; } stats.responses += 1; - ConsensusResponse::new( - response.content.id, + + let callback = response.content.id; + let nodes: BTreeSet = response.proof.signatures.keys().copied().collect(); + let amount = response.initial_refund; + + consensus_responses.push(ConsensusResponse::new( + callback, match response.content.content { CanisterHttpResponseContent::Success(data) => Payload::Data(data), CanisterHttpResponseContent::Reject(canister_http_reject) => { Payload::Reject(RejectContext::from(&canister_http_reject)) } }, - ) - }); + )); + if !nodes.is_empty() { + refunds.initial.push(CanisterHttpInitialRefund { + callback, + amount, + nodes, + }); + } + } - let timeouts = messages.timeouts.iter().map(|callback| { - // Map timeouts to a rejected response + // Timeouts: map to a rejected response. A timed-out request has no + // signed shares, hence no refunds. + for callback in &messages.timeouts { stats.timeouts += 1; - ConsensusResponse::new( + consensus_responses.push(ConsensusResponse::new( *callback, Payload::Reject(RejectContext::new( RejectCode::SysTransient, "Canister http request timed out", )), - ) - }); + )); + } - let divergence_responses = messages - .divergence_responses - .into_iter() - .filter_map(divergence_response_into_reject) - .inspect(|_| stats.divergence_responses += 1); - - let flexible_ok_responses = messages - .flexible_responses - .into_iter() - .map(flexible_ok_responses_into_consensus_response) - .inspect(|result| match result { - Some(_) => stats.flexible_ok_responses += 1, + for divergence_response in messages.divergence_responses { + if let Some(consensus_response) = divergence_response_into_reject(divergence_response) { + stats.divergence_responses += 1; + consensus_responses.push(consensus_response); + } + } + + for response_group in messages.flexible_responses { + // The collective initial refund was computed during payload building + // and validated during payload validation. + let callback = response_group.callback_id; + let amount = response_group.initial_refund; + let nodes: BTreeSet = response_group + .responses + .iter() + .map(|r| r.proof.signature.signer) + .collect(); + match flexible_ok_responses_into_consensus_response(response_group) { + Some(consensus_response) => { + stats.flexible_ok_responses += 1; + consensus_responses.push(consensus_response); + if !nodes.is_empty() { + refunds.initial.push(CanisterHttpInitialRefund { + callback, + amount, + nodes, + }); + } + } None => stats.flexible_ok_responses_candid_failures += 1, - }) - .flatten(); - - let flexible_errors = messages - .flexible_errors - .into_iter() - .map(flexible_error_into_consensus_response) - .inspect(|result| match result { - Some(_) => stats.flexible_errors += 1, - None => stats.flexible_errors_candid_failures += 1, - }) - .flatten(); + } + } - let responses = responses - .chain(timeouts) - .chain(divergence_responses) - .chain(flexible_ok_responses) - .chain(flexible_errors) - .collect(); + for error in messages.flexible_errors { + // Only errors that carry full responses (too-many-rejects) produce + // an initial refund; timeouts and responses-too-large do not. + let refund = match &error { + FlexibleCanisterHttpError::TooManyRejects { + reject_responses, + initial_refund, + .. + } => { + let nodes: BTreeSet = reject_responses + .iter() + .map(|r| r.proof.signature.signer) + .collect(); + Some(CanisterHttpInitialRefund { + callback: error.callback_id(), + amount: *initial_refund, + nodes, + }) + } + _ => None, + }; + match flexible_error_into_consensus_response(error) { + Some(consensus_response) => { + stats.flexible_errors += 1; + consensus_responses.push(consensus_response); + if let Some(refund) = refund + && !refund.nodes.is_empty() + { + refunds.initial.push(refund); + } + } + None => stats.flexible_errors_candid_failures += 1, + } + } - (responses, stats) + (consensus_responses, refunds, stats) } } diff --git a/rs/https_outcalls/consensus/src/payload_builder/tests.rs b/rs/https_outcalls/consensus/src/payload_builder/tests.rs index 2e959e90a5e6..b84b400f6d2a 100644 --- a/rs/https_outcalls/consensus/src/payload_builder/tests.rs +++ b/rs/https_outcalls/consensus/src/payload_builder/tests.rs @@ -209,6 +209,7 @@ fn multiple_payload_test() { metadata: past_metadata, signatures: BTreeMap::new(), }, + initial_refund: ic_types_cycles::Cycles::zero(), }], timeouts: vec![], divergence_responses: vec![], @@ -846,6 +847,7 @@ fn non_replicated_request_response_coming_in_gossip_payload_created() { replication: ic_types::canister_http::Replication::NonReplicated(delegated_node_id), pricing_version: ic_types::canister_http::PricingVersion::Legacy, refund_status: ic_types::canister_http::RefundStatus::default(), + subnet_size: 4, }; // Insert the context in the replicated state @@ -917,6 +919,7 @@ fn non_replicated_request_with_extra_share_includes_only_delegated_share() { replication: ic_types::canister_http::Replication::NonReplicated(delegated_node_id), pricing_version: ic_types::canister_http::PricingVersion::Legacy, refund_status: ic_types::canister_http::RefundStatus::default(), + subnet_size: 4, }; // Insert the context in the replicated state @@ -989,6 +992,7 @@ fn non_replicated_share_is_ignored_if_content_is_missing() { replication: ic_types::canister_http::Replication::NonReplicated(delegated_node_id), pricing_version: ic_types::canister_http::PricingVersion::Legacy, refund_status: ic_types::canister_http::RefundStatus::default(), + subnet_size: 4, }; inject_request_contexts(&mut payload_builder, [(callback_id, request_context)]); @@ -1039,6 +1043,7 @@ fn validate_payload_succeeds_for_valid_non_replicated_response() { replication: ic_types::canister_http::Replication::NonReplicated(delegated_node_id), pricing_version: ic_types::canister_http::PricingVersion::Legacy, refund_status: ic_types::canister_http::RefundStatus::default(), + subnet_size: 4, }; // Inject this context into the state reader used by the validator. @@ -1150,6 +1155,7 @@ fn validate_payload_fails_for_refund_exceeding_allowance_flexible_response() { response, proof: share_with_excess_refund(0, &metadata), }], + initial_refund: ic_types_cycles::Cycles::zero(), }], ..Default::default() }; @@ -1182,6 +1188,7 @@ fn validate_payload_fails_for_refund_exceeding_allowance_too_many_rejects() { response, proof: share_with_excess_refund(0, &metadata), }], + initial_refund: ic_types_cycles::Cycles::zero(), }], ..Default::default() }; @@ -1242,6 +1249,7 @@ fn validate_payload_fails_for_non_replicated_response_with_wrong_signer() { replication: ic_types::canister_http::Replication::NonReplicated(delegated_node_id), pricing_version: ic_types::canister_http::PricingVersion::Legacy, refund_status: ic_types::canister_http::RefundStatus::default(), + subnet_size: 4, }; // Inject this context into the state reader. @@ -1310,6 +1318,7 @@ fn validate_payload_fails_for_response_with_no_signatures() { replication: ic_types::canister_http::Replication::NonReplicated(delegated_node_id), pricing_version: ic_types::canister_http::PricingVersion::Legacy, refund_status: ic_types::canister_http::RefundStatus::default(), + subnet_size: 4, }; // Inject this context into the state reader used by the validator. @@ -1385,6 +1394,7 @@ fn validate_payload_fails_when_non_replicated_proof_is_for_fully_replicated_requ replication: ic_types::canister_http::Replication::FullyReplicated, pricing_version: ic_types::canister_http::PricingVersion::Legacy, refund_status: ic_types::canister_http::RefundStatus::default(), + subnet_size: 4, }; // Inject this context into the state reader. @@ -1461,6 +1471,7 @@ fn validate_payload_fails_for_duplicate_non_replicated_response() { replication: ic_types::canister_http::Replication::NonReplicated(delegated_node_id), pricing_version: ic_types::canister_http::PricingVersion::Legacy, refund_status: ic_types::canister_http::RefundStatus::default(), + subnet_size: 4, }; // 2. Inject this context into the state reader @@ -1639,6 +1650,7 @@ pub(crate) fn response_and_metadata_to_proof( metadata: metadata.clone(), signatures: BTreeMap::new(), }, + initial_refund: ic_types_cycles::Cycles::zero(), } } @@ -2146,6 +2158,7 @@ fn flexible_valid_mixed_content_responses() { flexible_response(42, 1, b"response_b"), flexible_response(42, 2, b"response_c"), ], + initial_refund: ic_types_cycles::Cycles::zero(), }]); let result = payload_builder.validate_payload( @@ -2170,6 +2183,7 @@ fn flexible_valid_at_min_responses_boundary() { flexible_response(42, 0, b"a"), flexible_response(42, 1, b"b"), ], + initial_refund: ic_types_cycles::Cycles::zero(), }]); let result = payload_builder.validate_payload( @@ -2196,6 +2210,7 @@ fn flexible_valid_at_max_responses_boundary() { flexible_response(42, 2, b"c"), flexible_response(42, 3, b"d"), ], + initial_refund: ic_types_cycles::Cycles::zero(), }]); let result = payload_builder.validate_payload( @@ -2217,6 +2232,7 @@ fn flexible_valid_with_zero_min_and_max_responses() { let payload = flexible_payload(vec![FlexibleCanisterHttpResponses { callback_id, responses: vec![], + initial_refund: ic_types_cycles::Cycles::zero(), }]); let result = payload_builder.validate_payload( @@ -2239,10 +2255,12 @@ fn flexible_invalid_duplicate_callback_id_within_payload() { FlexibleCanisterHttpResponses { callback_id, responses: vec![flexible_response(42, 0, b"a")], + initial_refund: ic_types_cycles::Cycles::zero(), }, FlexibleCanisterHttpResponses { callback_id, responses: vec![flexible_response(42, 1, b"b")], + initial_refund: ic_types_cycles::Cycles::zero(), }, ]); @@ -2272,6 +2290,7 @@ fn flexible_invalid_already_delivered_callback_id() { let group = FlexibleCanisterHttpResponses { callback_id, responses: vec![flexible_response(42, 0, b"a")], + initial_refund: ic_types_cycles::Cycles::zero(), }; let past_payload = flexible_payload(vec![group.clone()]); let past_payload_bytes = payload_to_bytes_max_4mb(past_payload); @@ -2285,6 +2304,7 @@ fn flexible_invalid_already_delivered_callback_id() { let current_payload = flexible_payload(vec![FlexibleCanisterHttpResponses { callback_id, responses: vec![flexible_response(42, 1, b"b")], + initial_refund: ic_types_cycles::Cycles::zero(), }]); let result = payload_builder.validate_payload( @@ -2313,6 +2333,7 @@ fn flexible_invalid_fewer_than_min_responses() { let payload = flexible_payload(vec![FlexibleCanisterHttpResponses { callback_id, responses: vec![flexible_response(42, 0, b"only_one")], + initial_refund: ic_types_cycles::Cycles::zero(), }]); let result = payload_builder.validate_payload( @@ -2350,6 +2371,7 @@ fn flexible_invalid_more_than_max_responses() { flexible_response(42, 1, b"b"), flexible_response(42, 2, b"c"), ], + initial_refund: ic_types_cycles::Cycles::zero(), }]); let result = payload_builder.validate_payload( @@ -2383,6 +2405,7 @@ fn flexible_invalid_empty_group_with_nonzero_min() { let payload = flexible_payload(vec![FlexibleCanisterHttpResponses { callback_id, responses: vec![], + initial_refund: ic_types_cycles::Cycles::zero(), }]); let result = payload_builder.validate_payload( @@ -2416,6 +2439,7 @@ fn flexible_valid_empty_group_with_zero_min() { let payload = flexible_payload(vec![FlexibleCanisterHttpResponses { callback_id, responses: vec![], + initial_refund: ic_types_cycles::Cycles::zero(), }]); let result = payload_builder.validate_payload( @@ -2441,6 +2465,7 @@ fn flexible_invalid_callback_id_mismatch_in_proof() { let payload = flexible_payload(vec![FlexibleCanisterHttpResponses { callback_id, responses: vec![entry], + initial_refund: ic_types_cycles::Cycles::zero(), }]); let result = payload_builder.validate_payload( @@ -2472,6 +2497,7 @@ fn flexible_invalid_duplicate_signer() { flexible_response(42, 0, b"a"), flexible_response(42, 0, b"b"), // same signer ], + initial_refund: ic_types_cycles::Cycles::zero(), }]); let result = payload_builder.validate_payload( @@ -2503,6 +2529,7 @@ fn flexible_invalid_signer_not_in_committee() { flexible_response(42, 0, b"a"), flexible_response(42, 3, b"b"), // node 3 not in committee ], + initial_refund: ic_types_cycles::Cycles::zero(), }]); let result = payload_builder.validate_payload( @@ -2536,6 +2563,7 @@ fn flexible_invalid_content_hash_mismatch() { let payload = flexible_payload(vec![FlexibleCanisterHttpResponses { callback_id, responses: vec![entry], + initial_refund: ic_types_cycles::Cycles::zero(), }]); let result = payload_builder.validate_payload( @@ -2572,6 +2600,7 @@ fn flexible_invalid_content_size_mismatch() { let payload = flexible_payload(vec![FlexibleCanisterHttpResponses { callback_id, responses: vec![entry], + initial_refund: ic_types_cycles::Cycles::zero(), }]); let result = payload_builder.validate_payload( @@ -2606,6 +2635,7 @@ fn flexible_invalid_is_reject_mismatch() { let payload = flexible_payload(vec![FlexibleCanisterHttpResponses { callback_id, responses: vec![entry], + initial_refund: ic_types_cycles::Cycles::zero(), }]); let result = payload_builder.validate_payload( @@ -2672,6 +2702,7 @@ fn non_flexible_response_in_flexible_section_rejected() { flexible_response(42, 0, b"a"), flexible_response(42, 1, b"b"), ], + initial_refund: ic_types_cycles::Cycles::zero(), }]); let result = payload_builder.validate_payload( @@ -2710,6 +2741,7 @@ fn flexible_invalid_unknown_callback_id() { let payload = flexible_payload(vec![FlexibleCanisterHttpResponses { callback_id: unknown_id, responses: vec![flexible_response(999, 0, b"a")], + initial_refund: ic_types_cycles::Cycles::zero(), }]); let result = payload_builder.validate_payload( @@ -2742,6 +2774,7 @@ fn flexible_invalid_rejects_in_ok_responses() { flexible_reject_response(42, 1), flexible_response(42, 2, b"another_good"), ], + initial_refund: ic_types_cycles::Cycles::zero(), }]); let result = payload_builder.validate_payload( @@ -2776,6 +2809,7 @@ fn flexible_invalid_callback_id_mismatch_in_response() { let payload = flexible_payload(vec![FlexibleCanisterHttpResponses { callback_id, responses: vec![entry], + initial_refund: ic_types_cycles::Cycles::zero(), }]); let result = payload_builder.validate_payload( @@ -2810,6 +2844,7 @@ fn flexible_invalid_registry_version_mismatch() { let payload = flexible_payload(vec![FlexibleCanisterHttpResponses { callback_id, responses: vec![entry], + initial_refund: ic_types_cycles::Cycles::zero(), }]); let result = payload_builder.validate_payload( @@ -2849,6 +2884,7 @@ fn flexible_invalid_signature_error() { let payload = flexible_payload(vec![FlexibleCanisterHttpResponses { callback_id, responses: vec![flexible_response(42, 0, b"data")], + initial_refund: ic_types_cycles::Cycles::zero(), }]); let result = payload_builder.validate_payload( @@ -2893,10 +2929,11 @@ fn flexible_ok_responses_into_messages_success_round_trip() { let payload = flexible_payload(vec![FlexibleCanisterHttpResponses { callback_id, responses: vec![entry_a, entry_b], + initial_refund: ic_types_cycles::Cycles::zero(), }]); let bytes = payload_to_bytes_max_4mb(payload); - let (responses, stats) = CanisterHttpPayloadBuilderImpl::into_messages(&bytes); + let (responses, _refunds, stats) = CanisterHttpPayloadBuilderImpl::into_messages(&bytes); assert_eq!(responses.len(), 1); assert_eq!(responses[0].callback, callback_id); @@ -2940,10 +2977,11 @@ fn flexible_ok_responses_into_messages_skips_reject_entries() { let payload = flexible_payload(vec![FlexibleCanisterHttpResponses { callback_id, responses: vec![success_entry, reject_entry], + initial_refund: ic_types_cycles::Cycles::zero(), }]); let bytes = payload_to_bytes_max_4mb(payload); - let (responses, stats) = CanisterHttpPayloadBuilderImpl::into_messages(&bytes); + let (responses, _refunds, stats) = CanisterHttpPayloadBuilderImpl::into_messages(&bytes); assert_eq!(responses.len(), 1); let Payload::Data(ref data) = responses[0].payload else { @@ -2971,20 +3009,23 @@ fn flexible_ok_responses_into_messages_stats_count_multiple_groups() { let group_a = FlexibleCanisterHttpResponses { callback_id: CallbackId::from(1), responses: vec![flexible_response(1, 0, &payload_data)], + initial_refund: ic_types_cycles::Cycles::zero(), }; let group_b = FlexibleCanisterHttpResponses { callback_id: CallbackId::from(2), responses: vec![flexible_response(2, 1, &payload_data)], + initial_refund: ic_types_cycles::Cycles::zero(), }; let group_c = FlexibleCanisterHttpResponses { callback_id: CallbackId::from(3), responses: vec![flexible_response(3, 2, &payload_data)], + initial_refund: ic_types_cycles::Cycles::zero(), }; let payload = flexible_payload(vec![group_a, group_b, group_c]); let bytes = payload_to_bytes_max_4mb(payload); - let (responses, stats) = CanisterHttpPayloadBuilderImpl::into_messages(&bytes); + let (responses, _refunds, stats) = CanisterHttpPayloadBuilderImpl::into_messages(&bytes); assert_eq!(responses.len(), 3); assert_eq!(stats.flexible_ok_responses, 3); @@ -3007,10 +3048,11 @@ fn flexible_ok_responses_into_messages_decode_failure_is_skipped() { let payload = flexible_payload(vec![FlexibleCanisterHttpResponses { callback_id, responses: vec![valid_entry, invalid_entry], + initial_refund: ic_types_cycles::Cycles::zero(), }]); let bytes = payload_to_bytes_max_4mb(payload); - let (responses, stats) = CanisterHttpPayloadBuilderImpl::into_messages(&bytes); + let (responses, _refunds, stats) = CanisterHttpPayloadBuilderImpl::into_messages(&bytes); assert_eq!(responses.len(), 0); assert_eq!(stats.flexible_ok_responses, 0); @@ -3027,7 +3069,7 @@ fn flexible_error_into_messages_timeout() { }; let bytes = payload_to_bytes_max_4mb(payload); - let (responses, stats) = CanisterHttpPayloadBuilderImpl::into_messages(&bytes); + let (responses, _refunds, stats) = CanisterHttpPayloadBuilderImpl::into_messages(&bytes); assert_eq!(responses.len(), 1); assert_eq!(responses[0].callback, callback_id); @@ -3061,12 +3103,13 @@ fn flexible_error_into_messages_too_many_rejects() { flexible_errors: vec![FlexibleCanisterHttpError::TooManyRejects { callback_id, reject_responses: reject_entries, + initial_refund: ic_types_cycles::Cycles::zero(), }], ..Default::default() }; let bytes = payload_to_bytes_max_4mb(payload); - let (responses, stats) = CanisterHttpPayloadBuilderImpl::into_messages(&bytes); + let (responses, _refunds, stats) = CanisterHttpPayloadBuilderImpl::into_messages(&bytes); assert_eq!(responses.len(), 1); assert_eq!(responses[0].callback, callback_id); @@ -3121,7 +3164,7 @@ fn flexible_error_into_messages_responses_too_large() { }; let bytes = payload_to_bytes_max_4mb(payload); - let (responses, stats) = CanisterHttpPayloadBuilderImpl::into_messages(&bytes); + let (responses, _refunds, stats) = CanisterHttpPayloadBuilderImpl::into_messages(&bytes); assert_eq!(responses.len(), 1); assert_eq!(responses[0].callback, callback_id); @@ -3468,6 +3511,7 @@ fn flexible_build_too_many_rejects() { FlexibleCanisterHttpError::TooManyRejects { callback_id: cb, reject_responses, + .. } => { assert_eq!(*cb, callback_id); assert_eq!(reject_responses.len(), 2); @@ -3746,6 +3790,7 @@ fn flexible_error_duplicate_callback_id_cross_type() { flexible_responses: vec![FlexibleCanisterHttpResponses { callback_id, responses: vec![flexible_response(42, 0, b"a")], + initial_refund: ic_types_cycles::Cycles::zero(), }], flexible_errors: vec![FlexibleCanisterHttpError::Timeout { callback_id }], ..Default::default() @@ -4288,6 +4333,7 @@ fn flexible_error_too_many_rejects_valid() { flexible_errors: vec![FlexibleCanisterHttpError::TooManyRejects { callback_id, reject_responses: reject_entries, + initial_refund: ic_types_cycles::Cycles::zero(), }], ..Default::default() }; @@ -4318,6 +4364,7 @@ fn flexible_error_too_many_rejects_insufficient_rejects() { flexible_errors: vec![FlexibleCanisterHttpError::TooManyRejects { callback_id, reject_responses: reject_entries, + initial_refund: ic_types_cycles::Cycles::zero(), }], ..Default::default() }; @@ -4356,6 +4403,7 @@ fn flexible_error_too_many_rejects_non_reject_content() { flexible_errors: vec![FlexibleCanisterHttpError::TooManyRejects { callback_id, reject_responses: vec![ok_entry, reject_entry], + initial_refund: ic_types_cycles::Cycles::zero(), }], ..Default::default() }; @@ -4390,6 +4438,7 @@ fn flexible_error_too_many_rejects_duplicate_signer() { flexible_errors: vec![FlexibleCanisterHttpError::TooManyRejects { callback_id, reject_responses: vec![entry_a, entry_b], + initial_refund: ic_types_cycles::Cycles::zero(), }], ..Default::default() }; @@ -4425,6 +4474,7 @@ fn flexible_error_too_many_rejects_signer_not_in_committee() { flexible_errors: vec![FlexibleCanisterHttpError::TooManyRejects { callback_id, reject_responses: vec![entry_a, entry_b], + initial_refund: ic_types_cycles::Cycles::zero(), }], ..Default::default() }; @@ -4460,6 +4510,7 @@ fn flexible_error_too_many_rejects_callback_id_mismatch_in_response() { flexible_errors: vec![FlexibleCanisterHttpError::TooManyRejects { callback_id, reject_responses: vec![entry_ok, entry_wrong], + initial_refund: ic_types_cycles::Cycles::zero(), }], ..Default::default() }; @@ -4495,6 +4546,7 @@ fn flexible_error_too_many_rejects_registry_version_mismatch() { flexible_errors: vec![FlexibleCanisterHttpError::TooManyRejects { callback_id, reject_responses: vec![entry_ok, entry_bad], + initial_refund: ic_types_cycles::Cycles::zero(), }], ..Default::default() }; @@ -4531,6 +4583,7 @@ fn flexible_error_too_many_rejects_content_hash_mismatch() { flexible_errors: vec![FlexibleCanisterHttpError::TooManyRejects { callback_id, reject_responses: vec![entry_ok, entry_bad], + initial_refund: ic_types_cycles::Cycles::zero(), }], ..Default::default() }; @@ -4566,6 +4619,7 @@ fn flexible_error_too_many_rejects_content_size_mismatch() { flexible_errors: vec![FlexibleCanisterHttpError::TooManyRejects { callback_id, reject_responses: vec![entry_ok, entry_bad], + initial_refund: ic_types_cycles::Cycles::zero(), }], ..Default::default() }; @@ -4601,6 +4655,7 @@ fn flexible_error_too_many_rejects_is_reject_mismatch() { flexible_errors: vec![FlexibleCanisterHttpError::TooManyRejects { callback_id, reject_responses: vec![entry_ok, entry_bad], + initial_refund: ic_types_cycles::Cycles::zero(), }], ..Default::default() }; @@ -4637,6 +4692,7 @@ fn flexible_error_too_many_rejects_proof_id_mismatch() { flexible_errors: vec![FlexibleCanisterHttpError::TooManyRejects { callback_id, reject_responses: vec![entry_ok, entry_bad], + initial_refund: ic_types_cycles::Cycles::zero(), }], ..Default::default() }; @@ -4677,6 +4733,7 @@ fn flexible_error_too_many_rejects_invalid_signature() { flexible_errors: vec![FlexibleCanisterHttpError::TooManyRejects { callback_id, reject_responses: reject_entries, + initial_refund: ic_types_cycles::Cycles::zero(), }], ..Default::default() }; @@ -4776,6 +4833,7 @@ pub(crate) fn request_context(replication: Replication) -> CanisterHttpRequestCo replication, pricing_version: ic_types::canister_http::PricingVersion::Legacy, refund_status: ic_types::canister_http::RefundStatus::default(), + subnet_size: 4, } } @@ -4800,6 +4858,7 @@ fn flexible_request_context( }, pricing_version: ic_types::canister_http::PricingVersion::PayAsYouGo, refund_status: ic_types::canister_http::RefundStatus::default(), + subnet_size: 4, } } diff --git a/rs/https_outcalls/consensus/src/payload_builder/utils.rs b/rs/https_outcalls/consensus/src/payload_builder/utils.rs index 18b0635ce959..6840b98e8e0c 100644 --- a/rs/https_outcalls/consensus/src/payload_builder/utils.rs +++ b/rs/https_outcalls/consensus/src/payload_builder/utils.rs @@ -75,6 +75,56 @@ pub(crate) fn check_response_consistency( Ok(()) } +/// Per-replica consensus cost coefficient `10 * N + 600`, where `N` is the +/// subnet size recorded in the request context. +fn consensus_cost_coefficient(subnet_size: u32) -> u128 { + let n = subnet_size as u128; + n * (10 * n + 600) +} + +/// Computes the collective initial refund for a fully-replicated (or +/// non-replicated) HTTP outcall response. +/// +/// The refund is the sum of the per-replica refunds claimed in the proof's +/// payment receipts, minus the consensus cost `N x (10 x N + 600) x +/// `, saturating to zero. +pub(crate) fn fully_replicated_initial_refund( + proof: &CanisterHttpResponseProof, + subnet_size: u32, +) -> Cycles { + let refund_sum: Cycles = proof + .signatures + .values() + .map(|sig| sig.payment_receipt.refund) + .sum(); + let consensus_cost = + Cycles::from(consensus_cost_coefficient(subnet_size) * proof.metadata.content_size as u128); + // `Sub` for `Cycles` saturates at zero. + refund_sum - consensus_cost +} + +/// Computes the collective initial refund for a group of flexible HTTP outcall +/// responses (used both for successful responses and `TooManyRejects` errors). +/// +/// The refund is the sum of the per-replica refunds claimed in the shares' +/// payment receipts, minus the consensus cost +/// `N x (10 x N + 600) x sum over K replicas (181 + )`, +/// saturating to zero. +pub(crate) fn flexible_initial_refund<'a>( + shares: impl Iterator, + subnet_size: u32, +) -> Cycles { + let mut refund_sum = Cycles::zero(); + let mut size_term: u128 = 0; + for share in shares { + refund_sum += share.content.payment_receipt.refund; + size_term += 181 + share.content.content_size() as u128; + } + let consensus_cost = Cycles::from(consensus_cost_coefficient(subnet_size) * size_term); + // `Sub` for `Cycles` saturates at zero. + refund_sum - consensus_cost +} + /// Enforces the per-replica refund allowance from the request context: the /// `refund` claimed in the payment receipt must never exceed the /// `per_replica_allowance` derived from the request's context. @@ -353,6 +403,7 @@ pub(crate) fn group_shares_by_callback_id< pub(crate) fn find_fully_replicated_response( grouped_shares: &BTreeMap>, threshold: usize, + subnet_size: u32, pool_access: &dyn CanisterHttpPool, ) -> Option { grouped_shares.iter().find_map(|(metadata, shares)| { @@ -360,9 +411,14 @@ pub(crate) fn find_fully_replicated_response( if signers.len() >= threshold { pool_access .get_response_content_by_hash(&metadata.content_hash) - .map(|content| CanisterHttpResponseWithConsensus { - content, - proof: aggregate_shares(metadata.clone(), shares), + .map(|content| { + let proof = aggregate_shares(metadata.clone(), shares); + let initial_refund = fully_replicated_initial_refund(&proof, subnet_size); + CanisterHttpResponseWithConsensus { + content, + proof, + initial_refund, + } }) } else { None @@ -377,6 +433,7 @@ pub(crate) fn find_fully_replicated_response( pub(crate) fn find_non_replicated_response( grouped_shares: &BTreeMap>, designated_node_id: &NodeId, + subnet_size: u32, pool_access: &dyn CanisterHttpPool, ) -> Option { grouped_shares.iter().find_map(|(metadata, shares)| { @@ -386,9 +443,14 @@ pub(crate) fn find_non_replicated_response( .and_then(|correct_share| { pool_access .get_response_content_by_hash(&metadata.content_hash) - .map(|content| CanisterHttpResponseWithConsensus { - content, - proof: aggregate_shares(metadata.clone(), &[correct_share]), + .map(|content| { + let proof = aggregate_shares(metadata.clone(), &[correct_share]); + let initial_refund = fully_replicated_initial_refund(&proof, subnet_size); + CanisterHttpResponseWithConsensus { + content, + proof, + initial_refund, + } }) }) }) @@ -427,6 +489,7 @@ pub(crate) fn find_flexible_result( max_responses: u32, accumulated_size: usize, max_payload_size: NumBytes, + subnet_size: u32, pool_access: &dyn CanisterHttpPool, ) -> FlexibleFindResult { let mut entries_sorted_asc: Vec<_> = grouped_shares.iter().collect(); @@ -434,7 +497,7 @@ pub(crate) fn find_flexible_result( let min_responses = min_responses as usize; let mut ok_responses: Vec<(CanisterHttpResponse, &CanisterHttpResponseShare)> = Vec::new(); - let mut ok_responses_size = size_of::(); + let mut ok_responses_size = size_of::() + size_of::(); // Tracks all signers processed (both OK and reject) let mut seen_signers = BTreeSet::new(); let mut reject_responses: Vec<(CanisterHttpResponse, &CanisterHttpResponseShare)> = Vec::new(); @@ -478,6 +541,8 @@ pub(crate) fn find_flexible_result( // 1. Enough OK responses collected? if ok_responses.len() >= min_responses { + let initial_refund = + flexible_initial_refund(ok_responses.iter().map(|(_, share)| *share), subnet_size); return FlexibleFindResult::OkResponses( FlexibleCanisterHttpResponses { callback_id, @@ -488,6 +553,7 @@ pub(crate) fn find_flexible_result( proof: share.clone(), }) .collect(), + initial_refund, }, ok_responses_size, ); @@ -495,6 +561,10 @@ pub(crate) fn find_flexible_result( // 2. Too many nodes returned rejects (so that we can never reach min_responses OK responses)? if reject_responses.len() > committee.len().saturating_sub(min_responses) { + let initial_refund = flexible_initial_refund( + reject_responses.iter().map(|(_, share)| *share), + subnet_size, + ); let error = FlexibleCanisterHttpError::TooManyRejects { callback_id, reject_responses: reject_responses @@ -504,6 +574,7 @@ pub(crate) fn find_flexible_result( proof: share.clone(), }) .collect(), + initial_refund, }; let error_size = error.count_bytes(); return FlexibleFindResult::Error(error, error_size); diff --git a/rs/https_outcalls/consensus/src/pool_manager.rs b/rs/https_outcalls/consensus/src/pool_manager.rs index 198105f8b807..1f095a709d5d 100644 --- a/rs/https_outcalls/consensus/src/pool_manager.rs +++ b/rs/https_outcalls/consensus/src/pool_manager.rs @@ -164,10 +164,18 @@ impl CanisterHttpPoolManagerImpl { self.requested_id_cache.borrow_mut().remove(callback_id); } + // Shares whose context's response was already delivered to execution + // must not be purged: such contexts are retained (until they time out) + // so that replicas responding late can still be refunded, which + // requires keeping their shares around. We therefore retain shares for + // both active and delivered contexts. + let mut retained_callback_ids = active_callback_ids; + retained_callback_ids.extend(self.delivered_callback_ids()); + canister_http_pool .get_validated_shares() .filter_map(|share| { - if active_callback_ids.contains(&share.content.id()) { + if retained_callback_ids.contains(&share.content.id()) { None } else { Some(CanisterHttpChangeAction::RemoveValidated(share.clone())) @@ -180,7 +188,7 @@ impl CanisterHttpPoolManagerImpl { .filter(|artifact| artifact.share.content.id() < next_callback_id) .filter_map(|artifact| { let share = &artifact.share; - if active_callback_ids.contains(&share.content.id()) { + if retained_callback_ids.contains(&share.content.id()) { None } else { Some(CanisterHttpChangeAction::RemoveUnvalidated(share.clone())) @@ -191,7 +199,7 @@ impl CanisterHttpPoolManagerImpl { canister_http_pool .get_response_content_items() .filter_map(|content| { - if active_callback_ids.contains(&content.1.id) { + if retained_callback_ids.contains(&content.1.id) { None } else { Some(CanisterHttpChangeAction::RemoveContent(content.0.clone())) @@ -464,11 +472,15 @@ impl CanisterHttpPoolManagerImpl { return Vec::new(); }; - let active_contexts = &self - .latest_state() - .metadata - .subnet_call_context_manager - .canister_http_request_contexts; + // Validate shares for both active request contexts and contexts whose + // responses have already been delivered to execution. The latter are + // retained (until they time out) so that replicas responding late can + // still be refunded, which requires validating their late shares. + let latest_state = self.latest_state(); + let subnet_call_context_manager = &latest_state.metadata.subnet_call_context_manager; + let active_contexts = &subnet_call_context_manager.canister_http_request_contexts; + let delivered_contexts = + &subnet_call_context_manager.delivered_canister_http_request_contexts; let next_callback_id = self.next_callback_id(); let key_from_share = @@ -495,7 +507,10 @@ impl CanisterHttpPoolManagerImpl { }; } - let Some(context) = active_contexts.get(&share.content.id()) else { + let Some(context) = active_contexts + .get(&share.content.id()) + .or_else(|| delivered_contexts.get(&share.content.id())) + else { return Some(CanisterHttpChangeAction::RemoveUnvalidated(share.clone())); }; @@ -672,6 +687,22 @@ impl CanisterHttpPoolManagerImpl { .collect() } + /// Callback ids of contexts whose responses have already been delivered to + /// execution. These contexts are retained (until they time out) so that + /// replicas responding late can still be refunded; their shares must + /// therefore be kept around and validated as well. + fn delivered_callback_ids(&self) -> BTreeSet { + self.state_reader + .get_latest_state() + .get_ref() + .metadata + .subnet_call_context_manager + .delivered_canister_http_request_contexts + .keys() + .copied() + .collect() + } + fn latest_state(&self) -> Arc { self.state_reader.get_latest_state().get_ref().clone() } @@ -787,6 +818,7 @@ pub mod test { replication, pricing_version, refund_status: RefundStatus::default(), + subnet_size: 4, } } diff --git a/rs/https_outcalls/pricing/BUILD.bazel b/rs/https_outcalls/pricing/BUILD.bazel index bb6fa3989628..2c948a5854c5 100644 --- a/rs/https_outcalls/pricing/BUILD.bazel +++ b/rs/https_outcalls/pricing/BUILD.bazel @@ -8,8 +8,14 @@ rust_library( crate_name = "ic_https_outcalls_pricing", version = "0.1.0", deps = [ + # Keep sorted. "//rs/config", + "//rs/monitoring/logger", + "//rs/monitoring/metrics", + "//rs/types/cycles", "//rs/types/types", + "@crate_index//:prometheus", + "@crate_index//:slog", ], ) diff --git a/rs/https_outcalls/pricing/Cargo.toml b/rs/https_outcalls/pricing/Cargo.toml index 1773aa69401f..3de7839e79db 100644 --- a/rs/https_outcalls/pricing/Cargo.toml +++ b/rs/https_outcalls/pricing/Cargo.toml @@ -8,4 +8,9 @@ documentation.workspace = true [dependencies] ic-config = { path = "../../config" } -ic-types = { path = "../../types/types" } \ No newline at end of file +ic-logger = { path = "../../monitoring/logger" } +ic-metrics = { path = "../../monitoring/metrics" } +ic-types = { path = "../../types/types" } +ic-types-cycles = { path = "../../types/cycles" } +prometheus = { workspace = true } +slog = { workspace = true } \ No newline at end of file diff --git a/rs/https_outcalls/pricing/src/dark_launch.rs b/rs/https_outcalls/pricing/src/dark_launch.rs new file mode 100644 index 000000000000..be4475bf4195 --- /dev/null +++ b/rs/https_outcalls/pricing/src/dark_launch.rs @@ -0,0 +1,269 @@ +use ic_logger::{ReplicaLogger, warn}; +use ic_types::{CanisterId, NumBytes, NumInstructions, canister_http::CanisterHttpPaymentReceipt}; + +use crate::{AdapterLimits, BudgetTracker, NetworkUsage, PricingError, metrics::PricingMetrics}; + +/// A [`BudgetTracker`] that runs two child trackers side by side: a `real` +/// tracker whose results are the only ones returned (and therefore the only +/// ones that affect observable behaviour), and a `shadow` tracker whose results +/// are merely compared against the real one. +/// +/// Whenever the shadow tracker disagrees with the real tracker (e.g. it returns +/// a pricing error where the real tracker succeeded), the divergence is counted +/// in a metric and logged together with the canister id, so we can measure what +/// fraction of requests would not be backwards compatible under the shadow +/// pricing and which canisters would break. +pub struct DarkLaunchTracker { + real: Box, + shadow: Box, + canister_id: CanisterId, + metrics: PricingMetrics, + log: ReplicaLogger, + /// Whether an incompatibility has already been recorded for this request. + /// Ensures we count and log at most once per request. + reported: bool, +} + +impl DarkLaunchTracker { + pub fn new( + real: Box, + shadow: Box, + canister_id: CanisterId, + metrics: PricingMetrics, + log: ReplicaLogger, + ) -> Self { + Self { + real, + shadow, + canister_id, + metrics, + log, + reported: false, + } + } + + /// Compares the results of the real and shadow trackers for a given + /// accounting `step` and records a divergence if they disagree. + fn compare( + &mut self, + step: &str, + real: &Result<(), PricingError>, + shadow: &Result<(), PricingError>, + ) { + if real.is_ok() == shadow.is_ok() { + return; + } + if self.reported { + return; + } + self.reported = true; + self.metrics + .shadow_incompatible_total + .with_label_values(&[step]) + .inc(); + warn!( + self.log, + "Canister http request would not be backwards compatible under shadow pricing: \ + canister_id {}, step {}, real_result {:?}, shadow_result {:?}", + self.canister_id, + step, + real, + shadow, + ); + } +} + +impl BudgetTracker for DarkLaunchTracker { + fn get_adapter_limits(&self) -> AdapterLimits { + // Only the real tracker drives observable behaviour. + self.real.get_adapter_limits() + } + + fn subtract_network_usage(&mut self, network_usage: NetworkUsage) -> Result<(), PricingError> { + let real = self.real.subtract_network_usage(network_usage); + let shadow = self.shadow.subtract_network_usage(network_usage); + self.compare("network_usage", &real, &shadow); + real + } + + fn get_transform_limit(&self) -> NumInstructions { + self.real.get_transform_limit() + } + + fn subtract_transform_usage(&mut self, usage: NumInstructions) -> Result<(), PricingError> { + let real = self.real.subtract_transform_usage(usage); + let shadow = self.shadow.subtract_transform_usage(usage); + self.compare("transform_usage", &real, &shadow); + real + } + + fn subtract_transformed_response_usage( + &mut self, + transformed_response_size: NumBytes, + ) -> Result<(), PricingError> { + let real = self + .real + .subtract_transformed_response_usage(transformed_response_size); + let shadow = self + .shadow + .subtract_transformed_response_usage(transformed_response_size); + self.compare("transformed_response_usage", &real, &shadow); + real + } + + fn create_payment_receipt(&self) -> CanisterHttpPaymentReceipt { + // Count every request that reaches the final accounting step so the + // incompatible counter can be expressed as a fraction of the total. + self.metrics.shadow_requests_total.inc(); + self.real.create_payment_receipt() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use ic_logger::no_op_logger; + use ic_metrics::MetricsRegistry; + use std::time::Duration; + + /// A [`BudgetTracker`] whose accounting steps return preconfigured results. + struct FakeTracker { + network: Result<(), PricingError>, + transform: Result<(), PricingError>, + transformed: Result<(), PricingError>, + } + + impl FakeTracker { + fn ok() -> Self { + Self { + network: Ok(()), + transform: Ok(()), + transformed: Ok(()), + } + } + } + + impl BudgetTracker for FakeTracker { + fn get_adapter_limits(&self) -> AdapterLimits { + AdapterLimits { + max_response_size: NumBytes::from(0), + max_response_time: Duration::ZERO, + } + } + fn subtract_network_usage(&mut self, _: NetworkUsage) -> Result<(), PricingError> { + self.network + } + fn get_transform_limit(&self) -> NumInstructions { + NumInstructions::from(0) + } + fn subtract_transform_usage(&mut self, _: NumInstructions) -> Result<(), PricingError> { + self.transform + } + fn subtract_transformed_response_usage(&mut self, _: NumBytes) -> Result<(), PricingError> { + self.transformed + } + fn create_payment_receipt(&self) -> CanisterHttpPaymentReceipt { + CanisterHttpPaymentReceipt::default() + } + } + + fn dark_launch( + real: FakeTracker, + shadow: FakeTracker, + metrics: PricingMetrics, + ) -> DarkLaunchTracker { + DarkLaunchTracker::new( + Box::new(real), + Box::new(shadow), + CanisterId::from_u64(7), + metrics, + no_op_logger(), + ) + } + + fn network_usage() -> NetworkUsage { + NetworkUsage { + response_size: NumBytes::from(0), + response_time: Duration::ZERO, + } + } + + fn incompatible_count(metrics: &PricingMetrics) -> u64 { + [ + "network_usage", + "transform_usage", + "transformed_response_usage", + ] + .iter() + .map(|step| { + metrics + .shadow_incompatible_total + .with_label_values(&[*step]) + .get() + }) + .sum() + } + + #[test] + fn returns_real_result_and_counts_divergence() { + let metrics = PricingMetrics::new(&MetricsRegistry::new()); + let shadow = FakeTracker { + network: Err(PricingError::InsufficientCycles), + ..FakeTracker::ok() + }; + let mut tracker = dark_launch(FakeTracker::ok(), shadow, metrics.clone()); + + // The real (always-Ok) result is returned even though the shadow fails. + assert_eq!(tracker.subtract_network_usage(network_usage()), Ok(())); + assert_eq!( + metrics + .shadow_incompatible_total + .with_label_values(&["network_usage"]) + .get(), + 1 + ); + + let _ = tracker.create_payment_receipt(); + assert_eq!(metrics.shadow_requests_total.get(), 1); + } + + #[test] + fn counts_divergence_at_most_once_per_request() { + let metrics = PricingMetrics::new(&MetricsRegistry::new()); + let shadow = FakeTracker { + network: Err(PricingError::InsufficientCycles), + transform: Err(PricingError::InsufficientCycles), + transformed: Err(PricingError::InsufficientCycles), + }; + let mut tracker = dark_launch(FakeTracker::ok(), shadow, metrics.clone()); + + assert_eq!(tracker.subtract_network_usage(network_usage()), Ok(())); + assert_eq!( + tracker.subtract_transform_usage(NumInstructions::from(0)), + Ok(()) + ); + assert_eq!( + tracker.subtract_transformed_response_usage(NumBytes::from(0)), + Ok(()) + ); + + // Only the first divergence is recorded for the request. + assert_eq!(incompatible_count(&metrics), 1); + } + + #[test] + fn no_divergence_when_results_agree() { + let metrics = PricingMetrics::new(&MetricsRegistry::new()); + let mut tracker = dark_launch(FakeTracker::ok(), FakeTracker::ok(), metrics.clone()); + + assert_eq!(tracker.subtract_network_usage(network_usage()), Ok(())); + assert_eq!( + tracker.subtract_transform_usage(NumInstructions::from(0)), + Ok(()) + ); + let _ = tracker.create_payment_receipt(); + + assert_eq!(incompatible_count(&metrics), 0); + assert_eq!(metrics.shadow_requests_total.get(), 1); + } +} diff --git a/rs/https_outcalls/pricing/src/legacy.rs b/rs/https_outcalls/pricing/src/legacy.rs index eb272bc8a37b..6ce72888a588 100644 --- a/rs/https_outcalls/pricing/src/legacy.rs +++ b/rs/https_outcalls/pricing/src/legacy.rs @@ -45,6 +45,13 @@ impl BudgetTracker for LegacyTracker { Ok(()) } + fn subtract_transformed_response_usage( + &mut self, + _transformed_response_size: NumBytes, + ) -> Result<(), PricingError> { + Ok(()) + } + fn create_payment_receipt(&self) -> CanisterHttpPaymentReceipt { // Legacy pricing does not perform cycles accounting, so no cycles // are ever refunded. diff --git a/rs/https_outcalls/pricing/src/lib.rs b/rs/https_outcalls/pricing/src/lib.rs index 827459a9bcdd..70654b653625 100644 --- a/rs/https_outcalls/pricing/src/lib.rs +++ b/rs/https_outcalls/pricing/src/lib.rs @@ -1,12 +1,21 @@ +mod dark_launch; mod legacy; +mod metrics; +mod payg; use std::time::Duration; +use ic_logger::ReplicaLogger; +use ic_metrics::MetricsRegistry; use ic_types::{ NumBytes, NumInstructions, - canister_http::{CanisterHttpPaymentReceipt, CanisterHttpRequestContext}, + canister_http::{CanisterHttpPaymentReceipt, CanisterHttpRequestContext, PricingVersion}, }; + +use dark_launch::DarkLaunchTracker; use legacy::LegacyTracker; +use metrics::PricingMetrics; +use payg::PayAsYouGoTracker; pub trait BudgetTracker: Send { /// Returns the maximum network resources the Adapter is allowed to consume. @@ -26,6 +35,15 @@ pub trait BudgetTracker: Send { /// # Invariants /// - This method returns `Ok(())` if and only if `usage <= get_transform_limit()`. fn subtract_transform_usage(&mut self, usage: NumInstructions) -> Result<(), PricingError>; + /// Deducts the cost of the final (post-transform) response that this replica + /// produced and that will be handed back to the caller. + /// + /// This is the last accounting step and is invoked once the size of the + /// response is known. + fn subtract_transformed_response_usage( + &mut self, + transformed_response_size: NumBytes, + ) -> Result<(), PricingError>; /// Produces the per-replica payment receipt that summarizes the cycles /// accounting outcome of the outcall, given the resources consumed so /// far via the `subtract_*` methods. @@ -39,6 +57,7 @@ pub struct AdapterLimits { pub max_response_time: Duration, } +#[derive(Clone, Copy)] pub struct NetworkUsage { /// The size of the HTTP response, including the headers and the body. pub response_size: NumBytes, @@ -46,16 +65,47 @@ pub struct NetworkUsage { pub response_time: Duration, } +#[derive(Clone, Copy, Debug, Eq, PartialEq)] pub enum PricingError { InsufficientCycles, } -pub struct PricingFactory; +/// Builds a [`BudgetTracker`] for each canister HTTP request. +/// +/// The factory is constructed once per replica and holds the shared metrics +/// and logger needed by the dark-launch tracker. +#[derive(Clone)] +pub struct PricingFactory { + metrics: PricingMetrics, + log: ReplicaLogger, +} impl PricingFactory { - pub fn new_tracker(context: &CanisterHttpRequestContext) -> Box { - // TODO(IC-1937): This should take into account context.pricing_version and a replica config. - // Currently, we only support the legacy pricing version. - Box::new(LegacyTracker::new(context.max_response_bytes)) + pub fn new(metrics_registry: &MetricsRegistry, log: ReplicaLogger) -> Self { + Self { + metrics: PricingMetrics::new(metrics_registry), + log, + } + } + + /// Creates the tracker for a request. The subnet size (`N`) needed by the + /// pay-as-you-go formula is read from `context.subnet_size`. + pub fn new_tracker(&self, context: &CanisterHttpRequestContext) -> Box { + match context.pricing_version { + // Legacy pricing is what is actually charged today. We run the + // PayAsYouGo tracker as a shadow next to it so we can measure how + // many requests would become backwards-incompatible under the new + // pricing without changing any observable behaviour. + PricingVersion::Legacy => Box::new(DarkLaunchTracker::new( + Box::new(LegacyTracker::new(context.max_response_bytes)), + Box::new(PayAsYouGoTracker::new(context)), + context.request.sender, + self.metrics.clone(), + self.log.clone(), + )), + // PayAsYouGo requests are not served yet (the client rejects them), + // but we still hand back the matching tracker for completeness. + PricingVersion::PayAsYouGo => Box::new(PayAsYouGoTracker::new(context)), + } } } diff --git a/rs/https_outcalls/pricing/src/metrics.rs b/rs/https_outcalls/pricing/src/metrics.rs new file mode 100644 index 000000000000..df2c83b5829a --- /dev/null +++ b/rs/https_outcalls/pricing/src/metrics.rs @@ -0,0 +1,36 @@ +use ic_metrics::MetricsRegistry; +use prometheus::{IntCounter, IntCounterVec}; + +/// Label identifying the accounting step at which the shadow tracker diverged +/// from the real one. +pub const LABEL_STEP: &str = "step"; + +#[derive(Clone)] +pub struct PricingMetrics { + /// Total number of requests evaluated by the dark-launch budget tracker. + pub shadow_requests_total: IntCounter, + /// Number of requests that would be rejected (pricing error) under the + /// shadow pricing while succeeding under the real pricing, by the + /// accounting step at which the divergence was first observed. + /// + /// The fraction `shadow_incompatible_total / shadow_requests_total` is the + /// share of requests that would NOT be backwards compatible. + pub shadow_incompatible_total: IntCounterVec, +} + +impl PricingMetrics { + pub fn new(metrics_registry: &MetricsRegistry) -> Self { + Self { + shadow_requests_total: metrics_registry.int_counter( + "canister_http_pricing_shadow_requests_total", + "Total canister http requests evaluated by the dark-launch budget tracker.", + ), + shadow_incompatible_total: metrics_registry.int_counter_vec( + "canister_http_pricing_shadow_incompatible_total", + "Canister http requests that would be rejected (pricing error) under the shadow \ + pricing while succeeding under the real pricing, by accounting step.", + &[LABEL_STEP], + ), + } + } +} diff --git a/rs/https_outcalls/pricing/src/payg.rs b/rs/https_outcalls/pricing/src/payg.rs new file mode 100644 index 000000000000..ae882d70c410 --- /dev/null +++ b/rs/https_outcalls/pricing/src/payg.rs @@ -0,0 +1,269 @@ +use std::time::Duration; + +use ic_config::subnet_config::MAX_INSTRUCTIONS_PER_QUERY_MESSAGE; +use ic_types::{ + NumBytes, NumInstructions, + canister_http::{ + CanisterHttpPaymentReceipt, CanisterHttpRequestContext, MAX_CANISTER_HTTP_RESPONSE_BYTES, + Replication, + }, +}; +use ic_types_cycles::Cycles; + +use crate::{AdapterLimits, BudgetTracker, NetworkUsage, PricingError}; + +// Per-replica fee constants. +// +// A request's cost is split into three parts: +// 1. the base cost, subtracted up-front when the request context is created +// (and therefore reflected in `per_replica_allowance`); +// 2. the per-replica cost, accounted for here as-you-go; +// 3. the consensus cost, computed from the aggregated response in the block +// payload (ignored for now). +// +// This tracker only implements the per-replica part. The formula differs +// between fully/non-replicated and flexible outcalls: +// +// Fully/non-replicated per replica: +// 50 * downloaded_bytes_i + 300 * request_ms_i + transform_instructions_i / 13 +// +// Flexible per replica: +// 50 * downloaded_bytes_i + 300 * request_ms_i +// + 50 * transformed_response_bytes_i * N + transform_instructions_i / 13 +const PER_DOWNLOADED_BYTE_FEE: u128 = 50; +const PER_RESPONSE_MS_FEE: u128 = 300; +const TRANSFORM_INSTRUCTION_DIVISOR: u128 = 13; +const FLEXIBLE_PER_TRANSFORMED_BYTE_NODE_FEE: u128 = 50; + +pub struct PayAsYouGoTracker { + /// Number of nodes (`N`) on the subnet. + n: u64, + /// Whether this is a flexible outcall (different per-replica formula). + is_flexible: bool, + /// The cycles budget available to this replica (already net of the base + /// cost, which was subtracted when the context was created). + allowance: u128, + /// The maximum size of the HTTP response, including headers and body. + max_response_size: NumBytes, + /// The cycles charged so far against `allowance`. + spent: u128, +} + +impl PayAsYouGoTracker { + pub fn new(context: &CanisterHttpRequestContext) -> Self { + Self { + n: context.subnet_size as u64, + is_flexible: matches!(context.replication, Replication::Flexible { .. }), + allowance: context.refund_status.per_replica_allowance.get(), + max_response_size: context + .max_response_bytes + .unwrap_or(NumBytes::from(MAX_CANISTER_HTTP_RESPONSE_BYTES)), + spent: 0, + } + } + + /// Charges `amount` against the budget. Returns an error if the total spent + /// now exceeds the available allowance. + fn charge(&mut self, amount: u128) -> Result<(), PricingError> { + self.spent = self.spent.saturating_add(amount); + if self.spent > self.allowance { + Err(PricingError::InsufficientCycles) + } else { + Ok(()) + } + } +} + +impl BudgetTracker for PayAsYouGoTracker { + fn get_adapter_limits(&self) -> AdapterLimits { + AdapterLimits { + max_response_size: self.max_response_size, + // Mirror the legacy limit: the server enforces a 30s timeout, so 60s + // here is just a safety margin. + max_response_time: Duration::from_secs(60), + } + } + + fn subtract_network_usage(&mut self, network_usage: NetworkUsage) -> Result<(), PricingError> { + let NetworkUsage { + response_size, + response_time, + } = network_usage; + let cost = PER_DOWNLOADED_BYTE_FEE + .saturating_mul(response_size.get() as u128) + .saturating_add(PER_RESPONSE_MS_FEE.saturating_mul(response_time.as_millis() as u128)); + self.charge(cost) + } + + fn get_transform_limit(&self) -> NumInstructions { + MAX_INSTRUCTIONS_PER_QUERY_MESSAGE + } + + fn subtract_transform_usage(&mut self, usage: NumInstructions) -> Result<(), PricingError> { + let cost = (usage.get() as u128) / TRANSFORM_INSTRUCTION_DIVISOR; + self.charge(cost) + } + + fn subtract_transformed_response_usage( + &mut self, + transformed_response_size: NumBytes, + ) -> Result<(), PricingError> { + // For fully/non-replicated outcalls the transformed-response term is a + // consensus cost (ignored here for now). For flexible outcalls each + // replica is charged 50 * transformed_response_bytes_i * N. + if !self.is_flexible { + return Ok(()); + } + let cost = FLEXIBLE_PER_TRANSFORMED_BYTE_NODE_FEE + .saturating_mul(transformed_response_size.get() as u128) + .saturating_mul(self.n as u128); + self.charge(cost) + } + + fn create_payment_receipt(&self) -> CanisterHttpPaymentReceipt { + CanisterHttpPaymentReceipt { + refund: Cycles::new(self.allowance.saturating_sub(self.spent)), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use ic_types::{ + CanisterId, NodeId, PrincipalId, + canister_http::{CanisterHttpMethod, PricingVersion, RefundStatus}, + messages::{CallbackId, NO_DEADLINE, Request}, + time::UNIX_EPOCH, + }; + use std::collections::BTreeSet; + + fn context( + replication: Replication, + per_replica_allowance: u128, + subnet_size: usize, + ) -> CanisterHttpRequestContext { + CanisterHttpRequestContext { + request: Request { + receiver: CanisterId::from_u64(1), + sender: CanisterId::from_u64(1), + sender_reply_callback: CallbackId::from(1), + payment: Cycles::zero(), + method_name: String::new(), + method_payload: Vec::new(), + metadata: Default::default(), + deadline: NO_DEADLINE, + }, + url: String::new(), + max_response_bytes: None, + headers: vec![], + body: None, + http_method: CanisterHttpMethod::GET, + transform: None, + time: UNIX_EPOCH, + replication, + pricing_version: PricingVersion::Legacy, + refund_status: RefundStatus { + refundable_cycles: Cycles::new(per_replica_allowance), + per_replica_allowance: Cycles::new(per_replica_allowance), + refunded_cycles: Cycles::zero(), + refunding_nodes: BTreeSet::new(), + }, + subnet_size, + } + } + + fn flexible(n: usize) -> Replication { + let committee: BTreeSet = (0..n as u64) + .map(|i| NodeId::from(PrincipalId::new_node_test_id(i))) + .collect(); + Replication::Flexible { + committee, + min_responses: 1, + max_responses: n as u32, + } + } + + #[test] + fn does_not_charge_base_cost() { + // The base cost is handled at context creation, so a freshly created + // tracker has spent nothing and a zero-usage request refunds everything. + let ctx = context(Replication::FullyReplicated, 1_000_000, 13); + let tracker = PayAsYouGoTracker::new(&ctx); + assert_eq!(tracker.spent, 0); + assert_eq!( + tracker.create_payment_receipt().refund, + Cycles::new(1_000_000) + ); + } + + #[test] + fn charges_per_replica_cost_fully_replicated() { + let allowance = 1_000_000_000u128; + let ctx = context(Replication::FullyReplicated, allowance, 13); + let mut tracker = PayAsYouGoTracker::new(&ctx); + + let response_size = 1_000u64; + let response_ms = 2_000u128; + assert_eq!( + tracker.subtract_network_usage(NetworkUsage { + response_size: NumBytes::from(response_size), + response_time: Duration::from_millis(response_ms as u64), + }), + Ok(()) + ); + let network = + PER_DOWNLOADED_BYTE_FEE * response_size as u128 + PER_RESPONSE_MS_FEE * response_ms; + + let instructions = 13_000u64; + assert_eq!( + tracker.subtract_transform_usage(NumInstructions::from(instructions)), + Ok(()) + ); + let transform = instructions as u128 / TRANSFORM_INSTRUCTION_DIVISOR; + + // For fully-replicated requests the transformed-response term is a + // consensus cost and must not be charged here. + assert_eq!( + tracker.subtract_transformed_response_usage(NumBytes::from(5_000)), + Ok(()) + ); + + assert_eq!(tracker.spent, network + transform); + assert_eq!( + tracker.create_payment_receipt().refund, + Cycles::new(allowance - network - transform) + ); + } + + #[test] + fn charges_transformed_response_for_flexible() { + let allowance = 1_000_000_000u128; + let n = 13usize; + let ctx = context(flexible(n), allowance, n); + let mut tracker = PayAsYouGoTracker::new(&ctx); + + let transformed_size = 500u64; + assert_eq!( + tracker.subtract_transformed_response_usage(NumBytes::from(transformed_size)), + Ok(()) + ); + let expected = + FLEXIBLE_PER_TRANSFORMED_BYTE_NODE_FEE * transformed_size as u128 * n as u128; + assert_eq!(tracker.spent, expected); + } + + #[test] + fn returns_pricing_error_when_budget_is_exceeded() { + let ctx = context(Replication::FullyReplicated, 100, 13); + let mut tracker = PayAsYouGoTracker::new(&ctx); + assert_eq!( + tracker.subtract_network_usage(NetworkUsage { + response_size: NumBytes::from(1_000), + response_time: Duration::ZERO, + }), + Err(PricingError::InsufficientCycles) + ); + assert_eq!(tracker.create_payment_receipt().refund, Cycles::zero()); + } +} diff --git a/rs/interfaces/src/canister_http.rs b/rs/interfaces/src/canister_http.rs index da03d4090fd3..fd0320d880e1 100644 --- a/rs/interfaces/src/canister_http.rs +++ b/rs/interfaces/src/canister_http.rs @@ -46,6 +46,14 @@ pub enum InvalidCanisterHttpPayloadReason { metadata_is_reject: bool, calculated_is_reject: bool, }, + /// The collective initial refund included in the payload does not match the + /// value recomputed from the request context's subnet size and the signed + /// per-replica receipts. + InitialRefundMismatch { + callback_id: CallbackId, + payload_refund: Cycles, + computed_refund: Cycles, + }, /// A timeout refers to a CallbackId that is unknown by the StateManager UnknownCallbackId(CallbackId), /// A CallbackId was included as a timeout, however the Request has not timed out at all diff --git a/rs/messaging/src/canister_http_refunds.rs b/rs/messaging/src/canister_http_refunds.rs new file mode 100644 index 000000000000..fac7f9b47174 --- /dev/null +++ b/rs/messaging/src/canister_http_refunds.rs @@ -0,0 +1,212 @@ +use ic_logger::{ReplicaLogger, error}; +use ic_replicated_state::ReplicatedState; +use ic_types::messages::CallbackId; +use ic_types::{CanisterId, Time, batch::CanisterHttpRefunds}; +use ic_types_cycles::Cycles; +use std::collections::BTreeMap; + +/// Applies the HTTP outcall refunds carried by `refunds` to the calling +/// canisters by crediting their cycle balances directly. +/// +/// Refunds are only applied to contexts that have already been responded to, +/// i.e. those in `delivered_canister_http_request_contexts`. This function must +/// therefore run *after* the round has executed and moved the just-responded +/// contexts into the delivered collection. +/// +/// Two kinds of refunds are handled: +/// - *initial* refunds, where the set of nodes that produced a response +/// collectively refund one specific amount; +/// - *asynchronous* refunds, where individual nodes each refund some cycles, +/// possibly in a later block than the response. A node is credited at most +/// once (tracked via `refunding_nodes`), which makes these idempotent. +/// +/// In all cases the accumulated `refunded_cycles` is capped so that it never +/// exceeds the context's `refundable_cycles`; only the amount actually applied +/// is credited to the canister. +pub(crate) fn deliver_canister_http_refunds( + refunds: &CanisterHttpRefunds, + state: &mut ReplicatedState, + log: &ReplicaLogger, +) { + // First update the contexts' refund status and accumulate the cycles to + // credit per canister. Crediting happens in a second pass to avoid borrowing + // both the subnet call context manager and the canister states at once. + let mut credits: BTreeMap = BTreeMap::new(); + { + let contexts = &mut state + .metadata + .subnet_call_context_manager + .delivered_canister_http_request_contexts; + + // The context should still be around (delivered contexts are only + // removed on timeout, which is long after the response). A missing + // context means the refund arrived unexpectedly late; log and drop it. + let log_missing = |callback: CallbackId| { + error!( + log, + "Received HTTP outcall refund for callback {} with no matching delivered request \ + context; dropping it.", + callback + ); + }; + + for refund in &refunds.initial { + let Some(context) = contexts.get_mut(&refund.callback) else { + log_missing(refund.callback); + continue; + }; + // The initial refund is the collective refund of a set of nodes. Only + // apply it if no node has refunded yet; otherwise some of its + // contributing nodes may already have been credited (via an + // asynchronous refund) and applying it would double-credit. + if !context.refund_status.refunding_nodes.is_empty() { + error!( + log, + "Received an initial HTTP outcall refund for callback {} but {} node(s) have \ + already refunded; dropping it to avoid double-crediting.", + refund.callback, + context.refund_status.refunding_nodes.len() + ); + continue; + } + let applied = apply_capped( + &mut context.refund_status, + refund.amount, + refund.callback, + log, + ); + // Record the contributing nodes so that a later asynchronous refund + // from any of them is not credited twice. + context + .refund_status + .refunding_nodes + .extend(refund.nodes.iter()); + *credits.entry(context.request.sender).or_default() += applied; + } + + for refund in &refunds.asynchronous { + let Some(context) = contexts.get_mut(&refund.callback) else { + log_missing(refund.callback); + continue; + }; + let mut applied = Cycles::zero(); + for (node_id, cycles) in &refund.shares { + if context.refund_status.refunding_nodes.insert(*node_id) { + applied += + apply_capped(&mut context.refund_status, *cycles, refund.callback, log); + } else { + error!( + log, + "Node {} attempted to refund again for HTTP outcall callback {}; \ + ignoring the duplicate refund of {} cycles.", + node_id, + refund.callback, + cycles + ); + } + } + *credits.entry(context.request.sender).or_default() += applied; + } + } + + credit_canisters(state, credits, log); +} + +/// Times out delivered `CanisterHttpRequestContext`s and refunds the calling +/// canister for the replicas that never responded. +/// +/// Like [`deliver_canister_http_refunds`], this must run *after* the round has +/// executed, so that just-responded contexts have been moved into the delivered +/// collection. +/// +/// A delivered context is kept around until it times out so that late refunds +/// can still be applied. The replicas that did respond refund their unused +/// per-replica allowance through [`deliver_canister_http_refunds`]; the +/// remaining `subnet_size - refunding_nodes.len()` replicas never did, so on +/// timeout their full per-replica allowance is returned to the caller (still +/// capped so that `refunded_cycles` never exceeds `refundable_cycles`). +pub(crate) fn refund_timed_out_canister_http_contexts( + state: &mut ReplicatedState, + current_time: Time, + log: &ReplicaLogger, +) { + let timed_out = state + .metadata + .subnet_call_context_manager + .time_out_delivered_canister_http_request_contexts(current_time); + + let mut credits: BTreeMap = BTreeMap::new(); + for mut context in timed_out { + let unresponsive_replicas = context + .subnet_size + .saturating_sub(context.refund_status.refunding_nodes.len()); + let refund = context.refund_status.per_replica_allowance * unresponsive_replicas; + let applied = apply_capped( + &mut context.refund_status, + refund, + context.request.sender_reply_callback, + log, + ); + *credits.entry(context.request.sender).or_default() += applied; + } + + credit_canisters(state, credits, log); +} + +/// Records `amount` as refunded against `refund_status`, capped so that +/// `refunded_cycles` never exceeds `refundable_cycles`. Returns the amount that +/// was actually applied (and therefore should be credited to the canister). +/// +/// Capping is not expected to happen (the per-replica allowances are sized so +/// that the sum of all refunds stays within `refundable_cycles`), so an error +/// is logged if it does. +fn apply_capped( + refund_status: &mut ic_types::canister_http::RefundStatus, + amount: Cycles, + callback: CallbackId, + log: &ReplicaLogger, +) -> Cycles { + let room = refund_status.refundable_cycles - refund_status.refunded_cycles; + let applied = std::cmp::min(amount, room); + if applied < amount { + error!( + log, + "HTTP outcall refund for callback {} exceeded the refundable amount and was capped: \ + requested {}, applied {} (refundable_cycles {}, already refunded_cycles {}).", + callback, + amount, + applied, + refund_status.refundable_cycles, + refund_status.refunded_cycles + ); + } + refund_status.refunded_cycles += applied; + applied +} + +/// Credits the accumulated per-canister refund `credits` to the corresponding +/// canisters' balances, logging any whose canister no longer exists. +/// +/// `credits` is keyed by canister, so `canister_state_make_mut` (which heats the +/// canister and may clone its state) is called at most once per canister. +fn credit_canisters( + state: &mut ReplicatedState, + credits: BTreeMap, + log: &ReplicaLogger, +) { + for (sender, amount) in credits { + if amount.is_zero() { + continue; + } + match state.canister_state_make_mut(&sender) { + Some(canister) => canister.system_state.add_cycles(amount), + None => error!( + log, + "Canister {} for an HTTP outcall no longer exists; \ + dropping refund of {} cycles.", + sender, + amount + ), + } + } +} diff --git a/rs/messaging/src/lib.rs b/rs/messaging/src/lib.rs index 03c15a6643c8..b51160b4ebc3 100644 --- a/rs/messaging/src/lib.rs +++ b/rs/messaging/src/lib.rs @@ -2,6 +2,7 @@ //! (ii) inter-canister message routing within a subnet and across subnets (also //! known as cross-net or XNet transfer). +mod canister_http_refunds; mod message_routing; pub(crate) mod routing; mod scheduling; diff --git a/rs/messaging/src/message_routing/tests.rs b/rs/messaging/src/message_routing/tests.rs index 6734ff421580..0791df82377a 100644 --- a/rs/messaging/src/message_routing/tests.rs +++ b/rs/messaging/src/message_routing/tests.rs @@ -32,7 +32,7 @@ use ic_test_utilities_registry::{SubnetRecordBuilder, get_mainnet_delta_00_6d_c1 use ic_test_utilities_state::CanisterStateBuilder; use ic_test_utilities_types::batch::BatchBuilder; use ic_test_utilities_types::ids::{canister_test_id, node_test_id, subnet_test_id, user_test_id}; -use ic_types::batch::{Batch, BatchMessages, BlockmakerMetrics}; +use ic_types::batch::{Batch, BatchMessages, BlockmakerMetrics, CanisterHttpRefunds}; use ic_types::crypto::AlgorithmId; use ic_types::crypto::threshold_sig::ni_dkg::{NiDkgTag, NiDkgTranscript}; use ic_types::time::Time; @@ -1040,6 +1040,7 @@ fn try_read_registry_succeeds_with_fully_specified_registry_records() { content: BatchContent::Data { batch_messages: BatchMessages::default(), consensus_responses: Vec::new(), + refunds: CanisterHttpRefunds::default(), chain_key_data: Default::default(), requires_full_state_hash: false, }, @@ -2197,6 +2198,7 @@ fn process_batch_updates_subnet_metrics() { content: BatchContent::Data { batch_messages: BatchMessages::default(), consensus_responses: Vec::new(), + refunds: CanisterHttpRefunds::default(), chain_key_data: Default::default(), requires_full_state_hash: false, }, @@ -2267,6 +2269,7 @@ fn process_batch_resets_split_marker() { content: BatchContent::Data { batch_messages: BatchMessages::default(), consensus_responses: Vec::new(), + refunds: CanisterHttpRefunds::default(), chain_key_data: Default::default(), requires_full_state_hash: false, }, diff --git a/rs/messaging/src/state_machine.rs b/rs/messaging/src/state_machine.rs index e46de6a9ad66..5dfeee8d8595 100644 --- a/rs/messaging/src/state_machine.rs +++ b/rs/messaging/src/state_machine.rs @@ -1,3 +1,6 @@ +use crate::canister_http_refunds::{ + deliver_canister_http_refunds, refund_timed_out_canister_http_contexts, +}; use crate::message_routing::{ ApiBoundaryNodes, CRITICAL_ERROR_INDUCT_RESPONSE_FAILED, MessageRoutingMetrics, NodePublicKeys, }; @@ -136,29 +139,36 @@ impl StateMachine for StateMachineImpl { .observe_no_canister_allocation_range(&self.log, message); } - let (batch_messages, mut consensus_responses, chain_key_data, requires_full_state_hash) = - match batch.content { - // Regular batch, proceed with round execution. - BatchContent::Data { - batch_messages, - consensus_responses, - chain_key_data, - requires_full_state_hash, - } => ( - batch_messages, - consensus_responses, - chain_key_data, - requires_full_state_hash, - ), + let ( + batch_messages, + mut consensus_responses, + refunds, + chain_key_data, + requires_full_state_hash, + ) = match batch.content { + // Regular batch, proceed with round execution. + BatchContent::Data { + batch_messages, + consensus_responses, + refunds, + chain_key_data, + requires_full_state_hash, + } => ( + batch_messages, + consensus_responses, + refunds, + chain_key_data, + requires_full_state_hash, + ), - // Consensus is telling us to split, do so and return the new state. - BatchContent::Splitting { - new_subnet_id, - other_subnet_id, - } => { - return self.online_split(state, new_subnet_id, other_subnet_id); - } - }; + // Consensus is telling us to split, do so and return the new state. + BatchContent::Splitting { + new_subnet_id, + other_subnet_id, + } => { + return self.online_split(state, new_subnet_id, other_subnet_id); + } + }; // Get query stats from blocks and add them to the state, so that they can be aggregated later. if let Some(query_stats) = &batch_messages.query_stats { @@ -235,7 +245,7 @@ impl StateMachine for StateMachineImpl { next_checkpoint_round: ExecutionRound::from(b.next_checkpoint_height.get()), current_interval_length: ExecutionRound::from(b.current_interval_length.get()), }); - let state_after_execution = self.scheduler.execute_round( + let mut state_after_execution = self.scheduler.execute_round( state_with_messages, batch.randomness, chain_key_data, @@ -254,6 +264,14 @@ impl StateMachine for StateMachineImpl { } execution_timer.observe_duration(); + // Apply HTTP outcall refunds and time out delivered request contexts. + // This runs after execution, so that contexts that were just responded + // to during the round have already been moved into the delivered + // collection and can therefore receive their refunds. + deliver_canister_http_refunds(&refunds, &mut state_after_execution, &self.log); + let batch_time = state_after_execution.time(); + refund_timed_out_canister_http_contexts(&mut state_after_execution, batch_time, &self.log); + // Postprocess the state: route messages into streams. let message_routing_timer = self.metrics.start_phase_timer(PHASE_MESSAGE_ROUTING); #[cfg(debug_assertions)] diff --git a/rs/protobuf/def/state/metadata/v1/metadata.proto b/rs/protobuf/def/state/metadata/v1/metadata.proto index 0ab9cd576e93..40f3f9bb0a9b 100644 --- a/rs/protobuf/def/state/metadata/v1/metadata.proto +++ b/rs/protobuf/def/state/metadata/v1/metadata.proto @@ -161,6 +161,7 @@ message CanisterHttpRequestContext { optional Replication replication = 11; optional PricingVersion pricing_version = 12; optional RefundStatus refund_status = 13; + uint32 subnet_size = 14; reserved 5; } @@ -306,6 +307,7 @@ message SubnetCallContextManager { repeated ReshareChainKeyContextTree reshare_chain_key_contexts = 17; repeated SignWithThresholdContextTree sign_with_threshold_contexts = 18; repeated PreSignatureStashTree pre_signature_stashes = 19; + repeated CanisterHttpRequestContextTree delivered_canister_http_request_contexts = 20; } message SubnetMetrics { diff --git a/rs/protobuf/def/types/v1/canister_http.proto b/rs/protobuf/def/types/v1/canister_http.proto index f2587b029219..86ea00e919b2 100644 --- a/rs/protobuf/def/types/v1/canister_http.proto +++ b/rs/protobuf/def/types/v1/canister_http.proto @@ -70,6 +70,7 @@ message CanisterHttpResponseWithConsensus { repeated CanisterHttpResponseSignature signatures = 7; uint32 content_size = 9; bool is_reject = 10; + state.queues.v1.Cycles initial_refund = 11; } message CanisterHttpShare { @@ -89,6 +90,7 @@ message FlexibleCanisterHttpResponseWithProof { message FlexibleCanisterHttpResponses { uint64 callback_id = 1; repeated FlexibleCanisterHttpResponseWithProof responses = 2; + state.queues.v1.Cycles initial_refund = 3; } message FlexibleCanisterHttpTimeout {} @@ -101,6 +103,7 @@ message FlexibleCanisterHttpResponsesTooLarge { message FlexibleCanisterHttpTooManyRejects { repeated FlexibleCanisterHttpResponseWithProof reject_responses = 1; + state.queues.v1.Cycles initial_refund = 2; } message FlexibleCanisterHttpError { diff --git a/rs/protobuf/src/gen/state/state.metadata.v1.rs b/rs/protobuf/src/gen/state/state.metadata.v1.rs index d21e747282a7..02fac8dce430 100644 --- a/rs/protobuf/src/gen/state/state.metadata.v1.rs +++ b/rs/protobuf/src/gen/state/state.metadata.v1.rs @@ -227,6 +227,8 @@ pub struct CanisterHttpRequestContext { pub pricing_version: ::core::option::Option, #[prost(message, optional, tag = "13")] pub refund_status: ::core::option::Option, + #[prost(uint32, tag = "14")] + pub subnet_size: u32, } #[derive(Clone, PartialEq, ::prost::Message)] pub struct RefundStatus { @@ -453,6 +455,9 @@ pub struct SubnetCallContextManager { pub sign_with_threshold_contexts: ::prost::alloc::vec::Vec, #[prost(message, repeated, tag = "19")] pub pre_signature_stashes: ::prost::alloc::vec::Vec, + #[prost(message, repeated, tag = "20")] + pub delivered_canister_http_request_contexts: + ::prost::alloc::vec::Vec, } #[derive(Clone, PartialEq, ::prost::Message)] pub struct SubnetMetrics { diff --git a/rs/protobuf/src/gen/types/types.v1.rs b/rs/protobuf/src/gen/types/types.v1.rs index 64f8f32a02b4..a4d42ee5962b 100644 --- a/rs/protobuf/src/gen/types/types.v1.rs +++ b/rs/protobuf/src/gen/types/types.v1.rs @@ -609,6 +609,8 @@ pub struct CanisterHttpResponseWithConsensus { pub content_size: u32, #[prost(bool, tag = "10")] pub is_reject: bool, + #[prost(message, optional, tag = "11")] + pub initial_refund: ::core::option::Option, } #[derive(Clone, PartialEq, ::prost::Message)] pub struct CanisterHttpShare { @@ -635,6 +637,8 @@ pub struct FlexibleCanisterHttpResponses { pub callback_id: u64, #[prost(message, repeated, tag = "2")] pub responses: ::prost::alloc::vec::Vec, + #[prost(message, optional, tag = "3")] + pub initial_refund: ::core::option::Option, } #[derive(Clone, Copy, PartialEq, ::prost::Message)] pub struct FlexibleCanisterHttpTimeout {} @@ -651,6 +655,8 @@ pub struct FlexibleCanisterHttpResponsesTooLarge { pub struct FlexibleCanisterHttpTooManyRejects { #[prost(message, repeated, tag = "1")] pub reject_responses: ::prost::alloc::vec::Vec, + #[prost(message, optional, tag = "2")] + pub initial_refund: ::core::option::Option, } #[derive(Clone, PartialEq, ::prost::Message)] pub struct FlexibleCanisterHttpError { diff --git a/rs/replay/src/player.rs b/rs/replay/src/player.rs index f1a266d88c70..cd2e6bb5b235 100644 --- a/rs/replay/src/player.rs +++ b/rs/replay/src/player.rs @@ -60,7 +60,7 @@ use ic_state_manager::StateManagerImpl; use ic_types::{ CryptoHashOfPartialState, CryptoHashOfState, Height, NodeId, PrincipalId, Randomness, RegistryVersion, ReplicaVersion, SubnetId, Time, UserId, - batch::{Batch, BatchContent, BatchMessages, BlockmakerMetrics}, + batch::{Batch, BatchContent, BatchMessages, BlockmakerMetrics, CanisterHttpRefunds}, consensus::{ CatchUpContentProtobufBytes, CatchUpPackage, HasHeight, HasVersion, certification::{Certification, CertificationContent, CertificationShare}, @@ -799,6 +799,7 @@ impl Player { }, chain_key_data: Default::default(), consensus_responses: Vec::new(), + refunds: CanisterHttpRefunds::default(), requires_full_state_hash: false, }, // Use a fake randomness here since we don't have random tape for extra messages @@ -839,6 +840,7 @@ impl Player { batch_messages: BatchMessages::default(), chain_key_data: Default::default(), consensus_responses: Vec::new(), + refunds: CanisterHttpRefunds::default(), requires_full_state_hash: !have_incomplete_msgs, }; extra_batch.batch_number = message_routing.expected_batch_height(); diff --git a/rs/replicated_state/src/metadata_state/subnet_call_context_manager.rs b/rs/replicated_state/src/metadata_state/subnet_call_context_manager.rs index e0d904d89fa3..f8e2736db56f 100644 --- a/rs/replicated_state/src/metadata_state/subnet_call_context_manager.rs +++ b/rs/replicated_state/src/metadata_state/subnet_call_context_manager.rs @@ -23,6 +23,7 @@ use std::{ collections::{BTreeMap, BTreeSet, VecDeque}, convert::{From, TryFrom}, sync::Arc, + time::Duration, }; /// ECDSA message hash size in bytes. @@ -31,6 +32,13 @@ const MESSAGE_HASH_SIZE: usize = 32; /// Threshold algorithm nonce size in bytes. const NONCE_SIZE: usize = 32; +/// How long a `CanisterHttpRequestContext` whose response was already delivered +/// to execution is retained before being removed. +/// +/// The timeout is measured against the `time` of the original +/// `CanisterHttpRequestContext` and the batch time, not wall-clock time. +pub const DELIVERED_CANISTER_HTTP_REQUEST_CONTEXT_TIMEOUT: Duration = Duration::from_secs(2 * 60); + pub enum SubnetCallContext { SetupInitialDKG(SetupInitialDkgContext), CanisterHttpRequest(CanisterHttpRequestContext), @@ -217,6 +225,10 @@ pub struct SubnetCallContextManager { pub setup_initial_dkg_contexts: BTreeMap, pub sign_with_threshold_contexts: BTreeMap, pub canister_http_request_contexts: BTreeMap, + /// `CanisterHttpRequestContext`s whose responses have already been delivered + /// to execution. They are kept here until they time out (see + /// [`DELIVERED_CANISTER_HTTP_REQUEST_CONTEXT_TIMEOUT`]). + pub delivered_canister_http_request_contexts: BTreeMap, pub reshare_chain_key_contexts: BTreeMap, pub bitcoin_get_successors_contexts: BTreeMap, pub bitcoin_send_transaction_internal_contexts: @@ -315,6 +327,11 @@ impl SubnetCallContextManager { context.request.sender_reply_callback, context.request.sender ); + // Move the context into the collection of contexts that + // have already been responded to, where it remains until + // it times out. + self.delivered_canister_http_request_contexts + .insert(callback_id, context.clone()); SubnetCallContext::CanisterHttpRequest(context) }) }) @@ -346,6 +363,33 @@ impl SubnetCallContextManager { }) } + /// Removes all delivered `CanisterHttpRequestContext`s that have been around + /// for longer than [`DELIVERED_CANISTER_HTTP_REQUEST_CONTEXT_TIMEOUT`] and + /// returns them, so that the caller can refund the per-replica allowance of + /// the replicas that never responded. + /// + /// The timeout is measured against the `time` recorded in the original + /// `CanisterHttpRequestContext` and the provided `current_time` (the batch + /// time). + pub fn time_out_delivered_canister_http_request_contexts( + &mut self, + current_time: Time, + ) -> Vec { + let mut timed_out = Vec::new(); + self.delivered_canister_http_request_contexts + .retain(|_callback_id, context| { + if current_time.saturating_duration_since(context.time) + < DELIVERED_CANISTER_HTTP_REQUEST_CONTEXT_TIMEOUT + { + true + } else { + timed_out.push(context.clone()); + false + } + }); + timed_out + } + pub fn push_install_code_call(&mut self, call: InstallCodeCall) -> InstallCodeCallId { self.canister_management_calls.push_install_code_call(call) } @@ -743,6 +787,7 @@ mod testing { setup_initial_dkg_contexts: Default::default(), sign_with_threshold_contexts: Default::default(), canister_http_request_contexts: Default::default(), + delivered_canister_http_request_contexts: Default::default(), reshare_chain_key_contexts: Default::default(), bitcoin_get_successors_contexts: Default::default(), bitcoin_send_transaction_internal_contexts: Default::default(), diff --git a/rs/replicated_state/src/metadata_state/subnet_call_context_manager/proto.rs b/rs/replicated_state/src/metadata_state/subnet_call_context_manager/proto.rs index 5e7aea110d45..9ddd37b0b10c 100644 --- a/rs/replicated_state/src/metadata_state/subnet_call_context_manager/proto.rs +++ b/rs/replicated_state/src/metadata_state/subnet_call_context_manager/proto.rs @@ -59,6 +59,16 @@ impl From<&SubnetCallContextManager> for pb_metadata::SubnetCallContextManager { }, ) .collect(), + delivered_canister_http_request_contexts: item + .delivered_canister_http_request_contexts + .iter() + .map( + |(callback_id, context)| pb_metadata::CanisterHttpRequestContextTree { + callback_id: callback_id.get(), + context: Some(context.into()), + }, + ) + .collect(), bitcoin_get_successors_contexts: item .bitcoin_get_successors_contexts .iter() @@ -188,6 +198,17 @@ impl TryFrom<(Time, pb_metadata::SubnetCallContextManager)> for SubnetCallContex canister_http_request_contexts.insert(CallbackId::new(entry.callback_id), context); } + let mut delivered_canister_http_request_contexts = + BTreeMap::::new(); + for entry in item.delivered_canister_http_request_contexts { + let context: CanisterHttpRequestContext = try_from_option_field( + entry.context, + "SystemMetadata::DeliveredCanisterHttpRequestContext", + )?; + delivered_canister_http_request_contexts + .insert(CallbackId::new(entry.callback_id), context); + } + let mut reshare_chain_key_contexts = BTreeMap::::new(); for entry in item.reshare_chain_key_contexts { let pb_context = @@ -261,6 +282,7 @@ impl TryFrom<(Time, pb_metadata::SubnetCallContextManager)> for SubnetCallContex setup_initial_dkg_contexts, sign_with_threshold_contexts, canister_http_request_contexts, + delivered_canister_http_request_contexts, bitcoin_get_successors_contexts, bitcoin_send_transaction_internal_contexts, canister_management_calls: CanisterManagementCalls { diff --git a/rs/replicated_state/src/metadata_state/tests.rs b/rs/replicated_state/src/metadata_state/tests.rs index 6407dbca03e1..05f68693fc12 100644 --- a/rs/replicated_state/src/metadata_state/tests.rs +++ b/rs/replicated_state/src/metadata_state/tests.rs @@ -868,6 +868,7 @@ fn subnet_call_contexts_deserialization() { replication: Replication::FullyReplicated, pricing_version: PricingVersion::Legacy, refund_status: RefundStatus::default(), + subnet_size: 4, }; subnet_call_context_manager.push_context(SubnetCallContext::CanisterHttpRequest( canister_http_request, diff --git a/rs/state_machine_tests/src/lib.rs b/rs/state_machine_tests/src/lib.rs index 2e4bd98268e5..b325db2f1e0b 100644 --- a/rs/state_machine_tests/src/lib.rs +++ b/rs/state_machine_tests/src/lib.rs @@ -145,8 +145,8 @@ use ic_types::{ SubnetId, UserId, artifact::IngressMessageId, batch::{ - Batch, BatchContent, BatchMessages, BatchSummary, BlockmakerMetrics, ChainKeyData, - ConsensusResponse, QueryStatsPayload, SelfValidatingPayload, TotalQueryStats, + Batch, BatchContent, BatchMessages, BatchSummary, BlockmakerMetrics, CanisterHttpRefunds, + ChainKeyData, ConsensusResponse, QueryStatsPayload, SelfValidatingPayload, TotalQueryStats, ValidationContext, XNetPayload, }, canister_http::{ @@ -1924,7 +1924,7 @@ impl StateMachine { let xnet_payload = batch_payload.xnet.clone(); let ingress = &batch_payload.ingress; let ingress_messages = ingress.clone().try_into().unwrap(); - let (http_responses, _) = + let (http_responses, http_refunds, _) = CanisterHttpPayloadBuilderImpl::into_messages(&batch_payload.canister_http); let inducted: Vec<_> = http_responses .clone() @@ -1983,6 +1983,7 @@ impl StateMachine { .with_ingress_messages(ingress_messages) .with_xnet_payload(xnet_payload) .with_consensus_responses(consensus_responses) + .with_refunds(http_refunds) .with_query_stats(query_stats) .with_self_validating(self_validating); if let Some(blockmaker_metrics) = blockmaker_metrics { @@ -3104,6 +3105,7 @@ impl StateMachine { nidkg_ids: self.ni_dkg_ids.clone(), }, consensus_responses: payload.consensus_responses, + refunds: payload.refunds, requires_full_state_hash, }; let blockmaker_metrics = payload @@ -5418,6 +5420,7 @@ pub struct PayloadBuilder { ingress_messages: Vec, xnet_payload: XNetPayload, consensus_responses: Vec, + refunds: CanisterHttpRefunds, query_stats: Option, self_validating: Option, blockmaker_metrics: Option, @@ -5431,6 +5434,7 @@ impl Default for PayloadBuilder { ingress_messages: Default::default(), xnet_payload: Default::default(), consensus_responses: Default::default(), + refunds: Default::default(), query_stats: Default::default(), self_validating: Default::default(), blockmaker_metrics: Default::default(), @@ -5488,6 +5492,10 @@ impl PayloadBuilder { } } + pub fn with_refunds(self, refunds: CanisterHttpRefunds) -> Self { + Self { refunds, ..self } + } + pub fn with_query_stats(self, query_stats: Option) -> Self { Self { query_stats, diff --git a/rs/test_utilities/execution_environment/src/lib.rs b/rs/test_utilities/execution_environment/src/lib.rs index 33fdd2b0437a..a9698d1d23ab 100644 --- a/rs/test_utilities/execution_environment/src/lib.rs +++ b/rs/test_utilities/execution_environment/src/lib.rs @@ -1549,7 +1549,7 @@ impl ExecutionTest { let subnet_size = self.subnet_size(); let cost_schedule = self.cost_schedule(); let message = match message { - SubnetMessage::Response(_) => return NominalCycles::zero(), + SubnetMessage::ConsensusResponse(_) => return NominalCycles::zero(), SubnetMessage::Request(request) => CanisterCall::Request(request), SubnetMessage::Ingress(ingress) => CanisterCall::Ingress(ingress), }; @@ -1661,7 +1661,7 @@ impl ExecutionTest { let cycles_used = cycles_used_after - cycles_used_before; if instructions_used.get() != 0 { let method_name = match message { - SubnetMessage::Response(_) => None, + SubnetMessage::ConsensusResponse(_) => None, SubnetMessage::Request(ref request) => Some(request.method_name.clone()), SubnetMessage::Ingress(ref ingress) => Some(ingress.method_name.clone()), }; @@ -3241,7 +3241,7 @@ pub fn get_output_messages(state: &mut ReplicatedState) -> Vec<(CanisterId, Requ fn get_effective_canister_id(message: SubnetMessage) -> Option { match message { - SubnetMessage::Response(_) => None, + SubnetMessage::ConsensusResponse(_) => None, SubnetMessage::Request(request) => request.extract_effective_canister_id(), SubnetMessage::Ingress(ingress) => { let signed_ingress_content = SignedIngressContent::new_for_testing( @@ -3260,7 +3260,7 @@ fn get_effective_canister_id(message: SubnetMessage) -> Option { fn check_is_install_code(message: SubnetMessage) -> bool { let message = match message { - SubnetMessage::Response(_) => return false, + SubnetMessage::ConsensusResponse(_) => return false, SubnetMessage::Request(request) => CanisterCall::Request(request), SubnetMessage::Ingress(ingress) => CanisterCall::Ingress(ingress), }; diff --git a/rs/test_utilities/types/src/batch/batch_builder.rs b/rs/test_utilities/types/src/batch/batch_builder.rs index 80d3c863df94..5c003a1c16cb 100644 --- a/rs/test_utilities/types/src/batch/batch_builder.rs +++ b/rs/test_utilities/types/src/batch/batch_builder.rs @@ -1,6 +1,6 @@ use ic_types::{ Height, Randomness, RegistryVersion, ReplicaVersion, Time, - batch::{Batch, BatchContent, BatchMessages, BlockmakerMetrics}, + batch::{Batch, BatchContent, BatchMessages, BlockmakerMetrics, CanisterHttpRefunds}, time::UNIX_EPOCH, }; @@ -19,6 +19,7 @@ impl Default for BatchBuilder { batch_messages: BatchMessages::default(), chain_key_data: Default::default(), consensus_responses: vec![], + refunds: CanisterHttpRefunds::default(), requires_full_state_hash: false, }, randomness: Randomness::from([0; 32]), @@ -49,6 +50,7 @@ impl BatchBuilder { batch_messages: messages, chain_key_data: Default::default(), consensus_responses: vec![], + refunds: CanisterHttpRefunds::default(), requires_full_state_hash: false, }; self diff --git a/rs/types/types/src/batch.rs b/rs/types/types/src/batch.rs index 5cc927abd4aa..b12c64bafc4b 100644 --- a/rs/types/types/src/batch.rs +++ b/rs/types/types/src/batch.rs @@ -41,10 +41,13 @@ use ic_btc_replica_types::BitcoinAdapterResponse; use ic_exhaustive_derive::ExhaustiveSet; use ic_management_canister_types_private::MasterPublicKeyId; use ic_protobuf::{proxy::ProxyDecodeError, types::v1 as pb}; +use ic_types_cycles::Cycles; use prost::{DecodeError, Message, bytes::BufMut}; use serde::{Deserialize, Serialize}; +use std::collections::BTreeSet; use std::{collections::BTreeMap, convert::TryInto, hash::Hash}; +#[allow(clippy::large_enum_variant)] #[derive(Clone, Eq, PartialEq, Debug)] pub enum BatchContent { /// The payload messages to be processed. @@ -52,6 +55,10 @@ pub enum BatchContent { batch_messages: BatchMessages, /// Responses to subnet calls that require consensus' involvement. consensus_responses: Vec, + /// Refunds for HTTP outcalls, delivered separately from the consensus + /// responses (they are accounting data, not delivered to the calling + /// canister). + refunds: CanisterHttpRefunds, /// Data required by the chain key service chain_key_data: ChainKeyData, /// Whether the state obtained by executing this batch needs to be fully @@ -344,6 +351,57 @@ impl TryFrom for ConsensusResponse { } } +/// Refunds for HTTP outcalls, delivered alongside (but separately from) the +/// [`ConsensusResponse`]s of a batch. +/// +/// Unlike the consensus responses, these are *not* delivered to the calling +/// canister; they are consumed by the messaging layer, which accumulates them +/// into the request contexts' refund status. They are only used in-memory on +/// the hop from consensus to the messaging layer and are intentionally not +/// serialized. +/// +/// There are two kinds of refunds: +/// - an *initial* refund, where the set of nodes that produced a response +/// collectively refund one specific amount of cycles (see +/// [`CanisterHttpInitialRefund`]); +/// - an *asynchronous* refund, where individual nodes each refund some cycles, +/// possibly in a later block than the response (see +/// [`CanisterHttpAsyncRefund`]). +#[derive(Clone, Eq, PartialEq, Hash, Debug, Default, Deserialize, Serialize)] +#[cfg_attr(test, derive(ExhaustiveSet))] +pub struct CanisterHttpRefunds { + pub initial: Vec, + pub asynchronous: Vec, +} + +/// The initial refund for an HTTP outcall: the set of `nodes` that produced the +/// response collectively refund one specific `amount` of cycles. +/// +/// Eventually this amount will be computed as the sum of the participating +/// nodes' individual refunds minus the consensus cost. The contributing `nodes` +/// are recorded so that the messaging layer can avoid crediting any later +/// asynchronous refund from a node that already contributed here. +#[derive(Clone, Eq, PartialEq, Hash, Debug, Deserialize, Serialize)] +#[cfg_attr(test, derive(ExhaustiveSet))] +pub struct CanisterHttpInitialRefund { + pub callback: CallbackId, + pub amount: Cycles, + pub nodes: BTreeSet, +} + +/// An asynchronous refund for an HTTP outcall. +/// +/// `shares` holds the per-replica refunds that the participating nodes signed +/// over as part of the aggregated response proof. A refund may be delivered in +/// a later block than the response; the messaging layer credits each node at +/// most once. +#[derive(Clone, Eq, PartialEq, Hash, Debug, Deserialize, Serialize)] +#[cfg_attr(test, derive(ExhaustiveSet))] +pub struct CanisterHttpAsyncRefund { + pub callback: CallbackId, + pub shares: Vec<(NodeId, Cycles)>, +} + #[cfg(test)] mod tests { use super::*; diff --git a/rs/types/types/src/batch/canister_http.rs b/rs/types/types/src/batch/canister_http.rs index 4ab531e4f109..0285f0cc3ef0 100644 --- a/rs/types/types/src/batch/canister_http.rs +++ b/rs/types/types/src/batch/canister_http.rs @@ -19,6 +19,7 @@ use ic_protobuf::{ proxy::{ProxyDecodeError, try_from_option_field}, types::v1 as pb, }; +use ic_types_cycles::Cycles; use serde::{Deserialize, Serialize}; use std::{collections::BTreeMap, convert::TryFrom}; @@ -54,6 +55,9 @@ pub enum FlexibleCanisterHttpError { TooManyRejects { callback_id: CallbackId, reject_responses: Vec, + /// Collective initial refund, computed during payload building from the + /// subnet size in the request context and validated during validation. + initial_refund: Cycles, }, } @@ -77,6 +81,9 @@ impl FlexibleCanisterHttpError { pub struct FlexibleCanisterHttpResponses { pub callback_id: CallbackId, pub responses: Vec, + /// Collective initial refund, computed during payload building from the + /// subnet size in the request context and validated during validation. + pub initial_refund: Cycles, } /// A single flexible HTTP outcall response paired with its single-signer proof. @@ -134,12 +141,14 @@ impl CountBytes for FlexibleCanisterHttpError { Self::TooManyRejects { callback_id, reject_responses, + initial_refund, } => { callback_id.count_bytes() + reject_responses .iter() .map(|r| r.count_bytes()) .sum::() + + std::mem::size_of_val(initial_refund) } } } @@ -227,6 +236,7 @@ impl From for pb::CanisterHttpResponseWithCon .collect(), content_size: metadata.content_size, is_reject: metadata.is_reject, + initial_refund: Some(payload.initial_refund.into()), } } } @@ -291,6 +301,10 @@ impl TryFrom for CanisterHttpResponseWith }, signatures, }, + initial_refund: try_from_option_field( + payload.initial_refund, + "CanisterHttpResponseWithConsensus::initial_refund", + )?, }) } } @@ -453,6 +467,7 @@ impl From for pb::FlexibleCanisterHttpResponses { pb::FlexibleCanisterHttpResponses { callback_id: responses.callback_id.get(), responses: responses.responses.into_iter().map(Into::into).collect(), + initial_refund: Some(responses.initial_refund.into()), } } } @@ -468,6 +483,10 @@ impl TryFrom for FlexibleCanisterHttpResponse .into_iter() .map(TryFrom::try_from) .collect::, _>>()?, + initial_refund: try_from_option_field( + responses.initial_refund, + "FlexibleCanisterHttpResponses::initial_refund", + )?, }) } } @@ -494,12 +513,15 @@ impl From for pb::FlexibleCanisterHttpError { min_responses, }), FlexibleCanisterHttpError::TooManyRejects { - reject_responses, .. + reject_responses, + initial_refund, + .. } => ErrorDetails::TooManyRejects(pb::FlexibleCanisterHttpTooManyRejects { reject_responses: reject_responses .into_iter() .map(pb::FlexibleCanisterHttpResponseWithProof::from) .collect(), + initial_refund: Some(initial_refund.into()), }), }; pb::FlexibleCanisterHttpError { @@ -541,6 +563,10 @@ impl TryFrom for FlexibleCanisterHttpError { Ok(FlexibleCanisterHttpError::TooManyRejects { callback_id, reject_responses, + initial_refund: try_from_option_field( + details.initial_refund, + "FlexibleCanisterHttpTooManyRejects::initial_refund", + )?, }) } None => Err(ProxyDecodeError::MissingField( diff --git a/rs/types/types/src/canister_http.rs b/rs/types/types/src/canister_http.rs index fe452bc7b2d6..f98ecebfadd9 100644 --- a/rs/types/types/src/canister_http.rs +++ b/rs/types/types/src/canister_http.rs @@ -137,6 +137,11 @@ pub struct CanisterHttpRequestContext { pub replication: Replication, pub pricing_version: PricingVersion, pub refund_status: RefundStatus, + /// The total number of nodes on the subnet (`N`) at the time the request + /// context was created, as known from the subnet membership. This is + /// required by the pay-as-you-go pricing formula and is carried on the + /// context so that it survives all the way to the adapter client. + pub subnet_size: usize, } #[derive(Clone, Eq, PartialEq, Hash, Debug, Deserialize, Serialize)] @@ -281,6 +286,7 @@ impl From<&CanisterHttpRequestContext> for pb_metadata::CanisterHttpRequestConte replication: Some(replication_message), pricing_version: Some(pricing_message), refund_status: Some(refund_status), + subnet_size: context.subnet_size as u32, } } } @@ -405,6 +411,7 @@ impl TryFrom for CanisterHttpRequestCon replication, pricing_version, refund_status, + subnet_size: context.subnet_size as usize, }) } } @@ -487,6 +494,45 @@ fn validate_url_length(url: &str) -> Result<(), CanisterHttpRequestContextError> Ok(()) } +// Base-cost fee constants for HTTPS outcalls pricing. +// +// Every outcall's cost is split into three parts: the base cost (charged +// up-front when the request context is created and reflected in the refundable +// amount below), the per-replica cost (charged as-you-go by the budget tracker +// in the adapter client), and the consensus cost (ignored for now). +// +// The base-cost formula depends on the replication strategy: +// +// Fully/non-replicated: +// N * (1M + 50 * request_bytes + 140k * N + 800 * N * N) +// Flexible: +// N * (1M + 50 * request_bytes + 90k * N) +// +// where `N` is the total number of nodes on the subnet and `request_bytes` is +// the variable (unbounded) part of the request. +const BASE_FEE: u128 = 1_000_000; +const BASE_PER_REQUEST_BYTE_FEE: u128 = 50; +const BASE_PER_NODE_FEE_FULLY_REPLICATED: u128 = 140_000; +const BASE_QUADRATIC_NODE_FEE_FULLY_REPLICATED: u128 = 800; +const BASE_PER_NODE_FEE_FLEXIBLE: u128 = 90_000; + +fn base_cost_fully_replicated(request_bytes: u64, n: u64) -> Cycles { + let n = n as u128; + let per_replica = BASE_FEE + + BASE_PER_REQUEST_BYTE_FEE * request_bytes as u128 + + BASE_PER_NODE_FEE_FULLY_REPLICATED * n + + BASE_QUADRATIC_NODE_FEE_FULLY_REPLICATED * n * n; + Cycles::new(per_replica * n) +} + +fn base_cost_flexible(request_bytes: u64, n: u64) -> Cycles { + let n = n as u128; + let per_replica = BASE_FEE + + BASE_PER_REQUEST_BYTE_FEE * request_bytes as u128 + + BASE_PER_NODE_FEE_FLEXIBLE * n; + Cycles::new(per_replica * n) +} + impl CanisterHttpRequestContext { /// Calculate the size of all unbounded struct elements. pub fn variable_parts_size(&self) -> NumBytes { @@ -571,7 +617,7 @@ impl CanisterHttpRequestContext { _ => Replication::FullyReplicated, }; - Ok(CanisterHttpRequestContext { + let mut context = CanisterHttpRequestContext { request: request.clone(), url: args.url, max_response_bytes, @@ -588,14 +634,22 @@ impl CanisterHttpRequestContext { .unwrap_or(DEFAULT_HTTP_OUTCALLS_PRICING_VERSION); PricingVersion::from_repr(final_version_u32).unwrap_or(PricingVersion::Legacy) }, - refund_status: RefundStatus { - //TODO(IC-1937): subtract the base fee from the refundable amount. - refundable_cycles: request.payment, - per_replica_allowance: request.payment / node_ids.len(), - refunded_cycles: Cycles::new(0), - refunding_nodes: BTreeSet::new(), - }, - }) + refund_status: RefundStatus::default(), + subnet_size: node_ids.len(), + }; + + // Charge the base cost up-front. Fully/non-replicated outcalls use the + // fully-replicated base formula. `N` is the total number of nodes. + let base_cost = + base_cost_fully_replicated(context.variable_parts_size().get(), node_ids.len() as u64); + let refundable_cycles = request.payment - base_cost; + context.refund_status = RefundStatus { + refundable_cycles, + per_replica_allowance: refundable_cycles / node_ids.len(), + refunded_cycles: Cycles::new(0), + refunding_nodes: BTreeSet::new(), + }; + Ok(context) } pub fn generate_from_flexible_args( @@ -682,7 +736,7 @@ impl CanisterHttpRequestContext { .into_iter() .collect(); - Ok(CanisterHttpRequestContext { + let mut context = CanisterHttpRequestContext { request: request.clone(), url: args.url, max_response_bytes: None, @@ -697,14 +751,22 @@ impl CanisterHttpRequestContext { max_responses, }, pricing_version: PricingVersion::PayAsYouGo, - refund_status: RefundStatus { - //TODO(IC-1937): subtract the base fee from the refundable amount. - refundable_cycles: request.payment, - per_replica_allowance: request.payment / (total_requests as usize).max(1), - refunded_cycles: Cycles::new(0), - refunding_nodes: BTreeSet::new(), - }, - }) + refund_status: RefundStatus::default(), + subnet_size: node_ids.len(), + }; + + // Charge the base cost up-front using the flexible base formula. `N` is + // the total number of nodes on the subnet, while the per-replica + // allowance is shared among the `total_requests` committee members. + let base_cost = base_cost_flexible(context.variable_parts_size().get(), n as u64); + let refundable_cycles = request.payment - base_cost; + context.refund_status = RefundStatus { + refundable_cycles, + per_replica_allowance: refundable_cycles / (total_requests as usize).max(1), + refunded_cycles: Cycles::new(0), + refunding_nodes: BTreeSet::new(), + }; + Ok(context) } } @@ -1007,12 +1069,20 @@ impl From for CanisterHttpMethod { pub struct CanisterHttpResponseWithConsensus { pub content: CanisterHttpResponse, pub proof: CanisterHttpResponseProof, + /// The collective initial refund to credit to the calling canister, + /// computed during payload building from the subnet size in the request + /// context and validated during payload validation. + pub initial_refund: Cycles, } impl CountBytes for CanisterHttpResponseWithConsensus { fn count_bytes(&self) -> usize { - let CanisterHttpResponseWithConsensus { content, proof } = &self; - proof.count_bytes() + content.count_bytes() + let CanisterHttpResponseWithConsensus { + content, + proof, + initial_refund, + } = &self; + proof.count_bytes() + content.count_bytes() + size_of_val(initial_refund) } } @@ -1264,6 +1334,7 @@ mod tests { replication: Replication::FullyReplicated, pricing_version: PricingVersion::Legacy, refund_status: RefundStatus::default(), + subnet_size: 13, }; let expected_size = context.url.len() @@ -1309,6 +1380,7 @@ mod tests { replication: Replication::FullyReplicated, pricing_version: PricingVersion::Legacy, refund_status: RefundStatus::default(), + subnet_size: 13, }; let expected_size = context.url.len() @@ -1388,6 +1460,7 @@ mod tests { refunded_cycles: Cycles::new(123), refunding_nodes: BTreeSet::from([node_test_id(1), node_test_id(2)]), }, + subnet_size: 13, }; let pb: pb_metadata::CanisterHttpRequestContext = (&initial).into(); diff --git a/rs/types/types/src/messages.rs b/rs/types/types/src/messages.rs index 8a1522aa317d..0a8573ba7494 100644 --- a/rs/types/types/src/messages.rs +++ b/rs/types/types/src/messages.rs @@ -17,6 +17,7 @@ pub use self::http::{ HttpUserQuery, NodeSignature, QueryResponseHash, RawHttpRequestVal, RawSignedSenderInfo, ReplicaHealthStatus, SenderInfoContent, SignedDelegation, SignedSenderInfo, }; +use crate::batch::ConsensusResponse; use crate::methods::Callback; pub use crate::methods::SystemMethod; use crate::time::CoarseTime; @@ -345,7 +346,7 @@ impl Display for CanisterMessage { #[derive(Clone, Eq, PartialEq, Debug)] pub enum SubnetMessage { Request(Arc), - Response(Arc), + ConsensusResponse(Arc), Ingress(Arc), } @@ -355,7 +356,7 @@ impl SubnetMessage { match &self { SubnetMessage::Ingress(ingress) => ingress.effective_canister_id, SubnetMessage::Request(request) => request.extract_effective_canister_id(), - SubnetMessage::Response { .. } => None, + SubnetMessage::ConsensusResponse { .. } => None, } } }