Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import org.apache.hadoop.fs.FileStatus

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.avro.AvroUtils
import org.apache.spark.sql.connector.write.{LogicalWriteInfo, Write, WriteBuilder}
import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
import org.apache.spark.sql.execution.datasources.FileFormat
import org.apache.spark.sql.execution.datasources.v2.FileTable
import org.apache.spark.sql.types.{DataType, StructType}
Expand All @@ -43,13 +43,14 @@ case class AvroTable(
AvroUtils.inferSchema(sparkSession, options.asScala.toMap, files)

override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = {
new WriteBuilder {
override def build(): Write =
AvroWrite(paths, formatName, supportsDataType, mergedWriteInfo(info))
createFileWriteBuilder(info) {
(mergedInfo, partSchema, customLocs, dynamicOverwrite, truncate) =>
AvroWrite(paths, formatName, supportsDataType, mergedInfo, partSchema, customLocs,
dynamicOverwrite, truncate)
}
}

// Delegates per-column type support checks to AvroUtils so read and write
// paths agree on which Catalyst types the Avro format can represent.
override def supportsDataType(dataType: DataType): Boolean = AvroUtils.supportsDataType(dataType)

override def formatName: String = "AVRO"
override def formatName: String = "Avro"
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,11 @@ case class AvroWrite(
paths: Seq[String],
formatName: String,
supportsDataType: DataType => Boolean,
info: LogicalWriteInfo) extends FileWrite {
info: LogicalWriteInfo,
partitionSchema: StructType,
override val customPartitionLocations: Map[Map[String, String], String] = Map.empty,
override val dynamicPartitionOverwrite: Boolean,
override val isTruncate: Boolean) extends FileWrite {
override def prepareWrite(
sqlConf: SQLConf,
job: Job,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
import org.apache.spark.sql.execution.command._
import org.apache.spark.sql.execution.datasources.{CreateTable => CreateTableV1, LogicalRelation}
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils
import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Utils, FileDataSourceV2, FileTable}
import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
import org.apache.spark.sql.internal.connector.V1Function
import org.apache.spark.sql.types.{DataType, MetadataBuilder, StringType, StructField, StructType}
Expand Down Expand Up @@ -247,7 +247,34 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
constructV1TableCmd(None, c.tableSpec, ident, StructType(fields), c.partitioning,
c.ignoreIfExists, storageFormat, provider)
} else {
c
// File sources: validate data types and create via
// V1 command. Non-file V2 providers keep V2 plan.
DataSourceV2Utils.getTableProvider(
provider, conf) match {
case Some(f: FileDataSourceV2) =>
val ft = f.getTable(
c.tableSchema, c.partitioning.toArray,
new org.apache.spark.sql.util
.CaseInsensitiveStringMap(
java.util.Collections.emptyMap()))
ft match {
case ft: FileTable =>
c.tableSchema.foreach { field =>
if (!ft.supportsDataType(
field.dataType)) {
throw QueryCompilationErrors
.dataTypeUnsupportedByDataSourceError(
ft.formatName, field)
}
}
case _ =>
}
constructV1TableCmd(None, c.tableSpec, ident,
StructType(c.columns.map(_.toV1Column)),
c.partitioning,
c.ignoreIfExists, storageFormat, provider)
case _ => c
}
}

case c @ CreateTableAsSelect(
Expand All @@ -267,7 +294,17 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
constructV1TableCmd(Some(c.query), c.tableSpec, ident, new StructType, c.partitioning,
c.ignoreIfExists, storageFormat, provider)
} else {
c
// File sources: create via V1 command.
// Non-file V2 providers keep V2 plan.
DataSourceV2Utils.getTableProvider(
provider, conf) match {
case Some(_: FileDataSourceV2) =>
constructV1TableCmd(Some(c.query),
c.tableSpec, ident, new StructType,
c.partitioning, c.ignoreIfExists,
storageFormat, provider)
case _ => c
}
}

case RefreshTable(ResolvedV1TableOrViewIdentifier(ident)) =>
Expand All @@ -281,7 +318,16 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
throw QueryCompilationErrors.unsupportedTableOperationError(
ident, "REPLACE TABLE")
} else {
c
// File sources don't support REPLACE TABLE in
// the session catalog (requires StagingTableCatalog).
DataSourceV2Utils.getTableProvider(
provider, conf) match {
case Some(_: FileDataSourceV2) =>
throw QueryCompilationErrors
.unsupportedTableOperationError(
ident, "REPLACE TABLE")
case _ => c
}
}

case c @ ReplaceTableAsSelect(ResolvedV1Identifier(ident), _, _, _, _, _, _) =>
Expand All @@ -290,7 +336,14 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
throw QueryCompilationErrors.unsupportedTableOperationError(
ident, "REPLACE TABLE AS SELECT")
} else {
c
DataSourceV2Utils.getTableProvider(
provider, conf) match {
case Some(_: FileDataSourceV2) =>
throw QueryCompilationErrors
.unsupportedTableOperationError(
ident, "REPLACE TABLE AS SELECT")
case _ => c
}
}

// For CREATE TABLE LIKE, use the v1 command if both the target and source are in the session
Expand Down Expand Up @@ -377,9 +430,35 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
case AnalyzeTables(ResolvedV1Database(db), noScan) =>
AnalyzeTablesCommand(Some(db), noScan)

// V2 FileTable backed by session catalog: route through V1 commands.
// FileTable from V2SessionCatalog.loadTable doesn't match V1 extractors
// (ResolvedV1TableIdentifier, etc.), so we intercept here and delegate
// to V1 commands using the catalogTable metadata.
case AnalyzeTable(
ResolvedTable(catalog, _, ft: FileTable, _),
partitionSpec, noScan)
if supportsV1Command(catalog)
&& ft.catalogTable.isDefined =>
val tableIdent = ft.catalogTable.get.identifier
if (partitionSpec.isEmpty) {
AnalyzeTableCommand(tableIdent, noScan)
} else {
AnalyzePartitionCommand(
tableIdent, partitionSpec, noScan)
}

case AnalyzeColumn(ResolvedV1TableOrViewIdentifier(ident), columnNames, allColumns) =>
AnalyzeColumnCommand(ident, columnNames, allColumns)

case AnalyzeColumn(
ResolvedTable(catalog, _, ft: FileTable, _),
columnNames, allColumns)
if supportsV1Command(catalog)
&& ft.catalogTable.isDefined =>
AnalyzeColumnCommand(
ft.catalogTable.get.identifier,
columnNames, allColumns)

// V2 catalog doesn't support REPAIR TABLE yet, we must use v1 command here.
case RepairTable(
ResolvedV1TableIdentifierInSessionCatalog(ident),
Expand Down Expand Up @@ -418,11 +497,24 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
case TruncateTable(ResolvedV1TableIdentifier(ident)) =>
TruncateTableCommand(ident, None)

case TruncateTable(ResolvedTable(catalog, _, ft: FileTable, _))
if supportsV1Command(catalog)
&& ft.catalogTable.isDefined =>
TruncateTableCommand(ft.catalogTable.get.identifier, None)

case TruncatePartition(ResolvedV1TableIdentifier(ident), partitionSpec) =>
TruncateTableCommand(
ident,
Seq(partitionSpec).asUnresolvedPartitionSpecs.map(_.spec).headOption)

case TruncatePartition(
ResolvedTable(catalog, _, ft: FileTable, _), partitionSpec)
if supportsV1Command(catalog)
&& ft.catalogTable.isDefined =>
TruncateTableCommand(
ft.catalogTable.get.identifier,
Some(partSpecToMap(partitionSpec, ft.partitionSchema())))

case ShowPartitions(
ResolvedV1TableOrViewIdentifier(ident),
pattern @ (None | Some(UnresolvedPartitionSpec(_, _))), output) =>
Expand All @@ -431,6 +523,15 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
output,
pattern.map(_.asInstanceOf[UnresolvedPartitionSpec].spec))

case ShowPartitions(
ResolvedTable(catalog, _, ft: FileTable, _), pattern, output)
if supportsV1Command(catalog)
&& ft.catalogTable.isDefined =>
ShowPartitionsCommand(
ft.catalogTable.get.identifier,
output,
pattern.map(partSpecToMap(_, ft.partitionSchema())))

case ShowColumns(ResolvedViewIdentifier(ident), ns, output) =>
val resolver = conf.resolver
val db = ns match {
Expand Down Expand Up @@ -464,18 +565,52 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
enableDropPartitions = false,
"ALTER TABLE RECOVER PARTITIONS")

case RecoverPartitions(ResolvedTable(catalog, _, ft: FileTable, _))
if supportsV1Command(catalog)
&& ft.catalogTable.isDefined =>
RepairTableCommand(
ft.catalogTable.get.identifier,
enableAddPartitions = true,
enableDropPartitions = false,
"ALTER TABLE RECOVER PARTITIONS")

case AddPartitions(ResolvedV1TableIdentifier(ident), partSpecsAndLocs, ifNotExists) =>
AlterTableAddPartitionCommand(
ident,
partSpecsAndLocs.asUnresolvedPartitionSpecs.map(spec => (spec.spec, spec.location)),
ifNotExists)

case AddPartitions(
ResolvedTable(catalog, _, ft: FileTable, _),
partSpecsAndLocs, ifNotExists)
if supportsV1Command(catalog)
&& ft.catalogTable.isDefined =>
AlterTableAddPartitionCommand(
ft.catalogTable.get.identifier,
partSpecsAndLocs.map { spec =>
(partSpecToMap(spec, ft.partitionSchema()), spec match {
case r: ResolvedPartitionSpec => r.location
case u: UnresolvedPartitionSpec => u.location
})
},
ifNotExists)

case RenamePartitions(
ResolvedV1TableIdentifier(ident),
UnresolvedPartitionSpec(from, _),
UnresolvedPartitionSpec(to, _)) =>
AlterTableRenamePartitionCommand(ident, from, to)

case RenamePartitions(
ResolvedTable(catalog, _, ft: FileTable, _), from, to)
if supportsV1Command(catalog)
&& ft.catalogTable.isDefined =>
val partSchema = ft.partitionSchema()
AlterTableRenamePartitionCommand(
ft.catalogTable.get.identifier,
partSpecToMap(from, partSchema),
partSpecToMap(to, partSchema))

case DropPartitions(
ResolvedV1TableIdentifier(ident), specs, ifExists, purge) =>
AlterTableDropPartitionCommand(
Expand All @@ -485,6 +620,18 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
purge,
retainData = false)

case DropPartitions(
ResolvedTable(catalog, _, ft: FileTable, _),
specs, ifExists, purge)
if supportsV1Command(catalog)
&& ft.catalogTable.isDefined =>
AlterTableDropPartitionCommand(
ft.catalogTable.get.identifier,
specs.map(partSpecToMap(_, ft.partitionSchema())),
ifExists,
purge,
retainData = false)

// V2 catalog doesn't support setting serde properties yet, we must use v1 command here.
case SetTableSerDeProperties(
ResolvedV1TableIdentifierInSessionCatalog(ident),
Expand All @@ -507,6 +654,14 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
location) =>
AlterTableSetLocationCommand(ident, Some(partitionSpec), location)

case SetTableLocation(
ResolvedTable(catalog, _, ft: FileTable, _),
partitionSpec, location)
if supportsV1Command(catalog)
&& ft.catalogTable.isDefined =>
AlterTableSetLocationCommand(
ft.catalogTable.get.identifier, partitionSpec, location)

case AlterViewAs(ResolvedViewIdentifier(ident), originalText, query) =>
AlterViewAsCommand(ident, originalText, query)

Expand Down Expand Up @@ -928,4 +1083,19 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
SQLConf.get.getConf(SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION) == "builtin" ||
catalog.isInstanceOf[CatalogExtension])
}

/**
 * Translates a V2 [[PartitionSpec]] into the V1 representation used by legacy
 * commands: a map from partition column name to its string-rendered value.
 *
 * For a [[ResolvedPartitionSpec]] each value is extracted from the internal row
 * using the column's data type from `partSchema`; a null partition value stays
 * `null` in the resulting map. An [[UnresolvedPartitionSpec]] already carries
 * the V1-style map and is returned as-is.
 */
private def partSpecToMap(
    spec: PartitionSpec,
    partSchema: StructType): Map[String, String] = spec match {
  case resolved: ResolvedPartitionSpec =>
    resolved.names.indices.map { idx =>
      val name = resolved.names(idx)
      val value = resolved.ident.get(idx, partSchema(name).dataType)
      // Preserve null partition values rather than rendering "null".
      name -> Option(value).map(_.toString).orNull
    }.toMap
  case unresolved: UnresolvedPartitionSpec => unresolved.spec
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -194,10 +194,17 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) extends sql.DataFram
if (curmode == SaveMode.Append) {
AppendData.byName(relation, df.logicalPlan, finalOptions)
} else {
// Truncate the table. TableCapabilityCheck will throw a nice exception if this
// isn't supported
OverwriteByExpression.byName(
relation, df.logicalPlan, Literal(true), finalOptions)
val dynamicOverwrite =
df.sparkSession.sessionState.conf.partitionOverwriteMode ==
PartitionOverwriteMode.DYNAMIC &&
partitioningColumns.exists(_.nonEmpty)
if (dynamicOverwrite) {
OverwritePartitionsDynamic.byName(
relation, df.logicalPlan, finalOptions)
} else {
OverwriteByExpression.byName(
relation, df.logicalPlan, Literal(true), finalOptions)
}
}

case createMode =>
Expand Down Expand Up @@ -438,9 +445,17 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) extends sql.DataFram
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._

val session = df.sparkSession
val v2ProviderOpt = lookupV2Provider()
val canUseV2 = v2ProviderOpt.isDefined || (hasCustomSessionCatalog &&
!df.sparkSession.sessionState.catalogManager.catalog(CatalogManager.SESSION_CATALOG_NAME)
// TODO(SPARK-56230): File source V2 does not support
// saveAsTable (Overwrite creates ReplaceTableAsSelect
// which requires StagingTableCatalog).
val v2ProviderOpt = lookupV2Provider().flatMap {
case _: FileDataSourceV2 => None
case other => Some(other)
}
val canUseV2 = v2ProviderOpt.isDefined ||
(hasCustomSessionCatalog &&
!df.sparkSession.sessionState.catalogManager
.catalog(CatalogManager.SESSION_CATALOG_NAME)
.isInstanceOf[CatalogExtension])

session.sessionState.sqlParser.parseMultipartIdentifier(tableName) match {
Expand Down Expand Up @@ -595,8 +610,16 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) extends sql.DataFram

private def lookupV2Provider(): Option[TableProvider] = {
DataSource.lookupDataSourceV2(source, df.sparkSession.sessionState.conf) match {
// TODO(SPARK-28396): File source v2 write path is currently broken.
case Some(_: FileDataSourceV2) => None
// File source V2 supports non-partitioned Append and
// Overwrite via DataFrame API (df.write.save(path)).
// Fall back to V1 for:
// - ErrorIfExists/Ignore (TODO: SPARK-56174)
// - Partitioned writes (TODO: SPARK-56174)
case Some(_: FileDataSourceV2)
if (curmode != SaveMode.Append
&& curmode != SaveMode.Overwrite)
|| partitioningColumns.exists(_.nonEmpty) =>
None
case other => other
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -487,10 +487,10 @@ case class DataSource(
val caseSensitive = conf.caseSensitiveAnalysis
PartitioningUtils.validatePartitionColumn(data.schema, partitionColumns, caseSensitive)

val fileIndex = catalogTable.map(_.identifier).map { tableIdent =>
sparkSession.table(tableIdent).queryExecution.analyzed.collect {
val fileIndex = catalogTable.map(_.identifier).flatMap { tableIdent =>
sparkSession.table(tableIdent).queryExecution.analyzed.collectFirst {
case LogicalRelationWithTable(t: HadoopFsRelation, _) => t.location
}.head
}
}
// For partitioned relation r, r.schema's column ordering can be different from the column
// ordering of data.logicalPlan (partition columns are all moved after data column). This
Expand Down
Loading