diff --git a/cmd/common.go b/cmd/common.go index 6835a0f5c..3c47cbb58 100644 --- a/cmd/common.go +++ b/cmd/common.go @@ -17,6 +17,7 @@ func Init() { bootstrap.Log() bootstrap.InitDB() data.InitData() + bootstrap.InitPlugins() bootstrap.InitStreamLimit() bootstrap.InitIndex() bootstrap.InitUpgradePatch() diff --git a/go.mod b/go.mod index a2489c3c3..86fbf8f2e 100644 --- a/go.mod +++ b/go.mod @@ -9,6 +9,7 @@ require ( github.com/OpenListTeam/sftpd-openlist v1.0.1 github.com/OpenListTeam/tache v0.2.0 github.com/OpenListTeam/times v0.1.0 + github.com/OpenListTeam/wazero-wasip2 v0.0.0-20251015145605-cd3a2c9131d9 github.com/OpenListTeam/wopan-sdk-go v0.1.5 github.com/ProtonMail/go-crypto v1.3.0 github.com/SheltonZhu/115driver v1.1.1 @@ -63,6 +64,7 @@ require ( github.com/stretchr/testify v1.10.0 github.com/t3rm1n4l/go-mega v0.0.0-20241213151442-a19cff0ec7b5 github.com/tchap/go-patricia/v2 v2.3.3 + github.com/tetratelabs/wazero v1.9.0 github.com/u2takey/ffmpeg-go v0.5.0 github.com/upyun/go-sdk/v3 v3.0.4 github.com/winfsp/cgofuse v1.6.0 @@ -192,7 +194,7 @@ require ( github.com/boombuler/barcode v1.0.1-0.20190219062509-6c824513bacc // indirect github.com/bytedance/sonic v1.13.3 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect - github.com/coreos/go-semver v0.3.1 // indirect + github.com/coreos/go-semver v0.3.1 github.com/crackcomm/go-gitignore v0.0.0-20170627025303-887ab5e44cc3 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/decred/dcrd/dcrec/secp256k1/v4 v4.1.0 // indirect @@ -303,3 +305,5 @@ replace github.com/ProtonMail/go-proton-api => github.com/henrybear327/go-proton replace github.com/cronokirby/saferith => github.com/Da3zKi7/saferith v0.33.0-fixed // replace github.com/OpenListTeam/115-sdk-go => ../../OpenListTeam/115-sdk-go + +replace google.golang.org/genproto => google.golang.org/genproto/googleapis/rpc v0.0.0-20250603155806-513f23925822 diff --git a/go.sum b/go.sum index f8d6c74d5..6fbbe88a4 100644 --- a/go.sum +++ b/go.sum @@ -53,6 +53,8 @@ github.com/OpenListTeam/tache v0.2.0 h1:Q4MjuyECn0CZCf1ZF91JaVaZTaps1mOTAm8bFj8s github.com/OpenListTeam/tache v0.2.0/go.mod h1:qmnZ/VpY2DUlmjg3UoDeNFy/LRqrw0biN3hYEEGc/+A= github.com/OpenListTeam/times v0.1.0 h1:qknxw+qj5CYKgXAwydA102UEpPcpU8TYNGRmwRyPYpg= github.com/OpenListTeam/times v0.1.0/go.mod h1:Jx7qen5NCYzKk2w14YuvU48YYMcPa1P9a+EJePC15Pc= +github.com/OpenListTeam/wazero-wasip2 v0.0.0-20251015145605-cd3a2c9131d9 h1:yddTD9Fxh6bLMLmG0hSR7Eh6XkoK0RMlE4N1e6/+Iy8= +github.com/OpenListTeam/wazero-wasip2 v0.0.0-20251015145605-cd3a2c9131d9/go.mod h1:+BpydPG2cUQHYFwH3/lVmvXyMl/zxHW+XM+XTSzqu2Q= github.com/OpenListTeam/wopan-sdk-go v0.1.5 h1:iKKcVzIqBgtGDbn0QbdWrCazSGxXFmYFyrnFBG+U8dI= github.com/OpenListTeam/wopan-sdk-go v0.1.5/go.mod h1:otynv0CgSNUClPpUgZ44qCZGcMRe0dc83Pkk65xAunI= github.com/ProtonMail/bcrypt v0.0.0-20210511135022-227b4adcab57/go.mod h1:HecWFHognK8GfRDGnFQbW/LiV7A3MX3gZVs45vk5h8I= @@ -688,6 +690,8 @@ github.com/taruti/bytepool v0.0.0-20160310082835-5e3a9ea56543 h1:6Y51mutOvRGRx6K github.com/taruti/bytepool v0.0.0-20160310082835-5e3a9ea56543/go.mod h1:jpwqYA8KUVEvSUJHkCXsnBRJCSKP1BMa81QZ6kvRpow= github.com/tchap/go-patricia/v2 v2.3.3 h1:xfNEsODumaEcCcY3gI0hYPZ/PcpVv5ju6RMAhgwZDDc= github.com/tchap/go-patricia/v2 v2.3.3/go.mod h1:VZRHKAb53DLaG+nA9EaYYiaEx6YztwDlLElMsnSHD4k= +github.com/tetratelabs/wazero v1.9.0 h1:IcZ56OuxrtaEz8UYNRHBrUa9bYeX9oVY93KspZZBf/I= +github.com/tetratelabs/wazero v1.9.0/go.mod h1:TSbcXCfFP0L2FGkRPxHphadXPjo1T6W+CseNNY7EkjM= github.com/tklauser/go-sysconf v0.3.15 h1:VE89k0criAymJ/Os65CSn1IXaol+1wrsFHEB8Ol49K4= github.com/tklauser/go-sysconf v0.3.15/go.mod h1:Dmjwr6tYFIseJw7a3dRLJfsHAMXZ3nEnL/aZY+0IuI4= github.com/tklauser/numcpus v0.10.0 h1:18njr6LDBk1zuna922MgdjQuJFjrdppsZG60sHGfjso= diff --git a/internal/alloc/alloc_other.go b/internal/alloc/alloc_other.go new file mode 100644 index 000000000..f8f5b3dfb --- /dev/null +++ b/internal/alloc/alloc_other.go @@ -0,0 +1,24 @@ +//go:build !unix && !windows + +package alloc // import "github.com/ncruces/go-sqlite3/internal/alloc" + +import "github.com/tetratelabs/wazero/experimental" + +func NewMemory(cap, max uint64) experimental.LinearMemory { + return &sliceMemory{make([]byte, 0, cap)} +} + +type sliceMemory struct { + buf []byte +} + +func (b *sliceMemory) Free() {} + +func (b *sliceMemory) Reallocate(size uint64) []byte { + if cap := uint64(cap(b.buf)); size > cap { + b.buf = append(b.buf[:cap], make([]byte, size-cap)...) + } else { + b.buf = b.buf[:size] + } + return b.buf +} diff --git a/internal/alloc/alloc_test.go b/internal/alloc/alloc_test.go new file mode 100644 index 000000000..62c440521 --- /dev/null +++ b/internal/alloc/alloc_test.go @@ -0,0 +1,14 @@ +package alloc_test // import "github.com/ncruces/go-sqlite3/internal/alloc" + +import ( + "math" + "testing" + + "github.com/OpenListTeam/OpenList/v4/internal/alloc" +) + +func TestVirtual(t *testing.T) { + defer func() { _ = recover() }() + alloc.NewMemory(math.MaxInt+2, math.MaxInt+2) + t.Error("want panic") +} diff --git a/internal/alloc/alloc_unix.go b/internal/alloc/alloc_unix.go new file mode 100644 index 000000000..ad79ef340 --- /dev/null +++ b/internal/alloc/alloc_unix.go @@ -0,0 +1,75 @@ +//go:build unix + +package alloc // import "github.com/ncruces/go-sqlite3/internal/alloc" + +import ( + "math" + + "github.com/tetratelabs/wazero/experimental" + "golang.org/x/sys/unix" +) + +func NewMemory(cap, max uint64) experimental.LinearMemory { + // Round up to the page size. + rnd := uint64(unix.Getpagesize() - 1) + res := (max + rnd) &^ rnd + + if res > math.MaxInt { + // This ensures int(res) overflows to a negative value, + // and unix.Mmap returns EINVAL. + res = math.MaxUint64 + } + + com := res + prot := unix.PROT_READ | unix.PROT_WRITE + if cap < max { // Commit memory only if cap=max. + com = 0 + prot = unix.PROT_NONE + } + + // Reserve res bytes of address space, to ensure we won't need to move it. + // A protected, private, anonymous mapping should not commit memory. + b, err := unix.Mmap(-1, 0, int(res), prot, unix.MAP_PRIVATE|unix.MAP_ANON) + if err != nil { + panic(err) + } + return &mmappedMemory{buf: b[:com]} +} + +// The slice covers the entire mmapped memory: +// - len(buf) is the already committed memory, +// - cap(buf) is the reserved address space. +type mmappedMemory struct { + buf []byte +} + +func (m *mmappedMemory) Reallocate(size uint64) []byte { + com := uint64(len(m.buf)) + res := uint64(cap(m.buf)) + if com < size && size <= res { + // Grow geometrically, round up to the page size. + rnd := uint64(unix.Getpagesize() - 1) + new := com + com>>3 + new = min(max(size, new), res) + new = (new + rnd) &^ rnd + + // Commit additional memory up to new bytes. + err := unix.Mprotect(m.buf[com:new], unix.PROT_READ|unix.PROT_WRITE) + if err != nil { + return nil + } + + m.buf = m.buf[:new] // Update committed memory. + } + // Limit returned capacity because bytes beyond + // len(m.buf) have not yet been committed. + return m.buf[:size:len(m.buf)] +} + +func (m *mmappedMemory) Free() { + err := unix.Munmap(m.buf[:cap(m.buf)]) + if err != nil { + panic(err) + } + m.buf = nil +} diff --git a/internal/alloc/alloc_windows.go b/internal/alloc/alloc_windows.go new file mode 100644 index 000000000..1a17b402a --- /dev/null +++ b/internal/alloc/alloc_windows.go @@ -0,0 +1,76 @@ +package alloc // import "github.com/ncruces/go-sqlite3/internal/alloc" + +import ( + "math" + "unsafe" + + "github.com/tetratelabs/wazero/experimental" + "golang.org/x/sys/windows" +) + +func NewMemory(cap, max uint64) experimental.LinearMemory { + // Round up to the page size. + rnd := uint64(windows.Getpagesize() - 1) + res := (max + rnd) &^ rnd + + if res > math.MaxInt { + // This ensures uintptr(res) overflows to a large value, + // and windows.VirtualAlloc returns an error. + res = math.MaxUint64 + } + + com := res + kind := windows.MEM_COMMIT + if cap < max { // Commit memory only if cap=max. + com = 0 + kind = windows.MEM_RESERVE + } + + // Reserve res bytes of address space, to ensure we won't need to move it. + r, err := windows.VirtualAlloc(0, uintptr(res), uint32(kind), windows.PAGE_READWRITE) + if err != nil { + panic(err) + } + buf := unsafe.Slice((*byte)(unsafe.Pointer(r)), int(max)) + mem := virtualMemory{addr: r, buf: buf[:com:res]} + return &mem +} + +// The slice covers the entire mmapped memory: +// - len(buf) is the already committed memory, +// - cap(buf) is the reserved address space. +type virtualMemory struct { + buf []byte + addr uintptr +} + +func (m *virtualMemory) Reallocate(size uint64) []byte { + com := uint64(len(m.buf)) + res := uint64(cap(m.buf)) + if com < size && size <= res { + // Grow geometrically, round up to the page size. + rnd := uint64(windows.Getpagesize() - 1) + new := com + com>>3 + new = min(max(size, new), res) + new = (new + rnd) &^ rnd + + // Commit additional memory up to new bytes. + _, err := windows.VirtualAlloc(m.addr, uintptr(new), windows.MEM_COMMIT, windows.PAGE_READWRITE) + if err != nil { + return nil + } + + m.buf = m.buf[:new] // Update committed memory. + } + // Limit returned capacity because bytes beyond + // len(m.buf) have not yet been committed. + return m.buf[:size:len(m.buf)] +} + +func (m *virtualMemory) Free() { + err := windows.VirtualFree(m.addr, 0, windows.MEM_RELEASE) + if err != nil { + panic(err) + } + m.addr = 0 +} diff --git a/internal/bootstrap/plugin.go b/internal/bootstrap/plugin.go new file mode 100644 index 000000000..5c2d92046 --- /dev/null +++ b/internal/bootstrap/plugin.go @@ -0,0 +1,23 @@ +// internal/bootstrap/plugin.go +package bootstrap + +import ( + "context" + "fmt" + + "github.com/OpenListTeam/OpenList/v4/cmd/flags" + "github.com/OpenListTeam/OpenList/v4/internal/plugin" +) + +// InitPlugins 初始化插件管理器 +func InitPlugins() { + // 2. 创建并初始化 Manager + // "data" 目录应从配置中获取 + manager, err := plugin.NewManager(context.Background(), flags.DataDir) + if err != nil { + // 在启动时,如果插件系统失败,应该 panic + panic(fmt.Sprintf("Failed to initialize plugin manager: %v", err)) + } + + plugin.PluginManager = manager +} diff --git a/internal/db/db.go b/internal/db/db.go index 96529c15d..c886dc58c 100644 --- a/internal/db/db.go +++ b/internal/db/db.go @@ -12,7 +12,7 @@ var db *gorm.DB func Init(d *gorm.DB) { db = d - err := AutoMigrate(new(model.Storage), new(model.User), new(model.Meta), new(model.SettingItem), new(model.SearchNode), new(model.TaskItem), new(model.SSHPublicKey), new(model.SharingDB)) + err := AutoMigrate(new(model.Storage), new(model.User), new(model.Meta), new(model.SettingItem), new(model.SearchNode), new(model.TaskItem), new(model.SSHPublicKey), new(model.SharingDB), new(model.Plugin)) if err != nil { log.Fatalf("failed migrate database: %s", err.Error()) } diff --git a/internal/db/plugin.go b/internal/db/plugin.go new file mode 100644 index 000000000..66c446880 --- /dev/null +++ b/internal/db/plugin.go @@ -0,0 +1,47 @@ +package db + +import ( + "context" + + "github.com/OpenListTeam/OpenList/v4/internal/model" + "gorm.io/gorm" +) + +// CreatePlugin 在数据库中插入一条新的插件记录 +// 如果记录已存在,则会更新它 (Upsert) +func CreatePlugin(ctx context.Context, plugin *model.Plugin) error { + return db.WithContext(ctx).Save(plugin).Error +} + +// GetPluginByID 从数据库中根据 ID 查询单个插件 +func GetPluginByID(ctx context.Context, id string) (*model.Plugin, error) { + var plugin model.Plugin + err := db.WithContext(ctx).First(&plugin, "id = ?", id).Error + if err != nil { + if err == gorm.ErrRecordNotFound { + return nil, nil // 返回 nil, nil 表示未找到 + } + return nil, err + } + return &plugin, nil +} + +// GetAllPlugins 从数据库中获取所有已安装的插件 +func GetAllPlugins(ctx context.Context) ([]*model.Plugin, error) { + var plugins []*model.Plugin + err := db.WithContext(ctx).Find(&plugins).Error + return plugins, err +} + +// DeletePluginByID 从数据库中根据 ID 删除一个插件 +func DeletePluginByID(ctx context.Context, id string) error { + return db.WithContext(ctx).Delete(&model.Plugin{}, "id = ?", id).Error +} + +// UpdatePluginStatus 更新指定插件的状态和消息 +func UpdatePluginStatus(ctx context.Context, pluginID string, status model.PluginStatus, message string) error { + return db.WithContext(ctx).Model(&model.Plugin{}).Where("id = ?", pluginID).Updates(map[string]interface{}{ + "status": status, + "message": message, + }).Error +} diff --git a/internal/driver/item.go b/internal/driver/item.go index e8b0c8bf4..b20908a3f 100644 --- a/internal/driver/item.go +++ b/internal/driver/item.go @@ -19,6 +19,10 @@ type Info struct { Config Config `json:"config"` } +type IGetItem interface { + GetItems() []Item +} + type IRootPath interface { GetRootPath() string } diff --git a/internal/model/plugin.go b/internal/model/plugin.go new file mode 100644 index 000000000..a2e1445c9 --- /dev/null +++ b/internal/model/plugin.go @@ -0,0 +1,42 @@ +package model + +import "time" + +// PluginStatus 定义了插件的几种可能状态 +type PluginStatus string + +const ( + // StatusActive 表示插件已成功加载并正在运行 + StatusActive PluginStatus = "active" + // StatusInactive 表示插件已安装但未加载(例如,等待重启) + StatusInactive PluginStatus = "inactive" + // StatusError 表示插件在加载或运行时遇到错误 + StatusError PluginStatus = "error" +) + +type Plugin struct { + // 插件的唯一标识符,例如 "com.openlist.driver.s3" + // 这是主键 + ID string `gorm:"primaryKey" json:"id"` + + // --- 来自插件元数据 --- + Name string `json:"name"` + Version string `json:"version"` + Author string `json:"author"` + Description string `gorm:"type:text" json:"description"` + IconURL string `json:"icon_url"` + + // --- 管理器需要的信息 --- + // 插件的下载源地址 + SourceURL string `json:"source_url"` + // Wasm 文件在本地的存储路径 + WasmPath string `json:"wasm_path"` + + // 新增状态字段 + Status PluginStatus `gorm:"default:'inactive'" json:"status"` + Message string `gorm:"type:text" json:"message"` // 用于存储错误信息 + + // --- GORM 自动管理字段 --- + CreatedAt time.Time `json:"-"` + UpdatedAt time.Time `json:"-"` +} diff --git a/internal/op/driver.go b/internal/op/driver.go index f25b3a6b1..fcf122e59 100644 --- a/internal/op/driver.go +++ b/internal/op/driver.go @@ -15,12 +15,27 @@ type DriverConstructor func() driver.Driver var driverMap = map[string]DriverConstructor{} var driverInfoMap = map[string]driver.Info{} -func RegisterDriver(driver DriverConstructor) { +func RegisterDriver(driver DriverConstructor) error { // log.Infof("register driver: [%s]", config.Name) tempDriver := driver() + if tempDriver == nil { + return errors.New("register driver is null") + } tempConfig := tempDriver.Config() + + if driverMap[tempConfig.Name] != nil { + return errors.New("driver is registered") + } registerDriverItems(tempConfig, tempDriver.GetAddition()) driverMap[tempConfig.Name] = driver + return nil +} + +func UnRegisterDriver(driver DriverConstructor) { + if tempDriver := driver(); tempDriver != nil { + tempConfig := tempDriver.Config() + delete(driverMap, tempConfig.Name) + } } func GetDriver(name string) (DriverConstructor, error) { @@ -45,12 +60,18 @@ func GetDriverInfoMap() map[string]driver.Info { func registerDriverItems(config driver.Config, addition driver.Additional) { // log.Debugf("addition of %s: %+v", config.Name, addition) - tAddition := reflect.TypeOf(addition) - for tAddition.Kind() == reflect.Pointer { - tAddition = tAddition.Elem() + var additionalItems []driver.Item + if v, ok := addition.(driver.IGetItem); ok { + additionalItems = v.GetItems() + } else { + tAddition := reflect.TypeOf(addition) + for tAddition.Kind() == reflect.Pointer { + tAddition = tAddition.Elem() + } + additionalItems = getAdditionalItems(tAddition, config.DefaultRoot) } + mainItems := getMainItems(config) - additionalItems := getAdditionalItems(tAddition, config.DefaultRoot) driverInfoMap[config.Name] = driver.Info{ Common: mainItems, Additional: additionalItems, diff --git a/internal/plugin/driver.go b/internal/plugin/driver.go new file mode 100644 index 000000000..769428f10 --- /dev/null +++ b/internal/plugin/driver.go @@ -0,0 +1,909 @@ +package plugin + +import ( + "context" + stderrors "errors" + "fmt" + "io" + "net/http" + "os" + "runtime" + "sync/atomic" + + "github.com/OpenListTeam/OpenList/v4/internal/alloc" + "github.com/OpenListTeam/OpenList/v4/internal/driver" + "github.com/OpenListTeam/OpenList/v4/internal/errs" + "github.com/OpenListTeam/OpenList/v4/internal/model" + "github.com/OpenListTeam/OpenList/v4/internal/op" + plugin_warp "github.com/OpenListTeam/OpenList/v4/internal/plugin/warp" + "github.com/OpenListTeam/OpenList/v4/internal/stream" + "github.com/OpenListTeam/OpenList/v4/pkg/http_range" + "github.com/OpenListTeam/OpenList/v4/pkg/utils" + log "github.com/sirupsen/logrus" + + pool "github.com/jolestar/go-commons-pool/v2" + + manager_io "github.com/OpenListTeam/wazero-wasip2/manager/io" + io_v_0_2 "github.com/OpenListTeam/wazero-wasip2/wasip2/io/v0_2" + witgo "github.com/OpenListTeam/wazero-wasip2/wit-go" + + "github.com/pkg/errors" + + "github.com/tetratelabs/wazero" + "github.com/tetratelabs/wazero/api" + "github.com/tetratelabs/wazero/experimental" + "github.com/tetratelabs/wazero/imports/wasi_snapshot_preview1" +) + +var PluginPrefix = "openlist:plugin-driver/exports@0.1.0#" + +// DriverPlugin 是*插件*管理器 (每个 .wasm 文件一个) +// 它管理共享的 wazero 资源 +type DriverPlugin struct { + plugin *PluginInfo + runtime wazero.Runtime // 共享的 wazero 运行时 + compiledModule wazero.CompiledModule // 共享的已编译模块 + host *DriverHost // 注册的 wasi host 资源, 这里的self.driver始终为nil +} + +// WasmInstance 代表池中的一个可重用对象 +// 它包含一个活动的 WASM 实例及其宿主/Guest API +type WasmInstance struct { + instance api.Module + exports *DriverHost + guest *witgo.Host +} + +// 内部函数,用于动态调用 Guest 以获取属性 +func (d *WasmInstance) GetProperties(ctx context.Context) (plugin_warp.DriverProps, error) { + var propertiesResult plugin_warp.DriverProps + err := d.guest.Call(ctx, PluginPrefix+"get-properties", &propertiesResult) + if err != nil { + return plugin_warp.DriverProps{}, err + } + return propertiesResult, nil +} + +// 内部函数,用于动态调用 Guest 以获取表单 +func (d *WasmInstance) GetFormMeta(ctx context.Context) ([]plugin_warp.FormField, error) { + var formMeta []plugin_warp.FormField + err := d.guest.Call(ctx, PluginPrefix+"get-form-meta", &formMeta) + if err != nil { + return nil, err + } + return formMeta, nil +} + +func (i *WasmInstance) Close() error { + return i.instance.Close(context.Background()) + // exports 借用WasmDriver的资源这里不销毁 +} + +// 用于创建和管理 WasmInstance +type driverPoolFactory struct { + ctx context.Context + driver *WasmDriver // 指向 WasmDriver (状态持有者) + compiledModule wazero.CompiledModule // 共享的模块 + runtime wazero.Runtime // 共享的运行时 + host *DriverHost +} + +func (f *driverPoolFactory) makeObject(ctx context.Context) (*WasmInstance, error) { + // 1. 配置模块 + moduleConfig := wazero.NewModuleConfig(). + WithFS(os.DirFS("/")). + WithStartFunctions("_initialize"). + WithStdout(os.Stdout). + WithStderr(os.Stderr). + WithStdin(os.Stdin). + // WithSysNanosleep(). + // WithSysNanotime(). + // WithSysWalltime(). + WithOsyield(func() { + runtime.Gosched() + }). + WithName(f.driver.plugin.plugin.ID) + + instanceCtx := experimental.WithMemoryAllocator(f.ctx, experimental.MemoryAllocatorFunc(alloc.NewMemory)) + + // 2. 实例化共享的已编译模块 + instance, err := f.runtime.InstantiateModule(instanceCtx, f.compiledModule, moduleConfig) + if err != nil { + return nil, fmt.Errorf("failed to instantiate module: %w", err) + } + + // 3. 创建 Guest API + guest, err := witgo.NewHost(instance) + if err != nil { + instance.Close(ctx) + return nil, err + } + + // 5. 组装 WasmInstance + wasmInstance := &WasmInstance{ + instance: instance, + exports: f.host, + guest: guest, + } + return wasmInstance, nil +} + +// MakeObject 创建一个新的 WasmInstance 并将其放入池中 +func (f *driverPoolFactory) MakeObject(ctx context.Context) (*pool.PooledObject, error) { + wasmInstance, err := f.makeObject(ctx) + if err != nil { + return nil, err + } + + // 设置Host端句柄,用于配置获取等host端方法 + if err := wasmInstance.guest.Call(ctx, PluginPrefix+"set-handle", nil, uint32(f.driver.ID)); err != nil { + // 这里就不返回错误了,避免大量栈数据 + log.Errorln(err) + wasmInstance.Close() + return nil, errors.New("Internal error in plugin") + } + + // 调用实例的初始化方法 + ctxHandle := f.host.ContextManager().Add(ctx) + defer f.host.ContextManager().Remove(ctxHandle) + + var result witgo.Result[witgo.Unit, plugin_warp.ErrCode] + if err := wasmInstance.guest.Call(ctx, PluginPrefix+"init", &result, ctxHandle); err != nil { + // 这里就不返回错误了,避免大量栈数据 + log.Errorln(err) + wasmInstance.Close() + return nil, errors.New("Internal error in plugin") + } + if result.Err != nil { + wasmInstance.Close() + return nil, result.Err.ToError() + } + + return pool.NewPooledObject(wasmInstance), nil +} + +// DestroyObject 销毁池中的 WasmInstance +func (f *driverPoolFactory) DestroyObject(ctx context.Context, object *pool.PooledObject) error { + instance := object.Object.(*WasmInstance) + log.Debugf("Destroying pooled WASM instance for plugin: %s", f.driver.Storage.MountPath) + + var err error + // 4. 调用实例的销毁化方法 + ctxHandle := instance.exports.ContextManager().Add(ctx) + defer instance.exports.ContextManager().Remove(ctxHandle) + var result witgo.Result[witgo.Unit, plugin_warp.ErrCode] + if err = instance.guest.Call(ctx, PluginPrefix+"drop", &result, ctxHandle); err != nil { + // 这里就不返回错误了,避免大量栈数据 + log.Errorln(err) + err = errors.New("Internal error in plugin") + } else if result.Err != nil { + err = result.Err.ToError() + } + + return stderrors.Join(err, instance.Close()) +} + +// ValidateObject 验证实例是否仍然有效 +func (f *driverPoolFactory) ValidateObject(ctx context.Context, object *pool.PooledObject) bool { + instance := object.Object.(*WasmInstance) + return instance.instance != nil && !instance.instance.IsClosed() +} + +// ActivateObject 在借用时调用 +func (f *driverPoolFactory) ActivateObject(ctx context.Context, object *pool.PooledObject) error { + return nil +} + +// PassivateObject 在归还时调用 +func (f *driverPoolFactory) PassivateObject(ctx context.Context, object *pool.PooledObject) error { + return nil +} + +// WasmDriver 是*驱动*实例 (每个挂载点一个) +// 它管理池和*状态* +type WasmDriver struct { + model.Storage + flag uint32 + + plugin *DriverPlugin + + host *DriverHost + pool *pool.ObjectPool + + config plugin_warp.DriverProps + additional plugin_warp.Additional +} + +// NewDriverPlugin +// 创建插件管理器 +func NewDriverPlugin(ctx context.Context, plugin *PluginInfo) (*DriverPlugin, error) { + wasmBytes, err := os.ReadFile(plugin.WasmPath) + if err != nil { + return nil, fmt.Errorf("failed to read wasm file '%s': %w", plugin.WasmPath, err) + } + + // 1. 创建共享的 wazero 运行时 + rt := wazero.NewRuntime(ctx) + + // 2. 注册 wasip1/wasip2 资源 + wasi_snapshot_preview1.MustInstantiate(ctx, rt) + host := NewDriverHost() + if err := host.Instantiate(ctx, rt); err != nil { + return nil, err + } + + // 3. 编译共享的模块 + compiledModule, err := rt.CompileModule(ctx, wasmBytes) + if err != nil { + rt.Close(ctx) + return nil, fmt.Errorf("failed to compile wasm module for plugin '%s': %w", plugin.ID, err) + } + + // 4. 创建 DriverPlugin 实例(管理器) + driverPlugin := &DriverPlugin{ + plugin: plugin, + runtime: rt, + compiledModule: compiledModule, + host: host, + } + return driverPlugin, nil +} + +// Close 关闭共享的 wazero 运行时 +func (dp *DriverPlugin) Close(ctx context.Context) error { + log.Infof("Closing plugin runtime for: %s", dp.plugin.ID) + if dp.runtime != nil { + return dp.runtime.Close(ctx) + } + return nil +} + +// NewWasmDriver +// 创建*驱动实例* (每个挂载一个) +func (dp *DriverPlugin) NewWasmDriver() (driver.Driver, error) { + ctx := context.Background() // Factory/Pool context + + // 1. 创建 WasmDriver 实例 (状态持有者) + driver := &WasmDriver{ + plugin: dp, // 指向共享资源的管理器 + host: dp.host, + } + + type WasmDirverWarp struct { + *WasmDriver + } + driverWarp := &WasmDirverWarp{driver} + runtime.SetFinalizer(driverWarp, func(driver *WasmDirverWarp) { + dp.host.driver.Remove(uint32(driver.ID)) + }) + + // 3. 创建池工厂 + factory := &driverPoolFactory{ + ctx: ctx, + driver: driver, + compiledModule: dp.compiledModule, + runtime: dp.runtime, + host: dp.host, + } + + // 4. 配置并创建池 + poolConfig := pool.NewDefaultPoolConfig() + poolConfig.MaxIdle = 2 + poolConfig.MaxTotal = 8 + poolConfig.TestOnBorrow = true + poolConfig.BlockWhenExhausted = true + driver.pool = pool.NewObjectPool(ctx, factory, poolConfig) + + // 5. 首次获取插件信息 + initConfig := func() error { + instance, err := factory.makeObject(ctx) + if err != nil { + return err + } + defer instance.Close() + + props, err := instance.GetProperties(ctx) + if err != nil { + return fmt.Errorf("failed to refresh properties: %w", err) + } + driver.config = props + + forms, err := instance.GetFormMeta(ctx) + if err != nil { + return fmt.Errorf("failed to refresh forms: %w", err) + } + driver.additional.Forms = forms + return nil + } + if err := initConfig(); err != nil { + driver.Close(ctx) // 构造失败,关闭池 + return nil, err + } + return driverWarp, nil +} + +// Close (在 WasmDriver 上) 关闭此*实例*的池 +func (d *WasmDriver) Close(ctx context.Context) error { + log.Infof("Closing pool for driver: %s", d.MountPath) + if d.pool != nil { + d.pool.Close(ctx) + } + return nil +} + +// handleError 处理 wasm 驱动返回的错误 +func (d *WasmDriver) handleError(errcode *plugin_warp.ErrCode) error { + if errcode != nil { + err := errcode.ToError() + if errcode.Unauthorized != nil && d.Status == op.WORK { + if atomic.CompareAndSwapUint32(&d.flag, 0, 1) { + d.Status = err.Error() + op.MustSaveDriverStorage(d) + atomic.StoreUint32(&d.flag, 0) + } + return err + } + return err + } + return nil +} + +// // 内部函数,用于动态调用 Guest 以获取属性 +// func (d *WasmDriver) getProperties(ctx context.Context) (plugin_warp.DriverProps, error) { +// obj, err := d.pool.BorrowObject(ctx) +// if err != nil { +// return plugin_warp.DriverProps{}, fmt.Errorf("failed to borrow wasm instance: %w", err) +// } +// instance := obj.(*WasmInstance) +// defer d.pool.ReturnObject(ctx, obj) + +// return instance.GetProperties(ctx) +// } + +// // 内部函数,用于动态调用 Guest 以获取表单 +// func (d *WasmDriver) getFormMeta(ctx context.Context) ([]plugin_warp.FormField, error) { +// obj, err := d.pool.BorrowObject(ctx) +// if err != nil { +// return nil, fmt.Errorf("failed to borrow wasm instance: %w", err) +// } +// instance := obj.(*WasmInstance) +// defer d.pool.ReturnObject(ctx, obj) + +// return instance.GetFormMeta(ctx) +// } + +// Config 返回缓存的配置 +func (d *WasmDriver) Config() driver.Config { + // props, err := d.getProperties(context.Background()) + // if err != nil { + // log.Errorf("failed to get properties: %s", err) + // return d.config.ToConfig() + // } + + // d.config = props + return d.config.ToConfig() +} + +func (d *WasmDriver) GetAddition() driver.Additional { + // newFormMeta, err := d.getFormMeta(context.Background()) + // if err != nil { + // log.Errorf("failed to get form meta: %s", err) + // return &d.additional + // } + // d.additional.Forms = newFormMeta + return &d.additional +} + +// Init 初始化驱动 +func (d *WasmDriver) Init(ctx context.Context) error { + log.Debugf("Re-initializing pool for plugin %s by clearing idle.", d.MountPath) + d.pool.Clear(ctx) + + // 注册 + d.host.driver.Set(uint32(d.ID), d) + + obj, err := d.pool.BorrowObject(ctx) + if err != nil { + return fmt.Errorf("failed to pre-warm pool after re-init: %w", err) + } + d.pool.ReturnObject(ctx, obj) + return nil +} + +// Drop 销毁驱动 (由 Guest 调用) +func (d *WasmDriver) Drop(ctx context.Context) error { + log.Infof("Guest triggered Drop, closing pool for driver: %s", d.MountPath) + return d.Close(ctx) +} + +func (d *WasmDriver) GetRoot(ctx context.Context) (model.Obj, error) { + if !d.config.Capabilitys.ListFile { + return nil, errs.NotImplement + } + + obj, err := d.pool.BorrowObject(ctx) + if err != nil { + return nil, fmt.Errorf("failed to borrow wasm instance: %w", err) + } + instance := obj.(*WasmInstance) + defer d.pool.ReturnObject(ctx, obj) + + ctxHandle := instance.exports.ContextManager().Add(ctx) + defer instance.exports.ContextManager().Remove(ctxHandle) + + var result witgo.Result[plugin_warp.Object, plugin_warp.ErrCode] + err = instance.guest.Call(ctx, PluginPrefix+"get-root", &result, ctxHandle) + if err != nil { + if errors.Is(err, witgo.ErrNotExportFunc) { + return nil, errs.NotImplement + } + // 这里就不返回错误了,避免大量栈数据 + log.Errorln(err) + return nil, errors.New("Internal error in plugin") + } + + if result.Err != nil { + return nil, d.handleError(result.Err) + } + + return result.Ok, nil +} + +// GetFile 获取文件信息 +func (d *WasmDriver) Get(ctx context.Context, path string) (model.Obj, error) { + if !d.config.Capabilitys.GetFile { + return nil, errs.NotImplement + } + + obj, err := d.pool.BorrowObject(ctx) + if err != nil { + return nil, fmt.Errorf("failed to borrow wasm instance: %w", err) + } + instance := obj.(*WasmInstance) + defer d.pool.ReturnObject(ctx, obj) + + ctxHandle := instance.exports.ContextManager().Add(ctx) + defer instance.exports.ContextManager().Remove(ctxHandle) + + var result witgo.Result[plugin_warp.Object, plugin_warp.ErrCode] + err = instance.guest.Call(ctx, PluginPrefix+"get-file", &result, ctxHandle, path) + if err != nil { + if errors.Is(err, witgo.ErrNotExportFunc) { + return nil, errs.NotImplement + } + // 这里就不返回错误了,避免大量栈数据 + log.Errorln(err) + return nil, errors.New("Internal error in plugin") + } + if result.Err != nil { + return nil, d.handleError(result.Err) + } + + return result.Ok, nil +} + +// List 列出文件 +func (d *WasmDriver) List(ctx context.Context, dir model.Obj, args model.ListArgs) ([]model.Obj, error) { + if !d.config.Capabilitys.ListFile { + return nil, errs.NotImplement + } + + obj, err := d.pool.BorrowObject(ctx) + if err != nil { + return nil, fmt.Errorf("failed to borrow wasm instance: %w", err) + } + instance := obj.(*WasmInstance) + defer d.pool.ReturnObject(ctx, obj) + + ctxHandle := instance.exports.ContextManager().Add(ctx) + defer instance.exports.ContextManager().Remove(ctxHandle) + + robj := dir.(*plugin_warp.Object) + var result witgo.Result[[]plugin_warp.Object, plugin_warp.ErrCode] + + param := struct { + Handle plugin_warp.Context + Obj *plugin_warp.Object + }{ctxHandle, robj} + err = instance.guest.Call(ctx, PluginPrefix+"list-files", &result, param) + if err != nil { + if errors.Is(err, witgo.ErrNotExportFunc) { + return nil, errs.NotImplement + } + // 这里就不返回错误了,避免大量栈数据 + log.Errorln(err) + return nil, errors.New("Internal error in plugin") + } + + if result.Err != nil { + return nil, d.handleError(result.Err) + } + return utils.MustSliceConvert(*result.Ok, func(o plugin_warp.Object) model.Obj { return &o }), nil +} + +// Link 获取文件直链或读取流 +func (d *WasmDriver) Link(ctx context.Context, file model.Obj, args model.LinkArgs) (*model.Link, error) { + if !d.config.Capabilitys.LinkFile { + return nil, errs.NotImplement + } + + // 这部分资源全由Host端管理 + // TODO: 或许应该把创建的Stream生命周期一同绑定到此处结束,防止忘记关闭导致的资源泄漏 + + pobj, err := d.pool.BorrowObject(ctx) + if err != nil { + return nil, fmt.Errorf("failed to borrow wasm instance: %w", err) + } + instance := pobj.(*WasmInstance) + defer d.pool.ReturnObject(ctx, pobj) + + ctxHandle := instance.exports.ContextManager().Add(ctx) + defer instance.exports.ContextManager().Remove(ctxHandle) + headersHandle := instance.exports.HTTPManager().Fields.Add(args.Header) + defer instance.exports.HTTPManager().Fields.Remove(headersHandle) + + obj := file.(*plugin_warp.Object) + + var result witgo.Result[plugin_warp.LinkResult, plugin_warp.ErrCode] + + param := struct { + Handle plugin_warp.Context + Obj *plugin_warp.Object + LinkArgs plugin_warp.LinkArgs + }{ctxHandle, obj, plugin_warp.LinkArgs{IP: args.IP, Header: headersHandle}} + err = instance.guest.Call(ctx, PluginPrefix+"link-file", &result, param) + if err != nil { + if errors.Is(err, witgo.ErrNotExportFunc) { + return nil, errs.NotImplement + } + // 这里就不返回错误了,避免大量栈数据 + log.Errorln(err) + return nil, errors.New("Internal error in plugin") + } + if result.Err != nil { + return nil, d.handleError(result.Err) + } + + // 覆盖旧的Obj + if result.Ok.File.IsSome() { + *obj = *result.Ok.File.Some + } + + if result.Ok.Resource.Direct != nil { + direct := result.Ok.Resource.Direct + header, _ := instance.exports.HTTPManager().Fields.Pop(direct.Header) + link := &model.Link{URL: direct.Url, Header: http.Header(header)} + if direct.Expiratcion.IsSome() { + exp := direct.Expiratcion.Some.ToDuration() + link.Expiration = &exp + } + return link, nil + } + + if result.Ok.Resource.RangeStream != nil { + fileSize := obj.GetSize() + return &model.Link{ + RangeReader: stream.RateLimitRangeReaderFunc(func(ctx context.Context, httpRange http_range.Range) (io.ReadCloser, error) { + var size uint64 + if httpRange.Length < 0 || httpRange.Start+httpRange.Length > fileSize { + size = uint64(fileSize - httpRange.Start) + } else { + size = uint64(httpRange.Length) + } + + pobj, err := d.pool.BorrowObject(ctx) + if err != nil { + return nil, err + } + instance := pobj.(*WasmInstance) + + r, w := io.Pipe() + cw := &checkWriter{W: w, N: size} + streamHandle := instance.exports.StreamManager().Add(&manager_io.Stream{ + Writer: cw, + CheckWriter: cw, + }) + ctxHandle := instance.exports.ContextManager().Add(ctx) + + type RangeSpec struct { + Offset uint64 + Size uint64 + Stream io_v_0_2.OutputStream + } + + var result witgo.Result[witgo.Unit, plugin_warp.ErrCode] + param := struct { + Handle plugin_warp.Context + Obj *plugin_warp.Object + LinkArgs plugin_warp.LinkArgs + RangeSpec RangeSpec + }{ctxHandle, obj, plugin_warp.LinkArgs{IP: args.IP, Header: headersHandle}, RangeSpec{Offset: uint64(httpRange.Start), Size: size, Stream: streamHandle}} + + go func() { + defer d.pool.ReturnObject(ctx, instance) + defer instance.exports.ContextManager().Remove(ctxHandle) + + if err := instance.guest.Call(ctx, PluginPrefix+"link-range", &result, param); err != nil { + if errors.Is(err, witgo.ErrNotExportFunc) { + w.CloseWithError(errs.NotImplement) + return + } + // 这里就不返回错误了,避免大量栈数据 + log.Errorln(err) + w.CloseWithError(err) + return + } + + if result.Err != nil { + w.CloseWithError(d.handleError(result.Err)) + return + } + }() + + return utils.NewReadCloser(r, func() error { + instance.exports.StreamManager().Remove(streamHandle) + return r.Close() + }), nil + }), + }, nil + } + + return nil, errs.NotImplement +} + +type checkWriter struct { + W io.Writer + N uint64 +} + +func (c *checkWriter) Write(p []byte) (n int, err error) { + if c.N <= 0 { + return 0, stderrors.New("write limit exceeded") + } + n, err = c.W.Write(p[:min(uint64(len(p)), c.N)]) + c.N -= uint64(n) + return +} +func (c *checkWriter) CheckWrite() uint64 { + return max(c.N, 1) +} + +func (d *WasmDriver) MakeDir(ctx context.Context, parentDir model.Obj, dirName string) (model.Obj, error) { + if !d.config.Capabilitys.MkdirFile { + return nil, errs.NotImplement + } + + obj, err := d.pool.BorrowObject(ctx) + if err != nil { + return nil, fmt.Errorf("failed to borrow wasm instance: %w", err) + } + instance := obj.(*WasmInstance) + defer d.pool.ReturnObject(ctx, obj) + + ctxHandle := instance.exports.ContextManager().Add(ctx) + defer instance.exports.ContextManager().Remove(ctxHandle) + + robj := parentDir.(*plugin_warp.Object) + var result witgo.Result[witgo.Option[plugin_warp.Object], plugin_warp.ErrCode] + + if err := instance.guest.Call(ctx, PluginPrefix+"make-dir", &result, ctxHandle, robj, dirName); err != nil { + if errors.Is(err, witgo.ErrNotExportFunc) { + return nil, errs.NotImplement + } + // 这里就不返回错误了,避免大量栈数据 + log.Errorln(err) + return nil, errors.New("Internal error in plugin") + } + + if result.Err != nil { + return nil, d.handleError(result.Err) + } + + return result.Ok.Some, nil +} + +func (d *WasmDriver) Rename(ctx context.Context, srcObj model.Obj, newName string) (model.Obj, error) { + if !d.config.Capabilitys.RenameFile { + return nil, errs.NotImplement + } + + obj, err := d.pool.BorrowObject(ctx) + if err != nil { + return nil, fmt.Errorf("failed to borrow wasm instance: %w", err) + } + instance := obj.(*WasmInstance) + defer d.pool.ReturnObject(ctx, obj) + + ctxHandle := instance.exports.ContextManager().Add(ctx) + defer instance.exports.ContextManager().Remove(ctxHandle) + + robj := srcObj.(*plugin_warp.Object) + var result witgo.Result[witgo.Option[plugin_warp.Object], plugin_warp.ErrCode] + + err = instance.guest.Call(ctx, PluginPrefix+"rename-file", &result, ctxHandle, robj, newName) + if err != nil { + if errors.Is(err, witgo.ErrNotExportFunc) { + return nil, errs.NotImplement + } + // 这里就不返回错误了,避免大量栈数据 + log.Errorln(err) + return nil, errors.New("Internal error in plugin") + } + + if result.Err != nil { + return nil, d.handleError(result.Err) + } + + return result.Ok.Some, nil +} + +func (d *WasmDriver) Move(ctx context.Context, srcObj, dstDir model.Obj) (model.Obj, error) { + if !d.config.Capabilitys.MoveFile { + return nil, errs.NotImplement + } + + obj, err := d.pool.BorrowObject(ctx) + if err != nil { + return nil, fmt.Errorf("failed to borrow wasm instance: %w", err) + } + instance := obj.(*WasmInstance) + defer d.pool.ReturnObject(ctx, obj) + + ctxHandle := instance.exports.ContextManager().Add(ctx) + defer instance.exports.ContextManager().Remove(ctxHandle) + + srcobj := srcObj.(*plugin_warp.Object) + dstobj := dstDir.(*plugin_warp.Object) + + var result witgo.Result[witgo.Option[plugin_warp.Object], plugin_warp.ErrCode] + + err = instance.guest.Call(ctx, PluginPrefix+"move-file", &result, ctxHandle, srcobj, dstobj) + if err != nil { + if errors.Is(err, witgo.ErrNotExportFunc) { + return nil, errs.NotImplement + } + // 这里就不返回错误了,避免大量栈数据 + log.Errorln(err) + return nil, errors.New("Internal error in plugin") + } + + if result.Err != nil { + return nil, d.handleError(result.Err) + } + + return result.Ok.Some, nil +} + +func (d *WasmDriver) Remove(ctx context.Context, srcObj model.Obj) error { + if !d.config.Capabilitys.RemoveFile { + return errs.NotImplement + } + + obj, err := d.pool.BorrowObject(ctx) + if err != nil { + return fmt.Errorf("failed to borrow wasm instance: %w", err) + } + instance := obj.(*WasmInstance) + defer d.pool.ReturnObject(ctx, obj) + + ctxHandle := instance.exports.ContextManager().Add(ctx) + defer instance.exports.ContextManager().Remove(ctxHandle) + + srcobj := srcObj.(*plugin_warp.Object) + + var result witgo.Result[witgo.Unit, plugin_warp.ErrCode] + + err = instance.guest.Call(ctx, PluginPrefix+"remove-file", &result, ctxHandle, srcobj) + if err != nil { + if errors.Is(err, witgo.ErrNotExportFunc) { + return errs.NotImplement + } + // 这里就不返回错误了,避免大量栈数据 + log.Errorln(err) + return errors.New("Internal error in plugin") + } + + if result.Err != nil { + return d.handleError(result.Err) + } + + return nil +} + +func (d *WasmDriver) Copy(ctx context.Context, srcObj, dstDir model.Obj) (model.Obj, error) { + if !d.config.Capabilitys.CopyFile { + return nil, errs.NotImplement + } + + obj, err := d.pool.BorrowObject(ctx) + if err != nil { + return nil, fmt.Errorf("failed to borrow wasm instance: %w", err) + } + instance := obj.(*WasmInstance) + defer d.pool.ReturnObject(ctx, obj) + + ctxHandle := instance.exports.ContextManager().Add(ctx) + defer instance.exports.ContextManager().Remove(ctxHandle) + + srcobj := srcObj.(*plugin_warp.Object) + dstobj := dstDir.(*plugin_warp.Object) + + var result witgo.Result[witgo.Option[plugin_warp.Object], plugin_warp.ErrCode] + + err = instance.guest.Call(ctx, PluginPrefix+"copy-file", &result, ctxHandle, srcobj, dstobj) + if err != nil { + if errors.Is(err, witgo.ErrNotExportFunc) { + return nil, errs.NotImplement + } + // 这里就不返回错误了,避免大量栈数据 + log.Errorln(err) + return nil, errors.New("Internal error in plugin") + } + + if result.Err != nil { + return nil, d.handleError(result.Err) + } + + return result.Ok.Some, nil +} + +func (d *WasmDriver) Put(ctx context.Context, dstDir model.Obj, file model.FileStreamer, up driver.UpdateProgress) (model.Obj, error) { + if !d.config.Capabilitys.UploadFile { + return nil, errs.NotImplement + } + + obj, err := d.pool.BorrowObject(ctx) + if err != nil { + return nil, fmt.Errorf("failed to borrow wasm instance: %w", err) + } + instance := obj.(*WasmInstance) + defer d.pool.ReturnObject(ctx, obj) + + ctxHandle := instance.exports.ContextManager().Add(ctx) + defer instance.exports.ContextManager().Remove(ctxHandle) + + stream := instance.exports.uploads.Add(&plugin_warp.UploadReadableType{FileStreamer: file, UpdateProgress: up}) + defer instance.exports.uploads.Remove(stream) + + dstobj := dstDir.(*plugin_warp.Object) + + var result witgo.Result[witgo.Option[plugin_warp.Object], plugin_warp.ErrCode] + + exist := witgo.None[plugin_warp.Object]() + if file.GetExist() != nil { + exist = witgo.Some(plugin_warp.ConvertObjToObject(file.GetExist())) + } + + uploadReq := &plugin_warp.UploadRequest{ + Target: plugin_warp.ConvertObjToObject(file), + Content: stream, + Exist: exist, + } + + err = instance.guest.Call(ctx, PluginPrefix+"upload-file", &result, ctxHandle, dstobj, uploadReq) + if err != nil { + if errors.Is(err, witgo.ErrNotExportFunc) { + return nil, errs.NotImplement + } + // 这里就不返回错误了,避免大量栈数据 + log.Errorln(err) + return nil, errors.New("Internal error in plugin") + } + + if result.Err != nil { + return nil, d.handleError(result.Err) + } + + return result.Ok.Some, nil +} + +var _ driver.Meta = (*WasmDriver)(nil) +var _ driver.Reader = (*WasmDriver)(nil) +var _ driver.Getter = (*WasmDriver)(nil) +var _ driver.GetRooter = (*WasmDriver)(nil) +var _ driver.MkdirResult = (*WasmDriver)(nil) +var _ driver.RenameResult = (*WasmDriver)(nil) +var _ driver.MoveResult = (*WasmDriver)(nil) +var _ driver.Remove = (*WasmDriver)(nil) +var _ driver.CopyResult = (*WasmDriver)(nil) +var _ driver.PutResult = (*WasmDriver)(nil) diff --git a/internal/plugin/host.go b/internal/plugin/host.go new file mode 100644 index 000000000..f756f2554 --- /dev/null +++ b/internal/plugin/host.go @@ -0,0 +1,284 @@ +package plugin + +import ( + "context" + "io" + "maps" + + log "github.com/sirupsen/logrus" + "github.com/tetratelabs/wazero" + + "github.com/OpenListTeam/OpenList/v4/internal/op" + plugin_warp "github.com/OpenListTeam/OpenList/v4/internal/plugin/warp" + "github.com/OpenListTeam/OpenList/v4/internal/stream" + "github.com/OpenListTeam/OpenList/v4/pkg/http_range" + "github.com/OpenListTeam/OpenList/v4/pkg/utils" + + manager_io "github.com/OpenListTeam/wazero-wasip2/manager/io" + "github.com/OpenListTeam/wazero-wasip2/wasip2" + wasi_clocks "github.com/OpenListTeam/wazero-wasip2/wasip2/clocks" + wasi_filesystem "github.com/OpenListTeam/wazero-wasip2/wasip2/filesystem" + wasi_http "github.com/OpenListTeam/wazero-wasip2/wasip2/http" + wasi_io "github.com/OpenListTeam/wazero-wasip2/wasip2/io" + io_v0_2 "github.com/OpenListTeam/wazero-wasip2/wasip2/io/v0_2" + wasi_random "github.com/OpenListTeam/wazero-wasip2/wasip2/random" + wasi_sockets "github.com/OpenListTeam/wazero-wasip2/wasip2/sockets" + witgo "github.com/OpenListTeam/wazero-wasip2/wit-go" +) + +type DriverHost struct { + *wasip2.Host + contexts *plugin_warp.ContextManaget + uploads *plugin_warp.UploadReadableManager + + driver *witgo.ResourceManager[*WasmDriver] +} + +func NewDriverHost() *DriverHost { + waspi2_host := wasip2.NewHost( + wasi_io.Module("0.2.2"), + wasi_filesystem.Module("0.2.2"), + wasi_random.Module("0.2.2"), + wasi_clocks.Module("0.2.2"), + wasi_sockets.Module("0.2.0"), + wasi_http.Module("0.2.0"), + ) + return &DriverHost{ + Host: waspi2_host, + contexts: plugin_warp.NewContextManager(), + uploads: plugin_warp.NewUploadManager(), + driver: witgo.NewResourceManager[*WasmDriver](nil), + } +} + +func (host *DriverHost) Instantiate(ctx context.Context, rt wazero.Runtime) error { + if err := host.Host.Instantiate(ctx, rt); err != nil { + return err + } + + module := rt.NewHostModuleBuilder("openlist:plugin-driver/host@0.1.0") + exports := witgo.NewExporter(module) + + exports.Export("log", host.Log) + exports.Export("load-config", host.LoadConfig) + exports.Export("save-config", host.SaveConfig) + if _, err := exports.Instantiate(ctx); err != nil { + return err + } + + moduleType := rt.NewHostModuleBuilder("openlist:plugin-driver/types@0.1.0") + exportsType := witgo.NewExporter(moduleType) + exportsType.Export("[resource-drop]cancellable", host.DropContext) + exportsType.Export("[method]cancellable.subscribe", host.Subscribe) + + exportsType.Export("[resource-drop]readable", host.DropReadable) + exportsType.Export("[method]readable.streams", host.Stream) + exportsType.Export("[method]readable.peek", host.StreamPeek) + exportsType.Export("[method]readable.chunks", host.Chunks) + exportsType.Export("[method]readable.next-chunk", host.NextChunk) + exportsType.Export("[method]readable.chunk-reset", host.ChunkReset) + exportsType.Export("[method]readable.get-hasher", host.GetHasher) + exportsType.Export("[method]readable.update-progress", host.UpdateProgress) + if _, err := exportsType.Instantiate(ctx); err != nil { + return err + } + + return nil +} + +func (host *DriverHost) ContextManager() *plugin_warp.ContextManaget { + return host.contexts +} + +func (host *DriverHost) UploadManager() *plugin_warp.UploadReadableManager { + return host.uploads +} + +func (host *DriverHost) DropReadable(this plugin_warp.UploadReadable) { + host.uploads.Remove(this) +} + +func (host *DriverHost) DropContext(this plugin_warp.Context) { + host.contexts.Remove(this) +} + +// log: func(level: log-level, message: string); +func (host *DriverHost) Log(level plugin_warp.LogLevel, message string) { + if level.Debug != nil { + log.Debugln(message) + } else if level.Error != nil { + log.Errorln(message) + } else if level.Info != nil { + log.Infoln(message) + } else if level.Warn != nil { + log.Warnln(message) + } else { + log.Traceln(message) + } +} + +// load-config: func(driver: u32) -> result, string>; +func (host *DriverHost) LoadConfig(driverHandle uint32) witgo.Result[[]byte, string] { + driver, ok := host.driver.Get(driverHandle) + if !ok || driver == nil { + return witgo.Err[[]byte]("host.driver is null, loading timing too early") + } + return witgo.Ok[[]byte, string](driver.additional.Bytes()) +} + +// save-config: func(driver: u32, config: list) -> result<_, string>; +func (host *DriverHost) SaveConfig(driverHandle uint32, config []byte) witgo.Result[witgo.Unit, string] { + driver, ok := host.driver.Get(driverHandle) + if !ok || driver == nil { + return witgo.Err[witgo.Unit]("host.driver is null, loading timing too early") + } + + driver.additional.SetBytes(config) + op.MustSaveDriverStorage(driver) + return witgo.Ok[witgo.Unit, string](witgo.Unit{}) +} + +// streams: func() -> result; +func (host *DriverHost) Stream(this plugin_warp.UploadReadable) witgo.Result[io_v0_2.InputStream, string] { + upload, ok := host.uploads.Get(this) + if !ok { + return witgo.Err[io_v0_2.InputStream]("UploadReadable::Stream: ErrorCodeBadDescriptor") + } + if upload.StreamConsume { + return witgo.Err[io_v0_2.InputStream]("UploadReadable::Stream: StreamConsume") + } + + upload.StreamConsume = true + streamHandle := host.StreamManager().Add(&manager_io.Stream{Reader: upload, Seeker: upload.GetFile()}) + return witgo.Ok[io_v0_2.InputStream, string](streamHandle) +} + +// peek: func(offset: u64, len: u64) -> result; +func (host *DriverHost) StreamPeek(this plugin_warp.UploadReadable, offset uint64, len uint64) witgo.Result[io_v0_2.InputStream, string] { + upload, ok := host.uploads.Get(this) + if !ok { + return witgo.Err[io_v0_2.InputStream]("UploadReadable::StreamPeek: ErrorCodeBadDescriptor") + } + if upload.StreamConsume { + return witgo.Err[io_v0_2.InputStream]("UploadReadable::StreamPeek: StreamConsume") + } + + peekReader, err := upload.RangeRead(http_range.Range{Start: int64(offset), Length: int64(len)}) + if err != nil { + return witgo.Err[io_v0_2.InputStream](err.Error()) + } + seeker, _ := peekReader.(io.Seeker) + streamHandle := host.StreamManager().Add(&manager_io.Stream{Reader: peekReader, Seeker: seeker}) + return witgo.Ok[io_v0_2.InputStream, string](streamHandle) +} + +// chunks: func(len: u32) -> result; +func (host *DriverHost) Chunks(this plugin_warp.UploadReadable, len uint32) witgo.Result[uint32, string] { + upload, ok := host.uploads.Get(this) + if !ok { + return witgo.Err[uint32]("UploadReadable::Chunks: ErrorCodeBadDescriptor") + } + if upload.StreamConsume { + return witgo.Err[uint32]("UploadReadable::Chunks: StreamConsume") + } + if upload.SectionReader != nil { + return witgo.Err[uint32]("UploadReadable::Chunks: Already exist chunk reader") + } + + ss, err := stream.NewStreamSectionReader(upload, int(len), &upload.UpdateProgress) + if err != nil { + return witgo.Err[uint32](err.Error()) + } + chunkSize := int64(len) + upload.SectionReader = &plugin_warp.StreamSectionReader{StreamSectionReaderIF: ss, CunketSize: chunkSize} + return witgo.Ok[uint32, string](uint32((upload.GetSize() + chunkSize - 1) / chunkSize)) +} + +// next-chunk: func() -> result; +func (host *DriverHost) NextChunk(this plugin_warp.UploadReadable) witgo.Result[io_v0_2.InputStream, string] { + upload, ok := host.uploads.Get(this) + if !ok { + return witgo.Err[io_v0_2.InputStream]("UploadReadable::NextChunk: ErrorCodeBadDescriptor") + } + if upload.SectionReader == nil { + return witgo.Err[io_v0_2.InputStream]("UploadReadable::NextChunk: No chunk reader") + } + + chunkSize := min(upload.SectionReader.CunketSize, upload.GetSize()-upload.SectionReader.Offset) + sr, err := upload.SectionReader.GetSectionReader(upload.SectionReader.Offset, chunkSize) + if err != nil { + return witgo.Err[io_v0_2.InputStream](err.Error()) + } + upload.SectionReader.Offset += chunkSize + streamHandle := host.StreamManager().Add(&manager_io.Stream{Reader: sr, Seeker: sr, Closer: utils.CloseFunc(func() error { + upload.SectionReader.FreeSectionReader(sr) + return nil + })}) + return witgo.Ok[io_v0_2.InputStream, string](streamHandle) +} + +// chunk-reset: func(chunk: input-stream) -> result<_, string>; +func (host *DriverHost) ChunkReset(this plugin_warp.UploadReadable, chunk io_v0_2.InputStream) witgo.Result[witgo.Unit, string] { + stream, ok := host.StreamManager().Get(chunk) + if !ok { + return witgo.Err[witgo.Unit]("UploadReadable::ChunkReset: ErrorCodeBadDescriptor") + } + if stream.Seeker == nil { + return witgo.Err[witgo.Unit]("UploadReadable::ChunkReset: Not Seeker") + } + _, err := stream.Seeker.Seek(0, io.SeekStart) + if err != nil { + return witgo.Err[witgo.Unit](err.Error()) + } + return witgo.Ok[witgo.Unit, string](witgo.Unit{}) +} + +// get-hasher: func(hashs: list) -> result, string>; +func (host *DriverHost) GetHasher(this plugin_warp.UploadReadable, hashs []plugin_warp.HashAlg) witgo.Result[[]plugin_warp.HashInfo, string] { + upload, ok := host.uploads.Get(this) + if !ok { + return witgo.Err[[]plugin_warp.HashInfo]("UploadReadable: ErrorCodeBadDescriptor") + } + + resultHashs := plugin_warp.HashInfoConvert2(upload.GetHash(), hashs) + if resultHashs != nil { + return witgo.Ok[[]plugin_warp.HashInfo, string](resultHashs) + } + + if upload.StreamConsume { + return witgo.Err[[]plugin_warp.HashInfo]("UploadReadable: StreamConsume") + } + + // 无法从obj中获取需要的hash,或者获取的hash不完整。 + // 需要缓存整个文件并进行hash计算 + hashTypes := plugin_warp.HashAlgConverts(hashs) + + hashers := utils.NewMultiHasher(hashTypes) + if _, err := upload.CacheFullAndWriter(&upload.UpdateProgress, hashers); err != nil { + return witgo.Err[[]plugin_warp.HashInfo](err.Error()) + } + + maps.Copy(upload.GetHash().Export(), hashers.GetHashInfo().Export()) + + return witgo.Ok[[]plugin_warp.HashInfo, string](plugin_warp.HashInfoConvert(*hashers.GetHashInfo())) +} + +// update-progress: func(progress: f64); +func (host *DriverHost) UpdateProgress(this plugin_warp.UploadReadable, progress float64) { + upload, ok := host.uploads.Get(this) + if ok { + upload.UpdateProgress(progress) + } +} + +// resource cancellable { subscribe: func() -> pollable; } +func (host *DriverHost) Subscribe(this plugin_warp.Context) io_v0_2.Pollable { + poll := host.Host.PollManager() + + ctx, ok := host.contexts.Get(this) + if !ok { + return poll.Add(manager_io.ReadyPollable) + } + + return poll.Add(&plugin_warp.ContextPollable{Context: ctx}) +} diff --git a/internal/plugin/manager.go b/internal/plugin/manager.go new file mode 100644 index 000000000..07ca81119 --- /dev/null +++ b/internal/plugin/manager.go @@ -0,0 +1,650 @@ +package plugin + +import ( + "archive/zip" + "context" + "encoding/json" + "errors" + "fmt" + "io" + "net/http" + "os" + "path/filepath" + "regexp" + "strings" + "sync" + + "github.com/coreos/go-semver/semver" + log "github.com/sirupsen/logrus" + + "github.com/OpenListTeam/OpenList/v4/internal/db" + "github.com/OpenListTeam/OpenList/v4/internal/model" +) + +var ( + PluginManager *Manager +) + +// PluginInfo 只包含从数据库加载的插件元数据。 +type PluginInfo struct { + *model.Plugin + handler PluginHandler // 缓存与此插件匹配的处理器 + driver *DriverPlugin // 缓存已创建的驱动插件实例 +} + +// PluginHandler 定义了处理特定类型插件的接口 +type PluginHandler interface { + // Prefix 返回此处理器能处理的插件ID前缀 + Prefix() string + // Register 注册一个插件 + Register(ctx context.Context, plugin *PluginInfo) error + // Unregister 注销一个插件 + Unregister(ctx context.Context, plugin *PluginInfo) error +} + +// Manager 负责管理插件的生命周期(安装、卸载、加载元数据)。 +type Manager struct { + sync.RWMutex + plugins map[string]*PluginInfo // Key: 插件 ID + pluginDir string + httpClient *http.Client + handlers []PluginHandler // 插件处理器列表 +} + +// NewManager 创建一个新的、轻量级的插件管理器。 +func NewManager(ctx context.Context, dataDir string) (*Manager, error) { + pluginDir := filepath.Join(dataDir, "plugins") + if err := os.MkdirAll(pluginDir, 0755); err != nil { + return nil, fmt.Errorf("failed to create plugin directory: %w", err) + } + + m := &Manager{ + plugins: make(map[string]*PluginInfo), + pluginDir: pluginDir, + httpClient: &http.Client{}, + // 在这里注册所有支持的插件处理器 + handlers: []PluginHandler{ + &DriverPluginHandler{}, // 注册驱动插件处理器 + // 未来可以添加 newThemePluginHandler(), newOtherPluginHandler() 等 + }, + } + + if err := m.loadPluginsFromDB(ctx); err != nil { + return nil, fmt.Errorf("failed to load plugins from database: %w", err) + } + + // 在 NewManager 中直接调用 RegisterAll,确保启动时所有插件都被注册 + m.RegisterAll(ctx) + + return m, nil +} + +// loadPluginsFromDB 在启动时仅从数据库加载插件元数据。 +func (m *Manager) loadPluginsFromDB(ctx context.Context) error { + storedPlugins, err := db.GetAllPlugins(ctx) + if err != nil { + return err + } + log.Infof("Found %d installed plugins in the database.", len(storedPlugins)) + for _, p := range storedPlugins { + if _, err := os.Stat(p.WasmPath); os.IsNotExist(err) { + log.Warnf("Plugin '%s' found in database but its wasm file is missing at %s. Skipping.", p.ID, p.WasmPath) + continue + } + pluginInfo := &PluginInfo{Plugin: p} + // 为插件找到匹配的处理器 + for _, h := range m.handlers { + if strings.HasPrefix(p.ID, h.Prefix()) { + pluginInfo.handler = h + break + } + } + if pluginInfo.handler == nil { + log.Warnf("No handler found for plugin type with ID '%s'. Skipping registration.", p.ID) + } + m.plugins[p.ID] = pluginInfo + log.Infof("Loaded plugin metadata: %s (v%s)", p.Name, p.Version) + } + return nil +} + +// RegisterAll 遍历所有已加载的插件,并使用对应的处理器进行注册。 +func (m *Manager) RegisterAll(ctx context.Context) { + m.RLock() + defer m.RUnlock() + log.Infof("Registering all loaded plugins...") + for id, pluginInfo := range m.plugins { + if pluginInfo.handler != nil { + if err := pluginInfo.handler.Register(ctx, pluginInfo); err != nil { + // 注册失败,更新数据库状态 + log.Errorf("Failed to register plugin '%s': %v", id, err) + pluginInfo.Status = model.StatusError + pluginInfo.Message = err.Error() + // 更新数据库 + if err := db.UpdatePluginStatus(ctx, id, model.StatusError, err.Error()); err != nil { + log.Errorf("Failed to update status for plugin '%s' in database: %v", id, err) + } + } else { + // 注册成功,更新状态 + pluginInfo.Status = model.StatusActive + pluginInfo.Message = "" + if err := db.UpdatePluginStatus(ctx, id, model.StatusActive, ""); err != nil { + log.Errorf("Failed to update status for plugin '%s' in database: %v", id, err) + } + } + } + } +} + +// Install 根据源字符串的格式自动选择安装方式。 +func (m *Manager) Install(ctx context.Context, source string) (*PluginInfo, error) { + if strings.HasSuffix(source, ".zip") { + log.Infof("Installing plugin from archive URL: %s", source) + return m.InstallFromArchiveURL(ctx, source) + } + if strings.HasPrefix(source, "https://github.com/") { + log.Infof("Installing plugin from GitHub repository: %s", source) + return m.InstallFromGitHub(ctx, source) + } + // 默认认为是本地文件系统路径 + log.Infof("Installing plugin from local path: %s", source) + return m.InstallFromLocal(ctx, source, "") +} + +// InstallFromLocal 从本地清单和 Wasm 文件安装插件。 +// manifestPath 是必需的,wasmPath 是可选的(如果为空,则在 manifestPath 相同目录下查找 .wasm 文件)。 +func (m *Manager) InstallFromLocal(ctx context.Context, manifestPath string, wasmPath string) (*PluginInfo, error) { + manifestBytes, err := os.ReadFile(manifestPath) + if err != nil { + return nil, fmt.Errorf("failed to read manifest file '%s': %w", manifestPath, err) + } + + if wasmPath == "" { + wasmPath = strings.TrimSuffix(manifestPath, filepath.Ext(manifestPath)) + ".wasm" + } + + wasmBytes, err := os.ReadFile(wasmPath) + if err != nil { + return nil, fmt.Errorf("failed to read wasm file at '%s': %w", wasmPath, err) + } + + return m.install(ctx, manifestBytes, wasmBytes, "local:"+manifestPath) +} + +// InstallFromUpload 从一个上传的文件流 (io.Reader) 安装插件。 +func (m *Manager) InstallFromUpload(ctx context.Context, fileReader io.Reader, originalFileName string) (*PluginInfo, error) { + // 1. 将上传的文件内容保存到一个临时文件中 + tmpFile, err := os.CreateTemp("", "plugin-upload-*.zip") + if err != nil { + return nil, fmt.Errorf("failed to create temporary file for upload: %w", err) + } + defer os.Remove(tmpFile.Name()) + + _, err = io.Copy(tmpFile, fileReader) + if err != nil { + return nil, fmt.Errorf("failed to save uploaded file to temporary location: %w", err) + } + // 必须关闭文件,以便 zip.OpenReader 能够读取它 + tmpFile.Close() + + // 2. 从这个临时的 zip 文件中提取 manifest 和 wasm + manifestBytes, wasmBytes, err := extractPluginFromZip(tmpFile.Name()) + if err != nil { + return nil, fmt.Errorf("failed to extract plugin from uploaded archive: %w", err) + } + + // 3. 调用核心安装逻辑,使用 "upload:[filename]" 作为来源标识 + return m.install(ctx, manifestBytes, wasmBytes, "upload:"+originalFileName) +} + +// InstallFromArchiveURL 从一个 zip 压缩包的 URL 安装插件。 +func (m *Manager) InstallFromArchiveURL(ctx context.Context, url string) (*PluginInfo, error) { + tmpFile, err := downloadTempFile(m.httpClient, url) + if err != nil { + return nil, fmt.Errorf("failed to download archive from %s: %w", url, err) + } + defer os.Remove(tmpFile.Name()) + + manifestBytes, wasmBytes, err := extractPluginFromZip(tmpFile.Name()) + if err != nil { + return nil, fmt.Errorf("failed to extract plugin from archive '%s': %w", url, err) + } + + return m.install(ctx, manifestBytes, wasmBytes, url) +} + +// InstallFromGitHub 从 GitHub 仓库的最新 release 安装插件。 +func (m *Manager) InstallFromGitHub(ctx context.Context, repoURL string) (*PluginInfo, error) { + repoURL = strings.TrimSuffix(repoURL, ".git") + parts := strings.Split(strings.TrimPrefix(repoURL, "https://github.com/"), "/") + if len(parts) < 2 { + return nil, fmt.Errorf("invalid github repo URL format: %s", repoURL) + } + owner, repo := parts[0], parts[1] + + // 1. 获取最新 release 信息 + apiURL := fmt.Sprintf("https://api.github.com/repos/%s/%s/releases/latest", owner, repo) + log.Infof("Fetching latest release from GitHub API: %s", apiURL) + + req, err := http.NewRequestWithContext(ctx, "GET", apiURL, nil) + if err != nil { + return nil, err + } + req.Header.Set("Accept", "application/vnd.github.v3+json") + + resp, err := m.httpClient.Do(req) + if err != nil { + return nil, fmt.Errorf("failed to call GitHub API: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("GitHub API returned non-200 status: %s", resp.Status) + } + + var release struct { + Assets []struct { + Name string `json:"name"` + DownloadURL string `json:"browser_download_url"` + } `json:"assets"` + } + + if err := json.NewDecoder(resp.Body).Decode(&release); err != nil { + return nil, fmt.Errorf("failed to parse GitHub API response: %w", err) + } + + // 2. 查找包含插件的 zip 资产 + var assetURL string + for _, asset := range release.Assets { + // 寻找第一个 .zip 文件作为目标 + if strings.HasSuffix(asset.Name, ".zip") { + assetURL = asset.DownloadURL + break + } + } + + if assetURL == "" { + return nil, fmt.Errorf("no .zip asset found in the latest release of %s/%s", owner, repo) + } + + log.Infof("Found release asset to download: %s", assetURL) + return m.InstallFromArchiveURL(ctx, assetURL) +} + +// install 是安装插件的核心逻辑 +func (m *Manager) install(ctx context.Context, manifestBytes []byte, wasmBytes []byte, sourceURL string) (*PluginInfo, error) { + m.Lock() + defer m.Unlock() + + var meta model.Plugin + if err := json.Unmarshal(manifestBytes, &meta); err != nil { + return nil, fmt.Errorf("failed to parse plugin manifest: %w", err) + } + if meta.ID == "" || meta.Name == "" || meta.Version == "" { + return nil, fmt.Errorf("plugin manifest is missing required fields (id, name, version)") + } + + // 1. 查找匹配的处理器并检查插件类型 + var handler PluginHandler + for _, h := range m.handlers { + if strings.HasPrefix(meta.ID, h.Prefix()) { + handler = h + break + } + } + if handler == nil { + return nil, fmt.Errorf("unsupported plugin type for ID '%s'", meta.ID) + } + + if _, exists := m.plugins[meta.ID]; exists { + return nil, fmt.Errorf("plugin with id '%s' already exists", meta.ID) + } + + fileName := formatPluginFileName(meta.Author, meta.ID) + wasmPath := filepath.Join(m.pluginDir, fileName) + if err := os.WriteFile(wasmPath, wasmBytes, 0644); err != nil { + return nil, fmt.Errorf("failed to save wasm file: %w", err) + } + + pluginModel := &model.Plugin{ + ID: meta.ID, + Name: meta.Name, + Version: meta.Version, + Author: meta.Author, + Description: meta.Description, + IconURL: meta.IconURL, + SourceURL: sourceURL, + WasmPath: wasmPath, + } + + // 先存入数据库,初始状态为 'inactive' + if err := db.CreatePlugin(ctx, pluginModel); err != nil { + os.Remove(wasmPath) + return nil, fmt.Errorf("failed to save plugin metadata to database: %w", err) + } + log.Infof("Plugin '%s' metadata saved to database with status: inactive.", pluginModel.ID) + + pluginInfo := &PluginInfo{Plugin: pluginModel, handler: handler} + m.plugins[pluginInfo.ID] = pluginInfo + + // 使用找到的处理器进行注册 + if err := handler.Register(ctx, pluginInfo); err != nil { + // 注册失败,更新数据库状态 + log.Errorf("Failed to register newly installed plugin '%s': %v", pluginInfo.ID, err) + pluginInfo.Status = model.StatusError + pluginInfo.Message = err.Error() + if dbErr := db.UpdatePluginStatus(ctx, pluginInfo.ID, model.StatusError, err.Error()); dbErr != nil { + log.Errorf("Failed to update error status for plugin '%s' in database: %v", pluginInfo.ID, dbErr) + } + } else { + // 注册成功,更新状态 + pluginInfo.Status = model.StatusActive + pluginInfo.Message = "" + if dbErr := db.UpdatePluginStatus(ctx, pluginInfo.ID, model.StatusActive, ""); dbErr != nil { + log.Errorf("Failed to update active status for plugin '%s' in database: %v", pluginInfo.ID, dbErr) + } + } + + return pluginInfo, nil +} + +// Uninstall 卸载一个插件 +func (m *Manager) Uninstall(ctx context.Context, pluginID string) error { + m.Lock() + defer m.Unlock() + + plugin, ok := m.plugins[pluginID] + if !ok { + return fmt.Errorf("plugin with ID '%s' not found", pluginID) + } + + // 1. 使用对应的处理器进行注销 + if plugin.handler != nil { + if err := plugin.handler.Unregister(ctx, plugin); err != nil { + // 即便注销失败,也要继续删除流程 + log.Warnf("Failed to unregister plugin '%s', but continuing with uninstallation: %v", pluginID, err) + } + } + + // 2. 关闭插件内部资源 (如果 driver 实例存在) + if plugin.driver != nil { + if err := plugin.driver.Close(ctx); err != nil { + log.Warnf("Error closing driver resources for plugin %s: %v", pluginID, err) + } + } + + // 3. 从数据库删除 + if err := db.DeletePluginByID(ctx, pluginID); err != nil { + return fmt.Errorf("failed to delete plugin '%s' from database: %w", pluginID, err) + } + + // 4. 删除文件 + if err := os.Remove(plugin.WasmPath); err != nil && !os.IsNotExist(err) { + log.Warnf("Failed to remove wasm file %s, but database entry was removed: %v", plugin.WasmPath, err) + } + + // 5. 从内存中删除 + delete(m.plugins, pluginID) + log.Infof("Plugin '%s' has been successfully uninstalled.", pluginID) + return nil +} + +// CheckForUpdate 检查单个指定插件的更新。 +// 如果有可用更新,则返回新版本号;否则返回空字符串。 +func (m *Manager) CheckForUpdate(ctx context.Context, pluginID string) (string, error) { + m.RLock() + plugin, ok := m.plugins[pluginID] + m.RUnlock() + + if !ok { + return "", fmt.Errorf("plugin with ID '%s' not found", pluginID) + } + + if !strings.HasPrefix(plugin.SourceURL, "https://github.com/") { + return "", fmt.Errorf("only plugins installed from GitHub can be checked for updates") + } + + latestVersionStr, err := m.getLatestGitHubVersionTag(ctx, plugin.SourceURL) + if err != nil { + return "", fmt.Errorf("failed to check for updates for plugin '%s': %w", pluginID, err) + } + + latestVersion, err := semver.NewVersion(latestVersionStr) + if err != nil { + return "", fmt.Errorf("invalid latest version format '%s' for plugin '%s': %w", latestVersionStr, pluginID, err) + } + + currentVersion, err := semver.NewVersion(plugin.Version) + if err != nil { + return "", fmt.Errorf("invalid current version format '%s' for plugin '%s': %w", plugin.Version, pluginID, err) + } + + if latestVersion.Compare(*currentVersion) > 0 { + return latestVersion.String(), nil + } + + // 没有可用更新 + return "", nil +} + +// CheckForUpdates 检查所有已安装插件的更新。 +func (m *Manager) CheckForUpdates(ctx context.Context) (map[string]string, error) { + m.RLock() + defer m.RUnlock() + + updatesAvailable := make(map[string]string) + + for id, plugin := range m.plugins { + if !strings.HasPrefix(plugin.SourceURL, "https://github.com/") { + continue // 只支持检查来自 GitHub 的插件 + } + + latestVersionStr, err := m.getLatestGitHubVersionTag(ctx, plugin.SourceURL) + if err != nil { + log.Warnf("Failed to check for updates for plugin '%s': %v", id, err) + continue + } + + latestVersion, err := semver.NewVersion(latestVersionStr) + if err != nil { + log.Warnf("Invalid latest version format '%s' for plugin '%s': %v", latestVersionStr, id, err) + continue + } + + currentVersion, err := semver.NewVersion(plugin.Version) + if err != nil { + log.Warnf("Invalid current version format '%s' for plugin '%s': %v", plugin.Version, id, err) + continue + } + + // 使用 Compare 方法进行比较 + if latestVersion.Compare(*currentVersion) > 0 { + updatesAvailable[id] = latestVersion.String() + log.Infof("Update available for plugin '%s': %s -> %s", id, currentVersion.String(), latestVersion.String()) + } + } + + return updatesAvailable, nil +} + +// Update 更新指定的插件到最新版本。 +func (m *Manager) Update(ctx context.Context, pluginID string) (*PluginInfo, error) { + m.Lock() + plugin, ok := m.plugins[pluginID] + m.Unlock() // 提前解锁 + + if !ok { + return nil, fmt.Errorf("plugin with ID '%s' not found", pluginID) + } + + if !strings.HasPrefix(plugin.SourceURL, "https://github.com/") { + return nil, fmt.Errorf("only plugins installed from GitHub can be updated automatically") + } + + log.Infof("Updating plugin '%s' from %s", pluginID, plugin.SourceURL) + + // 先卸载旧版本 + if err := m.Uninstall(ctx, pluginID); err != nil { + return nil, fmt.Errorf("failed to uninstall old version of plugin '%s' during update: %w", pluginID, err) + } + + // 重新从 GitHub 安装 + return m.Install(ctx, plugin.SourceURL) +} + +// getLatestGitHubVersionTag 从 GitHub API 获取最新的 release tag 字符串。 +func (m *Manager) getLatestGitHubVersionTag(ctx context.Context, repoURL string) (string, error) { + // 规范化 URL 并解析 owner/repo + repoURL = strings.TrimSuffix(repoURL, ".git") + parts := strings.Split(strings.TrimPrefix(repoURL, "https://github.com/"), "/") + if len(parts) < 2 { + return "", fmt.Errorf("invalid github repo URL format: %s", repoURL) + } + owner, repo := parts[0], parts[1] + + // 构建 API URL + apiURL := fmt.Sprintf("https://api.github.com/repos/%s/%s/releases/latest", owner, repo) + + // 创建带上下文的 HTTP 请求 + req, err := http.NewRequestWithContext(ctx, "GET", apiURL, nil) + if err != nil { + return "", fmt.Errorf("failed to create request for GitHub API: %w", err) + } + // 根据 GitHub API v3 的要求设置 Accept header + req.Header.Set("Accept", "application/vnd.github.v3+json") + + // 执行请求 + resp, err := m.httpClient.Do(req) + if err != nil { + return "", fmt.Errorf("failed to call GitHub API at %s: %w", apiURL, err) + } + defer resp.Body.Close() + + // 检查响应状态码 + if resp.StatusCode != http.StatusOK { + // 读取响应体以获取更详细的错误信息 + body, _ := io.ReadAll(resp.Body) + return "", fmt.Errorf("GitHub API returned non-200 status: %s, body: %s", resp.Status, string(body)) + } + + // 定义一个结构体来仅解析我们需要的字段 (tag_name) + var release struct { + TagName string `json:"tag_name"` + } + + // 解析 JSON 响应 + if err := json.NewDecoder(resp.Body).Decode(&release); err != nil { + return "", fmt.Errorf("failed to parse GitHub API response: %w", err) + } + + if release.TagName == "" { + return "", errors.New("no tag_name found in the latest release") + } + + return release.TagName, nil +} + +// --- 辅助函数 --- + +// extractPluginFromZip 从 zip 文件中提取 plugin.json 和 .wasm 文件 +func extractPluginFromZip(zipPath string) ([]byte, []byte, error) { + r, err := zip.OpenReader(zipPath) + if err != nil { + return nil, nil, err + } + defer r.Close() + + var manifestBytes, wasmBytes []byte + + for _, f := range r.File { + // 忽略目录和非插件文件 + if f.FileInfo().IsDir() { + continue + } + + baseName := filepath.Base(f.Name) + if baseName == "plugin.json" { + rc, err := f.Open() + if err != nil { + return nil, nil, err + } + manifestBytes, err = io.ReadAll(rc) + rc.Close() + if err != nil { + return nil, nil, err + } + } else if strings.HasSuffix(baseName, ".wasm") { + rc, err := f.Open() + if err != nil { + return nil, nil, err + } + wasmBytes, err = io.ReadAll(rc) + rc.Close() + if err != nil { + return nil, nil, err + } + } + } + + if manifestBytes == nil { + return nil, nil, errors.New("manifest 'plugin.json' not found in archive") + } + if wasmBytes == nil { + return nil, nil, errors.New("no .wasm file found in archive") + } + + return manifestBytes, wasmBytes, nil +} + +// downloadTempFile 将文件从 URL 下载到临时目录 +func downloadTempFile(client *http.Client, url string) (*os.File, error) { + req, err := http.NewRequest("GET", url, nil) + if err != nil { + return nil, err + } + + resp, err := client.Do(req) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("server returned status: %s", resp.Status) + } + + tmpFile, err := os.CreateTemp("", "plugin-download-*.zip") + if err != nil { + return nil, err + } + + _, err = io.Copy(tmpFile, resp.Body) + if err != nil { + tmpFile.Close() + os.Remove(tmpFile.Name()) + return nil, err + } + + // 确保内容写入磁盘 + if err := tmpFile.Sync(); err != nil { + tmpFile.Close() + os.Remove(tmpFile.Name()) + return nil, err + } + + tmpFile.Close() + return tmpFile, nil +} + +var nonAlphanumericRegex = regexp.MustCompile(`[^a-zA-Z0-9_.-]+`) + +func sanitize(s string) string { + if s == "" { + return "unknown" + } + return nonAlphanumericRegex.ReplaceAllString(s, "_") +} + +func formatPluginFileName(author, id string) string { + return fmt.Sprintf("%s-%s.wasm", sanitize(author), sanitize(id)) +} diff --git a/internal/plugin/manager_driver.go b/internal/plugin/manager_driver.go new file mode 100644 index 000000000..809ece821 --- /dev/null +++ b/internal/plugin/manager_driver.go @@ -0,0 +1,70 @@ +package plugin + +import ( + "context" + "fmt" + + "github.com/OpenListTeam/OpenList/v4/internal/driver" + "github.com/OpenListTeam/OpenList/v4/internal/op" + log "github.com/sirupsen/logrus" +) + +// --- 驱动插件处理器 --- + +// DriverPluginHandler 实现了 PluginHandler 接口,专门处理驱动插件 +type DriverPluginHandler struct{} + +func (h *DriverPluginHandler) Prefix() string { + return "openlist.driver." +} + +func (h *DriverPluginHandler) Register(ctx context.Context, plugin *PluginInfo) error { + if plugin.driver != nil { + return nil // 已经注册过了 + } + + var err error + plugin.driver, err = NewDriverPlugin(ctx, plugin) + if err != nil { + return fmt.Errorf("load driver plugin err: %w", err) + } + + err = op.RegisterDriver(func() driver.Driver { + tempDriver, err := plugin.driver.NewWasmDriver() + if err != nil { + log.Errorf("deferred load driver plugin err: %v", err) + return nil + } + return tempDriver + }) + if err != nil { + // 如果注册失败,关闭运行时 + plugin.driver.Close(ctx) + return fmt.Errorf("failed to register driver in op: %w", err) + } + + log.Infof("Successfully registered driver for plugin: %s", plugin.ID) + return nil +} + +func (h *DriverPluginHandler) Unregister(ctx context.Context, plugin *PluginInfo) error { + if plugin.driver == nil { + log.Errorf("plugin.driver is nil during unregister for plugin '%s', cannot get config", plugin.ID) + return fmt.Errorf("plugin.driver instance not found, cannot properly unregister from op") + } + + op.UnRegisterDriver(func() driver.Driver { + tempDriver, err := plugin.driver.NewWasmDriver() + if err != nil { + log.Warnf("Failed to create temp driver for unregister: %v", err) + return nil + } + return tempDriver + }) + + if err := plugin.driver.Close(ctx); err != nil { + log.Warnf("Error closing driver plugin runtime for %s: %v", plugin.ID, err) + } + + return nil +} diff --git a/internal/plugin/warp/context.go b/internal/plugin/warp/context.go new file mode 100644 index 000000000..b79bf8bb9 --- /dev/null +++ b/internal/plugin/warp/context.go @@ -0,0 +1,44 @@ +package plugin_warp + +import ( + "context" + + witgo "github.com/OpenListTeam/wazero-wasip2/wit-go" +) + +type ContextManaget = witgo.ResourceManager[context.Context] +type Context = uint32 + +func NewContextManager() *ContextManaget { + return witgo.NewResourceManager[context.Context](nil) +} + +type ContextPollable struct { + context.Context +} + +func (c *ContextPollable) IsReady() bool { + select { + case <-c.Done(): + return true + default: + return false + } +} + +// Block 阻塞直到 Pollable 就绪。 +func (c *ContextPollable) Block() { + <-c.Done() +} + +func (*ContextPollable) SetReady() { + +} + +func (ContextPollable) Close() { + +} + +func (c *ContextPollable) Channel() <-chan struct{} { + return c.Done() +} diff --git a/internal/plugin/warp/errors.go b/internal/plugin/warp/errors.go new file mode 100644 index 000000000..368f2a419 --- /dev/null +++ b/internal/plugin/warp/errors.go @@ -0,0 +1,45 @@ +package plugin_warp + +import ( + "errors" + + "github.com/OpenListTeam/OpenList/v4/internal/errs" +) + +type ErrCode struct { + InvalidHandle *struct{} `wit:"case(0)"` + // 表示功能未实现。 + NotImplemented *struct{} `wit:"case(1)"` + // 表示功能不支持。 + NotSupport *struct{} `wit:"case(2)"` + // 表示资源未找到。 + NotFound *struct{} `wit:"case(3)"` + // 表示路径是文件而非目录。 + NotFolder *struct{} `wit:"case(4)"` + // 表示路径是目录而非文件。 + NotFile *struct{} `wit:"case(5)"` + // 包含描述信息的通用错误。 + Generic *string `wit:"case(6)"` + // 授权失效,此时驱动处于无法自动恢复的状态 + Unauthorized *string `wit:"case(7)"` +} + +func (e ErrCode) ToError() error { + if e.InvalidHandle != nil { + return errs.StorageNotFound + } else if e.NotImplemented != nil { + return errs.NotImplement + } else if e.NotSupport != nil { + return errs.NotSupport + } else if e.NotFound != nil { + return errs.ObjectNotFound + } else if e.NotFile != nil { + return errs.NotFile + } else if e.NotFolder != nil { + return errs.NotFolder + } else if e.Unauthorized != nil { + return errors.New(*e.Unauthorized) + } + + return errors.New(*e.Generic) +} diff --git a/internal/plugin/warp/object.go b/internal/plugin/warp/object.go new file mode 100644 index 000000000..17c324e5b --- /dev/null +++ b/internal/plugin/warp/object.go @@ -0,0 +1,98 @@ +package plugin_warp + +import ( + "time" + + "github.com/OpenListTeam/OpenList/v4/internal/model" + "github.com/OpenListTeam/OpenList/v4/pkg/utils" + clocks "github.com/OpenListTeam/wazero-wasip2/wasip2/clocks/v0_2" + witgo "github.com/OpenListTeam/wazero-wasip2/wit-go" +) + +type Object struct { + // 对象的绝对路径。 + Path string + // 对象的id信息 + ID string + // 对象的名称。 + Name string + // 对象的大小(字节)。 + Size int64 + // 是否为目录。 + IsFolder bool + // 创建时间戳 + Created clocks.Duration + // 修改时间戳 + Modified clocks.Duration + // 缩略图链接。 + Thumbnail witgo.Option[string] + // 文件的哈希信息列表。 + Hashes []HashInfo + // 用于存储驱动特定的、非标准的元数据。 + Extra [][2]string +} + +func (o *Object) GetName() string { + return o.Name +} + +func (o *Object) GetSize() int64 { + return o.Size +} + +func (o *Object) ModTime() time.Time { + return o.Modified.ToTime() +} +func (o *Object) CreateTime() time.Time { + if o.Created == 0 { + return o.ModTime() + } + return o.Created.ToTime() +} + +func (o *Object) IsDir() bool { + return o.IsFolder +} + +func (o *Object) GetID() string { + return o.ID +} + +func (o *Object) GetPath() string { + return o.Path +} + +func (o *Object) SetPath(path string) { + o.Path = path +} + +func (o *Object) GetHash() utils.HashInfo { + return HashInfoConvert3(o.Hashes) +} + +func (o *Object) Thumb() string { + return o.Thumbnail.UnwrapOr("") +} + +var _ model.Obj = (*Object)(nil) +var _ model.Thumb = (*Object)(nil) +var _ model.SetPath = (*Object)(nil) + +func ConvertObjToObject(obj model.Obj) Object { + + thumbnail := witgo.None[string]() + if t, ok := obj.(model.Thumb); ok { + thumbnail = witgo.Some(t.Thumb()) + } + return Object{ + Path: obj.GetPath(), + ID: obj.GetID(), + Name: obj.GetName(), + Size: obj.GetSize(), + IsFolder: obj.IsDir(), + Created: clocks.Duration(obj.CreateTime().UnixNano()), + Modified: clocks.Duration(obj.ModTime().UnixNano()), + Thumbnail: thumbnail, + Hashes: HashInfoConvert(obj.GetHash()), + } +} diff --git a/internal/plugin/warp/types.go b/internal/plugin/warp/types.go new file mode 100644 index 000000000..2204d9b17 --- /dev/null +++ b/internal/plugin/warp/types.go @@ -0,0 +1,198 @@ +package plugin_warp + +import ( + "errors" + "fmt" + "slices" + "strings" + + "github.com/OpenListTeam/OpenList/v4/internal/conf" + "github.com/OpenListTeam/OpenList/v4/internal/driver" + "github.com/OpenListTeam/OpenList/v4/pkg/utils" + clocks "github.com/OpenListTeam/wazero-wasip2/wasip2/clocks/v0_2" + wasi_http "github.com/OpenListTeam/wazero-wasip2/wasip2/http/v0_2" + witgo "github.com/OpenListTeam/wazero-wasip2/wit-go" +) + +type LogLevel struct { + Debug *struct{} `wit:"case(0)"` + Info *struct{} `wit:"case(1)"` + Warn *struct{} `wit:"case(2)"` + Error *struct{} `wit:"case(3)"` +} + +type HashAlg struct { + MD5 *struct{} `wit:"case(0)"` + SHA1 *struct{} `wit:"case(1)"` + SHA256 *struct{} `wit:"case(2)"` + GCID *struct{} `wit:"case(3)"` +} + +type HashInfo struct { + Alg HashAlg + Val string +} + +type LinkResult struct { + File witgo.Option[Object] + Resource LinkResource +} +type LinkResource struct { + Direct *struct { + Url string + Header wasi_http.Headers + Expiratcion witgo.Option[clocks.Duration] + } `wit:"case(0)"` + RangeStream *struct{} `wit:"case(1)"` +} + +type Capability struct { + GetFile bool + ListFile bool + LinkFile bool + MkdirFile bool + RenameFile bool + MoveFile bool + RemoveFile bool + CopyFile bool + UploadFile bool +} + +func (Capability) IsFlags() {} + +type DriverProps struct { + Name string + + OnlyProxy bool + NoCache bool + + Alert string + + NoOverwriteUpload bool + ProxyRange bool + + // 网盘能力标记 + Capabilitys Capability +} + +func (c DriverProps) ToConfig() driver.Config { + return driver.Config{ + Name: c.Name, + LocalSort: true, + OnlyProxy: c.OnlyProxy, + NoCache: c.NoCache, + NoUpload: !c.Capabilitys.UploadFile, + + CheckStatus: true, + Alert: c.Alert, + + NoOverwriteUpload: c.NoOverwriteUpload, + ProxyRangeOption: c.ProxyRange, + } +} + +type FormField struct { + // 字段的唯一标识符(键)。 + Name string + // 显示给用户的标签。 + Label string + // 字段的输入类型,用于 UI 渲染。 + Kind FieldKind + // 是否必填 + Required bool + // 字段的帮助或提示信息。 + Help string +} + +type FieldKind struct { + String *string `wit:"case(0)"` + Password *string `wit:"case(1)"` + Number *float64 `wit:"case(2)"` + Boolean *bool `wit:"case(3)"` + Text *string `wit:"case(4)"` + Select *[]string `wit:"case(5)"` +} + +type Additional struct { + Json []byte + Forms []FormField +} + +func NewAdditional(forms []FormField) Additional { + return Additional{ + Forms: forms, + } +} + +func (m *Additional) String() string { + return string(m.Json) +} +func (m *Additional) SetString(config string) { + m.Json = []byte(config) +} + +func (m *Additional) Bytes() []byte { + return m.Json +} + +func (m *Additional) SetBytes(config []byte) { + m.Json = config +} + +// MarshalJSON returns m as the JSON encoding of m. +func (m Additional) MarshalJSON() ([]byte, error) { + return m.Json, nil +} + +// UnmarshalJSON sets *m to a copy of data. +func (m *Additional) UnmarshalJSON(data []byte) error { + if m == nil { + return errors.New("json.RawMessage: UnmarshalJSON on nil pointer") + } + m.Json = slices.Clone(data) + return nil +} + +func (addit *Additional) GetItems() []driver.Item { + return utils.MustSliceConvert(addit.Forms, func(item FormField) driver.Item { + var typ string + var def string + var opts string + if item.Kind.Boolean != nil { + typ = conf.TypeBool + def = fmt.Sprintf("%t", *item.Kind.Boolean) + } else if item.Kind.Password != nil { + typ = conf.TypeString + def = *item.Kind.Password + } else if item.Kind.Number != nil { + typ = conf.TypeNumber + def = fmt.Sprintf("%f", *item.Kind.Number) + } else if item.Kind.Select != nil { + typ = conf.TypeSelect + if len(*item.Kind.Select) > 0 { + def = (*item.Kind.Select)[0] + opts = strings.Join((*item.Kind.Select), ",") + } + } else if item.Kind.String != nil { + typ = conf.TypeString + def = *item.Kind.String + } else if item.Kind.Text != nil { + typ = conf.TypeText + def = *item.Kind.Text + } + + return driver.Item{ + Name: item.Name, + Type: typ, + Default: def, + Options: opts, + Required: item.Required, + Help: item.Help, + } + }) +} + +type LinkArgs struct { + IP string + Header wasi_http.Headers +} diff --git a/internal/plugin/warp/upload.go b/internal/plugin/warp/upload.go new file mode 100644 index 000000000..fe84d741c --- /dev/null +++ b/internal/plugin/warp/upload.go @@ -0,0 +1,105 @@ +package plugin_warp + +import ( + "github.com/OpenListTeam/OpenList/v4/internal/driver" + "github.com/OpenListTeam/OpenList/v4/internal/model" + "github.com/OpenListTeam/OpenList/v4/internal/stream" + "github.com/OpenListTeam/OpenList/v4/pkg/utils" + hash_extend "github.com/OpenListTeam/OpenList/v4/pkg/utils/hash" + witgo "github.com/OpenListTeam/wazero-wasip2/wit-go" +) + +type UploadRequest struct { + Target Object + // 指向宿主端文件内容的句柄 + // 由host控制释放 + Content UploadReadable + // 如果是覆盖上传,宿主会提供被覆盖文件的原始对象数据 + Exist witgo.Option[Object] +} + +type UploadReadableType struct { + model.FileStreamer + StreamConsume bool + UpdateProgress driver.UpdateProgress + SectionReader *StreamSectionReader +} + +type StreamSectionReader struct { + stream.StreamSectionReaderIF + Offset int64 + CunketSize int64 +} + +type UploadReadableManager = witgo.ResourceManager[*UploadReadableType] +type UploadReadable = uint32 + +func NewUploadManager() *UploadReadableManager { + return witgo.NewResourceManager[*UploadReadableType](nil) +} + +func HashTypeConvert(typ *utils.HashType) HashAlg { + switch typ { + case utils.MD5: + return HashAlg{MD5: &struct{}{}} + case utils.SHA1: + return HashAlg{SHA1: &struct{}{}} + case utils.SHA256: + return HashAlg{SHA256: &struct{}{}} + case hash_extend.GCID: + return HashAlg{GCID: &struct{}{}} + } + panic("plase add hash convert") +} +func HashAlgConvert(hash HashAlg) *utils.HashType { + if hash.MD5 != nil { + return utils.MD5 + } else if hash.SHA1 != nil { + return utils.SHA1 + } else if hash.SHA256 != nil { + return utils.SHA256 + } else if hash.GCID != nil { + return hash_extend.GCID + } + panic("plase add hash convert") +} +func HashAlgConverts(HashAlgs []HashAlg) []*utils.HashType { + hashTypes := make([]*utils.HashType, 0, len(HashAlgs)) + for _, needHash := range HashAlgs { + hashTypes = append(hashTypes, HashAlgConvert(needHash)) + } + return hashTypes +} + +func HashInfoConvert(hashInfo utils.HashInfo) []HashInfo { + result := make([]HashInfo, 0, 4) + for hash, val := range hashInfo.All() { + if hash.Width != len(val) { + continue + } + result = append(result, HashInfo{Alg: HashTypeConvert(hash), Val: val}) + } + + return result +} + +func HashInfoConvert2(hashInfo utils.HashInfo, needHashs []HashAlg) []HashInfo { + resultHashs := make([]HashInfo, 0, len(needHashs)) + + for _, needHash := range needHashs { + hashType := HashAlgConvert(needHash) + hash := hashInfo.GetHash(hashType) + if hashType.Width != len(hash) { + return nil + } + resultHashs = append(resultHashs, HashInfo{Alg: needHash, Val: hash}) + } + return resultHashs +} +func HashInfoConvert3(hashInfo []HashInfo) utils.HashInfo { + newHashInfo := make(map[*utils.HashType]string, len(hashInfo)) + for _, hashInfo := range hashInfo { + newHashInfo[HashAlgConvert(hashInfo.Alg)] = hashInfo.Val + } + return utils.NewHashInfoByMap(newHashInfo) +} diff --git a/server/handles/plugin.go b/server/handles/plugin.go new file mode 100644 index 000000000..cfef43eb9 --- /dev/null +++ b/server/handles/plugin.go @@ -0,0 +1,245 @@ +package handles + +import ( + "fmt" + "net/http" + "strings" + + "github.com/OpenListTeam/OpenList/v4/internal/db" + "github.com/OpenListTeam/OpenList/v4/internal/plugin" + "github.com/OpenListTeam/OpenList/v4/server/common" + "github.com/gin-gonic/gin" + log "github.com/sirupsen/logrus" +) + +// InstallPluginReq 定义了安装插件 API 的请求体结构 +type InstallPluginReq struct { + // Source 是插件的来源地址,可以是: + // 1. GitHub 仓库 URL (e.g., "https://github.com/user/repo") + // 2. Zip 压缩包 URL (e.g., "https://example.com/plugin.zip") + // 3. 本地 manifest 文件路径 (e.g., "/path/to/plugin.json") + Source string `json:"source" binding:"required"` +} + +// PluginIDReq 定义了需要插件 ID 的通用请求体结构 +type PluginIDReq struct { + ID string `json:"id" binding:"required"` +} + +// --- API 处理器 --- + +// ListPlugins godoc +// @Summary List all installed plugins +// @Description Get a list of all plugins that are currently installed. +// @Tags plugin +// @Produce json +// @Success 200 {object} common.Resp{data=[]model.Plugin} "A list of installed plugins" +// @Failure 500 {object} common.Resp "Internal server error" +// @Router /api/plugin/list [get] +func ListPlugins(c *gin.Context) { + // 直接从数据库获取最新的插件列表,确保状态是最新的 + plugins, err := db.GetAllPlugins(c.Request.Context()) + if err != nil { + log.Errorf("Failed to get all plugins from database: %v", err) + common.ErrorResp(c, err, http.StatusInternalServerError) + return + } + common.SuccessResp(c, plugins) +} + +// InstallPlugin godoc +// @Summary Install a new plugin +// @Description Install a plugin from a source URL (GitHub, Zip) or a local path. +// @Tags plugin +// @Accept json +// @Produce json +// @Param req body InstallPluginReq true "Plugin source" +// @Success 200 {object} common.Resp{data=model.Plugin} "Plugin installed successfully" +// @Failure 400 {object} common.Resp "Bad request" +// @Failure 500 {object} common.Resp "Internal server error" +// @Router /api/plugin/install [post] +func InstallPlugin(c *gin.Context) { + var req InstallPluginReq + if err := c.ShouldBindJSON(&req); err != nil { + common.ErrorResp(c, err, http.StatusBadRequest) + return + } + + log.Infof("Attempting to install plugin from source: %s", req.Source) + + pluginInfo, err := plugin.PluginManager.Install(c.Request.Context(), req.Source) + if err != nil { + log.Errorf("Failed to install plugin from source '%s': %v", req.Source, err) + common.ErrorResp(c, err, http.StatusInternalServerError) + return + } + + log.Infof("Successfully installed plugin: %s (v%s)", pluginInfo.Name, pluginInfo.Version) + common.SuccessResp(c, pluginInfo.Plugin) +} + +// InstallPluginFromUpload godoc +// @Summary Install a plugin from an uploaded zip file +// @Description Upload a .zip file containing plugin.json and a .wasm file to install a new plugin. +// @Tags plugin +// @Accept multipart/form-data +// @Produce json +// @Param file formData file true "The plugin zip file to upload" +// @Success 200 {object} common.Resp{data=model.Plugin} "Plugin installed successfully" +// @Failure 400 {object} common.Resp "Bad request (e.g., no file uploaded)" +// @Failure 500 {object} common.Resp "Internal server error" +// @Router /api/plugin/upload [post] +func InstallPluginFromUpload(c *gin.Context) { + // "file" 必须是前端上传文件时使用的表单字段名 (form field name) + file, err := c.FormFile("file") + if err != nil { + common.ErrorResp(c, fmt.Errorf("failed to get 'file' from form: %w", err), http.StatusBadRequest) + return + } + + log.Infof("Attempting to install plugin from uploaded file: %s", file.Filename) + + // 打开上传的文件以获取 io.Reader + f, err := file.Open() + if err != nil { + common.ErrorResp(c, fmt.Errorf("failed to open uploaded file: %w", err), http.StatusInternalServerError) + return + } + defer f.Close() + + // 调用管理器的 InstallFromUpload 方法 + pluginInfo, err := plugin.PluginManager.InstallFromUpload(c.Request.Context(), f, file.Filename) + if err != nil { + log.Errorf("Failed to install plugin from uploaded file '%s': %v", file.Filename, err) + common.ErrorResp(c, err, http.StatusInternalServerError) + return + } + + log.Infof("Successfully installed plugin from upload: %s (v%s)", pluginInfo.Name, pluginInfo.Version) + common.SuccessResp(c, pluginInfo.Plugin) +} + +// UninstallPlugin godoc +// @Summary Uninstall a plugin +// @Description Uninstall a plugin by its ID. +// @Tags plugin +// @Accept json +// @Produce json +// @Param req body PluginIDReq true "Plugin ID to uninstall" +// @Success 200 {object} common.Resp "Plugin uninstalled successfully" +// @Failure 400 {object} common.Resp "Bad request" +// @Failure 500 {object} common.Resp "Internal server error" +// @Router /api/plugin/uninstall [post] +func UninstallPlugin(c *gin.Context) { + var req PluginIDReq + if err := c.ShouldBindJSON(&req); err != nil { + common.ErrorResp(c, err, http.StatusBadRequest) + return + } + + log.Infof("Attempting to uninstall plugin with ID: %s", req.ID) + + if err := plugin.PluginManager.Uninstall(c.Request.Context(), req.ID); err != nil { + log.Errorf("Failed to uninstall plugin '%s': %v", req.ID, err) + common.ErrorResp(c, err, http.StatusInternalServerError) + return + } + + log.Infof("Successfully uninstalled plugin: %s", req.ID) + common.SuccessResp(c, "Plugin uninstalled successfully") +} + +// CheckForUpdates godoc +// @Summary Check for plugin updates +// @Description Checks all installed plugins from GitHub for available updates. +// @Tags plugin +// @Produce json +// @Success 200 {object} common.Resp{data=map[string]string} "A map of plugins with available updates (id: new_version)" +// @Failure 500 {object} common.Resp "Internal server error" +// @Router /api/plugin/updates/check [get] +func CheckForUpdates(c *gin.Context) { + log.Info("Checking for plugin updates...") + + updates, err := plugin.PluginManager.CheckForUpdates(c.Request.Context()) + if err != nil { + log.Errorf("Failed to check for plugin updates: %v", err) + common.ErrorResp(c, err, http.StatusInternalServerError) + return + } + + log.Infof("Found %d available plugin updates.", len(updates)) + common.SuccessResp(c, updates) +} + +// UpdatePlugin godoc +// @Summary Update a plugin +// @Description Update a specific plugin to its latest version. The plugin must have been installed from GitHub. +// @Tags plugin +// @Accept json +// @Produce json +// @Param req body PluginIDReq true "Plugin ID to update" +// @Success 200 {object} common.Resp{data=model.Plugin} "Plugin updated successfully" +// @Failure 400 {object} common.Resp "Bad request" +// @Failure 500 {object} common.Resp "Internal server error" +// @Router /api/plugin/update [post] +func UpdatePlugin(c *gin.Context) { + var req PluginIDReq + if err := c.ShouldBindJSON(&req); err != nil { + common.ErrorResp(c, err, http.StatusBadRequest) + return + } + + log.Infof("Attempting to update plugin with ID: %s", req.ID) + + updatedPluginInfo, err := plugin.PluginManager.Update(c.Request.Context(), req.ID) + if err != nil { + log.Errorf("Failed to update plugin '%s': %v", req.ID, err) + common.ErrorResp(c, err, http.StatusInternalServerError) + return + } + + log.Infof("Successfully updated plugin: %s", req.ID) + common.SuccessResp(c, updatedPluginInfo.Plugin) +} + +// internal/server/handles/plugin.go + +// CheckForUpdateSingle godoc +// @Summary Check for a single plugin update +// @Description Checks a specific plugin for an available update. +// @Tags plugin +// @Accept json +// @Produce json +// @Param req body PluginIDReq true "Plugin ID to check" +// @Success 200 {object} common.Resp{data=map[string]string} "A map containing the new version if an update is available (e.g., {\"new_version\": \"1.1.0\"})" +// @Failure 400 {object} common.Resp "Bad request" +// @Failure 404 {object} common.Resp "Plugin not found or not eligible for update" +// @Failure 500 {object} common.Resp "Internal server error" +// @Router /api/plugin/updates/check_one [post] +func CheckForUpdateSingle(c *gin.Context) { + var req PluginIDReq + if err := c.ShouldBindJSON(&req); err != nil { + common.ErrorResp(c, err, http.StatusBadRequest) + return + } + + log.Infof("Checking for update for plugin: %s", req.ID) + + newVersion, err := plugin.PluginManager.CheckForUpdate(c.Request.Context(), req.ID) + if err != nil { + // 区分是插件找不到还是检查过程出错 + if strings.Contains(err.Error(), "not found") { + common.ErrorResp(c, err, http.StatusNotFound) + } else { + common.ErrorResp(c, err, http.StatusInternalServerError) + } + return + } + + response := make(map[string]string) + if newVersion != "" { + response["new_version"] = newVersion + } + + common.SuccessResp(c, response) +} diff --git a/server/router.go b/server/router.go index 0975fe695..9e5372e82 100644 --- a/server/router.go +++ b/server/router.go @@ -166,6 +166,23 @@ func admin(g *gin.RouterGroup) { setting.POST("/set_thunderx", handles.SetThunderX) setting.POST("/set_thunder_browser", handles.SetThunderBrowser) + // 添加插件管理 API 路由组 + plugin := g.Group("/plugin") + { + plugin.GET("/list", handles.ListPlugins) + plugin.POST("/install", handles.InstallPlugin) + plugin.POST("/upload", handles.InstallPluginFromUpload) + plugin.POST("/uninstall", handles.UninstallPlugin) + plugin.POST("/update", handles.UpdatePlugin) + + // 将检查更新的路由放在一个子组中,更符合 RESTful 风格 + updates := plugin.Group("/updates") + { + updates.GET("/check", handles.CheckForUpdates) + updates.POST("/check_one", handles.CheckForUpdateSingle) + } + } + // retain /admin/task API to ensure compatibility with legacy automation scripts _task(g.Group("/task"))