Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ rayon = "1.11"
reqwest = { version = "0.13", features = ["json"] }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
tokio = { version = "1", features = ["fs", "sync", "rt", "rt-multi-thread", "macros"] }
tokio = { version = "1", features = ["fs", "sync", "rt", "rt-multi-thread", "macros", "time"] }
tracing = "0.1"
utoipa = "5"

Expand Down
31 changes: 29 additions & 2 deletions src/routing/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,9 @@ impl SpeedProfile {
#[derive(Debug, Clone)]
pub struct NetworkConfig {
pub overpass_url: String,
pub overpass_endpoints: Vec<String>,
pub overpass_max_retries: usize,
pub overpass_retry_backoff: Duration,
pub cache_dir: PathBuf,
pub connect_timeout: Duration,
pub read_timeout: Duration,
Expand All @@ -104,8 +107,12 @@ pub struct NetworkConfig {

impl Default for NetworkConfig {
fn default() -> Self {
let default_overpass = "https://overpass-api.de/api/interpreter".to_string();
Self {
overpass_url: "https://overpass-api.de/api/interpreter".to_string(),
overpass_url: default_overpass.clone(),
overpass_endpoints: vec![default_overpass],
overpass_max_retries: 2,
overpass_retry_backoff: Duration::from_secs(2),
cache_dir: PathBuf::from(".osm_cache"),
connect_timeout: Duration::from_secs(30),
read_timeout: Duration::from_secs(180),
Expand All @@ -132,7 +139,27 @@ impl NetworkConfig {
}

/// Sets the primary Overpass endpoint, replacing the mirror list with just
/// this URL so both fields stay in sync.
pub fn overpass_url(mut self, url: impl Into<String>) -> Self {
    let primary: String = url.into();
    self.overpass_endpoints = vec![primary.clone()];
    self.overpass_url = primary;
    self
}

/// Sets the ordered Overpass mirror list; the first entry also becomes the
/// primary `overpass_url`. An empty list is ignored, keeping the previous
/// configuration.
pub fn overpass_endpoints(mut self, urls: Vec<String>) -> Self {
    if !urls.is_empty() {
        self.overpass_url = urls[0].clone();
        self.overpass_endpoints = urls;
    }
    self
}

pub fn overpass_max_retries(mut self, retries: usize) -> Self {
self.overpass_max_retries = retries;
self
}

pub fn overpass_retry_backoff(mut self, backoff: Duration) -> Self {
self.overpass_retry_backoff = backoff;
self
}

Expand Down
248 changes: 216 additions & 32 deletions src/routing/fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ use std::collections::HashMap;
use std::future::Future;
use std::path::Path;
use std::sync::Arc;
use std::time::Duration;

use tokio::sync::{mpsc::Sender, Mutex, OwnedMutexGuard};
use tokio::time::sleep;
use tracing::{debug, info};

use super::bbox::BoundingBox;
Expand Down Expand Up @@ -131,8 +133,6 @@ out body;"#,
.build()
.map_err(|e| RoutingError::Network(e.to_string()))?;

info!("Sending request to Overpass API...");

if let Some(tx) = progress {
let _ = tx
.send(RoutingProgress::DownloadingNetwork {
Expand All @@ -142,36 +142,7 @@ out body;"#,
.await;
}

let response = client
.post(&config.overpass_url)
.body(query)
.header("Content-Type", "text/plain")
.send()
.await
.map_err(|e| RoutingError::Network(e.to_string()))?;

info!("Received response: status={}", response.status());

if !response.status().is_success() {
return Err(RoutingError::Network(format!(
"Overpass API returned status {}",
response.status()
)));
}

if let Some(tx) = progress {
let _ = tx
.send(RoutingProgress::DownloadingNetwork {
percent: 25,
bytes: 0,
})
.await;
}

let bytes = response
.bytes()
.await
.map_err(|e| RoutingError::Network(e.to_string()))?;
let bytes = fetch_overpass_bytes(&client, &query, config, progress).await?;

let bytes_len = bytes.len();
if let Some(tx) = progress {
Expand Down Expand Up @@ -467,6 +438,125 @@ out body;"#,
}
}

async fn fetch_overpass_bytes(
client: &reqwest::Client,
query: &str,
config: &NetworkConfig,
progress: Option<&Sender<RoutingProgress>>,
) -> Result<Vec<u8>, RoutingError> {
let endpoints = overpass_endpoints(config);
let mut failures = Vec::new();

for (endpoint_index, endpoint) in endpoints.iter().enumerate() {
for attempt in 0..=config.overpass_max_retries {
info!(
"Sending request to Overpass API endpoint {} attempt {}: {}",
endpoint_index + 1,
attempt + 1,
endpoint
);

let response = client
.post(endpoint)
.body(query.to_owned())
.header("Content-Type", "text/plain")
.send()
.await;

match response {
Ok(response) if response.status().is_success() => {
info!(
"Received successful Overpass response from {} with status {}",
endpoint,
response.status()
);

if let Some(tx) = progress {
let _ = tx
.send(RoutingProgress::DownloadingNetwork {
percent: 25,
bytes: 0,
})
.await;
}

return response
.bytes()
.await
.map(|bytes| bytes.to_vec())
.map_err(|error| {
RoutingError::Network(format!(
"Overpass response body read failed from {} on attempt {}: {}",
endpoint,
attempt + 1,
error
))
});
Comment on lines +483 to +494
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Retry body-read failures instead of aborting after a 200 response

Once an endpoint returns HTTP 200, this branch immediately returns the result of response.bytes().await. If the body download itself times out or the connection is reset mid-stream—which can happen on large Overpass payloads under the configured read_timeout—the function exits here and never reaches the retry loop or the secondary endpoints. In those cases the new resilience logic still fails on the first mirror instead of retrying/failing over.

Useful? React with 👍 / 👎.

}
Ok(response) => {
let status = response.status();
failures.push(format!(
"{} attempt {} returned HTTP {}",
endpoint,
attempt + 1,
status
));

if is_retryable_status(status) && attempt < config.overpass_max_retries {
sleep(retry_backoff(config.overpass_retry_backoff, attempt)).await;
continue;
}

break;
}
Err(error) => {
failures.push(format!(
"{} attempt {} failed: {}",
endpoint,
attempt + 1,
error
));

if is_retryable_error(&error) && attempt < config.overpass_max_retries {
sleep(retry_backoff(config.overpass_retry_backoff, attempt)).await;
continue;
}

break;
}
}
}
}

Err(RoutingError::Network(format!(
"Overpass fetch failed after trying {} endpoint(s): {}",
endpoints.len(),
failures.join("; ")
)))
}

fn overpass_endpoints(config: &NetworkConfig) -> Vec<String> {
if config.overpass_endpoints.is_empty() {
vec![config.overpass_url.clone()]
} else {
config.overpass_endpoints.clone()
Comment on lines +538 to +542
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Respect overpass_url when callers keep the default endpoint list

NetworkConfig still exposes overpass_url as a public field, so downstream code can validly do let mut cfg = NetworkConfig::default(); cfg.overpass_url = mirror.into(); or NetworkConfig { overpass_url: ..., ..Default::default() }. After this change, overpass_endpoints() ignores that override whenever overpass_endpoints is non-empty, and Default now always pre-populates the list with overpass-api.de. That creates a silent regression for existing custom-endpoint users: requests continue going to the default host unless they learn to update both fields.

Useful? React with 👍 / 👎.

}
}

/// Linear backoff for retry `attempt` (0-based): `base`, `2*base`, `3*base`, …
///
/// The multiplier is computed with saturating arithmetic so an extreme
/// `attempt` value cannot overflow in debug builds, and the `u32` cast is
/// clamped instead of truncated so it cannot wrap to a shorter-than-intended
/// (or zero) delay.
fn retry_backoff(base: Duration, attempt: usize) -> Duration {
    let multiplier = u32::try_from(attempt.saturating_add(1)).unwrap_or(u32::MAX);
    base.saturating_mul(multiplier)
}

/// Transient HTTP statuses that justify another attempt: any 5xx, plus
/// 429 Too Many Requests and 408 Request Timeout.
fn is_retryable_status(status: reqwest::StatusCode) -> bool {
    if status.is_server_error() {
        return true;
    }

    status == reqwest::StatusCode::TOO_MANY_REQUESTS
        || status == reqwest::StatusCode::REQUEST_TIMEOUT
}

/// Transport-level failures (timeouts, connection errors, request send
/// errors) are treated as retryable; anything else propagates immediately.
fn is_retryable_error(error: &reqwest::Error) -> bool {
    if error.is_timeout() {
        return true;
    }

    error.is_connect() || error.is_request()
}

impl RoadNetwork {
#[doc(hidden)]
pub async fn load_or_fetch_simple(bbox: &BoundingBox) -> Result<NetworkRef, RoutingError> {
Expand Down Expand Up @@ -501,13 +591,17 @@ async fn cleanup_in_flight_slot(cache_key: &str, slot: &Arc<Mutex<()>>) {

#[cfg(test)]
mod tests {
use std::io::{Read, Write};
use std::net::TcpListener;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::{Duration, Instant};

use tokio::time::sleep;

use super::*;
use crate::routing::BoundingBox;

fn test_network() -> RoadNetwork {
RoadNetwork::from_test_data(&[(0.0, 0.0), (0.0, 0.01)], &[(0, 1, 60.0, 1_000.0)])
Expand Down Expand Up @@ -586,4 +680,94 @@ mod tests {

assert_eq!(loads.load(Ordering::Relaxed), 1);
}

/// Minimal Overpass JSON payload: two nodes joined by one residential way.
/// The tests below rely on this parsing into a two-node network
/// (`node_count() == 2`).
fn overpass_fixture_json() -> &'static str {
r#"{
"elements": [
{"type": "node", "id": 1, "lat": 39.95, "lon": -75.16},
{"type": "node", "id": 2, "lat": 39.96, "lon": -75.17},
{"type": "way", "id": 10, "nodes": [1, 2], "tags": {"highway": "residential"}}
]
}"#
}

/// Spawns a throwaway HTTP server on an ephemeral localhost port that serves
/// the given `(status_line, body)` responses, one per incoming connection,
/// in order, then exits.
///
/// Returns the Overpass-style endpoint URL, a counter of requests served,
/// and the server thread's join handle (join it after the test to ensure the
/// scripted responses were all consumed).
fn spawn_overpass_server(
responses: Vec<(&'static str, &'static str)>,
) -> (String, Arc<AtomicUsize>, thread::JoinHandle<()>) {
let listener = TcpListener::bind("127.0.0.1:0").expect("listener should bind");
let address = format!(
"http://{}/api/interpreter",
listener.local_addr().expect("listener addr")
);
let requests = Arc::new(AtomicUsize::new(0));
let served = requests.clone();

let handle = thread::spawn(move || {
for (status, body) in responses {
let (mut stream, _) = listener.accept().expect("connection should arrive");
// Drain (up to 4 KiB of) the request; the contents are ignored.
// NOTE(review): assumes the whole request fits in one read — fine for
// these small test queries, but not a general HTTP server.
let mut buffer = [0_u8; 4096];
let _ = stream.read(&mut buffer);
// Hand-rolled HTTP/1.1 response; Connection: close ends the exchange.
let response = format!(
"HTTP/1.1 {}\r\nContent-Length: {}\r\nContent-Type: application/json\r\nConnection: close\r\n\r\n{}",
status,
body.len(),
body
);
stream
.write_all(response.as_bytes())
.expect("response should write");
// Count only after the response was written, so assertions on the
// counter imply the client actually received a reply.
served.fetch_add(1, Ordering::Relaxed);
}
});

(address, requests, handle)
}

/// A 429 from the sole endpoint must be retried; the follow-up 200 is then
/// parsed into the two-node fixture network.
#[tokio::test]
async fn fetch_retries_same_endpoint_until_success() {
    let scripted = vec![
        ("429 Too Many Requests", r#"{"elements":[]}"#),
        ("200 OK", overpass_fixture_json()),
    ];
    let (endpoint, request_count, server) = spawn_overpass_server(scripted);

    let config = NetworkConfig::default()
        .overpass_url(endpoint)
        .overpass_max_retries(1)
        .overpass_retry_backoff(Duration::from_millis(1));
    let bbox = BoundingBox::try_new(39.94, -75.18, 39.97, -75.15).expect("bbox should build");

    let network = RoadNetwork::fetch(&bbox, &config, None)
        .await
        .expect("fetch should succeed after retry");

    assert_eq!(network.node_count(), 2);
    assert_eq!(request_count.load(Ordering::Relaxed), 2);
    server.join().expect("server should join");
}

/// A 503 from the primary endpoint (with retries disabled) must fail over
/// to the secondary mirror, which serves the fixture successfully.
#[tokio::test]
async fn fetch_falls_back_to_second_endpoint() {
    let (primary_url, primary_hits, primary_server) =
        spawn_overpass_server(vec![("503 Service Unavailable", r#"{"elements":[]}"#)]);
    let (secondary_url, secondary_hits, secondary_server) =
        spawn_overpass_server(vec![("200 OK", overpass_fixture_json())]);

    let config = NetworkConfig::default()
        .overpass_endpoints(vec![primary_url, secondary_url])
        .overpass_max_retries(0)
        .overpass_retry_backoff(Duration::from_millis(1));
    let bbox = BoundingBox::try_new(39.94, -75.18, 39.97, -75.15).expect("bbox should build");

    let network = RoadNetwork::fetch(&bbox, &config, None)
        .await
        .expect("fetch should fall back to second endpoint");

    assert_eq!(network.node_count(), 2);
    assert_eq!(primary_hits.load(Ordering::Relaxed), 1);
    assert_eq!(secondary_hits.load(Ordering::Relaxed), 1);
    primary_server.join().expect("primary server should join");
    secondary_server
        .join()
        .expect("secondary server should join");
}
}
Loading
Loading