@@ -41,7 +41,6 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTable.VIEW_STORING_ANALYZED_
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference, Cast, ExprId, Literal}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils
-import org.apache.spark.sql.catalyst.streaming.StreamingSourceIdentifyingName
 import org.apache.spark.sql.catalyst.types.DataTypeUtils
 import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.connector.catalog.CatalogManager
@@ -443,8 +442,7 @@ case class CatalogTable(
     tracksPartitionsInCatalog: Boolean = false,
     schemaPreservesCase: Boolean = true,
     ignoredProperties: Map[String, String] = Map.empty,
-    viewOriginalText: Option[String] = None,
-    streamingSourceIdentifyingName: Option[StreamingSourceIdentifyingName] = None)
+    viewOriginalText: Option[String] = None)
   extends MetadataMapSupport {
 
   import CatalogTable._
@@ -34,6 +34,7 @@ import org.apache.spark.sql.catalyst.DataSourceOptions
 import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
 import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable, CatalogUtils}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.streaming.StreamingSourceIdentifyingName
 import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, TypeUtils}
 import org.apache.spark.sql.classic.ClassicConversions.castToImpl
 import org.apache.spark.sql.classic.Dataset
@@ -101,7 +102,9 @@ case class DataSource(
     partitionColumns: Seq[String] = Seq.empty,
     bucketSpec: Option[BucketSpec] = None,
     options: Map[String, String] = Map.empty,
-    catalogTable: Option[CatalogTable] = None) extends SessionStateHelper with Logging {
+    catalogTable: Option[CatalogTable] = None,
+    userSpecifiedStreamingSourceName: Option[StreamingSourceIdentifyingName] = None)
+  extends SessionStateHelper with Logging {
 
   case class SourceInfo(name: String, schema: StructType, partitionColumns: Seq[String])
 
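Reviewer note on the new parameter: the identifying name now travels on DataSource rather than on CatalogTable. Below is a minimal sketch of how a caller might thread it through. Only the DataSource parameter list comes from this diff; the session, provider, schema, path, and the import location of Unassigned are illustrative assumptions.

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.catalyst.streaming.StreamingSourceIdentifyingName
    import org.apache.spark.sql.execution.datasources.DataSource
    import org.apache.spark.sql.types.{LongType, StructField, StructType}

    // Assumed example values, not part of the change under review.
    val spark = SparkSession.active
    val schema = StructType(Seq(StructField("id", LongType)))
    val name: Option[StreamingSourceIdentifyingName] = None // Some(...) once a name is assigned

    val dataSource = DataSource(
      spark,
      className = "parquet",                   // hypothetical provider
      userSpecifiedSchema = Some(schema),
      options = Map("path" -> "/tmp/events"),  // hypothetical location
      userSpecifiedStreamingSourceName = name) // the field added by this change

Keeping the name off CatalogTable leaves table metadata purely descriptive; the streaming-only concern now lives with the read path that consumes it.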
@@ -298,16 +298,14 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan]
       extraOptions: CaseInsensitiveStringMap,
       sourceIdentifyingName: StreamingSourceIdentifyingName
   ): StreamingRelation = {
-    // Set the source identifying name on the CatalogTable so it propagates to StreamingRelation
-    val tableWithSourceName = table.copy(
-      streamingSourceIdentifyingName = Some(sourceIdentifyingName))
-    val dsOptions = DataSourceUtils.generateDatasourceOptions(extraOptions, tableWithSourceName)
+    val dsOptions = DataSourceUtils.generateDatasourceOptions(extraOptions, table)
     val dataSource = DataSource(
       SparkSession.active,
-      className = tableWithSourceName.provider.get,
-      userSpecifiedSchema = Some(tableWithSourceName.schema),
+      className = table.provider.get,
+      userSpecifiedSchema = Some(table.schema),
       options = dsOptions,
-      catalogTable = Some(tableWithSourceName))
+      catalogTable = Some(table),
+      userSpecifiedStreamingSourceName = Some(sourceIdentifyingName))
     StreamingRelation(dataSource)
   }
 
@@ -344,29 +342,23 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan]
     // to preserve the source identifying name. With resolveOperators (bottom-up), the child
     // is processed first but doesn't match the case above due to the !isStreaming guard,
     // so the NamedStreamingRelation case here can match.
-    // We set the sourceIdentifyingName on the CatalogTable so it propagates to StreamingRelation.
+    // We pass the sourceIdentifyingName directly to getStreamingRelation.
     case NamedStreamingRelation(u: UnresolvedCatalogRelation, sourceIdentifyingName) =>
-      val tableWithSourceName = u.tableMeta.copy(
-        streamingSourceIdentifyingName = Some(sourceIdentifyingName))
-      resolveUnresolvedCatalogRelation(u.copy(tableMeta = tableWithSourceName))
+      getStreamingRelation(u.tableMeta, u.options, sourceIdentifyingName)
 
     // Handle NamedStreamingRelation wrapping SubqueryAlias(UnresolvedCatalogRelation)
    // - this happens when resolving streaming tables from catalogs where the table lookup
    // creates a SubqueryAlias wrapper around the UnresolvedCatalogRelation.
     case NamedStreamingRelation(
         SubqueryAlias(alias, u: UnresolvedCatalogRelation), sourceIdentifyingName) =>
-      val tableWithSourceName = u.tableMeta.copy(
-        streamingSourceIdentifyingName = Some(sourceIdentifyingName))
-      val resolved = resolveUnresolvedCatalogRelation(u.copy(tableMeta = tableWithSourceName))
+      val resolved = getStreamingRelation(u.tableMeta, u.options, sourceIdentifyingName)
       SubqueryAlias(alias, resolved)
 
     // Fallback for streaming UnresolvedCatalogRelation that is NOT wrapped in
     // NamedStreamingRelation (e.g., from .readStream.table() API path).
-    // The sourceIdentifyingName defaults to Unassigned via
-    // tableMeta.streamingSourceIdentifyingName.getOrElse(Unassigned)
-    // in resolveUnresolvedCatalogRelation.
+    // The sourceIdentifyingName defaults to Unassigned.
     case u: UnresolvedCatalogRelation if u.isStreaming =>
-      resolveUnresolvedCatalogRelation(u)
+      getStreamingRelation(u.tableMeta, u.options, Unassigned)
 
     case s @ StreamingRelationV2(
         _, _, table, extraOptions, _, _, _,
@@ -392,11 +384,11 @@
       case UnresolvedCatalogRelation(tableMeta, _, false) =>
         DDLUtils.readHiveTable(tableMeta)
 
-      // For streaming, the sourceIdentifyingName is read from
-      // tableMeta.streamingSourceIdentifyingName which was set by the caller.
+      // For streaming, the sourceIdentifyingName defaults to Unassigned.
+      // Callers that have a specific sourceIdentifyingName should call
+      // getStreamingRelation directly instead of this method.
       case UnresolvedCatalogRelation(tableMeta, extraOptions, true) =>
-        val sourceIdentifyingName = tableMeta.streamingSourceIdentifyingName.getOrElse(Unassigned)
-        getStreamingRelation(tableMeta, extraOptions, sourceIdentifyingName)
+        getStreamingRelation(tableMeta, extraOptions, Unassigned)
     }
   }
 }
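To make the Unassigned fallback concrete: a read through DataStreamReader.table never produces a NamedStreamingRelation wrapper, so resolution lands in the fallback case above. A hedged sketch of observing that, assuming an active session and an existing streaming-readable table named "events" (both illustrative):

    import org.apache.spark.sql.execution.streaming.StreamingRelation

    // "events" is an illustrative table name, not one from this PR.
    val analyzed = spark.readStream.table("events").queryExecution.analyzed

    // The resolved relation should carry the Unassigned default from the fallback case.
    // Unassigned is assumed to be importable alongside StreamingSourceIdentifyingName.
    val rel = analyzed.collectFirst { case r: StreamingRelation => r }
    assert(rel.exists(_.sourceIdentifyingName == Unassigned))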
@@ -35,10 +35,8 @@ import org.apache.spark.sql.sources.SupportsStreamSourceMetadataColumns

 object StreamingRelation {
   def apply(dataSource: DataSource): StreamingRelation = {
-    // Extract source identifying name from CatalogTable for stable checkpoints
-    val sourceIdentifyingName = dataSource.catalogTable
-      .flatMap(_.streamingSourceIdentifyingName)
-      .getOrElse(Unassigned)
+    // Extract source identifying name from DataSource for stable checkpoints
+    val sourceIdentifyingName = dataSource.userSpecifiedStreamingSourceName.getOrElse(Unassigned)
     StreamingRelation(
       dataSource, dataSource.sourceInfo.name, toAttributes(dataSource.sourceInfo.schema),
       sourceIdentifyingName)
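The rewritten apply reduces to a single Option fallback on the DataSource. A small sketch of the resulting behavior, reusing a hypothetical dataSource value like the one sketched earlier (DataSource is a case class, so copy is available; note that apply resolves dataSource.sourceInfo, so the source must be resolvable):

    // With a user-assigned name, apply propagates it verbatim...
    val named = StreamingRelation(
      dataSource.copy(userSpecifiedStreamingSourceName = Some(someName))) // someName: hypothetical
    // named.sourceIdentifyingName == someName

    // ...and without one, it falls back to Unassigned.
    val unnamed = StreamingRelation(
      dataSource.copy(userSpecifiedStreamingSourceName = None))
    // unnamed.sourceIdentifyingName == Unassigned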
@@ -130,9 +130,6 @@ class StreamRelationSuite extends SharedSparkSession with AnalysisTest {
     val catalogTable = spark.sessionState.catalog.getTableMetadata(
       TableIdentifier("t")
     )
-    // During streaming resolution, the CatalogTable gets streamingSourceIdentifyingName set
-    val catalogTableWithSourceName = catalogTable.copy(
-      streamingSourceIdentifyingName = Some(Unassigned))
     val idAttr = AttributeReference(name = "id", dataType = IntegerType)()
 
     val expectedAnalyzedPlan = Project(
@@ -148,7 +145,8 @@
"path" -> catalogTable.location.toString
),
userSpecifiedSchema = Option(catalogTable.schema),
catalogTable = Option(catalogTableWithSourceName)
catalogTable = Option(catalogTable),
userSpecifiedStreamingSourceName = Some(Unassigned)
),
sourceName = s"FileSource[${catalogTable.location.toString}]",
output = Seq(idAttr)
@@ -43,11 +43,10 @@ class StreamingSourceIdentifyingNameSuite extends SharedSparkSession {
     assert(streamingRelation.get.sourceIdentifyingName == Unassigned,
       s"Expected Unassigned but got ${streamingRelation.get.sourceIdentifyingName}")
 
-    // Verify the CatalogTable in DataSource has the name set
-    val catalogTable = streamingRelation.get.dataSource.catalogTable
-    assert(catalogTable.isDefined, "Expected CatalogTable in DataSource")
-    assert(catalogTable.get.streamingSourceIdentifyingName == Some(Unassigned),
-      s"Expected Some(Unassigned) but got ${catalogTable.get.streamingSourceIdentifyingName}")
+    // Verify the DataSource has the sourceIdentifyingName set
+    val dsSourceName = streamingRelation.get.dataSource.userSpecifiedStreamingSourceName
+    assert(dsSourceName == Some(Unassigned),
+      s"Expected Some(Unassigned) but got $dsSourceName")
   }
 }
 