FIX: Fixed bugs in existing test suite and added additional stress tests#523
gargsaumya wants to merge 1 commit into main
Conversation
New tests:
- test_executemany_large_batch_mixed_types: 10k rows, mixed types via executemany
- test_null_heavy_large_result_set: 50k rows with 6/8 cols NULL, verify None propagation
- test_cursor_reuse_high_iteration: single cursor reused 5000 times, memory leak check
- test_fetchone_loop_vs_fetchall_parity: 100k row bit-identical comparison of both fetch paths
- test_concurrent_read_write_no_corruption: 5 writers + 5 readers on shared table for 10s

Bug fixes:
- test_concurrent_fetch_data_integrity_no_corruption: verification block was inside the thread.join() loop instead of after it (race condition + premature fail)
- test_pool_exhaustion_recovery: pool max_size was silently overridden by run_parallel(); added pool_max_size param to MultiThreadedQueryRunner; fixed multi-statement query
- test numbering: docstring sequence skipped #4 (renumbered #5-#7 -> #4-#6)

Also registers the 'slow' mark in pytest.ini to suppress PytestUnknownMarkWarning.
Pull request overview
Note
Copilot was unable to run its full agentic suite in this review.
This PR expands the stress test suite with additional high-scale single-threaded and multi-threaded scenarios, and fixes a few correctness/config issues in existing tests.
Changes:
- Added multiple new single-threaded stress tests (executemany mixed types, NULL-heavy results, cursor reuse, fetch parity).
- Improved multi-threaded stress runner configurability via an explicit pool_max_size parameter, and updated the pool exhaustion test accordingly.
- Added a new concurrent mixed read/write stress test and registered the slow pytest marker.
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 7 comments.
| File | Description |
|---|---|
| tests/test_020_multithreaded_stress.py | Adds pool_max_size to the runner, adjusts pool exhaustion test, and introduces a new concurrent read/write stress test. |
| tests/test_011_singlethreaded_stress.py | Fixes a race in existing concurrent fetch verification and adds several new large-scale stress tests. |
| pytest.ini | Registers the slow marker to avoid unknown-mark warnings. |
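For reference, registering markers in pytest.ini typically looks like the sketch below (the PR's exact file contents are not shown here; the marker descriptions are assumptions):

```ini
[pytest]
markers =
    stress: high-load stress tests
    slow: extra-slow tests excluded from quick runs
```

Any marker listed under `markers` is considered registered, so `@pytest.mark.slow` no longer triggers PytestUnknownMarkWarning.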
```python
prev_count = 0
try:
    conn = connect(stress_conn_str)
    cur = conn.cursor()
    while not stop_event.is_set():
        # NOLOCK avoids blocking on uncommitted writer rows under READ COMMITTED
        cur.execute(f"SELECT COUNT(*) FROM {table_name} WITH (NOLOCK)")
        row = cur.fetchone()
        if row is not None:
            current_count = row[0]
            # Under READ COMMITTED the count must never decrease
            if current_count < prev_count:
                with errors_lock:
                    errors.append(
                        f"Reader {reader_id}: COUNT went DOWN "
                        f"from {prev_count} to {current_count} — data anomaly"
                    )
            prev_count = current_count
```
The test asserts the COUNT(*) must never decrease “under READ COMMITTED”, but the query uses WITH (NOLOCK) (READ UNCOMMITTED), where non-monotonic counts are expected (dirty reads/allocation-order effects). This can make the test fail intermittently even when the driver is correct. Fix by either removing NOLOCK (and accepting/block-handling appropriately), or keeping NOLOCK and removing/relaxing the “count never decreases” invariant (e.g., only verify “no crashes/unexpected exceptions” and final row counts).
Suggested change:

```diff
-prev_count = 0
 try:
     conn = connect(stress_conn_str)
     cur = conn.cursor()
     while not stop_event.is_set():
-        # NOLOCK avoids blocking on uncommitted writer rows under READ COMMITTED
+        # NOLOCK avoids blocking during concurrent writes, but intermediate
+        # COUNT(*) results are allowed to fluctuate under READ UNCOMMITTED.
+        # This reader verifies that concurrent reads execute without errors.
         cur.execute(f"SELECT COUNT(*) FROM {table_name} WITH (NOLOCK)")
         row = cur.fetchone()
         if row is not None:
             current_count = row[0]
-            # Under READ COMMITTED the count must never decrease
-            if current_count < prev_count:
-                with errors_lock:
-                    errors.append(
-                        f"Reader {reader_id}: COUNT went DOWN "
-                        f"from {prev_count} to {current_count} — data anomaly"
-                    )
-            prev_count = current_count
```
```python
stop_event = threading.Event()
errors: list = []
errors_lock = threading.Lock()
writer_committed: dict = {}  # writer_id -> committed row count
```
writer_committed is mutated by multiple writer threads without synchronization. Concurrent dict writes are not thread-safe and can cause flaky runtime errors or corrupted state. Use a lock around updates (reuse errors_lock or add a dedicated lock), or use a pre-sized list (index by writer_id) to avoid concurrent dict mutations.
Suggested change:

```diff
+class ThreadSafeDict(dict):
+    def __init__(self):
+        super().__init__()
+        self._lock = threading.Lock()
+
+    def __setitem__(self, key, value):
+        with self._lock:
+            super().__setitem__(key, value)
+
+    def __getitem__(self, key):
+        with self._lock:
+            return super().__getitem__(key)
+
+    def get(self, key, default=None):
+        with self._lock:
+            return super().get(key, default)
+
+    def items(self):
+        with self._lock:
+            return list(super().items())
+
+    def values(self):
+        with self._lock:
+            return list(super().values())
+
+    def __contains__(self, key):
+        with self._lock:
+            return super().__contains__(key)
+
+    def copy(self):
+        with self._lock:
+            return dict(super().items())
+
 stop_event = threading.Event()
 errors: list = []
 errors_lock = threading.Lock()
-writer_committed: dict = {}  # writer_id -> committed row count
+writer_committed: dict = ThreadSafeDict()  # writer_id -> committed row count
```
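The comment's other alternative, a pre-sized list indexed by writer_id, avoids the locking entirely because each thread writes only its own slot. A minimal sketch of that idea (the `num_writers` count and the trivial writer body are assumptions for illustration, not the PR's actual code):

```python
import threading

num_writers = 5  # assumed writer count for illustration

# Pre-sized list: each writer owns exactly one slot, so no two threads
# ever touch the same index and no lock is needed for these updates.
writer_committed = [0] * num_writers

def writer(writer_id: int, rows: int) -> None:
    committed = 0
    for _ in range(rows):
        committed += 1  # stand-in for a successful INSERT + commit
    writer_committed[writer_id] = committed  # sole writer of this slot

threads = [threading.Thread(target=writer, args=(i, 100)) for i in range(num_writers)]
for t in threads:
    t.start()
for t in threads:
    t.join()

total_committed = sum(writer_committed)
```

The trade-off is that writer ids must be dense integers known up front, which they are in these stress tests.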
```python
    conn.close()
except Exception:
    pass
writer_committed[writer_id] = count
```
writer_committed is mutated by multiple writer threads without synchronization. Concurrent dict writes are not thread-safe and can cause flaky runtime errors or corrupted state. Use a lock around updates (reuse errors_lock or add a dedicated lock), or use a pre-sized list (index by writer_id) to avoid concurrent dict mutations.
Suggested change:

```diff
-writer_committed[writer_id] = count
+with errors_lock:
+    writer_committed[writer_id] = count
```
```python
for t in all_threads:
    t.join(timeout=30)
```
Threads are joined with a timeout, but the test proceeds to verification and drops the table without checking whether any threads are still alive. If a writer/reader thread is still running, the DROP can fail or the thread can start erroring mid-flight, making the test flaky and potentially leaking connections. After the join loop, explicitly verify all threads exited (e.g., collect is_alive() and fail/extend waiting), and consider not using daemon=True for these threads so the test reliably enforces cleanup.
```python
setup_cursor.execute(f"DROP TABLE IF EXISTS {table_name}")
setup_conn.commit()
setup_cursor.close()
setup_conn.close()
```
```diff
 runner = MultiThreadedQueryRunner(
     conn_str=stress_conn_str,
-    query="SELECT 1; WAITFOR DELAY '00:00:00.050'",  # 50ms delay
+    query="WAITFOR DELAY '00:00:00.050'",  # 50ms delay to hold connections
```
MultiThreadedQueryRunner.execute_single_query() reads results by iterating the cursor; depending on the DB-API/driver behavior, executing a statement with no result set (like WAITFOR) can raise when fetching/iterating. To avoid driver-dependent failures, either (a) make the query return a row (e.g., SELECT 1; WAITFOR ...) and/or (b) update the runner to only iterate when a result set exists (e.g., guard on cursor.description).
Suggested change:

```diff
-query="WAITFOR DELAY '00:00:00.050'",  # 50ms delay to hold connections
+query="SELECT 1; WAITFOR DELAY '00:00:00.050'",  # Return a row, then hold connections for 50ms
```
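Option (b) from the comment, guarding on cursor.description, can be sketched as below. Per DB-API 2.0, `description` is `None` when a statement produced no result set, so the guard sidesteps driver-dependent fetch errors. This uses sqlite3 as a stand-in driver; the runner's real helper name and connection come from the project under test:

```python
import sqlite3

def execute_single_query(cursor, query: str) -> list:
    """Run a query and return its rows, or [] when the statement
    produced no result set (cursor.description is None)."""
    cursor.execute(query)
    # DB-API 2.0: description is None for statements without a result set,
    # so only iterate the cursor when there is actually something to fetch.
    if cursor.description is None:
        return []
    return list(cursor)

conn = sqlite3.connect(":memory:")
cur = conn.cursor()
rows = execute_single_query(cur, "SELECT 1")                    # has a result set
no_rows = execute_single_query(cur, "CREATE TABLE t (x INTEGER)")  # no result set
```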
```diff
 # Configure pooling
 if self.enable_pooling:
-    mssql_python.pooling(enabled=True, max_size=max(100, num_threads * 2))
+    effective_max_size = self.pool_max_size if self.pool_max_size is not None else max(100, num_threads * 2)
```
The conditional assignment for effective_max_size is quite long and harder to scan; it’s also likely to exceed typical formatter limits in Python. Consider wrapping it across multiple lines (or using an explicit if/else block) to keep the pooling configuration section easier to read and maintain.
Suggested change:

```diff
-effective_max_size = self.pool_max_size if self.pool_max_size is not None else max(100, num_threads * 2)
+if self.pool_max_size is not None:
+    effective_max_size = self.pool_max_size
+else:
+    effective_max_size = max(100, num_threads * 2)
```
Work Item / Issue Reference
Summary
This pull request adds several new high-scale stress tests and improves the configurability and coverage of both single-threaded and multi-threaded stress testing. The main updates include three new single-threaded stress tests for executemany with mixed types, NULL-heavy result sets, and high-iteration cursor re-use, plus a fetchone vs fetchall parity test. Multi-threaded stress tests now support explicit pool size configuration and improved pool exhaustion testing. Test documentation and marker registration have also been updated.
Single-threaded stress test enhancements:
- `test_executemany_large_batch_mixed_types` to stress-test `executemany()` with 10,000 mixed-type rows, verifying parameter serialization and correctness.
- `test_null_heavy_large_result_set` to fetch 50,000 rows with 6/8 columns as NULL, ensuring correct NULL handling at scale.
- `test_cursor_reuse_high_iteration` to re-use a single cursor for 5,000 execute/fetch cycles, checking for memory leaks and state reset.
- `test_fetchone_loop_vs_fetchall_parity` to verify bit-identical results between `fetchone()` and `fetchall()` for 100,000 rows.

Multi-threaded stress test improvements:

- `MultiThreadedQueryRunner` now accepts a `pool_max_size` parameter, allowing explicit control of the connection pool size in tests. [1] [2]
- The pool exhaustion test (`test_pool_exhaustion_recovery`) now uses the explicit pool size parameter and improved query logic for more accurate stress simulation and reporting. [1] [2]

Test marker and config updates:

- Registered the `slow` pytest marker for extra-slow tests, complementing the existing `stress` marker.

These changes expand test coverage for edge-case and high-load scenarios, improve test documentation, and make stress tests more configurable and robust.
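The fetch-parity idea can be sketched as follows, using sqlite3 as a stand-in driver and a small row count; the real test runs 100,000 rows against SQL Server through the driver under test:

```python
import sqlite3

def fetch_parity(cursor, query: str) -> bool:
    """Fetch the same query via a fetchone() loop and via fetchall(),
    and report whether the two paths return identical rows."""
    cursor.execute(query)
    rows_one = []
    while True:
        row = cursor.fetchone()
        if row is None:
            break
        rows_one.append(row)
    cursor.execute(query)
    rows_all = cursor.fetchall()
    return rows_one == rows_all

conn = sqlite3.connect(":memory:")
cur = conn.cursor()
cur.execute("CREATE TABLE t (id INTEGER, val TEXT)")
cur.executemany("INSERT INTO t VALUES (?, ?)", [(i, f"v{i}") for i in range(1000)])
parity = fetch_parity(cur, "SELECT id, val FROM t ORDER BY id")
```

Note the ORDER BY: without it, two executions of the same query are not guaranteed to return rows in the same order, which would make the comparison flaky rather than a real parity check.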