Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 43 additions & 16 deletions grpc/src/client/name_resolution/dns/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ use crate::rt::Runtime;
use crate::rt::Sleep;
use crate::rt::TaskHandle;
use crate::rt::TcpOptions;
use crate::rt::default_runtime;
use crate::rt::tokio::TokioRuntime;
use crate::rt::tracker::TrackedRuntime;

const DEFAULT_TEST_SHORT_TIMEOUT: Duration = Duration::from_millis(10);

Expand Down Expand Up @@ -178,9 +178,10 @@ pub(crate) async fn dns_basic() {
let builder = global_registry().get("dns").unwrap();
let target = &"dns:///localhost:1234".parse().unwrap();
let (work_scheduler, mut work_rx) = TestWorkScheduler::new_pair();
let (rt, waiter) = TrackedRuntime::new(TokioRuntime::default());
let opts = ResolverOptions {
authority: "ignored".to_string(),
runtime: default_runtime(),
runtime: GrpcRuntime::new(rt),
work_scheduler: work_scheduler.clone(),
};
let mut resolver = builder.build(target, opts);
Expand All @@ -192,6 +193,9 @@ pub(crate) async fn dns_basic() {
// A successful endpoint update should be received.
let update = update_rx.recv().await.unwrap();
assert!(update.endpoints.unwrap().len() > 1);

drop(resolver);
waiter.wait_for_tasks().await;
}

#[tokio::test]
Expand All @@ -200,9 +204,10 @@ pub(crate) async fn invalid_target() {
let builder = global_registry().get("dns").unwrap();
let target = &"dns:///:1234".parse().unwrap();
let (work_scheduler, mut work_rx) = TestWorkScheduler::new_pair();
let (rt, waiter) = TrackedRuntime::new(TokioRuntime::default());
let opts = ResolverOptions {
authority: "ignored".to_string(),
runtime: default_runtime(),
runtime: GrpcRuntime::new(rt),
work_scheduler: work_scheduler.clone(),
};
let mut resolver = builder.build(target, opts);
Expand All @@ -220,6 +225,9 @@ pub(crate) async fn invalid_target() {
.unwrap()
.contains(&target.to_string())
);

drop(resolver);
waiter.wait_for_tasks().await;
}

#[derive(Clone, Debug)]
Expand Down Expand Up @@ -277,16 +285,17 @@ pub(crate) async fn dns_lookup_error() {
let builder = global_registry().get("dns").unwrap();
let target = &"dns:///grpc.io:1234".parse().unwrap();
let (work_scheduler, mut work_rx) = TestWorkScheduler::new_pair();
let runtime = FakeRuntime {
let fake_rt = FakeRuntime {
inner: TokioRuntime::default(),
dns: FakeDns {
latency: Duration::from_secs(0),
lookup_result: Err("test_error".to_string()),
},
};
let (rt, waiter) = TrackedRuntime::new(fake_rt);
let opts = ResolverOptions {
authority: "ignored".to_string(),
runtime: GrpcRuntime::new(runtime),
runtime: GrpcRuntime::new(rt),
work_scheduler: work_scheduler.clone(),
};
let mut resolver = builder.build(target, opts);
Expand All @@ -298,22 +307,26 @@ pub(crate) async fn dns_lookup_error() {
// An error endpoint update should be received.
let update = update_rx.recv().await.unwrap();
assert!(update.endpoints.unwrap_err().contains("test_error"));

drop(resolver);
waiter.wait_for_tasks().await;
}

#[tokio::test]
pub(crate) async fn dns_lookup_timeout() {
let (work_scheduler, mut work_rx) = TestWorkScheduler::new_pair();
let runtime = FakeRuntime {
let fake_dns = FakeDns {
latency: Duration::from_secs(20),
lookup_result: Ok(Vec::new()),
};
let fake_rt = FakeRuntime {
inner: TokioRuntime::default(),
dns: FakeDns {
latency: Duration::from_secs(20),
lookup_result: Ok(Vec::new()),
},
dns: fake_dns.clone(),
};
let dns_client = runtime.dns.clone();
let (rt, waiter) = TrackedRuntime::new(fake_rt);
let opts = ResolverOptions {
authority: "ignored".to_string(),
runtime: GrpcRuntime::new(runtime),
runtime: GrpcRuntime::new(rt),
work_scheduler: work_scheduler.clone(),
};
let dns_opts = DnsOptions {
Expand All @@ -323,7 +336,7 @@ pub(crate) async fn dns_lookup_timeout() {
host: "grpc.io".to_string(),
port: 1234,
};
let mut resolver = DnsResolver::new(Box::new(dns_client), opts, dns_opts);
let mut resolver = DnsResolver::new(Box::new(fake_dns), opts, dns_opts);

// Wait for schedule work to be called.
work_rx.recv().await.unwrap();
Expand All @@ -333,14 +346,18 @@ pub(crate) async fn dns_lookup_timeout() {
// An error endpoint update should be received.
let update = update_rx.recv().await.unwrap();
assert!(update.endpoints.unwrap_err().contains("Timed out"));

drop(resolver);
waiter.wait_for_tasks().await;
}

#[tokio::test]
pub(crate) async fn rate_limit() {
let (work_scheduler, mut work_rx) = TestWorkScheduler::new_pair();
let (rt, waiter) = TrackedRuntime::new(TokioRuntime::default());
let opts = ResolverOptions {
authority: "ignored".to_string(),
runtime: default_runtime(),
runtime: GrpcRuntime::new(rt),
work_scheduler: work_scheduler.clone(),
};
let dns_client = opts
Expand Down Expand Up @@ -376,14 +393,18 @@ pub(crate) async fn rate_limit() {
}
};
}

drop(resolver);
waiter.wait_for_tasks().await;
}

#[tokio::test]
pub(crate) async fn re_resolution_after_success() {
let (work_scheduler, mut work_rx) = TestWorkScheduler::new_pair();
let (rt, waiter) = TrackedRuntime::new(TokioRuntime::default());
let opts = ResolverOptions {
authority: "ignored".to_string(),
runtime: default_runtime(),
runtime: GrpcRuntime::new(rt),
work_scheduler: work_scheduler.clone(),
};
let dns_opts = DnsOptions {
Expand Down Expand Up @@ -413,14 +434,18 @@ pub(crate) async fn re_resolution_after_success() {
resolver.work(&mut channel_controller);
let update = update_rx.recv().await.unwrap();
assert!(update.endpoints.unwrap().len() > 1);

drop(resolver);
waiter.wait_for_tasks().await;
}

#[tokio::test]
pub(crate) async fn backoff_on_error() {
let (work_scheduler, mut work_rx) = TestWorkScheduler::new_pair();
let (rt, waiter) = TrackedRuntime::new(TokioRuntime::default());
let opts = ResolverOptions {
authority: "ignored".to_string(),
runtime: default_runtime(),
runtime: GrpcRuntime::new(rt),
work_scheduler: work_scheduler.clone(),
};
let dns_opts = DnsOptions {
Expand Down Expand Up @@ -472,4 +497,6 @@ pub(crate) async fn backoff_on_error() {
println!("No event received from resolver.");
}
};
drop(resolver);
waiter.wait_for_tasks().await;
}
16 changes: 13 additions & 3 deletions grpc/src/client/transport/tonic/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ use crate::metadata::AsciiMetadataKey;
use crate::metadata::MetadataMap;
use crate::rt::GrpcRuntime;
use crate::rt::tokio::TokioRuntime;
use crate::rt::tracker::TrackedRuntime;

#[derive(Debug)]
struct MockCallCredentials {
Expand Down Expand Up @@ -156,10 +157,11 @@ pub(crate) async fn tonic_transport_rpc() {
authority: Authority::new("localhost".to_string(), None),
handshake_info: ClientHandshakeInfo::default(),
};
let (rt, waiter) = TrackedRuntime::new(TokioRuntime::default());
let (conn, _sec_info, mut disconnection_listener) = builder
.dyn_connect(
addr.to_string(),
GrpcRuntime::new(TokioRuntime::default()),
GrpcRuntime::new(rt),
&securty_opts,
&config,
)
Expand Down Expand Up @@ -224,6 +226,8 @@ pub(crate) async fn tonic_transport_rpc() {
.unwrap();
assert_eq!(res, Ok(()));
server_handle.await.unwrap();
drop(conn);
waiter.wait_for_tasks().await;
}

#[tokio::test]
Expand Down Expand Up @@ -673,10 +677,11 @@ async fn tonic_transport_invalid_base64_headers() {
authority: Authority::new("localhost".to_string(), None),
handshake_info: ClientHandshakeInfo::default(),
};
let (rt, waiter) = TrackedRuntime::new(TokioRuntime::default());
let (conn, _sec_info, _disconnection_listener) = builder
.dyn_connect(
addr.to_string(),
GrpcRuntime::new(TokioRuntime::default()),
GrpcRuntime::new(rt),
&securty_opts,
&config,
)
Expand Down Expand Up @@ -715,6 +720,8 @@ async fn tonic_transport_invalid_base64_headers() {

shutdown_notify.notify_one();
server_handle.await.unwrap();
drop(conn);
waiter.wait_for_tasks().await;
}

#[tokio::test]
Expand Down Expand Up @@ -748,10 +755,11 @@ async fn tonic_transport_recv_drop_cancels_send() {
authority: Authority::new("localhost".to_string(), None),
handshake_info: ClientHandshakeInfo::default(),
};
let (rt, waiter) = TrackedRuntime::new(TokioRuntime::default());
let (conn, _sec_info, _disconnection_listener) = builder
.dyn_connect(
addr.to_string(),
GrpcRuntime::new(TokioRuntime::default()),
GrpcRuntime::new(rt),
&securty_opts,
&config,
)
Expand Down Expand Up @@ -781,6 +789,8 @@ async fn tonic_transport_recv_drop_cancels_send() {

shutdown_notify.notify_one();
server_handle.await.unwrap();
drop(conn);
waiter.wait_for_tasks().await;
}

struct WrappedEchoRequest(EchoRequest);
Expand Down
2 changes: 2 additions & 0 deletions grpc/src/rt/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ use crate::private;
pub(crate) mod hyper_wrapper;
#[cfg(feature = "_runtime-tokio")]
pub(crate) mod tokio;
#[cfg(test)]
pub(crate) mod tracker;

pub type BoxFuture<T> = Pin<Box<dyn Future<Output = T> + Send>>;
pub type BoxedTaskHandle = Box<dyn TaskHandle>;
Expand Down
Loading
Loading