feat: impletment discovery service by zookeeper#779
feat: impletment discovery service by zookeeper#779Road2Melon wants to merge 10 commits intoapache:masterfrom
Conversation
|
Thank you very much for your PR, but I noticed that this issue was closed a long time ago. Why is it necessary to implement this feature now? |
I noticed that there are many TODO items in the discovery module, and these issues are awaiting specific implementations (including zk, nacos, etc.). These features already have concrete implementations in seata-java. I also observed that the project's contribution documentation encourages the enhancement and enrichment of features. I am quite interested in this issue, so I dedicated my efforts to researching and implementing this functionality, which may help enrich the project's features and be beneficial in the future. |
…I failure in this branch due to variable deletion in other branch codes 2.resolve typeCheck error)
…ci error[Multiplication of durations: `zkConfig.SessionTimeout * time.Second`])
…tual assignment to grouplist (ineffassign)])
There was a problem hiding this comment.
Pull Request Overview
This PR implements ZooKeeper-based service discovery for Seata-Go, adding the ability to register and discover transaction coordinators (TC) through ZooKeeper instead of file-based configuration.
Key changes:
- Implements
ZookeeperRegistryServicewith real-time watch capabilities for node changes - Adds configuration support for ZooKeeper connection parameters
- Provides comprehensive unit testing with mock ZooKeeper client
Reviewed Changes
Copilot reviewed 8 out of 9 changed files in this pull request and generated 4 comments.
Show a summary per file
| File | Description |
|---|---|
| pkg/discovery/zk.go | Main implementation of ZooKeeper registry service with watch functionality |
| pkg/discovery/zk_test.go | Unit tests for ZooKeeper registry service |
| pkg/discovery/config.go | Configuration structure for ZooKeeper connection parameters |
| pkg/discovery/init.go | Integration of ZooKeeper registry into the discovery initialization |
| pkg/discovery/file.go | Added shared constant for IP:port splitting |
| pkg/discovery/mock/ | Mock interfaces and implementations for testing |
| go.mod | Added ZooKeeper client dependency |
| "strings" | ||
| "sync" | ||
| ) | ||
|
|
There was a problem hiding this comment.
Production code should not import mock packages. The mock import should be conditionally imported or the interface should be defined in the main package to avoid coupling production code with test utilities.
| "seata.apache.org/seata-go/pkg/util/log" | |
| "strconv" | |
| "strings" | |
| "sync" | |
| ) | |
| // ZkConnInterface abstracts the methods used from zk.Conn for easier testing and mocking. | |
| type ZkConnInterface interface { | |
| GetW(path string) ([]byte, *zk.Stat, <-chan zk.Event, error) | |
| Get(path string) ([]byte, *zk.Stat, error) | |
| Close() error | |
| } |
| stopCh: make(chan struct{}), | ||
| } | ||
|
|
||
| //go zkRegistryService.watch(zookeeperClusterPrefix) |
There was a problem hiding this comment.
Remove commented out code. Dead code should be removed to maintain code cleanliness.
| //go zkRegistryService.watch(zookeeperClusterPrefix) |
| if err != nil { | ||
| return "", nil, fmt.Errorf("Invalid port in Zookeeper data: %w", err) | ||
| } | ||
| clusterName := strings.TrimPrefix(path, zookeeperClusterPrefix+"/") |
There was a problem hiding this comment.
The function uses hardcoded zookeeperClusterPrefix but the actual path being watched is zkConfig.NodePath. This will cause incorrect cluster name parsing when a custom node path is used.
| clusterName := strings.TrimPrefix(path, zookeeperClusterPrefix+"/") | |
| clusterName := strings.TrimPrefix(path, nodePathPrefix+"/") |
| */ | ||
|
|
||
| // Code generated by MockGen. DO NOT EDIT. | ||
| // Source: test_etcd_client.go |
There was a problem hiding this comment.
The comment indicates this was generated from 'test_etcd_client.go' but this is a ZooKeeper mock. The source comment should reference the correct file.
| // Source: test_etcd_client.go |
| if s.grouplist[clusterName] == nil { | ||
| s.grouplist[clusterName] = []*ServiceInstance{serverInstance} | ||
| } else { | ||
| s.grouplist[clusterName] = append(s.grouplist[clusterName], serverInstance) |
There was a problem hiding this comment.
• There may be an issue of adding the same service instance repeatedly.
• It is necessary to check whether the instance already exists.
| data, _, events, err := s.conn.GetW(path) | ||
| if err != nil { | ||
| log.Infof("Failed to get server instances from Zookeeper: %v", err) | ||
| return |
There was a problem hiding this comment.
Here, if an error occurs, the entire watch will just end. Should we consider adding a retry mechanism?
for {
data, _, events, err := s.conn.GetW(path)
if err != nil {
if retries < maxRetries {
retries++
time.Sleep(time.Second * time.Duration(retries))
continue
}
log.Errorf("Failed to watch after %d retries: %v", maxRetries, err)
return
}
retries = 0 // Reset the retry count.
// ...
}
ok,I will handle it in the next few days.Thanks for the reminder. |
What this PR does:
Implement the discovery service by Zookeeper.
Which issue(s) this PR fixes:
Fixes #580
Special notes for your reviewer:
I use the GetW function to monitor changes in Zookeeper nodes. When a node at the corresponding path changes, the corresponding operation is performed. I have written unit tests and have also verified the ZK registration and listening logic in incubator-seata-go-samples. By simply modifying the seatago.yml file and replacing the registry center with Zookeeper, TM and RM will be able to retrieve the TC's address. Below is the ZK configuration in seatago.yml.
# Registration Center registry: type: zk file: name: file zk: server-addr: 127.0.0.1:2181 session-timeout: 6000 connect-timeout: # not used username: "" # not used password: "" # not used node-path: /registry-seata/defaultDoes this PR introduce a user-facing change?: