iceberg_catalog_rest/
catalog.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! This module contains the iceberg REST catalog implementation.
19
20use std::collections::HashMap;
21use std::future::Future;
22use std::str::FromStr;
23use std::sync::Arc;
24
25use async_trait::async_trait;
26use iceberg::io::{FileIO, FileIOBuilder, StorageFactory};
27use iceberg::table::Table;
28use iceberg::{
29    Catalog, CatalogBuilder, Error, ErrorKind, Namespace, NamespaceIdent, Result, Runtime,
30    TableCommit, TableCreation, TableIdent,
31};
32use itertools::Itertools;
33use reqwest::header::{
34    HeaderMap, HeaderName, HeaderValue, {self},
35};
36use reqwest::{Client, Method, StatusCode, Url};
37use tokio::sync::OnceCell;
38use typed_builder::TypedBuilder;
39
40use crate::client::{
41    HttpClient, deserialize_catalog_response, deserialize_unexpected_catalog_error,
42};
43use crate::types::{
44    CatalogConfig, CommitTableRequest, CommitTableResponse, CreateNamespaceRequest,
45    CreateTableRequest, ListNamespaceResponse, ListTablesResponse, LoadTableResult,
46    NamespaceResponse, RegisterTableRequest, RenameTableRequest,
47};
48
/// Property key holding the base URI of the REST catalog server.
pub const REST_CATALOG_PROP_URI: &str = "uri";
/// Property key holding the warehouse location (a URL or a logical identifier).
pub const REST_CATALOG_PROP_WAREHOUSE: &str = "warehouse";
/// Property key that switches off header redaction in error logs.
///
/// Redaction stays enabled unless the property is set to `true` (compared case-insensitively).
pub const REST_CATALOG_PROP_DISABLE_HEADER_REDACTION: &str = "disable-header-redaction";
55
56const ICEBERG_REST_SPEC_VERSION: &str = "0.14.1";
57const CARGO_PKG_VERSION: &str = env!("CARGO_PKG_VERSION");
58const PATH_V1: &str = "v1";
59
60/// Builder for [`RestCatalog`].
61#[derive(Debug)]
62pub struct RestCatalogBuilder {
63    config: RestCatalogConfig,
64    storage_factory: Option<Arc<dyn StorageFactory>>,
65    runtime: Option<Runtime>,
66}
67
68impl Default for RestCatalogBuilder {
69    fn default() -> Self {
70        Self {
71            config: RestCatalogConfig {
72                name: None,
73                uri: "".to_string(),
74                warehouse: None,
75                props: HashMap::new(),
76                client: None,
77            },
78            storage_factory: None,
79            runtime: None,
80        }
81    }
82}
83
84impl CatalogBuilder for RestCatalogBuilder {
85    type C = RestCatalog;
86
87    fn with_storage_factory(mut self, storage_factory: Arc<dyn StorageFactory>) -> Self {
88        self.storage_factory = Some(storage_factory);
89        self
90    }
91
92    fn with_runtime(mut self, runtime: Runtime) -> Self {
93        self.runtime = Some(runtime);
94        self
95    }
96
97    fn load(
98        mut self,
99        name: impl Into<String>,
100        props: HashMap<String, String>,
101    ) -> impl Future<Output = Result<Self::C>> + Send {
102        self.config.name = Some(name.into());
103
104        if props.contains_key(REST_CATALOG_PROP_URI) {
105            self.config.uri = props
106                .get(REST_CATALOG_PROP_URI)
107                .cloned()
108                .unwrap_or_default();
109        }
110
111        if props.contains_key(REST_CATALOG_PROP_WAREHOUSE) {
112            self.config.warehouse = props.get(REST_CATALOG_PROP_WAREHOUSE).cloned()
113        }
114
115        // Collect other remaining properties
116        self.config.props = props
117            .into_iter()
118            .filter(|(k, _)| k != REST_CATALOG_PROP_URI && k != REST_CATALOG_PROP_WAREHOUSE)
119            .collect();
120
121        let result = {
122            if self.config.name.is_none() {
123                Err(Error::new(
124                    ErrorKind::DataInvalid,
125                    "Catalog name is required",
126                ))
127            } else if self.config.uri.is_empty() {
128                Err(Error::new(
129                    ErrorKind::DataInvalid,
130                    "Catalog uri is required",
131                ))
132            } else {
133                let runtime = self.runtime.unwrap_or_else(Runtime::current);
134                Ok(RestCatalog::new(self.config, self.storage_factory, runtime))
135            }
136        };
137
138        std::future::ready(result)
139    }
140}
141
142impl RestCatalogBuilder {
143    /// Configures the catalog with a custom HTTP client.
144    pub fn with_client(mut self, client: Client) -> Self {
145        self.config.client = Some(client);
146        self
147    }
148}
149
150/// Rest catalog configuration.
151#[derive(Clone, Debug, TypedBuilder)]
152pub(crate) struct RestCatalogConfig {
153    #[builder(default, setter(strip_option))]
154    name: Option<String>,
155
156    uri: String,
157
158    #[builder(default, setter(strip_option(fallback = warehouse_opt)))]
159    warehouse: Option<String>,
160
161    #[builder(default)]
162    props: HashMap<String, String>,
163
164    #[builder(default)]
165    client: Option<Client>,
166}
167
168impl RestCatalogConfig {
169    fn url_prefixed(&self, parts: &[&str]) -> String {
170        [&self.uri, PATH_V1]
171            .into_iter()
172            .chain(self.props.get("prefix").map(|s| &**s))
173            .chain(parts.iter().cloned())
174            .join("/")
175    }
176
177    fn config_endpoint(&self) -> String {
178        [&self.uri, PATH_V1, "config"].join("/")
179    }
180
181    pub(crate) fn get_token_endpoint(&self) -> String {
182        if let Some(oauth2_uri) = self.props.get("oauth2-server-uri") {
183            oauth2_uri.to_string()
184        } else {
185            [&self.uri, PATH_V1, "oauth", "tokens"].join("/")
186        }
187    }
188
189    fn namespaces_endpoint(&self) -> String {
190        self.url_prefixed(&["namespaces"])
191    }
192
193    fn namespace_endpoint(&self, ns: &NamespaceIdent) -> String {
194        self.url_prefixed(&["namespaces", &ns.to_url_string()])
195    }
196
197    fn tables_endpoint(&self, ns: &NamespaceIdent) -> String {
198        self.url_prefixed(&["namespaces", &ns.to_url_string(), "tables"])
199    }
200
201    fn rename_table_endpoint(&self) -> String {
202        self.url_prefixed(&["tables", "rename"])
203    }
204
205    fn register_table_endpoint(&self, ns: &NamespaceIdent) -> String {
206        self.url_prefixed(&["namespaces", &ns.to_url_string(), "register"])
207    }
208
209    fn table_endpoint(&self, table: &TableIdent) -> String {
210        self.url_prefixed(&[
211            "namespaces",
212            &table.namespace.to_url_string(),
213            "tables",
214            &table.name,
215        ])
216    }
217
218    /// Get the client from the config.
219    pub(crate) fn client(&self) -> Option<Client> {
220        self.client.clone()
221    }
222
223    /// Get the token from the config.
224    ///
225    /// The client can use this token to send requests.
226    pub(crate) fn token(&self) -> Option<String> {
227        self.props.get("token").cloned()
228    }
229
230    /// Get the credentials from the config. The client can use these credentials to fetch a new
231    /// token.
232    ///
233    /// ## Output
234    ///
235    /// - `None`: No credential is set.
236    /// - `Some(None, client_secret)`: No client_id is set, use client_secret directly.
237    /// - `Some(Some(client_id), client_secret)`: Both client_id and client_secret are set.
238    pub(crate) fn credential(&self) -> Option<(Option<String>, String)> {
239        let cred = self.props.get("credential")?;
240
241        match cred.split_once(':') {
242            Some((client_id, client_secret)) => {
243                Some((Some(client_id.to_string()), client_secret.to_string()))
244            }
245            None => Some((None, cred.to_string())),
246        }
247    }
248
249    /// Get the extra headers from config, which includes:
250    ///
251    /// - `content-type`
252    /// - `x-client-version`
253    /// - `user-agent`
254    /// - All headers specified by `header.xxx` in props.
255    pub(crate) fn extra_headers(&self) -> Result<HeaderMap> {
256        let mut headers = HeaderMap::from_iter([
257            (
258                header::CONTENT_TYPE,
259                HeaderValue::from_static("application/json"),
260            ),
261            (
262                HeaderName::from_static("x-client-version"),
263                HeaderValue::from_static(ICEBERG_REST_SPEC_VERSION),
264            ),
265            (
266                header::USER_AGENT,
267                HeaderValue::from_str(&format!("iceberg-rs/{CARGO_PKG_VERSION}")).unwrap(),
268            ),
269        ]);
270
271        for (key, value) in self
272            .props
273            .iter()
274            .filter_map(|(k, v)| k.strip_prefix("header.").map(|k| (k, v)))
275        {
276            headers.insert(
277                HeaderName::from_str(key).map_err(|e| {
278                    Error::new(
279                        ErrorKind::DataInvalid,
280                        format!("Invalid header name: {key}"),
281                    )
282                    .with_source(e)
283                })?,
284                HeaderValue::from_str(value).map_err(|e| {
285                    Error::new(
286                        ErrorKind::DataInvalid,
287                        format!("Invalid header value: {value}"),
288                    )
289                    .with_source(e)
290                })?,
291            );
292        }
293
294        Ok(headers)
295    }
296
297    /// Get the optional OAuth headers from the config.
298    pub(crate) fn extra_oauth_params(&self) -> HashMap<String, String> {
299        let mut params = HashMap::new();
300
301        if let Some(scope) = self.props.get("scope") {
302            params.insert("scope".to_string(), scope.to_string());
303        } else {
304            params.insert("scope".to_string(), "catalog".to_string());
305        }
306
307        let optional_params = ["audience", "resource"];
308        for param_name in optional_params {
309            if let Some(value) = self.props.get(param_name) {
310                params.insert(param_name.to_string(), value.to_string());
311            }
312        }
313
314        params
315    }
316
317    /// Check if header redaction is disabled in error logs.
318    ///
319    /// Returns true if the `disable-header-redaction` property is set to "true".
320    /// Defaults to false for security (headers are redacted by default).
321    pub(crate) fn disable_header_redaction(&self) -> bool {
322        self.props
323            .get(REST_CATALOG_PROP_DISABLE_HEADER_REDACTION)
324            .map(|v| v.eq_ignore_ascii_case("true"))
325            .unwrap_or(false)
326    }
327
328    /// Merge the `RestCatalogConfig` with the a [`CatalogConfig`] (fetched from the REST server).
329    pub(crate) fn merge_with_config(mut self, mut config: CatalogConfig) -> Self {
330        if let Some(uri) = config.overrides.remove("uri") {
331            self.uri = uri;
332        }
333
334        let mut props = config.defaults;
335        props.extend(self.props);
336        props.extend(config.overrides);
337
338        self.props = props;
339        self
340    }
341}
342
343#[derive(Debug)]
344struct RestContext {
345    client: HttpClient,
346    /// Runtime config is fetched from rest server and stored here.
347    ///
348    /// It's could be different from the user config.
349    config: RestCatalogConfig,
350}
351
352/// Rest catalog implementation.
353#[derive(Debug)]
354pub struct RestCatalog {
355    /// User config is stored as-is and never be changed.
356    ///
357    /// It could be different from the config fetched from the server and used at runtime.
358    user_config: RestCatalogConfig,
359    ctx: OnceCell<RestContext>,
360    /// Storage factory for creating FileIO instances.
361    storage_factory: Option<Arc<dyn StorageFactory>>,
362    runtime: Runtime,
363}
364
365impl RestCatalog {
366    /// Creates a `RestCatalog` from a [`RestCatalogConfig`].
367    fn new(
368        config: RestCatalogConfig,
369        storage_factory: Option<Arc<dyn StorageFactory>>,
370        runtime: Runtime,
371    ) -> Self {
372        Self {
373            user_config: config,
374            ctx: OnceCell::new(),
375            storage_factory,
376            runtime,
377        }
378    }
379
380    /// Sends a DELETE request for the given table, optionally requesting purge.
381    async fn delete_table(&self, table: &TableIdent, purge: bool) -> Result<()> {
382        let context = self.context().await?;
383
384        let mut request_builder = context
385            .client
386            .request(Method::DELETE, context.config.table_endpoint(table));
387
388        if purge {
389            request_builder = request_builder.query(&[("purgeRequested", "true")]);
390        }
391
392        let request = request_builder.build()?;
393        let http_response = context.client.query_catalog(request).await?;
394
395        match http_response.status() {
396            StatusCode::NO_CONTENT | StatusCode::OK => Ok(()),
397            StatusCode::NOT_FOUND => Err(Error::new(
398                ErrorKind::TableNotFound,
399                "Tried to drop a table that does not exist",
400            )),
401            _ => Err(deserialize_unexpected_catalog_error(
402                http_response,
403                context.client.disable_header_redaction(),
404            )
405            .await),
406        }
407    }
408
409    /// Gets the [`RestContext`] from the catalog.
410    async fn context(&self) -> Result<&RestContext> {
411        self.ctx
412            .get_or_try_init(|| async {
413                let client = HttpClient::new(&self.user_config)?;
414                let catalog_config = RestCatalog::load_config(&client, &self.user_config).await?;
415                let config = self.user_config.clone().merge_with_config(catalog_config);
416                let client = client.update_with(&config)?;
417
418                Ok(RestContext { config, client })
419            })
420            .await
421    }
422
423    /// Load the runtime config from the server by `user_config`.
424    ///
425    /// It's required for a REST catalog to update its config after creation.
426    async fn load_config(
427        client: &HttpClient,
428        user_config: &RestCatalogConfig,
429    ) -> Result<CatalogConfig> {
430        let mut request_builder = client.request(Method::GET, user_config.config_endpoint());
431
432        if let Some(warehouse_location) = &user_config.warehouse {
433            request_builder = request_builder.query(&[("warehouse", warehouse_location)]);
434        }
435
436        let request = request_builder.build()?;
437
438        let http_response = client.query_catalog(request).await?;
439
440        match http_response.status() {
441            StatusCode::OK => deserialize_catalog_response(http_response).await,
442            _ => Err(deserialize_unexpected_catalog_error(
443                http_response,
444                client.disable_header_redaction(),
445            )
446            .await),
447        }
448    }
449
450    async fn load_file_io(
451        &self,
452        metadata_location: Option<&str>,
453        extra_config: Option<HashMap<String, String>>,
454    ) -> Result<FileIO> {
455        let mut props = self.context().await?.config.props.clone();
456        if let Some(config) = extra_config {
457            props.extend(config);
458        }
459
460        // If the warehouse is a logical identifier instead of a URL we don't want
461        // to raise an exception
462        let warehouse_path = match self.context().await?.config.warehouse.as_deref() {
463            Some(url) if Url::parse(url).is_ok() => Some(url),
464            Some(_) => None,
465            None => None,
466        };
467
468        if metadata_location.or(warehouse_path).is_none() {
469            return Err(Error::new(
470                ErrorKind::Unexpected,
471                "Unable to load file io, neither warehouse nor metadata location is set!",
472            ));
473        }
474
475        // Require a StorageFactory to be provided
476        let factory = self
477            .storage_factory
478            .clone()
479            .ok_or_else(|| {
480                Error::new(
481                    ErrorKind::Unexpected,
482                    "StorageFactory must be provided for RestCatalog. Use `with_storage_factory` to configure it.",
483                )
484            })?;
485
486        let file_io = FileIOBuilder::new(factory).with_props(props).build();
487
488        Ok(file_io)
489    }
490
491    /// Invalidate the current token without generating a new one. On the next request, the client
492    /// will attempt to generate a new token.
493    pub async fn invalidate_token(&self) -> Result<()> {
494        self.context().await?.client.invalidate_token().await
495    }
496
497    /// Invalidate the current token and set a new one. Generates a new token before invalidating
498    /// the current token, meaning the old token will be used until this function acquires the lock
499    /// and overwrites the token.
500    ///
501    /// If credential is invalid, or the request fails, this method will return an error and leave
502    /// the current token unchanged.
503    pub async fn regenerate_token(&self) -> Result<()> {
504        self.context().await?.client.regenerate_token().await
505    }
506}
507
508/// All requests and expected responses are derived from the REST catalog API spec:
509/// https://github.com/apache/iceberg/blob/main/open-api/rest-catalog-open-api.yaml
510#[async_trait]
511impl Catalog for RestCatalog {
512    async fn list_namespaces(
513        &self,
514        parent: Option<&NamespaceIdent>,
515    ) -> Result<Vec<NamespaceIdent>> {
516        let context = self.context().await?;
517        let endpoint = context.config.namespaces_endpoint();
518        let mut namespaces = Vec::new();
519        let mut next_token = None;
520
521        loop {
522            let mut request = context.client.request(Method::GET, endpoint.clone());
523
524            // Filter on `parent={namespace}` if a parent namespace exists.
525            if let Some(ns) = parent {
526                request = request.query(&[("parent", ns.to_url_string())]);
527            }
528
529            if let Some(token) = next_token {
530                request = request.query(&[("pageToken", token)]);
531            }
532
533            let http_response = context.client.query_catalog(request.build()?).await?;
534
535            match http_response.status() {
536                StatusCode::OK => {
537                    let response =
538                        deserialize_catalog_response::<ListNamespaceResponse>(http_response)
539                            .await?;
540
541                    namespaces.extend(response.namespaces);
542
543                    match response.next_page_token {
544                        Some(token) => next_token = Some(token),
545                        None => break,
546                    }
547                }
548                StatusCode::NOT_FOUND => {
549                    return Err(Error::new(
550                        ErrorKind::NamespaceNotFound,
551                        "The parent parameter of the namespace provided does not exist",
552                    ));
553                }
554                _ => {
555                    return Err(deserialize_unexpected_catalog_error(
556                        http_response,
557                        context.client.disable_header_redaction(),
558                    )
559                    .await);
560                }
561            }
562        }
563
564        Ok(namespaces)
565    }
566
567    async fn create_namespace(
568        &self,
569        namespace: &NamespaceIdent,
570        properties: HashMap<String, String>,
571    ) -> Result<Namespace> {
572        let context = self.context().await?;
573
574        let request = context
575            .client
576            .request(Method::POST, context.config.namespaces_endpoint())
577            .json(&CreateNamespaceRequest {
578                namespace: namespace.clone(),
579                properties,
580            })
581            .build()?;
582
583        let http_response = context.client.query_catalog(request).await?;
584
585        match http_response.status() {
586            StatusCode::OK => {
587                let response =
588                    deserialize_catalog_response::<NamespaceResponse>(http_response).await?;
589                Ok(Namespace::from(response))
590            }
591            StatusCode::CONFLICT => Err(Error::new(
592                ErrorKind::NamespaceAlreadyExists,
593                "Tried to create a namespace that already exists",
594            )),
595            _ => Err(deserialize_unexpected_catalog_error(
596                http_response,
597                context.client.disable_header_redaction(),
598            )
599            .await),
600        }
601    }
602
603    async fn get_namespace(&self, namespace: &NamespaceIdent) -> Result<Namespace> {
604        let context = self.context().await?;
605
606        let request = context
607            .client
608            .request(Method::GET, context.config.namespace_endpoint(namespace))
609            .build()?;
610
611        let http_response = context.client.query_catalog(request).await?;
612
613        match http_response.status() {
614            StatusCode::OK => {
615                let response =
616                    deserialize_catalog_response::<NamespaceResponse>(http_response).await?;
617                Ok(Namespace::from(response))
618            }
619            StatusCode::NOT_FOUND => Err(Error::new(
620                ErrorKind::NamespaceNotFound,
621                "Tried to get a namespace that does not exist",
622            )),
623            _ => Err(deserialize_unexpected_catalog_error(
624                http_response,
625                context.client.disable_header_redaction(),
626            )
627            .await),
628        }
629    }
630
631    async fn namespace_exists(&self, ns: &NamespaceIdent) -> Result<bool> {
632        let context = self.context().await?;
633
634        let request = context
635            .client
636            .request(Method::HEAD, context.config.namespace_endpoint(ns))
637            .build()?;
638
639        let http_response = context.client.query_catalog(request).await?;
640
641        match http_response.status() {
642            StatusCode::NO_CONTENT | StatusCode::OK => Ok(true),
643            StatusCode::NOT_FOUND => Ok(false),
644            _ => Err(deserialize_unexpected_catalog_error(
645                http_response,
646                context.client.disable_header_redaction(),
647            )
648            .await),
649        }
650    }
651
652    async fn update_namespace(
653        &self,
654        _namespace: &NamespaceIdent,
655        _properties: HashMap<String, String>,
656    ) -> Result<()> {
657        Err(Error::new(
658            ErrorKind::FeatureUnsupported,
659            "Updating namespace not supported yet!",
660        ))
661    }
662
663    async fn drop_namespace(&self, namespace: &NamespaceIdent) -> Result<()> {
664        let context = self.context().await?;
665
666        let request = context
667            .client
668            .request(Method::DELETE, context.config.namespace_endpoint(namespace))
669            .build()?;
670
671        let http_response = context.client.query_catalog(request).await?;
672
673        match http_response.status() {
674            StatusCode::NO_CONTENT | StatusCode::OK => Ok(()),
675            StatusCode::NOT_FOUND => Err(Error::new(
676                ErrorKind::NamespaceNotFound,
677                "Tried to drop a namespace that does not exist",
678            )),
679            _ => Err(deserialize_unexpected_catalog_error(
680                http_response,
681                context.client.disable_header_redaction(),
682            )
683            .await),
684        }
685    }
686
687    async fn list_tables(&self, namespace: &NamespaceIdent) -> Result<Vec<TableIdent>> {
688        let context = self.context().await?;
689        let endpoint = context.config.tables_endpoint(namespace);
690        let mut identifiers = Vec::new();
691        let mut next_token = None;
692
693        loop {
694            let mut request = context.client.request(Method::GET, endpoint.clone());
695
696            if let Some(token) = next_token {
697                request = request.query(&[("pageToken", token)]);
698            }
699
700            let http_response = context.client.query_catalog(request.build()?).await?;
701
702            match http_response.status() {
703                StatusCode::OK => {
704                    let response =
705                        deserialize_catalog_response::<ListTablesResponse>(http_response).await?;
706
707                    identifiers.extend(response.identifiers);
708
709                    match response.next_page_token {
710                        Some(token) => next_token = Some(token),
711                        None => break,
712                    }
713                }
714                StatusCode::NOT_FOUND => {
715                    return Err(Error::new(
716                        ErrorKind::NamespaceNotFound,
717                        "Tried to list tables of a namespace that does not exist",
718                    ));
719                }
720                _ => {
721                    return Err(deserialize_unexpected_catalog_error(
722                        http_response,
723                        context.client.disable_header_redaction(),
724                    )
725                    .await);
726                }
727            }
728        }
729
730        Ok(identifiers)
731    }
732
733    /// Create a new table inside the namespace.
734    ///
735    /// In the resulting table, if there are any config properties that
736    /// are present in both the response from the REST server and the
737    /// config provided when creating this `RestCatalog` instance then
738    /// the value provided locally to the `RestCatalog` will take precedence.
739    async fn create_table(
740        &self,
741        namespace: &NamespaceIdent,
742        creation: TableCreation,
743    ) -> Result<Table> {
744        let context = self.context().await?;
745
746        let table_ident = TableIdent::new(namespace.clone(), creation.name.clone());
747
748        let request = context
749            .client
750            .request(Method::POST, context.config.tables_endpoint(namespace))
751            .json(&CreateTableRequest {
752                name: creation.name,
753                location: creation.location,
754                schema: creation.schema,
755                partition_spec: creation.partition_spec,
756                write_order: creation.sort_order,
757                stage_create: Some(false),
758                properties: creation.properties,
759            })
760            .build()?;
761
762        let http_response = context.client.query_catalog(request).await?;
763
764        let response = match http_response.status() {
765            StatusCode::OK => {
766                deserialize_catalog_response::<LoadTableResult>(http_response).await?
767            }
768            StatusCode::NOT_FOUND => {
769                return Err(Error::new(
770                    ErrorKind::NamespaceNotFound,
771                    "Tried to create a table under a namespace that does not exist",
772                ));
773            }
774            StatusCode::CONFLICT => {
775                return Err(Error::new(
776                    ErrorKind::TableAlreadyExists,
777                    "The table already exists",
778                ));
779            }
780            _ => {
781                return Err(deserialize_unexpected_catalog_error(
782                    http_response,
783                    context.client.disable_header_redaction(),
784                )
785                .await);
786            }
787        };
788
789        let metadata_location = response.metadata_location.as_ref().ok_or(Error::new(
790            ErrorKind::DataInvalid,
791            "Metadata location missing in `create_table` response!",
792        ))?;
793
794        let config = response
795            .config
796            .into_iter()
797            .chain(self.user_config.props.clone())
798            .collect();
799
800        let file_io = self
801            .load_file_io(Some(metadata_location), Some(config))
802            .await?;
803
804        let table_builder = Table::builder()
805            .identifier(table_ident.clone())
806            .file_io(file_io)
807            .metadata(response.metadata)
808            .runtime(self.runtime.clone());
809
810        if let Some(metadata_location) = response.metadata_location {
811            table_builder.metadata_location(metadata_location).build()
812        } else {
813            table_builder.build()
814        }
815    }
816
817    /// Load table from the catalog.
818    ///
819    /// If there are any config properties that are present in both the response from the REST
820    /// server and the config provided when creating this `RestCatalog` instance, then the value
821    /// provided locally to the `RestCatalog` will take precedence.
822    async fn load_table(&self, table_ident: &TableIdent) -> Result<Table> {
823        let context = self.context().await?;
824
825        let request = context
826            .client
827            .request(Method::GET, context.config.table_endpoint(table_ident))
828            .build()?;
829
830        let http_response = context.client.query_catalog(request).await?;
831
832        let response = match http_response.status() {
833            StatusCode::OK | StatusCode::NOT_MODIFIED => {
834                deserialize_catalog_response::<LoadTableResult>(http_response).await?
835            }
836            StatusCode::NOT_FOUND => {
837                return Err(Error::new(
838                    ErrorKind::TableNotFound,
839                    "Tried to load a table that does not exist",
840                ));
841            }
842            _ => {
843                return Err(deserialize_unexpected_catalog_error(
844                    http_response,
845                    context.client.disable_header_redaction(),
846                )
847                .await);
848            }
849        };
850
851        let config = response
852            .config
853            .into_iter()
854            .chain(self.user_config.props.clone())
855            .collect();
856
857        let file_io = self
858            .load_file_io(response.metadata_location.as_deref(), Some(config))
859            .await?;
860
861        let table_builder = Table::builder()
862            .identifier(table_ident.clone())
863            .file_io(file_io)
864            .metadata(response.metadata)
865            .runtime(self.runtime.clone());
866
867        if let Some(metadata_location) = response.metadata_location {
868            table_builder.metadata_location(metadata_location).build()
869        } else {
870            table_builder.build()
871        }
872    }
873
874    /// Drop a table from the catalog.
875    async fn drop_table(&self, table: &TableIdent) -> Result<()> {
876        self.delete_table(table, false).await
877    }
878
879    /// Drop a table from the catalog and purge its data by sending
880    /// `purgeRequested=true` to the REST server.
881    async fn purge_table(&self, table: &TableIdent) -> Result<()> {
882        self.delete_table(table, true).await
883    }
884
885    /// Check if a table exists in the catalog.
886    async fn table_exists(&self, table: &TableIdent) -> Result<bool> {
887        let context = self.context().await?;
888
889        let request = context
890            .client
891            .request(Method::HEAD, context.config.table_endpoint(table))
892            .build()?;
893
894        let http_response = context.client.query_catalog(request).await?;
895
896        match http_response.status() {
897            StatusCode::NO_CONTENT | StatusCode::OK => Ok(true),
898            StatusCode::NOT_FOUND => Ok(false),
899            _ => Err(deserialize_unexpected_catalog_error(
900                http_response,
901                context.client.disable_header_redaction(),
902            )
903            .await),
904        }
905    }
906
907    /// Rename a table in the catalog.
908    async fn rename_table(&self, src: &TableIdent, dest: &TableIdent) -> Result<()> {
909        let context = self.context().await?;
910
911        let request = context
912            .client
913            .request(Method::POST, context.config.rename_table_endpoint())
914            .json(&RenameTableRequest {
915                source: src.clone(),
916                destination: dest.clone(),
917            })
918            .build()?;
919
920        let http_response = context.client.query_catalog(request).await?;
921
922        match http_response.status() {
923            StatusCode::NO_CONTENT | StatusCode::OK => Ok(()),
924            StatusCode::NOT_FOUND => Err(Error::new(
925                ErrorKind::TableNotFound,
926                "Tried to rename a table that does not exist (is the namespace correct?)",
927            )),
928            StatusCode::CONFLICT => Err(Error::new(
929                ErrorKind::TableAlreadyExists,
930                "Tried to rename a table to a name that already exists",
931            )),
932            _ => Err(deserialize_unexpected_catalog_error(
933                http_response,
934                context.client.disable_header_redaction(),
935            )
936            .await),
937        }
938    }
939
940    async fn register_table(
941        &self,
942        table_ident: &TableIdent,
943        metadata_location: String,
944    ) -> Result<Table> {
945        let context = self.context().await?;
946
947        let request = context
948            .client
949            .request(
950                Method::POST,
951                context
952                    .config
953                    .register_table_endpoint(table_ident.namespace()),
954            )
955            .json(&RegisterTableRequest {
956                name: table_ident.name.clone(),
957                metadata_location: metadata_location.clone(),
958                overwrite: Some(false),
959            })
960            .build()?;
961
962        let http_response = context.client.query_catalog(request).await?;
963
964        let response: LoadTableResult = match http_response.status() {
965            StatusCode::OK => {
966                deserialize_catalog_response::<LoadTableResult>(http_response).await?
967            }
968            StatusCode::NOT_FOUND => {
969                return Err(Error::new(
970                    ErrorKind::NamespaceNotFound,
971                    "The namespace specified does not exist.",
972                ));
973            }
974            StatusCode::CONFLICT => {
975                return Err(Error::new(
976                    ErrorKind::TableAlreadyExists,
977                    "The given table already exists.",
978                ));
979            }
980            _ => {
981                return Err(deserialize_unexpected_catalog_error(
982                    http_response,
983                    context.client.disable_header_redaction(),
984                )
985                .await);
986            }
987        };
988
989        let metadata_location = response.metadata_location.as_ref().ok_or(Error::new(
990            ErrorKind::DataInvalid,
991            "Metadata location missing in `register_table` response!",
992        ))?;
993
994        let file_io = self.load_file_io(Some(metadata_location), None).await?;
995
996        Table::builder()
997            .identifier(table_ident.clone())
998            .file_io(file_io)
999            .metadata(response.metadata)
1000            .metadata_location(metadata_location.clone())
1001            .runtime(self.runtime.clone())
1002            .build()
1003    }
1004
1005    async fn update_table(&self, mut commit: TableCommit) -> Result<Table> {
1006        let context = self.context().await?;
1007
1008        let request = context
1009            .client
1010            .request(
1011                Method::POST,
1012                context.config.table_endpoint(commit.identifier()),
1013            )
1014            .json(&CommitTableRequest {
1015                identifier: Some(commit.identifier().clone()),
1016                requirements: commit.take_requirements(),
1017                updates: commit.take_updates(),
1018            })
1019            .build()?;
1020
1021        let http_response = context.client.query_catalog(request).await?;
1022
1023        let response: CommitTableResponse = match http_response.status() {
1024            StatusCode::OK => deserialize_catalog_response(http_response).await?,
1025            StatusCode::NOT_FOUND => {
1026                return Err(Error::new(
1027                    ErrorKind::TableNotFound,
1028                    "Tried to update a table that does not exist",
1029                ));
1030            }
1031            StatusCode::CONFLICT => {
1032                return Err(Error::new(
1033                    ErrorKind::CatalogCommitConflicts,
1034                    "CatalogCommitConflicts, one or more requirements failed. The client may retry.",
1035                )
1036                .with_retryable(true));
1037            }
1038            StatusCode::INTERNAL_SERVER_ERROR => {
1039                return Err(Error::new(
1040                    ErrorKind::Unexpected,
1041                    "An unknown server-side problem occurred; the commit state is unknown.",
1042                ));
1043            }
1044            StatusCode::BAD_GATEWAY => {
1045                return Err(Error::new(
1046                    ErrorKind::Unexpected,
1047                    "A gateway or proxy received an invalid response from the upstream server; the commit state is unknown.",
1048                ));
1049            }
1050            StatusCode::GATEWAY_TIMEOUT => {
1051                return Err(Error::new(
1052                    ErrorKind::Unexpected,
1053                    "A server-side gateway timeout occurred; the commit state is unknown.",
1054                ));
1055            }
1056            _ => {
1057                return Err(deserialize_unexpected_catalog_error(
1058                    http_response,
1059                    context.client.disable_header_redaction(),
1060                )
1061                .await);
1062            }
1063        };
1064
1065        let file_io = self
1066            .load_file_io(Some(&response.metadata_location), None)
1067            .await?;
1068
1069        Table::builder()
1070            .identifier(commit.identifier().clone())
1071            .file_io(file_io)
1072            .metadata(response.metadata)
1073            .metadata_location(response.metadata_location)
1074            .runtime(self.runtime.clone())
1075            .build()
1076    }
1077}
1078
1079#[cfg(test)]
1080mod tests {
1081    use std::fs::File;
1082    use std::io::BufReader;
1083    use std::sync::Arc;
1084
1085    use chrono::{TimeZone, Utc};
1086    use iceberg::io::LocalFsStorageFactory;
1087    use iceberg::spec::{
1088        FormatVersion, NestedField, NullOrder, Operation, PrimitiveType, Schema, Snapshot,
1089        SnapshotLog, SortDirection, SortField, SortOrder, Summary, Transform, Type,
1090        UnboundPartitionField, UnboundPartitionSpec,
1091    };
1092    use iceberg::test_utils::test_runtime;
1093    use iceberg::transaction::{ApplyTransactionAction, Transaction};
1094    use mockito::{Mock, Server, ServerGuard};
1095    use serde_json::json;
1096    use uuid::uuid;
1097
1098    use super::*;
1099
1100    #[tokio::test]
1101    async fn test_update_config() {
1102        let mut server = Server::new_async().await;
1103
1104        let config_mock = server
1105            .mock("GET", "/v1/config")
1106            .with_status(200)
1107            .with_body(
1108                r#"{
1109                "overrides": {
1110                    "warehouse": "s3://iceberg-catalog"
1111                },
1112                "defaults": {}
1113            }"#,
1114            )
1115            .create_async()
1116            .await;
1117
1118        let catalog = RestCatalog::new(
1119            RestCatalogConfig::builder().uri(server.url()).build(),
1120            Some(Arc::new(LocalFsStorageFactory)),
1121            Runtime::current(),
1122        );
1123
1124        assert_eq!(
1125            catalog
1126                .context()
1127                .await
1128                .unwrap()
1129                .config
1130                .props
1131                .get("warehouse"),
1132            Some(&"s3://iceberg-catalog".to_string())
1133        );
1134
1135        config_mock.assert_async().await;
1136    }
1137
1138    async fn create_config_mock(server: &mut ServerGuard) -> Mock {
1139        server
1140            .mock("GET", "/v1/config")
1141            .with_status(200)
1142            .with_body(
1143                r#"{
1144                "overrides": {
1145                    "warehouse": "s3://iceberg-catalog"
1146                },
1147                "defaults": {}
1148            }"#,
1149            )
1150            .create_async()
1151            .await
1152    }
1153
1154    async fn create_oauth_mock(server: &mut ServerGuard) -> Mock {
1155        create_oauth_mock_with_path(server, "/v1/oauth/tokens", "ey000000000000", 200).await
1156    }
1157
1158    async fn create_oauth_mock_with_path(
1159        server: &mut ServerGuard,
1160        path: &str,
1161        token: &str,
1162        status: usize,
1163    ) -> Mock {
1164        let body = format!(
1165            r#"{{
1166                "access_token": "{token}",
1167                "token_type": "Bearer",
1168                "issued_token_type": "urn:ietf:params:oauth:token-type:access_token",
1169                "expires_in": 86400
1170            }}"#
1171        );
1172        server
1173            .mock("POST", path)
1174            .with_status(status)
1175            .with_body(body)
1176            .expect(1)
1177            .create_async()
1178            .await
1179    }
1180
1181    #[tokio::test]
1182    async fn test_oauth() {
1183        let mut server = Server::new_async().await;
1184        let oauth_mock = create_oauth_mock(&mut server).await;
1185        let config_mock = create_config_mock(&mut server).await;
1186
1187        let mut props = HashMap::new();
1188        props.insert("credential".to_string(), "client1:secret1".to_string());
1189
1190        let catalog = RestCatalog::new(
1191            RestCatalogConfig::builder()
1192                .uri(server.url())
1193                .props(props)
1194                .build(),
1195            Some(Arc::new(LocalFsStorageFactory)),
1196            Runtime::current(),
1197        );
1198
1199        let token = catalog.context().await.unwrap().client.token().await;
1200        oauth_mock.assert_async().await;
1201        config_mock.assert_async().await;
1202        assert_eq!(token, Some("ey000000000000".to_string()));
1203    }
1204
1205    #[tokio::test]
1206    async fn test_oauth_with_optional_param() {
1207        let mut props = HashMap::new();
1208        props.insert("credential".to_string(), "client1:secret1".to_string());
1209        props.insert("scope".to_string(), "custom_scope".to_string());
1210        props.insert("audience".to_string(), "custom_audience".to_string());
1211        props.insert("resource".to_string(), "custom_resource".to_string());
1212
1213        let mut server = Server::new_async().await;
1214        let oauth_mock = server
1215            .mock("POST", "/v1/oauth/tokens")
1216            .match_body(mockito::Matcher::Regex("scope=custom_scope".to_string()))
1217            .match_body(mockito::Matcher::Regex(
1218                "audience=custom_audience".to_string(),
1219            ))
1220            .match_body(mockito::Matcher::Regex(
1221                "resource=custom_resource".to_string(),
1222            ))
1223            .with_status(200)
1224            .with_body(
1225                r#"{
1226                "access_token": "ey000000000000",
1227                "token_type": "Bearer",
1228                "issued_token_type": "urn:ietf:params:oauth:token-type:access_token",
1229                "expires_in": 86400
1230                }"#,
1231            )
1232            .expect(1)
1233            .create_async()
1234            .await;
1235
1236        let config_mock = create_config_mock(&mut server).await;
1237
1238        let catalog = RestCatalog::new(
1239            RestCatalogConfig::builder()
1240                .uri(server.url())
1241                .props(props)
1242                .build(),
1243            Some(Arc::new(LocalFsStorageFactory)),
1244            Runtime::current(),
1245        );
1246
1247        let token = catalog.context().await.unwrap().client.token().await;
1248
1249        oauth_mock.assert_async().await;
1250        config_mock.assert_async().await;
1251        assert_eq!(token, Some("ey000000000000".to_string()));
1252    }
1253
1254    #[tokio::test]
1255    async fn test_invalidate_token() {
1256        let mut server = Server::new_async().await;
1257        let oauth_mock = create_oauth_mock(&mut server).await;
1258        let config_mock = create_config_mock(&mut server).await;
1259
1260        let mut props = HashMap::new();
1261        props.insert("credential".to_string(), "client1:secret1".to_string());
1262
1263        let catalog = RestCatalog::new(
1264            RestCatalogConfig::builder()
1265                .uri(server.url())
1266                .props(props)
1267                .build(),
1268            Some(Arc::new(LocalFsStorageFactory)),
1269            Runtime::current(),
1270        );
1271
1272        let token = catalog.context().await.unwrap().client.token().await;
1273        oauth_mock.assert_async().await;
1274        config_mock.assert_async().await;
1275        assert_eq!(token, Some("ey000000000000".to_string()));
1276
1277        let oauth_mock =
1278            create_oauth_mock_with_path(&mut server, "/v1/oauth/tokens", "ey000000000001", 200)
1279                .await;
1280        catalog.invalidate_token().await.unwrap();
1281        let token = catalog.context().await.unwrap().client.token().await;
1282        oauth_mock.assert_async().await;
1283        assert_eq!(token, Some("ey000000000001".to_string()));
1284    }
1285
1286    #[tokio::test]
1287    async fn test_invalidate_token_failing_request() {
1288        let mut server = Server::new_async().await;
1289        let oauth_mock = create_oauth_mock(&mut server).await;
1290        let config_mock = create_config_mock(&mut server).await;
1291
1292        let mut props = HashMap::new();
1293        props.insert("credential".to_string(), "client1:secret1".to_string());
1294
1295        let catalog = RestCatalog::new(
1296            RestCatalogConfig::builder()
1297                .uri(server.url())
1298                .props(props)
1299                .build(),
1300            Some(Arc::new(LocalFsStorageFactory)),
1301            Runtime::current(),
1302        );
1303
1304        let token = catalog.context().await.unwrap().client.token().await;
1305        oauth_mock.assert_async().await;
1306        config_mock.assert_async().await;
1307        assert_eq!(token, Some("ey000000000000".to_string()));
1308
1309        let oauth_mock =
1310            create_oauth_mock_with_path(&mut server, "/v1/oauth/tokens", "ey000000000001", 500)
1311                .await;
1312        catalog.invalidate_token().await.unwrap();
1313        let token = catalog.context().await.unwrap().client.token().await;
1314        oauth_mock.assert_async().await;
1315        assert_eq!(token, None);
1316    }
1317
1318    #[tokio::test]
1319    async fn test_regenerate_token() {
1320        let mut server = Server::new_async().await;
1321        let oauth_mock = create_oauth_mock(&mut server).await;
1322        let config_mock = create_config_mock(&mut server).await;
1323
1324        let mut props = HashMap::new();
1325        props.insert("credential".to_string(), "client1:secret1".to_string());
1326
1327        let catalog = RestCatalog::new(
1328            RestCatalogConfig::builder()
1329                .uri(server.url())
1330                .props(props)
1331                .build(),
1332            Some(Arc::new(LocalFsStorageFactory)),
1333            Runtime::current(),
1334        );
1335
1336        let token = catalog.context().await.unwrap().client.token().await;
1337        oauth_mock.assert_async().await;
1338        config_mock.assert_async().await;
1339        assert_eq!(token, Some("ey000000000000".to_string()));
1340
1341        let oauth_mock =
1342            create_oauth_mock_with_path(&mut server, "/v1/oauth/tokens", "ey000000000001", 200)
1343                .await;
1344        catalog.regenerate_token().await.unwrap();
1345        oauth_mock.assert_async().await;
1346        let token = catalog.context().await.unwrap().client.token().await;
1347        assert_eq!(token, Some("ey000000000001".to_string()));
1348    }
1349
1350    #[tokio::test]
1351    async fn test_regenerate_token_failing_request() {
1352        let mut server = Server::new_async().await;
1353        let oauth_mock = create_oauth_mock(&mut server).await;
1354        let config_mock = create_config_mock(&mut server).await;
1355
1356        let mut props = HashMap::new();
1357        props.insert("credential".to_string(), "client1:secret1".to_string());
1358
1359        let catalog = RestCatalog::new(
1360            RestCatalogConfig::builder()
1361                .uri(server.url())
1362                .props(props)
1363                .build(),
1364            Some(Arc::new(LocalFsStorageFactory)),
1365            Runtime::current(),
1366        );
1367
1368        let token = catalog.context().await.unwrap().client.token().await;
1369        oauth_mock.assert_async().await;
1370        config_mock.assert_async().await;
1371        assert_eq!(token, Some("ey000000000000".to_string()));
1372
1373        let oauth_mock =
1374            create_oauth_mock_with_path(&mut server, "/v1/oauth/tokens", "ey000000000001", 500)
1375                .await;
1376        let invalidate_result = catalog.regenerate_token().await;
1377        assert!(invalidate_result.is_err());
1378        oauth_mock.assert_async().await;
1379        let token = catalog.context().await.unwrap().client.token().await;
1380
1381        // original token is left intact
1382        assert_eq!(token, Some("ey000000000000".to_string()));
1383    }
1384
1385    #[tokio::test]
1386    async fn test_http_headers() {
1387        let server = Server::new_async().await;
1388        let mut props = HashMap::new();
1389        props.insert("credential".to_string(), "client1:secret1".to_string());
1390
1391        let config = RestCatalogConfig::builder()
1392            .uri(server.url())
1393            .props(props)
1394            .build();
1395        let headers: HeaderMap = config.extra_headers().unwrap();
1396
1397        let expected_headers = HeaderMap::from_iter([
1398            (
1399                header::CONTENT_TYPE,
1400                HeaderValue::from_static("application/json"),
1401            ),
1402            (
1403                HeaderName::from_static("x-client-version"),
1404                HeaderValue::from_static(ICEBERG_REST_SPEC_VERSION),
1405            ),
1406            (
1407                header::USER_AGENT,
1408                HeaderValue::from_str(&format!("iceberg-rs/{CARGO_PKG_VERSION}")).unwrap(),
1409            ),
1410        ]);
1411        assert_eq!(headers, expected_headers);
1412    }
1413
1414    #[tokio::test]
1415    async fn test_http_headers_with_custom_headers() {
1416        let server = Server::new_async().await;
1417        let mut props = HashMap::new();
1418        props.insert("credential".to_string(), "client1:secret1".to_string());
1419        props.insert(
1420            "header.content-type".to_string(),
1421            "application/yaml".to_string(),
1422        );
1423        props.insert(
1424            "header.customized-header".to_string(),
1425            "some/value".to_string(),
1426        );
1427
1428        let config = RestCatalogConfig::builder()
1429            .uri(server.url())
1430            .props(props)
1431            .build();
1432        let headers: HeaderMap = config.extra_headers().unwrap();
1433
1434        let expected_headers = HeaderMap::from_iter([
1435            (
1436                header::CONTENT_TYPE,
1437                HeaderValue::from_static("application/yaml"),
1438            ),
1439            (
1440                HeaderName::from_static("x-client-version"),
1441                HeaderValue::from_static(ICEBERG_REST_SPEC_VERSION),
1442            ),
1443            (
1444                header::USER_AGENT,
1445                HeaderValue::from_str(&format!("iceberg-rs/{CARGO_PKG_VERSION}")).unwrap(),
1446            ),
1447            (
1448                HeaderName::from_static("customized-header"),
1449                HeaderValue::from_static("some/value"),
1450            ),
1451        ]);
1452        assert_eq!(headers, expected_headers);
1453    }
1454
1455    #[tokio::test]
1456    async fn test_oauth_with_oauth2_server_uri() {
1457        let mut server = Server::new_async().await;
1458        let config_mock = create_config_mock(&mut server).await;
1459
1460        let mut auth_server = Server::new_async().await;
1461        let auth_server_path = "/some/path";
1462        let oauth_mock =
1463            create_oauth_mock_with_path(&mut auth_server, auth_server_path, "ey000000000000", 200)
1464                .await;
1465
1466        let mut props = HashMap::new();
1467        props.insert("credential".to_string(), "client1:secret1".to_string());
1468        props.insert(
1469            "oauth2-server-uri".to_string(),
1470            format!("{}{}", auth_server.url(), auth_server_path).to_string(),
1471        );
1472
1473        let catalog = RestCatalog::new(
1474            RestCatalogConfig::builder()
1475                .uri(server.url())
1476                .props(props)
1477                .build(),
1478            Some(Arc::new(LocalFsStorageFactory)),
1479            Runtime::current(),
1480        );
1481
1482        let token = catalog.context().await.unwrap().client.token().await;
1483
1484        oauth_mock.assert_async().await;
1485        config_mock.assert_async().await;
1486        assert_eq!(token, Some("ey000000000000".to_string()));
1487    }
1488
1489    #[tokio::test]
1490    async fn test_config_override() {
1491        let mut server = Server::new_async().await;
1492        let mut redirect_server = Server::new_async().await;
1493        let new_uri = redirect_server.url();
1494
1495        let config_mock = server
1496            .mock("GET", "/v1/config")
1497            .with_status(200)
1498            .with_body(
1499                json!(
1500                    {
1501                        "overrides": {
1502                            "uri": new_uri,
1503                            "warehouse": "s3://iceberg-catalog",
1504                            "prefix": "ice/warehouses/my"
1505                        },
1506                        "defaults": {},
1507                    }
1508                )
1509                .to_string(),
1510            )
1511            .create_async()
1512            .await;
1513
1514        let list_ns_mock = redirect_server
1515            .mock("GET", "/v1/ice/warehouses/my/namespaces")
1516            .with_body(
1517                r#"{
1518                    "namespaces": []
1519                }"#,
1520            )
1521            .create_async()
1522            .await;
1523
1524        let catalog = RestCatalog::new(
1525            RestCatalogConfig::builder().uri(server.url()).build(),
1526            Some(Arc::new(LocalFsStorageFactory)),
1527            Runtime::current(),
1528        );
1529
1530        let _namespaces = catalog.list_namespaces(None).await.unwrap();
1531
1532        config_mock.assert_async().await;
1533        list_ns_mock.assert_async().await;
1534    }
1535
1536    #[tokio::test]
1537    async fn test_list_namespace() {
1538        let mut server = Server::new_async().await;
1539
1540        let config_mock = create_config_mock(&mut server).await;
1541
1542        let list_ns_mock = server
1543            .mock("GET", "/v1/namespaces")
1544            .with_body(
1545                r#"{
1546                "namespaces": [
1547                    ["ns1", "ns11"],
1548                    ["ns2"]
1549                ]
1550            }"#,
1551            )
1552            .create_async()
1553            .await;
1554
1555        let catalog = RestCatalog::new(
1556            RestCatalogConfig::builder().uri(server.url()).build(),
1557            Some(Arc::new(LocalFsStorageFactory)),
1558            Runtime::current(),
1559        );
1560
1561        let namespaces = catalog.list_namespaces(None).await.unwrap();
1562
1563        let expected_ns = vec![
1564            NamespaceIdent::from_vec(vec!["ns1".to_string(), "ns11".to_string()]).unwrap(),
1565            NamespaceIdent::from_vec(vec!["ns2".to_string()]).unwrap(),
1566        ];
1567
1568        assert_eq!(expected_ns, namespaces);
1569
1570        config_mock.assert_async().await;
1571        list_ns_mock.assert_async().await;
1572    }
1573
1574    #[tokio::test]
1575    async fn test_list_namespace_with_pagination() {
1576        let mut server = Server::new_async().await;
1577
1578        let config_mock = create_config_mock(&mut server).await;
1579
1580        let list_ns_mock_page1 = server
1581            .mock("GET", "/v1/namespaces")
1582            .with_body(
1583                r#"{
1584                "namespaces": [
1585                    ["ns1", "ns11"],
1586                    ["ns2"]
1587                ],
1588                "next-page-token": "token123"
1589            }"#,
1590            )
1591            .create_async()
1592            .await;
1593
1594        let list_ns_mock_page2 = server
1595            .mock("GET", "/v1/namespaces?pageToken=token123")
1596            .with_body(
1597                r#"{
1598                "namespaces": [
1599                    ["ns3"],
1600                    ["ns4", "ns41"]
1601                ]
1602            }"#,
1603            )
1604            .create_async()
1605            .await;
1606
1607        let catalog = RestCatalog::new(
1608            RestCatalogConfig::builder().uri(server.url()).build(),
1609            Some(Arc::new(LocalFsStorageFactory)),
1610            Runtime::current(),
1611        );
1612
1613        let namespaces = catalog.list_namespaces(None).await.unwrap();
1614
1615        let expected_ns = vec![
1616            NamespaceIdent::from_vec(vec!["ns1".to_string(), "ns11".to_string()]).unwrap(),
1617            NamespaceIdent::from_vec(vec!["ns2".to_string()]).unwrap(),
1618            NamespaceIdent::from_vec(vec!["ns3".to_string()]).unwrap(),
1619            NamespaceIdent::from_vec(vec!["ns4".to_string(), "ns41".to_string()]).unwrap(),
1620        ];
1621
1622        assert_eq!(expected_ns, namespaces);
1623
1624        config_mock.assert_async().await;
1625        list_ns_mock_page1.assert_async().await;
1626        list_ns_mock_page2.assert_async().await;
1627    }
1628
1629    #[tokio::test]
1630    async fn test_list_namespace_with_multiple_pages() {
1631        let mut server = Server::new_async().await;
1632
1633        let config_mock = create_config_mock(&mut server).await;
1634
1635        // Page 1
1636        let list_ns_mock_page1 = server
1637            .mock("GET", "/v1/namespaces")
1638            .with_body(
1639                r#"{
1640                "namespaces": [
1641                    ["ns1", "ns11"],
1642                    ["ns2"]
1643                ],
1644                "next-page-token": "page2"
1645            }"#,
1646            )
1647            .create_async()
1648            .await;
1649
1650        // Page 2
1651        let list_ns_mock_page2 = server
1652            .mock("GET", "/v1/namespaces?pageToken=page2")
1653            .with_body(
1654                r#"{
1655                "namespaces": [
1656                    ["ns3"],
1657                    ["ns4", "ns41"]
1658                ],
1659                "next-page-token": "page3"
1660            }"#,
1661            )
1662            .create_async()
1663            .await;
1664
1665        // Page 3
1666        let list_ns_mock_page3 = server
1667            .mock("GET", "/v1/namespaces?pageToken=page3")
1668            .with_body(
1669                r#"{
1670                "namespaces": [
1671                    ["ns5", "ns51", "ns511"]
1672                ],
1673                "next-page-token": "page4"
1674            }"#,
1675            )
1676            .create_async()
1677            .await;
1678
1679        // Page 4
1680        let list_ns_mock_page4 = server
1681            .mock("GET", "/v1/namespaces?pageToken=page4")
1682            .with_body(
1683                r#"{
1684                "namespaces": [
1685                    ["ns6"],
1686                    ["ns7"]
1687                ],
1688                "next-page-token": "page5"
1689            }"#,
1690            )
1691            .create_async()
1692            .await;
1693
1694        // Page 5 (final page)
1695        let list_ns_mock_page5 = server
1696            .mock("GET", "/v1/namespaces?pageToken=page5")
1697            .with_body(
1698                r#"{
1699                "namespaces": [
1700                    ["ns8", "ns81"]
1701                ]
1702            }"#,
1703            )
1704            .create_async()
1705            .await;
1706
1707        let catalog = RestCatalog::new(
1708            RestCatalogConfig::builder().uri(server.url()).build(),
1709            Some(Arc::new(LocalFsStorageFactory)),
1710            Runtime::current(),
1711        );
1712
1713        let namespaces = catalog.list_namespaces(None).await.unwrap();
1714
1715        let expected_ns = vec![
1716            NamespaceIdent::from_vec(vec!["ns1".to_string(), "ns11".to_string()]).unwrap(),
1717            NamespaceIdent::from_vec(vec!["ns2".to_string()]).unwrap(),
1718            NamespaceIdent::from_vec(vec!["ns3".to_string()]).unwrap(),
1719            NamespaceIdent::from_vec(vec!["ns4".to_string(), "ns41".to_string()]).unwrap(),
1720            NamespaceIdent::from_vec(vec![
1721                "ns5".to_string(),
1722                "ns51".to_string(),
1723                "ns511".to_string(),
1724            ])
1725            .unwrap(),
1726            NamespaceIdent::from_vec(vec!["ns6".to_string()]).unwrap(),
1727            NamespaceIdent::from_vec(vec!["ns7".to_string()]).unwrap(),
1728            NamespaceIdent::from_vec(vec!["ns8".to_string(), "ns81".to_string()]).unwrap(),
1729        ];
1730
1731        assert_eq!(expected_ns, namespaces);
1732
1733        // Verify all page requests were made
1734        config_mock.assert_async().await;
1735        list_ns_mock_page1.assert_async().await;
1736        list_ns_mock_page2.assert_async().await;
1737        list_ns_mock_page3.assert_async().await;
1738        list_ns_mock_page4.assert_async().await;
1739        list_ns_mock_page5.assert_async().await;
1740    }
1741
1742    #[tokio::test]
1743    async fn test_create_namespace() {
1744        let mut server = Server::new_async().await;
1745
1746        let config_mock = create_config_mock(&mut server).await;
1747
1748        let create_ns_mock = server
1749            .mock("POST", "/v1/namespaces")
1750            .with_body(
1751                r#"{
1752                "namespace": [ "ns1", "ns11"],
1753                "properties" : {
1754                    "key1": "value1"
1755                }
1756            }"#,
1757            )
1758            .create_async()
1759            .await;
1760
1761        let catalog = RestCatalog::new(
1762            RestCatalogConfig::builder().uri(server.url()).build(),
1763            Some(Arc::new(LocalFsStorageFactory)),
1764            Runtime::current(),
1765        );
1766
1767        let namespaces = catalog
1768            .create_namespace(
1769                &NamespaceIdent::from_vec(vec!["ns1".to_string(), "ns11".to_string()]).unwrap(),
1770                HashMap::from([("key1".to_string(), "value1".to_string())]),
1771            )
1772            .await
1773            .unwrap();
1774
1775        let expected_ns = Namespace::with_properties(
1776            NamespaceIdent::from_vec(vec!["ns1".to_string(), "ns11".to_string()]).unwrap(),
1777            HashMap::from([("key1".to_string(), "value1".to_string())]),
1778        );
1779
1780        assert_eq!(expected_ns, namespaces);
1781
1782        config_mock.assert_async().await;
1783        create_ns_mock.assert_async().await;
1784    }
1785
1786    #[tokio::test]
1787    async fn test_get_namespace() {
1788        let mut server = Server::new_async().await;
1789
1790        let config_mock = create_config_mock(&mut server).await;
1791
1792        let get_ns_mock = server
1793            .mock("GET", "/v1/namespaces/ns1")
1794            .with_body(
1795                r#"{
1796                "namespace": [ "ns1"],
1797                "properties" : {
1798                    "key1": "value1"
1799                }
1800            }"#,
1801            )
1802            .create_async()
1803            .await;
1804
1805        let catalog = RestCatalog::new(
1806            RestCatalogConfig::builder().uri(server.url()).build(),
1807            Some(Arc::new(LocalFsStorageFactory)),
1808            Runtime::current(),
1809        );
1810
1811        let namespaces = catalog
1812            .get_namespace(&NamespaceIdent::new("ns1".to_string()))
1813            .await
1814            .unwrap();
1815
1816        let expected_ns = Namespace::with_properties(
1817            NamespaceIdent::new("ns1".to_string()),
1818            HashMap::from([("key1".to_string(), "value1".to_string())]),
1819        );
1820
1821        assert_eq!(expected_ns, namespaces);
1822
1823        config_mock.assert_async().await;
1824        get_ns_mock.assert_async().await;
1825    }
1826
1827    #[tokio::test]
1828    async fn check_namespace_exists() {
1829        let mut server = Server::new_async().await;
1830
1831        let config_mock = create_config_mock(&mut server).await;
1832
1833        let get_ns_mock = server
1834            .mock("HEAD", "/v1/namespaces/ns1")
1835            .with_status(204)
1836            .create_async()
1837            .await;
1838
1839        let catalog = RestCatalog::new(
1840            RestCatalogConfig::builder().uri(server.url()).build(),
1841            Some(Arc::new(LocalFsStorageFactory)),
1842            Runtime::current(),
1843        );
1844
1845        assert!(
1846            catalog
1847                .namespace_exists(&NamespaceIdent::new("ns1".to_string()))
1848                .await
1849                .unwrap()
1850        );
1851
1852        config_mock.assert_async().await;
1853        get_ns_mock.assert_async().await;
1854    }
1855
1856    #[tokio::test]
1857    async fn test_drop_namespace() {
1858        let mut server = Server::new_async().await;
1859
1860        let config_mock = create_config_mock(&mut server).await;
1861
1862        let drop_ns_mock = server
1863            .mock("DELETE", "/v1/namespaces/ns1")
1864            .with_status(204)
1865            .create_async()
1866            .await;
1867
1868        let catalog = RestCatalog::new(
1869            RestCatalogConfig::builder().uri(server.url()).build(),
1870            Some(Arc::new(LocalFsStorageFactory)),
1871            Runtime::current(),
1872        );
1873
1874        catalog
1875            .drop_namespace(&NamespaceIdent::new("ns1".to_string()))
1876            .await
1877            .unwrap();
1878
1879        config_mock.assert_async().await;
1880        drop_ns_mock.assert_async().await;
1881    }
1882
1883    #[tokio::test]
1884    async fn test_list_tables() {
1885        let mut server = Server::new_async().await;
1886
1887        let config_mock = create_config_mock(&mut server).await;
1888
1889        let list_tables_mock = server
1890            .mock("GET", "/v1/namespaces/ns1/tables")
1891            .with_status(200)
1892            .with_body(
1893                r#"{
1894                "identifiers": [
1895                    {
1896                        "namespace": ["ns1"],
1897                        "name": "table1"
1898                    },
1899                    {
1900                        "namespace": ["ns1"],
1901                        "name": "table2"
1902                    }
1903                ]
1904            }"#,
1905            )
1906            .create_async()
1907            .await;
1908
1909        let catalog = RestCatalog::new(
1910            RestCatalogConfig::builder().uri(server.url()).build(),
1911            Some(Arc::new(LocalFsStorageFactory)),
1912            Runtime::current(),
1913        );
1914
1915        let tables = catalog
1916            .list_tables(&NamespaceIdent::new("ns1".to_string()))
1917            .await
1918            .unwrap();
1919
1920        let expected_tables = vec![
1921            TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table1".to_string()),
1922            TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table2".to_string()),
1923        ];
1924
1925        assert_eq!(tables, expected_tables);
1926
1927        config_mock.assert_async().await;
1928        list_tables_mock.assert_async().await;
1929    }
1930
1931    #[tokio::test]
1932    async fn test_list_tables_with_pagination() {
1933        let mut server = Server::new_async().await;
1934
1935        let config_mock = create_config_mock(&mut server).await;
1936
1937        let list_tables_mock_page1 = server
1938            .mock("GET", "/v1/namespaces/ns1/tables")
1939            .with_status(200)
1940            .with_body(
1941                r#"{
1942                "identifiers": [
1943                    {
1944                        "namespace": ["ns1"],
1945                        "name": "table1"
1946                    },
1947                    {
1948                        "namespace": ["ns1"],
1949                        "name": "table2"
1950                    }
1951                ],
1952                "next-page-token": "token456"
1953            }"#,
1954            )
1955            .create_async()
1956            .await;
1957
1958        let list_tables_mock_page2 = server
1959            .mock("GET", "/v1/namespaces/ns1/tables?pageToken=token456")
1960            .with_status(200)
1961            .with_body(
1962                r#"{
1963                "identifiers": [
1964                    {
1965                        "namespace": ["ns1"],
1966                        "name": "table3"
1967                    },
1968                    {
1969                        "namespace": ["ns1"],
1970                        "name": "table4"
1971                    }
1972                ]
1973            }"#,
1974            )
1975            .create_async()
1976            .await;
1977
1978        let catalog = RestCatalog::new(
1979            RestCatalogConfig::builder().uri(server.url()).build(),
1980            Some(Arc::new(LocalFsStorageFactory)),
1981            Runtime::current(),
1982        );
1983
1984        let tables = catalog
1985            .list_tables(&NamespaceIdent::new("ns1".to_string()))
1986            .await
1987            .unwrap();
1988
1989        let expected_tables = vec![
1990            TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table1".to_string()),
1991            TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table2".to_string()),
1992            TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table3".to_string()),
1993            TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table4".to_string()),
1994        ];
1995
1996        assert_eq!(tables, expected_tables);
1997
1998        config_mock.assert_async().await;
1999        list_tables_mock_page1.assert_async().await;
2000        list_tables_mock_page2.assert_async().await;
2001    }
2002
2003    #[tokio::test]
2004    async fn test_list_tables_with_multiple_pages() {
2005        let mut server = Server::new_async().await;
2006
2007        let config_mock = create_config_mock(&mut server).await;
2008
2009        // Page 1
2010        let list_tables_mock_page1 = server
2011            .mock("GET", "/v1/namespaces/ns1/tables")
2012            .with_status(200)
2013            .with_body(
2014                r#"{
2015                "identifiers": [
2016                    {
2017                        "namespace": ["ns1"],
2018                        "name": "table1"
2019                    },
2020                    {
2021                        "namespace": ["ns1"],
2022                        "name": "table2"
2023                    }
2024                ],
2025                "next-page-token": "page2"
2026            }"#,
2027            )
2028            .create_async()
2029            .await;
2030
2031        // Page 2
2032        let list_tables_mock_page2 = server
2033            .mock("GET", "/v1/namespaces/ns1/tables?pageToken=page2")
2034            .with_status(200)
2035            .with_body(
2036                r#"{
2037                "identifiers": [
2038                    {
2039                        "namespace": ["ns1"],
2040                        "name": "table3"
2041                    },
2042                    {
2043                        "namespace": ["ns1"],
2044                        "name": "table4"
2045                    }
2046                ],
2047                "next-page-token": "page3"
2048            }"#,
2049            )
2050            .create_async()
2051            .await;
2052
2053        // Page 3
2054        let list_tables_mock_page3 = server
2055            .mock("GET", "/v1/namespaces/ns1/tables?pageToken=page3")
2056            .with_status(200)
2057            .with_body(
2058                r#"{
2059                "identifiers": [
2060                    {
2061                        "namespace": ["ns1"],
2062                        "name": "table5"
2063                    }
2064                ],
2065                "next-page-token": "page4"
2066            }"#,
2067            )
2068            .create_async()
2069            .await;
2070
2071        // Page 4
2072        let list_tables_mock_page4 = server
2073            .mock("GET", "/v1/namespaces/ns1/tables?pageToken=page4")
2074            .with_status(200)
2075            .with_body(
2076                r#"{
2077                "identifiers": [
2078                    {
2079                        "namespace": ["ns1"],
2080                        "name": "table6"
2081                    },
2082                    {
2083                        "namespace": ["ns1"],
2084                        "name": "table7"
2085                    }
2086                ],
2087                "next-page-token": "page5"
2088            }"#,
2089            )
2090            .create_async()
2091            .await;
2092
2093        // Page 5 (final page)
2094        let list_tables_mock_page5 = server
2095            .mock("GET", "/v1/namespaces/ns1/tables?pageToken=page5")
2096            .with_status(200)
2097            .with_body(
2098                r#"{
2099                "identifiers": [
2100                    {
2101                        "namespace": ["ns1"],
2102                        "name": "table8"
2103                    }
2104                ]
2105            }"#,
2106            )
2107            .create_async()
2108            .await;
2109
2110        let catalog = RestCatalog::new(
2111            RestCatalogConfig::builder().uri(server.url()).build(),
2112            Some(Arc::new(LocalFsStorageFactory)),
2113            Runtime::current(),
2114        );
2115
2116        let tables = catalog
2117            .list_tables(&NamespaceIdent::new("ns1".to_string()))
2118            .await
2119            .unwrap();
2120
2121        let expected_tables = vec![
2122            TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table1".to_string()),
2123            TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table2".to_string()),
2124            TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table3".to_string()),
2125            TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table4".to_string()),
2126            TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table5".to_string()),
2127            TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table6".to_string()),
2128            TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table7".to_string()),
2129            TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table8".to_string()),
2130        ];
2131
2132        assert_eq!(tables, expected_tables);
2133
2134        // Verify all page requests were made
2135        config_mock.assert_async().await;
2136        list_tables_mock_page1.assert_async().await;
2137        list_tables_mock_page2.assert_async().await;
2138        list_tables_mock_page3.assert_async().await;
2139        list_tables_mock_page4.assert_async().await;
2140        list_tables_mock_page5.assert_async().await;
2141    }
2142
2143    #[tokio::test]
2144    async fn test_drop_tables() {
2145        let mut server = Server::new_async().await;
2146
2147        let config_mock = create_config_mock(&mut server).await;
2148
2149        let delete_table_mock = server
2150            .mock("DELETE", "/v1/namespaces/ns1/tables/table1")
2151            .with_status(204)
2152            .create_async()
2153            .await;
2154
2155        let catalog = RestCatalog::new(
2156            RestCatalogConfig::builder().uri(server.url()).build(),
2157            Some(Arc::new(LocalFsStorageFactory)),
2158            Runtime::current(),
2159        );
2160
2161        catalog
2162            .drop_table(&TableIdent::new(
2163                NamespaceIdent::new("ns1".to_string()),
2164                "table1".to_string(),
2165            ))
2166            .await
2167            .unwrap();
2168
2169        config_mock.assert_async().await;
2170        delete_table_mock.assert_async().await;
2171    }
2172
2173    #[tokio::test]
2174    async fn test_check_table_exists() {
2175        let mut server = Server::new_async().await;
2176
2177        let config_mock = create_config_mock(&mut server).await;
2178
2179        let check_table_exists_mock = server
2180            .mock("HEAD", "/v1/namespaces/ns1/tables/table1")
2181            .with_status(204)
2182            .create_async()
2183            .await;
2184
2185        let catalog = RestCatalog::new(
2186            RestCatalogConfig::builder().uri(server.url()).build(),
2187            Some(Arc::new(LocalFsStorageFactory)),
2188            Runtime::current(),
2189        );
2190
2191        assert!(
2192            catalog
2193                .table_exists(&TableIdent::new(
2194                    NamespaceIdent::new("ns1".to_string()),
2195                    "table1".to_string(),
2196                ))
2197                .await
2198                .unwrap()
2199        );
2200
2201        config_mock.assert_async().await;
2202        check_table_exists_mock.assert_async().await;
2203    }
2204
2205    #[tokio::test]
2206    async fn test_rename_table() {
2207        let mut server = Server::new_async().await;
2208
2209        let config_mock = create_config_mock(&mut server).await;
2210
2211        let rename_table_mock = server
2212            .mock("POST", "/v1/tables/rename")
2213            .with_status(204)
2214            .create_async()
2215            .await;
2216
2217        let catalog = RestCatalog::new(
2218            RestCatalogConfig::builder().uri(server.url()).build(),
2219            Some(Arc::new(LocalFsStorageFactory)),
2220            Runtime::current(),
2221        );
2222
2223        catalog
2224            .rename_table(
2225                &TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table1".to_string()),
2226                &TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table2".to_string()),
2227            )
2228            .await
2229            .unwrap();
2230
2231        config_mock.assert_async().await;
2232        rename_table_mock.assert_async().await;
2233    }
2234
2235    #[tokio::test]
2236    async fn test_load_table() {
2237        let mut server = Server::new_async().await;
2238
2239        let config_mock = create_config_mock(&mut server).await;
2240
2241        let rename_table_mock = server
2242            .mock("GET", "/v1/namespaces/ns1/tables/test1")
2243            .with_status(200)
2244            .with_body_from_file(format!(
2245                "{}/testdata/{}",
2246                env!("CARGO_MANIFEST_DIR"),
2247                "load_table_response.json"
2248            ))
2249            .create_async()
2250            .await;
2251
2252        let catalog = RestCatalog::new(
2253            RestCatalogConfig::builder().uri(server.url()).build(),
2254            Some(Arc::new(LocalFsStorageFactory)),
2255            Runtime::current(),
2256        );
2257
2258        let table = catalog
2259            .load_table(&TableIdent::new(
2260                NamespaceIdent::new("ns1".to_string()),
2261                "test1".to_string(),
2262            ))
2263            .await
2264            .unwrap();
2265
2266        assert_eq!(
2267            &TableIdent::from_strs(vec!["ns1", "test1"]).unwrap(),
2268            table.identifier()
2269        );
2270        assert_eq!(
2271            "s3://warehouse/database/table/metadata/00001-5f2f8166-244c-4eae-ac36-384ecdec81fc.gz.metadata.json",
2272            table.metadata_location().unwrap()
2273        );
2274        assert_eq!(FormatVersion::V1, table.metadata().format_version());
2275        assert_eq!("s3://warehouse/database/table", table.metadata().location());
2276        assert_eq!(
2277            uuid!("b55d9dda-6561-423a-8bfc-787980ce421f"),
2278            table.metadata().uuid()
2279        );
2280        assert_eq!(
2281            Utc.timestamp_millis_opt(1646787054459).unwrap(),
2282            table.metadata().last_updated_timestamp().unwrap()
2283        );
2284        assert_eq!(
2285            vec![&Arc::new(
2286                Schema::builder()
2287                    .with_fields(vec![
2288                        NestedField::optional(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
2289                        NestedField::optional(2, "data", Type::Primitive(PrimitiveType::String))
2290                            .into(),
2291                    ])
2292                    .build()
2293                    .unwrap()
2294            )],
2295            table.metadata().schemas_iter().collect::<Vec<_>>()
2296        );
2297        assert_eq!(
2298            &HashMap::from([
2299                ("owner".to_string(), "bryan".to_string()),
2300                (
2301                    "write.metadata.compression-codec".to_string(),
2302                    "gzip".to_string()
2303                )
2304            ]),
2305            table.metadata().properties()
2306        );
2307        assert_eq!(vec![&Arc::new(Snapshot::builder()
2308            .with_snapshot_id(3497810964824022504)
2309            .with_timestamp_ms(1646787054459)
2310            .with_manifest_list("s3://warehouse/database/table/metadata/snap-3497810964824022504-1-c4f68204-666b-4e50-a9df-b10c34bf6b82.avro")
2311            .with_sequence_number(0)
2312            .with_schema_id(0)
2313            .with_summary(Summary {
2314                operation: Operation::Append,
2315                additional_properties: HashMap::from_iter([
2316                    ("spark.app.id", "local-1646787004168"),
2317                    ("added-data-files", "1"),
2318                    ("added-records", "1"),
2319                    ("added-files-size", "697"),
2320                    ("changed-partition-count", "1"),
2321                    ("total-records", "1"),
2322                    ("total-files-size", "697"),
2323                    ("total-data-files", "1"),
2324                    ("total-delete-files", "0"),
2325                    ("total-position-deletes", "0"),
2326                    ("total-equality-deletes", "0")
2327                ].iter().map(|p| (p.0.to_string(), p.1.to_string()))),
2328            }).build()
2329        )], table.metadata().snapshots().collect::<Vec<_>>());
2330        assert_eq!(
2331            &[SnapshotLog {
2332                timestamp_ms: 1646787054459,
2333                snapshot_id: 3497810964824022504,
2334            }],
2335            table.metadata().history()
2336        );
2337        assert_eq!(
2338            vec![&Arc::new(SortOrder {
2339                order_id: 0,
2340                fields: vec![],
2341            })],
2342            table.metadata().sort_orders_iter().collect::<Vec<_>>()
2343        );
2344
2345        config_mock.assert_async().await;
2346        rename_table_mock.assert_async().await;
2347    }
2348
2349    #[tokio::test]
2350    async fn test_load_table_404() {
2351        let mut server = Server::new_async().await;
2352
2353        let config_mock = create_config_mock(&mut server).await;
2354
2355        let rename_table_mock = server
2356            .mock("GET", "/v1/namespaces/ns1/tables/test1")
2357            .with_status(404)
2358            .with_body(r#"
2359{
2360    "error": {
2361        "message": "Table does not exist: ns1.test1 in warehouse 8bcb0838-50fc-472d-9ddb-8feb89ef5f1e",
2362        "type": "NoSuchNamespaceErrorException",
2363        "code": 404
2364    }
2365}
2366            "#)
2367            .create_async()
2368            .await;
2369
2370        let catalog = RestCatalog::new(
2371            RestCatalogConfig::builder().uri(server.url()).build(),
2372            Some(Arc::new(LocalFsStorageFactory)),
2373            Runtime::current(),
2374        );
2375
2376        let table = catalog
2377            .load_table(&TableIdent::new(
2378                NamespaceIdent::new("ns1".to_string()),
2379                "test1".to_string(),
2380            ))
2381            .await;
2382
2383        assert!(table.is_err());
2384        assert!(table.err().unwrap().message().contains("does not exist"));
2385
2386        config_mock.assert_async().await;
2387        rename_table_mock.assert_async().await;
2388    }
2389
2390    #[tokio::test]
2391    async fn test_create_table() {
2392        let mut server = Server::new_async().await;
2393
2394        let config_mock = create_config_mock(&mut server).await;
2395
2396        let create_table_mock = server
2397            .mock("POST", "/v1/namespaces/ns1/tables")
2398            .with_status(200)
2399            .with_body_from_file(format!(
2400                "{}/testdata/{}",
2401                env!("CARGO_MANIFEST_DIR"),
2402                "create_table_response.json"
2403            ))
2404            .create_async()
2405            .await;
2406
2407        let catalog = RestCatalog::new(
2408            RestCatalogConfig::builder().uri(server.url()).build(),
2409            Some(Arc::new(LocalFsStorageFactory)),
2410            Runtime::current(),
2411        );
2412
2413        let table_creation = TableCreation::builder()
2414            .name("test1".to_string())
2415            .schema(
2416                Schema::builder()
2417                    .with_fields(vec![
2418                        NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String))
2419                            .into(),
2420                        NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(),
2421                        NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean))
2422                            .into(),
2423                    ])
2424                    .with_schema_id(1)
2425                    .with_identifier_field_ids(vec![2])
2426                    .build()
2427                    .unwrap(),
2428            )
2429            .properties(HashMap::from([("owner".to_string(), "testx".to_string())]))
2430            .partition_spec(
2431                UnboundPartitionSpec::builder()
2432                    .add_partition_fields(vec![
2433                        UnboundPartitionField::builder()
2434                            .source_id(1)
2435                            .transform(Transform::Truncate(3))
2436                            .name("id".to_string())
2437                            .build(),
2438                    ])
2439                    .unwrap()
2440                    .build(),
2441            )
2442            .sort_order(
2443                SortOrder::builder()
2444                    .with_sort_field(
2445                        SortField::builder()
2446                            .source_id(2)
2447                            .transform(Transform::Identity)
2448                            .direction(SortDirection::Ascending)
2449                            .null_order(NullOrder::First)
2450                            .build(),
2451                    )
2452                    .build_unbound()
2453                    .unwrap(),
2454            )
2455            .build();
2456
2457        let table = catalog
2458            .create_table(&NamespaceIdent::from_strs(["ns1"]).unwrap(), table_creation)
2459            .await
2460            .unwrap();
2461
2462        assert_eq!(
2463            &TableIdent::from_strs(vec!["ns1", "test1"]).unwrap(),
2464            table.identifier()
2465        );
2466        assert_eq!(
2467            "s3://warehouse/database/table/metadata.json",
2468            table.metadata_location().unwrap()
2469        );
2470        assert_eq!(FormatVersion::V1, table.metadata().format_version());
2471        assert_eq!("s3://warehouse/database/table", table.metadata().location());
2472        assert_eq!(
2473            uuid!("bf289591-dcc0-4234-ad4f-5c3eed811a29"),
2474            table.metadata().uuid()
2475        );
2476        assert_eq!(
2477            1657810967051,
2478            table
2479                .metadata()
2480                .last_updated_timestamp()
2481                .unwrap()
2482                .timestamp_millis()
2483        );
2484        assert_eq!(
2485            vec![&Arc::new(
2486                Schema::builder()
2487                    .with_fields(vec![
2488                        NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String))
2489                            .into(),
2490                        NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(),
2491                        NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean))
2492                            .into(),
2493                    ])
2494                    .with_schema_id(0)
2495                    .with_identifier_field_ids(vec![2])
2496                    .build()
2497                    .unwrap()
2498            )],
2499            table.metadata().schemas_iter().collect::<Vec<_>>()
2500        );
2501        assert_eq!(
2502            &HashMap::from([
2503                (
2504                    "write.delete.parquet.compression-codec".to_string(),
2505                    "zstd".to_string()
2506                ),
2507                (
2508                    "write.metadata.compression-codec".to_string(),
2509                    "gzip".to_string()
2510                ),
2511                (
2512                    "write.summary.partition-limit".to_string(),
2513                    "100".to_string()
2514                ),
2515                (
2516                    "write.parquet.compression-codec".to_string(),
2517                    "zstd".to_string()
2518                ),
2519            ]),
2520            table.metadata().properties()
2521        );
2522        assert!(table.metadata().current_snapshot().is_none());
2523        assert!(table.metadata().history().is_empty());
2524        assert_eq!(
2525            vec![&Arc::new(SortOrder {
2526                order_id: 0,
2527                fields: vec![],
2528            })],
2529            table.metadata().sort_orders_iter().collect::<Vec<_>>()
2530        );
2531
2532        config_mock.assert_async().await;
2533        create_table_mock.assert_async().await;
2534    }
2535
2536    #[tokio::test]
2537    async fn test_create_table_409() {
2538        let mut server = Server::new_async().await;
2539
2540        let config_mock = create_config_mock(&mut server).await;
2541
2542        let create_table_mock = server
2543            .mock("POST", "/v1/namespaces/ns1/tables")
2544            .with_status(409)
2545            .with_body(r#"
2546{
2547    "error": {
2548        "message": "Table already exists: ns1.test1 in warehouse 8bcb0838-50fc-472d-9ddb-8feb89ef5f1e",
2549        "type": "AlreadyExistsException",
2550        "code": 409
2551    }
2552}
2553            "#)
2554            .create_async()
2555            .await;
2556
2557        let catalog = RestCatalog::new(
2558            RestCatalogConfig::builder().uri(server.url()).build(),
2559            Some(Arc::new(LocalFsStorageFactory)),
2560            Runtime::current(),
2561        );
2562
2563        let table_creation = TableCreation::builder()
2564            .name("test1".to_string())
2565            .schema(
2566                Schema::builder()
2567                    .with_fields(vec![
2568                        NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String))
2569                            .into(),
2570                        NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(),
2571                        NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean))
2572                            .into(),
2573                    ])
2574                    .with_schema_id(1)
2575                    .with_identifier_field_ids(vec![2])
2576                    .build()
2577                    .unwrap(),
2578            )
2579            .properties(HashMap::from([("owner".to_string(), "testx".to_string())]))
2580            .build();
2581
2582        let table_result = catalog
2583            .create_table(&NamespaceIdent::from_strs(["ns1"]).unwrap(), table_creation)
2584            .await;
2585
2586        assert!(table_result.is_err());
2587        assert!(
2588            table_result
2589                .err()
2590                .unwrap()
2591                .message()
2592                .contains("already exists")
2593        );
2594
2595        config_mock.assert_async().await;
2596        create_table_mock.assert_async().await;
2597    }
2598
2599    #[tokio::test]
2600    async fn test_update_table() {
2601        let mut server = Server::new_async().await;
2602
2603        let config_mock = create_config_mock(&mut server).await;
2604
2605        let load_table_mock = server
2606            .mock("GET", "/v1/namespaces/ns1/tables/test1")
2607            .with_status(200)
2608            .with_body_from_file(format!(
2609                "{}/testdata/{}",
2610                env!("CARGO_MANIFEST_DIR"),
2611                "load_table_response.json"
2612            ))
2613            .create_async()
2614            .await;
2615
2616        let update_table_mock = server
2617            .mock("POST", "/v1/namespaces/ns1/tables/test1")
2618            .with_status(200)
2619            .with_body_from_file(format!(
2620                "{}/testdata/{}",
2621                env!("CARGO_MANIFEST_DIR"),
2622                "update_table_response.json"
2623            ))
2624            .create_async()
2625            .await;
2626
2627        let catalog = RestCatalog::new(
2628            RestCatalogConfig::builder().uri(server.url()).build(),
2629            Some(Arc::new(LocalFsStorageFactory)),
2630            Runtime::current(),
2631        );
2632
2633        let table1 = {
2634            let file = File::open(format!(
2635                "{}/testdata/{}",
2636                env!("CARGO_MANIFEST_DIR"),
2637                "create_table_response.json"
2638            ))
2639            .unwrap();
2640            let reader = BufReader::new(file);
2641            let resp = serde_json::from_reader::<_, LoadTableResult>(reader).unwrap();
2642
2643            Table::builder()
2644                .metadata(resp.metadata)
2645                .metadata_location(resp.metadata_location.unwrap())
2646                .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap())
2647                .file_io(FileIO::new_with_fs())
2648                .runtime(test_runtime())
2649                .build()
2650                .unwrap()
2651        };
2652
2653        let tx = Transaction::new(&table1);
2654        let table = tx
2655            .upgrade_table_version()
2656            .set_format_version(FormatVersion::V2)
2657            .apply(tx)
2658            .unwrap()
2659            .commit(&catalog)
2660            .await
2661            .unwrap();
2662
2663        assert_eq!(
2664            &TableIdent::from_strs(vec!["ns1", "test1"]).unwrap(),
2665            table.identifier()
2666        );
2667        assert_eq!(
2668            "s3://warehouse/database/table/metadata.json",
2669            table.metadata_location().unwrap()
2670        );
2671        assert_eq!(FormatVersion::V2, table.metadata().format_version());
2672        assert_eq!("s3://warehouse/database/table", table.metadata().location());
2673        assert_eq!(
2674            uuid!("bf289591-dcc0-4234-ad4f-5c3eed811a29"),
2675            table.metadata().uuid()
2676        );
2677        assert_eq!(
2678            1657810967051,
2679            table
2680                .metadata()
2681                .last_updated_timestamp()
2682                .unwrap()
2683                .timestamp_millis()
2684        );
2685        assert_eq!(
2686            vec![&Arc::new(
2687                Schema::builder()
2688                    .with_fields(vec![
2689                        NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String))
2690                            .into(),
2691                        NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(),
2692                        NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean))
2693                            .into(),
2694                    ])
2695                    .with_schema_id(0)
2696                    .with_identifier_field_ids(vec![2])
2697                    .build()
2698                    .unwrap()
2699            )],
2700            table.metadata().schemas_iter().collect::<Vec<_>>()
2701        );
2702        assert_eq!(
2703            &HashMap::from([
2704                (
2705                    "write.delete.parquet.compression-codec".to_string(),
2706                    "zstd".to_string()
2707                ),
2708                (
2709                    "write.metadata.compression-codec".to_string(),
2710                    "gzip".to_string()
2711                ),
2712                (
2713                    "write.summary.partition-limit".to_string(),
2714                    "100".to_string()
2715                ),
2716                (
2717                    "write.parquet.compression-codec".to_string(),
2718                    "zstd".to_string()
2719                ),
2720            ]),
2721            table.metadata().properties()
2722        );
2723        assert!(table.metadata().current_snapshot().is_none());
2724        assert!(table.metadata().history().is_empty());
2725        assert_eq!(
2726            vec![&Arc::new(SortOrder {
2727                order_id: 0,
2728                fields: vec![],
2729            })],
2730            table.metadata().sort_orders_iter().collect::<Vec<_>>()
2731        );
2732
2733        config_mock.assert_async().await;
2734        update_table_mock.assert_async().await;
2735        load_table_mock.assert_async().await
2736    }
2737
2738    #[tokio::test]
2739    async fn test_update_table_404() {
2740        let mut server = Server::new_async().await;
2741
2742        let config_mock = create_config_mock(&mut server).await;
2743
2744        let load_table_mock = server
2745            .mock("GET", "/v1/namespaces/ns1/tables/test1")
2746            .with_status(200)
2747            .with_body_from_file(format!(
2748                "{}/testdata/{}",
2749                env!("CARGO_MANIFEST_DIR"),
2750                "load_table_response.json"
2751            ))
2752            .create_async()
2753            .await;
2754
2755        let update_table_mock = server
2756            .mock("POST", "/v1/namespaces/ns1/tables/test1")
2757            .with_status(404)
2758            .with_body(
2759                r#"
2760{
2761    "error": {
2762        "message": "The given table does not exist",
2763        "type": "NoSuchTableException",
2764        "code": 404
2765    }
2766}
2767            "#,
2768            )
2769            .create_async()
2770            .await;
2771
2772        let catalog = RestCatalog::new(
2773            RestCatalogConfig::builder().uri(server.url()).build(),
2774            Some(Arc::new(LocalFsStorageFactory)),
2775            Runtime::current(),
2776        );
2777
2778        let table1 = {
2779            let file = File::open(format!(
2780                "{}/testdata/{}",
2781                env!("CARGO_MANIFEST_DIR"),
2782                "create_table_response.json"
2783            ))
2784            .unwrap();
2785            let reader = BufReader::new(file);
2786            let resp = serde_json::from_reader::<_, LoadTableResult>(reader).unwrap();
2787
2788            Table::builder()
2789                .metadata(resp.metadata)
2790                .metadata_location(resp.metadata_location.unwrap())
2791                .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap())
2792                .file_io(FileIO::new_with_fs())
2793                .runtime(test_runtime())
2794                .build()
2795                .unwrap()
2796        };
2797
2798        let tx = Transaction::new(&table1);
2799        let table_result = tx
2800            .upgrade_table_version()
2801            .set_format_version(FormatVersion::V2)
2802            .apply(tx)
2803            .unwrap()
2804            .commit(&catalog)
2805            .await;
2806
2807        assert!(table_result.is_err());
2808        assert!(
2809            table_result
2810                .err()
2811                .unwrap()
2812                .message()
2813                .contains("does not exist")
2814        );
2815
2816        config_mock.assert_async().await;
2817        update_table_mock.assert_async().await;
2818        load_table_mock.assert_async().await;
2819    }
2820
2821    #[tokio::test]
2822    async fn test_register_table() {
2823        let mut server = Server::new_async().await;
2824
2825        let config_mock = create_config_mock(&mut server).await;
2826
2827        let register_table_mock = server
2828            .mock("POST", "/v1/namespaces/ns1/register")
2829            .with_status(200)
2830            .with_body_from_file(format!(
2831                "{}/testdata/{}",
2832                env!("CARGO_MANIFEST_DIR"),
2833                "load_table_response.json"
2834            ))
2835            .create_async()
2836            .await;
2837
2838        let catalog = RestCatalog::new(
2839            RestCatalogConfig::builder().uri(server.url()).build(),
2840            Some(Arc::new(LocalFsStorageFactory)),
2841            Runtime::current(),
2842        );
2843        let table_ident =
2844            TableIdent::new(NamespaceIdent::new("ns1".to_string()), "test1".to_string());
2845        let metadata_location = String::from(
2846            "s3://warehouse/database/table/metadata/00001-5f2f8166-244c-4eae-ac36-384ecdec81fc.gz.metadata.json",
2847        );
2848
2849        let table = catalog
2850            .register_table(&table_ident, metadata_location)
2851            .await
2852            .unwrap();
2853
2854        assert_eq!(
2855            &TableIdent::from_strs(vec!["ns1", "test1"]).unwrap(),
2856            table.identifier()
2857        );
2858        assert_eq!(
2859            "s3://warehouse/database/table/metadata/00001-5f2f8166-244c-4eae-ac36-384ecdec81fc.gz.metadata.json",
2860            table.metadata_location().unwrap()
2861        );
2862
2863        config_mock.assert_async().await;
2864        register_table_mock.assert_async().await;
2865    }
2866
2867    #[tokio::test]
2868    async fn test_register_table_404() {
2869        let mut server = Server::new_async().await;
2870
2871        let config_mock = create_config_mock(&mut server).await;
2872
2873        let register_table_mock = server
2874            .mock("POST", "/v1/namespaces/ns1/register")
2875            .with_status(404)
2876            .with_body(
2877                r#"
2878{
2879    "error": {
2880        "message": "The namespace specified does not exist",
2881        "type": "NoSuchNamespaceErrorException",
2882        "code": 404
2883    }
2884}
2885            "#,
2886            )
2887            .create_async()
2888            .await;
2889
2890        let catalog = RestCatalog::new(
2891            RestCatalogConfig::builder().uri(server.url()).build(),
2892            Some(Arc::new(LocalFsStorageFactory)),
2893            Runtime::current(),
2894        );
2895
2896        let table_ident =
2897            TableIdent::new(NamespaceIdent::new("ns1".to_string()), "test1".to_string());
2898        let metadata_location = String::from(
2899            "s3://warehouse/database/table/metadata/00001-5f2f8166-244c-4eae-ac36-384ecdec81fc.gz.metadata.json",
2900        );
2901        let table = catalog
2902            .register_table(&table_ident, metadata_location)
2903            .await;
2904
2905        assert!(table.is_err());
2906        assert!(table.err().unwrap().message().contains("does not exist"));
2907
2908        config_mock.assert_async().await;
2909        register_table_mock.assert_async().await;
2910    }
2911
2912    #[tokio::test]
2913    async fn test_create_rest_catalog() {
2914        let builder = RestCatalogBuilder::default().with_client(Client::new());
2915
2916        let catalog = builder
2917            .load(
2918                "test",
2919                HashMap::from([
2920                    (
2921                        REST_CATALOG_PROP_URI.to_string(),
2922                        "http://localhost:8080".to_string(),
2923                    ),
2924                    ("a".to_string(), "b".to_string()),
2925                ]),
2926            )
2927            .await;
2928
2929        assert!(catalog.is_ok());
2930
2931        let catalog_config = catalog.unwrap().user_config;
2932        assert_eq!(catalog_config.name.as_deref(), Some("test"));
2933        assert_eq!(catalog_config.uri, "http://localhost:8080");
2934        assert_eq!(catalog_config.warehouse, None);
2935        assert!(catalog_config.client.is_some());
2936
2937        assert_eq!(catalog_config.props.get("a"), Some(&"b".to_string()));
2938        assert!(!catalog_config.props.contains_key(REST_CATALOG_PROP_URI));
2939    }
2940
2941    #[tokio::test]
2942    async fn test_create_rest_catalog_no_uri() {
2943        let builder = RestCatalogBuilder::default();
2944
2945        let catalog = builder
2946            .load(
2947                "test",
2948                HashMap::from([(
2949                    REST_CATALOG_PROP_WAREHOUSE.to_string(),
2950                    "s3://warehouse".to_string(),
2951                )]),
2952            )
2953            .await;
2954
2955        assert!(catalog.is_err());
2956        if let Err(err) = catalog {
2957            assert_eq!(err.kind(), ErrorKind::DataInvalid);
2958            assert_eq!(err.message(), "Catalog uri is required");
2959        }
2960    }
2961}