Skip to content

Commit 17d883f

Browse files
author
Pedro Santos
committed
feat: add support to use a bearer token for authenticated backends on the client
1 parent 3e976e5 commit 17d883f

4 files changed

Lines changed: 283 additions & 10 deletions

File tree

cmd/client/main.go

Lines changed: 71 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,12 +27,15 @@ import (
2727
"net/http"
2828
"net/url"
2929
"os"
30+
"path/filepath"
3031
"strings"
32+
"sync"
3133
"time"
3234

3335
"github.com/Showmax/go-fqdn"
3436
"github.com/alecthomas/kingpin/v2"
3537
"github.com/cenkalti/backoff/v4"
38+
"github.com/fsnotify/fsnotify"
3639
"github.com/prometheus-community/pushprox/util"
3740
"github.com/prometheus/client_golang/prometheus"
3841
"github.com/prometheus/client_golang/prometheus/promhttp"
@@ -47,7 +50,7 @@ var (
4750
tlsCert = kingpin.Flag("tls.cert", "<cert> Client certificate file").String()
4851
tlsKey = kingpin.Flag("tls.key", "<key> Private key file").String()
4952
metricsAddr = kingpin.Flag("metrics-addr", "Serve Prometheus metrics at this address").Default(":9369").String()
50-
53+
bearerTokenPath = kingpin.Flag("bearer-token-path", "<path> Path to file containing bearer token to authenticate requests").String()
5154
retryInitialWait = kingpin.Flag("proxy.retry.initial-wait", "Amount of time to wait after proxy failure").Default("1s").Duration()
5255
retryMaxWait = kingpin.Flag("proxy.retry.max-wait", "Maximum amount of time to wait between proxy poll retries").Default("5s").Duration()
5356
)
@@ -71,6 +74,10 @@ var (
7174
Help: "Number of poll errors",
7275
},
7376
)
77+
// bearerToken holds the current token string used for authentication.
78+
// Access must be synchronized to avoid race conditions between the watcher and HTTP handlers.
79+
bearerToken string
80+
bearerTokenMutex sync.RWMutex
7481
)
7582

7683
func init() {
@@ -114,6 +121,14 @@ func (c *Coordinator) doScrape(request *http.Request, client *http.Client) {
114121
c.handleErr(request, client, err)
115122
return
116123
}
124+
bearerTokenMutex.RLock()
125+
token := bearerToken
126+
bearerTokenMutex.RUnlock()
127+
128+
if token != "" {
129+
request.Header.Set("Authorization", fmt.Sprintf("Bearer %s", token))
130+
}
131+
117132
ctx, cancel := context.WithTimeout(request.Context(), timeout)
118133
defer cancel()
119134
request = request.WithContext(ctx)
@@ -225,6 +240,56 @@ func (c *Coordinator) loop(bo backoff.BackOff, client *http.Client) {
225240
}
226241
}
227242

243+
// I need to automatically reload the bearer token if it changes
244+
// i.e. if it a short lived kubernetes token one
245+
func watchBearerTokenFile(path string, logger *slog.Logger) {
246+
loadBearerToken := func() {
247+
tokenBytes, err := os.ReadFile(path)
248+
if err != nil {
249+
logger.Error("Failed to read bearer token", "path", path, "err", err)
250+
return
251+
}
252+
bearerTokenMutex.Lock()
253+
bearerToken = strings.TrimSpace(string(tokenBytes)) // also trim spaces/newlines
254+
bearerTokenMutex.Unlock()
255+
logger.Info("Bearer token loaded from file", "path", path)
256+
}
257+
258+
loadBearerToken() // initial load
259+
260+
watcher, err := fsnotify.NewWatcher()
261+
if err != nil {
262+
logger.Error("Failed to create fsnotify watcher", "err", err)
263+
os.Exit(1)
264+
}
265+
defer watcher.Close()
266+
267+
tokenDir := filepath.Dir(path)
268+
if err := watcher.Add(tokenDir); err != nil {
269+
logger.Error("Failed to watch token directory", "dir", tokenDir, "err", err)
270+
os.Exit(1)
271+
}
272+
273+
for {
274+
select {
275+
case event, ok := <-watcher.Events:
276+
if !ok {
277+
return
278+
}
279+
if event.Name == path &&
280+
(event.Op&fsnotify.Write == fsnotify.Write || event.Op&fsnotify.Create == fsnotify.Create) {
281+
logger.Info("Bearer token file changed, reloading", "event", event)
282+
loadBearerToken()
283+
}
284+
case err, ok := <-watcher.Errors:
285+
if !ok {
286+
return
287+
}
288+
logger.Warn("fsnotify error", "err", err)
289+
}
290+
}
291+
}
292+
228293
func main() {
229294
promslogConfig := promslog.Config{}
230295
flag.AddFlags(kingpin.CommandLine, &promslogConfig)
@@ -276,6 +341,11 @@ func main() {
276341
}()
277342
}
278343

344+
// Set bearer token based on path
345+
if *bearerTokenPath != "" {
346+
go watchBearerTokenFile(*bearerTokenPath, coordinator.logger)
347+
}
348+
279349
transport := &http.Transport{
280350
Proxy: http.ProxyFromEnvironment,
281351
DialContext: (&net.Dialer{

cmd/client/main_test.go

Lines changed: 209 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -14,35 +14,104 @@
1414
package main
1515

1616
import (
17+
"bytes"
1718
"errors"
1819
"fmt"
20+
"io"
1921
"net/http"
2022
"net/http/httptest"
23+
"net/url"
24+
"os"
25+
"strings"
26+
"sync"
2127
"testing"
28+
"time"
2229

30+
"github.com/cenkalti/backoff/v4"
2331
"github.com/prometheus/common/promslog"
2432
)
2533

2634
func prepareTest() (*httptest.Server, Coordinator) {
35+
// This test server acts as the proxyURL
2736
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
28-
w.WriteHeader(http.StatusOK)
29-
fmt.Fprintln(w, "GET /index.html HTTP/1.0\n\nOK")
37+
switch r.URL.Path {
38+
case "/poll":
39+
// On /poll, respond with an HTTP request serialized in the body
40+
var buf bytes.Buffer
41+
req, _ := http.NewRequest("GET", fmt.Sprintf("http://%s/", *myFqdn), nil)
42+
req.Header.Set("id", "test-scrape-id")
43+
req.Header.Set("X-Prometheus-Scrape-Timeout-Seconds", "10")
44+
req.Write(&buf)
45+
w.WriteHeader(http.StatusOK)
46+
_, _ = w.Write(buf.Bytes())
47+
case "/push":
48+
// Accept pushed scrape results, just respond OK
49+
io.Copy(io.Discard, r.Body)
50+
w.WriteHeader(http.StatusOK)
51+
default:
52+
w.WriteHeader(http.StatusNotFound)
53+
}
3054
}))
55+
3156
c := Coordinator{logger: promslog.NewNopLogger()}
32-
*proxyURL = ts.URL
57+
*proxyURL = ts.URL + "/"
58+
*myFqdn = "test.local" // Set fqdn to test.local for matching hostnames
59+
3360
return ts, c
3461
}
3562

36-
func TestDoScrape(t *testing.T) {
63+
func TestDoScrape_Success(t *testing.T) {
3764
ts, c := prepareTest()
3865
defer ts.Close()
3966

40-
req, err := http.NewRequest("GET", ts.URL, nil)
67+
// Setup a test target server that will be scraped by doScrape
68+
targetServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
69+
// Verify Authorization header if set
70+
auth := r.Header.Get("Authorization")
71+
if auth != "" && auth != "Bearer dummy-token" {
72+
t.Errorf("unexpected Authorization header: %s", auth)
73+
}
74+
w.Header().Set("Content-Type", "text/plain")
75+
w.WriteHeader(http.StatusOK)
76+
fmt.Fprintln(w, "OK")
77+
}))
78+
defer targetServer.Close()
79+
80+
// Override myFqdn to match targetServer hostname
81+
u, err := url.Parse(targetServer.URL)
82+
if err != nil {
83+
t.Fatal(err)
84+
}
85+
*myFqdn = u.Hostname()
86+
87+
// Prepare a scrape request targeting the test target server
88+
req, err := http.NewRequest("GET", targetServer.URL, nil)
89+
if err != nil {
90+
t.Fatal(err)
91+
}
92+
req.Header.Set("id", "scrape-id-123")
93+
req.Header.Set("X-Prometheus-Scrape-Timeout-Seconds", "10")
94+
95+
// Set bearerToken for authorization testing
96+
bearerTokenMutex.Lock()
97+
bearerToken = "dummy-token"
98+
bearerTokenMutex.Unlock()
99+
100+
c.doScrape(req, targetServer.Client())
101+
}
102+
103+
func TestDoScrape_FailWrongFQDN(t *testing.T) {
104+
ts, c := prepareTest()
105+
defer ts.Close()
106+
107+
req, err := http.NewRequest("GET", "http://wronghost.local", nil)
41108
if err != nil {
42109
t.Fatal(err)
43110
}
44-
req.Header.Add("X-Prometheus-Scrape-Timeout-Seconds", "10.0")
45-
*myFqdn = ts.URL
111+
req.Header.Set("id", "fail-id")
112+
req.Header.Set("X-Prometheus-Scrape-Timeout-Seconds", "10")
113+
114+
// This should cause handleErr due to fqdn mismatch
46115
c.doScrape(req, ts.Client())
47116
}
48117

@@ -57,10 +126,141 @@ func TestHandleErr(t *testing.T) {
57126
c.handleErr(req, ts.Client(), errors.New("test error"))
58127
}
59128

60-
func TestLoop(t *testing.T) {
129+
func TestDoPush_ErrorOnInvalidProxyURL(t *testing.T) {
130+
c := Coordinator{logger: promslog.NewNopLogger()}
131+
*proxyURL = "http://%41:8080" // invalid URL (percent-encoding issue)
132+
133+
resp := &http.Response{
134+
StatusCode: http.StatusOK,
135+
Body: io.NopCloser(strings.NewReader("test")),
136+
Header: http.Header{},
137+
}
138+
req, _ := http.NewRequest("GET", "http://example.com", nil)
139+
err := c.doPush(resp, req, http.DefaultClient)
140+
if err == nil {
141+
t.Errorf("expected error on invalid proxy URL, got nil")
142+
}
143+
}
144+
145+
func TestDoPoll(t *testing.T) {
61146
ts, c := prepareTest()
62147
defer ts.Close()
63-
if err := c.doPoll(ts.Client()); err != nil {
148+
149+
err := c.doPoll(ts.Client())
150+
if err != nil {
151+
t.Fatalf("doPoll failed: %v", err)
152+
}
153+
}
154+
155+
func TestLoopWithBackoff(t *testing.T) {
156+
var count int
157+
var mu sync.Mutex
158+
done := make(chan struct{})
159+
var once sync.Once
160+
161+
bo := backoffForTest(3)
162+
163+
go func() {
164+
err := backoff.RetryNotify(func() error {
165+
mu.Lock()
166+
defer mu.Unlock()
167+
count++
168+
if count > 2 {
169+
// safe close
170+
once.Do(func() { close(done) })
171+
return errors.New("forced error to stop retry")
172+
}
173+
return errors.New("temporary error")
174+
}, bo, func(err error, d time.Duration) {
175+
// No-op
176+
})
177+
178+
if err != nil {
179+
// safe even if already closed
180+
once.Do(func() { close(done) })
181+
}
182+
}()
183+
184+
select {
185+
case <-done:
186+
case <-time.After(1 * time.Second):
187+
t.Fatal("loop test timed out")
188+
}
189+
}
190+
191+
192+
func backoffForTest(maxRetries int) backoff.BackOff {
193+
b := backoff.NewExponentialBackOff()
194+
b.InitialInterval = 1 * time.Millisecond
195+
b.MaxInterval = 5 * time.Millisecond
196+
b.MaxElapsedTime = 10 * time.Millisecond
197+
return backoff.WithMaxRetries(b, uint64(maxRetries))
198+
}
199+
200+
func TestWatchBearerTokenFile(t *testing.T) {
201+
// This function is hard to test fully without fsnotify events,
202+
// but we can test the initial loading of the token file.
203+
204+
// Create a temporary file with a token
205+
tmpfile := t.TempDir() + "/tokenfile"
206+
tokenContent := "file-token\n"
207+
if err := os.WriteFile(tmpfile, []byte(tokenContent), 0600); err != nil {
208+
t.Fatal(err)
209+
}
210+
211+
logger := promslog.NewNopLogger()
212+
213+
// Run watchBearerTokenFile in a goroutine; it will load token initially
214+
go func() {
215+
// This will block watching the directory, so we only wait shortly
216+
watchBearerTokenFile(tmpfile, logger)
217+
}()
218+
219+
// Wait briefly for the token to load
220+
time.Sleep(100 * time.Millisecond)
221+
222+
bearerTokenMutex.RLock()
223+
defer bearerTokenMutex.RUnlock()
224+
if bearerToken != strings.TrimSpace(tokenContent) {
225+
t.Errorf("expected bearer token %q, got %q", strings.TrimSpace(tokenContent), bearerToken)
226+
}
227+
}
228+
229+
func TestBearerTokenHeader(t *testing.T) {
230+
token := "dummy-token"
231+
bearerTokenMutex.Lock()
232+
bearerToken = token
233+
bearerTokenMutex.Unlock()
234+
235+
var receivedToken string
236+
237+
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
238+
receivedToken = r.Header.Get("Authorization")
239+
w.WriteHeader(http.StatusOK)
240+
}))
241+
defer ts.Close()
242+
243+
// Ensure myFqdn matches the test server's hostname
244+
u, err := url.Parse(ts.URL)
245+
if err != nil {
246+
t.Fatal(err)
247+
}
248+
*myFqdn = u.Hostname()
249+
250+
req, err := http.NewRequest("GET", ts.URL, nil)
251+
if err != nil {
64252
t.Fatal(err)
65253
}
254+
255+
// Set required headers for doScrape to accept this request
256+
req.Header.Set("id", "token-test-id")
257+
req.Header.Set("X-Prometheus-Scrape-Timeout-Seconds", "10")
258+
259+
c := Coordinator{logger: promslog.NewNopLogger()}
260+
c.doScrape(req, ts.Client())
261+
262+
expected := "Bearer dummy-token"
263+
if receivedToken != expected {
264+
t.Fatalf("expected %q, got %q", expected, receivedToken)
265+
}
66266
}

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ require (
66
github.com/Showmax/go-fqdn v1.0.0
77
github.com/alecthomas/kingpin/v2 v2.4.0
88
github.com/cenkalti/backoff/v4 v4.3.0
9+
github.com/fsnotify/fsnotify v1.9.0
910
github.com/google/uuid v1.6.0
1011
github.com/prometheus/client_golang v1.21.0
1112
github.com/prometheus/common v0.62.0

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XL
1313
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
1414
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
1515
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
16+
github.com/fsnotify/fsnotify v1.9.0 h1:2Ml+OJNzbYCTzsxtv8vKSFD9PbJjmhYF14k/jKC7S9k=
17+
github.com/fsnotify/fsnotify v1.9.0/go.mod h1:8jBTzvmWwFyi3Pb8djgCCO5IBqzKJ/Jwo8TRcHyHii0=
1618
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
1719
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
1820
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=

0 commit comments

Comments
 (0)