Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions glide.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions glide.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,5 @@ import:
- package: gopkg.in/tylerb/graceful.v1
version: v1.2.15
- package: github.com/kshvakov/clickhouse
- package: go.uber.org/zap
version: ^1.8.0
37 changes: 23 additions & 14 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"fmt"
"os"
"time"

"go.uber.org/zap"
)

// a lot of this borrows directly from:
Expand All @@ -28,11 +30,10 @@ type config struct {

var (
versionFlag bool
debug bool
)

func main() {
excode := 0

conf := parseFlags()

if versionFlag {
Expand All @@ -41,27 +42,32 @@ func main() {
if VersionPrerelease != "" {
fmt.Println("Version PreRelease:", VersionPrerelease)
}
os.Exit(excode)
return
}

var logger *zap.Logger
if debug {
logger, _ = zap.NewDevelopment()
} else {
logger, _ = zap.NewProduction()
}

fmt.Println("Starting up..")
defer logger.Sync() // flushes buffer, if any
sugar := logger.Sugar()

srv, err := NewP2CServer(conf)
sugar.Info("Starting up..")

srv, err := NewP2CServer(conf, sugar)
if err != nil {
fmt.Printf("Error: could not create server: %s\n", err.Error())
excode = 1
os.Exit(excode)
sugar.Fatalf("could not create server: %s\n", err.Error())
}
err = srv.Start()
if err != nil {
fmt.Printf("Error: http server returned error: %s\n", err.Error())
excode = 1
sugar.Fatalf("http server returned error: %s\n", err.Error())
}

fmt.Println("Shutting down..")
sugar.Info("Shutting down..")
srv.Shutdown()
fmt.Println("Exiting..")
os.Exit(excode)
sugar.Info("Exiting..")
}

func parseFlags() *config {
Expand All @@ -70,6 +76,9 @@ func parseFlags() *config {
// print version?
flag.BoolVar(&versionFlag, "version", false, "Version")

// turn on debug?
flag.BoolVar(&debug, "debug", false, "turn on debug mode")

// clickhouse dsn
ddsn := "tcp://127.0.0.1:9000?username=&password=&database=metrics&" +
"read_timeout=10&write_timeout=10&alt_hosts="
Expand Down
39 changes: 27 additions & 12 deletions reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,18 @@ import (
"fmt"
"strings"

"github.com/kshvakov/clickhouse"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/storage/remote"
"go.uber.org/zap"
)

var readerContent = []interface{}{"component", "reader"}

type p2cReader struct {
conf *config
db *sql.DB
conf *config
db *sql.DB
logger *zap.SugaredLogger
}

// getTimePeriod return select and where SQL chunks relating to the time period -or- error
Expand Down Expand Up @@ -152,13 +157,23 @@ func (r *p2cReader) getSQL(query *remote.Query) (string, error) {
return sql, nil
}

func NewP2CReader(conf *config) (*p2cReader, error) {
func NewP2CReader(conf *config, sugar *zap.SugaredLogger) (*p2cReader, error) {
var err error
r := new(p2cReader)
r.conf = conf
r.logger = sugar
r.db, err = sql.Open("clickhouse", r.conf.ChDSN)
if err != nil {
fmt.Printf("Error connecting to clickhouse: %s\n", err.Error())
r.logger.With(readerContent...).Errorf("connecting to clickhouse: %s", err.Error())
return r, err
}

if err := r.db.Ping(); err != nil {
if exception, ok := err.(*clickhouse.Exception); ok {
r.logger.With(readerContent...).Errorf("[%d] %s \n%s\n", exception.Code, exception.Message, exception.StackTrace)
} else {
r.logger.With(readerContent...).Error(err.Error())
}
return r, err
}

Expand All @@ -182,27 +197,27 @@ func (r *p2cReader) Read(req *remote.ReadRequest) (*remote.ReadResponse, error)
rcount := 0
for _, q := range req.Queries {
// remove me..
fmt.Printf("\nquery: start: %d, end: %d\n\n", q.StartTimestampMs, q.EndTimestampMs)
r.logger.With(readerContent...).Debug("\nquery: start: %d, end: %d", q.StartTimestampMs, q.EndTimestampMs)

// get the select sql
sqlStr, err = r.getSQL(q)
fmt.Printf("query: running sql: %s\n\n", sqlStr)
r.logger.With(readerContent...).Debug("query: running sql: %s", sqlStr)
if err != nil {
fmt.Printf("Error: reader: getSQL: %s\n", err.Error())
r.logger.With(readerContent...).Errorf("reader: getSQL: %s", err.Error())
return &resp, err
}

// get the select sql
if err != nil {
fmt.Printf("Error: reader: getSQL: %s\n", err.Error())
r.logger.With(readerContent...).Errorf("reader: getSQL: %s", err.Error())
return &resp, err
}

// todo: metrics on number of errors, rows, selects, timings, etc
rows, err = r.db.Query(sqlStr)
if err != nil {
fmt.Printf("Error: query failed: %s", sqlStr)
fmt.Printf("Error: query error: %s\n", err)
r.logger.With(readerContent...).Errorf("query failed: %s", sqlStr)
r.logger.With(readerContent...).Errorf("query error: %s", err)
return &resp, err
}

Expand All @@ -218,7 +233,7 @@ func (r *p2cReader) Read(req *remote.ReadRequest) (*remote.ReadResponse, error)
value float64
)
if err = rows.Scan(&cnt, &t, &name, &tags, &value); err != nil {
fmt.Printf("Error: scan: %s\n", err.Error())
r.logger.With(readerContent...).Errorf("scan: %s", err.Error())
}
// remove this..
//fmt.Printf(fmt.Sprintf("%d,%d,%s,%s,%f\n", cnt, t, name, strings.Join(tags, ":"), value))
Expand All @@ -244,7 +259,7 @@ func (r *p2cReader) Read(req *remote.ReadRequest) (*remote.ReadResponse, error)
resp.Results[0].Timeseries = append(resp.Results[0].Timeseries, ts)
}

fmt.Printf("query: returning %d rows for %d queries\n", rcount, len(req.Queries))
r.logger.With(readerContent...).Debug("query: returning %d rows for %d queries", rcount, len(req.Queries))

return &resp, nil

Expand Down
22 changes: 12 additions & 10 deletions srv.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
package main

import (
"fmt"
"io/ioutil"
"net/http"
"time"

"fmt"

"github.com/golang/protobuf/proto"
"github.com/golang/snappy"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/storage/remote"
"go.uber.org/zap"
"gopkg.in/tylerb/graceful.v1"
)

Expand All @@ -29,24 +29,26 @@ type p2cServer struct {
writer *p2cWriter
reader *p2cReader
rx prometheus.Counter
logger *zap.SugaredLogger
}

func NewP2CServer(conf *config) (*p2cServer, error) {
func NewP2CServer(conf *config, sugar *zap.SugaredLogger) (*p2cServer, error) {
var err error
c := new(p2cServer)
c.requests = make(chan *p2cRequest, conf.ChanSize)
c.mux = http.NewServeMux()
c.conf = conf
c.logger = sugar

c.writer, err = NewP2CWriter(conf, c.requests)
c.writer, err = NewP2CWriter(conf, c.requests, sugar)
if err != nil {
fmt.Printf("Error creating clickhouse writer: %s\n", err.Error())
c.logger.Errorf("creating clickhouse writer: %s\n", err.Error())
return c, err
}

c.reader, err = NewP2CReader(conf)
c.reader, err = NewP2CReader(conf, sugar)
if err != nil {
fmt.Printf("Error creating clickhouse reader: %s\n", err.Error())
c.logger.Errorf("creating clickhouse reader: %s\n", err.Error())
return c, err
}

Expand Down Expand Up @@ -161,7 +163,7 @@ func (c *p2cServer) process(req remote.WriteRequest) {
}

func (c *p2cServer) Start() error {
fmt.Println("HTTP server starting...")
c.logger.Info("HTTP server starting...")
c.writer.Start()
return graceful.RunWithErr(c.conf.HTTPAddr, c.conf.HTTPTimeout, c.mux)
}
Expand All @@ -178,10 +180,10 @@ func (c *p2cServer) Shutdown() {

select {
case <-wchan:
fmt.Println("Writer shutdown cleanly..")
c.logger.Info("Writer shutdown cleanly..")
// All done!
case <-time.After(10 * time.Second):
fmt.Println("Writer shutdown timed out, samples will be lost..")
c.logger.Info("Writer shutdown timed out, samples will be lost..")
}

}
36 changes: 25 additions & 11 deletions writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,20 @@ import (
"database/sql"
"fmt"
"sort"
"time"

"sync"
"time"

"github.com/kshvakov/clickhouse"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
)

var insertSQL = `INSERT INTO %s.%s
(date, name, tags, val, ts)
VALUES (?, ?, ?, ?, ?)`

var writerContent = []interface{}{"component", "writer"}

type p2cWriter struct {
conf *config
requests chan *p2cRequest
Expand All @@ -25,16 +27,28 @@ type p2cWriter struct {
ko prometheus.Counter
test prometheus.Counter
timings prometheus.Histogram

logger *zap.SugaredLogger
}

func NewP2CWriter(conf *config, reqs chan *p2cRequest) (*p2cWriter, error) {
func NewP2CWriter(conf *config, reqs chan *p2cRequest, sugar *zap.SugaredLogger) (*p2cWriter, error) {
var err error
w := new(p2cWriter)
w.conf = conf
w.requests = reqs
w.logger = sugar
w.db, err = sql.Open("clickhouse", w.conf.ChDSN)
if err != nil {
fmt.Printf("Error connecting to clickhouse: %s\n", err.Error())
w.logger.With(writerContent...).Errorf("connecting to clickhouse: %s", err.Error())
return w, err
}

if err := w.db.Ping(); err != nil {
if exception, ok := err.(*clickhouse.Exception); ok {
w.logger.With(writerContent...).Errorf("[%d] %s \n%s\n", exception.Code, exception.Message, exception.StackTrace)
} else {
w.logger.With(writerContent...).Error(err.Error())
}
return w, err
}

Expand Down Expand Up @@ -78,7 +92,7 @@ func (w *p2cWriter) Start() {

go func() {
w.wg.Add(1)
fmt.Println("Writer starting..")
w.logger.With(writerContent...).Info("Writer starting..")
sql := fmt.Sprintf(insertSQL, w.conf.ChDB, w.conf.ChTable)
ok := true
for ok {
Expand All @@ -92,7 +106,7 @@ func (w *p2cWriter) Start() {
// get requet and also check if channel is closed
req, ok = <-w.requests
if !ok {
fmt.Println("Writer stopping..")
w.logger.With(writerContent...).Info("Writer stopping..")
break
}
reqs = append(reqs, req)
Expand All @@ -107,7 +121,7 @@ func (w *p2cWriter) Start() {
// post them to db all at once
tx, err := w.db.Begin()
if err != nil {
fmt.Printf("Error: begin transaction: %s\n", err.Error())
w.logger.With(writerContent...).Errorf("begin transaction: %s", err.Error())
w.ko.Add(1.0)
continue
}
Expand All @@ -116,7 +130,7 @@ func (w *p2cWriter) Start() {
smt, err := tx.Prepare(sql)
for _, req := range reqs {
if err != nil {
fmt.Printf("Error: prepare statement: %s\n", err.Error())
w.logger.With(writerContent...).Errorf("prepare statement: %s", err.Error())
w.ko.Add(1.0)
continue
}
Expand All @@ -128,22 +142,22 @@ func (w *p2cWriter) Start() {
req.val, req.ts)

if err != nil {
fmt.Printf("Error: statement exec: %s\n", err.Error())
w.logger.With(writerContent...).Errorf("statement exec: %s", err.Error())
w.ko.Add(1.0)
}
}

// commit and record metrics
if err = tx.Commit(); err != nil {
fmt.Printf("Error: commit failed: %s\n", err.Error())
w.logger.With(writerContent...).Errorf("commit failed: %s", err.Error())
w.ko.Add(1.0)
} else {
w.tx.Add(float64(nmetrics))
w.timings.Observe(float64(time.Since(tstart)))
}

}
fmt.Println("Writer stopped..")
w.logger.With(writerContent...).Info("Writer stopped..")
w.wg.Done()
}()
}
Expand Down