From e22ea544faceb027bcb8cca34814e6aef45a5e18 Mon Sep 17 00:00:00 2001 From: Prashant Jaikumar Date: Fri, 22 Jan 2021 14:54:06 -0800 Subject: [PATCH] Update status CR section of CR with CDAP readiness status CDAP instance is ready when all services are up and running. The operator monitors service status by querying CDAP REST APIs and updates the status section. The control plane can check the CR status after instance creation/upgrade to determine whether CDAP is up and running. --- controllers/cdapmaster_controller.go | 10 +- controllers/service_status.go | 158 +++++++++++++++++++++++++++ controllers/service_status_test.go | 131 ++++++++++++++++++++++ 3 files changed, 296 insertions(+), 3 deletions(-) create mode 100644 controllers/service_status.go create mode 100644 controllers/service_status_test.go diff --git a/controllers/cdapmaster_controller.go b/controllers/cdapmaster_controller.go index 75004b13..ee9bce93 100644 --- a/controllers/cdapmaster_controller.go +++ b/controllers/cdapmaster_controller.go @@ -16,14 +16,15 @@ limitations under the License. package controllers import ( - "cdap.io/cdap-operator/controllers/cdapmaster" "fmt" - batchv1 "k8s.io/api/batch/v1" - "sigs.k8s.io/controller-reconciler/pkg/finalizer" "strconv" "strings" "text/template" + "cdap.io/cdap-operator/controllers/cdapmaster" + batchv1 "k8s.io/api/batch/v1" + "sigs.k8s.io/controller-reconciler/pkg/finalizer" + "github.com/go-logr/logr" "k8s.io/apimachinery/pkg/runtime" gr "sigs.k8s.io/controller-reconciler/pkg/genericreconciler" @@ -256,6 +257,9 @@ func (h *ServiceHandler) Objects(rsrc interface{}, rsrclabels map[string]string, // Copy NodePort from observed to ensure k8s services' nodePorts stay the same across reconciling iterators CopyNodePortIfAny(expected, observed) + // Update status section of CR with status of CDAP service readiness. + updateServiceStatus(m) + return expected, nil } diff --git a/controllers/service_status.go b/controllers/service_status.go new file mode 100644 index 00000000..0a971016 --- /dev/null +++ b/controllers/service_status.go @@ -0,0 +1,158 @@ +package controllers + +import ( + "bytes" + "encoding/json" + "fmt" + "log" + "net/http" + + "cdap.io/cdap-operator/api/v1alpha1" + "sigs.k8s.io/controller-reconciler/pkg/status" +) + +type CDAPService = string + +const ( + appFabricSvc CDAPService = "appfabric" + datasetExecutorSvc CDAPService = "dataset.executor" + logSaverSvc CDAPService = "log.saver" + messagingSvc CDAPService = "messaging.service" + metadataSvc CDAPService = "metadata.service" + metricsSvc CDAPService = "metrics" + metricsProcessorSvc CDAPService = "metrics.processor" + runtimeSvc CDAPService = "runtime" +) + +var ( + serviceStatus *CDAPServiceStatus + cdapServices []CDAPService + cdapServiceQuery string + cdapSystemServiceQuery string +) + +func init() { + serviceStatus = new(CDAPServiceStatus) + serviceStatus.init() + cdapServices = []CDAPService{appFabricSvc, datasetExecutorSvc, logSaverSvc, messagingSvc, metadataSvc, metricsSvc, metricsProcessorSvc, runtimeSvc} +} + +type CDAPServiceStatus struct { + Ready status.Condition + Unhealthy status.Condition + Unknown status.Condition +} + +type SystemServiceStatusResponse struct { + Status string `json:"status"` + StatusCode json.Number `json:"statusCode"` + AppId string `json:"appId"` + ProgramType string `json:"programType"` + ProgramId string `json:"programId"` +} + +func (s *CDAPServiceStatus) init() { + s.Ready = status.Condition{ + Type: "CDAPStatusReady", + Reason: "Start", + Message: "CDAP services are ready", + } + s.Unhealthy = status.Condition{ + Type: "CDAPStatusUnhealthy", + Reason: "Start", + Message: "One or more CDAP services are unavailable", + } + s.Unknown = status.Condition{ + Type: "CDAPStatusUnknown", + Reason: "Start", + Message: "CDAP service status is unknown", + } +} + +func setServiceStatusUnknown(master *v1alpha1.CDAPMaster) { + setCondition(master, serviceStatus.Unknown) + clearCondition(master, serviceStatus.Ready) + clearCondition(master, serviceStatus.Unhealthy) +} + +func setServiceStatusUnhealthy(master *v1alpha1.CDAPMaster) { + setCondition(master, serviceStatus.Unhealthy) + clearCondition(master, serviceStatus.Ready) + clearCondition(master, serviceStatus.Unknown) +} + +func setServiceStatusReady(master *v1alpha1.CDAPMaster) { + setCondition(master, serviceStatus.Ready) + clearCondition(master, serviceStatus.Unhealthy) + clearCondition(master, serviceStatus.Unknown) +} + +func setCdapQueryUrls(master *v1alpha1.CDAPMaster) { + cdapServiceQuery = fmt.Sprintf("http://%s:11015/v3/system/services/status", getObjName(master, serviceRouter)) + cdapSystemServiceQuery = fmt.Sprintf("http://%s:11015/v3/namespaces/system/status", getObjName(master, serviceRouter)) +} + +func updateServiceStatus(master *v1alpha1.CDAPMaster) { + setCdapQueryUrls(master) + updateServiceStatusHelper(master) +} + +func updateServiceStatusHelper(master *v1alpha1.CDAPMaster) { + // Check if CDAP services are up. + resp, err := http.Get(cdapServiceQuery) + if err == nil { + d := json.NewDecoder(resp.Body) + var svcStatus map[string]string + err = d.Decode(&svcStatus) + if err == nil { + for _, s := range cdapServices { + if status, ok := svcStatus[s]; !ok || status != "OK" { + log.Printf("Service %q is not up, /v3/system/services returned: %s", s, svcStatus) + setServiceStatusUnhealthy(master) + return + } + } + } else { + log.Printf("Failed to decode CDAP service status response: %q", err) + setServiceStatusUnknown(master) + return + } + } else { + log.Printf("Failed to get CDAP service status: %q", err) + setServiceStatusUnknown(master) + return + } + + // Check if system services are running. + var jsonStr = []byte(`[{"appId": "dataprep", "programType": "Service", "programId": "service"}, {"appId": "pipeline", "programType": "Service", "programId": "studio"}] + `) + resp, err = http.Post(cdapSystemServiceQuery, "application/json", bytes.NewBuffer(jsonStr)) + if err == nil { + d := json.NewDecoder(resp.Body) + var serviceStatus []SystemServiceStatusResponse + err = d.Decode(&serviceStatus) + if err == nil { + if len(serviceStatus) != 2 { + log.Printf("Expected 2 service status entries, got: %q", serviceStatus) + setServiceStatusUnhealthy(master) + return + } + for i := 0; i < len(serviceStatus); i++ { + if sc, _ := serviceStatus[i].StatusCode.Int64(); sc != http.StatusOK { + log.Printf("%s status: %s", serviceStatus[i].AppId, serviceStatus[i].StatusCode) + setServiceStatusUnhealthy(master) + return + } + } + } else { + log.Printf("Failed to decode system service status response: %q", err) + setServiceStatusUnknown(master) + return + } + } else { + log.Printf("Failed to get system service status: %q", err) + setServiceStatusUnknown(master) + return + } + setServiceStatusReady(master) +} diff --git a/controllers/service_status_test.go b/controllers/service_status_test.go new file mode 100644 index 00000000..91f05d7b --- /dev/null +++ b/controllers/service_status_test.go @@ -0,0 +1,131 @@ +package controllers + +import ( + "fmt" + "net/http" + "net/http/httptest" + + "cdap.io/cdap-operator/api/v1alpha1" + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" +) + +var _ = Describe("Controller Suite", func() { + Describe("CDAP service status", func() { + It("CDAP is unavailable", func() { + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusServiceUnavailable) + })) + defer ts.Close() + + master := &v1alpha1.CDAPMaster{} + updateServiceStatus(master) + Expect(isConditionTrue(master, serviceStatus.Unknown)).To(Equal(true)) + Expect(isConditionTrue(master, serviceStatus.Ready)).To(Equal(false)) + Expect(isConditionTrue(master, serviceStatus.Unhealthy)).To(Equal(false)) + }) + It("Some CDAP service are not running", func() { + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + fmt.Fprintln(w, `{"appfabric": "OK"}`) + })) + defer ts.Close() + cdapServiceQuery = ts.URL + + master := &v1alpha1.CDAPMaster{} + updateServiceStatusHelper(master) + Expect(isConditionTrue(master, serviceStatus.Unhealthy)).To(Equal(true)) + Expect(isConditionTrue(master, serviceStatus.Unknown)).To(Equal(false)) + Expect(isConditionTrue(master, serviceStatus.Ready)).To(Equal(false)) + }) + It("Some CDAP services are unhealthy", func() { + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + fmt.Fprintln(w, `{"appfabric": "NOTOK", "metrics": "OK", "metrics.processor": "OK", "log.saver": "OK", "dataset.executor": "OK", "runtime": "OK", "messaging.service": "OK", "metadata.service": "OK"}`) + })) + defer ts.Close() + cdapServiceQuery = ts.URL + + master := &v1alpha1.CDAPMaster{} + updateServiceStatusHelper(master) + Expect(isConditionTrue(master, serviceStatus.Unhealthy)).To(Equal(true)) + Expect(isConditionTrue(master, serviceStatus.Unknown)).To(Equal(false)) + Expect(isConditionTrue(master, serviceStatus.Ready)).To(Equal(false)) + }) + It("CDAP system service status is not available", func() { + ts1 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + fmt.Fprintln(w, `{"appfabric": "OK", "metrics": "OK", "metrics.processor": "OK", "log.saver": "OK", "dataset.executor": "OK", "runtime": "OK", "messaging.service": "OK", "metadata.service": "OK"}`) + })) + defer ts1.Close() + cdapServiceQuery = ts1.URL + + ts2 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusServiceUnavailable) + })) + defer ts2.Close() + cdapSystemServiceQuery = ts2.URL + + master := &v1alpha1.CDAPMaster{} + updateServiceStatusHelper(master) + Expect(isConditionTrue(master, serviceStatus.Unknown)).To(Equal(true)) + Expect(isConditionTrue(master, serviceStatus.Ready)).To(Equal(false)) + Expect(isConditionTrue(master, serviceStatus.Unhealthy)).To(Equal(false)) + }) + It("Dataprep service is not running", func() { + ts1 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + fmt.Fprintln(w, `{"appfabric": "OK", "metrics": "OK", "metrics.processor": "OK", "log.saver": "OK", "dataset.executor": "OK", "runtime": "OK", "messaging.service": "OK", "metadata.service": "OK"}`) + })) + defer ts1.Close() + cdapServiceQuery = ts1.URL + + ts2 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + fmt.Fprintln(w, `[{"status": "Running", "statusCode": 200, "appId": "pipeline", "programType": "Service", "programId": "studio"}]`) + })) + defer ts2.Close() + cdapSystemServiceQuery = ts2.URL + + master := &v1alpha1.CDAPMaster{} + updateServiceStatusHelper(master) + Expect(isConditionTrue(master, serviceStatus.Ready)).To(Equal(false)) + Expect(isConditionTrue(master, serviceStatus.Unknown)).To(Equal(false)) + Expect(isConditionTrue(master, serviceStatus.Unhealthy)).To(Equal(true)) + }) + + It("Dataprep service is not healthy", func() { + ts1 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + fmt.Fprintln(w, `{"appfabric": "OK", "metrics": "OK", "metrics.processor": "OK", "log.saver": "OK", "dataset.executor": "OK", "runtime": "OK", "messaging.service": "OK", "metadata.service": "OK"}`) + })) + defer ts1.Close() + cdapServiceQuery = ts1.URL + + ts2 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + fmt.Fprintln(w, `[{"error": "Dataprep not found", "statusCode": 404, "appId": "dataprep", "programType": "service", "programId": "service"}, {"status": "Running", "statusCode": 200, "appId": "pipeline", "programType": "Service", "programId": "studio"}]`) + })) + defer ts2.Close() + cdapSystemServiceQuery = ts2.URL + + master := &v1alpha1.CDAPMaster{} + updateServiceStatusHelper(master) + Expect(isConditionTrue(master, serviceStatus.Ready)).To(Equal(false)) + Expect(isConditionTrue(master, serviceStatus.Unknown)).To(Equal(false)) + Expect(isConditionTrue(master, serviceStatus.Unhealthy)).To(Equal(true)) + }) + It("CDAP is ready", func() { + ts1 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + fmt.Fprintln(w, `{"appfabric": "OK", "metrics": "OK", "metrics.processor": "OK", "log.saver": "OK", "dataset.executor": "OK", "runtime": "OK", "messaging.service": "OK", "metadata.service": "OK"}`) + })) + defer ts1.Close() + cdapServiceQuery = ts1.URL + + ts2 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + fmt.Fprintln(w, `[{"status": "RUNNING", "statusCode": 200, "appId": "dataprep", "programType": "service", "programId": "service"}, {"status": "Running", "statusCode": 200, "appId": "pipeline", "programType": "Service", "programId": "studio"}]`) + })) + defer ts2.Close() + cdapSystemServiceQuery = ts2.URL + + master := &v1alpha1.CDAPMaster{} + updateServiceStatusHelper(master) + Expect(isConditionTrue(master, serviceStatus.Ready)).To(Equal(true)) + Expect(isConditionTrue(master, serviceStatus.Unknown)).To(Equal(false)) + Expect(isConditionTrue(master, serviceStatus.Unhealthy)).To(Equal(false)) + }) + }) +})