Skip to content

Commit 9252ea5

Browse files
committed
fix copilot feedback
1 parent dee8a45 commit 9252ea5

3 files changed

Lines changed: 47 additions & 42 deletions

File tree

flow/connectors/postgres/postgres.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1317,7 +1317,7 @@ func (c *PostgresConnector) ReplayTableSchemaDeltas(
13171317
return fmt.Errorf("unknown type system %d", schemaDelta.System)
13181318
}
13191319

1320-
if addedColumn.Type == "numeric" && addedColumn.TypeModifier != -1 {
1320+
if strings.EqualFold(columnType, "numeric") && addedColumn.TypeModifier != -1 {
13211321
precision, scale := numeric.ParseNumericTypmod(addedColumn.TypeModifier)
13221322
columnType = fmt.Sprintf("numeric(%d,%d)", precision, scale)
13231323
}

flow/connectors/postgres/postgres_schema_delta_test.go

Lines changed: 40 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -54,17 +54,17 @@ var typeSystems = []protos.TypeSystem{protos.TypeSystem_Q, protos.TypeSystem_PG}
5454

5555
func (s PostgresSchemaDeltaTestSuite) TestSimpleAddColumn() {
5656
for _, system := range typeSystems {
57-
s.t.Run(system.String(), func(_ *testing.T) {
58-
s.testSimpleAddColumn(system)
57+
s.t.Run(system.String(), func(t *testing.T) {
58+
s.testSimpleAddColumn(t, system)
5959
})
6060
}
6161
}
6262

63-
func (s PostgresSchemaDeltaTestSuite) testSimpleAddColumn(system protos.TypeSystem) {
63+
func (s PostgresSchemaDeltaTestSuite) testSimpleAddColumn(t *testing.T, system protos.TypeSystem) {
6464
tableName := fmt.Sprintf("%s.simple_add_column_%s", s.schema, strings.ToLower(system.String()))
65-
_, err := s.connector.conn.Exec(s.t.Context(),
65+
_, err := s.connector.conn.Exec(t.Context(),
6666
fmt.Sprintf("CREATE TABLE %s(id INT PRIMARY KEY)", tableName))
67-
require.NoError(s.t, err)
67+
require.NoError(t, err)
6868

6969
addedColumns := fieldsForSystem([]*protos.FieldDescription{
7070
{
@@ -76,17 +76,17 @@ func (s PostgresSchemaDeltaTestSuite) testSimpleAddColumn(system protos.TypeSyst
7676
},
7777
}, system)
7878

79-
require.NoError(s.t, s.connector.ReplayTableSchemaDeltas(s.t.Context(), nil, "schema_delta_flow", nil, []*protos.TableSchemaDelta{{
79+
require.NoError(t, s.connector.ReplayTableSchemaDeltas(t.Context(), nil, "schema_delta_flow", nil, []*protos.TableSchemaDelta{{
8080
SrcTableName: tableName,
8181
DstTableName: tableName,
8282
AddedColumns: addedColumns,
8383
System: system,
8484
}}))
8585

86-
output, err := s.connector.GetTableSchema(s.t.Context(), nil, shared.InternalVersion_Latest, system,
86+
output, err := s.connector.GetTableSchema(t.Context(), nil, shared.InternalVersion_Latest, system,
8787
[]*protos.TableMapping{{SourceTableIdentifier: tableName}})
88-
require.NoError(s.t, err)
89-
require.NotEqual(s.t, 0, output[tableName].TableOid)
88+
require.NoError(t, err)
89+
require.NotEqual(t, 0, output[tableName].TableOid)
9090
output[tableName].TableOid = 0 // zero out TableOid for comparison
9191

9292
expectedColumns := fieldsForSystem([]*protos.FieldDescription{
@@ -105,7 +105,7 @@ func (s PostgresSchemaDeltaTestSuite) testSimpleAddColumn(system protos.TypeSyst
105105
},
106106
}, system)
107107

108-
require.Equal(s.t, &protos.TableSchema{
108+
require.Equal(t, &protos.TableSchema{
109109
TableIdentifier: tableName,
110110
PrimaryKeyColumns: []string{"id"},
111111
System: system,
@@ -115,17 +115,17 @@ func (s PostgresSchemaDeltaTestSuite) testSimpleAddColumn(system protos.TypeSyst
115115

116116
func (s PostgresSchemaDeltaTestSuite) TestAddAllColumnTypes() {
117117
for _, system := range typeSystems {
118-
s.t.Run(system.String(), func(_ *testing.T) {
119-
s.testAddAllColumnTypes(system)
118+
s.t.Run(system.String(), func(t *testing.T) {
119+
s.testAddAllColumnTypes(t, system)
120120
})
121121
}
122122
}
123123

124-
func (s PostgresSchemaDeltaTestSuite) testAddAllColumnTypes(system protos.TypeSystem) {
124+
func (s PostgresSchemaDeltaTestSuite) testAddAllColumnTypes(t *testing.T, system protos.TypeSystem) {
125125
tableName := fmt.Sprintf("%s.add_drop_all_column_types_%s", s.schema, strings.ToLower(system.String()))
126-
_, err := s.connector.conn.Exec(s.t.Context(),
126+
_, err := s.connector.conn.Exec(t.Context(),
127127
fmt.Sprintf("CREATE TABLE %s(id INT PRIMARY KEY)", tableName))
128-
require.NoError(s.t, err)
128+
require.NoError(t, err)
129129

130130
fields := fieldsForSystem(AddAllColumnTypesFields, system)
131131
expectedTableSchema := &protos.TableSchema{
@@ -141,34 +141,34 @@ func (s PostgresSchemaDeltaTestSuite) testAddAllColumnTypes(system protos.TypeSy
141141
}
142142
}
143143

144-
require.NoError(s.t, s.connector.ReplayTableSchemaDeltas(s.t.Context(), nil, "schema_delta_flow", nil, []*protos.TableSchemaDelta{{
144+
require.NoError(t, s.connector.ReplayTableSchemaDeltas(t.Context(), nil, "schema_delta_flow", nil, []*protos.TableSchemaDelta{{
145145
SrcTableName: tableName,
146146
DstTableName: tableName,
147147
AddedColumns: addedColumns,
148148
System: system,
149149
}}))
150150

151-
output, err := s.connector.GetTableSchema(s.t.Context(), nil, shared.InternalVersion_Latest, system,
151+
output, err := s.connector.GetTableSchema(t.Context(), nil, shared.InternalVersion_Latest, system,
152152
[]*protos.TableMapping{{SourceTableIdentifier: tableName}})
153-
require.NoError(s.t, err)
154-
require.NotEqual(s.t, 0, output[tableName].TableOid)
153+
require.NoError(t, err)
154+
require.NotEqual(t, 0, output[tableName].TableOid)
155155
output[tableName].TableOid = 0 // zero out TableOid for comparison
156-
require.Equal(s.t, expectedTableSchema, output[tableName])
156+
require.Equal(t, expectedTableSchema, output[tableName])
157157
}
158158

159159
func (s PostgresSchemaDeltaTestSuite) TestAddTrickyColumnNames() {
160160
for _, system := range typeSystems {
161-
s.t.Run(system.String(), func(_ *testing.T) {
162-
s.testAddTrickyColumnNames(system)
161+
s.t.Run(system.String(), func(t *testing.T) {
162+
s.testAddTrickyColumnNames(t, system)
163163
})
164164
}
165165
}
166166

167-
func (s PostgresSchemaDeltaTestSuite) testAddTrickyColumnNames(system protos.TypeSystem) {
167+
func (s PostgresSchemaDeltaTestSuite) testAddTrickyColumnNames(t *testing.T, system protos.TypeSystem) {
168168
tableName := fmt.Sprintf("%s.add_drop_tricky_column_names_%s", s.schema, strings.ToLower(system.String()))
169-
_, err := s.connector.conn.Exec(s.t.Context(),
169+
_, err := s.connector.conn.Exec(t.Context(),
170170
fmt.Sprintf("CREATE TABLE %s(id INT PRIMARY KEY)", tableName))
171-
require.NoError(s.t, err)
171+
require.NoError(t, err)
172172

173173
fields := fieldsForSystem(TrickyFields, system)
174174
expectedTableSchema := &protos.TableSchema{
@@ -184,34 +184,34 @@ func (s PostgresSchemaDeltaTestSuite) testAddTrickyColumnNames(system protos.Typ
184184
}
185185
}
186186

187-
require.NoError(s.t, s.connector.ReplayTableSchemaDeltas(s.t.Context(), nil, "schema_delta_flow", nil, []*protos.TableSchemaDelta{{
187+
require.NoError(t, s.connector.ReplayTableSchemaDeltas(t.Context(), nil, "schema_delta_flow", nil, []*protos.TableSchemaDelta{{
188188
SrcTableName: tableName,
189189
DstTableName: tableName,
190190
AddedColumns: addedColumns,
191191
System: system,
192192
}}))
193193

194-
output, err := s.connector.GetTableSchema(s.t.Context(), nil, shared.InternalVersion_Latest, system,
194+
output, err := s.connector.GetTableSchema(t.Context(), nil, shared.InternalVersion_Latest, system,
195195
[]*protos.TableMapping{{SourceTableIdentifier: tableName}})
196-
require.NoError(s.t, err)
197-
require.NotEqual(s.t, 0, output[tableName].TableOid)
196+
require.NoError(t, err)
197+
require.NotEqual(t, 0, output[tableName].TableOid)
198198
output[tableName].TableOid = 0 // zero out TableOid for comparison
199-
require.Equal(s.t, expectedTableSchema, output[tableName])
199+
require.Equal(t, expectedTableSchema, output[tableName])
200200
}
201201

202202
func (s PostgresSchemaDeltaTestSuite) TestAddDropWhitespaceColumnNames() {
203203
for _, system := range typeSystems {
204-
s.t.Run(system.String(), func(_ *testing.T) {
205-
s.testAddDropWhitespaceColumnNames(system)
204+
s.t.Run(system.String(), func(t *testing.T) {
205+
s.testAddDropWhitespaceColumnNames(t, system)
206206
})
207207
}
208208
}
209209

210-
func (s PostgresSchemaDeltaTestSuite) testAddDropWhitespaceColumnNames(system protos.TypeSystem) {
210+
func (s PostgresSchemaDeltaTestSuite) testAddDropWhitespaceColumnNames(t *testing.T, system protos.TypeSystem) {
211211
tableName := fmt.Sprintf("%s.add_drop_whitespace_column_names_%s", s.schema, strings.ToLower(system.String()))
212-
_, err := s.connector.conn.Exec(s.t.Context(),
212+
_, err := s.connector.conn.Exec(t.Context(),
213213
fmt.Sprintf("CREATE TABLE %s(\" \" INT PRIMARY KEY)", tableName))
214-
require.NoError(s.t, err)
214+
require.NoError(t, err)
215215

216216
fields := fieldsForSystem(WhitespaceFields, system)
217217
expectedTableSchema := &protos.TableSchema{
@@ -227,19 +227,19 @@ func (s PostgresSchemaDeltaTestSuite) testAddDropWhitespaceColumnNames(system pr
227227
}
228228
}
229229

230-
require.NoError(s.t, s.connector.ReplayTableSchemaDeltas(s.t.Context(), nil, "schema_delta_flow", nil, []*protos.TableSchemaDelta{{
230+
require.NoError(t, s.connector.ReplayTableSchemaDeltas(t.Context(), nil, "schema_delta_flow", nil, []*protos.TableSchemaDelta{{
231231
SrcTableName: tableName,
232232
DstTableName: tableName,
233233
AddedColumns: addedColumns,
234234
System: system,
235235
}}))
236236

237-
output, err := s.connector.GetTableSchema(s.t.Context(), nil, shared.InternalVersion_Latest, system,
237+
output, err := s.connector.GetTableSchema(t.Context(), nil, shared.InternalVersion_Latest, system,
238238
[]*protos.TableMapping{{SourceTableIdentifier: tableName}})
239-
require.NoError(s.t, err)
240-
require.NotEqual(s.t, 0, output[tableName].TableOid)
239+
require.NoError(t, err)
240+
require.NotEqual(t, 0, output[tableName].TableOid)
241241
output[tableName].TableOid = 0 // zero out TableOid for comparison
242-
require.Equal(s.t, expectedTableSchema, output[tableName])
242+
require.Equal(t, expectedTableSchema, output[tableName])
243243
}
244244

245245
func TestPostgresSchemaDeltaTestSuite(t *testing.T) {

flow/connectors/postgres/schema_delta_test_constants.go renamed to flow/connectors/postgres/schema_delta_test_constants_test.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package connpostgres
22

33
import (
4+
"fmt"
5+
46
"github.com/PeerDB-io/peerdb/flow/generated/protos"
57
numeric "github.com/PeerDB-io/peerdb/flow/shared/datatypes"
68
"github.com/PeerDB-io/peerdb/flow/shared/types"
@@ -31,7 +33,10 @@ func fieldsForSystem(qFields []*protos.FieldDescription, system protos.TypeSyste
3133
}
3234
result := make([]*protos.FieldDescription, len(qFields))
3335
for i, f := range qFields {
34-
pgType := qValueKindToPgName[f.Type]
36+
pgType, ok := qValueKindToPgName[f.Type]
37+
if !ok {
38+
panic(fmt.Sprintf("no PG type mapping for QValueKind %q", f.Type))
39+
}
3540
result[i] = &protos.FieldDescription{
3641
Name: f.Name,
3742
Type: pgType,

0 commit comments

Comments
 (0)