Skip to content

Commit be16a2d

Browse files
committed
test updates
Signed-off-by: Marc Handalian <marc.handalian@gmail.com> (cherry picked from commit 2937a6715bca5d479998dc324c6e69c3a63d209a)
1 parent 9fdfb35 commit be16a2d

2 files changed

Lines changed: 149 additions & 39 deletions

File tree

plugins/engine-datafusion/build.gradle

Lines changed: 56 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -37,22 +37,37 @@ dependencies {
3737
api "com.fasterxml.jackson.core:jackson-annotations:${versions.jackson}"
3838

3939
// Apache Arrow dependencies for memory management
40-
implementation "org.apache.arrow:arrow-memory-core:17.0.0"
41-
implementation "org.apache.arrow:arrow-memory-unsafe:17.0.0"
42-
implementation "org.apache.arrow:arrow-vector:17.0.0"
43-
implementation "org.apache.arrow:arrow-c-data:17.0.0"
44-
implementation "org.apache.arrow:arrow-format:17.0.0"
40+
implementation "org.apache.arrow:arrow-c-data:${versions.arrow}"
4541
// SLF4J API for Arrow logging compatibility
4642
implementation "org.slf4j:slf4j-api:${versions.slf4j}"
4743
// CheckerFramework annotations required by Arrow
4844
implementation "org.checkerframework:checker-qual:3.42.0"
4945
// FlatBuffers dependency required by Arrow
5046
implementation "com.google.flatbuffers:flatbuffers-java:${versions.flatbuffers}"
51-
47+
// testRuntimeOnly "com.google.guava:guava:${versions.guava}"
48+
testImplementation 'io.substrait:core:0.67.0'
49+
testImplementation('io.substrait:isthmus:0.67.0') {
50+
exclude group: 'org.apache.calcite'
51+
}
52+
testImplementation 'org.apache.calcite:calcite-linq4j:1.41.0'
53+
testImplementation 'org.apache.calcite:calcite-server:1.41.0'
54+
testImplementation "com.fasterxml.jackson.datatype:jackson-datatype-jdk8:${versions.jackson}"
55+
testImplementation 'org.jooq:jooq-meta:3.19.15'
56+
testImplementation 'org.jooq:jooq:3.19.15'
57+
testImplementation 'org.jooq:joou:0.9.4'
58+
testImplementation 'com.jayway.jsonpath:json-path:2.9.0'
59+
testImplementation 'org.codehaus.janino:commons-compiler:3.1.10'
60+
testImplementation group: 'org.apache.commons', name: 'commons-math3', version: '3.6.1'
61+
62+
testImplementation 'org.apache.calcite.avatica:avatica-core:1.26.0'
63+
api('org.apache.calcite:calcite-core:1.41.0') {
64+
exclude group: 'net.minidev', module: 'json-smart'
65+
}
5266
testImplementation "junit:junit:${versions.junit}"
5367
testImplementation "org.hamcrest:hamcrest:${versions.hamcrest}"
5468
testImplementation "org.mockito:mockito-core:${versions.mockito}"
5569
testImplementation project(":modules:parquet-data-format")
70+
api project(":plugins:arrow-flight-rpc")
5671
// Add CSV plugin for testing
5772
// testImplementation project(':plugins:dataformat-csv')
5873
}
@@ -170,7 +185,19 @@ clean {
170185
delete file('src/main/resources/native')
171186
}
172187

188+
internalClusterTest {
  // Netty allocator/Unsafe switches passed to the test JVM as system properties.
  def nettySystemProperties = [
    'io.netty.allocator.numDirectArenas' : '1',
    'io.netty.noUnsafe'                  : 'false',
    'io.netty.tryUnsafe'                 : 'true',
    'io.netty.tryReflectionSetAccessible': 'true'
  ]
  nettySystemProperties.each { propertyName, propertyValue ->
    systemProperty propertyName, propertyValue
  }

  // Open java.nio to the Arrow memory module and to unnamed modules.
  jvmArgs += ["--add-opens", "java.base/java.nio=org.apache.arrow.memory.core,ALL-UNNAMED"]
}
195+
testingConventions.enabled = false
173196
test {
197+
systemProperty 'io.netty.allocator.numDirectArenas', '1'
198+
systemProperty 'io.netty.noUnsafe', 'false'
199+
systemProperty 'io.netty.tryUnsafe', 'true'
200+
systemProperty 'io.netty.tryReflectionSetAccessible', 'true'
174201
// Set system property to help tests find the native library
175202
jvmArgs += ["--add-opens", "java.base/java.nio=org.apache.arrow.memory.core,ALL-UNNAMED"]
176203

@@ -183,36 +210,36 @@ yamlRestTest {
183210
enabled = false
184211
}
185212

186-
tasks.named("dependencyLicenses").configure {
187-
mapping from: /jackson-.*/, to: 'jackson'
188-
mapping from: /arrow-.*/, to: 'arrow'
189-
mapping from: /slf4j-.*/, to: 'slf4j-api'
190-
mapping from: /checker-qual.*/, to: 'checker-qual'
191-
mapping from: /flatbuffers-.*/, to: 'flatbuffers-java'
192-
}
213+
//tasks.named("dependencyLicenses").configure {
214+
// mapping from: /jackson-.*/, to: 'jackson'
215+
// mapping from: /arrow-.*/, to: 'arrow'
216+
// mapping from: /slf4j-.*/, to: 'slf4j-api'
217+
// mapping from: /checker-qual.*/, to: 'checker-qual'
218+
// mapping from: /flatbuffers-.*/, to: 'flatbuffers-java'
219+
//}
193220

194221
// Configure third party audit to handle Apache Arrow dependencies
195222
tasks.named('thirdPartyAudit').configure {
196223
ignoreMissingClasses(
197224
// Apache Commons Codec (missing dependency)
198225
'org.apache.commons.codec.binary.Hex'
199226
)
200-
ignoreViolations(
201-
// Apache Arrow internal classes that use Unsafe operations
202-
'org.apache.arrow.memory.ArrowBuf',
203-
'org.apache.arrow.memory.unsafe.UnsafeAllocationManager',
204-
'org.apache.arrow.memory.util.ByteFunctionHelpers',
205-
'org.apache.arrow.memory.util.MemoryUtil',
206-
'org.apache.arrow.memory.util.MemoryUtil$1',
207-
'org.apache.arrow.memory.util.hash.MurmurHasher',
208-
'org.apache.arrow.memory.util.hash.SimpleHasher',
209-
'org.apache.arrow.vector.BaseFixedWidthVector',
210-
'org.apache.arrow.vector.BitVectorHelper',
211-
'org.apache.arrow.vector.Decimal256Vector',
212-
'org.apache.arrow.vector.DecimalVector',
213-
'org.apache.arrow.vector.util.DecimalUtility',
214-
'org.apache.arrow.vector.util.VectorAppender'
215-
)
227+
// ignoreViolations(
228+
// // Apache Arrow internal classes that use Unsafe operations
229+
// 'org.apache.arrow.memory.ArrowBuf',
230+
// 'org.apache.arrow.memory.unsafe.UnsafeAllocationManager',
231+
// 'org.apache.arrow.memory.util.ByteFunctionHelpers',
232+
// 'org.apache.arrow.memory.util.MemoryUtil',
233+
// 'org.apache.arrow.memory.util.MemoryUtil$1',
234+
// 'org.apache.arrow.memory.util.hash.MurmurHasher',
235+
// 'org.apache.arrow.memory.util.hash.SimpleHasher',
236+
// 'org.apache.arrow.vector.BaseFixedWidthVector',
237+
// 'org.apache.arrow.vector.BitVectorHelper',
238+
// 'org.apache.arrow.vector.Decimal256Vector',
239+
// 'org.apache.arrow.vector.DecimalVector',
240+
// 'org.apache.arrow.vector.util.DecimalUtility',
241+
// 'org.apache.arrow.vector.util.VectorAppender'
242+
// )
216243
}
217244

218245
// Configure Javadoc to skip package documentation requirements ie package-info.java

plugins/engine-datafusion/src/test/java/org/opensearch/datafusion/DataFusionSingleNodeTests.java

Lines changed: 93 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,24 @@
99
package org.opensearch.datafusion;
1010

1111
import com.parquet.parquetdataformat.ParquetDataFormatPlugin;
12+
import io.substrait.extension.SimpleExtension;
13+
import io.substrait.isthmus.FeatureBoard;
14+
import io.substrait.isthmus.SqlToSubstrait;
15+
import io.substrait.isthmus.calcite.SubstraitTable;
16+
import io.substrait.isthmus.sql.SubstraitCreateStatementParser;
17+
import io.substrait.plan.Plan;
18+
import io.substrait.plan.PlanProtoConverter;
19+
import io.substrait.proto.Capabilities;
20+
import io.substrait.relation.NamedScan;
21+
import io.substrait.relation.ProtoRelConverter;
22+
import io.substrait.relation.Rel;
23+
import io.substrait.relation.RelCopyOnWriteVisitor;
24+
import io.substrait.util.EmptyVisitationContext;
25+
import org.apache.calcite.sql.parser.SqlParseException;
26+
import org.opensearch.action.bulk.BulkRequest;
27+
import org.opensearch.action.index.IndexRequest;
1228
import org.opensearch.action.search.SearchResponse;
29+
import org.opensearch.arrow.flight.transport.FlightStreamPlugin;
1330
import org.opensearch.cluster.metadata.IndexMetadata;
1431
import org.opensearch.common.settings.Settings;
1532
import org.opensearch.common.xcontent.json.JsonXContent;
@@ -19,9 +36,10 @@
1936
import org.opensearch.search.builder.SearchSourceBuilder;
2037
import org.opensearch.test.OpenSearchIntegTestCase;
2138
import org.opensearch.test.OpenSearchSingleNodeTestCase;
22-
39+
import org.apache.calcite.prepare.CalciteCatalogReader;
2340
import java.io.BufferedReader;
2441
import java.io.FileInputStream;
42+
import java.io.FileNotFoundException;
2543
import java.io.IOException;
2644
import java.io.InputStream;
2745
import java.io.InputStreamReader;
@@ -30,6 +48,10 @@
3048
import java.util.Collection;
3149
import java.util.List;
3250
import java.util.Locale;
51+
import java.util.Optional;
52+
53+
import static java.util.Arrays.asList;
54+
import static org.opensearch.common.util.FeatureFlags.STREAM_TRANSPORT;
3355

3456
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST)
3557
public class DataFusionSingleNodeTests extends OpenSearchSingleNodeTestCase {
@@ -40,7 +62,7 @@ public class DataFusionSingleNodeTests extends OpenSearchSingleNodeTestCase {
4062

4163
@Override
4264
protected Collection<Class<? extends Plugin>> getPlugins() {
43-
return List.of(DataFusionPlugin.class, ParquetDataFormatPlugin.class);
65+
return List.of(DataFusionPlugin.class, ParquetDataFormatPlugin.class, FlightStreamPlugin.class);
4466
}
4567

4668
public void testClickBenchQueries() throws IOException {
@@ -57,19 +79,14 @@ public void testClickBenchQueries() throws IOException {
5779
.build(),
5880
mappings
5981
);
60-
String req = fileToString(
61-
DATA,
62-
false
63-
);
64-
System.out.println(req.trim());
65-
client().prepareIndex("hits").setSource(req, MediaTypeRegistry.JSON).get();
82+
BulkRequest bulkRequest = prepareBulkRequest(DATA);
83+
client().bulk(bulkRequest).actionGet();
6684
client().admin().indices().prepareRefresh().get();
6785
client().admin().indices().prepareFlush().get();
68-
client().admin().indices().prepareFlush().get();
6986

7087
// TODO: run in a loop
7188
String sourceFile = fileToString(
72-
"q25.json",
89+
"q30.json",
7390
false
7491
);
7592
SearchSourceBuilder source = new SearchSourceBuilder();
@@ -81,6 +98,72 @@ public void testClickBenchQueries() throws IOException {
8198
System.out.println(response);
8299
}
83100

101+
@LockFeatureFlag(STREAM_TRANSPORT)
102+
public void testRandomQueryWithIsthmus() throws IOException, SqlParseException {
103+
String mappings = fileToString(
104+
INDEX_MAPPING_JSON,
105+
false
106+
);
107+
createIndexWithMappingSource(
108+
indexName,
109+
Settings.builder()
110+
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
111+
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
112+
.put("index.refresh_interval", -1)
113+
.build(),
114+
mappings
115+
);
116+
BulkRequest bulkRequest = prepareBulkRequest(DATA);
117+
client().bulk(bulkRequest).actionGet();
118+
client().admin().indices().prepareRefresh().get();
119+
client().admin().indices().prepareFlush().get();
120+
121+
// TODO: run in a loop
122+
String sourceFile = fileToString(
123+
"q30.json",
124+
false
125+
);
126+
SearchSourceBuilder source = new SearchSourceBuilder();
127+
// Define your SQL query - use quoted lowercase to preserve case
128+
// String sql = "select COUNT(\"UserID\") from \"hits\"";
129+
// String sql = "select \"RegionID\", SUM(\"UserID\") from \"hits\" GROUP BY \"RegionID\"";
130+
String sql = "SELECT \"RegionID\", COUNT(DISTINCT \"UserID\") as u FROM \"hits\" GROUP BY \"RegionID\" ORDER BY u DESC LIMIT 10";
131+
String createStatement = fileToString("schema.sql", false);
132+
133+
134+
135+
// Convert to Substrait
136+
org.apache.calcite.prepare.CalciteCatalogReader calciteCatalogReader = SubstraitCreateStatementParser.processCreateStatementsToCatalog(createStatement);
137+
Plan plan = new SqlToSubstrait().convert(sql, calciteCatalogReader);
138+
PlanProtoConverter planProtoConverter = new PlanProtoConverter();
139+
io.substrait.proto.Plan substraitPlanProto = planProtoConverter.toProto(plan);
140+
System.out.println(substraitPlanProto);
141+
source.queryPlanIR(substraitPlanProto.toByteArray());
142+
SearchResponse response = client().prepareStreamSearch(indexName).setSource(source).get();
143+
// TODO: Match expected results...
144+
}
145+
146+
private BulkRequest prepareBulkRequest(String fileName) {
147+
BulkRequest bulkRequest = new BulkRequest();
148+
String absolutePath = getResourceFilePath(fileName);
149+
try (BufferedReader br = new BufferedReader(
150+
new InputStreamReader(new FileInputStream(absolutePath), StandardCharsets.UTF_8))) {
151+
String line;
152+
while ((line = br.readLine()) != null) {
153+
if (line.trim().isEmpty()) {
154+
continue;
155+
}
156+
IndexRequest indexRequest = new IndexRequest(indexName);
157+
indexRequest.source(line, MediaTypeRegistry.JSON);
158+
bulkRequest.add(indexRequest);
159+
System.out.println(line);
160+
}
161+
} catch (IOException e) {
162+
throw new RuntimeException(e);
163+
}
164+
return bulkRequest;
165+
}
166+
84167
static String getResourceFilePath(String relPath) {
85168
return DataFusionSingleNodeTests.class.getClassLoader().getResource(relPath).getPath();
86169
}

0 commit comments

Comments
 (0)