From 8786964450fd28c6eb9e1ace1398f5a0c93763e7 Mon Sep 17 00:00:00 2001 From: Jeffrey 'Alex' Clark Date: Thu, 23 Apr 2026 17:33:55 -0400 Subject: [PATCH 01/11] PYTHON-5784 Increase code coverage for periodic_executor.py from 79.25% to >=80% --- test/test_periodic_executor.py | 424 +++++++++++++++++++++++++++++++++ 1 file changed, 424 insertions(+) create mode 100644 test/test_periodic_executor.py diff --git a/test/test_periodic_executor.py b/test/test_periodic_executor.py new file mode 100644 index 0000000000..00352479c3 --- /dev/null +++ b/test/test_periodic_executor.py @@ -0,0 +1,424 @@ +# Copyright 2026-present MongoDB, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Unit tests for periodic_executor.py.""" + +from __future__ import annotations + +import asyncio +import sys +import threading +import time +import weakref + +sys.path[0:0] = [""] + +from test import unittest + +import pymongo.periodic_executor as pe_module +from pymongo.periodic_executor import ( + AsyncPeriodicExecutor, + PeriodicExecutor, + _register_executor, + _shutdown_executors, +) + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _make_sync(interval=30.0, min_interval=0.01, target=None, name="test"): + if target is None: + + def target(): + return True + + return PeriodicExecutor(interval=interval, min_interval=min_interval, target=target, name=name) + + +def _make_async(interval=30.0, min_interval=0.01, target=None, name="test"): + async def _default_target(): + return True + + if target is None: + target = _default_target + return AsyncPeriodicExecutor( + interval=interval, min_interval=min_interval, target=target, name=name + ) + + +# --------------------------------------------------------------------------- +# PeriodicExecutor (sync / threading) +# --------------------------------------------------------------------------- + + +class TestPeriodicExecutorRepr(unittest.TestCase): + def test_repr_contains_class_and_name(self): + ex = _make_sync(name="myexec") + r = repr(ex) + self.assertIn("PeriodicExecutor", r) + self.assertIn("myexec", r) + + +class TestPeriodicExecutorLifecycle(unittest.TestCase): + def test_open_starts_thread(self): + ex = _make_sync() + ex.open() + try: + self.assertIsNotNone(ex._thread) + # Give thread a moment to start. + time.sleep(0.05) + self.assertTrue(ex._thread.is_alive()) + finally: + ex.close() + ex.join(timeout=2) + + def test_multiple_open_calls_have_no_effect(self): + ex = _make_sync() + ex.open() + thread_id = id(ex._thread) + ex.open() + try: + self.assertEqual(thread_id, id(ex._thread)) + finally: + ex.close() + ex.join(timeout=2) + + def test_close_sets_stopped(self): + ex = _make_sync() + ex.open() + ex.close() + self.assertTrue(ex._stopped) + ex.join(timeout=2) + + def test_join_without_open_is_safe(self): + ex = _make_sync() + ex.join(timeout=0.01) # should not raise + + def test_wake_sets_event(self): + ex = _make_sync() + self.assertFalse(ex._event) + ex.wake() + self.assertTrue(ex._event) + + def test_update_interval(self): + ex = _make_sync(interval=30.0) + ex.update_interval(60) + self.assertEqual(ex._interval, 60) + + def test_skip_sleep(self): + ex = _make_sync() + self.assertFalse(ex._skip_sleep) + ex.skip_sleep() + self.assertTrue(ex._skip_sleep) + + +class TestPeriodicExecutorTarget(unittest.TestCase): + def test_target_returning_false_stops_executor(self): + ran = threading.Event() + + def target(): + ran.set() + return False # Signal stop. + + ex = _make_sync(target=target) + ex.open() + self.assertTrue(ran.wait(timeout=2), "target never ran") + ex.join(timeout=2) + self.assertTrue(ex._stopped) + + def test_target_exception_stops_executor(self): + ran = threading.Event() + captured_exc = [] + orig_excepthook = threading.excepthook + + def _capture_excepthook(args): + captured_exc.append(args.exc_value) + + threading.excepthook = _capture_excepthook + try: + + def target(): + ran.set() + raise RuntimeError("boom") + + ex = _make_sync(target=target) + ex.open() + self.assertTrue(ran.wait(timeout=2), "target never ran") + ex.join(timeout=2) + finally: + threading.excepthook = orig_excepthook + + self.assertTrue(ex._stopped) + self.assertEqual(len(captured_exc), 1) + self.assertIsInstance(captured_exc[0], RuntimeError) + + def test_skip_sleep_flag_skips_interval(self): + call_times = [] + + def target(): + call_times.append(time.monotonic()) + if len(call_times) >= 2: + return False + return True + + ex = _make_sync(interval=30.0, min_interval=0.001, target=target) + ex.skip_sleep() + ex.open() + ex.join(timeout=2) + # First call should have skipped the 30s sleep. + self.assertGreaterEqual(len(call_times), 2) + self.assertLess(call_times[1] - call_times[0], 5.0) + + def test_wake_causes_early_run(self): + call_count = [0] + woken = threading.Event() + + def target(): + call_count[0] += 1 + if call_count[0] == 1: + woken.set() + if call_count[0] >= 2: + return False + return True + + ex = _make_sync(interval=30.0, min_interval=0.01, target=target) + ex.open() + woken.wait(timeout=2) + ex.wake() + ex.join(timeout=3) + self.assertGreaterEqual(call_count[0], 2) + + +class TestShouldStop(unittest.TestCase): + def test_returns_false_when_not_stopped(self): + ex = _make_sync() + self.assertFalse(ex._should_stop()) + self.assertFalse(ex._thread_will_exit) + + def test_returns_true_and_sets_thread_will_exit(self): + ex = _make_sync() + ex._stopped = True + self.assertTrue(ex._should_stop()) + self.assertTrue(ex._thread_will_exit) + + +class TestPeriodicExecutorOpenAfterExit(unittest.TestCase): + def test_reopen_after_target_returns_false(self): + called = [0] + + def target(): + called[0] += 1 + return False + + ex = _make_sync(target=target) + ex.open() + ex.join(timeout=2) + self.assertTrue(ex._stopped) + # Re-open should start a new thread. + ex.open() + ex.join(timeout=2) + self.assertGreaterEqual(called[0], 2) + + +# --------------------------------------------------------------------------- +# Module-level: _register_executor, _on_executor_deleted, _shutdown_executors +# --------------------------------------------------------------------------- + + +class TestRegisterExecutor(unittest.TestCase): + def setUp(self): + self._orig = set(pe_module._EXECUTORS) + + def tearDown(self): + pe_module._EXECUTORS.clear() + pe_module._EXECUTORS.update(self._orig) + + def test_register_adds_weakref(self): + ex = _make_sync() + before = len(pe_module._EXECUTORS) + _register_executor(ex) + self.assertEqual(len(pe_module._EXECUTORS), before + 1) + # When executor is GC'd the ref is cleaned up. + ref_count_before = len(pe_module._EXECUTORS) + del ex + self.assertLessEqual(len(pe_module._EXECUTORS), ref_count_before) + + def test_shutdown_executors_stops_running_executors(self): + stopped = threading.Event() + + def target(): + stopped.wait(timeout=5) + return True + + ex = _make_sync(target=target) + ex.open() + time.sleep(0.05) + _register_executor(ex) + _shutdown_executors() + stopped.set() + ex.join(timeout=2) + self.assertTrue(ex._stopped) + + def test_shutdown_executors_safe_when_empty(self): + pe_module._EXECUTORS.clear() + _shutdown_executors() # Should not raise. + + +# --------------------------------------------------------------------------- +# AsyncPeriodicExecutor +# --------------------------------------------------------------------------- + + +class TestAsyncPeriodicExecutorRepr(unittest.TestCase): + def test_repr_contains_class_and_name(self): + ex = _make_async(name="asyncexec") + r = repr(ex) + self.assertIn("AsyncPeriodicExecutor", r) + self.assertIn("asyncexec", r) + + +class TestAsyncPeriodicExecutorBasic(unittest.TestCase): + def test_wake_sets_event(self): + ex = _make_async() + ex.wake() + self.assertTrue(ex._event) + + def test_update_interval(self): + ex = _make_async(interval=30.0) + ex.update_interval(60) + self.assertEqual(ex._interval, 60) + + def test_skip_sleep(self): + ex = _make_async() + ex.skip_sleep() + self.assertTrue(ex._skip_sleep) + + +class TestAsyncPeriodicExecutorLifecycle(unittest.TestCase): + def test_open_creates_task(self): + async def run(): + ex = _make_async() + ex.open() + self.assertIsNotNone(ex._task) + ex.close() + await ex.join(timeout=1) + + asyncio.run(run()) + + def test_close_cancels_task(self): + async def run(): + ex = _make_async() + ex.open() + ex.close() + await ex.join(timeout=1) + self.assertTrue(ex._stopped) + + asyncio.run(run()) + + def test_join_without_open_is_safe(self): + async def run(): + ex = _make_async() + await ex.join(timeout=0.01) # Should not raise. + + asyncio.run(run()) + + def test_multiple_open_calls_have_no_effect(self): + async def run(): + ex = _make_async() + ex.open() + task_id = id(ex._task) + ex.open() # Second open: same task still running. + self.assertEqual(task_id, id(ex._task)) + ex.close() + await ex.join(timeout=1) + + asyncio.run(run()) + + +class TestAsyncPeriodicExecutorTarget(unittest.TestCase): + def test_target_returning_false_stops_executor(self): + async def run(): + ran = asyncio.Event() + + async def target(): + ran.set() + return False + + ex = _make_async(target=target) + ex.open() + await asyncio.wait_for(ran.wait(), timeout=2) + await ex.join(timeout=2) + self.assertTrue(ex._stopped) + + asyncio.run(run()) + + def test_target_exception_stops_executor(self): + async def run(): + ran = asyncio.Event() + + async def target(): + ran.set() + raise RuntimeError("async boom") + + ex = _make_async(target=target) + ex.open() + await asyncio.wait_for(ran.wait(), timeout=2) + await ex.join(timeout=2) + self.assertTrue(ex._stopped) + + asyncio.run(run()) + + def test_skip_sleep_flag_skips_interval(self): + async def run(): + call_times = [] + + async def target(): + call_times.append(asyncio.get_event_loop().time()) + if len(call_times) >= 2: + return False + return True + + ex = _make_async(interval=30.0, min_interval=0.001, target=target) + ex.skip_sleep() + ex.open() + await ex.join(timeout=3) + self.assertGreaterEqual(len(call_times), 2) + self.assertLess(call_times[1] - call_times[0], 5.0) + + asyncio.run(run()) + + def test_open_after_target_returns_false_creates_new_task(self): + async def run(): + call_count = [0] + + async def target(): + call_count[0] += 1 + return False + + ex = _make_async(target=target) + ex.open() + await ex.join(timeout=2) + first_task = ex._task + ex.open() + await ex.join(timeout=2) + self.assertGreaterEqual(call_count[0], 2) + self.assertIsNot(ex._task, first_task) + + asyncio.run(run()) + + +if __name__ == "__main__": + unittest.main() From 290c99bc982b65f9188de23708520f4a6fc0342f Mon Sep 17 00:00:00 2001 From: Jeffrey 'Alex' Clark Date: Thu, 23 Apr 2026 18:54:49 -0400 Subject: [PATCH 02/11] PYTHON-5784 Use _run() helper for async tests to match test_network_layer.py style --- test/test_periodic_executor.py | 36 +++++++++++++++++++--------------- 1 file changed, 20 insertions(+), 16 deletions(-) diff --git a/test/test_periodic_executor.py b/test/test_periodic_executor.py index 00352479c3..7b9bdb2649 100644 --- a/test/test_periodic_executor.py +++ b/test/test_periodic_executor.py @@ -59,6 +59,10 @@ async def _default_target(): ) +def _run(coro): + return asyncio.run(coro) + + # --------------------------------------------------------------------------- # PeriodicExecutor (sync / threading) # --------------------------------------------------------------------------- @@ -309,34 +313,34 @@ def test_skip_sleep(self): class TestAsyncPeriodicExecutorLifecycle(unittest.TestCase): def test_open_creates_task(self): - async def run(): + async def _test(): ex = _make_async() ex.open() self.assertIsNotNone(ex._task) ex.close() await ex.join(timeout=1) - asyncio.run(run()) + _run(_test()) def test_close_cancels_task(self): - async def run(): + async def _test(): ex = _make_async() ex.open() ex.close() await ex.join(timeout=1) self.assertTrue(ex._stopped) - asyncio.run(run()) + _run(_test()) def test_join_without_open_is_safe(self): - async def run(): + async def _test(): ex = _make_async() await ex.join(timeout=0.01) # Should not raise. - asyncio.run(run()) + _run(_test()) def test_multiple_open_calls_have_no_effect(self): - async def run(): + async def _test(): ex = _make_async() ex.open() task_id = id(ex._task) @@ -345,12 +349,12 @@ async def run(): ex.close() await ex.join(timeout=1) - asyncio.run(run()) + _run(_test()) class TestAsyncPeriodicExecutorTarget(unittest.TestCase): def test_target_returning_false_stops_executor(self): - async def run(): + async def _test(): ran = asyncio.Event() async def target(): @@ -363,10 +367,10 @@ async def target(): await ex.join(timeout=2) self.assertTrue(ex._stopped) - asyncio.run(run()) + _run(_test()) def test_target_exception_stops_executor(self): - async def run(): + async def _test(): ran = asyncio.Event() async def target(): @@ -379,10 +383,10 @@ async def target(): await ex.join(timeout=2) self.assertTrue(ex._stopped) - asyncio.run(run()) + _run(_test()) def test_skip_sleep_flag_skips_interval(self): - async def run(): + async def _test(): call_times = [] async def target(): @@ -398,10 +402,10 @@ async def target(): self.assertGreaterEqual(len(call_times), 2) self.assertLess(call_times[1] - call_times[0], 5.0) - asyncio.run(run()) + _run(_test()) def test_open_after_target_returns_false_creates_new_task(self): - async def run(): + async def _test(): call_count = [0] async def target(): @@ -417,7 +421,7 @@ async def target(): self.assertGreaterEqual(call_count[0], 2) self.assertIsNot(ex._task, first_task) - asyncio.run(run()) + _run(_test()) if __name__ == "__main__": From aad418b53df255ee3e4766d59306649e532006e5 Mon Sep 17 00:00:00 2001 From: Jeffrey 'Alex' Clark Date: Thu, 23 Apr 2026 18:56:45 -0400 Subject: [PATCH 03/11] PYTHON-5784 Spell out 'coro' parameter as 'coroutine' in _run helper --- test/test_periodic_executor.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/test_periodic_executor.py b/test/test_periodic_executor.py index 7b9bdb2649..b7fc2d2ab9 100644 --- a/test/test_periodic_executor.py +++ b/test/test_periodic_executor.py @@ -59,8 +59,8 @@ async def _default_target(): ) -def _run(coro): - return asyncio.run(coro) +def _run(coroutine): + return asyncio.run(coroutine) # --------------------------------------------------------------------------- From 613e815e99f72cbc8ea2c2581e39fb3bbd968227 Mon Sep 17 00:00:00 2001 From: Jeffrey 'Alex' Clark Date: Thu, 23 Apr 2026 19:41:01 -0400 Subject: [PATCH 04/11] PYTHON-5784 Address Copilot feedback: remove unused import, fix deprecated API, gc-safe weakref assertion, non-blocking shutdown test, retrieve task exception --- test/test_periodic_executor.py | 25 +++++++++++-------------- 1 file changed, 11 insertions(+), 14 deletions(-) diff --git a/test/test_periodic_executor.py b/test/test_periodic_executor.py index b7fc2d2ab9..636c492076 100644 --- a/test/test_periodic_executor.py +++ b/test/test_periodic_executor.py @@ -17,10 +17,10 @@ from __future__ import annotations import asyncio +import gc import sys import threading import time -import weakref sys.path[0:0] = [""] @@ -255,24 +255,18 @@ def test_register_adds_weakref(self): before = len(pe_module._EXECUTORS) _register_executor(ex) self.assertEqual(len(pe_module._EXECUTORS), before + 1) - # When executor is GC'd the ref is cleaned up. - ref_count_before = len(pe_module._EXECUTORS) + # Find the specific weakref we just registered. + ref = next(r for r in pe_module._EXECUTORS if r() is ex) del ex - self.assertLessEqual(len(pe_module._EXECUTORS), ref_count_before) + gc.collect() + # The weakref callback must have removed our specific ref. + self.assertNotIn(ref, pe_module._EXECUTORS) def test_shutdown_executors_stops_running_executors(self): - stopped = threading.Event() - - def target(): - stopped.wait(timeout=5) - return True - - ex = _make_sync(target=target) + ex = _make_sync(interval=30.0) ex.open() time.sleep(0.05) - _register_executor(ex) _shutdown_executors() - stopped.set() ex.join(timeout=2) self.assertTrue(ex._stopped) @@ -382,6 +376,9 @@ async def target(): await asyncio.wait_for(ran.wait(), timeout=2) await ex.join(timeout=2) self.assertTrue(ex._stopped) + # Retrieve the task exception to avoid "Task exception was never retrieved". + if ex._task is not None and ex._task.done(): + ex._task.exception() _run(_test()) @@ -390,7 +387,7 @@ async def _test(): call_times = [] async def target(): - call_times.append(asyncio.get_event_loop().time()) + call_times.append(asyncio.get_running_loop().time()) if len(call_times) >= 2: return False return True From b646e43dbc44da0c51d8decaba7c25577bed7409 Mon Sep 17 00:00:00 2001 From: Jeffrey 'Alex' Clark Date: Fri, 24 Apr 2026 14:18:25 -0400 Subject: [PATCH 05/11] PYTHON-5784 Address Noah's review: move async tests to test/asynchronous/ via synchro - Create test/asynchronous/test_periodic_executor.py as the single source of truth for all periodic executor tests, using AsyncUnitTest with asyncSetUp/ asyncTearDown base class for executor lifecycle management - Register test_periodic_executor.py in synchro's converted_tests so the sync variant is auto-generated - Replace the manually-maintained test/test_periodic_executor.py with the synchro-generated equivalent, eliminating duplicated async/sync test code - Use _IS_SYNC branching for the small number of tests that differ between threading (PeriodicExecutor) and asyncio (AsyncPeriodicExecutor) behavior --- test/asynchronous/test_periodic_executor.py | 290 ++++++++++++ test/test_periodic_executor.py | 469 +++++++------------- tools/synchro.py | 1 + 3 files changed, 457 insertions(+), 303 deletions(-) create mode 100644 test/asynchronous/test_periodic_executor.py diff --git a/test/asynchronous/test_periodic_executor.py b/test/asynchronous/test_periodic_executor.py new file mode 100644 index 0000000000..15f75b0f47 --- /dev/null +++ b/test/asynchronous/test_periodic_executor.py @@ -0,0 +1,290 @@ +# Copyright 2026-present MongoDB, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Unit tests for periodic_executor.py.""" + +from __future__ import annotations + +import asyncio +import gc +import sys +import threading +import time + +sys.path[0:0] = [""] + +from test.asynchronous import AsyncUnitTest, unittest + +import pymongo.periodic_executor as pe_module +from pymongo.periodic_executor import ( + AsyncPeriodicExecutor, + _register_executor, + _shutdown_executors, +) + +_IS_SYNC = False + + +def _make_executor(interval=30.0, min_interval=0.01, target=None, name="test"): + if target is None: + + async def target(): + return True + + return AsyncPeriodicExecutor( + interval=interval, min_interval=min_interval, target=target, name=name + ) + + +class _AsyncPeriodicExecutorTestBase(AsyncUnitTest): + async def asyncSetUp(self): + self.ex = _make_executor() + + async def asyncTearDown(self): + self.ex.close() + await self.ex.join(timeout=2) + + +class TestAsyncPeriodicExecutorRepr(AsyncUnitTest): + async def test_repr_contains_class_and_name(self): + ex = _make_executor(name="exec") + r = repr(ex) + self.assertIn("AsyncPeriodicExecutor", r) + self.assertIn("exec", r) + + +class TestAsyncPeriodicExecutorBasic(_AsyncPeriodicExecutorTestBase): + async def test_wake_sets_event(self): + self.assertFalse(self.ex._event) + self.ex.wake() + self.assertTrue(self.ex._event) + + async def test_update_interval(self): + self.ex.update_interval(60) + self.assertEqual(self.ex._interval, 60) + + async def test_skip_sleep(self): + self.assertFalse(self.ex._skip_sleep) + self.ex.skip_sleep() + self.assertTrue(self.ex._skip_sleep) + + +class TestAsyncPeriodicExecutorLifecycle(_AsyncPeriodicExecutorTestBase): + async def test_open_starts_worker(self): + self.ex.open() + if _IS_SYNC: + self.assertIsNotNone(self.ex._thread) + self.assertTrue(self.ex._thread.is_alive()) + else: + self.assertIsNotNone(self.ex._task) + + async def test_close_sets_stopped(self): + self.ex.open() + self.ex.close() + self.assertTrue(self.ex._stopped) + await self.ex.join(timeout=1) + + async def test_join_without_open_is_safe(self): + await self.ex.join(timeout=0.01) + + async def test_multiple_open_calls_have_no_effect(self): + self.ex.open() + if _IS_SYNC: + worker_id = id(self.ex._thread) + else: + worker_id = id(self.ex._task) + self.ex.open() + if _IS_SYNC: + self.assertEqual(worker_id, id(self.ex._thread)) + else: + self.assertEqual(worker_id, id(self.ex._task)) + + +class TestAsyncPeriodicExecutorTarget(_AsyncPeriodicExecutorTestBase): + async def test_target_returning_false_stops_executor(self): + if _IS_SYNC: + ran = threading.Event() + else: + ran = asyncio.Event() + + async def target(): + ran.set() + return False + + self.ex = _make_executor(target=target) + self.ex.open() + if _IS_SYNC: + self.assertTrue(ran.wait(timeout=2), "target never ran") + else: + await asyncio.wait_for(ran.wait(), timeout=2) + await self.ex.join(timeout=2) + self.assertTrue(self.ex._stopped) + + async def test_target_exception_stops_executor(self): + if _IS_SYNC: + ran = threading.Event() + captured_exc: list = [] + orig_excepthook = threading.excepthook + + def _capture_excepthook(args): + captured_exc.append(args.exc_value) + + threading.excepthook = _capture_excepthook + try: + + def target(): + ran.set() + raise RuntimeError("boom") + + self.ex = _make_executor(target=target) + self.ex.open() + self.assertTrue(ran.wait(timeout=2), "target never ran") + self.ex.join(timeout=2) + finally: + threading.excepthook = orig_excepthook + self.assertTrue(self.ex._stopped) + self.assertEqual(len(captured_exc), 1) + self.assertIsInstance(captured_exc[0], RuntimeError) + else: + ran = asyncio.Event() + + async def target(): + ran.set() + raise RuntimeError("async boom") + + self.ex = _make_executor(target=target) + self.ex.open() + await asyncio.wait_for(ran.wait(), timeout=2) + await self.ex.join(timeout=2) + self.assertTrue(self.ex._stopped) + if self.ex._task is not None and self.ex._task.done(): + self.ex._task.exception() + + async def test_skip_sleep_flag_skips_interval(self): + call_times = [] + + async def target(): + call_times.append(time.monotonic() if _IS_SYNC else asyncio.get_running_loop().time()) + if len(call_times) >= 2: + return False + return True + + self.ex = _make_executor(interval=30.0, min_interval=0.001, target=target) + self.ex.skip_sleep() + self.ex.open() + await self.ex.join(timeout=3) + self.assertGreaterEqual(len(call_times), 2) + self.assertLess(call_times[1] - call_times[0], 5.0) + + async def test_wake_causes_early_run(self): + call_count = [0] + if _IS_SYNC: + woken = threading.Event() + else: + woken = asyncio.Event() + + async def target(): + call_count[0] += 1 + if call_count[0] == 1: + woken.set() + if call_count[0] >= 2: + return False + return True + + self.ex = _make_executor(interval=30.0, min_interval=0.01, target=target) + self.ex.open() + if _IS_SYNC: + woken.wait(timeout=2) + else: + await asyncio.wait_for(woken.wait(), timeout=2) + self.ex.wake() + await self.ex.join(timeout=3) + self.assertGreaterEqual(call_count[0], 2) + + async def test_open_after_target_returns_false(self): + called = [0] + + async def target(): + called[0] += 1 + return False + + self.ex = _make_executor(target=target) + self.ex.open() + await self.ex.join(timeout=2) + self.assertTrue(self.ex._stopped) + if not _IS_SYNC: + first_task = self.ex._task + self.ex.open() + await self.ex.join(timeout=2) + self.assertGreaterEqual(called[0], 2) + if not _IS_SYNC: + self.assertIsNot(self.ex._task, first_task) + + +class TestShouldStop(AsyncUnitTest): + if _IS_SYNC: + + def test_returns_false_when_not_stopped(self): + ex = _make_executor() + self.assertFalse(ex._should_stop()) + self.assertFalse(ex._thread_will_exit) + + def test_returns_true_and_sets_thread_will_exit(self): + ex = _make_executor() + ex._stopped = True + self.assertTrue(ex._should_stop()) + self.assertTrue(ex._thread_will_exit) + + +class TestRegisterExecutor(AsyncUnitTest): + if _IS_SYNC: + + def setUp(self): + self._orig = set(pe_module._EXECUTORS) + + def tearDown(self): + pe_module._EXECUTORS.clear() + pe_module._EXECUTORS.update(self._orig) + + def test_register_adds_weakref(self): + ex = _make_executor() + before = len(pe_module._EXECUTORS) + _register_executor(ex) + self.assertEqual(len(pe_module._EXECUTORS), before + 1) + ref = next(r for r in pe_module._EXECUTORS if r() is ex) + del ex + gc.collect() + self.assertNotIn(ref, pe_module._EXECUTORS) + + def test_shutdown_executors_stops_running_executors(self): + ran = threading.Event() + + def target(): + ran.set() + return True + + ex = _make_executor(target=target) + ex.open() + self.assertTrue(ran.wait(timeout=2), "target never ran") + _shutdown_executors() + ex.join(timeout=2) + self.assertTrue(ex._stopped) + + def test_shutdown_executors_safe_when_empty(self): + pe_module._EXECUTORS.clear() + _shutdown_executors() + + +if __name__ == "__main__": + unittest.main() diff --git a/test/test_periodic_executor.py b/test/test_periodic_executor.py index 636c492076..f07ded1fde 100644 --- a/test/test_periodic_executor.py +++ b/test/test_periodic_executor.py @@ -24,22 +24,19 @@ sys.path[0:0] = [""] -from test import unittest +from test import UnitTest, unittest import pymongo.periodic_executor as pe_module from pymongo.periodic_executor import ( - AsyncPeriodicExecutor, PeriodicExecutor, _register_executor, _shutdown_executors, ) -# --------------------------------------------------------------------------- -# Helpers -# --------------------------------------------------------------------------- +_IS_SYNC = True -def _make_sync(interval=30.0, min_interval=0.01, target=None, name="test"): +def _make_executor(interval=30.0, min_interval=0.01, target=None, name="test"): if target is None: def target(): @@ -48,147 +45,152 @@ def target(): return PeriodicExecutor(interval=interval, min_interval=min_interval, target=target, name=name) -def _make_async(interval=30.0, min_interval=0.01, target=None, name="test"): - async def _default_target(): - return True - - if target is None: - target = _default_target - return AsyncPeriodicExecutor( - interval=interval, min_interval=min_interval, target=target, name=name - ) - - -def _run(coroutine): - return asyncio.run(coroutine) - +class _PeriodicExecutorTestBase(UnitTest): + def setUp(self): + self.ex = _make_executor() -# --------------------------------------------------------------------------- -# PeriodicExecutor (sync / threading) -# --------------------------------------------------------------------------- + def tearDown(self): + self.ex.close() + self.ex.join(timeout=2) -class TestPeriodicExecutorRepr(unittest.TestCase): +class TestPeriodicExecutorRepr(UnitTest): def test_repr_contains_class_and_name(self): - ex = _make_sync(name="myexec") + ex = _make_executor(name="exec") r = repr(ex) self.assertIn("PeriodicExecutor", r) - self.assertIn("myexec", r) - - -class TestPeriodicExecutorLifecycle(unittest.TestCase): - def test_open_starts_thread(self): - ex = _make_sync() - ex.open() - try: - self.assertIsNotNone(ex._thread) - # Give thread a moment to start. - time.sleep(0.05) - self.assertTrue(ex._thread.is_alive()) - finally: - ex.close() - ex.join(timeout=2) - - def test_multiple_open_calls_have_no_effect(self): - ex = _make_sync() - ex.open() - thread_id = id(ex._thread) - ex.open() - try: - self.assertEqual(thread_id, id(ex._thread)) - finally: - ex.close() - ex.join(timeout=2) + self.assertIn("exec", r) - def test_close_sets_stopped(self): - ex = _make_sync() - ex.open() - ex.close() - self.assertTrue(ex._stopped) - ex.join(timeout=2) - - def test_join_without_open_is_safe(self): - ex = _make_sync() - ex.join(timeout=0.01) # should not raise +class TestPeriodicExecutorBasic(_PeriodicExecutorTestBase): def test_wake_sets_event(self): - ex = _make_sync() - self.assertFalse(ex._event) - ex.wake() - self.assertTrue(ex._event) + self.assertFalse(self.ex._event) + self.ex.wake() + self.assertTrue(self.ex._event) def test_update_interval(self): - ex = _make_sync(interval=30.0) - ex.update_interval(60) - self.assertEqual(ex._interval, 60) + self.ex.update_interval(60) + self.assertEqual(self.ex._interval, 60) def test_skip_sleep(self): - ex = _make_sync() - self.assertFalse(ex._skip_sleep) - ex.skip_sleep() - self.assertTrue(ex._skip_sleep) + self.assertFalse(self.ex._skip_sleep) + self.ex.skip_sleep() + self.assertTrue(self.ex._skip_sleep) + +class TestPeriodicExecutorLifecycle(_PeriodicExecutorTestBase): + def test_open_starts_worker(self): + self.ex.open() + if _IS_SYNC: + self.assertIsNotNone(self.ex._thread) + self.assertTrue(self.ex._thread.is_alive()) + else: + self.assertIsNotNone(self.ex._task) -class TestPeriodicExecutorTarget(unittest.TestCase): + def test_close_sets_stopped(self): + self.ex.open() + self.ex.close() + self.assertTrue(self.ex._stopped) + self.ex.join(timeout=1) + + def test_join_without_open_is_safe(self): + self.ex.join(timeout=0.01) + + def test_multiple_open_calls_have_no_effect(self): + self.ex.open() + if _IS_SYNC: + worker_id = id(self.ex._thread) + else: + worker_id = id(self.ex._task) + self.ex.open() + if _IS_SYNC: + self.assertEqual(worker_id, id(self.ex._thread)) + else: + self.assertEqual(worker_id, id(self.ex._task)) + + +class TestPeriodicExecutorTarget(_PeriodicExecutorTestBase): def test_target_returning_false_stops_executor(self): - ran = threading.Event() + if _IS_SYNC: + ran = threading.Event() + else: + ran = asyncio.Event() def target(): ran.set() - return False # Signal stop. + return False - ex = _make_sync(target=target) - ex.open() - self.assertTrue(ran.wait(timeout=2), "target never ran") - ex.join(timeout=2) - self.assertTrue(ex._stopped) + self.ex = _make_executor(target=target) + self.ex.open() + if _IS_SYNC: + self.assertTrue(ran.wait(timeout=2), "target never ran") + else: + asyncio.wait_for(ran.wait(), timeout=2) + self.ex.join(timeout=2) + self.assertTrue(self.ex._stopped) def test_target_exception_stops_executor(self): - ran = threading.Event() - captured_exc = [] - orig_excepthook = threading.excepthook - - def _capture_excepthook(args): - captured_exc.append(args.exc_value) - - threading.excepthook = _capture_excepthook - try: + if _IS_SYNC: + ran = threading.Event() + captured_exc: list = [] + orig_excepthook = threading.excepthook + + def _capture_excepthook(args): + captured_exc.append(args.exc_value) + + threading.excepthook = _capture_excepthook + try: + + def target(): + ran.set() + raise RuntimeError("boom") + + self.ex = _make_executor(target=target) + self.ex.open() + self.assertTrue(ran.wait(timeout=2), "target never ran") + self.ex.join(timeout=2) + finally: + threading.excepthook = orig_excepthook + self.assertTrue(self.ex._stopped) + self.assertEqual(len(captured_exc), 1) + self.assertIsInstance(captured_exc[0], RuntimeError) + else: + ran = asyncio.Event() def target(): ran.set() - raise RuntimeError("boom") - - ex = _make_sync(target=target) - ex.open() - self.assertTrue(ran.wait(timeout=2), "target never ran") - ex.join(timeout=2) - finally: - threading.excepthook = orig_excepthook + raise RuntimeError("async boom") - self.assertTrue(ex._stopped) - self.assertEqual(len(captured_exc), 1) - self.assertIsInstance(captured_exc[0], RuntimeError) + self.ex = _make_executor(target=target) + self.ex.open() + asyncio.wait_for(ran.wait(), timeout=2) + self.ex.join(timeout=2) + self.assertTrue(self.ex._stopped) + if self.ex._task is not None and self.ex._task.done(): + self.ex._task.exception() def test_skip_sleep_flag_skips_interval(self): call_times = [] def target(): - call_times.append(time.monotonic()) + call_times.append(time.monotonic() if _IS_SYNC else asyncio.get_running_loop().time()) if len(call_times) >= 2: return False return True - ex = _make_sync(interval=30.0, min_interval=0.001, target=target) - ex.skip_sleep() - ex.open() - ex.join(timeout=2) - # First call should have skipped the 30s sleep. + self.ex = _make_executor(interval=30.0, min_interval=0.001, target=target) + self.ex.skip_sleep() + self.ex.open() + self.ex.join(timeout=3) self.assertGreaterEqual(len(call_times), 2) self.assertLess(call_times[1] - call_times[0], 5.0) def test_wake_causes_early_run(self): call_count = [0] - woken = threading.Event() + if _IS_SYNC: + woken = threading.Event() + else: + woken = asyncio.Event() def target(): call_count[0] += 1 @@ -198,227 +200,88 @@ def target(): return False return True - ex = _make_sync(interval=30.0, min_interval=0.01, target=target) - ex.open() - woken.wait(timeout=2) - ex.wake() - ex.join(timeout=3) + self.ex = _make_executor(interval=30.0, min_interval=0.01, target=target) + self.ex.open() + if _IS_SYNC: + woken.wait(timeout=2) + else: + asyncio.wait_for(woken.wait(), timeout=2) + self.ex.wake() + self.ex.join(timeout=3) self.assertGreaterEqual(call_count[0], 2) - -class TestShouldStop(unittest.TestCase): - def test_returns_false_when_not_stopped(self): - ex = _make_sync() - self.assertFalse(ex._should_stop()) - self.assertFalse(ex._thread_will_exit) - - def test_returns_true_and_sets_thread_will_exit(self): - ex = _make_sync() - ex._stopped = True - self.assertTrue(ex._should_stop()) - self.assertTrue(ex._thread_will_exit) - - -class TestPeriodicExecutorOpenAfterExit(unittest.TestCase): - def test_reopen_after_target_returns_false(self): + def test_open_after_target_returns_false(self): called = [0] def target(): called[0] += 1 return False - ex = _make_sync(target=target) - ex.open() - ex.join(timeout=2) - self.assertTrue(ex._stopped) - # Re-open should start a new thread. - ex.open() - ex.join(timeout=2) + self.ex = _make_executor(target=target) + self.ex.open() + self.ex.join(timeout=2) + self.assertTrue(self.ex._stopped) + if not _IS_SYNC: + first_task = self.ex._task + self.ex.open() + self.ex.join(timeout=2) self.assertGreaterEqual(called[0], 2) + if not _IS_SYNC: + self.assertIsNot(self.ex._task, first_task) -# --------------------------------------------------------------------------- -# Module-level: _register_executor, _on_executor_deleted, _shutdown_executors -# --------------------------------------------------------------------------- +class TestShouldStop(UnitTest): + if _IS_SYNC: + def test_returns_false_when_not_stopped(self): + ex = _make_executor() + self.assertFalse(ex._should_stop()) + self.assertFalse(ex._thread_will_exit) -class TestRegisterExecutor(unittest.TestCase): - def setUp(self): - self._orig = set(pe_module._EXECUTORS) + def test_returns_true_and_sets_thread_will_exit(self): + ex = _make_executor() + ex._stopped = True + self.assertTrue(ex._should_stop()) + self.assertTrue(ex._thread_will_exit) - def tearDown(self): - pe_module._EXECUTORS.clear() - pe_module._EXECUTORS.update(self._orig) - - def test_register_adds_weakref(self): - ex = _make_sync() - before = len(pe_module._EXECUTORS) - _register_executor(ex) - self.assertEqual(len(pe_module._EXECUTORS), before + 1) - # Find the specific weakref we just registered. - ref = next(r for r in pe_module._EXECUTORS if r() is ex) - del ex - gc.collect() - # The weakref callback must have removed our specific ref. - self.assertNotIn(ref, pe_module._EXECUTORS) - - def test_shutdown_executors_stops_running_executors(self): - ex = _make_sync(interval=30.0) - ex.open() - time.sleep(0.05) - _shutdown_executors() - ex.join(timeout=2) - self.assertTrue(ex._stopped) - - def test_shutdown_executors_safe_when_empty(self): - pe_module._EXECUTORS.clear() - _shutdown_executors() # Should not raise. - - -# --------------------------------------------------------------------------- -# AsyncPeriodicExecutor -# --------------------------------------------------------------------------- - - -class TestAsyncPeriodicExecutorRepr(unittest.TestCase): - def test_repr_contains_class_and_name(self): - ex = _make_async(name="asyncexec") - r = repr(ex) - self.assertIn("AsyncPeriodicExecutor", r) - self.assertIn("asyncexec", r) +class TestRegisterExecutor(UnitTest): + if _IS_SYNC: -class TestAsyncPeriodicExecutorBasic(unittest.TestCase): - def test_wake_sets_event(self): - ex = _make_async() - ex.wake() - self.assertTrue(ex._event) + def setUp(self): + self._orig = set(pe_module._EXECUTORS) - def test_update_interval(self): - ex = _make_async(interval=30.0) - ex.update_interval(60) - self.assertEqual(ex._interval, 60) - - def test_skip_sleep(self): - ex = _make_async() - ex.skip_sleep() - self.assertTrue(ex._skip_sleep) + def tearDown(self): + pe_module._EXECUTORS.clear() + pe_module._EXECUTORS.update(self._orig) + def test_register_adds_weakref(self): + ex = _make_executor() + before = len(pe_module._EXECUTORS) + _register_executor(ex) + self.assertEqual(len(pe_module._EXECUTORS), before + 1) + ref = next(r for r in pe_module._EXECUTORS if r() is ex) + del ex + gc.collect() + self.assertNotIn(ref, pe_module._EXECUTORS) -class TestAsyncPeriodicExecutorLifecycle(unittest.TestCase): - def test_open_creates_task(self): - async def _test(): - ex = _make_async() - ex.open() - self.assertIsNotNone(ex._task) - ex.close() - await ex.join(timeout=1) - - _run(_test()) + def test_shutdown_executors_stops_running_executors(self): + ran = threading.Event() - def test_close_cancels_task(self): - async def _test(): - ex = _make_async() - ex.open() - ex.close() - await ex.join(timeout=1) - self.assertTrue(ex._stopped) - - _run(_test()) - - def test_join_without_open_is_safe(self): - async def _test(): - ex = _make_async() - await ex.join(timeout=0.01) # Should not raise. - - _run(_test()) - - def test_multiple_open_calls_have_no_effect(self): - async def _test(): - ex = _make_async() - ex.open() - task_id = id(ex._task) - ex.open() # Second open: same task still running. - self.assertEqual(task_id, id(ex._task)) - ex.close() - await ex.join(timeout=1) - - _run(_test()) - - -class TestAsyncPeriodicExecutorTarget(unittest.TestCase): - def test_target_returning_false_stops_executor(self): - async def _test(): - ran = asyncio.Event() - - async def target(): - ran.set() - return False - - ex = _make_async(target=target) - ex.open() - await asyncio.wait_for(ran.wait(), timeout=2) - await ex.join(timeout=2) - self.assertTrue(ex._stopped) - - _run(_test()) - - def test_target_exception_stops_executor(self): - async def _test(): - ran = asyncio.Event() - - async def target(): + def target(): ran.set() - raise RuntimeError("async boom") - - ex = _make_async(target=target) - ex.open() - await asyncio.wait_for(ran.wait(), timeout=2) - await ex.join(timeout=2) - self.assertTrue(ex._stopped) - # Retrieve the task exception to avoid "Task exception was never retrieved". - if ex._task is not None and ex._task.done(): - ex._task.exception() - - _run(_test()) - - def test_skip_sleep_flag_skips_interval(self): - async def _test(): - call_times = [] - - async def target(): - call_times.append(asyncio.get_running_loop().time()) - if len(call_times) >= 2: - return False return True - ex = _make_async(interval=30.0, min_interval=0.001, target=target) - ex.skip_sleep() + ex = _make_executor(target=target) ex.open() - await ex.join(timeout=3) - self.assertGreaterEqual(len(call_times), 2) - self.assertLess(call_times[1] - call_times[0], 5.0) - - _run(_test()) - - def test_open_after_target_returns_false_creates_new_task(self): - async def _test(): - call_count = [0] - - async def target(): - call_count[0] += 1 - return False - - ex = _make_async(target=target) - ex.open() - await ex.join(timeout=2) - first_task = ex._task - ex.open() - await ex.join(timeout=2) - self.assertGreaterEqual(call_count[0], 2) - self.assertIsNot(ex._task, first_task) + self.assertTrue(ran.wait(timeout=2), "target never ran") + _shutdown_executors() + ex.join(timeout=2) + self.assertTrue(ex._stopped) - _run(_test()) + def test_shutdown_executors_safe_when_empty(self): + pe_module._EXECUTORS.clear() + _shutdown_executors() if __name__ == "__main__": diff --git a/tools/synchro.py b/tools/synchro.py index ed794c5963..5b5267b857 100644 --- a/tools/synchro.py +++ b/tools/synchro.py @@ -279,6 +279,7 @@ def async_only_test(f: str) -> bool: "unified_format.py", "utils_selection_tests.py", "utils.py", + "test_periodic_executor.py", ] From c1273fb26a3dc7047c0009687d867d463107b8dd Mon Sep 17 00:00:00 2001 From: Jeffrey 'Alex' Clark Date: Fri, 24 Apr 2026 19:29:05 -0400 Subject: [PATCH 06/11] PYTHON-5784 Replace pe_module alias with periodic_executor --- test/asynchronous/test_periodic_executor.py | 18 +++++++++--------- test/test_periodic_executor.py | 18 +++++++++--------- 2 files changed, 18 insertions(+), 18 deletions(-) diff --git a/test/asynchronous/test_periodic_executor.py b/test/asynchronous/test_periodic_executor.py index 15f75b0f47..aeeacaca28 100644 --- a/test/asynchronous/test_periodic_executor.py +++ b/test/asynchronous/test_periodic_executor.py @@ -26,7 +26,7 @@ from test.asynchronous import AsyncUnitTest, unittest -import pymongo.periodic_executor as pe_module +from pymongo import periodic_executor from pymongo.periodic_executor import ( AsyncPeriodicExecutor, _register_executor, @@ -251,21 +251,21 @@ class TestRegisterExecutor(AsyncUnitTest): if _IS_SYNC: def setUp(self): - self._orig = set(pe_module._EXECUTORS) + self._orig = set(periodic_executor._EXECUTORS) def tearDown(self): - pe_module._EXECUTORS.clear() - pe_module._EXECUTORS.update(self._orig) + periodic_executor._EXECUTORS.clear() + periodic_executor._EXECUTORS.update(self._orig) def test_register_adds_weakref(self): ex = _make_executor() - before = len(pe_module._EXECUTORS) + before = len(periodic_executor._EXECUTORS) _register_executor(ex) - self.assertEqual(len(pe_module._EXECUTORS), before + 1) - ref = next(r for r in pe_module._EXECUTORS if r() is ex) + self.assertEqual(len(periodic_executor._EXECUTORS), before + 1) + ref = next(r for r in periodic_executor._EXECUTORS if r() is ex) del ex gc.collect() - self.assertNotIn(ref, pe_module._EXECUTORS) + self.assertNotIn(ref, periodic_executor._EXECUTORS) def test_shutdown_executors_stops_running_executors(self): ran = threading.Event() @@ -282,7 +282,7 @@ def target(): self.assertTrue(ex._stopped) def test_shutdown_executors_safe_when_empty(self): - pe_module._EXECUTORS.clear() + periodic_executor._EXECUTORS.clear() _shutdown_executors() diff --git a/test/test_periodic_executor.py b/test/test_periodic_executor.py index f07ded1fde..1eed0e022e 100644 --- a/test/test_periodic_executor.py +++ b/test/test_periodic_executor.py @@ -26,7 +26,7 @@ from test import UnitTest, unittest -import pymongo.periodic_executor as pe_module +from pymongo import periodic_executor from pymongo.periodic_executor import ( PeriodicExecutor, _register_executor, @@ -249,21 +249,21 @@ class TestRegisterExecutor(UnitTest): if _IS_SYNC: def setUp(self): - self._orig = set(pe_module._EXECUTORS) + self._orig = set(periodic_executor._EXECUTORS) def tearDown(self): - pe_module._EXECUTORS.clear() - pe_module._EXECUTORS.update(self._orig) + periodic_executor._EXECUTORS.clear() + periodic_executor._EXECUTORS.update(self._orig) def test_register_adds_weakref(self): ex = _make_executor() - before = len(pe_module._EXECUTORS) + before = len(periodic_executor._EXECUTORS) _register_executor(ex) - self.assertEqual(len(pe_module._EXECUTORS), before + 1) - ref = next(r for r in pe_module._EXECUTORS if r() is ex) + self.assertEqual(len(periodic_executor._EXECUTORS), before + 1) + ref = next(r for r in periodic_executor._EXECUTORS if r() is ex) del ex gc.collect() - self.assertNotIn(ref, pe_module._EXECUTORS) + self.assertNotIn(ref, periodic_executor._EXECUTORS) def test_shutdown_executors_stops_running_executors(self): ran = threading.Event() @@ -280,7 +280,7 @@ def target(): self.assertTrue(ex._stopped) def test_shutdown_executors_safe_when_empty(self): - pe_module._EXECUTORS.clear() + periodic_executor._EXECUTORS.clear() _shutdown_executors() From 9fb237873bec332374b0b9532d02c2d2e0d2792b Mon Sep 17 00:00:00 2001 From: Jeffrey 'Alex' Clark Date: Fri, 24 Apr 2026 19:37:00 -0400 Subject: [PATCH 07/11] PYTHON-5784 Remove underscore from base test class name --- test/asynchronous/test_periodic_executor.py | 8 ++++---- test/test_periodic_executor.py | 8 ++++---- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/test/asynchronous/test_periodic_executor.py b/test/asynchronous/test_periodic_executor.py index aeeacaca28..6f0f435907 100644 --- a/test/asynchronous/test_periodic_executor.py +++ b/test/asynchronous/test_periodic_executor.py @@ -47,7 +47,7 @@ async def target(): ) -class _AsyncPeriodicExecutorTestBase(AsyncUnitTest): +class AsyncPeriodicExecutorTestBase(AsyncUnitTest): async def asyncSetUp(self): self.ex = _make_executor() @@ -64,7 +64,7 @@ async def test_repr_contains_class_and_name(self): self.assertIn("exec", r) -class TestAsyncPeriodicExecutorBasic(_AsyncPeriodicExecutorTestBase): +class TestAsyncPeriodicExecutorBasic(AsyncPeriodicExecutorTestBase): async def test_wake_sets_event(self): self.assertFalse(self.ex._event) self.ex.wake() @@ -80,7 +80,7 @@ async def test_skip_sleep(self): self.assertTrue(self.ex._skip_sleep) -class TestAsyncPeriodicExecutorLifecycle(_AsyncPeriodicExecutorTestBase): +class TestAsyncPeriodicExecutorLifecycle(AsyncPeriodicExecutorTestBase): async def test_open_starts_worker(self): self.ex.open() if _IS_SYNC: @@ -111,7 +111,7 @@ async def test_multiple_open_calls_have_no_effect(self): self.assertEqual(worker_id, id(self.ex._task)) -class TestAsyncPeriodicExecutorTarget(_AsyncPeriodicExecutorTestBase): +class TestAsyncPeriodicExecutorTarget(AsyncPeriodicExecutorTestBase): async def test_target_returning_false_stops_executor(self): if _IS_SYNC: ran = threading.Event() diff --git a/test/test_periodic_executor.py b/test/test_periodic_executor.py index 1eed0e022e..cd4c45e032 100644 --- a/test/test_periodic_executor.py +++ b/test/test_periodic_executor.py @@ -45,7 +45,7 @@ def target(): return PeriodicExecutor(interval=interval, min_interval=min_interval, target=target, name=name) -class _PeriodicExecutorTestBase(UnitTest): +class PeriodicExecutorTestBase(UnitTest): def setUp(self): self.ex = _make_executor() @@ -62,7 +62,7 @@ def test_repr_contains_class_and_name(self): self.assertIn("exec", r) -class TestPeriodicExecutorBasic(_PeriodicExecutorTestBase): +class TestPeriodicExecutorBasic(PeriodicExecutorTestBase): def test_wake_sets_event(self): self.assertFalse(self.ex._event) self.ex.wake() @@ -78,7 +78,7 @@ def test_skip_sleep(self): self.assertTrue(self.ex._skip_sleep) -class TestPeriodicExecutorLifecycle(_PeriodicExecutorTestBase): +class TestPeriodicExecutorLifecycle(PeriodicExecutorTestBase): def test_open_starts_worker(self): self.ex.open() if _IS_SYNC: @@ -109,7 +109,7 @@ def test_multiple_open_calls_have_no_effect(self): self.assertEqual(worker_id, id(self.ex._task)) -class TestPeriodicExecutorTarget(_PeriodicExecutorTestBase): +class TestPeriodicExecutorTarget(PeriodicExecutorTestBase): def test_target_returning_false_stops_executor(self): if _IS_SYNC: ran = threading.Event() From 2fa2988072cc9b0fc87b9a7ddff56cab3fc1c8fc Mon Sep 17 00:00:00 2001 From: Jeffrey 'Alex' Clark Date: Fri, 24 Apr 2026 19:40:53 -0400 Subject: [PATCH 08/11] PYTHON-5784 Alpha sort test_periodic_executor.py in synchro list --- tools/synchro.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tools/synchro.py b/tools/synchro.py index 5b5267b857..51ec7eb243 100644 --- a/tools/synchro.py +++ b/tools/synchro.py @@ -252,6 +252,7 @@ def async_only_test(f: str) -> bool: "test_monitoring.py", "test_mongos_load_balancing.py", "test_on_demand_csfle.py", + "test_periodic_executor.py", "test_pooling.py", "test_raw_bson.py", "test_read_concern.py", @@ -279,7 +280,6 @@ def async_only_test(f: str) -> bool: "unified_format.py", "utils_selection_tests.py", "utils.py", - "test_periodic_executor.py", ] From 4e90c0044ca6fedfa86b5e2657fd98d088e0d6fa Mon Sep 17 00:00:00 2001 From: Jeffrey 'Alex' Clark Date: Fri, 24 Apr 2026 19:42:42 -0400 Subject: [PATCH 09/11] PYTHON-5784 Rename ex to executor in tests --- test/asynchronous/test_periodic_executor.py | 146 ++++++++++---------- test/test_periodic_executor.py | 146 ++++++++++---------- 2 files changed, 146 insertions(+), 146 deletions(-) diff --git a/test/asynchronous/test_periodic_executor.py b/test/asynchronous/test_periodic_executor.py index 6f0f435907..eab0de5702 100644 --- a/test/asynchronous/test_periodic_executor.py +++ b/test/asynchronous/test_periodic_executor.py @@ -49,66 +49,66 @@ async def target(): class AsyncPeriodicExecutorTestBase(AsyncUnitTest): async def asyncSetUp(self): - self.ex = _make_executor() + self.executor = _make_executor() async def asyncTearDown(self): - self.ex.close() - await self.ex.join(timeout=2) + self.executor.close() + await self.executor.join(timeout=2) class TestAsyncPeriodicExecutorRepr(AsyncUnitTest): async def test_repr_contains_class_and_name(self): - ex = _make_executor(name="exec") - r = repr(ex) + executor = _make_executor(name="exec") + r = repr(executor) self.assertIn("AsyncPeriodicExecutor", r) self.assertIn("exec", r) class TestAsyncPeriodicExecutorBasic(AsyncPeriodicExecutorTestBase): async def test_wake_sets_event(self): - self.assertFalse(self.ex._event) - self.ex.wake() - self.assertTrue(self.ex._event) + self.assertFalse(self.executor._event) + self.executor.wake() + self.assertTrue(self.executor._event) async def test_update_interval(self): - self.ex.update_interval(60) - self.assertEqual(self.ex._interval, 60) + self.executor.update_interval(60) + self.assertEqual(self.executor._interval, 60) async def test_skip_sleep(self): - self.assertFalse(self.ex._skip_sleep) - self.ex.skip_sleep() - self.assertTrue(self.ex._skip_sleep) + self.assertFalse(self.executor._skip_sleep) + self.executor.skip_sleep() + self.assertTrue(self.executor._skip_sleep) class TestAsyncPeriodicExecutorLifecycle(AsyncPeriodicExecutorTestBase): async def test_open_starts_worker(self): - self.ex.open() + self.executor.open() if _IS_SYNC: - self.assertIsNotNone(self.ex._thread) - self.assertTrue(self.ex._thread.is_alive()) + self.assertIsNotNone(self.executor._thread) + self.assertTrue(self.executor._thread.is_alive()) else: - self.assertIsNotNone(self.ex._task) + self.assertIsNotNone(self.executor._task) async def test_close_sets_stopped(self): - self.ex.open() - self.ex.close() - self.assertTrue(self.ex._stopped) - await self.ex.join(timeout=1) + self.executor.open() + self.executor.close() + self.assertTrue(self.executor._stopped) + await self.executor.join(timeout=1) async def test_join_without_open_is_safe(self): - await self.ex.join(timeout=0.01) + await self.executor.join(timeout=0.01) async def test_multiple_open_calls_have_no_effect(self): - self.ex.open() + self.executor.open() if _IS_SYNC: - worker_id = id(self.ex._thread) + worker_id = id(self.executor._thread) else: - worker_id = id(self.ex._task) - self.ex.open() + worker_id = id(self.executor._task) + self.executor.open() if _IS_SYNC: - self.assertEqual(worker_id, id(self.ex._thread)) + self.assertEqual(worker_id, id(self.executor._thread)) else: - self.assertEqual(worker_id, id(self.ex._task)) + self.assertEqual(worker_id, id(self.executor._task)) class TestAsyncPeriodicExecutorTarget(AsyncPeriodicExecutorTestBase): @@ -122,14 +122,14 @@ async def target(): ran.set() return False - self.ex = _make_executor(target=target) - self.ex.open() + self.executor = _make_executor(target=target) + self.executor.open() if _IS_SYNC: self.assertTrue(ran.wait(timeout=2), "target never ran") else: await asyncio.wait_for(ran.wait(), timeout=2) - await self.ex.join(timeout=2) - self.assertTrue(self.ex._stopped) + await self.executor.join(timeout=2) + self.assertTrue(self.executor._stopped) async def test_target_exception_stops_executor(self): if _IS_SYNC: @@ -147,13 +147,13 @@ def target(): ran.set() raise RuntimeError("boom") - self.ex = _make_executor(target=target) - self.ex.open() + self.executor = _make_executor(target=target) + self.executor.open() self.assertTrue(ran.wait(timeout=2), "target never ran") - self.ex.join(timeout=2) + self.executor.join(timeout=2) finally: threading.excepthook = orig_excepthook - self.assertTrue(self.ex._stopped) + self.assertTrue(self.executor._stopped) self.assertEqual(len(captured_exc), 1) self.assertIsInstance(captured_exc[0], RuntimeError) else: @@ -163,13 +163,13 @@ async def target(): ran.set() raise RuntimeError("async boom") - self.ex = _make_executor(target=target) - self.ex.open() + self.executor = _make_executor(target=target) + self.executor.open() await asyncio.wait_for(ran.wait(), timeout=2) - await self.ex.join(timeout=2) - self.assertTrue(self.ex._stopped) - if self.ex._task is not None and self.ex._task.done(): - self.ex._task.exception() + await self.executor.join(timeout=2) + self.assertTrue(self.executor._stopped) + if self.executor._task is not None and self.executor._task.done(): + self.executor._task.exception() async def test_skip_sleep_flag_skips_interval(self): call_times = [] @@ -180,10 +180,10 @@ async def target(): return False return True - self.ex = _make_executor(interval=30.0, min_interval=0.001, target=target) - self.ex.skip_sleep() - self.ex.open() - await self.ex.join(timeout=3) + self.executor = _make_executor(interval=30.0, min_interval=0.001, target=target) + self.executor.skip_sleep() + self.executor.open() + await self.executor.join(timeout=3) self.assertGreaterEqual(len(call_times), 2) self.assertLess(call_times[1] - call_times[0], 5.0) @@ -202,14 +202,14 @@ async def target(): return False return True - self.ex = _make_executor(interval=30.0, min_interval=0.01, target=target) - self.ex.open() + self.executor = _make_executor(interval=30.0, min_interval=0.01, target=target) + self.executor.open() if _IS_SYNC: woken.wait(timeout=2) else: await asyncio.wait_for(woken.wait(), timeout=2) - self.ex.wake() - await self.ex.join(timeout=3) + self.executor.wake() + await self.executor.join(timeout=3) self.assertGreaterEqual(call_count[0], 2) async def test_open_after_target_returns_false(self): @@ -219,32 +219,32 @@ async def target(): called[0] += 1 return False - self.ex = _make_executor(target=target) - self.ex.open() - await self.ex.join(timeout=2) - self.assertTrue(self.ex._stopped) + self.executor = _make_executor(target=target) + self.executor.open() + await self.executor.join(timeout=2) + self.assertTrue(self.executor._stopped) if not _IS_SYNC: - first_task = self.ex._task - self.ex.open() - await self.ex.join(timeout=2) + first_task = self.executor._task + self.executor.open() + await self.executor.join(timeout=2) self.assertGreaterEqual(called[0], 2) if not _IS_SYNC: - self.assertIsNot(self.ex._task, first_task) + self.assertIsNot(self.executor._task, first_task) class TestShouldStop(AsyncUnitTest): if _IS_SYNC: def test_returns_false_when_not_stopped(self): - ex = _make_executor() - self.assertFalse(ex._should_stop()) - self.assertFalse(ex._thread_will_exit) + executor = _make_executor() + self.assertFalse(executor._should_stop()) + self.assertFalse(executor._thread_will_exit) def test_returns_true_and_sets_thread_will_exit(self): - ex = _make_executor() - ex._stopped = True - self.assertTrue(ex._should_stop()) - self.assertTrue(ex._thread_will_exit) + executor = _make_executor() + executor._stopped = True + self.assertTrue(executor._should_stop()) + self.assertTrue(executor._thread_will_exit) class TestRegisterExecutor(AsyncUnitTest): @@ -258,12 +258,12 @@ def tearDown(self): periodic_executor._EXECUTORS.update(self._orig) def test_register_adds_weakref(self): - ex = _make_executor() + executor = _make_executor() before = len(periodic_executor._EXECUTORS) - _register_executor(ex) + _register_executor(executor) self.assertEqual(len(periodic_executor._EXECUTORS), before + 1) - ref = next(r for r in periodic_executor._EXECUTORS if r() is ex) - del ex + ref = next(r for r in periodic_executor._EXECUTORS if r() is executor) + del executor gc.collect() self.assertNotIn(ref, periodic_executor._EXECUTORS) @@ -274,12 +274,12 @@ def target(): ran.set() return True - ex = _make_executor(target=target) - ex.open() + executor = _make_executor(target=target) + executor.open() self.assertTrue(ran.wait(timeout=2), "target never ran") _shutdown_executors() - ex.join(timeout=2) - self.assertTrue(ex._stopped) + executor.join(timeout=2) + self.assertTrue(executor._stopped) def test_shutdown_executors_safe_when_empty(self): periodic_executor._EXECUTORS.clear() diff --git a/test/test_periodic_executor.py b/test/test_periodic_executor.py index cd4c45e032..63143b1eeb 100644 --- a/test/test_periodic_executor.py +++ b/test/test_periodic_executor.py @@ -47,66 +47,66 @@ def target(): class PeriodicExecutorTestBase(UnitTest): def setUp(self): - self.ex = _make_executor() + self.executor = _make_executor() def tearDown(self): - self.ex.close() - self.ex.join(timeout=2) + self.executor.close() + self.executor.join(timeout=2) class TestPeriodicExecutorRepr(UnitTest): def test_repr_contains_class_and_name(self): - ex = _make_executor(name="exec") - r = repr(ex) + executor = _make_executor(name="exec") + r = repr(executor) self.assertIn("PeriodicExecutor", r) self.assertIn("exec", r) class TestPeriodicExecutorBasic(PeriodicExecutorTestBase): def test_wake_sets_event(self): - self.assertFalse(self.ex._event) - self.ex.wake() - self.assertTrue(self.ex._event) + self.assertFalse(self.executor._event) + self.executor.wake() + self.assertTrue(self.executor._event) def test_update_interval(self): - self.ex.update_interval(60) - self.assertEqual(self.ex._interval, 60) + self.executor.update_interval(60) + self.assertEqual(self.executor._interval, 60) def test_skip_sleep(self): - self.assertFalse(self.ex._skip_sleep) - self.ex.skip_sleep() - self.assertTrue(self.ex._skip_sleep) + self.assertFalse(self.executor._skip_sleep) + self.executor.skip_sleep() + self.assertTrue(self.executor._skip_sleep) class TestPeriodicExecutorLifecycle(PeriodicExecutorTestBase): def test_open_starts_worker(self): - self.ex.open() + self.executor.open() if _IS_SYNC: - self.assertIsNotNone(self.ex._thread) - self.assertTrue(self.ex._thread.is_alive()) + self.assertIsNotNone(self.executor._thread) + self.assertTrue(self.executor._thread.is_alive()) else: - self.assertIsNotNone(self.ex._task) + self.assertIsNotNone(self.executor._task) def test_close_sets_stopped(self): - self.ex.open() - self.ex.close() - self.assertTrue(self.ex._stopped) - self.ex.join(timeout=1) + self.executor.open() + self.executor.close() + self.assertTrue(self.executor._stopped) + self.executor.join(timeout=1) def test_join_without_open_is_safe(self): - self.ex.join(timeout=0.01) + self.executor.join(timeout=0.01) def test_multiple_open_calls_have_no_effect(self): - self.ex.open() + self.executor.open() if _IS_SYNC: - worker_id = id(self.ex._thread) + worker_id = id(self.executor._thread) else: - worker_id = id(self.ex._task) - self.ex.open() + worker_id = id(self.executor._task) + self.executor.open() if _IS_SYNC: - self.assertEqual(worker_id, id(self.ex._thread)) + self.assertEqual(worker_id, id(self.executor._thread)) else: - self.assertEqual(worker_id, id(self.ex._task)) + self.assertEqual(worker_id, id(self.executor._task)) class TestPeriodicExecutorTarget(PeriodicExecutorTestBase): @@ -120,14 +120,14 @@ def target(): ran.set() return False - self.ex = _make_executor(target=target) - self.ex.open() + self.executor = _make_executor(target=target) + self.executor.open() if _IS_SYNC: self.assertTrue(ran.wait(timeout=2), "target never ran") else: asyncio.wait_for(ran.wait(), timeout=2) - self.ex.join(timeout=2) - self.assertTrue(self.ex._stopped) + self.executor.join(timeout=2) + self.assertTrue(self.executor._stopped) def test_target_exception_stops_executor(self): if _IS_SYNC: @@ -145,13 +145,13 @@ def target(): ran.set() raise RuntimeError("boom") - self.ex = _make_executor(target=target) - self.ex.open() + self.executor = _make_executor(target=target) + self.executor.open() self.assertTrue(ran.wait(timeout=2), "target never ran") - self.ex.join(timeout=2) + self.executor.join(timeout=2) finally: threading.excepthook = orig_excepthook - self.assertTrue(self.ex._stopped) + self.assertTrue(self.executor._stopped) self.assertEqual(len(captured_exc), 1) self.assertIsInstance(captured_exc[0], RuntimeError) else: @@ -161,13 +161,13 @@ def target(): ran.set() raise RuntimeError("async boom") - self.ex = _make_executor(target=target) - self.ex.open() + self.executor = _make_executor(target=target) + self.executor.open() asyncio.wait_for(ran.wait(), timeout=2) - self.ex.join(timeout=2) - self.assertTrue(self.ex._stopped) - if self.ex._task is not None and self.ex._task.done(): - self.ex._task.exception() + self.executor.join(timeout=2) + self.assertTrue(self.executor._stopped) + if self.executor._task is not None and self.executor._task.done(): + self.executor._task.exception() def test_skip_sleep_flag_skips_interval(self): call_times = [] @@ -178,10 +178,10 @@ def target(): return False return True - self.ex = _make_executor(interval=30.0, min_interval=0.001, target=target) - self.ex.skip_sleep() - self.ex.open() - self.ex.join(timeout=3) + self.executor = _make_executor(interval=30.0, min_interval=0.001, target=target) + self.executor.skip_sleep() + self.executor.open() + self.executor.join(timeout=3) self.assertGreaterEqual(len(call_times), 2) self.assertLess(call_times[1] - call_times[0], 5.0) @@ -200,14 +200,14 @@ def target(): return False return True - self.ex = _make_executor(interval=30.0, min_interval=0.01, target=target) - self.ex.open() + self.executor = _make_executor(interval=30.0, min_interval=0.01, target=target) + self.executor.open() if _IS_SYNC: woken.wait(timeout=2) else: asyncio.wait_for(woken.wait(), timeout=2) - self.ex.wake() - self.ex.join(timeout=3) + self.executor.wake() + self.executor.join(timeout=3) self.assertGreaterEqual(call_count[0], 2) def test_open_after_target_returns_false(self): @@ -217,32 +217,32 @@ def target(): called[0] += 1 return False - self.ex = _make_executor(target=target) - self.ex.open() - self.ex.join(timeout=2) - self.assertTrue(self.ex._stopped) + self.executor = _make_executor(target=target) + self.executor.open() + self.executor.join(timeout=2) + self.assertTrue(self.executor._stopped) if not _IS_SYNC: - first_task = self.ex._task - self.ex.open() - self.ex.join(timeout=2) + first_task = self.executor._task + self.executor.open() + self.executor.join(timeout=2) self.assertGreaterEqual(called[0], 2) if not _IS_SYNC: - self.assertIsNot(self.ex._task, first_task) + self.assertIsNot(self.executor._task, first_task) class TestShouldStop(UnitTest): if _IS_SYNC: def test_returns_false_when_not_stopped(self): - ex = _make_executor() - self.assertFalse(ex._should_stop()) - self.assertFalse(ex._thread_will_exit) + executor = _make_executor() + self.assertFalse(executor._should_stop()) + self.assertFalse(executor._thread_will_exit) def test_returns_true_and_sets_thread_will_exit(self): - ex = _make_executor() - ex._stopped = True - self.assertTrue(ex._should_stop()) - self.assertTrue(ex._thread_will_exit) + executor = _make_executor() + executor._stopped = True + self.assertTrue(executor._should_stop()) + self.assertTrue(executor._thread_will_exit) class TestRegisterExecutor(UnitTest): @@ -256,12 +256,12 @@ def tearDown(self): periodic_executor._EXECUTORS.update(self._orig) def test_register_adds_weakref(self): - ex = _make_executor() + executor = _make_executor() before = len(periodic_executor._EXECUTORS) - _register_executor(ex) + _register_executor(executor) self.assertEqual(len(periodic_executor._EXECUTORS), before + 1) - ref = next(r for r in periodic_executor._EXECUTORS if r() is ex) - del ex + ref = next(r for r in periodic_executor._EXECUTORS if r() is executor) + del executor gc.collect() self.assertNotIn(ref, periodic_executor._EXECUTORS) @@ -272,12 +272,12 @@ def target(): ran.set() return True - ex = _make_executor(target=target) - ex.open() + executor = _make_executor(target=target) + executor.open() self.assertTrue(ran.wait(timeout=2), "target never ran") _shutdown_executors() - ex.join(timeout=2) - self.assertTrue(ex._stopped) + executor.join(timeout=2) + self.assertTrue(executor._stopped) def test_shutdown_executors_safe_when_empty(self): periodic_executor._EXECUTORS.clear() From d5fe8b488cacd832cb3c8ea1bc7070cd848a377a Mon Sep 17 00:00:00 2001 From: Jeffrey 'Alex' Clark Date: Fri, 24 Apr 2026 19:46:26 -0400 Subject: [PATCH 10/11] PYTHON-5784 Rename r to executor_repr in repr test --- test/asynchronous/test_periodic_executor.py | 6 +++--- test/test_periodic_executor.py | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/test/asynchronous/test_periodic_executor.py b/test/asynchronous/test_periodic_executor.py index eab0de5702..bb1479fa13 100644 --- a/test/asynchronous/test_periodic_executor.py +++ b/test/asynchronous/test_periodic_executor.py @@ -59,9 +59,9 @@ async def asyncTearDown(self): class TestAsyncPeriodicExecutorRepr(AsyncUnitTest): async def test_repr_contains_class_and_name(self): executor = _make_executor(name="exec") - r = repr(executor) - self.assertIn("AsyncPeriodicExecutor", r) - self.assertIn("exec", r) + executor_repr = repr(executor) + self.assertIn("AsyncPeriodicExecutor", executor_repr) + self.assertIn("exec", executor_repr) class TestAsyncPeriodicExecutorBasic(AsyncPeriodicExecutorTestBase): diff --git a/test/test_periodic_executor.py b/test/test_periodic_executor.py index 63143b1eeb..c1240d9950 100644 --- a/test/test_periodic_executor.py +++ b/test/test_periodic_executor.py @@ -57,9 +57,9 @@ def tearDown(self): class TestPeriodicExecutorRepr(UnitTest): def test_repr_contains_class_and_name(self): executor = _make_executor(name="exec") - r = repr(executor) - self.assertIn("PeriodicExecutor", r) - self.assertIn("exec", r) + executor_repr = repr(executor) + self.assertIn("PeriodicExecutor", executor_repr) + self.assertIn("exec", executor_repr) class TestPeriodicExecutorBasic(PeriodicExecutorTestBase): From 8ffcf6fbb959fe87a9ced47d093b67f8046d732a Mon Sep 17 00:00:00 2001 From: Jeffrey 'Alex' Clark Date: Fri, 24 Apr 2026 19:47:44 -0400 Subject: [PATCH 11/11] PYTHON-5784 Replace boom with error in exception messages --- test/asynchronous/test_periodic_executor.py | 4 ++-- test/test_periodic_executor.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/test/asynchronous/test_periodic_executor.py b/test/asynchronous/test_periodic_executor.py index bb1479fa13..571a193409 100644 --- a/test/asynchronous/test_periodic_executor.py +++ b/test/asynchronous/test_periodic_executor.py @@ -145,7 +145,7 @@ def _capture_excepthook(args): def target(): ran.set() - raise RuntimeError("boom") + raise RuntimeError("error") self.executor = _make_executor(target=target) self.executor.open() @@ -161,7 +161,7 @@ def target(): async def target(): ran.set() - raise RuntimeError("async boom") + raise RuntimeError("error") self.executor = _make_executor(target=target) self.executor.open() diff --git a/test/test_periodic_executor.py b/test/test_periodic_executor.py index c1240d9950..deba99975f 100644 --- a/test/test_periodic_executor.py +++ b/test/test_periodic_executor.py @@ -143,7 +143,7 @@ def _capture_excepthook(args): def target(): ran.set() - raise RuntimeError("boom") + raise RuntimeError("error") self.executor = _make_executor(target=target) self.executor.open() @@ -159,7 +159,7 @@ def target(): def target(): ran.set() - raise RuntimeError("async boom") + raise RuntimeError("error") self.executor = _make_executor(target=target) self.executor.open()