Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .tool-versions
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
golang 1.20.14
6 changes: 3 additions & 3 deletions Dockerfile.integrationtest
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
FROM golang:1.23 AS builder
FROM golang:1.24 AS builder
WORKDIR /go/src/github.com/infobloxopen/hotload/
COPY . .

# build integration test binary
RUN make build-test

FROM golang:1.23 AS runner
COPY --from=builder /go/src/github.com/infobloxopen/hotload/integrationtests.test .
FROM golang:1.24 AS runner
COPY --from=builder /go/src/github.com/infobloxopen/hotload/integrationtests/integrationtests.test .

2 changes: 1 addition & 1 deletion Dockerfile.test
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM golang:1.23
FROM golang:1.24
WORKDIR /go/src/github.com/infobloxopen/hotload
COPY . .
RUN make ci-test
10 changes: 5 additions & 5 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@ build: vet
get-ginkgo:
go get github.com/onsi/ginkgo/v2/ginkgo

test: vet get-ginkgo go-test
test: vet go-test

go-test:
go test -race github.com/infobloxopen/hotload \
go test -tags=unit_tests -race github.com/infobloxopen/hotload \
github.com/infobloxopen/hotload/fsnotify \
github.com/infobloxopen/hotload/internal \
github.com/infobloxopen/hotload/metrics \
Expand All @@ -48,8 +48,8 @@ integ-test-image: .integ-test-image-$(GIT_COMMIT)
deploy-integration-tests:
helm upgrade hotload-integration-tests integrationtests/helm/hotload-integration-tests -i --set image.tag=$(GIT_COMMIT)

build-test: vet get-ginkgo
go test -c ./integrationtests
build-test: vet
cd integrationtests && go test -tags=unit_tests -c .

kind-create-cluster:
kind create cluster
Expand All @@ -73,4 +73,4 @@ postgres-docker-compose-down:

# Requires postgres db, see target postgres-docker-compose-up
local-integration-tests:
HOTLOAD_PATH_CHKSUM_METRICS_ENABLE=true go test -v -race -timeout=3m -count=1 github.com/infobloxopen/hotload/integrationtests
cd integrationtests && HOTLOAD_PATH_CHKSUM_METRICS_ENABLE=true go test -tags=unit_tests -v -race -timeout=3m -count=1
307 changes: 178 additions & 129 deletions chanGroup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,10 @@ package hotload
import (
"context"
"database/sql/driver"
"fmt"
"log"
"strings"
"sync"
"testing"
"time"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/prometheus/client_golang/prometheus/testutil"

"github.com/infobloxopen/hotload/internal"
"github.com/infobloxopen/hotload/metrics"
)

type testConn struct {
Expand All @@ -28,6 +20,7 @@ func (tc *testConn) Open(name string) (driver.Conn, error) {
func (tc *testConn) Prepare(query string) (driver.Stmt, error) {
return nil, nil
}

func (tc *testConn) Begin() (driver.Tx, error) {
return nil, nil
}
Expand Down Expand Up @@ -57,142 +50,198 @@ func (mw mockWatcher) sendValue(value string) {
log.Printf("mockWatcher: sent value '%s'", value)
}

var _ = DescribeTableSubtree("Driver", Serial, func(forceKill bool) {
var pctx context.Context
var ctx context.Context
var cancel context.CancelFunc
var cg *chanGroup
var mgdConns []*managedConn
var mockw *mockWatcher
Context("chanGroup", func() {
BeforeEach(func(ginkgoCtx context.Context) {
// Do NOT use ginkgoCtx as it will be canceled when BeforeEach finishes
pctx = context.Background()
ctx, cancel = context.WithCancel(pctx)
mockw = newMockWatcher()
cg = &chanGroup{
name: "fsnotify://postgres/tmp/mydsn.txt",
value: "1st-dsn",
newValChan: mockw.getReceiveChan(),
parentCtx: pctx,
ctx: ctx,
cancel: cancel,
sqlDriver: nil,
mu: sync.RWMutex{},
forceKill: forceKill,
}
cg.conns = []*managedConn{
newManagedConn(ctx, cg.value, cg.value, &testConn{}, cg.removeMgdConn),
newManagedConn(ctx, cg.value, cg.value, &testConn{}, cg.removeMgdConn),
newManagedConn(ctx, cg.value, cg.value, &testConn{}, cg.removeMgdConn),
}
mgdConns = cg.conns
func TestChanGroup_ChangeValueOnUpdate_ForceKillFalse(t *testing.T) {
testChanGroupChangeValueOnUpdate(t, false)
}

metrics.ResetCollectors()
}, NodeTimeout(5*time.Second))
func TestChanGroup_ChangeValueOnUpdate_ForceKillTrue(t *testing.T) {
testChanGroupChangeValueOnUpdate(t, true)
}

It("Should change value when a value is pushed to the values channel", func(ginkgoCtx context.Context) {
newVal := "2nd-dsn"
go cg.runLoop()
mockw.sendValue(newVal)
func testChanGroupChangeValueOnUpdate(t *testing.T, forceKill bool) {
pctx := context.Background()
ctx, cancel := context.WithCancel(pctx)
defer cancel()

mockw := newMockWatcher()
cg := &chanGroup{
name: "fsnotify://postgres/tmp/mydsn.txt",
value: "1st-dsn",
newValChan: mockw.getReceiveChan(),
parentCtx: pctx,
ctx: ctx,
cancel: cancel,
sqlDriver: nil,
mu: sync.RWMutex{},
forceKill: forceKill,
}
cg.conns = []*managedConn{
newManagedConn(ctx, cg.value, cg.value, &testConn{}, cg.removeMgdConn),
newManagedConn(ctx, cg.value, cg.value, &testConn{}, cg.removeMgdConn),
newManagedConn(ctx, cg.value, cg.value, &testConn{}, cg.removeMgdConn),
}
mgdConns := cg.conns

// Yield to cg.runLoop() background thread
time.Sleep(200 * time.Millisecond)
newVal := "2nd-dsn"
go cg.runLoop()
mockw.sendValue(newVal)

cg.mu.RLock()
defer cg.mu.RUnlock()
Expect(cg.value).To(Equal(newVal))
// Yield to cg.runLoop() background thread
time.Sleep(200 * time.Millisecond)

Expect(len(cg.conns)).To(Equal(0), "number of managed conns should be reset to zero")
cg.mu.RLock()
if cg.value != newVal {
cg.mu.RUnlock()
t.Fatalf("Expected value to be %q but got %q", newVal, cg.value)
}

for _, mc := range mgdConns {
Expect(mc.GetReset()).To(BeTrue(), "managed connection should be marked reset")
if cg.forceKill {
Expect(mc.GetKill()).To(BeTrue(), "managed connection should be marked killed")
Expect(mc.conn.(*testConn).closed).To(BeTrue(), "Closed() should have been called on the underlying connection")
}
if len(cg.conns) != 0 {
cg.mu.RUnlock()
t.Fatalf("Expected number of managed conns to be reset to zero but got %d", len(cg.conns))
}
cg.mu.RUnlock()

for _, mc := range mgdConns {
if !mc.GetReset() {
t.Fatal("Expected managed connection to be marked reset")
}
if forceKill {
if !mc.GetKill() {
t.Fatal("Expected managed connection to be marked killed")
}
if !mc.conn.(*testConn).closed {
t.Fatal("Expected Close() to have been called on the underlying connection")
}
}
}
}

// HotloadChangeTotal metric should be incremented
err := testutil.CollectAndCompare(metrics.HotloadChangeTotal,
strings.NewReader(expectHotloadChangeTotalHelp+
fmt.Sprintf(expectHotloadChangeTotalMetric, cg.name, 1)))
Expect(err).ShouldNot(HaveOccurred())
}, NodeTimeout(5*time.Second))
func TestChanGroup_NoResetOnSameValue_ForceKillFalse(t *testing.T) {
testChanGroupNoResetOnSameValue(t, false)
}

It("Should not reset conns when the same value is pushed to the values channel", func(ginkgoCtx context.Context) {
sameVal := "1st-dsn"
go cg.runLoop()
mockw.sendValue(sameVal)
func TestChanGroup_NoResetOnSameValue_ForceKillTrue(t *testing.T) {
testChanGroupNoResetOnSameValue(t, true)
}

// Yield to cg.runLoop() background thread
time.Sleep(200 * time.Millisecond)
func testChanGroupNoResetOnSameValue(t *testing.T, forceKill bool) {
pctx := context.Background()
ctx, cancel := context.WithCancel(pctx)
defer cancel()

mockw := newMockWatcher()
cg := &chanGroup{
name: "fsnotify://postgres/tmp/mydsn.txt",
value: "1st-dsn",
newValChan: mockw.getReceiveChan(),
parentCtx: pctx,
ctx: ctx,
cancel: cancel,
sqlDriver: nil,
mu: sync.RWMutex{},
forceKill: forceKill,
}
cg.conns = []*managedConn{
newManagedConn(ctx, cg.value, cg.value, &testConn{}, cg.removeMgdConn),
newManagedConn(ctx, cg.value, cg.value, &testConn{}, cg.removeMgdConn),
newManagedConn(ctx, cg.value, cg.value, &testConn{}, cg.removeMgdConn),
}
mgdConns := cg.conns

Expect(cg.value).To(Equal(sameVal))
sameVal := "1st-dsn"
go cg.runLoop()
mockw.sendValue(sameVal)

Expect(len(cg.conns)).To(Equal(3), "number of managed conns should not be reset to zero")
// Yield to cg.runLoop() background thread
time.Sleep(200 * time.Millisecond)

for _, c := range cg.conns {
Expect(c.GetReset()).To(BeFalse())
Expect(c.GetKill()).To(BeFalse())
Expect(c.conn.(*testConn).closed).To(BeFalse())
}
if cg.value != sameVal {
t.Fatalf("Expected value to be %q but got %q", sameVal, cg.value)
}

for _, mc := range mgdConns {
Expect(mc.GetReset()).To(BeFalse())
Expect(mc.GetKill()).To(BeFalse())
Expect(mc.conn.(*testConn).closed).To(BeFalse())
}
if len(cg.conns) != 3 {
t.Fatalf("Expected number of managed conns to remain at 3 but got %d", len(cg.conns))
}

// HotloadChangeTotal metric should NOT be incremented
err := testutil.CollectAndCompare(metrics.HotloadChangeTotal,
strings.NewReader(""))
Expect(err).ShouldNot(HaveOccurred())
}, NodeTimeout(5*time.Second))

It("Should change value and reset connections", func(ginkgoCtx context.Context) {
newVal := "2nd-dsn"
cg.processNewValue(newVal)
Expect(cg.value).To(Equal(newVal))

Expect(len(cg.conns)).To(Equal(0), "number of managed conns should be reset to zero")

for _, mc := range mgdConns {
Expect(mc.GetReset()).To(BeTrue(), "managed connection should be marked reset")
if cg.forceKill {
Expect(mc.GetKill()).To(BeTrue(), "managed connection should be marked killed")
Expect(mc.conn.(*testConn).closed).To(BeTrue(), "Closed() should have been called on the underlying connection")
}
}
for _, c := range cg.conns {
if c.GetReset() {
t.Fatal("Expected connection not to be marked reset")
}
if c.GetKill() {
t.Fatal("Expected connection not to be marked killed")
}
if c.conn.(*testConn).closed {
t.Fatal("Expected Close() not to have been called")
}
}

// HotloadChangeTotal metric should be incremented
err := testutil.CollectAndCompare(metrics.HotloadChangeTotal,
strings.NewReader(expectHotloadChangeTotalHelp+
fmt.Sprintf(expectHotloadChangeTotalMetric, cg.name, 1)))
Expect(err).ShouldNot(HaveOccurred())

err = internal.CollectAndRegexpCompare(metrics.HotloadLastChangedTimestampSeconds,
strings.NewReader(expectHotloadLastChangedTimestampSecondsMetricRegexp),
metrics.HotloadLastChangedTimestampSecondsName)
Expect(err).ShouldNot(HaveOccurred())
}, NodeTimeout(5*time.Second))
})
},
Entry("forceKill=false", false),
Entry("forceKill=true", true),
)
for _, mc := range mgdConns {
if mc.GetReset() {
t.Fatal("Expected connection not to be marked reset")
}
if mc.GetKill() {
t.Fatal("Expected connection not to be marked killed")
}
if mc.conn.(*testConn).closed {
t.Fatal("Expected Close() not to have been called")
}
}
}

func TestChanGroup_ProcessNewValue_ForceKillFalse(t *testing.T) {
testChanGroupProcessNewValue(t, false)
}

func TestChanGroup_ProcessNewValue_ForceKillTrue(t *testing.T) {
testChanGroupProcessNewValue(t, true)
}

var expectHotloadChangeTotalHelp = `
# HELP hotload_change_total Hotload change total by url
# TYPE hotload_change_total counter
`
func testChanGroupProcessNewValue(t *testing.T, forceKill bool) {
pctx := context.Background()
ctx, cancel := context.WithCancel(pctx)
defer cancel()

mockw := newMockWatcher()
cg := &chanGroup{
name: "fsnotify://postgres/tmp/mydsn.txt",
value: "1st-dsn",
newValChan: mockw.getReceiveChan(),
parentCtx: pctx,
ctx: ctx,
cancel: cancel,
sqlDriver: nil,
mu: sync.RWMutex{},
forceKill: forceKill,
}
cg.conns = []*managedConn{
newManagedConn(ctx, cg.value, cg.value, &testConn{}, cg.removeMgdConn),
newManagedConn(ctx, cg.value, cg.value, &testConn{}, cg.removeMgdConn),
newManagedConn(ctx, cg.value, cg.value, &testConn{}, cg.removeMgdConn),
}
mgdConns := cg.conns

newVal := "2nd-dsn"
cg.processNewValue(newVal)

if cg.value != newVal {
t.Fatalf("Expected value to be %q but got %q", newVal, cg.value)
}

var expectHotloadChangeTotalMetric = `
hotload_change_total{url="%s"} %d
`
if len(cg.conns) != 0 {
t.Fatalf("Expected number of managed conns to be reset to zero but got %d", len(cg.conns))
}

var expectHotloadLastChangedTimestampSecondsMetricRegexp = `
# HELP hotload_last_changed_timestamp_seconds Hotload last changed \(unix timestamp\), by url
# TYPE hotload_last_changed_timestamp_seconds gauge
hotload_last_changed_timestamp_seconds{url="fsnotify://postgres/tmp/mydsn.txt"} \d\.\d+e\+\d+
`
for _, mc := range mgdConns {
if !mc.GetReset() {
t.Fatal("Expected managed connection to be marked reset")
}
if forceKill {
if !mc.GetKill() {
t.Fatal("Expected managed connection to be marked killed")
}
if !mc.conn.(*testConn).closed {
t.Fatal("Expected Close() to have been called on the underlying connection")
}
}
}
}
Loading