Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions ydb/apps/ydbd/export.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include "export.h"

#include <ydb/core/tx/datashard/export_s3.h>
#include <ydb/core/tx/datashard/export_fs.h>

NKikimr::NDataShard::IExport* TDataShardExportFactory::CreateExportToYt(
const IExport::TTask& task, const IExport::TTableColumns& columns) const
Expand All @@ -22,5 +23,11 @@ NKikimr::NDataShard::IExport* TDataShardExportFactory::CreateExportToS3(
#endif
}

NKikimr::NDataShard::IExport* TDataShardExportFactory::CreateExportToFs(
const IExport::TTask& task, const IExport::TTableColumns& columns) const
{
return new NKikimr::NDataShard::TFsExport(task, columns);
}

void TDataShardExportFactory::Shutdown() {
}
1 change: 1 addition & 0 deletions ydb/apps/ydbd/export.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,6 @@ class TDataShardExportFactory : public NKikimr::NDataShard::IExportFactory {
public:
IExport* CreateExportToYt(const IExport::TTask& task, const IExport::TTableColumns& columns) const override;
IExport* CreateExportToS3(const IExport::TTask& task, const IExport::TTableColumns& columns) const override;
IExport* CreateExportToFs(const IExport::TTask& task, const IExport::TTableColumns& columns) const override;
void Shutdown() override;
};
7 changes: 7 additions & 0 deletions ydb/apps/ydbd/export/export.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include "export.h"

#include <ydb/core/tx/datashard/export_s3.h>
#include <ydb/core/tx/datashard/export_fs.h>

NKikimr::NDataShard::IExport* TDataShardExportFactory::CreateExportToYt(
const IExport::TTask& task, const IExport::TTableColumns& columns) const
Expand All @@ -22,6 +23,12 @@ NKikimr::NDataShard::IExport* TDataShardExportFactory::CreateExportToS3(
#endif
}

NKikimr::NDataShard::IExport* TDataShardExportFactory::CreateExportToFs(
const IExport::TTask& task, const IExport::TTableColumns& columns) const
{
return new NKikimr::NDataShard::TFsExport(task, columns);
}

void TDataShardExportFactory::Shutdown() {
// No cleanup required for TDataShardExportFactory.
}
1 change: 1 addition & 0 deletions ydb/apps/ydbd/export/export.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,6 @@ class TDataShardExportFactory : public NKikimr::NDataShard::IExportFactory {
public:
IExport* CreateExportToYt(const IExport::TTask& task, const IExport::TTableColumns& columns) const override;
IExport* CreateExportToS3(const IExport::TTask& task, const IExport::TTableColumns& columns) const override;
IExport* CreateExportToFs(const IExport::TTask& task, const IExport::TTableColumns& columns) const override;
void Shutdown() override;
};
7 changes: 7 additions & 0 deletions ydb/core/driver_lib/run/export.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include <kikimr/yndx/yt/yt_shutdown.h>

#include <contrib/ydb/core/tx/datashard/export_s3.h>
#include <contrib/ydb/core/tx/datashard/export_fs.h>

NKikimr::NDataShard::IExport* TDataShardExportFactory::CreateExportToYt(
const IExport::TTask& task, const IExport::TTableColumns& columns) const
Expand All @@ -29,6 +30,12 @@ NKikimr::NDataShard::IExport* TDataShardExportFactory::CreateExportToS3(
#endif
}

NKikimr::NDataShard::IExport* TDataShardExportFactory::CreateExportToFs(
const IExport::TTask& task, const IExport::TTableColumns& columns) const
{
return new NKikimr::NDataShard::TFsExport(task, columns);
}

void TDataShardExportFactory::Shutdown() {
#ifndef KIKIMR_DISABLE_YT
ShutdownYT();
Expand Down
1 change: 1 addition & 0 deletions ydb/core/driver_lib/run/export.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,6 @@ class TDataShardExportFactory : public NKikimr::NDataShard::IExportFactory {
public:
IExport* CreateExportToYt(const IExport::TTask& task, const IExport::TTableColumns& columns) const override;
IExport* CreateExportToS3(const IExport::TTask& task, const IExport::TTableColumns& columns) const override;
IExport* CreateExportToFs(const IExport::TTask& task, const IExport::TTableColumns& columns) const override;
void Shutdown() override;
};
3 changes: 3 additions & 0 deletions ydb/core/protos/flat_scheme_op.proto
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import "ydb/core/protos/channel_purpose.proto";
import "ydb/core/protos/compaction.proto";
import "ydb/core/protos/filestore_config.proto";
import "ydb/core/protos/follower_group.proto";
import "ydb/core/protos/fs_settings.proto";
import "ydb/core/protos/index_builder.proto";
import "ydb/core/protos/pqconfig.proto";
import "ydb/core/protos/replication.proto";
Expand Down Expand Up @@ -1320,6 +1321,7 @@ message TBackupTask {
oneof Settings {
TYTSettings YTSettings = 4;
NKikimrSchemeOp.TS3Settings S3Settings = 9;
NKikimrSchemeOp.TFSSettings FSSettings = 20;
}

optional TPathDescription Table = 10; // for further restore
Expand Down Expand Up @@ -1367,6 +1369,7 @@ message TRestoreTask {

oneof Settings {
NKikimrSchemeOp.TS3Settings S3Settings = 6;
NKikimrSchemeOp.TFSSettings FSSettings = 9;
}

optional bool ValidateChecksums = 7; // currently available for s3
Expand Down
10 changes: 10 additions & 0 deletions ydb/core/protos/fs_settings.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package NKikimrSchemeOp;
option java_package = "ru.yandex.kikimr.proto";

message TFSSettings {
optional string BasePath = 1; // Base path on the file system (e.g., /mnt/exports)
optional string Path = 2; // Relative path for this specific backup/restore operation
Comment on lines +5 to +6
Copy link

Copilot AI Dec 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The protobuf fields BasePath and Path use optional string which allows them to be empty. However, operations that use these settings expect non-empty values. Consider:

  1. Making these fields required instead of optional (though this is deprecated in proto3)
  2. Or adding validation in the code that uses these settings to reject empty paths
  3. Adding comments to document that these fields must not be empty
Suggested change
optional string BasePath = 1; // Base path on the file system (e.g., /mnt/exports)
optional string Path = 2; // Relative path for this specific backup/restore operation
// Base path on the file system (e.g., /mnt/exports). This field must not be empty.
optional string BasePath = 1;
// Relative path for this specific backup/restore operation. This field must not be empty.
optional string Path = 2;

Copilot uses AI. Check for mistakes.
}



1 change: 1 addition & 0 deletions ydb/core/protos/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ SRCS(
flat_scheme_op.proto
flat_tx_scheme.proto
follower_group.proto
fs_settings.proto
grpc.proto
grpc_pq_old.proto
grpc_status_proxy.proto
Expand Down
7 changes: 7 additions & 0 deletions ydb/core/testlib/basics/appdata.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include <ydb/core/testlib/actors/test_runtime.h>
#include <ydb/core/tx/datashard/export_iface.h>
#include <ydb/core/tx/datashard/export_s3.h>
#include <ydb/core/tx/datashard/export_fs.h>
#include <ydb/core/tx/schemeshard/schemeshard_operation_factory.h>
#include <ydb/core/protos/blobstorage.pb.h>
#include <ydb/core/protos/config.pb.h>
Expand Down Expand Up @@ -48,6 +49,12 @@ namespace NKikimr {
#endif
}

IExport* CreateExportToFs(
const IExport::TTask& task, const IExport::TTableColumns& columns) const override
{
return new NDataShard::TFsExport(task, columns);
}

void Shutdown() override {
}
};
Expand Down
4 changes: 3 additions & 1 deletion ydb/core/tx/columnshard/backup/iscan/iscan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ TConclusion<std::unique_ptr<NTable::IScan>> CreateIScanExportUploader(const TAct
break;
case NKikimrSchemeOp::TBackupTask::SETTINGS_NOT_SET:
return TConclusionStatus::Fail("Internal error. It is not possible to have empty settings for backup here");
default:
return TConclusionStatus::Fail("Internal error. Unsupported type of backup task settings");
}

auto createUploader = [subscriberActorId = subscriberActorId, txId = txId, exp]() {
Expand Down Expand Up @@ -331,4 +333,4 @@ std::unique_ptr<IActor> CreateExportUploaderActor(const TActorId& subscriberActo
return std::make_unique<TUploaderActor>(backupTask, exportFactory, tableColumns, subscriberActorId, txId);
}

} // namespace NKikimr::NColumnShard::NBackup
} // namespace NKikimr::NColumnShard::NBackup
59 changes: 58 additions & 1 deletion ydb/core/tx/datashard/backup_unit.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@
#include "export_iface.h"
#include "export_scan.h"
#include "export_s3.h"
#include "export_fs.h"

#include <ydb/core/protos/datashard_config.pb.h>
#include <ydb/core/protos/fs_settings.pb.h>

namespace NKikimr {
namespace NDataShard {
Expand Down Expand Up @@ -74,6 +76,31 @@ class TBackupUnit : public TBackupRestoreUnitBase<TEvDataShard::TEvCancelBackup>
Abort(op, ctx, "Exports to S3 are disabled");
return false;
}
} else if (backup.HasFSSettings()) {
LOG_INFO_S(ctx, NKikimrServices::DATASHARD_BACKUP,
"TBackupUnit::Run - FS export"
<< ", tableId# " << tableId
<< ", basePath# " << backup.GetFSSettings().GetBasePath()
<< ", path# " << backup.GetFSSettings().GetPath()
<< ", shardNum# " << backup.GetShardNum());

NBackupRestoreTraits::ECompressionCodec codec;
if (!TryCodecFromTask(backup, codec)) {
Abort(op, ctx, TStringBuilder() << "Unsupported compression codec"
<< ": " << backup.GetCompression().GetCodec());
return false;
}

if (auto* exportFactory = appData->DataShardExportFactory) {
std::shared_ptr<IExport>(exportFactory->CreateExportToFs(backup, columns)).swap(exp);
LOG_INFO_S(ctx, NKikimrServices::DATASHARD_BACKUP,
"TBackupUnit::Run - FS export created"
<< ", tableId# " << tableId
<< ", exportPtr# " << (void*)exp.get());
} else {
Abort(op, ctx, "Exports to FS are disabled");
return false;
}
} else {
Abort(op, ctx, "Unsupported backup task");
return false;
Expand All @@ -83,9 +110,19 @@ class TBackupUnit : public TBackupRestoreUnitBase<TEvDataShard::TEvCancelBackup>
return exp->CreateUploader(self, txId);
};

LOG_INFO_S(ctx, NKikimrServices::DATASHARD_BACKUP,
"TBackupUnit::Run - creating buffer and scan"
<< ", tableId# " << tableId
<< ", txId# " << op->GetTxId());

THolder<IBuffer> buffer{exp->CreateBuffer()};
THolder<NTable::IScan> scan{CreateExportScan(std::move(buffer), createUploader)};

LOG_INFO_S(ctx, NKikimrServices::DATASHARD_BACKUP,
"TBackupUnit::Run - scan created, queueing"
<< ", tableId# " << tableId
<< ", localTableId# " << localTableId);

const auto& taskName = appData->DataShardConfig.GetBackupTaskName();
const auto taskPrio = appData->DataShardConfig.GetBackupTaskPriority();

Expand Down Expand Up @@ -113,13 +150,21 @@ class TBackupUnit : public TBackupRestoreUnitBase<TEvDataShard::TEvCancelBackup>
return op->HasScanResult();
}

bool ProcessResult(TOperation::TPtr op, const TActorContext&) override {
bool ProcessResult(TOperation::TPtr op, const TActorContext& ctx) override {
TActiveTransaction* tx = dynamic_cast<TActiveTransaction*>(op.Get());
Y_ENSURE(tx, "cannot cast operation of kind " << op->GetKind());

auto* result = CheckedCast<TExportScanProduct*>(op->ScanResult().Get());
bool done = true;

LOG_INFO_S(ctx, NKikimrServices::DATASHARD_BACKUP,
"TBackupUnit::ProcessResult"
<< ", txId# " << op->GetTxId()
<< ", outcome# " << static_cast<int>(result->Outcome)
<< ", error# " << result->Error
<< ", bytesRead# " << result->BytesRead
<< ", rowsRead# " << result->RowsRead);

switch (result->Outcome) {
case EExportOutcome::Success:
case EExportOutcome::Error:
Expand All @@ -128,18 +173,30 @@ class TBackupUnit : public TBackupRestoreUnitBase<TEvDataShard::TEvCancelBackup>
schemeOp->Error = std::move(result->Error);
schemeOp->BytesProcessed = result->BytesRead;
schemeOp->RowsProcessed = result->RowsRead;
LOG_INFO_S(ctx, NKikimrServices::DATASHARD_BACKUP,
"TBackupUnit::ProcessResult - updated schemeOp"
<< ", txId# " << op->GetTxId()
<< ", success# " << schemeOp->Success);
} else {
Y_ENSURE(false, "Cannot find schema tx: " << op->GetTxId());
}
break;
case EExportOutcome::Aborted:
LOG_INFO_S(ctx, NKikimrServices::DATASHARD_BACKUP,
"TBackupUnit::ProcessResult - aborted"
<< ", txId# " << op->GetTxId());
done = false;
break;
}

op->SetScanResult(nullptr);
tx->SetScanTask(0);

LOG_INFO_S(ctx, NKikimrServices::DATASHARD_BACKUP,
"TBackupUnit::ProcessResult - done"
<< ", txId# " << op->GetTxId()
<< ", done# " << done);

return done;
}

Expand Down
31 changes: 31 additions & 0 deletions ydb/core/tx/datashard/export_fs.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
#pragma once

#include "export_iface.h"

namespace NKikimr {
namespace NDataShard {

class TFsExport: public IExport {
public:
explicit TFsExport(const TTask& task, const TTableColumns& columns)
: Task(task)
, Columns(columns)
{
Y_ENSURE(task.HasFSSettings());
}

IActor* CreateUploader(const TActorId& dataShard, ui64 txId) const override;

IBuffer* CreateBuffer() const override;

void Shutdown() const override {}

protected:
const TTask Task;
const TTableColumns Columns;
};

} // NDataShard
} // NKikimr


Loading
Loading