GH-39808: [C++][Parquet] Evict pre-buffered row-group bytes after decode #49855
Open
justinli500 wants to merge 1 commit into apache:main from
Dataset.to_batches() on parquet files accumulates memory as iteration proceeds because ReadRangeCache has no eviction API. PreBuffer() is called once with every row group up front, entries stay resident until the FileReader is destroyed, and users see roughly 10x more memory than the equivalent ParquetFile.iter_batches() path. This is one of the longest-standing open issues on the tracker.

Add a new ReadRangeCache::EvictEntriesInRange(start, length) method that removes cache entries fully contained in the given window. Entries that span past the window (for example, because range coalescing merged them with an adjacent row group's column chunk) are deliberately left in place, so eviction is safe in the presence of coalescing. Expose the primitive through ParquetFileReader::EvictPreBufferedData and call it from the Arrow RowGroupGenerator's .Then callback once a row group has been decoded into Arrow arrays. At that point the raw column-chunk bytes held by the cache are no longer needed, and releasing them gives each row group a bounded per-row-group memory footprint.

Thread safety: promote the existing mutex from LazyImpl into the base Impl so that Cache, Read, Wait, WaitFor, and EvictEntriesInRange all acquire it before touching the entries vector. Concurrent Read from one thread and Evict from another was previously undefined behaviour in the non-lazy cache, and the dataset scanner's batch_readahead path is exactly the concurrent call pattern that would trigger it. Read now drops the lock before blocking on the I/O future, so the new locking does not serialize readers more tightly than before.

Measured on a 458 MB / 10-row-group / 10M-row test file:

- Dataset.to_batches, before fix: 598 MB peak
- Dataset.to_batches, after fix: 331 MB peak (-267 MB)
- ParquetFile.iter_batches (no pre-buffer): 59 MB peak

Savings scale linearly with row-group count, so on the multi-GB files from the issue thread this single fix recovers several GB of peak allocation.

The remaining gap between Dataset.to_batches and iter_batches comes from a second source of accumulation in the Dataset infrastructure that is unrelated to the ReadRangeCache and should be tracked as a follow-up issue.

New tests:

- RangeReadCache.EvictEntriesInRange
- RangeReadCache.EvictEntriesInRangeSpanningEntry
- RangeReadCache.ConcurrentReadAndEvict
- TestArrowReadWrite.EvictPreBufferedData
- TestArrowReadWrite.GetRecordBatchGeneratorReleasesPreBufferedRowGroups

Full regression sweep: 824/824 parquet-arrow-reader-writer-test, 57/57 arrow-io-memory-test.
Rationale for this change
`Dataset.to_batches()` accumulates memory because `ReadRangeCache` has no eviction API. `PreBuffer()` is called with every row group up front and entries stay resident until the `FileReader` is destroyed, so users see ~10x more peak memory than `ParquetFile.iter_batches()`. Issue #39808 has been open for over a year; downstream projects have worked around it by disabling `pre_buffer` (Ray) or dropping the dataset API (Marin), both of which give up features or throughput.
What changes are included in this PR?
- `ReadRangeCache::EvictEntriesInRange(start, length)`: removes entries fully contained in the window; leaves coalesced entries alone so eviction is safe under range coalescing.
- `ParquetFileReader::EvictPreBufferedData(row_groups, column_indices)`: exposes the primitive; called from `RowGroupGenerator::ReadOneRowGroup` after the row group has been decoded into Arrow arrays.
- Promote the `LazyImpl` mutex into the base `Impl` so concurrent `Read` and `Evict` across row groups is defined behaviour on both cache variants.
Performance
Measured on a 458 MB / 10-row-group / 10M-row parquet file (6 columns: 3 float64, 2 int64, 1 large_string; Snappy; macOS arm64; Release build). Fix toggled via a one-line A/B test:
| Scenario | Peak `total_allocated_bytes` (MB) |
| --- | --- |
| `Dataset.to_batches`, fix disabled | 598 |
| `Dataset.to_batches`, fix enabled | 331 |
| `Dataset.to_batches`, `pre_buffer=False` | 151 |
| `ParquetFile.iter_batches` | 59 |

```mermaid
xychart-beta
    title "Peak allocated memory (458 MB / 10 row groups, lower is better)"
    x-axis ["without fix", "with fix", "no prebuffer", "iter_batches"]
    y-axis "MB" 0 --> 650
    bar [598, 331, 151, 59]
```

Per-row-group progression during iteration (`max(total_allocated_bytes)` in MB, sampled every 10k of 100k batches):

```mermaid
xychart-beta
    title "max_allocated over iteration (top line: without fix; bottom: with fix)"
    x-axis "batches consumed (thousands)" 10 --> 100
    y-axis "MB" 0 --> 650
    line [159, 232, 294, 323, 386, 415, 477, 507, 569, 598]
    line [129, 202, 205, 234, 237, 266, 270, 299, 302, 331]
```

Savings scale linearly with row-group count, so on the multi-GB files from the issue thread this single fix recovers several GB of peak.
Related work/commits
Downstream projects have shipped workarounds while this issue has
been open, all of them in their own code rather than upstream:
- ray-project/ray#62745 (merged 2026-04-20): injects `ParquetFragmentScanOptions(pre_buffer=False, use_buffered_stream=True)` in Ray Data's parquet reader. Gets peak allocation down to ~75 MB but gives up the `pre_buffer=True` coalesced-read optimization that makes S3 fast.
- marin-community/marin#4344 (merged): replaces dataset-API usage with `ParquetFile.iter_batches`, giving up hive-partition discovery, filter pushdown, and dataset-level schema unification.

No open PR against apache/arrow addresses the cache-side accumulation. This PR is the upstream fix that lets both workarounds be reverted without losing features or throughput.
Scope of this fix
This PR fixes the `ReadRangeCache` accumulation that dominates peak memory on the default `pre_buffer=true` path.

A second source of growth, visible as the 151 MB vs 59 MB gap in the `pre_buffer=False` row of the table above, lives in the dataset async generator pipeline and is unrelated to the cache. It should be tracked as a follow-up issue.

Partially closes #39808.
Test plan
New tests in `arrow/io/memory_test.cc`:

- `RangeReadCache.EvictEntriesInRange`: basic eviction semantics across lazy and eager caches. Covers no-op windows, partial overlaps, wide windows that drop multiple entries, and evict on an empty cache.
- `RangeReadCache.EvictEntriesInRangeSpanningEntry`: forces coalescing via `hole_size_limit=100` and verifies a coalesced entry is refused for a partial-window evict and dropped for a wide window that fully contains it.
- `RangeReadCache.ConcurrentReadAndEvict`: 4 reader threads in a tight `Read()` loop against the upper half of the cache, 1 evictor thread running 50 cycles of `EvictEntriesInRange` + re-`Cache` against the lower half. Runs for both `lazy=true` and `lazy=false`. Under the pre-refactor code the `lazy=false` case would race the `entries` vector; both cases now pass cleanly.
New tests in `parquet/arrow/arrow_reader_writer_test.cc`:

- `TestArrowReadWrite.EvictPreBufferedData`: PreBuffers a 4-row-group file, calls `EvictPreBufferedData({0}, ...)`, confirms row group 0's cache entries are gone while row groups 1-3 remain readable, and that evicting twice or evicting on a reader that never PreBuffered are both safe no-ops.
- `TestArrowReadWrite.GetRecordBatchGeneratorReleasesPreBufferedRowGroups`: runs the record batch generator with `pre_buffer=true` and confirms correctness of every emitted batch.

Full-suite regression on Release build, macOS arm64:

- `parquet-arrow-reader-writer-test`: 824/826 passing, 0 failing (the 2 skips are pre-existing dictionary-write variants not built in this configuration).
- `arrow-io-memory-test`: 57/57 passing.

Are there any user-facing changes?
One new public method: `parquet::ParquetFileReader::EvictPreBufferedData`.

No behaviour change for existing callers beyond strictly lower peak memory on the default `pre_buffer=true` path. No API deprecations, no format changes.

This PR contains a "Critical Fix": No (memory usage improvement, not correctness).