Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
231 changes: 115 additions & 116 deletions grpc/src/client/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,7 @@ use std::mem;
use std::str::FromStr;
use std::sync::Arc;
use std::sync::Mutex;
use std::time::Duration;
use std::time::Instant;
use std::vec;

use serde_json::json;
use tokio::sync::mpsc;
Expand All @@ -39,7 +37,6 @@ use url::Url; // NOTE: http::Uri requires non-empty authority portion of URI

use crate::StatusCodeError;
use crate::StatusError;
use crate::attributes::Attributes;
use crate::client::CallOptions;
use crate::client::ConnectivityState;
use crate::client::DynInvoke;
Expand Down Expand Up @@ -81,75 +78,13 @@ use crate::core::RequestHeaders;
use crate::credentials::ChannelCredentials;
use crate::credentials::client::ClientHandshakeInfo;
use crate::credentials::common::Authority;
use crate::credentials::dyn_wrapper::DynChannelCredentials;
use crate::rt;
use crate::rt::GrpcEndpoint;
use crate::rt::GrpcRuntime;
use crate::rt::default_runtime;

#[non_exhaustive]
pub struct ChannelOptions {
pub transport_options: Attributes, // ?
pub channel_authority: Option<String>,
pub connection_backoff: Option<TODO>,
pub default_service_config: Option<String>,
pub disable_proxy: bool,
pub disable_service_config_lookup: bool,
pub disable_health_checks: bool,
pub max_retry_memory: u32, // ?
pub idle_timeout: Duration,
// TODO: pub transport_registry: Option<TransportRegistry>,
// TODO: pub name_resolver_registry: Option<ResolverRegistry>,
// TODO: pub lb_policy_registry: Option<LbPolicyRegistry>,

// Typically we allow settings at the channel level that impact all RPCs,
// but can also be set per-RPC. E.g.s:
//
// - interceptors
// - user-agent string override
// - max message sizes
// - max retry/hedged attempts
// - disable retry
//
// In gRPC-Go, we can express CallOptions as DialOptions, which is a nice
// pattern: https://pkg.go.dev/google.golang.org/grpc#WithDefaultCallOptions
//
// To do this in rust, all optional behavior for a request would need to be
// expressed through a trait that applies a mutation to a request. We'd
// apply all those mutations before the user's options so the user's options
// would override the defaults, or so the defaults would occur first.
pub default_request_extensions: Vec<Box<TODO>>, // ??
}

impl Default for ChannelOptions {
fn default() -> Self {
Self {
transport_options: Attributes::default(),
channel_authority: None,
connection_backoff: None,
default_service_config: None,
disable_proxy: false,
disable_service_config_lookup: false,
disable_health_checks: false,
max_retry_memory: 8 * 1024 * 1024, // 8MB -- ???
idle_timeout: Duration::from_secs(30 * 60),
default_request_extensions: vec![],
}
}
}

impl ChannelOptions {
pub fn transport_options(self, transport_options: TODO) -> Self {
todo!(); // add to existing options.
}
pub fn override_authority(self, authority: impl Into<String>) -> Self {
Self {
channel_authority: Some(authority.into()),
..self
}
}
// etc
}
pub struct MissingOption;
pub struct PresentOption<T>(pub T);

// All of Channel needs to be thread-safe. Arc<inner>? Or give out
// Arc<Channel> from constructor?
Expand All @@ -159,35 +94,16 @@ pub struct Channel {
}

impl Channel {
/// Constructs a new gRPC channel. A gRPC channel is a virtual, persistent
/// connection to a service. Channel creation cannot fail, but if the
/// target string is invalid, the returned channel will never connect, and
/// will fail all RPCs.
// TODO: should this return a Result instead?
pub fn new<C>(target: &str, credentials: Arc<C>, options: ChannelOptions) -> Self
where
C: ChannelCredentials,
C::Output<Box<dyn GrpcEndpoint>>: GrpcEndpoint + 'static,
{
pick_first::reg();
round_robin::reg();
dns::reg();
#[cfg(unix)]
name_resolution::unix::reg();
#[cfg(target_os = "linux")]
name_resolution::unix_abstract::reg();
#[cfg(feature = "_runtime-tokio")]
tonic_transport::reg();
Self {
inner: Arc::new(PersistentChannel::new(
target,
default_runtime(),
options,
credentials as Arc<dyn DynChannelCredentials>,
)),
pub fn builder<CrState, RtState>(
target: String,
) -> ChannelBuilder<MissingOption, MissingOption> {
ChannelBuilder {
target: target.into(),
credentials: MissingOption,
runtime: MissingOption,
default_service_config: None,
}
}

// TODO: enter_idle(&self) and graceful_stop()?

/// Returns the current state of the channel. If there is no underlying active channel,
Expand Down Expand Up @@ -221,36 +137,106 @@ impl Invoke for Channel {
}
}

pub struct ChannelBuilder<CrState, RtState> {
// Required builder parameters:
target: String,
credentials: CrState,
runtime: RtState, // Optional if the Tokio flag is present to default to.
// Optional builder parameters with defaults:
default_service_config: Option<String>,
}

impl<CrState, RtState> ChannelBuilder<CrState, RtState> {
pub fn with_credentials<C>(
self,
credentials: impl Into<Arc<C>>,
) -> ChannelBuilder<PresentOption<Arc<C>>, RtState> {
ChannelBuilder {
target: self.target,
credentials: PresentOption(credentials.into()),
runtime: self.runtime,
default_service_config: self.default_service_config,
}
}

pub fn with_runtime<R>(self, runtime: R) -> ChannelBuilder<CrState, PresentOption<R>> {
ChannelBuilder {
target: self.target,
credentials: self.credentials,
runtime: PresentOption(runtime),
default_service_config: self.default_service_config,
}
}
}

/// If the Tokio runtime feature is enabled, the channel builder can be built
/// without explicitly providing a runtime, defaulting to the Tokio runtime.
/// This does not prevent a user from providing their own runtime if they wish,
/// and the builder will work as normal.
#[cfg(feature = "_runtime-tokio")]
impl<C> ChannelBuilder<PresentOption<Arc<C>>, MissingOption>
where
C: ChannelCredentials + 'static,
C::Output<Box<dyn GrpcEndpoint>>: GrpcEndpoint + 'static,
{
pub fn build(self) -> Channel {
self.with_runtime(default_runtime()).build()
}
}

impl<C> ChannelBuilder<PresentOption<Arc<C>>, PresentOption<GrpcRuntime>>
where
C: ChannelCredentials + 'static,
C::Output<Box<dyn GrpcEndpoint>>: GrpcEndpoint + 'static,
{
// Returns a Channel with the provided parameters.
// This will not fail, but if the target is invalid, the returned channel
// will never connect and all RPCs will fail. Further, even if properly
// configured the channel will begin in IDLE.
pub fn build(self) -> Channel {
init_registers();
let persistent_channel = PersistentChannel::new(
&self.target,
self.runtime.0,
self.credentials.0,
None, // channel authority
);

Channel {
inner: Arc::new(persistent_channel),
}
}
}

// A PersistentChannel represents the static configuration of a channel and an
// optional Arc of an ActiveChannel. An ActiveChannel exists whenever the
// PersistentChannel is not IDLE. Every channel is IDLE at creation, or after
// some configurable timeout elapses without any any RPC activity.
struct PersistentChannel {
target: Target,
resolver_builder: Arc<dyn ResolverBuilder>,
options: ChannelOptions,
active_channel: Mutex<Option<Arc<ActiveChannel>>>,
runtime: GrpcRuntime,
target: Target,
security_opts: SecurityOpts,
runtime: GrpcRuntime,
resolver_builder: Arc<dyn ResolverBuilder>,
authority: String,
}

impl PersistentChannel {
// Channels begin idle so `new()` does not automatically connect.
// ChannelOption contain only optional parameters.
fn new(
fn new<C>(
target: &str,
runtime: GrpcRuntime,
options: ChannelOptions,
credentials: Arc<dyn DynChannelCredentials>,
) -> Self {
// TODO(arjan-bal): Return errors here instead of panicking.
let target = Url::from_str(target).unwrap();
credentials: Arc<C>,
channel_authority: Option<String>,
) -> Self
where
C: ChannelCredentials,
C::Output<Box<dyn GrpcEndpoint>>: GrpcEndpoint + 'static,
{
// TODO(nford) The entire structure of PersistentChannel and the initialization will be revisited with 'channel internals' design.
let target = Url::from_str(target).unwrap(); // TODO(nford): Return always-failing channel instead of panicking.
let resolver_builder = global_registry().get(target.scheme()).unwrap();
let target = name_resolution::Target::from(target);
let authority = options
.channel_authority
.clone()
let authority = channel_authority
.unwrap_or_else(|| resolver_builder.default_authority(&target).to_owned());
let security_opts = SecurityOpts {
credentials,
Expand All @@ -259,20 +245,19 @@ impl PersistentChannel {
};

Self {
target,
resolver_builder,
target: name_resolution::Target::from(target),
resolver_builder: resolver_builder,
active_channel: Mutex::default(),
options,
runtime,
security_opts,
authority,
runtime: runtime,
security_opts: security_opts,
authority: authority,
}
}

/// Returns the current state of the channel. If there is no underlying active channel,
/// returns Idle. If `connect` is true, will create a new active channel iff none exists.
fn state(&self, connect: bool) -> ConnectivityState {
// Done this away to avoid potentially locking twice.
// Done this way to avoid potentially locking twice.
let active_channel = if connect {
self.get_active_channel()
} else {
Expand Down Expand Up @@ -630,6 +615,20 @@ fn parse_authority(host_and_port: &str) -> Authority {
Authority::new(host_and_port.to_string(), None)
}

// This registers the default implementations of load balancers, name resolvers,
// and transports.
fn init_registers() {
pick_first::reg();
round_robin::reg();
dns::reg();
#[cfg(unix)]
name_resolution::unix::reg();
#[cfg(target_os = "linux")]
name_resolution::unix_abstract::reg();
#[cfg(feature = "_runtime-tokio")]
tonic_transport::reg();
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
1 change: 0 additions & 1 deletion grpc/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ pub mod service_config;
pub mod stream_util;

pub use channel::Channel;
pub use channel::ChannelOptions;

pub(crate) mod load_balancing;
pub(crate) mod name_resolution;
Expand Down