Skip to content

Commit 1da3cda

Browse files
committed
added CheckConnection for postgres
1 parent 6e7ad11 commit 1da3cda

9 files changed

Lines changed: 472 additions & 82 deletions

File tree

go.mod

Lines changed: 26 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -3,17 +3,17 @@ module github.com/tkcrm/modules
33
go 1.23.0
44

55
require (
6-
github.com/cristalhq/aconfig v0.18.6
6+
github.com/cristalhq/aconfig v0.18.7
77
github.com/cristalhq/aconfig/aconfigdotenv v0.17.1
88
github.com/google/uuid v1.6.0
99
github.com/hibiken/asynq v0.25.1
10-
github.com/jackc/pgx/v5 v5.7.2
11-
github.com/nats-io/nats.go v1.39.1
12-
github.com/redis/go-redis/v9 v9.7.1
10+
github.com/jackc/pgx/v5 v5.7.5
11+
github.com/nats-io/nats.go v1.43.0
12+
github.com/redis/go-redis/v9 v9.9.0
1313
github.com/stretchr/testify v1.10.0
1414
github.com/uptrace/bun v1.2.11
1515
go.uber.org/zap v1.27.0
16-
golang.org/x/sync v0.12.0
16+
golang.org/x/sync v0.14.0
1717
)
1818

1919
require (
@@ -30,50 +30,53 @@ require (
3030
github.com/jackc/puddle/v2 v2.2.2 // indirect
3131
github.com/jinzhu/inflection v1.0.0 // indirect
3232
github.com/joho/godotenv v1.5.1 // indirect
33+
github.com/json-iterator/go v1.1.12 // indirect
3334
github.com/klauspost/compress v1.18.0 // indirect
3435
github.com/mattn/go-colorable v0.1.14 // indirect
3536
github.com/mattn/go-isatty v0.0.20 // indirect
3637
github.com/mitchellh/mapstructure v1.5.0 // indirect
38+
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
39+
github.com/modern-go/reflect2 v1.0.2 // indirect
3740
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
38-
github.com/oklog/ulid/v2 v2.1.0 // indirect
41+
github.com/oklog/ulid/v2 v2.1.1 // indirect
3942
github.com/pkg/errors v0.9.1 // indirect
4043
github.com/pmezard/go-difflib v1.0.0 // indirect
41-
github.com/prometheus/client_model v0.6.1 // indirect
42-
github.com/prometheus/common v0.63.0 // indirect
43-
github.com/prometheus/procfs v0.16.0 // indirect
44+
github.com/prometheus/client_model v0.6.2 // indirect
45+
github.com/prometheus/common v0.64.0 // indirect
46+
github.com/prometheus/procfs v0.16.1 // indirect
4447
github.com/puzpuzpuz/xsync/v3 v3.5.1 // indirect
4548
github.com/robfig/cron/v3 v3.0.1 // indirect
46-
github.com/spf13/cast v1.7.1 // indirect
49+
github.com/spf13/cast v1.9.2 // indirect
4750
github.com/tmthrgd/go-hex v0.0.0-20190904060850-447a3041c3bc // indirect
4851
github.com/vgarvardt/backoff v1.0.0 // indirect
4952
github.com/vmihailenco/msgpack/v5 v5.4.1 // indirect
5053
github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect
51-
go.opentelemetry.io/otel v1.35.0 // indirect
52-
go.opentelemetry.io/otel/metric v1.35.0 // indirect
53-
go.opentelemetry.io/otel/trace v1.35.0 // indirect
54-
golang.org/x/sys v0.31.0 // indirect
55-
golang.org/x/text v0.23.0 // indirect
54+
go.opentelemetry.io/otel v1.36.0 // indirect
55+
go.opentelemetry.io/otel/metric v1.36.0 // indirect
56+
go.opentelemetry.io/otel/trace v1.36.0 // indirect
57+
golang.org/x/sys v0.33.0 // indirect
58+
golang.org/x/text v0.25.0 // indirect
5659
golang.org/x/time v0.11.0 // indirect
5760
gopkg.in/yaml.v3 v3.0.1 // indirect
5861
mellium.im/sasl v0.3.2 // indirect
5962
)
6063

6164
require (
62-
github.com/getsentry/sentry-go v0.31.1
65+
github.com/getsentry/sentry-go v0.33.0
6366
github.com/go-ozzo/ozzo-validation/v4 v4.3.0
6467
github.com/gobeam/stringy v0.0.7
6568
github.com/goccy/go-json v0.10.5
66-
github.com/huandu/go-sqlbuilder v1.34.0
67-
github.com/nats-io/nkeys v0.4.10 // indirect
69+
github.com/huandu/go-sqlbuilder v1.35.0
70+
github.com/nats-io/nkeys v0.4.11 // indirect
6871
github.com/nats-io/nuid v1.0.1 // indirect
69-
github.com/prometheus/client_golang v1.21.1
70-
github.com/taosdata/driver-go/v3 v3.5.8
72+
github.com/prometheus/client_golang v1.22.0
73+
github.com/taosdata/driver-go/v3 v3.7.0
7174
github.com/ulule/limiter/v3 v3.11.2
7275
github.com/uptrace/bun/dialect/pgdialect v1.2.11
7376
github.com/uptrace/bun/driver/pgdriver v1.2.11
7477
github.com/uptrace/bun/extra/bundebug v1.2.11
75-
github.com/vgarvardt/gue/v5 v5.7.1
78+
github.com/vgarvardt/gue/v5 v5.8.0
7679
go.uber.org/multierr v1.11.0 // indirect
77-
golang.org/x/crypto v0.36.0 // indirect
78-
google.golang.org/protobuf v1.36.5
80+
golang.org/x/crypto v0.38.0 // indirect
81+
google.golang.org/protobuf v1.36.6
7982
)

go.sum

Lines changed: 51 additions & 47 deletions
Large diffs are not rendered by default.

pkg/db/postgres/logger.go

Lines changed: 0 additions & 5 deletions
This file was deleted.

pkg/db/postgres/postgres.go

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,17 +2,20 @@ package postgres
22

33
import (
44
"context"
5+
"fmt"
6+
"time"
57

68
"github.com/jackc/pgx/v5/pgxpool"
79
"github.com/tkcrm/modules/pkg/db/dbutils"
10+
"github.com/tkcrm/modules/pkg/retry"
811
)
912

1013
type PostgreSQL struct {
1114
DB *pgxpool.Pool
1215
cfg Config
1316
}
1417

15-
func New(ctx context.Context, cfg Config, logger logger) (*PostgreSQL, error) {
18+
func New(ctx context.Context, cfg Config) (*PostgreSQL, error) {
1619
instance := &PostgreSQL{
1720
cfg: cfg,
1821
}
@@ -33,13 +36,20 @@ func New(ctx context.Context, cfg Config, logger logger) (*PostgreSQL, error) {
3336
return nil, err
3437
}
3538

36-
if err := pool.Ping(ctx); err != nil {
37-
return nil, err
38-
}
39-
40-
logger.Info("successfully connected to postgres")
41-
4239
instance.DB = pool
4340

4441
return instance, nil
4542
}
43+
44+
// CheckConnection checks if the PostgreSQL connection pool is initialized and attempts to ping the database.
45+
// It retries the ping operation up to 5 times with a 2-second delay between attempts.
46+
// If the connection pool is not initialized, it returns an error.
47+
func (p *PostgreSQL) CheckConnection(ctx context.Context) error {
48+
if p.DB == nil {
49+
return fmt.Errorf("PostgreSQL connection pool is not initialized")
50+
}
51+
52+
return retry.New(retry.WithDelay(time.Second*2), retry.WithMaxAttempts(5)).Do(func() error {
53+
return p.DB.Ping(ctx)
54+
})
55+
}

pkg/retry/errors.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
package retry
2+
3+
import "errors"
4+
5+
var (
6+
ErrRetry = errors.New("retry")
7+
ErrExit = errors.New("exit")
8+
)

pkg/retry/options.go

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
package retry
2+
3+
import (
4+
"context"
5+
"time"
6+
)
7+
8+
type Option func(*Retry)
9+
10+
// WithMaxAttempts sets the maximum number of attempts
11+
func WithMaxAttempts(maxAttempts int) Option {
12+
return func(r *Retry) {
13+
r.maxAttempts = maxAttempts
14+
}
15+
}
16+
17+
// WithPolicy sets the retry policy
18+
func WithPolicy(policy Policy) Option {
19+
return func(r *Retry) {
20+
r.policy = policy
21+
}
22+
}
23+
24+
// WithDelay sets the delay between retries
25+
func WithDelay(delay time.Duration) Option {
26+
return func(r *Retry) {
27+
r.delay = delay
28+
}
29+
}
30+
31+
// WithDebug sets the debug mode
32+
func WithDebug(debug bool) Option {
33+
return func(r *Retry) {
34+
r.debug = debug
35+
}
36+
}
37+
38+
// WithContext sets the ctx for Infinite policy retry
39+
func WithContext(ctx context.Context) Option {
40+
return func(r *Retry) {
41+
r.ctx = ctx
42+
}
43+
}
44+
45+
// SetMaxAttempts sets the maximum number of attempts
46+
func (r *Retry) SetMaxAttempts(maxAttempts int) *Retry {
47+
r.maxAttempts = maxAttempts
48+
return r
49+
}
50+
51+
// SetPolicy sets the retry policy
52+
func (r *Retry) SetPolicy(policy Policy) *Retry {
53+
r.policy = policy
54+
return r
55+
}
56+
57+
// SetDelay sets the delay between retries
58+
func (r *Retry) SetDelay(delay time.Duration) *Retry {
59+
r.delay = delay
60+
return r
61+
}
62+
63+
// SetDebug sets the debug mode
64+
func (r *Retry) SetDebug(debug bool) *Retry {
65+
r.debug = debug
66+
return r
67+
}

pkg/retry/policies.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package retry
2+
3+
import "fmt"
4+
5+
type Policy int
6+
7+
const (
8+
PolicyLinear Policy = iota
9+
PolicyBackoff
10+
PolicyInfinite
11+
)
12+
13+
// Validate validates the retry policy
14+
func (r Policy) Validate() error {
15+
switch r {
16+
case PolicyLinear, PolicyBackoff, PolicyInfinite:
17+
return nil
18+
default:
19+
return fmt.Errorf("invalid retry policy")
20+
}
21+
}

pkg/retry/retry.go

Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
package retry
2+
3+
import (
4+
"context"
5+
"errors"
6+
"fmt"
7+
"time"
8+
)
9+
10+
// Retry represents a retry mechanism
11+
type Retry struct {
12+
ctx context.Context
13+
maxAttempts int
14+
policy Policy
15+
delay time.Duration
16+
debug bool
17+
}
18+
19+
// New creates a new Retry instance
20+
//
21+
// maxAttempts: the maximum number of attempts. Default is 5
22+
//
23+
// policy: the retry policy. Default is PolicyBackoff
24+
func New(opts ...Option) *Retry {
25+
r := &Retry{
26+
maxAttempts: 5,
27+
policy: PolicyBackoff,
28+
delay: 1 * time.Second,
29+
}
30+
31+
for _, opt := range opts {
32+
opt(r)
33+
}
34+
35+
return r
36+
}
37+
38+
// Do - performs the retry mechanism
39+
func (r *Retry) Do(fn func() error) error {
40+
switch r.policy {
41+
case PolicyLinear:
42+
return r.linearRetry(fn)
43+
case PolicyBackoff:
44+
return r.backoffRetry(fn)
45+
case PolicyInfinite:
46+
return r.infiniteRetry(fn)
47+
default:
48+
return fmt.Errorf("unsupported retry policy")
49+
}
50+
}
51+
52+
// linearRetry - performs a linear retry mechanism
53+
func (r *Retry) linearRetry(fn func() error) error {
54+
for attempt := 1; attempt <= r.maxAttempts; attempt++ {
55+
err := fn()
56+
if err == nil {
57+
return nil
58+
}
59+
60+
if errors.Is(err, ErrExit) {
61+
return err
62+
}
63+
64+
if attempt < r.maxAttempts {
65+
if r.debug {
66+
fmt.Printf("linear Retry attempt %d failed, retrying in %s...\n", attempt, r.delay)
67+
}
68+
time.Sleep(r.delay)
69+
}
70+
}
71+
return fmt.Errorf("linear retry failed after %d attempts", r.maxAttempts)
72+
}
73+
74+
// backoffRetry - performs a backoff retry mechanism
75+
func (r *Retry) backoffRetry(fn func() error) error {
76+
for attempt := 1; attempt <= r.maxAttempts; attempt++ {
77+
err := fn()
78+
if err == nil {
79+
return nil
80+
}
81+
82+
if errors.Is(err, ErrExit) {
83+
return err
84+
}
85+
86+
if attempt < r.maxAttempts {
87+
delay := r.delay * (1 << (attempt - 1)) // Увеличение задержки в 2 раза на каждую попытку
88+
if r.debug {
89+
fmt.Printf("backoff Retry attempt %d failed, retrying in %s...\n", attempt, delay)
90+
}
91+
time.Sleep(delay)
92+
}
93+
}
94+
return fmt.Errorf("backoff retry failed after %d attempts", r.maxAttempts)
95+
}
96+
97+
func (r *Retry) infiniteRetry(fn func() error) error {
98+
if r.ctx == nil {
99+
return fmt.Errorf("infinite retry cannot be initialized without ctx")
100+
}
101+
102+
resCh := make(chan error, 1)
103+
go func() {
104+
defer close(resCh)
105+
for {
106+
select {
107+
case <-r.ctx.Done():
108+
return
109+
default:
110+
err := fn()
111+
if err == nil {
112+
return
113+
}
114+
115+
if errors.Is(err, ErrExit) {
116+
resCh <- err
117+
return
118+
}
119+
120+
if r.debug {
121+
fmt.Printf("initnite retry attempt\n")
122+
}
123+
time.Sleep(r.delay)
124+
}
125+
}
126+
}()
127+
128+
return <-resCh
129+
}

0 commit comments

Comments
 (0)