diff --git a/ydb/apps/ydbd/export.cpp b/ydb/apps/ydbd/export.cpp index de0efb5a834f..218b6dae97dc 100644 --- a/ydb/apps/ydbd/export.cpp +++ b/ydb/apps/ydbd/export.cpp @@ -1,6 +1,7 @@ #include "export.h" #include +#include NKikimr::NDataShard::IExport* TDataShardExportFactory::CreateExportToYt( const IExport::TTask& task, const IExport::TTableColumns& columns) const @@ -22,5 +23,11 @@ NKikimr::NDataShard::IExport* TDataShardExportFactory::CreateExportToS3( #endif } +NKikimr::NDataShard::IExport* TDataShardExportFactory::CreateExportToFs( + const IExport::TTask& task, const IExport::TTableColumns& columns) const +{ + return new NKikimr::NDataShard::TFsExport(task, columns); +} + void TDataShardExportFactory::Shutdown() { } diff --git a/ydb/apps/ydbd/export.h b/ydb/apps/ydbd/export.h index 9d077f16aa5e..e7613aaff639 100644 --- a/ydb/apps/ydbd/export.h +++ b/ydb/apps/ydbd/export.h @@ -8,5 +8,6 @@ class TDataShardExportFactory : public NKikimr::NDataShard::IExportFactory { public: IExport* CreateExportToYt(const IExport::TTask& task, const IExport::TTableColumns& columns) const override; IExport* CreateExportToS3(const IExport::TTask& task, const IExport::TTableColumns& columns) const override; + IExport* CreateExportToFs(const IExport::TTask& task, const IExport::TTableColumns& columns) const override; void Shutdown() override; }; diff --git a/ydb/apps/ydbd/export/export.cpp b/ydb/apps/ydbd/export/export.cpp index 3f85de70e33e..c797b3fdd2dc 100644 --- a/ydb/apps/ydbd/export/export.cpp +++ b/ydb/apps/ydbd/export/export.cpp @@ -1,6 +1,7 @@ #include "export.h" #include +#include NKikimr::NDataShard::IExport* TDataShardExportFactory::CreateExportToYt( const IExport::TTask& task, const IExport::TTableColumns& columns) const @@ -22,6 +23,12 @@ NKikimr::NDataShard::IExport* TDataShardExportFactory::CreateExportToS3( #endif } +NKikimr::NDataShard::IExport* TDataShardExportFactory::CreateExportToFs( + const IExport::TTask& task, const IExport::TTableColumns& columns) const +{ + return new NKikimr::NDataShard::TFsExport(task, columns); +} + void TDataShardExportFactory::Shutdown() { // No cleanup required for TDataShardExportFactory. 
} diff --git a/ydb/apps/ydbd/export/export.h b/ydb/apps/ydbd/export/export.h index 9d077f16aa5e..e7613aaff639 100644 --- a/ydb/apps/ydbd/export/export.h +++ b/ydb/apps/ydbd/export/export.h @@ -8,5 +8,6 @@ class TDataShardExportFactory : public NKikimr::NDataShard::IExportFactory { public: IExport* CreateExportToYt(const IExport::TTask& task, const IExport::TTableColumns& columns) const override; IExport* CreateExportToS3(const IExport::TTask& task, const IExport::TTableColumns& columns) const override; + IExport* CreateExportToFs(const IExport::TTask& task, const IExport::TTableColumns& columns) const override; void Shutdown() override; }; diff --git a/ydb/core/driver_lib/run/export.cpp b/ydb/core/driver_lib/run/export.cpp index 75d4386b4e17..b3c1b913d1b5 100644 --- a/ydb/core/driver_lib/run/export.cpp +++ b/ydb/core/driver_lib/run/export.cpp @@ -4,6 +4,7 @@ #include #include +#include NKikimr::NDataShard::IExport* TDataShardExportFactory::CreateExportToYt( const IExport::TTask& task, const IExport::TTableColumns& columns) const @@ -29,6 +30,12 @@ NKikimr::NDataShard::IExport* TDataShardExportFactory::CreateExportToS3( #endif } +NKikimr::NDataShard::IExport* TDataShardExportFactory::CreateExportToFs( + const IExport::TTask& task, const IExport::TTableColumns& columns) const +{ + return new NKikimr::NDataShard::TFsExport(task, columns); +} + void TDataShardExportFactory::Shutdown() { #ifndef KIKIMR_DISABLE_YT ShutdownYT(); diff --git a/ydb/core/driver_lib/run/export.h b/ydb/core/driver_lib/run/export.h index 5f422b5f4011..1898fa00ba93 100644 --- a/ydb/core/driver_lib/run/export.h +++ b/ydb/core/driver_lib/run/export.h @@ -8,5 +8,6 @@ class TDataShardExportFactory : public NKikimr::NDataShard::IExportFactory { public: IExport* CreateExportToYt(const IExport::TTask& task, const IExport::TTableColumns& columns) const override; IExport* CreateExportToS3(const IExport::TTask& task, const IExport::TTableColumns& columns) const override; + IExport* CreateExportToFs(const IExport::TTask& task, const IExport::TTableColumns& columns) const override; void Shutdown() override; }; diff --git a/ydb/core/protos/flat_scheme_op.proto b/ydb/core/protos/flat_scheme_op.proto index 6eaa71c40e15..425c0dcd6be3 100644 --- a/ydb/core/protos/flat_scheme_op.proto +++ b/ydb/core/protos/flat_scheme_op.proto @@ -5,6 +5,7 @@ import "ydb/core/protos/channel_purpose.proto"; import "ydb/core/protos/compaction.proto"; import "ydb/core/protos/filestore_config.proto"; import "ydb/core/protos/follower_group.proto"; +import "ydb/core/protos/fs_settings.proto"; import "ydb/core/protos/index_builder.proto"; import "ydb/core/protos/pqconfig.proto"; import "ydb/core/protos/replication.proto"; @@ -1320,6 +1321,7 @@ message TBackupTask { oneof Settings { TYTSettings YTSettings = 4; NKikimrSchemeOp.TS3Settings S3Settings = 9; + NKikimrSchemeOp.TFSSettings FSSettings = 20; } optional TPathDescription Table = 10; // for further restore @@ -1367,6 +1369,7 @@ message TRestoreTask { oneof Settings { NKikimrSchemeOp.TS3Settings S3Settings = 6; + NKikimrSchemeOp.TFSSettings FSSettings = 9; } optional bool ValidateChecksums = 7; // currently available for s3 diff --git a/ydb/core/protos/fs_settings.proto b/ydb/core/protos/fs_settings.proto new file mode 100644 index 000000000000..14414ce1ddae --- /dev/null +++ b/ydb/core/protos/fs_settings.proto @@ -0,0 +1,10 @@ +package NKikimrSchemeOp; +option java_package = "ru.yandex.kikimr.proto"; + +message TFSSettings { + optional string BasePath = 1; // Base path on the file system (e.g., /mnt/exports) 
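// How these two fields combine (per the TFsSettings helper in export_fs_uploader.cpp
// further down): every object is written under <BasePath>/<Path>/..., e.g. with
// BasePath = "/mnt/exports" and Path = "backup_001/Table1" a single-shard export
// produces roughly
//   /mnt/exports/backup_001/Table1/metadata.json
//   /mnt/exports/backup_001/Table1/scheme.pb
//   /mnt/exports/backup_001/Table1/permissions.pb
//   /mnt/exports/backup_001/Table1/data_00.csv[.zst]   (+ *.sha256 when checksums are enabled)
// The exact file names are an assumption here: they are produced by the existing
// NBackupRestoreTraits suffix helpers and are expected to match the S3 export layout.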
+ optional string Path = 2; // Relative path for this specific backup/restore operation +} + + + diff --git a/ydb/core/protos/ya.make b/ydb/core/protos/ya.make index 2eaea64c1951..edb2b7491d78 100644 --- a/ydb/core/protos/ya.make +++ b/ydb/core/protos/ya.make @@ -82,6 +82,7 @@ SRCS( flat_scheme_op.proto flat_tx_scheme.proto follower_group.proto + fs_settings.proto grpc.proto grpc_pq_old.proto grpc_status_proxy.proto diff --git a/ydb/core/testlib/basics/appdata.h b/ydb/core/testlib/basics/appdata.h index 793af8526dae..6dfe55c08fe6 100644 --- a/ydb/core/testlib/basics/appdata.h +++ b/ydb/core/testlib/basics/appdata.h @@ -10,6 +10,7 @@ #include #include #include +#include #include #include #include @@ -48,6 +49,12 @@ namespace NKikimr { #endif } + IExport* CreateExportToFs( + const IExport::TTask& task, const IExport::TTableColumns& columns) const override + { + return new NDataShard::TFsExport(task, columns); + } + void Shutdown() override { } }; diff --git a/ydb/core/tx/columnshard/backup/iscan/iscan.cpp b/ydb/core/tx/columnshard/backup/iscan/iscan.cpp index 819000fd1190..a29cfb023865 100644 --- a/ydb/core/tx/columnshard/backup/iscan/iscan.cpp +++ b/ydb/core/tx/columnshard/backup/iscan/iscan.cpp @@ -67,6 +67,8 @@ TConclusion> CreateIScanExportUploader(const TAct break; case NKikimrSchemeOp::TBackupTask::SETTINGS_NOT_SET: return TConclusionStatus::Fail("Internal error. It is not possible to have empty settings for backup here"); + default: + return TConclusionStatus::Fail("Internal error. Unsupported type of backup task settings"); } auto createUploader = [subscriberActorId = subscriberActorId, txId = txId, exp]() { @@ -331,4 +333,4 @@ std::unique_ptr CreateExportUploaderActor(const TActorId& subscriberActo return std::make_unique(backupTask, exportFactory, tableColumns, subscriberActorId, txId); } -} // namespace NKikimr::NColumnShard::NBackup \ No newline at end of file +} // namespace NKikimr::NColumnShard::NBackup diff --git a/ydb/core/tx/datashard/backup_unit.cpp b/ydb/core/tx/datashard/backup_unit.cpp index 0350299defa8..531d3d5cf123 100644 --- a/ydb/core/tx/datashard/backup_unit.cpp +++ b/ydb/core/tx/datashard/backup_unit.cpp @@ -4,8 +4,10 @@ #include "export_iface.h" #include "export_scan.h" #include "export_s3.h" +#include "export_fs.h" #include +#include namespace NKikimr { namespace NDataShard { @@ -74,6 +76,31 @@ class TBackupUnit : public TBackupRestoreUnitBase Abort(op, ctx, "Exports to S3 are disabled"); return false; } + } else if (backup.HasFSSettings()) { + LOG_INFO_S(ctx, NKikimrServices::DATASHARD_BACKUP, + "TBackupUnit::Run - FS export" + << ", tableId# " << tableId + << ", basePath# " << backup.GetFSSettings().GetBasePath() + << ", path# " << backup.GetFSSettings().GetPath() + << ", shardNum# " << backup.GetShardNum()); + + NBackupRestoreTraits::ECompressionCodec codec; + if (!TryCodecFromTask(backup, codec)) { + Abort(op, ctx, TStringBuilder() << "Unsupported compression codec" + << ": " << backup.GetCompression().GetCodec()); + return false; + } + + if (auto* exportFactory = appData->DataShardExportFactory) { + std::shared_ptr(exportFactory->CreateExportToFs(backup, columns)).swap(exp); + LOG_INFO_S(ctx, NKikimrServices::DATASHARD_BACKUP, + "TBackupUnit::Run - FS export created" + << ", tableId# " << tableId + << ", exportPtr# " << (void*)exp.get()); + } else { + Abort(op, ctx, "Exports to FS are disabled"); + return false; + } } else { Abort(op, ctx, "Unsupported backup task"); return false; @@ -83,9 +110,19 @@ class TBackupUnit : public TBackupRestoreUnitBase return 
exp->CreateUploader(self, txId); }; + LOG_INFO_S(ctx, NKikimrServices::DATASHARD_BACKUP, + "TBackupUnit::Run - creating buffer and scan" + << ", tableId# " << tableId + << ", txId# " << op->GetTxId()); + THolder buffer{exp->CreateBuffer()}; THolder scan{CreateExportScan(std::move(buffer), createUploader)}; + LOG_INFO_S(ctx, NKikimrServices::DATASHARD_BACKUP, + "TBackupUnit::Run - scan created, queueing" + << ", tableId# " << tableId + << ", localTableId# " << localTableId); + const auto& taskName = appData->DataShardConfig.GetBackupTaskName(); const auto taskPrio = appData->DataShardConfig.GetBackupTaskPriority(); @@ -113,13 +150,21 @@ class TBackupUnit : public TBackupRestoreUnitBase return op->HasScanResult(); } - bool ProcessResult(TOperation::TPtr op, const TActorContext&) override { + bool ProcessResult(TOperation::TPtr op, const TActorContext& ctx) override { TActiveTransaction* tx = dynamic_cast(op.Get()); Y_ENSURE(tx, "cannot cast operation of kind " << op->GetKind()); auto* result = CheckedCast(op->ScanResult().Get()); bool done = true; + LOG_INFO_S(ctx, NKikimrServices::DATASHARD_BACKUP, + "TBackupUnit::ProcessResult" + << ", txId# " << op->GetTxId() + << ", outcome# " << static_cast(result->Outcome) + << ", error# " << result->Error + << ", bytesRead# " << result->BytesRead + << ", rowsRead# " << result->RowsRead); + switch (result->Outcome) { case EExportOutcome::Success: case EExportOutcome::Error: @@ -128,11 +173,18 @@ class TBackupUnit : public TBackupRestoreUnitBase schemeOp->Error = std::move(result->Error); schemeOp->BytesProcessed = result->BytesRead; schemeOp->RowsProcessed = result->RowsRead; + LOG_INFO_S(ctx, NKikimrServices::DATASHARD_BACKUP, + "TBackupUnit::ProcessResult - updated schemeOp" + << ", txId# " << op->GetTxId() + << ", success# " << schemeOp->Success); } else { Y_ENSURE(false, "Cannot find schema tx: " << op->GetTxId()); } break; case EExportOutcome::Aborted: + LOG_INFO_S(ctx, NKikimrServices::DATASHARD_BACKUP, + "TBackupUnit::ProcessResult - aborted" + << ", txId# " << op->GetTxId()); done = false; break; } @@ -140,6 +192,11 @@ class TBackupUnit : public TBackupRestoreUnitBase op->SetScanResult(nullptr); tx->SetScanTask(0); + LOG_INFO_S(ctx, NKikimrServices::DATASHARD_BACKUP, + "TBackupUnit::ProcessResult - done" + << ", txId# " << op->GetTxId() + << ", done# " << done); + return done; } diff --git a/ydb/core/tx/datashard/export_fs.h b/ydb/core/tx/datashard/export_fs.h new file mode 100644 index 000000000000..b61005e7d451 --- /dev/null +++ b/ydb/core/tx/datashard/export_fs.h @@ -0,0 +1,31 @@ +#pragma once + +#include "export_iface.h" + +namespace NKikimr { +namespace NDataShard { + +class TFsExport: public IExport { +public: + explicit TFsExport(const TTask& task, const TTableColumns& columns) + : Task(task) + , Columns(columns) + { + Y_ENSURE(task.HasFSSettings()); + } + + IActor* CreateUploader(const TActorId& dataShard, ui64 txId) const override; + + IBuffer* CreateBuffer() const override; + + void Shutdown() const override {} + +protected: + const TTask Task; + const TTableColumns Columns; +}; + +} // NDataShard +} // NKikimr + + diff --git a/ydb/core/tx/datashard/export_fs_uploader.cpp b/ydb/core/tx/datashard/export_fs_uploader.cpp new file mode 100644 index 000000000000..38e9e1528039 --- /dev/null +++ b/ydb/core/tx/datashard/export_fs_uploader.cpp @@ -0,0 +1,612 @@ +#include "export_common.h" +#include "export_fs.h" +#include "export_s3_buffer.h" +#include "backup_restore_traits.h" + +#include +#include +#include +#include +#include +#include 
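// The write path of this uploader boils down to the following pattern (illustrative
// sketch only; it reuses just the util calls that WriteFile/AppendFile below already use):
//
//   void WriteOrAppend(const TString& path, TStringBuf data, bool append) {
//       TFsPath(path).Parent().MkDirs();                        // create parent dirs lazily
//       const auto flags = append ? (OpenAlways | WrOnly | ForAppend)
//                                 : (CreateAlways | WrOnly);    // truncate on first write
//       TFile file(path, flags);
//       file.Flock(LOCK_EX);                                    // serialize concurrent writers
//       file.Write(data.data(), data.size());
//   }
//
// Metadata, scheme, permissions and changefeed descriptions are written in truncate mode;
// the per-shard CSV data file is appended buffer-by-buffer as the export scan feeds it.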
+#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +namespace NKikimr { +namespace NDataShard { + +using namespace NBackup; +using namespace NBackupRestoreTraits; + +class TFsSettings { +public: + const TString BasePath; // Base path on filesystem (e.g., /mnt/exports) + const TString RelativePath; // Relative path for this export item + const ui32 Shard; + + explicit TFsSettings(const NKikimrSchemeOp::TFSSettings& settings, ui32 shard) + : BasePath(settings.GetBasePath()) + , RelativePath(settings.GetPath()) + , Shard(shard) + { + } + + static TFsSettings FromBackupTask(const NKikimrSchemeOp::TBackupTask& task) { + return TFsSettings(task.GetFSSettings(), task.GetShardNum()); + } + + TString GetFullPath() const { + return TFsPath(BasePath) / RelativePath; + } + + TString GetPermissionsPath() const { + return TFsPath(GetFullPath()) / PermissionsKeySuffix(false); + } + + TString GetMetadataPath() const { + return TFsPath(GetFullPath()) / MetadataKeySuffix(false); + } + + TString GetSchemePath() const { + return TFsPath(GetFullPath()) / SchemeKeySuffix(false); + } + + TString GetDataPath(EDataFormat format, ECompressionCodec codec) const { + return TFsPath(GetFullPath()) / DataKeySuffix(Shard, format, codec, false); + } + + TString GetChangefeedPath(const TString& changefeedPrefix) const { + return TFsPath(GetFullPath()) / changefeedPrefix / ChangefeedKeySuffix(false); + } + + TString GetTopicPath(const TString& changefeedPrefix) const { + return TFsPath(GetFullPath()) / changefeedPrefix / TopicKeySuffix(false); + } +}; + +struct TChangefeedExportDescriptions { + const Ydb::Table::ChangefeedDescription ChangefeedDescription; + const Ydb::Topic::DescribeTopicResult Topic; + TString Name; + TString Prefix; +}; + +class TFsUploader: public TActorBootstrapped { + using TEvBuffer = TEvExportScan::TEvBuffer; + + bool WriteFile(const TString& path, const TStringBuf& data, TString& error, bool isAppend = false) { + try { + TFsPath fsPath(path); + fsPath.Parent().MkDirs(); + + auto flags = CreateAlways | WrOnly; + if (isAppend) { + flags = OpenAlways | WrOnly | ForAppend; + } + TFile file(path, flags); + file.Flock(LOCK_EX); + file.Write(data.data(), data.size()); + file.Close(); + + EXPORT_LOG_D("WriteFile succeeded" + << ": self# " << SelfId() + << ", path# " << path + << ", size# " << data.size()); + + return true; + } catch (const std::exception& ex) { + error = TStringBuilder() << "Failed to write file " << path << ": " << ex.what(); + return false; + } + } + + bool AppendFile(const TString& path, const TStringBuf& data, TString& error) { + return WriteFile(path, data, error, true); + } + + bool WriteMessage(const google::protobuf::Message& message, const TString& path, TString& error) { + TString data; + google::protobuf::TextFormat::PrintToString(message, &data); + return WriteFile(path, data, error); + } + + bool WriteFileWithChecksum(const TString& path, const TString& data, TString& error) { + if (!WriteFile(path, data, error)) { + return false; + } + + if (EnableChecksums) { + TString checksum = ComputeChecksum(data); + TFsPath fsPath(path); + TString filename = fsPath.GetName(); + checksum += ' ' + filename; + + TString checksumPath = ChecksumKey(path); + if (!WriteFile(checksumPath, checksum, error)) { + return false; + } + } + + return true; + } + + bool WriteMessageWithChecksum(const google::protobuf::Message& message, const TString& path, TString& error) { + TString data; + 
google::protobuf::TextFormat::PrintToString(message, &data); + return WriteFileWithChecksum(path, data, error); + } + + void UploadMetadata() { + EXPORT_LOG_I("UploadMetadata started" + << ": self# " << SelfId() + << ", path# " << Settings.GetMetadataPath() + << ", metadataSize# " << Metadata.size()); + + TString error; + if (!WriteFileWithChecksum(Settings.GetMetadataPath(), Metadata, error)) { + return Finish(false, error); + } + + MetadataUploaded = true; + EXPORT_LOG_I("UploadMetadata completed" + << ": self# " << SelfId() + << ", enablePermissions# " << EnablePermissions); + + if (EnablePermissions) { + UploadPermissions(); + } else { + UploadScheme(); + } + } + + void UploadPermissions() { + EXPORT_LOG_I("UploadPermissions started" + << ": self# " << SelfId() + << ", path# " << Settings.GetPermissionsPath() + << ", hasPermissions# " << Permissions.Defined()); + + if (!Permissions) { + return Finish(false, "Cannot infer permissions"); + } + + TString error; + if (!WriteMessageWithChecksum(Permissions.GetRef(), Settings.GetPermissionsPath(), error)) { + return Finish(false, error); + } + + PermissionsUploaded = true; + EXPORT_LOG_I("UploadPermissions completed" + << ": self# " << SelfId()); + UploadScheme(); + } + + void UploadScheme() { + EXPORT_LOG_I("UploadScheme started" + << ": self# " << SelfId() + << ", path# " << Settings.GetSchemePath() + << ", hasScheme# " << Scheme.Defined()); + + if (!Scheme) { + return Finish(false, "Cannot infer scheme"); + } + + TString error; + if (!WriteMessageWithChecksum(Scheme.GetRef(), Settings.GetSchemePath(), error)) { + return Finish(false, error); + } + + SchemeUploaded = true; + EXPORT_LOG_I("UploadScheme completed" + << ": self# " << SelfId()); + UploadChangefeeds(); + } + + void UploadChangefeeds() { + EXPORT_LOG_I("UploadChangefeeds started" + << ": self# " << SelfId() + << ", total# " << Changefeeds.size()); + + for (const auto& desc : Changefeeds) { + EXPORT_LOG_I("UploadChangefeeds processing changefeed" + << ": self# " << SelfId() + << ", name# " << desc.Name + << ", prefix# " << desc.Prefix); + + TString error; + + if (!WriteMessageWithChecksum(desc.ChangefeedDescription, Settings.GetChangefeedPath(desc.Prefix), error)) { + return Finish(false, error); + } + + if (!WriteMessageWithChecksum(desc.Topic, Settings.GetTopicPath(desc.Prefix), error)) { + return Finish(false, error); + } + } + + ChangefeedsUploaded = true; + EXPORT_LOG_I("UploadChangefeeds completed" + << ": self# " << SelfId() + << ", scanner# " << Scanner); + + StartDataUpload(); + } + + void StartDataUpload() { + EXPORT_LOG_I("StartDataUpload" + << ": self# " << SelfId() + << ", scanner# " << Scanner + << ", dataPath# " << Settings.GetDataPath(EDataFormat::Csv, CompressionCodec)); + + Become(&TThis::StateUploadData); + + if (Scanner) { + EXPORT_LOG_I("StartDataUpload: scanner ready, requesting data" + << ": self# " << SelfId()); + Send(Scanner, new TEvExportScan::TEvFeed()); + } else { + EXPORT_LOG_I("StartDataUpload: waiting for scanner" + << ": self# " << SelfId()); + } + } + + void Handle(TEvExportScan::TEvReady::TPtr& ev) { + EXPORT_LOG_I("Handle TEvExportScan::TEvReady" + << ": self# " << SelfId() + << ", sender# " << ev->Sender + << ", metadataUploaded# " << MetadataUploaded + << ", schemeUploaded# " << SchemeUploaded + << ", permissionsUploaded# " << PermissionsUploaded + << ", changefeedsUploaded# " << ChangefeedsUploaded + << ", error# " << Error.GetOrElse("none")); + + Scanner = ev->Sender; + + if (Error) { + return PassAway(); + } + + const bool permissionsDone 
= !EnablePermissions || PermissionsUploaded; + if (SchemeUploaded && MetadataUploaded && permissionsDone && ChangefeedsUploaded) { + StartDataUpload(); + } else { + EXPORT_LOG_I("Handle TEvReady: waiting for uploads to complete" + << ": self# " << SelfId()); + } + } + + void HandleDataReady(TEvExportScan::TEvReady::TPtr& ev) { + EXPORT_LOG_I("HandleDataReady" + << ": self# " << SelfId() + << ", sender# " << ev->Sender); + + Scanner = ev->Sender; + + if (Error) { + return PassAway(); + } + + Send(Scanner, new TEvExportScan::TEvFeed()); + } + + void Handle(TEvBuffer::TPtr& ev) { + EXPORT_LOG_I("Handle TEvExportScan::TEvBuffer" + << ": self# " << SelfId() + << ", sender# " << ev->Sender + << ", isScanner# " << (ev->Sender == Scanner) + << ", last# " << ev->Get()->Last + << ", bufferSize# " << ev->Get()->Buffer.Size() + << ", msg# " << ev->Get()->ToString()); + + if (ev->Sender != Scanner) { + EXPORT_LOG_W("Handle TEvBuffer: ignoring buffer from unknown sender" + << ": self# " << SelfId() + << ", sender# " << ev->Sender + << ", scanner# " << Scanner); + return; + } + + auto& buffer = ev->Get()->Buffer; + const TString dataPath = Settings.GetDataPath(EDataFormat::Csv, CompressionCodec); + + if (buffer.Size() > 0) { + TString error; + if (!AppendFile(dataPath, TStringBuf(buffer.Data(), buffer.Size()), error)) { + return Finish(false, error); + } + DataBytesWritten += buffer.Size(); + } + + if (ev->Get()->Last) { + EXPORT_LOG_I("Handle TEvBuffer: last buffer received" + << ": self# " << SelfId() + << ", totalBytesWritten# " << DataBytesWritten + << ", checksum# " << ev->Get()->Checksum); + + if (EnableChecksums && !ev->Get()->Checksum.empty()) { + TString checksumPath = ChecksumKey(dataPath); + TFsPath fsPath(dataPath); + TString checksumContent = ev->Get()->Checksum + ' ' + fsPath.GetName(); + + TString error; + if (!WriteFile(checksumPath, checksumContent, error)) { + return Finish(false, error); + } + } + + Finish(true); + } else { + EXPORT_LOG_I("Handle TEvBuffer: requesting more data" + << ": self# " << SelfId() + << ", bytesWrittenSoFar# " << DataBytesWritten); + Send(Scanner, new TEvExportScan::TEvFeed()); + } + } + + void Finish(bool success = true, const TString& error = TString()) { + EXPORT_LOG_I("Finish" + << ": self# " << SelfId() + << ", success# " << success + << ", error# " << error); + + if (!success) { + Error = error; + } + + if (Scanner) { + Send(Scanner, new TEvExportScan::TEvFinish(success, error)); + } + + PassAway(); + } + + void PassAway() override { + if (Scanner && Error) { + Send(Scanner, new TEvExportScan::TEvFinish(false, Error.GetOrElse(TString()))); + } + + IActor::PassAway(); + } + +public: + static constexpr NKikimrServices::TActivity::EType ActorActivityType() { + return NKikimrServices::TActivity::EXPORT_S3_UPLOADER_ACTOR; // Reuse existing activity type + } + + static constexpr TStringBuf LogPrefix() { + return "fs"sv; + } + + explicit TFsUploader( + const TActorId& dataShard, ui64 txId, + const NKikimrSchemeOp::TBackupTask& task, + TMaybe&& scheme, + TVector changefeeds, + TMaybe&& permissions, + TString&& metadata, + ECompressionCodec compressionCodec) + : Settings(TFsSettings::FromBackupTask(task)) + , DataShard(dataShard) + , TxId(txId) + , Scheme(std::move(scheme)) + , Changefeeds(std::move(changefeeds)) + , Metadata(std::move(metadata)) + , Permissions(std::move(permissions)) + , Retries(task.GetNumberOfRetries()) + , CompressionCodec(compressionCodec) + , SchemeUploaded(task.GetShardNum() == 0 ? 
false : true) + , ChangefeedsUploaded(task.GetShardNum() == 0 ? false : true) + , MetadataUploaded(task.GetShardNum() == 0 ? false : true) + , PermissionsUploaded(task.GetShardNum() == 0 ? false : true) + , EnableChecksums(task.GetEnableChecksums()) + , EnablePermissions(task.GetEnablePermissions()) + { + Y_UNUSED(DataShard); + Y_UNUSED(TxId); + Y_UNUSED(Retries); + } + + void Bootstrap() { + EXPORT_LOG_I("Bootstrap" + << ": self# " << SelfId() + << ", shardNum# " << Settings.Shard + << ", basePath# " << Settings.BasePath + << ", relativePath# " << Settings.RelativePath + << ", metadataUploaded# " << MetadataUploaded + << ", schemeUploaded# " << SchemeUploaded + << ", permissionsUploaded# " << PermissionsUploaded + << ", changefeedsUploaded# " << ChangefeedsUploaded); + + if (!MetadataUploaded) { + EXPORT_LOG_I("Starting metadata upload" + << ": self# " << SelfId()); + UploadMetadata(); + } else { + EXPORT_LOG_I("Waiting for scanner" + << ": self# " << SelfId()); + Become(&TThis::StateWaitForScanner); + } + } + + STATEFN(StateBase) { + EXPORT_LOG_D("StateBase received event" + << ": self# " << SelfId() + << ", type# " << ev->GetTypeRewrite()); + switch (ev->GetTypeRewrite()) { + hFunc(TEvExportScan::TEvReady, Handle); + + sFunc(TEvents::TEvWakeup, Bootstrap); + sFunc(TEvents::TEvPoisonPill, PassAway); + default: + EXPORT_LOG_W("StateBase unhandled event" + << ": self# " << SelfId() + << ", type# " << ev->GetTypeRewrite()); + break; + } + } + + STATEFN(StateWaitForScanner) { + EXPORT_LOG_D("StateWaitForScanner received event" + << ": self# " << SelfId() + << ", type# " << ev->GetTypeRewrite()); + switch (ev->GetTypeRewrite()) { + hFunc(TEvExportScan::TEvReady, Handle); + hFunc(TEvBuffer, Handle); + + sFunc(TEvents::TEvPoisonPill, PassAway); + default: + EXPORT_LOG_W("StateWaitForScanner unhandled event" + << ": self# " << SelfId() + << ", type# " << ev->GetTypeRewrite()); + break; + } + } + + STATEFN(StateUploadData) { + EXPORT_LOG_D("StateUploadData received event" + << ": self# " << SelfId() + << ", type# " << ev->GetTypeRewrite()); + switch (ev->GetTypeRewrite()) { + hFunc(TEvExportScan::TEvReady, HandleDataReady); + hFunc(TEvBuffer, Handle); + + sFunc(TEvents::TEvPoisonPill, PassAway); + default: + EXPORT_LOG_W("StateUploadData unhandled event" + << ": self# " << SelfId() + << ", type# " << ev->GetTypeRewrite()); + break; + } + } + +private: + TFsSettings Settings; + + const TActorId DataShard; + const ui64 TxId; + const TMaybe Scheme; + const TVector Changefeeds; + const TString Metadata; + const TMaybe Permissions; + + const ui32 Retries; + const ECompressionCodec CompressionCodec; + ui64 DataBytesWritten = 0; + + TActorId Scanner; + bool SchemeUploaded; + bool ChangefeedsUploaded; + bool MetadataUploaded; + bool PermissionsUploaded; + TMaybe Error; + + bool EnableChecksums; + bool EnablePermissions; + +}; // TFsUploader + +IActor* TFsExport::CreateUploader(const TActorId& dataShard, ui64 txId) const { + auto scheme = (Task.GetShardNum() == 0) + ? GenYdbScheme(Columns, Task.GetTable()) + : Nothing(); + + TMetadata metadata; + metadata.SetVersion(Task.GetEnableChecksums() ? 
1 : 0); + metadata.SetEnablePermissions(Task.GetEnablePermissions()); + + TVector changefeeds; + if (AppData()->FeatureFlags.GetEnableChangefeedsExport()) { + const auto& persQueues = Task.GetChangefeedUnderlyingTopics(); + const auto& cdcStreams = Task.GetTable().GetTable().GetCdcStreams(); + Y_ASSERT(persQueues.size() == cdcStreams.size()); + + const int changefeedsCount = cdcStreams.size(); + changefeeds.reserve(changefeedsCount); + + for (int i = 0; i < changefeedsCount; ++i) { + Ydb::Table::ChangefeedDescription changefeed; + const auto& cdcStream = cdcStreams.at(i); + FillChangefeedDescription(changefeed, cdcStream); + + Ydb::Topic::DescribeTopicResult topic; + const auto& pq = persQueues.at(i); + Ydb::StatusIds::StatusCode status; + TString error; + FillTopicDescription(topic, pq.GetPersQueueGroup(), pq.GetSelf(), cdcStream.GetName(), status, error); + // Unnecessary fields + topic.clear_self(); + topic.clear_topic_stats(); + + auto& descr = changefeeds.emplace_back(changefeed, topic); + descr.Name = descr.ChangefeedDescription.name(); + descr.Prefix = descr.Name; + + metadata.AddChangefeed(TChangefeedMetadata{ + .ExportPrefix = descr.Prefix, + .Name = descr.Name, + }); + } + } + + auto permissions = (Task.GetEnablePermissions() && Task.GetShardNum() == 0) + ? GenYdbPermissions(Task.GetTable()) + : Nothing(); + + TFullBackupMetadata::TPtr backup = new TFullBackupMetadata{ + .SnapshotVts = TVirtualTimestamp( + Task.GetSnapshotStep(), + Task.GetSnapshotTxId()) + }; + metadata.AddFullBackup(backup); + + return new TFsUploader( + dataShard, txId, Task, std::move(scheme), std::move(changefeeds), std::move(permissions), + metadata.Serialize(), CodecFromTask(Task)); +} + +IExport::IBuffer* TFsExport::CreateBuffer() const { + using namespace NBackupRestoreTraits; + + const auto& scanSettings = Task.GetScanSettings(); + const ui64 maxRows = scanSettings.GetRowsBatchSize() ? 
scanSettings.GetRowsBatchSize() : Max(); + const ui64 maxBytes = scanSettings.GetBytesBatchSize(); + + TS3ExportBufferSettings bufferSettings; + bufferSettings + .WithColumns(Columns) + .WithMaxRows(maxRows) + .WithMaxBytes(maxBytes) + .WithMinBytes(0); // No minimum for filesystem + + if (Task.GetEnableChecksums()) { + bufferSettings.WithChecksum(TS3ExportBufferSettings::Sha256Checksum()); + } + + switch (CodecFromTask(Task)) { + case ECompressionCodec::None: + break; + case ECompressionCodec::Zstd: + bufferSettings + .WithCompression(TS3ExportBufferSettings::ZstdCompression(Task.GetCompression().GetLevel())); + break; + case ECompressionCodec::Invalid: + Y_ENSURE(false, "unreachable"); + } + + return CreateS3ExportBuffer(std::move(bufferSettings)); +} + +} // NDataShard +} // NKikimr diff --git a/ydb/core/tx/datashard/export_iface.h b/ydb/core/tx/datashard/export_iface.h index 54e45ba39bb9..b34bc0e26d1c 100644 --- a/ydb/core/tx/datashard/export_iface.h +++ b/ydb/core/tx/datashard/export_iface.h @@ -27,6 +27,7 @@ class IExportFactory { virtual IExport* CreateExportToYt(const IExport::TTask& task, const IExport::TTableColumns& columns) const = 0; virtual IExport* CreateExportToS3(const IExport::TTask& task, const IExport::TTableColumns& columns) const = 0; + virtual IExport* CreateExportToFs(const IExport::TTask& task, const IExport::TTableColumns& columns) const = 0; virtual void Shutdown() = 0; }; diff --git a/ydb/core/tx/datashard/export_scan.cpp b/ydb/core/tx/datashard/export_scan.cpp index 0275c392c1d4..864e2a8fdfd1 100644 --- a/ydb/core/tx/datashard/export_scan.cpp +++ b/ydb/core/tx/datashard/export_scan.cpp @@ -73,19 +73,42 @@ class TExportScan: private NActors::IActorCallback, public IActorExceptionHandle } void MaybeReady() { + EXPORT_LOG_I("MaybeReady" + << ": self# " << SelfId() + << ", uploader# " << Uploader + << ", isRegistered# " << State.Test(ES_REGISTERED) + << ", isInitialized# " << State.Test(ES_INITIALIZED) + << ", isReady# " << IsReady()); if (IsReady()) { + EXPORT_LOG_I("MaybeReady - sending TEvReady to uploader" + << ": self# " << SelfId() + << ", uploader# " << Uploader); Send(Uploader, new TEvExportScan::TEvReady()); } } EScan MaybeSendBuffer() { const bool noMoreData = State.Test(ES_NO_MORE_DATA); + const bool bufferFilled = Buffer->IsFilled(); + const bool uploaderReady = State.Test(ES_UPLOADER_READY); + const bool bufferSent = State.Test(ES_BUFFER_SENT); + + EXPORT_LOG_D("MaybeSendBuffer" + << ": self# " << SelfId() + << ", noMoreData# " << noMoreData + << ", bufferFilled# " << bufferFilled + << ", uploaderReady# " << uploaderReady + << ", bufferSent# " << bufferSent); - if (!noMoreData && !Buffer->IsFilled()) { + if (!noMoreData && !bufferFilled) { return EScan::Feed; } - if (!State.Test(ES_UPLOADER_READY) || State.Test(ES_BUFFER_SENT)) { + if (!uploaderReady || bufferSent) { + EXPORT_LOG_D("MaybeSendBuffer - sleeping" + << ": self# " << SelfId() + << ", uploaderReady# " << uploaderReady + << ", bufferSent# " << bufferSent); Spent->Alter(false); return EScan::Sleep; } @@ -96,14 +119,25 @@ class TExportScan: private NActors::IActorCallback, public IActorExceptionHandle if (!ev) { Success = false; Error = Buffer->GetError(); + EXPORT_LOG_E("MaybeSendBuffer - failed to prepare event" + << ": self# " << SelfId() + << ", error# " << Error); return EScan::Final; } + EXPORT_LOG_I("MaybeSendBuffer - sending buffer to uploader" + << ": self# " << SelfId() + << ", uploader# " << Uploader + << ", noMoreData# " << noMoreData + << ", rows# " << stats.Rows + << ", bytesRead# 
" << stats.BytesRead); Send(Uploader, std::move(ev)); State.Set(ES_BUFFER_SENT); Stats->Aggr(stats); if (noMoreData) { + EXPORT_LOG_I("MaybeSendBuffer - no more data, sleeping" + << ": self# " << SelfId()); Spent->Alter(false); return EScan::Sleep; } @@ -114,7 +148,7 @@ class TExportScan: private NActors::IActorCallback, public IActorExceptionHandle void Handle(TEvExportScan::TEvReset::TPtr&) { Y_ENSURE(IsReady()); - EXPORT_LOG_D("Handle TEvExportScan::TEvReset" + EXPORT_LOG_I("Handle TEvExportScan::TEvReset" << ": self# " << SelfId()); Stats.Reset(new TStats); @@ -126,8 +160,9 @@ class TExportScan: private NActors::IActorCallback, public IActorExceptionHandle void Handle(TEvExportScan::TEvFeed::TPtr&) { Y_ENSURE(IsReady()); - EXPORT_LOG_D("Handle TEvExportScan::TEvFeed" - << ": self# " << SelfId()); + EXPORT_LOG_I("Handle TEvExportScan::TEvFeed" + << ": self# " << SelfId() + << ", uploader# " << Uploader); State.Set(ES_UPLOADER_READY).Reset(ES_BUFFER_SENT); Spent->Alter(true); @@ -139,12 +174,17 @@ class TExportScan: private NActors::IActorCallback, public IActorExceptionHandle void Handle(TEvExportScan::TEvFinish::TPtr& ev) { Y_ENSURE(IsReady()); - EXPORT_LOG_D("Handle TEvExportScan::TEvFinish" + EXPORT_LOG_I("Handle TEvExportScan::TEvFinish" << ": self# " << SelfId() + << ", sender# " << ev->Sender + << ", success# " << ev->Get()->Success + << ", error# " << ev->Get()->Error << ", msg# " << ev->Get()->ToString()); Success = ev->Get()->Success; Error = ev->Get()->Error; + EXPORT_LOG_I("Handle TEvFinish - touching driver with Final" + << ": self# " << SelfId()); Driver->Touch(EScan::Final); } @@ -184,17 +224,27 @@ class TExportScan: private NActors::IActorCallback, public IActorExceptionHandle } void Registered(TActorSystem* sys, const TActorId&) override { + EXPORT_LOG_I("Registered - creating uploader" + << ": self# " << SelfId()); Uploader = sys->Register(CreateUploaderFn(), TMailboxType::HTSwap, AppData()->BatchPoolId); + EXPORT_LOG_I("Registered - uploader created" + << ": self# " << SelfId() + << ", uploader# " << Uploader); State.Set(ES_REGISTERED); MaybeReady(); } EScan Seek(TLead& lead, ui64) override { + EXPORT_LOG_I("Seek called" + << ": self# " << SelfId() + << ", uploader# " << Uploader); lead.To(Scheme->Tags(), {}, ESeek::Lower); Buffer->Clear(); State.Set(ES_INITIALIZED); + EXPORT_LOG_I("Seek - set initialized, calling MaybeReady" + << ": self# " << SelfId()); MaybeReady(); Spent->Alter(true); @@ -213,11 +263,19 @@ class TExportScan: private NActors::IActorCallback, public IActorExceptionHandle } EScan Exhausted() override { + EXPORT_LOG_I("Exhausted - no more data" + << ": self# " << SelfId() + << ", uploader# " << Uploader); State.Set(ES_NO_MORE_DATA); return MaybeSendBuffer(); } TAutoPtr Finish(EStatus status) override { + EXPORT_LOG_I("Finish called" + << ": self# " << SelfId() + << ", status# " << static_cast(status) + << ", success# " << Success + << ", error# " << Error); auto outcome = EExportOutcome::Success; if (status != EStatus::Done) { outcome = status == EStatus::Exception @@ -227,6 +285,11 @@ class TExportScan: private NActors::IActorCallback, public IActorExceptionHandle outcome = EExportOutcome::Error; } + EXPORT_LOG_I("Finish - outcome determined" + << ": self# " << SelfId() + << ", outcome# " << static_cast(outcome) + << ", bytesRead# " << Stats->BytesRead + << ", rows# " << Stats->Rows); PassAway(); return new TExportScanProduct(outcome, Error, Stats->BytesRead, Stats->Rows); } diff --git a/ydb/core/tx/datashard/restore_unit.cpp 
b/ydb/core/tx/datashard/restore_unit.cpp index cc3990371c93..91fce89c3edf 100644 --- a/ydb/core/tx/datashard/restore_unit.cpp +++ b/ydb/core/tx/datashard/restore_unit.cpp @@ -48,6 +48,10 @@ class TRestoreUnit : public TBackupRestoreUnitBaseSetAsyncJobResult(new TImportJobProduct(true, TString(), 0, 0)); + break; + default: Abort(op, ctx, TStringBuilder() << "Unknown settings: " << static_cast(settingsKind)); return false; diff --git a/ydb/core/tx/datashard/ut_export/ya.make b/ydb/core/tx/datashard/ut_export/ya.make deleted file mode 100644 index 7e204cc7e216..000000000000 --- a/ydb/core/tx/datashard/ut_export/ya.make +++ /dev/null @@ -1,13 +0,0 @@ -UNITTEST_FOR(ydb/core/tx/datashard) - -PEERDIR( - ydb/core/testlib/default -) - -YQL_LAST_ABI_VERSION() - -SRCS( - export_s3_buffer_ut.cpp -) - -END() diff --git a/ydb/core/tx/datashard/ya.make b/ydb/core/tx/datashard/ya.make index 8e59941d595e..e8e88aede10c 100644 --- a/ydb/core/tx/datashard/ya.make +++ b/ydb/core/tx/datashard/ya.make @@ -153,6 +153,7 @@ SRCS( execution_unit_ctors.h execution_unit_kind.h export_common.cpp + export_fs_uploader.cpp export_iface.cpp export_iface.h export_scan.cpp diff --git a/ydb/core/tx/schemeshard/schemeshard__init.cpp b/ydb/core/tx/schemeshard/schemeshard__init.cpp index c26815a43e20..2a51a04d7057 100644 --- a/ydb/core/tx/schemeshard/schemeshard__init.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__init.cpp @@ -4541,9 +4541,7 @@ struct TSchemeShard::TTxInit : public TTransactionBase { rowset.GetValue()); TString peerName = rowset.GetValueOrDefault(); - Ydb::Import::ImportFromS3Settings settings; - Y_ABORT_UNLESS(ParseFromStringNoSizeLimit(settings, rowset.GetValue())); - + TString settings = rowset.GetValue(); TImportInfo::TPtr importInfo = new TImportInfo(id, uid, kind, settings, domainPathId, peerName); if (rowset.HaveValue()) { diff --git a/ydb/core/tx/schemeshard/schemeshard_audit_log.cpp b/ydb/core/tx/schemeshard/schemeshard_audit_log.cpp index 06540f533841..397c0b3e7f46 100644 --- a/ydb/core/tx/schemeshard/schemeshard_audit_log.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_audit_log.cpp @@ -346,6 +346,16 @@ template <> TParts ImportKindSpecificParts(const Ydb::Import::ImportFromFsSettin }; } +TParts ImportKindSpecificParts(const TImportInfo& info) { + switch (info.Kind) { + case TImportInfo::EKind::S3: + return ImportKindSpecificParts(info.GetS3Settings()); + case TImportInfo::EKind::FS: + return ImportKindSpecificParts(info.GetFsSettings()); + } + return {}; +} + } // anonymous namespace template @@ -437,11 +447,14 @@ void AuditLogExportEnd(const TExportInfo& info, TSchemeShard* SS) { proto.MutableExportToS3Settings()->clear_access_key(); proto.MutableExportToS3Settings()->clear_secret_key(); break; + case TExportInfo::EKind::FS: + Y_ABORT_UNLESS(proto.MutableExportToFsSettings()->ParseFromString(info.Settings)); + break; } _AuditLogXxportEnd(info, "EXPORT END", ExportKindSpecificParts(proto), SS); } void AuditLogImportEnd(const TImportInfo& info, TSchemeShard* SS) { - _AuditLogXxportEnd(info, "IMPORT END", ImportKindSpecificParts(info.Settings), SS); + _AuditLogXxportEnd(info, "IMPORT END", ImportKindSpecificParts(info), SS); } } diff --git a/ydb/core/tx/schemeshard/schemeshard_export.cpp b/ydb/core/tx/schemeshard/schemeshard_export.cpp index bbf417e1160b..52db1916b750 100644 --- a/ydb/core/tx/schemeshard/schemeshard_export.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_export.cpp @@ -140,6 +140,10 @@ void TSchemeShard::FromXxportInfo(NKikimrExport::TExport& exprt, const TExportIn 
exprt.MutableExportToS3Settings()->clear_access_key(); exprt.MutableExportToS3Settings()->clear_secret_key(); break; + + case TExportInfo::EKind::FS: + Y_ABORT_UNLESS(exprt.MutableExportToFsSettings()->ParseFromString(exportInfo.Settings)); + break; } } @@ -220,6 +224,8 @@ void TSchemeShard::PersistExportItemState(NIceDb::TNiceDb& db, const TExportInfo } void TSchemeShard::Handle(TEvExport::TEvCreateExportRequest::TPtr& ev, const TActorContext& ctx) { + LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "Handle TEvExport::TEvCreateExportRequest, txId# " << ev->Get()->Record.GetTxId()); Execute(CreateTxCreateExport(ev), ctx); } diff --git a/ydb/core/tx/schemeshard/schemeshard_export__create.cpp b/ydb/core/tx/schemeshard/schemeshard_export__create.cpp index 8df1b37af75c..84e9e87d7ee6 100644 --- a/ydb/core/tx/schemeshard/schemeshard_export__create.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_export__create.cpp @@ -148,6 +148,22 @@ struct TSchemeShard::TExport::TTxCreate: public TSchemeShard::TXxport::TTxBase { } break; + case NKikimrExport::TCreateExportRequest::kExportToFsSettings: + { + const auto& settings = request.GetRequest().GetExportToFsSettings(); + exportInfo = new TExportInfo(id, uid, TExportInfo::EKind::FS, settings, domainPath.Base()->PathId, request.GetPeerName()); + exportInfo->EnableChecksums = AppData()->FeatureFlags.GetEnableChecksumsExport(); + exportInfo->EnablePermissions = AppData()->FeatureFlags.GetEnablePermissionsExport(); + TString explain; + if (!FillItems(*exportInfo, settings, explain)) { + return Reply( + std::move(response), + Ydb::StatusIds::BAD_REQUEST, + TStringBuilder() << "Failed item check: " << explain + ); + } + } + break; default: Y_DEBUG_ABORT("Unknown export kind"); } diff --git a/ydb/core/tx/schemeshard/schemeshard_export__list.cpp b/ydb/core/tx/schemeshard/schemeshard_export__list.cpp index 2507bb6bfbfd..e3c21d95db56 100644 --- a/ydb/core/tx/schemeshard/schemeshard_export__list.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_export__list.cpp @@ -21,6 +21,8 @@ struct TSchemeShard::TExport::TTxList: public TSchemeShard::TXxport::TTxList< static bool TryParseKind(const TString& str, TExportInfo::EKind& parsed) { if (str == "export/s3") { parsed = TExportInfo::EKind::S3; + } else if (str == "export/fs") { + parsed = TExportInfo::EKind::FS; } else { // fallback to yt parsed = TExportInfo::EKind::YT; } diff --git a/ydb/core/tx/schemeshard/schemeshard_export_flow_proposals.cpp b/ydb/core/tx/schemeshard/schemeshard_export_flow_proposals.cpp index eeeb8bf79f21..00e419294151 100644 --- a/ydb/core/tx/schemeshard/schemeshard_export_flow_proposals.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_export_flow_proposals.cpp @@ -6,6 +6,7 @@ #include #include +#include #include #include @@ -256,6 +257,24 @@ THolder BackupPropose( } } break; + case TExportInfo::EKind::FS: + { + Ydb::Export::ExportToFsSettings exportSettings; + Y_ABORT_UNLESS(exportSettings.ParseFromString(exportInfo.Settings)); + + task.SetNumberOfRetries(exportSettings.number_of_retries()); + auto& backupSettings = *task.MutableFSSettings(); + backupSettings.SetBasePath(exportSettings.base_path()); + backupSettings.SetPath(exportSettings.items(itemIdx).destination_path()); + + if (const auto compression = exportSettings.compression()) { + Y_ABORT_UNLESS(FillCompression(*task.MutableCompression(), compression)); + } + + task.SetEnableChecksums(exportInfo.EnableChecksums); + task.SetEnablePermissions(exportInfo.EnablePermissions); + } + break; } return propose; diff --git 
a/ydb/core/tx/schemeshard/schemeshard_import.cpp b/ydb/core/tx/schemeshard/schemeshard_import.cpp index 02b48060d451..687a416b2407 100644 --- a/ydb/core/tx/schemeshard/schemeshard_import.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_import.cpp @@ -105,19 +105,26 @@ void TSchemeShard::FromXxportInfo(NKikimrImport::TImport& import, const TImportI } switch (importInfo.Kind) { - case TImportInfo::EKind::S3: - import.MutableImportFromS3Settings()->CopyFrom(importInfo.Settings); + case TImportInfo::EKind::S3: { + Ydb::Import::ImportFromS3Settings settings = importInfo.GetS3Settings(); + import.MutableImportFromS3Settings()->CopyFrom(settings); import.MutableImportFromS3Settings()->clear_access_key(); import.MutableImportFromS3Settings()->clear_secret_key(); break; } + case TImportInfo::EKind::FS: { + Ydb::Import::ImportFromFsSettings settings = importInfo.GetFsSettings(); + import.MutableImportFromFsSettings()->CopyFrom(settings); + break; + } + } } void TSchemeShard::PersistCreateImport(NIceDb::TNiceDb& db, const TImportInfo& importInfo) { db.Table().Key(importInfo.Id).Update( NIceDb::TUpdate(importInfo.Uid), NIceDb::TUpdate(static_cast(importInfo.Kind)), - NIceDb::TUpdate(importInfo.Settings.SerializeAsString()), + NIceDb::TUpdate(importInfo.SettingsSerialized), NIceDb::TUpdate(importInfo.DomainPathId.OwnerId), NIceDb::TUpdate(importInfo.DomainPathId.LocalPathId), NIceDb::TUpdate(importInfo.Items.size()), diff --git a/ydb/core/tx/schemeshard/schemeshard_import__create.cpp b/ydb/core/tx/schemeshard/schemeshard_import__create.cpp index 7cf443003295..9562fc1289ac 100644 --- a/ydb/core/tx/schemeshard/schemeshard_import__create.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_import__create.cpp @@ -208,6 +208,23 @@ struct TSchemeShard::TImport::TTxCreate: public TSchemeShard::TXxport::TTxBase { } break; + case NKikimrImport::TCreateImportRequest::kImportFromFsSettings: + { + const auto& settings = request.GetRequest().GetImportFromFsSettings(); + + importInfo = new TImportInfo(id, uid, TImportInfo::EKind::FS, settings, domainPath.Base()->PathId, request.GetPeerName()); + + if (request.HasUserSID()) { + importInfo->UserSID = request.GetUserSID(); + } + + TString explain; + if (!FillItems(*importInfo, settings, explain)) { + return Reply(std::move(response), Ydb::StatusIds::BAD_REQUEST, explain); + } + } + break; + default: Y_DEBUG_ABORT("Unknown import kind"); } @@ -267,23 +284,35 @@ struct TSchemeShard::TImport::TTxCreate: public TSchemeShard::TXxport::TTxBase { return true; } - template - bool FillItems(TImportInfo& importInfo, const TSettings& settings, TString& explain) { + // Common helper to validate destination path + bool ValidateAndAddDestinationPath(const TString& dstPath, THashSet& dstPaths, TString& explain) { + if (dstPath) { + if (!dstPaths.insert(NBackup::NormalizeItemPath(dstPath)).second) { + explain = TStringBuilder() << "Duplicate destination_path: " << dstPath; + return false; + } + + if (!ValidateImportDstPath(dstPath, Self, explain)) { + return false; + } + } + return true; + } + + // S3-specific FillItems + bool FillItems(TImportInfo& importInfo, const Ydb::Import::ImportFromS3Settings& settings, TString& explain) { THashSet dstPaths; importInfo.Items.reserve(settings.items().size()); for (ui32 itemIdx : xrange(settings.items().size())) { const TString& dstPath = settings.items(itemIdx).destination_path(); - if (dstPath) { - if (!dstPaths.insert(NBackup::NormalizeItemPath(dstPath)).second) { - explain = TStringBuilder() << "Duplicate destination_path: " << dstPath; - return 
false; - } + + if (!ValidateAndAddDestinationPath(dstPath, dstPaths, explain)) { + return false; + } - if (!ValidateImportDstPath(dstPath, Self, explain)) { - return false; - } - } else if (settings.source_prefix().empty()) { // Can not take path from schema mapping + if (!dstPath && settings.source_prefix().empty()) { + // Can not take path from schema mapping explain = "No common source prefix and item destination path set"; return false; } @@ -296,6 +325,37 @@ struct TSchemeShard::TImport::TTxCreate: public TSchemeShard::TXxport::TTxBase { return true; } + // FS-specific FillItems + bool FillItems(TImportInfo& importInfo, const Ydb::Import::ImportFromFsSettings& settings, TString& explain) { + THashSet dstPaths; + + importInfo.Items.reserve(settings.items().size()); + for (ui32 itemIdx : xrange(settings.items().size())) { + const TString& dstPath = settings.items(itemIdx).destination_path(); + + if (!ValidateAndAddDestinationPath(dstPath, dstPaths, explain)) { + return false; + } + + if (!dstPath) { + explain = "destination_path is required for FS import items"; + return false; + } + + const TString& srcPath = settings.items(itemIdx).source_path(); + if (!srcPath) { + explain = "source_path is required for FS import items"; + return false; + } + + auto& item = importInfo.Items.emplace_back(dstPath); + // For FS imports, source_path is the full relative path from base_path + item.SrcPath = NBackup::NormalizeItemPath(srcPath); + } + + return true; + } + }; // TTxCreate struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase { @@ -404,9 +464,14 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase LOG_I("TImport::TTxProgress: Get scheme" << ": info# " << importInfo->ToString() << ", item# " << item.ToString(itemIdx)); - - item.SchemeGetter = ctx.RegisterWithSameMailbox(CreateSchemeGetter(Self->SelfId(), importInfo, itemIdx, item.ExportItemIV)); - Self->RunningImportSchemeGetters.emplace(item.SchemeGetter); + + if (importInfo->Kind == TImportInfo::EKind::S3) { + item.SchemeGetter = ctx.RegisterWithSameMailbox(CreateSchemeGetter(Self->SelfId(), importInfo, itemIdx, item.ExportItemIV)); + Self->RunningImportSchemeGetters.emplace(item.SchemeGetter); + } else { + item.SchemeGetter = ctx.Register(CreateSchemeGetterFS(Self->SelfId(), importInfo, itemIdx), TMailboxType::Simple, AppData()->IOPoolId); + Self->RunningImportSchemeGetters.emplace(item.SchemeGetter); + } } void GetSchemaMapping(TImportInfo::TPtr importInfo, const TActorContext& ctx) { @@ -1080,8 +1145,12 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase } if (!importInfo->SchemaMapping->Items.empty()) { - if (importInfo->Settings.has_encryption_settings() != importInfo->SchemaMapping->Items[0].IV.Defined()) { - return CancelAndPersist(db, importInfo, -1, {}, "incorrect schema mapping"); + // TODO(st-shchetinin): Only S3 imports support schema mapping with encryption (add for FS) + if (importInfo->Kind == TImportInfo::EKind::S3) { + auto settings = importInfo->GetS3Settings(); + if (settings.has_encryption_settings() != importInfo->SchemaMapping->Items[0].IV.Defined()) { + return CancelAndPersist(db, importInfo, -1, {}, "incorrect schema mapping"); + } } } @@ -1468,6 +1537,10 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase if (!item.Table) { Y_ABORT("Create Scheme Object: schema objects are empty"); } + if (importInfo->Kind == TImportInfo::EKind::FS) { + item.State = EState::Done; + break; + } item.State = 
EState::Transferring; AllocateTxId(*importInfo, itemIdx); break; diff --git a/ydb/core/tx/schemeshard/schemeshard_import_flow_proposals.cpp b/ydb/core/tx/schemeshard/schemeshard_import_flow_proposals.cpp index fd87f36fa171..5cfce72b4e92 100644 --- a/ydb/core/tx/schemeshard/schemeshard_import_flow_proposals.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_import_flow_proposals.cpp @@ -5,6 +5,7 @@ #include #include +#include #include #include #include @@ -152,27 +153,29 @@ THolder RestoreTableDataPropose( task.SetTableName(dstPath.LeafName()); *task.MutableTableDescription() = RebuildTableDescription(GetTableDescription(ss, item.DstPathId), *item.Table); - if (importInfo.Settings.has_encryption_settings()) { - auto& taskEncryptionSettings = *task.MutableEncryptionSettings(); - *taskEncryptionSettings.MutableSymmetricKey() = importInfo.Settings.encryption_settings().symmetric_key(); - if (item.ExportItemIV) { - taskEncryptionSettings.SetIV(item.ExportItemIV->GetBinaryString()); - } - } - switch (importInfo.Kind) { case TImportInfo::EKind::S3: { - task.SetNumberOfRetries(importInfo.Settings.number_of_retries()); + auto settings = importInfo.GetS3Settings(); + + if (settings.has_encryption_settings()) { + auto& taskEncryptionSettings = *task.MutableEncryptionSettings(); + *taskEncryptionSettings.MutableSymmetricKey() = settings.encryption_settings().symmetric_key(); + if (item.ExportItemIV) { + taskEncryptionSettings.SetIV(item.ExportItemIV->GetBinaryString()); + } + } + + task.SetNumberOfRetries(settings.number_of_retries()); auto& restoreSettings = *task.MutableS3Settings(); - restoreSettings.SetEndpoint(importInfo.Settings.endpoint()); - restoreSettings.SetBucket(importInfo.Settings.bucket()); - restoreSettings.SetAccessKey(importInfo.Settings.access_key()); - restoreSettings.SetSecretKey(importInfo.Settings.secret_key()); + restoreSettings.SetEndpoint(settings.endpoint()); + restoreSettings.SetBucket(settings.bucket()); + restoreSettings.SetAccessKey(settings.access_key()); + restoreSettings.SetSecretKey(settings.secret_key()); restoreSettings.SetObjectKeyPattern(importInfo.GetItemSrcPrefix(itemIdx)); - restoreSettings.SetUseVirtualAddressing(!importInfo.Settings.disable_virtual_addressing()); + restoreSettings.SetUseVirtualAddressing(!settings.disable_virtual_addressing()); - switch (importInfo.Settings.scheme()) { + switch (settings.scheme()) { case Ydb::Import::ImportFromS3Settings::HTTP: restoreSettings.SetScheme(NKikimrSchemeOp::TS3Settings::HTTP); break; @@ -183,12 +186,26 @@ THolder RestoreTableDataPropose( Y_ABORT("Unknown scheme"); } - if (const auto region = importInfo.Settings.region()) { + if (const auto region = settings.region()) { restoreSettings.SetRegion(region); } if (!item.Metadata.HasVersion() || item.Metadata.GetVersion() > 0) { - task.SetValidateChecksums(!importInfo.Settings.skip_checksum_validation()); + task.SetValidateChecksums(!importInfo.GetSkipChecksumValidation()); + } + } + break; + + case TImportInfo::EKind::FS: + { + auto settings = importInfo.GetFsSettings(); + task.SetNumberOfRetries(settings.number_of_retries()); + auto& restoreSettings = *task.MutableFSSettings(); + restoreSettings.SetBasePath(settings.base_path()); + restoreSettings.SetPath(importInfo.GetItemSrcPrefix(itemIdx)); + + if (!item.Metadata.HasVersion() || item.Metadata.GetVersion() > 0) { + task.SetValidateChecksums(!importInfo.GetSkipChecksumValidation()); } } break; diff --git a/ydb/core/tx/schemeshard/schemeshard_import_getters.cpp b/ydb/core/tx/schemeshard/schemeshard_import_getters.cpp 
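// For reference, the FS import path above (the FS-specific FillItems plus the FS branch of
// RestoreTableDataPropose) expects a request of roughly this shape. Only the accessors
// actually used in this patch are shown; the generated setter names are assumed to follow
// the usual protoc conventions:
//
//   Ydb::Import::ImportFromFsSettings settings;
//   settings.set_base_path("/mnt/exports/backup_001");
//   auto& item = *settings.add_items();
//   item.set_source_path("Table1");                      // required, resolved relative to base_path
//   item.set_destination_path("/Root/restored/Table1");  // required for FS items
//   settings.set_skip_checksum_validation(false);
//   settings.set_no_acl(false);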
index ae8362e0aacd..be4e0f5d430e 100644 --- a/ydb/core/tx/schemeshard/schemeshard_import_getters.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_import_getters.cpp @@ -20,6 +20,9 @@ #include +#include +#include +#include #include #include @@ -44,10 +47,11 @@ struct TGetterSettings { static TGetterSettings FromImportInfo(const TImportInfo::TPtr& importInfo, TMaybe iv) { TGetterSettings settings; - settings.ExternalStorageConfig.reset(new NWrappers::NExternalStorage::TS3ExternalStorageConfig(importInfo->Settings)); - settings.Retries = importInfo->Settings.number_of_retries(); - if (importInfo->Settings.has_encryption_settings()) { - settings.Key = NBackup::TEncryptionKey(importInfo->Settings.encryption_settings().symmetric_key().key()); + Y_ABORT_UNLESS(importInfo->Kind == TImportInfo::EKind::S3); + settings.ExternalStorageConfig.reset(new NWrappers::NExternalStorage::TS3ExternalStorageConfig(importInfo->GetS3Settings())); + settings.Retries = importInfo->GetS3Settings().number_of_retries(); + if (importInfo->GetS3Settings().has_encryption_settings()) { + settings.Key = NBackup::TEncryptionKey(importInfo->GetS3Settings().encryption_settings().symmetric_key().key()); } settings.IV = std::move(iv); return settings; @@ -447,8 +451,11 @@ class TSchemeGetter: public TGetterFromS3 { LOG_T("Trying to parse metadata" << ": self# " << SelfId() << ", body# " << SubstGlobalCopy(content, "\n", "\\n")); - - item.Metadata = NBackup::TMetadata::Deserialize(content); + try { + item.Metadata = NBackup::TMetadata::Deserialize(content); + } catch (const std::exception& e) { + return Reply(Ydb::StatusIds::BAD_REQUEST, TStringBuilder() << "Failed to parse metadata: " << e.what()); + } if (item.Metadata.HasVersion() && item.Metadata.GetVersion() == 0) { NeedValidateChecksums = false; @@ -772,8 +779,8 @@ class TSchemeGetter: public TGetterFromS3 { , MetadataKey(MetadataKeyFromSettings(*ImportInfo, itemIdx)) , SchemeKey(SchemeKeyFromSettings(*ImportInfo, itemIdx, "scheme.pb")) , PermissionsKey(PermissionsKeyFromSettings(*ImportInfo, itemIdx)) - , NeedDownloadPermissions(!ImportInfo->Settings.no_acl()) - , NeedValidateChecksums(!ImportInfo->Settings.skip_checksum_validation()) + , NeedDownloadPermissions(!ImportInfo->GetNoAcl()) + , NeedValidateChecksums(!ImportInfo->GetSkipChecksumValidation()) { } @@ -851,15 +858,15 @@ class TSchemeGetter: public TGetterFromS3 { class TSchemaMappingGetter : public TGetterFromS3 { static TString MetadataKeyFromSettings(const TImportInfo& importInfo) { - return TStringBuilder() << importInfo.Settings.source_prefix() << "/metadata.json"; + return TStringBuilder() << importInfo.GetS3Settings().source_prefix() << "/metadata.json"; } static TString SchemaMappingKeyFromSettings(const TImportInfo& importInfo) { - return TStringBuilder() << importInfo.Settings.source_prefix() << "/SchemaMapping/mapping.json"; + return TStringBuilder() << importInfo.GetS3Settings().source_prefix() << "/SchemaMapping/mapping.json"; } static TString SchemaMappingMetadataKeyFromSettings(const TImportInfo& importInfo) { - return TStringBuilder() << importInfo.Settings.source_prefix() << "/SchemaMapping/metadata.json"; + return TStringBuilder() << importInfo.GetS3Settings().source_prefix() << "/SchemaMapping/metadata.json"; } void HandleMetadata(TEvExternalStorage::TEvHeadObjectResponse::TPtr& ev) { @@ -1402,10 +1409,155 @@ class TListObjectsInS3ExportGetter : public TGetterFromS3 { + + bool ProcessMetadata(const TString& content, TString& error) { + try { + ImportInfo->Items[ItemIdx].Metadata = 
NBackup::TMetadata::Deserialize(content); + return true; + } catch (const std::exception& e) { + error = TStringBuilder() << "Failed to parse metadata: " << e.what(); + return false; + } + } + + bool ProcessScheme(const TString& content, TString& error) { + auto& item = ImportInfo->Items[ItemIdx]; + + Ydb::Table::CreateTableRequest table; + if (table.ParseFromString(content)) { + item.Table = table; + return true; + } + + error = "Failed to parse scheme as table"; + return false; + } + + void ProcessPermissions(const TString& content) { + auto& item = ImportInfo->Items[ItemIdx]; + Ydb::Scheme::ModifyPermissionsRequest permissions; + if (permissions.ParseFromString(content)) { + item.Permissions = permissions; + } + } + + void Reply(bool success, const TString& errorMessage = {}) { + LOG_I("TSchemeGetterFS: Reply" + << ": self# " << SelfId() + << ", importId# " << ImportInfo->Id + << ", itemIdx# " << ItemIdx + << ", success# " << success + << ", error# " << errorMessage); + + Send(ReplyTo, new TEvPrivate::TEvImportSchemeReady(ImportInfo->Id, ItemIdx, success, errorMessage)); + PassAway(); + } + +public: + explicit TSchemeGetterFS(const TActorId& replyTo, TImportInfo::TPtr importInfo, ui32 itemIdx) + : ReplyTo(replyTo) + , ImportInfo(std::move(importInfo)) + , ItemIdx(itemIdx) + { + Y_ABORT_UNLESS(ImportInfo->Kind == TImportInfo::EKind::FS); + } + + void Bootstrap() { + const auto settings = ImportInfo->GetFsSettings(); + const TString basePath = settings.base_path(); + + Y_ABORT_UNLESS(ItemIdx < ImportInfo->Items.size()); + auto& item = ImportInfo->Items[ItemIdx]; + + TString sourcePath = item.SrcPath; + if (sourcePath.empty()) { + Reply(false, "Source path is empty for import item"); + return; + } + + const TFsPath itemPath = TFsPath(basePath) / sourcePath; + TString error; + + const TString metadataPath = itemPath / "metadata.json"; + TString metadataContent; + + if (!TFSHelper::ReadFile(metadataPath, metadataContent, error)) { + Reply(false, error); + return; + } + + if (!ProcessMetadata(metadataContent, error)) { + Reply(false, error); + return; + } + + const TString schemeFileName = NYdb::NDump::NFiles::TableScheme().FileName; + const TString schemePath = itemPath / schemeFileName; + TString schemeContent; + + if (!TFSHelper::ReadFile(schemePath, schemeContent, error)) { + Reply(false, error); + return; + } + + if (!ProcessScheme(schemeContent, error)) { + Reply(false, error); + return; + } + + if (!ImportInfo->GetNoAcl()) { + const TString permissionsPath = itemPath / "permissions.pb"; + TString permissionsContent; + + if (TFSHelper::ReadFile(permissionsPath, permissionsContent, error)) { + ProcessPermissions(permissionsContent); + } + } + + Reply(true); + } + +private: + const TActorId ReplyTo; + TImportInfo::TPtr ImportInfo; + const ui32 ItemIdx; +}; + IActor* CreateSchemeGetter(const TActorId& replyTo, TImportInfo::TPtr importInfo, ui32 itemIdx, TMaybe iv) { return new TSchemeGetter(replyTo, std::move(importInfo), itemIdx, std::move(iv)); } +IActor* CreateSchemeGetterFS(const TActorId& replyTo, TImportInfo::TPtr importInfo, ui32 itemIdx) { + return new TSchemeGetterFS(replyTo, std::move(importInfo), itemIdx); +} + IActor* CreateSchemaMappingGetter(const TActorId& replyTo, TImportInfo::TPtr importInfo) { return new TSchemaMappingGetter(replyTo, std::move(importInfo)); } diff --git a/ydb/core/tx/schemeshard/schemeshard_import_getters.h b/ydb/core/tx/schemeshard/schemeshard_import_getters.h index 1a5ae6f2cb9b..e8fb686e2cec 100644 --- 
a/ydb/core/tx/schemeshard/schemeshard_import_getters.h +++ b/ydb/core/tx/schemeshard/schemeshard_import_getters.h @@ -13,5 +13,7 @@ IActor* CreateSchemaMappingGetter(const TActorId& replyTo, TImportInfo::TPtr imp IActor* CreateListObjectsInS3ExportGetter(TEvImport::TEvListObjectsInS3ExportRequest::TPtr&& ev); +IActor* CreateSchemeGetterFS(const TActorId& replyTo, TImportInfo::TPtr importInfo, ui32 itemIdx); + } // NSchemeShard } // NKikimr diff --git a/ydb/core/tx/schemeshard/schemeshard_info_types.cpp b/ydb/core/tx/schemeshard/schemeshard_info_types.cpp index 56043b0ff0c7..7a9c39846929 100644 --- a/ydb/core/tx/schemeshard/schemeshard_info_types.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_info_types.cpp @@ -2960,14 +2960,17 @@ NProtoBuf::Timestamp SecondsToProtoTimeStamp(ui64 sec) { TImportInfo::TFillItemsFromSchemaMappingResult TImportInfo::FillItemsFromSchemaMapping(TSchemeShard* ss) { TFillItemsFromSchemaMappingResult result; + Y_ABORT_UNLESS(Kind == EKind::S3); + auto settings = GetS3Settings(); + TString dstRoot; - if (Settings.destination_path().empty()) { + if (settings.destination_path().empty()) { dstRoot = CanonizePath(ss->RootPathElements); } else { - dstRoot = CanonizePath(Settings.destination_path()); + dstRoot = CanonizePath(settings.destination_path()); } - TString sourcePrefix = NBackup::NormalizeExportPrefix(Settings.source_prefix()); + TString sourcePrefix = NBackup::NormalizeExportPrefix(settings.source_prefix()); if (sourcePrefix) { sourcePrefix.push_back('/'); } diff --git a/ydb/core/tx/schemeshard/schemeshard_info_types.h b/ydb/core/tx/schemeshard/schemeshard_info_types.h index 4bac42385e60..8a3ae5d0b50c 100644 --- a/ydb/core/tx/schemeshard/schemeshard_info_types.h +++ b/ydb/core/tx/schemeshard/schemeshard_info_types.h @@ -3,6 +3,7 @@ #include "olap/schema/schema.h" #include "olap/schema/update.h" #include "schemeshard_identificators.h" +#include "schemeshard_info_types_helper.h" #include "schemeshard_path_element.h" #include "schemeshard_schema.h" #include "schemeshard_tx_infly.h" @@ -2858,6 +2859,7 @@ struct TExportInfo: public TSimpleRefCount { enum class EKind: ui8 { YT = 0, S3, + FS, }; struct TItem { @@ -3030,6 +3032,7 @@ struct TImportInfo: public TSimpleRefCount { enum class EKind: ui8 { S3 = 0, + FS = 1, }; struct TItem { @@ -3089,7 +3092,9 @@ struct TImportInfo: public TSimpleRefCount { ui64 Id; // TxId from the original TEvCreateImportRequest TString Uid; EKind Kind; - Ydb::Import::ImportFromS3Settings Settings; + const TString SettingsSerialized; + std::variant Settings; TPathId DomainPathId; TMaybe UserSID; TString PeerName; // required for making audit log records @@ -3108,6 +3113,21 @@ struct TImportInfo: public TSimpleRefCount { TInstant StartTime = TInstant::Zero(); TInstant EndTime = TInstant::Zero(); +private: + template + static TString SerializeSettings(const TSettingsPB& settings) { + TString serialized; + Y_ABORT_UNLESS(settings.SerializeToString(&serialized)); + return serialized; + } + + template + auto Visit(TFunc&& func) const { + return VisitSettings(Settings, std::forward(func)); + } + +public: + TString GetItemSrcPrefix(size_t i) const { if (i < Items.size() && Items[i].SrcPrefix) { return Items[i].SrcPrefix; @@ -3115,29 +3135,84 @@ struct TImportInfo: public TSimpleRefCount { // Backward compatibility. // But there can be no paths in settings at all. 
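// The fallback below now goes through the settings variant: Visit() dispatches to the concrete
// settings type, and GetItemSource() (see schemeshard_info_types_helper.h) picks
// items(i).source_prefix() for S3 settings and items(i).source_path() for FS settings.
// Hypothetical call site, shown only for illustration:
//   const TString src = importInfo->GetItemSrcPrefix(itemIdx); // kind-agnostic source lookup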
- if (i < ui32(Settings.items_size())) { - return Settings.items(i).source_prefix(); - } + return Visit([i](const auto& settings) -> TString { + // using T = std::decay_t; + return GetItemSource(settings, i); + }); + } + + Ydb::Import::ImportFromS3Settings GetS3Settings() const { + Y_ABORT_UNLESS(Kind == EKind::S3); + return std::get(Settings); + } + + Ydb::Import::ImportFromFsSettings GetFsSettings() const { + Y_ABORT_UNLESS(Kind == EKind::FS); + return std::get(Settings); + } - return {}; + // Getters for common settings fields + bool GetNoAcl() const { + return Visit([](const auto& settings) { + return settings.no_acl(); + }); + } + + bool GetSkipChecksumValidation() const { + return Visit([](const auto& settings) { + return settings.skip_checksum_validation(); + }); } explicit TImportInfo( const ui64 id, const TString& uid, const EKind kind, - const Ydb::Import::ImportFromS3Settings& settings, + const TString& serializedSettings, const TPathId domainPathId, const TString& peerName) : Id(id) , Uid(uid) , Kind(kind) - , Settings(settings) + , SettingsSerialized(serializedSettings) , DomainPathId(domainPathId) , PeerName(peerName) { + // Parse settings from serialized string based on import kind. + switch (kind) { + case EKind::S3: { + Settings = ParseSettings(serializedSettings); + break; + } + case EKind::FS: { + Settings = ParseSettings(serializedSettings); + break; + } + default: + Y_ABORT("Unknown import kind"); + } } + template + explicit TImportInfo( + const ui64 id, + const TString& uid, + const EKind kind, + const TSettingsPB& settingsPb, + const TPathId domainPathId, + const TString& peerName) + : Id(id) + , Uid(uid) + , Kind(kind) + , SettingsSerialized(SerializeSettings(settingsPb)) + , Settings(settingsPb) + , DomainPathId(domainPathId) + , PeerName(peerName) + { + } + +public: + TString ToString() const; bool IsFinished() const; diff --git a/ydb/core/tx/schemeshard/schemeshard_info_types_helper.h b/ydb/core/tx/schemeshard/schemeshard_info_types_helper.h new file mode 100644 index 000000000000..74ccf1be84d3 --- /dev/null +++ b/ydb/core/tx/schemeshard/schemeshard_info_types_helper.h @@ -0,0 +1,43 @@ +#pragma once + +#include +#include +#include + +namespace NKikimr::NSchemeShard { + +template +TString GetItemSource(const TItem& item); + +template <> +inline TString GetItemSource(const Ydb::Import::ImportFromS3Settings::Item& item) { + return item.source_prefix(); +} + +template <> +inline TString GetItemSource(const Ydb::Import::ImportFromFsSettings::Item& item) { + return item.source_path(); +} + +template +inline TString GetItemSource(const TSettings& settings, size_t i) { + if (i < ui32(settings.items_size())) { + return GetItemSource(settings.items(i)); + } + return {}; +} + +template +auto VisitSettings(const TVariant& settings, TFunc&& func) { + return std::visit(std::forward(func), settings); +} + +template +TSettings ParseSettings(const TString& serializedSettings) { + TSettings tmpSettings; + Y_ABORT_UNLESS(tmpSettings.ParseFromString(serializedSettings)); + return tmpSettings; +} + +} // namespace NKikimr::NSchemeShard + diff --git a/ydb/core/tx/schemeshard/ut_export/ut_export_fs.cpp b/ydb/core/tx/schemeshard/ut_export/ut_export_fs.cpp new file mode 100644 index 000000000000..487cb091c30e --- /dev/null +++ b/ydb/core/tx/schemeshard/ut_export/ut_export_fs.cpp @@ -0,0 +1,423 @@ +#include + +#include +#include + +#include +#include +#include +#include +#include + +#include + +using namespace NSchemeShardUT_Private; + +namespace { + + void Run(TTestBasicRuntime& runtime, 
TTestEnv& env, const TVector& tables, + const TString& request, + Ydb::StatusIds::StatusCode expectedStatus = Ydb::StatusIds::SUCCESS) { + + ui64 txId = 100; + + for (const auto& table : tables) { + TestCreateTable(runtime, ++txId, "/MyRoot", table); + env.TestWaitNotification(runtime, txId); + } + + runtime.SetLogPriority(NKikimrServices::DATASHARD_BACKUP, NActors::NLog::PRI_TRACE); + runtime.SetLogPriority(NKikimrServices::EXPORT, NActors::NLog::PRI_TRACE); + runtime.SetLogPriority(NKikimrServices::DATASHARD_RESTORE, NActors::NLog::PRI_TRACE); + + const auto initialStatus = expectedStatus == Ydb::StatusIds::PRECONDITION_FAILED + ? expectedStatus + : Ydb::StatusIds::SUCCESS; + TestExport(runtime, ++txId, "/MyRoot", request, "", "", initialStatus); + env.TestWaitNotification(runtime, txId); + + if (initialStatus != Ydb::StatusIds::SUCCESS) { + return; + } + + const ui64 exportId = txId; + TestGetExport(runtime, exportId, "/MyRoot", expectedStatus); + + TestForgetExport(runtime, ++txId, "/MyRoot", exportId); + env.TestWaitNotification(runtime, exportId); + + TestGetExport(runtime, exportId, "/MyRoot", Ydb::StatusIds::NOT_FOUND); + } + + class TFsExportFixture : public NUnitTest::TBaseFixture { + public: + void RunFs(const TVector& tables, const TString& basePath, const TString& destinationPath, + Ydb::StatusIds::StatusCode expectedStatus = Ydb::StatusIds::SUCCESS, + bool checkFsFilesExistence = true) { + + TString requestStr = Sprintf(R"( + ExportToFsSettings { + base_path: "%s" + items { + source_path: "/MyRoot/%s" + destination_path: "%s" + } + } + )", basePath.c_str(), tables[0].Contains("Name:") ? ExtractTableName(tables[0]).c_str() : "Table", destinationPath.c_str()); + + Env(); // Init test env + Runtime().GetAppData().FeatureFlags.SetEnableChecksumsExport(true); + Runtime().GetAppData().FeatureFlags.SetEnablePermissionsExport(true); + + Run(Runtime(), Env(), tables, requestStr, expectedStatus); + + if (expectedStatus == Ydb::StatusIds::SUCCESS && checkFsFilesExistence) { + TFsPath exportPath = TFsPath(basePath) / destinationPath; + + // Check metadata file + TFsPath metadataPath = exportPath / "metadata.json"; + UNIT_ASSERT_C(metadataPath.Exists(), "Metadata file should exist: " << metadataPath.GetPath()); + + // Check scheme file + TFsPath schemePath = exportPath / "scheme.pb"; + UNIT_ASSERT_C(schemePath.Exists(), "Scheme file should exist: " << schemePath.GetPath()); + + // Check permissions file (if enabled) + if (Runtime().GetAppData().FeatureFlags.GetEnablePermissionsExport()) { + TFsPath permissionsPath = exportPath / "permissions.pb"; + UNIT_ASSERT_C(permissionsPath.Exists(), "Permissions file should exist: " << permissionsPath.GetPath()); + } + + // Check checksums (if enabled) + if (Runtime().GetAppData().FeatureFlags.GetEnableChecksumsExport()) { + TFsPath metadataChecksumPath = exportPath / "metadata.json.sha256"; + UNIT_ASSERT_C(metadataChecksumPath.Exists(), "Metadata checksum should exist: " << metadataChecksumPath.GetPath()); + + TFsPath schemeChecksumPath = exportPath / "scheme.pb.sha256"; + UNIT_ASSERT_C(schemeChecksumPath.Exists(), "Scheme checksum should exist: " << schemeChecksumPath.GetPath()); + } + } + } + + void RunFsMultiTable(const TVector& tables, const TString& basePath, const TVector& destinationPaths, + Ydb::StatusIds::StatusCode expectedStatus = Ydb::StatusIds::SUCCESS) { + + TStringBuilder items; + for (size_t i = 0; i < tables.size(); ++i) { + TString tableName = ExtractTableName(tables[i]); + TString destPath = i < destinationPaths.size() ? 
destinationPaths[i] : ("backup/" + tableName); + + items << "items {" + << " source_path: \"/MyRoot/" << tableName << "\"" + << " destination_path: \"" << destPath << "\"" + << " }"; + } + + TString requestStr = Sprintf(R"( + ExportToFsSettings { + base_path: "%s" + %s + } + )", basePath.c_str(), items.c_str()); + + Env(); // Init test env + Runtime().GetAppData().FeatureFlags.SetEnableChecksumsExport(true); + Runtime().GetAppData().FeatureFlags.SetEnablePermissionsExport(true); + + Run(Runtime(), Env(), tables, requestStr, expectedStatus); + + if (expectedStatus == Ydb::StatusIds::SUCCESS) { + for (size_t i = 0; i < destinationPaths.size(); ++i) { + TFsPath exportPath = TFsPath(basePath) / destinationPaths[i]; + TFsPath schemePath = exportPath / "scheme.pb"; + UNIT_ASSERT_C(schemePath.Exists(), "Scheme file should exist for table " << i << ": " << schemePath.GetPath()); + } + } + } + + bool HasFsFile(const TString& basePath, const TString& relativePath) { + TFsPath filePath = TFsPath(basePath) / relativePath; + return filePath.Exists(); + } + + TString GetFsFileContent(const TString& basePath, const TString& relativePath) { + TFsPath filePath = TFsPath(basePath) / relativePath; + if (filePath.Exists()) { + TFileInput file(filePath.GetPath()); + return file.ReadAll(); + } + return {}; + } + + protected: + TTestBasicRuntime& Runtime() { + if (!TestRuntime) { + TestRuntime.ConstructInPlace(); + } + return *TestRuntime; + } + + TTestEnvOptions& EnvOptions() { + if (!TestEnvOptions) { + TestEnvOptions.ConstructInPlace(); + } + return *TestEnvOptions; + } + + TTestEnv& Env() { + if (!TestEnv) { + TestEnv.ConstructInPlace(Runtime(), EnvOptions()); + } + return *TestEnv; + } + + TTempDir& TempDir() { + if (!TestTempDir) { + TestTempDir.ConstructInPlace(); + } + return *TestTempDir; + } + + private: + static TString ExtractTableName(const TString& tableSchema) { + // Extract "Name: "Table"" from schema + size_t pos = tableSchema.find("Name:"); + if (pos == TString::npos) { + return "Table"; + } + pos = tableSchema.find('"', pos); + if (pos == TString::npos) { + return "Table"; + } + size_t endPos = tableSchema.find('"', pos + 1); + if (endPos == TString::npos) { + return "Table"; + } + return tableSchema.substr(pos + 1, endPos - pos - 1); + } + + TMaybe TestRuntime; + TMaybe TestEnvOptions; + TMaybe TestEnv; + TMaybe TestTempDir; + }; + +} // anonymous + +Y_UNIT_TEST_SUITE_F(TExportToFsTests, TFsExportFixture) { + Y_UNIT_TEST(ShouldSucceedOnSingleShardTable) { + TString basePath = TempDir().Path(); + + RunFs({ + R"( + Name: "Table" + Columns { Name: "key" Type: "Utf8" } + Columns { Name: "value" Type: "Utf8" } + KeyColumnNames: ["key"] + )", + }, basePath, "backup/Table"); + } + + Y_UNIT_TEST(ShouldSucceedOnMultiShardTable) { + TString basePath = TempDir().Path(); + + RunFs({ + R"( + Name: "Table" + Columns { Name: "key" Type: "Uint32" } + Columns { Name: "value" Type: "Utf8" } + KeyColumnNames: ["key"] + UniformPartitionsCount: 2 + )", + }, basePath, "backup/MultiShardTable"); + } + + Y_UNIT_TEST(ShouldSucceedOnManyTables) { + TString basePath = TempDir().Path(); + + RunFsMultiTable({ + R"( + Name: "Table1" + Columns { Name: "key" Type: "Utf8" } + Columns { Name: "value" Type: "Utf8" } + KeyColumnNames: ["key"] + )", + R"( + Name: "Table2" + Columns { Name: "key" Type: "Utf8" } + Columns { Name: "value" Type: "Utf8" } + KeyColumnNames: ["key"] + )", + }, basePath, {"backup/Table1", "backup/Table2"}); + } + + Y_UNIT_TEST(ShouldCheckFilesCreatedOnDisk) { + TString basePath = TempDir().Path(); + 
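// Expected on-disk layout under <base_path>/<destination_path> after a successful FS export,
// assuming EnableChecksumsExport and EnablePermissionsExport are set (RunFs enables both):
//   metadata.json      metadata.json.sha256
//   scheme.pb          scheme.pb.sha256
//   permissions.pb     permissions.pb.sha256
// The assertions below check each of these files and sanity-check the scheme contents.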
TString destinationPath = "backup/TestTable"; + + RunFs({ + R"( + Name: "TestTable" + Columns { Name: "key" Type: "Utf8" } + Columns { Name: "value" Type: "Utf8" } + KeyColumnNames: ["key"] + )", + }, basePath, destinationPath); + + // Check all expected files exist + UNIT_ASSERT_C(HasFsFile(basePath, destinationPath + "/metadata.json"), "metadata.json"); + UNIT_ASSERT_C(HasFsFile(basePath, destinationPath + "/scheme.pb"), "scheme.pb"); + UNIT_ASSERT_C(HasFsFile(basePath, destinationPath + "/permissions.pb"), "permissions.pb"); + UNIT_ASSERT_C(HasFsFile(basePath, destinationPath + "/metadata.json.sha256"), "metadata.json.sha256"); + UNIT_ASSERT_C(HasFsFile(basePath, destinationPath + "/scheme.pb.sha256"), "scheme.pb.sha256"); + UNIT_ASSERT_C(HasFsFile(basePath, destinationPath + "/permissions.pb.sha256"), "permissions.pb.sha256"); + + // Check scheme content + TString schemeContent = GetFsFileContent(basePath, destinationPath + "/scheme.pb"); + UNIT_ASSERT_C(!schemeContent.empty(), "Scheme file should not be empty"); + + Ydb::Table::CreateTableRequest schemeProto; + UNIT_ASSERT_C(google::protobuf::TextFormat::ParseFromString(schemeContent, &schemeProto), + "Should parse scheme protobuf"); + + UNIT_ASSERT_VALUES_EQUAL(schemeProto.columns_size(), 2); + UNIT_ASSERT_VALUES_EQUAL(schemeProto.columns(0).name(), "key"); + UNIT_ASSERT_VALUES_EQUAL(schemeProto.columns(1).name(), "value"); + UNIT_ASSERT_VALUES_EQUAL(schemeProto.primary_key_size(), 1); + UNIT_ASSERT_VALUES_EQUAL(schemeProto.primary_key(0), "key"); + + // Check checksum format + TString checksumContent = GetFsFileContent(basePath, destinationPath + "/metadata.json.sha256"); + UNIT_ASSERT_C(!checksumContent.empty(), "Checksum should not be empty"); + UNIT_ASSERT_C(checksumContent.Contains("metadata.json"), "Checksum should contain filename"); + UNIT_ASSERT_GE(checksumContent.size(), 64); // sha256 is 64 hex chars + } + + Y_UNIT_TEST(ShouldAcceptCompressionSettings) { + TString basePath = TempDir().Path(); + TString destinationPath = "backup/Table"; + ui64 txId = 100; + + Env(); + Runtime().GetAppData().FeatureFlags.SetEnableChecksumsExport(true); + Runtime().GetAppData().FeatureFlags.SetEnablePermissionsExport(true); + + TestCreateTable(Runtime(), ++txId, "/MyRoot", R"( + Name: "Table" + Columns { Name: "key" Type: "Utf8" } + Columns { Name: "value" Type: "Utf8" } + KeyColumnNames: ["key"] + )"); + Env().TestWaitNotification(Runtime(), txId); + + TString request = Sprintf(R"( + ExportToFsSettings { + base_path: "%s" + compression: "zstd-3" + items { + source_path: "/MyRoot/Table" + destination_path: "%s" + } + } + )", basePath.c_str(), destinationPath.c_str()); + + TestExport(Runtime(), ++txId, "/MyRoot", request); + Env().TestWaitNotification(Runtime(), txId); + + auto response = TestGetExport(Runtime(), txId, "/MyRoot"); + UNIT_ASSERT(response.GetResponse().GetEntry().HasExportToFsSettings()); + + const auto& settings = response.GetResponse().GetEntry().GetExportToFsSettings(); + UNIT_ASSERT_VALUES_EQUAL(settings.compression(), "zstd-3"); + + // Check that files exist on filesystem + UNIT_ASSERT_C(HasFsFile(basePath, destinationPath + "/metadata.json"), + "metadata.json should exist"); + UNIT_ASSERT_C(HasFsFile(basePath, destinationPath + "/scheme.pb"), + "scheme.pb should exist"); + UNIT_ASSERT_C(HasFsFile(basePath, destinationPath + "/permissions.pb"), + "permissions.pb should exist"); + + // Check checksums exist + UNIT_ASSERT_C(HasFsFile(basePath, destinationPath + "/metadata.json.sha256"), + "metadata.json.sha256 should exist"); 
+ UNIT_ASSERT_C(HasFsFile(basePath, destinationPath + "/scheme.pb.sha256"), + "scheme.pb.sha256 should exist"); + UNIT_ASSERT_C(HasFsFile(basePath, destinationPath + "/permissions.pb.sha256"), + "permissions.pb.sha256 should exist"); + + TString schemeContent = GetFsFileContent(basePath, destinationPath + "/scheme.pb"); + UNIT_ASSERT_C(!schemeContent.empty(), "Scheme file should not be empty"); + + Ydb::Table::CreateTableRequest schemeProto; + UNIT_ASSERT_C(google::protobuf::TextFormat::ParseFromString(schemeContent, &schemeProto), + "Should parse scheme protobuf"); + + UNIT_ASSERT_VALUES_EQUAL(schemeProto.columns_size(), 2); + UNIT_ASSERT_VALUES_EQUAL(schemeProto.primary_key_size(), 1); + } + + Y_UNIT_TEST(ShouldFailOnNonExistentPath) { + TTestBasicRuntime runtime; + TTestEnv env(runtime); + ui64 txId = 100; + + TestCreateTable(runtime, ++txId, "/MyRoot", R"( + Name: "Table" + Columns { Name: "key" Type: "Utf8" } + Columns { Name: "value" Type: "Utf8" } + KeyColumnNames: ["key"] + )"); + env.TestWaitNotification(runtime, txId); + + TestExport(runtime, ++txId, "/MyRoot", R"( + ExportToFsSettings { + base_path: "/tmp/ydb_export" + items { + source_path: "/MyRoot/NonExistentTable" + destination_path: "backup/Table" + } + } + )", "", "", Ydb::StatusIds::BAD_REQUEST); + } + + Y_UNIT_TEST(ShouldFailOnDeletedPath) { + TTestBasicRuntime runtime; + TTestEnv env(runtime); + ui64 txId = 100; + + TestCreateTable(runtime, ++txId, "/MyRoot", R"( + Name: "TableToDelete" + Columns { Name: "key" Type: "Utf8" } + Columns { Name: "value" Type: "Utf8" } + KeyColumnNames: ["key"] + )"); + env.TestWaitNotification(runtime, txId); + + TestDropTable(runtime, ++txId, "/MyRoot", "TableToDelete"); + env.TestWaitNotification(runtime, txId); + + TestExport(runtime, ++txId, "/MyRoot", R"( + ExportToFsSettings { + base_path: "/tmp/ydb_export" + items { + source_path: "/MyRoot/TableToDelete" + destination_path: "backup/Table" + } + } + )", "", "", Ydb::StatusIds::BAD_REQUEST); + } + + Y_UNIT_TEST(ShouldHandleNestedPaths) { + TString basePath = TempDir().Path(); + + RunFs({ + R"( + Name: "Table" + Columns { Name: "key" Type: "Utf8" } + Columns { Name: "value" Type: "Utf8" } + KeyColumnNames: ["key"] + )", + }, basePath, "deep/nested/directory/structure/backup"); + } +} diff --git a/ydb/core/tx/schemeshard/ut_export/ya.make b/ydb/core/tx/schemeshard/ut_export/ya.make index df6ca93fb643..de730e88e51a 100644 --- a/ydb/core/tx/schemeshard/ut_export/ya.make +++ b/ydb/core/tx/schemeshard/ut_export/ya.make @@ -28,6 +28,7 @@ IF (NOT OS_WINDOWS) ) SRCS( ut_export.cpp + ut_export_fs.cpp ) ENDIF() diff --git a/ydb/core/tx/schemeshard/ut_restore/ut_restore_fs.cpp b/ydb/core/tx/schemeshard/ut_restore/ut_restore_fs.cpp new file mode 100644 index 000000000000..0984db45876d --- /dev/null +++ b/ydb/core/tx/schemeshard/ut_restore/ut_restore_fs.cpp @@ -0,0 +1,423 @@ +#include +#include + +#include +#include +#include + +#include +#include +#include +#include +#include + +using namespace NSchemeShardUT_Private; + +namespace { + +class TTempBackupFiles { +public: + explicit TTempBackupFiles() + { + } + + const TString& GetBasePath() const { + return TempDir.Name(); + } + + void CreateTableBackup(const TString& tablePath, const TString& tableName) { + CreateTableBackup( + tablePath, + tableName, + { + {"key", Ydb::Type::UTF8}, + {"value", Ydb::Type::UTF8} + }, + {"key"} + ); + } + + void CreateTableBackup(const TString& tablePath, const TString& tableName, + const TVector>& columns, + const TVector& keyColumns) { + const TString fullPath = 
TempDir.Name() + "/" + tablePath; + MakePathIfNotExist(fullPath.c_str()); + + // Create metadata.json + CreateMetadataFile(fullPath); + + // Create scheme.pb + CreateTableSchemeFile(fullPath, tableName, columns, keyColumns); + + // Create permissions.pb + CreatePermissionsFile(fullPath); + } + +private: + static void CreateMetadataFile(const TString& dirPath) { + NBackup::TMetadata metadata; + metadata.SetVersion(1); + metadata.SetEnablePermissions(true); + + TString serialized = metadata.Serialize(); + + TFileOutput file(dirPath + "/metadata.json"); + file.Write(serialized); + } + + static void CreateTableSchemeFile(const TString& dirPath, const TString& tableName, + const TVector>& columns, + const TVector& keyColumns) { + Ydb::Table::CreateTableRequest table; + table.set_path(tableName); + + for (const auto& [colName, colType] : columns) { + auto* col = table.add_columns(); + col->set_name(colName); + col->mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(colType); + } + + for (const auto& keyCol : keyColumns) { + table.add_primary_key(keyCol); + } + + TString serialized; + Y_ABORT_UNLESS(table.SerializeToString(&serialized)); + + TFileOutput file(dirPath + "/" + NYdb::NDump::NFiles::TableScheme().FileName); + file.Write(serialized); + } + + static void CreatePermissionsFile(const TString& dirPath) { + Ydb::Scheme::ModifyPermissionsRequest permissions; + + TString serialized; + Y_ABORT_UNLESS(permissions.SerializeToString(&serialized)); + + TFileOutput file(dirPath + "/permissions.pb"); + file.Write(serialized); + } + + TTempDir TempDir; +}; + +} // namespace + +Y_UNIT_TEST_SUITE(TSchemeShardImportFromFsTests) { + Y_UNIT_TEST(ShouldSucceedCreateImportFromFs) { + TTestBasicRuntime runtime; + TTestEnv env(runtime); + ui64 txId = 100; + + TTempBackupFiles backup; + backup.CreateTableBackup("backup/Table", "Table"); + + TString importSettings = Sprintf(R"( + ImportFromFsSettings { + base_path: "%s" + items { + source_path: "backup/Table" + destination_path: "/MyRoot/RestoredTable" + } + } + )", backup.GetBasePath().c_str()); + + TestImport(runtime, ++txId, "/MyRoot", importSettings); + env.TestWaitNotification(runtime, txId); + + auto response = TestGetImport(runtime, txId, "/MyRoot", Ydb::StatusIds::SUCCESS); + const auto& entry = response.GetResponse().GetEntry(); + + UNIT_ASSERT(entry.HasImportFromFsSettings()); + UNIT_ASSERT_VALUES_EQUAL(entry.GetProgress(), Ydb::Import::ImportProgress::PROGRESS_DONE); + + const auto& settings = entry.GetImportFromFsSettings(); + UNIT_ASSERT_VALUES_EQUAL(settings.base_path(), backup.GetBasePath()); + UNIT_ASSERT_VALUES_EQUAL(settings.items_size(), 1); + UNIT_ASSERT_VALUES_EQUAL(settings.items(0).source_path(), "backup/Table"); + UNIT_ASSERT_VALUES_EQUAL(settings.items(0).destination_path(), "/MyRoot/RestoredTable"); + + TestDescribeResult(DescribePath(runtime, "/MyRoot/RestoredTable"), { + NLs::PathExist, + NLs::IsTable + }); + } + + Y_UNIT_TEST(ShouldAcceptNoAclForFs) { + TTestBasicRuntime runtime; + TTestEnv env(runtime); + ui64 txId = 100; + + TTempBackupFiles backup; + backup.CreateTableBackup("backup/Table", "Table"); + + TString importSettings = Sprintf(R"( + ImportFromFsSettings { + base_path: "%s" + no_acl: true + items { + source_path: "backup/Table" + destination_path: "/MyRoot/RestoredTable" + } + } + )", backup.GetBasePath().c_str()); + + TestImport(runtime, ++txId, "/MyRoot", importSettings); + env.TestWaitNotification(runtime, txId); + + auto response = TestGetImport(runtime, txId, "/MyRoot", Ydb::StatusIds::SUCCESS); + 
const auto& entry = response.GetResponse().GetEntry(); + + UNIT_ASSERT(entry.HasImportFromFsSettings()); + UNIT_ASSERT_VALUES_EQUAL(entry.GetProgress(), Ydb::Import::ImportProgress::PROGRESS_DONE); + + const auto& settings = entry.GetImportFromFsSettings(); + UNIT_ASSERT_VALUES_EQUAL(settings.no_acl(), true); + + TestDescribeResult(DescribePath(runtime, "/MyRoot/RestoredTable"), { + NLs::PathExist, + NLs::IsTable + }); + } + + Y_UNIT_TEST(ShouldAcceptSkipChecksumValidation) { + TTestBasicRuntime runtime; + TTestEnv env(runtime); + ui64 txId = 100; + + TTempBackupFiles backup; + backup.CreateTableBackup("backup/Table", "Table"); + + TString importSettings = Sprintf(R"( + ImportFromFsSettings { + base_path: "%s" + skip_checksum_validation: true + items { + source_path: "backup/Table" + destination_path: "/MyRoot/RestoredTable" + } + } + )", backup.GetBasePath().c_str()); + + TestImport(runtime, ++txId, "/MyRoot", importSettings); + env.TestWaitNotification(runtime, txId); + + auto response = TestGetImport(runtime, txId, "/MyRoot", Ydb::StatusIds::SUCCESS); + const auto& entry = response.GetResponse().GetEntry(); + + UNIT_ASSERT(entry.HasImportFromFsSettings()); + UNIT_ASSERT_VALUES_EQUAL(entry.GetProgress(), Ydb::Import::ImportProgress::PROGRESS_DONE); + + const auto& settings = entry.GetImportFromFsSettings(); + UNIT_ASSERT_VALUES_EQUAL(settings.skip_checksum_validation(), true); + + TestDescribeResult(DescribePath(runtime, "/MyRoot/RestoredTable"), { + NLs::PathExist, + NLs::IsTable + }); + } + + Y_UNIT_TEST(ShouldFailOnInvalidDestinationPath) { + TTestBasicRuntime runtime; + TTestEnv env(runtime); + ui64 txId = 100; + + // Invalid destination path (empty) should fail validation + TestImport(runtime, ++txId, "/MyRoot", R"( + ImportFromFsSettings { + base_path: "/mnt/backups" + items { + source_path: "backup/Table" + destination_path: "" + } + } + )", "", "", Ydb::StatusIds::BAD_REQUEST); + } + + Y_UNIT_TEST(ShouldFailOnDuplicateDestination) { + TTestBasicRuntime runtime; + TTestEnv env(runtime); + ui64 txId = 100; + + // Duplicate destination paths should fail validation + TestImport(runtime, ++txId, "/MyRoot", R"( + ImportFromFsSettings { + base_path: "/mnt/backups" + items { + source_path: "backup/Table1" + destination_path: "/MyRoot/SameName" + } + items { + source_path: "backup/Table2" + destination_path: "/MyRoot/SameName" + } + } + )", "", "", Ydb::StatusIds::BAD_REQUEST); + } + + Y_UNIT_TEST(FsImportWithMultipleTables) { + TTestBasicRuntime runtime; + TTestEnv env(runtime); + ui64 txId = 100; + + TTempBackupFiles backup; + backup.CreateTableBackup("backup/Table1", "Table1"); + backup.CreateTableBackup("backup/Table2", "Table2"); + + TString importSettings = Sprintf(R"( + ImportFromFsSettings { + base_path: "%s" + items { + source_path: "backup/Table1" + destination_path: "/MyRoot/RestoredTable1" + } + items { + source_path: "backup/Table2" + destination_path: "/MyRoot/RestoredTable2" + } + } + )", backup.GetBasePath().c_str()); + + TestImport(runtime, ++txId, "/MyRoot", importSettings); + env.TestWaitNotification(runtime, txId); + + auto response = TestGetImport(runtime, txId, "/MyRoot", Ydb::StatusIds::SUCCESS); + const auto& entry = response.GetResponse().GetEntry(); + + UNIT_ASSERT(entry.HasImportFromFsSettings()); + UNIT_ASSERT_VALUES_EQUAL(entry.GetProgress(), Ydb::Import::ImportProgress::PROGRESS_DONE); + + const auto& settings = entry.GetImportFromFsSettings(); + UNIT_ASSERT_VALUES_EQUAL(settings.items_size(), 2); + + TestDescribeResult(DescribePath(runtime, 
"/MyRoot/RestoredTable1"), { + NLs::PathExist, + NLs::IsTable + }); + + TestDescribeResult(DescribePath(runtime, "/MyRoot/RestoredTable2"), { + NLs::PathExist, + NLs::IsTable + }); + } + + Y_UNIT_TEST(ShouldFailOnMissingBackupFiles) { + TTestBasicRuntime runtime; + TTestEnv env(runtime); + ui64 txId = 100; + + TTempBackupFiles backup; + + TString importSettings = Sprintf(R"( + ImportFromFsSettings { + base_path: "%s" + items { + source_path: "backup/NonExistentTable" + destination_path: "/MyRoot/RestoredTable" + } + } + )", backup.GetBasePath().c_str()); + + TestImport(runtime, ++txId, "/MyRoot", importSettings); + env.TestWaitNotification(runtime, txId); + + auto response = TestGetImport(runtime, txId, "/MyRoot", Ydb::StatusIds::CANCELLED); + const auto& entry = response.GetResponse().GetEntry(); + + UNIT_ASSERT_VALUES_EQUAL(entry.GetProgress(), Ydb::Import::ImportProgress::PROGRESS_CANCELLED); + + TestDescribeResult(DescribePath(runtime, "/MyRoot/RestoredTable"), { + NLs::PathNotExist + }); + } + + Y_UNIT_TEST(ShouldValidateTableSchema) { + TTestBasicRuntime runtime; + TTestEnv env(runtime); + ui64 txId = 100; + + TTempBackupFiles backup; + backup.CreateTableBackup("backup/ComplexTable", "ComplexTable"); + + TString importSettings = Sprintf(R"( + ImportFromFsSettings { + base_path: "%s" + items { + source_path: "backup/ComplexTable" + destination_path: "/MyRoot/ComplexTable" + } + } + )", backup.GetBasePath().c_str()); + + TestImport(runtime, ++txId, "/MyRoot", importSettings); + env.TestWaitNotification(runtime, txId); + + auto response = TestGetImport(runtime, txId, "/MyRoot", Ydb::StatusIds::SUCCESS); + UNIT_ASSERT_VALUES_EQUAL(response.GetResponse().GetEntry().GetProgress(), Ydb::Import::ImportProgress::PROGRESS_DONE); + + auto describe = DescribePath(runtime, "/MyRoot/ComplexTable"); + TestDescribeResult(describe, { + NLs::PathExist, + NLs::IsTable + }); + + const auto& table = describe.GetPathDescription().GetTable(); + UNIT_ASSERT_VALUES_EQUAL(table.ColumnsSize(), 2); + UNIT_ASSERT_VALUES_EQUAL(table.GetColumns(0).GetName(), "key"); + UNIT_ASSERT_VALUES_EQUAL(table.GetColumns(1).GetName(), "value"); + UNIT_ASSERT_VALUES_EQUAL(table.KeyColumnNamesSize(), 1); + UNIT_ASSERT_VALUES_EQUAL(table.GetKeyColumnNames(0), "key"); + } + + Y_UNIT_TEST(ShouldImportTableWithDifferentTypes) { + TTestBasicRuntime runtime; + TTestEnv env(runtime); + ui64 txId = 100; + + TTempBackupFiles backup; + backup.CreateTableBackup( + "backup/TypedTable", + "TypedTable", + { + {"id", Ydb::Type::UINT64}, + {"name", Ydb::Type::UTF8}, + {"value", Ydb::Type::INT32}, + {"flag", Ydb::Type::BOOL} + }, + {"id"} + ); + + TString importSettings = Sprintf(R"( + ImportFromFsSettings { + base_path: "%s" + items { + source_path: "backup/TypedTable" + destination_path: "/MyRoot/TypedTable" + } + } + )", backup.GetBasePath().c_str()); + + TestImport(runtime, ++txId, "/MyRoot", importSettings); + env.TestWaitNotification(runtime, txId); + + auto response = TestGetImport(runtime, txId, "/MyRoot", Ydb::StatusIds::SUCCESS); + UNIT_ASSERT_VALUES_EQUAL(response.GetResponse().GetEntry().GetProgress(), Ydb::Import::ImportProgress::PROGRESS_DONE); + + auto describe = DescribePath(runtime, "/MyRoot/TypedTable"); + TestDescribeResult(describe, { + NLs::PathExist, + NLs::IsTable + }); + + const auto& table = describe.GetPathDescription().GetTable(); + UNIT_ASSERT_VALUES_EQUAL(table.ColumnsSize(), 4); + UNIT_ASSERT_VALUES_EQUAL(table.GetColumns(0).GetName(), "id"); + UNIT_ASSERT_VALUES_EQUAL(table.GetColumns(1).GetName(), "name"); + 
UNIT_ASSERT_VALUES_EQUAL(table.GetColumns(2).GetName(), "value"); + UNIT_ASSERT_VALUES_EQUAL(table.GetColumns(3).GetName(), "flag"); + UNIT_ASSERT_VALUES_EQUAL(table.KeyColumnNamesSize(), 1); + UNIT_ASSERT_VALUES_EQUAL(table.GetKeyColumnNames(0), "id"); + } +} + + diff --git a/ydb/core/tx/schemeshard/ut_restore/ya.make b/ydb/core/tx/schemeshard/ut_restore/ya.make index 72c4ad464ef0..e4009603e204 100644 --- a/ydb/core/tx/schemeshard/ut_restore/ya.make +++ b/ydb/core/tx/schemeshard/ut_restore/ya.make @@ -27,6 +27,7 @@ PEERDIR( SRCS( ut_restore.cpp + ut_restore_fs.cpp ) YQL_LAST_ABI_VERSION()