Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions controllers/cdapmaster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,15 @@ limitations under the License.
package controllers

import (
"cdap.io/cdap-operator/controllers/cdapmaster"
"fmt"
batchv1 "k8s.io/api/batch/v1"
"sigs.k8s.io/controller-reconciler/pkg/finalizer"
"strconv"
"strings"
"text/template"

"cdap.io/cdap-operator/controllers/cdapmaster"
batchv1 "k8s.io/api/batch/v1"
"sigs.k8s.io/controller-reconciler/pkg/finalizer"

"github.com/go-logr/logr"
"k8s.io/apimachinery/pkg/runtime"
gr "sigs.k8s.io/controller-reconciler/pkg/genericreconciler"
Expand Down Expand Up @@ -256,6 +257,9 @@ func (h *ServiceHandler) Objects(rsrc interface{}, rsrclabels map[string]string,
// Copy NodePort from observed to ensure k8s services' nodePorts stay the same across reconciling iterators
CopyNodePortIfAny(expected, observed)

// Update status section of CR with status of CDAP service readiness.
updateServiceStatus(m)

return expected, nil
}

Expand Down
158 changes: 158 additions & 0 deletions controllers/service_status.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
package controllers

import (
"bytes"
"encoding/json"
"fmt"
"log"
"net/http"

"cdap.io/cdap-operator/api/v1alpha1"
"sigs.k8s.io/controller-reconciler/pkg/status"
)

type CDAPService = string

const (
appFabricSvc CDAPService = "appfabric"
datasetExecutorSvc CDAPService = "dataset.executor"
logSaverSvc CDAPService = "log.saver"
messagingSvc CDAPService = "messaging.service"
metadataSvc CDAPService = "metadata.service"
metricsSvc CDAPService = "metrics"
metricsProcessorSvc CDAPService = "metrics.processor"
runtimeSvc CDAPService = "runtime"
)

var (
serviceStatus *CDAPServiceStatus
cdapServices []CDAPService
cdapServiceQuery string
cdapSystemServiceQuery string
)

func init() {
serviceStatus = new(CDAPServiceStatus)
serviceStatus.init()
cdapServices = []CDAPService{appFabricSvc, datasetExecutorSvc, logSaverSvc, messagingSvc, metadataSvc, metricsSvc, metricsProcessorSvc, runtimeSvc}
}

type CDAPServiceStatus struct {
Ready status.Condition
Unhealthy status.Condition
Unknown status.Condition
}

type SystemServiceStatusResponse struct {
Status string `json:"status"`
StatusCode json.Number `json:"statusCode"`
AppId string `json:"appId"`
ProgramType string `json:"programType"`
ProgramId string `json:"programId"`
}

func (s *CDAPServiceStatus) init() {
s.Ready = status.Condition{
Type: "CDAPStatusReady",
Reason: "Start",
Message: "CDAP services are ready",
}
s.Unhealthy = status.Condition{
Type: "CDAPStatusUnhealthy",
Reason: "Start",
Message: "One or more CDAP services are unavailable",
}
s.Unknown = status.Condition{
Type: "CDAPStatusUnknown",
Reason: "Start",
Message: "CDAP service status is unknown",
}
}

func setServiceStatusUnknown(master *v1alpha1.CDAPMaster) {
setCondition(master, serviceStatus.Unknown)
clearCondition(master, serviceStatus.Ready)
clearCondition(master, serviceStatus.Unhealthy)
}

func setServiceStatusUnhealthy(master *v1alpha1.CDAPMaster) {
setCondition(master, serviceStatus.Unhealthy)
clearCondition(master, serviceStatus.Ready)
clearCondition(master, serviceStatus.Unknown)
}

func setServiceStatusReady(master *v1alpha1.CDAPMaster) {
setCondition(master, serviceStatus.Ready)
clearCondition(master, serviceStatus.Unhealthy)
clearCondition(master, serviceStatus.Unknown)
}

func setCdapQueryUrls(master *v1alpha1.CDAPMaster) {
cdapServiceQuery = fmt.Sprintf("http://%s:11015/v3/system/services/status", getObjName(master, serviceRouter))
cdapSystemServiceQuery = fmt.Sprintf("http://%s:11015/v3/namespaces/system/status", getObjName(master, serviceRouter))
}

func updateServiceStatus(master *v1alpha1.CDAPMaster) {
setCdapQueryUrls(master)
updateServiceStatusHelper(master)
}

func updateServiceStatusHelper(master *v1alpha1.CDAPMaster) {
// Check if CDAP services are up.
resp, err := http.Get(cdapServiceQuery)
if err == nil {
d := json.NewDecoder(resp.Body)
var svcStatus map[string]string
err = d.Decode(&svcStatus)
if err == nil {
for _, s := range cdapServices {
if status, ok := svcStatus[s]; !ok || status != "OK" {
log.Printf("Service %q is not up, /v3/system/services returned: %s", s, svcStatus)
setServiceStatusUnhealthy(master)
return
}
}
} else {
log.Printf("Failed to decode CDAP service status response: %q", err)
setServiceStatusUnknown(master)
return
}
} else {
log.Printf("Failed to get CDAP service status: %q", err)
setServiceStatusUnknown(master)
return
}

// Check if system services are running.
var jsonStr = []byte(`[{"appId": "dataprep", "programType": "Service", "programId": "service"}, {"appId": "pipeline", "programType": "Service", "programId": "studio"}]
`)
resp, err = http.Post(cdapSystemServiceQuery, "application/json", bytes.NewBuffer(jsonStr))
if err == nil {
d := json.NewDecoder(resp.Body)
var serviceStatus []SystemServiceStatusResponse
err = d.Decode(&serviceStatus)
if err == nil {
if len(serviceStatus) != 2 {
log.Printf("Expected 2 service status entries, got: %q", serviceStatus)
setServiceStatusUnhealthy(master)
return
}
for i := 0; i < len(serviceStatus); i++ {
if sc, _ := serviceStatus[i].StatusCode.Int64(); sc != http.StatusOK {
log.Printf("%s status: %s", serviceStatus[i].AppId, serviceStatus[i].StatusCode)
setServiceStatusUnhealthy(master)
return
}
}
} else {
log.Printf("Failed to decode system service status response: %q", err)
setServiceStatusUnknown(master)
return
}
} else {
log.Printf("Failed to get system service status: %q", err)
setServiceStatusUnknown(master)
return
}
setServiceStatusReady(master)
}
131 changes: 131 additions & 0 deletions controllers/service_status_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
package controllers

import (
"fmt"
"net/http"
"net/http/httptest"

"cdap.io/cdap-operator/api/v1alpha1"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)

var _ = Describe("Controller Suite", func() {
Describe("CDAP service status", func() {
It("CDAP is unavailable", func() {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusServiceUnavailable)
}))
defer ts.Close()

master := &v1alpha1.CDAPMaster{}
updateServiceStatus(master)
Expect(isConditionTrue(master, serviceStatus.Unknown)).To(Equal(true))
Expect(isConditionTrue(master, serviceStatus.Ready)).To(Equal(false))
Expect(isConditionTrue(master, serviceStatus.Unhealthy)).To(Equal(false))
})
It("Some CDAP service are not running", func() {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintln(w, `{"appfabric": "OK"}`)
}))
defer ts.Close()
cdapServiceQuery = ts.URL

master := &v1alpha1.CDAPMaster{}
updateServiceStatusHelper(master)
Expect(isConditionTrue(master, serviceStatus.Unhealthy)).To(Equal(true))
Expect(isConditionTrue(master, serviceStatus.Unknown)).To(Equal(false))
Expect(isConditionTrue(master, serviceStatus.Ready)).To(Equal(false))
})
It("Some CDAP services are unhealthy", func() {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintln(w, `{"appfabric": "NOTOK", "metrics": "OK", "metrics.processor": "OK", "log.saver": "OK", "dataset.executor": "OK", "runtime": "OK", "messaging.service": "OK", "metadata.service": "OK"}`)
}))
defer ts.Close()
cdapServiceQuery = ts.URL

master := &v1alpha1.CDAPMaster{}
updateServiceStatusHelper(master)
Expect(isConditionTrue(master, serviceStatus.Unhealthy)).To(Equal(true))
Expect(isConditionTrue(master, serviceStatus.Unknown)).To(Equal(false))
Expect(isConditionTrue(master, serviceStatus.Ready)).To(Equal(false))
})
It("CDAP system service status is not available", func() {
ts1 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintln(w, `{"appfabric": "OK", "metrics": "OK", "metrics.processor": "OK", "log.saver": "OK", "dataset.executor": "OK", "runtime": "OK", "messaging.service": "OK", "metadata.service": "OK"}`)
}))
defer ts1.Close()
cdapServiceQuery = ts1.URL

ts2 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusServiceUnavailable)
}))
defer ts2.Close()
cdapSystemServiceQuery = ts2.URL

master := &v1alpha1.CDAPMaster{}
updateServiceStatusHelper(master)
Expect(isConditionTrue(master, serviceStatus.Unknown)).To(Equal(true))
Expect(isConditionTrue(master, serviceStatus.Ready)).To(Equal(false))
Expect(isConditionTrue(master, serviceStatus.Unhealthy)).To(Equal(false))
})
It("Dataprep service is not running", func() {
ts1 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintln(w, `{"appfabric": "OK", "metrics": "OK", "metrics.processor": "OK", "log.saver": "OK", "dataset.executor": "OK", "runtime": "OK", "messaging.service": "OK", "metadata.service": "OK"}`)
}))
defer ts1.Close()
cdapServiceQuery = ts1.URL

ts2 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintln(w, `[{"status": "Running", "statusCode": 200, "appId": "pipeline", "programType": "Service", "programId": "studio"}]`)
}))
defer ts2.Close()
cdapSystemServiceQuery = ts2.URL

master := &v1alpha1.CDAPMaster{}
updateServiceStatusHelper(master)
Expect(isConditionTrue(master, serviceStatus.Ready)).To(Equal(false))
Expect(isConditionTrue(master, serviceStatus.Unknown)).To(Equal(false))
Expect(isConditionTrue(master, serviceStatus.Unhealthy)).To(Equal(true))
})

It("Dataprep service is not healthy", func() {
ts1 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintln(w, `{"appfabric": "OK", "metrics": "OK", "metrics.processor": "OK", "log.saver": "OK", "dataset.executor": "OK", "runtime": "OK", "messaging.service": "OK", "metadata.service": "OK"}`)
}))
defer ts1.Close()
cdapServiceQuery = ts1.URL

ts2 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintln(w, `[{"error": "Dataprep not found", "statusCode": 404, "appId": "dataprep", "programType": "service", "programId": "service"}, {"status": "Running", "statusCode": 200, "appId": "pipeline", "programType": "Service", "programId": "studio"}]`)
}))
defer ts2.Close()
cdapSystemServiceQuery = ts2.URL

master := &v1alpha1.CDAPMaster{}
updateServiceStatusHelper(master)
Expect(isConditionTrue(master, serviceStatus.Ready)).To(Equal(false))
Expect(isConditionTrue(master, serviceStatus.Unknown)).To(Equal(false))
Expect(isConditionTrue(master, serviceStatus.Unhealthy)).To(Equal(true))
})
It("CDAP is ready", func() {
ts1 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintln(w, `{"appfabric": "OK", "metrics": "OK", "metrics.processor": "OK", "log.saver": "OK", "dataset.executor": "OK", "runtime": "OK", "messaging.service": "OK", "metadata.service": "OK"}`)
}))
defer ts1.Close()
cdapServiceQuery = ts1.URL

ts2 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintln(w, `[{"status": "RUNNING", "statusCode": 200, "appId": "dataprep", "programType": "service", "programId": "service"}, {"status": "Running", "statusCode": 200, "appId": "pipeline", "programType": "Service", "programId": "studio"}]`)
}))
defer ts2.Close()
cdapSystemServiceQuery = ts2.URL

master := &v1alpha1.CDAPMaster{}
updateServiceStatusHelper(master)
Expect(isConditionTrue(master, serviceStatus.Ready)).To(Equal(true))
Expect(isConditionTrue(master, serviceStatus.Unknown)).To(Equal(false))
Expect(isConditionTrue(master, serviceStatus.Unhealthy)).To(Equal(false))
})
})
})