feat: add parallel file copy for ratio task execution #475

Merged
JasonW404 merged 4 commits into main from feat/parallel-ratio-file-copy
Apr 29, 2026
Conversation

@JasonW404
Collaborator

Summary

  • Add dynamic concurrency calculation based on CPU and memory resources for ratio task file copying
  • Refactor handle_ratio_relations to use asyncio.Semaphore for parallel file execution
  • Use hard links for file copying (fallback to symlink/copy2 when hard link fails)
  • Add resource_utils.py module for system resource detection
  • Add 8 new configuration parameters for concurrency tuning with all-flash storage defaults
  • Add psutil dependency for accurate resource detection

Changes

New Files

  • runtime/datamate-python/app/module/shared/util/resource_utils.py - Resource-aware concurrency calculation utilities

Modified Files

  • runtime/datamate-python/app/core/config.py - Added 8 config parameters:

    • ratio_copy_min_concurrent (default: 8)
    • ratio_copy_max_concurrent (default: 128)
    • ratio_copy_cpu_factor (default: 4.0 for all-flash storage)
    • ratio_copy_memory_per_task_mb (default: 32)
    • ratio_copy_memory_reserve_ratio (default: 0.2)
    • ratio_copy_dynamic_concurrent (default: True)
    • ratio_copy_fixed_concurrent (default: 10)
  • runtime/datamate-python/app/module/ratio/service/ratio_task.py - New method handle_ratio_relations_parallel():

    • Collects all copy tasks upfront
    • Uses asyncio.Semaphore to limit concurrent operations
    • Batch database insert for efficiency
    • Hard links for fast file copying (O(1) instead of O(n))
  • runtime/datamate-python/pyproject.toml - Added psutil dependency
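
The hard-link-first copy strategy mentioned in the summary can be sketched as follows. This is a minimal illustration of the fallback chain (hard link → symlink → copy2), not the PR's exact implementation; the function name is hypothetical:

```python
import os
import shutil


def link_or_copy(src: str, dst: str) -> str:
    """Try a hard link first (O(1), no data copied), then fall back.

    Returns which strategy succeeded: "hardlink", "symlink", or "copy".
    """
    try:
        os.link(src, dst)           # same-filesystem hard link
        return "hardlink"
    except OSError:
        try:
            os.symlink(src, dst)    # symlink fallback (e.g. cross-filesystem)
            return "symlink"
        except OSError:
            shutil.copy2(src, dst)  # full copy preserving metadata
            return "copy"
```

A hard link succeeds only when source and destination live on the same filesystem, which is why the fallbacks are needed.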

Performance Impact

Concurrent file copying significantly reduces execution time for ratio tasks with large files, especially on all-flash storage systems. For example, on a 20-core system with all-flash storage:

  • Previous: Sequential file copy (1 file at a time)
  • Now: Up to 80 concurrent operations (20 cores × 4.0 factor)
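
As a check on the arithmetic above (assuming memory is not the limiting factor, so only the CPU-based bound and the clamp apply), the calculation works out to:

```python
def clamp_concurrency(cpu_count: int, cpu_factor: float,
                      min_concurrent: int = 8, max_concurrent: int = 128) -> int:
    # concurrency = cores × factor, clamped to [min_concurrent, max_concurrent]
    return max(min_concurrent, min(max_concurrent, int(cpu_count * cpu_factor)))


print(clamp_concurrency(20, 4.0))  # 20 × 4.0 = 80, within [8, 128]
```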

Test Plan

  1. Run ratio task with multiple source datasets
  2. Verify files are correctly copied to target dataset
  3. Check concurrency calculation logs for correct values
  4. Test with different file sizes and counts

Copilot AI review requested due to automatic review settings April 29, 2026 03:39
Copilot AI left a comment

Pull request overview

This PR improves ratio task execution performance by introducing resource-aware concurrency and parallelizing file copy operations, with a new utility module for CPU/memory detection and new settings to tune concurrency behavior.

Changes:

  • Add resource-aware concurrency calculation utilities (CPU/memory-based) and log system resource info during ratio task execution.
  • Refactor ratio task file copying to run in parallel using asyncio.Semaphore, with hard-link-first copy strategy and batched DB inserts.
  • Add psutil dependency and new configuration knobs for concurrency tuning.

Reviewed changes

Copilot reviewed 4 out of 4 changed files in this pull request and generated 7 comments.

  • runtime/datamate-python/pyproject.toml - Adds psutil dependency to support accurate CPU/memory detection.
  • runtime/datamate-python/app/module/shared/util/resource_utils.py - New module to compute optimal concurrency from CPU/memory and expose system resource info.
  • runtime/datamate-python/app/module/ratio/service/ratio_task.py - Adds parallel copy implementation for ratio tasks and uses dynamic concurrency calculation.
  • runtime/datamate-python/app/core/config.py - Introduces new settings for ratio-copy concurrency tuning (min/max/factors/memory and dynamic vs fixed).


Comment on lines +171 to +196
```python
dst_prefix = f"/dataset/{target_ds.id}/"
file_name = RatioTaskService.get_new_file_name(dst_prefix, existing_paths, file)
new_path = dst_prefix + file_name

file_record = DatasetFiles(
    dataset_id=target_ds.id,
    file_name=file_name,
    file_path=new_path,
    file_type=file.file_type,
    file_size=file.file_size,
    check_sum=file.check_sum,
    tags=file.tags,
    tags_updated_at=datetime.now(),
    dataset_filemetadata=file.dataset_filemetadata,
    status="ACTIVE",
)

all_copy_tasks.append((file_record, file.file_path, new_path))
existing_paths.add(new_path)
source_paths.add(file.file_path)

if not all_copy_tasks:
    return 0, 0

dst_dir = f"/dataset/{target_ds.id}/"
await asyncio.to_thread(os.makedirs, dst_dir, exist_ok=True)
```
Copilot AI Apr 29, 2026

New code hard-codes the dataset path prefix as "/dataset" when constructing dst_prefix/dst_dir. The codebase already has settings.dm_file_path_prefix for this purpose; hard-coding here makes the ratio task ignore that configuration and increases drift if the prefix ever changes. Consider building paths from settings.dm_file_path_prefix (or a shared helper) instead of embedding the literal string in the new parallel implementation.
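
One way to address this, sketched under the assumption that settings.dm_file_path_prefix holds the base path (the helper name below is hypothetical, not from the PR):

```python
from pathlib import Path


def dataset_dir(file_path_prefix: str, dataset_id) -> str:
    """Build the per-dataset directory from the configured prefix
    instead of a hard-coded "/dataset" literal."""
    return str(Path(file_path_prefix) / str(dataset_id))
```

Call sites would then use something like `dataset_dir(settings.dm_file_path_prefix, target_ds.id)`, so a prefix change needs only a configuration update.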

Copilot uses AI. Check for mistakes.
Comment on lines +56 to +101
```python
def calculate_optimal_concurrent(
    min_concurrent: int = 8,
    max_concurrent: int = 128,
    cpu_factor: float = 4.0,
    memory_per_task_mb: int = 32,
    memory_reserve_ratio: float = 0.2,
) -> int:
    """Calculate the optimal concurrency from system resources.

    Calculation logic:
    1. CPU-based: concurrency = CPU core count × cpu_factor
    2. Memory-based: available memory × (1 - reserve_ratio) ÷ memory_per_task
    3. Take the minimum of the two, clamped to [min_concurrent, max_concurrent]

    Args:
        min_concurrent: Lower bound on concurrency
        max_concurrent: Upper bound on concurrency
        cpu_factor: Per-core concurrency factor (4.0 recommended for all-flash storage)
        memory_per_task_mb: Estimated memory per task (MB)
        memory_reserve_ratio: Fraction of memory held in reserve

    Returns:
        The computed optimal concurrency
    """
    # Get the logical CPU core count
    cpu_count = get_cpu_count(logical=True)

    # CPU-based concurrency
    cpu_based_concurrent = int(cpu_count * cpu_factor)

    # Get available memory
    available_memory_gb = get_available_memory_gb()

    # Memory usable for concurrent tasks (after subtracting the reserve)
    usable_memory_gb = available_memory_gb * (1 - memory_reserve_ratio)
    usable_memory_mb = usable_memory_gb * 1024

    # Memory-based concurrency cap
    memory_based_concurrent = int(usable_memory_mb / memory_per_task_mb)

    # Take the smaller of the two (avoid exhausting memory)
    calculated_concurrent = min(cpu_based_concurrent, memory_based_concurrent)

    # Clamp to the configured range
    final_concurrent = max(min_concurrent, min(max_concurrent, calculated_concurrent))
```

Copilot AI Apr 29, 2026

calculate_optimal_concurrent can raise ZeroDivisionError if memory_per_task_mb is configured as 0, and invalid inputs (e.g. memory_reserve_ratio outside [0,1), or min_concurrent > max_concurrent) can yield surprising results. Add input validation/clamping (and ensure the function always returns at least 1) so misconfiguration fails fast with a clear error or safely degrades.
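
A hedged sketch of the kind of fail-fast validation this comment asks for (the function name and exact checks are illustrative, not from the PR):

```python
def validate_concurrency_config(
    min_concurrent: int,
    max_concurrent: int,
    memory_per_task_mb: int,
    memory_reserve_ratio: float,
) -> None:
    """Reject misconfiguration before any division or clamping happens."""
    if memory_per_task_mb <= 0:
        raise ValueError(f"memory_per_task_mb must be > 0, got {memory_per_task_mb}")
    if not 0 <= memory_reserve_ratio < 1:
        raise ValueError(
            f"memory_reserve_ratio must be in [0, 1), got {memory_reserve_ratio}"
        )
    if min_concurrent < 1 or min_concurrent > max_concurrent:
        raise ValueError(
            f"require 1 <= min_concurrent <= max_concurrent, "
            f"got {min_concurrent}, {max_concurrent}"
        )
```

Calling this at the top of calculate_optimal_concurrent would surface bad settings immediately instead of as a ZeroDivisionError or a nonsensical concurrency value.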

```python
"""
import os
import logging
from typing import Optional
```
Copilot AI Apr 29, 2026

Unused import: Optional is imported but not referenced anywhere in this module. Please remove it to keep the module clean (and to avoid failing stricter linters if/when enabled).

Suggested change (remove this line):

```python
from typing import Optional
```

Comment on lines +87 to +109
```python
# ==================== Ratio task parallel-copy settings ====================
# Dynamic concurrency parameters (defaults for high-performance all-flash storage)

# Lower bound on concurrency
ratio_copy_min_concurrent: int = 8

# Upper bound on concurrency (prevents resource exhaustion)
ratio_copy_max_concurrent: int = 128

# Per-CPU-core concurrency factor (4.0 recommended for all-flash storage)
ratio_copy_cpu_factor: float = 4.0

# Estimated memory per concurrent task (MB)
ratio_copy_memory_per_task_mb: int = 32

# Fraction of memory held in reserve for other processes
ratio_copy_memory_reserve_ratio: float = 0.2

# Whether to compute concurrency dynamically (False uses the fixed value)
ratio_copy_dynamic_concurrent: bool = True

# Fixed concurrency (used when dynamic_concurrent=False)
ratio_copy_fixed_concurrent: int = 10
```
Copilot AI Apr 29, 2026

PR description says “Add 8 new configuration parameters”, but this diff adds 7 (ratio_copy_min_concurrent, ratio_copy_max_concurrent, ratio_copy_cpu_factor, ratio_copy_memory_per_task_mb, ratio_copy_memory_reserve_ratio, ratio_copy_dynamic_concurrent, ratio_copy_fixed_concurrent). Please reconcile the PR description with the actual change (or add the missing config if something was intended).

Comment on lines +195 to +213
```python
dst_dir = f"/dataset/{target_ds.id}/"
await asyncio.to_thread(os.makedirs, dst_dir, exist_ok=True)

semaphore = asyncio.Semaphore(max_concurrent)
successful_records: List[DatasetFiles] = []
added_count = 0
added_size = 0

async def copy_with_semaphore(
    file_record: DatasetFiles,
    src_path: str,
    dst_path: str
) -> Tuple[bool, DatasetFiles]:
    async with semaphore:
        try:
            file_dst_dir = os.path.dirname(dst_path)
            if file_dst_dir != dst_dir:
                await asyncio.to_thread(os.makedirs, file_dst_dir, exist_ok=True)
```

Copilot AI Apr 29, 2026

dst_dir includes a trailing slash (e.g. "/dataset//"), but os.path.dirname(dst_path) returns "/dataset/". This makes if file_dst_dir != dst_dir always true, causing an extra os.makedirs(...) call for every file copy. Normalize paths (e.g., strip trailing slash or compare Path(dst_dir)/Path(file_dst_dir)) and avoid per-file mkdir when all files share the same target directory.
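
The mismatch is easy to reproduce, and `os.path.normpath` removes it; a small sketch (the literal paths are examples only):

```python
import os

dst_dir = "/dataset/42/"                  # built with a trailing slash, as above
dst_path = "/dataset/42/file.bin"
file_dst_dir = os.path.dirname(dst_path)  # "/dataset/42" -- no trailing slash

print(file_dst_dir != dst_dir)            # True: spurious mismatch, extra mkdir per file
print(os.path.normpath(file_dst_dir) == os.path.normpath(dst_dir))  # True after normalizing
```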


```python
dst_dir = f"/dataset/{target_ds.id}/"
await asyncio.to_thread(os.makedirs, dst_dir, exist_ok=True)
```

Copilot AI Apr 29, 2026

asyncio.Semaphore(max_concurrent) will deadlock if max_concurrent is 0 (tasks will wait forever) and may error for negative values. Since max_concurrent comes from config, add validation/clamping (e.g., ensure it’s at least 1) either in get_concurrent_for_ratio_copy/calculate_optimal_concurrent or immediately before constructing the semaphore.

Suggested change

```python
original_max_concurrent = max_concurrent
try:
    max_concurrent = max(1, int(max_concurrent))
except (TypeError, ValueError):
    max_concurrent = 1
if max_concurrent != original_max_concurrent:
    logger.warning(
        f"Invalid max_concurrent={original_max_concurrent}; using {max_concurrent} instead"
    )
```

Comment on lines +227 to +246
```python
tasks = [copy_with_semaphore(rec, src, dst) for rec, src, dst in all_copy_tasks]
results = await asyncio.gather(*tasks, return_exceptions=True)

for i, result in enumerate(results):
    if isinstance(result, Exception):
        logger.error(f"Copy task {i} raised exception: {result}")
        continue
    success, file_record = result
    if success:
        added_count += 1
        added_size += int(file_record.file_size or 0)
        successful_records.append(file_record)

if successful_records:
    session.add_all(successful_records)
    await session.flush()

logger.info(
    f"Parallel copy completed: {added_count}/{len(all_copy_tasks)} files, "
    f"{added_size} bytes, {len(results) - added_count} failures"
```
Copilot AI Apr 29, 2026

tasks = [copy_with_semaphore(...) for ...] followed by asyncio.gather(*tasks) creates all tasks up front and retains a full results list. For large datasets this can cause high memory usage and event-loop overhead even though actual copy concurrency is limited by the semaphore. Consider processing copies in bounded batches or using a worker-queue pattern (fixed number of worker tasks consuming from an asyncio.Queue) so outstanding tasks are also bounded.

Suggested change (replace the gather-based block with a bounded worker queue):

```python
copy_queue: asyncio.Queue = asyncio.Queue()
for rec, src, dst in all_copy_tasks:
    await copy_queue.put((rec, src, dst))

async def worker(worker_index: int) -> None:
    nonlocal added_count, added_size
    while True:
        item = await copy_queue.get()
        if item is None:
            copy_queue.task_done()
            break
        file_record, src_path, dst_path = item
        try:
            success, copied_file_record = await copy_with_semaphore(
                file_record, src_path, dst_path
            )
            if success:
                added_count += 1
                added_size += int(copied_file_record.file_size or 0)
                successful_records.append(copied_file_record)
        except Exception as e:
            logger.error(f"Copy worker {worker_index} raised exception: {e}")
        finally:
            copy_queue.task_done()

worker_count = max(1, min(max_concurrent, len(all_copy_tasks)))
workers = [
    asyncio.create_task(worker(i))
    for i in range(worker_count)
]
await copy_queue.join()
for _ in range(worker_count):
    await copy_queue.put(None)
await asyncio.gather(*workers, return_exceptions=True)

if successful_records:
    session.add_all(successful_records)
    await session.flush()

logger.info(
    f"Parallel copy completed: {added_count}/{len(all_copy_tasks)} files, "
    f"{added_size} bytes, {len(all_copy_tasks) - added_count} failures"
```

jason added 3 commits April 29, 2026 11:47
The poetry.lock file contains psutil 7.2.2 which was outside the
previous constraint >=6.0.0,<7.0.0. This mismatch caused CI build
failure with 'pyproject.toml changed significantly' error.

Update constraint to >=6.0.0,<8.0.0 to match existing lock file.
@JasonW404 JasonW404 merged commit 7f61ca5 into main Apr 29, 2026
8 checks passed