Skip to content

Commit d604da2

Browse files
committed
feat(wfctl): add provider log capture
1 parent c79dc5d commit d604da2

15 files changed

Lines changed: 1338 additions & 360 deletions

cmd/wfctl/iac_typed_adapter.go

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,12 @@ package main
2727
import (
2828
"context"
2929
"encoding/json"
30+
"errors"
3031
"fmt"
32+
"io"
3133
"log"
3234
"math"
35+
"strings"
3336
"time"
3437

3538
"google.golang.org/grpc"
@@ -53,6 +56,7 @@ const (
5356
iacServiceMigrationRepairer = "workflow.plugin.external.iac.IaCProviderMigrationRepairer"
5457
iacServiceValidator = "workflow.plugin.external.iac.IaCProviderValidator"
5558
iacServiceDriftConfigDetect = "workflow.plugin.external.iac.IaCProviderDriftConfigDetector"
59+
iacServiceLogCapture = "workflow.plugin.external.iac.IaCProviderLogCapture"
5660
iacServiceFinalizer = "workflow.plugin.external.iac.IaCProviderFinalizer"
5761
iacServiceResourceDriver = "workflow.plugin.external.iac.ResourceDriver"
5862
)
@@ -78,6 +82,7 @@ type typedIaCAdapter struct {
7882
repairer pb.IaCProviderMigrationRepairerClient
7983
validator pb.IaCProviderValidatorClient
8084
driftCfg pb.IaCProviderDriftConfigDetectorClient
85+
logCapture pb.IaCProviderLogCaptureClient
8186
finalizer pb.IaCProviderFinalizerClient
8287
resourceDriv pb.ResourceDriverClient
8388

@@ -117,6 +122,9 @@ func newTypedIaCAdapter(conn *grpc.ClientConn, registered map[string]bool) *type
117122
if registered[iacServiceDriftConfigDetect] {
118123
a.driftCfg = pb.NewIaCProviderDriftConfigDetectorClient(conn)
119124
}
125+
if registered[iacServiceLogCapture] {
126+
a.logCapture = pb.NewIaCProviderLogCaptureClient(conn)
127+
}
120128
if registered[iacServiceFinalizer] {
121129
a.finalizer = pb.NewIaCProviderFinalizerClient(conn)
122130
}
@@ -192,6 +200,13 @@ func (a *typedIaCAdapter) DriftConfigDetector() pb.IaCProviderDriftConfigDetecto
192200
return a.driftCfg
193201
}
194202

203+
// LogCapture returns the typed pb.IaCProviderLogCaptureClient or nil
204+
// when the plugin did not register IaCProviderLogCapture. Used by
205+
// `wfctl logs capture`.
206+
func (a *typedIaCAdapter) LogCapture() pb.IaCProviderLogCaptureClient {
207+
return a.logCapture
208+
}
209+
195210
// CredentialRevoker returns the typed
196211
// pb.IaCProviderCredentialRevokerClient or nil when the plugin did not
197212
// register IaCProviderCredentialRevoker. Used by
@@ -591,6 +606,42 @@ func (a *typedIaCAdapter) RepairDirtyMigration(ctx context.Context, req interfac
591606
return migrationRepairResultFromPB(resp.GetResult()), nil
592607
}
593608

609+
// CaptureLogs satisfies interfaces.LogCaptureProvider.
610+
func (a *typedIaCAdapter) CaptureLogs(ctx context.Context, req interfaces.LogCaptureRequest, sink interfaces.LogCaptureSink) error {
611+
if a.logCapture == nil {
612+
return unimplementedOptional(iacServiceLogCapture)
613+
}
614+
pbReq, err := logCaptureRequestToPB(req)
615+
if err != nil {
616+
return err
617+
}
618+
stream, err := a.logCapture.CaptureLogs(ctx, pbReq)
619+
if err != nil {
620+
return translateRPCErr(err)
621+
}
622+
for {
623+
chunk, recvErr := stream.Recv()
624+
if recvErr != nil {
625+
if errors.Is(recvErr, io.EOF) {
626+
return nil
627+
}
628+
return translateRPCErr(recvErr)
629+
}
630+
if sink != nil {
631+
if err := sink.WriteLogChunk(interfaces.LogChunk{
632+
Data: append([]byte(nil), chunk.GetData()...),
633+
Source: chunk.GetSource(),
634+
EOF: chunk.GetEof(),
635+
}); err != nil {
636+
return err
637+
}
638+
}
639+
if chunk.GetEof() {
640+
return nil
641+
}
642+
}
643+
}
644+
594645
// ─── typedResourceDriver (per-type ResourceDriver wrapper) ──────────────────
595646

596647
// typedResourceDriver implements interfaces.ResourceDriver on top of the
@@ -1261,6 +1312,46 @@ func migrationRepairResultFromPB(r *pb.MigrationRepairResult) *interfaces.Migrat
12611312
}
12621313
}
12631314

1315+
func logCaptureRequestToPB(r interfaces.LogCaptureRequest) (*pb.CaptureLogsRequest, error) {
1316+
tailLines := r.TailLines
1317+
if tailLines < 0 {
1318+
tailLines = 0
1319+
} else if tailLines > math.MaxInt32 {
1320+
tailLines = math.MaxInt32
1321+
}
1322+
logType, err := logCaptureTypeToPB(r.LogType)
1323+
if err != nil {
1324+
return nil, err
1325+
}
1326+
return &pb.CaptureLogsRequest{
1327+
ResourceName: r.ResourceName,
1328+
ResourceType: r.ResourceType,
1329+
ProviderId: r.ProviderID,
1330+
ComponentName: r.ComponentName,
1331+
LogType: logType,
1332+
TailLines: int32(tailLines), //nolint:gosec // G115: clamped above
1333+
Follow: r.Follow,
1334+
DurationSeconds: r.DurationSeconds,
1335+
DeploymentId: r.DeploymentID,
1336+
}, nil
1337+
}
1338+
1339+
func logCaptureTypeToPB(s string) (pb.LogCaptureType, error) {
1340+
switch strings.ToUpper(strings.TrimSpace(s)) {
1341+
case "BUILD":
1342+
return pb.LogCaptureType_LOG_CAPTURE_TYPE_BUILD, nil
1343+
case "DEPLOY":
1344+
return pb.LogCaptureType_LOG_CAPTURE_TYPE_DEPLOY, nil
1345+
case "", "RUN":
1346+
return pb.LogCaptureType_LOG_CAPTURE_TYPE_RUN, nil
1347+
case "RUN_RESTARTED":
1348+
return pb.LogCaptureType_LOG_CAPTURE_TYPE_RUN_RESTARTED, nil
1349+
default:
1350+
return pb.LogCaptureType_LOG_CAPTURE_TYPE_UNSPECIFIED,
1351+
fmt.Errorf("log capture: unsupported log type %q (want BUILD, DEPLOY, RUN, or RUN_RESTARTED)", s)
1352+
}
1353+
}
1354+
12641355
func timeToPB(t time.Time) *timestamppb.Timestamp {
12651356
if t.IsZero() {
12661357
return nil

cmd/wfctl/iac_typed_adapter_test.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"context"
2424
"errors"
2525
"net"
26+
"strings"
2627
"testing"
2728

2829
"google.golang.org/grpc"
@@ -127,6 +128,23 @@ func TestTypedAdapter_ValidatePlanReturnsNilWhenValidatorAbsent(t *testing.T) {
127128
}
128129
}
129130

131+
func TestTypedAdapter_CaptureLogsRejectsUnknownType(t *testing.T) {
132+
adapter := fixtureTypedAdapter{
133+
LogCapture: &pb.UnimplementedIaCProviderLogCaptureServer{},
134+
}.build(t)
135+
136+
err := adapter.CaptureLogs(context.Background(), interfaces.LogCaptureRequest{
137+
ResourceName: "app",
138+
LogType: "typo",
139+
}, nil)
140+
if err == nil {
141+
t.Fatal("expected unsupported log type error")
142+
}
143+
if !strings.Contains(err.Error(), "unsupported log type") {
144+
t.Fatalf("error = %q, want unsupported log type", err.Error())
145+
}
146+
}
147+
130148
// TestTypedAdapter_DriftClassEnumRoundTrip ensures every DriftClass
131149
// constant survives the proto-enum conversion in both directions —
132150
// regression guard against silent drop to DriftClassUnknown.

cmd/wfctl/iac_typed_fixture_test.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,7 @@ type fixtureTypedAdapter struct {
9797
Validator pb.IaCProviderValidatorServer
9898
DriftConfigDetect pb.IaCProviderDriftConfigDetectorServer
9999
ResourceDriver pb.ResourceDriverServer
100+
LogCapture pb.IaCProviderLogCaptureServer
100101
}
101102

102103
// build spins up a bufconn-backed gRPC server running f's set of services,
@@ -146,6 +147,10 @@ func (f fixtureTypedAdapter) build(t *testing.T) *typedIaCAdapter {
146147
pb.RegisterResourceDriverServer(server, f.ResourceDriver)
147148
registered[iacServiceResourceDriver] = true
148149
}
150+
if f.LogCapture != nil {
151+
pb.RegisterIaCProviderLogCaptureServer(server, f.LogCapture)
152+
registered[iacServiceLogCapture] = true
153+
}
149154

150155
go func() { _ = server.Serve(listener) }()
151156
t.Cleanup(server.Stop)

cmd/wfctl/logs.go

Lines changed: 204 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,204 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"flag"
6+
"fmt"
7+
"io"
8+
"os"
9+
"strings"
10+
"time"
11+
12+
"github.com/GoCodeAlone/workflow/config"
13+
"github.com/GoCodeAlone/workflow/interfaces"
14+
)
15+
16+
func runLogs(args []string) error {
17+
return runLogsWithOutput(args, os.Stdout)
18+
}
19+
20+
func runLogsWithOutput(args []string, out io.Writer) error {
21+
if len(args) < 1 {
22+
return logsUsage()
23+
}
24+
switch args[0] {
25+
case "capture":
26+
return runLogsCapture(args[1:], out)
27+
default:
28+
return logsUsage()
29+
}
30+
}
31+
32+
func logsUsage() error {
33+
fmt.Fprintf(flag.CommandLine.Output(), `Usage: wfctl logs <action> [options]
34+
35+
Actions:
36+
capture Capture provider logs for an infrastructure resource
37+
38+
Options:
39+
--config <file> Config file (default: infra.yaml or config/infra.yaml)
40+
--env <name> Environment name for provider config resolution
41+
--resource <name> infra.container_service resource name
42+
--component <name> Provider component name (for example App Platform service)
43+
--type <type> Log type: BUILD, DEPLOY, RUN, RUN_RESTARTED (default RUN)
44+
--tail <n> Tail line count (default 300)
45+
--follow Follow live logs until --duration expires
46+
--duration <d> Max follow duration (default 2m)
47+
--deployment <id> Provider deployment ID when supported
48+
--plugin-dir <dir> External plugin directory
49+
`)
50+
return fmt.Errorf("missing or unknown logs action")
51+
}
52+
53+
func runLogsCapture(args []string, out io.Writer) error {
54+
fs := flag.NewFlagSet("logs capture", flag.ContinueOnError)
55+
fs.SetOutput(flag.CommandLine.Output())
56+
var configFile, envName, resourceName, componentName, logType, deploymentID, pluginDir string
57+
var tailLines int
58+
var follow bool
59+
var duration time.Duration
60+
fs.StringVar(&configFile, "config", "", "Config file")
61+
fs.StringVar(&configFile, "c", "", "Config file")
62+
fs.StringVar(&envName, "env", "", "Environment name")
63+
fs.StringVar(&resourceName, "resource", "", "infra.container_service resource name")
64+
fs.StringVar(&componentName, "component", "", "Provider component name")
65+
fs.StringVar(&logType, "type", "RUN", "Log type")
66+
fs.IntVar(&tailLines, "tail", 300, "Tail line count")
67+
fs.BoolVar(&follow, "follow", false, "Follow live logs")
68+
fs.DurationVar(&duration, "duration", 2*time.Minute, "Max follow duration")
69+
fs.StringVar(&deploymentID, "deployment", "", "Provider deployment ID")
70+
fs.StringVar(&pluginDir, "plugin-dir", "", "External plugin directory")
71+
if err := fs.Parse(args); err != nil {
72+
return err
73+
}
74+
if resourceName == "" {
75+
return fmt.Errorf("logs capture: --resource is required")
76+
}
77+
normalizedLogType, err := normalizeLogCaptureType(logType)
78+
if err != nil {
79+
return err
80+
}
81+
cfgFile, err := resolveInfraConfig(fs, configFile)
82+
if err != nil {
83+
return err
84+
}
85+
cfg, err := config.LoadFromFile(cfgFile)
86+
if err != nil {
87+
return fmt.Errorf("load config: %w", err)
88+
}
89+
spec, providerRef, err := resolveLogCaptureResource(cfg, envName, resourceName)
90+
if err != nil {
91+
return err
92+
}
93+
providerDefs, _, disabled := resolveProviderDefs(cfg, envName)
94+
if _, ok := disabled[providerRef]; ok {
95+
return fmt.Errorf("logs capture: provider %q is disabled for environment %q", providerRef, envName)
96+
}
97+
def, ok := providerDefs[providerRef]
98+
if !ok || def.provType == "" {
99+
return fmt.Errorf("logs capture: resource %q references unknown iac.provider %q", resourceName, providerRef)
100+
}
101+
102+
prevPluginDir := currentInfraPluginDir
103+
currentInfraPluginDir = pluginDir
104+
defer func() { currentInfraPluginDir = prevPluginDir }()
105+
106+
ctx := context.Background()
107+
if follow && duration > 0 {
108+
var cancel context.CancelFunc
109+
ctx, cancel = context.WithTimeout(ctx, duration)
110+
defer cancel()
111+
}
112+
durationSeconds := int64(0)
113+
if follow {
114+
durationSeconds = int64(duration / time.Second)
115+
}
116+
provider, closer, err := resolveIaCProvider(ctx, def.provType, def.provCfg)
117+
if err != nil {
118+
return fmt.Errorf("load provider %q: %w", def.provType, err)
119+
}
120+
if closer != nil {
121+
defer closer.Close()
122+
}
123+
capturer, ok := provider.(interfaces.LogCaptureProvider)
124+
if !ok {
125+
return fmt.Errorf("provider %q does not support log capture", def.provType)
126+
}
127+
req := interfaces.LogCaptureRequest{
128+
ResourceName: logCaptureResourceCloudName(spec),
129+
ResourceType: spec.Type,
130+
ProviderID: logCaptureString(spec.Config["provider_id"]),
131+
ComponentName: componentName,
132+
LogType: normalizedLogType,
133+
TailLines: tailLines,
134+
Follow: follow,
135+
DurationSeconds: durationSeconds,
136+
DeploymentID: deploymentID,
137+
}
138+
return capturer.CaptureLogs(ctx, req, writerLogSink{out: out})
139+
}
140+
141+
func normalizeLogCaptureType(s string) (string, error) {
142+
switch strings.ToUpper(strings.TrimSpace(s)) {
143+
case "", "RUN":
144+
return "RUN", nil
145+
case "BUILD":
146+
return "BUILD", nil
147+
case "DEPLOY":
148+
return "DEPLOY", nil
149+
case "RUN_RESTARTED":
150+
return "RUN_RESTARTED", nil
151+
default:
152+
return "", fmt.Errorf("logs capture: unsupported --type %q (want BUILD, DEPLOY, RUN, or RUN_RESTARTED)", s)
153+
}
154+
}
155+
156+
func resolveLogCaptureResource(cfg *config.WorkflowConfig, envName, name string) (interfaces.ResourceSpec, string, error) {
157+
for i := range cfg.Modules {
158+
m := cfg.Modules[i]
159+
if m.Name != name {
160+
continue
161+
}
162+
resolved := m.Config
163+
if envName != "" {
164+
envResolved, ok := m.ResolveForEnv(envName)
165+
if !ok {
166+
return interfaces.ResourceSpec{}, "", fmt.Errorf("logs capture: resource %q is disabled for environment %q", name, envName)
167+
}
168+
resolved = envResolved.Config
169+
}
170+
cfgMap := config.ExpandEnvInMap(resolved)
171+
providerRef := resolveIaCProviderRef(cfgMap)
172+
if providerRef == "" {
173+
return interfaces.ResourceSpec{}, "", fmt.Errorf("logs capture: resource %q missing iac_provider/provider", name)
174+
}
175+
return interfaces.ResourceSpec{Name: m.Name, Type: m.Type, Config: cfgMap}, providerRef, nil
176+
}
177+
return interfaces.ResourceSpec{}, "", fmt.Errorf("logs capture: resource %q not found", name)
178+
}
179+
180+
func logCaptureResourceCloudName(spec interfaces.ResourceSpec) string {
181+
for _, key := range []string{"app_name", "name"} {
182+
if v := logCaptureString(spec.Config[key]); v != "" {
183+
return v
184+
}
185+
}
186+
return spec.Name
187+
}
188+
189+
func logCaptureString(v any) string {
190+
s, _ := v.(string)
191+
return s
192+
}
193+
194+
type writerLogSink struct {
195+
out io.Writer
196+
}
197+
198+
func (s writerLogSink) WriteLogChunk(chunk interfaces.LogChunk) error {
199+
if len(chunk.Data) == 0 {
200+
return nil
201+
}
202+
_, err := s.out.Write(chunk.Data)
203+
return err
204+
}

0 commit comments

Comments
 (0)