Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions src/backend/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,34 @@ impl<'a> Drop for FrontConnectionGuard<'a> {
metrics::front_conn_close(self.cluster);
}
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::metrics;

    /// Two freshly minted ids must come out in strictly increasing order.
    #[test]
    fn client_ids_are_monotonic() {
        let first = ClientId::new();
        let second = ClientId::new();
        assert!(first.as_u64() < second.as_u64());
    }

    /// The guard bumps the per-cluster counters on creation and releases
    /// the "current" gauge again when it is dropped.
    #[test]
    fn front_connection_guard_updates_metrics() {
        let cluster = "guard-cluster";
        let current_before = metrics::front_connections_current(cluster);
        let total_before = metrics::front_connections_total(cluster);

        {
            let _guard = FrontConnectionGuard::new(cluster);
            let current_during = metrics::front_connections_current(cluster);
            assert_eq!(current_during, current_before + 1);
        }

        assert_eq!(metrics::front_connections_current(cluster), current_before);
        assert_eq!(metrics::front_connections_total(cluster), total_before + 1);
    }
}
72 changes: 72 additions & 0 deletions src/backend/executor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
use std::sync::Arc;

use anyhow::{anyhow, Result};
use async_trait::async_trait;

use crate::backend::client::ClientId;
use crate::backend::pool::{BackendNode, ConnectionPool};

use super::pool::BackendRequest;

/// Abstraction over backend request execution so it can be mocked in tests.
///
/// Implementors route a request to a specific [`BackendNode`] and return the
/// request's typed response. The pool-backed default implementation is
/// `PoolBackendExecutor` in this module.
#[async_trait]
pub trait BackendExecutor<T>: Send + Sync
where
    T: BackendRequest,
{
    /// Dispatch a non-blocking request for the given client.
    ///
    /// Returns an error if dispatching fails or if the backend session
    /// closes before a response is produced.
    async fn dispatch(
        &self,
        node: BackendNode,
        client_id: ClientId,
        request: T,
    ) -> Result<T::Response>;

    /// Dispatch a request that requires an exclusive backend connection.
    ///
    /// Unlike [`BackendExecutor::dispatch`], no `ClientId` is threaded
    /// through: the connection is held exclusively for this one request.
    async fn dispatch_blocking(&self, node: BackendNode, request: T) -> Result<T::Response>;
}

/// Default executor that proxies calls through the actual connection pool.
pub struct PoolBackendExecutor<T: BackendRequest> {
    // Shared handle to the pool that owns the backend sessions.
    pool: Arc<ConnectionPool<T>>,
}

impl<T: BackendRequest> PoolBackendExecutor<T> {
    /// Wrap an existing connection pool in an executor.
    pub fn new(pool: Arc<ConnectionPool<T>>) -> Self {
        PoolBackendExecutor { pool }
    }

    /// Borrow the underlying pool handle.
    pub fn pool(&self) -> &Arc<ConnectionPool<T>> {
        &self.pool
    }
}

#[async_trait]
impl<T> BackendExecutor<T> for PoolBackendExecutor<T>
where
    T: BackendRequest,
{
    /// Forward the request to the shared pool and await the oneshot reply.
    async fn dispatch(
        &self,
        node: BackendNode,
        client_id: ClientId,
        request: T,
    ) -> Result<T::Response> {
        let reply = self.pool.dispatch(node, client_id, request).await?;
        // A dropped sender means the backend session died before answering.
        reply
            .await
            .map_err(|_| anyhow!("backend session closed unexpectedly"))?
    }

    /// Acquire an exclusive connection, run the request on it, release it.
    async fn dispatch_blocking(&self, node: BackendNode, request: T) -> Result<T::Response> {
        let mut conn = self.pool.acquire_exclusive(&node);
        let reply = conn.send(request).await?;
        let outcome = reply.await;
        // Release the exclusive connection as soon as the response is in
        // hand, before unpacking the result.
        drop(conn);
        outcome.map_err(|_| anyhow!("backend session closed unexpectedly"))?
    }
}
1 change: 1 addition & 0 deletions src/backend/mod.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
pub mod client; // Client ids and front-connection metric guards.
pub mod executor; // `BackendExecutor` trait plus the pool-backed default.
pub mod pool; // Backend connection pooling and request dispatch.
129 changes: 129 additions & 0 deletions src/backend/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -345,3 +345,132 @@ impl<'a, T: BackendRequest> Drop for ExclusiveConnection<'a, T> {
}
}
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::{Arc, Mutex};

    /// Connector stub that echoes each request's payload back as a
    /// successful response and counts how many sessions were started.
    #[derive(Default)]
    struct TestConnector {
        started: AtomicUsize,
    }

    #[async_trait]
    impl Connector<TestRequest> for TestConnector {
        async fn run_session(
            self: Arc<Self>,
            _node: BackendNode,
            _cluster: Arc<str>,
            mut rx: mpsc::Receiver<SessionCommand<TestRequest>>,
        ) {
            self.started.fetch_add(1, Ordering::SeqCst);
            // Answer every command with its own payload until the channel closes.
            while let Some(cmd) = rx.recv().await {
                let _ = cmd.respond_to.send(Ok(cmd.request.payload));
            }
        }
    }

    impl TestConnector {
        /// Number of sessions the pool has spawned so far.
        fn started(&self) -> usize {
            self.started.load(Ordering::SeqCst)
        }
    }

    /// Thread-safe log of strings, used to observe tracker callbacks.
    #[derive(Clone)]
    struct CallRecorder {
        values: Arc<Mutex<Vec<String>>>,
    }

    impl CallRecorder {
        fn new() -> Self {
            Self {
                values: Arc::new(Mutex::new(Vec::new())),
            }
        }

        fn record(&self, value: &str) {
            self.values.lock().unwrap().push(value.to_string());
        }

        fn entries(&self) -> Vec<String> {
            self.values.lock().unwrap().clone()
        }
    }

    /// Minimal request type: a static payload plus recorders that capture
    /// which cluster name each tracker hook was invoked with.
    #[derive(Clone)]
    struct TestRequest {
        payload: &'static str,
        total: CallRecorder,
        remote: CallRecorder,
    }

    impl TestRequest {
        fn new(payload: &'static str, total: CallRecorder, remote: CallRecorder) -> Self {
            Self {
                payload,
                total,
                remote,
            }
        }
    }

    impl BackendRequest for TestRequest {
        type Response = &'static str;

        fn apply_total_tracker(&mut self, cluster: &str) {
            self.total.record(cluster);
        }

        fn apply_remote_tracker(&mut self, cluster: &str) {
            self.remote.record(cluster);
        }
    }

    fn cluster_name() -> Arc<str> {
        // `Arc<str>` converts straight from a `&str`; the previous
        // `.to_string()` round-trip allocated an intermediate `String`
        // for nothing (clippy: `unnecessary_to_owned`).
        Arc::from("cluster-backend-tests")
    }

    fn backend_node() -> BackendNode {
        BackendNode::new("127.0.0.1:7000".into())
    }

    #[tokio::test(flavor = "current_thread")]
    async fn dispatch_sends_request_and_tracks_cluster() {
        let connector = Arc::new(TestConnector::default());
        let total = CallRecorder::new();
        let remote = CallRecorder::new();
        let request = TestRequest::new("ok", total.clone(), remote.clone());
        let pool = ConnectionPool::with_slots(cluster_name(), connector.clone(), 2);
        let node = backend_node();
        let rx = pool
            .dispatch(node.clone(), ClientId::new(), request)
            .await
            .expect("dispatch");
        let response = rx.await.expect("oneshot").expect("response");
        assert_eq!(response, "ok");
        // Exactly one session should have been spun up for the request,
        // and both trackers should have seen the pool's cluster name.
        assert_eq!(connector.started(), 1);
        assert_eq!(total.entries(), vec!["cluster-backend-tests".to_string()]);
        assert_eq!(remote.entries(), vec!["cluster-backend-tests".to_string()]);
    }

    #[tokio::test(flavor = "current_thread")]
    async fn exclusive_connections_are_reused() {
        let connector = Arc::new(TestConnector::default());
        let pool = ConnectionPool::<TestRequest>::with_slots(cluster_name(), connector.clone(), 1);
        let node = backend_node();

        {
            let _conn = pool.acquire_exclusive(&node);
            tokio::task::yield_now().await;
            assert_eq!(connector.started(), 1);
        }

        // Acquiring again must reuse the existing session rather than
        // starting a second one.
        {
            let _conn = pool.acquire_exclusive(&node);
            tokio::task::yield_now().await;
            assert_eq!(connector.started(), 1);
        }
    }
}
Loading