diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/app/config/BookkeeperConfig.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/app/config/BookkeeperConfig.scala index f5872157..cf0f7108 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/app/config/BookkeeperConfig.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/app/config/BookkeeperConfig.scala @@ -65,7 +65,7 @@ object BookkeeperConfig { } else { if (bookkeepingEnabled && bookkeepingConnectionString.isEmpty && bookkeepingLocation.isEmpty && bookkeepingJdbcConfig.isEmpty) { if (allowLocalBookkepingStorage) { - log.warn("Bookeeping configuration is missing. Using the default SQLite database 'pramen.sqlite'") + log.warn("Bookkeeping configuration is missing. Using the default SQLite database 'pramen.sqlite'") return BookkeeperConfig( bookkeepingEnabled = true, None, diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/Bookkeeper.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/Bookkeeper.scala index cbcbf95f..4440e647 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/Bookkeeper.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/Bookkeeper.scala @@ -34,9 +34,9 @@ import za.co.absa.pramen.core.rdb.PramenDb import java.time.LocalDate /** - * A bookkeeper is responsible of querying and updating state of all tables related to an ingestion pipeline. + * A bookkeeper is responsible for querying and updating state of all tables related to an ingestion pipeline. */ -trait Bookkeeper { +trait Bookkeeper extends AutoCloseable { val bookkeepingEnabled: Boolean def getLatestProcessedDate(table: String, until: Option[LocalDate] = None): Option[LocalDate] @@ -51,6 +51,19 @@ trait Bookkeeper { def getLatestSchema(table: String, until: LocalDate): Option[(StructType, LocalDate)] + /** + * Deletes tables matching the given wildcard pattern. The wildcard pattern is expected + * according to this syntax: 'my_table_prefix_*'. Only '*' is supported at the moment. + * + * The method deletes just the metadata about selected tables in every bookkeeping table except + * journal, which is used for logging only. + * + * @param tableWithWildcard A string representing the name or pattern of the tables + * to be deleted. + * @return A sequence of strings representing the names of the tables that were successfully deleted. + */ + def deleteTable(tableWithWildcard: String): Seq[String] + private[pramen] def setRecordCount(table: String, infoDate: LocalDate, inputRecordCount: Long, diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperBase.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperBase.scala index eedfb18f..5764d3ea 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperBase.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperBase.scala @@ -38,6 +38,8 @@ abstract class BookkeeperBase(isBookkeepingEnabled: Boolean, batchId: Long) exte def deleteNonCurrentBatchRecords(table: String, infoDate: LocalDate): Unit + override def close(): Unit = {} + private[pramen] def saveRecordCountToStorage(table: String, infoDate: LocalDate, inputRecordCount: Long, diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaPath.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaPath.scala index 0d9ee4ea..911049e4 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaPath.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaPath.scala @@ -105,6 +105,8 @@ class BookkeeperDeltaPath(bookkeepingPath: String, batchId: Long)(implicit spark .save(pathOrTable) } + override def deleteTable(tableWithWildcard: String): Seq[String] = ??? + private def init(): Unit = { initRecordsDirectory(recordsPath) initSchemasDirectory(schemasPath) diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperJdbc.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperJdbc.scala index cada2d40..7c62091b 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperJdbc.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperJdbc.scala @@ -30,6 +30,7 @@ import za.co.absa.pramen.core.utils.SlickUtils.WARN_IF_LONGER_MS import za.co.absa.pramen.core.utils.{AlgorithmUtils, SlickUtils, TimeUtils} import java.time.LocalDate +import java.util.concurrent.atomic.AtomicBoolean import scala.util.control.NonFatal class BookkeeperJdbc(db: Database, profile: JdbcProfile, batchId: Long) extends BookkeeperBase(true, batchId) { @@ -38,6 +39,7 @@ class BookkeeperJdbc(db: Database, profile: JdbcProfile, batchId: Long) extends private val log = LoggerFactory.getLogger(this.getClass) private val offsetManagement = new OffsetManagerCached(new OffsetManagerJdbc(db, batchId)) + private val isClosed = new AtomicBoolean(false) override val bookkeepingEnabled: Boolean = true @@ -175,6 +177,76 @@ class BookkeeperJdbc(db: Database, profile: JdbcProfile, batchId: Long) extends } } + override def deleteTable(tableName: String): Seq[String] = { + val hasWildcard = tableName.contains("*") + + val escape = '\\' + val tableNameTrimmed = tableName.trim + val baseEscaped = tableName.trim + .replace("\\", "\\\\") + .replace("%", "\\%") + .replace("_", "\\_") + + val tableNameEscaped = if (hasWildcard) baseEscaped.replace("*", "%") else baseEscaped + + val likePattern = if (!hasWildcard) + tableNameEscaped + "->%" + else + tableNameEscaped + + val patternForLogging = if (hasWildcard) + s"'$likePattern'" + else + s"'$tableNameTrimmed' or '$likePattern'" + + val listQuery = BookkeepingRecords.records + .filter(r => r.pramenTableName === tableNameTrimmed || r.pramenTableName.like(likePattern, escape)) + .map(_.pramenTableName) + .distinct + + val tablesToDelete = SlickUtils.executeQuery(db, listQuery).sorted + + if (tablesToDelete.length > 100) + throw new IllegalArgumentException(s"The table wildcard '$tableName' matches more than 100 tables (${tablesToDelete.length}). To avoid accidental deletions, please refine the wildcard.") + + val deletionQuery = BookkeepingRecords.records + .filter(r => r.pramenTableName === tableNameTrimmed || r.pramenTableName.like(likePattern, escape)) + .delete + + try { + val deletedBkCount = SlickUtils.executeAction(db, deletionQuery) + log.info(s"Deleted $deletedBkCount records from the bookkeeping table for tables matching $patternForLogging: ${tablesToDelete.mkString(", ")}") + + val deletedSchemaCount = SlickUtils.executeAction(db, SchemaRecords.records + .filter(r => r.pramenTableName === tableNameTrimmed || r.pramenTableName.like(likePattern, escape)) + .delete + ) + log.info(s"Deleted $deletedSchemaCount records from the schemas table.") + + val deletedOffsetsCount = SlickUtils.executeAction(db, OffsetRecords.records + .filter(r => r.pramenTableName === tableNameTrimmed || r.pramenTableName.like(likePattern, escape)) + .delete + ) + log.info(s"Deleted $deletedOffsetsCount records from the offsets table.") + + val deletedMetadataCount = SlickUtils.executeAction(db, MetadataRecords.records + .filter(r => r.pramenTableName === tableNameTrimmed || r.pramenTableName.like(likePattern, escape)) + .delete + ) + log.info(s"Deleted $deletedMetadataCount records from the metadata table.") + + tablesToDelete + } catch { + case NonFatal(ex) => throw new RuntimeException(s"Unable to delete records from the bookkeeping table for tables matching '$patternForLogging'.", ex) + } + } + + override def close(): Unit = { + if (isClosed.compareAndSet(false, true)) { + db.close() + } + } + private[pramen] override def getOffsetManager: OffsetManager = { offsetManagement } diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperMongoDb.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperMongoDb.scala index 20856753..200bf99d 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperMongoDb.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperMongoDb.scala @@ -151,6 +151,8 @@ class BookkeeperMongoDb(mongoDbConnection: MongoDbConnection, batchId: Long) ext } } + override def deleteTable(tableWithWildcard: String): Seq[String] = ??? + private def getFilter(tableName: String, infoDateBeginOpt: Option[LocalDate], infoDateEndOpt: Option[LocalDate], batchId: Option[Long]): Bson = { val baseFilter = (infoDateBeginOpt, infoDateEndOpt) match { case (Some(infoDateBegin), Some(infoDateEnd)) => diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperNull.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperNull.scala index c2678339..8da6f730 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperNull.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperNull.scala @@ -53,4 +53,6 @@ class BookkeeperNull() extends BookkeeperBase(false, 0L) { override def saveSchema(table: String, infoDate: LocalDate, schema: StructType): Unit = {} override def deleteNonCurrentBatchRecords(table: String, infoDate: LocalDate): Unit = {} + + override def deleteTable(tableWithWildcard: String): Seq[String] = Seq.empty } diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperText.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperText.scala index 172c4543..c2dae6a0 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperText.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperText.scala @@ -202,4 +202,6 @@ class BookkeeperText(bookkeepingPath: String, batchId: Long)(implicit spark: Spa // No-op: CSV-based storage doesn't support efficient in-place deletion. // Cross-batch replacement is not supported for BookkeeperText. } + + override def deleteTable(tableWithWildcard: String): Seq[String] = ??? } diff --git a/pramen/core/src/main/scala_2.11/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaTable.scala b/pramen/core/src/main/scala_2.11/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaTable.scala index 476b4161..aa98ed65 100644 --- a/pramen/core/src/main/scala_2.11/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaTable.scala +++ b/pramen/core/src/main/scala_2.11/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaTable.scala @@ -95,6 +95,8 @@ class BookkeeperDeltaTable(database: Option[String], .saveAsTable(pathOrTable) } + override def deleteTable(tableWithWildcard: String): Seq[String] = ??? + def init(): Unit = { initRecordsDirectory() initSchemasDirectory() diff --git a/pramen/core/src/main/scala_2.12/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaTable.scala b/pramen/core/src/main/scala_2.12/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaTable.scala index 5c150abc..5823a5bd 100644 --- a/pramen/core/src/main/scala_2.12/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaTable.scala +++ b/pramen/core/src/main/scala_2.12/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaTable.scala @@ -100,6 +100,8 @@ class BookkeeperDeltaTable(database: Option[String], .saveAsTable(pathOrTable) } + override def deleteTable(tableWithWildcard: String): Seq[String] = ??? + def init(): Unit = { initRecordsDirectory() initSchemasDirectory() diff --git a/pramen/core/src/main/scala_2.13/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaTable.scala b/pramen/core/src/main/scala_2.13/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaTable.scala index 5c150abc..5823a5bd 100644 --- a/pramen/core/src/main/scala_2.13/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaTable.scala +++ b/pramen/core/src/main/scala_2.13/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaTable.scala @@ -100,6 +100,8 @@ class BookkeeperDeltaTable(database: Option[String], .saveAsTable(pathOrTable) } + override def deleteTable(tableWithWildcard: String): Seq[String] = ??? + def init(): Unit = { initRecordsDirectory() initSchemasDirectory() diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/bookkeeper/SyncBookkeeperMock.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/bookkeeper/SyncBookkeeperMock.scala index 5117716b..8ec0e183 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/bookkeeper/SyncBookkeeperMock.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/bookkeeper/SyncBookkeeperMock.scala @@ -156,5 +156,29 @@ class SyncBookkeeperMock(batchId: Long = 123L) extends Bookkeeper { schemas += table -> (infoDate, tableSchema) } + override def deleteTable(tableName: String): Seq[String] = { + val hasWildcard = tableName.contains("*") + val tableNameEscaped = if (hasWildcard) + tableName.trim.replace(".", "\\.").replace("%", ".*").replace("*", ".*") + else + tableName.trim.replace(".", "\\.").replace("%", "\\%").replace("*", "\\*") + + val likePattern = if (!hasWildcard) + tableNameEscaped + "->*." + else + tableNameEscaped + + + val keysToDelete = chunks.keys.filter { case (tblName, _) => + tblName.matches(likePattern) || tblName.matches(tableNameEscaped) + }.toList + + keysToDelete.foreach(chunks.remove) + + keysToDelete.map(_._1).distinct + } + override private[pramen] def getOffsetManager: OffsetManager = null + + override def close(): Unit = {} } diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperJdbcSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperJdbcSuite.scala index b0ae67c8..906eae26 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperJdbcSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperJdbcSuite.scala @@ -16,6 +16,7 @@ package za.co.absa.pramen.core.tests.bookkeeper +import org.apache.spark.sql.types.StructType import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll} import za.co.absa.pramen.core.bookkeeper.{Bookkeeper, BookkeeperJdbc} import za.co.absa.pramen.core.fixtures.RelationalDbFixture @@ -23,7 +24,10 @@ import za.co.absa.pramen.core.rdb.{PramenDb, RdbJdbc} import za.co.absa.pramen.core.reader.model.JdbcConfig import za.co.absa.pramen.core.utils.UsingUtils +import java.time.LocalDate + class BookkeeperJdbcSuite extends BookkeeperCommonSuite with RelationalDbFixture with BeforeAndAfter with BeforeAndAfterAll { + private val infoDate = LocalDate.of(2021, 2, 18) val jdbcConfig: JdbcConfig = JdbcConfig(driver, Some(url), Nil, None, Some(user), Some(password)) var pramenDb: PramenDb = _ @@ -55,6 +59,40 @@ class BookkeeperJdbcSuite extends BookkeeperCommonSuite with RelationalDbFixture assert(getTables.exists(_.equalsIgnoreCase("metadata"))) assert(getTables.exists(_.equalsIgnoreCase("offsets"))) } + + "delete a set of tables by the table prefix name" in { + val bk = getBookkeeper(0L) + + bk.saveSchema("test_table", infoDate, StructType.fromDDL("id INT, name STRING")) + bk.setRecordCount("test_table", infoDate, 1, 1, None, 1, 1, isTableTransient = false) + bk.setRecordCount("test_table2", infoDate, 1, 1, None, 1, 1, isTableTransient = false) + bk.setRecordCount("test_table->sink1", infoDate, 1, 1, None, 1, 1, isTableTransient = false) + bk.setRecordCount("test_table->sink2", infoDate, 1, 1, None, 1, 1, isTableTransient = false) + + bk.deleteTable("test_table") + + assert(bk.getLatestSchema("test_table", infoDate.plusDays(1)).isEmpty) + assert(bk.getDataChunksCount("test_table2", None, None) == 1) + assert(bk.getDataChunksCount("test_table", None, None) == 0) + assert(bk.getDataChunksCount("test_table->sink2", None, None) == 0) + } + + "delete a set of tables by the table wildcard name" in { + val bk = getBookkeeper(0L) + + bk.saveSchema("test_table", infoDate, StructType.fromDDL("id INT, name STRING")) + bk.setRecordCount("test_table", infoDate, 1, 1, None, 1, 1, isTableTransient = false) + bk.setRecordCount("test_table2", infoDate, 1, 1, None, 1, 1, isTableTransient = false) + bk.setRecordCount("test_table->sink1", infoDate, 1, 1, None, 1, 1, isTableTransient = false) + bk.setRecordCount("test_table->sink2", infoDate, 1, 1, None, 1, 1, isTableTransient = false) + + bk.deleteTable("test_table*") + + assert(bk.getLatestSchema("test_table", infoDate.plusDays(1)).isEmpty) + assert(bk.getDataChunksCount("test_table2", None, None) == 0) + assert(bk.getDataChunksCount("test_table", None, None) == 0) + assert(bk.getDataChunksCount("test_table->sink2", None, None) == 0) + } } testBookKeeper(batchId => getBookkeeper(batchId))