From d44c932f925111d4e9ac6249e8f92f5ac3ee6563 Mon Sep 17 00:00:00 2001 From: Essam Aly Date: Thu, 21 May 2026 18:07:34 +0000 Subject: [PATCH 1/5] Adding rocAL+rocJPEG decode performance harness --- .../loaders/image/image_read_and_decode.h | 3 + .../loaders/image/image_read_and_decode.cpp | 191 +++++++--- .../dataloader_multithread.cpp | 43 ++- tests/cpp_api/rocjpeg_decode_perf/README.md | 326 ++++++++++++++++++ .../perf_sharded_launcher.cpp | 194 +++++++++++ .../reporting_perf_sharded_results.sh | 82 +++++ .../reporting_test_results.sh | 207 +++++++++++ .../rocal_decode_call_bench.py | 286 +++++++++++++++ .../run_tests_twice_solution_on_off.sh | 101 ++++++ 9 files changed, 1377 insertions(+), 56 deletions(-) create mode 100644 tests/cpp_api/rocjpeg_decode_perf/README.md create mode 100644 tests/cpp_api/rocjpeg_decode_perf/perf_sharded_launcher.cpp create mode 100755 tests/cpp_api/rocjpeg_decode_perf/reporting_perf_sharded_results.sh create mode 100755 tests/cpp_api/rocjpeg_decode_perf/reporting_test_results.sh create mode 100644 tests/cpp_api/rocjpeg_decode_perf/rocal_decode_call_bench.py create mode 100755 tests/cpp_api/rocjpeg_decode_perf/run_tests_twice_solution_on_off.sh diff --git a/rocAL/include/loaders/image/image_read_and_decode.h b/rocAL/include/loaders/image/image_read_and_decode.h index 6f86ba897..5e6847f09 100644 --- a/rocAL/include/loaders/image/image_read_and_decode.h +++ b/rocAL/include/loaders/image/image_read_and_decode.h @@ -83,6 +83,9 @@ class ImageReadAndDecode { private: std::vector> _decoder; std::shared_ptr _rocjpeg_decoder; + std::vector> _rocjpeg_decoders; + std::vector _rocjpeg_sub_batch_sizes; + bool _use_rocjpeg_dedicated_omp_split = true; std::shared_ptr _reader; std::vector> _compressed_buff; std::vector _actual_read_size; diff --git a/rocAL/source/loaders/image/image_read_and_decode.cpp b/rocAL/source/loaders/image/image_read_and_decode.cpp index 922e50cf9..5d485b4bd 100644 --- a/rocAL/source/loaders/image/image_read_and_decode.cpp +++ b/rocAL/source/loaders/image/image_read_and_decode.cpp @@ -22,7 +22,9 @@ THE SOFTWARE. #include "loaders/image/image_read_and_decode.h" +#include #include +#include #include #include @@ -66,6 +68,7 @@ ImageReadAndDecode::~ImageReadAndDecode() { void ImageReadAndDecode::create(ReaderConfig reader_config, DecoderConfig decoder_config, int batch_size, int device_id) { // Can initialize it to any decoder types if needed _batch_size = batch_size; + _num_threads = reader_config.get_cpu_num_threads(); _compressed_buff.resize(batch_size); _decoder.resize(batch_size); _actual_read_size.resize(batch_size); @@ -91,8 +94,30 @@ void ImageReadAndDecode::create(ReaderConfig reader_config, DecoderConfig decode for (int i = 0; i < batch_size; i++) { _compressed_buff[i].resize(MAX_COMPRESSED_SIZE); // If we don't need MAX_COMPRESSED_SIZE we can remove this & resize in load module } - _rocjpeg_decoder = create_decoder(decoder_config); - _rocjpeg_decoder->initialize(device_id, batch_size); + const char *rocjpeg_omp_split_env = std::getenv("ROCAL_ROCJPEG_DEDICATED_OMP_SPLIT"); + _use_rocjpeg_dedicated_omp_split = !(rocjpeg_omp_split_env && + (std::strcmp(rocjpeg_omp_split_env, "0") == 0 || + std::strcmp(rocjpeg_omp_split_env, "OFF") == 0 || + std::strcmp(rocjpeg_omp_split_env, "off") == 0 || + std::strcmp(rocjpeg_omp_split_env, "FALSE") == 0 || + std::strcmp(rocjpeg_omp_split_env, "false") == 0)); + if (_use_rocjpeg_dedicated_omp_split) { + const size_t rocjpeg_decoder_count = std::min(static_cast(batch_size), std::max(static_cast(1), std::min(static_cast(4), _num_threads))); + _rocjpeg_decoders.resize(rocjpeg_decoder_count); + _rocjpeg_sub_batch_sizes.resize(rocjpeg_decoder_count); + + const size_t base_sub_batch = static_cast(batch_size) / rocjpeg_decoder_count; + const size_t sub_batch_remainder = static_cast(batch_size) % rocjpeg_decoder_count; + for (size_t decoder_index = 0; decoder_index < rocjpeg_decoder_count; decoder_index++) { + const size_t sub_batch_size = base_sub_batch + ((decoder_index < sub_batch_remainder) ? 1 : 0); + _rocjpeg_sub_batch_sizes[decoder_index] = sub_batch_size; + _rocjpeg_decoders[decoder_index] = create_decoder(decoder_config); + _rocjpeg_decoders[decoder_index]->initialize(device_id, sub_batch_size); + } + } else { + _rocjpeg_decoder = create_decoder(decoder_config); + _rocjpeg_decoder->initialize(device_id, batch_size); + } } else { for (int i = 0; i < batch_size; i++) { _compressed_buff[i].resize(MAX_COMPRESSED_SIZE); // If we don't need MAX_COMPRESSED_SIZE we can remove this & resize in load module @@ -101,7 +126,6 @@ void ImageReadAndDecode::create(ReaderConfig reader_config, DecoderConfig decode } } } - _num_threads = reader_config.get_cpu_num_threads(); _reader = create_reader(reader_config); _is_external_source = (reader_config.type() == StorageType::EXTERNAL_FILE_SOURCE); } @@ -350,53 +374,132 @@ ImageReadAndDecode::load(unsigned char *buff, _set_device_id = true; } #endif - // Iterate through each image in the batch and obtain the decode info - for (size_t i = 0; i < _batch_size; i++) { - _actual_decoded_width[i] = max_decoded_width; - _actual_decoded_height[i] = max_decoded_height; - int original_width, original_height, decoded_width, decoded_height; - if (_rocjpeg_decoder->decode_info(_compressed_buff[i].data(), _actual_read_size[i], &original_width, &original_height, - &decoded_width, &decoded_height, - max_decoded_width, max_decoded_height, decoder_color_format, i) != Decoder::Status::OK) { - // Substituting the image which failed decoding with other image from the same batch - int j = ((i + 1) != _batch_size) ? _batch_size - 1 : _batch_size - 2; - while ((j >= 0)) { - if (_rocjpeg_decoder->decode_info(_compressed_buff[j].data(), _actual_read_size[j], &original_width, &original_height, - &decoded_width, &decoded_height, - max_decoded_width, max_decoded_height, decoder_color_format, i) == Decoder::Status::OK) { - _image_names[i] = _image_names[j]; - _compressed_buff[i] = _compressed_buff[j]; - _actual_read_size[i] = _actual_read_size[j]; - _compressed_image_size[i] = _compressed_image_size[j]; - break; - } else - j--; - if (j < 0) { - THROW("All images in the batch failed decoding with rocJpeg decoder\n"); + if (_use_rocjpeg_dedicated_omp_split) { + std::vector rocjpeg_sub_batch_offsets(_rocjpeg_sub_batch_sizes.size(), 0); + for (size_t shard = 1; shard < _rocjpeg_sub_batch_sizes.size(); shard++) { + rocjpeg_sub_batch_offsets[shard] = rocjpeg_sub_batch_offsets[shard - 1] + _rocjpeg_sub_batch_sizes[shard - 1]; + } + + const int rocjpeg_decoder_threads = static_cast(_rocjpeg_decoders.size()); +#pragma omp parallel for num_threads(rocjpeg_decoder_threads) + for (size_t shard = 0; shard < _rocjpeg_decoders.size(); shard++) { +#if ENABLE_HIP + hipError_t hip_status = hipSetDevice(_device_id); + if (hip_status != hipSuccess) { + THROW("hipSetDevice failed inside rocJPEG shard worker"); + } +#endif + auto& rocjpeg_decoder = _rocjpeg_decoders[shard]; + const size_t shard_begin = rocjpeg_sub_batch_offsets[shard]; + const size_t shard_size = _rocjpeg_sub_batch_sizes[shard]; + const size_t shard_end = shard_begin + shard_size; + + for (size_t i = shard_begin; i < shard_end; i++) { + const size_t local_index = i - shard_begin; + _actual_decoded_width[i] = max_decoded_width; + _actual_decoded_height[i] = max_decoded_height; + int original_width, original_height, decoded_width, decoded_height; + if (rocjpeg_decoder->decode_info(_compressed_buff[i].data(), _actual_read_size[i], &original_width, &original_height, + &decoded_width, &decoded_height, + max_decoded_width, max_decoded_height, decoder_color_format, static_cast(local_index)) != Decoder::Status::OK) { + int j = static_cast(shard_end) - 1; + while (j >= static_cast(shard_begin)) { + if (rocjpeg_decoder->decode_info(_compressed_buff[j].data(), _actual_read_size[j], &original_width, &original_height, + &decoded_width, &decoded_height, + max_decoded_width, max_decoded_height, decoder_color_format, static_cast(local_index)) == Decoder::Status::OK) { + _image_names[i] = _image_names[j]; + _compressed_buff[i] = _compressed_buff[j]; + _actual_read_size[i] = _actual_read_size[j]; + _compressed_image_size[i] = _compressed_image_size[j]; + break; + } else { + j--; + } + if (j < static_cast(shard_begin)) { + THROW("All images in the rocJpeg sub-batch failed decoding\n"); + } + } + } + _original_height[i] = original_height; + _original_width[i] = original_width; + _actual_decoded_width[i] = decoded_width; + _actual_decoded_height[i] = decoded_height; + + if (rocjpeg_decoder->is_cropped_decoder()) { + if (_randombboxcrop_meta_data_reader) { + rocjpeg_decoder->set_bbox_coords(_bbox_coords[i]); + } else if (_random_crop_dec_param) { + Shape dec_shape = {_original_height[i], _original_width[i]}; + auto crop_window = _random_crop_dec_param->generate_crop_window(dec_shape, i); + rocjpeg_decoder->set_crop_window(crop_window); + } } } - } - _original_height[i] = original_height; - _original_width[i] = original_width; - _actual_decoded_width[i] = decoded_width; - _actual_decoded_height[i] = decoded_height; - if (_rocjpeg_decoder->is_cropped_decoder()) { - if (_randombboxcrop_meta_data_reader) { - _rocjpeg_decoder->set_bbox_coords(_bbox_coords[i]); - } else if (_random_crop_dec_param) { - Shape dec_shape = {_original_height[i], _original_width[i]}; - auto crop_window = _random_crop_dec_param->generate_crop_window(dec_shape, i); - _rocjpeg_decoder->set_crop_window(crop_window); + std::vector shard_output(_decompressed_buff_ptrs.begin() + shard_begin, _decompressed_buff_ptrs.begin() + shard_end); + std::vector shard_original_width(_original_width.begin() + shard_begin, _original_width.begin() + shard_end); + std::vector shard_original_height(_original_height.begin() + shard_begin, _original_height.begin() + shard_end); + std::vector shard_actual_decoded_width(_actual_decoded_width.begin() + shard_begin, _actual_decoded_width.begin() + shard_end); + std::vector shard_actual_decoded_height(_actual_decoded_height.begin() + shard_begin, _actual_decoded_height.begin() + shard_end); + + if (rocjpeg_decoder->decode_batch(shard_output, + max_decoded_width, max_decoded_height, + shard_original_width, shard_original_height, + shard_actual_decoded_width, shard_actual_decoded_height) != Decoder::Status::OK) { + } + + std::copy(shard_actual_decoded_width.begin(), shard_actual_decoded_width.end(), _actual_decoded_width.begin() + shard_begin); + std::copy(shard_actual_decoded_height.begin(), shard_actual_decoded_height.end(), _actual_decoded_height.begin() + shard_begin); + } + } else { + // Iterate through each image in the batch and obtain the decode info + for (size_t i = 0; i < _batch_size; i++) { + _actual_decoded_width[i] = max_decoded_width; + _actual_decoded_height[i] = max_decoded_height; + int original_width, original_height, decoded_width, decoded_height; + if (_rocjpeg_decoder->decode_info(_compressed_buff[i].data(), _actual_read_size[i], &original_width, &original_height, + &decoded_width, &decoded_height, + max_decoded_width, max_decoded_height, decoder_color_format, i) != Decoder::Status::OK) { + // Substituting the image which failed decoding with other image from the same batch + int j = ((i + 1) != _batch_size) ? _batch_size - 1 : _batch_size - 2; + while ((j >= 0)) { + if (_rocjpeg_decoder->decode_info(_compressed_buff[j].data(), _actual_read_size[j], &original_width, &original_height, + &decoded_width, &decoded_height, + max_decoded_width, max_decoded_height, decoder_color_format, i) == Decoder::Status::OK) { + _image_names[i] = _image_names[j]; + _compressed_buff[i] = _compressed_buff[j]; + _actual_read_size[i] = _actual_read_size[j]; + _compressed_image_size[i] = _compressed_image_size[j]; + break; + } else + j--; + if (j < 0) { + THROW("All images in the batch failed decoding with rocJpeg decoder\n"); + } + } + } + _original_height[i] = original_height; + _original_width[i] = original_width; + _actual_decoded_width[i] = decoded_width; + _actual_decoded_height[i] = decoded_height; + + if (_rocjpeg_decoder->is_cropped_decoder()) { + if (_randombboxcrop_meta_data_reader) { + _rocjpeg_decoder->set_bbox_coords(_bbox_coords[i]); + } else if (_random_crop_dec_param) { + Shape dec_shape = {_original_height[i], _original_width[i]}; + auto crop_window = _random_crop_dec_param->generate_crop_window(dec_shape, i); + _rocjpeg_decoder->set_crop_window(crop_window); + } } } - } - - if (_rocjpeg_decoder->decode_batch(_decompressed_buff_ptrs, - max_decoded_width, max_decoded_height, - _original_width, _original_height, - _actual_decoded_width, _actual_decoded_height) != Decoder::Status::OK) { + if (_rocjpeg_decoder->decode_batch(_decompressed_buff_ptrs, + max_decoded_width, max_decoded_height, + _original_width, _original_height, + _actual_decoded_width, _actual_decoded_height) != Decoder::Status::OK) { + + } } } diff --git a/tests/cpp_api/dataloader_multithread/dataloader_multithread.cpp b/tests/cpp_api/dataloader_multithread/dataloader_multithread.cpp index 3d174b59b..8ff3f9168 100644 --- a/tests/cpp_api/dataloader_multithread/dataloader_multithread.cpp +++ b/tests/cpp_api/dataloader_multithread/dataloader_multithread.cpp @@ -23,6 +23,8 @@ THE SOFTWARE. */ #include +#include +#include #include #include #include @@ -53,16 +55,29 @@ using namespace cv; using namespace std::chrono; std::mutex g_mtx; // mutex for critical section -int thread_func(const char *path, int gpu_mode, RocalImageColor color_format, int shard_id, int num_shards, int dec_width, int dec_height, int batch_size, bool shuffle, bool display, int dec_mode) { +int thread_func(const char *path, int gpu_mode, RocalImageColor color_format, int shard_id, int num_shards, int dec_width, int dec_height, int batch_size, bool shuffle, bool display, int dec_mode, int cpu_thread_count) { std::unique_lock lck(g_mtx, std::defer_lock); std::cout << "Running on " << (gpu_mode >= 0 ? "GPU: " : "CPU: ") << gpu_mode << std::endl; std::cout << "shard_id: " << shard_id << std::endl; color_format = RocalImageColor::ROCAL_COLOR_RGB24; int gpu_id = (gpu_mode < 0) ? 0 : gpu_mode; RocalDecoderType dec_type = (RocalDecoderType)dec_mode; + const char *rocjpeg_omp_split_env = std::getenv("ROCAL_ROCJPEG_DEDICATED_OMP_SPLIT"); + const bool rocjpeg_omp_split_enabled = !(rocjpeg_omp_split_env && + (std::strcmp(rocjpeg_omp_split_env, "0") == 0 || + std::strcmp(rocjpeg_omp_split_env, "OFF") == 0 || + std::strcmp(rocjpeg_omp_split_env, "off") == 0 || + std::strcmp(rocjpeg_omp_split_env, "FALSE") == 0 || + std::strcmp(rocjpeg_omp_split_env, "false") == 0)); + const int rocjpeg_decoder_threads = std::max(1, std::min(4, cpu_thread_count)); + const int effective_batch_size = (dec_mode == 4 && rocjpeg_omp_split_enabled) ? batch_size * rocjpeg_decoder_threads : batch_size; + if (effective_batch_size != batch_size) { + std::cout << "per-decoder batch size: " << batch_size + << " effective rocAL batch size: " << effective_batch_size << std::endl; + } lck.lock(); // looks like OpenVX has some issue loading kernels from multiple threads at the same time - auto handle = rocalCreate(batch_size, (gpu_mode < 0) ? RocalProcessMode::ROCAL_PROCESS_CPU : RocalProcessMode::ROCAL_PROCESS_GPU, gpu_id, 1); + auto handle = rocalCreate(effective_batch_size, (gpu_mode < 0) ? RocalProcessMode::ROCAL_PROCESS_CPU : RocalProcessMode::ROCAL_PROCESS_GPU, gpu_id, cpu_thread_count); lck.unlock(); if (rocalGetStatus(handle) != ROCAL_OK) { std::cout << "Could not create the Rocal context" @@ -110,7 +125,7 @@ int thread_func(const char *path, int gpu_mode, RocalImageColor color_format, in /*>>>>>>>>>>>>>>>>>>> Diplay using OpenCV <<<<<<<<<<<<<<<<<*/ int n = rocalGetAugmentationBranchCount(handle); - int h = n * rocalGetOutputHeight(handle) * batch_size; + int h = n * rocalGetOutputHeight(handle) * effective_batch_size; int w = rocalGetOutputWidth(handle); int p = (((color_format == RocalImageColor::ROCAL_COLOR_RGB24) || (color_format == RocalImageColor::ROCAL_COLOR_RGB_PLANAR)) @@ -128,8 +143,8 @@ int thread_func(const char *path, int gpu_mode, RocalImageColor color_format, in high_resolution_clock::time_point t1 = high_resolution_clock::now(); int counter = 0; std::vector names; - names.resize(batch_size); - std::vector image_name_length(batch_size); + names.resize(effective_batch_size); + std::vector image_name_length(effective_batch_size); if (DISPLAY) cv::namedWindow("output", CV_WINDOW_AUTOSIZE); @@ -139,17 +154,16 @@ int thread_func(const char *path, int gpu_mode, RocalImageColor color_format, in rocalRelease(handle); return -1; } - // copy output to host as image - rocalCopyToOutput(handle, mat_input.data, h * w * p); + counter += effective_batch_size; +#if PRINT_NAMES_AND_LABELS unsigned img_name_size = rocalGetImageNameLen(handle, image_name_length.data()); std::vector img_name(img_name_size); rocalGetImageName(handle, img_name.data()); -#if PRINT_NAMES_AND_LABELS RocalTensorList labels = rocalGetImageLabels(handle); std::string imageNamesStr(img_name.data()); int pos = 0; int *labels_buffer = reinterpret_cast(labels->at(0)->buffer()); - for (int i = 0; i < batch_size; i++) { + for (int i = 0; i < effective_batch_size; i++) { names[i] = imageNamesStr.substr(pos, image_name_length[i]); pos += image_name_length[i]; std::cout << "name: " << names[i] << " label: " << labels_buffer[i] << " - "; @@ -158,6 +172,8 @@ int thread_func(const char *path, int gpu_mode, RocalImageColor color_format, in #endif if (!display) continue; + // copy output to host as image + rocalCopyToOutput(handle, mat_input.data, h * w * p); mat_input.copyTo(mat_output(cv::Rect(col_counter * w, 0, w, h))); cv::cvtColor(mat_output, mat_color, CV_RGB2BGR); if (DISPLAY) @@ -166,7 +182,6 @@ int thread_func(const char *path, int gpu_mode, RocalImageColor color_format, in cv::imwrite("output.png", mat_color); col_counter = (col_counter + 1) % number_of_cols; - counter += batch_size; } high_resolution_clock::time_point t2 = high_resolution_clock::now(); @@ -193,7 +208,7 @@ int main(int argc, const char **argv) { const int MIN_ARG_COUNT = 2; if (argc < MIN_ARG_COUNT) { std::cout << "Usage: dataloader_multithread " << - "num_shards decode_width decode_height batch_size shuffle display_on_off dec_mode<0(tjpeg)/1(opencv)/2(hwdec)>" << std::endl; + "num_shards decode_width decode_height batch_size shuffle display_on_off dec_mode<0(tjpeg)/4(rocjpeg)> cpu_thread_count" << std::endl; return -1; } int argIdx = 1; @@ -206,6 +221,7 @@ int main(int argc, const char **argv) { bool shuffle = 0; int num_gpus = 0; int dec_mode = 0; + int cpu_thread_count = 1; if (argc > argIdx) num_gpus = atoi(argv[argIdx++]); @@ -231,6 +247,9 @@ int main(int argc, const char **argv) { if (argc > argIdx) dec_mode = atoi(argv[argIdx++]); + if (argc > argIdx) + cpu_thread_count = atoi(argv[argIdx++]); + std::cout << "Number of GPUs: " << num_gpus << std::endl; // launch threads process shards @@ -239,7 +258,7 @@ int main(int argc, const char **argv) { int th_id; for (th_id = 0; th_id < num_shards; th_id++) { loader_threads[th_id] = std::thread(thread_func, path, gpu_id, RocalImageColor::ROCAL_COLOR_RGB24, th_id, num_shards, decode_width, decode_height, inputBatchSize, - shuffle, display, dec_mode); + shuffle, display, dec_mode, cpu_thread_count); if (num_gpus) gpu_id = (gpu_id + 1) % num_gpus; } for (auto &th : loader_threads) { diff --git a/tests/cpp_api/rocjpeg_decode_perf/README.md b/tests/cpp_api/rocjpeg_decode_perf/README.md new file mode 100644 index 000000000..f8d69441b --- /dev/null +++ b/tests/cpp_api/rocjpeg_decode_perf/README.md @@ -0,0 +1,326 @@ +# rocJPEG Decode Performance Harness + +This folder contains a small manual performance and validation harness for +testing rocAL image decode behavior with rocJPEG and TurboJPEG. It is intended +for multi-GPU sharded decode experiments, comparing the rocAL + rocJPEG path +with the dedicated OpenMP split enabled and disabled. + +Suggested location in rocAL: + +```text +tests/cpp_api/rocjpeg_decode_perf/ +``` + +## Files + +```text +perf_sharded_launcher.cpp +reporting_perf_sharded_results.sh +reporting_test_results.sh +rocal_decode_call_bench.py +run_tests_twice_solution_on_off.sh +``` + +### `perf_sharded_launcher.cpp` + +Standalone C++ launcher for running `jpegdecodeperf` across multiple GPUs. +It recursively scans an image dataset, creates per-GPU shard directories using +symlinks, launches one `jpegdecodeperf` process per GPU, and writes per-GPU logs +to `LOG_DIR`-style files named `jpegdecodeperf_gpu.log`. + +Expected usage: + +```bash +./perf_sharded_launcher [batch_size=32] [threads=4] [fmt=rgb] [work_dir=/tmp/rocjpeg_decode_perf/shards] [log_dir=/tmp/rocjpeg_decode_perf] +``` + +Example: + +```bash +./perf_sharded_launcher \ + /path/to/image_dataset \ + 8 \ + /path/to/jpegdecodeperf \ + 32 \ + 4 \ + rgb +``` + +### `reporting_perf_sharded_results.sh` + +Summarizes the `jpegdecodeperf_gpu.log` files generated by +`perf_sharded_launcher.cpp`. It reports decoded image counts, average decode +time, and wall/max decode time across GPU shards. + +Expected usage: + +```bash +export LOG_DIR=/tmp/rocjpeg_decode_perf +export DATASET_LABEL=ImageNet +./reporting_perf_sharded_results.sh +``` + +### `rocal_decode_call_bench.py` + +Python rocAL benchmark for exercising `fn.readers.file` and +`fn.decoders.image`. It can run one shard in-process or launch one worker +process per shard for multi-GPU tests. The script prints total decoded image +count, total processing time, average per-image time, and rocAL internal timing +fields when available. + +Expected usage: + +```bash +python3 rocal_decode_call_bench.py \ + --path \ + --device gpu \ + --batch-size 32 \ + --num-threads 4 \ + --device-id 0 \ + --num-gpus \ + --num-shards +``` + +For TurboJPEG comparison through the CPU decode path: + +```bash +python3 rocal_decode_call_bench.py \ + --path \ + --device cpu \ + --batch-size 32 \ + --num-threads 4 \ + --device-id 0 \ + --num-gpus \ + --num-shards +``` + +### `run_tests_twice_solution_on_off.sh` + +Main rocAL benchmark driver. It runs six cases and stores logs in `LOG_DIR`: + +```text +C++ rocAL + rocJPEG, solution OFF +C++ rocAL + rocJPEG, solution ON +C++ rocAL + TurboJPEG +Python rocAL + rocJPEG, solution OFF +Python rocAL + rocJPEG, solution ON +Python rocAL + TurboJPEG +``` + +The script toggles `ROCAL_ROCJPEG_DEDICATED_OMP_SPLIT` between `0` and `1` for +rocJPEG runs, then runs a TurboJPEG comparison path. + +Expected usage: + +```bash +export DATASET=/path/to/image_dataset +export ROCAL_CPP_BIN=/path/to/dataloader_multithread +export ROCM_PATH=/opt/rocm +export LOG_DIR=/tmp/rocjpeg_decode_perf + +./run_tests_twice_solution_on_off.sh +``` + +### `reporting_test_results.sh` + +Summarizes the six logs produced by `run_tests_twice_solution_on_off.sh`. It +prints image counts, per-GPU or per-shard decode times, average decode times, +and the calculated decode-time reduction/speedup for the rocJPEG split path. + +Expected usage: + +```bash +export LOG_DIR=/tmp/rocjpeg_decode_perf +export DATASET_LABEL=ImageNet +./reporting_test_results.sh +``` + +## Environment Variables + +### Required + +```bash +export DATASET=/path/to/image_dataset +export ROCAL_CPP_BIN=/path/to/dataloader_multithread +``` + +`DATASET` points to the input image directory. `ROCAL_CPP_BIN` points to the +compiled rocAL `dataloader_multithread` binary used by the C++ comparison runs. + +### Common Optional Variables + +```bash +export DATASET_LABEL=dataset +export GPU_COUNT=1 +export LOG_DIR=/tmp/rocjpeg_decode_perf +export WORKSPACE=/path/to/rocjpeg_decode_perf +export ROCAL_PY_BENCH=$WORKSPACE/rocal_decode_call_bench.py +``` + +`WORKSPACE` defaults to the directory containing the shell script, so it usually +does not need to be exported. `LOG_DIR` is shared by the run and reporting +scripts. Keep the same `LOG_DIR` value when generating and summarizing logs. + +### rocAL/ROCm Runtime Variables + +```bash +export ROCM_PATH=/opt/rocm +export ROCJPEG_DECODER_CREATE_LOG=1 +``` + +The scripts use `ROCM_PATH` to set `LD_LIBRARY_PATH` and `PYTHONPATH`. If +`ROCM_PATH` is not set, it defaults to `/opt/rocm`. + +### rocJPEG Split Toggle + +```bash +export ROCAL_ROCJPEG_DEDICATED_OMP_SPLIT=0 # disabled +export ROCAL_ROCJPEG_DEDICATED_OMP_SPLIT=1 # enabled +``` + +`run_tests_twice_solution_on_off.sh` sets this internally for the relevant +benchmark cases. + +## Build Notes + +If this folder is added under `tests/cpp_api/rocjpeg_decode_perf/`, keep it as a +separate CMake target. Do not place `perf_sharded_launcher.cpp` inside existing +folders such as `performance_tests/` or `dataloader_multithread/`, because those +folders glob `*.cpp` files into a single executable. + +Suggested minimal CMake target: + +```cmake +cmake_minimum_required(VERSION 3.10) +project(rocjpeg_decode_perf) + +set(CMAKE_CXX_STANDARD 17) +set(CMAKE_CXX_STANDARD_REQUIRED ON) + +add_executable(perf_sharded_launcher perf_sharded_launcher.cpp) +target_link_libraries(perf_sharded_launcher pthread) +``` + +If installing this with rocAL test assets, include the shell and Python files as +test support files rather than compiling them. + +## Typical Workflow + +### 1. Run rocAL C++ and Python comparisons + +```bash +cd /path/to/rocAL/tests/cpp_api/rocjpeg_decode_perf + +export DATASET=/path/to/image_dataset +export DATASET_LABEL=my_dataset +export ROCAL_CPP_BIN=/path/to/dataloader_multithread +export ROCM_PATH=/opt/rocm +export LOG_DIR=/tmp/rocjpeg_decode_perf + +./run_tests_twice_solution_on_off.sh 1 +./reporting_test_results.sh 1 +``` + +### 2. Run standalone jpegdecodeperf sharded comparison + +```bash +cd /path/to/rocAL/tests/cpp_api/rocjpeg_decode_perf + +g++ -std=c++17 -O2 -Wall perf_sharded_launcher.cpp -o perf_sharded_launcher + +export LOG_DIR=/tmp/rocjpeg_decode_perf + +./perf_sharded_launcher \ + /path/to/image_dataset \ + 8 \ + /path/to/jpegdecodeperf \ + 32 \ + 4 \ + rgb \ + "$LOG_DIR/shards" \ + "$LOG_DIR" + +export DATASET_LABEL=ImageNet +./reporting_perf_sharded_results.sh 8 +``` + +## Output Logs + +The scripts write logs to `LOG_DIR`. If `LOG_DIR` is not set, it defaults to: + +```text +/tmp/rocjpeg_decode_perf +``` + +rocAL comparison logs: + +```text +$LOG_DIR/rocjpeg_split_off_gpu.log +$LOG_DIR/rocjpeg_split_on_gpu.log +$LOG_DIR/turbojpeg_gpu.log +$LOG_DIR/py_rocjpeg_split_off_gpu.log +$LOG_DIR/py_rocjpeg_split_on_gpu.log +$LOG_DIR/py_turbojpeg_gpu.log +``` + +Standalone `jpegdecodeperf` logs: + +```text +$LOG_DIR/jpegdecodeperf_gpu0.log +$LOG_DIR/jpegdecodeperf_gpu1.log +... +``` + +## Folder Name + +This folder is named: + +```text +rocjpeg_decode_perf +``` + +The name reflects the work performed by these files. The harness focuses on +rocJPEG-backed image decode performance in rocAL, including dedicated split-path +comparisons, sharded multi-GPU decode runs, TurboJPEG comparison runs, and +summary reporting for the generated benchmark logs. + +## Example to Set Env Vars Before Running Any Script + +For a local workspace where this folder is under `/workspace/EssamWork` and the +test dataset is `/workspace/test_1300_files/train`, use: + +```bash +export WORKSPACE=/workspace/EssamWork +cd "$WORKSPACE" + +export DATASET=/workspace/test_1300_files/train +export DATASET_LABEL=test_1300_files +export ROCM_PATH=/opt/rocm +export LOG_DIR=/tmp/rocjpeg_decode_perf +export ROCAL_CPP_BIN=/workspace/rocAL/build/tests/cpp_api/dataloader_multithread_manual/dataloader_multithread +``` + +Then run the main rocAL comparison and report: + +```bash +./run_tests_twice_solution_on_off.sh 1 +./reporting_test_results.sh 1 +``` + +For the standalone `jpegdecodeperf` sharded launcher, compile and run: + +```bash +g++ -std=c++17 -O2 -Wall perf_sharded_launcher.cpp -o perf_sharded_launcher + +./perf_sharded_launcher \ + "$DATASET" \ + 1 \ + /path/to/jpegdecodeperf \ + 32 \ + 4 \ + rgb \ + "$LOG_DIR/shards" \ + "$LOG_DIR" + +./reporting_perf_sharded_results.sh 1 +``` diff --git a/tests/cpp_api/rocjpeg_decode_perf/perf_sharded_launcher.cpp b/tests/cpp_api/rocjpeg_decode_perf/perf_sharded_launcher.cpp new file mode 100644 index 000000000..a4e1558ce --- /dev/null +++ b/tests/cpp_api/rocjpeg_decode_perf/perf_sharded_launcher.cpp @@ -0,0 +1,194 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace fs = std::filesystem; + +static bool is_jpeg(const fs::path& path) { + std::string ext = path.extension().string(); + std::transform(ext.begin(), ext.end(), ext.begin(), [](unsigned char c) { + return static_cast(std::tolower(c)); + }); + return ext == ".jpg" || ext == ".jpeg"; +} + +static void usage(const char* prog) { + std::cerr + << "Usage: " << prog << " [batch_size=32] [threads=4] [fmt=rgb] [work_dir=/tmp/rocjpeg_decode_perf/shards] [log_dir=/tmp/rocjpeg_decode_perf]\n" + << "\n" + << "Example:\n" + << " " << prog << " /path/to/images 8 /path/to/jpegdecodeperf\n"; +} + +int main(int argc, char** argv) { + if (argc < 4) { + usage(argv[0]); + return 1; + } + + const fs::path dataset_dir = argv[1]; + const int num_gpus = std::atoi(argv[2]); + const fs::path jpegdecodeperf_bin = argv[3]; + const std::string batch_size = (argc > 4) ? argv[4] : "32"; + const std::string threads = (argc > 5) ? argv[5] : "4"; + const std::string fmt = (argc > 6) ? argv[6] : "rgb"; + const fs::path work_dir = (argc > 7) ? fs::path(argv[7]) : fs::path("/tmp/rocjpeg_decode_perf/shards"); + const fs::path log_dir = (argc > 8) ? fs::path(argv[8]) : fs::path("/tmp/rocjpeg_decode_perf"); + + if (num_gpus <= 0) { + std::cerr << "num_gpus must be > 0\n"; + return 1; + } + + if (!fs::exists(dataset_dir)) { + std::cerr << "Dataset does not exist: " << dataset_dir << "\n"; + return 1; + } + + if (!fs::exists(jpegdecodeperf_bin)) { + std::cerr << "jpegdecodeperf binary does not exist: " << jpegdecodeperf_bin << "\n"; + return 1; + } + + std::vector files; + const auto options = fs::directory_options::follow_directory_symlink; + + for (const auto& entry : fs::recursive_directory_iterator(dataset_dir, options)) { + std::error_code ec; + if (fs::is_regular_file(entry.status(ec)) && !ec && is_jpeg(entry.path())) { + files.push_back(entry.path()); + } + } + + std::sort(files.begin(), files.end()); + + if (files.empty()) { + std::cerr << "No JPEG files found under: " << dataset_dir << "\n"; + return 1; + } + + fs::remove_all(work_dir); + fs::create_directories(work_dir); + fs::create_directories(log_dir); + + std::vector shard_counts(num_gpus, 0); + + for (size_t i = 0; i < files.size(); ++i) { + const int shard = static_cast(i % static_cast(num_gpus)); + const fs::path shard_dir = work_dir / ("shard_" + std::to_string(shard)); + fs::create_directories(shard_dir); + + const fs::path src = files[i]; + const std::string link_name = + src.parent_path().filename().string() + "_" + src.filename().string(); + const fs::path dst = shard_dir / link_name; + + std::error_code ec; + fs::create_symlink(src, dst, ec); + if (ec) { + std::cerr << "Failed to create symlink: " << dst << " -> " << src + << " error: " << ec.message() << "\n"; + return 1; + } + + shard_counts[shard]++; + } + + std::cout << "Total JPEG files: " << files.size() << "\n"; + for (int gpu = 0; gpu < num_gpus; ++gpu) { + std::cout << "GPU " << gpu << " shard files: " << shard_counts[gpu] << "\n"; + } + + std::vector pids; + + for (int gpu = 0; gpu < num_gpus; ++gpu) { + const fs::path shard_dir = work_dir / ("shard_" + std::to_string(gpu)); + const fs::path log_path = log_dir / ("jpegdecodeperf_gpu" + std::to_string(gpu) + ".log"); + + pid_t pid = fork(); + if (pid < 0) { + std::cerr << "fork failed: " << std::strerror(errno) << "\n"; + return 1; + } + + if (pid == 0) { + FILE* log_file = std::freopen(log_path.c_str(), "w", stdout); + if (!log_file) { + std::perror("freopen stdout"); + _exit(127); + } + + if (dup2(fileno(stdout), STDERR_FILENO) < 0) { + std::perror("dup2 stderr"); + _exit(127); + } + + const std::string gpu_id = std::to_string(gpu); + + execl( + jpegdecodeperf_bin.c_str(), + jpegdecodeperf_bin.c_str(), + "-i", shard_dir.c_str(), + "-b", batch_size.c_str(), + "-t", threads.c_str(), + "-fmt", fmt.c_str(), + "-d", gpu_id.c_str(), + static_cast(nullptr) + ); + + std::perror("execl jpegdecodeperf"); + _exit(127); + } + + pids.push_back(pid); + + std::cout << "Launched GPU " << gpu << " pid " << pid + << " log " << log_path << "\n"; + } + + int failures = 0; + + for (pid_t pid : pids) { + int status = 0; + + if (waitpid(pid, &status, 0) < 0) { + std::cerr << "waitpid failed for pid " << pid << ": " + << std::strerror(errno) << "\n"; + failures++; + continue; + } + + if (!WIFEXITED(status) || WEXITSTATUS(status) != 0) { + std::cerr << "Process pid " << pid << " failed"; + + if (WIFEXITED(status)) { + std::cerr << " exit code " << WEXITSTATUS(status); + } else if (WIFSIGNALED(status)) { + std::cerr << " signal " << WTERMSIG(status); + } + + std::cerr << "\n"; + failures++; + } + } + + if (failures) { + std::cerr << failures << " jpegdecodeperf process(es) failed\n"; + return 1; + } + + std::cout << "All jpegdecodeperf processes completed.\n"; + std::cout << "Logs: " << (log_dir / "jpegdecodeperf_gpu0.log") + << " ... " << (log_dir / ("jpegdecodeperf_gpu" + std::to_string(num_gpus - 1) + ".log")) << "\n"; + + return 0; +} diff --git a/tests/cpp_api/rocjpeg_decode_perf/reporting_perf_sharded_results.sh b/tests/cpp_api/rocjpeg_decode_perf/reporting_perf_sharded_results.sh new file mode 100755 index 000000000..9404c4245 --- /dev/null +++ b/tests/cpp_api/rocjpeg_decode_perf/reporting_perf_sharded_results.sh @@ -0,0 +1,82 @@ +#!/bin/bash + +set -e + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +WORKSPACE="${WORKSPACE:-$SCRIPT_DIR}" +LOG_DIR="${LOG_DIR:-/tmp/rocjpeg_decode_perf}" + +GPU_COUNT="${1:-${GPU_COUNT:-1}}" +DATASET_LABEL="${DATASET_LABEL:-dataset}" + +check_log() { + if [ ! -f "$1" ]; then + echo "ERROR: Missing log file: $1" + exit 1 + fi +} + +extract_count() { + awk '/Total decoded images:/ {n=$4} END {if (n) printf "%d", n; else printf "0"}' "$1" +} + +extract_decode_sec() { + awk ' + /Total decoded images:/ {n=$4} + /Average processing time per image/ {ms=$7} + END { + if (n && ms) printf "%.6f", (n * ms) / 1000.0; + else printf "0.000000"; + } + ' "$1" +} + +echo "# Summary:" +echo "" +echo "### jpegdecodeperf sharded results with ${DATASET_LABEL}:" +echo "" +echo "GPU count: ${GPU_COUNT}" +echo "" + +TOTAL_IMAGES=0 +TOTAL_SEC=0 +COUNT=0 + +for gpu in $(seq 0 $((GPU_COUNT - 1))); do + LOG="$LOG_DIR/jpegdecodeperf_gpu${gpu}.log" + check_log "$LOG" + + IMAGES=$(extract_count "$LOG") + SEC=$(extract_decode_sec "$LOG") + + TOTAL_IMAGES=$(awk -v a="$TOTAL_IMAGES" -v b="$IMAGES" 'BEGIN {printf "%d", a + b}') + TOTAL_SEC=$(awk -v a="$TOTAL_SEC" -v b="$SEC" 'BEGIN {printf "%.6f", a + b}') + COUNT=$((COUNT + 1)) + + printf "\tGPU/device %-3s images decoded: %10d decode time: %12.6f seconds\n" "$gpu" "$IMAGES" "$SEC" +done + +AVG_SEC=$(awk -v total="$TOTAL_SEC" -v count="$COUNT" 'BEGIN {if (count > 0) printf "%.6f", total / count; else printf "0.000000"}') +MAX_SEC=$(awk ' + /Total decoded images:/ {n=$4} + /Average processing time per image/ { + ms=$7 + sec=(n * ms) / 1000.0 + if (sec > max) max=sec + } + END { + printf "%.6f", max + } +' "$LOG_DIR"/jpegdecodeperf_gpu*.log) + +echo "" +echo "Decoded image count:" +echo "" +echo " jpegdecodeperf total images decoded: ${TOTAL_IMAGES}" + +echo "" +echo "jpegdecodeperf sharded decode-time results:" +echo "" +printf "\tjpegdecodeperf average decode time: %12.6f seconds\n" "$AVG_SEC" +printf "\tjpegdecodeperf wall/max decode time: %12.6f seconds\n" "$MAX_SEC" +echo "" diff --git a/tests/cpp_api/rocjpeg_decode_perf/reporting_test_results.sh b/tests/cpp_api/rocjpeg_decode_perf/reporting_test_results.sh new file mode 100755 index 000000000..9284b3af9 --- /dev/null +++ b/tests/cpp_api/rocjpeg_decode_perf/reporting_test_results.sh @@ -0,0 +1,207 @@ +#!/bin/bash + +set -e + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +WORKSPACE="${WORKSPACE:-$SCRIPT_DIR}" +LOG_DIR="${LOG_DIR:-/tmp/rocjpeg_decode_perf}" + +GPU_COUNT="${1:-${GPU_COUNT:-1}}" + +CPP_ROCJPEG_OFF_LOG="${CPP_ROCJPEG_OFF_LOG:-$LOG_DIR/rocjpeg_split_off_${GPU_COUNT}gpu.log}" +CPP_ROCJPEG_ON_LOG="${CPP_ROCJPEG_ON_LOG:-$LOG_DIR/rocjpeg_split_on_${GPU_COUNT}gpu.log}" +CPP_TURBOJPEG_LOG="${CPP_TURBOJPEG_LOG:-$LOG_DIR/turbojpeg_${GPU_COUNT}gpu.log}" + +PY_ROCJPEG_OFF_LOG="${PY_ROCJPEG_OFF_LOG:-$LOG_DIR/py_rocjpeg_split_off_${GPU_COUNT}gpu.log}" +PY_ROCJPEG_ON_LOG="${PY_ROCJPEG_ON_LOG:-$LOG_DIR/py_rocjpeg_split_on_${GPU_COUNT}gpu.log}" +PY_TURBOJPEG_LOG="${PY_TURBOJPEG_LOG:-$LOG_DIR/py_turbojpeg_${GPU_COUNT}gpu.log}" + +DATASET_LABEL="${DATASET_LABEL:-dataset}" + +check_log() { + if [ ! -f "$1" ]; then + echo "ERROR: Missing log file: $1" + exit 1 + fi +} + +print_line() { + label="$1" + sec="$2" + printf "\t%-52s %12.6f seconds\n" "$label" "$sec" +} + +extract_cpp_total_count() { + awk '/Remaining images/ {total += $3; count += 1} END {if (count > 0) printf "%d", total; else printf "0"}' "$1" +} + +extract_py_total_count() { + awk '/Total decoded images:/ {total += $4; count += 1} END {if (count > 0) printf "%d", total; else printf "0"}' "$1" +} + +extract_cpp_avg_decode_sec() { + awk ' + /Decode time:/ { + total += $3 / 1000000.0; + count += 1; + } + END { + if (count > 0) printf "%.6f", total / count; + else printf "0.000000"; + } + ' "$1" +} + +print_cpp_per_gpu_decode() { + awk ' + /For shard_id:/ { + shard=$3; + gsub(/[^0-9]/, "", shard); + } + /Decode time:/ { + sec=$3 / 1000000.0; + if (shard == "") shard=count; + printf "\tGPU/shard %-3s decode time: %12.6f seconds\n", shard, sec; + count += 1; + shard=""; + } + ' "$1" +} + +extract_py_avg_decode_sec() { + awk ' + /Average rocAL internal decode time across/ {val=$NF} + /^rocAL internal decode time/ { + total += $NF; + count += 1; + single=$NF; + } + END { + if (val != "") printf "%.6f", val; + else if (count > 1) printf "%.6f", total / count; + else if (single != "") printf "%.6f", single; + else printf "0.000000"; + } + ' "$1" +} + +print_py_per_gpu_decode() { + awk ' + /GPU\/device id:/ { + gpu=$NF; + } + /Shard id:/ { + shard=$NF; + } + /^rocAL internal decode time/ { + sec=$NF; + if (gpu != "" && shard != "") { + printf "\tGPU/device %-3s shard %-3s decode time: %12.6f seconds\n", gpu, shard, sec; + } else { + printf "\tdecode time: %12.6f seconds\n", sec; + } + gpu=""; + shard=""; + } + ' "$1" +} + +calc_improvement() { + awk -v off="$1" -v on="$2" 'BEGIN { + if (off > 0) printf "%.2f", ((off - on) / off) * 100.0; + else printf "0.00"; + }' +} + +calc_speedup() { + awk -v off="$1" -v on="$2" 'BEGIN { + if (on > 0) printf "%.2f", off / on; + else printf "0.00"; + }' +} + +check_log "$CPP_ROCJPEG_OFF_LOG" +check_log "$CPP_ROCJPEG_ON_LOG" +check_log "$CPP_TURBOJPEG_LOG" +check_log "$PY_ROCJPEG_OFF_LOG" +check_log "$PY_ROCJPEG_ON_LOG" +check_log "$PY_TURBOJPEG_LOG" + +CPP_ROCJPEG_OFF_COUNT=$(extract_cpp_total_count "$CPP_ROCJPEG_OFF_LOG") +CPP_ROCJPEG_ON_COUNT=$(extract_cpp_total_count "$CPP_ROCJPEG_ON_LOG") +CPP_TURBOJPEG_COUNT=$(extract_cpp_total_count "$CPP_TURBOJPEG_LOG") + +PY_ROCJPEG_OFF_COUNT=$(extract_py_total_count "$PY_ROCJPEG_OFF_LOG") +PY_ROCJPEG_ON_COUNT=$(extract_py_total_count "$PY_ROCJPEG_ON_LOG") +PY_TURBOJPEG_COUNT=$(extract_py_total_count "$PY_TURBOJPEG_LOG") + +CPP_ROCJPEG_OFF_AVG=$(extract_cpp_avg_decode_sec "$CPP_ROCJPEG_OFF_LOG") +CPP_ROCJPEG_ON_AVG=$(extract_cpp_avg_decode_sec "$CPP_ROCJPEG_ON_LOG") +CPP_TURBOJPEG_AVG=$(extract_cpp_avg_decode_sec "$CPP_TURBOJPEG_LOG") + +PY_ROCJPEG_OFF_AVG=$(extract_py_avg_decode_sec "$PY_ROCJPEG_OFF_LOG") +PY_ROCJPEG_ON_AVG=$(extract_py_avg_decode_sec "$PY_ROCJPEG_ON_LOG") +PY_TURBOJPEG_AVG=$(extract_py_avg_decode_sec "$PY_TURBOJPEG_LOG") + +CPP_IMPROVEMENT=$(calc_improvement "$CPP_ROCJPEG_OFF_AVG" "$CPP_ROCJPEG_ON_AVG") +CPP_SPEEDUP=$(calc_speedup "$CPP_ROCJPEG_OFF_AVG" "$CPP_ROCJPEG_ON_AVG") + +PY_IMPROVEMENT=$(calc_improvement "$PY_ROCJPEG_OFF_AVG" "$PY_ROCJPEG_ON_AVG") +PY_SPEEDUP=$(calc_speedup "$PY_ROCJPEG_OFF_AVG" "$PY_ROCJPEG_ON_AVG") + +echo "# Summary:" +echo "" +echo "### With ${DATASET_LABEL}:" +echo "" +echo "GPU count: ${GPU_COUNT}" +echo "" + +echo "Decoded image count:" +echo "" +echo " C++ rocAL+rocJPEG OFF images decoded: ${CPP_ROCJPEG_OFF_COUNT}" +echo " C++ rocAL+rocJPEG ON images decoded: ${CPP_ROCJPEG_ON_COUNT}" +echo " C++ rocAL+TurboJPEG images decoded: ${CPP_TURBOJPEG_COUNT}" +echo " PY rocAL+rocJPEG OFF images decoded: ${PY_ROCJPEG_OFF_COUNT}" +echo " PY rocAL+rocJPEG ON images decoded: ${PY_ROCJPEG_ON_COUNT}" +echo " PY rocAL+TurboJPEG images decoded: ${PY_TURBOJPEG_COUNT}" + +echo "" +echo "### C++ rocAL sample decode-time results:" +echo "" +echo "Without rocAL patch solution" +print_cpp_per_gpu_decode "$CPP_ROCJPEG_OFF_LOG" +print_line "rocAL+rocJPEG C++ average decode time:" "$CPP_ROCJPEG_OFF_AVG" + +echo "" +echo "With rocAL patch solution" +print_cpp_per_gpu_decode "$CPP_ROCJPEG_ON_LOG" +print_line "rocAL+rocJPEG C++ average decode time:" "$CPP_ROCJPEG_ON_AVG" + +echo "" +echo "TurboJPEG one run" +print_cpp_per_gpu_decode "$CPP_TURBOJPEG_LOG" +print_line "rocAL+TurboJPEG C++ average decode time:" "$CPP_TURBOJPEG_AVG" + +echo "" +echo "### Python rocAL benchmark decode-time results:" +echo "" +echo "Without rocAL patch solution" +print_py_per_gpu_decode "$PY_ROCJPEG_OFF_LOG" +print_line "rocAL+rocJPEG PY average decode time:" "$PY_ROCJPEG_OFF_AVG" + +echo "" +echo "With rocAL patch solution" +print_py_per_gpu_decode "$PY_ROCJPEG_ON_LOG" +print_line "rocAL+rocJPEG PY average decode time:" "$PY_ROCJPEG_ON_AVG" + +echo "" +echo "TurboJPEG one run" +print_py_per_gpu_decode "$PY_TURBOJPEG_LOG" +print_line "rocAL+TurboJPEG PY average decode time:" "$PY_TURBOJPEG_AVG" + +echo "" +echo "The rocAL patch solution decode-time enhancements when used:" +echo "" +printf "\tC++ rocAL+rocJPEG enhancement: %s%% decode-time reduction, speedup around %sx\n" "$CPP_IMPROVEMENT" "$CPP_SPEEDUP" +printf "\tPY rocAL+rocJPEG enhancement: %s%% decode-time reduction, speedup around %sx\n" "$PY_IMPROVEMENT" "$PY_SPEEDUP" +echo "" diff --git a/tests/cpp_api/rocjpeg_decode_perf/rocal_decode_call_bench.py b/tests/cpp_api/rocjpeg_decode_perf/rocal_decode_call_bench.py new file mode 100644 index 000000000..a30ec5acd --- /dev/null +++ b/tests/cpp_api/rocjpeg_decode_perf/rocal_decode_call_bench.py @@ -0,0 +1,286 @@ +import argparse +import contextlib +import io +import multiprocessing as mp +import os +import sys +import time + +from amd.rocal.pipeline import pipeline_def +import amd.rocal.fn as fn +import amd.rocal.types as types + + +@pipeline_def(seed=1549361629) +def image_decoder_pipeline(device="cpu", path="", output_type="rgb", shard_id=0, num_shards=1): + jpegs, labels = fn.readers.file(file_root=path) + + output_type_map = { + "rgb": types.RGB, + "gray": types.GRAY, + } + + images = fn.decoders.image( + jpegs, + file_root=path, + device=device, + output_type=output_type_map[output_type], + shard_id=shard_id, + num_shards=num_shards, + random_shuffle=False, + ) + + return images + + +def parse_args(): + parser = argparse.ArgumentParser(description="Simple rocAL decode benchmark") + parser.add_argument("--path", required=True, help="Input image directory") + parser.add_argument("--device", choices=["cpu", "gpu"], required=True) + parser.add_argument("--batch-size", type=int, default=32) + parser.add_argument("--num-threads", type=int, default=4) + parser.add_argument("--device-id", type=int, default=0) + parser.add_argument("--num-gpus", type=int, default=1) + parser.add_argument("--num-shards", type=int, default=1) + parser.add_argument("--output-type", choices=["rgb", "gray"], default="rgb") + parser.add_argument( + "--total-files-on-disk", + type=int, + default=-1, + help="Optional known file count. If set, skip os.walk count.", + ) + return parser.parse_args() + + +def split_path_enabled(args): + split_mode = os.environ.get("ROCAL_ROCJPEG_DEDICATED_OMP_SPLIT", "") + return ( + args.device == "gpu" + and split_mode not in ("0", "OFF", "off", "FALSE", "false") + ) + + +def effective_batch_size(args): + if not split_path_enabled(args): + return args.batch_size + + rocjpeg_decoder_threads = max(1, min(4, args.num_threads)) + return args.batch_size * rocjpeg_decoder_threads + + +def count_files(path): + total = 0 + for _, _, files in os.walk(path, followlinks=True): + total += len(files) + return total + + +def files_in_shard(total_files, shard_id, num_shards): + if total_files < 0: + return -1 + base = total_files // num_shards + remainder = total_files % num_shards + return base + (1 if shard_id < remainder else 0) + + +def extract_timing_info(pipe): + info = pipe.timing_info() + timing = {} + + for field in [ + "load_time", + "decode_time", + "process_time", + "transfer_time", + "rocjpeg_decode_only_time", + "turbojpeg_decode_only_time", + "rocjpeg_decode_image_count", + "turbojpeg_decode_image_count", + ]: + if hasattr(info, field): + timing[field] = getattr(info, field) + + return timing + + +def add_timing_info(total, batch_info): + for key, value in batch_info.items(): + total[key] = total.get(key, 0) + value + return total + + +def run_one_shard(args, shard_id, num_shards, device_id, total_files): + rocal_batch_size = effective_batch_size(args) + shard_file_count = files_in_shard(total_files, shard_id, num_shards) + + pipe = image_decoder_pipeline( + batch_size=rocal_batch_size, + num_threads=args.num_threads, + device_id=device_id, + rocal_cpu=(args.device == "cpu"), + tensor_layout=types.NHWC, + reverse_channels=True, + mean=[0, 0, 0], + std=[255, 255, 255], + device=args.device, + path=args.path, + output_type=args.output_type, + shard_id=shard_id, + num_shards=num_shards, + ) + + pipe.build() + + print(f"Requested batch size: {args.batch_size}") + if rocal_batch_size != args.batch_size: + print(f"Effective rocAL batch size: {rocal_batch_size}") + + print(f"GPU/device id: {device_id}") + print(f"Shard id: {shard_id}") + print(f"Num shards: {num_shards}") + print(f"Decoding started with {args.num_threads} threads, please wait!") + sys.stdout.flush() + + accumulated_timing_info = {} + + start_time = time.perf_counter() + + while pipe.get_remaining_images() > 0: + status = pipe.rocal_run() + if not status: + break + + accumulated_timing_info = add_timing_info( + accumulated_timing_info, + extract_timing_info(pipe), + ) + + total_elapsed_s = time.perf_counter() - start_time + + try: + pipe.rocal_run() + except Exception: + pass + + accumulated_timing_info = add_timing_info( + accumulated_timing_info, + extract_timing_info(pipe), + ) + + decoded_images = shard_file_count if shard_file_count >= 0 else total_files + + avg_time_per_image_ms = 0.0 + images_per_sec = 0.0 + + if decoded_images > 0 and total_elapsed_s > 0: + avg_time_per_image_ms = (total_elapsed_s * 1000.0) / decoded_images + images_per_sec = decoded_images / total_elapsed_s + + print(f"Total decoded images: {decoded_images}") + print(f"Total processing time (sec): {total_elapsed_s:.6f}") + print(f"Average processing time per image (ms): {avg_time_per_image_ms:.6f}") + print(f"Average decoded images per sec (Images/Sec): {images_per_sec:.2f}") + + decode_time_us = accumulated_timing_info.get("decode_time", 0) + load_time_us = accumulated_timing_info.get("load_time", 0) + process_time_us = accumulated_timing_info.get("process_time", 0) + + if decode_time_us: + print(f"rocAL internal decode time (sec): {decode_time_us / 1_000_000.0:.6f}") + + if load_time_us: + print(f"rocAL internal load time (sec): {load_time_us / 1_000_000.0:.6f}") + + if process_time_us: + print(f"rocAL internal process time (sec): {process_time_us / 1_000_000.0:.6f}") + + print("Decoding completed!") + return { + "device_id": device_id, + "shard_id": shard_id, + "num_shards": num_shards, + "decoded_images": decoded_images, + "total_elapsed_s": total_elapsed_s, + "decode_time_s": decode_time_us / 1_000_000.0 if decode_time_us else 0.0, + "load_time_s": load_time_us / 1_000_000.0 if load_time_us else 0.0, + "process_time_s": process_time_us / 1_000_000.0 if process_time_us else 0.0, + } + + +def run_worker(args, shard_id, num_shards, device_id, total_files, queue): + output = io.StringIO() + try: + with contextlib.redirect_stdout(output), contextlib.redirect_stderr(output): + result = run_one_shard(args, shard_id, num_shards, device_id, total_files) + queue.put((shard_id, True, result, output.getvalue())) + except Exception as exc: + queue.put((shard_id, False, repr(exc), output.getvalue())) + + +def print_multi_gpu_summary(results): + print("") + print("Per-GPU/shard processing time:") + for result in sorted(results, key=lambda item: item["shard_id"]): + print( + f"GPU/device {result['device_id']} shard {result['shard_id']} " + f"total processing time (sec): {result['total_elapsed_s']:.6f}" + ) + + if results: + avg_total = sum(item["total_elapsed_s"] for item in results) / len(results) + avg_decode = sum(item["decode_time_s"] for item in results) / len(results) + max_total = max(item["total_elapsed_s"] for item in results) + print(f"Average total processing time across {len(results)} GPUs/shards (sec): {avg_total:.6f}") + if avg_decode: + print(f"Average rocAL internal decode time across {len(results)} GPUs/shards (sec): {avg_decode:.6f}") + print(f"Wall-clock equivalent time across {len(results)} GPUs/shards (sec): {max_total:.6f}") + + +def main(): + args = parse_args() + + total_files = args.total_files_on_disk if args.total_files_on_disk >= 0 else count_files(args.path) + num_shards = max(1, args.num_shards) + num_gpus = max(1, args.num_gpus) + + if num_shards == 1: + run_one_shard(args, 0, 1, args.device_id, total_files) + return + + workers = [] + queue = mp.Queue() + for shard_id in range(num_shards): + device_id = args.device_id + (shard_id % num_gpus) + process = mp.Process( + target=run_worker, + args=(args, shard_id, num_shards, device_id, total_files, queue), + ) + process.start() + workers.append(process) + + worker_results = [] + failed = False + for _ in workers: + shard_id, ok, result, output = queue.get() + print("") + print(f"========== Worker output for shard {shard_id} ==========") + print(output, end="") + if ok: + worker_results.append(result) + else: + failed = True + print(f"ERROR: shard {shard_id} failed: {result}") + + for process in workers: + process.join() + if process.exitcode != 0: + failed = True + + if failed: + raise SystemExit(1) + + print_multi_gpu_summary(worker_results) + + +if __name__ == "__main__": + main() diff --git a/tests/cpp_api/rocjpeg_decode_perf/run_tests_twice_solution_on_off.sh b/tests/cpp_api/rocjpeg_decode_perf/run_tests_twice_solution_on_off.sh new file mode 100755 index 000000000..a5172fa1f --- /dev/null +++ b/tests/cpp_api/rocjpeg_decode_perf/run_tests_twice_solution_on_off.sh @@ -0,0 +1,101 @@ +#!/bin/bash + +set -e +set -o pipefail + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +WORKSPACE="${WORKSPACE:-$SCRIPT_DIR}" +ROCM_PATH="${ROCM_PATH:-/opt/rocm}" +LOG_DIR="${LOG_DIR:-/tmp/rocjpeg_decode_perf}" + +GPU_COUNT="${1:-${GPU_COUNT:-1}}" +SHARD_COUNT="$GPU_COUNT" + +if [ -z "${DATASET:-}" ]; then + echo "ERROR: DATASET is not set." + echo "Example: export DATASET=/path/to/image_dataset" + exit 1 +fi + +export LD_LIBRARY_PATH="$ROCM_PATH/lib${LD_LIBRARY_PATH:+:$LD_LIBRARY_PATH}" +export PYTHONPATH="$ROCM_PATH/lib${PYTHONPATH:+:$PYTHONPATH}" +export ROCJPEG_DECODER_CREATE_LOG="${ROCJPEG_DECODER_CREATE_LOG:-1}" + +if [ -z "${ROCAL_CPP_BIN:-}" ]; then + echo "ERROR: ROCAL_CPP_BIN is not set." + echo "Example: export ROCAL_CPP_BIN=/path/to/dataloader_multithread" + exit 1 +fi + +ROCAL_PY_BENCH="${ROCAL_PY_BENCH:-$WORKSPACE/rocal_decode_call_bench.py}" + +mkdir -p "$LOG_DIR" + +CPP_ROCJPEG_OFF_LOG="${CPP_ROCJPEG_OFF_LOG:-$LOG_DIR/rocjpeg_split_off_${GPU_COUNT}gpu.log}" +CPP_ROCJPEG_ON_LOG="${CPP_ROCJPEG_ON_LOG:-$LOG_DIR/rocjpeg_split_on_${GPU_COUNT}gpu.log}" +CPP_TURBOJPEG_LOG="${CPP_TURBOJPEG_LOG:-$LOG_DIR/turbojpeg_${GPU_COUNT}gpu.log}" + +PY_ROCJPEG_OFF_LOG="${PY_ROCJPEG_OFF_LOG:-$LOG_DIR/py_rocjpeg_split_off_${GPU_COUNT}gpu.log}" +PY_ROCJPEG_ON_LOG="${PY_ROCJPEG_ON_LOG:-$LOG_DIR/py_rocjpeg_split_on_${GPU_COUNT}gpu.log}" +PY_TURBOJPEG_LOG="${PY_TURBOJPEG_LOG:-$LOG_DIR/py_turbojpeg_${GPU_COUNT}gpu.log}" + +echo "WORKSPACE: $WORKSPACE" +echo "DATASET: $DATASET" +echo "GPU_COUNT: $GPU_COUNT" +echo "SHARD_COUNT: $SHARD_COUNT" +echo "ROCM_PATH: $ROCM_PATH" +echo "LOG_DIR: $LOG_DIR" +echo "ROCJPEG_DECODER_CREATE_LOG: $ROCJPEG_DECODER_CREATE_LOG" +echo "ROCAL_CPP_BIN: $ROCAL_CPP_BIN" +echo "ROCAL_PY_BENCH: $ROCAL_PY_BENCH" +echo "" + +echo "============================================================" +echo "C++ rocAL + rocJPEG, solution OFF" +echo "============================================================" +export ROCAL_ROCJPEG_DEDICATED_OMP_SPLIT=0 +"$ROCAL_CPP_BIN" "$DATASET" "$GPU_COUNT" "$SHARD_COUNT" 1024 1024 32 0 0 4 4 2>&1 | tee "$CPP_ROCJPEG_OFF_LOG" + +echo "" +echo "============================================================" +echo "C++ rocAL + rocJPEG, solution ON" +echo "============================================================" +export ROCAL_ROCJPEG_DEDICATED_OMP_SPLIT=1 +"$ROCAL_CPP_BIN" "$DATASET" "$GPU_COUNT" "$SHARD_COUNT" 1024 1024 32 0 0 4 4 2>&1 | tee "$CPP_ROCJPEG_ON_LOG" + +echo "" +echo "============================================================" +echo "C++ rocAL + TurboJPEG, one run only" +echo "============================================================" +export ROCAL_ROCJPEG_DEDICATED_OMP_SPLIT=0 +"$ROCAL_CPP_BIN" "$DATASET" "$GPU_COUNT" "$SHARD_COUNT" 1024 1024 32 0 0 0 4 2>&1 | tee "$CPP_TURBOJPEG_LOG" + +echo "" +echo "============================================================" +echo "Python rocAL + rocJPEG, solution OFF" +echo "============================================================" +export ROCAL_ROCJPEG_DEDICATED_OMP_SPLIT=0 +python3 "$ROCAL_PY_BENCH" --path "$DATASET" --device gpu --batch-size 32 --num-threads 4 --device-id 0 --num-gpus "$GPU_COUNT" --num-shards "$SHARD_COUNT" 2>&1 | tee "$PY_ROCJPEG_OFF_LOG" + +echo "" +echo "============================================================" +echo "Python rocAL + rocJPEG, solution ON" +echo "============================================================" +export ROCAL_ROCJPEG_DEDICATED_OMP_SPLIT=1 +python3 "$ROCAL_PY_BENCH" --path "$DATASET" --device gpu --batch-size 32 --num-threads 4 --device-id 0 --num-gpus "$GPU_COUNT" --num-shards "$SHARD_COUNT" 2>&1 | tee "$PY_ROCJPEG_ON_LOG" + +echo "" +echo "============================================================" +echo "Python rocAL + TurboJPEG, one run only" +echo "============================================================" +export ROCAL_ROCJPEG_DEDICATED_OMP_SPLIT=0 +python3 "$ROCAL_PY_BENCH" --path "$DATASET" --device cpu --batch-size 32 --num-threads 4 --device-id 0 --num-gpus "$GPU_COUNT" --num-shards "$SHARD_COUNT" 2>&1 | tee "$PY_TURBOJPEG_LOG" + +echo "" +echo "Done. Logs written to:" +echo " $CPP_ROCJPEG_OFF_LOG" +echo " $CPP_ROCJPEG_ON_LOG" +echo " $CPP_TURBOJPEG_LOG" +echo " $PY_ROCJPEG_OFF_LOG" +echo " $PY_ROCJPEG_ON_LOG" +echo " $PY_TURBOJPEG_LOG" From 8bacc120f2466967a65f04e026dec9ec36532a16 Mon Sep 17 00:00:00 2001 From: Essam Aly Date: Thu, 21 May 2026 22:50:59 +0000 Subject: [PATCH 2/5] Comment more on coverage and scope of the tests --- tests/cpp_api/rocjpeg_decode_perf/README.md | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/tests/cpp_api/rocjpeg_decode_perf/README.md b/tests/cpp_api/rocjpeg_decode_perf/README.md index f8d69441b..22210a473 100644 --- a/tests/cpp_api/rocjpeg_decode_perf/README.md +++ b/tests/cpp_api/rocjpeg_decode_perf/README.md @@ -178,8 +178,10 @@ export ROCAL_ROCJPEG_DEDICATED_OMP_SPLIT=0 # disabled export ROCAL_ROCJPEG_DEDICATED_OMP_SPLIT=1 # enabled ``` -`run_tests_twice_solution_on_off.sh` sets this internally for the relevant -benchmark cases. +The rocJPEG dedicated OpenMP split path is enabled by default. Set +`ROCAL_ROCJPEG_DEDICATED_OMP_SPLIT=0` to restore the previous single-decoder +rocJPEG path. `run_tests_twice_solution_on_off.sh` sets this internally for the +relevant benchmark cases so the old and new behavior can be compared directly. ## Build Notes @@ -188,6 +190,10 @@ separate CMake target. Do not place `perf_sharded_launcher.cpp` inside existing folders such as `performance_tests/` or `dataloader_multithread/`, because those folders glob `*.cpp` files into a single executable. +This harness is manual/performance-oriented and is not wired into the regular +CTest flow. It is intended for explicit developer or PR-reviewer runs on systems +with the needed dataset, rocAL build, rocJPEG build, and GPU configuration. + Suggested minimal CMake target: ```cmake @@ -204,6 +210,10 @@ target_link_libraries(perf_sharded_launcher pthread) If installing this with rocAL test assets, include the shell and Python files as test support files rather than compiling them. +This rocAL PR includes the rocAL-side changes only. Any rocJPEG decoder creation +logging patch that targets `src/rocjpeg_decoder.cpp` belongs in the rocJPEG repo +and is not included here. + ## Typical Workflow ### 1. Run rocAL C++ and Python comparisons From 650695f1d033d1cbec901502a667a905c2882d66 Mon Sep 17 00:00:00 2001 From: Essam Aly Date: Tue, 26 May 2026 19:45:00 +0000 Subject: [PATCH 3/5] Address rocJPEG decode benchmark review feedback Harden the sharded launcher work directory cleanup, make symlink names collision-resistant, validate dataloader CPU thread count, improve Python decoded image accounting, and use spawn for multi-shard benchmark workers. --- .../dataloader_multithread.cpp | 6 + .../perf_sharded_launcher.cpp | 114 +++++++++++++++++- .../rocal_decode_call_bench.py | 25 +++- 3 files changed, 134 insertions(+), 11 deletions(-) diff --git a/tests/cpp_api/dataloader_multithread/dataloader_multithread.cpp b/tests/cpp_api/dataloader_multithread/dataloader_multithread.cpp index 8ff3f9168..7ad7e364e 100644 --- a/tests/cpp_api/dataloader_multithread/dataloader_multithread.cpp +++ b/tests/cpp_api/dataloader_multithread/dataloader_multithread.cpp @@ -250,6 +250,12 @@ int main(int argc, const char **argv) { if (argc > argIdx) cpu_thread_count = atoi(argv[argIdx++]); + if (cpu_thread_count < 1) { + std::cout << "Invalid cpu_thread_count " << cpu_thread_count + << ", using 1" << std::endl; + cpu_thread_count = 1; + } + std::cout << "Number of GPUs: " << num_gpus << std::endl; // launch threads process shards diff --git a/tests/cpp_api/rocjpeg_decode_perf/perf_sharded_launcher.cpp b/tests/cpp_api/rocjpeg_decode_perf/perf_sharded_launcher.cpp index a4e1558ce..02b0c02a6 100644 --- a/tests/cpp_api/rocjpeg_decode_perf/perf_sharded_launcher.cpp +++ b/tests/cpp_api/rocjpeg_decode_perf/perf_sharded_launcher.cpp @@ -1,11 +1,14 @@ #include #include #include +#include #include #include #include #include +#include #include +#include #include #include #include @@ -13,6 +16,8 @@ namespace fs = std::filesystem; +static const char* kWorkDirMarker = ".rocjpeg_decode_perf_workdir"; + static bool is_jpeg(const fs::path& path) { std::string ext = path.extension().string(); std::transform(ext.begin(), ext.end(), ext.begin(), [](unsigned char c) { @@ -21,6 +26,97 @@ static bool is_jpeg(const fs::path& path) { return ext == ".jpg" || ext == ".jpeg"; } +static std::string sanitized_relative_name(const fs::path& dataset_dir, const fs::path& path, size_t index) { + std::error_code ec; + fs::path relative_path = fs::relative(path, dataset_dir, ec); + if (ec || relative_path.empty()) { + relative_path = path.filename(); + } + + std::string name = relative_path.generic_string(); + for (char& c : name) { + const bool safe_char = std::isalnum(static_cast(c)) || + c == '.' || c == '-' || c == '_'; + if (!safe_char) { + c = '_'; + } + } + + std::ostringstream stream; + stream << std::setw(12) << std::setfill('0') << index << "_" << name; + return stream.str(); +} + +static bool prepare_work_dir(const fs::path& work_dir) { + if (work_dir.empty()) { + std::cerr << "work_dir must not be empty\n"; + return false; + } + + const fs::path absolute_work_dir = fs::absolute(work_dir).lexically_normal(); + if (absolute_work_dir == absolute_work_dir.root_path()) { + std::cerr << "Refusing to use filesystem root as work_dir: " + << absolute_work_dir << "\n"; + return false; + } + + std::error_code ec; + if (fs::is_symlink(fs::symlink_status(absolute_work_dir, ec))) { + std::cerr << "Refusing to use symlink as work_dir: " + << absolute_work_dir << "\n"; + return false; + } + + const fs::path marker_path = absolute_work_dir / kWorkDirMarker; + if (fs::exists(absolute_work_dir)) { + if (!fs::is_directory(absolute_work_dir)) { + std::cerr << "work_dir exists but is not a directory: " + << absolute_work_dir << "\n"; + return false; + } + + const bool has_marker = fs::exists(marker_path); + const bool is_empty = fs::is_empty(absolute_work_dir, ec); + if (ec) { + std::cerr << "Failed to inspect work_dir: " << absolute_work_dir + << " error: " << ec.message() << "\n"; + return false; + } + + if (!has_marker && !is_empty) { + std::cerr << "Refusing to delete non-empty work_dir without marker " + << marker_path << "\n"; + return false; + } + + if (has_marker) { + fs::remove_all(absolute_work_dir, ec); + if (ec) { + std::cerr << "Failed to remove work_dir: " << absolute_work_dir + << " error: " << ec.message() << "\n"; + return false; + } + } + } + + fs::create_directories(absolute_work_dir, ec); + if (ec) { + std::cerr << "Failed to create work_dir: " << absolute_work_dir + << " error: " << ec.message() << "\n"; + return false; + } + + FILE* marker_file = std::fopen(marker_path.c_str(), "w"); + if (!marker_file) { + std::cerr << "Failed to create marker file: " << marker_path + << " error: " << std::strerror(errno) << "\n"; + return false; + } + std::fclose(marker_file); + + return true; +} + static void usage(const char* prog) { std::cerr << "Usage: " << prog << " [batch_size=32] [threads=4] [fmt=rgb] [work_dir=/tmp/rocjpeg_decode_perf/shards] [log_dir=/tmp/rocjpeg_decode_perf]\n" @@ -76,9 +172,16 @@ int main(int argc, char** argv) { return 1; } - fs::remove_all(work_dir); - fs::create_directories(work_dir); - fs::create_directories(log_dir); + if (!prepare_work_dir(work_dir)) { + return 1; + } + std::error_code log_ec; + fs::create_directories(log_dir, log_ec); + if (log_ec) { + std::cerr << "Failed to create log_dir: " << log_dir + << " error: " << log_ec.message() << "\n"; + return 1; + } std::vector shard_counts(num_gpus, 0); @@ -88,8 +191,7 @@ int main(int argc, char** argv) { fs::create_directories(shard_dir); const fs::path src = files[i]; - const std::string link_name = - src.parent_path().filename().string() + "_" + src.filename().string(); + const std::string link_name = sanitized_relative_name(dataset_dir, src, i); const fs::path dst = shard_dir / link_name; std::error_code ec; @@ -107,6 +209,7 @@ int main(int argc, char** argv) { for (int gpu = 0; gpu < num_gpus; ++gpu) { std::cout << "GPU " << gpu << " shard files: " << shard_counts[gpu] << "\n"; } + std::cout.flush(); std::vector pids; @@ -153,6 +256,7 @@ int main(int argc, char** argv) { std::cout << "Launched GPU " << gpu << " pid " << pid << " log " << log_path << "\n"; + std::cout.flush(); } int failures = 0; diff --git a/tests/cpp_api/rocjpeg_decode_perf/rocal_decode_call_bench.py b/tests/cpp_api/rocjpeg_decode_perf/rocal_decode_call_bench.py index a30ec5acd..92e06626a 100644 --- a/tests/cpp_api/rocjpeg_decode_perf/rocal_decode_call_bench.py +++ b/tests/cpp_api/rocjpeg_decode_perf/rocal_decode_call_bench.py @@ -68,10 +68,14 @@ def effective_batch_size(args): return args.batch_size * rocjpeg_decoder_threads -def count_files(path): +def is_jpeg_file(filename): + return os.path.splitext(filename)[1].lower() in (".jpg", ".jpeg") + + +def count_jpeg_files(path): total = 0 for _, _, files in os.walk(path, followlinks=True): - total += len(files) + total += sum(1 for filename in files if is_jpeg_file(filename)) return total @@ -109,6 +113,12 @@ def add_timing_info(total, batch_info): return total +def decoded_image_count_from_timing(timing_info): + rocjpeg_count = timing_info.get("rocjpeg_decode_image_count", 0) + turbojpeg_count = timing_info.get("turbojpeg_decode_image_count", 0) + return rocjpeg_count + turbojpeg_count + + def run_one_shard(args, shard_id, num_shards, device_id, total_files): rocal_batch_size = effective_batch_size(args) shard_file_count = files_in_shard(total_files, shard_id, num_shards) @@ -167,7 +177,9 @@ def run_one_shard(args, shard_id, num_shards, device_id, total_files): extract_timing_info(pipe), ) - decoded_images = shard_file_count if shard_file_count >= 0 else total_files + decoded_images = decoded_image_count_from_timing(accumulated_timing_info) + if decoded_images <= 0: + decoded_images = shard_file_count if shard_file_count >= 0 else total_files avg_time_per_image_ms = 0.0 images_per_sec = 0.0 @@ -239,7 +251,7 @@ def print_multi_gpu_summary(results): def main(): args = parse_args() - total_files = args.total_files_on_disk if args.total_files_on_disk >= 0 else count_files(args.path) + total_files = args.total_files_on_disk if args.total_files_on_disk >= 0 else count_jpeg_files(args.path) num_shards = max(1, args.num_shards) num_gpus = max(1, args.num_gpus) @@ -247,11 +259,12 @@ def main(): run_one_shard(args, 0, 1, args.device_id, total_files) return + mp_context = mp.get_context("spawn") workers = [] - queue = mp.Queue() + queue = mp_context.Queue() for shard_id in range(num_shards): device_id = args.device_id + (shard_id % num_gpus) - process = mp.Process( + process = mp_context.Process( target=run_worker, args=(args, shard_id, num_shards, device_id, total_files, queue), ) From eb7750c5828b8b1886088800b89f73dfd9fec2fe Mon Sep 17 00:00:00 2001 From: Essam Aly Date: Tue, 26 May 2026 21:31:04 +0000 Subject: [PATCH 4/5] Address rocJPEG split review feedback --- .../loaders/image/image_read_and_decode.cpp | 29 +++++++++++++------ .../dataloader_multithread.cpp | 22 +++++++++----- 2 files changed, 35 insertions(+), 16 deletions(-) diff --git a/rocAL/source/loaders/image/image_read_and_decode.cpp b/rocAL/source/loaders/image/image_read_and_decode.cpp index 5d485b4bd..8ac39f26f 100644 --- a/rocAL/source/loaders/image/image_read_and_decode.cpp +++ b/rocAL/source/loaders/image/image_read_and_decode.cpp @@ -23,14 +23,28 @@ THE SOFTWARE. #include "loaders/image/image_read_and_decode.h" #include +#include #include #include #include #include +#include #include "decoders/image/decoder_factory.h" #include "readers/image/external_source_reader.h" +static bool env_flag_disabled(const char* name) { + const char* value = std::getenv(name); + if (!value || value[0] == '\0') + return false; + + std::string text(value); + std::transform(text.begin(), text.end(), text.begin(), [](unsigned char c) { + return static_cast(std::tolower(c)); + }); + return text == "0" || text == "no"; +} + std::tuple interpret_color_format(RocalColorFormat color_format) { switch (color_format) { @@ -94,13 +108,7 @@ void ImageReadAndDecode::create(ReaderConfig reader_config, DecoderConfig decode for (int i = 0; i < batch_size; i++) { _compressed_buff[i].resize(MAX_COMPRESSED_SIZE); // If we don't need MAX_COMPRESSED_SIZE we can remove this & resize in load module } - const char *rocjpeg_omp_split_env = std::getenv("ROCAL_ROCJPEG_DEDICATED_OMP_SPLIT"); - _use_rocjpeg_dedicated_omp_split = !(rocjpeg_omp_split_env && - (std::strcmp(rocjpeg_omp_split_env, "0") == 0 || - std::strcmp(rocjpeg_omp_split_env, "OFF") == 0 || - std::strcmp(rocjpeg_omp_split_env, "off") == 0 || - std::strcmp(rocjpeg_omp_split_env, "FALSE") == 0 || - std::strcmp(rocjpeg_omp_split_env, "false") == 0)); + _use_rocjpeg_dedicated_omp_split = !env_flag_disabled("ROCAL_ROCJPEG_DEDICATED_OMP_SPLIT"); if (_use_rocjpeg_dedicated_omp_split) { const size_t rocjpeg_decoder_count = std::min(static_cast(batch_size), std::max(static_cast(1), std::min(static_cast(4), _num_threads))); _rocjpeg_decoders.resize(rocjpeg_decoder_count); @@ -314,7 +322,9 @@ ImageReadAndDecode::load(unsigned char *buff, for (size_t i = 0; i < _batch_size; i++) _decompressed_buff_ptrs[i] = buff + image_size * i; - if (_decoder_config._type != DecoderType::ROCJPEG && _decoder_config._type != DecoderType::ROCJPEG_CROPPED) { + const bool is_rocjpeg_decoder = _decoder_config._type == DecoderType::ROCJPEG || + _decoder_config._type == DecoderType::ROCJPEG_CROPPED; + if (!is_rocjpeg_decoder) { #pragma omp parallel for num_threads(_num_threads) for (size_t i = 0; i < _batch_size; i++) { // initialize the actual decoded height and width with the maximum @@ -363,7 +373,7 @@ ImageReadAndDecode::load(unsigned char *buff, _actual_decoded_width[i] = scaledw; _actual_decoded_height[i] = scaledh; } - } else if (_decoder_config._type == DecoderType::ROCJPEG || _decoder_config._type == DecoderType::ROCJPEG_CROPPED) { + } else { #if ENABLE_HIP // Set device ID for load routine thread once if (!_set_device_id) { @@ -384,6 +394,7 @@ ImageReadAndDecode::load(unsigned char *buff, #pragma omp parallel for num_threads(rocjpeg_decoder_threads) for (size_t shard = 0; shard < _rocjpeg_decoders.size(); shard++) { #if ENABLE_HIP + // HIP current device is thread-local; set it for each OpenMP worker. hipError_t hip_status = hipSetDevice(_device_id); if (hip_status != hipSuccess) { THROW("hipSetDevice failed inside rocJPEG shard worker"); diff --git a/tests/cpp_api/dataloader_multithread/dataloader_multithread.cpp b/tests/cpp_api/dataloader_multithread/dataloader_multithread.cpp index 7ad7e364e..52806c8d8 100644 --- a/tests/cpp_api/dataloader_multithread/dataloader_multithread.cpp +++ b/tests/cpp_api/dataloader_multithread/dataloader_multithread.cpp @@ -24,12 +24,14 @@ THE SOFTWARE. #include #include +#include #include #include #include #include #include #include +#include #include using namespace cv; @@ -55,6 +57,18 @@ using namespace cv; using namespace std::chrono; std::mutex g_mtx; // mutex for critical section +static bool env_flag_disabled(const char* name) { + const char* value = std::getenv(name); + if (!value || value[0] == '\0') + return false; + + std::string text(value); + std::transform(text.begin(), text.end(), text.begin(), [](unsigned char c) { + return static_cast(std::tolower(c)); + }); + return text == "0" || text == "no"; +} + int thread_func(const char *path, int gpu_mode, RocalImageColor color_format, int shard_id, int num_shards, int dec_width, int dec_height, int batch_size, bool shuffle, bool display, int dec_mode, int cpu_thread_count) { std::unique_lock lck(g_mtx, std::defer_lock); std::cout << "Running on " << (gpu_mode >= 0 ? "GPU: " : "CPU: ") << gpu_mode << std::endl; @@ -62,13 +76,7 @@ int thread_func(const char *path, int gpu_mode, RocalImageColor color_format, in color_format = RocalImageColor::ROCAL_COLOR_RGB24; int gpu_id = (gpu_mode < 0) ? 0 : gpu_mode; RocalDecoderType dec_type = (RocalDecoderType)dec_mode; - const char *rocjpeg_omp_split_env = std::getenv("ROCAL_ROCJPEG_DEDICATED_OMP_SPLIT"); - const bool rocjpeg_omp_split_enabled = !(rocjpeg_omp_split_env && - (std::strcmp(rocjpeg_omp_split_env, "0") == 0 || - std::strcmp(rocjpeg_omp_split_env, "OFF") == 0 || - std::strcmp(rocjpeg_omp_split_env, "off") == 0 || - std::strcmp(rocjpeg_omp_split_env, "FALSE") == 0 || - std::strcmp(rocjpeg_omp_split_env, "false") == 0)); + const bool rocjpeg_omp_split_enabled = !env_flag_disabled("ROCAL_ROCJPEG_DEDICATED_OMP_SPLIT"); const int rocjpeg_decoder_threads = std::max(1, std::min(4, cpu_thread_count)); const int effective_batch_size = (dec_mode == 4 && rocjpeg_omp_split_enabled) ? batch_size * rocjpeg_decoder_threads : batch_size; if (effective_batch_size != batch_size) { From c0526b76bad17e36264065dfeffa212261fa4587 Mon Sep 17 00:00:00 2001 From: Essam Aly Date: Wed, 27 May 2026 00:24:53 +0000 Subject: [PATCH 5/5] Address reviewer comments and suggestions --- .../loaders/image/image_read_and_decode.cpp | 70 ++++++++++++++----- 1 file changed, 51 insertions(+), 19 deletions(-) diff --git a/rocAL/source/loaders/image/image_read_and_decode.cpp b/rocAL/source/loaders/image/image_read_and_decode.cpp index 8ac39f26f..054cd03eb 100644 --- a/rocAL/source/loaders/image/image_read_and_decode.cpp +++ b/rocAL/source/loaders/image/image_read_and_decode.cpp @@ -391,45 +391,69 @@ ImageReadAndDecode::load(unsigned char *buff, } const int rocjpeg_decoder_threads = static_cast(_rocjpeg_decoders.size()); + bool rocjpeg_worker_failed = false; + std::string rocjpeg_worker_error; + auto record_rocjpeg_worker_error = [&](const std::string& error_message) { +#pragma omp critical(rocjpeg_worker_error) + { + if (!rocjpeg_worker_failed) { + rocjpeg_worker_failed = true; + rocjpeg_worker_error = error_message; + } + } + }; #pragma omp parallel for num_threads(rocjpeg_decoder_threads) for (size_t shard = 0; shard < _rocjpeg_decoders.size(); shard++) { #if ENABLE_HIP // HIP current device is thread-local; set it for each OpenMP worker. hipError_t hip_status = hipSetDevice(_device_id); if (hip_status != hipSuccess) { - THROW("hipSetDevice failed inside rocJPEG shard worker"); + record_rocjpeg_worker_error("hipSetDevice failed inside rocJPEG shard worker"); + continue; } #endif auto& rocjpeg_decoder = _rocjpeg_decoders[shard]; const size_t shard_begin = rocjpeg_sub_batch_offsets[shard]; const size_t shard_size = _rocjpeg_sub_batch_sizes[shard]; const size_t shard_end = shard_begin + shard_size; + bool shard_failed = false; for (size_t i = shard_begin; i < shard_end; i++) { const size_t local_index = i - shard_begin; _actual_decoded_width[i] = max_decoded_width; _actual_decoded_height[i] = max_decoded_height; int original_width, original_height, decoded_width, decoded_height; - if (rocjpeg_decoder->decode_info(_compressed_buff[i].data(), _actual_read_size[i], &original_width, &original_height, - &decoded_width, &decoded_height, - max_decoded_width, max_decoded_height, decoder_color_format, static_cast(local_index)) != Decoder::Status::OK) { - int j = static_cast(shard_end) - 1; - while (j >= static_cast(shard_begin)) { - if (rocjpeg_decoder->decode_info(_compressed_buff[j].data(), _actual_read_size[j], &original_width, &original_height, - &decoded_width, &decoded_height, - max_decoded_width, max_decoded_height, decoder_color_format, static_cast(local_index)) == Decoder::Status::OK) { - _image_names[i] = _image_names[j]; - _compressed_buff[i] = _compressed_buff[j]; - _actual_read_size[i] = _actual_read_size[j]; - _compressed_image_size[i] = _compressed_image_size[j]; - break; - } else { - j--; - } - if (j < static_cast(shard_begin)) { - THROW("All images in the rocJpeg sub-batch failed decoding\n"); + bool decode_info_found = false; + int candidate = static_cast(i); + const int shard_begin_index = static_cast(shard_begin); + const int shard_end_index = static_cast(shard_end); + while (candidate >= shard_begin_index) { + if (rocjpeg_decoder->decode_info(_compressed_buff[candidate].data(), _actual_read_size[candidate], &original_width, &original_height, + &decoded_width, &decoded_height, + max_decoded_width, max_decoded_height, decoder_color_format, static_cast(local_index)) == Decoder::Status::OK) { + if (candidate != static_cast(i)) { + _image_names[i] = _image_names[candidate]; + _compressed_buff[i] = _compressed_buff[candidate]; + _actual_read_size[i] = _actual_read_size[candidate]; + _compressed_image_size[i] = _compressed_image_size[candidate]; } + decode_info_found = true; + break; + } + + if (candidate == static_cast(i)) { + candidate = shard_end_index - 1; + } else { + candidate--; } + if (candidate == static_cast(i)) { + candidate--; + } + } + if (!decode_info_found) { + record_rocjpeg_worker_error("All images in the rocJpeg sub-batch failed decoding\n"); + shard_failed = true; + break; } _original_height[i] = original_height; _original_width[i] = original_width; @@ -446,6 +470,9 @@ ImageReadAndDecode::load(unsigned char *buff, } } } + if (shard_failed) { + continue; + } std::vector shard_output(_decompressed_buff_ptrs.begin() + shard_begin, _decompressed_buff_ptrs.begin() + shard_end); std::vector shard_original_width(_original_width.begin() + shard_begin, _original_width.begin() + shard_end); @@ -457,11 +484,16 @@ ImageReadAndDecode::load(unsigned char *buff, max_decoded_width, max_decoded_height, shard_original_width, shard_original_height, shard_actual_decoded_width, shard_actual_decoded_height) != Decoder::Status::OK) { + record_rocjpeg_worker_error("rocJpeg sub-batch decode failed\n"); + continue; } std::copy(shard_actual_decoded_width.begin(), shard_actual_decoded_width.end(), _actual_decoded_width.begin() + shard_begin); std::copy(shard_actual_decoded_height.begin(), shard_actual_decoded_height.end(), _actual_decoded_height.begin() + shard_begin); } + if (rocjpeg_worker_failed) { + THROW(rocjpeg_worker_error); + } } else { // Iterate through each image in the batch and obtain the decode info for (size_t i = 0; i < _batch_size; i++) {