1use std::collections::HashMap;
19use std::fmt::{Debug, Formatter};
20use std::net::ToSocketAddrs;
21use std::sync::Arc;
22
23use anyhow::anyhow;
24use async_trait::async_trait;
25use hive_metastore::{
26 ThriftHiveMetastoreClient, ThriftHiveMetastoreClientBuilder,
27 ThriftHiveMetastoreGetDatabaseException, ThriftHiveMetastoreGetTableException,
28};
29use iceberg::io::{FileIO, FileIOBuilder, StorageFactory};
30use iceberg::spec::{TableMetadata, TableMetadataBuilder};
31use iceberg::table::Table;
32use iceberg::{
33 Catalog, CatalogBuilder, Error, ErrorKind, MetadataLocation, Namespace, NamespaceIdent, Result,
34 TableCommit, TableCreation, TableIdent,
35};
36use volo_thrift::MaybeException;
37
38use super::utils::*;
39use crate::error::{from_io_error, from_thrift_error, from_thrift_exception};
40
/// Catalog property holding the Hive Metastore thrift address; it is resolved
/// with `ToSocketAddrs` when the catalog is built.
pub const HMS_CATALOG_PROP_URI: &str = "uri";

/// Catalog property selecting the thrift transport; matched case-insensitively
/// against [`THRIFT_TRANSPORT_FRAMED`] and [`THRIFT_TRANSPORT_BUFFERED`].
pub const HMS_CATALOG_PROP_THRIFT_TRANSPORT: &str = "thrift_transport";
/// [`HMS_CATALOG_PROP_THRIFT_TRANSPORT`] value selecting the framed transport.
pub const THRIFT_TRANSPORT_FRAMED: &str = "framed";
/// [`HMS_CATALOG_PROP_THRIFT_TRANSPORT`] value selecting the buffered transport.
pub const THRIFT_TRANSPORT_BUFFERED: &str = "buffered";

/// Catalog property holding the warehouse root used to derive default table locations.
pub const HMS_CATALOG_PROP_WAREHOUSE: &str = "warehouse";
53
/// Builder for [`HmsCatalog`], configured through [`CatalogBuilder::load`].
#[derive(Debug)]
pub struct HmsCatalogBuilder {
    // Configuration accumulated from the name and properties passed to `load`.
    config: HmsCatalogConfig,
    // Storage backend used to build the catalog's `FileIO`; must be set via
    // `with_storage_factory`, otherwise building the catalog fails.
    storage_factory: Option<Arc<dyn StorageFactory>>,
}
60
61impl Default for HmsCatalogBuilder {
62 fn default() -> Self {
63 Self {
64 config: HmsCatalogConfig {
65 name: None,
66 address: "".to_string(),
67 thrift_transport: HmsThriftTransport::default(),
68 warehouse: "".to_string(),
69 props: HashMap::new(),
70 },
71 storage_factory: None,
72 }
73 }
74}
75
76impl CatalogBuilder for HmsCatalogBuilder {
77 type C = HmsCatalog;
78
79 fn with_storage_factory(mut self, storage_factory: Arc<dyn StorageFactory>) -> Self {
80 self.storage_factory = Some(storage_factory);
81 self
82 }
83
84 fn load(
85 mut self,
86 name: impl Into<String>,
87 props: HashMap<String, String>,
88 ) -> impl Future<Output = Result<Self::C>> + Send {
89 self.config.name = Some(name.into());
90
91 if props.contains_key(HMS_CATALOG_PROP_URI) {
92 self.config.address = props.get(HMS_CATALOG_PROP_URI).cloned().unwrap_or_default();
93 }
94
95 if let Some(tt) = props.get(HMS_CATALOG_PROP_THRIFT_TRANSPORT) {
96 self.config.thrift_transport = match tt.to_lowercase().as_str() {
97 THRIFT_TRANSPORT_FRAMED => HmsThriftTransport::Framed,
98 THRIFT_TRANSPORT_BUFFERED => HmsThriftTransport::Buffered,
99 _ => HmsThriftTransport::default(),
100 };
101 }
102
103 if props.contains_key(HMS_CATALOG_PROP_WAREHOUSE) {
104 self.config.warehouse = props
105 .get(HMS_CATALOG_PROP_WAREHOUSE)
106 .cloned()
107 .unwrap_or_default();
108 }
109
110 self.config.props = props
111 .into_iter()
112 .filter(|(k, _)| {
113 k != HMS_CATALOG_PROP_URI
114 && k != HMS_CATALOG_PROP_THRIFT_TRANSPORT
115 && k != HMS_CATALOG_PROP_WAREHOUSE
116 })
117 .collect();
118
119 let result = {
120 if self.config.name.is_none() {
121 Err(Error::new(
122 ErrorKind::DataInvalid,
123 "Catalog name is required",
124 ))
125 } else if self.config.address.is_empty() {
126 Err(Error::new(
127 ErrorKind::DataInvalid,
128 "Catalog address is required",
129 ))
130 } else if self.config.warehouse.is_empty() {
131 Err(Error::new(
132 ErrorKind::DataInvalid,
133 "Catalog warehouse is required",
134 ))
135 } else {
136 HmsCatalog::new(self.config, self.storage_factory)
137 }
138 };
139
140 std::future::ready(result)
141 }
142}
143
/// Thrift transport used by the Hive Metastore client.
///
/// Selected through the `thrift_transport` catalog property.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum HmsThriftTransport {
    /// Framed thrift transport.
    Framed,
    /// Buffered thrift transport; the default when no transport is configured.
    #[default]
    Buffered,
}
154
/// Configuration of an [`HmsCatalog`], filled in by [`HmsCatalogBuilder`].
#[derive(Debug)]
pub(crate) struct HmsCatalogConfig {
    // Catalog name passed to `CatalogBuilder::load`.
    name: Option<String>,
    // Metastore address taken from the `uri` property; resolved with `to_socket_addrs`.
    address: String,
    // Thrift transport used when building the metastore client.
    thrift_transport: HmsThriftTransport,
    // Warehouse root used to derive default table locations.
    warehouse: String,
    // Remaining properties, forwarded to `FileIOBuilder::with_props`.
    // NOTE(review): these appear in the derived `Debug` output (and therefore in
    // `HmsCatalog`'s `Debug`); if they can carry storage credentials, consider redacting.
    props: HashMap<String, String>,
}
164
/// Newtype wrapper around the generated thrift Hive Metastore client.
struct HmsClient(ThriftHiveMetastoreClient);
166
/// Iceberg [`Catalog`] backed by a Hive Metastore reached over thrift.
pub struct HmsCatalog {
    // Configuration the catalog was built from.
    config: HmsCatalogConfig,
    // Thrift client connected to the metastore address.
    client: HmsClient,
    // IO used to read and write table metadata files.
    file_io: FileIO,
}
173
174impl Debug for HmsCatalog {
175 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
176 f.debug_struct("HmsCatalog")
177 .field("config", &self.config)
178 .finish_non_exhaustive()
179 }
180}
181
182impl HmsCatalog {
183 fn new(
185 config: HmsCatalogConfig,
186 storage_factory: Option<Arc<dyn StorageFactory>>,
187 ) -> Result<Self> {
188 let address = config
189 .address
190 .as_str()
191 .to_socket_addrs()
192 .map_err(from_io_error)?
193 .next()
194 .ok_or_else(|| {
195 Error::new(
196 ErrorKind::Unexpected,
197 format!("invalid address: {}", config.address),
198 )
199 })?;
200
201 let builder = ThriftHiveMetastoreClientBuilder::new("hms").address(address);
202
203 let client = match &config.thrift_transport {
204 HmsThriftTransport::Framed => builder
205 .make_codec(volo_thrift::codec::default::DefaultMakeCodec::framed())
206 .build(),
207 HmsThriftTransport::Buffered => builder
208 .make_codec(volo_thrift::codec::default::DefaultMakeCodec::buffered())
209 .build(),
210 };
211
212 let factory = storage_factory.ok_or_else(|| {
213 Error::new(
214 ErrorKind::Unexpected,
215 "StorageFactory must be provided for HmsCatalog. Use `with_storage_factory` to configure it.",
216 )
217 })?;
218 let file_io = FileIOBuilder::new(factory)
219 .with_props(&config.props)
220 .build();
221
222 Ok(Self {
223 config,
224 client: HmsClient(client),
225 file_io,
226 })
227 }
228 pub fn file_io(&self) -> FileIO {
230 self.file_io.clone()
231 }
232}
233
234#[async_trait]
235impl Catalog for HmsCatalog {
236 async fn list_namespaces(
242 &self,
243 parent: Option<&NamespaceIdent>,
244 ) -> Result<Vec<NamespaceIdent>> {
245 let dbs = if parent.is_some() {
246 return Ok(vec![]);
247 } else {
248 self.client
249 .0
250 .get_all_databases()
251 .await
252 .map(from_thrift_exception)
253 .map_err(from_thrift_error)??
254 };
255
256 Ok(dbs
257 .into_iter()
258 .map(|v| NamespaceIdent::new(v.into()))
259 .collect())
260 }
261
262 async fn create_namespace(
278 &self,
279 namespace: &NamespaceIdent,
280 properties: HashMap<String, String>,
281 ) -> Result<Namespace> {
282 if self.namespace_exists(namespace).await? {
283 return Err(Error::new(
284 ErrorKind::NamespaceAlreadyExists,
285 format!("Namespace {namespace:?} already exists"),
286 ));
287 }
288 let database = convert_to_database(namespace, &properties)?;
289
290 self.client
291 .0
292 .create_database(database)
293 .await
294 .map_err(from_thrift_error)?;
295
296 Ok(Namespace::with_properties(namespace.clone(), properties))
297 }
298
299 async fn get_namespace(&self, namespace: &NamespaceIdent) -> Result<Namespace> {
310 let name = validate_namespace(namespace)?;
311
312 let resp = self
313 .client
314 .0
315 .get_database(name.into())
316 .await
317 .map_err(from_thrift_error)?;
318
319 let db = match resp {
320 MaybeException::Ok(db) => db,
321 MaybeException::Exception(ThriftHiveMetastoreGetDatabaseException::O1(_)) => {
322 return Err(Error::new(
323 ErrorKind::NamespaceNotFound,
324 format!("Namespace {namespace:?} not found"),
325 ));
326 }
327 MaybeException::Exception(exception) => {
328 return Err(Error::new(
329 ErrorKind::Unexpected,
330 "Operation failed for hitting thrift error".to_string(),
331 )
332 .with_source(anyhow!("thrift error: {exception:?}")));
333 }
334 };
335
336 let ns = convert_to_namespace(&db)?;
337
338 Ok(ns)
339 }
340
341 async fn namespace_exists(&self, namespace: &NamespaceIdent) -> Result<bool> {
354 let name = validate_namespace(namespace)?;
355
356 let resp = self.client.0.get_database(name.into()).await;
357
358 match resp {
359 Ok(MaybeException::Ok(_)) => Ok(true),
360 Ok(MaybeException::Exception(ThriftHiveMetastoreGetDatabaseException::O1(_))) => {
361 Ok(false)
362 }
363 Ok(MaybeException::Exception(exception)) => Err(Error::new(
364 ErrorKind::Unexpected,
365 "Operation failed for hitting thrift error".to_string(),
366 )
367 .with_source(anyhow!("thrift error: {exception:?}"))),
368 Err(err) => Err(from_thrift_error(err)),
369 }
370 }
371
372 async fn update_namespace(
383 &self,
384 namespace: &NamespaceIdent,
385 properties: HashMap<String, String>,
386 ) -> Result<()> {
387 if !self.namespace_exists(namespace).await? {
388 return Err(Error::new(
389 ErrorKind::NamespaceNotFound,
390 format!("Namespace {namespace:?} does not exist"),
391 ));
392 }
393 let db = convert_to_database(namespace, &properties)?;
394
395 let name = match &db.name {
396 Some(name) => name,
397 None => {
398 return Err(Error::new(
399 ErrorKind::DataInvalid,
400 "Database name must be specified",
401 ));
402 }
403 };
404
405 self.client
406 .0
407 .alter_database(name.clone(), db)
408 .await
409 .map_err(from_thrift_error)?;
410
411 Ok(())
412 }
413
414 async fn drop_namespace(&self, namespace: &NamespaceIdent) -> Result<()> {
422 let name = validate_namespace(namespace)?;
423
424 if !self.namespace_exists(namespace).await? {
425 return Err(Error::new(
426 ErrorKind::NamespaceNotFound,
427 format!("Namespace {namespace:?} does not exist"),
428 ));
429 }
430
431 self.client
432 .0
433 .drop_database(name.into(), false, false)
434 .await
435 .map_err(from_thrift_error)?;
436
437 Ok(())
438 }
439
440 async fn list_tables(&self, namespace: &NamespaceIdent) -> Result<Vec<TableIdent>> {
450 let name = validate_namespace(namespace)?;
451 if !self.namespace_exists(namespace).await? {
452 return Err(Error::new(
453 ErrorKind::NamespaceNotFound,
454 format!("Namespace {namespace:?} does not exist"),
455 ));
456 }
457
458 let tables = self
459 .client
460 .0
461 .get_all_tables(name.into())
462 .await
463 .map(from_thrift_exception)
464 .map_err(from_thrift_error)??;
465
466 let tables = tables
467 .iter()
468 .map(|table| TableIdent::new(namespace.clone(), table.to_string()))
469 .collect();
470
471 Ok(tables)
472 }
473
474 async fn create_table(
487 &self,
488 namespace: &NamespaceIdent,
489 mut creation: TableCreation,
490 ) -> Result<Table> {
491 let db_name = validate_namespace(namespace)?;
492 let table_name = creation.name.clone();
493
494 let location = match &creation.location {
495 Some(location) => location.clone(),
496 None => {
497 let ns = self.get_namespace(namespace).await?;
498 let location = get_default_table_location(&ns, &table_name, &self.config.warehouse);
499 creation.location = Some(location.clone());
500 location
501 }
502 };
503 let metadata = TableMetadataBuilder::from_table_creation(creation)?
504 .build()?
505 .metadata;
506
507 let metadata_location = MetadataLocation::new_with_metadata(location.clone(), &metadata);
508
509 metadata.write_to(&self.file_io, &metadata_location).await?;
510
511 let metadata_location_str = metadata_location.to_string();
512 let hive_table = convert_to_hive_table(
513 db_name.clone(),
514 metadata.current_schema(),
515 table_name.clone(),
516 location,
517 metadata_location_str.clone(),
518 metadata.properties(),
519 )?;
520
521 self.client
522 .0
523 .create_table(hive_table)
524 .await
525 .map_err(from_thrift_error)?;
526
527 Table::builder()
528 .file_io(self.file_io())
529 .metadata_location(metadata_location_str)
530 .metadata(metadata)
531 .identifier(TableIdent::new(NamespaceIdent::new(db_name), table_name))
532 .build()
533 }
534
535 async fn load_table(&self, table: &TableIdent) -> Result<Table> {
548 let db_name = validate_namespace(table.namespace())?;
549
550 let hive_table = self
551 .client
552 .0
553 .get_table(db_name.clone().into(), table.name.clone().into())
554 .await
555 .map(from_thrift_exception)
556 .map_err(from_thrift_error)??;
557
558 let metadata_location = get_metadata_location(&hive_table.parameters)?;
559
560 let metadata = TableMetadata::read_from(&self.file_io, &metadata_location).await?;
561
562 Table::builder()
563 .file_io(self.file_io())
564 .metadata_location(metadata_location)
565 .metadata(metadata)
566 .identifier(TableIdent::new(
567 NamespaceIdent::new(db_name),
568 table.name.clone(),
569 ))
570 .build()
571 }
572
573 async fn drop_table(&self, table: &TableIdent) -> Result<()> {
584 let db_name = validate_namespace(table.namespace())?;
585 if !self.namespace_exists(table.namespace()).await? {
586 return Err(Error::new(
587 ErrorKind::NamespaceNotFound,
588 format!("Namespace {:?} does not exist", table.namespace()),
589 ));
590 }
591 if !self.table_exists(table).await? {
592 return Err(Error::new(
593 ErrorKind::TableNotFound,
594 format!("Table {table:?} does not exist"),
595 ));
596 }
597
598 self.client
599 .0
600 .drop_table(db_name.into(), table.name.clone().into(), false)
601 .await
602 .map_err(from_thrift_error)?;
603
604 Ok(())
605 }
606
607 async fn table_exists(&self, table: &TableIdent) -> Result<bool> {
615 let db_name = validate_namespace(table.namespace())?;
616 let table_name = table.name.clone();
617
618 let resp = self
619 .client
620 .0
621 .get_table(db_name.into(), table_name.into())
622 .await;
623
624 match resp {
625 Ok(MaybeException::Ok(_)) => Ok(true),
626 Ok(MaybeException::Exception(ThriftHiveMetastoreGetTableException::O2(_))) => Ok(false),
627 Ok(MaybeException::Exception(exception)) => Err(Error::new(
628 ErrorKind::Unexpected,
629 "Operation failed for hitting thrift error".to_string(),
630 )
631 .with_source(anyhow!("thrift error: {exception:?}"))),
632 Err(err) => Err(from_thrift_error(err)),
633 }
634 }
635
636 async fn rename_table(&self, src: &TableIdent, dest: &TableIdent) -> Result<()> {
643 let src_dbname = validate_namespace(src.namespace())?;
644 let dest_dbname = validate_namespace(dest.namespace())?;
645 if self.table_exists(dest).await? {
646 return Err(Error::new(
647 ErrorKind::TableAlreadyExists,
648 format!("Destination table {dest:?} already exists"),
649 ));
650 }
651
652 let src_tbl_name = src.name.clone();
653 let dest_tbl_name = dest.name.clone();
654
655 let mut tbl = self
656 .client
657 .0
658 .get_table(src_dbname.clone().into(), src_tbl_name.clone().into())
659 .await
660 .map(from_thrift_exception)
661 .map_err(from_thrift_error)??;
662
663 tbl.db_name = Some(dest_dbname.into());
664 tbl.table_name = Some(dest_tbl_name.into());
665
666 self.client
667 .0
668 .alter_table(src_dbname.into(), src_tbl_name.into(), tbl)
669 .await
670 .map_err(from_thrift_error)?;
671
672 Ok(())
673 }
674
675 async fn register_table(
676 &self,
677 _table_ident: &TableIdent,
678 _metadata_location: String,
679 ) -> Result<Table> {
680 Err(Error::new(
681 ErrorKind::FeatureUnsupported,
682 "Registering a table is not supported yet",
683 ))
684 }
685
686 async fn update_table(&self, _commit: TableCommit) -> Result<Table> {
687 Err(Error::new(
688 ErrorKind::FeatureUnsupported,
689 "Updating a table is not supported yet",
690 ))
691 }
692}