-
Notifications
You must be signed in to change notification settings - Fork 46
Expand file tree
/
Copy pathpostgres.go
More file actions
325 lines (311 loc) · 9.36 KB
/
postgres.go
File metadata and controls
325 lines (311 loc) · 9.36 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
package postgres
import (
"context"
"fmt"
"os"
"strconv"
"strings"
"time"
v1 "k8s.io/api/core/v1"
"github.com/smartcontractkit/chainlink-testing-framework/framework/pods"
"github.com/docker/docker/api/types/container"
"github.com/docker/go-connections/nat"
"github.com/testcontainers/testcontainers-go"
tcwait "github.com/testcontainers/testcontainers-go/wait"
"github.com/smartcontractkit/chainlink-testing-framework/framework"
)
// Fixed settings used by NewWithContext when deploying PostgreSQL.
const (
// User is the role name passed to the image as POSTGRES_USER and embedded in every connection URL.
User = "chainlink"
// Password is the value passed as POSTGRES_PASSWORD and embedded in every connection URL.
Password = "thispasswordislongenough"
// Port is the port PostgreSQL listens on inside the container.
Port = "5432"
// ExposedStaticPort is the host port used when Input.Port is left at zero.
ExposedStaticPort = 13000
// Database is the default database name passed as POSTGRES_DB.
Database = "chainlink"
// JDDatabase is the name of the JobDistributor database used in the JD connection URLs.
JDDatabase = "job-distributor-db"
// DBVolumeName is the prefix of the Docker volume name; Input.VolumeName is appended to it.
DBVolumeName = "postgresql_data"
)
// Input describes the PostgreSQL instance that NewPostgreSQL / NewWithContext should deploy.
type Input struct {
// Image PostgreSQL Docker image in format: $registry:$tag
Image string `toml:"image" validate:"required" comment:"PostgreSQL Docker image in format: $registry:$tag"`
// Port is the host port PostgreSQL is exposed on; ExposedStaticPort is used when it is zero.
Port int `toml:"port" comment:"PostgreSQL connection port"`
// Name is the container name; "ns-postgresql" is used when it is empty.
Name string `toml:"name" comment:"PostgreSQL container name"`
// VolumeName is a suffix appended to DBVolumeName to form the Docker volume name.
VolumeName string `toml:"volume_name" comment:"PostgreSQL docker volume name"`
// Databases controls how many databases are pre-created for Chainlink nodes.
// NOTE(review): the creation loop is inclusive, so db_0 through db_<Databases> are created,
// i.e. one more than this value — confirm whether callers rely on that.
Databases int `toml:"databases" comment:"Number of pre-created databases for Chainlink nodes"`
// JDDatabase whether to create JobDistributor database or not
JDDatabase bool `toml:"jd_database" comment:"Whether to create JobDistributor database or not"`
// JDSQLDumpPath is the path of a JobDistributor SQL dump to apply; it is only read when
// JDDatabase is true. When empty, an empty JobDistributor database is created instead.
JDSQLDumpPath string `toml:"jd_sql_dump_path" comment:"JobDistributor database dump path to load"`
// PullImage whether to always pull the PostgreSQL image (Docker deployment only)
PullImage bool `toml:"pull_image" comment:"Whether to pull PostgreSQL image or not"`
// ContainerResources Docker container resource limits (Docker deployment only)
ContainerResources *framework.ContainerResources `toml:"resources" comment:"Docker container resources"`
// Out PostgreSQL config output
Out *Output `toml:"out" comment:"PostgreSQL config output"`
}
// Output holds the connection details of a deployed PostgreSQL instance.
type Output struct {
	// Url is the external PostgreSQL connection URL (host address and host-mapped port).
	Url string `toml:"url" comment:"PostgreSQL connection URL"`
	// ContainerName PostgreSQL Docker container name
	ContainerName string `toml:"container_name" comment:"Docker container name"`
	// InternalURL is the connection URL for clients on the same Docker network or cluster.
	InternalURL string `toml:"internal_url" comment:"PostgreSQL internal connection URL"`
	// JDUrl is the external connection URL to the JobDistributor database.
	// The TOML comment previously said "internal", a copy-paste slip from JDInternalURL.
	JDUrl string `toml:"jd_url" comment:"PostgreSQL external connection URL to JobDistributor database"`
	// JDInternalURL PostgreSQL internal connection URL to JobDistributor database
	JDInternalURL string `toml:"jd_internal_url" comment:"PostgreSQL internal connection URL to JobDistributor database"`
	// K8sService is a Kubernetes service spec used to connect locally; it is only set for
	// Kubernetes deployments.
	K8sService *v1.Service `toml:"k8s_service" comment:"Kubernetes service spec used to connect locally"`
}
// NewPostgreSQL deploys PostgreSQL as described by in, using a background context.
// It is shorthand for NewWithContext(context.Background(), in).
func NewPostgreSQL(in *Input) (*Output, error) {
	ctx := context.Background()
	return NewWithContext(ctx, in)
}
// NewWithContext deploys the PostgreSQL instance described by in and returns its
// connection details.
//
// An init script is generated that creates databases db_0 through db_<in.Databases>
// (the bound is inclusive), each with the pg_stat_statements extension, grants
// SUPERUSER to the chainlink role and, when in.JDDatabase is set, either applies the
// dump at in.JDSQLDumpPath or creates an empty JobDistributor database.
//
// When pods.K8sEnabled() reports true the instance is deployed through pods.Run and
// the returned Output carries the Kubernetes service. Otherwise a Docker container is
// started (or reused) through testcontainers on the framework's default network.
//
// An error is returned when the dump file cannot be read, the init script cannot be
// written, or the deployment itself fails.
func NewWithContext(ctx context.Context, in *Input) (*Output, error) {
	containerName := in.Name
	if containerName == "" {
		containerName = "ns-postgresql"
	}
	portToExpose := in.Port
	if portToExpose == 0 {
		portToExpose = ExposedStaticPort
	}
	hostPort := strconv.Itoa(portToExpose)

	// connURL renders a connection URL; every URL in Output shares this layout.
	connURL := func(host, port, database string) string {
		return fmt.Sprintf("postgresql://%s:%s@%s:%s/%s?sslmode=disable", User, Password, host, port, database)
	}

	var sqlCommands []string
	// The bound is inclusive on purpose of backward compatibility: db_0..db_<Databases>.
	for i := 0; i <= in.Databases; i++ {
		sqlCommands = append(sqlCommands,
			fmt.Sprintf("CREATE DATABASE db_%d;", i),
			fmt.Sprintf("\\c db_%d", i),
			"CREATE EXTENSION pg_stat_statements;",
		)
	}
	sqlCommands = append(sqlCommands, "ALTER USER chainlink WITH SUPERUSER;")
	if in.JDDatabase {
		if in.JDSQLDumpPath != "" {
			// if we have a full dump we replace RDS specific commands and apply it creating db and filling the tables
			d, err := os.ReadFile(in.JDSQLDumpPath)
			if err != nil {
				return nil, fmt.Errorf("error reading JD dump file '%s': %w", in.JDSQLDumpPath, err)
			}
			// transaction_timeout is a custom RDS instruction, we must replace it
			sqlMigration := strings.ReplaceAll(string(d), "SET transaction_timeout = 0;", "")
			sqlCommands = append(sqlCommands, sqlMigration, "DELETE FROM public.csa_keypairs where id = 1;")
		} else {
			// if we don't have a dump we create an empty DB
			sqlCommands = append(sqlCommands, fmt.Sprintf("CREATE DATABASE \"%s\";", JDDatabase))
		}
	}
	initSQL := strings.Join(sqlCommands, "\n")

	// k8s deployment
	if pods.K8sEnabled() {
		// Use the defaulted name everywhere so an empty in.Name does not produce an
		// empty pod name and a "-svc" host while ContainerName reports "ns-postgresql".
		svcHost := containerName + "-svc"
		_, svc, err := pods.Run(ctx, &pods.Config{
			Pods: []*pods.PodConfig{
				{
					Name:  pods.Ptr(containerName),
					Image: pods.Ptr(in.Image),
					Ports: []string{fmt.Sprintf("%d:%s", portToExpose, Port)},
					Env: []v1.EnvVar{
						{
							Name:  "POSTGRES_USER",
							Value: User,
						},
						{
							Name:  "POSTGRES_PASSWORD",
							Value: Password,
						},
						{
							Name:  "POSTGRES_DB",
							Value: Database,
						},
					},
					Requests: pods.ResourcesMedium(),
					Limits:   pods.ResourcesMedium(),
					// container and pod security settings are specific to
					// 'postgres' Docker image
					ContainerSecurityContext: &v1.SecurityContext{
						RunAsUser:  pods.Ptr[int64](999),
						RunAsGroup: pods.Ptr[int64](999),
					},
					PodSecurityContext: &v1.PodSecurityContext{
						FSGroup: pods.Ptr[int64](999),
					},
					// The init script is shipped as a ConfigMap, so no host file is needed here.
					ConfigMap: map[string]string{
						"init.sql": initSQL,
					},
					ConfigMapMountPath: map[string]string{
						"init.sql": "/docker-entrypoint-initdb.d/init.sql",
					},
					VolumeClaimTemplates: pods.SizedVolumeClaim("4Gi"),
				},
			},
		})
		if err != nil {
			return nil, err
		}
		o := &Output{
			K8sService:    svc,
			ContainerName: containerName,
			// use svc internally too
			InternalURL: connURL(svcHost, hostPort, Database),
			Url:         connURL("localhost", hostPort, Database),
		}
		if in.JDDatabase {
			// Same service, same port as InternalURL (previously the container port
			// was used here, which disagrees with the service port mapping above).
			o.JDInternalURL = connURL(svcHost, hostPort, JDDatabase)
			o.JDUrl = connURL("localhost", hostPort, JDDatabase)
		}
		return o, nil
	}

	// local deployment
	if err := framework.DefaultNetwork(nil); err != nil {
		return nil, fmt.Errorf("failed to ensure default docker network %q: %w", framework.DefaultNetworkName, err)
	}

	// The init script is handed to testcontainers as a host file. It is only needed
	// while the container is being created, so it is removed when this function returns.
	initFile, err := os.CreateTemp("", "init-*.sql")
	if err != nil {
		return nil, fmt.Errorf("creating init SQL file: %w", err)
	}
	defer func() {
		// best-effort cleanup; a leftover temp file is harmless
		_ = os.Remove(initFile.Name())
	}()
	if _, err := initFile.WriteString(initSQL); err != nil {
		_ = initFile.Close() // the write error is the one worth reporting
		return nil, fmt.Errorf("writing init SQL file %q: %w", initFile.Name(), err)
	}
	if err := initFile.Close(); err != nil {
		return nil, fmt.Errorf("closing init SQL file %q: %w", initFile.Name(), err)
	}

	bindPort := fmt.Sprintf("%s/tcp", Port)
	req := testcontainers.ContainerRequest{
		AlwaysPullImage: in.PullImage,
		Image:           in.Image,
		Name:            containerName,
		Labels:          framework.DefaultTCLabels(),
		ExposedPorts:    []string{bindPort},
		Networks:        []string{framework.DefaultNetworkName},
		NetworkAliases: map[string][]string{
			framework.DefaultNetworkName: {containerName},
		},
		Env: map[string]string{
			"POSTGRES_USER":     User,
			"POSTGRES_PASSWORD": Password,
			"POSTGRES_DB":       Database,
		},
		Cmd: []string{
			"postgres", "-c",
			fmt.Sprintf("port=%s", Port),
			"-c", "shared_preload_libraries=pg_stat_statements",
			"-c", "pg_stat_statements.track=all",
		},
		Files: []testcontainers.ContainerFile{
			{
				HostFilePath:      initFile.Name(),
				ContainerFilePath: "/docker-entrypoint-initdb.d/init.sql",
				FileMode:          0o644,
			},
		},
		Mounts: testcontainers.ContainerMounts{
			{
				Source: testcontainers.GenericVolumeMountSource{
					Name: fmt.Sprintf("%s%s", DBVolumeName, in.VolumeName),
				},
				Target: "/var/lib/postgresql/data",
			},
		},
		// "select 1" must be a single argument: split in two, psql runs a bare
		// "select" and treats "1" as a stray positional argument.
		WaitingFor: tcwait.ForExec([]string{
			"psql", "-h", "127.0.0.1",
			"-U", User, "-p", Port, "-c", "select 1", "-d", Database,
		}).
			WithStartupTimeout(3 * time.Minute).
			WithPollInterval(200 * time.Millisecond),
	}
	req.HostConfigModifier = func(h *container.HostConfig) {
		h.PortBindings = nat.PortMap{
			nat.Port(bindPort): []nat.PortBinding{
				{
					HostIP:   "0.0.0.0",
					HostPort: hostPort,
				},
			},
		}
		framework.ResourceLimitsFunc(h, in.ContainerResources)
	}
	c, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{
		ContainerRequest: req,
		Started:          true,
		Reuse:            true,
	})
	if err != nil {
		return nil, err
	}
	host, err := framework.GetHostWithContext(ctx, c)
	if err != nil {
		return nil, err
	}

	// Internal URLs address the container by its network alias on the container port;
	// external URLs go through the host-mapped port.
	o := &Output{
		ContainerName: containerName,
		InternalURL:   connURL(containerName, Port, Database),
		Url:           connURL(host, hostPort, Database),
	}
	if in.JDDatabase {
		o.JDInternalURL = connURL(containerName, Port, JDDatabase)
		o.JDUrl = connURL(host, hostPort, JDDatabase)
	}
	return o, nil
}