diff --git a/common/constant/key.go b/common/constant/key.go index 5cc333f20a..2230437f95 100644 --- a/common/constant/key.go +++ b/common/constant/key.go @@ -475,6 +475,13 @@ const ( PrometheusPushgatewayPasswordKey = "prometheus.pushgateway.password" PrometheusPushgatewayPushIntervalKey = "prometheus.pushgateway.push.interval" PrometheusPushgatewayJobKey = "prometheus.pushgateway.job" + + ProbeEnabledKey = "probe.enabled" + ProbePortKey = "probe.port" + ProbeLivenessPathKey = "probe.liveness.path" + ProbeReadinessPathKey = "probe.readiness.path" + ProbeStartupPathKey = "probe.startup.path" + ProbeUseInternalStateKey = "probe.use-internal-state" ) // default meta cache config diff --git a/common/constant/metric.go b/common/constant/metric.go index 4b35721eba..9038262b46 100644 --- a/common/constant/metric.go +++ b/common/constant/metric.go @@ -53,6 +53,10 @@ const ( PrometheusDefaultMetricsPort = "9090" PrometheusDefaultPushInterval = 30 PrometheusDefaultJobName = "default_dubbo_job" + ProbeDefaultPort = "22222" + ProbeDefaultLivenessPath = "/live" + ProbeDefaultReadinessPath = "/ready" + ProbeDefaultStartupPath = "/startup" MetricFilterStartTime = "metric_filter_start_time" ) diff --git a/compat.go b/compat.go index 7844ba5b1b..5af9d39132 100644 --- a/compat.go +++ b/compat.go @@ -379,6 +379,21 @@ func compatMetricConfig(c *global.MetricsConfig) *config.MetricsConfig { EnableMetadata: c.EnableMetadata, EnableRegistry: c.EnableRegistry, EnableConfigCenter: c.EnableConfigCenter, + Probe: compatMetricProbeConfig(c.Probe), + } +} + +func compatMetricProbeConfig(c *global.ProbeConfig) *config.ProbeConfig { + if c == nil { + return nil + } + return &config.ProbeConfig{ + Enabled: c.Enabled, + Port: c.Port, + LivenessPath: c.LivenessPath, + ReadinessPath: c.ReadinessPath, + StartupPath: c.StartupPath, + UseInternalState: c.UseInternalState, } } @@ -916,6 +931,21 @@ func compatGlobalMetricConfig(c *config.MetricsConfig) *global.MetricsConfig { EnableMetadata: c.EnableMetadata, EnableRegistry: c.EnableRegistry, EnableConfigCenter: c.EnableConfigCenter, + Probe: compatGlobalMetricProbeConfig(c.Probe), + } +} + +func compatGlobalMetricProbeConfig(c *config.ProbeConfig) *global.ProbeConfig { + if c == nil { + return nil + } + return &global.ProbeConfig{ + Enabled: c.Enabled, + Port: c.Port, + LivenessPath: c.LivenessPath, + ReadinessPath: c.ReadinessPath, + StartupPath: c.StartupPath, + UseInternalState: c.UseInternalState, } } diff --git a/config/metric_config.go b/config/metric_config.go index 628b0f6a11..93b594b3ab 100644 --- a/config/metric_config.go +++ b/config/metric_config.go @@ -31,6 +31,7 @@ import ( "dubbo.apache.org/dubbo-go/v3/common" "dubbo.apache.org/dubbo-go/v3/common/constant" "dubbo.apache.org/dubbo-go/v3/metrics" + "dubbo.apache.org/dubbo-go/v3/metrics/probe" ) // MetricsConfig This is the config struct for all metrics implementation @@ -44,6 +45,7 @@ type MetricsConfig struct { EnableConfigCenter *bool `default:"false" yaml:"enable-config-center" json:"enable-config-center,omitempty" property:"enable-config-center"` Prometheus *PrometheusConfig `yaml:"prometheus" json:"prometheus" property:"prometheus"` Aggregation *AggregateConfig `yaml:"aggregation" json:"aggregation" property:"aggregation"` + Probe *ProbeConfig `yaml:"probe" json:"probe" property:"probe"` rootConfig *RootConfig } @@ -58,6 +60,15 @@ type PrometheusConfig struct { Pushgateway *PushgatewayConfig `yaml:"pushgateway" json:"pushgateway,omitempty" property:"pushgateway"` } +type ProbeConfig struct { + Enabled *bool `default:"false" yaml:"enabled" json:"enabled,omitempty" property:"enabled"` + Port string `default:"22222" yaml:"port" json:"port,omitempty" property:"port"` + LivenessPath string `default:"/live" yaml:"liveness-path" json:"liveness-path,omitempty" property:"liveness-path"` + ReadinessPath string `default:"/ready" yaml:"readiness-path" json:"readiness-path,omitempty" property:"readiness-path"` + StartupPath string `default:"/startup" yaml:"startup-path" json:"startup-path,omitempty" property:"startup-path"` + UseInternalState *bool `default:"true" yaml:"use-internal-state" json:"use-internal-state,omitempty" property:"use-internal-state"` +} + type Exporter struct { Enabled *bool `default:"true" yaml:"enabled" json:"enabled,omitempty" property:"enabled"` } @@ -95,6 +106,16 @@ func (mc *MetricsConfig) Init(rc *RootConfig) error { if *mc.Enable { metrics.Init(mc.toURL()) } + if mc.Probe != nil && mc.Probe.Enabled != nil && *mc.Probe.Enabled { + probe.Init(&probe.Config{ + Enabled: true, + Port: mc.Probe.Port, + LivenessPath: mc.Probe.LivenessPath, + ReadinessPath: mc.Probe.ReadinessPath, + StartupPath: mc.Probe.StartupPath, + UseInternalState: mc.Probe.UseInternalState == nil || *mc.Probe.UseInternalState, + }) + } return nil } diff --git a/global/metric_config.go b/global/metric_config.go index 88f2964632..401d007081 100644 --- a/global/metric_config.go +++ b/global/metric_config.go @@ -28,6 +28,7 @@ type MetricsConfig struct { EnableMetadata *bool `default:"true" yaml:"enable-metadata" json:"enable-metadata,omitempty" property:"enable-metadata"` EnableRegistry *bool `default:"true" yaml:"enable-registry" json:"enable-registry,omitempty" property:"enable-registry"` EnableConfigCenter *bool `default:"true" yaml:"enable-config-center" json:"enable-config-center,omitempty" property:"enable-config-center"` + Probe *ProbeConfig `yaml:"probe" json:"probe" property:"probe"` } type AggregateConfig struct { @@ -41,6 +42,15 @@ type PrometheusConfig struct { Pushgateway *PushgatewayConfig `yaml:"pushgateway" json:"pushgateway,omitempty" property:"pushgateway"` } +type ProbeConfig struct { + Enabled *bool `default:"false" yaml:"enabled" json:"enabled,omitempty" property:"enabled"` + Port string `default:"22222" yaml:"port" json:"port,omitempty" property:"port"` + LivenessPath string `default:"/live" yaml:"liveness-path" json:"liveness-path,omitempty" property:"liveness-path"` + ReadinessPath string `default:"/ready" yaml:"readiness-path" json:"readiness-path,omitempty" property:"readiness-path"` + StartupPath string `default:"/startup" yaml:"startup-path" json:"startup-path,omitempty" property:"startup-path"` + UseInternalState *bool `default:"true" yaml:"use-internal-state" json:"use-internal-state,omitempty" property:"use-internal-state"` +} + type Exporter struct { Enabled *bool `default:"false" yaml:"enabled" json:"enabled,omitempty" property:"enabled"` } @@ -57,7 +67,7 @@ type PushgatewayConfig struct { func DefaultMetricsConfig() *MetricsConfig { // return a new config without setting any field means there is not any default value for initialization - return &MetricsConfig{Prometheus: defaultPrometheusConfig(), Aggregation: defaultAggregateConfig()} + return &MetricsConfig{Prometheus: defaultPrometheusConfig(), Aggregation: defaultAggregateConfig(), Probe: defaultProbeConfig()} } // Clone a new MetricsConfig @@ -100,6 +110,7 @@ func (c *MetricsConfig) Clone() *MetricsConfig { EnableMetadata: newEnableMetadata, EnableRegistry: newEnableRegistry, EnableConfigCenter: newEnableConfigCenter, + Probe: c.Probe.Clone(), } } @@ -155,6 +166,36 @@ func (c *PushgatewayConfig) Clone() *PushgatewayConfig { } } +func defaultProbeConfig() *ProbeConfig { + return &ProbeConfig{} +} + +func (c *ProbeConfig) Clone() *ProbeConfig { + if c == nil { + return nil + } + + var newEnabled *bool + if c.Enabled != nil { + newEnabled = new(bool) + *newEnabled = *c.Enabled + } + var newUseInternalState *bool + if c.UseInternalState != nil { + newUseInternalState = new(bool) + *newUseInternalState = *c.UseInternalState + } + + return &ProbeConfig{ + Enabled: newEnabled, + Port: c.Port, + LivenessPath: c.LivenessPath, + ReadinessPath: c.ReadinessPath, + StartupPath: c.StartupPath, + UseInternalState: newUseInternalState, + } +} + func defaultAggregateConfig() *AggregateConfig { return &AggregateConfig{} } diff --git a/metrics/options.go b/metrics/options.go index 58542cc572..d494e6a33b 100644 --- a/metrics/options.go +++ b/metrics/options.go @@ -152,3 +152,41 @@ func WithPath(path string) Option { opts.Metrics.Path = path } } + +// Below are options for probe +func WithProbeEnabled() Option { + return func(opts *Options) { + b := true + opts.Metrics.Probe.Enabled = &b + } +} + +func WithProbePort(port int) Option { + return func(opts *Options) { + opts.Metrics.Probe.Port = strconv.Itoa(port) + } +} + +func WithProbeLivenessPath(path string) Option { + return func(opts *Options) { + opts.Metrics.Probe.LivenessPath = path + } +} + +func WithProbeReadinessPath(path string) Option { + return func(opts *Options) { + opts.Metrics.Probe.ReadinessPath = path + } +} + +func WithProbeStartupPath(path string) Option { + return func(opts *Options) { + opts.Metrics.Probe.StartupPath = path + } +} + +func WithProbeUseInternalState(use bool) Option { + return func(opts *Options) { + opts.Metrics.Probe.UseInternalState = &use + } +} diff --git a/metrics/probe/http.go b/metrics/probe/http.go new file mode 100644 index 0000000000..bf5c686842 --- /dev/null +++ b/metrics/probe/http.go @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package probe + +import ( + "errors" + "net/http" +) + +var ( + errNotReady = errors.New("not ready") + errNotStarted = errors.New("not started") +) + +func livenessHandler(w http.ResponseWriter, r *http.Request) { + if err := CheckLiveness(r.Context()); err != nil { + http.Error(w, err.Error(), http.StatusServiceUnavailable) + return + } + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte("ok")) +} + +func readinessHandler(w http.ResponseWriter, r *http.Request) { + if err := CheckReadiness(r.Context()); err != nil { + http.Error(w, err.Error(), http.StatusServiceUnavailable) + return + } + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte("ok")) +} + +func startupHandler(w http.ResponseWriter, r *http.Request) { + if err := CheckStartup(r.Context()); err != nil { + http.Error(w, err.Error(), http.StatusServiceUnavailable) + return + } + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte("ok")) +} diff --git a/metrics/probe/http_test.go b/metrics/probe/http_test.go new file mode 100644 index 0000000000..1583860315 --- /dev/null +++ b/metrics/probe/http_test.go @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package probe + +import ( + "context" + "errors" + "net/http" + "net/http/httptest" + "testing" +) + +func TestHandlersSuccess(t *testing.T) { + resetProbeState() + + RegisterLiveness("ok", func(context.Context) error { return nil }) + RegisterReadiness("ok", func(context.Context) error { return nil }) + RegisterStartup("ok", func(context.Context) error { return nil }) + + req := httptest.NewRequest(http.MethodGet, "/live", nil) + rec := httptest.NewRecorder() + livenessHandler(rec, req) + if rec.Code != http.StatusOK { + t.Fatalf("expected 200 for liveness, got %d", rec.Code) + } + + req = httptest.NewRequest(http.MethodGet, "/ready", nil) + rec = httptest.NewRecorder() + readinessHandler(rec, req) + if rec.Code != http.StatusOK { + t.Fatalf("expected 200 for readiness, got %d", rec.Code) + } + + req = httptest.NewRequest(http.MethodGet, "/startup", nil) + rec = httptest.NewRecorder() + startupHandler(rec, req) + if rec.Code != http.StatusOK { + t.Fatalf("expected 200 for startup, got %d", rec.Code) + } +} + +func TestHandlersFailure(t *testing.T) { + resetProbeState() + + RegisterLiveness("fail", func(context.Context) error { return errors.New("bad") }) + RegisterReadiness("fail", func(context.Context) error { return errors.New("bad") }) + RegisterStartup("fail", func(context.Context) error { return errors.New("bad") }) + + req := httptest.NewRequest(http.MethodGet, "/live", nil) + rec := httptest.NewRecorder() + livenessHandler(rec, req) + if rec.Code != http.StatusServiceUnavailable { + t.Fatalf("expected 503 for liveness, got %d", rec.Code) + } + + req = httptest.NewRequest(http.MethodGet, "/ready", nil) + rec = httptest.NewRecorder() + readinessHandler(rec, req) + if rec.Code != http.StatusServiceUnavailable { + t.Fatalf("expected 503 for readiness, got %d", rec.Code) + } + + req = httptest.NewRequest(http.MethodGet, "/startup", nil) + rec = httptest.NewRecorder() + startupHandler(rec, req) + if rec.Code != http.StatusServiceUnavailable { + t.Fatalf("expected 503 for startup, got %d", rec.Code) + } +} diff --git a/metrics/probe/probe.go b/metrics/probe/probe.go new file mode 100644 index 0000000000..6790fcfdb3 --- /dev/null +++ b/metrics/probe/probe.go @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package probe + +import ( + "context" + "fmt" + "sync" +) + +// CheckFunc returns nil when healthy. +type CheckFunc func(context.Context) error + +var ( + livenessMu sync.RWMutex + readinessMu sync.RWMutex + startupMu sync.RWMutex + + livenessChecks = make(map[string]CheckFunc) + readinessChecks = make(map[string]CheckFunc) + startupChecks = make(map[string]CheckFunc) +) + +// RegisterLiveness registers a liveness check. +func RegisterLiveness(name string, fn CheckFunc) { + if name == "" || fn == nil { + return + } + livenessMu.Lock() + defer livenessMu.Unlock() + livenessChecks[name] = fn +} + +// RegisterReadiness registers a readiness check. +func RegisterReadiness(name string, fn CheckFunc) { + if name == "" || fn == nil { + return + } + readinessMu.Lock() + defer readinessMu.Unlock() + readinessChecks[name] = fn +} + +// RegisterStartup registers a startup check. +func RegisterStartup(name string, fn CheckFunc) { + if name == "" || fn == nil { + return + } + startupMu.Lock() + defer startupMu.Unlock() + startupChecks[name] = fn +} + +func runChecks(ctx context.Context, mu *sync.RWMutex, checks map[string]CheckFunc) error { + mu.RLock() + defer mu.RUnlock() + for name, fn := range checks { + if err := fn(ctx); err != nil { + return fmt.Errorf("probe %s: %w", name, err) + } + } + return nil +} + +// CheckLiveness evaluates all liveness checks. +func CheckLiveness(ctx context.Context) error { + return runChecks(ctx, &livenessMu, livenessChecks) +} + +// CheckReadiness evaluates all readiness checks. +func CheckReadiness(ctx context.Context) error { + return runChecks(ctx, &readinessMu, readinessChecks) +} + +// CheckStartup evaluates all startup checks. +func CheckStartup(ctx context.Context) error { + return runChecks(ctx, &startupMu, startupChecks) +} diff --git a/metrics/probe/probe_test.go b/metrics/probe/probe_test.go new file mode 100644 index 0000000000..6c2a499d52 --- /dev/null +++ b/metrics/probe/probe_test.go @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package probe + +import ( + "context" + "errors" + "testing" +) + +func resetProbeState() { + livenessMu.Lock() + livenessChecks = map[string]CheckFunc{} + livenessMu.Unlock() + + readinessMu.Lock() + readinessChecks = map[string]CheckFunc{} + readinessMu.Unlock() + + startupMu.Lock() + startupChecks = map[string]CheckFunc{} + startupMu.Unlock() + + internalStateEnabled.Store(false) + readyFlag.Store(false) + startupFlag.Store(false) +} + +func TestRegisterAndCheckLiveness(t *testing.T) { + resetProbeState() + + RegisterLiveness("", func(context.Context) error { return nil }) + RegisterLiveness("nil", nil) + + livenessMu.RLock() + if got := len(livenessChecks); got != 0 { + livenessMu.RUnlock() + t.Fatalf("expected 0 liveness checks, got %d", got) + } + livenessMu.RUnlock() + + RegisterLiveness("ok", func(context.Context) error { return nil }) + if err := CheckLiveness(context.Background()); err != nil { + t.Fatalf("expected liveness ok, got %v", err) + } + + RegisterLiveness("fail", func(context.Context) error { return errors.New("boom") }) + if err := CheckLiveness(context.Background()); err == nil { + t.Fatalf("expected liveness error, got nil") + } +} + +func TestCheckReadinessAndStartupDefault(t *testing.T) { + resetProbeState() + + if err := CheckReadiness(context.Background()); err != nil { + t.Fatalf("expected readiness ok with no checks, got %v", err) + } + if err := CheckStartup(context.Background()); err != nil { + t.Fatalf("expected startup ok with no checks, got %v", err) + } +} + +func TestInternalStateFlags(t *testing.T) { + resetProbeState() + + EnableInternalState(true) + if internalReady() { + t.Fatalf("expected internalReady false by default") + } + if internalStartup() { + t.Fatalf("expected internalStartup false by default") + } + + SetReady(true) + SetStartupComplete(true) + if !internalReady() { + t.Fatalf("expected internalReady true after SetReady(true)") + } + if !internalStartup() { + t.Fatalf("expected internalStartup true after SetStartupComplete(true)") + } + + EnableInternalState(false) + if !internalReady() || !internalStartup() { + t.Fatalf("expected internal state checks bypassed when disabled") + } +} diff --git a/metrics/probe/server.go b/metrics/probe/server.go new file mode 100644 index 0000000000..0f542ceff2 --- /dev/null +++ b/metrics/probe/server.go @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package probe + +import ( + "context" + "net/http" + "sync" + "time" +) + +import ( + "github.com/dubbogo/gost/log/logger" +) + +import ( + "dubbo.apache.org/dubbo-go/v3/common/constant" + "dubbo.apache.org/dubbo-go/v3/common/extension" + "dubbo.apache.org/dubbo-go/v3/global" +) + +type Config struct { + Enabled bool + Port string + LivenessPath string + ReadinessPath string + StartupPath string + UseInternalState bool +} + +var ( + startOnce sync.Once +) + +func Init(cfg *Config) { + if cfg == nil || !cfg.Enabled { + return + } + startOnce.Do(func() { + if cfg.UseInternalState { + EnableInternalState(true) + SetReady(false) + SetStartupComplete(false) + RegisterReadiness("internal", func(ctx context.Context) error { + if internalReady() { + return nil + } + return errNotReady + }) + RegisterStartup("internal", func(ctx context.Context) error { + if internalStartup() { + return nil + } + return errNotStarted + }) + } + + mux := http.NewServeMux() + if cfg.LivenessPath != "" { + mux.HandleFunc(cfg.LivenessPath, livenessHandler) + } + if cfg.ReadinessPath != "" { + mux.HandleFunc(cfg.ReadinessPath, readinessHandler) + } + if cfg.StartupPath != "" { + mux.HandleFunc(cfg.StartupPath, startupHandler) + } + srv := &http.Server{Addr: ":" + cfg.Port, Handler: mux} + extension.AddCustomShutdownCallback(func() { + SetReady(false) + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + if err := srv.Shutdown(ctx); err != nil { + logger.Errorf("[kubernetes probe] probe server shutdown failed: %v", err) + } + }) + + go func() { + logger.Infof("[kubernetes probe] probe server listening on :%s", cfg.Port) + if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed { + logger.Errorf("[kubernetes probe] probe server stopped with error: %v", err) + } + }() + }) +} + +func BuildProbeConfig(probeCfg *global.ProbeConfig) *Config { + if probeCfg == nil || probeCfg.Enabled == nil || !*probeCfg.Enabled { + return nil + } + + useInternal := probeCfg.UseInternalState == nil || *probeCfg.UseInternalState + port := probeCfg.Port + if port == "" { + port = constant.ProbeDefaultPort + } + livenessPath := probeCfg.LivenessPath + if livenessPath == "" { + livenessPath = constant.ProbeDefaultLivenessPath + } + readinessPath := probeCfg.ReadinessPath + if readinessPath == "" { + readinessPath = constant.ProbeDefaultReadinessPath + } + startupPath := probeCfg.StartupPath + if startupPath == "" { + startupPath = constant.ProbeDefaultStartupPath + } + + return &Config{ + Enabled: true, + Port: port, + LivenessPath: livenessPath, + ReadinessPath: readinessPath, + StartupPath: startupPath, + UseInternalState: useInternal, + } +} diff --git a/metrics/probe/state.go b/metrics/probe/state.go new file mode 100644 index 0000000000..31a098028c --- /dev/null +++ b/metrics/probe/state.go @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package probe + +import ( + "sync/atomic" +) + +var ( + internalStateEnabled atomic.Bool + readyFlag atomic.Bool + startupFlag atomic.Bool +) + +// EnableInternalState controls whether readiness/startup checks +// should validate internal state flags. +func EnableInternalState(enabled bool) { + internalStateEnabled.Store(enabled) +} + +// SetReady sets readiness state used by the internal readiness check. +func SetReady(ready bool) { + readyFlag.Store(ready) +} + +// SetStartupComplete sets startup state used by the internal startup check. +func SetStartupComplete(ready bool) { + startupFlag.Store(ready) +} + +func internalReady() bool { + if !internalStateEnabled.Load() { + return true + } + return readyFlag.Load() +} + +func internalStartup() bool { + if !internalStateEnabled.Load() { + return true + } + return startupFlag.Load() +} diff --git a/metrics/probe/state_test.go b/metrics/probe/state_test.go new file mode 100644 index 0000000000..0c1acdc50f --- /dev/null +++ b/metrics/probe/state_test.go @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package probe + +import ( + "testing" +) + +func TestStateDefaultsAndToggle(t *testing.T) { + resetProbeState() + + if internalReady() != true { + t.Fatalf("expected internalReady true when internal state disabled") + } + if internalStartup() != true { + t.Fatalf("expected internalStartup true when internal state disabled") + } + + EnableInternalState(true) + if internalReady() { + t.Fatalf("expected internalReady false when enabled and not ready") + } + if internalStartup() { + t.Fatalf("expected internalStartup false when enabled and not started") + } + + SetReady(true) + SetStartupComplete(true) + if !internalReady() { + t.Fatalf("expected internalReady true after SetReady(true)") + } + if !internalStartup() { + t.Fatalf("expected internalStartup true after SetStartupComplete(true)") + } +} diff --git a/server/options.go b/server/options.go index 3c8413a816..0f32631463 100644 --- a/server/options.go +++ b/server/options.go @@ -43,6 +43,7 @@ import ( aslimiter "dubbo.apache.org/dubbo-go/v3/filter/adaptivesvc/limiter" "dubbo.apache.org/dubbo-go/v3/global" "dubbo.apache.org/dubbo-go/v3/graceful_shutdown" + "dubbo.apache.org/dubbo-go/v3/metrics/probe" "dubbo.apache.org/dubbo-go/v3/protocol" "dubbo.apache.org/dubbo-go/v3/protocol/base" "dubbo.apache.org/dubbo-go/v3/registry" @@ -108,6 +109,11 @@ func (srvOpts *ServerOptions) init(opts ...ServerOption) error { // init graceful_shutdown graceful_shutdown.Init(graceful_shutdown.SetShutdownConfig(srvOpts.Shutdown)) + // init probe + if probeCfg := probe.BuildProbeConfig(srvOpts.Metrics.Probe); probeCfg != nil { + probe.Init(probeCfg) + } + return nil } diff --git a/server/server.go b/server/server.go index 271db85f46..6692edd76a 100644 --- a/server/server.go +++ b/server/server.go @@ -38,6 +38,7 @@ import ( "dubbo.apache.org/dubbo-go/v3/common/constant" "dubbo.apache.org/dubbo-go/v3/common/dubboutil" "dubbo.apache.org/dubbo-go/v3/metadata" + "dubbo.apache.org/dubbo-go/v3/metrics/probe" "dubbo.apache.org/dubbo-go/v3/registry/exposed_tmp" ) @@ -329,6 +330,8 @@ func (s *Server) Serve() error { if err := exposed_tmp.RegisterServiceInstance(); err != nil { return err } + probe.SetStartupComplete(true) + probe.SetReady(true) select {} }