Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,20 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
private val logDirs = conf.get(History.HISTORY_LOG_DIR)
.split(",").map(_.trim).filter(_.nonEmpty).toSeq

private val scanDisabledPathPatterns = conf.get(History.SCAN_DISABLED_PATH_PATTERNS)
  .map(_.r)

/**
 * Check if periodic scanning is disabled for a log directory.
 *
 * The directory path is qualified against its file system (scheme/authority filled in,
 * relative paths resolved) before matching, so patterns such as "s3a://.*" work no
 * matter how the directory was spelled in the configuration.
 *
 * @param dir one of the configured history log directories
 * @return true if the fully-qualified path matches any configured disable pattern
 */
private def isScanDisabled(dir: String): Boolean = {
  // Idiomatic short-circuit instead of an early `return`: when no patterns are
  // configured we avoid touching the file system at all.
  scanDisabledPathPatterns.nonEmpty && {
    val dirFs = logDirFs(dir)
    val qualifiedPath =
      new Path(dir).makeQualified(dirFs.getUri, dirFs.getWorkingDirectory).toString
    // Regex must match the ENTIRE qualified path (matches(), not find()).
    scanDisabledPathPatterns.exists(_.pattern.matcher(qualifiedPath).matches())
  }
}

private val historyUiAclsEnable = conf.get(History.HISTORY_SERVER_UI_ACLS_ENABLE)
private val historyUiAdminAcls = conf.get(History.HISTORY_SERVER_UI_ADMIN_ACLS)
private val historyUiAdminAclsGroups = conf.get(History.HISTORY_SERVER_UI_ADMIN_ACLS_GROUPS)
Expand Down Expand Up @@ -454,16 +468,35 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)

private def loadFromFallbackLocation(appId: String, attemptId: Option[String], logPath: String)
    : ApplicationInfoWrapper = {
  // logPath is a relative name (e.g., "eventlog_v2_app1"), so we need to scan all
  // configured log directories to find the one that actually contains this log.
  val (dirFs, fullPath) = resolveLogPath(logPath, "")
  val dir = logDirForPath(fullPath)

  if (isScanDisabled(dir)) {
    // For scan-disabled directories, run mergeApplicationListing to populate accurate
    // metadata since periodic scanning will never update a dummy entry.
    try {
      EventLogFileReader(dirFs, dirFs.getFileStatus(fullPath)).foreach { reader =>
        mergeApplicationListing(reader, clock.getTimeMillis(), enableOptimizations = true)
      }
    } catch {
      // Surface a consistent "not found" error to the caller instead of an IO failure.
      case _: FileNotFoundException =>
        throw new NoSuchElementException(s"Log not found for appId: $appId")
    }
  } else {
    // Scan-enabled directories keep the pre-existing SPARK-52914 on-demand behavior:
    // register a placeholder entry with dummy metadata (epoch dates, user "spark",
    // version "unknown"); the next periodic scan will refresh it with real values.
    val date = new Date(0)
    val lastUpdate = new Date()
    val (logSourceName, logSourceFullPath) = getLogDirInfo(logPath)
    val info = ApplicationAttemptInfo(
      attemptId, date, date, lastUpdate, 0, "spark", false, "unknown",
      Some(logSourceName), Some(logSourceFullPath))
    addListing(new ApplicationInfoWrapper(
      ApplicationInfo(appId, appId, None, None, None, None, List.empty),
      List(new AttemptInfoWrapper(info, logPath, 0, Some(1), None, None, None, None,
        logSourceName, logSourceFullPath))))
  }
  load(appId)
}

Expand Down Expand Up @@ -561,10 +594,17 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
private[history] def checkForLogs(): Unit = {
val newLastScanTime = clock.getTimeMillis()
val allNotStale = mutable.HashSet[String]()
val skippedDirs = mutable.ArrayBuffer[String]()

logDirs.foreach { dir =>
try {
checkForLogsInDir(dir, newLastScanTime, allNotStale)
if (isScanDisabled(dir)) {
logDebug(log"Skipping scan for directory ${MDC(HISTORY_DIR, dir)}" +
log" (scan disabled for this directory)")
skippedDirs += dir
} else {
checkForLogsInDir(dir, newLastScanTime, allNotStale)
}
} catch {
case e: IOException =>
logError(log"Error checking for logs in directory ${MDC(HISTORY_DIR, dir)}", e)
Expand All @@ -574,13 +614,17 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
// Delete all information about applications whose log files disappeared from storage.
// This is done after scanning ALL directories to avoid incorrectly marking entries from
// other directories as stale.
// Entries from scan-disabled directories are excluded from stale detection because
// they are never scanned and their lastProcessed time is never updated.
val stale = listing.synchronized {
KVUtils.viewToSeq(listing.view(classOf[LogInfo])
.index("lastProcessed")
.last(newLastScanTime - 1))
}
stale.filterNot(isProcessing)
.filterNot(info => allNotStale.contains(info.logPath))
.filterNot(info => skippedDirs.exists(dir =>
logDirForPath(new Path(info.logPath)) == dir))
.foreach { log =>
log.appId.foreach { appId =>
cleanAppData(appId, log.attemptId, log.logPath)
Expand Down
15 changes: 15 additions & 0 deletions core/src/main/scala/org/apache/spark/internal/config/History.scala
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,21 @@ private[spark] object History {
.checkValue(v => v > 0, "The update batchSize should be a positive integer.")
.createWithDefault(Int.MaxValue)

// Comma-separated regexes matched against the fully-qualified paths of the
// configured history log directories. Matching directories are excluded from the
// periodic checkForLogs() scan; their applications are only loaded on demand by
// appId and are never touched by the cleaner.
val SCAN_DISABLED_PATH_PATTERNS =
ConfigBuilder("spark.history.fs.update.scanDisabledPathPatterns")
.doc("Comma-separated list of regular expressions matched against log directory " +
"paths. Directories whose full path matches any pattern will not be scanned " +
"periodically. Applications in these directories rely on on-demand loading " +
"instead of scanning and will not appear in the listing until accessed by appId. " +
"When accessed, accurate metadata is populated immediately. Logs that are never " +
"accessed are not subject to the cleaner; use external lifecycle management " +
"(e.g., S3 Lifecycle Policies) for those. " +
"Example: \"s3a://.*,gs://.*\" disables scanning for all S3 and GCS directories.")
.version("4.2.0")
.stringConf
.toSequence
.createWithDefault(Nil)

val CLEANER_ENABLED = ConfigBuilder("spark.history.fs.cleaner.enabled")
.version("1.4.0")
.doc("Whether the History Server should periodically clean up event logs from storage")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2381,6 +2381,168 @@ abstract class FsHistoryProviderSuite extends SparkFunSuite with Matchers with P
}
}

// Verifies the core contract of scanDisabledPathPatterns: checkForLogs() ignores a
// matching directory, yet getAppUI() still loads the application on demand, the
// resulting listing entry carries real metadata (not the dummy placeholder), and a
// subsequent scan does not evict the on-demand-loaded entry as stale.
test("SPARK-56234: Skips scanning but on-demand loading still works") {
withTempDir { dir =>
val conf = createTestConf(true)
conf.set(HISTORY_LOG_DIR, dir.getAbsolutePath)
// "file:.*" matches the qualified form of the local temp dir, so its scan is skipped.
conf.set(SCAN_DISABLED_PATH_PATTERNS, Seq("file:.*"))
conf.set(EVENT_LOG_ROLLING_ON_DEMAND_LOAD_ENABLED, true)
val hadoopConf = SparkHadoopUtil.newConfiguration(conf)
val provider = new FsHistoryProvider(conf)

// Write a rolling event log
val writer = new RollingEventLogFilesWriter("app1", None, dir.toURI, conf, hadoopConf)
writer.start()
writeEventsToRollingWriter(writer, Seq(
SparkListenerApplicationStart("app1", Some("app1"), 0, "user", None),
SparkListenerJobStart(1, 0, Seq.empty)), rollFile = false)
writer.stop()

// Scan should skip this directory -- listing remains empty
provider.checkForLogs()
assert(provider.getListing().length === 0)

// On-demand loading should still work
assert(provider.getAppUI("app1", None).isDefined)
assert(provider.getListing().length === 1)

// Metadata should be accurate (not dummy) since mergeApplicationListing runs
// for scan-disabled directories
val appInfo = provider.getListing().next()
assert(appInfo.name === "app1")
assert(appInfo.attempts.head.appSparkVersion !== "unknown")

// Subsequent scan should NOT remove the on-demand loaded app (stale protection)
provider.checkForLogs()
assert(provider.getListing().length === 1)

provider.stop()
}
}

// Verifies pattern scoping: disable patterns for remote schemes (s3a/gs) must leave
// local file:// directories fully scannable, so the app is discovered by a normal scan.
test("SPARK-56234: Scan disabled schemes do not affect directories with non-matching schemes") {
withTempDir { dir =>
val conf = createTestConf(true)
conf.set(HISTORY_LOG_DIR, dir.getAbsolutePath)
// Disable scanning for s3a/gs -- should not affect local file:// directories
conf.set(SCAN_DISABLED_PATH_PATTERNS, Seq("s3a://.*", "gs://.*"))
val provider = new FsHistoryProvider(conf)
val hadoopConf = SparkHadoopUtil.newConfiguration(conf)

val writer = new RollingEventLogFilesWriter("app1", None, dir.toURI, conf, hadoopConf)
writer.start()
writeEventsToRollingWriter(writer, Seq(
SparkListenerApplicationStart("app1", Some("app1"), 0, "user", None),
SparkListenerJobStart(1, 0, Seq.empty)), rollFile = false)
writer.stop()

// file:// scheme is not disabled, so scanning should work normally
provider.checkForLogs()
assert(provider.getListing().length === 1)

provider.stop()
}
}

// With no patterns configured, every directory stays scan-enabled: the periodic
// scan must discover the application as before the feature was introduced.
test("SPARK-56234: Scan disabled with empty config does not disable any scheme") {
  withTempDir { dir =>
    // An empty pattern list is the default and must not exclude anything.
    val conf = createTestConf(true)
      .set(HISTORY_LOG_DIR, dir.getAbsolutePath)
      .set(SCAN_DISABLED_PATH_PATTERNS, Seq.empty[String])
    val hadoopConf = SparkHadoopUtil.newConfiguration(conf)
    val provider = new FsHistoryProvider(conf)

    // Produce a minimal rolling event log for a single application.
    val eventLogWriter =
      new RollingEventLogFilesWriter("app1", None, dir.toURI, conf, hadoopConf)
    eventLogWriter.start()
    writeEventsToRollingWriter(eventLogWriter, Seq(
      SparkListenerApplicationStart("app1", Some("app1"), 0, "user", None),
      SparkListenerJobStart(1, 0, Seq.empty)), rollFile = false)
    eventLogWriter.stop()

    // The directory is scanned normally, so the app shows up in the listing.
    provider.checkForLogs()
    assert(provider.getListing().length === 1)

    provider.stop()
  }
}

// Verifies that on-demand loading of an app from a scan-disabled directory runs
// mergeApplicationListing, yielding accurate listing metadata (real Spark version,
// actual user, completion flag, positive duration) rather than the dummy values used
// by the SPARK-52914 placeholder path.
test("SPARK-56234: On-demand loading populates accurate metadata via mergeApplicationListing") {
withTempDir { dir =>
val conf = createTestConf(true)
conf.set(HISTORY_LOG_DIR, dir.getAbsolutePath)
conf.set(SCAN_DISABLED_PATH_PATTERNS, Seq("file:.*"))
conf.set(EVENT_LOG_ROLLING_ON_DEMAND_LOAD_ENABLED, true)
val hadoopConf = SparkHadoopUtil.newConfiguration(conf)
val provider = new FsHistoryProvider(conf)

// Start at t=1000 and end at t=5000 so the attempt has a measurable duration.
val writer = new RollingEventLogFilesWriter("app1", None, dir.toURI, conf, hadoopConf)
writer.start()
writeEventsToRollingWriter(writer, Seq(
SparkListenerApplicationStart("app1", Some("app1"), 1000, "testuser", None),
SparkListenerJobStart(1, 0, Seq.empty),
SparkListenerApplicationEnd(5000)), rollFile = false)
writer.stop()

// On-demand load without prior scan
assert(provider.getAppUI("app1", None).isDefined)

// Listing should have accurate metadata from mergeApplicationListing,
// not dummy values (sparkVersion="unknown", sparkUser="spark", etc.)
val appInfo = provider.getListing().next()
assert(appInfo.name === "app1")
assert(appInfo.attempts.head.appSparkVersion !== "unknown")
assert(appInfo.attempts.head.sparkUser === "testuser")
assert(appInfo.attempts.head.completed)
assert(appInfo.attempts.head.duration > 0)

provider.stop()
}
}

// Mixed-directory scenario: with two configured log dirs where only the second
// matches a disable pattern, a scan must discover the app in the first dir only,
// while the app in the disabled dir remains loadable on demand (both end up listed).
test("SPARK-56234: Scan disabled by path pattern skips matching directories") {
val dir2 = Utils.createTempDir(namePrefix = "logDir2")
try {
val conf = createTestConf(true)
conf.set(HISTORY_LOG_DIR, s"${testDir.getAbsolutePath},${dir2.getAbsolutePath}")
// Pattern targets only dir2 by name, so testDir stays scan-enabled.
conf.set(SCAN_DISABLED_PATH_PATTERNS, Seq(s".*${dir2.getName}"))
conf.set(EVENT_LOG_ROLLING_ON_DEMAND_LOAD_ENABLED, true)
val hadoopConf = SparkHadoopUtil.newConfiguration(conf)
val provider = new FsHistoryProvider(conf)

// Write app in active dir (scan enabled)
val writer1 = new RollingEventLogFilesWriter(
"app1", None, testDir.toURI, conf, hadoopConf)
writer1.start()
writeEventsToRollingWriter(writer1, Seq(
SparkListenerApplicationStart("app1", Some("app1"), 0, "user", None),
SparkListenerJobStart(1, 0, Seq.empty)), rollFile = false)
writer1.stop()

// Write app in archive dir (scan disabled)
val writer2 = new RollingEventLogFilesWriter(
"app2", None, dir2.toURI, conf, hadoopConf)
writer2.start()
writeEventsToRollingWriter(writer2, Seq(
SparkListenerApplicationStart("app2", Some("app2"), 0, "user", None),
SparkListenerJobStart(2, 0, Seq.empty)), rollFile = false)
writer2.stop()

// Only app1 should be discovered by scan
provider.checkForLogs()
val listing = provider.getListing().toSeq
assert(listing.size === 1)
assert(listing.head.id === "app1")

// app2 should be loadable on demand
assert(provider.getAppUI("app2", None).isDefined)
assert(provider.getListing().toSeq.size === 2)

provider.stop()
} finally {
// testDir is suite-managed; only the extra archive dir needs manual cleanup.
Utils.deleteRecursively(dir2)
}
}

private class SafeModeTestProvider(conf: SparkConf, clock: Clock)
extends FsHistoryProvider(conf, clock) {

Expand Down
15 changes: 15 additions & 0 deletions docs/monitoring.md
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,21 @@ Security options for the Spark History Server are covered more detail in the
</td>
<td>1.4.0</td>
</tr>
<tr>
<td>spark.history.fs.update.scanDisabledPathPatterns</td>
<td>(none)</td>
<td>
Comma-separated list of regular expressions matched against log directory paths.
Directories whose full path matches any pattern will not be scanned periodically
(e.g., <code>s3a://.*,gs://.*</code> disables scanning for all S3 and GCS directories).
Applications in these directories rely on on-demand loading instead of scanning and
will not appear in the listing until accessed by appId. When accessed, accurate metadata
is populated immediately. Logs that are never accessed are not subject to the cleaner;
use external lifecycle management (e.g., S3 Lifecycle Policies, GCS Object Lifecycle
Management) for those.
</td>
<td>4.2.0</td>
</tr>
<tr>
<td>spark.history.retainedApplications</td>
<td>50</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ spark.history.fs.numReplayThreads
spark.history.fs.safemodeCheck.interval
spark.history.fs.update.batchSize
spark.history.fs.update.interval
spark.history.fs.update.scanDisabledPathPatterns
spark.history.kerberos.enabled
spark.history.kerberos.keytab
spark.history.kerberos.principal
Expand Down