diff --git a/db-connector.go b/db-connector.go index bbf4d763..2c6876be 100755 --- a/db-connector.go +++ b/db-connector.go @@ -11477,8 +11477,59 @@ func SetTriggerAuth(ctx context.Context, trigger TriggerAuth) error { func DeleteKeys(ctx context.Context, entity string, value []string) error { // Non indexed User data if project.DbType == "opensearch" { + if len(value) == 0 { + return nil + } + + indexName := strings.ToLower(GetESIndexPrefix(entity)) + var buf bytes.Buffer for _, item := range value { - DeleteKey(ctx, entity, item) + if len(item) == 0 { + log.Printf("[WARNING] Skipping delete from %s due to empty id", entity) + continue + } + + meta := map[string]map[string]string { + `delete`: { + "_index": indexName, + "_id": item, + }, + } + + metaLine, err := json.Marshal(meta) + if err != nil { + log.Printf("[WARNING] Failed marshalling delete meta for %s: %s", entity, err) + continue + } + + buf.Write(metaLine) + buf.WriteByte('\n') + } + + if buf.Len() == 0 { + return nil + } + + resp, err := project.Es.Bulk(ctx, opensearchapi.BulkReq{ + Body: bytes.NewBuffer(buf.Bytes()), + Index: indexName, + }) + if err != nil { + log.Printf("[ERROR] Failed sending bulk delete request to Opensearch: %s", err) + return err + } + + res := resp.Inspect().Response + defer res.Body.Close() + + if res.IsError() { + body, readErr := ioutil.ReadAll(res.Body) + if readErr != nil { + log.Printf("[WARNING] Error reading bulk delete response body: %s", readErr) + return readErr + } + + return errors.New(fmt.Sprintf("Bulk delete failed for %s: %s", entity, string(body))) } } else { keys := []*datastore.Key{}