perf(add_files): stream manifest entries for duplicate-files check #3287
Open
laichunpongben wants to merge 6 commits into apache:main
Conversation
ndrluis approved these changes on Apr 26, 2026
Fixes #3286.
The hot path today
What this actually does, per call:
- `inspect.data_files()` — for every manifest in the current snapshot, calls `_get_files_from_manifest` (`pyiceberg/table/inspect.py:548`), which for each `ManifestEntry` builds a Python dict with ~17 fields. The expensive ones:
  - `readable_metrics` — for every column in the table schema, decodes lower/upper bound bytes via `from_bytes` and packs the result into a per-column dict. This is the single biggest cost on wide tables.
  - `partition` — decodes the partition struct into a name → value dict.
  - `column_sizes`, `value_counts`, `null_value_counts`, `nan_value_counts`, `lower_bounds`, `upper_bounds` — each materialized as a Python dict per file.
- A `pyarrow.Table` is built per manifest.
- `pa.concat_tables` glues all manifests' Tables together.
- `.filter(expr)` applies an Arrow-compute `isin` over the concatenated Table.
- `.to_pylist()` converts back to Python dicts, from which only `file_path` is actually read.

For a backfill that calls `add_files` once per day on a growing table, per-call cost is O(snapshot file count), so cumulative cost is O(N²). After ~15 daily commits on a wide-schema table, dup-check time dominates: each call takes ~10–15 minutes vs seconds early on.

The workaround in #2132 / docs PR #2249 is `check_duplicate_files=False`, which trades away the idempotency guarantee that makes re-running after a partial failure safe.
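To make the shape concrete, the pre-change dup-check boils down to roughly the following sketch (not the verbatim pyiceberg source — the wrapper name `referenced_paths_old` is illustrative; `inspect.data_files()`, the Arrow-compute `isin` filter, and `.to_pylist()` are the real steps named above):

```python
import pyarrow.compute as pc

from pyiceberg.table import Table


def referenced_paths_old(tbl: Table, file_paths: list[str]) -> list[str]:
    """Sketch of the pre-change materialize-then-filter dup-check."""
    # data_files() builds a ~17-field dict (readable_metrics, decoded
    # partition, per-file count/bounds dicts, ...) for EVERY file in the
    # current snapshot, one pyarrow.Table per manifest, then concatenates.
    all_files = tbl.inspect.data_files()
    # Only now is the result narrowed down to the paths we are about to add.
    referenced = all_files.filter(pc.field("file_path").isin(file_paths))
    return [row["file_path"] for row in referenced.to_pylist()]
```

Everything before the `.filter(...)` is proportional to the snapshot's file count and column count, which is where the O(N²) backfill cost comes from.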
Benchmark — before vs after

`tests/benchmark/bench_add_files_dup_check.py` (added in this PR) runs 10 sequential `add_files(check_duplicate_files=True)` calls on an `InMemoryCatalog` table with a 30-column schema and 200 small parquet files per call, measuring wall-clock time and `tracemalloc` peak per call. Run on macOS arm64 / Python 3.11.

Before (upstream `main`): wall-clock climbs ~44% over the run; tracemalloc peak grows ~7.2×.

After (this PR): wall-clock stays flat at ~1 s; tracemalloc peak stays flat at ~6–8 MB. The growth disappears because the dup-check no longer materializes per-file dicts, pyarrow Tables, or `readable_metrics` — it just does set containment on `file_path` while streaming manifest entries.

This is a 10-batch run on a small, narrow workload. Real backfills with wider schemas (more columns × more row groups), more files per batch, and many more batches see the constant factor amplify; the production workload that motivated this PR was hitting ~10–15 minutes per call after 15 commits.
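The measurement methodology is roughly the following (a minimal sketch; the actual benchmark lives in `tests/benchmark/bench_add_files_dup_check.py`, and the `tbl` and pre-generated parquet batches here are assumed to come from setup code not shown):

```python
import time
import tracemalloc


def measure_batches(tbl, batches: list[list[str]]) -> list[tuple[float, int]]:
    """Run one add_files call per batch, recording wall-clock time and peak memory."""
    results = []
    for paths in batches:
        tracemalloc.start()
        start = time.perf_counter()
        tbl.add_files(paths, check_duplicate_files=True)
        wall = time.perf_counter() - start
        _, peak = tracemalloc.get_traced_memory()
        tracemalloc.stop()
        results.append((wall, peak))
    return results
```

On `main` the per-call wall-clock and peak grow with the batch index (the snapshot keeps accumulating files); with this PR both stay flat.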
What this PR does
Replace the materialize-then-filter with a streaming scan that reuses the existing `_open_manifest` helper (`pyiceberg/table/__init__.py:1918`) — the canonical "open a manifest, fetch entries with `discard_deleted=True`, apply data-file predicates" pattern already used by `DataScan.scan_plan_helper` (line 2050). Delete manifests are skipped at the top level (same shape as `_min_sequence_number`).

The loop body becomes a `set` containment check on `data_file.file_path`, scheduled via `executor.map` and flattened with `chain.from_iterable` — the same idiom as the existing scan path (a sketch follows below).

This is the same approach Spark's `add_files` action takes: predicate-based against the new paths only, no pre-scan of all data files.
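Roughly, the streaming check has this shape — a sketch rather than the exact diff, assuming `_open_manifest` keeps its current signature (`io`, `manifest`, `partition_filter`, `metrics_evaluator`); the helper name `referenced_paths_streaming` and the pass-through `lambda _: True` predicates are illustrative:

```python
from itertools import chain

from pyiceberg.io import FileIO
from pyiceberg.manifest import ManifestContent
from pyiceberg.table import Table, _open_manifest  # existing module-level helper reused by DataScan
from pyiceberg.utils.concurrent import ExecutorFactory


def referenced_paths_streaming(tbl: Table, file_paths: list[str]) -> set[str]:
    """Sketch of the streaming dup-check: set containment on file_path only."""
    new_paths = set(file_paths)
    snapshot = tbl.current_snapshot()
    if snapshot is None:
        return set()

    io: FileIO = tbl.io
    # Delete manifests are skipped at the top level; only DATA manifests matter.
    data_manifests = [m for m in snapshot.manifests(io) if m.content == ManifestContent.DATA]

    # Same executor.map + chain.from_iterable idiom as the existing scan path.
    executor = ExecutorFactory.get_or_create()
    entries = chain.from_iterable(
        executor.map(
            lambda manifest: _open_manifest(io, manifest, lambda _: True, lambda _: True),
            data_manifests,
        )
    )
    # No readable_metrics, no partition decode, no pyarrow Table — just paths.
    return {e.data_file.file_path for e in entries if e.data_file.file_path in new_paths}
```

The caller keeps the existing contract: if the returned set is non-empty, `add_files` raises the same `ValueError` with the same message as before.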
What this is and isn't

- It is not a reduction in manifest I/O: every manifest in the current snapshot is still read in full (via `fetch_manifest_entry`), but everything downstream of the read — `readable_metrics` computation, partition decode, per-file dict construction, pyarrow Table construction, `concat_tables`, `filter`, `to_pylist` — is gone. That post-processing was the bulk of the time, not the Avro read.
- It is not manifest pruning: that would need `file_path` lower/upper bounds at the `ManifestFile` level so most manifests can be pruned without opening — that's a spec extension and a follow-up.
Audited the change for any behavioral divergence from the old
inspect.data_files().filter(...)path:add_filessignature and exception message unchanged. Existing integration tests attests/integration/test_add_files.py:test_add_files_that_referenced_by_current_snapshot{,_with_check_duplicate_files_true,_with_check_duplicate_files_false}exercise the dup-check contract and assert the exact error string — both preserved verbatim.Table.add_files(pyiceberg/table/__init__.py:1491). No subclass overrides exist (e.g.CreateTableTransactiondoesn't redefine it).Transaction.upsert/append/overwrite,_FastAppendFiles,MergingSnapshotProducerdon't share the dup-check path.inspect.data_files()filtered per-entry onDataFileContent.DATA; new code filters atManifestContent.DATA. These are theoretically distinct but produce identical sets per the Iceberg spec — delete entries cannot live in DATA manifests.discard_deleted: both paths useTrue(fetch_manifest_entrydefaults toTrue;_open_manifestpasses it explicitly).current_snapshot()—inspect.data_files()via_get_snapshot(None), new code directly viaself.table_metadata.current_snapshot().file_paths: same result (empty list) and same exceptions either way. Slight efficiency regression in this edge case — the new code still walks data manifests where the old code short-circuited viapc.field("file_path").isin([]). Not user-visible; can be optimized in a follow-up if anyone cares.ExecutorFactory.get_or_create()thread pool.add_filesaccepts abranchargument, but the dup-check has always run againstcurrent_snapshot()(i.e. main) regardless. This is a pre-existing inconsistency, not introduced by this PR. Preserved exactly to keep this change behavior-preserving.Refs
Refs

- `check_duplicate_files` option in the `add_files` api docs #2132 (closed as docs)
- perf: optimize `table.add_files` and `inspect.files` #2133 (parallelization)