Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions .github/workflows/scala.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,8 @@ jobs:
fail-fast: false
matrix:
scala: [2.11.12, 2.12.20, 2.13.16]
spark: [2.4.8, 3.3.4, 3.4.4, 3.5.5]
spark: [2.4.8, 3.4.4, 3.5.5]
exclude:
- scala: 2.11.12
spark: 3.3.4
- scala: 2.11.12
spark: 3.4.4
- scala: 2.11.12
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,14 @@ class ScheduleStrategySourcing(hasInfoDateColumn: Boolean) extends ScheduleStrat
val lastProcessedDate = bookkeeper.getLatestProcessedDate(outputTable, Option(infoDate))
lastProcessedDate.foreach(d => log.info(s"Last processed info date: $d"))

val backfillDates = getBackFillDays(outputTable, runDate, backfillDays, trackDays, lastProcessedDate, schedule, infoDateExpression, bookkeeper)
.map(d => pipeline.TaskPreDef(d, TaskRunReason.Late))
val backfillDates = if (lateOnly) {
val lateDaysToCheck = Math.max(backfillDays, trackDays)
getBackFillDays(outputTable, runDate, lateDaysToCheck, 0, schedule, infoDateExpression, bookkeeper)
.map(d => pipeline.TaskPreDef(d, TaskRunReason.Late))
} else {
getBackFillDays(outputTable, runDate, backfillDays, trackDays, schedule, infoDateExpression, bookkeeper)
.map(d => pipeline.TaskPreDef(d, TaskRunReason.Late))
}

val newDaysOrig = if (!lateOnly) {
getNew(outputTable, runDate.minusDays(delayDays), schedule, infoDateExpression).toList
Expand All @@ -75,7 +81,7 @@ class ScheduleStrategySourcing(hasInfoDateColumn: Boolean) extends ScheduleStrat
case _ => newDaysOrig
}

val lateDaysOrig = if (!newOnly) {
val lateDaysOrig = if (!newOnly && (backfillDays == -1 || lastProcessedDate.isEmpty)) {
getLate(outputTable, runDate.minusDays(delayDays), schedule, infoDateExpression, initialSourcingDateExpr, lastProcessedDate)
} else {
Nil
Expand Down Expand Up @@ -113,19 +119,14 @@ class ScheduleStrategySourcing(hasInfoDateColumn: Boolean) extends ScheduleStrat
runDate: LocalDate,
backfillDays: Int,
trackDays: Int,
lastProcessedDate: Option[LocalDate],
schedule: Schedule,
initialSourcingDateExpr: String,
bookkeeper: Bookkeeper): Seq[LocalDate] = {
// If backfillDays == 0, backfill is disabled
// If trackDays > backfillDays, track days supersede backfill with checks for retrospective updates
if (backfillDays == 0 || (backfillDays > 0 && trackDays > backfillDays)) return Seq.empty
if (backfillDays <= 0 || (backfillDays > 0 && trackDays > backfillDays)) return Seq.empty

val backfillStart = if (backfillDays < 0) {
lastProcessedDate.getOrElse(runDate)
} else {
runDate.minusDays(backfillDays - 1)
}
val backfillStart = runDate.minusDays(backfillDays - 1)

if (backfillStart.isEqual(runDate)) return Seq.empty

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ class TaskRunnerMultithreadedSuite extends AnyWordSpec with SparkTestBase {
}

"handle a successful multiple task job parallel execution" in {
val (runner, bk, state, job) = getUseCase(runDate.plusDays(1))
val (runner, bk, state, job) = getUseCase(runDate.plusDays(1), backfillDays = 2)

runner.runJob(job)

Expand All @@ -87,7 +87,7 @@ class TaskRunnerMultithreadedSuite extends AnyWordSpec with SparkTestBase {
}

"handle a successful multiple task job sequential execution" in {
val (runner, bk, state, job) = getUseCase(runDate.plusDays(1), allowParallel = false)
val (runner, bk, state, job) = getUseCase(runDate.plusDays(1), allowParallel = false, backfillDays = 2)

runner.runJob(job)

Expand Down Expand Up @@ -176,7 +176,8 @@ class TaskRunnerMultithreadedSuite extends AnyWordSpec with SparkTestBase {
runFunction: () => RunResult = () => RunResult(exampleDf),
consumeThreads: Int = 1,
allowParallel: Boolean = true,
parallelTasks: Int = 1
parallelTasks: Int = 1,
backfillDays: Int = 1
): (ConcurrentJobRunnerImpl, Bookkeeper, PipelineStateSpy, Job) = {
val conf = ConfigFactory.empty()

Expand All @@ -193,7 +194,7 @@ class TaskRunnerMultithreadedSuite extends AnyWordSpec with SparkTestBase {
val stats = MetaTableStats(Some(2), None, Some(100))

val operationDef = OperationDefFactory.getDummyOperationDef(consumeThreads = consumeThreads)
val job = new JobSpy(runFunction = runFunction, saveStats = stats, operationDef = operationDef, allowParallel = allowParallel)
val job = new JobSpy(runFunction = runFunction, jobBackfillDays = backfillDays, saveStats = stats, operationDef = operationDef, allowParallel = allowParallel)

val taskRunner = new TaskRunnerMultithreaded(conf, bookkeeper, journal, tokenLockFactory, state, runtimeConfig, "app_123")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,9 @@ class ScheduleStrategySuite extends AnyWordSpec {
val bk = mock(classOf[Bookkeeper])

when(bk.getLatestProcessedDate(outputTable, Some(runDate))).thenReturn(Some(runDate.minusDays(2)))
when(bk.getDataAvailability(outputTable, runDate.minusDays(1), runDate.minusDays(1))).thenReturn(Seq.empty)

val params = ScheduleParams.Normal(runDate, 0, 4, 0, newOnly = false, lateOnly = true)
val params = ScheduleParams.Normal(runDate, 0, 2, 0, newOnly = false, lateOnly = true)

val expected = Seq(runDate.minusDays(1))
.map(d => pipeline.TaskPreDef(d, TaskRunReason.Late))
Expand Down Expand Up @@ -163,7 +164,8 @@ class ScheduleStrategySuite extends AnyWordSpec {

when(bk.getLatestProcessedDate(outputTable, Some(runDate))).thenReturn(Some(runDate.minusDays(2)))

val params = ScheduleParams.Normal(runDate, 0, 4, 0, newOnly = true, lateOnly = true)
val params = ScheduleParams.Normal(runDate, 0, 2, 0, newOnly = true, lateOnly = true)
when(bk.getDataAvailability(outputTable, runDate.minusDays(1), runDate.minusDays(1))).thenReturn(Seq(DataAvailability(runDate.minusDays(1), 1, 1)))

val result = strategyEvent.getDaysToRun(outputTable, dependencies, bk, infoDateExpression, schedule, params, initialSourcingDateExpr, minimumDate)

Expand Down Expand Up @@ -335,10 +337,11 @@ class ScheduleStrategySuite extends AnyWordSpec {
"default behavior with more than 1 day late" in {
val minimumDate = LocalDate.parse("2022-07-01")
val runDate = LocalDate.parse("2022-07-14")
val params = ScheduleParams.Normal(runDate, 0, 0, 0, newOnly = false, lateOnly = false)
val params = ScheduleParams.Normal(runDate, 7, 0, 0, newOnly = false, lateOnly = false)

val bk = mock(classOf[Bookkeeper])
when(bk.getLatestProcessedDate(outputTable, Some(LocalDate.parse("2022-07-09")))).thenReturn(Some(LocalDate.parse("2022-07-05")))
when(bk.getDataAvailability(outputTable, LocalDate.parse("2022-07-08"), LocalDate.parse("2022-07-13"))).thenReturn(Seq.empty)

val expected = Seq(
pipeline.TaskPreDef(LocalDate.of(2022, 7, 9), TaskRunReason.Late)
Expand All @@ -354,7 +357,8 @@ class ScheduleStrategySuite extends AnyWordSpec {

when(bk.getLatestProcessedDate(outputTable, Some(runDate.minusDays(1)))).thenReturn(Some(runDate.minusDays(30)))

val params = ScheduleParams.Normal(runDate, 0, 0, 0, newOnly = false, lateOnly = false)
val params = ScheduleParams.Normal(runDate, 2, 7, 0, newOnly = false, lateOnly = false)
when(bk.getDataAvailability(outputTable, LocalDate.parse("2022-07-07"), LocalDate.parse("2022-07-20"))).thenReturn(Seq.empty)

val result = strategySnapshot.getDaysToRun(outputTable, dependencies, bk, "@runDate - 1", schedule, params, initialSourcingDateExpr, minimumDate)

Expand All @@ -368,6 +372,7 @@ class ScheduleStrategySuite extends AnyWordSpec {

val bk = mock(classOf[Bookkeeper])
when(bk.getLatestProcessedDate(outputTable, Some(runDate.plusDays(1)))).thenReturn(Some(runDate.minusDays(9)))
when(bk.getDataAvailability(outputTable, nextSunday.minusDays(13), nextSunday.minusDays(1))).thenReturn(Seq(DataAvailability(runDate.minusDays(1), 1, 1)))

val result = strategyEvent.getDaysToRun(outputTable, dependencies, bk, infoDateExpression, schedule, params, initialSourcingDateExpr, minimumDate)

Expand Down Expand Up @@ -475,12 +480,14 @@ class ScheduleStrategySuite extends AnyWordSpec {
"default behavior with a monthly job" in {
val minimumDate = LocalDate.parse("2022-05-30")
val runDate = LocalDate.parse("2022-07-14")
val params = ScheduleParams.Normal(runDate, 0, 0, 0, newOnly = false, lateOnly = false)
val params = ScheduleParams.Normal(runDate, 0, 62, 0, newOnly = false, lateOnly = false)

val bk = mock(classOf[Bookkeeper])
when(bk.getLatestProcessedDate(outputTable, Some(LocalDate.parse("2022-07-01"))))
.thenReturn(Some(LocalDate.parse("2022-05-01")))

when(bk.getDataAvailability(outputTable, LocalDate.parse("2022-07-08"), LocalDate.parse("2022-07-13"))).thenReturn(Seq.empty)

val expected = Seq(
pipeline.TaskPreDef(LocalDate.of(2022, 6, 1), TaskRunReason.Late),
pipeline.TaskPreDef(LocalDate.of(2022, 7, 1), TaskRunReason.Late)
Expand Down
2 changes: 1 addition & 1 deletion pramen/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@

<scala.version>2.12.20</scala.version>
<scala.compat.version>2.12</scala.compat.version>
<spark.version>3.5.7</spark.version>
<spark.version>3.5.8</spark.version>
<spark.compat.version>3.5</spark.compat.version>
<hadoop.version>3.3.6</hadoop.version>
<delta.version>3.3.2</delta.version>
Expand Down
Loading