Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,11 @@ func Strategies() []string {
}

func init() {
sql.Register("hotload", &hdriver{ctx: context.Background(), cgroup: make(map[string]*chanGroup)})
ctx := context.Background()
sql.Register("hotload", &hdriver{
ctx: ctx,
cgroup: make(map[string]*chanGroup),
})
}

// hdriver is the hotload driver.
Expand Down
10 changes: 10 additions & 0 deletions fsnotify/filewatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type Strategy struct {
mu sync.RWMutex
paths map[string]*pathWatch
watcher watcher
pathMon *hotload.PathMonitor
}

type pendingOperation struct {
Expand Down Expand Up @@ -166,6 +167,9 @@ func (s *Strategy) Watch(ctx context.Context, pth string, pathQry string) (value
s.watcher = watcher
go s.runLoop()
}
if s.pathMon == nil {
s.pathMon = hotload.NewPathMonitor(ctx)
}
pathW, found := s.paths[pth]
if found {
pathW.logf("fsnotify.Watch", "path already being watched")
Expand All @@ -174,6 +178,12 @@ func (s *Strategy) Watch(ctx context.Context, pth string, pathQry string) (value
if err := s.watcher.Add(pth); err != nil {
return "", nil, err
}
if err := s.pathMon.AddMonitoredPath(pth); err != nil {
if err != hotload.ErrDuplicatePath {
s.errlogf("fsnotify.Watch", "pathMon.AddMonitoredPath(%s) failed, err=%v", pth, err)
return "", nil, err
}
}
bs, err := s.readConfigFile(pth)
if err != nil {
s.watcher.Remove(pth)
Expand Down
28 changes: 28 additions & 0 deletions integrationtests/hotload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,13 @@ import (
"fmt"
"log"
"os"
"strings"
"time"

"github.com/infobloxopen/hotload"
_ "github.com/infobloxopen/hotload/fsnotify"
"github.com/infobloxopen/hotload/internal"
"github.com/infobloxopen/hotload/metrics"
"github.com/infobloxopen/hotload/modtime"
"github.com/lib/pq"
_ "github.com/lib/pq"
Expand Down Expand Up @@ -238,6 +241,19 @@ var _ = AfterSuite(func(ctx context.Context) {
}

//expectConnCountInDb(hlt, 3)

// HotloadPathChksumChangeTotal metric should be incremented
err = internal.CollectAndRegexpCompare(metrics.HotloadPathChksumChangeTotal,
strings.NewReader(fmt.Sprintf(expectHotloadPathChksumChangeTotalMetricRegexp,
"/tmp/hotload_integration_test_dsn_config.txt")),
metrics.HotloadPathChksumChangeTotalName)
Expect(err).ShouldNot(HaveOccurred())

err = internal.CollectAndRegexpCompare(metrics.HotloadPathChksumTimestampSeconds,
strings.NewReader(fmt.Sprintf(expectHotloadPathChksumTimestampSecondsMetricRegexp,
"/tmp/hotload_integration_test_dsn_config.txt")),
metrics.HotloadPathChksumTimestampSecondsName)
Expect(err).ShouldNot(HaveOccurred())
}, NodeTimeout(240*time.Second))

var _ = Describe("hotload integration tests - sanity", Serial, func() {
Expand Down Expand Up @@ -302,3 +318,15 @@ var _ = Describe("hotload integration tests - sanity", Serial, func() {
expectValueInDb(hlt1Db, "sanity", false, 1, 1)
})
})

var expectHotloadPathChksumChangeTotalMetricRegexp = `
# HELP hotload_path_chksum_change_total Hotload path checksum change total by path
# TYPE hotload_path_chksum_change_total counter
hotload_path_chksum_change_total{path="%s"} 4\d
`

var expectHotloadPathChksumTimestampSecondsMetricRegexp = `
# HELP hotload_path_chksum_timestamp_seconds Hotload path checksum last changed \(unix timestamp\), by path
# TYPE hotload_path_chksum_timestamp_seconds gauge
hotload_path_chksum_timestamp_seconds{path="%s"} \d\.\d+e\+\d+
`
3 changes: 3 additions & 0 deletions integrationtests/integrationtests_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"strings"
"testing"

"github.com/infobloxopen/hotload"
"github.com/infobloxopen/hotload/internal"
"github.com/infobloxopen/hotload/logger"

Expand Down Expand Up @@ -55,6 +56,8 @@ func TestIntegrationtests(t *testing.T) {
nrr := internal.NewNonRandomReader(1)
uuid.SetRand(nrr)

os.Setenv(hotload.CheckIntervalEnvVar, "1s")

pgHost, ok := os.LookupEnv("HOTLOAD_INTEGRATION_TEST_POSTGRES_HOST")
pgHost = strings.TrimSpace(pgHost)
if ok && len(pgHost) > 0 {
Expand Down
28 changes: 28 additions & 0 deletions metrics/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,38 @@ func SetHotloadLastChangedTimestampSeconds(url string, val float64) {
HotloadLastChangedTimestampSeconds.WithLabelValues(url).Set(val)
}

// HotloadPathChksumChangeTotal is count of path checksum changes polled by hotload
var HotloadPathChksumChangeTotalName = "hotload_path_chksum_change_total"
var HotloadPathChksumChangeTotalHelp = "Hotload path checksum change total by path"
var HotloadPathChksumChangeTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: HotloadPathChksumChangeTotalName,
Help: HotloadPathChksumChangeTotalHelp,
}, []string{PathKey})

func IncHotloadPathChksumChangeTotal(path string) {
HotloadPathChksumChangeTotal.WithLabelValues(path).Inc()
}

// HotloadPathChksumTimestampSeconds is timestamp when hotload path checksum changed (unix timestamp)
var HotloadPathChksumTimestampSecondsName = "hotload_path_chksum_timestamp_seconds"
var HotloadPathChksumTimestampSecondsHelp = "Hotload path checksum last changed (unix timestamp), by path"
var HotloadPathChksumTimestampSeconds = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: HotloadPathChksumTimestampSecondsName,
Help: HotloadPathChksumTimestampSecondsHelp,
}, []string{PathKey})

func SetHotloadPathChksumTimestampSeconds(path string, val float64) {
HotloadPathChksumTimestampSeconds.WithLabelValues(path).Set(val)
}

func GetCollectors() []prometheus.Collector {
return []prometheus.Collector{
SqlStmtsSummary,
HotloadModtimeLatencyHistogram,
HotloadChangeTotal,
HotloadLastChangedTimestampSeconds,
HotloadPathChksumChangeTotal,
HotloadPathChksumTimestampSeconds,
}
}

Expand All @@ -78,6 +104,8 @@ func ResetCollectors() {
HotloadModtimeLatencyHistogram.Reset()
HotloadChangeTotal.Reset()
HotloadLastChangedTimestampSeconds.Reset()
HotloadPathChksumChangeTotal.Reset()
HotloadPathChksumTimestampSeconds.Reset()
}

func init() {
Expand Down
138 changes: 138 additions & 0 deletions path_monitor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
package hotload

import (
"context"
"errors"
"fmt"
"hash/crc64"
"os"
"path"
"strings"
"sync"
"time"

"github.com/infobloxopen/hotload/logger"
"github.com/infobloxopen/hotload/metrics"
)

const (
CheckIntervalEnvVar = "HOTLOAD_PATH_MONITOR_CHECK_INTERVAL_DURATION"
)

var (
ErrDuplicatePath = errors.New("duplicate path")
ErrPathNotFound = errors.New("path not found")

crc64Table = crc64.MakeTable(crc64.ECMA)
)

type PathMonitor struct {
sync.RWMutex // used to synchronize changes to the set of paths being monitored
ctx context.Context
checkIntv time.Duration
paths map[string]*pathRecord
}

type pathRecord struct {
path string
crc64 uint64
}

func NewPathMonitor(ctx context.Context) *PathMonitor {
durStr := os.Getenv(CheckIntervalEnvVar)
durStr = strings.TrimSpace(durStr)
if len(durStr) <= 0 {
durStr = "60s"
}
checkIntv, err := time.ParseDuration(durStr)
if err != nil {
panic(fmt.Sprintf("time.ParseDuration(%s) err=%v", durStr, err))
}

pthm := &PathMonitor{
ctx: ctx,
checkIntv: checkIntv,
paths: make(map[string]*pathRecord),
}

go pthm.runLoop(pthm.ctx)

return pthm
}

// AddMonitoredPath adds a path to be checked for change in contents
func (pthm *PathMonitor) AddMonitoredPath(pathStr string) error {
pathStr = CleanPath(pathStr)

pthm.Lock()
defer pthm.Unlock()

pathRec, found := pthm.paths[pathStr]
if found {
return ErrDuplicatePath
}

pathRec = &pathRecord{
path: pathStr,
}
pthm.paths[pathStr] = pathRec

return nil
}

// checkPathChksums checks for changes in its set of paths
func (pthm *PathMonitor) checkPathChksums(ctx context.Context, nowTime time.Time) {
nowUnix := float64(nowTime.Unix())

pthm.RLock()
defer pthm.RUnlock()

for pathStr, pathRec := range pthm.paths {
pathBytes, err := os.ReadFile(pathStr)
if err != nil {
// log error, but continue
logger.ErrLogf("PathMonitor.checkPathChksums", "ReadFile(%s) err=%s", pathStr, err)
continue
}

newCrc64 := crc64.Checksum(pathBytes, crc64Table)
if pathRec.crc64 == newCrc64 {
continue
}

pathRec.crc64 = newCrc64

// Update metrics
metrics.IncHotloadPathChksumChangeTotal(pathStr)
metrics.SetHotloadPathChksumTimestampSeconds(pathStr, nowUnix)
}
}

// runLoop is the background PathMonitor thread that
// periodically checks for changes in paths' contents in a loop.
// Terminated when context is done (canceled).
func (pthm *PathMonitor) runLoop(ctx context.Context) {
logger.Logf("PathMonitor.runLoop", "started")

checkTicker := time.NewTicker(pthm.checkIntv)
defer checkTicker.Stop()

loop:
for {
select {
case <-ctx.Done():
checkTicker.Stop()
break loop

case curTime := <-checkTicker.C:
pthm.checkPathChksums(ctx, curTime)
}
}

logger.Logf("PathMonitor.runLoop", "terminated")
}

// CleanPath cleans and trimspaces path strings
func CleanPath(pathStr string) string {
return path.Clean(strings.TrimSpace(pathStr))
}