diff --git a/packages/on_demand_video_decoder/accvlab/on_demand_video_decoder/_internal/shared_gop_store.py b/packages/on_demand_video_decoder/accvlab/on_demand_video_decoder/_internal/shared_gop_store.py index 35daffa..66bf2af 100644 --- a/packages/on_demand_video_decoder/accvlab/on_demand_video_decoder/_internal/shared_gop_store.py +++ b/packages/on_demand_video_decoder/accvlab/on_demand_video_decoder/_internal/shared_gop_store.py @@ -48,7 +48,7 @@ import hashlib import os from multiprocessing import shared_memory -from typing import List, Optional +from typing import Any, List, Optional import numpy as np @@ -95,6 +95,31 @@ def _hash_video_path(video_path: str) -> np.uint64: return np.uint64(int.from_bytes(digest, 'little')) +def _as_byte_view(data: Any) -> memoryview: + """Return a C-contiguous uint8 memoryview over bytes-like GOP data.""" + if isinstance(data, np.ndarray): + if not data.flags.c_contiguous: + data = np.ascontiguousarray(data) + view = memoryview(data) + else: + try: + view = memoryview(data) + except TypeError as exc: + raise TypeError("data must be a bytes-like object or numpy.ndarray") from exc + if not view.c_contiguous: + view = memoryview(view.tobytes()) + + if view.nbytes == 0: + raise ValueError("data must not be empty") + + if view.ndim != 1 or view.itemsize != 1 or view.format not in ('B', 'b', 'c'): + try: + view = view.cast('B') + except TypeError as exc: + raise TypeError("data must expose a C-contiguous byte buffer") from exc + return view + + class SharedGopStore: """Cross-process shared GOP store backed by POSIX SharedMemory. @@ -254,9 +279,13 @@ def lookup(self, video_path: str, frame_id: int) -> Optional[GopRef]: self._misses += 1 return None - def put(self, video_path: str, first_frame_id: int, gop_len: int, data: np.ndarray) -> GopRef: + def put(self, video_path: str, first_frame_id: int, gop_len: int, data: Any) -> GopRef: """Store GOP packet data and return a :class:`GopRef`. + ``data`` may be a ``uint8`` numpy array or any C-contiguous + bytes-like object, including ``bytes``, ``bytearray``, and + ``memoryview``. + Holds ``flock`` during eviction + insertion to guarantee atomicity. Performs a double-check after acquiring the lock (another worker may have inserted while we waited). @@ -282,17 +311,18 @@ def put(self, video_path: str, first_frame_id: int, gop_len: int, data: np.ndarr gop_len=int(e['gop_len']), ) + data_view = _as_byte_view(data) slot_idx = self._find_free_or_evict() # Content-addressed naming -- same GOP always gets the same name. shm_name = f"{_SHM_PREFIX}_{self.store_id}_{vp_hash}_{first_frame_id}" - data_size = int(data.nbytes) + data_size = int(data_view.nbytes) # Clean up if same GOP was previously cached then evicted _cleanup_stale_shm(shm_name) shm = shared_memory.SharedMemory(name=shm_name, create=True, size=data_size) - shm.buf[:data_size] = data.tobytes() + shm.buf[:data_size] = data_view shm.close() # close handle; shm persists until unlink # Write metadata entry diff --git a/packages/on_demand_video_decoder/ext_impl/src/PyNvOnDemandDecoder/inc/PyNvGopDecoder.hpp b/packages/on_demand_video_decoder/ext_impl/src/PyNvOnDemandDecoder/inc/PyNvGopDecoder.hpp index 3da591f..2a2823c 100644 --- a/packages/on_demand_video_decoder/ext_impl/src/PyNvOnDemandDecoder/inc/PyNvGopDecoder.hpp +++ b/packages/on_demand_video_decoder/ext_impl/src/PyNvOnDemandDecoder/inc/PyNvGopDecoder.hpp @@ -105,6 +105,9 @@ class PyNvGopDecoder { const std::vector frame_ids, const FastStreamInfo* fastStreamInfos = nullptr); + SerializedPacketBundle get_gop_from_bytes(std::shared_ptr> data, + const std::vector frame_ids); + /** * Extract GOP data for multiple videos and return them as separate bundles * diff --git a/packages/on_demand_video_decoder/ext_impl/src/PyNvOnDemandDecoder/inc/PyNvGopDemuxer.hpp b/packages/on_demand_video_decoder/ext_impl/src/PyNvOnDemandDecoder/inc/PyNvGopDemuxer.hpp index ca2c1bc..336643f 100644 --- a/packages/on_demand_video_decoder/ext_impl/src/PyNvOnDemandDecoder/inc/PyNvGopDemuxer.hpp +++ b/packages/on_demand_video_decoder/ext_impl/src/PyNvOnDemandDecoder/inc/PyNvGopDemuxer.hpp @@ -18,11 +18,13 @@ #include "FFmpegDemuxer.h" #include +#include #include #include #include #include #include +#include extern "C" { #include } @@ -38,6 +40,8 @@ class PyNvGopDemuxer { public: explicit PyNvGopDemuxer(const std::string&); explicit PyNvGopDemuxer(const std::string& filePath, const FastStreamInfo* fastStreamInfo); + explicit PyNvGopDemuxer(const std::string& filePath, + std::shared_ptr> memoryData); uint32_t GetHeight() { return demuxer->GetHeight(); } diff --git a/packages/on_demand_video_decoder/ext_impl/src/PyNvOnDemandDecoder/src/PyNvGopDecoder_constructors.cpp b/packages/on_demand_video_decoder/ext_impl/src/PyNvOnDemandDecoder/src/PyNvGopDecoder_constructors.cpp index fec7d7c..700aa30 100644 --- a/packages/on_demand_video_decoder/ext_impl/src/PyNvOnDemandDecoder/src/PyNvGopDecoder_constructors.cpp +++ b/packages/on_demand_video_decoder/ext_impl/src/PyNvOnDemandDecoder/src/PyNvGopDecoder_constructors.cpp @@ -34,6 +34,34 @@ namespace fs = std::filesystem; +namespace { +py::buffer_info RequestByteBuffer(const py::buffer& buffer, const std::string& arg_name) { + py::buffer_info info = buffer.request(); + if (info.itemsize != 1) { + throw std::invalid_argument(arg_name + " must expose a byte-sized buffer"); + } + if (info.ndim > 1) { + throw std::invalid_argument(arg_name + " must be a contiguous 1D bytes-like object"); + } + if (info.ndim == 1 && !info.strides.empty() && info.strides[0] != 1) { + throw std::invalid_argument(arg_name + " must be contiguous"); + } + + const size_t data_size = static_cast(info.size) * static_cast(info.itemsize); + if (data_size == 0) { + throw std::invalid_argument(arg_name + " must not be empty"); + } + return info; +} + +std::shared_ptr> CopyByteBuffer(const py::buffer& buffer) { + py::buffer_info info = RequestByteBuffer(buffer, "video_bytes"); + const auto* src = static_cast(info.ptr); + auto data = std::make_shared>(src, src + info.size * info.itemsize); + return data; +} +} // namespace + std::vector GetFastInitInfo(const std::vector& filepaths) { std::vector fast_stream_infos; fast_stream_infos.reserve(filepaths.size()); @@ -487,6 +515,34 @@ void Init_PyNvGopDecoder(py::module& m) { >>> rgb_frames = decoder.DecodeN12ToRGB(['video.mp4', 'video2.mp4'], [0, 10], as_bgr=True) >>> print(f"Decoded {len(rgb_frames)} RGB frames") )pbdoc") + .def( + "DecodeN12ToRGBFromBytes", + [](std::shared_ptr& dec, const py::buffer& video_bytes, + const std::vector frame_ids, bool as_bgr) { + try { + auto data = CopyByteBuffer(video_bytes); + SerializedPacketBundle serialized_data; + std::vector result; + std::vector filepaths(frame_ids.size(), "memory://video"); + + { + py::gil_scoped_release release; + serialized_data = dec->get_gop_from_bytes(data, frame_ids); + dec->decode_from_gop(serialized_data.data.get(), serialized_data.size, + filepaths, frame_ids, true, as_bgr, nullptr, &result); + } + return result; + } catch (const std::exception& e) { + throw std::runtime_error(e.what()); + } + }, + py::arg("video_bytes"), py::arg("frame_ids"), py::arg("as_bgr") = false, + R"pbdoc( + Decodes frames from a complete MP4 byte buffer into RGB/BGR data. + + This method is a memory-input variant of DecodeN12ToRGB. The input must be + a complete seekable container byte buffer, not a live stream. + )pbdoc") .def( "GetGOP", [](std::shared_ptr& dec, const std::vector& filepaths, @@ -556,6 +612,35 @@ void Init_PyNvGopDecoder(py::module& m) { >>> gop_data, first_ids, gop_lens = decoder.GetGOP(['video.mp4', 'video2.mp4'], [0, 10]) >>> print(f"Extracted GOP data for {len(first_ids)} GOPs") )pbdoc") + .def( + "GetGOPFromBytes", + [](std::shared_ptr& dec, const py::buffer& video_bytes, + const std::vector frame_ids) { + try { + auto data = CopyByteBuffer(video_bytes); + SerializedPacketBundle serialized_data; + { + py::gil_scoped_release release; + serialized_data = dec->get_gop_from_bytes(data, frame_ids); + } + + auto capsule = py::capsule(serialized_data.data.release(), + [](void* ptr) { delete[] static_cast(ptr); }); + py::array_t numpy_data(serialized_data.size, + static_cast(capsule.get_pointer()), capsule); + return py::make_tuple(numpy_data, serialized_data.first_frame_ids, + serialized_data.gop_lens); + } catch (const std::exception& e) { + throw std::runtime_error(e.what()); + } + }, + py::arg("video_bytes"), py::arg("frame_ids"), + R"pbdoc( + Extracts GOP packet data from a complete MP4 byte buffer. + + This is the memory-input variant of GetGOP and is intended for sources + such as FFRecord records or mmap slices that contain complete MP4 bytes. + )pbdoc") .def( "GetGOPList", [](std::shared_ptr& dec, const std::vector& filepaths, diff --git a/packages/on_demand_video_decoder/ext_impl/src/PyNvOnDemandDecoder/src/PyNvGopDecoder_separate_decoder.cpp b/packages/on_demand_video_decoder/ext_impl/src/PyNvOnDemandDecoder/src/PyNvGopDecoder_separate_decoder.cpp index a08465c..e43210c 100644 --- a/packages/on_demand_video_decoder/ext_impl/src/PyNvOnDemandDecoder/src/PyNvGopDecoder_separate_decoder.cpp +++ b/packages/on_demand_video_decoder/ext_impl/src/PyNvOnDemandDecoder/src/PyNvGopDecoder_separate_decoder.cpp @@ -152,6 +152,112 @@ SerializedPacketBundle PyNvGopDecoder::get_gop(const std::vector& f return result; } +SerializedPacketBundle PyNvGopDecoder::get_gop_from_bytes( + std::shared_ptr> data, const std::vector frame_ids) { + nvtxRangePushA("GetGOPFromBytes"); + + if (!data || data->empty()) { + throw std::invalid_argument("[ERROR] video byte buffer is empty"); + } + if (frame_ids.empty()) { + throw std::invalid_argument("[ERROR] frame_ids must not be empty"); + } + if (frame_ids.size() > max_num_files) { + throw std::invalid_argument("[ERROR] frame_ids size is greater than max_num_files"); + } + + const size_t total_frames = frame_ids.size(); + + std::vector> demuxers; + std::vector>>> vpacket_queue; + std::vector>> vpacket_array; + std::vector> all_gop_lens(total_frames); + std::vector> all_first_frame_ids(total_frames); + + demuxers.resize(total_frames); + vpacket_queue.reserve(total_frames); + vpacket_array.reserve(total_frames); + for (size_t i = 0; i < total_frames; ++i) { + vpacket_queue.emplace_back(std::make_unique>>()); + vpacket_queue[i]->setSize(MAX_SIZE); + vpacket_array.emplace_back(); + } + + ensureDemuxRunnersInitialized(); + + nvtxRangePushA("Initialize memory demuxers"); + for (size_t i = 0; i < total_frames; ++i) { + demuxers[i].reset(new PyNvGopDemuxer("memory://video", data)); + if (!demuxers[i]->IsValid()) { + nvtxRangePop(); // Initialize memory demuxers + nvtxRangePop(); // GetGOPFromBytes + throw std::runtime_error("[ERROR] create memory demuxer failed"); + } + } + nvtxRangePop(); // Initialize memory demuxers + + nvtxRangePushA("Packet extraction from bytes"); + for (int i = 0; i < static_cast(total_frames); ++i) { + try { + std::vector sorted_frame_ids = {frame_ids[i]}; + if (demuxers[i]->IsVFRV2()) { + int st = ExtractAndProcessGopInfo(demuxers[i], sorted_frame_ids, all_first_frame_ids[i], + all_gop_lens[i]); + if (st != 0) { + throw std::runtime_error("[ERROR] extract and process gop info failed for bytes source"); + } + } +#ifdef PROCESS_SYNC + DemuxGopProc(demuxers[i].get(), vpacket_queue[i].get(), sorted_frame_ids, + all_first_frame_ids[i], all_gop_lens[i], vpacket_array[i], true); +#else + demux_runners[i].join(); + demux_runners[i].start(PyNvGopDecoder::DemuxGopProc, demuxers[i].get(), vpacket_queue[i].get(), + sorted_frame_ids, std::ref(all_first_frame_ids[i]), + std::ref(all_gop_lens[i]), std::ref(vpacket_array[i]), true); +#endif + } catch (const std::exception& e) { + this->force_join_all(); + nvtxRangePop(); // Packet extraction from bytes + nvtxRangePop(); // GetGOPFromBytes + throw std::runtime_error(e.what()); + } + } + nvtxRangePop(); // Packet extraction from bytes + + nvtxRangePushA("Demux thread join"); + try { + for (int i = 0; i < static_cast(total_frames); ++i) { +#ifndef PROCESS_SYNC + demux_runners[i].join(); +#endif + } + } catch (const std::exception& e) { + this->force_join_all(); + nvtxRangePop(); // Demux thread join + nvtxRangePop(); // GetGOPFromBytes + throw std::runtime_error(e.what()); + } + + for (int i = 0; i < static_cast(total_frames); i++) { + if (vpacket_array.at(i).empty()) { + this->force_join_all(); + nvtxRangePop(); // Demux thread join + nvtxRangePop(); // GetGOPFromBytes + throw std::runtime_error("[ERROR] vpacket_array is empty for bytes source"); + } + } + nvtxRangePop(); // Demux thread join + + nvtxRangePushA("CreateSerializedPacketBundle"); + SerializedPacketBundle result = createSerializedPacketBundle( + total_frames, demuxers, all_gop_lens, all_first_frame_ids, vpacket_queue, vpacket_array); + nvtxRangePop(); // CreateSerializedPacketBundle + + nvtxRangePop(); // GetGOPFromBytes + return result; +} + std::vector PyNvGopDecoder::get_gop_list(const std::vector& filepaths, const std::vector frame_ids, const FastStreamInfo* fastStreamInfos) { diff --git a/packages/on_demand_video_decoder/ext_impl/src/PyNvOnDemandDecoder/src/PyNvGopDemuxer.cpp b/packages/on_demand_video_decoder/ext_impl/src/PyNvOnDemandDecoder/src/PyNvGopDemuxer.cpp index 1373051..99017b8 100644 --- a/packages/on_demand_video_decoder/ext_impl/src/PyNvOnDemandDecoder/src/PyNvGopDemuxer.cpp +++ b/packages/on_demand_video_decoder/ext_impl/src/PyNvOnDemandDecoder/src/PyNvGopDemuxer.cpp @@ -37,6 +37,15 @@ PyNvGopDemuxer::PyNvGopDemuxer(const std::string& filePath, const FastStreamInfo nvtxRangePop(); } +PyNvGopDemuxer::PyNvGopDemuxer(const std::string& filePath, + std::shared_ptr> memoryData) { + nvtxRangePushA("FFmpegDemuxer_Create_FromMemory"); + auto provider = std::make_shared(std::move(memoryData)); + demuxer.reset(new FFmpegDemuxer(std::move(provider))); + this->filename = filePath; + nvtxRangePop(); +} + // Frame and PTS mapping methods void PyNvGopDemuxer::set_pts_frameid_mapping(std::map&& frame2pts, std::map&& pts2frame) { diff --git a/packages/on_demand_video_decoder/ext_impl/src/VideoCodecSDKUtils/helper_classes/Utils/FFmpegDemuxer.h b/packages/on_demand_video_decoder/ext_impl/src/VideoCodecSDKUtils/helper_classes/Utils/FFmpegDemuxer.h index 534fe20..00eb541 100644 --- a/packages/on_demand_video_decoder/ext_impl/src/VideoCodecSDKUtils/helper_classes/Utils/FFmpegDemuxer.h +++ b/packages/on_demand_video_decoder/ext_impl/src/VideoCodecSDKUtils/helper_classes/Utils/FFmpegDemuxer.h @@ -16,20 +16,25 @@ #pragma once -extern "C" { -#include -#include -#include -#include -#include -#include -#include +extern "C" { +#include +#include +#include +#include +#include +#include +#include /* Explicitly include bsf.h when building against FFmpeg 4.3 (libavcodec 58.45.100) or later for backward compatibility */ #if LIBAVCODEC_VERSION_INT >= 3824484 #include -#endif -} -#include "cuviddec.h" +#endif +} +#include +#include +#include +#include +#include +#include "cuviddec.h" #include "nvcuvid.h" #include "NvCodecUtils.h" #include "nvtx3/nvtx3.hpp" @@ -477,17 +482,86 @@ class FFmpegDemuxer { FFBufferIO* ffbufIO = NULL; public: - class DataProvider { - public: - virtual ~DataProvider() {} - virtual int GetData(uint8_t* pBuf, int nBuf) = 0; - }; - - private: - void init(AVFormatContext* fmtc_, int64_t timeScale = 1000 /*Hz*/, const FastStreamInfo* fastStreamInfo = nullptr) { - if (!fmtc_) { - LOG(ERROR) << "No AVFormatContext provided."; - return; + class DataProvider { + public: + virtual ~DataProvider() {} + virtual int GetData(uint8_t* pBuf, int nBuf) = 0; + }; + + class SeekableDataProvider : public DataProvider { + public: + virtual int64_t Seek(int64_t offset, int whence) { + if (whence == AVSEEK_SIZE) { + return Size(); + } + return AVERROR(ENOSYS); + } + virtual int64_t Size() const { return -1; } + virtual bool IsSeekable() const { return Size() >= 0; } + }; + + class MemoryDataProvider : public SeekableDataProvider { + public: + explicit MemoryDataProvider(std::shared_ptr> data) + : data_(std::move(data)), pos_(0) {} + + int GetData(uint8_t* pBuf, int nBuf) override { + if (!data_ || nBuf <= 0 || pos_ >= static_cast(data_->size())) { + return AVERROR_EOF; + } + int64_t remaining = static_cast(data_->size()) - pos_; + int bytes_to_copy = static_cast(std::min(remaining, nBuf)); + std::memcpy(pBuf, data_->data() + pos_, bytes_to_copy); + pos_ += bytes_to_copy; + return bytes_to_copy; + } + + int64_t Seek(int64_t offset, int whence) override { + if (!data_) { + return AVERROR(EINVAL); + } + if (whence == AVSEEK_SIZE) { + return static_cast(data_->size()); + } + + int64_t new_pos = 0; + switch (whence) { + case SEEK_SET: + new_pos = offset; + break; + case SEEK_CUR: + new_pos = pos_ + offset; + break; + case SEEK_END: + new_pos = static_cast(data_->size()) + offset; + break; + default: + return AVERROR(EINVAL); + } + + if (new_pos < 0 || new_pos > static_cast(data_->size())) { + return AVERROR(EINVAL); + } + pos_ = new_pos; + return pos_; + } + + int64_t Size() const override { + return data_ ? static_cast(data_->size()) : -1; + } + + private: + std::shared_ptr> data_; + int64_t pos_; + }; + + private: + std::shared_ptr dataProvider = nullptr; + + void init(AVFormatContext* fmtc_, int64_t timeScale = 1000 /*Hz*/, const FastStreamInfo* fastStreamInfo = nullptr) { + if (!fmtc_) { + LOG(ERROR) << "No AVFormatContext provided."; + return; } fmtc = fmtc_; @@ -690,11 +764,12 @@ class FFmpegDemuxer { avcodec_parameters_copy(bsfc->par_in, fmtc->streams[iVideoStream]->codecpar); ck(av_bsf_init(bsfc)); } - - /* Some inputs doesn't allow seek functionality. - * Check this ahead of time. */ - is_seekable = fmtc->iformat->read_seek || fmtc->iformat->read_seek2; - } + + /* Some inputs doesn't allow seek functionality. + * Check this ahead of time. */ + bool avio_seekable = !fmtc->pb || (fmtc->pb->seekable & AVIO_SEEKABLE_NORMAL); + is_seekable = (fmtc->iformat->read_seek || fmtc->iformat->read_seek2) && avio_seekable; + } /** * @brief Private constructor to initialize libavformat resources. @@ -726,16 +801,22 @@ class FFmpegDemuxer { LOG(ERROR) << "FFmpeg error: " << __FILE__ << " " << __LINE__; return NULL; } - avioc = - avio_alloc_context(avioc_buffer, avioc_buffer_size, 0, pDataProvider, &ReadPacket, NULL, NULL); - if (!avioc) { - LOG(ERROR) << "FFmpeg error: " << __FILE__ << " " << __LINE__; - return NULL; - } - ctx->pb = avioc; - - ck(avformat_open_input(&ctx, NULL, NULL, NULL)); - return ctx; + SeekableDataProvider* pSeekableDataProvider = dynamic_cast(pDataProvider); + int64_t (*seek_cb)(void*, int64_t, int) = + pSeekableDataProvider && pSeekableDataProvider->IsSeekable() ? &SeekPacket : NULL; + avioc = avio_alloc_context(avioc_buffer, avioc_buffer_size, 0, pDataProvider, &ReadPacket, NULL, + seek_cb); + if (!avioc) { + LOG(ERROR) << "FFmpeg error: " << __FILE__ << " " << __LINE__; + return NULL; + } + if (pSeekableDataProvider && pSeekableDataProvider->IsSeekable()) { + avioc->seekable = AVIO_SEEKABLE_NORMAL; + } + ctx->pb = avioc; + + ck(avformat_open_input(&ctx, NULL, NULL, NULL)); + return ctx; } /** @@ -755,10 +836,19 @@ class FFmpegDemuxer { public: FFmpegDemuxer(const char *szFilePath, int64_t timescale = 1000 /*Hz*/) : FFmpegDemuxer(CreateFormatContext(szFilePath), timescale) {} FFmpegDemuxer(const char *szFilePath, const FastStreamInfo* fastStreamInfo, int64_t timescale = 1000 /*Hz*/) : FFmpegDemuxer(CreateFormatContext(szFilePath), fastStreamInfo, timescale) {} - - FFmpegDemuxer(DataProvider* pDataProvider) : FFmpegDemuxer(CreateFormatContext(pDataProvider)) { - avioc = fmtc->pb; - } + + FFmpegDemuxer(DataProvider* pDataProvider) : FFmpegDemuxer(CreateFormatContext(pDataProvider)) { + avioc = fmtc->pb; + } + + FFmpegDemuxer(std::shared_ptr pDataProvider, int64_t timescale = 1000 /*Hz*/) { + dataProvider = std::move(pDataProvider); + AVFormatContext* ctx = CreateFormatContext(dataProvider.get()); + avioc = ctx ? ctx->pb : nullptr; + nvtxRangePushA("init"); + init(ctx, timescale); + nvtxRangePop(); + } FFmpegDemuxer(const char* szFilePath, int buf_size /*512*1024*/, int64_t timescale = 1000) { unsigned char* io_buffer; @@ -1256,10 +1346,15 @@ class FFmpegDemuxer { return true; } - static int ReadPacket(void* opaque, uint8_t* pBuf, int nBuf) { - return ((DataProvider*)opaque)->GetData(pBuf, nBuf); - } -}; + static int ReadPacket(void* opaque, uint8_t* pBuf, int nBuf) { + return ((DataProvider*)opaque)->GetData(pBuf, nBuf); + } + + static int64_t SeekPacket(void* opaque, int64_t offset, int whence) { + auto* provider = dynamic_cast((DataProvider*)opaque); + return provider ? provider->Seek(offset, whence) : AVERROR(ENOSYS); + } +}; // #ifndef DEMUX_ONLY inline cudaVideoCodec FFmpeg2NvCodecId(AVCodecID id) { diff --git a/packages/on_demand_video_decoder/tests/test_shared_gop_store.py b/packages/on_demand_video_decoder/tests/test_shared_gop_store.py index 4bd9c42..1c60932 100644 --- a/packages/on_demand_video_decoder/tests/test_shared_gop_store.py +++ b/packages/on_demand_video_decoder/tests/test_shared_gop_store.py @@ -142,6 +142,29 @@ def test_read_returns_correct_data(self, store): view = store.read(ref) np.testing.assert_array_equal(view, data) + def test_put_accepts_bytes(self, store): + """put() accepts raw bytes without requiring a numpy array wrapper.""" + data = _make_gop_data(4096, seed=17) + ref = store.put("/video/bytes.mp4", 0, 30, data.tobytes()) + view = store.read(ref) + np.testing.assert_array_equal(view, data) + + def test_put_accepts_mutable_bytes_like_objects(self, store): + """put() accepts bytearray and memoryview GOP payloads.""" + data0 = _make_gop_data(512, seed=18) + data1 = _make_gop_data(768, seed=19) + + ref0 = store.put("/video/bytearray.mp4", 0, 30, bytearray(data0)) + ref1 = store.put("/video/memoryview.mp4", 0, 30, memoryview(data1)) + + np.testing.assert_array_equal(store.read(ref0), data0) + np.testing.assert_array_equal(store.read(ref1), data1) + + def test_put_rejects_empty_bytes(self, store): + """put() rejects empty payloads before allocating SharedMemory.""" + with pytest.raises(ValueError, match="must not be empty"): + store.put("/video/empty.mp4", 0, 30, b"") + def test_cleanup_removes_all_shm(self, store_id): """After cleanup(), no shm files remain for this store.""" s = SharedGopStore.create(capacity=4, store_id=store_id)