diff --git a/bin/ci-builder b/bin/ci-builder index 2fc03a500d925..7cd8c2bc3d648 100755 --- a/bin/ci-builder +++ b/bin/ci-builder @@ -236,6 +236,8 @@ case "$cmd" in --env CANARY_LOADTEST_PASSWORD --env CLOUDTEST_CLUSTER_DEFINITION_FILE --env COMMON_ANCESTOR_OVERRIDE + --env CONFLUENT_CLOUD_QA_CANARY_CSR_PASSWORD + --env CONFLUENT_CLOUD_QA_CANARY_CSR_USERNAME --env CONFLUENT_CLOUD_QA_CANARY_KAFKA_PASSWORD --env CONFLUENT_CLOUD_QA_CANARY_KAFKA_USERNAME --env AZURE_SERVICE_ACCOUNT_USERNAME @@ -257,6 +259,10 @@ case "$cmd" in --env PRODUCTION_ANALYTICS_USERNAME --env PRODUCTION_ANALYTICS_APP_PASSWORD --env PYPI_TOKEN + --env QA_CLUSTER_SPEC_SHEET_POSTGRES_HOSTNAME + --env QA_CLUSTER_SPEC_SHEET_POSTGRES_PASSWORD + --env QA_CLUSTER_SPEC_SHEET_MYSQL_HOSTNAME + --env QA_CLUSTER_SPEC_SHEET_MYSQL_PASSWORD --env RUST_MIN_STACK --env MZ_DEV_BUILD_SHA --env MZ_GHCR diff --git a/ci/release-qualification/pipeline.template.yml b/ci/release-qualification/pipeline.template.yml index 1ce7f231d5d91..b95f5b7350f02 100644 --- a/ci/release-qualification/pipeline.template.yml +++ b/ci/release-qualification/pipeline.template.yml @@ -543,7 +543,7 @@ steps: - ./ci/plugins/mzcompose: composition: cluster-spec-sheet run: default - args: [--cleanup, --target=cloud-production, cluster] + args: [--cleanup, --target=cloud-production, source_ingestion_strong] agents: queue: linux-aarch64-small diff --git a/src/postgres-util/src/schemas.rs b/src/postgres-util/src/schemas.rs index ab9c9af6bd7ce..12f1ede1ff5ce 100644 --- a/src/postgres-util/src/schemas.rs +++ b/src/postgres-util/src/schemas.rs @@ -11,12 +11,11 @@ use std::collections::{BTreeMap, BTreeSet}; -use anyhow::anyhow; use tokio_postgres::Client; use tokio_postgres::types::Oid; -use crate::PostgresError; use crate::desc::{PostgresColumnDesc, PostgresKeyDesc, PostgresSchemaDesc, PostgresTableDesc}; +use crate::{PostgresError, simple_query_opt}; pub async fn get_schemas(client: &Client) -> Result, PostgresError> { Ok(client @@ -32,6 +31,22 @@ pub async fn get_schemas(client: &Client) -> Result, Pos .collect::>()) } +/// Get the major version of the PostgreSQL server. +pub async fn get_pg_major_version(client: &Client) -> Result { + // server_version_num is an integer like 140005 for version 14.5 + let query = "SHOW server_version_num"; + let row = simple_query_opt(client, query).await?; + let version_num: u32 = row + .and_then(|r| r.get("server_version_num").map(|s| s.parse().ok())) + .flatten() + .ok_or_else(|| { + PostgresError::Generic(anyhow::anyhow!("failed to get PostgreSQL version")) + })?; + // server_version_num format: XXYYZZ where XX is major, YY is minor, ZZ is patch + // For PG >= 10, it's XXXYYZZ (3 digit major) + Ok(version_num / 10000) +} + /// Fetches table schema information from an upstream Postgres source for tables /// that are part of a publication, given a connection string and the /// publication name. Returns a map from table OID to table schema information. @@ -49,12 +64,7 @@ pub async fn publication_info( publication: &str, oids: Option<&[Oid]>, ) -> Result, PostgresError> { - let server_version_num = client - .query_one("SHOW server_version_num", &[]) - .await? - .get::<_, &str>("server_version_num") - .parse::() - .map_err(|e| PostgresError::Generic(anyhow!("unable to parse server_version_num: {e}")))?; + let server_major_version = get_pg_major_version(client).await?; client .query( @@ -102,7 +112,7 @@ pub async fn publication_info( // The Postgres replication protocol does not support GENERATED columns // so we exclude them from this query. 
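// (Where it exists, `pg_attribute.attgenerated` is `''` for ordinary columns and `'s'` for
// stored generated columns, which is what the filter below relies on.)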
But not all Postgres-like // databases have the `pg_attribute.attgenerated` column. - let attgenerated = if server_version_num >= 120000 { + let attgenerated = if server_major_version >= 12 { "a.attgenerated = ''" } else { "true" @@ -159,7 +169,7 @@ pub async fn publication_info( // PG 15 adds UNIQUE NULLS NOT DISTINCT, which would let us use `UNIQUE` constraints over // nullable columns as keys; i.e. aligns a PG index's NULL handling with an arrangement's // keys. For more info, see https://www.postgresql.org/about/featurematrix/detail/392/ - let nulls_not_distinct = if server_version_num >= 150000 { + let nulls_not_distinct = if server_major_version >= 15 { "pg_index.indnullsnotdistinct" } else { "false" diff --git a/src/storage/src/source/postgres.rs b/src/storage/src/source/postgres.rs index b42ae73bc6365..50750d76f86d6 100644 --- a/src/storage/src/source/postgres.rs +++ b/src/storage/src/source/postgres.rs @@ -13,8 +13,9 @@ //! # Snapshot //! //! One part of the dataflow deals with snapshotting the tables involved in the ingestion. Each -//! table that needs a snapshot is assigned to a specific worker which performs a `COPY` query -//! and distributes the raw COPY bytes to all workers to decode the text encoded rows. +//! table is partitioned across all workers using PostgreSQL's `ctid` column to identify row +//! ranges. Each worker fetches its assigned range using a `COPY` query with ctid filtering, +//! enabling parallel snapshotting of large tables. //! //! For all tables that ended up being snapshotted the snapshot reader also emits a rewind request //! to the replication reader which will ensure that the requested portion of the replication diff --git a/src/storage/src/source/postgres/snapshot.rs b/src/storage/src/source/postgres/snapshot.rs index 9c717bce147d8..1a776eaccbd5a 100644 --- a/src/storage/src/source/postgres/snapshot.rs +++ b/src/storage/src/source/postgres/snapshot.rs @@ -11,9 +11,12 @@ //! //! # Snapshot reading //! -//! Depending on the resumption LSNs the table reader decides which tables need to be snapshotted -//! and performs a simple `COPY` query on them in order to get a snapshot. There are a few subtle -//! points about this operation, described in the following sections. +//! Depending on the resumption LSNs the table reader decides which tables need to be snapshotted. +//! Each table is partitioned across all workers using PostgreSQL's `ctid` (tuple identifier) +//! column, which identifies the physical location of each row. This allows parallel snapshotting +//! of large tables across all available workers. +//! +//! There are a few subtle points about this operation, described in the following sections. //! //! ## Consistent LSN point for snapshot transactions //! @@ -104,25 +107,48 @@ //! with the diffs taken at `t_snapshot` that were also emitted at LSN 0 (by convention) and we end //! up with a TVC that at LSN 0 contains the snapshot at `t_slot`. //! -//! # Snapshot decoding +//! # Parallel table snapshotting with ctid ranges +//! +//! Each table is partitioned across workers using PostgreSQL's `ctid` column. The `ctid` is a +//! tuple identifier of the form `(block_number, tuple_index)` that represents the physical +//! location of a row on disk. By partitioning the ctid range, each worker can independently +//! fetch a portion of the table. +//! +//! The partitioning works as follows: +//! 1. The snapshot leader queries `pg_class.relpages` to estimate the number of blocks for each +//! table. 
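//!    (`relpages` is the planner's page-count statistic, so it is cheap to read but only
//!    approximate.)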
This is much faster than querying `max(ctid)` which would require a sequential scan. +//! 2. The leader broadcasts the block count estimates along with the snapshot transaction ID +//! to all workers, ensuring all workers use consistent estimates for partitioning. +//! 3. Each worker calculates its assigned block range and fetches rows using a `COPY` query +//! with a `SELECT` that filters by `ctid >= start AND ctid < end`. +//! 4. The last worker uses an open-ended range (`ctid >= start`) to capture any rows beyond +//! the estimated block count (handles cases where statistics are stale or table has grown). //! -//! The expectation is that tables will most likely be skewed on the number of rows they contain so -//! while a `COPY` query for any given table runs on a single worker the decoding of the COPY -//! stream is distributed to all workers. +//! This approach efficiently parallelizes large table snapshots while maintaining the benefits +//! of the `COPY` protocol for bulk data transfer. +//! +//! ## PostgreSQL version requirements +//! +//! Ctid range scans are only efficient on PostgreSQL >= 14 due to TID range scan optimizations +//! introduced in that version. For older PostgreSQL versions, the snapshot falls back to the +//! single-worker-per-table mode where each table is assigned to one worker based on consistent +//! hashing. This is implemented by having the leader broadcast all-zero block counts when +//! PostgreSQL version < 14. +//! +//! # Snapshot decoding //! +//! Each worker fetches its ctid range directly and decodes the COPY stream locally. //! //! ```text //! ╭──────────────────╮ //! ┏━━━━━━━━━━━━v━┓ │ exported //! ┃ table ┃ ╭─────────╮ │ snapshot id -//! ┃ reader ┠─>─┤broadcast├──╯ -//! ┗━┯━━━━━━━━━━┯━┛ ╰─────────╯ +//! ┃ readers ┠─>─┤broadcast├──╯ +//! ┃ (parallel) ┃ ╰─────────╯ +//! ┗━┯━━━━━━━━━━┯━┛ //! raw│ │ //! COPY│ │ //! data│ │ -//! ╭────┴─────╮ │ -//! │distribute│ │ -//! ╰────┬─────╯ │ //! ┏━━━━┷━━━━┓ │ //! ┃ COPY ┃ │ //! ┃ decoder ┃ │ @@ -146,6 +172,7 @@ use itertools::Itertools; use mz_ore::cast::CastFrom; use mz_ore::future::InTask; use mz_postgres_util::desc::PostgresTableDesc; +use mz_postgres_util::schemas::get_pg_major_version; use mz_postgres_util::{Client, Config, PostgresError, simple_query_opt}; use mz_repr::{Datum, DatumVec, Diff, Row}; use mz_sql_parser::ast::{Ident, display::AstDisplay}; @@ -157,7 +184,8 @@ use mz_timely_util::builder_async::{ Event as AsyncEvent, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton, }; use timely::container::CapacityContainerBuilder; -use timely::dataflow::channels::pact::{Exchange, Pipeline}; +use timely::container::DrainContainer; +use timely::dataflow::channels::pact::Pipeline; use timely::dataflow::operators::core::Map; use timely::dataflow::operators::{ Broadcast, CapabilitySet, Concat, ConnectLoop, Feedback, Operator, @@ -177,6 +205,139 @@ use crate::source::postgres::{ use crate::source::types::{SignaledFuture, SourceMessage, StackedCollection}; use crate::statistics::SourceStatistics; +/// Information broadcasted from the snapshot leader to all workers. +/// This includes the transaction snapshot ID, LSN, and estimated block counts for each table. +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +struct SnapshotInfo { + /// The exported transaction snapshot identifier. + snapshot_id: String, + /// The LSN at which the snapshot was taken. + snapshot_lsn: MzOffset, + /// Estimated number of blocks (pages) for each table, keyed by OID. 
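+    /// (A count of 0 marks the single-worker fallback for that table: PostgreSQL < 14, an
+    /// apparently empty table, or missing statistics.)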
+ /// This is derived from `pg_class.relpages` and used to partition ctid ranges. + table_block_counts: BTreeMap, + /// The current upstream schema of each table. + upstream_info: BTreeMap, +} + +/// Represents a ctid range that a worker should snapshot. +/// The range is [start_block, end_block) where end_block is optional (None means unbounded). +#[derive(Debug)] +struct CtidRange { + /// The starting block number (inclusive). + start_block: u64, + /// The ending block number (exclusive). None means unbounded (open-ended range). + end_block: Option, +} + +/// Calculate the ctid range for a given worker based on estimated block count. +/// +/// The table is partitioned by block number across all workers. Each worker gets a contiguous +/// range of blocks. The last worker gets an open-ended range to handle any rows beyond the +/// estimated block count. +/// +/// When `estimated_blocks` is 0 (either because statistics are unavailable, the table appears +/// empty, or PostgreSQL version < 14 doesn't support ctid range scans), the table is assigned +/// to a single worker determined by `config.responsible_for(oid)` and that worker scans the +/// full table. +/// +/// Returns None if this worker has no work to do. +fn worker_ctid_range( + config: &RawSourceCreationConfig, + estimated_blocks: u64, + oid: u32, +) -> Option { + // If estimated_blocks is 0, fall back to single-worker mode for this table. + // This handles: + // - PostgreSQL < 14 (ctid range scans not supported) + // - Tables that appear empty in statistics + // - Tables with stale/missing statistics + // The responsible worker scans the full table with an open-ended range. + if estimated_blocks == 0 { + let fallback = if config.responsible_for(oid) { + Some(CtidRange { + start_block: 0, + end_block: None, + }) + } else { + None + }; + return fallback; + } + + let worker_id = u64::cast_from(config.worker_id); + let worker_count = u64::cast_from(config.worker_count); + + // If there are more workers than blocks, only assign work to workers with id < estimated_blocks + // The last assigned worker still gets an open range. + let effective_worker_count = std::cmp::min(worker_count, estimated_blocks); + + if worker_id >= effective_worker_count { + // This worker has no work to do + return None; + } + + // Calculate start block for this worker (integer division distributes blocks evenly) + let start_block = worker_id * estimated_blocks / effective_worker_count; + + // The last effective worker gets an open-ended range + let is_last_effective_worker = worker_id == effective_worker_count - 1; + if is_last_effective_worker { + Some(CtidRange { + start_block, + end_block: None, + }) + } else { + let end_block = (worker_id + 1) * estimated_blocks / effective_worker_count; + Some(CtidRange { + start_block, + end_block: Some(end_block), + }) + } +} + +/// Estimate the number of blocks for each table from pg_class statistics. +/// This is used to partition ctid ranges across workers. 
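///
/// The lookup is a single statistics-only query of the form
/// `SELECT oid, relpages FROM pg_class WHERE oid IN (16384, 16385)` (the OIDs here are
/// illustrative). `relpages` is only refreshed by `VACUUM`, `ANALYZE`, and a few DDL
/// commands, so it can lag the true table size; the open-ended ctid range given to the
/// last worker absorbs that drift.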
+async fn estimate_table_block_counts( + client: &Client, + table_oids: &[u32], +) -> Result, TransientError> { + if table_oids.is_empty() { + return Ok(BTreeMap::new()); + } + + // Query relpages for all tables at once + let oid_list = table_oids + .iter() + .map(|oid| oid.to_string()) + .collect::>() + .join(","); + let query = format!( + "SELECT oid, relpages FROM pg_class WHERE oid IN ({})", + oid_list + ); + + let mut block_counts = BTreeMap::new(); + // Initialize all tables with 0 blocks (in case they're not in pg_class) + for &oid in table_oids { + block_counts.insert(oid, 0); + } + + // Execute the query and collect results + let rows = client.simple_query(&query).await?; + for msg in rows { + if let tokio_postgres::SimpleQueryMessage::Row(row) = msg { + let oid: u32 = row.get("oid").unwrap().parse().unwrap(); + let relpages: i64 = row.get("relpages").unwrap().parse().unwrap_or(0); + // relpages can be -1 if never analyzed, treat as 0 + let relpages = std::cmp::max(0, relpages).try_into().unwrap(); + block_counts.insert(oid, relpages); + } + } + + Ok(block_counts) +} + /// Renders the snapshot dataflow. See the module documentation for more information. pub(crate) fn render>( mut scope: G, @@ -220,9 +381,10 @@ pub(crate) fn render>( // A global view of all outputs that will be snapshot by all workers. let mut all_outputs = vec![]; - // A filtered table info containing only the tables that this worker should snapshot. - let mut worker_table_info = BTreeMap::new(); - // A collecction of `SourceStatistics` to update for a given Oid. Same info exists in reader_table_info, + // Table info for tables that need snapshotting. All workers will snapshot all tables, + // but each worker will handle a different ctid range within each table. + let mut tables_to_snapshot = BTreeMap::new(); + // A collection of `SourceStatistics` to update for a given Oid. Same info exists in table_info, // but this avoids having to iterate + map each time the statistics are needed. let mut export_statistics = BTreeMap::new(); for (table, outputs) in table_info.iter() { @@ -232,12 +394,10 @@ pub(crate) fn render>( continue; } all_outputs.push(output_index); - if config.responsible_for(*table) { - worker_table_info - .entry(*table) - .or_insert_with(BTreeMap::new) - .insert(output_index, output.clone()); - } + tables_to_snapshot + .entry(*table) + .or_insert_with(BTreeMap::new) + .insert(output_index, output.clone()); let statistics = config .statistics .get(&output.export_id) @@ -264,7 +424,7 @@ pub(crate) fn render>( %id, "timely-{worker_id} initializing table reader \ with {} tables to snapshot", - worker_table_info.len() + tables_to_snapshot.len() ); let connection_config = connection @@ -313,11 +473,91 @@ pub(crate) fn render>( statistics.set_snapshot_records_staged(0); } + // Collect table OIDs for block count estimation + let table_oids: Vec = tables_to_snapshot.keys().copied().collect(); + // replication client is only set if this worker is the snapshot leader let client = match replication_client { Some(client) => { let tmp_slot = format!("mzsnapshot_{}", uuid::Uuid::new_v4()).replace('-', ""); - let snapshot_info = export_snapshot(&client, &tmp_slot, true).await?; + let (snapshot_id, snapshot_lsn) = export_snapshot(&client, &tmp_slot, true).await?; + + // Check PostgreSQL version. Ctid range scans are only efficient on PG >= 14 + // due to improvements in TID range scan support. + let pg_version = get_pg_major_version(&client).await?; + + // Estimate block counts for all tables from pg_class statistics. 
+ // This must be done by the leader and broadcasted to ensure all workers + // use the same estimates for ctid range partitioning. + // + // For PostgreSQL < 14, we set all block counts to 0 to fall back to + // single-worker-per-table mode, as ctid range scans are not well supported. + let table_block_counts = if pg_version >= 14 { + estimate_table_block_counts(&client, &table_oids).await? + } else { + trace!( + %id, + "timely-{worker_id} PostgreSQL version {pg_version} < 14, \ + falling back to single-worker-per-table snapshot mode" + ); + // Return all zeros to trigger fallback mode + table_oids.iter().map(|&oid| (oid, 0u64)).collect() + }; + + report_snapshot_size(&client, &tables_to_snapshot, metrics, &config, &export_statistics).await?; + + let upstream_info = { + let table_oids = tables_to_snapshot.keys().copied().collect::>(); + // As part of retrieving the schema info, RLS policies are checked to ensure the + // snapshot can successfully read the tables. RLS policy errors are treated as + // transient, as the customer can simply add the BYPASSRLS to the PG account + // used by MZ. + match retrieve_schema_info( + &connection_config, + &config.config.connection_context, + &connection.publication, + &table_oids) + .await + { + // If the replication stream cannot be obtained in a definite way there is + // nothing else to do. These errors are not retractable. + Err(PostgresError::PublicationMissing(publication)) => { + let err = DefiniteError::PublicationDropped(publication); + for (oid, outputs) in tables_to_snapshot.iter() { + // Produce a definite error here and then exit to ensure + // a missing publication doesn't generate a transient + // error and restart this dataflow indefinitely. + // + // We pick `u64::MAX` as the LSN which will (in + // practice) never conflict any previously revealed + // portions of the TVC. + for output_index in outputs.keys() { + let update = ( + (*oid, *output_index, Err(err.clone().into())), + MzOffset::from(u64::MAX), + Diff::ONE, + ); + raw_handle.give_fueled(&data_cap_set[0], update).await; + } + } + + definite_error_handle.give( + &definite_error_cap_set[0], + ReplicationError::Definite(Rc::new(err)), + ); + return Ok(()); + }, + Err(e) => Err(TransientError::from(e))?, + Ok(i) => i, + } + }; + + let snapshot_info = SnapshotInfo { + snapshot_id, + snapshot_lsn, + upstream_info, + table_block_counts, + }; trace!( %id, "timely-{worker_id} exporting snapshot info {snapshot_info:?}"); @@ -349,7 +589,7 @@ pub(crate) fn render>( ) .await?; - let (snapshot, snapshot_lsn) = loop { + let snapshot_info = loop { match snapshot_input.next().await { Some(AsyncEvent::Data(_, mut data)) => { break data.pop().expect("snapshot sent above") @@ -361,62 +601,20 @@ pub(crate) fn render>( ), } }; + let SnapshotInfo { + snapshot_id, + snapshot_lsn, + table_block_counts, + upstream_info, + } = snapshot_info; + // Snapshot leader is already in identified transaction but all other workers need to enter it. if !is_snapshot_leader { - trace!(%id, "timely-{worker_id} using snapshot id {snapshot:?}"); - use_snapshot(&client, &snapshot).await?; + trace!(%id, "timely-{worker_id} using snapshot id {snapshot_id:?}"); + use_snapshot(&client, &snapshot_id).await?; } - - let upstream_info = { - let table_oids = worker_table_info.keys().copied().collect::>(); - // As part of retrieving the schema info, RLS policies are checked to ensure the - // snapshot can successfully read the tables. 
RLS policy errors are treated as - // transient, as the customer can simply add the BYPASSRLS to the PG account - // used by MZ. - match retrieve_schema_info( - &connection_config, - &config.config.connection_context, - &connection.publication, - &table_oids) - .await - { - // If the replication stream cannot be obtained in a definite way there is - // nothing else to do. These errors are not retractable. - Err(PostgresError::PublicationMissing(publication)) => { - let err = DefiniteError::PublicationDropped(publication); - for (oid, outputs) in worker_table_info.iter() { - // Produce a definite error here and then exit to ensure - // a missing publication doesn't generate a transient - // error and restart this dataflow indefinitely. - // - // We pick `u64::MAX` as the LSN which will (in - // practice) never conflict any previously revealed - // portions of the TVC. - for output_index in outputs.keys() { - let update = ( - (*oid, *output_index, Err(err.clone().into())), - MzOffset::from(u64::MAX), - Diff::ONE, - ); - raw_handle.give_fueled(&data_cap_set[0], update).await; - } - } - - definite_error_handle.give( - &definite_error_cap_set[0], - ReplicationError::Definite(Rc::new(err)), - ); - return Ok(()); - }, - Err(e) => Err(TransientError::from(e))?, - Ok(i) => i, - } - }; - - report_snapshot_size(&client, &worker_table_info, metrics, &config, &export_statistics).await?; - - for (&oid, outputs) in worker_table_info.iter() { + for (&oid, outputs) in tables_to_snapshot.iter() { for (&output_index, info) in outputs.iter() { if let Err(err) = verify_schema(oid, info, &upstream_info) { raw_handle @@ -432,10 +630,28 @@ pub(crate) fn render>( continue; } + // Get estimated block count from the broadcasted table statistics + let block_count = table_block_counts.get(&oid).copied().unwrap_or(0); + + // Calculate this worker's ctid range based on estimated blocks. + // When estimated_blocks is 0 (PG < 14 or empty table), fall back to + // single-worker mode using responsible_for to pick the worker. 
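+                        // For example (illustrative numbers): with 4 workers and an estimated
+                        // 1000 blocks, worker 0 scans blocks [0, 250), worker 1 scans [250, 500),
+                        // worker 2 scans [500, 750), and worker 3 takes the open-ended range
+                        // starting at block 750 so that rows past the estimate are still captured.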
+ let Some(ctid_range) = worker_ctid_range(&config, block_count, oid) else { + // This worker has no work for this table (more workers than blocks) + trace!( + %id, + "timely-{worker_id} no ctid range assigned for table {:?}({oid})", + info.desc.name + ); + continue; + }; + trace!( %id, - "timely-{worker_id} snapshotting table {:?}({oid}) @ {snapshot_lsn}", - info.desc.name + "timely-{worker_id} snapshotting table {:?}({oid}) output {output_index} \ + @ {snapshot_lsn} with ctid range {:?}", + info.desc.name, + ctid_range ); // To handle quoted/keyword names, we can use `Ident`'s AST printing, which @@ -448,8 +664,19 @@ pub(crate) fn render>( .iter() .map(|c| Ident::new_unchecked(&c.name).to_ast_string_stable()) .join(","); - let query = format!("COPY {namespace}.{table} ({column_list}) \ - TO STDOUT (FORMAT TEXT, DELIMITER '\t')"); + + + let ctid_filter = match ctid_range.end_block { + Some(end) => format!( + "WHERE ctid >= '({},0)'::tid AND ctid < '({},0)'::tid", + ctid_range.start_block, end + ), + None => format!("WHERE ctid >= '({},0)'::tid", ctid_range.start_block), + }; + let query = format!( + "COPY (SELECT {column_list} FROM {namespace}.{table} {ctid_filter}) \ + TO STDOUT (FORMAT TEXT, DELIMITER '\t')" + ); let mut stream = pin!(client.copy_out_simple(&query).await?); let mut snapshot_staged = 0; @@ -466,7 +693,6 @@ pub(crate) fn render>( } // final update for snapshot_staged, using the staged values as the total is an estimate export_statistics[&(oid, output_index)].set_snapshot_records_staged(snapshot_staged); - export_statistics[&(oid, output_index)].set_snapshot_records_known(snapshot_staged); } } @@ -474,10 +700,17 @@ pub(crate) fn render>( // that this happens after the snapshot has finished because this is what unblocks the // replication operator and we want this to happen serially. It might seem like a good // idea to read the replication stream concurrently with the snapshot but it actually - // leads to a lot of data being staged for the future, which needlesly consumed memory + // leads to a lot of data being staged for the future, which needlessly consumed memory // in the cluster. - for output in worker_table_info.values() { + // + // Since all workers now snapshot all tables (each with different ctid ranges), we only + // emit rewind requests from the worker responsible for each output to avoid duplicates. + for (&oid, output) in tables_to_snapshot.iter() { for (output_index, info) in output { + // Only emit rewind request from one worker per output + if !config.responsible_for((oid, *output_index)) { + continue; + } trace!(%id, "timely-{worker_id} producing rewind request for table {} output {output_index}", info.desc.name); let req = RewindRequest { output_index: *output_index, snapshot_lsn }; rewinds_handle.give(&rewind_cap_set[0], req); @@ -512,24 +745,17 @@ pub(crate) fn render>( let mut text_row = Row::default(); let mut final_row = Row::default(); let mut datum_vec = DatumVec::new(); - let mut next_worker = (0..u64::cast_from(scope.peers())) - // Round robin on 1000-records basis to avoid creating tiny containers when there are a - // small number of updates and a large number of workers. 
- .flat_map(|w| std::iter::repeat_n(w, 1000)) - .cycle(); - let round_robin = Exchange::new(move |_| next_worker.next().unwrap()); let snapshot_updates = raw_data - .map::, _, _>(Clone::clone) - .unary(round_robin, "PgCastSnapshotRows", |_, _| { + .unary(Pipeline, "PgCastSnapshotRows", |_, _| { move |input, output| { input.for_each_time(|time, data| { let mut session = output.session(&time); for ((oid, output_index, event), time, diff) in - data.flat_map(|data| data.drain(..)) + data.flat_map(|data| data.drain()) { let output = &table_info - .get(&oid) - .and_then(|outputs| outputs.get(&output_index)) + .get(oid) + .and_then(|outputs| outputs.get(output_index)) .expect("table_info contains all outputs"); let event = event @@ -546,7 +772,7 @@ pub(crate) fn render>( }) }); - session.give(((output_index, event), time, diff)); + session.give(((*output_index, event), *time, *diff)); } }); } @@ -662,7 +888,7 @@ fn decode_copy_row(data: &[u8], col_len: usize, row: &mut Row) -> Result<(), Def /// Record the sizes of the tables being snapshotted in `PgSnapshotMetrics` and emit snapshot statistics for each export. async fn report_snapshot_size( client: &Client, - worker_table_info: &BTreeMap>, + tables_to_snapshot: &BTreeMap>, metrics: PgSnapshotMetrics, config: &RawSourceCreationConfig, export_statistics: &BTreeMap<(u32, usize), SourceStatistics>, @@ -670,7 +896,7 @@ async fn report_snapshot_size( // TODO(guswynn): delete unused configs let snapshot_config = config.config.parameters.pg_snapshot_config; - for (&oid, outputs) in worker_table_info { + for (&oid, outputs) in tables_to_snapshot { // Use the first output's desc to make the table name since it is the same for all outputs let Some((_, info)) = outputs.first_key_value() else { continue; diff --git a/test/cluster-spec-sheet/mzcompose.py b/test/cluster-spec-sheet/mzcompose.py index 6eb4c5447ca24..c6ef7b36d8e79 100644 --- a/test/cluster-spec-sheet/mzcompose.py +++ b/test/cluster-spec-sheet/mzcompose.py @@ -88,6 +88,7 @@ SCENARIO_TPCH_QUERIES_STRONG = "tpch_queries_strong" SCENARIO_TPCH_QUERIES_WEAK = "tpch_queries_weak" SCENARIO_TPCH_STRONG = "tpch_strong" +SCENARIO_SOURCE_INGESTION_STRONG = "source_ingestion_strong" SCENARIO_QPS_ENVD_STRONG_SCALING = "qps_envd_strong_scaling" SCENARIOS_CLUSTERD = [ @@ -97,6 +98,7 @@ SCENARIO_TPCH_QUERIES_STRONG, SCENARIO_TPCH_QUERIES_WEAK, SCENARIO_TPCH_STRONG, + SCENARIO_SOURCE_INGESTION_STRONG, ] SCENARIOS_ENVIRONMENTD = [ SCENARIO_QPS_ENVD_STRONG_SCALING, @@ -196,7 +198,8 @@ def add_result( def run_query(self, query: str, fetch: bool = False, **params) -> list | None: query = dedent(query).strip() - print(f"> {query} {params or ''}") + if "CREATE SECRET" not in query: + print(f"> {query} {params or ''}") with self.connection as cur: cur.execute(query.encode(), params) if fetch: @@ -209,7 +212,7 @@ def measure( category: str, name: str, setup: list[str], - query: list[str], + query: list[str | tuple[str, list[tuple]]], after: list[str] = [], repetitions: int = 1, size_of_index: str | None = None, @@ -226,7 +229,13 @@ def inner() -> None: time.sleep(setup_delay) start_time = time.time() for query_part in query: - self.run_query(query_part) + if isinstance(query_part, str): + self.run_query(query_part) + else: + q = query_part[0] + expected = query_part[1] + actual = self.run_query(q, fetch=True) + assert actual == expected, f"Expected {expected}, got {actual}" end_time = time.time() if size_of_index: # We need to wait for the introspection source to catch up. 
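A minimal sketch of how the extended `query` parameter can be used inside a scenario's `run(runner)` method: plain strings are executed as before, while a `(query, expected_rows)` tuple is fetched and asserted against. The table name and values below are made up for illustration; the real usage appears in `SourceIngestionScenario` further down.

```python
runner.measure(
    "hydration",
    "example",
    setup=["DROP TABLE IF EXISTS example_table CASCADE;"],
    query=[
        # Plain strings: executed without fetching results.
        "CREATE TABLE example_table (a int);",
        "INSERT INTO example_table VALUES (1), (2), (3);",
        # Tuple: the query is fetched and compared against the expected rows.
        ("SELECT count(*) FROM example_table;", [(3,)]),
    ],
)
```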
@@ -1912,6 +1921,160 @@ def run(self, runner: ScenarioRunner) -> None: # We'll also want to measure latency, including tail latency. +class SourceIngestionScenario(Scenario): + def name(self) -> str: + return "source_ingestion" + + def materialize_views(self) -> list[str]: + return [] + + def drop(self) -> list[str]: + return [] + + def setup(self) -> list[str]: + # External setup was done once: + # Postgres (RDS) + # create user materialize password '...'; + # create table tbl (customer_id int, region_id int, customer_name text, customer_email text, customer_phone text); + # \set N 50000000 + # set synchronous_commit = off; + # insert into tbl + # select + # gs::int as customer_id, + # (1 + (random()*999)::int) as region_id, + # 'Customer ' || gs as customer_name, + # 'customer' || gs || '@example.com' as customer_email, + # lpad((random()*10000000000)::bigint::text, 10, '0') as customer_phone + # from generate_series(1, :N) as gs; + # analyze tbl; + + # MySQL (RDS) + # CREATE USER 'materialize'@'%' IDENTIFIED BY '...'; + # create table tbl (customer_id int, region_id int, customer_name text, customer_email text, customer_phone text); + # INSERT INTO tbl + # SELECT + # n AS customer_id, + # 1 + (RAND() * 999) AS region_id, + # CONCAT('Customer ', n) AS customer_name, + # CONCAT('customer', n, '@example.com') AS customer_email, + # LPAD(FLOOR(RAND() * 10000000000), 10, '0') AS customer_phone + # FROM ( + # SELECT (a.n + b.n*10 + c.n*100 + d.n*1000 + e.n*10000 + f.n*100000 + g.n*1000000 + h.n*10000000) AS n + # FROM (SELECT 0 n UNION ALL SELECT 1 UNION ALL SELECT 2 UNION ALL SELECT 3 UNION ALL SELECT 4 + # UNION ALL SELECT 5 UNION ALL SELECT 6 UNION ALL SELECT 7 UNION ALL SELECT 8 UNION ALL SELECT 9) a + # CROSS JOIN (SELECT 0 n UNION ALL SELECT 1 UNION ALL SELECT 2 UNION ALL SELECT 3 UNION ALL SELECT 4 + # UNION ALL SELECT 5 UNION ALL SELECT 6 UNION ALL SELECT 7 UNION ALL SELECT 8 UNION ALL SELECT 9) b + # CROSS JOIN (SELECT 0 n UNION ALL SELECT 1 UNION ALL SELECT 2 UNION ALL SELECT 3 UNION ALL SELECT 4 + # UNION ALL SELECT 5 UNION ALL SELECT 6 UNION ALL SELECT 7 UNION ALL SELECT 8 UNION ALL SELECT 9) c + # CROSS JOIN (SELECT 0 n UNION ALL SELECT 1 UNION ALL SELECT 2 UNION ALL SELECT 3 UNION ALL SELECT 4 + # UNION ALL SELECT 5 UNION ALL SELECT 6 UNION ALL SELECT 7 UNION ALL SELECT 8 UNION ALL SELECT 9) d + # CROSS JOIN (SELECT 0 n UNION ALL SELECT 1 UNION ALL SELECT 2 UNION ALL SELECT 3 UNION ALL SELECT 4 + # UNION ALL SELECT 5 UNION ALL SELECT 6 UNION ALL SELECT 7 UNION ALL SELECT 8 UNION ALL SELECT 9) e + # CROSS JOIN (SELECT 0 n UNION ALL SELECT 1 UNION ALL SELECT 2 UNION ALL SELECT 3 UNION ALL SELECT 4 + # UNION ALL SELECT 5 UNION ALL SELECT 6 UNION ALL SELECT 7 UNION ALL SELECT 8 UNION ALL SELECT 9) f + # CROSS JOIN (SELECT 0 n UNION ALL SELECT 1 UNION ALL SELECT 2 UNION ALL SELECT 3 UNION ALL SELECT 4 + # UNION ALL SELECT 5 UNION ALL SELECT 6 UNION ALL SELECT 7 UNION ALL SELECT 8 UNION ALL SELECT 9) g + # CROSS JOIN (SELECT 0 n UNION ALL SELECT 1 UNION ALL SELECT 2 UNION ALL SELECT 3 UNION ALL SELECT 4 + # UNION ALL SELECT 5 UNION ALL SELECT 6 UNION ALL SELECT 7 UNION ALL SELECT 8 UNION ALL SELECT 9) h + # ) nums + # WHERE n BETWEEN 1 AND 50000000; + + # Kafka (Confluent) + # node /usr/local/bin/datagen -f avro -n 50000000 -w 0 -p qa_cluster_spec_sheet -s table.json + # cat table.json + # [ + # { + # "_meta": { + # "topic": "table", + # "key": "customer_id" + # }, + # "customer_id": "iteration.index", + # "region_id": "faker.number.int({ min: 1, max: 4 })", + # "customer_name": 
"faker.person.fullName()", + # "customer_email": "faker.internet.email()", + # "customer_phone": "faker.phone.number()" + # } + # ] + + postgres_hostname = os.environ[ + "QA_CLUSTER_SPEC_SHEET_POSTGRES_HOSTNAME" + ].replace("%", "%%") + postgres_password = os.environ[ + "QA_CLUSTER_SPEC_SHEET_POSTGRES_PASSWORD" + ].replace("%", "%%") + mysql_hostname = os.environ["QA_CLUSTER_SPEC_SHEET_MYSQL_HOSTNAME"].replace( + "%", "%%" + ) + mysql_password = os.environ["QA_CLUSTER_SPEC_SHEET_MYSQL_PASSWORD"].replace( + "%", "%%" + ) + kafka_username = os.environ["CONFLUENT_CLOUD_QA_CANARY_KAFKA_USERNAME"].replace( + "%", "%%" + ) + kafka_password = os.environ["CONFLUENT_CLOUD_QA_CANARY_KAFKA_PASSWORD"].replace( + "%", "%%" + ) + csr_username = os.environ["CONFLUENT_CLOUD_QA_CANARY_CSR_USERNAME"].replace( + "%", "%%" + ) + csr_password = os.environ["CONFLUENT_CLOUD_QA_CANARY_CSR_PASSWORD"].replace( + "%", "%%" + ) + return [ + "DROP CONNECTION IF EXISTS pg_conn CASCADE;", + "DROP CONNECTION IF EXISTS mysql_conn CASCADE;", + "DROP CONNECTION IF EXISTS kafka_conn CASCADE;", + "DROP CONNECTION IF EXISTS csr_conn CASCADE;", + f"CREATE SECRET IF NOT EXISTS pgpass AS '{postgres_password}';", + f"CREATE CONNECTION pg_conn TO postgres (HOST '{postgres_hostname}', PORT 5432, USER materialize, PASSWORD SECRET pgpass, SSL MODE 'require', DATABASE 'postgres');", + f"CREATE SECRET IF NOT EXISTS mysqlpass AS '{mysql_password}';", + f"CREATE CONNECTION mysql_conn TO MYSQL (HOST '{mysql_hostname}', PORT 3306, USER 'materialize', PASSWORD SECRET mysqlpass, SSL MODE REQUIRED);", + f"CREATE SECRET IF NOT EXISTS kafka_username AS '{kafka_username}';", + f"CREATE SECRET IF NOT EXISTS kafka_password AS '{kafka_password}';", + "CREATE CONNECTION kafka_conn TO KAFKA (BROKER 'pkc-oxqxx9.us-east-1.aws.confluent.cloud:9092', SASL MECHANISMS = 'PLAIN', SASL USERNAME = SECRET kafka_username, SASL PASSWORD = SECRET kafka_password);", + f"CREATE SECRET IF NOT EXISTS csr_username AS '{csr_username}';", + f"CREATE SECRET IF NOT EXISTS csr_password AS '{csr_password}';", + "CREATE CONNECTION csr_conn TO CONFLUENT SCHEMA REGISTRY (URL 'https://psrc-e0919.us-east-2.aws.confluent.cloud', USERNAME = SECRET csr_username, PASSWORD = SECRET csr_password);", + ] + + def run(self, runner: ScenarioRunner) -> None: + runner.measure( + "hydration", + "postgres", + setup=["DROP SOURCE IF EXISTS pg_source CASCADE;"], + query=[ + "CREATE SOURCE pg_source IN CLUSTER c FROM POSTGRES CONNECTION pg_conn (PUBLICATION 'mz_source') FOR TABLES (tbl AS pg_table);", + # TODO: Use `CREATE TABLE FROM SOURCE` once supported in prod. + # "CREATE TABLE pg_table FROM SOURCE qa_cluster_spec_sheet_pg_source (REFERENCE tbl);", + ("SELECT count(*) FROM pg_table;", [(50000000,)]), + ], + ) + + runner.measure( + "hydration", + "mysql", + setup=["DROP SOURCE IF EXISTS mysql_source CASCADE;"], + query=[ + "CREATE SOURCE mysql_source IN CLUSTER c FROM MYSQL CONNECTION mysql_conn FOR TABLES (admin.tbl AS mysql_table);", + # TODO: Use `CREATE TABLE FROM SOURCE` once supported in prod. 
+ # "CREATE TABLE mysql_table FROM SOURCE mysql_source (REFERENCE admin.tbl);", + ("SELECT count(*) FROM mysql_table;", [(50000000,)]), + ], + ) + + runner.measure( + "hydration", + "kafka", + setup=["DROP SOURCE IF EXISTS kafka_table CASCADE;"], + query=[ + "CREATE SOURCE kafka_table IN CLUSTER c FROM KAFKA CONNECTION kafka_conn (TOPIC 'qa_cluster_spec_sheet_table') FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn ENVELOPE NONE;", + # TODO: Use `CREATE TABLE FROM SOURCE` once supported in prod. + # "CREATE TABLE kafka_table FROM SOURCE kafka_source;", + ("SELECT count(*) FROM kafka_table;", [(50000000,)]), + ], + ) + + # TODO: We should factor out the below # `disable_region`, `cloud_disable_enable_and_wait`, `reconfigure_envd_cpus`, `wait_for_envd` # functions into a separate module. (Similar `disable_region` functions also occur in other tests.) @@ -2286,6 +2449,17 @@ def process(scenario: str) -> None: target=target, max_scale=max_scale, ) + if scenario == SCENARIO_SOURCE_INGESTION_STRONG: + print("--- SCENARIO: Running Source ingestion scaling") + run_scenario_strong( + scenario=SourceIngestionScenario( + args.scale_auction, target.replica_size_for_scale(1) + ), + results_writer=cluster_writer, + connection=conn, + target=target, + max_scale=max_scale, + ) if scenario == SCENARIO_QPS_ENVD_STRONG_SCALING: print("--- SCENARIO: Running QPS envd strong scaling") run_scenario_envd_strong_scaling( @@ -2429,7 +2603,14 @@ def replica_size_for_scale(self, scale: int) -> str: """ Returns the replica size for a given scale. """ - return f"{scale}00cc" + return { + 1: "M.1-xsmall", + 2: "M.1-small", + 4: "M.1-large", + 8: "M.1-2xlarge", + 16: "M.1-4xlarge", + 32: "M.1-8xlarge", + }[scale] class DockerTarget(BenchTarget): @@ -2464,7 +2645,7 @@ def cleanup(self) -> None: self.composition.stop("materialized") def replica_size_for_scale(self, scale: int) -> str: - # 100cc == 2 workers + # M.1-xsmall == 2 workers return f"scale=1,workers={2*scale}" def max_scale(self) -> int | None: @@ -2736,19 +2917,14 @@ def analyze_cluster_results_file(file: str) -> None: print(f"--- Analyzing cluster results file {file} ...") def extract_cluster_size(s: str) -> float: - match = re.search(r"(\d+)(?:(cc)|(C))", s) - if match: - if match.group(2): # 'cc' match - return float(match.group(1)) / 100.0 - elif match.group(3): # 'C' matches - return float(match.group(1)) - match = re.search(r"(?:scale=)(\d+)(?:,workers=)(\d+)", s) - if match: - # We don't have credits in docker, so approximate it - # 100cc == 2 workers - if match.group(1) and match.group(2): - return float(match.group(1)) * float(match.group(2)) / 2 - raise ValueError(f"Invalid cluster size format: {s}") + return { + "M.1-xsmall": 1, + "M.1-small": 2, + "M.1-large": 4, + "M.1-2xlarge": 8, + "M.1-4xlarge": 16, + "M.1-8xlarge": 32, + }[s] df = pd.read_csv(file) if df.empty: