diff --git a/crates/integrations/datafusion/src/lib.rs b/crates/integrations/datafusion/src/lib.rs index f29a0fd0..3a03ba95 100644 --- a/crates/integrations/datafusion/src/lib.rs +++ b/crates/integrations/datafusion/src/lib.rs @@ -44,6 +44,7 @@ mod filter_pushdown; mod full_text_search; mod merge_into; mod physical_plan; +mod procedures; mod relation_planner; pub mod runtime; mod sql_handler; diff --git a/crates/integrations/datafusion/src/procedures.rs b/crates/integrations/datafusion/src/procedures.rs new file mode 100644 index 00000000..36dcbd35 --- /dev/null +++ b/crates/integrations/datafusion/src/procedures.rs @@ -0,0 +1,635 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! CALL procedure support for Paimon tables. +//! +//! Supported procedures: +//! - `CALL sys.create_tag(table => '...', tag => '...', snapshot_id => ...)` +//! - `CALL sys.delete_tag(table => '...', tag => '...')` +//! - `CALL sys.rollback_to(table => '...', snapshot_id => ... | tag => '...')` +//! - `CALL sys.rollback_to_timestamp(table => '...', timestamp => ...)` +//! 
- `CALL sys.create_tag_from_timestamp(table => '...', tag => '...', timestamp => ...)` + +use std::collections::HashMap; +use std::sync::Arc; + +use datafusion::arrow::array::StringArray; +use datafusion::arrow::datatypes::{DataType as ArrowDataType, Field, Schema}; +use datafusion::arrow::record_batch::RecordBatch; +use datafusion::error::{DataFusionError, Result as DFResult}; +use datafusion::prelude::{DataFrame, SessionContext}; +use datafusion::sql::sqlparser::ast::{ + Expr as SqlExpr, Function, FunctionArg, FunctionArgExpr, FunctionArgOperator, + FunctionArguments, ObjectName, Value as SqlValue, +}; +use paimon::catalog::{Catalog, Identifier}; +use paimon::spec::Snapshot; +use paimon::table::{SnapshotManager, Table, TagManager}; + +use crate::error::to_datafusion_error; + +/// Resolve a snapshot by id: try live snapshot file first, then fall back to tag metadata. +async fn resolve_snapshot_by_id( + sm: &SnapshotManager, + tm: &TagManager, + snapshot_id: i64, +) -> DFResult { + if let Ok(snap) = sm.get_snapshot(snapshot_id).await { + return Ok(snap); + } + let tags = tm.list_all().await.map_err(to_datafusion_error)?; + for (_, snap) in &tags { + if snap.id() == snapshot_id { + return Ok(snap.clone()); + } + } + Err(DataFusionError::Plan(format!( + "Snapshot '{snapshot_id}' does not exist in live files or tag metadata" + ))) +} + +/// Find the earliest snapshot with commit time >= timestamp_millis, +/// considering both live snapshots and tag-retained snapshots. 
+async fn later_or_equal_from_all( + sm: &SnapshotManager, + tm: &TagManager, + timestamp_millis: i64, +) -> DFResult> { + let live = sm + .later_or_equal_time_millis(timestamp_millis) + .await + .map_err(to_datafusion_error)?; + let tags = tm.list_all().await.map_err(to_datafusion_error)?; + let tag_candidate = tags + .into_iter() + .map(|(_, snap)| snap) + .filter(|s| (s.time_millis() as i64) >= timestamp_millis) + .min_by_key(|s| s.time_millis()); + match (live, tag_candidate) { + (Some(a), Some(b)) => { + if a.time_millis() <= b.time_millis() { + Ok(Some(a)) + } else { + Ok(Some(b)) + } + } + (Some(a), None) => Ok(Some(a)), + (None, Some(b)) => Ok(Some(b)), + (None, None) => Ok(None), + } +} + +/// Find the latest snapshot with commit time <= timestamp_millis, +/// considering both live snapshots and tag-retained snapshots. +async fn earlier_or_equal_from_all( + sm: &SnapshotManager, + tm: &TagManager, + timestamp_millis: i64, +) -> DFResult> { + let live = sm + .earlier_or_equal_time_millis(timestamp_millis) + .await + .map_err(to_datafusion_error)?; + let tags = tm.list_all().await.map_err(to_datafusion_error)?; + let tag_candidate = tags + .into_iter() + .map(|(_, snap)| snap) + .filter(|s| (s.time_millis() as i64) <= timestamp_millis) + .max_by_key(|s| s.time_millis()); + match (live, tag_candidate) { + (Some(a), Some(b)) => { + if a.time_millis() >= b.time_millis() { + Ok(Some(a)) + } else { + Ok(Some(b)) + } + } + (Some(a), None) => Ok(Some(a)), + (None, Some(b)) => Ok(Some(b)), + (None, None) => Ok(None), + } +} + +pub async fn execute_call( + ctx: &SessionContext, + catalog: &Arc, + catalog_name: &str, + func: &Function, +) -> DFResult { + let proc_name = extract_procedure_name(&func.name)?; + let args = extract_named_args(&func.args)?; + + match proc_name.as_str() { + "create_tag" => proc_create_tag(ctx, catalog, catalog_name, &args).await, + "delete_tag" => proc_delete_tag(ctx, catalog, catalog_name, &args).await, + "rollback_to" => 
proc_rollback_to(ctx, catalog, catalog_name, &args).await, + "rollback_to_timestamp" => { + proc_rollback_to_timestamp(ctx, catalog, catalog_name, &args).await + } + "create_tag_from_timestamp" => { + proc_create_tag_from_timestamp(ctx, catalog, catalog_name, &args).await + } + _ => Err(DataFusionError::Plan(format!( + "Unknown procedure: {proc_name}" + ))), + } +} + +fn extract_procedure_name(name: &ObjectName) -> DFResult { + let parts: Vec = name + .0 + .iter() + .filter_map(|p| p.as_ident().map(|id| id.value.clone())) + .collect(); + match parts.len() { + 1 => Ok(parts[0].clone()), + 2 => Ok(parts[1].clone()), + _ => Err(DataFusionError::Plan(format!( + "Invalid procedure name: {name}. Expected sys.procedure_name or procedure_name" + ))), + } +} + +fn extract_named_args(args: &FunctionArguments) -> DFResult> { + let arg_list = match args { + FunctionArguments::List(list) => &list.args, + FunctionArguments::None => return Ok(HashMap::new()), + _ => { + return Err(DataFusionError::Plan( + "Unsupported argument format for CALL".to_string(), + )) + } + }; + + let mut map = HashMap::new(); + for arg in arg_list { + match arg { + FunctionArg::Named { + name, + arg: FunctionArgExpr::Expr(expr), + operator: FunctionArgOperator::RightArrow, + } => { + let value = expr_to_string(expr)?; + map.insert(name.value.to_lowercase(), value); + } + _ => return Err(DataFusionError::Plan( + "CALL procedures require named arguments with '=>' syntax, e.g. 
table => 'db.t'" + .to_string(), + )), + } + } + Ok(map) +} + +fn expr_to_string(expr: &SqlExpr) -> DFResult { + match expr { + SqlExpr::Value(v) => match &v.value { + SqlValue::SingleQuotedString(s) => Ok(s.clone()), + SqlValue::Number(n, _) => Ok(n.clone()), + SqlValue::Boolean(b) => Ok(b.to_string()), + _ => Err(DataFusionError::Plan(format!( + "Unsupported argument value: {v}" + ))), + }, + SqlExpr::UnaryOp { + op: datafusion::sql::sqlparser::ast::UnaryOperator::Minus, + expr, + } => { + let inner = expr_to_string(expr)?; + Ok(format!("-{inner}")) + } + _ => Err(DataFusionError::Plan(format!( + "Unsupported argument expression: {expr}" + ))), + } +} + +fn require_arg<'a>(args: &'a HashMap, name: &str) -> DFResult<&'a str> { + args.get(name) + .map(|s| s.as_str()) + .ok_or_else(|| DataFusionError::Plan(format!("Missing required argument: '{name}'"))) +} + +fn resolve_table_identifier(table_str: &str, catalog_name: &str) -> DFResult { + let parts: Vec<&str> = table_str.split('.').collect(); + match parts.len() { + 2 => Ok(Identifier::new(parts[0], parts[1])), + 3 => { + if parts[0] != catalog_name { + return Err(DataFusionError::Plan(format!( + "Catalog name mismatch: expected '{catalog_name}', got '{}'", + parts[0] + ))); + } + Ok(Identifier::new(parts[1], parts[2])) + } + _ => Err(DataFusionError::Plan(format!( + "Invalid table identifier: '{table_str}'. 
Expected 'database.table' or 'catalog.database.table'" + ))), + } +} + +async fn get_table( + catalog: &Arc, + catalog_name: &str, + args: &HashMap, +) -> DFResult { + let table_str = require_arg(args, "table")?; + let identifier = resolve_table_identifier(table_str, catalog_name)?; + catalog + .get_table(&identifier) + .await + .map_err(to_datafusion_error) +} + +fn managers(table: &Table) -> (SnapshotManager, TagManager) { + let sm = SnapshotManager::new(table.file_io().clone(), table.location().to_string()); + let tm = TagManager::new(table.file_io().clone(), table.location().to_string()); + (sm, tm) +} + +async fn proc_create_tag( + ctx: &SessionContext, + catalog: &Arc, + catalog_name: &str, + args: &HashMap, +) -> DFResult { + let table = get_table(catalog, catalog_name, args).await?; + let tag_name = require_arg(args, "tag")?; + let snapshot_id: Option = args + .get("snapshot_id") + .map(|s| { + s.parse() + .map_err(|_| DataFusionError::Plan(format!("Invalid snapshot_id: '{s}'"))) + }) + .transpose()?; + + let (sm, tm) = managers(&table); + if tm.tag_exists(tag_name).await.map_err(to_datafusion_error)? { + return Err(DataFusionError::Plan(format!( + "Tag '{tag_name}' already exists" + ))); + } + let snapshot = if let Some(id) = snapshot_id { + resolve_snapshot_by_id(&sm, &tm, id).await? + } else { + sm.get_latest_snapshot() + .await + .map_err(to_datafusion_error)? + .ok_or_else(|| DataFusionError::Plan("No snapshots exist".to_string()))? 
+ }; + tm.create(tag_name, &snapshot) + .await + .map_err(to_datafusion_error)?; + ok_result(ctx) +} + +async fn proc_delete_tag( + ctx: &SessionContext, + catalog: &Arc, + catalog_name: &str, + args: &HashMap, +) -> DFResult { + let table = get_table(catalog, catalog_name, args).await?; + let tag_str = require_arg(args, "tag")?; + + let (_, tm) = managers(&table); + for tag_name in tag_str.split(',') { + let tag_name = tag_name.trim(); + if tag_name.is_empty() { + continue; + } + if !tm.tag_exists(tag_name).await.map_err(to_datafusion_error)? { + continue; + } + tm.delete(tag_name).await.map_err(to_datafusion_error)?; + } + ok_result(ctx) +} + +async fn clean_larger_than( + sm: &SnapshotManager, + tm: &TagManager, + retained_snapshot_id: i64, +) -> DFResult<()> { + // 1. Update LATEST hint + sm.write_latest_hint(retained_snapshot_id) + .await + .map_err(to_datafusion_error)?; + + // 2. Delete snapshots newer than the target + let all_ids = sm.list_all_ids().await.map_err(to_datafusion_error)?; + for &id in all_ids.iter().rev() { + if id <= retained_snapshot_id { + break; + } + sm.delete_snapshot(id).await.map_err(to_datafusion_error)?; + } + + // TODO: clean long-lived changelogs newer than retained_snapshot_id + // Java's RollbackHelper.cleanLargerThan also calls cleanLongLivedChangelogs here. + // Implement once ChangelogManager is available. + + // 3. 
Delete tags that reference snapshots newer than the target + let tags = tm.list_all().await.map_err(to_datafusion_error)?; + for (name, snap) in tags.iter().rev() { + if snap.id() <= retained_snapshot_id { + continue; + } + tm.delete(name).await.map_err(to_datafusion_error)?; + } + + Ok(()) +} + +async fn proc_rollback_to( + ctx: &SessionContext, + catalog: &Arc, + catalog_name: &str, + args: &HashMap, +) -> DFResult { + let table = get_table(catalog, catalog_name, args).await?; + + if let Some(rest_env) = table.rest_env() { + if let Some(id_str) = args.get("snapshot_id") { + let id: i64 = id_str + .parse() + .map_err(|_| DataFusionError::Plan(format!("Invalid snapshot_id: '{id_str}'")))?; + rest_env + .api() + .rollback_to_snapshot(rest_env.identifier(), id) + .await + .map_err(to_datafusion_error)?; + } else if let Some(tag_name) = args.get("tag") { + rest_env + .api() + .rollback_to_tag(rest_env.identifier(), tag_name) + .await + .map_err(to_datafusion_error)?; + } else { + return Err(DataFusionError::Plan( + "rollback_to requires either 'snapshot_id' or 'tag' argument".to_string(), + )); + } + } else { + let (sm, tm) = managers(&table); + if let Some(id_str) = args.get("snapshot_id") { + let id: i64 = id_str + .parse() + .map_err(|_| DataFusionError::Plan(format!("Invalid snapshot_id: '{id_str}'")))?; + let snapshot = resolve_snapshot_by_id(&sm, &tm, id).await?; + clean_larger_than(&sm, &tm, id).await?; + if !sm + .file_io() + .exists(&sm.snapshot_path(id)) + .await + .map_err(to_datafusion_error)? + { + sm.commit_snapshot(&snapshot) + .await + .map_err(to_datafusion_error)?; + sm.write_earliest_hint(id) + .await + .map_err(to_datafusion_error)?; + } + } else if let Some(tag_name) = args.get("tag") { + let snapshot = tm + .get(tag_name) + .await + .map_err(to_datafusion_error)? 
+ .ok_or_else(|| DataFusionError::Plan(format!("Tag '{tag_name}' does not exist")))?; + let snapshot_id = snapshot.id(); + clean_larger_than(&sm, &tm, snapshot_id).await?; + if !sm + .file_io() + .exists(&sm.snapshot_path(snapshot_id)) + .await + .map_err(to_datafusion_error)? + { + sm.commit_snapshot(&snapshot) + .await + .map_err(to_datafusion_error)?; + sm.write_earliest_hint(snapshot_id) + .await + .map_err(to_datafusion_error)?; + } + } else { + return Err(DataFusionError::Plan( + "rollback_to requires either 'snapshot_id' or 'tag' argument".to_string(), + )); + } + } + + ok_result(ctx) +} + +async fn proc_rollback_to_timestamp( + ctx: &SessionContext, + catalog: &Arc, + catalog_name: &str, + args: &HashMap, +) -> DFResult { + let table = get_table(catalog, catalog_name, args).await?; + let ts_str = require_arg(args, "timestamp")?; + let timestamp: i64 = ts_str + .parse() + .map_err(|_| DataFusionError::Plan(format!("Invalid timestamp: '{ts_str}'")))?; + + let (sm, tm) = managers(&table); + let snapshot = earlier_or_equal_from_all(&sm, &tm, timestamp) + .await? + .ok_or_else(|| { + DataFusionError::Plan(format!("No snapshot found with commit time <= {timestamp}")) + })?; + + if let Some(rest_env) = table.rest_env() { + rest_env + .api() + .rollback_to_snapshot(rest_env.identifier(), snapshot.id()) + .await + .map_err(to_datafusion_error)?; + } else { + clean_larger_than(&sm, &tm, snapshot.id()).await?; + } + ok_result(ctx) +} + +async fn proc_create_tag_from_timestamp( + ctx: &SessionContext, + catalog: &Arc, + catalog_name: &str, + args: &HashMap, +) -> DFResult { + let table = get_table(catalog, catalog_name, args).await?; + let tag_name = require_arg(args, "tag")?; + let ts_str = require_arg(args, "timestamp")?; + let timestamp: i64 = ts_str + .parse() + .map_err(|_| DataFusionError::Plan(format!("Invalid timestamp: '{ts_str}'")))?; + + let (sm, tm) = managers(&table); + let snapshot = later_or_equal_from_all(&sm, &tm, timestamp) + .await? 
+ .ok_or_else(|| { + DataFusionError::Plan(format!("No snapshot found with commit time >= {timestamp}")) + })?; + + if tm.tag_exists(tag_name).await.map_err(to_datafusion_error)? { + return Err(DataFusionError::Plan(format!( + "Tag '{tag_name}' already exists" + ))); + } + tm.create(tag_name, &snapshot) + .await + .map_err(to_datafusion_error)?; + ok_result(ctx) +} + +fn ok_result(ctx: &SessionContext) -> DFResult { + let schema = Arc::new(Schema::new(vec![Field::new( + "result", + ArrowDataType::Utf8, + false, + )])); + let batch = RecordBatch::try_new( + schema.clone(), + vec![Arc::new(StringArray::from(vec!["OK"]))], + )?; + ctx.read_batch(batch) +} + +#[cfg(test)] +mod tests { + use super::*; + use paimon::io::FileIOBuilder; + use paimon::spec::CommitKind; + + fn test_file_io() -> paimon::io::FileIO { + FileIOBuilder::new("memory").build().unwrap() + } + + fn test_snapshot(id: i64, time_millis: u64) -> Snapshot { + Snapshot::builder() + .version(3) + .id(id) + .schema_id(0) + .base_manifest_list("base-list".to_string()) + .delta_manifest_list("delta-list".to_string()) + .commit_user("test-user".to_string()) + .commit_identifier(0) + .commit_kind(CommitKind::APPEND) + .time_millis(time_millis) + .build() + } + + async fn setup(table_path: &str) -> (paimon::io::FileIO, SnapshotManager, TagManager) { + let file_io = test_file_io(); + file_io + .mkdirs(&format!("{table_path}/snapshot/")) + .await + .unwrap(); + file_io.mkdirs(&format!("{table_path}/tag/")).await.unwrap(); + let sm = SnapshotManager::new(file_io.clone(), table_path.to_string()); + let tm = TagManager::new(file_io.clone(), table_path.to_string()); + (file_io, sm, tm) + } + + #[tokio::test] + async fn test_resolve_snapshot_by_id_live() { + let (_, sm, tm) = setup("memory:/test_resolve_live").await; + let snap = test_snapshot(1, 1000); + sm.commit_snapshot(&snap).await.unwrap(); + + let result = resolve_snapshot_by_id(&sm, &tm, 1).await.unwrap(); + assert_eq!(result.id(), 1); + } + + #[tokio::test] + 
async fn test_resolve_snapshot_by_id_tag_fallback() { + let (_, sm, tm) = setup("memory:/test_resolve_tag").await; + let snap = test_snapshot(1, 1000); + tm.create("v1", &snap).await.unwrap(); + + let result = resolve_snapshot_by_id(&sm, &tm, 1).await.unwrap(); + assert_eq!(result.id(), 1); + } + + #[tokio::test] + async fn test_resolve_snapshot_by_id_not_found() { + let (_, sm, tm) = setup("memory:/test_resolve_none").await; + let result = resolve_snapshot_by_id(&sm, &tm, 99).await; + assert!(result.is_err()); + } + + #[tokio::test] + async fn test_later_or_equal_exact_live() { + let (_, sm, tm) = setup("memory:/test_later_exact").await; + sm.commit_snapshot(&test_snapshot(1, 1000)).await.unwrap(); + sm.commit_snapshot(&test_snapshot(2, 2000)).await.unwrap(); + + let result = later_or_equal_from_all(&sm, &tm, 1000).await.unwrap(); + assert_eq!(result.unwrap().id(), 1); + } + + #[tokio::test] + async fn test_later_or_equal_tag_better() { + let (_, sm, tm) = setup("memory:/test_later_tag_better").await; + sm.commit_snapshot(&test_snapshot(3, 3000)).await.unwrap(); + tm.create("v2", &test_snapshot(2, 2000)).await.unwrap(); + + let result = later_or_equal_from_all(&sm, &tm, 1500).await.unwrap(); + assert_eq!(result.unwrap().id(), 2); + } + + #[tokio::test] + async fn test_later_or_equal_only_tag() { + let (_, sm, tm) = setup("memory:/test_later_only_tag").await; + tm.create("v1", &test_snapshot(1, 1000)).await.unwrap(); + + let result = later_or_equal_from_all(&sm, &tm, 500).await.unwrap(); + assert_eq!(result.unwrap().id(), 1); + } + + #[tokio::test] + async fn test_earlier_or_equal_exact_live() { + let (_, sm, tm) = setup("memory:/test_earlier_exact").await; + sm.commit_snapshot(&test_snapshot(1, 1000)).await.unwrap(); + sm.commit_snapshot(&test_snapshot(2, 2000)).await.unwrap(); + + let result = earlier_or_equal_from_all(&sm, &tm, 2000).await.unwrap(); + assert_eq!(result.unwrap().id(), 2); + } + + #[tokio::test] + async fn test_earlier_or_equal_tag_better() { + 
let (_, sm, tm) = setup("memory:/test_earlier_tag_better").await; + sm.commit_snapshot(&test_snapshot(1, 1000)).await.unwrap(); + tm.create("v2", &test_snapshot(2, 2000)).await.unwrap(); + + let result = earlier_or_equal_from_all(&sm, &tm, 2500).await.unwrap(); + assert_eq!(result.unwrap().id(), 2); + } + + #[tokio::test] + async fn test_earlier_or_equal_only_tag() { + let (_, sm, tm) = setup("memory:/test_earlier_only_tag").await; + tm.create("v1", &test_snapshot(1, 1000)).await.unwrap(); + + let result = earlier_or_equal_from_all(&sm, &tm, 1500).await.unwrap(); + assert_eq!(result.unwrap().id(), 1); + } +} diff --git a/crates/integrations/datafusion/src/sql_handler.rs b/crates/integrations/datafusion/src/sql_handler.rs index 8993c134..f68a2100 100644 --- a/crates/integrations/datafusion/src/sql_handler.rs +++ b/crates/integrations/datafusion/src/sql_handler.rs @@ -199,6 +199,10 @@ impl PaimonSqlHandler { self.ctx.sql(sql).await } Statement::Truncate(truncate) => self.handle_truncate_table(truncate).await, + Statement::Call(func) => { + crate::procedures::execute_call(&self.ctx, &self.catalog, &self.catalog_name, func) + .await + } _ => self.ctx.sql(sql).await, } } diff --git a/crates/integrations/datafusion/tests/procedures.rs b/crates/integrations/datafusion/tests/procedures.rs new file mode 100644 index 00000000..532978a8 --- /dev/null +++ b/crates/integrations/datafusion/tests/procedures.rs @@ -0,0 +1,283 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +mod common; + +use common::{exec, row_count, setup_handler}; + +async fn setup_table_with_snapshots() -> (tempfile::TempDir, paimon_datafusion::PaimonSqlHandler) { + let (tmp, handler) = setup_handler().await; + exec( + &handler, + "CREATE TABLE paimon.test_db.t1 (id INT, name VARCHAR(100), PRIMARY KEY (id))", + ) + .await; + // Insert data to create snapshot 1 + exec( + &handler, + "INSERT INTO paimon.test_db.t1 VALUES (1, 'alice')", + ) + .await; + // Insert data to create snapshot 2 + exec(&handler, "INSERT INTO paimon.test_db.t1 VALUES (2, 'bob')").await; + // Insert data to create snapshot 3 + exec( + &handler, + "INSERT INTO paimon.test_db.t1 VALUES (3, 'charlie')", + ) + .await; + (tmp, handler) +} + +#[tokio::test] +async fn test_create_tag() { + let (_tmp, handler) = setup_table_with_snapshots().await; + + // Create tag from latest snapshot + exec( + &handler, + "CALL sys.create_tag(table => 'test_db.t1', tag => 'v1')", + ) + .await; + + // Verify tag exists via $tags system table + let count = row_count( + &handler, + "SELECT * FROM paimon.test_db.`t1$tags` WHERE tag_name = 'v1'", + ) + .await; + assert_eq!(count, 1); +} + +#[tokio::test] +async fn test_create_tag_with_snapshot_id() { + let (_tmp, handler) = setup_table_with_snapshots().await; + + exec( + &handler, + "CALL sys.create_tag(table => 'test_db.t1', tag => 'v1', snapshot_id => '1')", + ) + .await; + + let count = row_count( + &handler, + "SELECT * FROM paimon.test_db.`t1$tags` WHERE tag_name = 'v1'", + ) + .await; + assert_eq!(count, 1); +} + +#[tokio::test] +async fn 
test_create_tag_already_exists() { + let (_tmp, handler) = setup_table_with_snapshots().await; + + exec( + &handler, + "CALL sys.create_tag(table => 'test_db.t1', tag => 'v1')", + ) + .await; + + let result = handler + .sql("CALL sys.create_tag(table => 'test_db.t1', tag => 'v1')") + .await; + assert!(result.is_err()); + assert!(result.unwrap_err().to_string().contains("already exists")); +} + +#[tokio::test] +async fn test_delete_tag() { + let (_tmp, handler) = setup_table_with_snapshots().await; + + exec( + &handler, + "CALL sys.create_tag(table => 'test_db.t1', tag => 'v1')", + ) + .await; + exec( + &handler, + "CALL sys.delete_tag(table => 'test_db.t1', tag => 'v1')", + ) + .await; + + let count = row_count( + &handler, + "SELECT * FROM paimon.test_db.`t1$tags` WHERE tag_name = 'v1'", + ) + .await; + assert_eq!(count, 0); +} + +#[tokio::test] +async fn test_delete_multiple_tags() { + let (_tmp, handler) = setup_table_with_snapshots().await; + + exec( + &handler, + "CALL sys.create_tag(table => 'test_db.t1', tag => 'v1')", + ) + .await; + exec( + &handler, + "CALL sys.create_tag(table => 'test_db.t1', tag => 'v2', snapshot_id => '1')", + ) + .await; + + exec( + &handler, + "CALL sys.delete_tag(table => 'test_db.t1', tag => 'v1,v2')", + ) + .await; + + let count = row_count(&handler, "SELECT * FROM paimon.test_db.`t1$tags`").await; + assert_eq!(count, 0); +} + +#[tokio::test] +async fn test_rollback_to_snapshot() { + let (_tmp, handler) = setup_table_with_snapshots().await; + + // We have 3 snapshots. Rollback to snapshot 1. 
+ exec( + &handler, + "CALL sys.rollback_to(table => 'test_db.t1', snapshot_id => '1')", + ) + .await; + + // After rollback, only snapshot 1 data should be visible + let count = row_count(&handler, "SELECT * FROM paimon.test_db.t1").await; + assert_eq!(count, 1); +} + +#[tokio::test] +async fn test_rollback_to_tag() { + let (_tmp, handler) = setup_table_with_snapshots().await; + + // Create tag on snapshot 1 + exec( + &handler, + "CALL sys.create_tag(table => 'test_db.t1', tag => 'v1', snapshot_id => '1')", + ) + .await; + + // Rollback to tag + exec( + &handler, + "CALL sys.rollback_to(table => 'test_db.t1', tag => 'v1')", + ) + .await; + + let count = row_count(&handler, "SELECT * FROM paimon.test_db.t1").await; + assert_eq!(count, 1); +} + +#[tokio::test] +async fn test_rollback_to_timestamp() { + let (_tmp, handler) = setup_table_with_snapshots().await; + + // Get the timestamp of snapshot 1 from $snapshots system table + let batches = handler + .sql("SELECT snapshot_id, commit_time FROM paimon.test_db.`t1$snapshots` ORDER BY snapshot_id") + .await + .unwrap() + .collect() + .await + .unwrap(); + + // Use a timestamp between snapshot 1 and snapshot 2 + let snap1_time = batches[0] + .column_by_name("commit_time") + .unwrap() + .as_any() + .downcast_ref::() + .unwrap() + .value(0); + + exec( + &handler, + &format!( + "CALL sys.rollback_to_timestamp(table => 'test_db.t1', timestamp => '{snap1_time}')" + ), + ) + .await; + + let count = row_count(&handler, "SELECT * FROM paimon.test_db.t1").await; + assert_eq!(count, 1); +} + +#[tokio::test] +async fn test_create_tag_from_timestamp() { + let (_tmp, handler) = setup_table_with_snapshots().await; + + // Get the timestamp of snapshot 2 + let batches = handler + .sql("SELECT snapshot_id, commit_time FROM paimon.test_db.`t1$snapshots` ORDER BY snapshot_id") + .await + .unwrap() + .collect() + .await + .unwrap(); + + let snap2_time = batches[0] + .column_by_name("commit_time") + .unwrap() + .as_any() + .downcast_ref::() 
+ .unwrap() + .value(1); + + exec( + &handler, + &format!( + "CALL sys.create_tag_from_timestamp(table => 'test_db.t1', tag => 'ts_tag', timestamp => '{snap2_time}')" + ), + ) + .await; + + let count = row_count( + &handler, + "SELECT * FROM paimon.test_db.`t1$tags` WHERE tag_name = 'ts_tag'", + ) + .await; + assert_eq!(count, 1); +} + +#[tokio::test] +async fn test_rollback_cleans_newer_tags() { + let (_tmp, handler) = setup_table_with_snapshots().await; + + // Create tags on snapshot 2 and 3 + exec( + &handler, + "CALL sys.create_tag(table => 'test_db.t1', tag => 'v2', snapshot_id => '2')", + ) + .await; + exec( + &handler, + "CALL sys.create_tag(table => 'test_db.t1', tag => 'v3', snapshot_id => '3')", + ) + .await; + + // Rollback to snapshot 1 — tags v2 and v3 should be cleaned + exec( + &handler, + "CALL sys.rollback_to(table => 'test_db.t1', snapshot_id => '1')", + ) + .await; + + let count = row_count(&handler, "SELECT * FROM paimon.test_db.`t1$tags`").await; + assert_eq!(count, 0); +} diff --git a/crates/paimon/src/api/resource_paths.rs b/crates/paimon/src/api/resource_paths.rs index 9345310a..78dc8b5c 100644 --- a/crates/paimon/src/api/resource_paths.rs +++ b/crates/paimon/src/api/resource_paths.rs @@ -143,6 +143,18 @@ impl ResourcePaths { RESTUtil::encode_string(table_name) ) } + + /// Get the rollback endpoint path for a table. + pub fn rollback(&self, database_name: &str, table_name: &str) -> String { + format!( + "{}/{}/{}/{}/{}/rollback", + self.base_path, + Self::DATABASES, + RESTUtil::encode_string(database_name), + Self::TABLES, + RESTUtil::encode_string(table_name) + ) + } } #[cfg(test)] diff --git a/crates/paimon/src/api/rest_api.rs b/crates/paimon/src/api/rest_api.rs index 8a3416ed..dcd3d971 100644 --- a/crates/paimon/src/api/rest_api.rs +++ b/crates/paimon/src/api/rest_api.rs @@ -419,4 +419,44 @@ impl RESTApi { .and_then(|v| v.as_bool()) .unwrap_or(false)) } + + /// Rollback a table to a specific snapshot. 
+ pub async fn rollback_to_snapshot( + &self, + identifier: &Identifier, + snapshot_id: i64, + ) -> Result<()> { + let database = identifier.database(); + let table = identifier.object(); + validate_non_empty_multi(&[(database, "database name"), (table, "table name")])?; + let path = self.resource_paths.rollback(database, table); + let request = serde_json::json!({ + "instant": { + "type": "snapshot", + "snapshotId": snapshot_id, + } + }); + let _resp: serde_json::Value = self.client.post(&path, &request).await?; + Ok(()) + } + + /// Rollback a table to a specific tag. + pub async fn rollback_to_tag(&self, identifier: &Identifier, tag_name: &str) -> Result<()> { + let database = identifier.database(); + let table = identifier.object(); + validate_non_empty_multi(&[ + (database, "database name"), + (table, "table name"), + (tag_name, "tag name"), + ])?; + let path = self.resource_paths.rollback(database, table); + let request = serde_json::json!({ + "instant": { + "type": "tag", + "tagName": tag_name, + } + }); + let _resp: serde_json::Value = self.client.post(&path, &request).await?; + Ok(()) + } } diff --git a/crates/paimon/src/table/mod.rs b/crates/paimon/src/table/mod.rs index 41f98cd9..7b6e2932 100644 --- a/crates/paimon/src/table/mod.rs +++ b/crates/paimon/src/table/mod.rs @@ -140,6 +140,11 @@ impl Table { &self.schema_manager } + /// Get the REST environment, if this table was loaded from a REST catalog. + pub fn rest_env(&self) -> Option<&RESTEnv> { + self.rest_env.as_ref() + } + /// Create a read builder for scan/read. /// /// Reference: [pypaimon FileStoreTable.new_read_builder](https://github.com/apache/paimon/blob/release-1.3/paimon-python/pypaimon/table/file_store_table.py). diff --git a/crates/paimon/src/table/rest_env.rs b/crates/paimon/src/table/rest_env.rs index bd1b42ac..e7b11072 100644 --- a/crates/paimon/src/table/rest_env.rs +++ b/crates/paimon/src/table/rest_env.rs @@ -50,6 +50,16 @@ impl RESTEnv { } } + /// Get the REST API client. 
+ pub fn api(&self) -> &Arc { + &self.api + } + + /// Get the table identifier. + pub fn identifier(&self) -> &Identifier { + &self.identifier + } + /// Create a `RESTSnapshotCommit` from this environment. pub fn snapshot_commit(&self) -> Arc { Arc::new(RESTSnapshotCommit::new( diff --git a/crates/paimon/src/table/snapshot_manager.rs b/crates/paimon/src/table/snapshot_manager.rs index 1e8e1735..0412c9bf 100644 --- a/crates/paimon/src/table/snapshot_manager.rs +++ b/crates/paimon/src/table/snapshot_manager.rs @@ -282,40 +282,97 @@ impl SnapshotManager { .await } + /// Delete a snapshot file by id. + pub async fn delete_snapshot(&self, snapshot_id: i64) -> crate::Result<()> { + let path = self.snapshot_path(snapshot_id); + self.file_io.delete_file(&path).await + } + + /// Update the EARLIEST hint file. + pub async fn write_earliest_hint(&self, snapshot_id: i64) -> crate::Result<()> { + let hint_path = self.earliest_hint_path(); + let output = self.file_io.new_output(&hint_path)?; + output + .write(bytes::Bytes::from(snapshot_id.to_string())) + .await + } + + /// Returns the first snapshot whose commit time is later than or equal to the given + /// `timestamp_millis`. If no such snapshot exists, returns None. + /// + /// Uses binary search over the actual snapshot ID list to handle gaps from deleted snapshots. 
+ pub async fn later_or_equal_time_millis( + &self, + timestamp_millis: i64, + ) -> crate::Result> { + let ids = self.list_all_ids().await?; + if ids.is_empty() { + return Ok(None); + } + + let latest_snapshot = self.get_snapshot(*ids.last().unwrap()).await?; + if (latest_snapshot.time_millis() as i64) < timestamp_millis { + return Ok(None); + } + + let mut lo: usize = 0; + let mut hi: usize = ids.len() - 1; + let mut result: Option = None; + while lo <= hi { + let mid = lo + (hi - lo) / 2; + let snapshot = self.get_snapshot(ids[mid]).await?; + let commit_time = snapshot.time_millis() as i64; + if commit_time < timestamp_millis { + lo = mid + 1; + } else if commit_time > timestamp_millis { + if mid == 0 { + result = Some(snapshot); + break; + } + hi = mid - 1; + result = Some(snapshot); + } else { + result = Some(snapshot); + break; + } + } + Ok(result) + } + /// Returns the snapshot whose commit time is earlier than or equal to the given /// `timestamp_millis`. If no such snapshot exists, returns None. /// - /// Uses binary search over snapshot IDs (assumes monotonically increasing commit times). + /// Uses binary search over the actual snapshot ID list to handle gaps from deleted snapshots. /// /// Reference: [SnapshotManager.earlierOrEqualTimeMills](https://github.com/apache/paimon/blob/master/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java) - pub async fn earlier_or_equal_time_mills( + pub async fn earlier_or_equal_time_millis( &self, timestamp_millis: i64, ) -> crate::Result> { - let mut latest = match self.get_latest_snapshot_id().await? { - Some(id) => id, - None => return Ok(None), - }; - - let earliest_snapshot = match self.earliest_snapshot_id().await? 
{
- Some(id) => self.get_snapshot(id).await?,
- None => return Ok(None),
- };
+ let ids = self.list_all_ids().await?;
+ if ids.is_empty() {
+ return Ok(None);
+ }
+ let earliest_snapshot = self.get_snapshot(ids[0]).await?;
 if (earliest_snapshot.time_millis() as i64) > timestamp_millis {
 return Ok(None);
 }
- let mut earliest = earliest_snapshot.id();
+ let mut lo: usize = 0;
+ let mut hi: usize = ids.len() - 1;
 let mut result: Option<Snapshot> = None;
- while earliest <= latest {
- let mid = earliest + (latest - earliest) / 2;
- let snapshot = self.get_snapshot(mid).await?;
+ while lo <= hi {
+ let mid = lo + (hi - lo) / 2;
+ let snapshot = self.get_snapshot(ids[mid]).await?;
 let commit_time = snapshot.time_millis() as i64;
 if commit_time > timestamp_millis {
- latest = mid - 1;
+ if mid == 0 {
+ break;
+ }
+ hi = mid - 1;
 } else if commit_time < timestamp_millis {
- earliest = mid + 1;
+ lo = mid + 1;
 result = Some(snapshot);
 } else {
 result = Some(snapshot);
@@ -324,6 +381,14 @@ impl SnapshotManager {
 }
 Ok(result)
 }
+
+ #[deprecated(note = "Renamed to earlier_or_equal_time_millis")]
+ pub async fn earlier_or_equal_time_mills(
+ &self,
+ timestamp_millis: i64,
+ ) -> crate::Result<Option<Snapshot>> {
+ self.earlier_or_equal_time_millis(timestamp_millis).await
+ }
 }

 #[cfg(test)]
diff --git a/crates/paimon/src/table/table_scan.rs b/crates/paimon/src/table/table_scan.rs
index 02117040..a5ff1034 100644
--- a/crates/paimon/src/table/table_scan.rs
+++ b/crates/paimon/src/table/table_scan.rs
@@ -370,7 +370,7 @@ impl<'a> TableScan<'a> {

 match core_options.try_time_travel_selector()? {
 Some(TimeTravelSelector::TimestampMillis(ts)) => {
- match snapshot_manager.earlier_or_equal_time_mills(ts).await? {
+ match snapshot_manager.earlier_or_equal_time_millis(ts).await?
{
 Some(s) => Ok(Some(s)),
 None => Err(Error::DataInvalid {
 message: format!("No snapshot found with timestamp <= {ts}"),
diff --git a/crates/paimon/src/table/tag_manager.rs b/crates/paimon/src/table/tag_manager.rs
index 6c38499f..14a45d7c 100644
--- a/crates/paimon/src/table/tag_manager.rs
+++ b/crates/paimon/src/table/tag_manager.rs
@@ -115,6 +115,23 @@ impl TagManager {
 Ok(names)
 }

+ /// Create a tag by writing the snapshot JSON to the tag path.
+ pub async fn create(&self, tag_name: &str, snapshot: &Snapshot) -> crate::Result<()> {
+ let path = self.tag_path(tag_name);
+ let json = serde_json::to_string(snapshot).map_err(|e| crate::Error::DataInvalid {
+ message: format!("failed to serialize snapshot for tag '{tag_name}': {e}"),
+ source: Some(Box::new(e)),
+ })?;
+ let output = self.file_io.new_output(&path)?;
+ output.write(bytes::Bytes::from(json)).await
+ }
+
+ /// Delete a tag file.
+ pub async fn delete(&self, tag_name: &str) -> crate::Result<()> {
+ let path = self.tag_path(tag_name);
+ self.file_io.delete_file(&path).await
+ }
+
 /// List all tags as `(name, snapshot)` pairs sorted by name ascending.
 pub async fn list_all(&self) -> crate::Result<Vec<(String, Snapshot)>> {
 let names = self.list_all_names().await?;
diff --git a/docs/src/datafusion.md b/docs/src/datafusion.md
index 72c4d8ed..5664b7c4 100644
--- a/docs/src/datafusion.md
+++ b/docs/src/datafusion.md
@@ -283,6 +283,80 @@ WHEN MATCHED THEN UPDATE SET name = source.name;

 For data-evolution tables, MERGE INTO uses the `_ROW_ID` virtual column for row-level tracking. For append-only tables, it uses Copy-on-Write file rewriting.

+### TRUNCATE TABLE
+
+Truncate an entire table or specific partitions:
+
+```sql
+-- Truncate the entire table
+TRUNCATE TABLE paimon.my_db.users;
+
+-- Truncate specific partitions
+TRUNCATE TABLE paimon.my_db.events PARTITION (dt = '2024-01-01');
+```
+
+### DROP PARTITION
+
+Drop specific partitions from a table using `ALTER TABLE ...
DROP PARTITION`:
+
+```sql
+ALTER TABLE paimon.my_db.events DROP PARTITION (dt = '2024-01-01');
+```
+
+Multiple partition key-value pairs can be specified:
+
+```sql
+ALTER TABLE paimon.my_db.events DROP PARTITION (dt = '2024-01-01', region = 'us');
+```
+
+## Procedures
+
+Use `CALL` to invoke built-in procedures. All procedures are under the `sys` namespace.
+
+### create_tag
+
+Create a named tag from a snapshot:
+
+```sql
+CALL sys.create_tag(table => 'paimon.my_db.my_table', tag => 'my_tag', snapshot_id => 1);
+```
+
+### create_tag_from_timestamp
+
+Create a named tag from a timestamp (finds the earliest snapshot whose commit time is at or after the given time):
+
+```sql
+CALL sys.create_tag_from_timestamp(table => 'paimon.my_db.my_table', tag => 'my_tag', timestamp => 1234567890000);
+```
+
+### delete_tag
+
+Delete a named tag:
+
+```sql
+CALL sys.delete_tag(table => 'paimon.my_db.my_table', tag => 'my_tag');
+```
+
+### rollback_to
+
+Rollback a table to a specific snapshot or tag:
+
+```sql
+-- Rollback to a snapshot
+CALL sys.rollback_to(table => 'paimon.my_db.my_table', snapshot_id => 1);
+
+-- Rollback to a tag
+CALL sys.rollback_to(table => 'paimon.my_db.my_table', tag => 'my_tag');
+```
+
+### rollback_to_timestamp
+
+Rollback a table to a specific timestamp:
+
+```sql
+CALL sys.rollback_to_timestamp(table => 'paimon.my_db.my_table', timestamp => 1234567890000);
+```
+
 ## Queries

 ### Basic Queries