Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion vine-spark/build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,29 @@ Test / javaOptions ++= Seq(
val sparkVersion = "3.4.0"
val arrowVersion = "14.0.2"
val jacksonVersion = "2.14.3" // Downgrade to fix compatibility with Scala module 2.14.2
val hiveVersion = "2.3.9" // Hive Metastore version compatible with Spark 3.4

libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-sql" % sparkVersion % Provided,
"org.apache.parquet" % "parquet-avro" % "1.12.0",
"org.scalatest" %% "scalatest" % "3.2.17" % Test,
// Apache Arrow for high-performance JNI data transfer
"org.apache.arrow" % "arrow-vector" % arrowVersion,
"org.apache.arrow" % "arrow-memory-netty" % arrowVersion
"org.apache.arrow" % "arrow-memory-netty" % arrowVersion,
// Hive Metastore for catalog integration
"org.apache.hive" % "hive-metastore" % hiveVersion % Provided excludeAll(
ExclusionRule(organization = "org.pentaho"),
ExclusionRule(organization = "org.apache.logging.log4j"),
ExclusionRule(organization = "org.slf4j", name = "slf4j-log4j12"),
ExclusionRule(organization = "log4j", name = "log4j")
),
"org.apache.hive" % "hive-exec" % hiveVersion % Provided excludeAll(
ExclusionRule(organization = "org.pentaho"),
ExclusionRule(organization = "org.apache.logging.log4j"),
ExclusionRule(organization = "org.slf4j", name = "slf4j-log4j12"),
ExclusionRule(organization = "log4j", name = "log4j")
),
"org.apache.thrift" % "libthrift" % "0.12.0" % Provided
)

// Force Jackson version downgrade for Spark compatibility
Expand Down
5 changes: 0 additions & 5 deletions vine-spark/src/main/scala/Main.scala

This file was deleted.

130 changes: 128 additions & 2 deletions vine-spark/src/main/scala/io/kination/vine/VineBatchWriter.scala
Original file line number Diff line number Diff line change
@@ -1,15 +1,21 @@
package io.kination.vine

import io.kination.vine.catalog.{CatalogConfig, HiveMetastoreClient}
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.types.StructType

import java.nio.file.{Files, Paths}
import java.time.LocalDate
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}

/**
* Batch writer for bulk data ingestion.
*/
object VineBatchWriter {

/**
* Write DataFrame using Arrow IPC format.
* Write DataFrame
*
* @param path Directory path to Vine table (must contain vine_meta.json)
* @param df DataFrame to write
Expand All @@ -22,7 +28,36 @@ object VineBatchWriter {
}

/**
* Write collection of rows using Arrow IPC format.
* Write DataFrame with optional Hive Metastore catalog integration.
*
* @param path Directory path to output(vine table)
* @param df DataFrame to write
* @param catalogConfig Optional catalog configuration for HMS integration
* @param tableName Optional table name for HMS registration
*
*/
/**
 * Write a DataFrame and, when configured, register the result in Hive Metastore.
 *
 * An empty DataFrame is a no-op: nothing is written and nothing is registered.
 *
 * @param path Directory path to output (vine table)
 * @param df DataFrame to write (collected to the driver before writing)
 * @param catalogConfig Optional catalog configuration for HMS integration
 * @param tableName Optional table name for HMS registration
 */
def writeWithCatalog(
  path: String,
  df: DataFrame,
  catalogConfig: Option[CatalogConfig],
  tableName: Option[String] = None
): Unit = {
  val collected = df.collect().toSeq
  if (collected.nonEmpty) {
    // Persist the rows first; catalog registration only makes sense afterwards.
    writeRows(path, collected, df.schema)

    // HMS registration requires both a catalog config and a table name.
    for {
      config <- catalogConfig
      tbl    <- tableName
    } registerPartitionInCatalog(path, tbl, df.schema, config)
  }
}

/**
* Write collection of rows
*
* @param path Directory path to write Vine table
* @param rows Collection of rows
Expand All @@ -34,4 +69,95 @@ object VineBatchWriter {
val arrowBytes = VineArrowBridge.rowsToArrowIpc(rows, schema)
VineModule.batchWriteArrow(path, arrowBytes)
}

/**
* Write collection of rows with catalog integration.
*
* @param path Directory path to write Vine table
* @param rows Collection of rows
* @param schema Schema of the rows
* @param catalogConfig Optional catalog configuration
* @param tableName Optional table name for HMS
*
*/
/**
 * Write a collection of rows and, when configured, register the result in
 * Hive Metastore. An empty collection is a no-op.
 *
 * @param path Directory path to write Vine table
 * @param rows Collection of rows
 * @param schema Schema of the rows
 * @param catalogConfig Optional catalog configuration
 * @param tableName Optional table name for HMS
 */
def writeRowsWithCatalog(
  path: String,
  rows: Seq[Row],
  schema: StructType,
  catalogConfig: Option[CatalogConfig],
  tableName: Option[String] = None
): Unit = {
  if (rows.nonEmpty) {
    // Persist the data before any catalog bookkeeping.
    writeRows(path, rows, schema)

    // HMS registration requires both a catalog config and a table name.
    for {
      config <- catalogConfig
      tbl    <- tableName
    } registerPartitionInCatalog(path, tbl, schema, config)
  }
}

/**
* Register newly written partition in Hive Metastore.
* Uses the current date to identify the partition.
*/
/**
 * Register a newly written partition in Hive Metastore.
 *
 * The partition is identified by the current date (YYYY-MM-DD) and expected
 * under `basePath/<date>`; when that directory does not exist, registration is
 * skipped. Registration failures are logged, never thrown to the caller.
 *
 * NOTE(review): the partition date is taken at registration time, not at write
 * time — a write that crosses midnight could register under the wrong date.
 * Confirm this is acceptable for the ingestion cadence.
 */
private def registerPartitionInCatalog(
  basePath: String,
  tableName: String,
  schema: StructType,
  config: CatalogConfig
): Unit = {
  val currentDate = LocalDate.now().toString // YYYY-MM-DD
  val partitionPath = Paths.get(basePath, currentDate).toString

  if (!Files.exists(Paths.get(partitionPath))) {
    println(s"Partition directory does not exist: $partitionPath, skipping HMS registration")
  } else {
    // Performs the actual HMS calls; always closes the client, even on failure.
    def registerPartition(): Unit = {
      val client = HiveMetastoreClient(config)
      try {
        // Ensure the table itself exists before attaching a partition to it.
        if (!client.tableExists(extractDbName(tableName, config), extractTableName(tableName))) {
          client.registerTable(tableName, schema, basePath) match {
            case Success(_) => println(s"Registered table $tableName in HMS")
            case Failure(e) => println(s"Failed to register table $tableName: ${e.getMessage}")
          }
        }

        client.registerPartition(tableName, currentDate, partitionPath) match {
          case Success(_) => println(s"Registered partition $currentDate for table $tableName in HMS")
          case Failure(e) => println(s"Failed to register partition: ${e.getMessage}")
        }
      } finally {
        client.close()
      }
    }

    if (config.enableAsync) {
      // NOTE(review): global EC used for blocking HMS I/O — consider a dedicated pool.
      implicit val ec: ExecutionContext = ExecutionContext.global
      // Log async failures explicitly — a failed Future is otherwise silently dropped.
      Future(registerPartition()).failed.foreach { e =>
        println(s"Async HMS registration failed for $tableName: ${e.getMessage}")
      }
    } else {
      registerPartition()
    }
  }
}

/** Database portion of a possibly-qualified "db.table" name; falls back to the configured default database. */
private def extractDbName(fullTableName: String, config: CatalogConfig): String =
  fullTableName.split("\\.") match {
    case Array(db, _) => db
    case _            => config.database
  }

/** Table portion of a possibly-qualified "db.table" name; unqualified names pass through unchanged. */
private def extractTableName(fullTableName: String): String =
  fullTableName.split("\\.") match {
    case Array(_, tbl) => tbl
    case _             => fullTableName
  }
}
128 changes: 122 additions & 6 deletions vine-spark/src/main/scala/io/kination/vine/VineStreamingWriter.scala
Original file line number Diff line number Diff line change
@@ -1,19 +1,37 @@
package io.kination.vine

import io.kination.vine.catalog.{CatalogConfig, HiveMetastoreClient}
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.types.StructType

import java.nio.file.{Files, Paths}
import java.time.LocalDate
import scala.collection.mutable
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}


/**
* Streaming writer for incremental data ingestion to Vine tables.
* Streaming writer for incremental data ingestion
*
* Optimized for 'continuous data streams' where batches arrive over time.
* Optimized for continuous data streams where batches arrive over time.
* Supports explicit control over flushing and file rotation.
*
* @param path Base path to Vine table
* @param catalogConfig Optional Hive Metastore catalog configuration
* @param tableName Optional table name for HMS registration
*
*/
class VineStreamingWriter(path: String) extends AutoCloseable {
class VineStreamingWriter(
path: String,
catalogConfig: Option[CatalogConfig] = None,
tableName: Option[String] = None
) extends AutoCloseable {

// Handle to the native streaming writer created for this path.
private val writerId: Long = VineModule.createStreamingWriter(path)
// Guards against use-after-close; flipped once in close().
private var closed = false
// Schema of the first appended batch; used later for HMS table registration.
private var lastSchema: Option[StructType] = None
// Partition dates already registered in HMS, to avoid duplicate registration.
private val writtenPartitions = mutable.Set[String]()

/**
* Append DataFrame batch to stream using Arrow IPC format.
Expand All @@ -38,37 +56,120 @@ class VineStreamingWriter(path: String) extends AutoCloseable {
ensureOpen()
if (rows.isEmpty) return

// Track schema for catalog registration
if (lastSchema.isEmpty) {
lastSchema = Some(schema)
}

val arrowBytes = VineArrowBridge.rowsToArrowIpc(rows, schema)
VineModule.streamingAppendBatchArrow(writerId, arrowBytes)
}

/**
* Flush pending writes.
* Closes current file and opens new file on next write.
* Closes the current file; a new file is opened on the next write.
*
* Call this periodically to:
* - Control file size
* - Make data visible to readers
* - Checkpoint progress
*
* If catalog is configured, this will also register partitions in HMS.
*
*/
def flush(): Unit = {
  ensureOpen() // fail fast if the writer was already closed
  // Hand buffered data to the native layer; finalizes the current file.
  VineModule.streamingFlush(writerId)

  // Register partitions in catalog after flush
  registerPendingPartitions()
}

/**
* Close the writer and finalize all pending writes.
* This must be called after 'writing'.
*
* After closing, the writer cannot be used anymore.
* If catalog is configured, this will register any pending partitions in HMS.
*
*/
/**
 * Close the writer and finalize all pending writes.
 *
 * Safe to call more than once; only the first call has an effect. When a
 * catalog is configured, partitions not yet registered are pushed to HMS.
 */
override def close(): Unit = {
  if (closed) {
    () // already closed — nothing to do
  } else {
    // Flush whatever is buffered, then release the native writer.
    VineModule.streamingFlush(writerId)
    VineModule.streamingClose(writerId)
    closed = true

    // Catalog bookkeeping happens last, once data is finalized on disk.
    registerPendingPartitions()
  }
}

/**
* Register partitions that have been written since last registration.
*/
/**
 * Register the current date's partition in HMS, if it has been written and not
 * yet registered. No-op when no catalog/table is configured or no batch has
 * been appended yet (schema unknown).
 *
 * Each partition date is registered at most once per writer instance, tracked
 * via `writtenPartitions`. The date is marked BEFORE the (possibly async)
 * registration runs, so a failed registration is not retried on later flushes.
 * Registration failures are logged, never thrown to the caller.
 */
private def registerPendingPartitions(): Unit = {
  (catalogConfig, tableName, lastSchema) match {
    case (Some(config), Some(tbl), Some(schema)) =>
      val currentDate = LocalDate.now().toString
      val partitionPath = Paths.get(path, currentDate).toString

      // Only register if the partition directory exists and was not handled before.
      if (Files.exists(Paths.get(partitionPath)) && !writtenPartitions.contains(currentDate)) {
        writtenPartitions.add(currentDate)

        // Performs the actual HMS calls; always closes the client, even on failure.
        def registerPartition(): Unit = {
          val client = HiveMetastoreClient(config)
          try {
            val dbName = extractDbName(tbl, config)
            val tblName = extractTableName(tbl)

            // Ensure the table itself exists before attaching a partition to it.
            if (!client.tableExists(dbName, tblName)) {
              client.registerTable(tbl, schema, path) match {
                case Success(_) => println(s"Registered table $tbl in HMS")
                case Failure(e) => println(s"Failed to register table $tbl: ${e.getMessage}")
              }
            }

            client.registerPartition(tbl, currentDate, partitionPath) match {
              case Success(_) => println(s"Registered partition $currentDate for table $tbl in HMS")
              case Failure(e) => println(s"Failed to register partition: ${e.getMessage}")
            }
          } finally {
            client.close()
          }
        }

        if (config.enableAsync) {
          implicit val ec: ExecutionContext = ExecutionContext.global
          // Log async failures explicitly — a failed Future is otherwise silently dropped.
          Future(registerPartition()).failed.foreach { e =>
            println(s"Async HMS registration failed for $tbl: ${e.getMessage}")
          }
        } else {
          registerPartition()
        }
      }

    case _ =>
      // No catalog configured or no data written yet.
      ()
  }
}

/** Database portion of a possibly-qualified "db.table" name; falls back to the configured default database. */
private def extractDbName(fullTableName: String, config: CatalogConfig): String =
  fullTableName.split("\\.") match {
    case Array(db, _) => db
    case _            => config.database
  }

/** Table portion of a possibly-qualified "db.table" name; unqualified names pass through unchanged. */
private def extractTableName(fullTableName: String): String =
  fullTableName.split("\\.") match {
    case Array(_, tbl) => tbl
    case _             => fullTableName
  }

/**
* Check if writer is still open.
*/
Expand All @@ -83,9 +184,24 @@ class VineStreamingWriter(path: String) extends AutoCloseable {

object VineStreamingWriter {

  /** Create a streaming writer with no catalog integration. */
  def apply(path: String): VineStreamingWriter =
    new VineStreamingWriter(path, None, None)

  /**
   * Create a streaming writer that registers tables and partitions in
   * Hive Metastore.
   *
   * @param path Base path to Vine table
   * @param catalogConfig Catalog configuration
   * @param tableName Table name for HMS registration (e.g., "default.events")
   */
  def withCatalog(
    path: String,
    catalogConfig: CatalogConfig,
    tableName: String
  ): VineStreamingWriter =
    new VineStreamingWriter(path, Some(catalogConfig), Some(tableName))
}
Loading
Loading