Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,11 @@ private RelBuilder scan(RelOptTable tableSchema, CalcitePlanContext context) {
public RelNode visitSearch(Search node, CalcitePlanContext context) {
// Visit the Relation child to get the scan
node.getChild().get(0).accept(this, context);

// Mark the scan as originating from a search command so that the optimizer
// can scope auto-highlight injection to search queries only.
PPLHintUtils.markSearchCommand(context.relBuilder);

// Create query_string function
Function queryStringFunc =
AstDSL.function(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,14 @@
package org.opensearch.sql.calcite.utils;

import com.google.common.base.Suppliers;
import java.util.List;
import java.util.function.Supplier;
import lombok.experimental.UtilityClass;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.Aggregate;
import org.apache.calcite.rel.core.TableScan;
import org.apache.calcite.rel.hint.HintStrategyTable;
import org.apache.calcite.rel.hint.Hintable;
import org.apache.calcite.rel.hint.RelHint;
import org.apache.calcite.rel.logical.LogicalAggregate;
import org.apache.calcite.tools.RelBuilder;
Expand All @@ -19,6 +23,7 @@ public class PPLHintUtils {
private static final String HINT_AGG_ARGUMENTS = "AGG_ARGS";
private static final String KEY_IGNORE_NULL_BUCKET = "ignoreNullBucket";
private static final String KEY_HAS_NESTED_AGG_CALL = "hasNestedAggCall";
public static final String HINT_SEARCH_COMMAND = "SEARCH_COMMAND";

private static final Supplier<HintStrategyTable> HINT_STRATEGY_TABLE =
Suppliers.memoize(
Expand All @@ -29,7 +34,7 @@ public class PPLHintUtils {
(hint, rel) -> {
return rel instanceof LogicalAggregate;
})
// add more here
.hintStrategy(HINT_SEARCH_COMMAND, (hint, rel) -> rel instanceof TableScan)
.build());

/**
Expand Down Expand Up @@ -81,4 +86,34 @@ public static boolean hasNestedAggCall(Aggregate aggregate) {
.getOrDefault(KEY_HAS_NESTED_AGG_CALL, "false")
.equals("true"));
}

/**
 * Mark the scan at the top of the relBuilder stack as originating from a PPL search command, so
 * that auto-highlight injection can be scoped to search command queries only.
 *
 * <p>Two stack shapes are handled:
 *
 * <ul>
 *   <li>A {@link TableScan} wrapped in a Project (alias field wrapping): the hint is attached to
 *       the scan and the Project is rebuilt on top of the hinted scan.
 *   <li>Any other {@link Hintable} on top (including a bare scan): the hint is attached to that
 *       node through {@link RelBuilder#hints}.
 * </ul>
 *
 * <p>The Project case must be tested first: Calcite's Project is itself {@link Hintable}, so a
 * plain {@code instanceof Hintable} check would attach the hint to the Project instead of the
 * scan, where {@link #isSearchCommand(TableScan)} would never find it.
 *
 * @param relBuilder builder whose top-of-stack node is the relation produced for the search
 *     command's source
 */
public static void markSearchCommand(RelBuilder relBuilder) {
  final RelHint hint = RelHint.builder(HINT_SEARCH_COMMAND).build();
  final RelNode top = relBuilder.peek();
  if (top instanceof org.apache.calcite.rel.core.Project proj
      && proj.getInput() instanceof TableScan scan) {
    // Scan is wrapped in a Project: hint the scan itself, then swap the Project on the stack
    // for a copy that reads from the hinted scan.
    RelNode hintedScan = scan.attachHints(List.of(hint));
    RelNode newProject = proj.copy(proj.getTraitSet(), List.of(hintedScan));
    relBuilder.build(); // pop old project
    relBuilder.push(newProject);
  } else if (top instanceof Hintable) {
    // Scan (or another hintable node) is directly on top of the stack
    relBuilder.hints(hint);
  }
  // Register the strategy table only if the cluster has none yet, so an existing table is kept.
  if (relBuilder.getCluster().getHintStrategies() == HintStrategyTable.EMPTY) {
    relBuilder.getCluster().setHintStrategies(HINT_STRATEGY_TABLE.get());
  }
}

/**
 * Tell whether a scan carries the {@code SEARCH_COMMAND} hint.
 *
 * @param scan the table scan whose hints are inspected
 * @return {@code true} if any hint on the scan is named {@link #HINT_SEARCH_COMMAND}
 */
public static boolean isSearchCommand(TableScan scan) {
  for (RelHint attached : scan.getHints()) {
    if (attached.hintName.equals(HINT_SEARCH_COMMAND)) {
      return true;
    }
  }
  return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@
/** Highlight Expression. */
@Getter
public class HighlightExpression extends FunctionExpression {
/** The field name used to store highlight data on ExprTupleValue rows. */
public static final String HIGHLIGHT_FIELD = "_highlight";

private final Expression highlightField;
private final ExprType type;

Expand All @@ -46,7 +49,7 @@ public HighlightExpression(Expression highlightField) {
*/
@Override
public ExprValue valueOf(Environment<Expression, ExprValue> valueEnv) {
String refName = "_highlight";
String refName = HIGHLIGHT_FIELD;
// Not a wildcard expression
if (this.type == ExprCoreType.ARRAY) {
refName += "." + StringUtils.unquoteText(getHighlightField().toString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.opensearch.sql.executor.ExecutionEngine.Schema.Column;
import org.opensearch.sql.executor.Explain;
import org.opensearch.sql.executor.pagination.PlanSerializer;
import org.opensearch.sql.expression.HighlightExpression;
import org.opensearch.sql.expression.function.BuiltinFunctionName;
import org.opensearch.sql.expression.function.PPLFuncImpTable;
import org.opensearch.sql.monitor.profile.MetricName;
Expand All @@ -63,6 +64,7 @@
import org.opensearch.sql.opensearch.executor.protector.ExecutionProtector;
import org.opensearch.sql.opensearch.functions.DistinctCountApproxAggFunction;
import org.opensearch.sql.opensearch.functions.GeoIpFunction;
import org.opensearch.sql.opensearch.storage.scan.OpenSearchIndexEnumerator;
import org.opensearch.sql.planner.physical.PhysicalPlan;
import org.opensearch.sql.storage.TableScanOperator;
import org.opensearch.transport.client.node.NodeClient;
Expand Down Expand Up @@ -211,6 +213,7 @@ public void execute(
client.schedule(
() -> {
try (PreparedStatement statement = OpenSearchRelRunners.run(context, rel)) {
OpenSearchIndexEnumerator.clearCollectedHighlights();
ProfileMetric metric = QueryProfiling.current().getOrCreateMetric(MetricName.EXECUTE);
long execTime = System.nanoTime();
ResultSet result = statement.executeQuery();
Expand Down Expand Up @@ -279,6 +282,21 @@ private QueryResponse buildResultSet(
values.add(ExprTupleValue.fromExprValueMap(row));
}

// Merge highlight data collected by the enumerator back into ExprTupleValues.
// The Calcite row pipeline only carries schema column values, so highlight metadata
// is collected as a side channel in OpenSearchIndexEnumerator and merged here.
List<ExprValue> collectedHighlights =
OpenSearchIndexEnumerator.getAndClearCollectedHighlights();
for (int i = 0; i < Math.min(values.size(), collectedHighlights.size()); i++) {
ExprValue hl = collectedHighlights.get(i);
if (hl != null) {
Map<String, ExprValue> rowWithHighlight =
new LinkedHashMap<>(ExprValueUtils.getTupleValue(values.get(i)));
rowWithHighlight.put(HighlightExpression.HIGHLIGHT_FIELD, hl);
values.set(i, ExprTupleValue.fromExprValueMap(rowWithHighlight));
}
}

List<Column> columns = new ArrayList<>(metaData.getColumnCount());
for (int i = 1; i <= columnCount; ++i) {
String columnName = metaData.getColumnName(i);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package org.opensearch.sql.opensearch.response;

import static org.opensearch.sql.expression.HighlightExpression.HIGHLIGHT_FIELD;
import static org.opensearch.sql.opensearch.storage.OpenSearchIndex.METADATAFIELD_TYPE_MAP;
import static org.opensearch.sql.opensearch.storage.OpenSearchIndex.METADATA_FIELD_ID;
import static org.opensearch.sql.opensearch.storage.OpenSearchIndex.METADATA_FIELD_INDEX;
Expand Down Expand Up @@ -200,7 +201,7 @@ private void addHighlightsToBuilder(
.map(Text::toString)
.collect(Collectors.toList())));
}
builder.put("_highlight", ExprTupleValue.fromExprValueMap(hlBuilder.build()));
builder.put(HIGHLIGHT_FIELD, ExprTupleValue.fromExprValueMap(hlBuilder.build()));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ public double estimateRowCount(RelMetadataQuery mq) {
(rowCount, operation) ->
switch (operation.type()) {
case AGGREGATION -> mq.getRowCount((RelNode) operation.digest());
case PROJECT, SORT, SORT_EXPR -> rowCount;
case PROJECT, SORT, SORT_EXPR, HIGHLIGHT -> rowCount;
case SORT_AGG_METRICS ->
NumberUtil.min(rowCount, osIndex.getQueryBucketSize().doubleValue());
// Refer the org.apache.calcite.rel.metadata.RelMdRowCount
Expand Down Expand Up @@ -176,8 +176,8 @@ public double estimateRowCount(RelMetadataQuery mq) {
dRows = mq.getRowCount((RelNode) operation.digest());
dCpu += dRows * getAggMultiplier(operation);
}
// Ignored Project in cost accumulation, but it will affect the external cost
case PROJECT -> {}
// Ignored Project and Highlight in cost accumulation
case PROJECT, HIGHLIGHT -> {}
case SORT -> dCpu += dRows;
case SORT_AGG_METRICS -> {
dRows = dRows * .9 / 10; // *.9 because always bucket IS_NOT_NULL
Expand Down Expand Up @@ -266,6 +266,11 @@ public Map<String, String> getAliasMapping() {
return osIndex.getAliasMapping();
}

/**
 * Build a scan that carries the given hints.
 *
 * @param hintList hints to pass to the newly built scan
 * @return the node produced by {@code buildScan} from this scan's cluster, trait set, table,
 *     index, schema and push-down context together with {@code hintList}
 */
@Override
public RelNode withHints(List<RelHint> hintList) {
  final RelNode rehinted =
      buildScan(getCluster(), traitSet, hintList, table, osIndex, schema, pushDownContext);
  return rehinted;
}

/** Returns a copy of this index scan; each concrete subclass decides how the copy is built. */
public abstract AbstractCalciteIndexScan copy();

protected List<String> getCollationNames(List<RelFieldCollation> collations) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.logging.log4j.Logger;
import org.opensearch.search.aggregations.AggregationBuilder;
import org.opensearch.search.aggregations.bucket.composite.CompositeAggregationBuilder;
import org.opensearch.search.fetch.subphase.highlight.HighlightBuilder;
import org.opensearch.sql.calcite.utils.OpenSearchTypeFactory;
import org.opensearch.sql.calcite.utils.PPLHintUtils;
import org.opensearch.sql.common.setting.Settings;
Expand Down Expand Up @@ -158,6 +159,27 @@ public AbstractRelNode pushDownFilter(Filter filter) {
(OSRequestBuilderAction)
requestBuilder -> requestBuilder.pushDownFilterForCalcite(queryExpression.builder()));

// Auto-inject wildcard highlight for PPL search command result highlighting.
// Only adds highlight when the scan is marked with a SEARCH_COMMAND hint
// (set by CalciteRelNodeVisitor.visitSearch), scoping it to the search command only.
// Uses OSD custom tags so the frontend getHighlightHtml() can convert to <mark>.
if (PPLHintUtils.isSearchCommand(this)) {
newScan.pushDownContext.add(
PushDownType.HIGHLIGHT,
"auto_highlight",
(OSRequestBuilderAction)
requestBuilder -> {
if (requestBuilder.getSourceBuilder().highlighter() == null) {
HighlightBuilder highlightBuilder =
new HighlightBuilder()
.field(new HighlightBuilder.Field("*").numOfFragments(0))
.preTags("@opensearch-dashboards-highlighted-field@")
.postTags("@/opensearch-dashboards-highlighted-field@");
requestBuilder.getSourceBuilder().highlighter(highlightBuilder);
}
});
}

// If the query expression is partial, we need to replace the input of the filter with the
// partial pushed scan and the filter condition with non-pushed-down conditions.
if (queryExpression.isPartial()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,13 @@

package org.opensearch.sql.opensearch.storage.scan;

import static org.opensearch.sql.expression.HighlightExpression.HIGHLIGHT_FIELD;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import lombok.EqualsAndHashCode;
import lombok.ToString;
import org.apache.calcite.linq4j.Enumerator;
Expand All @@ -27,6 +31,27 @@
*/
public class OpenSearchIndexEnumerator implements Enumerator<Object> {

/**
 * Thread-local collector for highlight data. Since the Calcite row pipeline only carries schema
 * column values, highlight metadata from OpenSearch hits is collected here as a side channel.
 * After execution, {@link #getAndClearCollectedHighlights()} retrieves the collected data so it
 * can be merged back into the ExprTupleValues for the JDBC response.
 *
 * <p>Because the storage is a {@link ThreadLocal}, entries are only visible to the thread that
 * added them; the reader must run on the same thread that drove the enumerator. Each thread
 * starts with its own empty list.
 */
private static final ThreadLocal<List<ExprValue>> COLLECTED_HIGHLIGHTS =
ThreadLocal.withInitial(ArrayList::new);

/**
 * Retrieve the highlights collected on the calling thread and release the thread-local storage.
 *
 * <p>The returned list is handed over to the caller: the thread-local entry is removed, so the
 * next access on this thread starts from a fresh empty list.
 *
 * @return the highlights collected on this thread so far, in collection order; empty if none
 */
public static List<ExprValue> getAndClearCollectedHighlights() {
  final List<ExprValue> collected = COLLECTED_HIGHLIGHTS.get();
  // remove() instead of clear(): a cleared list would keep its grown backing array attached to
  // a pooled thread, and detaching the list makes a defensive copy unnecessary.
  COLLECTED_HIGHLIGHTS.remove();
  return collected;
}

/**
 * Discard any highlights collected on the calling thread (call before starting a new execution).
 *
 * <p>Removes the thread-local entry rather than emptying the list, so no list is allocated for
 * threads that never collected anything and no backing array stays attached to the thread. The
 * next access on this thread starts from a fresh empty list.
 */
public static void clearCollectedHighlights() {
  COLLECTED_HIGHLIGHTS.remove();
}

/** OpenSearch client. */
private final OpenSearchClient client;

Expand Down Expand Up @@ -111,6 +136,12 @@ public boolean moveNext() {
}
if (iterator.hasNext()) {
current = iterator.next();
// Collect highlight data as a side channel for the JDBC response.
// The Calcite row (from current()) only carries schema column values,
// so _highlight must be preserved separately.
Map<String, ExprValue> tuple = ExprValueUtils.getTupleValue(current);
ExprValue hl = tuple.get(HIGHLIGHT_FIELD);
COLLECTED_HIGHLIGHTS.get().add(hl != null && !hl.isMissing() ? hl : null);
queryCount++;
return true;
} else {
Expand All @@ -123,6 +154,7 @@ public void reset() {
bgScanner.reset(request);
iterator = bgScanner.fetchNextBatch(request).iterator();
queryCount = 0;
COLLECTED_HIGHLIGHTS.get().clear();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ public enum PushDownType {
SCRIPT, // script in predicate
SORT_AGG_METRICS, // convert composite aggregate to terms or multi-terms bucket aggregate
RARE_TOP, // convert composite aggregate to nested aggregate
SORT_EXPR
// HIGHLIGHT,
SORT_EXPR,
HIGHLIGHT
// NESTED
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,15 @@

package org.opensearch.sql.protocol.response;

import static org.opensearch.sql.expression.HighlightExpression.HIGHLIGHT_FIELD;

import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.stream.Collectors;
import lombok.Getter;
import org.opensearch.sql.data.model.ExprValue;
import org.opensearch.sql.data.model.ExprValueUtils;
Expand Down Expand Up @@ -82,19 +86,48 @@ public Map<String, String> columnNameTypes() {

@Override
public Iterator<Object[]> iterator() {
// Any chance to avoid copy for json response generation?
return exprValues.stream()
.map(ExprValueUtils::getTupleValue)
.map(Map::values)
.map(this::convertExprValuesToValues)
.map(
tuple ->
tuple.entrySet().stream()
.filter(e -> !HIGHLIGHT_FIELD.equals(e.getKey()))
.map(e -> e.getValue().value())
.toArray(Object[]::new))
.iterator();
}

private String getColumnName(Column column) {
return (column.getAlias() != null) ? column.getAlias() : column.getName();
/**
 * Build the per-row highlight view of this result. For every row the {@code _highlight} entry of
 * its tuple is looked up; when present, it is converted to a map from field name to the list of
 * highlight fragment strings for that field.
 *
 * @return a list with one entry per row, in row order; an entry is {@code null} when the row has
 *     no {@code _highlight} value or that value is missing
 */
public List<Map<String, Object>> highlights() {
  return exprValues.stream()
      .map(row -> ExprValueUtils.getTupleValue(row).get(HIGHLIGHT_FIELD))
      .map(
          highlight -> {
            if (highlight == null || highlight.isMissing()) {
              return null;
            }
            // LinkedHashMap keeps the fields in the order the highlight tuple yields them.
            Map<String, Object> fragmentsByField = new LinkedHashMap<>();
            highlight
                .tupleValue()
                .forEach(
                    (field, fragments) ->
                        fragmentsByField.put(
                            field,
                            fragments.collectionValue().stream()
                                .map(ExprValue::stringValue)
                                .collect(Collectors.toList())));
            return fragmentsByField;
          })
      .collect(Collectors.toList());
}

private Object[] convertExprValuesToValues(Collection<ExprValue> exprValues) {
return exprValues.stream().map(ExprValue::value).toArray(Object[]::new);
/**
 * Pick the display name of a column.
 *
 * @param column the schema column
 * @return the column's alias when it has one, otherwise its name
 */
private String getColumnName(Column column) {
  final String alias = column.getAlias();
  if (alias == null) {
    return column.getName();
  }
  return alias;
}
}
Loading
Loading