Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,4 @@ hooks.yaml
.env
.claude/scheduled_tasks.lock
.playwright-mcp/
/hooksctl
297 changes: 279 additions & 18 deletions cmd/hooksctl/forward.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,55 @@
"syscall"
"time"

tea "charm.land/bubbletea/v2"
xterm "github.com/charmbracelet/x/term"
"github.com/onebusaway/hooks/internal/push"
"github.com/onebusaway/hooks/internal/tui"
)

// forwardTestCtx is non-nil only in tests; production paths derive their
// own context from os signals.
var forwardTestCtx context.Context

const (
deliverySuffixMalformed = "malformed"
deliverySuffixTransportErr = "transport err"
deliverySuffixRetrying = "retrying"
deliverySuffixCancelled = "cancelled"
deliverySuffixErr = "err"
)

// errSkipEvent is returned when an event payload is permanently malformed.
// The caller advances the cursor past the broken event rather than reconnecting.
var errSkipEvent = errors.New("skip event")

type parsedEvent struct {
DeliveryID string
Headers map[string]string
Body []byte
}

func parseEventPayload(msg map[string]string) (parsedEvent, error) {
var raw struct {
DeliveryID string `json:"delivery_id"`
ProviderTimestamp time.Time `json:"provider_timestamp"`
Headers map[string]string `json:"headers"`
Body string `json:"body"`
}
if err := json.Unmarshal([]byte(msg["data"]), &raw); err != nil {
return parsedEvent{}, fmt.Errorf("%w: parse: %w", errSkipEvent, err)
}
bodyBytes, err := base64.StdEncoding.DecodeString(raw.Body)
if err != nil {
return parsedEvent{}, fmt.Errorf("%w: decode: %w", errSkipEvent, err)
}
delivID := raw.DeliveryID
if delivID == "" {
delivID = msg["id"]
}
return parsedEvent{DeliveryID: delivID, Headers: raw.Headers, Body: bodyBytes}, nil
}

func cmdForward(g globals, args []string) int {
fs := newFlagSet("forward")
to := fs.String("to", "", "local URL to POST every event to")
Expand Down Expand Up @@ -84,16 +126,20 @@

cli := &http.Client{Timeout: *timeout}

if xterm.IsTerminal(os.Stdout.Fd()) {
return runWithTUI(ctx, cancel, g, source, *to, subscribeToken, cursorPath, &startCursor, cli, *exitOnError)
}

for {
if err := streamFromCursor(ctx, g, subscribeToken, source, &startCursor, cursorPath, *to, cli, *exitOnError); err != nil {
if ctx.Err() != nil {
return 0
}
if *exitOnError {
fmt.Fprintln(os.Stderr, "forward:", err)

Check failure on line 139 in cmd/hooksctl/forward.go

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Define a constant instead of duplicating this literal "forward:" 3 times.

See more on https://sonarcloud.io/project/issues?id=OneBusAway_hooks&issues=AZ4ktrIflhLVGXtYZ7k0&open=AZ4ktrIflhLVGXtYZ7k0&pullRequest=10
return 1
}
// Backoff capped at 60s; mirrors push policy.
// Fixed random reconnect delay in [500ms, 2.5s); see attemptBackoff for per-delivery retry.
delay := backoff()
fmt.Fprintf(os.Stderr, "forward: %v; reconnecting in %s\n", err, delay)
select {
Expand All @@ -107,7 +153,214 @@
}
}

func streamFromCursor(ctx context.Context, g globals, bearer, source string, cursor *int64, cursorPath, to string, cli *http.Client, exitOnError bool) error {
// runWithTUI runs the forward loop in a goroutine and drives a Bubble Tea TUI
// in the foreground. cancel is called by the TUI when the user quits.
func runWithTUI(ctx context.Context, cancel context.CancelFunc, g globals, source, to, subscribeToken, cursorPath string, cursor *int64, cli *http.Client, exitOnError bool) int {

Check failure on line 158 in cmd/hooksctl/forward.go

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Refactor this method to reduce its Cognitive Complexity from 16 to the 15 allowed.

See more on https://sonarcloud.io/project/issues?id=OneBusAway_hooks&issues=AZ4ktrIflhLVGXtYZ7k1&open=AZ4ktrIflhLVGXtYZ7k1&pullRequest=10

Check warning on line 158 in cmd/hooksctl/forward.go

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

This function has 10 parameters, which is greater than the 7 authorized.

See more on https://sonarcloud.io/project/issues?id=OneBusAway_hooks&issues=AZ4ktrIflhLVGXtYZ7k2&open=AZ4ktrIflhLVGXtYZ7k2&pullRequest=10
prefix, suffix := tokenFingerprint(subscribeToken)
baseSession := tui.SessionInfo{
State: tui.StateOnline,
UptimeStart: time.Now(),
ForwardURL: strings.TrimRight(g.Server, "/") + "/subscribe/" + source,
TargetURL: to,
TokenPrefix: prefix,
TokenSuffix: suffix,
Scopes: []string{source},
}

model := tui.New(baseSession, cancel)
prog := tea.NewProgram(model, tea.WithContext(ctx))

errCh := make(chan error, 1)
go func() {
reconnectCount := 0
for {
info := baseSession
info.ReconnectCount = reconnectCount
prog.Send(tui.SessionStateMsg{Info: info})

err := streamFromCursorTUI(ctx, prog, g, subscribeToken, source, cursor, cursorPath, to, cli, exitOnError)
if err == nil && ctx.Err() == nil {
// Server closed the stream cleanly; mirror the non-TUI auto-exit behavior.
info := baseSession
info.State = tui.StateOffline
prog.Send(tui.SessionStateMsg{Info: info})
prog.Send(tui.QuitMsg{})
return
}
if err == nil || ctx.Err() != nil {
return
}

if exitOnError {
errCh <- err
prog.Send(tui.QuitMsg{})
return
}

reconnectCount++
info = baseSession
info.State = tui.StateReconnecting
info.ReconnectCount = reconnectCount
prog.Send(tui.SessionStateMsg{Info: info})

d := backoff()
select {
case <-ctx.Done():
return
case <-time.After(d):
}
}
}()

if _, err := prog.Run(); err != nil {
if ctx.Err() != nil {
return 0
}
cancel()
fmt.Fprintln(os.Stderr, "forward:", err)
return 1
}
select {
case err := <-errCh:
fmt.Fprintln(os.Stderr, "forward:", err)
return 1
default:
return 0
}
}

func streamFromCursorTUI(ctx context.Context, prog *tea.Program, g globals, bearer, source string, cursor *int64, cursorPath, to string, cli *http.Client, exitOnError bool) error {

Check warning on line 232 in cmd/hooksctl/forward.go

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

This function has 10 parameters, which is greater than the 7 authorized.

See more on https://sonarcloud.io/project/issues?id=OneBusAway_hooks&issues=AZ4ktrIflhLVGXtYZ7k3&open=AZ4ktrIflhLVGXtYZ7k3&pullRequest=10
return streamFromCursorWith(ctx, g, bearer, source, cursor, cursorPath, func(ctx context.Context, msg map[string]string) error {
return forwardOneTUI(ctx, prog, cli, to, msg, source, exitOnError)
})
}

func forwardOneTUI(ctx context.Context, prog *tea.Program, cli *http.Client, to string, msg map[string]string, source string, exitOnError bool) error {

Check failure on line 238 in cmd/hooksctl/forward.go

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Refactor this method to reduce its Cognitive Complexity from 33 to the 15 allowed.

See more on https://sonarcloud.io/project/issues?id=OneBusAway_hooks&issues=AZ4ktrIflhLVGXtYZ7k4&open=AZ4ktrIflhLVGXtYZ7k4&pullRequest=10
p, err := parseEventPayload(msg)
if err != nil {
prog.Send(tui.DeliveryReceivedMsg{Delivery: tui.Delivery{
ID: msg["id"],
RecvAt: time.Now(),
Method: http.MethodPost,
Path: "/" + source,
Source: source,
Suffix: deliverySuffixMalformed,
}})
return err
}

recv := tui.Delivery{
ID: p.DeliveryID,
RecvAt: time.Now(),
Method: http.MethodPost,
Path: "/" + source,
Source: source,
InFlight: true,
SizeBytes: int64(len(p.Body)),
}
prog.Send(tui.DeliveryReceivedMsg{Delivery: recv})

start := time.Now()
var finalStatus int
var forwardErr error

for attempt := 0; ; attempt++ {
if attempt > 0 {
prog.Send(tui.DeliveryReceivedMsg{Delivery: recv})
}
finalStatus = 0

req, err := http.NewRequestWithContext(ctx, http.MethodPost, to, bytes.NewReader(p.Body))
if err != nil {
forwardErr = err
break
}
for k, v := range p.Headers {
if push.IsHopByHop(k) {
continue
}
req.Header.Set(k, v)
}
req.Header.Set("X-Hooks-Delivery-Id", p.DeliveryID)
req.Header.Set("X-Hooks-Sequence", msg["id"])
req.Header.Set("X-Hooks-Source", msg["event"])

resp, err := cli.Do(req)
if err != nil {
if ctx.Err() != nil {
forwardErr = ctx.Err()
break
}
if exitOnError {
forwardErr = fmt.Errorf("transport: %w", err)
break
}
prog.Send(tui.DeliveryCompletedMsg{
ID: p.DeliveryID,
DurationMS: time.Since(start).Milliseconds(),
Suffix: deliverySuffixTransportErr,
})
if !sleepWithCtx(ctx, attemptBackoff(attempt)) {
forwardErr = ctx.Err()
break
}
continue
}
_ = resp.Body.Close()
finalStatus = resp.StatusCode
if resp.StatusCode >= 200 && resp.StatusCode < 300 {
break
}
if exitOnError {
forwardErr = fmt.Errorf("target returned %d", resp.StatusCode)
break
}
prog.Send(tui.DeliveryCompletedMsg{
ID: p.DeliveryID,
Status: resp.StatusCode,
DurationMS: time.Since(start).Milliseconds(),
Suffix: deliverySuffixRetrying,
})
if !sleepWithCtx(ctx, attemptBackoff(attempt)) {
forwardErr = ctx.Err()
break
}
}

suffix := ""
if forwardErr != nil {
if ctx.Err() != nil {
suffix = deliverySuffixCancelled
} else {
suffix = deliverySuffixErr
}
}

prog.Send(tui.DeliveryCompletedMsg{
ID: p.DeliveryID,
Status: finalStatus,
DurationMS: time.Since(start).Milliseconds(),
Suffix: suffix,
})

return forwardErr
}

// tokenFingerprint returns the first 6 and last 3 characters of a token.
func tokenFingerprint(token string) (prefix, suffix string) {
r := []rune(token)
if len(r) > 9 {
return string(r[:6]), string(r[len(r)-3:])
}
if len(r) > 3 {
return string(r[:3]), string(r[len(r)-3:])
}
return token, ""
}

// streamFromCursorWith opens an SSE subscription and calls handle for each event.
// Malformed events that return errSkipEvent have their cursor advanced and are skipped.
func streamFromCursorWith(ctx context.Context, g globals, bearer, source string, cursor *int64, cursorPath string, handle func(context.Context, map[string]string) error) error {

Check failure on line 363 in cmd/hooksctl/forward.go

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Refactor this method to reduce its Cognitive Complexity from 22 to the 15 allowed.

See more on https://sonarcloud.io/project/issues?id=OneBusAway_hooks&issues=AZ4ktrIflhLVGXtYZ7k5&open=AZ4ktrIflhLVGXtYZ7k5&pullRequest=10
endpoint := fmt.Sprintf("%s/subscribe/%s?since=%d", strings.TrimRight(g.Server, "/"), source, *cursor)
req, err := http.NewRequestWithContext(ctx, http.MethodGet, endpoint, nil)
if err != nil {
Expand Down Expand Up @@ -136,15 +389,21 @@
}
seq, err := strconv.ParseInt(current["id"], 10, 64)
if err != nil {
current = map[string]string{}
clear(current)
continue
}
if err := forwardOne(ctx, cli, to, current, exitOnError); err != nil {
if err := handle(ctx, current); err != nil {
if errors.Is(err, errSkipEvent) {
*cursor = seq
saveCursor(cursorPath, seq)
clear(current)
continue
}
return err
}
*cursor = seq
saveCursor(cursorPath, seq)
current = map[string]string{}
clear(current)
case strings.HasPrefix(line, ":"):
// keepalive
default:
Expand All @@ -156,23 +415,21 @@
return scanner.Err()
}

func streamFromCursor(ctx context.Context, g globals, bearer, source string, cursor *int64, cursorPath, to string, cli *http.Client, exitOnError bool) error {
return streamFromCursorWith(ctx, g, bearer, source, cursor, cursorPath, func(ctx context.Context, msg map[string]string) error {
return forwardOne(ctx, cli, to, msg, exitOnError)
})
}

func forwardOne(ctx context.Context, cli *http.Client, to string, msg map[string]string, exitOnError bool) error {
var p struct {
DeliveryID string `json:"delivery_id"`
ProviderTimestamp time.Time `json:"provider_timestamp"`
Headers map[string]string `json:"headers"`
Body string `json:"body"`
}
if err := json.Unmarshal([]byte(msg["data"]), &p); err != nil {
return fmt.Errorf("parse event: %w", err)
}
bodyBytes, err := base64.StdEncoding.DecodeString(p.Body)
p, err := parseEventPayload(msg)
if err != nil {
return fmt.Errorf("decode body: %w", err)
fmt.Fprintf(os.Stderr, "forward: malformed event seq=%s: %v\n", msg["id"], err)
return err
}

for attempt := 0; ; attempt++ {
req, err := http.NewRequestWithContext(ctx, http.MethodPost, to, bytes.NewReader(bodyBytes))
req, err := http.NewRequestWithContext(ctx, http.MethodPost, to, bytes.NewReader(p.Body))
if err != nil {
return err
}
Expand Down Expand Up @@ -284,7 +541,11 @@
}

func saveCursor(path string, seq int64) {
_ = os.WriteFile(path, []byte(strconv.FormatInt(seq, 10)+"\n"), 0o600)
if err := os.WriteFile(path, []byte(strconv.FormatInt(seq, 10)+"\n"), 0o600); err != nil {
if !xterm.IsTerminal(os.Stdout.Fd()) {
fmt.Fprintf(os.Stderr, "forward: save cursor: %v\n", err)
}
}
}

// ephemeralListener is the in-memory record of a `kind='listener'`,
Expand Down
Loading
Loading