Skip to content

Commit ba204df

Browse files
prometherionmjuraga
authored andcommitted
OPTIM/MINOR: cluster: avoiding goroutines leak for monitorBootstrapKey
1 parent 7b52f13 commit ba204df

File tree

1 file changed

+67
-62
lines changed

1 file changed

+67
-62
lines changed

configuration/cluster_sync.go

Lines changed: 67 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -186,73 +186,78 @@ func (c *ClusterSync) issueRefreshRequest(url, port, basePath string, nodesPath
186186
}
187187

188188
func (c *ClusterSync) monitorBootstrapKey() {
189-
for range c.cfg.Notify.BootstrapKeyChanged.Subscribe("monitorBootstrapKey") {
190-
key := c.cfg.Cluster.BootstrapKey.Load()
191-
c.cfg.Cluster.CertificateFetched.Store(false)
192-
if key == "" {
193-
// do we need to delete cert here maybe?
194-
c.cfg.Cluster.ActiveBootstrapKey.Store("")
195-
err := c.cfg.Save()
189+
for {
190+
select {
191+
case <-c.cfg.Notify.BootstrapKeyChanged.Subscribe("monitorBootstrapKey"):
192+
key := c.cfg.Cluster.BootstrapKey.Load()
193+
c.cfg.Cluster.CertificateFetched.Store(false)
194+
if key == "" {
195+
// do we need to delete cert here maybe?
196+
c.cfg.Cluster.ActiveBootstrapKey.Store("")
197+
err := c.cfg.Save()
198+
if err != nil {
199+
log.Panic(err)
200+
}
201+
break
202+
}
203+
if key == c.cfg.Cluster.ActiveBootstrapKey.Load() {
204+
fetched := c.cfg.Cluster.CertificateFetched.Load()
205+
if !fetched {
206+
c.certFetch <- struct{}{}
207+
}
208+
break
209+
}
210+
data, err := DecodeBootstrapKey(key)
211+
if err != nil {
212+
log.Warning(err)
213+
}
214+
url := fmt.Sprintf("%s://%s", data["schema"], data["address"])
215+
c.cfg.Cluster.URL.Store(url)
216+
c.cfg.Cluster.Port.Store(data["port"])
217+
c.cfg.Cluster.APIBasePath.Store(data["api-base-path"])
218+
registerPath, ok := data["register-path"]
219+
if !ok {
220+
c.cfg.Cluster.APIRegisterPath.Store(data["path"])
221+
c.cfg.Cluster.APINodesPath.Store(data["path"])
222+
} else {
223+
c.cfg.Cluster.APIRegisterPath.Store(registerPath)
224+
c.cfg.Cluster.APINodesPath.Store(data["nodes-path"])
225+
}
226+
c.cfg.Cluster.Name.Store(data["name"])
227+
c.cfg.Cluster.Description.Store(data["description"])
228+
c.cfg.Mode.Store("cluster")
229+
err = c.cfg.Save()
196230
if err != nil {
197231
log.Panic(err)
198232
}
199-
continue
200-
}
201-
if key == c.cfg.Cluster.ActiveBootstrapKey.Load() {
202-
fetched := c.cfg.Cluster.CertificateFetched.Load()
203-
if !fetched {
204-
c.certFetch <- struct{}{}
233+
csr, key, err := generateCSR()
234+
if err != nil {
235+
log.Warning(err)
236+
break
205237
}
206-
continue
207-
}
208-
data, err := DecodeBootstrapKey(key)
209-
if err != nil {
210-
log.Warning(err)
211-
}
212-
url := fmt.Sprintf("%s://%s", data["schema"], data["address"])
213-
c.cfg.Cluster.URL.Store(url)
214-
c.cfg.Cluster.Port.Store(data["port"])
215-
c.cfg.Cluster.APIBasePath.Store(data["api-base-path"])
216-
registerPath, ok := data["register-path"]
217-
if !ok {
218-
c.cfg.Cluster.APIRegisterPath.Store(data["path"])
219-
c.cfg.Cluster.APINodesPath.Store(data["path"])
220-
} else {
221-
c.cfg.Cluster.APIRegisterPath.Store(registerPath)
222-
c.cfg.Cluster.APINodesPath.Store(data["nodes-path"])
223-
}
224-
c.cfg.Cluster.Name.Store(data["name"])
225-
c.cfg.Cluster.Description.Store(data["description"])
226-
c.cfg.Mode.Store("cluster")
227-
err = c.cfg.Save()
228-
if err != nil {
229-
log.Panic(err)
230-
}
231-
csr, key, err := generateCSR()
232-
if err != nil {
233-
log.Warning(err)
234-
continue
235-
}
236-
err = renameio.WriteFile(path.Join(c.cfg.GetClusterCertDir(), fmt.Sprintf("dataplane-%s.key", c.cfg.Name.Load())), []byte(key), 0644)
237-
if err != nil {
238-
log.Warning(err)
239-
continue
240-
}
241-
err = renameio.WriteFile(path.Join(c.cfg.GetClusterCertDir(), fmt.Sprintf("dataplane-%s-csr.crt", c.cfg.Name.Load())), []byte(csr), 0644)
242-
if err != nil {
243-
log.Warning(err)
244-
continue
245-
}
246-
err = c.cfg.Save()
247-
if err != nil {
248-
log.Panic(err)
249-
}
250-
err = c.issueJoinRequest(url, data["port"], data["api-base-path"], c.cfg.Cluster.APIRegisterPath.Load(), csr, key)
251-
if err != nil {
252-
log.Warning(err)
253-
continue
238+
err = renameio.WriteFile(path.Join(c.cfg.GetClusterCertDir(), fmt.Sprintf("dataplane-%s.key", c.cfg.Name.Load())), []byte(key), 0644)
239+
if err != nil {
240+
log.Warning(err)
241+
break
242+
}
243+
err = renameio.WriteFile(path.Join(c.cfg.GetClusterCertDir(), fmt.Sprintf("dataplane-%s-csr.crt", c.cfg.Name.Load())), []byte(csr), 0644)
244+
if err != nil {
245+
log.Warning(err)
246+
break
247+
}
248+
err = c.cfg.Save()
249+
if err != nil {
250+
log.Panic(err)
251+
}
252+
err = c.issueJoinRequest(url, data["port"], data["api-base-path"], c.cfg.Cluster.APIRegisterPath.Load(), csr, key)
253+
if err != nil {
254+
log.Warning(err)
255+
break
256+
}
257+
c.certFetch <- struct{}{}
258+
case <-c.Context.Done():
259+
return
254260
}
255-
c.certFetch <- struct{}{}
256261
}
257262
}
258263

0 commit comments

Comments
 (0)