From 348e45388c689a48feb292062e29df7f16db1b07 Mon Sep 17 00:00:00 2001 From: Lalit Deore Date: Thu, 26 Feb 2026 19:13:07 +0530 Subject: [PATCH 1/3] [Feature] add opensearch health monitoring system --- health.go | 286 +++++++++++++++++++++++++++++++++++++++++++++++++++++ structs.go | 115 ++++++++++++++++++--- 2 files changed, 390 insertions(+), 11 deletions(-) diff --git a/health.go b/health.go index 2e05fa05..07d8bc5d 100644 --- a/health.go +++ b/health.go @@ -602,6 +602,17 @@ func RunOpsHealthCheck(resp http.ResponseWriter, request *http.Request) { //log.Printf("CACHEDATA: %s", cacheData) err = json.Unmarshal(cacheData, &platformHealth) if err == nil { + if project.Environment != "cloud" { + statsKey := fmt.Sprintf("opensearch-resource-stats-%s", orgId) + if cachedStats, statsErr := GetCache(ctx, statsKey); statsErr == nil { + var osStats OpensearchStats + if b, ok := cachedStats.([]byte); ok { + if jsonErr := json.Unmarshal(b, &osStats); jsonErr == nil { + platformHealth.OpensearchStats = osStats + } + } + } + } //log.Printf("[DEBUG] Platform health returned: %#v", platformHealth) marshalledData, err := json.Marshal(platformHealth) @@ -635,6 +646,17 @@ func RunOpsHealthCheck(resp http.ResponseWriter, request *http.Request) { health := healths[0] if err == nil { + if project.Environment != "cloud" { + statsKey := fmt.Sprintf("opensearch-resource-stats-%s", orgId) + if cachedStats, statsErr := GetCache(ctx, statsKey); statsErr == nil { + var osStats OpensearchStats + if b, ok := cachedStats.([]byte); ok { + if jsonErr := json.Unmarshal(b, &osStats); jsonErr == nil { + health.OpensearchStats = osStats + } + } + } + } platformData, err := json.Marshal(health) if err != nil { log.Printf("[ERROR] Failed marshalling platform health data: %s", err) @@ -757,6 +779,20 @@ func RunOpsHealthCheck(resp http.ResponseWriter, request *http.Request) { }() platformHealth.OpensearchOps = <-opensearchHealthChannel + + statsKey := fmt.Sprintf("opensearch-resource-stats-%s", orgId) + if cachedStats, cacheErr := GetCache(ctx, statsKey); cacheErr == nil { + var osStats OpensearchStats + if b, ok := cachedStats.([]byte); ok { + if jsonErr := json.Unmarshal(b, &osStats); jsonErr == nil { + platformHealth.OpensearchStats = osStats + } else { + log.Printf("[WARNING] Failed unmarshalling cached OpenSearch resource stats: %s", jsonErr) + } + } + } else { + log.Printf("[DEBUG] No cached OpenSearch resource stats found (key: %s): %s", statsKey, cacheErr) + } } datastoreHealthChannel := make(chan DatastoreHealth) @@ -851,6 +887,7 @@ func RunOpsHealthCheck(resp http.ResponseWriter, request *http.Request) { HealthCheck.Success = platformHealth.Success HealthCheck.Updated = platformHealth.Updated HealthCheck.Workflows = platformHealth.Workflows + HealthCheck.OpensearchStats = platformHealth.OpensearchStats // Add to database err = SetPlatformHealth(ctx, HealthCheck) @@ -3926,6 +3963,255 @@ func RunOpensearchOps(ctx context.Context) (*opensearchapi.ClusterHealthResp, er return resp, nil } +func resolveHealthOrgId(ctx context.Context) (string, error) { + orgId := os.Getenv("SHUFFLE_OPS_DASHBOARD_ORG") + if len(orgId) > 0 { + return orgId, nil + } + + org, err := GetFirstOrg(ctx) + if err != nil { + return "", fmt.Errorf("failed getting first org: %w", err) + } + + return org.Id, nil +} + +func ScheduleOpensearchResourceHealth() { + if os.Getenv("SHUFFLE_HEALTHCHECK_DISABLED") == "true" { + return + } + + if project.Environment == "cloud" { + return + } + + ctx := context.Background() + orgId, err := resolveHealthOrgId(ctx) + if err != nil { + log.Printf("[ERROR] ScheduleOpensearchResourceHealth: failed resolving orgId: %s", err) + return + } + + if err := CheckOpensearchResourceHealth(ctx, orgId); err != nil { + log.Printf("[WARNING] ScheduleOpensearchResourceHealth: OpenSearch resource health check failed: %s", err) + } +} + +func RunOpensearchResourceHealthCheck(resp http.ResponseWriter, request *http.Request) { + cors := HandleCors(resp, request) + if cors { + return + } + + if os.Getenv("SHUFFLE_HEALTHCHECK_DISABLED") == "true" { + resp.WriteHeader(204) + resp.Write([]byte(`{"success": false, "reason": "Healthcheck disabled."}`)) + return + } + + if project.Environment == "cloud" { + resp.WriteHeader(400) + resp.Write([]byte(`{"success": false, "reason": "Not available in cloud environment."}`)) + return + } + + ctx := GetContext(request) + orgId, err := resolveHealthOrgId(ctx) + if err != nil { + log.Printf("[ERROR] RunOpensearchResourceHealthCheck: failed resolving orgId: %s", err) + resp.WriteHeader(500) + resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "%s"}`, err.Error()))) + return + } + + if err := CheckOpensearchResourceHealth(ctx, orgId); err != nil { + log.Printf("[WARNING] RunOpensearchResourceHealthCheck: %s", err) + resp.WriteHeader(503) + resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "%s"}`, err.Error()))) + return + } + + resp.Header().Set("Content-Type", "application/json") + resp.WriteHeader(200) + resp.Write([]byte(`{"success": true}`)) +} + +func CheckOpensearchResourceHealth(ctx context.Context, orgId string) error { + if project.Environment == "cloud" { + return nil + } + + opensearchUrl := os.Getenv("SHUFFLE_OPENSEARCH_URL") + if len(opensearchUrl) == 0 { + opensearchUrl = "https://shuffle-opensearch:9200" + } + + apiUrl := fmt.Sprintf("%s/_nodes/stats/fs,os,jvm,thread_pool", opensearchUrl) + httpReq, err := http.NewRequest("GET", apiUrl, nil) + if err != nil { + return fmt.Errorf("failed creating opensearch nodes stats request: %s", err) + } + + foundClient := GetEsConfig(false) + res, err := foundClient.Client.Transport.Perform(httpReq) + if err != nil { + return fmt.Errorf("failed querying opensearch nodes stats: %s", err) + } + defer res.Body.Close() + + body, err := ioutil.ReadAll(res.Body) + if err != nil { + return fmt.Errorf("failed reading opensearch nodes stats response: %s", err) + } + + if res.StatusCode != 200 { + return fmt.Errorf("opensearch nodes stats returned status %d: %s", res.StatusCode, string(body)) + } + + var nodesStats OpenSearchNodesStatsResp + if err := json.Unmarshal(body, &nodesStats); err != nil { + return fmt.Errorf("failed parsing opensearch nodes stats: %s", err) + } + + // Track worst-case stats across all nodes so they can be cached for /api/v1/health. + var ( + maxDiskPercent float64 + maxDiskNodeName string + maxHeapPercent int + maxHeapNodeName string + ) + + for nodeId, node := range nodesStats.Nodes { + // Disk check + total := node.FS.Total.TotalInBytes + available := node.FS.Total.AvailableInBytes + if total > 0 { + usedPercent := float64(total-available) / float64(total) * 100 + + // Track the node with the highest disk usage. + if usedPercent > maxDiskPercent { + maxDiskPercent = usedPercent + maxDiskNodeName = node.Name + } + + if debug { + log.Printf("[DEBUG] Disk usage for OpenSearch node %s (%s): %.1f%% (available: %d bytes, total: %d bytes)", nodeId, node.Name, usedPercent, available, total) + } + + if usedPercent >= 90.0 { + log.Printf("[WARNING] OpenSearch node %s (%s) disk usage is %.1f%%", nodeId, node.Name, usedPercent) + + // Only alert once every 1 hour per node to avoid spam. + diskAlertKey := fmt.Sprintf("opensearch-disk-alert-%s-%s", orgId, nodeId) + _, cacheErr := GetCache(ctx, diskAlertKey) + if cacheErr != nil { + // Not cached – create notification and suppress for 1 hour. + notifyErr := CreateOrgNotification( + ctx, + fmt.Sprintf("OpenSearch disk usage critical on node %s", node.Name), + fmt.Sprintf("OpenSearch node '%s' disk usage is at %.1f%% (threshold: 90%%). Consider cleaning up old indices or expanding disk capacity.", node.Name, usedPercent), + "/admin?admin_tab=notifications", + orgId, + true, + "HIGH", + "opensearch", + ) + if notifyErr != nil { + log.Printf("[WARNING] Failed creating disk notification for OpenSearch node %s: %s", nodeId, notifyErr) + } + // Don't send alert for next one hour for this node. + SetCache(ctx, diskAlertKey, []byte("1"), 60) + } + } + } + + // JVM Heap monitoring check + heapPercent := node.JVM.Mem.HeapUsedPercent + if heapPercent > maxHeapPercent { + maxHeapPercent = heapPercent + maxHeapNodeName = node.Name + } + + shuffleHeapThreshold := 80 + if envThreshold := os.Getenv("SHUFFLE_HEAP_ALERT_THRESHOLD"); envThreshold != "" { + if t, err := strconv.Atoi(envThreshold); err == nil && t > 0 && t < 100 { + shuffleHeapThreshold = t + } + } + + if debug { + log.Printf("[DEBUG] Opensearch JVM heap percentage is: %v and threshold is: %v", heapPercent, shuffleHeapThreshold) + } + + if heapPercent >= shuffleHeapThreshold { + log.Printf("[WARNING] OpenSearch node %s (%s) JVM heap usage is %d%%", nodeId, node.Name, heapPercent) + + // Track the first time we saw heap above threshold. + heapFirstSeenKey := fmt.Sprintf("opensearch-heap-firstseen-%s-%s", orgId, nodeId) + heapAlertKey := fmt.Sprintf("opensearch-heap-alert-%s-%s", orgId, nodeId) + + _, alertCacheErr := GetCache(ctx, heapAlertKey) + if alertCacheErr != nil { + // No active alert yet – check if we have been above threshold for ≥10 min. + firstSeenRaw, firstSeenErr := GetCache(ctx, heapFirstSeenKey) + if firstSeenErr != nil { + // First time seeing it – record timestamp and wait. + SetCache(ctx, heapFirstSeenKey, []byte(fmt.Sprintf("%d", time.Now().Unix())), 30) + } else { + // Already seen before – check how long. + var firstSeenUnix int64 + if b, ok := firstSeenRaw.([]byte); ok { + firstSeenUnix, _ = strconv.ParseInt(string(b), 10, 64) + } + if firstSeenUnix > 0 && time.Now().Unix()-firstSeenUnix >= 60 { + // Sustained for ≥10 minutes – notify. + notifyErr := CreateOrgNotification( + ctx, + fmt.Sprintf("OpenSearch JVM heap usage critical on node %s", node.Name), + fmt.Sprintf("OpenSearch node '%s' JVM heap usage has been at %d%% or above for 10+ minutes (threshold: %d%%). Review search load, shard count, and GC activity. Consider increasing heap allocation or scaling the cluster. If the issue persists, please contact support@shuffler.io for assistance.", node.Name, heapPercent, shuffleHeapThreshold), + "/admin?admin_tab=notifications", + orgId, + true, + "HIGH", + "opensearch", + ) + if notifyErr != nil { + log.Printf("[WARNING] Failed creating heap notification for OpenSearch node %s: %s", nodeId, notifyErr) + } + + SetCache(ctx, heapAlertKey, []byte("1"), 60) + DeleteCache(ctx, heapFirstSeenKey) + } + } + } + } else { + // Heap is healthy – clear first-seen marker so the clock resets. + heapFirstSeenKey := fmt.Sprintf("opensearch-heap-firstseen-%s-%s", orgId, nodeId) + DeleteCache(ctx, heapFirstSeenKey) + } + } + + resourceStats := OpensearchStats{ + DiskPercent: maxDiskPercent, + HeapPercent: maxHeapPercent, + LastChecked: time.Now().Unix(), + NodeCount: len(nodesStats.Nodes), + WorstDiskNode: maxDiskNodeName, + WorstHeapNode: maxHeapNodeName, + } + + statsData, marshalErr := json.Marshal(resourceStats) + if marshalErr == nil { + statsKey := fmt.Sprintf("opensearch-resource-stats-%s", orgId) + SetCache(ctx, statsKey, statsData, 30) + } else { + log.Printf("[WARNING] Failed marshalling OpenSearch resource stats for cache: %s", marshalErr) + } + + return nil +} + // Send in deleteall=true to delete ALL executions for the environment ID func HandleStopExecutions(resp http.ResponseWriter, request *http.Request) { cors := HandleCors(resp, request) diff --git a/structs.go b/structs.go index 96206c75..612026ca 100755 --- a/structs.go +++ b/structs.go @@ -2241,6 +2241,88 @@ type StatisticsItem struct { OrgId string `json:"org_id" datastore:"org_id"` } +type OpenSearchNodeFSTotal struct { + TotalInBytes int64 `json:"total_in_bytes"` + FreeInBytes int64 `json:"free_in_bytes"` + AvailableInBytes int64 `json:"available_in_bytes"` +} + +type OpenSearchNodeFS struct { + Total OpenSearchNodeFSTotal `json:"total"` +} + +type OpenSearchLoadAverage struct { + OneM float64 `json:"1m"` + FiveM float64 `json:"5m"` + FifteenM float64 `json:"15m"` +} + +type OpenSearchNodeOSCPU struct { + Percent int `json:"percent"` + LoadAverage OpenSearchLoadAverage `json:"load_average"` +} + +type OpenSearchNodeOS struct { + CPU OpenSearchNodeOSCPU `json:"cpu"` +} + +type OpenSearchNodeJVMMemHeap struct { + UsedInBytes int64 `json:"heap_used_in_bytes"` + MaxInBytes int64 `json:"heap_max_in_bytes"` + UsedPercent int `json:"heap_used_percent"` +} + +type OpenSearchNodeJVMMem struct { + HeapUsedPercent int `json:"heap_used_percent"` + HeapUsedInBytes int64 `json:"heap_used_in_bytes"` + HeapMaxInBytes int64 `json:"heap_max_in_bytes"` +} + +type OpenSearchNodeJVM struct { + Mem OpenSearchNodeJVMMem `json:"mem"` +} + +type OpenSearchThreadPoolStat struct { + Threads int64 `json:"threads"` + Queue int64 `json:"queue"` + Active int64 `json:"active"` + Rejected int64 `json:"rejected"` + Largest int64 `json:"largest"` + Completed int64 `json:"completed"` +} + +type OpenSearchNodeThreadPool struct { + Search OpenSearchThreadPoolStat `json:"search"` + Write OpenSearchThreadPoolStat `json:"write"` +} + +type OpenSearchNodeStat struct { + Name string `json:"name"` + FS OpenSearchNodeFS `json:"fs"` + OS OpenSearchNodeOS `json:"os"` + JVM OpenSearchNodeJVM `json:"jvm"` + ThreadPool OpenSearchNodeThreadPool `json:"thread_pool"` +} + +type OpenSearchNodesStatsResp struct { + ClusterName string `json:"cluster_name"` + Nodes map[string]OpenSearchNodeStat `json:"nodes"` +} + +type OpenSearchNodeOSInfo struct { + AllocatedProcessors int `json:"allocated_processors"` + AvailableProcessors int `json:"available_processors"` +} + +type OpenSearchNodeInfo struct { + Name string `json:"name"` + OS OpenSearchNodeOSInfo `json:"os"` +} + +type OpenSearchNodesInfoResp struct { + Nodes map[string]OpenSearchNodeInfo `json:"nodes"` +} + type HealthCheckSearchWrapper struct { Took int `json:"took"` TimedOut bool `json:"timed_out"` @@ -4369,20 +4451,31 @@ type HealthCheck struct { Apps AppHealth `json:"apps"` Workflows WorkflowHealth `json:"workflows"` //PythonApps AppHealth `json:"python_apps"` - Datastore DatastoreHealth `json:"datastore"` - FileOps FileHealth `json:"fileops"` - OpensearchOps opensearchapi.ClusterHealthResp `json:"opensearch"` + Datastore DatastoreHealth `json:"datastore"` + FileOps FileHealth `json:"fileops"` + OpensearchOps opensearchapi.ClusterHealthResp `json:"opensearch"` + OpensearchStats OpensearchStats `json:"opensearch_stats"` } type HealthCheckDB struct { - Success bool `json:"success"` - Updated int64 `json:"updated"` - Workflows WorkflowHealth `json:"workflows"` - Opensearch opensearchapi.ClusterHealthResp `json:"opnsearch"` - Datastore DatastoreHealth `json:"datastore"` - FileOps FileHealth `json:"fileops"` - Apps AppHealth `json:"apps"` - ID string `json:"id"` + Success bool `json:"success"` + Updated int64 `json:"updated"` + Workflows WorkflowHealth `json:"workflows"` + Opensearch opensearchapi.ClusterHealthResp `json:"opnsearch"` + OpensearchStats OpensearchStats `json:"opensearch_stats"` + Datastore DatastoreHealth `json:"datastore"` + FileOps FileHealth `json:"fileops"` + Apps AppHealth `json:"apps"` + ID string `json:"id"` +} + +type OpensearchStats struct { + DiskPercent float64 `json:"disk_percent"` + HeapPercent int `json:"heap_percent"` + LastChecked int64 `json:"last_checked"` + NodeCount int `json:"node_count"` + WorstDiskNode string `json:"worst_disk_node"` + WorstHeapNode string `json:"worst_heap_node"` } type NodeData struct { From 290e494b4671b04b5a88f7d4bf8bd1bc5143df11 Mon Sep 17 00:00:00 2001 From: Lalit Deore Date: Thu, 5 Mar 2026 18:49:46 +0530 Subject: [PATCH 2/3] fix user not being deleted issue --- shared.go | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/shared.go b/shared.go index 09c0ef68..aa7bf4a4 100644 --- a/shared.go +++ b/shared.go @@ -11175,14 +11175,7 @@ func HandleDeleteUsersAccountPermanent(resp http.ResponseWriter, request *http.R return } - if !userInfo.SupportAccess { - log.Printf("[INFO] Unauthorized user (%s) attempted to delete an account. Must be a user or have support access.", userInfo.Username) - resp.WriteHeader(401) - resp.Write([]byte(`{"success": false, "reason": "Unauthorize User. Must be a regular user or have support access"}`)) - return - } - - if userInfo.Id != foundUser.Id { + if (userInfo.Id != foundUser.Id) && !userInfo.SupportAccess { log.Printf("[INFO] Unauthorized user (%s) attempted to delete an account. Must be a user or have support access.", userInfo.Username) resp.WriteHeader(401) resp.Write([]byte(`{"success": false, "reason": "Unauthorize User. Must be a regular user or have support access"}`)) From 783f01a563c59ea5960316e654347f9a84f34b18 Mon Sep 17 00:00:00 2001 From: Lalit Deore Date: Thu, 5 Mar 2026 18:53:12 +0530 Subject: [PATCH 3/3] Revert "fix user not being deleted issue" This reverts commit 290e494b4671b04b5a88f7d4bf8bd1bc5143df11. --- shared.go | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/shared.go b/shared.go index 638a4abf..34add51e 100644 --- a/shared.go +++ b/shared.go @@ -11182,7 +11182,14 @@ func HandleDeleteUsersAccountPermanent(resp http.ResponseWriter, request *http.R return } - if (userInfo.Id != foundUser.Id) && !userInfo.SupportAccess { + if !userInfo.SupportAccess { + log.Printf("[INFO] Unauthorized user (%s) attempted to delete an account. Must be a user or have support access.", userInfo.Username) + resp.WriteHeader(401) + resp.Write([]byte(`{"success": false, "reason": "Unauthorize User. Must be a regular user or have support access"}`)) + return + } + + if userInfo.Id != foundUser.Id { log.Printf("[INFO] Unauthorized user (%s) attempted to delete an account. Must be a user or have support access.", userInfo.Username) resp.WriteHeader(401) resp.Write([]byte(`{"success": false, "reason": "Unauthorize User. Must be a regular user or have support access"}`))