Skip to content

Commit 22de260

Browse files
authored
fix data type conversion for spark iotdb table connector
1 parent 4501dbc commit 22de260

2 files changed

Lines changed: 15 additions & 4 deletions

File tree

connectors/spark-iotdb-table-connector/spark-iotdb-table-common/src/main/scala/org/apache/iotdb/spark/table/db/IoTDBUtils.scala

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,13 @@ object IoTDBUtils {
127127
case BinaryType => TSDataType.BLOB
128128
case DateType => TSDataType.DATE
129129
case TimestampType => TSDataType.STRING
130-
case _ => TSDataType.STRING
130+
case _ => {
131+
var errMsg = s"Unable to convert Spark data type $sparkDataType to IoTDB data type."
132+
if (sparkDataType.simpleString.toLowerCase.contains("decimal")) {
133+
errMsg += s"For float numbers in insert into values sql, you should add the suffix 'f' or 'd' to represent float or double."
134+
}
135+
throw new IllegalArgumentException(errMsg)
136+
}
131137
}
132138
}
133139

connectors/spark-iotdb-table-connector/spark-iotdb-table-common/src/main/scala/org/apache/iotdb/spark/table/db/write/IoTDBDataWriter.scala

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ import org.apache.spark.SparkException
2525
import org.apache.spark.internal.Logging
2626
import org.apache.spark.sql.catalyst.InternalRow
2727
import org.apache.spark.sql.connector.write.{DataWriter, WriterCommitMessage}
28-
import org.apache.spark.sql.types.StructType
28+
import org.apache.spark.sql.types.{NullType, StructType}
2929
import org.apache.tsfile.enums.TSDataType
3030
import org.apache.tsfile.write.record.Tablet
3131
import org.apache.tsfile.write.record.Tablet.ColumnCategory
@@ -57,10 +57,15 @@ class IoTDBDataWriter(options: IoTDBOptions, writeSchema: StructType, tableSchem
5757
} else {
5858
tableSchema.fields(i)
5959
}
60-
val columnCategoryStr = tableSchemaMap.getOrElse(fieldInTableSchema.name, tableSchema.fields(i)).metadata.getString(IoTDBUtils.COLUMN_CATEGORY)
60+
val actualColumnSchema = tableSchemaMap.getOrElse(fieldInTableSchema.name, tableSchema.fields(i))
61+
val columnCategoryStr = actualColumnSchema.metadata.getString(IoTDBUtils.COLUMN_CATEGORY)
6162
val columnCategory = IoTDBUtils.getIoTDBColumnCategory(columnCategoryStr)
6263
if (fieldInTableSchema.name != IoTDBUtils.TIME) {
63-
val dataType = writeSchemaField.dataType
64+
val dataType = if (writeSchemaField.dataType == NullType) {
65+
actualColumnSchema.dataType
66+
} else {
67+
writeSchemaField.dataType
68+
}
6469
columnNameList.add(fieldInTableSchema.name)
6570
dataTypeList.add(IoTDBUtils.getIoTDBDataType(dataType))
6671
columnCategoryList.add(columnCategory)

0 commit comments

Comments
 (0)