Skip to content

Commit 9462a2b

Browse files
author
willzhen
committed
Add videocut demo
1 parent b9ae6f3 commit 9462a2b

File tree

7 files changed

+492
-126
lines changed

7 files changed

+492
-126
lines changed

example/add_service/add_service.go

Lines changed: 0 additions & 126 deletions
This file was deleted.

example/videocut_example/main.go

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"log"
6+
"os"
7+
"strconv"
8+
"time"
9+
10+
lighttaskscheduler "github.com/memory-overflow/light-task-scheduler"
11+
memeorycontainer "github.com/memory-overflow/light-task-scheduler/container/memory_container"
12+
videocut "github.com/memory-overflow/light-task-scheduler/example/videocut_example/video_cut"
13+
)
14+
15+
func main() {
16+
go videocut.StartServer() // start video cut microservice
17+
18+
container := memeorycontainer.MakeQueueContainer(10000, 100*time.Millisecond)
19+
actuator := videocut.MakeVideoCutActuator()
20+
sch := lighttaskscheduler.MakeNewScheduler(
21+
context.Background(),
22+
container, actuator,
23+
lighttaskscheduler.Config{
24+
TaskLimit: 2,
25+
ScanInterval: 50 * time.Millisecond,
26+
TaskTimeout: 20 * time.Second, // 20s 超时
27+
},
28+
)
29+
30+
var c chan os.Signal
31+
for i := 100; i < 200; i += 10 {
32+
select {
33+
case <-c:
34+
return
35+
default:
36+
sch.AddTask(context.Background(),
37+
lighttaskscheduler.Task{
38+
TaskId: strconv.Itoa(i),
39+
TaskItem: videocut.VideoCutTask{
40+
InputVideo: "/data/workspace/ai-media/media-ai-ppl/video/benpaobaxiongdi_S2EP12_20150703.mp4",
41+
CutStartTime: 10,
42+
CutEndTime: float32(i),
43+
},
44+
})
45+
}
46+
}
47+
48+
for range c {
49+
log.Println("stop Scheduling")
50+
sch.Close()
51+
return
52+
}
53+
}
Lines changed: 192 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,192 @@
1+
package videocut
2+
3+
import (
4+
"context"
5+
"errors"
6+
"fmt"
7+
"log"
8+
"os"
9+
"sync"
10+
"time"
11+
12+
"github.com/memory-overflow/go-common-library/httpcall"
13+
framework "github.com/memory-overflow/light-task-scheduler"
14+
)
15+
16+
// VideoCutActuator 示例执行器
17+
type VideoCutActuator struct {
18+
EndPoint string
19+
}
20+
21+
// MakeVideoCutActuator 构造执行器
22+
func MakeVideoCutActuator() *VideoCutActuator {
23+
return &VideoCutActuator{
24+
EndPoint: "http://127.0.0.1:8000",
25+
}
26+
}
27+
28+
// Init 任务在被调度前的初始化工作
29+
func (v *VideoCutActuator) Init(ctx context.Context, ftask *framework.Task) (
30+
newTask *framework.Task, err error) {
31+
// 初始化检查任务参数
32+
task, ok := ftask.TaskItem.(VideoCutTask)
33+
if !ok {
34+
return ftask, fmt.Errorf("TaskItem not be set to AddTask")
35+
}
36+
if _, err := os.Stat(task.InputVideo); err != nil {
37+
return ftask, fmt.Errorf("InputVideo does not exist")
38+
}
39+
if task.CutStartTime > task.CutEndTime {
40+
return ftask, fmt.Errorf("error: CutStartTime after CutEndTime")
41+
}
42+
log.Println("hhhhh")
43+
return ftask, nil
44+
}
45+
46+
// Run 执行任务
47+
func (e *VideoCutActuator) Start(ctx context.Context, ftask *framework.Task) (
48+
newTask *framework.Task, ignoreErr bool, err error) {
49+
task, ok := ftask.TaskItem.(VideoCutTask)
50+
if !ok {
51+
return ftask, false, fmt.Errorf("TaskItem not be set to AddTask")
52+
}
53+
req := VideoCutReq{
54+
InputVideo: task.InputVideo,
55+
StartTime: task.CutStartTime,
56+
EndTIme: task.CutEndTime,
57+
}
58+
var rsp VideoCutRsp
59+
err = httpcall.JsonPost(ctx, e.EndPoint+"/VideoCut", nil, req, &rsp)
60+
if err != nil {
61+
return nil, true, err // 网络问题导致调度,忽略错误,重试调度
62+
}
63+
64+
task.TaskId = rsp.TaskId
65+
ftask.TaskItem = task
66+
log.Printf("[fTaskId:%s, cutTaskId:%s]start success\n", ftask.TaskId, rsp.TaskId)
67+
return ftask, false, nil
68+
}
69+
70+
// ExportOutput 导出任务输出,自行处理任务结果
71+
func (e *VideoCutActuator) ExportOutput(ctx context.Context, ftask *framework.Task) error {
72+
task, ok := ftask.TaskItem.(VideoCutTask)
73+
if !ok {
74+
return fmt.Errorf("TaskItem not be set to VideoCutTask")
75+
}
76+
req := StatusRequest{
77+
TaskId: task.TaskId,
78+
}
79+
var rsp GetOutputVideoResponse
80+
err := httpcall.JsonPost(ctx, e.EndPoint+"/GetOutputVideo", nil, req, &rsp)
81+
if err != nil {
82+
log.Printf("[fTaskId:%s, cutTaskId:%s]get video output error: %v\n",
83+
ftask.TaskId, task.TaskId, err)
84+
return err
85+
}
86+
if rsp.Reason != "" {
87+
err = fmt.Errorf("[TaskId:%s]get video output error: %s", task.TaskId, rsp.Reason)
88+
return err
89+
}
90+
log.Printf("[fTaskId:%s, cutTaskId:%s]finished cut %s from %g to %g, output: %s, timecost: %ds",
91+
ftask.TaskId, task.TaskId, task.InputVideo, task.CutStartTime, task.CutEndTime,
92+
rsp.OutputVideo, (time.Now().Unix() - ftask.TaskStartTime.Unix()))
93+
return nil
94+
}
95+
96+
// Stop 停止任务
97+
func (e *VideoCutActuator) Stop(ctx context.Context, ftask *framework.Task) error {
98+
task, ok := ftask.TaskItem.(VideoCutTask)
99+
if !ok {
100+
return fmt.Errorf("TaskItem not be set to VideoCutTask")
101+
}
102+
req := StopRequest{
103+
TaskId: task.TaskId,
104+
}
105+
var rsp StopResponse
106+
err := httpcall.JsonPost(ctx, e.EndPoint+"/Stop", nil, req, &rsp)
107+
if err != nil {
108+
log.Printf("[fTaskId:%s, cutTaskId:%s]stop task error: %v\n",
109+
ftask.TaskId, task.TaskId, err)
110+
return err
111+
}
112+
if rsp.Reason != "" {
113+
err = fmt.Errorf("[fTaskId:%s, cutTaskId:%s]stop task error: %s",
114+
ftask.TaskId, task.TaskId, rsp.Reason)
115+
return err
116+
}
117+
log.Printf("[fTaskId:%s, cutTaskId:%s]stop task success", ftask.TaskId, task.TaskId)
118+
return nil
119+
}
120+
121+
// GetAsyncTaskStatus 获取任务状态
122+
func (e *VideoCutActuator) GetAsyncTaskStatus(ctx context.Context, tasks []framework.Task) (
123+
status []framework.AsyncTaskStatus, err error) {
124+
status = make([]framework.AsyncTaskStatus, len(tasks))
125+
wg := sync.WaitGroup{}
126+
for i := range tasks {
127+
wg.Add(1)
128+
go func(index int) {
129+
defer wg.Done()
130+
task, ok := tasks[index].TaskItem.(VideoCutTask)
131+
if !ok {
132+
status[index] = framework.AsyncTaskStatus{
133+
TaskStatus: framework.TASK_STATUS_FAILED,
134+
FailedReason: errors.New("system error: TaskItem not be set to VideoCutTask"),
135+
Progress: float32(0.0),
136+
}
137+
return
138+
}
139+
140+
req := StatusRequest{
141+
TaskId: task.TaskId,
142+
}
143+
var rsp StatusResponse
144+
err := httpcall.JsonPost(ctx, e.EndPoint+"/Status", nil, req, &rsp)
145+
if err != nil {
146+
log.Printf("[fTaskId:%s, cutTaskId:%s]get task status: %v\n", tasks[index].TaskId, task.TaskId, err)
147+
status[index] = framework.AsyncTaskStatus{
148+
TaskStatus: framework.TASK_STATUS_RUNNING,
149+
Progress: float32(0.0),
150+
} // 网络错误,当成继续执行
151+
return
152+
}
153+
if rsp.Reason != "" {
154+
status[index] = framework.AsyncTaskStatus{
155+
TaskStatus: framework.TASK_STATUS_FAILED,
156+
FailedReason: fmt.Errorf("get task status failed: " + rsp.Reason),
157+
Progress: float32(0.0),
158+
}
159+
return
160+
}
161+
if rsp.Status == TASK_STATUS_RUNNING {
162+
status[index] = framework.AsyncTaskStatus{
163+
TaskStatus: framework.TASK_STATUS_RUNNING,
164+
Progress: float32(0.0),
165+
}
166+
} else if rsp.Status == TASK_STATUS_SUCCESS {
167+
status[index] = framework.AsyncTaskStatus{
168+
TaskStatus: framework.TASK_STATUS_SUCCESS,
169+
Progress: float32(100.0),
170+
}
171+
} else if rsp.Status == TASK_STATUS_FAILED {
172+
status[index] = framework.AsyncTaskStatus{
173+
TaskStatus: framework.TASK_STATUS_FAILED,
174+
Progress: float32(0.0),
175+
}
176+
}
177+
}(i)
178+
}
179+
wg.Wait()
180+
return status, nil
181+
}
182+
183+
// GetOutput 提供业务查询任务结果的接口
184+
func (e *VideoCutActuator) GetOutput(ctx context.Context, ftask *framework.Task) (
185+
data interface{}, err error) {
186+
return nil, nil
187+
}
188+
189+
// GetOutput ...
190+
func (e *VideoCutActuator) Delete(ctx context.Context, ftask *framework.Task) (err error) {
191+
return e.Stop(ctx, ftask)
192+
}

0 commit comments

Comments
 (0)