1use std::collections::HashMap;
19use std::fmt::{Debug, Formatter};
20use std::net::ToSocketAddrs;
21use std::sync::Arc;
22
23use anyhow::anyhow;
24use async_trait::async_trait;
25use hive_metastore::{
26 ThriftHiveMetastoreClient, ThriftHiveMetastoreClientBuilder,
27 ThriftHiveMetastoreGetDatabaseException, ThriftHiveMetastoreGetTableException,
28};
29use iceberg::io::{FileIO, FileIOBuilder, StorageFactory};
30use iceberg::spec::{TableMetadata, TableMetadataBuilder};
31use iceberg::table::Table;
32use iceberg::{
33 Catalog, CatalogBuilder, Error, ErrorKind, MetadataLocation, Namespace, NamespaceIdent, Result,
34 Runtime, TableCommit, TableCreation, TableIdent,
35};
36use volo_thrift::MaybeException;
37
38use super::utils::*;
39use crate::error::{from_io_error, from_thrift_error, from_thrift_exception};
40
/// Catalog property holding the Hive Metastore address. The value is resolved
/// with `ToSocketAddrs`, so it is expected in `host:port` form.
pub const HMS_CATALOG_PROP_URI: &str = "uri";

/// Catalog property selecting the thrift transport. Its value is matched
/// case-insensitively against [`THRIFT_TRANSPORT_FRAMED`] and
/// [`THRIFT_TRANSPORT_BUFFERED`]; any other value selects the default transport.
pub const HMS_CATALOG_PROP_THRIFT_TRANSPORT: &str = "thrift_transport";
/// Value of [`HMS_CATALOG_PROP_THRIFT_TRANSPORT`] selecting the framed transport.
pub const THRIFT_TRANSPORT_FRAMED: &str = "framed";
/// Value of [`HMS_CATALOG_PROP_THRIFT_TRANSPORT`] selecting the buffered
/// transport (the default).
pub const THRIFT_TRANSPORT_BUFFERED: &str = "buffered";

/// Catalog property holding the warehouse location, used when computing the
/// default location of tables created without an explicit one. Required.
pub const HMS_CATALOG_PROP_WAREHOUSE: &str = "warehouse";
53
/// Builder for [`HmsCatalog`].
///
/// Configuration comes from the properties passed to `load`, plus an
/// optional storage factory and runtime set via the `with_*` methods.
#[derive(Debug)]
pub struct HmsCatalogBuilder {
    /// Accumulated catalog configuration; filled in by `load`.
    config: HmsCatalogConfig,
    /// Factory for the catalog's `FileIO`; catalog construction fails if this
    /// is still `None`.
    storage_factory: Option<Arc<dyn StorageFactory>>,
    /// Runtime handed to tables; `load` falls back to `Runtime::try_current()`
    /// when this is `None`.
    runtime: Option<Runtime>,
}
61
62impl Default for HmsCatalogBuilder {
63 fn default() -> Self {
64 Self {
65 config: HmsCatalogConfig {
66 name: None,
67 address: "".to_string(),
68 thrift_transport: HmsThriftTransport::default(),
69 warehouse: "".to_string(),
70 props: HashMap::new(),
71 },
72 storage_factory: None,
73 runtime: None,
74 }
75 }
76}
77
78impl CatalogBuilder for HmsCatalogBuilder {
79 type C = HmsCatalog;
80
81 fn with_storage_factory(mut self, storage_factory: Arc<dyn StorageFactory>) -> Self {
82 self.storage_factory = Some(storage_factory);
83 self
84 }
85
86 fn with_runtime(mut self, runtime: Runtime) -> Self {
87 self.runtime = Some(runtime);
88 self
89 }
90
91 fn load(
92 mut self,
93 name: impl Into<String>,
94 props: HashMap<String, String>,
95 ) -> impl Future<Output = Result<Self::C>> + Send {
96 self.config.name = Some(name.into());
97
98 if props.contains_key(HMS_CATALOG_PROP_URI) {
99 self.config.address = props.get(HMS_CATALOG_PROP_URI).cloned().unwrap_or_default();
100 }
101
102 if let Some(tt) = props.get(HMS_CATALOG_PROP_THRIFT_TRANSPORT) {
103 self.config.thrift_transport = match tt.to_lowercase().as_str() {
104 THRIFT_TRANSPORT_FRAMED => HmsThriftTransport::Framed,
105 THRIFT_TRANSPORT_BUFFERED => HmsThriftTransport::Buffered,
106 _ => HmsThriftTransport::default(),
107 };
108 }
109
110 if props.contains_key(HMS_CATALOG_PROP_WAREHOUSE) {
111 self.config.warehouse = props
112 .get(HMS_CATALOG_PROP_WAREHOUSE)
113 .cloned()
114 .unwrap_or_default();
115 }
116
117 self.config.props = props
118 .into_iter()
119 .filter(|(k, _)| {
120 k != HMS_CATALOG_PROP_URI
121 && k != HMS_CATALOG_PROP_THRIFT_TRANSPORT
122 && k != HMS_CATALOG_PROP_WAREHOUSE
123 })
124 .collect();
125
126 let result = (|| -> Result<HmsCatalog> {
127 if self.config.name.is_none() {
128 return Err(Error::new(
129 ErrorKind::DataInvalid,
130 "Catalog name is required",
131 ));
132 }
133 if self.config.address.is_empty() {
134 return Err(Error::new(
135 ErrorKind::DataInvalid,
136 "Catalog address is required",
137 ));
138 }
139 if self.config.warehouse.is_empty() {
140 return Err(Error::new(
141 ErrorKind::DataInvalid,
142 "Catalog warehouse is required",
143 ));
144 }
145 let runtime = match self.runtime {
146 Some(rt) => rt,
147 None => Runtime::try_current()?,
148 };
149 HmsCatalog::new(self.config, self.storage_factory, runtime)
150 })();
151
152 std::future::ready(result)
153 }
154}
155
/// Thrift transport used to talk to the Hive Metastore.
///
/// Selected by the `thrift_transport` catalog property. Unrecognized values
/// fall back to the default, [`HmsThriftTransport::Buffered`].
///
/// Fieldless and cheap to copy, so it derives the usual value-type traits,
/// letting callers compare and copy transports freely.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Hash)]
pub enum HmsThriftTransport {
    /// Framed thrift transport.
    Framed,
    /// Buffered thrift transport; the default.
    #[default]
    Buffered,
}
166
/// Configuration collected by [`HmsCatalogBuilder`] for an [`HmsCatalog`].
#[derive(Debug)]
pub(crate) struct HmsCatalogConfig {
    /// Catalog name; `load` always sets it.
    name: Option<String>,
    /// Metastore address from the `uri` property, resolved via `ToSocketAddrs`.
    address: String,
    /// Thrift transport used for the metastore connection.
    thrift_transport: HmsThriftTransport,
    /// Warehouse location from the `warehouse` property; used to compute
    /// default table locations.
    warehouse: String,
    /// Remaining properties, forwarded to the `FileIO` builder.
    props: HashMap<String, String>,
}
176
/// Newtype around the generated Hive Metastore thrift client.
struct HmsClient(ThriftHiveMetastoreClient);
178
/// Iceberg catalog backed by a Hive Metastore (HMS) thrift service.
pub struct HmsCatalog {
    /// Configuration the catalog was built from.
    config: HmsCatalogConfig,
    /// Thrift client for the metastore.
    client: HmsClient,
    /// File IO used to read and write table metadata; cloned into each table.
    file_io: FileIO,
    /// Runtime handed to every table this catalog builds.
    runtime: Runtime,
}
186
187impl Debug for HmsCatalog {
188 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
189 f.debug_struct("HmsCatalog")
190 .field("config", &self.config)
191 .finish_non_exhaustive()
192 }
193}
194
195impl HmsCatalog {
196 fn new(
198 config: HmsCatalogConfig,
199 storage_factory: Option<Arc<dyn StorageFactory>>,
200 runtime: Runtime,
201 ) -> Result<Self> {
202 let address = config
203 .address
204 .as_str()
205 .to_socket_addrs()
206 .map_err(from_io_error)?
207 .next()
208 .ok_or_else(|| {
209 Error::new(
210 ErrorKind::Unexpected,
211 format!("invalid address: {}", config.address),
212 )
213 })?;
214
215 let builder = ThriftHiveMetastoreClientBuilder::new("hms").address(address);
216
217 let client = match &config.thrift_transport {
218 HmsThriftTransport::Framed => builder
219 .make_codec(volo_thrift::codec::default::DefaultMakeCodec::framed())
220 .build(),
221 HmsThriftTransport::Buffered => builder
222 .make_codec(volo_thrift::codec::default::DefaultMakeCodec::buffered())
223 .build(),
224 };
225
226 let factory = storage_factory.ok_or_else(|| {
227 Error::new(
228 ErrorKind::Unexpected,
229 "StorageFactory must be provided for HmsCatalog. Use `with_storage_factory` to configure it.",
230 )
231 })?;
232 let file_io = FileIOBuilder::new(factory)
233 .with_props(&config.props)
234 .build();
235
236 Ok(Self {
237 config,
238 client: HmsClient(client),
239 file_io,
240 runtime,
241 })
242 }
243 pub fn file_io(&self) -> FileIO {
245 self.file_io.clone()
246 }
247}
248
249#[async_trait]
250impl Catalog for HmsCatalog {
251 async fn list_namespaces(
257 &self,
258 parent: Option<&NamespaceIdent>,
259 ) -> Result<Vec<NamespaceIdent>> {
260 let dbs = if parent.is_some() {
261 return Ok(vec![]);
262 } else {
263 self.client
264 .0
265 .get_all_databases()
266 .await
267 .map(from_thrift_exception)
268 .map_err(from_thrift_error)??
269 };
270
271 Ok(dbs
272 .into_iter()
273 .map(|v| NamespaceIdent::new(v.into()))
274 .collect())
275 }
276
277 async fn create_namespace(
293 &self,
294 namespace: &NamespaceIdent,
295 properties: HashMap<String, String>,
296 ) -> Result<Namespace> {
297 if self.namespace_exists(namespace).await? {
298 return Err(Error::new(
299 ErrorKind::NamespaceAlreadyExists,
300 format!("Namespace {namespace:?} already exists"),
301 ));
302 }
303 let database = convert_to_database(namespace, &properties)?;
304
305 self.client
306 .0
307 .create_database(database)
308 .await
309 .map_err(from_thrift_error)?;
310
311 Ok(Namespace::with_properties(namespace.clone(), properties))
312 }
313
314 async fn get_namespace(&self, namespace: &NamespaceIdent) -> Result<Namespace> {
325 let name = validate_namespace(namespace)?;
326
327 let resp = self
328 .client
329 .0
330 .get_database(name.into())
331 .await
332 .map_err(from_thrift_error)?;
333
334 let db = match resp {
335 MaybeException::Ok(db) => db,
336 MaybeException::Exception(ThriftHiveMetastoreGetDatabaseException::O1(_)) => {
337 return Err(Error::new(
338 ErrorKind::NamespaceNotFound,
339 format!("Namespace {namespace:?} not found"),
340 ));
341 }
342 MaybeException::Exception(exception) => {
343 return Err(Error::new(
344 ErrorKind::Unexpected,
345 "Operation failed for hitting thrift error".to_string(),
346 )
347 .with_source(anyhow!("thrift error: {exception:?}")));
348 }
349 };
350
351 let ns = convert_to_namespace(&db)?;
352
353 Ok(ns)
354 }
355
356 async fn namespace_exists(&self, namespace: &NamespaceIdent) -> Result<bool> {
369 let name = validate_namespace(namespace)?;
370
371 let resp = self.client.0.get_database(name.into()).await;
372
373 match resp {
374 Ok(MaybeException::Ok(_)) => Ok(true),
375 Ok(MaybeException::Exception(ThriftHiveMetastoreGetDatabaseException::O1(_))) => {
376 Ok(false)
377 }
378 Ok(MaybeException::Exception(exception)) => Err(Error::new(
379 ErrorKind::Unexpected,
380 "Operation failed for hitting thrift error".to_string(),
381 )
382 .with_source(anyhow!("thrift error: {exception:?}"))),
383 Err(err) => Err(from_thrift_error(err)),
384 }
385 }
386
387 async fn update_namespace(
398 &self,
399 namespace: &NamespaceIdent,
400 properties: HashMap<String, String>,
401 ) -> Result<()> {
402 if !self.namespace_exists(namespace).await? {
403 return Err(Error::new(
404 ErrorKind::NamespaceNotFound,
405 format!("Namespace {namespace:?} does not exist"),
406 ));
407 }
408 let db = convert_to_database(namespace, &properties)?;
409
410 let name = match &db.name {
411 Some(name) => name,
412 None => {
413 return Err(Error::new(
414 ErrorKind::DataInvalid,
415 "Database name must be specified",
416 ));
417 }
418 };
419
420 self.client
421 .0
422 .alter_database(name.clone(), db)
423 .await
424 .map_err(from_thrift_error)?;
425
426 Ok(())
427 }
428
429 async fn drop_namespace(&self, namespace: &NamespaceIdent) -> Result<()> {
437 let name = validate_namespace(namespace)?;
438
439 if !self.namespace_exists(namespace).await? {
440 return Err(Error::new(
441 ErrorKind::NamespaceNotFound,
442 format!("Namespace {namespace:?} does not exist"),
443 ));
444 }
445
446 self.client
447 .0
448 .drop_database(name.into(), false, false)
449 .await
450 .map_err(from_thrift_error)?;
451
452 Ok(())
453 }
454
455 async fn list_tables(&self, namespace: &NamespaceIdent) -> Result<Vec<TableIdent>> {
465 let name = validate_namespace(namespace)?;
466 if !self.namespace_exists(namespace).await? {
467 return Err(Error::new(
468 ErrorKind::NamespaceNotFound,
469 format!("Namespace {namespace:?} does not exist"),
470 ));
471 }
472
473 let tables = self
474 .client
475 .0
476 .get_all_tables(name.into())
477 .await
478 .map(from_thrift_exception)
479 .map_err(from_thrift_error)??;
480
481 let tables = tables
482 .iter()
483 .map(|table| TableIdent::new(namespace.clone(), table.to_string()))
484 .collect();
485
486 Ok(tables)
487 }
488
489 async fn create_table(
502 &self,
503 namespace: &NamespaceIdent,
504 mut creation: TableCreation,
505 ) -> Result<Table> {
506 let db_name = validate_namespace(namespace)?;
507 let table_name = creation.name.clone();
508
509 let location = match &creation.location {
510 Some(location) => location.clone(),
511 None => {
512 let ns = self.get_namespace(namespace).await?;
513 let location = get_default_table_location(&ns, &table_name, &self.config.warehouse);
514 creation.location = Some(location.clone());
515 location
516 }
517 };
518 let metadata = TableMetadataBuilder::from_table_creation(creation)?
519 .build()?
520 .metadata;
521
522 let metadata_location = MetadataLocation::new_with_metadata(location.clone(), &metadata);
523
524 metadata.write_to(&self.file_io, &metadata_location).await?;
525
526 let metadata_location_str = metadata_location.to_string();
527 let hive_table = convert_to_hive_table(
528 db_name.clone(),
529 metadata.current_schema(),
530 table_name.clone(),
531 location,
532 metadata_location_str.clone(),
533 metadata.properties(),
534 )?;
535
536 self.client
537 .0
538 .create_table(hive_table)
539 .await
540 .map_err(from_thrift_error)?;
541
542 Table::builder()
543 .file_io(self.file_io())
544 .metadata_location(metadata_location_str)
545 .metadata(metadata)
546 .identifier(TableIdent::new(NamespaceIdent::new(db_name), table_name))
547 .runtime(self.runtime.clone())
548 .build()
549 }
550
551 async fn load_table(&self, table: &TableIdent) -> Result<Table> {
564 let db_name = validate_namespace(table.namespace())?;
565
566 let hive_table = self
567 .client
568 .0
569 .get_table(db_name.clone().into(), table.name.clone().into())
570 .await
571 .map(from_thrift_exception)
572 .map_err(from_thrift_error)??;
573
574 let metadata_location = get_metadata_location(&hive_table.parameters)?;
575
576 let metadata = TableMetadata::read_from(&self.file_io, &metadata_location).await?;
577
578 Table::builder()
579 .file_io(self.file_io())
580 .metadata_location(metadata_location)
581 .metadata(metadata)
582 .identifier(TableIdent::new(
583 NamespaceIdent::new(db_name),
584 table.name.clone(),
585 ))
586 .runtime(self.runtime.clone())
587 .build()
588 }
589
590 async fn drop_table(&self, table: &TableIdent) -> Result<()> {
601 let db_name = validate_namespace(table.namespace())?;
602 if !self.namespace_exists(table.namespace()).await? {
603 return Err(Error::new(
604 ErrorKind::NamespaceNotFound,
605 format!("Namespace {:?} does not exist", table.namespace()),
606 ));
607 }
608 if !self.table_exists(table).await? {
609 return Err(Error::new(
610 ErrorKind::TableNotFound,
611 format!("Table {table:?} does not exist"),
612 ));
613 }
614
615 self.client
616 .0
617 .drop_table(db_name.into(), table.name.clone().into(), false)
618 .await
619 .map_err(from_thrift_error)?;
620
621 Ok(())
622 }
623
624 async fn purge_table(&self, table: &TableIdent) -> Result<()> {
625 let table_info = self.load_table(table).await?;
626 self.drop_table(table).await?;
627 iceberg::drop_table_data(
628 table_info.file_io(),
629 table_info.metadata(),
630 table_info.metadata_location(),
631 )
632 .await
633 }
634
635 async fn table_exists(&self, table: &TableIdent) -> Result<bool> {
643 let db_name = validate_namespace(table.namespace())?;
644 let table_name = table.name.clone();
645
646 let resp = self
647 .client
648 .0
649 .get_table(db_name.into(), table_name.into())
650 .await;
651
652 match resp {
653 Ok(MaybeException::Ok(_)) => Ok(true),
654 Ok(MaybeException::Exception(ThriftHiveMetastoreGetTableException::O2(_))) => Ok(false),
655 Ok(MaybeException::Exception(exception)) => Err(Error::new(
656 ErrorKind::Unexpected,
657 "Operation failed for hitting thrift error".to_string(),
658 )
659 .with_source(anyhow!("thrift error: {exception:?}"))),
660 Err(err) => Err(from_thrift_error(err)),
661 }
662 }
663
664 async fn rename_table(&self, src: &TableIdent, dest: &TableIdent) -> Result<()> {
671 let src_dbname = validate_namespace(src.namespace())?;
672 let dest_dbname = validate_namespace(dest.namespace())?;
673 if self.table_exists(dest).await? {
674 return Err(Error::new(
675 ErrorKind::TableAlreadyExists,
676 format!("Destination table {dest:?} already exists"),
677 ));
678 }
679
680 let src_tbl_name = src.name.clone();
681 let dest_tbl_name = dest.name.clone();
682
683 let mut tbl = self
684 .client
685 .0
686 .get_table(src_dbname.clone().into(), src_tbl_name.clone().into())
687 .await
688 .map(from_thrift_exception)
689 .map_err(from_thrift_error)??;
690
691 tbl.db_name = Some(dest_dbname.into());
692 tbl.table_name = Some(dest_tbl_name.into());
693
694 self.client
695 .0
696 .alter_table(src_dbname.into(), src_tbl_name.into(), tbl)
697 .await
698 .map_err(from_thrift_error)?;
699
700 Ok(())
701 }
702
703 async fn register_table(
704 &self,
705 _table_ident: &TableIdent,
706 _metadata_location: String,
707 ) -> Result<Table> {
708 Err(Error::new(
709 ErrorKind::FeatureUnsupported,
710 "Registering a table is not supported yet",
711 ))
712 }
713
714 async fn update_table(&self, _commit: TableCommit) -> Result<Table> {
715 Err(Error::new(
716 ErrorKind::FeatureUnsupported,
717 "Updating a table is not supported yet",
718 ))
719 }
720}