Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 91 additions & 0 deletions cmd/wfctl/iac_typed_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,12 @@ package main
import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"log"
"math"
"strings"
"time"

"google.golang.org/grpc"
Expand All @@ -53,6 +56,7 @@ const (
iacServiceMigrationRepairer = "workflow.plugin.external.iac.IaCProviderMigrationRepairer"
iacServiceValidator = "workflow.plugin.external.iac.IaCProviderValidator"
iacServiceDriftConfigDetect = "workflow.plugin.external.iac.IaCProviderDriftConfigDetector"
iacServiceLogCapture = "workflow.plugin.external.iac.IaCProviderLogCapture"
iacServiceFinalizer = "workflow.plugin.external.iac.IaCProviderFinalizer"
iacServiceResourceDriver = "workflow.plugin.external.iac.ResourceDriver"
)
Expand All @@ -78,6 +82,7 @@ type typedIaCAdapter struct {
repairer pb.IaCProviderMigrationRepairerClient
validator pb.IaCProviderValidatorClient
driftCfg pb.IaCProviderDriftConfigDetectorClient
logCapture pb.IaCProviderLogCaptureClient
finalizer pb.IaCProviderFinalizerClient
resourceDriv pb.ResourceDriverClient

Expand Down Expand Up @@ -117,6 +122,9 @@ func newTypedIaCAdapter(conn *grpc.ClientConn, registered map[string]bool) *type
if registered[iacServiceDriftConfigDetect] {
a.driftCfg = pb.NewIaCProviderDriftConfigDetectorClient(conn)
}
if registered[iacServiceLogCapture] {
a.logCapture = pb.NewIaCProviderLogCaptureClient(conn)
}
if registered[iacServiceFinalizer] {
a.finalizer = pb.NewIaCProviderFinalizerClient(conn)
}
Expand Down Expand Up @@ -192,6 +200,13 @@ func (a *typedIaCAdapter) DriftConfigDetector() pb.IaCProviderDriftConfigDetecto
return a.driftCfg
}

// LogCapture returns the typed pb.IaCProviderLogCaptureClient or nil
// when the plugin did not register IaCProviderLogCapture. Used by
// `wfctl logs capture`.
func (a *typedIaCAdapter) LogCapture() pb.IaCProviderLogCaptureClient {
return a.logCapture
}

// CredentialRevoker returns the typed
// pb.IaCProviderCredentialRevokerClient or nil when the plugin did not
// register IaCProviderCredentialRevoker. Used by
Expand Down Expand Up @@ -591,6 +606,42 @@ func (a *typedIaCAdapter) RepairDirtyMigration(ctx context.Context, req interfac
return migrationRepairResultFromPB(resp.GetResult()), nil
}

// CaptureLogs satisfies interfaces.LogCaptureProvider.
func (a *typedIaCAdapter) CaptureLogs(ctx context.Context, req interfaces.LogCaptureRequest, sink interfaces.LogCaptureSink) error {
if a.logCapture == nil {
return unimplementedOptional(iacServiceLogCapture)
}
pbReq, err := logCaptureRequestToPB(req)
if err != nil {
return err
}
stream, err := a.logCapture.CaptureLogs(ctx, pbReq)
if err != nil {
return translateRPCErr(err)
}
for {
chunk, recvErr := stream.Recv()
if recvErr != nil {
if errors.Is(recvErr, io.EOF) {
return nil
}
return translateRPCErr(recvErr)
}
if sink != nil {
if err := sink.WriteLogChunk(interfaces.LogChunk{
Data: append([]byte(nil), chunk.GetData()...),
Source: chunk.GetSource(),
EOF: chunk.GetEof(),
}); err != nil {
return err
}
}
if chunk.GetEof() {
return nil
}
}
}

// ─── typedResourceDriver (per-type ResourceDriver wrapper) ──────────────────

// typedResourceDriver implements interfaces.ResourceDriver on top of the
Expand Down Expand Up @@ -1261,6 +1312,46 @@ func migrationRepairResultFromPB(r *pb.MigrationRepairResult) *interfaces.Migrat
}
}

func logCaptureRequestToPB(r interfaces.LogCaptureRequest) (*pb.CaptureLogsRequest, error) {
tailLines := r.TailLines
if tailLines < 0 {
tailLines = 0
} else if tailLines > math.MaxInt32 {
tailLines = math.MaxInt32
}
logType, err := logCaptureTypeToPB(r.LogType)
if err != nil {
return nil, err
}
return &pb.CaptureLogsRequest{
ResourceName: r.ResourceName,
ResourceType: r.ResourceType,
ProviderId: r.ProviderID,
ComponentName: r.ComponentName,
LogType: logType,
TailLines: int32(tailLines), //nolint:gosec // G115: clamped above
Follow: r.Follow,
DurationSeconds: r.DurationSeconds,
DeploymentId: r.DeploymentID,
}, nil
}

func logCaptureTypeToPB(s string) (pb.LogCaptureType, error) {
switch strings.ToUpper(strings.TrimSpace(s)) {
case "BUILD":
return pb.LogCaptureType_LOG_CAPTURE_TYPE_BUILD, nil
case "DEPLOY":
return pb.LogCaptureType_LOG_CAPTURE_TYPE_DEPLOY, nil
case "", "RUN":
return pb.LogCaptureType_LOG_CAPTURE_TYPE_RUN, nil
case "RUN_RESTARTED":
return pb.LogCaptureType_LOG_CAPTURE_TYPE_RUN_RESTARTED, nil
default:
return pb.LogCaptureType_LOG_CAPTURE_TYPE_UNSPECIFIED,
fmt.Errorf("log capture: unsupported log type %q (want BUILD, DEPLOY, RUN, or RUN_RESTARTED)", s)
}
}

func timeToPB(t time.Time) *timestamppb.Timestamp {
if t.IsZero() {
return nil
Expand Down
18 changes: 18 additions & 0 deletions cmd/wfctl/iac_typed_adapter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"context"
"errors"
"net"
"strings"
"testing"

"google.golang.org/grpc"
Expand Down Expand Up @@ -127,6 +128,23 @@ func TestTypedAdapter_ValidatePlanReturnsNilWhenValidatorAbsent(t *testing.T) {
}
}

func TestTypedAdapter_CaptureLogsRejectsUnknownType(t *testing.T) {
adapter := fixtureTypedAdapter{
LogCapture: &pb.UnimplementedIaCProviderLogCaptureServer{},
}.build(t)

err := adapter.CaptureLogs(context.Background(), interfaces.LogCaptureRequest{
ResourceName: "app",
LogType: "typo",
}, nil)
if err == nil {
t.Fatal("expected unsupported log type error")
}
if !strings.Contains(err.Error(), "unsupported log type") {
t.Fatalf("error = %q, want unsupported log type", err.Error())
}
}

// TestTypedAdapter_DriftClassEnumRoundTrip ensures every DriftClass
// constant survives the proto-enum conversion in both directions —
// regression guard against silent drop to DriftClassUnknown.
Expand Down
5 changes: 5 additions & 0 deletions cmd/wfctl/iac_typed_fixture_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ type fixtureTypedAdapter struct {
Validator pb.IaCProviderValidatorServer
DriftConfigDetect pb.IaCProviderDriftConfigDetectorServer
ResourceDriver pb.ResourceDriverServer
LogCapture pb.IaCProviderLogCaptureServer
}

// build spins up a bufconn-backed gRPC server running f's set of services,
Expand Down Expand Up @@ -146,6 +147,10 @@ func (f fixtureTypedAdapter) build(t *testing.T) *typedIaCAdapter {
pb.RegisterResourceDriverServer(server, f.ResourceDriver)
registered[iacServiceResourceDriver] = true
}
if f.LogCapture != nil {
pb.RegisterIaCProviderLogCaptureServer(server, f.LogCapture)
registered[iacServiceLogCapture] = true
}

go func() { _ = server.Serve(listener) }()
t.Cleanup(server.Stop)
Expand Down
204 changes: 204 additions & 0 deletions cmd/wfctl/logs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
package main

import (
"context"
"flag"
"fmt"
"io"
"os"
"strings"
"time"

"github.com/GoCodeAlone/workflow/config"
"github.com/GoCodeAlone/workflow/interfaces"
)

func runLogs(args []string) error {
return runLogsWithOutput(args, os.Stdout)
}

func runLogsWithOutput(args []string, out io.Writer) error {
if len(args) < 1 {
return logsUsage()
}
switch args[0] {
case "capture":
return runLogsCapture(args[1:], out)
default:
return logsUsage()
}
}

func logsUsage() error {
fmt.Fprintf(flag.CommandLine.Output(), `Usage: wfctl logs <action> [options]

Actions:
capture Capture provider logs for an infrastructure resource

Options:
--config <file> Config file (default: infra.yaml or config/infra.yaml)
--env <name> Environment name for provider config resolution
--resource <name> infra.container_service resource name
--component <name> Provider component name (for example App Platform service)
--type <type> Log type: BUILD, DEPLOY, RUN, RUN_RESTARTED (default RUN)
--tail <n> Tail line count (default 300)
--follow Follow live logs until --duration expires
--duration <d> Max follow duration (default 2m)
--deployment <id> Provider deployment ID when supported
--plugin-dir <dir> External plugin directory
`)
return fmt.Errorf("missing or unknown logs action")
}

func runLogsCapture(args []string, out io.Writer) error {
fs := flag.NewFlagSet("logs capture", flag.ContinueOnError)
fs.SetOutput(flag.CommandLine.Output())
var configFile, envName, resourceName, componentName, logType, deploymentID, pluginDir string
var tailLines int
var follow bool
var duration time.Duration
fs.StringVar(&configFile, "config", "", "Config file")
fs.StringVar(&configFile, "c", "", "Config file")
fs.StringVar(&envName, "env", "", "Environment name")
fs.StringVar(&resourceName, "resource", "", "infra.container_service resource name")
fs.StringVar(&componentName, "component", "", "Provider component name")
fs.StringVar(&logType, "type", "RUN", "Log type")
fs.IntVar(&tailLines, "tail", 300, "Tail line count")
fs.BoolVar(&follow, "follow", false, "Follow live logs")
fs.DurationVar(&duration, "duration", 2*time.Minute, "Max follow duration")
fs.StringVar(&deploymentID, "deployment", "", "Provider deployment ID")
fs.StringVar(&pluginDir, "plugin-dir", "", "External plugin directory")
if err := fs.Parse(args); err != nil {
return err
}
if resourceName == "" {
return fmt.Errorf("logs capture: --resource is required")
}
normalizedLogType, err := normalizeLogCaptureType(logType)
if err != nil {
return err
}
cfgFile, err := resolveInfraConfig(fs, configFile)
if err != nil {
return err
}
cfg, err := config.LoadFromFile(cfgFile)
if err != nil {
return fmt.Errorf("load config: %w", err)
}
spec, providerRef, err := resolveLogCaptureResource(cfg, envName, resourceName)
if err != nil {
return err
}
providerDefs, _, disabled := resolveProviderDefs(cfg, envName)
if _, ok := disabled[providerRef]; ok {
return fmt.Errorf("logs capture: provider %q is disabled for environment %q", providerRef, envName)
}
def, ok := providerDefs[providerRef]
if !ok || def.provType == "" {
return fmt.Errorf("logs capture: resource %q references unknown iac.provider %q", resourceName, providerRef)
}

prevPluginDir := currentInfraPluginDir
currentInfraPluginDir = pluginDir
defer func() { currentInfraPluginDir = prevPluginDir }()

ctx := context.Background()
if follow && duration > 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, duration)
defer cancel()
}
durationSeconds := int64(0)
if follow {
durationSeconds = int64(duration / time.Second)
}
provider, closer, err := resolveIaCProvider(ctx, def.provType, def.provCfg)
if err != nil {
return fmt.Errorf("load provider %q: %w", def.provType, err)
}
if closer != nil {
defer closer.Close()
}
capturer, ok := provider.(interfaces.LogCaptureProvider)
if !ok {
return fmt.Errorf("provider %q does not support log capture", def.provType)
}
req := interfaces.LogCaptureRequest{
ResourceName: logCaptureResourceCloudName(spec),
ResourceType: spec.Type,
ProviderID: logCaptureString(spec.Config["provider_id"]),
ComponentName: componentName,
LogType: normalizedLogType,
TailLines: tailLines,
Comment thread
intel352 marked this conversation as resolved.
Follow: follow,
DurationSeconds: durationSeconds,
DeploymentID: deploymentID,
}
return capturer.CaptureLogs(ctx, req, writerLogSink{out: out})
}

func normalizeLogCaptureType(s string) (string, error) {
switch strings.ToUpper(strings.TrimSpace(s)) {
case "", "RUN":
return "RUN", nil
case "BUILD":
return "BUILD", nil
case "DEPLOY":
return "DEPLOY", nil
case "RUN_RESTARTED":
return "RUN_RESTARTED", nil
default:
return "", fmt.Errorf("logs capture: unsupported --type %q (want BUILD, DEPLOY, RUN, or RUN_RESTARTED)", s)
}
}

func resolveLogCaptureResource(cfg *config.WorkflowConfig, envName, name string) (interfaces.ResourceSpec, string, error) {
for i := range cfg.Modules {
m := cfg.Modules[i]
if m.Name != name {
continue
}
resolved := m.Config
if envName != "" {
envResolved, ok := m.ResolveForEnv(envName)
if !ok {
return interfaces.ResourceSpec{}, "", fmt.Errorf("logs capture: resource %q is disabled for environment %q", name, envName)
}
resolved = envResolved.Config
}
cfgMap := config.ExpandEnvInMapPreservingKeys(resolved, infraPreserveKeys)
providerRef := resolveIaCProviderRef(cfgMap)
if providerRef == "" {
return interfaces.ResourceSpec{}, "", fmt.Errorf("logs capture: resource %q missing iac_provider/provider", name)
}
return interfaces.ResourceSpec{Name: m.Name, Type: m.Type, Config: cfgMap}, providerRef, nil
Comment thread
intel352 marked this conversation as resolved.
}
return interfaces.ResourceSpec{}, "", fmt.Errorf("logs capture: resource %q not found", name)
}

func logCaptureResourceCloudName(spec interfaces.ResourceSpec) string {
for _, key := range []string{"app_name", "name"} {
if v := logCaptureString(spec.Config[key]); v != "" {
return v
}
}
return spec.Name
}

func logCaptureString(v any) string {
s, _ := v.(string)
return s
}

type writerLogSink struct {
out io.Writer
}

func (s writerLogSink) WriteLogChunk(chunk interfaces.LogChunk) error {
if len(chunk.Data) == 0 {
return nil
}
_, err := s.out.Write(chunk.Data)
return err
}
Loading
Loading