6 changes: 6 additions & 0 deletions bin/ci-builder
@@ -236,6 +236,8 @@ case "$cmd" in
             --env CANARY_LOADTEST_PASSWORD
             --env CLOUDTEST_CLUSTER_DEFINITION_FILE
             --env COMMON_ANCESTOR_OVERRIDE
+            --env CONFLUENT_CLOUD_QA_CANARY_CSR_PASSWORD
+            --env CONFLUENT_CLOUD_QA_CANARY_CSR_USERNAME
             --env CONFLUENT_CLOUD_QA_CANARY_KAFKA_PASSWORD
             --env CONFLUENT_CLOUD_QA_CANARY_KAFKA_USERNAME
             --env AZURE_SERVICE_ACCOUNT_USERNAME
@@ -257,6 +259,10 @@ case "$cmd" in
             --env PRODUCTION_ANALYTICS_USERNAME
             --env PRODUCTION_ANALYTICS_APP_PASSWORD
             --env PYPI_TOKEN
+            --env QA_CLUSTER_SPEC_SHEET_POSTGRES_HOSTNAME
+            --env QA_CLUSTER_SPEC_SHEET_POSTGRES_PASSWORD
+            --env QA_CLUSTER_SPEC_SHEET_MYSQL_HOSTNAME
+            --env QA_CLUSTER_SPEC_SHEET_MYSQL_PASSWORD
             --env RUST_MIN_STACK
             --env MZ_DEV_BUILD_SHA
             --env MZ_GHCR
2 changes: 1 addition & 1 deletion ci/release-qualification/pipeline.template.yml
@@ -543,7 +543,7 @@ steps:
       - ./ci/plugins/mzcompose:
           composition: cluster-spec-sheet
           run: default
-          args: [--cleanup, --target=cloud-production, cluster]
+          args: [--cleanup, --target=cloud-production, source_ingestion_strong]
     agents:
       queue: linux-aarch64-small

30 changes: 20 additions & 10 deletions src/postgres-util/src/schemas.rs
@@ -11,12 +11,11 @@

 use std::collections::{BTreeMap, BTreeSet};

-use anyhow::anyhow;
 use tokio_postgres::Client;
 use tokio_postgres::types::Oid;

-use crate::PostgresError;
 use crate::desc::{PostgresColumnDesc, PostgresKeyDesc, PostgresSchemaDesc, PostgresTableDesc};
+use crate::{PostgresError, simple_query_opt};

 pub async fn get_schemas(client: &Client) -> Result<Vec<PostgresSchemaDesc>, PostgresError> {
     Ok(client
@@ -32,6 +31,22 @@ pub async fn get_schemas(client: &Client) -> Result<Vec<PostgresSchemaDesc>, Pos
         .collect::<Vec<_>>())
 }

+/// Get the major version of the PostgreSQL server.
+pub async fn get_pg_major_version(client: &Client) -> Result<u32, PostgresError> {
+    // server_version_num is an integer like 140005 for version 14.5.
+    let query = "SHOW server_version_num";
+    let row = simple_query_opt(client, query).await?;
+    let version_num: u32 = row
+        .and_then(|r| r.get("server_version_num").map(|s| s.parse().ok()))
+        .flatten()
+        .ok_or_else(|| {
+            PostgresError::Generic(anyhow::anyhow!("failed to get PostgreSQL version"))
+        })?;
+    // For PG 10+, server_version_num is major * 10000 + minor (140005 = 14.5); older
+    // releases use major * 10000 + minor * 100 + patch, so / 10000 yields the major either way.
+    Ok(version_num / 10000)
+}
+
 /// Fetches table schema information from an upstream Postgres source for tables
 /// that are part of a publication, given a connection string and the
 /// publication name. Returns a map from table OID to table schema information.
@@ -49,12 +64,7 @@ pub async fn publication_info(
     publication: &str,
     oids: Option<&[Oid]>,
 ) -> Result<BTreeMap<Oid, PostgresTableDesc>, PostgresError> {
-    let server_version_num = client
-        .query_one("SHOW server_version_num", &[])
-        .await?
-        .get::<_, &str>("server_version_num")
-        .parse::<i32>()
-        .map_err(|e| PostgresError::Generic(anyhow!("unable to parse server_version_num: {e}")))?;
+    let server_major_version = get_pg_major_version(client).await?;

     client
         .query(
@@ -102,7 +112,7 @@
     // The Postgres replication protocol does not support GENERATED columns
     // so we exclude them from this query. But not all Postgres-like
     // databases have the `pg_attribute.attgenerated` column.
-    let attgenerated = if server_version_num >= 120000 {
+    let attgenerated = if server_major_version >= 12 {
         "a.attgenerated = ''"
     } else {
         "true"
@@ -159,7 +169,7 @@
     // PG 15 adds UNIQUE NULLS NOT DISTINCT, which would let us use `UNIQUE` constraints over
     // nullable columns as keys; i.e. aligns a PG index's NULL handling with an arrangement's
     // keys. For more info, see https://www.postgresql.org/about/featurematrix/detail/392/
-    let nulls_not_distinct = if server_version_num >= 150000 {
+    let nulls_not_distinct = if server_major_version >= 15 {
         "pg_index.indnullsnotdistinct"
     } else {
         "false"
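For reference, a minimal sketch of the version arithmetic this file now relies on (illustrative only, not part of the diff; `major_from_version_num` is a hypothetical stand-in for the division inside `get_pg_major_version`):

```rust
// Illustrative only: mirrors the version_num / 10000 logic above.
fn major_from_version_num(version_num: u32) -> u32 {
    version_num / 10000
}

fn main() {
    assert_eq!(major_from_version_num(140005), 14); // PG 14.5
    assert_eq!(major_from_version_num(120000), 12); // PG 12.0
    assert_eq!(major_from_version_num(90605), 9); // PG 9.6.5 (pre-10 scheme)

    // The derived major then gates version-specific SQL fragments,
    // as in publication_info above.
    let major = major_from_version_num(110012); // PG 11.12
    let attgenerated = if major >= 12 { "a.attgenerated = ''" } else { "true" };
    assert_eq!(attgenerated, "true");
}
```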
5 changes: 3 additions & 2 deletions src/storage/src/source/postgres.rs
@@ -13,8 +13,9 @@
 //! # Snapshot
 //!
 //! One part of the dataflow deals with snapshotting the tables involved in the ingestion. Each
-//! table that needs a snapshot is assigned to a specific worker which performs a `COPY` query
-//! and distributes the raw COPY bytes to all workers to decode the text encoded rows.
+//! table is partitioned across all workers using PostgreSQL's `ctid` column to identify row
+//! ranges. Each worker fetches its assigned range using a `COPY` query with ctid filtering,
+//! enabling parallel snapshotting of large tables.
 //!
 //! For all tables that ended up being snapshotted the snapshot reader also emits a rewind request
 //! to the replication reader which will ensure that the requested portion of the replication
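The rewritten snapshot doc comment describes ctid-range partitioning. Below is a minimal sketch of the kind of bounded `COPY` a worker might issue under that scheme (the table name, block bounds, and exact query shape are assumptions for illustration, not taken from this diff):

```rust
// Hypothetical sketch: one worker's ctid-bounded COPY for its assigned block
// range. ctids are (block, offset) pairs, so bounding by block number carves
// the heap into disjoint ranges that workers can snapshot in parallel.
// Note: range comparisons on tid values require server-side support.
fn copy_query_for_range(table: &str, start_block: u64, end_block: u64) -> String {
    format!(
        "COPY (SELECT * FROM {table} \
         WHERE ctid >= '({start_block},0)'::tid AND ctid < '({end_block},0)'::tid) \
         TO STDOUT"
    )
}

fn main() {
    // Worker 0 of 4 on a table spanning 16384 heap blocks.
    println!("{}", copy_query_for_range("public.events", 0, 4096));
}
```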