From 2b59f0e7cf5ff7f0e3eba9ad13d304491bb17b3f Mon Sep 17 00:00:00 2001 From: Ruslan Iushchenko Date: Wed, 28 Jan 2026 15:11:07 +0100 Subject: [PATCH 1/2] #703 Update backfill configuration and improve error logging in AlgorithmUtils --- pramen/core/src/main/resources/reference.conf | 17 ++++++++--------- .../absa/pramen/core/utils/AlgorithmUtils.scala | 4 ++-- 2 files changed, 10 insertions(+), 11 deletions(-) diff --git a/pramen/core/src/main/resources/reference.conf b/pramen/core/src/main/resources/reference.conf index c0f9e6b2..a20aa8cc 100644 --- a/pramen/core/src/main/resources/reference.conf +++ b/pramen/core/src/main/resources/reference.conf @@ -68,15 +68,15 @@ pramen { wait.for.output.table.enabled = false wait.for.output.table.seconds = 600 - # How many days to check back for late data (when there is no data at the source for that day) - # 0 - never check previous days, even if no data at the source happened - # 1 - check only the current info date if you run the job more than once per day - # 2 - check the latest info date and the date before + # Backfill window for missing data when a source has no data for an info date. + # -1 - look back up to the previous successful info date with data + # 0 - do not look back at previous dates, even if the source is empty + # 1 - only recheck the current info date (useful if rerunning the job on the same day) + # 2 - recheck the current info date and the previous day # etc... - # You can also set this parameter for individual tables in the metastore. - # NOTE. By default it is the same as track.days, so it won't take effect. In order for it to take effect you - # need to either increase 'backfill.days' and/or decrease 'track.days' - backfill.days = 5 + # You can also set this parameter per table in the metastore. 
+ # NOTE: By default this parameter is set to -1 to be compatible with the behavior before this option was introduced. + backfill.days = -1 # How many days to check back for retrospective updates # 0 - never check for updates after the data is loaded @@ -289,4 +289,3 @@ hadoop.redacted.tokens = [ password, secret, session.token, access.key ] # java.security.auth.login.config = "" # java.security.krb5.conf = "" # javax.net.debug = "" - diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/AlgorithmUtils.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/AlgorithmUtils.scala index 984fe8d4..cd9c91f6 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/AlgorithmUtils.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/AlgorithmUtils.scala @@ -92,10 +92,10 @@ object AlgorithmUtils { if (backoffMaxMs > backoffMinMs && backoffMinMs > 0) { val backoffMs = Random.nextInt(backoffMaxMs - backoffMinMs) + backoffMinMs val backoffS = backoffMs / 1000 - log.warn(s"Attempt failed: ${getErrorMessage(ex)}. Attempts left: $attemptsLeft. Retrying in $backoffS seconds...") + log.error(s"Attempt failed: ${getErrorMessage(ex)}. Attempts left: $attemptsLeft. Retrying in $backoffS seconds...") Thread.sleep(backoffMs) } else { - log.warn(s"Attempt failed: ${getErrorMessage(ex)}. Attempts left: $attemptsLeft. Retrying...") + log.error(s"Attempt failed: ${getErrorMessage(ex)}. Attempts left: $attemptsLeft. Retrying...") } actionWithRetry(attemptsLeft, log, backoffMinMs, backoffMaxMs)(action) From 59dc8448a0eb8aad959881e840f08cfc11dfd548 Mon Sep 17 00:00:00 2001 From: Ruslan Iushchenko Date: Wed, 28 Jan 2026 15:20:10 +0100 Subject: [PATCH 2/2] #703 Do not log evaluation of info date expressions in places outside schedule strategy. This is redundant and confusing. 
--- .../pramen/core/runner/orchestrator/OrchestratorImpl.scala | 2 +- .../pramen/core/runner/splitter/ScheduleStrategyUtils.scala | 6 +++--- .../absa/pramen/core/app/config/InfoDateConfigSuite.scala | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/orchestrator/OrchestratorImpl.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/orchestrator/OrchestratorImpl.scala index 5ff843c5..0344d9e5 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/orchestrator/OrchestratorImpl.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/orchestrator/OrchestratorImpl.scala @@ -260,7 +260,7 @@ class OrchestratorImpl extends Orchestrator { val infoDates = jobs.flatMap { job => if (job.operation.schedule.isEnabled(runDate)) { val infoDateExpression = job.operation.outputInfoDateExpression - val infoDate = evaluateRunDate(runDate, infoDateExpression).toString + val infoDate = evaluateRunDate(runDate, infoDateExpression, logExpression = false).toString Option(infoDate) } else { None diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/splitter/ScheduleStrategyUtils.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/splitter/ScheduleStrategyUtils.scala index f899f92d..042881d7 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/splitter/ScheduleStrategyUtils.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/splitter/ScheduleStrategyUtils.scala @@ -51,7 +51,7 @@ object ScheduleStrategyUtils { bookkeeper: Bookkeeper ): List[TaskPreDef] = { if (schedule.isEnabled(runDate)) { - val infoDate = evaluateRunDate(runDate, infoDateExpression) + val infoDate = evaluateRunDate(runDate, infoDateExpression, logExpression = false) bookkeeper.getLatestDataChunk(outputTable, infoDate) match { case Some(_) => @@ -82,7 +82,7 @@ object ScheduleStrategyUtils { infoDateExpression: String ): Option[TaskPreDef] = { if 
(schedule.isEnabled(runDate)) { - val infoDate = evaluateRunDate(runDate, infoDateExpression) + val infoDate = evaluateRunDate(runDate, infoDateExpression, logExpression = false) log.info(s"For $outputTable $runDate is one of scheduled days. Adding infoDate = '$infoDateExpression' = $infoDate to check.") @@ -216,7 +216,7 @@ object ScheduleStrategyUtils { val end = dateTo.plusDays(1) while (date.isBefore(end)) { if (schedule.isEnabled(date)) { - val infoDate = evaluateRunDate(date, infoDateExpression) + val infoDate = evaluateRunDate(date, infoDateExpression, logExpression = false) if (uniqueInfoDates.add(infoDate)) { infoDates += infoDate } diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/app/config/InfoDateConfigSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/app/config/InfoDateConfigSuite.scala index b16d0ff1..02994703 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/app/config/InfoDateConfigSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/app/config/InfoDateConfigSuite.scala @@ -74,7 +74,7 @@ class InfoDateConfigSuite extends AnyWordSpec { assert(runtimeConfig.columnName == "pramen_info_date") assert(runtimeConfig.dateFormat == "yyyy-MM-dd") - assert(runtimeConfig.defaultBackfillDays == 5) + assert(runtimeConfig.defaultBackfillDays == -1) assert(runtimeConfig.defaultTrackDays == 5) assert(runtimeConfig.defaultDelayDays == 0)