From b63ad50f0421f2a5993aad11fe9ac691eba6ffa6 Mon Sep 17 00:00:00 2001
From: lgbo-ustc
Date: Tue, 21 Apr 2026 12:21:43 +0800
Subject: [PATCH] Fix EC striped reads: correct slice offsets in stripe buffer and drain futures

divideOneStripe: add bufBaseOffset and include it when computing
addSlice() positions into the destination ByteBuffer.

Reason: readOneStripe() may read a sub-range of one logical stripe; valid
bytes lie in curStripeBuf at [stripeBufOffset, stripeLimit), where
stripeBufOffset is offsetInBlockGroup % stripeLen. Previously slices were
laid out from buffer offset 0, while copyToTarget() consumed from
stripeBufOffset, so applications read wrong bytes (zeros or stale data).
That broke format parsers that depend on exact byte streams (e.g. Parquet
footer magic).

StripedInputStreamImpl: pass stripeBufOffset into divideOneStripe() for
sequential reads; pass 0 in fetchBlockByteRange(), where the buffer holds
exactly the requested byte range.

resetCurStripeBuffer: after clear(), memset the full capacity of
curStripeBuf and parityBuf when allocated so EC logical holes (ALLZERO /
tail padding) do not leak bytes from a prior stripe.

StripingCell: fix constructor and init() parameter shadowing (ecPolicy and
idxInBlkGroup were self-assigned and never stored in members).

StripeReader::readStripe: remove the early break that cleared futures once
fetchedChunksNum reached dataBlkNum. Parity chunk reads and decode retry
paths enqueue additional futures, all of which must complete before
leaving the loop.
--- src/client/StripeReader.cpp | 6 ++--- src/client/StripedBlockUtil.cpp | 5 ++-- src/client/StripedBlockUtil.h | 37 ++++++++++++++++----------- src/client/StripedInputStreamImpl.cpp | 15 ++++++++--- 4 files changed, 39 insertions(+), 24 deletions(-) diff --git a/src/client/StripeReader.cpp b/src/client/StripeReader.cpp index 2c5b7154..078a238c 100644 --- a/src/client/StripeReader.cpp +++ b/src/client/StripeReader.cpp @@ -311,10 +311,8 @@ void StripeReader::readStripe() { returnedChunk->state = StripedBlockUtil::StripingChunk::FETCHED; alignedStripe.fetchedChunksNum++; updateState4SuccessRead(r); - if (alignedStripe.fetchedChunksNum == dataBlkNum) { - clearFutures(); - break; - } + // Do not exit early when fetchedChunksNum reaches dataBlkNum: parity + // reads and decode retries also enqueue futures and must be drained. } else { returnedChunk->state = StripedBlockUtil::StripingChunk::MISSING; dfsStripedInputStream->closeReader(readerInfos[r.index]); diff --git a/src/client/StripedBlockUtil.cpp b/src/client/StripedBlockUtil.cpp index ab306538..1473cd09 100644 --- a/src/client/StripedBlockUtil.cpp +++ b/src/client/StripedBlockUtil.cpp @@ -71,7 +71,8 @@ void StripedBlockUtil::constructInternalBlock(LocatedBlock & bg, int32_t idxInRe void StripedBlockUtil::divideOneStripe(shared_ptr ecPolicy, int cellSize, LocatedBlock & blockGroup, long rangeStartInBlockGroup, - long rangeEndInBlockGroup, ByteBuffer * buf, std::vector & stripes) { + long rangeEndInBlockGroup, ByteBuffer * buf, int32_t bufBaseOffset, + std::vector & stripes) { int dataBlkNum = ecPolicy->getNumDataUnits(); // Step 1: map the byte range to StripingCells std::vector cells; @@ -105,7 +106,7 @@ void StripedBlockUtil::divideOneStripe(shared_ptr ecPolicy, chunk = shared_ptr(new StripingChunk()); s->chunks[cell.idxInStripe] = chunk; } - int pos = static_cast(done + overlapStart - cellStart); + int pos = static_cast(bufBaseOffset + done + overlapStart - cellStart); chunk->getChunkBuffer()->addSlice(buf, pos, 
overLapLen); } } diff --git a/src/client/StripedBlockUtil.h b/src/client/StripedBlockUtil.h index 4eb83d86..79f2f121 100644 --- a/src/client/StripedBlockUtil.h +++ b/src/client/StripedBlockUtil.h @@ -329,26 +329,26 @@ class StripedBlockUtil { StripingCell() = default; - StripingCell(shared_ptr ecPolicy, int cellSize, long idxInBlkGroup, + StripingCell(shared_ptr policy, int portionSize, long blkGroupCellIndex, long cellOffset) { - ecPolicy = ecPolicy; - idxInBlkGroup = idxInBlkGroup; - idxInInternalBlk = idxInBlkGroup / ecPolicy->getNumDataUnits(); - idxInStripe = static_cast(idxInBlkGroup - - idxInInternalBlk * ecPolicy->getNumDataUnits()); + ecPolicy = policy; + idxInBlkGroup = blkGroupCellIndex; + idxInInternalBlk = blkGroupCellIndex / policy->getNumDataUnits(); + idxInStripe = static_cast(blkGroupCellIndex - + idxInInternalBlk * policy->getNumDataUnits()); offset = cellOffset; - size = cellSize; + size = portionSize; } - void init(shared_ptr ecPolicy, int cellSize, long idxInBlkGroup, + void init(shared_ptr policy, int portionSize, long blkGroupCellIndex, long cellOffset) { - ecPolicy = ecPolicy; - idxInBlkGroup = idxInBlkGroup; - idxInInternalBlk = idxInBlkGroup / ecPolicy->getNumDataUnits(); - idxInStripe = static_cast(idxInBlkGroup - - idxInInternalBlk * ecPolicy->getNumDataUnits()); + ecPolicy = policy; + idxInBlkGroup = blkGroupCellIndex; + idxInInternalBlk = blkGroupCellIndex / policy->getNumDataUnits(); + idxInStripe = static_cast(blkGroupCellIndex - + idxInInternalBlk * policy->getNumDataUnits()); offset = cellOffset; - size = cellSize; + size = portionSize; } int getIdxInStripe() const { @@ -426,9 +426,16 @@ class StripedBlockUtil { static void checkBlocks(ExtendedBlock blockGroup, int i, ExtendedBlock blocki); static void constructInternalBlock(LocatedBlock & bg, int32_t idxInReturnedLocs, int32_t cellSize, int32_t dataBlkNum, int32_t idxInBlockGroup, LocatedBlock & lb); + /** + * @param bufBaseOffset byte offset in buf where the logical range + * 
[rangeStartInBlockGroup, rangeEndInBlockGroup] is laid out contiguously. + * Use 0 when buf only holds that range; use (offsetInBlockGroup % stripeLen) + * when buf is the full stripe buffer in StripedInputStreamImpl::readOneStripe. + */ static void divideOneStripe(shared_ptr ecPolicy, int cellSize, LocatedBlock & blockGroup, long rangeStartInBlockGroup, - long rangeEndInBlockGroup, ByteBuffer * buf, std::vector & stripes); + long rangeEndInBlockGroup, ByteBuffer * buf, int32_t bufBaseOffset, + std::vector & stripes); static int64_t getInternalBlockLength(int64_t dataSize, int32_t cellSize, int32_t numDataBlocks, int32_t idxInBlockGroup); diff --git a/src/client/StripedInputStreamImpl.cpp b/src/client/StripedInputStreamImpl.cpp index 7efad306..dbf34f29 100644 --- a/src/client/StripedInputStreamImpl.cpp +++ b/src/client/StripedInputStreamImpl.cpp @@ -33,6 +33,7 @@ #include #include +#include namespace Hdfs { @@ -108,6 +109,14 @@ void StripedInputStreamImpl::resetCurStripeBuffer(bool force) { if (curStripeBuf != nullptr) { curStripeBuf->clear(); + // EC stripes can have logical holes (ALLZERO / padding). Zero the whole + // buffer so copyToTarget never exposes stale bytes from a prior stripe. 
+ std::memset(curStripeBuf->getBuffer(), 0, curStripeBuf->capacity()); + } + + if (parityBuf != nullptr) { + parityBuf->clear(); + std::memset(parityBuf->getBuffer(), 0, parityBuf->capacity()); } curStripeRange->setLength(0); @@ -255,7 +264,7 @@ void StripedInputStreamImpl::fetchBlockByteRange(shared_ptr curBlo std::vector stripes; StripedBlockUtil::divideOneStripe( ecPolicy, cellSize, *blockGroup, start, - end, byteBuffer.get(), stripes); + end, byteBuffer.get(), 0, stripes); std::vector blks; StripedBlockUtil::parseStripedBlockGroup(*blockGroup, cellSize, dataBlkNum, parityBlkNum, blks); for (int i = 0; i < static_cast(blockGroup->getIndices().size()); ++i) { @@ -322,8 +331,8 @@ void StripedInputStreamImpl::readOneStripe() { int64_t stripeRangeLen = stripeLimit - stripeBufOffset; std::vector stripes; StripedBlockUtil::divideOneStripe( - ecPolicy, cellSize, *curBlock, offsetInBlockGroup, - offsetInBlockGroup + stripeRangeLen - 1, curStripeBuf, stripes); + ecPolicy, cellSize, *curBlock, offsetInBlockGroup, + offsetInBlockGroup + stripeRangeLen - 1, curStripeBuf, stripeBufOffset, stripes); // 3. read stripe for (int i = 0; i < static_cast(stripes.size()); ++i) {