diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala index eadaafea81a5..e921b90ce8ff 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala @@ -2142,6 +2142,776 @@ class DataSourceV2DataFrameSuite } } + // ===================================================================== + // Helper for simulating external schema changes. + // + // In production, external changes come from a different process hitting + // the same metastore. With InMemoryTableCatalog, we simulate external + // schema modifications by calling the catalog API directly -- this + // bypasses SQL-level session tracking (cache invalidation, version + // refresh), making it equivalent to an external process modifying the + // metastore. For external data writes, SQL INSERT writes to the shared + // InMemoryTable -- this is external from the perspective of any + // pre-analyzed DataFrame or temp view that captured an earlier version.
+ // ===================================================================== + private def externalAlterTable(ident: Identifier, changes: TableChange*): Unit = { + catalog("testcat").alterTable(ident, changes: _*) + } + + // ===================================================================== + // Section 1: Temp views with stored plans - missing scenarios + // ===================================================================== + + test("temp view: drop and add column with same name and same type (session)") { + val t = "testcat.ns1.ns2.tbl" + withTable(t) { + sql(s"CREATE TABLE $t (id INT, salary INT) USING foo") + sql(s"INSERT INTO $t VALUES (1, 100)") + + // create a temp view from DataFrame with a filter + spark.table(t).filter("salary < 999").createOrReplaceTempView("tmp") + checkAnswer(spark.table("tmp"), Seq(Row(1, 100))) + + // drop and add column with the same name and type + sql(s"ALTER TABLE $t DROP COLUMN salary") + sql(s"ALTER TABLE $t ADD COLUMN salary INT") + + // insert new data + sql(s"INSERT INTO $t VALUES (2, 200)") + + // temp view resolves salary by name; InMemoryTable preserves old data values + // through column drop+add, so both rows are visible through the filter + checkAnswer(spark.table("tmp"), Seq(Row(1, 100), Row(2, 200))) + } + } + + test("temp view: drop and add column with same name but different type (session)") { + val t = "testcat.ns1.ns2.tbl" + withTable(t) { + sql(s"CREATE TABLE $t (id INT, salary INT) USING foo") + sql(s"INSERT INTO $t VALUES (1, 100)") + + // create a temp view from DataFrame + spark.table(t).createOrReplaceTempView("tmp") + checkAnswer(spark.table("tmp"), Seq(Row(1, 100))) + + // drop and add column with same name but different type + sql(s"ALTER TABLE $t DROP COLUMN salary") + sql(s"ALTER TABLE $t ADD COLUMN salary STRING") + + // accessing temp view should detect incompatible type change + checkError( + exception = intercept[AnalysisException] { spark.table("tmp").collect() }, + condition = 
"INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION", + parameters = Map( + "viewName" -> "`tmp`", + "tableName" -> "`testcat`.`ns1`.`ns2`.`tbl`", + "colType" -> "data", + "errors" -> "- `salary` type has changed from INT to STRING")) + } + } + + test("temp view: type widening INT to BIGINT (session)") { + val t = "testcat.ns1.ns2.tbl" + withTable(t) { + sql(s"CREATE TABLE $t (id INT, salary INT) USING foo") + sql(s"INSERT INTO $t VALUES (1, 100)") + + // create a temp view + spark.table(t).createOrReplaceTempView("tmp") + checkAnswer(spark.table("tmp"), Seq(Row(1, 100))) + + // widen the type from INT to BIGINT + sql(s"ALTER TABLE $t ALTER COLUMN salary TYPE BIGINT") + + // accessing temp view should detect type change + checkError( + exception = intercept[AnalysisException] { spark.table("tmp").collect() }, + condition = "INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION", + parameters = Map( + "viewName" -> "`tmp`", + "tableName" -> "`testcat`.`ns1`.`ns2`.`tbl`", + "colType" -> "data", + "errors" -> "- `salary` type has changed from INT to BIGINT")) + } + } + + test("temp view: external data write refreshes view") { + val t = "testcat.ns1.ns2.tbl" + withTable(t) { + sql(s"CREATE TABLE $t (id INT, salary INT) USING foo") + sql(s"INSERT INTO $t VALUES (1, 100)") + + // create temp view from DataFrame + spark.table(t).filter("salary < 999").createOrReplaceTempView("tmp") + checkAnswer(spark.table("tmp"), Seq(Row(1, 100))) + + // insert more data via session (simulating what an external writer would do) + sql(s"INSERT INTO $t VALUES (2, 200)") + + // temp view should pick up new data + checkAnswer(spark.table("tmp"), Seq(Row(1, 100), Row(2, 200))) + } + } + + test("temp view: external column addition preserves view schema") { + val t = "testcat.ns1.ns2.tbl" + val ident = Identifier.of(Array("ns1", "ns2"), "tbl") + withTable(t) { + sql(s"CREATE TABLE $t (id INT, salary INT) USING foo") + sql(s"INSERT INTO $t VALUES (1, 100)") + + // create temp view + 
spark.table(t).createOrReplaceTempView("tmp") + checkAnswer(spark.table("tmp"), Seq(Row(1, 100))) + + // external: add column via catalog API (bypasses session tracking) + externalAlterTable(ident, TableChange.addColumn(Array("new_column"), IntegerType, true)) + + // external: insert data with new column + sql(s"INSERT INTO $t VALUES (2, 200, -1)") + + // temp view should preserve original schema (2 columns) but pick up new data + checkAnswer(spark.table("tmp"), Seq(Row(1, 100), Row(2, 200))) + } + } + + test("temp view: external column removal fails view") { + val t = "testcat.ns1.ns2.tbl" + val ident = Identifier.of(Array("ns1", "ns2"), "tbl") + withTable(t) { + sql(s"CREATE TABLE $t (id INT, salary INT, extra STRING) USING foo") + sql(s"INSERT INTO $t VALUES (1, 100, 'x')") + + // create temp view + spark.table(t).createOrReplaceTempView("tmp") + checkAnswer(spark.table("tmp"), Seq(Row(1, 100, "x"))) + + // external: remove column via catalog API (bypasses session tracking) + externalAlterTable(ident, TableChange.deleteColumn(Array("extra"), false)) + + // accessing temp view should fail + checkError( + exception = intercept[AnalysisException] { spark.table("tmp").collect() }, + condition = "INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION", + parameters = Map( + "viewName" -> "`tmp`", + "tableName" -> "`testcat`.`ns1`.`ns2`.`tbl`", + "colType" -> "data", + "errors" -> "- `extra` STRING has been removed")) + } + } + + test("temp view: external drop and recreate resolves to new table") { + val t = "testcat.ns1.ns2.tbl" + val ident = Identifier.of(Array("ns1", "ns2"), "tbl") + withTable(t) { + sql(s"CREATE TABLE $t (id INT, salary INT) USING foo") + sql(s"INSERT INTO $t VALUES (1, 100)") + + // create temp view + spark.table(t).createOrReplaceTempView("tmp") + checkAnswer(spark.table("tmp"), Seq(Row(1, 100))) + + // drop and recreate table + sql(s"DROP TABLE $t") + sql(s"CREATE TABLE $t (id INT, salary INT) USING foo") + + // temp view should resolve to 
the new (empty) table + checkAnswer(spark.table("tmp"), Seq.empty) + + // insert new data and verify view reflects it + sql(s"INSERT INTO $t VALUES (2, 200)") + checkAnswer(spark.table("tmp"), Seq(Row(2, 200))) + } + } + + test("temp view survives multiple successive column additions") { + val t = "testcat.ns1.ns2.tbl" + withTable(t) { + sql(s"CREATE TABLE $t (id INT, salary INT) USING foo") + sql(s"INSERT INTO $t VALUES (1, 100)") + + // create temp view + spark.table(t).createOrReplaceTempView("tmp") + checkAnswer(spark.table("tmp"), Seq(Row(1, 100))) + + // add columns one at a time + sql(s"ALTER TABLE $t ADD COLUMN col1 INT") + checkAnswer(spark.table("tmp"), Seq(Row(1, 100))) + + sql(s"ALTER TABLE $t ADD COLUMN col2 STRING") + checkAnswer(spark.table("tmp"), Seq(Row(1, 100))) + + sql(s"ALTER TABLE $t ADD COLUMN col3 BOOLEAN") + checkAnswer(spark.table("tmp"), Seq(Row(1, 100))) + + // insert data with all new columns and verify view still shows original schema + sql(s"INSERT INTO $t VALUES (2, 200, 10, 'x', true)") + checkAnswer(spark.table("tmp"), Seq(Row(1, 100), Row(2, 200))) + } + } + + test("temp view with subquery referencing same table refreshes all references") { + val t = "testcat.ns1.ns2.tbl" + withTable(t) { + sql(s"CREATE TABLE $t (id INT, value INT) USING foo") + sql(s"INSERT INTO $t VALUES (1, 10), (2, 20)") + + // create temp view with subquery referencing same table + spark.sql( + s"""SELECT * FROM $t WHERE id IN (SELECT id FROM $t WHERE value > 5)""" + ).createOrReplaceTempView("tmp") + checkAnswer(spark.table("tmp"), Seq(Row(1, 10), Row(2, 20))) + + // insert more data + sql(s"INSERT INTO $t VALUES (3, 30)") + + // both table references in the subquery should refresh + checkAnswer(spark.table("tmp"), Seq(Row(1, 10), Row(2, 20), Row(3, 30))) + } + } + + test("temp view filter pushdown with refreshed table version") { + val t = "testcat.ns1.ns2.tbl" + withTable(t) { + sql(s"CREATE TABLE $t (id INT, salary INT) USING foo") + sql(s"INSERT INTO $t 
VALUES (1, 100), (10, 1000)") + + // create temp view with filter + spark.table(t).filter("salary < 999").createOrReplaceTempView("tmp") + checkAnswer(spark.table("tmp"), Seq(Row(1, 100))) + + // insert data that passes the filter + sql(s"INSERT INTO $t VALUES (2, 200)") + + // filter should apply to new data too + checkAnswer(spark.table("tmp"), Seq(Row(1, 100), Row(2, 200))) + } + } + + // ===================================================================== + // Section 2: Repeated table access with external changes (regression) + // ===================================================================== + + test("repeated SELECT picks up external data writes") { + val t = "testcat.ns1.ns2.tbl" + withTable(t) { + sql(s"CREATE TABLE $t (id INT, salary INT) USING foo") + sql(s"INSERT INTO $t VALUES (1, 100)") + + // first query + checkAnswer(spark.sql(s"SELECT * FROM $t"), Seq(Row(1, 100))) + + // insert more data (simulating external write) + sql(s"INSERT INTO $t VALUES (2, 200)") + + // second query should reflect new data + checkAnswer(spark.sql(s"SELECT * FROM $t"), Seq(Row(1, 100), Row(2, 200))) + } + } + + test("repeated SELECT picks up external schema changes") { + val t = "testcat.ns1.ns2.tbl" + val ident = Identifier.of(Array("ns1", "ns2"), "tbl") + withTable(t) { + sql(s"CREATE TABLE $t (id INT, salary INT) USING foo") + sql(s"INSERT INTO $t VALUES (1, 100)") + + // first query + checkAnswer(spark.sql(s"SELECT * FROM $t"), Seq(Row(1, 100))) + + // external: add column via catalog API (bypasses session tracking) + externalAlterTable(ident, TableChange.addColumn(Array("new_col"), StringType, true)) + + // second query should reflect new schema + checkAnswer(spark.sql(s"SELECT * FROM $t"), Seq(Row(1, 100, null))) + } + } + + test("repeated SELECT after external drop and recreate") { + val t = "testcat.ns1.ns2.tbl" + withTable(t) { + sql(s"CREATE TABLE $t (id INT, salary INT) USING foo") + sql(s"INSERT INTO $t VALUES (1, 100)") + + // first query + 
checkAnswer(spark.sql(s"SELECT * FROM $t"), Seq(Row(1, 100))) + + // drop and recreate + sql(s"DROP TABLE $t") + sql(s"CREATE TABLE $t (id INT, salary INT) USING foo") + + // second query should reflect new (empty) table + checkAnswer(spark.sql(s"SELECT * FROM $t"), Seq.empty) + } + } + + // ===================================================================== + // Section 3: Incrementally constructed queries - missing scenarios + // ===================================================================== + + test("join: drop and add column with same name and same type between df1 and df2") { + val t = "testcat.ns1.ns2.tbl" + withTable(t) { + sql(s"CREATE TABLE $t (id INT, salary INT) USING foo") + sql(s"INSERT INTO $t VALUES (1, 100)") + + // create first DataFrame + val df1 = spark.table(t) + + // drop and add column with same name and type + sql(s"ALTER TABLE $t DROP COLUMN salary") + sql(s"ALTER TABLE $t ADD COLUMN salary INT") + + // insert data + sql(s"INSERT INTO $t VALUES (2, 200)") + + // create second DataFrame + val df2 = spark.table(t) + + // join refreshes versions - both use latest version + // InMemoryTable preserves old data values through column drop+add + val joined = df1.join(df2, df1("id") === df2("id")) + checkAnswer(joined, Seq( + Row(1, 100, 1, 100), + Row(2, 200, 2, 200))) + } + } + + test("join: drop and add column with same name but different type between df1 and df2") { + val t = "testcat.ns1.ns2.tbl" + withTable(t) { + sql(s"CREATE TABLE $t (id INT, salary INT) USING foo") + sql(s"INSERT INTO $t VALUES (1, 100)") + + // create first DataFrame + val df1 = spark.table(t) + + // drop and add column with same name but different type + sql(s"ALTER TABLE $t DROP COLUMN salary") + sql(s"ALTER TABLE $t ADD COLUMN salary STRING") + + // create second DataFrame + val df2 = spark.table(t) + + // join should fail due to incompatible type change + checkError( + exception = intercept[AnalysisException] { + df1.join(df2, df1("id") === df2("id")).collect() 
+ }, + condition = "INCOMPATIBLE_TABLE_CHANGE_AFTER_ANALYSIS.COLUMNS_MISMATCH", + parameters = Map( + "tableName" -> "`testcat`.`ns1`.`ns2`.`tbl`", + "errors" -> "- `salary` type has changed from INT to STRING")) + } + } + + test("three-way join with same table at different points in time") { + val t = "testcat.ns1.ns2.tbl" + withTable(t) { + sql(s"CREATE TABLE $t (id INT, value INT) USING foo") + sql(s"INSERT INTO $t VALUES (1, 10)") + + // version X + val df1 = spark.table(t) + + sql(s"INSERT INTO $t VALUES (2, 20)") + + // version X+1 + val df2 = spark.table(t) + + sql(s"INSERT INTO $t VALUES (3, 30)") + + // version X+2 + val df3 = spark.table(t) + + // all three should align to latest version in the joined plan + val joined = df1.join(df2, df1("id") === df2("id")) + .join(df3, df1("id") === df3("id")) + assert(joined.count() == 3) + } + } + + // ===================================================================== + // Section 4: Version pinning and refresh in Dataset - show vs collect + // ===================================================================== + + test("show() creates new QueryExecution while collect() reuses stale one") { + val t = "testcat.ns1.ns2.tbl" + withTable(t) { + sql(s"CREATE TABLE $t (id INT, salary INT) USING foo") + sql(s"INSERT INTO $t VALUES (1, 100)") + + val df = spark.sql(s"SELECT * FROM $t") + + // first collect pins the QueryExecution + val result1 = df.collect() + assert(result1.length == 1) + + // insert more data + sql(s"INSERT INTO $t VALUES (2, 200)") + + // count() creates a new QueryExecution and sees new data + assert(df.count() == 2) + + // collect() reuses the pinned QueryExecution and sees stale data + // This documents the current inconsistent behavior noted in the design doc + val result2 = df.collect() + assert(result2.length == 1) + } + } + + test("DataFrame after external column addition preserves original schema") { + val t = "testcat.ns1.ns2.tbl" + val ident = Identifier.of(Array("ns1", "ns2"), "tbl") + 
withTable(t) { + sql(s"CREATE TABLE $t (id INT, salary INT) USING foo") + sql(s"INSERT INTO $t VALUES (1, 100)") + + // create DataFrame but do NOT execute it yet (no checkAnswer/collect before change) + val df = spark.table(t) + + // external: add column via catalog API and insert data + externalAlterTable(ident, TableChange.addColumn(Array("new_column"), IntegerType, true)) + sql(s"INSERT INTO $t VALUES (2, 200, -1)") + + // DataFrame should preserve original schema (2 columns) but pick up new data + // Column additions are compatible - original schema is preserved + checkAnswer(df, Seq(Row(1, 100), Row(2, 200))) + } + } + + test("DataFrame after external column removal throws AnalysisException") { + val t = "testcat.ns1.ns2.tbl" + val ident = Identifier.of(Array("ns1", "ns2"), "tbl") + withTable(t) { + sql(s"CREATE TABLE $t (id INT, salary INT, extra STRING) USING foo") + sql(s"INSERT INTO $t VALUES (1, 100, 'x')") + + // create DataFrame but do NOT execute it (no collect before change) + val df = spark.table(t).select($"id", $"salary", $"extra") + + // external: remove column via catalog API (bypasses session tracking) + externalAlterTable(ident, TableChange.deleteColumn(Array("extra"), false)) + + // first access should detect the column removal + checkError( + exception = intercept[AnalysisException] { df.collect() }, + condition = "INCOMPATIBLE_TABLE_CHANGE_AFTER_ANALYSIS.COLUMNS_MISMATCH", + parameters = Map( + "tableName" -> "`testcat`.`ns1`.`ns2`.`tbl`", + "errors" -> "- `extra` STRING has been removed")) + } + } + + test("DataFrame after drop/recreate table detects table ID change") { + val t = "testcat.ns1.ns2.tbl" + val ident = Identifier.of(Array("ns1", "ns2"), "tbl") + withTable(t) { + sql(s"CREATE TABLE $t (id INT, salary INT) USING foo") + sql(s"INSERT INTO $t VALUES (1, 100)") + + // create DataFrame but do NOT execute it (no collect before change) + val df = spark.table(t) + + val originalId = catalog("testcat").loadTable(ident).id() + + // drop 
and recreate + sql(s"DROP TABLE $t") + sql(s"CREATE TABLE $t (id INT, salary INT) USING foo") + + val newId = catalog("testcat").loadTable(ident).id() + assert(originalId != newId) + + // first access should detect table ID change + checkError( + exception = intercept[AnalysisException] { df.collect() }, + condition = "INCOMPATIBLE_TABLE_CHANGE_AFTER_ANALYSIS.TABLE_ID_MISMATCH", + sqlState = Some("51024"), + parameters = Map( + "tableName" -> "`testcat`.`ns1`.`ns2`.`tbl`", + "capturedTableId" -> originalId, + "currentTableId" -> newId)) + } + } + + test("DataFrame after drop and add column with same name same type") { + val t = "testcat.ns1.ns2.tbl" + withTable(t) { + sql(s"CREATE TABLE $t (id INT, salary INT) USING foo") + sql(s"INSERT INTO $t VALUES (1, 100)") + + // create DataFrame but do NOT execute it (no collect before change) + val df = spark.table(t) + + // drop and add column with same name and type + sql(s"ALTER TABLE $t DROP COLUMN salary") + sql(s"ALTER TABLE $t ADD COLUMN salary INT") + + // InMemoryTable preserves data through column drop+add, + // so the version refresh detects a compatible change and succeeds + // The design doc proposes this should eventually fail with column ID mismatch + // once Spark introduces the notion of column IDs + checkAnswer(df, Seq(Row(1, 100))) + } + } + + test("DataFrame after drop and add column with same name different type") { + val t = "testcat.ns1.ns2.tbl" + withTable(t) { + sql(s"CREATE TABLE $t (id INT, salary INT) USING foo") + sql(s"INSERT INTO $t VALUES (1, 100)") + + // create DataFrame but do NOT execute it (no collect before change) + val df = spark.table(t) + + // drop and add column with same name but different type + sql(s"ALTER TABLE $t DROP COLUMN salary") + sql(s"ALTER TABLE $t ADD COLUMN salary STRING") + + // should detect incompatible type change during version refresh + checkError( + exception = intercept[AnalysisException] { df.collect() }, + condition = 
"INCOMPATIBLE_TABLE_CHANGE_AFTER_ANALYSIS.COLUMNS_MISMATCH", + parameters = Map( + "tableName" -> "`testcat`.`ns1`.`ns2`.`tbl`", + "errors" -> "- `salary` type has changed from INT to STRING")) + } + } + + test("DataFrame with multiple actions interleaved with writes") { + val t = "testcat.ns1.ns2.tbl" + withTable(t) { + sql(s"CREATE TABLE $t (id INT, value INT) USING foo") + sql(s"INSERT INTO $t VALUES (1, 10)") + + val df = spark.table(t) + + // count creates new QE + assert(df.count() == 1) + + // insert more data + sql(s"INSERT INTO $t VALUES (2, 20)") + + // count creates new QE again and sees new data + assert(df.count() == 2) + + // filter creates a derived DataFrame with new QE + assert(df.filter("id > 0").count() == 2) + + // checkAnswer (like show) creates new QE and sees all data + checkAnswer(df, Seq(Row(1, 10), Row(2, 20))) + } + } + + test("DataFrame used in CTAS after external schema change") { + val s = "testcat.ns1.s" + val t = "testcat.ns1.t" + val ident = Identifier.of(Array("ns1"), "s") + withTable(s, t) { + sql(s"CREATE TABLE $s (id INT, data STRING) USING foo") + sql(s"INSERT INTO $s VALUES (1, 'a'), (2, 'b')") + + // create DataFrame from source + val sourceDF = spark.table(s).filter("id < 10") + + // external: add column via catalog API (bypasses session tracking) + externalAlterTable(ident, TableChange.addColumn(Array("extra"), StringType, true)) + + // CTAS should fail as schema changed externally after analysis + val e = intercept[AnalysisException] { + sourceDF.writeTo(t).createOrReplace() + } + assert(e.message.contains("incompatible changes to table `testcat`.`ns1`.`s`")) + } + } + + // ===================================================================== + // Section 5: CACHE TABLE scenarios (in DataSourceV2DataFrameSuite) + // ===================================================================== + + test("CACHE TABLE pins state against external schema changes") { + val t = "testcat.ns1.ns2.tbl" + val ident = 
Identifier.of(Array("ns1", "ns2"), "tbl") + withTable(t) { + sql(s"CREATE TABLE $t (id INT, salary INT) USING foo") + sql(s"INSERT INTO $t VALUES (1, 100)") + + // cache the table + sql(s"CACHE TABLE $t") + assertCached(spark.table(t)) + checkAnswer(spark.table(t), Seq(Row(1, 100))) + + // external: add column via catalog API (bypasses session tracking) + externalAlterTable(ident, TableChange.addColumn(Array("new_col"), IntegerType, true)) + + // cached state should be pinned - external schema change not visible + assertCached(spark.table(t)) + checkAnswer(spark.table(t), Seq(Row(1, 100))) + } + } + + test("CACHE TABLE: session write invalidates then re-pins against external write") { + val t = "testcat.ns1.ns2.tbl" + val ident = Identifier.of(Array("ns1", "ns2"), "tbl") + withTable(t) { + sql(s"CREATE TABLE $t (id INT, value INT, category STRING) USING foo") + sql(s"INSERT INTO $t VALUES (1, 10, 'A')") + + // cache the table + sql(s"CACHE TABLE $t") + assertCached(spark.table(t)) + checkAnswer(spark.table(t), Seq(Row(1, 10, "A"))) + + // session write invalidates cache and recaches + sql(s"INSERT INTO $t VALUES (2, 20, 'B')") + assertCached(spark.table(t)) + checkAnswer(spark.table(t), Seq(Row(1, 10, "A"), Row(2, 20, "B"))) + + // external truncate should not affect re-pinned cache + val table = catalog("testcat").loadTable(ident, util.Set.of(TableWritePrivilege.DELETE)) + table.asInstanceOf[TruncatableTable].truncateTable() + + // cache should still show the session data + assertCached(spark.table(t)) + checkAnswer(spark.table(t), Seq(Row(1, 10, "A"), Row(2, 20, "B"))) + } + } + + test("CACHE TABLE followed by REFRESH TABLE picks up external changes") { + val t = "testcat.ns1.ns2.tbl" + withTable(t) { + sql(s"CREATE TABLE $t (id INT, salary INT) USING foo") + sql(s"INSERT INTO $t VALUES (1, 100)") + + // cache the table + sql(s"CACHE TABLE $t") + assertCached(spark.table(t)) + checkAnswer(spark.table(t), Seq(Row(1, 100))) + + // insert more data + sql(s"INSERT 
INTO $t VALUES (2, 200)") + + // REFRESH TABLE should refresh the cache + sql(s"REFRESH TABLE $t") + + // should show refreshed data + assertCached(spark.table(t)) + checkAnswer(spark.table(t), Seq(Row(1, 100), Row(2, 200))) + } + } + + test("CACHE TABLE: external drop/recreate does not affect cached state") { + val t = "testcat.ns1.ns2.tbl" + val ident = Identifier.of(Array("ns1", "ns2"), "tbl") + withTable(t) { + sql(s"CREATE TABLE $t (id INT, value INT, category STRING) USING foo") + sql(s"INSERT INTO $t VALUES (1, 10, 'A'), (2, 20, 'B')") + + // cache the table + spark.table(t).cache() + assertCached(spark.table(t)) + checkAnswer(spark.table(t), Seq(Row(1, 10, "A"), Row(2, 20, "B"))) + + // truncate via the catalog API as a stand-in for an external drop+recreate + val table = catalog("testcat").loadTable(ident, util.Set.of(TableWritePrivilege.DELETE)) + table.asInstanceOf[TruncatableTable].truncateTable() + + // cached state should still show original data + assertCached(spark.table(t)) + checkAnswer(spark.table(t), Seq(Row(1, 10, "A"), Row(2, 20, "B"))) + } + } + + // ===================================================================== + // Creative / edge case scenarios + // ===================================================================== + + test("nested temp view on temp view with DSv2 table detects schema changes") { + val t = "testcat.ns1.ns2.tbl" + withTable(t) { + sql(s"CREATE TABLE $t (id INT, salary INT, extra STRING) USING foo") + sql(s"INSERT INTO $t VALUES (1, 100, 'x')") + + // create temp view v1 on base table + spark.table(t).createOrReplaceTempView("v1") + checkAnswer(spark.table("v1"), Seq(Row(1, 100, "x"))) + + // create temp view v2 on v1 + spark.table("v1").createOrReplaceTempView("v2") + checkAnswer(spark.table("v2"), Seq(Row(1, 100, "x"))) + + // remove column from base table + sql(s"ALTER TABLE $t DROP COLUMN extra") + + // querying v2 should propagate the error from v1's validation + intercept[AnalysisException] { spark.table("v2").collect() } + } + 
} + + test("self-join with version drift aligns to latest version") { + val t = "testcat.ns1.ns2.tbl" + withTable(t) { + sql(s"CREATE TABLE $t (id INT, value INT) USING foo") + sql(s"INSERT INTO $t VALUES (1, 10)") + + val df1 = spark.table(t) + + // insert more data + sql(s"INSERT INTO $t VALUES (2, 20)") + + val df2 = spark.table(t) + + // both should use the latest version after refresh + val joined = df1.join(df2, df1("id") === df2("id")) + assert(joined.count() == 2) + } + } + + test("temp view on DSv2 table: REFRESH is a no-op for DataFrame temp views") { + val t = "testcat.ns1.ns2.tbl" + withTable(t) { + sql(s"CREATE TABLE $t (id INT, salary INT) USING foo") + sql(s"INSERT INTO $t VALUES (1, 100)") + + // create temp view from DataFrame + spark.table(t).createOrReplaceTempView("tmp") + checkAnswer(spark.table("tmp"), Seq(Row(1, 100))) + + // REFRESH should be a no-op since temp views from DataFrames + // always reload the table from catalog on each access + sql("REFRESH TABLE tmp") + + // view should still work normally + checkAnswer(spark.table("tmp"), Seq(Row(1, 100))) + + // insert more data and verify view still works + sql(s"INSERT INTO $t VALUES (2, 200)") + checkAnswer(spark.table("tmp"), Seq(Row(1, 100), Row(2, 200))) + } + } + + test("cached temp view: session write invalidates cache and recaches") { + val t = "testcat.ns1.ns2.tbl" + withTable(t) { + sql(s"CREATE TABLE $t (id INT, salary INT) USING foo") + sql(s"INSERT INTO $t VALUES (1, 100)") + + // create and cache temp view + spark.table(t).createOrReplaceTempView("tmp") + sql("CACHE TABLE tmp") + assertCached(spark.table("tmp")) + checkAnswer(spark.table("tmp"), Seq(Row(1, 100))) + + // session write should invalidate the cache + sql(s"INSERT INTO $t VALUES (2, 200)") + + // temp view should reflect new data + checkAnswer(spark.table("tmp"), Seq(Row(1, 100), Row(2, 200))) + } + } + test("CTAS/RTAS should trigger two query executions") { // CTAS/RTAS triggers 2 query executions: // 1. 
The outer CTAS/RTAS command execution