Skip to content

Commit 04173a9

Browse files
authored
fix(replication): detect misconfigered run replication publication and output helpful error messages (#2736)
Add validation for logical replication publication configuration. Helps diagnose an issue where runs are no longer replicated to clickhouse because of a configuration issue with the replication publication. ## Problem The `LogicalReplicationClient` only checked if a publication existed, not if it was correctly configured. This caused a silent failure where: - Replication would start successfully - Transaction boundaries (begin/commit) were received - **But no actual data changes were replicated** This happened when a publication existed but: 1. Had no tables associated with it 2. Was missing required actions (e.g., `delete`) ## Solution Added `#validatePublicationConfiguration()` method that validates: - ✅ Publication includes the expected table - ✅ Publication has all required actions configured When validation fails, error messages include the exact SQL command to fix the issue: **Missing table:** ``` Publication 'task_runs_to_clickhouse_v1_publication' exists but has NO TABLES configured. Expected table: "public.TaskRun". Run: ALTER PUBLICATION task_runs_to_clickhouse_v1_publication ADD TABLE "TaskRun"; ``` **Missing actions:** ``` Publication 'task_runs_to_clickhouse_v1_publication' is missing required actions. Expected: [insert, update, delete], Current: [insert, update], Missing: [delete]. Run: ALTER PUBLICATION task_runs_to_clickhouse_v1_publication SET (publish = 'insert, update, delete'); ``` This prevents silent data loss and makes debugging configuration issues much easier.
1 parent da4c753 commit 04173a9

File tree

1 file changed

+109
-1
lines changed

1 file changed

+109
-1
lines changed

internal-packages/replication/src/client.ts

Lines changed: 109 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -413,7 +413,31 @@ export class LogicalReplicationClient {
413413
return false;
414414
}
415415

416-
if (await this.#doesPublicationExist()) {
416+
const publicationExists = await this.#doesPublicationExist();
417+
418+
if (publicationExists) {
419+
// Validate the existing publication is correctly configured
420+
const validationError = await this.#validatePublicationConfiguration();
421+
422+
if (validationError) {
423+
this.logger.error("Publication exists but is misconfigured", {
424+
name: this.options.name,
425+
table: this.options.table,
426+
slotName: this.options.slotName,
427+
publicationName: this.options.publicationName,
428+
error: validationError,
429+
});
430+
431+
this.events.emit("error", new LogicalReplicationClientError(validationError));
432+
return false;
433+
}
434+
435+
this.logger.info("Publication exists and is correctly configured", {
436+
name: this.options.name,
437+
table: this.options.table,
438+
publicationName: this.options.publicationName,
439+
});
440+
417441
return true;
418442
}
419443

@@ -459,6 +483,90 @@ export class LogicalReplicationClient {
459483
return res.rows[0].exists;
460484
}
461485

486+
async #validatePublicationConfiguration(): Promise<string | null> {
487+
if (!this.client) {
488+
return "Cannot validate publication configuration: client not connected";
489+
}
490+
491+
// Check if the publication has the correct table
492+
const tablesRes = await this.client.query(
493+
`SELECT schemaname, tablename
494+
FROM pg_publication_tables
495+
WHERE pubname = '${this.options.publicationName}';`
496+
);
497+
498+
const tables = tablesRes.rows;
499+
const expectedTable = this.options.table;
500+
501+
// Check if the table is in the publication
502+
const hasTable = tables.some(
503+
(row) => row.tablename === expectedTable && row.schemaname === "public"
504+
);
505+
506+
if (!hasTable) {
507+
if (tables.length === 0) {
508+
return `Publication '${this.options.publicationName}' exists but has NO TABLES configured. Expected table: "public.${expectedTable}". Run: ALTER PUBLICATION ${this.options.publicationName} ADD TABLE "${expectedTable}";`;
509+
} else {
510+
const tableList = tables.map((t) => `"${t.schemaname}"."${t.tablename}"`).join(", ");
511+
return `Publication '${this.options.publicationName}' exists but does not include the required table "public.${expectedTable}". Current tables: ${tableList}. Run: ALTER PUBLICATION ${this.options.publicationName} ADD TABLE "${expectedTable}";`;
512+
}
513+
}
514+
515+
// Check if the publication has the correct actions configured
516+
if (this.options.publicationActions && this.options.publicationActions.length > 0) {
517+
const actionsRes = await this.client.query(
518+
`SELECT pubinsert, pubupdate, pubdelete, pubtruncate
519+
FROM pg_publication
520+
WHERE pubname = '${this.options.publicationName}';`
521+
);
522+
523+
if (actionsRes.rows.length === 0) {
524+
return `Publication '${this.options.publicationName}' not found when checking actions`;
525+
}
526+
527+
const actualActions = actionsRes.rows[0];
528+
const missingActions: string[] = [];
529+
530+
for (const action of this.options.publicationActions) {
531+
switch (action) {
532+
case "insert":
533+
if (!actualActions.pubinsert) missingActions.push("insert");
534+
break;
535+
case "update":
536+
if (!actualActions.pubupdate) missingActions.push("update");
537+
break;
538+
case "delete":
539+
if (!actualActions.pubdelete) missingActions.push("delete");
540+
break;
541+
case "truncate":
542+
if (!actualActions.pubtruncate) missingActions.push("truncate");
543+
break;
544+
}
545+
}
546+
547+
if (missingActions.length > 0) {
548+
const currentActions: string[] = [];
549+
if (actualActions.pubinsert) currentActions.push("insert");
550+
if (actualActions.pubupdate) currentActions.push("update");
551+
if (actualActions.pubdelete) currentActions.push("delete");
552+
if (actualActions.pubtruncate) currentActions.push("truncate");
553+
554+
return `Publication '${
555+
this.options.publicationName
556+
}' is missing required actions. Expected: [${this.options.publicationActions.join(
557+
", "
558+
)}], Current: [${currentActions.join(", ")}], Missing: [${missingActions.join(
559+
", "
560+
)}]. Run: ALTER PUBLICATION ${
561+
this.options.publicationName
562+
} SET (publish = '${this.options.publicationActions.join(", ")}');`;
563+
}
564+
}
565+
566+
// All validations passed
567+
return null;
568+
}
569+
462570
async #createSlot(): Promise<boolean> {
463571
if (!this.client) {
464572
this.events.emit("error", new LogicalReplicationClientError("Cannot create slot"));

0 commit comments

Comments
 (0)