iceberg_catalog_hms/
catalog.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::collections::HashMap;
19use std::fmt::{Debug, Formatter};
20use std::net::ToSocketAddrs;
21use std::sync::Arc;
22
23use anyhow::anyhow;
24use async_trait::async_trait;
25use hive_metastore::{
26    ThriftHiveMetastoreClient, ThriftHiveMetastoreClientBuilder,
27    ThriftHiveMetastoreGetDatabaseException, ThriftHiveMetastoreGetTableException,
28};
29use iceberg::io::{FileIO, FileIOBuilder, StorageFactory};
30use iceberg::spec::{TableMetadata, TableMetadataBuilder};
31use iceberg::table::Table;
32use iceberg::{
33    Catalog, CatalogBuilder, Error, ErrorKind, MetadataLocation, Namespace, NamespaceIdent, Result,
34    TableCommit, TableCreation, TableIdent,
35};
36use volo_thrift::MaybeException;
37
38use super::utils::*;
39use crate::error::{from_io_error, from_thrift_error, from_thrift_exception};
40
/// Catalog property key for the Hive Metastore address.
///
/// Required. The value is resolved with `ToSocketAddrs` when the catalog is
/// created, so it is expected in `host:port` form.
pub const HMS_CATALOG_PROP_URI: &str = "uri";

/// Catalog property key selecting the thrift transport used to talk to HMS.
///
/// The value is matched case-insensitively against [`THRIFT_TRANSPORT_FRAMED`]
/// and [`THRIFT_TRANSPORT_BUFFERED`]; any other value falls back to the
/// default transport.
pub const HMS_CATALOG_PROP_THRIFT_TRANSPORT: &str = "thrift_transport";
/// Value of [`HMS_CATALOG_PROP_THRIFT_TRANSPORT`] selecting the framed transport.
pub const THRIFT_TRANSPORT_FRAMED: &str = "framed";
/// Value of [`HMS_CATALOG_PROP_THRIFT_TRANSPORT`] selecting the buffered transport.
pub const THRIFT_TRANSPORT_BUFFERED: &str = "buffered";

/// Catalog property key for the warehouse root location.
///
/// Required. Default table locations are derived from it when a table is
/// created without an explicit location.
pub const HMS_CATALOG_PROP_WAREHOUSE: &str = "warehouse";
53
/// Builder for [`HmsCatalog`].
///
/// Supply a storage factory with [`CatalogBuilder::with_storage_factory`],
/// then call [`CatalogBuilder::load`] with a catalog name and properties.
/// Loading fails if no storage factory was supplied.
#[derive(Debug)]
pub struct HmsCatalogBuilder {
    /// Configuration filled in from the name and properties passed to `load`.
    config: HmsCatalogConfig,
    /// Factory used to build the catalog's `FileIO`; set by `with_storage_factory`.
    storage_factory: Option<Arc<dyn StorageFactory>>,
}
60
61impl Default for HmsCatalogBuilder {
62    fn default() -> Self {
63        Self {
64            config: HmsCatalogConfig {
65                name: None,
66                address: "".to_string(),
67                thrift_transport: HmsThriftTransport::default(),
68                warehouse: "".to_string(),
69                props: HashMap::new(),
70            },
71            storage_factory: None,
72        }
73    }
74}
75
76impl CatalogBuilder for HmsCatalogBuilder {
77    type C = HmsCatalog;
78
79    fn with_storage_factory(mut self, storage_factory: Arc<dyn StorageFactory>) -> Self {
80        self.storage_factory = Some(storage_factory);
81        self
82    }
83
84    fn load(
85        mut self,
86        name: impl Into<String>,
87        props: HashMap<String, String>,
88    ) -> impl Future<Output = Result<Self::C>> + Send {
89        self.config.name = Some(name.into());
90
91        if props.contains_key(HMS_CATALOG_PROP_URI) {
92            self.config.address = props.get(HMS_CATALOG_PROP_URI).cloned().unwrap_or_default();
93        }
94
95        if let Some(tt) = props.get(HMS_CATALOG_PROP_THRIFT_TRANSPORT) {
96            self.config.thrift_transport = match tt.to_lowercase().as_str() {
97                THRIFT_TRANSPORT_FRAMED => HmsThriftTransport::Framed,
98                THRIFT_TRANSPORT_BUFFERED => HmsThriftTransport::Buffered,
99                _ => HmsThriftTransport::default(),
100            };
101        }
102
103        if props.contains_key(HMS_CATALOG_PROP_WAREHOUSE) {
104            self.config.warehouse = props
105                .get(HMS_CATALOG_PROP_WAREHOUSE)
106                .cloned()
107                .unwrap_or_default();
108        }
109
110        self.config.props = props
111            .into_iter()
112            .filter(|(k, _)| {
113                k != HMS_CATALOG_PROP_URI
114                    && k != HMS_CATALOG_PROP_THRIFT_TRANSPORT
115                    && k != HMS_CATALOG_PROP_WAREHOUSE
116            })
117            .collect();
118
119        let result = {
120            if self.config.name.is_none() {
121                Err(Error::new(
122                    ErrorKind::DataInvalid,
123                    "Catalog name is required",
124                ))
125            } else if self.config.address.is_empty() {
126                Err(Error::new(
127                    ErrorKind::DataInvalid,
128                    "Catalog address is required",
129                ))
130            } else if self.config.warehouse.is_empty() {
131                Err(Error::new(
132                    ErrorKind::DataInvalid,
133                    "Catalog warehouse is required",
134                ))
135            } else {
136                HmsCatalog::new(self.config, self.storage_factory)
137            }
138        };
139
140        std::future::ready(result)
141    }
142}
143
/// Which variant of the thrift transport to use when communicating with HMS.
///
/// See: <https://github.com/apache/thrift/blob/master/doc/specs/thrift-rpc.md#framed-vs-unframed-transport>
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Hash)]
pub enum HmsThriftTransport {
    /// Use the framed transport
    Framed,
    /// Use the buffered transport (default)
    #[default]
    Buffered,
}
154
/// Hive metastore Catalog configuration.
#[derive(Debug)]
pub(crate) struct HmsCatalogConfig {
    /// Catalog name passed to `CatalogBuilder::load`.
    name: Option<String>,
    /// Metastore address; resolved to a socket address when the catalog is created.
    address: String,
    /// Thrift transport used for the metastore client.
    thrift_transport: HmsThriftTransport,
    /// Warehouse root used to derive default table locations.
    warehouse: String,
    /// Remaining catalog properties (uri/transport/warehouse keys removed),
    /// forwarded to `FileIOBuilder::with_props`.
    props: HashMap<String, String>,
}
164
/// Newtype wrapper around the thrift client used for all Hive Metastore calls.
struct HmsClient(ThriftHiveMetastoreClient);
166
/// Hive metastore Catalog.
///
/// Namespaces are stored as HMS databases and tables as HMS tables whose
/// parameters record the Iceberg metadata location; the metadata files
/// themselves are read and written through the catalog's [`FileIO`].
pub struct HmsCatalog {
    /// Configuration the catalog was built from.
    config: HmsCatalogConfig,
    /// Thrift client for the metastore at `config.address`.
    client: HmsClient,
    /// File IO used to read and write table metadata files.
    file_io: FileIO,
}
173
174impl Debug for HmsCatalog {
175    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
176        f.debug_struct("HmsCatalog")
177            .field("config", &self.config)
178            .finish_non_exhaustive()
179    }
180}
181
182impl HmsCatalog {
183    /// Create a new hms catalog.
184    fn new(
185        config: HmsCatalogConfig,
186        storage_factory: Option<Arc<dyn StorageFactory>>,
187    ) -> Result<Self> {
188        let address = config
189            .address
190            .as_str()
191            .to_socket_addrs()
192            .map_err(from_io_error)?
193            .next()
194            .ok_or_else(|| {
195                Error::new(
196                    ErrorKind::Unexpected,
197                    format!("invalid address: {}", config.address),
198                )
199            })?;
200
201        let builder = ThriftHiveMetastoreClientBuilder::new("hms").address(address);
202
203        let client = match &config.thrift_transport {
204            HmsThriftTransport::Framed => builder
205                .make_codec(volo_thrift::codec::default::DefaultMakeCodec::framed())
206                .build(),
207            HmsThriftTransport::Buffered => builder
208                .make_codec(volo_thrift::codec::default::DefaultMakeCodec::buffered())
209                .build(),
210        };
211
212        let factory = storage_factory.ok_or_else(|| {
213            Error::new(
214                ErrorKind::Unexpected,
215                "StorageFactory must be provided for HmsCatalog. Use `with_storage_factory` to configure it.",
216            )
217        })?;
218        let file_io = FileIOBuilder::new(factory)
219            .with_props(&config.props)
220            .build();
221
222        Ok(Self {
223            config,
224            client: HmsClient(client),
225            file_io,
226        })
227    }
228    /// Get the catalogs `FileIO`
229    pub fn file_io(&self) -> FileIO {
230        self.file_io.clone()
231    }
232}
233
234#[async_trait]
235impl Catalog for HmsCatalog {
236    /// HMS doesn't support nested namespaces.
237    ///
238    /// We will return empty list if parent is some.
239    ///
240    /// Align with java implementation: <https://github.com/apache/iceberg/blob/9bd62f79f8cd973c39d14e89163cb1c707470ed2/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java#L305C26-L330>
241    async fn list_namespaces(
242        &self,
243        parent: Option<&NamespaceIdent>,
244    ) -> Result<Vec<NamespaceIdent>> {
245        let dbs = if parent.is_some() {
246            return Ok(vec![]);
247        } else {
248            self.client
249                .0
250                .get_all_databases()
251                .await
252                .map(from_thrift_exception)
253                .map_err(from_thrift_error)??
254        };
255
256        Ok(dbs
257            .into_iter()
258            .map(|v| NamespaceIdent::new(v.into()))
259            .collect())
260    }
261
262    /// Creates a new namespace with the given identifier and properties.
263    ///
264    /// Attempts to create a namespace defined by the `namespace`
265    /// parameter and configured with the specified `properties`.
266    ///
267    /// This function can return an error in the following situations:
268    ///
269    /// - If `hive.metastore.database.owner-type` is specified without
270    /// `hive.metastore.database.owner`,
271    /// - Errors from `validate_namespace` if the namespace identifier does not
272    /// meet validation criteria.
273    /// - Errors from `convert_to_database` if the properties cannot be
274    /// successfully converted into a database configuration.
275    /// - Errors from the underlying database creation process, converted using
276    /// `from_thrift_error`.
277    async fn create_namespace(
278        &self,
279        namespace: &NamespaceIdent,
280        properties: HashMap<String, String>,
281    ) -> Result<Namespace> {
282        if self.namespace_exists(namespace).await? {
283            return Err(Error::new(
284                ErrorKind::NamespaceAlreadyExists,
285                format!("Namespace {namespace:?} already exists"),
286            ));
287        }
288        let database = convert_to_database(namespace, &properties)?;
289
290        self.client
291            .0
292            .create_database(database)
293            .await
294            .map_err(from_thrift_error)?;
295
296        Ok(Namespace::with_properties(namespace.clone(), properties))
297    }
298
299    /// Retrieves a namespace by its identifier.
300    ///
301    /// Validates the given namespace identifier and then queries the
302    /// underlying database client to fetch the corresponding namespace data.
303    /// Constructs a `Namespace` object with the retrieved data and returns it.
304    ///
305    /// This function can return an error in any of the following situations:
306    /// - If the provided namespace identifier fails validation checks
307    /// - If there is an error querying the database, returned by
308    /// `from_thrift_error`.
309    async fn get_namespace(&self, namespace: &NamespaceIdent) -> Result<Namespace> {
310        let name = validate_namespace(namespace)?;
311
312        let resp = self
313            .client
314            .0
315            .get_database(name.into())
316            .await
317            .map_err(from_thrift_error)?;
318
319        let db = match resp {
320            MaybeException::Ok(db) => db,
321            MaybeException::Exception(ThriftHiveMetastoreGetDatabaseException::O1(_)) => {
322                return Err(Error::new(
323                    ErrorKind::NamespaceNotFound,
324                    format!("Namespace {namespace:?} not found"),
325                ));
326            }
327            MaybeException::Exception(exception) => {
328                return Err(Error::new(
329                    ErrorKind::Unexpected,
330                    "Operation failed for hitting thrift error".to_string(),
331                )
332                .with_source(anyhow!("thrift error: {exception:?}")));
333            }
334        };
335
336        let ns = convert_to_namespace(&db)?;
337
338        Ok(ns)
339    }
340
341    /// Checks if a namespace exists within the Hive Metastore.
342    ///
343    /// Validates the namespace identifier by querying the Hive Metastore
344    /// to determine if the specified namespace (database) exists.
345    ///
346    /// # Returns
347    /// A `Result<bool>` indicating the outcome of the check:
348    /// - `Ok(true)` if the namespace exists.
349    /// - `Ok(false)` if the namespace does not exist, identified by a specific
350    /// `UserException` variant.
351    /// - `Err(...)` if an error occurs during validation or the Hive Metastore
352    /// query, with the error encapsulating the issue.
353    async fn namespace_exists(&self, namespace: &NamespaceIdent) -> Result<bool> {
354        let name = validate_namespace(namespace)?;
355
356        let resp = self.client.0.get_database(name.into()).await;
357
358        match resp {
359            Ok(MaybeException::Ok(_)) => Ok(true),
360            Ok(MaybeException::Exception(ThriftHiveMetastoreGetDatabaseException::O1(_))) => {
361                Ok(false)
362            }
363            Ok(MaybeException::Exception(exception)) => Err(Error::new(
364                ErrorKind::Unexpected,
365                "Operation failed for hitting thrift error".to_string(),
366            )
367            .with_source(anyhow!("thrift error: {exception:?}"))),
368            Err(err) => Err(from_thrift_error(err)),
369        }
370    }
371
372    /// Asynchronously updates properties of an existing namespace.
373    ///
374    /// Converts the given namespace identifier and properties into a database
375    /// representation and then attempts to update the corresponding namespace
376    /// in the Hive Metastore.
377    ///
378    /// # Returns
379    /// Returns `Ok(())` if the namespace update is successful. If the
380    /// namespace cannot be updated due to missing information or an error
381    /// during the update process, an `Err(...)` is returned.
382    async fn update_namespace(
383        &self,
384        namespace: &NamespaceIdent,
385        properties: HashMap<String, String>,
386    ) -> Result<()> {
387        if !self.namespace_exists(namespace).await? {
388            return Err(Error::new(
389                ErrorKind::NamespaceNotFound,
390                format!("Namespace {namespace:?} does not exist"),
391            ));
392        }
393        let db = convert_to_database(namespace, &properties)?;
394
395        let name = match &db.name {
396            Some(name) => name,
397            None => {
398                return Err(Error::new(
399                    ErrorKind::DataInvalid,
400                    "Database name must be specified",
401                ));
402            }
403        };
404
405        self.client
406            .0
407            .alter_database(name.clone(), db)
408            .await
409            .map_err(from_thrift_error)?;
410
411        Ok(())
412    }
413
414    /// Asynchronously drops a namespace from the Hive Metastore.
415    ///
416    /// # Returns
417    /// A `Result<()>` indicating the outcome:
418    /// - `Ok(())` signifies successful namespace deletion.
419    /// - `Err(...)` signifies failure to drop the namespace due to validation
420    /// errors, connectivity issues, or Hive Metastore constraints.
421    async fn drop_namespace(&self, namespace: &NamespaceIdent) -> Result<()> {
422        let name = validate_namespace(namespace)?;
423
424        if !self.namespace_exists(namespace).await? {
425            return Err(Error::new(
426                ErrorKind::NamespaceNotFound,
427                format!("Namespace {namespace:?} does not exist"),
428            ));
429        }
430
431        self.client
432            .0
433            .drop_database(name.into(), false, false)
434            .await
435            .map_err(from_thrift_error)?;
436
437        Ok(())
438    }
439
440    /// Asynchronously lists all tables within a specified namespace.
441    ///
442    /// # Returns
443    ///
444    /// A `Result<Vec<TableIdent>>`, which is:
445    /// - `Ok(vec![...])` containing a vector of `TableIdent` instances, each
446    /// representing a table within the specified namespace.
447    /// - `Err(...)` if an error occurs during namespace validation or while
448    /// querying the database.
449    async fn list_tables(&self, namespace: &NamespaceIdent) -> Result<Vec<TableIdent>> {
450        let name = validate_namespace(namespace)?;
451        if !self.namespace_exists(namespace).await? {
452            return Err(Error::new(
453                ErrorKind::NamespaceNotFound,
454                format!("Namespace {namespace:?} does not exist"),
455            ));
456        }
457
458        let tables = self
459            .client
460            .0
461            .get_all_tables(name.into())
462            .await
463            .map(from_thrift_exception)
464            .map_err(from_thrift_error)??;
465
466        let tables = tables
467            .iter()
468            .map(|table| TableIdent::new(namespace.clone(), table.to_string()))
469            .collect();
470
471        Ok(tables)
472    }
473
474    /// Creates a new table within a specified namespace using the provided
475    /// table creation settings.
476    ///
477    /// # Returns
478    /// A `Result` wrapping a `Table` object representing the newly created
479    /// table.
480    ///
481    /// # Errors
482    /// This function may return an error in several cases, including invalid
483    /// namespace identifiers, failure to determine a default storage location,
484    /// issues generating or writing table metadata, and errors communicating
485    /// with the Hive Metastore.
486    async fn create_table(
487        &self,
488        namespace: &NamespaceIdent,
489        mut creation: TableCreation,
490    ) -> Result<Table> {
491        let db_name = validate_namespace(namespace)?;
492        let table_name = creation.name.clone();
493
494        let location = match &creation.location {
495            Some(location) => location.clone(),
496            None => {
497                let ns = self.get_namespace(namespace).await?;
498                let location = get_default_table_location(&ns, &table_name, &self.config.warehouse);
499                creation.location = Some(location.clone());
500                location
501            }
502        };
503        let metadata = TableMetadataBuilder::from_table_creation(creation)?
504            .build()?
505            .metadata;
506
507        let metadata_location = MetadataLocation::new_with_metadata(location.clone(), &metadata);
508
509        metadata.write_to(&self.file_io, &metadata_location).await?;
510
511        let metadata_location_str = metadata_location.to_string();
512        let hive_table = convert_to_hive_table(
513            db_name.clone(),
514            metadata.current_schema(),
515            table_name.clone(),
516            location,
517            metadata_location_str.clone(),
518            metadata.properties(),
519        )?;
520
521        self.client
522            .0
523            .create_table(hive_table)
524            .await
525            .map_err(from_thrift_error)?;
526
527        Table::builder()
528            .file_io(self.file_io())
529            .metadata_location(metadata_location_str)
530            .metadata(metadata)
531            .identifier(TableIdent::new(NamespaceIdent::new(db_name), table_name))
532            .build()
533    }
534
535    /// Loads a table from the Hive Metastore and constructs a `Table` object
536    /// based on its metadata.
537    ///
538    /// # Returns
539    /// A `Result` wrapping a `Table` object that represents the loaded table.
540    ///
541    /// # Errors
542    /// This function may return an error in several scenarios, including:
543    /// - Failure to validate the namespace.
544    /// - Failure to retrieve the table from the Hive Metastore.
545    /// - Absence of metadata location information in the table's properties.
546    /// - Issues reading or deserializing the table's metadata file.
547    async fn load_table(&self, table: &TableIdent) -> Result<Table> {
548        let db_name = validate_namespace(table.namespace())?;
549
550        let hive_table = self
551            .client
552            .0
553            .get_table(db_name.clone().into(), table.name.clone().into())
554            .await
555            .map(from_thrift_exception)
556            .map_err(from_thrift_error)??;
557
558        let metadata_location = get_metadata_location(&hive_table.parameters)?;
559
560        let metadata = TableMetadata::read_from(&self.file_io, &metadata_location).await?;
561
562        Table::builder()
563            .file_io(self.file_io())
564            .metadata_location(metadata_location)
565            .metadata(metadata)
566            .identifier(TableIdent::new(
567                NamespaceIdent::new(db_name),
568                table.name.clone(),
569            ))
570            .build()
571    }
572
573    /// Asynchronously drops a table from the database.
574    ///
575    /// # Errors
576    /// Returns an error if:
577    /// - The namespace provided in `table` cannot be validated
578    /// or does not exist.
579    /// - The underlying database client encounters an error while
580    /// attempting to drop the table. This includes scenarios where
581    /// the table does not exist.
582    /// - Any network or communication error occurs with the database backend.
583    async fn drop_table(&self, table: &TableIdent) -> Result<()> {
584        let db_name = validate_namespace(table.namespace())?;
585        if !self.namespace_exists(table.namespace()).await? {
586            return Err(Error::new(
587                ErrorKind::NamespaceNotFound,
588                format!("Namespace {:?} does not exist", table.namespace()),
589            ));
590        }
591        if !self.table_exists(table).await? {
592            return Err(Error::new(
593                ErrorKind::TableNotFound,
594                format!("Table {table:?} does not exist"),
595            ));
596        }
597
598        self.client
599            .0
600            .drop_table(db_name.into(), table.name.clone().into(), false)
601            .await
602            .map_err(from_thrift_error)?;
603
604        Ok(())
605    }
606
607    /// Asynchronously checks the existence of a specified table
608    /// in the database.
609    ///
610    /// # Returns
611    /// - `Ok(true)` if the table exists in the database.
612    /// - `Ok(false)` if the table does not exist in the database.
613    /// - `Err(...)` if an error occurs during the process
614    async fn table_exists(&self, table: &TableIdent) -> Result<bool> {
615        let db_name = validate_namespace(table.namespace())?;
616        let table_name = table.name.clone();
617
618        let resp = self
619            .client
620            .0
621            .get_table(db_name.into(), table_name.into())
622            .await;
623
624        match resp {
625            Ok(MaybeException::Ok(_)) => Ok(true),
626            Ok(MaybeException::Exception(ThriftHiveMetastoreGetTableException::O2(_))) => Ok(false),
627            Ok(MaybeException::Exception(exception)) => Err(Error::new(
628                ErrorKind::Unexpected,
629                "Operation failed for hitting thrift error".to_string(),
630            )
631            .with_source(anyhow!("thrift error: {exception:?}"))),
632            Err(err) => Err(from_thrift_error(err)),
633        }
634    }
635
636    /// Asynchronously renames a table within the database
637    /// or moves it between namespaces (databases).
638    ///
639    /// # Returns
640    /// - `Ok(())` on successful rename or move of the table.
641    /// - `Err(...)` if an error occurs during the process.
642    async fn rename_table(&self, src: &TableIdent, dest: &TableIdent) -> Result<()> {
643        let src_dbname = validate_namespace(src.namespace())?;
644        let dest_dbname = validate_namespace(dest.namespace())?;
645        if self.table_exists(dest).await? {
646            return Err(Error::new(
647                ErrorKind::TableAlreadyExists,
648                format!("Destination table {dest:?} already exists"),
649            ));
650        }
651
652        let src_tbl_name = src.name.clone();
653        let dest_tbl_name = dest.name.clone();
654
655        let mut tbl = self
656            .client
657            .0
658            .get_table(src_dbname.clone().into(), src_tbl_name.clone().into())
659            .await
660            .map(from_thrift_exception)
661            .map_err(from_thrift_error)??;
662
663        tbl.db_name = Some(dest_dbname.into());
664        tbl.table_name = Some(dest_tbl_name.into());
665
666        self.client
667            .0
668            .alter_table(src_dbname.into(), src_tbl_name.into(), tbl)
669            .await
670            .map_err(from_thrift_error)?;
671
672        Ok(())
673    }
674
675    async fn register_table(
676        &self,
677        _table_ident: &TableIdent,
678        _metadata_location: String,
679    ) -> Result<Table> {
680        Err(Error::new(
681            ErrorKind::FeatureUnsupported,
682            "Registering a table is not supported yet",
683        ))
684    }
685
686    async fn update_table(&self, _commit: TableCommit) -> Result<Table> {
687        Err(Error::new(
688            ErrorKind::FeatureUnsupported,
689            "Updating a table is not supported yet",
690        ))
691    }
692}