From 8be17d942d0418b30b9116b0396e1da69d0924a4 Mon Sep 17 00:00:00 2001 From: Javier de la Torre Date: Tue, 17 Mar 2026 23:33:33 +0100 Subject: [PATCH] feat(snowflake): add GeoArrow support for bulk ingestion (GEOGRAPHY/GEOMETRY) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Detect geoarrow.wkb/geoarrow.wkt columns during adbc_insert and create GEOGRAPHY or GEOMETRY columns in Snowflake, with automatic WKB→geo conversion and SRID support. How it works: 1. Bulk ingest loads data as BINARY via existing Parquet→PUT→COPY INTO 2. After COPY, geoarrow columns are detected from Arrow field metadata (ARROW:extension:name) and converted via CTAS with TO_GEOGRAPHY or TO_GEOMETRY. SRID is extracted from geoarrow CRS metadata (PROJJSON or "EPSG:NNNN") and applied via ST_SETSRID for GEOMETRY columns. The CTAS post-processing is needed because Snowflake's COPY INTO from Parquet cannot load WKB directly into GEOGRAPHY/GEOMETRY columns — only CSV and JSON/AVRO support direct geospatial loading from stages. See: https://docs.snowflake.com/en/sql-reference/data-types-geospatial#loading-geospatial-data-from-stages New statement option: - adbc.snowflake.statement.ingest_geo_type: "geography" (default) or "geometry". GEOGRAPHY is WGS84/SRID 4326; GEOMETRY supports any SRID. Benchmarked with Czech Republic OSM Geofabrik data against Snowflake: - Points (465K): 38,119 rows/sec - LineStrings (1.9M): 56,804 rows/sec - Polygons (5M): 68,611 rows/sec Co-Authored-By: Claude Opus 4.6 (1M context) --- go/adbc/driver/snowflake/bulk_ingestion.go | 8 + go/adbc/driver/snowflake/statement.go | 229 ++++++++++++++++++++- go/adbc/driver/snowflake/statement_test.go | 162 +++++++++++++++ 3 files changed, 392 insertions(+), 7 deletions(-) create mode 100644 go/adbc/driver/snowflake/statement_test.go diff --git a/go/adbc/driver/snowflake/bulk_ingestion.go b/go/adbc/driver/snowflake/bulk_ingestion.go index 1dfd4d6b44..07c177f566 100644 --- a/go/adbc/driver/snowflake/bulk_ingestion.go +++ b/go/adbc/driver/snowflake/bulk_ingestion.go @@ -117,6 +117,13 @@ type ingestOptions struct { // // Default is true. vectorizedScanner bool + // Snowflake type to use for geoarrow columns (geoarrow.wkb, geoarrow.wkt). + // + // Valid values are "geography" (default) and "geometry". + // GEOGRAPHY is always WGS84 (SRID 4326). GEOMETRY supports any SRID; + // the SRID is extracted from geoarrow extension metadata and applied + // via ST_SETSRID after COPY INTO. + geoType string } func DefaultIngestOptions() *ingestOptions { @@ -128,6 +135,7 @@ func DefaultIngestOptions() *ingestOptions { compressionCodec: defaultCompressionCodec, compressionLevel: defaultCompressionLevel, vectorizedScanner: defaultVectorizedScanner, + geoType: "geography", } } diff --git a/go/adbc/driver/snowflake/statement.go b/go/adbc/driver/snowflake/statement.go index 65a60c8b9e..759cee5138 100644 --- a/go/adbc/driver/snowflake/statement.go +++ b/go/adbc/driver/snowflake/statement.go @@ -20,6 +20,7 @@ package snowflake import ( "context" "database/sql/driver" + "encoding/json" "fmt" "io" "strconv" @@ -47,6 +48,11 @@ const ( OptionStatementIngestCompressionCodec = "adbc.snowflake.statement.ingest_compression_codec" // TODO(GH-1473): Implement option OptionStatementIngestCompressionLevel = "adbc.snowflake.statement.ingest_compression_level" // TODO(GH-1473): Implement option OptionStatementVectorizedScanner = "adbc.snowflake.statement.ingest_use_vectorized_scanner" + // OptionStatementIngestGeoType controls the Snowflake type created for + // columns with geoarrow extension types (geoarrow.wkb, geoarrow.wkt). + // Valid values are "geography" (default) and "geometry". + // GEOGRAPHY is always WGS84 (SRID 4326). GEOMETRY supports any SRID. + OptionStatementIngestGeoType = "adbc.snowflake.statement.ingest_geo_type" ) type statement struct { @@ -239,6 +245,17 @@ func (st *statement) SetOption(key string, val string) error { } st.ingestOptions.vectorizedScanner = vectorized return nil + case OptionStatementIngestGeoType: + switch strings.ToLower(val) { + case "geography", "geometry": + st.ingestOptions.geoType = strings.ToLower(val) + default: + return adbc.Error{ + Msg: fmt.Sprintf("[Snowflake] invalid geo type '%s': must be 'geography' or 'geometry'", val), + Code: adbc.StatusInvalidArgument, + } + } + return nil default: return st.Base().SetOption(key, val) } @@ -343,14 +360,20 @@ func (st *statement) SetSqlQuery(query string) error { return nil } -func toSnowflakeType(dt arrow.DataType) string { +func toSnowflakeType(dt arrow.DataType, geoType string) string { switch dt.ID() { case arrow.EXTENSION: - return toSnowflakeType(dt.(arrow.ExtensionType).StorageType()) + ext := dt.(arrow.ExtensionType) + switch ext.ExtensionName() { + case "geoarrow.wkb", "geoarrow.wkb_view", "geoarrow.wkt", "geoarrow.wkt_view": + return geoType + default: + return toSnowflakeType(ext.StorageType(), geoType) + } case arrow.DICTIONARY: - return toSnowflakeType(dt.(*arrow.DictionaryType).ValueType) + return toSnowflakeType(dt.(*arrow.DictionaryType).ValueType, geoType) case arrow.RUN_END_ENCODED: - return toSnowflakeType(dt.(*arrow.RunEndEncodedType).Encoded()) + return toSnowflakeType(dt.(*arrow.RunEndEncodedType).Encoded(), geoType) case arrow.INT8, arrow.INT16, arrow.INT32, arrow.INT64, arrow.UINT8, arrow.UINT16, arrow.UINT32, arrow.UINT64: return "integer" @@ -418,7 +441,7 @@ func (st *statement) initIngest(ctx context.Context) error { createBldr.WriteString(quoteTblName(f.Name)) createBldr.WriteString(" ") - ty := toSnowflakeType(f.Type) + ty := toSnowflakeType(f.Type, st.ingestOptions.geoType) if ty == "" { return adbc.Error{ Msg: fmt.Sprintf("unimplemented type conversion for field %s, arrow type: %s", f.Name, f.Type), @@ -469,16 +492,208 @@ func (st *statement) executeIngest(ctx context.Context) (int64, error) { } } + // Capture schema before ingest (ingestRecord nils st.bound after completion) + var schema *arrow.Schema + if st.bound != nil { + schema = st.bound.Schema() + } else { + schema = st.streamBind.Schema() + } + err := st.initIngest(ctx) if err != nil { return -1, err } + var nrows int64 if st.bound != nil { - return st.ingestRecord(ctx) + nrows, err = st.ingestRecord(ctx) + } else { + nrows, err = st.ingestStream(ctx) + } + if err != nil { + return nrows, err + } + + // Convert geo columns from BINARY/TEXT to GEOGRAPHY/GEOMETRY after COPY INTO. + // Snowflake's COPY INTO cannot load WKB directly into GEOGRAPHY columns, + // so we load as BINARY first, then convert via CTAS. + if err := st.convertGeoColumns(ctx, schema); err != nil { + return nrows, err + } + + return nrows, nil +} + +// convertGeoColumns converts BINARY/TEXT geo columns to GEOGRAPHY/GEOMETRY after COPY INTO. +// +// Snowflake's COPY INTO from Parquet cannot load WKB directly into GEOGRAPHY/GEOMETRY +// columns — only CSV and JSON/AVRO support direct geospatial loading from stages. +// See: https://docs.snowflake.com/en/sql-reference/data-types-geospatial#loading-geospatial-data-from-stages +// +// We work around this with a CTAS pattern: rename the staging table, create the +// final table with TO_GEOGRAPHY/TO_GEOMETRY conversion, then drop staging. +// +// TODO: Investigate using a COPY transform (SELECT ... FROM @stage) to convert +// inline during COPY INTO, which would avoid the rename+CTAS overhead. +func (st *statement) convertGeoColumns(ctx context.Context, schema *arrow.Schema) error { + if schema == nil { + return nil + } + + // Find geo columns from Arrow metadata + type geoCol struct { + name string + extName string + extMeta string + } + var geoCols []geoCol + + for _, f := range schema.Fields() { + var extName, extMeta string + if f.Type.ID() == arrow.EXTENSION { + ext := f.Type.(arrow.ExtensionType) + extName = ext.ExtensionName() + extMeta = ext.Serialize() + } else if name, ok := f.Metadata.GetValue("ARROW:extension:name"); ok { + extName = name + extMeta, _ = f.Metadata.GetValue("ARROW:extension:metadata") + } + + switch extName { + case "geoarrow.wkb", "geoarrow.wkb_view", "geoarrow.wkt", "geoarrow.wkt_view": + geoCols = append(geoCols, geoCol{name: f.Name, extName: extName, extMeta: extMeta}) + } + } + + if len(geoCols) == 0 { + return nil + } + + geoType := st.ingestOptions.geoType + target := quoteTblName(st.targetTable) + staging := quoteTblName(st.targetTable + "_ADBC_STAGING") + + // Rename current table to staging + renameQuery := fmt.Sprintf("ALTER TABLE %s RENAME TO %s", target, staging) + if _, err := st.cnxn.cn.ExecContext(ctx, renameQuery, nil); err != nil { + return errToAdbcErr(adbc.StatusInternal, err) + } + + // Build SELECT with geo conversion + var selectCols []string + for _, f := range schema.Fields() { + isGeo := false + var gc geoCol + for _, g := range geoCols { + if g.name == f.Name { + isGeo = true + gc = g + break + } + } + + quoted := quoteTblName(f.Name) + if !isGeo { + selectCols = append(selectCols, quoted) + continue + } + + // Build conversion expression. + // For WKB: TO_GEOGRAPHY(binary, allow_invalid=true) or TO_GEOMETRY(binary). + // For WKT: TRY_TO_GEOGRAPHY(text) or TO_GEOMETRY(text). + var expr string + isWKB := strings.Contains(gc.extName, "wkb") + if geoType == "geography" { + if isWKB { + expr = fmt.Sprintf("TO_GEOGRAPHY(%s, true) AS %s", quoted, quoted) + } else { + expr = fmt.Sprintf("TRY_TO_GEOGRAPHY(%s) AS %s", quoted, quoted) + } + } else { + srid := extractSRIDFromMeta(gc.extMeta) + if srid != 0 { + expr = fmt.Sprintf("ST_SETSRID(TO_GEOMETRY(%s), %d) AS %s", quoted, srid, quoted) + } else { + expr = fmt.Sprintf("TO_GEOMETRY(%s) AS %s", quoted, quoted) + } + } + selectCols = append(selectCols, expr) + } + + // CTAS with geo conversion + ctasQuery := fmt.Sprintf("CREATE TABLE %s AS SELECT %s FROM %s", + target, strings.Join(selectCols, ", "), staging) + if _, err := st.cnxn.cn.ExecContext(ctx, ctasQuery, nil); err != nil { + // Try to restore the original table name on failure + restoreQuery := fmt.Sprintf("ALTER TABLE %s RENAME TO %s", staging, target) + st.cnxn.cn.ExecContext(ctx, restoreQuery, nil) + return errToAdbcErr(adbc.StatusInternal, err) + } + + // Drop staging table + dropQuery := fmt.Sprintf("DROP TABLE %s", staging) + st.cnxn.cn.ExecContext(ctx, dropQuery, nil) + + return nil +} + +// extractSRIDFromMeta extracts the SRID from geoarrow extension metadata string. +// The metadata is a JSON string that may contain a "crs" field. +// Supported formats: +// - PROJJSON: {"crs": {"id": {"authority": "EPSG", "code": 4326}}} +// - Simple string: "EPSG:4326" (as CRS value) +// +// Returns 0 if no SRID can be determined. +func extractSRIDFromMeta(metadata string) int { + if metadata == "" { + return 0 + } + + // Try to parse as JSON with CRS + // Look for "code": NNNN pattern in PROJJSON + // This handles both full PROJJSON and simple {"crs": {"id": {"authority": "EPSG", "code": 4326}}} + type projID struct { + Authority string `json:"authority"` + Code int `json:"code"` + } + type projCRS struct { + ID projID `json:"id"` + } + type geoarrowMeta struct { + CRS json.RawMessage `json:"crs"` + } + + var meta geoarrowMeta + if err := json.Unmarshal([]byte(metadata), &meta); err != nil { + return 0 + } + + if len(meta.CRS) == 0 { + return 0 + } + + // CRS can be a string like "EPSG:4326" or a PROJJSON object + var crsStr string + if err := json.Unmarshal(meta.CRS, &crsStr); err == nil { + // Simple string format: "EPSG:NNNN" + if strings.HasPrefix(crsStr, "EPSG:") { + if code, err := strconv.Atoi(crsStr[5:]); err == nil { + return code + } + } + return 0 + } + + // Try PROJJSON object + var crs projCRS + if err := json.Unmarshal(meta.CRS, &crs); err == nil { + if strings.EqualFold(crs.ID.Authority, "EPSG") && crs.ID.Code != 0 { + return crs.ID.Code + } } - return st.ingestStream(ctx) + return 0 } // ExecuteQuery executes the current query or prepared statement diff --git a/go/adbc/driver/snowflake/statement_test.go b/go/adbc/driver/snowflake/statement_test.go new file mode 100644 index 0000000000..f20e8c08f6 --- /dev/null +++ b/go/adbc/driver/snowflake/statement_test.go @@ -0,0 +1,162 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package snowflake + +import ( + "reflect" + "testing" + + "github.com/apache/arrow-go/v18/arrow" + "github.com/apache/arrow-go/v18/arrow/array" + "github.com/stretchr/testify/assert" +) + +// geoArrowType implements arrow.ExtensionType for testing geoarrow types. +type geoArrowType struct { + arrow.ExtensionBase + name string +} + +func newGeoArrowType(name string, storage arrow.DataType) *geoArrowType { + return &geoArrowType{ + ExtensionBase: arrow.ExtensionBase{Storage: storage}, + name: name, + } +} + +func (g *geoArrowType) ExtensionName() string { return g.name } +func (g *geoArrowType) Serialize() string { return "" } +func (g *geoArrowType) Deserialize(storage arrow.DataType, data string) (arrow.ExtensionType, error) { + return newGeoArrowType(g.name, storage), nil +} +func (g *geoArrowType) ExtensionEquals(other arrow.ExtensionType) bool { + return g.ExtensionName() == other.ExtensionName() +} +func (g *geoArrowType) ArrayType() reflect.Type { + return reflect.TypeOf(array.ExtensionArrayBase{}) +} + +func TestToSnowflakeTypeGeoArrow(t *testing.T) { + tests := []struct { + name string + dt arrow.DataType + geoType string + expected string + }{ + { + name: "geoarrow.wkb defaults to geography", + dt: newGeoArrowType("geoarrow.wkb", arrow.BinaryTypes.Binary), + geoType: "geography", + expected: "geography", + }, + { + name: "geoarrow.wkt defaults to geography", + dt: newGeoArrowType("geoarrow.wkt", arrow.BinaryTypes.String), + geoType: "geography", + expected: "geography", + }, + { + name: "geoarrow.wkb with geometry option", + dt: newGeoArrowType("geoarrow.wkb", arrow.BinaryTypes.Binary), + geoType: "geometry", + expected: "geometry", + }, + { + name: "geoarrow.wkb_view maps to geography", + dt: newGeoArrowType("geoarrow.wkb_view", arrow.BinaryTypes.Binary), + geoType: "geography", + expected: "geography", + }, + { + name: "plain binary stays binary", + dt: arrow.BinaryTypes.Binary, + geoType: "geography", + expected: "binary", + }, + { + name: "plain string stays text", + dt: arrow.BinaryTypes.String, + geoType: "geography", + expected: "text", + }, + { + name: "unknown extension falls through to storage type", + dt: newGeoArrowType("some.other.ext", arrow.BinaryTypes.Binary), + geoType: "geography", + expected: "binary", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := toSnowflakeType(tt.dt, tt.geoType) + assert.Equal(t, tt.expected, result) + }) + } +} + +func TestExtractSRIDFromMeta(t *testing.T) { + tests := []struct { + name string + metadata string + expected int + }{ + { + name: "empty metadata", + metadata: "", + expected: 0, + }, + { + name: "PROJJSON with EPSG code", + metadata: `{"crs":{"type":"GeographicCRS","name":"WGS 84","id":{"authority":"EPSG","code":4326}}}`, + expected: 4326, + }, + { + name: "PROJJSON with non-4326 SRID", + metadata: `{"crs":{"type":"ProjectedCRS","name":"ETRS89 / UTM zone 33N","id":{"authority":"EPSG","code":25833}}}`, + expected: 25833, + }, + { + name: "simple EPSG string CRS", + metadata: `{"crs":"EPSG:3857"}`, + expected: 3857, + }, + { + name: "no CRS field", + metadata: `{"edges":"planar"}`, + expected: 0, + }, + { + name: "null CRS", + metadata: `{"crs":null}`, + expected: 0, + }, + { + name: "invalid JSON", + metadata: `not json`, + expected: 0, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := extractSRIDFromMeta(tt.metadata) + assert.Equal(t, tt.expected, result) + }) + } +}