Skip to content

Commit 9f99030

Browse files
netsiriussanityclaude
authored
fix: Resolve race conditions and improve operation atomicity (#2233)
Co-authored-by: Ian Clarke <sanity@users.noreply.github.com> Co-authored-by: Claude <noreply@anthropic.com>
1 parent 58262e8 commit 9f99030

File tree

15 files changed

+412
-179
lines changed

15 files changed

+412
-179
lines changed

apps/freenet-ping/Cargo.lock

Lines changed: 4 additions & 11 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

apps/freenet-ping/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,10 @@ members = ["contracts/ping", "app", "types"]
44

55
[workspace.dependencies]
66
# freenet-stdlib = { path = "./../../stdlib/rust", features = ["contract"] }
7-
freenet-stdlib = { version = "0.1.24" }
7+
freenet-stdlib = { version = "0.1.24" }
88
freenet-ping-types = { path = "types", default-features = false }
99
chrono = { version = "0.4", default-features = false }
10+
clap = "4"
1011
testresult = "0.4"
1112

1213
[profile.dev.package."*"]

apps/freenet-ping/app/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ manual-tests = []
1010
[dependencies]
1111
anyhow = "1.0"
1212
chrono = { workspace = true, features = ["default"] }
13-
clap = { version = "4.5", features = ["derive"] }
13+
clap = { workspace = true, features = ["derive"] }
1414
freenet-stdlib = { version = "0.1.24", features = ["net"] }
1515
freenet-ping-types = { path = "../types", features = ["std", "clap"] }
1616
futures = "0.3.31"

apps/freenet-ping/app/src/ping_client.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use std::collections::HashMap;
22
use std::time::{Duration, Instant};
33

4-
use chrono::{DateTime, Utc};
4+
use freenet_ping_types::chrono::{DateTime, Utc};
55
use freenet_ping_types::{Ping, PingContractOptions};
66
use freenet_stdlib::client_api::{
77
ClientRequest, ContractRequest, ContractResponse, HostResponse, WebApi,

apps/freenet-ping/app/tests/common/mod.rs

Lines changed: 25 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,17 @@ use futures::{future::BoxFuture, FutureExt};
1919
use rand::{random, Rng, SeedableRng};
2020
use std::io::{Read, Write};
2121
use std::process::{Child, Command, Stdio};
22-
use std::sync::{Arc, LazyLock};
22+
use std::sync::{Arc, LazyLock, Mutex};
2323
use std::{
2424
collections::HashSet,
2525
io,
2626
net::{Ipv4Addr, SocketAddr, TcpListener},
2727
path::{Path, PathBuf},
28-
sync::Mutex,
2928
time::Duration,
3029
};
30+
31+
/// Global lock to prevent concurrent contract compilation which causes race conditions
32+
static COMPILE_LOCK: LazyLock<Mutex<()>> = LazyLock::new(|| Mutex::new(()));
3133
use tokio::{select, time::sleep};
3234
use tokio_tungstenite::connect_async;
3335
use tracing::{info, span, Instrument, Level};
@@ -36,12 +38,6 @@ use serde::{Deserialize, Serialize};
3638

3739
const TARGET_DIR_VAR: &str = "CARGO_TARGET_DIR";
3840

39-
pub static RNG: LazyLock<Mutex<rand::rngs::StdRng>> = LazyLock::new(|| {
40-
Mutex::new(rand::rngs::StdRng::from_seed(
41-
*b"0102030405060708090a0b0c0d0e0f10",
42-
))
43-
});
44-
4541
#[derive(Debug)]
4642
pub struct PresetConfig {
4743
pub temp_dir: tempfile::TempDir,
@@ -58,7 +54,6 @@ pub fn get_free_socket_addr() -> Result<SocketAddr> {
5854
}
5955

6056
#[allow(clippy::too_many_arguments)]
61-
#[allow(clippy::await_holding_lock)]
6257
pub async fn base_node_test_config(
6358
is_gateway: bool,
6459
gateways: Vec<String>,
@@ -68,7 +63,13 @@ pub async fn base_node_test_config(
6863
base_tmp_dir: Option<&Path>,
6964
blocked_addresses: Option<Vec<SocketAddr>>,
7065
) -> Result<(ConfigArgs, PresetConfig)> {
71-
let mut rng = RNG.lock().unwrap();
66+
// Create RNG seeded from test name for reproducibility while maintaining isolation
67+
use std::hash::{Hash, Hasher};
68+
let mut hasher = std::collections::hash_map::DefaultHasher::new();
69+
data_dir_suffix.hash(&mut hasher);
70+
let seed = hasher.finish();
71+
let mut rng = rand::rngs::StdRng::seed_from_u64(seed);
72+
7273
base_node_test_config_with_rng(
7374
is_gateway,
7475
gateways,
@@ -83,15 +84,15 @@ pub async fn base_node_test_config(
8384
}
8485

8586
#[allow(clippy::too_many_arguments)]
86-
pub async fn base_node_test_config_with_rng(
87+
pub async fn base_node_test_config_with_rng<R: Rng>(
8788
is_gateway: bool,
8889
gateways: Vec<String>,
8990
public_port: Option<u16>,
9091
ws_api_port: u16,
9192
data_dir_suffix: &str,
9293
base_tmp_dir: Option<&Path>,
9394
blocked_addresses: Option<Vec<SocketAddr>>,
94-
rng: &mut rand::rngs::StdRng,
95+
rng: &mut R,
9596
) -> Result<(ConfigArgs, PresetConfig)> {
9697
if is_gateway {
9798
assert!(public_port.is_some());
@@ -146,13 +147,19 @@ pub async fn base_node_test_config_with_rng(
146147
}
147148

148149
pub fn gw_config_from_path(port: u16, path: &Path) -> Result<InlineGwConfig> {
149-
gw_config_from_path_with_rng(port, path, &mut RNG.lock().unwrap())
150+
// Create RNG seeded from path for reproducibility while maintaining isolation
151+
use std::hash::{Hash, Hasher};
152+
let mut hasher = std::collections::hash_map::DefaultHasher::new();
153+
path.hash(&mut hasher);
154+
let seed = hasher.finish();
155+
let mut rng = rand::rngs::StdRng::seed_from_u64(seed);
156+
gw_config_from_path_with_rng(port, path, &mut rng)
150157
}
151158

152-
pub fn gw_config_from_path_with_rng(
159+
pub fn gw_config_from_path_with_rng<R: Rng>(
153160
port: u16,
154161
path: &Path,
155-
rng: &mut rand::rngs::StdRng,
162+
rng: &mut R,
156163
) -> Result<InlineGwConfig> {
157164
Ok(InlineGwConfig {
158165
address: (Ipv4Addr::LOCALHOST, port).into(),
@@ -303,6 +310,9 @@ fn find_workspace_root() -> PathBuf {
303310
}
304311

305312
fn compile_contract(contract_path: &PathBuf) -> anyhow::Result<Vec<u8>> {
313+
// Acquire lock to prevent concurrent compilations which cause race conditions
314+
let _lock = COMPILE_LOCK.lock().unwrap();
315+
306316
ensure_target_dir_env();
307317
println!("module path: {contract_path:?}");
308318
let target = std::env::var(TARGET_DIR_VAR)

apps/freenet-ping/app/tests/run_app_blocked_peers.rs

Lines changed: 18 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,7 @@ async fn run_blocked_peers_test(config: BlockedPeersConfig) -> TestResult {
133133
let path = preset.temp_dir.path().to_path_buf();
134134
(cfg, preset, gw_config_from_path(public_port, &path)?)
135135
};
136+
136137
let ws_api_port_gw = config_gw.ws_api.ws_api_port.unwrap();
137138

138139
// Configure Node1 (blocks Node2)
@@ -167,7 +168,7 @@ async fn run_blocked_peers_test(config: BlockedPeersConfig) -> TestResult {
167168
tracing::info!("Node 1 blocks: {:?}", node2_network_addr);
168169
tracing::info!("Node 2 blocks: {:?}", node1_network_addr);
169170

170-
// Free socket resources
171+
// Free socket resources before starting nodes
171172
std::mem::drop(network_socket_gw);
172173
std::mem::drop(ws_api_port_socket_gw);
173174
std::mem::drop(network_socket_node1);
@@ -788,17 +789,16 @@ async fn run_blocked_peers_test(config: BlockedPeersConfig) -> TestResult {
788789

789790
/// Standard blocked peers test (baseline)
790791
#[test_log::test(tokio::test(flavor = "multi_thread"))]
791-
#[ignore]
792792
async fn test_ping_blocked_peers() -> TestResult {
793793
run_blocked_peers_test(BlockedPeersConfig {
794794
test_name: "baseline",
795-
initial_wait: Duration::from_secs(10),
796-
operation_timeout: Duration::from_secs(20),
795+
initial_wait: Duration::from_secs(25),
796+
operation_timeout: Duration::from_secs(45),
797797
update_rounds: 3,
798798
update_wait: Duration::from_secs(5),
799-
propagation_wait: Duration::from_secs(8),
799+
propagation_wait: Duration::from_secs(15),
800800
verbose_logging: false,
801-
check_interval: None,
801+
check_interval: Some(Duration::from_secs(4)),
802802
send_refresh_updates: false,
803803
send_final_updates: true,
804804
subscribe_immediately: false,
@@ -808,44 +808,39 @@ async fn test_ping_blocked_peers() -> TestResult {
808808

809809
/// Simple blocked peers test
810810
#[test_log::test(tokio::test(flavor = "multi_thread"))]
811-
#[ignore]
812811
async fn test_ping_blocked_peers_simple() -> TestResult {
813812
run_blocked_peers_test(BlockedPeersConfig {
814813
test_name: "simple",
815-
initial_wait: Duration::from_secs(10),
816-
operation_timeout: Duration::from_secs(15),
817-
update_rounds: 1, // Only one round of updates
818-
update_wait: Duration::from_secs(3),
819-
propagation_wait: Duration::from_secs(10), // Longer wait for simpler flow
814+
initial_wait: Duration::from_secs(25),
815+
operation_timeout: Duration::from_secs(45),
816+
update_rounds: 1,
817+
update_wait: Duration::from_secs(5),
818+
propagation_wait: Duration::from_secs(15),
820819
verbose_logging: false,
821-
check_interval: None,
820+
check_interval: Some(Duration::from_secs(4)),
822821
send_refresh_updates: false,
823822
send_final_updates: false,
824-
subscribe_immediately: true,
823+
subscribe_immediately: false,
825824
})
826825
.await
827826
}
828827

829-
// Note: Redundant tests (optimized, improved, debug, reliable) were removed
830-
// as they only varied in non-functional aspects like timeouts and logging
831-
832828
/// Solution/reference implementation for blocked peers
833829
// TODO-MUST-FIX: WebSocket connection reset during teardown - see issue #2108
834830
// Test passes functionally (PUT/GET/Subscribe/state propagation all work) but
835831
// fails with "Connection reset without closing handshake" during cleanup.
836832
// Likely a test teardown race rather than functional bug.
837833
#[test_log::test(tokio::test(flavor = "multi_thread"))]
838-
#[ignore]
839834
async fn test_ping_blocked_peers_solution() -> TestResult {
840835
run_blocked_peers_test(BlockedPeersConfig {
841836
test_name: "solution",
842-
initial_wait: Duration::from_secs(12),
843-
operation_timeout: Duration::from_secs(25),
837+
initial_wait: Duration::from_secs(25),
838+
operation_timeout: Duration::from_secs(60),
844839
update_rounds: 2,
845-
update_wait: Duration::from_secs(4),
846-
propagation_wait: Duration::from_secs(12),
840+
update_wait: Duration::from_secs(6),
841+
propagation_wait: Duration::from_secs(20),
847842
verbose_logging: false,
848-
check_interval: Some(Duration::from_secs(3)), // Regular check intervals
843+
check_interval: Some(Duration::from_secs(5)),
849844
send_refresh_updates: true,
850845
send_final_updates: true,
851846
subscribe_immediately: true,

apps/freenet-ping/types/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,6 @@ humantime = "2"
1717
humantime-serde = "1"
1818
serde = { version = "1", features = ["derive"] }
1919
chrono = { workspace = true, features = ["serde"] }
20-
clap = { version = "4", features = ["derive"], optional = true }
20+
clap = { workspace = true, features = ["derive"], optional = true }
2121

2222
freenet-stdlib = { workspace = true }

crates/core/benches/transport_perf.rs

Lines changed: 13 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1284,15 +1284,13 @@ mod experimental_combined {
12841284
});
12851285

12861286
// Sender: serialize, encrypt, send to channel
1287-
let mut nonce_counter = 0u64;
1288-
for _ in 0..PACKET_COUNT {
1287+
for nonce_counter in 0u64..PACKET_COUNT as u64 {
12891288
// Allocate buffer for packet
12901289
let mut packet = vec![0u8; size + 28]; // +28 for nonce+tag
12911290

12921291
// Create nonce
12931292
let mut nonce = [0u8; 12];
12941293
nonce[4..].copy_from_slice(&nonce_counter.to_le_bytes());
1295-
nonce_counter += 1;
12961294

12971295
// Copy nonce to packet
12981296
packet[..12].copy_from_slice(&nonce);
@@ -1493,24 +1491,19 @@ mod experimental_syscall_batching {
14931491
let rt = tokio::runtime::Handle::current();
14941492
let mut batch = Vec::with_capacity(BATCH_SIZE);
14951493

1496-
loop {
1497-
// Try to receive up to BATCH_SIZE packets
1498-
match rt.block_on(rx.recv()) {
1499-
Some(packet) => {
1500-
batch.push(packet);
1501-
// Drain available packets up to batch size
1502-
while batch.len() < BATCH_SIZE {
1503-
match rx.try_recv() {
1504-
Ok(p) => batch.push(p),
1505-
Err(_) => break,
1506-
}
1507-
}
1508-
// Send batch
1509-
for packet in batch.drain(..) {
1510-
socket.send(&packet).unwrap();
1511-
}
1494+
// Try to receive up to BATCH_SIZE packets
1495+
while let Some(packet) = rt.block_on(rx.recv()) {
1496+
batch.push(packet);
1497+
// Drain available packets up to batch size
1498+
while batch.len() < BATCH_SIZE {
1499+
match rx.try_recv() {
1500+
Ok(p) => batch.push(p),
1501+
Err(_) => break,
15121502
}
1513-
None => break,
1503+
}
1504+
// Send batch
1505+
for packet in batch.drain(..) {
1506+
socket.send(&packet).unwrap();
15141507
}
15151508
}
15161509
});

0 commit comments

Comments
 (0)