iceberg_catalog_rest/
catalog.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! This module contains the iceberg REST catalog implementation.
19
20use std::any::Any;
21use std::collections::HashMap;
22use std::future::Future;
23use std::str::FromStr;
24
25use async_trait::async_trait;
26use iceberg::io::{self, FileIO};
27use iceberg::table::Table;
28use iceberg::{
29    Catalog, CatalogBuilder, Error, ErrorKind, Namespace, NamespaceIdent, Result, TableCommit,
30    TableCreation, TableIdent,
31};
32use itertools::Itertools;
33use reqwest::header::{
34    HeaderMap, HeaderName, HeaderValue, {self},
35};
36use reqwest::{Client, Method, StatusCode, Url};
37use tokio::sync::OnceCell;
38use typed_builder::TypedBuilder;
39
40use crate::client::{
41    HttpClient, deserialize_catalog_response, deserialize_unexpected_catalog_error,
42};
43use crate::types::{
44    CatalogConfig, CommitTableRequest, CommitTableResponse, CreateNamespaceRequest,
45    CreateTableRequest, ListNamespaceResponse, ListTablesResponse, LoadTableResult,
46    NamespaceResponse, RegisterTableRequest, RenameTableRequest,
47};
48
/// Property key holding the REST catalog URI.
pub const REST_CATALOG_PROP_URI: &str = "uri";
/// Property key holding the REST catalog warehouse location.
pub const REST_CATALOG_PROP_WAREHOUSE: &str = "warehouse";
/// Property key that disables header redaction in error logs (defaults to false for security).
pub const REST_CATALOG_PROP_DISABLE_HEADER_REDACTION: &str = "disable-header-redaction";

/// Value of the `x-client-version` header produced by `RestCatalogConfig::extra_headers`.
const ICEBERG_REST_SPEC_VERSION: &str = "0.14.1";
/// Version of this crate as reported by Cargo at compile time; used in the `user-agent` header.
const CARGO_PKG_VERSION: &str = env!("CARGO_PKG_VERSION");
/// Path segment placed right after the catalog URI when endpoint URLs are built.
const PATH_V1: &str = "v1";
59
/// Builder for [`RestCatalog`].
///
/// Wraps the `RestCatalogConfig` that is filled in by `CatalogBuilder::load` and
/// `RestCatalogBuilder::with_client`.
#[derive(Debug)]
pub struct RestCatalogBuilder(RestCatalogConfig);
63
64impl Default for RestCatalogBuilder {
65    fn default() -> Self {
66        Self(RestCatalogConfig {
67            name: None,
68            uri: "".to_string(),
69            warehouse: None,
70            props: HashMap::new(),
71            client: None,
72        })
73    }
74}
75
76impl CatalogBuilder for RestCatalogBuilder {
77    type C = RestCatalog;
78
79    fn load(
80        mut self,
81        name: impl Into<String>,
82        props: HashMap<String, String>,
83    ) -> impl Future<Output = Result<Self::C>> + Send {
84        self.0.name = Some(name.into());
85
86        if props.contains_key(REST_CATALOG_PROP_URI) {
87            self.0.uri = props
88                .get(REST_CATALOG_PROP_URI)
89                .cloned()
90                .unwrap_or_default();
91        }
92
93        if props.contains_key(REST_CATALOG_PROP_WAREHOUSE) {
94            self.0.warehouse = props.get(REST_CATALOG_PROP_WAREHOUSE).cloned()
95        }
96
97        // Collect other remaining properties
98        self.0.props = props
99            .into_iter()
100            .filter(|(k, _)| k != REST_CATALOG_PROP_URI && k != REST_CATALOG_PROP_WAREHOUSE)
101            .collect();
102
103        let result = {
104            if self.0.name.is_none() {
105                Err(Error::new(
106                    ErrorKind::DataInvalid,
107                    "Catalog name is required",
108                ))
109            } else if self.0.uri.is_empty() {
110                Err(Error::new(
111                    ErrorKind::DataInvalid,
112                    "Catalog uri is required",
113                ))
114            } else {
115                Ok(RestCatalog::new(self.0))
116            }
117        };
118
119        std::future::ready(result)
120    }
121}
122
123impl RestCatalogBuilder {
124    /// Configures the catalog with a custom HTTP client.
125    pub fn with_client(mut self, client: Client) -> Self {
126        self.0.client = Some(client);
127        self
128    }
129}
130
/// Rest catalog configuration.
#[derive(Clone, Debug, TypedBuilder)]
pub(crate) struct RestCatalogConfig {
    /// Catalog name; `RestCatalogBuilder::load` sets it from its `name` argument.
    #[builder(default, setter(strip_option))]
    name: Option<String>,

    /// Base URI of the REST catalog server, used to build endpoint URLs.
    uri: String,

    /// Warehouse identifier or location. It is sent as the `warehouse` query parameter when
    /// loading the server config, and used as a file IO path when it parses as a URL.
    #[builder(default, setter(strip_option(fallback = warehouse_opt)))]
    warehouse: Option<String>,

    /// Remaining catalog properties (for example `prefix`, `token`, `credential`, `header.*`).
    #[builder(default)]
    props: HashMap<String, String>,

    /// Optional HTTP client, e.g. one supplied through `RestCatalogBuilder::with_client`.
    #[builder(default)]
    client: Option<Client>,
}
148
149impl RestCatalogConfig {
150    fn url_prefixed(&self, parts: &[&str]) -> String {
151        [&self.uri, PATH_V1]
152            .into_iter()
153            .chain(self.props.get("prefix").map(|s| &**s))
154            .chain(parts.iter().cloned())
155            .join("/")
156    }
157
158    fn config_endpoint(&self) -> String {
159        [&self.uri, PATH_V1, "config"].join("/")
160    }
161
162    pub(crate) fn get_token_endpoint(&self) -> String {
163        if let Some(oauth2_uri) = self.props.get("oauth2-server-uri") {
164            oauth2_uri.to_string()
165        } else {
166            [&self.uri, PATH_V1, "oauth", "tokens"].join("/")
167        }
168    }
169
170    fn namespaces_endpoint(&self) -> String {
171        self.url_prefixed(&["namespaces"])
172    }
173
174    fn namespace_endpoint(&self, ns: &NamespaceIdent) -> String {
175        self.url_prefixed(&["namespaces", &ns.to_url_string()])
176    }
177
178    fn tables_endpoint(&self, ns: &NamespaceIdent) -> String {
179        self.url_prefixed(&["namespaces", &ns.to_url_string(), "tables"])
180    }
181
182    fn rename_table_endpoint(&self) -> String {
183        self.url_prefixed(&["tables", "rename"])
184    }
185
186    fn register_table_endpoint(&self, ns: &NamespaceIdent) -> String {
187        self.url_prefixed(&["namespaces", &ns.to_url_string(), "register"])
188    }
189
190    fn table_endpoint(&self, table: &TableIdent) -> String {
191        self.url_prefixed(&[
192            "namespaces",
193            &table.namespace.to_url_string(),
194            "tables",
195            &table.name,
196        ])
197    }
198
199    /// Get the client from the config.
200    pub(crate) fn client(&self) -> Option<Client> {
201        self.client.clone()
202    }
203
204    /// Get the token from the config.
205    ///
206    /// The client can use this token to send requests.
207    pub(crate) fn token(&self) -> Option<String> {
208        self.props.get("token").cloned()
209    }
210
211    /// Get the credentials from the config. The client can use these credentials to fetch a new
212    /// token.
213    ///
214    /// ## Output
215    ///
216    /// - `None`: No credential is set.
217    /// - `Some(None, client_secret)`: No client_id is set, use client_secret directly.
218    /// - `Some(Some(client_id), client_secret)`: Both client_id and client_secret are set.
219    pub(crate) fn credential(&self) -> Option<(Option<String>, String)> {
220        let cred = self.props.get("credential")?;
221
222        match cred.split_once(':') {
223            Some((client_id, client_secret)) => {
224                Some((Some(client_id.to_string()), client_secret.to_string()))
225            }
226            None => Some((None, cred.to_string())),
227        }
228    }
229
230    /// Get the extra headers from config, which includes:
231    ///
232    /// - `content-type`
233    /// - `x-client-version`
234    /// - `user-agent`
235    /// - All headers specified by `header.xxx` in props.
236    pub(crate) fn extra_headers(&self) -> Result<HeaderMap> {
237        let mut headers = HeaderMap::from_iter([
238            (
239                header::CONTENT_TYPE,
240                HeaderValue::from_static("application/json"),
241            ),
242            (
243                HeaderName::from_static("x-client-version"),
244                HeaderValue::from_static(ICEBERG_REST_SPEC_VERSION),
245            ),
246            (
247                header::USER_AGENT,
248                HeaderValue::from_str(&format!("iceberg-rs/{CARGO_PKG_VERSION}")).unwrap(),
249            ),
250        ]);
251
252        for (key, value) in self
253            .props
254            .iter()
255            .filter_map(|(k, v)| k.strip_prefix("header.").map(|k| (k, v)))
256        {
257            headers.insert(
258                HeaderName::from_str(key).map_err(|e| {
259                    Error::new(
260                        ErrorKind::DataInvalid,
261                        format!("Invalid header name: {key}"),
262                    )
263                    .with_source(e)
264                })?,
265                HeaderValue::from_str(value).map_err(|e| {
266                    Error::new(
267                        ErrorKind::DataInvalid,
268                        format!("Invalid header value: {value}"),
269                    )
270                    .with_source(e)
271                })?,
272            );
273        }
274
275        Ok(headers)
276    }
277
278    /// Get the optional OAuth headers from the config.
279    pub(crate) fn extra_oauth_params(&self) -> HashMap<String, String> {
280        let mut params = HashMap::new();
281
282        if let Some(scope) = self.props.get("scope") {
283            params.insert("scope".to_string(), scope.to_string());
284        } else {
285            params.insert("scope".to_string(), "catalog".to_string());
286        }
287
288        let optional_params = ["audience", "resource"];
289        for param_name in optional_params {
290            if let Some(value) = self.props.get(param_name) {
291                params.insert(param_name.to_string(), value.to_string());
292            }
293        }
294
295        params
296    }
297
298    /// Check if header redaction is disabled in error logs.
299    ///
300    /// Returns true if the `disable-header-redaction` property is set to "true".
301    /// Defaults to false for security (headers are redacted by default).
302    pub(crate) fn disable_header_redaction(&self) -> bool {
303        self.props
304            .get(REST_CATALOG_PROP_DISABLE_HEADER_REDACTION)
305            .map(|v| v.eq_ignore_ascii_case("true"))
306            .unwrap_or(false)
307    }
308
309    /// Merge the `RestCatalogConfig` with the a [`CatalogConfig`] (fetched from the REST server).
310    pub(crate) fn merge_with_config(mut self, mut config: CatalogConfig) -> Self {
311        if let Some(uri) = config.overrides.remove("uri") {
312            self.uri = uri;
313        }
314
315        let mut props = config.defaults;
316        props.extend(self.props);
317        props.extend(config.overrides);
318
319        self.props = props;
320        self
321    }
322}
323
/// Runtime state of a [`RestCatalog`]: the HTTP client plus the merged configuration.
#[derive(Debug)]
struct RestContext {
    /// HTTP client used to send requests to the catalog.
    client: HttpClient,
    /// Runtime config: the user config merged with the config fetched from the REST server.
    ///
    /// It could be different from the user config.
    config: RestCatalogConfig,
}
332
/// Rest catalog implementation.
#[derive(Debug)]
pub struct RestCatalog {
    /// User config, stored as-is and never changed.
    ///
    /// It could be different from the config fetched from the server and used at runtime.
    user_config: RestCatalogConfig,
    /// Runtime context, initialized at most once on first use (see `context`).
    ctx: OnceCell<RestContext>,
    /// Extensions for the FileIOBuilder.
    file_io_extensions: io::Extensions,
}
344
345impl RestCatalog {
346    /// Creates a `RestCatalog` from a [`RestCatalogConfig`].
347    fn new(config: RestCatalogConfig) -> Self {
348        Self {
349            user_config: config,
350            ctx: OnceCell::new(),
351            file_io_extensions: io::Extensions::default(),
352        }
353    }
354
355    /// Add an extension to the file IO builder.
356    pub fn with_file_io_extension<T: Any + Send + Sync>(mut self, ext: T) -> Self {
357        self.file_io_extensions.add(ext);
358        self
359    }
360
361    /// Gets the [`RestContext`] from the catalog.
362    async fn context(&self) -> Result<&RestContext> {
363        self.ctx
364            .get_or_try_init(|| async {
365                let client = HttpClient::new(&self.user_config)?;
366                let catalog_config = RestCatalog::load_config(&client, &self.user_config).await?;
367                let config = self.user_config.clone().merge_with_config(catalog_config);
368                let client = client.update_with(&config)?;
369
370                Ok(RestContext { config, client })
371            })
372            .await
373    }
374
375    /// Load the runtime config from the server by `user_config`.
376    ///
377    /// It's required for a REST catalog to update its config after creation.
378    async fn load_config(
379        client: &HttpClient,
380        user_config: &RestCatalogConfig,
381    ) -> Result<CatalogConfig> {
382        let mut request_builder = client.request(Method::GET, user_config.config_endpoint());
383
384        if let Some(warehouse_location) = &user_config.warehouse {
385            request_builder = request_builder.query(&[("warehouse", warehouse_location)]);
386        }
387
388        let request = request_builder.build()?;
389
390        let http_response = client.query_catalog(request).await?;
391
392        match http_response.status() {
393            StatusCode::OK => deserialize_catalog_response(http_response).await,
394            _ => Err(deserialize_unexpected_catalog_error(
395                http_response,
396                client.disable_header_redaction(),
397            )
398            .await),
399        }
400    }
401
402    async fn load_file_io(
403        &self,
404        metadata_location: Option<&str>,
405        extra_config: Option<HashMap<String, String>>,
406    ) -> Result<FileIO> {
407        let mut props = self.context().await?.config.props.clone();
408        if let Some(config) = extra_config {
409            props.extend(config);
410        }
411
412        // If the warehouse is a logical identifier instead of a URL we don't want
413        // to raise an exception
414        let warehouse_path = match self.context().await?.config.warehouse.as_deref() {
415            Some(url) if Url::parse(url).is_ok() => Some(url),
416            Some(_) => None,
417            None => None,
418        };
419
420        let file_io = match metadata_location.or(warehouse_path) {
421            Some(url) => FileIO::from_path(url)?
422                .with_props(props)
423                .with_extensions(self.file_io_extensions.clone())
424                .build()?,
425            None => {
426                return Err(Error::new(
427                    ErrorKind::Unexpected,
428                    "Unable to load file io, neither warehouse nor metadata location is set!",
429                ))?;
430            }
431        };
432
433        Ok(file_io)
434    }
435
436    /// Invalidate the current token without generating a new one. On the next request, the client
437    /// will attempt to generate a new token.
438    pub async fn invalidate_token(&self) -> Result<()> {
439        self.context().await?.client.invalidate_token().await
440    }
441
442    /// Invalidate the current token and set a new one. Generates a new token before invalidating
443    /// the current token, meaning the old token will be used until this function acquires the lock
444    /// and overwrites the token.
445    ///
446    /// If credential is invalid, or the request fails, this method will return an error and leave
447    /// the current token unchanged.
448    pub async fn regenerate_token(&self) -> Result<()> {
449        self.context().await?.client.regenerate_token().await
450    }
451}
452
453/// All requests and expected responses are derived from the REST catalog API spec:
454/// https://github.com/apache/iceberg/blob/main/open-api/rest-catalog-open-api.yaml
455#[async_trait]
456impl Catalog for RestCatalog {
457    async fn list_namespaces(
458        &self,
459        parent: Option<&NamespaceIdent>,
460    ) -> Result<Vec<NamespaceIdent>> {
461        let context = self.context().await?;
462        let endpoint = context.config.namespaces_endpoint();
463        let mut namespaces = Vec::new();
464        let mut next_token = None;
465
466        loop {
467            let mut request = context.client.request(Method::GET, endpoint.clone());
468
469            // Filter on `parent={namespace}` if a parent namespace exists.
470            if let Some(ns) = parent {
471                request = request.query(&[("parent", ns.to_url_string())]);
472            }
473
474            if let Some(token) = next_token {
475                request = request.query(&[("pageToken", token)]);
476            }
477
478            let http_response = context.client.query_catalog(request.build()?).await?;
479
480            match http_response.status() {
481                StatusCode::OK => {
482                    let response =
483                        deserialize_catalog_response::<ListNamespaceResponse>(http_response)
484                            .await?;
485
486                    namespaces.extend(response.namespaces);
487
488                    match response.next_page_token {
489                        Some(token) => next_token = Some(token),
490                        None => break,
491                    }
492                }
493                StatusCode::NOT_FOUND => {
494                    return Err(Error::new(
495                        ErrorKind::Unexpected,
496                        "The parent parameter of the namespace provided does not exist",
497                    ));
498                }
499                _ => {
500                    return Err(deserialize_unexpected_catalog_error(
501                        http_response,
502                        context.client.disable_header_redaction(),
503                    )
504                    .await);
505                }
506            }
507        }
508
509        Ok(namespaces)
510    }
511
512    async fn create_namespace(
513        &self,
514        namespace: &NamespaceIdent,
515        properties: HashMap<String, String>,
516    ) -> Result<Namespace> {
517        let context = self.context().await?;
518
519        let request = context
520            .client
521            .request(Method::POST, context.config.namespaces_endpoint())
522            .json(&CreateNamespaceRequest {
523                namespace: namespace.clone(),
524                properties,
525            })
526            .build()?;
527
528        let http_response = context.client.query_catalog(request).await?;
529
530        match http_response.status() {
531            StatusCode::OK => {
532                let response =
533                    deserialize_catalog_response::<NamespaceResponse>(http_response).await?;
534                Ok(Namespace::from(response))
535            }
536            StatusCode::CONFLICT => Err(Error::new(
537                ErrorKind::Unexpected,
538                "Tried to create a namespace that already exists",
539            )),
540            _ => Err(deserialize_unexpected_catalog_error(
541                http_response,
542                context.client.disable_header_redaction(),
543            )
544            .await),
545        }
546    }
547
548    async fn get_namespace(&self, namespace: &NamespaceIdent) -> Result<Namespace> {
549        let context = self.context().await?;
550
551        let request = context
552            .client
553            .request(Method::GET, context.config.namespace_endpoint(namespace))
554            .build()?;
555
556        let http_response = context.client.query_catalog(request).await?;
557
558        match http_response.status() {
559            StatusCode::OK => {
560                let response =
561                    deserialize_catalog_response::<NamespaceResponse>(http_response).await?;
562                Ok(Namespace::from(response))
563            }
564            StatusCode::NOT_FOUND => Err(Error::new(
565                ErrorKind::Unexpected,
566                "Tried to get a namespace that does not exist",
567            )),
568            _ => Err(deserialize_unexpected_catalog_error(
569                http_response,
570                context.client.disable_header_redaction(),
571            )
572            .await),
573        }
574    }
575
576    async fn namespace_exists(&self, ns: &NamespaceIdent) -> Result<bool> {
577        let context = self.context().await?;
578
579        let request = context
580            .client
581            .request(Method::HEAD, context.config.namespace_endpoint(ns))
582            .build()?;
583
584        let http_response = context.client.query_catalog(request).await?;
585
586        match http_response.status() {
587            StatusCode::NO_CONTENT | StatusCode::OK => Ok(true),
588            StatusCode::NOT_FOUND => Ok(false),
589            _ => Err(deserialize_unexpected_catalog_error(
590                http_response,
591                context.client.disable_header_redaction(),
592            )
593            .await),
594        }
595    }
596
597    async fn update_namespace(
598        &self,
599        _namespace: &NamespaceIdent,
600        _properties: HashMap<String, String>,
601    ) -> Result<()> {
602        Err(Error::new(
603            ErrorKind::FeatureUnsupported,
604            "Updating namespace not supported yet!",
605        ))
606    }
607
608    async fn drop_namespace(&self, namespace: &NamespaceIdent) -> Result<()> {
609        let context = self.context().await?;
610
611        let request = context
612            .client
613            .request(Method::DELETE, context.config.namespace_endpoint(namespace))
614            .build()?;
615
616        let http_response = context.client.query_catalog(request).await?;
617
618        match http_response.status() {
619            StatusCode::NO_CONTENT | StatusCode::OK => Ok(()),
620            StatusCode::NOT_FOUND => Err(Error::new(
621                ErrorKind::Unexpected,
622                "Tried to drop a namespace that does not exist",
623            )),
624            _ => Err(deserialize_unexpected_catalog_error(
625                http_response,
626                context.client.disable_header_redaction(),
627            )
628            .await),
629        }
630    }
631
632    async fn list_tables(&self, namespace: &NamespaceIdent) -> Result<Vec<TableIdent>> {
633        let context = self.context().await?;
634        let endpoint = context.config.tables_endpoint(namespace);
635        let mut identifiers = Vec::new();
636        let mut next_token = None;
637
638        loop {
639            let mut request = context.client.request(Method::GET, endpoint.clone());
640
641            if let Some(token) = next_token {
642                request = request.query(&[("pageToken", token)]);
643            }
644
645            let http_response = context.client.query_catalog(request.build()?).await?;
646
647            match http_response.status() {
648                StatusCode::OK => {
649                    let response =
650                        deserialize_catalog_response::<ListTablesResponse>(http_response).await?;
651
652                    identifiers.extend(response.identifiers);
653
654                    match response.next_page_token {
655                        Some(token) => next_token = Some(token),
656                        None => break,
657                    }
658                }
659                StatusCode::NOT_FOUND => {
660                    return Err(Error::new(
661                        ErrorKind::Unexpected,
662                        "Tried to list tables of a namespace that does not exist",
663                    ));
664                }
665                _ => {
666                    return Err(deserialize_unexpected_catalog_error(
667                        http_response,
668                        context.client.disable_header_redaction(),
669                    )
670                    .await);
671                }
672            }
673        }
674
675        Ok(identifiers)
676    }
677
678    /// Create a new table inside the namespace.
679    ///
680    /// In the resulting table, if there are any config properties that
681    /// are present in both the response from the REST server and the
682    /// config provided when creating this `RestCatalog` instance then
683    /// the value provided locally to the `RestCatalog` will take precedence.
684    async fn create_table(
685        &self,
686        namespace: &NamespaceIdent,
687        creation: TableCreation,
688    ) -> Result<Table> {
689        let context = self.context().await?;
690
691        let table_ident = TableIdent::new(namespace.clone(), creation.name.clone());
692
693        let request = context
694            .client
695            .request(Method::POST, context.config.tables_endpoint(namespace))
696            .json(&CreateTableRequest {
697                name: creation.name,
698                location: creation.location,
699                schema: creation.schema,
700                partition_spec: creation.partition_spec,
701                write_order: creation.sort_order,
702                stage_create: Some(false),
703                properties: creation.properties,
704            })
705            .build()?;
706
707        let http_response = context.client.query_catalog(request).await?;
708
709        let response = match http_response.status() {
710            StatusCode::OK => {
711                deserialize_catalog_response::<LoadTableResult>(http_response).await?
712            }
713            StatusCode::NOT_FOUND => {
714                return Err(Error::new(
715                    ErrorKind::Unexpected,
716                    "Tried to create a table under a namespace that does not exist",
717                ));
718            }
719            StatusCode::CONFLICT => {
720                return Err(Error::new(
721                    ErrorKind::Unexpected,
722                    "The table already exists",
723                ));
724            }
725            _ => {
726                return Err(deserialize_unexpected_catalog_error(
727                    http_response,
728                    context.client.disable_header_redaction(),
729                )
730                .await);
731            }
732        };
733
734        let metadata_location = response.metadata_location.as_ref().ok_or(Error::new(
735            ErrorKind::DataInvalid,
736            "Metadata location missing in `create_table` response!",
737        ))?;
738
739        let config = response
740            .config
741            .into_iter()
742            .chain(self.user_config.props.clone())
743            .collect();
744
745        let file_io = self
746            .load_file_io(Some(metadata_location), Some(config))
747            .await?;
748
749        let table_builder = Table::builder()
750            .identifier(table_ident.clone())
751            .file_io(file_io)
752            .metadata(response.metadata);
753
754        if let Some(metadata_location) = response.metadata_location {
755            table_builder.metadata_location(metadata_location).build()
756        } else {
757            table_builder.build()
758        }
759    }
760
761    /// Load table from the catalog.
762    ///
763    /// If there are any config properties that are present in both the response from the REST
764    /// server and the config provided when creating this `RestCatalog` instance, then the value
765    /// provided locally to the `RestCatalog` will take precedence.
766    async fn load_table(&self, table_ident: &TableIdent) -> Result<Table> {
767        let context = self.context().await?;
768
769        let request = context
770            .client
771            .request(Method::GET, context.config.table_endpoint(table_ident))
772            .build()?;
773
774        let http_response = context.client.query_catalog(request).await?;
775
776        let response = match http_response.status() {
777            StatusCode::OK | StatusCode::NOT_MODIFIED => {
778                deserialize_catalog_response::<LoadTableResult>(http_response).await?
779            }
780            StatusCode::NOT_FOUND => {
781                return Err(Error::new(
782                    ErrorKind::Unexpected,
783                    "Tried to load a table that does not exist",
784                ));
785            }
786            _ => {
787                return Err(deserialize_unexpected_catalog_error(
788                    http_response,
789                    context.client.disable_header_redaction(),
790                )
791                .await);
792            }
793        };
794
795        let config = response
796            .config
797            .into_iter()
798            .chain(self.user_config.props.clone())
799            .collect();
800
801        let file_io = self
802            .load_file_io(response.metadata_location.as_deref(), Some(config))
803            .await?;
804
805        let table_builder = Table::builder()
806            .identifier(table_ident.clone())
807            .file_io(file_io)
808            .metadata(response.metadata);
809
810        if let Some(metadata_location) = response.metadata_location {
811            table_builder.metadata_location(metadata_location).build()
812        } else {
813            table_builder.build()
814        }
815    }
816
817    /// Drop a table from the catalog.
818    async fn drop_table(&self, table: &TableIdent) -> Result<()> {
819        let context = self.context().await?;
820
821        let request = context
822            .client
823            .request(Method::DELETE, context.config.table_endpoint(table))
824            .build()?;
825
826        let http_response = context.client.query_catalog(request).await?;
827
828        match http_response.status() {
829            StatusCode::NO_CONTENT | StatusCode::OK => Ok(()),
830            StatusCode::NOT_FOUND => Err(Error::new(
831                ErrorKind::Unexpected,
832                "Tried to drop a table that does not exist",
833            )),
834            _ => Err(deserialize_unexpected_catalog_error(
835                http_response,
836                context.client.disable_header_redaction(),
837            )
838            .await),
839        }
840    }
841
842    /// Check if a table exists in the catalog.
843    async fn table_exists(&self, table: &TableIdent) -> Result<bool> {
844        let context = self.context().await?;
845
846        let request = context
847            .client
848            .request(Method::HEAD, context.config.table_endpoint(table))
849            .build()?;
850
851        let http_response = context.client.query_catalog(request).await?;
852
853        match http_response.status() {
854            StatusCode::NO_CONTENT | StatusCode::OK => Ok(true),
855            StatusCode::NOT_FOUND => Ok(false),
856            _ => Err(deserialize_unexpected_catalog_error(
857                http_response,
858                context.client.disable_header_redaction(),
859            )
860            .await),
861        }
862    }
863
864    /// Rename a table in the catalog.
865    async fn rename_table(&self, src: &TableIdent, dest: &TableIdent) -> Result<()> {
866        let context = self.context().await?;
867
868        let request = context
869            .client
870            .request(Method::POST, context.config.rename_table_endpoint())
871            .json(&RenameTableRequest {
872                source: src.clone(),
873                destination: dest.clone(),
874            })
875            .build()?;
876
877        let http_response = context.client.query_catalog(request).await?;
878
879        match http_response.status() {
880            StatusCode::NO_CONTENT | StatusCode::OK => Ok(()),
881            StatusCode::NOT_FOUND => Err(Error::new(
882                ErrorKind::Unexpected,
883                "Tried to rename a table that does not exist (is the namespace correct?)",
884            )),
885            StatusCode::CONFLICT => Err(Error::new(
886                ErrorKind::Unexpected,
887                "Tried to rename a table to a name that already exists",
888            )),
889            _ => Err(deserialize_unexpected_catalog_error(
890                http_response,
891                context.client.disable_header_redaction(),
892            )
893            .await),
894        }
895    }
896
897    async fn register_table(
898        &self,
899        table_ident: &TableIdent,
900        metadata_location: String,
901    ) -> Result<Table> {
902        let context = self.context().await?;
903
904        let request = context
905            .client
906            .request(
907                Method::POST,
908                context
909                    .config
910                    .register_table_endpoint(table_ident.namespace()),
911            )
912            .json(&RegisterTableRequest {
913                name: table_ident.name.clone(),
914                metadata_location: metadata_location.clone(),
915                overwrite: Some(false),
916            })
917            .build()?;
918
919        let http_response = context.client.query_catalog(request).await?;
920
921        let response: LoadTableResult = match http_response.status() {
922            StatusCode::OK => {
923                deserialize_catalog_response::<LoadTableResult>(http_response).await?
924            }
925            StatusCode::NOT_FOUND => {
926                return Err(Error::new(
927                    ErrorKind::NamespaceNotFound,
928                    "The namespace specified does not exist.",
929                ));
930            }
931            StatusCode::CONFLICT => {
932                return Err(Error::new(
933                    ErrorKind::TableAlreadyExists,
934                    "The given table already exists.",
935                ));
936            }
937            _ => {
938                return Err(deserialize_unexpected_catalog_error(
939                    http_response,
940                    context.client.disable_header_redaction(),
941                )
942                .await);
943            }
944        };
945
946        let metadata_location = response.metadata_location.as_ref().ok_or(Error::new(
947            ErrorKind::DataInvalid,
948            "Metadata location missing in `register_table` response!",
949        ))?;
950
951        let file_io = self.load_file_io(Some(metadata_location), None).await?;
952
953        Table::builder()
954            .identifier(table_ident.clone())
955            .file_io(file_io)
956            .metadata(response.metadata)
957            .metadata_location(metadata_location.clone())
958            .build()
959    }
960
961    async fn update_table(&self, mut commit: TableCommit) -> Result<Table> {
962        let context = self.context().await?;
963
964        let request = context
965            .client
966            .request(
967                Method::POST,
968                context.config.table_endpoint(commit.identifier()),
969            )
970            .json(&CommitTableRequest {
971                identifier: Some(commit.identifier().clone()),
972                requirements: commit.take_requirements(),
973                updates: commit.take_updates(),
974            })
975            .build()?;
976
977        let http_response = context.client.query_catalog(request).await?;
978
979        let response: CommitTableResponse = match http_response.status() {
980            StatusCode::OK => deserialize_catalog_response(http_response).await?,
981            StatusCode::NOT_FOUND => {
982                return Err(Error::new(
983                    ErrorKind::TableNotFound,
984                    "Tried to update a table that does not exist",
985                ));
986            }
987            StatusCode::CONFLICT => {
988                return Err(Error::new(
989                    ErrorKind::CatalogCommitConflicts,
990                    "CatalogCommitConflicts, one or more requirements failed. The client may retry.",
991                )
992                .with_retryable(true));
993            }
994            StatusCode::INTERNAL_SERVER_ERROR => {
995                return Err(Error::new(
996                    ErrorKind::Unexpected,
997                    "An unknown server-side problem occurred; the commit state is unknown.",
998                ));
999            }
1000            StatusCode::BAD_GATEWAY => {
1001                return Err(Error::new(
1002                    ErrorKind::Unexpected,
1003                    "A gateway or proxy received an invalid response from the upstream server; the commit state is unknown.",
1004                ));
1005            }
1006            StatusCode::GATEWAY_TIMEOUT => {
1007                return Err(Error::new(
1008                    ErrorKind::Unexpected,
1009                    "A server-side gateway timeout occurred; the commit state is unknown.",
1010                ));
1011            }
1012            _ => {
1013                return Err(deserialize_unexpected_catalog_error(
1014                    http_response,
1015                    context.client.disable_header_redaction(),
1016                )
1017                .await);
1018            }
1019        };
1020
1021        let file_io = self
1022            .load_file_io(Some(&response.metadata_location), None)
1023            .await?;
1024
1025        Table::builder()
1026            .identifier(commit.identifier().clone())
1027            .file_io(file_io)
1028            .metadata(response.metadata)
1029            .metadata_location(response.metadata_location)
1030            .build()
1031    }
1032}
1033
1034#[cfg(test)]
1035mod tests {
1036    use std::fs::File;
1037    use std::io::BufReader;
1038    use std::sync::Arc;
1039
1040    use chrono::{TimeZone, Utc};
1041    use iceberg::spec::{
1042        FormatVersion, NestedField, NullOrder, Operation, PrimitiveType, Schema, Snapshot,
1043        SnapshotLog, SortDirection, SortField, SortOrder, Summary, Transform, Type,
1044        UnboundPartitionField, UnboundPartitionSpec,
1045    };
1046    use iceberg::transaction::{ApplyTransactionAction, Transaction};
1047    use mockito::{Mock, Server, ServerGuard};
1048    use serde_json::json;
1049    use uuid::uuid;
1050
1051    use super::*;
1052
1053    #[tokio::test]
1054    async fn test_update_config() {
1055        let mut server = Server::new_async().await;
1056
1057        let config_mock = server
1058            .mock("GET", "/v1/config")
1059            .with_status(200)
1060            .with_body(
1061                r#"{
1062                "overrides": {
1063                    "warehouse": "s3://iceberg-catalog"
1064                },
1065                "defaults": {}
1066            }"#,
1067            )
1068            .create_async()
1069            .await;
1070
1071        let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
1072
1073        assert_eq!(
1074            catalog
1075                .context()
1076                .await
1077                .unwrap()
1078                .config
1079                .props
1080                .get("warehouse"),
1081            Some(&"s3://iceberg-catalog".to_string())
1082        );
1083
1084        config_mock.assert_async().await;
1085    }
1086
1087    async fn create_config_mock(server: &mut ServerGuard) -> Mock {
1088        server
1089            .mock("GET", "/v1/config")
1090            .with_status(200)
1091            .with_body(
1092                r#"{
1093                "overrides": {
1094                    "warehouse": "s3://iceberg-catalog"
1095                },
1096                "defaults": {}
1097            }"#,
1098            )
1099            .create_async()
1100            .await
1101    }
1102
1103    async fn create_oauth_mock(server: &mut ServerGuard) -> Mock {
1104        create_oauth_mock_with_path(server, "/v1/oauth/tokens", "ey000000000000", 200).await
1105    }
1106
1107    async fn create_oauth_mock_with_path(
1108        server: &mut ServerGuard,
1109        path: &str,
1110        token: &str,
1111        status: usize,
1112    ) -> Mock {
1113        let body = format!(
1114            r#"{{
1115                "access_token": "{token}",
1116                "token_type": "Bearer",
1117                "issued_token_type": "urn:ietf:params:oauth:token-type:access_token",
1118                "expires_in": 86400
1119            }}"#
1120        );
1121        server
1122            .mock("POST", path)
1123            .with_status(status)
1124            .with_body(body)
1125            .expect(1)
1126            .create_async()
1127            .await
1128    }
1129
1130    #[tokio::test]
1131    async fn test_oauth() {
1132        let mut server = Server::new_async().await;
1133        let oauth_mock = create_oauth_mock(&mut server).await;
1134        let config_mock = create_config_mock(&mut server).await;
1135
1136        let mut props = HashMap::new();
1137        props.insert("credential".to_string(), "client1:secret1".to_string());
1138
1139        let catalog = RestCatalog::new(
1140            RestCatalogConfig::builder()
1141                .uri(server.url())
1142                .props(props)
1143                .build(),
1144        );
1145
1146        let token = catalog.context().await.unwrap().client.token().await;
1147        oauth_mock.assert_async().await;
1148        config_mock.assert_async().await;
1149        assert_eq!(token, Some("ey000000000000".to_string()));
1150    }
1151
1152    #[tokio::test]
1153    async fn test_oauth_with_optional_param() {
1154        let mut props = HashMap::new();
1155        props.insert("credential".to_string(), "client1:secret1".to_string());
1156        props.insert("scope".to_string(), "custom_scope".to_string());
1157        props.insert("audience".to_string(), "custom_audience".to_string());
1158        props.insert("resource".to_string(), "custom_resource".to_string());
1159
1160        let mut server = Server::new_async().await;
1161        let oauth_mock = server
1162            .mock("POST", "/v1/oauth/tokens")
1163            .match_body(mockito::Matcher::Regex("scope=custom_scope".to_string()))
1164            .match_body(mockito::Matcher::Regex(
1165                "audience=custom_audience".to_string(),
1166            ))
1167            .match_body(mockito::Matcher::Regex(
1168                "resource=custom_resource".to_string(),
1169            ))
1170            .with_status(200)
1171            .with_body(
1172                r#"{
1173                "access_token": "ey000000000000",
1174                "token_type": "Bearer",
1175                "issued_token_type": "urn:ietf:params:oauth:token-type:access_token",
1176                "expires_in": 86400
1177                }"#,
1178            )
1179            .expect(1)
1180            .create_async()
1181            .await;
1182
1183        let config_mock = create_config_mock(&mut server).await;
1184
1185        let catalog = RestCatalog::new(
1186            RestCatalogConfig::builder()
1187                .uri(server.url())
1188                .props(props)
1189                .build(),
1190        );
1191
1192        let token = catalog.context().await.unwrap().client.token().await;
1193
1194        oauth_mock.assert_async().await;
1195        config_mock.assert_async().await;
1196        assert_eq!(token, Some("ey000000000000".to_string()));
1197    }
1198
1199    #[tokio::test]
1200    async fn test_invalidate_token() {
1201        let mut server = Server::new_async().await;
1202        let oauth_mock = create_oauth_mock(&mut server).await;
1203        let config_mock = create_config_mock(&mut server).await;
1204
1205        let mut props = HashMap::new();
1206        props.insert("credential".to_string(), "client1:secret1".to_string());
1207
1208        let catalog = RestCatalog::new(
1209            RestCatalogConfig::builder()
1210                .uri(server.url())
1211                .props(props)
1212                .build(),
1213        );
1214
1215        let token = catalog.context().await.unwrap().client.token().await;
1216        oauth_mock.assert_async().await;
1217        config_mock.assert_async().await;
1218        assert_eq!(token, Some("ey000000000000".to_string()));
1219
1220        let oauth_mock =
1221            create_oauth_mock_with_path(&mut server, "/v1/oauth/tokens", "ey000000000001", 200)
1222                .await;
1223        catalog.invalidate_token().await.unwrap();
1224        let token = catalog.context().await.unwrap().client.token().await;
1225        oauth_mock.assert_async().await;
1226        assert_eq!(token, Some("ey000000000001".to_string()));
1227    }
1228
1229    #[tokio::test]
1230    async fn test_invalidate_token_failing_request() {
1231        let mut server = Server::new_async().await;
1232        let oauth_mock = create_oauth_mock(&mut server).await;
1233        let config_mock = create_config_mock(&mut server).await;
1234
1235        let mut props = HashMap::new();
1236        props.insert("credential".to_string(), "client1:secret1".to_string());
1237
1238        let catalog = RestCatalog::new(
1239            RestCatalogConfig::builder()
1240                .uri(server.url())
1241                .props(props)
1242                .build(),
1243        );
1244
1245        let token = catalog.context().await.unwrap().client.token().await;
1246        oauth_mock.assert_async().await;
1247        config_mock.assert_async().await;
1248        assert_eq!(token, Some("ey000000000000".to_string()));
1249
1250        let oauth_mock =
1251            create_oauth_mock_with_path(&mut server, "/v1/oauth/tokens", "ey000000000001", 500)
1252                .await;
1253        catalog.invalidate_token().await.unwrap();
1254        let token = catalog.context().await.unwrap().client.token().await;
1255        oauth_mock.assert_async().await;
1256        assert_eq!(token, None);
1257    }
1258
1259    #[tokio::test]
1260    async fn test_regenerate_token() {
1261        let mut server = Server::new_async().await;
1262        let oauth_mock = create_oauth_mock(&mut server).await;
1263        let config_mock = create_config_mock(&mut server).await;
1264
1265        let mut props = HashMap::new();
1266        props.insert("credential".to_string(), "client1:secret1".to_string());
1267
1268        let catalog = RestCatalog::new(
1269            RestCatalogConfig::builder()
1270                .uri(server.url())
1271                .props(props)
1272                .build(),
1273        );
1274
1275        let token = catalog.context().await.unwrap().client.token().await;
1276        oauth_mock.assert_async().await;
1277        config_mock.assert_async().await;
1278        assert_eq!(token, Some("ey000000000000".to_string()));
1279
1280        let oauth_mock =
1281            create_oauth_mock_with_path(&mut server, "/v1/oauth/tokens", "ey000000000001", 200)
1282                .await;
1283        catalog.regenerate_token().await.unwrap();
1284        oauth_mock.assert_async().await;
1285        let token = catalog.context().await.unwrap().client.token().await;
1286        assert_eq!(token, Some("ey000000000001".to_string()));
1287    }
1288
1289    #[tokio::test]
1290    async fn test_regenerate_token_failing_request() {
1291        let mut server = Server::new_async().await;
1292        let oauth_mock = create_oauth_mock(&mut server).await;
1293        let config_mock = create_config_mock(&mut server).await;
1294
1295        let mut props = HashMap::new();
1296        props.insert("credential".to_string(), "client1:secret1".to_string());
1297
1298        let catalog = RestCatalog::new(
1299            RestCatalogConfig::builder()
1300                .uri(server.url())
1301                .props(props)
1302                .build(),
1303        );
1304
1305        let token = catalog.context().await.unwrap().client.token().await;
1306        oauth_mock.assert_async().await;
1307        config_mock.assert_async().await;
1308        assert_eq!(token, Some("ey000000000000".to_string()));
1309
1310        let oauth_mock =
1311            create_oauth_mock_with_path(&mut server, "/v1/oauth/tokens", "ey000000000001", 500)
1312                .await;
1313        let invalidate_result = catalog.regenerate_token().await;
1314        assert!(invalidate_result.is_err());
1315        oauth_mock.assert_async().await;
1316        let token = catalog.context().await.unwrap().client.token().await;
1317
1318        // original token is left intact
1319        assert_eq!(token, Some("ey000000000000".to_string()));
1320    }
1321
1322    #[tokio::test]
1323    async fn test_http_headers() {
1324        let server = Server::new_async().await;
1325        let mut props = HashMap::new();
1326        props.insert("credential".to_string(), "client1:secret1".to_string());
1327
1328        let config = RestCatalogConfig::builder()
1329            .uri(server.url())
1330            .props(props)
1331            .build();
1332        let headers: HeaderMap = config.extra_headers().unwrap();
1333
1334        let expected_headers = HeaderMap::from_iter([
1335            (
1336                header::CONTENT_TYPE,
1337                HeaderValue::from_static("application/json"),
1338            ),
1339            (
1340                HeaderName::from_static("x-client-version"),
1341                HeaderValue::from_static(ICEBERG_REST_SPEC_VERSION),
1342            ),
1343            (
1344                header::USER_AGENT,
1345                HeaderValue::from_str(&format!("iceberg-rs/{CARGO_PKG_VERSION}")).unwrap(),
1346            ),
1347        ]);
1348        assert_eq!(headers, expected_headers);
1349    }
1350
1351    #[tokio::test]
1352    async fn test_http_headers_with_custom_headers() {
1353        let server = Server::new_async().await;
1354        let mut props = HashMap::new();
1355        props.insert("credential".to_string(), "client1:secret1".to_string());
1356        props.insert(
1357            "header.content-type".to_string(),
1358            "application/yaml".to_string(),
1359        );
1360        props.insert(
1361            "header.customized-header".to_string(),
1362            "some/value".to_string(),
1363        );
1364
1365        let config = RestCatalogConfig::builder()
1366            .uri(server.url())
1367            .props(props)
1368            .build();
1369        let headers: HeaderMap = config.extra_headers().unwrap();
1370
1371        let expected_headers = HeaderMap::from_iter([
1372            (
1373                header::CONTENT_TYPE,
1374                HeaderValue::from_static("application/yaml"),
1375            ),
1376            (
1377                HeaderName::from_static("x-client-version"),
1378                HeaderValue::from_static(ICEBERG_REST_SPEC_VERSION),
1379            ),
1380            (
1381                header::USER_AGENT,
1382                HeaderValue::from_str(&format!("iceberg-rs/{CARGO_PKG_VERSION}")).unwrap(),
1383            ),
1384            (
1385                HeaderName::from_static("customized-header"),
1386                HeaderValue::from_static("some/value"),
1387            ),
1388        ]);
1389        assert_eq!(headers, expected_headers);
1390    }
1391
1392    #[tokio::test]
1393    async fn test_oauth_with_oauth2_server_uri() {
1394        let mut server = Server::new_async().await;
1395        let config_mock = create_config_mock(&mut server).await;
1396
1397        let mut auth_server = Server::new_async().await;
1398        let auth_server_path = "/some/path";
1399        let oauth_mock =
1400            create_oauth_mock_with_path(&mut auth_server, auth_server_path, "ey000000000000", 200)
1401                .await;
1402
1403        let mut props = HashMap::new();
1404        props.insert("credential".to_string(), "client1:secret1".to_string());
1405        props.insert(
1406            "oauth2-server-uri".to_string(),
1407            format!("{}{}", auth_server.url(), auth_server_path).to_string(),
1408        );
1409
1410        let catalog = RestCatalog::new(
1411            RestCatalogConfig::builder()
1412                .uri(server.url())
1413                .props(props)
1414                .build(),
1415        );
1416
1417        let token = catalog.context().await.unwrap().client.token().await;
1418
1419        oauth_mock.assert_async().await;
1420        config_mock.assert_async().await;
1421        assert_eq!(token, Some("ey000000000000".to_string()));
1422    }
1423
1424    #[tokio::test]
1425    async fn test_config_override() {
1426        let mut server = Server::new_async().await;
1427        let mut redirect_server = Server::new_async().await;
1428        let new_uri = redirect_server.url();
1429
1430        let config_mock = server
1431            .mock("GET", "/v1/config")
1432            .with_status(200)
1433            .with_body(
1434                json!(
1435                    {
1436                        "overrides": {
1437                            "uri": new_uri,
1438                            "warehouse": "s3://iceberg-catalog",
1439                            "prefix": "ice/warehouses/my"
1440                        },
1441                        "defaults": {},
1442                    }
1443                )
1444                .to_string(),
1445            )
1446            .create_async()
1447            .await;
1448
1449        let list_ns_mock = redirect_server
1450            .mock("GET", "/v1/ice/warehouses/my/namespaces")
1451            .with_body(
1452                r#"{
1453                    "namespaces": []
1454                }"#,
1455            )
1456            .create_async()
1457            .await;
1458
1459        let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
1460
1461        let _namespaces = catalog.list_namespaces(None).await.unwrap();
1462
1463        config_mock.assert_async().await;
1464        list_ns_mock.assert_async().await;
1465    }
1466
1467    #[tokio::test]
1468    async fn test_list_namespace() {
1469        let mut server = Server::new_async().await;
1470
1471        let config_mock = create_config_mock(&mut server).await;
1472
1473        let list_ns_mock = server
1474            .mock("GET", "/v1/namespaces")
1475            .with_body(
1476                r#"{
1477                "namespaces": [
1478                    ["ns1", "ns11"],
1479                    ["ns2"]
1480                ]
1481            }"#,
1482            )
1483            .create_async()
1484            .await;
1485
1486        let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
1487
1488        let namespaces = catalog.list_namespaces(None).await.unwrap();
1489
1490        let expected_ns = vec![
1491            NamespaceIdent::from_vec(vec!["ns1".to_string(), "ns11".to_string()]).unwrap(),
1492            NamespaceIdent::from_vec(vec!["ns2".to_string()]).unwrap(),
1493        ];
1494
1495        assert_eq!(expected_ns, namespaces);
1496
1497        config_mock.assert_async().await;
1498        list_ns_mock.assert_async().await;
1499    }
1500
1501    #[tokio::test]
1502    async fn test_list_namespace_with_pagination() {
1503        let mut server = Server::new_async().await;
1504
1505        let config_mock = create_config_mock(&mut server).await;
1506
1507        let list_ns_mock_page1 = server
1508            .mock("GET", "/v1/namespaces")
1509            .with_body(
1510                r#"{
1511                "namespaces": [
1512                    ["ns1", "ns11"],
1513                    ["ns2"]
1514                ],
1515                "next-page-token": "token123"
1516            }"#,
1517            )
1518            .create_async()
1519            .await;
1520
1521        let list_ns_mock_page2 = server
1522            .mock("GET", "/v1/namespaces?pageToken=token123")
1523            .with_body(
1524                r#"{
1525                "namespaces": [
1526                    ["ns3"],
1527                    ["ns4", "ns41"]
1528                ]
1529            }"#,
1530            )
1531            .create_async()
1532            .await;
1533
1534        let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
1535
1536        let namespaces = catalog.list_namespaces(None).await.unwrap();
1537
1538        let expected_ns = vec![
1539            NamespaceIdent::from_vec(vec!["ns1".to_string(), "ns11".to_string()]).unwrap(),
1540            NamespaceIdent::from_vec(vec!["ns2".to_string()]).unwrap(),
1541            NamespaceIdent::from_vec(vec!["ns3".to_string()]).unwrap(),
1542            NamespaceIdent::from_vec(vec!["ns4".to_string(), "ns41".to_string()]).unwrap(),
1543        ];
1544
1545        assert_eq!(expected_ns, namespaces);
1546
1547        config_mock.assert_async().await;
1548        list_ns_mock_page1.assert_async().await;
1549        list_ns_mock_page2.assert_async().await;
1550    }
1551
1552    #[tokio::test]
1553    async fn test_list_namespace_with_multiple_pages() {
1554        let mut server = Server::new_async().await;
1555
1556        let config_mock = create_config_mock(&mut server).await;
1557
1558        // Page 1
1559        let list_ns_mock_page1 = server
1560            .mock("GET", "/v1/namespaces")
1561            .with_body(
1562                r#"{
1563                "namespaces": [
1564                    ["ns1", "ns11"],
1565                    ["ns2"]
1566                ],
1567                "next-page-token": "page2"
1568            }"#,
1569            )
1570            .create_async()
1571            .await;
1572
1573        // Page 2
1574        let list_ns_mock_page2 = server
1575            .mock("GET", "/v1/namespaces?pageToken=page2")
1576            .with_body(
1577                r#"{
1578                "namespaces": [
1579                    ["ns3"],
1580                    ["ns4", "ns41"]
1581                ],
1582                "next-page-token": "page3"
1583            }"#,
1584            )
1585            .create_async()
1586            .await;
1587
1588        // Page 3
1589        let list_ns_mock_page3 = server
1590            .mock("GET", "/v1/namespaces?pageToken=page3")
1591            .with_body(
1592                r#"{
1593                "namespaces": [
1594                    ["ns5", "ns51", "ns511"]
1595                ],
1596                "next-page-token": "page4"
1597            }"#,
1598            )
1599            .create_async()
1600            .await;
1601
1602        // Page 4
1603        let list_ns_mock_page4 = server
1604            .mock("GET", "/v1/namespaces?pageToken=page4")
1605            .with_body(
1606                r#"{
1607                "namespaces": [
1608                    ["ns6"],
1609                    ["ns7"]
1610                ],
1611                "next-page-token": "page5"
1612            }"#,
1613            )
1614            .create_async()
1615            .await;
1616
1617        // Page 5 (final page)
1618        let list_ns_mock_page5 = server
1619            .mock("GET", "/v1/namespaces?pageToken=page5")
1620            .with_body(
1621                r#"{
1622                "namespaces": [
1623                    ["ns8", "ns81"]
1624                ]
1625            }"#,
1626            )
1627            .create_async()
1628            .await;
1629
1630        let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
1631
1632        let namespaces = catalog.list_namespaces(None).await.unwrap();
1633
1634        let expected_ns = vec![
1635            NamespaceIdent::from_vec(vec!["ns1".to_string(), "ns11".to_string()]).unwrap(),
1636            NamespaceIdent::from_vec(vec!["ns2".to_string()]).unwrap(),
1637            NamespaceIdent::from_vec(vec!["ns3".to_string()]).unwrap(),
1638            NamespaceIdent::from_vec(vec!["ns4".to_string(), "ns41".to_string()]).unwrap(),
1639            NamespaceIdent::from_vec(vec![
1640                "ns5".to_string(),
1641                "ns51".to_string(),
1642                "ns511".to_string(),
1643            ])
1644            .unwrap(),
1645            NamespaceIdent::from_vec(vec!["ns6".to_string()]).unwrap(),
1646            NamespaceIdent::from_vec(vec!["ns7".to_string()]).unwrap(),
1647            NamespaceIdent::from_vec(vec!["ns8".to_string(), "ns81".to_string()]).unwrap(),
1648        ];
1649
1650        assert_eq!(expected_ns, namespaces);
1651
1652        // Verify all page requests were made
1653        config_mock.assert_async().await;
1654        list_ns_mock_page1.assert_async().await;
1655        list_ns_mock_page2.assert_async().await;
1656        list_ns_mock_page3.assert_async().await;
1657        list_ns_mock_page4.assert_async().await;
1658        list_ns_mock_page5.assert_async().await;
1659    }
1660
1661    #[tokio::test]
1662    async fn test_create_namespace() {
1663        let mut server = Server::new_async().await;
1664
1665        let config_mock = create_config_mock(&mut server).await;
1666
1667        let create_ns_mock = server
1668            .mock("POST", "/v1/namespaces")
1669            .with_body(
1670                r#"{
1671                "namespace": [ "ns1", "ns11"],
1672                "properties" : {
1673                    "key1": "value1"
1674                }
1675            }"#,
1676            )
1677            .create_async()
1678            .await;
1679
1680        let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
1681
1682        let namespaces = catalog
1683            .create_namespace(
1684                &NamespaceIdent::from_vec(vec!["ns1".to_string(), "ns11".to_string()]).unwrap(),
1685                HashMap::from([("key1".to_string(), "value1".to_string())]),
1686            )
1687            .await
1688            .unwrap();
1689
1690        let expected_ns = Namespace::with_properties(
1691            NamespaceIdent::from_vec(vec!["ns1".to_string(), "ns11".to_string()]).unwrap(),
1692            HashMap::from([("key1".to_string(), "value1".to_string())]),
1693        );
1694
1695        assert_eq!(expected_ns, namespaces);
1696
1697        config_mock.assert_async().await;
1698        create_ns_mock.assert_async().await;
1699    }
1700
1701    #[tokio::test]
1702    async fn test_get_namespace() {
1703        let mut server = Server::new_async().await;
1704
1705        let config_mock = create_config_mock(&mut server).await;
1706
1707        let get_ns_mock = server
1708            .mock("GET", "/v1/namespaces/ns1")
1709            .with_body(
1710                r#"{
1711                "namespace": [ "ns1"],
1712                "properties" : {
1713                    "key1": "value1"
1714                }
1715            }"#,
1716            )
1717            .create_async()
1718            .await;
1719
1720        let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
1721
1722        let namespaces = catalog
1723            .get_namespace(&NamespaceIdent::new("ns1".to_string()))
1724            .await
1725            .unwrap();
1726
1727        let expected_ns = Namespace::with_properties(
1728            NamespaceIdent::new("ns1".to_string()),
1729            HashMap::from([("key1".to_string(), "value1".to_string())]),
1730        );
1731
1732        assert_eq!(expected_ns, namespaces);
1733
1734        config_mock.assert_async().await;
1735        get_ns_mock.assert_async().await;
1736    }
1737
1738    #[tokio::test]
1739    async fn check_namespace_exists() {
1740        let mut server = Server::new_async().await;
1741
1742        let config_mock = create_config_mock(&mut server).await;
1743
1744        let get_ns_mock = server
1745            .mock("HEAD", "/v1/namespaces/ns1")
1746            .with_status(204)
1747            .create_async()
1748            .await;
1749
1750        let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
1751
1752        assert!(
1753            catalog
1754                .namespace_exists(&NamespaceIdent::new("ns1".to_string()))
1755                .await
1756                .unwrap()
1757        );
1758
1759        config_mock.assert_async().await;
1760        get_ns_mock.assert_async().await;
1761    }
1762
1763    #[tokio::test]
1764    async fn test_drop_namespace() {
1765        let mut server = Server::new_async().await;
1766
1767        let config_mock = create_config_mock(&mut server).await;
1768
1769        let drop_ns_mock = server
1770            .mock("DELETE", "/v1/namespaces/ns1")
1771            .with_status(204)
1772            .create_async()
1773            .await;
1774
1775        let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
1776
1777        catalog
1778            .drop_namespace(&NamespaceIdent::new("ns1".to_string()))
1779            .await
1780            .unwrap();
1781
1782        config_mock.assert_async().await;
1783        drop_ns_mock.assert_async().await;
1784    }
1785
1786    #[tokio::test]
1787    async fn test_list_tables() {
1788        let mut server = Server::new_async().await;
1789
1790        let config_mock = create_config_mock(&mut server).await;
1791
1792        let list_tables_mock = server
1793            .mock("GET", "/v1/namespaces/ns1/tables")
1794            .with_status(200)
1795            .with_body(
1796                r#"{
1797                "identifiers": [
1798                    {
1799                        "namespace": ["ns1"],
1800                        "name": "table1"
1801                    },
1802                    {
1803                        "namespace": ["ns1"],
1804                        "name": "table2"
1805                    }
1806                ]
1807            }"#,
1808            )
1809            .create_async()
1810            .await;
1811
1812        let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
1813
1814        let tables = catalog
1815            .list_tables(&NamespaceIdent::new("ns1".to_string()))
1816            .await
1817            .unwrap();
1818
1819        let expected_tables = vec![
1820            TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table1".to_string()),
1821            TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table2".to_string()),
1822        ];
1823
1824        assert_eq!(tables, expected_tables);
1825
1826        config_mock.assert_async().await;
1827        list_tables_mock.assert_async().await;
1828    }
1829
1830    #[tokio::test]
1831    async fn test_list_tables_with_pagination() {
1832        let mut server = Server::new_async().await;
1833
1834        let config_mock = create_config_mock(&mut server).await;
1835
1836        let list_tables_mock_page1 = server
1837            .mock("GET", "/v1/namespaces/ns1/tables")
1838            .with_status(200)
1839            .with_body(
1840                r#"{
1841                "identifiers": [
1842                    {
1843                        "namespace": ["ns1"],
1844                        "name": "table1"
1845                    },
1846                    {
1847                        "namespace": ["ns1"],
1848                        "name": "table2"
1849                    }
1850                ],
1851                "next-page-token": "token456"
1852            }"#,
1853            )
1854            .create_async()
1855            .await;
1856
1857        let list_tables_mock_page2 = server
1858            .mock("GET", "/v1/namespaces/ns1/tables?pageToken=token456")
1859            .with_status(200)
1860            .with_body(
1861                r#"{
1862                "identifiers": [
1863                    {
1864                        "namespace": ["ns1"],
1865                        "name": "table3"
1866                    },
1867                    {
1868                        "namespace": ["ns1"],
1869                        "name": "table4"
1870                    }
1871                ]
1872            }"#,
1873            )
1874            .create_async()
1875            .await;
1876
1877        let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
1878
1879        let tables = catalog
1880            .list_tables(&NamespaceIdent::new("ns1".to_string()))
1881            .await
1882            .unwrap();
1883
1884        let expected_tables = vec![
1885            TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table1".to_string()),
1886            TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table2".to_string()),
1887            TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table3".to_string()),
1888            TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table4".to_string()),
1889        ];
1890
1891        assert_eq!(tables, expected_tables);
1892
1893        config_mock.assert_async().await;
1894        list_tables_mock_page1.assert_async().await;
1895        list_tables_mock_page2.assert_async().await;
1896    }
1897
1898    #[tokio::test]
1899    async fn test_list_tables_with_multiple_pages() {
1900        let mut server = Server::new_async().await;
1901
1902        let config_mock = create_config_mock(&mut server).await;
1903
1904        // Page 1
1905        let list_tables_mock_page1 = server
1906            .mock("GET", "/v1/namespaces/ns1/tables")
1907            .with_status(200)
1908            .with_body(
1909                r#"{
1910                "identifiers": [
1911                    {
1912                        "namespace": ["ns1"],
1913                        "name": "table1"
1914                    },
1915                    {
1916                        "namespace": ["ns1"],
1917                        "name": "table2"
1918                    }
1919                ],
1920                "next-page-token": "page2"
1921            }"#,
1922            )
1923            .create_async()
1924            .await;
1925
1926        // Page 2
1927        let list_tables_mock_page2 = server
1928            .mock("GET", "/v1/namespaces/ns1/tables?pageToken=page2")
1929            .with_status(200)
1930            .with_body(
1931                r#"{
1932                "identifiers": [
1933                    {
1934                        "namespace": ["ns1"],
1935                        "name": "table3"
1936                    },
1937                    {
1938                        "namespace": ["ns1"],
1939                        "name": "table4"
1940                    }
1941                ],
1942                "next-page-token": "page3"
1943            }"#,
1944            )
1945            .create_async()
1946            .await;
1947
1948        // Page 3
1949        let list_tables_mock_page3 = server
1950            .mock("GET", "/v1/namespaces/ns1/tables?pageToken=page3")
1951            .with_status(200)
1952            .with_body(
1953                r#"{
1954                "identifiers": [
1955                    {
1956                        "namespace": ["ns1"],
1957                        "name": "table5"
1958                    }
1959                ],
1960                "next-page-token": "page4"
1961            }"#,
1962            )
1963            .create_async()
1964            .await;
1965
1966        // Page 4
1967        let list_tables_mock_page4 = server
1968            .mock("GET", "/v1/namespaces/ns1/tables?pageToken=page4")
1969            .with_status(200)
1970            .with_body(
1971                r#"{
1972                "identifiers": [
1973                    {
1974                        "namespace": ["ns1"],
1975                        "name": "table6"
1976                    },
1977                    {
1978                        "namespace": ["ns1"],
1979                        "name": "table7"
1980                    }
1981                ],
1982                "next-page-token": "page5"
1983            }"#,
1984            )
1985            .create_async()
1986            .await;
1987
1988        // Page 5 (final page)
1989        let list_tables_mock_page5 = server
1990            .mock("GET", "/v1/namespaces/ns1/tables?pageToken=page5")
1991            .with_status(200)
1992            .with_body(
1993                r#"{
1994                "identifiers": [
1995                    {
1996                        "namespace": ["ns1"],
1997                        "name": "table8"
1998                    }
1999                ]
2000            }"#,
2001            )
2002            .create_async()
2003            .await;
2004
2005        let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
2006
2007        let tables = catalog
2008            .list_tables(&NamespaceIdent::new("ns1".to_string()))
2009            .await
2010            .unwrap();
2011
2012        let expected_tables = vec![
2013            TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table1".to_string()),
2014            TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table2".to_string()),
2015            TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table3".to_string()),
2016            TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table4".to_string()),
2017            TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table5".to_string()),
2018            TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table6".to_string()),
2019            TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table7".to_string()),
2020            TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table8".to_string()),
2021        ];
2022
2023        assert_eq!(tables, expected_tables);
2024
2025        // Verify all page requests were made
2026        config_mock.assert_async().await;
2027        list_tables_mock_page1.assert_async().await;
2028        list_tables_mock_page2.assert_async().await;
2029        list_tables_mock_page3.assert_async().await;
2030        list_tables_mock_page4.assert_async().await;
2031        list_tables_mock_page5.assert_async().await;
2032    }
2033
2034    #[tokio::test]
2035    async fn test_drop_tables() {
2036        let mut server = Server::new_async().await;
2037
2038        let config_mock = create_config_mock(&mut server).await;
2039
2040        let delete_table_mock = server
2041            .mock("DELETE", "/v1/namespaces/ns1/tables/table1")
2042            .with_status(204)
2043            .create_async()
2044            .await;
2045
2046        let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
2047
2048        catalog
2049            .drop_table(&TableIdent::new(
2050                NamespaceIdent::new("ns1".to_string()),
2051                "table1".to_string(),
2052            ))
2053            .await
2054            .unwrap();
2055
2056        config_mock.assert_async().await;
2057        delete_table_mock.assert_async().await;
2058    }
2059
2060    #[tokio::test]
2061    async fn test_check_table_exists() {
2062        let mut server = Server::new_async().await;
2063
2064        let config_mock = create_config_mock(&mut server).await;
2065
2066        let check_table_exists_mock = server
2067            .mock("HEAD", "/v1/namespaces/ns1/tables/table1")
2068            .with_status(204)
2069            .create_async()
2070            .await;
2071
2072        let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
2073
2074        assert!(
2075            catalog
2076                .table_exists(&TableIdent::new(
2077                    NamespaceIdent::new("ns1".to_string()),
2078                    "table1".to_string(),
2079                ))
2080                .await
2081                .unwrap()
2082        );
2083
2084        config_mock.assert_async().await;
2085        check_table_exists_mock.assert_async().await;
2086    }
2087
2088    #[tokio::test]
2089    async fn test_rename_table() {
2090        let mut server = Server::new_async().await;
2091
2092        let config_mock = create_config_mock(&mut server).await;
2093
2094        let rename_table_mock = server
2095            .mock("POST", "/v1/tables/rename")
2096            .with_status(204)
2097            .create_async()
2098            .await;
2099
2100        let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
2101
2102        catalog
2103            .rename_table(
2104                &TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table1".to_string()),
2105                &TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table2".to_string()),
2106            )
2107            .await
2108            .unwrap();
2109
2110        config_mock.assert_async().await;
2111        rename_table_mock.assert_async().await;
2112    }
2113
2114    #[tokio::test]
2115    async fn test_load_table() {
2116        let mut server = Server::new_async().await;
2117
2118        let config_mock = create_config_mock(&mut server).await;
2119
2120        let rename_table_mock = server
2121            .mock("GET", "/v1/namespaces/ns1/tables/test1")
2122            .with_status(200)
2123            .with_body_from_file(format!(
2124                "{}/testdata/{}",
2125                env!("CARGO_MANIFEST_DIR"),
2126                "load_table_response.json"
2127            ))
2128            .create_async()
2129            .await;
2130
2131        let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
2132
2133        let table = catalog
2134            .load_table(&TableIdent::new(
2135                NamespaceIdent::new("ns1".to_string()),
2136                "test1".to_string(),
2137            ))
2138            .await
2139            .unwrap();
2140
2141        assert_eq!(
2142            &TableIdent::from_strs(vec!["ns1", "test1"]).unwrap(),
2143            table.identifier()
2144        );
2145        assert_eq!(
2146            "s3://warehouse/database/table/metadata/00001-5f2f8166-244c-4eae-ac36-384ecdec81fc.gz.metadata.json",
2147            table.metadata_location().unwrap()
2148        );
2149        assert_eq!(FormatVersion::V1, table.metadata().format_version());
2150        assert_eq!("s3://warehouse/database/table", table.metadata().location());
2151        assert_eq!(
2152            uuid!("b55d9dda-6561-423a-8bfc-787980ce421f"),
2153            table.metadata().uuid()
2154        );
2155        assert_eq!(
2156            Utc.timestamp_millis_opt(1646787054459).unwrap(),
2157            table.metadata().last_updated_timestamp().unwrap()
2158        );
2159        assert_eq!(
2160            vec![&Arc::new(
2161                Schema::builder()
2162                    .with_fields(vec![
2163                        NestedField::optional(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
2164                        NestedField::optional(2, "data", Type::Primitive(PrimitiveType::String))
2165                            .into(),
2166                    ])
2167                    .build()
2168                    .unwrap()
2169            )],
2170            table.metadata().schemas_iter().collect::<Vec<_>>()
2171        );
2172        assert_eq!(
2173            &HashMap::from([
2174                ("owner".to_string(), "bryan".to_string()),
2175                (
2176                    "write.metadata.compression-codec".to_string(),
2177                    "gzip".to_string()
2178                )
2179            ]),
2180            table.metadata().properties()
2181        );
2182        assert_eq!(vec![&Arc::new(Snapshot::builder()
2183            .with_snapshot_id(3497810964824022504)
2184            .with_timestamp_ms(1646787054459)
2185            .with_manifest_list("s3://warehouse/database/table/metadata/snap-3497810964824022504-1-c4f68204-666b-4e50-a9df-b10c34bf6b82.avro")
2186            .with_sequence_number(0)
2187            .with_schema_id(0)
2188            .with_summary(Summary {
2189                operation: Operation::Append,
2190                additional_properties: HashMap::from_iter([
2191                    ("spark.app.id", "local-1646787004168"),
2192                    ("added-data-files", "1"),
2193                    ("added-records", "1"),
2194                    ("added-files-size", "697"),
2195                    ("changed-partition-count", "1"),
2196                    ("total-records", "1"),
2197                    ("total-files-size", "697"),
2198                    ("total-data-files", "1"),
2199                    ("total-delete-files", "0"),
2200                    ("total-position-deletes", "0"),
2201                    ("total-equality-deletes", "0")
2202                ].iter().map(|p| (p.0.to_string(), p.1.to_string()))),
2203            }).build()
2204        )], table.metadata().snapshots().collect::<Vec<_>>());
2205        assert_eq!(
2206            &[SnapshotLog {
2207                timestamp_ms: 1646787054459,
2208                snapshot_id: 3497810964824022504,
2209            }],
2210            table.metadata().history()
2211        );
2212        assert_eq!(
2213            vec![&Arc::new(SortOrder {
2214                order_id: 0,
2215                fields: vec![],
2216            })],
2217            table.metadata().sort_orders_iter().collect::<Vec<_>>()
2218        );
2219
2220        config_mock.assert_async().await;
2221        rename_table_mock.assert_async().await;
2222    }
2223
2224    #[tokio::test]
2225    async fn test_load_table_404() {
2226        let mut server = Server::new_async().await;
2227
2228        let config_mock = create_config_mock(&mut server).await;
2229
2230        let rename_table_mock = server
2231            .mock("GET", "/v1/namespaces/ns1/tables/test1")
2232            .with_status(404)
2233            .with_body(r#"
2234{
2235    "error": {
2236        "message": "Table does not exist: ns1.test1 in warehouse 8bcb0838-50fc-472d-9ddb-8feb89ef5f1e",
2237        "type": "NoSuchNamespaceErrorException",
2238        "code": 404
2239    }
2240}
2241            "#)
2242            .create_async()
2243            .await;
2244
2245        let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
2246
2247        let table = catalog
2248            .load_table(&TableIdent::new(
2249                NamespaceIdent::new("ns1".to_string()),
2250                "test1".to_string(),
2251            ))
2252            .await;
2253
2254        assert!(table.is_err());
2255        assert!(table.err().unwrap().message().contains("does not exist"));
2256
2257        config_mock.assert_async().await;
2258        rename_table_mock.assert_async().await;
2259    }
2260
2261    #[tokio::test]
2262    async fn test_create_table() {
2263        let mut server = Server::new_async().await;
2264
2265        let config_mock = create_config_mock(&mut server).await;
2266
2267        let create_table_mock = server
2268            .mock("POST", "/v1/namespaces/ns1/tables")
2269            .with_status(200)
2270            .with_body_from_file(format!(
2271                "{}/testdata/{}",
2272                env!("CARGO_MANIFEST_DIR"),
2273                "create_table_response.json"
2274            ))
2275            .create_async()
2276            .await;
2277
2278        let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
2279
2280        let table_creation = TableCreation::builder()
2281            .name("test1".to_string())
2282            .schema(
2283                Schema::builder()
2284                    .with_fields(vec![
2285                        NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String))
2286                            .into(),
2287                        NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(),
2288                        NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean))
2289                            .into(),
2290                    ])
2291                    .with_schema_id(1)
2292                    .with_identifier_field_ids(vec![2])
2293                    .build()
2294                    .unwrap(),
2295            )
2296            .properties(HashMap::from([("owner".to_string(), "testx".to_string())]))
2297            .partition_spec(
2298                UnboundPartitionSpec::builder()
2299                    .add_partition_fields(vec![
2300                        UnboundPartitionField::builder()
2301                            .source_id(1)
2302                            .transform(Transform::Truncate(3))
2303                            .name("id".to_string())
2304                            .build(),
2305                    ])
2306                    .unwrap()
2307                    .build(),
2308            )
2309            .sort_order(
2310                SortOrder::builder()
2311                    .with_sort_field(
2312                        SortField::builder()
2313                            .source_id(2)
2314                            .transform(Transform::Identity)
2315                            .direction(SortDirection::Ascending)
2316                            .null_order(NullOrder::First)
2317                            .build(),
2318                    )
2319                    .build_unbound()
2320                    .unwrap(),
2321            )
2322            .build();
2323
2324        let table = catalog
2325            .create_table(&NamespaceIdent::from_strs(["ns1"]).unwrap(), table_creation)
2326            .await
2327            .unwrap();
2328
2329        assert_eq!(
2330            &TableIdent::from_strs(vec!["ns1", "test1"]).unwrap(),
2331            table.identifier()
2332        );
2333        assert_eq!(
2334            "s3://warehouse/database/table/metadata.json",
2335            table.metadata_location().unwrap()
2336        );
2337        assert_eq!(FormatVersion::V1, table.metadata().format_version());
2338        assert_eq!("s3://warehouse/database/table", table.metadata().location());
2339        assert_eq!(
2340            uuid!("bf289591-dcc0-4234-ad4f-5c3eed811a29"),
2341            table.metadata().uuid()
2342        );
2343        assert_eq!(
2344            1657810967051,
2345            table
2346                .metadata()
2347                .last_updated_timestamp()
2348                .unwrap()
2349                .timestamp_millis()
2350        );
2351        assert_eq!(
2352            vec![&Arc::new(
2353                Schema::builder()
2354                    .with_fields(vec![
2355                        NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String))
2356                            .into(),
2357                        NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(),
2358                        NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean))
2359                            .into(),
2360                    ])
2361                    .with_schema_id(0)
2362                    .with_identifier_field_ids(vec![2])
2363                    .build()
2364                    .unwrap()
2365            )],
2366            table.metadata().schemas_iter().collect::<Vec<_>>()
2367        );
2368        assert_eq!(
2369            &HashMap::from([
2370                (
2371                    "write.delete.parquet.compression-codec".to_string(),
2372                    "zstd".to_string()
2373                ),
2374                (
2375                    "write.metadata.compression-codec".to_string(),
2376                    "gzip".to_string()
2377                ),
2378                (
2379                    "write.summary.partition-limit".to_string(),
2380                    "100".to_string()
2381                ),
2382                (
2383                    "write.parquet.compression-codec".to_string(),
2384                    "zstd".to_string()
2385                ),
2386            ]),
2387            table.metadata().properties()
2388        );
2389        assert!(table.metadata().current_snapshot().is_none());
2390        assert!(table.metadata().history().is_empty());
2391        assert_eq!(
2392            vec![&Arc::new(SortOrder {
2393                order_id: 0,
2394                fields: vec![],
2395            })],
2396            table.metadata().sort_orders_iter().collect::<Vec<_>>()
2397        );
2398
2399        config_mock.assert_async().await;
2400        create_table_mock.assert_async().await;
2401    }
2402
2403    #[tokio::test]
2404    async fn test_create_table_409() {
2405        let mut server = Server::new_async().await;
2406
2407        let config_mock = create_config_mock(&mut server).await;
2408
2409        let create_table_mock = server
2410            .mock("POST", "/v1/namespaces/ns1/tables")
2411            .with_status(409)
2412            .with_body(r#"
2413{
2414    "error": {
2415        "message": "Table already exists: ns1.test1 in warehouse 8bcb0838-50fc-472d-9ddb-8feb89ef5f1e",
2416        "type": "AlreadyExistsException",
2417        "code": 409
2418    }
2419}
2420            "#)
2421            .create_async()
2422            .await;
2423
2424        let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
2425
2426        let table_creation = TableCreation::builder()
2427            .name("test1".to_string())
2428            .schema(
2429                Schema::builder()
2430                    .with_fields(vec![
2431                        NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String))
2432                            .into(),
2433                        NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(),
2434                        NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean))
2435                            .into(),
2436                    ])
2437                    .with_schema_id(1)
2438                    .with_identifier_field_ids(vec![2])
2439                    .build()
2440                    .unwrap(),
2441            )
2442            .properties(HashMap::from([("owner".to_string(), "testx".to_string())]))
2443            .build();
2444
2445        let table_result = catalog
2446            .create_table(&NamespaceIdent::from_strs(["ns1"]).unwrap(), table_creation)
2447            .await;
2448
2449        assert!(table_result.is_err());
2450        assert!(
2451            table_result
2452                .err()
2453                .unwrap()
2454                .message()
2455                .contains("already exists")
2456        );
2457
2458        config_mock.assert_async().await;
2459        create_table_mock.assert_async().await;
2460    }
2461
2462    #[tokio::test]
2463    async fn test_update_table() {
2464        let mut server = Server::new_async().await;
2465
2466        let config_mock = create_config_mock(&mut server).await;
2467
2468        let load_table_mock = server
2469            .mock("GET", "/v1/namespaces/ns1/tables/test1")
2470            .with_status(200)
2471            .with_body_from_file(format!(
2472                "{}/testdata/{}",
2473                env!("CARGO_MANIFEST_DIR"),
2474                "load_table_response.json"
2475            ))
2476            .create_async()
2477            .await;
2478
2479        let update_table_mock = server
2480            .mock("POST", "/v1/namespaces/ns1/tables/test1")
2481            .with_status(200)
2482            .with_body_from_file(format!(
2483                "{}/testdata/{}",
2484                env!("CARGO_MANIFEST_DIR"),
2485                "update_table_response.json"
2486            ))
2487            .create_async()
2488            .await;
2489
2490        let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
2491
2492        let table1 = {
2493            let file = File::open(format!(
2494                "{}/testdata/{}",
2495                env!("CARGO_MANIFEST_DIR"),
2496                "create_table_response.json"
2497            ))
2498            .unwrap();
2499            let reader = BufReader::new(file);
2500            let resp = serde_json::from_reader::<_, LoadTableResult>(reader).unwrap();
2501
2502            Table::builder()
2503                .metadata(resp.metadata)
2504                .metadata_location(resp.metadata_location.unwrap())
2505                .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap())
2506                .file_io(FileIO::from_path("/tmp").unwrap().build().unwrap())
2507                .build()
2508                .unwrap()
2509        };
2510
2511        let tx = Transaction::new(&table1);
2512        let table = tx
2513            .upgrade_table_version()
2514            .set_format_version(FormatVersion::V2)
2515            .apply(tx)
2516            .unwrap()
2517            .commit(&catalog)
2518            .await
2519            .unwrap();
2520
2521        assert_eq!(
2522            &TableIdent::from_strs(vec!["ns1", "test1"]).unwrap(),
2523            table.identifier()
2524        );
2525        assert_eq!(
2526            "s3://warehouse/database/table/metadata.json",
2527            table.metadata_location().unwrap()
2528        );
2529        assert_eq!(FormatVersion::V2, table.metadata().format_version());
2530        assert_eq!("s3://warehouse/database/table", table.metadata().location());
2531        assert_eq!(
2532            uuid!("bf289591-dcc0-4234-ad4f-5c3eed811a29"),
2533            table.metadata().uuid()
2534        );
2535        assert_eq!(
2536            1657810967051,
2537            table
2538                .metadata()
2539                .last_updated_timestamp()
2540                .unwrap()
2541                .timestamp_millis()
2542        );
2543        assert_eq!(
2544            vec![&Arc::new(
2545                Schema::builder()
2546                    .with_fields(vec![
2547                        NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String))
2548                            .into(),
2549                        NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(),
2550                        NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean))
2551                            .into(),
2552                    ])
2553                    .with_schema_id(0)
2554                    .with_identifier_field_ids(vec![2])
2555                    .build()
2556                    .unwrap()
2557            )],
2558            table.metadata().schemas_iter().collect::<Vec<_>>()
2559        );
2560        assert_eq!(
2561            &HashMap::from([
2562                (
2563                    "write.delete.parquet.compression-codec".to_string(),
2564                    "zstd".to_string()
2565                ),
2566                (
2567                    "write.metadata.compression-codec".to_string(),
2568                    "gzip".to_string()
2569                ),
2570                (
2571                    "write.summary.partition-limit".to_string(),
2572                    "100".to_string()
2573                ),
2574                (
2575                    "write.parquet.compression-codec".to_string(),
2576                    "zstd".to_string()
2577                ),
2578            ]),
2579            table.metadata().properties()
2580        );
2581        assert!(table.metadata().current_snapshot().is_none());
2582        assert!(table.metadata().history().is_empty());
2583        assert_eq!(
2584            vec![&Arc::new(SortOrder {
2585                order_id: 0,
2586                fields: vec![],
2587            })],
2588            table.metadata().sort_orders_iter().collect::<Vec<_>>()
2589        );
2590
2591        config_mock.assert_async().await;
2592        update_table_mock.assert_async().await;
2593        load_table_mock.assert_async().await
2594    }
2595
2596    #[tokio::test]
2597    async fn test_update_table_404() {
2598        let mut server = Server::new_async().await;
2599
2600        let config_mock = create_config_mock(&mut server).await;
2601
2602        let load_table_mock = server
2603            .mock("GET", "/v1/namespaces/ns1/tables/test1")
2604            .with_status(200)
2605            .with_body_from_file(format!(
2606                "{}/testdata/{}",
2607                env!("CARGO_MANIFEST_DIR"),
2608                "load_table_response.json"
2609            ))
2610            .create_async()
2611            .await;
2612
2613        let update_table_mock = server
2614            .mock("POST", "/v1/namespaces/ns1/tables/test1")
2615            .with_status(404)
2616            .with_body(
2617                r#"
2618{
2619    "error": {
2620        "message": "The given table does not exist",
2621        "type": "NoSuchTableException",
2622        "code": 404
2623    }
2624}
2625            "#,
2626            )
2627            .create_async()
2628            .await;
2629
2630        let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
2631
2632        let table1 = {
2633            let file = File::open(format!(
2634                "{}/testdata/{}",
2635                env!("CARGO_MANIFEST_DIR"),
2636                "create_table_response.json"
2637            ))
2638            .unwrap();
2639            let reader = BufReader::new(file);
2640            let resp = serde_json::from_reader::<_, LoadTableResult>(reader).unwrap();
2641
2642            Table::builder()
2643                .metadata(resp.metadata)
2644                .metadata_location(resp.metadata_location.unwrap())
2645                .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap())
2646                .file_io(FileIO::from_path("/tmp").unwrap().build().unwrap())
2647                .build()
2648                .unwrap()
2649        };
2650
2651        let tx = Transaction::new(&table1);
2652        let table_result = tx
2653            .upgrade_table_version()
2654            .set_format_version(FormatVersion::V2)
2655            .apply(tx)
2656            .unwrap()
2657            .commit(&catalog)
2658            .await;
2659
2660        assert!(table_result.is_err());
2661        assert!(
2662            table_result
2663                .err()
2664                .unwrap()
2665                .message()
2666                .contains("does not exist")
2667        );
2668
2669        config_mock.assert_async().await;
2670        update_table_mock.assert_async().await;
2671        load_table_mock.assert_async().await;
2672    }
2673
2674    #[tokio::test]
2675    async fn test_register_table() {
2676        let mut server = Server::new_async().await;
2677
2678        let config_mock = create_config_mock(&mut server).await;
2679
2680        let register_table_mock = server
2681            .mock("POST", "/v1/namespaces/ns1/register")
2682            .with_status(200)
2683            .with_body_from_file(format!(
2684                "{}/testdata/{}",
2685                env!("CARGO_MANIFEST_DIR"),
2686                "load_table_response.json"
2687            ))
2688            .create_async()
2689            .await;
2690
2691        let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
2692        let table_ident =
2693            TableIdent::new(NamespaceIdent::new("ns1".to_string()), "test1".to_string());
2694        let metadata_location = String::from(
2695            "s3://warehouse/database/table/metadata/00001-5f2f8166-244c-4eae-ac36-384ecdec81fc.gz.metadata.json",
2696        );
2697
2698        let table = catalog
2699            .register_table(&table_ident, metadata_location)
2700            .await
2701            .unwrap();
2702
2703        assert_eq!(
2704            &TableIdent::from_strs(vec!["ns1", "test1"]).unwrap(),
2705            table.identifier()
2706        );
2707        assert_eq!(
2708            "s3://warehouse/database/table/metadata/00001-5f2f8166-244c-4eae-ac36-384ecdec81fc.gz.metadata.json",
2709            table.metadata_location().unwrap()
2710        );
2711
2712        config_mock.assert_async().await;
2713        register_table_mock.assert_async().await;
2714    }
2715
2716    #[tokio::test]
2717    async fn test_register_table_404() {
2718        let mut server = Server::new_async().await;
2719
2720        let config_mock = create_config_mock(&mut server).await;
2721
2722        let register_table_mock = server
2723            .mock("POST", "/v1/namespaces/ns1/register")
2724            .with_status(404)
2725            .with_body(
2726                r#"
2727{
2728    "error": {
2729        "message": "The namespace specified does not exist",
2730        "type": "NoSuchNamespaceErrorException",
2731        "code": 404
2732    }
2733}
2734            "#,
2735            )
2736            .create_async()
2737            .await;
2738
2739        let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
2740
2741        let table_ident =
2742            TableIdent::new(NamespaceIdent::new("ns1".to_string()), "test1".to_string());
2743        let metadata_location = String::from(
2744            "s3://warehouse/database/table/metadata/00001-5f2f8166-244c-4eae-ac36-384ecdec81fc.gz.metadata.json",
2745        );
2746        let table = catalog
2747            .register_table(&table_ident, metadata_location)
2748            .await;
2749
2750        assert!(table.is_err());
2751        assert!(table.err().unwrap().message().contains("does not exist"));
2752
2753        config_mock.assert_async().await;
2754        register_table_mock.assert_async().await;
2755    }
2756
2757    #[tokio::test]
2758    async fn test_create_rest_catalog() {
2759        let builder = RestCatalogBuilder::default().with_client(Client::new());
2760
2761        let catalog = builder
2762            .load(
2763                "test",
2764                HashMap::from([
2765                    (
2766                        REST_CATALOG_PROP_URI.to_string(),
2767                        "http://localhost:8080".to_string(),
2768                    ),
2769                    ("a".to_string(), "b".to_string()),
2770                ]),
2771            )
2772            .await;
2773
2774        assert!(catalog.is_ok());
2775
2776        let catalog_config = catalog.unwrap().user_config;
2777        assert_eq!(catalog_config.name.as_deref(), Some("test"));
2778        assert_eq!(catalog_config.uri, "http://localhost:8080");
2779        assert_eq!(catalog_config.warehouse, None);
2780        assert!(catalog_config.client.is_some());
2781
2782        assert_eq!(catalog_config.props.get("a"), Some(&"b".to_string()));
2783        assert!(!catalog_config.props.contains_key(REST_CATALOG_PROP_URI));
2784    }
2785
2786    #[tokio::test]
2787    async fn test_create_rest_catalog_no_uri() {
2788        let builder = RestCatalogBuilder::default();
2789
2790        let catalog = builder
2791            .load(
2792                "test",
2793                HashMap::from([(
2794                    REST_CATALOG_PROP_WAREHOUSE.to_string(),
2795                    "s3://warehouse".to_string(),
2796                )]),
2797            )
2798            .await;
2799
2800        assert!(catalog.is_err());
2801        if let Err(err) = catalog {
2802            assert_eq!(err.kind(), ErrorKind::DataInvalid);
2803            assert_eq!(err.message(), "Catalog uri is required");
2804        }
2805    }
2806}