Skip to content

Commit 5e00a2f

Browse files
authored
Cherry-pick v0.9.x fixes to main (#754, #756, #758) (#773)
* Don't filter out identity columns as generated (#754) * Fix injector close not propagated (#756) * Fix views order in the pg_dump restore (#758) * Fix cherry-picked snapshot integration tests for main's options pattern The cherry-picked tests from v0.9.x used testPostgresProcessorCfg(url, bool) but main uses testPostgresProcessorCfg(opts ...option). Adapt the function signatures and callers to match main's pattern. * Fix cherry-picked views test to account for restoreSchemas step The "ok - with views" test from v0.9.x didn't include the schemaCreateDump as the first restore call, which exists on main via restoreSchemas(). Shift restore call indices by 1.
1 parent 9888095 commit 5e00a2f

10 files changed

Lines changed: 423 additions & 6 deletions

File tree

pkg/snapshot/generator/postgres/schema/pgdumprestore/snapshot_pg_dump_restore_generator.go

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ type dump struct {
7676
filtered []byte
7777
cleanupPart []byte
7878
indicesAndConstraints []byte
79+
views []byte
7980
sequences []string
8081
roles map[string]role
8182
eventTriggers []byte
@@ -240,9 +241,15 @@ func (s *SnapshotGenerator) CreateSnapshot(ctx context.Context, ss *snapshot.Sna
240241

241242
s.logger.Info("restoring schema indices and constraints", loglib.Fields{"schemaTables": ss.SchemaTables})
242243
if s.snapshotTracker != nil {
243-
return s.restoreIndicesWithTracking(ctx, dump.indicesAndConstraints)
244+
if err := s.restoreIndicesWithTracking(ctx, dump.indicesAndConstraints); err != nil {
245+
return err
246+
}
247+
} else if err := s.restoreDump(ctx, dump.indicesAndConstraints); err != nil {
248+
return err
244249
}
245-
return s.restoreDump(ctx, dump.indicesAndConstraints)
250+
251+
s.logger.Info("restoring views")
252+
return s.restoreDump(ctx, dump.views)
246253
}
247254

248255
func (s *SnapshotGenerator) Close() error {
@@ -290,6 +297,7 @@ func (s *SnapshotGenerator) dumpSchema(ctx context.Context, schemaTables map[str
290297

291298
s.dumpToFile(s.getDumpFileName("-filtered"), pgdumpOpts, parsedDump.filtered)
292299
s.dumpToFile(s.getDumpFileName("-indices-constraints"), pgdumpOpts, parsedDump.indicesAndConstraints)
300+
s.dumpToFile(s.getDumpFileName("-views"), pgdumpOpts, parsedDump.views)
293301

294302
// only if clean is enabled, produce the clean up part of the dump
295303
if s.optionGenerator.cleanTargetDB {
@@ -406,10 +414,12 @@ func (s *SnapshotGenerator) parseDump(d []byte) *dump {
406414
indicesAndConstraints := strings.Builder{}
407415
filteredDump := strings.Builder{}
408416
eventTriggersDump := strings.Builder{}
417+
viewsDump := strings.Builder{}
409418
sequenceNames := []string{}
410419
dumpRoles := make(map[string]role)
411420
alterTable := ""
412421
createEventTrigger := ""
422+
createView := ""
413423
for scanner.Scan() {
414424
line := scanner.Text()
415425
switch {
@@ -446,11 +456,27 @@ func (s *SnapshotGenerator) parseDump(d []byte) *dump {
446456
eventTriggersDump.WriteString(line)
447457
eventTriggersDump.WriteString("\n")
448458

459+
case strings.HasPrefix(line, "CREATE VIEW"),
460+
strings.HasPrefix(line, "CREATE MATERIALIZED VIEW"):
461+
createView = line
462+
fallthrough
463+
case createView != "":
464+
if strings.HasSuffix(line, ";") {
465+
viewsDump.WriteString(line)
466+
viewsDump.WriteString("\n\n")
467+
createView = ""
468+
continue
469+
}
470+
viewsDump.WriteString(line)
471+
viewsDump.WriteString("\n")
472+
449473
case strings.Contains(line, `\connect`):
450474
indicesAndConstraints.WriteString(line)
451475
indicesAndConstraints.WriteString("\n\n")
452476
filteredDump.WriteString(line)
453477
filteredDump.WriteString("\n")
478+
viewsDump.WriteString(line)
479+
viewsDump.WriteString("\n\n")
454480
case strings.HasPrefix(line, "CREATE INDEX"),
455481
strings.HasPrefix(line, "CREATE UNIQUE INDEX"),
456482
strings.HasPrefix(line, "CREATE CONSTRAINT"),
@@ -515,6 +541,7 @@ func (s *SnapshotGenerator) parseDump(d []byte) *dump {
515541
full: d,
516542
filtered: []byte(filteredDump.String()),
517543
indicesAndConstraints: []byte(indicesAndConstraints.String()),
544+
views: []byte(viewsDump.String()),
518545
sequences: sequenceNames,
519546
roles: dumpRoles,
520547
eventTriggers: []byte(eventTriggersDump.String()),

pkg/snapshot/generator/postgres/schema/pgdumprestore/snapshot_pg_dump_restore_generator_test.go

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,13 @@ func TestSnapshotGenerator_CreateSnapshot(t *testing.T) {
2323
t.Parallel()
2424

2525
schemaDump := []byte("schema dump\nCREATE SEQUENCE test.test_sequence\nALTER TABLE public.test_table OWNER TO test_role;\nGRANT ALL ON TABLE public.test_table TO test_role2;\nCREATE INDEX a;\n")
26+
schemaDumpWithViews := []byte("schema dump\nCREATE SEQUENCE test.test_sequence\nALTER TABLE public.test_table OWNER TO test_role;\nGRANT ALL ON TABLE public.test_table TO test_role2;\nCREATE VIEW public.test_view AS\n SELECT 1;\nCREATE INDEX a;\n")
2627
schemaDumpNoSequences := []byte("schema dump\nALTER TABLE public.test_table OWNER TO test_role;\nGRANT ALL ON TABLE public.test_table TO test_role2;\nCREATE INDEX a;\n")
2728
filteredDumpNoSequences := []byte("schema dump\nGRANT ALL ON SCHEMA \"public\" TO \"test_role\";\nALTER TABLE public.test_table OWNER TO test_role;\nGRANT ALL ON TABLE public.test_table TO test_role2;\n")
2829
filteredDump := []byte("schema dump\nCREATE SEQUENCE test.test_sequence\nGRANT ALL ON SCHEMA \"public\" TO \"test_role\";\nALTER TABLE public.test_table OWNER TO test_role;\nGRANT ALL ON TABLE public.test_table TO test_role2;\n")
2930
sequenceDump := []byte("sequence dump\n")
3031
indexDump := []byte("CREATE INDEX a;\n\n")
32+
testViewsDump := []byte("CREATE VIEW public.test_view AS\n SELECT 1;\n\n")
3133
rolesDumpOriginal := []byte("roles dump\nCREATE ROLE postgres\nCREATE ROLE test_role\nCREATE ROLE test_role2\nALTER ROLE test_role3 INHERIT FROM test_role;\n")
3234
rolesDumpFiltered := []byte("roles dump\nCREATE ROLE test_role\nCREATE ROLE test_role2\nGRANT \"test_role\" TO CURRENT_USER;\n")
3335
cleanupDump := []byte("cleanup dump\n")
@@ -281,6 +283,54 @@ func TestSnapshotGenerator_CreateSnapshot(t *testing.T) {
281283

282284
wantErr: nil,
283285
},
286+
{
287+
name: "ok - with views",
288+
snapshot: &snapshot.Snapshot{
289+
SchemaTables: map[string][]string{
290+
testSchema: {testTable},
291+
},
292+
},
293+
conn: validQuerier(),
294+
pgdumpFn: newMockPgdump(func(_ context.Context, i uint, po pglib.PGDumpOptions) ([]byte, error) {
295+
switch i {
296+
case 1:
297+
return schemaDumpWithViews, nil
298+
case 2:
299+
return sequenceDump, nil
300+
default:
301+
return nil, fmt.Errorf("unexpected call to pgdumpFn: %d", i)
302+
}
303+
}),
304+
pgdumpallFn: newMockPgdumpall(func(_ context.Context, i uint, po pglib.PGDumpAllOptions) ([]byte, error) {
305+
switch i {
306+
case 1:
307+
return rolesDumpOriginal, nil
308+
default:
309+
return nil, fmt.Errorf("unexpected call to pgdumpallFn: %d", i)
310+
}
311+
}),
312+
pgrestoreFn: newMockPgrestore(func(_ context.Context, i uint, po pglib.PGRestoreOptions, dump []byte) (string, error) {
313+
switch i {
314+
case 1:
315+
require.Equal(t, string(schemaCreateDump), string(dump))
316+
case 2:
317+
require.Equal(t, string(rolesDumpFiltered), string(dump))
318+
case 3:
319+
require.Equal(t, string(filteredDump), string(dump))
320+
case 4:
321+
require.Equal(t, string(sequenceDump), string(dump))
322+
case 5:
323+
require.Equal(t, string(indexDump), string(dump))
324+
case 6:
325+
require.Equal(t, string(testViewsDump), string(dump))
326+
default:
327+
return "", fmt.Errorf("unexpected call to pgrestoreFn: %d", i)
328+
}
329+
return "", nil
330+
}),
331+
332+
wantErr: nil,
333+
},
284334
{
285335
name: "ok - no sequence dump",
286336
snapshot: &snapshot.Snapshot{
@@ -1237,6 +1287,9 @@ func TestSnapshotGenerator_parseDump(t *testing.T) {
12371287
wantEventTriggersBytes, err := os.ReadFile("test/test_dump_event_triggers.sql")
12381288
require.NoError(t, err)
12391289

1290+
wantViewsBytes, err := os.ReadFile("test/test_dump_views.sql")
1291+
require.NoError(t, err)
1292+
12401293
sg := &SnapshotGenerator{
12411294
excludedSecurityLabels: []string{"anon"},
12421295
}
@@ -1248,12 +1301,15 @@ func TestSnapshotGenerator_parseDump(t *testing.T) {
12481301
wantConstraintsStr := strings.Trim(string(wantConstraintsBytes), "\n")
12491302
eventTriggersStr := strings.Trim(string(dump.eventTriggers), "\n")
12501303
wantEventTriggersStr := strings.Trim(string(wantEventTriggersBytes), "\n")
1304+
viewsStr := strings.Trim(string(dump.views), "\n")
1305+
wantViewsStr := strings.Trim(string(wantViewsBytes), "\n")
12511306
wantSequences := []string{`"musicbrainz"."alternative_medium_id_seq"`, `"musicbrainz"."Alternative_medium_id_seq"`}
12521307

12531308
require.Equal(t, wantFilteredStr, filteredStr)
12541309
require.Equal(t, wantConstraintsStr, constraintsStr)
12551310
require.Equal(t, wantSequences, dump.sequences)
12561311
require.Equal(t, wantEventTriggersStr, eventTriggersStr)
1312+
require.Equal(t, wantViewsStr, viewsStr)
12571313
}
12581314

12591315
func TestGetDumpsDiff(t *testing.T) {

pkg/snapshot/generator/postgres/schema/pgdumprestore/test/test_dump.sql

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -251,6 +251,37 @@ ALTER TABLE ONLY musicbrainz.alternative_medium ALTER COLUMN id SET DEFAULT next
251251
ALTER TABLE ONLY musicbrainz.alternative_release ALTER COLUMN id SET DEFAULT nextval('musicbrainz.alternative_release_id_seq'::regclass);
252252

253253

254+
--
255+
-- Name: edit_summary; Type: VIEW; Schema: musicbrainz; Owner: postgres
256+
--
257+
258+
CREATE VIEW musicbrainz.edit_summary AS
259+
SELECT e.id,
260+
e.editor,
261+
e.type,
262+
e.status
263+
FROM musicbrainz.edit e
264+
GROUP BY e.id;
265+
266+
267+
ALTER VIEW musicbrainz.edit_summary OWNER TO postgres;
268+
269+
--
270+
-- Name: medium_summary; Type: MATERIALIZED VIEW; Schema: musicbrainz; Owner: postgres
271+
--
272+
273+
CREATE MATERIALIZED VIEW musicbrainz.medium_summary AS
274+
SELECT am.id,
275+
am.medium,
276+
am.alternative_release,
277+
am.name
278+
FROM musicbrainz.alternative_medium am
279+
GROUP BY am.id
280+
WITH NO DATA;
281+
282+
283+
ALTER MATERIALIZED VIEW musicbrainz.medium_summary OWNER TO postgres;
284+
254285
--
255286
-- Name: alternative_medium alternative_medium_pkey; Type: CONSTRAINT; Schema: musicbrainz; Owner: postgres
256287
--

pkg/snapshot/generator/postgres/schema/pgdumprestore/test/test_dump_filtered.sql

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -240,6 +240,20 @@ ALTER TABLE ONLY musicbrainz.alternative_medium ALTER COLUMN id SET DEFAULT next
240240
ALTER TABLE ONLY musicbrainz.alternative_release ALTER COLUMN id SET DEFAULT nextval('musicbrainz.alternative_release_id_seq'::regclass);
241241

242242

243+
--
244+
-- Name: edit_summary; Type: VIEW; Schema: musicbrainz; Owner: postgres
245+
--
246+
247+
248+
249+
250+
--
251+
-- Name: medium_summary; Type: MATERIALIZED VIEW; Schema: musicbrainz; Owner: postgres
252+
--
253+
254+
255+
256+
243257
--
244258
-- Name: alternative_medium alternative_medium_pkey; Type: CONSTRAINT; Schema: musicbrainz; Owner: postgres
245259
--
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
\connect test
2+
3+
CREATE VIEW musicbrainz.edit_summary AS
4+
SELECT e.id,
5+
e.editor,
6+
e.type,
7+
e.status
8+
FROM musicbrainz.edit e
9+
GROUP BY e.id;
10+
11+
CREATE MATERIALIZED VIEW musicbrainz.medium_summary AS
12+
SELECT am.id,
13+
am.medium,
14+
am.alternative_release,
15+
am.name
16+
FROM musicbrainz.alternative_medium am
17+
GROUP BY am.id
18+
WITH NO DATA;

pkg/stream/integration/pg_pg_integration_test.go

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,11 @@ type testTableColumn struct {
2727
username string
2828
}
2929

30+
type idNameRow struct {
31+
id int
32+
name string
33+
}
34+
3035
func Test_PostgresToPostgres(t *testing.T) {
3136
if os.Getenv("PGSTREAM_INTEGRATION_TESTS") == "" {
3237
t.Skip("skipping integration test...")
@@ -807,6 +812,38 @@ func getTestTableColumns(t *testing.T, ctx context.Context, conn pglib.Querier,
807812
return columns
808813
}
809814

815+
func getIDRows(t *testing.T, ctx context.Context, conn pglib.Querier, query string) []int {
816+
rows, err := conn.Query(ctx, query)
817+
require.NoError(t, err)
818+
defer rows.Close()
819+
820+
var ids []int
821+
for rows.Next() {
822+
var id int
823+
err := rows.Scan(&id)
824+
require.NoError(t, err)
825+
ids = append(ids, id)
826+
}
827+
require.NoError(t, rows.Err())
828+
return ids
829+
}
830+
831+
func getIDNameRows(t *testing.T, ctx context.Context, conn pglib.Querier, query string) []idNameRow {
832+
rows, err := conn.Query(ctx, query)
833+
require.NoError(t, err)
834+
defer rows.Close()
835+
836+
var result []idNameRow
837+
for rows.Next() {
838+
var r idNameRow
839+
err := rows.Scan(&r.id, &r.name)
840+
require.NoError(t, err)
841+
result = append(result, r)
842+
}
843+
require.NoError(t, rows.Err())
844+
return result
845+
}
846+
810847
func getRoles(t *testing.T, ctx context.Context, conn pglib.Querier) []string {
811848
rows, err := conn.Query(ctx, "select rolname from pg_roles where rolname not like 'pg_%' and rolname <> 'postgres'")
812849
require.NoError(t, err)

0 commit comments

Comments
 (0)