Skip to content

Commit bdf8023

Browse files
committed
[BugFix] Fix Regex OOM when there are 10+ regex clauses
Signed-off-by: Jialiang Liang <jiallian@amazon.com>
1 parent a8069d1 commit bdf8023

2 files changed

Lines changed: 118 additions & 3 deletions

File tree

core/src/main/java/org/opensearch/sql/calcite/CalcitePlanContext.java

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import static org.opensearch.sql.calcite.utils.OpenSearchTypeFactory.TYPE_FACTORY;
99

1010
import java.sql.Connection;
11+
import java.util.ArrayList;
1112
import java.util.HashMap;
1213
import java.util.List;
1314
import java.util.Map;
@@ -61,6 +62,12 @@ public class CalcitePlanContext {
6162

6263
@Getter public Map<String, RexLambdaRef> rexLambdaRefMap;
6364

65+
/** Accumulated filter conditions to prevent deep Filter node chains */
66+
private final List<RexNode> pendingFilterConditions = new ArrayList<>();
67+
68+
/** Flag to indicate if filter accumulation mode is active */
69+
@Getter @Setter private boolean filterAccumulationEnabled = false;
70+
6471
private CalcitePlanContext(FrameworkConfig config, SysLimit sysLimit, QueryType queryType) {
6572
this.config = config;
6673
this.sysLimit = sysLimit;
@@ -134,4 +141,52 @@ public static boolean isLegacyPreferred() {
134141
public void putRexLambdaRefMap(Map<String, RexLambdaRef> candidateMap) {
135142
this.rexLambdaRefMap.putAll(candidateMap);
136143
}
144+
145+
/**
146+
* Adds a filter condition to the accumulation list instead of creating immediate Filter RelNode.
147+
* This prevents deep Filter node chains that cause memory explosion.
148+
*/
149+
public void addFilterCondition(RexNode condition) {
150+
pendingFilterConditions.add(condition);
151+
}
152+
153+
/**
154+
* Applies all accumulated filter conditions as a single Filter RelNode with AND operations. This
155+
* creates a single Filter node instead of a deep chain of Filter nodes.
156+
*/
157+
public void flushFilterConditions() {
158+
if (pendingFilterConditions.isEmpty()) {
159+
return;
160+
}
161+
162+
if (pendingFilterConditions.size() == 1) {
163+
relBuilder.filter(pendingFilterConditions.get(0));
164+
} else {
165+
// Combine all filter conditions with AND
166+
RexNode combinedCondition = relBuilder.and(pendingFilterConditions);
167+
relBuilder.filter(combinedCondition);
168+
}
169+
pendingFilterConditions.clear();
170+
}
171+
172+
/**
173+
* Enables filter accumulation mode to prevent deep Filter node chains. Should be called before
174+
* processing multiple filter operations.
175+
*/
176+
public void enableFilterAccumulation() {
177+
filterAccumulationEnabled = true;
178+
}
179+
180+
/**
181+
* Disables filter accumulation mode. Should be called after processing multiple filter
182+
* operations.
183+
*/
184+
public void disableFilterAccumulation() {
185+
filterAccumulationEnabled = false;
186+
}
187+
188+
/** Returns true if there are pending filter conditions that need to be flushed. */
189+
public boolean hasPendingFilterConditions() {
190+
return !pendingFilterConditions.isEmpty();
191+
}
137192
}

core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java

Lines changed: 63 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,20 @@ public CalciteRelNodeVisitor(DataSourceService dataSourceService) {
177177
}
178178

179179
public RelNode analyze(UnresolvedPlan unresolved, CalcitePlanContext context) {
180-
return unresolved.accept(this, context);
180+
// Enable filter accumulation if this plan contains multiple filtering operations
181+
// that could create deep Filter RelNode chains
182+
if (countFilteringOperations(unresolved) >= 2) {
183+
context.enableFilterAccumulation();
184+
try {
185+
unresolved.accept(this, context);
186+
context.flushFilterConditions(); // Flush accumulated conditions before returning
187+
return context.relBuilder.peek(); // Get the result after flushing
188+
} finally {
189+
context.disableFilterAccumulation();
190+
}
191+
} else {
192+
return unresolved.accept(this, context);
193+
}
181194
}
182195

183196
@Override
@@ -241,7 +254,12 @@ public RelNode visitFilter(Filter node, CalcitePlanContext context) {
241254
context.relBuilder.filter(ImmutableList.of(v.get().id), condition);
242255
context.popCorrelVar();
243256
} else {
244-
context.relBuilder.filter(condition);
257+
// Use filter accumulation to prevent deep Filter node chains
258+
if (context.isFilterAccumulationEnabled()) {
259+
context.addFilterCondition(condition);
260+
} else {
261+
context.relBuilder.filter(condition);
262+
}
245263
}
246264
return context.relBuilder.peek();
247265
}
@@ -290,7 +308,12 @@ public RelNode visitRegex(Regex node, CalcitePlanContext context) {
290308
regexCondition = context.rexBuilder.makeCall(SqlStdOperatorTable.NOT, regexCondition);
291309
}
292310

293-
context.relBuilder.filter(regexCondition);
311+
// Use filter accumulation to prevent deep Filter node chains
312+
if (context.isFilterAccumulationEnabled()) {
313+
context.addFilterCondition(regexCondition);
314+
} else {
315+
context.relBuilder.filter(regexCondition);
316+
}
294317
return context.relBuilder.peek();
295318
}
296319

@@ -381,6 +404,11 @@ private boolean containsSubqueryExpression(Node expr) {
381404
public RelNode visitProject(Project node, CalcitePlanContext context) {
382405
visitChildren(node, context);
383406

407+
// Flush accumulated filter conditions before schema-changing operations
408+
if (context.isFilterAccumulationEnabled() && context.hasPendingFilterConditions()) {
409+
context.flushFilterConditions();
410+
}
411+
384412
if (isSingleAllFieldsProject(node)) {
385413
return handleAllFieldsProject(node, context);
386414
}
@@ -3237,4 +3265,36 @@ private RexNode createOptimizedTransliteration(
32373265
throw new RuntimeException("Failed to optimize sed expression: " + sedExpression, e);
32383266
}
32393267
}
3268+
3269+
/**
3270+
* Counts the number of filtering operations in an UnresolvedPlan tree that would create Filter
3271+
* RelNodes. This is used to detect queries with multiple regex/filter operations that could cause
3272+
* deep Filter RelNode chains and memory exhaustion.
3273+
*
3274+
* @param plan the UnresolvedPlan to analyze
3275+
* @return the count of filtering operations found
3276+
*/
3277+
private int countFilteringOperations(UnresolvedPlan plan) {
3278+
if (plan == null) {
3279+
return 0;
3280+
}
3281+
3282+
int count = 0;
3283+
3284+
// Count this node if it's a filtering operation
3285+
if (plan instanceof Regex || plan instanceof Filter) {
3286+
count = 1;
3287+
}
3288+
3289+
// Recursively count filtering operations in children
3290+
if (plan.getChild() != null) {
3291+
for (Node child : plan.getChild()) {
3292+
if (child instanceof UnresolvedPlan) {
3293+
count += countFilteringOperations((UnresolvedPlan) child);
3294+
}
3295+
}
3296+
}
3297+
3298+
return count;
3299+
}
32403300
}

0 commit comments

Comments
 (0)