From f30e33729039f01fd8c45d27c178e49ded96154a Mon Sep 17 00:00:00 2001 From: Kousuke Saruta Date: Thu, 26 Mar 2026 14:21:33 +0900 Subject: [PATCH 1/3] Support disabling initial/periodic scan for specific URI schemes in SHS --- .../deploy/history/FsHistoryProvider.scala | 19 ++++- .../spark/internal/config/History.scala | 10 +++ .../history/FsHistoryProviderSuite.scala | 79 +++++++++++++++++++ docs/monitoring.md | 14 ++++ 4 files changed, 120 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index 54bc290b3787f..6bcb9cc5519e4 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -19,7 +19,7 @@ package org.apache.spark.deploy.history import java.io.{File, FileNotFoundException, IOException} import java.lang.{Long => JLong} -import java.util.{Date, NoSuchElementException, ServiceLoader} +import java.util.{Date, Locale, NoSuchElementException, ServiceLoader} import java.util.concurrent.{ConcurrentHashMap, ExecutorService, TimeUnit} import java.util.zip.ZipOutputStream @@ -109,6 +109,9 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) private val logDirs = conf.get(History.HISTORY_LOG_DIR) .split(",").map(_.trim).filter(_.nonEmpty).toSeq + private val scanDisabledSchemes = conf.get(History.SCAN_DISABLED_SCHEMES) + .split(",").map(_.trim.toLowerCase(Locale.ROOT)).filter(_.nonEmpty).toSet + private val historyUiAclsEnable = conf.get(History.HISTORY_SERVER_UI_ACLS_ENABLE) private val historyUiAdminAcls = conf.get(History.HISTORY_SERVER_UI_ADMIN_ACLS) private val historyUiAdminAclsGroups = conf.get(History.HISTORY_SERVER_UI_ADMIN_ACLS_GROUPS) @@ -561,10 +564,18 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) private[history] def checkForLogs(): Unit = { val newLastScanTime = clock.getTimeMillis() val allNotStale = mutable.HashSet[String]() + val skippedDirs = mutable.ArrayBuffer[String]() logDirs.foreach { dir => try { - checkForLogsInDir(dir, newLastScanTime, allNotStale) + val scheme = logDirFs(dir).getUri.getScheme.toLowerCase(Locale.ROOT) + if (scanDisabledSchemes.contains(scheme)) { + logDebug(log"Skipping scan for directory ${MDC(HISTORY_DIR, dir)}" + + log" (scan disabled for its URI scheme)") + skippedDirs += dir + } else { + checkForLogsInDir(dir, newLastScanTime, allNotStale) + } } catch { case e: IOException => logError(log"Error checking for logs in directory ${MDC(HISTORY_DIR, dir)}", e) @@ -574,6 +585,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) // Delete all information about applications whose log files disappeared from storage. // This is done after scanning ALL directories to avoid incorrectly marking entries from // other directories as stale. + // Entries from scan-disabled directories are excluded from stale detection because + // they are never scanned and their lastProcessed time is never updated. val stale = listing.synchronized { KVUtils.viewToSeq(listing.view(classOf[LogInfo]) .index("lastProcessed") @@ -581,6 +594,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) } stale.filterNot(isProcessing) .filterNot(info => allNotStale.contains(info.logPath)) + .filterNot(info => skippedDirs.exists(dir => + logDirForPath(new Path(info.logPath)) == dir)) .foreach { log => log.appId.foreach { appId => cleanAppData(appId, log.attemptId, log.logPath) diff --git a/core/src/main/scala/org/apache/spark/internal/config/History.scala b/core/src/main/scala/org/apache/spark/internal/config/History.scala index e12f0b8eeaad0..bc635554c9d01 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/History.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/History.scala @@ -43,6 +43,16 @@ private[spark] object History { .stringConf .createOptional + val SCAN_DISABLED_SCHEMES = ConfigBuilder("spark.history.fs.update.disabledSchemes") + .doc("Comma-separated list of URI schemes for which periodic log directory scanning is " + + "disabled. Directories with these schemes rely on on-demand loading " + + "instead of scanning. Applications in these directories will not appear in the " + + "listing until accessed by appId. Log compaction and cleaner may not fully function " + + "for these directories; use external lifecycle management (e.g., S3 Lifecycle Policies).") + .version("4.2.0") + .stringConf + .createWithDefault("") + val SAFEMODE_CHECK_INTERVAL_S = ConfigBuilder("spark.history.fs.safemodeCheck.interval") .version("1.6.0") .doc("Interval between HDFS safemode checks for the event log directory") diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala index 7c76b50b07acb..9796d82bc79b5 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala @@ -2381,6 +2381,85 @@ abstract class FsHistoryProviderSuite extends SparkFunSuite with Matchers with P } } + test("scan disabled for file scheme skips scanning but on-demand loading still works") { + withTempDir { dir => + val conf = createTestConf(true) + conf.set(HISTORY_LOG_DIR, dir.getAbsolutePath) + conf.set(SCAN_DISABLED_SCHEMES, "file") + conf.set(EVENT_LOG_ROLLING_ON_DEMAND_LOAD_ENABLED, true) + val hadoopConf = SparkHadoopUtil.newConfiguration(conf) + val provider = new FsHistoryProvider(conf) + + // Write a rolling event log + val writer = new RollingEventLogFilesWriter("app1", None, dir.toURI, conf, hadoopConf) + writer.start() + writeEventsToRollingWriter(writer, Seq( + SparkListenerApplicationStart("app1", Some("app1"), 0, "user", None), + SparkListenerJobStart(1, 0, Seq.empty)), rollFile = false) + writer.stop() + + // Scan should skip this directory -- listing remains empty + provider.checkForLogs() + assert(provider.getListing().length === 0) + + // On-demand loading should still work + assert(provider.getAppUI("app1", None).isDefined) + assert(provider.getListing().length === 1) + + // Subsequent scan should NOT remove the on-demand loaded app (stale protection) + provider.checkForLogs() + assert(provider.getListing().length === 1) + + provider.stop() + } + } + + test("scan disabled schemes do not affect directories with non-matching schemes") { + withTempDir { dir => + val conf = createTestConf(true) + conf.set(HISTORY_LOG_DIR, dir.getAbsolutePath) + // Disable scanning for s3a/gs -- should not affect local file:// directories + conf.set(SCAN_DISABLED_SCHEMES, "s3a,gs") + val provider = new FsHistoryProvider(conf) + val hadoopConf = SparkHadoopUtil.newConfiguration(conf) + + val writer = new RollingEventLogFilesWriter("app1", None, dir.toURI, conf, hadoopConf) + writer.start() + writeEventsToRollingWriter(writer, Seq( + SparkListenerApplicationStart("app1", Some("app1"), 0, "user", None), + SparkListenerJobStart(1, 0, Seq.empty)), rollFile = false) + writer.stop() + + // file:// scheme is not disabled, so scanning should work normally + provider.checkForLogs() + assert(provider.getListing().length === 1) + + provider.stop() + } + } + + test("scan disabled with empty config does not disable any scheme") { + withTempDir { dir => + val conf = createTestConf(true) + conf.set(HISTORY_LOG_DIR, dir.getAbsolutePath) + conf.set(SCAN_DISABLED_SCHEMES, "") + val provider = new FsHistoryProvider(conf) + val hadoopConf = SparkHadoopUtil.newConfiguration(conf) + + val writer = new RollingEventLogFilesWriter("app1", None, dir.toURI, conf, hadoopConf) + writer.start() + writeEventsToRollingWriter(writer, Seq( + SparkListenerApplicationStart("app1", Some("app1"), 0, "user", None), + SparkListenerJobStart(1, 0, Seq.empty)), rollFile = false) + writer.stop() + + provider.checkForLogs() + assert(provider.getListing().length === 1) + + provider.stop() + } + } + private class SafeModeTestProvider(conf: SparkConf, clock: Clock) extends FsHistoryProvider(conf, clock) { diff --git a/docs/monitoring.md b/docs/monitoring.md index bd9a8b134dc3f..0cdb5a28e8a38 100644 --- a/docs/monitoring.md +++ b/docs/monitoring.md @@ -207,6 +207,20 @@ Security options for the Spark History Server are covered more detail in the 1.4.0 + + spark.history.fs.update.disabledSchemes + (none) + + Comma-separated list of URI schemes for which periodic log directory scanning is disabled + (e.g., s3a,gs). Directories with these schemes rely on on-demand loading + (SPARK-52914, rolling event + logs only) instead of scanning. Applications in these directories will not appear in the + listing until accessed by appId. Log compaction and the cleaner may not fully function for + these directories; use external lifecycle management (e.g., S3 Lifecycle Policies, GCS Object + Lifecycle Management) instead. + + 4.2.0 + spark.history.retainedApplications 50 From ace6f10887c696c6cf6b90c7aa7db87d9d4ce0dd Mon Sep 17 00:00:00 2001 From: Kousuke Saruta Date: Thu, 26 Mar 2026 16:25:37 +0900 Subject: [PATCH 2/3] Use regex instead of URI scheme --- .../deploy/history/FsHistoryProvider.scala | 61 ++++++++---- .../spark/internal/config/History.scala | 18 ++-- .../history/FsHistoryProviderSuite.scala | 95 +++++++++++++++++-- docs/monitoring.md | 17 ++-- .../configs-without-binding-policy-exceptions | 1 + 5 files changed, 155 insertions(+), 37 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index 6bcb9cc5519e4..a14cfab73af55 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -19,7 +19,7 @@ package org.apache.spark.deploy.history import java.io.{File, FileNotFoundException, IOException} import java.lang.{Long => JLong} -import java.util.{Date, Locale, NoSuchElementException, ServiceLoader} +import java.util.{Date, NoSuchElementException, ServiceLoader} import java.util.concurrent.{ConcurrentHashMap, ExecutorService, TimeUnit} import java.util.zip.ZipOutputStream @@ -109,8 +109,19 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) private val logDirs = conf.get(History.HISTORY_LOG_DIR) .split(",").map(_.trim).filter(_.nonEmpty).toSeq - private val scanDisabledSchemes = conf.get(History.SCAN_DISABLED_SCHEMES) - .split(",").map(_.trim.toLowerCase(Locale.ROOT)).filter(_.nonEmpty).toSet + private val scanDisabledPathPatterns = conf.get(History.SCAN_DISABLED_PATH_PATTERNS) + .map(_.r) + + /** Check if scanning is disabled for a directory by matching its path against patterns. */ + private def isScanDisabled(dir: String): Boolean = { + if (scanDisabledPathPatterns.isEmpty) return false + val qualifiedPath = { + val path = new Path(dir) + val dirFs = logDirFs(dir) + path.makeQualified(dirFs.getUri, dirFs.getWorkingDirectory).toString + } + scanDisabledPathPatterns.exists(_.pattern.matcher(qualifiedPath).matches()) + } private val historyUiAclsEnable = conf.get(History.HISTORY_SERVER_UI_ACLS_ENABLE) private val historyUiAdminAcls = conf.get(History.HISTORY_SERVER_UI_ADMIN_ACLS) @@ -457,16 +468,35 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) private def loadFromFallbackLocation(appId: String, attemptId: Option[String], logPath: String) : ApplicationInfoWrapper = { - val date = new Date(0) - val lastUpdate = new Date() - val (logSourceName, logSourceFullPath) = getLogDirInfo(logPath) - val info = ApplicationAttemptInfo( - attemptId, date, date, lastUpdate, 0, "spark", false, "unknown", - Some(logSourceName), Some(logSourceFullPath)) - addListing(new ApplicationInfoWrapper( - ApplicationInfo(appId, appId, None, None, None, None, List.empty), - List(new AttemptInfoWrapper(info, logPath, 0, Some(1), None, None, None, None, - logSourceName, logSourceFullPath)))) + // Resolve the full path to find which directory actually contains this log. + // logPath is a relative name (e.g., "eventlog_v2_app1"), so we need to scan + // all directories to find the correct one. + val (dirFs, fullPath) = resolveLogPath(logPath, "") + val dir = logDirForPath(fullPath) + + if (isScanDisabled(dir)) { + // For scan-disabled directories, run mergeApplicationListing to populate accurate + // metadata since periodic scanning will never update the dummy entry. + try { + EventLogFileReader(dirFs, dirFs.getFileStatus(fullPath)).foreach { reader => + mergeApplicationListing(reader, clock.getTimeMillis(), enableOptimizations = true) + } + } catch { + case _: FileNotFoundException => + throw new NoSuchElementException(s"Log not found for appId: $appId") + } + } else { + val date = new Date(0) + val lastUpdate = new Date() + val (logSourceName, logSourceFullPath) = getLogDirInfo(logPath) + val info = ApplicationAttemptInfo( + attemptId, date, date, lastUpdate, 0, "spark", false, "unknown", + Some(logSourceName), Some(logSourceFullPath)) + addListing(new ApplicationInfoWrapper( + ApplicationInfo(appId, appId, None, None, None, None, List.empty), + List(new AttemptInfoWrapper(info, logPath, 0, Some(1), None, None, None, None, + logSourceName, logSourceFullPath)))) + } load(appId) } @@ -568,10 +598,9 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) logDirs.foreach { dir => try { - val scheme = logDirFs(dir).getUri.getScheme.toLowerCase(Locale.ROOT) - if (scanDisabledSchemes.contains(scheme)) { + if (isScanDisabled(dir)) { logDebug(log"Skipping scan for directory ${MDC(HISTORY_DIR, dir)}" + - log" (scan disabled for its URI scheme)") + log" (scan disabled for this directory)") skippedDirs += dir } else { checkForLogsInDir(dir, newLastScanTime, allNotStale) diff --git a/core/src/main/scala/org/apache/spark/internal/config/History.scala b/core/src/main/scala/org/apache/spark/internal/config/History.scala index bc635554c9d01..60033e6122f95 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/History.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/History.scala @@ -43,15 +43,19 @@ private[spark] object History { .stringConf .createOptional - val SCAN_DISABLED_SCHEMES = ConfigBuilder("spark.history.fs.update.disabledSchemes") - .doc("Comma-separated list of URI schemes for which periodic log directory scanning is " + - "disabled. Directories with these schemes rely on on-demand loading " + - "instead of scanning. Applications in these directories will not appear in the " + - "listing until accessed by appId. Log compaction and cleaner may not fully function " + - "for these directories; use external lifecycle management (e.g., S3 Lifecycle Policies).") + val SCAN_DISABLED_PATH_PATTERNS = + ConfigBuilder("spark.history.fs.update.scanDisabledPathPatterns") + .doc("Comma-separated list of regular expressions matched against log directory paths. " + + "Directories whose full path matches any pattern will not be scanned periodically. " + + "Applications in these directories rely on on-demand loading instead of scanning and " + + "will not appear in the listing until accessed by appId. When accessed, accurate metadata " + + "is populated immediately. Logs that are never accessed are not subject to the cleaner; " + + "use external lifecycle management (e.g., S3 Lifecycle Policies) for those. " + + "Example: \"s3a://.*,gs://.*\" disables scanning for all S3 and GCS directories.") .version("4.2.0") .stringConf - .createWithDefault("") + .toSequence + .createWithDefault(Nil) val SAFEMODE_CHECK_INTERVAL_S = ConfigBuilder("spark.history.fs.safemodeCheck.interval") .version("1.6.0") diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala index 9796d82bc79b5..9d3bd258956c5 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala @@ -2381,11 +2381,11 @@ abstract class FsHistoryProviderSuite extends SparkFunSuite with Matchers with P } } - test("scan disabled for file scheme skips scanning but on-demand loading still works") { + test("SPARK-56234: Skips scanning but on-demand loading still works") { withTempDir { dir => val conf = createTestConf(true) conf.set(HISTORY_LOG_DIR, dir.getAbsolutePath) - conf.set(SCAN_DISABLED_SCHEMES, "file") + conf.set(SCAN_DISABLED_PATH_PATTERNS, Seq("file:.*")) conf.set(EVENT_LOG_ROLLING_ON_DEMAND_LOAD_ENABLED, true) val hadoopConf = SparkHadoopUtil.newConfiguration(conf) val provider = new FsHistoryProvider(conf) @@ -2406,6 +2406,12 @@ abstract class FsHistoryProviderSuite extends SparkFunSuite with Matchers with P assert(provider.getAppUI("app1", None).isDefined) assert(provider.getListing().length === 1) + // Metadata should be accurate (not dummy) since mergeApplicationListing runs + // for scan-disabled directories + val appInfo = provider.getListing().next() + assert(appInfo.name === "app1") + assert(appInfo.attempts.head.appSparkVersion !== "unknown") + // Subsequent scan should NOT remove the on-demand loaded app (stale protection) provider.checkForLogs() assert(provider.getListing().length === 1) @@ -2414,12 +2420,12 @@ abstract class FsHistoryProviderSuite extends SparkFunSuite with Matchers with P } } - test("scan disabled schemes do not affect directories with non-matching schemes") { + test("SPARK-56234: Scan disabled schemes do not affect directories with non-matching schemes") { withTempDir { dir => val conf = createTestConf(true) conf.set(HISTORY_LOG_DIR, dir.getAbsolutePath) // Disable scanning for s3a/gs -- should not affect local file:// directories - conf.set(SCAN_DISABLED_SCHEMES, "s3a,gs") + conf.set(SCAN_DISABLED_PATH_PATTERNS, Seq("s3a://.*", "gs://.*")) val provider = new FsHistoryProvider(conf) val hadoopConf = SparkHadoopUtil.newConfiguration(conf) @@ -2438,11 +2444,11 @@ abstract class FsHistoryProviderSuite extends SparkFunSuite with Matchers with P } } - test("scan disabled with empty config does not disable any scheme") { + test("SPARK-56234: Scan disabled with empty config does not disable any scheme") { withTempDir { dir => val conf = createTestConf(true) conf.set(HISTORY_LOG_DIR, dir.getAbsolutePath) - conf.set(SCAN_DISABLED_SCHEMES, "") + conf.set(SCAN_DISABLED_PATH_PATTERNS, Seq.empty[String]) val provider = new FsHistoryProvider(conf) val hadoopConf = SparkHadoopUtil.newConfiguration(conf) @@ -2460,6 +2466,83 @@ abstract class FsHistoryProviderSuite extends SparkFunSuite with Matchers with P } } + test("SPARK-56234: On-demand loading populates accurate metadata via mergeApplicationListing") { + withTempDir { dir => + val conf = createTestConf(true) + conf.set(HISTORY_LOG_DIR, dir.getAbsolutePath) + conf.set(SCAN_DISABLED_PATH_PATTERNS, Seq("file:.*")) + conf.set(EVENT_LOG_ROLLING_ON_DEMAND_LOAD_ENABLED, true) + val hadoopConf = SparkHadoopUtil.newConfiguration(conf) + val provider = new FsHistoryProvider(conf) + + val writer = new RollingEventLogFilesWriter("app1", None, dir.toURI, conf, hadoopConf) + writer.start() + writeEventsToRollingWriter(writer, Seq( + SparkListenerApplicationStart("app1", Some("app1"), 1000, "testuser", None), + SparkListenerJobStart(1, 0, Seq.empty), + SparkListenerApplicationEnd(5000)), rollFile = false) + writer.stop() + + // On-demand load without prior scan + assert(provider.getAppUI("app1", None).isDefined) + + // Listing should have accurate metadata from mergeApplicationListing, + // not dummy values (sparkVersion="unknown", sparkUser="spark", etc.) + val appInfo = provider.getListing().next() + assert(appInfo.name === "app1") + assert(appInfo.attempts.head.appSparkVersion !== "unknown") + assert(appInfo.attempts.head.sparkUser === "testuser") + assert(appInfo.attempts.head.completed) + assert(appInfo.attempts.head.duration > 0) + + provider.stop() + } + } + + test("SPARK-56234: Scan disabled by path pattern skips matching directories") { + val dir2 = Utils.createTempDir(namePrefix = "logDir2") + try { + val conf = createTestConf(true) + conf.set(HISTORY_LOG_DIR, s"${testDir.getAbsolutePath},${dir2.getAbsolutePath}") + conf.set(SCAN_DISABLED_PATH_PATTERNS, Seq(s".*${dir2.getName}")) + conf.set(EVENT_LOG_ROLLING_ON_DEMAND_LOAD_ENABLED, true) + val hadoopConf = SparkHadoopUtil.newConfiguration(conf) + val provider = new FsHistoryProvider(conf) + + // Write app in active dir (scan enabled) + val writer1 = new RollingEventLogFilesWriter( + "app1", None, testDir.toURI, conf, hadoopConf) + writer1.start() + writeEventsToRollingWriter(writer1, Seq( + SparkListenerApplicationStart("app1", Some("app1"), 0, "user", None), + SparkListenerJobStart(1, 0, Seq.empty)), rollFile = false) + writer1.stop() + + // Write app in archive dir (scan disabled) + val writer2 = new RollingEventLogFilesWriter( + "app2", None, dir2.toURI, conf, hadoopConf) + writer2.start() + writeEventsToRollingWriter(writer2, Seq( + SparkListenerApplicationStart("app2", Some("app2"), 0, "user", None), + SparkListenerJobStart(2, 0, Seq.empty)), rollFile = false) + writer2.stop() + + // Only app1 should be discovered by scan + provider.checkForLogs() + val listing = provider.getListing().toSeq + assert(listing.size === 1) + assert(listing.head.id === "app1") + + // app2 should be loadable on demand + assert(provider.getAppUI("app2", None).isDefined) + assert(provider.getListing().toSeq.size === 2) + + provider.stop() + } finally { + Utils.deleteRecursively(dir2) + } + } + private class SafeModeTestProvider(conf: SparkConf, clock: Clock) extends FsHistoryProvider(conf, clock) { diff --git a/docs/monitoring.md b/docs/monitoring.md index 0cdb5a28e8a38..4f90cbb5645c5 100644 --- a/docs/monitoring.md +++ b/docs/monitoring.md @@ -208,16 +208,17 @@ Security options for the Spark History Server are covered more detail in the 1.4.0 - spark.history.fs.update.disabledSchemes + spark.history.fs.update.scanDisabledPathPatterns (none) - Comma-separated list of URI schemes for which periodic log directory scanning is disabled - (e.g., s3a,gs). Directories with these schemes rely on on-demand loading - (SPARK-52914, rolling event - logs only) instead of scanning. Applications in these directories will not appear in the - listing until accessed by appId. Log compaction and the cleaner may not fully function for - these directories; use external lifecycle management (e.g., S3 Lifecycle Policies, GCS Object - Lifecycle Management) instead. + Comma-separated list of regular expressions matched against log directory paths. + Directories whose full path matches any pattern will not be scanned periodically + (e.g., s3a://.*,gs://.* disables scanning for all S3 and GCS directories). + Applications in these directories rely on on-demand loading instead of scanning and + will not appear in the listing until accessed by appId. When accessed, accurate metadata + is populated immediately. Logs that are never accessed are not subject to the cleaner; + use external lifecycle management (e.g., S3 Lifecycle Policies, GCS Object Lifecycle + Management) for those. 4.2.0 diff --git a/sql/hive/src/test/resources/conf/binding-policy-exceptions/configs-without-binding-policy-exceptions b/sql/hive/src/test/resources/conf/binding-policy-exceptions/configs-without-binding-policy-exceptions index f20b592daf5aa..556e2befc5b66 100644 --- a/sql/hive/src/test/resources/conf/binding-policy-exceptions/configs-without-binding-policy-exceptions +++ b/sql/hive/src/test/resources/conf/binding-policy-exceptions/configs-without-binding-policy-exceptions @@ -167,6 +167,7 @@ spark.history.fs.numReplayThreads spark.history.fs.safemodeCheck.interval spark.history.fs.update.batchSize spark.history.fs.update.interval +spark.history.fs.update.scanDisabledPathPatterns spark.history.kerberos.enabled spark.history.kerberos.keytab spark.history.kerberos.principal From fc5eeabdef2b27de9752e51a79e7b60a24009865 Mon Sep 17 00:00:00 2001 From: Kousuke Saruta Date: Fri, 27 Mar 2026 10:45:12 +0900 Subject: [PATCH 3/3] Address comment --- .../deploy/history/FsHistoryProvider.scala | 2 +- .../spark/internal/config/History.scala | 29 ++++++++++--------- 2 files changed, 16 insertions(+), 15 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index a14cfab73af55..2c00878bb31ec 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -469,7 +469,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) private def loadFromFallbackLocation(appId: String, attemptId: Option[String], logPath: String) : ApplicationInfoWrapper = { // Resolve the full path to find which directory actually contains this log. - // logPath is a relative name (e.g., "eventlog_v2_app1"), so we need to scan + // logPath is a relative name (e.g., "eventlog_v2_app1"), so we look up // all directories to find the correct one. val (dirFs, fullPath) = resolveLogPath(logPath, "") val dir = logDirForPath(fullPath) diff --git a/core/src/main/scala/org/apache/spark/internal/config/History.scala b/core/src/main/scala/org/apache/spark/internal/config/History.scala index 60033e6122f95..90abc9d038db1 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/History.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/History.scala @@ -43,20 +43,6 @@ private[spark] object History { .stringConf .createOptional - val SCAN_DISABLED_PATH_PATTERNS = - ConfigBuilder("spark.history.fs.update.scanDisabledPathPatterns") - .doc("Comma-separated list of regular expressions matched against log directory paths. " + - "Directories whose full path matches any pattern will not be scanned periodically. " + - "Applications in these directories rely on on-demand loading instead of scanning and " + - "will not appear in the listing until accessed by appId. When accessed, accurate metadata " + - "is populated immediately. Logs that are never accessed are not subject to the cleaner; " + - "use external lifecycle management (e.g., S3 Lifecycle Policies) for those. " + - "Example: \"s3a://.*,gs://.*\" disables scanning for all S3 and GCS directories.") - .version("4.2.0") - .stringConf - .toSequence - .createWithDefault(Nil) - val SAFEMODE_CHECK_INTERVAL_S = ConfigBuilder("spark.history.fs.safemodeCheck.interval") .version("1.6.0") .doc("Interval between HDFS safemode checks for the event log directory") @@ -79,6 +65,21 @@ private[spark] object History { .checkValue(v => v > 0, "The update batchSize should be a positive integer.") .createWithDefault(Int.MaxValue) + val SCAN_DISABLED_PATH_PATTERNS = + ConfigBuilder("spark.history.fs.update.scanDisabledPathPatterns") + .doc("Comma-separated list of regular expressions matched against log directory " + + "paths. Directories whose full path matches any pattern will not be scanned " + + "periodically. Applications in these directories rely on on-demand loading " + + "instead of scanning and will not appear in the listing until accessed by appId. " + + "When accessed, accurate metadata is populated immediately. Logs that are never " + + "accessed are not subject to the cleaner; use external lifecycle management " + + "(e.g., S3 Lifecycle Policies) for those. " + + "Example: \"s3a://.*,gs://.*\" disables scanning for all S3 and GCS directories.") + .version("4.2.0") + .stringConf + .toSequence + .createWithDefault(Nil) + val CLEANER_ENABLED = ConfigBuilder("spark.history.fs.cleaner.enabled") .version("1.4.0") .doc("Whether the History Server should periodically clean up event logs from storage")