package io.kination.vine.catalog

import io.kination.vine.{VineDataSource, VineModule}
import org.apache.spark.sql.catalyst.analysis.{NamespaceAlreadyExistsException, NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException}
import org.apache.spark.sql.connector.catalog._
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.sql.SparkSession

import java.util
import scala.collection.JavaConverters._
import scala.util.{Failure, Success}

/**
 * Catalog implementation for Hive Metastore integration.
 *
 * This allows Spark to discover and query Vine tables registered in Hive Metastore.
 *
 * Usage in spark-defaults.conf:
 * {{{
 * spark.sql.catalog.vine = io.kination.vine.catalog.Catalog
 * spark.sql.catalog.vine.hive.metastore.uris = thrift://localhost:9083
 * spark.sql.catalog.vine.auto-register-partitions = true
 * }}}
 *
 * Then in Spark SQL:
 * {{{
 * spark.sql("SELECT * FROM vine.default.events WHERE date = '2024-12-26'")
 * }}}
 */
class Catalog extends TableCatalog with SupportsNamespaces {
  // Populated in initialize(); Spark guarantees initialize() is called before any other method.
  private var catalogName: String = _
  private var config: Option[CatalogConfig] = None
  private var hmsClient: Option[HiveMetastoreClient] = None

  override def initialize(name: String, options: CaseInsensitiveStringMap): Unit = {
    this.catalogName = name
    this.config = CatalogConfig.fromOptions(options)

    // Only create an HMS client when a metastore URI is configured;
    // otherwise the catalog degrades to a stub over the "default" namespace.
    this.config match {
      case Some(cfg) =>
        this.hmsClient = Some(HiveMetastoreClient(cfg))
        println(s"Initialized Vine Catalog '$name' with Hive Metastore: ${cfg.metastoreUri}")
      case None =>
        println(s"Warning: Vine Catalog '$name' initialized without Hive Metastore configuration")
    }
  }

  override def name(): String = catalogName

  /**
   * List available namespaces (databases).
   * Falls back to a single "default" namespace when HMS is not configured.
   */
  override def listNamespaces(): Array[Array[String]] = {
    hmsClient match {
      case Some(client) =>
        client.getAllDatabases().map(db => Array(db))
      case None =>
        Array(Array("default"))
    }
  }

  override def listNamespaces(namespace: Array[String]): Array[Array[String]] = {
    // Hive databases are flat: only the root has child namespaces.
    if (namespace.length == 0) {
      listNamespaces()
    } else {
      Array.empty
    }
  }

  /**
   * Load namespace properties. Only single-level namespaces are valid.
   */
  override def loadNamespaceMetadata(namespace: Array[String]): util.Map[String, String] = {
    if (namespace.length != 1) {
      throw new NoSuchNamespaceException(namespace)
    }

    val props = new util.HashMap[String, String]()
    props.put("namespace", namespace(0))
    props
  }

  /**
   * Check if namespace exists (against HMS when configured, else only "default").
   */
  override def namespaceExists(namespace: Array[String]): Boolean = {
    if (namespace.length != 1) return false
    hmsClient match {
      case Some(client) =>
        client.getAllDatabases().contains(namespace(0))
      case None =>
        namespace(0) == "default"
    }
  }

  /**
   * Create namespace (not supported - create the database in Hive directly).
   *
   * Fix: this previously threw [[NamespaceAlreadyExistsException]] unconditionally,
   * which misreported an unsupported operation as a name conflict.
   */
  override def createNamespace(namespace: Array[String], metadata: util.Map[String, String]): Unit = {
    throw new UnsupportedOperationException("createNamespace is not supported; create the database in Hive directly")
  }

  /**
   * Alter namespace (not supported).
   */
  override def alterNamespace(namespace: Array[String], changes: NamespaceChange*): Unit = {
    throw new UnsupportedOperationException("alterNamespace is not supported")
  }

  /**
   * Drop namespace (not supported - use Hive directly).
   */
  override def dropNamespace(namespace: Array[String], cascade: Boolean): Boolean = {
    throw new UnsupportedOperationException("dropNamespace is not supported")
  }

  /**
   * List tables in a namespace.
   * TODO: a full implementation would query HMS; currently returns empty.
   */
  override def listTables(namespace: Array[String]): Array[Identifier] = {
    Array.empty
  }

  /**
   * Load a table.
   *
   * Verifies existence in HMS; actual table materialization from HMS metadata
   * is not implemented yet.
   *
   * Fix: the "not yet implemented" case previously threw [[NoSuchTableException]]
   * even though the table was just verified to exist; it is reported as an
   * unsupported operation instead. An unused VineDataSource allocation was removed.
   */
  override def loadTable(ident: Identifier): Table = {
    val tableName = ident.toString

    hmsClient match {
      case Some(client) =>
        val Array(dbName, tblName) = parseIdentifier(ident)

        if (!client.tableExists(dbName, tblName)) {
          throw new NoSuchTableException(ident)
        }

        // In a full implementation, we'd get the table location from HMS
        // and hand it to VineDataSource.
        throw new UnsupportedOperationException(
          s"Table loading from HMS not yet fully implemented: $tableName")

      case None =>
        throw new UnsupportedOperationException("Hive Metastore not configured")
    }
  }

  /**
   * Create a table and register it in Hive Metastore.
   *
   * Note: HMS registration (and optional partition auto-registration) happens
   * as a side effect even though the final Table construction is unimplemented.
   *
   * @throws IllegalArgumentException when "location" is missing from properties
   * @throws TableAlreadyExistsException when the table is already registered
   */
  override def createTable(
    ident: Identifier,
    schema: StructType,
    partitions: Array[Transform],
    properties: util.Map[String, String]
  ): Table = {
    val Array(dbName, tblName) = parseIdentifier(ident)
    val location = properties.get("location")

    if (location == null) {
      throw new IllegalArgumentException("Table location must be specified in properties")
    }

    hmsClient match {
      case Some(client) =>
        if (client.tableExists(dbName, tblName)) {
          throw new TableAlreadyExistsException(ident)
        }

        client.registerTable(ident.toString, schema, location) match {
          case Success(_) =>
            // Discover and register existing on-disk partitions if enabled.
            if (config.exists(_.autoRegisterPartitions)) {
              val registered = client.discoverAndRegisterPartitions(ident.toString, location)
              println(s"Auto-registered $registered partitions for table ${ident.toString}")
            }

            // Returning a Vine Table instance is not implemented yet.
            throw new UnsupportedOperationException("createTable not yet fully implemented")

          case Failure(e) =>
            throw new RuntimeException(s"Failed to register table ${ident.toString} in Hive Metastore", e)
        }

      case None =>
        throw new UnsupportedOperationException("Hive Metastore not configured")
    }
  }

  /**
   * Alter table (not supported yet).
   */
  override def alterTable(ident: Identifier, changes: TableChange*): Table = {
    throw new UnsupportedOperationException("alterTable is not yet supported")
  }

  /**
   * Drop a table from Hive Metastore (metadata only; data files are kept).
   */
  override def dropTable(ident: Identifier): Boolean = {
    hmsClient match {
      case Some(client) =>
        client.dropTable(ident.toString, deleteData = false) match {
          case Success(_) => true
          case Failure(_) => false
        }

      case None =>
        throw new UnsupportedOperationException("Hive Metastore not configured")
    }
  }

  /**
   * Rename table (not supported yet).
   */
  override def renameTable(oldIdent: Identifier, newIdent: Identifier): Unit = {
    throw new UnsupportedOperationException("renameTable is not yet supported")
  }

  /**
   * Parse Identifier into Array(database, table).
   * An empty namespace resolves to the configured default database.
   */
  private def parseIdentifier(ident: Identifier): Array[String] = {
    val namespace = ident.namespace()
    val tableName = ident.name()

    if (namespace.length == 0) {
      Array(config.map(_.database).getOrElse("default"), tableName)
    } else if (namespace.length == 1) {
      Array(namespace(0), tableName)
    } else {
      throw new IllegalArgumentException(s"Invalid identifier: $ident")
    }
  }

  /**
   * Get HMS client for external use (None when HMS is not configured).
   */
  def getHmsClient(): Option[HiveMetastoreClient] = hmsClient

  /**
   * Close the catalog and release the underlying HMS connection.
   */
  def close(): Unit = {
    hmsClient.foreach(_.close())
  }
}

object Catalog {
  /**
   * Register a Vine table in Hive Metastore from Spark.
   *
   * Example usage:
   * {{{
   * val catalog = spark.sessionState.catalog.asInstanceOf[Catalog]
   * Catalog.registerTable(
   *   catalog,
   *   "default.events",
   *   spark.read.format("vine").load("/data/events").schema,
   *   "/data/events"
   * )
   * }}}
   *
   * @throws RuntimeException when HMS registration fails
   * @throws UnsupportedOperationException when the catalog has no HMS configured
   */
  def registerTable(
    catalog: Catalog,
    tableName: String,
    schema: StructType,
    location: String
  ): Unit = {
    catalog.getHmsClient() match {
      case Some(client) =>
        client.registerTable(tableName, schema, location) match {
          case Success(_) =>
            val partitions = client.discoverAndRegisterPartitions(tableName, location)
            println(s"Successfully registered table $tableName with $partitions partitions")

          case Failure(e) =>
            throw new RuntimeException(s"Failed to register table $tableName", e)
        }

      case None =>
        throw new UnsupportedOperationException("Hive Metastore not configured in catalog")
    }
  }
}
package io.kination.vine.catalog

import io.kination.vine.{VineModule, VineArrowBridge, VineTypeUtils}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types._

/**
 * Catalog reader that can use HMS partition metadata for optimized reads.
 *
 * This reader provides two main benefits over direct file reading:
 * 1. Partition pruning using HMS metadata (faster for selective queries)
 * 2. Schema reading from HMS instead of vine_meta.json
 */
object CatalogAwareReader {

  /**
   * Read specific partitions from HMS-registered table.
   *
   * Fix: one HMS client is now shared for the whole read (previously each
   * partition opened its own Thrift connection) and the schema is resolved once.
   *
   * @param spark SparkSession
   * @param catalogConfig HMS configuration
   * @param tableName Full table name (e.g., "default.events")
   * @param partitionFilter Optional partition filter (e.g., Some("2024-12-26") or None for all)
   * @return DataFrame containing the data
   * @throws IllegalArgumentException when the table or requested partition is not in HMS
   */
  def read(
    spark: SparkSession,
    catalogConfig: CatalogConfig,
    tableName: String,
    partitionFilter: Option[String] = None
  ): DataFrame = {
    val client = HiveMetastoreClient(catalogConfig)

    try {
      val Array(dbName, tblName) = parseTableName(tableName, catalogConfig)

      if (!client.tableExists(dbName, tblName)) {
        throw new IllegalArgumentException(
          s"Table $tableName not found in Hive Metastore. " +
            "Please register the table first using HiveMetastoreClient.registerTable()"
        )
      }

      // Determine which partitions to read: the requested one, or all registered.
      val partitions: Seq[String] = partitionFilter match {
        case Some(partition) =>
          if (!client.partitionExists(dbName, tblName, partition)) {
            throw new IllegalArgumentException(s"Partition $partition not found for table $tableName")
          }
          Seq(partition)
        case None =>
          client.listPartitions(tableName).map { partSpec =>
            // Parse "date=2024-12-26" -> "2024-12-26"; tolerate specs without '='.
            val idx = partSpec.indexOf('=')
            if (idx >= 0) partSpec.substring(idx + 1) else partSpec
          }
      }

      // Resolve the schema once; it is needed for both the empty and non-empty paths.
      val schema = getSchemaFromHMS(client, dbName, tblName)

      if (partitions.isEmpty) {
        spark.createDataFrame(spark.sparkContext.emptyRDD[Row], schema)
      } else {
        val allRows = partitions.flatMap(p => readPartition(tblName, p, schema))
        spark.createDataFrame(spark.sparkContext.parallelize(allRows), schema)
      }
    } finally {
      client.close()
    }
  }

  /**
   * Read data from a specific partition via Arrow IPC.
   * The schema is supplied by the caller so no extra HMS round-trips are made.
   */
  private def readPartition(
    tblName: String,
    partition: String,
    schema: StructType
  ): Seq[Row] = {
    // TODO: the base path should come from the HMS table/partition location;
    // for now it is constructed manually.
    val basePath = s"/data/$tblName"
    val partitionPath = s"$basePath/$partition"

    val arrowBytes = VineModule.readDataArrow(partitionPath)

    if (arrowBytes == null || arrowBytes.isEmpty) {
      Seq.empty
    } else {
      VineArrowBridge.arrowIpcToRows(arrowBytes, schema)
    }
  }

  /**
   * Get Spark schema from HMS table metadata.
   *
   * Not implemented yet: a full implementation would convert the HMS
   * FieldSchema list (e.g. via a client.getTableSchema(dbName, tableName))
   * into a Spark StructType.
   */
  private def getSchemaFromHMS(
    client: HiveMetastoreClient,
    dbName: String,
    tableName: String
  ): StructType = {
    throw new UnsupportedOperationException(
      "Schema reading from HMS not yet implemented. " +
      "Please use VineReader.read() which reads from vine_meta.json, " +
      "or query via Spark SQL using Catalog."
    )
  }

  /**
   * Split "database.table" into Array(database, table);
   * a bare table name uses the configured default database.
   */
  private def parseTableName(fullName: String, config: CatalogConfig): Array[String] = {
    val parts = fullName.split("\\.")
    if (parts.length == 2) {
      parts
    } else {
      Array(config.database, fullName)
    }
  }
}
package io.kination.vine.catalog

import org.apache.spark.sql.util.CaseInsensitiveStringMap

/**
 * Configuration for Vine Catalog integration with Hive Metastore.
 *
 * @param metastoreUri Hive Metastore Thrift URI (e.g., "thrift://localhost:9083")
 * @param autoRegisterPartitions Whether to automatically register partitions on write
 * @param enableAsync Whether to register partitions asynchronously to avoid blocking writes
 * @param database Default database name
 */
case class CatalogConfig(
  metastoreUri: String,
  autoRegisterPartitions: Boolean = true,
  enableAsync: Boolean = true,
  database: String = "default"
)

object CatalogConfig {
  // Option keys as they appear under spark.sql.catalog.<name>.*
  val METASTORE_URI_KEY = "hive.metastore.uris"
  val AUTO_REGISTER_PARTITIONS_KEY = "auto-register-partitions"
  val ENABLE_ASYNC_KEY = "enable-async"
  val DATABASE_KEY = "database"

  /**
   * Build a config from Spark's CaseInsensitiveStringMap.
   *
   * Returns None when no metastore URI is present; all other keys
   * fall back to their defaults.
   */
  def fromOptions(options: CaseInsensitiveStringMap): Option[CatalogConfig] =
    for {
      uri <- Option(options.get(METASTORE_URI_KEY))
    } yield CatalogConfig(
      metastoreUri = uri,
      autoRegisterPartitions = options.getBoolean(AUTO_REGISTER_PARTITIONS_KEY, true),
      enableAsync = options.getBoolean(ENABLE_ASYNC_KEY, true),
      database = options.getOrDefault(DATABASE_KEY, "default")
    )
}
package io.kination.vine.catalog

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient
import org.apache.hadoop.hive.metastore.api.{Partition, Table => HiveTable}
import org.apache.spark.sql.types.StructType

import java.nio.file.{Files, Paths}
import scala.collection.JavaConverters._
import scala.util.{Failure, Success, Try}

/**
 * Wrapper of Hive MetaStore Client.
 *
 * The underlying Thrift client is created lazily and shared; call close()
 * to release the connection.
 *
 * @param config Vine catalog configuration
 */
class HiveMetastoreClient(config: CatalogConfig) {
  private val hiveConf = createHiveConf(config)
  // @volatile + double-checked locking: concurrent callers share one Thrift client.
  @volatile private var client: Option[HiveMetaStoreClient] = None

  /**
   * Get or lazily create the HMS client.
   */
  private def getClient(): HiveMetaStoreClient = {
    client.getOrElse {
      synchronized {
        client.getOrElse {
          val newClient = new HiveMetaStoreClient(hiveConf)
          client = Some(newClient)
          newClient
        }
      }
    }
  }

  private def createHiveConf(config: CatalogConfig): HiveConf = {
    val conf = new HiveConf()
    conf.setVar(HiveConf.ConfVars.METASTOREURIS, config.metastoreUri)
    // Skip metastore DB schema version check (client-side only, so okay to skip)
    conf.setBoolVar(HiveConf.ConfVars.METASTORE_SCHEMA_VERIFICATION, false)
    conf
  }

  /**
   * Register Vine table in Hive Metastore. Idempotent: an already-registered
   * table is skipped and still reported as Success.
   *
   * Fix: replaced `return Success(())` inside the Try block — a nonlocal
   * return through the by-name closure — with plain conditional flow.
   *
   * @param tableName Full table name (database.table)
   * @param schema Spark schema
   * @param location Table location path
   * @return Success or failure
   */
  def registerTable(
    tableName: String,
    schema: StructType,
    location: String
  ): Try[Unit] = Try {
    val Array(dbName, tblName) = parseTableName(tableName)

    if (tableExists(dbName, tblName)) {
      println(s"Table $tableName already exists in Hive Metastore, skipping registration")
    } else {
      val hiveTable = MetadataConverter.createHiveTable(dbName, tblName, schema, location)
      getClient().createTable(hiveTable)
      println(s"Successfully registered Vine table $tableName in Hive Metastore")
    }
  }

  /**
   * Register partition for table. Idempotent: an existing partition is skipped.
   *
   * @param tableName Full table name (database.table)
   * @param partitionValue Partition value (e.g., "2024-12-26")
   * @param location Partition location path
   * @return Success or failure
   */
  def registerPartition(
    tableName: String,
    partitionValue: String,
    location: String
  ): Try[Unit] = Try {
    val Array(dbName, tblName) = parseTableName(tableName)

    if (!partitionExists(dbName, tblName, partitionValue)) {
      val table = getClient().getTable(dbName, tblName)
      val sd = table.getSd

      val partition = new Partition()
      partition.setDbName(dbName)
      partition.setTableName(tblName)
      partition.setValues(List(partitionValue).asJava)

      // Partition reuses the table's storage descriptor with its own location.
      val partSd = sd.deepCopy()
      partSd.setLocation(location)
      partition.setSd(partSd)

      getClient().add_partition(partition)
      println(s"Successfully registered partition $partitionValue for table $tableName")
    }
  }

  /**
   * Discover and register all date-based partitions from filesystem.
   *
   * @param tableName Full table name
   * @param basePath Base path to scan for partitions
   * @return Number of partitions registered
   */
  def discoverAndRegisterPartitions(
    tableName: String,
    basePath: String
  ): Int = {
    findDateDirectories(basePath).count { dateDir =>
      val partitionValue = dateDir.getFileName.toString

      registerPartition(tableName, partitionValue, dateDir.toString) match {
        case Success(_) => true
        case Failure(e) =>
          println(s"Failed to register partition $partitionValue: ${e.getMessage}")
          false
      }
    }
  }

  /**
   * Find date-based partition directories (YYYY-MM-DD pattern), sorted.
   *
   * Fixes: the Files.list stream is now closed (it holds an open directory
   * handle), and matching uses String.matches instead of the Scala 2.13-only
   * Regex.matches(CharSequence) so the code also compiles on Scala 2.12.
   */
  private def findDateDirectories(basePath: String): Seq[java.nio.file.Path] = {
    val base = Paths.get(basePath)
    if (!Files.exists(base) || !Files.isDirectory(base)) {
      Seq.empty
    } else {
      val datePattern = "\\d{4}-\\d{2}-\\d{2}"
      val stream = Files.list(base)
      try {
        stream
          .iterator()
          .asScala
          .filter(p => Files.isDirectory(p) && p.getFileName.toString.matches(datePattern))
          .toSeq // materialize before the stream is closed
          .sorted
      } finally {
        stream.close()
      }
    }
  }

  /**
   * Check if table exists.
   */
  def tableExists(dbName: String, tableName: String): Boolean = {
    Try(getClient().getTable(dbName, tableName)).isSuccess
  }

  /**
   * Check if partition exists.
   */
  def partitionExists(dbName: String, tableName: String, partitionValue: String): Boolean = {
    Try {
      getClient().getPartition(dbName, tableName, List(partitionValue).asJava)
    }.isSuccess
  }

  /**
   * Drop table from Hive Metastore.
   *
   * @param deleteData whether HMS should also delete the table's data files
   */
  def dropTable(tableName: String, deleteData: Boolean = false): Try[Unit] = Try {
    val Array(dbName, tblName) = parseTableName(tableName)
    getClient().dropTable(dbName, tblName, deleteData, true)
    println(s"Successfully dropped table $tableName from Hive Metastore")
  }

  /**
   * List all partition names for a table (empty on any HMS error).
   */
  def listPartitions(tableName: String): Seq[String] = {
    val Array(dbName, tblName) = parseTableName(tableName)
    Try {
      getClient().listPartitionNames(dbName, tblName, Short.MaxValue)
        .asScala
        .toSeq
    }.getOrElse(Seq.empty)
  }

  /**
   * Parse table name into Array(database, table).
   * Supports both "database.table" and "table" (uses default database).
   */
  private def parseTableName(fullName: String): Array[String] = {
    val parts = fullName.split("\\.")
    if (parts.length == 2) {
      parts
    } else {
      Array(config.database, fullName)
    }
  }

  /**
   * List all databases in Hive Metastore.
   */
  def getAllDatabases(): Array[String] = {
    getClient().getAllDatabases.asScala.toArray
  }

  /**
   * Close the HMS client and drop the cached connection.
   */
  def close(): Unit = {
    client.foreach(_.close())
    client = None
  }
}

object HiveMetastoreClient {
  /**
   * Create new HMS client from configuration.
   */
  def apply(config: CatalogConfig): HiveMetastoreClient = {
    new HiveMetastoreClient(config)
  }
}
package io.kination.vine.catalog

import org.apache.hadoop.hive.metastore.api.{FieldSchema, SerDeInfo, StorageDescriptor, Table => HiveTable}
import org.apache.hadoop.hive.serde.serdeConstants
import org.apache.spark.sql.types._
import scala.collection.JavaConverters._

/**
 * Converts between Vine metadata and Hive Metastore table structures.
 */
object MetadataConverter {

  /**
   * Convert Spark StructType to Hive FieldSchema list.
   */
  def sparkSchemaToHiveFields(schema: StructType): java.util.List[FieldSchema] = {
    schema.fields.map { field =>
      new FieldSchema(
        field.name,
        sparkTypeToHiveType(field.dataType),
        s"Vine field: ${field.name}"
      )
    }.toList.asJava
  }

  /**
   * Convert Spark DataType to Hive type string.
   *
   * Fix: DecimalType now carries its precision and scale ("decimal(p,s)");
   * previously it mapped to bare "decimal", which Hive interprets with its
   * default precision/scale and silently loses the declared scale.
   * Unknown types fall back to string.
   */
  def sparkTypeToHiveType(dataType: DataType): String = dataType match {
    case ByteType => serdeConstants.TINYINT_TYPE_NAME
    case ShortType => serdeConstants.SMALLINT_TYPE_NAME
    case IntegerType => serdeConstants.INT_TYPE_NAME
    case LongType => serdeConstants.BIGINT_TYPE_NAME
    case FloatType => serdeConstants.FLOAT_TYPE_NAME
    case DoubleType => serdeConstants.DOUBLE_TYPE_NAME
    case StringType => serdeConstants.STRING_TYPE_NAME
    case BooleanType => serdeConstants.BOOLEAN_TYPE_NAME
    case BinaryType => serdeConstants.BINARY_TYPE_NAME
    case DateType => serdeConstants.DATE_TYPE_NAME
    case TimestampType => serdeConstants.TIMESTAMP_TYPE_NAME
    case d: DecimalType => s"${serdeConstants.DECIMAL_TYPE_NAME}(${d.precision},${d.scale})"
    case _ => serdeConstants.STRING_TYPE_NAME // fallback
  }

  /**
   * Create Hive partition fields (currently just "date" for date-based partitioning).
   */
  def createPartitionFields(): java.util.List[FieldSchema] = {
    List(
      new FieldSchema("date", serdeConstants.STRING_TYPE_NAME, "Date partition (YYYY-MM-DD)")
    ).asJava
  }

  /**
   * Create StorageDescriptor for Vine table.
   * Uses Vine-specific format identifiers so other engines don't
   * mistakenly try to read .vtx files as Parquet.
   *
   * Vine connectors (vine-spark, vine-trino) bypass SerDe and read via JNI.
   */
  def createStorageDescriptor(
    schema: StructType,
    location: String
  ): StorageDescriptor = {
    val sd = new StorageDescriptor()
    sd.setCols(sparkSchemaToHiveFields(schema))
    sd.setLocation(location)
    sd.setInputFormat("io.kination.vine.serde.VortexInputFormat")
    sd.setOutputFormat("io.kination.vine.serde.VortexOutputFormat")

    // Set SerDe info
    val serDeInfo = new SerDeInfo()
    serDeInfo.setName("vine")
    serDeInfo.setSerializationLib("io.kination.vine.serde.VortexSerDe")
    sd.setSerdeInfo(serDeInfo)

    // Set storage properties
    sd.setCompressed(false)
    sd.setNumBuckets(-1)
    sd.setStoredAsSubDirectories(false)

    sd
  }

  /**
   * Create Hive Table metadata for Vine table (EXTERNAL so drops keep data).
   */
  def createHiveTable(
    dbName: String,
    tableName: String,
    schema: StructType,
    location: String
  ): HiveTable = {
    val table = new HiveTable()
    table.setDbName(dbName)
    table.setTableName(tableName)
    table.setTableType("EXTERNAL_TABLE")
    table.setOwner(System.getProperty("user.name"))

    table.setSd(createStorageDescriptor(schema, location))
    table.setPartitionKeys(createPartitionFields())

    // Table parameters: EXTERNAL must also be set as a parameter for Hive to honor it.
    val parameters = new java.util.HashMap[String, String]()
    parameters.put("EXTERNAL", "TRUE")
    parameters.put("vine.version", "0.2.0")
    parameters.put("transient_lastDdlTime", (System.currentTimeMillis() / 1000).toString)
    table.setParameters(parameters)

    table
  }
}
ffdefb6..44caf2d 100644 --- a/vine-spark/build.sbt +++ b/vine-spark/build.sbt @@ -36,6 +36,7 @@ Test / javaOptions ++= Seq( val sparkVersion = "3.4.0" val arrowVersion = "14.0.2" val jacksonVersion = "2.14.3" // Downgrade to fix compatibility with Scala module 2.14.2 +val hiveVersion = "2.3.9" // Hive Metastore version compatible with Spark 3.4 libraryDependencies ++= Seq( "org.apache.spark" %% "spark-sql" % sparkVersion % Provided, @@ -43,7 +44,21 @@ libraryDependencies ++= Seq( "org.scalatest" %% "scalatest" % "3.2.17" % Test, // Apache Arrow for high-performance JNI data transfer "org.apache.arrow" % "arrow-vector" % arrowVersion, - "org.apache.arrow" % "arrow-memory-netty" % arrowVersion + "org.apache.arrow" % "arrow-memory-netty" % arrowVersion, + // Hive Metastore for catalog integration + "org.apache.hive" % "hive-metastore" % hiveVersion % Provided excludeAll( + ExclusionRule(organization = "org.pentaho"), + ExclusionRule(organization = "org.apache.logging.log4j"), + ExclusionRule(organization = "org.slf4j", name = "slf4j-log4j12"), + ExclusionRule(organization = "log4j", name = "log4j") + ), + "org.apache.hive" % "hive-exec" % hiveVersion % Provided excludeAll( + ExclusionRule(organization = "org.pentaho"), + ExclusionRule(organization = "org.apache.logging.log4j"), + ExclusionRule(organization = "org.slf4j", name = "slf4j-log4j12"), + ExclusionRule(organization = "log4j", name = "log4j") + ), + "org.apache.thrift" % "libthrift" % "0.12.0" % Provided ) // Force Jackson version downgrade for Spark compatibility diff --git a/vine-spark/src/main/scala/Main.scala b/vine-spark/src/main/scala/Main.scala deleted file mode 100644 index 49fc133..0000000 --- a/vine-spark/src/main/scala/Main.scala +++ /dev/null @@ -1,5 +0,0 @@ -//object Main { -// def main(args: Array[String]): Unit = { -// println("Hello, world!") -// } -//} diff --git a/vine-spark/src/main/scala/io/kination/vine/VineBatchWriter.scala 
b/vine-spark/src/main/scala/io/kination/vine/VineBatchWriter.scala index 398b113..d28e091 100644 --- a/vine-spark/src/main/scala/io/kination/vine/VineBatchWriter.scala +++ b/vine-spark/src/main/scala/io/kination/vine/VineBatchWriter.scala @@ -1,15 +1,21 @@ package io.kination.vine +import io.kination.vine.catalog.{CatalogConfig, HiveMetastoreClient} import org.apache.spark.sql.{DataFrame, Row} import org.apache.spark.sql.types.StructType +import java.nio.file.{Files, Paths} +import java.time.LocalDate +import scala.concurrent.{ExecutionContext, Future} +import scala.util.{Failure, Success} + /** * Batch writer for bulk data ingestion. */ object VineBatchWriter { /** - * Write DataFrame using Arrow IPC format. + * Write DataFrame * * @param path Directory path to Vine table (must contain vine_meta.json) * @param df DataFrame to write @@ -22,7 +28,36 @@ object VineBatchWriter { } /** - * Write collection of rows using Arrow IPC format. + * Write DataFrame with optional Hive Metastore catalog integration. 
+ * + * @param path Directory path to output(vine table) + * @param df DataFrame to write + * @param catalogConfig Optional catalog configuration for HMS integration + * @param tableName Optional table name for HMS registration + * + */ + def writeWithCatalog( + path: String, + df: DataFrame, + catalogConfig: Option[CatalogConfig], + tableName: Option[String] = None + ): Unit = { + val rows = df.collect().toSeq + if (rows.isEmpty) return + + // Write data + writeRows(path, rows, df.schema) + + // Register partition in HMS if catalog is configured + catalogConfig.foreach { config => + tableName.foreach { tbl => + registerPartitionInCatalog(path, tbl, df.schema, config) + } + } + } + + /** + * Write collection of rows * * @param path Directory path to write Vine table * @param rows Collection of rows @@ -34,4 +69,95 @@ object VineBatchWriter { val arrowBytes = VineArrowBridge.rowsToArrowIpc(rows, schema) VineModule.batchWriteArrow(path, arrowBytes) } + + /** + * Write collection of rows with catalog integration. + * + * @param path Directory path to write Vine table + * @param rows Collection of rows + * @param schema Schema of the rows + * @param catalogConfig Optional catalog configuration + * @param tableName Optional table name for HMS + * + */ + def writeRowsWithCatalog( + path: String, + rows: Seq[Row], + schema: StructType, + catalogConfig: Option[CatalogConfig], + tableName: Option[String] = None + ): Unit = { + if (rows.isEmpty) return + + // Write data + writeRows(path, rows, schema) + + // Register partition in HMS if catalog is configured + catalogConfig.foreach { config => + tableName.foreach { tbl => + registerPartitionInCatalog(path, tbl, schema, config) + } + } + } + + /** + * Register newly written partition in Hive Metastore. + * Uses the current date to identify the partition. 
+ */ + private def registerPartitionInCatalog( + basePath: String, + tableName: String, + schema: StructType, + config: CatalogConfig + ): Unit = { + val currentDate = LocalDate.now().toString // YYYY-MM-DD + val partitionPath = Paths.get(basePath, currentDate).toString + + // Check if partition directory exists + if (!Files.exists(Paths.get(partitionPath))) { + println(s"Partition directory does not exist: $partitionPath, skipping HMS registration") + return + } + + val registerPartition = () => { + val client = HiveMetastoreClient(config) + try { + // Ensure table is registered + if (!client.tableExists(extractDbName(tableName, config), extractTableName(tableName))) { + client.registerTable(tableName, schema, basePath) match { + case Success(_) => println(s"Registered table $tableName in HMS") + case Failure(e) => println(s"Failed to register table $tableName: ${e.getMessage}") + } + } + + // Register partition + client.registerPartition(tableName, currentDate, partitionPath) match { + case Success(_) => println(s"Registered partition $currentDate for table $tableName in HMS") + case Failure(e) => println(s"Failed to register partition: ${e.getMessage}") + } + } finally { + client.close() + } + } + + // Register asynchronously if enabled + if (config.enableAsync) { + implicit val ec: ExecutionContext = ExecutionContext.global + Future { + registerPartition() + } + } else { + registerPartition() + } + } + + private def extractDbName(fullTableName: String, config: CatalogConfig): String = { + val parts = fullTableName.split("\\.") + if (parts.length == 2) parts(0) else config.database + } + + private def extractTableName(fullTableName: String): String = { + val parts = fullTableName.split("\\.") + if (parts.length == 2) parts(1) else fullTableName + } } diff --git a/vine-spark/src/main/scala/io/kination/vine/VineStreamingWriter.scala b/vine-spark/src/main/scala/io/kination/vine/VineStreamingWriter.scala index 424e057..845f8eb 100644 --- 
a/vine-spark/src/main/scala/io/kination/vine/VineStreamingWriter.scala +++ b/vine-spark/src/main/scala/io/kination/vine/VineStreamingWriter.scala @@ -1,19 +1,37 @@ package io.kination.vine +import io.kination.vine.catalog.{CatalogConfig, HiveMetastoreClient} import org.apache.spark.sql.{DataFrame, Row} import org.apache.spark.sql.types.StructType +import java.nio.file.{Files, Paths} +import java.time.LocalDate +import scala.collection.mutable +import scala.concurrent.{ExecutionContext, Future} +import scala.util.{Failure, Success} + /** - * Streaming writer for incremental data ingestion to Vine tables. + * Streaming writer for incremental data ingestion * - * Optimized for 'continuous data streams' where batches arrive over time. + * Optimized on 'continuous data streams' where batches arrive over time. * Supports explicit control over flushing and file rotation. + * + * @param path Base path to Vine table + * @param catalogConfig Optional Hive Metastore catalog configuration + * @param tableName Optional table name for HMS registration + * */ -class VineStreamingWriter(path: String) extends AutoCloseable { +class VineStreamingWriter( + path: String, + catalogConfig: Option[CatalogConfig] = None, + tableName: Option[String] = None +) extends AutoCloseable { private val writerId: Long = VineModule.createStreamingWriter(path) private var closed = false + private var lastSchema: Option[StructType] = None + private val writtenPartitions = mutable.Set[String]() /** * Append DataFrame batch to stream using Arrow IPC format. @@ -38,22 +56,33 @@ class VineStreamingWriter(path: String) extends AutoCloseable { ensureOpen() if (rows.isEmpty) return + // Track schema for catalog registration + if (lastSchema.isEmpty) { + lastSchema = Some(schema) + } + val arrowBytes = VineArrowBridge.rowsToArrowIpc(rows, schema) VineModule.streamingAppendBatchArrow(writerId, arrowBytes) } /** * Flush pending writes. - * Closes current file and opens new file on next write. 
+ * Close current file, and open new file on next write. * * Call this periodically to: * - Control file size * - Make data visible to readers * - Checkpoint progress + * + * If catalog is configured, this will also register partitions in HMS. + * */ def flush(): Unit = { ensureOpen() VineModule.streamingFlush(writerId) + + // Register partitions in catalog after flush + registerPendingPartitions() } /** @@ -61,14 +90,86 @@ class VineStreamingWriter(path: String) extends AutoCloseable { * This must be called after 'writing'. * * After closing, the writer cannot be used anymore. + * If catalog is configured, this will register any pending partitions in HMS. + * */ override def close(): Unit = { if (!closed) { + // Ensure final flush + VineModule.streamingFlush(writerId) + VineModule.streamingClose(writerId) closed = true + + // Register any remaining partitions + registerPendingPartitions() } } + /** + * Register partitions that have been written since last registration. + */ + private def registerPendingPartitions(): Unit = { + (catalogConfig, tableName, lastSchema) match { + case (Some(config), Some(tbl), Some(schema)) => + val currentDate = LocalDate.now().toString + val partitionPath = Paths.get(path, currentDate).toString + + // Only register if partition exists and hasn't been registered yet + if (Files.exists(Paths.get(partitionPath)) && !writtenPartitions.contains(currentDate)) { + writtenPartitions.add(currentDate) + + val registerPartition = () => { + val client = HiveMetastoreClient(config) + try { + // Ensure table is registered + val dbName = extractDbName(tbl, config) + val tblName = extractTableName(tbl) + + if (!client.tableExists(dbName, tblName)) { + client.registerTable(tbl, schema, path) match { + case Success(_) => println(s"Registered table $tbl in HMS") + case Failure(e) => println(s"Failed to register table $tbl: ${e.getMessage}") + } + } + + // Register partition + client.registerPartition(tbl, currentDate, partitionPath) match { + case 
Success(_) => println(s"Registered partition $currentDate for table $tbl in HMS") + case Failure(e) => println(s"Failed to register partition: ${e.getMessage}") + } + } finally { + client.close() + } + } + + // Register asynchronously if enabled + if (config.enableAsync) { + implicit val ec: ExecutionContext = ExecutionContext.global + Future { + registerPartition() + } + } else { + registerPartition() + } + } + + case _ => + // No catalog configured or no data written yet + () + } + } + + private def extractDbName(fullTableName: String, config: CatalogConfig): String = { + val parts = fullTableName.split("\\.") + if (parts.length == 2) parts(0) else config.database + } + + private def extractTableName(fullTableName: String): String = { + val parts = fullTableName.split("\\.") + if (parts.length == 2) parts(1) else fullTableName + } + /** * Check if writer is still open. */ @@ -83,9 +184,24 @@ class VineStreamingWriter(path: String) extends AutoCloseable { object VineStreamingWriter { /** - * Create streaming writer for the given path. + * Create streaming writer for the given path without catalog integration. */ def apply(path: String): VineStreamingWriter = { - new VineStreamingWriter(path) + new VineStreamingWriter(path, None, None) + } + + /** + * Create streaming writer with Hive Metastore catalog integration. + * + * @param path Base path to Vine table + * @param catalogConfig Catalog configuration + * @param tableName Table name for HMS registration (e.g., "default.events") + */ + def withCatalog( + path: String, + catalogConfig: CatalogConfig, + tableName: String + ): VineStreamingWriter = { + new VineStreamingWriter(path, Some(catalogConfig), Some(tableName)) } }