From 37850227a8bac674d5d54cb6d263ed351bb3903e Mon Sep 17 00:00:00 2001 From: Dhiraj Chhawchharia Date: Fri, 20 Mar 2026 09:21:04 -0700 Subject: [PATCH 01/14] feat:[CI-21635]: Changes for supporting dual logging --- api/api.go | 5 +- duallog/duallog.go | 84 +++++++++++++++ duallog/duallog_test.go | 164 ++++++++++++++++++++++++++++++ livelog/livelog.go | 14 +++ pipeline/runtime/step_executor.go | 25 +++++ 5 files changed, 290 insertions(+), 2 deletions(-) create mode 100644 duallog/duallog.go create mode 100644 duallog/duallog_test.go diff --git a/api/api.go b/api/api.go index 20a3f7d8..c1a290ea 100644 --- a/api/api.go +++ b/api/api.go @@ -218,8 +218,9 @@ type ( // There is a limit of 5k lines for log-service's snapshot, so this parameter should NOT // be used in cases where more than 5k lines of logs are written by the logger. Otherwise, // the final logs blob may have missing logs. - SkipOpeningStream bool `json:"skip_opening_stream,omitempty"` - SkipClosingStream bool `json:"skip_closing_stream,omitempty"` + SkipOpeningStream bool `json:"skip_opening_stream,omitempty"` + SkipClosingStream bool `json:"skip_closing_stream,omitempty"` + DualLoggingEnabled bool `json:"dual_logging_enabled,omitempty"` } TIConfig struct { diff --git a/duallog/duallog.go b/duallog/duallog.go new file mode 100644 index 00000000..cef16e0c --- /dev/null +++ b/duallog/duallog.go @@ -0,0 +1,84 @@ +// Copyright 2024 Harness Inc. All rights reserved. +// Use of this source code is governed by the PolyForm Free Trial 1.0.0 license +// that can be found in the licenses directory at the root of this repository, also available at +// https://polyformproject.org/wp-content/uploads/2020/05/PolyForm-Free-Trial-1.0.0.txt. + +package duallog + +import ( + "encoding/json" + "fmt" + "os" + "strings" + "time" +) + +// Meta holds metadata fields for dual-log JSON payloads. +type Meta struct { + AccountID string + OrgID string + ProjectID string + PipelineID string + RunSequence string + PlanExecutionID string + StageIdentifier string + StepIdentifier string + TaskID string +} + +// NewMetaFromTIConfig constructs a Meta from TI config fields and other sources. +func NewMetaFromTIConfig(accountID, orgID, projectID, pipelineID, buildID, planExecID, stageID, stepID, taskID string) *Meta { + return &Meta{ + AccountID: accountID, + OrgID: orgID, + ProjectID: projectID, + PipelineID: pipelineID, + RunSequence: buildID, + PlanExecutionID: planExecID, + StageIdentifier: stageID, + StepIdentifier: stepID, + TaskID: taskID, + } +} + +// ExtractStepID extracts the last segment from a logKey of the form +// accountId/orgId/projectId/pipelineId/runSequence/stageId/stepId. +func ExtractStepID(logKey string) string { + if logKey == "" { + return "" + } + parts := strings.Split(logKey, "/") + return parts[len(parts)-1] +} + +// EmitLine writes a single flat-JSON log line to os.Stdout. +// It uses fmt.Fprintln (NOT logrus) to avoid re-ingestion loops. +func EmitLine(meta *Meta, message string, ts time.Time, logType string) { + payload := map[string]interface{}{ + "timestamp": float64(ts.UnixNano()) / 1e9, + "level": "INFO", + "message": message, + "logType": logType, + "log_source": "streaming", + "logAbstractions": map[string]string{ + "accountId": meta.AccountID, + "orgId": meta.OrgID, + "projectId": meta.ProjectID, + "pipelineId": meta.PipelineID, + "runSequence": meta.RunSequence, + "planExecutionId": meta.PlanExecutionID, + "stageIdentifier": meta.StageIdentifier, + "stepIdentifier": meta.StepIdentifier, + }, + } + if meta.TaskID != "" { + payload["logContext"] = map[string]string{ + "taskId": meta.TaskID, + } + } + jsonBytes, err := json.Marshal(payload) + if err != nil { + return + } + fmt.Fprintln(os.Stdout, string(jsonBytes)) +} diff --git a/duallog/duallog_test.go b/duallog/duallog_test.go new file mode 100644 index 00000000..5460cdcf --- /dev/null +++ b/duallog/duallog_test.go @@ -0,0 +1,164 @@ +// Copyright 2024 Harness Inc. All rights reserved. +// Use of this source code is governed by the PolyForm Free Trial 1.0.0 license +// that can be found in the licenses directory at the root of this repository, also available at +// https://polyformproject.org/wp-content/uploads/2020/05/PolyForm-Free-Trial-1.0.0.txt. + +package duallog + +import ( + "bytes" + "encoding/json" + "io" + "os" + "testing" + "time" +) + +func TestExtractStepID(t *testing.T) { + tests := []struct { + name string + logKey string + want string + }{ + {"full key", "acct/org/proj/pipe/1/stage/step1", "step1"}, + {"single segment", "stepOnly", "stepOnly"}, + {"empty", "", ""}, + {"two segments", "prefix/stepId", "stepId"}, + {"trailing slash stripped", "a/b/c", "c"}, + {"simplified format", "h61p38AZSV6MzEkpWWBtew/pipeline/k8s0tes/2/-1xGcOrSDS/ci/addon:20004", "addon:20004"}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := ExtractStepID(tt.logKey) + if got != tt.want { + t.Errorf("ExtractStepID(%q) = %q, want %q", tt.logKey, got, tt.want) + } + }) + } +} + +func TestNewMetaFromTIConfig(t *testing.T) { + m := NewMetaFromTIConfig("acc1", "org1", "proj1", "pipe1", "42", "exec1", "stage1", "step1", "task1") + if m.AccountID != "acc1" || m.OrgID != "org1" || m.ProjectID != "proj1" || + m.PipelineID != "pipe1" || m.RunSequence != "42" || m.PlanExecutionID != "exec1" || + m.StageIdentifier != "stage1" || m.StepIdentifier != "step1" || m.TaskID != "task1" { + t.Errorf("NewMetaFromTIConfig did not populate fields correctly: %+v", m) + } +} + +func TestEmitLine(t *testing.T) { + oldStdout := os.Stdout + r, w, _ := os.Pipe() + os.Stdout = w + + meta := &Meta{ + AccountID: "testAcct", + OrgID: "testOrg", + ProjectID: "testProj", + PipelineID: "testPipe", + RunSequence: "99", + PlanExecutionID: "exec123", + StageIdentifier: "stg1", + StepIdentifier: "stp1", + TaskID: "task-abc-DEL", + } + ts := time.Unix(1700000000, 500000000) + EmitLine(meta, "hello world", ts, "LITE_ENGINE_STEP_LOGS") + + w.Close() + var buf bytes.Buffer + io.Copy(&buf, r) + os.Stdout = oldStdout + + line := buf.String() + if line == "" { + t.Fatal("EmitLine produced no output") + } + + var parsed map[string]interface{} + if err := json.Unmarshal([]byte(line), &parsed); err != nil { + t.Fatalf("EmitLine output is not valid JSON: %v\nOutput: %s", err, line) + } + + if parsed["message"] != "hello world" { + t.Errorf("expected message 'hello world', got %v", parsed["message"]) + } + if parsed["level"] != "INFO" { + t.Errorf("expected level 'INFO', got %v", parsed["level"]) + } + if parsed["logType"] != "LITE_ENGINE_STEP_LOGS" { + t.Errorf("expected logType 'LITE_ENGINE_STEP_LOGS', got %v", parsed["logType"]) + } + if parsed["log_source"] != "streaming" { + t.Errorf("expected log_source 'streaming', got %v", parsed["log_source"]) + } + + abs, ok := parsed["logAbstractions"].(map[string]interface{}) + if !ok { + t.Fatal("logAbstractions missing or not a map") + } + if abs["accountId"] != "testAcct" { + t.Errorf("expected accountId 'testAcct', got %v", abs["accountId"]) + } + if abs["stepIdentifier"] != "stp1" { + t.Errorf("expected stepIdentifier 'stp1', got %v", abs["stepIdentifier"]) + } + + logCtx, ok := parsed["logContext"].(map[string]interface{}) + if !ok { + t.Fatal("logContext missing or not a map") + } + if logCtx["taskId"] != "task-abc-DEL" { + t.Errorf("expected taskId 'task-abc-DEL', got %v", logCtx["taskId"]) + } + + ts64, ok := parsed["timestamp"].(float64) + if !ok || ts64 <= 0 { + t.Errorf("expected positive timestamp, got %v", parsed["timestamp"]) + } +} + +func TestEmitLineNoTaskID(t *testing.T) { + oldStdout := os.Stdout + r, w, _ := os.Pipe() + os.Stdout = w + + meta := &Meta{ + AccountID: "testAcct", + } + EmitLine(meta, "msg", time.Now(), "TEST") + + w.Close() + var buf bytes.Buffer + io.Copy(&buf, r) + os.Stdout = oldStdout + + var parsed map[string]interface{} + if err := json.Unmarshal(buf.Bytes(), &parsed); err != nil { + t.Fatalf("EmitLine output is not valid JSON: %v", err) + } + if _, exists := parsed["logContext"]; exists { + t.Error("logContext should not be present when taskId is empty") + } + if parsed["log_source"] != "streaming" { + t.Errorf("expected log_source 'streaming', got %v", parsed["log_source"]) + } +} + +func TestEmitLineNilMetaDoesNotPanic(t *testing.T) { + oldStdout := os.Stdout + r, w, _ := os.Pipe() + os.Stdout = w + + meta := &Meta{} + EmitLine(meta, "msg", time.Now(), "TEST") + + w.Close() + var buf bytes.Buffer + io.Copy(&buf, r) + os.Stdout = oldStdout + + if buf.Len() == 0 { + t.Error("expected output for empty meta") + } +} diff --git a/livelog/livelog.go b/livelog/livelog.go index 07553172..e729cda4 100644 --- a/livelog/livelog.go +++ b/livelog/livelog.go @@ -14,6 +14,7 @@ import ( "sync" "time" + "github.com/harness/lite-engine/duallog" "github.com/harness/lite-engine/internal/safego" "github.com/harness/lite-engine/logstream" "github.com/harness/lite-engine/logstream/remote" @@ -64,6 +65,9 @@ type Writer struct { trimNewLineSuffix bool lastFlushTime time.Time ctx context.Context + + dualLogMeta *duallog.Meta + dualLogType string } // New returns a new writer @@ -99,6 +103,12 @@ func (b *Writer) SetInterval(interval time.Duration) { b.interval = interval } +// SetDualLogConfig enables dual logging to stdout in flat JSON format. +func (b *Writer) SetDualLogConfig(meta *duallog.Meta, logType string) { + b.dualLogMeta = meta + b.dualLogType = logType +} + // Write uploads the live log stream to the server. func (b *Writer) Write(p []byte) (n int, err error) { var res []byte @@ -138,6 +148,10 @@ func (b *Writer) Write(p []byte) (n int, err error) { ElaspedTime: int64(time.Since(b.now).Seconds()), } + if b.dualLogMeta != nil { + duallog.EmitLine(b.dualLogMeta, line.Message, line.Timestamp, b.dualLogType) + } + jsonLine, _ := getLineBytes(line) if b.printToStdout { diff --git a/pipeline/runtime/step_executor.go b/pipeline/runtime/step_executor.go index cf396bcb..b1396fa1 100644 --- a/pipeline/runtime/step_executor.go +++ b/pipeline/runtime/step_executor.go @@ -9,11 +9,13 @@ import ( "encoding/json" "fmt" "io" + "os" "sync" "time" "github.com/drone/runner-go/pipeline/runtime" "github.com/harness/lite-engine/api" + "github.com/harness/lite-engine/duallog" "github.com/harness/lite-engine/engine" errorcat "github.com/harness/lite-engine/engine/errors" "github.com/harness/lite-engine/engine/logutil" @@ -497,6 +499,29 @@ func getLogStreamWriter(r *api.StartStepRequest) logstream.Writer { pipelineState.GetLogConfig().TrimNewLineSuffix, r.LogConfig.SkipOpeningStream, r.LogConfig.SkipClosingStream) + + if pipelineState.GetLogConfig().DualLoggingEnabled { + tiCfg := pipelineState.GetTIConfig() + var accountID, orgID, projectID, pipelineID, buildID, stageID string + if tiCfg != nil && tiCfg.GetClient() != nil { + accountID = tiCfg.GetAccountID() + orgID = tiCfg.GetOrgID() + projectID = tiCfg.GetProjectID() + pipelineID = tiCfg.GetPipelineID() + buildID = tiCfg.GetBuildID() + stageID = tiCfg.GetStageID() + } + meta := duallog.NewMetaFromTIConfig( + accountID, orgID, projectID, + pipelineID, buildID, + os.Getenv("HARNESS_EXECUTION_ID"), + stageID, + duallog.ExtractStepID(r.LogKey), + r.StepStatus.TaskID, + ) + wc.SetDualLogConfig(meta, "LITE_ENGINE_STEP_LOGS") + } + wr := logstream.NewReplacerWithEnvs(wc, secrets, r.Envs) safego.SafeGo("log_stream_open", func() { wr.Open() //nolint:errcheck From ea2f5fda6cec4acf23eda133ad441710001793f2 Mon Sep 17 00:00:00 2001 From: Dhiraj Chhawchharia Date: Fri, 20 Mar 2026 09:51:39 -0700 Subject: [PATCH 02/14] feat:[CI-21635]: Added logging for debugging TO REVERT --- duallog/duallog.go | 18 +++++++++++++++- livelog/livelog.go | 7 ++++++ pipeline/runtime/step_executor.go | 36 +++++++++++++++++++++++++++++-- pipeline/state.go | 9 ++++++++ 4 files changed, 67 insertions(+), 3 deletions(-) diff --git a/duallog/duallog.go b/duallog/duallog.go index cef16e0c..a2a7e85b 100644 --- a/duallog/duallog.go +++ b/duallog/duallog.go @@ -11,6 +11,8 @@ import ( "os" "strings" "time" + + "github.com/sirupsen/logrus" ) // Meta holds metadata fields for dual-log JSON payloads. @@ -28,6 +30,11 @@ type Meta struct { // NewMetaFromTIConfig constructs a Meta from TI config fields and other sources. func NewMetaFromTIConfig(accountID, orgID, projectID, pipelineID, buildID, planExecID, stageID, stepID, taskID string) *Meta { + logrus.WithFields(logrus.Fields{ + "accountID": accountID, "orgID": orgID, "projectID": projectID, + "pipelineID": pipelineID, "buildID": buildID, "planExecID": planExecID, + "stageID": stageID, "stepID": stepID, "taskID": taskID, + }).Info("duallog: NewMetaFromTIConfig called") return &Meta{ AccountID: accountID, OrgID: orgID, @@ -45,10 +52,15 @@ func NewMetaFromTIConfig(accountID, orgID, projectID, pipelineID, buildID, planE // accountId/orgId/projectId/pipelineId/runSequence/stageId/stepId. func ExtractStepID(logKey string) string { if logKey == "" { + logrus.Warn("duallog: ExtractStepID called with empty logKey") return "" } parts := strings.Split(logKey, "/") - return parts[len(parts)-1] + stepID := parts[len(parts)-1] + logrus.WithFields(logrus.Fields{ + "logKey": logKey, "extractedStepID": stepID, "partsCount": len(parts), + }).Info("duallog: ExtractStepID result") + return stepID } // EmitLine writes a single flat-JSON log line to os.Stdout. @@ -78,6 +90,10 @@ func EmitLine(meta *Meta, message string, ts time.Time, logType string) { } jsonBytes, err := json.Marshal(payload) if err != nil { + logrus.WithFields(logrus.Fields{ + "error": err, "logType": logType, "accountId": meta.AccountID, + "stepIdentifier": meta.StepIdentifier, + }).Error("duallog: EmitLine failed to marshal JSON payload") return } fmt.Fprintln(os.Stdout, string(jsonBytes)) diff --git a/livelog/livelog.go b/livelog/livelog.go index e729cda4..bdcc45ad 100644 --- a/livelog/livelog.go +++ b/livelog/livelog.go @@ -107,6 +107,13 @@ func (b *Writer) SetInterval(interval time.Duration) { func (b *Writer) SetDualLogConfig(meta *duallog.Meta, logType string) { b.dualLogMeta = meta b.dualLogType = logType + logrus.WithFields(logrus.Fields{ + "logType": logType, "logKey": b.key, "logSource": b.name, + "accountId": meta.AccountID, "orgId": meta.OrgID, "projectId": meta.ProjectID, + "pipelineId": meta.PipelineID, "runSequence": meta.RunSequence, + "planExecutionId": meta.PlanExecutionID, "stageIdentifier": meta.StageIdentifier, + "stepIdentifier": meta.StepIdentifier, "taskId": meta.TaskID, + }).Info("livelog: dual log config set on Writer") } // Write uploads the live log stream to the server. diff --git a/pipeline/runtime/step_executor.go b/pipeline/runtime/step_executor.go index b1396fa1..10160fbc 100644 --- a/pipeline/runtime/step_executor.go +++ b/pipeline/runtime/step_executor.go @@ -500,6 +500,13 @@ func getLogStreamWriter(r *api.StartStepRequest) logstream.Writer { r.LogConfig.SkipOpeningStream, r.LogConfig.SkipClosingStream) + logrus.WithFields(logrus.Fields{ + "dual_logging_enabled": pipelineState.GetLogConfig().DualLoggingEnabled, + "log_key": r.LogKey, + "step_name": r.Name, + "task_id": r.StepStatus.TaskID, + }).Info("getLogStreamWriter: checking dual logging config") + if pipelineState.GetLogConfig().DualLoggingEnabled { tiCfg := pipelineState.GetTIConfig() var accountID, orgID, projectID, pipelineID, buildID, stageID string @@ -510,16 +517,41 @@ func getLogStreamWriter(r *api.StartStepRequest) logstream.Writer { pipelineID = tiCfg.GetPipelineID() buildID = tiCfg.GetBuildID() stageID = tiCfg.GetStageID() + logrus.WithFields(logrus.Fields{ + "accountID": accountID, "orgID": orgID, "projectID": projectID, + "pipelineID": pipelineID, "buildID": buildID, "stageID": stageID, + }).Info("getLogStreamWriter: TIConfig metadata extracted for dual logging") + } else { + logrus.Warn("getLogStreamWriter: TIConfig or TIConfig.Client is nil, metadata may be incomplete") } + + harnessExecID := os.Getenv("HARNESS_EXECUTION_ID") + extractedStepID := duallog.ExtractStepID(r.LogKey) + logrus.WithFields(logrus.Fields{ + "HARNESS_EXECUTION_ID": harnessExecID, + "extractedStepID": extractedStepID, + "logKey": r.LogKey, + "taskID": r.StepStatus.TaskID, + }).Info("getLogStreamWriter: building dual log meta") + meta := duallog.NewMetaFromTIConfig( accountID, orgID, projectID, pipelineID, buildID, - os.Getenv("HARNESS_EXECUTION_ID"), + harnessExecID, stageID, - duallog.ExtractStepID(r.LogKey), + extractedStepID, r.StepStatus.TaskID, ) wc.SetDualLogConfig(meta, "LITE_ENGINE_STEP_LOGS") + + logrus.WithFields(logrus.Fields{ + "accountID": meta.AccountID, "orgID": meta.OrgID, "projectID": meta.ProjectID, + "pipelineID": meta.PipelineID, "runSequence": meta.RunSequence, + "planExecutionID": meta.PlanExecutionID, "stageIdentifier": meta.StageIdentifier, + "stepIdentifier": meta.StepIdentifier, "taskID": meta.TaskID, + }).Info("getLogStreamWriter: dual log meta configured on livelog writer") + } else { + logrus.WithField("log_key", r.LogKey).Info("getLogStreamWriter: dual logging is NOT enabled for this execution") } wr := logstream.NewReplacerWithEnvs(wc, secrets, r.Envs) diff --git a/pipeline/state.go b/pipeline/state.go index a7dab51c..4e40e272 100644 --- a/pipeline/state.go +++ b/pipeline/state.go @@ -15,6 +15,7 @@ import ( "github.com/harness/lite-engine/logstream/filestore" "github.com/harness/lite-engine/logstream/remote" "github.com/harness/lite-engine/osstats" + "github.com/sirupsen/logrus" tiCfg "github.com/harness/lite-engine/ti/config" ) @@ -72,6 +73,14 @@ func (s *State) Set(secrets []string, logConfig api.LogConfig, tiConfig tiCfg.Cf s.tiConfig = tiConfig s.mtlsConfig = mtlsConfig s.statsCollector = collector + + logrus.WithFields(logrus.Fields{ + "dualLoggingEnabled": logConfig.DualLoggingEnabled, + "logConfigURL": logConfig.URL, + "logConfigAccountID": logConfig.AccountID, + "trimNewLineSuffix": logConfig.TrimNewLineSuffix, + "secretsCount": len(secrets), + }).Info("pipeline.State.Set: pipeline state initialized with log config") } func (s *State) GetSecrets() []string { From 7830d307b837793d637ce54b0baaa29cac70a0ad Mon Sep 17 00:00:00 2001 From: Dhiraj Chhawchharia Date: Fri, 20 Mar 2026 13:42:33 -0700 Subject: [PATCH 03/14] feat:[CI-21635]: Added fix for flattening all logs --- duallog/duallog.go | 16 ++++++++++------ handler/setup.go | 33 ++++++++++++++++++++++++++++++++ logger/duallog_hook.go | 43 ++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 86 insertions(+), 6 deletions(-) create mode 100644 logger/duallog_hook.go diff --git a/duallog/duallog.go b/duallog/duallog.go index a2a7e85b..86553946 100644 --- a/duallog/duallog.go +++ b/duallog/duallog.go @@ -63,12 +63,18 @@ func ExtractStepID(logKey string) string { return stepID } -// EmitLine writes a single flat-JSON log line to os.Stdout. +// EmitLine writes a single flat-JSON log line to os.Stdout with level "INFO". // It uses fmt.Fprintln (NOT logrus) to avoid re-ingestion loops. func EmitLine(meta *Meta, message string, ts time.Time, logType string) { + EmitLineWithLevel(meta, message, ts, logType, "INFO") +} + +// EmitLineWithLevel writes a single flat-JSON log line to os.Stdout with the specified level. +// It uses fmt.Fprintln (NOT logrus) to avoid re-ingestion loops. +func EmitLineWithLevel(meta *Meta, message string, ts time.Time, logType string, level string) { payload := map[string]interface{}{ "timestamp": float64(ts.UnixNano()) / 1e9, - "level": "INFO", + "level": level, "message": message, "logType": logType, "log_source": "streaming", @@ -90,10 +96,8 @@ func EmitLine(meta *Meta, message string, ts time.Time, logType string) { } jsonBytes, err := json.Marshal(payload) if err != nil { - logrus.WithFields(logrus.Fields{ - "error": err, "logType": logType, "accountId": meta.AccountID, - "stepIdentifier": meta.StepIdentifier, - }).Error("duallog: EmitLine failed to marshal JSON payload") + fmt.Fprintf(os.Stderr, "duallog: EmitLineWithLevel failed to marshal JSON: %v (logType=%s, accountId=%s)\n", + err, logType, meta.AccountID) return } fmt.Fprintln(os.Stdout, string(jsonBytes)) diff --git a/handler/setup.go b/handler/setup.go index 45a120c9..e0f1b089 100644 --- a/handler/setup.go +++ b/handler/setup.go @@ -16,6 +16,7 @@ import ( "time" "github.com/harness/lite-engine/api" + "github.com/harness/lite-engine/duallog" "github.com/harness/lite-engine/engine" "github.com/harness/lite-engine/engine/spec" "github.com/harness/lite-engine/livelog" @@ -92,6 +93,10 @@ func HandleSetup(engine *engine.Engine) http.HandlerFunc { Warnln("api: failed to initialize lite-engine log streaming") } + if s.LogConfig.DualLoggingEnabled { + initializeDualLogHook(&s) + } + // Initialize OS stats NDJSON streaming (file + upload) if MemoryMetricsLogKey is provided if err := initializeOSStatsStreaming(&s, state); err != nil { logger.FromRequest(r). @@ -288,6 +293,34 @@ func initializeOSStatsStreaming(setupReq *api.SetupRequest, state *pipeline.Stat return nil } +// initializeDualLogHook adds a logrus hook that emits each lite-engine internal log entry +// as flat JSON to stdout for OTel collection, when dual logging is enabled. +func initializeDualLogHook(setupReq *api.SetupRequest) { + ti := &setupReq.TIConfig + taskID := "" + if setupReq.Envs != nil { + taskID = setupReq.Envs["HARNESS_DELEGATE_TASK_ID"] + } + + planExecID := "" + if setupReq.Envs != nil { + if v, ok := setupReq.Envs["HARNESS_EXECUTION_ID"]; ok { + planExecID = v + } + } + + meta := duallog.NewMetaFromTIConfig( + ti.AccountID, ti.OrgID, ti.ProjectID, ti.PipelineID, + ti.BuildID, planExecID, ti.StageID, "lite-engine", taskID, + ) + logrus.AddHook(logger.NewDualLogHook(meta, "LITE_ENGINE_LOGS")) + + logger.L.WithFields(logrus.Fields{ + "accountId": ti.AccountID, "pipelineId": ti.PipelineID, + "stageId": ti.StageID, "taskId": taskID, + }).Infoln("api: initialized dual log hook for lite-engine internal logs") +} + // syncSystemClock forces chrony to step the system clock if there is significant drift. // This fixes clock skew on ARM64 VMs after GCP hibernate resume, where the arch_sys_counter // clock source doesn't auto-adjust (unlike x86's kvm-clock). Without this, chrony may diff --git a/logger/duallog_hook.go b/logger/duallog_hook.go new file mode 100644 index 00000000..b291091c --- /dev/null +++ b/logger/duallog_hook.go @@ -0,0 +1,43 @@ +// Copyright 2024 Harness Inc. All rights reserved. +// Use of this source code is governed by the PolyForm Free Trial 1.0.0 license +// that can be found in the licenses directory at the root of this repository, also available at +// https://polyformproject.org/wp-content/uploads/2020/05/PolyForm-Free-Trial-1.0.0.txt. + +package logger + +import ( + "strings" + + "github.com/harness/lite-engine/duallog" + "github.com/sirupsen/logrus" +) + +// DualLogHook is a logrus hook that emits each log entry as flat JSON to stdout +// via duallog.EmitLine for OTel collection. It uses fmt.Fprintln(os.Stdout, ...) +// internally (via EmitLine) so there is no recursion risk with logrus. +type DualLogHook struct { + meta *duallog.Meta + logType string +} + +// NewDualLogHook creates a DualLogHook that will emit JSON logs to stdout. +func NewDualLogHook(meta *duallog.Meta, logType string) *DualLogHook { + return &DualLogHook{ + meta: meta, + logType: logType, + } +} + +// Levels returns all log levels so every logrus entry is captured. +func (h *DualLogHook) Levels() []logrus.Level { + return logrus.AllLevels +} + +// Fire is called for each logrus entry. It formats the entry and emits +// a flat JSON line via duallog.EmitLine. +func (h *DualLogHook) Fire(entry *logrus.Entry) error { + msg := formatLogEntry(entry) + level := strings.ToUpper(entry.Level.String()) + duallog.EmitLineWithLevel(h.meta, msg, entry.Time, h.logType, level) + return nil +} From 927944f35241919d2c143c7d49a8c2d20f571fe0 Mon Sep 17 00:00:00 2001 From: Dhiraj Chhawchharia Date: Fri, 20 Mar 2026 19:05:20 -0700 Subject: [PATCH 04/14] feat:[CI-21635]: Removed unnecessary logs --- duallog/duallog.go | 23 +++++------------------ livelog/livelog.go | 7 ------- pipeline/runtime/step_executor.go | 29 +++-------------------------- 3 files changed, 8 insertions(+), 51 deletions(-) diff --git a/duallog/duallog.go b/duallog/duallog.go index 86553946..098a44bf 100644 --- a/duallog/duallog.go +++ b/duallog/duallog.go @@ -11,8 +11,6 @@ import ( "os" "strings" "time" - - "github.com/sirupsen/logrus" ) // Meta holds metadata fields for dual-log JSON payloads. @@ -30,11 +28,6 @@ type Meta struct { // NewMetaFromTIConfig constructs a Meta from TI config fields and other sources. func NewMetaFromTIConfig(accountID, orgID, projectID, pipelineID, buildID, planExecID, stageID, stepID, taskID string) *Meta { - logrus.WithFields(logrus.Fields{ - "accountID": accountID, "orgID": orgID, "projectID": projectID, - "pipelineID": pipelineID, "buildID": buildID, "planExecID": planExecID, - "stageID": stageID, "stepID": stepID, "taskID": taskID, - }).Info("duallog: NewMetaFromTIConfig called") return &Meta{ AccountID: accountID, OrgID: orgID, @@ -52,15 +45,10 @@ func NewMetaFromTIConfig(accountID, orgID, projectID, pipelineID, buildID, planE // accountId/orgId/projectId/pipelineId/runSequence/stageId/stepId. func ExtractStepID(logKey string) string { if logKey == "" { - logrus.Warn("duallog: ExtractStepID called with empty logKey") return "" } parts := strings.Split(logKey, "/") - stepID := parts[len(parts)-1] - logrus.WithFields(logrus.Fields{ - "logKey": logKey, "extractedStepID": stepID, "partsCount": len(parts), - }).Info("duallog: ExtractStepID result") - return stepID + return parts[len(parts)-1] } // EmitLine writes a single flat-JSON log line to os.Stdout with level "INFO". @@ -73,11 +61,10 @@ func EmitLine(meta *Meta, message string, ts time.Time, logType string) { // It uses fmt.Fprintln (NOT logrus) to avoid re-ingestion loops. func EmitLineWithLevel(meta *Meta, message string, ts time.Time, logType string, level string) { payload := map[string]interface{}{ - "timestamp": float64(ts.UnixNano()) / 1e9, - "level": level, - "message": message, - "logType": logType, - "log_source": "streaming", + "timestamp": float64(ts.UnixNano()) / 1e9, + "level": level, + "message": message, + "logType": "EXECUTION_LOGS", "logAbstractions": map[string]string{ "accountId": meta.AccountID, "orgId": meta.OrgID, diff --git a/livelog/livelog.go b/livelog/livelog.go index bdcc45ad..e729cda4 100644 --- a/livelog/livelog.go +++ b/livelog/livelog.go @@ -107,13 +107,6 @@ func (b *Writer) SetInterval(interval time.Duration) { func (b *Writer) SetDualLogConfig(meta *duallog.Meta, logType string) { b.dualLogMeta = meta b.dualLogType = logType - logrus.WithFields(logrus.Fields{ - "logType": logType, "logKey": b.key, "logSource": b.name, - "accountId": meta.AccountID, "orgId": meta.OrgID, "projectId": meta.ProjectID, - "pipelineId": meta.PipelineID, "runSequence": meta.RunSequence, - "planExecutionId": meta.PlanExecutionID, "stageIdentifier": meta.StageIdentifier, - "stepIdentifier": meta.StepIdentifier, "taskId": meta.TaskID, - }).Info("livelog: dual log config set on Writer") } // Write uploads the live log stream to the server. diff --git a/pipeline/runtime/step_executor.go b/pipeline/runtime/step_executor.go index 10160fbc..61f747c5 100644 --- a/pipeline/runtime/step_executor.go +++ b/pipeline/runtime/step_executor.go @@ -500,13 +500,6 @@ func getLogStreamWriter(r *api.StartStepRequest) logstream.Writer { r.LogConfig.SkipOpeningStream, r.LogConfig.SkipClosingStream) - logrus.WithFields(logrus.Fields{ - "dual_logging_enabled": pipelineState.GetLogConfig().DualLoggingEnabled, - "log_key": r.LogKey, - "step_name": r.Name, - "task_id": r.StepStatus.TaskID, - }).Info("getLogStreamWriter: checking dual logging config") - if pipelineState.GetLogConfig().DualLoggingEnabled { tiCfg := pipelineState.GetTIConfig() var accountID, orgID, projectID, pipelineID, buildID, stageID string @@ -517,22 +510,12 @@ func getLogStreamWriter(r *api.StartStepRequest) logstream.Writer { pipelineID = tiCfg.GetPipelineID() buildID = tiCfg.GetBuildID() stageID = tiCfg.GetStageID() - logrus.WithFields(logrus.Fields{ - "accountID": accountID, "orgID": orgID, "projectID": projectID, - "pipelineID": pipelineID, "buildID": buildID, "stageID": stageID, - }).Info("getLogStreamWriter: TIConfig metadata extracted for dual logging") } else { - logrus.Warn("getLogStreamWriter: TIConfig or TIConfig.Client is nil, metadata may be incomplete") + logrus.Warn("getLogStreamWriter: TIConfig or TIConfig.Client is nil, dual log metadata may be incomplete") } harnessExecID := os.Getenv("HARNESS_EXECUTION_ID") extractedStepID := duallog.ExtractStepID(r.LogKey) - logrus.WithFields(logrus.Fields{ - "HARNESS_EXECUTION_ID": harnessExecID, - "extractedStepID": extractedStepID, - "logKey": r.LogKey, - "taskID": r.StepStatus.TaskID, - }).Info("getLogStreamWriter: building dual log meta") meta := duallog.NewMetaFromTIConfig( accountID, orgID, projectID, @@ -543,15 +526,9 @@ func getLogStreamWriter(r *api.StartStepRequest) logstream.Writer { r.StepStatus.TaskID, ) wc.SetDualLogConfig(meta, "LITE_ENGINE_STEP_LOGS") - logrus.WithFields(logrus.Fields{ - "accountID": meta.AccountID, "orgID": meta.OrgID, "projectID": meta.ProjectID, - "pipelineID": meta.PipelineID, "runSequence": meta.RunSequence, - "planExecutionID": meta.PlanExecutionID, "stageIdentifier": meta.StageIdentifier, - "stepIdentifier": meta.StepIdentifier, "taskID": meta.TaskID, - }).Info("getLogStreamWriter: dual log meta configured on livelog writer") - } else { - logrus.WithField("log_key", r.LogKey).Info("getLogStreamWriter: dual logging is NOT enabled for this execution") + "accountId": accountID, "stepName": r.Name, "logKey": r.LogKey, + }).Info("getLogStreamWriter: dual logging enabled for step") } wr := logstream.NewReplacerWithEnvs(wc, secrets, r.Envs) From 03456ef2b7d266037818a314ccac7d38e1645c37 Mon Sep 17 00:00:00 2001 From: Dhiraj Chhawchharia Date: Mon, 23 Mar 2026 14:43:50 -0700 Subject: [PATCH 05/14] feat:[CI-21635]: Removed dead code, unnecessary changes and reviewed comments --- duallog/duallog.go | 13 +---------- duallog/duallog_test.go | 29 ------------------------ handler/setup.go | 2 +- pipeline/runtime/step_executor.go | 37 +++++++++---------------------- 4 files changed, 13 insertions(+), 68 deletions(-) diff --git a/duallog/duallog.go b/duallog/duallog.go index 098a44bf..8657fa7b 100644 --- a/duallog/duallog.go +++ b/duallog/duallog.go @@ -9,7 +9,6 @@ import ( "encoding/json" "fmt" "os" - "strings" "time" ) @@ -41,16 +40,6 @@ func NewMetaFromTIConfig(accountID, orgID, projectID, pipelineID, buildID, planE } } -// ExtractStepID extracts the last segment from a logKey of the form -// accountId/orgId/projectId/pipelineId/runSequence/stageId/stepId. -func ExtractStepID(logKey string) string { - if logKey == "" { - return "" - } - parts := strings.Split(logKey, "/") - return parts[len(parts)-1] -} - // EmitLine writes a single flat-JSON log line to os.Stdout with level "INFO". // It uses fmt.Fprintln (NOT logrus) to avoid re-ingestion loops. func EmitLine(meta *Meta, message string, ts time.Time, logType string) { @@ -64,7 +53,7 @@ func EmitLineWithLevel(meta *Meta, message string, ts time.Time, logType string, "timestamp": float64(ts.UnixNano()) / 1e9, "level": level, "message": message, - "logType": "EXECUTION_LOGS", + "logType": logType, "logAbstractions": map[string]string{ "accountId": meta.AccountID, "orgId": meta.OrgID, diff --git a/duallog/duallog_test.go b/duallog/duallog_test.go index 5460cdcf..7a340e8e 100644 --- a/duallog/duallog_test.go +++ b/duallog/duallog_test.go @@ -14,29 +14,6 @@ import ( "time" ) -func TestExtractStepID(t *testing.T) { - tests := []struct { - name string - logKey string - want string - }{ - {"full key", "acct/org/proj/pipe/1/stage/step1", "step1"}, - {"single segment", "stepOnly", "stepOnly"}, - {"empty", "", ""}, - {"two segments", "prefix/stepId", "stepId"}, - {"trailing slash stripped", "a/b/c", "c"}, - {"simplified format", "h61p38AZSV6MzEkpWWBtew/pipeline/k8s0tes/2/-1xGcOrSDS/ci/addon:20004", "addon:20004"}, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - got := ExtractStepID(tt.logKey) - if got != tt.want { - t.Errorf("ExtractStepID(%q) = %q, want %q", tt.logKey, got, tt.want) - } - }) - } -} - func TestNewMetaFromTIConfig(t *testing.T) { m := NewMetaFromTIConfig("acc1", "org1", "proj1", "pipe1", "42", "exec1", "stage1", "step1", "task1") if m.AccountID != "acc1" || m.OrgID != "org1" || m.ProjectID != "proj1" || @@ -89,9 +66,6 @@ func TestEmitLine(t *testing.T) { if parsed["logType"] != "LITE_ENGINE_STEP_LOGS" { t.Errorf("expected logType 'LITE_ENGINE_STEP_LOGS', got %v", parsed["logType"]) } - if parsed["log_source"] != "streaming" { - t.Errorf("expected log_source 'streaming', got %v", parsed["log_source"]) - } abs, ok := parsed["logAbstractions"].(map[string]interface{}) if !ok { @@ -140,9 +114,6 @@ func TestEmitLineNoTaskID(t *testing.T) { if _, exists := parsed["logContext"]; exists { t.Error("logContext should not be present when taskId is empty") } - if parsed["log_source"] != "streaming" { - t.Errorf("expected log_source 'streaming', got %v", parsed["log_source"]) - } } func TestEmitLineNilMetaDoesNotPanic(t *testing.T) { diff --git a/handler/setup.go b/handler/setup.go index e0f1b089..d5851b5d 100644 --- a/handler/setup.go +++ b/handler/setup.go @@ -313,7 +313,7 @@ func initializeDualLogHook(setupReq *api.SetupRequest) { ti.AccountID, ti.OrgID, ti.ProjectID, ti.PipelineID, ti.BuildID, planExecID, ti.StageID, "lite-engine", taskID, ) - logrus.AddHook(logger.NewDualLogHook(meta, "LITE_ENGINE_LOGS")) + logrus.AddHook(logger.NewDualLogHook(meta, "EXECUTION_LOGS")) logger.L.WithFields(logrus.Fields{ "accountId": ti.AccountID, "pipelineId": ti.PipelineID, diff --git a/pipeline/runtime/step_executor.go b/pipeline/runtime/step_executor.go index 61f747c5..3f26a553 100644 --- a/pipeline/runtime/step_executor.go +++ b/pipeline/runtime/step_executor.go @@ -502,33 +502,18 @@ func getLogStreamWriter(r *api.StartStepRequest) logstream.Writer { if pipelineState.GetLogConfig().DualLoggingEnabled { tiCfg := pipelineState.GetTIConfig() - var accountID, orgID, projectID, pipelineID, buildID, stageID string - if tiCfg != nil && tiCfg.GetClient() != nil { - accountID = tiCfg.GetAccountID() - orgID = tiCfg.GetOrgID() - projectID = tiCfg.GetProjectID() - pipelineID = tiCfg.GetPipelineID() - buildID = tiCfg.GetBuildID() - stageID = tiCfg.GetStageID() - } else { - logrus.Warn("getLogStreamWriter: TIConfig or TIConfig.Client is nil, dual log metadata may be incomplete") + meta := &duallog.Meta{ + AccountID: pipelineState.GetLogConfig().AccountID, + OrgID: tiCfg.GetOrgID(), + ProjectID: tiCfg.GetProjectID(), + PipelineID: tiCfg.GetPipelineID(), + RunSequence: tiCfg.GetBuildID(), + PlanExecutionID: os.Getenv("HARNESS_EXECUTION_ID"), + StageIdentifier: tiCfg.GetStageID(), + StepIdentifier: r.Name, + TaskID: r.StepStatus.TaskID, } - - harnessExecID := os.Getenv("HARNESS_EXECUTION_ID") - extractedStepID := duallog.ExtractStepID(r.LogKey) - - meta := duallog.NewMetaFromTIConfig( - accountID, orgID, projectID, - pipelineID, buildID, - harnessExecID, - stageID, - extractedStepID, - r.StepStatus.TaskID, - ) - wc.SetDualLogConfig(meta, "LITE_ENGINE_STEP_LOGS") - logrus.WithFields(logrus.Fields{ - "accountId": accountID, "stepName": r.Name, "logKey": r.LogKey, - }).Info("getLogStreamWriter: dual logging enabled for step") + wc.SetDualLogConfig(meta, "EXECUTION_LOGS") } wr := logstream.NewReplacerWithEnvs(wc, secrets, r.Envs) From d3956390f96d7b5978a1bf65a77228aabc2c9f43 Mon Sep 17 00:00:00 2001 From: Dhiraj Chhawchharia Date: Mon, 23 Mar 2026 15:03:14 -0700 Subject: [PATCH 06/14] feat:[CI-21635]: Fixed lint issues --- duallog/duallog.go | 5 +++-- duallog/duallog_test.go | 6 +++--- pipeline/state.go | 2 +- 3 files changed, 7 insertions(+), 6 deletions(-) diff --git a/duallog/duallog.go b/duallog/duallog.go index 8657fa7b..4cb89554 100644 --- a/duallog/duallog.go +++ b/duallog/duallog.go @@ -48,9 +48,10 @@ func EmitLine(meta *Meta, message string, ts time.Time, logType string) { // EmitLineWithLevel writes a single flat-JSON log line to os.Stdout with the specified level. // It uses fmt.Fprintln (NOT logrus) to avoid re-ingestion loops. -func EmitLineWithLevel(meta *Meta, message string, ts time.Time, logType string, level string) { +func EmitLineWithLevel(meta *Meta, message string, ts time.Time, logType, level string) { + const nanosecondsPerSecond = 1e9 payload := map[string]interface{}{ - "timestamp": float64(ts.UnixNano()) / 1e9, + "timestamp": float64(ts.UnixNano()) / nanosecondsPerSecond, "level": level, "message": message, "logType": logType, diff --git a/duallog/duallog_test.go b/duallog/duallog_test.go index 7a340e8e..d62ecd64 100644 --- a/duallog/duallog_test.go +++ b/duallog/duallog_test.go @@ -44,7 +44,7 @@ func TestEmitLine(t *testing.T) { w.Close() var buf bytes.Buffer - io.Copy(&buf, r) + _, _ = io.Copy(&buf, r) os.Stdout = oldStdout line := buf.String() @@ -104,7 +104,7 @@ func TestEmitLineNoTaskID(t *testing.T) { w.Close() var buf bytes.Buffer - io.Copy(&buf, r) + _, _ = io.Copy(&buf, r) os.Stdout = oldStdout var parsed map[string]interface{} @@ -126,7 +126,7 @@ func TestEmitLineNilMetaDoesNotPanic(t *testing.T) { w.Close() var buf bytes.Buffer - io.Copy(&buf, r) + _, _ = io.Copy(&buf, r) os.Stdout = oldStdout if buf.Len() == 0 { diff --git a/pipeline/state.go b/pipeline/state.go index 4e40e272..d6fee084 100644 --- a/pipeline/state.go +++ b/pipeline/state.go @@ -15,8 +15,8 @@ import ( "github.com/harness/lite-engine/logstream/filestore" "github.com/harness/lite-engine/logstream/remote" "github.com/harness/lite-engine/osstats" - "github.com/sirupsen/logrus" tiCfg "github.com/harness/lite-engine/ti/config" + "github.com/sirupsen/logrus" ) var ( From 1081a9a5b3a61c90cfb33ab02fa85fa370eb3551 Mon Sep 17 00:00:00 2001 From: Dhiraj Chhawchharia Date: Mon, 23 Mar 2026 15:55:41 -0700 Subject: [PATCH 07/14] feat:[CI-21635]: Dummy commit --- pipeline/runtime/step_executor_stateless.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pipeline/runtime/step_executor_stateless.go b/pipeline/runtime/step_executor_stateless.go index 61ba4ded..9e9346b8 100644 --- a/pipeline/runtime/step_executor_stateless.go +++ b/pipeline/runtime/step_executor_stateless.go @@ -77,7 +77,7 @@ func (e *StepExecutorStateless) executeStep( //nolint:gocritic runFunc := func(ctx context.Context, step *spec.Step, output io.Writer, isDrone bool, isHosted bool) (*runtime.State, error) { return engine.RunStep(ctx, engine.Opts{Opts: docker.Opts{DockerClient: dockerClient}}, step, output, cfg, isDrone, isHosted) } - // Temporary: this should be removed once we have a better way of handling test intelligence. + // Temporary: this should be removed once we have a better way of handling test intelligence. tiConfig := getTiCfg(&r.TIConfig, &r.MtlsConfig, r.Envs) r.DeleteTempStepFiles = true From 0567314d641164ebdb2a981ee13eaf8034603c1b Mon Sep 17 00:00:00 2001 From: Dhiraj Chhawchharia Date: Wed, 25 Mar 2026 14:51:43 -0700 Subject: [PATCH 08/14] feat:[CI-21635]: Reviewed comments --- duallog/duallog.go | 4 ++-- duallog/duallog_test.go | 6 +++--- handler/setup.go | 2 +- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/duallog/duallog.go b/duallog/duallog.go index 4cb89554..1facb757 100644 --- a/duallog/duallog.go +++ b/duallog/duallog.go @@ -25,8 +25,8 @@ type Meta struct { TaskID string } -// NewMetaFromTIConfig constructs a Meta from TI config fields and other sources. -func NewMetaFromTIConfig(accountID, orgID, projectID, pipelineID, buildID, planExecID, stageID, stepID, taskID string) *Meta { +// NewMetaConfig constructs a Meta with the given pipeline context fields. +func NewMetaConfig(accountID, orgID, projectID, pipelineID, buildID, planExecID, stageID, stepID, taskID string) *Meta { return &Meta{ AccountID: accountID, OrgID: orgID, diff --git a/duallog/duallog_test.go b/duallog/duallog_test.go index d62ecd64..87bea096 100644 --- a/duallog/duallog_test.go +++ b/duallog/duallog_test.go @@ -14,12 +14,12 @@ import ( "time" ) -func TestNewMetaFromTIConfig(t *testing.T) { - m := NewMetaFromTIConfig("acc1", "org1", "proj1", "pipe1", "42", "exec1", "stage1", "step1", "task1") +func TestNewMetaConfig(t *testing.T) { + m := NewMetaConfig("acc1", "org1", "proj1", "pipe1", "42", "exec1", "stage1", "step1", "task1") if m.AccountID != "acc1" || m.OrgID != "org1" || m.ProjectID != "proj1" || m.PipelineID != "pipe1" || m.RunSequence != "42" || m.PlanExecutionID != "exec1" || m.StageIdentifier != "stage1" || m.StepIdentifier != "step1" || m.TaskID != "task1" { - t.Errorf("NewMetaFromTIConfig did not populate fields correctly: %+v", m) + t.Errorf("NewMetaConfig did not populate fields correctly: %+v", m) } } diff --git a/handler/setup.go b/handler/setup.go index d5851b5d..66ca29b5 100644 --- a/handler/setup.go +++ b/handler/setup.go @@ -309,7 +309,7 @@ func initializeDualLogHook(setupReq *api.SetupRequest) { } } - meta := duallog.NewMetaFromTIConfig( + meta := duallog.NewMetaConfig( ti.AccountID, ti.OrgID, ti.ProjectID, ti.PipelineID, ti.BuildID, planExecID, ti.StageID, "lite-engine", taskID, ) From d7d0982576a4919a16540d91512a0ae6e7ca7874 Mon Sep 17 00:00:00 2001 From: Dhiraj Chhawchharia Date: Thu, 26 Mar 2026 12:50:40 -0700 Subject: [PATCH 09/14] feat:[CI-21635]: Added env to read the dual logging config --- handler/setup.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/handler/setup.go b/handler/setup.go index 66ca29b5..dd603238 100644 --- a/handler/setup.go +++ b/handler/setup.go @@ -82,6 +82,11 @@ func HandleSetup(engine *engine.Engine) http.HandlerFunc { collector := osstats.New(context.Background(), statsInterval, logProcess) setProxyEnvs(s.Envs) + + if val, ok := s.Envs["HARNESS_CI_DUAL_LOGGING"]; ok && val == "true" { + s.LogConfig.DualLoggingEnabled = true + } + state := pipeline.GetState() state.Set(s.Secrets, s.LogConfig, getTiCfg(&s.TIConfig, &s.MtlsConfig, s.Envs), s.MtlsConfig, collector) From e32553ddc91f4112915d33e248977c36a2c410a6 Mon Sep 17 00:00:00 2001 From: Dhiraj Chhawchharia Date: Thu, 26 Mar 2026 14:42:37 -0700 Subject: [PATCH 10/14] feat:[CI-21635]: Updated time format --- duallog/duallog.go | 3 +-- duallog/duallog_test.go | 10 +++++++--- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/duallog/duallog.go b/duallog/duallog.go index 1facb757..52d5c41f 100644 --- a/duallog/duallog.go +++ b/duallog/duallog.go @@ -49,9 +49,8 @@ func EmitLine(meta *Meta, message string, ts time.Time, logType string) { // EmitLineWithLevel writes a single flat-JSON log line to os.Stdout with the specified level. // It uses fmt.Fprintln (NOT logrus) to avoid re-ingestion loops. func EmitLineWithLevel(meta *Meta, message string, ts time.Time, logType, level string) { - const nanosecondsPerSecond = 1e9 payload := map[string]interface{}{ - "timestamp": float64(ts.UnixNano()) / nanosecondsPerSecond, + "timestamp": ts.UTC().Format(time.RFC3339Nano), "level": level, "message": message, "logType": logType, diff --git a/duallog/duallog_test.go b/duallog/duallog_test.go index 87bea096..c6515a6d 100644 --- a/duallog/duallog_test.go +++ b/duallog/duallog_test.go @@ -86,9 +86,13 @@ func TestEmitLine(t *testing.T) { t.Errorf("expected taskId 'task-abc-DEL', got %v", logCtx["taskId"]) } - ts64, ok := parsed["timestamp"].(float64) - if !ok || ts64 <= 0 { - t.Errorf("expected positive timestamp, got %v", parsed["timestamp"]) + tsStr, ok := parsed["timestamp"].(string) + if !ok || tsStr == "" { + t.Fatalf("expected non-empty timestamp string, got %v", parsed["timestamp"]) + } + expectedTS := ts.UTC().Format(time.RFC3339Nano) + if tsStr != expectedTS { + t.Errorf("expected timestamp %q, got %q", expectedTS, tsStr) } } From 2930ddd88caad5380b8ddf739a3a32512121c9c3 Mon Sep 17 00:00:00 2001 From: Dhiraj Chhawchharia Date: Thu, 26 Mar 2026 15:22:38 -0700 Subject: [PATCH 11/14] feat:[CI-21635]: Fixed lint issue --- handler/setup.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/handler/setup.go b/handler/setup.go index dd603238..2cfbd4cb 100644 --- a/handler/setup.go +++ b/handler/setup.go @@ -65,7 +65,7 @@ func GetNetrcFile(env map[string]string) (*spec.File, error) { } // HandleExecuteStep returns an http.HandlerFunc that executes a step -func HandleSetup(engine *engine.Engine) http.HandlerFunc { +func HandleSetup(engine *engine.Engine) http.HandlerFunc { //nolint:gocyclo return func(w http.ResponseWriter, r *http.Request) { st := time.Now() From 0ae7e314faa83a267b4c2b6f24ff1839450d98a1 Mon Sep 17 00:00:00 2001 From: Dhiraj Chhawchharia Date: Fri, 27 Mar 2026 11:28:37 -0700 Subject: [PATCH 12/14] feat:[CI-21635]: Populated fields from envs and config used --- handler/setup.go | 24 ++++++++++++++++-------- pipeline/runtime/step_executor.go | 6 +++++- 2 files changed, 21 insertions(+), 9 deletions(-) diff --git a/handler/setup.go b/handler/setup.go index 2cfbd4cb..8fdb99b8 100644 --- a/handler/setup.go +++ b/handler/setup.go @@ -33,7 +33,10 @@ var ( harnessEnableDebugLogs = "HARNESS_ENABLE_DEBUG_LOGS" ) -const OSWindows = "windows" +const ( + OSWindows = "windows" + dualLoggingEnvVar = "HARNESS_LOG_STREAMING_STDOUT_ENABLED" +) func GetNetrc(os string) string { switch os { @@ -82,8 +85,9 @@ func HandleSetup(engine *engine.Engine) http.HandlerFunc { //nolint:gocyclo collector := osstats.New(context.Background(), statsInterval, logProcess) setProxyEnvs(s.Envs) + setHarnessEnvs(s.Envs) - if val, ok := s.Envs["HARNESS_CI_DUAL_LOGGING"]; ok && val == "true" { + if val, ok := s.Envs[dualLoggingEnvVar]; ok && val == "true" { s.LogConfig.DualLoggingEnabled = true } @@ -298,6 +302,15 @@ func initializeOSStatsStreaming(setupReq *api.SetupRequest, state *pipeline.Stat return nil } +func setHarnessEnvs(environment map[string]string) { + harnessEnvs := []string{"HARNESS_EXECUTION_ID", "HARNESS_DELEGATE_TASK_ID"} + for _, v := range harnessEnvs { + if val, ok := environment[v]; ok && val != "" { + os.Setenv(v, val) + } + } +} + // initializeDualLogHook adds a logrus hook that emits each lite-engine internal log entry // as flat JSON to stdout for OTel collection, when dual logging is enabled. func initializeDualLogHook(setupReq *api.SetupRequest) { @@ -307,12 +320,7 @@ func initializeDualLogHook(setupReq *api.SetupRequest) { taskID = setupReq.Envs["HARNESS_DELEGATE_TASK_ID"] } - planExecID := "" - if setupReq.Envs != nil { - if v, ok := setupReq.Envs["HARNESS_EXECUTION_ID"]; ok { - planExecID = v - } - } + planExecID := os.Getenv("HARNESS_EXECUTION_ID") meta := duallog.NewMetaConfig( ti.AccountID, ti.OrgID, ti.ProjectID, ti.PipelineID, diff --git a/pipeline/runtime/step_executor.go b/pipeline/runtime/step_executor.go index 3f26a553..bdf29699 100644 --- a/pipeline/runtime/step_executor.go +++ b/pipeline/runtime/step_executor.go @@ -502,6 +502,10 @@ func getLogStreamWriter(r *api.StartStepRequest) logstream.Writer { if pipelineState.GetLogConfig().DualLoggingEnabled { tiCfg := pipelineState.GetTIConfig() + taskID := r.StepStatus.TaskID + if taskID == "" { + taskID = os.Getenv("HARNESS_DELEGATE_TASK_ID") + } meta := &duallog.Meta{ AccountID: pipelineState.GetLogConfig().AccountID, OrgID: tiCfg.GetOrgID(), @@ -511,7 +515,7 @@ func getLogStreamWriter(r *api.StartStepRequest) logstream.Writer { PlanExecutionID: os.Getenv("HARNESS_EXECUTION_ID"), StageIdentifier: tiCfg.GetStageID(), StepIdentifier: r.Name, - TaskID: r.StepStatus.TaskID, + TaskID: taskID, } wc.SetDualLogConfig(meta, "EXECUTION_LOGS") } From 619029625e2212069be6730f10ad36329e820c4a Mon Sep 17 00:00:00 2001 From: Dhiraj Chhawchharia Date: Fri, 27 Mar 2026 12:01:27 -0700 Subject: [PATCH 13/14] feat:[CI-21635]: Dummy commit --- pipeline/runtime/step_executor.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pipeline/runtime/step_executor.go b/pipeline/runtime/step_executor.go index bdf29699..41410db7 100644 --- a/pipeline/runtime/step_executor.go +++ b/pipeline/runtime/step_executor.go @@ -359,6 +359,7 @@ func (e *StepExecutor) executeStep(r *api.StartStepRequest, wr logstream.Writer) state, err := e.executeStepDrone(r) return state, nil, nil, nil, nil, nil, "", err } + // First try to get TI Config from pipeline state, if empty then use the one from step request var tiConfig *tiCfg.Cfg state := pipeline.GetState() From 4dd2364143c260a3817c2fd0de759198f5261687 Mon Sep 17 00:00:00 2001 From: Dhiraj Chhawchharia Date: Fri, 27 Mar 2026 13:09:34 -0700 Subject: [PATCH 14/14] feat:[CI-21635]: Fixed lint issues --- handler/setup.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/handler/setup.go b/handler/setup.go index 8fdb99b8..8bd63f03 100644 --- a/handler/setup.go +++ b/handler/setup.go @@ -34,7 +34,7 @@ var ( ) const ( - OSWindows = "windows" + OSWindows = "windows" dualLoggingEnvVar = "HARNESS_LOG_STREAMING_STDOUT_ENABLED" )