Skip to content

feat(drivers/weiyun_open): support weiyun official mcp api#2305

Open
yokinanya wants to merge 1 commit intoOpenListTeam:mainfrom
yokinanya:codex/add-weiyun-open-driver
Open

feat(drivers/weiyun_open): support weiyun official mcp api#2305
yokinanya wants to merge 1 commit intoOpenListTeam:mainfrom
yokinanya:codex/add-weiyun-open-driver

Conversation

@yokinanya
Copy link
Copy Markdown

Description / 描述

  • 新增 WeiYun Open 存储驱动,基于微云官方 MCP 接口实现
  • 官方接口目前未提供创建目录、移动文件或目录、重命名文件或目录等接口,所以无法实现

暂无权限添加标签,需要管理员处理

Motivation and Context / 背景

使用官方MCP接口,可以避免cookie过期的问题

How Has This Been Tested? / 测试

已执行以下测试:

  • go test ./drivers/weiyun_open
  • go test ./drivers

Checklist / 检查清单

  • I have read the CONTRIBUTING document.
    我已阅读 CONTRIBUTING 文档。
  • I have formatted my code with go fmt or prettier.
    我已使用 go fmtprettier 格式化提交的代码。
  • I have added appropriate labels to this PR (or mentioned needed labels in the description if lacking permissions).
    我已为此 PR 添加了适当的标签(如无权限或需要的标签不存在,请在描述中说明,管理员将后续处理)。
  • I have requested review from relevant code authors using the "Request review" feature when applicable.
    我已在适当情况下使用"Request review"功能请求相关代码作者进行审查。
  • I have updated the repository accordingly (If it’s needed).
    我已相应更新了相关仓库(若适用)。

@j2rong4cn
Copy link
Copy Markdown
Member

分片上传参考可以这个,stream.NewStreamSectionReader + errgroup.NewOrderedGroupWithContext

func (d *Open123) Upload(ctx context.Context, file model.FileStreamer, createResp *UploadCreateResp, up driver.UpdateProgress) error {
uploadDomain := createResp.Data.Servers[0]
size := file.GetSize()
chunkSize := createResp.Data.SliceSize
ss, err := stream.NewStreamSectionReader(file, int(chunkSize), &up)
if err != nil {
return err
}
uploadNums := (size + chunkSize - 1) / chunkSize
thread := min(int(uploadNums), d.UploadThread)
threadG, uploadCtx := errgroup.NewOrderedGroupWithContext(ctx, thread,
retry.Attempts(3),
retry.Delay(time.Second),
retry.DelayType(retry.BackOffDelay))
for partIndex := range uploadNums {
if utils.IsCanceled(uploadCtx) {
break
}
partIndex := partIndex
partNumber := partIndex + 1 // 分片号从1开始
offset := partIndex * chunkSize
size := min(chunkSize, size-offset)
var reader io.ReadSeeker
var rateLimitedRd io.Reader
sliceMD5 := ""
// 表单
b := bytes.NewBuffer(make([]byte, 0, 2048))
threadG.GoWithLifecycle(errgroup.Lifecycle{
Before: func(ctx context.Context) (err error) {
reader, err = ss.GetSectionReader(offset, size)
return
},
Do: func(ctx context.Context) (err error) {
reader.Seek(0, io.SeekStart)
if sliceMD5 == "" {
// 把耗时的计算放在这里,避免阻塞其他协程
sliceMD5, err = utils.HashReader(utils.MD5, reader)
if err != nil {
return err
}
reader.Seek(0, io.SeekStart)
}
b.Reset()
w := multipart.NewWriter(b)
// 添加表单字段
err = w.WriteField("preuploadID", createResp.Data.PreuploadID)
if err != nil {
return err
}
err = w.WriteField("sliceNo", strconv.FormatInt(partNumber, 10))
if err != nil {
return err
}
err = w.WriteField("sliceMD5", sliceMD5)
if err != nil {
return err
}
// 写入文件内容
_, err = w.CreateFormFile("slice", fmt.Sprintf("%s.part%d", file.GetName(), partNumber))
if err != nil {
return err
}
headSize := b.Len()
err = w.Close()
if err != nil {
return err
}
head := bytes.NewReader(b.Bytes()[:headSize])
tail := bytes.NewReader(b.Bytes()[headSize:])
rateLimitedRd = driver.NewLimitedUploadStream(ctx, io.MultiReader(head, reader, tail))
token, err := d.getAccessToken(false)
if err != nil {
return err
}
// 创建请求并设置header
req, err := http.NewRequestWithContext(ctx, http.MethodPost, uploadDomain+"/upload/v2/file/slice", rateLimitedRd)
if err != nil {
return err
}
// 设置请求头
req.Header.Add("Authorization", "Bearer "+token)
req.Header.Add("Content-Type", w.FormDataContentType())
req.Header.Add("Platform", "open_platform")
res, err := base.HttpClient.Do(req)
if err != nil {
return err
}
defer res.Body.Close()
if res.StatusCode != 200 {
return fmt.Errorf("slice %d upload failed, status code: %d", partNumber, res.StatusCode)
}
b.Reset()
_, err = b.ReadFrom(res.Body)
if err != nil {
return err
}
var resp BaseResp
err = json.Unmarshal(b.Bytes(), &resp)
if err != nil {
return err
}
if resp.Code != 0 {
return fmt.Errorf("slice %d upload failed: %s", partNumber, resp.Message)
}
progress := 100 * float64(threadG.Success()+1) / float64(uploadNums+1)
up(progress)
return nil
},
After: func(err error) {
ss.FreeSectionReader(reader)
},
})
}
if err := threadG.Wait(); err != nil {
return err
}
return nil
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants