-
Notifications
You must be signed in to change notification settings - Fork 49
Expand file tree
/
Copy pathqueue-middleware.go
More file actions
145 lines (121 loc) · 3.25 KB
/
queue-middleware.go
File metadata and controls
145 lines (121 loc) · 3.25 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
package main
import (
"context"
"errors"
"fmt"
"net/http"
"strings"
"sync/atomic"
"time"
"github.com/puzpuzpuz/xsync/v3"
"github.com/segmentio/fasthash/fnv1a"
"golang.org/x/sync/semaphore"
)
// queuedReq pairs an HTTP response writer with the request it belongs to.
// NOTE(review): nothing in the visible part of this file references this
// type; confirm it is still used elsewhere before keeping it.
type queuedReq struct {
w http.ResponseWriter
r *http.Request
}
// buckets is a fixed table of 52 semaphores, each with a capacity of 1.
// A request is mapped onto one of these slots by hashing its URL path, and
// await uses the slot to let only one holder through at a time.
var buckets = func() (sems [52]*semaphore.Weighted) {
	for i := 0; i < len(sems); i++ {
		sems[i] = semaphore.NewWeighted(1)
	}
	return sems
}()
// Sentinel values used to signal queue outcomes. The last three are raised
// with panic() inside await and matched in the recover handler installed by
// queueMiddleware.
var (
// attached as the cause of the acquire-timeout context in await, so that a
// timeout can be told apart from a cancellation of the request itself
queueAcquireTimeoutError = errors.New("QAT")
// panicked when a waiting request finally obtains the bucket; the
// middleware answers it with a redirect to the same URL
redirectToCloudflareCacheHitMaybe = errors.New("RTCCHM")
// panicked when waiting ended for a reason other than the acquire timeout;
// the middleware writes no response for it
requestCanceledAbortEverything = errors.New("RCAE")
// panicked when the bucket could not be obtained before the acquire
// timeout; the middleware answers it with a 504
serverUnderHeavyLoad = errors.New("SUHL")
)
// inCourse is the set of request numbers that have already gone through
// await. Entries are added by await and removed by queueMiddleware.
// The custom hasher returns the key unchanged and ignores the seed.
var inCourse = xsync.NewMapOfWithHasher[uint64, struct{}](
func(key uint64, seed uint64) uint64 { return key },
)
// await serializes work per bucket for the request that owns ctx.
//
// It reads the "ticket" (bucket index, int) and "reqNum" (uint64) values from
// ctx. If ctx carries no ticket it returns immediately. A request number that
// was already registered in inCourse also returns immediately, so calling
// await several times for the same request only queues once.
//
// When the bucket is free, await takes it and returns; the bucket is handed
// back once ctx is done. When the bucket is busy, await waits up to six
// seconds and then always panics with one of the sentinel values:
//   - redirectToCloudflareCacheHitMaybe if the bucket was obtained in time
//   - serverUnderHeavyLoad if the six-second timeout elapsed
//   - requestCanceledAbortEverything otherwise
func await(ctx context.Context) {
	rawTicket := ctx.Value("ticket")
	if rawTicket == nil {
		return
	}
	bucketIdx := rawTicket.(int)
	reqNum := ctx.Value("reqNum").(uint64)

	// a request that is already registered has been through the queue before
	if _, seen := inCourse.LoadOrStore(reqNum, struct{}{}); seen {
		return
	}

	sem := buckets[bucketIdx]

	// fast path: nobody holds this bucket, so we take it and keep going
	if sem.TryAcquire(1) {
		go func() {
			// hold the bucket until the request context is finished
			<-ctx.Done()
			sem.Release(1)
		}()
		return
	}

	// slow path: another request owns the bucket, wait for it with a deadline
	waitCtx, cancel := context.WithTimeoutCause(ctx, time.Second*6, queueAcquireTimeoutError)
	defer cancel()

	switch err := sem.Acquire(waitCtx, 1); {
	case err == nil:
		// the holder finished in time; give the slot back right away and
		// ask the middleware to send the client around again
		sem.Release(1)
		panic(redirectToCloudflareCacheHitMaybe)
	case context.Cause(waitCtx) == queueAcquireTimeoutError:
		// our own deadline fired
		panic(serverUnderHeavyLoad)
	default:
		// the wait ended for some other reason, e.g. the request was canceled
		panic(requestCanceledAbortEverything)
	}
}
// reqNumSource is an atomic counter; queueMiddleware increments it once per
// queued request to obtain that request's number.
var reqNumSource atomic.Uint64
// queueMiddleware wraps next with the per-path request queue.
//
// Requests for "/favicon.ico" and anything under "/njump/static/" are passed
// straight to next. Every other request gets a fresh request number and a
// bucket "ticket" (FNV-1a hash of the URL path modulo the number of buckets),
// both stored in the request context under the keys "reqNum" and "ticket" so
// that a later await(ctx) call can find them.
//
// await communicates its outcome by panicking with a sentinel value; the
// deferred handler below recovers it and turns it into a response:
//   - redirectToCloudflareCacheHitMaybe: 302 redirect to the same path and query
//   - serverUnderHeavyLoad: 504 with a short plain-text message
//   - requestCanceledAbortEverything: nothing is written
//   - any other panic: reported through trackError and answered with a 500
//     whose body contains the panic value and the returned trace lines
func queueMiddleware(next http.HandlerFunc) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		if r.URL.Path == "/favicon.ico" || strings.HasPrefix(r.URL.Path, "/njump/static/") {
			next.ServeHTTP(w, r)
			return
		}

		reqNum := reqNumSource.Add(1)

		// these will be used when we later call await(ctx)
		ticket := int(fnv1a.HashString64(r.URL.Path) % uint64(len(buckets)))
		ctx := context.WithValue(
			context.WithValue(
				r.Context(),
				"reqNum", reqNum,
			),
			"ticket", ticket,
		)

		// await registers reqNum in inCourse and may then panic. The cleanup
		// must be deferred so it also runs on those panic paths; as a plain
		// statement after ServeHTTP it would be skipped and the entry would
		// stay in the map forever.
		defer inCourse.Delete(reqNum)

		defer func() {
			err := recover()
			if err == nil {
				return
			}

			switch err {
			// if we are not the first to request this we will wait for the underlying page to be loaded
			// then we will be redirected to open it again, so hopefully we will hit the cloudflare cache this time
			case redirectToCloudflareCacheHitMaybe:
				path := r.URL.Path
				if r.URL.RawQuery != "" {
					path += "?" + r.URL.RawQuery
				}
				http.Redirect(w, r, path, http.StatusFound)
			case serverUnderHeavyLoad:
				w.WriteHeader(http.StatusGatewayTimeout)
				w.Write([]byte("server under heavy load, please try again in a couple of seconds"))
			case requestCanceledAbortEverything:
				// the wait was aborted (e.g. the client went away), so no response is written
			default:
				trace := trackError(r, err)
				w.WriteHeader(http.StatusInternalServerError)
				fmt.Fprintf(w, "%s\n", err)
				for _, line := range trace {
					fmt.Fprintf(w, "%s\n", line)
				}
			}
		}()

		next.ServeHTTP(w, r.WithContext(ctx))
	}
}