diff --git a/ydb/core/blobstorage/lwtrace_probes/blobstorage_probes.h b/ydb/core/blobstorage/lwtrace_probes/blobstorage_probes.h index 17626ab65ae2..0e0f5f086cd8 100644 --- a/ydb/core/blobstorage/lwtrace_probes/blobstorage_probes.h +++ b/ydb/core/blobstorage/lwtrace_probes/blobstorage_probes.h @@ -217,10 +217,16 @@ struct TEventTypeField { PROBE(PDiskChunkReadPieceComplete, GROUPS("PDisk", "PDiskRequest"), \ TYPES(TPDiskIdField, ui64, ui64, double), \ NAMES("pdisk", "size", "relativeOffset", "deviceTimeMs")) \ + PROBE(PDiskChunkWritePieceComplete, GROUPS("PDisk", "PDiskRequest"), \ + TYPES(TPDiskIdField, ui64, ui64, double), \ + NAMES("pdisk", "size", "relativeOffset", "deviceTimeMs")) \ PROBE(PDiskChunkWriteAddToScheduler, GROUPS("PDisk", "PDiskRequest"), \ TYPES(TPDiskIdField, ui64, double, ui64, bool, ui64, ui64), \ NAMES("pdisk", "reqId", "creationTimeSec", "owner", "isFast", "priorityClass", "size")) \ - PROBE(PDiskChunkWriteLastPieceSendToDevice, GROUPS("PDisk", "PDiskRequest"), \ + PROBE(PDiskChunkWritePieceAddToScheduler, GROUPS("PDisk", "PDiskRequest"), \ + TYPES(TPDiskIdField, ui32, ui64, ui64), \ + NAMES("pdisk", "pieceIdx", "offset", "size")) \ + PROBE(PDiskChunkWritePieceSendToDevice, GROUPS("PDisk", "PDiskRequest"), \ TYPES(TPDiskIdField, ui64, ui64, ui64, ui64), \ NAMES("pdisk", "owner", "chunkIdx", "pieceOffset", "pieceSize")) \ PROBE(PDiskLogWriteComplete, GROUPS("PDisk", "PDiskRequest"), \ diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_blockdevice_async.cpp b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_blockdevice_async.cpp index 2f78bd4ec64f..e212403dbd9b 100644 --- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_blockdevice_async.cpp +++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_blockdevice_async.cpp @@ -889,7 +889,7 @@ class TRealBlockDevice : public IBlockDevice { } Y_VERIFY_S(PCtx->ActorSystem->AppData(), PCtx->PDiskLogPrefix); - Y_VERIFY_S(PCtx->ActorSystem->AppData()->IoContextFactory, PCtx->PDiskLogPrefix); + Y_VERIFY_S(PCtx->ActorSystem->AppData()->IoContextFactory, PCtx->PDiskLogPrefix); auto *factory = PCtx->ActorSystem->AppData()->IoContextFactory; IoContext = factory->CreateAsyncIoContext(Path, PCtx->PDiskId, Flags, SectorMap); if (Flags & TDeviceMode::UseSpdk) { diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_chunk_write_queue.h b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_chunk_write_queue.h new file mode 100644 index 000000000000..8e70c43acb42 --- /dev/null +++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_chunk_write_queue.h @@ -0,0 +1,34 @@ +#include +#include + +namespace NKikimr { +namespace NPDisk { + +class TChunkWritePiece; + +class TChunkWritePieceQueue { +public: + TChunkWritePieceQueue() = default; + bool Empty() const { + return SizeCounter.load(std::memory_order_relaxed) == 0; + } + size_t Size() const { + return SizeCounter.load(std::memory_order_relaxed); + } + TChunkWritePiece* Dequeue() { + TChunkWritePiece* item; + QueueImpl.Dequeue(&item); + SizeCounter.fetch_sub(1, std::memory_order_relaxed); + return item; + } + void Enqueue(TChunkWritePiece* item) { + QueueImpl.Enqueue(item); + SizeCounter.fetch_add(1, std::memory_order_relaxed); + } +private: + TLockFreeQueue QueueImpl; + std::atomic SizeCounter = 0; +}; + +} // NPDisk +} // NKikimr diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_completion_impl.cpp b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_completion_impl.cpp index fc240d5854f5..1335717f3a2e 100644 --- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_completion_impl.cpp +++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_completion_impl.cpp @@ -11,6 +11,55 @@ namespace NPDisk { // Completion actions //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +// Chunk write completion actions +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + + +TCompletionChunkWritePiece::TCompletionChunkWritePiece(TChunkWritePiece* piece, TCompletionChunkWrite* cumulativeCompletion) + : TCompletionAction() + , Piece(piece) + , CumulativeCompletion(cumulativeCompletion) + , Span(piece->Span.CreateChild(TWilson::PDiskDetailed, "PDisk.ChunkWritePiece.CompletionPart")) + , ActorSystem(Piece->PDisk->PCtx->ActorSystem) +{ +} + +TCompletionChunkWritePiece::~TCompletionChunkWritePiece() { + if (CumulativeCompletion) { + CumulativeCompletion->RemovePart(ActorSystem); + } +} + +void TCompletionChunkWritePiece::Exec(TActorSystem *actorSystem) { + Span.Event("PDisk.CompletionChunkWritePart.Exec"); + Y_VERIFY(actorSystem); + Y_VERIFY(CumulativeCompletion); + if (TCompletionAction::Result != EIoResult::Ok) { + Release(actorSystem); + return; + } + + double deviceTimeMs = HPMilliSecondsFloat(GetTime - SubmitTime); + //TODO: Fork and join this orbit from ChunkWrite orbit. + LWTRACK(PDiskChunkWritePieceComplete, Orbit, Piece->PDisk->PCtx->PDiskId, Piece->PieceSize, Piece->PieceShift, deviceTimeMs); + + CumulativeCompletion->CompletePart(actorSystem); + CumulativeCompletion = nullptr; + + Span.Event("PDisk.CompletionChunkWritePart.ExecStop"); + delete this; +} + +void TCompletionChunkWritePiece::Release(TActorSystem *actorSystem) { + Y_UNUSED(actorSystem); + if (CumulativeCompletion) { + CumulativeCompletion->ErrorReason = ErrorReason; + } + Span.EndError("PDisk.CompletionChunkWritePart.Release"); + delete this; +} + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // Log write completion action //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_completion_impl.h b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_completion_impl.h index cc06f5a45da9..a5c362412e37 100644 --- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_completion_impl.h +++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_completion_impl.h @@ -76,11 +76,14 @@ class TCompletionChunkWrite : public TCompletionAction { std::function OnDestroy; TReqId ReqId; NWilson::TSpan Span; - + std::atomic PartsStarted; + std::atomic PartsRemoved; + std::atomic PartsWritten; public: + bool IsReplied = false; + ui32 Pieces; TEvChunkWrite::TPartsPtr Parts; std::optional Buffer; - TCompletionChunkWrite(const TActorId &recipient, TEvChunkWriteResult *event, TPDiskMon *mon, ui32 pdiskId, NHPTimer::STime startTime, size_t sizeBytes, ui8 priorityClass, std::function onDestroy, TReqId reqId, NWilson::TSpan&& span) @@ -164,16 +167,57 @@ class TCompletionChunkWrite : public TCompletionAction { if (Mon) { Mon->GetWriteCounter(PriorityClass)->CountResponse(); } + delete this; } void Release(TActorSystem *actorSystem) override { - Event->Status = NKikimrProto::CORRUPTED; - Event->ErrorReason = ErrorReason; - actorSystem->Send(Recipient, Event.Release()); - Span.EndError(ErrorReason); + if (!IsReplied) { + Event->Status = NKikimrProto::CORRUPTED; + Event->ErrorReason = ErrorReason; + actorSystem->Send(Recipient, Event.Release()); + Span.EndError(ErrorReason); + } delete this; } + + void AddPart() { + PartsStarted++; + } + + bool AllPartsStarted() { + return PartsStarted == Pieces; + } + + void RemovePart(TActorSystem *actorSystem) { + ui32 old = PartsRemoved.fetch_add(1, std::memory_order::seq_cst); + if (old + 1 == Pieces) { + if (PartsWritten.load(std::memory_order::seq_cst) == Pieces) { + Exec(actorSystem); + } else { + Release(actorSystem); + } + } + } + + void CompletePart(TActorSystem *actorSystem) { + PartsWritten++; + RemovePart(actorSystem); + } +}; + +class TChunkWritePiece; + +class TCompletionChunkWritePiece : public TCompletionAction { + TChunkWritePiece* Piece; + TCompletionChunkWrite* CumulativeCompletion; + NWilson::TSpan Span; + TActorSystem* ActorSystem; +public: + TCompletionChunkWritePiece(NKikimr::NPDisk::TChunkWritePiece* piece, TCompletionChunkWrite* cumulativeCompletion); + void Exec(TActorSystem *actorSystem) override; + void Release(TActorSystem *actorSystem) override; + virtual ~TCompletionChunkWritePiece(); }; class TCompletionLogWrite : public TCompletionAction { diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_config.h b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_config.h index 3f4a3f3ffb5c..c6640382af83 100644 --- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_config.h +++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_config.h @@ -165,6 +165,7 @@ struct TPDiskConfig : public TThrRefBase { NKikimrBlobStorage::TPDiskSpaceColor::E SpaceColorBorder = NKikimrBlobStorage::TPDiskSpaceColor::GREEN; ui32 CompletionThreadsCount = 1; + ui32 EncryptionThreadCount = 0; bool UseNoopScheduler = false; bool PlainDataChunks = false; @@ -419,6 +420,9 @@ struct TPDiskConfig : public TThrRefBase { if (cfg->HasCompletionThreadsCount()) { CompletionThreadsCount = cfg->GetCompletionThreadsCount(); } + if (cfg->HasEncryptionThreadCount()) { + EncryptionThreadCount = cfg->GetEncryptionThreadCount(); + } if (cfg->HasUseNoopScheduler()) { UseNoopScheduler = cfg->GetUseNoopScheduler(); diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_data.h b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_data.h index 74f8b41ca96f..1a9623f369ac 100644 --- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_data.h +++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_data.h @@ -654,6 +654,7 @@ struct TDiskFormat { str << " MagicFormatChunk: " << MagicFormatChunk << x; str << " ChunkSize: " << ChunkSize << " bytes (" << (ChunkSize / 1000000ull) << " MB)" << x; str << " SectorSize: " << SectorSize << x; + str << " SectorPayloadSize: " << SectorPayloadSize() << x; str << " SysLogSectorCount: " << SysLogSectorCount << x; str << " SystemChunkCount: " << SystemChunkCount << x; str << " FormatText: \"" << FormatText << "\"" << x; diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_encryption_threads.cpp b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_encryption_threads.cpp new file mode 100644 index 000000000000..2d35fb844ec9 --- /dev/null +++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_encryption_threads.cpp @@ -0,0 +1,105 @@ +#include "blobstorage_pdisk_encryption_threads.h" +#include "blobstorage_pdisk_requestimpl.h" + +#include +#include + +namespace NKikimr { +namespace NPDisk { + +TEncryptionThread::TEncryptionThread(TString name) : Name(name) {} + +size_t TEncryptionThread::GetQueuedActions() { + return Queue.GetWaitingSize(); +} + +void* TEncryptionThread::ThreadProc() { + SetCurrentThreadName(Name.data()); + bool isWorking = true; + + while (isWorking) { + TAtomicBase itemCount = Queue.GetWaitingSize(); + if (itemCount > 0) { + for (TAtomicBase idx = 0; idx < itemCount; ++idx) { + TChunkWritePiece *piece = Queue.Pop(); + if (piece == nullptr) { + isWorking = false; + } else { + piece->Process(); + } + } + } else { + Queue.ProducedWaitI(); + } + } + return nullptr; +} + +void TEncryptionThread::Schedule(TChunkWritePiece *piece) { + Queue.Push(piece); +} + +TEncryptionThreads::TEncryptionThreads(size_t threadsCount) { + Y_VERIFY(threadsCount <= MAX_THREAD_COUNT, "too many encryption threads"); + Threads.reserve(16); + for (size_t i = 0; i < std::min(threadsCount, size_t(2)); i++) { + Threads.push_back(MakeHolder(TStringBuilder() << "PdEncrypt" << i)); + Threads.back()->Start(); + } + AvailableThreads = threadsCount; +} + +void TEncryptionThreads::SetThreadCount(size_t threadsCount) { + Y_VERIFY(threadsCount <= MAX_THREAD_COUNT, "too many encryption threads"); + for (size_t i = Threads.size(); i < threadsCount; i++) { + Threads.push_back(MakeHolder(TStringBuilder() << "PdEncrypt" << i)); + Threads.back()->Start(); + } + AvailableThreads = threadsCount; +} + + +void TEncryptionThreads::StopWork() { + for (auto& thread : Threads) { + thread->Schedule(nullptr); + } +} + +void TEncryptionThreads::Join() { + for (auto& thread : Threads) { + thread->Join(); + } + Threads.clear(); +} + +void TEncryptionThreads::Stop() { + StopWork(); + Join(); +} + +void TEncryptionThreads::Schedule(TChunkWritePiece* piece) noexcept { + if (!piece) { + StopWork(); + return; + } + + if (!AvailableThreads) { + piece->Process(); + return; + } + + auto min_it = Threads.begin(); + auto minQueueSize = (*min_it)->GetQueuedActions(); + for (auto it = Threads.begin() + 1; it != Threads.begin() + AvailableThreads; ++it) { + auto queueSize = (*it)->GetQueuedActions(); + if (queueSize < minQueueSize) { + minQueueSize = queueSize; + min_it = it; + } + } + Y_VERIFY(min_it != Threads.end()); + (*min_it)->Schedule(piece); +} + +} // NPDisk +} // NKikimr diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_encryption_threads.h b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_encryption_threads.h new file mode 100644 index 000000000000..2972f31f4bdf --- /dev/null +++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_encryption_threads.h @@ -0,0 +1,45 @@ +#include "blobstorage_pdisk_completion.h" +#include "blobstorage_pdisk_util_countedqueuemanyone.h" + +#include +#include + +#include +#include + +namespace NKikimr { +namespace NPDisk { + +class TChunkWritePiece; + +class TEncryptionThread : public ISimpleThread { +public: + TEncryptionThread(TString name); + void *ThreadProc() override; + void Schedule(TChunkWritePiece *piece); + size_t GetQueuedActions(); +private: + TCountedQueueManyOne Queue; + TString Name; +}; +class TEncryptionThreads { +public: + static const size_t MAX_THREAD_COUNT = 4; + TVector> Threads; + size_t AvailableThreads; + + TEncryptionThreads(size_t threadsCount); + void SetThreadCount(size_t threadsCount); + + // Schedule action execution + // pass action = nullptr to quit + void Schedule(TChunkWritePiece* piece) noexcept; + + void StopWork(); + void Join(); + void Stop(); +}; + + +} // NPDisk +} // NKikimr diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.cpp b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.cpp index 70e9cf79d2ce..3d157a56f9f4 100644 --- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.cpp +++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.cpp @@ -36,6 +36,7 @@ TPDisk::TPDisk(std::shared_ptr pCtx, const TIntrusivePtrDeviceInFlight) , ReqCreator(PCtx, &Mon, &DriveModel, &EstimatedLogChunkIdx, cfg->SeparateHugePriorities) , ReorderingMs(cfg->ReorderingMs) + , EncryptionThreads(cfg->EncryptionThreadCount) , LogSeekCostLoop(2) , ExpectedDiskGuid(cfg->PDiskGuid) , PDiskCategory(cfg->PDiskCategory) @@ -65,6 +66,9 @@ TPDisk::TPDisk(std::shared_ptr pCtx, const TIntrusivePtrUseNoopScheduler, 0, 1); UseNoopSchedulerHDD = TControlWrapper(Cfg->UseNoopScheduler, 0, 1); + EncryptionThreadCountSSD = TControlWrapper(Cfg->EncryptionThreadCount, 0, TEncryptionThreads::MAX_THREAD_COUNT); + EncryptionThreadCountHDD = TControlWrapper(Cfg->EncryptionThreadCount, 0, TEncryptionThreads::MAX_THREAD_COUNT); + EncryptionThreadCountCached = PDiskCategory.IsSolidState() ? EncryptionThreadCountSSD : EncryptionThreadCountHDD; if (Cfg->SectorMap) { auto diskModeParams = Cfg->SectorMap->GetDiskModeParams(); @@ -98,6 +102,8 @@ TPDisk::TPDisk(std::shared_ptr pCtx, const TIntrusivePtrPDiskId, type = PDiskCategory]() { return TStringBuilder() << "PDisk DebugInfo# { Id# " << id << " Type# " << type.TypeStrLong() << " }"; }; @@ -298,6 +304,7 @@ void TPDisk::Stop() { P_LOG(PRI_NOTICE, BPD01, "Shutdown", (OwnerInfo, StartupOwnerInfo())); + // BlockDevice->Stop() frees all completion events that are released and sent to device BlockDevice->Stop(); // BlockDevice is stopped, the data will NOT hit the disk. @@ -328,11 +335,19 @@ void TPDisk::Stop() { TRequestBase::AbortDelete(req.Get(), PCtx->ActorSystem); } - for (; JointChunkWrites.size(); JointChunkWrites.pop()) { - auto* req = JointChunkWrites.front(); - Y_VERIFY_DEBUG_S(req->GetType() == ERequestType::RequestChunkWritePiece, - PCtx->PDiskLogPrefix << "Unexpected request type# " << TypeName(req)); - TRequestBase::AbortDelete(req, PCtx->ActorSystem); + { + /* Waits all thread pool tasks to be completed. + * Do we need to look for an implementation that drops all requests + * For Stop() to be faster? + */ + EncryptionThreads.Stop(); + + while (!JointChunkWrites.Empty()) { + auto piece = JointChunkWrites.Dequeue(); + //When control reaches this function, req->Completion is already freed by + //TCompletionChunkWritePiece::Release() calls + TRequestBase::AbortDelete(piece, PCtx->ActorSystem); + } } for (; JointLogWrites.size(); JointLogWrites.pop()) { @@ -782,11 +797,14 @@ ui32 TPDisk::AskVDisksToCutLogs(TOwner ownerFilter, bool doForce) { //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // Chunk writing //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// -void TPDisk::ChunkWritePiecePlain(TChunkWrite *evChunkWrite) { + +void TPDisk::ChunkWritePiecePlain(TChunkWritePiece *piece) { + auto& evChunkWrite = piece->ChunkWrite; + Y_ENSURE(evChunkWrite->Pieces == 1); ui32 chunkIdx = evChunkWrite->ChunkIdx; Y_VERIFY_S(chunkIdx != 0, PCtx->PDiskLogPrefix); - auto& comp = evChunkWrite->Completion; + auto& headComp = evChunkWrite->Completion; const ui32 count = evChunkWrite->PartsPtr->Size(); const void* buff = nullptr; ui32 chunkOffset = evChunkWrite->Offset; @@ -807,7 +825,7 @@ void TPDisk::ChunkWritePiecePlain(TChunkWrite *evChunkWrite) { && size % Format.SectorSize == 0) { newSize = size; } else { - newSize = comp->CompactBuffer(chunkOffset % Format.SectorSize, Format.SectorSize); + newSize = headComp->CompactBuffer(chunkOffset % Format.SectorSize, Format.SectorSize); auto log = [&] () { TStringStream str; @@ -825,29 +843,30 @@ void TPDisk::ChunkWritePiecePlain(TChunkWrite *evChunkWrite) { (size, size), (newSize, newSize), (Sizes, log())); *Mon.WriteBufferCompactedBytes += size; } - buff = comp->GetBuffer(); + buff = headComp->GetBuffer(); P_LOG(PRI_DEBUG, BPD01, "Chunk write", (ChunkIdx, chunkIdx), (Encrypted, false), (format_ChunkSize, Format.ChunkSize), (aligndown, AlignDown(chunkOffset, Format.SectorSize)), (chunkOffset, chunkOffset), (Size, size), (NewSize, newSize), (diskOffset, diskOffset), (count, count)); auto traceId = evChunkWrite->Span.GetTraceId(); - evChunkWrite->Completion->Orbit = std::move(evChunkWrite->Orbit); - BlockDevice->PwriteAsync(buff, newSize, diskOffset, comp.Release(), evChunkWrite->ReqId, &traceId); + BlockDevice->PwriteAsync(buff, newSize, diskOffset, piece->Completion.Release(), evChunkWrite->ReqId, &traceId); evChunkWrite->IsReplied = true; } -bool TPDisk::ChunkWritePieceEncrypted(TChunkWrite *evChunkWrite, TChunkWriter& writer, ui32 bytesAvailable) { +void TPDisk::ChunkWritePieceEncrypted(TChunkWritePiece *piece, TChunkWriter& writer, ui32 bytesAvailable) { + auto evChunkWrite = piece->ChunkWrite.Get(); const ui32 count = evChunkWrite->PartsPtr->Size(); - for (ui32 partIdx = evChunkWrite->CurrentPart; partIdx < count; ++partIdx) { - ui32 remainingPartSize = (*evChunkWrite->PartsPtr)[partIdx].second - evChunkWrite->CurrentPartOffset; + + for (ui32 partIdx = piece->PartIdx; partIdx < count; ++partIdx) { + ui32 remainingPartSize = (*evChunkWrite->PartsPtr)[partIdx].second - piece->PartOffset; auto traceId = evChunkWrite->Span.GetTraceId(); if (bytesAvailable < remainingPartSize) { ui32 sizeToWrite = bytesAvailable; if (sizeToWrite > 0) { ui8 *data = (ui8*)(*evChunkWrite->PartsPtr)[partIdx].first; if (data) { - ui8 *source = data + evChunkWrite->CurrentPartOffset; + ui8 *source = data + piece->PartOffset; NSan::CheckMemIsInitialized(source, sizeToWrite); writer.WriteData(source, sizeToWrite, evChunkWrite->ReqId, &traceId); *Mon.BandwidthPChunkPayload += sizeToWrite; @@ -857,29 +876,31 @@ bool TPDisk::ChunkWritePieceEncrypted(TChunkWrite *evChunkWrite, TChunkWriter& w } evChunkWrite->RemainingSize -= sizeToWrite; evChunkWrite->BytesWritten += sizeToWrite; + piece->PartOffset = 0; } - evChunkWrite->CurrentPartOffset += sizeToWrite; - evChunkWrite->CurrentPart = partIdx; - return false; + piece->MarkReady(PCtx->PDiskLogPrefix); + if (piece->Completion) { + writer.Flush(evChunkWrite->ReqId, &traceId, piece->Completion.Release()); + } + return; } else { Y_VERIFY_S(remainingPartSize, PCtx->PDiskLogPrefix); ui32 sizeToWrite = remainingPartSize; bytesAvailable -= remainingPartSize; ui8 *data = (ui8*)(*evChunkWrite->PartsPtr)[partIdx].first; if (data) { - ui8 *source = data + evChunkWrite->CurrentPartOffset; + ui8 *source = data + piece->PartOffset; writer.WriteData(source, sizeToWrite, evChunkWrite->ReqId, &traceId); *Mon.BandwidthPChunkPayload += sizeToWrite; } else { writer.WritePadding(sizeToWrite, evChunkWrite->ReqId, &traceId); *Mon.BandwidthPChunkPadding += sizeToWrite; } - evChunkWrite->CurrentPartOffset = 0; evChunkWrite->RemainingSize -= sizeToWrite; evChunkWrite->BytesWritten += sizeToWrite; + piece->PartOffset = 0; } } - Y_VERIFY_S(evChunkWrite->RemainingSize == 0, PCtx->PDiskLogPrefix); ui32 chunkIdx = evChunkWrite->ChunkIdx; P_LOG(PRI_DEBUG, BPD79, "ChunkWrite", @@ -895,32 +916,58 @@ bool TPDisk::ChunkWritePieceEncrypted(TChunkWrite *evChunkWrite, TChunkWriter& w } auto traceId = evChunkWrite->Span.GetTraceId(); - evChunkWrite->Completion->Orbit = std::move(evChunkWrite->Orbit); - writer.Flush(evChunkWrite->ReqId, &traceId, evChunkWrite->Completion.Release()); - evChunkWrite->IsReplied = true; - return true; + piece->MarkReady(PCtx->PDiskLogPrefix); + + if (!piece->ShouldDetach) { + P_LOG(PRI_DEBUG, BPD11, "Performing TChunkWritePiece write to block device", + (ReqId, piece->ChunkWrite->ReqId), + (chunkIdx, piece->ChunkWrite->ChunkIdx), + (PieceShift, piece->PieceShift), + (PieceSize, piece->PieceSize), + ); + LWTRACK(PDiskChunkWritePieceSendToDevice, + piece->Orbit, PCtx->PDiskId, piece->ChunkWrite->Owner, piece->ChunkWrite->ChunkIdx, + piece->PieceShift, piece->PieceSize + ); + } + + writer.Flush(evChunkWrite->ReqId, &traceId, piece->Completion.Release()); +} + +void TPDisk::PushChunkWrite(TChunkWritePiece *piece) { + JointChunkWrites.Enqueue(piece); } -bool TPDisk::ChunkWritePiece(TChunkWrite *evChunkWrite, ui32 pieceShift, ui32 pieceSize) { +void TPDisk::ChunkWritePiece(TChunkWritePiece *piece) { + P_LOG(PRI_DEBUG, BPD01, "TPDisk::ChunkWritePiece", + (ChunkIdx, piece->ChunkWrite->ChunkIdx), + (Offset, piece->PieceShift), + (Size, piece->PieceSize) + ); + auto evChunkWrite = piece->ChunkWrite.Get(); if (evChunkWrite->IsReplied) { - return true; + return; } - Y_VERIFY_S(evChunkWrite->BytesWritten == pieceShift, PCtx->PDiskLogPrefix); - TGuard guard(StateMutex); - Y_VERIFY_S(pieceShift % Format.SectorPayloadSize() == 0, PCtx->PDiskLogPrefix); - Y_VERIFY_S(pieceSize % Format.SectorPayloadSize() == 0 || pieceShift + pieceSize == evChunkWrite->TotalSize, - PCtx->PDiskLogPrefix << "pieceShift# " << pieceShift << " pieceSize# " << pieceSize + Y_ENSURE(piece->ChunkWrite->Completion, "Existing ChunkWritePiece should have a parent TCompletionChunkWrite"); + + piece->ChunkWrite->Completion->AddPart(); + if (piece->ChunkWrite->Completion->AllPartsStarted()) { + Mon.IncrementQueueTime(piece->ChunkWrite->PriorityClass, piece->ChunkWrite->LifeDurationMs(HPNow())); + } + + Y_VERIFY_S(piece->PieceShift % Format.SectorPayloadSize() == 0, PCtx->PDiskLogPrefix); + Y_VERIFY_S(piece->PieceSize % Format.SectorPayloadSize() == 0 || piece->PieceShift + piece->PieceSize == evChunkWrite->TotalSize, + PCtx->PDiskLogPrefix << "pieceShift# " << piece->PieceShift << " pieceSize# " << piece->PieceSize << " evChunkWrite->TotalSize# " << evChunkWrite->TotalSize); ui64 desiredSectorIdx = 0; ui64 sectorOffset = 0; ui64 lastSectorIdx; - if (!ParseSectorOffset(Format, PCtx->ActorSystem, PCtx->PDiskId, evChunkWrite->Offset + evChunkWrite->BytesWritten, - evChunkWrite->TotalSize - evChunkWrite->BytesWritten, desiredSectorIdx, lastSectorIdx, sectorOffset, + if (!ParseSectorOffset(Format, PCtx->ActorSystem, PCtx->PDiskId, evChunkWrite->Offset + piece->PieceShift, + evChunkWrite->TotalSize - piece->PieceShift, desiredSectorIdx, lastSectorIdx, sectorOffset, PCtx->PDiskLogPrefix)) { - guard.Release(); ui32 chunkIdx = evChunkWrite->ChunkIdx; Y_VERIFY_S(chunkIdx != 0, PCtx->PDiskLogPrefix); @@ -929,29 +976,28 @@ bool TPDisk::ChunkWritePiece(TChunkWrite *evChunkWrite, ui32 pieceShift, ui32 pi (ui32)evChunkWrite->TotalSize, (ui32)chunkIdx, (ui32)evChunkWrite->Owner); P_LOG(PRI_ERROR, BPD01, err); SendChunkWriteError(*evChunkWrite, err, NKikimrProto::ERROR); - return true; + return; } if (evChunkWrite->ChunkEncrypted) { ui32 chunkIdx = evChunkWrite->ChunkIdx; Y_VERIFY_S(chunkIdx != 0, PCtx->PDiskLogPrefix); - TChunkState &state = ChunkState[chunkIdx]; - state.CurrentNonce = state.Nonce + (ui64)desiredSectorIdx; + const TChunkState &state = ChunkState[chunkIdx]; + auto currentNonce = state.Nonce + (ui64)desiredSectorIdx; ui32 dataChunkSizeSectors = Format.ChunkSize / Format.SectorSize; - TChunkWriter writer(Mon, *BlockDevice.Get(), Format, state.CurrentNonce, Format.ChunkKey, BufferPool.Get(), + auto writer = TChunkWriter(Mon, *BlockDevice.Get(), Format, currentNonce, Format.ChunkKey, BufferPool.Get(), desiredSectorIdx, dataChunkSizeSectors, Format.MagicDataChunk, chunkIdx, nullptr, desiredSectorIdx, - nullptr, PCtx, &DriveModel, Cfg->EnableSectorEncryption); + nullptr, PCtx, &DriveModel, Cfg->EnableSectorEncryption, piece->ShouldDetach); - guard.Release(); - bool end = ChunkWritePieceEncrypted(evChunkWrite, writer, pieceSize); - LWTRACK(PDiskChunkWriteLastPieceSendToDevice, evChunkWrite->Orbit, PCtx->PDiskId, evChunkWrite->Owner, chunkIdx, pieceShift, pieceSize); - return end; + piece->Span.Event("PDisk.ChunkWritePiece.EncryptionStart"); + ChunkWritePieceEncrypted(piece, writer, piece->PieceSize); + if (piece->ShouldDetach) { + piece->ChunkWriter = writer.ExportBufferedWriter(); + } } else { - guard.Release(); - ChunkWritePiecePlain(evChunkWrite); - return true; + ChunkWritePiecePlain(piece); } } @@ -969,6 +1015,9 @@ void TPDisk::SendChunkWriteError(TChunkWrite &chunkWrite, const TString &errorRe PCtx->ActorSystem->Send(chunkWrite.Sender, ev.release()); Mon.GetWriteCounter(chunkWrite.PriorityClass)->CountResponse(); chunkWrite.IsReplied = true; + if (chunkWrite.Completion) { + chunkWrite.Completion->IsReplied = true; + } } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -1391,7 +1440,6 @@ TVector TPDisk::AllocateChunkForOwner(const TRequestBase *req, const PCtx->PDiskLogPrefix << "chunkIdx# " << chunkIdx << " desired ownerId# " << req->Owner << " state# " << state.ToString()); state.Nonce = chunkNonce; - state.CurrentNonce = chunkNonce; P_LOG(PRI_INFO, BPD01, "chunk is allocated", (ChunkIdx, chunkIdx), (OldOwnerId, state.OwnerId), @@ -2127,7 +2175,6 @@ void TPDisk::ForceDeleteChunk(TChunkIdx chunkIdx) { state.OwnerId = OwnerUnallocated; state.CommitState = TChunkState::FREE; state.Nonce = 0; - state.CurrentNonce = 0; Keeper.PushFreeOwnerChunk(owner, chunkIdx); break; case TChunkState::DATA_COMMITTED_DELETE_IN_PROGRESS: @@ -2293,7 +2340,7 @@ void TPDisk::Slay(TSlay &evSlay) { TVDiskID vDiskId = evSlay.VDiskId; vDiskId.GroupGeneration = -1; auto it = VDiskOwners.find(vDiskId); - + for (auto& pendingInit : PendingYardInits) { if (vDiskId == pendingInit->VDiskIdWOGeneration()) { TStringStream str; @@ -2358,34 +2405,44 @@ void TPDisk::Slay(TSlay &evSlay) { void TPDisk::ProcessChunkWriteQueue() { auto start = HPNow(); - size_t initialSize = JointChunkWrites.size(); + size_t initialSize = JointChunkWrites.Size(); size_t processed = 0; size_t processedBytes = 0; double processedCostMs = 0; - while (JointChunkWrites.size()) { - TRequestBase *req = JointChunkWrites.front(); - JointChunkWrites.pop(); - req->Span.Event("PDisk.BeforeBlockDevice"); + while (!JointChunkWrites.Empty()) { + auto piece = JointChunkWrites.Dequeue(); + if (!piece->Processed) { + if (piece->ShouldDetach) { + EncryptionThreads.Schedule(piece); + continue; //not counts as processed; it is only scheduled + } + ChunkWritePiece(piece); + } - Y_VERIFY_S(req->GetType() == ERequestType::RequestChunkWritePiece, PCtx->PDiskLogPrefix - << "Unexpected request type# " << ui64(req->GetType()) - << " TypeName# " << TypeName(*req) << " in JointChunkWrites"); - TChunkWritePiece *piece = static_cast(req); processed++; processedBytes += piece->PieceSize; processedCostMs += piece->GetCostMs(); - P_LOG(PRI_DEBUG, BPD01, "ChunkWritePiece", - (ChunkIdx, piece->ChunkWrite->ChunkIdx), - (Offset, piece->PieceShift), - (Size, piece->PieceSize) - ); - bool lastPart = ChunkWritePiece(piece->ChunkWrite.Get(), piece->PieceShift, piece->PieceSize); - if (lastPart) { - Mon.IncrementQueueTime(piece->ChunkWrite->PriorityClass, piece->ChunkWrite->LifeDurationMs(HPNow())); + // For plain chunks, nothing happens here, and ChunkWriter is nullptr + // For no encryption threads, we do not hold ChunkWriter, so nothing happens also + if (piece->ChunkWriter) { + P_LOG(PRI_DEBUG, BPD11, "Performing TChunkWritePiece write to block device", + (ReqId, piece->ChunkWrite->ReqId), + (chunkIdx, piece->ChunkWrite->ChunkIdx), + (PieceShift, piece->PieceShift), + (PieceSize, piece->PieceSize), + ); + LWTRACK(PDiskChunkWritePieceSendToDevice, + piece->Orbit, PCtx->PDiskId, piece->ChunkWrite->Owner, piece->ChunkWrite->ChunkIdx, + piece->PieceShift, piece->PieceSize + ); + piece->ChunkWriter->WriteToBlockDevice(); } + + // One more flush is sent to BlockDevice from TSectorWriter destructor. delete piece; + // prevent the thread from being stuck for long if (UseNoopSchedulerCached && processed >= Cfg->SchedulerCfg.MaxChunkWritesPerCycle && HPMilliSecondsFloat(HPNow() - start) > Cfg->SchedulerCfg.MaxChunkWritesDurationPerCycleMs) { @@ -2804,6 +2861,8 @@ bool TPDisk::Initialize() { REGISTER_LOCAL_CONTROL(ForsetiOpPieceSizeRot); icb->RegisterSharedControl(UseNoopSchedulerHDD, "PDiskControls.UseNoopSchedulerHDD"); icb->RegisterSharedControl(UseNoopSchedulerSSD, "PDiskControls.UseNoopSchedulerSSD"); + icb->RegisterSharedControl(EncryptionThreadCountHDD, "PDiskControls.EncryptionThreadCountHDD"); + icb->RegisterSharedControl(EncryptionThreadCountSSD, "PDiskControls.EncryptionThreadCountSSD"); if (Cfg->SectorMap) { auto diskModeParams = Cfg->SectorMap->GetDiskModeParams(); @@ -2959,7 +3018,7 @@ NKikimrProto::EReplyStatus TPDisk::CheckOwnerAndRound(TRequestBase* req, TString if (!IsOwnerUser(req->Owner)) { if (req->Owner == OwnerUnallocated && req->OwnerRound == 0) { - return NKikimrProto::OK; + return NKikimrProto::OK; } err << " ownerId# " << req->Owner << " < Begin# " << (ui32)OwnerBeginUser << " or >= End# " << (ui32)OwnerEndUser << " Marker# BPD72"; @@ -3182,7 +3241,7 @@ bool TPDisk::PreprocessRequest(TRequestBase *request) { --state.OperationsInProgress; --inFlight->ChunkWrites; }; - ev.Completion = MakeHolder(ev.Sender, result.release(), &Mon, PCtx->PDiskId, + ev.Completion = new TCompletionChunkWrite(ev.Sender, result.release(), &Mon, PCtx->PDiskId, ev.CreationTime, ev.TotalSize, ev.PriorityClass, std::move(onDestroy), ev.ReqId, ev.Span.CreateChild(TWilson::PDiskBasic, "PDisk.CompletionChunkWrite")); ev.Completion->Parts = ev.PartsPtr; @@ -3361,19 +3420,44 @@ void TPDisk::PushRequestToScheduler(TRequestBase *request) { ? ui64(ForsetiOpPieceSizeCached) * Format.SectorPayloadSize() / Format.SectorSize : whole->TotalSize; const ui32 jobCount = (whole->TotalSize + jobSizeLimit - 1) / jobSizeLimit; - + whole->Completion->Pieces = jobCount; ui32 remainingSize = whole->TotalSize; + + TStackVec ChunkWritePiecesBuffer; + ui32 partIdx = 0; + ui32 partsSize = 0; + for (ui32 idx = 0; idx < jobCount; ++idx) { auto span = request->Span.CreateChild(TWilson::PDiskDetailed, "PDisk.ChunkWritePiece", NWilson::EFlags::AUTO_END); + bool isLast = idx == jobCount - 1; span.Attribute("small_job_idx", idx) - .Attribute("is_last_piece", idx == jobCount - 1); + .Attribute("is_last_piece", isLast); ui32 jobSize = Min(remainingSize, jobSizeLimit); - TChunkWritePiece *piece = new TChunkWritePiece(whole, idx * jobSizeLimit, jobSize, std::move(span)); - piece->GateId = whole->GateId; + auto piece = new TChunkWritePiece(this, whole, idx * jobSizeLimit, jobSize, isLast, std::move(span)); + for (; partIdx < whole->PartsPtr->Size(); ++partIdx) { + auto partSize = (*whole->PartsPtr)[partIdx].second; + if (piece->PieceShift < partsSize + partSize) { + piece->PartOffset = piece->PieceShift - partsSize; + break; + } + partsSize += partSize; + } + piece->PartIdx = partIdx; piece->EstimateCost(DriveModel); - AddJobToScheduler(piece, request->JobKind); remainingSize -= jobSize; + ChunkWritePiecesBuffer.emplace_back(std::move(piece)); } + + // to register all pieces first, then schedule + for (ui32 idx = 0; idx < jobCount; idx++) { + auto& piece = ChunkWritePiecesBuffer[idx]; + P_LOG(PRI_INFO, BPD01, "PDiskChunkWritePieceAddToScheduler", (idx, idx), (jobSizeLimit, jobSizeLimit), + (pieceShift, piece->PieceShift), (pieceSize, piece->PieceSize)); + LWTRACK(PDiskChunkWritePieceAddToScheduler, whole->Orbit, PCtx->PDiskId, idx, piece->PieceShift, + piece->PieceSize); + AddJobToScheduler(piece, request->JobKind); + } + ChunkWritePiecesBuffer.clear(); Y_VERIFY_S(remainingSize == 0, PCtx->PDiskLogPrefix << "remainingSize# " << remainingSize); } else if (request->GetType() == ERequestType::RequestChunkRead) { TIntrusivePtr read = std::move(static_cast(request)->SelfPointer); @@ -3508,7 +3592,19 @@ void TPDisk::RouteRequest(TRequestBase *request) { break; } case ERequestType::RequestChunkWritePiece: - JointChunkWrites.push(request); + { + + request->Span.Event("PDisk.BeforeBlockDevice"); + + Y_VERIFY_S(request->GetType() == ERequestType::RequestChunkWritePiece, PCtx->PDiskLogPrefix + << "Unexpected request type# " << ui64(request->GetType()) + << " TypeName# " << TypeName(*request) << " in ChunkEncoder"); + + TChunkWritePiece *piece = static_cast(request); + + JointChunkWrites.Enqueue(piece); + } + break; case ERequestType::RequestChunkTrim: { @@ -3776,9 +3872,10 @@ void TPDisk::Update() { ForsetiOpPieceSizeCached = PDiskCategory.IsSolidState() ? ForsetiOpPieceSizeSsd : ForsetiOpPieceSizeRot; ForsetiOpPieceSizeCached = Min(ForsetiOpPieceSizeCached, Cfg->BufferPoolBufferSizeBytes); ForsetiOpPieceSizeCached = AlignDown(ForsetiOpPieceSizeCached, Format.SectorSize); - auto prev = UseNoopSchedulerCached; UseNoopSchedulerCached = PDiskCategory.IsSolidState() ? UseNoopSchedulerSSD : UseNoopSchedulerHDD; + EncryptionThreadCountCached = PDiskCategory.IsSolidState() ? EncryptionThreadCountSSD : EncryptionThreadCountHDD; + EncryptionThreads.SetThreadCount(EncryptionThreadCountCached); // if we are going to start using noop scheduler then drain Forseti scheduler if (!prev && UseNoopSchedulerCached) { while (!ForsetiScheduler.IsEmpty()) { @@ -3803,7 +3900,7 @@ void TPDisk::Update() { } // Processing - bool isNonLogWorkloadPresent = !JointChunkWrites.empty() || !FastOperationsQueue.empty() || + bool isNonLogWorkloadPresent = !JointChunkWrites.Empty() || !FastOperationsQueue.empty() || !JointChunkReads.empty() || !JointLogReads.empty() || !JointChunkTrims.empty() || !JointChunkForgets.empty(); bool isLogWorkloadPresent = !JointLogWrites.empty(); bool isNothingToDo = true; @@ -3917,7 +4014,7 @@ void TPDisk::Update() { } auto entireUpdateMs = Mon.UpdateDurationTracker.UpdateEnded(); - LWTRACK(PDiskUpdateEnded, UpdateCycleOrbit, PCtx->PDiskId, entireUpdateMs ); + LWTRACK(PDiskUpdateEnded, UpdateCycleOrbit, PCtx->PDiskId, entireUpdateMs); UpdateCycleOrbit.Reset(); *Mon.PDiskThreadCPU = ThreadCPUTime(); } @@ -4302,7 +4399,7 @@ void TPDisk::ProgressShredState() { << " ShredGeneration# " << ShredGeneration << " ShredState# " << (ui32)ShredState); // Send/schedule a request to retry - THolder completion(new TCompletionEventSender(this, PCtx->PDiskActor, new NPDisk::TEvContinueShred())); + THolder completion(new TCompletionEventSender(this, PCtx->PDiskActor, new NPDisk::TEvContinueShred())); if (ReleaseUnusedLogChunks(completion.Get())) { ContinueShredsInFlight++; WriteSysLogRestorePoint(completion.Release(), TReqId(TReqId::ShredPDisk, 0), {}); @@ -4326,7 +4423,7 @@ void TPDisk::ProgressShredState() { } } // Looks good, but there still can be chunks that need to be shredded still int transition between states. - // For example, log chunks are removed from the log chunk list on log cut but added to free chunk list on log cut + // For example, log chunks are removed from the log chunk list on log cut but added to free chunk list on log cut // write operation completion. So, walk through the whole chunk list and check. for (ui32 chunkIdx = Format.SystemChunkCount; chunkIdx < ChunkState.size(); ++chunkIdx) { TChunkState &state = ChunkState[chunkIdx]; @@ -4341,7 +4438,7 @@ void TPDisk::ProgressShredState() { << ", there are already ContinueShredsInFlight# " << ContinueShredsInFlight.load() << " so just wait for it to arrive. " << " ShredGeneration# " << ShredGeneration); - return; + return; } else { LOG_DEBUG_S(*PCtx->ActorSystem, NKikimrServices::BS_PDISK_SHRED, "PDisk# " << PCtx->PDiskId diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.h b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.h index eec5d6831850..8deb906ab835 100644 --- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.h +++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.h @@ -3,13 +3,10 @@ #include "blobstorage_pdisk_blockdevice.h" #include -#include "blobstorage_pdisk_chunk_tracker.h" -#include "blobstorage_pdisk_crypto.h" -#include "blobstorage_pdisk_data.h" +#include "blobstorage_pdisk_chunk_write_queue.h" +#include "blobstorage_pdisk_encryption_threads.h" #include "blobstorage_pdisk_delayed_cost_loop.h" #include "blobstorage_pdisk_drivemodel.h" -#include "blobstorage_pdisk_free_chunks.h" -#include "blobstorage_pdisk_gate.h" #include "blobstorage_pdisk_keeper.h" #include "blobstorage_pdisk_req_creator.h" #include "blobstorage_pdisk_requestimpl.h" @@ -82,13 +79,14 @@ class TPDisk : public IPDisk { TVector JointLogReads; std::queue> JointChunkReads; - std::queue JointChunkWrites; + TChunkWritePieceQueue JointChunkWrites; std::queue JointLogWrites; TVector JointChunkTrims; TVector> JointChunkForgets; TVector> FastOperationsQueue; TDeque PausedQueue; std::set> PendingYardInits; + TEncryptionThreads EncryptionThreads; ui64 LastFlushId = 0; bool IsQueuePaused = false; bool IsQueueStep = false; @@ -110,7 +108,10 @@ class TPDisk : public IPDisk { TControlWrapper ForsetiOpPieceSizeRot; TControlWrapper UseNoopSchedulerSSD; TControlWrapper UseNoopSchedulerHDD; + TControlWrapper EncryptionThreadCountSSD; + TControlWrapper EncryptionThreadCountHDD; bool UseNoopSchedulerCached = false; + size_t EncryptionThreadCountCached; // SectorMap Controls TControlWrapper SectorMapFirstSectorReadRate; @@ -313,9 +314,10 @@ class TPDisk : public IPDisk { bool parseCommitMessage); //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // Chunk writing - bool ChunkWritePiece(TChunkWrite *evChunkWrite, ui32 pieceShift, ui32 pieceSize); - void ChunkWritePiecePlain(TChunkWrite *evChunkWrite); - bool ChunkWritePieceEncrypted(TChunkWrite *evChunkWrite, TChunkWriter &writer, ui32 bytesAvailable); + void PushChunkWrite(TChunkWritePiece *piece); + void ChunkWritePiece(TChunkWritePiece *piece); + void ChunkWritePiecePlain(TChunkWritePiece *piece); + void ChunkWritePieceEncrypted(TChunkWritePiece *piece, TChunkWriter &writer, ui32 bytesAvailable); void SendChunkWriteError(TChunkWrite &evChunkWrite, const TString &errorReason, NKikimrProto::EReplyStatus status); //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // Chunk reading diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_requestimpl.cpp b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_requestimpl.cpp index caaa5568c76d..d2ea14aa7d49 100644 --- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_requestimpl.cpp +++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_requestimpl.cpp @@ -1,5 +1,6 @@ #include "blobstorage_pdisk_requestimpl.h" #include "blobstorage_pdisk_completion_impl.h" +#include "blobstorage_pdisk_impl.h" namespace NKikimr { namespace NPDisk { @@ -64,6 +65,11 @@ TChunkWrite::TChunkWrite(const NPDisk::TEvChunkWrite &ev, const TActorId &sender SlackSize = Max(); } +void TChunkWrite::Abort(TActorSystem* actorSystem) { + if (!AtomicSwap(&Aborted, true)) { + actorSystem->Send(Sender, new NPDisk::TEvChunkWriteResult(NKikimrProto::CORRUPTED, ChunkIdx, Cookie, 0, "TChunkWrite is being aborted")); + } +} //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // TChunkRead @@ -87,6 +93,50 @@ void TChunkRead::Abort(TActorSystem* actorSystem) { } +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +// TChunkWritePiece +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +TChunkWritePiece::TChunkWritePiece(TPDisk *pdisk, TIntrusivePtr &write, ui32 pieceShift, ui32 pieceSize, bool isLast, NWilson::TSpan span) + : TRequestBase(write->Sender, write->ReqId, write->Owner, write->OwnerRound, write->PriorityClass, std::move(span)) + , PDisk(pdisk) + , ChunkWrite(write) + , PieceShift(pieceShift) + , PieceSize(pieceSize) + , ShouldDetach(ChunkWrite->ChunkEncrypted && pdisk->EncryptionThreadCountCached) +{ + + /* + * If encryption threads are used, each TChunkWritePiece has its own completion. + * If no encryption threads are used, last TChunkWritePiece completion tracks whole write completion. + */ + if (ShouldDetach) { + Completion = MakeHolder(this, ChunkWrite->Completion); + } else if (isLast) { + Completion = THolder(ChunkWrite->Completion); + } + + ChunkWrite->RegisterPiece(); + GateId = ChunkWrite->GateId; +} + +TChunkWritePiece::~TChunkWritePiece() { +} + +void TChunkWritePiece::Process() { + PDisk->ChunkWritePiece(this); + PDisk->PushChunkWrite(this); +} + +void TChunkWritePiece::MarkReady(const TString& logPrefix) { + Processed = true; + auto evChunkWrite = ChunkWrite.Get(); + ui8 old = evChunkWrite->ReadyForBlockDevice.fetch_add(1, std::memory_order::seq_cst); + if (old + 1 == evChunkWrite->Pieces) { + Y_VERIFY_S(evChunkWrite->RemainingSize == 0, logPrefix); + Y_VERIFY_S(evChunkWrite->Completion, logPrefix); + evChunkWrite->Completion->Orbit = std::move(evChunkWrite->Orbit); + } +} //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // TChunkReadPiece //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_requestimpl.h b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_requestimpl.h index fe028d0fc9be..184b6cea3451 100644 --- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_requestimpl.h +++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_requestimpl.h @@ -478,11 +478,12 @@ class TChunkReadPiece : public TRequestBase { } }; - -class TCompletionChunkWrite; // // TChunkWrite // +class TCompletionChunkWrite; +class TCompletionChunkWritePiece; + class TChunkWrite : public TRequestBase { public: ui32 ChunkIdx; @@ -495,17 +496,16 @@ class TChunkWrite : public TRequestBase { bool ChunkEncrypted = true; ui32 TotalSize; - ui32 CurrentPart = 0; - ui32 CurrentPartOffset = 0; - ui32 RemainingSize = 0; + std::atomic RemainingSize = 0; ui32 SlackSize; - ui32 BytesWritten = 0; + std::atomic BytesWritten = 0; TAtomic Pieces = 0; TAtomic Aborted = 0; + std::atomic ReadyForBlockDevice = 0; - THolder Completion; + TCompletionChunkWrite* Completion = nullptr; TChunkWrite(const NPDisk::TEvChunkWrite &ev, const TActorId &sender, TReqId reqId, NWilson::TSpan span); @@ -545,30 +545,31 @@ class TChunkWrite : public TRequestBase { } } - void Abort(TActorSystem* actorSystem) override { - if (!AtomicSwap(&Aborted, true)) { - actorSystem->Send(Sender, new NPDisk::TEvChunkWriteResult(NKikimrProto::CORRUPTED, ChunkIdx, Cookie, 0, "TChunkWrite is being aborted")); - } - } + void Abort(TActorSystem* actorSystem) override; }; // // TChunkWritePiece // + +class TBufferedWriter; + class TChunkWritePiece : public TRequestBase { public: + TPDisk *PDisk; + TIntrusivePtr ChunkWrite; ui32 PieceShift; ui32 PieceSize; + ui32 PartIdx; + ui32 PartOffset = 0; + THolder ChunkWriter; + THolder Completion; + bool Processed = false; + bool ShouldDetach; - TChunkWritePiece(TIntrusivePtr &write, ui32 pieceShift, ui32 pieceSize, NWilson::TSpan span) - : TRequestBase(write->Sender, write->ReqId, write->Owner, write->OwnerRound, write->PriorityClass, std::move(span)) - , ChunkWrite(write) - , PieceShift(pieceShift) - , PieceSize(pieceSize) - { - ChunkWrite->RegisterPiece(); - } + TChunkWritePiece(TPDisk *pdisk, TIntrusivePtr &write, ui32 pieceShift, ui32 pieceSize, bool isLast, NWilson::TSpan span); + ~TChunkWritePiece(); ERequestType GetType() const override { return ERequestType::RequestChunkWritePiece; @@ -584,6 +585,9 @@ class TChunkWritePiece : public TRequestBase { ChunkWrite->AbortPiece(actorSystem); } } + + void Process(); + void MarkReady(const TString& logPrefix); }; // diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_state.h b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_state.h index ee5bbae6eb84..fbac060cd414 100644 --- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_state.h +++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_state.h @@ -274,7 +274,6 @@ struct TChunkState { }; ui64 Nonce; - ui64 CurrentNonce; ui64 PreviousNonce; std::atomic OperationsInProgress; TOwner OwnerId; @@ -284,7 +283,6 @@ struct TChunkState { ui64 ShredGeneration; TChunkState() : Nonce(0) - , CurrentNonce(0) , PreviousNonce(0) , OperationsInProgress(0) , OwnerId(OwnerUnallocated) @@ -306,7 +304,6 @@ struct TChunkState { TStringStream str; str << "{ "; OUT_VAR(Nonce); - OUT_VAR(CurrentNonce); OUT_VAR(PreviousNonce); OUT_VAR(OperationsInProgress.load()); OUT_VAR(OwnerId); diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_thread.h b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_thread.h index 54387e931468..eac00f30bb79 100644 --- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_thread.h +++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_thread.h @@ -55,4 +55,3 @@ class TPDiskThread : public TThread { } // NPDisk } // NKikimr - diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut.cpp b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut.cpp index 51d2ba494a3f..69eb3a904254 100644 --- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut.cpp +++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut.cpp @@ -1,5 +1,6 @@ #include "blobstorage_pdisk_ut.h" +#include "blobstorage_pdisk.h" #include "blobstorage_pdisk_abstract.h" #include "blobstorage_pdisk_impl.h" #include "blobstorage_pdisk_ut_env.h" @@ -551,39 +552,47 @@ Y_UNIT_TEST_SUITE(TPDiskTest) { } Y_UNIT_TEST(TestFakeErrorPDiskManyChunkWrite) { - TActorTestContext testCtx{{}}; - testCtx.TestCtx.SectorMap->IoErrorEveryNthRequests = 1000; - - const TVDiskID vDiskID(0, 1, 0, 0, 0); - const auto evInitRes = testCtx.TestResponse( - new NPDisk::TEvYardInit(2, vDiskID, testCtx.TestCtx.PDiskGuid), - NKikimrProto::OK); + for (int EncryptionThreadCount : {0, 1, 3}) { + TActorTestContext testCtx{{}}; + testCtx.TestCtx.SectorMap->IoErrorEveryNthRequests = 1000; - const auto evReserveRes = testCtx.TestResponse( - new NPDisk::TEvChunkReserve(evInitRes->PDiskParams->Owner, evInitRes->PDiskParams->OwnerRound, 1), - NKikimrProto::OK); - UNIT_ASSERT(evReserveRes->ChunkIds.size() == 1); - const ui32 reservedChunk = evReserveRes->ChunkIds.front(); + Cerr << "EncryptionThreadCount# " << EncryptionThreadCount << Endl; + testCtx.SafeRunOnPDisk([=] (NPDisk::TPDisk* pdisk) { + pdisk->EncryptionThreadCountHDD = EncryptionThreadCount; + pdisk->EncryptionThreadCountSSD = EncryptionThreadCount; + }); - ui32 errors = 0; - bool printed = false; - for (ui32 i = 0; i < 10'000; ++i) { - testCtx.Send(new NPDisk::TEvChunkWrite(evInitRes->PDiskParams->Owner, evInitRes->PDiskParams->OwnerRound, - reservedChunk, 0, new NPDisk::TEvChunkWrite::TAlignedParts(PrepareData(1024)), nullptr, false, 0)); + const TVDiskID vDiskID(0, 1, 0, 0, 0); + const auto evInitRes = testCtx.TestResponse( + new NPDisk::TEvYardInit(2, vDiskID, testCtx.TestCtx.PDiskGuid), + NKikimrProto::OK); - const auto res = testCtx.Recv(); - //Ctest << res->ToString() << Endl; - if (res->Status != NKikimrProto::OK) { - ++errors; - if (!printed) { - printed = true; - Ctest << res->ToString() << Endl; + const auto evReserveRes = testCtx.TestResponse( + new NPDisk::TEvChunkReserve(evInitRes->PDiskParams->Owner, evInitRes->PDiskParams->OwnerRound, 1), + NKikimrProto::OK); + UNIT_ASSERT(evReserveRes->ChunkIds.size() == 1); + const ui32 reservedChunk = evReserveRes->ChunkIds.front(); + + ui32 errors = 0; + bool printed = false; + for (ui32 i = 0; i < 10'000; ++i) { + testCtx.Send(new NPDisk::TEvChunkWrite(evInitRes->PDiskParams->Owner, evInitRes->PDiskParams->OwnerRound, + reservedChunk, 0, new NPDisk::TEvChunkWrite::TAlignedParts(PrepareData(1024)), nullptr, false, 0)); + + const auto res = testCtx.Recv(); + //Ctest << res->ToString() << Endl; + if (res->Status != NKikimrProto::OK) { + ++errors; + if (!printed) { + printed = true; + Ctest << res->ToString() << Endl; + } + } else { + UNIT_ASSERT(errors == 0); } - } else { - UNIT_ASSERT(errors == 0); } + UNIT_ASSERT(errors > 0); } - UNIT_ASSERT(errors > 0); } Y_UNIT_TEST(TestSIGSEGVInTUndelivered) { @@ -1165,12 +1174,15 @@ Y_UNIT_TEST_SUITE(TPDiskTest) { ui32 logBuffSize = 250; ui32 chunkBuffSize = 128_KB; - for (ui32 testCase = 0; testCase < 8; testCase++) { + for (ui32 testCase = 0; testCase < 16; testCase++) { Cerr << "restart# " << bool(testCase & 4) << " start with noop scheduler# " << bool(testCase & 1) - << " end with noop scheduler# " << bool(testCase & 2) << Endl; + << " end with noop scheduler# " << bool(testCase & 2) << " multiple encryption threads# " + << bool(testCase & 8)<< Endl; testCtx.SafeRunOnPDisk([=] (NPDisk::TPDisk* pdisk) { pdisk->UseNoopSchedulerHDD = testCase & 1; pdisk->UseNoopSchedulerSSD = testCase & 1; + pdisk->EncryptionThreadCountHDD = (testCase & 8 > 0) ? 3 : 0; + pdisk->EncryptionThreadCountSSD = (testCase & 8 > 0) ? 3 : 0; }); vdisk.InitFull(); @@ -1291,6 +1303,69 @@ Y_UNIT_TEST_SUITE(TPDiskTest) { } } + Y_UNIT_TEST(TestUnalignedChunkWriteParts) { + size_t offset = 17954752; + size_t size = 772160; + size_t partsCount = 4; + TVector partSize = {764032, 16, 676, 7436}; + TReallyFastRng32 rng(12345); + + auto counter = MakeIntrusive<::NMonitoring::TCounterForPtr>(); + TMemoryConsumer consumer(counter); + TRope rope; + size_t createdBytes = 0; + if (size >= partsCount) { + for (size_t i = 0; i < partsCount; ++i) { + TRope x(PrepareData(partSize[i])); + createdBytes += x.size(); + rope.Insert(rope.End(), std::move(x)); + } + } + UNIT_ASSERT(createdBytes == size); + auto parts = MakeIntrusive(std::move(rope), size); + + TActorTestContext testCtx({ false }); + TVDiskMock vdisk(&testCtx); + + + testCtx.SafeRunOnPDisk([=] (NPDisk::TPDisk* pdisk) { + pdisk->EncryptionThreadCountHDD = 3; + pdisk->EncryptionThreadCountSSD = 3; + }); + + vdisk.InitFull(); + vdisk.ReserveChunk(); + auto chunk = *vdisk.Chunks[EChunkState::RESERVED].begin(); + vdisk.CommitReservedChunks(); + + testCtx.TestResponse( + new NPDisk::TEvChunkWrite(vdisk.PDiskParams->Owner, vdisk.PDiskParams->OwnerRound, + chunk, offset, parts, nullptr, false, 0), + NKikimrProto::OK); + auto read = testCtx.TestResponse( + new NPDisk::TEvChunkRead(vdisk.PDiskParams->Owner, vdisk.PDiskParams->OwnerRound, + chunk, offset, size, 0, 0), + NKikimrProto::OK); + + UNIT_ASSERT(read->Data.IsReadable()); + UNIT_ASSERT_EQUAL(ConvertIPartsToString(parts.Get()), read->Data.ToString()); + size_t partialOffset = 18718784; + size_t partialSize = 8128; + auto partialRead = testCtx.TestResponse( + new NPDisk::TEvChunkRead(vdisk.PDiskParams->Owner, vdisk.PDiskParams->OwnerRound, + chunk, partialOffset, partialSize, 0, 0), + NKikimrProto::OK); + UNIT_ASSERT(partialRead->Data.IsReadable()); + UNIT_ASSERT_VALUES_EQUAL(partialRead->Data.ToString().size(), 8128); + offset = 0; + for (size_t i = 1; i < partsCount; i++) { + auto expected = (*parts.Get())[i]; + int diff = memcmp(expected.first, const_cast(&partialRead->Data)->RawDataPtr(offset, expected.second), expected.second); + UNIT_ASSERT(!diff); + offset += expected.second; + } + } + Y_UNIT_TEST(PlainChunksWriteReadALot) { TActorTestContext testCtx{{ .PlainDataChunks = true, @@ -1343,6 +1418,7 @@ Y_UNIT_TEST_SUITE(TPDiskTest) { Cerr << " total_speed# " << 2 * written / duration.SecondsFloat() / 1e9 << " GB/s" << Endl; } + Y_UNIT_TEST(ChunkWriteBadOffset) { TActorTestContext testCtx{{}}; @@ -1384,11 +1460,13 @@ Y_UNIT_TEST_SUITE(TPDiskTest) { ui32 logBuffSize = 250; ui32 chunkBuffSize = 128_KB; - for (ui32 testCase = 0; testCase < 4; testCase++) { + for (ui32 testCase = 0; testCase < 8; testCase++) { Cerr << "testCase# " << testCase << Endl; auto cfg = testCtx.GetPDiskConfig(); cfg->PlainDataChunks = testCase & 1; + cfg->EncryptionThreadCount = testCase & 2; Cerr << "plainDataChunk# " << cfg->PlainDataChunks << Endl; + Cerr << "EncryptionThreadCount# " << cfg->EncryptionThreadCount << Endl; testCtx.UpdateConfigRecreatePDisk(cfg); vdisk.InitFull(); diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_writer.cpp b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_writer.cpp index 41cbd2ce3503..5aaa0237263c 100644 --- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_writer.cpp +++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_writer.cpp @@ -6,11 +6,60 @@ namespace NKikimr { namespace NPDisk { + //////////////////////////////////////////////////////////////////////////// -// BufferedWriter +// TBlockDeviceWrite //////////////////////////////////////////////////////////////////////////// -void TBufferedWriter::WriteBufferWithFlush(TReqId reqId, NWilson::TTraceId *traceId, +TBufferedWriter::TBlockDeviceWrite::TBlockDeviceWrite(const TReqId& ReqId, TBuffer::TPtr &&buffer, ui64 StartOffset, ui64 DirtyFrom, ui64 DirtyTo, NWilson::TTraceId *TraceId, TActorSystem* ActorSystem) + : TBlockDeviceAction(ReqId), Deleter(ActorSystem), Buffer(std::unique_ptr(buffer.Release(), Deleter)), StartOffset(StartOffset), DirtyFrom(DirtyFrom), DirtyTo(DirtyTo), TraceId(*TraceId) +{ +} + +void TBufferedWriter::TBlockDeviceWrite::DoCall(IBlockDevice &BlockDevice) { + ui8 *source = Buffer->Data() + DirtyFrom - StartOffset; + ui32 sizeToWrite = (ui32)(DirtyTo - DirtyFrom); + BlockDevice.PwriteAsync(source, sizeToWrite, DirtyFrom, Buffer.release(), ReqId, &TraceId); +} + +TBufferedWriter::TBlockDeviceWrite::TReleaseWriteAction::TReleaseWriteAction(TActorSystem *ActorSystem) : ActorSystem(ActorSystem) {} + +void TBufferedWriter::TBlockDeviceWrite::TReleaseWriteAction::operator()(TBuffer *buffer) const { + if (buffer->FlushAction) { + //frees buffer->FlushAction + buffer->FlushAction->Release(ActorSystem); + } + + buffer->ReturnToPool(); +} + + +//////////////////////////////////////////////////////////////////////////// +// TBlockDeviceFlush +//////////////////////////////////////////////////////////////////////////// +TBufferedWriter::TBlockDeviceFlush::TBlockDeviceFlush(const TReqId& ReqId, TCompletionAction* completion, TActorSystem* actorSystem) + : TBlockDeviceAction(ReqId), Deleter(actorSystem), Completion(std::unique_ptr(completion, Deleter)) +{ +} + +void TBufferedWriter::TBlockDeviceFlush::DoCall(IBlockDevice &BlockDevice) { + BlockDevice.FlushAsync(Completion.release(), ReqId); +} + +TBufferedWriter::TBlockDeviceFlush::TReleaseFlushAction::TReleaseFlushAction(TActorSystem *ActorSystem) : ActorSystem(ActorSystem) {} + +void TBufferedWriter::TBlockDeviceFlush::TReleaseFlushAction::operator()(TCompletionAction *action) const { + if (action->FlushAction) { + action->FlushAction->Release(ActorSystem); + } + //frees action->FlushAction + action->Release(ActorSystem); +} + +//////////////////////////////////////////////////////////////////////////// +// BufferedWriter +//////////////////////////////////////////////////////////////////////////// +void TBufferedWriter::WriteToBuffer(TReqId reqId, NWilson::TTraceId *traceId, TCompletionAction *flushAction, ui32 chunkIdx) { static NWilson::TTraceId noTrace; if (DirtyFrom != DirtyTo) { @@ -20,19 +69,28 @@ void TBufferedWriter::WriteBufferWithFlush(TReqId reqId, NWilson::TTraceId *trac CurrentBuffer->FlushAction = flushAction; CurrentBuffer->CostNs = DriveModel->TimeForSizeNs(sizeToWrite, chunkIdx, TDriveModel::OP_TYPE_WRITE); Y_VERIFY_DEBUG_S(sizeToWrite <= CurrentBuffer->Size(), PCtx->PDiskLogPrefix); - BlockDevice.PwriteAsync(source, sizeToWrite, DirtyFrom, CurrentBuffer.Release(), reqId, traceId); + if (WithDelayedFlush) { + BlockDeviceActions.push(MakeHolder(reqId, std::move(CurrentBuffer), StartOffset, DirtyFrom, DirtyTo, traceId, PCtx->ActorSystem)); + } else { + BlockDevice.PwriteAsync(source, sizeToWrite, DirtyFrom, CurrentBuffer.Release(), reqId, traceId); + } CurrentBuffer = TBuffer::TPtr(Pool->Pop()); CurrentSector = CurrentBuffer->Data(); + StartOffset = DirtyTo; DirtyFrom = DirtyTo; } else if (flushAction) { flushAction->CostNs = 1; - BlockDevice.FlushAsync(flushAction, reqId); + if (WithDelayedFlush) { + BlockDeviceActions.push(MakeHolder(reqId, flushAction, PCtx->ActorSystem)); + } else { + BlockDevice.FlushAsync(flushAction, reqId); + } } } TBufferedWriter::TBufferedWriter(ui64 sectorSize, IBlockDevice &blockDevice, TDiskFormat &format, TBufferPool *pool, - TActorSystem *actorSystem, TDriveModel *driveModel, std::shared_ptr pCtx) + TActorSystem *actorSystem, TDriveModel *driveModel, std::shared_ptr pCtx, bool withDelayedFlush) : SectorSize(sectorSize) , BlockDevice(blockDevice) , Format(format) @@ -47,10 +105,12 @@ TBufferedWriter::TBufferedWriter(ui64 sectorSize, IBlockDevice &blockDevice, TDi , LastReqId(TReqId::InitialTSectorWriterReqId, 0) , DriveModel(driveModel) , PCtx(std::move(pCtx)) + , WithDelayedFlush(withDelayedFlush) { } void TBufferedWriter::SetupWithBuffer(ui64 startOffset, ui64 currentOffset, TBuffer *buffer, ui32 count, TReqId reqId) { + Y_VERIFY_S(!WithDelayedFlush, "SetupWithBuffer should not have DelayedFlush"); CurrentBuffer.Reset(buffer); CurrentSector = CurrentBuffer->Data() + (currentOffset - startOffset); @@ -68,7 +128,7 @@ ui8* TBufferedWriter::Seek(ui64 offset, ui32 count, ui32 reserve, TReqId reqId, Y_VERIFY_S(count > 0, PCtx->PDiskLogPrefix); Y_VERIFY_S(count <= 16, PCtx->PDiskLogPrefix); if (NextOffset != offset || NextOffset + SectorSize * reserve - StartOffset > CurrentBuffer->Size()) { - WriteBufferWithFlush(LastReqId, traceId, nullptr, chunkIdx); + WriteToBuffer(LastReqId, traceId, nullptr, chunkIdx); StartOffset = offset; DirtyFrom = offset; DirtyTo = offset; @@ -90,7 +150,7 @@ ui8* TBufferedWriter::RawData() const { void TBufferedWriter::Flush(TReqId reqId, NWilson::TTraceId *traceId, TCompletionAction *flushAction, ui32 chunkIdx) { - WriteBufferWithFlush(reqId, traceId, flushAction, chunkIdx); + WriteToBuffer(reqId, traceId, flushAction, chunkIdx); } void TBufferedWriter::MarkDirty() { @@ -101,7 +161,17 @@ void TBufferedWriter::Obliterate() { DirtyTo = DirtyFrom; } +void TBufferedWriter::WriteToBlockDevice() { + Y_VERIFY(WithDelayedFlush, "WriteToBlockDevice should be called only for buffers WithDelayedFlush"); + while (!BlockDeviceActions.empty()) { + const auto& action = BlockDeviceActions.front(); + action->DoCall(BlockDevice); + BlockDeviceActions.pop(); + } +} + TBufferedWriter::~TBufferedWriter() { + BlockDeviceActions = {}; Flush(LastReqId, nullptr, nullptr, 0); } diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_writer.h b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_writer.h index a635914f5849..7cfc9b736779 100644 --- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_writer.h +++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_writer.h @@ -10,9 +10,12 @@ #include "blobstorage_pdisk_request_id.h" #include +#include #include +#include + namespace NKikimr { namespace NPDisk { @@ -24,6 +27,48 @@ struct TBuffer; // BufferedWriter //////////////////////////////////////////////////////////////////////////// class TBufferedWriter { +private: + class TBlockDeviceAction { + protected: + TReqId ReqId; + public: + TBlockDeviceAction(const TReqId& ReqId) : ReqId(ReqId) {} + virtual void DoCall(IBlockDevice &BlockDevice) = 0; + virtual ~TBlockDeviceAction() = default; + }; + + class TBlockDeviceWrite : public TBlockDeviceAction { + public: + class TReleaseWriteAction { + public: + TActorSystem* ActorSystem; + TReleaseWriteAction() = delete; + TReleaseWriteAction(TActorSystem* actorSystem); + void operator()(TBuffer *buffer) const; + }; + TReleaseWriteAction Deleter; + std::unique_ptr Buffer; + ui64 StartOffset; + ui64 DirtyFrom; + ui64 DirtyTo; + NWilson::TTraceId TraceId; + TBlockDeviceWrite(const TReqId& ReqId, TBuffer::TPtr&& buffer, ui64 StartOffset, ui64 DirtyFrom, ui64 DirtyTo, NWilson::TTraceId *TraceId, TActorSystem* actorSystem); + virtual void DoCall(IBlockDevice &BlockDevice) override; + }; + class TBlockDeviceFlush : public TBlockDeviceAction { + public: + class TReleaseFlushAction { + public: + TActorSystem* ActorSystem; + TReleaseFlushAction() = delete; + TReleaseFlushAction(TActorSystem* actorSystem); + void operator()(TCompletionAction *action) const; + }; + TReleaseFlushAction Deleter; + std::unique_ptr Completion; + TBlockDeviceFlush(const TReqId& ReqId, TCompletionAction* Completion, TActorSystem* actorSystem); + virtual void DoCall(IBlockDevice &BlockDevice) override; + }; protected: ui64 SectorSize; IBlockDevice &BlockDevice; @@ -45,11 +90,14 @@ class TBufferedWriter { std::shared_ptr PCtx; - void WriteBufferWithFlush(TReqId reqId, NWilson::TTraceId *traceId, + bool WithDelayedFlush; + std::queue> BlockDeviceActions; + + void WriteToBuffer(TReqId reqId, NWilson::TTraceId *traceId, TCompletionAction *flushAction, ui32 chunkIdx); public: TBufferedWriter(ui64 sectorSize, IBlockDevice &blockDevice, TDiskFormat &format, TBufferPool *pool, - TActorSystem *actorSystem, TDriveModel *driveModel, std::shared_ptr pCtx); + TActorSystem *actorSystem, TDriveModel *driveModel, std::shared_ptr pCtx, bool withDelayedFlush); void SetupWithBuffer(ui64 startOffset, ui64 currentOffset, TBuffer *buffer, ui32 count, TReqId reqId); ui8* Seek(ui64 offset, ui32 count, ui32 reserve, TReqId reqId, NWilson::TTraceId *traceId, ui32 chunkIdx); ui8* Get() const; @@ -58,6 +106,7 @@ class TBufferedWriter { ui32 chunkIdx); void MarkDirty(); void Obliterate(); + void WriteToBlockDevice(); ~TBufferedWriter(); }; @@ -112,7 +161,7 @@ class TSectorWriter { TSectorWriter(TPDiskMon &mon, IBlockDevice &blockDevice, TDiskFormat &format, ui64 &nonce, const TKey &key, TBufferPool *pool, ui64 firstSectorIdx, ui64 endSectorIdx, ui64 dataMagic, ui32 chunkIdx, TLogChunkInfo *logChunkInfo, ui64 sectorIdx, TBuffer *buffer, std::shared_ptr pCtx, - TDriveModel *driveModel, bool enableEncrytion) + TDriveModel *driveModel, bool enableEncrytion, bool withDelayedFlush = false) : Mon(mon) , BlockDevice(blockDevice) , Format(format) @@ -134,7 +183,7 @@ class TSectorWriter { { Y_VERIFY_S(!LogChunkInfo || LogChunkInfo->ChunkIdx == ChunkIdx, PCtx->PDiskLogPrefix); BufferedWriter.Reset(new TBufferedWriter(Format.SectorSize, BlockDevice, Format, pool, - PCtx->ActorSystem, DriveModel, PCtx)); + PCtx->ActorSystem, DriveModel, PCtx, withDelayedFlush)); Cypher.SetKey(key); Cypher.StartMessage(Nonce); @@ -428,7 +477,7 @@ class TSectorWriter { } else { *Mon.BandwidthPLogRecordHeader += sizeof(TLogPageHeader); } - P_LOG(PRI_DEBUG, BPD61, SelfInfo() << " LogPageHeader", + P_LOG(PRI_DEBUG, BPD61, SelfInfo() << " LogPageHeader", (ChunkIdx, ChunkIdx), (SectorIdx, SectorIdx), (Nonce, Nonce)); Write(&header, sizeof(TLogPageHeader), reqId, traceId); } @@ -466,6 +515,14 @@ class TSectorWriter { } } + void WriteToBlockDevice() { + BufferedWriter->WriteToBlockDevice(); + } + + THolder ExportBufferedWriter() { + return std::move(BufferedWriter); + } + protected: void FinalizeWrite(ui64 size, TReqId reqId, NWilson::TTraceId *traceId) { CurrentPosition += size; diff --git a/ydb/core/blobstorage/pdisk/ut/ya.make b/ydb/core/blobstorage/pdisk/ut/ya.make index 0490d8f73b34..b7cf4a611fb4 100644 --- a/ydb/core/blobstorage/pdisk/ut/ya.make +++ b/ydb/core/blobstorage/pdisk/ut/ya.make @@ -21,7 +21,7 @@ PEERDIR( ydb/core/testlib/actors ) -IF (YDB_ENABLE_PDISK_SHRED) +IF (YDB_ENABLE_PDISK_SHRED) CFLAGS( -DENABLE_PDISK_SHRED ) @@ -45,12 +45,8 @@ SRCS( mock/pdisk_mock.cpp ) -IF (BUILD_TYPE != "DEBUG") - SRCS( - blobstorage_pdisk_ut_yard.cpp - ) -ELSE () - MESSAGE(WARNING "It takes too much time to run test in DEBUG mode, some tests are skipped") -ENDIF () +SRCS( + blobstorage_pdisk_ut_yard.cpp +) END() diff --git a/ydb/core/blobstorage/pdisk/ya.make b/ydb/core/blobstorage/pdisk/ya.make index e9bf4478f0cb..3b668faf2ad7 100644 --- a/ydb/core/blobstorage/pdisk/ya.make +++ b/ydb/core/blobstorage/pdisk/ya.make @@ -1,11 +1,11 @@ LIBRARY() -IF (YDB_ENABLE_PDISK_SHRED) +IF (YDB_ENABLE_PDISK_SHRED) CFLAGS( -DENABLE_PDISK_SHRED ) ENDIF() -IF (YDB_DISABLE_PDISK_ENCRYPTION) +IF (YDB_DISABLE_PDISK_ENCRYPTION) CFLAGS( -DDISABLE_PDISK_ENCRYPTION ) @@ -51,6 +51,7 @@ SRCS( blobstorage_pdisk_delayed_cost_loop.cpp blobstorage_pdisk_driveestimator.cpp blobstorage_pdisk_drivemodel_db.cpp + blobstorage_pdisk_encryption_threads.cpp blobstorage_pdisk_impl.cpp blobstorage_pdisk_impl_http.cpp blobstorage_pdisk_impl_log.cpp diff --git a/ydb/core/protos/blobstorage_pdisk_config.proto b/ydb/core/protos/blobstorage_pdisk_config.proto index 0488d1488e85..75ee5fcc0a9b 100644 --- a/ydb/core/protos/blobstorage_pdisk_config.proto +++ b/ydb/core/protos/blobstorage_pdisk_config.proto @@ -97,4 +97,5 @@ message TPDiskConfig { optional bool PlainDataChunks = 2005; reserved 2006; // optional uint32 SlotSizeInUnits = 2006; optional bool SeparateHugePriorities = 2007; + optional uint32 EncryptionThreadCount = 2008; }; diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto index 91bfa39e1788..1218b93b9982 100644 --- a/ydb/core/protos/config.proto +++ b/ydb/core/protos/config.proto @@ -1697,6 +1697,16 @@ message TImmediateControlsConfig { MinValue: 0, MaxValue: 1, DefaultValue: 0 }]; + optional uint64 EncryptionThreadCountHDD = 7 [(ControlOptions) = { + Description: "Enables separate chunkwrites encryption threads for HDD disks only", + MinValue: 0, + MaxValue: 16, + DefaultValue: 0 }]; + optional uint64 EncryptionThreadCountSSD = 8 [(ControlOptions) = { + Description: "Enables separate chunkwrites encryption threads for SSD, NVME", + MinValue: 0, + MaxValue: 16, + DefaultValue: 0 }]; } diff --git a/ydb/library/actors/core/out.txt b/ydb/library/actors/core/out.txt new file mode 100644 index 000000000000..9ee56a3b2aed --- /dev/null +++ b/ydb/library/actors/core/out.txt @@ -0,0 +1,184 @@ +diff --git a/ydb/library/actors/core/process_stats.cpp b/ydb/library/actors/core/process_stats.cpp +index f9028537c5f..5c5ca28fe66 100644 +--- a/ydb/library/actors/core/process_stats.cpp ++++ b/ydb/library/actors/core/process_stats.cpp +@@ -40,13 +40,18 @@ namespace NActors { + return 1.f; + #endif + } ++ ++ void ConvertFromKb(ui64& value) { ++ value *= 1024; ++ } + } + + bool TProcStat::Fill(pid_t pid) { ++ TString strPid(ToString(pid)); ++ TString str; ++ + try { +- TString strPid(ToString(pid)); + TFileInput proc("/proc/" + strPid + "/status"); +- TString str; + while (proc.ReadLine(str)) { + if (ExtractVal(str, "VmRSS:", Rss)) + continue; +@@ -58,8 +63,21 @@ namespace NActors { + // Convert from kB to bytes + Rss *= 1024; + +- float tickPerMillisec = TicksPerMillisec(); ++ } catch (...) { ++ } ++ ++ Vsize = 0; ++ Utime = 0; ++ Stime = 0; ++ MinFlt = 0; ++ MajFlt = 0; ++ SystemUptime = {}; ++ Uptime = {}; ++ NumThreads = 0; ++ ++ float ticksPerMillisec = TicksPerMillisec(); + ++ try { + TFileInput procStat("/proc/" + strPid + "/stat"); + procStat.ReadLine(str); + if (!str.empty()) { +@@ -70,14 +88,20 @@ namespace NActors { + &Pid, &State, &Ppid, &Pgrp, &Session, &TtyNr, &TPgid, &Flags, &MinFlt, &CMinFlt, + &MajFlt, &CMajFlt, &Utime, &Stime, &CUtime, &CStime, &Priority, &Nice, &NumThreads, + &ItRealValue, &StartTime, &Vsize, &RssPages, &RssLim); +- Utime /= tickPerMillisec; +- Stime /= tickPerMillisec; +- CUtime /= tickPerMillisec; +- CStime /= tickPerMillisec; +- SystemUptime = ::Uptime(); +- Uptime = SystemUptime - TDuration::MilliSeconds(StartTime / TicksPerMillisec()); ++ Utime /= ticksPerMillisec; ++ Stime /= ticksPerMillisec; ++ CUtime /= ticksPerMillisec; ++ CStime /= ticksPerMillisec; + } ++ SystemUptime = ::Uptime(); ++ Uptime = SystemUptime - TDuration::MilliSeconds(StartTime / ticksPerMillisec); ++ } catch (...) { ++ } ++ ++ FileRss = 0; ++ AnonRss = 0; + ++ try { + TFileInput statm("/proc/" + strPid + "/statm"); + statm.ReadLine(str); + TVector fields; +@@ -91,34 +115,71 @@ namespace NActors { + FileRss = shared * PageSize; + AnonRss = (resident - shared) * PageSize; + } ++ } catch (...) { ++ } ++ ++ CGroupMemLim = 0; ++ ++ try { ++ bool isV2 = NFs::Exists("/sys/fs/cgroup/cgroup.controllers"); + + TFileInput cgroup("/proc/" + strPid + "/cgroup"); + TString line; + TString memoryCGroup; +- while (cgroup.ReadLine(line) > 0) { ++ while (cgroup.ReadLine(line)) { ++ TVector fields; + StringSplitter(line).Split(':').Collect(&fields); +- if (fields.size() > 2 && fields[1] == "memory") { +- memoryCGroup = fields[2]; +- break; ++ if (fields.size() <= 2) { ++ continue; ++ } ++ if (isV2) { ++ if (fields[0] == "0") { ++ memoryCGroup = fields[2]; ++ break; ++ } ++ } else { ++ if (fields[1] == "memory") { ++ memoryCGroup = fields[2]; ++ break; ++ } + } + } + +- TString cgroupFileName = "/sys/fs/cgroup/memory" + memoryCGroup + "/memory.limit_in_bytes"; +- if (!NFs::Exists(cgroupFileName)) { +- // fallback for mk8s +- cgroupFileName = "/sys/fs/cgroup/memory/memory.limit_in_bytes"; +- } +- TFileInput limit(cgroupFileName); +- if (limit.ReadLine(line) > 0) { +- CGroupMemLim = FromString(line); +- if (CGroupMemLim > (1ULL << 40)) { +- CGroupMemLim = 0; ++ if (!memoryCGroup.empty() && memoryCGroup != "/") { ++ TString cgroupFileName; ++ if (isV2) { ++ cgroupFileName = "/sys/fs/cgroup" + memoryCGroup + "/memory.max"; ++ } else { ++ cgroupFileName = "/sys/fs/cgroup/memory" + memoryCGroup + "/memory.limit_in_bytes"; ++ // fallback for mk8s ++ if (!NFs::Exists(cgroupFileName)) { ++ cgroupFileName = "/sys/fs/cgroup/memory/memory.limit_in_bytes"; ++ } ++ } ++ TFileInput limit(cgroupFileName); ++ if (limit.ReadLine(line) && line != "max") { ++ CGroupMemLim = FromString(line); ++ if (CGroupMemLim > (1ULL << 40)) { ++ CGroupMemLim = 0; ++ } + } + } ++ } catch (...) { ++ } + ++ try { ++ TFileInput memInfo("/proc/meminfo"); ++ while (memInfo.ReadLine(str)) { ++ if (ExtractVal(str, "MemTotal:", MemTotal)) ++ continue; ++ if (ExtractVal(str, "MemAvailable:", MemAvailable)) ++ continue; ++ } ++ ConvertFromKb(MemTotal); ++ ConvertFromKb(MemAvailable); + } catch (...) { +- return false; + } ++ + return true; + } + +@@ -220,7 +281,7 @@ namespace { + *MajorPageFaults = procStat.MajFlt; + *UptimeSeconds = procStat.Uptime.Seconds(); + *NumThreads = procStat.NumThreads; +- *SystemUptimeSeconds = procStat.Uptime.Seconds(); ++ *SystemUptimeSeconds = procStat.SystemUptime.Seconds(); + } + + private: +diff --git a/ydb/library/actors/core/process_stats.h b/ydb/library/actors/core/process_stats.h +index 5681f0eb1a8..4091a61b782 100644 +--- a/ydb/library/actors/core/process_stats.h ++++ b/ydb/library/actors/core/process_stats.h +@@ -43,6 +43,8 @@ namespace NActors { + ui64 FileRss; + ui64 AnonRss; + ui64 CGroupMemLim = 0; ++ ui64 MemTotal; ++ ui64 MemAvailable; + + TDuration Uptime; + TDuration SystemUptime; diff --git a/ydb/library/yaml_config/protos/config.proto b/ydb/library/yaml_config/protos/config.proto index 9ea03d2cf4af..8c87c786bc10 100644 --- a/ydb/library/yaml_config/protos/config.proto +++ b/ydb/library/yaml_config/protos/config.proto @@ -73,8 +73,10 @@ message TExtendedHostConfigDrive { optional NKikimrBlobStorage.TPDiskConfig PDiskConfig = 6 [(NMarkers.CopyTo) = "THostConfigDrive"]; optional uint64 ExpectedSlotCount = 7; + optional uint32 EncryptionThreadCount = 11; } + message THosts { optional uint32 NodeId = 1; optional uint32 Port = 2; diff --git a/ydb/library/yaml_config/yaml_config_parser.cpp b/ydb/library/yaml_config/yaml_config_parser.cpp index eaf8dc6ed88e..95f87266410c 100644 --- a/ydb/library/yaml_config/yaml_config_parser.cpp +++ b/ydb/library/yaml_config/yaml_config_parser.cpp @@ -603,6 +603,9 @@ namespace NKikimr::NYaml { if (drive.HasExpectedSlotCount()) { drive.MutablePDiskConfig()->SetExpectedSlotCount(drive.GetExpectedSlotCount()); } + if (drive.HasEncryptionThreadCount()) { + drive.MutablePDiskConfig()->SetEncryptionThreadCount(drive.GetEncryptionThreadCount()); + } } } } diff --git a/ydb/tools/cfg/base.py b/ydb/tools/cfg/base.py index 44add276bfb7..0f7e38e7f43f 100644 --- a/ydb/tools/cfg/base.py +++ b/ydb/tools/cfg/base.py @@ -62,12 +62,13 @@ def merge_with_default(dft, override): class KiKiMRDrive(object): - def __init__(self, type, path, shared_with_os=False, expected_slot_count=None, kind=None, pdisk_config=None): + def __init__(self, type, path, shared_with_os=False, expected_slot_count=None, kind=None, pdisk_config=None, encryption_thread_count=None): self.type = type self.path = path self.shared_with_os = shared_with_os self.pdisk_config = pdisk_config self.expected_slot_count = expected_slot_count + self.encryption_thread_count = encryption_thread_count self.kind = kind def __eq__(self, other): @@ -77,11 +78,12 @@ def __eq__(self, other): and self.shared_with_os == other.shared_with_os and self.expected_slot_count == other.expected_slot_count and self.kind == other.kind + and self.encryption_thread_count == other.encryption_thread_count and self.pdisk_config == other.pdisk_config ) def __hash__(self): - return hash("\0".join(map(str, (self.type, self.path, self.shared_with_os, self.expected_slot_count, self.kind, self.pdisk_config)))) + return hash("\0".join(map(str, (self.type, self.path, self.shared_with_os, self.expected_slot_count, self.encryption_thread_count, self.kind, self.pdisk_config)))) Domain = collections.namedtuple( diff --git a/ydb/tools/cfg/dynamic.py b/ydb/tools/cfg/dynamic.py index 460959c6a348..732c7a60e9bb 100644 --- a/ydb/tools/cfg/dynamic.py +++ b/ydb/tools/cfg/dynamic.py @@ -187,9 +187,16 @@ def add_drive(array, drive): Type=drive.type, Kind=drive.kind, ) + pc = None if drive.expected_slot_count is not None: pc = pdisk_config.TPDiskConfig(ExpectedSlotCount=drive.expected_slot_count) kwargs.update(PDiskConfig=pc) + if drive.encryption_thread_count is not None: + if pc is None: + pc = pdisk_config.TPDiskConfig(EncryptionThreadCount=drive.encryption_thread_count) + else: + pc.EncryptionThreadCount = drive.encryption_thread_count + kwargs.update(PDiskConfig=pc) array.add(**kwargs) for host_config in self._cluster_details.host_configs: diff --git a/ydb/tools/cfg/validation.py b/ydb/tools/cfg/validation.py index 3ab5c311aabf..adce6568d4f5 100644 --- a/ydb/tools/cfg/validation.py +++ b/ydb/tools/cfg/validation.py @@ -155,6 +155,7 @@ "path": dict(type="string", minLength=1), "shared_with_os": dict(type="boolean"), "expected_slot_count": dict(type="integer"), + "encryption_thread_count": dict(type="integer"), "pdisk_config": { "type": "object", "additionalProperties": True,