Skip to content

Commit ecedda1

Browse files
committed
iceberg_loader: Simplify batch_metadata
1 parent 1e5ff71 commit ecedda1

File tree

1 file changed

+3
-35
lines changed

1 file changed

+3
-35
lines changed

src/amp/loaders/implementations/iceberg_loader.py

Lines changed: 3 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -406,45 +406,13 @@ def _perform_load_operation(self, iceberg_table: IcebergTable, arrow_table: pa.T
406406

407407
def _get_loader_batch_metadata(self, batch: pa.RecordBatch, duration: float, **kwargs) -> Dict[str, Any]:
408408
"""Get Iceberg-specific metadata for batch operation"""
409-
metadata = {'namespace': self.config.namespace}
410-
411-
# Add partition columns if available
412-
table_name = kwargs.get('table_name')
413-
if table_name and self._table_exists(table_name):
414-
try:
415-
table_info = self.get_table_info(table_name)
416-
metadata['partition_columns'] = table_info.get('partition_columns', [])
417-
except Exception:
418-
metadata['partition_columns'] = []
419-
else:
420-
# For new tables, get partition fields from partition_spec if available
421-
metadata['partition_columns'] = []
422-
423-
return metadata
409+
return {'namespace': self.config.namespace}
424410

425411
def _get_loader_table_metadata(
426-
self, table: pa.Table, duration: float, batch_count: int, **kwargs
412+
self, table: pa.Table, duration: float, batch_count: int, **kwargs
427413
) -> Dict[str, Any]:
428414
"""Get Iceberg-specific metadata for table operation"""
429-
metadata = {'namespace': self.config.namespace}
430-
431-
# Add partition columns if available
432-
table_name = kwargs.get('table_name')
433-
if table_name and self._table_exists(table_name):
434-
try:
435-
table_info = self.get_table_info(table_name)
436-
metadata['partition_columns'] = table_info.get('partition_columns', [])
437-
except Exception:
438-
metadata['partition_columns'] = []
439-
else:
440-
# For new tables, get partition fields from partition_spec if available
441-
metadata['partition_columns'] = []
442-
if self.config.partition_spec and hasattr(self.config.partition_spec, 'fields'):
443-
# partition_spec.fields contains partition field definitions
444-
# We'll extract them during table creation
445-
metadata['partition_columns'] = [] # Will be populated after table creation
446-
447-
return metadata
415+
return {'namespace': self.config.namespace}
448416

449417
def _table_exists(self, table_name: str) -> bool:
450418
"""Check if a table exists"""

0 commit comments

Comments
 (0)