feat: [1/3] refactor pipeline into shared package with step-aware log streaming #182
…ming
Extract pipeline stage execution from cli/cmd/start_pipeline.go into a reusable pkg/pipeline package with a Runner that orchestrates stage execution, discovers sub-steps from the IDE server's pipeline status API, and streams logs per-step via SSE.
- Add pkg/pipeline with Runner, Client interface, and step discovery
- Add StreamLogs SSE method to api/workspace.go for real-time log output
- Add StreamLogs to cli/cmd Client interface
- Refactor start_pipeline.go to delegate to pipeline.Runner
- Add pipeline streaming unit tests (single/multi-step, no-stream cases)
- Update .mockery.yml for pipeline mock generation
Signed-off-by: Alex <132889147+alexvcodesphere@users.noreply.github.com>
NautiluX left a comment
Thanks for opening the PR. I think it should align better with the rest of the codebase, e.g. by reusing the existing log-streaming functionality instead of implementing it again.
In general, the functions tend to be lengthy and overly complex; I recommend simplifying them for readability. The handling of log streaming with a timeout in particular looks very complex. I know it's not easy to get parallel code working without losing readability, but I think this can be made a bit simpler.
| // log entries to the provided writer until the context is cancelled or the | ||
| // stream ends. This is used during pipeline execution to provide real-time | ||
| // log output. | ||
| func (c *Client) StreamLogs(ctx context.Context, apiUrl string, wsId int, stage string, step int, w io.Writer) error { |
We already have a pretty similar function here:
Line 171 in 900cc71
It's not beautiful either, but we should reuse or refactor it rather than reimplement it.
| for _, stage := range stages { | ||
| // Sync the landscape before the run stage | ||
| if stage == "run" { | ||
| fmt.Println(" 🔄 Syncing landscape...") |
Please use the logging framework instead of fmt.
| // Step-aware log streaming for non-run stages. | ||
| // Each step gets its own context; when a new step is discovered the | ||
| // previous step's stream is cancelled and drained before moving on. | ||
| streamEnabled := stage != "run" && cfg.ApiUrl != "" |
How is cfg.ApiUrl related to the log streaming?
| err := r.waitForStageWithStepCallback(wsId, stage, cfg, startStreamForStep) | ||
| | ||
| // Drain final step's logs | ||
| drainStream() |
| mockClient.EXPECT().GetPipelineState(wsId, "prepare").Return([]api.PipelineStatus{ | ||
| statusWithSteps("codesphere-ide", "success", "success"), | ||
| }, nil).NotBefore(startCall) | ||
| // StreamLogs should NOT be called — mockery will fail if it is |
Suggested change:
- // StreamLogs should NOT be called — mockery will fail if it is
+ // StreamLogs should NOT be called
| go func() { | ||
| defer stepWg.Done() | ||
| if err := r.Client.StreamLogs(ctx, cfg.ApiUrl, wsId, stage, step, os.Stdout); err != nil { | ||
| _, _ = fmt.Fprintf(os.Stderr, "⚠ log stream error (step %d): %v\n", step, err) |
What is the expected behavior here? Do we just ignore the error and go on to the next step?
| startStreamForStep := func(step int, totalSteps int) { | ||
| if !streamEnabled || step <= streamingStep { | ||
| return | ||
| } | ||
| | ||
| // Drain previous step before starting next | ||
| drainStream() | ||
| | ||
| streamingStep = step | ||
| fmt.Printf("\n 📋 Step %d/%d\n", step+1, totalSteps) | ||
| | ||
| ctx, cancel := context.WithCancel(context.Background()) | ||
| stepCancel = cancel | ||
| stepWg.Add(1) | ||
| go func() { | ||
| defer stepWg.Done() | ||
| if err := r.Client.StreamLogs(ctx, cfg.ApiUrl, wsId, stage, step, os.Stdout); err != nil { | ||
| _, _ = fmt.Fprintf(os.Stderr, "⚠ log stream error (step %d): %v\n", step, err) | ||
| } | ||
| }() | ||
| } |
Why is this function defined inside another function? I think it deserves to be a private but real function, which would improve the readability of runStage.
| if onStep != nil { | ||
| for _, s := range status { | ||
| if s.Server == IdeServer { | ||
| total := len(s.Steps) | ||
| for i, step := range s.Steps { | ||
| if step.State == "running" || step.State == "success" { | ||
| onStep(i, total) | ||
| } | ||
| } | ||
| break | ||
| } | ||
| } | ||
| } |
I wonder if we can find a simpler way of implementing this functionality.
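One simpler shape, as a sketch only: since the stream callback ignores any step at or below the one already streaming, the poll loop could compute just the latest started step and pass that on. The types below are hypothetical, trimmed-down stand-ins for the status shape implied by the diff (Server, Steps, State), and the "waiting" state is an assumed value. Note the behavioral difference: steps that start and finish between two polls would be skipped rather than briefly streamed.

```go
package main

import "fmt"

// stepStatus and serverStatus are hypothetical stand-ins for api.PipelineStatus.
type stepStatus struct{ State string }

type serverStatus struct {
	Server string
	Steps  []stepStatus
}

// latestStartedStep returns the index of the last step on the named server
// that is running or has succeeded, plus the total number of steps.
// The index is -1 if the server is missing or no step has started yet.
func latestStartedStep(status []serverStatus, server string) (index, total int) {
	for _, s := range status {
		if s.Server != server {
			continue
		}
		for i := len(s.Steps) - 1; i >= 0; i-- {
			if st := s.Steps[i].State; st == "running" || st == "success" {
				return i, len(s.Steps)
			}
		}
		return -1, len(s.Steps)
	}
	return -1, 0
}

func main() {
	status := []serverStatus{{
		Server: "codesphere-ide",
		Steps:  []stepStatus{{"success"}, {"running"}, {"waiting"}},
	}}
	i, total := latestStartedStep(status, "codesphere-ide")
	fmt.Printf("step %d/%d\n", i+1, total) // step 2/3
}
```

A pure function like this can be unit-tested without mocks or goroutines.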
| ctx, cancel := context.WithCancel(context.Background()) | ||
| stepCancel = cancel | ||
| stepWg.Add(1) | ||
| go func() { | ||
| defer stepWg.Done() | ||
| if err := r.Client.StreamLogs(ctx, cfg.ApiUrl, wsId, stage, step, os.Stdout); err != nil { | ||
| _, _ = fmt.Fprintf(os.Stderr, "⚠ log stream error (step %d): %v\n", step, err) | ||
| } | ||
| }() |
So there are two things happening in parallel here: on one hand we start the stream, and for the timeout we have the waiting function, which cancels this goroutine when the timeout is reached, right?
I'm wondering whether we can achieve the intended behavior without this goroutine, or by hiding the cancel-after-time-X inside StreamLogs, which takes a context anyway.
| stepWg.Add(1) | ||
| go func() { | ||
| defer stepWg.Done() | ||
| if err := r.Client.StreamLogs(ctx, cfg.ApiUrl, wsId, stage, step, os.Stdout); err != nil { |
Why does this call need an API URL when none of the others do?
PR 1/3: Pipeline Refactor + Streaming
Extract pipeline stage execution from `cli/cmd/start_pipeline.go` into a reusable `pkg/pipeline` package.
Changes
- `pkg/pipeline/pipeline.go`: `Runner` orchestrates stage execution, discovers sub-steps from IDE server's pipeline status API, streams logs per-step via SSE
- `api/workspace.go`: `StreamLogs` SSE method for real-time log output
- `cli/cmd/start_pipeline.go`: Refactored to delegate to `pipeline.Runner`
- `cli/cmd/client.go`: Added `StreamLogs` to Client interface
- `pkg/pipeline/pipeline_test.go`: 4 streaming unit tests
Merge Order
- `pkg/deploy` package
- `cs deploy github` CLI command