diff --git a/src/storage/src/source/mysql/replication/events.rs b/src/storage/src/source/mysql/replication/events.rs index b738e5df0ed86..11778b2a96545 100644 --- a/src/storage/src/source/mysql/replication/events.rs +++ b/src/storage/src/source/mysql/replication/events.rs @@ -111,44 +111,68 @@ pub(super) async fn handle_query_event( } } } - // Detect `DROP TABLE [IF EXISTS] , ` statements. Since - // this can drop multiple tables we just check all tables we care about + // Detect `DROP TABLE [IF EXISTS] , ` statements. + // Parse the dropped table names directly from the SQL rather than + // querying MySQL via verify_schemas, because during a piped restore + // (mysql < backup.sql) the table may already be recreated by the time + // verify_schemas opens a connection and queries information_schema. + // + // NOTE: This correctly detects the drop (health transitions to Stalled, + // errored_outputs is updated, new rows are blocked) but the error + // emitted via give_fueled does not reach the query result — SELECT + // still returns the original snapshot data. The error likely gets stuck + // in the reclock/persist pipeline. See PR #35926 for details. (Some("drop"), Some("table")) => { - let mut conn = ctx - .connection_config - .connect( - &format!("timely-{worker_id} MySQL "), - &ctx.config.config.connection_context.ssh_tunnel_manager, - ) - .await?; - let expected = ctx - .table_info - .iter() - .map(|(t, info)| { - ( - t, - info.iter() - .filter(|output| !ctx.errored_outputs.contains(&output.output_index)), - ) - }) - .collect(); - let schema_errors = verify_schemas(&mut *conn, expected).await?; is_complete_event = true; - for (dropped_output, err) in schema_errors { - tracing::info!(%id, "timely-{worker_id} DDL change \ - dropped output: {dropped_output:?}: {err:?}"); - let gtid_cap = ctx.data_cap_set.delayed(new_gtid); - ctx.data_output - .give_fueled( - >id_cap, - ( - (dropped_output.output_index, Err(err.into())), - new_gtid.clone(), - Diff::ONE, - ), - ) - .await; - ctx.errored_outputs.insert(dropped_output.output_index); + let mut remaining: Vec<&str> = query_iter.collect(); + // Strip trailing /* ... */ comments added by the MySQL server + // (e.g. "DROP TABLE IF EXISTS `t` /* generated by server */"). + if let Some(pos) = remaining.iter().position(|t| t.starts_with("/*")) { + remaining.truncate(pos); + } + // Skip optional IF EXISTS tokens. + if remaining + .first() + .is_some_and(|t| t.eq_ignore_ascii_case("if")) + { + remaining.remove(0); + if remaining + .first() + .is_some_and(|t| t.eq_ignore_ascii_case("exists")) + { + remaining.remove(0); + } + } + // Parse comma-separated table names. + let table_list = remaining.join(" "); + for name in table_list.split(',') { + let name = name.trim().trim_end_matches(';'); + if name.is_empty() { + continue; + } + let table = table_ident(name, ¤t_schema)?; + if let Some(outputs) = ctx.table_info.get(&table) { + for output in outputs { + if ctx.errored_outputs.contains(&output.output_index) { + continue; + } + let err = DefiniteError::TableDropped(table.to_string()); + tracing::info!(%id, "timely-{worker_id} DDL change \ + dropped output: {output:?}: {err:?}"); + let gtid_cap = ctx.data_cap_set.delayed(new_gtid); + ctx.data_output + .give_fueled( + >id_cap, + ( + (output.output_index, Err(err.into())), + new_gtid.clone(), + Diff::ONE, + ), + ) + .await; + ctx.errored_outputs.insert(output.output_index); + } + } } } // Detect `TRUNCATE [TABLE] ` statements diff --git a/test/mysql-cdc/drop-recreate/setup.td b/test/mysql-cdc/drop-recreate/setup.td new file mode 100644 index 0000000000000..26c392c3df0fe --- /dev/null +++ b/test/mysql-cdc/drop-recreate/setup.td @@ -0,0 +1,32 @@ +# Copyright Materialize, Inc. and contributors. All rights reserved. +# +# Use of this software is governed by the Business Source License +# included in the LICENSE file at the root of this repository. +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0. + +$ mysql-connect name=mysql url=mysql://root@mysql password=${arg.mysql-root-password} + +$ mysql-execute name=mysql +DROP DATABASE IF EXISTS public; +CREATE DATABASE public; +USE public; +CREATE TABLE drop_recreate (f1 INTEGER, f2 TEXT); +INSERT INTO drop_recreate VALUES (1, 'before'); + +> CREATE SECRET mysqlpass AS '${arg.mysql-root-password}' +> CREATE CONNECTION myconn TO MYSQL ( + HOST mysql, + USER root, + PASSWORD SECRET mysqlpass + ) + +> BEGIN +> CREATE SOURCE mysrc FROM MYSQL CONNECTION myconn; +> CREATE TABLE drop_recreate FROM SOURCE mysrc (REFERENCE public.drop_recreate); +> COMMIT + +> SELECT * FROM drop_recreate; +1 before diff --git a/test/mysql-cdc/drop-recreate/verify.td b/test/mysql-cdc/drop-recreate/verify.td new file mode 100644 index 0000000000000..bfcccc67285b4 --- /dev/null +++ b/test/mysql-cdc/drop-recreate/verify.td @@ -0,0 +1,13 @@ +# Copyright Materialize, Inc. and contributors. All rights reserved. +# +# Use of this software is governed by the Business Source License +# included in the LICENSE file at the root of this repository. +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0. + +# The source should detect the drop and error, even though the table +# was immediately recreated with the same schema. +! SELECT * FROM drop_recreate; +contains:Source error diff --git a/test/mysql-cdc/mzcompose.py b/test/mysql-cdc/mzcompose.py index e4b9ebaf51690..dddd96e67c7de 100644 --- a/test/mysql-cdc/mzcompose.py +++ b/test/mysql-cdc/mzcompose.py @@ -155,6 +155,72 @@ def workflow_replica_connection(c: Composition, parser: WorkflowArgumentParser) ) +def workflow_drop_recreate(c: Composition, parser: WorkflowArgumentParser) -> None: + """ + Regression test for database-issues#7683: DROP TABLE followed by immediate + CREATE TABLE with the same schema (as happens during mysqldump restore). + + When handling a DROP TABLE binlog event, the replication worker opens a new + connection and calls verify_schemas() which queries the CURRENT state of + information_schema. If the table has already been recreated by the time this + query runs (because DROP+CREATE executed back-to-back on the server without + client round-trip delays), verify_schemas sees a matching schema and the + drop goes undetected. + + The race requires the worker to fall behind the binlog stream so that by the + time verify_schemas runs, the table has been recreated. This happens during + mysqldump --all-databases restores because they generate DDL events for many + system tables, each of which triggers a verify_schemas call that takes time + (new connection + query). While the worker processes these events one by one, + MySQL continues executing the restore script, recreating the tracked table. + + This test replicates that scenario: mysqldump + piped restore. + It isolates the same race that mysql-cdc-resumption/backup_restore_mysql + encounters. The fix (parsing DROP TABLE statements directly instead of + calling verify_schemas) also unblocks uncommenting verify-source-failed.td + in that test. + """ + + mysql_version = get_targeted_mysql_version(parser) + with c.override(create_mysql(mysql_version)): + c.up("materialized", "mysql") + c.run_testdrive_files( + f"--var=mysql-root-password={MySql.DEFAULT_ROOT_PASSWORD}", + "drop-recreate/setup.td", + ) + + # Create a mysqldump backup (like the backup_restore_mysql test). + # mysqldump --all-databases generates DROP TABLE + CREATE TABLE for every + # table including system tables, producing a flood of DDL events. + c.exec( + "mysql", + "bash", + "-c", + f"export MYSQL_PWD={MySql.DEFAULT_ROOT_PASSWORD}" + " && mysqldump --all-databases -u root --set-gtid-purged=OFF" + " > /tmp/backup.sql", + ) + + # Restore from the backup via pipe. MySQL processes the entire script + # back-to-back, generating many DDL binlog events. The worker falls + # behind processing verify_schemas calls for each DROP TABLE, and by + # the time it handles the DROP for our tracked table, that table has + # already been recreated. + c.exec( + "mysql", + "bash", + "-c", + f"export MYSQL_PWD={MySql.DEFAULT_ROOT_PASSWORD}" + " && mysql -u root < /tmp/backup.sql", + ) + + with c.override(Testdrive(no_reset=True), create_mysql(mysql_version)): + c.run_testdrive_files( + f"--var=mysql-root-password={MySql.DEFAULT_ROOT_PASSWORD}", + "drop-recreate/verify.td", + ) + + def workflow_schema_change_restart( c: Composition, parser: WorkflowArgumentParser ) -> None: