Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,11 @@ $ cd iceberg-go/cmd/iceberg && go build .
| Check Namespace Exists | X | | | X | X |
| Drop Namespace | X | | | X | X |
| Update Namespace Properties | X | | | X | X |
| Create View | X | | | | |
| List View | X | | | | |
| Drop View | X | | | | |
| Check View Exists | X | | | | |
| Create View | X | | | | X |
| Load View | | | | | X |
| List View | X | | | | X |
| Drop View | X | | | | X |
| Check View Exists | X | | | | X |

### Read/Write Data Support

Expand Down
158 changes: 158 additions & 0 deletions catalog/internal/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"regexp"
"strconv"
"strings"
"time"

"github.com/apache/iceberg-go"
"github.com/apache/iceberg-go/catalog"
Expand Down Expand Up @@ -251,3 +252,160 @@ func UpdateAndStageTable(ctx context.Context, current *table.Table, ident table.
),
}, nil
}

func CreateViewMetadata(
ctx context.Context,
catalogName string,
nsIdent []string,
schema *iceberg.Schema,
viewSQL string,
loc string,
props iceberg.Properties,
) (metadataLocation string, err error) {
versionId := int64(1)
timestampMs := time.Now().UnixMilli()

viewVersion := struct {
VersionID int64 `json:"version-id"`
TimestampMs int64 `json:"timestamp-ms"`
SchemaID int `json:"schema-id"`
Summary map[string]string `json:"summary"`
Operation string `json:"operation"`
Representations []struct {
Type string `json:"type"`
SQL string `json:"sql"`
Dialect string `json:"dialect"`
} `json:"representations"`
DefaultCatalog string `json:"default-catalog"`
DefaultNamespace []string `json:"default-namespace"`
}{
VersionID: versionId,
TimestampMs: timestampMs,
SchemaID: schema.ID,
Summary: map[string]string{"sql": viewSQL},
Operation: "create",
Representations: []struct {
Type string `json:"type"`
SQL string `json:"sql"`
Dialect string `json:"dialect"`
}{
{Type: "sql", SQL: viewSQL, Dialect: "default"},
},
DefaultCatalog: catalogName,
DefaultNamespace: nsIdent,
}

viewVersionBytes, err := json.Marshal(viewVersion)
if err != nil {
return "", fmt.Errorf("failed to marshal view version: %w", err)
}

if props == nil {
props = iceberg.Properties{}
}
props["view-version"] = string(viewVersionBytes)
props["view-format"] = "iceberg"
props["view-sql"] = viewSQL

metadataLocation = loc + "/metadata/view-" + uuid.New().String() + ".metadata.json"

viewUUID := uuid.New().String()
props["view-uuid"] = viewUUID

viewMetadata := map[string]interface{}{
"view-uuid": viewUUID,
"format-version": 1,
"location": loc,
"schema": schema,
"current-version-id": versionId,
"versions": map[string]interface{}{
"1": viewVersion,
},
"properties": props,
"version-log": []map[string]interface{}{
{
"timestamp-ms": timestampMs,
"version-id": versionId,
},
},
}

viewMetadataBytes, err := json.Marshal(viewMetadata)
if err != nil {
return "", fmt.Errorf("failed to marshal view metadata: %w", err)
}

fs, err := io.LoadFS(ctx, props, metadataLocation)
if err != nil {
return "", fmt.Errorf("failed to load filesystem for view metadata: %w", err)
}

wfs, ok := fs.(io.WriteFileIO)
if !ok {
return "", errors.New("filesystem IO does not support writing")
}

out, err := wfs.Create(metadataLocation)
if err != nil {
return "", fmt.Errorf("failed to create view metadata file: %w", err)
}
defer out.Close()

if _, err := out.Write(viewMetadataBytes); err != nil {
return "", fmt.Errorf("failed to write view metadata: %w", err)
}

return metadataLocation, nil
}

func LoadViewMetadata(ctx context.Context,
props iceberg.Properties,
metadataLocation string,
viewName string,
namespace string,
) (map[string]interface{}, error) {
// Initial metadata with basic information
viewMetadata := map[string]interface{}{
"name": viewName,
"namespace": namespace,
"metadata-location": metadataLocation,
}

// Load the filesystem
fs, err := io.LoadFS(ctx, props, metadataLocation)
if err != nil {
return nil, fmt.Errorf("error loading view metadata: %w", err)
}

// Open the metadata file
inputFile, err := fs.Open(metadataLocation)
if err != nil {
return viewMetadata, fmt.Errorf("error encountered loading view metadata: %w", err)
}
defer inputFile.Close()

// Decode the complete metadata
var fullViewMetadata map[string]interface{}
if err := json.NewDecoder(inputFile).Decode(&fullViewMetadata); err != nil {
return viewMetadata, fmt.Errorf("error encountered decoding view metadata: %w", err)
}

// Update the metadata with name, namespace and location
fullViewMetadata["name"] = viewName
fullViewMetadata["namespace"] = namespace
fullViewMetadata["metadata-location"] = metadataLocation

if props, ok := fullViewMetadata["properties"].(map[string]interface{}); ok {
strProps := make(map[string]string)
for k, v := range props {
if str, ok := v.(string); ok {
strProps[k] = str
} else if vJson, err := json.Marshal(v); err == nil {
strProps[k] = string(vJson)
}
}
fullViewMetadata["properties"] = strProps
}

return fullViewMetadata, nil
}
Loading
Loading