Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.sql.type.SqlTypeUtil;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.opensearch.analytics.schema.BinaryType;
import org.opensearch.analytics.schema.IpType;
import org.opensearch.sql.calcite.type.AbstractExprRelDataType;
import org.opensearch.sql.calcite.type.ExprBinaryType;
import org.opensearch.sql.calcite.type.ExprDateType;
Expand Down Expand Up @@ -275,6 +277,25 @@ public static ExprType convertRelDataTypeToExprType(RelDataType type) {
return exprType;
}

/**
 * Maps a {@link RelDataType} to an {@link ExprType} for result schemas, treating the
 * analytics-engine marker types specially: {@link IpType} becomes {@link ExprCoreType#IP} and
 * {@link BinaryType} becomes {@link ExprCoreType#BINARY}. Every other type is handed to {@link
 * #convertRelDataTypeToExprType} unchanged.
 *
 * <p>This mapping is deliberately kept out of the general conversion path: Calcite's
 * planner-internal coercion would round-trip through {@link #convertExprTypeToRelDataType} and
 * synthesize {@code CAST(... AS ExprIPType)} casts the substrait converter can't handle.
 *
 * @param type the column type to translate
 * @return {@code IP} or {@code BINARY} for the marker types, otherwise the general conversion
 */
public static ExprType convertAnalyticsEngineRelDataTypeToExprType(RelDataType type) {
  final boolean ipMarker = type instanceof IpType;
  final boolean binaryMarker = type instanceof BinaryType;
  if (!ipMarker && !binaryMarker) {
    // Not an analytics-engine marker: defer to the general conversion.
    return convertRelDataTypeToExprType(type);
  }
  // The IP check wins when both hold, matching the order of the checks.
  return ipMarker ? IP : BINARY;
}

public static ExprValue getExprValueByExprType(ExprType type, Object value) {
switch (type) {
case UNDEFINED:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@

package org.opensearch.sql.executor.analytics;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Base64;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -14,6 +17,9 @@
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.opensearch.analytics.exec.QueryPlanExecutor;
import org.opensearch.analytics.schema.BinaryType;
import org.opensearch.analytics.schema.IpType;
import org.opensearch.common.network.InetAddresses;
import org.opensearch.core.action.ActionListener;
import org.opensearch.sql.ast.statement.ExplainMode;
import org.opensearch.sql.calcite.CalcitePlanContext;
Expand Down Expand Up @@ -123,15 +129,46 @@ private List<ExprValue> convertRows(Iterable<Object[]> rows, List<RelDataTypeFie
for (Object[] row : rows) {
Map<String, ExprValue> valueMap = new LinkedHashMap<>();
for (int i = 0; i < fields.size(); i++) {
String columnName = fields.get(i).getName();
RelDataTypeField field = fields.get(i);
Object value = (i < row.length) ? row[i] : null;
valueMap.put(columnName, ExprValueUtils.fromObjectValue(value));
valueMap.put(field.getName(), toExprValue(value, field.getType()));
}
results.add(ExprTupleValue.fromExprValueMap(valueMap));
}
return results;
}

/**
* Converts a single result cell to an {@link ExprValue}, dispatching on the column's UDT when
* present so {@code byte[]} payloads are rendered correctly:
*
* <ul>
* <li>{@link IpType} + {@code byte[]} &rarr; canonical address string (matches {@code
* IpFieldMapper}'s {@code valueFetcher} output).
* <li>{@link BinaryType} + {@code byte[]} &rarr; base64-encoded string (matches the OpenSearch
* {@code binary} field wire format).
* <li>Anything else &rarr; existing {@link ExprValueUtils#fromObjectValue} path.
* </ul>
*
* <p>Without this dispatch, {@code fromObjectValue} throws {@code unsupported object class [B} on
* byte[] cells, and IP buffers leak through as raw 16-byte ipv4-mapped-ipv6 garbage.
*/
private static ExprValue toExprValue(Object value, RelDataType type) {
if (value instanceof byte[] bytes) {
if (type instanceof IpType) {
Comment thread
penghuo marked this conversation as resolved.
try {
return ExprValueUtils.stringValue(
InetAddresses.toAddrString(InetAddress.getByAddress(bytes)));
} catch (UnknownHostException e) {
throw new IllegalStateException("invalid IP buffer length: " + bytes.length, e);
}
} else if (type instanceof BinaryType) {
return ExprValueUtils.stringValue(Base64.getEncoder().encodeToString(bytes));
}
}
return ExprValueUtils.fromObjectValue(value);
}

private Schema buildSchema(List<RelDataTypeField> fields) {
List<Schema.Column> columns = new ArrayList<>();
for (RelDataTypeField field : fields) {
Expand All @@ -143,7 +180,7 @@ private Schema buildSchema(List<RelDataTypeField> fields) {

private ExprType convertType(RelDataType type) {
try {
return OpenSearchTypeFactory.convertRelDataTypeToExprType(type);
return OpenSearchTypeFactory.convertAnalyticsEngineRelDataTypeToExprType(type);
} catch (IllegalArgumentException e) {
return org.opensearch.sql.data.type.ExprCoreType.UNKNOWN;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,6 @@
import static org.opensearch.sql.expression.function.BuiltinFunctionName.YEARWEEK;

import com.google.common.collect.ImmutableMap;
import inet.ipaddr.IPAddress;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Arrays;
Expand All @@ -284,7 +283,6 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.apache.calcite.avatica.util.ByteString;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexLambda;
Expand Down Expand Up @@ -314,10 +312,8 @@
import org.opensearch.sql.calcite.utils.PlanUtils;
import org.opensearch.sql.calcite.utils.UserDefinedFunctionUtils;
import org.opensearch.sql.exception.ExpressionEvaluationException;
import org.opensearch.sql.exception.SemanticCheckException;
import org.opensearch.sql.executor.QueryType;
import org.opensearch.sql.expression.function.CollectionUDF.MVIndexFunctionImp;
import org.opensearch.sql.utils.IPUtils;

public class PPLFuncImpTable {
private static final Logger logger = LogManager.getLogger(PPLFuncImpTable.class);
Expand Down Expand Up @@ -913,29 +909,6 @@ void populate() {
registerDivideFunction(DIVIDEFUNCTION);
registerOperator(SHA2, PPLBuiltinOperators.SHA2);
registerOperator(CIDRMATCH, PPLBuiltinOperators.CIDRMATCH);
// (VARBINARY, VARCHAR) overload for ip / binary columns. The lambda parses the cidr
// literal at plan time and emits AND(col >= low, col <= high) directly.
// Only literal cidrs are expanded.
register(
CIDRMATCH,
(FunctionImp2)
(builder, col, cidr) -> {
if (cidr instanceof RexLiteral lit
&& col.getType().getSqlTypeName() == SqlTypeName.VARBINARY) {
byte[][] range = parseCidrToIpv6Range(lit.getValueAs(String.class));
RelDataType varbinary =
builder.getTypeFactory().createSqlType(SqlTypeName.VARBINARY);
RexNode low = builder.makeLiteral(new ByteString(range[0]), varbinary, false);
RexNode high = builder.makeLiteral(new ByteString(range[1]), varbinary, false);
// makeCall(AND, ...) auto-flattens at construction, so no Filter.isFlat issue.
return builder.makeCall(
SqlStdOperatorTable.AND,
builder.makeCall(SqlStdOperatorTable.GREATER_THAN_OR_EQUAL, col, low),
builder.makeCall(SqlStdOperatorTable.LESS_THAN_OR_EQUAL, col, high));
}
return builder.makeCall(PPLBuiltinOperators.CIDRMATCH, col, cidr);
},
PPLTypeChecker.family(SqlTypeFamily.BINARY, SqlTypeFamily.STRING));
registerOperator(INTERNAL_GROK, PPLBuiltinOperators.GROK);
registerOperator(INTERNAL_PARSE, PPLBuiltinOperators.PARSE);
registerOperator(MATCH, PPLBuiltinOperators.MATCH);
Expand Down Expand Up @@ -1619,22 +1592,4 @@ private static SqlOperandTypeChecker extractTypeCheckerFromUDF(SqlOperator opera
}
return typeChecker;
}

/**
 * Computes the lower and upper bounds of a CIDR block, each as the byte form of its IPv6
 * conversion. Used by the (BINARY, STRING) {@code cidrmatch} overload to expand into a
 * byte-range conjunction at plan time.
 *
 * <p>Parsing is delegated to {@link IPUtils#toRange(String)}. Both bounds are converted to IPv6
 * so that the output is 16 bytes whether the input cidr is IPv4 or IPv6.
 *
 * @param cidr the CIDR literal to parse
 * @return a two-element array: {@code [0]} is the lower bound, {@code [1]} the upper bound
 * @throws SemanticCheckException if {@code cidr} is {@code null}
 */
private static byte[][] parseCidrToIpv6Range(String cidr) {
  if (cidr != null) {
    IPAddress block = IPUtils.toRange(cidr);
    return new byte[][] {
      block.getLower().toIPv6().getBytes(), block.getUpper().toIPv6().getBytes()
    };
  }
  throw new SemanticCheckException("cidrmatch range argument is null");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@
* <ul>
* <li>(STRING, STRING) -> BOOLEAN
* <li>(IP, STRING) -> BOOLEAN
* <li>(BINARY, STRING) -> BOOLEAN — accepts VARBINARY-backed ip columns in the
* analytics-engine schema; the backend's CidrMatchFunctionAdapter rewrites
* these before they reach DataFusion.
* </ul>
*/
public class CidrMatchFunction extends ImplementorUDF {
Expand All @@ -50,7 +53,8 @@ public UDFOperandMetadata getOperandMetadata() {
return UDFOperandMetadata.wrapUDT(
List.of(
List.of(ExprCoreType.IP, ExprCoreType.STRING),
List.of(ExprCoreType.STRING, ExprCoreType.STRING)));
List.of(ExprCoreType.STRING, ExprCoreType.STRING),
List.of(ExprCoreType.BINARY, ExprCoreType.STRING)));
}

public static class CidrMatchImplementor implements NotNullImplementor {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,12 @@
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.sql.type.SqlTypeName;
import org.junit.jupiter.api.Test;
import org.opensearch.analytics.schema.BinaryType;
import org.opensearch.analytics.schema.IpType;
import org.opensearch.sql.calcite.type.AbstractExprRelDataType;
import org.opensearch.sql.calcite.utils.OpenSearchTypeFactory.ExprUDT;
import org.opensearch.sql.data.type.ExprCoreType;
import org.opensearch.sql.data.type.ExprType;

public class OpenSearchTypeFactoryTest {

Expand Down Expand Up @@ -282,4 +285,49 @@ public void testConvertExprTypeBinaryToNullableVarbinary() {
assertEquals(SqlTypeName.VARBINARY, result.getSqlTypeName());
assertTrue(result.isNullable());
}

// ---------- convertAnalyticsEngineRelDataTypeToExprType ----------
// UDT-aware variant for the response-schema path. Must agree with the
// planner-internal convertRelDataTypeToExprType on every non-UDT input.

// An IpType marker must be reported as the IP expression type.
@Test
public void testConvertAnalyticsEngineIpTypeReturnsIpExprType() {
  RelDataType ipMarker = new IpType(true);
  assertEquals(
      ExprCoreType.IP,
      OpenSearchTypeFactory.convertAnalyticsEngineRelDataTypeToExprType(ipMarker));
}

// A BinaryType marker must be reported as the BINARY expression type.
@Test
public void testConvertAnalyticsEngineBinaryTypeReturnsBinaryExprType() {
  RelDataType binaryMarker = new BinaryType(true);
  assertEquals(
      ExprCoreType.BINARY,
      OpenSearchTypeFactory.convertAnalyticsEngineRelDataTypeToExprType(binaryMarker));
}

// A plain VARBINARY with no UDT marker must still resolve to BINARY through the delegated path.
@Test
public void testConvertAnalyticsEnginePlainVarbinaryFallsBackToBinary() {
  RelDataType plainVarbinary = TYPE_FACTORY.createSqlType(SqlTypeName.VARBINARY);
  assertEquals(
      ExprCoreType.BINARY,
      OpenSearchTypeFactory.convertAnalyticsEngineRelDataTypeToExprType(plainVarbinary));
}

// Parity check: for non-UDT inputs the analytics-engine variant must return exactly what the
// general variant returns, otherwise response-schema labels diverge from Calcite's view.
@Test
public void testConvertAnalyticsEngineDelegatesParityForNonUdtTypes() {
  RelDataType[] nonUdtTypes = {
    TYPE_FACTORY.createSqlType(SqlTypeName.BIGINT),
    TYPE_FACTORY.createSqlType(SqlTypeName.VARCHAR),
    TYPE_FACTORY.createSqlType(SqlTypeName.BOOLEAN),
    TYPE_FACTORY.createSqlType(SqlTypeName.DOUBLE),
    TYPE_FACTORY.createSqlType(SqlTypeName.TIMESTAMP),
  };
  for (int i = 0; i < nonUdtTypes.length; i++) {
    RelDataType candidate = nonUdtTypes[i];
    ExprType expected = OpenSearchTypeFactory.convertRelDataTypeToExprType(candidate);
    ExprType actual =
        OpenSearchTypeFactory.convertAnalyticsEngineRelDataTypeToExprType(candidate);
    assertEquals(
        expected,
        actual,
        "Analytics-engine variant must agree with the general variant for "
            + candidate.getSqlTypeName());
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.opensearch.analytics.exec.QueryPlanExecutor;
import org.opensearch.analytics.schema.BinaryType;
import org.opensearch.analytics.schema.IpType;
import org.opensearch.core.action.ActionListener;
import org.opensearch.sql.calcite.CalcitePlanContext;
import org.opensearch.sql.calcite.SysLimit;
Expand Down Expand Up @@ -180,6 +182,58 @@ void executeRelNode_temporalTypes() {
// Query size limit is now enforced in the RelNode plan (LogicalSystemLimit) before it reaches
// AnalyticsExecutionEngine. The engine trusts the executor to honor the limit.

/**
 * An {@link IpType} column fed raw 16-byte buffers must come back as address strings, and the
 * response schema must report the column as IP.
 */
@Test
void executeRelNode_ipColumnRendersAsAddressString() {
  // 1.2.3.4 in ipv4-mapped-ipv6 form: ten zero bytes, then ff ff, then the four IPv4 bytes.
  byte[] mappedV4 = {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, (byte) 0xff, (byte) 0xff, 1, 2, 3, 4};
  // ::1 in pure ipv6 form: fifteen zero bytes followed by a single 1.
  byte[] loopbackV6 = new byte[16];
  loopbackV6[loopbackV6.length - 1] = 1;

  RelNode relNode = mockRelNodeWithType("host", new IpType(true));
  Iterable<Object[]> rows =
      Arrays.asList(new Object[] {mappedV4}, new Object[] {loopbackV6});
  stubExecutorWith(relNode, rows);

  QueryResponse response = executeAndCapture(relNode);
  String dump = dumpResponse(response);

  // Schema: the column is labelled IP rather than BINARY.
  assertEquals(ExprCoreType.IP, response.getSchema().getColumns().get(0).getExprType(), dump);

  // Cells: each byte[] is rendered as a formatted address string.
  Object firstCell = response.getResults().get(0).tupleValue().get("host").value();
  assertEquals(
      "1.2.3.4", firstCell, "ipv4-mapped IPv6 buffer should render as dotted quad. " + dump);
  Object secondCell = response.getResults().get(1).tupleValue().get("host").value();
  assertEquals(
      "::1", secondCell, "pure IPv6 buffer should render as RFC 5952 compressed form. " + dump);
}

/**
 * A {@link BinaryType} column fed a raw byte buffer must come back as a base64 string, and the
 * response schema must report the column as BINARY.
 */
@Test
void executeRelNode_binaryColumnRendersAsBase64() {
  RelNode relNode = mockRelNodeWithType("blob", new BinaryType(true));
  // Encode with an explicit charset: the expected base64 below is for the UTF-8 bytes, and
  // the no-arg getBytes() would depend on the platform default charset.
  byte[] payload = "Some binary blob".getBytes(java.nio.charset.StandardCharsets.UTF_8);
  Iterable<Object[]> rows = Collections.singletonList(new Object[] {payload});
  stubExecutorWith(relNode, rows);

  QueryResponse response = executeAndCapture(relNode);
  String dump = dumpResponse(response);

  assertEquals(ExprCoreType.BINARY, response.getSchema().getColumns().get(0).getExprType(), dump);
  assertEquals(
      "U29tZSBiaW5hcnkgYmxvYg==",
      response.getResults().get(0).tupleValue().get("blob").value(),
      "byte[] should base64-encode to match OpenSearch binary wire format. " + dump);
}

@Test
void executeRelNode_emptyResults() {
RelNode relNode = mockRelNode("name", SqlTypeName.VARCHAR);
Expand Down Expand Up @@ -380,6 +434,16 @@ private RelNode mockRelNode(Object... nameTypePairs) {
return relNode;
}

/**
 * Variant of {@link #mockRelNode} that accepts a pre-built RelDataType (e.g. UDTs): returns a
 * mocked RelNode whose row type has exactly one field with the given name and type.
 */
private RelNode mockRelNodeWithType(String name, RelDataType type) {
  RelDataType singleColumnRow =
      new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT).builder().add(name, type).build();

  RelNode mockedNode = mock(RelNode.class);
  when(mockedNode.getRowType()).thenReturn(singleColumnRow);
  return mockedNode;
}

private ResponseListener<QueryResponse> captureListener(AtomicReference<QueryResponse> ref) {
return new ResponseListener<QueryResponse>() {
@Override
Expand Down
Loading