diff --git a/src/current/v25.4/manage-logical-data-replication.md b/src/current/v25.4/manage-logical-data-replication.md index 3debd4bf7c4..c7f21c2ca52 100644 --- a/src/current/v25.4/manage-logical-data-replication.md +++ b/src/current/v25.4/manage-logical-data-replication.md @@ -33,13 +33,7 @@ When a conflict cannot apply due to violating [constraints]({% link {{ page.vers ### Dead letter queue (DLQ) -When the LDR job starts, it will create a DLQ table with each replicating table so that unresolved conflicts can be tracked. The DLQ will contain the writes that LDR cannot apply after the retry period of a minute, which could occur if there is a unique index on the destination table (for more details, refer to [Unique seconday indexes]({% link {{ page.version.version }}/set-up-logical-data-replication.md %}#unique-secondary-indexes)). - -{{site.data.alerts.callout_info}} -LDR will not pause when the writes are sent to the DLQ, you must manage the DLQ manually. -{{site.data.alerts.end}} - -To manage the DLQ, you can evaluate entries in the `incoming_row` column and apply the row manually to another table with SQL statements. +When the LDR job starts, it creates a DLQ table with each replicating table so that unresolved conflicts can be tracked. The DLQ contains the writes that LDR cannot apply after the retry period of a minute, which could occur if there is a unique index on the destination table (for more details, refer to [Unique secondary indexes]({% link {{ page.version.version }}/set-up-logical-data-replication.md %}#unique-secondary-indexes)). As an example, for an LDR stream created on the `movr.public.promo_codes` table: @@ -80,6 +74,82 @@ CONSTRAINT dlq_113_public_promo_codes_pkey PRIMARY KEY (ingestion_job_id ASC, dl ) ~~~ +#### Manage entries in the DLQ + +LDR does not pause when writes are sent to the DLQ. You must manage the DLQ manually by examining each entry in the DLQ and either reinserting the entry or deleting it from the DLQ. 
If you have multiple DLQ entries, manage them in order from most recent to least recent.
+
+To manage an entry in the DLQ:
+
+1. In the destination database's DLQ table, examine the `incoming_row` column to find the primary key and values for the entry.
+
+    {% include_cached copy-clipboard.html %}
+    ~~~ sql
+    -- On the destination database:
+    SELECT id, dlq_timestamp, incoming_row FROM crdb_replication.dlq_271_foo;
+    ~~~
+
+    In this example result, `incoming_row` contains a primary key of `207` identified by the column `my_id`, as well as the values of the entry's columns `created_at` and `payload`.
+
+    ~~~
+           id       |         dlq_timestamp         |                                  incoming_row
+    ----------------+-------------------------------+------------------------------------------------------------------------------------------
+    106677386757203 | 2025-04-25 15:36:28.435439+00 | {"created_at": "2025-04-25 15:35:00.499499", "payload": "updated_value", "my_id": 207}
+    ~~~
+
+1. Determine whether the values for the entry in the DLQ match the values for the entry in the destination table and the source table respectively:
+
+    1. On the destination database, check the values for the entry and the replicated time:
+
+        {% include_cached copy-clipboard.html %}
+        ~~~ sql
+        -- On the destination database:
+        SELECT * FROM destDB.foo WHERE my_id = 207;
+
+        WITH t AS (SHOW LOGICAL REPLICATION JOBS)
+        SELECT job_id, replicated_time FROM t;
+        ~~~
+
+    1. On the source database, check the values for the entry as of the replicated time:
+
+        {% include_cached copy-clipboard.html %}
+        ~~~ sql
+        -- On the source database:
+        SELECT * FROM sourceDB.foo AS OF SYSTEM TIME '{replicated time}' WHERE my_id = 207;
+        ~~~
+
+1. 
Determine a course of action based on the results of the previous steps:
+
+    - If the values for the entry are the same in both the destination table and the source table, delete the entry from the DLQ on the destination database:
+
+        {% include_cached copy-clipboard.html %}
+        ~~~ sql
+        -- On the destination database:
+        DELETE FROM crdb_replication.dlq_271_foo WHERE id = 106677386757203;
+        ~~~
+
+    - If the entry's values in the destination table are different from its values in the source table, but the entry's values in the source table equal its values in the DLQ, update the entry in the destination table to have the same values as in the source table:
+
+        {% include_cached copy-clipboard.html %}
+        ~~~ sql
+        -- On the destination database:
+        UPSERT INTO destDB.foo VALUES (207, '2025-04-25 15:35:00.499499', 'updated_value');
+        ~~~
+
+        If this upsert fails due to a constraint violation, you must either delete the row that the upsert conflicts with or delete the DLQ entry. If the destination table has unique or foreign key constraints, the DLQ will likely continue to accumulate entries.
+
+    - If the entry's values in the destination table are different from its values in the source table, and the entry's values in the source table do not equal its values in the DLQ, refresh the replicated time and retry the equality queries above. If the same results hold after a few retries with refreshed replicated times, there is likely a more recent entry for the same row in the DLQ.
+
+        1. To find the more recent entry, find all entries in the DLQ with the matching primary key:
+
+            {% include_cached copy-clipboard.html %}
+            ~~~ sql
+            -- On the destination database:
+            SELECT id, dlq_timestamp, incoming_row FROM crdb_replication.dlq_271_foo WHERE incoming_row->>'my_id' = '207';
+            ~~~
+
+        1. If there are more recent entries for the row, delete the less recent entries and repeat these steps to manage the most recent entry.
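+As a sketch of the constraint-violation case described above, assuming a hypothetical conflicting row with `my_id = 150` that holds the unique value the upsert needs, you can remove the conflicting row on the destination and then retry the upsert:
+
+{% include_cached copy-clipboard.html %}
+~~~ sql
+-- On the destination database:
+-- my_id = 150 is a hypothetical conflicting row, not a value from the example result.
+DELETE FROM destDB.foo WHERE my_id = 150;
+UPSERT INTO destDB.foo VALUES (207, '2025-04-25 15:35:00.499499', 'updated_value');
+~~~
+
+After the upsert succeeds, delete the corresponding entry from the DLQ table as shown in the first case.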
+ ## Schema changes When you start LDR on a table, the job will lock the schema, which will prevent any accidental [schema changes]({% link {{ page.version.version }}/online-schema-changes.md %}) that would cause issues for LDR. There are some [supported schema changes](#supported-schema-changes) that you can perform on a replicating table, otherwise it is necessary to stop LDR in order to [coordinate the schema change](#coordinate-other-schema-changes). diff --git a/src/current/v26.1/manage-logical-data-replication.md b/src/current/v26.1/manage-logical-data-replication.md index 3debd4bf7c4..c7f21c2ca52 100644 --- a/src/current/v26.1/manage-logical-data-replication.md +++ b/src/current/v26.1/manage-logical-data-replication.md @@ -33,13 +33,7 @@ When a conflict cannot apply due to violating [constraints]({% link {{ page.vers ### Dead letter queue (DLQ) -When the LDR job starts, it will create a DLQ table with each replicating table so that unresolved conflicts can be tracked. The DLQ will contain the writes that LDR cannot apply after the retry period of a minute, which could occur if there is a unique index on the destination table (for more details, refer to [Unique seconday indexes]({% link {{ page.version.version }}/set-up-logical-data-replication.md %}#unique-secondary-indexes)). - -{{site.data.alerts.callout_info}} -LDR will not pause when the writes are sent to the DLQ, you must manage the DLQ manually. -{{site.data.alerts.end}} - -To manage the DLQ, you can evaluate entries in the `incoming_row` column and apply the row manually to another table with SQL statements. +When the LDR job starts, it creates a DLQ table with each replicating table so that unresolved conflicts can be tracked. 
The DLQ contains the writes that LDR cannot apply after the retry period of a minute, which could occur if there is a unique index on the destination table (for more details, refer to [Unique secondary indexes]({% link {{ page.version.version }}/set-up-logical-data-replication.md %}#unique-secondary-indexes)).
 
 As an example, for an LDR stream created on the `movr.public.promo_codes` table:
@@ -80,6 +74,82 @@ CONSTRAINT dlq_113_public_promo_codes_pkey PRIMARY KEY (ingestion_job_id ASC, dl
 )
 ~~~
 
+#### Manage entries in the DLQ
+
+LDR does not pause when writes are sent to the DLQ. You must manage the DLQ manually by examining each entry in the DLQ and either reinserting the entry or deleting it from the DLQ. If you have multiple DLQ entries, manage them in order from most recent to least recent.
+
+To manage an entry in the DLQ:
+
+1. In the destination database's DLQ table, examine the `incoming_row` column to find the primary key and values for the entry.
+
+    {% include_cached copy-clipboard.html %}
+    ~~~ sql
+    -- On the destination database:
+    SELECT id, dlq_timestamp, incoming_row FROM crdb_replication.dlq_271_foo;
+    ~~~
+
+    In this example result, `incoming_row` contains a primary key of `207` identified by the column `my_id`, as well as the values of the entry's columns `created_at` and `payload`.
+
+    ~~~
+           id       |         dlq_timestamp         |                                  incoming_row
+    ----------------+-------------------------------+------------------------------------------------------------------------------------------
+    106677386757203 | 2025-04-25 15:36:28.435439+00 | {"created_at": "2025-04-25 15:35:00.499499", "payload": "updated_value", "my_id": 207}
+    ~~~
+
+1. Determine whether the values for the entry in the DLQ match the values for the entry in the destination table and the source table respectively:
+
+    1. 
On the destination database, check the values for the entry and the replicated time:
+
+        {% include_cached copy-clipboard.html %}
+        ~~~ sql
+        -- On the destination database:
+        SELECT * FROM destDB.foo WHERE my_id = 207;
+
+        WITH t AS (SHOW LOGICAL REPLICATION JOBS)
+        SELECT job_id, replicated_time FROM t;
+        ~~~
+
+    1. On the source database, check the values for the entry as of the replicated time:
+
+        {% include_cached copy-clipboard.html %}
+        ~~~ sql
+        -- On the source database:
+        SELECT * FROM sourceDB.foo AS OF SYSTEM TIME '{replicated time}' WHERE my_id = 207;
+        ~~~
+
+1. Determine a course of action based on the results of the previous steps:
+
+    - If the values for the entry are the same in both the destination table and the source table, delete the entry from the DLQ on the destination database:
+
+        {% include_cached copy-clipboard.html %}
+        ~~~ sql
+        -- On the destination database:
+        DELETE FROM crdb_replication.dlq_271_foo WHERE id = 106677386757203;
+        ~~~
+
+    - If the entry's values in the destination table are different from its values in the source table, but the entry's values in the source table equal its values in the DLQ, update the entry in the destination table to have the same values as in the source table:
+
+        {% include_cached copy-clipboard.html %}
+        ~~~ sql
+        -- On the destination database:
+        UPSERT INTO destDB.foo VALUES (207, '2025-04-25 15:35:00.499499', 'updated_value');
+        ~~~
+
+        If this upsert fails due to a constraint violation, you must either delete the row that the upsert conflicts with or delete the DLQ entry. If the destination table has unique or foreign key constraints, the DLQ will likely continue to accumulate entries.
+
+    - If the entry's values in the destination table are different from its values in the source table, and the entry's values in the source table do not equal its values in the DLQ, refresh the replicated time and retry the equality queries above. 
If the same results hold after a few retries with refreshed replicated times, there is likely a more recent entry for the same row in the DLQ.
+
+        1. To find the more recent entry, find all entries in the DLQ with the matching primary key:
+
+            {% include_cached copy-clipboard.html %}
+            ~~~ sql
+            -- On the destination database:
+            SELECT id, dlq_timestamp, incoming_row FROM crdb_replication.dlq_271_foo WHERE incoming_row->>'my_id' = '207';
+            ~~~
+
+        1. If there are more recent entries for the row, delete the less recent entries and repeat these steps to manage the most recent entry.
+
 ## Schema changes
 
 When you start LDR on a table, the job will lock the schema, which will prevent any accidental [schema changes]({% link {{ page.version.version }}/online-schema-changes.md %}) that would cause issues for LDR. There are some [supported schema changes](#supported-schema-changes) that you can perform on a replicating table, otherwise it is necessary to stop LDR in order to [coordinate the schema change](#coordinate-other-schema-changes).