diff --git a/grpc/src/client/channel.rs b/grpc/src/client/channel.rs index 6a49c26d5..7fbb21ade 100644 --- a/grpc/src/client/channel.rs +++ b/grpc/src/client/channel.rs @@ -28,9 +28,7 @@ use std::mem; use std::str::FromStr; use std::sync::Arc; use std::sync::Mutex; -use std::time::Duration; use std::time::Instant; -use std::vec; use serde_json::json; use tokio::sync::mpsc; @@ -39,7 +37,6 @@ use url::Url; // NOTE: http::Uri requires non-empty authority portion of URI use crate::StatusCodeError; use crate::StatusError; -use crate::attributes::Attributes; use crate::client::CallOptions; use crate::client::ConnectivityState; use crate::client::DynInvoke; @@ -81,75 +78,13 @@ use crate::core::RequestHeaders; use crate::credentials::ChannelCredentials; use crate::credentials::client::ClientHandshakeInfo; use crate::credentials::common::Authority; -use crate::credentials::dyn_wrapper::DynChannelCredentials; use crate::rt; use crate::rt::GrpcEndpoint; use crate::rt::GrpcRuntime; use crate::rt::default_runtime; -#[non_exhaustive] -pub struct ChannelOptions { - pub transport_options: Attributes, // ? - pub channel_authority: Option, - pub connection_backoff: Option, - pub default_service_config: Option, - pub disable_proxy: bool, - pub disable_service_config_lookup: bool, - pub disable_health_checks: bool, - pub max_retry_memory: u32, // ? - pub idle_timeout: Duration, - // TODO: pub transport_registry: Option, - // TODO: pub name_resolver_registry: Option, - // TODO: pub lb_policy_registry: Option, - - // Typically we allow settings at the channel level that impact all RPCs, - // but can also be set per-RPC. E.g.s: - // - // - interceptors - // - user-agent string override - // - max message sizes - // - max retry/hedged attempts - // - disable retry - // - // In gRPC-Go, we can express CallOptions as DialOptions, which is a nice - // pattern: https://pkg.go.dev/google.golang.org/grpc#WithDefaultCallOptions - // - // To do this in rust, all optional behavior for a request would need to be - // expressed through a trait that applies a mutation to a request. We'd - // apply all those mutations before the user's options so the user's options - // would override the defaults, or so the defaults would occur first. - pub default_request_extensions: Vec>, // ?? -} - -impl Default for ChannelOptions { - fn default() -> Self { - Self { - transport_options: Attributes::default(), - channel_authority: None, - connection_backoff: None, - default_service_config: None, - disable_proxy: false, - disable_service_config_lookup: false, - disable_health_checks: false, - max_retry_memory: 8 * 1024 * 1024, // 8MB -- ??? - idle_timeout: Duration::from_secs(30 * 60), - default_request_extensions: vec![], - } - } -} - -impl ChannelOptions { - pub fn transport_options(self, transport_options: TODO) -> Self { - todo!(); // add to existing options. - } - pub fn override_authority(self, authority: impl Into) -> Self { - Self { - channel_authority: Some(authority.into()), - ..self - } - } - // etc -} +pub struct MissingOption; +pub struct PresentOption(pub T); // All of Channel needs to be thread-safe. Arc? Or give out // Arc from constructor? @@ -159,35 +94,16 @@ pub struct Channel { } impl Channel { - /// Constructs a new gRPC channel. A gRPC channel is a virtual, persistent - /// connection to a service. Channel creation cannot fail, but if the - /// target string is invalid, the returned channel will never connect, and - /// will fail all RPCs. - // TODO: should this return a Result instead? - pub fn new(target: &str, credentials: Arc, options: ChannelOptions) -> Self - where - C: ChannelCredentials, - C::Output>: GrpcEndpoint + 'static, - { - pick_first::reg(); - round_robin::reg(); - dns::reg(); - #[cfg(unix)] - name_resolution::unix::reg(); - #[cfg(target_os = "linux")] - name_resolution::unix_abstract::reg(); - #[cfg(feature = "_runtime-tokio")] - tonic_transport::reg(); - Self { - inner: Arc::new(PersistentChannel::new( - target, - default_runtime(), - options, - credentials as Arc, - )), + pub fn builder( + target: String, + ) -> ChannelBuilder { + ChannelBuilder { + target: target.into(), + credentials: MissingOption, + runtime: MissingOption, + default_service_config: None, } } - // TODO: enter_idle(&self) and graceful_stop()? /// Returns the current state of the channel. If there is no underlying active channel, @@ -221,36 +137,106 @@ impl Invoke for Channel { } } +pub struct ChannelBuilder { + // Required builder parameters: + target: String, + credentials: CrState, + runtime: RtState, // Optional if the Tokio flag is present to default to. + // Optional builder parameters with defaults: + default_service_config: Option, +} + +impl ChannelBuilder { + pub fn with_credentials( + self, + credentials: impl Into>, + ) -> ChannelBuilder>, RtState> { + ChannelBuilder { + target: self.target, + credentials: PresentOption(credentials.into()), + runtime: self.runtime, + default_service_config: self.default_service_config, + } + } + + pub fn with_runtime(self, runtime: R) -> ChannelBuilder> { + ChannelBuilder { + target: self.target, + credentials: self.credentials, + runtime: PresentOption(runtime), + default_service_config: self.default_service_config, + } + } +} + +/// If the Tokio runtime feature is enabled, the channel builder can be built +/// without explicitly providing a runtime, defaulting to the Tokio runtime. +/// This does not prevent a user from providing their own runtime if they wish, +/// and the builder will work as normal. +#[cfg(feature = "_runtime-tokio")] +impl ChannelBuilder>, MissingOption> +where + C: ChannelCredentials + 'static, + C::Output>: GrpcEndpoint + 'static, +{ + pub fn build(self) -> Channel { + self.with_runtime(default_runtime()).build() + } +} + +impl ChannelBuilder>, PresentOption> +where + C: ChannelCredentials + 'static, + C::Output>: GrpcEndpoint + 'static, +{ + // Returns a Channel with the provided parameters. + // This will not fail, but if the target is invalid, the returned channel + // will never connect and all RPCs will fail. Further, even if properly + // configured the channel will begin in IDLE. + pub fn build(self) -> Channel { + init_registers(); + let persistent_channel = PersistentChannel::new( + &self.target, + self.runtime.0, + self.credentials.0, + None, // channel authority + ); + + Channel { + inner: Arc::new(persistent_channel), + } + } +} + // A PersistentChannel represents the static configuration of a channel and an // optional Arc of an ActiveChannel. An ActiveChannel exists whenever the // PersistentChannel is not IDLE. Every channel is IDLE at creation, or after // some configurable timeout elapses without any any RPC activity. struct PersistentChannel { - target: Target, - resolver_builder: Arc, - options: ChannelOptions, active_channel: Mutex>>, - runtime: GrpcRuntime, + target: Target, security_opts: SecurityOpts, + runtime: GrpcRuntime, + resolver_builder: Arc, authority: String, } impl PersistentChannel { - // Channels begin idle so `new()` does not automatically connect. - // ChannelOption contain only optional parameters. - fn new( + fn new( target: &str, runtime: GrpcRuntime, - options: ChannelOptions, - credentials: Arc, - ) -> Self { - // TODO(arjan-bal): Return errors here instead of panicking. - let target = Url::from_str(target).unwrap(); + credentials: Arc, + channel_authority: Option, + ) -> Self + where + C: ChannelCredentials, + C::Output>: GrpcEndpoint + 'static, + { + // TODO(nford) The entire structure of PersistentChannel and the initialization will be revisited with 'channel internals' design. + let target = Url::from_str(target).unwrap(); // TODO(nford): Return always-failing channel instead of panicking. let resolver_builder = global_registry().get(target.scheme()).unwrap(); let target = name_resolution::Target::from(target); - let authority = options - .channel_authority - .clone() + let authority = channel_authority .unwrap_or_else(|| resolver_builder.default_authority(&target).to_owned()); let security_opts = SecurityOpts { credentials, @@ -259,20 +245,19 @@ impl PersistentChannel { }; Self { - target, - resolver_builder, + target: name_resolution::Target::from(target), + resolver_builder: resolver_builder, active_channel: Mutex::default(), - options, - runtime, - security_opts, - authority, + runtime: runtime, + security_opts: security_opts, + authority: authority, } } /// Returns the current state of the channel. If there is no underlying active channel, /// returns Idle. If `connect` is true, will create a new active channel iff none exists. fn state(&self, connect: bool) -> ConnectivityState { - // Done this away to avoid potentially locking twice. + // Done this way to avoid potentially locking twice. let active_channel = if connect { self.get_active_channel() } else { @@ -630,6 +615,20 @@ fn parse_authority(host_and_port: &str) -> Authority { Authority::new(host_and_port.to_string(), None) } +// This registers the default implementations of load balancers, name resolvers, +// and transports. +fn init_registers() { + pick_first::reg(); + round_robin::reg(); + dns::reg(); + #[cfg(unix)] + name_resolution::unix::reg(); + #[cfg(target_os = "linux")] + name_resolution::unix_abstract::reg(); + #[cfg(feature = "_runtime-tokio")] + tonic_transport::reg(); +} + #[cfg(test)] mod tests { use super::*; diff --git a/grpc/src/client/mod.rs b/grpc/src/client/mod.rs index b8ee56c75..d537104b5 100644 --- a/grpc/src/client/mod.rs +++ b/grpc/src/client/mod.rs @@ -39,7 +39,6 @@ pub mod service_config; pub mod stream_util; pub use channel::Channel; -pub use channel::ChannelOptions; pub(crate) mod load_balancing; pub(crate) mod name_resolution;