// Meta holds the pipeline-context fields that are attached to every dual-log
// JSON payload. All fields are plain strings and are emitted as-is.
type Meta struct {
	AccountID       string
	OrgID           string
	ProjectID       string
	PipelineID      string
	RunSequence     string
	PlanExecutionID string
	StageIdentifier string
	StepIdentifier  string
	// TaskID is optional: when empty, the "logContext" object is omitted
	// from the emitted payload.
	TaskID string
}

// NewMetaConfig constructs a Meta with the given pipeline context fields.
// buildID is stored as RunSequence and planExecID as PlanExecutionID.
func NewMetaConfig(accountID, orgID, projectID, pipelineID, buildID, planExecID, stageID, stepID, taskID string) *Meta {
	return &Meta{
		AccountID:       accountID,
		OrgID:           orgID,
		ProjectID:       projectID,
		PipelineID:      pipelineID,
		RunSequence:     buildID,
		PlanExecutionID: planExecID,
		StageIdentifier: stageID,
		StepIdentifier:  stepID,
		TaskID:          taskID,
	}
}

// EmitLine writes a single flat-JSON log line to os.Stdout with level "INFO".
// It uses fmt.Fprintln (NOT logrus) to avoid re-ingestion loops.
// A nil meta is treated as an empty Meta.
func EmitLine(meta *Meta, message string, ts time.Time, logType string) {
	EmitLineWithLevel(meta, message, ts, logType, "INFO")
}

// EmitLineWithLevel writes a single flat-JSON log line to os.Stdout with the specified level.
// It uses fmt.Fprintln (NOT logrus) to avoid re-ingestion loops.
//
// The payload carries "timestamp" (ts in UTC, RFC3339Nano), "level", "message",
// "logType" and a "logAbstractions" object built from meta. A "logContext"
// object holding "taskId" is added only when meta.TaskID is non-empty.
// If marshaling fails, a diagnostic is written to os.Stderr and nothing is
// written to os.Stdout.
func EmitLineWithLevel(meta *Meta, message string, ts time.Time, logType, level string) {
	if meta == nil {
		// Logging must never take the process down: a nil Meta would panic on
		// the field reads below, so fall back to empty metadata instead.
		meta = &Meta{}
	}

	payload := map[string]interface{}{
		"timestamp": ts.UTC().Format(time.RFC3339Nano),
		"level":     level,
		"message":   message,
		"logType":   logType,
		"logAbstractions": map[string]string{
			"accountId":       meta.AccountID,
			"orgId":           meta.OrgID,
			"projectId":       meta.ProjectID,
			"pipelineId":      meta.PipelineID,
			"runSequence":     meta.RunSequence,
			"planExecutionId": meta.PlanExecutionID,
			"stageIdentifier": meta.StageIdentifier,
			"stepIdentifier":  meta.StepIdentifier,
		},
	}
	if meta.TaskID != "" {
		payload["logContext"] = map[string]string{
			"taskId": meta.TaskID,
		}
	}

	jsonBytes, err := json.Marshal(payload)
	if err != nil {
		fmt.Fprintf(os.Stderr, "duallog: EmitLineWithLevel failed to marshal JSON: %v (logType=%s, accountId=%s)\n",
			err, logType, meta.AccountID)
		return
	}
	fmt.Fprintln(os.Stdout, string(jsonBytes))
}
+// Use of this source code is governed by the PolyForm Free Trial 1.0.0 license +// that can be found in the licenses directory at the root of this repository, also available at +// https://polyformproject.org/wp-content/uploads/2020/05/PolyForm-Free-Trial-1.0.0.txt. + +package duallog + +import ( + "bytes" + "encoding/json" + "io" + "os" + "testing" + "time" +) + +func TestNewMetaConfig(t *testing.T) { + m := NewMetaConfig("acc1", "org1", "proj1", "pipe1", "42", "exec1", "stage1", "step1", "task1") + if m.AccountID != "acc1" || m.OrgID != "org1" || m.ProjectID != "proj1" || + m.PipelineID != "pipe1" || m.RunSequence != "42" || m.PlanExecutionID != "exec1" || + m.StageIdentifier != "stage1" || m.StepIdentifier != "step1" || m.TaskID != "task1" { + t.Errorf("NewMetaConfig did not populate fields correctly: %+v", m) + } +} + +func TestEmitLine(t *testing.T) { + oldStdout := os.Stdout + r, w, _ := os.Pipe() + os.Stdout = w + + meta := &Meta{ + AccountID: "testAcct", + OrgID: "testOrg", + ProjectID: "testProj", + PipelineID: "testPipe", + RunSequence: "99", + PlanExecutionID: "exec123", + StageIdentifier: "stg1", + StepIdentifier: "stp1", + TaskID: "task-abc-DEL", + } + ts := time.Unix(1700000000, 500000000) + EmitLine(meta, "hello world", ts, "LITE_ENGINE_STEP_LOGS") + + w.Close() + var buf bytes.Buffer + _, _ = io.Copy(&buf, r) + os.Stdout = oldStdout + + line := buf.String() + if line == "" { + t.Fatal("EmitLine produced no output") + } + + var parsed map[string]interface{} + if err := json.Unmarshal([]byte(line), &parsed); err != nil { + t.Fatalf("EmitLine output is not valid JSON: %v\nOutput: %s", err, line) + } + + if parsed["message"] != "hello world" { + t.Errorf("expected message 'hello world', got %v", parsed["message"]) + } + if parsed["level"] != "INFO" { + t.Errorf("expected level 'INFO', got %v", parsed["level"]) + } + if parsed["logType"] != "LITE_ENGINE_STEP_LOGS" { + t.Errorf("expected logType 'LITE_ENGINE_STEP_LOGS', got %v", parsed["logType"]) + } + 
+ abs, ok := parsed["logAbstractions"].(map[string]interface{}) + if !ok { + t.Fatal("logAbstractions missing or not a map") + } + if abs["accountId"] != "testAcct" { + t.Errorf("expected accountId 'testAcct', got %v", abs["accountId"]) + } + if abs["stepIdentifier"] != "stp1" { + t.Errorf("expected stepIdentifier 'stp1', got %v", abs["stepIdentifier"]) + } + + logCtx, ok := parsed["logContext"].(map[string]interface{}) + if !ok { + t.Fatal("logContext missing or not a map") + } + if logCtx["taskId"] != "task-abc-DEL" { + t.Errorf("expected taskId 'task-abc-DEL', got %v", logCtx["taskId"]) + } + + tsStr, ok := parsed["timestamp"].(string) + if !ok || tsStr == "" { + t.Fatalf("expected non-empty timestamp string, got %v", parsed["timestamp"]) + } + expectedTS := ts.UTC().Format(time.RFC3339Nano) + if tsStr != expectedTS { + t.Errorf("expected timestamp %q, got %q", expectedTS, tsStr) + } +} + +func TestEmitLineNoTaskID(t *testing.T) { + oldStdout := os.Stdout + r, w, _ := os.Pipe() + os.Stdout = w + + meta := &Meta{ + AccountID: "testAcct", + } + EmitLine(meta, "msg", time.Now(), "TEST") + + w.Close() + var buf bytes.Buffer + _, _ = io.Copy(&buf, r) + os.Stdout = oldStdout + + var parsed map[string]interface{} + if err := json.Unmarshal(buf.Bytes(), &parsed); err != nil { + t.Fatalf("EmitLine output is not valid JSON: %v", err) + } + if _, exists := parsed["logContext"]; exists { + t.Error("logContext should not be present when taskId is empty") + } +} + +func TestEmitLineNilMetaDoesNotPanic(t *testing.T) { + oldStdout := os.Stdout + r, w, _ := os.Pipe() + os.Stdout = w + + meta := &Meta{} + EmitLine(meta, "msg", time.Now(), "TEST") + + w.Close() + var buf bytes.Buffer + _, _ = io.Copy(&buf, r) + os.Stdout = oldStdout + + if buf.Len() == 0 { + t.Error("expected output for empty meta") + } +} diff --git a/handler/setup.go b/handler/setup.go index 45a120c9..8bd63f03 100644 --- a/handler/setup.go +++ b/handler/setup.go @@ -16,6 +16,7 @@ import ( "time" 
const (
	// OSWindows is the operating-system name for Windows.
	// NOTE(review): presumably matched against the os argument of GetNetrc — confirm.
	OSWindows = "windows"
	// dualLoggingEnvVar is the key looked up in the setup request's Envs by
	// HandleSetup; when its value is exactly "true", LogConfig.DualLoggingEnabled
	// is switched on for the pipeline.
	dualLoggingEnvVar = "HARNESS_LOG_STREAMING_STDOUT_ENABLED"
)
@@ -288,6 +302,38 @@ func initializeOSStatsStreaming(setupReq *api.SetupRequest, state *pipeline.Stat return nil } +func setHarnessEnvs(environment map[string]string) { + harnessEnvs := []string{"HARNESS_EXECUTION_ID", "HARNESS_DELEGATE_TASK_ID"} + for _, v := range harnessEnvs { + if val, ok := environment[v]; ok && val != "" { + os.Setenv(v, val) + } + } +} + +// initializeDualLogHook adds a logrus hook that emits each lite-engine internal log entry +// as flat JSON to stdout for OTel collection, when dual logging is enabled. +func initializeDualLogHook(setupReq *api.SetupRequest) { + ti := &setupReq.TIConfig + taskID := "" + if setupReq.Envs != nil { + taskID = setupReq.Envs["HARNESS_DELEGATE_TASK_ID"] + } + + planExecID := os.Getenv("HARNESS_EXECUTION_ID") + + meta := duallog.NewMetaConfig( + ti.AccountID, ti.OrgID, ti.ProjectID, ti.PipelineID, + ti.BuildID, planExecID, ti.StageID, "lite-engine", taskID, + ) + logrus.AddHook(logger.NewDualLogHook(meta, "EXECUTION_LOGS")) + + logger.L.WithFields(logrus.Fields{ + "accountId": ti.AccountID, "pipelineId": ti.PipelineID, + "stageId": ti.StageID, "taskId": taskID, + }).Infoln("api: initialized dual log hook for lite-engine internal logs") +} + // syncSystemClock forces chrony to step the system clock if there is significant drift. // This fixes clock skew on ARM64 VMs after GCP hibernate resume, where the arch_sys_counter // clock source doesn't auto-adjust (unlike x86's kvm-clock). 
Without this, chrony may diff --git a/livelog/livelog.go b/livelog/livelog.go index 07553172..e729cda4 100644 --- a/livelog/livelog.go +++ b/livelog/livelog.go @@ -14,6 +14,7 @@ import ( "sync" "time" + "github.com/harness/lite-engine/duallog" "github.com/harness/lite-engine/internal/safego" "github.com/harness/lite-engine/logstream" "github.com/harness/lite-engine/logstream/remote" @@ -64,6 +65,9 @@ type Writer struct { trimNewLineSuffix bool lastFlushTime time.Time ctx context.Context + + dualLogMeta *duallog.Meta + dualLogType string } // New returns a new writer @@ -99,6 +103,12 @@ func (b *Writer) SetInterval(interval time.Duration) { b.interval = interval } +// SetDualLogConfig enables dual logging to stdout in flat JSON format. +func (b *Writer) SetDualLogConfig(meta *duallog.Meta, logType string) { + b.dualLogMeta = meta + b.dualLogType = logType +} + // Write uploads the live log stream to the server. func (b *Writer) Write(p []byte) (n int, err error) { var res []byte @@ -138,6 +148,10 @@ func (b *Writer) Write(p []byte) (n int, err error) { ElaspedTime: int64(time.Since(b.now).Seconds()), } + if b.dualLogMeta != nil { + duallog.EmitLine(b.dualLogMeta, line.Message, line.Timestamp, b.dualLogType) + } + jsonLine, _ := getLineBytes(line) if b.printToStdout { diff --git a/logger/duallog_hook.go b/logger/duallog_hook.go new file mode 100644 index 00000000..b291091c --- /dev/null +++ b/logger/duallog_hook.go @@ -0,0 +1,43 @@ +// Copyright 2024 Harness Inc. All rights reserved. +// Use of this source code is governed by the PolyForm Free Trial 1.0.0 license +// that can be found in the licenses directory at the root of this repository, also available at +// https://polyformproject.org/wp-content/uploads/2020/05/PolyForm-Free-Trial-1.0.0.txt. 
+ +package logger + +import ( + "strings" + + "github.com/harness/lite-engine/duallog" + "github.com/sirupsen/logrus" +) + +// DualLogHook is a logrus hook that emits each log entry as flat JSON to stdout +// via duallog.EmitLine for OTel collection. It uses fmt.Fprintln(os.Stdout, ...) +// internally (via EmitLine) so there is no recursion risk with logrus. +type DualLogHook struct { + meta *duallog.Meta + logType string +} + +// NewDualLogHook creates a DualLogHook that will emit JSON logs to stdout. +func NewDualLogHook(meta *duallog.Meta, logType string) *DualLogHook { + return &DualLogHook{ + meta: meta, + logType: logType, + } +} + +// Levels returns all log levels so every logrus entry is captured. +func (h *DualLogHook) Levels() []logrus.Level { + return logrus.AllLevels +} + +// Fire is called for each logrus entry. It formats the entry and emits +// a flat JSON line via duallog.EmitLine. +func (h *DualLogHook) Fire(entry *logrus.Entry) error { + msg := formatLogEntry(entry) + level := strings.ToUpper(entry.Level.String()) + duallog.EmitLineWithLevel(h.meta, msg, entry.Time, h.logType, level) + return nil +} diff --git a/pipeline/runtime/step_executor.go b/pipeline/runtime/step_executor.go index cf396bcb..41410db7 100644 --- a/pipeline/runtime/step_executor.go +++ b/pipeline/runtime/step_executor.go @@ -9,11 +9,13 @@ import ( "encoding/json" "fmt" "io" + "os" "sync" "time" "github.com/drone/runner-go/pipeline/runtime" "github.com/harness/lite-engine/api" + "github.com/harness/lite-engine/duallog" "github.com/harness/lite-engine/engine" errorcat "github.com/harness/lite-engine/engine/errors" "github.com/harness/lite-engine/engine/logutil" @@ -357,6 +359,7 @@ func (e *StepExecutor) executeStep(r *api.StartStepRequest, wr logstream.Writer) state, err := e.executeStepDrone(r) return state, nil, nil, nil, nil, nil, "", err } + // First try to get TI Config from pipeline state, if empty then use the one from step request var tiConfig *tiCfg.Cfg state := 
pipeline.GetState() @@ -497,6 +500,27 @@ func getLogStreamWriter(r *api.StartStepRequest) logstream.Writer { pipelineState.GetLogConfig().TrimNewLineSuffix, r.LogConfig.SkipOpeningStream, r.LogConfig.SkipClosingStream) + + if pipelineState.GetLogConfig().DualLoggingEnabled { + tiCfg := pipelineState.GetTIConfig() + taskID := r.StepStatus.TaskID + if taskID == "" { + taskID = os.Getenv("HARNESS_DELEGATE_TASK_ID") + } + meta := &duallog.Meta{ + AccountID: pipelineState.GetLogConfig().AccountID, + OrgID: tiCfg.GetOrgID(), + ProjectID: tiCfg.GetProjectID(), + PipelineID: tiCfg.GetPipelineID(), + RunSequence: tiCfg.GetBuildID(), + PlanExecutionID: os.Getenv("HARNESS_EXECUTION_ID"), + StageIdentifier: tiCfg.GetStageID(), + StepIdentifier: r.Name, + TaskID: taskID, + } + wc.SetDualLogConfig(meta, "EXECUTION_LOGS") + } + wr := logstream.NewReplacerWithEnvs(wc, secrets, r.Envs) safego.SafeGo("log_stream_open", func() { wr.Open() //nolint:errcheck diff --git a/pipeline/runtime/step_executor_stateless.go b/pipeline/runtime/step_executor_stateless.go index 61ba4ded..9e9346b8 100644 --- a/pipeline/runtime/step_executor_stateless.go +++ b/pipeline/runtime/step_executor_stateless.go @@ -77,7 +77,7 @@ func (e *StepExecutorStateless) executeStep( //nolint:gocritic runFunc := func(ctx context.Context, step *spec.Step, output io.Writer, isDrone bool, isHosted bool) (*runtime.State, error) { return engine.RunStep(ctx, engine.Opts{Opts: docker.Opts{DockerClient: dockerClient}}, step, output, cfg, isDrone, isHosted) } - // Temporary: this should be removed once we have a better way of handling test intelligence. + // Temporary: this should be removed once we have a better way of handling test intelligence. 
tiConfig := getTiCfg(&r.TIConfig, &r.MtlsConfig, r.Envs) r.DeleteTempStepFiles = true diff --git a/pipeline/state.go b/pipeline/state.go index a7dab51c..d6fee084 100644 --- a/pipeline/state.go +++ b/pipeline/state.go @@ -16,6 +16,7 @@ import ( "github.com/harness/lite-engine/logstream/remote" "github.com/harness/lite-engine/osstats" tiCfg "github.com/harness/lite-engine/ti/config" + "github.com/sirupsen/logrus" ) var ( @@ -72,6 +73,14 @@ func (s *State) Set(secrets []string, logConfig api.LogConfig, tiConfig tiCfg.Cf s.tiConfig = tiConfig s.mtlsConfig = mtlsConfig s.statsCollector = collector + + logrus.WithFields(logrus.Fields{ + "dualLoggingEnabled": logConfig.DualLoggingEnabled, + "logConfigURL": logConfig.URL, + "logConfigAccountID": logConfig.AccountID, + "trimNewLineSuffix": logConfig.TrimNewLineSuffix, + "secretsCount": len(secrets), + }).Info("pipeline.State.Set: pipeline state initialized with log config") } func (s *State) GetSecrets() []string {