diff --git a/mssql_python/cursor.py b/mssql_python/cursor.py index cd55015e..a8180952 100644 --- a/mssql_python/cursor.py +++ b/mssql_python/cursor.py @@ -2839,7 +2839,7 @@ def nextset(self) -> Union[bool, None]: def bulkcopy( self, table_name: str, - data: Iterable[Union[Tuple, List]], + data: Iterable[Union[Tuple, "Row"]], batch_size: int = 0, timeout: int = 30, column_mappings: Optional[Union[List[str], List[Tuple[int, str]]]] = None, @@ -2857,11 +2857,13 @@ def bulkcopy( table_name: Target table name (can include schema, e.g., 'dbo.MyTable'). The table must exist and the user must have INSERT permissions. - data: Iterable of tuples or lists containing row data to be inserted. + data: Iterable of tuples or Row objects containing row data to be inserted. + Row objects from fetchone/fetchmany/fetchall are automatically + converted to tuples. Lists and other types are not accepted. Data Format Requirements: - Each element in the iterable represents one row - - Each row should be a tuple or list of column values + - Each row should be a tuple or Row object - Column order must match the target table's column order (by ordinal position), unless column_mappings is specified - The number of values in each row must match the number of columns @@ -3011,11 +3013,33 @@ def bulkcopy( ) pycore_cursor = pycore_connection.cursor() + # Enforce the bulkcopy type contract: only tuple and Row accepted. + # Rust (mssql_py_core) requires native PyTuple via cast::(). + # Row objects from fetch methods are converted using direct _values + # access (4x faster than __iter__). All other types raise TypeError. + def _ensure_tuples(iterable): + it = iter(iterable) + first = next(it, None) + if first is None: + return + if isinstance(first, tuple): + yield first + yield from it + elif isinstance(first, Row): + yield tuple(first._values) + for item in it: + yield tuple(item._values) + else: + raise TypeError( + f"bulkcopy data rows must be tuples or Row objects, " + f"got {type(first).__name__}" + ) + # Call bulkcopy with explicit keyword arguments # The API signature: bulkcopy(table_name, data_source, batch_size=0, timeout=30, ...) result = pycore_cursor.bulkcopy( table_name, - iter(data), + _ensure_tuples(data), batch_size=batch_size, timeout=timeout, column_mappings=column_mappings,