Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 13 additions & 7 deletions rs/consensus/src/consensus/batch_delivery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ use ic_protobuf::{
use ic_types::{
Height, PrincipalId, SubnetId,
batch::{
Batch, BatchContent, BatchMessages, BatchSummary, BlockmakerMetrics, ChainKeyData,
ConsensusResponse,
Batch, BatchContent, BatchMessages, BatchSummary, BlockmakerMetrics, CanisterHttpRefunds,
ChainKeyData, ConsensusResponse,
},
consensus::{
Block, BlockPayload, HasVersion,
Expand Down Expand Up @@ -210,7 +210,8 @@ pub(crate) fn deliver_batches_with_result_processor(
idkg_pre_signatures,
nidkg_ids,
};
let consensus_responses = generate_responses_to_subnet_calls(&block, &mut batch_stats, log);
let (consensus_responses, refunds) =
generate_responses_to_subnet_calls(&block, &mut batch_stats, log);
// This flag can only be true, if we've called deliver_batches with a height
// limit. In this case we also want to have a checkpoint for that last height.
let persist_batch = Some(height) == max_batch_height_to_deliver;
Expand All @@ -220,6 +221,7 @@ pub(crate) fn deliver_batches_with_result_processor(
batch_messages: BatchMessages::default(),
chain_key_data,
consensus_responses,
refunds,
requires_full_state_hash,
},
BlockPayload::Data(data_payload) => {
Expand All @@ -236,6 +238,7 @@ pub(crate) fn deliver_batches_with_result_processor(
.unwrap_or_default(),
chain_key_data,
consensus_responses,
refunds,
requires_full_state_hash,
}
}
Expand Down Expand Up @@ -312,8 +315,9 @@ fn generate_responses_to_subnet_calls(
block: &Block,
stats: &mut BatchStats,
log: &ReplicaLogger,
) -> Vec<ConsensusResponse> {
) -> (Vec<ConsensusResponse>, CanisterHttpRefunds) {
let mut consensus_responses = Vec::new();
let mut refunds = CanisterHttpRefunds::default();
match block.payload.as_ref() {
BlockPayload::Summary(summary_payload) => {
info!(
Expand All @@ -337,17 +341,19 @@ fn generate_responses_to_subnet_calls(
.append(&mut generate_responses_to_initial_dealings_calls(payload));
}

let (mut http_responses, http_stats) =
let (mut http_responses, mut http_refunds, http_stats) =
CanisterHttpPayloadBuilderImpl::into_messages(&data_payload.batch.canister_http);
consensus_responses.append(&mut http_responses);
refunds.initial.append(&mut http_refunds.initial);
refunds.asynchronous.append(&mut http_refunds.asynchronous);
stats.canister_http = http_stats;

let mut chain_key_responses =
ChainKeyPayloadBuilderImpl::into_messages(&data_payload.batch.chain_key);
consensus_responses.append(&mut chain_key_responses);
}
}
consensus_responses
(consensus_responses, refunds)
}

#[derive(Debug, Clone, PartialEq, Eq)]
Expand Down Expand Up @@ -735,7 +741,7 @@ mod tests {
);

let mut batch_stats = BatchStats::new(Height::from(1));
let responses =
let (responses, _refunds) =
generate_responses_to_subnet_calls(&block, &mut batch_stats, &no_op_logger());

assert_eq!(
Expand Down
4 changes: 3 additions & 1 deletion rs/determinism_test/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use ic_state_manager::StateManagerImpl;
use ic_test_utilities_types::messages::SignedIngressBuilder;
use ic_types::{
CanisterId, CryptoHashOfState, Randomness, RegistryVersion, ReplicaVersion,
batch::{Batch, BatchContent, BatchMessages, BlockmakerMetrics},
batch::{Batch, BatchContent, BatchMessages, BlockmakerMetrics, CanisterHttpRefunds},
ingress::{IngressState, IngressStatus, WasmResult},
messages::{MessageId, SignedIngress},
time::UNIX_EPOCH,
Expand All @@ -33,6 +33,7 @@ fn build_batch(message_routing: &dyn MessageRouting, msgs: Vec<SignedIngress>) -
},
chain_key_data: Default::default(),
consensus_responses: vec![],
refunds: CanisterHttpRefunds::default(),
requires_full_state_hash: false,
},
randomness: Randomness::from([0; 32]),
Expand All @@ -51,6 +52,7 @@ fn build_batch_with_full_state_hash(message_routing: &dyn MessageRouting) -> Bat
batch_messages: BatchMessages::default(),
chain_key_data: Default::default(),
consensus_responses: vec![],
refunds: CanisterHttpRefunds::default(),
requires_full_state_hash: true,
},
randomness: Randomness::from([0; 32]),
Expand Down
30 changes: 22 additions & 8 deletions rs/execution_environment/src/execution_environment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -693,11 +693,11 @@ impl ExecutionEnvironment {
let cost_schedule = state.get_own_cost_schedule();

let mut msg = match msg {
SubnetMessage::Response(response) => {
SubnetMessage::ConsensusResponse(response) => {
let context = state
.metadata
.subnet_call_context_manager
.retrieve_context(response.originator_reply_callback, &self.log);
.retrieve_context(response.callback, &self.log);
return match context {
None => (state, ExecuteSubnetMessageResultType::Finished),
Some(context) => {
Expand All @@ -718,7 +718,7 @@ impl ExecutionEnvironment {
context.max_response_bytes,
registry_settings.subnet_size,
cost_schedule,
NumBytes::from(response.payload_size_bytes()),
response.payload.size_bytes(),
);

self.metrics.observe_http_outcall_price_change(
Expand All @@ -737,7 +737,7 @@ impl ExecutionEnvironment {
info!(
self.log,
"Canister Http request with payload_size {}, max_response_size {}, subnet_size {}, reply_callback_id {}, sender {}, process_id {}",
response.payload_size_bytes().get(),
response.payload.size_bytes().get(),
max_response_size,
registry_settings.subnet_size,
context.request.sender_reply_callback,
Expand All @@ -749,7 +749,7 @@ impl ExecutionEnvironment {
self.metrics.observe_subnet_message(
&request.method_name,
time_elapsed.as_secs_f64(),
&match &response.response_payload {
&match &response.payload {
Payload::Data(_) => Ok(()),
Payload::Reject(_) => Err(ErrorCode::CanisterRejectedMessage),
},
Expand All @@ -758,7 +758,7 @@ impl ExecutionEnvironment {
if let (
SubnetCallContext::SignWithThreshold(threshold_context),
Payload::Data(_),
) = (&context, &response.response_payload)
) = (&context, &response.payload)
{
*state
.metadata
Expand All @@ -768,13 +768,27 @@ impl ExecutionEnvironment {
.or_default() += 1;
}

// Refund the cycles left unspent upfront (`request.payment`).
// For HTTP outcalls, additionally refund the per-replica
// shares that the messaging layer accumulated into the
// request context's `refund_status` before execution. For
// non-HTTP responses there is no such accumulation, so this
// preserves the previous behavior.
let http_refund = match &context {
SubnetCallContext::CanisterHttpRequest(context) => {
context.refund_status.refunded_cycles
}
_ => Cycles::new(0),
};
let refund = request.payment + http_refund;

state.push_subnet_output_response(
Response {
originator: request.sender,
respondent: CanisterId::from(self.own_subnet_id),
originator_reply_callback: request.sender_reply_callback,
refund: request.payment,
response_payload: response.response_payload.clone(),
refund,
response_payload: response.payload.clone(),
deadline: request.deadline,
}
.into(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ use ic_metrics::MetricsRegistry;
use ic_metrics::buckets::{decimal_buckets, decimal_buckets_with_zero};
use ic_replicated_state::metadata_state::subnet_call_context_manager::InstallCodeCallId;
use ic_types::CanisterId;
use ic_types::batch::ConsensusResponse;
use ic_types::canister_http::{CanisterHttpRequestContext, MAX_CANISTER_HTTP_RESPONSE_BYTES};
use ic_types::messages::Response;
use ic_types_cycles::NominalCycles;
use prometheus::{Histogram, HistogramVec, IntCounter};
use std::str::FromStr;
Expand Down Expand Up @@ -232,7 +232,7 @@ impl ExecutionEnvironmentMetrics {
pub(crate) fn observe_http_outcall_request(
&self,
context: &CanisterHttpRequestContext,
response: &Response,
response: &ConsensusResponse,
) {
self.http_outcalls_metrics
.request_size
Expand All @@ -250,7 +250,7 @@ impl ExecutionEnvironmentMetrics {

self.http_outcalls_metrics
.payload_size
.observe(response.payload_size_bytes().get() as f64);
.observe(response.payload.size_bytes().get() as f64);
}

pub(crate) fn observe_http_outcall_price_change(
Expand Down
23 changes: 5 additions & 18 deletions rs/execution_environment/src/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,13 @@ use ic_replicated_state::{
};
use ic_types::batch::ChainKeyData;
use ic_types::ingress::{IngressState, IngressStatus};
use ic_types::messages::{Ingress, MessageId, NO_DEADLINE, Response, SubnetMessage};
use ic_types::messages::{Ingress, MessageId, SubnetMessage};
use ic_types::{
CanisterId, CanisterLog, ComputeAllocation, DEFAULT_AGGREGATE_LOG_MEMORY_LIMIT, ExecutionRound,
MemoryAllocation, NumBytes, NumInstructions, NumMessages, NumSlices, Randomness,
ReplicaVersion, Time,
};
use ic_types_cycles::{CanisterCyclesCostSchedule, Cycles};
use ic_types_cycles::CanisterCyclesCostSchedule;
use more_asserts::{debug_assert_ge, debug_assert_le, debug_assert_lt};
use std::cell::RefCell;
use std::collections::{BTreeSet, VecDeque};
Expand Down Expand Up @@ -1306,20 +1306,7 @@ impl Scheduler for SchedulerImpl {
// state. That can be changed in the future as we optimize scheduling.
while let Some(response) = state.consensus_queue.pop() {
let (new_state, _) = self.execute_subnet_message(
// Wrap the callback ID and payload into a Response, to make it easier for
// `execute_subnet_message()` to deal with. All other fields will be ignored by
// `execute_subnet_message()`.
SubnetMessage::Response(
Response {
originator: CanisterId::ic_00(),
respondent: CanisterId::ic_00(),
originator_reply_callback: response.callback,
refund: Cycles::zero(),
response_payload: response.payload,
deadline: NO_DEADLINE,
}
.into(),
),
SubnetMessage::ConsensusResponse(Arc::new(response)),
state,
&mut csprng,
current_round,
Expand Down Expand Up @@ -1855,7 +1842,7 @@ fn can_execute_subnet_msg(
let maybe_method = match msg {
SubnetMessage::Ingress(ingress) => Ic00Method::from_str(ingress.method_name.as_str()).ok(),
SubnetMessage::Request(request) => Ic00Method::from_str(request.method_name.as_str()).ok(),
SubnetMessage::Response { .. } => None,
SubnetMessage::ConsensusResponse { .. } => None,
};
let Some(method) = maybe_method else {
// If this is a response or the method name is not valid, execute the message.
Expand Down Expand Up @@ -1907,7 +1894,7 @@ fn get_instruction_limits_for_subnet_message(
// for install code in which case the default limits are overriden.
let default_limits = InstructionLimits::new(NumInstructions::new(0), NumInstructions::new(0));
let method_name = match &msg {
SubnetMessage::Response { .. } => {
SubnetMessage::ConsensusResponse { .. } => {
return default_limits;
}
SubnetMessage::Ingress(ingress) => &ingress.method_name,
Expand Down
14 changes: 12 additions & 2 deletions rs/https_outcalls/client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ pub struct CanisterHttpAdapterClientImpl {
rx: Receiver<(CanisterHttpResponse, CanisterHttpPaymentReceipt)>,
query_service: TransformExecutionService,
metrics: Metrics,
pricing_factory: PricingFactory,
log: ReplicaLogger,
}

Expand All @@ -79,13 +80,15 @@ impl CanisterHttpAdapterClientImpl {
) -> Self {
let (tx, rx) = channel(inflight_requests);
let metrics = Metrics::new(&metrics_registry);
let pricing_factory = PricingFactory::new(&metrics_registry, log.clone());
Self {
rt_handle,
grpc_channel,
tx,
rx,
query_service,
metrics,
pricing_factory,
log,
}
}
Expand Down Expand Up @@ -121,6 +124,7 @@ impl NonBlockingChannel<CanisterHttpRequest> for CanisterHttpAdapterClientImpl {
let mut http_adapter_client = HttpsOutcallsServiceClient::new(self.grpc_channel.clone());
let query_handler = self.query_service.clone();
let metrics = self.metrics.clone();
let pricing_factory = self.pricing_factory.clone();
let log = self.log.clone();

// Spawn an async task that sends the canister http request to the adapter and awaits the response.
Expand All @@ -133,7 +137,7 @@ impl NonBlockingChannel<CanisterHttpRequest> for CanisterHttpAdapterClientImpl {
socks_proxy_addrs,
} = canister_http_request;

let mut budget = PricingFactory::new_tracker(&request_context);
let mut budget = pricing_factory.new_tracker(&request_context);
let request_size = request_context.variable_parts_size();

let CanisterHttpRequestContext {
Expand Down Expand Up @@ -262,7 +266,12 @@ impl NonBlockingChannel<CanisterHttpRequest> for CanisterHttpAdapterClientImpl {
}
.await;

// Create the payment receipt after all processing is complete.
// Account for the final (post-transform) response that will be
// handed back to the caller, then create the payment receipt after
// all processing is complete.
if let Ok(resp) = &payload {
let _ = budget.subtract_transformed_response_usage(NumBytes::from(resp.len() as u64));
}
let receipt = budget.create_payment_receipt();

permit.send((
Expand Down Expand Up @@ -654,6 +663,7 @@ mod tests {
replication: Replication::FullyReplicated,
pricing_version: PricingVersion::Legacy,
refund_status: RefundStatus::default(),
subnet_size: 13,
},
socks_proxy_addrs: vec![],
}
Expand Down
1 change: 1 addition & 0 deletions rs/https_outcalls/consensus/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ rust_bench(
"//rs/test_utilities/registry",
"//rs/test_utilities/state",
"//rs/test_utilities/types",
"//rs/types/cycles",
"//rs/types/types",
"@crate_index//:criterion",
],
Expand Down
4 changes: 4 additions & 0 deletions rs/https_outcalls/consensus/benches/payload_validation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,7 @@ impl<'a> PayloadAssembler<'a> {
metadata,
signatures,
},
initial_refund: ic_types_cycles::Cycles::zero(),
});
self.contexts.push((
CallbackId::new(callback_id),
Expand All @@ -387,6 +388,7 @@ impl<'a> PayloadAssembler<'a> {
metadata,
signatures,
},
initial_refund: ic_types_cycles::Cycles::zero(),
});
self.contexts.push((
CallbackId::new(callback_id),
Expand Down Expand Up @@ -433,6 +435,7 @@ impl<'a> PayloadAssembler<'a> {
flexible_responses.push(FlexibleCanisterHttpResponses {
callback_id: CallbackId::new(callback_id),
responses: entries,
initial_refund: ic_types_cycles::Cycles::zero(),
});
self.contexts.push((
CallbackId::new(callback_id),
Expand Down Expand Up @@ -492,6 +495,7 @@ fn request_context(replication: Replication) -> CanisterHttpRequestContext {
replication,
pricing_version: PricingVersion::Legacy,
refund_status: RefundStatus::default(),
subnet_size: 4,
}
}

Expand Down
Loading
Loading