2 changes: 2 additions & 0 deletions core/internal/httpserver/coordinator.go
@@ -129,6 +129,8 @@ func (hc *Coordinator) Configure() {
// This is a healthcheck URL. Please don't change it
hc.router.GET("/burrow/admin", hc.handleAdmin)

hc.router.Handler(http.MethodGet, "/metrics", hc.handlePrometheusMetrics())

// All valid paths go here
hc.router.GET("/v3/kafka", hc.handleClusterList)
hc.router.GET("/v3/kafka/:cluster", hc.handleClusterDetail)
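A quick way to sanity-check the newly registered endpoint against a locally running Burrow is a small Go snippet like the one below. The listener address and port are assumptions based on a typical default config, not something defined in this PR; adjust them to your setup.

package main

import (
	"fmt"
	"io"
	"log"
	"net/http"
)

func main() {
	// Assumed listener address; point this at wherever your Burrow httpserver listens.
	resp, err := http.Get("http://localhost:8000/metrics")
	if err != nil {
		log.Fatalf("request failed: %v", err)
	}
	defer resp.Body.Close()

	// The response body is standard Prometheus exposition text, e.g.
	// burrow_kafka_consumer_lag_total{cluster="...",consumer_group="..."} 42
	body, err := io.ReadAll(resp.Body)
	if err != nil {
		log.Fatalf("reading response failed: %v", err)
	}
	fmt.Println(string(body))
}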
182 changes: 182 additions & 0 deletions core/internal/httpserver/prometheus.go
@@ -0,0 +1,182 @@
package httpserver

import (
"net/http"
"strconv"

"github.com/prometheus/client_golang/prometheus"

"github.com/linkedin/Burrow/core/protocol"

"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/client_golang/prometheus/promhttp"
)

var (
consumerTotalLagGauge = promauto.NewGaugeVec(
prometheus.GaugeOpts{
Name: "burrow_kafka_consumer_lag_total",
Help: "The sum of all partition current lag values for the group",
},
[]string{"cluster", "consumer_group"},
)

consumerStatusGauge = promauto.NewGaugeVec(
prometheus.GaugeOpts{
Name: "burrow_kafka_consumer_status",
Help: "The status of the consumer group. It is calculated from the highest status for the individual partitions. Statuses are an index list from NOTFOUND, OK, WARN, ERR, STOP, STALL, REWIND",
},
[]string{"cluster", "consumer_group"},
)

consumerPartitionCurrentOffset = promauto.NewGaugeVec(
prometheus.GaugeOpts{
Name: "burrow_kafka_consumer_current_offset",
Help: "Latest offset that Burrow is storing for this partition",
},
[]string{"cluster", "consumer_group", "topic", "partition"},
)

consumerPartitionLagGauge = promauto.NewGaugeVec(
prometheus.GaugeOpts{
Name: "burrow_kafka_consumer_partition_lag",
Help: "Number of messages the consumer group is behind by for a partition as reported by Burrow",
},
[]string{"cluster", "consumer_group", "topic", "partition"},
)

topicPartitionOffsetGauge = promauto.NewGaugeVec(
prometheus.GaugeOpts{
Name: "burrow_kafka_topic_partition_offset",
Help: "Latest offset the topic that Burrow is storing for this partition",
},
[]string{"cluster", "topic", "partition"},
)
)

func (hc *Coordinator) handlePrometheusMetrics() http.HandlerFunc {
promHandler := promhttp.Handler()

return http.HandlerFunc(func(resp http.ResponseWriter, req *http.Request) {
for _, cluster := range listClusters(hc.App) {
for _, consumer := range listConsumers(hc.App, cluster) {
consumerStatus := getFullConsumerStatus(hc.App, cluster, consumer)

if consumerStatus == nil ||
consumerStatus.Status == protocol.StatusNotFound ||
consumerStatus.Complete < 1.0 {
@vazdauta commented on Jul 21, 2020:
I am wondering what the reason is for this particular rule (consumerStatus.Complete < 1.0). We have some partitions whose completion percentage is 0.1; the cause seems to be that some of our partitions are empty and their offsets eventually expire (although I'm not sure, it might be something else on our side). The consequence is that the completion percentage for the consumer is less than 1, so it is not included in Prometheus.

continue
}

labels := map[string]string{
"cluster": cluster,
"consumer_group": consumer,
}

consumerTotalLagGauge.With(labels).Set(float64(consumerStatus.TotalLag))
consumerStatusGauge.With(labels).Set(float64(consumerStatus.Status))

for _, partition := range consumerStatus.Partitions {
if partition.Complete < 1.0 {
continue
}

labels := map[string]string{
"cluster": cluster,
"consumer_group": consumer,
"topic": partition.Topic,
"partition": strconv.FormatInt(int64(partition.Partition), 10),
}

consumerPartitionCurrentOffset.With(labels).Set(float64(partition.End.Offset))
consumerPartitionLagGauge.With(labels).Set(float64(partition.CurrentLag))
}
}

// Topics
for _, topic := range listTopics(hc.App, cluster) {
for partitionNumber, offset := range getTopicDetail(hc.App, cluster, topic) {
topicPartitionOffsetGauge.With(map[string]string{
"cluster": cluster,
"topic": topic,
"partition": strconv.FormatInt(int64(partitionNumber), 10),
}).Set(float64(offset))
}
}
}

promHandler.ServeHTTP(resp, req)
})
}

func listClusters(app *protocol.ApplicationContext) []string {
request := &protocol.StorageRequest{
RequestType: protocol.StorageFetchClusters,
Reply: make(chan interface{}),
}
app.StorageChannel <- request
response := <-request.Reply
if response == nil {
return []string{}
}

return response.([]string)
}

func listConsumers(app *protocol.ApplicationContext, cluster string) []string {
request := &protocol.StorageRequest{
RequestType: protocol.StorageFetchConsumers,
Cluster: cluster,
Reply: make(chan interface{}),
}
app.StorageChannel <- request
response := <-request.Reply
if response == nil {
return []string{}
}

return response.([]string)
}

func getFullConsumerStatus(app *protocol.ApplicationContext, cluster, consumer string) *protocol.ConsumerGroupStatus {
request := &protocol.EvaluatorRequest{
Cluster: cluster,
Group: consumer,
ShowAll: true,
Reply: make(chan *protocol.ConsumerGroupStatus),
}
app.EvaluatorChannel <- request
response := <-request.Reply
return response
}

func listTopics(app *protocol.ApplicationContext, cluster string) []string {
request := &protocol.StorageRequest{
RequestType: protocol.StorageFetchTopics,
Cluster: cluster,
Reply: make(chan interface{}),
}
app.StorageChannel <- request
response := <-request.Reply
if response == nil {
return []string{}
}

return response.([]string)
}

func getTopicDetail(app *protocol.ApplicationContext, cluster, topic string) []int64 {
request := &protocol.StorageRequest{
RequestType: protocol.StorageFetchTopic,
Cluster: cluster,
Topic: topic,
Reply: make(chan interface{}),
}
app.StorageChannel <- request
response := <-request.Reply
if response == nil {
return []int64{}
}

return response.([]int64)
}
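For reviewers less familiar with the client library calls used above: promauto.NewGaugeVec registers each collector with the default registry as a package-level side effect, and promhttp.Handler() serves whatever that default registry contains, which is why handlePrometheusMetrics only needs to refresh gauge values before delegating to the wrapped handler. A minimal, standalone sketch of that same pattern follows; the metric name and labels are illustrative, not taken from Burrow.

package main

import (
	"log"
	"net/http"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

// Registered with the default registry as a side effect of promauto.
var exampleLag = promauto.NewGaugeVec(
	prometheus.GaugeOpts{
		Name: "example_consumer_lag_total",
		Help: "Illustrative gauge following the same promauto pattern as the metrics above",
	},
	[]string{"cluster", "consumer_group"},
)

func main() {
	// In the Burrow handler the gauges are refreshed on every scrape; setting
	// a value once here is enough to see a series appear on /metrics.
	exampleLag.With(prometheus.Labels{
		"cluster":        "local",
		"consumer_group": "demo",
	}).Set(42)

	// promhttp.Handler() exposes everything held by the default registry.
	http.Handle("/metrics", promhttp.Handler())
	log.Fatal(http.ListenAndServe(":2112", nil))
}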
156 changes: 156 additions & 0 deletions core/internal/httpserver/prometheus_test.go
@@ -0,0 +1,156 @@
package httpserver

import (
"net/http"
"net/http/httptest"
"testing"

"github.com/stretchr/testify/assert"

"github.com/linkedin/Burrow/core/protocol"
)

func TestHttpServer_handlePrometheusMetrics(t *testing.T) {
coordinator := fixtureConfiguredCoordinator()

// Respond to the expected storage requests
go func() {
request := <-coordinator.App.StorageChannel
assert.Equalf(t, protocol.StorageFetchClusters, request.RequestType, "Expected request of type StorageFetchClusters, not %v", request.RequestType)
request.Reply <- []string{"testcluster"}
close(request.Reply)

// List of consumers
request = <-coordinator.App.StorageChannel
assert.Equalf(t, protocol.StorageFetchConsumers, request.RequestType, "Expected request of type StorageFetchConsumers, not %v", request.RequestType)
assert.Equalf(t, "testcluster", request.Cluster, "Expected request Cluster to be testcluster, not %v", request.Cluster)
request.Reply <- []string{"testgroup", "testgroup2"}
close(request.Reply)

// List of topics
request = <-coordinator.App.StorageChannel
assert.Equalf(t, protocol.StorageFetchTopics, request.RequestType, "Expected request of type StorageFetchTopics, not %v", request.RequestType)
assert.Equalf(t, "testcluster", request.Cluster, "Expected request Cluster to be testcluster, not %v", request.Cluster)
request.Reply <- []string{"testtopic", "testtopic1"}
close(request.Reply)

// Topic details
request = <-coordinator.App.StorageChannel
assert.Equalf(t, protocol.StorageFetchTopic, request.RequestType, "Expected request of type StorageFetchTopic, not %v", request.RequestType)
assert.Equalf(t, "testcluster", request.Cluster, "Expected request Cluster to be testcluster, not %v", request.Cluster)
assert.Equalf(t, "testtopic", request.Topic, "Expected request Topic to be testtopic, not %v", request.Topic)
request.Reply <- []int64{6556, 5566}
close(request.Reply)

request = <-coordinator.App.StorageChannel
assert.Equalf(t, protocol.StorageFetchTopic, request.RequestType, "Expected request of type StorageFetchTopic, not %v", request.RequestType)
assert.Equalf(t, "testcluster", request.Cluster, "Expected request Cluster to be testcluster, not %v", request.Cluster)
assert.Equalf(t, "testtopic1", request.Topic, "Expected request Topic to be testtopic, not %v", request.Topic)
request.Reply <- []int64{54}
close(request.Reply)
}()

// Respond to the expected evaluator requests
go func() {
// testgroup happy paths
request := <-coordinator.App.EvaluatorChannel
assert.Equalf(t, "testcluster", request.Cluster, "Expected request Cluster to be testcluster, not %v", request.Cluster)
assert.Equalf(t, "testgroup", request.Group, "Expected request Group to be testgroup, not %v", request.Group)
assert.True(t, request.ShowAll, "Expected request ShowAll to be True")
response := &protocol.ConsumerGroupStatus{
Cluster: request.Cluster,
Group: request.Group,
Status: protocol.StatusOK,
Complete: 1.0,
Partitions: []*protocol.PartitionStatus{
{
Topic: "testtopic",
Partition: 0,
Status: protocol.StatusOK,
CurrentLag: 100,
Complete: 1.0,
End: &protocol.ConsumerOffset{
Offset: 22663,
},
},
{
Topic: "testtopic",
Partition: 1,
Status: protocol.StatusOK,
CurrentLag: 10,
Complete: 1.0,
End: &protocol.ConsumerOffset{
Offset: 2488,
},
},
{
Topic: "testtopic1",
Partition: 0,
Status: protocol.StatusOK,
CurrentLag: 50,
Complete: 1.0,
End: &protocol.ConsumerOffset{
Offset: 99888,
},
},
{
Topic: "incomplete",
Partition: 0,
Status: protocol.StatusOK,
CurrentLag: 0,
Complete: 0.2,
End: &protocol.ConsumerOffset{
Offset: 5335,
},
},
},
TotalPartitions: 2134,
Maxlag: &protocol.PartitionStatus{},
TotalLag: 2345,
}
request.Reply <- response
close(request.Reply)

// testgroup2 not found
request = <-coordinator.App.EvaluatorChannel
assert.Equalf(t, "testcluster", request.Cluster, "Expected request Cluster to be testcluster, not %v", request.Cluster)
assert.Equalf(t, "testgroup2", request.Group, "Expected request Group to be testgroup, not %v", request.Group)
assert.True(t, request.ShowAll, "Expected request ShowAll to be True")
response = &protocol.ConsumerGroupStatus{
Cluster: request.Cluster,
Group: request.Group,
Status: protocol.StatusNotFound,
}
request.Reply <- response
close(request.Reply)
}()

// Set up a request
req, err := http.NewRequest("GET", "/metrics", nil)
assert.NoError(t, err, "Expected request setup to return no error")

// Call the handler via httprouter
rr := httptest.NewRecorder()
coordinator.router.ServeHTTP(rr, req)

assert.Equalf(t, http.StatusOK, rr.Code, "Expected response code to be 200, not %v", rr.Code)

promExp := rr.Body.String()
assert.Contains(t, promExp, `burrow_kafka_consumer_status{cluster="testcluster",consumer_group="testgroup"} 1`)
assert.Contains(t, promExp, `burrow_kafka_consumer_lag_total{cluster="testcluster",consumer_group="testgroup"} 2345`)

assert.Contains(t, promExp, `burrow_kafka_consumer_partition_lag{cluster="testcluster",consumer_group="testgroup",partition="0",topic="testtopic"} 100`)
assert.Contains(t, promExp, `burrow_kafka_consumer_partition_lag{cluster="testcluster",consumer_group="testgroup",partition="1",topic="testtopic"} 10`)
assert.Contains(t, promExp, `burrow_kafka_consumer_partition_lag{cluster="testcluster",consumer_group="testgroup",partition="0",topic="testtopic1"} 50`)

assert.Contains(t, promExp, `burrow_kafka_consumer_current_offset{cluster="testcluster",consumer_group="testgroup",partition="0",topic="testtopic"} 22663`)
assert.Contains(t, promExp, `burrow_kafka_consumer_current_offset{cluster="testcluster",consumer_group="testgroup",partition="1",topic="testtopic"} 2488`)
assert.Contains(t, promExp, `burrow_kafka_consumer_current_offset{cluster="testcluster",consumer_group="testgroup",partition="0",topic="testtopic1"} 99888`)

assert.Contains(t, promExp, `burrow_kafka_topic_partition_offset{cluster="testcluster",partition="0",topic="testtopic"} 6556`)
assert.Contains(t, promExp, `burrow_kafka_topic_partition_offset{cluster="testcluster",partition="1",topic="testtopic"} 5566`)
assert.Contains(t, promExp, `burrow_kafka_topic_partition_offset{cluster="testcluster",partition="0",topic="testtopic1"} 54`)

assert.NotContains(t, promExp, `burrow_kafka_consumer_partition_lag{cluster="testcluster",consumer_group="testgroup",partition="0",topic="incomplete"} 0`)
assert.NotContains(t, promExp, "testgroup2")
}
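To exercise just this test locally, running go test ./core/internal/httpserver -run TestHttpServer_handlePrometheusMetrics from the repository root should cover the new handler end to end.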
1 change: 1 addition & 0 deletions go.mod
@@ -17,6 +17,7 @@ require (
github.com/pelletier/go-toml v1.7.0 // indirect
github.com/pierrec/lz4 v2.5.2+incompatible // indirect
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v0.9.3
github.com/rcrowley/go-metrics v0.0.0-20200313005456-10cdbea86bc0 // indirect
github.com/samuel/go-zookeeper v0.0.0-20190923202752-2cc03de413da
github.com/smartystreets/assertions v1.1.0 // indirect
7 changes: 7 additions & 0 deletions go.sum
@@ -13,6 +13,7 @@ github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuy
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/beorn7/perks v1.0.0 h1:HWo1m869IqiPhD389kmkxeTalrjNbbJTC8LXupb+sl0=
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko=
github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
@@ -55,6 +56,7 @@ github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfU
github.com/golang/groupcache v0.0.0-20190129154638-5b532d6fd5ef/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.1 h1:YF8+flBXS5eO826T4nzqPrxfhQThhXl0YzfuUPu4SBg=
github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
@@ -111,6 +113,7 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/magiconair/properties v1.8.1 h1:ZC2Vc7/ZFkGmsVC9KvOjumD+G5lXy2RtTKyzRKO2BQ4=
github.com/magiconair/properties v1.8.1/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU=
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
github.com/mitchellh/mapstructure v1.1.2 h1:fmNYVwqnSfB9mZU6OS2O6GsXM+wcskZDuKQzvN1EDeE=
github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
@@ -141,12 +144,16 @@ github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw=
github.com/prometheus/client_golang v0.9.3 h1:9iH4JKXLzFbOAdtqv/a+j8aewx2Y8lAjAydhbaScPF8=
github.com/prometheus/client_golang v0.9.3/go.mod h1:/TN21ttK/J9q6uSwhBd54HahCDft0ttaMvbicHlPoso=
github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=
github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90 h1:S/YWwWx/RA8rT8tKFRuGUZhuA90OyIBpPCXkcbwU8DE=
github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/prometheus/common v0.0.0-20181113130724-41aa239b4cce/go.mod h1:daVV7qP5qjZbuso7PdcryaAu0sAZbrN9i7WWcTMWvro=
github.com/prometheus/common v0.4.0 h1:7etb9YClo3a6HjLzfl6rIQaU+FDfi0VSX39io3aQ+DM=
github.com/prometheus/common v0.4.0/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4=
github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
github.com/prometheus/procfs v0.0.0-20190507164030-5867b95ac084 h1:sofwID9zm4tzrgykg80hfFph1mryUeLRsUfoocVVmRY=
github.com/prometheus/procfs v0.0.0-20190507164030-5867b95ac084/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA=
github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU=
github.com/rcrowley/go-metrics v0.0.0-20190826022208-cac0b30c2563 h1:dY6ETXrvDG7Sa4vE8ZQG4yqWg6UnOcbqTAahkV813vQ=