Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
import hashlib
import os
from multiprocessing import shared_memory
from typing import List, Optional
from typing import Any, List, Optional

import numpy as np

Expand Down Expand Up @@ -95,6 +95,31 @@ def _hash_video_path(video_path: str) -> np.uint64:
return np.uint64(int.from_bytes(digest, 'little'))


def _as_byte_view(data: Any) -> memoryview:
"""Return a C-contiguous uint8 memoryview over bytes-like GOP data."""
if isinstance(data, np.ndarray):
if not data.flags.c_contiguous:
data = np.ascontiguousarray(data)
view = memoryview(data)
else:
try:
view = memoryview(data)
except TypeError as exc:
raise TypeError("data must be a bytes-like object or numpy.ndarray") from exc
if not view.c_contiguous:
view = memoryview(view.tobytes())

if view.nbytes == 0:
raise ValueError("data must not be empty")

if view.ndim != 1 or view.itemsize != 1 or view.format not in ('B', 'b', 'c'):
try:
view = view.cast('B')
except TypeError as exc:
raise TypeError("data must expose a C-contiguous byte buffer") from exc
return view


class SharedGopStore:
"""Cross-process shared GOP store backed by POSIX SharedMemory.

Expand Down Expand Up @@ -254,9 +279,13 @@ def lookup(self, video_path: str, frame_id: int) -> Optional[GopRef]:
self._misses += 1
return None

def put(self, video_path: str, first_frame_id: int, gop_len: int, data: np.ndarray) -> GopRef:
def put(self, video_path: str, first_frame_id: int, gop_len: int, data: Any) -> GopRef:
"""Store GOP packet data and return a :class:`GopRef`.

``data`` may be a ``uint8`` numpy array or any C-contiguous
bytes-like object, including ``bytes``, ``bytearray``, and
``memoryview``.

Holds ``flock`` during eviction + insertion to guarantee atomicity.
Performs a double-check after acquiring the lock (another worker
may have inserted while we waited).
Expand All @@ -282,17 +311,18 @@ def put(self, video_path: str, first_frame_id: int, gop_len: int, data: np.ndarr
gop_len=int(e['gop_len']),
)

data_view = _as_byte_view(data)
slot_idx = self._find_free_or_evict()

# Content-addressed naming -- same GOP always gets the same name.
shm_name = f"{_SHM_PREFIX}_{self.store_id}_{vp_hash}_{first_frame_id}"
data_size = int(data.nbytes)
data_size = int(data_view.nbytes)

# Clean up if same GOP was previously cached then evicted
_cleanup_stale_shm(shm_name)

shm = shared_memory.SharedMemory(name=shm_name, create=True, size=data_size)
shm.buf[:data_size] = data.tobytes()
shm.buf[:data_size] = data_view
shm.close() # close handle; shm persists until unlink

# Write metadata entry
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,9 @@ class PyNvGopDecoder {
const std::vector<int> frame_ids,
const FastStreamInfo* fastStreamInfos = nullptr);

SerializedPacketBundle get_gop_from_bytes(std::shared_ptr<const std::vector<uint8_t>> data,
const std::vector<int> frame_ids);

/**
* Extract GOP data for multiple videos and return them as separate bundles
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@

#include "FFmpegDemuxer.h"
#include <map>
#include <memory>
#include <pybind11/cast.h>
#include <pybind11/embed.h>
#include <pybind11/numpy.h>
#include <pybind11/pybind11.h>
#include <pybind11/stl.h>
#include <vector>
extern "C" {
#include <libavutil/frame.h>
}
Expand All @@ -38,6 +40,8 @@ class PyNvGopDemuxer {
public:
explicit PyNvGopDemuxer(const std::string&);
explicit PyNvGopDemuxer(const std::string& filePath, const FastStreamInfo* fastStreamInfo);
explicit PyNvGopDemuxer(const std::string& filePath,
std::shared_ptr<const std::vector<uint8_t>> memoryData);

uint32_t GetHeight() { return demuxer->GetHeight(); }

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,34 @@

namespace fs = std::filesystem;

namespace {
py::buffer_info RequestByteBuffer(const py::buffer& buffer, const std::string& arg_name) {
py::buffer_info info = buffer.request();
if (info.itemsize != 1) {
throw std::invalid_argument(arg_name + " must expose a byte-sized buffer");
}
if (info.ndim > 1) {
throw std::invalid_argument(arg_name + " must be a contiguous 1D bytes-like object");
}
if (info.ndim == 1 && !info.strides.empty() && info.strides[0] != 1) {
throw std::invalid_argument(arg_name + " must be contiguous");
}

const size_t data_size = static_cast<size_t>(info.size) * static_cast<size_t>(info.itemsize);
if (data_size == 0) {
throw std::invalid_argument(arg_name + " must not be empty");
}
return info;
}

std::shared_ptr<std::vector<uint8_t>> CopyByteBuffer(const py::buffer& buffer) {
py::buffer_info info = RequestByteBuffer(buffer, "video_bytes");
const auto* src = static_cast<const uint8_t*>(info.ptr);
auto data = std::make_shared<std::vector<uint8_t>>(src, src + info.size * info.itemsize);
return data;
}
} // namespace

std::vector<FastStreamInfo> GetFastInitInfo(const std::vector<std::string>& filepaths) {
std::vector<FastStreamInfo> fast_stream_infos;
fast_stream_infos.reserve(filepaths.size());
Expand Down Expand Up @@ -487,6 +515,34 @@ void Init_PyNvGopDecoder(py::module& m) {
>>> rgb_frames = decoder.DecodeN12ToRGB(['video.mp4', 'video2.mp4'], [0, 10], as_bgr=True)
>>> print(f"Decoded {len(rgb_frames)} RGB frames")
)pbdoc")
.def(
"DecodeN12ToRGBFromBytes",
[](std::shared_ptr<PyNvGopDecoder>& dec, const py::buffer& video_bytes,
const std::vector<int> frame_ids, bool as_bgr) {
try {
auto data = CopyByteBuffer(video_bytes);
SerializedPacketBundle serialized_data;
std::vector<RGBFrame> result;
std::vector<std::string> filepaths(frame_ids.size(), "memory://video");

{
py::gil_scoped_release release;
serialized_data = dec->get_gop_from_bytes(data, frame_ids);
dec->decode_from_gop(serialized_data.data.get(), serialized_data.size,
filepaths, frame_ids, true, as_bgr, nullptr, &result);
}
return result;
} catch (const std::exception& e) {
throw std::runtime_error(e.what());
}
},
py::arg("video_bytes"), py::arg("frame_ids"), py::arg("as_bgr") = false,
R"pbdoc(
Decodes frames from a complete MP4 byte buffer into RGB/BGR data.

This method is a memory-input variant of DecodeN12ToRGB. The input must be
a complete seekable container byte buffer, not a live stream.
)pbdoc")
.def(
"GetGOP",
[](std::shared_ptr<PyNvGopDecoder>& dec, const std::vector<std::string>& filepaths,
Expand Down Expand Up @@ -556,6 +612,35 @@ void Init_PyNvGopDecoder(py::module& m) {
>>> gop_data, first_ids, gop_lens = decoder.GetGOP(['video.mp4', 'video2.mp4'], [0, 10])
>>> print(f"Extracted GOP data for {len(first_ids)} GOPs")
)pbdoc")
.def(
"GetGOPFromBytes",
[](std::shared_ptr<PyNvGopDecoder>& dec, const py::buffer& video_bytes,
const std::vector<int> frame_ids) {
try {
auto data = CopyByteBuffer(video_bytes);
SerializedPacketBundle serialized_data;
{
py::gil_scoped_release release;
serialized_data = dec->get_gop_from_bytes(data, frame_ids);
}

auto capsule = py::capsule(serialized_data.data.release(),
[](void* ptr) { delete[] static_cast<uint8_t*>(ptr); });
py::array_t<uint8_t> numpy_data(serialized_data.size,
static_cast<uint8_t*>(capsule.get_pointer()), capsule);
return py::make_tuple(numpy_data, serialized_data.first_frame_ids,
serialized_data.gop_lens);
} catch (const std::exception& e) {
throw std::runtime_error(e.what());
}
},
py::arg("video_bytes"), py::arg("frame_ids"),
R"pbdoc(
Extracts GOP packet data from a complete MP4 byte buffer.

This is the memory-input variant of GetGOP and is intended for sources
such as FFRecord records or mmap slices that contain complete MP4 bytes.
)pbdoc")
.def(
"GetGOPList",
[](std::shared_ptr<PyNvGopDecoder>& dec, const std::vector<std::string>& filepaths,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,112 @@ SerializedPacketBundle PyNvGopDecoder::get_gop(const std::vector<std::string>& f
return result;
}

SerializedPacketBundle PyNvGopDecoder::get_gop_from_bytes(
std::shared_ptr<const std::vector<uint8_t>> data, const std::vector<int> frame_ids) {
nvtxRangePushA("GetGOPFromBytes");

if (!data || data->empty()) {
throw std::invalid_argument("[ERROR] video byte buffer is empty");
}
if (frame_ids.empty()) {
throw std::invalid_argument("[ERROR] frame_ids must not be empty");
}
if (frame_ids.size() > max_num_files) {
throw std::invalid_argument("[ERROR] frame_ids size is greater than max_num_files");
}

const size_t total_frames = frame_ids.size();

std::vector<std::unique_ptr<PyNvGopDemuxer>> demuxers;
std::vector<std::unique_ptr<ConcurrentQueue<std::tuple<uint8_t*, int, int>>>> vpacket_queue;
std::vector<std::vector<std::unique_ptr<uint8_t[]>>> vpacket_array;
std::vector<std::vector<int>> all_gop_lens(total_frames);
std::vector<std::vector<int>> all_first_frame_ids(total_frames);

demuxers.resize(total_frames);
vpacket_queue.reserve(total_frames);
vpacket_array.reserve(total_frames);
for (size_t i = 0; i < total_frames; ++i) {
vpacket_queue.emplace_back(std::make_unique<ConcurrentQueue<std::tuple<uint8_t*, int, int>>>());
vpacket_queue[i]->setSize(MAX_SIZE);
vpacket_array.emplace_back();
}

ensureDemuxRunnersInitialized();

nvtxRangePushA("Initialize memory demuxers");
for (size_t i = 0; i < total_frames; ++i) {
demuxers[i].reset(new PyNvGopDemuxer("memory://video", data));
if (!demuxers[i]->IsValid()) {
nvtxRangePop(); // Initialize memory demuxers
nvtxRangePop(); // GetGOPFromBytes
throw std::runtime_error("[ERROR] create memory demuxer failed");
}
}
nvtxRangePop(); // Initialize memory demuxers

nvtxRangePushA("Packet extraction from bytes");
for (int i = 0; i < static_cast<int>(total_frames); ++i) {
try {
std::vector<int> sorted_frame_ids = {frame_ids[i]};
if (demuxers[i]->IsVFRV2()) {
int st = ExtractAndProcessGopInfo(demuxers[i], sorted_frame_ids, all_first_frame_ids[i],
all_gop_lens[i]);
if (st != 0) {
throw std::runtime_error("[ERROR] extract and process gop info failed for bytes source");
}
}
#ifdef PROCESS_SYNC
DemuxGopProc(demuxers[i].get(), vpacket_queue[i].get(), sorted_frame_ids,
all_first_frame_ids[i], all_gop_lens[i], vpacket_array[i], true);
#else
demux_runners[i].join();
demux_runners[i].start(PyNvGopDecoder::DemuxGopProc, demuxers[i].get(), vpacket_queue[i].get(),
sorted_frame_ids, std::ref(all_first_frame_ids[i]),
std::ref(all_gop_lens[i]), std::ref(vpacket_array[i]), true);
#endif
} catch (const std::exception& e) {
this->force_join_all();
nvtxRangePop(); // Packet extraction from bytes
nvtxRangePop(); // GetGOPFromBytes
throw std::runtime_error(e.what());
}
}
nvtxRangePop(); // Packet extraction from bytes

nvtxRangePushA("Demux thread join");
try {
for (int i = 0; i < static_cast<int>(total_frames); ++i) {
#ifndef PROCESS_SYNC
demux_runners[i].join();
#endif
}
} catch (const std::exception& e) {
this->force_join_all();
nvtxRangePop(); // Demux thread join
nvtxRangePop(); // GetGOPFromBytes
throw std::runtime_error(e.what());
}

for (int i = 0; i < static_cast<int>(total_frames); i++) {
if (vpacket_array.at(i).empty()) {
this->force_join_all();
nvtxRangePop(); // Demux thread join
nvtxRangePop(); // GetGOPFromBytes
throw std::runtime_error("[ERROR] vpacket_array is empty for bytes source");
}
}
nvtxRangePop(); // Demux thread join

nvtxRangePushA("CreateSerializedPacketBundle");
SerializedPacketBundle result = createSerializedPacketBundle(
total_frames, demuxers, all_gop_lens, all_first_frame_ids, vpacket_queue, vpacket_array);
nvtxRangePop(); // CreateSerializedPacketBundle

nvtxRangePop(); // GetGOPFromBytes
return result;
}

std::vector<SerializedPacketBundle> PyNvGopDecoder::get_gop_list(const std::vector<std::string>& filepaths,
const std::vector<int> frame_ids,
const FastStreamInfo* fastStreamInfos) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,15 @@ PyNvGopDemuxer::PyNvGopDemuxer(const std::string& filePath, const FastStreamInfo
nvtxRangePop();
}

PyNvGopDemuxer::PyNvGopDemuxer(const std::string& filePath,
std::shared_ptr<const std::vector<uint8_t>> memoryData) {
nvtxRangePushA("FFmpegDemuxer_Create_FromMemory");
auto provider = std::make_shared<FFmpegDemuxer::MemoryDataProvider>(std::move(memoryData));
demuxer.reset(new FFmpegDemuxer(std::move(provider)));
this->filename = filePath;
nvtxRangePop();
}

// Frame and PTS mapping methods
void PyNvGopDemuxer::set_pts_frameid_mapping(std::map<int, int64_t>&& frame2pts,
std::map<int64_t, int>&& pts2frame) {
Expand Down
Loading