Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions src/apify/storage_clients/_apify/_alias_resolving.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
from apify_client import ApifyClientAsync

from ._utils import hash_api_base_url_and_token
from apify._configuration import Configuration

if TYPE_CHECKING:
from collections.abc import Callable
Expand All @@ -24,6 +23,8 @@
RequestQueueCollectionClientAsync,
)

from apify._configuration import Configuration

logger = getLogger(__name__)


Expand Down Expand Up @@ -128,6 +129,9 @@ class AliasResolver:
_alias_map: ClassVar[dict[str, str]] = {}
"""Map containing pre-existing alias storages and their ids. Global for all instances."""

_alias_map_loaded: ClassVar[bool] = False
"""Tracks whether `_alias_map` was fetched from the default KVS — an empty map is a valid loaded state."""

_alias_init_lock: Lock | None = None
"""Lock for creating alias storages. Only one alias storage can be created at a time. Global for all instances."""

Expand Down Expand Up @@ -181,11 +185,12 @@ async def _get_alias_map(cls, configuration: Configuration) -> dict[str, str]:
Returns:
Map of aliases and storage ids.
"""
if not cls._alias_map and Configuration.get_global_configuration().is_at_home:
if not cls._alias_map_loaded and configuration.is_at_home:
default_kvs_client = await cls._get_default_kvs_client(configuration)

record = await default_kvs_client.get_record(cls._ALIAS_MAPPING_KEY)
cls._alias_map = record.get('value', {}) if record else {}
cls._alias_map_loaded = True

return cls._alias_map

Expand Down Expand Up @@ -215,7 +220,7 @@ async def store_mapping(self, storage_id: str) -> None:
alias_map = await self._get_alias_map(self._configuration)
alias_map[self._storage_key] = storage_id

if not Configuration.get_global_configuration().is_at_home:
if not self._configuration.is_at_home:
logging.getLogger(__name__).debug(
'_AliasResolver storage limited retention is only supported on Apify platform. Storage is not exported.'
)
Expand Down
1 change: 1 addition & 0 deletions tests/unit/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ def _prepare_test_env() -> None:

# Reset the AliasResolver class state.
AliasResolver._alias_map = {}
AliasResolver._alias_map_loaded = False
AliasResolver._alias_init_lock = None

# Verify that the test environment was set up correctly.
Expand Down
36 changes: 36 additions & 0 deletions tests/unit/storage_clients/test_alias_resolver.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from __future__ import annotations

from unittest.mock import AsyncMock, patch

from apify._configuration import Configuration
from apify.storage_clients._apify._alias_resolving import AliasResolver

Expand Down Expand Up @@ -78,6 +80,40 @@ async def test_get_alias_map_returns_in_memory_map() -> None:
assert result == {}


async def test_get_alias_map_loads_from_kvs_only_once_when_empty() -> None:
    """The default KVS is queried a single time even when it holds no alias record.

    The stubbed client returns no record, so the resulting alias map is empty. Repeated lookups must
    reuse that already-loaded state rather than fetching the record again.
    """
    at_home_config = Configuration(
        is_at_home=True,
        token='test-token',
        default_key_value_store_id='default-kvs-id',
    )

    # Stub of the default key-value store client whose alias-mapping record is missing.
    kvs_stub = AsyncMock()
    kvs_stub.get_record = AsyncMock(return_value=None)

    with patch.object(AliasResolver, '_get_default_kvs_client', return_value=kvs_stub):
        # Three consecutive lookups; only the first one is allowed to reach the KVS.
        for _ in range(3):
            await AliasResolver._get_alias_map(at_home_config)

    assert kvs_stub.get_record.await_count == 1
    assert AliasResolver._alias_map == {}


async def test_store_mapping_uses_injected_configuration_is_at_home() -> None:
    """`store_mapping` consults `is_at_home` of the configuration passed to the resolver.

    The injected configuration reports `is_at_home=True`, so the mapping must be written to the
    (stubbed) default KVS and recorded in the in-memory alias map.
    """
    # The injected config says `is_at_home=True`; the KVS write must happen based on it.
    injected_config = Configuration(
        is_at_home=True,
        token='test-token',
        default_key_value_store_id='default-kvs-id',
    )
    alias_resolver = AliasResolver(
        storage_type='Dataset',
        alias='test-alias',
        configuration=injected_config,
    )

    # Stub of the default key-value store client: no stored alias record, writes succeed silently.
    kvs_stub = AsyncMock(
        get_record=AsyncMock(return_value=None),
        set_record=AsyncMock(return_value=None),
        get=AsyncMock(return_value={'id': 'default-kvs-id'}),
    )

    with patch.object(AliasResolver, '_get_default_kvs_client', return_value=kvs_stub):
        await alias_resolver.store_mapping(storage_id='new-id-789')

    kvs_stub.set_record.assert_awaited_once()
    stored_id = AliasResolver._alias_map[alias_resolver._storage_key]
    assert stored_id == 'new-id-789'


async def test_configuration_storages_alias_resolving() -> None:
"""Test that `AliasResolver` correctly resolves ids of storages registered in Configuration."""

Expand Down
Loading