diff --git a/controllers/cdapmaster_controller.go b/controllers/cdapmaster_controller.go index 75004b13..ee9bce93 100644 --- a/controllers/cdapmaster_controller.go +++ b/controllers/cdapmaster_controller.go @@ -16,14 +16,15 @@ limitations under the License. package controllers import ( - "cdap.io/cdap-operator/controllers/cdapmaster" "fmt" - batchv1 "k8s.io/api/batch/v1" - "sigs.k8s.io/controller-reconciler/pkg/finalizer" "strconv" "strings" "text/template" + "cdap.io/cdap-operator/controllers/cdapmaster" + batchv1 "k8s.io/api/batch/v1" + "sigs.k8s.io/controller-reconciler/pkg/finalizer" + "github.com/go-logr/logr" "k8s.io/apimachinery/pkg/runtime" gr "sigs.k8s.io/controller-reconciler/pkg/genericreconciler" @@ -256,6 +257,9 @@ func (h *ServiceHandler) Objects(rsrc interface{}, rsrclabels map[string]string, // Copy NodePort from observed to ensure k8s services' nodePorts stay the same across reconciling iterators CopyNodePortIfAny(expected, observed) + // Update status section of CR with status of CDAP service readiness. + updateServiceStatus(m) + return expected, nil } diff --git a/controllers/service_status.go b/controllers/service_status.go new file mode 100644 index 00000000..0a971016 --- /dev/null +++ b/controllers/service_status.go @@ -0,0 +1,158 @@ +package controllers + +import ( + "bytes" + "encoding/json" + "fmt" + "log" + "net/http" + + "cdap.io/cdap-operator/api/v1alpha1" + "sigs.k8s.io/controller-reconciler/pkg/status" +) + +type CDAPService = string + +const ( + appFabricSvc CDAPService = "appfabric" + datasetExecutorSvc CDAPService = "dataset.executor" + logSaverSvc CDAPService = "log.saver" + messagingSvc CDAPService = "messaging.service" + metadataSvc CDAPService = "metadata.service" + metricsSvc CDAPService = "metrics" + metricsProcessorSvc CDAPService = "metrics.processor" + runtimeSvc CDAPService = "runtime" +) + +var ( + serviceStatus *CDAPServiceStatus + cdapServices []CDAPService + cdapServiceQuery string + cdapSystemServiceQuery string +) + +func init() { + serviceStatus = new(CDAPServiceStatus) + serviceStatus.init() + cdapServices = []CDAPService{appFabricSvc, datasetExecutorSvc, logSaverSvc, messagingSvc, metadataSvc, metricsSvc, metricsProcessorSvc, runtimeSvc} +} + +type CDAPServiceStatus struct { + Ready status.Condition + Unhealthy status.Condition + Unknown status.Condition +} + +type SystemServiceStatusResponse struct { + Status string `json:"status"` + StatusCode json.Number `json:"statusCode"` + AppId string `json:"appId"` + ProgramType string `json:"programType"` + ProgramId string `json:"programId"` +} + +func (s *CDAPServiceStatus) init() { + s.Ready = status.Condition{ + Type: "CDAPStatusReady", + Reason: "Start", + Message: "CDAP services are ready", + } + s.Unhealthy = status.Condition{ + Type: "CDAPStatusUnhealthy", + Reason: "Start", + Message: "One or more CDAP services are unavailable", + } + s.Unknown = status.Condition{ + Type: "CDAPStatusUnknown", + Reason: "Start", + Message: "CDAP service status is unknown", + } +} + +func setServiceStatusUnknown(master *v1alpha1.CDAPMaster) { + setCondition(master, serviceStatus.Unknown) + clearCondition(master, serviceStatus.Ready) + clearCondition(master, serviceStatus.Unhealthy) +} + +func setServiceStatusUnhealthy(master *v1alpha1.CDAPMaster) { + setCondition(master, serviceStatus.Unhealthy) + clearCondition(master, serviceStatus.Ready) + clearCondition(master, serviceStatus.Unknown) +} + +func setServiceStatusReady(master *v1alpha1.CDAPMaster) { + setCondition(master, serviceStatus.Ready) + clearCondition(master, serviceStatus.Unhealthy) + clearCondition(master, serviceStatus.Unknown) +} + +func setCdapQueryUrls(master *v1alpha1.CDAPMaster) { + cdapServiceQuery = fmt.Sprintf("http://%s:11015/v3/system/services/status", getObjName(master, serviceRouter)) + cdapSystemServiceQuery = fmt.Sprintf("http://%s:11015/v3/namespaces/system/status", getObjName(master, serviceRouter)) +} + +func updateServiceStatus(master *v1alpha1.CDAPMaster) { + setCdapQueryUrls(master) + updateServiceStatusHelper(master) +} + +func updateServiceStatusHelper(master *v1alpha1.CDAPMaster) { + // Check if CDAP services are up. + resp, err := http.Get(cdapServiceQuery) + if err == nil { + d := json.NewDecoder(resp.Body) + var svcStatus map[string]string + err = d.Decode(&svcStatus) + if err == nil { + for _, s := range cdapServices { + if status, ok := svcStatus[s]; !ok || status != "OK" { + log.Printf("Service %q is not up, /v3/system/services returned: %s", s, svcStatus) + setServiceStatusUnhealthy(master) + return + } + } + } else { + log.Printf("Failed to decode CDAP service status response: %q", err) + setServiceStatusUnknown(master) + return + } + } else { + log.Printf("Failed to get CDAP service status: %q", err) + setServiceStatusUnknown(master) + return + } + + // Check if system services are running. + var jsonStr = []byte(`[{"appId": "dataprep", "programType": "Service", "programId": "service"}, {"appId": "pipeline", "programType": "Service", "programId": "studio"}] + `) + resp, err = http.Post(cdapSystemServiceQuery, "application/json", bytes.NewBuffer(jsonStr)) + if err == nil { + d := json.NewDecoder(resp.Body) + var serviceStatus []SystemServiceStatusResponse + err = d.Decode(&serviceStatus) + if err == nil { + if len(serviceStatus) != 2 { + log.Printf("Expected 2 service status entries, got: %q", serviceStatus) + setServiceStatusUnhealthy(master) + return + } + for i := 0; i < len(serviceStatus); i++ { + if sc, _ := serviceStatus[i].StatusCode.Int64(); sc != http.StatusOK { + log.Printf("%s status: %s", serviceStatus[i].AppId, serviceStatus[i].StatusCode) + setServiceStatusUnhealthy(master) + return + } + } + } else { + log.Printf("Failed to decode system service status response: %q", err) + setServiceStatusUnknown(master) + return + } + } else { + log.Printf("Failed to get system service status: %q", err) + setServiceStatusUnknown(master) + return + } + setServiceStatusReady(master) +} diff --git a/controllers/service_status_test.go b/controllers/service_status_test.go new file mode 100644 index 00000000..91f05d7b --- /dev/null +++ b/controllers/service_status_test.go @@ -0,0 +1,131 @@ +package controllers + +import ( + "fmt" + "net/http" + "net/http/httptest" + + "cdap.io/cdap-operator/api/v1alpha1" + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" +) + +var _ = Describe("Controller Suite", func() { + Describe("CDAP service status", func() { + It("CDAP is unavailable", func() { + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusServiceUnavailable) + })) + defer ts.Close() + + master := &v1alpha1.CDAPMaster{} + updateServiceStatus(master) + Expect(isConditionTrue(master, serviceStatus.Unknown)).To(Equal(true)) + Expect(isConditionTrue(master, serviceStatus.Ready)).To(Equal(false)) + Expect(isConditionTrue(master, serviceStatus.Unhealthy)).To(Equal(false)) + }) + It("Some CDAP service are not running", func() { + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + fmt.Fprintln(w, `{"appfabric": "OK"}`) + })) + defer ts.Close() + cdapServiceQuery = ts.URL + + master := &v1alpha1.CDAPMaster{} + updateServiceStatusHelper(master) + Expect(isConditionTrue(master, serviceStatus.Unhealthy)).To(Equal(true)) + Expect(isConditionTrue(master, serviceStatus.Unknown)).To(Equal(false)) + Expect(isConditionTrue(master, serviceStatus.Ready)).To(Equal(false)) + }) + It("Some CDAP services are unhealthy", func() { + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + fmt.Fprintln(w, `{"appfabric": "NOTOK", "metrics": "OK", "metrics.processor": "OK", "log.saver": "OK", "dataset.executor": "OK", "runtime": "OK", "messaging.service": "OK", "metadata.service": "OK"}`) + })) + defer ts.Close() + cdapServiceQuery = ts.URL + + master := &v1alpha1.CDAPMaster{} + updateServiceStatusHelper(master) + Expect(isConditionTrue(master, serviceStatus.Unhealthy)).To(Equal(true)) + Expect(isConditionTrue(master, serviceStatus.Unknown)).To(Equal(false)) + Expect(isConditionTrue(master, serviceStatus.Ready)).To(Equal(false)) + }) + It("CDAP system service status is not available", func() { + ts1 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + fmt.Fprintln(w, `{"appfabric": "OK", "metrics": "OK", "metrics.processor": "OK", "log.saver": "OK", "dataset.executor": "OK", "runtime": "OK", "messaging.service": "OK", "metadata.service": "OK"}`) + })) + defer ts1.Close() + cdapServiceQuery = ts1.URL + + ts2 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusServiceUnavailable) + })) + defer ts2.Close() + cdapSystemServiceQuery = ts2.URL + + master := &v1alpha1.CDAPMaster{} + updateServiceStatusHelper(master) + Expect(isConditionTrue(master, serviceStatus.Unknown)).To(Equal(true)) + Expect(isConditionTrue(master, serviceStatus.Ready)).To(Equal(false)) + Expect(isConditionTrue(master, serviceStatus.Unhealthy)).To(Equal(false)) + }) + It("Dataprep service is not running", func() { + ts1 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + fmt.Fprintln(w, `{"appfabric": "OK", "metrics": "OK", "metrics.processor": "OK", "log.saver": "OK", "dataset.executor": "OK", "runtime": "OK", "messaging.service": "OK", "metadata.service": "OK"}`) + })) + defer ts1.Close() + cdapServiceQuery = ts1.URL + + ts2 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + fmt.Fprintln(w, `[{"status": "Running", "statusCode": 200, "appId": "pipeline", "programType": "Service", "programId": "studio"}]`) + })) + defer ts2.Close() + cdapSystemServiceQuery = ts2.URL + + master := &v1alpha1.CDAPMaster{} + updateServiceStatusHelper(master) + Expect(isConditionTrue(master, serviceStatus.Ready)).To(Equal(false)) + Expect(isConditionTrue(master, serviceStatus.Unknown)).To(Equal(false)) + Expect(isConditionTrue(master, serviceStatus.Unhealthy)).To(Equal(true)) + }) + + It("Dataprep service is not healthy", func() { + ts1 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + fmt.Fprintln(w, `{"appfabric": "OK", "metrics": "OK", "metrics.processor": "OK", "log.saver": "OK", "dataset.executor": "OK", "runtime": "OK", "messaging.service": "OK", "metadata.service": "OK"}`) + })) + defer ts1.Close() + cdapServiceQuery = ts1.URL + + ts2 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + fmt.Fprintln(w, `[{"error": "Dataprep not found", "statusCode": 404, "appId": "dataprep", "programType": "service", "programId": "service"}, {"status": "Running", "statusCode": 200, "appId": "pipeline", "programType": "Service", "programId": "studio"}]`) + })) + defer ts2.Close() + cdapSystemServiceQuery = ts2.URL + + master := &v1alpha1.CDAPMaster{} + updateServiceStatusHelper(master) + Expect(isConditionTrue(master, serviceStatus.Ready)).To(Equal(false)) + Expect(isConditionTrue(master, serviceStatus.Unknown)).To(Equal(false)) + Expect(isConditionTrue(master, serviceStatus.Unhealthy)).To(Equal(true)) + }) + It("CDAP is ready", func() { + ts1 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + fmt.Fprintln(w, `{"appfabric": "OK", "metrics": "OK", "metrics.processor": "OK", "log.saver": "OK", "dataset.executor": "OK", "runtime": "OK", "messaging.service": "OK", "metadata.service": "OK"}`) + })) + defer ts1.Close() + cdapServiceQuery = ts1.URL + + ts2 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + fmt.Fprintln(w, `[{"status": "RUNNING", "statusCode": 200, "appId": "dataprep", "programType": "service", "programId": "service"}, {"status": "Running", "statusCode": 200, "appId": "pipeline", "programType": "Service", "programId": "studio"}]`) + })) + defer ts2.Close() + cdapSystemServiceQuery = ts2.URL + + master := &v1alpha1.CDAPMaster{} + updateServiceStatusHelper(master) + Expect(isConditionTrue(master, serviceStatus.Ready)).To(Equal(true)) + Expect(isConditionTrue(master, serviceStatus.Unknown)).To(Equal(false)) + Expect(isConditionTrue(master, serviceStatus.Unhealthy)).To(Equal(false)) + }) + }) +})