From 723673980b4b42ffc675ae4cf3e15ba5dd729c58 Mon Sep 17 00:00:00 2001 From: Ruslan Iushchenko Date: Mon, 2 Feb 2026 14:19:00 +0100 Subject: [PATCH 1/3] #706 Add the ability to delete bookkeeping tables from the database via `Bookeeper` trait. --- .../pramen/core/bookkeeper/Bookkeeper.scala | 17 ++++- .../core/bookkeeper/BookkeeperBase.scala | 2 + .../core/bookkeeper/BookkeeperDeltaPath.scala | 2 + .../core/bookkeeper/BookkeeperJdbc.scala | 67 +++++++++++++++++++ .../core/bookkeeper/BookkeeperMongoDb.scala | 2 + .../core/bookkeeper/BookkeeperNull.scala | 2 + .../core/bookkeeper/BookkeeperText.scala | 2 + .../bookkeeper/BookkeeperDeltaTable.scala | 2 + .../bookkeeper/BookkeeperDeltaTable.scala | 2 + .../bookkeeper/BookkeeperDeltaTable.scala | 2 + .../mocks/bookkeeper/SyncBookkeeperMock.scala | 24 +++++++ .../bookkeeper/BookkeeperJdbcSuite.scala | 38 +++++++++++ 12 files changed, 160 insertions(+), 2 deletions(-) diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/Bookkeeper.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/Bookkeeper.scala index cbcbf95f..f321d02d 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/Bookkeeper.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/Bookkeeper.scala @@ -34,9 +34,9 @@ import za.co.absa.pramen.core.rdb.PramenDb import java.time.LocalDate /** - * A bookkeeper is responsible of querying and updating state of all tables related to an ingestion pipeline. + * A bookkeeper is responsible for querying and updating state of all tables related to an ingestion pipeline. */ -trait Bookkeeper { +trait Bookkeeper extends AutoCloseable { val bookkeepingEnabled: Boolean def getLatestProcessedDate(table: String, until: Option[LocalDate] = None): Option[LocalDate] @@ -51,6 +51,19 @@ trait Bookkeeper { def getLatestSchema(table: String, until: LocalDate): Option[(StructType, LocalDate)] + /** + * Deletes tables matching the given wildcard pattern. 
The wildcard pattern is expected + * according to this syntax: 'my_table_prefix_*'. Only '*' is supported at the moment. + * + * The method deletes just the metadata about selected tables in every bookeeping table except + * journal, which is used for logging only. + * + * @param tableWithWildcard A string representing the name or pattern of the tables + * to be deleted. + * @return A sequence of strings representing the names of the tables that were successfully deleted. + */ + def deleteTable(tableWithWildcard: String): Seq[String] + private[pramen] def setRecordCount(table: String, infoDate: LocalDate, inputRecordCount: Long, diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperBase.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperBase.scala index eedfb18f..5764d3ea 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperBase.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperBase.scala @@ -38,6 +38,8 @@ abstract class BookkeeperBase(isBookkeepingEnabled: Boolean, batchId: Long) exte def deleteNonCurrentBatchRecords(table: String, infoDate: LocalDate): Unit + override def close(): Unit = {} + private[pramen] def saveRecordCountToStorage(table: String, infoDate: LocalDate, inputRecordCount: Long, diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaPath.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaPath.scala index 0d9ee4ea..911049e4 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaPath.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaPath.scala @@ -105,6 +105,8 @@ class BookkeeperDeltaPath(bookkeepingPath: String, batchId: Long)(implicit spark .save(pathOrTable) } + override def deleteTable(tableWithWildcard: String): Seq[String] = ??? 
+ private def init(): Unit = { initRecordsDirectory(recordsPath) initSchemasDirectory(schemasPath) diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperJdbc.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperJdbc.scala index cada2d40..9f1b585e 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperJdbc.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperJdbc.scala @@ -38,6 +38,7 @@ class BookkeeperJdbc(db: Database, profile: JdbcProfile, batchId: Long) extends private val log = LoggerFactory.getLogger(this.getClass) private val offsetManagement = new OffsetManagerCached(new OffsetManagerJdbc(db, batchId)) + @volatile private var isClosed = false override val bookkeepingEnabled: Boolean = true @@ -175,6 +176,72 @@ class BookkeeperJdbc(db: Database, profile: JdbcProfile, batchId: Long) extends } } + override def deleteTable(tableName: String): Seq[String] = { + val hasWildcard = tableName.contains("*") + val tableNameEscaped = if (hasWildcard) + tableName.trim.replace("%", "\\%").replace('*', '%') + else + tableName.trim.replace("%", "\\%") + + val likePattern = if (!hasWildcard) + tableNameEscaped + "->%" + else + tableNameEscaped + + val patternForLogging = if (hasWildcard) + s"'$likePattern'" + else + s"'$tableNameEscaped' or '$likePattern'" + + val listQuery = BookkeepingRecords.records + .filter(r => r.pramenTableName === tableNameEscaped || r.pramenTableName.like(likePattern)) + .map(_.pramenTableName) + .distinct + + val tablesToDelete = SlickUtils.executeQuery(db, listQuery).sorted + + if (tablesToDelete.length > 100) + throw new IllegalArgumentException(s"The table wildcard '$tableName' matches more than 100 tables (${tablesToDelete.length}). 
To avoid accidental deletions, please refine the wildcard.") + + val deletionQuery = BookkeepingRecords.records + .filter(r => r.pramenTableName === tableNameEscaped || r.pramenTableName.like(likePattern)) + .delete + + try { + val deletedBkCount = SlickUtils.executeAction(db, deletionQuery) + log.info(s"Deleted $deletedBkCount records from the bookkeeping table for tables matching $patternForLogging: ${tablesToDelete.mkString(", ")}") + + val deletedSchemaCount = SlickUtils.executeAction(db, SchemaRecords.records + .filter(r => r.pramenTableName === tableNameEscaped || r.pramenTableName.like(likePattern)) + .delete + ) + log.info(s"Deleted $deletedSchemaCount records from the schemas table.") + + val deletedOffsetsCount = SlickUtils.executeAction(db, OffsetRecords.records + .filter(r => r.pramenTableName === tableNameEscaped || r.pramenTableName.like(likePattern)) + .delete + ) + log.info(s"Deleted $deletedOffsetsCount records from the offsets table.") + + val deletedMetadataCount = SlickUtils.executeAction(db, MetadataRecords.records + .filter(r => r.pramenTableName === tableNameEscaped || r.pramenTableName.like(likePattern)) + .delete + ) + log.info(s"Deleted $deletedMetadataCount records from the metadata table.") + + tablesToDelete + } catch { + case NonFatal(ex) => throw new RuntimeException(s"Unable to delete records from the bookkeeping table for tables matching '$likePattern'.", ex) + } + } + + override def close(): Unit = { + if (!isClosed) { + db.close() + isClosed = true + } + } + private[pramen] override def getOffsetManager: OffsetManager = { offsetManagement } diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperMongoDb.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperMongoDb.scala index 20856753..200bf99d 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperMongoDb.scala +++ 
b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperMongoDb.scala @@ -151,6 +151,8 @@ class BookkeeperMongoDb(mongoDbConnection: MongoDbConnection, batchId: Long) ext } } + override def deleteTable(tableWithWildcard: String): Seq[String] = ??? + private def getFilter(tableName: String, infoDateBeginOpt: Option[LocalDate], infoDateEndOpt: Option[LocalDate], batchId: Option[Long]): Bson = { val baseFilter = (infoDateBeginOpt, infoDateEndOpt) match { case (Some(infoDateBegin), Some(infoDateEnd)) => diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperNull.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperNull.scala index c2678339..8da6f730 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperNull.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperNull.scala @@ -53,4 +53,6 @@ class BookkeeperNull() extends BookkeeperBase(false, 0L) { override def saveSchema(table: String, infoDate: LocalDate, schema: StructType): Unit = {} override def deleteNonCurrentBatchRecords(table: String, infoDate: LocalDate): Unit = {} + + override def deleteTable(tableWithWildcard: String): Seq[String] = Seq.empty } diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperText.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperText.scala index 172c4543..c2dae6a0 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperText.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperText.scala @@ -202,4 +202,6 @@ class BookkeeperText(bookkeepingPath: String, batchId: Long)(implicit spark: Spa // No-op: CSV-based storage doesn't support efficient in-place deletion. // Cross-batch replacement is not supported for BookkeeperText. } + + override def deleteTable(tableWithWildcard: String): Seq[String] = ??? 
} diff --git a/pramen/core/src/main/scala_2.11/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaTable.scala b/pramen/core/src/main/scala_2.11/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaTable.scala index 476b4161..aa98ed65 100644 --- a/pramen/core/src/main/scala_2.11/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaTable.scala +++ b/pramen/core/src/main/scala_2.11/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaTable.scala @@ -95,6 +95,8 @@ class BookkeeperDeltaTable(database: Option[String], .saveAsTable(pathOrTable) } + override def deleteTable(tableWithWildcard: String): Seq[String] = ??? + def init(): Unit = { initRecordsDirectory() initSchemasDirectory() diff --git a/pramen/core/src/main/scala_2.12/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaTable.scala b/pramen/core/src/main/scala_2.12/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaTable.scala index 5c150abc..5823a5bd 100644 --- a/pramen/core/src/main/scala_2.12/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaTable.scala +++ b/pramen/core/src/main/scala_2.12/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaTable.scala @@ -100,6 +100,8 @@ class BookkeeperDeltaTable(database: Option[String], .saveAsTable(pathOrTable) } + override def deleteTable(tableWithWildcard: String): Seq[String] = ??? + def init(): Unit = { initRecordsDirectory() initSchemasDirectory() diff --git a/pramen/core/src/main/scala_2.13/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaTable.scala b/pramen/core/src/main/scala_2.13/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaTable.scala index 5c150abc..5823a5bd 100644 --- a/pramen/core/src/main/scala_2.13/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaTable.scala +++ b/pramen/core/src/main/scala_2.13/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaTable.scala @@ -100,6 +100,8 @@ class BookkeeperDeltaTable(database: Option[String], .saveAsTable(pathOrTable) } + override def deleteTable(tableWithWildcard: String): Seq[String] = ??? 
+ def init(): Unit = { initRecordsDirectory() initSchemasDirectory() diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/bookkeeper/SyncBookkeeperMock.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/bookkeeper/SyncBookkeeperMock.scala index 5117716b..63bd5601 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/bookkeeper/SyncBookkeeperMock.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/bookkeeper/SyncBookkeeperMock.scala @@ -156,5 +156,29 @@ class SyncBookkeeperMock(batchId: Long = 123L) extends Bookkeeper { schemas += table -> (infoDate, tableSchema) } + override def deleteTable(tableName: String): Seq[String] = { + val hasWildcard = tableName.contains("*") + val tableNameEscaped = if (hasWildcard) + tableName.trim.replace(".", "\\.").replace("%", ".*").replace("*", ".*") + else + tableName.trim.replace(".", "\\.").replace("%", "\\%").replace("*", "\\*") + + val likePattern = if (!hasWildcard) + tableNameEscaped + "->%" + else + tableNameEscaped + + + val keysToDelete = chunks.keys.filter { case (tblName, _) => + tblName.matches(likePattern) || tblName.matches(tableNameEscaped) + }.toList + + keysToDelete.foreach(chunks.remove) + + keysToDelete.map(_._1).distinct + } + override private[pramen] def getOffsetManager: OffsetManager = null + + override def close(): Unit = {} } diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperJdbcSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperJdbcSuite.scala index b0ae67c8..906eae26 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperJdbcSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperJdbcSuite.scala @@ -16,6 +16,7 @@ package za.co.absa.pramen.core.tests.bookkeeper +import org.apache.spark.sql.types.StructType import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll} import 
za.co.absa.pramen.core.bookkeeper.{Bookkeeper, BookkeeperJdbc} import za.co.absa.pramen.core.fixtures.RelationalDbFixture @@ -23,7 +24,10 @@ import za.co.absa.pramen.core.rdb.{PramenDb, RdbJdbc} import za.co.absa.pramen.core.reader.model.JdbcConfig import za.co.absa.pramen.core.utils.UsingUtils +import java.time.LocalDate + class BookkeeperJdbcSuite extends BookkeeperCommonSuite with RelationalDbFixture with BeforeAndAfter with BeforeAndAfterAll { + private val infoDate = LocalDate.of(2021, 2, 18) val jdbcConfig: JdbcConfig = JdbcConfig(driver, Some(url), Nil, None, Some(user), Some(password)) var pramenDb: PramenDb = _ @@ -55,6 +59,40 @@ class BookkeeperJdbcSuite extends BookkeeperCommonSuite with RelationalDbFixture assert(getTables.exists(_.equalsIgnoreCase("metadata"))) assert(getTables.exists(_.equalsIgnoreCase("offsets"))) } + + "delete a set of tables by the table prefix name" in { + val bk = getBookkeeper(0L) + + bk.saveSchema("test_table", infoDate, StructType.fromDDL("id INT, name STRING")) + bk.setRecordCount("test_table", infoDate, 1, 1, None, 1, 1, isTableTransient = false) + bk.setRecordCount("test_table2", infoDate, 1, 1, None, 1, 1, isTableTransient = false) + bk.setRecordCount("test_table->sink1", infoDate, 1, 1, None, 1, 1, isTableTransient = false) + bk.setRecordCount("test_table->sink2", infoDate, 1, 1, None, 1, 1, isTableTransient = false) + + bk.deleteTable("test_table") + + assert(bk.getLatestSchema("test_table", infoDate.plusDays(1)).isEmpty) + assert(bk.getDataChunksCount("test_table2", None, None) == 1) + assert(bk.getDataChunksCount("test_table", None, None) == 0) + assert(bk.getDataChunksCount("test_table->sink2", None, None) == 0) + } + + "delete a set of tables by the table wildcard name" in { + val bk = getBookkeeper(0L) + + bk.saveSchema("test_table", infoDate, StructType.fromDDL("id INT, name STRING")) + bk.setRecordCount("test_table", infoDate, 1, 1, None, 1, 1, isTableTransient = false) + bk.setRecordCount("test_table2", infoDate, 
1, 1, None, 1, 1, isTableTransient = false) + bk.setRecordCount("test_table->sink1", infoDate, 1, 1, None, 1, 1, isTableTransient = false) + bk.setRecordCount("test_table->sink2", infoDate, 1, 1, None, 1, 1, isTableTransient = false) + + bk.deleteTable("test_table*") + + assert(bk.getLatestSchema("test_table", infoDate.plusDays(1)).isEmpty) + assert(bk.getDataChunksCount("test_table2", None, None) == 0) + assert(bk.getDataChunksCount("test_table", None, None) == 0) + assert(bk.getDataChunksCount("test_table->sink2", None, None) == 0) + } } testBookKeeper(batchId => getBookkeeper(batchId)) From 513ba269825eb5e992d2d6313d0a2d94594bb058 Mon Sep 17 00:00:00 2001 From: Ruslan Iushchenko Date: Mon, 2 Feb 2026 14:44:24 +0100 Subject: [PATCH 2/3] #706 Fix PR suggestions. --- .../core/app/config/BookkeeperConfig.scala | 2 +- .../pramen/core/bookkeeper/Bookkeeper.scala | 2 +- .../core/bookkeeper/BookkeeperJdbc.scala | 25 +++++++++++-------- .../mocks/bookkeeper/SyncBookkeeperMock.scala | 2 +- 4 files changed, 17 insertions(+), 14 deletions(-) diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/app/config/BookkeeperConfig.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/app/config/BookkeeperConfig.scala index f5872157..cf0f7108 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/app/config/BookkeeperConfig.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/app/config/BookkeeperConfig.scala @@ -65,7 +65,7 @@ object BookkeeperConfig { } else { if (bookkeepingEnabled && bookkeepingConnectionString.isEmpty && bookkeepingLocation.isEmpty && bookkeepingJdbcConfig.isEmpty) { if (allowLocalBookkepingStorage) { - log.warn("Bookeeping configuration is missing. Using the default SQLite database 'pramen.sqlite'") + log.warn("Bookkeeping configuration is missing. 
Using the default SQLite database 'pramen.sqlite'") return BookkeeperConfig( bookkeepingEnabled = true, None, diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/Bookkeeper.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/Bookkeeper.scala index f321d02d..4440e647 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/Bookkeeper.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/Bookkeeper.scala @@ -55,7 +55,7 @@ trait Bookkeeper extends AutoCloseable { * Deletes tables matching the given wildcard pattern. The wildcard pattern is expected * according to this syntax: 'my_table_prefix_*'. Only '*' is supported at the moment. * - * The method deletes just the metadata about selected tables in every bookeeping table except + * The method deletes just the metadata about selected tables in every bookkeeping table except * journal, which is used for logging only. * * @param tableWithWildcard A string representing the name or pattern of the tables diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperJdbc.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperJdbc.scala index 9f1b585e..b79ba64b 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperJdbc.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperJdbc.scala @@ -30,6 +30,7 @@ import za.co.absa.pramen.core.utils.SlickUtils.WARN_IF_LONGER_MS import za.co.absa.pramen.core.utils.{AlgorithmUtils, SlickUtils, TimeUtils} import java.time.LocalDate +import java.util.concurrent.atomic.AtomicBoolean import scala.util.control.NonFatal class BookkeeperJdbc(db: Database, profile: JdbcProfile, batchId: Long) extends BookkeeperBase(true, batchId) { @@ -38,7 +39,7 @@ class BookkeeperJdbc(db: Database, profile: JdbcProfile, batchId: Long) extends private val log = LoggerFactory.getLogger(this.getClass) private val offsetManagement = new 
OffsetManagerCached(new OffsetManagerJdbc(db, batchId)) - @volatile private var isClosed = false + private val isClosed = new AtomicBoolean(false) override val bookkeepingEnabled: Boolean = true @@ -178,10 +179,13 @@ class BookkeeperJdbc(db: Database, profile: JdbcProfile, batchId: Long) extends override def deleteTable(tableName: String): Seq[String] = { val hasWildcard = tableName.contains("*") - val tableNameEscaped = if (hasWildcard) - tableName.trim.replace("%", "\\%").replace('*', '%') - else - tableName.trim.replace("%", "\\%") + + val escape = '\\' + val baseEscaped = tableName.trim + .replace("\\", "\\\\") + .replace("%", "\\%") + + val tableNameEscaped = if (hasWildcard) baseEscaped.replace("*", "%") else baseEscaped val likePattern = if (!hasWildcard) tableNameEscaped + "->%" @@ -194,7 +198,7 @@ class BookkeeperJdbc(db: Database, profile: JdbcProfile, batchId: Long) extends s"'$tableNameEscaped' or '$likePattern'" val listQuery = BookkeepingRecords.records - .filter(r => r.pramenTableName === tableNameEscaped || r.pramenTableName.like(likePattern)) + .filter(r => r.pramenTableName === tableNameEscaped || r.pramenTableName.like(likePattern, escape)) .map(_.pramenTableName) .distinct @@ -212,19 +216,19 @@ class BookkeeperJdbc(db: Database, profile: JdbcProfile, batchId: Long) extends log.info(s"Deleted $deletedBkCount records from the bookkeeping table for tables matching $patternForLogging: ${tablesToDelete.mkString(", ")}") val deletedSchemaCount = SlickUtils.executeAction(db, SchemaRecords.records - .filter(r => r.pramenTableName === tableNameEscaped || r.pramenTableName.like(likePattern)) + .filter(r => r.pramenTableName === tableNameEscaped || r.pramenTableName.like(likePattern, escape)) .delete ) log.info(s"Deleted $deletedSchemaCount records from the schemas table.") val deletedOffsetsCount = SlickUtils.executeAction(db, OffsetRecords.records - .filter(r => r.pramenTableName === tableNameEscaped || r.pramenTableName.like(likePattern)) + .filter(r => 
r.pramenTableName === tableNameEscaped || r.pramenTableName.like(likePattern, escape)) .delete ) log.info(s"Deleted $deletedOffsetsCount records from the offsets table.") val deletedMetadataCount = SlickUtils.executeAction(db, MetadataRecords.records - .filter(r => r.pramenTableName === tableNameEscaped || r.pramenTableName.like(likePattern)) + .filter(r => r.pramenTableName === tableNameEscaped || r.pramenTableName.like(likePattern, escape)) .delete ) log.info(s"Deleted $deletedMetadataCount records from the metadata table.") @@ -236,9 +240,8 @@ class BookkeeperJdbc(db: Database, profile: JdbcProfile, batchId: Long) extends } override def close(): Unit = { - if (!isClosed) { + if (isClosed.compareAndSet(false, true)) { db.close() - isClosed = true } } diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/bookkeeper/SyncBookkeeperMock.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/bookkeeper/SyncBookkeeperMock.scala index 63bd5601..8ec0e183 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/bookkeeper/SyncBookkeeperMock.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/bookkeeper/SyncBookkeeperMock.scala @@ -164,7 +164,7 @@ class SyncBookkeeperMock(batchId: Long = 123L) extends Bookkeeper { tableName.trim.replace(".", "\\.").replace("%", "\\%").replace("*", "\\*") val likePattern = if (!hasWildcard) - tableNameEscaped + "->%" + tableNameEscaped + "->.*" else tableNameEscaped From f7c80036c06cbce4378f2c93ececc19c2f66c7b2 Mon Sep 17 00:00:00 2001 From: Ruslan Iushchenko Date: Mon, 2 Feb 2026 15:16:56 +0100 Subject: [PATCH 3/3] #706 Fix single character escaping in the bookkeeping deletion method.
--- .../pramen/core/bookkeeper/BookkeeperJdbc.scala | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperJdbc.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperJdbc.scala index b79ba64b..7c62091b 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperJdbc.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperJdbc.scala @@ -181,9 +181,11 @@ class BookkeeperJdbc(db: Database, profile: JdbcProfile, batchId: Long) extends val hasWildcard = tableName.contains("*") val escape = '\\' + val tableNameTrimmed = tableName.trim val baseEscaped = tableName.trim .replace("\\", "\\\\") .replace("%", "\\%") + .replace("_", "\\_") val tableNameEscaped = if (hasWildcard) baseEscaped.replace("*", "%") else baseEscaped @@ -195,10 +197,10 @@ class BookkeeperJdbc(db: Database, profile: JdbcProfile, batchId: Long) extends val patternForLogging = if (hasWildcard) s"'$likePattern'" else - s"'$tableNameEscaped' or '$likePattern'" + s"'$tableNameTrimmed' or '$likePattern'" val listQuery = BookkeepingRecords.records - .filter(r => r.pramenTableName === tableNameEscaped || r.pramenTableName.like(likePattern, escape)) + .filter(r => r.pramenTableName === tableNameTrimmed || r.pramenTableName.like(likePattern, escape)) .map(_.pramenTableName) .distinct @@ -208,7 +210,7 @@ class BookkeeperJdbc(db: Database, profile: JdbcProfile, batchId: Long) extends throw new IllegalArgumentException(s"The table wildcard '$tableName' matches more than 100 tables (${tablesToDelete.length}). 
To avoid accidental deletions, please refine the wildcard.") val deletionQuery = BookkeepingRecords.records - .filter(r => r.pramenTableName === tableNameEscaped || r.pramenTableName.like(likePattern)) + .filter(r => r.pramenTableName === tableNameTrimmed || r.pramenTableName.like(likePattern, escape)) .delete try { @@ -216,26 +218,26 @@ class BookkeeperJdbc(db: Database, profile: JdbcProfile, batchId: Long) extends log.info(s"Deleted $deletedBkCount records from the bookkeeping table for tables matching $patternForLogging: ${tablesToDelete.mkString(", ")}") val deletedSchemaCount = SlickUtils.executeAction(db, SchemaRecords.records - .filter(r => r.pramenTableName === tableNameEscaped || r.pramenTableName.like(likePattern, escape)) + .filter(r => r.pramenTableName === tableNameTrimmed || r.pramenTableName.like(likePattern, escape)) .delete ) log.info(s"Deleted $deletedSchemaCount records from the schemas table.") val deletedOffsetsCount = SlickUtils.executeAction(db, OffsetRecords.records - .filter(r => r.pramenTableName === tableNameEscaped || r.pramenTableName.like(likePattern, escape)) + .filter(r => r.pramenTableName === tableNameTrimmed || r.pramenTableName.like(likePattern, escape)) .delete ) log.info(s"Deleted $deletedOffsetsCount records from the offsets table.") val deletedMetadataCount = SlickUtils.executeAction(db, MetadataRecords.records - .filter(r => r.pramenTableName === tableNameEscaped || r.pramenTableName.like(likePattern, escape)) + .filter(r => r.pramenTableName === tableNameTrimmed || r.pramenTableName.like(likePattern, escape)) .delete ) log.info(s"Deleted $deletedMetadataCount records from the metadata table.") tablesToDelete } catch { - case NonFatal(ex) => throw new RuntimeException(s"Unable to delete records from the bookkeeping table for tables matching '$likePattern'.", ex) + case NonFatal(ex) => throw new RuntimeException(s"Unable to delete records from the bookkeeping table for tables matching $patternForLogging.", ex) } }