iceberg_catalog_hms/
catalog.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::collections::HashMap;
19use std::fmt::{Debug, Formatter};
20use std::net::ToSocketAddrs;
21
22use anyhow::anyhow;
23use async_trait::async_trait;
24use hive_metastore::{
25    ThriftHiveMetastoreClient, ThriftHiveMetastoreClientBuilder,
26    ThriftHiveMetastoreGetDatabaseException, ThriftHiveMetastoreGetTableException,
27};
28use iceberg::io::FileIO;
29use iceberg::spec::{TableMetadata, TableMetadataBuilder};
30use iceberg::table::Table;
31use iceberg::{
32    Catalog, CatalogBuilder, Error, ErrorKind, MetadataLocation, Namespace, NamespaceIdent, Result,
33    TableCommit, TableCreation, TableIdent,
34};
35use volo_thrift::MaybeException;
36
37use super::utils::*;
38use crate::error::{from_io_error, from_thrift_error, from_thrift_exception};
39
40/// Property key for the Hive Metastore address; `HmsCatalogBuilder::load` stores its value as the catalog address.
41pub const HMS_CATALOG_PROP_URI: &str = "uri";
42
43/// Property key selecting the thrift transport used to talk to the Hive Metastore.
44pub const HMS_CATALOG_PROP_THRIFT_TRANSPORT: &str = "thrift_transport";
45/// Value of [`HMS_CATALOG_PROP_THRIFT_TRANSPORT`] that selects the framed thrift transport.
46pub const THRIFT_TRANSPORT_FRAMED: &str = "framed";
47/// Value of [`HMS_CATALOG_PROP_THRIFT_TRANSPORT`] that selects the buffered thrift transport.
48pub const THRIFT_TRANSPORT_BUFFERED: &str = "buffered";
49
50/// Property key for the warehouse location; `HmsCatalogBuilder::load` requires a non-empty value.
51pub const HMS_CATALOG_PROP_WAREHOUSE: &str = "warehouse";
52
53/// Builder for [`HmsCatalog`].
54#[derive(Debug)]
55pub struct HmsCatalogBuilder(HmsCatalogConfig);
56
57impl Default for HmsCatalogBuilder {
58    fn default() -> Self {
59        Self(HmsCatalogConfig {
60            name: None,
61            address: "".to_string(),
62            thrift_transport: HmsThriftTransport::default(),
63            warehouse: "".to_string(),
64            props: HashMap::new(),
65        })
66    }
67}
68
69impl CatalogBuilder for HmsCatalogBuilder {
70    type C = HmsCatalog;
71
72    fn load(
73        mut self,
74        name: impl Into<String>,
75        props: HashMap<String, String>,
76    ) -> impl Future<Output = Result<Self::C>> + Send {
77        self.0.name = Some(name.into());
78
79        if props.contains_key(HMS_CATALOG_PROP_URI) {
80            self.0.address = props.get(HMS_CATALOG_PROP_URI).cloned().unwrap_or_default();
81        }
82
83        if let Some(tt) = props.get(HMS_CATALOG_PROP_THRIFT_TRANSPORT) {
84            self.0.thrift_transport = match tt.to_lowercase().as_str() {
85                THRIFT_TRANSPORT_FRAMED => HmsThriftTransport::Framed,
86                THRIFT_TRANSPORT_BUFFERED => HmsThriftTransport::Buffered,
87                _ => HmsThriftTransport::default(),
88            };
89        }
90
91        if props.contains_key(HMS_CATALOG_PROP_WAREHOUSE) {
92            self.0.warehouse = props
93                .get(HMS_CATALOG_PROP_WAREHOUSE)
94                .cloned()
95                .unwrap_or_default();
96        }
97
98        self.0.props = props
99            .into_iter()
100            .filter(|(k, _)| {
101                k != HMS_CATALOG_PROP_URI
102                    && k != HMS_CATALOG_PROP_THRIFT_TRANSPORT
103                    && k != HMS_CATALOG_PROP_WAREHOUSE
104            })
105            .collect();
106
107        let result = {
108            if self.0.name.is_none() {
109                Err(Error::new(
110                    ErrorKind::DataInvalid,
111                    "Catalog name is required",
112                ))
113            } else if self.0.address.is_empty() {
114                Err(Error::new(
115                    ErrorKind::DataInvalid,
116                    "Catalog address is required",
117                ))
118            } else if self.0.warehouse.is_empty() {
119                Err(Error::new(
120                    ErrorKind::DataInvalid,
121                    "Catalog warehouse is required",
122                ))
123            } else {
124                HmsCatalog::new(self.0)
125            }
126        };
127
128        std::future::ready(result)
129    }
130}
131
132/// Which variant of the thrift transport is used to communicate with HMS.
133/// See: <https://github.com/apache/thrift/blob/master/doc/specs/thrift-rpc.md#framed-vs-unframed-transport>
134#[derive(Debug, Default)]
135pub enum HmsThriftTransport {
136    /// Use the framed transport (selected by the `framed` property value)
137    Framed,
138    /// Use the buffered transport (default; also used for unrecognised property values)
139    #[default]
140    Buffered,
141}
142
143/// Hive metastore Catalog configuration, filled in by `HmsCatalogBuilder::load`.
144#[derive(Debug)]
145pub(crate) struct HmsCatalogConfig {
    /// Catalog name passed to `load`; `None` until `load` is called.
146    name: Option<String>,
    /// Metastore address taken from the `uri` property; resolved with `to_socket_addrs`.
147    address: String,
    /// Thrift transport variant used when building the client.
148    thrift_transport: HmsThriftTransport,
    /// Warehouse location taken from the `warehouse` property; also the path `FileIO` is created from.
149    warehouse: String,
    /// All remaining properties; forwarded to the `FileIO` builder.
150    props: HashMap<String, String>,
151}
152
/// Newtype around the thrift Hive Metastore client held by `HmsCatalog`.
153struct HmsClient(ThriftHiveMetastoreClient);
154
155/// Hive metastore Catalog: an Iceberg `Catalog` implemented on top of a thrift Hive Metastore client.
156pub struct HmsCatalog {
    /// Configuration the catalog was built from.
157    config: HmsCatalogConfig,
    /// Thrift client used for every metastore call.
158    client: HmsClient,
    /// `FileIO` used to read and write table metadata files.
159    file_io: FileIO,
160}
161
162impl Debug for HmsCatalog {
163    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
164        f.debug_struct("HmsCatalog")
165            .field("config", &self.config)
166            .finish_non_exhaustive()
167    }
168}
169
170impl HmsCatalog {
171    /// Create a new hms catalog.
172    fn new(config: HmsCatalogConfig) -> Result<Self> {
173        let address = config
174            .address
175            .as_str()
176            .to_socket_addrs()
177            .map_err(from_io_error)?
178            .next()
179            .ok_or_else(|| {
180                Error::new(
181                    ErrorKind::Unexpected,
182                    format!("invalid address: {}", config.address),
183                )
184            })?;
185
186        let builder = ThriftHiveMetastoreClientBuilder::new("hms").address(address);
187
188        let client = match &config.thrift_transport {
189            HmsThriftTransport::Framed => builder
190                .make_codec(volo_thrift::codec::default::DefaultMakeCodec::framed())
191                .build(),
192            HmsThriftTransport::Buffered => builder
193                .make_codec(volo_thrift::codec::default::DefaultMakeCodec::buffered())
194                .build(),
195        };
196
197        let file_io = FileIO::from_path(&config.warehouse)?
198            .with_props(&config.props)
199            .build()?;
200
201        Ok(Self {
202            config,
203            client: HmsClient(client),
204            file_io,
205        })
206    }
207    /// Get the catalogs `FileIO`
208    pub fn file_io(&self) -> FileIO {
209        self.file_io.clone()
210    }
211}
212
213#[async_trait]
214impl Catalog for HmsCatalog {
215    /// HMS doesn't support nested namespaces.
216    ///
217    /// We will return empty list if parent is some.
218    ///
219    /// Align with java implementation: <https://github.com/apache/iceberg/blob/9bd62f79f8cd973c39d14e89163cb1c707470ed2/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java#L305C26-L330>
220    async fn list_namespaces(
221        &self,
222        parent: Option<&NamespaceIdent>,
223    ) -> Result<Vec<NamespaceIdent>> {
224        let dbs = if parent.is_some() {
225            return Ok(vec![]);
226        } else {
227            self.client
228                .0
229                .get_all_databases()
230                .await
231                .map(from_thrift_exception)
232                .map_err(from_thrift_error)??
233        };
234
235        Ok(dbs
236            .into_iter()
237            .map(|v| NamespaceIdent::new(v.into()))
238            .collect())
239    }
240
241    /// Creates a new namespace with the given identifier and properties.
242    ///
243    /// Attempts to create a namespace defined by the `namespace`
244    /// parameter and configured with the specified `properties`.
245    ///
246    /// This function can return an error in the following situations:
247    ///
248    /// - If `hive.metastore.database.owner-type` is specified without
249    /// `hive.metastore.database.owner`,
250    /// - Errors from `validate_namespace` if the namespace identifier does not
251    /// meet validation criteria.
252    /// - Errors from `convert_to_database` if the properties cannot be
253    /// successfully converted into a database configuration.
254    /// - Errors from the underlying database creation process, converted using
255    /// `from_thrift_error`.
256    async fn create_namespace(
257        &self,
258        namespace: &NamespaceIdent,
259        properties: HashMap<String, String>,
260    ) -> Result<Namespace> {
261        let database = convert_to_database(namespace, &properties)?;
262
263        self.client
264            .0
265            .create_database(database)
266            .await
267            .map_err(from_thrift_error)?;
268
269        Ok(Namespace::with_properties(namespace.clone(), properties))
270    }
271
272    /// Retrieves a namespace by its identifier.
273    ///
274    /// Validates the given namespace identifier and then queries the
275    /// underlying database client to fetch the corresponding namespace data.
276    /// Constructs a `Namespace` object with the retrieved data and returns it.
277    ///
278    /// This function can return an error in any of the following situations:
279    /// - If the provided namespace identifier fails validation checks
280    /// - If there is an error querying the database, returned by
281    /// `from_thrift_error`.
282    async fn get_namespace(&self, namespace: &NamespaceIdent) -> Result<Namespace> {
283        let name = validate_namespace(namespace)?;
284
285        let db = self
286            .client
287            .0
288            .get_database(name.into())
289            .await
290            .map(from_thrift_exception)
291            .map_err(from_thrift_error)??;
292
293        let ns = convert_to_namespace(&db)?;
294
295        Ok(ns)
296    }
297
298    /// Checks if a namespace exists within the Hive Metastore.
299    ///
300    /// Validates the namespace identifier by querying the Hive Metastore
301    /// to determine if the specified namespace (database) exists.
302    ///
303    /// # Returns
304    /// A `Result<bool>` indicating the outcome of the check:
305    /// - `Ok(true)` if the namespace exists.
306    /// - `Ok(false)` if the namespace does not exist, identified by a specific
307    /// `UserException` variant.
308    /// - `Err(...)` if an error occurs during validation or the Hive Metastore
309    /// query, with the error encapsulating the issue.
310    async fn namespace_exists(&self, namespace: &NamespaceIdent) -> Result<bool> {
311        let name = validate_namespace(namespace)?;
312
313        let resp = self.client.0.get_database(name.into()).await;
314
315        match resp {
316            Ok(MaybeException::Ok(_)) => Ok(true),
317            Ok(MaybeException::Exception(ThriftHiveMetastoreGetDatabaseException::O1(_))) => {
318                Ok(false)
319            }
320            Ok(MaybeException::Exception(exception)) => Err(Error::new(
321                ErrorKind::Unexpected,
322                "Operation failed for hitting thrift error".to_string(),
323            )
324            .with_source(anyhow!("thrift error: {exception:?}"))),
325            Err(err) => Err(from_thrift_error(err)),
326        }
327    }
328
329    /// Asynchronously updates properties of an existing namespace.
330    ///
331    /// Converts the given namespace identifier and properties into a database
332    /// representation and then attempts to update the corresponding namespace
333    /// in the Hive Metastore.
334    ///
335    /// # Returns
336    /// Returns `Ok(())` if the namespace update is successful. If the
337    /// namespace cannot be updated due to missing information or an error
338    /// during the update process, an `Err(...)` is returned.
339    async fn update_namespace(
340        &self,
341        namespace: &NamespaceIdent,
342        properties: HashMap<String, String>,
343    ) -> Result<()> {
344        let db = convert_to_database(namespace, &properties)?;
345
346        let name = match &db.name {
347            Some(name) => name,
348            None => {
349                return Err(Error::new(
350                    ErrorKind::DataInvalid,
351                    "Database name must be specified",
352                ));
353            }
354        };
355
356        self.client
357            .0
358            .alter_database(name.clone(), db)
359            .await
360            .map_err(from_thrift_error)?;
361
362        Ok(())
363    }
364
365    /// Asynchronously drops a namespace from the Hive Metastore.
366    ///
367    /// # Returns
368    /// A `Result<()>` indicating the outcome:
369    /// - `Ok(())` signifies successful namespace deletion.
370    /// - `Err(...)` signifies failure to drop the namespace due to validation
371    /// errors, connectivity issues, or Hive Metastore constraints.
372    async fn drop_namespace(&self, namespace: &NamespaceIdent) -> Result<()> {
373        let name = validate_namespace(namespace)?;
374
375        self.client
376            .0
377            .drop_database(name.into(), false, false)
378            .await
379            .map_err(from_thrift_error)?;
380
381        Ok(())
382    }
383
384    /// Asynchronously lists all tables within a specified namespace.
385    ///
386    /// # Returns
387    ///
388    /// A `Result<Vec<TableIdent>>`, which is:
389    /// - `Ok(vec![...])` containing a vector of `TableIdent` instances, each
390    /// representing a table within the specified namespace.
391    /// - `Err(...)` if an error occurs during namespace validation or while
392    /// querying the database.
393    async fn list_tables(&self, namespace: &NamespaceIdent) -> Result<Vec<TableIdent>> {
394        let name = validate_namespace(namespace)?;
395
396        let tables = self
397            .client
398            .0
399            .get_all_tables(name.into())
400            .await
401            .map(from_thrift_exception)
402            .map_err(from_thrift_error)??;
403
404        let tables = tables
405            .iter()
406            .map(|table| TableIdent::new(namespace.clone(), table.to_string()))
407            .collect();
408
409        Ok(tables)
410    }
411
412    /// Creates a new table within a specified namespace using the provided
413    /// table creation settings.
414    ///
415    /// # Returns
416    /// A `Result` wrapping a `Table` object representing the newly created
417    /// table.
418    ///
419    /// # Errors
420    /// This function may return an error in several cases, including invalid
421    /// namespace identifiers, failure to determine a default storage location,
422    /// issues generating or writing table metadata, and errors communicating
423    /// with the Hive Metastore.
424    async fn create_table(
425        &self,
426        namespace: &NamespaceIdent,
427        mut creation: TableCreation,
428    ) -> Result<Table> {
429        let db_name = validate_namespace(namespace)?;
430        let table_name = creation.name.clone();
431
432        let location = match &creation.location {
433            Some(location) => location.clone(),
434            None => {
435                let ns = self.get_namespace(namespace).await?;
436                let location = get_default_table_location(&ns, &table_name, &self.config.warehouse);
437                creation.location = Some(location.clone());
438                location
439            }
440        };
441        let metadata = TableMetadataBuilder::from_table_creation(creation)?
442            .build()?
443            .metadata;
444
445        let metadata_location =
446            MetadataLocation::new_with_table_location(location.clone()).to_string();
447
448        metadata.write_to(&self.file_io, &metadata_location).await?;
449
450        let hive_table = convert_to_hive_table(
451            db_name.clone(),
452            metadata.current_schema(),
453            table_name.clone(),
454            location,
455            metadata_location.clone(),
456            metadata.properties(),
457        )?;
458
459        self.client
460            .0
461            .create_table(hive_table)
462            .await
463            .map_err(from_thrift_error)?;
464
465        Table::builder()
466            .file_io(self.file_io())
467            .metadata_location(metadata_location)
468            .metadata(metadata)
469            .identifier(TableIdent::new(NamespaceIdent::new(db_name), table_name))
470            .build()
471    }
472
473    /// Loads a table from the Hive Metastore and constructs a `Table` object
474    /// based on its metadata.
475    ///
476    /// # Returns
477    /// A `Result` wrapping a `Table` object that represents the loaded table.
478    ///
479    /// # Errors
480    /// This function may return an error in several scenarios, including:
481    /// - Failure to validate the namespace.
482    /// - Failure to retrieve the table from the Hive Metastore.
483    /// - Absence of metadata location information in the table's properties.
484    /// - Issues reading or deserializing the table's metadata file.
485    async fn load_table(&self, table: &TableIdent) -> Result<Table> {
486        let db_name = validate_namespace(table.namespace())?;
487
488        let hive_table = self
489            .client
490            .0
491            .get_table(db_name.clone().into(), table.name.clone().into())
492            .await
493            .map(from_thrift_exception)
494            .map_err(from_thrift_error)??;
495
496        let metadata_location = get_metadata_location(&hive_table.parameters)?;
497
498        let metadata = TableMetadata::read_from(&self.file_io, &metadata_location).await?;
499
500        Table::builder()
501            .file_io(self.file_io())
502            .metadata_location(metadata_location)
503            .metadata(metadata)
504            .identifier(TableIdent::new(
505                NamespaceIdent::new(db_name),
506                table.name.clone(),
507            ))
508            .build()
509    }
510
511    /// Asynchronously drops a table from the database.
512    ///
513    /// # Errors
514    /// Returns an error if:
515    /// - The namespace provided in `table` cannot be validated
516    /// or does not exist.
517    /// - The underlying database client encounters an error while
518    /// attempting to drop the table. This includes scenarios where
519    /// the table does not exist.
520    /// - Any network or communication error occurs with the database backend.
521    async fn drop_table(&self, table: &TableIdent) -> Result<()> {
522        let db_name = validate_namespace(table.namespace())?;
523
524        self.client
525            .0
526            .drop_table(db_name.into(), table.name.clone().into(), false)
527            .await
528            .map_err(from_thrift_error)?;
529
530        Ok(())
531    }
532
533    /// Asynchronously checks the existence of a specified table
534    /// in the database.
535    ///
536    /// # Returns
537    /// - `Ok(true)` if the table exists in the database.
538    /// - `Ok(false)` if the table does not exist in the database.
539    /// - `Err(...)` if an error occurs during the process
540    async fn table_exists(&self, table: &TableIdent) -> Result<bool> {
541        let db_name = validate_namespace(table.namespace())?;
542        let table_name = table.name.clone();
543
544        let resp = self
545            .client
546            .0
547            .get_table(db_name.into(), table_name.into())
548            .await;
549
550        match resp {
551            Ok(MaybeException::Ok(_)) => Ok(true),
552            Ok(MaybeException::Exception(ThriftHiveMetastoreGetTableException::O2(_))) => Ok(false),
553            Ok(MaybeException::Exception(exception)) => Err(Error::new(
554                ErrorKind::Unexpected,
555                "Operation failed for hitting thrift error".to_string(),
556            )
557            .with_source(anyhow!("thrift error: {exception:?}"))),
558            Err(err) => Err(from_thrift_error(err)),
559        }
560    }
561
562    /// Asynchronously renames a table within the database
563    /// or moves it between namespaces (databases).
564    ///
565    /// # Returns
566    /// - `Ok(())` on successful rename or move of the table.
567    /// - `Err(...)` if an error occurs during the process.
568    async fn rename_table(&self, src: &TableIdent, dest: &TableIdent) -> Result<()> {
569        let src_dbname = validate_namespace(src.namespace())?;
570        let dest_dbname = validate_namespace(dest.namespace())?;
571
572        let src_tbl_name = src.name.clone();
573        let dest_tbl_name = dest.name.clone();
574
575        let mut tbl = self
576            .client
577            .0
578            .get_table(src_dbname.clone().into(), src_tbl_name.clone().into())
579            .await
580            .map(from_thrift_exception)
581            .map_err(from_thrift_error)??;
582
583        tbl.db_name = Some(dest_dbname.into());
584        tbl.table_name = Some(dest_tbl_name.into());
585
586        self.client
587            .0
588            .alter_table(src_dbname.into(), src_tbl_name.into(), tbl)
589            .await
590            .map_err(from_thrift_error)?;
591
592        Ok(())
593    }
594
595    async fn register_table(
596        &self,
597        _table_ident: &TableIdent,
598        _metadata_location: String,
599    ) -> Result<Table> {
600        Err(Error::new(
601            ErrorKind::FeatureUnsupported,
602            "Registering a table is not supported yet",
603        ))
604    }
605
606    async fn update_table(&self, _commit: TableCommit) -> Result<Table> {
607        Err(Error::new(
608            ErrorKind::FeatureUnsupported,
609            "Updating a table is not supported yet",
610        ))
611    }
612}