From bda54115cfdd9f4f23f07151f921f5fc7bac53c6 Mon Sep 17 00:00:00 2001 From: Brodie Alexander Date: Thu, 29 Jan 2026 15:37:36 -0600 Subject: [PATCH 1/6] Added backend-specific SQL code to check for iceberg-type --- crates/catalog/sql/src/catalog.rs | 225 +++++++++++++++++++++--------- crates/catalog/sql/src/error.rs | 7 + 2 files changed, 164 insertions(+), 68 deletions(-) diff --git a/crates/catalog/sql/src/catalog.rs b/crates/catalog/sql/src/catalog.rs index 8209cd04c1..da84964090 100644 --- a/crates/catalog/sql/src/catalog.rs +++ b/crates/catalog/sql/src/catalog.rs @@ -28,10 +28,11 @@ use iceberg::{ TableCommit, TableCreation, TableIdent, }; use sqlx::any::{AnyPoolOptions, AnyQueryResult, AnyRow, install_default_drivers}; -use sqlx::{Any, AnyPool, Row, Transaction}; +use sqlx::{Any, AnyPool, Column, Row, Transaction}; use crate::error::{ from_sqlx_error, no_such_namespace_err, no_such_table_err, table_already_exists_err, + unsupported_sql_backend_err, }; /// catalog URI @@ -136,8 +137,6 @@ impl CatalogBuilder for SqlCatalogBuilder { name: impl Into, props: HashMap, ) -> impl Future> + Send { - let name = name.into(); - for (k, v) in props { self.0.props.insert(k, v); } @@ -149,6 +148,8 @@ impl CatalogBuilder for SqlCatalogBuilder { self.0.warehouse_location = warehouse_location; } + let name = name.into(); + let mut valid_sql_bind_style = true; if let Some(sql_bind_style) = self.0.props.remove(SQL_CATALOG_PROP_BIND_STYLE) { if let Ok(sql_bind_style) = SqlBindStyle::from_str(&sql_bind_style) { @@ -158,8 +159,10 @@ impl CatalogBuilder for SqlCatalogBuilder { } } + let valid_name = !name.trim().is_empty(); + async move { - if name.trim().is_empty() { + if !valid_name { Err(Error::new( ErrorKind::DataInvalid, "Catalog name cannot be empty", @@ -175,12 +178,20 @@ impl CatalogBuilder for SqlCatalogBuilder { ), )) } else { + self.0.name = name; SqlCatalog::new(self.0).await } } } } +#[derive(Debug, Clone, Copy)] +enum SqlBackend { + SQLite, + PostgreSQL, + MySQL, +} + /// A struct representing the SQL catalog configuration. /// /// This struct contains various parameters that are used to configure a SQL catalog, @@ -202,6 +213,7 @@ struct SqlCatalogConfig { /// Sql catalog implementation. pub struct SqlCatalog { name: String, + backend: SqlBackend, connection: AnyPool, warehouse_location: String, fileio: FileIO, @@ -246,6 +258,19 @@ impl SqlCatalog { .await .map_err(from_sqlx_error)?; + let backend = pool + .acquire() + .await + .map_err(from_sqlx_error)? + .backend_name() + .to_string(); + let backend = match backend.as_str() { + "PostgreSQL" => SqlBackend::PostgreSQL, + "MySQL" => SqlBackend::MySQL, + "SQLite" => SqlBackend::SQLite, + _ => return unsupported_sql_backend_err(backend), + }; + sqlx::query(&format!( "CREATE TABLE IF NOT EXISTS {CATALOG_TABLE_NAME} ( {CATALOG_FIELD_CATALOG_NAME} VARCHAR(255) NOT NULL, @@ -274,6 +299,7 @@ impl SqlCatalog { Ok(SqlCatalog { name: config.name.to_owned(), + backend, connection: pool, warehouse_location: config.warehouse_location, fileio, @@ -342,6 +368,51 @@ impl SqlCatalog { } } } + /// Method for getting table column names for MySQL, Postgres, or SQLite + async fn get_table_columns(&self, table: impl ToString) -> Result> { + let table = table.to_string(); + Ok(match self.backend { + SqlBackend::SQLite => sqlx::query(&format!("PRAGMA table_info('{table}')")) + .fetch_all(&self.connection) + .await + .map_err(from_sqlx_error)? + .iter() + .map(|x| x.get(1)) + .collect(), + SqlBackend::PostgreSQL => sqlx::query(&format!( + "SELECT column_name FROM information_schema.columns WHERE table_name='{table}'" + )) + .fetch_all(&self.connection) + .await + .map_err(from_sqlx_error)? + .iter() + .map(|x| x.get(0)) + .collect(), + SqlBackend::MySQL => sqlx::query(&format!( + "SELECT column_name FROM information_schema.columns WHERE table_name='{table}'" + )) + .fetch_all(&self.connection) + .await + .map_err(from_sqlx_error)? + .iter() + .map(|x| x.get(0)) + .collect(), + }) + } + async fn field_type_check_query(&self) -> Result { + let has_type_info = self + .get_table_columns(CATALOG_TABLE_NAME) + .await? + .contains(CATALOG_FIELD_RECORD_TYPE); + + Ok(if has_type_info { + format!( + "AND ({CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}' OR {CATALOG_FIELD_RECORD_TYPE} IS NULL)" + ) + } else { + String::new() + }) + } } #[async_trait] @@ -641,6 +712,7 @@ impl Catalog for SqlCatalog { async fn list_tables(&self, namespace: &NamespaceIdent) -> Result> { let exists = self.namespace_exists(namespace).await?; if exists { + let field_type_check = self.field_type_check_query().await?; let rows = self .fetch_rows( &format!( @@ -649,10 +721,7 @@ impl Catalog for SqlCatalog { FROM {CATALOG_TABLE_NAME} WHERE {CATALOG_FIELD_TABLE_NAMESPACE} = ? AND {CATALOG_FIELD_CATALOG_NAME} = ? - AND ( - {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}' - OR {CATALOG_FIELD_RECORD_TYPE} IS NULL - )", + {field_type_check}", ), vec![Some(&namespace.join(".")), Some(&self.name)], ) @@ -679,6 +748,7 @@ impl Catalog for SqlCatalog { async fn table_exists(&self, identifier: &TableIdent) -> Result { let namespace = identifier.namespace().join("."); + let field_type_check = self.field_type_check_query().await?; let table_name = identifier.name(); let table_counts = self .fetch_rows( @@ -688,10 +758,7 @@ impl Catalog for SqlCatalog { WHERE {CATALOG_FIELD_TABLE_NAMESPACE} = ? AND {CATALOG_FIELD_CATALOG_NAME} = ? AND {CATALOG_FIELD_TABLE_NAME} = ? - AND ( - {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}' - OR {CATALOG_FIELD_RECORD_TYPE} IS NULL - )" + {field_type_check}" ), vec![Some(&namespace), Some(&self.name), Some(table_name)], ) @@ -737,6 +804,7 @@ impl Catalog for SqlCatalog { return no_such_table_err(identifier); } + let field_type_check = self.field_type_check_query().await?; let rows = self .fetch_rows( &format!( @@ -745,10 +813,7 @@ impl Catalog for SqlCatalog { WHERE {CATALOG_FIELD_CATALOG_NAME} = ? AND {CATALOG_FIELD_TABLE_NAME} = ? AND {CATALOG_FIELD_TABLE_NAMESPACE} = ? - AND ( - {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}' - OR {CATALOG_FIELD_RECORD_TYPE} IS NULL - )" + {field_type_check}" ), vec![ Some(&self.name), @@ -863,6 +928,7 @@ impl Catalog for SqlCatalog { return table_already_exists_err(dest); } + let field_type_check = self.field_type_check_query().await?; self.execute( &format!( "UPDATE {CATALOG_TABLE_NAME} @@ -870,10 +936,8 @@ impl Catalog for SqlCatalog { WHERE {CATALOG_FIELD_CATALOG_NAME} = ? AND {CATALOG_FIELD_TABLE_NAME} = ? AND {CATALOG_FIELD_TABLE_NAMESPACE} = ? - AND ( - {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}' - OR {CATALOG_FIELD_RECORD_TYPE} IS NULL - )" + {field_type_check} + " ), vec![ Some(dest.name()), @@ -1004,7 +1068,16 @@ mod tests { HashMap::from([("exists".to_string(), "true".to_string())]) } - async fn new_sql_catalog(warehouse_location: String) -> impl Catalog { + /// Create a new SQLite catalog for testing. If name is not specified it defaults to "iceberg". + async fn new_sql_catalog( + warehouse_location: String, + name: Option, + ) -> impl Catalog { + let name = if let Some(name) = name { + name.to_string() + } else { + "iceberg".to_string() + }; let sql_lite_uri = format!("sqlite:{}", temp_path()); sqlx::Sqlite::create_database(&sql_lite_uri).await.unwrap(); @@ -1017,7 +1090,7 @@ mod tests { ), ]); SqlCatalogBuilder::default() - .load("iceberg", props) + .load(&name, props) .await .unwrap() } @@ -1112,10 +1185,10 @@ mod tests { #[tokio::test] async fn test_initialized() { let warehouse_loc = temp_path(); - new_sql_catalog(warehouse_loc.clone()).await; + new_sql_catalog(warehouse_loc.clone(), Some("iceberg")).await; // catalog instantiation should not fail even if tables exist - new_sql_catalog(warehouse_loc.clone()).await; - new_sql_catalog(warehouse_loc.clone()).await; + new_sql_catalog(warehouse_loc.clone(), Some("iceberg")).await; + new_sql_catalog(warehouse_loc.clone(), Some("iceberg")).await; } #[tokio::test] @@ -1318,15 +1391,31 @@ mod tests { #[tokio::test] async fn test_list_namespaces_returns_empty_vector() { let warehouse_loc = temp_path(); - let catalog = new_sql_catalog(warehouse_loc).await; + let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await; assert_eq!(catalog.list_namespaces(None).await.unwrap(), vec![]); } + #[tokio::test] + async fn test_list_namespaces_returns_empty_different_name() { + let warehouse_loc = temp_path(); + let catalog = new_sql_catalog(warehouse_loc.clone(), Some("iceberg")).await; + let namespace_ident_1 = NamespaceIdent::new("a".into()); + let namespace_ident_2 = NamespaceIdent::new("b".into()); + create_namespaces(&catalog, &vec![&namespace_ident_1, &namespace_ident_2]).await; + assert_eq!( + to_set(catalog.list_namespaces(None).await.unwrap()), + to_set(vec![namespace_ident_1, namespace_ident_2]) + ); + + let catalog2 = new_sql_catalog(warehouse_loc, Some("test")).await; + assert_eq!(catalog2.list_namespaces(None).await.unwrap(), vec![]); + } + #[tokio::test] async fn test_list_namespaces_returns_multiple_namespaces() { let warehouse_loc = temp_path(); - let catalog = new_sql_catalog(warehouse_loc).await; + let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await; let namespace_ident_1 = NamespaceIdent::new("a".into()); let namespace_ident_2 = NamespaceIdent::new("b".into()); create_namespaces(&catalog, &vec![&namespace_ident_1, &namespace_ident_2]).await; @@ -1340,7 +1429,7 @@ mod tests { #[tokio::test] async fn test_list_namespaces_returns_only_top_level_namespaces() { let warehouse_loc = temp_path(); - let catalog = new_sql_catalog(warehouse_loc).await; + let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await; let namespace_ident_1 = NamespaceIdent::new("a".into()); let namespace_ident_2 = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap(); let namespace_ident_3 = NamespaceIdent::new("b".into()); @@ -1360,7 +1449,7 @@ mod tests { #[tokio::test] async fn test_list_namespaces_returns_no_namespaces_under_parent() { let warehouse_loc = temp_path(); - let catalog = new_sql_catalog(warehouse_loc).await; + let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await; let namespace_ident_1 = NamespaceIdent::new("a".into()); let namespace_ident_2 = NamespaceIdent::new("b".into()); create_namespaces(&catalog, &vec![&namespace_ident_1, &namespace_ident_2]).await; @@ -1377,7 +1466,7 @@ mod tests { #[tokio::test] async fn test_list_namespaces_returns_namespace_under_parent() { let warehouse_loc = temp_path(); - let catalog = new_sql_catalog(warehouse_loc).await; + let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await; let namespace_ident_1 = NamespaceIdent::new("a".into()); let namespace_ident_2 = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap(); let namespace_ident_3 = NamespaceIdent::new("c".into()); @@ -1405,7 +1494,7 @@ mod tests { #[tokio::test] async fn test_list_namespaces_returns_multiple_namespaces_under_parent() { let warehouse_loc = temp_path(); - let catalog = new_sql_catalog(warehouse_loc).await; + let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await; let namespace_ident_1 = NamespaceIdent::new("a".to_string()); let namespace_ident_2 = NamespaceIdent::from_strs(vec!["a", "a"]).unwrap(); let namespace_ident_3 = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap(); @@ -1438,7 +1527,7 @@ mod tests { #[tokio::test] async fn test_namespace_exists_returns_false() { let warehouse_loc = temp_path(); - let catalog = new_sql_catalog(warehouse_loc).await; + let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await; let namespace_ident = NamespaceIdent::new("a".into()); create_namespace(&catalog, &namespace_ident).await; @@ -1453,7 +1542,7 @@ mod tests { #[tokio::test] async fn test_namespace_exists_returns_true() { let warehouse_loc = temp_path(); - let catalog = new_sql_catalog(warehouse_loc).await; + let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await; let namespace_ident = NamespaceIdent::new("a".into()); create_namespace(&catalog, &namespace_ident).await; @@ -1463,7 +1552,7 @@ mod tests { #[tokio::test] async fn test_create_namespace_with_properties() { let warehouse_loc = temp_path(); - let catalog = new_sql_catalog(warehouse_loc).await; + let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await; let namespace_ident = NamespaceIdent::new("abc".into()); let mut properties = default_properties(); @@ -1486,7 +1575,7 @@ mod tests { #[tokio::test] async fn test_create_namespace_throws_error_if_namespace_already_exists() { let warehouse_loc = temp_path(); - let catalog = new_sql_catalog(warehouse_loc).await; + let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await; let namespace_ident = NamespaceIdent::new("a".into()); create_namespace(&catalog, &namespace_ident).await; @@ -1511,7 +1600,7 @@ mod tests { #[tokio::test] async fn test_create_nested_namespace() { let warehouse_loc = temp_path(); - let catalog = new_sql_catalog(warehouse_loc).await; + let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await; let parent_namespace_ident = NamespaceIdent::new("a".into()); create_namespace(&catalog, &parent_namespace_ident).await; @@ -1534,7 +1623,7 @@ mod tests { #[tokio::test] async fn test_create_deeply_nested_namespace() { let warehouse_loc = temp_path(); - let catalog = new_sql_catalog(warehouse_loc).await; + let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await; let namespace_ident_a = NamespaceIdent::new("a".into()); let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap(); create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await; @@ -1558,7 +1647,7 @@ mod tests { #[tokio::test] async fn test_update_namespace_noop() { let warehouse_loc = temp_path(); - let catalog = new_sql_catalog(warehouse_loc).await; + let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await; let namespace_ident = NamespaceIdent::new("a".into()); create_namespace(&catalog, &namespace_ident).await; @@ -1580,7 +1669,7 @@ mod tests { #[tokio::test] async fn test_update_namespace() { let warehouse_loc = temp_path(); - let catalog = new_sql_catalog(warehouse_loc).await; + let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await; let namespace_ident = NamespaceIdent::new("a".into()); create_namespace(&catalog, &namespace_ident).await; @@ -1609,7 +1698,7 @@ mod tests { #[tokio::test] async fn test_update_nested_namespace() { let warehouse_loc = temp_path(); - let catalog = new_sql_catalog(warehouse_loc).await; + let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await; let namespace_ident = NamespaceIdent::from_strs(["a", "b"]).unwrap(); create_namespace(&catalog, &namespace_ident).await; @@ -1638,7 +1727,7 @@ mod tests { #[tokio::test] async fn test_update_namespace_errors_if_namespace_doesnt_exist() { let warehouse_loc = temp_path(); - let catalog = new_sql_catalog(warehouse_loc).await; + let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await; let namespace_ident = NamespaceIdent::new("a".into()); let props = HashMap::from_iter([ @@ -1660,7 +1749,7 @@ mod tests { #[tokio::test] async fn test_update_namespace_errors_if_nested_namespace_doesnt_exist() { let warehouse_loc = temp_path(); - let catalog = new_sql_catalog(warehouse_loc).await; + let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await; let namespace_ident = NamespaceIdent::from_strs(["a", "b"]).unwrap(); let props = HashMap::from_iter([ @@ -1682,7 +1771,7 @@ mod tests { #[tokio::test] async fn test_drop_namespace() { let warehouse_loc = temp_path(); - let catalog = new_sql_catalog(warehouse_loc).await; + let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await; let namespace_ident = NamespaceIdent::new("abc".into()); create_namespace(&catalog, &namespace_ident).await; @@ -1694,7 +1783,7 @@ mod tests { #[tokio::test] async fn test_drop_nested_namespace() { let warehouse_loc = temp_path(); - let catalog = new_sql_catalog(warehouse_loc).await; + let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await; let namespace_ident_a = NamespaceIdent::new("a".into()); let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap(); create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await; @@ -1714,7 +1803,7 @@ mod tests { #[tokio::test] async fn test_drop_deeply_nested_namespace() { let warehouse_loc = temp_path(); - let catalog = new_sql_catalog(warehouse_loc).await; + let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await; let namespace_ident_a = NamespaceIdent::new("a".into()); let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap(); let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap(); @@ -1750,7 +1839,7 @@ mod tests { #[tokio::test] async fn test_drop_namespace_throws_error_if_namespace_doesnt_exist() { let warehouse_loc = temp_path(); - let catalog = new_sql_catalog(warehouse_loc).await; + let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await; let non_existent_namespace_ident = NamespaceIdent::new("abc".into()); assert_eq!( @@ -1766,7 +1855,7 @@ mod tests { #[tokio::test] async fn test_drop_namespace_throws_error_if_nested_namespace_doesnt_exist() { let warehouse_loc = temp_path(); - let catalog = new_sql_catalog(warehouse_loc).await; + let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await; create_namespace(&catalog, &NamespaceIdent::new("a".into())).await; let non_existent_namespace_ident = @@ -1784,7 +1873,7 @@ mod tests { #[tokio::test] async fn test_dropping_a_namespace_does_not_drop_namespaces_nested_under_that_one() { let warehouse_loc = temp_path(); - let catalog = new_sql_catalog(warehouse_loc).await; + let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await; let namespace_ident_a = NamespaceIdent::new("a".into()); let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap(); create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await; @@ -1804,7 +1893,7 @@ mod tests { #[tokio::test] async fn test_list_tables_returns_empty_vector() { let warehouse_loc = temp_path(); - let catalog = new_sql_catalog(warehouse_loc).await; + let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await; let namespace_ident = NamespaceIdent::new("a".into()); create_namespace(&catalog, &namespace_ident).await; @@ -1814,7 +1903,7 @@ mod tests { #[tokio::test] async fn test_list_tables_throws_error_if_namespace_doesnt_exist() { let warehouse_loc = temp_path(); - let catalog = new_sql_catalog(warehouse_loc).await; + let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await; let non_existent_namespace_ident = NamespaceIdent::new("n1".into()); @@ -1831,7 +1920,7 @@ mod tests { #[tokio::test] async fn test_create_table_with_location() { let warehouse_loc = temp_path(); - let catalog = new_sql_catalog(warehouse_loc.clone()).await; + let catalog = new_sql_catalog(warehouse_loc.clone(), Some("iceberg")).await; let namespace_ident = NamespaceIdent::new("a".into()); create_namespace(&catalog, &namespace_ident).await; @@ -1870,7 +1959,7 @@ mod tests { #[tokio::test] async fn test_create_table_falls_back_to_namespace_location_if_table_location_is_missing() { let warehouse_loc = temp_path(); - let catalog = new_sql_catalog(warehouse_loc).await; + let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await; let namespace_ident = NamespaceIdent::new("a".into()); let mut namespace_properties = HashMap::new(); @@ -1912,7 +2001,7 @@ mod tests { async fn test_create_table_in_nested_namespace_falls_back_to_nested_namespace_location_if_table_location_is_missing() { let warehouse_loc = temp_path(); - let catalog = new_sql_catalog(warehouse_loc).await; + let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await; let namespace_ident = NamespaceIdent::new("a".into()); let mut namespace_properties = HashMap::new(); @@ -1968,7 +2057,7 @@ mod tests { async fn test_create_table_falls_back_to_warehouse_location_if_both_table_location_and_namespace_location_are_missing() { let warehouse_loc = temp_path(); - let catalog = new_sql_catalog(warehouse_loc.clone()).await; + let catalog = new_sql_catalog(warehouse_loc.clone(), Some("iceberg")).await; let namespace_ident = NamespaceIdent::new("a".into()); // note: no location specified in namespace_properties @@ -2006,7 +2095,7 @@ mod tests { async fn test_create_table_in_nested_namespace_falls_back_to_warehouse_location_if_both_table_location_and_namespace_location_are_missing() { let warehouse_loc = temp_path(); - let catalog = new_sql_catalog(warehouse_loc.clone()).await; + let catalog = new_sql_catalog(warehouse_loc.clone(), Some("iceberg")).await; let namespace_ident = NamespaceIdent::new("a".into()); create_namespace(&catalog, &namespace_ident).await; @@ -2042,7 +2131,7 @@ mod tests { #[tokio::test] async fn test_create_table_throws_error_if_table_with_same_name_already_exists() { let warehouse_loc = temp_path(); - let catalog = new_sql_catalog(warehouse_loc.clone()).await; + let catalog = new_sql_catalog(warehouse_loc.clone(), Some("iceberg")).await; let namespace_ident = NamespaceIdent::new("a".into()); create_namespace(&catalog, &namespace_ident).await; let table_name = "tbl1"; @@ -2072,7 +2161,7 @@ mod tests { #[tokio::test] async fn test_rename_table_in_same_namespace() { let warehouse_loc = temp_path(); - let catalog = new_sql_catalog(warehouse_loc).await; + let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await; let namespace_ident = NamespaceIdent::new("n1".into()); create_namespace(&catalog, &namespace_ident).await; let src_table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into()); @@ -2092,7 +2181,7 @@ mod tests { #[tokio::test] async fn test_rename_table_across_namespaces() { let warehouse_loc = temp_path(); - let catalog = new_sql_catalog(warehouse_loc).await; + let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await; let src_namespace_ident = NamespaceIdent::new("a".into()); let dst_namespace_ident = NamespaceIdent::new("b".into()); create_namespaces(&catalog, &vec![&src_namespace_ident, &dst_namespace_ident]).await; @@ -2119,7 +2208,7 @@ mod tests { #[tokio::test] async fn test_rename_table_src_table_is_same_as_dst_table() { let warehouse_loc = temp_path(); - let catalog = new_sql_catalog(warehouse_loc).await; + let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await; let namespace_ident = NamespaceIdent::new("n1".into()); create_namespace(&catalog, &namespace_ident).await; let table_ident = TableIdent::new(namespace_ident.clone(), "tbl".into()); @@ -2138,7 +2227,7 @@ mod tests { #[tokio::test] async fn test_rename_table_across_nested_namespaces() { let warehouse_loc = temp_path(); - let catalog = new_sql_catalog(warehouse_loc).await; + let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await; let namespace_ident_a = NamespaceIdent::new("a".into()); let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap(); let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap(); @@ -2166,7 +2255,7 @@ mod tests { #[tokio::test] async fn test_rename_table_throws_error_if_dst_namespace_doesnt_exist() { let warehouse_loc = temp_path(); - let catalog = new_sql_catalog(warehouse_loc).await; + let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await; let src_namespace_ident = NamespaceIdent::new("n1".into()); let src_table_ident = TableIdent::new(src_namespace_ident.clone(), "tbl1".into()); create_namespace(&catalog, &src_namespace_ident).await; @@ -2188,7 +2277,7 @@ mod tests { #[tokio::test] async fn test_rename_table_throws_error_if_src_table_doesnt_exist() { let warehouse_loc = temp_path(); - let catalog = new_sql_catalog(warehouse_loc).await; + let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await; let namespace_ident = NamespaceIdent::new("n1".into()); create_namespace(&catalog, &namespace_ident).await; let src_table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into()); @@ -2207,7 +2296,7 @@ mod tests { #[tokio::test] async fn test_rename_table_throws_error_if_dst_table_already_exists() { let warehouse_loc = temp_path(); - let catalog = new_sql_catalog(warehouse_loc).await; + let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await; let namespace_ident = NamespaceIdent::new("n1".into()); create_namespace(&catalog, &namespace_ident).await; let src_table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into()); @@ -2227,7 +2316,7 @@ mod tests { #[tokio::test] async fn test_drop_table_throws_error_if_table_not_exist() { let warehouse_loc = temp_path(); - let catalog = new_sql_catalog(warehouse_loc.clone()).await; + let catalog = new_sql_catalog(warehouse_loc.clone(), Some("iceberg")).await; let namespace_ident = NamespaceIdent::new("a".into()); let table_name = "tbl1"; let table_ident = TableIdent::new(namespace_ident.clone(), table_name.into()); @@ -2247,7 +2336,7 @@ mod tests { #[tokio::test] async fn test_drop_table() { let warehouse_loc = temp_path(); - let catalog = new_sql_catalog(warehouse_loc.clone()).await; + let catalog = new_sql_catalog(warehouse_loc.clone(), Some("iceberg")).await; let namespace_ident = NamespaceIdent::new("a".into()); let table_name = "tbl1"; let table_ident = TableIdent::new(namespace_ident.clone(), table_name.into()); @@ -2283,7 +2372,7 @@ mod tests { #[tokio::test] async fn test_register_table_throws_error_if_table_with_same_name_already_exists() { let warehouse_loc = temp_path(); - let catalog = new_sql_catalog(warehouse_loc.clone()).await; + let catalog = new_sql_catalog(warehouse_loc.clone(), Some("iceberg")).await; let namespace_ident = NamespaceIdent::new("a".into()); create_namespace(&catalog, &namespace_ident).await; let table_name = "tbl1"; @@ -2303,7 +2392,7 @@ mod tests { #[tokio::test] async fn test_register_table() { let warehouse_loc = temp_path(); - let catalog = new_sql_catalog(warehouse_loc.clone()).await; + let catalog = new_sql_catalog(warehouse_loc.clone(), Some("iceberg")).await; let namespace_ident = NamespaceIdent::new("a".into()); create_namespace(&catalog, &namespace_ident).await; @@ -2342,7 +2431,7 @@ mod tests { #[tokio::test] async fn test_update_table() { let warehouse_loc = temp_path(); - let catalog = new_sql_catalog(warehouse_loc).await; + let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await; // Create a test namespace and table let namespace_ident = NamespaceIdent::new("ns1".into()); diff --git a/crates/catalog/sql/src/error.rs b/crates/catalog/sql/src/error.rs index a08f755596..130e1c10cf 100644 --- a/crates/catalog/sql/src/error.rs +++ b/crates/catalog/sql/src/error.rs @@ -26,6 +26,13 @@ pub fn from_sqlx_error(error: sqlx::Error) -> Error { .with_source(error) } +pub fn unsupported_sql_backend_err(backend: impl ToString) -> Result { + Err(Error::new( + ErrorKind::FeatureUnsupported, + format!("Unsupported SQL backend: {}", backend.to_string()), + )) +} + pub fn no_such_namespace_err(namespace: &NamespaceIdent) -> Result { Err(Error::new( ErrorKind::Unexpected, From 41d59a399cc34a000fcba286d90d9cdf0ca339b1 Mon Sep 17 00:00:00 2001 From: Brodie Alexander Date: Thu, 29 Jan 2026 16:33:03 -0600 Subject: [PATCH 2/6] Removed unused import --- crates/catalog/sql/src/catalog.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/catalog/sql/src/catalog.rs b/crates/catalog/sql/src/catalog.rs index da84964090..fad33029ee 100644 --- a/crates/catalog/sql/src/catalog.rs +++ b/crates/catalog/sql/src/catalog.rs @@ -28,7 +28,7 @@ use iceberg::{ TableCommit, TableCreation, TableIdent, }; use sqlx::any::{AnyPoolOptions, AnyQueryResult, AnyRow, install_default_drivers}; -use sqlx::{Any, AnyPool, Column, Row, Transaction}; +use sqlx::{Any, AnyPool, Row, Transaction}; use crate::error::{ from_sqlx_error, no_such_namespace_err, no_such_table_err, table_already_exists_err, From 67358b45124c75afb9405fd298099f31de707199 Mon Sep 17 00:00:00 2001 From: Brodie Alexander Date: Fri, 30 Jan 2026 09:13:03 -0600 Subject: [PATCH 3/6] Refactored many cases where backend-specific SQL code is not required --- crates/catalog/sql/src/catalog.rs | 59 +++++++++++++++++++++---------- 1 file changed, 40 insertions(+), 19 deletions(-) diff --git a/crates/catalog/sql/src/catalog.rs b/crates/catalog/sql/src/catalog.rs index fad33029ee..5a43facb33 100644 --- a/crates/catalog/sql/src/catalog.rs +++ b/crates/catalog/sql/src/catalog.rs @@ -712,16 +712,13 @@ impl Catalog for SqlCatalog { async fn list_tables(&self, namespace: &NamespaceIdent) -> Result> { let exists = self.namespace_exists(namespace).await?; if exists { - let field_type_check = self.field_type_check_query().await?; let rows = self .fetch_rows( &format!( - "SELECT {CATALOG_FIELD_TABLE_NAME}, - {CATALOG_FIELD_TABLE_NAMESPACE} + "SELECT * FROM {CATALOG_TABLE_NAME} WHERE {CATALOG_FIELD_TABLE_NAMESPACE} = ? - AND {CATALOG_FIELD_CATALOG_NAME} = ? - {field_type_check}", + AND {CATALOG_FIELD_CATALOG_NAME} = ?", ), vec![Some(&namespace.join(".")), Some(&self.name)], ) @@ -730,6 +727,15 @@ impl Catalog for SqlCatalog { let mut tables = HashSet::::with_capacity(rows.len()); for row in rows.iter() { + // If `iceberg_type` column exists AND ≠ "TABLE", continue to next row. + if row + .try_get::<&str, _>(CATALOG_FIELD_RECORD_TYPE) + .unwrap_or(CATALOG_FIELD_TABLE_RECORD_TYPE) + != CATALOG_FIELD_TABLE_RECORD_TYPE + { + continue; + } + let tbl = row .try_get::(CATALOG_FIELD_TABLE_NAME) .map_err(from_sqlx_error)?; @@ -748,23 +754,31 @@ impl Catalog for SqlCatalog { async fn table_exists(&self, identifier: &TableIdent) -> Result { let namespace = identifier.namespace().join("."); - let field_type_check = self.field_type_check_query().await?; let table_name = identifier.name(); - let table_counts = self + let rows = self .fetch_rows( &format!( - "SELECT 1 + "SELECT * FROM {CATALOG_TABLE_NAME} WHERE {CATALOG_FIELD_TABLE_NAMESPACE} = ? AND {CATALOG_FIELD_CATALOG_NAME} = ? - AND {CATALOG_FIELD_TABLE_NAME} = ? - {field_type_check}" + AND {CATALOG_FIELD_TABLE_NAME} = ?" ), vec![Some(&namespace), Some(&self.name), Some(table_name)], ) .await?; - if !table_counts.is_empty() { + // Filter for rows where `iceberg_type` is "TABLE" or not present. + let tables: Vec<_> = rows + .iter() + .filter(|row| { + row.try_get::<&str, _>(CATALOG_FIELD_RECORD_TYPE) + .unwrap_or(CATALOG_FIELD_TABLE_RECORD_TYPE) + == CATALOG_FIELD_TABLE_RECORD_TYPE + }) + .collect(); + + if !tables.is_empty() { Ok(true) } else { Ok(false) @@ -776,16 +790,15 @@ impl Catalog for SqlCatalog { return no_such_table_err(identifier); } + let field_type_check = self.field_type_check_query().await?; + self.execute( &format!( "DELETE FROM {CATALOG_TABLE_NAME} WHERE {CATALOG_FIELD_CATALOG_NAME} = ? AND {CATALOG_FIELD_TABLE_NAME} = ? AND {CATALOG_FIELD_TABLE_NAMESPACE} = ? - AND ( - {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}' - OR {CATALOG_FIELD_RECORD_TYPE} IS NULL - )" + {field_type_check}" ), vec![ Some(&self.name), @@ -804,16 +817,14 @@ impl Catalog for SqlCatalog { return no_such_table_err(identifier); } - let field_type_check = self.field_type_check_query().await?; let rows = self .fetch_rows( &format!( - "SELECT {CATALOG_FIELD_METADATA_LOCATION_PROP} + "SELECT * FROM {CATALOG_TABLE_NAME} WHERE {CATALOG_FIELD_CATALOG_NAME} = ? AND {CATALOG_FIELD_TABLE_NAME} = ? - AND {CATALOG_FIELD_TABLE_NAMESPACE} = ? - {field_type_check}" + AND {CATALOG_FIELD_TABLE_NAMESPACE} = ?" ), vec![ Some(&self.name), @@ -823,6 +834,16 @@ impl Catalog for SqlCatalog { ) .await?; + // Filter for rows where `iceberg_type` is "TABLE" or not present. + let rows: Vec<_> = rows + .iter() + .filter(|row| { + row.try_get::<&str, _>(CATALOG_FIELD_RECORD_TYPE) + .unwrap_or(CATALOG_FIELD_TABLE_RECORD_TYPE) + == CATALOG_FIELD_TABLE_RECORD_TYPE + }) + .collect(); + if rows.is_empty() { return no_such_table_err(identifier); } From b822d28e98ff04c6426fdc9cb98b1d71b7ebbbd0 Mon Sep 17 00:00:00 2001 From: Brodie Alexander Date: Fri, 30 Jan 2026 13:27:04 -0600 Subject: [PATCH 4/6] added migration and schema version-specific handling for tables --- crates/catalog/sql/src/catalog.rs | 403 ++++++++++++++++++++---------- 1 file changed, 266 insertions(+), 137 deletions(-) diff --git a/crates/catalog/sql/src/catalog.rs b/crates/catalog/sql/src/catalog.rs index 5a43facb33..604d4e3e38 100644 --- a/crates/catalog/sql/src/catalog.rs +++ b/crates/catalog/sql/src/catalog.rs @@ -21,7 +21,7 @@ use std::time::Duration; use async_trait::async_trait; use iceberg::io::FileIO; -use iceberg::spec::{TableMetadata, TableMetadataBuilder}; +use iceberg::spec::{ TableMetadata, TableMetadataBuilder}; use iceberg::table::Table; use iceberg::{ Catalog, CatalogBuilder, Error, ErrorKind, MetadataLocation, Namespace, NamespaceIdent, Result, @@ -62,6 +62,183 @@ static MAX_CONNECTIONS: u32 = 10; // Default the SQL pool to 10 connections if n static IDLE_TIMEOUT: u64 = 10; // Default the maximum idle timeout per connection to 10s before it is closed static TEST_BEFORE_ACQUIRE: bool = true; // Default the health-check of each connection to enabled prior to returning +/// SQL Schema version for `iceberg_tables`. Refer to Java impl JDBC +#[derive(Debug, Clone, Copy)] +pub enum SqlSchemaVersion { + /// Corresponds to 'V0' in Java impl. Does not support `iceberg_type` column. + V0, + /// Corresponds to 'V1' in Java impl. Supports `iceberg_type` for distinguishing between VIEWs and TABLEs + V1, +} + +fn query_list_tables(schema_ver: SqlSchemaVersion) -> String { + match schema_ver { + SqlSchemaVersion::V0 => format!( + "SELECT {CATALOG_FIELD_TABLE_NAME}, + {CATALOG_FIELD_TABLE_NAMESPACE} + FROM {CATALOG_TABLE_NAME} + WHERE {CATALOG_FIELD_TABLE_NAMESPACE} = ? + AND {CATALOG_FIELD_CATALOG_NAME} = ? + ", + ), + SqlSchemaVersion::V1 => format!( + "SELECT {CATALOG_FIELD_TABLE_NAME}, + {CATALOG_FIELD_TABLE_NAMESPACE} + FROM {CATALOG_TABLE_NAME} + WHERE {CATALOG_FIELD_TABLE_NAMESPACE} = ? + AND {CATALOG_FIELD_CATALOG_NAME} = ? + AND ( + {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}' + OR {CATALOG_FIELD_RECORD_TYPE} IS NULL + )", + ), + } +} +fn query_table_exists(schema_ver: SqlSchemaVersion) -> String { + match schema_ver { + SqlSchemaVersion::V0 => format!( + "SELECT 1 + FROM {CATALOG_TABLE_NAME} + WHERE {CATALOG_FIELD_TABLE_NAMESPACE} = ? + AND {CATALOG_FIELD_CATALOG_NAME} = ? + AND {CATALOG_FIELD_TABLE_NAME} = ? + " + ), + SqlSchemaVersion::V1 => format!( + "SELECT 1 + FROM {CATALOG_TABLE_NAME} + WHERE {CATALOG_FIELD_TABLE_NAMESPACE} = ? + AND {CATALOG_FIELD_CATALOG_NAME} = ? + AND {CATALOG_FIELD_TABLE_NAME} = ? + AND ( + {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}' + OR {CATALOG_FIELD_RECORD_TYPE} IS NULL + )" + ), + } +} +fn query_drop_table(schema_ver: SqlSchemaVersion) -> String { + match schema_ver { + SqlSchemaVersion::V0 => format!( + "DELETE FROM {CATALOG_TABLE_NAME} + WHERE {CATALOG_FIELD_CATALOG_NAME} = ? + AND {CATALOG_FIELD_TABLE_NAME} = ? + AND {CATALOG_FIELD_TABLE_NAMESPACE} = ? + " + ), + SqlSchemaVersion::V1 => format!( + "DELETE FROM {CATALOG_TABLE_NAME} + WHERE {CATALOG_FIELD_CATALOG_NAME} = ? + AND {CATALOG_FIELD_TABLE_NAME} = ? + AND {CATALOG_FIELD_TABLE_NAMESPACE} = ? + AND ( + {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}' + OR {CATALOG_FIELD_RECORD_TYPE} IS NULL + )" + ), + } +} +fn query_load_table(schema_ver: SqlSchemaVersion) -> String { + match schema_ver { + SqlSchemaVersion::V0 => format!( + "SELECT {CATALOG_FIELD_METADATA_LOCATION_PROP} + FROM {CATALOG_TABLE_NAME} + WHERE {CATALOG_FIELD_CATALOG_NAME} = ? + AND {CATALOG_FIELD_TABLE_NAME} = ? + AND {CATALOG_FIELD_TABLE_NAMESPACE} = ? + " + ), + SqlSchemaVersion::V1 => format!( + "SELECT {CATALOG_FIELD_METADATA_LOCATION_PROP} + FROM {CATALOG_TABLE_NAME} + WHERE {CATALOG_FIELD_CATALOG_NAME} = ? + AND {CATALOG_FIELD_TABLE_NAME} = ? + AND {CATALOG_FIELD_TABLE_NAMESPACE} = ? + AND ( + {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}' + OR {CATALOG_FIELD_RECORD_TYPE} IS NULL + )" + ), + } +} +/// NOTE: these two paths have a different number of placeholders. +fn query_create_table(schema_ver: SqlSchemaVersion) -> String { + match schema_ver { + SqlSchemaVersion::V0 => format!( + "INSERT INTO {CATALOG_TABLE_NAME} + ({CATALOG_FIELD_CATALOG_NAME}, {CATALOG_FIELD_TABLE_NAMESPACE}, {CATALOG_FIELD_TABLE_NAME}, {CATALOG_FIELD_METADATA_LOCATION_PROP}) + VALUES (?, ?, ?, ?) + "), + SqlSchemaVersion::V1 => format!( + "INSERT INTO {CATALOG_TABLE_NAME} + ({CATALOG_FIELD_CATALOG_NAME}, {CATALOG_FIELD_TABLE_NAMESPACE}, {CATALOG_FIELD_TABLE_NAME}, {CATALOG_FIELD_METADATA_LOCATION_PROP}, {CATALOG_FIELD_RECORD_TYPE}) + VALUES (?, ?, ?, ?, ?) + "), + } +} +fn query_rename_table(schema_ver: SqlSchemaVersion) -> String { + match schema_ver { + SqlSchemaVersion::V0 => format!( + "UPDATE {CATALOG_TABLE_NAME} + SET {CATALOG_FIELD_TABLE_NAME} = ?, {CATALOG_FIELD_TABLE_NAMESPACE} = ? + WHERE {CATALOG_FIELD_CATALOG_NAME} = ? + AND {CATALOG_FIELD_TABLE_NAME} = ? + AND {CATALOG_FIELD_TABLE_NAMESPACE} = ? + " + ), + SqlSchemaVersion::V1 => format!( + "UPDATE {CATALOG_TABLE_NAME} + SET {CATALOG_FIELD_TABLE_NAME} = ?, {CATALOG_FIELD_TABLE_NAMESPACE} = ? + WHERE {CATALOG_FIELD_CATALOG_NAME} = ? + AND {CATALOG_FIELD_TABLE_NAME} = ? + AND {CATALOG_FIELD_TABLE_NAMESPACE} = ? + AND ( + {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}' + OR {CATALOG_FIELD_RECORD_TYPE} IS NULL + )" + ), + } +} +/// NOTE: these two paths have a different number of placeholders. +fn query_register_table(schema_ver: SqlSchemaVersion) -> String { + match schema_ver { + SqlSchemaVersion::V0 => format!( + "INSERT INTO {CATALOG_TABLE_NAME} + ({CATALOG_FIELD_CATALOG_NAME}, {CATALOG_FIELD_TABLE_NAMESPACE}, {CATALOG_FIELD_TABLE_NAME}, {CATALOG_FIELD_METADATA_LOCATION_PROP}) + VALUES (?, ?, ?, ?) + "), + SqlSchemaVersion::V1 => format!( + "INSERT INTO {CATALOG_TABLE_NAME} + ({CATALOG_FIELD_CATALOG_NAME}, {CATALOG_FIELD_TABLE_NAMESPACE}, {CATALOG_FIELD_TABLE_NAME}, {CATALOG_FIELD_METADATA_LOCATION_PROP}, {CATALOG_FIELD_RECORD_TYPE}) + VALUES (?, ?, ?, ?, ?) + "), + } +} +fn query_update_table(schema_ver: SqlSchemaVersion) -> String { + match schema_ver { + SqlSchemaVersion::V0 => format!( + "UPDATE {CATALOG_TABLE_NAME} + SET {CATALOG_FIELD_METADATA_LOCATION_PROP} = ?, {CATALOG_FIELD_PREVIOUS_METADATA_LOCATION_PROP} = ? + WHERE {CATALOG_FIELD_CATALOG_NAME} = ? + AND {CATALOG_FIELD_TABLE_NAME} = ? + AND {CATALOG_FIELD_TABLE_NAMESPACE} = ? + AND {CATALOG_FIELD_METADATA_LOCATION_PROP} = ?" + ), + SqlSchemaVersion::V1 => format!( + "UPDATE {CATALOG_TABLE_NAME} + SET {CATALOG_FIELD_METADATA_LOCATION_PROP} = ?, {CATALOG_FIELD_PREVIOUS_METADATA_LOCATION_PROP} = ? + WHERE {CATALOG_FIELD_CATALOG_NAME} = ? + AND {CATALOG_FIELD_TABLE_NAME} = ? + AND {CATALOG_FIELD_TABLE_NAMESPACE} = ? + AND ( + {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}' + OR {CATALOG_FIELD_RECORD_TYPE} IS NULL + ) + AND {CATALOG_FIELD_METADATA_LOCATION_PROP} = ?" + ), + } +} + /// Builder for [`SqlCatalog`] #[derive(Debug)] pub struct SqlCatalogBuilder(SqlCatalogConfig); @@ -71,6 +248,7 @@ impl Default for SqlCatalogBuilder { Self(SqlCatalogConfig { uri: "".to_string(), name: "".to_string(), + upgrade_schema: false, warehouse_location: "".to_string(), sql_bind_style: SqlBindStyle::DollarNumeric, props: HashMap::new(), @@ -106,6 +284,14 @@ impl SqlCatalogBuilder { self } + /// Configure whether database schema will be upgraded on write if outdated + /// + /// Currently only applies to `iceberg_tables` schema + pub fn upgrade_schema(mut self, upgrade_schema: bool) -> Self { + self.0.upgrade_schema = upgrade_schema; + self + } + /// Configure the any properties /// /// If the same key has values set in `props` during `SqlCatalogBuilder::load`, @@ -204,6 +390,7 @@ enum SqlBackend { struct SqlCatalogConfig { uri: String, name: String, + upgrade_schema: bool, warehouse_location: String, sql_bind_style: SqlBindStyle, props: HashMap, @@ -213,7 +400,8 @@ struct SqlCatalogConfig { /// Sql catalog implementation. pub struct SqlCatalog { name: String, - backend: SqlBackend, + upgrade_schema: bool, + schema_version: SqlSchemaVersion, connection: AnyPool, warehouse_location: String, fileio: FileIO, @@ -297,9 +485,45 @@ impl SqlCatalog { .await .map_err(from_sqlx_error)?; + + // Determine schema version for `iceberg_tables` + let column_names: HashSet = match backend { + SqlBackend::SQLite => sqlx::query(&format!("PRAGMA table_info('{CATALOG_TABLE_NAME}')")) + .fetch_all(&pool) + .await + .map_err(from_sqlx_error)? + .iter() + .map(|x| x.get(1)) + .collect(), + SqlBackend::PostgreSQL => sqlx::query(&format!( + "SELECT column_name FROM information_schema.columns WHERE table_name='{CATALOG_TABLE_NAME}'" + )) + .fetch_all(&pool) + .await + .map_err(from_sqlx_error)? + .iter() + .map(|x| x.get(0)) + .collect(), + SqlBackend::MySQL => sqlx::query(&format!( + "SELECT column_name FROM information_schema.columns WHERE table_name='{CATALOG_TABLE_NAME}'" + )) + .fetch_all(&pool) + .await + .map_err(from_sqlx_error)? + .iter() + .map(|x| x.get(0)) + .collect(), + }; + let schema_version = if column_names.contains(CATALOG_FIELD_RECORD_TYPE) { + SqlSchemaVersion::V1 + } else { + SqlSchemaVersion::V0 + }; + Ok(SqlCatalog { name: config.name.to_owned(), - backend, + upgrade_schema: config.upgrade_schema, + schema_version, connection: pool, warehouse_location: config.warehouse_location, fileio, @@ -368,50 +592,12 @@ impl SqlCatalog { } } } - /// Method for getting table column names for MySQL, Postgres, or SQLite - async fn get_table_columns(&self, table: impl ToString) -> Result> { - let table = table.to_string(); - Ok(match self.backend { - SqlBackend::SQLite => sqlx::query(&format!("PRAGMA table_info('{table}')")) - .fetch_all(&self.connection) - .await - .map_err(from_sqlx_error)? - .iter() - .map(|x| x.get(1)) - .collect(), - SqlBackend::PostgreSQL => sqlx::query(&format!( - "SELECT column_name FROM information_schema.columns WHERE table_name='{table}'" - )) - .fetch_all(&self.connection) - .await - .map_err(from_sqlx_error)? - .iter() - .map(|x| x.get(0)) - .collect(), - SqlBackend::MySQL => sqlx::query(&format!( - "SELECT column_name FROM information_schema.columns WHERE table_name='{table}'" - )) - .fetch_all(&self.connection) - .await - .map_err(from_sqlx_error)? - .iter() - .map(|x| x.get(0)) - .collect(), - }) - } - async fn field_type_check_query(&self) -> Result { - let has_type_info = self - .get_table_columns(CATALOG_TABLE_NAME) - .await? - .contains(CATALOG_FIELD_RECORD_TYPE); - - Ok(if has_type_info { - format!( - "AND ({CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}' OR {CATALOG_FIELD_RECORD_TYPE} IS NULL)" - ) - } else { - String::new() - }) + /// Upgrade `iceberg-tables` schema if needed. + async fn upgrade_schema(&self) -> Result<()> { + if self.upgrade_schema && matches!(self.schema_version, SqlSchemaVersion::V0) { + self.execute(&format!("ALTER TABLE {CATALOG_TABLE_NAME} ADD {CATALOG_FIELD_RECORD_TYPE}"), Vec::new(), None).await?; + } + Ok(()) } } @@ -714,12 +900,7 @@ impl Catalog for SqlCatalog { if exists { let rows = self .fetch_rows( - &format!( - "SELECT * - FROM {CATALOG_TABLE_NAME} - WHERE {CATALOG_FIELD_TABLE_NAMESPACE} = ? - AND {CATALOG_FIELD_CATALOG_NAME} = ?", - ), + &query_list_tables(self.schema_version), vec![Some(&namespace.join(".")), Some(&self.name)], ) .await?; @@ -727,14 +908,7 @@ impl Catalog for SqlCatalog { let mut tables = HashSet::::with_capacity(rows.len()); for row in rows.iter() { - // If `iceberg_type` column exists AND ≠ "TABLE", continue to next row. - if row - .try_get::<&str, _>(CATALOG_FIELD_RECORD_TYPE) - .unwrap_or(CATALOG_FIELD_TABLE_RECORD_TYPE) - != CATALOG_FIELD_TABLE_RECORD_TYPE - { - continue; - } + let tbl = row .try_get::(CATALOG_FIELD_TABLE_NAME) @@ -755,28 +929,14 @@ impl Catalog for SqlCatalog { async fn table_exists(&self, identifier: &TableIdent) -> Result { let namespace = identifier.namespace().join("."); let table_name = identifier.name(); - let rows = self + let tables = self .fetch_rows( - &format!( - "SELECT * - FROM {CATALOG_TABLE_NAME} - WHERE {CATALOG_FIELD_TABLE_NAMESPACE} = ? - AND {CATALOG_FIELD_CATALOG_NAME} = ? - AND {CATALOG_FIELD_TABLE_NAME} = ?" - ), + &query_table_exists(self.schema_version), vec![Some(&namespace), Some(&self.name), Some(table_name)], ) .await?; - // Filter for rows where `iceberg_type` is "TABLE" or not present. - let tables: Vec<_> = rows - .iter() - .filter(|row| { - row.try_get::<&str, _>(CATALOG_FIELD_RECORD_TYPE) - .unwrap_or(CATALOG_FIELD_TABLE_RECORD_TYPE) - == CATALOG_FIELD_TABLE_RECORD_TYPE - }) - .collect(); + if !tables.is_empty() { Ok(true) @@ -790,16 +950,14 @@ impl Catalog for SqlCatalog { return no_such_table_err(identifier); } - let field_type_check = self.field_type_check_query().await?; + if !self.table_exists(identifier).await? { + return no_such_table_err(identifier); + } + + self.upgrade_schema().await?; self.execute( - &format!( - "DELETE FROM {CATALOG_TABLE_NAME} - WHERE {CATALOG_FIELD_CATALOG_NAME} = ? - AND {CATALOG_FIELD_TABLE_NAME} = ? - AND {CATALOG_FIELD_TABLE_NAMESPACE} = ? - {field_type_check}" - ), + &query_drop_table(self.schema_version), vec![ Some(&self.name), Some(identifier.name()), @@ -819,13 +977,7 @@ impl Catalog for SqlCatalog { let rows = self .fetch_rows( - &format!( - "SELECT * - FROM {CATALOG_TABLE_NAME} - WHERE {CATALOG_FIELD_CATALOG_NAME} = ? - AND {CATALOG_FIELD_TABLE_NAME} = ? - AND {CATALOG_FIELD_TABLE_NAMESPACE} = ?" - ), + &query_load_table(self.schema_version), vec![ Some(&self.name), Some(identifier.name()), @@ -834,16 +986,6 @@ impl Catalog for SqlCatalog { ) .await?; - // Filter for rows where `iceberg_type` is "TABLE" or not present. - let rows: Vec<_> = rows - .iter() - .filter(|row| { - row.try_get::<&str, _>(CATALOG_FIELD_RECORD_TYPE) - .unwrap_or(CATALOG_FIELD_TABLE_RECORD_TYPE) - == CATALOG_FIELD_TABLE_RECORD_TYPE - }) - .collect(); - if rows.is_empty() { return no_such_table_err(identifier); } @@ -872,6 +1014,8 @@ impl Catalog for SqlCatalog { return no_such_namespace_err(namespace); } + self.upgrade_schema().await?; + let tbl_name = creation.name.clone(); let tbl_ident = TableIdent::new(namespace.clone(), tbl_name.clone()); @@ -918,12 +1062,12 @@ impl Catalog for SqlCatalog { .write_to(&self.fileio, &tbl_metadata_location) .await?; - self.execute(&format!( - "INSERT INTO {CATALOG_TABLE_NAME} - ({CATALOG_FIELD_CATALOG_NAME}, {CATALOG_FIELD_TABLE_NAMESPACE}, {CATALOG_FIELD_TABLE_NAME}, {CATALOG_FIELD_METADATA_LOCATION_PROP}, {CATALOG_FIELD_RECORD_TYPE}) - VALUES (?, ?, ?, ?, ?) - "), vec![Some(&self.name), Some(&namespace.join(".")), Some(&tbl_name.clone()), Some(&tbl_metadata_location), Some(CATALOG_FIELD_TABLE_RECORD_TYPE)], None).await?; - + // Handle different number if placeholders in different schema versions + match self.schema_version { + SqlSchemaVersion::V0 => self.execute(&query_create_table(self.schema_version), vec![Some(&self.name), Some(&namespace.join(".")), Some(&tbl_name.clone()), Some(&tbl_metadata_location)], None).await?, + SqlSchemaVersion::V1 => self.execute(&query_create_table(self.schema_version), vec![Some(&self.name), Some(&namespace.join(".")), Some(&tbl_name.clone()), Some(&tbl_metadata_location), Some(CATALOG_FIELD_TABLE_RECORD_TYPE)], None).await?, + }; + Ok(Table::builder() .file_io(self.fileio.clone()) .metadata_location(tbl_metadata_location) @@ -937,6 +1081,8 @@ impl Catalog for SqlCatalog { return Ok(()); } + self.upgrade_schema().await?; + if !self.table_exists(src).await? { return no_such_table_err(src); } @@ -949,17 +1095,8 @@ impl Catalog for SqlCatalog { return table_already_exists_err(dest); } - let field_type_check = self.field_type_check_query().await?; self.execute( - &format!( - "UPDATE {CATALOG_TABLE_NAME} - SET {CATALOG_FIELD_TABLE_NAME} = ?, {CATALOG_FIELD_TABLE_NAMESPACE} = ? - WHERE {CATALOG_FIELD_CATALOG_NAME} = ? - AND {CATALOG_FIELD_TABLE_NAME} = ? - AND {CATALOG_FIELD_TABLE_NAMESPACE} = ? - {field_type_check} - " - ), + &query_rename_table(self.schema_version), vec![ Some(dest.name()), Some(&dest.namespace().join(".")), @@ -983,17 +1120,18 @@ impl Catalog for SqlCatalog { return table_already_exists_err(table_ident); } + self.upgrade_schema().await?; + let metadata = TableMetadata::read_from(&self.fileio, &metadata_location).await?; let namespace = table_ident.namespace(); let tbl_name = table_ident.name().to_string(); - self.execute(&format!( - "INSERT INTO {CATALOG_TABLE_NAME} - ({CATALOG_FIELD_CATALOG_NAME}, {CATALOG_FIELD_TABLE_NAMESPACE}, {CATALOG_FIELD_TABLE_NAME}, {CATALOG_FIELD_METADATA_LOCATION_PROP}, {CATALOG_FIELD_RECORD_TYPE}) - VALUES (?, ?, ?, ?, ?) - "), vec![Some(&self.name), Some(&namespace.join(".")), Some(&tbl_name), Some(&metadata_location), Some(CATALOG_FIELD_TABLE_RECORD_TYPE)], None).await?; - + match self.schema_version { + SqlSchemaVersion::V0 => self.execute(&query_register_table(self.schema_version), vec![Some(&self.name), Some(&namespace.join(".")), Some(&tbl_name), Some(&metadata_location)], None).await?, + SqlSchemaVersion::V1 => self.execute(&query_register_table(self.schema_version), vec![Some(&self.name), Some(&namespace.join(".")), Some(&tbl_name), Some(&metadata_location), Some(CATALOG_FIELD_TABLE_RECORD_TYPE)], None).await?, + }; + Ok(Table::builder() .identifier(table_ident.clone()) .metadata_location(metadata_location) @@ -1004,6 +1142,8 @@ impl Catalog for SqlCatalog { /// Updates an existing table within the SQL catalog. async fn update_table(&self, commit: TableCommit) -> Result { + self.upgrade_schema().await?; + let table_ident = commit.identifier().clone(); let current_table = self.load_table(&table_ident).await?; let current_metadata_location = current_table.metadata_location_result()?.to_string(); @@ -1018,18 +1158,7 @@ impl Catalog for SqlCatalog { let update_result = self .execute( - &format!( - "UPDATE {CATALOG_TABLE_NAME} - SET {CATALOG_FIELD_METADATA_LOCATION_PROP} = ?, {CATALOG_FIELD_PREVIOUS_METADATA_LOCATION_PROP} = ? - WHERE {CATALOG_FIELD_CATALOG_NAME} = ? - AND {CATALOG_FIELD_TABLE_NAME} = ? - AND {CATALOG_FIELD_TABLE_NAMESPACE} = ? - AND ( - {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}' - OR {CATALOG_FIELD_RECORD_TYPE} IS NULL - ) - AND {CATALOG_FIELD_METADATA_LOCATION_PROP} = ?" - ), + &query_update_table(self.schema_version), vec![ Some(staged_metadata_location), Some(current_metadata_location.as_str()), From 8de2a87bc88468fb377783a9b2e7702eed3eb093 Mon Sep 17 00:00:00 2001 From: Brodie Alexander Date: Fri, 30 Jan 2026 13:33:17 -0600 Subject: [PATCH 5/6] remove trailing whitespace --- crates/catalog/sql/src/catalog.rs | 116 +++++++++++++++++++++--------- 1 file changed, 82 insertions(+), 34 deletions(-) diff --git a/crates/catalog/sql/src/catalog.rs b/crates/catalog/sql/src/catalog.rs index 604d4e3e38..4c2ab5a1ba 100644 --- a/crates/catalog/sql/src/catalog.rs +++ b/crates/catalog/sql/src/catalog.rs @@ -21,7 +21,7 @@ use std::time::Duration; use async_trait::async_trait; use iceberg::io::FileIO; -use iceberg::spec::{ TableMetadata, TableMetadataBuilder}; +use iceberg::spec::{TableMetadata, TableMetadataBuilder}; use iceberg::table::Table; use iceberg::{ Catalog, CatalogBuilder, Error, ErrorKind, MetadataLocation, Namespace, NamespaceIdent, Result, @@ -67,7 +67,7 @@ static TEST_BEFORE_ACQUIRE: bool = true; // Default the health-check of each con pub enum SqlSchemaVersion { /// Corresponds to 'V0' in Java impl. Does not support `iceberg_type` column. V0, - /// Corresponds to 'V1' in Java impl. Supports `iceberg_type` for distinguishing between VIEWs and TABLEs + /// Corresponds to 'V1' in Java impl. Supports `iceberg_type` for distinguishing between VIEW and TABLE V1, } @@ -285,7 +285,7 @@ impl SqlCatalogBuilder { } /// Configure whether database schema will be upgraded on write if outdated - /// + /// /// Currently only applies to `iceberg_tables` schema pub fn upgrade_schema(mut self, upgrade_schema: bool) -> Self { self.0.upgrade_schema = upgrade_schema; @@ -485,9 +485,8 @@ impl SqlCatalog { .await .map_err(from_sqlx_error)?; - // Determine schema version for `iceberg_tables` - let column_names: HashSet = match backend { + let column_names: HashSet = match backend { SqlBackend::SQLite => sqlx::query(&format!("PRAGMA table_info('{CATALOG_TABLE_NAME}')")) .fetch_all(&pool) .await @@ -595,8 +594,13 @@ impl SqlCatalog { /// Upgrade `iceberg-tables` schema if needed. async fn upgrade_schema(&self) -> Result<()> { if self.upgrade_schema && matches!(self.schema_version, SqlSchemaVersion::V0) { - self.execute(&format!("ALTER TABLE {CATALOG_TABLE_NAME} ADD {CATALOG_FIELD_RECORD_TYPE}"), Vec::new(), None).await?; - } + self.execute( + &format!("ALTER TABLE {CATALOG_TABLE_NAME} ADD {CATALOG_FIELD_RECORD_TYPE}"), + Vec::new(), + None, + ) + .await?; + } Ok(()) } } @@ -899,17 +903,15 @@ impl Catalog for SqlCatalog { let exists = self.namespace_exists(namespace).await?; if exists { let rows = self - .fetch_rows( - &query_list_tables(self.schema_version), - vec![Some(&namespace.join(".")), Some(&self.name)], - ) + .fetch_rows(&query_list_tables(self.schema_version), vec![ + Some(&namespace.join(".")), + Some(&self.name), + ]) .await?; let mut tables = HashSet::::with_capacity(rows.len()); for row in rows.iter() { - - let tbl = row .try_get::(CATALOG_FIELD_TABLE_NAME) .map_err(from_sqlx_error)?; @@ -930,14 +932,13 @@ impl Catalog for SqlCatalog { let namespace = identifier.namespace().join("."); let table_name = identifier.name(); let tables = self - .fetch_rows( - &query_table_exists(self.schema_version), - vec![Some(&namespace), Some(&self.name), Some(table_name)], - ) + .fetch_rows(&query_table_exists(self.schema_version), vec![ + Some(&namespace), + Some(&self.name), + Some(table_name), + ]) .await?; - - if !tables.is_empty() { Ok(true) } else { @@ -953,7 +954,7 @@ impl Catalog for SqlCatalog { if !self.table_exists(identifier).await? { return no_such_table_err(identifier); } - + self.upgrade_schema().await?; self.execute( @@ -976,14 +977,11 @@ impl Catalog for SqlCatalog { } let rows = self - .fetch_rows( - &query_load_table(self.schema_version), - vec![ - Some(&self.name), - Some(identifier.name()), - Some(&identifier.namespace().join(".")), - ], - ) + .fetch_rows(&query_load_table(self.schema_version), vec![ + Some(&self.name), + Some(identifier.name()), + Some(&identifier.namespace().join(".")), + ]) .await?; if rows.is_empty() { @@ -1064,10 +1062,35 @@ impl Catalog for SqlCatalog { // Handle different number if placeholders in different schema versions match self.schema_version { - SqlSchemaVersion::V0 => self.execute(&query_create_table(self.schema_version), vec![Some(&self.name), Some(&namespace.join(".")), Some(&tbl_name.clone()), Some(&tbl_metadata_location)], None).await?, - SqlSchemaVersion::V1 => self.execute(&query_create_table(self.schema_version), vec![Some(&self.name), Some(&namespace.join(".")), Some(&tbl_name.clone()), Some(&tbl_metadata_location), Some(CATALOG_FIELD_TABLE_RECORD_TYPE)], None).await?, + SqlSchemaVersion::V0 => { + self.execute( + &query_create_table(self.schema_version), + vec![ + Some(&self.name), + Some(&namespace.join(".")), + Some(&tbl_name.clone()), + Some(&tbl_metadata_location), + ], + None, + ) + .await? + } + SqlSchemaVersion::V1 => { + self.execute( + &query_create_table(self.schema_version), + vec![ + Some(&self.name), + Some(&namespace.join(".")), + Some(&tbl_name.clone()), + Some(&tbl_metadata_location), + Some(CATALOG_FIELD_TABLE_RECORD_TYPE), + ], + None, + ) + .await? + } }; - + Ok(Table::builder() .file_io(self.fileio.clone()) .metadata_location(tbl_metadata_location) @@ -1128,10 +1151,35 @@ impl Catalog for SqlCatalog { let tbl_name = table_ident.name().to_string(); match self.schema_version { - SqlSchemaVersion::V0 => self.execute(&query_register_table(self.schema_version), vec![Some(&self.name), Some(&namespace.join(".")), Some(&tbl_name), Some(&metadata_location)], None).await?, - SqlSchemaVersion::V1 => self.execute(&query_register_table(self.schema_version), vec![Some(&self.name), Some(&namespace.join(".")), Some(&tbl_name), Some(&metadata_location), Some(CATALOG_FIELD_TABLE_RECORD_TYPE)], None).await?, + SqlSchemaVersion::V0 => { + self.execute( + &query_register_table(self.schema_version), + vec![ + Some(&self.name), + Some(&namespace.join(".")), + Some(&tbl_name), + Some(&metadata_location), + ], + None, + ) + .await? + } + SqlSchemaVersion::V1 => { + self.execute( + &query_register_table(self.schema_version), + vec![ + Some(&self.name), + Some(&namespace.join(".")), + Some(&tbl_name), + Some(&metadata_location), + Some(CATALOG_FIELD_TABLE_RECORD_TYPE), + ], + None, + ) + .await? + } }; - + Ok(Table::builder() .identifier(table_ident.clone()) .metadata_location(metadata_location) From ce88708cf0d1bdf3579706c4cee797bd30c19de7 Mon Sep 17 00:00:00 2001 From: Brodie Alexander Date: Fri, 30 Jan 2026 15:04:50 -0600 Subject: [PATCH 6/6] add concrete type for iceberg_type --- crates/catalog/sql/src/catalog.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/crates/catalog/sql/src/catalog.rs b/crates/catalog/sql/src/catalog.rs index 4c2ab5a1ba..743ee264c5 100644 --- a/crates/catalog/sql/src/catalog.rs +++ b/crates/catalog/sql/src/catalog.rs @@ -595,7 +595,9 @@ impl SqlCatalog { async fn upgrade_schema(&self) -> Result<()> { if self.upgrade_schema && matches!(self.schema_version, SqlSchemaVersion::V0) { self.execute( - &format!("ALTER TABLE {CATALOG_TABLE_NAME} ADD {CATALOG_FIELD_RECORD_TYPE}"), + &format!( + "ALTER TABLE {CATALOG_TABLE_NAME} ADD {CATALOG_FIELD_RECORD_TYPE} varchar(5)" + ), Vec::new(), None, )