Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 49 additions & 6 deletions crates/catalog/hms/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -592,15 +592,58 @@ impl Catalog for HmsCatalog {
Ok(())
}

/// Registers an existing Iceberg table by its metadata location.
///
/// This method allows registering a table that already has metadata written
/// to storage, without creating new metadata. It reads the existing metadata
/// from the provided location and creates a corresponding entry in HMS.
///
/// # Returns
/// A `Result` wrapping a `Table` object representing the registered table.
///
/// # Errors
/// This function may return an error in several scenarios:
/// - Failure to validate the namespace identifier
/// - Failure to read metadata from the provided location
/// - Table already exists in HMS
/// - Errors communicating with the Hive Metastore
async fn register_table(
&self,
_table_ident: &TableIdent,
_metadata_location: String,
table_ident: &TableIdent,
metadata_location: String,
) -> Result<Table> {
Err(Error::new(
ErrorKind::FeatureUnsupported,
"Registering a table is not supported yet",
))
let db_name = validate_namespace(table_ident.namespace())?;
let table_name = table_ident.name().to_string();

// Read metadata from provided location
let metadata = TableMetadata::read_from(&self.file_io, &metadata_location).await?;

// Get table location from metadata
let location = metadata.location().to_string();

// Convert to HMS table format
let hive_table = convert_to_hive_table(
db_name.clone(),
metadata.current_schema(),
table_name.clone(),
location,
metadata_location.clone(),
metadata.properties(),
)?;

// Create table in HMS
self.client
.0
.create_table(hive_table)
.await
.map_err(from_thrift_error)?;

Table::builder()
.file_io(self.file_io())
.metadata_location(metadata_location)
.metadata(metadata)
.identifier(table_ident.clone())
.build()
}

async fn update_table(&self, _commit: TableCommit) -> Result<Table> {
Expand Down
36 changes: 36 additions & 0 deletions crates/catalog/hms/tests/hms_catalog_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -404,3 +404,39 @@ async fn test_drop_namespace() -> Result<()> {

Ok(())
}

#[tokio::test]
async fn test_register_table() -> Result<()> {
let catalog = get_catalog().await;
let namespace = Namespace::new(NamespaceIdent::new("test_register_table".into()));
set_test_namespace(&catalog, namespace.name()).await?;

// Create a source table to get valid metadata
let creation = set_table_creation(None, "source_table")?;
let source_table = catalog.create_table(namespace.name(), creation).await?;
let metadata_location = source_table
.metadata_location()
.expect("metadata location should be present")
.to_string();

// Register the same metadata under a new table name
let new_ident = TableIdent::new(namespace.name().clone(), "registered_table".into());
let registered = catalog
.register_table(&new_ident, metadata_location.clone())
.await?;

// Verify the registered table
assert_eq!(registered.identifier(), &new_ident);
assert_eq!(
registered.metadata_location(),
Some(metadata_location.as_str())
);
assert_eq!(registered.metadata(), source_table.metadata());

// Verify the table is loadable from HMS
let loaded = catalog.load_table(&new_ident).await?;
assert_eq!(loaded.metadata(), registered.metadata());
assert_eq!(loaded.metadata_location(), registered.metadata_location());

Ok(())
}
Loading