diff --git a/README.md b/README.md index 0bdf1d5..c453f3b 100644 --- a/README.md +++ b/README.md @@ -65,17 +65,96 @@ type taskJob struct { example: ``` - t1 := &Task{ - Job: getJob(f), - RunTime: time.Now().UnixNano() + int64(time.Second)*1, - Spacing: int64(3 * time.Second), - EndTime: time.Now().UnixNano() + int64(time.Second*20), - Uuid: "123", - } - f1 := func(reply Reply) { - fmt.Println(reply) - fmt.Println("It's reply") - } - t1.GetJob().OnStart(f1) - cron.AddTask(t1) +cron := GetTaskScheduler() + cron.Start() + f := func() { + fmt.Println("now is run job") + time.Sleep(1 * time.Second) + fmt.Println("now job success") + } + t1 := &Task{ + Job: getJob(f), + RunTime: time.Now().UnixNano() + int64(time.Second)*1, + Spacing: int64(3 * time.Second), + EndTime: time.Now().UnixNano() + int64(time.Second*20), + Uuid: "123", + } + f1 := func(reply Reply) { + log.Println("task uuid:" + reply.Ts.GetUuid() + " run start") + log.Println("task uuid:" + reply.Ts.GetUuid() + " start time" + utils.GetTimeString()) + } + t1.GetJob().OnStart(f1) + t1.GetJob().OnFinish(func(reply Reply) { + log.Println("task uuid:" + reply.Ts.GetUuid() + "success") + log.Println("task uuid:" + reply.Ts.GetUuid() + " finish time" + utils.GetTimeString()) + }) + cron.AddTask(t1) + + timer := time.NewTimer(10 * time.Second) + for { + select { + case <-timer.C: + fmt.Println("over") + } + break + } +``` + +执行结果 +``` +=== RUN TestJobEvent +2021/03/03 18:03:52 task uuid:test run start +2021/03/03 18:03:52 task uuid:test start time2021-03-03 18:03:52 +now is run job +now job success +2021/03/03 18:03:53 task uuid:testsuccess +2021/03/03 18:03:53 task uuid:test finish time2021-03-03 18:03:53 +2021/03/03 18:03:56 task uuid:test run start +2021/03/03 18:03:56 task uuid:test start time2021-03-03 18:03:56 +now is run job +now job success +2021/03/03 18:03:57 task uuid:testsuccess +2021/03/03 18:03:57 task uuid:test finish time2021-03-03 18:03:57 +2021/03/03 18:04:00 task uuid:test run start +2021/03/03 18:04:00 task uuid:test start time2021-03-03 18:04:00 +now is run job +over +--- PASS: TestJobEvent (10.00s) +``` + +手动停止正在执行的任务: +``` +cron := GetTaskScheduler() + cron.Start() + f := func() { + fmt.Println("now is run job") + time.Sleep(1 * time.Second) + } + t1 := &Task{ + Job: getJob(f), + RunTime: time.Now().UnixNano() + int64(time.Second)*1, + Spacing: int64(2 * time.Second), + EndTime: time.Now().UnixNano() + int64(time.Second*20), + Uuid: "123", + } + f1 := func(reply Reply) { + log.Println("task uuid:" + reply.Ts.GetUuid() + " stop time" + utils.GetTimeString()) + } + t1.GetJob().OnStop(f1) + cron.AddTask(t1) + + go func() { + t2 := time.NewTimer(2 * time.Second) + <-t2.C + cron.StopOnce("123") + }() +``` + +执行结果 +``` +=== RUN TestJobStopEvent +now is run job +2021/03/03 18:07:22 task uuid:123 stop time2021-03-03 18:07:22 +over +--- PASS: TestJobStopEvent (10.00s) ``` \ No newline at end of file diff --git a/init.go b/init.go index 453a6eb..2dde71c 100644 --- a/init.go +++ b/init.go @@ -1,30 +1,26 @@ package timer import ( - "log" - "os" - "sync" + "log" + "os" + "sync" ) -var successResult = Reply{ - Code:200, - Msg:"操作成功", - Err:nil, -} - var TS *TaskScheduler +var stop chan string -func init() { - TS = &TaskScheduler{ - tasks: new(sync.Map), - running:new(sync.Map), - add: make(chan TaskInterface), - stop: make(chan struct{}), - remove: make(chan string), - Logger: log.New(os.Stdout, "[Control]: ", log.Ldate|log.Ltime|log.Lshortfile), - } +func init() { + TS = &TaskScheduler{ + tasks: new(sync.Map), + running: new(sync.Map), + add: make(chan TaskInterface), //添加新任务信号 + stop: make(chan struct{}), //终止主进程 + remove: make(chan string), //添加子进程 + Logger: log.New(os.Stdout, "[Control]: ", log.Ldate|log.Ltime|log.Lshortfile), + } + stop = make(chan string) //手动关闭正在执行的任务信号 } func GetTaskScheduler() *TaskScheduler { - return TS + return TS } diff --git a/interface.go b/interface.go index b166993..03ad347 100644 --- a/interface.go +++ b/interface.go @@ -28,4 +28,17 @@ type TaskGetInterface interface{ type TaskLogInterface interface { Println(v ...interface{}) + Printf(format string, v ...interface{}) + Fatal(v ...interface{}) + Fatalf(format string, v ...interface{}) + Panic(v ...interface{}) + Panicf(format string, v ...interface{}) +} + +type LogInterface interface { + + Debug(format string, v ...interface{}) + Info(format string, v ...interface{}) + Warn(format string, v ...interface{}) + Fatal(format string, v ...interface{}) } diff --git a/job.go b/job.go index 32f5682..8f51100 100644 --- a/job.go +++ b/job.go @@ -1,18 +1,19 @@ package timer import ( - "github.com/pkg/errors" - "runtime" - "time" + "context" + "errors" + "runtime" + "time" ) //default Job type taskJob struct { Fn func() - err chan error - done chan bool - stop chan bool + finish chan interface{} replies map[string]func(reply Reply) + ctx context.Context + cancel context.CancelFunc Task TaskInterface } @@ -40,87 +41,83 @@ func (j *taskJob) OnError(f func(reply Reply)) { j.replies["error"] = f } -func (j *taskJob) run() { - - isPanic := false - defer func() { - if x := recover(); x != nil { - err := errors.Errorf("job error with panic:%v", x) - j.err <- err - isPanic = true - return - } - }() +func (j *taskJob) loop() { + + task := j.GetTask() + loop := false + now := time.Now().UnixNano() + spacing := task.GetSpacing() + if task.GetRunNumber() > 1 { + task.SetRunNumber(task.GetRunNumber() - 1) + loop = true + } else if task.GetEndTime() > now && spacing > 0 { + loop = true + } - defer func() { - if !isPanic { - j.done <- true + if loop { + if spacing > 0 { + //must use now time + //task.SetRuntime(task.GetRunTime() + task.GetSpacing()) + task.SetRuntime(now + spacing) + TS.addTaskChannel(task) } - }() - j.Fn() -} + } -func (j *taskJob) Stop() { - j.stop <- true } //run job and catch error func (j *taskJob) Run() { if f, ok := j.replies["start"]; ok { - f(Reply{}) + f(GetReply(j.Task, "-1", "start running", nil)) } - go j.run() + go func() { + defer func() { + e := recover() + if e != nil { + if f, ok := j.replies["error"]; ok { + f(getDefaultErrorReply(j.Task, panicToError(e))) + } + } else { + j.finish <- true + } + }() + j.Fn() + }() for { select { - case e := <-j.err: - if f, ok := j.replies["error"]; ok { - reply := Reply{ - Code: 500, - Msg: e.Error(), - Err: e, - } - f(reply) + //获取到终止信号 + case <-j.ctx.Done(): + if f, ok := j.replies["stop"]; ok { + f(GetReply(j.Task, "-1", "stop running", nil)) } + j.close() return - case <-j.done: + case <-j.finish: if f, ok := j.replies["finish"]; ok { - f(successResult) + f(getDefaultSuccessReply(j.Task)) } - - task := j.GetTask() - loop := false - now := time.Now().UnixNano() - if task.GetRunNumber() > 1 { - task.SetRunNumber(task.GetRunNumber() - 1) - loop = true - } else if task.GetEndTime() > now { - loop = true - } - - if loop { - spacing := task.GetSpacing() - if spacing > 0 { - //must use now time - //task.SetRuntime(task.GetRunTime() + task.GetSpacing()) - task.SetRuntime(now + spacing) - TS.addTaskChannel(task) - } - }else { - j.close(false) - } + j.loop() + j.close() return - case <-j.stop: - j.close(true) } } } -func (j *taskJob) close(exit bool) { - close(j.done) - close(j.err) - close(j.stop) - if exit { - runtime.Goexit() - } +func (j *taskJob) close() { + runtime.Goexit() +} + +func panicToError(r interface{}) error { + var err error + //check exactly what the panic was and create error. + switch x := r.(type) { + case string: + err = errors.New(x) + case error: + err = x + default: + err = errors.New("UnKnow panic") + } + return err } diff --git a/reply.go b/reply.go new file mode 100644 index 0000000..b30b5a9 --- /dev/null +++ b/reply.go @@ -0,0 +1,34 @@ +package timer + +var successReply = Reply{ + Code: "200", + Msg: "执行成功", + Err: nil, +} + +func getDefaultSuccessReply(task TaskInterface) Reply { + return Reply{ + Code: "200", + Msg: "执行成功", + Err: nil, + Ts: task, + } +} + +func getDefaultErrorReply(task TaskInterface, err error) Reply { + return Reply{ + Code: "-1", + Msg: "执行失败", + Err: err, + Ts: task, + } +} + +func GetReply(task TaskInterface, Code, Msg string, err error) Reply { + return Reply{ + Code: Code, + Msg: "执行失败", + Err: err, + Ts: task, + } +} diff --git a/struct.go b/struct.go index 0fbbf6d..6acf525 100644 --- a/struct.go +++ b/struct.go @@ -3,33 +3,32 @@ package timer import "sync" type Reply struct { - Code int64 `json:"code"` - Msg string `json:"msg"` - Err error `json:"err"` - Ts Task `json:"task"` + Code string `json:"code"` + Msg string `json:"msg"` + Err error `json:"err"` + Ts TaskInterface `json:"task"` } type TaskScheduler struct { - tasks *sync.Map - running *sync.Map - add chan TaskInterface - remove chan string - stop chan struct{} - Logger TaskLogInterface + tasks *sync.Map + running *sync.Map + add chan TaskInterface + remove chan string + stop chan struct{} + Logger TaskLogInterface } //need to do task has interface Job type Task struct { - Job *taskJob `json:"job"` - Uuid string `json:"uuid"` - RunTime int64 `json:"run_time"`//UnixNanoTime - Spacing int64 `json:"spacing"`//spacing sencond - EndTime int64 `json:"end_time"`//UnixNanoTime - Number int `json:"number"`//exec number - Status int `json:"status"` + Job *taskJob `json:"job"` + Uuid string `json:"uuid"` + RunTime int64 `json:"run_time"` //UnixNanoTime + Spacing int64 `json:"spacing"` //spacing sencond + EndTime int64 `json:"end_time"` //UnixNanoTime + Number int `json:"number"` //exec number + Status int `json:"status"` } - // //func GetSuccessResult(msg string) Reply { // if msg != "" { @@ -44,4 +43,4 @@ type Task struct { // Msg:msg, // Err:err, // } -//} \ No newline at end of file +//} diff --git a/task.go b/task.go index 53d2a91..b315e85 100644 --- a/task.go +++ b/task.go @@ -1,6 +1,7 @@ package timer import ( + "context" "fmt" "github.com/google/uuid" "time" @@ -80,14 +81,18 @@ func (task *Task) GetStatus() int { func GetJob(f func()) *taskJob { return getJob(f) } + //get a new Job func getJob(f func()) *taskJob { + ctx := context.Background() + ctx, cancel := context.WithCancel(ctx) + return &taskJob{ Fn: f, - stop: make(chan bool, 1), - err: make(chan error, 1), - done: make(chan bool, 1), + finish: make(chan interface{}), replies: make(map[string]func(reply Reply)), + ctx: ctx, + cancel: cancel, } } diff --git a/task_test.go b/task_test.go index 1ff9abc..b45b1b8 100644 --- a/task_test.go +++ b/task_test.go @@ -2,6 +2,8 @@ package timer import ( "fmt" + "github.com/google/uuid" + "github.com/grestful/utils" "log" "os" "testing" @@ -32,20 +34,24 @@ func TestTime(t *testing.T) { func Test_AddFunc(t *testing.T) { cron := GetTaskScheduler() - go cron.Start() - + cron.Start() cron.AddFunc(time.Now().UnixNano()+int64(time.Second*1), func() { fmt.Println("one second after") }) cron.AddFunc(time.Now().UnixNano()+int64(time.Second*1), func() { - fmt.Println("one second after, task second") + fmt.Println("one second after 1, task second") }) cron.AddFunc(time.Now().UnixNano()+int64(time.Second*10), func() { fmt.Println("ten second after") }) + cron.tasks.Range(func(key, value interface{}) bool { + fmt.Println(key, value) + return true + }) + timer := time.NewTimer(11 * time.Second) for { select { @@ -75,8 +81,8 @@ func Test_AddFuncSpace(t *testing.T) { }) cron.AddFuncSpaceNumber(int64(time.Second*1), 10, func() { - fmt.Println("number 10") - }) + fmt.Println("number 10") + }) timer := time.NewTimer(11 * time.Second) for { select { @@ -123,26 +129,87 @@ func Test_AddTask(t *testing.T) { } } -func Test_JobStartEvent(t *testing.T) { +func TestJobEvent(t *testing.T) { cron := GetTaskScheduler() cron.Start() f := func() { - fmt.Println("hello") + fmt.Println("now is run job") + time.Sleep(1 * time.Second) + fmt.Println("now job success") } t1 := &Task{ Job: getJob(f), RunTime: time.Now().UnixNano() + int64(time.Second)*1, Spacing: int64(3 * time.Second), EndTime: time.Now().UnixNano() + int64(time.Second*20), - Uuid: "123", + Uuid: "test", } f1 := func(reply Reply) { - fmt.Println(reply) - fmt.Println("It's reply") + log.Println("task uuid:" + reply.Ts.GetUuid() + " run start") + log.Println("task uuid:" + reply.Ts.GetUuid() + " start time" + utils.GetTimeString()) } t1.GetJob().OnStart(f1) + t1.GetJob().OnFinish(func(reply Reply) { + log.Println("task uuid:" + reply.Ts.GetUuid() + "success") + log.Println("task uuid:" + reply.Ts.GetUuid() + " finish time" + utils.GetTimeString()) + }) + cron.AddTask(t1) + + timer := time.NewTimer(10 * time.Second) + for { + select { + case <-timer.C: + fmt.Println("over") + } + break + } +} + +func TestTaskLoop(t *testing.T) { + fmt.Println(uuid.New().String()) + fmt.Println(uuid.New().String()) + fmt.Println(uuid.New().String()) + fmt.Println(uuid.New().String()) + fmt.Println(uuid.New().String()) + + //f := func() { + // fmt.Println("hello") + //} + //t1 := &Task{ + // Job: getJob(f), + // RunTime: time.Now().UnixNano() + int64(time.Second)*1, + // Spacing: int64(3 * time.Second), + // EndTime: time.Now().UnixNano() + int64(time.Second*20), + // Uuid: "123", + //} +} + +func TestJobStopEvent(t *testing.T) { + cron := GetTaskScheduler() + cron.Start() + f := func() { + fmt.Println("now is run job") + time.Sleep(1 * time.Second) + } + t1 := &Task{ + Job: getJob(f), + RunTime: time.Now().UnixNano() + int64(time.Second)*1, + Spacing: int64(2 * time.Second), + EndTime: time.Now().UnixNano() + int64(time.Second*20), + Uuid: "123", + } + f1 := func(reply Reply) { + log.Println("task uuid:" + reply.Ts.GetUuid() + " stop time" + utils.GetTimeString()) + } + t1.GetJob().OnStop(f1) cron.AddTask(t1) + go func() { + t2 := time.NewTimer(2 * time.Second) + <-t2.C + cron.StopOnce("123") + }() + timer := time.NewTimer(10 * time.Second) for { select { diff --git a/timer.go b/timer.go index e86db3d..92a4fcd 100644 --- a/timer.go +++ b/timer.go @@ -1,216 +1,247 @@ package timer import ( - "fmt" - "github.com/google/uuid" - "log" - "time" + "fmt" + "github.com/google/uuid" + "time" ) - //add spacing time job to list with number func (scheduler *TaskScheduler) AddFuncSpaceNumber(spaceTime int64, number int, f func()) { - task := getTaskWithFuncSpacingNumber(spaceTime, number, f) - scheduler.AddTask(task) + task := getTaskWithFuncSpacingNumber(spaceTime, number, f) + scheduler.AddTask(task) } + //add spacing time job to list with endTime //spaceTime is nano time func (scheduler *TaskScheduler) AddFuncSpace(spaceTime int64, endTime int64, f func()) { - task := getTaskWithFuncSpacing(spaceTime, endTime, f) - scheduler.AddTask(task) + task := getTaskWithFuncSpacing(spaceTime, endTime, f) + scheduler.AddTask(task) } //add func to list func (scheduler *TaskScheduler) AddFunc(unixTime int64, f func()) { - task := getTaskWithFunc(unixTime, f) - scheduler.AddTask(task) + task := getTaskWithFunc(unixTime, f) + scheduler.AddTask(task) } func (scheduler *TaskScheduler) AddTaskInterface(task TaskInterface) { - scheduler.addTaskChannel(task) + scheduler.addTaskChannel(task) } //add a task to list func (scheduler *TaskScheduler) AddTask(task *Task) string { - now := time.Now().UnixNano() - if task.RunTime != 0 { - if task.RunTime < 9999999999 { - task.RunTime = task.RunTime * int64(time.Second) - } - if task.RunTime <= now { - - if task.Spacing > 0 { - task.RunTime = now + task.Spacing - }else { - //延遲1秒 - task.RunTime = now + int64(time.Second) - } - } - } else { - if task.Spacing > 0 { - task.RunTime = now + task.Spacing - }else{ - scheduler.Logger.Println("error too add task! Runtime error") - return "" - } - } - - if task.Uuid == "" { - task.Uuid = uuid.New().String() - } - scheduler.addTaskChannel(task) - return task.GetUuid() -} - -func (scheduler *TaskScheduler) addTask(task TaskInterface) string { - scheduler.tasks.Store(task.GetUuid(), task) - return task.GetUuid() -} - -func (scheduler *TaskScheduler) addTaskChannel(task TaskInterface) { - scheduler.add <- task + now := time.Now().UnixNano() + if task.RunTime != 0 { + if task.RunTime < 9999999999 { + task.RunTime = task.RunTime * int64(time.Second) + } + if task.RunTime <= now { + if task.Spacing > 0 { + task.RunTime = now + task.Spacing + } else { + //延遲1秒 + task.RunTime = now + int64(time.Second) + } + } + } else { + if task.Spacing > 0 { + task.RunTime = now + task.Spacing + } else { + scheduler.Logger.Println("error too add task! Runtime error") + return "" + } + } + + if task.Uuid == "" { + task.Uuid = uuid.New().String() + } + go scheduler.addTaskChannel(task) + return task.GetUuid() +} + +func (scheduler *TaskScheduler) addTask(task TaskInterface) string { + scheduler.tasks.Store(task.GetUuid(), task) + return task.GetUuid() } + +func (scheduler *TaskScheduler) addTaskChannel(task TaskInterface) { + scheduler.add <- task +} + //new export func (scheduler *TaskScheduler) ExportInterface() []TaskInterface { - tasks := make([]TaskInterface, 0) - scheduler.tasks.Range(func(key, value interface{}) bool { - switch value.(type) { - case TaskInterface: - tasks = append(tasks, value.(TaskInterface)) - } - - return true - }) - return tasks + tasks := make([]TaskInterface, 0) + scheduler.tasks.Range(func(key, value interface{}) bool { + switch value.(type) { + case TaskInterface: + tasks = append(tasks, value.(TaskInterface)) + } + + return true + }) + return tasks } + //compatible old export tasks func (scheduler *TaskScheduler) Export() []*Task { - tasks := make([]*Task,0) - scheduler.tasks.Range(func(key, value interface{}) bool { - switch value.(type) { - case *Task: - tasks = append(tasks, value.(*Task)) - } - - return true - }) - return tasks + tasks := make([]*Task, 0) + scheduler.tasks.Range(func(key, value interface{}) bool { + switch value.(type) { + case *Task: + tasks = append(tasks, value.(*Task)) + } + + return true + }) + return tasks } //stop task with uuid func (scheduler *TaskScheduler) StopOnce(uuidStr string) { - scheduler.remove <- uuidStr + scheduler.remove <- uuidStr } //run Cron func (scheduler *TaskScheduler) Start() { - //初始化的時候加入一個一年的長定時器,間隔1小時執行一次 - task := getTaskWithFuncSpacing(int64(3600*time.Second), time.Now().Add(time.Hour * 24 * 365).UnixNano(), func() { - log.Println("It's a Hour timer!") - }) - scheduler.tasks.Store(task.Uuid, task) - go scheduler.run() + //初始化的時候加入一個一年的長定時器,間隔1小時執行一次 + //task := getTaskWithFuncSpacing(int64(3600*time.Second), time.Now().Add(time.Hour*24*365).UnixNano(), func() { + // scheduler.Logger.Println("It's a Hour timer!") + //}) + //scheduler.tasks.Store(task.Uuid, task) + go scheduler.run() } //stop all func (scheduler *TaskScheduler) Stop() { - scheduler.stop <- struct{}{} + scheduler.stop <- struct{}{} } //run task list //if is empty, run a year timer task func (scheduler *TaskScheduler) run() { - for { - now := time.Now() - task := scheduler.GetTask() - var d time.Duration - if task == nil { - d = 1 * time.Second - }else{ - task.GetJob().SetTask(task) - runTime := task.GetRunTime() - i64 := runTime - now.UnixNano() - if i64 < 0 { - if task.GetSpacing() > 0 { - task.SetRuntime(now.UnixNano() + task.GetSpacing()) - }else{ - task.SetRuntime(now.UnixNano() + int64(time.Second)) - } - task.SetStatus(1) - go task.RunJob() - continue - } else { - sec := runTime / int64(time.Second) - nsec := runTime % int64(time.Second) - - d = time.Unix(sec, nsec).Sub(now) - } - } - - fmt.Println(d) - timer := time.NewTimer(d) - - //catch a chan and do something - for { - select { - //if time has expired do task and shift key if is task list - case <-timer.C: - //not get remove,just run - scheduler.removeTask(task.GetUuid()) - go task.RunJob() - timer.Stop() - //if add task - case t1 := <-scheduler.add: - scheduler.addTask(t1) - timer.Stop() - // remove task with remove uuid - case uuidStr := <-scheduler.remove: - scheduler.removeTask(uuidStr) - timer.Stop() - //if get a stop single exit - case <-scheduler.stop: - timer.Stop() - return - } - - break - } - } + for { + now := time.Now() + task := scheduler.GetTask() + var d time.Duration + if task != nil { + task.GetJob().SetTask(task) + runTime := task.GetRunTime() + i64 := runTime - now.UnixNano() + if i64 < 0 { + //执行并移除 + scheduler.runJob(task) + continue + } else { + sec := runTime / int64(time.Second) + nsec := runTime % int64(time.Second) + d = time.Unix(sec, nsec).Sub(now) + } + } else { + //休眠一秒等待 + d = time.Second + } + timer := time.NewTimer(d) + + //catch a chan and do something + for { + select { + //if time has expired do task and shift key if is task list + case <-timer.C: + if task == nil { + fmt.Println("wait......") + continue + } + scheduler.runJob(task) + timer.Stop() + //if add task + case t1 := <-scheduler.add: + scheduler.addTask(t1) + timer.Stop() + // remove task with remove uuid + case uuidStr := <-scheduler.remove: + scheduler.removeTask(uuidStr) + scheduler.removeAndStopRunningTask(uuidStr) + timer.Stop() + //if get a stop single exit + case <-scheduler.stop: + timer.Stop() + return + default: + + } + + break + } + } +} + +func (scheduler *TaskScheduler) runJob(task TaskInterface) { + scheduler.removeTask(task.GetUuid()) + scheduler.addRunningTask(task) + task.SetStatus(1) + go task.RunJob() } //return a task and key In task list func (scheduler *TaskScheduler) GetTask() (task TaskInterface) { - var min int64 = 0 - scheduler.tasks.Range(func(key, value interface{}) bool { - switch value.(type) { - case TaskInterface: - - t := value.(TaskInterface) - runTime := t.GetRunTime() - if min == 0 { - min = runTime - task = t - } else { - if min > runTime { - min = runTime - task = t - } - } - } - - return true - }) - - //if task != nil { - // scheduler.removeTask(task.GetUuid()) - //} - return task + var min int64 = 0 + //fmt.Println(scheduler.tasks) + scheduler.tasks.Range(func(key, value interface{}) bool { + //fmt.Println(key,value,321312312) + switch value.(type) { + case TaskInterface: + + t := value.(TaskInterface) + runTime := t.GetRunTime() + if min == 0 { + min = runTime + task = t + } else { + if min > runTime { + min = runTime + task = t + } + } + //default: + // fmt.Println(value, "vvvvvvvvvvvvvvvvvvvvv") + } + + return true + }) + + // + //scheduler.tasks.Range(func(key, value interface{}) bool { + // fmt.Println(key, value, "=============================") + // return true + //}) + return task } //remove task by uuid func (scheduler *TaskScheduler) removeTask(uuidStr string) { - scheduler.tasks.Delete(uuidStr) + scheduler.tasks.Delete(uuidStr) } +//add running by uuid +func (scheduler *TaskScheduler) addRunningTask(task TaskInterface) { + scheduler.running.Store(task.GetUuid(), task) +} + +//remove running by uuid +func (scheduler *TaskScheduler) removeRunningTask(uuidStr string) { + scheduler.running.Delete(uuidStr) +} +//remove running by uuid +func (scheduler *TaskScheduler) removeAndStopRunningTask(uuidStr string) { + scheduler.running.Range(func(key, value interface{}) bool { + if key == uuidStr { + (value.(TaskInterface)).GetJob().cancel() + return false + } + return true + }) + scheduler.removeRunningTask(uuidStr) +}