Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .todos.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,10 @@ Filtros de mensagens publicadas
-> ✅ CI para validação minima do projeto
-> ✅ Validar a aplicação para ver se todos lugares que salvam no redis estão com TTL >> OK
-> ✅ Conseguir rodar asynq e google pubsub juntos para configurações específicas




>> Adicionar Cancel context by timeout

>> Implementar ack | nack task
48 changes: 26 additions & 22 deletions cmd/api/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,33 +20,39 @@ import (
"github.com/redis/go-redis/v9"
)

// waitForShutdown waits for SIGINT/SIGTERM and gracefully shuts down the provided servers.
func waitForShutdown(server *http.Server) {
// Centralized shutdown: creates a context that is cancelled on SIGINT/SIGTERM and waits for all servers to shutdown.
func waitForShutdown(ctx context.Context, servers []*http.Server) {
quit := make(chan os.Signal, 1)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
<-quit

log.Println("Shutting down servers...")
select {
case <-quit:
log.Println("Shutting down servers...")
case <-ctx.Done():
log.Println("Context cancelled, shutting down servers...")
}

shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
shutdownCtx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
defer cancel()

if server != nil {
if err := server.Shutdown(shutdownCtx); err != nil {
log.Printf("Backoffice server shutdown error: %v", err)
for _, server := range servers {
if server != nil {
if err := server.Shutdown(shutdownCtx); err != nil {
log.Printf("Server shutdown error: %v", err)
}
}
}

log.Println("Server shutdown complete")
log.Println("All servers shutdown complete")
}

// go run . --scope=all
// go run . --scope=backoffice
// go run . --scope=pubsub
// go run . --scope=task
func main() {
conf := cfg.Get()
ctx := context.Background()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

scope := flag.String("scope", "all", "service to run")
flag.Parse()
Expand All @@ -64,6 +70,8 @@ func main() {
}

var servers []*http.Server
var closers []func()

if scopeOrAll(*scope, "backoffice") {
backofficeServer := backoffice.Start(
redisClient,
Expand All @@ -85,30 +93,26 @@ func main() {
s := pubsub.New(
store, memStore, fetch, storeInsights,
)

s.Start(ctx, conf)

defer s.Close()

closers = append(closers, s.Close)
servers = append(servers, s.Server())
}

if scopeOrAll(*scope, "task") {
s := task.New(
store, memStore, fetch, storeInsights,
)

s.Start(ctx, conf)

defer s.Close()

closers = append(closers, s.Close)
servers = append(servers, s.Server())
}

for _, server := range servers {
waitForShutdown(server)
}
waitForShutdown(ctx, servers)

// call all closers after shutdown
for _, closeFn := range closers {
closeFn()
}
}

func scopeOrAll(scope, expected string) bool {
Expand Down
216 changes: 216 additions & 0 deletions cmd/api/shutdown_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,216 @@
package main

import (
"bufio"
"bytes"
"net"
"os"
"os/exec"
"path/filepath"
"strings"
"sync"
"testing"
"time"
)

// syncBuffer wraps a buffer so stdout and stderr can be written concurrently without dropping output.
type syncBuffer struct {
mu sync.Mutex
b bytes.Buffer
}

func (s *syncBuffer) Write(p []byte) (n int, err error) {
s.mu.Lock()
defer s.mu.Unlock()
return s.b.Write(p)
}

func (s *syncBuffer) String() string {
s.mu.Lock()
defer s.mu.Unlock()
return s.b.String()
}

// buildApiBinary builds ./cmd/api and returns the path to the binary.
func buildApiBinary(t *testing.T, moduleRoot string) string {
t.Helper()
exe := filepath.Join(t.TempDir(), "gqueue-api")
if os.PathListSeparator == ';' {
exe += ".exe"
}
build := exec.Command("go", "build", "-o", exe, "./cmd/api")
build.Dir = moduleRoot
if out, err := build.CombinedOutput(); err != nil {
t.Fatalf("go build ./cmd/api: %v\n%s", err, out)
}
return exe
}

// loadEnvFile reads path and sets env vars from KEY=VALUE lines (comments and empty lines ignored).
func loadEnvFile(path string) {
f, err := os.Open(path)
if err != nil {
return
}
defer f.Close()
s := bufio.NewScanner(f)
for s.Scan() {
line := strings.TrimSpace(s.Text())
if line == "" || strings.HasPrefix(line, "#") {
continue
}
if strings.HasPrefix(line, "export ") {
line = strings.TrimSpace(line[7:])
}
i := strings.Index(line, "=")
if i <= 0 {
continue
}
key := strings.TrimSpace(line[:i])
value := strings.TrimSpace(line[i+1:])
if len(value) >= 2 && (value[0] == '"' && value[len(value)-1] == '"' || value[0] == '\'' && value[len(value)-1] == '\'') {
value = value[1 : len(value)-1]
}
_ = os.Setenv(key, value)
}
}

// holdBackofficeConnection opens a TCP connection to the backoffice server and sends an
// incomplete HTTP request so the server blocks reading until ReadTimeout. When we SIGINT,
// the server must wait for this connection to drain before exiting—proving shutdown waits.
func holdBackofficeConnection(t *testing.T) (closeConn func()) {
t.Helper()
port := os.Getenv("BACKOFFICE_API_PORT")
if port == "" {
port = "8081"
}
addr := net.JoinHostPort("localhost", port)
conn, err := net.DialTimeout("tcp", addr, 2*time.Second)
if err != nil {
t.Skipf("Cannot connect to backoffice at %s (server may not be up): %v", addr, err)
return func() {}
}
// Incomplete request: no \r\n\r\n so server keeps reading (until ReadTimeout).
_, _ = conn.Write([]byte("GET /health HTTP/1.1\r\nHost: localhost\r\n"))
return func() { _ = conn.Close() }
}

// requiredShutdownLogs are the log messages that must appear (see main.go waitForShutdown).
// We require the first and last so we know shutdown started and completed; the duration lines
// in between may be interleaved with asynq logs and can be flaky to capture.
const minShutdownDuration = 2 * time.Second

var requiredShutdownLogs = []string{
"Shutting down servers...",
"All servers shutdown complete",
}

func TestShutdownGraceful(t *testing.T) {
wd, err := os.Getwd()
if err != nil {
t.Fatalf("Failed to get working directory: %v", err)
}

// Find module root (same dir as go.mod).
moduleRoot := wd
for {
goMod := filepath.Join(moduleRoot, "go.mod")
if _, err := os.Stat(goMod); err == nil {
break
}
parent := filepath.Dir(moduleRoot)
if parent == moduleRoot {
t.Fatalf("go.mod not found (searched from %s)", wd)
}
moduleRoot = parent
}

// Load .env from module root so "go test" can use local env (only stdlib, no godotenv).
loadEnvFile(filepath.Join(moduleRoot, ".env"))

// Skip if required env is missing (integration test needs Redis + DB).
if os.Getenv("DB_CONNECTION_STRING") == "" || os.Getenv("CACHE_ADDR") == "" {
t.Skip("Skipping shutdown integration test: DB_CONNECTION_STRING and CACHE_ADDR must be set (use .env or run scripts/run_shutdown_test.sh)")
}

// Build and run the binary so SIGINT goes to the server process (go run would send it to the go process only).
exe := buildApiBinary(t, moduleRoot)
cmd := exec.Command(exe)
cmd.Dir = moduleRoot
output := &syncBuffer{}
cmd.Stdout = output
cmd.Stderr = output

if err := cmd.Start(); err != nil {
t.Fatalf("Failed to start server: %v", err)
}
defer func() {
if cmd.Process != nil {
_ = cmd.Process.Kill()
}
}()

// Wait for server to be up (main finishes init and enters waitForShutdown).
time.Sleep(3 * time.Second)

if cmd.Process == nil {
t.Fatalf("cmd.Process is nil after Start")
}

// Hold one connection with an incomplete HTTP request so the server must wait for it
// (or ReadTimeout) during shutdown. This proves shutdown is not killing immediately.
closeConn := holdBackofficeConnection(t)
defer closeConn()

// Give the server time to see the connection (in-flight).
time.Sleep(500 * time.Millisecond)

startShutdown := time.Now()
if err := cmd.Process.Signal(os.Interrupt); err != nil {
t.Fatalf("Failed to send SIGINT: %v", err)
}

done := make(chan error, 1)
go func() {
done <- cmd.Wait()
}()

select {
case waitErr := <-done:
shutdownDuration := time.Since(startShutdown)
outStr := output.String()

// (1) Process must exit with code 0 (graceful exit). Non-zero = panic, crash, or unclean exit.
if waitErr != nil {
if exitErr, ok := waitErr.(*exec.ExitError); ok {
t.Errorf("Process did not exit gracefully: exit code %d. Output:\n%s", exitErr.ExitCode(), outStr)
} else {
t.Errorf("Process wait error: %v", waitErr)
}
}

// (2) All shutdown log lines from waitForShutdown must appear.
for _, want := range requiredShutdownLogs {
if !strings.Contains(outStr, want) {
t.Errorf("Shutdown log missing: %q. Full output:\n%s", want, outStr)
}
}

t.Logf("Shutdown completed in %v", shutdownDuration)

// (3) Shutdown must take at least minShutdownDuration: we held an in-flight connection
// (incomplete HTTP request); the server must wait for it before exiting.
if shutdownDuration < minShutdownDuration {
t.Errorf("Shutdown did not wait for in-flight connection: took %v, expected at least %v", shutdownDuration, minShutdownDuration)
}
// (4) Shutdown should complete under the 15s select timeout (server uses 1m timeout internally).
if shutdownDuration > 14*time.Second {
t.Errorf("Shutdown took too long: %v", shutdownDuration)
}
case <-time.After(15 * time.Second):
if cmd.Process != nil {
_ = cmd.Process.Kill()
}
t.Fatalf("Shutdown did not complete within 15s. Output so far:\n%s", output.String())
}
}
8 changes: 6 additions & 2 deletions cmd/setup/backoffice/httpserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"log"
"net/http"
"time"

"github.com/IsaacDSC/gqueue/cmd/setup/middleware"
"github.com/IsaacDSC/gqueue/internal/app/backofficeapp"
Expand Down Expand Up @@ -53,8 +54,11 @@ func Start(
port := env.BackofficeApiPort

server := &http.Server{
Addr: port.String(),
Handler: handler,
Addr: port.String(),
Handler: handler,
ReadTimeout: 10 * time.Second,
WriteTimeout: 15 * time.Second,
IdleTimeout: 60 * time.Second,
}

log.Printf("[*] Starting Backoffice server on :%d", port)
Expand Down
17 changes: 4 additions & 13 deletions cmd/setup/pubsub/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,7 @@ package pubsub
import (
"context"
"log"
"os"
"os/signal"
"sync"
"syscall"
"time"

"cloud.google.com/go/pubsub"
Expand All @@ -18,11 +15,7 @@ import (
)

func (s *Service) consumer(ctx context.Context, env cfg.Config) {
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM, os.Interrupt)

ctx, cancel := context.WithCancel(ctx)
defer cancel()
// Use only the context passed from main for shutdown control

concurrency := env.AsynqConfig.Concurrency

Expand Down Expand Up @@ -104,12 +97,10 @@ func (s *Service) consumer(ctx context.Context, env cfg.Config) {

log.Println("[*] starting worker with configs")
log.Println("[*] wq.concurrency", (len(handlers))*concurrency)
log.Println("[*] Worker started. Press Ctrl+C to gracefully shutdown...")

<-sigChan
log.Println("[*] Received shutdown signal, initiating graceful shutdown...")
log.Println("[*] Worker started. Waiting for shutdown signal from context...")

cancel()
<-ctx.Done()
log.Println("[*] Context cancelled, initiating graceful shutdown...")

done := make(chan struct{})
go func() {
Expand Down
Loading