Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions .env.sample
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
# Location of the *postgres* database. For example, if you have created a
# blank database locally named `cargo_registry`, this would be
# `postgres://postgres@localhost/cargo_registry`.
export DATABASE_URL=
# `postgres://db_username:db_password@localhost/cargo_registry`.
# On Unix systems, a shorthand of `postgres:///cargo_registry` can be used
# to connect via a local Unix socket, with a db_username equal to your Unix
# account name and does not require a password.
export DATABASE_URL=postgres:///cargo_registry

# Allowed origins - any origins for which you want to allow browser
# access to authenticated endpoints.
Expand All @@ -18,9 +21,8 @@ export SESSION_KEY=badkeyabcdefghijklmnopqrstuvwxyzabcdef
# If you will be running the tests, set this to another database that you
# have created. For example, if your test database is named
# `cargo_registry_test`, this would look something like
# `postgres://postgres@localhost/cargo_registry_test`
# If you don't plan on running the tests, you can leave this blank.
export TEST_DATABASE_URL=
# `postgres://db_username:db_password/cargo_registry_test`
export TEST_DATABASE_URL=postgres:///cargo_registry_test

# Credentials for AWS.
# export AWS_ACCESS_KEY=
Expand Down
7 changes: 6 additions & 1 deletion crates/crates_io_test_db/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,12 @@ impl TemplateDatabase {

#[instrument]
fn new() -> Self {
let base_url: Url = required_var_parsed("TEST_DATABASE_URL").unwrap();
let mut base_url: Url = required_var_parsed("TEST_DATABASE_URL").unwrap();

if base_url.host().is_none() && !base_url.query_pairs().any(|(key, _)| key == "host") {
// Default to a Unix socket if no hostname is provided.
base_url.set_host(Some("%2Frun%2Fpostgresql")).unwrap();
}

let prefix = base_url.path().strip_prefix('/');
let prefix = prefix.expect("failed to parse database name").to_string();
Expand Down
12 changes: 7 additions & 5 deletions docs/CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -293,11 +293,13 @@ linking with `cc` failed: exit code: 1``, you're probably missing some

##### Environment variables

Copy the `.env.sample` file to `.env`. Modify the settings as appropriate;
minimally you'll need to specify or modify the value of the `DATABASE_URL` var.
Try using `postgres://postgres@localhost/cargo_registry` first.
Copy the `.env.sample` file to `.env` and then modify `.env` as appropriate. On
Unix systems, the default configuration will use a local Unix socket which does
not require setting a password for the database user.

> If that doesn't work, change this by filling in this template with the
On other platforms, or if connecting to your database over IP:

> Change this by filling in this template with the
> appropriate values where there are `[]`s:
>
> ```text
Expand Down Expand Up @@ -432,7 +434,7 @@ In your `.env` file, set `TEST_DATABASE_URL` to a value that's the same as
connection will be used to create new databases for the tests, with names
prefixed with the database name from `TEST_DATABASE_URL`.

Example: `postgres://postgres@localhost/cargo_registry_test`.
Example: `postgres:///cargo_registry_test`.

Create the test database by running:

Expand Down
5 changes: 5 additions & 0 deletions src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ pub async fn oneoff_connection() -> anyhow::Result<AsyncPgConnection> {
pub fn connection_url(config: &config::DbPoolConfig) -> String {
let mut url = Url::parse(config.url.expose_secret()).expect("Invalid database URL");

// Support `postgres:///db_name` shorthand for easier local development.
if url.host().is_none() {
maybe_append_url_param(&mut url, "host", "/run/postgresql");
}
Comment on lines +29 to +32
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

while I'm okay with this default for the testing setup, I'm not sure we want to use the same for production. Is there anything preventing you from specifying /run/postgresql directly in the DATABASE_URL env var?


if config.enforce_tls {
maybe_append_url_param(&mut url, "sslmode", "require");
}
Expand Down
178 changes: 112 additions & 66 deletions src/tests/util/chaosproxy.rs
Original file line number Diff line number Diff line change
@@ -1,36 +1,44 @@
use anyhow::{Context, anyhow};
use std::net::SocketAddr;
use std::fs::Permissions;
use std::os::unix::fs::PermissionsExt as _;
use std::str::FromStr as _;
use std::sync::Arc;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf};
use tokio::net::{TcpListener, TcpStream};

use anyhow::{Context, anyhow};
use futures_util::FutureExt as _;
use tempfile::TempDir;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
use tokio::net::{TcpStream, UnixListener, UnixStream};
use tokio::sync::broadcast::Sender;
use tokio_postgres::Config;
use tokio_postgres::config::Host;
use tracing::{debug, error};
use url::Url;

pub(crate) struct ChaosProxy {
address: SocketAddr,
backend_address: SocketAddr,

socket_dir: TempDir,
backend_config: Config,
break_networking_send: Sender<()>,
restore_networking_send: Sender<()>,
}

impl ChaosProxy {
pub(crate) async fn new(backend_address: SocketAddr) -> anyhow::Result<Arc<Self>> {
debug!("Creating ChaosProxy for {backend_address}");
pub(crate) async fn new(backend_config: Config) -> anyhow::Result<Arc<Self>> {
debug!(?backend_config, "Creating ChaosProxy");

let directory_permissions = Permissions::from_mode(0o700);
let socket_dir = tempfile::Builder::new()
.permissions(directory_permissions)
.tempdir()?;
let socket_path = socket_dir.path().join(".s.PGSQL.5432");

let listener = TcpListener::bind("127.0.0.1:0").await?;
let address = listener.local_addr()?;
debug!("ChaosProxy listening on {address}");
let listener = UnixListener::bind(&socket_path)?;

let (break_networking_send, _) = tokio::sync::broadcast::channel(16);
let (restore_networking_send, _) = tokio::sync::broadcast::channel(16);

let instance = Arc::new(ChaosProxy {
address,
backend_address,

let instance = Arc::new(Self {
socket_dir,
backend_config,
break_networking_send,
restore_networking_send,
});
Expand All @@ -47,23 +55,29 @@ impl ChaosProxy {
}

pub(crate) async fn proxy_database_url(url: &str) -> anyhow::Result<(Arc<Self>, String)> {
let backend_config =
Config::from_str(url).context("failed to parse database url as config")?;

let mut db_url = Url::parse(url).context("failed to parse database url")?;
let backend_addr = db_url
.socket_addrs(|| Some(5432))
.context("could not resolve database url")?
.first()
.copied()
.ok_or_else(|| anyhow!("the database url does not point to any IP"))?;

let instance = ChaosProxy::new(backend_addr).await?;
let instance = ChaosProxy::new(backend_config).await?;

let host = instance
.socket_dir
.path()
.to_str()
.unwrap()
.replace("/", "%2F");
db_url
.set_ip_host(instance.address.ip())
.map_err(|_| anyhow!("Failed to set IP host on the URL"))?;
.set_host(Some(&host))
.map_err(|e| anyhow!("Failed to set socket host on the URL: {e}"))?;

// Drop any `host=` query params as that would route around our proxy.
let db_url_clone = db_url.clone();
db_url
.set_port(Some(instance.address.port()))
.map_err(|_| anyhow!("Failed to set post on the URL"))?;
.query_pairs_mut()
.clear()
.extend_pairs(db_url_clone.query_pairs().filter(|(key, _)| key != "host"));

debug!("ChaosProxy database URL: {db_url}");

Expand All @@ -82,58 +96,90 @@ impl ChaosProxy {
.context("Failed to send the restore_networking message")
}

async fn server_loop(&self, initial_listener: TcpListener) -> anyhow::Result<()> {
let mut listener = Some(initial_listener);

async fn server_loop(&self, listener: UnixListener) -> anyhow::Result<()> {
let mut is_broken = false;
let mut break_networking_recv = self.break_networking_send.subscribe();
let mut restore_networking_recv = self.restore_networking_send.subscribe();

loop {
if let Some(l) = &listener {
debug!("ChaosProxy waiting for connections");
tokio::select! {
accepted = l.accept() => {
let (stream, address ) = accepted?;
debug!("ChaosProxy accepted connection from {address}");
debug!("ChaosProxy waiting for connections");
tokio::select! {
accepted = listener.accept() => {
let (stream, address) = accepted?;
if is_broken {
debug!("ChaosProxy dropped connection from {address:?}");
} else {
debug!("ChaosProxy accepted connection from {address:?}");
self.accept_connection(stream).await?;
},

_ = break_networking_recv.recv() => {
debug!("ChaosProxy breaking networking");

// Setting the listener to `None` results in the listener being dropped,
// which closes the network port. A new listener will be established when
// networking is restored.
listener = None;
},
};
} else {
debug!("ChaosProxy networking is broken, waiting for restore signal");
let _ = restore_networking_recv.recv().await;
debug!("ChaosProxy restoring networking");
listener = Some(TcpListener::bind(self.address).await?);
}
}
},

_ = break_networking_recv.recv(), if !is_broken => {
debug!("ChaosProxy breaking networking");
is_broken = true;
},
_ = restore_networking_recv.recv(), if is_broken =>{
debug!("ChaosProxy restoring networking");
is_broken = false;
},
};
}
}

async fn accept_connection(&self, accepted: TcpStream) -> anyhow::Result<()> {
async fn accept_connection(&self, accepted: UnixStream) -> anyhow::Result<()> {
let (client_read, client_write) = accepted.into_split();
let (backend_read, backend_write) = TcpStream::connect(&self.backend_address)
.await?
.into_split();

let break_networking_send = self.break_networking_send.clone();
let host = self.backend_config.get_hosts().first().unwrap();
let port = self.backend_config.get_ports().first().unwrap_or(&5432);

let (backend_to_client, client_to_backend) = match &host {
Host::Tcp(hostname) => {
let (backend_read, backend_write) = TcpStream::connect((hostname.as_ref(), *port))
.await?
.into_split();
(
proxy_data(
self.break_networking_send.clone(),
client_read,
backend_write,
)
.boxed(),
proxy_data(
self.break_networking_send.clone(),
backend_read,
client_write,
)
.boxed(),
)
}
Host::Unix(path) => {
let path = path.join(format!(".s.PGSQL.{port}"));
let (backend_read, backend_write) = UnixStream::connect(path).await?.into_split();
(
proxy_data(
self.break_networking_send.clone(),
client_read,
backend_write,
)
.boxed(),
proxy_data(
self.break_networking_send.clone(),
backend_read,
client_write,
)
.boxed(),
)
}
};

tokio::spawn(async move {
if let Err(error) = proxy_data(break_networking_send, client_read, backend_write).await
{
if let Err(error) = backend_to_client.await {
error!(%error, "ChaosProxy connection error");
}
});

let break_networking_send = self.break_networking_send.clone();
tokio::spawn(async move {
if let Err(error) = proxy_data(break_networking_send, backend_read, client_write).await
{
if let Err(error) = client_to_backend.await {
error!(%error, "ChaosProxy connection error");
}
});
Expand All @@ -144,8 +190,8 @@ impl ChaosProxy {

async fn proxy_data(
break_networking_send: Sender<()>,
mut from: OwnedReadHalf,
mut to: OwnedWriteHalf,
mut from: impl AsyncRead + Unpin,
mut to: impl AsyncWrite + Unpin,
) -> anyhow::Result<()> {
let mut break_connections_recv = break_networking_send.subscribe();
let mut buf = [0; 1024];
Expand Down
Loading