diff --git a/pytest.ini b/pytest.ini index dc94ab9e..55827f1f 100644 --- a/pytest.ini +++ b/pytest.ini @@ -2,6 +2,7 @@ # Register custom markers markers = stress: marks tests as stress tests (long-running, resource-intensive) + slow: marks tests as extra-slow (sustained load, multi-minute duration) # Default options applied to all pytest runs # Default: pytest -v → Skips stress tests (fast) diff --git a/tests/test_011_singlethreaded_stress.py b/tests/test_011_singlethreaded_stress.py index 7750fee5..6f9ac53e 100644 --- a/tests/test_011_singlethreaded_stress.py +++ b/tests/test_011_singlethreaded_stress.py @@ -7,6 +7,11 @@ - Exception handling during batch processing - Thousands of empty string allocations - 10MB+ LOB data handling +- Concurrent fetch data integrity (multi-cursor) +- executemany() with 10,000 mixed-type rows +- NULL-heavy result sets (50,000 rows, 6/8 columns NULL) +- Cursor re-use across 5,000 execute/fetch cycles +- fetchone() loop vs fetchall() parity at 100,000 rows Tests are marked with @pytest.mark.stress and may be skipped in regular CI runs. """ @@ -262,7 +267,7 @@ def test_thousands_of_empty_strings_allocation_stress(cursor, db_connection): @pytest.mark.stress def test_large_result_set_100k_rows_no_overflow(cursor, db_connection): """ - Test #5: Fetch very large result sets (100,000+ rows) to test buffer overflow protection. + Test #4: Fetch very large result sets (100,000+ rows) to test buffer overflow protection. Tests that large rowIdx values don't cause buffer overflow when calculating rowIdx × fetchBufferSize. Verifies data integrity across all rows - no crashes, @@ -356,7 +361,7 @@ def test_large_result_set_100k_rows_no_overflow(cursor, db_connection): @pytest.mark.stress def test_very_large_lob_10mb_data_integrity(cursor, db_connection): """ - Test #6: Fetch VARCHAR(MAX), NVARCHAR(MAX), VARBINARY(MAX) with 10MB+ data. + Test #5: Fetch VARCHAR(MAX), NVARCHAR(MAX), VARBINARY(MAX) with 10MB+ data. Verifies: 1. 
Correct LOB detection @@ -458,7 +463,7 @@ def test_very_large_lob_10mb_data_integrity(cursor, db_connection): @pytest.mark.stress def test_concurrent_fetch_data_integrity_no_corruption(db_connection, conn_str): """ - Test #7: Multiple threads/cursors fetching data simultaneously. + Test #6: Multiple threads/cursors fetching data simultaneously. Verifies: 1. No data corruption occurs @@ -547,18 +552,18 @@ def worker_thread(thread_id: int, conn_str: str, results_list: List, errors_list for thread in threads: thread.join() - # Verify results - print(f"\nConcurrent fetch results:") - for result in results: - print( - f" Thread {result['thread_id']}: Fetched {result['rows_fetched']} rows - {'OK' if result['success'] else 'FAILED'}" - ) + # Verify results (after ALL threads have finished) + print(f"\nConcurrent fetch results:") + for result in results: + print( + f" Thread {result['thread_id']}: Fetched {result['rows_fetched']} rows - {'OK' if result['success'] else 'FAILED'}" + ) - if errors: - print(f"\nErrors encountered:") - for error in errors: - print(f" Thread {error['thread_id']}: {error['error']}") - pytest.fail(f"Concurrent fetch had {len(errors)} errors") + if errors: + print(f"\nErrors encountered:") + for error in errors: + print(f" Thread {error['thread_id']}: {error['error']}") + pytest.fail(f"Concurrent fetch had {len(errors)} errors") # All threads should have succeeded assert ( @@ -574,3 +579,331 @@ def worker_thread(thread_id: int, conn_str: str, results_list: List, errors_list print( f"\n[OK] Concurrent fetch test passed: {num_threads} threads, no corruption, no race conditions" ) + + +# ============================================================================ +# New Stress Tests +# ============================================================================ + + +@pytest.mark.stress +def test_executemany_large_batch_mixed_types(cursor, db_connection): + """ + Test #7: Stress executemany() with 10,000 rows of mixed parameter types in a + single 
call. + + Exercises parameter serialization at scale for INT, FLOAT, NVARCHAR, VARBINARY, + DECIMAL, and NULL in one large executemany batch. Verifies the inserted row count + and spot-checks five rows for exact value correctness. + """ + num_rows = 10000 + + try: + drop_table_if_exists(cursor, "#pytest_executemany_stress") + + cursor.execute(""" + CREATE TABLE #pytest_executemany_stress ( + id INT, + int_col INT, + float_col FLOAT, + str_col NVARCHAR(100), + bytes_col VARBINARY(50), + dec_col DECIMAL(18, 6), + null_col NVARCHAR(50) + ) + """) + db_connection.commit() + + # Build 10,000 rows with predictable, verifiable values + rows = [ + ( + i, + i * 2, + float(i) * 1.5, + f"str_{i}", + bytes([i % 256]) * 10, + decimal.Decimal(str(i)) / decimal.Decimal("1000"), + None, # always NULL to exercise the NULL serialization path + ) + for i in range(num_rows) + ] + + # Single large executemany call — stresses parameter serialization at scale + cursor.executemany( + "INSERT INTO #pytest_executemany_stress VALUES (?, ?, ?, ?, ?, ?, ?)", + rows, + ) + db_connection.commit() + + # Verify total count + cursor.execute("SELECT COUNT(*) FROM #pytest_executemany_stress") + count = cursor.fetchone()[0] + assert count == num_rows, f"Expected {num_rows} rows, got {count}" + print(f"[OK] executemany stress: {num_rows} rows inserted") + + # Spot-check five representative rows + for idx in [0, 1, 500, 5000, 9999]: + cursor.execute( + "SELECT id, int_col, float_col, str_col, bytes_col, dec_col, null_col" + " FROM #pytest_executemany_stress WHERE id = ?", + (idx,), + ) + row = cursor.fetchone() + assert row is not None, f"Row {idx} not found after executemany" + assert row[0] == idx, f"Row {idx}: id mismatch" + assert row[1] == idx * 2, f"Row {idx}: int_col mismatch" + assert abs(row[2] - float(idx) * 1.5) < 1e-9, f"Row {idx}: float_col mismatch" + assert row[3] == f"str_{idx}", f"Row {idx}: str_col mismatch" + assert row[4] == bytes([idx % 256]) * 10, f"Row {idx}: bytes_col mismatch" + 
expected_dec = decimal.Decimal(str(idx)) / decimal.Decimal("1000") + assert row[5] == expected_dec, ( + f"Row {idx}: dec_col mismatch: got {row[5]}, expected {expected_dec}" + ) + assert row[6] is None, f"Row {idx}: null_col should be None, got {row[6]}" + + print("[OK] executemany stress: all 5 spot-checks passed (int, float, str, bytes, decimal, NULL)") + + except Exception as e: + pytest.fail(f"executemany large batch stress failed: {e}") + finally: + drop_table_if_exists(cursor, "#pytest_executemany_stress") + db_connection.commit() + + +@pytest.mark.stress +def test_null_heavy_large_result_set(cursor, db_connection): + """ + Test #8: Fetch 50,000 rows where 6 of 8 columns are always NULL. + + Real-world tables have many nullable columns and SQL NULL takes a separate code + path in the fetch layer. This test stresses that path at scale and verifies: + - All NULL columns map to Python None (no corruption) + - The two non-null sentinel columns are intact + - No crashes, no partial rows + + Note: VARBINARY is excluded because the driver cannot reliably infer the SQL + type from a Python None, causing an implicit-conversion error on SQL Server. + Binary-NULL handling is covered by test_thousands_of_empty_strings_allocation_stress. + """ + num_rows = 50000 + + try: + drop_table_if_exists(cursor, "#pytest_null_heavy") + + # Note: VARBINARY is intentionally excluded — passing None for a VARBINARY + # column causes the driver to infer SQL_C_CHAR, which SQL Server rejects with + # an implicit-conversion error. NULL handling for binary data is covered by + # test_thousands_of_empty_strings_allocation_stress instead. 
+ cursor.execute(""" + CREATE TABLE #pytest_null_heavy ( + id INT NOT NULL, + non_null_str NVARCHAR(30) NOT NULL, + null_int INT, + null_float FLOAT, + null_str NVARCHAR(100), + null_nvarchar NVARCHAR(MAX), + null_datetime DATETIME, + null_bit BIT + ) + """) + db_connection.commit() + + # Insert in batches of 1000 to avoid a single huge parameter array + batch_size = 1000 + for batch_start in range(0, num_rows, batch_size): + batch = [ + (i, f"ID_{i}", None, None, None, None, None, None) + for i in range(batch_start, min(batch_start + batch_size, num_rows)) + ] + cursor.executemany( + "INSERT INTO #pytest_null_heavy VALUES (?, ?, ?, ?, ?, ?, ?, ?)", + batch, + ) + db_connection.commit() + print(f"[OK] Inserted {num_rows} NULL-heavy rows") + + # Fetch all at once and verify + cursor.execute("SELECT * FROM #pytest_null_heavy ORDER BY id") + rows = cursor.fetchall() + + assert len(rows) == num_rows, f"Expected {num_rows} rows, got {len(rows)}" + + for i, row in enumerate(rows): + assert row[0] == i, f"Row {i}: id mismatch (got {row[0]})" + assert row[1] == f"ID_{i}", f"Row {i}: non_null_str mismatch (got {row[1]})" + # Columns 2–7 must be Python None (SQL NULL) + for col_idx in range(2, 8): + assert row[col_idx] is None, ( + f"Row {i} col {col_idx}: expected None, got {row[col_idx]!r}" + ) + + print( + f"[OK] NULL-heavy stress: {num_rows} rows, all 6 nullable cols are None, " + "no corruption in non-null sentinel columns" + ) + + except Exception as e: + pytest.fail(f"NULL-heavy result set stress failed: {e}") + finally: + drop_table_if_exists(cursor, "#pytest_null_heavy") + db_connection.commit() + + +@pytest.mark.stress +def test_cursor_reuse_high_iteration(db_connection): + """ + Test #9: Re-use a single cursor for 5,000 sequential execute/fetch cycles. + + ORM frameworks and connection pools re-use cursors heavily. 
This test verifies + that cursor state is correctly reset between executes, results are accurate after + thousands of prior queries, and memory does not grow unboundedly (potential leak). + + Three alternating query patterns exercise different internal code paths each cycle. + """ + import psutil + import gc + + iterations = 5000 + stress_cursor = db_connection.cursor() + + try: + gc.collect() + process = psutil.Process() + baseline_rss_mb = process.memory_info().rss / (1024 * 1024) + + for i in range(iterations): + pattern = i % 3 + + if pattern == 0: + # Parametrized WHERE clause — exercises parameter binding path + stress_cursor.execute( + "SELECT COUNT(*) FROM sys.objects WHERE object_id > ?", + (i % 1000,), + ) + row = stress_cursor.fetchone() + assert row is not None, f"Iter {i}: fetchone returned None" + assert isinstance(row[0], int), f"Iter {i}: COUNT(*) not an int" + + elif pattern == 1: + # Multi-row result — exercises fetchall path + stress_cursor.execute( + "SELECT TOP 5 name, object_id FROM sys.objects ORDER BY object_id" + ) + rows = stress_cursor.fetchall() + assert len(rows) <= 5, f"Iter {i}: got {len(rows)} rows, expected ≤5" + assert all(r[0] is not None for r in rows), f"Iter {i}: NULL name in result" + + else: + # Scalar query — simplest fetch path + stress_cursor.execute("SELECT 1 AS n, 'hello' AS s") + row = stress_cursor.fetchone() + assert row[0] == 1, f"Iter {i}: scalar mismatch (got {row[0]})" + assert row[1] == "hello", f"Iter {i}: string mismatch (got {row[1]})" + + gc.collect() + final_rss_mb = process.memory_info().rss / (1024 * 1024) + mem_growth_mb = final_rss_mb - baseline_rss_mb + + # 50MB growth limit across 5,000 iterations is generous but detects real leaks + assert mem_growth_mb < 50, ( + f"Potential cursor memory leak: {mem_growth_mb:.1f}MB growth over {iterations} iterations" + ) + + print( + f"[OK] Cursor re-use stress: {iterations} iterations, " + f"memory delta {mem_growth_mb:+.1f}MB, all results correct" + ) + + except 
Exception as e: + pytest.fail(f"Cursor re-use stress failed: {e}") + finally: + stress_cursor.close() + + +@pytest.mark.stress +def test_fetchone_loop_vs_fetchall_parity(cursor, db_connection): + """ + Test #10: Verify fetchone() loop and fetchall() produce bit-identical results + for 100,000 rows. + + The two fetch paths have separate internal implementations. Any divergence — + wrong values, swapped columns, missing rows — indicates a bug in one of them. + This test surfaces such divergence at a scale where the bug would not be visible + in small unit tests. + """ + num_rows = 100000 + + try: + drop_table_if_exists(cursor, "#pytest_fetch_parity") + + cursor.execute(""" + CREATE TABLE #pytest_fetch_parity ( + id INT, + val NVARCHAR(20), + num INT + ) + """) + db_connection.commit() + + batch_size = 1000 + for start in range(0, num_rows, batch_size): + batch = [ + (i, f"V_{i}", i * 3) + for i in range(start, min(start + batch_size, num_rows)) + ] + cursor.executemany( + "INSERT INTO #pytest_fetch_parity VALUES (?, ?, ?)", batch + ) + db_connection.commit() + print(f"[OK] Inserted {num_rows} rows for parity test") + + # Path A: fetchone() loop + cursor.execute( + "SELECT id, val, num FROM #pytest_fetch_parity ORDER BY id" + ) + fetchone_rows: List[Tuple] = [] + while True: + row = cursor.fetchone() + if row is None: + break + fetchone_rows.append(row) + + assert len(fetchone_rows) == num_rows, ( + f"fetchone loop got {len(fetchone_rows)} rows, expected {num_rows}" + ) + print(f"[OK] fetchone loop: {len(fetchone_rows)} rows collected") + + # Path B: fetchall() + cursor.execute( + "SELECT id, val, num FROM #pytest_fetch_parity ORDER BY id" + ) + fetchall_rows = cursor.fetchall() + + assert len(fetchall_rows) == num_rows, ( + f"fetchall got {len(fetchall_rows)} rows, expected {num_rows}" + ) + print(f"[OK] fetchall: {len(fetchall_rows)} rows collected") + + # Row-by-row comparison + for i in range(num_rows): + fo = fetchone_rows[i] + fa = fetchall_rows[i] + assert 
fo[0] == fa[0] == i, ( + f"Row {i}: id mismatch (fetchone={fo[0]}, fetchall={fa[0]})" + ) + assert fo[1] == fa[1] == f"V_{i}", ( + f"Row {i}: val mismatch (fetchone={fo[1]!r}, fetchall={fa[1]!r})" + ) + assert fo[2] == fa[2] == i * 3, ( + f"Row {i}: num mismatch (fetchone={fo[2]}, fetchall={fa[2]})" + ) + + print( + f"[OK] fetchone/fetchall parity: {num_rows} rows identical across both fetch paths" + ) + + except Exception as e: + pytest.fail(f"fetchone vs fetchall parity test failed: {e}") + finally: + drop_table_if_exists(cursor, "#pytest_fetch_parity") + db_connection.commit() diff --git a/tests/test_020_multithreaded_stress.py b/tests/test_020_multithreaded_stress.py index fa9d7062..c4e0c076 100644 --- a/tests/test_020_multithreaded_stress.py +++ b/tests/test_020_multithreaded_stress.py @@ -3,10 +3,11 @@ These tests verify the driver's behavior under multi-threaded conditions: - Concurrent connections with 2, 5, 10, 50, 100 threads, and 2x CPU cores -- Connection pooling under stress +- Connection pooling under stress (including pool exhaustion recovery) - Thread safety of query execution - Memory and resource usage under load - Race condition detection +- Concurrent mixed read/write on a shared global temp table Tests are marked with @pytest.mark.stress and are designed to be run in a dedicated pipeline separate from regular CI tests. 
@@ -112,12 +113,14 @@ def __init__( query: str = "SELECT 1 as num, 'test' as str, GETDATE() as dt", verbose: bool = False, enable_pooling: bool = True, + pool_max_size: Optional[int] = None, timeout_seconds: int = 120, ): self.conn_str = conn_str self.query = query self.verbose = verbose self.enable_pooling = enable_pooling + self.pool_max_size = pool_max_size self.timeout_seconds = timeout_seconds self.stats_lock = threading.Lock() @@ -247,7 +250,8 @@ def run_parallel( # Configure pooling if self.enable_pooling: - mssql_python.pooling(enabled=True, max_size=max(100, num_threads * 2)) + effective_max_size = self.pool_max_size if self.pool_max_size is not None else max(100, num_threads * 2) + mssql_python.pooling(enabled=True, max_size=effective_max_size) else: mssql_python.pooling(enabled=False) @@ -726,16 +730,15 @@ def test_pool_exhaustion_recovery(stress_conn_str): Creates more threads than pool size to test queuing and recovery. """ - # Set small pool size - mssql_python.pooling(enabled=True, max_size=10) - num_threads = 50 # 5x the pool size iterations = 10 + pool_size = 10 runner = MultiThreadedQueryRunner( conn_str=stress_conn_str, - query="SELECT 1; WAITFOR DELAY '00:00:00.050'", # 50ms delay + query="WAITFOR DELAY '00:00:00.050'", # 50ms delay to hold connections enable_pooling=True, + pool_max_size=pool_size, # Explicitly cap the pool to force exhaustion timeout_seconds=180, ) @@ -749,7 +752,7 @@ def test_pool_exhaustion_recovery(stress_conn_str): ), f"Too many queries failed under pool exhaustion: {completion_rate*100:.1f}%" print( - f"[PASSED] Pool exhaustion test: {completion_rate*100:.1f}% completion with pool_size=10, threads=50" + f"[PASSED] Pool exhaustion test: {completion_rate*100:.1f}% completion with pool_size={pool_size}, threads={num_threads}" ) @@ -1043,3 +1046,186 @@ def test_comprehensive_thread_scaling(stress_conn_str, num_threads, iterations, f"[PASSED] {num_threads}T x {iterations}I, pooling={pooling}: " f"{result.throughput_qps:.1f} 
qps, {error_rate*100:.1f}% errors" ) + + +# ============================================================================ +# Concurrent Read/Write Stress Test +# ============================================================================ + + +@pytest.mark.stress +def test_concurrent_read_write_no_corruption(stress_conn_str): + """ + Test simultaneous INSERTs (writers) and SELECTs (readers) on a shared table. + + 5 writer threads continuously INSERT rows for 10 seconds. + 5 reader threads continuously execute SELECT COUNT(*) during that same window. + + Verifies: + - The driver does not crash under mixed concurrent load + - Readers never observe the row count decrease (no phantom rollback visible to readers) + - Every writer contributed at least one committed row (no silent write failure) + - No unexpected exceptions bubble out of driver internals + + Uses a global temp table (##) so all sessions can see the same data. + A unique hex suffix prevents collisions when tests run in parallel pipelines. 
+ """ + import uuid + + # Global temp table — visible across connections within the same SQL Server instance + table_name = f"##pytest_rw_{uuid.uuid4().hex[:12]}" + + run_duration = 10 # seconds + num_writers = 5 + num_readers = 5 + + stop_event = threading.Event() + errors: list = [] + errors_lock = threading.Lock() + writer_committed: dict = {} # writer_id -> committed row count + + # --- Setup: create the shared table on a dedicated connection --- + setup_conn = connect(stress_conn_str) + setup_cursor = setup_conn.cursor() + try: + setup_cursor.execute( + f""" + CREATE TABLE {table_name} ( + id INT IDENTITY(1, 1) PRIMARY KEY, + writer_id INT NOT NULL, + val INT NOT NULL + ) + """ + ) + setup_conn.commit() + except Exception as e: + setup_cursor.close() + setup_conn.close() + pytest.fail(f"Failed to create shared table: {e}") + + # --- Worker definitions --- + + def writer(writer_id: int): + conn = None + cur = None + count = 0 + try: + conn = connect(stress_conn_str) + cur = conn.cursor() + while not stop_event.is_set(): + cur.execute( + f"INSERT INTO {table_name} (writer_id, val) VALUES (?, ?)", + (writer_id, count), + ) + conn.commit() + count += 1 + time.sleep(0.01) # 10ms gap prevents thundering-herd on the IDENTITY page + except Exception as e: + with errors_lock: + errors.append(f"Writer {writer_id}: {e}") + finally: + if cur: + try: + cur.close() + except Exception: + pass + if conn: + try: + conn.close() + except Exception: + pass + writer_committed[writer_id] = count + + def reader(reader_id: int): + conn = None + cur = None + prev_count = 0 + try: + conn = connect(stress_conn_str) + cur = conn.cursor() + while not stop_event.is_set(): + # NOLOCK avoids blocking on uncommitted writer rows under READ COMMITTED + cur.execute(f"SELECT COUNT(*) FROM {table_name} WITH (NOLOCK)") + row = cur.fetchone() + if row is not None: + current_count = row[0] + # Under READ COMMITTED the count must never decrease + if current_count < prev_count: + with errors_lock: + 
errors.append( + f"Reader {reader_id}: COUNT went DOWN " + f"from {prev_count} to {current_count} — data anomaly" + ) + prev_count = current_count + except Exception as e: + with errors_lock: + errors.append(f"Reader {reader_id}: {e}") + finally: + if cur: + try: + cur.close() + except Exception: + pass + if conn: + try: + conn.close() + except Exception: + pass + + # --- Launch threads --- + all_threads = [] + for i in range(num_writers): + t = threading.Thread(target=writer, args=(i,), daemon=True, name=f"Writer-{i}") + all_threads.append(t) + t.start() + + for i in range(num_readers): + t = threading.Thread(target=reader, args=(i,), daemon=True, name=f"Reader-{i}") + all_threads.append(t) + t.start() + + # Run for the specified duration, then signal stop + time.sleep(run_duration) + stop_event.set() + + for t in all_threads: + t.join(timeout=30) + + # --- Final verification via setup connection --- + try: + setup_cursor.execute( + f"SELECT COUNT(*), COUNT(DISTINCT writer_id) FROM {table_name}" + ) + agg_row = setup_cursor.fetchone() + if agg_row is None: + pytest.fail("Aggregate query returned no row — shared table may have been dropped early") + total_rows = agg_row[0] + distinct_writers = agg_row[1] + finally: + setup_cursor.execute(f"DROP TABLE IF EXISTS {table_name}") + setup_conn.commit() + setup_cursor.close() + setup_conn.close() + + print( + f"\nConcurrent read/write results: {total_rows} total rows, " + f"{distinct_writers}/{num_writers} distinct writers active" + ) + for wid, cnt in sorted(writer_committed.items()): + print(f" Writer {wid}: {cnt} committed rows") + + # All data-anomaly and driver-crash errors must be zero + assert not errors, f"Concurrent read/write errors: {errors[:5]}" + + # Every writer must have committed at least one row in 15 seconds + assert distinct_writers == num_writers, ( + f"Expected {num_writers} distinct writers in final table, got {distinct_writers}" + ) + + # The table must contain rows (trivially true if writers 
 worked)
+    assert total_rows > 0, "No rows found in shared table after 10s of writer threads"
+
+    print(
+        f"[PASSED] Concurrent read/write: {total_rows} rows, "
+        f"count never decreased (NOLOCK), no driver crashes"
+    )