Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions go/adbc/driver/snowflake/bulk_ingestion.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,13 @@ type ingestOptions struct {
//
// Default is true.
vectorizedScanner bool
// Snowflake type to use for geoarrow columns (geoarrow.wkb, geoarrow.wkt).
//
// Valid values are "geography" (default) and "geometry".
// GEOGRAPHY is always WGS84 (SRID 4326). GEOMETRY supports any SRID;
// the SRID is extracted from geoarrow extension metadata and applied
// via ST_SETSRID after COPY INTO.
geoType string
}

func DefaultIngestOptions() *ingestOptions {
Expand All @@ -128,6 +135,7 @@ func DefaultIngestOptions() *ingestOptions {
compressionCodec: defaultCompressionCodec,
compressionLevel: defaultCompressionLevel,
vectorizedScanner: defaultVectorizedScanner,
geoType: "geography",
}
}

Expand Down
229 changes: 222 additions & 7 deletions go/adbc/driver/snowflake/statement.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package snowflake
import (
"context"
"database/sql/driver"
"encoding/json"
"fmt"
"io"
"strconv"
Expand Down Expand Up @@ -47,6 +48,11 @@ const (
OptionStatementIngestCompressionCodec = "adbc.snowflake.statement.ingest_compression_codec" // TODO(GH-1473): Implement option
OptionStatementIngestCompressionLevel = "adbc.snowflake.statement.ingest_compression_level" // TODO(GH-1473): Implement option
OptionStatementVectorizedScanner = "adbc.snowflake.statement.ingest_use_vectorized_scanner"
// OptionStatementIngestGeoType controls the Snowflake type created for
// columns with geoarrow extension types (geoarrow.wkb, geoarrow.wkt).
// Valid values are "geography" (default) and "geometry".
// GEOGRAPHY is always WGS84 (SRID 4326). GEOMETRY supports any SRID.
OptionStatementIngestGeoType = "adbc.snowflake.statement.ingest_geo_type"
)

type statement struct {
Expand Down Expand Up @@ -239,6 +245,17 @@ func (st *statement) SetOption(key string, val string) error {
}
st.ingestOptions.vectorizedScanner = vectorized
return nil
case OptionStatementIngestGeoType:
switch strings.ToLower(val) {
case "geography", "geometry":
st.ingestOptions.geoType = strings.ToLower(val)
default:
return adbc.Error{
Msg: fmt.Sprintf("[Snowflake] invalid geo type '%s': must be 'geography' or 'geometry'", val),
Code: adbc.StatusInvalidArgument,
}
}
return nil
default:
return st.Base().SetOption(key, val)
}
Expand Down Expand Up @@ -343,14 +360,20 @@ func (st *statement) SetSqlQuery(query string) error {
return nil
}

func toSnowflakeType(dt arrow.DataType) string {
func toSnowflakeType(dt arrow.DataType, geoType string) string {
switch dt.ID() {
case arrow.EXTENSION:
return toSnowflakeType(dt.(arrow.ExtensionType).StorageType())
ext := dt.(arrow.ExtensionType)
switch ext.ExtensionName() {
case "geoarrow.wkb", "geoarrow.wkb_view", "geoarrow.wkt", "geoarrow.wkt_view":
return geoType
default:
return toSnowflakeType(ext.StorageType(), geoType)
}
case arrow.DICTIONARY:
return toSnowflakeType(dt.(*arrow.DictionaryType).ValueType)
return toSnowflakeType(dt.(*arrow.DictionaryType).ValueType, geoType)
case arrow.RUN_END_ENCODED:
return toSnowflakeType(dt.(*arrow.RunEndEncodedType).Encoded())
return toSnowflakeType(dt.(*arrow.RunEndEncodedType).Encoded(), geoType)
case arrow.INT8, arrow.INT16, arrow.INT32, arrow.INT64,
arrow.UINT8, arrow.UINT16, arrow.UINT32, arrow.UINT64:
return "integer"
Expand Down Expand Up @@ -418,7 +441,7 @@ func (st *statement) initIngest(ctx context.Context) error {

createBldr.WriteString(quoteTblName(f.Name))
createBldr.WriteString(" ")
ty := toSnowflakeType(f.Type)
ty := toSnowflakeType(f.Type, st.ingestOptions.geoType)
if ty == "" {
return adbc.Error{
Msg: fmt.Sprintf("unimplemented type conversion for field %s, arrow type: %s", f.Name, f.Type),
Expand Down Expand Up @@ -469,16 +492,208 @@ func (st *statement) executeIngest(ctx context.Context) (int64, error) {
}
}

// Capture schema before ingest (ingestRecord nils st.bound after completion)
var schema *arrow.Schema
if st.bound != nil {
schema = st.bound.Schema()
} else {
schema = st.streamBind.Schema()
}

err := st.initIngest(ctx)
if err != nil {
return -1, err
}

var nrows int64
if st.bound != nil {
return st.ingestRecord(ctx)
nrows, err = st.ingestRecord(ctx)
} else {
nrows, err = st.ingestStream(ctx)
}
if err != nil {
return nrows, err
}

// Convert geo columns from BINARY/TEXT to GEOGRAPHY/GEOMETRY after COPY INTO.
// Snowflake's COPY INTO cannot load WKB directly into GEOGRAPHY columns,
// so we load as BINARY first, then convert via CTAS.
if err := st.convertGeoColumns(ctx, schema); err != nil {
return nrows, err
}

return nrows, nil
}

// convertGeoColumns converts BINARY/TEXT geo columns to GEOGRAPHY/GEOMETRY after COPY INTO.
//
// Snowflake's COPY INTO from Parquet cannot load WKB directly into GEOGRAPHY/GEOMETRY
// columns — only CSV and JSON/AVRO support direct geospatial loading from stages.
// See: https://docs.snowflake.com/en/sql-reference/data-types-geospatial#loading-geospatial-data-from-stages
//
// We work around this with a CTAS pattern: rename the staging table, create the
// final table with TO_GEOGRAPHY/TO_GEOMETRY conversion, then drop staging.
//
// TODO: Investigate using a COPY transform (SELECT ... FROM @stage) to convert
// inline during COPY INTO, which would avoid the rename+CTAS overhead.
func (st *statement) convertGeoColumns(ctx context.Context, schema *arrow.Schema) error {
	if schema == nil {
		return nil
	}

	// Identify geo columns from Arrow extension information. The extension may
	// be carried either by a registered extension type or by the raw
	// ARROW:extension:* field metadata (when the type is not registered).
	type geoCol struct {
		extName string // e.g. "geoarrow.wkb"
		extMeta string // serialized extension metadata (JSON, may carry a CRS)
	}
	geoCols := make(map[string]geoCol)

	fields := schema.Fields()
	for _, f := range fields {
		var extName, extMeta string
		if f.Type.ID() == arrow.EXTENSION {
			ext := f.Type.(arrow.ExtensionType)
			extName = ext.ExtensionName()
			extMeta = ext.Serialize()
		} else if name, ok := f.Metadata.GetValue("ARROW:extension:name"); ok {
			extName = name
			extMeta, _ = f.Metadata.GetValue("ARROW:extension:metadata")
		}

		switch extName {
		case "geoarrow.wkb", "geoarrow.wkb_view", "geoarrow.wkt", "geoarrow.wkt_view":
			geoCols[f.Name] = geoCol{extName: extName, extMeta: extMeta}
		}
	}

	if len(geoCols) == 0 {
		return nil
	}

	geoType := st.ingestOptions.geoType
	target := quoteTblName(st.targetTable)
	staging := quoteTblName(st.targetTable + "_ADBC_STAGING")

	// Rename the freshly-loaded table out of the way so the final name is free
	// for the CTAS below.
	renameQuery := fmt.Sprintf("ALTER TABLE %s RENAME TO %s", target, staging)
	if _, err := st.cnxn.cn.ExecContext(ctx, renameQuery, nil); err != nil {
		return errToAdbcErr(adbc.StatusInternal, err)
	}

	// Build the SELECT list: passthrough for non-geo columns, a conversion
	// expression for geo columns.
	selectCols := make([]string, 0, len(fields))
	for _, f := range fields {
		quoted := quoteTblName(f.Name)
		gc, isGeo := geoCols[f.Name]
		if !isGeo {
			selectCols = append(selectCols, quoted)
			continue
		}

		// Build conversion expression.
		// For WKB: TO_GEOGRAPHY(binary, allow_invalid=true) or TO_GEOMETRY(binary).
		// For WKT: TRY_TO_GEOGRAPHY(text) or TO_GEOMETRY(text).
		var expr string
		isWKB := strings.Contains(gc.extName, "wkb")
		if geoType == "geography" {
			if isWKB {
				expr = fmt.Sprintf("TO_GEOGRAPHY(%s, true) AS %s", quoted, quoted)
			} else {
				expr = fmt.Sprintf("TRY_TO_GEOGRAPHY(%s) AS %s", quoted, quoted)
			}
		} else {
			// GEOMETRY supports arbitrary SRIDs; apply one only if the
			// extension metadata declares it, otherwise leave Snowflake's
			// default in place.
			if srid := extractSRIDFromMeta(gc.extMeta); srid != 0 {
				expr = fmt.Sprintf("ST_SETSRID(TO_GEOMETRY(%s), %d) AS %s", quoted, srid, quoted)
			} else {
				expr = fmt.Sprintf("TO_GEOMETRY(%s) AS %s", quoted, quoted)
			}
		}
		selectCols = append(selectCols, expr)
	}

	// CTAS with geo conversion.
	ctasQuery := fmt.Sprintf("CREATE TABLE %s AS SELECT %s FROM %s",
		target, strings.Join(selectCols, ", "), staging)
	if _, err := st.cnxn.cn.ExecContext(ctx, ctasQuery, nil); err != nil {
		// Best effort: try to restore the original table name so the caller is
		// not left without the ingested data; the CTAS error is what we report.
		restoreQuery := fmt.Sprintf("ALTER TABLE %s RENAME TO %s", staging, target)
		_, _ = st.cnxn.cn.ExecContext(ctx, restoreQuery, nil)
		return errToAdbcErr(adbc.StatusInternal, err)
	}

	// Drop the staging table. Best effort: a failure here merely leaves a
	// leftover staging table; the target table is already correct.
	dropQuery := fmt.Sprintf("DROP TABLE IF EXISTS %s", staging)
	_, _ = st.cnxn.cn.ExecContext(ctx, dropQuery, nil)

	return nil
}

// extractSRIDFromMeta extracts the SRID from a geoarrow extension metadata string.
// The metadata is a JSON object that may contain a "crs" field.
// Supported formats:
//   - PROJJSON object: {"crs": {"id": {"authority": "EPSG", "code": 4326}}}
//   - Simple string:   {"crs": "EPSG:4326"}
//
// Returns 0 if no SRID can be determined (empty/malformed metadata, missing
// CRS, or a non-EPSG authority).
func extractSRIDFromMeta(metadata string) int {
	if metadata == "" {
		return 0
	}

	// The CRS value may be either a JSON string or a PROJJSON object, so keep
	// it as raw JSON until we know which shape it has.
	type projID struct {
		Authority string `json:"authority"`
		Code      int    `json:"code"`
	}
	type projCRS struct {
		ID projID `json:"id"`
	}
	type geoarrowMeta struct {
		CRS json.RawMessage `json:"crs"`
	}

	var meta geoarrowMeta
	if err := json.Unmarshal([]byte(metadata), &meta); err != nil {
		return 0
	}
	if len(meta.CRS) == 0 {
		return 0
	}

	// Simple string format: "EPSG:NNNN".
	var crsStr string
	if err := json.Unmarshal(meta.CRS, &crsStr); err == nil {
		if strings.HasPrefix(crsStr, "EPSG:") {
			if code, err := strconv.Atoi(strings.TrimPrefix(crsStr, "EPSG:")); err == nil {
				return code
			}
		}
		return 0
	}

	// PROJJSON object with an EPSG authority/code pair.
	// NOTE(review): PROJJSON also permits a string "code"; this only handles
	// numeric codes — confirm against producers if string codes appear.
	var crs projCRS
	if err := json.Unmarshal(meta.CRS, &crs); err == nil &&
		strings.EqualFold(crs.ID.Authority, "EPSG") && crs.ID.Code != 0 {
		return crs.ID.Code
	}

	return 0
}

// ExecuteQuery executes the current query or prepared statement
Expand Down
Loading
Loading