diff --git a/crates/catalog/hms/src/catalog.rs b/crates/catalog/hms/src/catalog.rs
index b7d192210b..4157e02ac3 100644
--- a/crates/catalog/hms/src/catalog.rs
+++ b/crates/catalog/hms/src/catalog.rs
@@ -592,15 +592,58 @@ impl Catalog for HmsCatalog {
Ok(())
}
+ /// Registers an existing Iceberg table by its metadata location.
+ ///
+ /// This method allows registering a table that already has metadata written
+ /// to storage, without creating new metadata. It reads the existing metadata
+ /// from the provided location and creates a corresponding entry in HMS.
+ ///
+ /// # Returns
+ /// A `Result` wrapping a `Table` object representing the registered table.
+ ///
+ /// # Errors
+ /// This function may return an error in several scenarios:
+ /// - Failure to validate the namespace identifier
+ /// - Failure to read metadata from the provided location
+ /// - Table already exists in HMS
+ /// - Errors communicating with the Hive Metastore
async fn register_table(
&self,
- _table_ident: &TableIdent,
- _metadata_location: String,
+ table_ident: &TableIdent,
+ metadata_location: String,
) -> Result
{
- Err(Error::new(
- ErrorKind::FeatureUnsupported,
- "Registering a table is not supported yet",
- ))
+ let db_name = validate_namespace(table_ident.namespace())?;
+ let table_name = table_ident.name().to_string();
+
+ // Read metadata from provided location
+ let metadata = TableMetadata::read_from(&self.file_io, &metadata_location).await?;
+
+ // Get table location from metadata
+ let location = metadata.location().to_string();
+
+ // Convert to HMS table format
+ let hive_table = convert_to_hive_table(
+ db_name.clone(),
+ metadata.current_schema(),
+ table_name.clone(),
+ location,
+ metadata_location.clone(),
+ metadata.properties(),
+ )?;
+
+ // Create table in HMS
+ self.client
+ .0
+ .create_table(hive_table)
+ .await
+ .map_err(from_thrift_error)?;
+
+ Table::builder()
+ .file_io(self.file_io())
+ .metadata_location(metadata_location)
+ .metadata(metadata)
+ .identifier(table_ident.clone())
+ .build()
}
async fn update_table(&self, _commit: TableCommit) -> Result {
diff --git a/crates/catalog/hms/tests/hms_catalog_test.rs b/crates/catalog/hms/tests/hms_catalog_test.rs
index 9793b7f738..069359ffc5 100644
--- a/crates/catalog/hms/tests/hms_catalog_test.rs
+++ b/crates/catalog/hms/tests/hms_catalog_test.rs
@@ -404,3 +404,39 @@ async fn test_drop_namespace() -> Result<()> {
Ok(())
}
+
+#[tokio::test]
+async fn test_register_table() -> Result<()> {
+ let catalog = get_catalog().await;
+ let namespace = Namespace::new(NamespaceIdent::new("test_register_table".into()));
+ set_test_namespace(&catalog, namespace.name()).await?;
+
+ // Create a source table to get valid metadata
+ let creation = set_table_creation(None, "source_table")?;
+ let source_table = catalog.create_table(namespace.name(), creation).await?;
+ let metadata_location = source_table
+ .metadata_location()
+ .expect("metadata location should be present")
+ .to_string();
+
+ // Register the same metadata under a new table name
+ let new_ident = TableIdent::new(namespace.name().clone(), "registered_table".into());
+ let registered = catalog
+ .register_table(&new_ident, metadata_location.clone())
+ .await?;
+
+ // Verify the registered table
+ assert_eq!(registered.identifier(), &new_ident);
+ assert_eq!(
+ registered.metadata_location(),
+ Some(metadata_location.as_str())
+ );
+ assert_eq!(registered.metadata(), source_table.metadata());
+
+ // Verify the table is loadable from HMS
+ let loaded = catalog.load_table(&new_ident).await?;
+ assert_eq!(loaded.metadata(), registered.metadata());
+ assert_eq!(loaded.metadata_location(), registered.metadata_location());
+
+ Ok(())
+}