From 1a70260e1f957ee768707e1db7710c04cc1ea437 Mon Sep 17 00:00:00 2001
From: ericm-db
Date: Fri, 6 Feb 2026 12:42:02 -0800
Subject: [PATCH 1/2] moving sourceIdentifyingName from catalogtable to datasource

---
 .../sql/catalyst/catalog/interface.scala      |  4 +--
 .../execution/datasources/DataSource.scala    |  5 ++-
 .../datasources/DataSourceStrategy.scala      | 36 ++++++++-----------
 .../streaming/runtime/StreamingRelation.scala |  6 ++--
 .../streaming/StreamRelationSuite.scala       |  6 ++--
 .../StreamingSourceIdentifyingNameSuite.scala |  9 +++--
 6 files changed, 27 insertions(+), 39 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index 7d15551afe0c4..fcaea47095040 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -41,7 +41,6 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTable.VIEW_STORING_ANALYZED_
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference, Cast, ExprId, Literal}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils
-import org.apache.spark.sql.catalyst.streaming.StreamingSourceIdentifyingName
 import org.apache.spark.sql.catalyst.types.DataTypeUtils
 import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.connector.catalog.CatalogManager
@@ -443,8 +442,7 @@ case class CatalogTable(
     tracksPartitionsInCatalog: Boolean = false,
     schemaPreservesCase: Boolean = true,
     ignoredProperties: Map[String, String] = Map.empty,
-    viewOriginalText: Option[String] = None,
-    streamingSourceIdentifyingName: Option[StreamingSourceIdentifyingName] = None)
+    viewOriginalText: Option[String] = None)
   extends MetadataMapSupport {
 
   import CatalogTable._
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 35588df11bfca..60fd4fc68c462 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -34,6 +34,7 @@ import org.apache.spark.sql.catalyst.DataSourceOptions
 import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
 import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable, CatalogUtils}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.streaming.StreamingSourceIdentifyingName
 import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, TypeUtils}
 import org.apache.spark.sql.classic.ClassicConversions.castToImpl
 import org.apache.spark.sql.classic.Dataset
@@ -101,7 +102,9 @@ case class DataSource(
     partitionColumns: Seq[String] = Seq.empty,
     bucketSpec: Option[BucketSpec] = None,
     options: Map[String, String] = Map.empty,
-    catalogTable: Option[CatalogTable] = None) extends SessionStateHelper with Logging {
+    catalogTable: Option[CatalogTable] = None,
+    streamingSourceIdentifyingName: Option[StreamingSourceIdentifyingName] = None)
+  extends SessionStateHelper with Logging {
 
   case class SourceInfo(name: String, schema: StructType, partitionColumns: Seq[String])
 
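Note: the hunks above move the optional source-identifying name off CatalogTable and onto DataSource itself. For orientation, a minimal sketch of how a caller now threads the name through the DataSource constructor; the buildStreamingSource helper is hypothetical, and only the DataSource parameters shown come from this patch (they mirror getStreamingRelation in the next file):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.catalyst.catalog.CatalogTable
    import org.apache.spark.sql.catalyst.streaming.StreamingSourceIdentifyingName
    import org.apache.spark.sql.execution.datasources.DataSource

    // Hypothetical helper: the name is passed straight to DataSource instead of
    // being copied onto the CatalogTable first.
    def buildStreamingSource(
        table: CatalogTable,
        name: StreamingSourceIdentifyingName): DataSource = {
      DataSource(
        SparkSession.active,
        className = table.provider.get,
        userSpecifiedSchema = Some(table.schema),
        catalogTable = Some(table),
        streamingSourceIdentifyingName = Some(name)) // new parameter from this patch
    }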
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 436a760c5b6fa..2cf10a5cce525 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -298,16 +298,14 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan]
       extraOptions: CaseInsensitiveStringMap,
       sourceIdentifyingName: StreamingSourceIdentifyingName
   ): StreamingRelation = {
-    // Set the source identifying name on the CatalogTable so it propagates to StreamingRelation
-    val tableWithSourceName = table.copy(
-      streamingSourceIdentifyingName = Some(sourceIdentifyingName))
-    val dsOptions = DataSourceUtils.generateDatasourceOptions(extraOptions, tableWithSourceName)
+    val dsOptions = DataSourceUtils.generateDatasourceOptions(extraOptions, table)
     val dataSource = DataSource(
       SparkSession.active,
-      className = tableWithSourceName.provider.get,
-      userSpecifiedSchema = Some(tableWithSourceName.schema),
+      className = table.provider.get,
+      userSpecifiedSchema = Some(table.schema),
       options = dsOptions,
-      catalogTable = Some(tableWithSourceName))
+      catalogTable = Some(table),
+      streamingSourceIdentifyingName = Some(sourceIdentifyingName))
     StreamingRelation(dataSource)
   }
 
@@ -344,29 +342,23 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan]
     // to preserve the source identifying name. With resolveOperators (bottom-up), the child
     // is processed first but doesn't match the case above due to the !isStreaming guard,
     // so the NamedStreamingRelation case here can match.
-    // We set the sourceIdentifyingName on the CatalogTable so it propagates to StreamingRelation.
+    // We pass the sourceIdentifyingName directly to getStreamingRelation.
     case NamedStreamingRelation(u: UnresolvedCatalogRelation, sourceIdentifyingName) =>
-      val tableWithSourceName = u.tableMeta.copy(
-        streamingSourceIdentifyingName = Some(sourceIdentifyingName))
-      resolveUnresolvedCatalogRelation(u.copy(tableMeta = tableWithSourceName))
+      getStreamingRelation(u.tableMeta, u.options, sourceIdentifyingName)
 
     // Handle NamedStreamingRelation wrapping SubqueryAlias(UnresolvedCatalogRelation)
    // - this happens when resolving streaming tables from catalogs where the table lookup
     // creates a SubqueryAlias wrapper around the UnresolvedCatalogRelation.
     case NamedStreamingRelation(
         SubqueryAlias(alias, u: UnresolvedCatalogRelation), sourceIdentifyingName) =>
-      val tableWithSourceName = u.tableMeta.copy(
-        streamingSourceIdentifyingName = Some(sourceIdentifyingName))
-      val resolved = resolveUnresolvedCatalogRelation(u.copy(tableMeta = tableWithSourceName))
+      val resolved = getStreamingRelation(u.tableMeta, u.options, sourceIdentifyingName)
       SubqueryAlias(alias, resolved)
 
     // Fallback for streaming UnresolvedCatalogRelation that is NOT wrapped in
     // NamedStreamingRelation (e.g., from .readStream.table() API path).
-    // The sourceIdentifyingName defaults to Unassigned via
-    // tableMeta.streamingSourceIdentifyingName.getOrElse(Unassigned)
-    // in resolveUnresolvedCatalogRelation.
+    // The sourceIdentifyingName defaults to Unassigned.
     case u: UnresolvedCatalogRelation if u.isStreaming =>
-      resolveUnresolvedCatalogRelation(u)
+      getStreamingRelation(u.tableMeta, u.options, Unassigned)
 
     case s @ StreamingRelationV2(
         _, _, table, extraOptions, _, _, _,
@@ -392,11 +384,11 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan]
     case UnresolvedCatalogRelation(tableMeta, _, false) =>
       DDLUtils.readHiveTable(tableMeta)
 
-    // For streaming, the sourceIdentifyingName is read from
-    // tableMeta.streamingSourceIdentifyingName which was set by the caller.
+    // For streaming, the sourceIdentifyingName defaults to Unassigned.
+    // Callers that have a specific sourceIdentifyingName should call
+    // getStreamingRelation directly instead of this method.
     case UnresolvedCatalogRelation(tableMeta, extraOptions, true) =>
-      val sourceIdentifyingName = tableMeta.streamingSourceIdentifyingName.getOrElse(Unassigned)
-      getStreamingRelation(tableMeta, extraOptions, sourceIdentifyingName)
+      getStreamingRelation(tableMeta, extraOptions, Unassigned)
     }
   }
 }
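Note: after this change the rule has three streaming paths: NamedStreamingRelation over a bare UnresolvedCatalogRelation, NamedStreamingRelation over a SubqueryAlias (same behavior after unwrapping, omitted below), and a fallback for unwrapped streaming relations that defaults to Unassigned. A self-contained model of that dispatch; every type here is an illustrative stand-in, not Spark's class:

    // Stand-in types; only the dispatch shape mirrors FindDataSourceTable.
    sealed trait SourceName
    case object Unassigned extends SourceName
    final case class Named(value: String) extends SourceName

    final case class CatalogRelation(table: String, isStreaming: Boolean)
    final case class NamedStreamingRel(child: CatalogRelation, name: SourceName)

    def resolve(plan: Any): (String, SourceName) = plan match {
      // Name supplied explicitly by the NamedStreamingRelation wrapper.
      case NamedStreamingRel(rel, name) => (rel.table, name)
      // Fallback: bare streaming relation, e.g. the .readStream.table() path.
      case rel: CatalogRelation if rel.isStreaming => (rel.table, Unassigned)
      case other => sys.error(s"unexpected plan: $other")
    }

    assert(resolve(NamedStreamingRel(CatalogRelation("t", isStreaming = true), Named("src1")))
      == ("t", Named("src1")))
    assert(resolve(CatalogRelation("t", isStreaming = true)) == ("t", Unassigned))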
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/StreamingRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/StreamingRelation.scala
index 827a2f1d066dd..fc08f6608136b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/StreamingRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/StreamingRelation.scala
@@ -35,10 +35,8 @@ import org.apache.spark.sql.sources.SupportsStreamSourceMetadataColumns
 
 object StreamingRelation {
   def apply(dataSource: DataSource): StreamingRelation = {
-    // Extract source identifying name from CatalogTable for stable checkpoints
-    val sourceIdentifyingName = dataSource.catalogTable
-      .flatMap(_.streamingSourceIdentifyingName)
-      .getOrElse(Unassigned)
+    // Extract source identifying name from DataSource for stable checkpoints
+    val sourceIdentifyingName = dataSource.streamingSourceIdentifyingName.getOrElse(Unassigned)
     StreamingRelation(
       dataSource, dataSource.sourceInfo.name, toAttributes(dataSource.sourceInfo.schema),
       sourceIdentifyingName)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/StreamRelationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/StreamRelationSuite.scala
index 642aaeb48392b..a632182025fc3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/StreamRelationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/StreamRelationSuite.scala
@@ -130,9 +130,6 @@ class StreamRelationSuite extends SharedSparkSession with AnalysisTest {
     val catalogTable = spark.sessionState.catalog.getTableMetadata(
       TableIdentifier("t")
     )
-    // During streaming resolution, the CatalogTable gets streamingSourceIdentifyingName set
-    val catalogTableWithSourceName = catalogTable.copy(
-      streamingSourceIdentifyingName = Some(Unassigned))
 
     val idAttr = AttributeReference(name = "id", dataType = IntegerType)()
     val expectedAnalyzedPlan = Project(
@@ -148,7 +145,8 @@
           "path" -> catalogTable.location.toString
         ),
         userSpecifiedSchema = Option(catalogTable.schema),
-        catalogTable = Option(catalogTableWithSourceName)
+        catalogTable = Option(catalogTable),
+        streamingSourceIdentifyingName = Some(Unassigned)
       ),
       sourceName = s"FileSource[${catalogTable.location.toString}]",
       output = Seq(idAttr)
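Note: with the name on DataSource, StreamingRelation.apply above reduces to a plain Option default, so a DataSource built without the new field keeps the pre-patch behavior. A tiny standalone illustration of that getOrElse(Unassigned) contract; Source is a stand-in, not Spark's DataSource:

    sealed trait SourceName
    case object Unassigned extends SourceName
    final case class Named(value: String) extends SourceName

    // Stand-in for DataSource's new optional field.
    final case class Source(streamingSourceIdentifyingName: Option[SourceName] = None)

    // Field omitted: resolution falls back to Unassigned, the pre-patch behavior.
    assert(Source().streamingSourceIdentifyingName.getOrElse(Unassigned) == Unassigned)
    // Field set during resolution: the name flows through to StreamingRelation.
    assert(Source(Some(Named("orders_src"))).streamingSourceIdentifyingName
      .getOrElse(Unassigned) == Named("orders_src"))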
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingSourceIdentifyingNameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingSourceIdentifyingNameSuite.scala
index 91c36fcb7935e..fd7aae9bc9136 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingSourceIdentifyingNameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingSourceIdentifyingNameSuite.scala
@@ -43,11 +43,10 @@ class StreamingSourceIdentifyingNameSuite extends SharedSparkSession {
     assert(streamingRelation.get.sourceIdentifyingName == Unassigned,
       s"Expected Unassigned but got ${streamingRelation.get.sourceIdentifyingName}")
 
-    // Verify the CatalogTable in DataSource has the name set
-    val catalogTable = streamingRelation.get.dataSource.catalogTable
-    assert(catalogTable.isDefined, "Expected CatalogTable in DataSource")
-    assert(catalogTable.get.streamingSourceIdentifyingName == Some(Unassigned),
-      s"Expected Some(Unassigned) but got ${catalogTable.get.streamingSourceIdentifyingName}")
+    // Verify the DataSource has the sourceIdentifyingName set
+    val dsSourceName = streamingRelation.get.dataSource.streamingSourceIdentifyingName
+    assert(dsSourceName == Some(Unassigned),
+      s"Expected Some(Unassigned) but got $dsSourceName")
   }
 }
 
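Note: the second patch below is a mechanical rename of the new parameter from streamingSourceIdentifyingName to userSpecifiedStreamingSourceName. Because the field is Option-typed with a None default, only by-name call sites need updating, which is why the five files that reference the parameter change again. A toy rename showing the mechanics; both case classes are illustrative:

    final case class SourceBefore(streamingSourceIdentifyingName: Option[String] = None)
    final case class SourceAfter(userSpecifiedStreamingSourceName: Option[String] = None)

    SourceBefore()                                            // field omitted: unaffected by rename
    SourceAfter(userSpecifiedStreamingSourceName = Some("s")) // by-name sites must use the new name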
From 4886e29cba207f35ee393b8bb2b14ae181de01c2 Mon Sep 17 00:00:00 2001
From: ericm-db
Date: Fri, 6 Feb 2026 15:42:42 -0800
Subject: [PATCH 2/2] renaming to userSpecifiedStreamingSourceName

---
 .../org/apache/spark/sql/execution/datasources/DataSource.scala | 2 +-
 .../spark/sql/execution/datasources/DataSourceStrategy.scala    | 2 +-
 .../sql/execution/streaming/runtime/StreamingRelation.scala     | 2 +-
 .../spark/sql/execution/streaming/StreamRelationSuite.scala     | 2 +-
 .../sql/streaming/StreamingSourceIdentifyingNameSuite.scala     | 2 +-
 5 files changed, 5 insertions(+), 5 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 60fd4fc68c462..a48c71ad0d362 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -103,7 +103,7 @@ case class DataSource(
     bucketSpec: Option[BucketSpec] = None,
     options: Map[String, String] = Map.empty,
     catalogTable: Option[CatalogTable] = None,
-    streamingSourceIdentifyingName: Option[StreamingSourceIdentifyingName] = None)
+    userSpecifiedStreamingSourceName: Option[StreamingSourceIdentifyingName] = None)
   extends SessionStateHelper with Logging {
 
   case class SourceInfo(name: String, schema: StructType, partitionColumns: Seq[String])
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 2cf10a5cce525..220b0b8a63013 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -305,7 +305,7 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan]
       userSpecifiedSchema = Some(table.schema),
       options = dsOptions,
       catalogTable = Some(table),
-      streamingSourceIdentifyingName = Some(sourceIdentifyingName))
+      userSpecifiedStreamingSourceName = Some(sourceIdentifyingName))
     StreamingRelation(dataSource)
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/StreamingRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/StreamingRelation.scala
index fc08f6608136b..232f3475bb1bd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/StreamingRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/StreamingRelation.scala
@@ -36,7 +36,7 @@ import org.apache.spark.sql.sources.SupportsStreamSourceMetadataColumns
 object StreamingRelation {
   def apply(dataSource: DataSource): StreamingRelation = {
     // Extract source identifying name from DataSource for stable checkpoints
-    val sourceIdentifyingName = dataSource.streamingSourceIdentifyingName.getOrElse(Unassigned)
+    val sourceIdentifyingName = dataSource.userSpecifiedStreamingSourceName.getOrElse(Unassigned)
     StreamingRelation(
       dataSource, dataSource.sourceInfo.name, toAttributes(dataSource.sourceInfo.schema),
       sourceIdentifyingName)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/StreamRelationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/StreamRelationSuite.scala
index a632182025fc3..599a48c642085 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/StreamRelationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/StreamRelationSuite.scala
@@ -146,7 +146,7 @@ class StreamRelationSuite extends SharedSparkSession with AnalysisTest {
         ),
         userSpecifiedSchema = Option(catalogTable.schema),
         catalogTable = Option(catalogTable),
-        streamingSourceIdentifyingName = Some(Unassigned)
+        userSpecifiedStreamingSourceName = Some(Unassigned)
       ),
       sourceName = s"FileSource[${catalogTable.location.toString}]",
       output = Seq(idAttr)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingSourceIdentifyingNameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingSourceIdentifyingNameSuite.scala
index fd7aae9bc9136..caf6c14c8000c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingSourceIdentifyingNameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingSourceIdentifyingNameSuite.scala
@@ -44,7 +44,7 @@ class StreamingSourceIdentifyingNameSuite extends SharedSparkSession {
       s"Expected Unassigned but got ${streamingRelation.get.sourceIdentifyingName}")
 
     // Verify the DataSource has the sourceIdentifyingName set
-    val dsSourceName = streamingRelation.get.dataSource.streamingSourceIdentifyingName
+    val dsSourceName = streamingRelation.get.dataSource.userSpecifiedStreamingSourceName
     assert(dsSourceName == Some(Unassigned),
       s"Expected Some(Unassigned) but got $dsSourceName")
   }
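Note: end to end, the behavior the two commits pin down (per StreamingSourceIdentifyingNameSuite) is that an unwrapped read of a catalog table yields the Unassigned default. A hedged sketch against the public API; the session settings and table name are examples only:

    import org.apache.spark.sql.SparkSession

    // Example session and table; any streaming-readable table behaves the same.
    val spark = SparkSession.builder().master("local[2]").appName("demo").getOrCreate()
    spark.sql("CREATE TABLE IF NOT EXISTS t (id INT) USING parquet")

    // .readStream.table() takes the fallback case in FindDataSourceTable: with no
    // NamedStreamingRelation wrapper, the resolved StreamingRelation carries
    // sourceIdentifyingName == Unassigned, and (after PATCH 2/2) its DataSource
    // has userSpecifiedStreamingSourceName == Some(Unassigned).
    val df = spark.readStream.table("t")
    assert(df.isStreaming)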