Skip to content

Commit e83a6c8

Browse files
authored
Merge pull request #5 from mateusmlo/feat/server-cmd
Feat/server cmd
2 parents f69d7e9 + 7673626 commit e83a6c8

18 files changed

Lines changed: 2300 additions & 73 deletions

cmd/client/client.go

Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"log"
7+
"time"
8+
9+
pb "github.com/mateusmlo/taskqueue/proto"
10+
"google.golang.org/grpc"
11+
"google.golang.org/grpc/credentials"
12+
)
13+
14+
// createGRPCConnection creates a gRPC connection with TLS credentials
15+
func createGRPCConnection(certPath, serverName, address string) (*grpc.ClientConn, error) {
16+
tc, err := credentials.NewClientTLSFromFile(certPath, serverName)
17+
if err != nil {
18+
return nil, fmt.Errorf("failed to load TLS credentials: %w", err)
19+
}
20+
21+
clientConn, err := grpc.NewClient(address, grpc.WithTransportCredentials(tc))
22+
if err != nil {
23+
return nil, fmt.Errorf("failed to connect to server: %w", err)
24+
}
25+
26+
return clientConn, nil
27+
}
28+
29+
// submitTask submits a task to the task queue and returns the task ID
30+
func submitTask(ctx context.Context, client pb.TaskQueueClient, taskType string, payload []byte, priority pb.Priority, maxRetries int32) (string, error) {
31+
res, err := client.SubmitTask(ctx, &pb.SubmitTaskRequest{
32+
Type: taskType,
33+
Payload: payload,
34+
Priority: int32(priority),
35+
MaxRetries: maxRetries,
36+
})
37+
38+
if err != nil {
39+
return "", fmt.Errorf("task failed to submit: %w", err)
40+
}
41+
42+
return res.TaskId, nil
43+
}
44+
45+
// pollTaskUntilComplete polls the task status until it completes or times out
46+
func pollTaskUntilComplete(ctx context.Context, client pb.TaskQueueClient, taskID string, pollInterval time.Duration) error {
47+
for {
48+
taskStatusRes, err := client.GetTaskStatus(ctx, &pb.GetTaskStatusRequest{
49+
TaskId: taskID,
50+
})
51+
52+
if err != nil {
53+
return fmt.Errorf("failed to get task status: %w", err)
54+
}
55+
56+
if taskStatusRes.Status == pb.TaskStatus_COMPLETED {
57+
return nil
58+
}
59+
60+
if taskStatusRes.Status == pb.TaskStatus_FAILED {
61+
return fmt.Errorf("task failed")
62+
}
63+
64+
select {
65+
case <-ctx.Done():
66+
return ctx.Err()
67+
case <-time.After(pollInterval):
68+
// Continue polling
69+
}
70+
}
71+
}
72+
73+
// getTaskResult retrieves the task result
74+
func getTaskResult(ctx context.Context, client pb.TaskQueueClient, taskID string) (*pb.GetTaskResultResponse, error) {
75+
taskRes, err := client.GetTaskResult(ctx, &pb.GetTaskResultRequest{TaskId: taskID})
76+
if err != nil {
77+
return nil, fmt.Errorf("failed to get task result: %w", err)
78+
}
79+
80+
return taskRes, nil
81+
}
82+
83+
func main() {
84+
ctx := context.Background()
85+
86+
clientConn, err := createGRPCConnection("cert/server.crt", "localhost", "localhost:50051")
87+
if err != nil {
88+
fmt.Printf("Connection error: %s\n", err.Error())
89+
panic(err)
90+
}
91+
defer clientConn.Close()
92+
93+
taskClient := pb.NewTaskQueueClient(clientConn)
94+
95+
taskID, err := submitTask(ctx, taskClient, "reverseStr", []byte("hello world"), pb.Priority_HIGH, 3)
96+
if err != nil {
97+
log.Fatalf("Submit error: %s\n", err.Error())
98+
}
99+
100+
fmt.Printf("Task submitted with ID: %s\n", taskID)
101+
102+
ctxDeadline, cancelFunc := context.WithTimeout(ctx, 10*time.Second)
103+
defer cancelFunc()
104+
105+
fmt.Println("Polling task status...")
106+
err = pollTaskUntilComplete(ctxDeadline, taskClient, taskID, 1*time.Second)
107+
if err != nil {
108+
log.Fatalf("Polling error: %s\n", err.Error())
109+
}
110+
111+
log.Println("Task completed, fetching result...")
112+
113+
taskRes, err := getTaskResult(ctx, taskClient, taskID)
114+
if err != nil {
115+
log.Fatalf("Get result error: %s\n", err.Error())
116+
}
117+
118+
fmt.Printf("Task result: %s\n", taskRes.GetResult())
119+
}

0 commit comments

Comments
 (0)