Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ object BookkeeperConfig {
} else {
if (bookkeepingEnabled && bookkeepingConnectionString.isEmpty && bookkeepingLocation.isEmpty && bookkeepingJdbcConfig.isEmpty) {
if (allowLocalBookkepingStorage) {
log.warn("Bookeeping configuration is missing. Using the default SQLite database 'pramen.sqlite'")
log.warn("Bookkeeping configuration is missing. Using the default SQLite database 'pramen.sqlite'")
return BookkeeperConfig(
bookkeepingEnabled = true,
None,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@ import za.co.absa.pramen.core.rdb.PramenDb
import java.time.LocalDate

/**
* A bookkeeper is responsible of querying and updating state of all tables related to an ingestion pipeline.
* A bookkeeper is responsible for querying and updating state of all tables related to an ingestion pipeline.
*/
trait Bookkeeper {
trait Bookkeeper extends AutoCloseable {
val bookkeepingEnabled: Boolean

def getLatestProcessedDate(table: String, until: Option[LocalDate] = None): Option[LocalDate]
Expand All @@ -51,6 +51,19 @@ trait Bookkeeper {

def getLatestSchema(table: String, until: LocalDate): Option[(StructType, LocalDate)]

/**
* Deletes tables matching the given wildcard pattern. The wildcard pattern must follow
* this syntax: 'my_table_prefix_*'. Only the '*' wildcard is supported at the moment.
*
* The method deletes just the metadata about selected tables in every bookkeeping table except
* journal, which is used for logging only.
*
* @param tableWithWildcard A string representing the name or pattern of the tables
* to be deleted.
* @return A sequence of strings representing the names of the tables that were successfully deleted.
*/
def deleteTable(tableWithWildcard: String): Seq[String]

private[pramen] def setRecordCount(table: String,
infoDate: LocalDate,
inputRecordCount: Long,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ abstract class BookkeeperBase(isBookkeepingEnabled: Boolean, batchId: Long) exte

def deleteNonCurrentBatchRecords(table: String, infoDate: LocalDate): Unit

// Default no-op close; subclasses that hold external resources (e.g. a JDBC
// connection pool) override this to release them.
override def close(): Unit = {}

private[pramen] def saveRecordCountToStorage(table: String,
infoDate: LocalDate,
inputRecordCount: Long,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ class BookkeeperDeltaPath(bookkeepingPath: String, batchId: Long)(implicit spark
.save(pathOrTable)
}

// TODO: wildcard table deletion is not implemented for the Delta-path bookkeeper;
// calling this throws scala.NotImplementedError.
override def deleteTable(tableWithWildcard: String): Seq[String] = ???

private def init(): Unit = {
initRecordsDirectory(recordsPath)
initSchemasDirectory(schemasPath)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import za.co.absa.pramen.core.utils.SlickUtils.WARN_IF_LONGER_MS
import za.co.absa.pramen.core.utils.{AlgorithmUtils, SlickUtils, TimeUtils}

import java.time.LocalDate
import java.util.concurrent.atomic.AtomicBoolean
import scala.util.control.NonFatal

class BookkeeperJdbc(db: Database, profile: JdbcProfile, batchId: Long) extends BookkeeperBase(true, batchId) {
Expand All @@ -38,6 +39,7 @@ class BookkeeperJdbc(db: Database, profile: JdbcProfile, batchId: Long) extends

private val log = LoggerFactory.getLogger(this.getClass)
private val offsetManagement = new OffsetManagerCached(new OffsetManagerJdbc(db, batchId))
private val isClosed = new AtomicBoolean(false)

override val bookkeepingEnabled: Boolean = true

Expand Down Expand Up @@ -175,6 +177,76 @@ class BookkeeperJdbc(db: Database, profile: JdbcProfile, batchId: Long) extends
}
}

/**
  * Deletes bookkeeping metadata for tables matching the given name or wildcard.
  *
  * An exact name also deletes derived sink entries ('table->sink'). Only the '*'
  * wildcard is supported; it is translated to the SQL LIKE '%' wildcard. Records
  * are removed from the bookkeeping, schemas, offsets and metadata tables (the
  * journal is intentionally left untouched).
  *
  * @param tableName a table name or a wildcard pattern like 'my_prefix_*'.
  * @return the distinct names of the tables whose records were deleted.
  */
override def deleteTable(tableName: String): Seq[String] = {
  val tableNameTrimmed = tableName.trim
  val hasWildcard = tableNameTrimmed.contains("*")

  // Escape character used in the SQL LIKE patterns built below.
  val escape = '\\'

  // Escape LIKE metacharacters so '\', '%' and '_' in table names match literally.
  val baseEscaped = tableNameTrimmed
    .replace("\\", "\\\\")
    .replace("%", "\\%")
    .replace("_", "\\_")

  // Only '*' acts as a wildcard; it maps to SQL's '%'.
  val tableNameEscaped = if (hasWildcard) baseEscaped.replace("*", "%") else baseEscaped

  // For an exact name, also match its sink entries, e.g. 'my_table->sink1'.
  val likePattern = if (!hasWildcard)
    tableNameEscaped + "->%"
  else
    tableNameEscaped

  // Pre-quoted description of the match criteria for log and error messages.
  val patternForLogging = if (hasWildcard)
    s"'$likePattern'"
  else
    s"'$tableNameTrimmed' or '$likePattern'"

  val listQuery = BookkeepingRecords.records
    .filter(r => r.pramenTableName === tableNameTrimmed || r.pramenTableName.like(likePattern, escape))
    .map(_.pramenTableName)
    .distinct

  val tablesToDelete = SlickUtils.executeQuery(db, listQuery).sorted

  // Safety guard: refuse overly broad wildcards to prevent accidental mass deletion.
  if (tablesToDelete.length > 100)
    throw new IllegalArgumentException(s"The table wildcard '$tableName' matches more than 100 tables (${tablesToDelete.length}). To avoid accidental deletions, please refine the wildcard.")

  val deletionQuery = BookkeepingRecords.records
    .filter(r => r.pramenTableName === tableNameTrimmed || r.pramenTableName.like(likePattern, escape))
    .delete

  try {
    val deletedBkCount = SlickUtils.executeAction(db, deletionQuery)
    log.info(s"Deleted $deletedBkCount records from the bookkeeping table for tables matching $patternForLogging: ${tablesToDelete.mkString(", ")}")

    val deletedSchemaCount = SlickUtils.executeAction(db, SchemaRecords.records
      .filter(r => r.pramenTableName === tableNameTrimmed || r.pramenTableName.like(likePattern, escape))
      .delete
    )
    log.info(s"Deleted $deletedSchemaCount records from the schemas table.")

    val deletedOffsetsCount = SlickUtils.executeAction(db, OffsetRecords.records
      .filter(r => r.pramenTableName === tableNameTrimmed || r.pramenTableName.like(likePattern, escape))
      .delete
    )
    log.info(s"Deleted $deletedOffsetsCount records from the offsets table.")

    val deletedMetadataCount = SlickUtils.executeAction(db, MetadataRecords.records
      .filter(r => r.pramenTableName === tableNameTrimmed || r.pramenTableName.like(likePattern, escape))
      .delete
    )
    log.info(s"Deleted $deletedMetadataCount records from the metadata table.")

    tablesToDelete
  } catch {
    // Note: patternForLogging is already quoted, so the message adds no extra quotes
    // (the original wrapped it in quotes again, producing ''pattern'').
    case NonFatal(ex) => throw new RuntimeException(s"Unable to delete records from the bookkeeping table for tables matching $patternForLogging.", ex)
  }
}

/** Closes the underlying database; idempotent, so repeated calls are safe. */
override def close(): Unit = {
  // getAndSet returns the previous flag value, so only the first caller
  // observes 'false' and actually closes the database.
  if (!isClosed.getAndSet(true)) {
    db.close()
  }
}

/** Returns the cached offset manager shared by this bookkeeper instance. */
private[pramen] override def getOffsetManager: OffsetManager = offsetManagement
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,8 @@ class BookkeeperMongoDb(mongoDbConnection: MongoDbConnection, batchId: Long) ext
}
}

// TODO: wildcard table deletion is not implemented for the MongoDB bookkeeper;
// calling this throws scala.NotImplementedError.
override def deleteTable(tableWithWildcard: String): Seq[String] = ???

private def getFilter(tableName: String, infoDateBeginOpt: Option[LocalDate], infoDateEndOpt: Option[LocalDate], batchId: Option[Long]): Bson = {
val baseFilter = (infoDateBeginOpt, infoDateEndOpt) match {
case (Some(infoDateBegin), Some(infoDateEnd)) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,6 @@ class BookkeeperNull() extends BookkeeperBase(false, 0L) {
override def saveSchema(table: String, infoDate: LocalDate, schema: StructType): Unit = {}

override def deleteNonCurrentBatchRecords(table: String, infoDate: LocalDate): Unit = {}

// No-op bookkeeper stores nothing, so there is never anything to delete.
override def deleteTable(tableWithWildcard: String): Seq[String] = Seq.empty
}
Original file line number Diff line number Diff line change
Expand Up @@ -202,4 +202,6 @@ class BookkeeperText(bookkeepingPath: String, batchId: Long)(implicit spark: Spa
// No-op: CSV-based storage doesn't support efficient in-place deletion.
// Cross-batch replacement is not supported for BookkeeperText.
}

// TODO: wildcard table deletion is not implemented for the text/CSV bookkeeper;
// calling this throws scala.NotImplementedError.
override def deleteTable(tableWithWildcard: String): Seq[String] = ???
}
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ class BookkeeperDeltaTable(database: Option[String],
.saveAsTable(pathOrTable)
}

// TODO: wildcard table deletion is not implemented for the Delta-table bookkeeper;
// calling this throws scala.NotImplementedError.
override def deleteTable(tableWithWildcard: String): Seq[String] = ???

def init(): Unit = {
initRecordsDirectory()
initSchemasDirectory()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ class BookkeeperDeltaTable(database: Option[String],
.saveAsTable(pathOrTable)
}

// TODO: wildcard table deletion is not implemented for the Delta-table bookkeeper;
// calling this throws scala.NotImplementedError.
override def deleteTable(tableWithWildcard: String): Seq[String] = ???

def init(): Unit = {
initRecordsDirectory()
initSchemasDirectory()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ class BookkeeperDeltaTable(database: Option[String],
.saveAsTable(pathOrTable)
}

// TODO: wildcard table deletion is not implemented for the Delta-table bookkeeper;
// calling this throws scala.NotImplementedError.
override def deleteTable(tableWithWildcard: String): Seq[String] = ???

def init(): Unit = {
initRecordsDirectory()
initSchemasDirectory()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,5 +156,29 @@ class SyncBookkeeperMock(batchId: Long = 123L) extends Bookkeeper {
schemas += table -> (infoDate, tableSchema)
}

/**
  * In-memory mock of [[deleteTable]]: removes chunk entries whose table name
  * matches the given name or wildcard, mirroring the JDBC LIKE semantics with
  * Java regexes.
  *
  * @param tableName a table name or a wildcard pattern like 'my_prefix_*'.
  * @return the distinct table names that had entries removed.
  */
override def deleteTable(tableName: String): Seq[String] = {
  val tableNameTrimmed = tableName.trim
  val hasWildcard = tableNameTrimmed.contains("*")

  // Build the regex equivalent of the JDBC LIKE matching: '.' is escaped to
  // match literally; with a wildcard, '*' (and '%') become the regex '.*';
  // without one, '%' and '*' are escaped so they match literally.
  val tableNameEscaped = if (hasWildcard)
    tableNameTrimmed.replace(".", "\\.").replace("%", ".*").replace("*", ".*")
  else
    tableNameTrimmed.replace(".", "\\.").replace("%", "\\%").replace("*", "\\*")

  // For an exact name, also match derived sink entries ('table->sink').
  // Bug fix: the suffix regex must be "->.*" — the previous "->*." matched
  // '-', zero or more '>', then exactly one character, so full-string
  // matching never removed entries like 'test_table->sink1'.
  val likePattern = if (!hasWildcard)
    tableNameEscaped + "->.*"
  else
    tableNameEscaped

  val keysToDelete = chunks.keys.filter { case (tblName, _) =>
    tblName.matches(likePattern) || tblName.matches(tableNameEscaped)
  }.toList

  keysToDelete.foreach(chunks.remove)

  keysToDelete.map(_._1).distinct
}

// The mock does not support offset management; callers must not use offsets here.
override private[pramen] def getOffsetManager: OffsetManager = null

// Nothing to release for the in-memory mock.
override def close(): Unit = {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,18 @@

package za.co.absa.pramen.core.tests.bookkeeper

import org.apache.spark.sql.types.StructType
import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
import za.co.absa.pramen.core.bookkeeper.{Bookkeeper, BookkeeperJdbc}
import za.co.absa.pramen.core.fixtures.RelationalDbFixture
import za.co.absa.pramen.core.rdb.{PramenDb, RdbJdbc}
import za.co.absa.pramen.core.reader.model.JdbcConfig
import za.co.absa.pramen.core.utils.UsingUtils

import java.time.LocalDate

class BookkeeperJdbcSuite extends BookkeeperCommonSuite with RelationalDbFixture with BeforeAndAfter with BeforeAndAfterAll {
private val infoDate = LocalDate.of(2021, 2, 18)

val jdbcConfig: JdbcConfig = JdbcConfig(driver, Some(url), Nil, None, Some(user), Some(password))
var pramenDb: PramenDb = _
Expand Down Expand Up @@ -55,6 +59,40 @@ class BookkeeperJdbcSuite extends BookkeeperCommonSuite with RelationalDbFixture
assert(getTables.exists(_.equalsIgnoreCase("metadata")))
assert(getTables.exists(_.equalsIgnoreCase("offsets")))
}

"delete a set of tables by the table prefix name" in {
val bk = getBookkeeper(0L)

bk.saveSchema("test_table", infoDate, StructType.fromDDL("id INT, name STRING"))
bk.setRecordCount("test_table", infoDate, 1, 1, None, 1, 1, isTableTransient = false)
bk.setRecordCount("test_table2", infoDate, 1, 1, None, 1, 1, isTableTransient = false)
bk.setRecordCount("test_table->sink1", infoDate, 1, 1, None, 1, 1, isTableTransient = false)
bk.setRecordCount("test_table->sink2", infoDate, 1, 1, None, 1, 1, isTableTransient = false)

bk.deleteTable("test_table")

assert(bk.getLatestSchema("test_table", infoDate.plusDays(1)).isEmpty)
assert(bk.getDataChunksCount("test_table2", None, None) == 1)
assert(bk.getDataChunksCount("test_table", None, None) == 0)
assert(bk.getDataChunksCount("test_table->sink2", None, None) == 0)
}

"delete a set of tables by the table wildcard name" in {
val bk = getBookkeeper(0L)

bk.saveSchema("test_table", infoDate, StructType.fromDDL("id INT, name STRING"))
bk.setRecordCount("test_table", infoDate, 1, 1, None, 1, 1, isTableTransient = false)
bk.setRecordCount("test_table2", infoDate, 1, 1, None, 1, 1, isTableTransient = false)
bk.setRecordCount("test_table->sink1", infoDate, 1, 1, None, 1, 1, isTableTransient = false)
bk.setRecordCount("test_table->sink2", infoDate, 1, 1, None, 1, 1, isTableTransient = false)

bk.deleteTable("test_table*")

assert(bk.getLatestSchema("test_table", infoDate.plusDays(1)).isEmpty)
assert(bk.getDataChunksCount("test_table2", None, None) == 0)
assert(bk.getDataChunksCount("test_table", None, None) == 0)
assert(bk.getDataChunksCount("test_table->sink2", None, None) == 0)
}
}

testBookKeeper(batchId => getBookkeeper(batchId))
Expand Down
Loading