Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,11 @@ class AppContextImpl(val appConfig: AppConfig,

override def close(): Unit = synchronized {
if (closable != null) {
closable.close()
try {
bookkeeper.close()
} finally {
closable.close()
}
Comment on lines 44 to 48
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

closable = null is unreachable if closable.close() throws — risk of double-close.

If closable.close() throws, closable = null on Line 48 is skipped. A subsequent call to close() will pass the if (closable != null) guard again and invoke both bookkeeper.close() and closable.close() a second time.

Set the field to null (via a local capture) before calling close() to make the guard idempotent regardless of exceptions:

🛡️ Proposed fix
     try {
       bookkeeper.close()
     } finally {
-      closable.close()
-      closable = null
+      val c = closable
+      closable = null
+      c.close()
     }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
try {
bookkeeper.close()
} finally {
closable.close()
closable = null
}
try {
bookkeeper.close()
} finally {
val c = closable
closable = null
c.close()
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pramen/core/src/main/scala/za/co/absa/pramen/core/app/AppContextImpl.scala`
around lines 44-49, in AppContextImpl (the close/cleanup logic around
bookkeeper.close() and closable.close()), make the closable field idempotent by
nulling the field before invoking its close to avoid double-close if close()
throws: capture the current closable into a local val, set the class-level
closable = null, then call local.close(); keep the try/finally around
bookkeeper.close() as-is but ensure you reference the same local variable when
closing the resource so the if (closable != null) guard can’t result in a second
close after an exception.

closable = null
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ object Bookkeeper {
val tokenFactory = if (runtimeConfig.useLocks && bookkeepingConfig.bookkeepingEnabled) {
if (hasBookkeepingJdbc) {
log.info(s"Using RDB for lock management.")
new TokenLockFactoryJdbc(dbOpt.get.slickDb)
new TokenLockFactoryJdbc(dbOpt.get.slickDb, dbOpt.get.slickProfile)
} else {
mongoDbConnection match {
case Some(connection) =>
Expand Down Expand Up @@ -128,7 +128,7 @@ object Bookkeeper {
log.info(s"Bookkeeping is DISABLED. Updates won't be tracked")
new BookkeeperNull()
} else if (hasBookkeepingJdbc) {
new BookkeeperJdbc(dbOpt.get.slickDb, dbOpt.get.profile, batchId)
BookkeeperJdbc.fromPramenDb(dbOpt.get, batchId)
} else {
mongoDbConnection match {
case Some(connection) =>
Expand Down Expand Up @@ -160,7 +160,7 @@ object Bookkeeper {
new JournalNull()
} else if (hasBookkeepingJdbc) {
log.info(s"Using RDB to keep journal of executed jobs.")
new JournalJdbc(dbOpt.get.slickDb)
new JournalJdbc(dbOpt.get.slickDb, dbOpt.get.slickProfile)
} else {
mongoDbConnection match {
case Some(connection) =>
Expand Down Expand Up @@ -193,7 +193,7 @@ object Bookkeeper {
new MetadataManagerNull(isPersistenceEnabled = false)
} else if (hasBookkeepingJdbc) {
log.info(s"Using RDB to keep custom metadata.")
new MetadataManagerJdbc(dbOpt.get.slickDb)
new MetadataManagerJdbc(dbOpt.get.slickDb, dbOpt.get.slickProfile)
} else {
log.info(s"The custom metadata management is not supported.")
new MetadataManagerNull(isPersistenceEnabled = true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,9 @@ package za.co.absa.pramen.core.bookkeeper

import org.apache.spark.sql.types.StructType
import org.slf4j.LoggerFactory
import slick.jdbc.JdbcBackend.Database
import slick.jdbc.JdbcProfile
import za.co.absa.pramen.core.bookkeeper.model._
import za.co.absa.pramen.core.model.{DataChunk, TableSchema}
import za.co.absa.pramen.core.rdb.PramenDb
import za.co.absa.pramen.core.rdb.PramenDb.DEFAULT_RETRIES
import za.co.absa.pramen.core.reader.JdbcUrlSelector
import za.co.absa.pramen.core.reader.model.JdbcConfig
import za.co.absa.pramen.core.utils.SlickUtils.WARN_IF_LONGER_MS
import za.co.absa.pramen.core.utils.{AlgorithmUtils, SlickUtils, TimeUtils}
Expand All @@ -33,33 +29,34 @@ import java.time.LocalDate
import java.util.concurrent.atomic.AtomicBoolean
import scala.util.control.NonFatal

class BookkeeperJdbc(db: Database, profile: JdbcProfile, batchId: Long) extends BookkeeperBase(true, batchId) {
import profile.api._
class BookkeeperJdbc(pramenDb: PramenDb, batchId: Long, autoCloseDb: Boolean) extends BookkeeperBase(true, batchId) {
import pramenDb.slickProfile.api._
import za.co.absa.pramen.core.utils.FutureImplicits._

private val log = LoggerFactory.getLogger(this.getClass)
private val offsetManagement = new OffsetManagerCached(new OffsetManagerJdbc(db, batchId))
private val isClosed = new AtomicBoolean(false)
private val log = LoggerFactory.getLogger(this.getClass)
private val slickUtils = new SlickUtils(pramenDb.slickProfile)
private val offsetManagement = new OffsetManagerCached(new OffsetManagerJdbc(pramenDb.slickDb, pramenDb.slickProfile, pramenDb.offsetTable, batchId))

override val bookkeepingEnabled: Boolean = true

override def getLatestProcessedDateFromStorage(table: String, until: Option[LocalDate]): Option[LocalDate] = {
val query = until match {
case Some(endDate) =>
val endDateStr = DataChunk.dateFormatter.format(endDate)
BookkeepingRecords.records
pramenDb.bookkeepingTable.records
.filter(r => r.pramenTableName === table && r.infoDate <= endDateStr)
.sortBy(r => (r.infoDate.desc, r.jobFinished.desc))
.take(1)
case None =>
BookkeepingRecords.records
pramenDb.bookkeepingTable.records
.filter(r => r.pramenTableName === table)
.sortBy(r => (r.infoDate.desc, r.jobFinished.desc))
.take(1)
}

val chunks = try {
SlickUtils.executeQuery[BookkeepingRecords, BookkeepingRecord](db, query)
slickUtils.executeQuery(pramenDb.slickDb, query)
.map(DataChunk.fromRecord)
} catch {
case NonFatal(ex) => throw new RuntimeException(s"Unable to read from the bookkeeping table.", ex)
Expand All @@ -77,7 +74,7 @@ class BookkeeperJdbc(db: Database, profile: JdbcProfile, batchId: Long) extends
val query = getFilter(table, Option(infoDate), Option(infoDate), batchId)

try {
SlickUtils.executeQuery[BookkeepingRecords, BookkeepingRecord](db, query)
slickUtils.executeQuery(pramenDb.slickDb, query)
.map(DataChunk.fromRecord)
.toArray[DataChunk]
.sortBy(_.jobFinished)
Expand All @@ -92,7 +89,7 @@ class BookkeeperJdbc(db: Database, profile: JdbcProfile, batchId: Long) extends
.take(1)

try {
val records = SlickUtils.executeQuery[BookkeepingRecords, BookkeepingRecord](db, query)
val records = slickUtils.executeQuery(pramenDb.slickDb, query)
.map(DataChunk.fromRecord)
.toArray[DataChunk]

Expand All @@ -109,7 +106,7 @@ class BookkeeperJdbc(db: Database, profile: JdbcProfile, batchId: Long) extends
.length

val count = try {
SlickUtils.executeCount(db, query)
slickUtils.executeCount(pramenDb.slickDb, query)
} catch {
case NonFatal(ex) => throw new RuntimeException(s"Unable to read from the bookkeeping table.", ex)
}
Expand All @@ -126,7 +123,7 @@ class BookkeeperJdbc(db: Database, profile: JdbcProfile, batchId: Long) extends
.sortBy(_._1)

try {
SlickUtils.executeQuery[(Rep[String], Rep[Int], Rep[Long]), (String, Int, Long)](db, query)
slickUtils.executeQuery[(Rep[String], Rep[Int], Rep[Long]), (String, Int, Long)](pramenDb.slickDb, query)
.map { case (infoDateStr, recordCount, outputRecordCount) =>
val infoDate = LocalDate.parse(infoDateStr, DataChunk.dateFormatter)
DataAvailability(infoDate, recordCount, outputRecordCount)
Expand All @@ -148,9 +145,9 @@ class BookkeeperJdbc(db: Database, profile: JdbcProfile, batchId: Long) extends
val record = BookkeepingRecord(table, dateStr, dateStr, dateStr, inputRecordCount, outputRecordCount, recordsAppended, jobStarted, jobFinished, Option(batchId))

try {
SlickUtils.ensureDbConnected(db)
db.run(
BookkeepingRecords.records += record
slickUtils.ensureDbConnected(pramenDb.slickDb)
pramenDb.slickDb.run(
pramenDb.bookkeepingTable.records += record
).execute()
} catch {
case NonFatal(ex) => throw new RuntimeException(s"Unable to write to the bookkeeping table.", ex)
Expand All @@ -160,13 +157,13 @@ class BookkeeperJdbc(db: Database, profile: JdbcProfile, batchId: Long) extends
override def deleteNonCurrentBatchRecords(table: String, infoDate: LocalDate): Unit = {
val dateStr = DataChunk.dateFormatter.format(infoDate)

val query = BookkeepingRecords.records
val query = pramenDb.bookkeepingTable.records
.filter(r => r.pramenTableName === table && r.infoDate === dateStr && r.batchId =!= Option(batchId))
.delete

try {
AlgorithmUtils.runActionWithElapsedTimeEvent(WARN_IF_LONGER_MS) {
db.run(query).execute()
pramenDb.slickDb.run(query).execute()
} { actualTimeMs =>
val elapsedTime = TimeUtils.prettyPrintElapsedTimeShort(actualTimeMs)
val sql = query.statements.mkString("; ")
Expand Down Expand Up @@ -199,37 +196,37 @@ class BookkeeperJdbc(db: Database, profile: JdbcProfile, batchId: Long) extends
else
s"'$tableNameTrimmed' or '$likePattern'"

val listQuery = BookkeepingRecords.records
val listQuery = pramenDb.bookkeepingTable.records
.filter(r => r.pramenTableName === tableNameTrimmed || r.pramenTableName.like(likePattern, escape))
.map(_.pramenTableName)
.distinct

val tablesToDelete = SlickUtils.executeQuery(db, listQuery).sorted
val tablesToDelete = slickUtils.executeQuery(pramenDb.slickDb, listQuery).sorted

if (tablesToDelete.length > 100)
throw new IllegalArgumentException(s"The table wildcard '$tableName' matches more than 100 tables (${tablesToDelete.length}). To avoid accidental deletions, please refine the wildcard.")
Comment on lines +199 to 207
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

listQuery execution sits outside the try-catch block.

Line 204 executes a DB query but is not covered by the try { ... } catch { case NonFatal(ex) => throw new RuntimeException(...) } that starts at Line 213. A connection failure or query error during the listing phase will throw a raw exception with no contextual message, unlike every other DB operation in this class.

🛡️ Proposed fix: extend the try-catch to cover the list query
-    val tablesToDelete = slickUtils.executeQuery(pramenDb.slickDb, listQuery).sorted
-
-    if (tablesToDelete.length > 100)
-      throw new IllegalArgumentException(...)
-
-    val deletionQuery = ...
-
-    try {
-      val deletedBkCount = ...
+    val tablesToDelete = try {
+      slickUtils.executeQuery(pramenDb.slickDb, listQuery).sorted
+    } catch {
+      case NonFatal(ex) => throw new RuntimeException(s"Unable to list tables matching '$patternForLogging' from the bookkeeping table.", ex)
+    }
+
+    if (tablesToDelete.length > 100)
+      throw new IllegalArgumentException(...)
+
+    val deletionQuery = ...
+
+    try {
+      val deletedBkCount = ...
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperJdbc.scala`
around lines 199-207, the code executes listQuery / slickUtils.executeQuery
(producing tablesToDelete) outside the existing try-catch in BookkeeperJdbc, so
move the execution of listQuery and the subsequent length check into the
surrounding try { ... } catch { case NonFatal(ex) => ... } block (or extend that
try to start before val listQuery is executed) so any DB/query failures are
caught and rethrown with the same RuntimeException handling used for other DB
operations; ensure you reference listQuery, tablesToDelete, and the existing
try-catch around the deletion logic when making the change.


val deletionQuery = BookkeepingRecords.records
val deletionQuery = pramenDb.bookkeepingTable.records
.filter(r => r.pramenTableName === tableNameTrimmed || r.pramenTableName.like(likePattern, escape))
.delete

try {
val deletedBkCount = SlickUtils.executeAction(db, deletionQuery)
val deletedBkCount = slickUtils.executeAction(pramenDb.slickDb, deletionQuery)
log.info(s"Deleted $deletedBkCount records from the bookkeeping table for tables matching $patternForLogging: ${tablesToDelete.mkString(", ")}")

val deletedSchemaCount = SlickUtils.executeAction(db, SchemaRecords.records
val deletedSchemaCount = slickUtils.executeAction(pramenDb.slickDb, pramenDb.schemaTable.records
.filter(r => r.pramenTableName === tableNameTrimmed || r.pramenTableName.like(likePattern, escape))
.delete
)
log.info(s"Deleted $deletedSchemaCount records from the schemas table.")

val deletedOffsetsCount = SlickUtils.executeAction(db, OffsetRecords.records
val deletedOffsetsCount = slickUtils.executeAction(pramenDb.slickDb, pramenDb.offsetTable.records
.filter(r => r.pramenTableName === tableNameTrimmed || r.pramenTableName.like(likePattern, escape))
.delete
)
log.info(s"Deleted $deletedOffsetsCount records from the offsets table.")

val deletedMetadataCount = SlickUtils.executeAction(db, MetadataRecords.records
val deletedMetadataCount = slickUtils.executeAction(pramenDb.slickDb, pramenDb.metadataTable.records
.filter(r => r.pramenTableName === tableNameTrimmed || r.pramenTableName.like(likePattern, escape))
.delete
)
Expand All @@ -242,40 +239,40 @@ class BookkeeperJdbc(db: Database, profile: JdbcProfile, batchId: Long) extends
}

override def close(): Unit = {
if (isClosed.compareAndSet(false, true)) {
db.close()
if (autoCloseDb && isClosed.compareAndSet(false, true)) {
pramenDb.close()
}
}

private[pramen] override def getOffsetManager: OffsetManager = {
offsetManagement
}

private def getFilter(tableName: String, infoDateBeginOpt: Option[LocalDate], infoDateEndOpt: Option[LocalDate], batchId: Option[Long]): Query[BookkeepingRecords, BookkeepingRecord, Seq] = {
private def getFilter(tableName: String, infoDateBeginOpt: Option[LocalDate], infoDateEndOpt: Option[LocalDate], batchId: Option[Long]): Query[pramenDb.bookkeepingTable.BookkeepingRecords, BookkeepingRecord, Seq] = {
val baseFilter = (infoDateBeginOpt, infoDateEndOpt) match {
case (Some(infoDateBegin), Some(infoDateEnd)) =>
val date0Str = DataChunk.dateFormatter.format(infoDateBegin)
val date1Str = DataChunk.dateFormatter.format(infoDateEnd)

if (date0Str == date1Str) {
BookkeepingRecords.records
pramenDb.bookkeepingTable.records
.filter(r => r.pramenTableName === tableName && r.infoDate === date0Str)
} else {
BookkeepingRecords.records
pramenDb.bookkeepingTable.records
.filter(r => r.pramenTableName === tableName && r.infoDate >= date0Str && r.infoDate <= date1Str)
}
case (Some(infoDateBegin), None) =>
val date0Str = DataChunk.dateFormatter.format(infoDateBegin)

BookkeepingRecords.records
pramenDb.bookkeepingTable.records
.filter(r => r.pramenTableName === tableName && r.infoDate >= date0Str)
case (None, Some(infoDateEnd)) =>
val date1Str = DataChunk.dateFormatter.format(infoDateEnd)

BookkeepingRecords.records
pramenDb.bookkeepingTable.records
.filter(r => r.pramenTableName === tableName && r.infoDate <= date1Str)
case (None, None) =>
BookkeepingRecords.records
pramenDb.bookkeepingTable.records
.filter(r => r.pramenTableName === tableName)
}

Expand All @@ -287,11 +284,11 @@ class BookkeeperJdbc(db: Database, profile: JdbcProfile, batchId: Long) extends

override def getLatestSchema(table: String, infoDate: LocalDate): Option[(StructType, LocalDate)] = {
val infoDateStr = infoDate.toString
val query = SchemaRecords.records.filter(t => t.pramenTableName === table && t.infoDate <= infoDateStr)
val query = pramenDb.schemaTable.records.filter(t => t.pramenTableName === table && t.infoDate <= infoDateStr)
.sortBy(t => t.infoDate.desc)
.take(1)

SlickUtils.executeQuery[SchemaRecords, SchemaRecord](db, query)
slickUtils.executeQuery(pramenDb.slickDb, query)
.map(schemaRecord => TableSchema(schemaRecord.pramenTableName, schemaRecord.infoDate, schemaRecord.schemaJson))
.flatMap(tableSchema =>
TableSchema.toSchemaAndDate(tableSchema)
Expand All @@ -303,13 +300,13 @@ class BookkeeperJdbc(db: Database, profile: JdbcProfile, batchId: Long) extends
val infoDateStr = infoDate.toString

try {
SlickUtils.ensureDbConnected(db)
db.run(
SchemaRecords.records.filter(t => t.pramenTableName === table && t.infoDate === infoDateStr).delete
slickUtils.ensureDbConnected(pramenDb.slickDb)
pramenDb.slickDb.run(
pramenDb.schemaTable.records.filter(t => t.pramenTableName === table && t.infoDate === infoDateStr).delete
).execute()

db.run(
SchemaRecords.records += SchemaRecord(table, infoDate.toString, schema.json)
pramenDb.slickDb.run(
pramenDb.schemaTable.records += SchemaRecord(table, infoDate.toString, schema.json)
).execute()
} catch {
case NonFatal(ex) => log.error(s"Unable to write to the bookkeeping schema table.", ex)
Expand All @@ -319,12 +316,13 @@ class BookkeeperJdbc(db: Database, profile: JdbcProfile, batchId: Long) extends
/** This method is for migration purposes*/
private[pramen] def saveSchemaRaw(table: String, infoDate: String, schema: String): Unit = {
try {
db.run(
SchemaRecords.records.filter(t => t.pramenTableName === table && t.infoDate === infoDate).delete
slickUtils.ensureDbConnected(pramenDb.slickDb)
pramenDb.slickDb.run(
pramenDb.schemaTable.records.filter(t => t.pramenTableName === table && t.infoDate === infoDate).delete
).execute()

db.run(
SchemaRecords.records += SchemaRecord(table, infoDate, schema)
pramenDb.slickDb.run(
pramenDb.schemaTable.records += SchemaRecord(table, infoDate, schema)
).execute()
} catch {
case NonFatal(ex) => log.error(s"Unable to write to the bookkeeping schema table.", ex)
Expand All @@ -333,19 +331,12 @@ class BookkeeperJdbc(db: Database, profile: JdbcProfile, batchId: Long) extends
}

object BookkeeperJdbc {
def fromJdbcConfig(jdbcConfig: JdbcConfig, batchId: Long): BookkeeperJdbc = {
val selector = JdbcUrlSelector(jdbcConfig)
val url = selector.getWorkingUrl(DEFAULT_RETRIES)
val prop = selector.getProperties

val profile = PramenDb.getProfile(jdbcConfig.driver)

val db = if (jdbcConfig.user.nonEmpty) {
Database.forURL(url = url, driver = jdbcConfig.driver, user = jdbcConfig.user.get, password = jdbcConfig.password.getOrElse(""), prop = prop)
} else {
Database.forURL(url = url, driver = jdbcConfig.driver, prop = prop)
}
new BookkeeperJdbc(db, profile, batchId)
def fromPramenDb(pramenDb: PramenDb, batchId: Long): BookkeeperJdbc = {
new BookkeeperJdbc(pramenDb, batchId, false)
}

def fromJdbcConfig(jdbcConfig: JdbcConfig, batchId: Long): BookkeeperJdbc = {
val pramenDb = PramenDb(jdbcConfig)
new BookkeeperJdbc(pramenDb, batchId, true)
}
}
Loading
Loading