Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 20 additions & 12 deletions src/apify_client/_resource_clients/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -268,21 +268,24 @@ def iterate_items(
"""
cache_size = 1000

should_finish = False
read_items = 0
# Server-side filters (skip_empty, skip_hidden, clean) drop items after the [offset, offset+limit)
# window is scanned, so a short page does not imply the dataset is exhausted. Track the scanned
# window explicitly and stop only when the server returns no items at all.
scanned_items = 0

# We can't rely on DatasetItemsPage.total because that is updated with a delay,
# so if you try to read the dataset items right after a run finishes, you could miss some.
# Instead, we just read and read until we reach the limit, or until there are no more items to read.
while not should_finish:
while True:
effective_limit = cache_size
if limit is not None:
if read_items == limit:
break
effective_limit = min(cache_size, limit - read_items)

current_items_page = self.list_items(
offset=offset + read_items,
offset=offset + scanned_items,
limit=effective_limit,
clean=clean,
desc=desc,
Expand All @@ -298,10 +301,11 @@ def iterate_items(
yield from current_items_page.items

current_page_item_count = len(current_items_page.items)
read_items += current_page_item_count
if current_page_item_count == 0:
break

if current_page_item_count < cache_size:
should_finish = True
read_items += current_page_item_count
scanned_items += effective_limit

def download_items(
self,
Expand Down Expand Up @@ -945,21 +949,24 @@ async def iterate_items(
"""
cache_size = 1000

should_finish = False
read_items = 0
# Server-side filters (skip_empty, skip_hidden, clean) drop items after the [offset, offset+limit)
# window is scanned, so a short page does not imply the dataset is exhausted. Track the scanned
# window explicitly and stop only when the server returns no items at all.
scanned_items = 0

# We can't rely on DatasetItemsPage.total because that is updated with a delay,
# so if you try to read the dataset items right after a run finishes, you could miss some.
# Instead, we just read and read until we reach the limit, or until there are no more items to read.
while not should_finish:
while True:
effective_limit = cache_size
if limit is not None:
if read_items == limit:
break
effective_limit = min(cache_size, limit - read_items)

current_items_page = await self.list_items(
offset=offset + read_items,
offset=offset + scanned_items,
limit=effective_limit,
clean=clean,
desc=desc,
Expand All @@ -976,10 +983,11 @@ async def iterate_items(
yield item

current_page_item_count = len(current_items_page.items)
read_items += current_page_item_count
if current_page_item_count == 0:
break

if current_page_item_count < cache_size:
should_finish = True
read_items += current_page_item_count
scanned_items += effective_limit

async def get_items_as_bytes(
self,
Expand Down
58 changes: 57 additions & 1 deletion tests/unit/test_dataset_list_items.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from typing import TYPE_CHECKING

import pytest
from werkzeug import Response
from werkzeug import Request, Response

from apify_client import ApifyClient, ApifyClientAsync

Expand Down Expand Up @@ -87,3 +87,59 @@ async def test_list_items_desc_true_async(httpserver: HTTPServer, desc_header_va
result = await client.dataset(DATASET_ID).list_items()

assert result.desc is True


def _make_filtered_pagination_handler(*, dataset_size: int) -> Callable:
    """Build a request handler that mimics server-side filtering during pagination.

    The handler scans the `[offset, offset + limit)` window and then drops every
    odd-indexed item, so each returned page holds roughly half of the scanned
    window — the situation that only `skip_empty`, `skip_hidden`, or `clean=True`
    can produce server-side.
    """

    def handler(request: Request) -> Response:
        page_offset = int(request.args.get('offset', '0'))
        page_limit = int(request.args.get('limit', '1000'))
        # The server never scans past the end of the dataset.
        window_end = min(page_offset + page_limit, dataset_size)
        surviving = [{'i': index} for index in range(page_offset, window_end) if index % 2 == 0]
        pagination_headers = {
            'x-apify-pagination-total': str(dataset_size),
            'x-apify-pagination-offset': str(page_offset),
            'x-apify-pagination-count': str(len(surviving)),
            'x-apify-pagination-limit': str(page_limit),
            'x-apify-pagination-desc': 'false',
            'content-type': 'application/json',
        }
        return Response(
            status=200,
            headers=pagination_headers,
            response=json.dumps(surviving),
        )

    return handler


def test_iterate_items_with_filter_does_not_terminate_early_sync(httpserver: HTTPServer) -> None:
    # A filtered page is shorter than the scanned window; iteration must keep
    # going until the server returns an empty page, not stop at the first short one.
    total_items = 2500
    httpserver.expect_request(ITEMS_PATH).respond_with_handler(
        _make_filtered_pagination_handler(dataset_size=total_items),
    )
    base_url = httpserver.url_for('/').removesuffix('/')

    dataset_client = ApifyClient(token='test-token', api_url=base_url).dataset(DATASET_ID)
    collected = list(dataset_client.iterate_items(skip_empty=True))

    assert collected == [{'i': i} for i in range(total_items) if i % 2 == 0]


async def test_iterate_items_with_filter_does_not_terminate_early_async(httpserver: HTTPServer) -> None:
    # Async mirror of the sync case: a short filtered page must not end iteration
    # early; only an empty page signals exhaustion.
    total_items = 2500
    httpserver.expect_request(ITEMS_PATH).respond_with_handler(
        _make_filtered_pagination_handler(dataset_size=total_items),
    )
    base_url = httpserver.url_for('/').removesuffix('/')

    dataset_client = ApifyClientAsync(token='test-token', api_url=base_url).dataset(DATASET_ID)
    collected = [item async for item in dataset_client.iterate_items(skip_empty=True)]

    assert collected == [{'i': i} for i in range(total_items) if i % 2 == 0]