Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,16 @@

### Added

- [#7024](https://github.com/ChainSafe/forest/pull/7024): `FOREST_RPC_MAX_RESPONSE_BODY_SIZE` environment variable. Sets the JSON-RPC server's maximum response body size in bytes (default 64 MiB). Operators serving log-heavy `eth_getTransactionReceipt`/`eth_getBlockReceipts` calls can raise this above 64 MiB.

### Changed

### Removed

### Fixed

- [#7024](https://github.com/ChainSafe/forest/pull/7024): `eth_getTransactionReceipt` no longer fails when another transaction in the same tipset emits a large number of events. `max_filter_results` now caps only multi-tipset event queries; single-block calls (`eth_getLogs` with `blockHash`, `eth_getBlockReceipts`, `eth_getTransactionReceipt`) bypass it. Public RPC operators should apply rate and response-size limits at the proxy layer for these calls; a single response can be large when a block contains log-heavy transactions. Ports [filecoin-project/lotus#13617](https://github.com/filecoin-project/lotus/pull/13617).

## Forest v0.33.3 "Dawn"

Non-mandatory release for all node operators. It includes a few fixes to make the chain following logic more robust and eliminate a few non-critical warnings.
Expand Down
13 changes: 13 additions & 0 deletions src/cli_shared/cli/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,21 @@ impl Default for DaemonConfig {
#[derive(Deserialize, Serialize, PartialEq, Eq, Debug, Clone)]
#[cfg_attr(test, derive(derive_quickcheck_arbitrary::Arbitrary))]
pub struct EventsConfig {
    /// Caps the events returned by event-filter queries used by the actor
    /// events API and the Ethereum event and receipt APIs (`eth_getLogs`,
    /// `eth_getFilterLogs`, `eth_getFilterChanges`). Set to `0` for no limit.
    ///
    /// The cap is a hard limit only when a query's events come from more than
    /// one tipset. A range whose events all live in a single tipset may
    /// exceed this value; queries scoped to a single tipset (`block_hash`,
    /// `eth_getBlockReceipts`) bypass it entirely. `eth_getTransactionReceipt`
    /// narrows to a single message and is also unaffected.
    ///
    /// Self-hosted nodes serving trusted callers can use `0` or a high value.
    /// Public RPC operators should keep it bounded.
    // Generate test values as `u32` widened to `usize` so quickcheck never
    // produces values above `u32::MAX` (keeps generated configs portable
    // across platforms where `usize` is wider).
    #[cfg_attr(test, arbitrary(gen(|g| u32::arbitrary(g) as _)))]
    pub max_filter_results: usize,
    /// Maximum block-range span (in epochs) accepted in event-filter queries.
    // NOTE(review): presumably enforced where the filter's from/to epochs are
    // parsed — confirm against the filter-installation path.
    pub max_filter_height_range: ChainEpoch,
}

Expand Down
4 changes: 2 additions & 2 deletions src/rpc/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ impl UrlClient {
jsonrpsee::ws_client::WsClientBuilder::new()
.set_headers(headers)
.max_request_size(MAX_REQUEST_BODY_SIZE)
.max_response_size(MAX_RESPONSE_BODY_SIZE)
.max_response_size(*MAX_RESPONSE_BODY_SIZE)
.request_timeout(ONE_DAY)
.build(&url)
.await?,
Expand All @@ -220,7 +220,7 @@ impl UrlClient {
jsonrpsee::http_client::HttpClientBuilder::new()
.set_headers(headers)
.max_request_size(MAX_REQUEST_BODY_SIZE)
.max_response_size(MAX_RESPONSE_BODY_SIZE)
.max_response_size(*MAX_RESPONSE_BODY_SIZE)
.request_timeout(ONE_DAY)
.build(&url)?,
),
Expand Down
2 changes: 1 addition & 1 deletion src/rpc/methods/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ pub(crate) fn logs<DB: Blockstore + Sync + Send + 'static>(
let handle = tokio::spawn(async move {
while let Ok(changes) = head_changes_rx.recv().await {
for ts in changes.applies {
match eth_logs_with_filter(&ctx, &ts, filter.clone(), None).await {
match eth_logs_with_filter(&ctx, &ts, filter.clone()).await {
Ok(logs) => {
if !logs.is_empty()
&& let Err(e) = sender.send(logs)
Expand Down
126 changes: 75 additions & 51 deletions src/rpc/methods/eth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1237,6 +1237,7 @@ async fn new_eth_tx_receipt<DB: Blockstore + Send + Sync + 'static>(
ctx: &Ctx<DB>,
tipset: &Tipset,
tx: &ApiEthTx,
msg_cid: Cid,
msg_receipt: &Receipt,
) -> anyhow::Result<EthTxReceipt> {
let mut tx_receipt = EthTxReceipt {
Expand Down Expand Up @@ -1282,7 +1283,7 @@ async fn new_eth_tx_receipt<DB: Blockstore + Send + Sync + 'static>(

if msg_receipt.events_root().is_some() {
let logs =
eth_logs_for_block_and_transaction(ctx, tipset, &tx.block_hash, &tx.hash).await?;
eth_logs_for_block_and_transaction(ctx, tipset, &tx.block_hash, &msg_cid).await?;
if !logs.is_empty() {
tx_receipt.logs = logs;
}
Expand All @@ -1304,21 +1305,34 @@ pub async fn eth_logs_for_block_and_transaction<DB: Blockstore + Send + Sync + '
ctx: &Ctx<DB>,
ts: &Tipset,
block_hash: &EthHash,
tx_hash: &EthHash,
msg_cid: &Cid,
) -> anyhow::Result<Vec<EthLog>> {
let spec = EthFilterSpec {
block_hash: Some(*block_hash),
..Default::default()
};
// Refuse to serve events for tipsets at or after head (deferred execution).
let heaviest_epoch = ctx.chain_store().heaviest_tipset().epoch();
if ts.epoch() >= heaviest_epoch {
return Err(EthErrors::EventsNotYetAvailable.into());
}

eth_logs_with_filter(ctx, ts, Some(spec), Some(tx_hash)).await
let pf = ParsedFilter::new_with_tipset_and_msg(
ParsedFilterTipsets::Hash(*block_hash),
Some(*msg_cid),
);
let mut events = vec![];
EthEventHandler::collect_events(
ctx,
ts,
Some(&pf),
SkipEvent::OnUnresolvedAddress,
&mut events,
)
.await?;
eth_filter_logs_from_events(ctx, &events)
}

pub async fn eth_logs_with_filter<DB: Blockstore + Send + Sync + 'static>(
ctx: &Ctx<DB>,
ts: &Tipset,
spec: Option<EthFilterSpec>,
tx_hash: Option<&EthHash>,
) -> anyhow::Result<Vec<EthLog>> {
let mut events = vec![];
EthEventHandler::collect_events(
Expand All @@ -1329,15 +1343,7 @@ pub async fn eth_logs_with_filter<DB: Blockstore + Send + Sync + 'static>(
&mut events,
)
.await?;

let logs = eth_filter_logs_from_events(ctx, &events)?;
Ok(match tx_hash {
Some(hash) => logs
.into_iter()
.filter(|log| &log.transaction_hash == hash)
.collect(),
None => logs, // no tx hash, keep all logs
})
eth_filter_logs_from_events(ctx, &events)
}

fn get_signed_message<DB: Blockstore>(ctx: &Ctx<DB>, message_cid: Cid) -> Result<SignedMessage> {
Expand Down Expand Up @@ -1453,7 +1459,7 @@ async fn get_block_receipts<DB: Blockstore + Send + Sync + 'static>(
i as u64,
)?;

let receipt = new_eth_tx_receipt(ctx, &ts_ref, &tx, receipt).await?;
let receipt = new_eth_tx_receipt(ctx, &ts_ref, &tx, message.cid(), receipt).await?;
eth_receipts.push(receipt);
}
Ok(eth_receipts)
Expand Down Expand Up @@ -2853,7 +2859,8 @@ async fn get_eth_transaction_receipt(
)
})?;

let tx_receipt = new_eth_tx_receipt(&ctx, &parent_ts, &tx, &message_lookup.receipt).await?;
let tx_receipt =
new_eth_tx_receipt(&ctx, &parent_ts, &tx, msg_cid, &message_lookup.receipt).await?;

Ok(Some(tx_receipt))
}
Expand Down Expand Up @@ -3060,20 +3067,6 @@ fn eth_tx_hash_from_message_cid<DB: Blockstore>(
Ok(None)
}

/// Maps each `CollectedEvent` through `f`, collecting the successful
/// translations into a `Vec<EthLog>`.
///
/// `Ok(None)` results from `f` are silently skipped (the event is dropped),
/// while the first `Err` short-circuits the whole collection and is
/// propagated to the caller.
fn transform_events<F>(events: &[CollectedEvent], f: F) -> anyhow::Result<Vec<EthLog>>
where
    F: Fn(&CollectedEvent) -> anyhow::Result<Option<EthLog>>,
{
    events
        .iter()
        .filter_map(|event| match f(event) {
            Ok(Some(eth_log)) => Some(Ok(eth_log)),
            Ok(None) => None,
            Err(e) => Some(Err(e)),
        })
        .collect()
}

fn eth_filter_logs_from_tipsets(events: &[CollectedEvent]) -> anyhow::Result<Vec<EthHash>> {
events
.iter()
Expand Down Expand Up @@ -3108,36 +3101,67 @@ fn eth_filter_logs_from_events<DB: Blockstore>(
ctx: &Ctx<DB>,
events: &[CollectedEvent],
) -> anyhow::Result<Vec<EthLog>> {
transform_events(events, |event| {
let (data, topics) = if let Some((data, topics)) = eth_log_from_event(&event.entries) {
(data, topics)
use ahash::AHashMap as HashMap;

let chain_id = ctx.state_manager.chain_config().eth_chain_id;
let mut tx_hash_by_msg: HashMap<Cid, EthHash> = HashMap::new();
let mut block_hash_by_tipset: HashMap<TipsetKey, EthHash> = HashMap::new();
let mut eth_addr_by_emitter: HashMap<FilecoinAddress, EthAddress> = HashMap::new();

let mut logs = Vec::with_capacity(events.len());
for event in events {
let (data, topics) = match eth_log_from_event(&event.entries) {
Some(parts) => parts,
None => {
tracing::warn!("Ignoring event");
continue;
}
};

let transaction_hash = if let Some(h) = tx_hash_by_msg.get(&event.msg_cid) {
*h
} else {
tracing::warn!("Ignoring event");
return Ok(None);
match eth_tx_hash_from_message_cid(ctx.store(), &event.msg_cid, chain_id)? {
Some(h) => {
tx_hash_by_msg.insert(event.msg_cid, h);
h
}
None => {
tracing::warn!("Ignoring event");
continue;
}
}
};
let transaction_hash = if let Some(transaction_hash) = eth_tx_hash_from_message_cid(
ctx.store(),
&event.msg_cid,
ctx.state_manager.chain_config().eth_chain_id,
)? {
transaction_hash

let block_hash = if let Some(h) = block_hash_by_tipset.get(&event.tipset_key) {
*h
} else {
tracing::warn!("Ignoring event");
return Ok(None);
let h: EthHash = event.tipset_key.cid()?.into();
block_hash_by_tipset.insert(event.tipset_key.clone(), h);
h
};
let address = EthAddress::from_filecoin_address(&event.emitter_addr)?;
Ok(Some(EthLog {

let address = if let Some(a) = eth_addr_by_emitter.get(&event.emitter_addr) {
*a
} else {
let a = EthAddress::from_filecoin_address(&event.emitter_addr)?;
eth_addr_by_emitter.insert(event.emitter_addr, a);
a
};

logs.push(EthLog {
address,
data,
topics,
removed: event.reverted,
log_index: event.event_idx.into(),
transaction_index: event.msg_idx.into(),
transaction_hash,
block_hash: event.tipset_key.cid()?.into(),
block_hash,
block_number: (event.height as u64).into(),
}))
})
});
}
Ok(logs)
}

fn eth_filter_result_from_events<DB: Blockstore>(
Expand Down
17 changes: 17 additions & 0 deletions src/rpc/methods/eth/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ pub enum EthErrors {
given: i64,
message: String,
},
#[error("events for the requested block are not yet available")]
EventsNotYetAvailable,
}

impl EthErrors {
Expand Down Expand Up @@ -57,13 +59,15 @@ impl RpcErrorData for EthErrors {
match self {
EthErrors::ExecutionReverted { .. } => Some(EXECUTION_REVERTED_CODE),
EthErrors::BlockRangeExceeded { .. } => Some(LIMIT_EXCEEDED_CODE),
EthErrors::EventsNotYetAvailable => None,
}
}

fn error_message(&self) -> Option<String> {
match self {
EthErrors::ExecutionReverted { message, .. } => Some(message.clone()),
EthErrors::BlockRangeExceeded { message, .. } => Some(message.clone()),
EthErrors::EventsNotYetAvailable => Some(self.to_string()),
}
}

Expand All @@ -73,6 +77,7 @@ impl RpcErrorData for EthErrors {
Some(serde_json::Value::String(data.clone()))
}
EthErrors::BlockRangeExceeded { .. } => None,
EthErrors::EventsNotYetAvailable => None,
}
}
}
Expand Down Expand Up @@ -106,4 +111,16 @@ mod tests {
"block range exceeds maximum of 2880 (got 5000)"
);
}

#[test]
fn test_events_not_yet_available_converts_to_server_error() {
    // `EventsNotYetAvailable` carries no dedicated RPC error code, so the
    // conversion falls back to the default; only the message is pinned here.
    let server_err: ServerError = EthErrors::EventsNotYetAvailable.into();

    let expected = "events for the requested block are not yet available";
    assert_eq!(server_err.message(), expected);
}
}
2 changes: 2 additions & 0 deletions src/rpc/methods/eth/filter/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ impl From<&EventFilter> for ParsedFilter {
tipsets: event_filter.tipsets.clone(),
addresses: event_filter.addresses.clone(),
keys: event_filter.keys_with_codec.clone(),
msg_cid: None,
}
}
}
Expand Down Expand Up @@ -102,6 +103,7 @@ mod tests {
tipsets: ParsedFilterTipsets::Range(RangeInclusive::new(0, 100)),
addresses: vec![Address::new_id(123)],
keys: HashMap::new(),
msg_cid: None,
};
// Test case 1: Install the EventFilter
let filter = event_manager
Expand Down
Loading
Loading