Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions mssql_python/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,9 @@ class ConstantsDDBC(Enum):
# Reset Connection Constants
SQL_RESET_CONNECTION_YES = 1

# Query Timeout Constants
SQL_ATTR_QUERY_TIMEOUT = 0


class GetInfoConstants(Enum):
"""
Expand Down
35 changes: 21 additions & 14 deletions mssql_python/cursor.py
Original file line number Diff line number Diff line change
Expand Up @@ -681,13 +681,34 @@ def _initialize_cursor(self) -> None:
Initialize the DDBC statement handle.
"""
self._allocate_statement_handle()
self._set_timeout()

def _allocate_statement_handle(self) -> None:
"""
Allocate the DDBC statement handle.
"""
self.hstmt = self._connection._conn.alloc_statement_handle()

def _set_timeout(self) -> None:
"""
Set the query timeout attribute on the statement handle.
This is called once when the cursor is created and after any handle reallocation.
Following pyodbc's approach for better performance.
"""
if self._timeout > 0:
logger.debug("_set_timeout: Setting query timeout=%d seconds", self._timeout)
try:
timeout_value = int(self._timeout)
ret = ddbc_bindings.DDBCSQLSetStmtAttr(
self.hstmt,
ddbc_sql_const.SQL_ATTR_QUERY_TIMEOUT.value,
timeout_value,
)
check_error(ddbc_sql_const.SQL_HANDLE_STMT.value, self.hstmt, ret)
logger.debug("Query timeout set to %d seconds", timeout_value)
except Exception as e: # pylint: disable=broad-exception-caught
logger.warning("Failed to set query timeout: %s", str(e))

def _reset_cursor(self) -> None:
"""
Reset the DDBC statement handle.
Expand Down Expand Up @@ -1216,20 +1237,6 @@ def execute( # pylint: disable=too-many-locals,too-many-branches,too-many-state
encoding_settings = self._get_encoding_settings()

# Apply timeout if set (non-zero)
if self._timeout > 0:
logger.debug("execute: Setting query timeout=%d seconds", self._timeout)
try:
timeout_value = int(self._timeout)
ret = ddbc_bindings.DDBCSQLSetStmtAttr(
self.hstmt,
ddbc_sql_const.SQL_ATTR_QUERY_TIMEOUT.value,
timeout_value,
)
check_error(ddbc_sql_const.SQL_HANDLE_STMT.value, self.hstmt, ret)
logger.debug("Set query timeout to %d seconds", timeout_value)
except Exception as e: # pylint: disable=broad-exception-caught
logger.warning("Failed to set query timeout: %s", str(e))

logger.debug("execute: Creating parameter type list")
param_info = ddbc_bindings.ParamInfo
parameters_type = []
Expand Down
13 changes: 11 additions & 2 deletions mssql_python/pybind/ddbc_bindings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4392,8 +4392,17 @@ PYBIND11_MODULE(ddbc_bindings, m) {
"Set the decimal separator character");
m.def(
"DDBCSQLSetStmtAttr",
[](SqlHandlePtr stmt, SQLINTEGER attr, SQLPOINTER value) {
return SQLSetStmtAttr_ptr(stmt->get(), attr, value, 0);
[](SqlHandlePtr stmt, SQLINTEGER attr, py::object value) {
SQLPOINTER ptr_value;
if (py::isinstance<py::int_>(value)) {
// For integer attributes like SQL_ATTR_QUERY_TIMEOUT
ptr_value =
reinterpret_cast<SQLPOINTER>(static_cast<SQLULEN>(value.cast<int64_t>()));
} else {
// For pointer attributes
ptr_value = value.cast<SQLPOINTER>();
}
return SQLSetStmtAttr_ptr(stmt->get(), attr, ptr_value, 0);
},
"Set statement attributes");
m.def("DDBCSQLGetTypeInfo", &SQLGetTypeInfo_Wrapper,
Expand Down
299 changes: 299 additions & 0 deletions tests/test_003_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -4144,3 +4144,302 @@ def test_getinfo_comprehensive_edge_case_coverage(db_connection):
assert not isinstance(
e, (SystemError, MemoryError)
), f"Info type {info_type} caused critical error: {e}"


def test_timeout_long_running_query_with_small_timeout(conn_str):
"""Test that a long-running query with small timeout (1-2 seconds) raises timeout error.

This test replicates exactly what test_timeout_bug.py does to ensure consistency.
"""
import time
import mssql_python

print(f"DEBUG: Connection string: {conn_str}")

# Test 1: Create connection with timeout parameter (like test_timeout_bug.py)
print("DEBUG: [Test 1] Creating connection with timeout=2 seconds")
connection = mssql_python.connect(conn_str, timeout=2)
print(f"DEBUG: Connection created, timeout property: {connection.timeout}")

try:
cursor = connection.cursor()
start_time = time.perf_counter()
print("DEBUG: Executing WAITFOR DELAY '00:00:05' (5 seconds)")

try:
cursor.execute("WAITFOR DELAY '00:00:05'")
elapsed = time.perf_counter() - start_time
print(f"DEBUG: BUG CONFIRMED: Query completed without timeout after {elapsed:.2f}s")
pytest.skip(
f"Timeout not enforced - query completed in {elapsed:.2f}s (expected ~2s timeout)"
)
except mssql_python.OperationalError as e:
elapsed = time.perf_counter() - start_time
print(f"DEBUG: ✓ Query timed out after {elapsed:.2f}s: {e}")
assert elapsed < 4.0, f"Timeout took too long: {elapsed:.2f}s"
assert "timeout" in str(e).lower(), f"Not a timeout error: {e}"
except Exception as e:
elapsed = time.perf_counter() - start_time
print(f"DEBUG: ✓ Query raised exception after {elapsed:.2f}s: {type(e).__name__}: {e}")
assert elapsed < 4.0, f"Exception took too long: {elapsed:.2f}s"
# Accept any exception that happens quickly as it might be timeout-related
finally:
cursor.close()
connection.close()

except Exception as e:
print(f"DEBUG: Unexpected error in test: {e}")
if connection:
connection.close()
raise

# Test 2: Set timeout dynamically (like test_timeout_bug.py)
print("DEBUG: [Test 2] Setting timeout dynamically via property")
connection = mssql_python.connect(conn_str)
print(f"DEBUG: Initial timeout: {connection.timeout}")
connection.timeout = 2
print(f"DEBUG: After setting: {connection.timeout}")

try:
cursor = connection.cursor()
start_time = time.perf_counter()

try:
cursor.execute("WAITFOR DELAY '00:00:05'")
elapsed = time.perf_counter() - start_time
print(f"DEBUG: BUG CONFIRMED: Query completed without timeout after {elapsed:.2f}s")
# This is the main test - if we get here, timeout is not working
assert (
False
), f"Timeout should have occurred after ~2s, but query completed in {elapsed:.2f}s"
except mssql_python.OperationalError as e:
elapsed = time.perf_counter() - start_time
print(f"DEBUG: ✓ Query timed out after {elapsed:.2f}s: {e}")
assert elapsed < 4.0, f"Timeout took too long: {elapsed:.2f}s"
assert "timeout" in str(e).lower(), f"Not a timeout error: {e}"
except Exception as e:
elapsed = time.perf_counter() - start_time
print(f"DEBUG: ✓ Query raised exception after {elapsed:.2f}s: {type(e).__name__}: {e}")
assert elapsed < 4.0, f"Exception took too long: {elapsed:.2f}s"
finally:
cursor.close()
connection.close()

except Exception as e:
print(f"DEBUG: Unexpected error in dynamic timeout test: {e}")
if connection:
connection.close()
raise


def test_cursor_timeout_single_execute(db_connection):
"""Test that creating a cursor with timeout set and calling execute once behaves correctly."""
cursor = db_connection.cursor()

# Set timeout on connection which should affect cursor
original_timeout = db_connection.timeout
db_connection.timeout = 30 # 30 seconds - reasonable timeout

try:
# Test single execution with timeout set
cursor.execute("SELECT 1 AS test_value")
result = cursor.fetchone()
assert result is not None, "Query should execute successfully with timeout set"
assert result[0] == 1, "Query should return expected result"

# Test that cursor can be used for another query
cursor.execute("SELECT 2 AS test_value")
result = cursor.fetchone()
assert result is not None, "Second query should also work"
assert result[0] == 2, "Second query should return expected result"

finally:
cursor.close()
db_connection.timeout = original_timeout


def test_cursor_timeout_multiple_executions_consistency(db_connection):
"""Test executing multiple times with same cursor and verify timeout applies consistently."""
cursor = db_connection.cursor()

# Set a reasonable timeout
original_timeout = db_connection.timeout
db_connection.timeout = 15 # 15 seconds

try:
# Execute multiple queries in sequence to verify timeout consistency
queries = [
"SELECT 1 AS query_num",
"SELECT 2 AS query_num",
"SELECT 3 AS query_num",
"SELECT GETDATE() AS current_datetime",
"SELECT @@VERSION AS version_info",
]

for i, query in enumerate(queries):
start_time = time.perf_counter()
cursor.execute(query)
result = cursor.fetchone()
elapsed_time = time.perf_counter() - start_time

assert result is not None, f"Query {i+1} should return a result"
# All queries should complete well within the timeout
assert elapsed_time < 10, f"Query {i+1} took too long: {elapsed_time:.2f}s"

# For simple queries, verify expected results
if i < 3: # First three queries return sequential numbers
assert result[0] == i + 1, f"Query {i+1} returned incorrect result"

print(
f"Successfully executed {len(queries)} queries consistently with timeout={db_connection.timeout}s"
)

finally:
cursor.close()
db_connection.timeout = original_timeout


def test_cursor_reset_timeout_behavior(db_connection):
"""Test that _reset_cursor handles timeout correctly and _set_timeout is called as intended."""
# Create initial cursor
cursor1 = db_connection.cursor()

original_timeout = db_connection.timeout
db_connection.timeout = 20 # Set reasonable timeout

try:
# Execute a query to establish cursor state
cursor1.execute("SELECT 'initial_query' AS status")
result1 = cursor1.fetchone()
assert result1[0] == "initial_query", "Initial query should work"
cursor1.close() # Close to release connection resources

# Create another cursor to test that timeout is properly set on new cursors
cursor2 = db_connection.cursor()
cursor2.execute("SELECT 'second_cursor' AS status")
result2 = cursor2.fetchone()
assert result2[0] == "second_cursor", "Second cursor should work with timeout"
cursor2.close() # Close to release connection resources

# Create another cursor to test reuse (simulating _reset_cursor scenario)
cursor3 = db_connection.cursor()
cursor3.execute("SELECT 'reuse_test' AS status")
result3 = cursor3.fetchone()
assert result3[0] == "reuse_test", "Cursor should work with timeout"

# Change timeout and verify cursor still works with new timeout
db_connection.timeout = 25
cursor3.execute("SELECT 'updated_timeout_test' AS status")
result4 = cursor3.fetchone()
assert result4[0] == "updated_timeout_test", "Cursor should work with updated timeout"

# Test that multiple operations work consistently
for i in range(3):
cursor3.execute(f"SELECT 'iteration_{i}' AS status")
result = cursor3.fetchone()
assert result[0] == f"iteration_{i}", f"Iteration {i} should work with timeout"

print(f"Successfully tested cursor reset behavior with timeout settings")

finally:
# Clean up cursor
try:
if "cursor3" in locals() and not cursor3.closed:
cursor3.close()
except:
pass
db_connection.timeout = original_timeout


def test_timeout_compatibility_with_previous_versions(db_connection):
"""Test that timeout behavior is compatible and doesn't break existing functionality."""
cursor = db_connection.cursor()

original_timeout = db_connection.timeout

try:
# Test with default timeout (0 = no timeout)
assert db_connection.timeout == 0, "Default timeout should be 0"

cursor.execute("SELECT 'default_timeout' AS test")
result = cursor.fetchone()
assert result[0] == "default_timeout", "Should work with default timeout"

# Test setting various timeout values
timeout_values = [5, 10, 30, 60, 0] # Including 0 to reset

for timeout_val in timeout_values:
db_connection.timeout = timeout_val
assert db_connection.timeout == timeout_val, f"Timeout should be set to {timeout_val}"

# Execute a quick query to verify functionality
cursor.execute(f"SELECT {timeout_val} AS timeout_value")
result = cursor.fetchone()
assert result[0] == timeout_val, f"Should work with timeout={timeout_val}"

# Test that timeout doesn't affect normal operations
test_operations = [
("SELECT COUNT(*) FROM sys.objects", "count query"),
("SELECT DB_NAME()", "database name"),
("SELECT GETDATE()", "current date"),
("SELECT 1 WHERE 1=1", "conditional query"),
("SELECT 'test' + 'string'", "string concatenation"),
]

db_connection.timeout = 10 # Set reasonable timeout

for query, description in test_operations:
cursor.execute(query)
result = cursor.fetchone()
assert result is not None, f"Operation '{description}' should work with timeout"

print("Successfully verified timeout compatibility with existing functionality")

finally:
cursor.close()
db_connection.timeout = original_timeout


def test_timeout_edge_cases_and_boundaries(db_connection):
"""Test timeout behavior with edge cases and boundary conditions."""
cursor = db_connection.cursor()
original_timeout = db_connection.timeout

try:
# Test boundary timeout values
boundary_values = [0, 1, 2, 5, 10, 30, 60, 120, 300] # 0 to 5 minutes

for timeout_val in boundary_values:
db_connection.timeout = timeout_val
assert (
db_connection.timeout == timeout_val
), f"Should accept timeout value {timeout_val}"

# Execute a very quick query to ensure no issues with boundary values
cursor.execute("SELECT 1 AS boundary_test")
result = cursor.fetchone()
assert result[0] == 1, f"Should work with boundary timeout {timeout_val}"

# Test with zero timeout (no timeout)
db_connection.timeout = 0
cursor.execute("SELECT 'no_timeout_test' AS test")
result = cursor.fetchone()
assert result[0] == "no_timeout_test", "Should work with zero timeout"

# Test invalid timeout values (should raise ValueError)
invalid_values = [-1, -5, -100]
for invalid_val in invalid_values:
with pytest.raises(ValueError, match="Timeout cannot be negative"):
db_connection.timeout = invalid_val

# Test non-integer timeout values (should raise TypeError)
invalid_types = ["10", 10.5, None, [], {}]
for invalid_type in invalid_types:
with pytest.raises(TypeError):
db_connection.timeout = invalid_type

print("Successfully tested timeout edge cases and boundaries")

finally:
cursor.close()
db_connection.timeout = original_timeout
Loading