Skip to content

Commit 4d5bba8

Browse files
feat: enhance Authentik flow import with validation and logging
- Added comprehensive logging throughout the flow import process to track success/failure
- Improved error handling by parsing the response body for validation errors even on a 200 OK status
- Added flow existence verification after import with exponential backoff retry (5 attempts)
- Enhanced retry logic for flow UUID lookups with increased attempts (5) and exponential delays
- Added debug logging of a rendered YAML preview to verify template interpolation
1 parent 2fe6b73 commit 4d5bba8

2 files changed

Lines changed: 147 additions & 22 deletions

File tree

pkg/authentik/flows.go

Lines changed: 95 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,9 @@ import (
1010
"io"
1111
"mime/multipart"
1212
"net/http"
13+
14+
"github.com/uptrace/opentelemetry-go-extra/otelzap"
15+
"go.uber.org/zap"
1316
)
1417

1518
// FlowResponse represents an Authentik flow
@@ -202,25 +205,44 @@ func (c *APIClient) DeleteFlowBySlug(ctx context.Context, slug string) error {
202205

203206
// ImportFlow uploads a blueprint YAML definition for a flow.
204207
func (c *APIClient) ImportFlow(ctx context.Context, yaml []byte) error {
208+
logger := otelzap.Ctx(ctx)
209+
210+
logger.Debug("Importing Authentik flow",
211+
zap.Int("yaml_size_bytes", len(yaml)))
212+
205213
url := fmt.Sprintf("%s/api/v3/flows/instances/import/", c.BaseURL)
206214

207215
body := &bytes.Buffer{}
208216
writer := multipart.NewWriter(body)
209217
part, err := writer.CreateFormFile("file", "flow.yaml")
210218
if err != nil {
219+
logger.Error("Failed to create multipart form for flow import",
220+
zap.Error(err))
211221
return fmt.Errorf("failed to create multipart form: %w", err)
212222
}
213223

214224
if _, err := part.Write(yaml); err != nil {
225+
logger.Error("Failed to write flow blueprint to multipart form",
226+
zap.Error(err))
215227
return fmt.Errorf("failed to write flow blueprint: %w", err)
216228
}
217229

218230
if err := writer.Close(); err != nil {
231+
logger.Error("Failed to finalize multipart form",
232+
zap.Error(err))
219233
return fmt.Errorf("failed to finalize multipart form: %w", err)
220234
}
221235

236+
logger.Debug("Making flow import request",
237+
zap.String("url", url),
238+
zap.String("method", http.MethodPost),
239+
zap.String("content_type", writer.FormDataContentType()))
240+
222241
req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, body)
223242
if err != nil {
243+
logger.Error("Failed to create HTTP request for flow import",
244+
zap.String("url", url),
245+
zap.Error(err))
224246
return fmt.Errorf("failed to create request: %w", err)
225247
}
226248

@@ -229,14 +251,84 @@ func (c *APIClient) ImportFlow(ctx context.Context, yaml []byte) error {
229251

230252
resp, err := c.HTTPClient.Do(req)
231253
if err != nil {
254+
logger.Error("Flow import request failed",
255+
zap.String("url", url),
256+
zap.Error(err))
232257
return fmt.Errorf("flow import request failed: %w", err)
233258
}
234259
defer func() { _ = resp.Body.Close() }()
235260

261+
responseBody, readErr := io.ReadAll(io.LimitReader(resp.Body, 4096))
262+
if readErr != nil {
263+
logger.Error("Failed to read import response body",
264+
zap.Error(readErr))
265+
return fmt.Errorf("failed to read response body: %w", readErr)
266+
}
267+
268+
// CRITICAL: Log response body at INFO level so we can see what Authentik returned
269+
logger.Info("Received flow import response",
270+
zap.Int("status_code", resp.StatusCode),
271+
zap.Int("body_length", len(responseBody)),
272+
zap.String("response_body", string(responseBody))) // CRITICAL: Log actual response
273+
274+
// CRITICAL FIX: Parse response body on ALL status codes, not just errors
275+
// Authentik may return 200 OK but include validation errors in the response
276+
var importResponse struct {
277+
Success bool `json:"success"`
278+
Detail string `json:"detail"`
279+
Logs []string `json:"logs"`
280+
}
281+
282+
// Try to parse the response (may be JSON or empty on 204 No Content)
283+
if len(responseBody) > 0 {
284+
if err := json.Unmarshal(responseBody, &importResponse); err != nil {
285+
logger.Warn("Failed to parse import response as JSON",
286+
zap.Error(err),
287+
zap.String("raw_response", string(responseBody)))
288+
// Continue - may be plain text or empty response
289+
} else {
290+
logger.Info("Parsed import response",
291+
zap.Bool("success", importResponse.Success),
292+
zap.String("detail", importResponse.Detail),
293+
zap.Strings("logs", importResponse.Logs))
294+
}
295+
}
296+
297+
// Check for HTTP-level errors
236298
if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated && resp.StatusCode != http.StatusNoContent {
237-
body, _ := io.ReadAll(io.LimitReader(resp.Body, 2048))
238-
return fmt.Errorf("flow import failed with status %d: %s", resp.StatusCode, string(body))
239-
}
299+
// HTTP error status
300+
if importResponse.Detail != "" {
301+
logger.Error("Flow import failed with API error",
302+
zap.Int("status_code", resp.StatusCode),
303+
zap.String("error_detail", importResponse.Detail),
304+
zap.Strings("import_logs", importResponse.Logs))
305+
return fmt.Errorf("flow import failed with status %d: %s (logs: %v)", resp.StatusCode, importResponse.Detail, importResponse.Logs)
306+
}
307+
308+
logger.Error("Flow import failed",
309+
zap.Int("status_code", resp.StatusCode),
310+
zap.String("response_body", string(responseBody)))
311+
return fmt.Errorf("flow import failed with status %d: %s", resp.StatusCode, string(responseBody))
312+
}
313+
314+
// CRITICAL: Check response body for import validation errors even on 200 OK
315+
if importResponse.Success == false && importResponse.Detail != "" {
316+
logger.Error("Flow import returned success status but validation failed",
317+
zap.Int("status_code", resp.StatusCode),
318+
zap.String("error_detail", importResponse.Detail),
319+
zap.Strings("import_logs", importResponse.Logs))
320+
return fmt.Errorf("flow import validation failed: %s (logs: %v)", importResponse.Detail, importResponse.Logs)
321+
}
322+
323+
// Log import logs if available (may contain warnings or info)
324+
if len(importResponse.Logs) > 0 {
325+
logger.Info("Flow import completed with logs",
326+
zap.Strings("import_logs", importResponse.Logs))
327+
}
328+
329+
logger.Debug("Flow import successful",
330+
zap.Int("status_code", resp.StatusCode),
331+
zap.String("note", "Flow may take a few seconds to become queryable due to API indexing"))
240332

241333
return nil
242334
}

pkg/hecate/default_flows.go

Lines changed: 52 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,12 @@ func EnableDefaultFlows(rc *eos_io.RuntimeContext, cfg *DefaultFlowsConfig) erro
148148
return fmt.Errorf("failed to render flow template %q: %w", flow.Name, err)
149149
}
150150

151+
// CRITICAL: Log rendered YAML preview to verify template interpolation
152+
logger.Debug("Rendered flow YAML",
153+
zap.String("flow", flow.Name),
154+
zap.Int("yaml_size", len(rendered)),
155+
zap.String("yaml_preview", string(rendered[:min(500, len(rendered))]))) // First 500 chars
156+
151157
if cfg.DryRun {
152158
logger.Info("[dry-run] Would import Authentik flow",
153159
zap.String("flow", flow.Name),
@@ -163,13 +169,31 @@ func EnableDefaultFlows(rc *eos_io.RuntimeContext, cfg *DefaultFlowsConfig) erro
163169
}
164170
}
165171

172+
logger.Info("Importing flow to Authentik",
173+
zap.String("flow", flow.Name),
174+
zap.String("slug", flow.Slug))
175+
166176
if err := client.ImportFlow(rc.Ctx, rendered); err != nil {
167177
return fmt.Errorf("failed to import flow %q: %w", flow.Name, err)
168178
}
169179

170-
logger.Info("✓ Flow imported",
180+
// CRITICAL: Verify flow actually exists after import
181+
// RATIONALE: Authentik may return 200 OK but not create the flow (validation errors, etc.)
182+
// Use retry to handle eventual consistency (API indexing lag)
183+
// RETRY STRATEGY: 5 attempts with 2s initial delay; waits between attempts are 2s, 4s, 8s, 16s (max 30s total - there is no wait after the final attempt)
184+
verifiedFlow := getFlowWithRetry(rc, client, logger, flow.Slug, 5, 2*time.Second)
185+
if verifiedFlow == nil {
186+
logger.Error("Flow import reported success but flow does not exist",
187+
zap.String("flow_name", flow.Name),
188+
zap.String("slug", flow.Slug),
189+
zap.String("remediation", "Check Authentik logs and YAML syntax"))
190+
return fmt.Errorf("flow import verification failed: flow %q does not exist after import", flow.Name)
191+
}
192+
193+
logger.Info("✓ Flow imported and verified",
171194
zap.String("flow", flow.Name),
172-
zap.String("slug", flow.Slug))
195+
zap.String("slug", flow.Slug),
196+
zap.String("uuid", verifiedFlow.PK))
173197
importedSlugs = append(importedSlugs, flow.Slug)
174198
}
175199

@@ -202,11 +226,12 @@ func EnableDefaultFlows(rc *eos_io.RuntimeContext, cfg *DefaultFlowsConfig) erro
202226
// CRITICAL: Brand API requires flow UUIDs, not slugs
203227
// Lookup each flow to get its PK (UUID) with retry logic for eventual consistency
204228
// RATIONALE: Freshly imported flows may not be immediately queryable due to Authentik indexing
205-
authFlow := getFlowWithRetry(rc, client, logger, fmt.Sprintf("%s-authentication", appSlug), 3, 2*time.Second)
206-
enrollmentFlow := getFlowWithRetry(rc, client, logger, fmt.Sprintf("%s-enrollment", appSlug), 3, 2*time.Second)
207-
invalidationGlobalFlow := getFlowWithRetry(rc, client, logger, fmt.Sprintf("%s-invalidation-global", appSlug), 3, 2*time.Second)
208-
recoveryFlow := getFlowWithRetry(rc, client, logger, fmt.Sprintf("%s-recovery", appSlug), 3, 2*time.Second)
209-
unenrollmentFlow := getFlowWithRetry(rc, client, logger, fmt.Sprintf("%s-unenrollment", appSlug), 3, 2*time.Second)
229+
// RETRY STRATEGY: 5 attempts with exponential backoff; waits between attempts are 1s, 2s, 4s, 8s (max 15s total - there is no wait after the final attempt)
230+
authFlow := getFlowWithRetry(rc, client, logger, fmt.Sprintf("%s-authentication", appSlug), 5, 1*time.Second)
231+
enrollmentFlow := getFlowWithRetry(rc, client, logger, fmt.Sprintf("%s-enrollment", appSlug), 5, 1*time.Second)
232+
invalidationGlobalFlow := getFlowWithRetry(rc, client, logger, fmt.Sprintf("%s-invalidation-global", appSlug), 5, 1*time.Second)
233+
recoveryFlow := getFlowWithRetry(rc, client, logger, fmt.Sprintf("%s-recovery", appSlug), 5, 1*time.Second)
234+
unenrollmentFlow := getFlowWithRetry(rc, client, logger, fmt.Sprintf("%s-unenrollment", appSlug), 5, 1*time.Second)
210235

211236
// Only update brand if we successfully looked up all flow UUIDs
212237
if authFlow != nil && enrollmentFlow != nil && invalidationGlobalFlow != nil && recoveryFlow != nil && unenrollmentFlow != nil {
@@ -963,41 +988,49 @@ entries:
963988
timeout: 30
964989
`
965990

966-
// getFlowWithRetry attempts to retrieve a flow with retry logic for eventual consistency
967-
// RATIONALE: Freshly imported flows may not be immediately queryable in Authentik
968-
// This is a timing issue - the flow exists but the index hasn't updated yet
969-
func getFlowWithRetry(rc *eos_io.RuntimeContext, client *authentik.APIClient, logger otelzap.LoggerWithCtx, slug string, maxRetries int, retryDelay time.Duration) *authentik.FlowResponse {
991+
// getFlowWithRetry attempts to retrieve a flow with retry logic and exponential backoff
992+
// RATIONALE: Freshly imported flows may not be immediately queryable in Authentik due to indexing lag
993+
// This is a timing issue - the flow exists but the API index hasn't updated yet
994+
// RETRY STRATEGY: Exponential backoff (1s, 2s, 4s, 8s, 16s) for up to 5 attempts (max 31s wait)
995+
func getFlowWithRetry(rc *eos_io.RuntimeContext, client *authentik.APIClient, logger otelzap.LoggerWithCtx, slug string, maxRetries int, initialDelay time.Duration) *authentik.FlowResponse {
996+
currentDelay := initialDelay
997+
970998
for attempt := 1; attempt <= maxRetries; attempt++ {
971999
flow, err := client.GetFlow(rc.Ctx, slug)
9721000
if err != nil {
973-
logger.Warn("Failed to lookup flow UUID",
1001+
logger.Warn("Failed to lookup flow UUID (API error)",
9741002
zap.String("slug", slug),
9751003
zap.Int("attempt", attempt),
9761004
zap.Int("max_retries", maxRetries),
9771005
zap.Error(err))
9781006
} else if flow != nil {
9791007
// Success - flow found
980-
logger.Debug("Found flow UUID",
1008+
logger.Debug("Flow UUID resolved",
9811009
zap.String("slug", slug),
9821010
zap.String("uuid", flow.PK),
1011+
zap.String("flow_name", flow.Name),
9831012
zap.Int("attempt", attempt))
9841013
return flow
9851014
}
9861015

9871016
// Flow not found yet (flow == nil, err == nil means "not found" per GetFlow contract)
9881017
if attempt < maxRetries {
989-
logger.Debug("Flow not found yet, waiting before retry",
1018+
logger.Debug("Flow not indexed yet, retrying with exponential backoff",
9901019
zap.String("slug", slug),
9911020
zap.Int("attempt", attempt),
9921021
zap.Int("max_retries", maxRetries),
993-
zap.Duration("retry_delay", retryDelay))
994-
time.Sleep(retryDelay)
1022+
zap.Duration("retry_delay", currentDelay),
1023+
zap.String("reason", "Authentik API eventual consistency"))
1024+
time.Sleep(currentDelay)
1025+
currentDelay *= 2 // Exponential backoff: 1s → 2s → 4s → 8s → 16s
9951026
}
9961027
}
9971028

998-
// All retries exhausted
999-
logger.Warn("Flow not found after all retries",
1029+
// All retries exhausted - flow may not exist or API is very slow
1030+
logger.Warn("Flow not found after all retries (may not exist or API indexing is slow)",
10001031
zap.String("slug", slug),
1001-
zap.Int("max_retries", maxRetries))
1032+
zap.Int("max_retries", maxRetries),
1033+
zap.Duration("total_wait_time", initialDelay*(1<<uint(maxRetries)-1)), // Sum of geometric series
1034+
zap.String("remediation", "Check if flow was actually imported successfully"))
10021035
return nil
10031036
}

0 commit comments

Comments
 (0)