diff --git a/src/apify/_actor.py b/src/apify/_actor.py index 6d04e9ae..015d78d6 100644 --- a/src/apify/_actor.py +++ b/src/apify/_actor.py @@ -1167,7 +1167,7 @@ async def metamorph( async def reboot( self, *, - event_listeners_timeout: timedelta | None = EVENT_LISTENERS_TIMEOUT, # noqa: ARG002 + event_listeners_timeout: timedelta | None = EVENT_LISTENERS_TIMEOUT, custom_after_sleep: timedelta | None = None, ) -> None: """Internally reboot this Actor. @@ -1204,11 +1204,18 @@ async def reboot( (self.event_manager._listeners_to_wrappers[Event.MIGRATING] or {}).values() # noqa: SLF001 ) - results = await asyncio.gather( - *[listener(EventPersistStateData(is_migrating=True)) for listener in persist_state_listeners], - *[listener(EventMigratingData()) for listener in migrating_listeners], - return_exceptions=True, - ) + try: + results = await asyncio.wait_for( + asyncio.gather( + *[listener(EventPersistStateData(is_migrating=True)) for listener in persist_state_listeners], + *[listener(EventMigratingData()) for listener in migrating_listeners], + return_exceptions=True, + ), + timeout=event_listeners_timeout.total_seconds() if event_listeners_timeout else None, + ) + except asyncio.TimeoutError: + self.log.warning('Pre-reboot event listeners did not finish within timeout; proceeding with reboot') + results = [] for result in results: if isinstance(result, Exception): diff --git a/tests/unit/actor/test_actor_helpers.py b/tests/unit/actor/test_actor_helpers.py index 2cf84c51..8067e6be 100644 --- a/tests/unit/actor/test_actor_helpers.py +++ b/tests/unit/actor/test_actor_helpers.py @@ -399,3 +399,33 @@ async def successful_migrating_listener(*_args: object) -> None: # The reboot API call was still made. assert len(apify_client_async_patcher.calls['run']['reboot']) == 1 + + +async def test_reboot_proceeds_when_event_listener_exceeds_timeout( + apify_client_async_patcher: ApifyClientAsyncPatcher, + caplog: pytest.LogCaptureFixture, +) -> None: + """Test that a hanging pre-reboot event listener does not block reboot beyond the timeout.""" + apify_client_async_patcher.patch('run', 'reboot', return_value=None) + + async def hanging_listener(*_args: object) -> None: + await asyncio.sleep(60) + + async with Actor: + Actor.configuration.is_at_home = True + Actor.configuration.actor_run_id = 'some-run-id' + + listeners_map = Actor.event_manager._listeners_to_wrappers + listeners_map[Event.PERSIST_STATE] = {hanging_listener: [hanging_listener]} + + with caplog.at_level(logging.WARNING): + await Actor.reboot( + event_listeners_timeout=timedelta(milliseconds=50), + custom_after_sleep=timedelta(milliseconds=1), + ) + + # The timeout was honored and logged. + assert any('Pre-reboot event listeners did not finish within timeout' in r.message for r in caplog.records) + + # The reboot API call proceeded despite the hanging listener. + assert len(apify_client_async_patcher.calls['run']['reboot']) == 1