Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,8 +218,9 @@ type (
// There is a limit of 5k lines for log-service's snapshot, so this parameter should NOT
// be used in cases where more than 5k lines of logs are written by the logger. Otherwise,
// the final logs blob may have missing logs.
SkipOpeningStream bool `json:"skip_opening_stream,omitempty"`
SkipClosingStream bool `json:"skip_closing_stream,omitempty"`
SkipOpeningStream bool `json:"skip_opening_stream,omitempty"`
SkipClosingStream bool `json:"skip_closing_stream,omitempty"`
DualLoggingEnabled bool `json:"dual_logging_enabled,omitempty"`
}

TIConfig struct {
Expand Down
80 changes: 80 additions & 0 deletions duallog/duallog.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
// Copyright 2024 Harness Inc. All rights reserved.
// Use of this source code is governed by the PolyForm Free Trial 1.0.0 license
// that can be found in the licenses directory at the root of this repository, also available at
// https://polyformproject.org/wp-content/uploads/2020/05/PolyForm-Free-Trial-1.0.0.txt.

package duallog

import (
"encoding/json"
"fmt"
"os"
"time"
)

// Meta holds metadata fields for dual-log JSON payloads.
type Meta struct {
AccountID string
OrgID string
ProjectID string
PipelineID string
RunSequence string
PlanExecutionID string
StageIdentifier string
StepIdentifier string
TaskID string
}

// NewMetaConfig constructs a Meta with the given pipeline context fields.
func NewMetaConfig(accountID, orgID, projectID, pipelineID, buildID, planExecID, stageID, stepID, taskID string) *Meta {
return &Meta{
AccountID: accountID,
OrgID: orgID,
ProjectID: projectID,
PipelineID: pipelineID,
RunSequence: buildID,
PlanExecutionID: planExecID,
StageIdentifier: stageID,
StepIdentifier: stepID,
TaskID: taskID,
}
}

// EmitLine writes a single flat-JSON log line to os.Stdout with level "INFO".
// It uses fmt.Fprintln (NOT logrus) to avoid re-ingestion loops.
func EmitLine(meta *Meta, message string, ts time.Time, logType string) {
EmitLineWithLevel(meta, message, ts, logType, "INFO")
}

// EmitLineWithLevel writes a single flat-JSON log line to os.Stdout with the specified level.
// It uses fmt.Fprintln (NOT logrus) to avoid re-ingestion loops.
func EmitLineWithLevel(meta *Meta, message string, ts time.Time, logType, level string) {
payload := map[string]interface{}{
"timestamp": ts.UTC().Format(time.RFC3339Nano),
"level": level,
"message": message,
"logType": logType,
"logAbstractions": map[string]string{
"accountId": meta.AccountID,
"orgId": meta.OrgID,
"projectId": meta.ProjectID,
"pipelineId": meta.PipelineID,
"runSequence": meta.RunSequence,
"planExecutionId": meta.PlanExecutionID,
"stageIdentifier": meta.StageIdentifier,
"stepIdentifier": meta.StepIdentifier,
},
}
if meta.TaskID != "" {
payload["logContext"] = map[string]string{
"taskId": meta.TaskID,
}
}
jsonBytes, err := json.Marshal(payload)
if err != nil {
fmt.Fprintf(os.Stderr, "duallog: EmitLineWithLevel failed to marshal JSON: %v (logType=%s, accountId=%s)\n",
err, logType, meta.AccountID)
return
}
fmt.Fprintln(os.Stdout, string(jsonBytes))
}
139 changes: 139 additions & 0 deletions duallog/duallog_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
// Copyright 2024 Harness Inc. All rights reserved.
// Use of this source code is governed by the PolyForm Free Trial 1.0.0 license
// that can be found in the licenses directory at the root of this repository, also available at
// https://polyformproject.org/wp-content/uploads/2020/05/PolyForm-Free-Trial-1.0.0.txt.

package duallog

import (
"bytes"
"encoding/json"
"io"
"os"
"testing"
"time"
)

func TestNewMetaConfig(t *testing.T) {
m := NewMetaConfig("acc1", "org1", "proj1", "pipe1", "42", "exec1", "stage1", "step1", "task1")
if m.AccountID != "acc1" || m.OrgID != "org1" || m.ProjectID != "proj1" ||
m.PipelineID != "pipe1" || m.RunSequence != "42" || m.PlanExecutionID != "exec1" ||
m.StageIdentifier != "stage1" || m.StepIdentifier != "step1" || m.TaskID != "task1" {
t.Errorf("NewMetaConfig did not populate fields correctly: %+v", m)
}
}

func TestEmitLine(t *testing.T) {
oldStdout := os.Stdout
r, w, _ := os.Pipe()
os.Stdout = w

meta := &Meta{
AccountID: "testAcct",
OrgID: "testOrg",
ProjectID: "testProj",
PipelineID: "testPipe",
RunSequence: "99",
PlanExecutionID: "exec123",
StageIdentifier: "stg1",
StepIdentifier: "stp1",
TaskID: "task-abc-DEL",
}
ts := time.Unix(1700000000, 500000000)
EmitLine(meta, "hello world", ts, "LITE_ENGINE_STEP_LOGS")

w.Close()
var buf bytes.Buffer
_, _ = io.Copy(&buf, r)
os.Stdout = oldStdout

line := buf.String()
if line == "" {
t.Fatal("EmitLine produced no output")
}

var parsed map[string]interface{}
if err := json.Unmarshal([]byte(line), &parsed); err != nil {
t.Fatalf("EmitLine output is not valid JSON: %v\nOutput: %s", err, line)
}

if parsed["message"] != "hello world" {
t.Errorf("expected message 'hello world', got %v", parsed["message"])
}
if parsed["level"] != "INFO" {
t.Errorf("expected level 'INFO', got %v", parsed["level"])
}
if parsed["logType"] != "LITE_ENGINE_STEP_LOGS" {
t.Errorf("expected logType 'LITE_ENGINE_STEP_LOGS', got %v", parsed["logType"])
}

abs, ok := parsed["logAbstractions"].(map[string]interface{})
if !ok {
t.Fatal("logAbstractions missing or not a map")
}
if abs["accountId"] != "testAcct" {
t.Errorf("expected accountId 'testAcct', got %v", abs["accountId"])
}
if abs["stepIdentifier"] != "stp1" {
t.Errorf("expected stepIdentifier 'stp1', got %v", abs["stepIdentifier"])
}

logCtx, ok := parsed["logContext"].(map[string]interface{})
if !ok {
t.Fatal("logContext missing or not a map")
}
if logCtx["taskId"] != "task-abc-DEL" {
t.Errorf("expected taskId 'task-abc-DEL', got %v", logCtx["taskId"])
}

tsStr, ok := parsed["timestamp"].(string)
if !ok || tsStr == "" {
t.Fatalf("expected non-empty timestamp string, got %v", parsed["timestamp"])
}
expectedTS := ts.UTC().Format(time.RFC3339Nano)
if tsStr != expectedTS {
t.Errorf("expected timestamp %q, got %q", expectedTS, tsStr)
}
}

func TestEmitLineNoTaskID(t *testing.T) {
oldStdout := os.Stdout
r, w, _ := os.Pipe()
os.Stdout = w

meta := &Meta{
AccountID: "testAcct",
}
EmitLine(meta, "msg", time.Now(), "TEST")

w.Close()
var buf bytes.Buffer
_, _ = io.Copy(&buf, r)
os.Stdout = oldStdout

var parsed map[string]interface{}
if err := json.Unmarshal(buf.Bytes(), &parsed); err != nil {
t.Fatalf("EmitLine output is not valid JSON: %v", err)
}
if _, exists := parsed["logContext"]; exists {
t.Error("logContext should not be present when taskId is empty")
}
}

func TestEmitLineNilMetaDoesNotPanic(t *testing.T) {
oldStdout := os.Stdout
r, w, _ := os.Pipe()
os.Stdout = w

meta := &Meta{}
EmitLine(meta, "msg", time.Now(), "TEST")

w.Close()
var buf bytes.Buffer
_, _ = io.Copy(&buf, r)
os.Stdout = oldStdout

if buf.Len() == 0 {
t.Error("expected output for empty meta")
}
}
50 changes: 48 additions & 2 deletions handler/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"time"

"github.com/harness/lite-engine/api"
"github.com/harness/lite-engine/duallog"
"github.com/harness/lite-engine/engine"
"github.com/harness/lite-engine/engine/spec"
"github.com/harness/lite-engine/livelog"
Expand All @@ -32,7 +33,10 @@ var (
harnessEnableDebugLogs = "HARNESS_ENABLE_DEBUG_LOGS"
)

const OSWindows = "windows"
const (
OSWindows = "windows"
dualLoggingEnvVar = "HARNESS_LOG_STREAMING_STDOUT_ENABLED"
)

func GetNetrc(os string) string {
switch os {
Expand Down Expand Up @@ -64,7 +68,7 @@ func GetNetrcFile(env map[string]string) (*spec.File, error) {
}

// HandleExecuteStep returns an http.HandlerFunc that executes a step
func HandleSetup(engine *engine.Engine) http.HandlerFunc {
func HandleSetup(engine *engine.Engine) http.HandlerFunc { //nolint:gocyclo
return func(w http.ResponseWriter, r *http.Request) {
st := time.Now()

Expand All @@ -81,6 +85,12 @@ func HandleSetup(engine *engine.Engine) http.HandlerFunc {
collector := osstats.New(context.Background(), statsInterval, logProcess)

setProxyEnvs(s.Envs)
setHarnessEnvs(s.Envs)

if val, ok := s.Envs[dualLoggingEnvVar]; ok && val == "true" {
s.LogConfig.DualLoggingEnabled = true
}

state := pipeline.GetState()
state.Set(s.Secrets, s.LogConfig, getTiCfg(&s.TIConfig, &s.MtlsConfig, s.Envs), s.MtlsConfig, collector)

Expand All @@ -92,6 +102,10 @@ func HandleSetup(engine *engine.Engine) http.HandlerFunc {
Warnln("api: failed to initialize lite-engine log streaming")
}

if s.LogConfig.DualLoggingEnabled {
initializeDualLogHook(&s)
}

// Initialize OS stats NDJSON streaming (file + upload) if MemoryMetricsLogKey is provided
if err := initializeOSStatsStreaming(&s, state); err != nil {
logger.FromRequest(r).
Expand Down Expand Up @@ -288,6 +302,38 @@ func initializeOSStatsStreaming(setupReq *api.SetupRequest, state *pipeline.Stat
return nil
}

func setHarnessEnvs(environment map[string]string) {
harnessEnvs := []string{"HARNESS_EXECUTION_ID", "HARNESS_DELEGATE_TASK_ID"}
for _, v := range harnessEnvs {
if val, ok := environment[v]; ok && val != "" {
os.Setenv(v, val)
}
}
}

// initializeDualLogHook adds a logrus hook that emits each lite-engine internal log entry
// as flat JSON to stdout for OTel collection, when dual logging is enabled.
func initializeDualLogHook(setupReq *api.SetupRequest) {
ti := &setupReq.TIConfig
taskID := ""
if setupReq.Envs != nil {
taskID = setupReq.Envs["HARNESS_DELEGATE_TASK_ID"]
}

planExecID := os.Getenv("HARNESS_EXECUTION_ID")

meta := duallog.NewMetaConfig(
ti.AccountID, ti.OrgID, ti.ProjectID, ti.PipelineID,
ti.BuildID, planExecID, ti.StageID, "lite-engine", taskID,
)
logrus.AddHook(logger.NewDualLogHook(meta, "EXECUTION_LOGS"))

logger.L.WithFields(logrus.Fields{
"accountId": ti.AccountID, "pipelineId": ti.PipelineID,
"stageId": ti.StageID, "taskId": taskID,
}).Infoln("api: initialized dual log hook for lite-engine internal logs")
}

// syncSystemClock forces chrony to step the system clock if there is significant drift.
// This fixes clock skew on ARM64 VMs after GCP hibernate resume, where the arch_sys_counter
// clock source doesn't auto-adjust (unlike x86's kvm-clock). Without this, chrony may
Expand Down
14 changes: 14 additions & 0 deletions livelog/livelog.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"sync"
"time"

"github.com/harness/lite-engine/duallog"
"github.com/harness/lite-engine/internal/safego"
"github.com/harness/lite-engine/logstream"
"github.com/harness/lite-engine/logstream/remote"
Expand Down Expand Up @@ -64,6 +65,9 @@ type Writer struct {
trimNewLineSuffix bool
lastFlushTime time.Time
ctx context.Context

dualLogMeta *duallog.Meta
dualLogType string
}

// New returns a new writer
Expand Down Expand Up @@ -99,6 +103,12 @@ func (b *Writer) SetInterval(interval time.Duration) {
b.interval = interval
}

// SetDualLogConfig enables dual logging to stdout in flat JSON format.
func (b *Writer) SetDualLogConfig(meta *duallog.Meta, logType string) {
b.dualLogMeta = meta
b.dualLogType = logType
}

// Write uploads the live log stream to the server.
func (b *Writer) Write(p []byte) (n int, err error) {
var res []byte
Expand Down Expand Up @@ -138,6 +148,10 @@ func (b *Writer) Write(p []byte) (n int, err error) {
ElaspedTime: int64(time.Since(b.now).Seconds()),
}

if b.dualLogMeta != nil {
duallog.EmitLine(b.dualLogMeta, line.Message, line.Timestamp, b.dualLogType)
}

jsonLine, _ := getLineBytes(line)

if b.printToStdout {
Expand Down
Loading