@@ -44,7 +44,7 @@ def resource_bounded_multithreaded_reader_conf(file_type,
])
memory_limit = ('spark.rapids.sql.multiThreadedRead.memoryLimit.size', mem_lmt)

timeout = _list_conf_helper(timeout_conf, default=[0, 1000, 60000])
timeout = _list_conf_helper(timeout_conf, default=[500, 3000, 60000])
task_timeout = ('spark.rapids.sql.multiThreadedRead.memoryLimit.acquisitionTimeout', timeout)

conf_matrix = [base_conf]
35 changes: 30 additions & 5 deletions integration_tests/src/main/python/parquet_test.py
@@ -205,11 +205,9 @@ def test_parquet_read_round_trip(spark_tmp_path, parquet_gens, read_func, reader

_resource_bounded_pool_conf_matrix = resource_bounded_multithreaded_reader_conf(
file_type='parquet',
specialized_conf={
# set the int96 rebase mode values because its LEGACY in databricks which will preclude this op from running on GPU
int96RebaseModeInReadKey: 'CORRECTED',
datetimeRebaseModeInReadKey: 'CORRECTED'
})
specialized_conf={int96RebaseModeInReadKey: 'CORRECTED', datetimeRebaseModeInReadKey: 'CORRECTED'}
)

@pytest.mark.parametrize('parquet_gens', parquet_gens_list, ids=idfn)
@pytest.mark.parametrize('reader_confs', _resource_bounded_pool_conf_matrix, ids=idfn)
@tz_sensitive_test
@@ -222,6 +220,33 @@ def test_parquet_read_multithread_flow_ctrl_round_trip(spark_tmp_path, parquet_g
conf=rebase_write_corrected_conf)
assert_gpu_and_cpu_are_equal_collect(read_parquet_sql(data_path), conf=reader_confs)

_res_bnd_pool_conf_mat_simple = resource_bounded_multithreaded_reader_conf(
file_type='parquet',
specialized_conf={int96RebaseModeInReadKey: 'CORRECTED', datetimeRebaseModeInReadKey: 'CORRECTED'},
combine_size_conf=[0, '64m'],
keep_order_conf=[False, True],
reader_type_conf=['MULTITHREADED'],
pool_size_conf=[16],
memory_limit_conf=[4 << 20, 16 << 20],
timeout_conf=[5000]
)

@pytest.mark.parametrize('parquet_gens', parquet_gens_list[:1], ids=idfn)
@pytest.mark.parametrize('reader_confs', _res_bnd_pool_conf_mat_simple, ids=idfn)
@tz_sensitive_test
@allow_non_gpu(*non_utc_allow)
def test_parquet_read_multithread_flow_ctrl_selective_read(spark_tmp_path, parquet_gens, reader_confs):
"""Test selective column reading with resource-bounded multithreaded reader."""
gen_list = [('_c' + str(i), gen) for i, gen in enumerate(parquet_gens)]
data_path = spark_tmp_path + '/PARQUET_DATA'
with_cpu_session(
lambda spark : gen_df(spark, gen_list, length=2048).write.parquet(data_path),
conf=rebase_write_corrected_conf)
select_cols = [f'_c{i}' for i in range(len(gen_list)) if i % 3 == 0]
assert_gpu_and_cpu_are_equal_collect(
lambda spark: spark.read.parquet(data_path).selectExpr(*select_cols),
conf=reader_confs)

# Ensure that the multithreaded reader with resource bounded pool can handle an excessive host
# memory request that cannot be satisfied by the pool.
@pytest.mark.parametrize('keep_order', [False, True], ids=idfn)
@@ -126,31 +126,10 @@ trait HostMemoryBuffersWithMetaDataBase extends AutoCloseable {
_scheduleTime.toDouble / totalTime
}

def releaseResource(): Unit = {
while (_releaseCallback.nonEmpty) {
// Call the release callback to release the resources
_releaseCallback.dequeue()()
}
}

def addReleaseResourceCallback(callback: () => Unit): Unit = {
_releaseCallback.enqueue(callback)
}

def combineReleaseCallbacks(
other: HostMemoryBuffersWithMetaDataBase): Unit = {
while (_releaseCallback.nonEmpty) {
other._releaseCallback.enqueue(_releaseCallback.dequeue())
}
}

private val _releaseCallback: mutable.Queue[() => Unit] = mutable.Queue.empty

// This close method is idempotent, since both SingleHMBAndMeta.close and releaseResource
// keep idempotent.
override def close(): Unit = {
memBuffersAndSizes.safeClose()
releaseResource()
}
}

@@ -199,7 +178,8 @@ object MultiFileReaderThreadPool extends Logging {
name,
pool,
numThreads,
boundedConf.waitMemTimeoutMs)
boundedConf.waitMemTimeoutMs,
isStageLevel = conf.stageLevelPool)
}
threadPoolExecutor.allowCoreThreadTimeOut(true)
threadPoolExecutor
@@ -411,16 +391,21 @@ case class DefaultThreadPoolConf(
case class MemoryBoundedPoolConf(
maxThreadNumber: Int,
stageLevelPool: Boolean,
memoryCapacity: Long, // The maximum host memory being used in bytes, must be > 0
waitMemTimeoutMs: Long // The timeout for acquiring host memory in milliseconds
) extends ThreadPoolConf
// The maximum host memory being used in bytes, must be > 0
memoryCapacity: Long,
// The timeout for acquiring host memory in milliseconds
waitMemTimeoutMs: Long,
// When closing a MemoryBoundedAsyncRunner, the maximum number of retry attempts to wait
// for all allocated host memory buffers to be closed
maxRetriesOnClose: Int) extends ThreadPoolConf

class ThreadPoolConfBuilder(
private val maxThreadNumber: Int,
private val isMemoryBounded: Boolean,
private val memoryCapacityFromDriver: Long,
private val timeoutMs: Long,
private val stageLevelPool: Boolean
private val stageLevelPool: Boolean,
private val maxRetriesOnClose: Int
) extends Logging with Serializable {

// Finalize the ThreadPoolConf, which mainly determines the memory capacity of the
@@ -453,7 +438,8 @@ class ThreadPoolConfBuilder(
maxThreadNumber = maxThreadNumber,
stageLevelPool = stageLevelPool,
memoryCapacity = memCap,
waitMemTimeoutMs = timeoutMs)
waitMemTimeoutMs = timeoutMs,
maxRetriesOnClose = maxRetriesOnClose)
}
}
}
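The new `maxRetriesOnClose` field bounds how long a MemoryBoundedAsyncRunner waits at close time for its outstanding host buffers to be returned. Below is a minimal sketch of that wait loop, assuming the runner can report how many of its buffers are still open; the helper name and the counter are illustrative only, not the PR's actual implementation.

```scala
// Illustrative sketch only -- not the PR's implementation.
// Assumes the runner exposes a count of host buffers it handed out that are
// still open. Each retry waits 30 seconds, matching the config documentation.
private def awaitBuffersClosed(outstandingBuffers: () => Int,
    maxRetriesOnClose: Int,
    waitMsPerRetry: Long = 30000L): Unit = {
  var retries = 0
  while (outstandingBuffers() > 0) {
    if (retries >= maxRetriesOnClose) {
      // Fail fast instead of hanging forever on a leaked buffer reference.
      throw new IllegalStateException(
        s"${outstandingBuffers()} host buffer(s) still open after $retries retries " +
          s"of ${waitMsPerRetry}ms each while closing the runner")
    }
    Thread.sleep(waitMsPerRetry)
    retries += 1
  }
}
```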
@@ -466,7 +452,8 @@ object ThreadPoolConfBuilder {
conf.enableMultiThreadReadMemoryLimit,
conf.multiThreadReadMemoryLimit,
conf.multiThreadReadMemoryAcquireTimeout,
conf.multiThreadReadStageLevelPool)
conf.multiThreadReadStageLevelPool,
conf.multiThreadReadMaxBufferCloseWaitRetries)
}

// Set an extremely large memory capacity by default, so that the thread pool can be used
@@ -683,16 +670,6 @@ abstract class MultiFileCloudPartitionReaderBase(

// Unwrap RunnerResult to facilitate the combination of file buffers.
private def convertAsyncResult(taskRet: RunnerResult): BufferInfo = {
taskRet.releaseHook.foreach { callback =>
taskRet.data match {
// If the task result is empty, call the release callback ASAP.
case bufWithMeta if bufWithMeta.bytesRead == 0 =>
callback()
// inject the release callback for deferred release
case bufWithMeta =>
bufWithMeta.addReleaseResourceCallback(callback)
}
}
taskRet.data.setScheduleTime(taskRet.metrics.scheduleTimeMs)
taskRet.data
}
16 changes: 16 additions & 0 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala
@@ -1246,6 +1246,19 @@ val GPU_COREDUMP_PIPE_PATTERN = conf("spark.rapids.gpu.coreDump.pipePattern")
.createWithDefault(false)
}

val MULTITHREAD_READ_MAX_BUFFER_CLOSE_WAIT_RETRIES = {
conf("spark.rapids.sql.multiThreadedRead.maxBufferCloseWaitRetries")
.doc("The maximum number of retry attempts (each waiting 30 seconds) for all host " +
"memory buffers to be closed when closing a MemoryBoundedAsyncRunner. If the " +
"retry count exceeds this threshold, an exception will be thrown to prevent " +
"indefinite hangs caused by leaked buffer references. This helps detect resource " +
"management bugs early.")
.internal()
.integerConf
.checkValue(v => v > 0, "The max retry attempts must be greater than zero")
.createWithDefault(4)
} // 4 retries * 30s = 120 seconds max wait
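For a rough idea of how this internal knob could be raised while debugging a runner that is slow to close, here is a hypothetical override, not part of this change:

```scala
import org.apache.spark.SparkConf

// Hypothetical debugging override; the default of 4 retries (4 * 30s = 120s)
// should normally be enough.
val sparkConf = new SparkConf()
  .set("spark.rapids.sql.multiThreadedRead.maxBufferCloseWaitRetries", "8") // 8 * 30s = 240s
```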

val ENABLE_PARQUET = conf("spark.rapids.sql.format.parquet.enabled")
.doc("When set to false disables all parquet input and output acceleration")
.booleanConf
@@ -3425,6 +3438,9 @@ class RapidsConf(conf: Map[String, String]) extends Logging {
lazy val multiThreadReadStageLevelPool: Boolean =
get(MULTITHREAD_READ_MEMORY_LIMIT_TEST_PER_STAGE_POOL)

lazy val multiThreadReadMaxBufferCloseWaitRetries: Int =
get(MULTITHREAD_READ_MAX_BUFFER_CLOSE_WAIT_RETRIES)

lazy val isParquetEnabled: Boolean = get(ENABLE_PARQUET)

lazy val isParquetInt96WriteEnabled: Boolean = get(ENABLE_PARQUET_INT96_WRITE)
@@ -539,9 +539,19 @@ object SpillableHostBuffer {
new SpillableHostBuffer(SpillableHostBufferHandle(buffer), length)
}

def sliceWithRetry(shb: SpillableHostBuffer, start: Long, len: Long): HostMemoryBuffer = {
/**
* Materialize and slice a SpillableHostBuffer guarded by retry.
*/
def sliceWithRetry(shb: SpillableHostBuffer,
start: Long, len: Long): HostMemoryBuffer = {
withRetryNoSplit[HostMemoryBuffer] {
withResource(shb.getHostBuffer())(_.slice(start, len))
withResource(shb.getHostBuffer()) { buf =>
// transfer the event handler to the sliced buffer before closing the original buffer
val sliced = buf.slice(start, len)
sliced.setEventHandler(buf.getEventHandler)
buf.setEventHandler(null)
sliced
}
}
}
}
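To make the event-handler handoff above concrete, here is a hedged usage sketch, not taken from the PR: the returned slice carries the spill framework's event handler, so tracking stays with the buffer that outlives the closed parent. The `shb` argument, the 1 MiB window, and `processHostBuffer` are illustrative.

```scala
import ai.rapids.cudf.HostMemoryBuffer
import com.nvidia.spark.rapids.SpillableHostBuffer

// Hypothetical downstream consumer, stubbed out for the sketch.
def processHostBuffer(buf: HostMemoryBuffer): Unit = ()

// Illustrative usage only; assumes `shb` already wraps the file's host-side data.
def readWindow(shb: SpillableHostBuffer): Unit = {
  val window: HostMemoryBuffer =
    SpillableHostBuffer.sliceWithRetry(shb, start = 0L, len = 1L << 20)
  try {
    processHostBuffer(window) // spill tracking moved with the slice
  } finally {
    window.close()
  }
}
```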