diff --git a/.github/workflows/scala.yml b/.github/workflows/scala.yml
index 15b939a5..922e7cbc 100644
--- a/.github/workflows/scala.yml
+++ b/.github/workflows/scala.yml
@@ -23,10 +23,8 @@ jobs:
fail-fast: false
matrix:
scala: [2.11.12, 2.12.20, 2.13.16]
- spark: [2.4.8, 3.3.4, 3.4.4, 3.5.5]
+ spark: [2.4.8, 3.4.4, 3.5.5]
exclude:
- - scala: 2.11.12
- spark: 3.3.4
- scala: 2.11.12
spark: 3.4.4
- scala: 2.11.12
diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/splitter/ScheduleStrategySourcing.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/splitter/ScheduleStrategySourcing.scala
index 92d4b846..1aecc44d 100644
--- a/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/splitter/ScheduleStrategySourcing.scala
+++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/splitter/ScheduleStrategySourcing.scala
@@ -61,8 +61,14 @@ class ScheduleStrategySourcing(hasInfoDateColumn: Boolean) extends ScheduleStrat
val lastProcessedDate = bookkeeper.getLatestProcessedDate(outputTable, Option(infoDate))
lastProcessedDate.foreach(d => log.info(s"Last processed info date: $d"))
- val backfillDates = getBackFillDays(outputTable, runDate, backfillDays, trackDays, lastProcessedDate, schedule, infoDateExpression, bookkeeper)
- .map(d => pipeline.TaskPreDef(d, TaskRunReason.Late))
+ val backfillDates = if (lateOnly) {
+ val lateDaysToCheck = Math.max(backfillDays, trackDays)
+ getBackFillDays(outputTable, runDate, lateDaysToCheck, 0, schedule, infoDateExpression, bookkeeper)
+ .map(d => pipeline.TaskPreDef(d, TaskRunReason.Late))
+ } else {
+ getBackFillDays(outputTable, runDate, backfillDays, trackDays, schedule, infoDateExpression, bookkeeper)
+ .map(d => pipeline.TaskPreDef(d, TaskRunReason.Late))
+ }
val newDaysOrig = if (!lateOnly) {
getNew(outputTable, runDate.minusDays(delayDays), schedule, infoDateExpression).toList
@@ -75,7 +81,7 @@ class ScheduleStrategySourcing(hasInfoDateColumn: Boolean) extends ScheduleStrat
case _ => newDaysOrig
}
- val lateDaysOrig = if (!newOnly) {
+ val lateDaysOrig = if (!newOnly && (backfillDays == -1 || lastProcessedDate.isEmpty)) {
getLate(outputTable, runDate.minusDays(delayDays), schedule, infoDateExpression, initialSourcingDateExpr, lastProcessedDate)
} else {
Nil
@@ -113,19 +119,14 @@ class ScheduleStrategySourcing(hasInfoDateColumn: Boolean) extends ScheduleStrat
runDate: LocalDate,
backfillDays: Int,
trackDays: Int,
- lastProcessedDate: Option[LocalDate],
schedule: Schedule,
initialSourcingDateExpr: String,
bookkeeper: Bookkeeper): Seq[LocalDate] = {
// If backfillDays == 0, backfill is disabled
// If trackDays > backfillDays, track days supersede backfill with checks for retrospective updates
- if (backfillDays == 0 || (backfillDays > 0 && trackDays > backfillDays)) return Seq.empty
+ if (backfillDays <= 0 || (backfillDays > 0 && trackDays > backfillDays)) return Seq.empty
- val backfillStart = if (backfillDays < 0) {
- lastProcessedDate.getOrElse(runDate)
- } else {
- runDate.minusDays(backfillDays - 1)
- }
+ val backfillStart = runDate.minusDays(backfillDays - 1)
if (backfillStart.isEqual(runDate)) return Seq.empty
diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/runner/jobrunner/TaskRunnerMultithreadedSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/runner/jobrunner/TaskRunnerMultithreadedSuite.scala
index ecee53c7..65810485 100644
--- a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/runner/jobrunner/TaskRunnerMultithreadedSuite.scala
+++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/runner/jobrunner/TaskRunnerMultithreadedSuite.scala
@@ -61,7 +61,7 @@ class TaskRunnerMultithreadedSuite extends AnyWordSpec with SparkTestBase {
}
"handle a successful multiple task job parallel execution" in {
- val (runner, bk, state, job) = getUseCase(runDate.plusDays(1))
+ val (runner, bk, state, job) = getUseCase(runDate.plusDays(1), backfillDays = 2)
runner.runJob(job)
@@ -87,7 +87,7 @@ class TaskRunnerMultithreadedSuite extends AnyWordSpec with SparkTestBase {
}
"handle a successful multiple task job sequential execution" in {
- val (runner, bk, state, job) = getUseCase(runDate.plusDays(1), allowParallel = false)
+ val (runner, bk, state, job) = getUseCase(runDate.plusDays(1), allowParallel = false, backfillDays = 2)
runner.runJob(job)
@@ -176,7 +176,8 @@ class TaskRunnerMultithreadedSuite extends AnyWordSpec with SparkTestBase {
runFunction: () => RunResult = () => RunResult(exampleDf),
consumeThreads: Int = 1,
allowParallel: Boolean = true,
- parallelTasks: Int = 1
+ parallelTasks: Int = 1,
+ backfillDays: Int = 1
): (ConcurrentJobRunnerImpl, Bookkeeper, PipelineStateSpy, Job) = {
val conf = ConfigFactory.empty()
@@ -193,7 +194,7 @@ class TaskRunnerMultithreadedSuite extends AnyWordSpec with SparkTestBase {
val stats = MetaTableStats(Some(2), None, Some(100))
val operationDef = OperationDefFactory.getDummyOperationDef(consumeThreads = consumeThreads)
- val job = new JobSpy(runFunction = runFunction, saveStats = stats, operationDef = operationDef, allowParallel = allowParallel)
+ val job = new JobSpy(runFunction = runFunction, jobBackfillDays = backfillDays, saveStats = stats, operationDef = operationDef, allowParallel = allowParallel)
val taskRunner = new TaskRunnerMultithreaded(conf, bookkeeper, journal, tokenLockFactory, state, runtimeConfig, "app_123")
diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/runner/splitter/ScheduleStrategySuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/runner/splitter/ScheduleStrategySuite.scala
index af80a301..02b2527e 100644
--- a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/runner/splitter/ScheduleStrategySuite.scala
+++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/runner/splitter/ScheduleStrategySuite.scala
@@ -132,8 +132,9 @@ class ScheduleStrategySuite extends AnyWordSpec {
val bk = mock(classOf[Bookkeeper])
when(bk.getLatestProcessedDate(outputTable, Some(runDate))).thenReturn(Some(runDate.minusDays(2)))
+ when(bk.getDataAvailability(outputTable, runDate.minusDays(1), runDate.minusDays(1))).thenReturn(Seq.empty)
- val params = ScheduleParams.Normal(runDate, 0, 4, 0, newOnly = false, lateOnly = true)
+ val params = ScheduleParams.Normal(runDate, 0, 2, 0, newOnly = false, lateOnly = true)
val expected = Seq(runDate.minusDays(1))
.map(d => pipeline.TaskPreDef(d, TaskRunReason.Late))
@@ -163,7 +164,8 @@ class ScheduleStrategySuite extends AnyWordSpec {
when(bk.getLatestProcessedDate(outputTable, Some(runDate))).thenReturn(Some(runDate.minusDays(2)))
- val params = ScheduleParams.Normal(runDate, 0, 4, 0, newOnly = true, lateOnly = true)
+ val params = ScheduleParams.Normal(runDate, 0, 2, 0, newOnly = true, lateOnly = true)
+ when(bk.getDataAvailability(outputTable, runDate.minusDays(1), runDate.minusDays(1))).thenReturn(Seq(DataAvailability(runDate.minusDays(1), 1, 1)))
val result = strategyEvent.getDaysToRun(outputTable, dependencies, bk, infoDateExpression, schedule, params, initialSourcingDateExpr, minimumDate)
@@ -335,10 +337,11 @@ class ScheduleStrategySuite extends AnyWordSpec {
"default behavior with more than 1 day late" in {
val minimumDate = LocalDate.parse("2022-07-01")
val runDate = LocalDate.parse("2022-07-14")
- val params = ScheduleParams.Normal(runDate, 0, 0, 0, newOnly = false, lateOnly = false)
+ val params = ScheduleParams.Normal(runDate, 7, 0, 0, newOnly = false, lateOnly = false)
val bk = mock(classOf[Bookkeeper])
when(bk.getLatestProcessedDate(outputTable, Some(LocalDate.parse("2022-07-09")))).thenReturn(Some(LocalDate.parse("2022-07-05")))
+ when(bk.getDataAvailability(outputTable, LocalDate.parse("2022-07-08"), LocalDate.parse("2022-07-13"))).thenReturn(Seq.empty)
val expected = Seq(
pipeline.TaskPreDef(LocalDate.of(2022, 7, 9), TaskRunReason.Late)
@@ -354,7 +357,8 @@ class ScheduleStrategySuite extends AnyWordSpec {
when(bk.getLatestProcessedDate(outputTable, Some(runDate.minusDays(1)))).thenReturn(Some(runDate.minusDays(30)))
- val params = ScheduleParams.Normal(runDate, 0, 0, 0, newOnly = false, lateOnly = false)
+ val params = ScheduleParams.Normal(runDate, 2, 7, 0, newOnly = false, lateOnly = false)
+ when(bk.getDataAvailability(outputTable, LocalDate.parse("2022-07-07"), LocalDate.parse("2022-07-20"))).thenReturn(Seq.empty)
val result = strategySnapshot.getDaysToRun(outputTable, dependencies, bk, "@runDate - 1", schedule, params, initialSourcingDateExpr, minimumDate)
@@ -368,6 +372,7 @@ class ScheduleStrategySuite extends AnyWordSpec {
val bk = mock(classOf[Bookkeeper])
when(bk.getLatestProcessedDate(outputTable, Some(runDate.plusDays(1)))).thenReturn(Some(runDate.minusDays(9)))
+ when(bk.getDataAvailability(outputTable, nextSunday.minusDays(13), nextSunday.minusDays(1))).thenReturn(Seq(DataAvailability(runDate.minusDays(1), 1, 1)))
val result = strategyEvent.getDaysToRun(outputTable, dependencies, bk, infoDateExpression, schedule, params, initialSourcingDateExpr, minimumDate)
@@ -475,12 +480,14 @@ class ScheduleStrategySuite extends AnyWordSpec {
"default behavior with a monthly job" in {
val minimumDate = LocalDate.parse("2022-05-30")
val runDate = LocalDate.parse("2022-07-14")
- val params = ScheduleParams.Normal(runDate, 0, 0, 0, newOnly = false, lateOnly = false)
+ val params = ScheduleParams.Normal(runDate, 0, 62, 0, newOnly = false, lateOnly = false)
val bk = mock(classOf[Bookkeeper])
when(bk.getLatestProcessedDate(outputTable, Some(LocalDate.parse("2022-07-01"))))
.thenReturn(Some(LocalDate.parse("2022-05-01")))
+ when(bk.getDataAvailability(outputTable, LocalDate.parse("2022-07-08"), LocalDate.parse("2022-07-13"))).thenReturn(Seq.empty)
+
val expected = Seq(
pipeline.TaskPreDef(LocalDate.of(2022, 6, 1), TaskRunReason.Late),
pipeline.TaskPreDef(LocalDate.of(2022, 7, 1), TaskRunReason.Late)
diff --git a/pramen/pom.xml b/pramen/pom.xml
index 797bff4b..46ab6534 100644
--- a/pramen/pom.xml
+++ b/pramen/pom.xml
@@ -111,7 +111,7 @@
2.12.20
2.12
- 3.5.7
+ 3.5.8
3.5
3.3.6
3.3.2