Skip to content

feat(datafusion): add CALL procedure support for tag and rollback operations#291

Merged
JingsongLi merged 3 commits intoapache:mainfrom
JingsongLi:procedures
Apr 28, 2026
Merged

feat(datafusion): add CALL procedure support for tag and rollback operations#291
JingsongLi merged 3 commits intoapache:mainfrom
JingsongLi:procedures

Conversation

@JingsongLi
Copy link
Copy Markdown
Contributor

Purpose

//! CALL procedure support for Paimon tables.
//!
//! Supported procedures:
//! - CALL sys.create_tag(table => '...', tag => '...', snapshot_id => ...)
//! - CALL sys.delete_tag(table => '...', tag => '...')
//! - CALL sys.rollback_to(table => '...', snapshot_id => ... | tag => '...')
//! - CALL sys.rollback_to_timestamp(table => '...', timestamp => ...)
//! - CALL sys.create_tag_from_timestamp(table => '...', tag => '...', timestamp => ...)

Brief change log

Tests

API and Format

Documentation

)));
}
let snapshot = if let Some(id) = snapshot_id {
sm.get_snapshot(id).await.map_err(to_datafusion_error)?
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This only resolves snapshot_id from the live snapshot files. In Java Paimon, creating a tag from a snapshot id also works when that snapshot file has already expired but is still retained by an existing tag. With this implementation, CALL sys.create_tag(..., snapshot_id => ...) incorrectly fails for tag-retained snapshots, which defeats one of the main retention use cases for tags. Please resolve the id from both snapshot files and tag metadata.


let (sm, tm) = managers(&table);
let snapshot = sm
.later_or_equal_time_millis(timestamp)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should also consider snapshots retained only by existing tags. Java Paimon first checks live snapshots and then scans tag snapshots to choose the earliest snapshot whose commit time is >= the requested timestamp. This implementation only sees live snapshot files, so after expiration it can choose a later snapshot or fail even though a suitable tag-retained snapshot exists.

let id: i64 = id_str
.parse()
.map_err(|_| DataFusionError::Plan(format!("Invalid snapshot_id: '{id_str}'")))?;
sm.get_snapshot(id).await.map_err(to_datafusion_error)?;
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same issue here as in create_tag: rollback by snapshot_id only accepts a live snapshot file. Java Paimon falls back to tag metadata when the requested snapshot file has expired but the snapshot is still tag-retained, and then rolls back through the tag path. As written, CALL sys.rollback_to(..., snapshot_id => ...) will reject valid tag-retained snapshots after expiration.

///
/// Reference: [SnapshotManager.earlierOrEqualTimeMills](https://github.com/apache/paimon/blob/master/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java)
pub async fn earlier_or_equal_time_mills(
pub async fn earlier_or_equal_time_millis(
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This renames the public API from earlier_or_equal_time_mills to earlier_or_equal_time_millis. Internal callers are updated, but downstream users of the crate calling the old public method will get a compile break. Can we keep a deprecated compatibility wrapper with the old name that forwards to this method?

Copy link
Copy Markdown

@jerry-024 jerry-024 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Two semantic divergences from the Java reference. Details inline.

let tag_str = require_arg(args, "tag")?;

let (_, tm) = managers(&table);
for tag_name in tag_str.split(',') {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This loop diverges from Java in two ways:

  1. Java is more lenient. Table.deleteTags(tagStr) calls TagManager.deleteTag for each name, and Java's deleteTag logs WARN "Tag '...' doesn't exist." and returns silently when the tag is missing. The Rust version returns Err instead.
  2. Rust leaves partial state. Calling with tag => 'v1,v2,bad' deletes v1 and v2, then errors on bad. The caller sees an error and may assume nothing changed, but two tags are already gone. Same hazard for empty entries from 'v1,,v2'.

Suggest either matching Java's warn-and-continue semantics, or doing a two-pass: split + trim + non-empty check + tag_exists check across all names first, then delete.

Java reference: Table.deleteTags default impl + TagManager.deleteTag.

ok_result(ctx)
}

async fn clean_larger_than(
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing long-lived changelog cleanup. Java's RollbackHelper.cleanLargerThan does three things:

  1. cleanSnapshots — covered here (steps 1 + 2)
  2. cleanLongLivedChangelogsmissing
  3. cleanTags — covered here (step 3)

For tables with a changelog producer, this leaves changelog files newer than the rollback target on disk and never updates the long-lived-changelog LATEST hint (or sets it to -1 when everything is cleaned). After rollback, orphan changelog entries can break incremental scans and confuse readers that follow the changelog hint.

The Java implementation also computes to = max(earliest, retained_id + 1) so it doesn't try to delete already-expired snapshots; the Rust loop is fine on that front because list_all_ids() only returns existing files, but worth keeping the structure aligned with Java for future maintenance.

Copy link
Copy Markdown

@jerry-024 jerry-024 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

@JingsongLi JingsongLi merged commit 6486efd into apache:main Apr 28, 2026
8 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants