From 7745f4112cfddb0823b9ad4c4ed33124d83e0c11 Mon Sep 17 00:00:00 2001 From: Alex Stephen Date: Fri, 24 Apr 2026 14:38:07 -0700 Subject: [PATCH 1/3] delete file index --- pyiceberg/table/__init__.py | 2 +- pyiceberg/table/delete_file_index.py | 117 +++++++++++++++++++++----- pyiceberg/table/update/validate.py | 4 +- tests/table/test_delete_file_index.py | 92 ++++++++++++++++++++ 4 files changed, 193 insertions(+), 22 deletions(-) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 7f1524642b..b4eb4bda08 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -2059,7 +2059,7 @@ def _plan_files_server_side(self) -> Iterable[FileScanTask]: def _plan_files_local(self) -> Iterable[FileScanTask]: """Plan files locally by reading manifests.""" data_entries: list[ManifestEntry] = [] - delete_index = DeleteFileIndex() + delete_index = DeleteFileIndex(self.table_metadata.schema()) residual_evaluators: dict[int, Callable[[DataFile], ResidualEvaluator]] = KeyDefaultDict(self._build_residual_evaluator) diff --git a/pyiceberg/table/delete_file_index.py b/pyiceberg/table/delete_file_index.py index 3f513aabe5..8d824c32d1 100644 --- a/pyiceberg/table/delete_file_index.py +++ b/pyiceberg/table/delete_file_index.py @@ -17,12 +17,17 @@ from __future__ import annotations from bisect import bisect_left +from typing import TYPE_CHECKING +from pyiceberg.conversions import from_bytes from pyiceberg.expressions import EqualTo from pyiceberg.expressions.visitors import _InclusiveMetricsEvaluator -from pyiceberg.manifest import INITIAL_SEQUENCE_NUMBER, POSITIONAL_DELETE_SCHEMA, DataFile, ManifestEntry +from pyiceberg.manifest import INITIAL_SEQUENCE_NUMBER, POSITIONAL_DELETE_SCHEMA, DataFile, DataFileContent, ManifestEntry from pyiceberg.typedef import Record +if TYPE_CHECKING: + from pyiceberg.schema import Schema + PATH_FIELD_ID = 2147483546 @@ -59,6 +64,15 @@ def referenced_delete_files(self) -> list[DataFile]: return [data_file for data_file, _ in self._files] +class EqualityDeletes(PositionDeletes): + """Collects equality delete files and indexes them by sequence number.""" + + def add(self, delete_file: DataFile, seq_num: int) -> None: + # Equality deletes are indexed by sequence number - 1 to ensure they only + # apply to data files added in strictly earlier snapshots. + super().add(delete_file, seq_num - 1) + + def _has_path_bounds(delete_file: DataFile) -> bool: lower = delete_file.lower_bounds upper = delete_file.upper_bounds @@ -76,6 +90,36 @@ def _applies_to_data_file(delete_file: DataFile, data_file: DataFile) -> bool: return evaluator.eval(delete_file) +def _eq_applies_to_data_file(eq_delete_file: DataFile, data_file: DataFile, schema: Schema) -> bool: + if not eq_delete_file.equality_ids: + return True + + for field_id in eq_delete_file.equality_ids: + if ( + eq_delete_file.lower_bounds + and eq_delete_file.upper_bounds + and data_file.lower_bounds + and data_file.upper_bounds + and field_id in eq_delete_file.lower_bounds + and field_id in eq_delete_file.upper_bounds + and field_id in data_file.lower_bounds + and field_id in data_file.upper_bounds + ): + field_type = schema.find_type(field_id) + if not field_type.is_primitive: + continue + + eq_lower = from_bytes(field_type, eq_delete_file.lower_bounds[field_id]) + eq_upper = from_bytes(field_type, eq_delete_file.upper_bounds[field_id]) + data_lower = from_bytes(field_type, data_file.lower_bounds[field_id]) + data_upper = from_bytes(field_type, data_file.upper_bounds[field_id]) + + if eq_upper < data_lower or eq_lower > data_upper: + return False + + return True + + def _referenced_data_file_path(delete_file: DataFile) -> str | None: """Return the path, if the path bounds evaluate to the same location.""" lower_bounds = delete_file.lower_bounds @@ -103,27 +147,43 @@ def _partition_key(spec_id: int, partition: Record | None) -> tuple[int, Record] class DeleteFileIndex: - """Indexes position delete files by partition and by exact data file path.""" + """Indexes position and equality delete files by partition and by exact data file path.""" - def __init__(self) -> None: + def __init__(self, schema: Schema | None = None) -> None: + self._schema = schema self._by_partition: dict[tuple[int, Record], PositionDeletes] = {} self._by_path: dict[str, PositionDeletes] = {} + self._eq_by_partition: dict[tuple[int, Record], EqualityDeletes] = {} + self._global_eq_deletes: EqualityDeletes = EqualityDeletes() def is_empty(self) -> bool: - return not self._by_partition and not self._by_path + return ( + not self._by_partition + and not self._by_path + and not self._eq_by_partition + and not self._global_eq_deletes.referenced_delete_files() + ) def add_delete_file(self, manifest_entry: ManifestEntry, partition_key: Record | None = None) -> None: delete_file = manifest_entry.data_file seq = manifest_entry.sequence_number or INITIAL_SEQUENCE_NUMBER - target_path = _referenced_data_file_path(delete_file) - if target_path: - deletes = self._by_path.setdefault(target_path, PositionDeletes()) - deletes.add(delete_file, seq) - else: - key = _partition_key(delete_file.spec_id or 0, partition_key) - deletes = self._by_partition.setdefault(key, PositionDeletes()) - deletes.add(delete_file, seq) + if delete_file.content == DataFileContent.POSITION_DELETES: + target_path = _referenced_data_file_path(delete_file) + if target_path: + deletes = self._by_path.setdefault(target_path, PositionDeletes()) + deletes.add(delete_file, seq) + else: + key = _partition_key(delete_file.spec_id or 0, partition_key) + deletes = self._by_partition.setdefault(key, PositionDeletes()) + deletes.add(delete_file, seq) + elif delete_file.content == DataFileContent.EQUALITY_DELETES: + if partition_key is None or len(partition_key) == 0: + self._global_eq_deletes.add(delete_file, seq) + else: + key = _partition_key(delete_file.spec_id or 0, partition_key) + deletes = self._eq_by_partition.setdefault(key, EqualityDeletes()) + deletes.add(delete_file, seq) def for_data_file(self, seq_num: int, data_file: DataFile, partition_key: Record | None = None) -> set[DataFile]: if self.is_empty(): @@ -131,17 +191,31 @@ def for_data_file(self, seq_num: int, data_file: DataFile, partition_key: Record deletes: set[DataFile] = set() spec_id = data_file.spec_id or 0 - key = _partition_key(spec_id, partition_key) - partition_deletes = self._by_partition.get(key) - if partition_deletes: - for delete_file in partition_deletes.filter_by_seq(seq_num): + + # Add position deletes + partition_pos_deletes = self._by_partition.get(key) + if partition_pos_deletes: + for delete_file in partition_pos_deletes.filter_by_seq(seq_num): if _applies_to_data_file(delete_file, data_file): deletes.add(delete_file) - path_deletes = self._by_path.get(data_file.file_path) - if path_deletes: - deletes.update(path_deletes.filter_by_seq(seq_num)) + path_pos_deletes = self._by_path.get(data_file.file_path) + if path_pos_deletes: + deletes.update(path_pos_deletes.filter_by_seq(seq_num)) + + # Add equality deletes + candidate_eq_deletes: list[DataFile] = [] + partition_eq_deletes = self._eq_by_partition.get(key) + if partition_eq_deletes: + candidate_eq_deletes.extend(partition_eq_deletes.filter_by_seq(seq_num)) + + candidate_eq_deletes.extend(self._global_eq_deletes.filter_by_seq(seq_num)) + + for eq_delete_file in candidate_eq_deletes: + if self._schema and not _eq_applies_to_data_file(eq_delete_file, data_file, self._schema): + continue + deletes.add(eq_delete_file) return deletes @@ -154,4 +228,9 @@ def referenced_delete_files(self) -> list[DataFile]: for deletes in self._by_path.values(): data_files.extend(deletes.referenced_delete_files()) + for deletes in self._eq_by_partition.values(): + data_files.extend(deletes.referenced_delete_files()) + + data_files.extend(self._global_eq_deletes.referenced_delete_files()) + return data_files diff --git a/pyiceberg/table/update/validate.py b/pyiceberg/table/update/validate.py index 8178ed6ee0..042351d415 100644 --- a/pyiceberg/table/update/validate.py +++ b/pyiceberg/table/update/validate.py @@ -245,13 +245,13 @@ def _added_delete_files( DeleteFileIndex """ if parent_snapshot is None or table.format_version < 2: - return DeleteFileIndex() + return DeleteFileIndex(table.schema()) manifests, snapshot_ids = _validation_history( table, parent_snapshot, starting_snapshot, VALIDATE_ADDED_DELETE_FILES_OPERATIONS, ManifestContent.DELETES ) - dfi = DeleteFileIndex() + dfi = DeleteFileIndex(table.schema()) for manifest in manifests: for entry in manifest.fetch_manifest_entry(table.io, discard_deleted=True): diff --git a/tests/table/test_delete_file_index.py b/tests/table/test_delete_file_index.py index 09dd9ac81b..559d64eee4 100644 --- a/tests/table/test_delete_file_index.py +++ b/tests/table/test_delete_file_index.py @@ -16,9 +16,12 @@ # under the License. import pytest +from pyiceberg.conversions import to_bytes from pyiceberg.manifest import DataFile, DataFileContent, FileFormat, ManifestEntry, ManifestEntryStatus +from pyiceberg.schema import Schema from pyiceberg.table.delete_file_index import PATH_FIELD_ID, DeleteFileIndex, PositionDeletes from pyiceberg.typedef import Record +from pyiceberg.types import IntegerType, NestedField def _create_data_file(file_path: str = "s3://bucket/data.parquet", spec_id: int = 0) -> DataFile: @@ -187,3 +190,92 @@ def test_record_equality_for_partition_lookup() -> None: assert len(index.for_data_file(1, data_file, partition_b)) == 1 assert len(index.for_data_file(1, data_file, partition_c)) == 0 + + +def test_equality_delete_sequence_number_filtering() -> None: + index = DeleteFileIndex() + + # Equality delete with sequence number 2 + index.add_delete_file(_create_equality_delete(sequence_number=2)) + + data_file = _create_data_file() + + # Data file with sequence number 1 should be affected by equality delete with sequence number 2 + assert len(index.for_data_file(1, data_file)) == 1 + + # Data file with sequence number 2 should NOT be affected by equality delete with sequence number 2 + # Equality deletes apply only to data files added in strictly earlier snapshots (seq - 1) + assert len(index.for_data_file(2, data_file)) == 0 + + # Data file with sequence number 3 should NOT be affected + assert len(index.for_data_file(3, data_file)) == 0 + + +def test_global_equality_deletes() -> None: + index = DeleteFileIndex() + + # Global equality delete (unpartitioned) + index.add_delete_file(_create_equality_delete(sequence_number=10)) + + partition_1 = Record(1) + partition_2 = Record(2) + + # Partitioned equality delete for partition 1 + index.add_delete_file(_create_equality_delete(sequence_number=20), partition_1) + + file_1 = _create_data_file(file_path="s3://bucket/file_1.parquet") + file_2 = _create_data_file(file_path="s3://bucket/file_2.parquet") + + # Partition 1 should have 2 equality deletes (1 global, 1 partitioned) + assert len(index.for_data_file(1, file_1, partition_1)) == 2 + # Partition 2 should have 1 equality delete (1 global) + assert len(index.for_data_file(1, file_2, partition_2)) == 1 + + +def test_equality_delete_metrics_filtering() -> None: + schema = Schema(NestedField(1, "id", IntegerType(), required=True)) + index = DeleteFileIndex(schema=schema) + + def _create_data_file_with_metrics(file_path: str, lower: int, upper: int) -> DataFile: + data_file = DataFile.from_args( + content=DataFileContent.DATA, + file_path=file_path, + file_format=FileFormat.PARQUET, + partition=Record(), + record_count=100, + file_size_in_bytes=1000, + lower_bounds={1: to_bytes(IntegerType(), lower)}, + upper_bounds={1: to_bytes(IntegerType(), upper)}, + ) + data_file._spec_id = 0 + return data_file + + def _create_equality_delete_with_metrics(sequence_number: int, lower: int, upper: int) -> ManifestEntry: + delete_file = DataFile.from_args( + content=DataFileContent.EQUALITY_DELETES, + file_path=f"s3://bucket/eq-delete-{sequence_number}.parquet", + file_format=FileFormat.PARQUET, + partition=Record(), + record_count=10, + file_size_in_bytes=100, + equality_ids=[1], + lower_bounds={1: to_bytes(IntegerType(), lower)}, + upper_bounds={1: to_bytes(IntegerType(), upper)}, + ) + delete_file._spec_id = 0 + return ManifestEntry.from_args(status=ManifestEntryStatus.ADDED, sequence_number=sequence_number, data_file=delete_file) + + # Equality delete for rows where id is between 10 and 20 + index.add_delete_file(_create_equality_delete_with_metrics(sequence_number=100, lower=10, upper=20)) + + # Data file with id between 0 and 5 (no overlap) + file_no_overlap = _create_data_file_with_metrics("s3://bucket/no_overlap.parquet", 0, 5) + assert len(index.for_data_file(1, file_no_overlap)) == 0 + + # Data file with id between 15 and 25 (overlap) + file_overlap = _create_data_file_with_metrics("s3://bucket/overlap.parquet", 15, 25) + assert len(index.for_data_file(1, file_overlap)) == 1 + + # Data file with id between 25 and 30 (no overlap) + file_no_overlap_2 = _create_data_file_with_metrics("s3://bucket/no_overlap_2.parquet", 25, 30) + assert len(index.for_data_file(1, file_no_overlap_2)) == 0 From 0b3081aeae873131f2462e027081f5e7d4e1ef9f Mon Sep 17 00:00:00 2001 From: Alex Stephen Date: Fri, 24 Apr 2026 14:53:04 -0700 Subject: [PATCH 2/3] Add DeleteFileIndex support for EqualityDeletes --- tests/table/test_delete_file_index.py | 85 ++++++++++++++++----------- 1 file changed, 51 insertions(+), 34 deletions(-) diff --git a/tests/table/test_delete_file_index.py b/tests/table/test_delete_file_index.py index 559d64eee4..848e7f47c1 100644 --- a/tests/table/test_delete_file_index.py +++ b/tests/table/test_delete_file_index.py @@ -24,7 +24,12 @@ from pyiceberg.types import IntegerType, NestedField -def _create_data_file(file_path: str = "s3://bucket/data.parquet", spec_id: int = 0) -> DataFile: +def _create_data_file( + file_path: str = "s3://bucket/data.parquet", + spec_id: int = 0, + lower_bounds: dict[int, bytes] | None = None, + upper_bounds: dict[int, bytes] | None = None, +) -> DataFile: data_file = DataFile.from_args( content=DataFileContent.DATA, file_path=file_path, @@ -32,6 +37,8 @@ def _create_data_file(file_path: str = "s3://bucket/data.parquet", spec_id: int partition=Record(), record_count=100, file_size_in_bytes=1000, + lower_bounds=lower_bounds, + upper_bounds=upper_bounds, ) data_file._spec_id = spec_id return data_file @@ -84,6 +91,27 @@ def _create_deletion_vector( return ManifestEntry.from_args(status=ManifestEntryStatus.ADDED, sequence_number=sequence_number, data_file=delete_file) +def _create_equality_delete( + sequence_number: int = 1, + spec_id: int = 0, + lower_bounds: dict[int, bytes] | None = None, + upper_bounds: dict[int, bytes] | None = None, +) -> ManifestEntry: + delete_file = DataFile.from_args( + content=DataFileContent.EQUALITY_DELETES, + file_path=f"s3://bucket/eq-delete-{sequence_number}.parquet", + file_format=FileFormat.PARQUET, + partition=Record(), + record_count=10, + file_size_in_bytes=100, + equality_ids=[1], + lower_bounds=lower_bounds, + upper_bounds=upper_bounds, + ) + delete_file._spec_id = spec_id + return ManifestEntry.from_args(status=ManifestEntryStatus.ADDED, sequence_number=sequence_number, data_file=delete_file) + + def test_empty_index() -> None: index = DeleteFileIndex() data_file = _create_data_file() @@ -236,46 +264,35 @@ def test_equality_delete_metrics_filtering() -> None: schema = Schema(NestedField(1, "id", IntegerType(), required=True)) index = DeleteFileIndex(schema=schema) - def _create_data_file_with_metrics(file_path: str, lower: int, upper: int) -> DataFile: - data_file = DataFile.from_args( - content=DataFileContent.DATA, - file_path=file_path, - file_format=FileFormat.PARQUET, - partition=Record(), - record_count=100, - file_size_in_bytes=1000, - lower_bounds={1: to_bytes(IntegerType(), lower)}, - upper_bounds={1: to_bytes(IntegerType(), upper)}, - ) - data_file._spec_id = 0 - return data_file - - def _create_equality_delete_with_metrics(sequence_number: int, lower: int, upper: int) -> ManifestEntry: - delete_file = DataFile.from_args( - content=DataFileContent.EQUALITY_DELETES, - file_path=f"s3://bucket/eq-delete-{sequence_number}.parquet", - file_format=FileFormat.PARQUET, - partition=Record(), - record_count=10, - file_size_in_bytes=100, - equality_ids=[1], - lower_bounds={1: to_bytes(IntegerType(), lower)}, - upper_bounds={1: to_bytes(IntegerType(), upper)}, - ) - delete_file._spec_id = 0 - return ManifestEntry.from_args(status=ManifestEntryStatus.ADDED, sequence_number=sequence_number, data_file=delete_file) - # Equality delete for rows where id is between 10 and 20 - index.add_delete_file(_create_equality_delete_with_metrics(sequence_number=100, lower=10, upper=20)) + index.add_delete_file( + _create_equality_delete( + sequence_number=100, + lower_bounds={1: to_bytes(IntegerType(), 10)}, + upper_bounds={1: to_bytes(IntegerType(), 20)}, + ) + ) # Data file with id between 0 and 5 (no overlap) - file_no_overlap = _create_data_file_with_metrics("s3://bucket/no_overlap.parquet", 0, 5) + file_no_overlap = _create_data_file( + "s3://bucket/no_overlap.parquet", + lower_bounds={1: to_bytes(IntegerType(), 0)}, + upper_bounds={1: to_bytes(IntegerType(), 5)}, + ) assert len(index.for_data_file(1, file_no_overlap)) == 0 # Data file with id between 15 and 25 (overlap) - file_overlap = _create_data_file_with_metrics("s3://bucket/overlap.parquet", 15, 25) + file_overlap = _create_data_file( + "s3://bucket/overlap.parquet", + lower_bounds={1: to_bytes(IntegerType(), 15)}, + upper_bounds={1: to_bytes(IntegerType(), 25)}, + ) assert len(index.for_data_file(1, file_overlap)) == 1 # Data file with id between 25 and 30 (no overlap) - file_no_overlap_2 = _create_data_file_with_metrics("s3://bucket/no_overlap_2.parquet", 25, 30) + file_no_overlap_2 = _create_data_file( + "s3://bucket/no_overlap_2.parquet", + lower_bounds={1: to_bytes(IntegerType(), 25)}, + upper_bounds={1: to_bytes(IntegerType(), 30)}, + ) assert len(index.for_data_file(1, file_no_overlap_2)) == 0 From 78800f7bee47ced9185259541a301aa4cf86b956 Mon Sep 17 00:00:00 2001 From: Alex Stephen Date: Fri, 24 Apr 2026 15:29:35 -0700 Subject: [PATCH 3/3] we shouldn't try to act on equality delete files --- pyiceberg/io/pyarrow.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index 17948ad795..c9fd45819a 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -1693,7 +1693,12 @@ def _task_to_record_batches( def _read_all_delete_files(io: FileIO, tasks: Iterable[FileScanTask]) -> dict[str, list[ChunkedArray]]: deletes_per_file: dict[str, list[ChunkedArray]] = {} - unique_deletes = set(itertools.chain.from_iterable([task.delete_files for task in tasks])) + unique_deletes = { + delete_file + for task in tasks + for delete_file in task.delete_files + if delete_file.content == DataFileContent.POSITION_DELETES + } if len(unique_deletes) > 0: executor = ExecutorFactory.get_or_create() deletes_per_files: Iterator[dict[str, ChunkedArray]] = executor.map(