Skip to content

Commit 677a482

Browse files
committed
[SPARK-56171][SQL] Enable V2 file write path for non-partitioned DataFrame API writes and delete FallBackFileSourceV2
Key changes: - FileWrite: added partitionSchema, customPartitionLocations, dynamicPartitionOverwrite, isTruncate; path creation and truncate logic; dynamic partition overwrite via FileCommitProtocol - FileTable: createFileWriteBuilder with SupportsDynamicOverwrite and SupportsTruncate; capabilities now include TRUNCATE and OVERWRITE_DYNAMIC; fileIndex skips file existence checks when userSpecifiedSchema is provided (write path) - All file format writes (Parquet, ORC, CSV, JSON, Text, Avro) use createFileWriteBuilder with partition/truncate/overwrite support - DataFrameWriter.lookupV2Provider: enabled FileDataSourceV2 for non-partitioned Append and Overwrite via df.write.save(path) - DataFrameWriter.insertInto: V1 fallback for file sources (TODO: SPARK-56175) - DataFrameWriter.saveAsTable: V1 fallback for file sources (TODO: SPARK-56230, needs StagingTableCatalog) - DataSourceV2Utils.getTableProvider: V1 fallback for file sources (TODO: SPARK-56175) - Removed the FallBackFileSourceV2 rule - V2SessionCatalog.createTable: added V1 FileFormat data type validation
1 parent 91e21ee commit 677a482

24 files changed

Lines changed: 806 additions & 319 deletions

File tree

connector/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroTable.scala

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ import org.apache.hadoop.fs.FileStatus
2222

2323
import org.apache.spark.sql.SparkSession
2424
import org.apache.spark.sql.avro.AvroUtils
25-
import org.apache.spark.sql.connector.write.{LogicalWriteInfo, Write, WriteBuilder}
25+
import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
2626
import org.apache.spark.sql.execution.datasources.FileFormat
2727
import org.apache.spark.sql.execution.datasources.v2.FileTable
2828
import org.apache.spark.sql.types.{DataType, StructType}
@@ -43,13 +43,14 @@ case class AvroTable(
4343
AvroUtils.inferSchema(sparkSession, options.asScala.toMap, files)
4444

4545
override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = {
46-
new WriteBuilder {
47-
override def build(): Write =
48-
AvroWrite(paths, formatName, supportsDataType, mergedWriteInfo(info))
46+
createFileWriteBuilder(info) {
47+
(mergedInfo, partSchema, customLocs, dynamicOverwrite, truncate) =>
48+
AvroWrite(paths, formatName, supportsDataType, mergedInfo, partSchema, customLocs,
49+
dynamicOverwrite, truncate)
4950
}
5051
}
5152

5253
override def supportsDataType(dataType: DataType): Boolean = AvroUtils.supportsDataType(dataType)
5354

54-
override def formatName: String = "AVRO"
55+
override def formatName: String = "Avro"
5556
}

connector/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroWrite.scala

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,11 @@ case class AvroWrite(
2929
paths: Seq[String],
3030
formatName: String,
3131
supportsDataType: DataType => Boolean,
32-
info: LogicalWriteInfo) extends FileWrite {
32+
info: LogicalWriteInfo,
33+
partitionSchema: StructType,
34+
override val customPartitionLocations: Map[Map[String, String], String] = Map.empty,
35+
override val dynamicPartitionOverwrite: Boolean,
36+
override val isTruncate: Boolean) extends FileWrite {
3337
override def prepareWrite(
3438
sqlConf: SQLConf,
3539
job: Job,

sql/core/src/main/scala/org/apache/spark/sql/classic/DataFrameWriter.scala

Lines changed: 38 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -194,10 +194,17 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) extends sql.DataFram
194194
if (curmode == SaveMode.Append) {
195195
AppendData.byName(relation, df.logicalPlan, finalOptions)
196196
} else {
197-
// Truncate the table. TableCapabilityCheck will throw a nice exception if this
198-
// isn't supported
199-
OverwriteByExpression.byName(
200-
relation, df.logicalPlan, Literal(true), finalOptions)
197+
val dynamicOverwrite =
198+
df.sparkSession.sessionState.conf.partitionOverwriteMode ==
199+
PartitionOverwriteMode.DYNAMIC &&
200+
partitioningColumns.exists(_.nonEmpty)
201+
if (dynamicOverwrite) {
202+
OverwritePartitionsDynamic.byName(
203+
relation, df.logicalPlan, finalOptions)
204+
} else {
205+
OverwriteByExpression.byName(
206+
relation, df.logicalPlan, Literal(true), finalOptions)
207+
}
201208
}
202209

203210
case createMode =>
@@ -318,7 +325,13 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) extends sql.DataFram
318325
}
319326

320327
val session = df.sparkSession
321-
val canUseV2 = lookupV2Provider().isDefined
328+
// TODO(SPARK-56175): File source V2 does not support
329+
// insertInto for catalog tables yet.
330+
val canUseV2 = lookupV2Provider() match {
331+
case Some(_: FileDataSourceV2) => false
332+
case Some(_) => true
333+
case None => false
334+
}
322335

323336
session.sessionState.sqlParser.parseMultipartIdentifier(tableName) match {
324337
case NonSessionCatalogAndIdentifier(catalog, ident) =>
@@ -438,9 +451,16 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) extends sql.DataFram
438451
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
439452

440453
val session = df.sparkSession
441-
val v2ProviderOpt = lookupV2Provider()
442-
val canUseV2 = v2ProviderOpt.isDefined || (hasCustomSessionCatalog &&
443-
!df.sparkSession.sessionState.catalogManager.catalog(CatalogManager.SESSION_CATALOG_NAME)
454+
// TODO(SPARK-56230): File source V2 does not support
455+
// saveAsTable yet. Always use V1 for file sources.
456+
val v2ProviderOpt = lookupV2Provider().flatMap {
457+
case _: FileDataSourceV2 => None
458+
case other => Some(other)
459+
}
460+
val canUseV2 = v2ProviderOpt.isDefined ||
461+
(hasCustomSessionCatalog &&
462+
!df.sparkSession.sessionState.catalogManager
463+
.catalog(CatalogManager.SESSION_CATALOG_NAME)
444464
.isInstanceOf[CatalogExtension])
445465

446466
session.sessionState.sqlParser.parseMultipartIdentifier(tableName) match {
@@ -595,8 +615,16 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) extends sql.DataFram
595615

596616
private def lookupV2Provider(): Option[TableProvider] = {
597617
DataSource.lookupDataSourceV2(source, df.sparkSession.sessionState.conf) match {
598-
// TODO(SPARK-28396): File source v2 write path is currently broken.
599-
case Some(_: FileDataSourceV2) => None
618+
// File source V2 supports non-partitioned Append and
619+
// Overwrite via DataFrame API (df.write.save(path)).
620+
// Fall back to V1 for:
621+
// - ErrorIfExists/Ignore (TODO: SPARK-56174)
622+
// - Partitioned writes (TODO: SPARK-56174)
623+
case Some(_: FileDataSourceV2)
624+
if (curmode != SaveMode.Append
625+
&& curmode != SaveMode.Overwrite)
626+
|| partitioningColumns.exists(_.nonEmpty) =>
627+
None
600628
case other => other
601629
}
602630
}

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FallBackFileSourceV2.scala

Lines changed: 0 additions & 49 deletions
This file was deleted.

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatDataWriter.scala

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package org.apache.spark.sql.execution.datasources
1818

1919
import scala.collection.mutable
20+
import scala.jdk.CollectionConverters._
2021

2122
import org.apache.hadoop.fs.{FileAlreadyExistsException, Path}
2223
import org.apache.hadoop.mapreduce.TaskAttemptContext
@@ -104,6 +105,14 @@ abstract class FileFormatDataWriter(
104105
}
105106
}
106107

108+
/**
109+
* Override writeAll to ensure V2 DataWriter.writeAll path also wraps
110+
* errors with TASK_WRITE_FAILED, matching V1 behavior.
111+
*/
112+
override def writeAll(records: java.util.Iterator[InternalRow]): Unit = {
113+
writeWithIterator(records.asScala)
114+
}
115+
107116
/** Write an iterator of records. */
108117
def writeWithIterator(iterator: Iterator[InternalRow]): Unit = {
109118
var count = 0L

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,15 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
7070
val nameParts = ident.toQualifiedNameParts(catalog)
7171
cacheManager.recacheTableOrView(session, nameParts, includeTimeTravel = false)
7272
case _ =>
73-
cacheManager.recacheByPlan(session, r)
73+
r.table match {
74+
case ft: FileTable =>
75+
ft.fileIndex.refresh()
76+
val path = new Path(ft.fileIndex.rootPaths.head.toUri)
77+
val fs = path.getFileSystem(hadoopConf)
78+
cacheManager.recacheByPath(session, path, fs)
79+
case _ =>
80+
cacheManager.recacheByPlan(session, r)
81+
}
7482
}
7583

7684
private def recacheTable(r: ResolvedTable, includeTimeTravel: Boolean)(): Unit = {

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -164,8 +164,11 @@ private[sql] object DataSourceV2Utils extends Logging {
164164
// `HiveFileFormat`, when running tests in sql/core.
165165
if (DDLUtils.isHiveTable(Some(provider))) return None
166166
DataSource.lookupDataSourceV2(provider, conf) match {
167-
// TODO(SPARK-28396): Currently file source v2 can't work with tables.
168-
case Some(p) if !p.isInstanceOf[FileDataSourceV2] => Some(p)
167+
// TODO(SPARK-56175): File source V2 catalog table loading
168+
// is not yet fully supported (stats, partition management,
169+
// data type validation gaps).
170+
case Some(_: FileDataSourceV2) => None
171+
case Some(p) => Some(p)
169172
case _ => None
170173
}
171174
}

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala

Lines changed: 56 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,9 @@ import org.apache.spark.sql.SparkSession
2626
import org.apache.spark.sql.connector.catalog.{SupportsRead, SupportsWrite, Table, TableCapability}
2727
import org.apache.spark.sql.connector.catalog.TableCapability._
2828
import org.apache.spark.sql.connector.expressions.Transform
29-
import org.apache.spark.sql.connector.write.{LogicalWriteInfo, LogicalWriteInfoImpl}
29+
import org.apache.spark.sql.connector.write.{LogicalWriteInfo,
30+
LogicalWriteInfoImpl, SupportsDynamicOverwrite,
31+
SupportsTruncate, Write, WriteBuilder}
3032
import org.apache.spark.sql.errors.QueryCompilationErrors
3133
import org.apache.spark.sql.execution.datasources._
3234
import org.apache.spark.sql.execution.streaming.runtime.MetadataLogFileIndex
@@ -49,18 +51,27 @@ abstract class FileTable(
4951
val caseSensitiveMap = options.asCaseSensitiveMap.asScala.toMap
5052
// Hadoop Configurations are case sensitive.
5153
val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(caseSensitiveMap)
52-
if (FileStreamSink.hasMetadata(paths, hadoopConf, sparkSession.sessionState.conf)) {
53-
// We are reading from the results of a streaming query. We will load files from
54-
// the metadata log instead of listing them using HDFS APIs.
54+
// When userSpecifiedSchema is provided (e.g., write path via DataFrame API), the path
55+
// may not exist yet. Skip streaming metadata check and file existence checks.
56+
val isStreamingMetadata = userSpecifiedSchema.isEmpty &&
57+
FileStreamSink.hasMetadata(paths, hadoopConf, sparkSession.sessionState.conf)
58+
if (isStreamingMetadata) {
5559
new MetadataLogFileIndex(sparkSession, new Path(paths.head),
5660
options.asScala.toMap, userSpecifiedSchema)
5761
} else {
58-
// This is a non-streaming file based datasource.
59-
val rootPathsSpecified = DataSource.checkAndGlobPathIfNecessary(paths, hadoopConf,
60-
checkEmptyGlobPath = true, checkFilesExist = true, enableGlobbing = globPaths)
61-
val fileStatusCache = FileStatusCache.getOrCreate(sparkSession)
62+
val checkFilesExist = userSpecifiedSchema.isEmpty
63+
val rootPathsSpecified =
64+
DataSource.checkAndGlobPathIfNecessary(
65+
paths, hadoopConf,
66+
checkEmptyGlobPath = checkFilesExist,
67+
checkFilesExist = checkFilesExist,
68+
enableGlobbing = globPaths)
69+
val fileStatusCache =
70+
FileStatusCache.getOrCreate(sparkSession)
6271
new InMemoryFileIndex(
63-
sparkSession, rootPathsSpecified, caseSensitiveMap, userSpecifiedSchema, fileStatusCache)
72+
sparkSession, rootPathsSpecified,
73+
caseSensitiveMap, userSpecifiedSchema,
74+
fileStatusCache)
6475
}
6576
}
6677

@@ -174,8 +185,43 @@ abstract class FileTable(
174185
writeInfo.rowIdSchema(),
175186
writeInfo.metadataSchema())
176187
}
188+
189+
/**
190+
* Creates a [[WriteBuilder]] that supports truncate and
191+
* dynamic partition overwrite for file-based tables.
192+
*/
193+
protected def createFileWriteBuilder(
194+
info: LogicalWriteInfo)(
195+
buildWrite: (LogicalWriteInfo, StructType,
196+
Map[Map[String, String], String],
197+
Boolean, Boolean) => Write
198+
): WriteBuilder = {
199+
new WriteBuilder with SupportsDynamicOverwrite with SupportsTruncate {
200+
private var isDynamicOverwrite = false
201+
private var isTruncate = false
202+
203+
override def overwriteDynamicPartitions(): WriteBuilder = {
204+
isDynamicOverwrite = true
205+
this
206+
}
207+
208+
override def truncate(): WriteBuilder = {
209+
isTruncate = true
210+
this
211+
}
212+
213+
override def build(): Write = {
214+
val merged = mergedWriteInfo(info)
215+
val partSchema = fileIndex.partitionSchema
216+
buildWrite(merged, partSchema,
217+
Map.empty, isDynamicOverwrite, isTruncate)
218+
}
219+
}
220+
}
221+
177222
}
178223

179224
object FileTable {
180-
private val CAPABILITIES = util.EnumSet.of(BATCH_READ, BATCH_WRITE)
225+
private val CAPABILITIES = util.EnumSet.of(
226+
BATCH_READ, BATCH_WRITE, TRUNCATE, OVERWRITE_DYNAMIC)
181227
}

0 commit comments

Comments (0)