From c444aaf5a280bd92c4b6b2ed28ec328502331784 Mon Sep 17 00:00:00 2001 From: Ruslan Iushchenko Date: Mon, 26 Jan 2026 14:00:54 +0100 Subject: [PATCH 1/6] #699 Add retries for Bookkeeping database, and make sure only 1 connection is active. --- .../core/bookkeeper/BookkeeperJdbc.scala | 2 + .../core/bookkeeper/OffsetManagerJdbc.scala | 5 + .../pramen/core/journal/JournalJdbc.scala | 1 + .../absa/pramen/core/lock/TokenLockJdbc.scala | 2 + .../core/metadata/MetadataManagerJdbc.scala | 3 + .../za/co/absa/pramen/core/rdb/PramenDb.scala | 36 +-- .../za/co/absa/pramen/core/rdb/RdbJdbc.scala | 22 +- .../core/reader/JdbcUrlSelectorImpl.scala | 8 +- .../pramen/core/utils/AlgorithmUtils.scala | 13 +- .../pramen/core/utils/FutureImplicits.scala | 5 +- .../absa/pramen/core/utils/SlickUtils.scala | 30 +++ .../absa/pramen/core/utils/UsingUtils.scala | 62 +++++ .../IncrementalPipelineJdbcLongSuite.scala | 7 +- .../IncrementalPipelineLongFixture.scala | 7 +- .../metadata/MetadataManagerJdbcSuite.scala | 7 +- .../pramen/core/mocks/AutoCloseableSpy.scala | 40 +++ .../bookkeeper/BookkeeperJdbcSuite.scala | 7 +- .../tests/bookkeeper/BookkeeperSuite.scala | 7 +- .../bookkeeper/OffsetManagerJdbcSuite.scala | 7 +- .../core/tests/journal/JournalJdbcSuite.scala | 7 +- .../core/tests/lock/TokenLockJdbcSuite.scala | 7 +- .../core/tests/utils/UsingUtilsSuite.scala | 229 ++++++++++++++++++ 22 files changed, 472 insertions(+), 42 deletions(-) create mode 100644 pramen/core/src/main/scala/za/co/absa/pramen/core/utils/UsingUtils.scala create mode 100644 pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/AutoCloseableSpy.scala create mode 100644 pramen/core/src/test/scala/za/co/absa/pramen/core/tests/utils/UsingUtilsSuite.scala diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperJdbc.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperJdbc.scala index 48a910192..cada2d40e 100644 --- 
a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperJdbc.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperJdbc.scala @@ -146,6 +146,7 @@ class BookkeeperJdbc(db: Database, profile: JdbcProfile, batchId: Long) extends val record = BookkeepingRecord(table, dateStr, dateStr, dateStr, inputRecordCount, outputRecordCount, recordsAppended, jobStarted, jobFinished, Option(batchId)) try { + SlickUtils.ensureDbConnected(db) db.run( BookkeepingRecords.records += record ).execute() @@ -230,6 +231,7 @@ class BookkeeperJdbc(db: Database, profile: JdbcProfile, batchId: Long) extends val infoDateStr = infoDate.toString try { + SlickUtils.ensureDbConnected(db) db.run( SchemaRecords.records.filter(t => t.pramenTableName === table && t.infoDate === infoDateStr).delete ).execute() diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManagerJdbc.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManagerJdbc.scala index 4cbbace41..685b72f4a 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManagerJdbc.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManagerJdbc.scala @@ -73,6 +73,7 @@ class OffsetManagerJdbc(db: Database, batchId: Long) extends OffsetManager { val record = OffsetRecord(table, infoDate.toString, offsetType.dataTypeString, "", "", batchId, createdAt.toEpochMilli, None) + SlickUtils.ensureDbConnected(db) db.run( OffsetRecords.records += record ).execute() @@ -83,6 +84,7 @@ class OffsetManagerJdbc(db: Database, batchId: Long) extends OffsetManager { override def commitOffsets(request: DataOffsetRequest, minOffset: OffsetValue, maxOffset: OffsetValue): Unit = { val committedAt = Instant.now().toEpochMilli + SlickUtils.ensureDbConnected(db) db.run( OffsetRecords.records .filter(r => r.pramenTableName === request.tableName && r.infoDate === request.infoDate.toString && r.createdAt === 
request.createdAt.toEpochMilli) @@ -98,6 +100,7 @@ class OffsetManagerJdbc(db: Database, batchId: Long) extends OffsetManager { val committedAt = Instant.now().toEpochMilli + SlickUtils.ensureDbConnected(db) db.run( OffsetRecords.records .filter(r => r.pramenTableName === request.tableName && r.infoDate === request.infoDate.toString && r.createdAt === request.createdAt.toEpochMilli) @@ -121,6 +124,7 @@ class OffsetManagerJdbc(db: Database, batchId: Long) extends OffsetManager { OffsetRecord(req.table, req.infoDate.toString, req.minOffset.dataType.dataTypeString, req.minOffset.valueString, req.maxOffset.valueString, batchId, req.createdAt.toEpochMilli, Some(committedAtMilli)) } + SlickUtils.ensureDbConnected(db) db.run( OffsetRecords.records ++= records ).execute() @@ -137,6 +141,7 @@ class OffsetManagerJdbc(db: Database, batchId: Long) extends OffsetManager { } override def rollbackOffsets(request: DataOffsetRequest): Unit = { + SlickUtils.ensureDbConnected(db) db.run( OffsetRecords.records .filter(r => r.pramenTableName === request.tableName && r.infoDate === request.infoDate.toString && r.createdAt === request.createdAt.toEpochMilli) diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalJdbc.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalJdbc.scala index 80048d702..ea0b562ca 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalJdbc.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalJdbc.scala @@ -60,6 +60,7 @@ class JournalJdbc(db: Database) extends Journal { Option(entry.batchId)) try { + SlickUtils.ensureDbConnected(db) db.run( JournalTasks.journalTasks += journalTask ).execute() diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockJdbc.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockJdbc.scala index 5ec3b7d9b..e125993d0 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockJdbc.scala +++ 
b/pramen/core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockJdbc.scala @@ -36,6 +36,8 @@ class TokenLockJdbc(token: String, db: Database) extends TokenLockBase(token) { /** Invoked from a synchronized block. */ override def tryAcquireGuardLock(retries: Int = 3, thisTry: Int = 0): Boolean = { + SlickUtils.ensureDbConnected(db) + def tryAcquireExistingTicket(): Boolean = { val ticket = getTicket diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/metadata/MetadataManagerJdbc.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/metadata/MetadataManagerJdbc.scala index 340e68909..a7dc90c22 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/metadata/MetadataManagerJdbc.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/metadata/MetadataManagerJdbc.scala @@ -57,6 +57,7 @@ class MetadataManagerJdbc(db: Database) extends MetadataManagerBase(true) { val record = MetadataRecord(tableName, infoDate.toString, key, metadata.value, metadata.lastUpdated.getEpochSecond) try { + SlickUtils.ensureDbConnected(db) db.run( MetadataRecords.records .filter(r => r.pramenTableName === tableName && r.infoDate === infoDate.toString && r.key === key) @@ -76,6 +77,7 @@ class MetadataManagerJdbc(db: Database) extends MetadataManagerBase(true) { .filter(r => r.pramenTableName === tableName && r.infoDate === infoDate.toString && r.key === key) try { + SlickUtils.ensureDbConnected(db) db.run(query.delete).execute() } catch { case NonFatal(ex) => throw new RuntimeException(s"Unable to delete from the metadata table.", ex) @@ -87,6 +89,7 @@ class MetadataManagerJdbc(db: Database) extends MetadataManagerBase(true) { .filter(r => r.pramenTableName === tableName && r.infoDate === infoDate.toString) try { + SlickUtils.ensureDbConnected(db) db.run(query.delete).execute() } catch { case NonFatal(ex) => throw new RuntimeException(s"Unable to delete from the metadata table.", ex) diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/rdb/PramenDb.scala 
b/pramen/core/src/main/scala/za/co/absa/pramen/core/rdb/PramenDb.scala index ffdd4edeb..155b3fa4f 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/rdb/PramenDb.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/rdb/PramenDb.scala @@ -18,7 +18,7 @@ package za.co.absa.pramen.core.rdb import org.slf4j.LoggerFactory import slick.jdbc.JdbcBackend.Database -import slick.jdbc.JdbcProfile +import slick.jdbc.{JdbcBackend, JdbcProfile} import slick.util.AsyncExecutor import za.co.absa.pramen.core.bookkeeper.model.{BookkeepingRecords, MetadataRecords, OffsetRecords, SchemaRecords} import za.co.absa.pramen.core.journal.model.JournalTasks @@ -26,6 +26,7 @@ import za.co.absa.pramen.core.lock.model.LockTickets import za.co.absa.pramen.core.rdb.PramenDb.MODEL_VERSION import za.co.absa.pramen.core.reader.JdbcUrlSelector import za.co.absa.pramen.core.reader.model.JdbcConfig +import za.co.absa.pramen.core.utils.{AlgorithmUtils, UsingUtils} import java.sql.Connection import scala.util.Try @@ -43,18 +44,18 @@ class PramenDb(val jdbcConfig: JdbcConfig, private val log = LoggerFactory.getLogger(this.getClass) - val rdb: Rdb = new RdbJdbc(jdbcConnection) - def setupDatabase(): Unit = { // Explicitly set auto-commit to true, overriding any user JDBC settings or PostgreSQL defaults Try(jdbcConnection.setAutoCommit(true)).recover { case NonFatal(e) => log.warn(s"Unable to set autoCommit=true for the bookkeeping database that uses the driver: ${jdbcConfig.driver}.") } - val dbVersion = rdb.getVersion() - if (dbVersion < MODEL_VERSION) { - initDatabase(dbVersion) - rdb.setVersion(MODEL_VERSION) + UsingUtils.using(new RdbJdbc(jdbcConnection)) { rdb => + val dbVersion = rdb.getVersion() + if (dbVersion < MODEL_VERSION) { + initDatabase(dbVersion) + rdb.setVersion(MODEL_VERSION) + } } } @@ -130,14 +131,18 @@ class PramenDb(val jdbcConfig: JdbcConfig, override def close(): Unit = { - jdbcConnection.close() + if (!jdbcConnection.isClosed) jdbcConnection.close() 
slickDb.close() } } object PramenDb { + private val log = LoggerFactory.getLogger(this.getClass) + val MODEL_VERSION = 9 val DEFAULT_RETRIES = 3 + val BACKOFF_MIN_MS = 1000 + val BACKOFF_MAX_MS = 20000 def apply(jdbcConfig: JdbcConfig): PramenDb = { val (url, conn, database, profile) = openDb(jdbcConfig) @@ -159,20 +164,23 @@ object PramenDb { } } - private def openDb(jdbcConfig: JdbcConfig): (String, Connection, Database, JdbcProfile) = { + def openDb(jdbcConfig: JdbcConfig): (String, Connection, Database, JdbcProfile) = { + val numberOfAttempts = jdbcConfig.retries.getOrElse(DEFAULT_RETRIES) val selector = JdbcUrlSelector(jdbcConfig) - val (conn, url) = selector.getWorkingConnection(DEFAULT_RETRIES) + val (conn, url) = selector.getWorkingConnection(numberOfAttempts) val prop = selector.getProperties val slickProfile = getProfile(jdbcConfig.driver) - val database = jdbcConfig.user match { - case Some(user) => Database.forURL(url = url, driver = jdbcConfig.driver, user = user, password = jdbcConfig.password.getOrElse(""), prop = prop, executor = AsyncExecutor("Rdb", 2, 10)) - case None => Database.forURL(url = url, driver = jdbcConfig.driver, prop = prop, executor = AsyncExecutor("Rdb", 2, 10)) + var database: JdbcBackend.DatabaseDef = null + AlgorithmUtils.actionWithRetry(numberOfAttempts, log, BACKOFF_MIN_MS, BACKOFF_MAX_MS) { + database = jdbcConfig.user match { + case Some(user) => Database.forURL(url = url, driver = jdbcConfig.driver, user = user, password = jdbcConfig.password.getOrElse(""), prop = prop, executor = AsyncExecutor("Rdb", 2, 10)) + case None => Database.forURL(url = url, driver = jdbcConfig.driver, prop = prop, executor = AsyncExecutor("Rdb", 2, 10)) + } } (url, conn, database, slickProfile) } - } diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/rdb/RdbJdbc.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/rdb/RdbJdbc.scala index cc8ee676b..4d816a85d 100644 --- 
a/pramen/core/src/main/scala/za/co/absa/pramen/core/rdb/RdbJdbc.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/rdb/RdbJdbc.scala @@ -17,16 +17,15 @@ package za.co.absa.pramen.core.rdb import org.slf4j.LoggerFactory +import za.co.absa.pramen.core.rdb.PramenDb.DEFAULT_RETRIES import za.co.absa.pramen.core.rdb.RdbJdbc.dbVersionTableName +import za.co.absa.pramen.core.reader.JdbcUrlSelector +import za.co.absa.pramen.core.reader.model.JdbcConfig import java.sql.{Connection, SQLException} import scala.util.control.NonFatal -object RdbJdbc { - val dbVersionTableName = "db_version" -} - -class RdbJdbc(connection: Connection) extends Rdb{ +class RdbJdbc(connection: Connection) extends AutoCloseable with Rdb{ private val log = LoggerFactory.getLogger(this.getClass) override def getVersion(): Int = { @@ -80,4 +79,17 @@ class RdbJdbc(connection: Connection) extends Rdb{ dbVersion } + override def close(): Unit = if (!connection.isClosed) connection.close() +} + +object RdbJdbc { + val dbVersionTableName = "db_version" + + def apply(jdbcConfig: JdbcConfig): RdbJdbc = { + val numberOfAttempts = jdbcConfig.retries.getOrElse(DEFAULT_RETRIES) + val selector = JdbcUrlSelector(jdbcConfig) + val (conn, _) = selector.getWorkingConnection(numberOfAttempts) + + new RdbJdbc(conn) + } } diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/reader/JdbcUrlSelectorImpl.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/reader/JdbcUrlSelectorImpl.scala index 5ac01aa26..c9630b5d4 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/reader/JdbcUrlSelectorImpl.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/reader/JdbcUrlSelectorImpl.scala @@ -26,6 +26,9 @@ import java.util.Properties import scala.util.{Failure, Random, Success, Try} class JdbcUrlSelectorImpl(val jdbcConfig: JdbcConfig) extends JdbcUrlSelector{ + val BACKOFF_MIN_S = 1 + val BACKOFF_MAX_S = 10 + private val log = LoggerFactory.getLogger(this.getClass) private val allUrls =
(jdbcConfig.primaryUrl ++ jdbcConfig.fallbackUrls).toSeq private val numberOfUrls = allUrls.size @@ -102,7 +105,6 @@ class JdbcUrlSelectorImpl(val jdbcConfig: JdbcConfig) extends JdbcUrlSelector{ @throws[SQLException] def getWorkingConnection(retriesLeft: Int): (Connection, String) = { val currentUrl = getUrl - Try { JdbcNativeUtils.getJdbcConnection(jdbcConfig, currentUrl) } match { @@ -110,7 +112,9 @@ class JdbcUrlSelectorImpl(val jdbcConfig: JdbcConfig) extends JdbcUrlSelector{ case Failure(ex) => if (retriesLeft > 1) { val newUrl = getNextUrl - log.error(s"JDBC connection error for $currentUrl. Retries left: ${retriesLeft - 1}. Retrying...", ex) + val backoffS = Random.nextInt(BACKOFF_MAX_S - BACKOFF_MIN_S) + BACKOFF_MIN_S + log.error(s"JDBC connection error for $currentUrl. Retries left: ${retriesLeft - 1}. Retrying... in $backoffS seconds", ex) + Thread.sleep(backoffS * 1000) log.info(s"Trying URL: $newUrl") getWorkingConnection(retriesLeft - 1) } else { diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/AlgorithmUtils.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/AlgorithmUtils.scala index cf2ea6bee..96003e917 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/AlgorithmUtils.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/AlgorithmUtils.scala @@ -21,6 +21,7 @@ import org.slf4j.Logger import java.time.{Duration, Instant} import scala.annotation.tailrec import scala.collection.mutable +import scala.util.Random object AlgorithmUtils { /** Finds which strings are encountered multiple times (case insensitive).
*/ @@ -70,7 +71,7 @@ object AlgorithmUtils { } @tailrec - final def actionWithRetry(attempts: Int, log: Logger)(action: => Unit): Unit = { + final def actionWithRetry(attempts: Int, log: Logger, backoffMinMs: Int = 0, backoffMaxMs: Int = 0)(action: => Unit): Unit = { def getErrorMessage(ex: Throwable): String = { if (ex.getCause == null) { ex.getMessage @@ -88,7 +89,15 @@ object AlgorithmUtils { if (attemptsLeft < 1) { throw ex } else { - log.warn(s"Attempt failed: ${getErrorMessage(ex)}. Attempts left: $attemptsLeft. Retrying...") + if (backoffMaxMs > backoffMinMs && backoffMaxMs > 0) { + val backoffMs = Random.nextInt(backoffMaxMs - backoffMinMs) + backoffMinMs + val backoffS = backoffMs / 1000 + log.warn(s"Attempt failed: ${getErrorMessage(ex)}. Attempts left: $attemptsLeft. Retrying in $backoffS seconds...") + Thread.sleep(backoffMs) + } else { + log.warn(s"Attempt failed: ${getErrorMessage(ex)}. Attempts left: $attemptsLeft. Retrying...") + } + actionWithRetry(attemptsLeft, log, backoffMinMs, backoffMaxMs)(action) } } diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/FutureImplicits.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/FutureImplicits.scala index a74b1b657..baa9b14f5 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/FutureImplicits.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/FutureImplicits.scala @@ -17,12 +17,11 @@ package za.co.absa.pramen.core.utils import java.util.concurrent.TimeUnit - -import scala.concurrent.duration.Duration +import scala.concurrent.duration.{Duration, FiniteDuration} import scala.concurrent.{Await, Future} object FutureImplicits { - private val executionTimeout = Duration(300, TimeUnit.SECONDS) + val executionTimeout: FiniteDuration = Duration(300, TimeUnit.SECONDS) implicit class FutureExecutor[T](future: Future[T]) { def execute(): T = Await.result(future, executionTimeout) diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/SlickUtils.scala
b/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/SlickUtils.scala index 74e8701a5..da5a53a2f 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/SlickUtils.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/SlickUtils.scala @@ -20,6 +20,7 @@ import org.slf4j.LoggerFactory import slick.dbio.Effect import slick.jdbc.PostgresProfile.api._ import slick.sql.SqlAction +import za.co.absa.pramen.core.rdb.PramenDb import java.time.{Duration, Instant} import scala.util.control.NonFatal @@ -43,11 +44,14 @@ object SlickUtils { * @return The result of the query */ def executeQuery[E, U](db: Database, query: Query[E, U, Seq]): Seq[U] = { + ensureDbConnected(db) + val action = query.result val sql = action.statements.mkString("; ") try { val start = Instant.now + ensureDbConnected(db) val result = db.run(action).execute() val finish = Instant.now @@ -75,10 +79,13 @@ object SlickUtils { * @return The result of the action */ def executeAction[R, E <: Effect](db: Database, action: SqlAction[R, NoStream, E]): R = { + ensureDbConnected(db) + val sql = action.statements.mkString("; ") try { val start = Instant.now + ensureDbConnected(db) val result = db.run(action).execute() val finish = Instant.now @@ -106,6 +113,8 @@ object SlickUtils { * @return The result of the query */ def executeCount(db: Database, rep: Rep[Int]): Int = { + ensureDbConnected(db) + val action = rep.result val sql = action.statements.mkString("; ") @@ -138,6 +147,8 @@ object SlickUtils { * @return The result of the query */ def executeMaxString(db: Database, rep: Rep[Option[String]]): Option[String] = { + ensureDbConnected(db) + val action = rep.result val sql = action.statements.mkString("; ") @@ -158,4 +169,23 @@ object SlickUtils { case NonFatal(ex) => throw new RuntimeException(s"Error executing an SQL query: $sql", ex) } } + + /** + * Ensures that the database connection is valid and ready for use. + * If the connection is not valid, an exception is thrown. 
+ * The method retries the connection check according to the retry logic. + * + * @param db The database instance to verify the connection for. + */ + def ensureDbConnected(db: Database): Unit = { + val check = SimpleDBIO { ctx => + val conn = ctx.connection + if (!conn.isValid(FutureImplicits.executionTimeout.toSeconds.toInt)) + throw new RuntimeException("Connection not valid") + } + + AlgorithmUtils.actionWithRetry(PramenDb.DEFAULT_RETRIES, log, PramenDb.BACKOFF_MIN_MS, PramenDb.BACKOFF_MAX_MS) { + db.run(check).execute() + } + } } diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/UsingUtils.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/UsingUtils.scala new file mode 100644 index 000000000..2e37c8a56 --- /dev/null +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/UsingUtils.scala @@ -0,0 +1,62 @@ +/* + * Copyright 2022 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.pramen.core.utils + +import scala.util.control.NonFatal + +object UsingUtils { + /** + * Executes the given action with a resource that implements the AutoCloseable interface, ensuring + * proper closure of the resource. Any exception that occurs during the action or resource closure + * is handled appropriately, with suppressed exceptions added where relevant. Null resources are not supported. 
+ * + * @param resource a lazily evaluated resource that implements AutoCloseable + * @param action a function to be executed using the provided resource + * @tparam T the type of the resource, which must extend AutoCloseable + * @throws Throwable if either the action or resource closure fails. If both fail, the action's exception + * is thrown with the closure's exception added as suppressed + */ + def using[T <: AutoCloseable,U](resource: => T)(action: T => U): U = { + var thrownException: Option[Throwable] = None + var suppressedException: Option[Throwable] = None + val openedResource = resource + + val result = try { + Option(action(openedResource)) + } catch { + case NonFatal(ex) => + thrownException = Option(ex) + None + } finally + if (openedResource != null) { + try + openedResource.close() + catch { + case NonFatal(ex) => suppressedException = Option(ex) + } + } + + (thrownException, suppressedException) match { + case (Some(thrown), Some(suppressed)) => + thrown.addSuppressed(suppressed) + throw thrown + case (Some(thrown), None) => throw thrown + case (None, Some(suppressed)) => throw suppressed + case (None, None) => result.getOrElse(throw new IllegalArgumentException("Action returned null")) + } + } +} diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/integration/IncrementalPipelineJdbcLongSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/integration/IncrementalPipelineJdbcLongSuite.scala index 0435ddc6c..1c9d405f2 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/integration/IncrementalPipelineJdbcLongSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/integration/IncrementalPipelineJdbcLongSuite.scala @@ -23,7 +23,7 @@ import org.scalatest.wordspec.AnyWordSpec import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll} import za.co.absa.pramen.core.base.SparkTestBase import za.co.absa.pramen.core.fixtures.{RelationalDbFixture, TempDirFixture, TextComparisonFixture} -import 
za.co.absa.pramen.core.rdb.PramenDb +import za.co.absa.pramen.core.rdb.{PramenDb, RdbJdbc} import za.co.absa.pramen.core.reader.JdbcUrlSelectorImpl import za.co.absa.pramen.core.reader.model.JdbcConfig import za.co.absa.pramen.core.runner.AppRunner @@ -50,7 +50,10 @@ class IncrementalPipelineJdbcLongSuite extends AnyWordSpec private val INFO_DATE_COLUMN = "pramen_info_date" before { - pramenDb.rdb.executeDDL("DROP SCHEMA PUBLIC CASCADE;") + val rdb = RdbJdbc(jdbcConfig) + rdb.executeDDL("DROP SCHEMA PUBLIC CASCADE;") + rdb.close() + pramenDb.setupDatabase() RdbExampleTable.IncrementalTable.initTable(getConnection) } diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/integration/IncrementalPipelineLongFixture.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/integration/IncrementalPipelineLongFixture.scala index 0c47d24ba..447f23432 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/integration/IncrementalPipelineLongFixture.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/integration/IncrementalPipelineLongFixture.scala @@ -28,7 +28,7 @@ import za.co.absa.pramen.api.offset.OffsetType import za.co.absa.pramen.core.base.SparkTestBase import za.co.absa.pramen.core.bookkeeper.OffsetManagerJdbc import za.co.absa.pramen.core.fixtures.{RelationalDbFixture, TempDirFixture, TextComparisonFixture} -import za.co.absa.pramen.core.rdb.PramenDb +import za.co.absa.pramen.core.rdb.{PramenDb, RdbJdbc} import za.co.absa.pramen.core.reader.JdbcUrlSelectorImpl import za.co.absa.pramen.core.reader.model.JdbcConfig import za.co.absa.pramen.core.runner.AppRunner @@ -49,7 +49,10 @@ class IncrementalPipelineLongFixture extends AnyWordSpec lazy val pramenDb: PramenDb = PramenDb(jdbcConfig) before { - pramenDb.rdb.executeDDL("DROP SCHEMA PUBLIC CASCADE;") + val rdb = RdbJdbc(jdbcConfig) + rdb.executeDDL("DROP SCHEMA PUBLIC CASCADE;") + rdb.close() + pramenDb.setupDatabase() } diff --git 
a/pramen/core/src/test/scala/za/co/absa/pramen/core/metadata/MetadataManagerJdbcSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/metadata/MetadataManagerJdbcSuite.scala index 213273cec..15bfbd123 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/metadata/MetadataManagerJdbcSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/metadata/MetadataManagerJdbcSuite.scala @@ -20,7 +20,7 @@ import org.scalatest.wordspec.AnyWordSpec import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll} import za.co.absa.pramen.api.MetadataValue import za.co.absa.pramen.core.fixtures.RelationalDbFixture -import za.co.absa.pramen.core.rdb.PramenDb +import za.co.absa.pramen.core.rdb.{PramenDb, RdbJdbc} import za.co.absa.pramen.core.reader.model.JdbcConfig import java.time.{LocalDate, ZoneOffset} @@ -32,7 +32,10 @@ class MetadataManagerJdbcSuite extends AnyWordSpec with RelationalDbFixture with private val exampleInstant = infoDate.atStartOfDay().toInstant(ZoneOffset.UTC) before { - pramenDb.rdb.executeDDL("DROP SCHEMA PUBLIC CASCADE;") + val rdb = RdbJdbc(jdbcConfig) + rdb.executeDDL("DROP SCHEMA PUBLIC CASCADE;") + rdb.close() + pramenDb.setupDatabase() } diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/AutoCloseableSpy.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/AutoCloseableSpy.scala new file mode 100644 index 000000000..57e03a6ab --- /dev/null +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/AutoCloseableSpy.scala @@ -0,0 +1,40 @@ +/* + * Copyright 2022 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.pramen.core.mocks + +class AutoCloseableSpy(failCreate: Boolean = false, failAction: Boolean = false, failClose: Boolean = false) extends AutoCloseable { + var actionCallCount: Int = 0 + var closeCallCount: Int = 0 + + if (failCreate) { + throw new RuntimeException("Failed to create resource") + } + + def dummyAction(): Unit = { + actionCallCount += 1 + if (failAction) { + throw new RuntimeException("Failed during action") + } + } + + override def close(): Unit = { + closeCallCount += 1 + if (failClose) { + throw new RuntimeException("Failed to close resource") + } + } +} diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperJdbcSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperJdbcSuite.scala index f06672d97..a377a4526 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperJdbcSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperJdbcSuite.scala @@ -19,7 +19,7 @@ package za.co.absa.pramen.core.tests.bookkeeper import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll} import za.co.absa.pramen.core.bookkeeper.{Bookkeeper, BookkeeperJdbc} import za.co.absa.pramen.core.fixtures.RelationalDbFixture -import za.co.absa.pramen.core.rdb.PramenDb +import za.co.absa.pramen.core.rdb.{PramenDb, RdbJdbc} import za.co.absa.pramen.core.reader.model.JdbcConfig class BookkeeperJdbcSuite extends BookkeeperCommonSuite with RelationalDbFixture with BeforeAndAfter with BeforeAndAfterAll { @@ 
-28,7 +28,10 @@ class BookkeeperJdbcSuite extends BookkeeperCommonSuite with RelationalDbFixture lazy val pramenDb: PramenDb = PramenDb(jdbcConfig) before { - pramenDb.rdb.executeDDL("DROP SCHEMA PUBLIC CASCADE;") + val rdb = RdbJdbc(jdbcConfig) + rdb.executeDDL("DROP SCHEMA PUBLIC CASCADE;") + rdb.close() + pramenDb.setupDatabase() } diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperSuite.scala index 41adb3486..33f87c5fd 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperSuite.scala @@ -26,7 +26,7 @@ import za.co.absa.pramen.core.fixtures.{MongoDbFixture, RelationalDbFixture, Tem import za.co.absa.pramen.core.journal._ import za.co.absa.pramen.core.lock.{TokenLockFactoryAllow, TokenLockFactoryHadoopPath, TokenLockFactoryJdbc, TokenLockFactoryMongoDb} import za.co.absa.pramen.core.metadata.{MetadataManagerJdbc, MetadataManagerNull} -import za.co.absa.pramen.core.rdb.PramenDb +import za.co.absa.pramen.core.rdb.{PramenDb, RdbJdbc} import za.co.absa.pramen.core.reader.model.JdbcConfig import za.co.absa.pramen.core.{BookkeepingConfigFactory, RuntimeConfigFactory} @@ -46,7 +46,10 @@ class BookkeeperSuite extends AnyWordSpec lazy val pramenDb: PramenDb = PramenDb(jdbcConfig) before { - pramenDb.rdb.executeDDL("DROP SCHEMA PUBLIC CASCADE;") + val rdb = RdbJdbc(jdbcConfig) + rdb.executeDDL("DROP SCHEMA PUBLIC CASCADE;") + rdb.close() + pramenDb.setupDatabase() if (db != null) { diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/OffsetManagerJdbcSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/OffsetManagerJdbcSuite.scala index 0ec05cca5..3e31b37cd 100644 --- 
a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/OffsetManagerJdbcSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/OffsetManagerJdbcSuite.scala @@ -22,7 +22,7 @@ import za.co.absa.pramen.api.offset.DataOffset.{CommittedOffset, UncommittedOffs import za.co.absa.pramen.api.offset.{OffsetType, OffsetValue} import za.co.absa.pramen.core.bookkeeper.{OffsetManager, OffsetManagerCached, OffsetManagerJdbc} import za.co.absa.pramen.core.fixtures.RelationalDbFixture -import za.co.absa.pramen.core.rdb.PramenDb +import za.co.absa.pramen.core.rdb.{PramenDb, RdbJdbc} import za.co.absa.pramen.core.reader.model.JdbcConfig import java.time.{Instant, LocalDate} @@ -34,7 +34,10 @@ class OffsetManagerJdbcSuite extends AnyWordSpec with RelationalDbFixture with B private val infoDate = LocalDate.of(2023, 8, 25) before { - pramenDb.rdb.executeDDL("DROP SCHEMA PUBLIC CASCADE;") + val rdb = RdbJdbc(jdbcConfig) + rdb.executeDDL("DROP SCHEMA PUBLIC CASCADE;") + rdb.close() + pramenDb.setupDatabase() } diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/journal/JournalJdbcSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/journal/JournalJdbcSuite.scala index a2333b5d1..9ae6be38b 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/journal/JournalJdbcSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/journal/JournalJdbcSuite.scala @@ -21,7 +21,7 @@ import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll} import za.co.absa.pramen.core.base.SparkTestBase import za.co.absa.pramen.core.fixtures.RelationalDbFixture import za.co.absa.pramen.core.journal.{Journal, JournalJdbc} -import za.co.absa.pramen.core.rdb.PramenDb +import za.co.absa.pramen.core.rdb.{PramenDb, RdbJdbc} import za.co.absa.pramen.core.reader.model.JdbcConfig class JournalJdbcSuite extends AnyWordSpec with SparkTestBase with BeforeAndAfter with BeforeAndAfterAll with RelationalDbFixture { @@ -31,7 +31,10 
@@ class JournalJdbcSuite extends AnyWordSpec with SparkTestBase with BeforeAndAfte lazy val pramenDb: PramenDb = PramenDb(jdbcConfig) before { - pramenDb.rdb.executeDDL("DROP SCHEMA PUBLIC CASCADE;") + val rdb = RdbJdbc(jdbcConfig) + rdb.executeDDL("DROP SCHEMA PUBLIC CASCADE;") + rdb.close() + pramenDb.setupDatabase() } diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/lock/TokenLockJdbcSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/lock/TokenLockJdbcSuite.scala index 62deb8c76..2b7fa8cb5 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/lock/TokenLockJdbcSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/lock/TokenLockJdbcSuite.scala @@ -22,7 +22,7 @@ import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll} import za.co.absa.pramen.api.lock.TokenLock import za.co.absa.pramen.core.fixtures.RelationalDbFixture import za.co.absa.pramen.core.lock.TokenLockJdbc -import za.co.absa.pramen.core.rdb.PramenDb +import za.co.absa.pramen.core.rdb.{PramenDb, RdbJdbc} import za.co.absa.pramen.core.reader.model.JdbcConfig import scala.concurrent.duration._ @@ -32,7 +32,10 @@ class TokenLockJdbcSuite extends AnyWordSpec with RelationalDbFixture with Befor lazy val pramenDb: PramenDb = PramenDb(jdbcConfig) before { - pramenDb.rdb.executeDDL("DROP SCHEMA PUBLIC CASCADE;") + val rdb = RdbJdbc(jdbcConfig) + rdb.executeDDL("DROP SCHEMA PUBLIC CASCADE;") + rdb.close() + pramenDb.setupDatabase() } diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/utils/UsingUtilsSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/utils/UsingUtilsSuite.scala new file mode 100644 index 000000000..9c76a631a --- /dev/null +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/utils/UsingUtilsSuite.scala @@ -0,0 +1,229 @@ +/* + * Copyright 2022 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in 
compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.pramen.core.tests.utils + +import org.scalatest.wordspec.AnyWordSpec +import za.co.absa.pramen.core.mocks.AutoCloseableSpy +import za.co.absa.pramen.core.utils.UsingUtils + +class UsingUtilsSuite extends AnyWordSpec { + "using with a single resource" should { + "properly close the resource" in { + var resource: AutoCloseableSpy = null + + UsingUtils.using(new AutoCloseableSpy()) { res => + resource = res + res.dummyAction() + } + + assert(resource.actionCallCount == 1) + assert(resource.closeCallCount == 1) + } + + "close resource even if exception occurs" in { + var resource: AutoCloseableSpy = null + var exceptionThrown = false + + try { + UsingUtils.using(new AutoCloseableSpy(failAction = true)) { res => + resource = res + res.dummyAction() + } + } catch { + case ex: Throwable => + assert(ex.getMessage.contains("Failed during action")) + exceptionThrown = true + } + + assert(exceptionThrown) + assert(resource.actionCallCount == 1) + assert(resource.closeCallCount == 1) + } + + "handle exceptions when a resource is created" in { + var exceptionThrown = false + var resource: AutoCloseableSpy = null + + try { + UsingUtils.using(new AutoCloseableSpy(failCreate = true)) { res => + resource = res + res.dummyAction() + } + } catch { + case ex: Throwable => + exceptionThrown = true + assert(ex.getMessage.contains("Failed to create resource")) + } + + assert(exceptionThrown) + assert(resource == null) + } + + "handle exceptions when a resource is closed" in { + var resource: AutoCloseableSpy = null 
+ var exceptionThrown = false + + try { + UsingUtils.using(new AutoCloseableSpy(failClose = true)) { res => + resource = res + res.dummyAction() + } + } catch { + case ex: Throwable => + exceptionThrown = true + assert(ex.getMessage.contains("Failed to close resource")) + } + + assert(exceptionThrown) + assert(resource.actionCallCount == 1) + assert(resource.closeCallCount == 1) + } + + "handle exceptions on both action and close" in { + var resource: AutoCloseableSpy = null + var exceptionThrown = false + + try { + UsingUtils.using(new AutoCloseableSpy(failClose = true)) { res => + resource = res + res.dummyAction() + throw new RuntimeException("Failed during action") + } + } catch { + case ex: Throwable => + exceptionThrown = true + assert(ex.getMessage.contains("Failed during action")) + val suppressed = ex.getSuppressed + assert(suppressed.length == 1) + assert(suppressed(0).getMessage.contains("Failed to close resource")) + } + + assert(exceptionThrown) + assert(resource.actionCallCount == 1) + assert(resource.closeCallCount == 1) + } + } + + "using with two resources" should { + "properly close both resources" in { + var resource1: AutoCloseableSpy = null + var resource2: AutoCloseableSpy = null + + val result = UsingUtils.using(new AutoCloseableSpy()) { res1 => + resource1 = res1 + UsingUtils.using(new AutoCloseableSpy()) { res2 => + resource2 = res2 + res1.dummyAction() + res2.dummyAction() + 100 + } + } + + assert(result == 100) + assert(resource1.actionCallCount == 1) + assert(resource1.closeCallCount == 1) + assert(resource2.actionCallCount == 1) + assert(resource2.closeCallCount == 1) + } + + "properly close both resources when an inner one throws an exception during action and close" in { + var resource1: AutoCloseableSpy = null + var resource2: AutoCloseableSpy = null + var exceptionThrown = false + + try { + UsingUtils.using(new AutoCloseableSpy()) { res1 => + resource1 = res1 + UsingUtils.using(new AutoCloseableSpy(failAction = true, failClose = 
true)) { res2 => + resource2 = res2 + res1.dummyAction() + res2.dummyAction() + } + } + } catch { + case ex: Throwable => + exceptionThrown = true + assert(ex.getMessage.contains("Failed during action")) + val suppressed = ex.getSuppressed + assert(suppressed.length == 1) + assert(suppressed(0).getMessage.contains("Failed to close resource")) + } + + assert(exceptionThrown) + assert(resource1.actionCallCount == 1) + assert(resource1.closeCallCount == 1) + assert(resource2.actionCallCount == 1) + assert(resource2.closeCallCount == 1) + } + + "properly close both resources when an outer one throws an exception during action and close" in { + var resource1: AutoCloseableSpy = null + var resource2: AutoCloseableSpy = null + var exceptionThrown = false + + try { + UsingUtils.using(new AutoCloseableSpy(failAction = true, failClose = true)) { res1 => + resource1 = res1 + UsingUtils.using(new AutoCloseableSpy()) { res2 => + resource2 = res2 + res1.dummyAction() + res2.dummyAction() + } + } + } catch { + case ex: Throwable => + exceptionThrown = true + assert(ex.getMessage.contains("Failed during action")) + val suppressed = ex.getSuppressed + assert(suppressed.length == 1) + assert(suppressed(0).getMessage.contains("Failed to close resource")) + } + + assert(exceptionThrown) + assert(resource1.actionCallCount == 1) + assert(resource1.closeCallCount == 1) + assert(resource2.actionCallCount == 0) + assert(resource2.closeCallCount == 1) + } + + "properly close the outer resource when the inner one fails on create" in { + var resource1: AutoCloseableSpy = null + var resource2: AutoCloseableSpy = null + var exceptionThrown = false + + try { + UsingUtils.using(new AutoCloseableSpy()) { res1 => + resource1 = res1 + UsingUtils.using(new AutoCloseableSpy(failCreate = true)) { res2 => + resource2 = res2 + res1.dummyAction() + res2.dummyAction() + } + } + } catch { + case ex: Throwable => + exceptionThrown = true + assert(ex.getMessage.contains("Failed to create resource")) + } + + 
assert(exceptionThrown) + assert(resource1.actionCallCount == 0) + assert(resource1.closeCallCount == 1) + assert(resource2 == null) + } + } +} From a3b8061b838e70598bf886ea8d95d4a122e24f9b Mon Sep 17 00:00:00 2001 From: Ruslan Iushchenko Date: Mon, 26 Jan 2026 14:49:18 +0100 Subject: [PATCH 2/6] #699 Fix PR suggestions. Good catch, @coderabbitai! --- .../src/main/scala/za/co/absa/pramen/core/rdb/RdbJdbc.scala | 2 +- .../za/co/absa/pramen/core/reader/JdbcUrlSelectorImpl.scala | 2 +- .../scala/za/co/absa/pramen/core/utils/AlgorithmUtils.scala | 2 +- .../main/scala/za/co/absa/pramen/core/utils/SlickUtils.scala | 1 - .../main/scala/za/co/absa/pramen/core/utils/UsingUtils.scala | 4 ++++ 5 files changed, 7 insertions(+), 4 deletions(-) diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/rdb/RdbJdbc.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/rdb/RdbJdbc.scala index 4d816a85d..356d34702 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/rdb/RdbJdbc.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/rdb/RdbJdbc.scala @@ -79,7 +79,7 @@ class RdbJdbc(connection: Connection) extends AutoCloseable with Rdb{ dbVersion } - override def close(): Unit = if (connection.isClosed) connection.close() + override def close(): Unit = if (!connection.isClosed) connection.close() } object RdbJdbc { diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/reader/JdbcUrlSelectorImpl.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/reader/JdbcUrlSelectorImpl.scala index c9630b5d4..f80b2e073 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/reader/JdbcUrlSelectorImpl.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/reader/JdbcUrlSelectorImpl.scala @@ -112,7 +112,7 @@ class JdbcUrlSelectorImpl(val jdbcConfig: JdbcConfig) extends JdbcUrlSelector{ case Failure(ex) => if (retriesLeft > 1) { val newUrl = getNextUrl - val backoffS = Random.nextInt(BACKOFF_MIN_S - BACKOFF_MAX_S) + BACKOFF_MIN_S + val backoffS = 
Random.nextInt(BACKOFF_MAX_S - BACKOFF_MIN_S) + BACKOFF_MIN_S log.error(s"JDBC connection error for $currentUrl. Retries left: ${retriesLeft - 1}. Retrying... in $backoffS seconds", ex) Thread.sleep(backoffS * 1000) log.info(s"Trying URL: $newUrl") diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/AlgorithmUtils.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/AlgorithmUtils.scala index 96003e917..fd717dca6 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/AlgorithmUtils.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/AlgorithmUtils.scala @@ -98,7 +98,7 @@ object AlgorithmUtils { log.warn(s"Attempt failed: ${getErrorMessage(ex)}. Attempts left: $attemptsLeft. Retrying...") } - actionWithRetry(attemptsLeft, log)(action) + actionWithRetry(attemptsLeft, log, backoffMinMs, backoffMaxMs)(action) } } } diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/SlickUtils.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/SlickUtils.scala index da5a53a2f..2f404bbf5 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/SlickUtils.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/SlickUtils.scala @@ -51,7 +51,6 @@ object SlickUtils { try { val start = Instant.now - ensureDbConnected(db) val result = db.run(action).execute() val finish = Instant.now diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/UsingUtils.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/UsingUtils.scala index 2e37c8a56..09494e293 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/UsingUtils.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/UsingUtils.scala @@ -35,6 +35,10 @@ object UsingUtils { var suppressedException: Option[Throwable] = None val openedResource = resource + if (openedResource == null) { + throw new IllegalArgumentException("Resource must not be null") + } + val result = try { 
Option(action(openedResource)) } catch { From b6a9a9ca8b2e2da6397d9c013a933463d7118077 Mon Sep 17 00:00:00 2001 From: Ruslan Iushchenko Date: Mon, 26 Jan 2026 15:29:27 +0100 Subject: [PATCH 3/6] #699 Fix Pramen DB creation logic in tests. --- .../pramen/core/bookkeeper/Bookkeeper.scala | 1 - .../za/co/absa/pramen/core/rdb/PramenDb.scala | 31 +++++++++++++------ .../za/co/absa/pramen/core/rdb/RdbJdbc.scala | 2 +- .../pramen/core/utils/AlgorithmUtils.scala | 2 +- .../IncrementalPipelineJdbcLongSuite.scala | 2 +- .../IncrementalPipelineLongFixture.scala | 3 +- .../metadata/MetadataManagerJdbcSuite.scala | 3 +- .../bookkeeper/BookkeeperJdbcSuite.scala | 3 +- .../tests/bookkeeper/BookkeeperSuite.scala | 3 +- .../bookkeeper/OffsetManagerJdbcSuite.scala | 3 +- .../core/tests/journal/JournalJdbcSuite.scala | 3 +- .../core/tests/lock/TokenLockJdbcSuite.scala | 3 +- 12 files changed, 32 insertions(+), 27 deletions(-) diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/Bookkeeper.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/Bookkeeper.scala index 79c08636e..cbcbf95f2 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/Bookkeeper.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/Bookkeeper.scala @@ -81,7 +81,6 @@ object Bookkeeper { val dbOpt = if (hasBookkeepingJdbc) { val jdbcConfig = bookkeepingConfig.bookkeepingJdbcConfig.get val syncDb = PramenDb(jdbcConfig) - syncDb.setupDatabase() Option(syncDb) } else None diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/rdb/PramenDb.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/rdb/PramenDb.scala index 155b3fa4f..87157f673 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/rdb/PramenDb.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/rdb/PramenDb.scala @@ -34,7 +34,6 @@ import scala.util.control.NonFatal class PramenDb(val jdbcConfig: JdbcConfig, val activeUrl: String, - val jdbcConnection: 
Connection, val slickDb: Database, val profile: JdbcProfile) extends AutoCloseable { def db: Database = slickDb @@ -44,7 +43,7 @@ class PramenDb(val jdbcConfig: JdbcConfig, private val log = LoggerFactory.getLogger(this.getClass) - def setupDatabase(): Unit = { + def setupDatabase(jdbcConnection: Connection): Unit = { // Explicitly set auto-commit to true, overriding any user JDBC settings or PostgreSQL defaults Try(jdbcConnection.setAutoCommit(true)).recover { case NonFatal(e) => log.warn(s"Unable to set autoCommit=true for the bookkeeping database that uses the driver: ${jdbcConfig.driver}.") @@ -131,7 +130,6 @@ class PramenDb(val jdbcConfig: JdbcConfig, override def close(): Unit = { - if (!jdbcConnection.isClosed) jdbcConnection.close() slickDb.close() } } @@ -145,9 +143,17 @@ object PramenDb { val BACKOFF_MAX_MS = 20000 def apply(jdbcConfig: JdbcConfig): PramenDb = { - val (url, conn, database, profile) = openDb(jdbcConfig) + val (url, connection) = getConnection(jdbcConfig) - new PramenDb(jdbcConfig, url, conn, database, profile) + val (database, profile) = openDb(jdbcConfig, url) + + val pramenDb = new PramenDb(jdbcConfig, url, database, profile) + + UsingUtils.using(connection) { conn => + pramenDb.setupDatabase(conn) + } + + pramenDb } def getProfile(driver: String): JdbcProfile = { @@ -164,10 +170,17 @@ object PramenDb { } } - def openDb(jdbcConfig: JdbcConfig): (String, Connection, Database, JdbcProfile) = { + def getConnection(jdbcConfig: JdbcConfig): (String, Connection) = { val numberOfAttempts = jdbcConfig.retries.getOrElse(DEFAULT_RETRIES) val selector = JdbcUrlSelector(jdbcConfig) val (conn, url) = selector.getWorkingConnection(numberOfAttempts) + + (url, conn) + } + + def openDb(jdbcConfig: JdbcConfig, workingUrl: String): (Database, JdbcProfile) = { + val numberOfAttempts = jdbcConfig.retries.getOrElse(DEFAULT_RETRIES) + val selector = JdbcUrlSelector(jdbcConfig) val prop = selector.getProperties val slickProfile = getProfile(jdbcConfig.driver) 
@@ -175,12 +188,12 @@ object PramenDb { var database: JdbcBackend.DatabaseDef = null AlgorithmUtils.actionWithRetry(numberOfAttempts, log, BACKOFF_MIN_MS, BACKOFF_MAX_MS) { database = jdbcConfig.user match { - case Some(user) => Database.forURL(url = url, driver = jdbcConfig.driver, user = user, password = jdbcConfig.password.getOrElse(""), prop = prop, executor = AsyncExecutor("Rdb", 2, 10)) - case None => Database.forURL(url = url, driver = jdbcConfig.driver, prop = prop, executor = AsyncExecutor("Rdb", 2, 10)) + case Some(user) => Database.forURL(url = workingUrl, driver = jdbcConfig.driver, user = user, password = jdbcConfig.password.getOrElse(""), prop = prop, executor = AsyncExecutor("Rdb", 2, 10)) + case None => Database.forURL(url = workingUrl, driver = jdbcConfig.driver, prop = prop, executor = AsyncExecutor("Rdb", 2, 10)) } } - (url, conn, database, slickProfile) + (database, slickProfile) } } diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/rdb/RdbJdbc.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/rdb/RdbJdbc.scala index 356d34702..82d7fc786 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/rdb/RdbJdbc.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/rdb/RdbJdbc.scala @@ -25,7 +25,7 @@ import za.co.absa.pramen.core.reader.model.JdbcConfig import java.sql.{Connection, SQLException} import scala.util.control.NonFatal -class RdbJdbc(connection: Connection) extends AutoCloseable with Rdb{ +class RdbJdbc(val connection: Connection) extends AutoCloseable with Rdb{ private val log = LoggerFactory.getLogger(this.getClass) override def getVersion(): Int = { diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/AlgorithmUtils.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/AlgorithmUtils.scala index fd717dca6..45984d8ca 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/AlgorithmUtils.scala +++ 
b/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/AlgorithmUtils.scala @@ -89,7 +89,7 @@ object AlgorithmUtils { if (attemptsLeft < 1) { throw ex } else { - if (backoffMaxMs > backoffMinMs && backoffMaxMs > 0) { + if (backoffMaxMs > backoffMinMs && backoffMinMs > 0) { val backoffMs = Random.nextInt(backoffMaxMs - backoffMinMs) + backoffMinMs val backoffS = backoffMs / 1000 log.warn(s"Attempt failed: ${getErrorMessage(ex)}. Attempts left: $attemptsLeft. Retrying in $backoffS seconnds...") diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/integration/IncrementalPipelineJdbcLongSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/integration/IncrementalPipelineJdbcLongSuite.scala index 1c9d405f2..4ca10570e 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/integration/IncrementalPipelineJdbcLongSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/integration/IncrementalPipelineJdbcLongSuite.scala @@ -52,9 +52,9 @@ class IncrementalPipelineJdbcLongSuite extends AnyWordSpec before { val rdb = RdbJdbc(jdbcConfig) rdb.executeDDL("DROP SCHEMA PUBLIC CASCADE;") + pramenDb.setupDatabase(rdb.connection) rdb.close() - pramenDb.setupDatabase() RdbExampleTable.IncrementalTable.initTable(getConnection) } diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/integration/IncrementalPipelineLongFixture.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/integration/IncrementalPipelineLongFixture.scala index 447f23432..182ef79d9 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/integration/IncrementalPipelineLongFixture.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/integration/IncrementalPipelineLongFixture.scala @@ -51,9 +51,8 @@ class IncrementalPipelineLongFixture extends AnyWordSpec before { val rdb = RdbJdbc(jdbcConfig) rdb.executeDDL("DROP SCHEMA PUBLIC CASCADE;") + pramenDb.setupDatabase(rdb.connection) rdb.close() - - pramenDb.setupDatabase() } override def afterAll(): Unit 
= { diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/metadata/MetadataManagerJdbcSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/metadata/MetadataManagerJdbcSuite.scala index 15bfbd123..675a23caf 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/metadata/MetadataManagerJdbcSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/metadata/MetadataManagerJdbcSuite.scala @@ -34,9 +34,8 @@ class MetadataManagerJdbcSuite extends AnyWordSpec with RelationalDbFixture with before { val rdb = RdbJdbc(jdbcConfig) rdb.executeDDL("DROP SCHEMA PUBLIC CASCADE;") + pramenDb.setupDatabase(rdb.connection) rdb.close() - - pramenDb.setupDatabase() } override def afterAll(): Unit = { diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperJdbcSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperJdbcSuite.scala index a377a4526..e13d59581 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperJdbcSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperJdbcSuite.scala @@ -30,9 +30,8 @@ class BookkeeperJdbcSuite extends BookkeeperCommonSuite with RelationalDbFixture before { val rdb = RdbJdbc(jdbcConfig) rdb.executeDDL("DROP SCHEMA PUBLIC CASCADE;") + pramenDb.setupDatabase(rdb.connection) rdb.close() - - pramenDb.setupDatabase() } override def afterAll(): Unit = { diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperSuite.scala index 33f87c5fd..99a69cfe8 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperSuite.scala @@ -48,10 +48,9 @@ class BookkeeperSuite extends AnyWordSpec before { val rdb = RdbJdbc(jdbcConfig) rdb.executeDDL("DROP 
SCHEMA PUBLIC CASCADE;") + pramenDb.setupDatabase(rdb.connection) rdb.close() - pramenDb.setupDatabase() - if (db != null) { if (db.doesCollectionExists(collectionName)) { db.dropCollection(collectionName) diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/OffsetManagerJdbcSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/OffsetManagerJdbcSuite.scala index 3e31b37cd..9911fc4b9 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/OffsetManagerJdbcSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/OffsetManagerJdbcSuite.scala @@ -36,9 +36,8 @@ class OffsetManagerJdbcSuite extends AnyWordSpec with RelationalDbFixture with B before { val rdb = RdbJdbc(jdbcConfig) rdb.executeDDL("DROP SCHEMA PUBLIC CASCADE;") + pramenDb.setupDatabase(rdb.connection) rdb.close() - - pramenDb.setupDatabase() } override def afterAll(): Unit = { diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/journal/JournalJdbcSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/journal/JournalJdbcSuite.scala index 9ae6be38b..5625ecc8f 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/journal/JournalJdbcSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/journal/JournalJdbcSuite.scala @@ -33,9 +33,8 @@ class JournalJdbcSuite extends AnyWordSpec with SparkTestBase with BeforeAndAfte before { val rdb = RdbJdbc(jdbcConfig) rdb.executeDDL("DROP SCHEMA PUBLIC CASCADE;") + pramenDb.setupDatabase(rdb.connection) rdb.close() - - pramenDb.setupDatabase() } override def afterAll(): Unit = { diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/lock/TokenLockJdbcSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/lock/TokenLockJdbcSuite.scala index 2b7fa8cb5..cf639d1da 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/lock/TokenLockJdbcSuite.scala +++ 
b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/lock/TokenLockJdbcSuite.scala @@ -34,9 +34,8 @@ class TokenLockJdbcSuite extends AnyWordSpec with RelationalDbFixture with Befor before { val rdb = RdbJdbc(jdbcConfig) rdb.executeDDL("DROP SCHEMA PUBLIC CASCADE;") + pramenDb.setupDatabase(rdb.connection) rdb.close() - - pramenDb.setupDatabase() } override def afterAll(): Unit = { From f72618f4bc54d770c644c4ba05bfd9899ae76337 Mon Sep 17 00:00:00 2001 From: Ruslan Iushchenko Date: Mon, 26 Jan 2026 17:35:16 +0100 Subject: [PATCH 4/6] #699 Fix more issues with Pramen DB creation logic. --- .../za/co/absa/pramen/core/rdb/PramenDb.scala | 18 ++++++++++++------ .../za/co/absa/pramen/core/rdb/RdbJdbc.scala | 7 ++++--- .../core/reader/JdbcUrlSelectorImpl.scala | 6 +++--- .../pramen/core/utils/AlgorithmUtils.scala | 2 +- .../IncrementalPipelineJdbcLongSuite.scala | 12 ++++++------ .../IncrementalPipelineLongFixture.scala | 12 ++++++------ .../metadata/MetadataManagerJdbcSuite.scala | 11 ++++++----- .../tests/bookkeeper/BookkeeperJdbcSuite.scala | 11 ++++++----- .../tests/bookkeeper/BookkeeperSuite.scala | 11 ++++++----- .../bookkeeper/OffsetManagerJdbcSuite.scala | 11 ++++++----- .../core/tests/journal/JournalJdbcSuite.scala | 11 ++++++----- .../core/tests/lock/TokenLockJdbcSuite.scala | 11 ++++++----- .../core/tests/utils/UsingUtilsSuite.scala | 16 ++++++++++++++++ 13 files changed, 84 insertions(+), 55 deletions(-) diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/rdb/PramenDb.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/rdb/PramenDb.scala index 87157f673..10ff9fab5 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/rdb/PramenDb.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/rdb/PramenDb.scala @@ -43,7 +43,7 @@ class PramenDb(val jdbcConfig: JdbcConfig, private val log = LoggerFactory.getLogger(this.getClass) - def setupDatabase(jdbcConnection: Connection): Unit = { + private def setupDatabase(jdbcConnection: 
Connection): Unit = { // Explicitly set auto-commit to true, overriding any user JDBC settings or PostgreSQL defaults Try(jdbcConnection.setAutoCommit(true)).recover { case NonFatal(e) => log.warn(s"Unable to set autoCommit=true for the bookkeeping database that uses the driver: ${jdbcConfig.driver}.") @@ -58,7 +58,7 @@ class PramenDb(val jdbcConfig: JdbcConfig, } } - def initDatabase(dbVersion: Int): Unit = { + private def initDatabase(dbVersion: Int): Unit = { log.warn(s"Initializing new database at $activeUrl") if (dbVersion < 1) { initTable(LockTickets.lockTickets.schema) @@ -103,7 +103,7 @@ class PramenDb(val jdbcConfig: JdbcConfig, } } - def initTable(schema: profile.SchemaDescription): Unit = { + private def initTable(schema: profile.SchemaDescription): Unit = { try { db.run(DBIO.seq( schema.createIfNotExists @@ -115,7 +115,7 @@ class PramenDb(val jdbcConfig: JdbcConfig, } } - def addColumn(table: String, columnName: String, columnType: String): Unit = { + private def addColumn(table: String, columnName: String, columnType: String): Unit = { try { val quotedTable = s""""$table"""" val quotedColumnName = s""""$columnName"""" @@ -130,7 +130,13 @@ class PramenDb(val jdbcConfig: JdbcConfig, override def close(): Unit = { - slickDb.close() + try { + slickDb.close() + } catch { + case NonFatal(ex) => + log.warn("Error closing the Pramen RDB database connection.", ex) + } + } } @@ -153,7 +159,7 @@ object PramenDb { pramenDb.setupDatabase(conn) } - pramenDb + pramenDb } def getProfile(driver: String): JdbcProfile = { diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/rdb/RdbJdbc.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/rdb/RdbJdbc.scala index 82d7fc786..6d0aa55e9 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/rdb/RdbJdbc.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/rdb/RdbJdbc.scala @@ -21,6 +21,7 @@ import za.co.absa.pramen.core.rdb.PramenDb.DEFAULT_RETRIES import 
za.co.absa.pramen.core.rdb.RdbJdbc.dbVersionTableName import za.co.absa.pramen.core.reader.JdbcUrlSelector import za.co.absa.pramen.core.reader.model.JdbcConfig +import za.co.absa.pramen.core.utils.UsingUtils import java.sql.{Connection, SQLException} import scala.util.control.NonFatal @@ -60,9 +61,9 @@ class RdbJdbc(val connection: Connection) extends AutoCloseable with Rdb{ } override def executeDDL(ddl: String): Unit = { - val statement = connection.createStatement() - statement.execute(ddl) - statement.close() + UsingUtils.using(connection.createStatement()) { statement => + statement.execute(ddl) + } } private def getDbVersion(): Int = { diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/reader/JdbcUrlSelectorImpl.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/reader/JdbcUrlSelectorImpl.scala index f80b2e073..b4b6445c0 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/reader/JdbcUrlSelectorImpl.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/reader/JdbcUrlSelectorImpl.scala @@ -26,10 +26,10 @@ import java.util.Properties import scala.util.{Failure, Random, Success, Try} class JdbcUrlSelectorImpl(val jdbcConfig: JdbcConfig) extends JdbcUrlSelector{ - val BACKOFF_MIN_S = 1 - val BACKOFF_MAX_S = 10 - private val log = LoggerFactory.getLogger(this.getClass) + + private val BACKOFF_MIN_S = 1 + private val BACKOFF_MAX_S = 10 private val allUrls = (jdbcConfig.primaryUrl ++ jdbcConfig.fallbackUrls).toSeq private val numberOfUrls = allUrls.size private var urlPool = allUrls diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/AlgorithmUtils.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/AlgorithmUtils.scala index 45984d8ca..984fe8d47 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/AlgorithmUtils.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/AlgorithmUtils.scala @@ -92,7 +92,7 @@ object AlgorithmUtils { if (backoffMaxMs > backoffMinMs && 
backoffMinMs > 0) { val backoffMs = Random.nextInt(backoffMaxMs - backoffMinMs) + backoffMinMs val backoffS = backoffMs / 1000 - log.warn(s"Attempt failed: ${getErrorMessage(ex)}. Attempts left: $attemptsLeft. Retrying in $backoffS seconnds...") + log.warn(s"Attempt failed: ${getErrorMessage(ex)}. Attempts left: $attemptsLeft. Retrying in $backoffS seconds...") Thread.sleep(backoffMs) } else { log.warn(s"Attempt failed: ${getErrorMessage(ex)}. Attempts left: $attemptsLeft. Retrying...") diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/integration/IncrementalPipelineJdbcLongSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/integration/IncrementalPipelineJdbcLongSuite.scala index 4ca10570e..0dffe0321 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/integration/IncrementalPipelineJdbcLongSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/integration/IncrementalPipelineJdbcLongSuite.scala @@ -28,7 +28,7 @@ import za.co.absa.pramen.core.reader.JdbcUrlSelectorImpl import za.co.absa.pramen.core.reader.model.JdbcConfig import za.co.absa.pramen.core.runner.AppRunner import za.co.absa.pramen.core.samples.RdbExampleTable -import za.co.absa.pramen.core.utils.{JdbcNativeUtils, ResourceUtils} +import za.co.absa.pramen.core.utils.{JdbcNativeUtils, ResourceUtils, UsingUtils} import java.sql.Date import java.time.LocalDate @@ -42,7 +42,7 @@ class IncrementalPipelineJdbcLongSuite extends AnyWordSpec with TextComparisonFixture { val jdbcConfig: JdbcConfig = JdbcConfig(driver, Some(url), Nil, None, Some(user), Some(password)) - lazy val pramenDb: PramenDb = PramenDb(jdbcConfig) + var pramenDb: PramenDb = _ private val infoDate = LocalDate.of(2021, 2, 18) @@ -50,10 +50,10 @@ class IncrementalPipelineJdbcLongSuite extends AnyWordSpec private val INFO_DATE_COLUMN = "pramen_info_date" before { - val rdb = RdbJdbc(jdbcConfig) - rdb.executeDDL("DROP SCHEMA PUBLIC CASCADE;") - pramenDb.setupDatabase(rdb.connection) - rdb.close() + 
UsingUtils.using(RdbJdbc(jdbcConfig)) { rdb => + rdb.executeDDL("DROP SCHEMA PUBLIC CASCADE;") + } + pramenDb = PramenDb(jdbcConfig) RdbExampleTable.IncrementalTable.initTable(getConnection) } diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/integration/IncrementalPipelineLongFixture.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/integration/IncrementalPipelineLongFixture.scala index 182ef79d9..f54b0923d 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/integration/IncrementalPipelineLongFixture.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/integration/IncrementalPipelineLongFixture.scala @@ -32,7 +32,7 @@ import za.co.absa.pramen.core.rdb.{PramenDb, RdbJdbc} import za.co.absa.pramen.core.reader.JdbcUrlSelectorImpl import za.co.absa.pramen.core.reader.model.JdbcConfig import za.co.absa.pramen.core.runner.AppRunner -import za.co.absa.pramen.core.utils.{FsUtils, JdbcNativeUtils, ResourceUtils} +import za.co.absa.pramen.core.utils.{FsUtils, JdbcNativeUtils, ResourceUtils, UsingUtils} import java.sql.Date import java.time.LocalDate @@ -46,13 +46,13 @@ class IncrementalPipelineLongFixture extends AnyWordSpec with TextComparisonFixture { val jdbcConfig: JdbcConfig = JdbcConfig(driver, Some(url), Nil, None, Some(user), Some(password)) - lazy val pramenDb: PramenDb = PramenDb(jdbcConfig) + var pramenDb: PramenDb = _ before { - val rdb = RdbJdbc(jdbcConfig) - rdb.executeDDL("DROP SCHEMA PUBLIC CASCADE;") - pramenDb.setupDatabase(rdb.connection) - rdb.close() + UsingUtils.using(RdbJdbc(jdbcConfig)) { rdb => + rdb.executeDDL("DROP SCHEMA PUBLIC CASCADE;") + } + pramenDb = PramenDb(jdbcConfig) } override def afterAll(): Unit = { diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/metadata/MetadataManagerJdbcSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/metadata/MetadataManagerJdbcSuite.scala index 675a23caf..2e4ab257a 100644 --- 
a/pramen/core/src/test/scala/za/co/absa/pramen/core/metadata/MetadataManagerJdbcSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/metadata/MetadataManagerJdbcSuite.scala @@ -22,20 +22,21 @@ import za.co.absa.pramen.api.MetadataValue import za.co.absa.pramen.core.fixtures.RelationalDbFixture import za.co.absa.pramen.core.rdb.{PramenDb, RdbJdbc} import za.co.absa.pramen.core.reader.model.JdbcConfig +import za.co.absa.pramen.core.utils.UsingUtils import java.time.{LocalDate, ZoneOffset} class MetadataManagerJdbcSuite extends AnyWordSpec with RelationalDbFixture with BeforeAndAfter with BeforeAndAfterAll { val jdbcConfig: JdbcConfig = JdbcConfig(driver, Some(url), Nil, None, Some(user), Some(password)) - lazy val pramenDb: PramenDb = PramenDb(jdbcConfig) + var pramenDb: PramenDb = _ private val infoDate = LocalDate.of(2021, 2, 18) private val exampleInstant = infoDate.atStartOfDay().toInstant(ZoneOffset.UTC) before { - val rdb = RdbJdbc(jdbcConfig) - rdb.executeDDL("DROP SCHEMA PUBLIC CASCADE;") - pramenDb.setupDatabase(rdb.connection) - rdb.close() + UsingUtils.using(RdbJdbc(jdbcConfig)) { rdb => + rdb.executeDDL("DROP SCHEMA PUBLIC CASCADE;") + } + pramenDb = PramenDb(jdbcConfig) } override def afterAll(): Unit = { diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperJdbcSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperJdbcSuite.scala index e13d59581..eecfb4dfc 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperJdbcSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperJdbcSuite.scala @@ -21,17 +21,18 @@ import za.co.absa.pramen.core.bookkeeper.{Bookkeeper, BookkeeperJdbc} import za.co.absa.pramen.core.fixtures.RelationalDbFixture import za.co.absa.pramen.core.rdb.{PramenDb, RdbJdbc} import za.co.absa.pramen.core.reader.model.JdbcConfig +import za.co.absa.pramen.core.utils.UsingUtils class 
BookkeeperJdbcSuite extends BookkeeperCommonSuite with RelationalDbFixture with BeforeAndAfter with BeforeAndAfterAll { val jdbcConfig: JdbcConfig = JdbcConfig(driver, Some(url), Nil, None, Some(user), Some(password)) - lazy val pramenDb: PramenDb = PramenDb(jdbcConfig) + var pramenDb: PramenDb = _ before { - val rdb = RdbJdbc(jdbcConfig) - rdb.executeDDL("DROP SCHEMA PUBLIC CASCADE;") - pramenDb.setupDatabase(rdb.connection) - rdb.close() + UsingUtils.using(RdbJdbc(jdbcConfig)) { rdb => + rdb.executeDDL("DROP SCHEMA PUBLIC CASCADE;") + } + pramenDb = PramenDb(jdbcConfig) } override def afterAll(): Unit = { diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperSuite.scala index 99a69cfe8..9d421c667 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperSuite.scala @@ -28,6 +28,7 @@ import za.co.absa.pramen.core.lock.{TokenLockFactoryAllow, TokenLockFactoryHadoo import za.co.absa.pramen.core.metadata.{MetadataManagerJdbc, MetadataManagerNull} import za.co.absa.pramen.core.rdb.{PramenDb, RdbJdbc} import za.co.absa.pramen.core.reader.model.JdbcConfig +import za.co.absa.pramen.core.utils.UsingUtils import za.co.absa.pramen.core.{BookkeepingConfigFactory, RuntimeConfigFactory} import java.nio.file.Paths @@ -43,13 +44,13 @@ class BookkeeperSuite extends AnyWordSpec import za.co.absa.pramen.core.bookkeeper.BookkeeperMongoDb._ val jdbcConfig: JdbcConfig = JdbcConfig(driver, Some(url), Nil, None, Option(user), Option(password)) - lazy val pramenDb: PramenDb = PramenDb(jdbcConfig) + var pramenDb: PramenDb = _ before { - val rdb = RdbJdbc(jdbcConfig) - rdb.executeDDL("DROP SCHEMA PUBLIC CASCADE;") - pramenDb.setupDatabase(rdb.connection) - rdb.close() + UsingUtils.using(RdbJdbc(jdbcConfig)) { rdb => + rdb.executeDDL("DROP 
SCHEMA PUBLIC CASCADE;") + } + pramenDb = PramenDb(jdbcConfig) if (db != null) { if (db.doesCollectionExists(collectionName)) { diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/OffsetManagerJdbcSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/OffsetManagerJdbcSuite.scala index 9911fc4b9..f542d1a6e 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/OffsetManagerJdbcSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/OffsetManagerJdbcSuite.scala @@ -24,20 +24,21 @@ import za.co.absa.pramen.core.bookkeeper.{OffsetManager, OffsetManagerCached, Of import za.co.absa.pramen.core.fixtures.RelationalDbFixture import za.co.absa.pramen.core.rdb.{PramenDb, RdbJdbc} import za.co.absa.pramen.core.reader.model.JdbcConfig +import za.co.absa.pramen.core.utils.UsingUtils import java.time.{Instant, LocalDate} class OffsetManagerJdbcSuite extends AnyWordSpec with RelationalDbFixture with BeforeAndAfter with BeforeAndAfterAll { val jdbcConfig: JdbcConfig = JdbcConfig(driver, Some(url), Nil, None, Some(user), Some(password)) - lazy val pramenDb: PramenDb = PramenDb(jdbcConfig) + var pramenDb: PramenDb = _ private val infoDate = LocalDate.of(2023, 8, 25) before { - val rdb = RdbJdbc(jdbcConfig) - rdb.executeDDL("DROP SCHEMA PUBLIC CASCADE;") - pramenDb.setupDatabase(rdb.connection) - rdb.close() + UsingUtils.using(RdbJdbc(jdbcConfig)) { rdb => + rdb.executeDDL("DROP SCHEMA PUBLIC CASCADE;") + } + pramenDb = PramenDb(jdbcConfig) } override def afterAll(): Unit = { diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/journal/JournalJdbcSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/journal/JournalJdbcSuite.scala index 5625ecc8f..453c354b0 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/journal/JournalJdbcSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/journal/JournalJdbcSuite.scala 
@@ -23,18 +23,19 @@ import za.co.absa.pramen.core.fixtures.RelationalDbFixture import za.co.absa.pramen.core.journal.{Journal, JournalJdbc} import za.co.absa.pramen.core.rdb.{PramenDb, RdbJdbc} import za.co.absa.pramen.core.reader.model.JdbcConfig +import za.co.absa.pramen.core.utils.UsingUtils class JournalJdbcSuite extends AnyWordSpec with SparkTestBase with BeforeAndAfter with BeforeAndAfterAll with RelationalDbFixture { import TestCases._ val jdbcConfig: JdbcConfig = JdbcConfig(driver, Some(url), Nil, None, Option(user), Option(password)) - lazy val pramenDb: PramenDb = PramenDb(jdbcConfig) + var pramenDb: PramenDb = _ before { - val rdb = RdbJdbc(jdbcConfig) - rdb.executeDDL("DROP SCHEMA PUBLIC CASCADE;") - pramenDb.setupDatabase(rdb.connection) - rdb.close() + UsingUtils.using(RdbJdbc(jdbcConfig)) { rdb => + rdb.executeDDL("DROP SCHEMA PUBLIC CASCADE;") + } + pramenDb = PramenDb(jdbcConfig) } override def afterAll(): Unit = { diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/lock/TokenLockJdbcSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/lock/TokenLockJdbcSuite.scala index cf639d1da..597b5f61f 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/lock/TokenLockJdbcSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/lock/TokenLockJdbcSuite.scala @@ -24,18 +24,19 @@ import za.co.absa.pramen.core.fixtures.RelationalDbFixture import za.co.absa.pramen.core.lock.TokenLockJdbc import za.co.absa.pramen.core.rdb.{PramenDb, RdbJdbc} import za.co.absa.pramen.core.reader.model.JdbcConfig +import za.co.absa.pramen.core.utils.UsingUtils import scala.concurrent.duration._ class TokenLockJdbcSuite extends AnyWordSpec with RelationalDbFixture with BeforeAndAfter with BeforeAndAfterAll { val jdbcConfig: JdbcConfig = JdbcConfig(driver, Some(url), Nil, None, Option(user), Option(password)) - lazy val pramenDb: PramenDb = PramenDb(jdbcConfig) + var pramenDb: PramenDb = _ before { - val rdb = 
RdbJdbc(jdbcConfig) - rdb.executeDDL("DROP SCHEMA PUBLIC CASCADE;") - pramenDb.setupDatabase(rdb.connection) - rdb.close() + UsingUtils.using(RdbJdbc(jdbcConfig)) { rdb => + rdb.executeDDL("DROP SCHEMA PUBLIC CASCADE;") + } + pramenDb = PramenDb(jdbcConfig) } override def afterAll(): Unit = { diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/utils/UsingUtilsSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/utils/UsingUtilsSuite.scala index 9c76a631a..ce536bbca 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/utils/UsingUtilsSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/utils/UsingUtilsSuite.scala @@ -54,6 +54,22 @@ class UsingUtilsSuite extends AnyWordSpec { assert(resource.closeCallCount == 1) } + "throw an exception on null resource" in { + var exceptionThrown = false + + try { + UsingUtils.using(null: AutoCloseableSpy) { res => + res.dummyAction() + } + } catch { + case ex: Throwable => + assert(ex.getMessage.contains("Resource must not be null")) + exceptionThrown = true + } + + assert(exceptionThrown) + } + "handle exceptions when a resource is created" in { var exceptionThrown = false var resource: AutoCloseableSpy = null From ba0d1e3c99d3f2e38baba07eb8d1327b6cbcb398 Mon Sep 17 00:00:00 2001 From: Ruslan Iushchenko Date: Tue, 27 Jan 2026 07:53:40 +0100 Subject: [PATCH 5/6] #699 Fix a minor possible resource leak in tests and increase the backoff time a little. 
--- .../src/main/scala/za/co/absa/pramen/core/rdb/PramenDb.scala | 4 ++-- .../core/integration/IncrementalPipelineJdbcLongSuite.scala | 3 ++- .../core/integration/IncrementalPipelineLongFixture.scala | 3 ++- .../absa/pramen/core/metadata/MetadataManagerJdbcSuite.scala | 3 ++- .../pramen/core/tests/bookkeeper/BookkeeperJdbcSuite.scala | 3 ++- .../absa/pramen/core/tests/bookkeeper/BookkeeperSuite.scala | 5 +++++ .../core/tests/bookkeeper/OffsetManagerJdbcSuite.scala | 3 ++- .../co/absa/pramen/core/tests/journal/JournalJdbcSuite.scala | 3 ++- .../co/absa/pramen/core/tests/lock/TokenLockJdbcSuite.scala | 3 ++- 9 files changed, 21 insertions(+), 9 deletions(-) diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/rdb/PramenDb.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/rdb/PramenDb.scala index 10ff9fab5..b6e3c9945 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/rdb/PramenDb.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/rdb/PramenDb.scala @@ -145,8 +145,8 @@ object PramenDb { val MODEL_VERSION = 9 val DEFAULT_RETRIES = 3 - val BACKOFF_MIN_MS = 1000 - val BACKOFF_MAX_MS = 20000 + val BACKOFF_MIN_MS = 10000 + val BACKOFF_MAX_MS = 60000 def apply(jdbcConfig: JdbcConfig): PramenDb = { val (url, connection) = getConnection(jdbcConfig) diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/integration/IncrementalPipelineJdbcLongSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/integration/IncrementalPipelineJdbcLongSuite.scala index 0dffe0321..ad63b5cba 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/integration/IncrementalPipelineJdbcLongSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/integration/IncrementalPipelineJdbcLongSuite.scala @@ -50,6 +50,7 @@ class IncrementalPipelineJdbcLongSuite extends AnyWordSpec private val INFO_DATE_COLUMN = "pramen_info_date" before { + if (pramenDb != null) pramenDb.close() UsingUtils.using(RdbJdbc(jdbcConfig)) { rdb => 
rdb.executeDDL("DROP SCHEMA PUBLIC CASCADE;") } @@ -59,7 +60,7 @@ class IncrementalPipelineJdbcLongSuite extends AnyWordSpec } override def afterAll(): Unit = { - pramenDb.close() + if (pramenDb != null) pramenDb.close() super.afterAll() } diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/integration/IncrementalPipelineLongFixture.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/integration/IncrementalPipelineLongFixture.scala index f54b0923d..a28dc58a8 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/integration/IncrementalPipelineLongFixture.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/integration/IncrementalPipelineLongFixture.scala @@ -49,6 +49,7 @@ class IncrementalPipelineLongFixture extends AnyWordSpec var pramenDb: PramenDb = _ before { + if (pramenDb != null) pramenDb.close() UsingUtils.using(RdbJdbc(jdbcConfig)) { rdb => rdb.executeDDL("DROP SCHEMA PUBLIC CASCADE;") } @@ -56,7 +57,7 @@ class IncrementalPipelineLongFixture extends AnyWordSpec } override def afterAll(): Unit = { - pramenDb.close() + if (pramenDb != null) pramenDb.close() super.afterAll() } diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/metadata/MetadataManagerJdbcSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/metadata/MetadataManagerJdbcSuite.scala index 2e4ab257a..cae9312d8 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/metadata/MetadataManagerJdbcSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/metadata/MetadataManagerJdbcSuite.scala @@ -33,6 +33,7 @@ class MetadataManagerJdbcSuite extends AnyWordSpec with RelationalDbFixture with private val exampleInstant = infoDate.atStartOfDay().toInstant(ZoneOffset.UTC) before { + if (pramenDb != null) pramenDb.close() UsingUtils.using(RdbJdbc(jdbcConfig)) { rdb => rdb.executeDDL("DROP SCHEMA PUBLIC CASCADE;") } @@ -40,7 +41,7 @@ class MetadataManagerJdbcSuite extends AnyWordSpec with RelationalDbFixture with } override def 
afterAll(): Unit = { - pramenDb.close() + if (pramenDb != null) pramenDb.close() super.afterAll() } diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperJdbcSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperJdbcSuite.scala index eecfb4dfc..b0ae67c8c 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperJdbcSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperJdbcSuite.scala @@ -29,6 +29,7 @@ class BookkeeperJdbcSuite extends BookkeeperCommonSuite with RelationalDbFixture var pramenDb: PramenDb = _ before { + if (pramenDb != null) pramenDb.close() UsingUtils.using(RdbJdbc(jdbcConfig)) { rdb => rdb.executeDDL("DROP SCHEMA PUBLIC CASCADE;") } @@ -36,7 +37,7 @@ class BookkeeperJdbcSuite extends BookkeeperCommonSuite with RelationalDbFixture } override def afterAll(): Unit = { - pramenDb.close() + if (pramenDb != null) pramenDb.close() super.afterAll() } diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperSuite.scala index 9d421c667..e9c2fa318 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperSuite.scala @@ -47,6 +47,7 @@ class BookkeeperSuite extends AnyWordSpec var pramenDb: PramenDb = _ before { + if (pramenDb != null) pramenDb.close() UsingUtils.using(RdbJdbc(jdbcConfig)) { rdb => rdb.executeDDL("DROP SCHEMA PUBLIC CASCADE;") } @@ -62,6 +63,10 @@ class BookkeeperSuite extends AnyWordSpec } } + override def afterAll(): Unit = { + if (pramenDb != null) pramenDb.close() + super.afterAll() + } val runtimeConfig: RuntimeConfig = RuntimeConfigFactory.getDummyRuntimeConfig( useLocks = true diff --git 
a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/OffsetManagerJdbcSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/OffsetManagerJdbcSuite.scala index f542d1a6e..7dc0e0c88 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/OffsetManagerJdbcSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/OffsetManagerJdbcSuite.scala @@ -35,6 +35,7 @@ class OffsetManagerJdbcSuite extends AnyWordSpec with RelationalDbFixture with B private val infoDate = LocalDate.of(2023, 8, 25) before { + if (pramenDb != null) pramenDb.close() UsingUtils.using(RdbJdbc(jdbcConfig)) { rdb => rdb.executeDDL("DROP SCHEMA PUBLIC CASCADE;") } @@ -42,7 +43,7 @@ class OffsetManagerJdbcSuite extends AnyWordSpec with RelationalDbFixture with B } override def afterAll(): Unit = { - pramenDb.close() + if (pramenDb != null) pramenDb.close() super.afterAll() } diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/journal/JournalJdbcSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/journal/JournalJdbcSuite.scala index 453c354b0..a74510c51 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/journal/JournalJdbcSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/journal/JournalJdbcSuite.scala @@ -32,6 +32,7 @@ class JournalJdbcSuite extends AnyWordSpec with SparkTestBase with BeforeAndAfte var pramenDb: PramenDb = _ before { + if (pramenDb != null) pramenDb.close() UsingUtils.using(RdbJdbc(jdbcConfig)) { rdb => rdb.executeDDL("DROP SCHEMA PUBLIC CASCADE;") } @@ -39,7 +40,7 @@ class JournalJdbcSuite extends AnyWordSpec with SparkTestBase with BeforeAndAfte } override def afterAll(): Unit = { - pramenDb.close() + if (pramenDb != null) pramenDb.close() super.afterAll() } diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/lock/TokenLockJdbcSuite.scala 
b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/lock/TokenLockJdbcSuite.scala index 597b5f61f..a23903e9f 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/lock/TokenLockJdbcSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/lock/TokenLockJdbcSuite.scala @@ -33,6 +33,7 @@ class TokenLockJdbcSuite extends AnyWordSpec with RelationalDbFixture with Befor var pramenDb: PramenDb = _ before { + if (pramenDb != null) pramenDb.close() UsingUtils.using(RdbJdbc(jdbcConfig)) { rdb => rdb.executeDDL("DROP SCHEMA PUBLIC CASCADE;") } @@ -40,7 +41,7 @@ class TokenLockJdbcSuite extends AnyWordSpec with RelationalDbFixture with Befor } override def afterAll(): Unit = { - pramenDb.close() + if (pramenDb != null) pramenDb.close() super.afterAll() } From ed186cccc845385269e8c1c35fe09858f4cc98e8 Mon Sep 17 00:00:00 2001 From: Ruslan Iushchenko Date: Tue, 27 Jan 2026 11:23:38 +0100 Subject: [PATCH 6/6] #699 Fix last minute PR suggestions. --- .../main/scala/za/co/absa/pramen/core/rdb/PramenDb.scala | 9 +++------ .../pramen/core/metadata/MetadataManagerJdbcSuite.scala | 2 +- 2 files changed, 4 insertions(+), 7 deletions(-) diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/rdb/PramenDb.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/rdb/PramenDb.scala index b6e3c9945..5f6a2857e 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/rdb/PramenDb.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/rdb/PramenDb.scala @@ -151,15 +151,12 @@ object PramenDb { def apply(jdbcConfig: JdbcConfig): PramenDb = { val (url, connection) = getConnection(jdbcConfig) - val (database, profile) = openDb(jdbcConfig, url) - - val pramenDb = new PramenDb(jdbcConfig, url, database, profile) - UsingUtils.using(connection) { conn => + val (database, profile) = openDb(jdbcConfig, url) + val pramenDb = new PramenDb(jdbcConfig, url, database, profile) pramenDb.setupDatabase(conn) + pramenDb } - - pramenDb } def 
getProfile(driver: String): JdbcProfile = { diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/metadata/MetadataManagerJdbcSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/metadata/MetadataManagerJdbcSuite.scala index cae9312d8..fb475bf2d 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/metadata/MetadataManagerJdbcSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/metadata/MetadataManagerJdbcSuite.scala @@ -168,7 +168,7 @@ class MetadataManagerJdbcSuite extends AnyWordSpec with RelationalDbFixture with assert(ex.getMessage.contains("Unable to delete from the metadata table.")) } - "throw an exception on connection errors when deleting metadata from a partision" in { + "throw an exception on connection errors when deleting metadata from a partition" in { val metadata = new MetadataManagerJdbc(null) val ex = intercept[RuntimeException] {