Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions mkdocs/docs/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,16 @@ catalog.register_table(
)
```

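To replace an already-registered table with existing metadata, pass `overwrite=True`. Catalogs that do not support overwriting (for example the Glue, Hive, SQL and BigQuery catalogs) raise `NotImplementedError`: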

```python
catalog.register_table(
identifier="docs_example.bids",
metadata_location="s3://warehouse/path/to/metadata.json",
overwrite=True
)
```

## Load a table

There are two ways of reading an Iceberg table; through a catalog, and by pointing at the Iceberg metadata directly. Reading through a catalog is preferred, and directly pointing at the metadata is read-only.
Expand Down
3 changes: 2 additions & 1 deletion pyiceberg/catalog/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -493,12 +493,13 @@ def namespace_exists(self, namespace: str | Identifier) -> bool:
"""

@abstractmethod
def register_table(self, identifier: str | Identifier, metadata_location: str) -> Table:
def register_table(self, identifier: str | Identifier, metadata_location: str, overwrite: bool = False) -> Table:
"""Register a new table using existing metadata.

Args:
identifier (Union[str, Identifier]): Table identifier for the table
metadata_location (str): The location to the metadata
overwrite (bool): Whether to overwrite the existing table, default False

Returns:
Table: The newly registered table
Expand Down
6 changes: 5 additions & 1 deletion pyiceberg/catalog/bigquery_metastore.py
Original file line number Diff line number Diff line change
Expand Up @@ -270,19 +270,23 @@ def list_namespaces(self, namespace: str | Identifier = ()) -> list[Identifier]:
datasets_iterator = self.client.list_datasets()
return [(dataset.dataset_id,) for dataset in datasets_iterator]

def register_table(self, identifier: str | Identifier, metadata_location: str) -> Table:
def register_table(self, identifier: str | Identifier, metadata_location: str, overwrite: bool = False) -> Table:
"""Register a new table using existing metadata.

Args:
identifier (str | Identifier): Table identifier for the table
metadata_location (str): The location to the metadata
overwrite (bool): Whether to overwrite the existing table, default False

Returns:
Table: The newly registered table

Raises:
TableAlreadyExistsError: If the table already exists
"""
if overwrite:
raise NotImplementedError("`overwrite` isn't supported")

dataset_name, table_name = self.identifier_to_database_and_table(identifier)

dataset_ref = DatasetReference(project=self.project_id, dataset_id=dataset_name)
Expand Down
3 changes: 2 additions & 1 deletion pyiceberg/catalog/dynamodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,12 +210,13 @@ def create_table(

return self.load_table(identifier=identifier)

def register_table(self, identifier: str | Identifier, metadata_location: str) -> Table:
def register_table(self, identifier: str | Identifier, metadata_location: str, overwrite: bool = False) -> Table:
"""Register a new table using existing metadata.

Args:
identifier (Union[str, Identifier]): Table identifier for the table
metadata_location (str): The location to the metadata
overwrite (bool): Whether to overwrite the existing table, default False

Returns:
Table: The newly registered table
Expand Down
6 changes: 5 additions & 1 deletion pyiceberg/catalog/glue.py
Original file line number Diff line number Diff line change
Expand Up @@ -601,19 +601,23 @@ def create_table(
catalog=self,
)

def register_table(self, identifier: str | Identifier, metadata_location: str) -> Table:
def register_table(self, identifier: str | Identifier, metadata_location: str, overwrite: bool = False) -> Table:
"""Register a new table using existing metadata.

Args:
identifier (Union[str, Identifier]): Table identifier for the table
metadata_location (str): The location to the metadata
overwrite (bool): Whether to overwrite the existing table, default False

Returns:
Table: The newly registered table

Raises:
TableAlreadyExistsError: If the table already exists
"""
if overwrite:
raise NotImplementedError("`overwrite` isn't supported")

database_name, table_name = self.identifier_to_database_and_table(identifier)
properties = EMPTY_DICT
io = self._load_file_io(location=metadata_location)
Expand Down
6 changes: 5 additions & 1 deletion pyiceberg/catalog/hive.py
Original file line number Diff line number Diff line change
Expand Up @@ -446,19 +446,23 @@ def create_view(
) -> View:
raise NotImplementedError

def register_table(self, identifier: str | Identifier, metadata_location: str) -> Table:
def register_table(self, identifier: str | Identifier, metadata_location: str, overwrite: bool = False) -> Table:
"""Register a new table using existing metadata.

Args:
identifier (Union[str, Identifier]): Table identifier for the table
metadata_location (str): The location to the metadata
overwrite (bool): Whether to overwrite the existing table, default False

Returns:
Table: The newly registered table

Raises:
TableAlreadyExistsError: If the table already exists
"""
if overwrite:
raise NotImplementedError("`overwrite` isn't supported")

database_name, table_name = self.identifier_to_database_and_table(identifier)
io = self._load_file_io(location=metadata_location)
metadata_file = io.new_input(metadata_location)
Expand Down
3 changes: 2 additions & 1 deletion pyiceberg/catalog/noop.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,12 +70,13 @@ def load_table(self, identifier: str | Identifier) -> Table:
def table_exists(self, identifier: str | Identifier) -> bool:
raise NotImplementedError

def register_table(self, identifier: str | Identifier, metadata_location: str) -> Table:
def register_table(self, identifier: str | Identifier, metadata_location: str, overwrite: bool = False) -> Table:
"""Register a new table using existing metadata.

Args:
identifier (Union[str, Identifier]): Table identifier for the table
metadata_location (str): The location to the metadata
overwrite (bool): Whether to overwrite the existing table, default False

Returns:
Table: The newly registered table
Expand Down
5 changes: 4 additions & 1 deletion pyiceberg/catalog/rest/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,7 @@ def transform_properties_dict_value_to_str(cls, properties: Properties) -> dict[
class RegisterTableRequest(IcebergBaseModel):
    """Request body sent to the REST catalog when registering a table from existing metadata."""

    # Name of the table to register (the last element of the identifier).
    name: str
    # Location of the existing metadata file; serialized as "metadata-location".
    metadata_location: str = Field(..., alias="metadata-location")
    # Whether to overwrite the existing table. Defaults to False so that callers
    # constructing the request without this field keep working.
    overwrite: bool = False


class ConfigResponse(IcebergBaseModel):
Expand Down Expand Up @@ -976,12 +977,13 @@ def create_view(
return self._response_to_view(self.identifier_to_tuple(identifier), view_response)

@retry(**_RETRY_ARGS)
def register_table(self, identifier: str | Identifier, metadata_location: str) -> Table:
def register_table(self, identifier: str | Identifier, metadata_location: str, overwrite: bool = False) -> Table:
"""Register a new table using existing metadata.

Args:
identifier (Union[str, Identifier]): Table identifier for the table
metadata_location (str): The location to the metadata
overwrite (bool): Whether to overwrite the existing table, default False

Returns:
Table: The newly registered table
Expand All @@ -994,6 +996,7 @@ def register_table(self, identifier: str | Identifier, metadata_location: str) -
request = RegisterTableRequest(
name=self._identifier_to_validated_tuple(identifier)[-1],
metadata_location=metadata_location,
overwrite=overwrite,
)
serialized_json = request.model_dump_json().encode(UTF8)
response = self._session.post(
Expand Down
6 changes: 5 additions & 1 deletion pyiceberg/catalog/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -237,12 +237,13 @@ def create_table(

return self.load_table(identifier=identifier)

def register_table(self, identifier: str | Identifier, metadata_location: str) -> Table:
def register_table(self, identifier: str | Identifier, metadata_location: str, overwrite: bool = False) -> Table:
"""Register a new table using existing metadata.

Args:
identifier (Union[str, Identifier]): Table identifier for the table
metadata_location (str): The location to the metadata
overwrite (bool): Whether to overwrite the existing table, default False

Returns:
Table: The newly registered table
Expand All @@ -251,6 +252,9 @@ def register_table(self, identifier: str | Identifier, metadata_location: str) -
TableAlreadyExistsError: If the table already exists
NoSuchNamespaceError: If namespace does not exist
"""
if overwrite:
raise NotImplementedError("`overwrite` isn't supported")

namespace_tuple = Catalog.namespace_from(identifier)
namespace = Catalog.namespace_to_string(namespace_tuple)
table_name = Catalog.table_name_from(identifier)
Expand Down
27 changes: 27 additions & 0 deletions tests/catalog/test_rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -1583,6 +1583,33 @@ def test_register_table_409(rest_mock: Mocker, table_schema_simple: Schema) -> N
assert "Table already exists" in str(e.value)


def test_register_table_overwrite(
    rest_mock: Mocker, table_schema_simple: Schema, example_table_metadata_no_snapshot_v1_rest_json: dict[str, Any]
) -> None:
    """Registering with ``overwrite=True`` forwards the flag to the server and returns the table."""
    # Keep the matcher so the request that was actually sent can be inspected below.
    register_endpoint = rest_mock.post(
        f"{TEST_URI}v1/namespaces/default/register",
        json=example_table_metadata_no_snapshot_v1_rest_json,
        status_code=200,
        request_headers=TEST_HEADERS,
    )
    catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN)
    actual = catalog.register_table(
        identifier=("default", "registered_table"),
        metadata_location="s3://warehouse/database/table/metadata.json",
        overwrite=True,
    )

    # Without this check the test would still pass if the flag were silently dropped
    # before reaching the wire.
    assert register_endpoint.called_once
    assert register_endpoint.last_request.json()["overwrite"] is True

    expected = Table(
        identifier=("default", "registered_table"),
        metadata_location=example_table_metadata_no_snapshot_v1_rest_json["metadata-location"],
        metadata=TableMetadataV1(**example_table_metadata_no_snapshot_v1_rest_json["metadata"]),
        io=load_file_io(),
        catalog=catalog,
    )
    assert actual.metadata.model_dump() == expected.metadata.model_dump()
    assert actual.metadata_location == expected.metadata_location
    assert actual.name() == expected.name()


def test_delete_namespace_204(rest_mock: Mocker) -> None:
namespace = "example"
rest_mock.delete(
Expand Down
Loading