Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions crates/paimon/src/api/api_response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,29 @@ impl ListTablesResponse {
}
}

/// Response for listing partitions.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ListPartitionsResponse {
/// List of partitions.
pub partitions: Option<Vec<crate::spec::Partition>>,
/// Token for the next page.
pub next_page_token: Option<String>,
}

impl ListPartitionsResponse {
/// Create a new ListPartitionsResponse.
pub fn new(
partitions: Option<Vec<crate::spec::Partition>>,
next_page_token: Option<String>,
) -> Self {
Self {
partitions,
next_page_token,
}
}
}

/// A paginated list of elements with an optional next page token.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
Expand Down
3 changes: 2 additions & 1 deletion crates/paimon/src/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ pub use api_request::{
// Re-export response types
pub use api_response::{
AuditRESTResponse, ConfigResponse, ErrorResponse, GetDatabaseResponse, GetTableResponse,
GetTableTokenResponse, ListDatabasesResponse, ListTablesResponse, PagedList,
GetTableTokenResponse, ListDatabasesResponse, ListPartitionsResponse, ListTablesResponse,
PagedList,
};

// Re-export error types
Expand Down
14 changes: 14 additions & 0 deletions crates/paimon/src/api/resource_paths.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ impl ResourcePaths {
const DATABASES: &'static str = "databases";
const TABLES: &'static str = "tables";
const TABLE_DETAILS: &'static str = "table-details";
const PARTITIONS: &'static str = "partitions";

/// Create a new ResourcePaths with the given prefix.
pub fn new(prefix: &str) -> Self {
Expand Down Expand Up @@ -143,6 +144,19 @@ impl ResourcePaths {
RESTUtil::encode_string(table_name)
)
}

/// Get the partitions endpoint path for a table.
pub fn partitions(&self, database_name: &str, table_name: &str) -> String {
format!(
"{}/{}/{}/{}/{}/{}",
self.base_path,
Self::DATABASES,
RESTUtil::encode_string(database_name),
Self::TABLES,
RESTUtil::encode_string(table_name),
Self::PARTITIONS
)
}
}

#[cfg(test)]
Expand Down
62 changes: 60 additions & 2 deletions crates/paimon/src/api/rest_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,15 @@ use std::collections::HashMap;
use crate::api::rest_client::HttpClient;
use crate::catalog::Identifier;
use crate::common::{CatalogOptions, Options};
use crate::spec::{PartitionStatistics, Schema, Snapshot};
use crate::spec::{Partition, PartitionStatistics, Schema, Snapshot};
use crate::Result;

use super::api_request::{
AlterDatabaseRequest, CreateDatabaseRequest, CreateTableRequest, RenameTableRequest,
};
use super::api_response::{
ConfigResponse, GetDatabaseResponse, GetTableResponse, ListDatabasesResponse,
ListTablesResponse, PagedList,
ListPartitionsResponse, ListTablesResponse, PagedList,
};
use super::auth::{AuthProviderFactory, RESTAuthFunction};
use super::resource_paths::ResourcePaths;
Expand Down Expand Up @@ -376,6 +376,64 @@ impl RESTApi {
Ok(())
}

// ==================== Partition Operations ====================

/// List all partitions of a table, paging internally.
pub async fn list_partitions(&self, identifier: &Identifier) -> Result<Vec<Partition>> {
let database = identifier.database();
let table = identifier.object();
validate_non_empty_multi(&[(database, "database name"), (table, "table name")])?;

let mut results = Vec::new();
let mut page_token: Option<String> = None;

loop {
let paged = self
.list_partitions_paged(identifier, None, page_token.as_deref())
.await?;
let is_empty = paged.elements.is_empty();
results.extend(paged.elements);
page_token = paged.next_page_token;
if page_token.is_none() || is_empty {
break;
}
}

Ok(results)
}

/// List partitions with pagination.
pub async fn list_partitions_paged(
&self,
identifier: &Identifier,
max_results: Option<u32>,
page_token: Option<&str>,
) -> Result<PagedList<Partition>> {
let database = identifier.database();
let table = identifier.object();
validate_non_empty_multi(&[(database, "database name"), (table, "table name")])?;
let path = self.resource_paths.partitions(database, table);
let mut params: Vec<(&str, String)> = Vec::new();

if let Some(max) = max_results {
params.push((Self::MAX_RESULTS, max.to_string()));
}
if let Some(token) = page_token {
params.push((Self::PAGE_TOKEN, token.to_string()));
}

let response: ListPartitionsResponse = if params.is_empty() {
self.client.get(&path, None::<&[(&str, &str)]>).await?
} else {
self.client.get(&path, Some(&params)).await?
};

Ok(PagedList::new(
response.partitions.unwrap_or_default(),
response.next_page_token,
))
}

// ==================== Token Operations ====================

/// Load table token for data access.
Expand Down
69 changes: 69 additions & 0 deletions crates/paimon/src/catalog/filesystem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -567,4 +567,73 @@ mod tests {
assert_eq!(tables.len(), 2);
assert!(!tables.contains(&"table1".to_string()));
}

#[tokio::test]
async fn test_list_partitions_default_table_not_found_errors() {
let (_temp_dir, catalog) = create_test_catalog();
let id = Identifier::new("nope_db", "nope_table");
let result = catalog.list_partitions(&id).await;
assert!(
matches!(
result,
Err(Error::DatabaseNotExist { .. } | Error::TableNotExist { .. })
),
"expected TableNotExist/DatabaseNotExist, got {result:?}"
);
}

#[tokio::test]
async fn test_list_partitions_default_empty_table_returns_empty() {
let (_temp_dir, catalog) = create_test_catalog();
catalog
.create_database("db1", false, HashMap::new())
.await
.unwrap();
let id = Identifier::new("db1", "t1");
catalog
.create_table(&id, testing_schema(), false)
.await
.unwrap();
let parts = catalog.list_partitions(&id).await.unwrap();
assert!(
parts.is_empty(),
"table without snapshots should yield no partitions"
);
}

/// Mirrors Java `CatalogTestBase.testListPartitionsPaged`: the default impl
/// returns the same full result regardless of `max_results` / `page_token`.
#[tokio::test]
async fn test_list_partitions_paged_default_ignores_max_and_token() {
let (_temp_dir, catalog) = create_test_catalog();
catalog
.create_database("db1", false, HashMap::new())
.await
.unwrap();
let id = Identifier::new("db1", "t1");
catalog
.create_table(&id, testing_schema(), false)
.await
.unwrap();
for (max_results, page_token) in [
(None, None),
(Some(2), None),
(Some(2), Some("dt=20250101")),
(Some(8), None),
(Some(8), Some("dt=20250101")),
] {
let page = catalog
.list_partitions_paged(&id, max_results, page_token)
.await
.unwrap();
assert!(
page.elements.is_empty(),
"empty table → empty page for max_results={max_results:?}, page_token={page_token:?}"
);
assert!(
page.next_page_token.is_none(),
"default impl never paginates"
);
}
}
}
31 changes: 30 additions & 1 deletion crates/paimon/src/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
mod database;
mod factory;
mod filesystem;
mod partition_listing;
mod rest;

use std::collections::HashMap;
Expand All @@ -31,6 +32,7 @@ use std::fmt;
pub use database::*;
pub use factory::*;
pub use filesystem::*;
pub use partition_listing::list_partitions_from_file_system;
pub use rest::*;
use serde::{Deserialize, Serialize};

Expand Down Expand Up @@ -114,7 +116,8 @@ impl fmt::Debug for Identifier {

use async_trait::async_trait;

use crate::spec::{Schema, SchemaChange};
use crate::api::PagedList;
use crate::spec::{Partition, Schema, SchemaChange};
use crate::table::Table;
use crate::Result;

Expand Down Expand Up @@ -227,4 +230,30 @@ pub trait Catalog: Send + Sync {
changes: Vec<SchemaChange>,
ignore_if_not_exists: bool,
) -> Result<()>;

/// List partitions for a table.
///
/// Default impl scans the table's manifest entries via
/// [`list_partitions_from_file_system`], matching Java
/// `AbstractCatalog.listPartitions`. Catalogs with metastore-tracked
/// partitions (e.g. `RESTCatalog`) override to return audit fields too.
async fn list_partitions(&self, identifier: &Identifier) -> Result<Vec<Partition>> {
let table = self.get_table(identifier).await?;
list_partitions_from_file_system(&table).await
}

/// Like [`Self::list_partitions`] but paged. Default impl ignores
/// `max_results` and `page_token`, returning all partitions in a single page.
/// Catalogs that need true pagination (e.g. `RESTCatalog`) override this.
async fn list_partitions_paged(
&self,
identifier: &Identifier,
_max_results: Option<u32>,
_page_token: Option<&str>,
) -> Result<PagedList<Partition>> {
Ok(PagedList::new(
self.list_partitions(identifier).await?,
None,
))
}
}
105 changes: 105 additions & 0 deletions crates/paimon/src/catalog/partition_listing.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Mirrors Java [CatalogUtils.listPartitionsFromFileSystem](https://github.com/apache/paimon/blob/release-1.4/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java).
//!
//! Used as the catalog-side fallback when a backend doesn't track partitions
//! (e.g. `FileSystemCatalog`, or `RESTCatalog` against a non-metastore server).

use std::collections::{BTreeMap, HashMap};

use crate::spec::{BinaryRow, CoreOptions, Manifest, ManifestList, Partition, PartitionComputer};
use crate::table::{SnapshotManager, Table};
use crate::Result;

/// Scan a table's manifest entries and aggregate them into [`Partition`] rows,
/// matching the shape catalogs would otherwise return from a metastore.
pub async fn list_partitions_from_file_system(table: &Table) -> Result<Vec<Partition>> {
let file_io = table.file_io();
let sm = SnapshotManager::new(file_io.clone(), table.location().to_string());
let snapshot = match sm.get_latest_snapshot().await? {
Some(s) => s,
None => return Ok(Vec::new()),
};

let base_path = sm.manifest_path(snapshot.base_manifest_list());
let delta_path = sm.manifest_path(snapshot.delta_manifest_list());
let (base_metas, delta_metas) = futures::try_join!(
ManifestList::read(file_io, &base_path),
ManifestList::read(file_io, &delta_path),
)?;

let mut all_entries = Vec::new();
for meta in base_metas.into_iter().chain(delta_metas) {
let manifest_path = sm.manifest_path(meta.file_name());
let entries = Manifest::read(file_io, &manifest_path).await?;
all_entries.extend(entries);
}

let schema = table.schema();
let core = CoreOptions::new(schema.options());
let computer = PartitionComputer::new(
schema.partition_keys(),
schema.fields(),
core.partition_default_name(),
core.legacy_partition_name(),
)?;

#[derive(Default)]
struct Agg {
record_count: i64,
file_size: i64,
file_count: i64,
last_file_creation_time: i64,
}
let mut buckets: BTreeMap<Vec<u8>, Agg> = BTreeMap::new();
for entry in &all_entries {
let file = entry.file();
let agg = buckets.entry(entry.partition().to_vec()).or_default();
agg.record_count += file.row_count;
agg.file_size += file.file_size;
agg.file_count += 1;
if let Some(ct) = file.creation_time {
agg.last_file_creation_time = agg.last_file_creation_time.max(ct.timestamp_millis());
}
}

let mut result = Vec::with_capacity(buckets.len());
for (bytes, agg) in buckets {
let spec = if bytes.is_empty() {
HashMap::new()
} else {
let row = BinaryRow::from_serialized_bytes(&bytes)?;
computer.generate_part_values(&row)?.into_iter().collect()
};
result.push(Partition {
spec,
record_count: agg.record_count,
file_size_in_bytes: agg.file_size,
file_count: agg.file_count,
last_file_creation_time: agg.last_file_creation_time,
total_buckets: 0,
done: false,
created_at: None,
created_by: None,
updated_at: None,
updated_by: None,
options: None,
});
}
Ok(result)
}
Loading
Loading