Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 92 additions & 13 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,17 +65,96 @@ type taskJob struct {

example:
```
t1 := &Task{
Job: getJob(f),
RunTime: time.Now().UnixNano() + int64(time.Second)*1,
Spacing: int64(3 * time.Second),
EndTime: time.Now().UnixNano() + int64(time.Second*20),
Uuid: "123",
}
f1 := func(reply Reply) {
fmt.Println(reply)
fmt.Println("It's reply")
}
t1.GetJob().OnStart(f1)
cron.AddTask(t1)
cron := GetTaskScheduler()
cron.Start()
f := func() {
fmt.Println("now is run job")
time.Sleep(1 * time.Second)
fmt.Println("now job success")
}
t1 := &Task{
Job: getJob(f),
RunTime: time.Now().UnixNano() + int64(time.Second)*1,
Spacing: int64(3 * time.Second),
EndTime: time.Now().UnixNano() + int64(time.Second*20),
Uuid: "123",
}
f1 := func(reply Reply) {
log.Println("task uuid:" + reply.Ts.GetUuid() + " run start")
log.Println("task uuid:" + reply.Ts.GetUuid() + " start time" + utils.GetTimeString())
}
t1.GetJob().OnStart(f1)
t1.GetJob().OnFinish(func(reply Reply) {
log.Println("task uuid:" + reply.Ts.GetUuid() + "success")
log.Println("task uuid:" + reply.Ts.GetUuid() + " finish time" + utils.GetTimeString())
})
cron.AddTask(t1)

timer := time.NewTimer(10 * time.Second)
for {
select {
case <-timer.C:
fmt.Println("over")
}
break
}
```

执行结果
```
=== RUN TestJobEvent
2021/03/03 18:03:52 task uuid:test run start
2021/03/03 18:03:52 task uuid:test start time2021-03-03 18:03:52
now is run job
now job success
2021/03/03 18:03:53 task uuid:testsuccess
2021/03/03 18:03:53 task uuid:test finish time2021-03-03 18:03:53
2021/03/03 18:03:56 task uuid:test run start
2021/03/03 18:03:56 task uuid:test start time2021-03-03 18:03:56
now is run job
now job success
2021/03/03 18:03:57 task uuid:testsuccess
2021/03/03 18:03:57 task uuid:test finish time2021-03-03 18:03:57
2021/03/03 18:04:00 task uuid:test run start
2021/03/03 18:04:00 task uuid:test start time2021-03-03 18:04:00
now is run job
over
--- PASS: TestJobEvent (10.00s)
```

手动停止正在执行的任务:
```
cron := GetTaskScheduler()
cron.Start()
f := func() {
fmt.Println("now is run job")
time.Sleep(1 * time.Second)
}
t1 := &Task{
Job: getJob(f),
RunTime: time.Now().UnixNano() + int64(time.Second)*1,
Spacing: int64(2 * time.Second),
EndTime: time.Now().UnixNano() + int64(time.Second*20),
Uuid: "123",
}
f1 := func(reply Reply) {
log.Println("task uuid:" + reply.Ts.GetUuid() + " stop time" + utils.GetTimeString())
}
t1.GetJob().OnStop(f1)
cron.AddTask(t1)

go func() {
t2 := time.NewTimer(2 * time.Second)
<-t2.C
cron.StopOnce("123")
}()
```

执行结果
```
=== RUN TestJobStopEvent
now is run job
2021/03/03 18:07:22 task uuid:123 stop time2021-03-03 18:07:22
over
--- PASS: TestJobStopEvent (10.00s)
```
34 changes: 15 additions & 19 deletions init.go
Original file line number Diff line number Diff line change
@@ -1,30 +1,26 @@
package timer

import (
"log"
"os"
"sync"
"log"
"os"
"sync"
)

var successResult = Reply{
Code:200,
Msg:"操作成功",
Err:nil,
}

var TS *TaskScheduler
var stop chan string

func init() {
TS = &TaskScheduler{
tasks: new(sync.Map),
running:new(sync.Map),
add: make(chan TaskInterface),
stop: make(chan struct{}),
remove: make(chan string),
Logger: log.New(os.Stdout, "[Control]: ", log.Ldate|log.Ltime|log.Lshortfile),
}
func init() {
TS = &TaskScheduler{
tasks: new(sync.Map),
running: new(sync.Map),
add: make(chan TaskInterface), //添加新任务信号
stop: make(chan struct{}), //终止主进程
remove: make(chan string), //添加子进程
Logger: log.New(os.Stdout, "[Control]: ", log.Ldate|log.Ltime|log.Lshortfile),
}
stop = make(chan string) //手动关闭正在执行的任务信号
}

func GetTaskScheduler() *TaskScheduler {
return TS
return TS
}
13 changes: 13 additions & 0 deletions interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,17 @@ type TaskGetInterface interface{

type TaskLogInterface interface {
Println(v ...interface{})
Printf(format string, v ...interface{})
Fatal(v ...interface{})
Fatalf(format string, v ...interface{})
Panic(v ...interface{})
Panicf(format string, v ...interface{})
}

type LogInterface interface {

Debug(format string, v ...interface{})
Info(format string, v ...interface{})
Warn(format string, v ...interface{})
Fatal(format string, v ...interface{})
}
133 changes: 65 additions & 68 deletions job.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,19 @@
package timer

import (
"github.com/pkg/errors"
"runtime"
"time"
"context"
"errors"
"runtime"
"time"
)

//default Job
type taskJob struct {
Fn func()
err chan error
done chan bool
stop chan bool
finish chan interface{}
replies map[string]func(reply Reply)
ctx context.Context
cancel context.CancelFunc
Task TaskInterface
}

Expand Down Expand Up @@ -40,87 +41,83 @@ func (j *taskJob) OnError(f func(reply Reply)) {
j.replies["error"] = f
}

func (j *taskJob) run() {

isPanic := false
defer func() {
if x := recover(); x != nil {
err := errors.Errorf("job error with panic:%v", x)
j.err <- err
isPanic = true
return
}
}()
func (j *taskJob) loop() {

task := j.GetTask()
loop := false
now := time.Now().UnixNano()
spacing := task.GetSpacing()
if task.GetRunNumber() > 1 {
task.SetRunNumber(task.GetRunNumber() - 1)
loop = true
} else if task.GetEndTime() > now && spacing > 0 {
loop = true
}

defer func() {
if !isPanic {
j.done <- true
if loop {
if spacing > 0 {
//must use now time
//task.SetRuntime(task.GetRunTime() + task.GetSpacing())
task.SetRuntime(now + spacing)
TS.addTaskChannel(task)
}
}()
j.Fn()
}
}

func (j *taskJob) Stop() {
j.stop <- true
}

//run job and catch error
func (j *taskJob) Run() {
if f, ok := j.replies["start"]; ok {
f(Reply{})
f(GetReply(j.Task, "-1", "start running", nil))
}

go j.run()
go func() {
defer func() {
e := recover()
if e != nil {
if f, ok := j.replies["error"]; ok {
f(getDefaultErrorReply(j.Task, panicToError(e)))
}
} else {
j.finish <- true
}
}()
j.Fn()
}()
for {
select {
case e := <-j.err:
if f, ok := j.replies["error"]; ok {
reply := Reply{
Code: 500,
Msg: e.Error(),
Err: e,
}
f(reply)
//获取到终止信号
case <-j.ctx.Done():
if f, ok := j.replies["stop"]; ok {
f(GetReply(j.Task, "-1", "stop running", nil))
}
j.close()
return
case <-j.done:
case <-j.finish:
if f, ok := j.replies["finish"]; ok {
f(successResult)
f(getDefaultSuccessReply(j.Task))
}

task := j.GetTask()
loop := false
now := time.Now().UnixNano()
if task.GetRunNumber() > 1 {
task.SetRunNumber(task.GetRunNumber() - 1)
loop = true
} else if task.GetEndTime() > now {
loop = true
}

if loop {
spacing := task.GetSpacing()
if spacing > 0 {
//must use now time
//task.SetRuntime(task.GetRunTime() + task.GetSpacing())
task.SetRuntime(now + spacing)
TS.addTaskChannel(task)
}
}else {
j.close(false)
}
j.loop()
j.close()
return
case <-j.stop:
j.close(true)
}
}
}

func (j *taskJob) close(exit bool) {
close(j.done)
close(j.err)
close(j.stop)
if exit {
runtime.Goexit()
}
func (j *taskJob) close() {
runtime.Goexit()
}

func panicToError(r interface{}) error {
var err error
//check exactly what the panic was and create error.
switch x := r.(type) {
case string:
err = errors.New(x)
case error:
err = x
default:
err = errors.New("UnKnow panic")
Copy link

Copilot AI Apr 25, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Possible spelling mistake: consider changing 'UnKnow panic' to 'Unknown panic' in the panic error message.

Suggested change
err = errors.New("UnKnow panic")
err = errors.New("Unknown panic")

Copilot uses AI. Check for mistakes.
}
return err
}
34 changes: 34 additions & 0 deletions reply.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package timer

var successReply = Reply{
Code: "200",
Msg: "执行成功",
Err: nil,
}

func getDefaultSuccessReply(task TaskInterface) Reply {
return Reply{
Code: "200",
Msg: "执行成功",
Err: nil,
Ts: task,
}
}

func getDefaultErrorReply(task TaskInterface, err error) Reply {
return Reply{
Code: "-1",
Msg: "执行失败",
Err: err,
Ts: task,
}
}

func GetReply(task TaskInterface, Code, Msg string, err error) Reply {
return Reply{
Code: Code,
Msg: "执行失败",
Err: err,
Ts: task,
}
}
Loading