From 478ef42b697cf4ebf2e4b8dee97418ea0517064b Mon Sep 17 00:00:00 2001
From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com>
Date: Mon, 23 Feb 2026 08:19:45 +0000
Subject: [PATCH 1/3] Initial plan
From 025a7d0a615c38af9f7411e611f8eb72ff7439d8 Mon Sep 17 00:00:00 2001
From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com>
Date: Mon, 23 Feb 2026 08:23:36 +0000
Subject: [PATCH 2/3] Add attempt and maxAttempts configuration support to
RuntimeConfig and RuntimeInfo
Co-authored-by: yruslan <4082463+yruslan@users.noreply.github.com>
---
.../co/absa/pramen/api/status/RuntimeInfo.scala | 4 +++-
pramen/core/src/main/resources/reference.conf | 6 ++++++
.../pramen/core/app/config/RuntimeConfig.scala | 16 +++++++++++++---
.../PipelineNotificationBuilderHtml.scala | 9 ++++++---
.../pramen/core/state/PipelineStateImpl.scala | 4 +++-
5 files changed, 31 insertions(+), 8 deletions(-)
diff --git a/pramen/api/src/main/scala/za/co/absa/pramen/api/status/RuntimeInfo.scala b/pramen/api/src/main/scala/za/co/absa/pramen/api/status/RuntimeInfo.scala
index 28fe8c7c8..9788debaf 100644
--- a/pramen/api/src/main/scala/za/co/absa/pramen/api/status/RuntimeInfo.scala
+++ b/pramen/api/src/main/scala/za/co/absa/pramen/api/status/RuntimeInfo.scala
@@ -30,5 +30,7 @@ case class RuntimeInfo(
isNewOnly: Boolean = false, // If true, the pipeline runs without catching up late data mode.
isLateOnly: Boolean = false, // If true, the pipeline runs in catching up late data only mode.
minRps: Int = 0, // Configured records per second that is considered bad if the actual rps is lower, ignored if 0.
- goodRps: Int = 0 // Configured records per second that is considered very good if the actual rps is higher, ignored if 0.
+ goodRps: Int = 0, // Configured records per second that is considered very good if the actual rps is higher, ignored if 0.
+ attempt: Int = 1, // Current attempt number for the pipeline run (for auto-retry automation).
+ maxAttempts: Int = 1 // Maximum number of attempts allowed for the pipeline run.
)
diff --git a/pramen/core/src/main/resources/reference.conf b/pramen/core/src/main/resources/reference.conf
index db3af7713..e14a5968b 100644
--- a/pramen/core/src/main/resources/reference.conf
+++ b/pramen/core/src/main/resources/reference.conf
@@ -106,6 +106,12 @@ pramen {
# If true, jobs that are already running and hold a lock will be skipped instead of throwing an error
runtime.skip.locked = false
+ # Current attempt number for the pipeline run (for auto-retry automation)
+ runtime.attempt = 1
+
+ # Maximum number of attempts allowed for the pipeline run
+ runtime.max.attempts = 1
+
# Send an email even if there are no changes and no late or not ready data
email.if.no.changes = true
diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/app/config/RuntimeConfig.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/app/config/RuntimeConfig.scala
index 24e634ee5..b886ae849 100644
--- a/pramen/core/src/main/scala/za/co/absa/pramen/core/app/config/RuntimeConfig.scala
+++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/app/config/RuntimeConfig.scala
@@ -45,7 +45,9 @@ case class RuntimeConfig(
allowEmptyPipeline: Boolean,
alwaysAddBatchIdColumn: Boolean,
historicalRunMode: RunMode,
- sparkAppDescriptionTemplate: Option[String]
+ sparkAppDescriptionTemplate: Option[String],
+ attempt: Int, // Current attempt number for the pipeline run (for auto-retry automation)
+ maxAttempts: Int // Maximum number of attempts allowed for the pipeline run
)
object RuntimeConfig {
@@ -72,6 +74,8 @@ object RuntimeConfig {
val ALLOW_EMPTY_PIPELINE = "pramen.allow.empty.pipeline"
val ALWAYS_ADD_BATCHID_COLUMN = "pramen.always.add.batchid.column"
val SPARK_APP_DESCRIPTION_TEMPLATE = "pramen.job.description.template"
+ val ATTEMPT = "pramen.runtime.attempt"
+ val MAX_ATTEMPTS = "pramen.runtime.max.attempts"
def fromConfig(conf: Config): RuntimeConfig = {
val infoDateFormat = conf.getString(INFORMATION_DATE_FORMAT_APP)
@@ -136,6 +140,8 @@ object RuntimeConfig {
val allowEmptyPipeline = ConfigUtils.getOptionBoolean(conf, ALLOW_EMPTY_PIPELINE).getOrElse(false)
val alwaysAddBatchIdColumn = ConfigUtils.getOptionBoolean(conf, ALWAYS_ADD_BATCHID_COLUMN).getOrElse(false)
val sparkAppDescriptionTemplate = ConfigUtils.getOptionString(conf, SPARK_APP_DESCRIPTION_TEMPLATE)
+ val attempt = ConfigUtils.getOptionInt(conf, ATTEMPT).getOrElse(1)
+ val maxAttempts = ConfigUtils.getOptionInt(conf, MAX_ATTEMPTS).getOrElse(1)
RuntimeConfig(
isDryRun = isDryRun,
@@ -155,7 +161,9 @@ object RuntimeConfig {
allowEmptyPipeline,
alwaysAddBatchIdColumn,
runMode,
- sparkAppDescriptionTemplate
+ sparkAppDescriptionTemplate,
+ attempt,
+ maxAttempts
)
}
@@ -178,7 +186,9 @@ object RuntimeConfig {
allowEmptyPipeline = false,
alwaysAddBatchIdColumn = false,
historicalRunMode = RunMode.CheckUpdates,
- sparkAppDescriptionTemplate = None
+ sparkAppDescriptionTemplate = None,
+ attempt = 1,
+ maxAttempts = 1
)
}
}
diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/notify/pipeline/PipelineNotificationBuilderHtml.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/notify/pipeline/PipelineNotificationBuilderHtml.scala
index b96be7117..449bfc24b 100644
--- a/pramen/core/src/main/scala/za/co/absa/pramen/core/notify/pipeline/PipelineNotificationBuilderHtml.scala
+++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/notify/pipeline/PipelineNotificationBuilderHtml.scala
@@ -213,7 +213,7 @@ class PipelineNotificationBuilderHtml(implicit conf: Config) extends PipelineNot
introParagraph.withText(". ")
runtimeInfo.foreach { c =>
- val executionInfoParagraph = renderExecutionInfo(c.runDateFrom, c.runDateTo, c.isRerun, c.isNewOnly, c.isLateOnly)
+ val executionInfoParagraph = renderExecutionInfo(c.runDateFrom, c.runDateTo, c.isRerun, c.isNewOnly, c.isLateOnly, c.attempt, c.maxAttempts)
.withText(". ")
.paragraph
introParagraph
@@ -274,7 +274,9 @@ class PipelineNotificationBuilderHtml(implicit conf: Config) extends PipelineNot
runDateTo: Option[LocalDate],
isRerun: Boolean,
isNewOnly: Boolean,
- isLateOnly: Boolean): ParagraphBuilder = {
+ isLateOnly: Boolean,
+ attempt: Int,
+ maxAttempts: Int): ParagraphBuilder = {
val executionStr = if (isRerun) "Re-run execution" else "Execution"
val datesStr = runDateTo match {
case Some(dateTo) => s"the period from $runDateFrom to $dateTo"
@@ -283,11 +285,12 @@ class PipelineNotificationBuilderHtml(implicit conf: Config) extends PipelineNot
val newOnlyDescription = if (isNewOnly) " (only new data)" else ""
val lateOnlyDescription = if (isLateOnly) " (only late data)" else ""
+ val attemptDescription = if (maxAttempts > 1) s" (attempt $attempt of $maxAttempts)" else ""
ParagraphBuilder()
.withText(executionStr)
.withText(" for ")
- .withText(datesStr + newOnlyDescription + lateOnlyDescription)
+ .withText(datesStr + newOnlyDescription + lateOnlyDescription + attemptDescription)
}
private[core] def renderJobException(builder: MessageBuilder, taskResult: TaskResult, ex: Throwable): MessageBuilder = {
diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/state/PipelineStateImpl.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/state/PipelineStateImpl.scala
index 5f0e79551..e2c9153e9 100644
--- a/pramen/core/src/main/scala/za/co/absa/pramen/core/state/PipelineStateImpl.scala
+++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/state/PipelineStateImpl.scala
@@ -129,7 +129,9 @@ class PipelineStateImpl(implicit conf: Config, notificationBuilder: Notification
runtimeConfig.checkOnlyNewData,
runtimeConfig.checkOnlyLateData,
minRps,
- goodRps
+ goodRps,
+ runtimeConfig.attempt,
+ runtimeConfig.maxAttempts
),
startedInstant,
finishedInstant,
From fd27f559b6ddd6767f0559882da2e092f3098565 Mon Sep 17 00:00:00 2001
From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com>
Date: Mon, 23 Feb 2026 08:32:21 +0000
Subject: [PATCH 3/3] Add tests for attempt configuration and notification
rendering
Co-authored-by: yruslan <4082463+yruslan@users.noreply.github.com>
---
.../pramen/core/RuntimeConfigFactory.scala | 8 ++-
.../core/app/config/RuntimeConfigSuite.scala | 6 ++
...PipelineNotificationBuilderHtmlSuite.scala | 61 +++++++++++++++++++
3 files changed, 73 insertions(+), 2 deletions(-)
diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/RuntimeConfigFactory.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/RuntimeConfigFactory.scala
index fd185bb33..80d361588 100644
--- a/pramen/core/src/test/scala/za/co/absa/pramen/core/RuntimeConfigFactory.scala
+++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/RuntimeConfigFactory.scala
@@ -40,7 +40,9 @@ object RuntimeConfigFactory {
allowEmptyPipeline: Boolean = false,
alwaysAddBatchIdColumn: Boolean = false,
historicalRunMode: RunMode = RunMode.CheckUpdates,
- sparkAppDescriptionTemplate: Option[String] = None): RuntimeConfig = {
+ sparkAppDescriptionTemplate: Option[String] = None,
+ attempt: Int = 1,
+ maxAttempts: Int = 1): RuntimeConfig = {
RuntimeConfig(isDryRun,
isRerun,
runTables,
@@ -58,7 +60,9 @@ object RuntimeConfigFactory {
allowEmptyPipeline,
alwaysAddBatchIdColumn,
historicalRunMode,
- sparkAppDescriptionTemplate)
+ sparkAppDescriptionTemplate,
+ attempt,
+ maxAttempts)
}
}
diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/app/config/RuntimeConfigSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/app/config/RuntimeConfigSuite.scala
index d590e8845..c8b39afa4 100644
--- a/pramen/core/src/test/scala/za/co/absa/pramen/core/app/config/RuntimeConfigSuite.scala
+++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/app/config/RuntimeConfigSuite.scala
@@ -31,6 +31,8 @@ class RuntimeConfigSuite extends AnyWordSpec {
| is.rerun = true
| inverse.order = true
| run.tables = [ tbl1, tbl2 ]
+ | attempt = 2
+ | max.attempts = 5
| }
| undercover = true
| use.lock = false
@@ -65,6 +67,8 @@ class RuntimeConfigSuite extends AnyWordSpec {
assert(runtimeConfig.parallelTasks == 4)
assert(runtimeConfig.stopSparkSession)
assert(runtimeConfig.sparkAppDescriptionTemplate.contains("Test template"))
+ assert(runtimeConfig.attempt == 2)
+ assert(runtimeConfig.maxAttempts == 5)
}
"have default values" in {
@@ -86,6 +90,8 @@ class RuntimeConfigSuite extends AnyWordSpec {
assert(runtimeConfig.parallelTasks == 1)
assert(!runtimeConfig.stopSparkSession)
assert(runtimeConfig.sparkAppDescriptionTemplate.isEmpty)
+ assert(runtimeConfig.attempt == 1)
+ assert(runtimeConfig.maxAttempts == 1)
}
}
diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/notify/pipeline/PipelineNotificationBuilderHtmlSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/notify/pipeline/PipelineNotificationBuilderHtmlSuite.scala
index 40c9922af..c7f6cce74 100644
--- a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/notify/pipeline/PipelineNotificationBuilderHtmlSuite.scala
+++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/notify/pipeline/PipelineNotificationBuilderHtmlSuite.scala
@@ -684,6 +684,67 @@ class PipelineNotificationBuilderHtmlSuite extends AnyWordSpec with TextComparis
}
}
+ "renderExecutionInfo" should {
+ "render execution info without attempt information when maxAttempts is 1" in {
+ val builder = getBuilder()
+
+ val result = builder.renderExecutionInfo(
+ LocalDate.parse("2022-02-18"),
+ None,
+ isRerun = false,
+ isNewOnly = false,
+ isLateOnly = false,
+ attempt = 1,
+ maxAttempts = 1
+ )
+
+ val paragraph = result.paragraph
+
+ assert(paragraph.exists(_.text == "Execution"))
+ assert(paragraph.exists(_.text == " for "))
+ assert(paragraph.exists(_.text == "the run date 2022-02-18"))
+ assert(!paragraph.exists(_.text.contains("attempt")))
+ }
+
+ "render execution info with attempt information when maxAttempts is greater than 1" in {
+ val builder = getBuilder()
+
+ val result = builder.renderExecutionInfo(
+ LocalDate.parse("2022-02-18"),
+ None,
+ isRerun = false,
+ isNewOnly = false,
+ isLateOnly = false,
+ attempt = 2,
+ maxAttempts = 5
+ )
+
+ val paragraph = result.paragraph
+
+ assert(paragraph.exists(_.text == "Execution"))
+ assert(paragraph.exists(_.text.contains("attempt 2 of 5")))
+ }
+
+ "render execution info with attempt and period" in {
+ val builder = getBuilder()
+
+ val result = builder.renderExecutionInfo(
+ LocalDate.parse("2022-02-18"),
+ Some(LocalDate.parse("2022-02-25")),
+ isRerun = true,
+ isNewOnly = false,
+ isLateOnly = false,
+ attempt = 3,
+ maxAttempts = 10
+ )
+
+ val paragraph = result.paragraph
+
+ assert(paragraph.exists(_.text == "Re-run execution"))
+ assert(paragraph.exists(_.text.contains("attempt 3 of 10")))
+ }
+ }
+
def getBuilder(conf: Config = emptyConfig,
structFailure: Boolean = false): PipelineNotificationBuilderHtml = {
implicit val implicitConfig: Config =