diff --git a/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/converter/functions/CallTreeFactory.java b/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/converter/functions/CallTreeFactory.java index 6c93c4221..1b13f5c6d 100644 --- a/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/converter/functions/CallTreeFactory.java +++ b/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/converter/functions/CallTreeFactory.java @@ -27,6 +27,7 @@ import org.apache.calcite.rex.RexLiteral; import org.apache.calcite.rex.RexNode; import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.sql.type.SqlTypeName; import org.apache.calcite.util.Sarg; import org.apache.wayang.basic.data.Record; import org.apache.wayang.core.function.FunctionDescriptor.SerializableFunction; @@ -55,10 +56,11 @@ public default Node fromRexNode(final RexNode node) { * serializable function * * @param kind {@link SqlKind} from {@link RexCall} SqlOperator + * @param returnType return type of the {@link RexCall} * @return a serializable function of +, -, * or / * @throws UnsupportedOperationException on unrecognized {@link SqlKind} */ - public SerializableFunction, Object> deriveOperation(final SqlKind kind); + public SerializableFunction, Object> deriveOperation(final SqlKind kind, final SqlTypeName returnType); } interface Node extends Serializable { @@ -71,7 +73,7 @@ final class Call implements Node { protected Call(final RexCall call, final CallTreeFactory tree) { operands = call.getOperands().stream().map(tree::fromRexNode).toList(); - operation = tree.deriveOperation(call.getKind()); + operation = tree.deriveOperation(call.getKind(), call.getType().getSqlTypeName()); } @Override diff --git a/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/converter/functions/FilterPredicateImpl.java b/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/converter/functions/FilterPredicateImpl.java index cc587cd5a..17409cb3e 100644 --- a/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/converter/functions/FilterPredicateImpl.java +++ b/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/converter/functions/FilterPredicateImpl.java @@ -26,6 +26,7 @@ import org.apache.calcite.rex.RexNode; import org.apache.calcite.runtime.SqlFunctions; import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.sql.type.SqlTypeName; import org.apache.calcite.util.DateString; import org.apache.calcite.util.NlsString; import org.apache.wayang.basic.data.Record; @@ -37,7 +38,7 @@ public class FilterPredicateImpl implements FunctionDescriptor.SerializablePredicate { class FilterCallTreeFactory implements CallTreeFactory { - public SerializableFunction, Object> deriveOperation(final SqlKind kind) { + public SerializableFunction, Object> deriveOperation(final SqlKind kind, final SqlTypeName returnType) { return input -> switch (kind) { case NOT -> !(boolean) input.get(0); case IS_NOT_NULL -> !isEqualTo(input.get(0), null); @@ -55,6 +56,8 @@ public SerializableFunction, Object> deriveOperation(final SqlKind case OR -> input.stream().anyMatch(obj -> Boolean.class.cast(obj).booleanValue()); case MINUS -> widenToDouble.apply(input.get(0)) - widenToDouble.apply(input.get(1)); case PLUS -> widenToDouble.apply(input.get(0)) + widenToDouble.apply(input.get(1)); + // TODO: may need better support for CASTing in the future. See sqlCast() in this file. + case CAST -> input.get(0) instanceof Number ? widenToDouble.apply(input.get(0)) : ensureComparable.apply(input.get(0)); case SEARCH -> { if (input.get(0) instanceof final ImmutableRangeSet range) { assert input.get(1) instanceof Comparable @@ -81,6 +84,16 @@ public SerializableFunction, Object> deriveOperation(final SqlKind }; } + /** + * Java implementation of SQL cast. + * @param input input field + * @param type the new return type of the field + * @return Java-type equivalent to {@link SqlTypeName} counterpart. + */ + private static Object sqlCast(Object input, SqlTypeName type){ + throw new UnsupportedOperationException("sqlCasting is not yet implemented."); + } + /** * Java equivalent of SQL like clauses * @@ -88,7 +101,7 @@ public SerializableFunction, Object> deriveOperation(final SqlKind * @param s2 * @return true if {@code s1} like {@code s2} */ - private boolean like(final String s1, final String s2) { + private static boolean like(final String s1, final String s2) { return new SqlFunctions.LikeFunction().like(s1, s2); } @@ -99,7 +112,7 @@ private boolean like(final String s1, final String s2) { * @param o2 * @return true if {@code o1 > o2} */ - private boolean isGreaterThan(final Object o1, final Object o2) { + private static boolean isGreaterThan(final Object o1, final Object o2) { return ensureComparable.apply(o1).compareTo(ensureComparable.apply(o2)) > 0; } @@ -110,7 +123,7 @@ private boolean isGreaterThan(final Object o1, final Object o2) { * @param o2 * @return true if {@code o1 < o2} */ - private boolean isLessThan(final Object o1, final Object o2) { + private static boolean isLessThan(final Object o1, final Object o2) { return ensureComparable.apply(o1).compareTo(ensureComparable.apply(o2)) < 0; } @@ -121,7 +134,7 @@ private boolean isLessThan(final Object o1, final Object o2) { * @param o2 * @return true if {@code o1 == o2} */ - private boolean isEqualTo(final Object o1, final Object o2) { + private static boolean isEqualTo(final Object o1, final Object o2) { return Objects.equals(ensureComparable.apply(o1), ensureComparable.apply(o2)); } } @@ -133,7 +146,7 @@ private boolean isEqualTo(final Object o1, final Object o2) { * * @throws UnsupportedOperationException if conversion was not possible */ - final SerializableFunction widenToDouble = field -> { + final static SerializableFunction widenToDouble = field -> { if (field instanceof final Number number) { return number.doubleValue(); } else if (field instanceof final Date date) { @@ -148,7 +161,7 @@ private boolean isEqualTo(final Object o1, final Object o2) { /** * Widening conversions, all numbers to double */ - final SerializableFunction ensureComparable = field -> { + final static SerializableFunction ensureComparable = field -> { if (field instanceof final Number number) { return number.doubleValue(); } else if (field instanceof final Date date) { diff --git a/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/converter/functions/ProjectMapFuncImpl.java b/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/converter/functions/ProjectMapFuncImpl.java index e06f2328d..a28ef8967 100644 --- a/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/converter/functions/ProjectMapFuncImpl.java +++ b/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/converter/functions/ProjectMapFuncImpl.java @@ -22,7 +22,7 @@ import org.apache.calcite.rex.RexNode; import org.apache.calcite.sql.SqlKind; - +import org.apache.calcite.sql.type.SqlTypeName; import org.apache.wayang.core.function.FunctionDescriptor; import org.apache.wayang.core.function.FunctionDescriptor.SerializableFunction; import org.apache.wayang.basic.data.Record; @@ -38,7 +38,7 @@ public ProjectMapFuncImpl(final List projects) { } class ProjectCallTreeFactory implements CallTreeFactory { - public SerializableFunction, Object> deriveOperation(final SqlKind kind) { + public SerializableFunction, Object> deriveOperation(final SqlKind kind, final SqlTypeName returnType) { return input -> switch (kind) { case PLUS -> asDouble(input.get(0)) + asDouble(input.get(1)); diff --git a/wayang-api/wayang-api-sql/src/test/java/org/apache/wayang/api/sql/SqlToWayangRelTest.java b/wayang-api/wayang-api-sql/src/test/java/org/apache/wayang/api/sql/SqlToWayangRelTest.java index 9d10106ab..88114774b 100755 --- a/wayang-api/wayang-api-sql/src/test/java/org/apache/wayang/api/sql/SqlToWayangRelTest.java +++ b/wayang-api/wayang-api-sql/src/test/java/org/apache/wayang/api/sql/SqlToWayangRelTest.java @@ -79,8 +79,6 @@ import org.apache.wayang.basic.data.Record; import org.apache.wayang.basic.data.Tuple2; import org.apache.wayang.core.api.Configuration; -import org.apache.wayang.core.api.Job; -import org.apache.wayang.core.api.WayangContext; import org.apache.wayang.core.function.FunctionDescriptor.SerializablePredicate; import org.apache.wayang.core.mapping.PlanTransformation; import org.apache.wayang.core.plan.wayangplan.Operator; @@ -102,8 +100,7 @@ class SqlToWayangRelTest { /** * Method for building {@link WayangPlan}s useful for testing, benchmarking and - * other - * usages where you want to handle the intermediate {@link WayangPlan} + * other usages where you want to handle the intermediate {@link WayangPlan} * * @param sql sql query string with the {@code ;} cut off * @param udfJars @@ -116,38 +113,65 @@ private Tuple2, WayangPlan> buildCollectorAndWayangPlan(final final Properties configProperties = Optimizer.ConfigProperties.getDefaults(); final RelDataTypeFactory relDataTypeFactory = new JavaTypeFactoryImpl(); - final Optimizer optimizer = Optimizer.create( - SchemaUtils.getSchema(context.getConfiguration()), - configProperties, - relDataTypeFactory); + final Optimizer optimizer = Optimizer.create(SchemaUtils.getSchema(context.getConfiguration()), + configProperties, relDataTypeFactory); final SqlNode sqlNode = optimizer.parseSql(sql); final SqlNode validatedSqlNode = optimizer.validate(sqlNode); final RelNode relNode = optimizer.convert(validatedSqlNode); - final RuleSet rules = RuleSets.ofList( - CoreRules.FILTER_INTO_JOIN, - WayangRules.WAYANG_TABLESCAN_RULE, - WayangRules.WAYANG_TABLESCAN_ENUMERABLE_RULE, - WayangRules.WAYANG_PROJECT_RULE, - WayangRules.WAYANG_FILTER_RULE, - WayangRules.WAYANG_JOIN_RULE, - WayangRules.WAYANG_AGGREGATE_RULE, + final RuleSet rules = RuleSets.ofList(CoreRules.FILTER_INTO_JOIN, WayangRules.WAYANG_TABLESCAN_RULE, + WayangRules.WAYANG_TABLESCAN_ENUMERABLE_RULE, WayangRules.WAYANG_PROJECT_RULE, + WayangRules.WAYANG_FILTER_RULE, WayangRules.WAYANG_JOIN_RULE, WayangRules.WAYANG_AGGREGATE_RULE, WayangRules.WAYANG_SORT_RULE); - final RelNode wayangRel = optimizer.optimize( - relNode, - relNode.getTraitSet().plus(WayangConvention.INSTANCE), + final RelNode wayangRel = optimizer.optimize(relNode, relNode.getTraitSet().plus(WayangConvention.INSTANCE), rules); final Collection collector = new ArrayList<>(); - final WayangPlan wayangPlan = Optimizer.convertWithConfig(wayangRel, context.getConfiguration(), - collector); + final WayangPlan wayangPlan = Optimizer.convertWithConfig(wayangRel, context.getConfiguration(), collector); return new Tuple2<>(collector, wayangPlan); } + @Test + void javaFilterWithCastUsingNumbers() throws Exception { + final SqlContext sqlContext = this.createSqlContext("/data/exampleInt.csv"); + final Tuple2, WayangPlan> t = this.buildCollectorAndWayangPlan(sqlContext, + "SELECT * FROM fs.exampleInt WHERE CAST(NAMEB AS DOUBLE) = 1.0"); + final Collection result = t.field0; + final WayangPlan wayangPlan = t.field1; + + // except reduce by + PlanTraversal.upstream().traverse(wayangPlan.getSinks()).getTraversedNodes() + .forEach(node -> node.addTargetPlatform(Java.platform())); + + sqlContext.execute(wayangPlan); + + assertTrue(!result.isEmpty()); + assertTrue(result.stream().allMatch(field -> field.getField(1).equals(1))); + } + + + @Test + void javaFilterWithCast() throws Exception { + final SqlContext sqlContext = this.createSqlContext("/data/largeLeftTableIndex.csv"); + final Tuple2, WayangPlan> t = this.buildCollectorAndWayangPlan(sqlContext, + "SELECT * FROM fs.largeLeftTableIndex WHERE CAST(NAMEB AS VARCHAR) = 'test1'"); + final Collection result = t.field0; + final WayangPlan wayangPlan = t.field1; + + // except reduce by + PlanTraversal.upstream().traverse(wayangPlan.getSinks()).getTraversedNodes() + .forEach(node -> node.addTargetPlatform(Java.platform())); + + sqlContext.execute(wayangPlan); + + assertTrue(!result.isEmpty()); + assertTrue(result.stream().allMatch(field -> field.getField(1).equals("test1"))); + } + @Test void sqlApiSourceTest() throws Exception { final JavaTypeFactoryImpl typeFactory = new JavaTypeFactoryImpl(); @@ -158,10 +182,8 @@ void sqlApiSourceTest() throws Exception { final RelOptCluster cluster = RelOptCluster.create(planner, rexBuilder); final SchemaPlus rootSchema = Frameworks.createRootSchema(true); - final RelDataType rowType = new Builder(typeFactory) - .add("ID", typeFactory.createJavaType(Integer.class)) - .add("NAME", typeFactory.createJavaType(String.class)) - .build(); + final RelDataType rowType = new Builder(typeFactory).add("ID", typeFactory.createJavaType(Integer.class)) + .add("NAME", typeFactory.createJavaType(String.class)).build(); rootSchema.add("T1", new AbstractTable() { @Override @@ -170,11 +192,8 @@ public RelDataType getRowType(final RelDataTypeFactory typeFactory) { } }); - final RelOptSchema relOptSchema = new CalciteCatalogReader( - CalciteSchema.from(rootSchema), - CalciteSchema.from(rootSchema).path(null), - typeFactory, - mock()); + final RelOptSchema relOptSchema = new CalciteCatalogReader(CalciteSchema.from(rootSchema), + CalciteSchema.from(rootSchema).path(null), typeFactory, mock()); final RelOptTable t1 = relOptSchema.getTableForMember(Arrays.asList("T1")); @@ -187,40 +206,34 @@ public RelDataType getRowType(final RelDataTypeFactory typeFactory) { final Properties configProperties = Optimizer.ConfigProperties.getDefaults(); final RelDataTypeFactory relDataTypeFactory = new JavaTypeFactoryImpl(); - final Optimizer optimizer = Optimizer.create( - CalciteSchema.from(rootSchema), - configProperties, + final Optimizer optimizer = Optimizer.create(CalciteSchema.from(rootSchema), configProperties, relDataTypeFactory); final SqlNode validatedSqlNode = optimizer.validate(sqlNode); final RelNode relNode = optimizer.convert(validatedSqlNode); - final RuleSet rules = RuleSets.ofList( - CoreRules.FILTER_INTO_JOIN, - WayangRules.WAYANG_TABLESCAN_RULE, - WayangRules.WAYANG_TABLESCAN_ENUMERABLE_RULE, - WayangRules.WAYANG_PROJECT_RULE, - WayangRules.WAYANG_FILTER_RULE, - WayangRules.WAYANG_JOIN_RULE, - WayangRules.WAYANG_AGGREGATE_RULE, + final RuleSet rules = RuleSets.ofList(CoreRules.FILTER_INTO_JOIN, WayangRules.WAYANG_TABLESCAN_RULE, + WayangRules.WAYANG_TABLESCAN_ENUMERABLE_RULE, WayangRules.WAYANG_PROJECT_RULE, + WayangRules.WAYANG_FILTER_RULE, WayangRules.WAYANG_JOIN_RULE, WayangRules.WAYANG_AGGREGATE_RULE, WayangRules.WAYANG_SORT_RULE); - final RelNode wayangRel = optimizer.optimize( - relNode, - relNode.getTraitSet().plus(WayangConvention.INSTANCE), + final RelNode wayangRel = optimizer.optimize(relNode, relNode.getTraitSet().plus(WayangConvention.INSTANCE), rules); final WayangPlan plan = Optimizer.convert(wayangRel, new ArrayList()); final ProjectionMapping projectionMapping = new ProjectionMapping(); - final PlanTransformation projectionTransformation = projectionMapping.getTransformations().iterator().next().thatReplaces(); + final PlanTransformation projectionTransformation = projectionMapping.getTransformations().iterator().next() + .thatReplaces(); plan.applyTransformations(List.of(projectionTransformation)); final Collection operators = PlanTraversal.upstream().traverse(plan.getSinks()).getTraversedNodes(); - final JdbcTableSource table = operators.stream().filter(op -> op instanceof JdbcTableSource).map(JdbcTableSource.class::cast).findFirst().orElseThrow(); - final JdbcProjectionOperator projection = operators.stream().filter(op -> op instanceof JdbcProjectionOperator).map(JdbcProjectionOperator.class::cast).findFirst().orElseThrow(); + final JdbcTableSource table = operators.stream().filter(op -> op instanceof JdbcTableSource) + .map(JdbcTableSource.class::cast).findFirst().orElseThrow(); + final JdbcProjectionOperator projection = operators.stream().filter(op -> op instanceof JdbcProjectionOperator) + .map(JdbcProjectionOperator.class::cast).findFirst().orElseThrow(); final JdbcExecutor jdbcExecutor = mock(); final StringBuilder query = JdbcExecutor.createSqlString(jdbcExecutor, table, List.of(), projection, List.of()); @@ -242,8 +255,8 @@ void javaJoinTest() throws Exception { sqlContext.execute(wayangPlan); - assertTrue(result.stream() - .anyMatch(rec -> rec.equals(new Record("test1", "test1", "test2", "test1", "test1")))); + assertTrue( + result.stream().anyMatch(rec -> rec.equals(new Record("test1", "test1", "test2", "test1", "test1")))); } @Test @@ -368,8 +381,7 @@ void filterIsNotNull() throws Exception { void javaReduceBy() throws Exception { final SqlContext sqlContext = createSqlContext("/data/largeLeftTableIndex.csv"); - final Tuple2, WayangPlan> t = this.buildCollectorAndWayangPlan( - sqlContext, + final Tuple2, WayangPlan> t = this.buildCollectorAndWayangPlan(sqlContext, "select exampleSmallA.COLA, count(*) from fs.exampleSmallA group by exampleSmallA.COLA"); final Collection result = t.field0; @@ -387,8 +399,7 @@ void javaReduceBy() throws Exception { void javaCrossJoin() throws Exception { final SqlContext sqlContext = createSqlContext("/data/largeLeftTableIndex.csv"); - final Tuple2, WayangPlan> t = this.buildCollectorAndWayangPlan( - sqlContext, + final Tuple2, WayangPlan> t = this.buildCollectorAndWayangPlan(sqlContext, "select * from fs.exampleSmallA cross join fs.exampleSmallB"); final Collection result = t.field0; @@ -396,12 +407,10 @@ void javaCrossJoin() throws Exception { sqlContext.execute(wayangPlan); - final List shouldBe = List.of( - new Record("item1", "item2", "item1", "item2", "item3"), + final List shouldBe = List.of(new Record("item1", "item2", "item1", "item2", "item3"), new Record("item1", "item2", "item1", "item2", "item3"), new Record("item1", "item2", "item1", "item2", "item3"), - new Record("item1", "item2", "item1", "item2", "item3"), - new Record("item1", "item2", "x", "x", "x"), + new Record("item1", "item2", "item1", "item2", "item3"), new Record("item1", "item2", "x", "x", "x"), new Record("item1", "item2", "x", "x", "x")); final Map resultTally = result.stream() @@ -527,8 +536,7 @@ void joinWithLargeLeftTableIndexCorrect() throws Exception { final WayangPlan wayangPlan = t.field1; sqlContext.execute(wayangPlan); - final List shouldBe = List.of( - new Record("test1", "test1", "test2", "test1", "test1", "test2"), + final List shouldBe = List.of(new Record("test1", "test1", "test2", "test1", "test1", "test2"), new Record("test2", "", "test2", "", "test2", "test2"), new Record("", "test2", "test2", "test2", "", "test2")); @@ -558,8 +566,7 @@ void joinWithLargeLeftTableIndexMirrorAlias() throws Exception { final WayangPlan wayangPlan = t.field1; sqlContext.execute(wayangPlan); - final List shouldBe = List.of( - new Record("test1", "test1", "test2", "test1", "test1", "test2"), + final List shouldBe = List.of(new Record("test1", "test1", "test2", "test1", "test1", "test2"), new Record("test2", "", "test2", "", "test2", "test2"), new Record("", "test2", "test2", "test2", "", "test2")); @@ -626,8 +633,7 @@ void sparkInnerJoin() throws Exception { sqlContext.execute(wayangPlan); - final List shouldBe = List.of( - new Record("test1", "test1", "test2", "test1", "test1", "test2"), + final List shouldBe = List.of(new Record("test1", "test1", "test2", "test1", "test1", "test2"), new Record("test2", "", "test2", "", "test2", "test2"), new Record("", "test2", "test2", "test2", "", "test2")); @@ -645,8 +651,7 @@ void serializeProjection() throws Exception { final RelDataTypeFactory typeFactory = rb.getTypeFactory(); final RelDataType intType = typeFactory.createSqlType(SqlTypeName.INTEGER); - final RelDataType rowType = typeFactory.createStructType( - Arrays.asList(intType, intType, intType), + final RelDataType rowType = typeFactory.createStructType(Arrays.asList(intType, intType, intType), Arrays.asList("x", "b", "y")); final RexNode inputRefX = rb.makeInputRef(rowType, 0); @@ -692,8 +697,7 @@ public void serializeFilter() throws Exception { objectOutputStream.writeObject(fpImpl); objectOutputStream.close(); - final ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream( - byteArrayOutputStream.toByteArray()); + final ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(byteArrayOutputStream.toByteArray()); final ObjectInputStream objectInputStream = new ObjectInputStream(byteArrayInputStream); final Object deserializedObject = objectInputStream.readObject(); objectInputStream.close(); @@ -742,10 +746,8 @@ void exampleCustomDelimiter() throws Exception { " \"type\": \"custom\",\r\n" + // " \"factory\": \"org.apache.calcite.adapter.file.FileSchemaFactory\",\r\n" + // " \"operand\": {\r\n" + // - " \"directory\": \"" + "/" + this.getClass().getResource("/data").getPath() - + "\",\r\n" + // - " \"delimiter\": \"|\"" + - " }\r\n" + // + " \"directory\": \"" + "/" + this.getClass().getResource("/data").getPath() + "\",\r\n" + // + " \"delimiter\": \"|\"" + " }\r\n" + // " }\r\n" + // " ]\r\n" + // " }\r\n" + // @@ -753,8 +755,7 @@ void exampleCustomDelimiter() throws Exception { final JsonNode calciteModelJSON = new ObjectMapper().readTree(calciteModel); - final Configuration configuration = new ModelParser(new Configuration(), calciteModelJSON) - .setProperties(); + final Configuration configuration = new ModelParser(new Configuration(), calciteModelJSON).setProperties(); assertNotNull(configuration, "Could not get configuration with calcite model: " + calciteModel); final String tableResourceName = "/data/exampleDelimiter.csv"; @@ -765,13 +766,9 @@ void exampleCustomDelimiter() throws Exception { configuration.setProperty("wayang.fs.table.url", dataPath); - configuration.setProperty( - "wayang.ml.executions.file", - "mle" + ".txt"); + configuration.setProperty("wayang.ml.executions.file", "mle" + ".txt"); - configuration.setProperty( - "wayang.ml.optimizations.file", - "mlo" + ".txt"); + configuration.setProperty("wayang.ml.optimizations.file", "mlo" + ".txt"); configuration.setProperty("wayang.ml.experience.enabled", "false"); @@ -791,28 +788,17 @@ void exampleCustomDelimiter() throws Exception { private SqlContext createSqlContext(final String tableResourceName) throws IOException, ParseException, SQLException { - final String calciteModel = "{\r\n" + - " \"calcite\": {\r\n" + - " \"version\": \"1.0\",\r\n" + - " \"defaultSchema\": \"wayang\",\r\n" + - " \"schemas\": [\r\n" + - " {\r\n" + - " \"name\": \"fs\",\r\n" + - " \"type\": \"custom\",\r\n" + - " \"factory\": \"org.apache.calcite.adapter.file.FileSchemaFactory\",\r\n" + - " \"operand\": {\r\n" + - " \"directory\": \"" + "/" + this.getClass().getResource("/data").getPath() - + "\"\r\n" + - " }\r\n" + - " }\r\n" + - " ]\r\n" + - " }\r\n" + - " }"; + final String calciteModel = "{\r\n" + " \"calcite\": {\r\n" + " \"version\": \"1.0\",\r\n" + + " \"defaultSchema\": \"wayang\",\r\n" + " \"schemas\": [\r\n" + " {\r\n" + + " \"name\": \"fs\",\r\n" + " \"type\": \"custom\",\r\n" + + " \"factory\": \"org.apache.calcite.adapter.file.FileSchemaFactory\",\r\n" + + " \"operand\": {\r\n" + " \"directory\": \"" + "/" + + this.getClass().getResource("/data").getPath() + "\"\r\n" + " }\r\n" + " }\r\n" + + " ]\r\n" + " }\r\n" + " }"; final JsonNode calciteModelJSON = new ObjectMapper().readTree(calciteModel); - final Configuration configuration = new ModelParser(new Configuration(), calciteModelJSON) - .setProperties(); + final Configuration configuration = new ModelParser(new Configuration(), calciteModelJSON).setProperties(); assertNotNull(configuration, "Could not get configuration with calcite model: " + calciteModel); final String dataPath = this.getClass().getResource(tableResourceName).getPath(); @@ -821,13 +807,9 @@ private SqlContext createSqlContext(final String tableResourceName) configuration.setProperty("wayang.fs.table.url", dataPath); - configuration.setProperty( - "wayang.ml.executions.file", - "mle" + ".txt"); + configuration.setProperty("wayang.ml.executions.file", "mle" + ".txt"); - configuration.setProperty( - "wayang.ml.optimizations.file", - "mlo" + ".txt"); + configuration.setProperty("wayang.ml.optimizations.file", "mlo" + ".txt"); configuration.setProperty("wayang.ml.experience.enabled", "false");