iceberg_catalog_rest/
catalog.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! This module contains the iceberg REST catalog implementation.
19
20use std::any::Any;
21use std::collections::HashMap;
22use std::future::Future;
23use std::str::FromStr;
24
25use async_trait::async_trait;
26use iceberg::io::{self, FileIO};
27use iceberg::table::Table;
28use iceberg::{
29    Catalog, CatalogBuilder, Error, ErrorKind, Namespace, NamespaceIdent, Result, TableCommit,
30    TableCreation, TableIdent,
31};
32use itertools::Itertools;
33use reqwest::header::{
34    HeaderMap, HeaderName, HeaderValue, {self},
35};
36use reqwest::{Client, Method, StatusCode, Url};
37use tokio::sync::OnceCell;
38use typed_builder::TypedBuilder;
39
40use crate::client::{
41    HttpClient, deserialize_catalog_response, deserialize_unexpected_catalog_error,
42};
43use crate::types::{
44    CatalogConfig, CommitTableRequest, CommitTableResponse, CreateNamespaceRequest,
45    CreateTableRequest, ListNamespaceResponse, ListTablesResponse, LoadTableResult,
46    NamespaceResponse, RegisterTableRequest, RenameTableRequest,
47};
48
49/// REST catalog URI
50pub const REST_CATALOG_PROP_URI: &str = "uri";
51/// REST catalog warehouse location
52pub const REST_CATALOG_PROP_WAREHOUSE: &str = "warehouse";
53
54const ICEBERG_REST_SPEC_VERSION: &str = "0.14.1";
55const CARGO_PKG_VERSION: &str = env!("CARGO_PKG_VERSION");
56const PATH_V1: &str = "v1";
57
58/// Builder for [`RestCatalog`].
59#[derive(Debug)]
60pub struct RestCatalogBuilder(RestCatalogConfig);
61
62impl Default for RestCatalogBuilder {
63    fn default() -> Self {
64        Self(RestCatalogConfig {
65            name: None,
66            uri: "".to_string(),
67            warehouse: None,
68            props: HashMap::new(),
69            client: None,
70        })
71    }
72}
73
74impl CatalogBuilder for RestCatalogBuilder {
75    type C = RestCatalog;
76
77    fn load(
78        mut self,
79        name: impl Into<String>,
80        props: HashMap<String, String>,
81    ) -> impl Future<Output = Result<Self::C>> + Send {
82        self.0.name = Some(name.into());
83
84        if props.contains_key(REST_CATALOG_PROP_URI) {
85            self.0.uri = props
86                .get(REST_CATALOG_PROP_URI)
87                .cloned()
88                .unwrap_or_default();
89        }
90
91        if props.contains_key(REST_CATALOG_PROP_WAREHOUSE) {
92            self.0.warehouse = props.get(REST_CATALOG_PROP_WAREHOUSE).cloned()
93        }
94
95        // Collect other remaining properties
96        self.0.props = props
97            .into_iter()
98            .filter(|(k, _)| k != REST_CATALOG_PROP_URI && k != REST_CATALOG_PROP_WAREHOUSE)
99            .collect();
100
101        let result = {
102            if self.0.name.is_none() {
103                Err(Error::new(
104                    ErrorKind::DataInvalid,
105                    "Catalog name is required",
106                ))
107            } else if self.0.uri.is_empty() {
108                Err(Error::new(
109                    ErrorKind::DataInvalid,
110                    "Catalog uri is required",
111                ))
112            } else {
113                Ok(RestCatalog::new(self.0))
114            }
115        };
116
117        std::future::ready(result)
118    }
119}
120
121impl RestCatalogBuilder {
122    /// Configures the catalog with a custom HTTP client.
123    pub fn with_client(mut self, client: Client) -> Self {
124        self.0.client = Some(client);
125        self
126    }
127}
128
129/// Rest catalog configuration.
130#[derive(Clone, Debug, TypedBuilder)]
131pub(crate) struct RestCatalogConfig {
132    #[builder(default, setter(strip_option))]
133    name: Option<String>,
134
135    uri: String,
136
137    #[builder(default, setter(strip_option(fallback = warehouse_opt)))]
138    warehouse: Option<String>,
139
140    #[builder(default)]
141    props: HashMap<String, String>,
142
143    #[builder(default)]
144    client: Option<Client>,
145}
146
147impl RestCatalogConfig {
148    fn url_prefixed(&self, parts: &[&str]) -> String {
149        [&self.uri, PATH_V1]
150            .into_iter()
151            .chain(self.props.get("prefix").map(|s| &**s))
152            .chain(parts.iter().cloned())
153            .join("/")
154    }
155
156    fn config_endpoint(&self) -> String {
157        [&self.uri, PATH_V1, "config"].join("/")
158    }
159
160    pub(crate) fn get_token_endpoint(&self) -> String {
161        if let Some(oauth2_uri) = self.props.get("oauth2-server-uri") {
162            oauth2_uri.to_string()
163        } else {
164            [&self.uri, PATH_V1, "oauth", "tokens"].join("/")
165        }
166    }
167
168    fn namespaces_endpoint(&self) -> String {
169        self.url_prefixed(&["namespaces"])
170    }
171
172    fn namespace_endpoint(&self, ns: &NamespaceIdent) -> String {
173        self.url_prefixed(&["namespaces", &ns.to_url_string()])
174    }
175
176    fn tables_endpoint(&self, ns: &NamespaceIdent) -> String {
177        self.url_prefixed(&["namespaces", &ns.to_url_string(), "tables"])
178    }
179
180    fn rename_table_endpoint(&self) -> String {
181        self.url_prefixed(&["tables", "rename"])
182    }
183
184    fn register_table_endpoint(&self, ns: &NamespaceIdent) -> String {
185        self.url_prefixed(&["namespaces", &ns.to_url_string(), "register"])
186    }
187
188    fn table_endpoint(&self, table: &TableIdent) -> String {
189        self.url_prefixed(&[
190            "namespaces",
191            &table.namespace.to_url_string(),
192            "tables",
193            &table.name,
194        ])
195    }
196
197    /// Get the client from the config.
198    pub(crate) fn client(&self) -> Option<Client> {
199        self.client.clone()
200    }
201
202    /// Get the token from the config.
203    ///
204    /// The client can use this token to send requests.
205    pub(crate) fn token(&self) -> Option<String> {
206        self.props.get("token").cloned()
207    }
208
209    /// Get the credentials from the config. The client can use these credentials to fetch a new
210    /// token.
211    ///
212    /// ## Output
213    ///
214    /// - `None`: No credential is set.
215    /// - `Some(None, client_secret)`: No client_id is set, use client_secret directly.
216    /// - `Some(Some(client_id), client_secret)`: Both client_id and client_secret are set.
217    pub(crate) fn credential(&self) -> Option<(Option<String>, String)> {
218        let cred = self.props.get("credential")?;
219
220        match cred.split_once(':') {
221            Some((client_id, client_secret)) => {
222                Some((Some(client_id.to_string()), client_secret.to_string()))
223            }
224            None => Some((None, cred.to_string())),
225        }
226    }
227
228    /// Get the extra headers from config, which includes:
229    ///
230    /// - `content-type`
231    /// - `x-client-version`
232    /// - `user-agent`
233    /// - All headers specified by `header.xxx` in props.
234    pub(crate) fn extra_headers(&self) -> Result<HeaderMap> {
235        let mut headers = HeaderMap::from_iter([
236            (
237                header::CONTENT_TYPE,
238                HeaderValue::from_static("application/json"),
239            ),
240            (
241                HeaderName::from_static("x-client-version"),
242                HeaderValue::from_static(ICEBERG_REST_SPEC_VERSION),
243            ),
244            (
245                header::USER_AGENT,
246                HeaderValue::from_str(&format!("iceberg-rs/{CARGO_PKG_VERSION}")).unwrap(),
247            ),
248        ]);
249
250        for (key, value) in self
251            .props
252            .iter()
253            .filter_map(|(k, v)| k.strip_prefix("header.").map(|k| (k, v)))
254        {
255            headers.insert(
256                HeaderName::from_str(key).map_err(|e| {
257                    Error::new(
258                        ErrorKind::DataInvalid,
259                        format!("Invalid header name: {key}"),
260                    )
261                    .with_source(e)
262                })?,
263                HeaderValue::from_str(value).map_err(|e| {
264                    Error::new(
265                        ErrorKind::DataInvalid,
266                        format!("Invalid header value: {value}"),
267                    )
268                    .with_source(e)
269                })?,
270            );
271        }
272
273        Ok(headers)
274    }
275
276    /// Get the optional OAuth headers from the config.
277    pub(crate) fn extra_oauth_params(&self) -> HashMap<String, String> {
278        let mut params = HashMap::new();
279
280        if let Some(scope) = self.props.get("scope") {
281            params.insert("scope".to_string(), scope.to_string());
282        } else {
283            params.insert("scope".to_string(), "catalog".to_string());
284        }
285
286        let optional_params = ["audience", "resource"];
287        for param_name in optional_params {
288            if let Some(value) = self.props.get(param_name) {
289                params.insert(param_name.to_string(), value.to_string());
290            }
291        }
292
293        params
294    }
295
296    /// Merge the `RestCatalogConfig` with the a [`CatalogConfig`] (fetched from the REST server).
297    pub(crate) fn merge_with_config(mut self, mut config: CatalogConfig) -> Self {
298        if let Some(uri) = config.overrides.remove("uri") {
299            self.uri = uri;
300        }
301
302        let mut props = config.defaults;
303        props.extend(self.props);
304        props.extend(config.overrides);
305
306        self.props = props;
307        self
308    }
309}
310
311#[derive(Debug)]
312struct RestContext {
313    client: HttpClient,
314    /// Runtime config is fetched from rest server and stored here.
315    ///
316    /// It's could be different from the user config.
317    config: RestCatalogConfig,
318}
319
320/// Rest catalog implementation.
321#[derive(Debug)]
322pub struct RestCatalog {
323    /// User config is stored as-is and never be changed.
324    ///
325    /// It could be different from the config fetched from the server and used at runtime.
326    user_config: RestCatalogConfig,
327    ctx: OnceCell<RestContext>,
328    /// Extensions for the FileIOBuilder.
329    file_io_extensions: io::Extensions,
330}
331
332impl RestCatalog {
333    /// Creates a `RestCatalog` from a [`RestCatalogConfig`].
334    fn new(config: RestCatalogConfig) -> Self {
335        Self {
336            user_config: config,
337            ctx: OnceCell::new(),
338            file_io_extensions: io::Extensions::default(),
339        }
340    }
341
342    /// Add an extension to the file IO builder.
343    pub fn with_file_io_extension<T: Any + Send + Sync>(mut self, ext: T) -> Self {
344        self.file_io_extensions.add(ext);
345        self
346    }
347
348    /// Gets the [`RestContext`] from the catalog.
349    async fn context(&self) -> Result<&RestContext> {
350        self.ctx
351            .get_or_try_init(|| async {
352                let client = HttpClient::new(&self.user_config)?;
353                let catalog_config = RestCatalog::load_config(&client, &self.user_config).await?;
354                let config = self.user_config.clone().merge_with_config(catalog_config);
355                let client = client.update_with(&config)?;
356
357                Ok(RestContext { config, client })
358            })
359            .await
360    }
361
362    /// Load the runtime config from the server by `user_config`.
363    ///
364    /// It's required for a REST catalog to update its config after creation.
365    async fn load_config(
366        client: &HttpClient,
367        user_config: &RestCatalogConfig,
368    ) -> Result<CatalogConfig> {
369        let mut request_builder = client.request(Method::GET, user_config.config_endpoint());
370
371        if let Some(warehouse_location) = &user_config.warehouse {
372            request_builder = request_builder.query(&[("warehouse", warehouse_location)]);
373        }
374
375        let request = request_builder.build()?;
376
377        let http_response = client.query_catalog(request).await?;
378
379        match http_response.status() {
380            StatusCode::OK => deserialize_catalog_response(http_response).await,
381            _ => Err(deserialize_unexpected_catalog_error(http_response).await),
382        }
383    }
384
385    async fn load_file_io(
386        &self,
387        metadata_location: Option<&str>,
388        extra_config: Option<HashMap<String, String>>,
389    ) -> Result<FileIO> {
390        let mut props = self.context().await?.config.props.clone();
391        if let Some(config) = extra_config {
392            props.extend(config);
393        }
394
395        // If the warehouse is a logical identifier instead of a URL we don't want
396        // to raise an exception
397        let warehouse_path = match self.context().await?.config.warehouse.as_deref() {
398            Some(url) if Url::parse(url).is_ok() => Some(url),
399            Some(_) => None,
400            None => None,
401        };
402
403        let file_io = match metadata_location.or(warehouse_path) {
404            Some(url) => FileIO::from_path(url)?
405                .with_props(props)
406                .with_extensions(self.file_io_extensions.clone())
407                .build()?,
408            None => {
409                return Err(Error::new(
410                    ErrorKind::Unexpected,
411                    "Unable to load file io, neither warehouse nor metadata location is set!",
412                ))?;
413            }
414        };
415
416        Ok(file_io)
417    }
418
419    /// Invalidate the current token without generating a new one. On the next request, the client
420    /// will attempt to generate a new token.
421    pub async fn invalidate_token(&self) -> Result<()> {
422        self.context().await?.client.invalidate_token().await
423    }
424
425    /// Invalidate the current token and set a new one. Generates a new token before invalidating
426    /// the current token, meaning the old token will be used until this function acquires the lock
427    /// and overwrites the token.
428    ///
429    /// If credential is invalid, or the request fails, this method will return an error and leave
430    /// the current token unchanged.
431    pub async fn regenerate_token(&self) -> Result<()> {
432        self.context().await?.client.regenerate_token().await
433    }
434}
435
436/// All requests and expected responses are derived from the REST catalog API spec:
437/// https://github.com/apache/iceberg/blob/main/open-api/rest-catalog-open-api.yaml
438#[async_trait]
439impl Catalog for RestCatalog {
440    async fn list_namespaces(
441        &self,
442        parent: Option<&NamespaceIdent>,
443    ) -> Result<Vec<NamespaceIdent>> {
444        let context = self.context().await?;
445        let endpoint = context.config.namespaces_endpoint();
446        let mut namespaces = Vec::new();
447        let mut next_token = None;
448
449        loop {
450            let mut request = context.client.request(Method::GET, endpoint.clone());
451
452            // Filter on `parent={namespace}` if a parent namespace exists.
453            if let Some(ns) = parent {
454                request = request.query(&[("parent", ns.to_url_string())]);
455            }
456
457            if let Some(token) = next_token {
458                request = request.query(&[("pageToken", token)]);
459            }
460
461            let http_response = context.client.query_catalog(request.build()?).await?;
462
463            match http_response.status() {
464                StatusCode::OK => {
465                    let response =
466                        deserialize_catalog_response::<ListNamespaceResponse>(http_response)
467                            .await?;
468
469                    namespaces.extend(response.namespaces);
470
471                    match response.next_page_token {
472                        Some(token) => next_token = Some(token),
473                        None => break,
474                    }
475                }
476                StatusCode::NOT_FOUND => {
477                    return Err(Error::new(
478                        ErrorKind::Unexpected,
479                        "The parent parameter of the namespace provided does not exist",
480                    ));
481                }
482                _ => return Err(deserialize_unexpected_catalog_error(http_response).await),
483            }
484        }
485
486        Ok(namespaces)
487    }
488
489    async fn create_namespace(
490        &self,
491        namespace: &NamespaceIdent,
492        properties: HashMap<String, String>,
493    ) -> Result<Namespace> {
494        let context = self.context().await?;
495
496        let request = context
497            .client
498            .request(Method::POST, context.config.namespaces_endpoint())
499            .json(&CreateNamespaceRequest {
500                namespace: namespace.clone(),
501                properties,
502            })
503            .build()?;
504
505        let http_response = context.client.query_catalog(request).await?;
506
507        match http_response.status() {
508            StatusCode::OK => {
509                let response =
510                    deserialize_catalog_response::<NamespaceResponse>(http_response).await?;
511                Ok(Namespace::from(response))
512            }
513            StatusCode::CONFLICT => Err(Error::new(
514                ErrorKind::Unexpected,
515                "Tried to create a namespace that already exists",
516            )),
517            _ => Err(deserialize_unexpected_catalog_error(http_response).await),
518        }
519    }
520
521    async fn get_namespace(&self, namespace: &NamespaceIdent) -> Result<Namespace> {
522        let context = self.context().await?;
523
524        let request = context
525            .client
526            .request(Method::GET, context.config.namespace_endpoint(namespace))
527            .build()?;
528
529        let http_response = context.client.query_catalog(request).await?;
530
531        match http_response.status() {
532            StatusCode::OK => {
533                let response =
534                    deserialize_catalog_response::<NamespaceResponse>(http_response).await?;
535                Ok(Namespace::from(response))
536            }
537            StatusCode::NOT_FOUND => Err(Error::new(
538                ErrorKind::Unexpected,
539                "Tried to get a namespace that does not exist",
540            )),
541            _ => Err(deserialize_unexpected_catalog_error(http_response).await),
542        }
543    }
544
545    async fn namespace_exists(&self, ns: &NamespaceIdent) -> Result<bool> {
546        let context = self.context().await?;
547
548        let request = context
549            .client
550            .request(Method::HEAD, context.config.namespace_endpoint(ns))
551            .build()?;
552
553        let http_response = context.client.query_catalog(request).await?;
554
555        match http_response.status() {
556            StatusCode::NO_CONTENT | StatusCode::OK => Ok(true),
557            StatusCode::NOT_FOUND => Ok(false),
558            _ => Err(deserialize_unexpected_catalog_error(http_response).await),
559        }
560    }
561
562    async fn update_namespace(
563        &self,
564        _namespace: &NamespaceIdent,
565        _properties: HashMap<String, String>,
566    ) -> Result<()> {
567        Err(Error::new(
568            ErrorKind::FeatureUnsupported,
569            "Updating namespace not supported yet!",
570        ))
571    }
572
573    async fn drop_namespace(&self, namespace: &NamespaceIdent) -> Result<()> {
574        let context = self.context().await?;
575
576        let request = context
577            .client
578            .request(Method::DELETE, context.config.namespace_endpoint(namespace))
579            .build()?;
580
581        let http_response = context.client.query_catalog(request).await?;
582
583        match http_response.status() {
584            StatusCode::NO_CONTENT | StatusCode::OK => Ok(()),
585            StatusCode::NOT_FOUND => Err(Error::new(
586                ErrorKind::Unexpected,
587                "Tried to drop a namespace that does not exist",
588            )),
589            _ => Err(deserialize_unexpected_catalog_error(http_response).await),
590        }
591    }
592
593    async fn list_tables(&self, namespace: &NamespaceIdent) -> Result<Vec<TableIdent>> {
594        let context = self.context().await?;
595        let endpoint = context.config.tables_endpoint(namespace);
596        let mut identifiers = Vec::new();
597        let mut next_token = None;
598
599        loop {
600            let mut request = context.client.request(Method::GET, endpoint.clone());
601
602            if let Some(token) = next_token {
603                request = request.query(&[("pageToken", token)]);
604            }
605
606            let http_response = context.client.query_catalog(request.build()?).await?;
607
608            match http_response.status() {
609                StatusCode::OK => {
610                    let response =
611                        deserialize_catalog_response::<ListTablesResponse>(http_response).await?;
612
613                    identifiers.extend(response.identifiers);
614
615                    match response.next_page_token {
616                        Some(token) => next_token = Some(token),
617                        None => break,
618                    }
619                }
620                StatusCode::NOT_FOUND => {
621                    return Err(Error::new(
622                        ErrorKind::Unexpected,
623                        "Tried to list tables of a namespace that does not exist",
624                    ));
625                }
626                _ => return Err(deserialize_unexpected_catalog_error(http_response).await),
627            }
628        }
629
630        Ok(identifiers)
631    }
632
633    /// Create a new table inside the namespace.
634    ///
635    /// In the resulting table, if there are any config properties that
636    /// are present in both the response from the REST server and the
637    /// config provided when creating this `RestCatalog` instance then
638    /// the value provided locally to the `RestCatalog` will take precedence.
639    async fn create_table(
640        &self,
641        namespace: &NamespaceIdent,
642        creation: TableCreation,
643    ) -> Result<Table> {
644        let context = self.context().await?;
645
646        let table_ident = TableIdent::new(namespace.clone(), creation.name.clone());
647
648        let request = context
649            .client
650            .request(Method::POST, context.config.tables_endpoint(namespace))
651            .json(&CreateTableRequest {
652                name: creation.name,
653                location: creation.location,
654                schema: creation.schema,
655                partition_spec: creation.partition_spec,
656                write_order: creation.sort_order,
657                stage_create: Some(false),
658                properties: creation.properties,
659            })
660            .build()?;
661
662        let http_response = context.client.query_catalog(request).await?;
663
664        let response = match http_response.status() {
665            StatusCode::OK => {
666                deserialize_catalog_response::<LoadTableResult>(http_response).await?
667            }
668            StatusCode::NOT_FOUND => {
669                return Err(Error::new(
670                    ErrorKind::Unexpected,
671                    "Tried to create a table under a namespace that does not exist",
672                ));
673            }
674            StatusCode::CONFLICT => {
675                return Err(Error::new(
676                    ErrorKind::Unexpected,
677                    "The table already exists",
678                ));
679            }
680            _ => return Err(deserialize_unexpected_catalog_error(http_response).await),
681        };
682
683        let metadata_location = response.metadata_location.as_ref().ok_or(Error::new(
684            ErrorKind::DataInvalid,
685            "Metadata location missing in `create_table` response!",
686        ))?;
687
688        let config = response
689            .config
690            .into_iter()
691            .chain(self.user_config.props.clone())
692            .collect();
693
694        let file_io = self
695            .load_file_io(Some(metadata_location), Some(config))
696            .await?;
697
698        let table_builder = Table::builder()
699            .identifier(table_ident.clone())
700            .file_io(file_io)
701            .metadata(response.metadata);
702
703        if let Some(metadata_location) = response.metadata_location {
704            table_builder.metadata_location(metadata_location).build()
705        } else {
706            table_builder.build()
707        }
708    }
709
710    /// Load table from the catalog.
711    ///
712    /// If there are any config properties that are present in both the response from the REST
713    /// server and the config provided when creating this `RestCatalog` instance, then the value
714    /// provided locally to the `RestCatalog` will take precedence.
715    async fn load_table(&self, table_ident: &TableIdent) -> Result<Table> {
716        let context = self.context().await?;
717
718        let request = context
719            .client
720            .request(Method::GET, context.config.table_endpoint(table_ident))
721            .build()?;
722
723        let http_response = context.client.query_catalog(request).await?;
724
725        let response = match http_response.status() {
726            StatusCode::OK | StatusCode::NOT_MODIFIED => {
727                deserialize_catalog_response::<LoadTableResult>(http_response).await?
728            }
729            StatusCode::NOT_FOUND => {
730                return Err(Error::new(
731                    ErrorKind::Unexpected,
732                    "Tried to load a table that does not exist",
733                ));
734            }
735            _ => return Err(deserialize_unexpected_catalog_error(http_response).await),
736        };
737
738        let config = response
739            .config
740            .into_iter()
741            .chain(self.user_config.props.clone())
742            .collect();
743
744        let file_io = self
745            .load_file_io(response.metadata_location.as_deref(), Some(config))
746            .await?;
747
748        let table_builder = Table::builder()
749            .identifier(table_ident.clone())
750            .file_io(file_io)
751            .metadata(response.metadata);
752
753        if let Some(metadata_location) = response.metadata_location {
754            table_builder.metadata_location(metadata_location).build()
755        } else {
756            table_builder.build()
757        }
758    }
759
760    /// Drop a table from the catalog.
761    async fn drop_table(&self, table: &TableIdent) -> Result<()> {
762        let context = self.context().await?;
763
764        let request = context
765            .client
766            .request(Method::DELETE, context.config.table_endpoint(table))
767            .build()?;
768
769        let http_response = context.client.query_catalog(request).await?;
770
771        match http_response.status() {
772            StatusCode::NO_CONTENT | StatusCode::OK => Ok(()),
773            StatusCode::NOT_FOUND => Err(Error::new(
774                ErrorKind::Unexpected,
775                "Tried to drop a table that does not exist",
776            )),
777            _ => Err(deserialize_unexpected_catalog_error(http_response).await),
778        }
779    }
780
781    /// Check if a table exists in the catalog.
782    async fn table_exists(&self, table: &TableIdent) -> Result<bool> {
783        let context = self.context().await?;
784
785        let request = context
786            .client
787            .request(Method::HEAD, context.config.table_endpoint(table))
788            .build()?;
789
790        let http_response = context.client.query_catalog(request).await?;
791
792        match http_response.status() {
793            StatusCode::NO_CONTENT | StatusCode::OK => Ok(true),
794            StatusCode::NOT_FOUND => Ok(false),
795            _ => Err(deserialize_unexpected_catalog_error(http_response).await),
796        }
797    }
798
799    /// Rename a table in the catalog.
800    async fn rename_table(&self, src: &TableIdent, dest: &TableIdent) -> Result<()> {
801        let context = self.context().await?;
802
803        let request = context
804            .client
805            .request(Method::POST, context.config.rename_table_endpoint())
806            .json(&RenameTableRequest {
807                source: src.clone(),
808                destination: dest.clone(),
809            })
810            .build()?;
811
812        let http_response = context.client.query_catalog(request).await?;
813
814        match http_response.status() {
815            StatusCode::NO_CONTENT | StatusCode::OK => Ok(()),
816            StatusCode::NOT_FOUND => Err(Error::new(
817                ErrorKind::Unexpected,
818                "Tried to rename a table that does not exist (is the namespace correct?)",
819            )),
820            StatusCode::CONFLICT => Err(Error::new(
821                ErrorKind::Unexpected,
822                "Tried to rename a table to a name that already exists",
823            )),
824            _ => Err(deserialize_unexpected_catalog_error(http_response).await),
825        }
826    }
827
828    async fn register_table(
829        &self,
830        table_ident: &TableIdent,
831        metadata_location: String,
832    ) -> Result<Table> {
833        let context = self.context().await?;
834
835        let request = context
836            .client
837            .request(
838                Method::POST,
839                context
840                    .config
841                    .register_table_endpoint(table_ident.namespace()),
842            )
843            .json(&RegisterTableRequest {
844                name: table_ident.name.clone(),
845                metadata_location: metadata_location.clone(),
846                overwrite: Some(false),
847            })
848            .build()?;
849
850        let http_response = context.client.query_catalog(request).await?;
851
852        let response: LoadTableResult = match http_response.status() {
853            StatusCode::OK => {
854                deserialize_catalog_response::<LoadTableResult>(http_response).await?
855            }
856            StatusCode::NOT_FOUND => {
857                return Err(Error::new(
858                    ErrorKind::NamespaceNotFound,
859                    "The namespace specified does not exist.",
860                ));
861            }
862            StatusCode::CONFLICT => {
863                return Err(Error::new(
864                    ErrorKind::TableAlreadyExists,
865                    "The given table already exists.",
866                ));
867            }
868            _ => return Err(deserialize_unexpected_catalog_error(http_response).await),
869        };
870
871        let metadata_location = response.metadata_location.as_ref().ok_or(Error::new(
872            ErrorKind::DataInvalid,
873            "Metadata location missing in `register_table` response!",
874        ))?;
875
876        let file_io = self.load_file_io(Some(metadata_location), None).await?;
877
878        Table::builder()
879            .identifier(table_ident.clone())
880            .file_io(file_io)
881            .metadata(response.metadata)
882            .metadata_location(metadata_location.clone())
883            .build()
884    }
885
886    async fn update_table(&self, mut commit: TableCommit) -> Result<Table> {
887        let context = self.context().await?;
888
889        let request = context
890            .client
891            .request(
892                Method::POST,
893                context.config.table_endpoint(commit.identifier()),
894            )
895            .json(&CommitTableRequest {
896                identifier: Some(commit.identifier().clone()),
897                requirements: commit.take_requirements(),
898                updates: commit.take_updates(),
899            })
900            .build()?;
901
902        let http_response = context.client.query_catalog(request).await?;
903
904        let response: CommitTableResponse = match http_response.status() {
905            StatusCode::OK => deserialize_catalog_response(http_response).await?,
906            StatusCode::NOT_FOUND => {
907                return Err(Error::new(
908                    ErrorKind::TableNotFound,
909                    "Tried to update a table that does not exist",
910                ));
911            }
912            StatusCode::CONFLICT => {
913                return Err(Error::new(
914                    ErrorKind::CatalogCommitConflicts,
915                    "CatalogCommitConflicts, one or more requirements failed. The client may retry.",
916                )
917                .with_retryable(true));
918            }
919            StatusCode::INTERNAL_SERVER_ERROR => {
920                return Err(Error::new(
921                    ErrorKind::Unexpected,
922                    "An unknown server-side problem occurred; the commit state is unknown.",
923                ));
924            }
925            StatusCode::BAD_GATEWAY => {
926                return Err(Error::new(
927                    ErrorKind::Unexpected,
928                    "A gateway or proxy received an invalid response from the upstream server; the commit state is unknown.",
929                ));
930            }
931            StatusCode::GATEWAY_TIMEOUT => {
932                return Err(Error::new(
933                    ErrorKind::Unexpected,
934                    "A server-side gateway timeout occurred; the commit state is unknown.",
935                ));
936            }
937            _ => return Err(deserialize_unexpected_catalog_error(http_response).await),
938        };
939
940        let file_io = self
941            .load_file_io(Some(&response.metadata_location), None)
942            .await?;
943
944        Table::builder()
945            .identifier(commit.identifier().clone())
946            .file_io(file_io)
947            .metadata(response.metadata)
948            .metadata_location(response.metadata_location)
949            .build()
950    }
951}
952
953#[cfg(test)]
954mod tests {
955    use std::fs::File;
956    use std::io::BufReader;
957    use std::sync::Arc;
958
959    use chrono::{TimeZone, Utc};
960    use iceberg::spec::{
961        FormatVersion, NestedField, NullOrder, Operation, PrimitiveType, Schema, Snapshot,
962        SnapshotLog, SortDirection, SortField, SortOrder, Summary, Transform, Type,
963        UnboundPartitionField, UnboundPartitionSpec,
964    };
965    use iceberg::transaction::{ApplyTransactionAction, Transaction};
966    use mockito::{Mock, Server, ServerGuard};
967    use serde_json::json;
968    use uuid::uuid;
969
970    use super::*;
971
972    #[tokio::test]
973    async fn test_update_config() {
974        let mut server = Server::new_async().await;
975
976        let config_mock = server
977            .mock("GET", "/v1/config")
978            .with_status(200)
979            .with_body(
980                r#"{
981                "overrides": {
982                    "warehouse": "s3://iceberg-catalog"
983                },
984                "defaults": {}
985            }"#,
986            )
987            .create_async()
988            .await;
989
990        let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
991
992        assert_eq!(
993            catalog
994                .context()
995                .await
996                .unwrap()
997                .config
998                .props
999                .get("warehouse"),
1000            Some(&"s3://iceberg-catalog".to_string())
1001        );
1002
1003        config_mock.assert_async().await;
1004    }
1005
1006    async fn create_config_mock(server: &mut ServerGuard) -> Mock {
1007        server
1008            .mock("GET", "/v1/config")
1009            .with_status(200)
1010            .with_body(
1011                r#"{
1012                "overrides": {
1013                    "warehouse": "s3://iceberg-catalog"
1014                },
1015                "defaults": {}
1016            }"#,
1017            )
1018            .create_async()
1019            .await
1020    }
1021
1022    async fn create_oauth_mock(server: &mut ServerGuard) -> Mock {
1023        create_oauth_mock_with_path(server, "/v1/oauth/tokens", "ey000000000000", 200).await
1024    }
1025
1026    async fn create_oauth_mock_with_path(
1027        server: &mut ServerGuard,
1028        path: &str,
1029        token: &str,
1030        status: usize,
1031    ) -> Mock {
1032        let body = format!(
1033            r#"{{
1034                "access_token": "{token}",
1035                "token_type": "Bearer",
1036                "issued_token_type": "urn:ietf:params:oauth:token-type:access_token",
1037                "expires_in": 86400
1038            }}"#
1039        );
1040        server
1041            .mock("POST", path)
1042            .with_status(status)
1043            .with_body(body)
1044            .expect(1)
1045            .create_async()
1046            .await
1047    }
1048
1049    #[tokio::test]
1050    async fn test_oauth() {
1051        let mut server = Server::new_async().await;
1052        let oauth_mock = create_oauth_mock(&mut server).await;
1053        let config_mock = create_config_mock(&mut server).await;
1054
1055        let mut props = HashMap::new();
1056        props.insert("credential".to_string(), "client1:secret1".to_string());
1057
1058        let catalog = RestCatalog::new(
1059            RestCatalogConfig::builder()
1060                .uri(server.url())
1061                .props(props)
1062                .build(),
1063        );
1064
1065        let token = catalog.context().await.unwrap().client.token().await;
1066        oauth_mock.assert_async().await;
1067        config_mock.assert_async().await;
1068        assert_eq!(token, Some("ey000000000000".to_string()));
1069    }
1070
1071    #[tokio::test]
1072    async fn test_oauth_with_optional_param() {
1073        let mut props = HashMap::new();
1074        props.insert("credential".to_string(), "client1:secret1".to_string());
1075        props.insert("scope".to_string(), "custom_scope".to_string());
1076        props.insert("audience".to_string(), "custom_audience".to_string());
1077        props.insert("resource".to_string(), "custom_resource".to_string());
1078
1079        let mut server = Server::new_async().await;
1080        let oauth_mock = server
1081            .mock("POST", "/v1/oauth/tokens")
1082            .match_body(mockito::Matcher::Regex("scope=custom_scope".to_string()))
1083            .match_body(mockito::Matcher::Regex(
1084                "audience=custom_audience".to_string(),
1085            ))
1086            .match_body(mockito::Matcher::Regex(
1087                "resource=custom_resource".to_string(),
1088            ))
1089            .with_status(200)
1090            .with_body(
1091                r#"{
1092                "access_token": "ey000000000000",
1093                "token_type": "Bearer",
1094                "issued_token_type": "urn:ietf:params:oauth:token-type:access_token",
1095                "expires_in": 86400
1096                }"#,
1097            )
1098            .expect(1)
1099            .create_async()
1100            .await;
1101
1102        let config_mock = create_config_mock(&mut server).await;
1103
1104        let catalog = RestCatalog::new(
1105            RestCatalogConfig::builder()
1106                .uri(server.url())
1107                .props(props)
1108                .build(),
1109        );
1110
1111        let token = catalog.context().await.unwrap().client.token().await;
1112
1113        oauth_mock.assert_async().await;
1114        config_mock.assert_async().await;
1115        assert_eq!(token, Some("ey000000000000".to_string()));
1116    }
1117
1118    #[tokio::test]
1119    async fn test_invalidate_token() {
1120        let mut server = Server::new_async().await;
1121        let oauth_mock = create_oauth_mock(&mut server).await;
1122        let config_mock = create_config_mock(&mut server).await;
1123
1124        let mut props = HashMap::new();
1125        props.insert("credential".to_string(), "client1:secret1".to_string());
1126
1127        let catalog = RestCatalog::new(
1128            RestCatalogConfig::builder()
1129                .uri(server.url())
1130                .props(props)
1131                .build(),
1132        );
1133
1134        let token = catalog.context().await.unwrap().client.token().await;
1135        oauth_mock.assert_async().await;
1136        config_mock.assert_async().await;
1137        assert_eq!(token, Some("ey000000000000".to_string()));
1138
1139        let oauth_mock =
1140            create_oauth_mock_with_path(&mut server, "/v1/oauth/tokens", "ey000000000001", 200)
1141                .await;
1142        catalog.invalidate_token().await.unwrap();
1143        let token = catalog.context().await.unwrap().client.token().await;
1144        oauth_mock.assert_async().await;
1145        assert_eq!(token, Some("ey000000000001".to_string()));
1146    }
1147
1148    #[tokio::test]
1149    async fn test_invalidate_token_failing_request() {
1150        let mut server = Server::new_async().await;
1151        let oauth_mock = create_oauth_mock(&mut server).await;
1152        let config_mock = create_config_mock(&mut server).await;
1153
1154        let mut props = HashMap::new();
1155        props.insert("credential".to_string(), "client1:secret1".to_string());
1156
1157        let catalog = RestCatalog::new(
1158            RestCatalogConfig::builder()
1159                .uri(server.url())
1160                .props(props)
1161                .build(),
1162        );
1163
1164        let token = catalog.context().await.unwrap().client.token().await;
1165        oauth_mock.assert_async().await;
1166        config_mock.assert_async().await;
1167        assert_eq!(token, Some("ey000000000000".to_string()));
1168
1169        let oauth_mock =
1170            create_oauth_mock_with_path(&mut server, "/v1/oauth/tokens", "ey000000000001", 500)
1171                .await;
1172        catalog.invalidate_token().await.unwrap();
1173        let token = catalog.context().await.unwrap().client.token().await;
1174        oauth_mock.assert_async().await;
1175        assert_eq!(token, None);
1176    }
1177
1178    #[tokio::test]
1179    async fn test_regenerate_token() {
1180        let mut server = Server::new_async().await;
1181        let oauth_mock = create_oauth_mock(&mut server).await;
1182        let config_mock = create_config_mock(&mut server).await;
1183
1184        let mut props = HashMap::new();
1185        props.insert("credential".to_string(), "client1:secret1".to_string());
1186
1187        let catalog = RestCatalog::new(
1188            RestCatalogConfig::builder()
1189                .uri(server.url())
1190                .props(props)
1191                .build(),
1192        );
1193
1194        let token = catalog.context().await.unwrap().client.token().await;
1195        oauth_mock.assert_async().await;
1196        config_mock.assert_async().await;
1197        assert_eq!(token, Some("ey000000000000".to_string()));
1198
1199        let oauth_mock =
1200            create_oauth_mock_with_path(&mut server, "/v1/oauth/tokens", "ey000000000001", 200)
1201                .await;
1202        catalog.regenerate_token().await.unwrap();
1203        oauth_mock.assert_async().await;
1204        let token = catalog.context().await.unwrap().client.token().await;
1205        assert_eq!(token, Some("ey000000000001".to_string()));
1206    }
1207
1208    #[tokio::test]
1209    async fn test_regenerate_token_failing_request() {
1210        let mut server = Server::new_async().await;
1211        let oauth_mock = create_oauth_mock(&mut server).await;
1212        let config_mock = create_config_mock(&mut server).await;
1213
1214        let mut props = HashMap::new();
1215        props.insert("credential".to_string(), "client1:secret1".to_string());
1216
1217        let catalog = RestCatalog::new(
1218            RestCatalogConfig::builder()
1219                .uri(server.url())
1220                .props(props)
1221                .build(),
1222        );
1223
1224        let token = catalog.context().await.unwrap().client.token().await;
1225        oauth_mock.assert_async().await;
1226        config_mock.assert_async().await;
1227        assert_eq!(token, Some("ey000000000000".to_string()));
1228
1229        let oauth_mock =
1230            create_oauth_mock_with_path(&mut server, "/v1/oauth/tokens", "ey000000000001", 500)
1231                .await;
1232        let invalidate_result = catalog.regenerate_token().await;
1233        assert!(invalidate_result.is_err());
1234        oauth_mock.assert_async().await;
1235        let token = catalog.context().await.unwrap().client.token().await;
1236
1237        // original token is left intact
1238        assert_eq!(token, Some("ey000000000000".to_string()));
1239    }
1240
1241    #[tokio::test]
1242    async fn test_http_headers() {
1243        let server = Server::new_async().await;
1244        let mut props = HashMap::new();
1245        props.insert("credential".to_string(), "client1:secret1".to_string());
1246
1247        let config = RestCatalogConfig::builder()
1248            .uri(server.url())
1249            .props(props)
1250            .build();
1251        let headers: HeaderMap = config.extra_headers().unwrap();
1252
1253        let expected_headers = HeaderMap::from_iter([
1254            (
1255                header::CONTENT_TYPE,
1256                HeaderValue::from_static("application/json"),
1257            ),
1258            (
1259                HeaderName::from_static("x-client-version"),
1260                HeaderValue::from_static(ICEBERG_REST_SPEC_VERSION),
1261            ),
1262            (
1263                header::USER_AGENT,
1264                HeaderValue::from_str(&format!("iceberg-rs/{CARGO_PKG_VERSION}")).unwrap(),
1265            ),
1266        ]);
1267        assert_eq!(headers, expected_headers);
1268    }
1269
1270    #[tokio::test]
1271    async fn test_http_headers_with_custom_headers() {
1272        let server = Server::new_async().await;
1273        let mut props = HashMap::new();
1274        props.insert("credential".to_string(), "client1:secret1".to_string());
1275        props.insert(
1276            "header.content-type".to_string(),
1277            "application/yaml".to_string(),
1278        );
1279        props.insert(
1280            "header.customized-header".to_string(),
1281            "some/value".to_string(),
1282        );
1283
1284        let config = RestCatalogConfig::builder()
1285            .uri(server.url())
1286            .props(props)
1287            .build();
1288        let headers: HeaderMap = config.extra_headers().unwrap();
1289
1290        let expected_headers = HeaderMap::from_iter([
1291            (
1292                header::CONTENT_TYPE,
1293                HeaderValue::from_static("application/yaml"),
1294            ),
1295            (
1296                HeaderName::from_static("x-client-version"),
1297                HeaderValue::from_static(ICEBERG_REST_SPEC_VERSION),
1298            ),
1299            (
1300                header::USER_AGENT,
1301                HeaderValue::from_str(&format!("iceberg-rs/{CARGO_PKG_VERSION}")).unwrap(),
1302            ),
1303            (
1304                HeaderName::from_static("customized-header"),
1305                HeaderValue::from_static("some/value"),
1306            ),
1307        ]);
1308        assert_eq!(headers, expected_headers);
1309    }
1310
1311    #[tokio::test]
1312    async fn test_oauth_with_oauth2_server_uri() {
1313        let mut server = Server::new_async().await;
1314        let config_mock = create_config_mock(&mut server).await;
1315
1316        let mut auth_server = Server::new_async().await;
1317        let auth_server_path = "/some/path";
1318        let oauth_mock =
1319            create_oauth_mock_with_path(&mut auth_server, auth_server_path, "ey000000000000", 200)
1320                .await;
1321
1322        let mut props = HashMap::new();
1323        props.insert("credential".to_string(), "client1:secret1".to_string());
1324        props.insert(
1325            "oauth2-server-uri".to_string(),
1326            format!("{}{}", auth_server.url(), auth_server_path).to_string(),
1327        );
1328
1329        let catalog = RestCatalog::new(
1330            RestCatalogConfig::builder()
1331                .uri(server.url())
1332                .props(props)
1333                .build(),
1334        );
1335
1336        let token = catalog.context().await.unwrap().client.token().await;
1337
1338        oauth_mock.assert_async().await;
1339        config_mock.assert_async().await;
1340        assert_eq!(token, Some("ey000000000000".to_string()));
1341    }
1342
1343    #[tokio::test]
1344    async fn test_config_override() {
1345        let mut server = Server::new_async().await;
1346        let mut redirect_server = Server::new_async().await;
1347        let new_uri = redirect_server.url();
1348
1349        let config_mock = server
1350            .mock("GET", "/v1/config")
1351            .with_status(200)
1352            .with_body(
1353                json!(
1354                    {
1355                        "overrides": {
1356                            "uri": new_uri,
1357                            "warehouse": "s3://iceberg-catalog",
1358                            "prefix": "ice/warehouses/my"
1359                        },
1360                        "defaults": {},
1361                    }
1362                )
1363                .to_string(),
1364            )
1365            .create_async()
1366            .await;
1367
1368        let list_ns_mock = redirect_server
1369            .mock("GET", "/v1/ice/warehouses/my/namespaces")
1370            .with_body(
1371                r#"{
1372                    "namespaces": []
1373                }"#,
1374            )
1375            .create_async()
1376            .await;
1377
1378        let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
1379
1380        let _namespaces = catalog.list_namespaces(None).await.unwrap();
1381
1382        config_mock.assert_async().await;
1383        list_ns_mock.assert_async().await;
1384    }
1385
1386    #[tokio::test]
1387    async fn test_list_namespace() {
1388        let mut server = Server::new_async().await;
1389
1390        let config_mock = create_config_mock(&mut server).await;
1391
1392        let list_ns_mock = server
1393            .mock("GET", "/v1/namespaces")
1394            .with_body(
1395                r#"{
1396                "namespaces": [
1397                    ["ns1", "ns11"],
1398                    ["ns2"]
1399                ]
1400            }"#,
1401            )
1402            .create_async()
1403            .await;
1404
1405        let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
1406
1407        let namespaces = catalog.list_namespaces(None).await.unwrap();
1408
1409        let expected_ns = vec![
1410            NamespaceIdent::from_vec(vec!["ns1".to_string(), "ns11".to_string()]).unwrap(),
1411            NamespaceIdent::from_vec(vec!["ns2".to_string()]).unwrap(),
1412        ];
1413
1414        assert_eq!(expected_ns, namespaces);
1415
1416        config_mock.assert_async().await;
1417        list_ns_mock.assert_async().await;
1418    }
1419
1420    #[tokio::test]
1421    async fn test_list_namespace_with_pagination() {
1422        let mut server = Server::new_async().await;
1423
1424        let config_mock = create_config_mock(&mut server).await;
1425
1426        let list_ns_mock_page1 = server
1427            .mock("GET", "/v1/namespaces")
1428            .with_body(
1429                r#"{
1430                "namespaces": [
1431                    ["ns1", "ns11"],
1432                    ["ns2"]
1433                ],
1434                "next-page-token": "token123"
1435            }"#,
1436            )
1437            .create_async()
1438            .await;
1439
1440        let list_ns_mock_page2 = server
1441            .mock("GET", "/v1/namespaces?pageToken=token123")
1442            .with_body(
1443                r#"{
1444                "namespaces": [
1445                    ["ns3"],
1446                    ["ns4", "ns41"]
1447                ]
1448            }"#,
1449            )
1450            .create_async()
1451            .await;
1452
1453        let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
1454
1455        let namespaces = catalog.list_namespaces(None).await.unwrap();
1456
1457        let expected_ns = vec![
1458            NamespaceIdent::from_vec(vec!["ns1".to_string(), "ns11".to_string()]).unwrap(),
1459            NamespaceIdent::from_vec(vec!["ns2".to_string()]).unwrap(),
1460            NamespaceIdent::from_vec(vec!["ns3".to_string()]).unwrap(),
1461            NamespaceIdent::from_vec(vec!["ns4".to_string(), "ns41".to_string()]).unwrap(),
1462        ];
1463
1464        assert_eq!(expected_ns, namespaces);
1465
1466        config_mock.assert_async().await;
1467        list_ns_mock_page1.assert_async().await;
1468        list_ns_mock_page2.assert_async().await;
1469    }
1470
1471    #[tokio::test]
1472    async fn test_list_namespace_with_multiple_pages() {
1473        let mut server = Server::new_async().await;
1474
1475        let config_mock = create_config_mock(&mut server).await;
1476
1477        // Page 1
1478        let list_ns_mock_page1 = server
1479            .mock("GET", "/v1/namespaces")
1480            .with_body(
1481                r#"{
1482                "namespaces": [
1483                    ["ns1", "ns11"],
1484                    ["ns2"]
1485                ],
1486                "next-page-token": "page2"
1487            }"#,
1488            )
1489            .create_async()
1490            .await;
1491
1492        // Page 2
1493        let list_ns_mock_page2 = server
1494            .mock("GET", "/v1/namespaces?pageToken=page2")
1495            .with_body(
1496                r#"{
1497                "namespaces": [
1498                    ["ns3"],
1499                    ["ns4", "ns41"]
1500                ],
1501                "next-page-token": "page3"
1502            }"#,
1503            )
1504            .create_async()
1505            .await;
1506
1507        // Page 3
1508        let list_ns_mock_page3 = server
1509            .mock("GET", "/v1/namespaces?pageToken=page3")
1510            .with_body(
1511                r#"{
1512                "namespaces": [
1513                    ["ns5", "ns51", "ns511"]
1514                ],
1515                "next-page-token": "page4"
1516            }"#,
1517            )
1518            .create_async()
1519            .await;
1520
1521        // Page 4
1522        let list_ns_mock_page4 = server
1523            .mock("GET", "/v1/namespaces?pageToken=page4")
1524            .with_body(
1525                r#"{
1526                "namespaces": [
1527                    ["ns6"],
1528                    ["ns7"]
1529                ],
1530                "next-page-token": "page5"
1531            }"#,
1532            )
1533            .create_async()
1534            .await;
1535
1536        // Page 5 (final page)
1537        let list_ns_mock_page5 = server
1538            .mock("GET", "/v1/namespaces?pageToken=page5")
1539            .with_body(
1540                r#"{
1541                "namespaces": [
1542                    ["ns8", "ns81"]
1543                ]
1544            }"#,
1545            )
1546            .create_async()
1547            .await;
1548
1549        let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
1550
1551        let namespaces = catalog.list_namespaces(None).await.unwrap();
1552
1553        let expected_ns = vec![
1554            NamespaceIdent::from_vec(vec!["ns1".to_string(), "ns11".to_string()]).unwrap(),
1555            NamespaceIdent::from_vec(vec!["ns2".to_string()]).unwrap(),
1556            NamespaceIdent::from_vec(vec!["ns3".to_string()]).unwrap(),
1557            NamespaceIdent::from_vec(vec!["ns4".to_string(), "ns41".to_string()]).unwrap(),
1558            NamespaceIdent::from_vec(vec![
1559                "ns5".to_string(),
1560                "ns51".to_string(),
1561                "ns511".to_string(),
1562            ])
1563            .unwrap(),
1564            NamespaceIdent::from_vec(vec!["ns6".to_string()]).unwrap(),
1565            NamespaceIdent::from_vec(vec!["ns7".to_string()]).unwrap(),
1566            NamespaceIdent::from_vec(vec!["ns8".to_string(), "ns81".to_string()]).unwrap(),
1567        ];
1568
1569        assert_eq!(expected_ns, namespaces);
1570
1571        // Verify all page requests were made
1572        config_mock.assert_async().await;
1573        list_ns_mock_page1.assert_async().await;
1574        list_ns_mock_page2.assert_async().await;
1575        list_ns_mock_page3.assert_async().await;
1576        list_ns_mock_page4.assert_async().await;
1577        list_ns_mock_page5.assert_async().await;
1578    }
1579
1580    #[tokio::test]
1581    async fn test_create_namespace() {
1582        let mut server = Server::new_async().await;
1583
1584        let config_mock = create_config_mock(&mut server).await;
1585
1586        let create_ns_mock = server
1587            .mock("POST", "/v1/namespaces")
1588            .with_body(
1589                r#"{
1590                "namespace": [ "ns1", "ns11"],
1591                "properties" : {
1592                    "key1": "value1"
1593                }
1594            }"#,
1595            )
1596            .create_async()
1597            .await;
1598
1599        let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
1600
1601        let namespaces = catalog
1602            .create_namespace(
1603                &NamespaceIdent::from_vec(vec!["ns1".to_string(), "ns11".to_string()]).unwrap(),
1604                HashMap::from([("key1".to_string(), "value1".to_string())]),
1605            )
1606            .await
1607            .unwrap();
1608
1609        let expected_ns = Namespace::with_properties(
1610            NamespaceIdent::from_vec(vec!["ns1".to_string(), "ns11".to_string()]).unwrap(),
1611            HashMap::from([("key1".to_string(), "value1".to_string())]),
1612        );
1613
1614        assert_eq!(expected_ns, namespaces);
1615
1616        config_mock.assert_async().await;
1617        create_ns_mock.assert_async().await;
1618    }
1619
1620    #[tokio::test]
1621    async fn test_get_namespace() {
1622        let mut server = Server::new_async().await;
1623
1624        let config_mock = create_config_mock(&mut server).await;
1625
1626        let get_ns_mock = server
1627            .mock("GET", "/v1/namespaces/ns1")
1628            .with_body(
1629                r#"{
1630                "namespace": [ "ns1"],
1631                "properties" : {
1632                    "key1": "value1"
1633                }
1634            }"#,
1635            )
1636            .create_async()
1637            .await;
1638
1639        let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
1640
1641        let namespaces = catalog
1642            .get_namespace(&NamespaceIdent::new("ns1".to_string()))
1643            .await
1644            .unwrap();
1645
1646        let expected_ns = Namespace::with_properties(
1647            NamespaceIdent::new("ns1".to_string()),
1648            HashMap::from([("key1".to_string(), "value1".to_string())]),
1649        );
1650
1651        assert_eq!(expected_ns, namespaces);
1652
1653        config_mock.assert_async().await;
1654        get_ns_mock.assert_async().await;
1655    }
1656
1657    #[tokio::test]
1658    async fn check_namespace_exists() {
1659        let mut server = Server::new_async().await;
1660
1661        let config_mock = create_config_mock(&mut server).await;
1662
1663        let get_ns_mock = server
1664            .mock("HEAD", "/v1/namespaces/ns1")
1665            .with_status(204)
1666            .create_async()
1667            .await;
1668
1669        let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
1670
1671        assert!(
1672            catalog
1673                .namespace_exists(&NamespaceIdent::new("ns1".to_string()))
1674                .await
1675                .unwrap()
1676        );
1677
1678        config_mock.assert_async().await;
1679        get_ns_mock.assert_async().await;
1680    }
1681
1682    #[tokio::test]
1683    async fn test_drop_namespace() {
1684        let mut server = Server::new_async().await;
1685
1686        let config_mock = create_config_mock(&mut server).await;
1687
1688        let drop_ns_mock = server
1689            .mock("DELETE", "/v1/namespaces/ns1")
1690            .with_status(204)
1691            .create_async()
1692            .await;
1693
1694        let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
1695
1696        catalog
1697            .drop_namespace(&NamespaceIdent::new("ns1".to_string()))
1698            .await
1699            .unwrap();
1700
1701        config_mock.assert_async().await;
1702        drop_ns_mock.assert_async().await;
1703    }
1704
1705    #[tokio::test]
1706    async fn test_list_tables() {
1707        let mut server = Server::new_async().await;
1708
1709        let config_mock = create_config_mock(&mut server).await;
1710
1711        let list_tables_mock = server
1712            .mock("GET", "/v1/namespaces/ns1/tables")
1713            .with_status(200)
1714            .with_body(
1715                r#"{
1716                "identifiers": [
1717                    {
1718                        "namespace": ["ns1"],
1719                        "name": "table1"
1720                    },
1721                    {
1722                        "namespace": ["ns1"],
1723                        "name": "table2"
1724                    }
1725                ]
1726            }"#,
1727            )
1728            .create_async()
1729            .await;
1730
1731        let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
1732
1733        let tables = catalog
1734            .list_tables(&NamespaceIdent::new("ns1".to_string()))
1735            .await
1736            .unwrap();
1737
1738        let expected_tables = vec![
1739            TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table1".to_string()),
1740            TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table2".to_string()),
1741        ];
1742
1743        assert_eq!(tables, expected_tables);
1744
1745        config_mock.assert_async().await;
1746        list_tables_mock.assert_async().await;
1747    }
1748
1749    #[tokio::test]
1750    async fn test_list_tables_with_pagination() {
1751        let mut server = Server::new_async().await;
1752
1753        let config_mock = create_config_mock(&mut server).await;
1754
1755        let list_tables_mock_page1 = server
1756            .mock("GET", "/v1/namespaces/ns1/tables")
1757            .with_status(200)
1758            .with_body(
1759                r#"{
1760                "identifiers": [
1761                    {
1762                        "namespace": ["ns1"],
1763                        "name": "table1"
1764                    },
1765                    {
1766                        "namespace": ["ns1"],
1767                        "name": "table2"
1768                    }
1769                ],
1770                "next-page-token": "token456"
1771            }"#,
1772            )
1773            .create_async()
1774            .await;
1775
1776        let list_tables_mock_page2 = server
1777            .mock("GET", "/v1/namespaces/ns1/tables?pageToken=token456")
1778            .with_status(200)
1779            .with_body(
1780                r#"{
1781                "identifiers": [
1782                    {
1783                        "namespace": ["ns1"],
1784                        "name": "table3"
1785                    },
1786                    {
1787                        "namespace": ["ns1"],
1788                        "name": "table4"
1789                    }
1790                ]
1791            }"#,
1792            )
1793            .create_async()
1794            .await;
1795
1796        let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
1797
1798        let tables = catalog
1799            .list_tables(&NamespaceIdent::new("ns1".to_string()))
1800            .await
1801            .unwrap();
1802
1803        let expected_tables = vec![
1804            TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table1".to_string()),
1805            TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table2".to_string()),
1806            TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table3".to_string()),
1807            TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table4".to_string()),
1808        ];
1809
1810        assert_eq!(tables, expected_tables);
1811
1812        config_mock.assert_async().await;
1813        list_tables_mock_page1.assert_async().await;
1814        list_tables_mock_page2.assert_async().await;
1815    }
1816
1817    #[tokio::test]
1818    async fn test_list_tables_with_multiple_pages() {
1819        let mut server = Server::new_async().await;
1820
1821        let config_mock = create_config_mock(&mut server).await;
1822
1823        // Page 1
1824        let list_tables_mock_page1 = server
1825            .mock("GET", "/v1/namespaces/ns1/tables")
1826            .with_status(200)
1827            .with_body(
1828                r#"{
1829                "identifiers": [
1830                    {
1831                        "namespace": ["ns1"],
1832                        "name": "table1"
1833                    },
1834                    {
1835                        "namespace": ["ns1"],
1836                        "name": "table2"
1837                    }
1838                ],
1839                "next-page-token": "page2"
1840            }"#,
1841            )
1842            .create_async()
1843            .await;
1844
1845        // Page 2
1846        let list_tables_mock_page2 = server
1847            .mock("GET", "/v1/namespaces/ns1/tables?pageToken=page2")
1848            .with_status(200)
1849            .with_body(
1850                r#"{
1851                "identifiers": [
1852                    {
1853                        "namespace": ["ns1"],
1854                        "name": "table3"
1855                    },
1856                    {
1857                        "namespace": ["ns1"],
1858                        "name": "table4"
1859                    }
1860                ],
1861                "next-page-token": "page3"
1862            }"#,
1863            )
1864            .create_async()
1865            .await;
1866
1867        // Page 3
1868        let list_tables_mock_page3 = server
1869            .mock("GET", "/v1/namespaces/ns1/tables?pageToken=page3")
1870            .with_status(200)
1871            .with_body(
1872                r#"{
1873                "identifiers": [
1874                    {
1875                        "namespace": ["ns1"],
1876                        "name": "table5"
1877                    }
1878                ],
1879                "next-page-token": "page4"
1880            }"#,
1881            )
1882            .create_async()
1883            .await;
1884
1885        // Page 4
1886        let list_tables_mock_page4 = server
1887            .mock("GET", "/v1/namespaces/ns1/tables?pageToken=page4")
1888            .with_status(200)
1889            .with_body(
1890                r#"{
1891                "identifiers": [
1892                    {
1893                        "namespace": ["ns1"],
1894                        "name": "table6"
1895                    },
1896                    {
1897                        "namespace": ["ns1"],
1898                        "name": "table7"
1899                    }
1900                ],
1901                "next-page-token": "page5"
1902            }"#,
1903            )
1904            .create_async()
1905            .await;
1906
1907        // Page 5 (final page)
1908        let list_tables_mock_page5 = server
1909            .mock("GET", "/v1/namespaces/ns1/tables?pageToken=page5")
1910            .with_status(200)
1911            .with_body(
1912                r#"{
1913                "identifiers": [
1914                    {
1915                        "namespace": ["ns1"],
1916                        "name": "table8"
1917                    }
1918                ]
1919            }"#,
1920            )
1921            .create_async()
1922            .await;
1923
1924        let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
1925
1926        let tables = catalog
1927            .list_tables(&NamespaceIdent::new("ns1".to_string()))
1928            .await
1929            .unwrap();
1930
1931        let expected_tables = vec![
1932            TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table1".to_string()),
1933            TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table2".to_string()),
1934            TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table3".to_string()),
1935            TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table4".to_string()),
1936            TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table5".to_string()),
1937            TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table6".to_string()),
1938            TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table7".to_string()),
1939            TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table8".to_string()),
1940        ];
1941
1942        assert_eq!(tables, expected_tables);
1943
1944        // Verify all page requests were made
1945        config_mock.assert_async().await;
1946        list_tables_mock_page1.assert_async().await;
1947        list_tables_mock_page2.assert_async().await;
1948        list_tables_mock_page3.assert_async().await;
1949        list_tables_mock_page4.assert_async().await;
1950        list_tables_mock_page5.assert_async().await;
1951    }
1952
1953    #[tokio::test]
1954    async fn test_drop_tables() {
1955        let mut server = Server::new_async().await;
1956
1957        let config_mock = create_config_mock(&mut server).await;
1958
1959        let delete_table_mock = server
1960            .mock("DELETE", "/v1/namespaces/ns1/tables/table1")
1961            .with_status(204)
1962            .create_async()
1963            .await;
1964
1965        let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
1966
1967        catalog
1968            .drop_table(&TableIdent::new(
1969                NamespaceIdent::new("ns1".to_string()),
1970                "table1".to_string(),
1971            ))
1972            .await
1973            .unwrap();
1974
1975        config_mock.assert_async().await;
1976        delete_table_mock.assert_async().await;
1977    }
1978
1979    #[tokio::test]
1980    async fn test_check_table_exists() {
1981        let mut server = Server::new_async().await;
1982
1983        let config_mock = create_config_mock(&mut server).await;
1984
1985        let check_table_exists_mock = server
1986            .mock("HEAD", "/v1/namespaces/ns1/tables/table1")
1987            .with_status(204)
1988            .create_async()
1989            .await;
1990
1991        let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
1992
1993        assert!(
1994            catalog
1995                .table_exists(&TableIdent::new(
1996                    NamespaceIdent::new("ns1".to_string()),
1997                    "table1".to_string(),
1998                ))
1999                .await
2000                .unwrap()
2001        );
2002
2003        config_mock.assert_async().await;
2004        check_table_exists_mock.assert_async().await;
2005    }
2006
2007    #[tokio::test]
2008    async fn test_rename_table() {
2009        let mut server = Server::new_async().await;
2010
2011        let config_mock = create_config_mock(&mut server).await;
2012
2013        let rename_table_mock = server
2014            .mock("POST", "/v1/tables/rename")
2015            .with_status(204)
2016            .create_async()
2017            .await;
2018
2019        let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
2020
2021        catalog
2022            .rename_table(
2023                &TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table1".to_string()),
2024                &TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table2".to_string()),
2025            )
2026            .await
2027            .unwrap();
2028
2029        config_mock.assert_async().await;
2030        rename_table_mock.assert_async().await;
2031    }
2032
2033    #[tokio::test]
2034    async fn test_load_table() {
2035        let mut server = Server::new_async().await;
2036
2037        let config_mock = create_config_mock(&mut server).await;
2038
2039        let rename_table_mock = server
2040            .mock("GET", "/v1/namespaces/ns1/tables/test1")
2041            .with_status(200)
2042            .with_body_from_file(format!(
2043                "{}/testdata/{}",
2044                env!("CARGO_MANIFEST_DIR"),
2045                "load_table_response.json"
2046            ))
2047            .create_async()
2048            .await;
2049
2050        let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
2051
2052        let table = catalog
2053            .load_table(&TableIdent::new(
2054                NamespaceIdent::new("ns1".to_string()),
2055                "test1".to_string(),
2056            ))
2057            .await
2058            .unwrap();
2059
2060        assert_eq!(
2061            &TableIdent::from_strs(vec!["ns1", "test1"]).unwrap(),
2062            table.identifier()
2063        );
2064        assert_eq!(
2065            "s3://warehouse/database/table/metadata/00001-5f2f8166-244c-4eae-ac36-384ecdec81fc.gz.metadata.json",
2066            table.metadata_location().unwrap()
2067        );
2068        assert_eq!(FormatVersion::V1, table.metadata().format_version());
2069        assert_eq!("s3://warehouse/database/table", table.metadata().location());
2070        assert_eq!(
2071            uuid!("b55d9dda-6561-423a-8bfc-787980ce421f"),
2072            table.metadata().uuid()
2073        );
2074        assert_eq!(
2075            Utc.timestamp_millis_opt(1646787054459).unwrap(),
2076            table.metadata().last_updated_timestamp().unwrap()
2077        );
2078        assert_eq!(
2079            vec![&Arc::new(
2080                Schema::builder()
2081                    .with_fields(vec![
2082                        NestedField::optional(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
2083                        NestedField::optional(2, "data", Type::Primitive(PrimitiveType::String))
2084                            .into(),
2085                    ])
2086                    .build()
2087                    .unwrap()
2088            )],
2089            table.metadata().schemas_iter().collect::<Vec<_>>()
2090        );
2091        assert_eq!(
2092            &HashMap::from([
2093                ("owner".to_string(), "bryan".to_string()),
2094                (
2095                    "write.metadata.compression-codec".to_string(),
2096                    "gzip".to_string()
2097                )
2098            ]),
2099            table.metadata().properties()
2100        );
2101        assert_eq!(vec![&Arc::new(Snapshot::builder()
2102            .with_snapshot_id(3497810964824022504)
2103            .with_timestamp_ms(1646787054459)
2104            .with_manifest_list("s3://warehouse/database/table/metadata/snap-3497810964824022504-1-c4f68204-666b-4e50-a9df-b10c34bf6b82.avro")
2105            .with_sequence_number(0)
2106            .with_schema_id(0)
2107            .with_summary(Summary {
2108                operation: Operation::Append,
2109                additional_properties: HashMap::from_iter([
2110                    ("spark.app.id", "local-1646787004168"),
2111                    ("added-data-files", "1"),
2112                    ("added-records", "1"),
2113                    ("added-files-size", "697"),
2114                    ("changed-partition-count", "1"),
2115                    ("total-records", "1"),
2116                    ("total-files-size", "697"),
2117                    ("total-data-files", "1"),
2118                    ("total-delete-files", "0"),
2119                    ("total-position-deletes", "0"),
2120                    ("total-equality-deletes", "0")
2121                ].iter().map(|p| (p.0.to_string(), p.1.to_string()))),
2122            }).build()
2123        )], table.metadata().snapshots().collect::<Vec<_>>());
2124        assert_eq!(
2125            &[SnapshotLog {
2126                timestamp_ms: 1646787054459,
2127                snapshot_id: 3497810964824022504,
2128            }],
2129            table.metadata().history()
2130        );
2131        assert_eq!(
2132            vec![&Arc::new(SortOrder {
2133                order_id: 0,
2134                fields: vec![],
2135            })],
2136            table.metadata().sort_orders_iter().collect::<Vec<_>>()
2137        );
2138
2139        config_mock.assert_async().await;
2140        rename_table_mock.assert_async().await;
2141    }
2142
2143    #[tokio::test]
2144    async fn test_load_table_404() {
2145        let mut server = Server::new_async().await;
2146
2147        let config_mock = create_config_mock(&mut server).await;
2148
2149        let rename_table_mock = server
2150            .mock("GET", "/v1/namespaces/ns1/tables/test1")
2151            .with_status(404)
2152            .with_body(r#"
2153{
2154    "error": {
2155        "message": "Table does not exist: ns1.test1 in warehouse 8bcb0838-50fc-472d-9ddb-8feb89ef5f1e",
2156        "type": "NoSuchNamespaceErrorException",
2157        "code": 404
2158    }
2159}
2160            "#)
2161            .create_async()
2162            .await;
2163
2164        let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
2165
2166        let table = catalog
2167            .load_table(&TableIdent::new(
2168                NamespaceIdent::new("ns1".to_string()),
2169                "test1".to_string(),
2170            ))
2171            .await;
2172
2173        assert!(table.is_err());
2174        assert!(table.err().unwrap().message().contains("does not exist"));
2175
2176        config_mock.assert_async().await;
2177        rename_table_mock.assert_async().await;
2178    }
2179
2180    #[tokio::test]
2181    async fn test_create_table() {
2182        let mut server = Server::new_async().await;
2183
2184        let config_mock = create_config_mock(&mut server).await;
2185
2186        let create_table_mock = server
2187            .mock("POST", "/v1/namespaces/ns1/tables")
2188            .with_status(200)
2189            .with_body_from_file(format!(
2190                "{}/testdata/{}",
2191                env!("CARGO_MANIFEST_DIR"),
2192                "create_table_response.json"
2193            ))
2194            .create_async()
2195            .await;
2196
2197        let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
2198
2199        let table_creation = TableCreation::builder()
2200            .name("test1".to_string())
2201            .schema(
2202                Schema::builder()
2203                    .with_fields(vec![
2204                        NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String))
2205                            .into(),
2206                        NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(),
2207                        NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean))
2208                            .into(),
2209                    ])
2210                    .with_schema_id(1)
2211                    .with_identifier_field_ids(vec![2])
2212                    .build()
2213                    .unwrap(),
2214            )
2215            .properties(HashMap::from([("owner".to_string(), "testx".to_string())]))
2216            .partition_spec(
2217                UnboundPartitionSpec::builder()
2218                    .add_partition_fields(vec![
2219                        UnboundPartitionField::builder()
2220                            .source_id(1)
2221                            .transform(Transform::Truncate(3))
2222                            .name("id".to_string())
2223                            .build(),
2224                    ])
2225                    .unwrap()
2226                    .build(),
2227            )
2228            .sort_order(
2229                SortOrder::builder()
2230                    .with_sort_field(
2231                        SortField::builder()
2232                            .source_id(2)
2233                            .transform(Transform::Identity)
2234                            .direction(SortDirection::Ascending)
2235                            .null_order(NullOrder::First)
2236                            .build(),
2237                    )
2238                    .build_unbound()
2239                    .unwrap(),
2240            )
2241            .build();
2242
2243        let table = catalog
2244            .create_table(&NamespaceIdent::from_strs(["ns1"]).unwrap(), table_creation)
2245            .await
2246            .unwrap();
2247
2248        assert_eq!(
2249            &TableIdent::from_strs(vec!["ns1", "test1"]).unwrap(),
2250            table.identifier()
2251        );
2252        assert_eq!(
2253            "s3://warehouse/database/table/metadata.json",
2254            table.metadata_location().unwrap()
2255        );
2256        assert_eq!(FormatVersion::V1, table.metadata().format_version());
2257        assert_eq!("s3://warehouse/database/table", table.metadata().location());
2258        assert_eq!(
2259            uuid!("bf289591-dcc0-4234-ad4f-5c3eed811a29"),
2260            table.metadata().uuid()
2261        );
2262        assert_eq!(
2263            1657810967051,
2264            table
2265                .metadata()
2266                .last_updated_timestamp()
2267                .unwrap()
2268                .timestamp_millis()
2269        );
2270        assert_eq!(
2271            vec![&Arc::new(
2272                Schema::builder()
2273                    .with_fields(vec![
2274                        NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String))
2275                            .into(),
2276                        NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(),
2277                        NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean))
2278                            .into(),
2279                    ])
2280                    .with_schema_id(0)
2281                    .with_identifier_field_ids(vec![2])
2282                    .build()
2283                    .unwrap()
2284            )],
2285            table.metadata().schemas_iter().collect::<Vec<_>>()
2286        );
2287        assert_eq!(
2288            &HashMap::from([
2289                (
2290                    "write.delete.parquet.compression-codec".to_string(),
2291                    "zstd".to_string()
2292                ),
2293                (
2294                    "write.metadata.compression-codec".to_string(),
2295                    "gzip".to_string()
2296                ),
2297                (
2298                    "write.summary.partition-limit".to_string(),
2299                    "100".to_string()
2300                ),
2301                (
2302                    "write.parquet.compression-codec".to_string(),
2303                    "zstd".to_string()
2304                ),
2305            ]),
2306            table.metadata().properties()
2307        );
2308        assert!(table.metadata().current_snapshot().is_none());
2309        assert!(table.metadata().history().is_empty());
2310        assert_eq!(
2311            vec![&Arc::new(SortOrder {
2312                order_id: 0,
2313                fields: vec![],
2314            })],
2315            table.metadata().sort_orders_iter().collect::<Vec<_>>()
2316        );
2317
2318        config_mock.assert_async().await;
2319        create_table_mock.assert_async().await;
2320    }
2321
2322    #[tokio::test]
2323    async fn test_create_table_409() {
2324        let mut server = Server::new_async().await;
2325
2326        let config_mock = create_config_mock(&mut server).await;
2327
2328        let create_table_mock = server
2329            .mock("POST", "/v1/namespaces/ns1/tables")
2330            .with_status(409)
2331            .with_body(r#"
2332{
2333    "error": {
2334        "message": "Table already exists: ns1.test1 in warehouse 8bcb0838-50fc-472d-9ddb-8feb89ef5f1e",
2335        "type": "AlreadyExistsException",
2336        "code": 409
2337    }
2338}
2339            "#)
2340            .create_async()
2341            .await;
2342
2343        let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
2344
2345        let table_creation = TableCreation::builder()
2346            .name("test1".to_string())
2347            .schema(
2348                Schema::builder()
2349                    .with_fields(vec![
2350                        NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String))
2351                            .into(),
2352                        NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(),
2353                        NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean))
2354                            .into(),
2355                    ])
2356                    .with_schema_id(1)
2357                    .with_identifier_field_ids(vec![2])
2358                    .build()
2359                    .unwrap(),
2360            )
2361            .properties(HashMap::from([("owner".to_string(), "testx".to_string())]))
2362            .build();
2363
2364        let table_result = catalog
2365            .create_table(&NamespaceIdent::from_strs(["ns1"]).unwrap(), table_creation)
2366            .await;
2367
2368        assert!(table_result.is_err());
2369        assert!(
2370            table_result
2371                .err()
2372                .unwrap()
2373                .message()
2374                .contains("already exists")
2375        );
2376
2377        config_mock.assert_async().await;
2378        create_table_mock.assert_async().await;
2379    }
2380
2381    #[tokio::test]
2382    async fn test_update_table() {
2383        let mut server = Server::new_async().await;
2384
2385        let config_mock = create_config_mock(&mut server).await;
2386
2387        let load_table_mock = server
2388            .mock("GET", "/v1/namespaces/ns1/tables/test1")
2389            .with_status(200)
2390            .with_body_from_file(format!(
2391                "{}/testdata/{}",
2392                env!("CARGO_MANIFEST_DIR"),
2393                "load_table_response.json"
2394            ))
2395            .create_async()
2396            .await;
2397
2398        let update_table_mock = server
2399            .mock("POST", "/v1/namespaces/ns1/tables/test1")
2400            .with_status(200)
2401            .with_body_from_file(format!(
2402                "{}/testdata/{}",
2403                env!("CARGO_MANIFEST_DIR"),
2404                "update_table_response.json"
2405            ))
2406            .create_async()
2407            .await;
2408
2409        let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
2410
2411        let table1 = {
2412            let file = File::open(format!(
2413                "{}/testdata/{}",
2414                env!("CARGO_MANIFEST_DIR"),
2415                "create_table_response.json"
2416            ))
2417            .unwrap();
2418            let reader = BufReader::new(file);
2419            let resp = serde_json::from_reader::<_, LoadTableResult>(reader).unwrap();
2420
2421            Table::builder()
2422                .metadata(resp.metadata)
2423                .metadata_location(resp.metadata_location.unwrap())
2424                .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap())
2425                .file_io(FileIO::from_path("/tmp").unwrap().build().unwrap())
2426                .build()
2427                .unwrap()
2428        };
2429
2430        let tx = Transaction::new(&table1);
2431        let table = tx
2432            .upgrade_table_version()
2433            .set_format_version(FormatVersion::V2)
2434            .apply(tx)
2435            .unwrap()
2436            .commit(&catalog)
2437            .await
2438            .unwrap();
2439
2440        assert_eq!(
2441            &TableIdent::from_strs(vec!["ns1", "test1"]).unwrap(),
2442            table.identifier()
2443        );
2444        assert_eq!(
2445            "s3://warehouse/database/table/metadata.json",
2446            table.metadata_location().unwrap()
2447        );
2448        assert_eq!(FormatVersion::V2, table.metadata().format_version());
2449        assert_eq!("s3://warehouse/database/table", table.metadata().location());
2450        assert_eq!(
2451            uuid!("bf289591-dcc0-4234-ad4f-5c3eed811a29"),
2452            table.metadata().uuid()
2453        );
2454        assert_eq!(
2455            1657810967051,
2456            table
2457                .metadata()
2458                .last_updated_timestamp()
2459                .unwrap()
2460                .timestamp_millis()
2461        );
2462        assert_eq!(
2463            vec![&Arc::new(
2464                Schema::builder()
2465                    .with_fields(vec![
2466                        NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String))
2467                            .into(),
2468                        NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(),
2469                        NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean))
2470                            .into(),
2471                    ])
2472                    .with_schema_id(0)
2473                    .with_identifier_field_ids(vec![2])
2474                    .build()
2475                    .unwrap()
2476            )],
2477            table.metadata().schemas_iter().collect::<Vec<_>>()
2478        );
2479        assert_eq!(
2480            &HashMap::from([
2481                (
2482                    "write.delete.parquet.compression-codec".to_string(),
2483                    "zstd".to_string()
2484                ),
2485                (
2486                    "write.metadata.compression-codec".to_string(),
2487                    "gzip".to_string()
2488                ),
2489                (
2490                    "write.summary.partition-limit".to_string(),
2491                    "100".to_string()
2492                ),
2493                (
2494                    "write.parquet.compression-codec".to_string(),
2495                    "zstd".to_string()
2496                ),
2497            ]),
2498            table.metadata().properties()
2499        );
2500        assert!(table.metadata().current_snapshot().is_none());
2501        assert!(table.metadata().history().is_empty());
2502        assert_eq!(
2503            vec![&Arc::new(SortOrder {
2504                order_id: 0,
2505                fields: vec![],
2506            })],
2507            table.metadata().sort_orders_iter().collect::<Vec<_>>()
2508        );
2509
2510        config_mock.assert_async().await;
2511        update_table_mock.assert_async().await;
2512        load_table_mock.assert_async().await
2513    }
2514
2515    #[tokio::test]
2516    async fn test_update_table_404() {
2517        let mut server = Server::new_async().await;
2518
2519        let config_mock = create_config_mock(&mut server).await;
2520
2521        let load_table_mock = server
2522            .mock("GET", "/v1/namespaces/ns1/tables/test1")
2523            .with_status(200)
2524            .with_body_from_file(format!(
2525                "{}/testdata/{}",
2526                env!("CARGO_MANIFEST_DIR"),
2527                "load_table_response.json"
2528            ))
2529            .create_async()
2530            .await;
2531
2532        let update_table_mock = server
2533            .mock("POST", "/v1/namespaces/ns1/tables/test1")
2534            .with_status(404)
2535            .with_body(
2536                r#"
2537{
2538    "error": {
2539        "message": "The given table does not exist",
2540        "type": "NoSuchTableException",
2541        "code": 404
2542    }
2543}
2544            "#,
2545            )
2546            .create_async()
2547            .await;
2548
2549        let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
2550
2551        let table1 = {
2552            let file = File::open(format!(
2553                "{}/testdata/{}",
2554                env!("CARGO_MANIFEST_DIR"),
2555                "create_table_response.json"
2556            ))
2557            .unwrap();
2558            let reader = BufReader::new(file);
2559            let resp = serde_json::from_reader::<_, LoadTableResult>(reader).unwrap();
2560
2561            Table::builder()
2562                .metadata(resp.metadata)
2563                .metadata_location(resp.metadata_location.unwrap())
2564                .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap())
2565                .file_io(FileIO::from_path("/tmp").unwrap().build().unwrap())
2566                .build()
2567                .unwrap()
2568        };
2569
2570        let tx = Transaction::new(&table1);
2571        let table_result = tx
2572            .upgrade_table_version()
2573            .set_format_version(FormatVersion::V2)
2574            .apply(tx)
2575            .unwrap()
2576            .commit(&catalog)
2577            .await;
2578
2579        assert!(table_result.is_err());
2580        assert!(
2581            table_result
2582                .err()
2583                .unwrap()
2584                .message()
2585                .contains("does not exist")
2586        );
2587
2588        config_mock.assert_async().await;
2589        update_table_mock.assert_async().await;
2590        load_table_mock.assert_async().await;
2591    }
2592
2593    #[tokio::test]
2594    async fn test_register_table() {
2595        let mut server = Server::new_async().await;
2596
2597        let config_mock = create_config_mock(&mut server).await;
2598
2599        let register_table_mock = server
2600            .mock("POST", "/v1/namespaces/ns1/register")
2601            .with_status(200)
2602            .with_body_from_file(format!(
2603                "{}/testdata/{}",
2604                env!("CARGO_MANIFEST_DIR"),
2605                "load_table_response.json"
2606            ))
2607            .create_async()
2608            .await;
2609
2610        let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
2611        let table_ident =
2612            TableIdent::new(NamespaceIdent::new("ns1".to_string()), "test1".to_string());
2613        let metadata_location = String::from(
2614            "s3://warehouse/database/table/metadata/00001-5f2f8166-244c-4eae-ac36-384ecdec81fc.gz.metadata.json",
2615        );
2616
2617        let table = catalog
2618            .register_table(&table_ident, metadata_location)
2619            .await
2620            .unwrap();
2621
2622        assert_eq!(
2623            &TableIdent::from_strs(vec!["ns1", "test1"]).unwrap(),
2624            table.identifier()
2625        );
2626        assert_eq!(
2627            "s3://warehouse/database/table/metadata/00001-5f2f8166-244c-4eae-ac36-384ecdec81fc.gz.metadata.json",
2628            table.metadata_location().unwrap()
2629        );
2630
2631        config_mock.assert_async().await;
2632        register_table_mock.assert_async().await;
2633    }
2634
2635    #[tokio::test]
2636    async fn test_register_table_404() {
2637        let mut server = Server::new_async().await;
2638
2639        let config_mock = create_config_mock(&mut server).await;
2640
2641        let register_table_mock = server
2642            .mock("POST", "/v1/namespaces/ns1/register")
2643            .with_status(404)
2644            .with_body(
2645                r#"
2646{
2647    "error": {
2648        "message": "The namespace specified does not exist",
2649        "type": "NoSuchNamespaceErrorException",
2650        "code": 404
2651    }
2652}
2653            "#,
2654            )
2655            .create_async()
2656            .await;
2657
2658        let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
2659
2660        let table_ident =
2661            TableIdent::new(NamespaceIdent::new("ns1".to_string()), "test1".to_string());
2662        let metadata_location = String::from(
2663            "s3://warehouse/database/table/metadata/00001-5f2f8166-244c-4eae-ac36-384ecdec81fc.gz.metadata.json",
2664        );
2665        let table = catalog
2666            .register_table(&table_ident, metadata_location)
2667            .await;
2668
2669        assert!(table.is_err());
2670        assert!(table.err().unwrap().message().contains("does not exist"));
2671
2672        config_mock.assert_async().await;
2673        register_table_mock.assert_async().await;
2674    }
2675
2676    #[tokio::test]
2677    async fn test_create_rest_catalog() {
2678        let builder = RestCatalogBuilder::default().with_client(Client::new());
2679
2680        let catalog = builder
2681            .load(
2682                "test",
2683                HashMap::from([
2684                    (
2685                        REST_CATALOG_PROP_URI.to_string(),
2686                        "http://localhost:8080".to_string(),
2687                    ),
2688                    ("a".to_string(), "b".to_string()),
2689                ]),
2690            )
2691            .await;
2692
2693        assert!(catalog.is_ok());
2694
2695        let catalog_config = catalog.unwrap().user_config;
2696        assert_eq!(catalog_config.name.as_deref(), Some("test"));
2697        assert_eq!(catalog_config.uri, "http://localhost:8080");
2698        assert_eq!(catalog_config.warehouse, None);
2699        assert!(catalog_config.client.is_some());
2700
2701        assert_eq!(catalog_config.props.get("a"), Some(&"b".to_string()));
2702        assert!(!catalog_config.props.contains_key(REST_CATALOG_PROP_URI));
2703    }
2704
2705    #[tokio::test]
2706    async fn test_create_rest_catalog_no_uri() {
2707        let builder = RestCatalogBuilder::default();
2708
2709        let catalog = builder
2710            .load(
2711                "test",
2712                HashMap::from([(
2713                    REST_CATALOG_PROP_WAREHOUSE.to_string(),
2714                    "s3://warehouse".to_string(),
2715                )]),
2716            )
2717            .await;
2718
2719        assert!(catalog.is_err());
2720        if let Err(err) = catalog {
2721            assert_eq!(err.kind(), ErrorKind::DataInvalid);
2722            assert_eq!(err.message(), "Catalog uri is required");
2723        }
2724    }
2725}