iceberg_catalog_hms/
catalog.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::collections::HashMap;
19use std::fmt::{Debug, Formatter};
20use std::net::ToSocketAddrs;
21use std::sync::Arc;
22
23use anyhow::anyhow;
24use async_trait::async_trait;
25use hive_metastore::{
26    ThriftHiveMetastoreClient, ThriftHiveMetastoreClientBuilder,
27    ThriftHiveMetastoreGetDatabaseException, ThriftHiveMetastoreGetTableException,
28};
29use iceberg::io::{FileIO, FileIOBuilder, StorageFactory};
30use iceberg::spec::{TableMetadata, TableMetadataBuilder};
31use iceberg::table::Table;
32use iceberg::{
33    Catalog, CatalogBuilder, Error, ErrorKind, MetadataLocation, Namespace, NamespaceIdent, Result,
34    Runtime, TableCommit, TableCreation, TableIdent,
35};
36use volo_thrift::MaybeException;
37
38use super::utils::*;
39use crate::error::{from_io_error, from_thrift_error, from_thrift_exception};
40
/// Property key holding the Hive metastore address, given as `host:port`.
pub const HMS_CATALOG_PROP_URI: &str = "uri";

/// Property key selecting which thrift transport is used to talk to the Hive metastore.
pub const HMS_CATALOG_PROP_THRIFT_TRANSPORT: &str = "thrift_transport";
/// Value of [`HMS_CATALOG_PROP_THRIFT_TRANSPORT`] that selects the framed transport.
pub const THRIFT_TRANSPORT_FRAMED: &str = "framed";
/// Value of [`HMS_CATALOG_PROP_THRIFT_TRANSPORT`] that selects the buffered transport.
pub const THRIFT_TRANSPORT_BUFFERED: &str = "buffered";

/// Property key holding the warehouse location, used as the base when a table
/// is created without an explicit location.
pub const HMS_CATALOG_PROP_WAREHOUSE: &str = "warehouse";
53
/// Builder for [`HmsCatalog`].
///
/// Collects the catalog configuration plus the optional storage factory and
/// runtime supplied through the `CatalogBuilder` methods; `load` consumes it.
#[derive(Debug)]
pub struct HmsCatalogBuilder {
    /// Catalog configuration, filled in from the properties passed to `load`.
    config: HmsCatalogConfig,
    /// Factory used to build the catalog's `FileIO`; `load` fails if it is unset.
    storage_factory: Option<Arc<dyn StorageFactory>>,
    /// Runtime handed to the catalog; `load` falls back to `Runtime::try_current()`
    /// when this is unset.
    runtime: Option<Runtime>,
}
61
62impl Default for HmsCatalogBuilder {
63    fn default() -> Self {
64        Self {
65            config: HmsCatalogConfig {
66                name: None,
67                address: "".to_string(),
68                thrift_transport: HmsThriftTransport::default(),
69                warehouse: "".to_string(),
70                props: HashMap::new(),
71            },
72            storage_factory: None,
73            runtime: None,
74        }
75    }
76}
77
78impl CatalogBuilder for HmsCatalogBuilder {
79    type C = HmsCatalog;
80
81    fn with_storage_factory(mut self, storage_factory: Arc<dyn StorageFactory>) -> Self {
82        self.storage_factory = Some(storage_factory);
83        self
84    }
85
86    fn with_runtime(mut self, runtime: Runtime) -> Self {
87        self.runtime = Some(runtime);
88        self
89    }
90
91    fn load(
92        mut self,
93        name: impl Into<String>,
94        props: HashMap<String, String>,
95    ) -> impl Future<Output = Result<Self::C>> + Send {
96        self.config.name = Some(name.into());
97
98        if props.contains_key(HMS_CATALOG_PROP_URI) {
99            self.config.address = props.get(HMS_CATALOG_PROP_URI).cloned().unwrap_or_default();
100        }
101
102        if let Some(tt) = props.get(HMS_CATALOG_PROP_THRIFT_TRANSPORT) {
103            self.config.thrift_transport = match tt.to_lowercase().as_str() {
104                THRIFT_TRANSPORT_FRAMED => HmsThriftTransport::Framed,
105                THRIFT_TRANSPORT_BUFFERED => HmsThriftTransport::Buffered,
106                _ => HmsThriftTransport::default(),
107            };
108        }
109
110        if props.contains_key(HMS_CATALOG_PROP_WAREHOUSE) {
111            self.config.warehouse = props
112                .get(HMS_CATALOG_PROP_WAREHOUSE)
113                .cloned()
114                .unwrap_or_default();
115        }
116
117        self.config.props = props
118            .into_iter()
119            .filter(|(k, _)| {
120                k != HMS_CATALOG_PROP_URI
121                    && k != HMS_CATALOG_PROP_THRIFT_TRANSPORT
122                    && k != HMS_CATALOG_PROP_WAREHOUSE
123            })
124            .collect();
125
126        let result = (|| -> Result<HmsCatalog> {
127            if self.config.name.is_none() {
128                return Err(Error::new(
129                    ErrorKind::DataInvalid,
130                    "Catalog name is required",
131                ));
132            }
133            if self.config.address.is_empty() {
134                return Err(Error::new(
135                    ErrorKind::DataInvalid,
136                    "Catalog address is required",
137                ));
138            }
139            if self.config.warehouse.is_empty() {
140                return Err(Error::new(
141                    ErrorKind::DataInvalid,
142                    "Catalog warehouse is required",
143                ));
144            }
145            let runtime = match self.runtime {
146                Some(rt) => rt,
147                None => Runtime::try_current()?,
148            };
149            HmsCatalog::new(self.config, self.storage_factory, runtime)
150        })();
151
152        std::future::ready(result)
153    }
154}
155
/// Thrift transport variant used to communicate with the Hive metastore.
///
/// See: <https://github.com/apache/thrift/blob/master/doc/specs/thrift-rpc.md#framed-vs-unframed-transport>
///
/// The enum is a plain fieldless selector. It derives `Copy`, `Eq` and `Hash`
/// so callers can compare, copy and store it freely.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Hash)]
pub enum HmsThriftTransport {
    /// Use the framed transport
    Framed,
    /// Use the buffered transport (default)
    #[default]
    Buffered,
}
166
/// Hive metastore Catalog configuration.
///
/// Populated by `HmsCatalogBuilder::load` from the catalog properties.
#[derive(Debug)]
pub(crate) struct HmsCatalogConfig {
    /// Catalog name; `load` always sets it before the catalog is built.
    name: Option<String>,
    /// Metastore address, resolved with `ToSocketAddrs` when the catalog is created.
    address: String,
    /// Thrift transport used by the metastore client.
    thrift_transport: HmsThriftTransport,
    /// Warehouse location, passed to `get_default_table_location` when a table
    /// is created without an explicit location.
    warehouse: String,
    /// Remaining (non-catalog) properties, forwarded to the `FileIO` builder.
    props: HashMap<String, String>,
}
176
/// Newtype wrapper around the generated thrift Hive metastore client.
struct HmsClient(ThriftHiveMetastoreClient);
178
/// Hive metastore Catalog.
///
/// Namespaces are backed by Hive databases and tables by Hive tables. Iceberg
/// table metadata files are read and written through `file_io`.
pub struct HmsCatalog {
    /// Configuration the catalog was built from.
    config: HmsCatalogConfig,
    /// Thrift client configured for the metastore address in `config`.
    client: HmsClient,
    /// File IO used to read and write table metadata files.
    file_io: FileIO,
    /// Runtime attached to every `Table` this catalog builds.
    runtime: Runtime,
}
186
187impl Debug for HmsCatalog {
188    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
189        f.debug_struct("HmsCatalog")
190            .field("config", &self.config)
191            .finish_non_exhaustive()
192    }
193}
194
195impl HmsCatalog {
196    /// Create a new hms catalog.
197    fn new(
198        config: HmsCatalogConfig,
199        storage_factory: Option<Arc<dyn StorageFactory>>,
200        runtime: Runtime,
201    ) -> Result<Self> {
202        let address = config
203            .address
204            .as_str()
205            .to_socket_addrs()
206            .map_err(from_io_error)?
207            .next()
208            .ok_or_else(|| {
209                Error::new(
210                    ErrorKind::Unexpected,
211                    format!("invalid address: {}", config.address),
212                )
213            })?;
214
215        let builder = ThriftHiveMetastoreClientBuilder::new("hms").address(address);
216
217        let client = match &config.thrift_transport {
218            HmsThriftTransport::Framed => builder
219                .make_codec(volo_thrift::codec::default::DefaultMakeCodec::framed())
220                .build(),
221            HmsThriftTransport::Buffered => builder
222                .make_codec(volo_thrift::codec::default::DefaultMakeCodec::buffered())
223                .build(),
224        };
225
226        let factory = storage_factory.ok_or_else(|| {
227            Error::new(
228                ErrorKind::Unexpected,
229                "StorageFactory must be provided for HmsCatalog. Use `with_storage_factory` to configure it.",
230            )
231        })?;
232        let file_io = FileIOBuilder::new(factory)
233            .with_props(&config.props)
234            .build();
235
236        Ok(Self {
237            config,
238            client: HmsClient(client),
239            file_io,
240            runtime,
241        })
242    }
243    /// Get the catalogs `FileIO`
244    pub fn file_io(&self) -> FileIO {
245        self.file_io.clone()
246    }
247}
248
249#[async_trait]
250impl Catalog for HmsCatalog {
251    /// HMS doesn't support nested namespaces.
252    ///
253    /// We will return empty list if parent is some.
254    ///
255    /// Align with java implementation: <https://github.com/apache/iceberg/blob/9bd62f79f8cd973c39d14e89163cb1c707470ed2/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java#L305C26-L330>
256    async fn list_namespaces(
257        &self,
258        parent: Option<&NamespaceIdent>,
259    ) -> Result<Vec<NamespaceIdent>> {
260        let dbs = if parent.is_some() {
261            return Ok(vec![]);
262        } else {
263            self.client
264                .0
265                .get_all_databases()
266                .await
267                .map(from_thrift_exception)
268                .map_err(from_thrift_error)??
269        };
270
271        Ok(dbs
272            .into_iter()
273            .map(|v| NamespaceIdent::new(v.into()))
274            .collect())
275    }
276
277    /// Creates a new namespace with the given identifier and properties.
278    ///
279    /// Attempts to create a namespace defined by the `namespace`
280    /// parameter and configured with the specified `properties`.
281    ///
282    /// This function can return an error in the following situations:
283    ///
284    /// - If `hive.metastore.database.owner-type` is specified without
285    /// `hive.metastore.database.owner`,
286    /// - Errors from `validate_namespace` if the namespace identifier does not
287    /// meet validation criteria.
288    /// - Errors from `convert_to_database` if the properties cannot be
289    /// successfully converted into a database configuration.
290    /// - Errors from the underlying database creation process, converted using
291    /// `from_thrift_error`.
292    async fn create_namespace(
293        &self,
294        namespace: &NamespaceIdent,
295        properties: HashMap<String, String>,
296    ) -> Result<Namespace> {
297        if self.namespace_exists(namespace).await? {
298            return Err(Error::new(
299                ErrorKind::NamespaceAlreadyExists,
300                format!("Namespace {namespace:?} already exists"),
301            ));
302        }
303        let database = convert_to_database(namespace, &properties)?;
304
305        self.client
306            .0
307            .create_database(database)
308            .await
309            .map_err(from_thrift_error)?;
310
311        Ok(Namespace::with_properties(namespace.clone(), properties))
312    }
313
314    /// Retrieves a namespace by its identifier.
315    ///
316    /// Validates the given namespace identifier and then queries the
317    /// underlying database client to fetch the corresponding namespace data.
318    /// Constructs a `Namespace` object with the retrieved data and returns it.
319    ///
320    /// This function can return an error in any of the following situations:
321    /// - If the provided namespace identifier fails validation checks
322    /// - If there is an error querying the database, returned by
323    /// `from_thrift_error`.
324    async fn get_namespace(&self, namespace: &NamespaceIdent) -> Result<Namespace> {
325        let name = validate_namespace(namespace)?;
326
327        let resp = self
328            .client
329            .0
330            .get_database(name.into())
331            .await
332            .map_err(from_thrift_error)?;
333
334        let db = match resp {
335            MaybeException::Ok(db) => db,
336            MaybeException::Exception(ThriftHiveMetastoreGetDatabaseException::O1(_)) => {
337                return Err(Error::new(
338                    ErrorKind::NamespaceNotFound,
339                    format!("Namespace {namespace:?} not found"),
340                ));
341            }
342            MaybeException::Exception(exception) => {
343                return Err(Error::new(
344                    ErrorKind::Unexpected,
345                    "Operation failed for hitting thrift error".to_string(),
346                )
347                .with_source(anyhow!("thrift error: {exception:?}")));
348            }
349        };
350
351        let ns = convert_to_namespace(&db)?;
352
353        Ok(ns)
354    }
355
356    /// Checks if a namespace exists within the Hive Metastore.
357    ///
358    /// Validates the namespace identifier by querying the Hive Metastore
359    /// to determine if the specified namespace (database) exists.
360    ///
361    /// # Returns
362    /// A `Result<bool>` indicating the outcome of the check:
363    /// - `Ok(true)` if the namespace exists.
364    /// - `Ok(false)` if the namespace does not exist, identified by a specific
365    /// `UserException` variant.
366    /// - `Err(...)` if an error occurs during validation or the Hive Metastore
367    /// query, with the error encapsulating the issue.
368    async fn namespace_exists(&self, namespace: &NamespaceIdent) -> Result<bool> {
369        let name = validate_namespace(namespace)?;
370
371        let resp = self.client.0.get_database(name.into()).await;
372
373        match resp {
374            Ok(MaybeException::Ok(_)) => Ok(true),
375            Ok(MaybeException::Exception(ThriftHiveMetastoreGetDatabaseException::O1(_))) => {
376                Ok(false)
377            }
378            Ok(MaybeException::Exception(exception)) => Err(Error::new(
379                ErrorKind::Unexpected,
380                "Operation failed for hitting thrift error".to_string(),
381            )
382            .with_source(anyhow!("thrift error: {exception:?}"))),
383            Err(err) => Err(from_thrift_error(err)),
384        }
385    }
386
387    /// Asynchronously updates properties of an existing namespace.
388    ///
389    /// Converts the given namespace identifier and properties into a database
390    /// representation and then attempts to update the corresponding namespace
391    /// in the Hive Metastore.
392    ///
393    /// # Returns
394    /// Returns `Ok(())` if the namespace update is successful. If the
395    /// namespace cannot be updated due to missing information or an error
396    /// during the update process, an `Err(...)` is returned.
397    async fn update_namespace(
398        &self,
399        namespace: &NamespaceIdent,
400        properties: HashMap<String, String>,
401    ) -> Result<()> {
402        if !self.namespace_exists(namespace).await? {
403            return Err(Error::new(
404                ErrorKind::NamespaceNotFound,
405                format!("Namespace {namespace:?} does not exist"),
406            ));
407        }
408        let db = convert_to_database(namespace, &properties)?;
409
410        let name = match &db.name {
411            Some(name) => name,
412            None => {
413                return Err(Error::new(
414                    ErrorKind::DataInvalid,
415                    "Database name must be specified",
416                ));
417            }
418        };
419
420        self.client
421            .0
422            .alter_database(name.clone(), db)
423            .await
424            .map_err(from_thrift_error)?;
425
426        Ok(())
427    }
428
429    /// Asynchronously drops a namespace from the Hive Metastore.
430    ///
431    /// # Returns
432    /// A `Result<()>` indicating the outcome:
433    /// - `Ok(())` signifies successful namespace deletion.
434    /// - `Err(...)` signifies failure to drop the namespace due to validation
435    /// errors, connectivity issues, or Hive Metastore constraints.
436    async fn drop_namespace(&self, namespace: &NamespaceIdent) -> Result<()> {
437        let name = validate_namespace(namespace)?;
438
439        if !self.namespace_exists(namespace).await? {
440            return Err(Error::new(
441                ErrorKind::NamespaceNotFound,
442                format!("Namespace {namespace:?} does not exist"),
443            ));
444        }
445
446        self.client
447            .0
448            .drop_database(name.into(), false, false)
449            .await
450            .map_err(from_thrift_error)?;
451
452        Ok(())
453    }
454
455    /// Asynchronously lists all tables within a specified namespace.
456    ///
457    /// # Returns
458    ///
459    /// A `Result<Vec<TableIdent>>`, which is:
460    /// - `Ok(vec![...])` containing a vector of `TableIdent` instances, each
461    /// representing a table within the specified namespace.
462    /// - `Err(...)` if an error occurs during namespace validation or while
463    /// querying the database.
464    async fn list_tables(&self, namespace: &NamespaceIdent) -> Result<Vec<TableIdent>> {
465        let name = validate_namespace(namespace)?;
466        if !self.namespace_exists(namespace).await? {
467            return Err(Error::new(
468                ErrorKind::NamespaceNotFound,
469                format!("Namespace {namespace:?} does not exist"),
470            ));
471        }
472
473        let tables = self
474            .client
475            .0
476            .get_all_tables(name.into())
477            .await
478            .map(from_thrift_exception)
479            .map_err(from_thrift_error)??;
480
481        let tables = tables
482            .iter()
483            .map(|table| TableIdent::new(namespace.clone(), table.to_string()))
484            .collect();
485
486        Ok(tables)
487    }
488
489    /// Creates a new table within a specified namespace using the provided
490    /// table creation settings.
491    ///
492    /// # Returns
493    /// A `Result` wrapping a `Table` object representing the newly created
494    /// table.
495    ///
496    /// # Errors
497    /// This function may return an error in several cases, including invalid
498    /// namespace identifiers, failure to determine a default storage location,
499    /// issues generating or writing table metadata, and errors communicating
500    /// with the Hive Metastore.
501    async fn create_table(
502        &self,
503        namespace: &NamespaceIdent,
504        mut creation: TableCreation,
505    ) -> Result<Table> {
506        let db_name = validate_namespace(namespace)?;
507        let table_name = creation.name.clone();
508
509        let location = match &creation.location {
510            Some(location) => location.clone(),
511            None => {
512                let ns = self.get_namespace(namespace).await?;
513                let location = get_default_table_location(&ns, &table_name, &self.config.warehouse);
514                creation.location = Some(location.clone());
515                location
516            }
517        };
518        let metadata = TableMetadataBuilder::from_table_creation(creation)?
519            .build()?
520            .metadata;
521
522        let metadata_location = MetadataLocation::new_with_metadata(location.clone(), &metadata);
523
524        metadata.write_to(&self.file_io, &metadata_location).await?;
525
526        let metadata_location_str = metadata_location.to_string();
527        let hive_table = convert_to_hive_table(
528            db_name.clone(),
529            metadata.current_schema(),
530            table_name.clone(),
531            location,
532            metadata_location_str.clone(),
533            metadata.properties(),
534        )?;
535
536        self.client
537            .0
538            .create_table(hive_table)
539            .await
540            .map_err(from_thrift_error)?;
541
542        Table::builder()
543            .file_io(self.file_io())
544            .metadata_location(metadata_location_str)
545            .metadata(metadata)
546            .identifier(TableIdent::new(NamespaceIdent::new(db_name), table_name))
547            .runtime(self.runtime.clone())
548            .build()
549    }
550
551    /// Loads a table from the Hive Metastore and constructs a `Table` object
552    /// based on its metadata.
553    ///
554    /// # Returns
555    /// A `Result` wrapping a `Table` object that represents the loaded table.
556    ///
557    /// # Errors
558    /// This function may return an error in several scenarios, including:
559    /// - Failure to validate the namespace.
560    /// - Failure to retrieve the table from the Hive Metastore.
561    /// - Absence of metadata location information in the table's properties.
562    /// - Issues reading or deserializing the table's metadata file.
563    async fn load_table(&self, table: &TableIdent) -> Result<Table> {
564        let db_name = validate_namespace(table.namespace())?;
565
566        let hive_table = self
567            .client
568            .0
569            .get_table(db_name.clone().into(), table.name.clone().into())
570            .await
571            .map(from_thrift_exception)
572            .map_err(from_thrift_error)??;
573
574        let metadata_location = get_metadata_location(&hive_table.parameters)?;
575
576        let metadata = TableMetadata::read_from(&self.file_io, &metadata_location).await?;
577
578        Table::builder()
579            .file_io(self.file_io())
580            .metadata_location(metadata_location)
581            .metadata(metadata)
582            .identifier(TableIdent::new(
583                NamespaceIdent::new(db_name),
584                table.name.clone(),
585            ))
586            .runtime(self.runtime.clone())
587            .build()
588    }
589
590    /// Asynchronously drops a table from the database.
591    ///
592    /// # Errors
593    /// Returns an error if:
594    /// - The namespace provided in `table` cannot be validated
595    /// or does not exist.
596    /// - The underlying database client encounters an error while
597    /// attempting to drop the table. This includes scenarios where
598    /// the table does not exist.
599    /// - Any network or communication error occurs with the database backend.
600    async fn drop_table(&self, table: &TableIdent) -> Result<()> {
601        let db_name = validate_namespace(table.namespace())?;
602        if !self.namespace_exists(table.namespace()).await? {
603            return Err(Error::new(
604                ErrorKind::NamespaceNotFound,
605                format!("Namespace {:?} does not exist", table.namespace()),
606            ));
607        }
608        if !self.table_exists(table).await? {
609            return Err(Error::new(
610                ErrorKind::TableNotFound,
611                format!("Table {table:?} does not exist"),
612            ));
613        }
614
615        self.client
616            .0
617            .drop_table(db_name.into(), table.name.clone().into(), false)
618            .await
619            .map_err(from_thrift_error)?;
620
621        Ok(())
622    }
623
624    async fn purge_table(&self, table: &TableIdent) -> Result<()> {
625        let table_info = self.load_table(table).await?;
626        self.drop_table(table).await?;
627        iceberg::drop_table_data(
628            table_info.file_io(),
629            table_info.metadata(),
630            table_info.metadata_location(),
631        )
632        .await
633    }
634
635    /// Asynchronously checks the existence of a specified table
636    /// in the database.
637    ///
638    /// # Returns
639    /// - `Ok(true)` if the table exists in the database.
640    /// - `Ok(false)` if the table does not exist in the database.
641    /// - `Err(...)` if an error occurs during the process
642    async fn table_exists(&self, table: &TableIdent) -> Result<bool> {
643        let db_name = validate_namespace(table.namespace())?;
644        let table_name = table.name.clone();
645
646        let resp = self
647            .client
648            .0
649            .get_table(db_name.into(), table_name.into())
650            .await;
651
652        match resp {
653            Ok(MaybeException::Ok(_)) => Ok(true),
654            Ok(MaybeException::Exception(ThriftHiveMetastoreGetTableException::O2(_))) => Ok(false),
655            Ok(MaybeException::Exception(exception)) => Err(Error::new(
656                ErrorKind::Unexpected,
657                "Operation failed for hitting thrift error".to_string(),
658            )
659            .with_source(anyhow!("thrift error: {exception:?}"))),
660            Err(err) => Err(from_thrift_error(err)),
661        }
662    }
663
664    /// Asynchronously renames a table within the database
665    /// or moves it between namespaces (databases).
666    ///
667    /// # Returns
668    /// - `Ok(())` on successful rename or move of the table.
669    /// - `Err(...)` if an error occurs during the process.
670    async fn rename_table(&self, src: &TableIdent, dest: &TableIdent) -> Result<()> {
671        let src_dbname = validate_namespace(src.namespace())?;
672        let dest_dbname = validate_namespace(dest.namespace())?;
673        if self.table_exists(dest).await? {
674            return Err(Error::new(
675                ErrorKind::TableAlreadyExists,
676                format!("Destination table {dest:?} already exists"),
677            ));
678        }
679
680        let src_tbl_name = src.name.clone();
681        let dest_tbl_name = dest.name.clone();
682
683        let mut tbl = self
684            .client
685            .0
686            .get_table(src_dbname.clone().into(), src_tbl_name.clone().into())
687            .await
688            .map(from_thrift_exception)
689            .map_err(from_thrift_error)??;
690
691        tbl.db_name = Some(dest_dbname.into());
692        tbl.table_name = Some(dest_tbl_name.into());
693
694        self.client
695            .0
696            .alter_table(src_dbname.into(), src_tbl_name.into(), tbl)
697            .await
698            .map_err(from_thrift_error)?;
699
700        Ok(())
701    }
702
703    async fn register_table(
704        &self,
705        _table_ident: &TableIdent,
706        _metadata_location: String,
707    ) -> Result<Table> {
708        Err(Error::new(
709            ErrorKind::FeatureUnsupported,
710            "Registering a table is not supported yet",
711        ))
712    }
713
714    async fn update_table(&self, _commit: TableCommit) -> Result<Table> {
715        Err(Error::new(
716            ErrorKind::FeatureUnsupported,
717            "Updating a table is not supported yet",
718        ))
719    }
720}