diff --git a/crates/integrations/datafusion/src/system_tables/branches.rs b/crates/integrations/datafusion/src/system_tables/branches.rs new file mode 100644 index 00000000..581fb978 --- /dev/null +++ b/crates/integrations/datafusion/src/system_tables/branches.rs @@ -0,0 +1,123 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Mirrors Java [BranchesTable](https://github.com/apache/paimon/blob/release-1.3/paimon-core/src/main/java/org/apache/paimon/table/system/BranchesTable.java). + +use std::any::Any; +use std::sync::{Arc, OnceLock}; + +use async_trait::async_trait; +use datafusion::arrow::array::{RecordBatch, StringArray, TimestampMillisecondArray}; +use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit}; +use datafusion::catalog::Session; +use datafusion::datasource::memory::MemorySourceConfig; +use datafusion::datasource::{TableProvider, TableType}; +use datafusion::error::Result as DFResult; +use datafusion::logical_expr::Expr; +use datafusion::physical_plan::ExecutionPlan; +use paimon::table::{BranchManager, Table}; + +use crate::error::to_datafusion_error; + +pub(super) fn build(table: Table) -> DFResult> { + Ok(Arc::new(BranchesTable { table })) +} + +fn branches_schema() -> SchemaRef { + static SCHEMA: OnceLock = OnceLock::new(); + SCHEMA + .get_or_init(|| { + Arc::new(Schema::new(vec![ + Field::new("branch_name", DataType::Utf8, false), + Field::new( + "create_time", + DataType::Timestamp(TimeUnit::Millisecond, None), + false, + ), + ])) + }) + .clone() +} + +#[derive(Debug)] +struct BranchesTable { + table: Table, +} + +#[async_trait] +impl TableProvider for BranchesTable { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> SchemaRef { + branches_schema() + } + + fn table_type(&self) -> TableType { + TableType::View + } + + async fn scan( + &self, + _state: &dyn Session, + projection: Option<&Vec>, + _filters: &[Expr], + _limit: Option, + ) -> DFResult> { + let bm = BranchManager::new( + self.table.file_io().clone(), + self.table.location().to_string(), + ); + let branch_names = bm.list_all().await.map_err(to_datafusion_error)?; + + let n = branch_names.len(); + let mut names: Vec = Vec::with_capacity(n); + let mut create_times = Vec::with_capacity(n); + + for name in branch_names { + let branch_path = bm.branch_path(&name); + let status = self + .table + .file_io() + .get_status(&branch_path) + .await + .map_err(to_datafusion_error)?; + let ts_millis = status + .last_modified + .map(|dt| dt.timestamp_millis()) + .unwrap_or(0); + names.push(name); + create_times.push(ts_millis); + } + + let schema = branches_schema(); + let batch = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(StringArray::from(names)), + Arc::new(TimestampMillisecondArray::from(create_times)), + ], + )?; + + Ok(MemorySourceConfig::try_new_exec( + &[vec![batch]], + schema, + projection.cloned(), + )?) + } +} diff --git a/crates/integrations/datafusion/src/system_tables/mod.rs b/crates/integrations/datafusion/src/system_tables/mod.rs index bc2e3fd6..1bdc0d13 100644 --- a/crates/integrations/datafusion/src/system_tables/mod.rs +++ b/crates/integrations/datafusion/src/system_tables/mod.rs @@ -29,6 +29,7 @@ use paimon::table::Table; use crate::error::to_datafusion_error; +mod branches; mod manifests; mod options; mod schemas; @@ -38,6 +39,7 @@ mod tags; type Builder = fn(Table) -> DFResult>; const TABLES: &[(&str, Builder)] = &[ + ("branches", branches::build), ("manifests", manifests::build), ("options", options::build), ("schemas", schemas::build), @@ -130,6 +132,9 @@ mod tests { assert!(is_registered("schemas")); assert!(is_registered("Schemas")); assert!(is_registered("SCHEMAS")); + assert!(is_registered("branches")); + assert!(is_registered("Branches")); + assert!(is_registered("BRANCHES")); assert!(is_registered("tags")); assert!(is_registered("Tags")); assert!(is_registered("TAGS")); diff --git a/crates/integrations/datafusion/tests/system_tables.rs b/crates/integrations/datafusion/tests/system_tables.rs index da9b8013..0d0afa40 100644 --- a/crates/integrations/datafusion/tests/system_tables.rs +++ b/crates/integrations/datafusion/tests/system_tables.rs @@ -329,6 +329,63 @@ async fn test_snapshots_system_table() { ); } +#[tokio::test] +async fn test_branches_system_table_empty_when_no_branch_dir() { + let (ctx, _catalog, _tmp) = create_context().await; + let sql = format!("SELECT * FROM paimon.default.{FIXTURE_TABLE}$branches"); + let batches = run_sql(&ctx, &sql).await; + + // Schema must be present even with zero rows. + assert!(!batches.is_empty(), "$branches should return ≥1 batch"); + let arrow_schema = batches[0].schema(); + let expected_columns = [ + ("branch_name", DataType::Utf8), + ( + "create_time", + DataType::Timestamp(TimeUnit::Millisecond, None), + ), + ]; + for (i, (name, dtype)) in expected_columns.iter().enumerate() { + let field = arrow_schema.field(i); + assert_eq!(field.name(), name, "column {i} name"); + assert_eq!(field.data_type(), dtype, "column {i} type"); + } + let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum(); + assert_eq!(total_rows, 0, "fixture has no branch dir, expected 0 rows"); +} + +#[tokio::test] +async fn test_branches_system_table_with_seeded_branches() { + let (ctx, _catalog, tmp) = create_context().await; + + let table_dir = tmp.path().join("default.db").join(FIXTURE_TABLE); + let branch_dir = table_dir.join("branch"); + std::fs::create_dir_all(&branch_dir).expect("create branch dir"); + std::fs::create_dir_all(branch_dir.join("branch-b1")).unwrap(); + std::fs::create_dir_all(branch_dir.join("branch-b2")).unwrap(); + + let sql = format!("SELECT branch_name FROM paimon.default.{FIXTURE_TABLE}$branches"); + let batches = run_sql(&ctx, &sql).await; + let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum(); + assert_eq!(total_rows, 2, "expected two seeded branches"); + + let mut names: Vec = Vec::new(); + for batch in &batches { + let names_col = batch + .column(0) + .as_any() + .downcast_ref::() + .expect("branch_name is Utf8"); + for i in 0..batch.num_rows() { + names.push(names_col.value(i).to_string()); + } + } + let mut sorted_names = names.clone(); + sorted_names.sort(); + assert_eq!(names, sorted_names, "branch_name should be ascending"); + assert_eq!(names, vec!["b1".to_string(), "b2".to_string()]); +} + #[tokio::test] async fn test_tags_system_table_empty_when_no_tag_dir() { let (ctx, _catalog, _tmp) = create_context().await; diff --git a/crates/paimon/src/io/file_io.rs b/crates/paimon/src/io/file_io.rs index 93758e87..eef5dd65 100644 --- a/crates/paimon/src/io/file_io.rs +++ b/crates/paimon/src/io/file_io.rs @@ -213,6 +213,17 @@ impl FileIO { Ok(()) } + /// Copy a file from src to dst. + /// + /// Overwrites dst if it already exists. + pub async fn copy_file(&self, src: &str, dst: &str) -> Result<()> { + let input = self.new_input(src)?; + let bytes = input.read().await?; + let output = self.new_output(dst)?; + output.write(bytes).await?; + Ok(()) + } + /// Renames the file/directory src to dst. /// /// Reference: diff --git a/crates/paimon/src/table/branch_manager.rs b/crates/paimon/src/table/branch_manager.rs new file mode 100644 index 00000000..bd94cdb1 --- /dev/null +++ b/crates/paimon/src/table/branch_manager.rs @@ -0,0 +1,577 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Branch manager for reading and managing branch metadata using FileIO. +//! +//! Reference: [org.apache.paimon.utils.BranchManager](https://github.com/apache/paimon/blob/master/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java) +//! and [org.apache.paimon.utils.FileSystemBranchManager](https://github.com/apache/paimon/blob/master/paimon-core/src/main/java/org/apache/paimon/utils/FileSystemBranchManager.java) + +use crate::catalog::DEFAULT_MAIN_BRANCH; +use crate::io::FileIO; +use crate::table::{SchemaManager, SnapshotManager, TagManager}; +use opendal::raw::get_basename; + +const BRANCH_DIR: &str = "branch"; +const BRANCH_PREFIX: &str = "branch-"; + +/// Manager for branch directories using unified FileIO. +/// +/// Branches are stored as sub-directories at `{table_path}/branch/branch-{name}`. +/// +/// Reference: [org.apache.paimon.utils.BranchManager](https://github.com/apache/paimon/blob/master/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java) +#[derive(Debug, Clone)] +pub struct BranchManager { + file_io: FileIO, + table_path: String, +} + +impl BranchManager { + pub fn new(file_io: FileIO, table_path: String) -> Self { + Self { + file_io, + table_path, + } + } + + /// Path to the branch directory (e.g. `table_path/branch`). + pub fn branch_directory(&self) -> String { + format!("{}/{}", self.table_path, BRANCH_DIR) + } + + /// Path to the branch sub-directory for the given name (e.g. `branch/branch-my_branch`). + pub fn branch_path(&self, branch_name: &str) -> String { + format!( + "{}/{}{}", + self.branch_directory(), + BRANCH_PREFIX, + branch_name + ) + } + + /// Validate branch name format. + /// + /// Rules: + /// - Cannot be "main" + /// - Cannot be blank or whitespace only + /// - Cannot be a pure numeric string + fn validate_branch_name(branch_name: &str) -> crate::Result<()> { + if branch_name == DEFAULT_MAIN_BRANCH { + return Err(crate::Error::DataInvalid { + message: format!( + "Branch name '{}' is the default branch and cannot be used.", + DEFAULT_MAIN_BRANCH + ), + source: None, + }); + } + if branch_name.trim().is_empty() { + return Err(crate::Error::DataInvalid { + message: format!("Branch name '{}' is blank.", branch_name), + source: None, + }); + } + if branch_name.chars().all(|c| c.is_ascii_digit()) { + return Err(crate::Error::DataInvalid { + message: format!( + "Branch name cannot be pure numeric string but is '{}'.", + branch_name + ), + source: None, + }); + } + Ok(()) + } + + /// Validate branch name and ensure it does not already exist. + pub async fn validate_branch(&self, branch_name: &str) -> crate::Result<()> { + Self::validate_branch_name(branch_name)?; + if self.branch_exists(branch_name).await? { + return Err(crate::Error::DataInvalid { + message: format!("Branch name '{}' already exists.", branch_name), + source: None, + }); + } + Ok(()) + } + + /// Check if a branch exists. + pub async fn branch_exists(&self, branch_name: &str) -> crate::Result { + let path = format!("{}/", self.branch_path(branch_name)); + self.file_io.exists(&path).await + } + + /// List all branch names sorted ascending. Returns an empty vector when the + /// branch directory does not exist. + pub async fn list_all(&self) -> crate::Result> { + let branch_dir = self.branch_directory(); + let statuses = match self.file_io.list_status(&branch_dir).await { + Ok(s) => s, + Err(crate::Error::IoUnexpected { ref source, .. }) + if source.kind() == opendal::ErrorKind::NotFound => + { + return Ok(Vec::new()); + } + Err(e) => return Err(e), + }; + let mut names: Vec = statuses + .into_iter() + .filter(|s| s.is_dir) + .filter_map(|s| { + get_basename(&s.path) + .strip_suffix("/") + .and_then(|base| base.strip_prefix(BRANCH_PREFIX)) + .map(str::to_string) + }) + .collect(); + names.sort_unstable(); + Ok(names) + } + + /// Create a new branch by copying the latest schema to the branch directory. + pub async fn create_branch(&self, branch_name: &str) -> crate::Result<()> { + self.validate_branch(branch_name).await?; + let schema_manager = SchemaManager::new(self.file_io.clone(), self.table_path.clone()); + if let Some(latest) = schema_manager.latest().await? { + self.copy_schemas_to_branch(branch_name, latest.id()) + .await?; + } + Ok(()) + } + + /// Create a new branch from a tag by copying the tag, snapshot and schemas. + pub async fn create_branch_from_tag( + &self, + branch_name: &str, + tag_name: &str, + ) -> crate::Result<()> { + self.validate_branch(branch_name).await?; + let tag_manager = TagManager::new(self.file_io.clone(), self.table_path.clone()); + let snapshot = + tag_manager + .get(tag_name) + .await? + .ok_or_else(|| crate::Error::DataInvalid { + message: format!("Tag '{}' does not exist.", tag_name), + source: None, + })?; + + let snapshot_manager = SnapshotManager::new(self.file_io.clone(), self.table_path.clone()); + + // Copy tag file to branch + let tag_src = tag_manager.tag_path(tag_name); + let tag_dst = tag_manager.with_branch(branch_name).tag_path(tag_name); + self.file_io.copy_file(&tag_src, &tag_dst).await?; + + // Copy snapshot file to branch + let snap_src = snapshot_manager.snapshot_path(snapshot.id()); + let snap_dst = snapshot_manager + .with_branch(branch_name) + .snapshot_path(snapshot.id()); + self.file_io.copy_file(&snap_src, &snap_dst).await?; + + // Copy schemas to branch + self.copy_schemas_to_branch(branch_name, snapshot.schema_id()) + .await?; + + Ok(()) + } + + /// Drop an existing branch. + pub async fn drop_branch(&self, branch_name: &str) -> crate::Result<()> { + if !self.branch_exists(branch_name).await? { + return Err(crate::Error::DataInvalid { + message: format!("Branch name '{}' doesn't exist.", branch_name), + source: None, + }); + } + let path = self.branch_path(branch_name); + self.file_io.delete_dir(&path).await?; + Ok(()) + } + + /// Rename an existing branch. + pub async fn rename_branch(&self, from: &str, to: &str) -> crate::Result<()> { + if from == DEFAULT_MAIN_BRANCH { + return Err(crate::Error::DataInvalid { + message: "Cannot rename the main branch.".to_string(), + source: None, + }); + } + if !self.branch_exists(from).await? { + return Err(crate::Error::DataInvalid { + message: format!("Branch name '{}' doesn't exist.", from), + source: None, + }); + } + Self::validate_branch_name(to)?; + if self.branch_exists(to).await? { + return Err(crate::Error::DataInvalid { + message: format!("Branch name '{}' already exists.", to), + source: None, + }); + } + let src = self.branch_path(from); + let dst = self.branch_path(to); + match self.file_io.rename(&src, &dst).await { + Ok(()) => Ok(()), + Err(crate::Error::IoUnexpected { ref source, .. }) + if source.kind() == opendal::ErrorKind::Unsupported => + { + self.copy_dir_recursive(&src, &dst).await?; + self.file_io.delete_dir(&src).await?; + Ok(()) + } + Err(e) => Err(e), + } + } + + /// Recursively copy a directory tree from src to dst. + async fn copy_dir_recursive(&self, src: &str, dst: &str) -> crate::Result<()> { + use std::collections::VecDeque; + + self.file_io.mkdirs(dst).await?; + let mut queue = VecDeque::new(); + queue.push_back((src.to_string(), dst.to_string())); + + while let Some((current_src, current_dst)) = queue.pop_front() { + self.file_io.mkdirs(¤t_dst).await?; + let statuses = self.file_io.list_status(¤t_src).await?; + for status in statuses { + let name = get_basename(&status.path).trim_end_matches('/'); + let src_path = format!("{}/{}", current_src.trim_end_matches('/'), name); + let dst_path = format!("{}/{}", current_dst.trim_end_matches('/'), name); + if status.is_dir { + queue.push_back((src_path, dst_path)); + } else { + self.file_io.copy_file(&src_path, &dst_path).await?; + } + } + } + Ok(()) + } + + /// Copy all schemas with id <= schema_id to the branch directory. + async fn copy_schemas_to_branch(&self, branch_name: &str, schema_id: i64) -> crate::Result<()> { + let schema_manager = SchemaManager::new(self.file_io.clone(), self.table_path.clone()); + let ids = schema_manager.list_all_ids().await?; + let branch_schema_manager = schema_manager.with_branch(branch_name); + for id in ids { + if id <= schema_id { + let src = schema_manager.schema_path(id); + let dst = branch_schema_manager.schema_path(id); + self.file_io.copy_file(&src, &dst).await?; + } + } + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::io::FileIOBuilder; + use crate::spec::{CommitKind, Schema, Snapshot, TableSchema}; + use bytes::Bytes; + + fn test_file_io() -> FileIO { + FileIOBuilder::new("memory").build().unwrap() + } + + fn test_schema() -> TableSchema { + let schema = Schema::builder().build().unwrap(); + TableSchema::new(0, &schema) + } + + fn test_snapshot(id: i64) -> Snapshot { + Snapshot::builder() + .version(3) + .id(id) + .schema_id(0) + .base_manifest_list("base-list".to_string()) + .delta_manifest_list("delta-list".to_string()) + .commit_user("test-user".to_string()) + .commit_identifier(0) + .commit_kind(CommitKind::APPEND) + .time_millis(1000 * id as u64) + .build() + } + + async fn write_schema(file_io: &FileIO, schema_manager: &SchemaManager, schema: &TableSchema) { + let path = schema_manager.schema_path(schema.id()); + let json = serde_json::to_string(schema).unwrap(); + let output = file_io.new_output(&path).unwrap(); + output.write(Bytes::from(json)).await.unwrap(); + } + + async fn write_snapshot( + file_io: &FileIO, + snapshot_manager: &SnapshotManager, + snapshot: &Snapshot, + ) { + let path = snapshot_manager.snapshot_path(snapshot.id()); + let json = serde_json::to_string(snapshot).unwrap(); + let output = file_io.new_output(&path).unwrap(); + output.write(Bytes::from(json)).await.unwrap(); + } + + async fn write_tag( + file_io: &FileIO, + tag_manager: &TagManager, + name: &str, + snapshot: &Snapshot, + ) { + let path = tag_manager.tag_path(name); + let json = serde_json::to_string(snapshot).unwrap(); + let output = file_io.new_output(&path).unwrap(); + output.write(Bytes::from(json)).await.unwrap(); + } + + #[tokio::test] + async fn test_list_all_missing_dir_returns_empty() { + let file_io = test_file_io(); + let bm = BranchManager::new(file_io, "memory:/test_branch_missing".to_string()); + assert!(bm.list_all().await.unwrap().is_empty()); + } + + #[tokio::test] + async fn test_list_all_sorted() { + let file_io = test_file_io(); + let table_path = "memory:/test_branch_sorted".to_string(); + let branch_dir = format!("{table_path}/branch"); + file_io.mkdirs(&branch_dir).await.unwrap(); + + for name in ["b", "a", "c"] { + let path = format!("{branch_dir}/branch-{name}"); + file_io.mkdirs(&path).await.unwrap(); + } + + let bm = BranchManager::new(file_io, table_path); + assert_eq!(bm.list_all().await.unwrap(), vec!["a", "b", "c"]); + } + + #[tokio::test] + async fn test_branch_exists() { + let file_io = test_file_io(); + let table_path = "memory:/test_branch_exists".to_string(); + let branch_dir = format!("{table_path}/branch"); + file_io.mkdirs(&branch_dir).await.unwrap(); + file_io + .mkdirs(&format!("{branch_dir}/branch-foo")) + .await + .unwrap(); + + let bm = BranchManager::new(file_io, table_path); + assert!(bm.branch_exists("foo").await.unwrap()); + assert!(!bm.branch_exists("bar").await.unwrap()); + } + + #[tokio::test] + async fn test_validate_branch_name_rejects_main() { + let result = BranchManager::validate_branch_name("main"); + assert!(result.is_err()); + let msg = format!("{}", result.unwrap_err()); + assert!(msg.contains("main")); + } + + #[tokio::test] + async fn test_validate_branch_name_rejects_blank() { + let result = BranchManager::validate_branch_name(""); + assert!(result.is_err()); + let msg = format!("{}", result.unwrap_err()); + assert!(msg.contains("blank")); + } + + #[tokio::test] + async fn test_validate_branch_name_rejects_numeric() { + let result = BranchManager::validate_branch_name("123"); + assert!(result.is_err()); + let msg = format!("{}", result.unwrap_err()); + assert!(msg.contains("numeric")); + } + + #[tokio::test] + async fn test_validate_branch_name_accepts_valid() { + assert!(BranchManager::validate_branch_name("my_branch").is_ok()); + assert!(BranchManager::validate_branch_name("branch-1").is_ok()); + } + + #[tokio::test] + async fn test_create_branch() { + let file_io = test_file_io(); + let table_path = "memory:/test_create_branch".to_string(); + let schema_manager = SchemaManager::new(file_io.clone(), table_path.clone()); + write_schema(&file_io, &schema_manager, &test_schema()).await; + + let bm = BranchManager::new(file_io.clone(), table_path.clone()); + bm.create_branch("my_branch").await.unwrap(); + + assert!(bm.branch_exists("my_branch").await.unwrap()); + // Verify schema was copied + let branch_schema_manager = schema_manager.with_branch("my_branch"); + let schemas = branch_schema_manager.list_all().await.unwrap(); + assert_eq!(schemas.len(), 1); + assert_eq!(schemas[0].id(), 0); + } + + #[tokio::test] + async fn test_create_branch_already_exists() { + let file_io = test_file_io(); + let table_path = "memory:/test_create_branch_exists".to_string(); + let schema_manager = SchemaManager::new(file_io.clone(), table_path.clone()); + write_schema(&file_io, &schema_manager, &test_schema()).await; + + let bm = BranchManager::new(file_io, table_path); + bm.create_branch("my_branch").await.unwrap(); + let result = bm.create_branch("my_branch").await; + assert!(result.is_err()); + let msg = format!("{}", result.unwrap_err()); + assert!(msg.contains("already exists")); + } + + #[tokio::test] + async fn test_create_branch_from_tag() { + let file_io = test_file_io(); + let table_path = "memory:/test_create_branch_from_tag".to_string(); + let schema_manager = SchemaManager::new(file_io.clone(), table_path.clone()); + let snapshot_manager = SnapshotManager::new(file_io.clone(), table_path.clone()); + let tag_manager = TagManager::new(file_io.clone(), table_path.clone()); + + write_schema(&file_io, &schema_manager, &test_schema()).await; + let snap = test_snapshot(1); + write_snapshot(&file_io, &snapshot_manager, &snap).await; + write_tag(&file_io, &tag_manager, "v1", &snap).await; + + let bm = BranchManager::new(file_io.clone(), table_path.clone()); + bm.create_branch_from_tag("my_branch", "v1").await.unwrap(); + + assert!(bm.branch_exists("my_branch").await.unwrap()); + + // Verify snapshot was copied + let branch_snap_manager = snapshot_manager.with_branch("my_branch"); + let copied = branch_snap_manager.get_snapshot(1).await.unwrap(); + assert_eq!(copied.id(), 1); + + // Verify tag was copied + let branch_tag_manager = tag_manager.with_branch("my_branch"); + let tag_snap = branch_tag_manager.get("v1").await.unwrap(); + assert!(tag_snap.is_some()); + + // Verify schema was copied + let branch_schema_manager = schema_manager.with_branch("my_branch"); + let schemas = branch_schema_manager.list_all().await.unwrap(); + assert_eq!(schemas.len(), 1); + } + + #[tokio::test] + async fn test_drop_branch() { + let file_io = test_file_io(); + let table_path = "memory:/test_drop_branch".to_string(); + let schema_manager = SchemaManager::new(file_io.clone(), table_path.clone()); + write_schema(&file_io, &schema_manager, &test_schema()).await; + + let bm = BranchManager::new(file_io, table_path); + bm.create_branch("to_drop").await.unwrap(); + assert!(bm.branch_exists("to_drop").await.unwrap()); + + bm.drop_branch("to_drop").await.unwrap(); + assert!(!bm.branch_exists("to_drop").await.unwrap()); + } + + #[tokio::test] + async fn test_drop_nonexistent_branch() { + let file_io = test_file_io(); + let bm = BranchManager::new(file_io, "memory:/test_drop_none".to_string()); + let result = bm.drop_branch("nonexistent").await; + assert!(result.is_err()); + let msg = format!("{}", result.unwrap_err()); + assert!(msg.contains("doesn't exist")); + } + + #[tokio::test] + async fn test_rename_branch() { + let file_io = test_file_io(); + let table_path = "memory:/test_rename_branch".to_string(); + let schema_manager = SchemaManager::new(file_io.clone(), table_path.clone()); + write_schema(&file_io, &schema_manager, &test_schema()).await; + + let bm = BranchManager::new(file_io, table_path); + bm.create_branch("old_branch").await.unwrap(); + assert!(bm.branch_exists("old_branch").await.unwrap()); + + bm.rename_branch("old_branch", "new_branch").await.unwrap(); + assert!(!bm.branch_exists("old_branch").await.unwrap()); + assert!(bm.branch_exists("new_branch").await.unwrap()); + + let branches = bm.list_all().await.unwrap(); + assert!(branches.contains(&"new_branch".to_string())); + assert!(!branches.contains(&"old_branch".to_string())); + } + + #[tokio::test] + async fn test_rename_nonexistent_branch() { + let file_io = test_file_io(); + let bm = BranchManager::new(file_io, "memory:/test_rename_none".to_string()); + let result = bm.rename_branch("nonexistent", "new_name").await; + assert!(result.is_err()); + let msg = format!("{}", result.unwrap_err()); + assert!(msg.contains("doesn't exist")); + } + + #[tokio::test] + async fn test_rename_to_existing_branch() { + let file_io = test_file_io(); + let table_path = "memory:/test_rename_to_existing".to_string(); + let schema_manager = SchemaManager::new(file_io.clone(), table_path.clone()); + write_schema(&file_io, &schema_manager, &test_schema()).await; + + let bm = BranchManager::new(file_io, table_path); + bm.create_branch("branch1").await.unwrap(); + bm.create_branch("branch2").await.unwrap(); + + let result = bm.rename_branch("branch1", "branch2").await; + assert!(result.is_err()); + let msg = format!("{}", result.unwrap_err()); + assert!(msg.contains("already exists")); + } + + #[tokio::test] + async fn test_rename_main_branch_should_fail() { + let file_io = test_file_io(); + let bm = BranchManager::new(file_io, "memory:/test_rename_main".to_string()); + let result = bm.rename_branch("main", "new_name").await; + assert!(result.is_err()); + let msg = format!("{}", result.unwrap_err()); + assert!(msg.contains("Cannot rename the main branch")); + } + + #[tokio::test] + async fn test_rename_branch_multiple_times() { + let file_io = test_file_io(); + let table_path = "memory:/test_rename_multi".to_string(); + let schema_manager = SchemaManager::new(file_io.clone(), table_path.clone()); + write_schema(&file_io, &schema_manager, &test_schema()).await; + + let bm = BranchManager::new(file_io, table_path); + bm.create_branch("branch1").await.unwrap(); + bm.rename_branch("branch1", "branch2").await.unwrap(); + bm.rename_branch("branch2", "branch3").await.unwrap(); + + assert!(!bm.branch_exists("branch1").await.unwrap()); + assert!(!bm.branch_exists("branch2").await.unwrap()); + assert!(bm.branch_exists("branch3").await.unwrap()); + } +} diff --git a/crates/paimon/src/table/mod.rs b/crates/paimon/src/table/mod.rs index 41f98cd9..6bf7246f 100644 --- a/crates/paimon/src/table/mod.rs +++ b/crates/paimon/src/table/mod.rs @@ -19,6 +19,7 @@ pub(crate) mod bin_pack; mod blob_file_writer; +mod branch_manager; mod bucket_assigner; mod bucket_assigner_constant; mod bucket_assigner_cross; @@ -57,6 +58,7 @@ mod write_builder; use crate::Result; use arrow_array::RecordBatch; +pub use branch_manager::BranchManager; pub use commit_message::CommitMessage; pub use cow_writer::{CopyOnWriteMergeWriter, FileInfo}; pub use data_evolution_writer::DataEvolutionWriter; diff --git a/crates/paimon/src/table/schema_manager.rs b/crates/paimon/src/table/schema_manager.rs index 61e3b81d..f727df68 100644 --- a/crates/paimon/src/table/schema_manager.rs +++ b/crates/paimon/src/table/schema_manager.rs @@ -62,8 +62,18 @@ impl SchemaManager { format!("{}/{}", self.table_path.trim_end_matches('/'), SCHEMA_DIR) } + /// Create a SchemaManager for a branch of this table. + pub fn with_branch(&self, branch_name: &str) -> Self { + let branch_path = format!( + "{}/branch/branch-{}", + self.table_path.trim_end_matches('/'), + branch_name + ); + Self::new(self.file_io.clone(), branch_path) + } + /// Path to a specific schema file (e.g. `{table_path}/schema/schema-0`). - fn schema_path(&self, schema_id: i64) -> String { + pub fn schema_path(&self, schema_id: i64) -> String { format!("{}/{}{}", self.schema_directory(), SCHEMA_PREFIX, schema_id) } diff --git a/crates/paimon/src/table/snapshot_manager.rs b/crates/paimon/src/table/snapshot_manager.rs index 1e8e1735..13839e3a 100644 --- a/crates/paimon/src/table/snapshot_manager.rs +++ b/crates/paimon/src/table/snapshot_manager.rs @@ -55,6 +55,12 @@ impl SnapshotManager { format!("{}/{}", self.table_path, SNAPSHOT_DIR) } + /// Create a SnapshotManager for a branch of this table. + pub fn with_branch(&self, branch_name: &str) -> Self { + let branch_path = format!("{}/branch/branch-{}", self.table_path, branch_name); + Self::new(self.file_io.clone(), branch_path) + } + /// Path to the LATEST hint file. fn latest_hint_path(&self) -> String { format!("{}/{}", self.snapshot_dir(), LATEST_HINT) diff --git a/crates/paimon/src/table/tag_manager.rs b/crates/paimon/src/table/tag_manager.rs index 6c38499f..4e7e3ea8 100644 --- a/crates/paimon/src/table/tag_manager.rs +++ b/crates/paimon/src/table/tag_manager.rs @@ -53,6 +53,12 @@ impl TagManager { format!("{}/{}", self.table_path, TAG_DIR) } + /// Create a TagManager for a branch of this table. + pub fn with_branch(&self, branch_name: &str) -> Self { + let branch_path = format!("{}/branch/branch-{}", self.table_path, branch_name); + Self::new(self.file_io.clone(), branch_path) + } + /// Path to the tag file for the given name (e.g. `tag/tag-my_tag`). pub fn tag_path(&self, tag_name: &str) -> String { format!("{}/{}{}", self.tag_directory(), TAG_PREFIX, tag_name)