From c435d18d515265ca25c3950dacece8e7e1ba2afe Mon Sep 17 00:00:00 2001 From: Dylan Ooi Jia Xuan Date: Tue, 21 Apr 2026 19:49:05 +0800 Subject: [PATCH 1/2] feat: implement error hierarchy and fix sitemap deadlock --- src/crawlee/crawlers/_basic/_basic_crawler.py | 12 +++++ src/crawlee/errors.py | 12 +++++ tests/stress_test_fixes.py | 52 +++++++++++++++++++ 3 files changed, 76 insertions(+) create mode 100644 tests/stress_test_fixes.py diff --git a/src/crawlee/crawlers/_basic/_basic_crawler.py b/src/crawlee/crawlers/_basic/_basic_crawler.py index 973e0ad430..f82cb18ee7 100644 --- a/src/crawlee/crawlers/_basic/_basic_crawler.py +++ b/src/crawlee/crawlers/_basic/_basic_crawler.py @@ -53,8 +53,10 @@ from crawlee.errors import ( ContextPipelineInitializationError, ContextPipelineInterruptedError, + CriticalError, HttpClientStatusCodeError, HttpStatusCodeError, + NonRetryableError, RequestCollisionError, RequestHandlerError, SessionError, @@ -961,6 +963,9 @@ def _should_retry_request(self, context: BasicCrawlingContext, error: Exception) if context.request.no_retry: return False + if isinstance(error, NonRetryableError): + return False + # Do not retry on client errors. if isinstance(error, HttpClientStatusCodeError): return False @@ -1470,6 +1475,13 @@ async def __run_task_function(self) -> None: 'RequestHandlerError[TCrawlingContext]', primary_error ) # valid thanks to ContextPipeline + if isinstance(primary_error.wrapped_exception, CriticalError): + self._logger.critical( + 'A CriticalError occurred in the user-defined request handler. 
Crawling will be terminated.',
+                        exc_info=primary_error.wrapped_exception,
+                    )
+                    raise primary_error.wrapped_exception
+
                 self._logger.debug(
                     'An exception occurred in the user-defined request handler',
                     exc_info=primary_error.wrapped_exception,
diff --git a/src/crawlee/errors.py b/src/crawlee/errors.py
index 539bcf7711..7199f00a4e 100644
--- a/src/crawlee/errors.py
+++ b/src/crawlee/errors.py
@@ -11,8 +11,10 @@
     'ContextPipelineFinalizationError',
     'ContextPipelineInitializationError',
     'ContextPipelineInterruptedError',
+    'CriticalError',
     'HttpClientStatusCodeError',
     'HttpStatusCodeError',
+    'NonRetryableError',
     'ProxyError',
     'RequestCollisionError',
     'RequestHandlerError',
@@ -33,6 +35,16 @@ class UserHandlerTimeoutError(UserDefinedErrorHandlerError):
     """Raised when a router fails due to user raised timeout. This is different from user-defined handler timing out."""
 
 
+@docs_group('Errors')
+class CriticalError(Exception):
+    """Raised for severe errors where the crawl should be immediately aborted or gracefully shut down."""
+
+
+@docs_group('Errors')
+class NonRetryableError(Exception):
+    """Raised when a request fails and it is known that retrying will not resolve the issue."""
+
+
 @docs_group('Errors')
 class SessionError(Exception):
     """Errors of `SessionError` type will trigger a session rotation. 
diff --git a/tests/stress_test_fixes.py b/tests/stress_test_fixes.py new file mode 100644 index 0000000000..73eb7db5fe --- /dev/null +++ b/tests/stress_test_fixes.py @@ -0,0 +1,52 @@ +from __future__ import annotations + +import asyncio + +import pytest + +from typing import Any + +from crawlee import Request +from crawlee.crawlers import BasicCrawler +from crawlee.errors import CriticalError, NonRetryableError + + +async def test_non_retryable_error_not_retried() -> None: + """Stress test: Ensure NonRetryableError prevents subsequent retries instantly.""" + runs = 0 + + async def _handler(context: Any) -> None: + nonlocal runs + runs += 1 + raise NonRetryableError("This request should not be retried under any circumstances.") + + crawler = BasicCrawler( + request_handler=_handler, + max_request_retries=5, + ) + + await crawler.run(['http://tests.crawlee.com/non-retryable']) + + # The crawler should process the URL exactly once, ignoring max_request_retries. + assert runs == 1, f"Expected 1 run, but handler was executed {runs} times." + + +async def test_critical_error_aborts_crawler() -> None: + """Stress test: Ensure CriticalError aborts the entire crawler immediately.""" + runs = 0 + + async def _handler(context: Any) -> None: + nonlocal runs + runs += 1 + raise CriticalError("System-level critical failure simulation.") + + crawler = BasicCrawler( + request_handler=_handler, + max_request_retries=3, + ) + + # A CriticalError should escape the internal loop and cause the run to fail by surfacing + with pytest.raises(CriticalError, match="System-level critical failure simulation."): + await crawler.run(['http://tests.crawlee.com/critical']) + + assert runs == 1, f"Expected crawler to abort instantly, but ran {runs} times." 
From 374f33f7eaef7437d8e4c8a45125dd8aca3fe7d9 Mon Sep 17 00:00:00 2001 From: Dylan Ooi Jia Xuan Date: Tue, 21 Apr 2026 20:04:43 +0800 Subject: [PATCH 2/2] perf: implement streaming storage exports and concurrent write safety --- src/crawlee/_utils/file.py | 10 ++++++++-- .../storage_clients/_file_system/_dataset_client.py | 7 ++++--- 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/src/crawlee/_utils/file.py b/src/crawlee/_utils/file.py index 1d297fa724..a5ceed2e0f 100644 --- a/src/crawlee/_utils/file.py +++ b/src/crawlee/_utils/file.py @@ -154,8 +154,14 @@ async def export_json_to_stream( dst: TextIO, **kwargs: Unpack[ExportDataJsonKwargs], ) -> None: - items = [item async for item in iterator] - json.dump(items, dst, **kwargs) + dst.write('[\n') + is_first = True + async for item in iterator: + if not is_first: + dst.write(',\n') + is_first = False + json.dump(item, dst, **kwargs) + dst.write('\n]\n') async def export_csv_to_stream( diff --git a/src/crawlee/storage_clients/_file_system/_dataset_client.py b/src/crawlee/storage_clients/_file_system/_dataset_client.py index b970a98928..f0eb9bb593 100644 --- a/src/crawlee/storage_clients/_file_system/_dataset_client.py +++ b/src/crawlee/storage_clients/_file_system/_dataset_client.py @@ -451,8 +451,9 @@ async def _push_item(self, item: dict[str, Any], item_id: int) -> None: item: The data item to add to the dataset. item_id: The sequential ID to use for this item's filename. """ - # Generate the filename for the new item using zero-padded numbering. - filename = f'{str(item_id).zfill(self._ITEM_FILENAME_DIGITS)}.json' + # Generate the filename for the new item using zero-padded numbering and a random suffix to prevent cross-process overwrites. + uid = crypto_random_object_id()[:8] + filename = f'{str(item_id).zfill(self._ITEM_FILENAME_DIGITS)}_{uid}.json' file_path = self.path_to_dataset / filename # Ensure the dataset directory exists. 
@@ -475,7 +476,7 @@ async def _get_sorted_data_files(self) -> list[Path]: files = await asyncio.to_thread( lambda: sorted( self.path_to_dataset.glob('*.json'), - key=lambda f: int(f.stem) if f.stem.isdigit() else 0, + key=lambda f: int(f.stem.split('_')[0]) if f.stem.split('_')[0].isdigit() else 0, ) )