Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,7 @@ public enum LogKeys implements LogKey {
EXPR,
EXPR_TERMS,
EXTENDED_EXPLAIN_GENERATOR,
FALLBACK_STORAGE_BLOCKS_SIZE,
FAILED_STAGE,
FAILED_STAGE_NAME,
FAILURES,
Expand Down Expand Up @@ -473,6 +474,7 @@ public enum LogKeys implements LogKey {
NUM_EXECUTOR_DESIRED,
NUM_EXECUTOR_LAUNCH,
NUM_EXECUTOR_TARGET,
NUM_FALLBACK_STORAGE_BLOCKS,
NUM_FAILURES,
NUM_FEATURES,
NUM_FILES,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1501,6 +1501,13 @@ package object config {
"maxRemoteBlockSizeFetchToMem cannot be larger than (Int.MaxValue - 512) bytes.")
.createWithDefaultString("200m")

// Thread pool size used on the read path for shuffle blocks that were migrated to
// fallback storage. Only relevant when a fallback storage path is configured
// (spark.storage.decommission.fallbackStorage.path).
private[spark] val REDUCER_FALLBACK_STORAGE_READ_THREADS =
  ConfigBuilder("spark.reducer.fallbackStorage.readThreads")
    .doc("Number of threads used by the reducer to read shuffle blocks from fallback storage.")
    .version("4.2.0")
    .intConf
    // Validate eagerly: a non-positive pool size would otherwise only fail later, at
    // thread-pool construction time, with a much less actionable error message.
    .checkValue(_ > 0, "The number of fallback storage read threads must be positive.")
    .createWithDefault(5)

private[spark] val TASK_METRICS_TRACK_UPDATED_BLOCK_STATUSES =
ConfigBuilder("spark.taskMetrics.trackUpdatedBlockStatuses")
.doc("Enable tracking of updatedBlockStatuses in the TaskMetrics. Off by default since " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,12 @@ trait BlockDataManager {
*/
def getLocalBlockData(blockId: BlockId): ManagedBuffer

/**
 * Interface to get fallback storage block data. Throws an exception if the block cannot be found
 * or cannot be read successfully.
 *
 * @param blockId id of the block to read from fallback storage
 * @return a ManagedBuffer over the block's bytes
 */
def getFallbackStorageBlockData(blockId: BlockId): ManagedBuffer

/**
* Put the block locally, using the given storage level.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ private[spark] class BlockStoreShuffleReader[K, C](
SparkEnv.get.conf.get(config.REDUCER_MAX_BLOCKS_IN_FLIGHT_PER_ADDRESS),
SparkEnv.get.conf.get(config.MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM),
SparkEnv.get.conf.get(config.SHUFFLE_MAX_ATTEMPTS_ON_NETTY_OOM),
SparkEnv.get.conf.get(config.REDUCER_FALLBACK_STORAGE_READ_THREADS),
SparkEnv.get.conf.get(config.SHUFFLE_DETECT_CORRUPT),
SparkEnv.get.conf.get(config.SHUFFLE_DETECT_CORRUPT_MEMORY),
SparkEnv.get.conf.get(config.SHUFFLE_CHECKSUM_ENABLED),
Expand Down
30 changes: 20 additions & 10 deletions core/src/main/scala/org/apache/spark/storage/BlockManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -759,16 +759,7 @@ private[spark] class BlockManager(
override def getLocalBlockData(blockId: BlockId): ManagedBuffer = {
if (blockId.isShuffle) {
logDebug(s"Getting local shuffle block ${blockId}")
try {
shuffleManager.shuffleBlockResolver.getBlockData(blockId)
} catch {
case e: IOException =>
if (conf.get(config.STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH).isDefined) {
FallbackStorage.read(conf, blockId)
} else {
throw e
}
}
shuffleManager.shuffleBlockResolver.getBlockData(blockId)
} else {
getLocalBytes(blockId) match {
case Some(blockData) =>
Expand All @@ -783,6 +774,25 @@ private[spark] class BlockManager(
}
}

/**
 * Interface to get fallback storage block data. Throws an exception if the block cannot be found
 * or cannot be read successfully.
 */
override def getFallbackStorageBlockData(blockId: BlockId): ManagedBuffer = {
  // Callers must only reach this path when fallback storage is configured.
  require(conf.get(config.STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH).isDefined)

  if (!blockId.isShuffle) {
    // A request for a non-shuffle block means the master likely has an outdated block
    // status for this block manager, so report the block as unavailable via RPC before
    // failing the request.
    reportBlockStatus(blockId, BlockStatus.empty)
    throw SparkCoreErrors.blockNotFoundError(blockId)
  }
  logDebug(s"Getting fallback storage block ${blockId}")
  FallbackStorage.read(conf, blockId)
}

/**
* Put the block locally, using the given storage level.
*
Expand Down
66 changes: 53 additions & 13 deletions core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,13 @@

package org.apache.spark.storage

import java.io.DataInputStream
import java.io.{DataInputStream, InputStream}
import java.nio.ByteBuffer

import scala.concurrent.Future
import scala.reflect.ClassTag

import io.netty.buffer.Unpooled
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

Expand All @@ -31,8 +32,8 @@ import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
import org.apache.spark.internal.LogKeys._
import org.apache.spark.internal.config.{STORAGE_DECOMMISSION_FALLBACK_STORAGE_CLEANUP, STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH}
import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer}
import org.apache.spark.network.util.JavaUtils
import org.apache.spark.network.buffer.ManagedBuffer
import org.apache.spark.network.util.{JavaUtils, LimitedInputStream}
import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcTimeout}
import org.apache.spark.shuffle.{IndexShuffleBlockResolver, ShuffleBlockInfo}
import org.apache.spark.shuffle.IndexShuffleBlockResolver.NOOP_REDUCE_ID
Expand Down Expand Up @@ -114,6 +115,51 @@ private[storage] class FallbackStorageRpcEndpointRef(conf: SparkConf, hadoopConf
}
}

/**
 * A ManagedBuffer backed by a segment of a Hadoop FileSystem file. The underlying data is
 * read lazily, i.e. only when `createInputStream` (or a method delegating to it) is called.
 *
 * @param filesystem Hadoop filesystem holding the file
 * @param file path of the file
 * @param offset start offset of the segment within the file
 * @param length size of the segment in bytes
 */
private[storage] class FileSystemSegmentManagedBuffer(
    filesystem: FileSystem,
    file: Path,
    offset: Long,
    length: Long) extends ManagedBuffer with Logging {

  // The segment size is known up front (from the shuffle index), so no I/O is needed here.
  override def size(): Long = length

  // Materializes the entire segment in memory; prefer createInputStream() for large segments.
  override def nioByteBuffer(): ByteBuffer = {
    Utils.tryWithResource(createInputStream()) { in =>
      ByteBuffer.wrap(in.readAllBytes())
    }
  }

  override def createInputStream(): InputStream = {
    val startTimeNs = System.nanoTime()
    val in = filesystem.open(file)
    try {
      in.seek(offset)
      // Bound the stream to this segment's length.
      new LimitedInputStream(in, length)
    } catch {
      case t: Throwable =>
        // Don't leak the underlying stream if seek() (or wrapping) fails; the caller never
        // receives a handle to close in that case.
        in.close()
        throw t
    } finally {
      // NOTE: only measures open/seek time, not the actual data read, which happens lazily.
      logDebug(s"Took ${(System.nanoTime() - startTimeNs) / (1000 * 1000)}ms")
    }
  }

  // No reference counting: this buffer holds no open resources between calls.
  override def retain(): ManagedBuffer = this

  override def release(): ManagedBuffer = this

  override def convertToNetty(): AnyRef = Unpooled.wrappedBuffer(nioByteBuffer())

  override def convertToNettyForSsl(): AnyRef = Unpooled.wrappedBuffer(nioByteBuffer())
}

private[spark] object FallbackStorage extends Logging {
/** We use one block manager id as a place holder. */
val FALLBACK_BLOCK_MANAGER_ID: BlockManagerId = BlockManagerId("fallback", "remote", 7337)
Expand Down Expand Up @@ -168,7 +214,9 @@ private[spark] object FallbackStorage extends Logging {
}

/**
 * Read a block as a ManagedBuffer. This reads the index file for the offset and block size
 * but does not read the actual block data. That data is read lazily, when
 * createInputStream() is called on the returned ManagedBuffer.
 */
def read(conf: SparkConf, blockId: BlockId): ManagedBuffer = {
logInfo(log"Read ${MDC(BLOCK_ID, blockId)}")
Expand Down Expand Up @@ -202,15 +250,7 @@ private[spark] object FallbackStorage extends Logging {
val hash = JavaUtils.nonNegativeHash(name)
val dataFile = new Path(fallbackPath, s"$appId/$shuffleId/$hash/$name")
val size = nextOffset - offset
logDebug(s"To byte array $size")
val array = new Array[Byte](size.toInt)
val startTimeNs = System.nanoTime()
Utils.tryWithResource(fallbackFileSystem.open(dataFile)) { f =>
f.seek(offset)
f.readFully(array)
logDebug(s"Took ${(System.nanoTime() - startTimeNs) / (1000 * 1000)}ms")
}
new NioManagedBuffer(ByteBuffer.wrap(array))
new FileSystemSegmentManagedBuffer(fallbackFileSystem, dataFile, offset, size)
}
}
}
Expand Down
Loading