@@ -43,7 +43,7 @@ def __init__(self, *args: t.Any, **kwargs: t.Any):
4343 except Exception as e:
4444 raise RuntimeError(f"Failed to set database context to '{self.database}'. Reason: {e}")
4545
46- def _get_schema_name(self, name: t.Union[str, exp.Table, exp.Identifier ]) -> t.Optional[str]:
46+ def _get_schema_name(self, name: t.Union[str, exp.Table]) -> t.Optional[str]:
4747 """
4848 Safely extracts the schema name from a table or schema name, which can be
4949 a string or a sqlglot expression.
@@ -112,14 +112,31 @@ def _get_data_objects(
112112 catalog=catalog,
113113 schema=row.schema_name,
114114 name=row.name,
115- type=DataObjectType.from_str(row.type),
115+ type=DataObjectType.from_str(str( row.type) ),
116116 )
117117 for row in dataframe.itertuples()
118118 ]
119119
def schema_exists(self, schema_name: SchemaName) -> bool:
    """
    Check whether a schema exists in the current database.

    The lookup runs against ``INFORMATION_SCHEMA.SCHEMATA`` and is restricted
    to the catalog named by ``self.database``.

    Args:
        schema_name: The schema to look for, as a string or sqlglot expression.
            It is parsed with ``to_schema`` so that an unqualified name such as
            ``"my_schema"`` is read as the schema itself (the same way
            ``create_schema`` derives its simple schema name).

    Returns:
        True if a matching row is found; False if there is no match or no
        schema part could be extracted from ``schema_name``.
    """
    # NOTE: exp.to_table("my_schema").db is empty for an unqualified name, which
    # made this check always report False; to_schema puts the name in `db`.
    schema = to_schema(schema_name).db
    if not schema:
        return False

    # Build the predicates as expressions instead of interpolating the values
    # into SQL text, so names containing quotes are escaped as string literals
    # rather than breaking (or altering) the query.
    sql = (
        exp.select("1")
        .from_("INFORMATION_SCHEMA.SCHEMATA")
        .where(exp.column("SCHEMA_NAME").eq(exp.Literal.string(schema)))
        .where(exp.column("CATALOG_NAME").eq(exp.Literal.string(self.database)))
    )
    result = self.fetchone(sql, quote_identifiers=True)
    return bool(result) and result[0] == 1
136+
120137 def create_schema(
121138 self,
122- schema_name: SchemaName,
139+ schema_name: t.Optional[ SchemaName] ,
123140 ignore_if_exists: bool = True,
124141 warn_on_error: bool = True,
125142 **kwargs: t.Any,
@@ -128,53 +145,51 @@ def create_schema(
128145 Creates a schema in a Microsoft Fabric Warehouse.
129146
130147 Overridden to handle Fabric's specific T-SQL requirements.
131- T-SQL's `CREATE SCHEMA` command does not support `IF NOT EXISTS` directly
132- as part of the statement in all contexts, and error messages suggest
133- issues with batching or preceding statements like USE.
134148 """
135- if schema_name is None :
149+ if not schema_name :
136150 return
137151
138- schema_name_str = (
139- schema_name.name if isinstance(schema_name, exp.Identifier) else str(schema_name)
140- )
141-
142- if not schema_name_str:
143- logger.warning("Attempted to create a schema with an empty name. Skipping.")
144- return
145-
146- schema_name_str = schema_name_str.strip('[]"').rstrip(".")
152+ schema_exp = to_schema(schema_name)
153+ simple_schema_name_str = exp.to_identifier(schema_exp.db).name if schema_exp.db else None
147154
148- if not schema_name_str :
155+ if not simple_schema_name_str :
149156 logger.warning(
150- "Attempted to create a schema with an empty name after sanitization . Skipping."
157+ f"Could not determine simple schema name from '{schema_name}' . Skipping schema creation ."
151158 )
152159 return
153160
154161 try:
155- if self.schema_exists(schema_name_str ):
162+ if self.schema_exists(simple_schema_name_str ):
156163 if ignore_if_exists:
157164 return
158- raise RuntimeError(f"Schema '{schema_name_str }' already exists.")
165+ raise RuntimeError(f"Schema '{simple_schema_name_str }' already exists.")
159166 except Exception as e:
160167 if warn_on_error:
161- logger.warning(f"Failed to check for existence of schema '{schema_name_str}': {e}")
168+ logger.warning(
169+ f"Failed to check for existence of schema '{simple_schema_name_str}': {e}"
170+ )
162171 else:
163172 raise
164173
165174 try:
166- create_sql = f"CREATE SCHEMA [{schema_name_str }]"
175+ create_sql = f"CREATE SCHEMA [{simple_schema_name_str }]"
167176 self.execute(create_sql)
168177 except Exception as e:
169- if "already exists" in str(e).lower() or "There is already an object named" in str(e):
178+ error_message = str(e).lower()
179+ if (
180+ "already exists" in error_message
181+ or "there is already an object named" in error_message
182+ ):
170183 if ignore_if_exists:
171184 return
172- raise RuntimeError(f"Schema '{schema_name_str}' already exists.") from e
185+ raise RuntimeError(
186+ f"Schema '{simple_schema_name_str}' already exists due to race condition."
187+ ) from e
173188 else:
174189 if warn_on_error:
175- logger.warning(f"Failed to create schema {schema_name_str }. Reason: {e}")
190+ logger.warning(f"Failed to create schema {simple_schema_name_str }. Reason: {e}")
176191 else:
177- raise RuntimeError(f"Failed to create schema {schema_name_str }.") from e
192+ raise RuntimeError(f"Failed to create schema {simple_schema_name_str }.") from e
178193
179194 def _create_table_from_columns(
180195 self,
@@ -251,7 +266,7 @@ def _fully_qualify(self, name: t.Union[TableName, SchemaName]) -> exp.Table:
251266 and isinstance(table.this, exp.Identifier)
252267 and (table.this.name.startswith("#"))
253268 ):
254- temp_identifier = exp.Identifier(this=table.this.this , quoted=True)
269+ temp_identifier = exp.Identifier(this=table.this.name , quoted=True)
255270 return exp.Table(this=temp_identifier)
256271
257272 schema = self._get_schema_name(name)
@@ -308,6 +323,8 @@ def create_view(
308323 def columns(
309324 self, table_name: TableName, include_pseudo_columns: bool = False
310325 ) -> t.Dict[str, exp.DataType]:
326+ import numpy as np
327+
311328 table = exp.to_table(table_name)
312329 schema = self._get_schema_name(table_name)
313330
@@ -346,6 +363,7 @@ def columns(
346363 )
347364
348365 df = self.fetchdf(sql)
366+ df = df.replace({np.nan: None})
349367
350368 def build_var_length_col(
351369 column_name: str,
@@ -356,11 +374,9 @@ def build_var_length_col(
356374 ) -> t.Tuple[str, str]:
357375 data_type = data_type.lower()
358376
359- char_len_int = (
360- int(character_maximum_length) if character_maximum_length is not None else None
361- )
362- prec_int = int(numeric_precision) if numeric_precision is not None else None
363- scale_int = int(numeric_scale) if numeric_scale is not None else None
377+ char_len_int = character_maximum_length
378+ prec_int = numeric_precision
379+ scale_int = numeric_scale
364380
365381 if data_type in self.VARIABLE_LENGTH_DATA_TYPES and char_len_int is not None:
366382 if char_len_int > 0:
@@ -378,79 +394,31 @@ def build_var_length_col(
378394
379395 return (column_name, data_type)
380396
381- columns_raw = [
382- (
383- row.COLUMN_NAME,
384- row.DATA_TYPE,
385- getattr(row, "CHARACTER_MAXIMUM_LENGTH", None),
386- getattr(row, "NUMERIC_PRECISION", None),
387- getattr(row, "NUMERIC_SCALE", None),
397+ def _to_optional_int(val: t.Any) -> t.Optional[int]:
398+ """Safely convert DataFrame values to Optional[int] for mypy."""
399+ if val is None:
400+ return None
401+ try:
402+ return int(val)
403+ except (ValueError, TypeError):
404+ return None
405+
406+ columns_processed = [
407+ build_var_length_col(
408+ str(row.COLUMN_NAME),
409+ str(row.DATA_TYPE),
410+ _to_optional_int(row.CHARACTER_MAXIMUM_LENGTH),
411+ _to_optional_int(row.NUMERIC_PRECISION),
412+ _to_optional_int(row.NUMERIC_SCALE),
388413 )
389414 for row in df.itertuples()
390415 ]
391416
392- columns_processed = [build_var_length_col(*row) for row in columns_raw]
393-
394417 return {
395418 column_name: exp.DataType.build(data_type, dialect=self.dialect)
396419 for column_name, data_type in columns_processed
397420 }
398421
399- def create_schema(
400- self,
401- schema_name: SchemaName,
402- ignore_if_exists: bool = True,
403- warn_on_error: bool = True,
404- **kwargs: t.Any,
405- ) -> None:
406- if schema_name is None:
407- return
408-
409- schema_exp = to_schema(schema_name)
410- simple_schema_name_str = None
411- if schema_exp.db:
412- simple_schema_name_str = exp.to_identifier(schema_exp.db).name
413-
414- if not simple_schema_name_str:
415- logger.warning(
416- f"Could not determine simple schema name from '{schema_name}'. Skipping schema creation."
417- )
418- return
419-
420- if ignore_if_exists:
421- try:
422- if self.schema_exists(simple_schema_name_str):
423- return
424- except Exception as e:
425- if warn_on_error:
426- logger.warning(
427- f"Failed to check for existence of schema '{simple_schema_name_str}': {e}"
428- )
429- else:
430- raise
431- elif self.schema_exists(simple_schema_name_str):
432- raise RuntimeError(f"Schema '{simple_schema_name_str}' already exists.")
433-
434- try:
435- create_sql = f"CREATE SCHEMA [{simple_schema_name_str}]"
436- self.execute(create_sql)
437- except Exception as e:
438- error_message = str(e).lower()
439- if (
440- "already exists" in error_message
441- or "there is already an object named" in error_message
442- ):
443- if ignore_if_exists:
444- return
445- raise RuntimeError(
446- f"Schema '{simple_schema_name_str}' already exists due to race condition."
447- ) from e
448- else:
449- if warn_on_error:
450- logger.warning(f"Failed to create schema {simple_schema_name_str}. Reason: {e}")
451- else:
452- raise RuntimeError(f"Failed to create schema {simple_schema_name_str}.") from e
453-
454422 def _insert_overwrite_by_condition(
455423 self,
456424 table_name: TableName,
0 commit comments