Skip to content

Commit d3aa844

Browse files
committed
fix: synchronize remote debugging plugins across cluster nodes
Remote debugging plugins were not being synchronized across cluster nodes, causing "no plugin available nodes found" errors when trying to invoke plugins from different nodes. 1. **Remote debugging plugins not registered to cluster** - The `ClusterTunnel` notifier was not being added to ControlPanel 2. **Plugin ID inconsistency** - Remote plugins used different plugin_id formats during installation vs. querying 3. **Non-idempotent registration** - `RegisterPlugin` failed on reconnection with "plugin has been registered" error - **internal/types/models/curd/atomic.go**: - Unify plugin_id calculation for remote plugins (author/name without version) - Remove plugin_id from plugin query conditions - Clear old cache when plugin_id is updated - **internal/cluster/plugin.go**: - Make `RegisterPlugin` idempotent by updating existing plugin instead of returning error - **internal/core/control_panel/daemon.go**: - Add cluster field to ControlPanel - Add SetCluster() method for lazy cluster initialization - **internal/core/control_panel/server_debugger.go**: - Register remote debugging plugins to cluster on connection - Unregister from cluster on disconnection - **internal/core/plugin_manager/manager.go**: - Add SetCluster() method to set cluster after initialization - **internal/server/server.go**: - Call SetCluster() instead of AddClusterTunnel() Only remote debugging plugins are synchronized across cluster nodes. Local plugins run only on the node where they are installed and are not registered to the cluster. - Error handling improvements using `errors.Is()` instead of `==` - Handle 404 for missing plugin assets gracefully - Handle already-installed debugging plugins gracefully - Remote debugging plugin can be invoked from any node in the cluster - Plugin reconnection works without errors - Cache invalidation works correctly when plugin_id changes
1 parent e8f8f17 commit d3aa844

12 files changed

Lines changed: 292 additions & 52 deletions

File tree

internal/cluster/plugin.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,17 @@ func (c *Cluster) RegisterPlugin(lifetime plugin_entities.PluginLifetime) error
3737
}
3838

3939
if c.plugins.Exists(identity.String()) {
40-
return errors.New("plugin has been registered")
40+
// idempotent: plugin already registered, just update its state
41+
if existing, ok := c.plugins.Load(identity.String()); ok {
42+
// update the lifetime reference
43+
existing.lifetime = lifetime
44+
// update plugin state immediately
45+
err = c.doPluginStateUpdate(existing)
46+
if err != nil {
47+
return errors.Join(err, errors.New("failed to update plugin state"))
48+
}
49+
}
50+
return nil
4151
}
4252

4353
l := &pluginLifeTime{

internal/cluster/plugin_test.go

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,107 @@ func TestPluginScheduleLifetime(t *testing.T) {
140140
}
141141
}
142142

143+
func TestPluginRegisterIdempotent(t *testing.T) {
144+
plugin := getRandomPluginRuntime()
145+
cluster, err := createSimulationCluster(1)
146+
if err != nil {
147+
t.Errorf("create simulation cluster failed: %v", err)
148+
return
149+
}
150+
151+
launchSimulationCluster(cluster)
152+
defer closeSimulationCluster(cluster, t)
153+
154+
// wait for cluster to be ready
155+
time.Sleep(time.Second * 1)
156+
157+
// first registration should succeed
158+
err = cluster[0].RegisterPlugin(&plugin)
159+
if err != nil {
160+
t.Errorf("first register plugin failed: %v", err)
161+
return
162+
}
163+
164+
identity, err := plugin.Identity()
165+
if err != nil {
166+
t.Errorf("get plugin identity failed: %v", err)
167+
return
168+
}
169+
170+
hashedIdentity := plugin_entities.HashedIdentity(identity.String())
171+
172+
// wait for plugin to be scheduled
173+
time.Sleep(time.Second * 1)
174+
175+
// verify plugin is registered
176+
nodes, err := cluster[0].FetchPluginAvailableNodesByHashedId(hashedIdentity)
177+
if err != nil {
178+
t.Errorf("fetch plugin available nodes failed: %v", err)
179+
return
180+
}
181+
182+
if len(nodes) != 1 {
183+
t.Errorf("plugin not scheduled after first registration")
184+
return
185+
}
186+
187+
// second registration with same identity should be idempotent (no error)
188+
err = cluster[0].RegisterPlugin(&plugin)
189+
if err != nil {
190+
t.Errorf("second register plugin failed (should be idempotent): %v", err)
191+
return
192+
}
193+
194+
// verify plugin is still registered after second registration
195+
nodes, err = cluster[0].FetchPluginAvailableNodesByHashedId(hashedIdentity)
196+
if err != nil {
197+
t.Errorf("fetch plugin available nodes failed after second registration: %v", err)
198+
return
199+
}
200+
201+
if len(nodes) != 1 {
202+
t.Errorf("plugin not available after second registration")
203+
return
204+
}
205+
206+
// unregister the plugin
207+
if err := cluster[0].UnregisterPlugin(&plugin); err != nil {
208+
t.Errorf("unregister plugin failed: %v", err)
209+
return
210+
}
211+
212+
// verify plugin is unregistered
213+
nodes, err = cluster[0].FetchPluginAvailableNodesByHashedId(hashedIdentity)
214+
if err != nil {
215+
t.Errorf("fetch plugin available nodes failed after unregister: %v", err)
216+
return
217+
}
218+
219+
if len(nodes) != 0 {
220+
t.Errorf("plugin still available after unregister")
221+
return
222+
}
223+
224+
// registration after unregister should succeed again
225+
err = cluster[0].RegisterPlugin(&plugin)
226+
if err != nil {
227+
t.Errorf("register after unregister failed: %v", err)
228+
return
229+
}
230+
231+
// verify plugin is registered again
232+
nodes, err = cluster[0].FetchPluginAvailableNodesByHashedId(hashedIdentity)
233+
if err != nil {
234+
t.Errorf("fetch plugin available nodes failed after re-registration: %v", err)
235+
return
236+
}
237+
238+
if len(nodes) != 1 {
239+
t.Errorf("plugin not available after re-registration")
240+
return
241+
}
242+
}
243+
143244
// TODO: I need to implement this test, now it's randomly working
144245
// func TestPluginScheduleWhenMasterClusterShutdown(t *testing.T) {
145246
// plugins := []fakePlugin{

internal/core/control_panel/daemon.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"sync"
55
"time"
66

7+
"github.com/langgenius/dify-plugin-daemon/internal/cluster"
78
"github.com/langgenius/dify-plugin-daemon/internal/core/debugging_runtime"
89
"github.com/langgenius/dify-plugin-daemon/internal/core/local_runtime"
910
"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/media_transport"
@@ -17,6 +18,9 @@ type ControlPanel struct {
1718
// app config
1819
config *app.Config
1920

21+
// cluster for remote debugging plugins
22+
cluster *cluster.Cluster
23+
2024
// debugging server
2125
debuggingServer *debugging_runtime.RemotePluginServer
2226

@@ -82,12 +86,14 @@ func NewControlPanel(
8286
mediaBucket *media_transport.MediaBucket,
8387
packageBucket *media_transport.PackageBucket,
8488
installedBucket *media_transport.InstalledBucket,
89+
cluster *cluster.Cluster,
8590
) *ControlPanel {
8691
return &ControlPanel{
8792
config: config,
8893
mediaBucket: mediaBucket,
8994
packageBucket: packageBucket,
9095
installedBucket: installedBucket,
96+
cluster: cluster,
9197

9298
localPluginLaunchingSemaphore: make(chan bool, config.PluginLocalLaunchingConcurrent),
9399

@@ -99,3 +105,7 @@ func NewControlPanel(
99105
localPluginInstallationLock: lock.NewGranularityLock(),
100106
}
101107
}
108+
109+
func (c *ControlPanel) SetCluster(cluster *cluster.Cluster) {
110+
c.cluster = cluster
111+
}

internal/core/control_panel/server_debugger.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,12 @@ func (c *ControlPanel) onDebuggingRuntimeConnected(
3333
// store plugin runtime
3434
c.debuggingPluginRuntime.Store(pluginIdentifier, rpr)
3535

36+
if c.cluster != nil {
37+
if err = c.cluster.RegisterPlugin(rpr); err != nil {
38+
log.Error("failed to register remote debugging plugin to cluster", "error", err)
39+
}
40+
}
41+
3642
// notify notifiers a new debugging runtime is connected
3743
c.WalkNotifiers(func(notifier ControlPanelNotifier) {
3844
notifier.OnDebuggingRuntimeConnected(rpr)
@@ -51,6 +57,12 @@ func (c *ControlPanel) onDebuggingRuntimeDisconnected(
5157
return
5258
}
5359

60+
if c.cluster != nil {
61+
if err = c.cluster.UnregisterPlugin(rpr); err != nil {
62+
log.Error("failed to unregister remote debugging plugin from cluster", "error", err)
63+
}
64+
}
65+
5466
// delete plugin runtime
5567
c.debuggingPluginRuntime.Delete(pluginIdentifier)
5668

internal/core/plugin_manager/cluster_tunnel.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,3 +67,25 @@ func (t *ClusterTunnel) OnLocalRuntimeStopped(
6767
) {
6868
// NOP
6969
}
70+
71+
func (t *ClusterTunnel) OnLocalRuntimeScaleUp(
72+
runtime *local_runtime.LocalPluginRuntime,
73+
instanceNums int32,
74+
) {
75+
// NOP
76+
}
77+
78+
func (t *ClusterTunnel) OnLocalRuntimeScaleDown(
79+
runtime *local_runtime.LocalPluginRuntime,
80+
instanceNums int32,
81+
) {
82+
// NOP
83+
}
84+
85+
func (t *ClusterTunnel) OnLocalRuntimeInstanceLog(
86+
runtime *local_runtime.LocalPluginRuntime,
87+
instance *local_runtime.PluginInstance,
88+
event plugin_entities.PluginLogEvent,
89+
) {
90+
// NOP
91+
}

internal/core/plugin_manager/manager.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66

77
lru "github.com/hashicorp/golang-lru/v2"
88
"github.com/langgenius/dify-cloud-kit/oss"
9+
"github.com/langgenius/dify-plugin-daemon/internal/cluster"
910
controlpanel "github.com/langgenius/dify-plugin-daemon/internal/core/control_panel"
1011
"github.com/langgenius/dify-plugin-daemon/internal/core/dify_invocation"
1112
"github.com/langgenius/dify-plugin-daemon/internal/core/dify_invocation/calldify"
@@ -80,6 +81,7 @@ func InitGlobalManager(oss oss.OSS, config *app.Config) *PluginManager {
8081
mediaBucket,
8182
packageBucket,
8283
installedBucket,
84+
nil, // cluster will be set later via SetCluster
8385
),
8486
config: config,
8587
}
@@ -91,6 +93,10 @@ func InitGlobalManager(oss oss.OSS, config *app.Config) *PluginManager {
9193
return manager
9294
}
9395

96+
func (p *PluginManager) SetCluster(cluster *cluster.Cluster) {
97+
p.controlPanel.SetCluster(cluster)
98+
}
99+
94100
func Manager() *PluginManager {
95101
return manager
96102
}

internal/server/controllers/plugins.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@ func GetAsset(c *gin.Context) {
1818
asset, err := pluginManager.GetAsset(c.Param("id"))
1919

2020
if err != nil {
21+
if strings.Contains(err.Error(), "no such file or directory") {
22+
return
23+
}
2124
c.JSON(http.StatusInternalServerError, exception.InternalServerError(err).ToResponse())
2225
return
2326
}

internal/server/middleware.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ func (app *App) FetchPluginInstallation() gin.HandlerFunc {
5757
},
5858
)
5959

60-
if err == db.ErrDatabaseNotFound {
60+
if errors.Is(err, db.ErrDatabaseNotFound) {
6161
ctx.AbortWithStatusJSON(404, exception.ErrPluginNotFound().ToResponse())
6262
return
6363
}
@@ -119,6 +119,7 @@ func (app *App) redirectPluginInvokeByPluginIdentifier(
119119
// try find the correct node
120120
nodes, err := app.cluster.FetchPluginAvailableNodesById(plugin_unique_identifier.String())
121121
if err != nil {
122+
log.Error("Failed to fetch plugin nodes by id", "error", err)
122123
ctx.AbortWithStatusJSON(
123124
500,
124125
exception.InternalServerError(
@@ -127,6 +128,7 @@ func (app *App) redirectPluginInvokeByPluginIdentifier(
127128
)
128129
return
129130
} else if len(nodes) == 0 {
131+
log.Error("no plugin available nodes found")
130132
ctx.AbortWithStatusJSON(
131133
404,
132134
exception.InternalServerError(

internal/server/server.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,9 @@ func (app *App) Run(config *app.Config) {
106106
// create cluster
107107
app.cluster = cluster.NewCluster(config)
108108

109+
// set cluster to control panel for remote debugging plugin synchronization
110+
app.pluginManager.SetCluster(app.cluster)
111+
109112
// init manager
110113
app.pluginManager.Launch(config)
111114

internal/service/install_plugin.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -316,10 +316,10 @@ func UninstallPlugin(
316316
db.Equal("tenant_id", tenant_id),
317317
db.Equal("id", plugin_installation_id),
318318
)
319-
if err == db.ErrDatabaseNotFound {
320-
return exception.ErrPluginNotFound().ToResponse()
321-
}
322319
if err != nil {
320+
if errors.Is(err, db.ErrDatabaseNotFound) {
321+
return entities.NewSuccessResponse(true)
322+
}
323323
return exception.InternalServerError(err).ToResponse()
324324
}
325325

@@ -349,7 +349,7 @@ func UninstallPlugin(
349349
pluginInstallationCacheKey := helper.PluginInstallationCacheKey(pluginUniqueIdentifier.PluginID(), tenant_id)
350350
_, _ = cache.AutoDelete[models.PluginInstallation](pluginInstallationCacheKey)
351351

352-
if deleteResponse.IsPluginDeleted && deleteResponse.Plugin != nil && deleteResponse.Plugin.InstallType == plugin_entities.PLUGIN_RUNTIME_TYPE_LOCAL {
352+
if deleteResponse != nil && deleteResponse.IsPluginDeleted && deleteResponse.Plugin != nil && deleteResponse.Plugin.InstallType == plugin_entities.PLUGIN_RUNTIME_TYPE_LOCAL {
353353
manager := plugin_manager.Manager()
354354
if manager == nil {
355355
return exception.InternalServerError(errors.New("plugin manager is not initialized")).ToResponse()
@@ -360,7 +360,7 @@ func UninstallPlugin(
360360
}
361361

362362
shutdownCh, err := manager.ShutdownLocalPluginGracefully(pluginUniqueIdentifier)
363-
if err == controlpanel.ErrLocalPluginRuntimeNotFound {
363+
if errors.Is(err, controlpanel.ErrLocalPluginRuntimeNotFound) {
364364
return entities.NewSuccessResponse(true)
365365
} else if err != nil {
366366
return exception.InternalServerError(err).ToResponse()

0 commit comments

Comments
 (0)