
Commit 2939f87

added --ddl-only feature
1 parent 37bf4be commit 2939f87

12 files changed

Lines changed: 311 additions & 57 deletions


Cargo.lock

Lines changed: 9 additions & 9 deletions
Generated file; diff not rendered.

Cargo.toml

Lines changed: 1 addition & 1 deletion
@@ -1,6 +1,6 @@
 [package]
 name = "pgcopy"
-version = "0.1.2"
+version = "0.1.3"
 edition = "2024"
 description = "CLI tool to export selected PostgreSQL objects into a bundle and import them into another database."
 authors = ["hexqnt <kvover@gmail.org>"]

README.md

Lines changed: 2 additions & 1 deletion
@@ -25,7 +25,7 @@
 
 ```bash
 pgcopy export --config <path/to/config.toml> --out <path/to/bundle> [--concurrency N] [--quiet] [--no-progress]
-pgcopy import --in <path/to/bundle> [--mode replace|append] [--concurrency N] [--quiet] [--no-progress]
+pgcopy import --in <path/to/bundle> [--mode replace|append] [--concurrency N] [--ddl-only] [--quiet] [--no-progress]
 pgcopy info --in <path/to/bundle> [--format text|json] [--objects] [--quiet]
 ```
 
@@ -35,6 +35,7 @@ pgcopy info --in <path/to/bundle> [--format text|json] [--objects] [--quiet]
   The operation is atomic at the object level (`BEGIN/COMMIT`): if an error occurs during `replace`,
   the changes for that object are rolled back (`ROLLBACK`).
 - `append`: if the table already exists, checks compatibility and appends the data.
+- `--ddl-only`: runs only the DDL part of the import (creating/preparing the tables), without loading data.
 
 The alias `--concurency` (without the second `r`) is also supported for `--concurrency`.
 
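The CLI layer that parses these flags is presumably among the changed files not shown in this excerpt; for orientation, here is a hypothetical clap sketch of how the import subcommand could expose `--ddl-only` (the struct and field names are assumptions, not code from this commit):

```rust
use clap::Parser;
use std::path::PathBuf;

/// Hypothetical sketch only: the real CLI definitions live in files not shown in
/// this excerpt, and the struct/field names here are assumptions.
#[derive(Parser, Debug)]
struct ImportArgs {
    /// Path to the bundle produced by `pgcopy export`.
    #[arg(long = "in")]
    input: PathBuf,
    /// Run only the DDL part of the import (create/prepare tables); skip data loading.
    #[arg(long)]
    ddl_only: bool,
}
```

With clap's derive, a `bool` field named `ddl_only` becomes the `--ddl-only` flag automatically.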

src/export.rs

Lines changed: 14 additions & 2 deletions
@@ -85,8 +85,20 @@ pub async fn run(
     };
 
     progress.set_bundle_running(out_path);
-    let write_result =
-        bundle_io::write_bundle(scratch.path(), out_path, &manifest, password.as_deref());
+    let bundle_scratch_path = scratch.path().to_path_buf();
+    let bundle_out_path = out_path.to_path_buf();
+    let bundle_password = password;
+    let bundle_manifest = manifest;
+    let write_result = tokio::task::spawn_blocking(move || {
+        bundle_io::write_bundle(
+            &bundle_scratch_path,
+            &bundle_out_path,
+            &bundle_manifest,
+            bundle_password.as_deref(),
+        )
+    })
+    .await
+    .context("bundle writer task failed")?;
     match write_result {
         Ok(()) => {
             progress.finish_bundle_done(out_path);
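For readers unfamiliar with the pattern: `bundle_io::write_bundle` is synchronous, so calling it inline would tie up a Tokio worker thread for the duration of the archive write. `tokio::task::spawn_blocking` requires owned (`'static`) inputs, which is why the scratch path, output path, password, and manifest are cloned or moved first. A minimal sketch of the same shape, with illustrative names rather than pgcopy's API:

```rust
use anyhow::Context;
use std::path::PathBuf;

// Minimal sketch (illustrative names, not pgcopy's API): move owned copies of the
// inputs into the closure, run the synchronous I/O on a blocking thread, then
// await the JoinHandle and unwrap both error layers.
async fn write_file_off_runtime(path: PathBuf, body: Vec<u8>) -> anyhow::Result<()> {
    tokio::task::spawn_blocking(move || std::fs::write(&path, &body))
        .await
        .context("blocking writer task failed")? // JoinError (panic/cancel) -> anyhow
        .context("failed to write file")?;       // io::Error -> anyhow
    Ok(())
}
```

The same owned-inputs-then-`spawn_blocking` shape appears again in the importer's bundle unpack below.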

src/importer.rs

Lines changed: 24 additions & 8 deletions
@@ -30,6 +30,7 @@ pub async fn run(
     bundle_path: &Path,
     mode: ImportMode,
     concurrency: usize,
+    ddl_only: bool,
     bundle_password: Option<&str>,
     target_config: tokio_postgres::Config,
     progress_enabled: bool,
@@ -40,15 +41,21 @@ pub async fn run(
 
     let access = bundle_io::resolve_access(bundle_path, bundle_password)?;
     let client = pg::connect(&target_config).await?;
-    let target_version_num = pg::server_version_num(&client).await?;
+    let target_version_num = if ddl_only {
+        0
+    } else {
+        pg::server_version_num(&client).await?
+    };
 
-    if concurrency == 1 {
+    // DDL-only always runs in streaming mode: this avoids an expensive unpack of data/* to disk.
+    if ddl_only || concurrency == 1 {
         stream::import_objects_streaming(
             bundle_path,
             access.password.as_deref(),
             access.is_encrypted,
             &client,
             mode,
+            ddl_only,
             target_version_num,
             progress_enabled,
         )
@@ -57,12 +64,20 @@ pub async fn run(
     }
 
     let scratch = tempfile::tempdir().context("failed to create temporary directory for import")?;
-    bundle_io::unpack_bundle(
-        bundle_path,
-        scratch.path(),
-        access.password.as_deref(),
-        access.is_encrypted,
-    )?;
+    let unpack_bundle_path = bundle_path.to_path_buf();
+    let unpack_scratch_path = scratch.path().to_path_buf();
+    let unpack_password = access.password;
+    let unpack_is_encrypted = access.is_encrypted;
+    tokio::task::spawn_blocking(move || {
+        bundle_io::unpack_bundle(
+            &unpack_bundle_path,
+            &unpack_scratch_path,
+            unpack_password.as_deref(),
+            unpack_is_encrypted,
+        )
+    })
+    .await
+    .context("parallel import bundle unpack task failed")??;
 
     let manifest = bundle_io::read_manifest_from_dir(scratch.path())?;
     compat::validate_data_compatibility(&manifest, target_version_num)?;
@@ -72,6 +87,7 @@ pub async fn run(
         &manifest,
         mode,
         concurrency,
+        ddl_only,
         progress_enabled,
     )
     .await?;
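One detail worth calling out in the unpack change: `spawn_blocking(...).await` yields a `Result<Result<(), E>, JoinError>`, so the trailing `??` unwraps the join error first and then the unpack error. A minimal sketch of that shape with placeholder work:

```rust
use anyhow::{Context, Result};

// Minimal sketch of the double-`?` used above: the outer Result comes from joining
// the blocking task, the inner one from the work it performed.
async fn unpack_sketch() -> Result<()> {
    tokio::task::spawn_blocking(|| -> Result<()> {
        // Stand-in for the synchronous unpack work.
        Ok(())
    })
    .await
    .context("blocking unpack task failed")??; // JoinError, then the inner error
    Ok(())
}
```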

src/importer/compat.rs

Lines changed: 59 additions & 0 deletions
@@ -84,3 +84,62 @@ pub(super) async fn validate_existing_table_compatibility(
 
     Ok(())
 }
+
+#[cfg(test)]
+mod tests {
+    use super::validate_data_compatibility;
+    use crate::manifest::{Manifest, ManifestObject};
+    use crate::types::DataFormat;
+
+    fn manifest_for_compat(data_format: DataFormat, source_pg_version_num: i32) -> Manifest {
+        Manifest {
+            format_version: 1,
+            created_at: "2026-02-19T10:00:00Z".to_owned(),
+            source_fingerprint: Some("database=app user=app".to_owned()),
+            source_pg_version_num,
+            data_format,
+            consistent_snapshot: true,
+            objects: vec![ManifestObject {
+                kind: "table".to_owned(),
+                source_schema: "public".to_owned(),
+                source_name: "orders".to_owned(),
+                target_schema: "archive".to_owned(),
+                target_name: "orders".to_owned(),
+                source_select: "select * from public.orders".to_owned(),
+                normalized_select: "SELECT \"id\" FROM \"public\".\"orders\"".to_owned(),
+                ddl_path: "ddl/0001__public.orders.sql".to_owned(),
+                data_path: "data/0001__public.orders.copybin".to_owned(),
+                effective_columns: vec!["id".to_owned()],
+                effective_column_types: vec!["bigint".to_owned()],
+                column_projection: "*".to_owned(),
+                row_estimate: Some(10),
+            }],
+        }
+    }
+
+    #[test]
+    fn binary_compatibility_passes_for_same_major() {
+        let manifest = manifest_for_compat(DataFormat::Binary, 150002);
+        validate_data_compatibility(&manifest, 150099)
+            .expect("binary format should be compatible on same major version");
+    }
+
+    #[test]
+    fn binary_compatibility_fails_for_different_major() {
+        let manifest = manifest_for_compat(DataFormat::Binary, 140012);
+        let error = validate_data_compatibility(&manifest, 150001)
+            .expect_err("binary format must fail across major versions");
+        assert!(
+            error
+                .to_string()
+                .contains("binary COPY compatibility check failed")
+        );
+    }
+
+    #[test]
+    fn csv_compatibility_ignores_major_version_difference() {
+        let manifest = manifest_for_compat(DataFormat::Csv, 140012);
+        validate_data_compatibility(&manifest, 160003)
+            .expect("csv format should ignore major version difference");
+    }
+}
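The new tests encode the compatibility rule implied by the data formats: binary COPY bundles are tied to the source's PostgreSQL major version, CSV bundles are not. PostgreSQL's `server_version_num` packs the version as `major * 10000 + minor` (so 150002 is 15.2, 140012 is 14.12); a small sketch of the major-version comparison the binary tests exercise (an illustration only, not the body of `validate_data_compatibility`, which is outside this diff):

```rust
/// Illustration only: the real check lives in validate_data_compatibility, whose
/// body is not part of this diff. server_version_num packs versions as
/// major * 10_000 + minor, e.g. 150002 = 15.2 and 140012 = 14.12.
fn same_major(source_version_num: i32, target_version_num: i32) -> bool {
    source_version_num / 10_000 == target_version_num / 10_000
}

#[cfg(test)]
mod sketch_tests {
    use super::same_major;

    #[test]
    fn mirrors_the_cases_from_the_new_compat_tests() {
        assert!(same_major(150002, 150099));  // 15.x -> 15.x: binary COPY allowed
        assert!(!same_major(140012, 150001)); // 14.x -> 15.x: binary COPY rejected
    }
}
```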

src/importer/copy_stream.rs

Lines changed: 1 addition & 2 deletions
@@ -75,8 +75,7 @@ pub async fn copy_data_in_reader<R: Read>(
 
     let mut buffer = vec![0_u8; 64 * 1024];
     loop {
-        let read = reader
-            .read(&mut buffer)
+        let read = tokio::task::block_in_place(|| reader.read(&mut buffer))
             .context("failed to read streaming data from bundle")?;
 
         if read == 0 {
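This change swaps a plain synchronous `reader.read` for `tokio::task::block_in_place`, which runs the blocking read in place instead of moving the reader to another thread (as `spawn_blocking` would require, along with `Send + 'static` bounds). The trade-off is that it occupies the current worker thread and panics on Tokio's current-thread runtime. A minimal sketch of the call shape, with illustrative names:

```rust
use std::io::Read;

// Minimal sketch (illustrative names): block_in_place lets us call a synchronous
// Read through a borrowed reference, at the cost of tying up the current worker
// thread. It panics if invoked on Tokio's current-thread runtime.
fn read_chunk<R: Read>(reader: &mut R, buffer: &mut [u8]) -> std::io::Result<usize> {
    tokio::task::block_in_place(|| reader.read(buffer))
}
```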

src/importer/load.rs

Lines changed: 23 additions & 0 deletions
@@ -49,3 +49,26 @@ where
         }
     }
 }
+
+/// Performs only the DDL preparation of the target object, without loading data.
+pub(super) async fn prepare_object_ddl_only(
+    client: &tokio_postgres::Client,
+    object: &ManifestObject,
+    mode: ImportMode,
+    ddl_sql: &str,
+) -> Result<()> {
+    match mode {
+        ImportMode::Replace => {
+            replace_tx::run_replace_atomically(client, object, async {
+                target_table::prepare_target_table(client, object, ImportMode::Replace, ddl_sql)
+                    .await?;
+                Ok(())
+            })
+            .await
+        }
+        ImportMode::Append => {
+            target_table::prepare_target_table(client, object, ImportMode::Append, ddl_sql).await?;
+            Ok(())
+        }
+    }
+}
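`prepare_object_ddl_only` keeps the per-object semantics the README describes: in `replace` mode the DDL runs under `replace_tx::run_replace_atomically`, while in `append` mode the table is only prepared. The helper's body is not part of this diff; as a generic sketch of the BEGIN/COMMIT-with-ROLLBACK shape the README describes, using plain `tokio_postgres` calls (an assumption about the shape, not the actual helper):

```rust
use anyhow::Result;
use std::future::Future;

// Generic sketch of the per-object transaction shape the README describes for
// `replace` mode; this is an assumption about run_replace_atomically, whose body
// is not part of this diff.
async fn run_in_transaction<Fut>(client: &tokio_postgres::Client, work: Fut) -> Result<()>
where
    Fut: Future<Output = Result<()>>,
{
    client.batch_execute("BEGIN").await?;
    match work.await {
        Ok(()) => {
            client.batch_execute("COMMIT").await?;
            Ok(())
        }
        Err(err) => {
            // Best-effort rollback; report the original error.
            let _ = client.batch_execute("ROLLBACK").await;
            Err(err)
        }
    }
}
```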

src/importer/parallel.rs

Lines changed: 18 additions & 2 deletions
@@ -28,6 +28,7 @@ pub async fn import_objects_parallel(
     manifest: &Manifest,
     mode: ImportMode,
     concurrency: usize,
+    ddl_only: bool,
     progress_enabled: bool,
 ) -> Result<u64> {
     let data_format = manifest.data_format;
@@ -46,7 +47,15 @@ pub async fn import_objects_parallel(
         let scratch_dir = scratch_dir.to_path_buf();
         // Each worker uses its own connection to the target database.
         workers.spawn(async move {
-            import_worker(&target_config, &scratch_dir, tasks, mode, data_format).await
+            import_worker(
+                &target_config,
+                &scratch_dir,
+                tasks,
+                mode,
+                data_format,
+                ddl_only,
+            )
+            .await
         });
     }
 
@@ -84,6 +93,7 @@ async fn import_worker(
     tasks: Vec<(usize, ManifestObject)>,
     mode: ImportMode,
     data_format: DataFormat,
+    ddl_only: bool,
 ) -> ImportWorkerOutcome {
     let Some((first_index, first_object)) = tasks.first().cloned() else {
         return ImportWorkerOutcome {
@@ -112,7 +122,7 @@ async fn import_worker(
     };
 
     for (index, object) in tasks {
-        let imported = import_object(&client, scratch_dir, &object, mode, data_format)
+        let imported = import_object(&client, scratch_dir, &object, mode, data_format, ddl_only)
             .await
             .with_context(|| {
                 format!(
@@ -149,8 +159,14 @@ async fn import_object(
     object: &ManifestObject,
     mode: ImportMode,
     data_format: DataFormat,
+    ddl_only: bool,
 ) -> Result<u64> {
     let ddl_sql = read_ddl_from_bundle(scratch_dir, &object.ddl_path).await?;
+    if ddl_only {
+        load::prepare_object_ddl_only(client, object, mode, &ddl_sql).await?;
+        return Ok(0);
+    }
+
     let data_path = scratch_dir.join(&object.data_path);
     let inserted_rows = load::load_object(client, object, mode, &ddl_sql, || async {
         copy_stream::copy_data_in_file(
