Skip to content

Commit cd1a4f2

Browse files
authored
Merge pull request #2 from mateusmlo/feat/server
Feat/server
2 parents c649cbc + a44e86a commit cd1a4f2

File tree

4 files changed

+760
-0
lines changed

4 files changed

+760
-0
lines changed

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ module github.com/mateusmlo/taskqueue
33
go 1.24.1
44

55
require (
6+
github.com/google/uuid v1.6.0 // indirect
67
golang.org/x/net v0.42.0 // indirect
78
golang.org/x/sys v0.34.0 // indirect
89
golang.org/x/text v0.27.0 // indirect

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
2+
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
13
golang.org/x/net v0.42.0 h1:jzkYrhi3YQWD6MLBJcsklgQsoAcw89EcZbJw8Z614hs=
24
golang.org/x/net v0.42.0/go.mod h1:FF1RA5d3u7nAYA4z2TkclSCKh68eSXtiFwcWQpPXdt8=
35
golang.org/x/sys v0.34.0 h1:H5Y5sJ2L2JRdyv7ROF1he/lPdvFsd0mJHFw2ThKHxLA=

internal/server/server.go

Lines changed: 168 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,168 @@
1+
package internal
2+
3+
import (
4+
"context"
5+
"sync"
6+
"time"
7+
8+
"github.com/google/uuid"
9+
"google.golang.org/grpc/codes"
10+
"google.golang.org/grpc/status"
11+
"google.golang.org/protobuf/types/known/timestamppb"
12+
13+
"github.com/mateusmlo/taskqueue/proto"
14+
)
15+
16+
type Priority int
17+
type TaskStatus int
18+
19+
const (
20+
HIGH Priority = iota
21+
MEDIUM
22+
LOW
23+
)
24+
25+
const (
26+
PENDING TaskStatus = iota
27+
RUNNING
28+
COMPLETED
29+
FAILED
30+
)
31+
32+
type Server struct {
33+
tasks map[string]*Task
34+
tasksMux sync.RWMutex
35+
36+
pendingQueues map[Priority][]*Task
37+
queuesMux sync.RWMutex
38+
39+
workers map[string]*Worker
40+
workersMux sync.RWMutex
41+
42+
ctx context.Context
43+
cancel context.CancelFunc
44+
45+
proto.UnimplementedTaskQueueServer
46+
proto.UnimplementedWorkerServiceServer
47+
}
48+
49+
type Task struct {
50+
ID string
51+
Type string
52+
Payload []byte
53+
Priority Priority
54+
Status TaskStatus
55+
RetryCount int
56+
MaxRetries int
57+
CreatedAt time.Time
58+
StartedAt *time.Time
59+
CompletedAt *time.Time
60+
Result []byte
61+
Error string
62+
WorkerID string
63+
}
64+
65+
type Worker struct {
66+
ID string
67+
Address string
68+
RegisteredAt time.Time
69+
LastHeartbeat time.Time
70+
TaskTypes []string
71+
Capacity int
72+
CurrentLoad int
73+
}
74+
75+
func NewServer() *Server {
76+
ctx, cancel := context.WithCancel(context.Background())
77+
78+
return &Server{
79+
tasks: make(map[string]*Task),
80+
pendingQueues: make(map[Priority][]*Task),
81+
workers: make(map[string]*Worker),
82+
ctx: ctx,
83+
cancel: cancel,
84+
}
85+
}
86+
87+
// toProtoTask converts internal Task to proto.Task
88+
func (t *Task) toProtoTask() *proto.Task {
89+
protoTask := &proto.Task{
90+
Id: t.ID,
91+
Type: t.Type,
92+
Payload: t.Payload,
93+
Priority: proto.Priority(t.Priority),
94+
MaxRetries: int32(t.MaxRetries),
95+
RetryCount: int32(t.RetryCount),
96+
CreatedAt: timestamppb.New(t.CreatedAt),
97+
Status: proto.TaskStatus(t.Status),
98+
}
99+
100+
// Handle optional timestamp fields
101+
if t.StartedAt != nil {
102+
protoTask.StartedAt = timestamppb.New(*t.StartedAt)
103+
}
104+
if t.CompletedAt != nil {
105+
protoTask.CompletedAt = timestamppb.New(*t.CompletedAt)
106+
}
107+
108+
return protoTask
109+
}
110+
111+
func (s *Server) SubmitTask(ctx context.Context, req *proto.SubmitTaskRequest) (*proto.SubmitTaskResponse, error) {
112+
uuid, err := uuid.NewV7()
113+
if err != nil {
114+
return nil, err
115+
}
116+
taskID := uuid.String()
117+
118+
newTask := &Task{
119+
ID: taskID,
120+
Type: req.Type,
121+
Payload: req.Payload,
122+
Priority: Priority(req.Priority),
123+
Status: PENDING,
124+
RetryCount: 0,
125+
MaxRetries: int(req.MaxRetries),
126+
CreatedAt: time.Now(),
127+
}
128+
129+
s.tasksMux.Lock()
130+
defer s.tasksMux.Unlock()
131+
132+
s.tasks[taskID] = newTask
133+
134+
s.queuesMux.Lock()
135+
defer s.queuesMux.Unlock()
136+
137+
s.pendingQueues[newTask.Priority] = append(s.pendingQueues[newTask.Priority], newTask)
138+
139+
return &proto.SubmitTaskResponse{TaskId: newTask.ID}, nil
140+
}
141+
142+
func (s *Server) GetTaskStatus(ctx context.Context, req *proto.GetTaskStatusRequest) (*proto.GetTaskStatusResponse, error) {
143+
s.tasksMux.RLock()
144+
defer s.tasksMux.RUnlock()
145+
146+
task, exists := s.tasks[req.TaskId]
147+
if !exists {
148+
return nil, status.Errorf(codes.NotFound, "task %s not found", req.TaskId)
149+
}
150+
151+
return &proto.GetTaskStatusResponse{Status: proto.TaskStatus(task.Status)}, nil
152+
}
153+
154+
func (s *Server) GetTaskResult(ctx context.Context, req *proto.GetTaskResultRequest) (*proto.GetTaskResultResponse, error) {
155+
s.tasksMux.RLock()
156+
defer s.tasksMux.RUnlock()
157+
158+
task, exists := s.tasks[req.TaskId]
159+
if !exists {
160+
return nil, status.Errorf(codes.NotFound, "task %s not found", req.TaskId)
161+
}
162+
163+
if task.Status != COMPLETED {
164+
return nil, status.Errorf(codes.FailedPrecondition, "task %s not completed yet", req.TaskId)
165+
}
166+
167+
return &proto.GetTaskResultResponse{Task: task.toProtoTask()}, nil
168+
}

0 commit comments

Comments
 (0)