feat(datafusion): add CALL procedure support for tag and rollback operations#291
feat(datafusion): add CALL procedure support for tag and rollback operations#291JingsongLi merged 3 commits intoapache:mainfrom
Conversation
| ))); | ||
| } | ||
| let snapshot = if let Some(id) = snapshot_id { | ||
| sm.get_snapshot(id).await.map_err(to_datafusion_error)? |
There was a problem hiding this comment.
This only resolves snapshot_id from the live snapshot files. In Java Paimon, creating a tag from a snapshot id also works when that snapshot file has already expired but is still retained by an existing tag. With this implementation, CALL sys.create_tag(..., snapshot_id => ...) incorrectly fails for tag-retained snapshots, which defeats one of the main retention use cases for tags. Please resolve the id from both snapshot files and tag metadata.
|
|
||
| let (sm, tm) = managers(&table); | ||
| let snapshot = sm | ||
| .later_or_equal_time_millis(timestamp) |
There was a problem hiding this comment.
This should also consider snapshots retained only by existing tags. Java Paimon first checks live snapshots and then scans tag snapshots to choose the earliest snapshot whose commit time is >= the requested timestamp. This implementation only sees live snapshot files, so after expiration it can choose a later snapshot or fail even though a suitable tag-retained snapshot exists.
| let id: i64 = id_str | ||
| .parse() | ||
| .map_err(|_| DataFusionError::Plan(format!("Invalid snapshot_id: '{id_str}'")))?; | ||
| sm.get_snapshot(id).await.map_err(to_datafusion_error)?; |
There was a problem hiding this comment.
Same issue here as in create_tag: rollback by snapshot_id only accepts a live snapshot file. Java Paimon falls back to tag metadata when the requested snapshot file has expired but the snapshot is still tag-retained, and then rolls back through the tag path. As written, CALL sys.rollback_to(..., snapshot_id => ...) will reject valid tag-retained snapshots after expiration.
| /// | ||
| /// Reference: [SnapshotManager.earlierOrEqualTimeMills](https://github.com/apache/paimon/blob/master/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java) | ||
| pub async fn earlier_or_equal_time_mills( | ||
| pub async fn earlier_or_equal_time_millis( |
There was a problem hiding this comment.
This renames the public API from earlier_or_equal_time_mills to earlier_or_equal_time_millis. Internal callers are updated, but downstream users of the crate calling the old public method will get a compile break. Can we keep a deprecated compatibility wrapper with the old name that forwards to this method?
jerry-024
left a comment
There was a problem hiding this comment.
Two semantic divergences from the Java reference. Details inline.
| let tag_str = require_arg(args, "tag")?; | ||
|
|
||
| let (_, tm) = managers(&table); | ||
| for tag_name in tag_str.split(',') { |
There was a problem hiding this comment.
This loop diverges from Java in two ways:
- Java is more lenient.
Table.deleteTags(tagStr)callsTagManager.deleteTagfor each name, and Java'sdeleteTaglogsWARN "Tag '...' doesn't exist."and returns silently when the tag is missing. The Rust version returnsErrinstead. - Rust leaves partial state. Calling with
tag => 'v1,v2,bad'deletesv1andv2, then errors onbad. The caller sees an error and may assume nothing changed, but two tags are already gone. Same hazard for empty entries from'v1,,v2'.
Suggest either matching Java's warn-and-continue semantics, or doing a two-pass: split + trim + non-empty check + tag_exists check across all names first, then delete.
Java reference: Table.deleteTags default impl + TagManager.deleteTag.
| ok_result(ctx) | ||
| } | ||
|
|
||
| async fn clean_larger_than( |
There was a problem hiding this comment.
Missing long-lived changelog cleanup. Java's RollbackHelper.cleanLargerThan does three things:
cleanSnapshots— covered here (steps 1 + 2)cleanLongLivedChangelogs— missingcleanTags— covered here (step 3)
For tables with a changelog producer, this leaves changelog files newer than the rollback target on disk and never updates the long-lived-changelog LATEST hint (or sets it to -1 when everything is cleaned). After rollback, orphan changelog entries can break incremental scans and confuse readers that follow the changelog hint.
The Java implementation also computes to = max(earliest, retained_id + 1) so it doesn't try to delete already-expired snapshots; the Rust loop is fine on that front because list_all_ids() only returns existing files, but worth keeping the structure aligned with Java for future maintenance.
Purpose
//! CALL procedure support for Paimon tables.
//!
//! Supported procedures:
//! -
CALL sys.create_tag(table => '...', tag => '...', snapshot_id => ...)//! -
CALL sys.delete_tag(table => '...', tag => '...')//! -
CALL sys.rollback_to(table => '...', snapshot_id => ... | tag => '...')//! -
CALL sys.rollback_to_timestamp(table => '...', timestamp => ...)//! -
CALL sys.create_tag_from_timestamp(table => '...', tag => '...', timestamp => ...)Brief change log
Tests
API and Format
Documentation