diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/app/AppContextImpl.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/app/AppContextImpl.scala
index 4f09c17e..2927267c 100644
--- a/pramen/core/src/main/scala/za/co/absa/pramen/core/app/AppContextImpl.scala
+++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/app/AppContextImpl.scala
@@ -41,7 +41,11 @@ class AppContextImpl(val appConfig: AppConfig,
override def close(): Unit = synchronized {
if (closable != null) {
- closable.close()
+ try {
+ bookkeeper.close()
+ } finally {
+ closable.close()
+ }
closable = null
}
diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/Bookkeeper.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/Bookkeeper.scala
index 4440e647..62eb8d88 100644
--- a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/Bookkeeper.scala
+++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/Bookkeeper.scala
@@ -100,7 +100,7 @@ object Bookkeeper {
val tokenFactory = if (runtimeConfig.useLocks && bookkeepingConfig.bookkeepingEnabled) {
if (hasBookkeepingJdbc) {
log.info(s"Using RDB for lock management.")
- new TokenLockFactoryJdbc(dbOpt.get.slickDb)
+ new TokenLockFactoryJdbc(dbOpt.get.slickDb, dbOpt.get.slickProfile)
} else {
mongoDbConnection match {
case Some(connection) =>
@@ -128,7 +128,7 @@ object Bookkeeper {
log.info(s"Bookkeeping is DISABLED. Updates won't be tracked")
new BookkeeperNull()
} else if (hasBookkeepingJdbc) {
- new BookkeeperJdbc(dbOpt.get.slickDb, dbOpt.get.profile, batchId)
+ BookkeeperJdbc.fromPramenDb(dbOpt.get, batchId)
} else {
mongoDbConnection match {
case Some(connection) =>
@@ -160,7 +160,7 @@ object Bookkeeper {
new JournalNull()
} else if (hasBookkeepingJdbc) {
log.info(s"Using RDB to keep journal of executed jobs.")
- new JournalJdbc(dbOpt.get.slickDb)
+ new JournalJdbc(dbOpt.get.slickDb, dbOpt.get.slickProfile)
} else {
mongoDbConnection match {
case Some(connection) =>
@@ -193,7 +193,7 @@ object Bookkeeper {
new MetadataManagerNull(isPersistenceEnabled = false)
} else if (hasBookkeepingJdbc) {
log.info(s"Using RDB to keep custom metadata.")
- new MetadataManagerJdbc(dbOpt.get.slickDb)
+ new MetadataManagerJdbc(dbOpt.get.slickDb, dbOpt.get.slickProfile)
} else {
log.info(s"The custom metadata management is not supported.")
new MetadataManagerNull(isPersistenceEnabled = true)
diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperJdbc.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperJdbc.scala
index 7c62091b..c7594ff0 100644
--- a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperJdbc.scala
+++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperJdbc.scala
@@ -18,13 +18,9 @@ package za.co.absa.pramen.core.bookkeeper
import org.apache.spark.sql.types.StructType
import org.slf4j.LoggerFactory
-import slick.jdbc.JdbcBackend.Database
-import slick.jdbc.JdbcProfile
import za.co.absa.pramen.core.bookkeeper.model._
import za.co.absa.pramen.core.model.{DataChunk, TableSchema}
import za.co.absa.pramen.core.rdb.PramenDb
-import za.co.absa.pramen.core.rdb.PramenDb.DEFAULT_RETRIES
-import za.co.absa.pramen.core.reader.JdbcUrlSelector
import za.co.absa.pramen.core.reader.model.JdbcConfig
import za.co.absa.pramen.core.utils.SlickUtils.WARN_IF_LONGER_MS
import za.co.absa.pramen.core.utils.{AlgorithmUtils, SlickUtils, TimeUtils}
@@ -33,13 +29,14 @@ import java.time.LocalDate
import java.util.concurrent.atomic.AtomicBoolean
import scala.util.control.NonFatal
-class BookkeeperJdbc(db: Database, profile: JdbcProfile, batchId: Long) extends BookkeeperBase(true, batchId) {
- import profile.api._
+class BookkeeperJdbc(pramenDb: PramenDb, batchId: Long, autoCloseDb: Boolean) extends BookkeeperBase(true, batchId) {
+ import pramenDb.slickProfile.api._
import za.co.absa.pramen.core.utils.FutureImplicits._
- private val log = LoggerFactory.getLogger(this.getClass)
- private val offsetManagement = new OffsetManagerCached(new OffsetManagerJdbc(db, batchId))
private val isClosed = new AtomicBoolean(false)
+ private val log = LoggerFactory.getLogger(this.getClass)
+ private val slickUtils = new SlickUtils(pramenDb.slickProfile)
+ private val offsetManagement = new OffsetManagerCached(new OffsetManagerJdbc(pramenDb.slickDb, pramenDb.slickProfile, pramenDb.offsetTable, batchId))
override val bookkeepingEnabled: Boolean = true
@@ -47,19 +44,19 @@ class BookkeeperJdbc(db: Database, profile: JdbcProfile, batchId: Long) extends
val query = until match {
case Some(endDate) =>
val endDateStr = DataChunk.dateFormatter.format(endDate)
- BookkeepingRecords.records
+ pramenDb.bookkeepingTable.records
.filter(r => r.pramenTableName === table && r.infoDate <= endDateStr)
.sortBy(r => (r.infoDate.desc, r.jobFinished.desc))
.take(1)
case None =>
- BookkeepingRecords.records
+ pramenDb.bookkeepingTable.records
.filter(r => r.pramenTableName === table)
.sortBy(r => (r.infoDate.desc, r.jobFinished.desc))
.take(1)
}
val chunks = try {
- SlickUtils.executeQuery[BookkeepingRecords, BookkeepingRecord](db, query)
+ slickUtils.executeQuery(pramenDb.slickDb, query)
.map(DataChunk.fromRecord)
} catch {
case NonFatal(ex) => throw new RuntimeException(s"Unable to read from the bookkeeping table.", ex)
@@ -77,7 +74,7 @@ class BookkeeperJdbc(db: Database, profile: JdbcProfile, batchId: Long) extends
val query = getFilter(table, Option(infoDate), Option(infoDate), batchId)
try {
- SlickUtils.executeQuery[BookkeepingRecords, BookkeepingRecord](db, query)
+ slickUtils.executeQuery(pramenDb.slickDb, query)
.map(DataChunk.fromRecord)
.toArray[DataChunk]
.sortBy(_.jobFinished)
@@ -92,7 +89,7 @@ class BookkeeperJdbc(db: Database, profile: JdbcProfile, batchId: Long) extends
.take(1)
try {
- val records = SlickUtils.executeQuery[BookkeepingRecords, BookkeepingRecord](db, query)
+ val records = slickUtils.executeQuery(pramenDb.slickDb, query)
.map(DataChunk.fromRecord)
.toArray[DataChunk]
@@ -109,7 +106,7 @@ class BookkeeperJdbc(db: Database, profile: JdbcProfile, batchId: Long) extends
.length
val count = try {
- SlickUtils.executeCount(db, query)
+ slickUtils.executeCount(pramenDb.slickDb, query)
} catch {
case NonFatal(ex) => throw new RuntimeException(s"Unable to read from the bookkeeping table.", ex)
}
@@ -126,7 +123,7 @@ class BookkeeperJdbc(db: Database, profile: JdbcProfile, batchId: Long) extends
.sortBy(_._1)
try {
- SlickUtils.executeQuery[(Rep[String], Rep[Int], Rep[Long]), (String, Int, Long)](db, query)
+ slickUtils.executeQuery[(Rep[String], Rep[Int], Rep[Long]), (String, Int, Long)](pramenDb.slickDb, query)
.map { case (infoDateStr, recordCount, outputRecordCount) =>
val infoDate = LocalDate.parse(infoDateStr, DataChunk.dateFormatter)
DataAvailability(infoDate, recordCount, outputRecordCount)
@@ -148,9 +145,9 @@ class BookkeeperJdbc(db: Database, profile: JdbcProfile, batchId: Long) extends
val record = BookkeepingRecord(table, dateStr, dateStr, dateStr, inputRecordCount, outputRecordCount, recordsAppended, jobStarted, jobFinished, Option(batchId))
try {
- SlickUtils.ensureDbConnected(db)
- db.run(
- BookkeepingRecords.records += record
+ slickUtils.ensureDbConnected(pramenDb.slickDb)
+ pramenDb.slickDb.run(
+ pramenDb.bookkeepingTable.records += record
).execute()
} catch {
case NonFatal(ex) => throw new RuntimeException(s"Unable to write to the bookkeeping table.", ex)
@@ -160,13 +157,13 @@ class BookkeeperJdbc(db: Database, profile: JdbcProfile, batchId: Long) extends
override def deleteNonCurrentBatchRecords(table: String, infoDate: LocalDate): Unit = {
val dateStr = DataChunk.dateFormatter.format(infoDate)
- val query = BookkeepingRecords.records
+ val query = pramenDb.bookkeepingTable.records
.filter(r => r.pramenTableName === table && r.infoDate === dateStr && r.batchId =!= Option(batchId))
.delete
try {
AlgorithmUtils.runActionWithElapsedTimeEvent(WARN_IF_LONGER_MS) {
- db.run(query).execute()
+ pramenDb.slickDb.run(query).execute()
} { actualTimeMs =>
val elapsedTime = TimeUtils.prettyPrintElapsedTimeShort(actualTimeMs)
val sql = query.statements.mkString("; ")
@@ -199,37 +196,37 @@ class BookkeeperJdbc(db: Database, profile: JdbcProfile, batchId: Long) extends
else
s"'$tableNameTrimmed' or '$likePattern'"
- val listQuery = BookkeepingRecords.records
+ val listQuery = pramenDb.bookkeepingTable.records
.filter(r => r.pramenTableName === tableNameTrimmed || r.pramenTableName.like(likePattern, escape))
.map(_.pramenTableName)
.distinct
- val tablesToDelete = SlickUtils.executeQuery(db, listQuery).sorted
+ val tablesToDelete = slickUtils.executeQuery(pramenDb.slickDb, listQuery).sorted
if (tablesToDelete.length > 100)
throw new IllegalArgumentException(s"The table wildcard '$tableName' matches more than 100 tables (${tablesToDelete.length}). To avoid accidental deletions, please refine the wildcard.")
- val deletionQuery = BookkeepingRecords.records
+ val deletionQuery = pramenDb.bookkeepingTable.records
.filter(r => r.pramenTableName === tableNameTrimmed || r.pramenTableName.like(likePattern, escape))
.delete
try {
- val deletedBkCount = SlickUtils.executeAction(db, deletionQuery)
+ val deletedBkCount = slickUtils.executeAction(pramenDb.slickDb, deletionQuery)
log.info(s"Deleted $deletedBkCount records from the bookkeeping table for tables matching $patternForLogging: ${tablesToDelete.mkString(", ")}")
- val deletedSchemaCount = SlickUtils.executeAction(db, SchemaRecords.records
+ val deletedSchemaCount = slickUtils.executeAction(pramenDb.slickDb, pramenDb.schemaTable.records
.filter(r => r.pramenTableName === tableNameTrimmed || r.pramenTableName.like(likePattern, escape))
.delete
)
log.info(s"Deleted $deletedSchemaCount records from the schemas table.")
- val deletedOffsetsCount = SlickUtils.executeAction(db, OffsetRecords.records
+ val deletedOffsetsCount = slickUtils.executeAction(pramenDb.slickDb, pramenDb.offsetTable.records
.filter(r => r.pramenTableName === tableNameTrimmed || r.pramenTableName.like(likePattern, escape))
.delete
)
log.info(s"Deleted $deletedOffsetsCount records from the offsets table.")
- val deletedMetadataCount = SlickUtils.executeAction(db, MetadataRecords.records
+ val deletedMetadataCount = slickUtils.executeAction(pramenDb.slickDb, pramenDb.metadataTable.records
.filter(r => r.pramenTableName === tableNameTrimmed || r.pramenTableName.like(likePattern, escape))
.delete
)
@@ -242,8 +239,8 @@ class BookkeeperJdbc(db: Database, profile: JdbcProfile, batchId: Long) extends
}
override def close(): Unit = {
- if (isClosed.compareAndSet(false, true)) {
- db.close()
+ if (autoCloseDb && isClosed.compareAndSet(false, true)) {
+ pramenDb.close()
}
}
@@ -251,31 +248,31 @@ class BookkeeperJdbc(db: Database, profile: JdbcProfile, batchId: Long) extends
offsetManagement
}
- private def getFilter(tableName: String, infoDateBeginOpt: Option[LocalDate], infoDateEndOpt: Option[LocalDate], batchId: Option[Long]): Query[BookkeepingRecords, BookkeepingRecord, Seq] = {
+ private def getFilter(tableName: String, infoDateBeginOpt: Option[LocalDate], infoDateEndOpt: Option[LocalDate], batchId: Option[Long]): Query[pramenDb.bookkeepingTable.BookkeepingRecords, BookkeepingRecord, Seq] = {
val baseFilter = (infoDateBeginOpt, infoDateEndOpt) match {
case (Some(infoDateBegin), Some(infoDateEnd)) =>
val date0Str = DataChunk.dateFormatter.format(infoDateBegin)
val date1Str = DataChunk.dateFormatter.format(infoDateEnd)
if (date0Str == date1Str) {
- BookkeepingRecords.records
+ pramenDb.bookkeepingTable.records
.filter(r => r.pramenTableName === tableName && r.infoDate === date0Str)
} else {
- BookkeepingRecords.records
+ pramenDb.bookkeepingTable.records
.filter(r => r.pramenTableName === tableName && r.infoDate >= date0Str && r.infoDate <= date1Str)
}
case (Some(infoDateBegin), None) =>
val date0Str = DataChunk.dateFormatter.format(infoDateBegin)
- BookkeepingRecords.records
+ pramenDb.bookkeepingTable.records
.filter(r => r.pramenTableName === tableName && r.infoDate >= date0Str)
case (None, Some(infoDateEnd)) =>
val date1Str = DataChunk.dateFormatter.format(infoDateEnd)
- BookkeepingRecords.records
+ pramenDb.bookkeepingTable.records
.filter(r => r.pramenTableName === tableName && r.infoDate <= date1Str)
case (None, None) =>
- BookkeepingRecords.records
+ pramenDb.bookkeepingTable.records
.filter(r => r.pramenTableName === tableName)
}
@@ -287,11 +284,11 @@ class BookkeeperJdbc(db: Database, profile: JdbcProfile, batchId: Long) extends
override def getLatestSchema(table: String, infoDate: LocalDate): Option[(StructType, LocalDate)] = {
val infoDateStr = infoDate.toString
- val query = SchemaRecords.records.filter(t => t.pramenTableName === table && t.infoDate <= infoDateStr)
+ val query = pramenDb.schemaTable.records.filter(t => t.pramenTableName === table && t.infoDate <= infoDateStr)
.sortBy(t => t.infoDate.desc)
.take(1)
- SlickUtils.executeQuery[SchemaRecords, SchemaRecord](db, query)
+ slickUtils.executeQuery(pramenDb.slickDb, query)
.map(schemaRecord => TableSchema(schemaRecord.pramenTableName, schemaRecord.infoDate, schemaRecord.schemaJson))
.flatMap(tableSchema =>
TableSchema.toSchemaAndDate(tableSchema)
@@ -303,13 +300,13 @@ class BookkeeperJdbc(db: Database, profile: JdbcProfile, batchId: Long) extends
val infoDateStr = infoDate.toString
try {
- SlickUtils.ensureDbConnected(db)
- db.run(
- SchemaRecords.records.filter(t => t.pramenTableName === table && t.infoDate === infoDateStr).delete
+ slickUtils.ensureDbConnected(pramenDb.slickDb)
+ pramenDb.slickDb.run(
+ pramenDb.schemaTable.records.filter(t => t.pramenTableName === table && t.infoDate === infoDateStr).delete
).execute()
- db.run(
- SchemaRecords.records += SchemaRecord(table, infoDate.toString, schema.json)
+ pramenDb.slickDb.run(
+ pramenDb.schemaTable.records += SchemaRecord(table, infoDate.toString, schema.json)
).execute()
} catch {
case NonFatal(ex) => log.error(s"Unable to write to the bookkeeping schema table.", ex)
@@ -319,12 +316,13 @@ class BookkeeperJdbc(db: Database, profile: JdbcProfile, batchId: Long) extends
/** This method is for migration purposes*/
private[pramen] def saveSchemaRaw(table: String, infoDate: String, schema: String): Unit = {
try {
- db.run(
- SchemaRecords.records.filter(t => t.pramenTableName === table && t.infoDate === infoDate).delete
+ slickUtils.ensureDbConnected(pramenDb.slickDb)
+ pramenDb.slickDb.run(
+ pramenDb.schemaTable.records.filter(t => t.pramenTableName === table && t.infoDate === infoDate).delete
).execute()
- db.run(
- SchemaRecords.records += SchemaRecord(table, infoDate, schema)
+ pramenDb.slickDb.run(
+ pramenDb.schemaTable.records += SchemaRecord(table, infoDate, schema)
).execute()
} catch {
case NonFatal(ex) => log.error(s"Unable to write to the bookkeeping schema table.", ex)
@@ -333,19 +331,12 @@ class BookkeeperJdbc(db: Database, profile: JdbcProfile, batchId: Long) extends
}
object BookkeeperJdbc {
- def fromJdbcConfig(jdbcConfig: JdbcConfig, batchId: Long): BookkeeperJdbc = {
- val selector = JdbcUrlSelector(jdbcConfig)
- val url = selector.getWorkingUrl(DEFAULT_RETRIES)
- val prop = selector.getProperties
-
- val profile = PramenDb.getProfile(jdbcConfig.driver)
-
- val db = if (jdbcConfig.user.nonEmpty) {
- Database.forURL(url = url, driver = jdbcConfig.driver, user = jdbcConfig.user.get, password = jdbcConfig.password.getOrElse(""), prop = prop)
- } else {
- Database.forURL(url = url, driver = jdbcConfig.driver, prop = prop)
- }
- new BookkeeperJdbc(db, profile, batchId)
+ def fromPramenDb(pramenDb: PramenDb, batchId: Long): BookkeeperJdbc = {
+ new BookkeeperJdbc(pramenDb, batchId, false)
}
+ def fromJdbcConfig(jdbcConfig: JdbcConfig, batchId: Long): BookkeeperJdbc = {
+ val pramenDb = PramenDb(jdbcConfig)
+ new BookkeeperJdbc(pramenDb, batchId, true)
+ }
}
diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManagerJdbc.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManagerJdbc.scala
index 685b72f4..368ab026 100644
--- a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManagerJdbc.scala
+++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManagerJdbc.scala
@@ -16,7 +16,8 @@
package za.co.absa.pramen.core.bookkeeper
-import slick.jdbc.PostgresProfile.api._
+import slick.jdbc.JdbcBackend.Database
+import slick.jdbc.JdbcProfile
import za.co.absa.pramen.api.offset.DataOffset.UncommittedOffset
import za.co.absa.pramen.api.offset.{DataOffset, OffsetType, OffsetValue}
import za.co.absa.pramen.core.bookkeeper.model._
@@ -25,9 +26,12 @@ import za.co.absa.pramen.core.utils.SlickUtils
import java.time.{Instant, LocalDate}
import scala.util.control.NonFatal
-class OffsetManagerJdbc(db: Database, batchId: Long) extends OffsetManager {
+class OffsetManagerJdbc(db: Database, slickProfile: JdbcProfile, offsetTable: OffsetTable, batchId: Long) extends OffsetManager {
+ import slickProfile.api._
import za.co.absa.pramen.core.utils.FutureImplicits._
+ private val slickUtils = new SlickUtils(slickProfile)
+
override def getOffsets(table: String, infoDate: LocalDate): Array[DataOffset] = {
val offsets = getOffsetRecords(table, infoDate)
@@ -42,16 +46,16 @@ class OffsetManagerJdbc(db: Database, batchId: Long) extends OffsetManager {
val query = onlyForInfoDate match {
case Some(infoDate) =>
val infoDateStr = infoDate.toString
- OffsetRecords.records
+ offsetTable.records
.filter(r => r.pramenTableName === table && r.infoDate === infoDateStr && r.committedAt.isEmpty)
.sorted(r => r.infoDate)
case None =>
- OffsetRecords.records
+ offsetTable.records
.filter(r => r.pramenTableName === table && r.committedAt.isEmpty)
.sorted(r => r.infoDate)
}
- SlickUtils.executeQuery[OffsetRecords, OffsetRecord](db, query)
+ slickUtils.executeQuery(db, query)
.toArray[OffsetRecord]
.map(record => OffsetRecordConverter.toDataOffset(record).asInstanceOf[UncommittedOffset])
}
@@ -73,9 +77,9 @@ class OffsetManagerJdbc(db: Database, batchId: Long) extends OffsetManager {
val record = OffsetRecord(table, infoDate.toString, offsetType.dataTypeString, "", "", batchId, createdAt.toEpochMilli, None)
- SlickUtils.ensureDbConnected(db)
+ slickUtils.ensureDbConnected(db)
db.run(
- OffsetRecords.records += record
+ offsetTable.records += record
).execute()
DataOffsetRequest(table, infoDate, batchId, createdAt)
@@ -84,9 +88,9 @@ class OffsetManagerJdbc(db: Database, batchId: Long) extends OffsetManager {
override def commitOffsets(request: DataOffsetRequest, minOffset: OffsetValue, maxOffset: OffsetValue): Unit = {
val committedAt = Instant.now().toEpochMilli
- SlickUtils.ensureDbConnected(db)
+ slickUtils.ensureDbConnected(db)
db.run(
- OffsetRecords.records
+ offsetTable.records
.filter(r => r.pramenTableName === request.tableName && r.infoDate === request.infoDate.toString && r.createdAt === request.createdAt.toEpochMilli)
.map(r => (r.minOffset, r.maxOffset, r.committedAt))
.update((minOffset.valueString, maxOffset.valueString, Some(committedAt)))
@@ -100,19 +104,16 @@ class OffsetManagerJdbc(db: Database, batchId: Long) extends OffsetManager {
val committedAt = Instant.now().toEpochMilli
- SlickUtils.ensureDbConnected(db)
+ slickUtils.ensureDbConnected(db)
db.run(
- OffsetRecords.records
+ offsetTable.records
.filter(r => r.pramenTableName === request.tableName && r.infoDate === request.infoDate.toString && r.createdAt === request.createdAt.toEpochMilli)
.map(r => (r.minOffset, r.maxOffset, r.committedAt))
.update((minOffset.valueString, maxOffset.valueString, Some(committedAt)))
- ).execute()
-
- // Cleaning up previous batches
- db.run(
- OffsetRecords.records
- .filter(r => r.pramenTableName === request.tableName && r.infoDate === request.infoDate.toString && r.createdAt =!= request.createdAt.toEpochMilli)
- .delete
+ .andThen(offsetTable.records
+ .filter(r => r.pramenTableName === request.tableName && r.infoDate === request.infoDate.toString && r.createdAt =!= request.createdAt.toEpochMilli)
+ .delete
+ ).transactionally
).execute()
}
@@ -124,16 +125,16 @@ class OffsetManagerJdbc(db: Database, batchId: Long) extends OffsetManager {
OffsetRecord(req.table, req.infoDate.toString, req.minOffset.dataType.dataTypeString, req.minOffset.valueString, req.maxOffset.valueString, batchId, req.createdAt.toEpochMilli, Some(committedAtMilli))
}
- SlickUtils.ensureDbConnected(db)
+ slickUtils.ensureDbConnected(db)
db.run(
- OffsetRecords.records ++= records
+ offsetTable.records ++= records
).execute()
commitRequests.map(r => (r.table, r.infoDate))
.distinct
.foreach { case (table, infoDate) =>
db.run(
- OffsetRecords.records
+ offsetTable.records
.filter(r => r.pramenTableName === table && r.infoDate === infoDate.toString && r.committedAt =!= committedAtMilli)
.delete
).execute()
@@ -141,21 +142,21 @@ class OffsetManagerJdbc(db: Database, batchId: Long) extends OffsetManager {
}
override def rollbackOffsets(request: DataOffsetRequest): Unit = {
- SlickUtils.ensureDbConnected(db)
+ slickUtils.ensureDbConnected(db)
db.run(
- OffsetRecords.records
+ offsetTable.records
.filter(r => r.pramenTableName === request.tableName && r.infoDate === request.infoDate.toString && r.createdAt === request.createdAt.toEpochMilli)
.delete
).execute()
}
private[core] def getMaximumInfoDate(table: String): Option[LocalDate] = {
- val query = OffsetRecords.records
+ val query = offsetTable.records
.filter(r => r.pramenTableName === table)
.map(_.infoDate).max
try {
- SlickUtils.executeMaxString(db, query)
+ slickUtils.executeMaxString(db, query)
.map(LocalDate.parse)
} catch {
case NonFatal(ex) => throw new RuntimeException(s"Unable to read from the offset table.", ex)
@@ -164,10 +165,10 @@ class OffsetManagerJdbc(db: Database, batchId: Long) extends OffsetManager {
private[core] def getOffsetRecords(table: String, infoDate: LocalDate): Array[OffsetRecord] = {
val infoDateStr = infoDate.toString
- val query = OffsetRecords.records
+ val query = offsetTable.records
.filter(r => r.pramenTableName === table && r.infoDate === infoDateStr)
- SlickUtils.executeQuery[OffsetRecords, OffsetRecord](db, query)
+ slickUtils.executeQuery(db, query)
.toArray[OffsetRecord]
}
diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/BookkeepingRecords.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/BookkeepingRecords.scala
deleted file mode 100644
index 9efa9908..00000000
--- a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/BookkeepingRecords.scala
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Copyright 2022 ABSA Group Limited
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package za.co.absa.pramen.core.bookkeeper.model
-
-import slick.jdbc.PostgresProfile.api._
-import slick.lifted.TableQuery
-
-class BookkeepingRecords(tag: Tag) extends Table[BookkeepingRecord](tag, "bookkeeping") {
- def pramenTableName = column[String]("watcher_table_name", O.Length(128))
- def infoDate = column[String]("info_date", O.Length(20))
- def infoDateBegin = column[String]("info_date_begin", O.Length(20))
- def infoDateEnd = column[String]("info_date_end", O.Length(20))
- def inputRecordCount = column[Long]("input_record_count")
- def outputRecordCount = column[Long]("output_record_count")
- def appendedRecordCount = column[Option[Long]]("appended_record_count")
- def jobStarted = column[Long]("job_started")
- def jobFinished = column[Long]("job_finished")
- def batchId = column[Option[Long]]("batch_id")
- def * = (pramenTableName, infoDate, infoDateBegin, infoDateEnd,
- inputRecordCount, outputRecordCount, appendedRecordCount,
- jobStarted, jobFinished, batchId) <> (BookkeepingRecord.tupled, BookkeepingRecord.unapply)
- def idx1 = index("bk_idx_1", (pramenTableName, infoDate), unique = false)
-}
-
-object BookkeepingRecords {
- lazy val records = TableQuery[BookkeepingRecords]
-}
diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/BookkeepingTable.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/BookkeepingTable.scala
new file mode 100644
index 00000000..f249ceef
--- /dev/null
+++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/BookkeepingTable.scala
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2022 ABSA Group Limited
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package za.co.absa.pramen.core.bookkeeper.model
+
+import slick.jdbc.JdbcProfile
+
+trait BookkeepingTable {
+ val profile: JdbcProfile
+ import profile.api._
+
+ class BookkeepingRecords(tag: Tag) extends Table[BookkeepingRecord](tag, "bookkeeping") {
+ def pramenTableName = column[String]("watcher_table_name", O.Length(128))
+ def infoDate = column[String]("info_date", O.Length(20))
+ def infoDateBegin = column[String]("info_date_begin", O.Length(20))
+ def infoDateEnd = column[String]("info_date_end", O.Length(20))
+ def inputRecordCount = column[Long]("input_record_count")
+ def outputRecordCount = column[Long]("output_record_count")
+ def appendedRecordCount = column[Option[Long]]("appended_record_count")
+ def jobStarted = column[Long]("job_started")
+ def jobFinished = column[Long]("job_finished")
+ def batchId = column[Option[Long]]("batch_id")
+
+ def * =
+ (pramenTableName, infoDate, infoDateBegin, infoDateEnd,
+ inputRecordCount, outputRecordCount, appendedRecordCount,
+ jobStarted, jobFinished, batchId)
+ .<> (BookkeepingRecord.tupled, BookkeepingRecord.unapply)
+
+ def idx1 = index("bk_idx_1", (pramenTableName, infoDate), unique = false)
+ }
+
+ lazy val records = TableQuery[BookkeepingRecords]
+}
diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/MetadataRecords.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/MetadataRecords.scala
deleted file mode 100644
index 767eba0c..00000000
--- a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/MetadataRecords.scala
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Copyright 2022 ABSA Group Limited
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package za.co.absa.pramen.core.bookkeeper.model
-
-import slick.jdbc.PostgresProfile.api._
-
-class MetadataRecords(tag: Tag) extends Table[MetadataRecord](tag, "metadata") {
- def pramenTableName = column[String]("table_name", O.Length(128))
- def infoDate = column[String]("info_date", O.Length(20))
- def key = column[String]("key", O.Length(255))
- def value = column[String]("value")
- def lastUpdated = column[Long]("last_updated")
- def * = (pramenTableName, infoDate, key, value, lastUpdated) <> (MetadataRecord.tupled, MetadataRecord.unapply)
- def idx1 = index("meta_idx_1", (pramenTableName, infoDate, key), unique = true)
- def idx2 = index("meta_idx_2", (pramenTableName, infoDate), unique = false)
-}
-
-object MetadataRecords {
- lazy val records = TableQuery[MetadataRecords]
-}
diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/MetadataTable.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/MetadataTable.scala
new file mode 100644
index 00000000..d803bb86
--- /dev/null
+++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/MetadataTable.scala
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2022 ABSA Group Limited
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package za.co.absa.pramen.core.bookkeeper.model
+
+import slick.jdbc.JdbcProfile
+
+trait MetadataTable {
+ val profile: JdbcProfile
+ import profile.api._
+
+ class MetadataRecords(tag: Tag) extends Table[MetadataRecord](tag, "metadata") {
+ def pramenTableName = column[String]("table_name", O.Length(128))
+ def infoDate = column[String]("info_date", O.Length(20))
+ def key = column[String]("key", O.Length(255))
+ def value = column[String]("value")
+ def lastUpdated = column[Long]("last_updated")
+ def * = (pramenTableName, infoDate, key, value, lastUpdated) <> (MetadataRecord.tupled, MetadataRecord.unapply)
+ def idx1 = index("meta_idx_1", (pramenTableName, infoDate, key), unique = true)
+ def idx2 = index("meta_idx_2", (pramenTableName, infoDate), unique = false)
+ }
+
+ lazy val records = TableQuery[MetadataRecords]
+}
diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/OffsetRecords.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/OffsetRecords.scala
deleted file mode 100644
index 22e6aa05..00000000
--- a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/OffsetRecords.scala
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Copyright 2022 ABSA Group Limited
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package za.co.absa.pramen.core.bookkeeper.model
-
-import slick.jdbc.PostgresProfile.api._
-
-class OffsetRecords(tag: Tag) extends Table[OffsetRecord](tag, "offsets") {
- def pramenTableName = column[String]("table_name", O.Length(256))
- def infoDate = column[String]("info_date", O.Length(20))
- def dataType = column[String]("data_type", O.Length(20))
- def minOffset = column[String]("min_offset")
- def maxOffset = column[String]("max_offset")
- def batchId = column[Long]("batch_id")
- def createdAt = column[Long]("created_at")
- def committedAt = column[Option[Long]]("committed_at")
- def * = (pramenTableName, infoDate, dataType, minOffset, maxOffset, batchId, createdAt, committedAt) <> (OffsetRecord.tupled, OffsetRecord.unapply)
- def idx1 = index("offset_idx_1", (pramenTableName, infoDate, createdAt), unique = true)
- def idx2 = index("offset_idx_2", (pramenTableName, committedAt), unique = false)
-}
-
-object OffsetRecords {
- lazy val records = TableQuery[OffsetRecords]
-}
diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/OffsetTable.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/OffsetTable.scala
new file mode 100644
index 00000000..162b1608
--- /dev/null
+++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/OffsetTable.scala
@@ -0,0 +1,40 @@
+/*
+ * Copyright 2022 ABSA Group Limited
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package za.co.absa.pramen.core.bookkeeper.model
+
+import slick.jdbc.JdbcProfile
+
+trait OffsetTable {
+ val profile: JdbcProfile
+ import profile.api._
+
+ class OffsetRecords(tag: Tag) extends Table[OffsetRecord](tag, "offsets") {
+ def pramenTableName = column[String]("table_name", O.Length(256))
+ def infoDate = column[String]("info_date", O.Length(20))
+ def dataType = column[String]("data_type", O.Length(20))
+ def minOffset = column[String]("min_offset")
+ def maxOffset = column[String]("max_offset")
+ def batchId = column[Long]("batch_id")
+ def createdAt = column[Long]("created_at")
+ def committedAt = column[Option[Long]]("committed_at")
+ def * = (pramenTableName, infoDate, dataType, minOffset, maxOffset, batchId, createdAt, committedAt) <> (OffsetRecord.tupled, OffsetRecord.unapply)
+ def idx1 = index("offset_idx_1", (pramenTableName, infoDate, createdAt), unique = true)
+ def idx2 = index("offset_idx_2", (pramenTableName, committedAt), unique = false)
+ }
+
+ lazy val records = TableQuery[OffsetRecords]
+}
diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/SchemaRecords.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/SchemaTable.scala
similarity index 56%
rename from pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/SchemaRecords.scala
rename to pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/SchemaTable.scala
index d6c9a0d6..7ad1a60c 100644
--- a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/SchemaRecords.scala
+++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/SchemaTable.scala
@@ -16,17 +16,19 @@
package za.co.absa.pramen.core.bookkeeper.model
-import slick.jdbc.PostgresProfile.api._
+import slick.jdbc.JdbcProfile
-class SchemaRecords(tag: Tag) extends Table[SchemaRecord](tag, "schemas") {
- def pramenTableName = column[String]("watcher_table_name", O.Length(128))
- def infoDate = column[String]("info_date", O.Length(20))
- def schemaJson = column[String]("schema_json")
- def * = (pramenTableName, infoDate, schemaJson) <> (SchemaRecord.tupled, SchemaRecord.unapply)
- def idx1 = index("sch_idx_1", (pramenTableName, infoDate), unique = true)
-}
+trait SchemaTable {
+ val profile: JdbcProfile
+ import profile.api._
+
+ class SchemaRecords(tag: Tag) extends Table[SchemaRecord](tag, "schemas") {
+ def pramenTableName = column[String]("watcher_table_name", O.Length(128))
+ def infoDate = column[String]("info_date", O.Length(20))
+ def schemaJson = column[String]("schema_json")
+ def * = (pramenTableName, infoDate, schemaJson) <> (SchemaRecord.tupled, SchemaRecord.unapply)
+ def idx1 = index("sch_idx_1", (pramenTableName, infoDate), unique = true)
+ }
-object SchemaRecords {
lazy val records = TableQuery[SchemaRecords]
}
-
diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalJdbc.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalJdbc.scala
index ea0b562c..0558af39 100644
--- a/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalJdbc.scala
+++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalJdbc.scala
@@ -17,19 +17,26 @@
package za.co.absa.pramen.core.journal
import org.slf4j.LoggerFactory
-import slick.jdbc.PostgresProfile.api._
+import slick.jdbc.JdbcBackend.Database
+import slick.jdbc.JdbcProfile
import za.co.absa.pramen.core.app.config.InfoDateConfig
-import za.co.absa.pramen.core.journal.model.{JournalTask, JournalTasks, TaskCompleted}
+import za.co.absa.pramen.core.journal.model.{JournalTable, JournalTask, TaskCompleted}
import za.co.absa.pramen.core.utils.SlickUtils
import java.time.{Instant, LocalDate}
import scala.util.control.NonFatal
-class JournalJdbc(db: Database) extends Journal {
+class JournalJdbc(db: Database, slickProfile: JdbcProfile) extends Journal {
+ import slickProfile.api._
import za.co.absa.pramen.core.utils.FutureImplicits._
private val log = LoggerFactory.getLogger(this.getClass)
private val dateFormatter = InfoDateConfig.defaultDateFormatter
+ private val slickUtils = new SlickUtils(slickProfile)
+
+ private val journalTable = new JournalTable {
+ override val profile = slickProfile
+ }
override def addEntry(entry: TaskCompleted): Unit = {
val periodBegin = entry.periodBegin.format(dateFormatter)
@@ -60,9 +67,9 @@ class JournalJdbc(db: Database) extends Journal {
Option(entry.batchId))
try {
- SlickUtils.ensureDbConnected(db)
+ slickUtils.ensureDbConnected(db)
db.run(
- JournalTasks.journalTasks += journalTask
+ journalTable.records += journalTask
).execute()
} catch {
case NonFatal(ex) => log.error(s"Unable to write to the journal table.", ex)
@@ -73,7 +80,7 @@ class JournalJdbc(db: Database) extends Journal {
val fromSec = from.getEpochSecond
val toSec = to.getEpochSecond
- val entries = SlickUtils.executeQuery(db, JournalTasks.journalTasks.filter(d => d.finishedAt >= fromSec && d.finishedAt <= toSec ))
+ val entries = slickUtils.executeQuery(db, journalTable.records.filter(d => d.finishedAt >= fromSec && d.finishedAt <= toSec ))
entries.map(v => {
val recordCountOpt = if (v.inputRecordCount < 0) None else Option(v.inputRecordCount)
diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/model/JournalTable.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/model/JournalTable.scala
new file mode 100644
index 00000000..9d83c119
--- /dev/null
+++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/model/JournalTable.scala
@@ -0,0 +1,58 @@
+/*
+ * Copyright 2022 ABSA Group Limited
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package za.co.absa.pramen.core.journal.model
+
+import slick.jdbc.JdbcProfile
+
+trait JournalTable {
+ val profile: JdbcProfile
+ import profile.api._
+
+ class JournalRecords(tag: Tag) extends Table[JournalTask](tag, "journal") {
+ def jobName = column[String]("job_name", O.Length(200))
+ def pramenTableName = column[String]("watcher_table_name", O.Length(128))
+ def periodBegin = column[String]("period_begin", O.Length(20))
+ def periodEnd = column[String]("period_end", O.Length(20))
+ def informationDate = column[String]("information_date", O.Length(20))
+ def inputRecordCount = column[Long]("input_record_count")
+ def inputRecordCountOld = column[Long]("input_record_count_old")
+ def outputRecordCount = column[Option[Long]]("output_record_count")
+ def outputRecordCountOld = column[Option[Long]]("output_record_count_old")
+ def appendedRecordCount = column[Option[Long]]("appended_record_count")
+ def outputSize = column[Option[Long]]("output_size")
+ def startedAt = column[Long]("started_at")
+ def finishedAt = column[Long]("finished_at")
+ def status = column[String]("status", O.Length(50))
+ def failureReason = column[Option[String]]("failure_reason")
+ def sparkApplicationId = column[Option[String]]("spark_application_id", O.Length(128))
+ def pipelineId = column[Option[String]]("pipelineId", O.Length(40))
+ def pipelineName = column[Option[String]]("pipelineName", O.Length(200))
+ def environmentName = column[Option[String]]("environmentName", O.Length(128))
+ def tenant = column[Option[String]]("tenant", O.Length(200))
+ def country = column[Option[String]]("country", O.Length(50))
+ def batchId = column[Option[Long]]("batch_id")
+ def * = (jobName, pramenTableName, periodBegin, periodEnd,
+ informationDate, inputRecordCount, inputRecordCountOld, outputRecordCount,
+ outputRecordCountOld, appendedRecordCount, outputSize, startedAt, finishedAt,
+ status, failureReason, sparkApplicationId, pipelineId, pipelineName, environmentName,
+ tenant, country, batchId) <> (JournalTask.tupled, JournalTask.unapply)
+ def idx1 = index("idx_started_at", startedAt, unique = false)
+ def idx2 = index("idx_finished_at", finishedAt, unique = false)
+ }
+
+ lazy val records = TableQuery[JournalRecords]
+}
diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/model/JournalTasks.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/model/JournalTasks.scala
deleted file mode 100644
index 27693875..00000000
--- a/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/model/JournalTasks.scala
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Copyright 2022 ABSA Group Limited
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package za.co.absa.pramen.core.journal.model
-
-import slick.jdbc.PostgresProfile.api._
-import slick.lifted.TableQuery
-
-class JournalTasks(tag: Tag) extends Table[JournalTask](tag, "journal") {
- def jobName = column[String]("job_name", O.Length(200))
- def pramenTableName = column[String]("watcher_table_name", O.Length(128))
- def periodBegin = column[String]("period_begin", O.Length(20))
- def periodEnd = column[String]("period_end", O.Length(20))
- def informationDate = column[String]("information_date", O.Length(20))
- def inputRecordCount = column[Long]("input_record_count")
- def inputRecordCountOld = column[Long]("input_record_count_old")
- def outputRecordCount = column[Option[Long]]("output_record_count")
- def outputRecordCountOld = column[Option[Long]]("output_record_count_old")
- def appendedRecordCount = column[Option[Long]]("appended_record_count")
- def outputSize = column[Option[Long]]("output_size")
- def startedAt = column[Long]("started_at")
- def finishedAt = column[Long]("finished_at")
- def status = column[String]("status", O.Length(50))
- def failureReason = column[Option[String]]("failure_reason")
- def sparkApplicationId = column[Option[String]]("spark_application_id", O.Length(128))
- def pipelineId = column[Option[String]]("pipelineId", O.Length(40))
- def pipelineName = column[Option[String]]("pipelineName", O.Length(200))
- def environmentName = column[Option[String]]("environmentName", O.Length(128))
- def tenant = column[Option[String]]("tenant", O.Length(200))
- def country = column[Option[String]]("country", O.Length(50))
- def batchId = column[Option[Long]]("batch_id")
- def * = (jobName, pramenTableName, periodBegin, periodEnd,
- informationDate, inputRecordCount, inputRecordCountOld, outputRecordCount,
- outputRecordCountOld, appendedRecordCount, outputSize, startedAt, finishedAt,
- status, failureReason, sparkApplicationId, pipelineId, pipelineName, environmentName,
- tenant, country, batchId) <> (JournalTask.tupled, JournalTask.unapply)
- def idx1 = index("idx_started_at", startedAt, unique = false)
- def idx2 = index("idx_finished_at", finishedAt, unique = false)
-}
-
-object JournalTasks {
- lazy val journalTasks = TableQuery[JournalTasks]
-}
diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockFactoryJdbc.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockFactoryJdbc.scala
index ae0f7a04..f57a9a2a 100644
--- a/pramen/core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockFactoryJdbc.scala
+++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockFactoryJdbc.scala
@@ -16,12 +16,13 @@
package za.co.absa.pramen.core.lock
-import slick.jdbc.PostgresProfile.api._
+import slick.jdbc.JdbcBackend.Database
+import slick.jdbc.JdbcProfile
import za.co.absa.pramen.api.lock.{TokenLock, TokenLockFactory}
-class TokenLockFactoryJdbc(db: Database) extends TokenLockFactory {
+class TokenLockFactoryJdbc(db: Database, slickProfile: JdbcProfile) extends TokenLockFactory {
override def getLock(token: String): TokenLock = {
- new TokenLockJdbc(token, db)
+ new TokenLockJdbc(token, db, slickProfile)
}
}
diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockJdbc.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockJdbc.scala
index e125993d..2f30ff4e 100644
--- a/pramen/core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockJdbc.scala
+++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockJdbc.scala
@@ -17,8 +17,9 @@
package za.co.absa.pramen.core.lock
import org.slf4j.LoggerFactory
-import slick.jdbc.PostgresProfile.api._
-import za.co.absa.pramen.core.lock.model.{LockTicket, LockTickets}
+import slick.jdbc.JdbcBackend.Database
+import slick.jdbc.JdbcProfile
+import za.co.absa.pramen.core.lock.model.{LockTicket, LockTicketTable}
import za.co.absa.pramen.core.utils.SlickUtils
import java.sql.SQLIntegrityConstraintViolationException
@@ -27,16 +28,22 @@ import java.time.temporal.ChronoUnit
import scala.util.control.NonFatal
import scala.util.{Failure, Success, Try}
-class TokenLockJdbc(token: String, db: Database) extends TokenLockBase(token) {
+class TokenLockJdbc(token: String, db: Database, slickProfile: JdbcProfile) extends TokenLockBase(token) {
+ import slickProfile.api._
import za.co.absa.pramen.core.utils.FutureImplicits._
private val TICKETS_HARD_EXPIRE_DAYS = 1
private val log = LoggerFactory.getLogger(this.getClass)
+ private val slickUtils = new SlickUtils(slickProfile)
+
+ private val lockTicketTable = new LockTicketTable {
+ override val profile = slickProfile
+ }
/** Invoked from a synchronized block. */
override def tryAcquireGuardLock(retries: Int = 3, thisTry: Int = 0): Boolean = {
- SlickUtils.ensureDbConnected(db)
+ slickUtils.ensureDbConnected(db)
def tryAcquireExistingTicket(): Boolean = {
val ticket = getTicket
@@ -66,11 +73,18 @@ class TokenLockJdbc(token: String, db: Database) extends TokenLockBase(token) {
ok match {
case Success(_) =>
true
- case Failure(_: SQLIntegrityConstraintViolationException) =>
+      case Failure(_: SQLIntegrityConstraintViolationException) => // HSQLDB and possibly other DB engines
+ tryAcquireExistingTicket()
+ case Failure(_: org.postgresql.util.PSQLException) => // PostgreSQL
+ tryAcquireExistingTicket()
+ case Failure(_: org.sqlite.SQLiteException) => // SQLite
+ tryAcquireExistingTicket()
+ case Failure(sqlEx: java.sql.SQLException) if sqlEx.getSQLState != null && (sqlEx.getSQLState == "23505" || sqlEx.getSQLState == "23000") => // Conformant JDBC engines
+ // 23505: unique violation; 23000: integrity constraint violation (common umbrella)
tryAcquireExistingTicket()
- case Failure(_: org.postgresql.util.PSQLException) =>
+ case Failure(ex) if ex.getMessage.contains("constraint") => // MS SQL Server
tryAcquireExistingTicket()
- case Failure(ex) if ex.getMessage.contains("constraint") =>
+ case Failure(ex) if ex.getMessage.toLowerCase.contains("duplicate entry") => // MySQL
tryAcquireExistingTicket()
case Failure(ex) =>
throw new IllegalStateException(s"Unable to acquire a lock by querying the DB", ex)
@@ -84,9 +98,9 @@ class TokenLockJdbc(token: String, db: Database) extends TokenLockBase(token) {
val now = Instant.now()
val nowEpoch = now.getEpochSecond
val hardExpireTickets = now.minus(TICKETS_HARD_EXPIRE_DAYS, ChronoUnit.DAYS).getEpochSecond
- SlickUtils.executeAction(
+ slickUtils.executeAction(
db,
- LockTickets.lockTickets
+ lockTicketTable.records
.filter(ticket => (ticket.token === escapedToken && ticket.owner === owner) ||
(ticket.createdAt.isDefined && ticket.createdAt < hardExpireTickets && ticket.expires < nowEpoch)).delete
)
@@ -102,7 +116,7 @@ class TokenLockJdbc(token: String, db: Database) extends TokenLockBase(token) {
try {
log.debug(s"Update $escapedToken to $newTicket")
- db.run(LockTickets.lockTickets
+ db.run(lockTicketTable.records
.filter(_.token === escapedToken)
.map(_.expires)
.update(newTicket))
@@ -115,8 +129,8 @@ class TokenLockJdbc(token: String, db: Database) extends TokenLockBase(token) {
/** Invoked from a synchronized block. */
private def getTicket: Option[LockTicket] = {
- val ticket = SlickUtils.executeQuery(db,
- LockTickets.lockTickets
+ val ticket = slickUtils.executeQuery(db,
+ lockTicketTable.records
.filter(_.token === escapedToken))
ticket.headOption
}
@@ -125,7 +139,7 @@ class TokenLockJdbc(token: String, db: Database) extends TokenLockBase(token) {
private def acquireGuardLock(): Unit = {
val now = Instant.now().getEpochSecond
db.run(DBIO.seq(
- LockTickets.lockTickets += LockTicket(escapedToken, owner, expires = getNewTicket, createdAt = Option(now))
+ lockTicketTable.records += LockTicket(escapedToken, owner, expires = getNewTicket, createdAt = Option(now))
)).execute()
}
}
diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/lock/model/LockTickets.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/lock/model/LockTicketTable.scala
similarity index 53%
rename from pramen/core/src/main/scala/za/co/absa/pramen/core/lock/model/LockTickets.scala
rename to pramen/core/src/main/scala/za/co/absa/pramen/core/lock/model/LockTicketTable.scala
index cb50b4ec..50474607 100644
--- a/pramen/core/src/main/scala/za/co/absa/pramen/core/lock/model/LockTickets.scala
+++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/lock/model/LockTicketTable.scala
@@ -16,17 +16,19 @@
package za.co.absa.pramen.core.lock.model
-import slick.jdbc.PostgresProfile.api._
-import slick.lifted.TableQuery
+import slick.jdbc.JdbcProfile
-class LockTickets(tag: Tag) extends Table[LockTicket](tag, "lock_tickets") {
- def token = column[String]("token", O.PrimaryKey, O.Length(255))
- def owner = column[String]("owner", O.Length(255))
- def expires = column[Long]("expires")
- def createdAt = column[Option[Long]]("created_at")
- def * = (token, owner, expires, createdAt) <> (LockTicket.tupled, LockTicket.unapply)
-}
+trait LockTicketTable {
+ val profile: JdbcProfile
+ import profile.api._
+
+ class LockTicketRecords(tag: Tag) extends Table[LockTicket](tag, "lock_tickets") {
+ def token = column[String]("token", O.PrimaryKey, O.Length(255))
+ def owner = column[String]("owner", O.Length(255))
+ def expires = column[Long]("expires")
+ def createdAt = column[Option[Long]]("created_at")
+ def * = (token, owner, expires, createdAt) <> (LockTicket.tupled, LockTicket.unapply)
+ }
-object LockTickets {
- lazy val lockTickets = TableQuery[LockTickets]
+ lazy val records = TableQuery[LockTicketRecords]
}
diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/metadata/MetadataManagerJdbc.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/metadata/MetadataManagerJdbc.scala
index a7dc90c2..a630784a 100644
--- a/pramen/core/src/main/scala/za/co/absa/pramen/core/metadata/MetadataManagerJdbc.scala
+++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/metadata/MetadataManagerJdbc.scala
@@ -16,23 +16,30 @@
package za.co.absa.pramen.core.metadata
-import slick.jdbc.PostgresProfile.api._
+import slick.jdbc.JdbcBackend.Database
+import slick.jdbc.JdbcProfile
import za.co.absa.pramen.api.MetadataValue
-import za.co.absa.pramen.core.bookkeeper.model.{MetadataRecord, MetadataRecords}
+import za.co.absa.pramen.core.bookkeeper.model.{MetadataRecord, MetadataTable}
import za.co.absa.pramen.core.utils.SlickUtils
import java.time.{Instant, LocalDate}
import scala.util.control.NonFatal
-class MetadataManagerJdbc(db: Database) extends MetadataManagerBase(true) {
+class MetadataManagerJdbc(db: Database, slickProfile: JdbcProfile) extends MetadataManagerBase(true) {
+ import slickProfile.api._
import za.co.absa.pramen.core.utils.FutureImplicits._
+ private val slickUtils = new SlickUtils(slickProfile)
+ private val metadataTable = new MetadataTable {
+ override val profile = slickProfile
+ }
+
def getMetadataFromStorage(tableName: String, infoDate: LocalDate, key: String): Option[MetadataValue] = {
- val query = MetadataRecords.records
+ val query = metadataTable.records
.filter(r => r.pramenTableName === tableName && r.infoDate === infoDate.toString && r.key === key)
try {
- SlickUtils.executeQuery[MetadataRecords, MetadataRecord](db, query)
+ slickUtils.executeQuery(db, query)
.headOption
.map(r => MetadataValue(r.value, Instant.ofEpochSecond(r.lastUpdated)))
} catch {
@@ -41,11 +48,11 @@ class MetadataManagerJdbc(db: Database) extends MetadataManagerBase(true) {
}
def getMetadataFromStorage(tableName: String, infoDate: LocalDate): Map[String, MetadataValue] = {
- val query = MetadataRecords.records
+ val query = metadataTable.records
.filter(r => r.pramenTableName === tableName && r.infoDate === infoDate.toString)
try {
- SlickUtils.executeQuery[MetadataRecords, MetadataRecord](db, query)
+ slickUtils.executeQuery(db, query)
.map(r => r.key -> MetadataValue(r.value, Instant.ofEpochSecond(r.lastUpdated)))
.toMap
} catch {
@@ -57,13 +64,13 @@ class MetadataManagerJdbc(db: Database) extends MetadataManagerBase(true) {
val record = MetadataRecord(tableName, infoDate.toString, key, metadata.value, metadata.lastUpdated.getEpochSecond)
try {
- SlickUtils.ensureDbConnected(db)
+ slickUtils.ensureDbConnected(db)
db.run(
- MetadataRecords.records
+ metadataTable.records
.filter(r => r.pramenTableName === tableName && r.infoDate === infoDate.toString && r.key === key)
.delete
.andThen(
- MetadataRecords.records += record
+ metadataTable.records += record
)
.transactionally
).execute()
@@ -73,11 +80,11 @@ class MetadataManagerJdbc(db: Database) extends MetadataManagerBase(true) {
}
def deleteMetadataFromStorage(tableName: String, infoDate: LocalDate, key: String): Unit = {
- val query = MetadataRecords.records
+ val query = metadataTable.records
.filter(r => r.pramenTableName === tableName && r.infoDate === infoDate.toString && r.key === key)
try {
- SlickUtils.ensureDbConnected(db)
+ slickUtils.ensureDbConnected(db)
db.run(query.delete).execute()
} catch {
case NonFatal(ex) => throw new RuntimeException(s"Unable to delete from the metadata table.", ex)
@@ -85,11 +92,11 @@ class MetadataManagerJdbc(db: Database) extends MetadataManagerBase(true) {
}
def deleteMetadataFromStorage(tableName: String, infoDate: LocalDate): Unit = {
- val query = MetadataRecords.records
+ val query = metadataTable.records
.filter(r => r.pramenTableName === tableName && r.infoDate === infoDate.toString)
try {
- SlickUtils.ensureDbConnected(db)
+ slickUtils.ensureDbConnected(db)
db.run(query.delete).execute()
} catch {
case NonFatal(ex) => throw new RuntimeException(s"Unable to delete from the metadata table.", ex)
diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/notify/pipeline/PipelineNotificationBuilderHtml.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/notify/pipeline/PipelineNotificationBuilderHtml.scala
index d228c6d7..b96be711 100644
--- a/pramen/core/src/main/scala/za/co/absa/pramen/core/notify/pipeline/PipelineNotificationBuilderHtml.scala
+++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/notify/pipeline/PipelineNotificationBuilderHtml.scala
@@ -751,18 +751,18 @@ class PipelineNotificationBuilderHtml(implicit conf: Config) extends PipelineNot
val oldColumnCell = change match {
case _: FieldChange.NewField => TextElement("")
- case c: FieldChange.DeletedField => TextElement(s"${c.columnName} (${c.dataType})")
- case c: FieldChange.ChangedType => TextElement(s"${c.columnName} (${c.oldType})")
+ case c: FieldChange.DeletedField => TextElement(s"${StringUtils.escapeHTML(c.columnName)} (${StringUtils.escapeHTML(c.dataType)})")
+ case c: FieldChange.ChangedType => TextElement(s"${StringUtils.escapeHTML(c.columnName)} (${StringUtils.escapeHTML(c.oldType)})")
}
val newColumnCell = change match {
- case c: FieldChange.NewField => TextElement(s"${c.columnName} (${c.dataType})")
+ case c: FieldChange.NewField => TextElement(s"${StringUtils.escapeHTML(c.columnName)} (${StringUtils.escapeHTML(c.dataType)})")
case _: FieldChange.DeletedField => TextElement("")
- case c: FieldChange.ChangedType => TextElement(s"${c.columnName} (${c.newType})")
+ case c: FieldChange.ChangedType => TextElement(s"${StringUtils.escapeHTML(c.columnName)} (${StringUtils.escapeHTML(c.newType)})")
}
tableBuilder.withRow(Seq(
- TextElement(diff.tableName),
+ TextElement(StringUtils.escapeHTML(diff.tableName)),
changeCell,
oldColumnCell,
newColumnCell,
diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/rdb/PramenDb.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/rdb/PramenDb.scala
index e4c09d0a..d00f24ec 100644
--- a/pramen/core/src/main/scala/za/co/absa/pramen/core/rdb/PramenDb.scala
+++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/rdb/PramenDb.scala
@@ -16,33 +16,52 @@
package za.co.absa.pramen.core.rdb
-import com.typesafe.config.ConfigFactory
import org.slf4j.LoggerFactory
import slick.jdbc.JdbcBackend.Database
import slick.jdbc.{JdbcBackend, JdbcProfile}
import slick.util.AsyncExecutor
-import za.co.absa.pramen.core.bookkeeper.model.{BookkeepingRecords, MetadataRecords, OffsetRecords, SchemaRecords}
-import za.co.absa.pramen.core.journal.model.JournalTasks
-import za.co.absa.pramen.core.lock.model.LockTickets
+import za.co.absa.pramen.api.Pramen
+import za.co.absa.pramen.core.bookkeeper.model.{BookkeepingTable, MetadataTable, OffsetTable, SchemaTable}
+import za.co.absa.pramen.core.journal.model.JournalTable
+import za.co.absa.pramen.core.lock.model.LockTicketTable
import za.co.absa.pramen.core.rdb.PramenDb.MODEL_VERSION
import za.co.absa.pramen.core.reader.JdbcUrlSelector
import za.co.absa.pramen.core.reader.model.JdbcConfig
import za.co.absa.pramen.core.utils.{AlgorithmUtils, UsingUtils}
import java.sql.Connection
+import java.util.concurrent.atomic.AtomicBoolean
import scala.util.Try
import scala.util.control.NonFatal
class PramenDb(val jdbcConfig: JdbcConfig,
val activeUrl: String,
val slickDb: Database,
- val profile: JdbcProfile) extends AutoCloseable {
- def db: Database = slickDb
-
- import profile.api._
+ val slickProfile: JdbcProfile) extends AutoCloseable {
+ import slickProfile.api._
import za.co.absa.pramen.core.utils.FutureImplicits._
+ val bookkeepingTable: BookkeepingTable = new BookkeepingTable {
+ override val profile = slickProfile
+ }
+ val schemaTable: SchemaTable = new SchemaTable {
+ override val profile = slickProfile
+ }
+ val offsetTable: OffsetTable = new OffsetTable {
+ override val profile = slickProfile
+ }
+ val journalTable: JournalTable = new JournalTable {
+ override val profile = slickProfile
+ }
+ val lockTicketTable: LockTicketTable = new LockTicketTable {
+ override val profile = slickProfile
+ }
+ val metadataTable: MetadataTable = new MetadataTable {
+ override val profile = slickProfile
+ }
+
private val log = LoggerFactory.getLogger(this.getClass)
+ private val isClosed = new AtomicBoolean(false)
private def setupDatabase(jdbcConnection: Connection): Unit = {
// Explicitly set auto-commit to true, overriding any user JDBC settings or PostgreSQL defaults
@@ -62,51 +81,51 @@ class PramenDb(val jdbcConfig: JdbcConfig,
private def initDatabase(dbVersion: Int): Unit = {
log.warn(s"Initializing new database at $activeUrl")
if (dbVersion < 1) {
- initTable(LockTickets.lockTickets.schema)
- initTable(JournalTasks.journalTasks.schema)
- initTable(BookkeepingRecords.records.schema)
+ initTable(lockTicketTable.records.schema)
+ initTable(journalTable.records.schema)
+ initTable(bookkeepingTable.records.schema)
}
if (dbVersion < 2) {
- initTable(SchemaRecords.records.schema)
+ initTable(schemaTable.records.schema)
}
if (dbVersion < 3) {
- initTable(MetadataRecords.records.schema)
+ initTable(metadataTable.records.schema)
}
if (0 < dbVersion && dbVersion < 4) {
- addColumn(JournalTasks.journalTasks.baseTableRow.tableName, "spark_application_id", "varchar(128)")
- addColumn(JournalTasks.journalTasks.baseTableRow.tableName, "pipelineId", "varchar(40)")
- addColumn(JournalTasks.journalTasks.baseTableRow.tableName, "pipelineName", "varchar(200)")
- addColumn(JournalTasks.journalTasks.baseTableRow.tableName, "environmentName", "varchar(128)")
- addColumn(JournalTasks.journalTasks.baseTableRow.tableName, "tenant", "varchar(200)")
+ addColumn(journalTable.records.baseTableRow.tableName, "spark_application_id", "varchar(128)")
+ addColumn(journalTable.records.baseTableRow.tableName, "pipelineId", "varchar(40)")
+ addColumn(journalTable.records.baseTableRow.tableName, "pipelineName", "varchar(200)")
+ addColumn(journalTable.records.baseTableRow.tableName, "environmentName", "varchar(128)")
+ addColumn(journalTable.records.baseTableRow.tableName, "tenant", "varchar(200)")
}
if (dbVersion < 5) {
- initTable(OffsetRecords.records.schema)
+ initTable(offsetTable.records.schema)
}
if (0 < dbVersion && dbVersion < 6) {
- addColumn(JournalTasks.journalTasks.baseTableRow.tableName, "appended_record_count", "bigint")
+ addColumn(journalTable.records.baseTableRow.tableName, "appended_record_count", "bigint")
}
if (0 < dbVersion && dbVersion < 7) {
- addColumn(LockTickets.lockTickets.baseTableRow.tableName, "created_at", "bigint")
+ addColumn(lockTicketTable.records.baseTableRow.tableName, "created_at", "bigint")
}
if (0 < dbVersion && dbVersion < 8) {
- addColumn(JournalTasks.journalTasks.baseTableRow.tableName, "country", "varchar(50)")
+ addColumn(journalTable.records.baseTableRow.tableName, "country", "varchar(50)")
}
if (0 < dbVersion && dbVersion < 9) {
- addColumn(BookkeepingRecords.records.baseTableRow.tableName, "batch_id", "bigint")
- addColumn(BookkeepingRecords.records.baseTableRow.tableName, "appended_record_count", "bigint")
- addColumn(JournalTasks.journalTasks.baseTableRow.tableName, "batch_id", "bigint")
+ addColumn(bookkeepingTable.records.baseTableRow.tableName, "batch_id", "bigint")
+ addColumn(bookkeepingTable.records.baseTableRow.tableName, "appended_record_count", "bigint")
+ addColumn(journalTable.records.baseTableRow.tableName, "batch_id", "bigint")
}
}
- private def initTable(schema: profile.SchemaDescription): Unit = {
+ private def initTable(schema: slickProfile.SchemaDescription): Unit = {
try {
- db.run(DBIO.seq(
+ slickDb.run(DBIO.seq(
schema.createIfNotExists
)).execute()
} catch {
@@ -118,9 +137,9 @@ class PramenDb(val jdbcConfig: JdbcConfig,
private def addColumn(table: String, columnName: String, columnType: String): Unit = {
try {
- val quotedTable = s""""$table""""
- val quotedColumnName = s""""$columnName""""
- db.run(
+ val quotedTable = slickProfile.quoteIdentifier(table)
+ val quotedColumnName = slickProfile.quoteIdentifier(columnName)
+ slickDb.run(
sqlu"ALTER TABLE #$quotedTable ADD #$quotedColumnName #$columnType"
).execute()
} catch {
@@ -132,18 +151,19 @@ class PramenDb(val jdbcConfig: JdbcConfig,
override def close(): Unit = {
try {
- slickDb.close()
+ if (isClosed.compareAndSet(false, true)) {
+ slickDb.close()
+ }
} catch {
case NonFatal(ex) =>
log.warn("Error closing the Pramen RDB database connection.", ex)
}
-
}
}
object PramenDb {
private val log = LoggerFactory.getLogger(this.getClass)
- private val conf = ConfigFactory.load()
+ private val conf = Pramen.getConfig
val MODEL_VERSION = 9
val DEFAULT_RETRIES: Int = conf.getInt("pramen.internal.connection.retries.default")
@@ -167,6 +187,7 @@ object PramenDb {
case "org.hsqldb.jdbc.JDBCDriver" => slick.jdbc.HsqldbProfile
case "org.h2.Driver" => slick.jdbc.H2Profile
case "org.sqlite.JDBC" => slick.jdbc.SQLiteProfile
+ case "oracle.jdbc.OracleDriver" => slick.jdbc.OracleProfile
case "com.mysql.cj.jdbc.Driver" | "com.mysql.jdbc.Driver" =>
slick.jdbc.MySQLProfile
case "com.microsoft.sqlserver.jdbc.SQLServerDriver" | "net.sourceforge.jtds.jdbc.Driver" =>
diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/state/PipelineStateImpl.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/state/PipelineStateImpl.scala
index e88a65d9..5f0e7955 100644
--- a/pramen/core/src/main/scala/za/co/absa/pramen/core/state/PipelineStateImpl.scala
+++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/state/PipelineStateImpl.scala
@@ -25,6 +25,7 @@ import za.co.absa.pramen.api.{NotificationBuilder, PipelineInfo, PipelineNotific
import za.co.absa.pramen.core.app.config.RuntimeConfig.{DRY_RUN, EMAIL_IF_NO_CHANGES, UNDERCOVER}
import za.co.absa.pramen.core.app.config.{HookConfig, RuntimeConfig}
import za.co.absa.pramen.core.config.Keys.{GOOD_THROUGHPUT_RPS, WARN_THROUGHPUT_RPS}
+import za.co.absa.pramen.core.exceptions.OsSignalException
import za.co.absa.pramen.core.metastore.peristence.{TransientJobManager, TransientTableManager}
import za.co.absa.pramen.core.notify.PipelineNotificationTargetFactory
import za.co.absa.pramen.core.notify.pipeline.{PipelineNotification, PipelineNotificationEmail}
@@ -212,11 +213,11 @@ class PipelineStateImpl(implicit conf: Config, notificationBuilder: Notification
override def run(): Unit = {
if (!exitedNormally && !isFinished) {
if (failureException.isEmpty && signalException.isEmpty) {
- setFailureException(new IllegalStateException("The application exited unexpectedly."))
-
val nonDaemonStackTraces = JvmUtils.getStackTraces
val renderedStackTraces = JvmUtils.renderStackTraces(nonDaemonStackTraces)
+ setFailureException(OsSignalException("System.exit()", nonDaemonStackTraces))
+
log.error("Stack traces at the moment of the unexpected exit:\n" + renderedStackTraces)
}
diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/SlickUtils.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/SlickUtils.scala
index 2f404bbf..c2d6a626 100644
--- a/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/SlickUtils.scala
+++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/SlickUtils.scala
@@ -17,22 +17,20 @@
package za.co.absa.pramen.core.utils
import org.slf4j.LoggerFactory
-import slick.dbio.Effect
-import slick.jdbc.PostgresProfile.api._
+import slick.jdbc.JdbcProfile
import slick.sql.SqlAction
import za.co.absa.pramen.core.rdb.PramenDb
import java.time.{Duration, Instant}
import scala.util.control.NonFatal
-object SlickUtils {
-
+class SlickUtils(profile: JdbcProfile) {
+ import SlickUtils._
+ import profile.api._
import za.co.absa.pramen.core.utils.FutureImplicits._
private val log = LoggerFactory.getLogger(this.getClass)
- val WARN_IF_LONGER_MS = 1000L
-
/**
* Synchronously executes a query against a JDBC connection.
* - Handles exceptions
@@ -188,3 +186,7 @@ object SlickUtils {
}
}
}
+
+object SlickUtils {
+ val WARN_IF_LONGER_MS = 1000L // warn threshold in milliseconds: per the name, queries running longer than this are warn-logged by the query helpers
+}
diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/integration/IncrementalPipelineLongFixture.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/integration/IncrementalPipelineLongFixture.scala
index a28dc58a..e9e03e88 100644
--- a/pramen/core/src/test/scala/za/co/absa/pramen/core/integration/IncrementalPipelineLongFixture.scala
+++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/integration/IncrementalPipelineLongFixture.scala
@@ -213,7 +213,7 @@ class IncrementalPipelineLongFixture extends AnyWordSpec
assert(batchIds.isEmpty)
- val om = new OffsetManagerJdbc(pramenDb.db, 123L)
+ val om = new OffsetManagerJdbc(pramenDb.slickDb, pramenDb.slickProfile, pramenDb.offsetTable, 123L)
val offsets0 = om.getOffsets("table1", infoDate.minusDays(2))
val offsets1 = om.getOffsets("table1", infoDate.minusDays(1))
val offsets2 = om.getOffsets("table1", infoDate).map(_.asInstanceOf[CommittedOffset])
@@ -250,7 +250,7 @@ class IncrementalPipelineLongFixture extends AnyWordSpec
assert(exitCode1 == 0)
// Adding an uncommitted offset for 2021-02-17
- val om = new OffsetManagerJdbc(pramenDb.db, 123L)
+ val om = new OffsetManagerJdbc(pramenDb.slickDb, pramenDb.slickProfile, pramenDb.offsetTable, 123L)
om.startWriteOffsets("table1", infoDate.minusDays(1), OffsetType.IntegralType)
Thread.sleep(10)
@@ -697,7 +697,7 @@ class IncrementalPipelineLongFixture extends AnyWordSpec
}
def testOffsetOnlyDealWithUncommittedOffsetsWithNoPath(metastoreFormat: String): Assertion = {
- val om = new OffsetManagerJdbc(pramenDb.db, 123L)
+ val om = new OffsetManagerJdbc(pramenDb.slickDb, pramenDb.slickProfile, pramenDb.offsetTable, 123L)
om.startWriteOffsets("table1", infoDate, OffsetType.IntegralType)
@@ -736,7 +736,7 @@ class IncrementalPipelineLongFixture extends AnyWordSpec
}
def testOffsetOnlyDealWithUncommittedOffsetsWithNoData(metastoreFormat: String): Assertion = {
- val om = new OffsetManagerJdbc(pramenDb.db, 123L)
+ val om = new OffsetManagerJdbc(pramenDb.slickDb, pramenDb.slickProfile, pramenDb.offsetTable, 123L)
om.startWriteOffsets("table1", infoDate, OffsetType.IntegralType)
@@ -788,12 +788,12 @@ class IncrementalPipelineLongFixture extends AnyWordSpec
}
def testOffsetOnlyDealWithUncommittedOffsetsWithData(metastoreFormat: String): Assertion = {
- val om1 = new OffsetManagerJdbc(pramenDb.db, 123L)
+ val om1 = new OffsetManagerJdbc(pramenDb.slickDb, pramenDb.slickProfile, pramenDb.offsetTable, 123L)
om1.startWriteOffsets("table1", infoDate, OffsetType.IntegralType)
Thread.sleep(10)
- val om2 = new OffsetManagerJdbc(pramenDb.db, 123L)
+ val om2 = new OffsetManagerJdbc(pramenDb.slickDb, pramenDb.slickProfile, pramenDb.offsetTable, 123L)
om2.startWriteOffsets("table1", infoDate, OffsetType.IntegralType)
Thread.sleep(10)
@@ -866,7 +866,7 @@ class IncrementalPipelineLongFixture extends AnyWordSpec
}
def testOffsetOnlyFailWhenInputTableDoestHaveOffsetField(metastoreFormat: String): Assertion = {
- val om1 = new OffsetManagerJdbc(pramenDb.db, 123L)
+ val om1 = new OffsetManagerJdbc(pramenDb.slickDb, pramenDb.slickProfile, pramenDb.offsetTable, 123L)
om1.startWriteOffsets("table1", infoDate, OffsetType.IntegralType)
Thread.sleep(10)
@@ -1144,7 +1144,7 @@ class IncrementalPipelineLongFixture extends AnyWordSpec
compareText(actualTable1After, expectedWithTimestamp2)
compareText(actualTable2After, expectedWithTimestamp2)
- val om = new OffsetManagerJdbc(pramenDb.db, 123L)
+ val om = new OffsetManagerJdbc(pramenDb.slickDb, pramenDb.slickProfile, pramenDb.offsetTable, 123L)
val offsets1 = om.getOffsets("table1", infoDate.minusDays(1)).map(_.asInstanceOf[CommittedOffset])
assert(offsets1.isEmpty)
@@ -1198,7 +1198,7 @@ class IncrementalPipelineLongFixture extends AnyWordSpec
compareText(actualTable1_2, expected)
compareText(actualTable2_2, expected)
- val om = new OffsetManagerJdbc(pramenDb.db, 123L)
+ val om = new OffsetManagerJdbc(pramenDb.slickDb, pramenDb.slickProfile, pramenDb.offsetTable, 123L)
val offsets1 = om.getOffsets("table1", infoDate.minusDays(1)).map(_.asInstanceOf[CommittedOffset])
assert(offsets1.isEmpty)
@@ -1275,7 +1275,7 @@ class IncrementalPipelineLongFixture extends AnyWordSpec
compareText(actualTable1After, expectedWithInfoDateAll)
compareText(actualTable2After, expectedWithInfoDateAll)
- val om = new OffsetManagerJdbc(pramenDb.db, 123L)
+ val om = new OffsetManagerJdbc(pramenDb.slickDb, pramenDb.slickProfile, pramenDb.offsetTable, 123L)
val offsets = om.getOffsets("table1->table2", infoDate).map(_.asInstanceOf[CommittedOffset])
assert(offsets.length == 1)
@@ -1330,7 +1330,7 @@ class IncrementalPipelineLongFixture extends AnyWordSpec
assert(batchIds.length == 2)
- val om = new OffsetManagerJdbc(pramenDb.db, 123L)
+ val om = new OffsetManagerJdbc(pramenDb.slickDb, pramenDb.slickProfile, pramenDb.offsetTable, 123L)
val offsets = om.getOffsets("table1->table2", infoDate).map(_.asInstanceOf[CommittedOffset])
assert(offsets.length == 1)
@@ -1389,7 +1389,7 @@ class IncrementalPipelineLongFixture extends AnyWordSpec
assert(batchIds.length == 2)
- val om = new OffsetManagerJdbc(pramenDb.db, 123L)
+ val om = new OffsetManagerJdbc(pramenDb.slickDb, pramenDb.slickProfile, pramenDb.offsetTable, 123L)
val offsets = om.getOffsets("table1->table2", infoDate).map(_.asInstanceOf[CommittedOffset])
assert(offsets.length == 1)
diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/metadata/MetadataManagerJdbcSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/metadata/MetadataManagerJdbcSuite.scala
index fb475bf2..2e604bb8 100644
--- a/pramen/core/src/test/scala/za/co/absa/pramen/core/metadata/MetadataManagerJdbcSuite.scala
+++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/metadata/MetadataManagerJdbcSuite.scala
@@ -76,7 +76,7 @@ class MetadataManagerJdbcSuite extends AnyWordSpec with RelationalDbFixture with
}
"throw an exception on connection errors when querying a key" in {
- val metadata = new MetadataManagerJdbc(null)
+ val metadata = new MetadataManagerJdbc(null, pramenDb.slickProfile)
val ex = intercept[RuntimeException] {
metadata.getMetadataFromStorage("table1", infoDate, "key1")
@@ -86,7 +86,7 @@ class MetadataManagerJdbcSuite extends AnyWordSpec with RelationalDbFixture with
}
"throw an exception on connection errors when querying a table" in {
- val metadata = new MetadataManagerJdbc(null)
+ val metadata = new MetadataManagerJdbc(null, pramenDb.slickProfile)
val ex = intercept[RuntimeException] {
metadata.getMetadataFromStorage("table1", infoDate)
@@ -111,7 +111,7 @@ class MetadataManagerJdbcSuite extends AnyWordSpec with RelationalDbFixture with
}
"throw an exception on connection errors" in {
- val metadata = new MetadataManagerJdbc(null)
+ val metadata = new MetadataManagerJdbc(null, pramenDb.slickProfile)
val v = MetadataValue("value1", exampleInstant)
val ex = intercept[RuntimeException] {
@@ -159,7 +159,7 @@ class MetadataManagerJdbcSuite extends AnyWordSpec with RelationalDbFixture with
}
"throw an exception on connection errors when deleting a key" in {
- val metadata = new MetadataManagerJdbc(null)
+ val metadata = new MetadataManagerJdbc(null, pramenDb.slickProfile)
val ex = intercept[RuntimeException] {
metadata.deleteMetadataFromStorage("table1", infoDate, "key1")
@@ -169,7 +169,7 @@ class MetadataManagerJdbcSuite extends AnyWordSpec with RelationalDbFixture with
}
"throw an exception on connection errors when deleting metadata from a partition" in {
- val metadata = new MetadataManagerJdbc(null)
+ val metadata = new MetadataManagerJdbc(null, pramenDb.slickProfile)
val ex = intercept[RuntimeException] {
metadata.deleteMetadataFromStorage("table1", infoDate)
@@ -180,6 +180,6 @@ class MetadataManagerJdbcSuite extends AnyWordSpec with RelationalDbFixture with
}
def getMetadataManager: MetadataManagerJdbc = {
- new MetadataManagerJdbc(pramenDb.slickDb)
+ new MetadataManagerJdbc(pramenDb.slickDb, pramenDb.slickProfile)
}
}
diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperJdbcSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperJdbcSuite.scala
index 906eae26..52787847 100644
--- a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperJdbcSuite.scala
+++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperJdbcSuite.scala
@@ -46,7 +46,7 @@ class BookkeeperJdbcSuite extends BookkeeperCommonSuite with RelationalDbFixture
}
def getBookkeeper(batchId: Long): Bookkeeper = {
- new BookkeeperJdbc(pramenDb.slickDb, pramenDb.profile, batchId)
+ BookkeeperJdbc.fromPramenDb(pramenDb, batchId)
}
"BookkeeperJdbc" when {
diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/OffsetManagerJdbcSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/OffsetManagerJdbcSuite.scala
index 7dc0e0c8..4b759cb3 100644
--- a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/OffsetManagerJdbcSuite.scala
+++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/OffsetManagerJdbcSuite.scala
@@ -48,7 +48,7 @@ class OffsetManagerJdbcSuite extends AnyWordSpec with RelationalDbFixture with B
}
def getOffsetManager: OffsetManager = {
- new OffsetManagerCached(new OffsetManagerJdbc(pramenDb.slickDb, 123L))
+ new OffsetManagerCached(new OffsetManagerJdbc(pramenDb.slickDb, pramenDb.slickProfile, pramenDb.offsetTable, 123L))
}
"getOffsets" should {
@@ -414,7 +414,7 @@ class OffsetManagerJdbcSuite extends AnyWordSpec with RelationalDbFixture with B
"getMinMaxOffsets" should {
"be able to sort Kafka offsets properly" in {
- val om = new OffsetManagerJdbc(pramenDb.slickDb, 123L)
+ val om = new OffsetManagerJdbc(pramenDb.slickDb, pramenDb.slickProfile, pramenDb.offsetTable, 123L)
val offsets = Array(
OffsetRecordFactory.getOffsetRecord(dataType = "kafka", minOffset = """{"0":100,"1":120}""", maxOffset = """{"0":101,"1":121}"""),
diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/journal/JournalJdbcSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/journal/JournalJdbcSuite.scala
index a74510c5..6cee9284 100644
--- a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/journal/JournalJdbcSuite.scala
+++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/journal/JournalJdbcSuite.scala
@@ -78,6 +78,6 @@ class JournalJdbcSuite extends AnyWordSpec with SparkTestBase with BeforeAndAfte
}
private def getJournal: Journal = {
- new JournalJdbc(pramenDb.slickDb)
+ new JournalJdbc(pramenDb.slickDb, pramenDb.slickProfile)
}
}
diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/lock/TokenLockJdbcSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/lock/TokenLockJdbcSuite.scala
index a23903e9..1d459108 100644
--- a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/lock/TokenLockJdbcSuite.scala
+++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/lock/TokenLockJdbcSuite.scala
@@ -83,10 +83,10 @@ class TokenLockJdbcSuite extends AnyWordSpec with RelationalDbFixture with Befor
}
"lock pramen should constantly update lock ticket" in {
- val lock1 = new TokenLockJdbc("token1", pramenDb.slickDb) {
+ val lock1 = new TokenLockJdbc("token1", pramenDb.slickDb, pramenDb.slickProfile) {
override val tokenExpiresSeconds = 2L
}
- val lock2 = new TokenLockJdbc("token1", pramenDb.slickDb)
+ val lock2 = new TokenLockJdbc("token1", pramenDb.slickDb, pramenDb.slickProfile)
assert(lock1.tryAcquire())
try {
@@ -101,6 +101,6 @@ class TokenLockJdbcSuite extends AnyWordSpec with RelationalDbFixture with Befor
}
private def getLock(token: String): TokenLock = {
- new TokenLockJdbc(token, pramenDb.slickDb)
+ new TokenLockJdbc(token, pramenDb.slickDb, pramenDb.slickProfile)
}
}
diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/lock/TokenLockSQLiteSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/lock/TokenLockSQLiteSuite.scala
new file mode 100644
index 00000000..e7e72758
--- /dev/null
+++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/lock/TokenLockSQLiteSuite.scala
@@ -0,0 +1,113 @@
+/*
+ * Copyright 2022 ABSA Group Limited
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package za.co.absa.pramen.core.tests.lock
+
+import org.apache.commons.io.FileUtils
+import org.scalatest.wordspec.AnyWordSpec
+import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
+import za.co.absa.pramen.api.lock.TokenLock
+import za.co.absa.pramen.core.fixtures.TempDirFixture
+import za.co.absa.pramen.core.lock.TokenLockJdbc
+import za.co.absa.pramen.core.rdb.PramenDb
+import za.co.absa.pramen.core.reader.model.JdbcConfig
+
+import java.io.File
+
+class TokenLockSQLiteSuite extends AnyWordSpec with BeforeAndAfter with BeforeAndAfterAll with TempDirFixture { // exercises TokenLockJdbc against a file-based SQLite database
+ private var jdbcConfig: JdbcConfig = _ // set once in beforeAll; points at the SQLite file under tempDir
+ private var tempDir: String = _ // temp directory holding the SQLite file; deleted in afterAll
+ private var pramenDb: PramenDb = _ // recreated before each test so each test starts with an empty database
+
+ before { // per-test reset: close the previous connection, delete the DB file, reopen fresh
+ if (pramenDb != null) pramenDb.close()
+ val dbFile = new File(tempDir, "pramen.sqlite")
+ if (dbFile.exists()) {
+ dbFile.delete()
+ }
+ pramenDb = PramenDb(jdbcConfig) // re-initializes the schema against the now-empty file
+ }
+
+ override def beforeAll(): Unit = { // runs before the first `before`, so tempDir/jdbcConfig are ready
+ super.beforeAll()
+ tempDir = createTempDir("pramen_sqlite")
+ jdbcConfig = JdbcConfig("org.sqlite.JDBC", Some(s"jdbc:sqlite:$tempDir/pramen.sqlite"))
+ }
+
+ override def afterAll(): Unit = { // final cleanup: close the connection, then remove the temp directory
+ if (pramenDb != null) pramenDb.close()
+ FileUtils.deleteDirectory(new File(tempDir))
+
+ super.afterAll()
+ }
+
+ "Token lock" should {
+ "be able to acquire and release locks" in {
+ val lock1 = getLock("token1")
+
+ assert(lock1.tryAcquire()) // first acquire succeeds
+ assert(!lock1.tryAcquire()) // re-acquire on the same held instance is rejected
+
+ val lock2 = getLock("token1") // second lock object competing for the same token
+ assert(!lock2.tryAcquire()) // token is held by lock1
+
+ lock1.release()
+
+ assert(lock2.tryAcquire()) // after release, the other instance can take the token
+ assert(!lock2.tryAcquire())
+
+ lock2.release()
+ }
+
+ "multiple token locks should not affect each other" in {
+ val lock1 = getLock("token1") // distinct tokens must lock independently
+ val lock2 = getLock("token2")
+
+ assert(lock1.tryAcquire())
+ assert(lock2.tryAcquire())
+
+ assert(!lock1.tryAcquire())
+ assert(!lock2.tryAcquire())
+
+ lock1.release() // releasing token1 must not release token2
+
+ assert(lock1.tryAcquire())
+ assert(!lock2.tryAcquire()) // lock2 still holds token2, so re-acquire is rejected
+
+ lock1.release()
+ lock2.release()
+ }
+
+ "lock pramen should constantly update lock ticket" in {
+ val lock1 = new TokenLockJdbc("token1", pramenDb.slickDb, pramenDb.slickProfile) { // shortened expiry to keep the test fast
+ override val tokenExpiresSeconds = 2L
+ }
+ val lock2 = new TokenLockJdbc("token1", pramenDb.slickDb, pramenDb.slickProfile)
+ assert(lock1.tryAcquire())
+
+ try {
+ Thread.sleep(3000) // Wait past the 2-second expiry; lock1's background refresh should have kept the ticket alive, so lock2 still cannot steal the token
+ assert(!lock2.tryAcquire())
+ } finally {
+ lock1.release() // always release so later tests are unaffected
+ }
+ }
+ }
+
+ private def getLock(token: String): TokenLock = { // helper: build a lock bound to the per-test SQLite database
+ new TokenLockJdbc(token, pramenDb.slickDb, pramenDb.slickProfile)
+ }
+}