
GH-39808: [C++][Parquet] Evict pre-buffered row-group bytes after decode#49855

Open
justinli500 wants to merge 1 commit into apache:main from justinli500:GH-39808-parquet-dataset-memory-accumulation

Conversation


@justinli500 justinli500 commented Apr 24, 2026

Rationale for this change

Dataset.to_batches() accumulates memory because ReadRangeCache
has no eviction API. PreBuffer() is called with every row group up
front and entries stay resident until the FileReader is destroyed,
so users see ~10x more peak memory than ParquetFile.iter_batches().
Issue #39808 has been open for over a year; downstream projects have
worked around it by disabling pre_buffer (Ray) or dropping the
dataset API (Marin), both of which give up features or throughput.

What changes are included in this PR?

  • Add ReadRangeCache::EvictEntriesInRange(start, length). Removes
    entries fully contained in the window; leaves coalesced entries
    alone so eviction is safe under range coalescing.
  • Add ParquetFileReader::EvictPreBufferedData(row_groups, column_indices)
    and call it from RowGroupGenerator::ReadOneRowGroup after the row
    group has been decoded into Arrow arrays.
  • Promote the existing LazyImpl mutex into the base Impl so that
    concurrent Read and Evict calls across row groups are defined
    behaviour on both cache variants.
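
The eviction rule can be illustrated outside Arrow: an entry is dropped only if it is fully contained in the requested window, so a coalesced entry that crosses the window boundary always survives. A minimal self-contained sketch (the `Entry` struct and free function here are illustrative stand-ins, not Arrow's actual `ReadRangeCache` types):

```cpp
#include <algorithm>
#include <cstdint>
#include <vector>

// Illustrative stand-in for a cached read range; the real cache
// entries also hold a future for the buffered bytes.
struct Entry {
  int64_t offset;
  int64_t length;
};

// Drop only entries fully contained in [start, start + length).
// Entries extending past either edge (e.g. because range coalescing
// merged them with a neighbouring row group's column chunk) are left
// in place, so eviction can never invalidate bytes another row group
// still needs.
void EvictEntriesInRange(std::vector<Entry>& entries, int64_t start,
                         int64_t length) {
  const int64_t end = start + length;
  entries.erase(std::remove_if(entries.begin(), entries.end(),
                               [&](const Entry& e) {
                                 return e.offset >= start &&
                                        e.offset + e.length <= end;
                               }),
                entries.end());
}
```

This containment check is why eviction is a safe no-op on partially overlapping windows and on an empty cache.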

Performance

Measured on a 458 MB / 10 row group / 10M row parquet file
(6 columns: 3 float64, 2 int64, 1 large_string; Snappy; macOS arm64;
Release build). Fix toggled via a one-line A/B test:

| Mode | Peak total_allocated_bytes |
| --- | --- |
| Dataset.to_batches, fix disabled | 598 MB |
| Dataset.to_batches, fix enabled | 331 MB (-267 MB, -44.6%) |
| Dataset.to_batches, pre_buffer=False | 151 MB |
| ParquetFile.iter_batches | 59 MB |
```mermaid
xychart-beta
    title "Peak allocated memory (458 MB / 10 row groups, lower is better)"
    x-axis ["without fix", "with fix", "no prebuffer", "iter_batches"]
    y-axis "MB" 0 --> 650
    bar [598, 331, 151, 59]
```

Per-row-group progression during iteration
(max(total_allocated_bytes) in MB, sampled every 10k of 100k batches):

```
batches (k)        10   20   30   40   50   60   70   80   90  100
----------------  ---  ---  ---  ---  ---  ---  ---  ---  ---  ---
without fix       159  232  294  323  386  415  477  507  569  598
with fix          129  202  205  234  237  266  270  299  302  331
saved              30   30   89   89  149  149  207  208  267  267
```
```mermaid
xychart-beta
    title "max_allocated over iteration (top line: without fix; bottom: with fix)"
    x-axis "batches consumed (thousands)" 10 --> 100
    y-axis "MB" 0 --> 650
    line [159, 232, 294, 323, 386, 415, 477, 507, 569, 598]
    line [129, 202, 205, 234, 237, 266, 270, 299, 302, 331]
```

Savings scale linearly with row-group count, so on the multi-GB files
from the issue thread this single fix recovers several GB of peak.

Related work/commits

Downstream projects have shipped workarounds while this issue has
been open, all of them in their own code rather than upstream:

  • ray-project/ray#62745 (merged 2026-04-20): injects
    ParquetFragmentScanOptions(pre_buffer=False, use_buffered_stream=True)
    in Ray Data's parquet reader. Gets peak alloc down to ~75 MB but
    gives up the pre_buffer=True coalesced-read optimization that
    makes S3 fast.
  • marin-community/marin#4344 (merged): replaces dataset-API usage
    with ParquetFile.iter_batches, giving up hive-partition discovery,
    filter pushdown, and dataset-level schema unification.

No open PR against apache/arrow addresses the cache-side
accumulation. This PR is the upstream fix that lets both workarounds
be reverted without losing features or throughput.

Scope of this fix

This PR fixes the ReadRangeCache accumulation that dominates peak
memory on the default pre_buffer=true path.

A second source of growth, visible as the 151 MB vs 59 MB gap in the
pre_buffer=false row of the table above, lives in the dataset async
generator pipeline and is unrelated to the cache. It should be
tracked as a follow-up issue.

Partially closes #39808.

Test plan

New tests in arrow/io/memory_test.cc:

  • RangeReadCache.EvictEntriesInRange - basic eviction semantics
    across lazy and eager caches. Covers no-op windows, partial
    overlaps, wide windows that drop multiple entries, and eviction
    on an empty cache.
  • RangeReadCache.EvictEntriesInRangeSpanningEntry - forces coalescing
    via hole_size_limit=100 and verifies a coalesced entry is refused
    for a partial-window evict and dropped for a wide window that fully
    contains it.
  • RangeReadCache.ConcurrentReadAndEvict - 4 reader threads in a tight
    Read() loop against the upper half of the cache, 1 evictor thread
    running 50 cycles of EvictEntriesInRange + re-Cache against the
    lower half. Runs for both lazy=true and lazy=false. Under the
    pre-refactor code the lazy=false case would race the entries
    vector; both cases now pass cleanly.

New tests in parquet/arrow/arrow_reader_writer_test.cc:

  • TestArrowReadWrite.EvictPreBufferedData - PreBuffers a 4-row-group
    file, calls EvictPreBufferedData({0}, ...), confirms row group 0's
    cache entries are gone while row groups 1-3 remain readable, and
    that evicting twice or evicting on a reader that never PreBuffered
    are both safe no-ops.
  • TestArrowReadWrite.GetRecordBatchGeneratorReleasesPreBufferedRowGroups -
    drives the full async generator pipeline end to end with
    pre_buffer=true and confirms correctness of every emitted batch.

Full-suite regression on Release build, macOS arm64:

  • parquet-arrow-reader-writer-test: 824/826 passing, 0 failing
    (the 2 skips are pre-existing dictionary-write variants not built
    in this configuration).
  • arrow-io-memory-test: 57/57 passing.

Are there any user-facing changes?

One new public method: parquet::ParquetFileReader::EvictPreBufferedData.
No behaviour change for existing callers beyond strictly lower peak
memory on the default pre_buffer=true path. No API deprecations,
no format changes.

This PR contains a "Critical Fix": No (memory usage improvement,
not correctness).

GH-39808: [C++][Parquet] Evict pre-buffered row-group bytes after decode

Dataset.to_batches() on parquet files accumulates memory as iteration
proceeds because ReadRangeCache has no eviction API. PreBuffer() is
called once with every row group up front, entries stay resident until
the FileReader is destroyed, and users see roughly 10x more memory than
the equivalent ParquetFile.iter_batches() path. This is one of the
longest-standing open issues on the tracker.

Add a new ReadRangeCache::EvictEntriesInRange(start, length) method
that removes cache entries fully contained in the given window. Entries
that span past the window (for example, because range coalescing merged
them with an adjacent row group's column chunk) are deliberately left
in place, so eviction is safe in the presence of coalescing.

Expose the primitive through ParquetFileReader::EvictPreBufferedData
and call it from the Arrow RowGroupGenerator's .Then callback once a
row group has been decoded into Arrow arrays. At that point the raw
column-chunk bytes held by the cache are no longer needed, and releasing
them gives each row group a bounded per-row-group memory footprint.
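
The generator-side change amounts to releasing raw cache bytes as soon as a row group has been decoded. A toy model of why this bounds the per-row-group footprint (all names here are illustrative stand-ins; the real hook is the continuation in RowGroupGenerator, and the real API is EvictPreBufferedData):

```cpp
#include <cstddef>
#include <map>
#include <vector>

// Toy stand-in for the read cache: row-group index -> raw bytes.
using Cache = std::map<int, std::vector<char>>;

// Pre-buffer every row group up front, as PreBuffer() does.
Cache PreBufferAll(int row_groups, std::size_t bytes_per_group) {
  Cache cache;
  for (int rg = 0; rg < row_groups; ++rg) {
    cache[rg] = std::vector<char>(bytes_per_group);
  }
  return cache;
}

std::size_t CacheBytes(const Cache& cache) {
  std::size_t total = 0;
  for (const auto& entry : cache) total += entry.second.size();
  return total;
}

// Decode one row group, then evict its raw bytes -- the pattern this
// PR adds after the decode step. Returns the cache residency observed
// while this row group was live; without the erase, residency would
// stay at the full pre-buffered size for the whole iteration.
std::size_t DecodeAndEvict(Cache& cache, int rg) {
  std::size_t resident = CacheBytes(cache);  // decode reads from cache
  cache.erase(rg);                           // eviction analogue
  return resident;
}
```

In this model, iterating with eviction drains the cache to zero and the residency seen by the last row group is a single row group's bytes, mirroring the bounded footprint the PR description claims.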

Thread safety: promote the existing mutex from LazyImpl into base Impl
so that Cache, Read, Wait, WaitFor, and EvictEntriesInRange all acquire
it before touching the entries vector. Concurrent Read from one thread
and Evict from another was previously undefined behaviour in the
non-lazy cache, and the dataset scanner's batch_readahead path is
exactly the concurrent call pattern that would trigger it. Read now
drops the lock before blocking on the I/O future, so the new locking
does not serialize readers more tightly than before.
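
The locking pattern described above (hold the mutex only while touching the entries vector, never while blocking on I/O) can be sketched with standard-library primitives. This is a toy model under stated assumptions: the real code uses Arrow Futures rather than std::shared_future, and ToyCache and its members are invented names:

```cpp
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <future>
#include <mutex>
#include <vector>

struct Entry {
  int64_t offset;
  int64_t length;
  std::shared_future<std::vector<char>> data;  // pending I/O result
};

class ToyCache {
 public:
  void Add(Entry e) {
    std::lock_guard<std::mutex> lock(mutex_);
    entries_.push_back(std::move(e));
  }

  // Look up the entry under the lock, then DROP the lock before
  // blocking on the I/O future, so a slow read neither serializes
  // other readers nor blocks a concurrent evictor.
  std::vector<char> Read(int64_t offset) {
    std::shared_future<std::vector<char>> fut;
    {
      std::lock_guard<std::mutex> lock(mutex_);
      for (const auto& e : entries_) {
        if (e.offset == offset) {
          fut = e.data;  // copying the future handle is cheap
          break;
        }
      }
    }  // lock released here
    return fut.get();  // block outside the lock
  }

  // Same containment rule as the eviction API: only entries fully
  // inside [start, start + length) are dropped.
  void EvictEntriesInRange(int64_t start, int64_t length) {
    std::lock_guard<std::mutex> lock(mutex_);
    const int64_t end = start + length;
    entries_.erase(std::remove_if(entries_.begin(), entries_.end(),
                                  [&](const Entry& e) {
                                    return e.offset >= start &&
                                           e.offset + e.length <= end;
                                  }),
                   entries_.end());
  }

  std::size_t size() {
    std::lock_guard<std::mutex> lock(mutex_);
    return entries_.size();
  }

 private:
  std::mutex mutex_;
  std::vector<Entry> entries_;
};
```

Because Read only copies a future handle under the lock, an evictor removing the entry concurrently cannot invalidate an in-flight read: the reader's copy keeps the shared state alive.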

Measured on a 458 MB / 10-row-group / 10M-row test file:

  Dataset.to_batches, before fix:              598 MB peak
  Dataset.to_batches, after  fix:              331 MB peak (-267 MB)
  ParquetFile.iter_batches (no pre-buffer):     59 MB peak

Savings scale linearly with row-group count, so on the multi-GB files
from the issue thread this single fix recovers several GB of peak
allocation. The remaining gap between Dataset.to_batches and
iter_batches comes from a second source of accumulation in the Dataset
infrastructure that is unrelated to the ReadRangeCache and should be
tracked as a follow-up issue.

New tests:
* RangeReadCache.EvictEntriesInRange
* RangeReadCache.EvictEntriesInRangeSpanningEntry
* RangeReadCache.ConcurrentReadAndEvict
* TestArrowReadWrite.EvictPreBufferedData
* TestArrowReadWrite.GetRecordBatchGeneratorReleasesPreBufferedRowGroups

Full regression sweep: 824/824 parquet-arrow-reader-writer-test,
57/57 arrow-io-memory-test.
@justinli500 justinli500 requested a review from wgtmac as a code owner April 24, 2026 06:01
@github-actions

⚠️ GitHub issue #39808 has been automatically assigned in GitHub to PR creator.


Successfully merging this pull request may close these issues.

[Python] Dataset.to_batches accumulates memory usage and leaks

1 participant