-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtask.go
More file actions
100 lines (91 loc) · 2.16 KB
/
task.go
File metadata and controls
100 lines (91 loc) · 2.16 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
package main
import (
"sync/atomic"
protos "github.com/Pixelgaffer/dico-proto"
"github.com/golang/protobuf/proto"
log "github.com/Sirupsen/logrus"
)
var currTaskID int64
// Task is executed by an available worker
type Task struct {
options string
id int64
retries int64
failed bool
worker *Worker
}
func (t *Task) reportStatus(typ protos.TaskStatus_TaskStatusUpdate) {
m := &protos.TaskStatus{
Id: proto.Int64(t.id),
Options: proto.String(t.options),
Retries: proto.Int64(t.retries),
}
m.Type = &typ
if t.worker != nil {
m.Worker = proto.String(t.worker.connection.name())
}
for mang := range managers() {
mang.send <- m
}
}
func (t *Task) reportResult(data []byte) {
m := &protos.TaskResult{
Id: proto.Int64(t.id),
Options: proto.String(t.options),
Data: data,
}
for mang := range managers() {
mang.send <- m
}
}
func (t *Task) execute(c *Connection) {
stats.Pulse()
log.WithFields(log.Fields{
"id": t.id,
"options": t.options,
}).Info("executing task")
t.failed = false
c.send <- &protos.DoTask{
Id: proto.Int64(t.id),
Options: proto.String(t.options),
Code: proto.String("TODO"),
JobType: proto.String("TODO"),
}
t.reportStatus(protos.TaskStatus_STARTED)
for {
select {
case status := <-t.worker.taskStatusChan:
switch status.GetType() {
case protos.TaskStatus_FAILED:
log.WithFields(log.Fields{
"id": t.id,
"options": t.options,
}).Info("task failed")
t.failed = true
t.reportStatus(status.GetType())
accumulatedStats.With(func(s *AccumulatedStats) {
s.failedTasks++
})
return
case protos.TaskStatus_FINISHED:
t.reportStatus(status.GetType())
accumulatedStats.With(func(s *AccumulatedStats) {
s.completedTasks++
})
default:
log.WithField("status", status).Error("invalid status.Type")
}
case result := <-t.worker.taskResultChan:
log.WithField("id", t.id).Info("got task result")
t.reportResult(result.Data)
return
case <-t.worker.connection.doneCh:
t.failed = true
t.reportStatus(protos.TaskStatus_FAILED)
return
}
}
}
func getNextTaskID() int64 {
return atomic.AddInt64(&currTaskID, 1) - 1
}