Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import org.apache.hadoop.fs.FileStatus

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.avro.AvroUtils
import org.apache.spark.sql.connector.write.{LogicalWriteInfo, Write, WriteBuilder}
import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
import org.apache.spark.sql.execution.datasources.FileFormat
import org.apache.spark.sql.execution.datasources.v2.FileTable
import org.apache.spark.sql.types.{DataType, StructType}
Expand All @@ -43,13 +43,14 @@ case class AvroTable(
AvroUtils.inferSchema(sparkSession, options.asScala.toMap, files)

override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = {
new WriteBuilder {
override def build(): Write =
AvroWrite(paths, formatName, supportsDataType, mergedWriteInfo(info))
createFileWriteBuilder(info) {
(mergedInfo, partSchema, customLocs, dynamicOverwrite, truncate) =>
AvroWrite(paths, formatName, supportsDataType, mergedInfo, partSchema, customLocs,
dynamicOverwrite, truncate)
}
}

override def supportsDataType(dataType: DataType): Boolean = AvroUtils.supportsDataType(dataType)

override def formatName: String = "AVRO"
override def formatName: String = "Avro"
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,11 @@ case class AvroWrite(
paths: Seq[String],
formatName: String,
supportsDataType: DataType => Boolean,
info: LogicalWriteInfo) extends FileWrite {
info: LogicalWriteInfo,
partitionSchema: StructType,
override val customPartitionLocations: Map[Map[String, String], String] = Map.empty,
override val dynamicPartitionOverwrite: Boolean,
override val isTruncate: Boolean) extends FileWrite {
override def prepareWrite(
sqlConf: SQLConf,
job: Job,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,10 +194,17 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) extends sql.DataFram
if (curmode == SaveMode.Append) {
AppendData.byName(relation, df.logicalPlan, finalOptions)
} else {
// Truncate the table. TableCapabilityCheck will throw a nice exception if this
// isn't supported
OverwriteByExpression.byName(
relation, df.logicalPlan, Literal(true), finalOptions)
val dynamicOverwrite =
df.sparkSession.sessionState.conf.partitionOverwriteMode ==
PartitionOverwriteMode.DYNAMIC &&
partitioningColumns.exists(_.nonEmpty)
if (dynamicOverwrite) {
OverwritePartitionsDynamic.byName(
relation, df.logicalPlan, finalOptions)
} else {
OverwriteByExpression.byName(
relation, df.logicalPlan, Literal(true), finalOptions)
}
}

case createMode =>
Expand Down Expand Up @@ -318,7 +325,13 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) extends sql.DataFram
}

val session = df.sparkSession
val canUseV2 = lookupV2Provider().isDefined
// TODO(SPARK-56175): File source V2 does not support
// insertInto for catalog tables yet.
val canUseV2 = lookupV2Provider() match {
case Some(_: FileDataSourceV2) => false
case Some(_) => true
case None => false
}

session.sessionState.sqlParser.parseMultipartIdentifier(tableName) match {
case NonSessionCatalogAndIdentifier(catalog, ident) =>
Expand Down Expand Up @@ -438,9 +451,16 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) extends sql.DataFram
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._

val session = df.sparkSession
val v2ProviderOpt = lookupV2Provider()
val canUseV2 = v2ProviderOpt.isDefined || (hasCustomSessionCatalog &&
!df.sparkSession.sessionState.catalogManager.catalog(CatalogManager.SESSION_CATALOG_NAME)
// TODO(SPARK-56230): File source V2 does not support
// saveAsTable yet. Always use V1 for file sources.
val v2ProviderOpt = lookupV2Provider().flatMap {
case _: FileDataSourceV2 => None
case other => Some(other)
}
val canUseV2 = v2ProviderOpt.isDefined ||
(hasCustomSessionCatalog &&
!df.sparkSession.sessionState.catalogManager
.catalog(CatalogManager.SESSION_CATALOG_NAME)
.isInstanceOf[CatalogExtension])

session.sessionState.sqlParser.parseMultipartIdentifier(tableName) match {
Expand Down Expand Up @@ -595,8 +615,16 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) extends sql.DataFram

private def lookupV2Provider(): Option[TableProvider] = {
DataSource.lookupDataSourceV2(source, df.sparkSession.sessionState.conf) match {
// TODO(SPARK-28396): File source v2 write path is currently broken.
case Some(_: FileDataSourceV2) => None
// File source V2 supports non-partitioned Append and
// Overwrite via DataFrame API (df.write.save(path)).
// Fall back to V1 for:
// - ErrorIfExists/Ignore (TODO: SPARK-56174)
// - Partitioned writes (TODO: SPARK-56174)
case Some(_: FileDataSourceV2)
if (curmode != SaveMode.Append
&& curmode != SaveMode.Overwrite)
|| partitioningColumns.exists(_.nonEmpty) =>
None
case other => other
}
}
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.apache.spark.sql.execution.datasources

import scala.collection.mutable
import scala.jdk.CollectionConverters._

import org.apache.hadoop.fs.{FileAlreadyExistsException, Path}
import org.apache.hadoop.mapreduce.TaskAttemptContext
Expand Down Expand Up @@ -104,6 +105,14 @@ abstract class FileFormatDataWriter(
}
}

/**
 * V2 `DataWriter.writeAll` entry point.
 *
 * Delegates to [[writeWithIterator]] so that the V2 bulk-write path
 * shares the same error handling (TASK_WRITE_FAILED wrapping) and
 * record accounting as the V1 path, instead of writing rows directly.
 *
 * @param records Java iterator of rows to write; adapted to a Scala
 *                iterator before delegation.
 */
override def writeAll(records: java.util.Iterator[InternalRow]): Unit =
  writeWithIterator(records.asScala)

/** Write an iterator of records. */
def writeWithIterator(iterator: Iterator[InternalRow]): Unit = {
var count = 0L
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,15 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
val nameParts = ident.toQualifiedNameParts(catalog)
cacheManager.recacheTableOrView(session, nameParts, includeTimeTravel = false)
case _ =>
cacheManager.recacheByPlan(session, r)
r.table match {
case ft: FileTable =>
ft.fileIndex.refresh()
val path = new Path(ft.fileIndex.rootPaths.head.toUri)
val fs = path.getFileSystem(hadoopConf)
cacheManager.recacheByPath(session, path, fs)
case _ =>
cacheManager.recacheByPlan(session, r)
}
}

private def recacheTable(r: ResolvedTable, includeTimeTravel: Boolean)(): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,8 +164,11 @@ private[sql] object DataSourceV2Utils extends Logging {
// `HiveFileFormat`, when running tests in sql/core.
if (DDLUtils.isHiveTable(Some(provider))) return None
DataSource.lookupDataSourceV2(provider, conf) match {
// TODO(SPARK-28396): Currently file source v2 can't work with tables.
case Some(p) if !p.isInstanceOf[FileDataSourceV2] => Some(p)
// TODO(SPARK-56175): File source V2 catalog table loading
// is not yet fully supported (stats, partition management,
// data type validation gaps).
case Some(_: FileDataSourceV2) => None
case Some(p) => Some(p)
case _ => None
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.connector.catalog.{SupportsRead, SupportsWrite, Table, TableCapability}
import org.apache.spark.sql.connector.catalog.TableCapability._
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.connector.write.{LogicalWriteInfo, LogicalWriteInfoImpl}
import org.apache.spark.sql.connector.write.{LogicalWriteInfo,
LogicalWriteInfoImpl, SupportsDynamicOverwrite,
SupportsTruncate, Write, WriteBuilder}
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.streaming.runtime.MetadataLogFileIndex
Expand All @@ -49,18 +51,27 @@ abstract class FileTable(
val caseSensitiveMap = options.asCaseSensitiveMap.asScala.toMap
// Hadoop Configurations are case sensitive.
val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(caseSensitiveMap)
if (FileStreamSink.hasMetadata(paths, hadoopConf, sparkSession.sessionState.conf)) {
// We are reading from the results of a streaming query. We will load files from
// the metadata log instead of listing them using HDFS APIs.
// When userSpecifiedSchema is provided (e.g., write path via DataFrame API), the path
// may not exist yet. Skip streaming metadata check and file existence checks.
val isStreamingMetadata = userSpecifiedSchema.isEmpty &&
FileStreamSink.hasMetadata(paths, hadoopConf, sparkSession.sessionState.conf)
if (isStreamingMetadata) {
new MetadataLogFileIndex(sparkSession, new Path(paths.head),
options.asScala.toMap, userSpecifiedSchema)
} else {
// This is a non-streaming file based datasource.
val rootPathsSpecified = DataSource.checkAndGlobPathIfNecessary(paths, hadoopConf,
checkEmptyGlobPath = true, checkFilesExist = true, enableGlobbing = globPaths)
val fileStatusCache = FileStatusCache.getOrCreate(sparkSession)
val checkFilesExist = userSpecifiedSchema.isEmpty
val rootPathsSpecified =
DataSource.checkAndGlobPathIfNecessary(
paths, hadoopConf,
checkEmptyGlobPath = checkFilesExist,
checkFilesExist = checkFilesExist,
enableGlobbing = globPaths)
val fileStatusCache =
FileStatusCache.getOrCreate(sparkSession)
new InMemoryFileIndex(
sparkSession, rootPathsSpecified, caseSensitiveMap, userSpecifiedSchema, fileStatusCache)
sparkSession, rootPathsSpecified,
caseSensitiveMap, userSpecifiedSchema,
fileStatusCache)
}
}

Expand Down Expand Up @@ -174,8 +185,43 @@ abstract class FileTable(
writeInfo.rowIdSchema(),
writeInfo.metadataSchema())
}

/**
 * Creates a [[WriteBuilder]] that supports truncate and
 * dynamic partition overwrite for file-based tables.
 *
 * The returned builder records which overwrite mode Spark requests
 * (via [[SupportsDynamicOverwrite]] / [[SupportsTruncate]] callbacks)
 * and forwards both flags, together with the merged write info and
 * the table's partition schema, to the format-specific factory.
 *
 * @param info write information supplied by the logical plan
 * @param buildWrite factory invoked at `build()` time with
 *                   (mergedWriteInfo, partitionSchema,
 *                   customPartitionLocations, dynamicPartitionOverwrite,
 *                   isTruncate); each file format (e.g. Avro) supplies
 *                   its own `Write` implementation here
 */
protected def createFileWriteBuilder(
info: LogicalWriteInfo)(
buildWrite: (LogicalWriteInfo, StructType,
Map[Map[String, String], String],
Boolean, Boolean) => Write
): WriteBuilder = {
new WriteBuilder with SupportsDynamicOverwrite with SupportsTruncate {
// Mutable builder state: Spark calls at most one of the two
// callbacks below before build(); both default to plain append.
private var isDynamicOverwrite = false
private var isTruncate = false

override def overwriteDynamicPartitions(): WriteBuilder = {
isDynamicOverwrite = true
this
}

override def truncate(): WriteBuilder = {
isTruncate = true
this
}

override def build(): Write = {
val merged = mergedWriteInfo(info)
// Partition schema is read from the file index only at build()
// time, after any schema merging has happened.
val partSchema = fileIndex.partitionSchema
// Custom per-partition locations are always empty on this path —
// file tables here do not support custom partition locations.
buildWrite(merged, partSchema,
Map.empty, isDynamicOverwrite, isTruncate)
}
}
}

}

object FileTable {
private val CAPABILITIES = util.EnumSet.of(BATCH_READ, BATCH_WRITE)
private val CAPABILITIES = util.EnumSet.of(
BATCH_READ, BATCH_WRITE, TRUNCATE, OVERWRITE_DYNAMIC)
}
Loading