Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 60 additions & 36 deletions src/storage/src/source/mysql/replication/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,44 +111,68 @@ pub(super) async fn handle_query_event(
}
}
}
// Detect `DROP TABLE [IF EXISTS] <tbl>, <tbl>` statements. Since
// this can drop multiple tables we just check all tables we care about
// Detect `DROP TABLE [IF EXISTS] <tbl>, <tbl>` statements.
// Parse the dropped table names directly from the SQL rather than
// querying MySQL via verify_schemas, because during a piped restore
// (mysql < backup.sql) the table may already be recreated by the time
// verify_schemas opens a connection and queries information_schema.
//
// NOTE: This correctly detects the drop (health transitions to Stalled,
// errored_outputs is updated, new rows are blocked) but the error
// emitted via give_fueled does not reach the query result — SELECT
// still returns the original snapshot data. The error likely gets stuck
// in the reclock/persist pipeline. See PR #35926 for details.
(Some("drop"), Some("table")) => {
let mut conn = ctx
.connection_config
.connect(
&format!("timely-{worker_id} MySQL "),
&ctx.config.config.connection_context.ssh_tunnel_manager,
)
.await?;
let expected = ctx
.table_info
.iter()
.map(|(t, info)| {
(
t,
info.iter()
.filter(|output| !ctx.errored_outputs.contains(&output.output_index)),
)
})
.collect();
let schema_errors = verify_schemas(&mut *conn, expected).await?;
is_complete_event = true;
for (dropped_output, err) in schema_errors {
tracing::info!(%id, "timely-{worker_id} DDL change \
dropped output: {dropped_output:?}: {err:?}");
let gtid_cap = ctx.data_cap_set.delayed(new_gtid);
ctx.data_output
.give_fueled(
&gtid_cap,
(
(dropped_output.output_index, Err(err.into())),
new_gtid.clone(),
Diff::ONE,
),
)
.await;
ctx.errored_outputs.insert(dropped_output.output_index);
let mut remaining: Vec<&str> = query_iter.collect();
// Strip trailing /* ... */ comments added by the MySQL server
// (e.g. "DROP TABLE IF EXISTS `t` /* generated by server */").
if let Some(pos) = remaining.iter().position(|t| t.starts_with("/*")) {
remaining.truncate(pos);
}
// Skip optional IF EXISTS tokens.
if remaining
.first()
.is_some_and(|t| t.eq_ignore_ascii_case("if"))
{
remaining.remove(0);
if remaining
.first()
.is_some_and(|t| t.eq_ignore_ascii_case("exists"))
{
remaining.remove(0);
}
}
// Parse comma-separated table names.
let table_list = remaining.join(" ");
for name in table_list.split(',') {
let name = name.trim().trim_end_matches(';');
if name.is_empty() {
continue;
}
let table = table_ident(name, &current_schema)?;
if let Some(outputs) = ctx.table_info.get(&table) {
for output in outputs {
if ctx.errored_outputs.contains(&output.output_index) {
continue;
}
let err = DefiniteError::TableDropped(table.to_string());
tracing::info!(%id, "timely-{worker_id} DDL change \
dropped output: {output:?}: {err:?}");
let gtid_cap = ctx.data_cap_set.delayed(new_gtid);
ctx.data_output
.give_fueled(
&gtid_cap,
(
(output.output_index, Err(err.into())),
new_gtid.clone(),
Diff::ONE,
),
)
.await;
ctx.errored_outputs.insert(output.output_index);
}
}
}
}
// Detect `TRUNCATE [TABLE] <tbl>` statements
Expand Down
32 changes: 32 additions & 0 deletions test/mysql-cdc/drop-recreate/setup.td
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

$ mysql-connect name=mysql url=mysql://root@mysql password=${arg.mysql-root-password}

$ mysql-execute name=mysql
DROP DATABASE IF EXISTS public;
CREATE DATABASE public;
USE public;
CREATE TABLE drop_recreate (f1 INTEGER, f2 TEXT);
INSERT INTO drop_recreate VALUES (1, 'before');

> CREATE SECRET mysqlpass AS '${arg.mysql-root-password}'
> CREATE CONNECTION myconn TO MYSQL (
HOST mysql,
USER root,
PASSWORD SECRET mysqlpass
)

> BEGIN
> CREATE SOURCE mysrc FROM MYSQL CONNECTION myconn;
> CREATE TABLE drop_recreate FROM SOURCE mysrc (REFERENCE public.drop_recreate);
> COMMIT

> SELECT * FROM drop_recreate;
1 before
13 changes: 13 additions & 0 deletions test/mysql-cdc/drop-recreate/verify.td
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

# The source should detect the drop and error, even though the table
# was immediately recreated with the same schema.
! SELECT * FROM drop_recreate;
contains:Source error
66 changes: 66 additions & 0 deletions test/mysql-cdc/mzcompose.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,72 @@ def workflow_replica_connection(c: Composition, parser: WorkflowArgumentParser)
)


def workflow_drop_recreate(c: Composition, parser: WorkflowArgumentParser) -> None:
"""
Regression test for database-issues#7683: DROP TABLE followed by immediate
CREATE TABLE with the same schema (as happens during mysqldump restore).

When handling a DROP TABLE binlog event, the replication worker opens a new
connection and calls verify_schemas() which queries the CURRENT state of
information_schema. If the table has already been recreated by the time this
query runs (because DROP+CREATE executed back-to-back on the server without
client round-trip delays), verify_schemas sees a matching schema and the
drop goes undetected.

The race requires the worker to fall behind the binlog stream so that by the
time verify_schemas runs, the table has been recreated. This happens during
mysqldump --all-databases restores because they generate DDL events for many
system tables, each of which triggers a verify_schemas call that takes time
(new connection + query). While the worker processes these events one by one,
MySQL continues executing the restore script, recreating the tracked table.

This test replicates that scenario: mysqldump + piped restore.
It isolates the same race that mysql-cdc-resumption/backup_restore_mysql
encounters. The fix (parsing DROP TABLE statements directly instead of
calling verify_schemas) also unblocks uncommenting verify-source-failed.td
in that test.
"""

mysql_version = get_targeted_mysql_version(parser)
with c.override(create_mysql(mysql_version)):
c.up("materialized", "mysql")
c.run_testdrive_files(
f"--var=mysql-root-password={MySql.DEFAULT_ROOT_PASSWORD}",
"drop-recreate/setup.td",
)

# Create a mysqldump backup (like the backup_restore_mysql test).
# mysqldump --all-databases generates DROP TABLE + CREATE TABLE for every
# table including system tables, producing a flood of DDL events.
c.exec(
"mysql",
"bash",
"-c",
f"export MYSQL_PWD={MySql.DEFAULT_ROOT_PASSWORD}"
" && mysqldump --all-databases -u root --set-gtid-purged=OFF"
" > /tmp/backup.sql",
)

# Restore from the backup via pipe. MySQL processes the entire script
# back-to-back, generating many DDL binlog events. The worker falls
# behind processing verify_schemas calls for each DROP TABLE, and by
# the time it handles the DROP for our tracked table, that table has
# already been recreated.
c.exec(
"mysql",
"bash",
"-c",
f"export MYSQL_PWD={MySql.DEFAULT_ROOT_PASSWORD}"
" && mysql -u root < /tmp/backup.sql",
)

with c.override(Testdrive(no_reset=True), create_mysql(mysql_version)):
c.run_testdrive_files(
f"--var=mysql-root-password={MySql.DEFAULT_ROOT_PASSWORD}",
"drop-recreate/verify.td",
)


def workflow_schema_change_restart(
c: Composition, parser: WorkflowArgumentParser
) -> None:
Expand Down
Loading