diff --git a/vortex-duckdb/cpp/include/duckdb_vx/table_function.h b/vortex-duckdb/cpp/include/duckdb_vx/table_function.h index ddf55b532a1..3e2b0580684 100644 --- a/vortex-duckdb/cpp/include/duckdb_vx/table_function.h +++ b/vortex-duckdb/cpp/include/duckdb_vx/table_function.h @@ -12,6 +12,11 @@ #include "error.h" #include "table_filter.h" #include "duckdb_vx/data.h" +#include <stdint.h> + +#ifdef __cplusplus +static_assert(sizeof(idx_t) == 8); +#endif #ifdef __cplusplus extern "C" { @@ -83,6 +88,16 @@ typedef struct { bool has_null; } duckdb_column_statistics; +const idx_t INVALID_IDX = UINT64_MAX; + +typedef struct { + idx_t partition_index; + // Either INVALID_IDX or position of column in output for file_index column + size_t file_index_column_pos; + // File index for the exported partition. + size_t file_index; +} duckdb_vx_partition_data; + // vtable mimicking subset of TableFunction. // See duckdb/include/function/tfunc.hpp typedef struct { @@ -123,10 +138,10 @@ typedef struct { double (*table_scan_progress)(duckdb_client_context ctx, void *bind_data, void *global_state); - idx_t (*get_partition_data)(const void *bind_data, - void *init_global_data, - void *init_local_data, - duckdb_vx_error *error_out); + void (*get_partition_data)(const void *bind_data, + void *init_global_data, + void *init_local_data, + duckdb_vx_partition_data *partition_data_out); } duckdb_vx_tfunc_vtab_t; // A single function for configuring the DuckDB table function vtable. 
diff --git a/vortex-duckdb/cpp/table_function.cpp b/vortex-duckdb/cpp/table_function.cpp index a0ebabd068d..d26d0fa2f04 100644 --- a/vortex-duckdb/cpp/table_function.cpp +++ b/vortex-duckdb/cpp/table_function.cpp @@ -10,6 +10,7 @@ DUCKDB_INCLUDES_BEGIN #include "duckdb.h" #include "duckdb/catalog/catalog.hpp" #include "duckdb/common/insertion_order_preserving_map.hpp" +#include "duckdb/common/multi_file/multi_file_reader.hpp" #include "duckdb/function/table_function.hpp" #include "duckdb/main/capi/capi_internal.hpp" #include "duckdb/main/connection.hpp" @@ -19,6 +20,8 @@ DUCKDB_INCLUDES_END using namespace duckdb; using vortex::CData; using vortex::IntoErrString; +constexpr column_t COLUMN_IDENTIFIER_FILE_INDEX = MultiFileReader::COLUMN_IDENTIFIER_FILE_INDEX; +constexpr column_t COLUMN_IDENTIFIER_FILE_ROW_NUMBER = MultiFileReader::COLUMN_IDENTIFIER_FILE_ROW_NUMBER; struct CTableFunctionInfo final : TableFunctionInfo { explicit CTableFunctionInfo(const duckdb_vx_tfunc_vtab_t &vtab) : vtab(vtab) { @@ -334,21 +337,50 @@ extern "C" void duckdb_vx_tfunc_bind_result_add_column(duckdb_vx_tfunc_bind_resu result.return_types.emplace_back(logical_type); } -OperatorPartitionData c_get_partition_data(ClientContext &, TableFunctionGetPartitionInput &input) { - if (input.partition_info.RequiresPartitionColumns()) { - throw InternalException("TableScan::GetPartitionData: partition columns not supported"); - } +/** + * Called at planning time to determine whether data is partitioned by a + * given set of columns. Requested columns are GROUP BY parameters i.e. columns + * over which the query aggregates. + */ +TablePartitionInfo get_partition_info(ClientContext &, TableFunctionPartitionInput &input) { + const vector &ids = input.partition_ids; + // Our data is partitioned by array exporters. Each exporter processes a + // single Array which belongs to a single file. If data is partitioned only + // by file_index, there is one unique value for an Array. 
Otherwise there + // may be multiple values. + return (ids.size() == 1 && ids[0] == COLUMN_IDENTIFIER_FILE_INDEX) + ? TablePartitionInfo::SINGLE_VALUE_PARTITIONS + : TablePartitionInfo::NOT_PARTITIONED; +} + +/** + * Duckdb requests this function after exporting the chunk. We answer with + * partition_index we have exported as well as information about constant + * columns in this partition. As data is partitioned by array exporters, in + * each partition ~ exported array file_index is constant. + */ +OperatorPartitionData get_partition_data(ClientContext &, TableFunctionGetPartitionInput &input) { auto &bind = input.bind_data->Cast(); void *const ffi_bind = bind.ffi_data->DataPtr(); void *const ffi_global = input.global_state->Cast().ffi_data->DataPtr(); void *const ffi_local = input.local_state->Cast().ffi_data->DataPtr(); - - duckdb_vx_error error_out = nullptr; - const idx_t batch_index = bind.info.vtab.get_partition_data(ffi_bind, ffi_global, ffi_local, &error_out); - if (error_out) { - throw InvalidInputException(IntoErrString(error_out)); + duckdb_vx_partition_data partition_data; + bind.info.vtab.get_partition_data(ffi_bind, ffi_global, ffi_local, &partition_data); + + OperatorPartitionData out(partition_data.partition_index); + + // file_index_column_pos may be INVALID_IDX, but column_index will never + // be INVALID_IDX, so we can compare directly + for (const column_t column_index : input.partition_info.partition_columns) { + if (column_index == partition_data.file_index_column_pos) { + out.partition_data.emplace_back(Value::UBIGINT(partition_data.file_index)); + } else { + throw InternalException(StringUtil::Format( + "get_partition_data: requested column_index %d is not constant for given partition", + column_index)); + } } - return OperatorPartitionData(batch_index); + return out; } extern "C" void duckdb_vx_string_map_insert(duckdb_vx_string_map map, const char *key, const char *value) { @@ -377,21 +409,30 @@ extern "C" duckdb_state 
duckdb_vx_tfunc_register(duckdb_database ffi_db, const d tf.projection_pushdown = true; tf.filter_pushdown = true; - // We can prune out filter columns that are unused in the remainder of the query plan. - // e.g. in "SELECT i FROM tbl WHERE j = 42" j does not leave Vortex table function. tf.filter_prune = true; tf.sampling_pushdown = false; - tf.late_materialization = false; tf.pushdown_complex_filter = c_pushdown_complex_filter; tf.cardinality = c_cardinality; - tf.get_partition_data = c_get_partition_data; + tf.get_partition_info = get_partition_info; + tf.get_partition_data = get_partition_data; tf.to_string = c_to_string; tf.table_scan_progress = c_table_scan_progress; tf.statistics = c_statistics; + tf.late_materialization = true; + // Columns that uniquely identify a row for deferred re-fetch in a multi + // file scan: (file index, row number in file). + tf.get_row_id_columns = [](auto &, auto) -> vector { + return {COLUMN_IDENTIFIER_FILE_INDEX, COLUMN_IDENTIFIER_FILE_ROW_NUMBER}; + }; + tf.get_virtual_columns = [](auto &, auto) -> virtual_column_map_t { - return {{COLUMN_IDENTIFIER_EMPTY, TableColumn("", LogicalTypeId::BOOLEAN)}}; + return { + {COLUMN_IDENTIFIER_EMPTY, {"", LogicalTypeId::BOOLEAN}}, + {COLUMN_IDENTIFIER_FILE_INDEX, {"file_index", LogicalType::UBIGINT}}, + {COLUMN_IDENTIFIER_FILE_ROW_NUMBER, {"file_row_number", LogicalType::BIGINT}}, + }; }; tf.arguments.resize(vtab->parameter_count); diff --git a/vortex-duckdb/src/convert/mod.rs b/vortex-duckdb/src/convert/mod.rs index 8caab12b04c..d61aa81e005 100644 --- a/vortex-duckdb/src/convert/mod.rs +++ b/vortex-duckdb/src/convert/mod.rs @@ -11,4 +11,5 @@ pub use dtype::from_duckdb_table; pub use expr::try_from_bound_expression; pub use scalar::*; pub use table_filter::try_from_table_filter; +pub use table_filter::try_from_virtual_column_filter; pub use vector::data_chunk_to_vortex; diff --git a/vortex-duckdb/src/convert/table_filter.rs b/vortex-duckdb/src/convert/table_filter.rs index 
fa9003bc09c..a1c046c678c 100644 --- a/vortex-duckdb/src/convert/table_filter.rs +++ b/vortex-duckdb/src/convert/table_filter.rs @@ -1,14 +1,17 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright the Vortex contributors +use std::ops::Range; use std::sync::Arc; use itertools::Itertools; +use vortex::buffer::Buffer; use vortex::dtype::DType; use vortex::dtype::Nullability; use vortex::error::VortexExpect; use vortex::error::VortexResult; use vortex::error::vortex_bail; +use vortex::error::vortex_err; use vortex::expr::Expression; use vortex::expr::and_collect; use vortex::expr::get_item; @@ -21,10 +24,13 @@ use vortex::scalar::Scalar; use vortex::scalar_fn::ScalarFnVTableExt; use vortex::scalar_fn::fns::binary::Binary; use vortex::scalar_fn::fns::operators::CompareOperator; +use vortex::scan::selection::Selection; use crate::cpp::DUCKDB_VX_EXPR_TYPE; +use crate::duckdb::ExtractedValue; use crate::duckdb::TableFilterClass; use crate::duckdb::TableFilterRef; +use crate::duckdb::ValueRef; pub fn try_from_table_filter( value: &TableFilterRef, @@ -125,3 +131,96 @@ pub fn try_from_table_filter( } })) } + +fn nonnegative_number_from_value(value: &ValueRef) -> VortexResult { + match value.extract() { + ExtractedValue::BigInt(i) => { + u64::try_from(i).map_err(|_| vortex_err!("negative value: {i}")) + } + ExtractedValue::Integer(i) => { + u64::try_from(i).map_err(|_| vortex_err!("negative value: {i}")) + } + ExtractedValue::UBigInt(u) => Ok(u), + ExtractedValue::UInteger(u) => Ok(u64::from(u)), + _ => vortex_bail!("unexpected value type"), + } +} + +fn intersect_sorted(left: &[u64], right: &[u64]) -> Vec { + let mut result = Vec::new(); + let (mut i, mut j) = (0, 0); + while i < left.len() && j < right.len() { + match left[i].cmp(&right[j]) { + std::cmp::Ordering::Equal => { + result.push(left[i]); + i += 1; + j += 1; + } + std::cmp::Ordering::Less => i += 1, + std::cmp::Ordering::Greater => j += 1, + } + } + result +} + +/// For constant comparison 
on IN filters over file_index or file_row_number +/// virtual column, create a selection and a range covering the same range as +/// expressions do. +pub fn try_from_virtual_column_filter( + filter: &TableFilterRef, +) -> VortexResult<(Selection, Option>)> { + match filter.as_class() { + TableFilterClass::InFilter(values) => { + let indices = values + .iter() + .map(nonnegative_number_from_value) + .collect::>>()?; + Ok((Selection::IncludeByIndex(Buffer::from_iter(indices)), None)) + } + TableFilterClass::ConstantComparison(const_) => { + let n = nonnegative_number_from_value(const_.value)?; + let range = match const_.operator { + DUCKDB_VX_EXPR_TYPE::DUCKDB_VX_EXPR_TYPE_COMPARE_EQUAL => Some(n..n + 1), + DUCKDB_VX_EXPR_TYPE::DUCKDB_VX_EXPR_TYPE_COMPARE_GREATERTHANOREQUALTO => { + Some(n..u64::MAX) + } + DUCKDB_VX_EXPR_TYPE::DUCKDB_VX_EXPR_TYPE_COMPARE_GREATERTHAN => { + Some(n.saturating_add(1)..u64::MAX) + } + DUCKDB_VX_EXPR_TYPE::DUCKDB_VX_EXPR_TYPE_COMPARE_LESSTHANOREQUALTO => { + Some(0..n.saturating_add(1)) + } + DUCKDB_VX_EXPR_TYPE::DUCKDB_VX_EXPR_TYPE_COMPARE_LESSTHAN => Some(0..n), + _ => None, + }; + Ok((Selection::All, range)) + } + TableFilterClass::ConjunctionAnd(conj) => { + let mut start = 0u64; + let mut end = u64::MAX; + let mut indices: Option> = None; + for child in conj.children() { + let (sel, range) = try_from_virtual_column_filter(child)?; + if let Selection::IncludeByIndex(buf) = sel { + indices = Some(match indices { + None => buf.iter().copied().collect(), + Some(existing) => intersect_sorted(&existing, buf.as_ref()), + }); + } + if let Some(r) = range { + start = start.max(r.start); + end = end.min(r.end); + } + } + let range = (start < end).then_some(start..end); + let sel = indices + .map(|v| Selection::IncludeByIndex(Buffer::from_iter(v))) + .unwrap_or(Selection::All); + Ok((sel, range)) + } + TableFilterClass::Optional(child) => { + try_from_virtual_column_filter(child).or_else(|_| Ok((Selection::All, None))) + } + _ => 
Ok((Selection::All, None)), + } +} diff --git a/vortex-duckdb/src/datasource.rs b/vortex-duckdb/src/datasource.rs index 84bc82f2db3..a1345a21434 100644 --- a/vortex-duckdb/src/datasource.rs +++ b/vortex-duckdb/src/datasource.rs @@ -8,6 +8,7 @@ //! pushdown, cardinality, and partitioning. use std::fmt::Debug; +use std::ops::Range; use std::sync::Arc; use std::sync::atomic::AtomicU64; use std::sync::atomic::Ordering; @@ -28,12 +29,16 @@ use vortex::array::optimizer::ArrayOptimizer; use vortex::array::stats::StatsSet; use vortex::dtype::DType; use vortex::dtype::FieldNames; +use vortex::dtype::PType; use vortex::error::VortexExpect; use vortex::error::VortexResult; use vortex::error::vortex_err; use vortex::expr::Expression; use vortex::expr::and_collect; +use vortex::expr::cast; use vortex::expr::col; +use vortex::expr::merge; +use vortex::expr::pack; use vortex::expr::root; use vortex::expr::select; use vortex::expr::stats::Precision; @@ -42,6 +47,7 @@ use vortex::file::v2::FileStatsLayoutReader; use vortex::io::kanal_ext::KanalExt; use vortex::io::runtime::BlockingRuntime; use vortex::io::runtime::current::ThreadSafeIterator; +use vortex::layout::layouts::row_idx::row_idx; use vortex::layout::scan::multi::MultiLayoutChild; use vortex::layout::scan::multi::MultiLayoutDataSource; use vortex::metrics::tracing::get_global_labels; @@ -50,6 +56,7 @@ use vortex::scalar::ScalarValue; use vortex::scalar_fn::fns::pack::Pack; use vortex::scan::DataSource; use vortex::scan::ScanRequest; +use vortex::scan::selection::Selection; use vortex_utils::aliases::hash_set::HashSet; use vortex_utils::parallelism::get_available_parallelism; @@ -58,6 +65,7 @@ use crate::SESSION; use crate::convert::ToDuckDBScalar; use crate::convert::try_from_bound_expression; use crate::convert::try_from_table_filter; +use crate::convert::try_from_virtual_column_filter; use crate::duckdb::BindInputRef; use crate::duckdb::BindResultRef; use crate::duckdb::Cardinality; @@ -67,25 +75,25 @@ use 
crate::duckdb::DataChunkRef; use crate::duckdb::DuckdbStringMapRef; use crate::duckdb::ExpressionRef; use crate::duckdb::LogicalType; +use crate::duckdb::PartitionData; use crate::duckdb::TableFilterSetRef; use crate::duckdb::TableFunction; use crate::duckdb::TableInitInput; +use crate::duckdb::Value; use crate::exporter::ArrayExporter; use crate::exporter::ConversionCache; -/// Taken from -/// https://github.com/duckdb/duckdb/blob/dc11eadd8f0a7c600f0034810706605ebe10d5b9/src/include/duckdb/common/constants.hpp#L44 -/// -/// If DuckDB requests a zero-column projection from read_vortex like count(*), -/// its planner tries to get any column: -/// https://github.com/duckdb/duckdb/blob/dc11eadd8f0a7c600f0034810706605ebe10d5b9/src/planner/operator/logical_get.cpp#L149 -/// -/// If you define COLUMN_IDENTIFIER_EMPTY, planner takes it, otherwise the -/// first column. As we don't want to fill the output chunk and we can leave -/// it uninitialized in this case, we define COLUMN_IDENTIFIER_EMPTY as a -/// virtual column. -/// See virtual_columns in vortex-duckdb/cpp/table_function.cpp -static EMPTY_COLUMN_IDX: u64 = 18446744073709551614; +// See MultiFileReader for constants + +/// "file_index" virtual column +static FILE_INDEX_COLUMN_IDX: u64 = 9223372036854775810; +/// "file_row_number" virtual column +static FILE_ROW_NUMBER_COLUMN_IDX: u64 = 9223372036854775809; + +/// See duckdb/src/common/constants.cpp +fn is_virtual_column(id: u64) -> bool { + id >= 9223372036854775808u64 +} /// A trait for table functions that resolve to a [`DataSourceRef`]. /// @@ -149,14 +157,16 @@ pub struct DataSourceGlobal { batch_id: AtomicU64, bytes_total: Arc, bytes_read: AtomicU64, + file_index_column_pos: Option, + file_row_number_column_pos: Option, } /// Per-thread local scan state. pub struct DataSourceLocal { iterator: DataSourceIterator, exporter: Option, - /// The unique batch id of the last chunk exported via scan(). 
- batch_id: Option, + partition_index: u64, + file_index: usize, } /// Returns scan progress as a percentage (0.0–100.0). @@ -281,9 +291,19 @@ impl TableFunction for T { let column_ids = init_input.column_ids(); let projection_ids = init_input.projection_ids(); - let projection_expr = - extract_projection_expr(projection_ids, column_ids, &bind_data.column_fields); - let filter_expr = extract_table_filter_expr( + let ProjectionWithVirtualColumns { + projection, + file_index_column_pos, + file_row_number_column_pos, + } = extract_projection_expr(projection_ids, column_ids, &bind_data.column_fields); + + let FilterWithVirtualColumns { + filter, + row_selection, + row_range, + file_selection, + file_range, + } = extract_table_filter_expr( init_input.table_filter_set(), column_ids, &bind_data.column_fields, @@ -291,16 +311,24 @@ impl TableFunction for T { bind_data.data_source.dtype(), )?; - let filter_expr_str = filter_expr + let filter_expr_str = filter .as_ref() .map_or_else(|| "true".to_string(), |f| f.to_string()); - debug!("Global init Vortex scan SELECT {projection_expr} WHERE {filter_expr_str}"); + debug!( + "Global init Vortex scan SELECT {projection} WHERE {filter_expr_str}\n + row selection: {row_selection:?}, row range: {row_range:?}, + file selection: {file_selection:?}, file range: {file_range:?}" + ); let request = ScanRequest { - projection: projection_expr, - filter: filter_expr, - ordered: false, - ..Default::default() + projection, + filter, + ordered: file_row_number_column_pos.is_some(), + selection: row_selection, + row_range, + file_selection, + file_range, + limit: None, }; let scan = RUNTIME.block_on(bind_data.data_source.scan(request))?; @@ -318,13 +346,22 @@ impl TableFunction for T { let stream = scan .partitions() .map(move |partition| { - // We create a new conversion cache scoped to the partition, since there's no point - // caching anything across partitions. 
- let cache = Arc::new(ConversionCache::default()); let tx = tx.clone(); - RUNTIME.handle().spawn(async move { - let mut stream = match partition.and_then(|p| p.execute()) { + let partition = match partition { + Ok(partition) => partition, + Err(e) => { + let _ = tx.send(Err(e)).await; + return; + } + }; + + let cache = Arc::new(ConversionCache { + file_index: partition.index(), + ..Default::default() + }); + + let mut stream = match partition.execute() { Ok(s) => s, Err(e) => { let _ = tx.send(Err(e)).await; @@ -356,6 +393,8 @@ impl TableFunction for T { batch_id: AtomicU64::new(0), bytes_total: Arc::new(AtomicU64::new(0)), bytes_read: AtomicU64::new(0), + file_index_column_pos, + file_row_number_column_pos, }) } @@ -381,7 +420,8 @@ impl TableFunction for T { Ok(DataSourceLocal { iterator: global.iterator.clone(), exporter: None, - batch_id: None, + partition_index: 0, + file_index: 0, }) } @@ -400,6 +440,7 @@ impl TableFunction for T { }; let (array_result, conversion_cache) = result?; let array_result = array_result.optimize_recursive(ctx.session())?; + local_state.file_index = conversion_cache.file_index; let array_result: StructArray = if let Some(array) = array_result.as_opt::() { @@ -423,15 +464,19 @@ impl TableFunction for T { ctx, )?); // Relaxed since there is no intra-instruction ordering required. - local_state.batch_id = Some(global_state.batch_id.fetch_add(1, Ordering::Relaxed)); + local_state.partition_index = global_state.batch_id.fetch_add(1, Ordering::Relaxed); } let exporter = local_state .exporter .as_mut() .vortex_expect("error: exporter missing"); + let has_more_data = exporter.export( + chunk, + global_state.file_index_column_pos, + global_state.file_row_number_column_pos, + )?; - let has_more_data = exporter.export(chunk)?; global_state .bytes_read .fetch_add(chunk.len(), Ordering::Relaxed); @@ -439,7 +484,7 @@ impl TableFunction for T { if !has_more_data { // This exporter is fully consumed. 
local_state.exporter = None; - local_state.batch_id = None; + local_state.partition_index = 0; } else { break; } @@ -447,6 +492,12 @@ impl TableFunction for T { assert!(!chunk.is_empty()); + if let Some(pos) = global_state.file_index_column_pos { + chunk + .get_vector_mut(pos) + .reference_value(&Value::from(local_state.file_index as u64)); + } + Ok(()) } @@ -516,12 +567,14 @@ impl TableFunction for T { fn partition_data( _bind_data: &Self::BindData, - _global_init_data: &Self::GlobalState, + global_init_data: &Self::GlobalState, local_init_data: &mut Self::LocalState, - ) -> VortexResult { - local_init_data - .batch_id - .ok_or_else(|| vortex_err!("batch id missing, no batches exported")) + ) -> PartitionData { + PartitionData { + partition_index: local_init_data.partition_index, + file_index_column_pos: global_init_data.file_index_column_pos, + file_index: local_init_data.file_index, + } } fn to_string(bind_data: &Self::BindData, map: &mut DuckdbStringMapRef) { @@ -557,53 +610,96 @@ fn extract_schema_from_dtype(dtype: &DType) -> VortexResult> { Ok(fields) } -/// Creates a projection expression from raw projection/column ID slices and column names. +struct ProjectionWithVirtualColumns { + projection: Expression, + file_index_column_pos: Option, + file_row_number_column_pos: Option, +} + +/// Creates a projection expression from raw projection/column ID slices and +/// column names. fn extract_projection_expr( projection_ids: Option<&[u64]>, column_ids: &[u64], column_fields: &[DuckdbField], -) -> Expression { - // Projection ids may be empty, in which case you need to use projection_ids +) -> ProjectionWithVirtualColumns { + // If projection ids are empty, use column_ids. 
// See duckdb/src/planner/operator/logical_get.cpp#L168 - let (projection_ids, has_projection_ids) = match projection_ids { + let (ids, has_projection_ids) = match projection_ids { Some(ids) => (ids, true), None => (column_ids, false), }; - // duckdb index is u64 (size_t) but in Rust u64 and usize are different things. + let mut file_index_column_pos = None; + let mut file_row_number_column_pos = None; + #[expect(clippy::cast_possible_truncation)] - let names = projection_ids + let names = ids .iter() - .filter(|p| **p != EMPTY_COLUMN_IDX) - .map(|mut idx| { - if has_projection_ids { - idx = &column_ids[*idx as usize]; + .enumerate() + .map(|(column_pos, &column_id)| { + let column_id = if has_projection_ids { + column_ids[column_id as usize] + } else { + column_id + }; + + if column_id == FILE_INDEX_COLUMN_IDX { + file_index_column_pos = Some(column_pos); + } + if column_id == FILE_ROW_NUMBER_COLUMN_IDX { + file_row_number_column_pos = Some(column_pos); } - #[expect(clippy::cast_possible_truncation)] - &column_fields - .get(*idx as usize) - .vortex_expect("prune idx in column names") - .name + column_id }) - .map(|s| Arc::from(s.as_str())) + .filter(|&col_id| !is_virtual_column(col_id)) + .map(|col_id| Arc::from(column_fields[col_id as usize].name.as_str())) .collect::(); - select(names, root()) + // file_index column will be filled later when exporting the chunk. 
+ let select = select(names, root()); + let projection = if file_row_number_column_pos.is_some() { + // row_idx will be rearranged to correct position in scan(), prepend + // here + let row_idx = cast(row_idx(), DType::Primitive(PType::I64, false.into())); + let row_idx_struct = pack([("file_row_number", row_idx)], false.into()); + merge([row_idx_struct, select]) + } else { + select + }; + + ProjectionWithVirtualColumns { + projection, + file_index_column_pos, + file_row_number_column_pos, + } +} + +struct FilterWithVirtualColumns { + filter: Option, + row_selection: Selection, + row_range: Option>, + file_selection: Selection, + file_range: Option>, } -/// Creates a table filter expression from the table filter set, column metadata, additional -/// filter expressions, and the top-level DType. +/// Creates a table filter expression, row selection, and row range from the table filter set, +/// column metadata, additional filter expressions, and the top-level DType. fn extract_table_filter_expr( table_filter_set: Option<&TableFilterSetRef>, column_ids: &[u64], column_fields: &[DuckdbField], additional_filters: &[Expression], dtype: &DType, -) -> VortexResult> { +) -> VortexResult { let mut table_filter_exprs: HashSet = if let Some(filter) = table_filter_set { filter .into_iter() + .filter(|(idx, _)| { + let idx_u: usize = idx.as_(); + !is_virtual_column(column_ids[idx_u]) + }) .map(|(idx, ex)| { let idx_u: usize = idx.as_(); let col_idx: usize = column_ids[idx_u].as_(); @@ -617,7 +713,31 @@ fn extract_table_filter_expr( }; table_filter_exprs.extend(additional_filters.iter().cloned()); - Ok(and_collect(table_filter_exprs)) + + let mut file_selection = Selection::All; + let mut row_selection = Selection::All; + let mut row_range = None; + let mut file_range = None; + if let Some(filter) = table_filter_set { + for (idx, expression) in filter.into_iter() { + let idx: usize = idx.as_(); + if column_ids[idx] == FILE_ROW_NUMBER_COLUMN_IDX { + (row_selection, row_range) = 
try_from_virtual_column_filter(expression)?; + } + if column_ids[idx] == FILE_INDEX_COLUMN_IDX { + (file_selection, file_range) = try_from_virtual_column_filter(expression)?; + } + } + }; + + let out = FilterWithVirtualColumns { + filter: and_collect(table_filter_exprs), + row_selection, + row_range, + file_selection, + file_range, + }; + Ok(out) } #[cfg(test)] diff --git a/vortex-duckdb/src/duckdb/table_function/mod.rs b/vortex-duckdb/src/duckdb/table_function/mod.rs index ac671f409e8..1b3b8199e0e 100644 --- a/vortex-duckdb/src/duckdb/table_function/mod.rs +++ b/vortex-duckdb/src/duckdb/table_function/mod.rs @@ -36,6 +36,12 @@ use crate::duckdb::table_function::statistics::statistics; use crate::duckdb::table_function::table_scan_progress::table_scan_progress_callback; use crate::duckdb_try; +pub struct PartitionData { + pub partition_index: u64, + pub file_index_column_pos: Option, + pub file_index: usize, +} + #[derive(Debug, Default)] pub struct ColumnStatistics { pub min: Option, @@ -137,10 +143,10 @@ pub trait TableFunction: Sized + Debug { /// Returns the idx of the current partition being processed by a local threa. /// This *must* be globally unique. 
fn partition_data( - _bind_data: &Self::BindData, - _global_init_data: &Self::GlobalState, - _local_init_data: &mut Self::LocalState, - ) -> VortexResult; + bind_data: &Self::BindData, + global_init_data: &Self::GlobalState, + local_init_data: &mut Self::LocalState, + ) -> PartitionData; /// Returns a vector of key-value pairs for EXPLAIN output fn to_string(bind_data: &Self::BindData, map: &mut DuckdbStringMapRef); diff --git a/vortex-duckdb/src/duckdb/table_function/partition.rs b/vortex-duckdb/src/duckdb/table_function/partition.rs index 09d0e8fbd94..d373d6f5623 100644 --- a/vortex-duckdb/src/duckdb/table_function/partition.rs +++ b/vortex-duckdb/src/duckdb/table_function/partition.rs @@ -3,11 +3,9 @@ use std::ffi::c_void; -use cpp::duckdb_vx_error; use vortex::error::VortexExpect; use crate::cpp; -use crate::cpp::idx_t; use crate::duckdb::TableFunction; /// Native callback for the cardinality estimate of a table function. @@ -15,22 +13,18 @@ pub(crate) unsafe extern "C-unwind" fn get_partition_data_callback idx_t { + partition_data_out: *mut cpp::duckdb_vx_partition_data, +) { let bind_data = unsafe { bind_data.cast::().as_ref() }.vortex_expect("bind_data null pointer"); let global_init_data = unsafe { global_init_data.cast::().as_ref() } .vortex_expect("global_init_data null pointer"); let local_init_data = unsafe { local_init_data.cast::().as_mut() } .vortex_expect("local_init_data null pointer"); + let data = T::partition_data(bind_data, global_init_data, local_init_data); + let out = unsafe { &mut *partition_data_out }; + out.partition_index = data.partition_index; - match T::partition_data(bind_data, global_init_data, local_init_data) { - Ok(batch_id) => batch_id, - Err(e) => { - // Set the error in the error output. 
- let msg = e.to_string(); - unsafe { error_out.write(cpp::duckdb_vx_error_create(msg.as_ptr().cast(), msg.len())) }; - 0 - } - } + out.file_index_column_pos = data.file_index_column_pos.unwrap_or(usize::MAX); + out.file_index = data.file_index; } diff --git a/vortex-duckdb/src/e2e_test/object_cache_test.rs b/vortex-duckdb/src/e2e_test/object_cache_test.rs index ae9dc38d0db..b21c10b1d11 100644 --- a/vortex-duckdb/src/e2e_test/object_cache_test.rs +++ b/vortex-duckdb/src/e2e_test/object_cache_test.rs @@ -15,6 +15,7 @@ use crate::duckdb::ClientContextRef; use crate::duckdb::ColumnStatistics; use crate::duckdb::DataChunkRef; use crate::duckdb::LogicalType; +use crate::duckdb::PartitionData; use crate::duckdb::TableFunction; use crate::duckdb::TableInitInput; @@ -110,8 +111,12 @@ impl TableFunction for TestTableFunction { _bind_data: &Self::BindData, _global_init_data: &Self::GlobalState, _local_init_data: &mut Self::LocalState, - ) -> VortexResult { - Ok(0) + ) -> PartitionData { + PartitionData { + partition_index: 0, + file_index_column_pos: None, + file_index: 0, + } } fn statistics( diff --git a/vortex-duckdb/src/exporter/cache.rs b/vortex-duckdb/src/exporter/cache.rs index 3b0fd496360..2f495ba9608 100644 --- a/vortex-duckdb/src/exporter/cache.rs +++ b/vortex-duckdb/src/exporter/cache.rs @@ -21,4 +21,5 @@ pub struct ConversionCache { pub dict_cache: DashMap, pub values_cache: DashMap>)>, pub canonical_cache: DashMap, + pub file_index: usize, } diff --git a/vortex-duckdb/src/exporter/mod.rs b/vortex-duckdb/src/exporter/mod.rs index f76b3e41124..517776f5521 100644 --- a/vortex-duckdb/src/exporter/mod.rs +++ b/vortex-duckdb/src/exporter/mod.rs @@ -33,6 +33,7 @@ use vortex::array::arrays::struct_::StructArrayExt; use vortex::buffer::BitChunks; use vortex::encodings::runend::RunEnd; use vortex::encodings::sequence::Sequence; +use vortex::error::VortexExpect; use vortex::error::VortexResult; use vortex::error::vortex_bail; @@ -74,15 +75,22 @@ impl ArrayExporter { /// 
Export the data into the next chunk. /// /// Returns `true` if a chunk was exported, `false` if all rows have been exported. - pub fn export(&mut self, chunk: &mut DataChunkRef) -> VortexResult { + pub fn export( + &mut self, + chunk: &mut DataChunkRef, + file_index_column_pos: Option, + file_row_number_column_pos: Option, + ) -> VortexResult { chunk.reset(); if self.remaining == 0 { return Ok(false); } - let expected_cols = self.fields.len(); + let zero_projection = self.fields.is_empty(); + + // file_row_number column is already populated in scan construction + let expected_cols = self.fields.len() + file_index_column_pos.is_some() as usize; let chunk_cols = chunk.column_count(); - let zero_projection = expected_cols == 0; if !zero_projection && chunk_cols != expected_cols { vortex_bail!("Expected {expected_cols} columns in output chunk, got {chunk_cols}"); } @@ -92,14 +100,48 @@ impl ArrayExporter { self.remaining -= chunk_len; chunk.set_len(chunk_len); - // DuckDB asked us for zero columns. This may happen with aggregation - // functions like count(*). In such case we can leave chunk contents - // uninitialized. See EMPTY_COLUMN_IDX comment why this works. + // If DuckDB requests a zero-column projection from read_vortex like count(*), + // its planner tries to get any column: + // See duckdb/src/planner/operator/logical_get.cpp#L149 + // + // If you define COLUMN_IDENTIFIER_EMPTY, planner takes it, otherwise the + // first column. As we don't want to fill the output chunk and we can leave + // it uninitialized in this case, we define COLUMN_IDENTIFIER_EMPTY as a + // virtual column. + // See virtual_columns in vortex-duckdb/cpp/table_function.cpp if zero_projection { return Ok(true); } - for (i, field) in self.fields.iter_mut().enumerate() { + let mut fields = self.fields.iter(); + // file_row_number column is the first one if present. 
+ if let Some(pos) = file_row_number_column_pos { + let field = fields.next().vortex_expect("field column mismatch"); + field.export( + position, + chunk_len, + chunk.get_vector_mut(pos), + &mut self.ctx, + )?; + } + + for i in 0..chunk_cols { + // file_index column: skip index - it will be filled after + // chunk export. + if let Some(pos) = file_index_column_pos + && i == pos + { + continue; + } + + // file_row_number column: skip index, already filled + if let Some(pos) = file_row_number_column_pos + && i == pos + { + continue; + } + + let field = fields.next().vortex_expect("field count mismatch"); field.export(position, chunk_len, chunk.get_vector_mut(i), &mut self.ctx)?; } diff --git a/vortex-ffi/src/scan.rs b/vortex-ffi/src/scan.rs index 7d728aa135c..ffe846033bc 100644 --- a/vortex-ffi/src/scan.rs +++ b/vortex-ffi/src/scan.rs @@ -189,6 +189,8 @@ fn scan_request(opts: *const vx_scan_options) -> VortexResult { selection, ordered, limit, + file_selection: Selection::All, + file_range: None, }) } diff --git a/vortex-jni/src/scan.rs b/vortex-jni/src/scan.rs index 36cadfbb201..cea046c990d 100644 --- a/vortex-jni/src/scan.rs +++ b/vortex-jni/src/scan.rs @@ -111,6 +111,8 @@ fn build_scan_request( selection, ordered, limit, + file_selection: Selection::All, + file_range: None, }) } diff --git a/vortex-layout/src/scan/layout.rs b/vortex-layout/src/scan/layout.rs index 4ffdadab2e6..0debb11b1a4 100644 --- a/vortex-layout/src/scan/layout.rs +++ b/vortex-layout/src/scan/layout.rs @@ -289,6 +289,10 @@ impl Partition for LayoutReaderSplit { self } + fn index(&self) -> usize { + 0 + } + fn row_count(&self) -> Option> { let row_count = self.row_range.end - self.row_range.start; let row_count = self.selection.row_count(row_count); @@ -351,6 +355,10 @@ impl Partition for Empty { self } + fn index(&self) -> usize { + 0 + } + fn row_count(&self) -> Option> { Some(Precision::exact(self.row_count)) } diff --git a/vortex-layout/src/scan/multi.rs b/vortex-layout/src/scan/multi.rs 
index a753e0d6271..c4b26690026 100644 --- a/vortex-layout/src/scan/multi.rs +++ b/vortex-layout/src/scan/multi.rs @@ -51,6 +51,7 @@ use vortex_scan::Partition; use vortex_scan::PartitionRef; use vortex_scan::PartitionStream; use vortex_scan::ScanRequest; +use vortex_scan::selection::Selection; use vortex_session::VortexSession; use vortex_utils::parallelism::get_available_parallelism; @@ -309,8 +310,9 @@ impl DataSourceScan for MultiLayoutScan { // `flat_map` is appropriate here. The real I/O work happens when `execute()` is called. ready_stream .chain(deferred_stream) - .flat_map(move |reader_result| match reader_result { - Ok(reader) => reader_partition(reader, session.clone(), request.clone()), + .enumerate() + .flat_map(move |(i, reader_result)| match reader_result { + Ok(reader) => reader_partition(i, reader, session.clone(), request.clone()), Err(e) => stream::once(async move { Err(e) }).boxed(), }) .boxed() @@ -323,6 +325,7 @@ impl DataSourceScan for MultiLayoutScan { /// can match, returns an empty stream. Otherwise, yields a single partition covering the /// reader's full row range. 
fn reader_partition( + partition_idx: usize, reader: LayoutReaderRef, session: VortexSession, request: ScanRequest, @@ -330,6 +333,26 @@ fn reader_partition( let row_count = reader.row_count(); let row_range = request.row_range.clone().unwrap_or(0..row_count); + let partition_idx_u64: u64 = partition_idx as u64; + if let Some(range) = &request.file_range + && !range.contains(&partition_idx_u64) + { + return stream::empty().boxed(); + }; + match &request.file_selection { + Selection::IncludeByIndex(buffer) => { + if buffer.as_slice().binary_search(&partition_idx_u64).is_err() { + return stream::empty().boxed(); + } + } + Selection::ExcludeByIndex(buffer) => { + if buffer.as_slice().binary_search(&partition_idx_u64).is_ok() { + return stream::empty().boxed(); + } + } + _ => {} + }; + // Check file-level pruning: if the filter can be proven false for the entire row range // using file-level statistics, skip this reader entirely. if let Some(ref filter) = request.filter { @@ -351,6 +374,7 @@ fn reader_partition( row_range: Some(row_range), ..request }, + index: partition_idx, }) as PartitionRef) }) .boxed() @@ -364,6 +388,7 @@ struct MultiLayoutPartition { reader: LayoutReaderRef, session: VortexSession, request: ScanRequest, + index: usize, } impl Partition for MultiLayoutPartition { @@ -371,6 +396,10 @@ impl Partition for MultiLayoutPartition { self } + fn index(&self) -> usize { + self.index + } + fn row_count(&self) -> Option> { let row_range = self.request.row_range.as_ref()?; let row_count = row_range.end - row_range.start; diff --git a/vortex-scan/public-api.lock b/vortex-scan/public-api.lock index 2748e5cc037..a80bad4dd43 100644 --- a/vortex-scan/public-api.lock +++ b/vortex-scan/public-api.lock @@ -54,6 +54,10 @@ pub fn vortex_scan::selection::Selection::fmt(&self, f: &mut core::fmt::Formatte pub struct vortex_scan::ScanRequest +pub vortex_scan::ScanRequest::file_range: core::option::Option> + +pub vortex_scan::ScanRequest::file_selection: 
vortex_scan::selection::Selection + pub vortex_scan::ScanRequest::filter: core::option::Option + pub vortex_scan::ScanRequest::limit: core::option::Option @@ -118,6 +122,8 @@ pub fn vortex_scan::Partition::byte_size(&self) -> core::option::Option) -> vortex_error::VortexResult +pub fn vortex_scan::Partition::index(&self) -> usize + pub fn vortex_scan::Partition::row_count(&self) -> core::option::Option> pub fn vortex_scan::Partition::serialize(&self) -> vortex_error::VortexResult>> diff --git a/vortex-scan/src/lib.rs b/vortex-scan/src/lib.rs index edcb5e29979..d17528998b1 100644 --- a/vortex-scan/src/lib.rs +++ b/vortex-scan/src/lib.rs @@ -125,6 +125,10 @@ pub struct ScanRequest { /// A row selection to apply to the scan. The selection identifies rows within the specified /// row range. pub selection: Selection, + /// If we're operating on files, a selection of file indices to read + pub file_selection: Selection, + /// If we're operating on files, the contiguous range of file indices to read + pub file_range: Option>, /// Whether the scan should preserve row order. If false, the scan may produce rows in any /// order, for example to enable parallel execution across partitions. pub ordered: bool, @@ -140,8 +144,10 @@ impl Default for ScanRequest { filter: None, row_range: None, selection: Selection::default(), + file_selection: Selection::default(), ordered: false, limit: None, + file_range: None, } } } @@ -169,6 +175,11 @@ pub trait Partition: 'static + Send { /// Downcast the partition to a concrete type. fn as_any(&self) -> &dyn Any; + /// A unique identifier for the partition. + /// Used mainly to filter out unused partitions with DuckDB's + /// late materialization support. + fn index(&self) -> usize; + /// Returns an estimate of the row count for this partition.
fn row_count(&self) -> Option>; diff --git a/vortex-sqllogictest/slt/explain.slt b/vortex-sqllogictest/slt/duckdb/explain.slt similarity index 91% rename from vortex-sqllogictest/slt/explain.slt rename to vortex-sqllogictest/slt/duckdb/explain.slt index 81619291bf6..1917d9f0494 100644 --- a/vortex-sqllogictest/slt/explain.slt +++ b/vortex-sqllogictest/slt/duckdb/explain.slt @@ -1,15 +1,13 @@ # SPDX-License-Identifier: Apache-2.0 # SPDX-FileCopyrightText: Copyright the Vortex contributors -include ./setup.slt +include ../setup.slt -onlyif duckdb query I COPY (SELECT * FROM (VALUES ('Hello'), ('Hi'), ('Hey')) AS t(str)) TO '$__TEST_DIR__/explain.vortex'; ---- 3 -onlyif duckdb query TT EXPLAIN (FORMAT json) SELECT * FROM '$__TEST_DIR__/explain.vortex'; ---- diff --git a/vortex-sqllogictest/slt/duckdb/file_index.slt b/vortex-sqllogictest/slt/duckdb/file_index.slt new file mode 100644 index 00000000000..ea2693e81a3 --- /dev/null +++ b/vortex-sqllogictest/slt/duckdb/file_index.slt @@ -0,0 +1,50 @@ +# SPDX-License-Identifier: Apache-2.0 +# SPDX-FileCopyrightText: Copyright the Vortex contributors + +include ../setup.slt + +query I +COPY (SELECT * FROM (VALUES ('Hello'), ('Hi')) AS t(str)) TO '$__TEST_DIR__/file_index_1.vortex'; +---- +2 + +query I +COPY (SELECT * FROM (VALUES ('1'), ('2'), ('3')) AS t(str)) TO '$__TEST_DIR__/file_index_2.vortex'; +---- +3 + +query TI +SELECT str, file_index FROM '$__TEST_DIR__/file_index_1.vortex'; +---- +Hello 0 +Hi 0 + +query IT +SELECT file_index, str FROM '$__TEST_DIR__/file_index_2.vortex'; +---- +0 1 +0 2 +0 3 + +query TB +SELECT *, file_index < 2 FROM '$__TEST_DIR__/*.vortex' +ORDER BY str; +---- +1 true +2 true +3 true +Hello true +Hi true + +query IB +SELECT count(*) AS cnt, sum(file_index) <= 3 FROM '$__TEST_DIR__/*.vortex' +ORDER BY cnt; +---- +5 true + +query B
+SELECT file_index < 2 FROM '$__TEST_DIR__/*.vortex' +WHERE len(str) > 1; +---- +true
+true diff --git a/vortex-sqllogictest/slt/duckdb/file_row_number.slt
b/vortex-sqllogictest/slt/duckdb/file_row_number.slt new file mode 100644 index 00000000000..4cb0a459480 --- /dev/null +++ b/vortex-sqllogictest/slt/duckdb/file_row_number.slt @@ -0,0 +1,56 @@ +# SPDX-License-Identifier: Apache-2.0 +# SPDX-FileCopyrightText: Copyright the Vortex contributors + +include ../setup.slt + +query I +COPY (SELECT * FROM (VALUES ('Hello'), ('Hi')) AS t(str)) TO '$__TEST_DIR__/file_row_1.vortex'; +---- +2 + +query I +COPY (SELECT * FROM (VALUES ('1'), ('2'), ('3'), ('45')) AS t(str)) TO '$__TEST_DIR__/file_row_2.vortex'; +---- +4 + +query TI +SELECT str, file_row_number FROM '$__TEST_DIR__/file_row_1.vortex'; +---- +Hello 0 +Hi 1 + +query IT +SELECT file_row_number, str FROM '$__TEST_DIR__/file_row_2.vortex'; +---- +0 1 +1 2 +2 3 +3 45 + +query IT +SELECT file_row_number, str FROM '$__TEST_DIR__/file_row_2.vortex' +WHERE len(str) = 1; +---- +0 1 +1 2 +2 3 + +query TI +SELECT *, file_row_number FROM '$__TEST_DIR__/*.vortex' +ORDER BY str; +---- +1 0 +2 1 +3 2 +45 3 +Hello 0 +Hi 1 + +query I +SELECT file_row_number FROM '$__TEST_DIR__/*.vortex' +WHERE len(str) > 1 +ORDER BY file_row_number; +---- +0 +1 +3