Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
131 changes: 131 additions & 0 deletions integration/python/test_psycopg2.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
import psycopg2
import pytest
from globals import admin

queries = [
"SELECT 1",
"CREATE TABLE IF NOT EXISTS test_conn_reads(id BIGINT)",
"INSERT INTO test_conn_reads (id) VALUES (1)",
"INSERT INTO test_conn_reads (id) VALUES (2)",
"INSERT INTO test_conn_reads (id) VALUES (3)",
"SELECT * FROM test_conn_reads WHERE id = 1",
"SELECT * FROM test_conn_reads WHERE id = 2",
"SELECT * FROM test_conn_reads WHERE id = 3",
"SET work_mem TO '4MB'",
"SET work_mem TO '6MB'",
"SET work_mem TO '8MB'",
]


@pytest.fixture
def conn_reads():
return psycopg2.connect(
"host=127.0.0.1 port=6432 user=pgdog password=pgdog "
"options='-c pgdog.role=replica'"
)


@pytest.fixture
def conn_writes():
return psycopg2.connect(
"host=127.0.0.1 port=6432 user=pgdog password=pgdog "
"options='-c pgdog.role=primary'"
)


@pytest.fixture
def conn_default():
return psycopg2.connect("host=127.0.0.1 port=6432 user=pgdog password=pgdog")


def test_conn_writes(conn_writes):
admin().execute("SET query_parser TO 'off'")
for query in queries:
conn_writes.autocommit = True
cursor = conn_writes.cursor()
cursor.execute(query)
admin().execute("SET query_parser TO 'auto'")


def test_conn_reads(conn_reads, conn_writes):
admin().execute("SET query_parser TO 'off'")

conn_writes.autocommit = True
conn_reads.autocommit = True

conn_writes.cursor().execute(
"CREATE TABLE IF NOT EXISTS test_conn_reads(id BIGINT)"
)

read = False
for query in queries:
cursor = conn_reads.cursor()
try:
cursor.execute(query)
except psycopg2.errors.ReadOnlySqlTransaction:
# Some will succeed because we allow reads
# on the primary.
read = True
admin().execute("SET query_parser TO 'auto'")

conn_writes.cursor().execute("DROP TABLE IF EXISTS test_conn_reads")
assert read, "expected some queries to hit replicas and fail"


def test_transactions_writes(conn_writes):
admin().execute("SET query_parser TO 'off'")

for query in queries:
conn_writes.cursor().execute(query)
conn_writes.commit()

admin().execute("SET query_parser TO 'auto'")


def test_transactions_reads(conn_reads):
admin().execute("SET query_parser TO 'off'")
read = False

for query in queries:
try:
conn_reads.cursor().execute(query)
except psycopg2.errors.ReadOnlySqlTransaction:
# Some will succeed because we allow reads
# on the primary.
read = True
conn_reads.commit()

assert read, "expected some queries to hit replicas and fail"
admin().execute("SET query_parser TO 'auto'")


def test_transaction_reads_explicit(conn_reads, conn_writes):
conn_reads.autocommit = True
admin().execute("SET query_parser TO 'off'")

conn_writes.cursor().execute(
"CREATE TABLE IF NOT EXISTS test_conn_reads(id BIGINT)"
)
conn_writes.commit()

cursor = conn_reads.cursor()

read = False

for _ in range(15):
cursor.execute("BEGIN")
try:
cursor.execute("INSERT INTO test_conn_reads (id) VALUES (1)")
cursor.execute("COMMIT")
except psycopg2.errors.ReadOnlySqlTransaction:
read = True
cursor.execute("ROLLBACK")

assert read, "expected some queries to hit replicas and fail"

for _ in range(15):
cursor.execute("BEGIN READ ONLY") # Won't be parsed, doesn't matter to PgDog
cursor.execute("SELECT 1")
cursor.execute("ROLLBACK")

admin().execute("SET query_parser TO 'on'")
39 changes: 32 additions & 7 deletions pgdog-config/src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use tracing::{info, warn};
use crate::sharding::ShardedSchema;
use crate::{
EnumeratedDatabase, Memory, OmnishardedTable, PassthoughAuth, PreparedStatements,
ReadWriteSplit, RewriteMode, Role,
QueryParserLevel, ReadWriteSplit, RewriteMode, Role,
};

use super::database::Database;
Expand Down Expand Up @@ -293,7 +293,7 @@ impl Config {
mappings
}

pub fn check(&self) {
pub fn check(&mut self) {
// Check databases.
let mut duplicate_dbs = HashSet::new();
for database in self.databases.clone() {
Expand All @@ -317,7 +317,9 @@ impl Config {
pooler_mode: Option<PoolerMode>,
role: Role,
role_warned: bool,
parser_warned: bool,
have_replicas: bool,
sharded: bool,
}

// Check identical configs.
Expand All @@ -341,14 +343,30 @@ impl Config {
if !existing.have_replicas {
existing.have_replicas = database.role == Role::Replica;
}
if !existing.sharded {
existing.sharded = database.shard > 0;
}

if (existing.sharded || existing.have_replicas)
&& self.general.query_parser == QueryParserLevel::Off
&& !existing.parser_warned
{
existing.parser_warned = true;
warn!(
r#"database "{}" may need the query parser for load balancing/sharding, but it's disabled"#,
database.name
);
}
} else {
checks.insert(
database.name.clone(),
Check {
pooler_mode: database.pooler_mode,
role: database.role,
role_warned: false,
parser_warned: false,
have_replicas: database.role == Role::Replica,
sharded: database.shard > 0,
},
);
}
Expand Down Expand Up @@ -381,13 +399,15 @@ impl Config {

if !self.general.two_phase_commit && self.rewrite.enabled {
if self.rewrite.shard_key == RewriteMode::Rewrite {
warn!("rewrite.shard_key=rewrite will apply non-atomic shard-key rewrites; enabling two_phase_commit is strongly recommended"
);
warn!(
r#"rewrite.shard_key = "rewrite" may apply non-atomic sharding key rewrites; enabling "two_phase_commit" is strongly recommended"#
);
}

if self.rewrite.split_inserts == RewriteMode::Rewrite {
warn!("rewrite.split_inserts=rewrite may commit partial multi-row INSERTs; enabling two_phase_commit is strongly recommended"
);
warn!(
r#"rewrite.split_inserts = "rewrite" may commit partial multi-row inserts; enabling "two_phase_commit" is strongly recommended"#
);
}
}

Expand All @@ -396,11 +416,16 @@ impl Config {
&& self.general.read_write_split == ReadWriteSplit::ExcludePrimary
{
warn!(
r#"database "{}" has no replicas and read_write_split is set to "{}", read queries will not be served"#,
r#"database "{}" has no replicas and "read_write_split" is set to "{}": read queries will be rejected"#,
database, self.general.read_write_split
);
}
}

if self.general.query_parser_enabled {
warn!(r#""query_parser_enabled" is deprecated, use "query_parser" = "on" instead"#);
self.general.query_parser = QueryParserLevel::On;
}
}

/// Multi-tenancy is enabled.
Expand Down
5 changes: 5 additions & 0 deletions pgdog-config/src/general.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::path::PathBuf;
use std::time::Duration;

use crate::pooling::ConnectionRecovery;
use crate::QueryParserLevel;

use super::auth::{AuthType, PassthoughAuth};
use super::database::{LoadBalancingStrategy, ReadWriteSplit, ReadWriteStrategy};
Expand Down Expand Up @@ -96,6 +97,9 @@ pub struct General {
/// Parse Queries override.
#[serde(default = "General::query_parser_enabled")]
pub query_parser_enabled: bool,
/// Query parser.
#[serde(default)]
pub query_parser: QueryParserLevel,
/// Limit on the number of prepared statements in the server cache.
#[serde(default = "General::prepared_statements_limit")]
pub prepared_statements_limit: usize,
Expand Down Expand Up @@ -219,6 +223,7 @@ impl Default for General {
openmetrics_namespace: Self::openmetrics_namespace(),
prepared_statements: Self::prepared_statements(),
query_parser_enabled: Self::query_parser_enabled(),
query_parser: QueryParserLevel::default(),
prepared_statements_limit: Self::prepared_statements_limit(),
query_cache_limit: Self::query_cache_limit(),
passthrough_auth: Self::default_passthrough_auth(),
Expand Down
9 changes: 9 additions & 0 deletions pgdog-config/src/sharding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -297,3 +297,12 @@ impl ListShards {
}
}
}

#[derive(Serialize, Deserialize, Debug, Copy, Clone, PartialEq, Eq, Hash, Default)]
#[serde(rename_all = "snake_case", deny_unknown_fields)]
pub enum QueryParserLevel {
On,
#[default]
Auto,
Off,
}
4 changes: 4 additions & 0 deletions pgdog/src/admin/set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,10 @@ impl Command for Set {
config.config.general.tls_client_required = Self::from_json(&self.value)?;
}

"query_parser" => {
config.config.general.query_parser = Self::from_json(&self.value)?;
}

_ => return Err(Error::Syntax),
}

Expand Down
Loading
Loading