Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 19 additions & 4 deletions vortex-duckdb/cpp/include/duckdb_vx/table_function.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,11 @@
#include "error.h"
#include "table_filter.h"
#include "duckdb_vx/data.h"
#include <stdint.h>

#ifdef __cplusplus
static_assert(sizeof(idx_t) == 8);
#endif

#ifdef __cplusplus
extern "C" {
Expand Down Expand Up @@ -83,6 +88,16 @@ typedef struct {
bool has_null;
} duckdb_column_statistics;

// Sentinel meaning "column not present" for positions below.
// NOTE(review): in a C translation unit a file-scope `const` object has
// external linkage, so including this header from several C TUs may produce
// duplicate symbols; `#define` or `static const` would be safer — verify
// how this header is consumed.
const idx_t INVALID_IDX = UINT64_MAX;

// Answer filled in by the FFI get_partition_data callback after a chunk
// has been exported.
typedef struct {
// Index of the partition the exported chunk belongs to.
idx_t partition_index;
// Either INVALID_IDX or position of column in output for file_index column
size_t file_index_column_pos;
// File index for the exported partition.
size_t file_index;
} duckdb_vx_partition_data;

// vtable mimicking subset of TableFunction.
// See duckdb/include/function/tfunc.hpp
typedef struct {
Expand Down Expand Up @@ -123,10 +138,10 @@ typedef struct {

double (*table_scan_progress)(duckdb_client_context ctx, void *bind_data, void *global_state);

idx_t (*get_partition_data)(const void *bind_data,
void *init_global_data,
void *init_local_data,
duckdb_vx_error *error_out);
void (*get_partition_data)(const void *bind_data,
void *init_global_data,
void *init_local_data,
duckdb_vx_partition_data *partition_data_out);
} duckdb_vx_tfunc_vtab_t;

// A single function for configuring the DuckDB table function vtable.
Expand Down
71 changes: 56 additions & 15 deletions vortex-duckdb/cpp/table_function.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ DUCKDB_INCLUDES_BEGIN
#include "duckdb.h"
#include "duckdb/catalog/catalog.hpp"
#include "duckdb/common/insertion_order_preserving_map.hpp"
#include "duckdb/common/multi_file/multi_file_reader.hpp"
#include "duckdb/function/table_function.hpp"
#include "duckdb/main/capi/capi_internal.hpp"
#include "duckdb/main/connection.hpp"
Expand All @@ -19,6 +20,8 @@ DUCKDB_INCLUDES_END
using namespace duckdb;
using vortex::CData;
using vortex::IntoErrString;
constexpr column_t COLUMN_IDENTIFIER_FILE_INDEX = MultiFileReader::COLUMN_IDENTIFIER_FILE_INDEX;
constexpr column_t COLUMN_IDENTIFIER_FILE_ROW_NUMBER = MultiFileReader::COLUMN_IDENTIFIER_FILE_ROW_NUMBER;

struct CTableFunctionInfo final : TableFunctionInfo {
explicit CTableFunctionInfo(const duckdb_vx_tfunc_vtab_t &vtab) : vtab(vtab) {
Expand Down Expand Up @@ -334,21 +337,50 @@ extern "C" void duckdb_vx_tfunc_bind_result_add_column(duckdb_vx_tfunc_bind_resu
result.return_types.emplace_back(logical_type);
}

OperatorPartitionData c_get_partition_data(ClientContext &, TableFunctionGetPartitionInput &input) {
if (input.partition_info.RequiresPartitionColumns()) {
throw InternalException("TableScan::GetPartitionData: partition columns not supported");
}
/**
 * Called at planning time to determine whether data is partitioned by a
 * given set of columns. Requested columns are GROUP BY parameters, i.e.
 * columns over which the query aggregates.
 */
TablePartitionInfo get_partition_info(ClientContext &, TableFunctionPartitionInput &input) {
	const vector<column_t> &requested = input.partition_ids;
	// Data is partitioned by array exporters: each exporter processes a
	// single Array which belongs to a single file. If the grouping is solely
	// on file_index, an Array contributes one unique value; any other
	// grouping may see multiple values per partition.
	const bool file_index_only =
	    requested.size() == 1 && requested[0] == COLUMN_IDENTIFIER_FILE_INDEX;
	if (file_index_only) {
		return TablePartitionInfo::SINGLE_VALUE_PARTITIONS;
	}
	return TablePartitionInfo::NOT_PARTITIONED;
}

/**
 * Duckdb requests this function after exporting the chunk. We answer with
 * the partition_index we have exported as well as information about constant
 * columns in this partition. As data is partitioned by array exporters, in
 * each partition (~ exported array) file_index is constant.
 *
 * NOTE: the diff residue merged the removed error_out-based call path into
 * the new duckdb_vx_partition_data path; this is the reconstructed
 * post-merge version with the dead removed lines dropped.
 */
OperatorPartitionData get_partition_data(ClientContext &, TableFunctionGetPartitionInput &input) {
	auto &bind = input.bind_data->Cast<CTableBindData>();
	void *const ffi_bind = bind.ffi_data->DataPtr();
	void *const ffi_global = input.global_state->Cast<CTableGlobalData>().ffi_data->DataPtr();
	void *const ffi_local = input.local_state->Cast<CTableLocalData>().ffi_data->DataPtr();

	// Ask the FFI side which partition the just-exported chunk belongs to.
	duckdb_vx_partition_data partition_data;
	bind.info.vtab.get_partition_data(ffi_bind, ffi_global, ffi_local, &partition_data);

	OperatorPartitionData out(partition_data.partition_index);

	// file_index_column_pos may be INVALID_IDX, but column_index will never
	// be INVALID_IDX, so we can compare directly.
	for (const column_t column_index : input.partition_info.partition_columns) {
		if (column_index == partition_data.file_index_column_pos) {
			out.partition_data.emplace_back(Value::UBIGINT(partition_data.file_index));
		} else {
			throw InternalException(StringUtil::Format(
			    "get_partition_data: requested column_index %d is not constant for given partition",
			    column_index));
		}
	}
	return out;
}

extern "C" void duckdb_vx_string_map_insert(duckdb_vx_string_map map, const char *key, const char *value) {
Expand Down Expand Up @@ -377,21 +409,30 @@ extern "C" duckdb_state duckdb_vx_tfunc_register(duckdb_database ffi_db, const d

tf.projection_pushdown = true;
tf.filter_pushdown = true;
// We can prune out filter columns that are unused in the remainder of the query plan.
// e.g. in "SELECT i FROM tbl WHERE j = 42" j does not leave Vortex table function.
tf.filter_prune = true;
tf.sampling_pushdown = false;
tf.late_materialization = false;

tf.pushdown_complex_filter = c_pushdown_complex_filter;
tf.cardinality = c_cardinality;
tf.get_partition_data = c_get_partition_data;
tf.get_partition_info = get_partition_info;
tf.get_partition_data = get_partition_data;
tf.to_string = c_to_string;
tf.table_scan_progress = c_table_scan_progress;
tf.statistics = c_statistics;

tf.late_materialization = true;
// Columns that uniquely identify a row for deferred re-fetch in a multi
// file scan: (file index, row number in file).
tf.get_row_id_columns = [](auto &, auto) -> vector<column_t> {
return {COLUMN_IDENTIFIER_FILE_INDEX, COLUMN_IDENTIFIER_FILE_ROW_NUMBER};
};

tf.get_virtual_columns = [](auto &, auto) -> virtual_column_map_t {
return {{COLUMN_IDENTIFIER_EMPTY, TableColumn("", LogicalTypeId::BOOLEAN)}};
return {
{COLUMN_IDENTIFIER_EMPTY, {"", LogicalTypeId::BOOLEAN}},
{COLUMN_IDENTIFIER_FILE_INDEX, {"file_index", LogicalType::UBIGINT}},
{COLUMN_IDENTIFIER_FILE_ROW_NUMBER, {"file_row_number", LogicalType::BIGINT}},
};
};

tf.arguments.resize(vtab->parameter_count);
Expand Down
1 change: 1 addition & 0 deletions vortex-duckdb/src/convert/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,5 @@ pub use dtype::from_duckdb_table;
pub use expr::try_from_bound_expression;
pub use scalar::*;
pub use table_filter::try_from_table_filter;
pub use table_filter::try_from_virtual_column_filter;
pub use vector::data_chunk_to_vortex;
99 changes: 99 additions & 0 deletions vortex-duckdb/src/convert/table_filter.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

use std::ops::Range;
use std::sync::Arc;

use itertools::Itertools;
use vortex::buffer::Buffer;
use vortex::dtype::DType;
use vortex::dtype::Nullability;
use vortex::error::VortexExpect;
use vortex::error::VortexResult;
use vortex::error::vortex_bail;
use vortex::error::vortex_err;
use vortex::expr::Expression;
use vortex::expr::and_collect;
use vortex::expr::get_item;
Expand All @@ -21,10 +24,13 @@ use vortex::scalar::Scalar;
use vortex::scalar_fn::ScalarFnVTableExt;
use vortex::scalar_fn::fns::binary::Binary;
use vortex::scalar_fn::fns::operators::CompareOperator;
use vortex::scan::selection::Selection;

use crate::cpp::DUCKDB_VX_EXPR_TYPE;
use crate::duckdb::ExtractedValue;
use crate::duckdb::TableFilterClass;
use crate::duckdb::TableFilterRef;
use crate::duckdb::ValueRef;

pub fn try_from_table_filter(
value: &TableFilterRef,
Expand Down Expand Up @@ -125,3 +131,96 @@ pub fn try_from_table_filter(
}
}))
}

fn nonnegative_number_from_value(value: &ValueRef) -> VortexResult<u64> {
match value.extract() {
ExtractedValue::BigInt(i) => {
u64::try_from(i).map_err(|_| vortex_err!("negative value: {i}"))
}
ExtractedValue::Integer(i) => {
u64::try_from(i).map_err(|_| vortex_err!("negative value: {i}"))
}
ExtractedValue::UBigInt(u) => Ok(u),
ExtractedValue::UInteger(u) => Ok(u64::from(u)),
_ => vortex_bail!("unexpected value type"),
}
}

/// Computes the intersection of two ascending-sorted `u64` slices.
///
/// Walks both slices once with peeking iterators, emitting each value that
/// appears in both; the output is sorted because the inputs are.
fn intersect_sorted(left: &[u64], right: &[u64]) -> Vec<u64> {
    let mut out = Vec::new();
    let mut lhs = left.iter().peekable();
    let mut rhs = right.iter().peekable();
    while let (Some(&&a), Some(&&b)) = (lhs.peek(), rhs.peek()) {
        if a < b {
            // Left value too small to match anything remaining on the right.
            lhs.next();
        } else if b < a {
            rhs.next();
        } else {
            out.push(a);
            lhs.next();
            rhs.next();
        }
    }
    out
}

/// For constant comparison on IN filters over file_index or file_row_number
/// virtual column, create a selection and a range covering the same range as
/// expressions do.
pub fn try_from_virtual_column_filter(
Comment thread
myrrc marked this conversation as resolved.
filter: &TableFilterRef,
) -> VortexResult<(Selection, Option<Range<u64>>)> {
match filter.as_class() {
TableFilterClass::InFilter(values) => {
let indices = values
.iter()
.map(nonnegative_number_from_value)
.collect::<VortexResult<Vec<u64>>>()?;
Ok((Selection::IncludeByIndex(Buffer::from_iter(indices)), None))
}
TableFilterClass::ConstantComparison(const_) => {
let n = nonnegative_number_from_value(const_.value)?;
let range = match const_.operator {
DUCKDB_VX_EXPR_TYPE::DUCKDB_VX_EXPR_TYPE_COMPARE_EQUAL => Some(n..n + 1),
DUCKDB_VX_EXPR_TYPE::DUCKDB_VX_EXPR_TYPE_COMPARE_GREATERTHANOREQUALTO => {
Some(n..u64::MAX)
}
DUCKDB_VX_EXPR_TYPE::DUCKDB_VX_EXPR_TYPE_COMPARE_GREATERTHAN => {
Some(n.saturating_add(1)..u64::MAX)
}
DUCKDB_VX_EXPR_TYPE::DUCKDB_VX_EXPR_TYPE_COMPARE_LESSTHANOREQUALTO => {
Some(0..n.saturating_add(1))
}
DUCKDB_VX_EXPR_TYPE::DUCKDB_VX_EXPR_TYPE_COMPARE_LESSTHAN => Some(0..n),
_ => None,
};
Ok((Selection::All, range))
}
TableFilterClass::ConjunctionAnd(conj) => {
let mut start = 0u64;
let mut end = u64::MAX;
let mut indices: Option<Vec<u64>> = None;
for child in conj.children() {
let (sel, range) = try_from_virtual_column_filter(child)?;
if let Selection::IncludeByIndex(buf) = sel {
indices = Some(match indices {
None => buf.iter().copied().collect(),
Some(existing) => intersect_sorted(&existing, buf.as_ref()),
});
}
if let Some(r) = range {
start = start.max(r.start);
end = end.min(r.end);
}
}
let range = (start < end).then_some(start..end);
let sel = indices
.map(|v| Selection::IncludeByIndex(Buffer::from_iter(v)))
.unwrap_or(Selection::All);
Ok((sel, range))
Comment on lines +202 to +219
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is just selection merge?

}
TableFilterClass::Optional(child) => {
try_from_virtual_column_filter(child).or_else(|_| Ok((Selection::All, None)))
}
_ => Ok((Selection::All, None)),
}
}
Loading
Loading