1use std::collections::HashMap;
19use std::fmt::{Debug, Formatter};
20use std::net::ToSocketAddrs;
21
22use anyhow::anyhow;
23use async_trait::async_trait;
24use hive_metastore::{
25 ThriftHiveMetastoreClient, ThriftHiveMetastoreClientBuilder,
26 ThriftHiveMetastoreGetDatabaseException, ThriftHiveMetastoreGetTableException,
27};
28use iceberg::io::FileIO;
29use iceberg::spec::{TableMetadata, TableMetadataBuilder};
30use iceberg::table::Table;
31use iceberg::{
32 Catalog, CatalogBuilder, Error, ErrorKind, MetadataLocation, Namespace, NamespaceIdent, Result,
33 TableCommit, TableCreation, TableIdent,
34};
35use volo_thrift::MaybeException;
36
37use super::utils::*;
38use crate::error::{from_io_error, from_thrift_error, from_thrift_exception};
39
/// Property key under which the catalog builder looks up the HMS address.
///
/// The value is resolved with `ToSocketAddrs` when the catalog is constructed.
pub const HMS_CATALOG_PROP_URI: &str = "uri";

/// Property key selecting the thrift transport used to talk to HMS.
///
/// The value is lower-cased before it is compared against
/// [`THRIFT_TRANSPORT_FRAMED`] and [`THRIFT_TRANSPORT_BUFFERED`]; any other
/// value falls back to the default transport.
pub const HMS_CATALOG_PROP_THRIFT_TRANSPORT: &str = "thrift_transport";
/// Value of [`HMS_CATALOG_PROP_THRIFT_TRANSPORT`] that selects the framed transport.
pub const THRIFT_TRANSPORT_FRAMED: &str = "framed";
/// Value of [`HMS_CATALOG_PROP_THRIFT_TRANSPORT`] that selects the buffered transport.
pub const THRIFT_TRANSPORT_BUFFERED: &str = "buffered";

/// Property key under which the catalog builder looks up the warehouse location.
///
/// The value is used to build the catalog's `FileIO` and to derive default
/// table locations.
pub const HMS_CATALOG_PROP_WAREHOUSE: &str = "warehouse";
52
53#[derive(Debug)]
55pub struct HmsCatalogBuilder(HmsCatalogConfig);
56
57impl Default for HmsCatalogBuilder {
58 fn default() -> Self {
59 Self(HmsCatalogConfig {
60 name: None,
61 address: "".to_string(),
62 thrift_transport: HmsThriftTransport::default(),
63 warehouse: "".to_string(),
64 props: HashMap::new(),
65 })
66 }
67}
68
69impl CatalogBuilder for HmsCatalogBuilder {
70 type C = HmsCatalog;
71
72 fn load(
73 mut self,
74 name: impl Into<String>,
75 props: HashMap<String, String>,
76 ) -> impl Future<Output = Result<Self::C>> + Send {
77 self.0.name = Some(name.into());
78
79 if props.contains_key(HMS_CATALOG_PROP_URI) {
80 self.0.address = props.get(HMS_CATALOG_PROP_URI).cloned().unwrap_or_default();
81 }
82
83 if let Some(tt) = props.get(HMS_CATALOG_PROP_THRIFT_TRANSPORT) {
84 self.0.thrift_transport = match tt.to_lowercase().as_str() {
85 THRIFT_TRANSPORT_FRAMED => HmsThriftTransport::Framed,
86 THRIFT_TRANSPORT_BUFFERED => HmsThriftTransport::Buffered,
87 _ => HmsThriftTransport::default(),
88 };
89 }
90
91 if props.contains_key(HMS_CATALOG_PROP_WAREHOUSE) {
92 self.0.warehouse = props
93 .get(HMS_CATALOG_PROP_WAREHOUSE)
94 .cloned()
95 .unwrap_or_default();
96 }
97
98 self.0.props = props
99 .into_iter()
100 .filter(|(k, _)| {
101 k != HMS_CATALOG_PROP_URI
102 && k != HMS_CATALOG_PROP_THRIFT_TRANSPORT
103 && k != HMS_CATALOG_PROP_WAREHOUSE
104 })
105 .collect();
106
107 let result = {
108 if self.0.name.is_none() {
109 Err(Error::new(
110 ErrorKind::DataInvalid,
111 "Catalog name is required",
112 ))
113 } else if self.0.address.is_empty() {
114 Err(Error::new(
115 ErrorKind::DataInvalid,
116 "Catalog address is required",
117 ))
118 } else if self.0.warehouse.is_empty() {
119 Err(Error::new(
120 ErrorKind::DataInvalid,
121 "Catalog warehouse is required",
122 ))
123 } else {
124 HmsCatalog::new(self.0)
125 }
126 };
127
128 std::future::ready(result)
129 }
130}
131
/// Thrift transport (codec) used by the HMS client.
#[derive(Debug, Default)]
pub enum HmsThriftTransport {
    /// Build the client with volo-thrift's framed codec.
    Framed,
    /// Build the client with volo-thrift's buffered codec; this is the default.
    #[default]
    Buffered,
}
142
/// Configuration collected by the builder and consumed when the catalog is created.
#[derive(Debug)]
pub(crate) struct HmsCatalogConfig {
    // Catalog name supplied to the builder's `load`.
    name: Option<String>,
    // HMS address; resolved through `ToSocketAddrs` when the catalog is created.
    address: String,
    // Which thrift codec the client is built with.
    thrift_transport: HmsThriftTransport,
    // Warehouse location; used to build `FileIO` and default table locations.
    warehouse: String,
    // Remaining user properties, forwarded to `FileIO`.
    props: HashMap<String, String>,
}
152
/// Newtype wrapper around the generated thrift Hive Metastore client.
struct HmsClient(ThriftHiveMetastoreClient);

/// Catalog implementation backed by a Hive Metastore reached over thrift.
pub struct HmsCatalog {
    // Configuration the catalog was created from.
    config: HmsCatalogConfig,
    // Thrift client used for every metastore call.
    client: HmsClient,
    // File IO used to read and write table metadata files.
    file_io: FileIO,
}
161
162impl Debug for HmsCatalog {
163 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
164 f.debug_struct("HmsCatalog")
165 .field("config", &self.config)
166 .finish_non_exhaustive()
167 }
168}
169
170impl HmsCatalog {
171 fn new(config: HmsCatalogConfig) -> Result<Self> {
173 let address = config
174 .address
175 .as_str()
176 .to_socket_addrs()
177 .map_err(from_io_error)?
178 .next()
179 .ok_or_else(|| {
180 Error::new(
181 ErrorKind::Unexpected,
182 format!("invalid address: {}", config.address),
183 )
184 })?;
185
186 let builder = ThriftHiveMetastoreClientBuilder::new("hms").address(address);
187
188 let client = match &config.thrift_transport {
189 HmsThriftTransport::Framed => builder
190 .make_codec(volo_thrift::codec::default::DefaultMakeCodec::framed())
191 .build(),
192 HmsThriftTransport::Buffered => builder
193 .make_codec(volo_thrift::codec::default::DefaultMakeCodec::buffered())
194 .build(),
195 };
196
197 let file_io = FileIO::from_path(&config.warehouse)?
198 .with_props(&config.props)
199 .build()?;
200
201 Ok(Self {
202 config,
203 client: HmsClient(client),
204 file_io,
205 })
206 }
207 pub fn file_io(&self) -> FileIO {
209 self.file_io.clone()
210 }
211}
212
213#[async_trait]
214impl Catalog for HmsCatalog {
215 async fn list_namespaces(
221 &self,
222 parent: Option<&NamespaceIdent>,
223 ) -> Result<Vec<NamespaceIdent>> {
224 let dbs = if parent.is_some() {
225 return Ok(vec![]);
226 } else {
227 self.client
228 .0
229 .get_all_databases()
230 .await
231 .map(from_thrift_exception)
232 .map_err(from_thrift_error)??
233 };
234
235 Ok(dbs
236 .into_iter()
237 .map(|v| NamespaceIdent::new(v.into()))
238 .collect())
239 }
240
241 async fn create_namespace(
257 &self,
258 namespace: &NamespaceIdent,
259 properties: HashMap<String, String>,
260 ) -> Result<Namespace> {
261 let database = convert_to_database(namespace, &properties)?;
262
263 self.client
264 .0
265 .create_database(database)
266 .await
267 .map_err(from_thrift_error)?;
268
269 Ok(Namespace::with_properties(namespace.clone(), properties))
270 }
271
272 async fn get_namespace(&self, namespace: &NamespaceIdent) -> Result<Namespace> {
283 let name = validate_namespace(namespace)?;
284
285 let db = self
286 .client
287 .0
288 .get_database(name.into())
289 .await
290 .map(from_thrift_exception)
291 .map_err(from_thrift_error)??;
292
293 let ns = convert_to_namespace(&db)?;
294
295 Ok(ns)
296 }
297
298 async fn namespace_exists(&self, namespace: &NamespaceIdent) -> Result<bool> {
311 let name = validate_namespace(namespace)?;
312
313 let resp = self.client.0.get_database(name.into()).await;
314
315 match resp {
316 Ok(MaybeException::Ok(_)) => Ok(true),
317 Ok(MaybeException::Exception(ThriftHiveMetastoreGetDatabaseException::O1(_))) => {
318 Ok(false)
319 }
320 Ok(MaybeException::Exception(exception)) => Err(Error::new(
321 ErrorKind::Unexpected,
322 "Operation failed for hitting thrift error".to_string(),
323 )
324 .with_source(anyhow!("thrift error: {exception:?}"))),
325 Err(err) => Err(from_thrift_error(err)),
326 }
327 }
328
329 async fn update_namespace(
340 &self,
341 namespace: &NamespaceIdent,
342 properties: HashMap<String, String>,
343 ) -> Result<()> {
344 let db = convert_to_database(namespace, &properties)?;
345
346 let name = match &db.name {
347 Some(name) => name,
348 None => {
349 return Err(Error::new(
350 ErrorKind::DataInvalid,
351 "Database name must be specified",
352 ));
353 }
354 };
355
356 self.client
357 .0
358 .alter_database(name.clone(), db)
359 .await
360 .map_err(from_thrift_error)?;
361
362 Ok(())
363 }
364
365 async fn drop_namespace(&self, namespace: &NamespaceIdent) -> Result<()> {
373 let name = validate_namespace(namespace)?;
374
375 self.client
376 .0
377 .drop_database(name.into(), false, false)
378 .await
379 .map_err(from_thrift_error)?;
380
381 Ok(())
382 }
383
384 async fn list_tables(&self, namespace: &NamespaceIdent) -> Result<Vec<TableIdent>> {
394 let name = validate_namespace(namespace)?;
395
396 let tables = self
397 .client
398 .0
399 .get_all_tables(name.into())
400 .await
401 .map(from_thrift_exception)
402 .map_err(from_thrift_error)??;
403
404 let tables = tables
405 .iter()
406 .map(|table| TableIdent::new(namespace.clone(), table.to_string()))
407 .collect();
408
409 Ok(tables)
410 }
411
412 async fn create_table(
425 &self,
426 namespace: &NamespaceIdent,
427 mut creation: TableCreation,
428 ) -> Result<Table> {
429 let db_name = validate_namespace(namespace)?;
430 let table_name = creation.name.clone();
431
432 let location = match &creation.location {
433 Some(location) => location.clone(),
434 None => {
435 let ns = self.get_namespace(namespace).await?;
436 let location = get_default_table_location(&ns, &table_name, &self.config.warehouse);
437 creation.location = Some(location.clone());
438 location
439 }
440 };
441 let metadata = TableMetadataBuilder::from_table_creation(creation)?
442 .build()?
443 .metadata;
444
445 let metadata_location =
446 MetadataLocation::new_with_table_location(location.clone()).to_string();
447
448 metadata.write_to(&self.file_io, &metadata_location).await?;
449
450 let hive_table = convert_to_hive_table(
451 db_name.clone(),
452 metadata.current_schema(),
453 table_name.clone(),
454 location,
455 metadata_location.clone(),
456 metadata.properties(),
457 )?;
458
459 self.client
460 .0
461 .create_table(hive_table)
462 .await
463 .map_err(from_thrift_error)?;
464
465 Table::builder()
466 .file_io(self.file_io())
467 .metadata_location(metadata_location)
468 .metadata(metadata)
469 .identifier(TableIdent::new(NamespaceIdent::new(db_name), table_name))
470 .build()
471 }
472
473 async fn load_table(&self, table: &TableIdent) -> Result<Table> {
486 let db_name = validate_namespace(table.namespace())?;
487
488 let hive_table = self
489 .client
490 .0
491 .get_table(db_name.clone().into(), table.name.clone().into())
492 .await
493 .map(from_thrift_exception)
494 .map_err(from_thrift_error)??;
495
496 let metadata_location = get_metadata_location(&hive_table.parameters)?;
497
498 let metadata = TableMetadata::read_from(&self.file_io, &metadata_location).await?;
499
500 Table::builder()
501 .file_io(self.file_io())
502 .metadata_location(metadata_location)
503 .metadata(metadata)
504 .identifier(TableIdent::new(
505 NamespaceIdent::new(db_name),
506 table.name.clone(),
507 ))
508 .build()
509 }
510
511 async fn drop_table(&self, table: &TableIdent) -> Result<()> {
522 let db_name = validate_namespace(table.namespace())?;
523
524 self.client
525 .0
526 .drop_table(db_name.into(), table.name.clone().into(), false)
527 .await
528 .map_err(from_thrift_error)?;
529
530 Ok(())
531 }
532
533 async fn table_exists(&self, table: &TableIdent) -> Result<bool> {
541 let db_name = validate_namespace(table.namespace())?;
542 let table_name = table.name.clone();
543
544 let resp = self
545 .client
546 .0
547 .get_table(db_name.into(), table_name.into())
548 .await;
549
550 match resp {
551 Ok(MaybeException::Ok(_)) => Ok(true),
552 Ok(MaybeException::Exception(ThriftHiveMetastoreGetTableException::O2(_))) => Ok(false),
553 Ok(MaybeException::Exception(exception)) => Err(Error::new(
554 ErrorKind::Unexpected,
555 "Operation failed for hitting thrift error".to_string(),
556 )
557 .with_source(anyhow!("thrift error: {exception:?}"))),
558 Err(err) => Err(from_thrift_error(err)),
559 }
560 }
561
562 async fn rename_table(&self, src: &TableIdent, dest: &TableIdent) -> Result<()> {
569 let src_dbname = validate_namespace(src.namespace())?;
570 let dest_dbname = validate_namespace(dest.namespace())?;
571
572 let src_tbl_name = src.name.clone();
573 let dest_tbl_name = dest.name.clone();
574
575 let mut tbl = self
576 .client
577 .0
578 .get_table(src_dbname.clone().into(), src_tbl_name.clone().into())
579 .await
580 .map(from_thrift_exception)
581 .map_err(from_thrift_error)??;
582
583 tbl.db_name = Some(dest_dbname.into());
584 tbl.table_name = Some(dest_tbl_name.into());
585
586 self.client
587 .0
588 .alter_table(src_dbname.into(), src_tbl_name.into(), tbl)
589 .await
590 .map_err(from_thrift_error)?;
591
592 Ok(())
593 }
594
595 async fn register_table(
596 &self,
597 _table_ident: &TableIdent,
598 _metadata_location: String,
599 ) -> Result<Table> {
600 Err(Error::new(
601 ErrorKind::FeatureUnsupported,
602 "Registering a table is not supported yet",
603 ))
604 }
605
606 async fn update_table(&self, _commit: TableCommit) -> Result<Table> {
607 Err(Error::new(
608 ErrorKind::FeatureUnsupported,
609 "Updating a table is not supported yet",
610 ))
611 }
612}