diff --git a/mssql_python/pybind/ddbc_bindings.cpp b/mssql_python/pybind/ddbc_bindings.cpp index 9a828011..0efedbb4 100644 --- a/mssql_python/pybind/ddbc_bindings.cpp +++ b/mssql_python/pybind/ddbc_bindings.cpp @@ -2335,6 +2335,33 @@ SQLRETURN BindParameterArray(SQLHANDLE hStmt, const py::list& columnwise_params, bufferLength = sizeof(SQLGUID); break; } + case SQL_C_DEFAULT: { + // Handle NULL parameters - all values in this column should be NULL + LOG("BindParameterArray: Binding SQL_C_DEFAULT (NULL) array - param_index=%d, count=%zu", paramIndex, paramSetSize); + + // Verify all values are indeed NULL + for (size_t i = 0; i < paramSetSize; ++i) { + if (!columnValues[i].is_none()) { + LOG("BindParameterArray: SQL_C_DEFAULT non-NULL value detected - param_index=%d, row=%zu", paramIndex, i); + ThrowStdException("SQL_C_DEFAULT (99) should only be used for NULL parameters at index " + std::to_string(paramIndex)); + } + } + + // For NULL parameters, we need to allocate a minimal buffer and set all indicators to SQL_NULL_DATA + // Use SQL_C_CHAR as a safe default C type for NULL values + char* nullBuffer = AllocateParamBufferArray(tempBuffers, paramSetSize); + strLenOrIndArray = AllocateParamBufferArray(tempBuffers, paramSetSize); + + for (size_t i = 0; i < paramSetSize; ++i) { + nullBuffer[i] = 0; + strLenOrIndArray[i] = SQL_NULL_DATA; + } + + dataPtr = nullBuffer; + bufferLength = 1; + LOG("BindParameterArray: SQL_C_DEFAULT bound - param_index=%d, all_null=true", paramIndex); + break; + } default: { LOG("BindParameterArray: Unsupported C type - " "param_index=%d, C_type=%d", diff --git a/tests/test_004_cursor.py b/tests/test_004_cursor.py index cfc4ccf4..ab2f794f 100644 --- a/tests/test_004_cursor.py +++ b/tests/test_004_cursor.py @@ -1598,6 +1598,38 @@ def test_executemany_empty_parameter_list(cursor, db_connection): db_connection.commit() +def test_executemany_NONE_parameter_list(cursor, db_connection): + """Test executemany with an NONE parameter list.""" + try: + cursor.execute("CREATE TABLE #pytest_empty_params (val VARCHAR(50))") + data = [(None,), (None,)] + cursor.executemany("INSERT INTO #pytest_empty_params VALUES (?)", data) + db_connection.commit() + + cursor.execute("SELECT COUNT(*) FROM #pytest_empty_params") + count = cursor.fetchone()[0] + assert count == 2 + finally: + cursor.execute("DROP TABLE IF EXISTS #pytest_empty_params") + db_connection.commit() + + +def test_executemany_MIX_NONE_parameter_list(cursor, db_connection): + """Test executemany with an NONE parameter list.""" + try: + cursor.execute("CREATE TABLE #pytest_empty_params (val VARCHAR(50))") + data = [(None,), ("Test",), (None,)] + cursor.executemany("INSERT INTO #pytest_empty_params VALUES (?)", data) + db_connection.commit() + + cursor.execute("SELECT COUNT(*) FROM #pytest_empty_params") + count = cursor.fetchone()[0] + assert count == 3 + finally: + cursor.execute("DROP TABLE IF EXISTS #pytest_empty_params") + db_connection.commit() + + def test_executemany_Decimal_list(cursor, db_connection): """Test executemany with an decimal parameter list.""" try: