diff --git a/src/client/StripeReader.cpp b/src/client/StripeReader.cpp index 2c5b7154..078a238c 100644 --- a/src/client/StripeReader.cpp +++ b/src/client/StripeReader.cpp @@ -311,10 +311,8 @@ void StripeReader::readStripe() { returnedChunk->state = StripedBlockUtil::StripingChunk::FETCHED; alignedStripe.fetchedChunksNum++; updateState4SuccessRead(r); - if (alignedStripe.fetchedChunksNum == dataBlkNum) { - clearFutures(); - break; - } + // Do not exit early when fetchedChunksNum reaches dataBlkNum: parity + // reads and decode retries also enqueue futures and must be drained. } else { returnedChunk->state = StripedBlockUtil::StripingChunk::MISSING; dfsStripedInputStream->closeReader(readerInfos[r.index]); diff --git a/src/client/StripedBlockUtil.cpp b/src/client/StripedBlockUtil.cpp index ab306538..1473cd09 100644 --- a/src/client/StripedBlockUtil.cpp +++ b/src/client/StripedBlockUtil.cpp @@ -71,7 +71,8 @@ void StripedBlockUtil::constructInternalBlock(LocatedBlock & bg, int32_t idxInRe void StripedBlockUtil::divideOneStripe(shared_ptr ecPolicy, int cellSize, LocatedBlock & blockGroup, long rangeStartInBlockGroup, - long rangeEndInBlockGroup, ByteBuffer * buf, std::vector & stripes) { + long rangeEndInBlockGroup, ByteBuffer * buf, int32_t bufBaseOffset, + std::vector & stripes) { int dataBlkNum = ecPolicy->getNumDataUnits(); // Step 1: map the byte range to StripingCells std::vector cells; @@ -105,7 +106,7 @@ void StripedBlockUtil::divideOneStripe(shared_ptr ecPolicy, chunk = shared_ptr(new StripingChunk()); s->chunks[cell.idxInStripe] = chunk; } - int pos = static_cast(done + overlapStart - cellStart); + int pos = static_cast(bufBaseOffset + done + overlapStart - cellStart); chunk->getChunkBuffer()->addSlice(buf, pos, overLapLen); } } diff --git a/src/client/StripedBlockUtil.h b/src/client/StripedBlockUtil.h index 4eb83d86..79f2f121 100644 --- a/src/client/StripedBlockUtil.h +++ b/src/client/StripedBlockUtil.h @@ -329,26 +329,26 @@ class StripedBlockUtil { StripingCell() = default; - StripingCell(shared_ptr ecPolicy, int cellSize, long idxInBlkGroup, + StripingCell(shared_ptr policy, int portionSize, long blkGroupCellIndex, long cellOffset) { - ecPolicy = ecPolicy; - idxInBlkGroup = idxInBlkGroup; - idxInInternalBlk = idxInBlkGroup / ecPolicy->getNumDataUnits(); - idxInStripe = static_cast(idxInBlkGroup - - idxInInternalBlk * ecPolicy->getNumDataUnits()); + ecPolicy = policy; + idxInBlkGroup = blkGroupCellIndex; + idxInInternalBlk = blkGroupCellIndex / policy->getNumDataUnits(); + idxInStripe = static_cast(blkGroupCellIndex - + idxInInternalBlk * policy->getNumDataUnits()); offset = cellOffset; - size = cellSize; + size = portionSize; } - void init(shared_ptr ecPolicy, int cellSize, long idxInBlkGroup, + void init(shared_ptr policy, int portionSize, long blkGroupCellIndex, long cellOffset) { - ecPolicy = ecPolicy; - idxInBlkGroup = idxInBlkGroup; - idxInInternalBlk = idxInBlkGroup / ecPolicy->getNumDataUnits(); - idxInStripe = static_cast(idxInBlkGroup - - idxInInternalBlk * ecPolicy->getNumDataUnits()); + ecPolicy = policy; + idxInBlkGroup = blkGroupCellIndex; + idxInInternalBlk = blkGroupCellIndex / policy->getNumDataUnits(); + idxInStripe = static_cast(blkGroupCellIndex - + idxInInternalBlk * policy->getNumDataUnits()); offset = cellOffset; - size = cellSize; + size = portionSize; } int getIdxInStripe() const { @@ -426,9 +426,16 @@ class StripedBlockUtil { static void checkBlocks(ExtendedBlock blockGroup, int i, ExtendedBlock blocki); static void constructInternalBlock(LocatedBlock & bg, int32_t idxInReturnedLocs, int32_t cellSize, int32_t dataBlkNum, int32_t idxInBlockGroup, LocatedBlock & lb); + /** + * @param bufBaseOffset byte offset in buf where the logical range + * [rangeStartInBlockGroup, rangeEndInBlockGroup] is laid out contiguously. + * Use 0 when buf only holds that range; use (offsetInBlockGroup % stripeLen) + * when buf is the full stripe buffer in StripedInputStreamImpl::readOneStripe. + */ static void divideOneStripe(shared_ptr ecPolicy, int cellSize, LocatedBlock & blockGroup, long rangeStartInBlockGroup, - long rangeEndInBlockGroup, ByteBuffer * buf, std::vector & stripes); + long rangeEndInBlockGroup, ByteBuffer * buf, int32_t bufBaseOffset, + std::vector & stripes); static int64_t getInternalBlockLength(int64_t dataSize, int32_t cellSize, int32_t numDataBlocks, int32_t idxInBlockGroup); diff --git a/src/client/StripedInputStreamImpl.cpp b/src/client/StripedInputStreamImpl.cpp index 7efad306..dbf34f29 100644 --- a/src/client/StripedInputStreamImpl.cpp +++ b/src/client/StripedInputStreamImpl.cpp @@ -33,6 +33,7 @@ #include #include +#include namespace Hdfs { @@ -108,6 +109,14 @@ void StripedInputStreamImpl::resetCurStripeBuffer(bool force) { if (curStripeBuf != nullptr) { curStripeBuf->clear(); + // EC stripes can have logical holes (ALLZERO / padding). Zero the whole + // buffer so copyToTarget never exposes stale bytes from a prior stripe. + std::memset(curStripeBuf->getBuffer(), 0, curStripeBuf->capacity()); + } + + if (parityBuf != nullptr) { + parityBuf->clear(); + std::memset(parityBuf->getBuffer(), 0, parityBuf->capacity()); } curStripeRange->setLength(0); @@ -255,7 +264,7 @@ void StripedInputStreamImpl::fetchBlockByteRange(shared_ptr curBlo std::vector stripes; StripedBlockUtil::divideOneStripe( ecPolicy, cellSize, *blockGroup, start, - end, byteBuffer.get(), stripes); + end, byteBuffer.get(), 0, stripes); std::vector blks; StripedBlockUtil::parseStripedBlockGroup(*blockGroup, cellSize, dataBlkNum, parityBlkNum, blks); for (int i = 0; i < static_cast(blockGroup->getIndices().size()); ++i) { @@ -322,8 +331,8 @@ void StripedInputStreamImpl::readOneStripe() { int64_t stripeRangeLen = stripeLimit - stripeBufOffset; std::vector stripes; StripedBlockUtil::divideOneStripe( - ecPolicy, cellSize, *curBlock, offsetInBlockGroup, - offsetInBlockGroup + stripeRangeLen - 1, curStripeBuf, stripes); + ecPolicy, cellSize, *curBlock, offsetInBlockGroup, + offsetInBlockGroup + stripeRangeLen - 1, curStripeBuf, stripeBufOffset, stripes); // 3. read stripe for (int i = 0; i < static_cast(stripes.size()); ++i) {