diff --git a/integration/python/test_psycopg2.py b/integration/python/test_psycopg2.py new file mode 100644 index 00000000..d3c7fd6a --- /dev/null +++ b/integration/python/test_psycopg2.py @@ -0,0 +1,131 @@ +import psycopg2 +import pytest +from globals import admin + +queries = [ + "SELECT 1", + "CREATE TABLE IF NOT EXISTS test_conn_reads(id BIGINT)", + "INSERT INTO test_conn_reads (id) VALUES (1)", + "INSERT INTO test_conn_reads (id) VALUES (2)", + "INSERT INTO test_conn_reads (id) VALUES (3)", + "SELECT * FROM test_conn_reads WHERE id = 1", + "SELECT * FROM test_conn_reads WHERE id = 2", + "SELECT * FROM test_conn_reads WHERE id = 3", + "SET work_mem TO '4MB'", + "SET work_mem TO '6MB'", + "SET work_mem TO '8MB'", +] + + +@pytest.fixture +def conn_reads(): + return psycopg2.connect( + "host=127.0.0.1 port=6432 user=pgdog password=pgdog " + "options='-c pgdog.role=replica'" + ) + + +@pytest.fixture +def conn_writes(): + return psycopg2.connect( + "host=127.0.0.1 port=6432 user=pgdog password=pgdog " + "options='-c pgdog.role=primary'" + ) + + +@pytest.fixture +def conn_default(): + return psycopg2.connect("host=127.0.0.1 port=6432 user=pgdog password=pgdog") + + +def test_conn_writes(conn_writes): + admin().execute("SET query_parser TO 'off'") + for query in queries: + conn_writes.autocommit = True + cursor = conn_writes.cursor() + cursor.execute(query) + admin().execute("SET query_parser TO 'auto'") + + +def test_conn_reads(conn_reads, conn_writes): + admin().execute("SET query_parser TO 'off'") + + conn_writes.autocommit = True + conn_reads.autocommit = True + + conn_writes.cursor().execute( + "CREATE TABLE IF NOT EXISTS test_conn_reads(id BIGINT)" + ) + + read = False + for query in queries: + cursor = conn_reads.cursor() + try: + cursor.execute(query) + except psycopg2.errors.ReadOnlySqlTransaction: + # Some will succeed because we allow reads + # on the primary. 
+ read = True + admin().execute("SET query_parser TO 'auto'") + + conn_writes.cursor().execute("DROP TABLE IF EXISTS test_conn_reads") + assert read, "expected some queries to hit replicas and fail" + + +def test_transactions_writes(conn_writes): + admin().execute("SET query_parser TO 'off'") + + for query in queries: + conn_writes.cursor().execute(query) + conn_writes.commit() + + admin().execute("SET query_parser TO 'auto'") + + +def test_transactions_reads(conn_reads): + admin().execute("SET query_parser TO 'off'") + read = False + + for query in queries: + try: + conn_reads.cursor().execute(query) + except psycopg2.errors.ReadOnlySqlTransaction: + # Some will succeed because we allow reads + # on the primary. + read = True + conn_reads.commit() + + assert read, "expected some queries to hit replicas and fail" + admin().execute("SET query_parser TO 'auto'") + + +def test_transaction_reads_explicit(conn_reads, conn_writes): + conn_reads.autocommit = True + admin().execute("SET query_parser TO 'off'") + + conn_writes.cursor().execute( + "CREATE TABLE IF NOT EXISTS test_conn_reads(id BIGINT)" + ) + conn_writes.commit() + + cursor = conn_reads.cursor() + + read = False + + for _ in range(15): + cursor.execute("BEGIN") + try: + cursor.execute("INSERT INTO test_conn_reads (id) VALUES (1)") + cursor.execute("COMMIT") + except psycopg2.errors.ReadOnlySqlTransaction: + read = True + cursor.execute("ROLLBACK") + + assert read, "expected some queries to hit replicas and fail" + + for _ in range(15): + cursor.execute("BEGIN READ ONLY") # Won't be parsed, doesn't matter to PgDog + cursor.execute("SELECT 1") + cursor.execute("ROLLBACK") + + admin().execute("SET query_parser TO 'on'") diff --git a/pgdog-config/src/core.rs b/pgdog-config/src/core.rs index 31a053ff..dc7891fe 100644 --- a/pgdog-config/src/core.rs +++ b/pgdog-config/src/core.rs @@ -7,7 +7,7 @@ use tracing::{info, warn}; use crate::sharding::ShardedSchema; use crate::{ EnumeratedDatabase, Memory, OmnishardedTable, 
PassthoughAuth, PreparedStatements, - ReadWriteSplit, RewriteMode, Role, + QueryParserLevel, ReadWriteSplit, RewriteMode, Role, }; use super::database::Database; @@ -293,7 +293,7 @@ impl Config { mappings } - pub fn check(&self) { + pub fn check(&mut self) { // Check databases. let mut duplicate_dbs = HashSet::new(); for database in self.databases.clone() { @@ -317,7 +317,9 @@ impl Config { pooler_mode: Option, role: Role, role_warned: bool, + parser_warned: bool, have_replicas: bool, + sharded: bool, } // Check identical configs. @@ -341,6 +343,20 @@ impl Config { if !existing.have_replicas { existing.have_replicas = database.role == Role::Replica; } + if !existing.sharded { + existing.sharded = database.shard > 0; + } + + if (existing.sharded || existing.have_replicas) + && self.general.query_parser == QueryParserLevel::Off + && !existing.parser_warned + { + existing.parser_warned = true; + warn!( + r#"database "{}" may need the query parser for load balancing/sharding, but it's disabled"#, + database.name + ); + } } else { checks.insert( database.name.clone(), @@ -348,7 +364,9 @@ impl Config { pooler_mode: database.pooler_mode, role: database.role, role_warned: false, + parser_warned: false, have_replicas: database.role == Role::Replica, + sharded: database.shard > 0, }, ); } @@ -381,13 +399,15 @@ impl Config { if !self.general.two_phase_commit && self.rewrite.enabled { if self.rewrite.shard_key == RewriteMode::Rewrite { - warn!("rewrite.shard_key=rewrite will apply non-atomic shard-key rewrites; enabling two_phase_commit is strongly recommended" - ); + warn!( + r#"rewrite.shard_key = "rewrite" may apply non-atomic sharding key rewrites; enabling "two_phase_commit" is strongly recommended"# + ); } if self.rewrite.split_inserts == RewriteMode::Rewrite { - warn!("rewrite.split_inserts=rewrite may commit partial multi-row INSERTs; enabling two_phase_commit is strongly recommended" - ); + warn!( + r#"rewrite.split_inserts = "rewrite" may commit partial multi-row 
inserts; enabling "two_phase_commit" is strongly recommended"# + ); } } @@ -396,11 +416,16 @@ impl Config { && self.general.read_write_split == ReadWriteSplit::ExcludePrimary { warn!( - r#"database "{}" has no replicas and read_write_split is set to "{}", read queries will not be served"#, + r#"database "{}" has no replicas and "read_write_split" is set to "{}": read queries will be rejected"#, database, self.general.read_write_split ); } } + + if self.general.query_parser_enabled { + warn!(r#""query_parser_enabled" is deprecated, use "query_parser" = "on" instead"#); + self.general.query_parser = QueryParserLevel::On; + } } /// Multi-tenancy is enabled. diff --git a/pgdog-config/src/general.rs b/pgdog-config/src/general.rs index 66979649..b6611452 100644 --- a/pgdog-config/src/general.rs +++ b/pgdog-config/src/general.rs @@ -5,6 +5,7 @@ use std::path::PathBuf; use std::time::Duration; use crate::pooling::ConnectionRecovery; +use crate::QueryParserLevel; use super::auth::{AuthType, PassthoughAuth}; use super::database::{LoadBalancingStrategy, ReadWriteSplit, ReadWriteStrategy}; @@ -96,6 +97,9 @@ pub struct General { /// Parse Queries override. #[serde(default = "General::query_parser_enabled")] pub query_parser_enabled: bool, + /// Query parser. + #[serde(default)] + pub query_parser: QueryParserLevel, /// Limit on the number of prepared statements in the server cache. 
#[serde(default = "General::prepared_statements_limit")] pub prepared_statements_limit: usize, @@ -219,6 +223,7 @@ impl Default for General { openmetrics_namespace: Self::openmetrics_namespace(), prepared_statements: Self::prepared_statements(), query_parser_enabled: Self::query_parser_enabled(), + query_parser: QueryParserLevel::default(), prepared_statements_limit: Self::prepared_statements_limit(), query_cache_limit: Self::query_cache_limit(), passthrough_auth: Self::default_passthrough_auth(), diff --git a/pgdog-config/src/sharding.rs b/pgdog-config/src/sharding.rs index 8e3121c4..257f4a4e 100644 --- a/pgdog-config/src/sharding.rs +++ b/pgdog-config/src/sharding.rs @@ -297,3 +297,12 @@ impl ListShards { } } } + +#[derive(Serialize, Deserialize, Debug, Copy, Clone, PartialEq, Eq, Hash, Default)] +#[serde(rename_all = "snake_case", deny_unknown_fields)] +pub enum QueryParserLevel { + On, + #[default] + Auto, + Off, +} diff --git a/pgdog/src/admin/set.rs b/pgdog/src/admin/set.rs index 20c296f7..167cf3ef 100644 --- a/pgdog/src/admin/set.rs +++ b/pgdog/src/admin/set.rs @@ -145,6 +145,10 @@ impl Command for Set { config.config.general.tls_client_required = Self::from_json(&self.value)?; } + "query_parser" => { + config.config.general.query_parser = Self::from_json(&self.value)?; + } + _ => return Err(Error::Syntax), } diff --git a/pgdog/src/backend/pool/cluster.rs b/pgdog/src/backend/pool/cluster.rs index 5cde9c65..7350bbc7 100644 --- a/pgdog/src/backend/pool/cluster.rs +++ b/pgdog/src/backend/pool/cluster.rs @@ -1,7 +1,7 @@ //! A collection of replicas and a primary. 
use parking_lot::{Mutex, RwLock}; -use pgdog_config::{PreparedStatements, Rewrite, RewriteMode}; +use pgdog_config::{PreparedStatements, QueryParserLevel, Rewrite, RewriteMode}; use std::{ sync::{ atomic::{AtomicBool, Ordering}, @@ -62,7 +62,7 @@ pub struct Cluster { dry_run: bool, expanded_explain: bool, pub_sub_channel_size: usize, - query_parser_enabled: bool, + query_parser: QueryParserLevel, connection_recovery: ConnectionRecovery, } @@ -130,7 +130,7 @@ pub struct ClusterConfig<'a> { pub dry_run: bool, pub expanded_explain: bool, pub pub_sub_channel_size: usize, - pub query_parser_enabled: bool, + pub query_parser: QueryParserLevel, pub connection_recovery: ConnectionRecovery, pub lsn_check_interval: Duration, } @@ -176,7 +176,7 @@ impl<'a> ClusterConfig<'a> { dry_run: general.dry_run, expanded_explain: general.expanded_explain, pub_sub_channel_size: general.pub_sub_channel_size, - query_parser_enabled: general.query_parser_enabled, + query_parser: general.query_parser, connection_recovery: general.connection_recovery, lsn_check_interval: Duration::from_millis(general.lsn_check_interval), } @@ -208,7 +208,7 @@ impl Cluster { dry_run, expanded_explain, pub_sub_channel_size, - query_parser_enabled, + query_parser, connection_recovery, lsn_check_interval, } = config; @@ -254,7 +254,7 @@ impl Cluster { dry_run, expanded_explain, pub_sub_channel_size, - query_parser_enabled, + query_parser, connection_recovery, } } @@ -349,8 +349,8 @@ impl Cluster { &self.rewrite } - pub fn query_parser_enabled(&self) -> bool { - self.query_parser_enabled + pub fn query_parser(&self) -> QueryParserLevel { + self.query_parser } pub fn prepared_statements(&self) -> &PreparedStatements { @@ -417,12 +417,17 @@ impl Cluster { /// Use the query parser. 
pub fn use_query_parser(&self) -> bool { - self.multi_tenant().is_some() - || self.query_parser_enabled() - || self.router_needed() - || self.dry_run() - || self.prepared_statements() == &PreparedStatements::Full - || self.pub_sub_enabled() + match self.query_parser() { + QueryParserLevel::Off => return false, + QueryParserLevel::On => return true, + QueryParserLevel::Auto => { + self.multi_tenant().is_some() + || self.router_needed() + || self.dry_run() + || self.prepared_statements() == &PreparedStatements::Full + || self.pub_sub_enabled() + } + } } /// Multi-tenant config. @@ -539,7 +544,7 @@ impl Cluster { mod test { use std::{sync::Arc, time::Duration}; - use pgdog_config::{OmnishardedTable, ShardedSchema}; + use pgdog_config::{ConfigAndUsers, OmnishardedTable, ShardedSchema}; use crate::{ backend::{ @@ -548,7 +553,7 @@ mod test { Shard, ShardedTables, }, config::{ - config, DataType, Hasher, LoadBalancingStrategy, ReadWriteSplit, ReadWriteStrategy, + DataType, Hasher, LoadBalancingStrategy, ReadWriteSplit, ReadWriteStrategy, ShardedTable, }, }; @@ -556,8 +561,7 @@ mod test { use super::{Cluster, DatabaseUser}; impl Cluster { - pub fn new_test() -> Self { - let config = config(); + pub fn new_test(config: &ConfigAndUsers) -> Self { let identifier = Arc::new(DatabaseUser { user: "pgdog".into(), database: "pgdog".into(), @@ -630,7 +634,7 @@ mod test { prepared_statements: config.config.general.prepared_statements, dry_run: config.config.general.dry_run, expanded_explain: config.config.general.expanded_explain, - query_parser_enabled: config.config.general.query_parser_enabled, + query_parser: config.config.general.query_parser, rewrite: config.config.rewrite.clone(), two_phase_commit: config.config.general.two_phase_commit, two_phase_commit_auto: config.config.general.two_phase_commit_auto.unwrap_or(false), @@ -638,8 +642,8 @@ mod test { } } - pub fn new_test_single_shard() -> Cluster { - let mut cluster = Self::new_test(); + pub fn new_test_single_shard(config: 
&ConfigAndUsers) -> Cluster { + let mut cluster = Self::new_test(config); cluster.shards.pop(); cluster diff --git a/pgdog/src/backend/pool/connection/mirror/mod.rs b/pgdog/src/backend/pool/connection/mirror/mod.rs index 271e91c6..644f9df9 100644 --- a/pgdog/src/backend/pool/connection/mirror/mod.rs +++ b/pgdog/src/backend/pool/connection/mirror/mod.rs @@ -222,7 +222,7 @@ mod test { #[tokio::test] async fn test_mirror() { config::load_test(); - let cluster = Cluster::new_test(); + let cluster = Cluster::new_test(&config()); cluster.launch(); let mut mirror = Mirror::spawn("pgdog", &cluster, None).unwrap(); let mut conn = cluster.primary(0, &Request::default()).await.unwrap(); @@ -272,7 +272,7 @@ mod test { #[tokio::test] async fn test_mirror_stats_tracking() { config::load_test(); - let cluster = Cluster::new_test(); + let cluster = Cluster::new_test(&config()); cluster.launch(); // Get initial stats diff --git a/pgdog/src/backend/replication/logical/subscriber/copy.rs b/pgdog/src/backend/replication/logical/subscriber/copy.rs index 8221fa07..2a833153 100644 --- a/pgdog/src/backend/replication/logical/subscriber/copy.rs +++ b/pgdog/src/backend/replication/logical/subscriber/copy.rs @@ -198,6 +198,7 @@ mod test { use crate::{ backend::{pool::Request, replication::publisher::PublicationTable}, + config::config, frontend::router::parser::binary::{header::Header, Data, Tuple}, }; @@ -214,7 +215,7 @@ mod test { }; let copy = CopyStatement::new(&table, &["id".into(), "value".into()]); - let cluster = Cluster::new_test(); + let cluster = Cluster::new_test(&config()); cluster.launch(); cluster diff --git a/pgdog/src/backend/schema/sync/pg_dump.rs b/pgdog/src/backend/schema/sync/pg_dump.rs index aa67d816..5dc99346 100644 --- a/pgdog/src/backend/schema/sync/pg_dump.rs +++ b/pgdog/src/backend/schema/sync/pg_dump.rs @@ -597,7 +597,7 @@ mod test { #[tokio::test] async fn test_pg_dump_execute() { - let cluster = Cluster::new_test_single_shard(); + let cluster = 
Cluster::new_test_single_shard(&config()); let _pg_dump = PgDump::new(&cluster, "test_pg_dump_execute"); } diff --git a/pgdog/src/config/sharding.rs b/pgdog/src/config/sharding.rs index f3d278ed..c88567f5 100644 --- a/pgdog/src/config/sharding.rs +++ b/pgdog/src/config/sharding.rs @@ -1,4 +1,4 @@ pub use pgdog_config::sharding::{ - DataType, FlexibleType, Hasher, ManualQuery, OmnishardedTables, ShardedMapping, - ShardedMappingKey, ShardedMappingKind, ShardedTable, + DataType, FlexibleType, Hasher, ManualQuery, OmnishardedTables, QueryParserLevel, + ShardedMapping, ShardedMappingKey, ShardedMappingKind, ShardedTable, }; diff --git a/pgdog/src/frontend/router/parser/copy.rs b/pgdog/src/frontend/router/parser/copy.rs index b48b3ac1..f2875e61 100644 --- a/pgdog/src/frontend/router/parser/copy.rs +++ b/pgdog/src/frontend/router/parser/copy.rs @@ -299,6 +299,8 @@ impl CopyParser { mod test { use pg_query::parse; + use crate::config::config; + use super::*; #[test] @@ -372,7 +374,7 @@ mod test { _ => panic!("not a copy"), }; - let mut copy = CopyParser::new(©, &Cluster::new_test()).unwrap(); + let mut copy = CopyParser::new(©, &Cluster::new_test(&config())).unwrap(); let rows = copy.shard(&[copy_data]).unwrap(); assert_eq!(rows.len(), 3); diff --git a/pgdog/src/frontend/router/parser/error.rs b/pgdog/src/frontend/router/parser/error.rs index 285ee0a8..06e148bd 100644 --- a/pgdog/src/frontend/router/parser/error.rs +++ b/pgdog/src/frontend/router/parser/error.rs @@ -3,7 +3,7 @@ use thiserror::Error; use super::rewrite::statement::Error as RewriteError; -use crate::{config::RewriteMode, frontend::router::sharding}; +use crate::frontend::router::sharding; #[derive(Debug, Error)] pub enum Error { @@ -70,31 +70,6 @@ pub enum Error { #[error("regex error")] RegexError, - #[error( - "updating sharding key columns ({columns}) on table \"{table}\" is not allowed when rewrite.shard_key={mode}" - )] - ShardKeyUpdateViolation { - table: String, - columns: String, - mode: RewriteMode, 
- }, - - #[error( - "rewrite.shard_key=\"rewrite\" is not yet supported for table \"{table}\" (columns: {columns})" - )] - ShardKeyRewriteNotSupported { table: String, columns: String }, - - #[error("internal shard key rewrite invariant violated: {reason}")] - ShardKeyRewriteInvariant { reason: String }, - - #[error( - "multi-row INSERT into sharded table \"{table}\" is not supported when rewrite.split_inserts={mode}" - )] - ShardedMultiRowInsert { table: String, mode: RewriteMode }, - - #[error("multi-row INSERT into sharded table \"{table}\" cannot be rewritten: {reason}")] - SplitInsertNotSupported { table: String, reason: String }, - #[error("cross-shard truncate not supported when schema-sharding is used")] CrossShardTruncateSchemaSharding, @@ -115,4 +90,7 @@ pub enum Error { #[error("rewrite: {0}")] Rewrite(#[from] RewriteError), + + #[error("sharded databases require the query parser to be enabled")] + QueryParserRequired, } diff --git a/pgdog/src/frontend/router/parser/query/explain.rs b/pgdog/src/frontend/router/parser/query/explain.rs index bd1d3c9d..60ac14fb 100644 --- a/pgdog/src/frontend/router/parser/query/explain.rs +++ b/pgdog/src/frontend/router/parser/query/explain.rs @@ -73,7 +73,7 @@ mod tests { // Helper function to route a plain SQL statement and return its `Route`. 
fn route(sql: &str) -> Route { enable_expanded_explain(); - let cluster = Cluster::new_test(); + let cluster = Cluster::new_test(&config()); let mut stmts = PreparedStatements::default(); let ast = Ast::new( @@ -108,7 +108,7 @@ mod tests { let bind = Bind::new_params("", ¶meters); - let cluster = Cluster::new_test(); + let cluster = Cluster::new_test(&config()); let mut stmts = PreparedStatements::default(); let ast = Ast::new( diff --git a/pgdog/src/frontend/router/parser/query/mod.rs b/pgdog/src/frontend/router/parser/query/mod.rs index 65ca8fb4..50e1078f 100644 --- a/pgdog/src/frontend/router/parser/query/mod.rs +++ b/pgdog/src/frontend/router/parser/query/mod.rs @@ -135,6 +135,44 @@ impl QueryParser { Ok(command) } + /// Bypass the query parser if we can. + fn query_parser_bypass(context: &mut QueryParserContext) -> Option { + let shard = context.shards_calculator.shard(); + + if !shard.is_direct() && context.shards > 1 { + return None; + } + + if !shard.is_direct() { + context + .shards_calculator + .push(ShardWithPriority::new_override_parser_disabled( + Shard::Direct(0), + )); + } + + let shard = context.shards_calculator.shard(); + + // Cluster is read-only and only has one shard. + if context.read_only { + Some(Route::read(shard)) + } + // Cluster doesn't have replicas and has only one shard. + else if context.write_only { + Some(Route::write(shard)) + + // The role is specified in the connection parameter (pgdog.role). + } else if let Some(role) = context.router_context.parameter_hints.compute_role() { + Some(match role { + Role::Replica => Route::read(shard), + Role::Primary | Role::Auto => Route::write(shard), + }) + // Default to primary. + } else { + Some(Route::write(shard)) + } + } + /// Parse a query and return a command that tells us what to do with it. /// /// # Arguments @@ -154,17 +192,12 @@ impl QueryParser { ); if !use_parser { - // Cluster is read-only and only has one shard. 
- if context.read_only { - return Ok(Command::Query(Route::read( - ShardWithPriority::new_override_parser_disabled(Shard::Direct(0)), - ))); - } - // Cluster doesn't have replicas and has only one shard. - if context.write_only { - return Ok(Command::Query(Route::write( - ShardWithPriority::new_override_parser_disabled(Shard::Direct(0)), - ))); + // Try to figure out where we can send the query without + // parsing SQL. + if let Some(route) = Self::query_parser_bypass(context) { + return Ok(Command::Query(route)); + } else { + return Err(Error::QueryParserRequired); } } diff --git a/pgdog/src/frontend/router/parser/query/show.rs b/pgdog/src/frontend/router/parser/query/show.rs index 6e84afcd..a3338b02 100644 --- a/pgdog/src/frontend/router/parser/query/show.rs +++ b/pgdog/src/frontend/router/parser/query/show.rs @@ -31,6 +31,7 @@ impl QueryParser { #[cfg(test)] mod test_show { use crate::backend::Cluster; + use crate::config::config; use crate::frontend::client::Sticky; use crate::frontend::router::parser::Shard; use crate::frontend::router::{Ast, QueryParser}; @@ -40,7 +41,7 @@ mod test_show { #[test] fn show_runs_on_a_direct_shard_round_robin() { - let c = Cluster::new_test(); + let c = Cluster::new_test(&config()); let mut parser = QueryParser::default(); // First call diff --git a/pgdog/src/frontend/router/parser/query/test/mod.rs b/pgdog/src/frontend/router/parser/query/test/mod.rs index 9808be2c..b2fd4321 100644 --- a/pgdog/src/frontend/router/parser/query/test/mod.rs +++ b/pgdog/src/frontend/router/parser/query/test/mod.rs @@ -21,6 +21,7 @@ use crate::net::messages::Query; pub mod setup; +pub mod test_bypass; pub mod test_comments; pub mod test_ddl; pub mod test_delete; @@ -40,7 +41,7 @@ pub mod test_transaction; fn parse_query(query: &str) -> Command { let mut query_parser = QueryParser::default(); - let cluster = Cluster::new_test(); + let cluster = Cluster::new_test(&config()); let ast = Ast::new( &BufferedQuery::Query(Query::new(query)), 
&cluster.sharding_schema(), @@ -65,7 +66,7 @@ macro_rules! command { ($query:expr, $in_transaction:expr) => {{ let query = $query; let mut query_parser = QueryParser::default(); - let cluster = Cluster::new_test(); + let cluster = Cluster::new_test(&crate::config::config()); let mut ast = Ast::new( &BufferedQuery::Query(Query::new($query)), &cluster.sharding_schema(), @@ -149,7 +150,12 @@ macro_rules! query_parser { }}; ($qp:expr, $query:expr, $in_transaction:expr) => { - query_parser!($qp, $query, $in_transaction, Cluster::new_test()) + query_parser!( + $qp, + $query, + $in_transaction, + Cluster::new_test(&crate::config::config()) + ) }; } @@ -168,7 +174,7 @@ macro_rules! parse { }) .collect::>(); let bind = Bind::new_params_codes($name, ¶ms, $codes); - let cluster = Cluster::new_test(); + let cluster = Cluster::new_test(&crate::config::config()); let ast = Ast::new( &BufferedQuery::Prepared(Parse::new_anonymous($query)), &cluster.sharding_schema(), @@ -404,7 +410,7 @@ fn test_set() { } let query_str = r#"SET statement_timeout TO 1"#; - let cluster = Cluster::new_test(); + let cluster = Cluster::new_test(&config()); let mut prep_stmts = PreparedStatements::default(); let buffered_query = BufferedQuery::Query(Query::new(query_str)); let mut ast = Ast::new(&buffered_query, &cluster.sharding_schema(), &mut prep_stmts).unwrap(); @@ -452,7 +458,7 @@ fn test_transaction() { _ => panic!("not a select"), } - let mut cluster = Cluster::new_test(); + let mut cluster = Cluster::new_test(&config()); cluster.set_read_write_strategy(ReadWriteStrategy::Aggressive); let command = query_parser!( QueryParser::default(), @@ -530,7 +536,7 @@ fn test_cte() { fn test_function_begin() { let (cmd, mut qp) = command!("BEGIN"); assert!(matches!(cmd, Command::StartTransaction { .. 
})); - let cluster = Cluster::new_test(); + let cluster = Cluster::new_test(&config()); let mut prep_stmts = PreparedStatements::default(); let query_str = "SELECT ROW(t1.*) AS tt1, @@ -606,7 +612,7 @@ fn test_limit_offset() { #[test] fn test_close_direct_one_shard() { - let cluster = Cluster::new_test_single_shard(); + let cluster = Cluster::new_test_single_shard(&config()); let mut qp = QueryParser::default(); let buf: ClientRequest = vec![Close::named("test").into(), Sync.into()].into(); @@ -663,9 +669,9 @@ fn test_commit_prepared() { fn test_dry_run_simple() { let mut config = config().deref().clone(); config.config.general.dry_run = true; - config::set(config).unwrap(); + config::set(config.clone()).unwrap(); - let cluster = Cluster::new_test_single_shard(); + let cluster = Cluster::new_test_single_shard(&config); let command = query_parser!( QueryParser::default(), Query::new("/* pgdog_sharding_key: 1234 */ SELECT * FROM sharded"), diff --git a/pgdog/src/frontend/router/parser/query/test/setup.rs b/pgdog/src/frontend/router/parser/query/test/setup.rs index 56f61368..1df2493a 100644 --- a/pgdog/src/frontend/router/parser/query/test/setup.rs +++ b/pgdog/src/frontend/router/parser/query/test/setup.rs @@ -1,5 +1,7 @@ use std::ops::Deref; +use pgdog_config::ConfigAndUsers; + use crate::{ backend::Cluster, config::{self, config, ReadWriteStrategy}, @@ -30,7 +32,12 @@ pub(crate) struct QueryParserTest { impl QueryParserTest { /// Create a new test with default settings (no transaction, default cluster). pub(crate) fn new() -> Self { - let cluster = Cluster::new_test(); + Self::new_with_config(&config()) + } + + /// Create a test with a single-shard cluster. + pub(crate) fn new_single_shard(config: &ConfigAndUsers) -> Self { + let cluster = Cluster::new_test_single_shard(config); Self { cluster, @@ -43,9 +50,9 @@ impl QueryParserTest { } } - /// Create a test with a single-shard cluster. 
- pub(crate) fn new_single_shard() -> Self { - let cluster = Cluster::new_test_single_shard(); + /// Create a new test with specific general settings. + pub(crate) fn new_with_config(config: &ConfigAndUsers) -> Self { + let cluster = Cluster::new_test(config); Self { cluster, @@ -80,7 +87,7 @@ impl QueryParserTest { updated.config.general.dry_run = true; config::set(updated).unwrap(); // Recreate cluster with the new config - self.cluster = Cluster::new_test(); + self.cluster = Cluster::new_test(&config()); self } @@ -94,6 +101,8 @@ impl QueryParserTest { self } + /// Startup parameters. + + /// Execute a request and return the command (panics on error). pub(crate) fn execute(&mut self, request: Vec) -> Command { self.try_execute(request).expect("execute failed") diff --git a/pgdog/src/frontend/router/parser/query/test/test_bypass.rs b/pgdog/src/frontend/router/parser/query/test/test_bypass.rs new file mode 100644 index 00000000..8c7909ab --- /dev/null +++ b/pgdog/src/frontend/router/parser/query/test/test_bypass.rs @@ -0,0 +1,102 @@ +//! Tests that run when the query parser is disabled +//! and we have only one shard (but we have replicas). +//! +//! QueryParser::query_parser_bypass. +//! 
+use pgdog_config::QueryParserLevel; + +use crate::{ + config::config, + frontend::router::parser::{Error, Shard}, + net::Query, +}; + +use super::setup::QueryParserTest; + +fn setup() -> QueryParserTest { + let mut config = (*config()).clone(); + config.config.general.query_parser = QueryParserLevel::Off; + QueryParserTest::new_single_shard(&config) +} + +fn setup_sharded() -> QueryParserTest { + let mut config = (*config()).clone(); + config.config.general.query_parser = QueryParserLevel::Off; + QueryParserTest::new_with_config(&config) +} + +const QUERIES: &[&str] = &[ + "SELECT 1", + "CREATE TABLE test (id BIGINT)", + "SELECT * FROM test", + "INSERT INTO test (id) VALUES (1)", +]; + +#[tokio::test] +async fn test_replica() { + let mut test = setup().with_param("pgdog.role", "replica"); + + for query in QUERIES { + let result = test.try_execute(vec![Query::new(query).into()]).unwrap(); + assert!(result.route().is_read()); + assert_eq!(result.route().shard(), &Shard::Direct(0)) + } +} + +#[tokio::test] +async fn test_primary() { + let mut test = setup().with_param("pgdog.role", "primary"); + + for query in QUERIES { + let result = test.try_execute(vec![Query::new(query).into()]).unwrap(); + assert!(result.route().is_write()); + assert_eq!(result.route().shard(), &Shard::Direct(0)) + } +} + +#[tokio::test] +async fn test_no_hints() { + let mut test = setup(); + + for query in QUERIES { + let result = test.try_execute(vec![Query::new(query).into()]).unwrap(); + assert!(result.route().is_write()); + assert_eq!(result.route().shard(), &Shard::Direct(0)) + } +} + +#[tokio::test] +async fn test_sharded_with_shard() { + let mut test = setup_sharded().with_param("pgdog.shard", "1"); + + for query in QUERIES { + let result = test.try_execute(vec![Query::new(query).into()]).unwrap(); + assert!(result.route().is_write()); + assert_eq!(result.route().shard(), &Shard::Direct(1)) + } +} + +#[tokio::test] +async fn test_sharded_with_shard_and_replica() { + let mut test = 
setup_sharded() + .with_param("pgdog.shard", "1") + .with_param("pgdog.role", "replica"); + + for query in QUERIES { + let result = test.try_execute(vec![Query::new(query).into()]).unwrap(); + assert!(result.route().is_read()); + assert_eq!(result.route().shard(), &Shard::Direct(1)) + } +} + +#[tokio::test] +async fn test_sharded_no_hints() { + let mut test = setup_sharded(); + + for query in QUERIES { + let result = test + .try_execute(vec![Query::new(query).into()]) + .unwrap_err(); + assert!(matches!(result, Error::QueryParserRequired)); + } +} diff --git a/pgdog/src/frontend/router/parser/query/test/test_sharding.rs b/pgdog/src/frontend/router/parser/query/test/test_sharding.rs index d6511f14..57064a5c 100644 --- a/pgdog/src/frontend/router/parser/query/test/test_sharding.rs +++ b/pgdog/src/frontend/router/parser/query/test/test_sharding.rs @@ -1,5 +1,6 @@ use std::collections::HashSet; +use crate::config::config; use crate::frontend::router::parser::{Cache, Shard}; use crate::frontend::Command; @@ -16,7 +17,7 @@ fn test_show_shards() { #[test] fn test_close_direct_single_shard() { - let mut test = QueryParserTest::new_single_shard(); + let mut test = QueryParserTest::new_single_shard(&config()); let command = test.execute(vec![Close::named("test").into(), Sync.into()]); @@ -28,7 +29,7 @@ fn test_close_direct_single_shard() { #[test] fn test_dry_run_simple() { - let mut test = QueryParserTest::new_single_shard().with_dry_run(); + let mut test = QueryParserTest::new_single_shard(&config()).with_dry_run(); let command = test.execute(vec![Query::new( "/* pgdog_sharding_key: 1234 */ SELECT * FROM sharded",