From ac17abc21241f98891e7e2110773cfe989d8bc5d Mon Sep 17 00:00:00 2001 From: Ruslan Iushchenko Date: Wed, 28 Jan 2026 09:05:07 +0100 Subject: [PATCH 1/3] #698 Fix the backfill logic for late jobs. --- .../splitter/ScheduleStrategySourcing.scala | 21 ++++++++++--------- .../TaskRunnerMultithreadedSuite.scala | 9 ++++---- .../splitter/ScheduleStrategySuite.scala | 17 ++++++++++----- 3 files changed, 28 insertions(+), 19 deletions(-) diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/splitter/ScheduleStrategySourcing.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/splitter/ScheduleStrategySourcing.scala index 92d4b846..1aecc44d 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/splitter/ScheduleStrategySourcing.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/splitter/ScheduleStrategySourcing.scala @@ -61,8 +61,14 @@ class ScheduleStrategySourcing(hasInfoDateColumn: Boolean) extends ScheduleStrat val lastProcessedDate = bookkeeper.getLatestProcessedDate(outputTable, Option(infoDate)) lastProcessedDate.foreach(d => log.info(s"Last processed info date: $d")) - val backfillDates = getBackFillDays(outputTable, runDate, backfillDays, trackDays, lastProcessedDate, schedule, infoDateExpression, bookkeeper) - .map(d => pipeline.TaskPreDef(d, TaskRunReason.Late)) + val backfillDates = if (lateOnly) { + val lateDaysToCheck = Math.max(backfillDays, trackDays) + getBackFillDays(outputTable, runDate, lateDaysToCheck, 0, schedule, infoDateExpression, bookkeeper) + .map(d => pipeline.TaskPreDef(d, TaskRunReason.Late)) + } else { + getBackFillDays(outputTable, runDate, backfillDays, trackDays, schedule, infoDateExpression, bookkeeper) + .map(d => pipeline.TaskPreDef(d, TaskRunReason.Late)) + } val newDaysOrig = if (!lateOnly) { getNew(outputTable, runDate.minusDays(delayDays), schedule, infoDateExpression).toList @@ -75,7 +81,7 @@ class ScheduleStrategySourcing(hasInfoDateColumn: Boolean) extends ScheduleStrat case _ => newDaysOrig } - val lateDaysOrig = if (!newOnly) { + val lateDaysOrig = if (!newOnly && (backfillDays == -1 || lastProcessedDate.isEmpty)) { getLate(outputTable, runDate.minusDays(delayDays), schedule, infoDateExpression, initialSourcingDateExpr, lastProcessedDate) } else { Nil @@ -113,19 +119,14 @@ class ScheduleStrategySourcing(hasInfoDateColumn: Boolean) extends ScheduleStrat runDate: LocalDate, backfillDays: Int, trackDays: Int, - lastProcessedDate: Option[LocalDate], schedule: Schedule, initialSourcingDateExpr: String, bookkeeper: Bookkeeper): Seq[LocalDate] = { // If backfillDays == 0, backfill is disabled // If trackDays > backfillDays, track days supersede backfill with checks for retrospective updates - if (backfillDays == 0 || (backfillDays > 0 && trackDays > backfillDays)) return Seq.empty + if (backfillDays <= 0 || (backfillDays > 0 && trackDays > backfillDays)) return Seq.empty - val backfillStart = if (backfillDays < 0) { - lastProcessedDate.getOrElse(runDate) - } else { - runDate.minusDays(backfillDays - 1) - } + val backfillStart = runDate.minusDays(backfillDays - 1) if (backfillStart.isEqual(runDate)) return Seq.empty diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/runner/jobrunner/TaskRunnerMultithreadedSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/runner/jobrunner/TaskRunnerMultithreadedSuite.scala index ecee53c7..eb1403b2 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/runner/jobrunner/TaskRunnerMultithreadedSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/runner/jobrunner/TaskRunnerMultithreadedSuite.scala @@ -61,7 +61,7 @@ class TaskRunnerMultithreadedSuite extends AnyWordSpec with SparkTestBase { } "handle a successful multiple task job parallel execution" in { - val (runner, bk, state, job) = getUseCase(runDate.plusDays(1)) + val (runner, bk, state, job) = getUseCase(runDate.plusDays(1), backfillDays = 2) runner.runJob(job) @@ -87,7 +87,7 @@ class TaskRunnerMultithreadedSuite extends AnyWordSpec with SparkTestBase { } "handle a successful multiple task job sequential execution" in { - val (runner, bk, state, job) = getUseCase(runDate.plusDays(1), allowParallel = false) + val (runner, bk, state, job) = getUseCase(runDate.plusDays(1), allowParallel = false, backfillDays = 2) runner.runJob(job) @@ -176,7 +176,8 @@ class TaskRunnerMultithreadedSuite extends AnyWordSpec with SparkTestBase { runFunction: () => RunResult = () => RunResult(exampleDf), consumeThreads: Int = 1, allowParallel: Boolean = true, - parallelTasks: Int = 1 + parallelTasks: Int = 1, + backfillDays: Int = 1, ): (ConcurrentJobRunnerImpl, Bookkeeper, PipelineStateSpy, Job) = { val conf = ConfigFactory.empty() @@ -193,7 +194,7 @@ class TaskRunnerMultithreadedSuite extends AnyWordSpec with SparkTestBase { val stats = MetaTableStats(Some(2), None, Some(100)) val operationDef = OperationDefFactory.getDummyOperationDef(consumeThreads = consumeThreads) - val job = new JobSpy(runFunction = runFunction, saveStats = stats, operationDef = operationDef, allowParallel = allowParallel) + val job = new JobSpy(runFunction = runFunction, jobBackfillDays = backfillDays, saveStats = stats, operationDef = operationDef, allowParallel = allowParallel) val taskRunner = new TaskRunnerMultithreaded(conf, bookkeeper, journal, tokenLockFactory, state, runtimeConfig, "app_123") diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/runner/splitter/ScheduleStrategySuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/runner/splitter/ScheduleStrategySuite.scala index af80a301..02b2527e 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/runner/splitter/ScheduleStrategySuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/runner/splitter/ScheduleStrategySuite.scala @@ -132,8 +132,9 @@ class ScheduleStrategySuite extends AnyWordSpec { val bk = mock(classOf[Bookkeeper]) when(bk.getLatestProcessedDate(outputTable, Some(runDate))).thenReturn(Some(runDate.minusDays(2))) + when(bk.getDataAvailability(outputTable, runDate.minusDays(1), runDate.minusDays(1))).thenReturn(Seq.empty) - val params = ScheduleParams.Normal(runDate, 0, 4, 0, newOnly = false, lateOnly = true) + val params = ScheduleParams.Normal(runDate, 0, 2, 0, newOnly = false, lateOnly = true) val expected = Seq(runDate.minusDays(1)) .map(d => pipeline.TaskPreDef(d, TaskRunReason.Late)) @@ -163,7 +164,8 @@ class ScheduleStrategySuite extends AnyWordSpec { when(bk.getLatestProcessedDate(outputTable, Some(runDate))).thenReturn(Some(runDate.minusDays(2))) - val params = ScheduleParams.Normal(runDate, 0, 4, 0, newOnly = true, lateOnly = true) + val params = ScheduleParams.Normal(runDate, 0, 2, 0, newOnly = true, lateOnly = true) + when(bk.getDataAvailability(outputTable, runDate.minusDays(1), runDate.minusDays(1))).thenReturn(Seq(DataAvailability(runDate.minusDays(1), 1, 1))) val result = strategyEvent.getDaysToRun(outputTable, dependencies, bk, infoDateExpression, schedule, params, initialSourcingDateExpr, minimumDate) @@ -335,10 +337,11 @@ class ScheduleStrategySuite extends AnyWordSpec { "default behavior with more than 1 day late" in { val minimumDate = LocalDate.parse("2022-07-01") val runDate = LocalDate.parse("2022-07-14") - val params = ScheduleParams.Normal(runDate, 0, 0, 0, newOnly = false, lateOnly = false) + val params = ScheduleParams.Normal(runDate, 7, 0, 0, newOnly = false, lateOnly = false) val bk = mock(classOf[Bookkeeper]) when(bk.getLatestProcessedDate(outputTable, Some(LocalDate.parse("2022-07-09")))).thenReturn(Some(LocalDate.parse("2022-07-05"))) + when(bk.getDataAvailability(outputTable, LocalDate.parse("2022-07-08"), LocalDate.parse("2022-07-13"))).thenReturn(Seq.empty) val expected = Seq( pipeline.TaskPreDef(LocalDate.of(2022, 7, 9), TaskRunReason.Late) @@ -354,7 +357,8 @@ class ScheduleStrategySuite extends AnyWordSpec { when(bk.getLatestProcessedDate(outputTable, Some(runDate.minusDays(1)))).thenReturn(Some(runDate.minusDays(30))) - val params = ScheduleParams.Normal(runDate, 0, 0, 0, newOnly = false, lateOnly = false) + val params = ScheduleParams.Normal(runDate, 2, 7, 0, newOnly = false, lateOnly = false) + when(bk.getDataAvailability(outputTable, LocalDate.parse("2022-07-07"), LocalDate.parse("2022-07-20"))).thenReturn(Seq.empty) val result = strategySnapshot.getDaysToRun(outputTable, dependencies, bk, "@runDate - 1", schedule, params, initialSourcingDateExpr, minimumDate) @@ -368,6 +372,7 @@ class ScheduleStrategySuite extends AnyWordSpec { val bk = mock(classOf[Bookkeeper]) when(bk.getLatestProcessedDate(outputTable, Some(runDate.plusDays(1)))).thenReturn(Some(runDate.minusDays(9))) + when(bk.getDataAvailability(outputTable, nextSunday.minusDays(13), nextSunday.minusDays(1))).thenReturn(Seq(DataAvailability(runDate.minusDays(1), 1, 1))) val result = strategyEvent.getDaysToRun(outputTable, dependencies, bk, infoDateExpression, schedule, params, initialSourcingDateExpr, minimumDate) @@ -475,12 +480,14 @@ class ScheduleStrategySuite extends AnyWordSpec { "default behavior with a monthly job" in { val minimumDate = LocalDate.parse("2022-05-30") val runDate = LocalDate.parse("2022-07-14") - val params = ScheduleParams.Normal(runDate, 0, 0, 0, newOnly = false, lateOnly = false) + val params = ScheduleParams.Normal(runDate, 0, 62, 0, newOnly = false, lateOnly = false) val bk = mock(classOf[Bookkeeper]) when(bk.getLatestProcessedDate(outputTable, Some(LocalDate.parse("2022-07-01")))) .thenReturn(Some(LocalDate.parse("2022-05-01"))) + when(bk.getDataAvailability(outputTable, LocalDate.parse("2022-07-08"), LocalDate.parse("2022-07-13"))).thenReturn(Seq.empty) + val expected = Seq( pipeline.TaskPreDef(LocalDate.of(2022, 6, 1), TaskRunReason.Late), pipeline.TaskPreDef(LocalDate.of(2022, 7, 1), TaskRunReason.Late) From 4c0a563d60062ea77d74e68393e8b61f08253962 Mon Sep 17 00:00:00 2001 From: Ruslan Iushchenko Date: Wed, 28 Jan 2026 09:16:15 +0100 Subject: [PATCH 2/3] Fix Scala 2.11 compilation and update to the latest Spark 3.5.8. --- .github/workflows/scala.yml | 6 ++---- .../runner/jobrunner/TaskRunnerMultithreadedSuite.scala | 2 +- pramen/pom.xml | 2 +- 3 files changed, 4 insertions(+), 6 deletions(-) diff --git a/.github/workflows/scala.yml b/.github/workflows/scala.yml index 15b939a5..f3c8ddc7 100644 --- a/.github/workflows/scala.yml +++ b/.github/workflows/scala.yml @@ -23,14 +23,12 @@ jobs: fail-fast: false matrix: scala: [2.11.12, 2.12.20, 2.13.16] - spark: [2.4.8, 3.3.4, 3.4.4, 3.5.5] + spark: [2.4.8, 3.4.4, 3.5.7] exclude: - - scala: 2.11.12 - spark: 3.3.4 - scala: 2.11.12 spark: 3.4.4 - scala: 2.11.12 - spark: 3.5.5 + spark: 3.5.7 - scala: 2.12.20 spark: 2.4.8 - scala: 2.13.16 diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/runner/jobrunner/TaskRunnerMultithreadedSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/runner/jobrunner/TaskRunnerMultithreadedSuite.scala index eb1403b2..65810485 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/runner/jobrunner/TaskRunnerMultithreadedSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/runner/jobrunner/TaskRunnerMultithreadedSuite.scala @@ -177,7 +177,7 @@ class TaskRunnerMultithreadedSuite extends AnyWordSpec with SparkTestBase { consumeThreads: Int = 1, allowParallel: Boolean = true, parallelTasks: Int = 1, - backfillDays: Int = 1, + backfillDays: Int = 1 ): (ConcurrentJobRunnerImpl, Bookkeeper, PipelineStateSpy, Job) = { val conf = ConfigFactory.empty() diff --git a/pramen/pom.xml b/pramen/pom.xml index 797bff4b..46ab6534 100644 --- a/pramen/pom.xml +++ b/pramen/pom.xml @@ -111,7 +111,7 @@ 2.12.20 2.12 - 3.5.7 + 3.5.8 3.5 3.3.6 3.3.2 From f1100da5988f6f599c39f3641538cc9a827179bf Mon Sep 17 00:00:00 2001 From: Ruslan Iushchenko Date: Wed, 28 Jan 2026 09:47:52 +0100 Subject: [PATCH 3/3] Fix Scala 2.11 compilation and update to the latest Spark 3.5.8. --- .github/workflows/scala.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/scala.yml b/.github/workflows/scala.yml index f3c8ddc7..922e7cbc 100644 --- a/.github/workflows/scala.yml +++ b/.github/workflows/scala.yml @@ -23,12 +23,12 @@ jobs: fail-fast: false matrix: scala: [2.11.12, 2.12.20, 2.13.16] - spark: [2.4.8, 3.4.4, 3.5.7] + spark: [2.4.8, 3.4.4, 3.5.5] exclude: - scala: 2.11.12 spark: 3.4.4 - scala: 2.11.12 - spark: 3.5.7 + spark: 3.5.5 - scala: 2.12.20 spark: 2.4.8 - scala: 2.13.16