diff --git a/pramen/api/src/main/scala/za/co/absa/pramen/api/status/RuntimeInfo.scala b/pramen/api/src/main/scala/za/co/absa/pramen/api/status/RuntimeInfo.scala index 28fe8c7c..9788deba 100644 --- a/pramen/api/src/main/scala/za/co/absa/pramen/api/status/RuntimeInfo.scala +++ b/pramen/api/src/main/scala/za/co/absa/pramen/api/status/RuntimeInfo.scala @@ -30,5 +30,7 @@ case class RuntimeInfo( isNewOnly: Boolean = false, // If true, the pipeline runs without catching up late data mode. isLateOnly: Boolean = false, // If true, the pipeline runs in catching up late data only mode. minRps: Int = 0, // Configured records per second that is considered bad if the actual rps is lower, ignored if 0. - goodRps: Int = 0 // Configured records per second that is considered very good if the actual rps is higher, ignored if 0. + goodRps: Int = 0, // Configured records per second that is considered very good if the actual rps is higher, ignored if 0. + attempt: Int = 1, // Current attempt number for the pipeline run (for auto-retry automation). + maxAttempts: Int = 1 // Maximum number of attempts allowed for the pipeline run. ) diff --git a/pramen/core/src/main/resources/reference.conf b/pramen/core/src/main/resources/reference.conf index db3af771..e14a5968 100644 --- a/pramen/core/src/main/resources/reference.conf +++ b/pramen/core/src/main/resources/reference.conf @@ -106,6 +106,12 @@ pramen { # If true, jobs that are already running and hold a lock will be skipped instead of throwing an error runtime.skip.locked = false + # Current attempt number for the pipeline run (for auto-retry automation) + runtime.attempt = 1 + + # Maximum number of attempts allowed for the pipeline run + runtime.max.attempts = 1 + # Send an email even if there are no changes and no late or not ready data email.if.no.changes = true diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/app/config/RuntimeConfig.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/app/config/RuntimeConfig.scala index 24e634ee..b886ae84 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/app/config/RuntimeConfig.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/app/config/RuntimeConfig.scala @@ -45,7 +45,9 @@ case class RuntimeConfig( allowEmptyPipeline: Boolean, alwaysAddBatchIdColumn: Boolean, historicalRunMode: RunMode, - sparkAppDescriptionTemplate: Option[String] + sparkAppDescriptionTemplate: Option[String], + attempt: Int, // Current attempt number for the pipeline run (for auto-retry automation) + maxAttempts: Int // Maximum number of attempts allowed for the pipeline run ) object RuntimeConfig { @@ -72,6 +74,8 @@ object RuntimeConfig { val ALLOW_EMPTY_PIPELINE = "pramen.allow.empty.pipeline" val ALWAYS_ADD_BATCHID_COLUMN = "pramen.always.add.batchid.column" val SPARK_APP_DESCRIPTION_TEMPLATE = "pramen.job.description.template" + val ATTEMPT = "pramen.runtime.attempt" + val MAX_ATTEMPTS = "pramen.runtime.max.attempts" def fromConfig(conf: Config): RuntimeConfig = { val infoDateFormat = conf.getString(INFORMATION_DATE_FORMAT_APP) @@ -136,6 +140,8 @@ object RuntimeConfig { val allowEmptyPipeline = ConfigUtils.getOptionBoolean(conf, ALLOW_EMPTY_PIPELINE).getOrElse(false) val alwaysAddBatchIdColumn = ConfigUtils.getOptionBoolean(conf, ALWAYS_ADD_BATCHID_COLUMN).getOrElse(false) val sparkAppDescriptionTemplate = ConfigUtils.getOptionString(conf, SPARK_APP_DESCRIPTION_TEMPLATE) + val attempt = ConfigUtils.getOptionInt(conf, ATTEMPT).getOrElse(1) + val maxAttempts = ConfigUtils.getOptionInt(conf, MAX_ATTEMPTS).getOrElse(1) RuntimeConfig( isDryRun = isDryRun, @@ -155,7 +161,9 @@ object RuntimeConfig { allowEmptyPipeline, alwaysAddBatchIdColumn, runMode, - sparkAppDescriptionTemplate + sparkAppDescriptionTemplate, + attempt, + maxAttempts ) } @@ -178,7 +186,9 @@ object RuntimeConfig { allowEmptyPipeline = false, alwaysAddBatchIdColumn = false, historicalRunMode = RunMode.CheckUpdates, - sparkAppDescriptionTemplate = None + sparkAppDescriptionTemplate = None, + attempt = 1, + maxAttempts = 1 ) } } diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/notify/pipeline/PipelineNotificationBuilderHtml.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/notify/pipeline/PipelineNotificationBuilderHtml.scala index b96be711..449bfc24 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/notify/pipeline/PipelineNotificationBuilderHtml.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/notify/pipeline/PipelineNotificationBuilderHtml.scala @@ -213,7 +213,7 @@ class PipelineNotificationBuilderHtml(implicit conf: Config) extends PipelineNot introParagraph.withText(". ") runtimeInfo.foreach { c => - val executionInfoParagraph = renderExecutionInfo(c.runDateFrom, c.runDateTo, c.isRerun, c.isNewOnly, c.isLateOnly) + val executionInfoParagraph = renderExecutionInfo(c.runDateFrom, c.runDateTo, c.isRerun, c.isNewOnly, c.isLateOnly, c.attempt, c.maxAttempts) .withText(". ") .paragraph introParagraph @@ -274,7 +274,9 @@ class PipelineNotificationBuilderHtml(implicit conf: Config) extends PipelineNot runDateTo: Option[LocalDate], isRerun: Boolean, isNewOnly: Boolean, - isLateOnly: Boolean): ParagraphBuilder = { + isLateOnly: Boolean, + attempt: Int, + maxAttempts: Int): ParagraphBuilder = { val executionStr = if (isRerun) "Re-run execution" else "Execution" val datesStr = runDateTo match { case Some(dateTo) => s"the period from $runDateFrom to $dateTo" @@ -283,11 +285,12 @@ class PipelineNotificationBuilderHtml(implicit conf: Config) extends PipelineNot val newOnlyDescription = if (isNewOnly) " (only new data)" else "" val lateOnlyDescription = if (isLateOnly) " (only late data)" else "" + val attemptDescription = if (maxAttempts > 1) s" (attempt $attempt of $maxAttempts)" else "" ParagraphBuilder() .withText(executionStr) .withText(" for ") - .withText(datesStr + newOnlyDescription + lateOnlyDescription) + .withText(datesStr + newOnlyDescription + lateOnlyDescription + attemptDescription) } private[core] def renderJobException(builder: MessageBuilder, taskResult: TaskResult, ex: Throwable): MessageBuilder = { diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/state/PipelineStateImpl.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/state/PipelineStateImpl.scala index 5f0e7955..e2c9153e 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/state/PipelineStateImpl.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/state/PipelineStateImpl.scala @@ -129,7 +129,9 @@ class PipelineStateImpl(implicit conf: Config, notificationBuilder: Notification runtimeConfig.checkOnlyNewData, runtimeConfig.checkOnlyLateData, minRps, - goodRps + goodRps, + runtimeConfig.attempt, + runtimeConfig.maxAttempts ), startedInstant, finishedInstant, diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/RuntimeConfigFactory.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/RuntimeConfigFactory.scala index fd185bb3..80d36158 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/RuntimeConfigFactory.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/RuntimeConfigFactory.scala @@ -40,7 +40,9 @@ object RuntimeConfigFactory { allowEmptyPipeline: Boolean = false, alwaysAddBatchIdColumn: Boolean = false, historicalRunMode: RunMode = RunMode.CheckUpdates, - sparkAppDescriptionTemplate: Option[String] = None): RuntimeConfig = { + sparkAppDescriptionTemplate: Option[String] = None, + attempt: Int = 1, + maxAttempts: Int = 1): RuntimeConfig = { RuntimeConfig(isDryRun, isRerun, runTables, @@ -58,7 +60,9 @@ object RuntimeConfigFactory { allowEmptyPipeline, alwaysAddBatchIdColumn, historicalRunMode, - sparkAppDescriptionTemplate) + sparkAppDescriptionTemplate, + attempt, + maxAttempts) } } diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/app/config/RuntimeConfigSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/app/config/RuntimeConfigSuite.scala index d590e884..c8b39afa 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/app/config/RuntimeConfigSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/app/config/RuntimeConfigSuite.scala @@ -31,6 +31,8 @@ class RuntimeConfigSuite extends AnyWordSpec { | is.rerun = true | inverse.order = true | run.tables = [ tbl1, tbl2 ] + | attempt = 2 + | max.attempts = 5 | } | undercover = true | use.lock = false @@ -65,6 +67,8 @@ class RuntimeConfigSuite extends AnyWordSpec { assert(runtimeConfig.parallelTasks == 4) assert(runtimeConfig.stopSparkSession) assert(runtimeConfig.sparkAppDescriptionTemplate.contains("Test template")) + assert(runtimeConfig.attempt == 2) + assert(runtimeConfig.maxAttempts == 5) } "have default values" in { @@ -86,6 +90,8 @@ class RuntimeConfigSuite extends AnyWordSpec { assert(runtimeConfig.parallelTasks == 1) assert(!runtimeConfig.stopSparkSession) assert(runtimeConfig.sparkAppDescriptionTemplate.isEmpty) + assert(runtimeConfig.attempt == 1) + assert(runtimeConfig.maxAttempts == 1) } } diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/notify/pipeline/PipelineNotificationBuilderHtmlSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/notify/pipeline/PipelineNotificationBuilderHtmlSuite.scala index 40c9922a..c7f6cce7 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/notify/pipeline/PipelineNotificationBuilderHtmlSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/notify/pipeline/PipelineNotificationBuilderHtmlSuite.scala @@ -684,6 +684,67 @@ class PipelineNotificationBuilderHtmlSuite extends AnyWordSpec with TextComparis } } + "renderExecutionInfo" should { + "render execution info without attempt information when maxAttempts is 1" in { + val builder = getBuilder() + + val result = builder.renderExecutionInfo( + LocalDate.parse("2022-02-18"), + None, + isRerun = false, + isNewOnly = false, + isLateOnly = false, + attempt = 1, + maxAttempts = 1 + ) + + val paragraph = result.paragraph + + assert(paragraph.exists(_.text == "Execution")) + assert(paragraph.exists(_.text == " for ")) + assert(paragraph.exists(_.text == "the run date 2022-02-18")) + assert(!paragraph.exists(_.text.contains("attempt"))) + } + + "render execution info with attempt information when maxAttempts is greater than 1" in { + val builder = getBuilder() + + val result = builder.renderExecutionInfo( + LocalDate.parse("2022-02-18"), + None, + isRerun = false, + isNewOnly = false, + isLateOnly = false, + attempt = 2, + maxAttempts = 5 + ) + + val paragraph = result.paragraph + + assert(paragraph.exists(_.text == "Execution")) + assert(paragraph.exists(_.text.contains("attempt 2 of 5"))) + } + + "render execution info with attempt and period" in { + val builder = getBuilder() + + val result = builder.renderExecutionInfo( + LocalDate.parse("2022-02-18"), + Some(LocalDate.parse("2022-02-25")), + isRerun = true, + isNewOnly = false, + isLateOnly = false, + attempt = 3, + maxAttempts = 10 + ) + + val paragraph = result.paragraph + + assert(paragraph.exists(_.text == "Re-run execution")) + assert(paragraph.exists(_.text.contains("attempt 3 of 10"))) + } + } + def getBuilder(conf: Config = emptyConfig, structFailure: Boolean = false): PipelineNotificationBuilderHtml = { implicit val implicitConfig: Config =