iceberg_catalog_rest/
catalog.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! This module contains the iceberg REST catalog implementation.
19
20use std::any::Any;
21use std::collections::HashMap;
22use std::future::Future;
23use std::str::FromStr;
24
25use async_trait::async_trait;
26use iceberg::io::{self, FileIO};
27use iceberg::table::Table;
28use iceberg::{
29    Catalog, CatalogBuilder, Error, ErrorKind, Namespace, NamespaceIdent, Result, TableCommit,
30    TableCreation, TableIdent,
31};
32use itertools::Itertools;
33use reqwest::header::{
34    HeaderMap, HeaderName, HeaderValue, {self},
35};
36use reqwest::{Client, Method, StatusCode, Url};
37use tokio::sync::OnceCell;
38use typed_builder::TypedBuilder;
39
40use crate::client::{
41    HttpClient, deserialize_catalog_response, deserialize_unexpected_catalog_error,
42};
43use crate::types::{
44    CatalogConfig, CommitTableRequest, CommitTableResponse, CreateTableRequest,
45    ListNamespaceResponse, ListTableResponse, LoadTableResponse, NamespaceSerde,
46    RegisterTableRequest, RenameTableRequest,
47};
48
/// REST catalog URI
pub const REST_CATALOG_PROP_URI: &str = "uri";
/// REST catalog warehouse location
pub const REST_CATALOG_PROP_WAREHOUSE: &str = "warehouse";

/// Value sent in the `x-client-version` request header.
const ICEBERG_REST_SPEC_VERSION: &str = "0.14.1";
/// Version of this crate, captured from Cargo at compile time; used in the `user-agent` header.
const CARGO_PKG_VERSION: &str = env!("CARGO_PKG_VERSION");
/// Path segment placed directly after the catalog URI when building endpoint URLs.
const PATH_V1: &str = "v1";
57
/// Builder for [`RestCatalog`].
///
/// Wraps the `RestCatalogConfig` that is being assembled; the configuration is filled in and
/// validated when the builder is loaded.
#[derive(Debug)]
pub struct RestCatalogBuilder(RestCatalogConfig);
61
62impl Default for RestCatalogBuilder {
63    fn default() -> Self {
64        Self(RestCatalogConfig {
65            name: None,
66            uri: "".to_string(),
67            warehouse: None,
68            props: HashMap::new(),
69            client: None,
70        })
71    }
72}
73
74impl CatalogBuilder for RestCatalogBuilder {
75    type C = RestCatalog;
76
77    fn load(
78        mut self,
79        name: impl Into<String>,
80        props: HashMap<String, String>,
81    ) -> impl Future<Output = Result<Self::C>> + Send {
82        self.0.name = Some(name.into());
83
84        if props.contains_key(REST_CATALOG_PROP_URI) {
85            self.0.uri = props
86                .get(REST_CATALOG_PROP_URI)
87                .cloned()
88                .unwrap_or_default();
89        }
90
91        if props.contains_key(REST_CATALOG_PROP_WAREHOUSE) {
92            self.0.warehouse = props.get(REST_CATALOG_PROP_WAREHOUSE).cloned()
93        }
94
95        // Collect other remaining properties
96        self.0.props = props
97            .into_iter()
98            .filter(|(k, _)| k != REST_CATALOG_PROP_URI && k != REST_CATALOG_PROP_WAREHOUSE)
99            .collect();
100
101        let result = {
102            if self.0.name.is_none() {
103                Err(Error::new(
104                    ErrorKind::DataInvalid,
105                    "Catalog name is required",
106                ))
107            } else if self.0.uri.is_empty() {
108                Err(Error::new(
109                    ErrorKind::DataInvalid,
110                    "Catalog uri is required",
111                ))
112            } else {
113                Ok(RestCatalog::new(self.0))
114            }
115        };
116
117        std::future::ready(result)
118    }
119}
120
121impl RestCatalogBuilder {
122    /// Configures the catalog with a custom HTTP client.
123    pub fn with_client(mut self, client: Client) -> Self {
124        self.0.client = Some(client);
125        self
126    }
127}
128
/// Rest catalog configuration.
#[derive(Clone, Debug, TypedBuilder)]
pub(crate) struct RestCatalogConfig {
    /// Catalog name, if one has been assigned.
    #[builder(default, setter(strip_option))]
    name: Option<String>,

    /// Base URI of the REST catalog; endpoint URLs are built by appending path segments to it.
    uri: String,

    /// Optional warehouse; when set it is sent as the `warehouse` query parameter of the
    /// config request.
    #[builder(default, setter(strip_option(fallback = warehouse_opt)))]
    warehouse: Option<String>,

    /// Remaining catalog properties (for example `prefix`, `token`, `credential`, `header.*`).
    #[builder(default)]
    props: HashMap<String, String>,

    /// Optional caller-supplied HTTP client.
    #[builder(default)]
    client: Option<Client>,
}
146
147impl RestCatalogConfig {
148    fn url_prefixed(&self, parts: &[&str]) -> String {
149        [&self.uri, PATH_V1]
150            .into_iter()
151            .chain(self.props.get("prefix").map(|s| &**s))
152            .chain(parts.iter().cloned())
153            .join("/")
154    }
155
156    fn config_endpoint(&self) -> String {
157        [&self.uri, PATH_V1, "config"].join("/")
158    }
159
160    pub(crate) fn get_token_endpoint(&self) -> String {
161        if let Some(oauth2_uri) = self.props.get("oauth2-server-uri") {
162            oauth2_uri.to_string()
163        } else {
164            [&self.uri, PATH_V1, "oauth", "tokens"].join("/")
165        }
166    }
167
168    fn namespaces_endpoint(&self) -> String {
169        self.url_prefixed(&["namespaces"])
170    }
171
172    fn namespace_endpoint(&self, ns: &NamespaceIdent) -> String {
173        self.url_prefixed(&["namespaces", &ns.to_url_string()])
174    }
175
176    fn tables_endpoint(&self, ns: &NamespaceIdent) -> String {
177        self.url_prefixed(&["namespaces", &ns.to_url_string(), "tables"])
178    }
179
180    fn rename_table_endpoint(&self) -> String {
181        self.url_prefixed(&["tables", "rename"])
182    }
183
184    fn register_table_endpoint(&self, ns: &NamespaceIdent) -> String {
185        self.url_prefixed(&["namespaces", &ns.to_url_string(), "register"])
186    }
187
188    fn table_endpoint(&self, table: &TableIdent) -> String {
189        self.url_prefixed(&[
190            "namespaces",
191            &table.namespace.to_url_string(),
192            "tables",
193            &table.name,
194        ])
195    }
196
197    /// Get the client from the config.
198    pub(crate) fn client(&self) -> Option<Client> {
199        self.client.clone()
200    }
201
202    /// Get the token from the config.
203    ///
204    /// The client can use this token to send requests.
205    pub(crate) fn token(&self) -> Option<String> {
206        self.props.get("token").cloned()
207    }
208
209    /// Get the credentials from the config. The client can use these credentials to fetch a new
210    /// token.
211    ///
212    /// ## Output
213    ///
214    /// - `None`: No credential is set.
215    /// - `Some(None, client_secret)`: No client_id is set, use client_secret directly.
216    /// - `Some(Some(client_id), client_secret)`: Both client_id and client_secret are set.
217    pub(crate) fn credential(&self) -> Option<(Option<String>, String)> {
218        let cred = self.props.get("credential")?;
219
220        match cred.split_once(':') {
221            Some((client_id, client_secret)) => {
222                Some((Some(client_id.to_string()), client_secret.to_string()))
223            }
224            None => Some((None, cred.to_string())),
225        }
226    }
227
228    /// Get the extra headers from config, which includes:
229    ///
230    /// - `content-type`
231    /// - `x-client-version`
232    /// - `user-agent`
233    /// - All headers specified by `header.xxx` in props.
234    pub(crate) fn extra_headers(&self) -> Result<HeaderMap> {
235        let mut headers = HeaderMap::from_iter([
236            (
237                header::CONTENT_TYPE,
238                HeaderValue::from_static("application/json"),
239            ),
240            (
241                HeaderName::from_static("x-client-version"),
242                HeaderValue::from_static(ICEBERG_REST_SPEC_VERSION),
243            ),
244            (
245                header::USER_AGENT,
246                HeaderValue::from_str(&format!("iceberg-rs/{CARGO_PKG_VERSION}")).unwrap(),
247            ),
248        ]);
249
250        for (key, value) in self
251            .props
252            .iter()
253            .filter_map(|(k, v)| k.strip_prefix("header.").map(|k| (k, v)))
254        {
255            headers.insert(
256                HeaderName::from_str(key).map_err(|e| {
257                    Error::new(
258                        ErrorKind::DataInvalid,
259                        format!("Invalid header name: {key}"),
260                    )
261                    .with_source(e)
262                })?,
263                HeaderValue::from_str(value).map_err(|e| {
264                    Error::new(
265                        ErrorKind::DataInvalid,
266                        format!("Invalid header value: {value}"),
267                    )
268                    .with_source(e)
269                })?,
270            );
271        }
272
273        Ok(headers)
274    }
275
276    /// Get the optional OAuth headers from the config.
277    pub(crate) fn extra_oauth_params(&self) -> HashMap<String, String> {
278        let mut params = HashMap::new();
279
280        if let Some(scope) = self.props.get("scope") {
281            params.insert("scope".to_string(), scope.to_string());
282        } else {
283            params.insert("scope".to_string(), "catalog".to_string());
284        }
285
286        let optional_params = ["audience", "resource"];
287        for param_name in optional_params {
288            if let Some(value) = self.props.get(param_name) {
289                params.insert(param_name.to_string(), value.to_string());
290            }
291        }
292
293        params
294    }
295
296    /// Merge the `RestCatalogConfig` with the a [`CatalogConfig`] (fetched from the REST server).
297    pub(crate) fn merge_with_config(mut self, mut config: CatalogConfig) -> Self {
298        if let Some(uri) = config.overrides.remove("uri") {
299            self.uri = uri;
300        }
301
302        let mut props = config.defaults;
303        props.extend(self.props);
304        props.extend(config.overrides);
305
306        self.props = props;
307        self
308    }
309}
310
/// Lazily initialized runtime state of a [`RestCatalog`].
#[derive(Debug)]
struct RestContext {
    /// HTTP client used to talk to the catalog server.
    client: HttpClient,
    /// Runtime config is fetched from the REST server and stored here.
    ///
    /// It could be different from the user config.
    config: RestCatalogConfig,
}
319
/// Rest catalog implementation.
#[derive(Debug)]
pub struct RestCatalog {
    /// User config is stored as-is and is never changed.
    ///
    /// It could be different from the config fetched from the server and used at runtime.
    user_config: RestCatalogConfig,
    /// Runtime context, created on first use.
    ctx: OnceCell<RestContext>,
    /// Extensions for the FileIOBuilder.
    file_io_extensions: io::Extensions,
}
331
332impl RestCatalog {
333    /// Creates a `RestCatalog` from a [`RestCatalogConfig`].
334    fn new(config: RestCatalogConfig) -> Self {
335        Self {
336            user_config: config,
337            ctx: OnceCell::new(),
338            file_io_extensions: io::Extensions::default(),
339        }
340    }
341
342    /// Add an extension to the file IO builder.
343    pub fn with_file_io_extension<T: Any + Send + Sync>(mut self, ext: T) -> Self {
344        self.file_io_extensions.add(ext);
345        self
346    }
347
348    /// Gets the [`RestContext`] from the catalog.
349    async fn context(&self) -> Result<&RestContext> {
350        self.ctx
351            .get_or_try_init(|| async {
352                let client = HttpClient::new(&self.user_config)?;
353                let catalog_config = RestCatalog::load_config(&client, &self.user_config).await?;
354                let config = self.user_config.clone().merge_with_config(catalog_config);
355                let client = client.update_with(&config)?;
356
357                Ok(RestContext { config, client })
358            })
359            .await
360    }
361
362    /// Load the runtime config from the server by `user_config`.
363    ///
364    /// It's required for a REST catalog to update its config after creation.
365    async fn load_config(
366        client: &HttpClient,
367        user_config: &RestCatalogConfig,
368    ) -> Result<CatalogConfig> {
369        let mut request_builder = client.request(Method::GET, user_config.config_endpoint());
370
371        if let Some(warehouse_location) = &user_config.warehouse {
372            request_builder = request_builder.query(&[("warehouse", warehouse_location)]);
373        }
374
375        let request = request_builder.build()?;
376
377        let http_response = client.query_catalog(request).await?;
378
379        match http_response.status() {
380            StatusCode::OK => deserialize_catalog_response(http_response).await,
381            _ => Err(deserialize_unexpected_catalog_error(http_response).await),
382        }
383    }
384
385    async fn load_file_io(
386        &self,
387        metadata_location: Option<&str>,
388        extra_config: Option<HashMap<String, String>>,
389    ) -> Result<FileIO> {
390        let mut props = self.context().await?.config.props.clone();
391        if let Some(config) = extra_config {
392            props.extend(config);
393        }
394
395        // If the warehouse is a logical identifier instead of a URL we don't want
396        // to raise an exception
397        let warehouse_path = match self.context().await?.config.warehouse.as_deref() {
398            Some(url) if Url::parse(url).is_ok() => Some(url),
399            Some(_) => None,
400            None => None,
401        };
402
403        let file_io = match metadata_location.or(warehouse_path) {
404            Some(url) => FileIO::from_path(url)?
405                .with_props(props)
406                .with_extensions(self.file_io_extensions.clone())
407                .build()?,
408            None => {
409                return Err(Error::new(
410                    ErrorKind::Unexpected,
411                    "Unable to load file io, neither warehouse nor metadata location is set!",
412                ))?;
413            }
414        };
415
416        Ok(file_io)
417    }
418
419    /// Invalidate the current token without generating a new one. On the next request, the client
420    /// will attempt to generate a new token.
421    pub async fn invalidate_token(&self) -> Result<()> {
422        self.context().await?.client.invalidate_token().await
423    }
424
425    /// Invalidate the current token and set a new one. Generates a new token before invalidating
426    /// the current token, meaning the old token will be used until this function acquires the lock
427    /// and overwrites the token.
428    ///
429    /// If credential is invalid, or the request fails, this method will return an error and leave
430    /// the current token unchanged.
431    pub async fn regenerate_token(&self) -> Result<()> {
432        self.context().await?.client.regenerate_token().await
433    }
434}
435
/// All requests and expected responses are derived from the REST catalog API spec:
/// <https://github.com/apache/iceberg/blob/main/open-api/rest-catalog-open-api.yaml>
438#[async_trait]
439impl Catalog for RestCatalog {
440    async fn list_namespaces(
441        &self,
442        parent: Option<&NamespaceIdent>,
443    ) -> Result<Vec<NamespaceIdent>> {
444        let context = self.context().await?;
445        let endpoint = context.config.namespaces_endpoint();
446        let mut namespaces = Vec::new();
447        let mut next_token = None;
448
449        loop {
450            let mut request = context.client.request(Method::GET, endpoint.clone());
451
452            // Filter on `parent={namespace}` if a parent namespace exists.
453            if let Some(ns) = parent {
454                request = request.query(&[("parent", ns.to_url_string())]);
455            }
456
457            if let Some(token) = next_token {
458                request = request.query(&[("pageToken", token)]);
459            }
460
461            let http_response = context.client.query_catalog(request.build()?).await?;
462
463            match http_response.status() {
464                StatusCode::OK => {
465                    let response =
466                        deserialize_catalog_response::<ListNamespaceResponse>(http_response)
467                            .await?;
468
469                    let ns_identifiers = response
470                        .namespaces
471                        .into_iter()
472                        .map(NamespaceIdent::from_vec)
473                        .collect::<Result<Vec<NamespaceIdent>>>()?;
474
475                    namespaces.extend(ns_identifiers);
476
477                    match response.next_page_token {
478                        Some(token) => next_token = Some(token),
479                        None => break,
480                    }
481                }
482                StatusCode::NOT_FOUND => {
483                    return Err(Error::new(
484                        ErrorKind::Unexpected,
485                        "The parent parameter of the namespace provided does not exist",
486                    ));
487                }
488                _ => return Err(deserialize_unexpected_catalog_error(http_response).await),
489            }
490        }
491
492        Ok(namespaces)
493    }
494
495    async fn create_namespace(
496        &self,
497        namespace: &NamespaceIdent,
498        properties: HashMap<String, String>,
499    ) -> Result<Namespace> {
500        let context = self.context().await?;
501
502        let request = context
503            .client
504            .request(Method::POST, context.config.namespaces_endpoint())
505            .json(&NamespaceSerde {
506                namespace: namespace.as_ref().clone(),
507                properties: Some(properties),
508            })
509            .build()?;
510
511        let http_response = context.client.query_catalog(request).await?;
512
513        match http_response.status() {
514            StatusCode::OK => {
515                let response =
516                    deserialize_catalog_response::<NamespaceSerde>(http_response).await?;
517                Namespace::try_from(response)
518            }
519            StatusCode::CONFLICT => Err(Error::new(
520                ErrorKind::Unexpected,
521                "Tried to create a namespace that already exists",
522            )),
523            _ => Err(deserialize_unexpected_catalog_error(http_response).await),
524        }
525    }
526
527    async fn get_namespace(&self, namespace: &NamespaceIdent) -> Result<Namespace> {
528        let context = self.context().await?;
529
530        let request = context
531            .client
532            .request(Method::GET, context.config.namespace_endpoint(namespace))
533            .build()?;
534
535        let http_response = context.client.query_catalog(request).await?;
536
537        match http_response.status() {
538            StatusCode::OK => {
539                let response =
540                    deserialize_catalog_response::<NamespaceSerde>(http_response).await?;
541                Namespace::try_from(response)
542            }
543            StatusCode::NOT_FOUND => Err(Error::new(
544                ErrorKind::Unexpected,
545                "Tried to get a namespace that does not exist",
546            )),
547            _ => Err(deserialize_unexpected_catalog_error(http_response).await),
548        }
549    }
550
551    async fn namespace_exists(&self, ns: &NamespaceIdent) -> Result<bool> {
552        let context = self.context().await?;
553
554        let request = context
555            .client
556            .request(Method::HEAD, context.config.namespace_endpoint(ns))
557            .build()?;
558
559        let http_response = context.client.query_catalog(request).await?;
560
561        match http_response.status() {
562            StatusCode::NO_CONTENT | StatusCode::OK => Ok(true),
563            StatusCode::NOT_FOUND => Ok(false),
564            _ => Err(deserialize_unexpected_catalog_error(http_response).await),
565        }
566    }
567
568    async fn update_namespace(
569        &self,
570        _namespace: &NamespaceIdent,
571        _properties: HashMap<String, String>,
572    ) -> Result<()> {
573        Err(Error::new(
574            ErrorKind::FeatureUnsupported,
575            "Updating namespace not supported yet!",
576        ))
577    }
578
579    async fn drop_namespace(&self, namespace: &NamespaceIdent) -> Result<()> {
580        let context = self.context().await?;
581
582        let request = context
583            .client
584            .request(Method::DELETE, context.config.namespace_endpoint(namespace))
585            .build()?;
586
587        let http_response = context.client.query_catalog(request).await?;
588
589        match http_response.status() {
590            StatusCode::NO_CONTENT | StatusCode::OK => Ok(()),
591            StatusCode::NOT_FOUND => Err(Error::new(
592                ErrorKind::Unexpected,
593                "Tried to drop a namespace that does not exist",
594            )),
595            _ => Err(deserialize_unexpected_catalog_error(http_response).await),
596        }
597    }
598
599    async fn list_tables(&self, namespace: &NamespaceIdent) -> Result<Vec<TableIdent>> {
600        let context = self.context().await?;
601        let endpoint = context.config.tables_endpoint(namespace);
602        let mut identifiers = Vec::new();
603        let mut next_token = None;
604
605        loop {
606            let mut request = context.client.request(Method::GET, endpoint.clone());
607
608            if let Some(token) = next_token {
609                request = request.query(&[("pageToken", token)]);
610            }
611
612            let http_response = context.client.query_catalog(request.build()?).await?;
613
614            match http_response.status() {
615                StatusCode::OK => {
616                    let response =
617                        deserialize_catalog_response::<ListTableResponse>(http_response).await?;
618
619                    identifiers.extend(response.identifiers);
620
621                    match response.next_page_token {
622                        Some(token) => next_token = Some(token),
623                        None => break,
624                    }
625                }
626                StatusCode::NOT_FOUND => {
627                    return Err(Error::new(
628                        ErrorKind::Unexpected,
629                        "Tried to list tables of a namespace that does not exist",
630                    ));
631                }
632                _ => return Err(deserialize_unexpected_catalog_error(http_response).await),
633            }
634        }
635
636        Ok(identifiers)
637    }
638
639    /// Create a new table inside the namespace.
640    ///
641    /// In the resulting table, if there are any config properties that
642    /// are present in both the response from the REST server and the
643    /// config provided when creating this `RestCatalog` instance then
644    /// the value provided locally to the `RestCatalog` will take precedence.
645    async fn create_table(
646        &self,
647        namespace: &NamespaceIdent,
648        creation: TableCreation,
649    ) -> Result<Table> {
650        let context = self.context().await?;
651
652        let table_ident = TableIdent::new(namespace.clone(), creation.name.clone());
653
654        let request = context
655            .client
656            .request(Method::POST, context.config.tables_endpoint(namespace))
657            .json(&CreateTableRequest {
658                name: creation.name,
659                location: creation.location,
660                schema: creation.schema,
661                partition_spec: creation.partition_spec,
662                write_order: creation.sort_order,
663                stage_create: Some(false),
664                properties: if creation.properties.is_empty() {
665                    None
666                } else {
667                    Some(creation.properties)
668                },
669            })
670            .build()?;
671
672        let http_response = context.client.query_catalog(request).await?;
673
674        let response = match http_response.status() {
675            StatusCode::OK => {
676                deserialize_catalog_response::<LoadTableResponse>(http_response).await?
677            }
678            StatusCode::NOT_FOUND => {
679                return Err(Error::new(
680                    ErrorKind::Unexpected,
681                    "Tried to create a table under a namespace that does not exist",
682                ));
683            }
684            StatusCode::CONFLICT => {
685                return Err(Error::new(
686                    ErrorKind::Unexpected,
687                    "The table already exists",
688                ));
689            }
690            _ => return Err(deserialize_unexpected_catalog_error(http_response).await),
691        };
692
693        let metadata_location = response.metadata_location.as_ref().ok_or(Error::new(
694            ErrorKind::DataInvalid,
695            "Metadata location missing in `create_table` response!",
696        ))?;
697
698        let config = response
699            .config
700            .unwrap_or_default()
701            .into_iter()
702            .chain(self.user_config.props.clone())
703            .collect();
704
705        let file_io = self
706            .load_file_io(Some(metadata_location), Some(config))
707            .await?;
708
709        let table_builder = Table::builder()
710            .identifier(table_ident.clone())
711            .file_io(file_io)
712            .metadata(response.metadata);
713
714        if let Some(metadata_location) = response.metadata_location {
715            table_builder.metadata_location(metadata_location).build()
716        } else {
717            table_builder.build()
718        }
719    }
720
721    /// Load table from the catalog.
722    ///
723    /// If there are any config properties that are present in both the response from the REST
724    /// server and the config provided when creating this `RestCatalog` instance, then the value
725    /// provided locally to the `RestCatalog` will take precedence.
726    async fn load_table(&self, table_ident: &TableIdent) -> Result<Table> {
727        let context = self.context().await?;
728
729        let request = context
730            .client
731            .request(Method::GET, context.config.table_endpoint(table_ident))
732            .build()?;
733
734        let http_response = context.client.query_catalog(request).await?;
735
736        let response = match http_response.status() {
737            StatusCode::OK | StatusCode::NOT_MODIFIED => {
738                deserialize_catalog_response::<LoadTableResponse>(http_response).await?
739            }
740            StatusCode::NOT_FOUND => {
741                return Err(Error::new(
742                    ErrorKind::Unexpected,
743                    "Tried to load a table that does not exist",
744                ));
745            }
746            _ => return Err(deserialize_unexpected_catalog_error(http_response).await),
747        };
748
749        let config = response
750            .config
751            .unwrap_or_default()
752            .into_iter()
753            .chain(self.user_config.props.clone())
754            .collect();
755
756        let file_io = self
757            .load_file_io(response.metadata_location.as_deref(), Some(config))
758            .await?;
759
760        let table_builder = Table::builder()
761            .identifier(table_ident.clone())
762            .file_io(file_io)
763            .metadata(response.metadata);
764
765        if let Some(metadata_location) = response.metadata_location {
766            table_builder.metadata_location(metadata_location).build()
767        } else {
768            table_builder.build()
769        }
770    }
771
772    /// Drop a table from the catalog.
773    async fn drop_table(&self, table: &TableIdent) -> Result<()> {
774        let context = self.context().await?;
775
776        let request = context
777            .client
778            .request(Method::DELETE, context.config.table_endpoint(table))
779            .build()?;
780
781        let http_response = context.client.query_catalog(request).await?;
782
783        match http_response.status() {
784            StatusCode::NO_CONTENT | StatusCode::OK => Ok(()),
785            StatusCode::NOT_FOUND => Err(Error::new(
786                ErrorKind::Unexpected,
787                "Tried to drop a table that does not exist",
788            )),
789            _ => Err(deserialize_unexpected_catalog_error(http_response).await),
790        }
791    }
792
793    /// Check if a table exists in the catalog.
794    async fn table_exists(&self, table: &TableIdent) -> Result<bool> {
795        let context = self.context().await?;
796
797        let request = context
798            .client
799            .request(Method::HEAD, context.config.table_endpoint(table))
800            .build()?;
801
802        let http_response = context.client.query_catalog(request).await?;
803
804        match http_response.status() {
805            StatusCode::NO_CONTENT | StatusCode::OK => Ok(true),
806            StatusCode::NOT_FOUND => Ok(false),
807            _ => Err(deserialize_unexpected_catalog_error(http_response).await),
808        }
809    }
810
811    /// Rename a table in the catalog.
812    async fn rename_table(&self, src: &TableIdent, dest: &TableIdent) -> Result<()> {
813        let context = self.context().await?;
814
815        let request = context
816            .client
817            .request(Method::POST, context.config.rename_table_endpoint())
818            .json(&RenameTableRequest {
819                source: src.clone(),
820                destination: dest.clone(),
821            })
822            .build()?;
823
824        let http_response = context.client.query_catalog(request).await?;
825
826        match http_response.status() {
827            StatusCode::NO_CONTENT | StatusCode::OK => Ok(()),
828            StatusCode::NOT_FOUND => Err(Error::new(
829                ErrorKind::Unexpected,
830                "Tried to rename a table that does not exist (is the namespace correct?)",
831            )),
832            StatusCode::CONFLICT => Err(Error::new(
833                ErrorKind::Unexpected,
834                "Tried to rename a table to a name that already exists",
835            )),
836            _ => Err(deserialize_unexpected_catalog_error(http_response).await),
837        }
838    }
839
840    async fn register_table(
841        &self,
842        table_ident: &TableIdent,
843        metadata_location: String,
844    ) -> Result<Table> {
845        let context = self.context().await?;
846
847        let request = context
848            .client
849            .request(
850                Method::POST,
851                context
852                    .config
853                    .register_table_endpoint(table_ident.namespace()),
854            )
855            .json(&RegisterTableRequest {
856                name: table_ident.name.clone(),
857                metadata_location: metadata_location.clone(),
858                overwrite: Some(false),
859            })
860            .build()?;
861
862        let http_response = context.client.query_catalog(request).await?;
863
864        let response: LoadTableResponse = match http_response.status() {
865            StatusCode::OK => {
866                deserialize_catalog_response::<LoadTableResponse>(http_response).await?
867            }
868            StatusCode::NOT_FOUND => {
869                return Err(Error::new(
870                    ErrorKind::NamespaceNotFound,
871                    "The namespace specified does not exist.",
872                ));
873            }
874            StatusCode::CONFLICT => {
875                return Err(Error::new(
876                    ErrorKind::TableAlreadyExists,
877                    "The given table already exists.",
878                ));
879            }
880            _ => return Err(deserialize_unexpected_catalog_error(http_response).await),
881        };
882
883        let metadata_location = response.metadata_location.as_ref().ok_or(Error::new(
884            ErrorKind::DataInvalid,
885            "Metadata location missing in `register_table` response!",
886        ))?;
887
888        let file_io = self.load_file_io(Some(metadata_location), None).await?;
889
890        Table::builder()
891            .identifier(table_ident.clone())
892            .file_io(file_io)
893            .metadata(response.metadata)
894            .metadata_location(metadata_location.clone())
895            .build()
896    }
897
898    async fn update_table(&self, mut commit: TableCommit) -> Result<Table> {
899        let context = self.context().await?;
900
901        let request = context
902            .client
903            .request(
904                Method::POST,
905                context.config.table_endpoint(commit.identifier()),
906            )
907            .json(&CommitTableRequest {
908                identifier: commit.identifier().clone(),
909                requirements: commit.take_requirements(),
910                updates: commit.take_updates(),
911            })
912            .build()?;
913
914        let http_response = context.client.query_catalog(request).await?;
915
916        let response: CommitTableResponse = match http_response.status() {
917            StatusCode::OK => deserialize_catalog_response(http_response).await?,
918            StatusCode::NOT_FOUND => {
919                return Err(Error::new(
920                    ErrorKind::TableNotFound,
921                    "Tried to update a table that does not exist",
922                ));
923            }
924            StatusCode::CONFLICT => {
925                return Err(Error::new(
926                    ErrorKind::CatalogCommitConflicts,
927                    "CatalogCommitConflicts, one or more requirements failed. The client may retry.",
928                )
929                .with_retryable(true));
930            }
931            StatusCode::INTERNAL_SERVER_ERROR => {
932                return Err(Error::new(
933                    ErrorKind::Unexpected,
934                    "An unknown server-side problem occurred; the commit state is unknown.",
935                ));
936            }
937            StatusCode::BAD_GATEWAY => {
938                return Err(Error::new(
939                    ErrorKind::Unexpected,
940                    "A gateway or proxy received an invalid response from the upstream server; the commit state is unknown.",
941                ));
942            }
943            StatusCode::GATEWAY_TIMEOUT => {
944                return Err(Error::new(
945                    ErrorKind::Unexpected,
946                    "A server-side gateway timeout occurred; the commit state is unknown.",
947                ));
948            }
949            _ => return Err(deserialize_unexpected_catalog_error(http_response).await),
950        };
951
952        let file_io = self
953            .load_file_io(Some(&response.metadata_location), None)
954            .await?;
955
956        Table::builder()
957            .identifier(commit.identifier().clone())
958            .file_io(file_io)
959            .metadata(response.metadata)
960            .metadata_location(response.metadata_location)
961            .build()
962    }
963}
964
965#[cfg(test)]
966mod tests {
967    use std::fs::File;
968    use std::io::BufReader;
969    use std::sync::Arc;
970
971    use chrono::{TimeZone, Utc};
972    use iceberg::spec::{
973        FormatVersion, NestedField, NullOrder, Operation, PrimitiveType, Schema, Snapshot,
974        SnapshotLog, SortDirection, SortField, SortOrder, Summary, Transform, Type,
975        UnboundPartitionField, UnboundPartitionSpec,
976    };
977    use iceberg::transaction::{ApplyTransactionAction, Transaction};
978    use mockito::{Mock, Server, ServerGuard};
979    use serde_json::json;
980    use uuid::uuid;
981
982    use super::*;
983
984    #[tokio::test]
985    async fn test_update_config() {
986        let mut server = Server::new_async().await;
987
988        let config_mock = server
989            .mock("GET", "/v1/config")
990            .with_status(200)
991            .with_body(
992                r#"{
993                "overrides": {
994                    "warehouse": "s3://iceberg-catalog"
995                },
996                "defaults": {}
997            }"#,
998            )
999            .create_async()
1000            .await;
1001
1002        let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
1003
1004        assert_eq!(
1005            catalog
1006                .context()
1007                .await
1008                .unwrap()
1009                .config
1010                .props
1011                .get("warehouse"),
1012            Some(&"s3://iceberg-catalog".to_string())
1013        );
1014
1015        config_mock.assert_async().await;
1016    }
1017
1018    async fn create_config_mock(server: &mut ServerGuard) -> Mock {
1019        server
1020            .mock("GET", "/v1/config")
1021            .with_status(200)
1022            .with_body(
1023                r#"{
1024                "overrides": {
1025                    "warehouse": "s3://iceberg-catalog"
1026                },
1027                "defaults": {}
1028            }"#,
1029            )
1030            .create_async()
1031            .await
1032    }
1033
1034    async fn create_oauth_mock(server: &mut ServerGuard) -> Mock {
1035        create_oauth_mock_with_path(server, "/v1/oauth/tokens", "ey000000000000", 200).await
1036    }
1037
1038    async fn create_oauth_mock_with_path(
1039        server: &mut ServerGuard,
1040        path: &str,
1041        token: &str,
1042        status: usize,
1043    ) -> Mock {
1044        let body = format!(
1045            r#"{{
1046                "access_token": "{token}",
1047                "token_type": "Bearer",
1048                "issued_token_type": "urn:ietf:params:oauth:token-type:access_token",
1049                "expires_in": 86400
1050            }}"#
1051        );
1052        server
1053            .mock("POST", path)
1054            .with_status(status)
1055            .with_body(body)
1056            .expect(1)
1057            .create_async()
1058            .await
1059    }
1060
1061    #[tokio::test]
1062    async fn test_oauth() {
1063        let mut server = Server::new_async().await;
1064        let oauth_mock = create_oauth_mock(&mut server).await;
1065        let config_mock = create_config_mock(&mut server).await;
1066
1067        let mut props = HashMap::new();
1068        props.insert("credential".to_string(), "client1:secret1".to_string());
1069
1070        let catalog = RestCatalog::new(
1071            RestCatalogConfig::builder()
1072                .uri(server.url())
1073                .props(props)
1074                .build(),
1075        );
1076
1077        let token = catalog.context().await.unwrap().client.token().await;
1078        oauth_mock.assert_async().await;
1079        config_mock.assert_async().await;
1080        assert_eq!(token, Some("ey000000000000".to_string()));
1081    }
1082
1083    #[tokio::test]
1084    async fn test_oauth_with_optional_param() {
1085        let mut props = HashMap::new();
1086        props.insert("credential".to_string(), "client1:secret1".to_string());
1087        props.insert("scope".to_string(), "custom_scope".to_string());
1088        props.insert("audience".to_string(), "custom_audience".to_string());
1089        props.insert("resource".to_string(), "custom_resource".to_string());
1090
1091        let mut server = Server::new_async().await;
1092        let oauth_mock = server
1093            .mock("POST", "/v1/oauth/tokens")
1094            .match_body(mockito::Matcher::Regex("scope=custom_scope".to_string()))
1095            .match_body(mockito::Matcher::Regex(
1096                "audience=custom_audience".to_string(),
1097            ))
1098            .match_body(mockito::Matcher::Regex(
1099                "resource=custom_resource".to_string(),
1100            ))
1101            .with_status(200)
1102            .with_body(
1103                r#"{
1104                "access_token": "ey000000000000",
1105                "token_type": "Bearer",
1106                "issued_token_type": "urn:ietf:params:oauth:token-type:access_token",
1107                "expires_in": 86400
1108                }"#,
1109            )
1110            .expect(1)
1111            .create_async()
1112            .await;
1113
1114        let config_mock = create_config_mock(&mut server).await;
1115
1116        let catalog = RestCatalog::new(
1117            RestCatalogConfig::builder()
1118                .uri(server.url())
1119                .props(props)
1120                .build(),
1121        );
1122
1123        let token = catalog.context().await.unwrap().client.token().await;
1124
1125        oauth_mock.assert_async().await;
1126        config_mock.assert_async().await;
1127        assert_eq!(token, Some("ey000000000000".to_string()));
1128    }
1129
1130    #[tokio::test]
1131    async fn test_invalidate_token() {
1132        let mut server = Server::new_async().await;
1133        let oauth_mock = create_oauth_mock(&mut server).await;
1134        let config_mock = create_config_mock(&mut server).await;
1135
1136        let mut props = HashMap::new();
1137        props.insert("credential".to_string(), "client1:secret1".to_string());
1138
1139        let catalog = RestCatalog::new(
1140            RestCatalogConfig::builder()
1141                .uri(server.url())
1142                .props(props)
1143                .build(),
1144        );
1145
1146        let token = catalog.context().await.unwrap().client.token().await;
1147        oauth_mock.assert_async().await;
1148        config_mock.assert_async().await;
1149        assert_eq!(token, Some("ey000000000000".to_string()));
1150
1151        let oauth_mock =
1152            create_oauth_mock_with_path(&mut server, "/v1/oauth/tokens", "ey000000000001", 200)
1153                .await;
1154        catalog.invalidate_token().await.unwrap();
1155        let token = catalog.context().await.unwrap().client.token().await;
1156        oauth_mock.assert_async().await;
1157        assert_eq!(token, Some("ey000000000001".to_string()));
1158    }
1159
1160    #[tokio::test]
1161    async fn test_invalidate_token_failing_request() {
1162        let mut server = Server::new_async().await;
1163        let oauth_mock = create_oauth_mock(&mut server).await;
1164        let config_mock = create_config_mock(&mut server).await;
1165
1166        let mut props = HashMap::new();
1167        props.insert("credential".to_string(), "client1:secret1".to_string());
1168
1169        let catalog = RestCatalog::new(
1170            RestCatalogConfig::builder()
1171                .uri(server.url())
1172                .props(props)
1173                .build(),
1174        );
1175
1176        let token = catalog.context().await.unwrap().client.token().await;
1177        oauth_mock.assert_async().await;
1178        config_mock.assert_async().await;
1179        assert_eq!(token, Some("ey000000000000".to_string()));
1180
1181        let oauth_mock =
1182            create_oauth_mock_with_path(&mut server, "/v1/oauth/tokens", "ey000000000001", 500)
1183                .await;
1184        catalog.invalidate_token().await.unwrap();
1185        let token = catalog.context().await.unwrap().client.token().await;
1186        oauth_mock.assert_async().await;
1187        assert_eq!(token, None);
1188    }
1189
1190    #[tokio::test]
1191    async fn test_regenerate_token() {
1192        let mut server = Server::new_async().await;
1193        let oauth_mock = create_oauth_mock(&mut server).await;
1194        let config_mock = create_config_mock(&mut server).await;
1195
1196        let mut props = HashMap::new();
1197        props.insert("credential".to_string(), "client1:secret1".to_string());
1198
1199        let catalog = RestCatalog::new(
1200            RestCatalogConfig::builder()
1201                .uri(server.url())
1202                .props(props)
1203                .build(),
1204        );
1205
1206        let token = catalog.context().await.unwrap().client.token().await;
1207        oauth_mock.assert_async().await;
1208        config_mock.assert_async().await;
1209        assert_eq!(token, Some("ey000000000000".to_string()));
1210
1211        let oauth_mock =
1212            create_oauth_mock_with_path(&mut server, "/v1/oauth/tokens", "ey000000000001", 200)
1213                .await;
1214        catalog.regenerate_token().await.unwrap();
1215        oauth_mock.assert_async().await;
1216        let token = catalog.context().await.unwrap().client.token().await;
1217        assert_eq!(token, Some("ey000000000001".to_string()));
1218    }
1219
1220    #[tokio::test]
1221    async fn test_regenerate_token_failing_request() {
1222        let mut server = Server::new_async().await;
1223        let oauth_mock = create_oauth_mock(&mut server).await;
1224        let config_mock = create_config_mock(&mut server).await;
1225
1226        let mut props = HashMap::new();
1227        props.insert("credential".to_string(), "client1:secret1".to_string());
1228
1229        let catalog = RestCatalog::new(
1230            RestCatalogConfig::builder()
1231                .uri(server.url())
1232                .props(props)
1233                .build(),
1234        );
1235
1236        let token = catalog.context().await.unwrap().client.token().await;
1237        oauth_mock.assert_async().await;
1238        config_mock.assert_async().await;
1239        assert_eq!(token, Some("ey000000000000".to_string()));
1240
1241        let oauth_mock =
1242            create_oauth_mock_with_path(&mut server, "/v1/oauth/tokens", "ey000000000001", 500)
1243                .await;
1244        let invalidate_result = catalog.regenerate_token().await;
1245        assert!(invalidate_result.is_err());
1246        oauth_mock.assert_async().await;
1247        let token = catalog.context().await.unwrap().client.token().await;
1248
1249        // original token is left intact
1250        assert_eq!(token, Some("ey000000000000".to_string()));
1251    }
1252
1253    #[tokio::test]
1254    async fn test_http_headers() {
1255        let server = Server::new_async().await;
1256        let mut props = HashMap::new();
1257        props.insert("credential".to_string(), "client1:secret1".to_string());
1258
1259        let config = RestCatalogConfig::builder()
1260            .uri(server.url())
1261            .props(props)
1262            .build();
1263        let headers: HeaderMap = config.extra_headers().unwrap();
1264
1265        let expected_headers = HeaderMap::from_iter([
1266            (
1267                header::CONTENT_TYPE,
1268                HeaderValue::from_static("application/json"),
1269            ),
1270            (
1271                HeaderName::from_static("x-client-version"),
1272                HeaderValue::from_static(ICEBERG_REST_SPEC_VERSION),
1273            ),
1274            (
1275                header::USER_AGENT,
1276                HeaderValue::from_str(&format!("iceberg-rs/{CARGO_PKG_VERSION}")).unwrap(),
1277            ),
1278        ]);
1279        assert_eq!(headers, expected_headers);
1280    }
1281
1282    #[tokio::test]
1283    async fn test_http_headers_with_custom_headers() {
1284        let server = Server::new_async().await;
1285        let mut props = HashMap::new();
1286        props.insert("credential".to_string(), "client1:secret1".to_string());
1287        props.insert(
1288            "header.content-type".to_string(),
1289            "application/yaml".to_string(),
1290        );
1291        props.insert(
1292            "header.customized-header".to_string(),
1293            "some/value".to_string(),
1294        );
1295
1296        let config = RestCatalogConfig::builder()
1297            .uri(server.url())
1298            .props(props)
1299            .build();
1300        let headers: HeaderMap = config.extra_headers().unwrap();
1301
1302        let expected_headers = HeaderMap::from_iter([
1303            (
1304                header::CONTENT_TYPE,
1305                HeaderValue::from_static("application/yaml"),
1306            ),
1307            (
1308                HeaderName::from_static("x-client-version"),
1309                HeaderValue::from_static(ICEBERG_REST_SPEC_VERSION),
1310            ),
1311            (
1312                header::USER_AGENT,
1313                HeaderValue::from_str(&format!("iceberg-rs/{CARGO_PKG_VERSION}")).unwrap(),
1314            ),
1315            (
1316                HeaderName::from_static("customized-header"),
1317                HeaderValue::from_static("some/value"),
1318            ),
1319        ]);
1320        assert_eq!(headers, expected_headers);
1321    }
1322
1323    #[tokio::test]
1324    async fn test_oauth_with_oauth2_server_uri() {
1325        let mut server = Server::new_async().await;
1326        let config_mock = create_config_mock(&mut server).await;
1327
1328        let mut auth_server = Server::new_async().await;
1329        let auth_server_path = "/some/path";
1330        let oauth_mock =
1331            create_oauth_mock_with_path(&mut auth_server, auth_server_path, "ey000000000000", 200)
1332                .await;
1333
1334        let mut props = HashMap::new();
1335        props.insert("credential".to_string(), "client1:secret1".to_string());
1336        props.insert(
1337            "oauth2-server-uri".to_string(),
1338            format!("{}{}", auth_server.url(), auth_server_path).to_string(),
1339        );
1340
1341        let catalog = RestCatalog::new(
1342            RestCatalogConfig::builder()
1343                .uri(server.url())
1344                .props(props)
1345                .build(),
1346        );
1347
1348        let token = catalog.context().await.unwrap().client.token().await;
1349
1350        oauth_mock.assert_async().await;
1351        config_mock.assert_async().await;
1352        assert_eq!(token, Some("ey000000000000".to_string()));
1353    }
1354
1355    #[tokio::test]
1356    async fn test_config_override() {
1357        let mut server = Server::new_async().await;
1358        let mut redirect_server = Server::new_async().await;
1359        let new_uri = redirect_server.url();
1360
1361        let config_mock = server
1362            .mock("GET", "/v1/config")
1363            .with_status(200)
1364            .with_body(
1365                json!(
1366                    {
1367                        "overrides": {
1368                            "uri": new_uri,
1369                            "warehouse": "s3://iceberg-catalog",
1370                            "prefix": "ice/warehouses/my"
1371                        },
1372                        "defaults": {},
1373                    }
1374                )
1375                .to_string(),
1376            )
1377            .create_async()
1378            .await;
1379
1380        let list_ns_mock = redirect_server
1381            .mock("GET", "/v1/ice/warehouses/my/namespaces")
1382            .with_body(
1383                r#"{
1384                    "namespaces": []
1385                }"#,
1386            )
1387            .create_async()
1388            .await;
1389
1390        let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
1391
1392        let _namespaces = catalog.list_namespaces(None).await.unwrap();
1393
1394        config_mock.assert_async().await;
1395        list_ns_mock.assert_async().await;
1396    }
1397
1398    #[tokio::test]
1399    async fn test_list_namespace() {
1400        let mut server = Server::new_async().await;
1401
1402        let config_mock = create_config_mock(&mut server).await;
1403
1404        let list_ns_mock = server
1405            .mock("GET", "/v1/namespaces")
1406            .with_body(
1407                r#"{
1408                "namespaces": [
1409                    ["ns1", "ns11"],
1410                    ["ns2"]
1411                ]
1412            }"#,
1413            )
1414            .create_async()
1415            .await;
1416
1417        let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
1418
1419        let namespaces = catalog.list_namespaces(None).await.unwrap();
1420
1421        let expected_ns = vec![
1422            NamespaceIdent::from_vec(vec!["ns1".to_string(), "ns11".to_string()]).unwrap(),
1423            NamespaceIdent::from_vec(vec!["ns2".to_string()]).unwrap(),
1424        ];
1425
1426        assert_eq!(expected_ns, namespaces);
1427
1428        config_mock.assert_async().await;
1429        list_ns_mock.assert_async().await;
1430    }
1431
1432    #[tokio::test]
1433    async fn test_list_namespace_with_pagination() {
1434        let mut server = Server::new_async().await;
1435
1436        let config_mock = create_config_mock(&mut server).await;
1437
1438        let list_ns_mock_page1 = server
1439            .mock("GET", "/v1/namespaces")
1440            .with_body(
1441                r#"{
1442                "namespaces": [
1443                    ["ns1", "ns11"],
1444                    ["ns2"]
1445                ],
1446                "next-page-token": "token123"
1447            }"#,
1448            )
1449            .create_async()
1450            .await;
1451
1452        let list_ns_mock_page2 = server
1453            .mock("GET", "/v1/namespaces?pageToken=token123")
1454            .with_body(
1455                r#"{
1456                "namespaces": [
1457                    ["ns3"],
1458                    ["ns4", "ns41"]
1459                ]
1460            }"#,
1461            )
1462            .create_async()
1463            .await;
1464
1465        let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
1466
1467        let namespaces = catalog.list_namespaces(None).await.unwrap();
1468
1469        let expected_ns = vec![
1470            NamespaceIdent::from_vec(vec!["ns1".to_string(), "ns11".to_string()]).unwrap(),
1471            NamespaceIdent::from_vec(vec!["ns2".to_string()]).unwrap(),
1472            NamespaceIdent::from_vec(vec!["ns3".to_string()]).unwrap(),
1473            NamespaceIdent::from_vec(vec!["ns4".to_string(), "ns41".to_string()]).unwrap(),
1474        ];
1475
1476        assert_eq!(expected_ns, namespaces);
1477
1478        config_mock.assert_async().await;
1479        list_ns_mock_page1.assert_async().await;
1480        list_ns_mock_page2.assert_async().await;
1481    }
1482
1483    #[tokio::test]
1484    async fn test_list_namespace_with_multiple_pages() {
1485        let mut server = Server::new_async().await;
1486
1487        let config_mock = create_config_mock(&mut server).await;
1488
1489        // Page 1
1490        let list_ns_mock_page1 = server
1491            .mock("GET", "/v1/namespaces")
1492            .with_body(
1493                r#"{
1494                "namespaces": [
1495                    ["ns1", "ns11"],
1496                    ["ns2"]
1497                ],
1498                "next-page-token": "page2"
1499            }"#,
1500            )
1501            .create_async()
1502            .await;
1503
1504        // Page 2
1505        let list_ns_mock_page2 = server
1506            .mock("GET", "/v1/namespaces?pageToken=page2")
1507            .with_body(
1508                r#"{
1509                "namespaces": [
1510                    ["ns3"],
1511                    ["ns4", "ns41"]
1512                ],
1513                "next-page-token": "page3"
1514            }"#,
1515            )
1516            .create_async()
1517            .await;
1518
1519        // Page 3
1520        let list_ns_mock_page3 = server
1521            .mock("GET", "/v1/namespaces?pageToken=page3")
1522            .with_body(
1523                r#"{
1524                "namespaces": [
1525                    ["ns5", "ns51", "ns511"]
1526                ],
1527                "next-page-token": "page4"
1528            }"#,
1529            )
1530            .create_async()
1531            .await;
1532
1533        // Page 4
1534        let list_ns_mock_page4 = server
1535            .mock("GET", "/v1/namespaces?pageToken=page4")
1536            .with_body(
1537                r#"{
1538                "namespaces": [
1539                    ["ns6"],
1540                    ["ns7"]
1541                ],
1542                "next-page-token": "page5"
1543            }"#,
1544            )
1545            .create_async()
1546            .await;
1547
1548        // Page 5 (final page)
1549        let list_ns_mock_page5 = server
1550            .mock("GET", "/v1/namespaces?pageToken=page5")
1551            .with_body(
1552                r#"{
1553                "namespaces": [
1554                    ["ns8", "ns81"]
1555                ]
1556            }"#,
1557            )
1558            .create_async()
1559            .await;
1560
1561        let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
1562
1563        let namespaces = catalog.list_namespaces(None).await.unwrap();
1564
1565        let expected_ns = vec![
1566            NamespaceIdent::from_vec(vec!["ns1".to_string(), "ns11".to_string()]).unwrap(),
1567            NamespaceIdent::from_vec(vec!["ns2".to_string()]).unwrap(),
1568            NamespaceIdent::from_vec(vec!["ns3".to_string()]).unwrap(),
1569            NamespaceIdent::from_vec(vec!["ns4".to_string(), "ns41".to_string()]).unwrap(),
1570            NamespaceIdent::from_vec(vec![
1571                "ns5".to_string(),
1572                "ns51".to_string(),
1573                "ns511".to_string(),
1574            ])
1575            .unwrap(),
1576            NamespaceIdent::from_vec(vec!["ns6".to_string()]).unwrap(),
1577            NamespaceIdent::from_vec(vec!["ns7".to_string()]).unwrap(),
1578            NamespaceIdent::from_vec(vec!["ns8".to_string(), "ns81".to_string()]).unwrap(),
1579        ];
1580
1581        assert_eq!(expected_ns, namespaces);
1582
1583        // Verify all page requests were made
1584        config_mock.assert_async().await;
1585        list_ns_mock_page1.assert_async().await;
1586        list_ns_mock_page2.assert_async().await;
1587        list_ns_mock_page3.assert_async().await;
1588        list_ns_mock_page4.assert_async().await;
1589        list_ns_mock_page5.assert_async().await;
1590    }
1591
1592    #[tokio::test]
1593    async fn test_create_namespace() {
1594        let mut server = Server::new_async().await;
1595
1596        let config_mock = create_config_mock(&mut server).await;
1597
1598        let create_ns_mock = server
1599            .mock("POST", "/v1/namespaces")
1600            .with_body(
1601                r#"{
1602                "namespace": [ "ns1", "ns11"],
1603                "properties" : {
1604                    "key1": "value1"
1605                }
1606            }"#,
1607            )
1608            .create_async()
1609            .await;
1610
1611        let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
1612
1613        let namespaces = catalog
1614            .create_namespace(
1615                &NamespaceIdent::from_vec(vec!["ns1".to_string(), "ns11".to_string()]).unwrap(),
1616                HashMap::from([("key1".to_string(), "value1".to_string())]),
1617            )
1618            .await
1619            .unwrap();
1620
1621        let expected_ns = Namespace::with_properties(
1622            NamespaceIdent::from_vec(vec!["ns1".to_string(), "ns11".to_string()]).unwrap(),
1623            HashMap::from([("key1".to_string(), "value1".to_string())]),
1624        );
1625
1626        assert_eq!(expected_ns, namespaces);
1627
1628        config_mock.assert_async().await;
1629        create_ns_mock.assert_async().await;
1630    }
1631
1632    #[tokio::test]
1633    async fn test_get_namespace() {
1634        let mut server = Server::new_async().await;
1635
1636        let config_mock = create_config_mock(&mut server).await;
1637
1638        let get_ns_mock = server
1639            .mock("GET", "/v1/namespaces/ns1")
1640            .with_body(
1641                r#"{
1642                "namespace": [ "ns1"],
1643                "properties" : {
1644                    "key1": "value1"
1645                }
1646            }"#,
1647            )
1648            .create_async()
1649            .await;
1650
1651        let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
1652
1653        let namespaces = catalog
1654            .get_namespace(&NamespaceIdent::new("ns1".to_string()))
1655            .await
1656            .unwrap();
1657
1658        let expected_ns = Namespace::with_properties(
1659            NamespaceIdent::new("ns1".to_string()),
1660            HashMap::from([("key1".to_string(), "value1".to_string())]),
1661        );
1662
1663        assert_eq!(expected_ns, namespaces);
1664
1665        config_mock.assert_async().await;
1666        get_ns_mock.assert_async().await;
1667    }
1668
1669    #[tokio::test]
1670    async fn check_namespace_exists() {
1671        let mut server = Server::new_async().await;
1672
1673        let config_mock = create_config_mock(&mut server).await;
1674
1675        let get_ns_mock = server
1676            .mock("HEAD", "/v1/namespaces/ns1")
1677            .with_status(204)
1678            .create_async()
1679            .await;
1680
1681        let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
1682
1683        assert!(
1684            catalog
1685                .namespace_exists(&NamespaceIdent::new("ns1".to_string()))
1686                .await
1687                .unwrap()
1688        );
1689
1690        config_mock.assert_async().await;
1691        get_ns_mock.assert_async().await;
1692    }
1693
1694    #[tokio::test]
1695    async fn test_drop_namespace() {
1696        let mut server = Server::new_async().await;
1697
1698        let config_mock = create_config_mock(&mut server).await;
1699
1700        let drop_ns_mock = server
1701            .mock("DELETE", "/v1/namespaces/ns1")
1702            .with_status(204)
1703            .create_async()
1704            .await;
1705
1706        let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
1707
1708        catalog
1709            .drop_namespace(&NamespaceIdent::new("ns1".to_string()))
1710            .await
1711            .unwrap();
1712
1713        config_mock.assert_async().await;
1714        drop_ns_mock.assert_async().await;
1715    }
1716
1717    #[tokio::test]
1718    async fn test_list_tables() {
1719        let mut server = Server::new_async().await;
1720
1721        let config_mock = create_config_mock(&mut server).await;
1722
1723        let list_tables_mock = server
1724            .mock("GET", "/v1/namespaces/ns1/tables")
1725            .with_status(200)
1726            .with_body(
1727                r#"{
1728                "identifiers": [
1729                    {
1730                        "namespace": ["ns1"],
1731                        "name": "table1"
1732                    },
1733                    {
1734                        "namespace": ["ns1"],
1735                        "name": "table2"
1736                    }
1737                ]
1738            }"#,
1739            )
1740            .create_async()
1741            .await;
1742
1743        let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
1744
1745        let tables = catalog
1746            .list_tables(&NamespaceIdent::new("ns1".to_string()))
1747            .await
1748            .unwrap();
1749
1750        let expected_tables = vec![
1751            TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table1".to_string()),
1752            TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table2".to_string()),
1753        ];
1754
1755        assert_eq!(tables, expected_tables);
1756
1757        config_mock.assert_async().await;
1758        list_tables_mock.assert_async().await;
1759    }
1760
1761    #[tokio::test]
1762    async fn test_list_tables_with_pagination() {
1763        let mut server = Server::new_async().await;
1764
1765        let config_mock = create_config_mock(&mut server).await;
1766
1767        let list_tables_mock_page1 = server
1768            .mock("GET", "/v1/namespaces/ns1/tables")
1769            .with_status(200)
1770            .with_body(
1771                r#"{
1772                "identifiers": [
1773                    {
1774                        "namespace": ["ns1"],
1775                        "name": "table1"
1776                    },
1777                    {
1778                        "namespace": ["ns1"],
1779                        "name": "table2"
1780                    }
1781                ],
1782                "next-page-token": "token456"
1783            }"#,
1784            )
1785            .create_async()
1786            .await;
1787
1788        let list_tables_mock_page2 = server
1789            .mock("GET", "/v1/namespaces/ns1/tables?pageToken=token456")
1790            .with_status(200)
1791            .with_body(
1792                r#"{
1793                "identifiers": [
1794                    {
1795                        "namespace": ["ns1"],
1796                        "name": "table3"
1797                    },
1798                    {
1799                        "namespace": ["ns1"],
1800                        "name": "table4"
1801                    }
1802                ]
1803            }"#,
1804            )
1805            .create_async()
1806            .await;
1807
1808        let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
1809
1810        let tables = catalog
1811            .list_tables(&NamespaceIdent::new("ns1".to_string()))
1812            .await
1813            .unwrap();
1814
1815        let expected_tables = vec![
1816            TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table1".to_string()),
1817            TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table2".to_string()),
1818            TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table3".to_string()),
1819            TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table4".to_string()),
1820        ];
1821
1822        assert_eq!(tables, expected_tables);
1823
1824        config_mock.assert_async().await;
1825        list_tables_mock_page1.assert_async().await;
1826        list_tables_mock_page2.assert_async().await;
1827    }
1828
1829    #[tokio::test]
1830    async fn test_list_tables_with_multiple_pages() {
1831        let mut server = Server::new_async().await;
1832
1833        let config_mock = create_config_mock(&mut server).await;
1834
1835        // Page 1
1836        let list_tables_mock_page1 = server
1837            .mock("GET", "/v1/namespaces/ns1/tables")
1838            .with_status(200)
1839            .with_body(
1840                r#"{
1841                "identifiers": [
1842                    {
1843                        "namespace": ["ns1"],
1844                        "name": "table1"
1845                    },
1846                    {
1847                        "namespace": ["ns1"],
1848                        "name": "table2"
1849                    }
1850                ],
1851                "next-page-token": "page2"
1852            }"#,
1853            )
1854            .create_async()
1855            .await;
1856
1857        // Page 2
1858        let list_tables_mock_page2 = server
1859            .mock("GET", "/v1/namespaces/ns1/tables?pageToken=page2")
1860            .with_status(200)
1861            .with_body(
1862                r#"{
1863                "identifiers": [
1864                    {
1865                        "namespace": ["ns1"],
1866                        "name": "table3"
1867                    },
1868                    {
1869                        "namespace": ["ns1"],
1870                        "name": "table4"
1871                    }
1872                ],
1873                "next-page-token": "page3"
1874            }"#,
1875            )
1876            .create_async()
1877            .await;
1878
1879        // Page 3
1880        let list_tables_mock_page3 = server
1881            .mock("GET", "/v1/namespaces/ns1/tables?pageToken=page3")
1882            .with_status(200)
1883            .with_body(
1884                r#"{
1885                "identifiers": [
1886                    {
1887                        "namespace": ["ns1"],
1888                        "name": "table5"
1889                    }
1890                ],
1891                "next-page-token": "page4"
1892            }"#,
1893            )
1894            .create_async()
1895            .await;
1896
1897        // Page 4
1898        let list_tables_mock_page4 = server
1899            .mock("GET", "/v1/namespaces/ns1/tables?pageToken=page4")
1900            .with_status(200)
1901            .with_body(
1902                r#"{
1903                "identifiers": [
1904                    {
1905                        "namespace": ["ns1"],
1906                        "name": "table6"
1907                    },
1908                    {
1909                        "namespace": ["ns1"],
1910                        "name": "table7"
1911                    }
1912                ],
1913                "next-page-token": "page5"
1914            }"#,
1915            )
1916            .create_async()
1917            .await;
1918
1919        // Page 5 (final page)
1920        let list_tables_mock_page5 = server
1921            .mock("GET", "/v1/namespaces/ns1/tables?pageToken=page5")
1922            .with_status(200)
1923            .with_body(
1924                r#"{
1925                "identifiers": [
1926                    {
1927                        "namespace": ["ns1"],
1928                        "name": "table8"
1929                    }
1930                ]
1931            }"#,
1932            )
1933            .create_async()
1934            .await;
1935
1936        let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
1937
1938        let tables = catalog
1939            .list_tables(&NamespaceIdent::new("ns1".to_string()))
1940            .await
1941            .unwrap();
1942
1943        let expected_tables = vec![
1944            TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table1".to_string()),
1945            TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table2".to_string()),
1946            TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table3".to_string()),
1947            TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table4".to_string()),
1948            TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table5".to_string()),
1949            TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table6".to_string()),
1950            TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table7".to_string()),
1951            TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table8".to_string()),
1952        ];
1953
1954        assert_eq!(tables, expected_tables);
1955
1956        // Verify all page requests were made
1957        config_mock.assert_async().await;
1958        list_tables_mock_page1.assert_async().await;
1959        list_tables_mock_page2.assert_async().await;
1960        list_tables_mock_page3.assert_async().await;
1961        list_tables_mock_page4.assert_async().await;
1962        list_tables_mock_page5.assert_async().await;
1963    }
1964
1965    #[tokio::test]
1966    async fn test_drop_tables() {
1967        let mut server = Server::new_async().await;
1968
1969        let config_mock = create_config_mock(&mut server).await;
1970
1971        let delete_table_mock = server
1972            .mock("DELETE", "/v1/namespaces/ns1/tables/table1")
1973            .with_status(204)
1974            .create_async()
1975            .await;
1976
1977        let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
1978
1979        catalog
1980            .drop_table(&TableIdent::new(
1981                NamespaceIdent::new("ns1".to_string()),
1982                "table1".to_string(),
1983            ))
1984            .await
1985            .unwrap();
1986
1987        config_mock.assert_async().await;
1988        delete_table_mock.assert_async().await;
1989    }
1990
1991    #[tokio::test]
1992    async fn test_check_table_exists() {
1993        let mut server = Server::new_async().await;
1994
1995        let config_mock = create_config_mock(&mut server).await;
1996
1997        let check_table_exists_mock = server
1998            .mock("HEAD", "/v1/namespaces/ns1/tables/table1")
1999            .with_status(204)
2000            .create_async()
2001            .await;
2002
2003        let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
2004
2005        assert!(
2006            catalog
2007                .table_exists(&TableIdent::new(
2008                    NamespaceIdent::new("ns1".to_string()),
2009                    "table1".to_string(),
2010                ))
2011                .await
2012                .unwrap()
2013        );
2014
2015        config_mock.assert_async().await;
2016        check_table_exists_mock.assert_async().await;
2017    }
2018
2019    #[tokio::test]
2020    async fn test_rename_table() {
2021        let mut server = Server::new_async().await;
2022
2023        let config_mock = create_config_mock(&mut server).await;
2024
2025        let rename_table_mock = server
2026            .mock("POST", "/v1/tables/rename")
2027            .with_status(204)
2028            .create_async()
2029            .await;
2030
2031        let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
2032
2033        catalog
2034            .rename_table(
2035                &TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table1".to_string()),
2036                &TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table2".to_string()),
2037            )
2038            .await
2039            .unwrap();
2040
2041        config_mock.assert_async().await;
2042        rename_table_mock.assert_async().await;
2043    }
2044
2045    #[tokio::test]
2046    async fn test_load_table() {
2047        let mut server = Server::new_async().await;
2048
2049        let config_mock = create_config_mock(&mut server).await;
2050
2051        let rename_table_mock = server
2052            .mock("GET", "/v1/namespaces/ns1/tables/test1")
2053            .with_status(200)
2054            .with_body_from_file(format!(
2055                "{}/testdata/{}",
2056                env!("CARGO_MANIFEST_DIR"),
2057                "load_table_response.json"
2058            ))
2059            .create_async()
2060            .await;
2061
2062        let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
2063
2064        let table = catalog
2065            .load_table(&TableIdent::new(
2066                NamespaceIdent::new("ns1".to_string()),
2067                "test1".to_string(),
2068            ))
2069            .await
2070            .unwrap();
2071
2072        assert_eq!(
2073            &TableIdent::from_strs(vec!["ns1", "test1"]).unwrap(),
2074            table.identifier()
2075        );
2076        assert_eq!(
2077            "s3://warehouse/database/table/metadata/00001-5f2f8166-244c-4eae-ac36-384ecdec81fc.gz.metadata.json",
2078            table.metadata_location().unwrap()
2079        );
2080        assert_eq!(FormatVersion::V1, table.metadata().format_version());
2081        assert_eq!("s3://warehouse/database/table", table.metadata().location());
2082        assert_eq!(
2083            uuid!("b55d9dda-6561-423a-8bfc-787980ce421f"),
2084            table.metadata().uuid()
2085        );
2086        assert_eq!(
2087            Utc.timestamp_millis_opt(1646787054459).unwrap(),
2088            table.metadata().last_updated_timestamp().unwrap()
2089        );
2090        assert_eq!(
2091            vec![&Arc::new(
2092                Schema::builder()
2093                    .with_fields(vec![
2094                        NestedField::optional(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
2095                        NestedField::optional(2, "data", Type::Primitive(PrimitiveType::String))
2096                            .into(),
2097                    ])
2098                    .build()
2099                    .unwrap()
2100            )],
2101            table.metadata().schemas_iter().collect::<Vec<_>>()
2102        );
2103        assert_eq!(
2104            &HashMap::from([
2105                ("owner".to_string(), "bryan".to_string()),
2106                (
2107                    "write.metadata.compression-codec".to_string(),
2108                    "gzip".to_string()
2109                )
2110            ]),
2111            table.metadata().properties()
2112        );
2113        assert_eq!(vec![&Arc::new(Snapshot::builder()
2114            .with_snapshot_id(3497810964824022504)
2115            .with_timestamp_ms(1646787054459)
2116            .with_manifest_list("s3://warehouse/database/table/metadata/snap-3497810964824022504-1-c4f68204-666b-4e50-a9df-b10c34bf6b82.avro")
2117            .with_sequence_number(0)
2118            .with_schema_id(0)
2119            .with_summary(Summary {
2120                operation: Operation::Append,
2121                additional_properties: HashMap::from_iter([
2122                    ("spark.app.id", "local-1646787004168"),
2123                    ("added-data-files", "1"),
2124                    ("added-records", "1"),
2125                    ("added-files-size", "697"),
2126                    ("changed-partition-count", "1"),
2127                    ("total-records", "1"),
2128                    ("total-files-size", "697"),
2129                    ("total-data-files", "1"),
2130                    ("total-delete-files", "0"),
2131                    ("total-position-deletes", "0"),
2132                    ("total-equality-deletes", "0")
2133                ].iter().map(|p| (p.0.to_string(), p.1.to_string()))),
2134            }).build()
2135        )], table.metadata().snapshots().collect::<Vec<_>>());
2136        assert_eq!(
2137            &[SnapshotLog {
2138                timestamp_ms: 1646787054459,
2139                snapshot_id: 3497810964824022504,
2140            }],
2141            table.metadata().history()
2142        );
2143        assert_eq!(
2144            vec![&Arc::new(SortOrder {
2145                order_id: 0,
2146                fields: vec![],
2147            })],
2148            table.metadata().sort_orders_iter().collect::<Vec<_>>()
2149        );
2150
2151        config_mock.assert_async().await;
2152        rename_table_mock.assert_async().await;
2153    }
2154
2155    #[tokio::test]
2156    async fn test_load_table_404() {
2157        let mut server = Server::new_async().await;
2158
2159        let config_mock = create_config_mock(&mut server).await;
2160
2161        let rename_table_mock = server
2162            .mock("GET", "/v1/namespaces/ns1/tables/test1")
2163            .with_status(404)
2164            .with_body(r#"
2165{
2166    "error": {
2167        "message": "Table does not exist: ns1.test1 in warehouse 8bcb0838-50fc-472d-9ddb-8feb89ef5f1e",
2168        "type": "NoSuchNamespaceErrorException",
2169        "code": 404
2170    }
2171}
2172            "#)
2173            .create_async()
2174            .await;
2175
2176        let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
2177
2178        let table = catalog
2179            .load_table(&TableIdent::new(
2180                NamespaceIdent::new("ns1".to_string()),
2181                "test1".to_string(),
2182            ))
2183            .await;
2184
2185        assert!(table.is_err());
2186        assert!(table.err().unwrap().message().contains("does not exist"));
2187
2188        config_mock.assert_async().await;
2189        rename_table_mock.assert_async().await;
2190    }
2191
2192    #[tokio::test]
2193    async fn test_create_table() {
2194        let mut server = Server::new_async().await;
2195
2196        let config_mock = create_config_mock(&mut server).await;
2197
2198        let create_table_mock = server
2199            .mock("POST", "/v1/namespaces/ns1/tables")
2200            .with_status(200)
2201            .with_body_from_file(format!(
2202                "{}/testdata/{}",
2203                env!("CARGO_MANIFEST_DIR"),
2204                "create_table_response.json"
2205            ))
2206            .create_async()
2207            .await;
2208
2209        let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
2210
2211        let table_creation = TableCreation::builder()
2212            .name("test1".to_string())
2213            .schema(
2214                Schema::builder()
2215                    .with_fields(vec![
2216                        NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String))
2217                            .into(),
2218                        NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(),
2219                        NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean))
2220                            .into(),
2221                    ])
2222                    .with_schema_id(1)
2223                    .with_identifier_field_ids(vec![2])
2224                    .build()
2225                    .unwrap(),
2226            )
2227            .properties(HashMap::from([("owner".to_string(), "testx".to_string())]))
2228            .partition_spec(
2229                UnboundPartitionSpec::builder()
2230                    .add_partition_fields(vec![
2231                        UnboundPartitionField::builder()
2232                            .source_id(1)
2233                            .transform(Transform::Truncate(3))
2234                            .name("id".to_string())
2235                            .build(),
2236                    ])
2237                    .unwrap()
2238                    .build(),
2239            )
2240            .sort_order(
2241                SortOrder::builder()
2242                    .with_sort_field(
2243                        SortField::builder()
2244                            .source_id(2)
2245                            .transform(Transform::Identity)
2246                            .direction(SortDirection::Ascending)
2247                            .null_order(NullOrder::First)
2248                            .build(),
2249                    )
2250                    .build_unbound()
2251                    .unwrap(),
2252            )
2253            .build();
2254
2255        let table = catalog
2256            .create_table(&NamespaceIdent::from_strs(["ns1"]).unwrap(), table_creation)
2257            .await
2258            .unwrap();
2259
2260        assert_eq!(
2261            &TableIdent::from_strs(vec!["ns1", "test1"]).unwrap(),
2262            table.identifier()
2263        );
2264        assert_eq!(
2265            "s3://warehouse/database/table/metadata.json",
2266            table.metadata_location().unwrap()
2267        );
2268        assert_eq!(FormatVersion::V1, table.metadata().format_version());
2269        assert_eq!("s3://warehouse/database/table", table.metadata().location());
2270        assert_eq!(
2271            uuid!("bf289591-dcc0-4234-ad4f-5c3eed811a29"),
2272            table.metadata().uuid()
2273        );
2274        assert_eq!(
2275            1657810967051,
2276            table
2277                .metadata()
2278                .last_updated_timestamp()
2279                .unwrap()
2280                .timestamp_millis()
2281        );
2282        assert_eq!(
2283            vec![&Arc::new(
2284                Schema::builder()
2285                    .with_fields(vec![
2286                        NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String))
2287                            .into(),
2288                        NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(),
2289                        NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean))
2290                            .into(),
2291                    ])
2292                    .with_schema_id(0)
2293                    .with_identifier_field_ids(vec![2])
2294                    .build()
2295                    .unwrap()
2296            )],
2297            table.metadata().schemas_iter().collect::<Vec<_>>()
2298        );
2299        assert_eq!(
2300            &HashMap::from([
2301                (
2302                    "write.delete.parquet.compression-codec".to_string(),
2303                    "zstd".to_string()
2304                ),
2305                (
2306                    "write.metadata.compression-codec".to_string(),
2307                    "gzip".to_string()
2308                ),
2309                (
2310                    "write.summary.partition-limit".to_string(),
2311                    "100".to_string()
2312                ),
2313                (
2314                    "write.parquet.compression-codec".to_string(),
2315                    "zstd".to_string()
2316                ),
2317            ]),
2318            table.metadata().properties()
2319        );
2320        assert!(table.metadata().current_snapshot().is_none());
2321        assert!(table.metadata().history().is_empty());
2322        assert_eq!(
2323            vec![&Arc::new(SortOrder {
2324                order_id: 0,
2325                fields: vec![],
2326            })],
2327            table.metadata().sort_orders_iter().collect::<Vec<_>>()
2328        );
2329
2330        config_mock.assert_async().await;
2331        create_table_mock.assert_async().await;
2332    }
2333
2334    #[tokio::test]
2335    async fn test_create_table_409() {
2336        let mut server = Server::new_async().await;
2337
2338        let config_mock = create_config_mock(&mut server).await;
2339
2340        let create_table_mock = server
2341            .mock("POST", "/v1/namespaces/ns1/tables")
2342            .with_status(409)
2343            .with_body(r#"
2344{
2345    "error": {
2346        "message": "Table already exists: ns1.test1 in warehouse 8bcb0838-50fc-472d-9ddb-8feb89ef5f1e",
2347        "type": "AlreadyExistsException",
2348        "code": 409
2349    }
2350}
2351            "#)
2352            .create_async()
2353            .await;
2354
2355        let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
2356
2357        let table_creation = TableCreation::builder()
2358            .name("test1".to_string())
2359            .schema(
2360                Schema::builder()
2361                    .with_fields(vec![
2362                        NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String))
2363                            .into(),
2364                        NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(),
2365                        NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean))
2366                            .into(),
2367                    ])
2368                    .with_schema_id(1)
2369                    .with_identifier_field_ids(vec![2])
2370                    .build()
2371                    .unwrap(),
2372            )
2373            .properties(HashMap::from([("owner".to_string(), "testx".to_string())]))
2374            .build();
2375
2376        let table_result = catalog
2377            .create_table(&NamespaceIdent::from_strs(["ns1"]).unwrap(), table_creation)
2378            .await;
2379
2380        assert!(table_result.is_err());
2381        assert!(
2382            table_result
2383                .err()
2384                .unwrap()
2385                .message()
2386                .contains("already exists")
2387        );
2388
2389        config_mock.assert_async().await;
2390        create_table_mock.assert_async().await;
2391    }
2392
2393    #[tokio::test]
2394    async fn test_update_table() {
2395        let mut server = Server::new_async().await;
2396
2397        let config_mock = create_config_mock(&mut server).await;
2398
2399        let load_table_mock = server
2400            .mock("GET", "/v1/namespaces/ns1/tables/test1")
2401            .with_status(200)
2402            .with_body_from_file(format!(
2403                "{}/testdata/{}",
2404                env!("CARGO_MANIFEST_DIR"),
2405                "load_table_response.json"
2406            ))
2407            .create_async()
2408            .await;
2409
2410        let update_table_mock = server
2411            .mock("POST", "/v1/namespaces/ns1/tables/test1")
2412            .with_status(200)
2413            .with_body_from_file(format!(
2414                "{}/testdata/{}",
2415                env!("CARGO_MANIFEST_DIR"),
2416                "update_table_response.json"
2417            ))
2418            .create_async()
2419            .await;
2420
2421        let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
2422
2423        let table1 = {
2424            let file = File::open(format!(
2425                "{}/testdata/{}",
2426                env!("CARGO_MANIFEST_DIR"),
2427                "create_table_response.json"
2428            ))
2429            .unwrap();
2430            let reader = BufReader::new(file);
2431            let resp = serde_json::from_reader::<_, LoadTableResponse>(reader).unwrap();
2432
2433            Table::builder()
2434                .metadata(resp.metadata)
2435                .metadata_location(resp.metadata_location.unwrap())
2436                .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap())
2437                .file_io(FileIO::from_path("/tmp").unwrap().build().unwrap())
2438                .build()
2439                .unwrap()
2440        };
2441
2442        let tx = Transaction::new(&table1);
2443        let table = tx
2444            .upgrade_table_version()
2445            .set_format_version(FormatVersion::V2)
2446            .apply(tx)
2447            .unwrap()
2448            .commit(&catalog)
2449            .await
2450            .unwrap();
2451
2452        assert_eq!(
2453            &TableIdent::from_strs(vec!["ns1", "test1"]).unwrap(),
2454            table.identifier()
2455        );
2456        assert_eq!(
2457            "s3://warehouse/database/table/metadata.json",
2458            table.metadata_location().unwrap()
2459        );
2460        assert_eq!(FormatVersion::V2, table.metadata().format_version());
2461        assert_eq!("s3://warehouse/database/table", table.metadata().location());
2462        assert_eq!(
2463            uuid!("bf289591-dcc0-4234-ad4f-5c3eed811a29"),
2464            table.metadata().uuid()
2465        );
2466        assert_eq!(
2467            1657810967051,
2468            table
2469                .metadata()
2470                .last_updated_timestamp()
2471                .unwrap()
2472                .timestamp_millis()
2473        );
2474        assert_eq!(
2475            vec![&Arc::new(
2476                Schema::builder()
2477                    .with_fields(vec![
2478                        NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String))
2479                            .into(),
2480                        NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(),
2481                        NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean))
2482                            .into(),
2483                    ])
2484                    .with_schema_id(0)
2485                    .with_identifier_field_ids(vec![2])
2486                    .build()
2487                    .unwrap()
2488            )],
2489            table.metadata().schemas_iter().collect::<Vec<_>>()
2490        );
2491        assert_eq!(
2492            &HashMap::from([
2493                (
2494                    "write.delete.parquet.compression-codec".to_string(),
2495                    "zstd".to_string()
2496                ),
2497                (
2498                    "write.metadata.compression-codec".to_string(),
2499                    "gzip".to_string()
2500                ),
2501                (
2502                    "write.summary.partition-limit".to_string(),
2503                    "100".to_string()
2504                ),
2505                (
2506                    "write.parquet.compression-codec".to_string(),
2507                    "zstd".to_string()
2508                ),
2509            ]),
2510            table.metadata().properties()
2511        );
2512        assert!(table.metadata().current_snapshot().is_none());
2513        assert!(table.metadata().history().is_empty());
2514        assert_eq!(
2515            vec![&Arc::new(SortOrder {
2516                order_id: 0,
2517                fields: vec![],
2518            })],
2519            table.metadata().sort_orders_iter().collect::<Vec<_>>()
2520        );
2521
2522        config_mock.assert_async().await;
2523        update_table_mock.assert_async().await;
2524        load_table_mock.assert_async().await
2525    }
2526
2527    #[tokio::test]
2528    async fn test_update_table_404() {
2529        let mut server = Server::new_async().await;
2530
2531        let config_mock = create_config_mock(&mut server).await;
2532
2533        let load_table_mock = server
2534            .mock("GET", "/v1/namespaces/ns1/tables/test1")
2535            .with_status(200)
2536            .with_body_from_file(format!(
2537                "{}/testdata/{}",
2538                env!("CARGO_MANIFEST_DIR"),
2539                "load_table_response.json"
2540            ))
2541            .create_async()
2542            .await;
2543
2544        let update_table_mock = server
2545            .mock("POST", "/v1/namespaces/ns1/tables/test1")
2546            .with_status(404)
2547            .with_body(
2548                r#"
2549{
2550    "error": {
2551        "message": "The given table does not exist",
2552        "type": "NoSuchTableException",
2553        "code": 404
2554    }
2555}
2556            "#,
2557            )
2558            .create_async()
2559            .await;
2560
2561        let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
2562
2563        let table1 = {
2564            let file = File::open(format!(
2565                "{}/testdata/{}",
2566                env!("CARGO_MANIFEST_DIR"),
2567                "create_table_response.json"
2568            ))
2569            .unwrap();
2570            let reader = BufReader::new(file);
2571            let resp = serde_json::from_reader::<_, LoadTableResponse>(reader).unwrap();
2572
2573            Table::builder()
2574                .metadata(resp.metadata)
2575                .metadata_location(resp.metadata_location.unwrap())
2576                .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap())
2577                .file_io(FileIO::from_path("/tmp").unwrap().build().unwrap())
2578                .build()
2579                .unwrap()
2580        };
2581
2582        let tx = Transaction::new(&table1);
2583        let table_result = tx
2584            .upgrade_table_version()
2585            .set_format_version(FormatVersion::V2)
2586            .apply(tx)
2587            .unwrap()
2588            .commit(&catalog)
2589            .await;
2590
2591        assert!(table_result.is_err());
2592        assert!(
2593            table_result
2594                .err()
2595                .unwrap()
2596                .message()
2597                .contains("does not exist")
2598        );
2599
2600        config_mock.assert_async().await;
2601        update_table_mock.assert_async().await;
2602        load_table_mock.assert_async().await;
2603    }
2604
2605    #[tokio::test]
2606    async fn test_register_table() {
2607        let mut server = Server::new_async().await;
2608
2609        let config_mock = create_config_mock(&mut server).await;
2610
2611        let register_table_mock = server
2612            .mock("POST", "/v1/namespaces/ns1/register")
2613            .with_status(200)
2614            .with_body_from_file(format!(
2615                "{}/testdata/{}",
2616                env!("CARGO_MANIFEST_DIR"),
2617                "load_table_response.json"
2618            ))
2619            .create_async()
2620            .await;
2621
2622        let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
2623        let table_ident =
2624            TableIdent::new(NamespaceIdent::new("ns1".to_string()), "test1".to_string());
2625        let metadata_location = String::from(
2626            "s3://warehouse/database/table/metadata/00001-5f2f8166-244c-4eae-ac36-384ecdec81fc.gz.metadata.json",
2627        );
2628
2629        let table = catalog
2630            .register_table(&table_ident, metadata_location)
2631            .await
2632            .unwrap();
2633
2634        assert_eq!(
2635            &TableIdent::from_strs(vec!["ns1", "test1"]).unwrap(),
2636            table.identifier()
2637        );
2638        assert_eq!(
2639            "s3://warehouse/database/table/metadata/00001-5f2f8166-244c-4eae-ac36-384ecdec81fc.gz.metadata.json",
2640            table.metadata_location().unwrap()
2641        );
2642
2643        config_mock.assert_async().await;
2644        register_table_mock.assert_async().await;
2645    }
2646
2647    #[tokio::test]
2648    async fn test_register_table_404() {
2649        let mut server = Server::new_async().await;
2650
2651        let config_mock = create_config_mock(&mut server).await;
2652
2653        let register_table_mock = server
2654            .mock("POST", "/v1/namespaces/ns1/register")
2655            .with_status(404)
2656            .with_body(
2657                r#"
2658{
2659    "error": {
2660        "message": "The namespace specified does not exist",
2661        "type": "NoSuchNamespaceErrorException",
2662        "code": 404
2663    }
2664}
2665            "#,
2666            )
2667            .create_async()
2668            .await;
2669
2670        let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
2671
2672        let table_ident =
2673            TableIdent::new(NamespaceIdent::new("ns1".to_string()), "test1".to_string());
2674        let metadata_location = String::from(
2675            "s3://warehouse/database/table/metadata/00001-5f2f8166-244c-4eae-ac36-384ecdec81fc.gz.metadata.json",
2676        );
2677        let table = catalog
2678            .register_table(&table_ident, metadata_location)
2679            .await;
2680
2681        assert!(table.is_err());
2682        assert!(table.err().unwrap().message().contains("does not exist"));
2683
2684        config_mock.assert_async().await;
2685        register_table_mock.assert_async().await;
2686    }
2687
2688    #[tokio::test]
2689    async fn test_create_rest_catalog() {
2690        let builder = RestCatalogBuilder::default().with_client(Client::new());
2691
2692        let catalog = builder
2693            .load(
2694                "test",
2695                HashMap::from([
2696                    (
2697                        REST_CATALOG_PROP_URI.to_string(),
2698                        "http://localhost:8080".to_string(),
2699                    ),
2700                    ("a".to_string(), "b".to_string()),
2701                ]),
2702            )
2703            .await;
2704
2705        assert!(catalog.is_ok());
2706
2707        let catalog_config = catalog.unwrap().user_config;
2708        assert_eq!(catalog_config.name.as_deref(), Some("test"));
2709        assert_eq!(catalog_config.uri, "http://localhost:8080");
2710        assert_eq!(catalog_config.warehouse, None);
2711        assert!(catalog_config.client.is_some());
2712
2713        assert_eq!(catalog_config.props.get("a"), Some(&"b".to_string()));
2714        assert!(!catalog_config.props.contains_key(REST_CATALOG_PROP_URI));
2715    }
2716
2717    #[tokio::test]
2718    async fn test_create_rest_catalog_no_uri() {
2719        let builder = RestCatalogBuilder::default();
2720
2721        let catalog = builder
2722            .load(
2723                "test",
2724                HashMap::from([(
2725                    REST_CATALOG_PROP_WAREHOUSE.to_string(),
2726                    "s3://warehouse".to_string(),
2727                )]),
2728            )
2729            .await;
2730
2731        assert!(catalog.is_err());
2732        if let Err(err) = catalog {
2733            assert_eq!(err.kind(), ErrorKind::DataInvalid);
2734            assert_eq!(err.message(), "Catalog uri is required");
2735        }
2736    }
2737}