Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion ydb/core/blobstorage/lwtrace_probes/blobstorage_probes.h
Original file line number Diff line number Diff line change
Expand Up @@ -217,10 +217,16 @@ struct TEventTypeField {
PROBE(PDiskChunkReadPieceComplete, GROUPS("PDisk", "PDiskRequest"), \
TYPES(TPDiskIdField, ui64, ui64, double), \
NAMES("pdisk", "size", "relativeOffset", "deviceTimeMs")) \
PROBE(PDiskChunkWritePieceComplete, GROUPS("PDisk", "PDiskRequest"), \
TYPES(TPDiskIdField, ui64, ui64, double), \
NAMES("pdisk", "size", "relativeOffset", "deviceTimeMs")) \
PROBE(PDiskChunkWriteAddToScheduler, GROUPS("PDisk", "PDiskRequest"), \
TYPES(TPDiskIdField, ui64, double, ui64, bool, ui64, ui64), \
NAMES("pdisk", "reqId", "creationTimeSec", "owner", "isFast", "priorityClass", "size")) \
PROBE(PDiskChunkWriteLastPieceSendToDevice, GROUPS("PDisk", "PDiskRequest"), \
PROBE(PDiskChunkWritePieceAddToScheduler, GROUPS("PDisk", "PDiskRequest"), \
TYPES(TPDiskIdField, ui32, ui64, ui64), \
NAMES("pdisk", "pieceIdx", "offset", "size")) \
PROBE(PDiskChunkWritePieceSendToDevice, GROUPS("PDisk", "PDiskRequest"), \
TYPES(TPDiskIdField, ui64, ui64, ui64, ui64), \
NAMES("pdisk", "owner", "chunkIdx", "pieceOffset", "pieceSize")) \
PROBE(PDiskLogWriteComplete, GROUPS("PDisk", "PDiskRequest"), \
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -889,7 +889,7 @@ class TRealBlockDevice : public IBlockDevice {
}

Y_VERIFY_S(PCtx->ActorSystem->AppData<TAppData>(), PCtx->PDiskLogPrefix);
Y_VERIFY_S(PCtx->ActorSystem->AppData<TAppData>()->IoContextFactory, PCtx->PDiskLogPrefix);
Y_VERIFY_S(PCtx->ActorSystem->AppData<TAppData>()->IoContextFactory, PCtx->PDiskLogPrefix);
auto *factory = PCtx->ActorSystem->AppData<TAppData>()->IoContextFactory;
IoContext = factory->CreateAsyncIoContext(Path, PCtx->PDiskId, Flags, SectorMap);
if (Flags & TDeviceMode::UseSpdk) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
#pragma once

#include <util/system/thread.h>
#include <util/thread/lfqueue.h>

#include <atomic>
#include <cstddef>

namespace NKikimr {
namespace NPDisk {

class TChunkWritePiece;

// Queue of non-owning TChunkWritePiece pointers built on TLockFreeQueue, with a
// separately maintained element counter so that Size()/Empty() are cheap.
//
// The counter is updated with relaxed ordering after the underlying queue
// operation, so Size()/Empty() are only an estimate while other threads are
// enqueueing or dequeueing concurrently.
class TChunkWritePieceQueue {
public:
    TChunkWritePieceQueue() = default;

    // Returns true when the counter currently reads zero.
    bool Empty() const {
        return Size() == 0;
    }

    // Returns the current value of the element counter.
    size_t Size() const {
        return SizeCounter.load(std::memory_order_relaxed);
    }

    // Removes and returns the oldest queued piece.
    // Returns nullptr when the underlying queue has nothing to hand out; in that
    // case the counter is left untouched so it can never wrap below zero.
    TChunkWritePiece* Dequeue() {
        TChunkWritePiece* item = nullptr;
        if (!QueueImpl.Dequeue(&item)) {
            return nullptr;
        }
        SizeCounter.fetch_sub(1, std::memory_order_relaxed);
        return item;
    }

    // Appends a piece to the queue. The queue stores the raw pointer only.
    void Enqueue(TChunkWritePiece* item) {
        QueueImpl.Enqueue(item);
        SizeCounter.fetch_add(1, std::memory_order_relaxed);
    }

private:
    TLockFreeQueue<TChunkWritePiece*> QueueImpl;
    std::atomic<size_t> SizeCounter = 0;
};

} // NPDisk
} // NKikimr
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,55 @@ namespace NPDisk {
// Completion actions
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Chunk write completion actions
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////


// Builds the completion action for a single piece of a chunk write.
//   piece                - the piece being written; its span becomes the parent of this
//                          action's span and its PDisk context supplies the actor system.
//   cumulativeCompletion - the per-request completion that is notified when this piece
//                          finishes or is dropped.
TCompletionChunkWritePiece::TCompletionChunkWritePiece(TChunkWritePiece* piece, TCompletionChunkWrite* cumulativeCompletion)
    : TCompletionAction()
    , Piece(piece)
    , CumulativeCompletion(cumulativeCompletion)
    , Span(piece->Span.CreateChild(TWilson::PDiskDetailed, "PDisk.ChunkWritePiece.CompletionPart"))
    , ActorSystem(piece->PDisk->PCtx->ActorSystem)
{}

// If this piece has not already reported success (Exec clears the pointer after
// doing so), tell the cumulative completion that one more part is gone.
TCompletionChunkWritePiece::~TCompletionChunkWritePiece() {
    if (!CumulativeCompletion) {
        return;
    }
    CumulativeCompletion->RemovePart(ActorSystem);
}

// Completion callback for one written piece.
// On an I/O error the work is delegated to Release(), which records the error
// reason and deletes this object. On success the piece is reported to the
// cumulative completion as written, and this object deletes itself.
void TCompletionChunkWritePiece::Exec(TActorSystem *actorSystem) {
Span.Event("PDisk.CompletionChunkWritePart.Exec");
Y_VERIFY(actorSystem);
Y_VERIFY(CumulativeCompletion);
if (TCompletionAction::Result != EIoResult::Ok) {
// Release() ends with `delete this`, so nothing may touch members after it.
Release(actorSystem);
return;
}

double deviceTimeMs = HPMilliSecondsFloat(GetTime - SubmitTime);
//TODO: Fork and join this orbit from ChunkWrite orbit.
LWTRACK(PDiskChunkWritePieceComplete, Orbit, Piece->PDisk->PCtx->PDiskId, Piece->PieceSize, Piece->PieceShift, deviceTimeMs);

// CompletePart() may run and destroy the cumulative completion when this was the
// last outstanding part, so the pointer must not be used afterwards.
CumulativeCompletion->CompletePart(actorSystem);
// Cleared so the destructor does not call RemovePart() a second time.
CumulativeCompletion = nullptr;

Span.Event("PDisk.CompletionChunkWritePart.ExecStop");
delete this;
}

// Failure/cancellation path for one piece: copies this action's error reason into
// the cumulative completion (when still attached), closes the span with an error
// and destroys this object. The actor system argument is not used here.
void TCompletionChunkWritePiece::Release(TActorSystem *actorSystem) {
    Y_UNUSED(actorSystem);

    if (CumulativeCompletion != nullptr) {
        CumulativeCompletion->ErrorReason = ErrorReason;
    }

    Span.EndError("PDisk.CompletionChunkWritePart.Release");
    delete this;
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Log write completion action
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
Expand Down
56 changes: 50 additions & 6 deletions ydb/core/blobstorage/pdisk/blobstorage_pdisk_completion_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,14 @@ class TCompletionChunkWrite : public TCompletionAction {
std::function<void()> OnDestroy;
TReqId ReqId;
NWilson::TSpan Span;

std::atomic<ui32> PartsStarted;
std::atomic<ui32> PartsRemoved;
std::atomic<ui32> PartsWritten;
public:
bool IsReplied = false;
ui32 Pieces;
TEvChunkWrite::TPartsPtr Parts;
std::optional<TAlignedData> Buffer;

TCompletionChunkWrite(const TActorId &recipient, TEvChunkWriteResult *event,
TPDiskMon *mon, ui32 pdiskId, NHPTimer::STime startTime, size_t sizeBytes,
ui8 priorityClass, std::function<void()> onDestroy, TReqId reqId, NWilson::TSpan&& span)
Expand Down Expand Up @@ -164,16 +167,57 @@ class TCompletionChunkWrite : public TCompletionAction {
if (Mon) {
Mon->GetWriteCounter(PriorityClass)->CountResponse();
}

delete this;
}

// Failure path of the whole chunk write.
// Unless a reply has already been sent (IsReplied), answers the requester with
// CORRUPTED status and the stored error reason, and ends the span with that error.
// The reply is sent at most once: Event is released by Send, so it must not be
// touched again afterwards. Always destroys this object.
void Release(TActorSystem *actorSystem) override {
    if (!IsReplied) {
        Event->Status = NKikimrProto::CORRUPTED;
        Event->ErrorReason = ErrorReason;
        actorSystem->Send(Recipient, Event.Release());
        Span.EndError(ErrorReason);
    }
    delete this;
}

// Counts one more piece as started.
void AddPart() {
    PartsStarted.fetch_add(1);
}

bool AllPartsStarted() {
return PartsStarted == Pieces;
}

// Registers that one piece is finished with this completion (successfully or not).
// The caller that removes the final piece finalizes the request: Exec() if every
// piece was written, Release() otherwise. Both of those delete this object, so it
// must not be used after the last RemovePart() returns.
void RemovePart(TActorSystem *actorSystem) {
    const ui32 removedSoFar = PartsRemoved.fetch_add(1, std::memory_order_seq_cst) + 1;
    if (removedSoFar != Pieces) {
        return;
    }

    const bool everyPieceWritten = PartsWritten.load(std::memory_order_seq_cst) == Pieces;
    if (everyPieceWritten) {
        Exec(actorSystem);
    } else {
        Release(actorSystem);
    }
}

// Marks one piece as successfully written and then removes it from the set of
// outstanding pieces; the written counter is bumped first so that RemovePart()
// sees it when deciding between Exec() and Release().
void CompletePart(TActorSystem *actorSystem) {
    PartsWritten.fetch_add(1);
    RemovePart(actorSystem);
}
};

class TChunkWritePiece;

class TCompletionChunkWritePiece : public TCompletionAction {
TChunkWritePiece* Piece;
TCompletionChunkWrite* CumulativeCompletion;
NWilson::TSpan Span;
TActorSystem* ActorSystem;
public:
TCompletionChunkWritePiece(NKikimr::NPDisk::TChunkWritePiece* piece, TCompletionChunkWrite* cumulativeCompletion);
void Exec(TActorSystem *actorSystem) override;
void Release(TActorSystem *actorSystem) override;
virtual ~TCompletionChunkWritePiece();
};

class TCompletionLogWrite : public TCompletionAction {
Expand Down
4 changes: 4 additions & 0 deletions ydb/core/blobstorage/pdisk/blobstorage_pdisk_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ struct TPDiskConfig : public TThrRefBase {
NKikimrBlobStorage::TPDiskSpaceColor::E SpaceColorBorder = NKikimrBlobStorage::TPDiskSpaceColor::GREEN;

ui32 CompletionThreadsCount = 1;
ui32 EncryptionThreadCount = 0;
bool UseNoopScheduler = false;

bool PlainDataChunks = false;
Expand Down Expand Up @@ -419,6 +420,9 @@ struct TPDiskConfig : public TThrRefBase {
if (cfg->HasCompletionThreadsCount()) {
CompletionThreadsCount = cfg->GetCompletionThreadsCount();
}
if (cfg->HasEncryptionThreadCount()) {
EncryptionThreadCount = cfg->GetEncryptionThreadCount();
}

if (cfg->HasUseNoopScheduler()) {
UseNoopScheduler = cfg->GetUseNoopScheduler();
Expand Down
1 change: 1 addition & 0 deletions ydb/core/blobstorage/pdisk/blobstorage_pdisk_data.h
Original file line number Diff line number Diff line change
Expand Up @@ -654,6 +654,7 @@ struct TDiskFormat {
str << " MagicFormatChunk: " << MagicFormatChunk << x;
str << " ChunkSize: " << ChunkSize << " bytes (" << (ChunkSize / 1000000ull) << " MB)" << x;
str << " SectorSize: " << SectorSize << x;
str << " SectorPayloadSize: " << SectorPayloadSize() << x;
str << " SysLogSectorCount: " << SysLogSectorCount << x;
str << " SystemChunkCount: " << SystemChunkCount << x;
str << " FormatText: \"" << FormatText << "\"" << x;
Expand Down
105 changes: 105 additions & 0 deletions ydb/core/blobstorage/pdisk/blobstorage_pdisk_encryption_threads.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
#include "blobstorage_pdisk_encryption_threads.h"
#include "blobstorage_pdisk_requestimpl.h"

#include <ydb/core/util/hp_timer_helpers.h>
#include <ydb/library/yverify_stream/yverify_stream.h>

namespace NKikimr {
namespace NPDisk {

// Remembers the name that ThreadProc() later assigns to the OS thread.
TEncryptionThread::TEncryptionThread(TString name)
    : Name(name)
{
}

// Reports how many pieces the queue currently counts as waiting for this thread.
size_t TEncryptionThread::GetQueuedActions() {
    const auto waiting = Queue.GetWaitingSize();
    return waiting;
}

void* TEncryptionThread::ThreadProc() {
SetCurrentThreadName(Name.data());
bool isWorking = true;

while (isWorking) {
TAtomicBase itemCount = Queue.GetWaitingSize();
if (itemCount > 0) {
for (TAtomicBase idx = 0; idx < itemCount; ++idx) {
TChunkWritePiece *piece = Queue.Pop();
if (piece == nullptr) {
isWorking = false;
} else {
piece->Process();
}
}
} else {
Queue.ProducedWaitI();
}
}
return nullptr;
}

// Hands a piece to this worker by pushing it onto the worker's queue.
// A nullptr piece makes ThreadProc() stop.
void TEncryptionThread::Schedule(TChunkWritePiece *piece)
{
    Queue.Push(piece);
}

// Creates and starts `threadsCount` encryption workers (at most MAX_THREAD_COUNT).
//
// Every requested worker is started here, matching SetThreadCount(), so that
// AvailableThreads never exceeds Threads.size(); Schedule() indexes the first
// AvailableThreads entries of Threads and would run past the end otherwise.
TEncryptionThreads::TEncryptionThreads(size_t threadsCount) {
    Y_VERIFY(threadsCount <= MAX_THREAD_COUNT, "too many encryption threads");
    // Reserve the maximum up front so later growth via SetThreadCount() does not reallocate.
    Threads.reserve(MAX_THREAD_COUNT);
    for (size_t i = 0; i < threadsCount; ++i) {
        Threads.push_back(MakeHolder<TEncryptionThread>(TStringBuilder() << "PdEncrypt" << i));
        Threads.back()->Start();
    }
    AvailableThreads = Threads.size();
}

// Changes how many workers Schedule() may use. Missing workers are created and
// started; already running workers are never stopped here, a smaller count only
// lowers AvailableThreads.
void TEncryptionThreads::SetThreadCount(size_t threadsCount) {
    Y_VERIFY(threadsCount <= MAX_THREAD_COUNT, "too many encryption threads");

    while (Threads.size() < threadsCount) {
        const size_t workerIdx = Threads.size();
        Threads.push_back(MakeHolder<TEncryptionThread>(TStringBuilder() << "PdEncrypt" << workerIdx));
        Threads.back()->Start();
    }

    AvailableThreads = threadsCount;
}


void TEncryptionThreads::StopWork() {
for (auto& thread : Threads) {
thread->Schedule(nullptr);
}
}

// Waits for every worker thread to finish and then drops all worker objects.
void TEncryptionThreads::Join() {
    for (auto& worker : Threads) {
        worker->Join();
    }

    Threads.clear();
}

// Full shutdown: signal all workers to stop, then wait for them and release them.
void TEncryptionThreads::Stop()
{
    StopWork();
    Join();
}

// Dispatches a piece to the least loaded worker.
//   piece == nullptr  - request all workers to stop (same as StopWork()).
//   no usable workers - the piece is processed synchronously in the calling thread.
//   otherwise         - the piece goes to the worker with the shortest queue among
//                       the first AvailableThreads workers (first one wins on ties).
//
// The number of workers considered is bounded by Threads.size(), so the scan never
// leaves the vector even if AvailableThreads is larger or Threads was cleared by Join().
void TEncryptionThreads::Schedule(TChunkWritePiece* piece) noexcept {
    if (!piece) {
        StopWork();
        return;
    }

    const size_t usableThreads = AvailableThreads < Threads.size() ? AvailableThreads : Threads.size();
    if (usableThreads == 0) {
        piece->Process();
        return;
    }

    auto leastLoaded = Threads.begin();
    auto minQueueSize = (*leastLoaded)->GetQueuedActions();
    const auto scanEnd = Threads.begin() + usableThreads;
    for (auto it = Threads.begin() + 1; it != scanEnd; ++it) {
        const auto queueSize = (*it)->GetQueuedActions();
        if (queueSize < minQueueSize) {
            minQueueSize = queueSize;
            leastLoaded = it;
        }
    }
    (*leastLoaded)->Schedule(piece);
}

} // NPDisk
} // NKikimr
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
#include "blobstorage_pdisk_completion.h"
#include "blobstorage_pdisk_util_countedqueuemanyone.h"

#include <util/system/thread.h>
#include <util/thread/lfqueue.h>

#include <queue>
#include <variant>

namespace NKikimr {
namespace NPDisk {

class TChunkWritePiece;

// A single worker thread that processes TChunkWritePiece objects taken from its
// own queue. Pieces are passed as raw pointers; a nullptr piece stops the thread.
class TEncryptionThread : public ISimpleThread {
public:
    // name - the name given to the OS thread when it starts running.
    TEncryptionThread(TString name);

    // Thread body: processes queued pieces until a nullptr piece is received.
    void* ThreadProc() override;

    // Queues a piece for this worker.
    void Schedule(TChunkWritePiece* piece);

    // Number of pieces currently counted as waiting in the queue.
    size_t GetQueuedActions();

private:
    TCountedQueueManyOne<TChunkWritePiece, 4 << 10> Queue;
    TString Name;
};
// A small pool of TEncryptionThread workers with shortest-queue dispatching.
class TEncryptionThreads {
public:
    // Upper bound on the number of workers, enforced by the constructor and SetThreadCount().
    static const size_t MAX_THREAD_COUNT = 4;

    // Worker objects, in creation order.
    TVector<THolder<TEncryptionThread>> Threads;

    // Number of leading entries of Threads that Schedule() distributes work across.
    // When it is zero, Schedule() processes pieces in the calling thread.
    size_t AvailableThreads;

    TEncryptionThreads(size_t threadsCount);

    // Starts additional workers if needed and updates AvailableThreads.
    void SetThreadCount(size_t threadsCount);

    // Schedule piece processing;
    // pass piece = nullptr to ask all workers to quit.
    void Schedule(TChunkWritePiece* piece) noexcept;

    // Asks every worker to quit without waiting for it.
    void StopWork();

    // Waits for all workers to finish and releases them.
    void Join();

    // StopWork() followed by Join().
    void Stop();
};


} // NPDisk
} // NKikimr
Loading
Loading