diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index 54bc290b3787f..2c00878bb31ec 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -109,6 +109,20 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) private val logDirs = conf.get(History.HISTORY_LOG_DIR) .split(",").map(_.trim).filter(_.nonEmpty).toSeq + private val scanDisabledPathPatterns = conf.get(History.SCAN_DISABLED_PATH_PATTERNS) + .map(_.r) + + /** Check if scanning is disabled for a directory by matching its path against patterns. */ + private def isScanDisabled(dir: String): Boolean = { + if (scanDisabledPathPatterns.isEmpty) return false + val qualifiedPath = { + val path = new Path(dir) + val dirFs = logDirFs(dir) + path.makeQualified(dirFs.getUri, dirFs.getWorkingDirectory).toString + } + scanDisabledPathPatterns.exists(_.pattern.matcher(qualifiedPath).matches()) + } + private val historyUiAclsEnable = conf.get(History.HISTORY_SERVER_UI_ACLS_ENABLE) private val historyUiAdminAcls = conf.get(History.HISTORY_SERVER_UI_ADMIN_ACLS) private val historyUiAdminAclsGroups = conf.get(History.HISTORY_SERVER_UI_ADMIN_ACLS_GROUPS) @@ -454,16 +468,35 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) private def loadFromFallbackLocation(appId: String, attemptId: Option[String], logPath: String) : ApplicationInfoWrapper = { - val date = new Date(0) - val lastUpdate = new Date() - val (logSourceName, logSourceFullPath) = getLogDirInfo(logPath) - val info = ApplicationAttemptInfo( - attemptId, date, date, lastUpdate, 0, "spark", false, "unknown", - Some(logSourceName), Some(logSourceFullPath)) - addListing(new ApplicationInfoWrapper( - ApplicationInfo(appId, appId, None, None, None, None, List.empty), - List(new AttemptInfoWrapper(info, logPath, 0, Some(1), None, None, None, None, - logSourceName, logSourceFullPath)))) + // Resolve the full path to find which directory actually contains this log. + // logPath is a relative name (e.g., "eventlog_v2_app1"), so we look up + // all directories to find the correct one. + val (dirFs, fullPath) = resolveLogPath(logPath, "") + val dir = logDirForPath(fullPath) + + if (isScanDisabled(dir)) { + // For scan-disabled directories, run mergeApplicationListing to populate accurate + // metadata since periodic scanning will never update the dummy entry. + try { + EventLogFileReader(dirFs, dirFs.getFileStatus(fullPath)).foreach { reader => + mergeApplicationListing(reader, clock.getTimeMillis(), enableOptimizations = true) + } + } catch { + case _: FileNotFoundException => + throw new NoSuchElementException(s"Log not found for appId: $appId") + } + } else { + val date = new Date(0) + val lastUpdate = new Date() + val (logSourceName, logSourceFullPath) = getLogDirInfo(logPath) + val info = ApplicationAttemptInfo( + attemptId, date, date, lastUpdate, 0, "spark", false, "unknown", + Some(logSourceName), Some(logSourceFullPath)) + addListing(new ApplicationInfoWrapper( + ApplicationInfo(appId, appId, None, None, None, None, List.empty), + List(new AttemptInfoWrapper(info, logPath, 0, Some(1), None, None, None, None, + logSourceName, logSourceFullPath)))) + } load(appId) } @@ -561,10 +594,17 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) private[history] def checkForLogs(): Unit = { val newLastScanTime = clock.getTimeMillis() val allNotStale = mutable.HashSet[String]() + val skippedDirs = mutable.ArrayBuffer[String]() logDirs.foreach { dir => try { - checkForLogsInDir(dir, newLastScanTime, allNotStale) + if (isScanDisabled(dir)) { + logDebug(log"Skipping scan for directory ${MDC(HISTORY_DIR, dir)}" + + log" (scan disabled for this directory)") + skippedDirs += dir + } else { + checkForLogsInDir(dir, newLastScanTime, allNotStale) + } } catch { case e: IOException => logError(log"Error checking for logs in directory ${MDC(HISTORY_DIR, dir)}", e) @@ -574,6 +614,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) // Delete all information about applications whose log files disappeared from storage. // This is done after scanning ALL directories to avoid incorrectly marking entries from // other directories as stale. + // Entries from scan-disabled directories are excluded from stale detection because + // they are never scanned and their lastProcessed time is never updated. val stale = listing.synchronized { KVUtils.viewToSeq(listing.view(classOf[LogInfo]) .index("lastProcessed") @@ -581,6 +623,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) } stale.filterNot(isProcessing) .filterNot(info => allNotStale.contains(info.logPath)) + .filterNot(info => skippedDirs.exists(dir => + logDirForPath(new Path(info.logPath)) == dir)) .foreach { log => log.appId.foreach { appId => cleanAppData(appId, log.attemptId, log.logPath) diff --git a/core/src/main/scala/org/apache/spark/internal/config/History.scala b/core/src/main/scala/org/apache/spark/internal/config/History.scala index e12f0b8eeaad0..90abc9d038db1 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/History.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/History.scala @@ -65,6 +65,21 @@ private[spark] object History { .checkValue(v => v > 0, "The update batchSize should be a positive integer.") .createWithDefault(Int.MaxValue) + val SCAN_DISABLED_PATH_PATTERNS = + ConfigBuilder("spark.history.fs.update.scanDisabledPathPatterns") + .doc("Comma-separated list of regular expressions matched against log directory " + + "paths. Directories whose full path matches any pattern will not be scanned " + + "periodically. Applications in these directories rely on on-demand loading " + + "instead of scanning and will not appear in the listing until accessed by appId. " + + "When accessed, accurate metadata is populated immediately. Logs that are never " + + "accessed are not subject to the cleaner; use external lifecycle management " + + "(e.g., S3 Lifecycle Policies) for those. " + + "Example: \"s3a://.*,gs://.*\" disables scanning for all S3 and GCS directories.") + .version("4.2.0") + .stringConf + .toSequence + .createWithDefault(Nil) + val CLEANER_ENABLED = ConfigBuilder("spark.history.fs.cleaner.enabled") .version("1.4.0") .doc("Whether the History Server should periodically clean up event logs from storage") diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala index 7c76b50b07acb..9d3bd258956c5 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala @@ -2381,6 +2381,168 @@ abstract class FsHistoryProviderSuite extends SparkFunSuite with Matchers with P } } + test("SPARK-56234: Skips scanning but on-demand loading still works") { + withTempDir { dir => + val conf = createTestConf(true) + conf.set(HISTORY_LOG_DIR, dir.getAbsolutePath) + conf.set(SCAN_DISABLED_PATH_PATTERNS, Seq("file:.*")) + conf.set(EVENT_LOG_ROLLING_ON_DEMAND_LOAD_ENABLED, true) + val hadoopConf = SparkHadoopUtil.newConfiguration(conf) + val provider = new FsHistoryProvider(conf) + + // Write a rolling event log + val writer = new RollingEventLogFilesWriter("app1", None, dir.toURI, conf, hadoopConf) + writer.start() + writeEventsToRollingWriter(writer, Seq( + SparkListenerApplicationStart("app1", Some("app1"), 0, "user", None), + SparkListenerJobStart(1, 0, Seq.empty)), rollFile = false) + writer.stop() + + // Scan should skip this directory -- listing remains empty + provider.checkForLogs() + assert(provider.getListing().length === 0) + + // On-demand loading should still work + assert(provider.getAppUI("app1", None).isDefined) + assert(provider.getListing().length === 1) + + // Metadata should be accurate (not dummy) since mergeApplicationListing runs + // for scan-disabled directories + val appInfo = provider.getListing().next() + assert(appInfo.name === "app1") + assert(appInfo.attempts.head.appSparkVersion !== "unknown") + + // Subsequent scan should NOT remove the on-demand loaded app (stale protection) + provider.checkForLogs() + assert(provider.getListing().length === 1) + + provider.stop() + } + } + + test("SPARK-56234: Scan disabled schemes do not affect directories with non-matching schemes") { + withTempDir { dir => + val conf = createTestConf(true) + conf.set(HISTORY_LOG_DIR, dir.getAbsolutePath) + // Disable scanning for s3a/gs -- should not affect local file:// directories + conf.set(SCAN_DISABLED_PATH_PATTERNS, Seq("s3a://.*", "gs://.*")) + val provider = new FsHistoryProvider(conf) + val hadoopConf = SparkHadoopUtil.newConfiguration(conf) + + val writer = new RollingEventLogFilesWriter("app1", None, dir.toURI, conf, hadoopConf) + writer.start() + writeEventsToRollingWriter(writer, Seq( + SparkListenerApplicationStart("app1", Some("app1"), 0, "user", None), + SparkListenerJobStart(1, 0, Seq.empty)), rollFile = false) + writer.stop() + + // file:// scheme is not disabled, so scanning should work normally + provider.checkForLogs() + assert(provider.getListing().length === 1) + + provider.stop() + } + } + + test("SPARK-56234: Scan disabled with empty config does not disable any scheme") { + withTempDir { dir => + val conf = createTestConf(true) + conf.set(HISTORY_LOG_DIR, dir.getAbsolutePath) + conf.set(SCAN_DISABLED_PATH_PATTERNS, Seq.empty[String]) + val provider = new FsHistoryProvider(conf) + val hadoopConf = SparkHadoopUtil.newConfiguration(conf) + + val writer = new RollingEventLogFilesWriter("app1", None, dir.toURI, conf, hadoopConf) + writer.start() + writeEventsToRollingWriter(writer, Seq( + SparkListenerApplicationStart("app1", Some("app1"), 0, "user", None), + SparkListenerJobStart(1, 0, Seq.empty)), rollFile = false) + writer.stop() + + provider.checkForLogs() + assert(provider.getListing().length === 1) + + provider.stop() + } + } + + test("SPARK-56234: On-demand loading populates accurate metadata via mergeApplicationListing") { + withTempDir { dir => + val conf = createTestConf(true) + conf.set(HISTORY_LOG_DIR, dir.getAbsolutePath) + conf.set(SCAN_DISABLED_PATH_PATTERNS, Seq("file:.*")) + conf.set(EVENT_LOG_ROLLING_ON_DEMAND_LOAD_ENABLED, true) + val hadoopConf = SparkHadoopUtil.newConfiguration(conf) + val provider = new FsHistoryProvider(conf) + + val writer = new RollingEventLogFilesWriter("app1", None, dir.toURI, conf, hadoopConf) + writer.start() + writeEventsToRollingWriter(writer, Seq( + SparkListenerApplicationStart("app1", Some("app1"), 1000, "testuser", None), + SparkListenerJobStart(1, 0, Seq.empty), + SparkListenerApplicationEnd(5000)), rollFile = false) + writer.stop() + + // On-demand load without prior scan + assert(provider.getAppUI("app1", None).isDefined) + + // Listing should have accurate metadata from mergeApplicationListing, + // not dummy values (sparkVersion="unknown", sparkUser="spark", etc.) + val appInfo = provider.getListing().next() + assert(appInfo.name === "app1") + assert(appInfo.attempts.head.appSparkVersion !== "unknown") + assert(appInfo.attempts.head.sparkUser === "testuser") + assert(appInfo.attempts.head.completed) + assert(appInfo.attempts.head.duration > 0) + + provider.stop() + } + } + + test("SPARK-56234: Scan disabled by path pattern skips matching directories") { + val dir2 = Utils.createTempDir(namePrefix = "logDir2") + try { + val conf = createTestConf(true) + conf.set(HISTORY_LOG_DIR, s"${testDir.getAbsolutePath},${dir2.getAbsolutePath}") + conf.set(SCAN_DISABLED_PATH_PATTERNS, Seq(s".*${dir2.getName}")) + conf.set(EVENT_LOG_ROLLING_ON_DEMAND_LOAD_ENABLED, true) + val hadoopConf = SparkHadoopUtil.newConfiguration(conf) + val provider = new FsHistoryProvider(conf) + + // Write app in active dir (scan enabled) + val writer1 = new RollingEventLogFilesWriter( + "app1", None, testDir.toURI, conf, hadoopConf) + writer1.start() + writeEventsToRollingWriter(writer1, Seq( + SparkListenerApplicationStart("app1", Some("app1"), 0, "user", None), + SparkListenerJobStart(1, 0, Seq.empty)), rollFile = false) + writer1.stop() + + // Write app in archive dir (scan disabled) + val writer2 = new RollingEventLogFilesWriter( + "app2", None, dir2.toURI, conf, hadoopConf) + writer2.start() + writeEventsToRollingWriter(writer2, Seq( + SparkListenerApplicationStart("app2", Some("app2"), 0, "user", None), + SparkListenerJobStart(2, 0, Seq.empty)), rollFile = false) + writer2.stop() + + // Only app1 should be discovered by scan + provider.checkForLogs() + val listing = provider.getListing().toSeq + assert(listing.size === 1) + assert(listing.head.id === "app1") + + // app2 should be loadable on demand + assert(provider.getAppUI("app2", None).isDefined) + assert(provider.getListing().toSeq.size === 2) + + provider.stop() + } finally { + Utils.deleteRecursively(dir2) + } + } + private class SafeModeTestProvider(conf: SparkConf, clock: Clock) extends FsHistoryProvider(conf, clock) { diff --git a/docs/monitoring.md b/docs/monitoring.md index bd9a8b134dc3f..4f90cbb5645c5 100644 --- a/docs/monitoring.md +++ b/docs/monitoring.md @@ -207,6 +207,21 @@ Security options for the Spark History Server are covered more detail in the 1.4.0 + + spark.history.fs.update.scanDisabledPathPatterns + (none) + + Comma-separated list of regular expressions matched against log directory paths. + Directories whose full path matches any pattern will not be scanned periodically + (e.g., s3a://.*,gs://.* disables scanning for all S3 and GCS directories). + Applications in these directories rely on on-demand loading instead of scanning and + will not appear in the listing until accessed by appId. When accessed, accurate metadata + is populated immediately. Logs that are never accessed are not subject to the cleaner; + use external lifecycle management (e.g., S3 Lifecycle Policies, GCS Object Lifecycle + Management) for those. + + 4.2.0 + spark.history.retainedApplications 50 diff --git a/sql/hive/src/test/resources/conf/binding-policy-exceptions/configs-without-binding-policy-exceptions b/sql/hive/src/test/resources/conf/binding-policy-exceptions/configs-without-binding-policy-exceptions index f20b592daf5aa..556e2befc5b66 100644 --- a/sql/hive/src/test/resources/conf/binding-policy-exceptions/configs-without-binding-policy-exceptions +++ b/sql/hive/src/test/resources/conf/binding-policy-exceptions/configs-without-binding-policy-exceptions @@ -167,6 +167,7 @@ spark.history.fs.numReplayThreads spark.history.fs.safemodeCheck.interval spark.history.fs.update.batchSize spark.history.fs.update.interval +spark.history.fs.update.scanDisabledPathPatterns spark.history.kerberos.enabled spark.history.kerberos.keytab spark.history.kerberos.principal