Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,5 +30,7 @@ case class RuntimeInfo(
isNewOnly: Boolean = false, // If true, the pipeline runs without catching up late data mode.
isLateOnly: Boolean = false, // If true, the pipeline runs in catching up late data only mode.
minRps: Int = 0, // Configured records per second that is considered bad if the actual rps is lower, ignored if 0.
goodRps: Int = 0 // Configured records per second that is considered very good if the actual rps is higher, ignored if 0.
goodRps: Int = 0, // Configured records per second that is considered very good if the actual rps is higher, ignored if 0.
attempt: Int = 1, // Current attempt number for the pipeline run (for auto-retry automation).
maxAttempts: Int = 1 // Maximum number of attempts allowed for the pipeline run.
)
6 changes: 6 additions & 0 deletions pramen/core/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,12 @@ pramen {
# If true, jobs that are already running and hold a lock will be skipped instead of throwing an error
runtime.skip.locked = false

# Current attempt number for the pipeline run (for auto-retry automation)
runtime.attempt = 1

# Maximum number of attempts allowed for the pipeline run
runtime.max.attempts = 1

# Send an email even if there are no changes and no late or not ready data
email.if.no.changes = true

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,9 @@ case class RuntimeConfig(
allowEmptyPipeline: Boolean,
alwaysAddBatchIdColumn: Boolean,
historicalRunMode: RunMode,
sparkAppDescriptionTemplate: Option[String]
sparkAppDescriptionTemplate: Option[String],
attempt: Int, // Current attempt number for the pipeline run (for auto-retry automation)
maxAttempts: Int // Maximum number of attempts allowed for the pipeline run
)

object RuntimeConfig {
Expand All @@ -72,6 +74,8 @@ object RuntimeConfig {
val ALLOW_EMPTY_PIPELINE = "pramen.allow.empty.pipeline"
val ALWAYS_ADD_BATCHID_COLUMN = "pramen.always.add.batchid.column"
val SPARK_APP_DESCRIPTION_TEMPLATE = "pramen.job.description.template"
val ATTEMPT = "pramen.runtime.attempt"
val MAX_ATTEMPTS = "pramen.runtime.max.attempts"

def fromConfig(conf: Config): RuntimeConfig = {
val infoDateFormat = conf.getString(INFORMATION_DATE_FORMAT_APP)
Expand Down Expand Up @@ -136,6 +140,8 @@ object RuntimeConfig {
val allowEmptyPipeline = ConfigUtils.getOptionBoolean(conf, ALLOW_EMPTY_PIPELINE).getOrElse(false)
val alwaysAddBatchIdColumn = ConfigUtils.getOptionBoolean(conf, ALWAYS_ADD_BATCHID_COLUMN).getOrElse(false)
val sparkAppDescriptionTemplate = ConfigUtils.getOptionString(conf, SPARK_APP_DESCRIPTION_TEMPLATE)
val attempt = ConfigUtils.getOptionInt(conf, ATTEMPT).getOrElse(1)
val maxAttempts = ConfigUtils.getOptionInt(conf, MAX_ATTEMPTS).getOrElse(1)

RuntimeConfig(
isDryRun = isDryRun,
Expand All @@ -155,7 +161,9 @@ object RuntimeConfig {
allowEmptyPipeline,
alwaysAddBatchIdColumn,
runMode,
sparkAppDescriptionTemplate
sparkAppDescriptionTemplate,
attempt,
maxAttempts
)
}

Expand All @@ -178,7 +186,9 @@ object RuntimeConfig {
allowEmptyPipeline = false,
alwaysAddBatchIdColumn = false,
historicalRunMode = RunMode.CheckUpdates,
sparkAppDescriptionTemplate = None
sparkAppDescriptionTemplate = None,
attempt = 1,
maxAttempts = 1
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ class PipelineNotificationBuilderHtml(implicit conf: Config) extends PipelineNot
introParagraph.withText(". ")

runtimeInfo.foreach { c =>
val executionInfoParagraph = renderExecutionInfo(c.runDateFrom, c.runDateTo, c.isRerun, c.isNewOnly, c.isLateOnly)
val executionInfoParagraph = renderExecutionInfo(c.runDateFrom, c.runDateTo, c.isRerun, c.isNewOnly, c.isLateOnly, c.attempt, c.maxAttempts)
.withText(". ")
.paragraph
introParagraph
Expand Down Expand Up @@ -274,7 +274,9 @@ class PipelineNotificationBuilderHtml(implicit conf: Config) extends PipelineNot
runDateTo: Option[LocalDate],
isRerun: Boolean,
isNewOnly: Boolean,
isLateOnly: Boolean): ParagraphBuilder = {
isLateOnly: Boolean,
attempt: Int,
maxAttempts: Int): ParagraphBuilder = {
val executionStr = if (isRerun) "Re-run execution" else "Execution"
val datesStr = runDateTo match {
case Some(dateTo) => s"the period from <b>$runDateFrom</b> to <b>$dateTo</b>"
Expand All @@ -283,11 +285,12 @@ class PipelineNotificationBuilderHtml(implicit conf: Config) extends PipelineNot

val newOnlyDescription = if (isNewOnly) " (only new data)" else ""
val lateOnlyDescription = if (isLateOnly) " (only late data)" else ""
val attemptDescription = if (maxAttempts > 1) s" (attempt <b>$attempt</b> of <b>$maxAttempts</b>)" else ""

ParagraphBuilder()
.withText(executionStr)
.withText(" for ")
.withText(datesStr + newOnlyDescription + lateOnlyDescription)
.withText(datesStr + newOnlyDescription + lateOnlyDescription + attemptDescription)
}

private[core] def renderJobException(builder: MessageBuilder, taskResult: TaskResult, ex: Throwable): MessageBuilder = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,9 @@ class PipelineStateImpl(implicit conf: Config, notificationBuilder: Notification
runtimeConfig.checkOnlyNewData,
runtimeConfig.checkOnlyLateData,
minRps,
goodRps
goodRps,
runtimeConfig.attempt,
runtimeConfig.maxAttempts
),
startedInstant,
finishedInstant,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,9 @@ object RuntimeConfigFactory {
allowEmptyPipeline: Boolean = false,
alwaysAddBatchIdColumn: Boolean = false,
historicalRunMode: RunMode = RunMode.CheckUpdates,
sparkAppDescriptionTemplate: Option[String] = None): RuntimeConfig = {
sparkAppDescriptionTemplate: Option[String] = None,
attempt: Int = 1,
maxAttempts: Int = 1): RuntimeConfig = {
RuntimeConfig(isDryRun,
isRerun,
runTables,
Expand All @@ -58,7 +60,9 @@ object RuntimeConfigFactory {
allowEmptyPipeline,
alwaysAddBatchIdColumn,
historicalRunMode,
sparkAppDescriptionTemplate)
sparkAppDescriptionTemplate,
attempt,
maxAttempts)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ class RuntimeConfigSuite extends AnyWordSpec {
| is.rerun = true
| inverse.order = true
| run.tables = [ tbl1, tbl2 ]
| attempt = 2
| max.attempts = 5
| }
| undercover = true
| use.lock = false
Expand Down Expand Up @@ -65,6 +67,8 @@ class RuntimeConfigSuite extends AnyWordSpec {
assert(runtimeConfig.parallelTasks == 4)
assert(runtimeConfig.stopSparkSession)
assert(runtimeConfig.sparkAppDescriptionTemplate.contains("Test template"))
assert(runtimeConfig.attempt == 2)
assert(runtimeConfig.maxAttempts == 5)
}

"have default values" in {
Expand All @@ -86,6 +90,8 @@ class RuntimeConfigSuite extends AnyWordSpec {
assert(runtimeConfig.parallelTasks == 1)
assert(!runtimeConfig.stopSparkSession)
assert(runtimeConfig.sparkAppDescriptionTemplate.isEmpty)
assert(runtimeConfig.attempt == 1)
assert(runtimeConfig.maxAttempts == 1)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -684,6 +684,67 @@ class PipelineNotificationBuilderHtmlSuite extends AnyWordSpec with TextComparis
}
}

"renderExecutionInfo" should {
"render execution info without attempt information when maxAttempts is 1" in {
val builder = getBuilder()

val result = builder.renderExecutionInfo(
LocalDate.parse("2022-02-18"),
None,
isRerun = false,
isNewOnly = false,
isLateOnly = false,
attempt = 1,
maxAttempts = 1
)

val paragraph = result.paragraph

assert(paragraph.exists(_.text == "Execution"))
assert(paragraph.exists(_.text == " for "))
assert(paragraph.exists(_.text == "the run date <b>2022-02-18</b>"))
assert(!paragraph.exists(_.text.contains("attempt")))
}

"render execution info with attempt information when maxAttempts is greater than 1" in {
val builder = getBuilder()

val result = builder.renderExecutionInfo(
LocalDate.parse("2022-02-18"),
None,
isRerun = false,
isNewOnly = false,
isLateOnly = false,
attempt = 2,
maxAttempts = 5
)

val paragraph = result.paragraph

assert(paragraph.exists(_.text == "Execution"))
assert(paragraph.exists(_.text.contains("attempt <b>2</b> of <b>5</b>")))
}

"render execution info with attempt and period" in {
val builder = getBuilder()

val result = builder.renderExecutionInfo(
LocalDate.parse("2022-02-18"),
Some(LocalDate.parse("2022-02-25")),
isRerun = true,
isNewOnly = false,
isLateOnly = false,
attempt = 3,
maxAttempts = 10
)

val paragraph = result.paragraph

assert(paragraph.exists(_.text == "Re-run execution"))
assert(paragraph.exists(_.text.contains("attempt <b>3</b> of <b>10</b>")))
}
}

def getBuilder(conf: Config = emptyConfig,
structFailure: Boolean = false): PipelineNotificationBuilderHtml = {
implicit val implicitConfig: Config =
Expand Down
Loading