diff --git a/vine-trino/.gitignore b/vine-trino/.gitignore new file mode 100644 index 0000000..5923cf6 --- /dev/null +++ b/vine-trino/.gitignore @@ -0,0 +1,9 @@ +.gradle/ +build/ +!gradle/wrapper/gradle-wrapper.jar +*.class +*.jar +!gradle/wrapper/*.jar +.idea/ +*.iml +out/ diff --git a/vine-trino/README.md b/vine-trino/README.md new file mode 100644 index 0000000..a427211 --- /dev/null +++ b/vine-trino/README.md @@ -0,0 +1,190 @@ +# Vine Trino Connector + +A read-only [Trino](https://trino.io/) connector for querying Vine tables stored in [Vortex](https://github.com/spiraldb/vortex) columnar format (`.vtx`) via standard SQL. + +## Requirements + +- Java 11+ +- Trino 439 +- vine-core native library (requires Rust build) + +## Build + +```bash +# Build vine-core native library (Rust) +cd vine-core && cargo build --release && cd .. + +# Build vine-trino +cd vine-trino +./gradlew clean build # compile + test +./gradlew shadowJar # produce deployable fat JAR +``` + +Build artifacts: +- `build/libs/vine-trino-0.1.0-all.jar` — shadow JAR (bundles Arrow, Jackson with relocated packages) +- `build/libs/vine-trino-0.1.0.jar` — thin JAR + +## Deployment + +### 1. Install the Plugin + +```bash +mkdir -p /plugin/vine/ +cp build/libs/vine-trino-0.1.0-all.jar /plugin/vine/ + +# Copy the native library for your platform: +# macOS +cp ../vine-core/target/release/libvine_core.dylib /plugin/vine/ +# Linux +cp ../vine-core/target/release/libvine_core.so /plugin/vine/ +``` + +### 2. Configure the Catalog + +Create `/etc/catalog/vine.properties`: + +```properties +connector.name=vine +vine.data-dir=/path/to/vine/tables +``` + +### 3. Data Directory Layout + +The path specified by `vine.data-dir` must follow this structure. Each subdirectory with a `vine_meta.json` file is treated as a table. + +``` +/path/to/vine/tables/ +├── events/ +│ ├── vine_meta.json +│ ├── 2024-12-26/ +│ │ ├── data_143025_123456000.vtx +│ │ └── data_150130_789012000.vtx +│ └── 2024-12-27/ +│ └── data_091500_345678000.vtx +└── users/ + ├── vine_meta.json + └── 2024-12-26/ + └── data_100000_000000000.vtx +``` + +### 4. Query + +```sql +-- List tables +SHOW TABLES FROM vine.default; + +-- Inspect schema +DESCRIBE vine.default.events; + +-- Query data +SELECT * FROM vine.default.events; + +SELECT user_id, COUNT(*) AS event_count +FROM vine.default.events +GROUP BY user_id; +``` + +## Architecture + +### Data Flow + +``` +Trino SQL Query + │ + ▼ +VinePlugin ← discovered via ServiceLoader + │ + ▼ +VineConnectorFactory ← reads vine.data-dir config + │ + ▼ +VineConnector + ├─ VineConnectorMetadata vine_meta.json → schema / table / column info + ├─ VineSplitManager one split per table + └─ VineRecordSetProvider + │ + ▼ + VineModule.readDataArrow(path) [JNI → Rust vine-core] + │ + ▼ + Arrow IPC bytes + │ + ▼ + VineArrowConverter Arrow IPC → Object[][] + │ + ▼ + VineRecordCursor row-by-row delivery to Trino +``` + +### Module Structure + +``` +io.kination.vine/ +├── VinePlugin.java Trino Plugin entry point +├── VineConnectorFactory.java Creates Connector from catalog properties +├── VineConnector.java Read-only connector (metadata + splits + record sets) +├── VineTransactionHandle.java Singleton transaction handle +│ +├── VineConnectorMetadata.java Schema discovery (listSchemas, listTables, getColumnHandles) +├── VineMetadata.java vine_meta.json POJO +├── VineMetadataReader.java vine_meta.json parser (Jackson) +├── VineTypeMapping.java Vine type → Trino type mapping +│ +├── VineTableHandle.java Table reference (schema, name, path) +├── VineColumnHandle.java Column reference (name, type, ordinal) +│ +├── VineSplitManager.java Split generation (1 per table) +├── VineSplit.java Split payload (table path) +│ +├── VineRecordSetProvider.java JNI invocation → RecordSet creation +├── VineRecordSet.java Holds Arrow data, produces cursor +├── VineRecordCursor.java Row-by-row cursor consumed by Trino +├── VineArrowConverter.java Arrow IPC → Object[][] conversion +│ +└── VineModule.java JNI bridge (readDataArrow) +``` + +### Type Mapping + +| Vine Type (`vine_meta.json`) | Alias | Trino Type | Arrow Vector | Cursor Method | +|---|---|---|---|---| +| `integer` | `int` | `INTEGER` | IntVector | `getLong()` | +| `long` | `bigint` | `BIGINT` | BigIntVector | `getLong()` | +| `short` | `smallint` | `SMALLINT` | SmallIntVector | `getLong()` | +| `byte` | `tinyint` | `TINYINT` | TinyIntVector | `getLong()` | +| `float` | — | `REAL` | Float4Vector | `getLong()` (float bits) | +| `double` | — | `DOUBLE` | Float8Vector | `getDouble()` | +| `boolean` | `bool` | `BOOLEAN` | BitVector | `getBoolean()` | +| `string` | — | `VARCHAR` | VarCharVector | `getSlice()` | +| `binary` | — | `VARBINARY` | VarBinaryVector | `getSlice()` | +| `date` | — | `DATE` | DateDayVector | `getLong()` | +| `timestamp` | — | `TIMESTAMP(3)` | TimeStampMilliVector | `getLong()` | +| `decimal` | — | `VARCHAR` | VarCharVector | `getSlice()` | + +### vine_meta.json Schema + +```json +{ + "table_name": "events", + "fields": [ + {"id": 1, "name": "user_id", "data_type": "integer", "is_required": true}, + {"id": 2, "name": "event_type", "data_type": "string", "is_required": false}, + {"id": 3, "name": "timestamp", "data_type": "long", "is_required": true} + ] +} +``` + +## Limitations + +- **Read-only** — only `SELECT` queries are supported; `INSERT`, `UPDATE`, and `DELETE` are not implemented. +- **Single split per table** — the entire table is read in one pass, which may cause high memory usage for large datasets. +- **Single schema** — all tables reside under the `default` schema. +- **No partition pruning** — all date partitions are read regardless of query predicates. + +## Roadmap + +- Per-partition splits for parallel reads +- Partition pruning based on `WHERE` clause predicates +- Predicate pushdown +- Column pruning (project only required columns) +- Hive Metastore (HMS) integration diff --git a/vine-trino/build.gradle b/vine-trino/build.gradle new file mode 100644 index 0000000..b9aacc9 --- /dev/null +++ b/vine-trino/build.gradle @@ -0,0 +1,91 @@ +plugins { + id 'java' + id 'com.github.johnrengelman.shadow' version '8.1.1' +} + +group = 'io.kination.vine' +version = '0.1.0' + +java { + sourceCompatibility = JavaVersion.VERSION_11 + targetCompatibility = JavaVersion.VERSION_11 +} + +repositories { + mavenCentral() +} + +ext { + trinoVersion = '439' + arrowVersion = '14.0.2' +} + +dependencies { + // Trino SPI (provided at runtime by Trino) + compileOnly "io.trino:trino-spi:${trinoVersion}" + compileOnly "io.airlift:slice:2.1" + + // Jackson for vine_meta.json parsing + implementation 'com.fasterxml.jackson.core:jackson-databind:2.14.3' + + // Apache Arrow for reading JNI data + implementation "org.apache.arrow:arrow-vector:${arrowVersion}" + implementation "org.apache.arrow:arrow-memory-netty:${arrowVersion}" + + // Logging + implementation 'org.slf4j:slf4j-api:2.0.9' + + // Test dependencies + testImplementation "io.trino:trino-spi:${trinoVersion}" + testImplementation "io.airlift:slice:2.1" + testImplementation 'org.junit.jupiter:junit-jupiter:5.10.0' + testRuntimeOnly 'org.junit.platform:junit-platform-launcher' +} + +test { + useJUnitPlatform() + + // Set native library path for tests + systemProperty 'java.library.path', "${projectDir}/../vine-core/target/release" + + testLogging { + events "passed", "skipped", "failed" + showStandardStreams = true + } +} + +shadowJar { + archiveBaseName.set('vine-trino') + archiveClassifier.set('all') + + // Relocate to avoid conflicts with Trino's bundled libraries + relocate 'org.apache.arrow', 'io.kination.vine.shaded.arrow' + relocate 'com.fasterxml.jackson', 'io.kination.vine.shaded.jackson' + + // Exclude Trino dependencies (provided at runtime) + dependencies { + exclude(dependency('io.trino:.*')) + exclude(dependency('io.airlift:.*')) + } + + mergeServiceFiles() +} + +tasks.named('jar') { + manifest { + attributes( + 'Implementation-Title': 'Vine Trino Connector', + 'Implementation-Version': version + ) + } +} + +// Task to copy native library for local testing +tasks.register('copyNativeLib', Copy) { + from "${projectDir}/../vine-core/target/release" + into "${buildDir}/native" + include "libvine_core.*" + include "vine_core.dll" +} + +test.dependsOn copyNativeLib diff --git a/vine-trino/settings.gradle b/vine-trino/settings.gradle new file mode 100644 index 0000000..b16ec3c --- /dev/null +++ b/vine-trino/settings.gradle @@ -0,0 +1 @@ +rootProject.name = 'vine-trino' diff --git a/vine-trino/src/main/java/io/kination/vine/VineArrowConverter.java b/vine-trino/src/main/java/io/kination/vine/VineArrowConverter.java new file mode 100644 index 0000000..b9013ba --- /dev/null +++ b/vine-trino/src/main/java/io/kination/vine/VineArrowConverter.java @@ -0,0 +1,123 @@ +package io.kination.vine; + +import io.trino.spi.type.Type; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.vector.BigIntVector; +import org.apache.arrow.vector.BitVector; +import org.apache.arrow.vector.DateDayVector; +import org.apache.arrow.vector.FieldVector; +import org.apache.arrow.vector.Float4Vector; +import org.apache.arrow.vector.Float8Vector; +import org.apache.arrow.vector.IntVector; +import org.apache.arrow.vector.SmallIntVector; +import org.apache.arrow.vector.TimeStampMilliVector; +import org.apache.arrow.vector.TinyIntVector; +import org.apache.arrow.vector.VarBinaryVector; +import org.apache.arrow.vector.VarCharVector; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.ipc.ArrowStreamReader; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.IntStream; + +/** + * Converts Arrow IPC bytes (from vine-core JNI) to row-oriented data for Trino's RecordCursor. + */ +public class VineArrowConverter { + + private static final BufferAllocator ALLOCATOR = new RootAllocator(); + + /** + * Decode Arrow IPC bytes into 'row oriented' Object array. + * + * @param arrowData Arrow IPC bytes from VineModule.readDataArrow() + * @param columns Requested columns with ordinal positions and types + * @return Object[row][col] with Trino-compatible typed values + */ + public static Object[][] arrowToRows(byte[] arrowData, List columns) throws IOException { + if (arrowData == null || arrowData.length == 0) { + return new Object[0][]; + } + + BufferAllocator childAllocator = ALLOCATOR.newChildAllocator("arrow-to-rows", 0, Long.MAX_VALUE); + try (ByteArrayInputStream bais = new ByteArrayInputStream(arrowData); + ArrowStreamReader reader = new ArrowStreamReader(bais, childAllocator)) { + + List rows = new ArrayList<>(); + VectorSchemaRoot root = reader.getVectorSchemaRoot(); + + while (reader.loadNextBatch()) { + int rowCount = root.getRowCount(); + List allVectors = root.getFieldVectors(); + + IntStream.range(0, rowCount) + .mapToObj(i -> { + Object[] row = new Object[columns.size()]; + for (int c = 0; c < columns.size(); c++) { + VineColumnHandle col = columns.get(c); + int ordinal = col.getOrdinalPosition(); + if (ordinal < allVectors.size()) { + row[c] = extractValue(allVectors.get(ordinal), i, col.getType()); + } + } + return row; + }) + .forEach(rows::add); + } + + return rows.toArray(new Object[0][]); + } finally { + childAllocator.close(); + } + } + + /** + * Extract value from Arrow vector, converting to 'Trino compatible' Java types. + * + * Trino types: + * - TINYINT, SMALLINT, INTEGER, BIGINT, DATE -> long (via getLong) + * - REAL -> long (Float.floatToRawIntBits, via getLong) + * - DOUBLE -> double (via getDouble) + * - BOOLEAN -> boolean (via getBoolean) + * - VARCHAR -> String (converted to Slice in cursor) + * - VARBINARY -> byte[] (converted to Slice in cursor) + * - TIMESTAMP -> long (millis, via getLong) + */ + private static Object extractValue(FieldVector vector, int index, Type trinoType) { + if (vector.isNull(index)) { + return null; + } + + switch (vector.getMinorType()) { + case TINYINT: + return (long) ((TinyIntVector) vector).get(index); + case SMALLINT: + return (long) ((SmallIntVector) vector).get(index); + case INT: + return (long) ((IntVector) vector).get(index); + case BIGINT: + return ((BigIntVector) vector).get(index); + case FLOAT4: + return (long) Float.floatToRawIntBits(((Float4Vector) vector).get(index)); + case FLOAT8: + return ((Float8Vector) vector).get(index); + case BIT: + return ((BitVector) vector).get(index) == 1; + case VARCHAR: + return new String(((VarCharVector) vector).get(index)); + case VARBINARY: + return ((VarBinaryVector) vector).get(index); + case DATEDAY: + return (long) ((DateDayVector) vector).get(index); + case TIMESTAMPMILLI: + return ((TimeStampMilliVector) vector).get(index); + default: + Object obj = vector.getObject(index); + return obj != null ? obj.toString() : null; + } + } +} diff --git a/vine-trino/src/main/java/io/kination/vine/VineColumnHandle.java b/vine-trino/src/main/java/io/kination/vine/VineColumnHandle.java new file mode 100644 index 0000000..afa87a3 --- /dev/null +++ b/vine-trino/src/main/java/io/kination/vine/VineColumnHandle.java @@ -0,0 +1,63 @@ +package io.kination.vine; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import io.trino.spi.connector.ColumnHandle; +import io.trino.spi.type.Type; + +import java.util.Objects; + +/** + * Handle representing column in Vine table. + */ +public class VineColumnHandle implements ColumnHandle { + + private final String name; + private final Type type; + private final int ordinalPosition; + + @JsonCreator + public VineColumnHandle( + @JsonProperty("name") String name, + @JsonProperty("type") Type type, + @JsonProperty("ordinalPosition") int ordinalPosition) { + this.name = name; + this.type = type; + this.ordinalPosition = ordinalPosition; + } + + @JsonProperty + public String getName() { + return name; + } + + @JsonProperty + public Type getType() { + return type; + } + + @JsonProperty + public int getOrdinalPosition() { + return ordinalPosition; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + VineColumnHandle that = (VineColumnHandle) o; + return ordinalPosition == that.ordinalPosition + && Objects.equals(name, that.name) + && Objects.equals(type, that.type); + } + + @Override + public int hashCode() { + return Objects.hash(name, type, ordinalPosition); + } + + @Override + public String toString() { + return name + ":" + type; + } +} diff --git a/vine-trino/src/main/java/io/kination/vine/VineConnector.java b/vine-trino/src/main/java/io/kination/vine/VineConnector.java new file mode 100644 index 0000000..862a047 --- /dev/null +++ b/vine-trino/src/main/java/io/kination/vine/VineConnector.java @@ -0,0 +1,47 @@ +package io.kination.vine; + +import io.trino.spi.connector.Connector; +import io.trino.spi.connector.ConnectorMetadata; +import io.trino.spi.connector.ConnectorRecordSetProvider; +import io.trino.spi.connector.ConnectorSession; +import io.trino.spi.connector.ConnectorSplitManager; +import io.trino.spi.connector.ConnectorTransactionHandle; +import io.trino.spi.transaction.IsolationLevel; + +/** + * Read-only Vine connector for Trino. + * This connector allows Trino to query Vine tables by reading Arrow IPC data produced by vine-core. + */ +public class VineConnector implements Connector { + + private final VineConnectorMetadata metadata; + private final VineSplitManager splitManager; + private final VineRecordSetProvider recordSetProvider; + + public VineConnector(String dataDir) { + this.metadata = new VineConnectorMetadata(dataDir); + this.splitManager = new VineSplitManager(); + this.recordSetProvider = new VineRecordSetProvider(); + } + + @Override + public ConnectorTransactionHandle beginTransaction(IsolationLevel isolationLevel, + boolean readOnly, boolean autoCommit) { + return VineTransactionHandle.INSTANCE; + } + + @Override + public ConnectorMetadata getMetadata(ConnectorSession session, ConnectorTransactionHandle transaction) { + return metadata; + } + + @Override + public ConnectorSplitManager getSplitManager() { + return splitManager; + } + + @Override + public ConnectorRecordSetProvider getRecordSetProvider() { + return recordSetProvider; + } +} diff --git a/vine-trino/src/main/java/io/kination/vine/VineConnectorFactory.java b/vine-trino/src/main/java/io/kination/vine/VineConnectorFactory.java new file mode 100644 index 0000000..8ae1b53 --- /dev/null +++ b/vine-trino/src/main/java/io/kination/vine/VineConnectorFactory.java @@ -0,0 +1,33 @@ +package io.kination.vine; + +import io.trino.spi.connector.Connector; +import io.trino.spi.connector.ConnectorContext; +import io.trino.spi.connector.ConnectorFactory; + +import java.util.Map; + +/** + * VineConnectorFactory: To create 'VineConnector' instances from catalog configuration. + * + * Catalog config (etc/catalog/vine.properties): + *
+ * connector.name=vine
+ * vine.data-dir=/path/to/vine/tables
+ * 
+ */ +public class VineConnectorFactory implements ConnectorFactory { + + @Override + public String getName() { + return "vine"; + } + + @Override + public Connector create(String catalogName, Map config, ConnectorContext context) { + String dataDir = config.get("vine.data-dir"); + if (dataDir == null || dataDir.isEmpty()) { + throw new IllegalArgumentException("vine.data-dir must be set in catalog configuration"); + } + return new VineConnector(dataDir); + } +} diff --git a/vine-trino/src/main/java/io/kination/vine/VineConnectorMetadata.java b/vine-trino/src/main/java/io/kination/vine/VineConnectorMetadata.java new file mode 100644 index 0000000..2b1f001 --- /dev/null +++ b/vine-trino/src/main/java/io/kination/vine/VineConnectorMetadata.java @@ -0,0 +1,138 @@ +package io.kination.vine; + +import io.trino.spi.connector.ColumnHandle; +import io.trino.spi.connector.ColumnMetadata; +import io.trino.spi.connector.ConnectorMetadata; +import io.trino.spi.connector.ConnectorSession; +import io.trino.spi.connector.ConnectorTableHandle; +import io.trino.spi.connector.ConnectorTableMetadata; +import io.trino.spi.connector.SchemaTableName; +import io.trino.spi.connector.SchemaTablePrefix; + +import java.io.File; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +/** + * Provides schema discovery and metadata + * Lists tables by scanning data directory for subdirectories containing vine_meta.json. + * + * Directory layout: + *
+ * vine.data-dir/
+ *   table_one/vine_meta.json
+ *   table_two/vine_meta.json
+ * 
+ */ +public class VineConnectorMetadata implements ConnectorMetadata { + + private static final String DEFAULT_SCHEMA = "default"; + + private final String dataDir; + + public VineConnectorMetadata(String dataDir) { + this.dataDir = dataDir; + } + + @Override + public List listSchemaNames(ConnectorSession session) { + return List.of(DEFAULT_SCHEMA); + } + + @Override + public ConnectorTableHandle getTableHandle(ConnectorSession session, SchemaTableName tableName) { + if (!DEFAULT_SCHEMA.equals(tableName.getSchemaName())) { + return null; + } + String tablePath = dataDir + File.separator + tableName.getTableName(); + if (!VineMetadataReader.hasMetadata(tablePath)) { + return null; + } + + return new VineTableHandle(tableName.getSchemaName(), tableName.getTableName(), tablePath); + } + + @Override + public ConnectorTableMetadata getTableMetadata(ConnectorSession session, ConnectorTableHandle table) { + VineTableHandle vineTable = (VineTableHandle) table; + List columns = getColumns(vineTable); + + return new ConnectorTableMetadata(vineTable.toSchemaTableName(), columns); + } + + @Override + public Map getColumnHandles(ConnectorSession session, ConnectorTableHandle tableHandle) { + VineTableHandle vineTable = (VineTableHandle) tableHandle; + VineMetadata meta = VineMetadataReader.read(vineTable.getTablePath()); + + List fields = meta.getFields(); + + return IntStream.range(0, fields.size()) + .boxed() + .collect(Collectors.toMap( + i -> fields.get(i).getName(), + i -> (ColumnHandle) new VineColumnHandle( + fields.get(i).getName(), + VineTypeMapping.toTrinoType(fields.get(i).getDataType()), + i), + (a, b) -> a, + LinkedHashMap::new)); + } + + @Override + public ColumnMetadata getColumnMetadata(ConnectorSession session, + ConnectorTableHandle tableHandle, + ColumnHandle columnHandle) { + VineColumnHandle col = (VineColumnHandle) columnHandle; + return new ColumnMetadata(col.getName(), col.getType()); + } + + @Override + public List listTables(ConnectorSession session, Optional schemaName) { + if (schemaName.isPresent() && !DEFAULT_SCHEMA.equals(schemaName.get())) { + return List.of(); + } + + List tables = new ArrayList<>(); + File dir = new File(dataDir); + if (dir.isDirectory()) { + File[] children = dir.listFiles(); + if (children != null) { + for (File child : children) { + if (child.isDirectory() && VineMetadataReader.hasMetadata(child.getAbsolutePath())) { + tables.add(new SchemaTableName(DEFAULT_SCHEMA, child.getName())); + } + } + } + } + return tables; + } + + @Override + public Map> listTableColumns( + ConnectorSession session, SchemaTablePrefix prefix) { + Map> result = new LinkedHashMap<>(); + for (SchemaTableName tableName : listTables(session, Optional.of(prefix.getSchema().orElse(DEFAULT_SCHEMA)))) { + if (prefix.getTable().isPresent() && !prefix.getTable().get().equals(tableName.getTableName())) { + continue; + } + ConnectorTableHandle handle = getTableHandle(session, tableName); + if (handle != null) { + result.put(tableName, getColumns((VineTableHandle) handle)); + } + } + return result; + } + + private List getColumns(VineTableHandle tableHandle) { + VineMetadata meta = VineMetadataReader.read(tableHandle.getTablePath()); + return meta.getFields().stream() + .map(f -> new ColumnMetadata(f.getName(), VineTypeMapping.toTrinoType(f.getDataType()))) + .collect(Collectors.toList()); + } +} diff --git a/vine-trino/src/main/java/io/kination/vine/VineMetadata.java b/vine-trino/src/main/java/io/kination/vine/VineMetadata.java new file mode 100644 index 0000000..28cc4be --- /dev/null +++ b/vine-trino/src/main/java/io/kination/vine/VineMetadata.java @@ -0,0 +1,67 @@ +package io.kination.vine; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.List; + +/** + * POJO representing vine_meta.json schema definition. + * This class is used to deserialize vine_meta.json. + */ +public class VineMetadata { + + private final String tableName; + private final List fields; + + @JsonCreator + public VineMetadata( + @JsonProperty("table_name") String tableName, + @JsonProperty("fields") List fields) { + this.tableName = tableName; + this.fields = fields; + } + + public String getTableName() { + return tableName; + } + + public List getFields() { + return fields; + } + + public static class Field { + private final int id; + private final String name; + private final String dataType; + private final boolean isRequired; + + @JsonCreator + public Field( + @JsonProperty("id") int id, + @JsonProperty("name") String name, + @JsonProperty("data_type") String dataType, + @JsonProperty("is_required") boolean isRequired) { + this.id = id; + this.name = name; + this.dataType = dataType; + this.isRequired = isRequired; + } + + public int getId() { + return id; + } + + public String getName() { + return name; + } + + public String getDataType() { + return dataType; + } + + public boolean isRequired() { + return isRequired; + } + } +} diff --git a/vine-trino/src/main/java/io/kination/vine/VineMetadataReader.java b/vine-trino/src/main/java/io/kination/vine/VineMetadataReader.java new file mode 100644 index 0000000..e57fb8b --- /dev/null +++ b/vine-trino/src/main/java/io/kination/vine/VineMetadataReader.java @@ -0,0 +1,32 @@ +package io.kination.vine; + +import com.fasterxml.jackson.databind.ObjectMapper; + +import java.io.File; +import java.io.IOException; +import java.io.UncheckedIOException; + +/** + * Reads and parses vine_meta.json from a Vine table directory. + */ +public class VineMetadataReader { + + private static final ObjectMapper MAPPER = new ObjectMapper(); + private static final String META_FILE_NAME = "vine_meta.json"; + + public static VineMetadata read(String tablePath) { + File metaFile = new File(tablePath, META_FILE_NAME); + if (!metaFile.exists()) { + throw new IllegalArgumentException("vine_meta.json not found at: " + tablePath); + } + try { + return MAPPER.readValue(metaFile, VineMetadata.class); + } catch (IOException e) { + throw new UncheckedIOException("Failed to read vine_meta.json at: " + tablePath, e); + } + } + + public static boolean hasMetadata(String tablePath) { + return new File(tablePath, META_FILE_NAME).exists(); + } +} diff --git a/vine-trino/src/main/java/io/kination/vine/VineModule.java b/vine-trino/src/main/java/io/kination/vine/VineModule.java new file mode 100644 index 0000000..915816d --- /dev/null +++ b/vine-trino/src/main/java/io/kination/vine/VineModule.java @@ -0,0 +1,45 @@ +package io.kination.vine; + +public class VineModule { + static { + loadNativeLibrary(); + } + + private static void loadNativeLibrary() { + String os = System.getProperty("os.name").toLowerCase(); + String libName; + String libExtension; + + if (os.contains("mac") || os.contains("darwin")) { + libName = "libvine_core"; + libExtension = ".dylib"; + } else if (os.contains("win")) { + libName = "vine_core"; + libExtension = ".dll"; + } else { + libName = "libvine_core"; + libExtension = ".so"; + } + + String fullLibName = libName + libExtension; + + try { + System.loadLibrary("vine_core"); + System.out.println("[VineTrino] Loaded native library from java.library.path"); + return; + } catch (UnsatisfiedLinkError e) { + throw new UnsatisfiedLinkError( + "Failed to load native library -> " + fullLibName + + ". Ensure vine-core is built: cd vine-core && cargo build --release" + ); + } + } + + /** + * Read data from Vine table using Arrow IPC format. + * + * @param path Directory path to Vine table (must contain vine_meta.json) + * @return Arrow IPC stream bytes containing RecordBatch data + */ + public static native byte[] readDataArrow(String path); +} diff --git a/vine-trino/src/main/java/io/kination/vine/VineSplit.java b/vine-trino/src/main/java/io/kination/vine/VineSplit.java new file mode 100644 index 0000000..d73425f --- /dev/null +++ b/vine-trino/src/main/java/io/kination/vine/VineSplit.java @@ -0,0 +1,43 @@ +package io.kination.vine; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import io.trino.spi.HostAddress; +import io.trino.spi.connector.ConnectorSplit; + +import java.util.List; +import java.util.Map; + +/** + * Represents a unit of work for reading a Vine table. + * Currently one split per table (reads all date partitions at once via JNI). + */ +public class VineSplit implements ConnectorSplit { + + private final String tablePath; + + @JsonCreator + public VineSplit(@JsonProperty("tablePath") String tablePath) { + this.tablePath = tablePath; + } + + @JsonProperty + public String getTablePath() { + return tablePath; + } + + @Override + public boolean isRemotelyAccessible() { + return false; + } + + @Override + public List getAddresses() { + return List.of(); + } + + @Override + public Object getInfo() { + return Map.of("tablePath", tablePath); + } +} diff --git a/vine-trino/src/main/java/io/kination/vine/VineSplitManager.java b/vine-trino/src/main/java/io/kination/vine/VineSplitManager.java new file mode 100644 index 0000000..7aeb407 --- /dev/null +++ b/vine-trino/src/main/java/io/kination/vine/VineSplitManager.java @@ -0,0 +1,33 @@ +package io.kination.vine; + +import io.trino.spi.connector.Constraint; +import io.trino.spi.connector.ConnectorSession; +import io.trino.spi.connector.ConnectorSplit; +import io.trino.spi.connector.ConnectorSplitManager; +import io.trino.spi.connector.ConnectorSplitSource; +import io.trino.spi.connector.ConnectorTableHandle; +import io.trino.spi.connector.ConnectorTransactionHandle; +import io.trino.spi.connector.DynamicFilter; +import io.trino.spi.connector.FixedSplitSource; + +import java.util.List; + +/** + * Split manager for Vine tables. + * Currently generates 'single split' per table. + * vine-core's readDataArrow reads all date partitions at once. + */ +public class VineSplitManager implements ConnectorSplitManager { + + @Override + public ConnectorSplitSource getSplits( + ConnectorTransactionHandle transaction, + ConnectorSession session, + ConnectorTableHandle table, + DynamicFilter dynamicFilter, + Constraint constraint) { + VineTableHandle vineTable = (VineTableHandle) table; + ConnectorSplit split = new VineSplit(vineTable.getTablePath()); + return new FixedSplitSource(List.of(split)); + } +} diff --git a/vine-trino/src/main/java/io/kination/vine/VineTableHandle.java b/vine-trino/src/main/java/io/kination/vine/VineTableHandle.java new file mode 100644 index 0000000..77c4844 --- /dev/null +++ b/vine-trino/src/main/java/io/kination/vine/VineTableHandle.java @@ -0,0 +1,68 @@ +package io.kination.vine; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import io.trino.spi.connector.ConnectorTableHandle; +import io.trino.spi.connector.SchemaTableName; + +import java.util.Objects; + +/** + * Handle representing Vine table in Trino. + * Contains the filesystem path to table directory. + */ +public class VineTableHandle implements ConnectorTableHandle { + + private final String schemaName; + private final String tableName; + private final String tablePath; + + @JsonCreator + public VineTableHandle( + @JsonProperty("schemaName") String schemaName, + @JsonProperty("tableName") String tableName, + @JsonProperty("tablePath") String tablePath) { + this.schemaName = schemaName; + this.tableName = tableName; + this.tablePath = tablePath; + } + + @JsonProperty + public String getSchemaName() { + return schemaName; + } + + @JsonProperty + public String getTableName() { + return tableName; + } + + @JsonProperty + public String getTablePath() { + return tablePath; + } + + public SchemaTableName toSchemaTableName() { + return new SchemaTableName(schemaName, tableName); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + VineTableHandle that = (VineTableHandle) o; + return Objects.equals(schemaName, that.schemaName) + && Objects.equals(tableName, that.tableName) + && Objects.equals(tablePath, that.tablePath); + } + + @Override + public int hashCode() { + return Objects.hash(schemaName, tableName, tablePath); + } + + @Override + public String toString() { + return schemaName + "." + tableName; + } +} diff --git a/vine-trino/src/main/java/io/kination/vine/VineTransactionHandle.java b/vine-trino/src/main/java/io/kination/vine/VineTransactionHandle.java new file mode 100644 index 0000000..4227dd7 --- /dev/null +++ b/vine-trino/src/main/java/io/kination/vine/VineTransactionHandle.java @@ -0,0 +1,8 @@ +package io.kination.vine; + +import io.trino.spi.connector.ConnectorTransactionHandle; + + +public enum VineTransactionHandle implements ConnectorTransactionHandle { + INSTANCE +} diff --git a/vine-trino/src/main/java/io/kination/vine/VineTypeMapping.java b/vine-trino/src/main/java/io/kination/vine/VineTypeMapping.java new file mode 100644 index 0000000..0c797ed --- /dev/null +++ b/vine-trino/src/main/java/io/kination/vine/VineTypeMapping.java @@ -0,0 +1,56 @@ +package io.kination.vine; + +import io.trino.spi.type.BigintType; +import io.trino.spi.type.BooleanType; +import io.trino.spi.type.DateType; +import io.trino.spi.type.DoubleType; +import io.trino.spi.type.IntegerType; +import io.trino.spi.type.RealType; +import io.trino.spi.type.SmallintType; +import io.trino.spi.type.TimestampType; +import io.trino.spi.type.TinyintType; +import io.trino.spi.type.Type; +import io.trino.spi.type.VarbinaryType; +import io.trino.spi.type.VarcharType; + +/** + * Maps Vine data types (from vine_meta.json) to Trino types. + */ +public class VineTypeMapping { + + public static Type toTrinoType(String vineType) { + switch (vineType.toLowerCase()) { + case "byte": + case "tinyint": + return TinyintType.TINYINT; + case "short": + case "smallint": + return SmallintType.SMALLINT; + case "integer": + case "int": + return IntegerType.INTEGER; + case "long": + case "bigint": + return BigintType.BIGINT; + case "float": + return RealType.REAL; + case "double": + return DoubleType.DOUBLE; + case "boolean": + case "bool": + return BooleanType.BOOLEAN; + case "string": + return VarcharType.VARCHAR; + case "binary": + return VarbinaryType.VARBINARY; + case "date": + return DateType.DATE; + case "timestamp": + return TimestampType.TIMESTAMP_MILLIS; + case "decimal": + return VarcharType.VARCHAR; + default: + return VarcharType.VARCHAR; + } + } +}