diff --git a/connectors/hive-connector/pom.xml b/connectors/hive-connector/pom.xml index c71ebcd..ba5442a 100644 --- a/connectors/hive-connector/pom.xml +++ b/connectors/hive-connector/pom.xml @@ -42,6 +42,7 @@ org.apache.tsfile common ${tsfile.version} + test org.apache.tsfile diff --git a/connectors/pom.xml b/connectors/pom.xml index ad2c299..7742633 100644 --- a/connectors/pom.xml +++ b/connectors/pom.xml @@ -37,6 +37,7 @@ hadoop hive-connector spark-iotdb-connector + spark-iotdb-table-connector spark-tsfile zeppelin-interpreter diff --git a/connectors/spark-iotdb-table-connector/iotdb-table-connector-3.3/pom.xml b/connectors/spark-iotdb-table-connector/iotdb-table-connector-3.3/pom.xml new file mode 100644 index 0000000..2392d53 --- /dev/null +++ b/connectors/spark-iotdb-table-connector/iotdb-table-connector-3.3/pom.xml @@ -0,0 +1,76 @@ + + + + 4.0.0 + + org.apache.iotdb + spark-iotdb-table-connector + 2.0.2-SNAPSHOT + + spark-iotdb-table-connector-3.3 + IoTDB: Table Connector: Apache Spark3.3 (Scala 2.12) + + 3.3.0 + + + + org.scala-lang + scala-library + provided + + + org.apache.iotdb + spark-iotdb-table-common + ${project.version} + + + org.apache.spark + spark-sql_${scala.version} + ${spark.version} + provided + + + + + + org.scala-tools + maven-scala-plugin + + + + compile + testCompile + + + + + + org.apache.maven.plugins + maven-jar-plugin + + + org.apache.maven.plugins + maven-shade-plugin + + + + diff --git a/connectors/spark-iotdb-table-connector/iotdb-table-connector-3.3/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister b/connectors/spark-iotdb-table-connector/iotdb-table-connector-3.3/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister new file mode 100644 index 0000000..1f46523 --- /dev/null +++ b/connectors/spark-iotdb-table-connector/iotdb-table-connector-3.3/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister @@ -0,0 +1,18 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +org.apache.iotdb.spark.table.db.IoTDBTableProvider diff --git a/connectors/spark-iotdb-table-connector/iotdb-table-connector-3.3/src/main/scala/org/apache/iotdb/spark/table/db/IoTDBTableProvider.scala b/connectors/spark-iotdb-table-connector/iotdb-table-connector-3.3/src/main/scala/org/apache/iotdb/spark/table/db/IoTDBTableProvider.scala new file mode 100644 index 0000000..4c98589 --- /dev/null +++ b/connectors/spark-iotdb-table-connector/iotdb-table-connector-3.3/src/main/scala/org/apache/iotdb/spark/table/db/IoTDBTableProvider.scala @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.spark.table.db + +import org.apache.spark.sql.sources.DataSourceRegister + + +class IoTDBTableProvider extends AbstractIoTDBTableProvider with DataSourceRegister { + + override def shortName(): String = "iotdb" +} diff --git a/connectors/spark-iotdb-table-connector/iotdb-table-connector-3.4/pom.xml b/connectors/spark-iotdb-table-connector/iotdb-table-connector-3.4/pom.xml new file mode 100644 index 0000000..4f546cb --- /dev/null +++ b/connectors/spark-iotdb-table-connector/iotdb-table-connector-3.4/pom.xml @@ -0,0 +1,76 @@ + + + + 4.0.0 + + org.apache.iotdb + spark-iotdb-table-connector + 2.0.2-SNAPSHOT + + spark-iotdb-table-connector-3.4 + IoTDB: Table Connector: Apache Spark3.4 (Scala 2.12) + + 3.4.0 + + + + org.scala-lang + scala-library + provided + + + org.apache.iotdb + spark-iotdb-table-common + ${project.version} + + + org.apache.spark + spark-sql_${scala.version} + ${spark.version} + provided + + + + + + org.scala-tools + maven-scala-plugin + + + + compile + testCompile + + + + + + org.apache.maven.plugins + maven-jar-plugin + + + org.apache.maven.plugins + maven-shade-plugin + + + + diff --git a/connectors/spark-iotdb-table-connector/iotdb-table-connector-3.4/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister b/connectors/spark-iotdb-table-connector/iotdb-table-connector-3.4/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister new file mode 100644 index 0000000..1f46523 --- /dev/null +++ b/connectors/spark-iotdb-table-connector/iotdb-table-connector-3.4/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister @@ -0,0 +1,18 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
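+# +# This Java ServiceLoader entry lets Spark resolve format("iotdb"), the short +# name declared by IoTDBTableProvider below, to this connector.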
+ +org.apache.iotdb.spark.table.db.IoTDBTableProvider diff --git a/connectors/spark-iotdb-table-connector/iotdb-table-connector-3.4/src/main/scala/org/apache/iotdb/spark/table/db/IoTDBTableProvider.scala b/connectors/spark-iotdb-table-connector/iotdb-table-connector-3.4/src/main/scala/org/apache/iotdb/spark/table/db/IoTDBTableProvider.scala new file mode 100644 index 0000000..4c98589 --- /dev/null +++ b/connectors/spark-iotdb-table-connector/iotdb-table-connector-3.4/src/main/scala/org/apache/iotdb/spark/table/db/IoTDBTableProvider.scala @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.spark.table.db + +import org.apache.spark.sql.sources.DataSourceRegister + + +class IoTDBTableProvider extends AbstractIoTDBTableProvider with DataSourceRegister { + + override def shortName(): String = "iotdb" +} diff --git a/connectors/spark-iotdb-table-connector/iotdb-table-connector-3.5/pom.xml b/connectors/spark-iotdb-table-connector/iotdb-table-connector-3.5/pom.xml new file mode 100644 index 0000000..3ce7f1d --- /dev/null +++ b/connectors/spark-iotdb-table-connector/iotdb-table-connector-3.5/pom.xml @@ -0,0 +1,76 @@ + + + + 4.0.0 + + org.apache.iotdb + spark-iotdb-table-connector + 2.0.2-SNAPSHOT + + spark-iotdb-table-connector-3.5 + IoTDB: Table Connector: Apache Spark3.5 (Scala 2.12) + + 3.5.0 + + + + org.scala-lang + scala-library + provided + + + org.apache.iotdb + spark-iotdb-table-common + ${project.version} + + + org.apache.spark + spark-sql_${scala.version} + ${spark.version} + provided + + + + + + org.scala-tools + maven-scala-plugin + + + + compile + testCompile + + + + + + org.apache.maven.plugins + maven-jar-plugin + + + org.apache.maven.plugins + maven-shade-plugin + + + + diff --git a/connectors/spark-iotdb-table-connector/iotdb-table-connector-3.5/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister b/connectors/spark-iotdb-table-connector/iotdb-table-connector-3.5/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister new file mode 100644 index 0000000..1f46523 --- /dev/null +++ b/connectors/spark-iotdb-table-connector/iotdb-table-connector-3.5/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister @@ -0,0 +1,18 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +org.apache.iotdb.spark.table.db.IoTDBTableProvider diff --git a/connectors/spark-iotdb-table-connector/iotdb-table-connector-3.5/src/main/scala/org/apache/iotdb/spark/table/db/IoTDBTableProvider.scala b/connectors/spark-iotdb-table-connector/iotdb-table-connector-3.5/src/main/scala/org/apache/iotdb/spark/table/db/IoTDBTableProvider.scala new file mode 100644 index 0000000..4c98589 --- /dev/null +++ b/connectors/spark-iotdb-table-connector/iotdb-table-connector-3.5/src/main/scala/org/apache/iotdb/spark/table/db/IoTDBTableProvider.scala @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.spark.table.db + +import org.apache.spark.sql.sources.DataSourceRegister + + +class IoTDBTableProvider extends AbstractIoTDBTableProvider with DataSourceRegister { + + override def shortName(): String = "iotdb" +} diff --git a/connectors/spark-iotdb-table-connector/pom.xml b/connectors/spark-iotdb-table-connector/pom.xml new file mode 100644 index 0000000..7b7a4b1 --- /dev/null +++ b/connectors/spark-iotdb-table-connector/pom.xml @@ -0,0 +1,152 @@ + + + + 4.0.0 + + org.apache.iotdb + connectors + 2.0.2-SNAPSHOT + + spark-iotdb-table-connector + + pom + IoTDB: Table Connector: Apache Spark + + spark-iotdb-table-common + iotdb-table-connector-3.5 + iotdb-table-connector-3.4 + iotdb-table-connector-3.3 + + + 11 + 11 + UTF-8 + 3.5.0 + + + + + org.apache.spark + spark-catalyst_${scala.version} + ${spark.version} + provided + + + org.apache.spark + spark-sql-api_${scala.version} + ${spark.version} + provided + + + org.apache.spark + spark-common-utils_${scala.version} + 3.5.0 + provided + + + com.fasterxml.jackson.core + jackson-databind + 2.15.0 + + + org.apache.tsfile + common + ${tsfile.version} + compile + + + org.apache.iotdb + iotdb-session + ${iotdb.version} + compile + + + org.apache.tsfile + tsfile + ${tsfile.version} + + + org.apache.iotdb + isession + ${iotdb.version} + + + + + + + + + org.scala-tools + maven-scala-plugin + + ${scala.library.version} + + + + + add-source + compile + + + + + + org.apache.maven.plugins + maven-jar-plugin + + spark-iotdb-table-connector_${spark.version}_${scala.version}-${project.version} + + + + org.apache.maven.plugins + maven-shade-plugin + 3.5.1 + + + + org.apache.thrift + shade.org.apache.thrift + + + true + jar-with-dependencies + false + + + + + shade + + + + + + + + diff --git a/connectors/spark-iotdb-table-connector/spark-iotdb-table-common/pom.xml b/connectors/spark-iotdb-table-connector/spark-iotdb-table-common/pom.xml new file mode 100644 index 0000000..abe34db --- /dev/null +++ b/connectors/spark-iotdb-table-connector/spark-iotdb-table-common/pom.xml @@ -0,0 +1,118 @@ + + + + 4.0.0 + + org.apache.iotdb + spark-iotdb-table-connector + 2.0.2-SNAPSHOT + + spark-iotdb-table-common + IoTDB: Table Connector: Apache Spark Common + + + org.apache.spark + spark-sql-api_${scala.version} + ${spark.version} + provided + + + org.apache.spark + spark-catalyst_${scala.version} + ${spark.version} + provided + + + org.apache.spark + spark-unsafe_${scala.version} + ${spark.version} + provided + + + org.slf4j + slf4j-api + provided + + + org.apache.spark + spark-common-utils_${scala.version} + 3.5.0 + provided + + + org.apache.tsfile + common + + + org.apache.iotdb + iotdb-session + + + org.apache.tsfile + tsfile + + + org.apache.iotdb + isession + + + org.scala-lang + scala-library + provided + + + junit + junit + test + + + org.scalatest + scalatest_2.12 + test + + + org.scalactic + scalactic_2.12 + test + + + + + + org.scala-tools + maven-scala-plugin + + + org.scala-tools + maven-scala-plugin + + + + compile + testCompile + + + + + + + diff --git a/connectors/spark-iotdb-table-connector/spark-iotdb-table-common/src/main/scala/org/apache/iotdb/spark/table/db/AbstractIoTDBTableProvider.scala b/connectors/spark-iotdb-table-connector/spark-iotdb-table-common/src/main/scala/org/apache/iotdb/spark/table/db/AbstractIoTDBTableProvider.scala new file mode 100644 index 0000000..1434e7d --- /dev/null +++ 
b/connectors/spark-iotdb-table-connector/spark-iotdb-table-common/src/main/scala/org/apache/iotdb/spark/table/db/AbstractIoTDBTableProvider.scala @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.spark.table.db + +import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableProvider} +import org.apache.spark.sql.connector.expressions.Transform +import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.util.CaseInsensitiveStringMap + +import java.util +import scala.collection.JavaConverters.mapAsScalaMapConverter + +/** + * AbstractIoTDBTableProvider is the base Spark DataSource V2 provider for IoTDB. + * It supports schema inference and table access; the version-specific connector + * modules subclass it to register the "iotdb" short name. + */ +abstract class AbstractIoTDBTableProvider extends TableProvider { + + override def inferSchema(caseInsensitiveStringMap: CaseInsensitiveStringMap): StructType = { + IoTDBUtils.getSchema(IoTDBOptions.fromMap(caseInsensitiveStringMap.asCaseSensitiveMap().asScala.toMap)) + } + + override def getTable(structType: StructType, transforms: Array[Transform], map: util.Map[String, String]): Table = { + val db = map.get(IoTDBOptions.IOTDB_DATABASE) + val table = map.get(IoTDBOptions.IOTDB_TABLE) + new IoTDBTable(Identifier.of(Array[String](db), table), structType, IoTDBOptions.fromMap(map.asScala.toMap)) + } +} diff --git a/connectors/spark-iotdb-table-connector/spark-iotdb-table-common/src/main/scala/org/apache/iotdb/spark/table/db/IoTDBOptions.scala b/connectors/spark-iotdb-table-connector/spark-iotdb-table-common/src/main/scala/org/apache/iotdb/spark/table/db/IoTDBOptions.scala new file mode 100644 index 0000000..e03fef3 --- /dev/null +++ b/connectors/spark-iotdb-table-connector/spark-iotdb-table-common/src/main/scala/org/apache/iotdb/spark/table/db/IoTDBOptions.scala @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +package org.apache.iotdb.spark.table.db + +import scala.collection.JavaConverters.seqAsJavaListConverter + +class IoTDBOptions( + @transient private val properties: Map[String, String]) + extends Serializable { + + val urls = properties.getOrElse(IoTDBOptions.IOTDB_URLS, sys.error(s"Option '${IoTDBOptions.IOTDB_URLS}' not specified")).split(",").toList.asJava + + val username = properties.getOrElse(IoTDBOptions.IOTDB_USERNAME, "root") + + val password = properties.getOrElse(IoTDBOptions.IOTDB_PASSWORD, "root") + + val database = properties.getOrElse(IoTDBOptions.IOTDB_DATABASE, sys.error(s"Option '${IoTDBOptions.IOTDB_DATABASE}' not specified")) + + val table = properties.getOrElse(IoTDBOptions.IOTDB_TABLE, sys.error(s"Option '${IoTDBOptions.IOTDB_TABLE}' not specified")) + +} + +object IoTDBOptions { + val IOTDB_USERNAME = "iotdb.username" + val IOTDB_PASSWORD = "iotdb.password" + val IOTDB_URLS = "iotdb.urls" + val IOTDB_DATABASE = "iotdb.database" + val IOTDB_TABLE = "iotdb.table" + + def fromMap(sparkMap: Map[String, String]): IoTDBOptions = { + new IoTDBOptions(sparkMap.map { case (k, v) => (k.toLowerCase, v) }) + } +} \ No newline at end of file diff --git a/connectors/spark-iotdb-table-connector/spark-iotdb-table-common/src/main/scala/org/apache/iotdb/spark/table/db/IoTDBTable.scala b/connectors/spark-iotdb-table-connector/spark-iotdb-table-common/src/main/scala/org/apache/iotdb/spark/table/db/IoTDBTable.scala new file mode 100644 index 0000000..6139e0b --- /dev/null +++ b/connectors/spark-iotdb-table-connector/spark-iotdb-table-common/src/main/scala/org/apache/iotdb/spark/table/db/IoTDBTable.scala @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.spark.table.db + +import org.apache.iotdb.spark.table.db.read.IoTDBScanBuilder +import org.apache.iotdb.spark.table.db.write.IoTDBWriteBuilder +import org.apache.spark.sql.connector.catalog.{Identifier, SupportsRead, SupportsWrite, Table, TableCapability} +import org.apache.spark.sql.connector.read.ScanBuilder +import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder} +import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.util.CaseInsensitiveStringMap + +import java.util +import scala.collection.JavaConverters.{mapAsScalaMapConverter, setAsJavaSetConverter} +import scala.language.implicitConversions + +/** + * Represents an IoTDB table in Spark, supporting read and write operations. + * + * @param identifier The unique identifier of the table. + * @param schema The schema of the table. + * @param iotdbOptions Configuration options for IoTDB.
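+ * + * Illustrative read path (hypothetical option values; the keys are the ones defined in [[IoTDBOptions]]): + * {{{ + * spark.read.format("iotdb") + * .option("iotdb.urls", "127.0.0.1:6667") + * .option("iotdb.database", "test_db") + * .option("iotdb.table", "sensors") + * .load() + * }}}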
+ */ +class IoTDBTable(identifier: Identifier, schema: StructType, iotdbOptions: IoTDBOptions) extends Table with SupportsRead with SupportsWrite { + + override def name(): String = identifier.toString + + override def schema(): StructType = schema + + override def capabilities(): util.Set[TableCapability] = { + Set(TableCapability.BATCH_READ, + TableCapability.BATCH_WRITE, + TableCapability.ACCEPT_ANY_SCHEMA).asJava + } + + override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = new IoTDBScanBuilder(IoTDBOptions.fromMap(options.asCaseSensitiveMap().asScala.toMap), schema()) + + override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = { + val incomingSchema = info.schema() + if (incomingSchema.fields.length > schema.fields.length) { + throw new IllegalArgumentException( + s"The incoming schema has more fields (${incomingSchema.fields.length}) than the table schema (${schema.fields.length})." + ) + } + new IoTDBWriteBuilder(iotdbOptions, incomingSchema, schema) + } +} diff --git a/connectors/spark-iotdb-table-connector/spark-iotdb-table-common/src/main/scala/org/apache/iotdb/spark/table/db/IoTDBUtils.scala b/connectors/spark-iotdb-table-connector/spark-iotdb-table-common/src/main/scala/org/apache/iotdb/spark/table/db/IoTDBUtils.scala new file mode 100644 index 0000000..97ea02e --- /dev/null +++ b/connectors/spark-iotdb-table-connector/spark-iotdb-table-common/src/main/scala/org/apache/iotdb/spark/table/db/IoTDBUtils.scala @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.spark.table.db + +import org.apache.iotdb.isession.SessionDataSet +import org.apache.iotdb.session.TableSessionBuilder +import org.apache.spark.SparkException +import org.apache.spark.sql.catalyst.util.DateTimeUtils +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.UTF8String +import org.apache.tsfile.enums.TSDataType +import org.apache.tsfile.read.common.RowRecord +import org.apache.tsfile.utils.{Binary, DateUtils} +import org.apache.tsfile.write.record.Tablet.ColumnCategory + +import java.util + +object IoTDBUtils { + + val TIME = "time" + val COLUMN_CATEGORY = "category" + + /** + * Retrieves the schema of an IoTDB table. + * + * @param options IoTDB options. + * @return The schema as a Spark `StructType`.
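+ * + * @note The schema is obtained by running `DESC <table>` over a short-lived session: IoTDB types are mapped by `getSparkDataType`, the `time` column is marked non-nullable, and each column's TAG/ATTRIBUTE/FIELD category is recorded in the field metadata under the `category` key.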
+ */ + def getSchema(options: IoTDBOptions): StructType = { + val session = new TableSessionBuilder() + .username(options.username) + .password(options.password) + .nodeUrls(options.urls) + .database(options.database) + .build() + val structFields = new util.ArrayList[StructField]() + var dataSet: SessionDataSet = null + try { + dataSet = session.executeQueryStatement(s"DESC ${options.table}") + while (dataSet.hasNext) { + val row: RowRecord = dataSet.next() + val columnName = row.getField(0).getStringValue + val dataType = row.getField(1).getStringValue + val columnType = row.getField(2).getStringValue + structFields.add(StructField(columnName, getSparkDataType(dataType), nullable = !TIME.equals(columnName), metadata = new MetadataBuilder().putString(COLUMN_CATEGORY, columnType).build())) + } + } catch { + case e: Exception => throw SparkException.internalError(s"Failed to get schema of table ${options.table}.", e) + } finally { + if (dataSet != null) { + dataSet.close() + } + session.close() + } + new StructType(structFields.toArray(Array[StructField]())) + } + + private def getSparkDataType(iotdbDataTypeStr: String): DataType = { + iotdbDataTypeStr.toUpperCase match { + case "BOOLEAN" => BooleanType + case "INT32" => IntegerType + case "DATE" => DateType + case "INT64" => LongType + case "TIMESTAMP" => LongType + case "FLOAT" => FloatType + case "DOUBLE" => DoubleType + case "TEXT" => StringType + case "BLOB" => BinaryType + case "STRING" => StringType + case _ => StringType + } + } + + def getSparkValue(sparkDataType: DataType, dataSetIterator: SessionDataSet#DataIterator, columnIdx: Int): Any = { + sparkDataType match { + case BooleanType => dataSetIterator.getBoolean(columnIdx) + case IntegerType => dataSetIterator.getInt(columnIdx) + case DateType => DateTimeUtils.fromJavaDate(DateUtils.parseIntToDate(dataSetIterator.getInt(columnIdx))) + case LongType => dataSetIterator.getLong(columnIdx) + case FloatType => dataSetIterator.getFloat(columnIdx) + case DoubleType => dataSetIterator.getDouble(columnIdx) + case StringType => UTF8String.fromString(dataSetIterator.getString(columnIdx)) + case BinaryType => getByteArrayFromHexString(dataSetIterator.getString(columnIdx)) + case TimestampType => dataSetIterator.getLong(columnIdx) + } + } + + private def getByteArrayFromHexString(value: String): Array[Byte] = { + // Return early on empty input so that substring(2) below is safe. + if (value.isEmpty) { + return new Array[Byte](0) + } + require(value.length % 2 == 0, "The length of the hex string must be even.") + value.substring(2).sliding(2, 2).map(Integer.parseInt(_, 16).toByte).toArray + } + + def getIoTDBHexStringFromByteArray(value: Array[Byte]): String = { + s"X'${value.map(b => f"$b%02X").mkString("")}'" + } + + def getIoTDBDataType(sparkDataType: DataType): TSDataType = { + sparkDataType match { + case BooleanType => TSDataType.BOOLEAN + case ByteType => TSDataType.INT32 + case ShortType => TSDataType.INT32 + case IntegerType => TSDataType.INT32 + case LongType => TSDataType.INT64 + case FloatType => TSDataType.FLOAT + case DoubleType => TSDataType.DOUBLE + case StringType => TSDataType.STRING + case BinaryType => TSDataType.BLOB + case DateType => TSDataType.DATE + case TimestampType => TSDataType.STRING + case _ => TSDataType.STRING + } + } + + def getIoTDBValue(sparkDataType: DataType, value: Any): Any = { + sparkDataType match { + case BooleanType => value.asInstanceOf[Boolean] + case ByteType => value.asInstanceOf[Byte].toInt + case ShortType => value.asInstanceOf[Short].toInt + case IntegerType => value.asInstanceOf[Int] + case LongType =>
value.asInstanceOf[Long] + case FloatType => value.asInstanceOf[Float] + case DoubleType => value.asInstanceOf[Double] + case StringType => value.asInstanceOf[UTF8String].toString + case BinaryType => new Binary(value.asInstanceOf[Array[Byte]]) + case DateType => DateTimeUtils.toJavaDate(value.asInstanceOf[Integer]).toLocalDate + case TimestampType => DateTimeUtils.toJavaTimestamp(value.asInstanceOf[Long]).toString + case _ => value.toString + } + } + + def getIoTDBColumnCategory(columnCategoryStr: String): ColumnCategory = { + columnCategoryStr.toUpperCase match { + case "TAG" => ColumnCategory.TAG + case "ATTRIBUTE" => ColumnCategory.ATTRIBUTE + case _ => ColumnCategory.FIELD + } + } + + def getIoTDBColumnIdentifierInSQL(sparkColumnIdentifier: String, isSparkNamedReference: Boolean): String = { + var str = sparkColumnIdentifier + if (isSparkNamedReference) { + str = sparkColumnIdentifier.replaceAll("``", "`") + if (str.startsWith("`") && str.endsWith("`")) { + str = str.substring(1, str.length - 1) + } + } + str = str.replaceAll("\"", "\"\"") + s""""$str"""" + } + +} diff --git a/connectors/spark-iotdb-table-connector/spark-iotdb-table-common/src/main/scala/org/apache/iotdb/spark/table/db/read/IoTDBExpressionSQLBuilder.scala b/connectors/spark-iotdb-table-connector/spark-iotdb-table-common/src/main/scala/org/apache/iotdb/spark/table/db/read/IoTDBExpressionSQLBuilder.scala new file mode 100644 index 0000000..554ae6f --- /dev/null +++ b/connectors/spark-iotdb-table-connector/spark-iotdb-table-common/src/main/scala/org/apache/iotdb/spark/table/db/read/IoTDBExpressionSQLBuilder.scala @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.spark.table.db.read + +import org.apache.iotdb.spark.table.db.IoTDBUtils +import org.apache.spark.sql.catalyst.util.DateTimeUtils +import org.apache.spark.sql.connector.expressions.filter.{And, Not, Or, Predicate} +import org.apache.spark.sql.connector.expressions.{Expression, GeneralScalarExpression, Literal, NamedReference} +import org.apache.spark.sql.types._ + +class IoTDBExpressionSQLBuilder { + + def build(predicate: Predicate): String = { + s"(${buildIoTDBExpressionSQL(predicate)})" + } + + private def buildIoTDBExpressionSQL(expression: Expression): String = { + expression match { + case literal: Literal[_] => visitLiteral(literal) + case namedReference: NamedReference => visitNamedReference(namedReference) + case expr: GeneralScalarExpression => visitGeneralScalarExpression(expr) + case _ => throw new UnsupportedOperationException("Unsupported push down expression: " + expression) + } + } + + private def visitLiteral(literal: Literal[_]): String = { + literal.dataType() match { + case StringType => s"'${literal.value().toString.replace("'", "''")}'" + case BinaryType => IoTDBUtils.getIoTDBHexStringFromByteArray(literal.value().asInstanceOf[Array[Byte]]) + case DateType => s"CAST('${DateTimeUtils.toJavaDate(Integer.parseInt(literal.value().toString))}' as DATE)" + case ShortType | IntegerType | ByteType | LongType | BooleanType | FloatType | DoubleType => literal.value().toString + case _ => throw new UnsupportedOperationException("Unsupported push down literal type: " + literal.dataType()) + } + } + + private def visitNamedReference(namedRef: NamedReference): String = { + IoTDBUtils.getIoTDBColumnIdentifierInSQL(namedRef.toString, true) + } + + private def visitAlwaysFalse(): String = { + "FALSE" + } + + private def visitAlwaysTrue(): String = { + "TRUE" + } + + private def visitOr(or: Or): String = { + s"(${buildIoTDBExpressionSQL(or.left())}) OR (${buildIoTDBExpressionSQL(or.right())})" + } + + private def visitAnd(and: And): String = { + s"(${buildIoTDBExpressionSQL(and.left())}) AND (${buildIoTDBExpressionSQL(and.right())})" + } + + private def visitNot(not: Not): String = { + s"NOT (${buildIoTDBExpressionSQL(not.child())})" + } + + private def visitGeneralScalarExpression(expr: GeneralScalarExpression): String = { + // <=> is unsupported + expr.name() match { + case "IS_NULL" => visitIsNull(expr) + case "IS_NOT_NULL" => visitIsNotNull(expr) + case "STARTS_WITH" => visitStartsWith(expr) + case "ENDS_WITH" => visitEndsWith(expr) + case "CONTAINS" => visitContains(expr) + case "IN" => visitIn(expr) + case "=" => visitEqualTo(expr) + case "<>" => visitNotEqualTo(expr) + case "<" => visitLess(expr) + case "<=" => visitLessOrEqual(expr) + case ">" => visitGreater(expr) + case ">=" => visitGreaterOrEqual(expr) + case "AND" => visitAnd(expr.asInstanceOf[And]) + case "OR" => visitOr(expr.asInstanceOf[Or]) + case "NOT" => visitNot(expr.asInstanceOf[Not]) + case "ALWAYS_TRUE" => visitAlwaysTrue() + case "ALWAYS_FALSE" => visitAlwaysFalse() + case _ => throw new UnsupportedOperationException("Unsupported push down expression: " + expr) + } + } + + private def visitIsNull(expr: Expression): String = { + s"${buildIoTDBExpressionSQL(expr.children()(0))} IS NULL" + } + + private def visitIsNotNull(expr: Expression): String = { + s"${buildIoTDBExpressionSQL(expr.children()(0))} IS NOT NULL" + } + + private def visitStartsWith(expr: Expression): String = { + val leftExpr = buildIoTDBExpressionSQL(expr.children()(0)) + val rightExpr = 
buildIoTDBExpressionSQL(expr.children()(1)) + s"starts_with(${leftExpr}, ${rightExpr})" + } + + private def visitEndsWith(expr: Expression): String = { + val leftExpr = buildIoTDBExpressionSQL(expr.children()(0)) + val rightExpr = buildIoTDBExpressionSQL(expr.children()(1)) + s"ends_with(${leftExpr}, ${rightExpr})" + } + + private def visitContains(expr: Expression): String = { + if (expr.children()(1).isInstanceOf[NamedReference]) { + throw new UnsupportedOperationException("Unsupported push down expression: contains non constant string") + } + val leftExpr = buildIoTDBExpressionSQL(expr.children()(0)) + val rightExpr = buildIoTDBExpressionSQL(expr.children()(1)) + s"$leftExpr LIKE '%${rightExpr.substring(1, rightExpr.length - 1)}%'" + } + + private def visitIn(expr: Expression): String = { + val expressions = expr.children() + val leftExpr = buildIoTDBExpressionSQL(expressions(0)) + val rightExpr = expressions.slice(1, expressions.length).map(buildIoTDBExpressionSQL).mkString(",") + s"$leftExpr IN ($rightExpr)" + } + + private def visitEqualTo(expr: Expression): String = { + s"${buildIoTDBExpressionSQL(expr.children()(0))} = ${buildIoTDBExpressionSQL(expr.children()(1))}" + } + + private def visitNotEqualTo(expr: Expression): String = { + s"${buildIoTDBExpressionSQL(expr.children()(0))} != ${buildIoTDBExpressionSQL(expr.children()(1))}" + } + + private def visitLess(expr: Expression): String = { + s"${buildIoTDBExpressionSQL(expr.children()(0))} < ${buildIoTDBExpressionSQL(expr.children()(1))}" + } + + private def visitLessOrEqual(expr: Expression): String = { + s"${buildIoTDBExpressionSQL(expr.children()(0))} <= ${buildIoTDBExpressionSQL(expr.children()(1))}" + } + + private def visitGreater(expr: Expression): String = { + s"${buildIoTDBExpressionSQL(expr.children()(0))} > ${buildIoTDBExpressionSQL(expr.children()(1))}" + } + + private def visitGreaterOrEqual(expr: Expression): String = { + s"${buildIoTDBExpressionSQL(expr.children()(0))} >= ${buildIoTDBExpressionSQL(expr.children()(1))}" + } + +} diff --git a/connectors/spark-iotdb-table-connector/spark-iotdb-table-common/src/main/scala/org/apache/iotdb/spark/table/db/read/IoTDBInputPartition.scala b/connectors/spark-iotdb-table-connector/spark-iotdb-table-common/src/main/scala/org/apache/iotdb/spark/table/db/read/IoTDBInputPartition.scala new file mode 100644 index 0000000..4d45b9c --- /dev/null +++ b/connectors/spark-iotdb-table-connector/spark-iotdb-table-common/src/main/scala/org/apache/iotdb/spark/table/db/read/IoTDBInputPartition.scala @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.spark.table.db.read + +import org.apache.spark.sql.connector.read.InputPartition + +class IoTDBInputPartition(sql: String) extends InputPartition { + + def getSQL: String = sql +} diff --git a/connectors/spark-iotdb-table-connector/spark-iotdb-table-common/src/main/scala/org/apache/iotdb/spark/table/db/read/IoTDBPartitionReader.scala b/connectors/spark-iotdb-table-connector/spark-iotdb-table-common/src/main/scala/org/apache/iotdb/spark/table/db/read/IoTDBPartitionReader.scala new file mode 100644 index 0000000..4497586 --- /dev/null +++ b/connectors/spark-iotdb-table-connector/spark-iotdb-table-common/src/main/scala/org/apache/iotdb/spark/table/db/read/IoTDBPartitionReader.scala @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.spark.table.db.read + +import org.apache.iotdb.isession.ITableSession +import org.apache.iotdb.session.TableSessionBuilder +import org.apache.iotdb.spark.table.db.{IoTDBOptions, IoTDBUtils} +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.GenericInternalRow +import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader} +import org.apache.spark.sql.types._ + +/** + * IoTDBPartitionReader is responsible for reading data from IoTDB and converting it into Spark's InternalRow format. + * + * @param inputPartition The partition containing query information. + * @param schema The schema of the resulting data. + * @param options IoTDB connection and query options.
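+ * + * @note The session is created lazily on first access and released in `close()`. IoTDB result columns are 1-based while Spark fields are 0-based, hence the `i + 1` indexing in `get()`.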
+ */ +class IoTDBPartitionReader(inputPartition: InputPartition, schema: StructType, options: IoTDBOptions) extends PartitionReader[InternalRow] with Logging { + + private lazy val session: ITableSession = { + new TableSessionBuilder() + .username(options.username) + .password(options.password) + .nodeUrls(options.urls) + .database(options.database) + .build() + } + + private lazy val dataSetIterator = session.executeQueryStatement(inputPartition.asInstanceOf[IoTDBInputPartition].getSQL).iterator() + + override def next(): Boolean = dataSetIterator.next() + + override def get(): InternalRow = { + val row = new GenericInternalRow(schema.length) + for (i <- 0 until schema.length) { + if (dataSetIterator.isNull(i + 1)) { + row.setNullAt(i) + } else { + val dataType = schema.fields(i).dataType + row.update(i, IoTDBUtils.getSparkValue(dataType, dataSetIterator, i + 1)) + } + } + row + } + + override def close(): Unit = { + try { + if (session != null) { + session.close() + } + } catch { + case e: Exception => + logError(s"Error closing IoTDB session: ${e.getMessage}") + } + } +} diff --git a/connectors/spark-iotdb-table-connector/spark-iotdb-table-common/src/main/scala/org/apache/iotdb/spark/table/db/read/IoTDBPartitionReaderFactory.scala b/connectors/spark-iotdb-table-connector/spark-iotdb-table-common/src/main/scala/org/apache/iotdb/spark/table/db/read/IoTDBPartitionReaderFactory.scala new file mode 100644 index 0000000..46c2179 --- /dev/null +++ b/connectors/spark-iotdb-table-connector/spark-iotdb-table-common/src/main/scala/org/apache/iotdb/spark/table/db/read/IoTDBPartitionReaderFactory.scala @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.spark.table.db.read + +import org.apache.iotdb.spark.table.db.IoTDBOptions +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, PartitionReaderFactory} +import org.apache.spark.sql.types.StructType + +class IoTDBPartitionReaderFactory(schema: StructType, options: IoTDBOptions) extends PartitionReaderFactory { + + override def createReader(partition: InputPartition): PartitionReader[InternalRow] = { + new IoTDBPartitionReader(partition, schema, options) + } +} diff --git a/connectors/spark-iotdb-table-connector/spark-iotdb-table-common/src/main/scala/org/apache/iotdb/spark/table/db/read/IoTDBScan.scala b/connectors/spark-iotdb-table-connector/spark-iotdb-table-common/src/main/scala/org/apache/iotdb/spark/table/db/read/IoTDBScan.scala new file mode 100644 index 0000000..cd480d6 --- /dev/null +++ b/connectors/spark-iotdb-table-connector/spark-iotdb-table-common/src/main/scala/org/apache/iotdb/spark/table/db/read/IoTDBScan.scala @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +package org.apache.iotdb.spark.table.db.read + +import org.apache.iotdb.spark.table.db.{IoTDBOptions, IoTDBUtils} +import org.apache.spark.internal.Logging +import org.apache.spark.sql.connector.read.{Batch, InputPartition, PartitionReaderFactory, Scan} +import org.apache.spark.sql.types.StructType + +import scala.language.postfixOps + +class IoTDBScan(options: IoTDBOptions, requiredSchema: StructType, pushedFilters: Array[String], pushDownOffset: Int, pushDownLimit: Int) extends Scan with Batch with Logging { + + override def readSchema(): StructType = requiredSchema + + override def toBatch: Batch = this + + override def planInputPartitions(): Array[InputPartition] = { + val sql = buildSQL() + logDebug(s"SQL: $sql") + Array(new IoTDBInputPartition(sql)) + } + + private def buildSQL(): String = { + val columnList = getColumns() + val sqlBuilder = new StringBuilder(s"SELECT $columnList FROM ${options.table}") + + if (pushedFilters.nonEmpty) sqlBuilder.append(s" WHERE ${pushedFilters.mkString(" AND ")}") + if (pushDownOffset > 0) sqlBuilder.append(s" OFFSET $pushDownOffset") + if (pushDownLimit > 0) sqlBuilder.append(s" LIMIT $pushDownLimit") + + sqlBuilder.toString() + } + + private def getColumns(): String = { + requiredSchema.fieldNames.map(name => IoTDBUtils.getIoTDBColumnIdentifierInSQL(name, false)).mkString(", ") + } + + override def createReaderFactory(): PartitionReaderFactory = new IoTDBPartitionReaderFactory(requiredSchema, options) +} diff --git a/connectors/spark-iotdb-table-connector/spark-iotdb-table-common/src/main/scala/org/apache/iotdb/spark/table/db/read/IoTDBScanBuilder.scala b/connectors/spark-iotdb-table-connector/spark-iotdb-table-common/src/main/scala/org/apache/iotdb/spark/table/db/read/IoTDBScanBuilder.scala new file mode 100644 index 0000000..2c5d161 --- /dev/null +++ b/connectors/spark-iotdb-table-connector/spark-iotdb-table-common/src/main/scala/org/apache/iotdb/spark/table/db/read/IoTDBScanBuilder.scala @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.spark.table.db.read + +import org.apache.iotdb.spark.table.db.IoTDBOptions +import org.apache.spark.internal.Logging +import org.apache.spark.sql.connector.expressions.filter.Predicate +import org.apache.spark.sql.connector.read._ +import org.apache.spark.sql.types.StructType + +import java.util + +/** + * IoTDBScanBuilder is responsible for constructing an IoTDBScan with + * support for predicate push-down, column pruning, offset, and limit. + * + * @param options The IoTDB connection and query options. + * @param schema The full schema of the table.
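+ * + * For example (hypothetical table and column names), a pruned and filtered scan compiles into a single IoTDB statement of the shape: + * {{{ + * SELECT "time", "s1" FROM sensors WHERE ("s1" > 10) OFFSET 5 LIMIT 10 + * }}}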
+ */ +class IoTDBScanBuilder(options: IoTDBOptions, schema: StructType) extends ScanBuilder + with SupportsPushDownRequiredColumns + with SupportsPushDownV2Filters + with SupportsPushDownOffset + with SupportsPushDownLimit + with Logging { + + private var supportedFilters: Array[Predicate] = Array.empty + private var pushDownFilterStrings: Array[String] = Array.empty + private var requiredColumns: StructType = schema + private var pushDownOffset: Int = -1 + private var pushDownLimit: Int = -1 + + override def build(): Scan = { + new IoTDBScan(options, requiredColumns, pushDownFilterStrings, pushDownOffset, pushDownLimit) + } + + override def pruneColumns(requiredSchema: StructType): Unit = { + if (requiredSchema.nonEmpty) { + val fields = schema.fields.filter( + field => requiredSchema.fieldNames.contains(field.name) + ) + requiredColumns = StructType(fields) + } else { + requiredColumns = schema + } + } + + override def pushOffset(offset: Int): Boolean = { + pushDownOffset = offset + true + } + + override def pushLimit(limit: Int): Boolean = { + pushDownLimit = limit + true + } + + override def pushPredicates(predicates: Array[Predicate]): Array[Predicate] = { + val compiledPredicates = new util.ArrayList[String]() + val builder = new IoTDBExpressionSQLBuilder + val (supported, unsupported) = predicates.partition(predicate => { + try { + val sql = builder.build(predicate) + compiledPredicates.add(sql) + true + } catch { + case e: Exception => { + logDebug(s"Predicate push-down failed for: $predicate, reason: ${e.getMessage}") + false + } + } + }) + pushDownFilterStrings = compiledPredicates.toArray(new Array[String](compiledPredicates.size())) + supportedFilters = supported + unsupported + } + + override def pushedPredicates(): Array[Predicate] = { + supportedFilters + } + +} diff --git a/connectors/spark-iotdb-table-connector/spark-iotdb-table-common/src/main/scala/org/apache/iotdb/spark/table/db/write/IoTDBDataWriter.scala b/connectors/spark-iotdb-table-connector/spark-iotdb-table-common/src/main/scala/org/apache/iotdb/spark/table/db/write/IoTDBDataWriter.scala new file mode 100644 index 0000000..91f921d --- /dev/null +++ b/connectors/spark-iotdb-table-connector/spark-iotdb-table-common/src/main/scala/org/apache/iotdb/spark/table/db/write/IoTDBDataWriter.scala @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.spark.table.db.write + +import org.apache.iotdb.session.TableSessionBuilder +import org.apache.iotdb.spark.table.db.{IoTDBOptions, IoTDBUtils} +import org.apache.spark.SparkException +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.connector.write.{DataWriter, WriterCommitMessage} +import org.apache.spark.sql.types.StructType +import org.apache.tsfile.enums.TSDataType +import org.apache.tsfile.write.record.Tablet +import org.apache.tsfile.write.record.Tablet.ColumnCategory + +class IoTDBDataWriter(options: IoTDBOptions, writeSchema: StructType, tableSchema: StructType) extends DataWriter[InternalRow] with Logging { + + private lazy val session = + new TableSessionBuilder() + .username(options.username) + .password(options.password) + .database(options.database) + .nodeUrls(options.urls) + .build() + + private val tableSchemaMap = tableSchema.fields.map(f => f.name -> f).toMap + + private val isWriteSchemaValid = writeSchema.fields.forall(f => tableSchemaMap.contains(f.name)) + + private lazy val tablet = { + val tableName = options.table + val columnNameList = new java.util.ArrayList[String]() + val dataTypeList = new java.util.ArrayList[TSDataType]() + val columnCategoryList = new java.util.ArrayList[ColumnCategory]() + + for (i <- writeSchema.indices) { + val writeSchemaField = writeSchema.fields(i) + val fieldInTableSchema = if (isWriteSchemaValid) { + writeSchema.fields(i) + } else { + tableSchema.fields(i) + } + val columnCategoryStr = tableSchemaMap.getOrElse(fieldInTableSchema.name, tableSchema.fields(i)).metadata.getString(IoTDBUtils.COLUMN_CATEGORY) + val columnCategory = IoTDBUtils.getIoTDBColumnCategory(columnCategoryStr) + if (fieldInTableSchema.name != IoTDBUtils.TIME) { + val dataType = writeSchemaField.dataType + columnNameList.add(fieldInTableSchema.name) + dataTypeList.add(IoTDBUtils.getIoTDBDataType(dataType)) + columnCategoryList.add(columnCategory) + } + } + new Tablet(tableName, columnNameList, dataTypeList, columnCategoryList) + } + + override def write(record: InternalRow): Unit = { + if (tablet.getRowSize == tablet.getMaxRowNumber) { + writeTabletToIoTDB() + } + val currentRow = tablet.getRowSize + try { + for (i <- writeSchema.fields.indices) { + if (!record.isNullAt(i)) { + val column = if (isWriteSchemaValid) { + writeSchema.fields(i).name + } else { + tableSchema.fields(i).name + } + val dataType = writeSchema.fields(i).dataType + val value = IoTDBUtils.getIoTDBValue(dataType, record.get(i, dataType)) + if (column == IoTDBUtils.TIME) { + tablet.addTimestamp(currentRow, value.asInstanceOf[Long]) + } else { + tablet.addValue(column, currentRow, value) + } + } + } + } catch { + case e: Exception => + throw SparkException.internalError("Error writing data to Tablet", e) + } + } + + override def commit(): WriterCommitMessage = { + if (tablet.getRowSize > 0) { + writeTabletToIoTDB() + } + new IoTDBWriterCommitMessage() + } + + private def writeTabletToIoTDB(): Unit = { + try { + session.insert(tablet) + tablet.reset() + } catch { + case e: Exception => + throw SparkException.internalError("Error writing tablet to IoTDB", e) + } + } + + override def abort(): Unit = {} + + override def close(): Unit = { + if (session != null) { + try { + session.close() + } catch { + case e: Exception => + logError(s"Error closing IoTDB session: ${e.getMessage}") + } + } + } +} + +class IoTDBWriterCommitMessage extends WriterCommitMessage {} diff --git 
a/connectors/spark-iotdb-table-connector/spark-iotdb-table-common/src/main/scala/org/apache/iotdb/spark/table/db/write/IoTDBWrite.scala b/connectors/spark-iotdb-table-connector/spark-iotdb-table-common/src/main/scala/org/apache/iotdb/spark/table/db/write/IoTDBWrite.scala new file mode 100644 index 0000000..efa5f8b --- /dev/null +++ b/connectors/spark-iotdb-table-connector/spark-iotdb-table-common/src/main/scala/org/apache/iotdb/spark/table/db/write/IoTDBWrite.scala @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.spark.table.db.write + +import org.apache.iotdb.spark.table.db.IoTDBOptions +import org.apache.spark.sql.connector.write._ +import org.apache.spark.sql.types.StructType + +class IoTDBWrite(options: IoTDBOptions, writeSchema: StructType, tableSchema: StructType) extends Write with BatchWrite { + + override def toBatch: BatchWrite = this + + override def createBatchWriterFactory(info: PhysicalWriteInfo): DataWriterFactory = new IoTDBWriteFactory(options, writeSchema, tableSchema) + + override def commit(messages: Array[WriterCommitMessage]): Unit = {} + + override def abort(messages: Array[WriterCommitMessage]): Unit = {} +} diff --git a/connectors/spark-iotdb-table-connector/spark-iotdb-table-common/src/main/scala/org/apache/iotdb/spark/table/db/write/IoTDBWriteBuilder.scala b/connectors/spark-iotdb-table-connector/spark-iotdb-table-common/src/main/scala/org/apache/iotdb/spark/table/db/write/IoTDBWriteBuilder.scala new file mode 100644 index 0000000..4ea34dd --- /dev/null +++ b/connectors/spark-iotdb-table-connector/spark-iotdb-table-common/src/main/scala/org/apache/iotdb/spark/table/db/write/IoTDBWriteBuilder.scala @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.spark.table.db.write + +import org.apache.iotdb.spark.table.db.IoTDBOptions +import org.apache.spark.sql.connector.write.{Write, WriteBuilder} +import org.apache.spark.sql.types.StructType + +class IoTDBWriteBuilder(options: IoTDBOptions, writeSchema: StructType, tableSchema: StructType) extends WriteBuilder { + override def build(): Write = new IoTDBWrite(options, writeSchema, tableSchema) +} diff --git a/connectors/spark-iotdb-table-connector/spark-iotdb-table-common/src/main/scala/org/apache/iotdb/spark/table/db/write/IoTDBWriteFactory.scala b/connectors/spark-iotdb-table-connector/spark-iotdb-table-common/src/main/scala/org/apache/iotdb/spark/table/db/write/IoTDBWriteFactory.scala new file mode 100644 index 0000000..8219206 --- /dev/null +++ b/connectors/spark-iotdb-table-connector/spark-iotdb-table-common/src/main/scala/org/apache/iotdb/spark/table/db/write/IoTDBWriteFactory.scala @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.spark.table.db.write + +import org.apache.iotdb.spark.table.db.IoTDBOptions +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.connector.write.{DataWriter, DataWriterFactory} +import org.apache.spark.sql.types.StructType + +class IoTDBWriteFactory(options: IoTDBOptions, writeSchema: StructType, tableSchema: StructType) extends DataWriterFactory { + + override def createWriter(partitionId: Int, taskId: Long): DataWriter[InternalRow] = { + new IoTDBDataWriter(options, writeSchema, tableSchema) + } +} diff --git a/connectors/spark-iotdb-table-connector/spark-iotdb-table-common/src/test/scala/org/apache/iotdb/spark/table/db/UtilsTest.scala b/connectors/spark-iotdb-table-connector/spark-iotdb-table-common/src/test/scala/org/apache/iotdb/spark/table/db/UtilsTest.scala new file mode 100644 index 0000000..1ab390a --- /dev/null +++ b/connectors/spark-iotdb-table-connector/spark-iotdb-table-common/src/test/scala/org/apache/iotdb/spark/table/db/UtilsTest.scala @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.spark.table.db + +import org.junit.Assert +import org.scalatest.FunSuite + + +class UtilsTest extends FunSuite { + test("testConvertIdentifier") { + var str = IoTDBUtils.getIoTDBColumnIdentifierInSQL("tag1", false) + Assert.assertEquals("\"tag1\"", str) + str = IoTDBUtils.getIoTDBColumnIdentifierInSQL("`ta``g1`", true) + Assert.assertEquals("\"ta`g1\"", str) + str = IoTDBUtils.getIoTDBColumnIdentifierInSQL("`ta\"g1`", true) + Assert.assertEquals("\"ta\"\"g1\"", str) + } + +} diff --git a/connectors/spark-iotdb-table-connector/spark-iotdb-table-common/src/test/scala/org/apache/iotdb/spark/table/db/read/PushDownPredicateSQLBuilderTest.scala b/connectors/spark-iotdb-table-connector/spark-iotdb-table-common/src/test/scala/org/apache/iotdb/spark/table/db/read/PushDownPredicateSQLBuilderTest.scala new file mode 100644 index 0000000..7ebdf11 --- /dev/null +++ b/connectors/spark-iotdb-table-connector/spark-iotdb-table-common/src/test/scala/org/apache/iotdb/spark/table/db/read/PushDownPredicateSQLBuilderTest.scala @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.spark.table.db.read + +import org.apache.spark.sql.connector.expressions.Expressions +import org.apache.spark.sql.connector.expressions.filter.{AlwaysFalse, AlwaysTrue, And, Not, Or, Predicate} +import org.apache.spark.sql.sources.EqualTo +import org.junit.Assert +import org.scalatest.FunSuite + +import java.sql.Date + +class PushDownPredicateSQLBuilderTest extends FunSuite { + private val builder = new IoTDBExpressionSQLBuilder + test("testBuildIoTDBSQL") { + Assert.assertEquals("(\"s1\" IS NULL)", builder.build(new Predicate("IS_NULL", Array(Expressions.column("s1"))))) + Assert.assertEquals("(\"s`1\" IS NULL)", builder.build(new Predicate("IS_NULL", Array(Expressions.column("`s``1`"))))) + Assert.assertEquals("(\"s\"\"1\" IS NULL)", builder.build(new Predicate("IS_NULL", Array(Expressions.column("`s\"1`"))))) + + Assert.assertEquals("(\"s1\" IS NOT NULL)", builder.build(new Predicate("IS_NOT_NULL", Array(Expressions.column("s1"))))) + + Assert.assertEquals("(ends_with(\"s1\", \"s2\"))", builder.build(new Predicate("ENDS_WITH", Array(Expressions.column("s1"), Expressions.column("s2"))))) + Assert.assertEquals("(ends_with(\"s1\", 'value1'))", builder.build(new Predicate("ENDS_WITH", Array(Expressions.column("s1"), Expressions.literal("value1"))))) + Assert.assertEquals("(ends_with(\"s1\", 'va''lue1'))", builder.build(new Predicate("ENDS_WITH", Array(Expressions.column("s1"), Expressions.literal("va'lue1"))))) + + Assert.assertEquals("(starts_with(\"s1\", \"s2\"))", builder.build(new Predicate("STARTS_WITH", Array(Expressions.column("s1"), Expressions.column("s2"))))) + Assert.assertEquals("(starts_with(\"s1\", 'value1'))", builder.build(new Predicate("STARTS_WITH", Array(Expressions.column("s1"), Expressions.literal("value1"))))) + Assert.assertEquals("(starts_with(\"s1\", 'va''lue1'))", builder.build(new Predicate("STARTS_WITH", Array(Expressions.column("s1"), Expressions.literal("va'lue1"))))) + + Assert.assertThrows(classOf[UnsupportedOperationException], () => builder.build(new Predicate("CONTAINS", Array(Expressions.column("s1"), Expressions.column("s2"))))) + Assert.assertEquals("(\"s1\" LIKE '%value1%')", builder.build(new Predicate("CONTAINS", Array(Expressions.column("s1"), Expressions.literal("value1"))))) + Assert.assertEquals("(\"s1\" LIKE '%va''lue1%')", builder.build(new Predicate("CONTAINS", Array(Expressions.column("s1"), Expressions.literal("va'lue1"))))) + + Assert.assertEquals("(\"s1\" IN (1,2,3))", builder.build(new Predicate("IN", Array(Expressions.column("s1"), Expressions.literal(1), Expressions.literal(2), Expressions.literal(3))))) + Assert.assertEquals("(\"s1\" IN ('value1','value2','val''ue3'))", builder.build(new Predicate("IN", Array(Expressions.column("s1"), Expressions.literal("value1"), Expressions.literal("value2"), Expressions.literal("val\'ue3"))))) + + Assert.assertEquals("(\"s1\" = 1)", builder.build(new Predicate("=", Array(Expressions.column("s1"), Expressions.literal(1))))) + Assert.assertEquals("(\"s1\" = 1)", builder.build(new Predicate("=", Array(Expressions.column("s1"), Expressions.literal(1.toShort))))) + Assert.assertEquals("(\"s1\" = 1)", builder.build(new Predicate("=", Array(Expressions.column("s1"), Expressions.literal(1.toByte))))) + Assert.assertEquals("(\"s1\" = 'val''ue1')", builder.build(new Predicate("=", Array(Expressions.column("s1"), Expressions.literal("val'ue1"))))) + Assert.assertEquals("(\"s1\" = X'010101')", builder.build(new Predicate("=", Array(Expressions.column("s1"), 
Expressions.literal(Array(1.toByte, 1.toByte, 1.toByte)))))) + // If you encounter an error on JDK 17, add '--add-opens=java.base/sun.util.calendar=ALL-UNNAMED' to the VM options + Assert.assertEquals("(\"s1\" = CAST('2025-01-01' as DATE))", builder.build(EqualTo("s1", Date.valueOf("2025-01-01")).toV2)) + + Assert.assertEquals("(\"s1\" != 1)", builder.build(new Predicate("<>", Array(Expressions.column("s1"), Expressions.literal(1))))) + Assert.assertEquals("(\"s1\" < 1)", builder.build(new Predicate("<", Array(Expressions.column("s1"), Expressions.literal(1))))) + Assert.assertEquals("(\"s1\" <= 1)", builder.build(new Predicate("<=", Array(Expressions.column("s1"), Expressions.literal(1))))) + Assert.assertEquals("(\"s1\" > 1)", builder.build(new Predicate(">", Array(Expressions.column("s1"), Expressions.literal(1))))) + Assert.assertEquals("(\"s1\" >= 1)", builder.build(new Predicate(">=", Array(Expressions.column("s1"), Expressions.literal(1))))) + Assert.assertThrows(classOf[UnsupportedOperationException], () => builder.build(new Predicate("<=>", Array(Expressions.column("s1"), Expressions.literal(1))))) + + Assert.assertEquals("((\"time\" = 1) AND (\"s1\" = 1))", builder.build(new And(new Predicate("=", Array(Expressions.column("time"), Expressions.literal(1L))), new Predicate("=", Array(Expressions.column("s1"), Expressions.literal(1)))))) + Assert.assertEquals("((\"time\" = 1) OR (\"s1\" = 1))", builder.build(new Or(new Predicate("=", Array(Expressions.column("time"), Expressions.literal(1L))), new Predicate("=", Array(Expressions.column("s1"), Expressions.literal(1)))))) + Assert.assertEquals("(NOT (\"s1\" = 1))", builder.build(new Not(new Predicate("=", Array(Expressions.column("s1"), Expressions.literal(1)))))) + Assert.assertEquals("(true)", builder.build(new AlwaysTrue)) + Assert.assertEquals("(false)", builder.build(new AlwaysFalse)) + } + +} diff --git a/examples/pom.xml index 2935b50..f743e7b 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -37,6 +37,7 @@ pulsar rabbitmq rocketmq + spark-table diff --git a/examples/spark-table/README.md new file mode 100644 index 0000000..b986b66 --- /dev/null +++ b/examples/spark-table/README.md @@ -0,0 +1,103 @@ +
# IoTDB-Table-Spark-Connector Example
## Introduction
This example demonstrates how to use the IoTDB-Table-Spark-Connector to read data from and write data to IoTDB in Spark.
## Version
* Scala 2.12
* Spark 3.3 or later
## Usage
Import the IoTDB-Table-Spark-Connector dependency in your project, choosing the artifact that matches your Spark version (`spark-iotdb-table-connector-3.3`, `-3.4`, or `-3.5`):
```
<dependency>
    <groupId>org.apache.iotdb</groupId>
    <artifactId>spark-iotdb-table-connector-3.5</artifactId>
</dependency>
```
## Options
| Key | Default Value | Comment | Required |
|----------------|----------------|---------|----------|
| iotdb.database | -- | The IoTDB database name; the database must already exist in IoTDB | true |
| iotdb.table | -- | The IoTDB table name; the table must already exist in IoTDB | true |
| iotdb.username | root | The username used to access IoTDB | false |
| iotdb.password | root | The password used to access IoTDB | false |
| iotdb.urls | 127.0.0.1:6667 | The URLs the client uses to connect to the DataNode RPC service. If there are multiple URLs, separate them with ',' | false |
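For a cluster with several DataNodes, `iotdb.urls` accepts a comma-separated list. A minimal sketch (the host addresses below are placeholders for your own DataNode RPC endpoints):
```scala
// Hypothetical endpoints; any reachable DataNode in the list can serve the connection.
val df = spark.read.format("org.apache.iotdb.spark.table.db.IoTDBTableProvider")
  .option("iotdb.database", "test")
  .option("iotdb.table", "table1")
  .option("iotdb.urls", "192.168.0.1:6667,192.168.0.2:6667")
  .load()
```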
## Read
### DataFrame
```scala
val df = spark.read.format("org.apache.iotdb.spark.table.db.IoTDBTableProvider")
  .option("iotdb.database", "$YOUR_IOTDB_DATABASE_NAME")
  .option("iotdb.table", "$YOUR_IOTDB_TABLE_NAME")
  .option("iotdb.username", "$YOUR_IOTDB_USERNAME")
  .option("iotdb.password", "$YOUR_IOTDB_PASSWORD")
  .option("iotdb.urls", "$YOUR_IOTDB_URL")
  .load()
```
### Spark SQL
```
CREATE TEMPORARY VIEW spark_iotdb
   USING org.apache.iotdb.spark.table.db.IoTDBTableProvider
   OPTIONS(
   "iotdb.database"="$YOUR_IOTDB_DATABASE_NAME",
   "iotdb.table"="$YOUR_IOTDB_TABLE_NAME",
   "iotdb.username"="$YOUR_IOTDB_USERNAME",
   "iotdb.password"="$YOUR_IOTDB_PASSWORD",
   "iotdb.urls"="$YOUR_IOTDB_URL"
);

SELECT * FROM spark_iotdb;
```

## Write
### DataFrame
```scala
val df = spark.createDataFrame(List(
    (1L, "tag1_value1", "tag2_value1", "attribute1_value1", 1, true),
    (1L, "tag1_value1", "tag2_value2", "attribute1_value1", 2, false)))
  .toDF("time", "tag1", "tag2", "attribute1", "s1", "s2")

df
  .write
  .format("org.apache.iotdb.spark.table.db.IoTDBTableProvider")
  .option("iotdb.database", "$YOUR_IOTDB_DATABASE_NAME")
  .option("iotdb.table", "$YOUR_IOTDB_TABLE_NAME")
  .option("iotdb.username", "$YOUR_IOTDB_USERNAME")
  .option("iotdb.password", "$YOUR_IOTDB_PASSWORD")
  .option("iotdb.urls", "$YOUR_IOTDB_URL")
  .mode("append")
  .save()
```
### Spark SQL
```
CREATE TEMPORARY VIEW spark_iotdb
   USING org.apache.iotdb.spark.table.db.IoTDBTableProvider
   OPTIONS(
   "iotdb.database"="$YOUR_IOTDB_DATABASE_NAME",
   "iotdb.table"="$YOUR_IOTDB_TABLE_NAME",
   "iotdb.username"="$YOUR_IOTDB_USERNAME",
   "iotdb.password"="$YOUR_IOTDB_PASSWORD",
   "iotdb.urls"="$YOUR_IOTDB_URL"
);

INSERT INTO spark_iotdb VALUES ("VALUE1", "VALUE2", ...);
INSERT INTO spark_iotdb SELECT * FROM YOUR_TABLE;
```
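## Filter Pushdown
The connector translates the Spark predicates it supports (comparisons, `IN`, `IS [NOT] NULL`, `AND`/`OR`/`NOT`, and `starts_with`/`ends_with`/`contains` against a string literal; see `PushDownPredicateSQLBuilderTest`) into IoTDB SQL, so those filters are evaluated by IoTDB during the scan, while unsupported predicates fall back to Spark. A minimal sketch reusing the DataFrame `df` from the Read section (the column names and values are placeholders):
```scala
// Both conjuncts are translatable, so IoTDB evaluates them server-side
// and only the matching rows are shipped back to Spark.
df.filter("time > 1735689600000 AND tag1 = 'tag1_value1'").show()
```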
\ No newline at end of file diff --git a/examples/spark-table/pom.xml new file mode 100644 index 0000000..762f318 --- /dev/null +++ b/examples/spark-table/pom.xml @@ -0,0 +1,72 @@ + + + + 4.0.0 + + org.apache.iotdb + examples + 2.0.2-SNAPSHOT + + table-spark-connector-example + IoTDB: Example: IoTDB Table Spark Connector + + + org.apache.iotdb + spark-iotdb-table-connector-3.5 + 2.0.2-SNAPSHOT + + + org.apache.spark + spark-sql_2.12 + 3.5.0 + provided + + + com.thoughtworks.paranamer + paranamer + 2.8 + + + org.scala-lang + scala-library + provided + + + + + + org.scala-tools + maven-scala-plugin + + ${scala.version} + + + + + add-source + + + + + + + diff --git a/examples/spark-table/src/main/scala/org/apache/iotdb/spark/table/SparkConnectorReadExample.scala b/examples/spark-table/src/main/scala/org/apache/iotdb/spark/table/SparkConnectorReadExample.scala new file mode 100644 index 0000000..89a9f5f --- /dev/null +++ b/examples/spark-table/src/main/scala/org/apache/iotdb/spark/table/SparkConnectorReadExample.scala @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.spark.table + +import org.apache.spark.sql.SparkSession + +object SparkConnectorReadExample { + + def main(args: Array[String]): Unit = { + val spark = SparkSession.builder() + .appName("IoTDB Spark Demo") + .config("spark.sql.shuffle.partitions", "1") + .config("spark.master", "local[*]") + .getOrCreate() + val df = spark.read.format("org.apache.iotdb.spark.table.db.IoTDBTableProvider") + .option("iotdb.database", "test") + .option("iotdb.table", "table1") + .option("iotdb.username", "root") + .option("iotdb.password", "root") + .option("iotdb.urls", "127.0.0.1:6667") + .load() + df.createTempView("iotdb_table1") + df.printSchema() + spark.sql("select * from iotdb_table1").show() + spark.close() + } +} diff --git a/examples/spark-table/src/main/scala/org/apache/iotdb/spark/table/SparkConnectorSQLExample.scala b/examples/spark-table/src/main/scala/org/apache/iotdb/spark/table/SparkConnectorSQLExample.scala new file mode 100644 index 0000000..fc9249f --- /dev/null +++ b/examples/spark-table/src/main/scala/org/apache/iotdb/spark/table/SparkConnectorSQLExample.scala @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.spark.table + +import org.apache.spark.sql.SparkSession + +object SparkConnectorSQLExample { + def main(args: Array[String]): Unit = { + val spark = SparkSession.builder() + .appName("IoTDB Spark Demo") + .config("spark.sql.shuffle.partitions", "1") + .config("spark.master", "local[*]") + .getOrCreate() + spark.sql( + """ + CREATE TEMPORARY VIEW spark_iotdb1 + USING org.apache.iotdb.spark.table.db.IoTDBTableProvider + OPTIONS( + "iotdb.database"="test", + "iotdb.table"="table1", + "iotdb.username"="root", + "iotdb.password"="root", + "iotdb.urls"="127.0.0.1:6667"); + """) + spark.sql( + """ + CREATE TEMPORARY VIEW spark_iotdb2 + USING org.apache.iotdb.spark.table.db.IoTDBTableProvider + OPTIONS( + "iotdb.database"="test", + "iotdb.table"="table2", + "iotdb.username"="root", + "iotdb.password"="root", + "iotdb.urls"="127.0.0.1:6667"); + """) + spark.sql("select * from spark_iotdb1").show + // Copy selected columns from table1 into table2 through the connector. + spark.sql("insert into spark_iotdb2 select time, tag1, s0, s1 from spark_iotdb1") + // Read table2 back to verify the write. + spark.sql("select * from spark_iotdb2").show + spark.close() + } +} diff --git a/examples/spark-table/src/main/scala/org/apache/iotdb/spark/table/SparkConnectorWriteExample.scala b/examples/spark-table/src/main/scala/org/apache/iotdb/spark/table/SparkConnectorWriteExample.scala new file mode 100644 index 0000000..7e3deb6 --- /dev/null +++ b/examples/spark-table/src/main/scala/org/apache/iotdb/spark/table/SparkConnectorWriteExample.scala @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.spark.table + +import org.apache.spark.sql.SparkSession + +object SparkConnectorWriteExample { + def main(args: Array[String]): Unit = { + val spark = SparkSession.builder() + .appName("IoTDB Spark Demo") + .config("spark.sql.shuffle.partitions", "1") + .config("spark.master", "local[*]") + .getOrCreate() + // Target table schema: time, tag1 (string TAG), tag2 (string TAG), s0 (int32 FIELD), s1 (boolean FIELD) + val df = spark.createDataFrame(List( + (1L, "tag1_value1", "tag2_value1", 1, false), + (1L, "tag1_value1", "tag2_value1", 1, true), + (2L, "tag1_value2", "tag2_value1", 2, true))) + .toDF("time", "tag1", "tag2", "s0", "s1") + + df + .write + .format("org.apache.iotdb.spark.table.db.IoTDBTableProvider") + .option("iotdb.database", "test") + .option("iotdb.table", "spark_table1") + .option("iotdb.username", "root") + .option("iotdb.password", "root") + .option("iotdb.urls", "127.0.0.1:6667") + .mode("append") + .save() + spark.close() + } +} diff --git a/iotdb-collector/collector-core/pom.xml index e8e6b5b..7b68e68 100644 --- a/iotdb-collector/collector-core/pom.xml +++ b/iotdb-collector/collector-core/pom.xml @@ -103,6 +103,10 @@ com.lmax disruptor + + com.google.code.findbugs + jsr305 + diff --git a/pom.xml index f3e32df..3620804 100644 --- a/pom.xml +++ b/pom.xml @@ -693,6 +693,16 @@ scalatest_2.11 ${scalatest.version} + + org.scalatest + scalatest_2.12 + ${scalatest.version} + + + org.scalactic + scalactic_2.12 + 3.0.9 + org.scalactic scalactic_2.11