Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 8 additions & 9 deletions pramen/core/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -68,15 +68,15 @@ pramen {
wait.for.output.table.enabled = false
wait.for.output.table.seconds = 600

# How many days to check back for late data (when there is no data at the source for that day)
# 0 - never check previous days, even if no data at the source happened
# 1 - check only the current info date if you run the job more than once per day
# 2 - check the latest info date and the date before
# Backfill window for missing data when a source has no data for an info date.
# -1 - look back up to the previous successful info date with data
# 0 - do not look back at previous dates, even if the source is empty
# 1 - only recheck the current info date (useful if rerunning the job on the same day)
# 2 - recheck the current info date and the previous day
# etc...
# You can also set this parameter for individual tables in the metastore.
# NOTE. By default it is the same as track.days, so it won't take effect. In order for it to take effect you
# need to either increase 'backfill.days' and/or decrease 'track.days'
backfill.days = 5
# You can also set this parameter per table in the metastore.
# NOTE: By default this parameter is set to -1 to be compatible with the behavior before this option was introduced,
backfill.days = -1

# How many days to check back for retrospective updates
# 0 - never check for updates after the data is loaded
Expand Down Expand Up @@ -289,4 +289,3 @@ hadoop.redacted.tokens = [ password, secret, session.token, access.key ]
# java.security.auth.login.config = ""
# java.security.krb5.conf = ""
# javax.net.debug = ""

Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ class OrchestratorImpl extends Orchestrator {
val infoDates = jobs.flatMap { job =>
if (job.operation.schedule.isEnabled(runDate)) {
val infoDateExpression = job.operation.outputInfoDateExpression
val infoDate = evaluateRunDate(runDate, infoDateExpression).toString
val infoDate = evaluateRunDate(runDate, infoDateExpression, logExpression = false).toString
Option(infoDate)
} else {
None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ object ScheduleStrategyUtils {
bookkeeper: Bookkeeper
): List[TaskPreDef] = {
if (schedule.isEnabled(runDate)) {
val infoDate = evaluateRunDate(runDate, infoDateExpression)
val infoDate = evaluateRunDate(runDate, infoDateExpression, logExpression = false)

bookkeeper.getLatestDataChunk(outputTable, infoDate) match {
case Some(_) =>
Expand Down Expand Up @@ -82,7 +82,7 @@ object ScheduleStrategyUtils {
infoDateExpression: String
): Option[TaskPreDef] = {
if (schedule.isEnabled(runDate)) {
val infoDate = evaluateRunDate(runDate, infoDateExpression)
val infoDate = evaluateRunDate(runDate, infoDateExpression, logExpression = false)

log.info(s"For $outputTable $runDate is one of scheduled days. Adding infoDate = '$infoDateExpression' = $infoDate to check.")

Expand Down Expand Up @@ -216,7 +216,7 @@ object ScheduleStrategyUtils {
val end = dateTo.plusDays(1)
while (date.isBefore(end)) {
if (schedule.isEnabled(date)) {
val infoDate = evaluateRunDate(date, infoDateExpression)
val infoDate = evaluateRunDate(date, infoDateExpression, logExpression = false)
if (uniqueInfoDates.add(infoDate)) {
infoDates += infoDate
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,10 @@ object AlgorithmUtils {
if (backoffMaxMs > backoffMinMs && backoffMinMs > 0) {
val backoffMs = Random.nextInt(backoffMaxMs - backoffMinMs) + backoffMinMs
val backoffS = backoffMs / 1000
log.warn(s"Attempt failed: ${getErrorMessage(ex)}. Attempts left: $attemptsLeft. Retrying in $backoffS seconds...")
log.error(s"Attempt failed: ${getErrorMessage(ex)}. Attempts left: $attemptsLeft. Retrying in $backoffS seconds...")
Thread.sleep(backoffMs)
} else {
log.warn(s"Attempt failed: ${getErrorMessage(ex)}. Attempts left: $attemptsLeft. Retrying...")
log.error(s"Attempt failed: ${getErrorMessage(ex)}. Attempts left: $attemptsLeft. Retrying...")
}

actionWithRetry(attemptsLeft, log, backoffMinMs, backoffMaxMs)(action)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ class InfoDateConfigSuite extends AnyWordSpec {
assert(runtimeConfig.columnName == "pramen_info_date")
assert(runtimeConfig.dateFormat == "yyyy-MM-dd")

assert(runtimeConfig.defaultBackfillDays == 5)
assert(runtimeConfig.defaultBackfillDays == -1)
assert(runtimeConfig.defaultTrackDays == 5)
assert(runtimeConfig.defaultDelayDays == 0)

Expand Down
Loading