Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@ object Bookkeeper {
val dbOpt = if (hasBookkeepingJdbc) {
val jdbcConfig = bookkeepingConfig.bookkeepingJdbcConfig.get
val syncDb = PramenDb(jdbcConfig)
syncDb.setupDatabase()
Option(syncDb)
} else None

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ class BookkeeperJdbc(db: Database, profile: JdbcProfile, batchId: Long) extends
val record = BookkeepingRecord(table, dateStr, dateStr, dateStr, inputRecordCount, outputRecordCount, recordsAppended, jobStarted, jobFinished, Option(batchId))

try {
SlickUtils.ensureDbConnected(db)
db.run(
BookkeepingRecords.records += record
).execute()
Expand Down Expand Up @@ -230,6 +231,7 @@ class BookkeeperJdbc(db: Database, profile: JdbcProfile, batchId: Long) extends
val infoDateStr = infoDate.toString

try {
SlickUtils.ensureDbConnected(db)
db.run(
SchemaRecords.records.filter(t => t.pramenTableName === table && t.infoDate === infoDateStr).delete
).execute()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ class OffsetManagerJdbc(db: Database, batchId: Long) extends OffsetManager {

val record = OffsetRecord(table, infoDate.toString, offsetType.dataTypeString, "", "", batchId, createdAt.toEpochMilli, None)

SlickUtils.ensureDbConnected(db)
db.run(
OffsetRecords.records += record
).execute()
Expand All @@ -83,6 +84,7 @@ class OffsetManagerJdbc(db: Database, batchId: Long) extends OffsetManager {
override def commitOffsets(request: DataOffsetRequest, minOffset: OffsetValue, maxOffset: OffsetValue): Unit = {
val committedAt = Instant.now().toEpochMilli

SlickUtils.ensureDbConnected(db)
db.run(
OffsetRecords.records
.filter(r => r.pramenTableName === request.tableName && r.infoDate === request.infoDate.toString && r.createdAt === request.createdAt.toEpochMilli)
Expand All @@ -98,6 +100,7 @@ class OffsetManagerJdbc(db: Database, batchId: Long) extends OffsetManager {

val committedAt = Instant.now().toEpochMilli

SlickUtils.ensureDbConnected(db)
db.run(
OffsetRecords.records
.filter(r => r.pramenTableName === request.tableName && r.infoDate === request.infoDate.toString && r.createdAt === request.createdAt.toEpochMilli)
Expand All @@ -121,6 +124,7 @@ class OffsetManagerJdbc(db: Database, batchId: Long) extends OffsetManager {
OffsetRecord(req.table, req.infoDate.toString, req.minOffset.dataType.dataTypeString, req.minOffset.valueString, req.maxOffset.valueString, batchId, req.createdAt.toEpochMilli, Some(committedAtMilli))
}

SlickUtils.ensureDbConnected(db)
db.run(
OffsetRecords.records ++= records
).execute()
Expand All @@ -137,6 +141,7 @@ class OffsetManagerJdbc(db: Database, batchId: Long) extends OffsetManager {
}

override def rollbackOffsets(request: DataOffsetRequest): Unit = {
SlickUtils.ensureDbConnected(db)
db.run(
OffsetRecords.records
.filter(r => r.pramenTableName === request.tableName && r.infoDate === request.infoDate.toString && r.createdAt === request.createdAt.toEpochMilli)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ class JournalJdbc(db: Database) extends Journal {
Option(entry.batchId))

try {
SlickUtils.ensureDbConnected(db)
db.run(
JournalTasks.journalTasks += journalTask
).execute()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ class TokenLockJdbc(token: String, db: Database) extends TokenLockBase(token) {

/** Invoked from a synchronized block. */
override def tryAcquireGuardLock(retries: Int = 3, thisTry: Int = 0): Boolean = {
SlickUtils.ensureDbConnected(db)

def tryAcquireExistingTicket(): Boolean = {
val ticket = getTicket

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ class MetadataManagerJdbc(db: Database) extends MetadataManagerBase(true) {
val record = MetadataRecord(tableName, infoDate.toString, key, metadata.value, metadata.lastUpdated.getEpochSecond)

try {
SlickUtils.ensureDbConnected(db)
db.run(
MetadataRecords.records
.filter(r => r.pramenTableName === tableName && r.infoDate === infoDate.toString && r.key === key)
Expand All @@ -76,6 +77,7 @@ class MetadataManagerJdbc(db: Database) extends MetadataManagerBase(true) {
.filter(r => r.pramenTableName === tableName && r.infoDate === infoDate.toString && r.key === key)

try {
SlickUtils.ensureDbConnected(db)
db.run(query.delete).execute()
} catch {
case NonFatal(ex) => throw new RuntimeException(s"Unable to delete from the metadata table.", ex)
Expand All @@ -87,6 +89,7 @@ class MetadataManagerJdbc(db: Database) extends MetadataManagerBase(true) {
.filter(r => r.pramenTableName === tableName && r.infoDate === infoDate.toString)

try {
SlickUtils.ensureDbConnected(db)
db.run(query.delete).execute()
} catch {
case NonFatal(ex) => throw new RuntimeException(s"Unable to delete from the metadata table.", ex)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,22 @@ package za.co.absa.pramen.core.rdb

import org.slf4j.LoggerFactory
import slick.jdbc.JdbcBackend.Database
import slick.jdbc.JdbcProfile
import slick.jdbc.{JdbcBackend, JdbcProfile}
import slick.util.AsyncExecutor
import za.co.absa.pramen.core.bookkeeper.model.{BookkeepingRecords, MetadataRecords, OffsetRecords, SchemaRecords}
import za.co.absa.pramen.core.journal.model.JournalTasks
import za.co.absa.pramen.core.lock.model.LockTickets
import za.co.absa.pramen.core.rdb.PramenDb.MODEL_VERSION
import za.co.absa.pramen.core.reader.JdbcUrlSelector
import za.co.absa.pramen.core.reader.model.JdbcConfig
import za.co.absa.pramen.core.utils.{AlgorithmUtils, UsingUtils}

import java.sql.Connection
import scala.util.Try
import scala.util.control.NonFatal

class PramenDb(val jdbcConfig: JdbcConfig,
val activeUrl: String,
val jdbcConnection: Connection,
val slickDb: Database,
val profile: JdbcProfile) extends AutoCloseable {
def db: Database = slickDb
Expand All @@ -43,22 +43,22 @@ class PramenDb(val jdbcConfig: JdbcConfig,

private val log = LoggerFactory.getLogger(this.getClass)

val rdb: Rdb = new RdbJdbc(jdbcConnection)

def setupDatabase(): Unit = {
private def setupDatabase(jdbcConnection: Connection): Unit = {
// Explicitly set auto-commit to true, overriding any user JDBC settings or PostgreSQL defaults
Try(jdbcConnection.setAutoCommit(true)).recover {
case NonFatal(e) => log.warn(s"Unable to set autoCommit=true for the bookkeeping database that uses the driver: ${jdbcConfig.driver}.")
}

val dbVersion = rdb.getVersion()
if (dbVersion < MODEL_VERSION) {
initDatabase(dbVersion)
rdb.setVersion(MODEL_VERSION)
UsingUtils.using(new RdbJdbc(jdbcConnection)) { rdb =>
val dbVersion = rdb.getVersion()
if (dbVersion < MODEL_VERSION) {
initDatabase(dbVersion)
rdb.setVersion(MODEL_VERSION)
}
}
}

def initDatabase(dbVersion: Int): Unit = {
private def initDatabase(dbVersion: Int): Unit = {
log.warn(s"Initializing new database at $activeUrl")
if (dbVersion < 1) {
initTable(LockTickets.lockTickets.schema)
Expand Down Expand Up @@ -103,7 +103,7 @@ class PramenDb(val jdbcConfig: JdbcConfig,
}
}

def initTable(schema: profile.SchemaDescription): Unit = {
private def initTable(schema: profile.SchemaDescription): Unit = {
try {
db.run(DBIO.seq(
schema.createIfNotExists
Expand All @@ -115,7 +115,7 @@ class PramenDb(val jdbcConfig: JdbcConfig,
}
}

def addColumn(table: String, columnName: String, columnType: String): Unit = {
private def addColumn(table: String, columnName: String, columnType: String): Unit = {
try {
val quotedTable = s""""$table""""
val quotedColumnName = s""""$columnName""""
Expand All @@ -130,19 +130,33 @@ class PramenDb(val jdbcConfig: JdbcConfig,


override def close(): Unit = {
jdbcConnection.close()
slickDb.close()
try {
slickDb.close()
} catch {
case NonFatal(ex) =>
log.warn("Error closing the Pramen RDB database connection.", ex)
}

}
}

object PramenDb {
private val log = LoggerFactory.getLogger(this.getClass)

val MODEL_VERSION = 9
val DEFAULT_RETRIES = 3
val BACKOFF_MIN_MS = 10000
val BACKOFF_MAX_MS = 60000

def apply(jdbcConfig: JdbcConfig): PramenDb = {
val (url, conn, database, profile) = openDb(jdbcConfig)
val (url, connection) = getConnection(jdbcConfig)

new PramenDb(jdbcConfig, url, conn, database, profile)
UsingUtils.using(connection) { conn =>
val (database, profile) = openDb(jdbcConfig, url)
val pramenDb = new PramenDb(jdbcConfig, url, database, profile)
pramenDb.setupDatabase(conn)
pramenDb
}
}

def getProfile(driver: String): JdbcProfile = {
Expand All @@ -159,20 +173,30 @@ object PramenDb {
}
}

private def openDb(jdbcConfig: JdbcConfig): (String, Connection, Database, JdbcProfile) = {
def getConnection(jdbcConfig: JdbcConfig): (String, Connection) = {
val numberOfAttempts = jdbcConfig.retries.getOrElse(DEFAULT_RETRIES)
val selector = JdbcUrlSelector(jdbcConfig)
val (conn, url) = selector.getWorkingConnection(numberOfAttempts)

(url, conn)
}

def openDb(jdbcConfig: JdbcConfig, workingUrl: String): (Database, JdbcProfile) = {
val numberOfAttempts = jdbcConfig.retries.getOrElse(DEFAULT_RETRIES)
val selector = JdbcUrlSelector(jdbcConfig)
val (conn, url) = selector.getWorkingConnection(DEFAULT_RETRIES)
val prop = selector.getProperties

val slickProfile = getProfile(jdbcConfig.driver)

val database = jdbcConfig.user match {
case Some(user) => Database.forURL(url = url, driver = jdbcConfig.driver, user = user, password = jdbcConfig.password.getOrElse(""), prop = prop, executor = AsyncExecutor("Rdb", 2, 10))
case None => Database.forURL(url = url, driver = jdbcConfig.driver, prop = prop, executor = AsyncExecutor("Rdb", 2, 10))
var database: JdbcBackend.DatabaseDef = null
AlgorithmUtils.actionWithRetry(numberOfAttempts, log, BACKOFF_MIN_MS, BACKOFF_MAX_MS) {
database = jdbcConfig.user match {
case Some(user) => Database.forURL(url = workingUrl, driver = jdbcConfig.driver, user = user, password = jdbcConfig.password.getOrElse(""), prop = prop, executor = AsyncExecutor("Rdb", 2, 10))
case None => Database.forURL(url = workingUrl, driver = jdbcConfig.driver, prop = prop, executor = AsyncExecutor("Rdb", 2, 10))
}
}

(url, conn, database, slickProfile)
(database, slickProfile)
}

}

Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,16 @@
package za.co.absa.pramen.core.rdb

import org.slf4j.LoggerFactory
import za.co.absa.pramen.core.rdb.PramenDb.DEFAULT_RETRIES
import za.co.absa.pramen.core.rdb.RdbJdbc.dbVersionTableName
import za.co.absa.pramen.core.reader.JdbcUrlSelector
import za.co.absa.pramen.core.reader.model.JdbcConfig
import za.co.absa.pramen.core.utils.UsingUtils

import java.sql.{Connection, SQLException}
import scala.util.control.NonFatal

object RdbJdbc {
val dbVersionTableName = "db_version"
}

class RdbJdbc(connection: Connection) extends Rdb{
class RdbJdbc(val connection: Connection) extends AutoCloseable with Rdb{
private val log = LoggerFactory.getLogger(this.getClass)

override def getVersion(): Int = {
Expand Down Expand Up @@ -61,9 +61,9 @@ class RdbJdbc(connection: Connection) extends Rdb{
}

override def executeDDL(ddl: String): Unit = {
val statement = connection.createStatement()
statement.execute(ddl)
statement.close()
UsingUtils.using(connection.createStatement()) { statement =>
statement.execute(ddl)
}
}

private def getDbVersion(): Int = {
Expand All @@ -80,4 +80,17 @@ class RdbJdbc(connection: Connection) extends Rdb{
dbVersion
}

override def close(): Unit = if (!connection.isClosed) connection.close()
}

object RdbJdbc {
val dbVersionTableName = "db_version"

def apply(jdbcConfig: JdbcConfig): RdbJdbc = {
val numberOfAttempts = jdbcConfig.retries.getOrElse(DEFAULT_RETRIES)
val selector = JdbcUrlSelector(jdbcConfig)
val (conn, _) = selector.getWorkingConnection(numberOfAttempts)

new RdbJdbc(conn)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ import scala.util.{Failure, Random, Success, Try}

class JdbcUrlSelectorImpl(val jdbcConfig: JdbcConfig) extends JdbcUrlSelector{
private val log = LoggerFactory.getLogger(this.getClass)

private val BACKOFF_MIN_S = 1
private val BACKOFF_MAX_S = 10
private val allUrls = (jdbcConfig.primaryUrl ++ jdbcConfig.fallbackUrls).toSeq
private val numberOfUrls = allUrls.size
private var urlPool = allUrls
Expand Down Expand Up @@ -102,15 +105,16 @@ class JdbcUrlSelectorImpl(val jdbcConfig: JdbcConfig) extends JdbcUrlSelector{
@throws[SQLException]
def getWorkingConnection(retriesLeft: Int): (Connection, String) = {
val currentUrl = getUrl

Try {
JdbcNativeUtils.getJdbcConnection(jdbcConfig, currentUrl)
} match {
case Success(connection) => (connection, currentUrl)
case Failure(ex) =>
if (retriesLeft > 1) {
val newUrl = getNextUrl
log.error(s"JDBC connection error for $currentUrl. Retries left: ${retriesLeft - 1}. Retrying...", ex)
val backoffS = Random.nextInt(BACKOFF_MAX_S - BACKOFF_MIN_S) + BACKOFF_MIN_S
log.error(s"JDBC connection error for $currentUrl. Retries left: ${retriesLeft - 1}. Retrying... in $backoffS seconds", ex)
Thread.sleep(backoffS * 1000)
log.info(s"Trying URL: $newUrl")
getWorkingConnection(retriesLeft - 1)
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import org.slf4j.Logger
import java.time.{Duration, Instant}
import scala.annotation.tailrec
import scala.collection.mutable
import scala.util.Random

object AlgorithmUtils {
/** Finds which strings are encountered multiple times (case insensitive). */
Expand Down Expand Up @@ -70,7 +71,7 @@ object AlgorithmUtils {
}

@tailrec
final def actionWithRetry(attempts: Int, log: Logger)(action: => Unit): Unit = {
final def actionWithRetry(attempts: Int, log: Logger, backoffMinMs: Int = 0, backoffMaxMs: Int = 0)(action: => Unit): Unit = {
def getErrorMessage(ex: Throwable): String = {
if (ex.getCause == null) {
ex.getMessage
Expand All @@ -88,8 +89,16 @@ object AlgorithmUtils {
if (attemptsLeft < 1) {
throw ex
} else {
log.warn(s"Attempt failed: ${getErrorMessage(ex)}. Attempts left: $attemptsLeft. Retrying...")
actionWithRetry(attemptsLeft, log)(action)
if (backoffMaxMs > backoffMinMs && backoffMinMs > 0) {
val backoffMs = Random.nextInt(backoffMaxMs - backoffMinMs) + backoffMinMs
val backoffS = backoffMs / 1000
log.warn(s"Attempt failed: ${getErrorMessage(ex)}. Attempts left: $attemptsLeft. Retrying in $backoffS seconds...")
Thread.sleep(backoffMs)
} else {
log.warn(s"Attempt failed: ${getErrorMessage(ex)}. Attempts left: $attemptsLeft. Retrying...")
}

actionWithRetry(attemptsLeft, log, backoffMinMs, backoffMaxMs)(action)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,11 @@
package za.co.absa.pramen.core.utils

import java.util.concurrent.TimeUnit

import scala.concurrent.duration.Duration
import scala.concurrent.duration.{Duration, FiniteDuration}
import scala.concurrent.{Await, Future}

object FutureImplicits {
private val executionTimeout = Duration(300, TimeUnit.SECONDS)
val executionTimeout: FiniteDuration = Duration(300, TimeUnit.SECONDS)

implicit class FutureExecutor[T](future: Future[T]) {
def execute(): T = Await.result(future, executionTimeout)
Expand Down
Loading
Loading