Skip to content

Commit b3d8f1b

Browse files
committed
Refactor webhook sender to add manager/preparer interfaces
- Split `sender.go` into orchestrator logic and Slack-specific implementation (`slack.go`) - Introduce `Manager` and `Preparer` interfaces to decouple specific platform logic
1 parent c769fdf commit b3d8f1b

5 files changed

Lines changed: 589 additions & 252 deletions

File tree

Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
// Copyright 2026 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package webhook
16+
17+
import (
18+
"context"
19+
"io"
20+
"net/http"
21+
"strings"
22+
"time"
23+
24+
"github.com/GoogleChrome/webstatus.dev/lib/workertypes"
25+
)
26+
27+
type mockHTTPClient struct {
28+
doFunc func(req *http.Request) (*http.Response, error)
29+
}
30+
31+
func (m *mockHTTPClient) Do(req *http.Request) (*http.Response, error) {
32+
return m.doFunc(req)
33+
}
34+
35+
type mockChannelStateManager struct {
36+
successCalls []successCall
37+
failureCalls []failureCall
38+
recordErr error
39+
}
40+
41+
type successCall struct {
42+
channelID string
43+
timestamp time.Time
44+
eventID string
45+
}
46+
47+
type failureCall struct {
48+
channelID string
49+
err error
50+
timestamp time.Time
51+
isPermanent bool
52+
eventID string
53+
}
54+
55+
func (m *mockChannelStateManager) RecordSuccess(_ context.Context, channelID string,
56+
timestamp time.Time, eventID string) error {
57+
m.successCalls = append(m.successCalls, successCall{channelID, timestamp, eventID})
58+
59+
return m.recordErr
60+
}
61+
62+
func (m *mockChannelStateManager) RecordFailure(_ context.Context, channelID string,
63+
err error, timestamp time.Time, isPermanent bool, eventID string) error {
64+
m.failureCalls = append(m.failureCalls, failureCall{channelID, err, timestamp, isPermanent, eventID})
65+
66+
return m.recordErr
67+
}
68+
69+
func newTestIncomingWebhookDeliveryJob(url string, wType workertypes.WebhookType,
70+
query string, summary []byte) workertypes.IncomingWebhookDeliveryJob {
71+
return workertypes.IncomingWebhookDeliveryJob{
72+
WebhookEventID: "evt-123",
73+
WebhookDeliveryJob: workertypes.WebhookDeliveryJob{
74+
ChannelID: "chan-1",
75+
WebhookURL: url,
76+
WebhookType: wType,
77+
SubscriptionID: "sub-456",
78+
Triggers: []workertypes.JobTrigger{},
79+
Metadata: workertypes.DeliveryMetadata{
80+
EventID: "evt-123",
81+
SearchID: "search-789",
82+
SearchName: "Test",
83+
Query: query,
84+
Frequency: workertypes.FrequencyWeekly,
85+
GeneratedAt: testGeneratedAt(),
86+
},
87+
SummaryRaw: summary,
88+
},
89+
}
90+
}
91+
92+
func newTestResponse(status int, body string) *http.Response {
93+
return &http.Response{
94+
StatusCode: status,
95+
Status: http.StatusText(status),
96+
Proto: "HTTP/1.1",
97+
ProtoMajor: 1,
98+
ProtoMinor: 1,
99+
Header: make(http.Header),
100+
Body: io.NopCloser(strings.NewReader(body)),
101+
ContentLength: int64(len(body)),
102+
TransferEncoding: []string{},
103+
Close: false,
104+
Uncompressed: false,
105+
Trailer: make(http.Header),
106+
Request: nil,
107+
TLS: nil,
108+
}
109+
}
110+
111+
func testGeneratedAt() time.Time {
112+
return time.Date(2026, 3, 12, 0, 0, 0, 0, time.UTC)
113+
}

workers/webhook/pkg/webhook/sender.go

Lines changed: 43 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -15,16 +15,14 @@
1515
package webhook
1616

1717
import (
18-
"bytes"
1918
"context"
20-
"encoding/json"
2119
"errors"
2220
"fmt"
2321
"log/slog"
2422
"net/http"
25-
"net/url"
2623
"time"
2724

25+
"github.com/GoogleChrome/webstatus.dev/lib/event"
2826
"github.com/GoogleChrome/webstatus.dev/lib/workertypes"
2927
)
3028

@@ -52,97 +50,69 @@ func NewSender(httpClient HTTPClient, stateManager ChannelStateManager, frontend
5250
}
5351
}
5452

55-
type SlackPayload struct {
56-
Text string `json:"text"`
57-
}
53+
var (
54+
// ErrTransientWebhook is a transient failure that should be retried.
55+
ErrTransientWebhook = errors.New("transient webhook failure")
56+
// ErrPermanentWebhook is a permanent failure that should not be retried.
57+
ErrPermanentWebhook = errors.New("permanent webhook failure")
58+
)
5859

59-
type webhookPreparer interface {
60-
Prepare(ctx context.Context, job workertypes.IncomingWebhookDeliveryJob) (*http.Request, error)
60+
type webhookSender interface {
61+
Send(ctx context.Context) error
6162
}
6263

63-
type slackPreparer struct {
64-
frontendBaseURL string
64+
// Manager wraps the type-specific webhook logic.
65+
type Manager struct {
66+
sender webhookSender
6567
}
6668

67-
func (s *slackPreparer) Prepare(
68-
ctx context.Context, job workertypes.IncomingWebhookDeliveryJob) (*http.Request, error) {
69-
parsedURL, err := url.Parse(job.WebhookURL)
70-
if err != nil || parsedURL.Scheme != "https" || parsedURL.Host != "hooks.slack.com" {
71-
// Record permanent failure due to invalid URL
72-
return nil, fmt.Errorf("invalid webhook URL: %s", job.WebhookURL)
73-
}
74-
75-
var summary workertypes.EventSummary
76-
if err := json.Unmarshal(job.SummaryRaw, &summary); err != nil {
77-
return nil, fmt.Errorf("failed to unmarshal summary: %w", err)
78-
}
79-
80-
resultsURL := fmt.Sprintf("%s/features?q=%s", s.frontendBaseURL, url.QueryEscape(job.Metadata.Query))
81-
82-
payload := SlackPayload{
83-
Text: fmt.Sprintf("WebStatus.dev Notification: %s\nQuery: %s\nView Results: %s",
84-
summary.Text, job.Metadata.Query, resultsURL),
85-
}
86-
87-
payloadBytes, err := json.Marshal(payload)
88-
if err != nil {
89-
return nil, fmt.Errorf("failed to marshal slack payload: %w", err)
90-
}
69+
func (s *Sender) getManager(_ context.Context, job workertypes.IncomingWebhookDeliveryJob) (*Manager, error) {
70+
switch job.WebhookType {
71+
case workertypes.WebhookTypeSlack:
72+
slack, err := newSlackSender(s.frontendBaseURL, s.httpClient, job)
73+
if err != nil {
74+
return nil, err
75+
}
9176

92-
req, err := http.NewRequestWithContext(ctx, http.MethodPost, job.WebhookURL, bytes.NewBuffer(payloadBytes))
93-
if err != nil {
94-
return nil, fmt.Errorf("failed to create request: %w", err)
77+
return &Manager{sender: slack}, nil
78+
default:
79+
return nil, fmt.Errorf("%w: unsupported type %v", ErrPermanentWebhook, job.WebhookType)
9580
}
96-
req.Header.Set("Content-Type", "application/json")
97-
98-
return req, nil
9981
}
10082

10183
func (s *Sender) SendWebhook(ctx context.Context, job workertypes.IncomingWebhookDeliveryJob) error {
10284
slog.InfoContext(ctx, "sending webhook", "channelID", job.ChannelID, "url", job.WebhookURL)
10385

104-
var preparer webhookPreparer
105-
switch job.WebhookType {
106-
case workertypes.WebhookTypeSlack:
107-
preparer = &slackPreparer{frontendBaseURL: s.frontendBaseURL}
108-
default:
109-
err := fmt.Errorf("unsupported webhook type: %v", job.WebhookType)
110-
_ = s.stateManager.RecordFailure(ctx, job.ChannelID, err, time.Now(), true, job.WebhookEventID)
111-
112-
return err
113-
}
114-
115-
req, err := preparer.Prepare(ctx, job)
86+
mgr, err := s.getManager(ctx, job)
11687
if err != nil {
117-
// Preparation failures (like invalid payload or URL format) are typically permanent
118-
_ = s.stateManager.RecordFailure(ctx, job.ChannelID, err, time.Now(), true, job.WebhookEventID)
88+
// If we fail here, it's permanent when trying to get the manager.
89+
s.recordFailure(ctx, job, err, true)
11990

120-
return fmt.Errorf("failed to prepare webhook request: %w", err)
91+
return fmt.Errorf("failed to prepare webhook: %w", err)
12192
}
12293

123-
resp, err := s.httpClient.Do(req)
124-
if err != nil {
125-
// Transient error?
126-
_ = s.stateManager.RecordFailure(ctx, job.ChannelID, err, time.Now(), false, job.WebhookEventID)
94+
if err := mgr.sender.Send(ctx); err != nil {
95+
isTransient := errors.Is(err, ErrTransientWebhook)
96+
s.recordFailure(ctx, job, err, !isTransient)
97+
98+
if isTransient {
99+
return errors.Join(event.ErrTransientFailure, err)
100+
}
127101

128102
return fmt.Errorf("failed to send webhook: %w", err)
129103
}
130-
defer resp.Body.Close()
131-
132-
if resp.StatusCode >= 200 && resp.StatusCode < 300 {
133-
// Success
134-
_ = s.stateManager.RecordSuccess(ctx, job.ChannelID, time.Now(), job.WebhookEventID)
135104

136-
return nil
105+
if err := s.stateManager.RecordSuccess(ctx, job.ChannelID, time.Now(), job.WebhookEventID); err != nil {
106+
slog.WarnContext(ctx, "failed to record success", "error", err)
137107
}
138108

139-
// Failure
140-
errorMsg := fmt.Sprintf("webhook returned status code %d", resp.StatusCode)
141-
webhookErr := errors.New(errorMsg)
142-
isPermanent := resp.StatusCode == http.StatusNotFound || resp.StatusCode == http.StatusGone ||
143-
resp.StatusCode == http.StatusUnauthorized || resp.StatusCode == http.StatusForbidden
144-
145-
_ = s.stateManager.RecordFailure(ctx, job.ChannelID, webhookErr, time.Now(), isPermanent, job.WebhookEventID)
109+
return nil
110+
}
146111

147-
return fmt.Errorf("webhook failed: %s", errorMsg)
112+
func (s *Sender) recordFailure(ctx context.Context, job workertypes.IncomingWebhookDeliveryJob,
113+
err error, permanent bool) {
114+
if dbErr := s.stateManager.RecordFailure(ctx, job.ChannelID, err, time.Now(),
115+
permanent, job.WebhookEventID); dbErr != nil {
116+
slog.ErrorContext(ctx, "failed to record failure", "error", dbErr)
117+
}
148118
}

0 commit comments

Comments
 (0)