iceberg_catalog_rest/
catalog.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! This module contains the iceberg REST catalog implementation.
19
20use std::collections::HashMap;
21use std::future::Future;
22use std::str::FromStr;
23use std::sync::Arc;
24
25use async_trait::async_trait;
26use iceberg::io::{FileIO, FileIOBuilder, StorageFactory};
27use iceberg::table::Table;
28use iceberg::{
29    Catalog, CatalogBuilder, Error, ErrorKind, Namespace, NamespaceIdent, Result, TableCommit,
30    TableCreation, TableIdent,
31};
32use itertools::Itertools;
33use reqwest::header::{
34    HeaderMap, HeaderName, HeaderValue, {self},
35};
36use reqwest::{Client, Method, StatusCode, Url};
37use tokio::sync::OnceCell;
38use typed_builder::TypedBuilder;
39
40use crate::client::{
41    HttpClient, deserialize_catalog_response, deserialize_unexpected_catalog_error,
42};
43use crate::types::{
44    CatalogConfig, CommitTableRequest, CommitTableResponse, CreateNamespaceRequest,
45    CreateTableRequest, ListNamespaceResponse, ListTablesResponse, LoadTableResult,
46    NamespaceResponse, RegisterTableRequest, RenameTableRequest,
47};
48
49/// REST catalog URI
50pub const REST_CATALOG_PROP_URI: &str = "uri";
51/// REST catalog warehouse location
52pub const REST_CATALOG_PROP_WAREHOUSE: &str = "warehouse";
53/// Disable header redaction in error logs (defaults to false for security)
54pub const REST_CATALOG_PROP_DISABLE_HEADER_REDACTION: &str = "disable-header-redaction";
55
56const ICEBERG_REST_SPEC_VERSION: &str = "0.14.1";
57const CARGO_PKG_VERSION: &str = env!("CARGO_PKG_VERSION");
58const PATH_V1: &str = "v1";
59
60/// Builder for [`RestCatalog`].
61#[derive(Debug)]
62pub struct RestCatalogBuilder {
63    config: RestCatalogConfig,
64    storage_factory: Option<Arc<dyn StorageFactory>>,
65}
66
67impl Default for RestCatalogBuilder {
68    fn default() -> Self {
69        Self {
70            config: RestCatalogConfig {
71                name: None,
72                uri: "".to_string(),
73                warehouse: None,
74                props: HashMap::new(),
75                client: None,
76            },
77            storage_factory: None,
78        }
79    }
80}
81
82impl CatalogBuilder for RestCatalogBuilder {
83    type C = RestCatalog;
84
85    fn with_storage_factory(mut self, storage_factory: Arc<dyn StorageFactory>) -> Self {
86        self.storage_factory = Some(storage_factory);
87        self
88    }
89
90    fn load(
91        mut self,
92        name: impl Into<String>,
93        props: HashMap<String, String>,
94    ) -> impl Future<Output = Result<Self::C>> + Send {
95        self.config.name = Some(name.into());
96
97        if props.contains_key(REST_CATALOG_PROP_URI) {
98            self.config.uri = props
99                .get(REST_CATALOG_PROP_URI)
100                .cloned()
101                .unwrap_or_default();
102        }
103
104        if props.contains_key(REST_CATALOG_PROP_WAREHOUSE) {
105            self.config.warehouse = props.get(REST_CATALOG_PROP_WAREHOUSE).cloned()
106        }
107
108        // Collect other remaining properties
109        self.config.props = props
110            .into_iter()
111            .filter(|(k, _)| k != REST_CATALOG_PROP_URI && k != REST_CATALOG_PROP_WAREHOUSE)
112            .collect();
113
114        let result = {
115            if self.config.name.is_none() {
116                Err(Error::new(
117                    ErrorKind::DataInvalid,
118                    "Catalog name is required",
119                ))
120            } else if self.config.uri.is_empty() {
121                Err(Error::new(
122                    ErrorKind::DataInvalid,
123                    "Catalog uri is required",
124                ))
125            } else {
126                Ok(RestCatalog::new(self.config, self.storage_factory))
127            }
128        };
129
130        std::future::ready(result)
131    }
132}
133
134impl RestCatalogBuilder {
135    /// Configures the catalog with a custom HTTP client.
136    pub fn with_client(mut self, client: Client) -> Self {
137        self.config.client = Some(client);
138        self
139    }
140}
141
142/// Rest catalog configuration.
143#[derive(Clone, Debug, TypedBuilder)]
144pub(crate) struct RestCatalogConfig {
145    #[builder(default, setter(strip_option))]
146    name: Option<String>,
147
148    uri: String,
149
150    #[builder(default, setter(strip_option(fallback = warehouse_opt)))]
151    warehouse: Option<String>,
152
153    #[builder(default)]
154    props: HashMap<String, String>,
155
156    #[builder(default)]
157    client: Option<Client>,
158}
159
160impl RestCatalogConfig {
161    fn url_prefixed(&self, parts: &[&str]) -> String {
162        [&self.uri, PATH_V1]
163            .into_iter()
164            .chain(self.props.get("prefix").map(|s| &**s))
165            .chain(parts.iter().cloned())
166            .join("/")
167    }
168
169    fn config_endpoint(&self) -> String {
170        [&self.uri, PATH_V1, "config"].join("/")
171    }
172
173    pub(crate) fn get_token_endpoint(&self) -> String {
174        if let Some(oauth2_uri) = self.props.get("oauth2-server-uri") {
175            oauth2_uri.to_string()
176        } else {
177            [&self.uri, PATH_V1, "oauth", "tokens"].join("/")
178        }
179    }
180
181    fn namespaces_endpoint(&self) -> String {
182        self.url_prefixed(&["namespaces"])
183    }
184
185    fn namespace_endpoint(&self, ns: &NamespaceIdent) -> String {
186        self.url_prefixed(&["namespaces", &ns.to_url_string()])
187    }
188
189    fn tables_endpoint(&self, ns: &NamespaceIdent) -> String {
190        self.url_prefixed(&["namespaces", &ns.to_url_string(), "tables"])
191    }
192
193    fn rename_table_endpoint(&self) -> String {
194        self.url_prefixed(&["tables", "rename"])
195    }
196
197    fn register_table_endpoint(&self, ns: &NamespaceIdent) -> String {
198        self.url_prefixed(&["namespaces", &ns.to_url_string(), "register"])
199    }
200
201    fn table_endpoint(&self, table: &TableIdent) -> String {
202        self.url_prefixed(&[
203            "namespaces",
204            &table.namespace.to_url_string(),
205            "tables",
206            &table.name,
207        ])
208    }
209
210    /// Get the client from the config.
211    pub(crate) fn client(&self) -> Option<Client> {
212        self.client.clone()
213    }
214
215    /// Get the token from the config.
216    ///
217    /// The client can use this token to send requests.
218    pub(crate) fn token(&self) -> Option<String> {
219        self.props.get("token").cloned()
220    }
221
222    /// Get the credentials from the config. The client can use these credentials to fetch a new
223    /// token.
224    ///
225    /// ## Output
226    ///
227    /// - `None`: No credential is set.
228    /// - `Some(None, client_secret)`: No client_id is set, use client_secret directly.
229    /// - `Some(Some(client_id), client_secret)`: Both client_id and client_secret are set.
230    pub(crate) fn credential(&self) -> Option<(Option<String>, String)> {
231        let cred = self.props.get("credential")?;
232
233        match cred.split_once(':') {
234            Some((client_id, client_secret)) => {
235                Some((Some(client_id.to_string()), client_secret.to_string()))
236            }
237            None => Some((None, cred.to_string())),
238        }
239    }
240
241    /// Get the extra headers from config, which includes:
242    ///
243    /// - `content-type`
244    /// - `x-client-version`
245    /// - `user-agent`
246    /// - All headers specified by `header.xxx` in props.
247    pub(crate) fn extra_headers(&self) -> Result<HeaderMap> {
248        let mut headers = HeaderMap::from_iter([
249            (
250                header::CONTENT_TYPE,
251                HeaderValue::from_static("application/json"),
252            ),
253            (
254                HeaderName::from_static("x-client-version"),
255                HeaderValue::from_static(ICEBERG_REST_SPEC_VERSION),
256            ),
257            (
258                header::USER_AGENT,
259                HeaderValue::from_str(&format!("iceberg-rs/{CARGO_PKG_VERSION}")).unwrap(),
260            ),
261        ]);
262
263        for (key, value) in self
264            .props
265            .iter()
266            .filter_map(|(k, v)| k.strip_prefix("header.").map(|k| (k, v)))
267        {
268            headers.insert(
269                HeaderName::from_str(key).map_err(|e| {
270                    Error::new(
271                        ErrorKind::DataInvalid,
272                        format!("Invalid header name: {key}"),
273                    )
274                    .with_source(e)
275                })?,
276                HeaderValue::from_str(value).map_err(|e| {
277                    Error::new(
278                        ErrorKind::DataInvalid,
279                        format!("Invalid header value: {value}"),
280                    )
281                    .with_source(e)
282                })?,
283            );
284        }
285
286        Ok(headers)
287    }
288
289    /// Get the optional OAuth headers from the config.
290    pub(crate) fn extra_oauth_params(&self) -> HashMap<String, String> {
291        let mut params = HashMap::new();
292
293        if let Some(scope) = self.props.get("scope") {
294            params.insert("scope".to_string(), scope.to_string());
295        } else {
296            params.insert("scope".to_string(), "catalog".to_string());
297        }
298
299        let optional_params = ["audience", "resource"];
300        for param_name in optional_params {
301            if let Some(value) = self.props.get(param_name) {
302                params.insert(param_name.to_string(), value.to_string());
303            }
304        }
305
306        params
307    }
308
309    /// Check if header redaction is disabled in error logs.
310    ///
311    /// Returns true if the `disable-header-redaction` property is set to "true".
312    /// Defaults to false for security (headers are redacted by default).
313    pub(crate) fn disable_header_redaction(&self) -> bool {
314        self.props
315            .get(REST_CATALOG_PROP_DISABLE_HEADER_REDACTION)
316            .map(|v| v.eq_ignore_ascii_case("true"))
317            .unwrap_or(false)
318    }
319
320    /// Merge the `RestCatalogConfig` with the a [`CatalogConfig`] (fetched from the REST server).
321    pub(crate) fn merge_with_config(mut self, mut config: CatalogConfig) -> Self {
322        if let Some(uri) = config.overrides.remove("uri") {
323            self.uri = uri;
324        }
325
326        let mut props = config.defaults;
327        props.extend(self.props);
328        props.extend(config.overrides);
329
330        self.props = props;
331        self
332    }
333}
334
335#[derive(Debug)]
336struct RestContext {
337    client: HttpClient,
338    /// Runtime config is fetched from rest server and stored here.
339    ///
340    /// It's could be different from the user config.
341    config: RestCatalogConfig,
342}
343
344/// Rest catalog implementation.
345#[derive(Debug)]
346pub struct RestCatalog {
347    /// User config is stored as-is and never be changed.
348    ///
349    /// It could be different from the config fetched from the server and used at runtime.
350    user_config: RestCatalogConfig,
351    ctx: OnceCell<RestContext>,
352    /// Storage factory for creating FileIO instances.
353    storage_factory: Option<Arc<dyn StorageFactory>>,
354}
355
356impl RestCatalog {
357    /// Creates a `RestCatalog` from a [`RestCatalogConfig`].
358    fn new(config: RestCatalogConfig, storage_factory: Option<Arc<dyn StorageFactory>>) -> Self {
359        Self {
360            user_config: config,
361            ctx: OnceCell::new(),
362            storage_factory,
363        }
364    }
365
366    /// Sends a DELETE request for the given table, optionally requesting purge.
367    async fn delete_table(&self, table: &TableIdent, purge: bool) -> Result<()> {
368        let context = self.context().await?;
369
370        let mut request_builder = context
371            .client
372            .request(Method::DELETE, context.config.table_endpoint(table));
373
374        if purge {
375            request_builder = request_builder.query(&[("purgeRequested", "true")]);
376        }
377
378        let request = request_builder.build()?;
379        let http_response = context.client.query_catalog(request).await?;
380
381        match http_response.status() {
382            StatusCode::NO_CONTENT | StatusCode::OK => Ok(()),
383            StatusCode::NOT_FOUND => Err(Error::new(
384                ErrorKind::TableNotFound,
385                "Tried to drop a table that does not exist",
386            )),
387            _ => Err(deserialize_unexpected_catalog_error(
388                http_response,
389                context.client.disable_header_redaction(),
390            )
391            .await),
392        }
393    }
394
395    /// Gets the [`RestContext`] from the catalog.
396    async fn context(&self) -> Result<&RestContext> {
397        self.ctx
398            .get_or_try_init(|| async {
399                let client = HttpClient::new(&self.user_config)?;
400                let catalog_config = RestCatalog::load_config(&client, &self.user_config).await?;
401                let config = self.user_config.clone().merge_with_config(catalog_config);
402                let client = client.update_with(&config)?;
403
404                Ok(RestContext { config, client })
405            })
406            .await
407    }
408
409    /// Load the runtime config from the server by `user_config`.
410    ///
411    /// It's required for a REST catalog to update its config after creation.
412    async fn load_config(
413        client: &HttpClient,
414        user_config: &RestCatalogConfig,
415    ) -> Result<CatalogConfig> {
416        let mut request_builder = client.request(Method::GET, user_config.config_endpoint());
417
418        if let Some(warehouse_location) = &user_config.warehouse {
419            request_builder = request_builder.query(&[("warehouse", warehouse_location)]);
420        }
421
422        let request = request_builder.build()?;
423
424        let http_response = client.query_catalog(request).await?;
425
426        match http_response.status() {
427            StatusCode::OK => deserialize_catalog_response(http_response).await,
428            _ => Err(deserialize_unexpected_catalog_error(
429                http_response,
430                client.disable_header_redaction(),
431            )
432            .await),
433        }
434    }
435
436    async fn load_file_io(
437        &self,
438        metadata_location: Option<&str>,
439        extra_config: Option<HashMap<String, String>>,
440    ) -> Result<FileIO> {
441        let mut props = self.context().await?.config.props.clone();
442        if let Some(config) = extra_config {
443            props.extend(config);
444        }
445
446        // If the warehouse is a logical identifier instead of a URL we don't want
447        // to raise an exception
448        let warehouse_path = match self.context().await?.config.warehouse.as_deref() {
449            Some(url) if Url::parse(url).is_ok() => Some(url),
450            Some(_) => None,
451            None => None,
452        };
453
454        if metadata_location.or(warehouse_path).is_none() {
455            return Err(Error::new(
456                ErrorKind::Unexpected,
457                "Unable to load file io, neither warehouse nor metadata location is set!",
458            ));
459        }
460
461        // Require a StorageFactory to be provided
462        let factory = self
463            .storage_factory
464            .clone()
465            .ok_or_else(|| {
466                Error::new(
467                    ErrorKind::Unexpected,
468                    "StorageFactory must be provided for RestCatalog. Use `with_storage_factory` to configure it.",
469                )
470            })?;
471
472        let file_io = FileIOBuilder::new(factory).with_props(props).build();
473
474        Ok(file_io)
475    }
476
477    /// Invalidate the current token without generating a new one. On the next request, the client
478    /// will attempt to generate a new token.
479    pub async fn invalidate_token(&self) -> Result<()> {
480        self.context().await?.client.invalidate_token().await
481    }
482
483    /// Invalidate the current token and set a new one. Generates a new token before invalidating
484    /// the current token, meaning the old token will be used until this function acquires the lock
485    /// and overwrites the token.
486    ///
487    /// If credential is invalid, or the request fails, this method will return an error and leave
488    /// the current token unchanged.
489    pub async fn regenerate_token(&self) -> Result<()> {
490        self.context().await?.client.regenerate_token().await
491    }
492}
493
494/// All requests and expected responses are derived from the REST catalog API spec:
495/// https://github.com/apache/iceberg/blob/main/open-api/rest-catalog-open-api.yaml
496#[async_trait]
497impl Catalog for RestCatalog {
498    async fn list_namespaces(
499        &self,
500        parent: Option<&NamespaceIdent>,
501    ) -> Result<Vec<NamespaceIdent>> {
502        let context = self.context().await?;
503        let endpoint = context.config.namespaces_endpoint();
504        let mut namespaces = Vec::new();
505        let mut next_token = None;
506
507        loop {
508            let mut request = context.client.request(Method::GET, endpoint.clone());
509
510            // Filter on `parent={namespace}` if a parent namespace exists.
511            if let Some(ns) = parent {
512                request = request.query(&[("parent", ns.to_url_string())]);
513            }
514
515            if let Some(token) = next_token {
516                request = request.query(&[("pageToken", token)]);
517            }
518
519            let http_response = context.client.query_catalog(request.build()?).await?;
520
521            match http_response.status() {
522                StatusCode::OK => {
523                    let response =
524                        deserialize_catalog_response::<ListNamespaceResponse>(http_response)
525                            .await?;
526
527                    namespaces.extend(response.namespaces);
528
529                    match response.next_page_token {
530                        Some(token) => next_token = Some(token),
531                        None => break,
532                    }
533                }
534                StatusCode::NOT_FOUND => {
535                    return Err(Error::new(
536                        ErrorKind::NamespaceNotFound,
537                        "The parent parameter of the namespace provided does not exist",
538                    ));
539                }
540                _ => {
541                    return Err(deserialize_unexpected_catalog_error(
542                        http_response,
543                        context.client.disable_header_redaction(),
544                    )
545                    .await);
546                }
547            }
548        }
549
550        Ok(namespaces)
551    }
552
553    async fn create_namespace(
554        &self,
555        namespace: &NamespaceIdent,
556        properties: HashMap<String, String>,
557    ) -> Result<Namespace> {
558        let context = self.context().await?;
559
560        let request = context
561            .client
562            .request(Method::POST, context.config.namespaces_endpoint())
563            .json(&CreateNamespaceRequest {
564                namespace: namespace.clone(),
565                properties,
566            })
567            .build()?;
568
569        let http_response = context.client.query_catalog(request).await?;
570
571        match http_response.status() {
572            StatusCode::OK => {
573                let response =
574                    deserialize_catalog_response::<NamespaceResponse>(http_response).await?;
575                Ok(Namespace::from(response))
576            }
577            StatusCode::CONFLICT => Err(Error::new(
578                ErrorKind::NamespaceAlreadyExists,
579                "Tried to create a namespace that already exists",
580            )),
581            _ => Err(deserialize_unexpected_catalog_error(
582                http_response,
583                context.client.disable_header_redaction(),
584            )
585            .await),
586        }
587    }
588
589    async fn get_namespace(&self, namespace: &NamespaceIdent) -> Result<Namespace> {
590        let context = self.context().await?;
591
592        let request = context
593            .client
594            .request(Method::GET, context.config.namespace_endpoint(namespace))
595            .build()?;
596
597        let http_response = context.client.query_catalog(request).await?;
598
599        match http_response.status() {
600            StatusCode::OK => {
601                let response =
602                    deserialize_catalog_response::<NamespaceResponse>(http_response).await?;
603                Ok(Namespace::from(response))
604            }
605            StatusCode::NOT_FOUND => Err(Error::new(
606                ErrorKind::NamespaceNotFound,
607                "Tried to get a namespace that does not exist",
608            )),
609            _ => Err(deserialize_unexpected_catalog_error(
610                http_response,
611                context.client.disable_header_redaction(),
612            )
613            .await),
614        }
615    }
616
617    async fn namespace_exists(&self, ns: &NamespaceIdent) -> Result<bool> {
618        let context = self.context().await?;
619
620        let request = context
621            .client
622            .request(Method::HEAD, context.config.namespace_endpoint(ns))
623            .build()?;
624
625        let http_response = context.client.query_catalog(request).await?;
626
627        match http_response.status() {
628            StatusCode::NO_CONTENT | StatusCode::OK => Ok(true),
629            StatusCode::NOT_FOUND => Ok(false),
630            _ => Err(deserialize_unexpected_catalog_error(
631                http_response,
632                context.client.disable_header_redaction(),
633            )
634            .await),
635        }
636    }
637
638    async fn update_namespace(
639        &self,
640        _namespace: &NamespaceIdent,
641        _properties: HashMap<String, String>,
642    ) -> Result<()> {
643        Err(Error::new(
644            ErrorKind::FeatureUnsupported,
645            "Updating namespace not supported yet!",
646        ))
647    }
648
649    async fn drop_namespace(&self, namespace: &NamespaceIdent) -> Result<()> {
650        let context = self.context().await?;
651
652        let request = context
653            .client
654            .request(Method::DELETE, context.config.namespace_endpoint(namespace))
655            .build()?;
656
657        let http_response = context.client.query_catalog(request).await?;
658
659        match http_response.status() {
660            StatusCode::NO_CONTENT | StatusCode::OK => Ok(()),
661            StatusCode::NOT_FOUND => Err(Error::new(
662                ErrorKind::NamespaceNotFound,
663                "Tried to drop a namespace that does not exist",
664            )),
665            _ => Err(deserialize_unexpected_catalog_error(
666                http_response,
667                context.client.disable_header_redaction(),
668            )
669            .await),
670        }
671    }
672
673    async fn list_tables(&self, namespace: &NamespaceIdent) -> Result<Vec<TableIdent>> {
674        let context = self.context().await?;
675        let endpoint = context.config.tables_endpoint(namespace);
676        let mut identifiers = Vec::new();
677        let mut next_token = None;
678
679        loop {
680            let mut request = context.client.request(Method::GET, endpoint.clone());
681
682            if let Some(token) = next_token {
683                request = request.query(&[("pageToken", token)]);
684            }
685
686            let http_response = context.client.query_catalog(request.build()?).await?;
687
688            match http_response.status() {
689                StatusCode::OK => {
690                    let response =
691                        deserialize_catalog_response::<ListTablesResponse>(http_response).await?;
692
693                    identifiers.extend(response.identifiers);
694
695                    match response.next_page_token {
696                        Some(token) => next_token = Some(token),
697                        None => break,
698                    }
699                }
700                StatusCode::NOT_FOUND => {
701                    return Err(Error::new(
702                        ErrorKind::NamespaceNotFound,
703                        "Tried to list tables of a namespace that does not exist",
704                    ));
705                }
706                _ => {
707                    return Err(deserialize_unexpected_catalog_error(
708                        http_response,
709                        context.client.disable_header_redaction(),
710                    )
711                    .await);
712                }
713            }
714        }
715
716        Ok(identifiers)
717    }
718
719    /// Create a new table inside the namespace.
720    ///
721    /// In the resulting table, if there are any config properties that
722    /// are present in both the response from the REST server and the
723    /// config provided when creating this `RestCatalog` instance then
724    /// the value provided locally to the `RestCatalog` will take precedence.
725    async fn create_table(
726        &self,
727        namespace: &NamespaceIdent,
728        creation: TableCreation,
729    ) -> Result<Table> {
730        let context = self.context().await?;
731
732        let table_ident = TableIdent::new(namespace.clone(), creation.name.clone());
733
734        let request = context
735            .client
736            .request(Method::POST, context.config.tables_endpoint(namespace))
737            .json(&CreateTableRequest {
738                name: creation.name,
739                location: creation.location,
740                schema: creation.schema,
741                partition_spec: creation.partition_spec,
742                write_order: creation.sort_order,
743                stage_create: Some(false),
744                properties: creation.properties,
745            })
746            .build()?;
747
748        let http_response = context.client.query_catalog(request).await?;
749
750        let response = match http_response.status() {
751            StatusCode::OK => {
752                deserialize_catalog_response::<LoadTableResult>(http_response).await?
753            }
754            StatusCode::NOT_FOUND => {
755                return Err(Error::new(
756                    ErrorKind::NamespaceNotFound,
757                    "Tried to create a table under a namespace that does not exist",
758                ));
759            }
760            StatusCode::CONFLICT => {
761                return Err(Error::new(
762                    ErrorKind::TableAlreadyExists,
763                    "The table already exists",
764                ));
765            }
766            _ => {
767                return Err(deserialize_unexpected_catalog_error(
768                    http_response,
769                    context.client.disable_header_redaction(),
770                )
771                .await);
772            }
773        };
774
775        let metadata_location = response.metadata_location.as_ref().ok_or(Error::new(
776            ErrorKind::DataInvalid,
777            "Metadata location missing in `create_table` response!",
778        ))?;
779
780        let config = response
781            .config
782            .into_iter()
783            .chain(self.user_config.props.clone())
784            .collect();
785
786        let file_io = self
787            .load_file_io(Some(metadata_location), Some(config))
788            .await?;
789
790        let table_builder = Table::builder()
791            .identifier(table_ident.clone())
792            .file_io(file_io)
793            .metadata(response.metadata);
794
795        if let Some(metadata_location) = response.metadata_location {
796            table_builder.metadata_location(metadata_location).build()
797        } else {
798            table_builder.build()
799        }
800    }
801
802    /// Load table from the catalog.
803    ///
804    /// If there are any config properties that are present in both the response from the REST
805    /// server and the config provided when creating this `RestCatalog` instance, then the value
806    /// provided locally to the `RestCatalog` will take precedence.
807    async fn load_table(&self, table_ident: &TableIdent) -> Result<Table> {
808        let context = self.context().await?;
809
810        let request = context
811            .client
812            .request(Method::GET, context.config.table_endpoint(table_ident))
813            .build()?;
814
815        let http_response = context.client.query_catalog(request).await?;
816
817        let response = match http_response.status() {
818            StatusCode::OK | StatusCode::NOT_MODIFIED => {
819                deserialize_catalog_response::<LoadTableResult>(http_response).await?
820            }
821            StatusCode::NOT_FOUND => {
822                return Err(Error::new(
823                    ErrorKind::TableNotFound,
824                    "Tried to load a table that does not exist",
825                ));
826            }
827            _ => {
828                return Err(deserialize_unexpected_catalog_error(
829                    http_response,
830                    context.client.disable_header_redaction(),
831                )
832                .await);
833            }
834        };
835
836        let config = response
837            .config
838            .into_iter()
839            .chain(self.user_config.props.clone())
840            .collect();
841
842        let file_io = self
843            .load_file_io(response.metadata_location.as_deref(), Some(config))
844            .await?;
845
846        let table_builder = Table::builder()
847            .identifier(table_ident.clone())
848            .file_io(file_io)
849            .metadata(response.metadata);
850
851        if let Some(metadata_location) = response.metadata_location {
852            table_builder.metadata_location(metadata_location).build()
853        } else {
854            table_builder.build()
855        }
856    }
857
858    /// Drop a table from the catalog.
859    async fn drop_table(&self, table: &TableIdent) -> Result<()> {
860        self.delete_table(table, false).await
861    }
862
863    /// Drop a table from the catalog and purge its data by sending
864    /// `purgeRequested=true` to the REST server.
865    async fn purge_table(&self, table: &TableIdent) -> Result<()> {
866        self.delete_table(table, true).await
867    }
868
869    /// Check if a table exists in the catalog.
870    async fn table_exists(&self, table: &TableIdent) -> Result<bool> {
871        let context = self.context().await?;
872
873        let request = context
874            .client
875            .request(Method::HEAD, context.config.table_endpoint(table))
876            .build()?;
877
878        let http_response = context.client.query_catalog(request).await?;
879
880        match http_response.status() {
881            StatusCode::NO_CONTENT | StatusCode::OK => Ok(true),
882            StatusCode::NOT_FOUND => Ok(false),
883            _ => Err(deserialize_unexpected_catalog_error(
884                http_response,
885                context.client.disable_header_redaction(),
886            )
887            .await),
888        }
889    }
890
891    /// Rename a table in the catalog.
892    async fn rename_table(&self, src: &TableIdent, dest: &TableIdent) -> Result<()> {
893        let context = self.context().await?;
894
895        let request = context
896            .client
897            .request(Method::POST, context.config.rename_table_endpoint())
898            .json(&RenameTableRequest {
899                source: src.clone(),
900                destination: dest.clone(),
901            })
902            .build()?;
903
904        let http_response = context.client.query_catalog(request).await?;
905
906        match http_response.status() {
907            StatusCode::NO_CONTENT | StatusCode::OK => Ok(()),
908            StatusCode::NOT_FOUND => Err(Error::new(
909                ErrorKind::TableNotFound,
910                "Tried to rename a table that does not exist (is the namespace correct?)",
911            )),
912            StatusCode::CONFLICT => Err(Error::new(
913                ErrorKind::TableAlreadyExists,
914                "Tried to rename a table to a name that already exists",
915            )),
916            _ => Err(deserialize_unexpected_catalog_error(
917                http_response,
918                context.client.disable_header_redaction(),
919            )
920            .await),
921        }
922    }
923
924    async fn register_table(
925        &self,
926        table_ident: &TableIdent,
927        metadata_location: String,
928    ) -> Result<Table> {
929        let context = self.context().await?;
930
931        let request = context
932            .client
933            .request(
934                Method::POST,
935                context
936                    .config
937                    .register_table_endpoint(table_ident.namespace()),
938            )
939            .json(&RegisterTableRequest {
940                name: table_ident.name.clone(),
941                metadata_location: metadata_location.clone(),
942                overwrite: Some(false),
943            })
944            .build()?;
945
946        let http_response = context.client.query_catalog(request).await?;
947
948        let response: LoadTableResult = match http_response.status() {
949            StatusCode::OK => {
950                deserialize_catalog_response::<LoadTableResult>(http_response).await?
951            }
952            StatusCode::NOT_FOUND => {
953                return Err(Error::new(
954                    ErrorKind::NamespaceNotFound,
955                    "The namespace specified does not exist.",
956                ));
957            }
958            StatusCode::CONFLICT => {
959                return Err(Error::new(
960                    ErrorKind::TableAlreadyExists,
961                    "The given table already exists.",
962                ));
963            }
964            _ => {
965                return Err(deserialize_unexpected_catalog_error(
966                    http_response,
967                    context.client.disable_header_redaction(),
968                )
969                .await);
970            }
971        };
972
973        let metadata_location = response.metadata_location.as_ref().ok_or(Error::new(
974            ErrorKind::DataInvalid,
975            "Metadata location missing in `register_table` response!",
976        ))?;
977
978        let file_io = self.load_file_io(Some(metadata_location), None).await?;
979
980        Table::builder()
981            .identifier(table_ident.clone())
982            .file_io(file_io)
983            .metadata(response.metadata)
984            .metadata_location(metadata_location.clone())
985            .build()
986    }
987
988    async fn update_table(&self, mut commit: TableCommit) -> Result<Table> {
989        let context = self.context().await?;
990
991        let request = context
992            .client
993            .request(
994                Method::POST,
995                context.config.table_endpoint(commit.identifier()),
996            )
997            .json(&CommitTableRequest {
998                identifier: Some(commit.identifier().clone()),
999                requirements: commit.take_requirements(),
1000                updates: commit.take_updates(),
1001            })
1002            .build()?;
1003
1004        let http_response = context.client.query_catalog(request).await?;
1005
1006        let response: CommitTableResponse = match http_response.status() {
1007            StatusCode::OK => deserialize_catalog_response(http_response).await?,
1008            StatusCode::NOT_FOUND => {
1009                return Err(Error::new(
1010                    ErrorKind::TableNotFound,
1011                    "Tried to update a table that does not exist",
1012                ));
1013            }
1014            StatusCode::CONFLICT => {
1015                return Err(Error::new(
1016                    ErrorKind::CatalogCommitConflicts,
1017                    "CatalogCommitConflicts, one or more requirements failed. The client may retry.",
1018                )
1019                .with_retryable(true));
1020            }
1021            StatusCode::INTERNAL_SERVER_ERROR => {
1022                return Err(Error::new(
1023                    ErrorKind::Unexpected,
1024                    "An unknown server-side problem occurred; the commit state is unknown.",
1025                ));
1026            }
1027            StatusCode::BAD_GATEWAY => {
1028                return Err(Error::new(
1029                    ErrorKind::Unexpected,
1030                    "A gateway or proxy received an invalid response from the upstream server; the commit state is unknown.",
1031                ));
1032            }
1033            StatusCode::GATEWAY_TIMEOUT => {
1034                return Err(Error::new(
1035                    ErrorKind::Unexpected,
1036                    "A server-side gateway timeout occurred; the commit state is unknown.",
1037                ));
1038            }
1039            _ => {
1040                return Err(deserialize_unexpected_catalog_error(
1041                    http_response,
1042                    context.client.disable_header_redaction(),
1043                )
1044                .await);
1045            }
1046        };
1047
1048        let file_io = self
1049            .load_file_io(Some(&response.metadata_location), None)
1050            .await?;
1051
1052        Table::builder()
1053            .identifier(commit.identifier().clone())
1054            .file_io(file_io)
1055            .metadata(response.metadata)
1056            .metadata_location(response.metadata_location)
1057            .build()
1058    }
1059}
1060
1061#[cfg(test)]
1062mod tests {
1063    use std::fs::File;
1064    use std::io::BufReader;
1065    use std::sync::Arc;
1066
1067    use chrono::{TimeZone, Utc};
1068    use iceberg::io::LocalFsStorageFactory;
1069    use iceberg::spec::{
1070        FormatVersion, NestedField, NullOrder, Operation, PrimitiveType, Schema, Snapshot,
1071        SnapshotLog, SortDirection, SortField, SortOrder, Summary, Transform, Type,
1072        UnboundPartitionField, UnboundPartitionSpec,
1073    };
1074    use iceberg::transaction::{ApplyTransactionAction, Transaction};
1075    use mockito::{Mock, Server, ServerGuard};
1076    use serde_json::json;
1077    use uuid::uuid;
1078
1079    use super::*;
1080
1081    #[tokio::test]
1082    async fn test_update_config() {
1083        let mut server = Server::new_async().await;
1084
1085        let config_mock = server
1086            .mock("GET", "/v1/config")
1087            .with_status(200)
1088            .with_body(
1089                r#"{
1090                "overrides": {
1091                    "warehouse": "s3://iceberg-catalog"
1092                },
1093                "defaults": {}
1094            }"#,
1095            )
1096            .create_async()
1097            .await;
1098
1099        let catalog = RestCatalog::new(
1100            RestCatalogConfig::builder().uri(server.url()).build(),
1101            Some(Arc::new(LocalFsStorageFactory)),
1102        );
1103
1104        assert_eq!(
1105            catalog
1106                .context()
1107                .await
1108                .unwrap()
1109                .config
1110                .props
1111                .get("warehouse"),
1112            Some(&"s3://iceberg-catalog".to_string())
1113        );
1114
1115        config_mock.assert_async().await;
1116    }
1117
1118    async fn create_config_mock(server: &mut ServerGuard) -> Mock {
1119        server
1120            .mock("GET", "/v1/config")
1121            .with_status(200)
1122            .with_body(
1123                r#"{
1124                "overrides": {
1125                    "warehouse": "s3://iceberg-catalog"
1126                },
1127                "defaults": {}
1128            }"#,
1129            )
1130            .create_async()
1131            .await
1132    }
1133
1134    async fn create_oauth_mock(server: &mut ServerGuard) -> Mock {
1135        create_oauth_mock_with_path(server, "/v1/oauth/tokens", "ey000000000000", 200).await
1136    }
1137
1138    async fn create_oauth_mock_with_path(
1139        server: &mut ServerGuard,
1140        path: &str,
1141        token: &str,
1142        status: usize,
1143    ) -> Mock {
1144        let body = format!(
1145            r#"{{
1146                "access_token": "{token}",
1147                "token_type": "Bearer",
1148                "issued_token_type": "urn:ietf:params:oauth:token-type:access_token",
1149                "expires_in": 86400
1150            }}"#
1151        );
1152        server
1153            .mock("POST", path)
1154            .with_status(status)
1155            .with_body(body)
1156            .expect(1)
1157            .create_async()
1158            .await
1159    }
1160
1161    #[tokio::test]
1162    async fn test_oauth() {
1163        let mut server = Server::new_async().await;
1164        let oauth_mock = create_oauth_mock(&mut server).await;
1165        let config_mock = create_config_mock(&mut server).await;
1166
1167        let mut props = HashMap::new();
1168        props.insert("credential".to_string(), "client1:secret1".to_string());
1169
1170        let catalog = RestCatalog::new(
1171            RestCatalogConfig::builder()
1172                .uri(server.url())
1173                .props(props)
1174                .build(),
1175            Some(Arc::new(LocalFsStorageFactory)),
1176        );
1177
1178        let token = catalog.context().await.unwrap().client.token().await;
1179        oauth_mock.assert_async().await;
1180        config_mock.assert_async().await;
1181        assert_eq!(token, Some("ey000000000000".to_string()));
1182    }
1183
1184    #[tokio::test]
1185    async fn test_oauth_with_optional_param() {
1186        let mut props = HashMap::new();
1187        props.insert("credential".to_string(), "client1:secret1".to_string());
1188        props.insert("scope".to_string(), "custom_scope".to_string());
1189        props.insert("audience".to_string(), "custom_audience".to_string());
1190        props.insert("resource".to_string(), "custom_resource".to_string());
1191
1192        let mut server = Server::new_async().await;
1193        let oauth_mock = server
1194            .mock("POST", "/v1/oauth/tokens")
1195            .match_body(mockito::Matcher::Regex("scope=custom_scope".to_string()))
1196            .match_body(mockito::Matcher::Regex(
1197                "audience=custom_audience".to_string(),
1198            ))
1199            .match_body(mockito::Matcher::Regex(
1200                "resource=custom_resource".to_string(),
1201            ))
1202            .with_status(200)
1203            .with_body(
1204                r#"{
1205                "access_token": "ey000000000000",
1206                "token_type": "Bearer",
1207                "issued_token_type": "urn:ietf:params:oauth:token-type:access_token",
1208                "expires_in": 86400
1209                }"#,
1210            )
1211            .expect(1)
1212            .create_async()
1213            .await;
1214
1215        let config_mock = create_config_mock(&mut server).await;
1216
1217        let catalog = RestCatalog::new(
1218            RestCatalogConfig::builder()
1219                .uri(server.url())
1220                .props(props)
1221                .build(),
1222            Some(Arc::new(LocalFsStorageFactory)),
1223        );
1224
1225        let token = catalog.context().await.unwrap().client.token().await;
1226
1227        oauth_mock.assert_async().await;
1228        config_mock.assert_async().await;
1229        assert_eq!(token, Some("ey000000000000".to_string()));
1230    }
1231
1232    #[tokio::test]
1233    async fn test_invalidate_token() {
1234        let mut server = Server::new_async().await;
1235        let oauth_mock = create_oauth_mock(&mut server).await;
1236        let config_mock = create_config_mock(&mut server).await;
1237
1238        let mut props = HashMap::new();
1239        props.insert("credential".to_string(), "client1:secret1".to_string());
1240
1241        let catalog = RestCatalog::new(
1242            RestCatalogConfig::builder()
1243                .uri(server.url())
1244                .props(props)
1245                .build(),
1246            Some(Arc::new(LocalFsStorageFactory)),
1247        );
1248
1249        let token = catalog.context().await.unwrap().client.token().await;
1250        oauth_mock.assert_async().await;
1251        config_mock.assert_async().await;
1252        assert_eq!(token, Some("ey000000000000".to_string()));
1253
1254        let oauth_mock =
1255            create_oauth_mock_with_path(&mut server, "/v1/oauth/tokens", "ey000000000001", 200)
1256                .await;
1257        catalog.invalidate_token().await.unwrap();
1258        let token = catalog.context().await.unwrap().client.token().await;
1259        oauth_mock.assert_async().await;
1260        assert_eq!(token, Some("ey000000000001".to_string()));
1261    }
1262
1263    #[tokio::test]
1264    async fn test_invalidate_token_failing_request() {
1265        let mut server = Server::new_async().await;
1266        let oauth_mock = create_oauth_mock(&mut server).await;
1267        let config_mock = create_config_mock(&mut server).await;
1268
1269        let mut props = HashMap::new();
1270        props.insert("credential".to_string(), "client1:secret1".to_string());
1271
1272        let catalog = RestCatalog::new(
1273            RestCatalogConfig::builder()
1274                .uri(server.url())
1275                .props(props)
1276                .build(),
1277            Some(Arc::new(LocalFsStorageFactory)),
1278        );
1279
1280        let token = catalog.context().await.unwrap().client.token().await;
1281        oauth_mock.assert_async().await;
1282        config_mock.assert_async().await;
1283        assert_eq!(token, Some("ey000000000000".to_string()));
1284
1285        let oauth_mock =
1286            create_oauth_mock_with_path(&mut server, "/v1/oauth/tokens", "ey000000000001", 500)
1287                .await;
1288        catalog.invalidate_token().await.unwrap();
1289        let token = catalog.context().await.unwrap().client.token().await;
1290        oauth_mock.assert_async().await;
1291        assert_eq!(token, None);
1292    }
1293
1294    #[tokio::test]
1295    async fn test_regenerate_token() {
1296        let mut server = Server::new_async().await;
1297        let oauth_mock = create_oauth_mock(&mut server).await;
1298        let config_mock = create_config_mock(&mut server).await;
1299
1300        let mut props = HashMap::new();
1301        props.insert("credential".to_string(), "client1:secret1".to_string());
1302
1303        let catalog = RestCatalog::new(
1304            RestCatalogConfig::builder()
1305                .uri(server.url())
1306                .props(props)
1307                .build(),
1308            Some(Arc::new(LocalFsStorageFactory)),
1309        );
1310
1311        let token = catalog.context().await.unwrap().client.token().await;
1312        oauth_mock.assert_async().await;
1313        config_mock.assert_async().await;
1314        assert_eq!(token, Some("ey000000000000".to_string()));
1315
1316        let oauth_mock =
1317            create_oauth_mock_with_path(&mut server, "/v1/oauth/tokens", "ey000000000001", 200)
1318                .await;
1319        catalog.regenerate_token().await.unwrap();
1320        oauth_mock.assert_async().await;
1321        let token = catalog.context().await.unwrap().client.token().await;
1322        assert_eq!(token, Some("ey000000000001".to_string()));
1323    }
1324
1325    #[tokio::test]
1326    async fn test_regenerate_token_failing_request() {
1327        let mut server = Server::new_async().await;
1328        let oauth_mock = create_oauth_mock(&mut server).await;
1329        let config_mock = create_config_mock(&mut server).await;
1330
1331        let mut props = HashMap::new();
1332        props.insert("credential".to_string(), "client1:secret1".to_string());
1333
1334        let catalog = RestCatalog::new(
1335            RestCatalogConfig::builder()
1336                .uri(server.url())
1337                .props(props)
1338                .build(),
1339            Some(Arc::new(LocalFsStorageFactory)),
1340        );
1341
1342        let token = catalog.context().await.unwrap().client.token().await;
1343        oauth_mock.assert_async().await;
1344        config_mock.assert_async().await;
1345        assert_eq!(token, Some("ey000000000000".to_string()));
1346
1347        let oauth_mock =
1348            create_oauth_mock_with_path(&mut server, "/v1/oauth/tokens", "ey000000000001", 500)
1349                .await;
1350        let invalidate_result = catalog.regenerate_token().await;
1351        assert!(invalidate_result.is_err());
1352        oauth_mock.assert_async().await;
1353        let token = catalog.context().await.unwrap().client.token().await;
1354
1355        // original token is left intact
1356        assert_eq!(token, Some("ey000000000000".to_string()));
1357    }
1358
1359    #[tokio::test]
1360    async fn test_http_headers() {
1361        let server = Server::new_async().await;
1362        let mut props = HashMap::new();
1363        props.insert("credential".to_string(), "client1:secret1".to_string());
1364
1365        let config = RestCatalogConfig::builder()
1366            .uri(server.url())
1367            .props(props)
1368            .build();
1369        let headers: HeaderMap = config.extra_headers().unwrap();
1370
1371        let expected_headers = HeaderMap::from_iter([
1372            (
1373                header::CONTENT_TYPE,
1374                HeaderValue::from_static("application/json"),
1375            ),
1376            (
1377                HeaderName::from_static("x-client-version"),
1378                HeaderValue::from_static(ICEBERG_REST_SPEC_VERSION),
1379            ),
1380            (
1381                header::USER_AGENT,
1382                HeaderValue::from_str(&format!("iceberg-rs/{CARGO_PKG_VERSION}")).unwrap(),
1383            ),
1384        ]);
1385        assert_eq!(headers, expected_headers);
1386    }
1387
1388    #[tokio::test]
1389    async fn test_http_headers_with_custom_headers() {
1390        let server = Server::new_async().await;
1391        let mut props = HashMap::new();
1392        props.insert("credential".to_string(), "client1:secret1".to_string());
1393        props.insert(
1394            "header.content-type".to_string(),
1395            "application/yaml".to_string(),
1396        );
1397        props.insert(
1398            "header.customized-header".to_string(),
1399            "some/value".to_string(),
1400        );
1401
1402        let config = RestCatalogConfig::builder()
1403            .uri(server.url())
1404            .props(props)
1405            .build();
1406        let headers: HeaderMap = config.extra_headers().unwrap();
1407
1408        let expected_headers = HeaderMap::from_iter([
1409            (
1410                header::CONTENT_TYPE,
1411                HeaderValue::from_static("application/yaml"),
1412            ),
1413            (
1414                HeaderName::from_static("x-client-version"),
1415                HeaderValue::from_static(ICEBERG_REST_SPEC_VERSION),
1416            ),
1417            (
1418                header::USER_AGENT,
1419                HeaderValue::from_str(&format!("iceberg-rs/{CARGO_PKG_VERSION}")).unwrap(),
1420            ),
1421            (
1422                HeaderName::from_static("customized-header"),
1423                HeaderValue::from_static("some/value"),
1424            ),
1425        ]);
1426        assert_eq!(headers, expected_headers);
1427    }
1428
1429    #[tokio::test]
1430    async fn test_oauth_with_oauth2_server_uri() {
1431        let mut server = Server::new_async().await;
1432        let config_mock = create_config_mock(&mut server).await;
1433
1434        let mut auth_server = Server::new_async().await;
1435        let auth_server_path = "/some/path";
1436        let oauth_mock =
1437            create_oauth_mock_with_path(&mut auth_server, auth_server_path, "ey000000000000", 200)
1438                .await;
1439
1440        let mut props = HashMap::new();
1441        props.insert("credential".to_string(), "client1:secret1".to_string());
1442        props.insert(
1443            "oauth2-server-uri".to_string(),
1444            format!("{}{}", auth_server.url(), auth_server_path).to_string(),
1445        );
1446
1447        let catalog = RestCatalog::new(
1448            RestCatalogConfig::builder()
1449                .uri(server.url())
1450                .props(props)
1451                .build(),
1452            Some(Arc::new(LocalFsStorageFactory)),
1453        );
1454
1455        let token = catalog.context().await.unwrap().client.token().await;
1456
1457        oauth_mock.assert_async().await;
1458        config_mock.assert_async().await;
1459        assert_eq!(token, Some("ey000000000000".to_string()));
1460    }
1461
1462    #[tokio::test]
1463    async fn test_config_override() {
1464        let mut server = Server::new_async().await;
1465        let mut redirect_server = Server::new_async().await;
1466        let new_uri = redirect_server.url();
1467
1468        let config_mock = server
1469            .mock("GET", "/v1/config")
1470            .with_status(200)
1471            .with_body(
1472                json!(
1473                    {
1474                        "overrides": {
1475                            "uri": new_uri,
1476                            "warehouse": "s3://iceberg-catalog",
1477                            "prefix": "ice/warehouses/my"
1478                        },
1479                        "defaults": {},
1480                    }
1481                )
1482                .to_string(),
1483            )
1484            .create_async()
1485            .await;
1486
1487        let list_ns_mock = redirect_server
1488            .mock("GET", "/v1/ice/warehouses/my/namespaces")
1489            .with_body(
1490                r#"{
1491                    "namespaces": []
1492                }"#,
1493            )
1494            .create_async()
1495            .await;
1496
1497        let catalog = RestCatalog::new(
1498            RestCatalogConfig::builder().uri(server.url()).build(),
1499            Some(Arc::new(LocalFsStorageFactory)),
1500        );
1501
1502        let _namespaces = catalog.list_namespaces(None).await.unwrap();
1503
1504        config_mock.assert_async().await;
1505        list_ns_mock.assert_async().await;
1506    }
1507
1508    #[tokio::test]
1509    async fn test_list_namespace() {
1510        let mut server = Server::new_async().await;
1511
1512        let config_mock = create_config_mock(&mut server).await;
1513
1514        let list_ns_mock = server
1515            .mock("GET", "/v1/namespaces")
1516            .with_body(
1517                r#"{
1518                "namespaces": [
1519                    ["ns1", "ns11"],
1520                    ["ns2"]
1521                ]
1522            }"#,
1523            )
1524            .create_async()
1525            .await;
1526
1527        let catalog = RestCatalog::new(
1528            RestCatalogConfig::builder().uri(server.url()).build(),
1529            Some(Arc::new(LocalFsStorageFactory)),
1530        );
1531
1532        let namespaces = catalog.list_namespaces(None).await.unwrap();
1533
1534        let expected_ns = vec![
1535            NamespaceIdent::from_vec(vec!["ns1".to_string(), "ns11".to_string()]).unwrap(),
1536            NamespaceIdent::from_vec(vec!["ns2".to_string()]).unwrap(),
1537        ];
1538
1539        assert_eq!(expected_ns, namespaces);
1540
1541        config_mock.assert_async().await;
1542        list_ns_mock.assert_async().await;
1543    }
1544
1545    #[tokio::test]
1546    async fn test_list_namespace_with_pagination() {
1547        let mut server = Server::new_async().await;
1548
1549        let config_mock = create_config_mock(&mut server).await;
1550
1551        let list_ns_mock_page1 = server
1552            .mock("GET", "/v1/namespaces")
1553            .with_body(
1554                r#"{
1555                "namespaces": [
1556                    ["ns1", "ns11"],
1557                    ["ns2"]
1558                ],
1559                "next-page-token": "token123"
1560            }"#,
1561            )
1562            .create_async()
1563            .await;
1564
1565        let list_ns_mock_page2 = server
1566            .mock("GET", "/v1/namespaces?pageToken=token123")
1567            .with_body(
1568                r#"{
1569                "namespaces": [
1570                    ["ns3"],
1571                    ["ns4", "ns41"]
1572                ]
1573            }"#,
1574            )
1575            .create_async()
1576            .await;
1577
1578        let catalog = RestCatalog::new(
1579            RestCatalogConfig::builder().uri(server.url()).build(),
1580            Some(Arc::new(LocalFsStorageFactory)),
1581        );
1582
1583        let namespaces = catalog.list_namespaces(None).await.unwrap();
1584
1585        let expected_ns = vec![
1586            NamespaceIdent::from_vec(vec!["ns1".to_string(), "ns11".to_string()]).unwrap(),
1587            NamespaceIdent::from_vec(vec!["ns2".to_string()]).unwrap(),
1588            NamespaceIdent::from_vec(vec!["ns3".to_string()]).unwrap(),
1589            NamespaceIdent::from_vec(vec!["ns4".to_string(), "ns41".to_string()]).unwrap(),
1590        ];
1591
1592        assert_eq!(expected_ns, namespaces);
1593
1594        config_mock.assert_async().await;
1595        list_ns_mock_page1.assert_async().await;
1596        list_ns_mock_page2.assert_async().await;
1597    }
1598
1599    #[tokio::test]
1600    async fn test_list_namespace_with_multiple_pages() {
1601        let mut server = Server::new_async().await;
1602
1603        let config_mock = create_config_mock(&mut server).await;
1604
1605        // Page 1
1606        let list_ns_mock_page1 = server
1607            .mock("GET", "/v1/namespaces")
1608            .with_body(
1609                r#"{
1610                "namespaces": [
1611                    ["ns1", "ns11"],
1612                    ["ns2"]
1613                ],
1614                "next-page-token": "page2"
1615            }"#,
1616            )
1617            .create_async()
1618            .await;
1619
1620        // Page 2
1621        let list_ns_mock_page2 = server
1622            .mock("GET", "/v1/namespaces?pageToken=page2")
1623            .with_body(
1624                r#"{
1625                "namespaces": [
1626                    ["ns3"],
1627                    ["ns4", "ns41"]
1628                ],
1629                "next-page-token": "page3"
1630            }"#,
1631            )
1632            .create_async()
1633            .await;
1634
1635        // Page 3
1636        let list_ns_mock_page3 = server
1637            .mock("GET", "/v1/namespaces?pageToken=page3")
1638            .with_body(
1639                r#"{
1640                "namespaces": [
1641                    ["ns5", "ns51", "ns511"]
1642                ],
1643                "next-page-token": "page4"
1644            }"#,
1645            )
1646            .create_async()
1647            .await;
1648
1649        // Page 4
1650        let list_ns_mock_page4 = server
1651            .mock("GET", "/v1/namespaces?pageToken=page4")
1652            .with_body(
1653                r#"{
1654                "namespaces": [
1655                    ["ns6"],
1656                    ["ns7"]
1657                ],
1658                "next-page-token": "page5"
1659            }"#,
1660            )
1661            .create_async()
1662            .await;
1663
1664        // Page 5 (final page)
1665        let list_ns_mock_page5 = server
1666            .mock("GET", "/v1/namespaces?pageToken=page5")
1667            .with_body(
1668                r#"{
1669                "namespaces": [
1670                    ["ns8", "ns81"]
1671                ]
1672            }"#,
1673            )
1674            .create_async()
1675            .await;
1676
1677        let catalog = RestCatalog::new(
1678            RestCatalogConfig::builder().uri(server.url()).build(),
1679            Some(Arc::new(LocalFsStorageFactory)),
1680        );
1681
1682        let namespaces = catalog.list_namespaces(None).await.unwrap();
1683
1684        let expected_ns = vec![
1685            NamespaceIdent::from_vec(vec!["ns1".to_string(), "ns11".to_string()]).unwrap(),
1686            NamespaceIdent::from_vec(vec!["ns2".to_string()]).unwrap(),
1687            NamespaceIdent::from_vec(vec!["ns3".to_string()]).unwrap(),
1688            NamespaceIdent::from_vec(vec!["ns4".to_string(), "ns41".to_string()]).unwrap(),
1689            NamespaceIdent::from_vec(vec![
1690                "ns5".to_string(),
1691                "ns51".to_string(),
1692                "ns511".to_string(),
1693            ])
1694            .unwrap(),
1695            NamespaceIdent::from_vec(vec!["ns6".to_string()]).unwrap(),
1696            NamespaceIdent::from_vec(vec!["ns7".to_string()]).unwrap(),
1697            NamespaceIdent::from_vec(vec!["ns8".to_string(), "ns81".to_string()]).unwrap(),
1698        ];
1699
1700        assert_eq!(expected_ns, namespaces);
1701
1702        // Verify all page requests were made
1703        config_mock.assert_async().await;
1704        list_ns_mock_page1.assert_async().await;
1705        list_ns_mock_page2.assert_async().await;
1706        list_ns_mock_page3.assert_async().await;
1707        list_ns_mock_page4.assert_async().await;
1708        list_ns_mock_page5.assert_async().await;
1709    }
1710
1711    #[tokio::test]
1712    async fn test_create_namespace() {
1713        let mut server = Server::new_async().await;
1714
1715        let config_mock = create_config_mock(&mut server).await;
1716
1717        let create_ns_mock = server
1718            .mock("POST", "/v1/namespaces")
1719            .with_body(
1720                r#"{
1721                "namespace": [ "ns1", "ns11"],
1722                "properties" : {
1723                    "key1": "value1"
1724                }
1725            }"#,
1726            )
1727            .create_async()
1728            .await;
1729
1730        let catalog = RestCatalog::new(
1731            RestCatalogConfig::builder().uri(server.url()).build(),
1732            Some(Arc::new(LocalFsStorageFactory)),
1733        );
1734
1735        let namespaces = catalog
1736            .create_namespace(
1737                &NamespaceIdent::from_vec(vec!["ns1".to_string(), "ns11".to_string()]).unwrap(),
1738                HashMap::from([("key1".to_string(), "value1".to_string())]),
1739            )
1740            .await
1741            .unwrap();
1742
1743        let expected_ns = Namespace::with_properties(
1744            NamespaceIdent::from_vec(vec!["ns1".to_string(), "ns11".to_string()]).unwrap(),
1745            HashMap::from([("key1".to_string(), "value1".to_string())]),
1746        );
1747
1748        assert_eq!(expected_ns, namespaces);
1749
1750        config_mock.assert_async().await;
1751        create_ns_mock.assert_async().await;
1752    }
1753
1754    #[tokio::test]
1755    async fn test_get_namespace() {
1756        let mut server = Server::new_async().await;
1757
1758        let config_mock = create_config_mock(&mut server).await;
1759
1760        let get_ns_mock = server
1761            .mock("GET", "/v1/namespaces/ns1")
1762            .with_body(
1763                r#"{
1764                "namespace": [ "ns1"],
1765                "properties" : {
1766                    "key1": "value1"
1767                }
1768            }"#,
1769            )
1770            .create_async()
1771            .await;
1772
1773        let catalog = RestCatalog::new(
1774            RestCatalogConfig::builder().uri(server.url()).build(),
1775            Some(Arc::new(LocalFsStorageFactory)),
1776        );
1777
1778        let namespaces = catalog
1779            .get_namespace(&NamespaceIdent::new("ns1".to_string()))
1780            .await
1781            .unwrap();
1782
1783        let expected_ns = Namespace::with_properties(
1784            NamespaceIdent::new("ns1".to_string()),
1785            HashMap::from([("key1".to_string(), "value1".to_string())]),
1786        );
1787
1788        assert_eq!(expected_ns, namespaces);
1789
1790        config_mock.assert_async().await;
1791        get_ns_mock.assert_async().await;
1792    }
1793
1794    #[tokio::test]
1795    async fn check_namespace_exists() {
1796        let mut server = Server::new_async().await;
1797
1798        let config_mock = create_config_mock(&mut server).await;
1799
1800        let get_ns_mock = server
1801            .mock("HEAD", "/v1/namespaces/ns1")
1802            .with_status(204)
1803            .create_async()
1804            .await;
1805
1806        let catalog = RestCatalog::new(
1807            RestCatalogConfig::builder().uri(server.url()).build(),
1808            Some(Arc::new(LocalFsStorageFactory)),
1809        );
1810
1811        assert!(
1812            catalog
1813                .namespace_exists(&NamespaceIdent::new("ns1".to_string()))
1814                .await
1815                .unwrap()
1816        );
1817
1818        config_mock.assert_async().await;
1819        get_ns_mock.assert_async().await;
1820    }
1821
1822    #[tokio::test]
1823    async fn test_drop_namespace() {
1824        let mut server = Server::new_async().await;
1825
1826        let config_mock = create_config_mock(&mut server).await;
1827
1828        let drop_ns_mock = server
1829            .mock("DELETE", "/v1/namespaces/ns1")
1830            .with_status(204)
1831            .create_async()
1832            .await;
1833
1834        let catalog = RestCatalog::new(
1835            RestCatalogConfig::builder().uri(server.url()).build(),
1836            Some(Arc::new(LocalFsStorageFactory)),
1837        );
1838
1839        catalog
1840            .drop_namespace(&NamespaceIdent::new("ns1".to_string()))
1841            .await
1842            .unwrap();
1843
1844        config_mock.assert_async().await;
1845        drop_ns_mock.assert_async().await;
1846    }
1847
1848    #[tokio::test]
1849    async fn test_list_tables() {
1850        let mut server = Server::new_async().await;
1851
1852        let config_mock = create_config_mock(&mut server).await;
1853
1854        let list_tables_mock = server
1855            .mock("GET", "/v1/namespaces/ns1/tables")
1856            .with_status(200)
1857            .with_body(
1858                r#"{
1859                "identifiers": [
1860                    {
1861                        "namespace": ["ns1"],
1862                        "name": "table1"
1863                    },
1864                    {
1865                        "namespace": ["ns1"],
1866                        "name": "table2"
1867                    }
1868                ]
1869            }"#,
1870            )
1871            .create_async()
1872            .await;
1873
1874        let catalog = RestCatalog::new(
1875            RestCatalogConfig::builder().uri(server.url()).build(),
1876            Some(Arc::new(LocalFsStorageFactory)),
1877        );
1878
1879        let tables = catalog
1880            .list_tables(&NamespaceIdent::new("ns1".to_string()))
1881            .await
1882            .unwrap();
1883
1884        let expected_tables = vec![
1885            TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table1".to_string()),
1886            TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table2".to_string()),
1887        ];
1888
1889        assert_eq!(tables, expected_tables);
1890
1891        config_mock.assert_async().await;
1892        list_tables_mock.assert_async().await;
1893    }
1894
1895    #[tokio::test]
1896    async fn test_list_tables_with_pagination() {
1897        let mut server = Server::new_async().await;
1898
1899        let config_mock = create_config_mock(&mut server).await;
1900
1901        let list_tables_mock_page1 = server
1902            .mock("GET", "/v1/namespaces/ns1/tables")
1903            .with_status(200)
1904            .with_body(
1905                r#"{
1906                "identifiers": [
1907                    {
1908                        "namespace": ["ns1"],
1909                        "name": "table1"
1910                    },
1911                    {
1912                        "namespace": ["ns1"],
1913                        "name": "table2"
1914                    }
1915                ],
1916                "next-page-token": "token456"
1917            }"#,
1918            )
1919            .create_async()
1920            .await;
1921
1922        let list_tables_mock_page2 = server
1923            .mock("GET", "/v1/namespaces/ns1/tables?pageToken=token456")
1924            .with_status(200)
1925            .with_body(
1926                r#"{
1927                "identifiers": [
1928                    {
1929                        "namespace": ["ns1"],
1930                        "name": "table3"
1931                    },
1932                    {
1933                        "namespace": ["ns1"],
1934                        "name": "table4"
1935                    }
1936                ]
1937            }"#,
1938            )
1939            .create_async()
1940            .await;
1941
1942        let catalog = RestCatalog::new(
1943            RestCatalogConfig::builder().uri(server.url()).build(),
1944            Some(Arc::new(LocalFsStorageFactory)),
1945        );
1946
1947        let tables = catalog
1948            .list_tables(&NamespaceIdent::new("ns1".to_string()))
1949            .await
1950            .unwrap();
1951
1952        let expected_tables = vec![
1953            TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table1".to_string()),
1954            TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table2".to_string()),
1955            TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table3".to_string()),
1956            TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table4".to_string()),
1957        ];
1958
1959        assert_eq!(tables, expected_tables);
1960
1961        config_mock.assert_async().await;
1962        list_tables_mock_page1.assert_async().await;
1963        list_tables_mock_page2.assert_async().await;
1964    }
1965
1966    #[tokio::test]
1967    async fn test_list_tables_with_multiple_pages() {
1968        let mut server = Server::new_async().await;
1969
1970        let config_mock = create_config_mock(&mut server).await;
1971
1972        // Page 1
1973        let list_tables_mock_page1 = server
1974            .mock("GET", "/v1/namespaces/ns1/tables")
1975            .with_status(200)
1976            .with_body(
1977                r#"{
1978                "identifiers": [
1979                    {
1980                        "namespace": ["ns1"],
1981                        "name": "table1"
1982                    },
1983                    {
1984                        "namespace": ["ns1"],
1985                        "name": "table2"
1986                    }
1987                ],
1988                "next-page-token": "page2"
1989            }"#,
1990            )
1991            .create_async()
1992            .await;
1993
1994        // Page 2
1995        let list_tables_mock_page2 = server
1996            .mock("GET", "/v1/namespaces/ns1/tables?pageToken=page2")
1997            .with_status(200)
1998            .with_body(
1999                r#"{
2000                "identifiers": [
2001                    {
2002                        "namespace": ["ns1"],
2003                        "name": "table3"
2004                    },
2005                    {
2006                        "namespace": ["ns1"],
2007                        "name": "table4"
2008                    }
2009                ],
2010                "next-page-token": "page3"
2011            }"#,
2012            )
2013            .create_async()
2014            .await;
2015
2016        // Page 3
2017        let list_tables_mock_page3 = server
2018            .mock("GET", "/v1/namespaces/ns1/tables?pageToken=page3")
2019            .with_status(200)
2020            .with_body(
2021                r#"{
2022                "identifiers": [
2023                    {
2024                        "namespace": ["ns1"],
2025                        "name": "table5"
2026                    }
2027                ],
2028                "next-page-token": "page4"
2029            }"#,
2030            )
2031            .create_async()
2032            .await;
2033
2034        // Page 4
2035        let list_tables_mock_page4 = server
2036            .mock("GET", "/v1/namespaces/ns1/tables?pageToken=page4")
2037            .with_status(200)
2038            .with_body(
2039                r#"{
2040                "identifiers": [
2041                    {
2042                        "namespace": ["ns1"],
2043                        "name": "table6"
2044                    },
2045                    {
2046                        "namespace": ["ns1"],
2047                        "name": "table7"
2048                    }
2049                ],
2050                "next-page-token": "page5"
2051            }"#,
2052            )
2053            .create_async()
2054            .await;
2055
2056        // Page 5 (final page)
2057        let list_tables_mock_page5 = server
2058            .mock("GET", "/v1/namespaces/ns1/tables?pageToken=page5")
2059            .with_status(200)
2060            .with_body(
2061                r#"{
2062                "identifiers": [
2063                    {
2064                        "namespace": ["ns1"],
2065                        "name": "table8"
2066                    }
2067                ]
2068            }"#,
2069            )
2070            .create_async()
2071            .await;
2072
2073        let catalog = RestCatalog::new(
2074            RestCatalogConfig::builder().uri(server.url()).build(),
2075            Some(Arc::new(LocalFsStorageFactory)),
2076        );
2077
2078        let tables = catalog
2079            .list_tables(&NamespaceIdent::new("ns1".to_string()))
2080            .await
2081            .unwrap();
2082
2083        let expected_tables = vec![
2084            TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table1".to_string()),
2085            TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table2".to_string()),
2086            TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table3".to_string()),
2087            TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table4".to_string()),
2088            TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table5".to_string()),
2089            TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table6".to_string()),
2090            TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table7".to_string()),
2091            TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table8".to_string()),
2092        ];
2093
2094        assert_eq!(tables, expected_tables);
2095
2096        // Verify all page requests were made
2097        config_mock.assert_async().await;
2098        list_tables_mock_page1.assert_async().await;
2099        list_tables_mock_page2.assert_async().await;
2100        list_tables_mock_page3.assert_async().await;
2101        list_tables_mock_page4.assert_async().await;
2102        list_tables_mock_page5.assert_async().await;
2103    }
2104
2105    #[tokio::test]
2106    async fn test_drop_tables() {
2107        let mut server = Server::new_async().await;
2108
2109        let config_mock = create_config_mock(&mut server).await;
2110
2111        let delete_table_mock = server
2112            .mock("DELETE", "/v1/namespaces/ns1/tables/table1")
2113            .with_status(204)
2114            .create_async()
2115            .await;
2116
2117        let catalog = RestCatalog::new(
2118            RestCatalogConfig::builder().uri(server.url()).build(),
2119            Some(Arc::new(LocalFsStorageFactory)),
2120        );
2121
2122        catalog
2123            .drop_table(&TableIdent::new(
2124                NamespaceIdent::new("ns1".to_string()),
2125                "table1".to_string(),
2126            ))
2127            .await
2128            .unwrap();
2129
2130        config_mock.assert_async().await;
2131        delete_table_mock.assert_async().await;
2132    }
2133
2134    #[tokio::test]
2135    async fn test_check_table_exists() {
2136        let mut server = Server::new_async().await;
2137
2138        let config_mock = create_config_mock(&mut server).await;
2139
2140        let check_table_exists_mock = server
2141            .mock("HEAD", "/v1/namespaces/ns1/tables/table1")
2142            .with_status(204)
2143            .create_async()
2144            .await;
2145
2146        let catalog = RestCatalog::new(
2147            RestCatalogConfig::builder().uri(server.url()).build(),
2148            Some(Arc::new(LocalFsStorageFactory)),
2149        );
2150
2151        assert!(
2152            catalog
2153                .table_exists(&TableIdent::new(
2154                    NamespaceIdent::new("ns1".to_string()),
2155                    "table1".to_string(),
2156                ))
2157                .await
2158                .unwrap()
2159        );
2160
2161        config_mock.assert_async().await;
2162        check_table_exists_mock.assert_async().await;
2163    }
2164
2165    #[tokio::test]
2166    async fn test_rename_table() {
2167        let mut server = Server::new_async().await;
2168
2169        let config_mock = create_config_mock(&mut server).await;
2170
2171        let rename_table_mock = server
2172            .mock("POST", "/v1/tables/rename")
2173            .with_status(204)
2174            .create_async()
2175            .await;
2176
2177        let catalog = RestCatalog::new(
2178            RestCatalogConfig::builder().uri(server.url()).build(),
2179            Some(Arc::new(LocalFsStorageFactory)),
2180        );
2181
2182        catalog
2183            .rename_table(
2184                &TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table1".to_string()),
2185                &TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table2".to_string()),
2186            )
2187            .await
2188            .unwrap();
2189
2190        config_mock.assert_async().await;
2191        rename_table_mock.assert_async().await;
2192    }
2193
2194    #[tokio::test]
2195    async fn test_load_table() {
2196        let mut server = Server::new_async().await;
2197
2198        let config_mock = create_config_mock(&mut server).await;
2199
2200        let rename_table_mock = server
2201            .mock("GET", "/v1/namespaces/ns1/tables/test1")
2202            .with_status(200)
2203            .with_body_from_file(format!(
2204                "{}/testdata/{}",
2205                env!("CARGO_MANIFEST_DIR"),
2206                "load_table_response.json"
2207            ))
2208            .create_async()
2209            .await;
2210
2211        let catalog = RestCatalog::new(
2212            RestCatalogConfig::builder().uri(server.url()).build(),
2213            Some(Arc::new(LocalFsStorageFactory)),
2214        );
2215
2216        let table = catalog
2217            .load_table(&TableIdent::new(
2218                NamespaceIdent::new("ns1".to_string()),
2219                "test1".to_string(),
2220            ))
2221            .await
2222            .unwrap();
2223
2224        assert_eq!(
2225            &TableIdent::from_strs(vec!["ns1", "test1"]).unwrap(),
2226            table.identifier()
2227        );
2228        assert_eq!(
2229            "s3://warehouse/database/table/metadata/00001-5f2f8166-244c-4eae-ac36-384ecdec81fc.gz.metadata.json",
2230            table.metadata_location().unwrap()
2231        );
2232        assert_eq!(FormatVersion::V1, table.metadata().format_version());
2233        assert_eq!("s3://warehouse/database/table", table.metadata().location());
2234        assert_eq!(
2235            uuid!("b55d9dda-6561-423a-8bfc-787980ce421f"),
2236            table.metadata().uuid()
2237        );
2238        assert_eq!(
2239            Utc.timestamp_millis_opt(1646787054459).unwrap(),
2240            table.metadata().last_updated_timestamp().unwrap()
2241        );
2242        assert_eq!(
2243            vec![&Arc::new(
2244                Schema::builder()
2245                    .with_fields(vec![
2246                        NestedField::optional(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
2247                        NestedField::optional(2, "data", Type::Primitive(PrimitiveType::String))
2248                            .into(),
2249                    ])
2250                    .build()
2251                    .unwrap()
2252            )],
2253            table.metadata().schemas_iter().collect::<Vec<_>>()
2254        );
2255        assert_eq!(
2256            &HashMap::from([
2257                ("owner".to_string(), "bryan".to_string()),
2258                (
2259                    "write.metadata.compression-codec".to_string(),
2260                    "gzip".to_string()
2261                )
2262            ]),
2263            table.metadata().properties()
2264        );
2265        assert_eq!(vec![&Arc::new(Snapshot::builder()
2266            .with_snapshot_id(3497810964824022504)
2267            .with_timestamp_ms(1646787054459)
2268            .with_manifest_list("s3://warehouse/database/table/metadata/snap-3497810964824022504-1-c4f68204-666b-4e50-a9df-b10c34bf6b82.avro")
2269            .with_sequence_number(0)
2270            .with_schema_id(0)
2271            .with_summary(Summary {
2272                operation: Operation::Append,
2273                additional_properties: HashMap::from_iter([
2274                    ("spark.app.id", "local-1646787004168"),
2275                    ("added-data-files", "1"),
2276                    ("added-records", "1"),
2277                    ("added-files-size", "697"),
2278                    ("changed-partition-count", "1"),
2279                    ("total-records", "1"),
2280                    ("total-files-size", "697"),
2281                    ("total-data-files", "1"),
2282                    ("total-delete-files", "0"),
2283                    ("total-position-deletes", "0"),
2284                    ("total-equality-deletes", "0")
2285                ].iter().map(|p| (p.0.to_string(), p.1.to_string()))),
2286            }).build()
2287        )], table.metadata().snapshots().collect::<Vec<_>>());
2288        assert_eq!(
2289            &[SnapshotLog {
2290                timestamp_ms: 1646787054459,
2291                snapshot_id: 3497810964824022504,
2292            }],
2293            table.metadata().history()
2294        );
2295        assert_eq!(
2296            vec![&Arc::new(SortOrder {
2297                order_id: 0,
2298                fields: vec![],
2299            })],
2300            table.metadata().sort_orders_iter().collect::<Vec<_>>()
2301        );
2302
2303        config_mock.assert_async().await;
2304        rename_table_mock.assert_async().await;
2305    }
2306
2307    #[tokio::test]
2308    async fn test_load_table_404() {
2309        let mut server = Server::new_async().await;
2310
2311        let config_mock = create_config_mock(&mut server).await;
2312
2313        let rename_table_mock = server
2314            .mock("GET", "/v1/namespaces/ns1/tables/test1")
2315            .with_status(404)
2316            .with_body(r#"
2317{
2318    "error": {
2319        "message": "Table does not exist: ns1.test1 in warehouse 8bcb0838-50fc-472d-9ddb-8feb89ef5f1e",
2320        "type": "NoSuchNamespaceErrorException",
2321        "code": 404
2322    }
2323}
2324            "#)
2325            .create_async()
2326            .await;
2327
2328        let catalog = RestCatalog::new(
2329            RestCatalogConfig::builder().uri(server.url()).build(),
2330            Some(Arc::new(LocalFsStorageFactory)),
2331        );
2332
2333        let table = catalog
2334            .load_table(&TableIdent::new(
2335                NamespaceIdent::new("ns1".to_string()),
2336                "test1".to_string(),
2337            ))
2338            .await;
2339
2340        assert!(table.is_err());
2341        assert!(table.err().unwrap().message().contains("does not exist"));
2342
2343        config_mock.assert_async().await;
2344        rename_table_mock.assert_async().await;
2345    }
2346
2347    #[tokio::test]
2348    async fn test_create_table() {
2349        let mut server = Server::new_async().await;
2350
2351        let config_mock = create_config_mock(&mut server).await;
2352
2353        let create_table_mock = server
2354            .mock("POST", "/v1/namespaces/ns1/tables")
2355            .with_status(200)
2356            .with_body_from_file(format!(
2357                "{}/testdata/{}",
2358                env!("CARGO_MANIFEST_DIR"),
2359                "create_table_response.json"
2360            ))
2361            .create_async()
2362            .await;
2363
2364        let catalog = RestCatalog::new(
2365            RestCatalogConfig::builder().uri(server.url()).build(),
2366            Some(Arc::new(LocalFsStorageFactory)),
2367        );
2368
2369        let table_creation = TableCreation::builder()
2370            .name("test1".to_string())
2371            .schema(
2372                Schema::builder()
2373                    .with_fields(vec![
2374                        NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String))
2375                            .into(),
2376                        NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(),
2377                        NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean))
2378                            .into(),
2379                    ])
2380                    .with_schema_id(1)
2381                    .with_identifier_field_ids(vec![2])
2382                    .build()
2383                    .unwrap(),
2384            )
2385            .properties(HashMap::from([("owner".to_string(), "testx".to_string())]))
2386            .partition_spec(
2387                UnboundPartitionSpec::builder()
2388                    .add_partition_fields(vec![
2389                        UnboundPartitionField::builder()
2390                            .source_id(1)
2391                            .transform(Transform::Truncate(3))
2392                            .name("id".to_string())
2393                            .build(),
2394                    ])
2395                    .unwrap()
2396                    .build(),
2397            )
2398            .sort_order(
2399                SortOrder::builder()
2400                    .with_sort_field(
2401                        SortField::builder()
2402                            .source_id(2)
2403                            .transform(Transform::Identity)
2404                            .direction(SortDirection::Ascending)
2405                            .null_order(NullOrder::First)
2406                            .build(),
2407                    )
2408                    .build_unbound()
2409                    .unwrap(),
2410            )
2411            .build();
2412
2413        let table = catalog
2414            .create_table(&NamespaceIdent::from_strs(["ns1"]).unwrap(), table_creation)
2415            .await
2416            .unwrap();
2417
2418        assert_eq!(
2419            &TableIdent::from_strs(vec!["ns1", "test1"]).unwrap(),
2420            table.identifier()
2421        );
2422        assert_eq!(
2423            "s3://warehouse/database/table/metadata.json",
2424            table.metadata_location().unwrap()
2425        );
2426        assert_eq!(FormatVersion::V1, table.metadata().format_version());
2427        assert_eq!("s3://warehouse/database/table", table.metadata().location());
2428        assert_eq!(
2429            uuid!("bf289591-dcc0-4234-ad4f-5c3eed811a29"),
2430            table.metadata().uuid()
2431        );
2432        assert_eq!(
2433            1657810967051,
2434            table
2435                .metadata()
2436                .last_updated_timestamp()
2437                .unwrap()
2438                .timestamp_millis()
2439        );
2440        assert_eq!(
2441            vec![&Arc::new(
2442                Schema::builder()
2443                    .with_fields(vec![
2444                        NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String))
2445                            .into(),
2446                        NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(),
2447                        NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean))
2448                            .into(),
2449                    ])
2450                    .with_schema_id(0)
2451                    .with_identifier_field_ids(vec![2])
2452                    .build()
2453                    .unwrap()
2454            )],
2455            table.metadata().schemas_iter().collect::<Vec<_>>()
2456        );
2457        assert_eq!(
2458            &HashMap::from([
2459                (
2460                    "write.delete.parquet.compression-codec".to_string(),
2461                    "zstd".to_string()
2462                ),
2463                (
2464                    "write.metadata.compression-codec".to_string(),
2465                    "gzip".to_string()
2466                ),
2467                (
2468                    "write.summary.partition-limit".to_string(),
2469                    "100".to_string()
2470                ),
2471                (
2472                    "write.parquet.compression-codec".to_string(),
2473                    "zstd".to_string()
2474                ),
2475            ]),
2476            table.metadata().properties()
2477        );
2478        assert!(table.metadata().current_snapshot().is_none());
2479        assert!(table.metadata().history().is_empty());
2480        assert_eq!(
2481            vec![&Arc::new(SortOrder {
2482                order_id: 0,
2483                fields: vec![],
2484            })],
2485            table.metadata().sort_orders_iter().collect::<Vec<_>>()
2486        );
2487
2488        config_mock.assert_async().await;
2489        create_table_mock.assert_async().await;
2490    }
2491
2492    #[tokio::test]
2493    async fn test_create_table_409() {
2494        let mut server = Server::new_async().await;
2495
2496        let config_mock = create_config_mock(&mut server).await;
2497
2498        let create_table_mock = server
2499            .mock("POST", "/v1/namespaces/ns1/tables")
2500            .with_status(409)
2501            .with_body(r#"
2502{
2503    "error": {
2504        "message": "Table already exists: ns1.test1 in warehouse 8bcb0838-50fc-472d-9ddb-8feb89ef5f1e",
2505        "type": "AlreadyExistsException",
2506        "code": 409
2507    }
2508}
2509            "#)
2510            .create_async()
2511            .await;
2512
2513        let catalog = RestCatalog::new(
2514            RestCatalogConfig::builder().uri(server.url()).build(),
2515            Some(Arc::new(LocalFsStorageFactory)),
2516        );
2517
2518        let table_creation = TableCreation::builder()
2519            .name("test1".to_string())
2520            .schema(
2521                Schema::builder()
2522                    .with_fields(vec![
2523                        NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String))
2524                            .into(),
2525                        NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(),
2526                        NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean))
2527                            .into(),
2528                    ])
2529                    .with_schema_id(1)
2530                    .with_identifier_field_ids(vec![2])
2531                    .build()
2532                    .unwrap(),
2533            )
2534            .properties(HashMap::from([("owner".to_string(), "testx".to_string())]))
2535            .build();
2536
2537        let table_result = catalog
2538            .create_table(&NamespaceIdent::from_strs(["ns1"]).unwrap(), table_creation)
2539            .await;
2540
2541        assert!(table_result.is_err());
2542        assert!(
2543            table_result
2544                .err()
2545                .unwrap()
2546                .message()
2547                .contains("already exists")
2548        );
2549
2550        config_mock.assert_async().await;
2551        create_table_mock.assert_async().await;
2552    }
2553
2554    #[tokio::test]
2555    async fn test_update_table() {
2556        let mut server = Server::new_async().await;
2557
2558        let config_mock = create_config_mock(&mut server).await;
2559
2560        let load_table_mock = server
2561            .mock("GET", "/v1/namespaces/ns1/tables/test1")
2562            .with_status(200)
2563            .with_body_from_file(format!(
2564                "{}/testdata/{}",
2565                env!("CARGO_MANIFEST_DIR"),
2566                "load_table_response.json"
2567            ))
2568            .create_async()
2569            .await;
2570
2571        let update_table_mock = server
2572            .mock("POST", "/v1/namespaces/ns1/tables/test1")
2573            .with_status(200)
2574            .with_body_from_file(format!(
2575                "{}/testdata/{}",
2576                env!("CARGO_MANIFEST_DIR"),
2577                "update_table_response.json"
2578            ))
2579            .create_async()
2580            .await;
2581
2582        let catalog = RestCatalog::new(
2583            RestCatalogConfig::builder().uri(server.url()).build(),
2584            Some(Arc::new(LocalFsStorageFactory)),
2585        );
2586
2587        let table1 = {
2588            let file = File::open(format!(
2589                "{}/testdata/{}",
2590                env!("CARGO_MANIFEST_DIR"),
2591                "create_table_response.json"
2592            ))
2593            .unwrap();
2594            let reader = BufReader::new(file);
2595            let resp = serde_json::from_reader::<_, LoadTableResult>(reader).unwrap();
2596
2597            Table::builder()
2598                .metadata(resp.metadata)
2599                .metadata_location(resp.metadata_location.unwrap())
2600                .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap())
2601                .file_io(FileIO::new_with_fs())
2602                .build()
2603                .unwrap()
2604        };
2605
2606        let tx = Transaction::new(&table1);
2607        let table = tx
2608            .upgrade_table_version()
2609            .set_format_version(FormatVersion::V2)
2610            .apply(tx)
2611            .unwrap()
2612            .commit(&catalog)
2613            .await
2614            .unwrap();
2615
2616        assert_eq!(
2617            &TableIdent::from_strs(vec!["ns1", "test1"]).unwrap(),
2618            table.identifier()
2619        );
2620        assert_eq!(
2621            "s3://warehouse/database/table/metadata.json",
2622            table.metadata_location().unwrap()
2623        );
2624        assert_eq!(FormatVersion::V2, table.metadata().format_version());
2625        assert_eq!("s3://warehouse/database/table", table.metadata().location());
2626        assert_eq!(
2627            uuid!("bf289591-dcc0-4234-ad4f-5c3eed811a29"),
2628            table.metadata().uuid()
2629        );
2630        assert_eq!(
2631            1657810967051,
2632            table
2633                .metadata()
2634                .last_updated_timestamp()
2635                .unwrap()
2636                .timestamp_millis()
2637        );
2638        assert_eq!(
2639            vec![&Arc::new(
2640                Schema::builder()
2641                    .with_fields(vec![
2642                        NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String))
2643                            .into(),
2644                        NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(),
2645                        NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean))
2646                            .into(),
2647                    ])
2648                    .with_schema_id(0)
2649                    .with_identifier_field_ids(vec![2])
2650                    .build()
2651                    .unwrap()
2652            )],
2653            table.metadata().schemas_iter().collect::<Vec<_>>()
2654        );
2655        assert_eq!(
2656            &HashMap::from([
2657                (
2658                    "write.delete.parquet.compression-codec".to_string(),
2659                    "zstd".to_string()
2660                ),
2661                (
2662                    "write.metadata.compression-codec".to_string(),
2663                    "gzip".to_string()
2664                ),
2665                (
2666                    "write.summary.partition-limit".to_string(),
2667                    "100".to_string()
2668                ),
2669                (
2670                    "write.parquet.compression-codec".to_string(),
2671                    "zstd".to_string()
2672                ),
2673            ]),
2674            table.metadata().properties()
2675        );
2676        assert!(table.metadata().current_snapshot().is_none());
2677        assert!(table.metadata().history().is_empty());
2678        assert_eq!(
2679            vec![&Arc::new(SortOrder {
2680                order_id: 0,
2681                fields: vec![],
2682            })],
2683            table.metadata().sort_orders_iter().collect::<Vec<_>>()
2684        );
2685
2686        config_mock.assert_async().await;
2687        update_table_mock.assert_async().await;
2688        load_table_mock.assert_async().await
2689    }
2690
2691    #[tokio::test]
2692    async fn test_update_table_404() {
2693        let mut server = Server::new_async().await;
2694
2695        let config_mock = create_config_mock(&mut server).await;
2696
2697        let load_table_mock = server
2698            .mock("GET", "/v1/namespaces/ns1/tables/test1")
2699            .with_status(200)
2700            .with_body_from_file(format!(
2701                "{}/testdata/{}",
2702                env!("CARGO_MANIFEST_DIR"),
2703                "load_table_response.json"
2704            ))
2705            .create_async()
2706            .await;
2707
2708        let update_table_mock = server
2709            .mock("POST", "/v1/namespaces/ns1/tables/test1")
2710            .with_status(404)
2711            .with_body(
2712                r#"
2713{
2714    "error": {
2715        "message": "The given table does not exist",
2716        "type": "NoSuchTableException",
2717        "code": 404
2718    }
2719}
2720            "#,
2721            )
2722            .create_async()
2723            .await;
2724
2725        let catalog = RestCatalog::new(
2726            RestCatalogConfig::builder().uri(server.url()).build(),
2727            Some(Arc::new(LocalFsStorageFactory)),
2728        );
2729
2730        let table1 = {
2731            let file = File::open(format!(
2732                "{}/testdata/{}",
2733                env!("CARGO_MANIFEST_DIR"),
2734                "create_table_response.json"
2735            ))
2736            .unwrap();
2737            let reader = BufReader::new(file);
2738            let resp = serde_json::from_reader::<_, LoadTableResult>(reader).unwrap();
2739
2740            Table::builder()
2741                .metadata(resp.metadata)
2742                .metadata_location(resp.metadata_location.unwrap())
2743                .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap())
2744                .file_io(FileIO::new_with_fs())
2745                .build()
2746                .unwrap()
2747        };
2748
2749        let tx = Transaction::new(&table1);
2750        let table_result = tx
2751            .upgrade_table_version()
2752            .set_format_version(FormatVersion::V2)
2753            .apply(tx)
2754            .unwrap()
2755            .commit(&catalog)
2756            .await;
2757
2758        assert!(table_result.is_err());
2759        assert!(
2760            table_result
2761                .err()
2762                .unwrap()
2763                .message()
2764                .contains("does not exist")
2765        );
2766
2767        config_mock.assert_async().await;
2768        update_table_mock.assert_async().await;
2769        load_table_mock.assert_async().await;
2770    }
2771
2772    #[tokio::test]
2773    async fn test_register_table() {
2774        let mut server = Server::new_async().await;
2775
2776        let config_mock = create_config_mock(&mut server).await;
2777
2778        let register_table_mock = server
2779            .mock("POST", "/v1/namespaces/ns1/register")
2780            .with_status(200)
2781            .with_body_from_file(format!(
2782                "{}/testdata/{}",
2783                env!("CARGO_MANIFEST_DIR"),
2784                "load_table_response.json"
2785            ))
2786            .create_async()
2787            .await;
2788
2789        let catalog = RestCatalog::new(
2790            RestCatalogConfig::builder().uri(server.url()).build(),
2791            Some(Arc::new(LocalFsStorageFactory)),
2792        );
2793        let table_ident =
2794            TableIdent::new(NamespaceIdent::new("ns1".to_string()), "test1".to_string());
2795        let metadata_location = String::from(
2796            "s3://warehouse/database/table/metadata/00001-5f2f8166-244c-4eae-ac36-384ecdec81fc.gz.metadata.json",
2797        );
2798
2799        let table = catalog
2800            .register_table(&table_ident, metadata_location)
2801            .await
2802            .unwrap();
2803
2804        assert_eq!(
2805            &TableIdent::from_strs(vec!["ns1", "test1"]).unwrap(),
2806            table.identifier()
2807        );
2808        assert_eq!(
2809            "s3://warehouse/database/table/metadata/00001-5f2f8166-244c-4eae-ac36-384ecdec81fc.gz.metadata.json",
2810            table.metadata_location().unwrap()
2811        );
2812
2813        config_mock.assert_async().await;
2814        register_table_mock.assert_async().await;
2815    }
2816
2817    #[tokio::test]
2818    async fn test_register_table_404() {
2819        let mut server = Server::new_async().await;
2820
2821        let config_mock = create_config_mock(&mut server).await;
2822
2823        let register_table_mock = server
2824            .mock("POST", "/v1/namespaces/ns1/register")
2825            .with_status(404)
2826            .with_body(
2827                r#"
2828{
2829    "error": {
2830        "message": "The namespace specified does not exist",
2831        "type": "NoSuchNamespaceErrorException",
2832        "code": 404
2833    }
2834}
2835            "#,
2836            )
2837            .create_async()
2838            .await;
2839
2840        let catalog = RestCatalog::new(
2841            RestCatalogConfig::builder().uri(server.url()).build(),
2842            Some(Arc::new(LocalFsStorageFactory)),
2843        );
2844
2845        let table_ident =
2846            TableIdent::new(NamespaceIdent::new("ns1".to_string()), "test1".to_string());
2847        let metadata_location = String::from(
2848            "s3://warehouse/database/table/metadata/00001-5f2f8166-244c-4eae-ac36-384ecdec81fc.gz.metadata.json",
2849        );
2850        let table = catalog
2851            .register_table(&table_ident, metadata_location)
2852            .await;
2853
2854        assert!(table.is_err());
2855        assert!(table.err().unwrap().message().contains("does not exist"));
2856
2857        config_mock.assert_async().await;
2858        register_table_mock.assert_async().await;
2859    }
2860
2861    #[tokio::test]
2862    async fn test_create_rest_catalog() {
2863        let builder = RestCatalogBuilder::default().with_client(Client::new());
2864
2865        let catalog = builder
2866            .load(
2867                "test",
2868                HashMap::from([
2869                    (
2870                        REST_CATALOG_PROP_URI.to_string(),
2871                        "http://localhost:8080".to_string(),
2872                    ),
2873                    ("a".to_string(), "b".to_string()),
2874                ]),
2875            )
2876            .await;
2877
2878        assert!(catalog.is_ok());
2879
2880        let catalog_config = catalog.unwrap().user_config;
2881        assert_eq!(catalog_config.name.as_deref(), Some("test"));
2882        assert_eq!(catalog_config.uri, "http://localhost:8080");
2883        assert_eq!(catalog_config.warehouse, None);
2884        assert!(catalog_config.client.is_some());
2885
2886        assert_eq!(catalog_config.props.get("a"), Some(&"b".to_string()));
2887        assert!(!catalog_config.props.contains_key(REST_CATALOG_PROP_URI));
2888    }
2889
2890    #[tokio::test]
2891    async fn test_create_rest_catalog_no_uri() {
2892        let builder = RestCatalogBuilder::default();
2893
2894        let catalog = builder
2895            .load(
2896                "test",
2897                HashMap::from([(
2898                    REST_CATALOG_PROP_WAREHOUSE.to_string(),
2899                    "s3://warehouse".to_string(),
2900                )]),
2901            )
2902            .await;
2903
2904        assert!(catalog.is_err());
2905        if let Err(err) = catalog {
2906            assert_eq!(err.kind(), ErrorKind::DataInvalid);
2907            assert_eq!(err.message(), "Catalog uri is required");
2908        }
2909    }
2910}