-
Notifications
You must be signed in to change notification settings - Fork 1k
refactor(filter): replace config package for hystrix #3193
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -16,30 +16,37 @@ | |
| */ | ||
|
|
||
| // Package hystrix provides hystrix filter. | ||
| // To use hystrix, you need to configure commands using hystrix-go API: | ||
| // | ||
| // import "github.com/afex/hystrix-go/hystrix" | ||
| // | ||
| // // Resource name format: dubbo:consumer:InterfaceName:group:version:Method(param1,param2) | ||
| // // Example: dubbo:consumer:com.example.GreetService:::Greet(string,string) | ||
| // hystrix.ConfigureCommand("dubbo:consumer:com.example.GreetService:::Greet(string,string)", hystrix.CommandConfig{ | ||
| // Timeout: 1000, | ||
| // MaxConcurrentRequests: 20, | ||
| // RequestVolumeThreshold: 20, | ||
| // SleepWindow: 5000, | ||
| // ErrorPercentThreshold: 50, | ||
| // }) | ||
| package hystrix | ||
|
|
||
| import ( | ||
| "context" | ||
| "fmt" | ||
| "reflect" | ||
| "regexp" | ||
| "sync" | ||
| "strings" | ||
| ) | ||
|
|
||
| import ( | ||
| "github.com/afex/hystrix-go/hystrix" | ||
|
|
||
| "github.com/dubbogo/gost/log/logger" | ||
|
|
||
| perrors "github.com/pkg/errors" | ||
|
|
||
| "gopkg.in/yaml.v2" | ||
| ) | ||
|
|
||
| import ( | ||
| "dubbo.apache.org/dubbo-go/v3/common" | ||
| "dubbo.apache.org/dubbo-go/v3/common/constant" | ||
| "dubbo.apache.org/dubbo-go/v3/common/extension" | ||
| "dubbo.apache.org/dubbo-go/v3/config" | ||
| "dubbo.apache.org/dubbo-go/v3/filter" | ||
| "dubbo.apache.org/dubbo-go/v3/protocol/base" | ||
| "dubbo.apache.org/dubbo-go/v3/protocol/result" | ||
|
|
@@ -50,14 +57,6 @@ const ( | |
| HYSTRIX = "hystrix" | ||
| ) | ||
|
|
||
| var ( | ||
| confConsumer = &FilterConfig{} | ||
| confProvider = &FilterConfig{} | ||
| configLoadMutex = sync.RWMutex{} | ||
| consumerConfigOnce sync.Once | ||
| providerConfigOnce sync.Once | ||
| ) | ||
|
|
||
| func init() { | ||
| extension.SetFilter(constant.HystrixConsumerFilterKey, newFilterConsumer) | ||
| extension.SetFilter(constant.HystrixProviderFilterKey, newFilterProvider) | ||
|
|
@@ -87,111 +86,31 @@ func NewHystrixFilterError(err error, failByHystrix bool) error { | |
| } | ||
|
|
||
| // Filter for Hystrix | ||
| /** | ||
| * You should add hystrix related configuration in provider or consumer config or both, according to which side you are to apply Filter. | ||
| * For example: | ||
| * filter_conf: | ||
| * hystrix: | ||
| * configs: | ||
| * # =========== Define config here ============ | ||
| * "Default": | ||
| * timeout : 1000 | ||
| * max_concurrent_requests : 25 | ||
| * sleep_window : 5000 | ||
| * error_percent_threshold : 50 | ||
| * request_volume_threshold: 20 | ||
| * "userp": | ||
| * timeout: 2000 | ||
| * max_concurrent_requests: 512 | ||
| * sleep_window: 4000 | ||
| * error_percent_threshold: 35 | ||
| * request_volume_threshold: 6 | ||
| * "userp_m": | ||
| * timeout : 1200 | ||
| * max_concurrent_requests : 512 | ||
| * sleep_window : 6000 | ||
| * error_percent_threshold : 60 | ||
| * request_volume_threshold: 16 | ||
| * # =========== Define error whitelist which will be ignored by Hystrix counter ============ | ||
| * error_whitelist: [".*exception.*"] | ||
| * | ||
| * # =========== Apply default config here =========== | ||
| * default: "Default" | ||
| * | ||
| * services: | ||
| * "com.ikurento.user.UserProvider": | ||
| * # =========== Apply service level config =========== | ||
| * service_config: "userp" | ||
| * # =========== Apply method level config =========== | ||
| * methods: | ||
| * "GetUser": "userp_m" | ||
| * "GetUser1": "userp_m" | ||
| */ | ||
| type Filter struct { | ||
| COrP bool // true for consumer | ||
| res map[string][]*regexp.Regexp | ||
| ifNewMap sync.Map | ||
| COrP bool // true for consumer, false for provider | ||
| } | ||
|
|
||
| // Invoke is an implementation of filter, provides Hystrix pattern latency and fault tolerance | ||
| func (f *Filter) Invoke(ctx context.Context, invoker base.Invoker, invocation base.Invocation) result.Result { | ||
| cmdName := fmt.Sprintf("%s&method=%s", invoker.GetURL().Key(), invocation.MethodName()) | ||
| cmdName := getResourceName(invoker, invocation, f.COrP) | ||
|
||
|
|
||
| // Do the configuration if the circuit breaker is created for the first time | ||
| if _, load := f.ifNewMap.LoadOrStore(cmdName, true); !load { | ||
| configLoadMutex.Lock() | ||
| filterConf := getConfig(invoker.GetURL().Service(), invocation.MethodName(), f.COrP) | ||
| for _, ptn := range filterConf.Error { | ||
| reg, err := regexp.Compile(ptn) | ||
| if err != nil { | ||
| logger.Warnf("[Hystrix Filter]Errors occurred parsing error omit regexp: %s, %v", ptn, err) | ||
| } else { | ||
| if f.res == nil { | ||
| f.res = make(map[string][]*regexp.Regexp) | ||
| } | ||
| f.res[invocation.MethodName()] = append(f.res[invocation.MethodName()], reg) | ||
| } | ||
| } | ||
| hystrix.ConfigureCommand(cmdName, hystrix.CommandConfig{ | ||
| Timeout: filterConf.Timeout, | ||
| MaxConcurrentRequests: filterConf.MaxConcurrentRequests, | ||
| SleepWindow: filterConf.SleepWindow, | ||
| ErrorPercentThreshold: filterConf.ErrorPercentThreshold, | ||
| RequestVolumeThreshold: filterConf.RequestVolumeThreshold, | ||
| }) | ||
| configLoadMutex.Unlock() | ||
| } | ||
| configLoadMutex.RLock() | ||
| _, _, err := hystrix.GetCircuit(cmdName) | ||
| configLoadMutex.RUnlock() | ||
| if err != nil { | ||
| logger.Errorf("[Hystrix Filter]Errors occurred getting circuit for %s , will invoke without hystrix, error is: %+v", cmdName, err) | ||
| return invoker.Invoke(ctx, invocation) | ||
| } | ||
| logger.Infof("[Hystrix Filter]Using hystrix filter: %s", cmdName) | ||
| var res result.Result | ||
| _ = hystrix.Do(cmdName, func() error { | ||
| err := hystrix.Do(cmdName, func() error { | ||
| res = invoker.Invoke(ctx, invocation) | ||
| err := res.Error() | ||
| if err != nil { | ||
| res.SetError(NewHystrixFilterError(err, false)) | ||
| for _, reg := range f.res[invocation.MethodName()] { | ||
| if reg.MatchString(err.Error()) { | ||
| logger.Debugf("[Hystrix Filter]Error in invocation but omitted in circuit breaker: %v; %s", err, cmdName) | ||
| return nil | ||
| } | ||
| } | ||
| } | ||
| return err | ||
| return res.Error() | ||
|
Comment on lines
+98
to
+100
|
||
| }, func(err error) error { | ||
| // Return error and if it is caused by hystrix logic, so that it can be handled by previous filters. | ||
| // Circuit is open, return fallback error | ||
| _, ok := err.(hystrix.CircuitError) | ||
| logger.Debugf("[Hystrix Filter]Hystrix health check counted, error is: %v, failed by hystrix: %v; %s", err, ok, cmdName) | ||
| logger.Debugf("[Hystrix Filter] Circuit opened for %s, failed by hystrix: %v", cmdName, ok) | ||
| res = &result.RPCResult{} | ||
| res.SetResult(nil) | ||
| res.SetError(NewHystrixFilterError(err, ok)) | ||
| return err | ||
| }) | ||
|
zbchi marked this conversation as resolved.
|
||
|
|
||
| if err != nil { | ||
| return res | ||
| } | ||
| return res | ||
| } | ||
|
|
||
|
|
@@ -202,134 +121,50 @@ func (f *Filter) OnResponse(ctx context.Context, result result.Result, invoker b | |
|
|
||
| // newFilterConsumer returns Filter instance for consumer | ||
| func newFilterConsumer() filter.Filter { | ||
| // When first called, load the config in | ||
| consumerConfigOnce.Do(func() { | ||
| if err := initConfigConsumer(); err != nil { | ||
| logger.Warnf("[Hystrix Filter]ShutdownConfig load failed for consumer, error is: %v , will use default", err) | ||
| } | ||
| }) | ||
| return &Filter{COrP: true} | ||
| } | ||
|
|
||
| // newFilterProvider returns Filter instance for provider | ||
| func newFilterProvider() filter.Filter { | ||
| providerConfigOnce.Do(func() { | ||
| if err := initConfigProvider(); err != nil { | ||
| logger.Warnf("[Hystrix Filter]ShutdownConfig load failed for provider, error is: %v , will use default", err) | ||
| } | ||
| }) | ||
| return &Filter{COrP: false} | ||
| } | ||
|
|
||
| func getConfig(service string, method string, cOrP bool) CommandConfigWithError { | ||
| // Find method level config | ||
| var conf *FilterConfig | ||
| if cOrP { | ||
| conf = confConsumer | ||
| } else { | ||
| conf = confProvider | ||
| } | ||
| getConf := conf.Configs[conf.Services[service].Methods[method]] | ||
| if getConf != nil { | ||
| logger.Infof("[Hystrix Filter]Found method-level config for %s - %s", service, method) | ||
| return *getConf | ||
| } | ||
| // Find service level config | ||
| getConf = conf.Configs[conf.Services[service].ServiceConfig] | ||
| if getConf != nil { | ||
| logger.Infof("[Hystrix Filter]Found service-level config for %s - %s", service, method) | ||
| return *getConf | ||
| } | ||
| // Find default config | ||
| getConf = conf.Configs[conf.Default] | ||
| if getConf != nil { | ||
| logger.Infof("[Hystrix Filter]Found global default config for %s - %s", service, method) | ||
| return *getConf | ||
| } | ||
| getConf = &CommandConfigWithError{} | ||
| logger.Infof("[Hystrix Filter]No config found for %s - %s, using default", service, method) | ||
| return *getConf | ||
| } | ||
| const ( | ||
| DefaultProviderPrefix = "dubbo:provider:" | ||
| DefaultConsumerPrefix = "dubbo:consumer:" | ||
| ) | ||
|
|
||
| func initConfigConsumer() error { | ||
| if config.GetConsumerConfig().FilterConf == nil { | ||
| return perrors.Errorf("no config for hystrix_consumer") | ||
| } | ||
| filterConf := config.GetConsumerConfig().FilterConf | ||
| var filterConfig any | ||
| switch reflect.ValueOf(filterConf).Interface().(type) { | ||
| case map[any]any: | ||
| filterConfig = config.GetConsumerConfig().FilterConf.(map[any]any)[HYSTRIX] | ||
| case map[string]any: | ||
| filterConfig = config.GetConsumerConfig().FilterConf.(map[string]any)[HYSTRIX] | ||
| } | ||
| if filterConfig == nil { | ||
| return perrors.Errorf("no config for hystrix_consumer") | ||
| } | ||
| hystrixConfByte, err := yaml.Marshal(filterConfig) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| err = yaml.Unmarshal(hystrixConfByte, confConsumer) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| return nil | ||
| } | ||
| func getResourceName(invoker base.Invoker, invocation base.Invocation, isConsumer bool) string { | ||
| var sb strings.Builder | ||
|
|
||
| func initConfigProvider() error { | ||
| if config.GetProviderConfig().FilterConf == nil { | ||
| return perrors.Errorf("no config for hystrix_provider") | ||
| } | ||
| filterConfig := config.GetProviderConfig().FilterConf.(map[any]any)[HYSTRIX] | ||
| if filterConfig == nil { | ||
| return perrors.Errorf("no config for hystrix_provider") | ||
| } | ||
| hystrixConfByte, err := yaml.Marshal(filterConfig) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| err = yaml.Unmarshal(hystrixConfByte, confProvider) | ||
| if err != nil { | ||
| return err | ||
| if isConsumer { | ||
| sb.WriteString(DefaultConsumerPrefix) | ||
| } else { | ||
| sb.WriteString(DefaultProviderPrefix) | ||
| } | ||
| return nil | ||
| } | ||
|
|
||
| //For sake of dynamic config | ||
| //func RefreshHystrix() error { | ||
| // conf = &FilterConfig{} | ||
| // hystrix.Flush() | ||
| // return initHystrixConfig() | ||
| //} | ||
|
|
||
| // CommandConfigWithError describes hystrix command configs with error whitelist. | ||
| type CommandConfigWithError struct { | ||
| Timeout int `yaml:"timeout"` | ||
| MaxConcurrentRequests int `yaml:"max_concurrent_requests"` | ||
| RequestVolumeThreshold int `yaml:"request_volume_threshold"` | ||
| SleepWindow int `yaml:"sleep_window"` | ||
| ErrorPercentThreshold int `yaml:"error_percent_threshold"` | ||
| Error []string `yaml:"error_whitelist"` | ||
| } | ||
| // Format: interface:group:version | ||
| sb.WriteString(getColonSeparatedKey(invoker.GetURL())) | ||
| sb.WriteString(":") | ||
| sb.WriteString(invocation.MethodName()) | ||
| sb.WriteString("(") | ||
|
|
||
| //ShutdownConfig: | ||
| //- Timeout: how long to wait for command to complete, in milliseconds | ||
| //- MaxConcurrentRequests: how many commands of the same type can run at the same time | ||
| //- RequestVolumeThreshold: the minimum number of requests needed before a circuit can be tripped due to health | ||
| //- SleepWindow: how long, in milliseconds, to wait after a circuit opens before testing for recovery | ||
| //- ErrorPercentThreshold: it causes circuits to open once the rolling measure of errors exceeds this percent of requests | ||
| //See hystrix doc | ||
| isFirst := true | ||
| for _, v := range invocation.ParameterTypes() { | ||
| if !isFirst { | ||
| sb.WriteString(",") | ||
| } | ||
| sb.WriteString(v.Name()) | ||
| isFirst = false | ||
| } | ||
| sb.WriteString(")") | ||
|
|
||
| // FilterConfig holds hystrix configs at default/service/method levels. | ||
| type FilterConfig struct { | ||
| Configs map[string]*CommandConfigWithError | ||
| Default string | ||
| Services map[string]ServiceHystrixConfig | ||
| return sb.String() | ||
| } | ||
|
|
||
| // ServiceHystrixConfig binds service-level and method-level hystrix configs. | ||
| type ServiceHystrixConfig struct { | ||
| ServiceConfig string `yaml:"service_config"` | ||
| Methods map[string]string | ||
| func getColonSeparatedKey(url *common.URL) string { | ||
| return fmt.Sprintf("%s:%s:%s", | ||
| url.Service(), | ||
| url.GetParam(constant.GroupKey, ""), | ||
| url.GetParam(constant.VersionKey, "")) | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The documentation example shows parameter types as "string,string" but the test shows empty parameter types with "()". The documentation should clarify that parameter types are included only when they exist, and show a clearer example. For instance, the test uses "TestMethod()" with no parameters, which doesn't match the documentation example format.