Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
286 changes: 286 additions & 0 deletions health.go
Original file line number Diff line number Diff line change
Expand Up @@ -602,6 +602,17 @@ func RunOpsHealthCheck(resp http.ResponseWriter, request *http.Request) {
//log.Printf("CACHEDATA: %s", cacheData)
err = json.Unmarshal(cacheData, &platformHealth)
if err == nil {
if project.Environment != "cloud" {
statsKey := fmt.Sprintf("opensearch-resource-stats-%s", orgId)
if cachedStats, statsErr := GetCache(ctx, statsKey); statsErr == nil {
var osStats OpensearchStats
if b, ok := cachedStats.([]byte); ok {
if jsonErr := json.Unmarshal(b, &osStats); jsonErr == nil {
platformHealth.OpensearchStats = osStats
}
}
}
}
//log.Printf("[DEBUG] Platform health returned: %#v", platformHealth)
marshalledData, err := json.Marshal(platformHealth)

Expand Down Expand Up @@ -635,6 +646,17 @@ func RunOpsHealthCheck(resp http.ResponseWriter, request *http.Request) {
health := healths[0]

if err == nil {
if project.Environment != "cloud" {
statsKey := fmt.Sprintf("opensearch-resource-stats-%s", orgId)
if cachedStats, statsErr := GetCache(ctx, statsKey); statsErr == nil {
var osStats OpensearchStats
if b, ok := cachedStats.([]byte); ok {
if jsonErr := json.Unmarshal(b, &osStats); jsonErr == nil {
health.OpensearchStats = osStats
}
}
}
}
platformData, err := json.Marshal(health)
if err != nil {
log.Printf("[ERROR] Failed marshalling platform health data: %s", err)
Expand Down Expand Up @@ -757,6 +779,20 @@ func RunOpsHealthCheck(resp http.ResponseWriter, request *http.Request) {
}()

platformHealth.OpensearchOps = <-opensearchHealthChannel

statsKey := fmt.Sprintf("opensearch-resource-stats-%s", orgId)
if cachedStats, cacheErr := GetCache(ctx, statsKey); cacheErr == nil {
var osStats OpensearchStats
if b, ok := cachedStats.([]byte); ok {
if jsonErr := json.Unmarshal(b, &osStats); jsonErr == nil {
platformHealth.OpensearchStats = osStats
} else {
log.Printf("[WARNING] Failed unmarshalling cached OpenSearch resource stats: %s", jsonErr)
}
}
} else {
log.Printf("[DEBUG] No cached OpenSearch resource stats found (key: %s): %s", statsKey, cacheErr)
}
}

datastoreHealthChannel := make(chan DatastoreHealth)
Expand Down Expand Up @@ -851,6 +887,7 @@ func RunOpsHealthCheck(resp http.ResponseWriter, request *http.Request) {
HealthCheck.Success = platformHealth.Success
HealthCheck.Updated = platformHealth.Updated
HealthCheck.Workflows = platformHealth.Workflows
HealthCheck.OpensearchStats = platformHealth.OpensearchStats

// Add to database
err = SetPlatformHealth(ctx, HealthCheck)
Expand Down Expand Up @@ -3926,6 +3963,255 @@ func RunOpensearchOps(ctx context.Context) (*opensearchapi.ClusterHealthResp, er
return resp, nil
}

// resolveHealthOrgId picks the organization ID that health-check data is
// stored under. A non-empty SHUFFLE_OPS_DASHBOARD_ORG environment variable
// takes precedence; otherwise the ID of the org returned by GetFirstOrg is
// used. An error is returned only when that lookup fails.
func resolveHealthOrgId(ctx context.Context) (string, error) {
	if envOrgId := os.Getenv("SHUFFLE_OPS_DASHBOARD_ORG"); envOrgId != "" {
		return envOrgId, nil
	}

	// No override configured: fall back to the first org.
	firstOrg, lookupErr := GetFirstOrg(ctx)
	if lookupErr == nil {
		return firstOrg.Id, nil
	}

	return "", fmt.Errorf("failed getting first org: %w", lookupErr)
}

// ScheduleOpensearchResourceHealth performs a single OpenSearch resource
// health check outside of any HTTP request, using a background context.
// It does nothing when SHUFFLE_HEALTHCHECK_DISABLED is "true" or when the
// project runs in the "cloud" environment. Failures are logged, not returned.
func ScheduleOpensearchResourceHealth() {
	healthcheckDisabled := os.Getenv("SHUFFLE_HEALTHCHECK_DISABLED") == "true"
	if healthcheckDisabled || project.Environment == "cloud" {
		return
	}

	ctx := context.Background()

	orgId, resolveErr := resolveHealthOrgId(ctx)
	if resolveErr != nil {
		log.Printf("[ERROR] ScheduleOpensearchResourceHealth: failed resolving orgId: %s", resolveErr)
		return
	}

	checkErr := CheckOpensearchResourceHealth(ctx, orgId)
	if checkErr != nil {
		log.Printf("[WARNING] ScheduleOpensearchResourceHealth: OpenSearch resource health check failed: %s", checkErr)
	}
}

// RunOpensearchResourceHealthCheck is the HTTP handler that runs an on-demand
// OpenSearch resource health check for the health org.
//
// Responses (all bodies are JSON objects with a "success" field):
//   - 204 when SHUFFLE_HEALTHCHECK_DISABLED is "true"
//   - 400 when the project runs in the "cloud" environment
//   - 500 when the health org ID cannot be resolved
//   - 503 when CheckOpensearchResourceHealth returns an error
//   - 200 with {"success": true} otherwise
func RunOpensearchResourceHealthCheck(resp http.ResponseWriter, request *http.Request) {
	cors := HandleCors(resp, request)
	if cors {
		return
	}

	// Every body written below is JSON, including the error bodies.
	resp.Header().Set("Content-Type", "application/json")

	// failureBody builds the error payload. The reason is JSON-encoded rather
	// than spliced in with "%s": error text may contain quotes or newlines
	// (the 503 path can include the raw OpenSearch response body), which
	// would otherwise yield invalid JSON.
	failureBody := func(reason string) []byte {
		encodedReason, marshalErr := json.Marshal(reason)
		if marshalErr != nil {
			encodedReason = []byte(`"internal error"`)
		}
		return []byte(fmt.Sprintf(`{"success": false, "reason": %s}`, encodedReason))
	}

	if os.Getenv("SHUFFLE_HEALTHCHECK_DISABLED") == "true" {
		// NOTE(review): net/http does not send a body with a 204 status
		// (Write reports http.ErrBodyNotAllowed), so clients only see the
		// status code. Status kept as-is for backward compatibility.
		resp.WriteHeader(204)
		resp.Write([]byte(`{"success": false, "reason": "Healthcheck disabled."}`))
		return
	}

	if project.Environment == "cloud" {
		resp.WriteHeader(400)
		resp.Write([]byte(`{"success": false, "reason": "Not available in cloud environment."}`))
		return
	}

	ctx := GetContext(request)
	orgId, err := resolveHealthOrgId(ctx)
	if err != nil {
		log.Printf("[ERROR] RunOpensearchResourceHealthCheck: failed resolving orgId: %s", err)
		resp.WriteHeader(500)
		resp.Write(failureBody(err.Error()))
		return
	}

	if err := CheckOpensearchResourceHealth(ctx, orgId); err != nil {
		log.Printf("[WARNING] RunOpensearchResourceHealthCheck: %s", err)
		resp.WriteHeader(503)
		resp.Write(failureBody(err.Error()))
		return
	}

	resp.WriteHeader(200)
	resp.Write([]byte(`{"success": true}`))
}

// CheckOpensearchResourceHealth queries the OpenSearch `_nodes/stats` API
// (fs, os, jvm, thread_pool), raises org notifications for nodes with high
// disk or JVM heap usage, and caches the worst-case figures under
// "opensearch-resource-stats-<orgId>".
//
// Alerting rules:
//   - Disk: a node at or above 90% used triggers a notification, suppressed
//     per node via a cache marker afterwards.
//   - Heap: a node at or above the heap threshold (default 80, overridable
//     with SHUFFLE_HEAP_ALERT_THRESHOLD in the range 1-99) triggers a
//     notification only once it has stayed above the threshold for at least
//     10 minutes across successive calls.
//
// It returns nil without doing anything in the "cloud" environment. An error
// is returned when the stats request cannot be built, sent, read, or parsed,
// or when OpenSearch answers with a non-200 status. Notification and cache
// failures are logged or ignored and never fail the check.
func CheckOpensearchResourceHealth(ctx context.Context, orgId string) error {
	if project.Environment == "cloud" {
		return nil
	}

	opensearchUrl := os.Getenv("SHUFFLE_OPENSEARCH_URL")
	if len(opensearchUrl) == 0 {
		opensearchUrl = "https://shuffle-opensearch:9200"
	}

	apiUrl := fmt.Sprintf("%s/_nodes/stats/fs,os,jvm,thread_pool", opensearchUrl)
	httpReq, err := http.NewRequest("GET", apiUrl, nil)
	if err != nil {
		return fmt.Errorf("failed creating opensearch nodes stats request: %w", err)
	}

	// Send the request through the configured OpenSearch client transport.
	foundClient := GetEsConfig(false)
	res, err := foundClient.Client.Transport.Perform(httpReq)
	if err != nil {
		return fmt.Errorf("failed querying opensearch nodes stats: %w", err)
	}
	defer res.Body.Close()

	body, err := ioutil.ReadAll(res.Body)
	if err != nil {
		return fmt.Errorf("failed reading opensearch nodes stats response: %w", err)
	}

	if res.StatusCode != 200 {
		return fmt.Errorf("opensearch nodes stats returned status %d: %s", res.StatusCode, string(body))
	}

	var nodesStats OpenSearchNodesStatsResp
	if err := json.Unmarshal(body, &nodesStats); err != nil {
		return fmt.Errorf("failed parsing opensearch nodes stats: %w", err)
	}

	// Track worst-case stats across all nodes so they can be cached for /api/v1/health.
	var (
		maxDiskPercent  float64
		maxDiskNodeName string
		maxHeapPercent  int
		maxHeapNodeName string
	)

	// The threshold does not depend on the node, so resolve it once.
	shuffleHeapThreshold := opensearchHeapAlertThreshold()

	for nodeId, node := range nodesStats.Nodes {
		// Disk check. Nodes reporting a zero total are skipped to avoid
		// dividing by zero.
		total := node.FS.Total.TotalInBytes
		available := node.FS.Total.AvailableInBytes
		if total > 0 {
			usedPercent := float64(total-available) / float64(total) * 100

			// Track the node with the highest disk usage.
			if usedPercent > maxDiskPercent {
				maxDiskPercent = usedPercent
				maxDiskNodeName = node.Name
			}

			if debug {
				log.Printf("[DEBUG] Disk usage for OpenSearch node %s (%s): %.1f%% (available: %d bytes, total: %d bytes)", nodeId, node.Name, usedPercent, available, total)
			}

			if usedPercent >= 90.0 {
				log.Printf("[WARNING] OpenSearch node %s (%s) disk usage is %.1f%%", nodeId, node.Name, usedPercent)
				notifyOpensearchDiskUsage(ctx, orgId, nodeId, node.Name, usedPercent)
			}
		}

		// JVM heap monitoring check.
		heapPercent := node.JVM.Mem.HeapUsedPercent
		if heapPercent > maxHeapPercent {
			maxHeapPercent = heapPercent
			maxHeapNodeName = node.Name
		}

		if debug {
			log.Printf("[DEBUG] Opensearch JVM heap percentage is: %v and threshold is: %v", heapPercent, shuffleHeapThreshold)
		}

		if heapPercent >= shuffleHeapThreshold {
			log.Printf("[WARNING] OpenSearch node %s (%s) JVM heap usage is %d%%", nodeId, node.Name, heapPercent)
			trackOpensearchHeapUsage(ctx, orgId, nodeId, node.Name, heapPercent, shuffleHeapThreshold)
		} else {
			// Heap is healthy – clear first-seen marker so the clock resets.
			heapFirstSeenKey := fmt.Sprintf("opensearch-heap-firstseen-%s-%s", orgId, nodeId)
			DeleteCache(ctx, heapFirstSeenKey)
		}
	}

	resourceStats := OpensearchStats{
		DiskPercent:   maxDiskPercent,
		HeapPercent:   maxHeapPercent,
		LastChecked:   time.Now().Unix(),
		NodeCount:     len(nodesStats.Nodes),
		WorstDiskNode: maxDiskNodeName,
		WorstHeapNode: maxHeapNodeName,
	}

	statsData, marshalErr := json.Marshal(resourceStats)
	if marshalErr == nil {
		statsKey := fmt.Sprintf("opensearch-resource-stats-%s", orgId)
		SetCache(ctx, statsKey, statsData, 30)
	} else {
		log.Printf("[WARNING] Failed marshalling OpenSearch resource stats for cache: %s", marshalErr)
	}

	return nil
}

// opensearchHeapAlertThreshold returns the JVM heap percentage at which heap
// alerts start: 80 by default, or the value of SHUFFLE_HEAP_ALERT_THRESHOLD
// when it parses as an integer strictly between 0 and 100.
func opensearchHeapAlertThreshold() int {
	threshold := 80
	if envThreshold := os.Getenv("SHUFFLE_HEAP_ALERT_THRESHOLD"); envThreshold != "" {
		if t, err := strconv.Atoi(envThreshold); err == nil && t > 0 && t < 100 {
			threshold = t
		}
	}
	return threshold
}

// notifyOpensearchDiskUsage creates a HIGH org notification about critical
// disk usage on one node, unless a suppression marker for that node is
// already cached.
func notifyOpensearchDiskUsage(ctx context.Context, orgId, nodeId, nodeName string, usedPercent float64) {
	diskAlertKey := fmt.Sprintf("opensearch-disk-alert-%s-%s", orgId, nodeId)
	if _, cacheErr := GetCache(ctx, diskAlertKey); cacheErr == nil {
		// Marker still cached: an alert was already sent recently.
		return
	}

	notifyErr := CreateOrgNotification(
		ctx,
		fmt.Sprintf("OpenSearch disk usage critical on node %s", nodeName),
		fmt.Sprintf("OpenSearch node '%s' disk usage is at %.1f%% (threshold: 90%%). Consider cleaning up old indices or expanding disk capacity.", nodeName, usedPercent),
		"/admin?admin_tab=notifications",
		orgId,
		true,
		"HIGH",
		"opensearch",
	)
	if notifyErr != nil {
		log.Printf("[WARNING] Failed creating disk notification for OpenSearch node %s: %s", nodeId, notifyErr)
	}

	// Suppress further disk alerts for this node. The TTL is presumably in
	// minutes (60 => the intended one hour) – confirm against SetCache.
	SetCache(ctx, diskAlertKey, []byte("1"), 60)
}

// trackOpensearchHeapUsage handles one above-threshold heap observation for a
// node. The first observation only records a timestamp; a notification is
// created once a later observation shows the heap has stayed above the
// threshold for at least 10 minutes, after which an alert marker suppresses
// repeats.
func trackOpensearchHeapUsage(ctx context.Context, orgId, nodeId, nodeName string, heapPercent, threshold int) {
	// Must match the "10+ minutes" promised in the notification text.
	const sustainedSeconds = 10 * 60

	heapFirstSeenKey := fmt.Sprintf("opensearch-heap-firstseen-%s-%s", orgId, nodeId)
	heapAlertKey := fmt.Sprintf("opensearch-heap-alert-%s-%s", orgId, nodeId)

	if _, alertCacheErr := GetCache(ctx, heapAlertKey); alertCacheErr == nil {
		// An alert is already active for this node.
		return
	}

	firstSeenRaw, firstSeenErr := GetCache(ctx, heapFirstSeenKey)
	if firstSeenErr != nil {
		// First time seeing it – record timestamp and wait.
		SetCache(ctx, heapFirstSeenKey, []byte(strconv.FormatInt(time.Now().Unix(), 10)), 30)
		return
	}

	// Already seen before – check how long. An unparsable marker leaves the
	// timestamp at zero, which is treated as "not sustained yet".
	var firstSeenUnix int64
	if b, ok := firstSeenRaw.([]byte); ok {
		firstSeenUnix, _ = strconv.ParseInt(string(b), 10, 64)
	}
	if firstSeenUnix <= 0 || time.Now().Unix()-firstSeenUnix < sustainedSeconds {
		return
	}

	// Sustained for ≥10 minutes – notify.
	notifyErr := CreateOrgNotification(
		ctx,
		fmt.Sprintf("OpenSearch JVM heap usage critical on node %s", nodeName),
		fmt.Sprintf("OpenSearch node '%s' JVM heap usage has been at %d%% or above for 10+ minutes (threshold: %d%%). Review search load, shard count, and GC activity. Consider increasing heap allocation or scaling the cluster. If the issue persists, please contact support@shuffler.io for assistance.", nodeName, heapPercent, threshold),
		"/admin?admin_tab=notifications",
		orgId,
		true,
		"HIGH",
		"opensearch",
	)
	if notifyErr != nil {
		log.Printf("[WARNING] Failed creating heap notification for OpenSearch node %s: %s", nodeId, notifyErr)
	}

	SetCache(ctx, heapAlertKey, []byte("1"), 60)
	DeleteCache(ctx, heapFirstSeenKey)
}

// Send in deleteall=true to delete ALL executions for the environment ID
func HandleStopExecutions(resp http.ResponseWriter, request *http.Request) {
cors := HandleCors(resp, request)
Expand Down
Loading
Loading