Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 11 additions & 9 deletions integration/rust/tests/integration/rewrite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,9 +157,11 @@ async fn update_moves_row_between_shards() {
assert_eq!(count_on_shard(&pool, 0, 1).await, 1, "row on shard 0");
assert_eq!(count_on_shard(&pool, 1, 1).await, 0, "no row on shard 1");

let mut txn = pool.begin().await.unwrap();
let update = format!("UPDATE {TEST_TABLE} SET id = 11 WHERE id = 1");
let result = pool.execute(update.as_str()).await.expect("rewrite update");
let result = txn.execute(update.as_str()).await.expect("rewrite update");
assert_eq!(result.rows_affected(), 1, "exactly one row updated");
txn.commit().await.unwrap();

assert_eq!(
count_on_shard(&pool, 0, 1).await,
Expand Down Expand Up @@ -195,8 +197,10 @@ async fn update_rejects_multiple_rows() {
.await
.expect("insert second row");

let mut txn = pool.begin().await.unwrap();

let update = format!("UPDATE {TEST_TABLE} SET id = 11 WHERE id IN (1, 2)");
let err = pool
let err = txn
.execute(update.as_str())
.await
.expect_err("expected multi-row rewrite to fail");
Expand All @@ -206,10 +210,11 @@ async fn update_rejects_multiple_rows() {
assert!(
db_err
.message()
.contains("updating multiple rows is not supported when updating the sharding key"),
.contains("sharding key update changes more than one row (2)"),
"unexpected error message: {}",
db_err.message()
);
txn.rollback().await.unwrap();

assert_eq!(
count_on_shard(&pool, 0, 1).await,
Expand All @@ -231,7 +236,7 @@ async fn update_rejects_multiple_rows() {
}

#[tokio::test]
async fn update_rejects_transactions() {
async fn update_expects_transactions() {
let admin = admin_sqlx().await;
let _guard = RewriteConfigGuard::enable(admin.clone()).await;

Expand All @@ -246,26 +251,23 @@ async fn update_rejects_transactions() {
.expect("insert initial row");

let mut conn = pool.acquire().await.expect("acquire connection");
conn.execute("BEGIN").await.expect("begin transaction");

let update = format!("UPDATE {TEST_TABLE} SET id = 11 WHERE id = 1");
let err = conn
.execute(update.as_str())
.await
.expect_err("rewrite inside transaction must fail");
.expect_err("sharding key update must be executed inside a transaction");
let db_err = err
.as_database_error()
.expect("expected database error from proxy");
assert!(
db_err
.message()
.contains("shard key rewrites must run outside explicit transactions"),
.contains("sharding key update must be executed inside a transaction"),
"unexpected error message: {}",
db_err.message()
);

conn.execute("ROLLBACK").await.ok();

drop(conn);

assert_eq!(count_on_shard(&pool, 0, 1).await, 1, "row still on shard 0");
Expand Down
11 changes: 10 additions & 1 deletion integration/setup.sh
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,16 @@ done
for db in pgdog shard_0 shard_1; do
for table in sharded sharded_omni; do
psql -c "DROP TABLE IF EXISTS ${table}" ${db} -U pgdog
psql -c "CREATE TABLE IF NOT EXISTS ${table} (id BIGINT PRIMARY KEY, value TEXT)" ${db} -U pgdog
psql -c "CREATE TABLE IF NOT EXISTS ${table} (
id BIGINT PRIMARY KEY,
value TEXT,
created_at TIMESTAMPTZ DEFAULT NOW(),
enabled BOOLEAN DEFAULT false,
user_id BIGINT,
region_id INTEGER DEFAULT 10,
country_id SMALLINT DEFAULT 5,
options JSONB DEFAULT '{}'::jsonb
)" ${db} -U pgdog
done

psql -c "CREATE TABLE IF NOT EXISTS sharded_varchar (id_varchar VARCHAR)" ${db} -U pgdog
Expand Down
4 changes: 2 additions & 2 deletions pgdog-config/src/rewrite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@ use serde::{Deserialize, Serialize};
use std::fmt;
use std::str::FromStr;

#[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq, Eq)]
#[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
#[serde(rename_all = "lowercase")]
pub enum RewriteMode {
Ignore,
Error,
Rewrite,
Ignore,
}

impl Default for RewriteMode {
Expand Down
24 changes: 24 additions & 0 deletions pgdog/src/backend/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2481,4 +2481,28 @@ pub mod test {
"expected re-sync after RESET ALL cleared client_params"
);
}

#[tokio::test]
async fn test_error_decoding() {
let mut server = test_server().await;
let err = server
.execute(Query::new("SELECT * FROM test_error_decoding"))
.await
.expect_err("expected this query to fail");
assert!(
matches!(err, Error::ExecutionError(_)),
"expected execution error"
);
if let Error::ExecutionError(err) = err {
assert_eq!(
err.message,
"relation \"test_error_decoding\" does not exist"
);
assert_eq!(err.severity, "ERROR");
assert_eq!(err.code, "42P01");
assert_eq!(err.context, None);
assert_eq!(err.routine, Some("parserOpenTable".into())); // Might break in the future.
assert_eq!(err.detail, None);
}
}
}
2 changes: 1 addition & 1 deletion pgdog/src/frontend/client/query_engine/fake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use super::*;
impl QueryEngine {
/// Respond to a command sent by the client
/// in a way that won't make it suspicious.
pub async fn fake_command_response(
pub(crate) async fn fake_command_response(
&mut self,
context: &mut QueryEngineContext<'_>,
command: &str,
Expand Down
14 changes: 7 additions & 7 deletions pgdog/src/frontend/client/query_engine/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,16 +173,19 @@ impl QueryEngine {
self.pending_explain = None;

let command = self.router.command();
let mut route = command.route().clone();

if let Some(trace) = route.take_explain() {
if let Some(trace) = context
.client_request
.route // Admin commands don't have a route.
.as_mut()
.map(|route| route.take_explain())
.flatten()
{
if config().config.general.expanded_explain {
self.pending_explain = Some(ExplainResponseState::new(trace));
}
}

context.client_request.route = Some(route);

match command {
Command::InternalField { name, value } => {
self.show_internal_value(context, name.clone(), value.clone())
Expand Down Expand Up @@ -248,9 +251,6 @@ impl QueryEngine {
.await?;
}
Command::Copy(_) => self.execute(context).await?,
Command::ShardKeyRewrite(plan) => {
self.shard_key_rewrite(context, *plan.clone()).await?
}
Command::Deallocate => self.deallocate(context).await?,
Command::Discard { extended } => self.discard(context, *extended).await?,
command => self.unknown_command(context, command.clone()).await?,
Expand Down
48 changes: 48 additions & 0 deletions pgdog/src/frontend/client/query_engine/multi_step/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
use thiserror::Error;

use crate::net::ErrorResponse;

#[derive(Debug, Error)]
pub enum Error {
#[error("{0}")]
Update(#[from] UpdateError),

#[error("frontend: {0}")]
Frontend(Box<crate::frontend::Error>),

#[error("backend: {0}")]
Backend(#[from] crate::backend::Error),

#[error("rewrite: {0}")]
Rewrite(#[from] crate::frontend::router::parser::rewrite::statement::Error),

#[error("router: {0}")]
Router(#[from] crate::frontend::router::Error),

#[error("{0}")]
Execution(ErrorResponse),

#[error("net: {0}")]
Net(#[from] crate::net::Error),
}

#[derive(Debug, Error)]
pub enum UpdateError {
#[error("sharding key updates are forbidden")]
Disabled,

#[error("sharding key update must be executed inside a transaction")]
TransactionRequired,

#[error("sharding key update intermediate query has no route")]
NoRoute,

#[error("sharding key update changes more than one row ({0})")]
TooManyRows(usize),
}

impl From<crate::frontend::Error> for Error {
fn from(value: crate::frontend::Error) -> Self {
Self::Frontend(Box::new(value))
}
}
40 changes: 40 additions & 0 deletions pgdog/src/frontend/client/query_engine/multi_step/forward_check.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
use fnv::FnvHashSet as HashSet;

use crate::{frontend::ClientRequest, net::Protocol};

#[derive(Debug, Clone)]
pub(crate) struct ForwardCheck {
codes: HashSet<char>,
sent: HashSet<char>,
describe: bool,
}

impl ForwardCheck {
/// Create new forward checker from a client request.
///
/// Will construct a mapping to allow only the messages the client expects through
///
pub(crate) fn new(request: &ClientRequest) -> Self {
Self {
codes: request.iter().map(|m| m.code()).collect(),
describe: request.iter().find(|m| m.code() == 'D').is_some(),
sent: HashSet::default(),
}
}

/// Check if we should forward a particular message to the client.
pub(crate) fn forward(&mut self, code: char) -> bool {
let forward = match code {
'1' => self.codes.contains(&'P'), // ParseComplete
'2' => self.codes.contains(&'B'), // BindComplete
'D' | 'E' => true, // DataRow
'T' => self.describe && !self.sent.contains(&'T') || self.codes.contains(&'Q'),
't' => self.describe && !self.sent.contains(&'t'),
_ => false,
};

self.sent.insert(code);

forward
}
}
18 changes: 8 additions & 10 deletions pgdog/src/frontend/client/query_engine/multi_step/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,32 +58,30 @@ impl<'a> InsertMulti<'a> {
}

for request in self.requests.iter() {
self.engine.backend.send(request).await?;
self.engine
.backend
.handle_client_request(request, &mut self.engine.router, self.engine.streaming)
.await?;

while self.engine.backend.has_more_messages() {
let message = self.engine.read_server_message(context).await.unwrap();
let message = self.engine.read_server_message(context).await?;

if self.state.forward(&message)? {
self.engine
.process_server_message(context, message)
.await
.unwrap();
self.engine.process_server_message(context, message).await?;
}
}
}

if let Some(cc) = self.state.command_complete(CommandType::Insert) {
self.engine
.process_server_message(context, cc.message()?)
.await
.unwrap();
.await?;
}

if let Some(rfq) = self.state.ready_for_query(context.in_transaction()) {
self.engine
.process_server_message(context, rfq.message()?)
.await
.unwrap();
.await?;
}

Ok(self.state.error())
Expand Down
6 changes: 6 additions & 0 deletions pgdog/src/frontend/client/query_engine/multi_step/mod.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,14 @@
pub(crate) mod error;
pub mod forward_check;
pub mod insert;
pub mod state;
pub mod update;

pub(crate) use error::{Error, UpdateError};
pub(crate) use forward_check::*;
pub(crate) use insert::InsertMulti;
pub use state::{CommandType, MultiServerState};
pub(crate) use update::UpdateMulti;

#[cfg(test)]
mod test;
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use crate::{

pub mod prepared;
pub mod simple;
pub mod update;

async fn truncate_table(table: &str, stream: &mut TcpStream) {
let query = Query::new(format!("TRUNCATE {}", table))
Expand Down
Loading
Loading