Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 13 additions & 6 deletions src/apify/_actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -1167,7 +1167,7 @@ async def metamorph(
async def reboot(
self,
*,
event_listeners_timeout: timedelta | None = EVENT_LISTENERS_TIMEOUT, # noqa: ARG002
event_listeners_timeout: timedelta | None = EVENT_LISTENERS_TIMEOUT,
custom_after_sleep: timedelta | None = None,
) -> None:
"""Internally reboot this Actor.
Expand Down Expand Up @@ -1204,11 +1204,18 @@ async def reboot(
(self.event_manager._listeners_to_wrappers[Event.MIGRATING] or {}).values() # noqa: SLF001
)

results = await asyncio.gather(
*[listener(EventPersistStateData(is_migrating=True)) for listener in persist_state_listeners],
*[listener(EventMigratingData()) for listener in migrating_listeners],
return_exceptions=True,
)
try:
results = await asyncio.wait_for(
asyncio.gather(
*[listener(EventPersistStateData(is_migrating=True)) for listener in persist_state_listeners],
*[listener(EventMigratingData()) for listener in migrating_listeners],
return_exceptions=True,
),
timeout=event_listeners_timeout.total_seconds() if event_listeners_timeout else None,
)
except asyncio.TimeoutError:
self.log.warning('Pre-reboot event listeners did not finish within timeout; proceeding with reboot')
results = []

for result in results:
if isinstance(result, Exception):
Expand Down
30 changes: 30 additions & 0 deletions tests/unit/actor/test_actor_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -399,3 +399,33 @@ async def successful_migrating_listener(*_args: object) -> None:

# The reboot API call was still made.
assert len(apify_client_async_patcher.calls['run']['reboot']) == 1


async def test_reboot_proceeds_when_event_listener_exceeds_timeout(
apify_client_async_patcher: ApifyClientAsyncPatcher,
caplog: pytest.LogCaptureFixture,
) -> None:
"""Test that a hanging pre-reboot event listener does not block reboot beyond the timeout."""
apify_client_async_patcher.patch('run', 'reboot', return_value=None)

async def hanging_listener(*_args: object) -> None:
await asyncio.sleep(60)

async with Actor:
Actor.configuration.is_at_home = True
Actor.configuration.actor_run_id = 'some-run-id'

listeners_map = Actor.event_manager._listeners_to_wrappers
listeners_map[Event.PERSIST_STATE] = {hanging_listener: [hanging_listener]}

with caplog.at_level(logging.WARNING):
await Actor.reboot(
event_listeners_timeout=timedelta(milliseconds=50),
custom_after_sleep=timedelta(milliseconds=1),
)

# The timeout was honored and logged.
assert any('Pre-reboot event listeners did not finish within timeout' in r.message for r in caplog.records)

# The reboot API call proceeded despite the hanging listener.
assert len(apify_client_async_patcher.calls['run']['reboot']) == 1
Loading