Commit 1defef3

metadata constants
Signed-off-by: varun-edachali-dbx <varun.edachali@databricks.com>
1 parent 1bac7ab commit 1defef3

2 files changed: +339 -0 lines changed
Lines changed: 76 additions & 0 deletions
@@ -0,0 +1,76 @@
"""
Metadata column mappings for normalizing SEA results to match Thrift backend.

This module defines the column mappings needed to transform SEA metadata
results to use the same column names as the Thrift backend, ensuring
consistency between the two backends.
"""

from typing import Dict, List, NamedTuple, Optional


class ColumnMapping(NamedTuple):
    """Maps SEA column name to Thrift column name."""

    sea_name: Optional[str]
    thrift_name: str


# Column mappings for each metadata operation
CATALOG_COLUMN_MAPPINGS: List[ColumnMapping] = [
    ColumnMapping("catalog", "TABLE_CAT"),
]

SCHEMA_COLUMN_MAPPINGS: List[ColumnMapping] = [
    ColumnMapping("databaseName", "TABLE_SCHEM"),
    ColumnMapping(None, "TABLE_CATALOG"),  # SEA doesn't return this, but Thrift does
]

TABLE_COLUMN_MAPPINGS: List[ColumnMapping] = [
    ColumnMapping("catalogName", "TABLE_CAT"),
    ColumnMapping("namespace", "TABLE_SCHEM"),
    ColumnMapping("tableName", "TABLE_NAME"),
    ColumnMapping("tableType", "TABLE_TYPE"),
    ColumnMapping("remarks", "REMARKS"),
    # Add NULL columns for Thrift compatibility
    ColumnMapping(None, "TYPE_CAT"),  # Always NULL
    ColumnMapping(None, "TYPE_SCHEM"),  # Always NULL
    ColumnMapping(None, "TYPE_NAME"),  # Always NULL
    ColumnMapping(None, "SELF_REFERENCING_COL_NAME"),  # Always NULL
    ColumnMapping(None, "REF_GENERATION"),  # Always NULL
]

COLUMN_COLUMN_MAPPINGS: List[ColumnMapping] = [
    ColumnMapping("catalogName", "TABLE_CAT"),
    ColumnMapping("namespace", "TABLE_SCHEM"),
    ColumnMapping("tableName", "TABLE_NAME"),
    ColumnMapping("col_name", "COLUMN_NAME"),
    ColumnMapping(None, "DATA_TYPE"),  # Requires conversion from columnType
    ColumnMapping("columnType", "TYPE_NAME"),
    ColumnMapping("columnSize", "COLUMN_SIZE"),
    ColumnMapping(None, "BUFFER_LENGTH"),  # Always NULL
    ColumnMapping("decimalDigits", "DECIMAL_DIGITS"),
    ColumnMapping("radix", "NUM_PREC_RADIX"),
    ColumnMapping(None, "NULLABLE"),  # Derived from isNullable
    ColumnMapping("remarks", "REMARKS"),
    ColumnMapping(None, "COLUMN_DEF"),  # Always NULL
    ColumnMapping(None, "SQL_DATA_TYPE"),  # Always NULL
    ColumnMapping(None, "SQL_DATETIME_SUB"),  # Always NULL
    ColumnMapping(None, "CHAR_OCTET_LENGTH"),  # Always NULL
    ColumnMapping("ordinalPosition", "ORDINAL_POSITION"),
    ColumnMapping("isNullable", "IS_NULLABLE"),
    ColumnMapping(None, "SCOPE_CATALOG"),  # Always NULL
    ColumnMapping(None, "SCOPE_SCHEMA"),  # Always NULL
    ColumnMapping(None, "SCOPE_TABLE"),  # Always NULL
    ColumnMapping(None, "SOURCE_DATA_TYPE"),  # Always NULL
    ColumnMapping("isAutoIncrement", "IS_AUTOINCREMENT"),
    ColumnMapping("isGenerated", "IS_GENERATEDCOLUMN"),
]

# Operation to mapping lookup
OPERATION_MAPPINGS: Dict[str, List[ColumnMapping]] = {
    "catalogs": CATALOG_COLUMN_MAPPINGS,
    "schemas": SCHEMA_COLUMN_MAPPINGS,
    "tables": TABLE_COLUMN_MAPPINGS,
    "columns": COLUMN_COLUMN_MAPPINGS,
}
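
A minimal sketch of how these constants might be consumed (illustrative only, not part of the commit): the helper sea_to_thrift_names below is hypothetical, and the import path follows the one used in the second file of this diff.

# Hypothetical illustration of consuming the mapping constants.
from databricks.sql.backend.sea.metadata_constants import OPERATION_MAPPINGS


def sea_to_thrift_names(operation: str) -> dict:
    """Build a SEA-name -> Thrift-name lookup for one metadata operation."""
    return {
        m.sea_name: m.thrift_name
        for m in OPERATION_MAPPINGS.get(operation, [])
        if m.sea_name is not None
    }


print(sea_to_thrift_names("schemas"))  # {'databaseName': 'TABLE_SCHEM'}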
Lines changed: 263 additions & 0 deletions
@@ -0,0 +1,263 @@
"""
Metadata normalization for SEA backend results.

This module provides functionality to normalize SEA metadata results
to match the column names and data format expected from the Thrift backend.
"""

import logging
from typing import List, Dict, Any, Optional, Tuple
from databricks.sql.backend.sea.metadata_constants import (
    OPERATION_MAPPINGS,
    ColumnMapping,
)
from databricks.sql.backend.sea.utils.conversion import SqlType

logger = logging.getLogger(__name__)


# SQL type codes for metadata compatibility
class TypeCodes:
    """SQL type code constants for DATA_TYPE column values.

    These integer codes are used in the DATA_TYPE column of metadata results
    to maintain compatibility with the Thrift backend.
    """

    TINYINT = -6
    SMALLINT = 5
    INTEGER = 4
    BIGINT = -5
    FLOAT = 6
    DOUBLE = 8
    DECIMAL = 3
    BINARY = -2
    BOOLEAN = 16
    CHAR = 1
    VARCHAR = 12
    TIMESTAMP = 93
    DATE = 91
    STRUCT = 2002
    ARRAY = 2003
    OTHER = 1111


class MetadataNormalizer:
    """Normalizes SEA metadata results to match Thrift backend column names."""

    @staticmethod
    def normalize_description(description: List[Tuple], operation: str) -> List[Tuple]:
        """
        Normalize column description to use Thrift standard names.

        Args:
            description: Original description from SEA
            operation: The metadata operation (catalogs, schemas, tables, columns)

        Returns:
            Normalized description with Thrift column names
        """
        logger.debug(f"normalize_description called with operation: {operation}")
        logger.debug(f"Original description: {description}")

        mappings = OPERATION_MAPPINGS.get(operation, [])
        if not mappings:
            logger.debug(f"No mappings found for operation: {operation}")
            return description

        # Create lookup from SEA names to Thrift names
        sea_to_thrift = {
            mapping.sea_name: mapping.thrift_name
            for mapping in mappings
            if mapping.sea_name
        }
        logger.debug(f"SEA to Thrift mapping: {sea_to_thrift}")

        # Create new description with normalized names
        normalized_description = []
        for col_desc in description:
            (
                name,
                type_code,
                display_size,
                internal_size,
                precision,
                scale,
                null_ok,
            ) = col_desc

            # Skip columns that don't exist in Thrift for tables operation
            if operation == "tables" and name in ["isTemporary", "information"]:
                continue

            # Map SEA name to Thrift name
            thrift_name = sea_to_thrift.get(name, name)

            normalized_description.append(
                (
                    thrift_name,
                    type_code,
                    display_size,
                    internal_size,
                    precision,
                    scale,
                    null_ok,
                )
            )

        # Add any missing NULL columns required by Thrift spec
        existing_names = {desc[0] for desc in normalized_description}
        for mapping in mappings:
            if mapping.sea_name is None and mapping.thrift_name not in existing_names:
                # Add NULL column
                normalized_description.append(
                    (mapping.thrift_name, "string", None, None, None, None, None)
                )

        # For tables operation, ensure the columns are in Thrift order
        if operation == "tables":
            # Define the expected Thrift column order
            thrift_order = [
                "TABLE_CAT",
                "TABLE_SCHEM",
                "TABLE_NAME",
                "TABLE_TYPE",
                "REMARKS",
                "TYPE_CAT",
                "TYPE_SCHEM",
                "TYPE_NAME",
                "SELF_REFERENCING_COL_NAME",
                "REF_GENERATION",
            ]

            # Create a mapping of column names to their descriptions
            desc_map = {desc[0]: desc for desc in normalized_description}

            # Rebuild the description in the correct order
            ordered_description = []
            for col_name in thrift_order:
                if col_name in desc_map:
                    ordered_description.append(desc_map[col_name])

            normalized_description = ordered_description

        logger.debug(f"Normalized description: {normalized_description}")
        return normalized_description

    @staticmethod
    def normalize_row_data(
        rows: List[Dict[str, Any]],
        operation: str,
        context: Optional[Dict[str, Any]] = None,
    ) -> List[Dict[str, Any]]:
        """
        Normalize row data to use Thrift standard column names.

        Args:
            rows: Original row data from SEA
            operation: The metadata operation (catalogs, schemas, tables, columns)
            context: Optional extra values used to fill columns SEA does not
                return (currently the catalog name for the schemas operation)

        Returns:
            Normalized row data with Thrift column names
        """
        logger.debug(f"normalize_row_data called with operation: {operation}")
        logger.debug(f"Number of rows to normalize: {len(rows)}")
        if rows:
            logger.debug(f"First row before normalization: {rows[0]}")

        mappings = OPERATION_MAPPINGS.get(operation, [])
        if not mappings:
            logger.debug(f"No mappings found for operation: {operation}")
            return rows

        # Create lookup from SEA names to Thrift names
        sea_to_thrift = {
            mapping.sea_name: mapping.thrift_name
            for mapping in mappings
            if mapping.sea_name
        }
        logger.debug(f"SEA to Thrift mapping: {sea_to_thrift}")

        normalized_rows = []
        for row in rows:
            normalized_row = {}

            # Map existing columns, but skip columns that don't have a mapping in tables operation
            for sea_name, value in row.items():
                if operation == "tables" and sea_name in ["isTemporary", "information"]:
                    # Skip these columns that exist in SEA but not in Thrift
                    continue
                thrift_name = sea_to_thrift.get(sea_name, sea_name)
                normalized_row[thrift_name] = value

            # Add NULL values for missing columns
            for mapping in mappings:
                if mapping.sea_name is None:
                    # Handle special cases that need context
                    if (
                        mapping.thrift_name == "TABLE_CATALOG"
                        and operation == "schemas"
                        and context
                    ):
                        # For schemas, populate TABLE_CATALOG with the catalog name from context
                        normalized_row[mapping.thrift_name] = context.get(
                            "catalog_name"
                        )
                    else:
                        normalized_row[mapping.thrift_name] = None
                elif mapping.thrift_name not in normalized_row:
                    # Handle special conversions if needed
                    if mapping.thrift_name == "DATA_TYPE":
                        # Convert TYPE_NAME to DATA_TYPE code
                        type_name = row.get("columnType", "")
                        normalized_row["DATA_TYPE"] = _convert_type_name_to_data_type(
                            type_name
                        )
                    elif mapping.thrift_name == "NULLABLE":
                        # Convert IS_NULLABLE to NULLABLE code
                        is_nullable = row.get("isNullable", "")
                        normalized_row["NULLABLE"] = 1 if is_nullable == "YES" else 0

            normalized_rows.append(normalized_row)

        if normalized_rows:
            logger.debug(f"First row after normalization: {normalized_rows[0]}")

        return normalized_rows


def _convert_type_name_to_data_type(type_name: str) -> int:
    """
    Convert normalized type name to SQL DATA_TYPE code.
    The type_name comes from the schema description's type_code field,
    which is already normalized by the SEA backend (see backend.py:324-327).
    This leverages the existing normalization rather than duplicating the logic.
    """
    # Simple mapping from normalized type names to SQL type codes
    # Using SqlType constants for consistency with existing codebase
    type_mapping = {
        SqlType.BYTE: TypeCodes.TINYINT,
        "tinyint": TypeCodes.TINYINT,
        SqlType.SHORT: TypeCodes.SMALLINT,
        "smallint": TypeCodes.SMALLINT,
        SqlType.INT: TypeCodes.INTEGER,
        "integer": TypeCodes.INTEGER,
        SqlType.LONG: TypeCodes.BIGINT,
        "bigint": TypeCodes.BIGINT,
        SqlType.FLOAT: TypeCodes.FLOAT,
        SqlType.DOUBLE: TypeCodes.DOUBLE,
        SqlType.DECIMAL: TypeCodes.DECIMAL,
        SqlType.BINARY: TypeCodes.BINARY,
        SqlType.BOOLEAN: TypeCodes.BOOLEAN,
        SqlType.CHAR: TypeCodes.CHAR,
        SqlType.STRING: TypeCodes.VARCHAR,
        "varchar": TypeCodes.VARCHAR,
        SqlType.TIMESTAMP: TypeCodes.TIMESTAMP,
        SqlType.DATE: TypeCodes.DATE,
        SqlType.STRUCT: TypeCodes.STRUCT,
        SqlType.ARRAY: TypeCodes.ARRAY,
        SqlType.MAP: TypeCodes.VARCHAR,  # Maps are represented as VARCHAR in Thrift
        SqlType.NULL: TypeCodes.VARCHAR,
    }

    return type_mapping.get(type_name.lower(), TypeCodes.OTHER)
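
A minimal sketch of how the normalizer might be invoked for a "schemas" result (illustrative only, not part of the commit; the import path is an assumption, since the diff view here omits the file names).

# Hypothetical usage sketch; the module path below is assumed.
from databricks.sql.backend.sea.metadata_normalization import MetadataNormalizer

# One SEA "schemas" column description and row, in cursor-description form.
description = [("databaseName", "string", None, None, None, None, True)]
rows = [{"databaseName": "default"}]

norm_desc = MetadataNormalizer.normalize_description(description, "schemas")
norm_rows = MetadataNormalizer.normalize_row_data(
    rows, "schemas", context={"catalog_name": "hive_metastore"}
)
# norm_desc: [("TABLE_SCHEM", "string", None, None, None, None, True),
#             ("TABLE_CATALOG", "string", None, None, None, None, None)]
# norm_rows: [{"TABLE_SCHEM": "default", "TABLE_CATALOG": "hive_metastore"}]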
