feat: add parallel file copy for ratio task execution #475
Conversation
- Add dynamic concurrency calculation based on CPU and memory resources
- Refactor handle_ratio_relations to use asyncio.Semaphore for parallel execution
- Use hard links for file copying, with fallback to symlink/copy2 (see the sketch below)
- Add resource_utils.py for system resource detection
- Add 8 new config parameters for concurrency tuning (all-flash defaults)
- Add psutil dependency for accurate resource detection

Performance improvement: concurrent file copying significantly reduces execution time for ratio tasks with large files, especially on all-flash storage systems.
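The hard-link-first copy strategy mentioned above (falling back to symlink, then copy2) is not visible in the review hunks below; a minimal sketch of that approach, with a hypothetical helper name, might look like this:

```python
import logging
import os
import shutil

logger = logging.getLogger(__name__)

def copy_file_hardlink_first(src_path: str, dst_path: str) -> None:
    """Hypothetical helper: link the file if possible, otherwise symlink, otherwise copy."""
    try:
        os.link(src_path, dst_path)  # same filesystem: no data is duplicated
        return
    except OSError as e:
        logger.debug(f"Hard link failed for {src_path}: {e}; trying symlink")
    try:
        os.symlink(src_path, dst_path)  # fallback when a hard link is not allowed
        return
    except OSError as e:
        logger.debug(f"Symlink failed for {src_path}: {e}; falling back to copy2")
    shutil.copy2(src_path, dst_path)  # full copy preserving metadata
```

Hard links avoid duplicating data when source and destination share a filesystem, so the fallbacks only run across filesystem boundaries or on permission errors.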
Pull request overview
This PR improves ratio task execution performance by introducing resource-aware concurrency and parallelizing file copy operations, with a new utility module for CPU/memory detection and new settings to tune concurrency behavior.
Changes:
- Add resource-aware concurrency calculation utilities (CPU/memory-based) and log system resource info during ratio task execution.
- Refactor ratio task file copying to run in parallel using asyncio.Semaphore, with a hard-link-first copy strategy and batched DB inserts.
- Add psutil dependency and new configuration knobs for concurrency tuning.
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 7 comments.
| File | Description |
|---|---|
| runtime/datamate-python/pyproject.toml | Adds psutil dependency to support accurate CPU/memory detection. |
| runtime/datamate-python/app/module/shared/util/resource_utils.py | New module to compute optimal concurrency from CPU/memory and expose system resource info. |
| runtime/datamate-python/app/module/ratio/service/ratio_task.py | Adds parallel copy implementation for ratio tasks and uses dynamic concurrency calculation. |
| runtime/datamate-python/app/core/config.py | Introduces new settings for ratio-copy concurrency tuning (min/max/factors/memory and dynamic vs fixed). |
```python
dst_prefix = f"/dataset/{target_ds.id}/"
file_name = RatioTaskService.get_new_file_name(dst_prefix, existing_paths, file)
new_path = dst_prefix + file_name

file_record = DatasetFiles(
    dataset_id=target_ds.id,
    file_name=file_name,
    file_path=new_path,
    file_type=file.file_type,
    file_size=file.file_size,
    check_sum=file.check_sum,
    tags=file.tags,
    tags_updated_at=datetime.now(),
    dataset_filemetadata=file.dataset_filemetadata,
    status="ACTIVE",
)

all_copy_tasks.append((file_record, file.file_path, new_path))
existing_paths.add(new_path)
source_paths.add(file.file_path)

if not all_copy_tasks:
    return 0, 0

dst_dir = f"/dataset/{target_ds.id}/"
await asyncio.to_thread(os.makedirs, dst_dir, exist_ok=True)
```
New code hard-codes the dataset path prefix as "/dataset" when constructing dst_prefix/dst_dir. The codebase already has settings.dm_file_path_prefix for this purpose; hard-coding here makes the ratio task ignore that configuration and increases drift if the prefix ever changes. Consider building paths from settings.dm_file_path_prefix (or a shared helper) instead of embedding the literal string in the new parallel implementation.
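A minimal sketch of what this suggests, assuming the settings object exposes dm_file_path_prefix as described; the helper name and import path below are hypothetical:

```python
from app.core.config import settings  # assumed import path for the settings instance

def build_dataset_prefix(dataset_id: int) -> str:
    """Hypothetical helper: derive the per-dataset directory from configuration
    instead of hard-coding the "/dataset" literal."""
    return f"{settings.dm_file_path_prefix.rstrip('/')}/{dataset_id}/"

# usage inside the parallel copy path:
# dst_prefix = build_dataset_prefix(target_ds.id)
# dst_dir = dst_prefix
```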
```python
def calculate_optimal_concurrent(
    min_concurrent: int = 8,
    max_concurrent: int = 128,
    cpu_factor: float = 4.0,
    memory_per_task_mb: int = 32,
    memory_reserve_ratio: float = 0.2,
) -> int:
    """Calculate the optimal concurrency based on system resources.

    Calculation logic:
    1. CPU-based: concurrency = CPU core count x cpu_factor
    2. Memory-based: available memory x (1 - reserve_ratio) / memory_per_task
    3. Take the minimum of the two, clamped to [min_concurrent, max_concurrent]

    Args:
        min_concurrent: Lower bound on concurrency
        max_concurrent: Upper bound on concurrency
        cpu_factor: Concurrency per CPU core (4.0 recommended for all-flash storage)
        memory_per_task_mb: Estimated memory usage per task (MB)
        memory_reserve_ratio: Fraction of memory reserved for other processes

    Returns:
        The calculated optimal concurrency
    """
    # Get the CPU core count
    cpu_count = get_cpu_count(logical=True)

    # CPU-based concurrency
    cpu_based_concurrent = int(cpu_count * cpu_factor)

    # Get the available memory
    available_memory_gb = get_available_memory_gb()

    # Memory usable for concurrent tasks (after subtracting the reserved share)
    usable_memory_gb = available_memory_gb * (1 - memory_reserve_ratio)
    usable_memory_mb = usable_memory_gb * 1024

    # Memory-based maximum concurrency
    memory_based_concurrent = int(usable_memory_mb / memory_per_task_mb)

    # Take the minimum of the two (avoid exhausting memory)
    calculated_concurrent = min(cpu_based_concurrent, memory_based_concurrent)

    # Clamp to the allowed range
    final_concurrent = max(min_concurrent, min(max_concurrent, calculated_concurrent))
```
calculate_optimal_concurrent can raise ZeroDivisionError if memory_per_task_mb is configured as 0, and invalid inputs (e.g. memory_reserve_ratio outside [0,1), or min_concurrent > max_concurrent) can yield surprising results. Add input validation/clamping (and ensure the function always returns at least 1) so misconfiguration fails fast with a clear error or safely degrades.
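A minimal validation sketch along these lines; the guard function below is hypothetical and the exact error handling is an assumption:

```python
def _validate_concurrency_params(
    min_concurrent: int,
    max_concurrent: int,
    memory_per_task_mb: int,
    memory_reserve_ratio: float,
) -> None:
    """Hypothetical guard clause: fail fast on misconfigured concurrency inputs."""
    if memory_per_task_mb <= 0:
        raise ValueError(f"memory_per_task_mb must be > 0, got {memory_per_task_mb}")
    if not 0 <= memory_reserve_ratio < 1:
        raise ValueError(f"memory_reserve_ratio must be in [0, 1), got {memory_reserve_ratio}")
    if min_concurrent < 1 or max_concurrent < 1:
        raise ValueError("min_concurrent and max_concurrent must be >= 1")
    if min_concurrent > max_concurrent:
        raise ValueError(
            f"min_concurrent ({min_concurrent}) must not exceed max_concurrent ({max_concurrent})"
        )
```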
| """ | ||
| import os | ||
| import logging | ||
| from typing import Optional |
Unused import: Optional is imported but not referenced anywhere in this module. Please remove it to keep the module clean (and to avoid failing stricter linters if/when enabled).
```python
from typing import Optional
```
```python
# ==================== Ratio task parallel copy configuration ====================
# Dynamic concurrency calculation parameters (defaults for high-performance all-flash storage)

# Concurrency lower bound (minimum number of concurrent tasks)
ratio_copy_min_concurrent: int = 8

# Concurrency upper bound (maximum number of concurrent tasks, prevents resource exhaustion)
ratio_copy_max_concurrent: int = 128

# CPU core factor (concurrency contributed per core, 4.0 recommended for all-flash storage)
ratio_copy_cpu_factor: float = 4.0

# Estimated memory usage per concurrent task (MB)
ratio_copy_memory_per_task_mb: int = 32

# Memory safety reserve ratio (share kept for other processes)
ratio_copy_memory_reserve_ratio: float = 0.2

# Whether to enable dynamic calculation (False uses the fixed value)
ratio_copy_dynamic_concurrent: bool = True

# Fixed concurrency (used when dynamic_concurrent=False)
ratio_copy_fixed_concurrent: int = 10
```
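For context, a sketch of how these settings could drive concurrency selection. The function name get_concurrent_for_ratio_copy is taken from the review comments below; its body here is an assumption, not the PR's actual implementation:

```python
from app.core.config import settings  # assumed import path for the settings instance
from app.module.shared.util.resource_utils import calculate_optimal_concurrent

def get_concurrent_for_ratio_copy() -> int:
    """Sketch only: choose between the fixed and the dynamically calculated concurrency."""
    if not settings.ratio_copy_dynamic_concurrent:
        # Fixed mode: use the configured constant, clamped to at least 1
        return max(1, settings.ratio_copy_fixed_concurrent)
    # Dynamic mode: derive concurrency from CPU count and memory headroom
    return calculate_optimal_concurrent(
        min_concurrent=settings.ratio_copy_min_concurrent,
        max_concurrent=settings.ratio_copy_max_concurrent,
        cpu_factor=settings.ratio_copy_cpu_factor,
        memory_per_task_mb=settings.ratio_copy_memory_per_task_mb,
        memory_reserve_ratio=settings.ratio_copy_memory_reserve_ratio,
    )
```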
PR description says “Add 8 new configuration parameters”, but this diff adds 7 (ratio_copy_min_concurrent, ratio_copy_max_concurrent, ratio_copy_cpu_factor, ratio_copy_memory_per_task_mb, ratio_copy_memory_reserve_ratio, ratio_copy_dynamic_concurrent, ratio_copy_fixed_concurrent). Please reconcile the PR description with the actual change (or add the missing config if something was intended).
```python
dst_dir = f"/dataset/{target_ds.id}/"
await asyncio.to_thread(os.makedirs, dst_dir, exist_ok=True)

semaphore = asyncio.Semaphore(max_concurrent)
successful_records: List[DatasetFiles] = []
added_count = 0
added_size = 0

async def copy_with_semaphore(
    file_record: DatasetFiles,
    src_path: str,
    dst_path: str
) -> Tuple[bool, DatasetFiles]:
    async with semaphore:
        try:
            file_dst_dir = os.path.dirname(dst_path)
            if file_dst_dir != dst_dir:
                await asyncio.to_thread(os.makedirs, file_dst_dir, exist_ok=True)
```
dst_dir includes a trailing slash (e.g. "/dataset/&lt;id&gt;/"), while os.path.dirname(dst_path) returns the directory without one (e.g. "/dataset/&lt;id&gt;"), so the check if file_dst_dir != dst_dir is always true and triggers an extra os.makedirs(...) call for every file copy. Normalize the paths (e.g., strip the trailing slash or compare Path(dst_dir) with Path(file_dst_dir)) and avoid per-file mkdir when all files share the same target directory.
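A minimal sketch of that normalization, assuming os.path.normpath is acceptable here; the helper name is hypothetical:

```python
import asyncio
import os

async def ensure_parent_dir(dst_path: str, dst_dir: str) -> None:
    """Create the parent directory of dst_path only when it differs from the shared
    target directory; normpath makes the comparison immune to trailing slashes."""
    normalized_dst_dir = os.path.normpath(dst_dir)              # "/dataset/42/" -> "/dataset/42"
    file_dst_dir = os.path.normpath(os.path.dirname(dst_path))
    if file_dst_dir != normalized_dst_dir:
        await asyncio.to_thread(os.makedirs, file_dst_dir, exist_ok=True)
```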
```python
dst_dir = f"/dataset/{target_ds.id}/"
await asyncio.to_thread(os.makedirs, dst_dir, exist_ok=True)
```
asyncio.Semaphore(max_concurrent) will deadlock if max_concurrent is 0 (tasks will wait forever) and may error for negative values. Since max_concurrent comes from config, add validation/clamping (e.g., ensure it’s at least 1) either in get_concurrent_for_ratio_copy/calculate_optimal_concurrent or immediately before constructing the semaphore.
```python
original_max_concurrent = max_concurrent
try:
    max_concurrent = max(1, int(max_concurrent))
except (TypeError, ValueError):
    max_concurrent = 1
if max_concurrent != original_max_concurrent:
    logger.warning(
        f"Invalid max_concurrent={original_max_concurrent}; using {max_concurrent} instead"
    )
```
```python
tasks = [copy_with_semaphore(rec, src, dst) for rec, src, dst in all_copy_tasks]
results = await asyncio.gather(*tasks, return_exceptions=True)

for i, result in enumerate(results):
    if isinstance(result, Exception):
        logger.error(f"Copy task {i} raised exception: {result}")
        continue
    success, file_record = result
    if success:
        added_count += 1
        added_size += int(file_record.file_size or 0)
        successful_records.append(file_record)

if successful_records:
    session.add_all(successful_records)
    await session.flush()

logger.info(
    f"Parallel copy completed: {added_count}/{len(all_copy_tasks)} files, "
    f"{added_size} bytes, {len(results) - added_count} failures"
)
```
tasks = [copy_with_semaphore(...) for ...] followed by asyncio.gather(*tasks) creates all tasks up front and retains a full results list. For large datasets this can cause high memory usage and event-loop overhead even though actual copy concurrency is limited by the semaphore. Consider processing copies in bounded batches or using a worker-queue pattern (fixed number of worker tasks consuming from an asyncio.Queue) so outstanding tasks are also bounded.
```python
copy_queue: asyncio.Queue = asyncio.Queue()
for rec, src, dst in all_copy_tasks:
    await copy_queue.put((rec, src, dst))

async def worker(worker_index: int) -> None:
    nonlocal added_count, added_size
    while True:
        item = await copy_queue.get()
        if item is None:
            copy_queue.task_done()
            break
        file_record, src_path, dst_path = item
        try:
            success, copied_file_record = await copy_with_semaphore(
                file_record, src_path, dst_path
            )
            if success:
                added_count += 1
                added_size += int(copied_file_record.file_size or 0)
                successful_records.append(copied_file_record)
        except Exception as e:
            logger.error(f"Copy worker {worker_index} raised exception: {e}")
        finally:
            copy_queue.task_done()

worker_count = max(1, min(max_concurrent, len(all_copy_tasks)))
workers = [
    asyncio.create_task(worker(i))
    for i in range(worker_count)
]
await copy_queue.join()
for _ in range(worker_count):
    await copy_queue.put(None)
await asyncio.gather(*workers, return_exceptions=True)

if successful_records:
    session.add_all(successful_records)
    await session.flush()

logger.info(
    f"Parallel copy completed: {added_count}/{len(all_copy_tasks)} files, "
    f"{added_size} bytes, {len(all_copy_tasks) - added_count} failures"
)
```
The poetry.lock file contains psutil 7.2.2, which falls outside the previous constraint >=6.0.0,<7.0.0. The mismatch caused a CI build failure with a 'pyproject.toml changed significantly' error. Update the constraint to >=6.0.0,<8.0.0 to match the existing lock file.
Summary
- handle_ratio_relations refactored to use asyncio.Semaphore for parallel file execution
- resource_utils.py module for system resource detection
- psutil dependency for accurate resource detection

Changes
New Files
- runtime/datamate-python/app/module/shared/util/resource_utils.py - Resource-aware concurrency calculation utilities

Modified Files
- runtime/datamate-python/app/core/config.py - Added 8 config parameters:
  - ratio_copy_min_concurrent (default: 8)
  - ratio_copy_max_concurrent (default: 128)
  - ratio_copy_cpu_factor (default: 4.0 for all-flash storage)
  - ratio_copy_memory_per_task_mb (default: 32)
  - ratio_copy_memory_reserve_ratio (default: 0.2)
  - ratio_copy_dynamic_concurrent (default: True)
  - ratio_copy_fixed_concurrent (default: 10)
- runtime/datamate-python/app/module/ratio/service/ratio_task.py - New method handle_ratio_relations_parallel() that uses asyncio.Semaphore to limit concurrent operations
- runtime/datamate-python/pyproject.toml - Added psutil dependency

Performance Impact
Concurrent file copying significantly reduces execution time for ratio tasks with large files, especially on all-flash storage systems. For example, on a 20-core system with all-flash storage:
Test Plan