Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 24 additions & 19 deletions config/graceful_shutdown.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,12 @@ func gracefulShutdownInit() {
if !exist {
return
}
if filter, ok := gracefulShutdownConsumerFilter.(Setter); ok && rootConfig.Shutdown != nil {
rc := GetRootConfig()
if filter, ok := gracefulShutdownConsumerFilter.(Setter); ok && rc != nil && rc.Shutdown != nil {
filter.Set(constant.GracefulShutdownFilterShutdownConfig, GetShutDown())
}

if filter, ok := gracefulShutdownProviderFilter.(Setter); ok && rootConfig.Shutdown != nil {
if filter, ok := gracefulShutdownProviderFilter.(Setter); ok && rc != nil && rc.Shutdown != nil {
filter.Set(constant.GracefulShutdownFilterShutdownConfig, GetShutDown())
}

Expand Down Expand Up @@ -127,21 +128,22 @@ func destroyAllRegistries() {
func destroyProtocols() {
logger.Info("Graceful shutdown --- Destroy protocols. ")

if rootConfig.Protocols == nil {
rc := GetRootConfig()
if rc == nil || rc.Protocols == nil {
return
}

consumerProtocols := getConsumerProtocols()
consumerProtocols := getConsumerProtocols(rc)

destroyProviderProtocols(consumerProtocols)
destroyProviderProtocols(rc, consumerProtocols)
destroyConsumerProtocols(consumerProtocols)
}

// destroyProviderProtocols destroys the provider's protocol.
// if the protocol is consumer's protocol too, we will keep it
func destroyProviderProtocols(consumerProtocols *gxset.HashSet) {
func destroyProviderProtocols(rc *RootConfig, consumerProtocols *gxset.HashSet) {
logger.Info("Graceful shutdown --- First destroy provider's protocols. ")
for _, protocol := range rootConfig.Protocols {
for _, protocol := range rc.Protocols {
// the protocol is the consumer's protocol too, we can not destroy it.
if consumerProtocols.Contains(protocol.Name) {
continue
Expand All @@ -159,18 +161,19 @@ func destroyConsumerProtocols(consumerProtocols *gxset.HashSet) {

func waitAndAcceptNewRequests() {
logger.Info("Graceful shutdown --- Keep waiting and accept new requests for a short time. ")
if rootConfig.Shutdown == nil {
rc := GetRootConfig()
if rc == nil || rc.Shutdown == nil {
return
}

time.Sleep(rootConfig.Shutdown.GetConsumerUpdateWaitTime())
time.Sleep(rc.Shutdown.GetConsumerUpdateWaitTime())

timeout := rootConfig.Shutdown.GetStepTimeout()
timeout := rc.Shutdown.GetStepTimeout()
// ignore this step
if timeout < 0 {
return
}
waitingProviderProcessedTimeout(rootConfig.Shutdown)
waitingProviderProcessedTimeout(rc.Shutdown)
}

func waitingProviderProcessedTimeout(shutdownConfig *ShutdownConfig) {
Expand All @@ -193,12 +196,13 @@ func waitingProviderProcessedTimeout(shutdownConfig *ShutdownConfig) {
// for provider. It will wait for processing receiving requests
func waitForSendingAndReceivingRequests() {
logger.Info("Graceful shutdown --- Keep waiting until sending/accepting requests finish or timeout. ")
if rootConfig == nil || rootConfig.Shutdown == nil {
rc := GetRootConfig()
if rc == nil || rc.Shutdown == nil {
// ignore this step
return
}
rootConfig.Shutdown.RejectRequest.Store(true)
waitingConsumerProcessedTimeout(rootConfig.Shutdown)
rc.Shutdown.RejectRequest.Store(true)
waitingConsumerProcessedTimeout(rc.Shutdown)
}

func waitingConsumerProcessedTimeout(shutdownConfig *ShutdownConfig) {
Expand All @@ -217,21 +221,22 @@ func waitingConsumerProcessedTimeout(shutdownConfig *ShutdownConfig) {

func totalTimeout() time.Duration {
timeout := defaultShutDownTime
if rootConfig.Shutdown != nil && rootConfig.Shutdown.GetTimeout() > timeout {
timeout = rootConfig.Shutdown.GetTimeout()
rc := GetRootConfig()
if rc != nil && rc.Shutdown != nil && rc.Shutdown.GetTimeout() > timeout {
timeout = rc.Shutdown.GetTimeout()
}

return timeout
}

// we can not get the protocols from consumerConfig because some protocol don't have configuration, like jsonrpc.
func getConsumerProtocols() *gxset.HashSet {
func getConsumerProtocols(rc *RootConfig) *gxset.HashSet {
result := gxset.NewSet()
if rootConfig.Consumer == nil || rootConfig.Consumer.References == nil {
if rc == nil || rc.Consumer == nil || rc.Consumer.References == nil {
return result
}

for _, reference := range rootConfig.Consumer.References {
for _, reference := range rc.Consumer.References {
result.Add(reference.Protocol)
}
return result
Expand Down
65 changes: 43 additions & 22 deletions graceful_shutdown/shutdown.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ const (
)

var (
initOnce sync.Once
initOnce sync.Once
shutdownOnce sync.Once

proMu sync.Mutex
protocols map[string]struct{}
Expand Down Expand Up @@ -87,13 +88,13 @@ func Init(opts ...Option) {

go func() {
sig := <-signals
logger.Infof("get signal %s, applicationConfig will shutdown.", sig)
logger.Infof("Received signal: %v, application is shutting down gracefully", sig)
// gracefulShutdownOnce.Do(func() {
time.AfterFunc(totalTimeout(newOpts.Shutdown), func() {
logger.Warn("Shutdown gracefully timeout, applicationConfig will shutdown immediately. ")
logger.Warn("Graceful shutdown timeout, application will shutdown immediately")
os.Exit(0)
})
beforeShutdown(newOpts.Shutdown)
BeforeShutdown(newOpts.Shutdown)
// those signals' original behavior is exit with dump ths stack, so we try to keep the behavior
for _, dumpSignal := range DumpHeapShutdownSignals {
if sig == dumpSignal {
Expand Down Expand Up @@ -124,28 +125,41 @@ func totalTimeout(shutdown *global.ShutdownConfig) time.Duration {
return timeout
}

func beforeShutdown(shutdown *global.ShutdownConfig) {
destroyRegistries()
// waiting for a short time so that the clients have enough time to get the notification that server shutdowns
// The value of configuration depends on how long the clients will get notification.
waitAndAcceptNewRequests(shutdown)

// reject sending/receiving the new request but keeping waiting for accepting requests
waitForSendingAndReceivingRequests(shutdown)

// destroy all protocols
destroyProtocols()

logger.Info("Graceful shutdown --- Execute the custom callbacks.")
customCallbacks := extension.GetAllCustomShutdownCallbacks()
for callback := customCallbacks.Front(); callback != nil; callback = callback.Next() {
callback.Value.(func())()
}
// BeforeShutdown performs graceful shutdown cleanup including:
// - Destroying registries
// - Waiting for active requests to complete
// - Destroying protocols
// - Executing custom shutdown callbacks
// This function can be called manually when InternalSignal is disabled.
func BeforeShutdown(shutdown *global.ShutdownConfig) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

原来 beforeShutdown 是私有函数,只在 Init 的 goroutine 里调用一次。导出为 BeforeShutdown 后,server.go 可以直接调用,而 Init 内部的 goroutine(InternalSignal=true 时)也会调用。两次 destroyRegistries() + destroyProtocols() 会对已销毁的资源重复操作(double free 语义),行为未定义。虽然 server.goInternalSignal 判断来规避,但这依赖调用者自律。

建议:加 sync.Once 保证无论被谁调用多少次,只执行一次。

shutdownOnce.Do(func() {
destroyRegistries()
// waiting for a short time so that the clients have enough time to get the notification that server shutdowns
// The value of configuration depends on how long the clients will get notification.
waitAndAcceptNewRequests(shutdown)

// reject sending/receiving the new request but keeping waiting for accepting requests
waitForSendingAndReceivingRequests(shutdown)

// destroy all protocols
destroyProtocols()

logger.Info("Graceful shutdown --- Execute the custom callbacks.")
customCallbacks := extension.GetAllCustomShutdownCallbacks()
for callback := customCallbacks.Front(); callback != nil; callback = callback.Next() {
callback.Value.(func())()
}
})
}

// destroyRegistries destroys RegistryProtocol directly.
func destroyRegistries() {
logger.Info("Graceful shutdown --- Destroy all registriesConfig. ")
defer func() {
if r := recover(); r != nil {
logger.Errorf("Failed to destroy registries: %v", r)
}
}()
registryProtocol := extension.GetProtocol(constant.RegistryProtocol)
registryProtocol.Destroy()
}
Expand Down Expand Up @@ -209,6 +223,13 @@ func destroyProtocols() {
// extension.GetProtocol might panic
defer proMu.Unlock()
for name := range protocols {
extension.GetProtocol(name).Destroy()
func() {
defer func() {
if r := recover(); r != nil {
logger.Errorf("Failed to destroy protocol %s: %v", name, r)
}
}()
extension.GetProtocol(name).Destroy()
}()
}
}
82 changes: 54 additions & 28 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@ package server

import (
"context"
"os"
"os/signal"
"reflect"
"runtime/debug"
"sort"
"strconv"
"strings"
"sync"
)

Expand All @@ -37,6 +39,7 @@ import (
"dubbo.apache.org/dubbo-go/v3/common"
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/common/dubboutil"
"dubbo.apache.org/dubbo-go/v3/graceful_shutdown"
"dubbo.apache.org/dubbo-go/v3/metadata"
"dubbo.apache.org/dubbo-go/v3/registry/exposed_tmp"
)
Expand Down Expand Up @@ -104,10 +107,12 @@ func (s *Server) genSvcOpts(handler any, info *common.ServiceInfo, opts ...Servi
return nil, errors.New("Server has not been initialized, please use NewServer() to create Server")
}
var svcOpts []ServiceOption

appCfg := s.cfg.Application
proCfg := s.cfg.Provider
prosCfg := s.cfg.Protocols
regsCfg := s.cfg.Registries

// todo(DMwangnima): record the registered service
// Record the registered service for debugging and monitoring
interfaceName := common.GetReference(handler)
Expand All @@ -116,7 +121,7 @@ func (s *Server) genSvcOpts(handler any, info *common.ServiceInfo, opts ...Servi
newSvcOpts := defaultServiceOptions()
if appCfg != nil {
svcOpts = append(svcOpts,
SetApplication(s.cfg.Application),
SetApplication(appCfg),
)
}
if proCfg != nil {
Expand Down Expand Up @@ -234,32 +239,33 @@ func createReflectionMethodFunc(method reflect.Method) func(ctx context.Context,
}
}

// Add a method with a name of a different first-letter case
// to achieve interoperability with java
// TODO: The method name case sensitivity in Dubbo-java should be addressed.
// We ought to make changes to handle this issue.
// enhanceServiceInfo fills in missing MethodFunc entries via reflection.
// Case-insensitive Triple routing is handled in the transport-layer route mux,
// but lowercase-first ServiceInfo method names still need MethodFunc backfill so
// reflection-based invocation can reach the exported Go method.
func enhanceServiceInfo(info *common.ServiceInfo) *common.ServiceInfo {
if info == nil {
return info
}

// Get service type for reflection-based method calls
var svcType reflect.Type
if info.ServiceType != nil {
svcType = reflect.TypeOf(info.ServiceType)
}

// Build method map for reflection lookup
// Build method map for reflection lookup.
// Keep the first-rune-swapped alias for lowercase-first ServiceInfo names
// (for example "sayHello" -> "SayHello") without duplicating metadata.
methodMap := make(map[string]reflect.Method)
if svcType != nil {
for i := 0; i < svcType.NumMethod(); i++ {
m := svcType.Method(i)
methodMap[m.Name] = m
methodMap[strings.ToLower(m.Name)] = m
methodMap[dubboutil.SwapCaseFirstRune(m.Name)] = m
}
}

// Add MethodFunc to methods that don't have it
// Fill in MethodFunc for methods that don't already have one.
for i := range info.Methods {
if info.Methods[i].MethodFunc == nil && svcType != nil {
if reflectMethod, ok := methodMap[info.Methods[i].Name]; ok {
Expand All @@ -268,26 +274,13 @@ func enhanceServiceInfo(info *common.ServiceInfo) *common.ServiceInfo {
}
}

// Create additional methods with swapped-case names for Java interoperability
var additionalMethods []common.MethodInfo
for _, method := range info.Methods {
newMethod := method
newMethod.Name = dubboutil.SwapCaseFirstRune(method.Name)
if method.MethodFunc != nil {
newMethod.MethodFunc = method.MethodFunc
} else if svcType != nil {
if reflectMethod, ok := methodMap[dubboutil.SwapCaseFirstRune(method.Name)]; ok {
newMethod.MethodFunc = createReflectionMethodFunc(reflectMethod)
}
}
additionalMethods = append(additionalMethods, newMethod)
}
info.Methods = append(info.Methods, additionalMethods...)

return info
}

func (s *Server) exportServices() error {
// add read lock to protect svcOptsMap data
s.mu.RLock()
defer s.mu.RUnlock()
for _, svcOpts := range s.svcOptsMap {
if err := svcOpts.Export(); err != nil {
logger.Errorf("export %s service failed, err: %s", svcOpts.Service.Interface, err)
Expand All @@ -299,12 +292,17 @@ func (s *Server) exportServices() error {

func (s *Server) Serve() error {
s.mu.Lock()
defer s.mu.Unlock()
if s.serve {
// release lock in case causing deadlock
s.mu.Unlock()
return errors.New("server has already been started")
}
// prevent multiple calls to Serve
s.serve = true

// release lock in case causing deadlock
s.mu.Unlock()

// the registryConfig in ServiceOptions and ServerOptions all need to init a metadataReporter,
// when ServiceOptions.init() is called we don't know if a new registry config is set in the future use serviceOption
if err := metadata.InitRegistryMetadataReport(s.cfg.Registries); err != nil {
Expand All @@ -329,12 +327,40 @@ func (s *Server) Serve() error {
if err := exposed_tmp.RegisterServiceInstance(); err != nil {
return err
}
select {}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Serve() 全程持有写锁(通过 defer s.mu.Unlock()),但函数内部进入 select{} 永久阻塞,锁永不释放。这会饿死所有调用 GetServiceOptions()GetServiceInfo()GetRPCService() 等方法的读操作,健康检查、动态路由等都会死锁。

建议:serve = true 的赋值在锁内完成后立即释放锁,再进入阻塞等待。

// Listen for shutdown signals to enable graceful shutdown.
// Use the same signal set as the graceful_shutdown package for consistency.
shutdown := s.cfg.Shutdown
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, graceful_shutdown.ShutdownSignals...)
defer signal.Stop(sigChan)

// Block until a shutdown signal is received.
sig := <-sigChan
logger.Infof("Received signal: %v, application is shutting down gracefully", sig)

// Perform graceful shutdown cleanup.
// BeforeShutdown is protected by sync.Once, so even if graceful_shutdown.Init()
// (InternalSignal=true) also calls it concurrently, only one execution will run.
if shutdown != nil {
graceful_shutdown.BeforeShutdown(shutdown)
}

// Handle signals that require heap dump (e.g., SIGQUIT, SIGILL, SIGTRAP, SIGABRT, SIGSYS)
for _, dumpSignal := range graceful_shutdown.DumpHeapShutdownSignals {
if sig == dumpSignal {
debug.WriteHeapDump(os.Stdout.Fd())
}
}

os.Exit(0)
return nil // unreachable, but satisfies the compiler
}

// In order to expose internal services
func (s *Server) exportInternalServices() error {
cfg := &ServiceOptions{}

cfg.Application = s.cfg.Application
cfg.Provider = s.cfg.Provider
cfg.Protocols = s.cfg.Protocols
Expand Down
Loading