diff --git a/crates/paimon/src/api/api_response.rs b/crates/paimon/src/api/api_response.rs index c3169a27..092008a2 100644 --- a/crates/paimon/src/api/api_response.rs +++ b/crates/paimon/src/api/api_response.rs @@ -258,6 +258,29 @@ impl ListTablesResponse { } } +/// Response for listing partitions. +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct ListPartitionsResponse { + /// List of partitions. + pub partitions: Option>, + /// Token for the next page. + pub next_page_token: Option, +} + +impl ListPartitionsResponse { + /// Create a new ListPartitionsResponse. + pub fn new( + partitions: Option>, + next_page_token: Option, + ) -> Self { + Self { + partitions, + next_page_token, + } + } +} + /// A paginated list of elements with an optional next page token. #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] diff --git a/crates/paimon/src/api/mod.rs b/crates/paimon/src/api/mod.rs index 6307cf87..8584cb08 100644 --- a/crates/paimon/src/api/mod.rs +++ b/crates/paimon/src/api/mod.rs @@ -37,7 +37,8 @@ pub use api_request::{ // Re-export response types pub use api_response::{ AuditRESTResponse, ConfigResponse, ErrorResponse, GetDatabaseResponse, GetTableResponse, - GetTableTokenResponse, ListDatabasesResponse, ListTablesResponse, PagedList, + GetTableTokenResponse, ListDatabasesResponse, ListPartitionsResponse, ListTablesResponse, + PagedList, }; // Re-export error types diff --git a/crates/paimon/src/api/resource_paths.rs b/crates/paimon/src/api/resource_paths.rs index 9345310a..6e16a9a3 100644 --- a/crates/paimon/src/api/resource_paths.rs +++ b/crates/paimon/src/api/resource_paths.rs @@ -32,6 +32,7 @@ impl ResourcePaths { const DATABASES: &'static str = "databases"; const TABLES: &'static str = "tables"; const TABLE_DETAILS: &'static str = "table-details"; + const PARTITIONS: &'static str = "partitions"; /// Create a new ResourcePaths with the given prefix. pub fn new(prefix: &str) -> Self { @@ -143,6 +144,19 @@ impl ResourcePaths { RESTUtil::encode_string(table_name) ) } + + /// Get the partitions endpoint path for a table. + pub fn partitions(&self, database_name: &str, table_name: &str) -> String { + format!( + "{}/{}/{}/{}/{}/{}", + self.base_path, + Self::DATABASES, + RESTUtil::encode_string(database_name), + Self::TABLES, + RESTUtil::encode_string(table_name), + Self::PARTITIONS + ) + } } #[cfg(test)] diff --git a/crates/paimon/src/api/rest_api.rs b/crates/paimon/src/api/rest_api.rs index 8a3416ed..347e6691 100644 --- a/crates/paimon/src/api/rest_api.rs +++ b/crates/paimon/src/api/rest_api.rs @@ -25,7 +25,7 @@ use std::collections::HashMap; use crate::api::rest_client::HttpClient; use crate::catalog::Identifier; use crate::common::{CatalogOptions, Options}; -use crate::spec::{PartitionStatistics, Schema, Snapshot}; +use crate::spec::{Partition, PartitionStatistics, Schema, Snapshot}; use crate::Result; use super::api_request::{ @@ -33,7 +33,7 @@ use super::api_request::{ }; use super::api_response::{ ConfigResponse, GetDatabaseResponse, GetTableResponse, ListDatabasesResponse, - ListTablesResponse, PagedList, + ListPartitionsResponse, ListTablesResponse, PagedList, }; use super::auth::{AuthProviderFactory, RESTAuthFunction}; use super::resource_paths::ResourcePaths; @@ -376,6 +376,64 @@ impl RESTApi { Ok(()) } + // ==================== Partition Operations ==================== + + /// List all partitions of a table, paging internally. + pub async fn list_partitions(&self, identifier: &Identifier) -> Result> { + let database = identifier.database(); + let table = identifier.object(); + validate_non_empty_multi(&[(database, "database name"), (table, "table name")])?; + + let mut results = Vec::new(); + let mut page_token: Option = None; + + loop { + let paged = self + .list_partitions_paged(identifier, None, page_token.as_deref()) + .await?; + let is_empty = paged.elements.is_empty(); + results.extend(paged.elements); + page_token = paged.next_page_token; + if page_token.is_none() || is_empty { + break; + } + } + + Ok(results) + } + + /// List partitions with pagination. + pub async fn list_partitions_paged( + &self, + identifier: &Identifier, + max_results: Option, + page_token: Option<&str>, + ) -> Result> { + let database = identifier.database(); + let table = identifier.object(); + validate_non_empty_multi(&[(database, "database name"), (table, "table name")])?; + let path = self.resource_paths.partitions(database, table); + let mut params: Vec<(&str, String)> = Vec::new(); + + if let Some(max) = max_results { + params.push((Self::MAX_RESULTS, max.to_string())); + } + if let Some(token) = page_token { + params.push((Self::PAGE_TOKEN, token.to_string())); + } + + let response: ListPartitionsResponse = if params.is_empty() { + self.client.get(&path, None::<&[(&str, &str)]>).await? + } else { + self.client.get(&path, Some(¶ms)).await? + }; + + Ok(PagedList::new( + response.partitions.unwrap_or_default(), + response.next_page_token, + )) + } + // ==================== Token Operations ==================== /// Load table token for data access. diff --git a/crates/paimon/src/catalog/filesystem.rs b/crates/paimon/src/catalog/filesystem.rs index 4dbe8bd7..c544a153 100644 --- a/crates/paimon/src/catalog/filesystem.rs +++ b/crates/paimon/src/catalog/filesystem.rs @@ -567,4 +567,73 @@ mod tests { assert_eq!(tables.len(), 2); assert!(!tables.contains(&"table1".to_string())); } + + #[tokio::test] + async fn test_list_partitions_default_table_not_found_errors() { + let (_temp_dir, catalog) = create_test_catalog(); + let id = Identifier::new("nope_db", "nope_table"); + let result = catalog.list_partitions(&id).await; + assert!( + matches!( + result, + Err(Error::DatabaseNotExist { .. } | Error::TableNotExist { .. }) + ), + "expected TableNotExist/DatabaseNotExist, got {result:?}" + ); + } + + #[tokio::test] + async fn test_list_partitions_default_empty_table_returns_empty() { + let (_temp_dir, catalog) = create_test_catalog(); + catalog + .create_database("db1", false, HashMap::new()) + .await + .unwrap(); + let id = Identifier::new("db1", "t1"); + catalog + .create_table(&id, testing_schema(), false) + .await + .unwrap(); + let parts = catalog.list_partitions(&id).await.unwrap(); + assert!( + parts.is_empty(), + "table without snapshots should yield no partitions" + ); + } + + /// Mirrors Java `CatalogTestBase.testListPartitionsPaged`: the default impl + /// returns the same full result regardless of `max_results` / `page_token`. + #[tokio::test] + async fn test_list_partitions_paged_default_ignores_max_and_token() { + let (_temp_dir, catalog) = create_test_catalog(); + catalog + .create_database("db1", false, HashMap::new()) + .await + .unwrap(); + let id = Identifier::new("db1", "t1"); + catalog + .create_table(&id, testing_schema(), false) + .await + .unwrap(); + for (max_results, page_token) in [ + (None, None), + (Some(2), None), + (Some(2), Some("dt=20250101")), + (Some(8), None), + (Some(8), Some("dt=20250101")), + ] { + let page = catalog + .list_partitions_paged(&id, max_results, page_token) + .await + .unwrap(); + assert!( + page.elements.is_empty(), + "empty table → empty page for max_results={max_results:?}, page_token={page_token:?}" + ); + assert!( + page.next_page_token.is_none(), + "default impl never paginates" + ); + } + } } diff --git a/crates/paimon/src/catalog/mod.rs b/crates/paimon/src/catalog/mod.rs index 3a9fee54..0c944a4c 100644 --- a/crates/paimon/src/catalog/mod.rs +++ b/crates/paimon/src/catalog/mod.rs @@ -23,6 +23,7 @@ mod database; mod factory; mod filesystem; +mod partition_listing; mod rest; use std::collections::HashMap; @@ -31,6 +32,7 @@ use std::fmt; pub use database::*; pub use factory::*; pub use filesystem::*; +pub use partition_listing::list_partitions_from_file_system; pub use rest::*; use serde::{Deserialize, Serialize}; @@ -114,7 +116,8 @@ impl fmt::Debug for Identifier { use async_trait::async_trait; -use crate::spec::{Schema, SchemaChange}; +use crate::api::PagedList; +use crate::spec::{Partition, Schema, SchemaChange}; use crate::table::Table; use crate::Result; @@ -227,4 +230,30 @@ pub trait Catalog: Send + Sync { changes: Vec, ignore_if_not_exists: bool, ) -> Result<()>; + + /// List partitions for a table. + /// + /// Default impl scans the table's manifest entries via + /// [`list_partitions_from_file_system`], matching Java + /// `AbstractCatalog.listPartitions`. Catalogs with metastore-tracked + /// partitions (e.g. `RESTCatalog`) override to return audit fields too. + async fn list_partitions(&self, identifier: &Identifier) -> Result> { + let table = self.get_table(identifier).await?; + list_partitions_from_file_system(&table).await + } + + /// Like [`Self::list_partitions`] but paged. Default impl ignores + /// `max_results` and `page_token`, returning all partitions in a single page. + /// Catalogs that need true pagination (e.g. `RESTCatalog`) override this. + async fn list_partitions_paged( + &self, + identifier: &Identifier, + _max_results: Option, + _page_token: Option<&str>, + ) -> Result> { + Ok(PagedList::new( + self.list_partitions(identifier).await?, + None, + )) + } } diff --git a/crates/paimon/src/catalog/partition_listing.rs b/crates/paimon/src/catalog/partition_listing.rs new file mode 100644 index 00000000..b71e30d3 --- /dev/null +++ b/crates/paimon/src/catalog/partition_listing.rs @@ -0,0 +1,105 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Mirrors Java [CatalogUtils.listPartitionsFromFileSystem](https://github.com/apache/paimon/blob/release-1.4/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java). +//! +//! Used as the catalog-side fallback when a backend doesn't track partitions +//! (e.g. `FileSystemCatalog`, or `RESTCatalog` against a non-metastore server). + +use std::collections::{BTreeMap, HashMap}; + +use crate::spec::{BinaryRow, CoreOptions, Manifest, ManifestList, Partition, PartitionComputer}; +use crate::table::{SnapshotManager, Table}; +use crate::Result; + +/// Scan a table's manifest entries and aggregate them into [`Partition`] rows, +/// matching the shape catalogs would otherwise return from a metastore. +pub async fn list_partitions_from_file_system(table: &Table) -> Result> { + let file_io = table.file_io(); + let sm = SnapshotManager::new(file_io.clone(), table.location().to_string()); + let snapshot = match sm.get_latest_snapshot().await? { + Some(s) => s, + None => return Ok(Vec::new()), + }; + + let base_path = sm.manifest_path(snapshot.base_manifest_list()); + let delta_path = sm.manifest_path(snapshot.delta_manifest_list()); + let (base_metas, delta_metas) = futures::try_join!( + ManifestList::read(file_io, &base_path), + ManifestList::read(file_io, &delta_path), + )?; + + let mut all_entries = Vec::new(); + for meta in base_metas.into_iter().chain(delta_metas) { + let manifest_path = sm.manifest_path(meta.file_name()); + let entries = Manifest::read(file_io, &manifest_path).await?; + all_entries.extend(entries); + } + + let schema = table.schema(); + let core = CoreOptions::new(schema.options()); + let computer = PartitionComputer::new( + schema.partition_keys(), + schema.fields(), + core.partition_default_name(), + core.legacy_partition_name(), + )?; + + #[derive(Default)] + struct Agg { + record_count: i64, + file_size: i64, + file_count: i64, + last_file_creation_time: i64, + } + let mut buckets: BTreeMap, Agg> = BTreeMap::new(); + for entry in &all_entries { + let file = entry.file(); + let agg = buckets.entry(entry.partition().to_vec()).or_default(); + agg.record_count += file.row_count; + agg.file_size += file.file_size; + agg.file_count += 1; + if let Some(ct) = file.creation_time { + agg.last_file_creation_time = agg.last_file_creation_time.max(ct.timestamp_millis()); + } + } + + let mut result = Vec::with_capacity(buckets.len()); + for (bytes, agg) in buckets { + let spec = if bytes.is_empty() { + HashMap::new() + } else { + let row = BinaryRow::from_serialized_bytes(&bytes)?; + computer.generate_part_values(&row)?.into_iter().collect() + }; + result.push(Partition { + spec, + record_count: agg.record_count, + file_size_in_bytes: agg.file_size, + file_count: agg.file_count, + last_file_creation_time: agg.last_file_creation_time, + total_buckets: 0, + done: false, + created_at: None, + created_by: None, + updated_at: None, + updated_by: None, + options: None, + }); + } + Ok(result) +} diff --git a/crates/paimon/src/catalog/rest/rest_catalog.rs b/crates/paimon/src/catalog/rest/rest_catalog.rs index 1da2aa16..6fb8db09 100644 --- a/crates/paimon/src/catalog/rest/rest_catalog.rs +++ b/crates/paimon/src/catalog/rest/rest_catalog.rs @@ -28,11 +28,13 @@ use async_trait::async_trait; use crate::api::rest_api::RESTApi; use crate::api::rest_error::RestError; use crate::api::PagedList; -use crate::catalog::{Catalog, Database, Identifier, DB_LOCATION_PROP}; +use crate::catalog::{ + list_partitions_from_file_system, Catalog, Database, Identifier, DB_LOCATION_PROP, +}; use crate::common::{CatalogOptions, Options}; use crate::error::Error; use crate::io::FileIO; -use crate::spec::{Schema, SchemaChange, TableSchema}; +use crate::spec::{Partition, Schema, SchemaChange, TableSchema}; use crate::table::{RESTEnv, Table}; use crate::Result; @@ -333,6 +335,42 @@ impl Catalog for RESTCatalog { message: "Alter table is not yet implemented for REST catalog".to_string(), }) } + + async fn list_partitions(&self, identifier: &Identifier) -> Result> { + match self.api.list_partitions(identifier).await { + Ok(parts) => Ok(parts), + Err(Error::RestApi { + source: RestError::NotImplemented { .. }, + }) => { + let table = self.get_table(identifier).await?; + list_partitions_from_file_system(&table).await + } + Err(e) => Err(map_rest_error_for_table(e, identifier)), + } + } + + async fn list_partitions_paged( + &self, + identifier: &Identifier, + max_results: Option, + page_token: Option<&str>, + ) -> Result> { + match self + .api + .list_partitions_paged(identifier, max_results, page_token) + .await + { + Ok(page) => Ok(page), + Err(Error::RestApi { + source: RestError::NotImplemented { .. }, + }) => { + let table = self.get_table(identifier).await?; + let parts = list_partitions_from_file_system(&table).await?; + Ok(PagedList::new(parts, None)) + } + Err(e) => Err(map_rest_error_for_table(e, identifier)), + } + } } // ============================================================================ // Error mapping helpers diff --git a/crates/paimon/src/spec/mod.rs b/crates/paimon/src/spec/mod.rs index e9bc085f..6ad6533f 100644 --- a/crates/paimon/src/spec/mod.rs +++ b/crates/paimon/src/spec/mod.rs @@ -65,6 +65,8 @@ pub(crate) mod avro; pub(crate) mod stats; mod types; pub use types::*; +mod partition; +pub use partition::Partition; mod partition_utils; pub(crate) use partition_utils::PartitionComputer; mod predicate; diff --git a/crates/paimon/src/spec/partition.rs b/crates/paimon/src/spec/partition.rs new file mode 100644 index 00000000..478fa0b7 --- /dev/null +++ b/crates/paimon/src/spec/partition.rs @@ -0,0 +1,111 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Mirrors Java [Partition](https://github.com/apache/paimon/blob/release-1.4/paimon-api/src/main/java/org/apache/paimon/partition/Partition.java) +//! and its [PartitionStatistics](https://github.com/apache/paimon/blob/release-1.4/paimon-api/src/main/java/org/apache/paimon/partition/PartitionStatistics.java) base. + +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; + +/// A partition with aggregate statistics and audit metadata, as tracked by the catalog. +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct Partition { + pub spec: HashMap, + pub record_count: i64, + pub file_size_in_bytes: i64, + pub file_count: i64, + pub last_file_creation_time: i64, + #[serde(default)] + pub total_buckets: i32, + #[serde(default)] + pub done: bool, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub created_at: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub created_by: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub updated_at: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub updated_by: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub options: Option>, +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_partition_roundtrip_minimal() { + let original = Partition { + spec: HashMap::from([("dt".to_string(), "2024-01-01".to_string())]), + record_count: 100, + file_size_in_bytes: 2048, + file_count: 3, + last_file_creation_time: 1700000000000, + total_buckets: 4, + done: false, + created_at: None, + created_by: None, + updated_at: None, + updated_by: None, + options: None, + }; + let json = serde_json::to_string(&original).unwrap(); + let decoded: Partition = serde_json::from_str(&json).unwrap(); + assert_eq!(original, decoded); + } + + #[test] + fn test_partition_roundtrip_full() { + let original = Partition { + spec: HashMap::from([ + ("dt".to_string(), "2024-01-01".to_string()), + ("hr".to_string(), "12".to_string()), + ]), + record_count: 100, + file_size_in_bytes: 2048, + file_count: 3, + last_file_creation_time: 1700000000000, + total_buckets: 4, + done: true, + created_at: Some(1699000000000), + created_by: Some("user-a".to_string()), + updated_at: Some(1700000000000), + updated_by: Some("user-b".to_string()), + options: Some(HashMap::from([("k".to_string(), "v".to_string())])), + }; + let json = serde_json::to_string(&original).unwrap(); + let decoded: Partition = serde_json::from_str(&json).unwrap(); + assert_eq!(original, decoded); + } + + #[test] + fn test_partition_decode_with_unknown_fields() { + let json = r#"{ + "spec": {"dt": "2024-01-01"}, + "recordCount": 1, + "fileSizeInBytes": 1, + "fileCount": 1, + "lastFileCreationTime": 0, + "newField": "ignored" + }"#; + let decoded: Partition = serde_json::from_str(json).unwrap(); + assert_eq!(decoded.spec.get("dt"), Some(&"2024-01-01".to_string())); + } +}