Skip to content
Open
4 changes: 3 additions & 1 deletion interop/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ http-body-util = "0.1"
prost = "0.14"
tokio = {version = "1.0", features = ["rt-multi-thread", "time", "macros"]}
tokio-stream = "0.1"
tonic = {path = "../tonic", features = ["tls-ring"]}
tonic = {path = "../tonic", features = ["tls-ring", "gzip"]}
tonic-prost = {path = "../tonic-prost"}
tower = "0.5"
tracing-subscriber = {version = "0.3"}
Expand All @@ -35,6 +35,8 @@ protobuf = { version = "4.34.0-release" }
tonic-protobuf = {path = "../tonic-protobuf"}
grpc-protobuf = {path = "../grpc-protobuf"}
rustls = { version = "0.23", default-features = false, features = ["ring"] }
base64 = "0.22"


[build-dependencies]
tonic-prost-build = {path = "../tonic-prost-build"}
Expand Down
157 changes: 133 additions & 24 deletions interop/src/bin/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,28 @@ use tonic::transport::Certificate;
use tonic::transport::ClientTlsConfig;
use tonic::transport::Endpoint;

#[allow(dead_code)]
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe add a comment which members and why are currently unused or apply #[allow(dead_code)] only to the members that are unused and make a general comment why they are currently unused

#[derive(Debug)]
struct Opts {
use_tls: bool,
test_case: Vec<Testcase>,
codec: Codec,
server_host: String,
server_port: u16,
server_host_override: Option<String>,
use_test_ca: bool,
default_service_account: Option<String>,
oauth_scope: Option<String>,
service_account_key_file: Option<String>,
service_config_json: Option<String>,
additional_metadata: Option<String>,
google_c2p_universe_domain: Option<String>,
soak_iterations: usize,
soak_max_failures: usize,
soak_per_iteration_max_acceptable_latency_ms: u32,
soak_overall_timeout_seconds: Option<u32>,
soak_min_time_ms_between_rpcs: u32,
soak_num_threads: usize,
}

#[derive(Debug)]
Expand Down Expand Up @@ -50,6 +67,35 @@ impl Opts {
test_case.split(',').map(Testcase::from_str).collect()
})?,
codec: pargs.value_from_str("--codec")?,
server_host: pargs
.opt_value_from_str("--server_host")?
.unwrap_or_else(|| "localhost".to_string()),
server_port: pargs.opt_value_from_str("--server_port")?.unwrap_or(10000),
server_host_override: pargs.opt_value_from_str("--server_host_override")?,
use_test_ca: match pargs.opt_value_from_str::<_, bool>("--use_test_ca") {
Ok(Some(val)) => val,
Ok(None) => true,
Err(_) => true,
},
default_service_account: pargs.opt_value_from_str("--default_service_account")?,
oauth_scope: pargs.opt_value_from_str("--oauth_scope")?,
service_account_key_file: pargs.opt_value_from_str("--service_account_key_file")?,
service_config_json: pargs.opt_value_from_str("--service_config_json")?,
additional_metadata: pargs.opt_value_from_str("--additional_metadata")?,
google_c2p_universe_domain: pargs.opt_value_from_str("--google_c2p_universe_domain")?,
soak_iterations: pargs.opt_value_from_str("--soak_iterations")?.unwrap_or(10),
soak_max_failures: pargs
.opt_value_from_str("--soak_max_failures")?
.unwrap_or(0),
soak_per_iteration_max_acceptable_latency_ms: pargs
.opt_value_from_str("--soak_per_iteration_max_acceptable_latency_ms")?
.unwrap_or(1000),
soak_overall_timeout_seconds: pargs
.opt_value_from_str("--soak_overall_timeout_seconds")?,
soak_min_time_ms_between_rpcs: pargs
.opt_value_from_str("--soak_min_time_ms_between_rpcs")?
.unwrap_or(0),
soak_num_threads: pargs.opt_value_from_str("--soak_num_threads")?.unwrap_or(1),
})
}
}
Expand All @@ -62,53 +108,102 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {

let test_cases = matches.test_case;

let additional_metadata = if let Some(ref am) = matches.additional_metadata {
Copy link
Copy Markdown

@mswierq mswierq May 14, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think Opts should keep Option<MetadataMap>. This parsing code could be implemented as a FromStr trait for MetadataMapWrapper, where MetadataMapWrapper wraps MetadataMap in order to satisfy the orphan rule.

let mut map = tonic::metadata::MetadataMap::new();
for pair in am.split(';') {
if pair.is_empty() {
continue;
}
if let Some(colon_idx) = pair.find(':') {
let (key_str, val_str) = pair.split_at(colon_idx);
let val_str = &val_str[1..]; // strip the leading colon
let key_str = key_str.trim();
let val_str = val_str.trim();

if key_str.ends_with("-bin") {
use base64::Engine;
let decoded_val = base64::engine::general_purpose::STANDARD.decode(val_str)?;
let key = tonic::metadata::BinaryMetadataKey::from_str(key_str)?;
let value = tonic::metadata::MetadataValue::from_bytes(&decoded_val);
map.insert_bin(key, value);
} else {
let key = tonic::metadata::AsciiMetadataKey::from_str(key_str)?;
let value = tonic::metadata::MetadataValue::try_from(val_str)?;
map.insert(key, value);
}
}
}
Some(map)
} else {
None
};

let (mut client, mut unimplemented_client): (
Box<dyn InteropTest>,
Box<dyn InteropTestUnimplemented>,
) = match matches.codec {
Codec::Prost => {
let scheme = if matches.use_tls { "https" } else { "http" };
let mut endpoint = Endpoint::try_from(format!("{scheme}://localhost:10000"))?
let host = &matches.server_host;
let port = matches.server_port;
let mut endpoint = Endpoint::try_from(format!("{scheme}://{host}:{port}"))?
.timeout(Duration::from_secs(5))
.concurrency_limit(30);

if matches.use_tls {
let pem = std::fs::read_to_string("interop/data/ca.pem")?;
let ca = Certificate::from_pem(pem);
endpoint = endpoint.tls_config(
ClientTlsConfig::new()
.ca_certificate(ca)
.domain_name("foo.test.google.fr"),
)?;
let mut tls_config = ClientTlsConfig::new();
if matches.use_test_ca {
let pem = std::fs::read_to_string("interop/data/ca.pem")?;
let ca = Certificate::from_pem(pem);
tls_config = tls_config.ca_certificate(ca);
}
let domain_name = matches
.server_host_override
.as_deref()
.unwrap_or("foo.test.google.fr");
tls_config = tls_config.domain_name(domain_name);
endpoint = endpoint.tls_config(tls_config)?;
}

let channel = endpoint.connect().await?;

let interceptor = interop::client::MetadataInterceptor {
metadata: additional_metadata.unwrap_or_default(),
};
(
Box::new(client_prost::TestClient::new(channel.clone())),
Box::new(client_prost::UnimplementedClient::new(channel)),
Box::new(client_prost::TestClient::new(
tonic::codegen::InterceptedService::new(channel.clone(), interceptor.clone()),
)),
Box::new(client_prost::UnimplementedClient::new(
tonic::codegen::InterceptedService::new(channel, interceptor),
)),
)
}
Codec::Protobuf => {
let host = &matches.server_host;
let port = matches.server_port;
let target_uri = format!("dns:///{host}:{port}");

let channel = if matches.use_tls {
let _ = rustls::crypto::ring::default_provider().install_default();

let pem = std::fs::read_to_string("interop/data/ca.pem")?;
let root_certs = RootCertificates::from_pem(pem);
let creds = RustlsChannelCredendials::new(
GrpcClientTlsConfig::new()
.with_root_certificates_provider(StaticProvider::new(root_certs)),
)?;
let channel_options =
ChannelOptions::default().override_authority("test.test.google.fr");
grpc::client::Channel::new(
"dns:///localhost:10000",
Arc::new(creds),
channel_options,
)
let mut tls_config = GrpcClientTlsConfig::new();
if matches.use_test_ca {
let pem = std::fs::read_to_string("interop/data/ca.pem")?;
let root_certs = RootCertificates::from_pem(pem);
tls_config =
tls_config.with_root_certificates_provider(StaticProvider::new(root_certs));
}
let creds = RustlsChannelCredendials::new(tls_config)?;
let domain_name = matches
.server_host_override
.as_deref()
.unwrap_or("test.test.google.fr");
let channel_options = ChannelOptions::default().override_authority(domain_name);
grpc::client::Channel::new(&target_uri, Arc::new(creds), channel_options)
} else {
grpc::client::Channel::new(
"dns:///localhost:10000",
&target_uri,
Arc::new(LocalChannelCredentials::new()),
ChannelOptions::default(),
)
Expand All @@ -129,6 +224,13 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {

match test_case {
Testcase::EmptyUnary => client.empty_unary(&mut test_results).await,
Testcase::CacheableUnary => client.cacheable_unary(&mut test_results).await,
Testcase::ClientCompressedUnary => {
client.client_compressed_unary(&mut test_results).await
}
Testcase::ServerCompressedUnary => {
client.server_compressed_unary(&mut test_results).await
}
Testcase::LargeUnary => client.large_unary(&mut test_results).await,
Testcase::ClientStreaming => client.client_streaming(&mut test_results).await,
Testcase::ServerStreaming => client.server_streaming(&mut test_results).await,
Expand All @@ -147,6 +249,13 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
.await
}
Testcase::CustomMetadata => client.custom_metadata(&mut test_results).await,
Testcase::CancelAfterBegin => client.cancel_after_begin(&mut test_results).await,
Testcase::CancelAfterFirstResponse => {
client.cancel_after_first_response(&mut test_results).await
}
Testcase::TimeoutOnSleepingServer => {
client.timeout_on_sleeping_server(&mut test_results).await
}
_ => unimplemented!(),
}

Expand Down
37 changes: 35 additions & 2 deletions interop/src/bin/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ use tonic::transport::{Identity, ServerTlsConfig};
struct Opts {
use_tls: bool,
codec: Codec,
port: u16,
address_type: AddressType,
}

#[derive(Debug)]
Expand All @@ -27,12 +29,36 @@ impl FromStr for Codec {
}
}

#[derive(Debug, Clone, Copy)]
enum AddressType {
Ipv4,
Ipv6,
Ipv4Ipv6,
}

impl FromStr for AddressType {
type Err = String;

fn from_str(s: &str) -> Result<Self, Self::Err> {
match s.to_uppercase().as_str() {
"IPV4" => Ok(AddressType::Ipv4),
"IPV6" => Ok(AddressType::Ipv6),
"IPV4_IPV6" => Ok(AddressType::Ipv4Ipv6),
_ => Err(format!("Invalid address type: {}", s)),
}
}
}

impl Opts {
fn parse() -> Result<Self, pico_args::Error> {
let mut pargs = pico_args::Arguments::from_env();
Ok(Self {
use_tls: pargs.contains("--use_tls"),
codec: pargs.value_from_str("--codec")?,
port: pargs.opt_value_from_str("--port")?.unwrap_or(10000),
address_type: pargs
.opt_value_from_str("--address_type")?
.unwrap_or(AddressType::Ipv4Ipv6),
})
}
}
Expand All @@ -43,7 +69,12 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {

let matches = Opts::parse()?;

let addr = "127.0.0.1:10000".parse().unwrap();
let host = match matches.address_type {
AddressType::Ipv4 => "127.0.0.1",
AddressType::Ipv6 => "[::1]",
AddressType::Ipv4Ipv6 => "[::]",
};
let addr = format!("{host}:{}", matches.port).parse().unwrap();

let mut builder = Server::builder();

Expand All @@ -58,7 +89,9 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
match matches.codec {
Codec::Prost => {
let test_service =
server_prost::TestServiceServer::new(server_prost::TestService::default());
server_prost::TestServiceServer::new(server_prost::TestService::default())
.accept_compressed(tonic::codec::CompressionEncoding::Gzip)
.send_compressed(tonic::codec::CompressionEncoding::Gzip);
let unimplemented_service = server_prost::UnimplementedServiceServer::new(
server_prost::UnimplementedService::default(),
);
Expand Down
36 changes: 36 additions & 0 deletions interop/src/client.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,30 @@
use crate::TestAssertion;
use tonic::async_trait;

#[derive(Clone)]
pub struct MetadataInterceptor {
pub metadata: tonic::metadata::MetadataMap,
}

impl tonic::service::Interceptor for MetadataInterceptor {
fn call(
&mut self,
mut request: tonic::Request<()>,
) -> Result<tonic::Request<()>, tonic::Status> {
for key_and_val in self.metadata.iter() {
match key_and_val {
tonic::metadata::KeyAndValueRef::Ascii(key, val) => {
request.metadata_mut().insert(key.clone(), val.clone());
}
tonic::metadata::KeyAndValueRef::Binary(key, val) => {
request.metadata_mut().insert_bin(key.clone(), val.clone());
}
}
}
Ok(request)
}
}

#[async_trait]
pub trait InteropTest: Send {
async fn empty_unary(&mut self, assertions: &mut Vec<TestAssertion>);
Expand All @@ -22,6 +46,18 @@ pub trait InteropTest: Send {
async fn unimplemented_method(&mut self, assertions: &mut Vec<TestAssertion>);

async fn custom_metadata(&mut self, assertions: &mut Vec<TestAssertion>);

async fn cacheable_unary(&mut self, assertions: &mut Vec<TestAssertion>);

async fn client_compressed_unary(&mut self, assertions: &mut Vec<TestAssertion>);

async fn server_compressed_unary(&mut self, assertions: &mut Vec<TestAssertion>);

async fn cancel_after_begin(&mut self, assertions: &mut Vec<TestAssertion>);

async fn cancel_after_first_response(&mut self, assertions: &mut Vec<TestAssertion>);

async fn timeout_on_sleeping_server(&mut self, assertions: &mut Vec<TestAssertion>);
}

#[async_trait]
Expand Down
Loading
Loading