iceberg_catalog_rest/
catalog.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! This module contains the iceberg REST catalog implementation.
19
20use std::collections::HashMap;
21use std::future::Future;
22use std::str::FromStr;
23use std::sync::Arc;
24
25use async_trait::async_trait;
26use iceberg::io::{FileIO, FileIOBuilder, StorageFactory};
27use iceberg::table::Table;
28use iceberg::{
29    Catalog, CatalogBuilder, Error, ErrorKind, Namespace, NamespaceIdent, Result, TableCommit,
30    TableCreation, TableIdent,
31};
32use itertools::Itertools;
33use reqwest::header::{
34    HeaderMap, HeaderName, HeaderValue, {self},
35};
36use reqwest::{Client, Method, StatusCode, Url};
37use tokio::sync::OnceCell;
38use typed_builder::TypedBuilder;
39
40use crate::client::{
41    HttpClient, deserialize_catalog_response, deserialize_unexpected_catalog_error,
42};
43use crate::types::{
44    CatalogConfig, CommitTableRequest, CommitTableResponse, CreateNamespaceRequest,
45    CreateTableRequest, ListNamespaceResponse, ListTablesResponse, LoadTableResult,
46    NamespaceResponse, RegisterTableRequest, RenameTableRequest,
47};
48
/// Property key holding the REST catalog URI.
pub const REST_CATALOG_PROP_URI: &str = "uri";
/// Property key holding the REST catalog warehouse location.
pub const REST_CATALOG_PROP_WAREHOUSE: &str = "warehouse";
/// Property key that disables header redaction in error logs (defaults to false for security).
pub const REST_CATALOG_PROP_DISABLE_HEADER_REDACTION: &str = "disable-header-redaction";

// Value of the `x-client-version` header built by `RestCatalogConfig::extra_headers`.
const ICEBERG_REST_SPEC_VERSION: &str = "0.14.1";
// Crate version captured at compile time; embedded in the `user-agent` header.
const CARGO_PKG_VERSION: &str = env!("CARGO_PKG_VERSION");
// Path segment placed right after the catalog URI when endpoints are built.
const PATH_V1: &str = "v1";
59
/// Builder for [`RestCatalog`].
///
/// Holds the configuration collected so far plus an optional storage factory; the catalog
/// itself is produced by `CatalogBuilder::load`.
#[derive(Debug)]
pub struct RestCatalogBuilder {
    /// Catalog configuration (name, URI, warehouse, extra properties, HTTP client).
    config: RestCatalogConfig,
    /// Storage factory passed on to the catalog; `None` until `with_storage_factory` is called.
    storage_factory: Option<Arc<dyn StorageFactory>>,
}
66
67impl Default for RestCatalogBuilder {
68    fn default() -> Self {
69        Self {
70            config: RestCatalogConfig {
71                name: None,
72                uri: "".to_string(),
73                warehouse: None,
74                props: HashMap::new(),
75                client: None,
76            },
77            storage_factory: None,
78        }
79    }
80}
81
82impl CatalogBuilder for RestCatalogBuilder {
83    type C = RestCatalog;
84
85    fn with_storage_factory(mut self, storage_factory: Arc<dyn StorageFactory>) -> Self {
86        self.storage_factory = Some(storage_factory);
87        self
88    }
89
90    fn load(
91        mut self,
92        name: impl Into<String>,
93        props: HashMap<String, String>,
94    ) -> impl Future<Output = Result<Self::C>> + Send {
95        self.config.name = Some(name.into());
96
97        if props.contains_key(REST_CATALOG_PROP_URI) {
98            self.config.uri = props
99                .get(REST_CATALOG_PROP_URI)
100                .cloned()
101                .unwrap_or_default();
102        }
103
104        if props.contains_key(REST_CATALOG_PROP_WAREHOUSE) {
105            self.config.warehouse = props.get(REST_CATALOG_PROP_WAREHOUSE).cloned()
106        }
107
108        // Collect other remaining properties
109        self.config.props = props
110            .into_iter()
111            .filter(|(k, _)| k != REST_CATALOG_PROP_URI && k != REST_CATALOG_PROP_WAREHOUSE)
112            .collect();
113
114        let result = {
115            if self.config.name.is_none() {
116                Err(Error::new(
117                    ErrorKind::DataInvalid,
118                    "Catalog name is required",
119                ))
120            } else if self.config.uri.is_empty() {
121                Err(Error::new(
122                    ErrorKind::DataInvalid,
123                    "Catalog uri is required",
124                ))
125            } else {
126                Ok(RestCatalog::new(self.config, self.storage_factory))
127            }
128        };
129
130        std::future::ready(result)
131    }
132}
133
134impl RestCatalogBuilder {
135    /// Configures the catalog with a custom HTTP client.
136    pub fn with_client(mut self, client: Client) -> Self {
137        self.config.client = Some(client);
138        self
139    }
140}
141
/// Rest catalog configuration.
///
/// Created by [`RestCatalogBuilder`] or through the derived `TypedBuilder` API.
#[derive(Clone, Debug, TypedBuilder)]
pub(crate) struct RestCatalogConfig {
    /// Catalog name, if one was given.
    #[builder(default, setter(strip_option))]
    name: Option<String>,

    /// URI of the REST catalog server; endpoint paths are appended to it.
    uri: String,

    /// Warehouse value, sent as the `warehouse` query parameter when the server config is loaded.
    #[builder(default, setter(strip_option(fallback = warehouse_opt)))]
    warehouse: Option<String>,

    /// Remaining catalog properties, e.g. `prefix`, `token`, `credential` or `header.*` entries.
    #[builder(default)]
    props: HashMap<String, String>,

    /// Custom HTTP client, if one was supplied.
    #[builder(default)]
    client: Option<Client>,
}
159
160impl RestCatalogConfig {
161    fn url_prefixed(&self, parts: &[&str]) -> String {
162        [&self.uri, PATH_V1]
163            .into_iter()
164            .chain(self.props.get("prefix").map(|s| &**s))
165            .chain(parts.iter().cloned())
166            .join("/")
167    }
168
169    fn config_endpoint(&self) -> String {
170        [&self.uri, PATH_V1, "config"].join("/")
171    }
172
173    pub(crate) fn get_token_endpoint(&self) -> String {
174        if let Some(oauth2_uri) = self.props.get("oauth2-server-uri") {
175            oauth2_uri.to_string()
176        } else {
177            [&self.uri, PATH_V1, "oauth", "tokens"].join("/")
178        }
179    }
180
181    fn namespaces_endpoint(&self) -> String {
182        self.url_prefixed(&["namespaces"])
183    }
184
185    fn namespace_endpoint(&self, ns: &NamespaceIdent) -> String {
186        self.url_prefixed(&["namespaces", &ns.to_url_string()])
187    }
188
189    fn tables_endpoint(&self, ns: &NamespaceIdent) -> String {
190        self.url_prefixed(&["namespaces", &ns.to_url_string(), "tables"])
191    }
192
193    fn rename_table_endpoint(&self) -> String {
194        self.url_prefixed(&["tables", "rename"])
195    }
196
197    fn register_table_endpoint(&self, ns: &NamespaceIdent) -> String {
198        self.url_prefixed(&["namespaces", &ns.to_url_string(), "register"])
199    }
200
201    fn table_endpoint(&self, table: &TableIdent) -> String {
202        self.url_prefixed(&[
203            "namespaces",
204            &table.namespace.to_url_string(),
205            "tables",
206            &table.name,
207        ])
208    }
209
210    /// Get the client from the config.
211    pub(crate) fn client(&self) -> Option<Client> {
212        self.client.clone()
213    }
214
215    /// Get the token from the config.
216    ///
217    /// The client can use this token to send requests.
218    pub(crate) fn token(&self) -> Option<String> {
219        self.props.get("token").cloned()
220    }
221
222    /// Get the credentials from the config. The client can use these credentials to fetch a new
223    /// token.
224    ///
225    /// ## Output
226    ///
227    /// - `None`: No credential is set.
228    /// - `Some(None, client_secret)`: No client_id is set, use client_secret directly.
229    /// - `Some(Some(client_id), client_secret)`: Both client_id and client_secret are set.
230    pub(crate) fn credential(&self) -> Option<(Option<String>, String)> {
231        let cred = self.props.get("credential")?;
232
233        match cred.split_once(':') {
234            Some((client_id, client_secret)) => {
235                Some((Some(client_id.to_string()), client_secret.to_string()))
236            }
237            None => Some((None, cred.to_string())),
238        }
239    }
240
241    /// Get the extra headers from config, which includes:
242    ///
243    /// - `content-type`
244    /// - `x-client-version`
245    /// - `user-agent`
246    /// - All headers specified by `header.xxx` in props.
247    pub(crate) fn extra_headers(&self) -> Result<HeaderMap> {
248        let mut headers = HeaderMap::from_iter([
249            (
250                header::CONTENT_TYPE,
251                HeaderValue::from_static("application/json"),
252            ),
253            (
254                HeaderName::from_static("x-client-version"),
255                HeaderValue::from_static(ICEBERG_REST_SPEC_VERSION),
256            ),
257            (
258                header::USER_AGENT,
259                HeaderValue::from_str(&format!("iceberg-rs/{CARGO_PKG_VERSION}")).unwrap(),
260            ),
261        ]);
262
263        for (key, value) in self
264            .props
265            .iter()
266            .filter_map(|(k, v)| k.strip_prefix("header.").map(|k| (k, v)))
267        {
268            headers.insert(
269                HeaderName::from_str(key).map_err(|e| {
270                    Error::new(
271                        ErrorKind::DataInvalid,
272                        format!("Invalid header name: {key}"),
273                    )
274                    .with_source(e)
275                })?,
276                HeaderValue::from_str(value).map_err(|e| {
277                    Error::new(
278                        ErrorKind::DataInvalid,
279                        format!("Invalid header value: {value}"),
280                    )
281                    .with_source(e)
282                })?,
283            );
284        }
285
286        Ok(headers)
287    }
288
289    /// Get the optional OAuth headers from the config.
290    pub(crate) fn extra_oauth_params(&self) -> HashMap<String, String> {
291        let mut params = HashMap::new();
292
293        if let Some(scope) = self.props.get("scope") {
294            params.insert("scope".to_string(), scope.to_string());
295        } else {
296            params.insert("scope".to_string(), "catalog".to_string());
297        }
298
299        let optional_params = ["audience", "resource"];
300        for param_name in optional_params {
301            if let Some(value) = self.props.get(param_name) {
302                params.insert(param_name.to_string(), value.to_string());
303            }
304        }
305
306        params
307    }
308
309    /// Check if header redaction is disabled in error logs.
310    ///
311    /// Returns true if the `disable-header-redaction` property is set to "true".
312    /// Defaults to false for security (headers are redacted by default).
313    pub(crate) fn disable_header_redaction(&self) -> bool {
314        self.props
315            .get(REST_CATALOG_PROP_DISABLE_HEADER_REDACTION)
316            .map(|v| v.eq_ignore_ascii_case("true"))
317            .unwrap_or(false)
318    }
319
320    /// Merge the `RestCatalogConfig` with the a [`CatalogConfig`] (fetched from the REST server).
321    pub(crate) fn merge_with_config(mut self, mut config: CatalogConfig) -> Self {
322        if let Some(uri) = config.overrides.remove("uri") {
323            self.uri = uri;
324        }
325
326        let mut props = config.defaults;
327        props.extend(self.props);
328        props.extend(config.overrides);
329
330        self.props = props;
331        self
332    }
333}
334
/// Runtime state created by `RestCatalog::context`: the HTTP client together with the
/// config that results from merging the user config with the server config.
#[derive(Debug)]
struct RestContext {
    /// HTTP client used for catalog requests.
    client: HttpClient,
    /// Runtime config is fetched from the REST server and stored here.
    ///
    /// It could be different from the user config.
    config: RestCatalogConfig,
}
343
/// Rest catalog implementation.
#[derive(Debug)]
pub struct RestCatalog {
    /// User config is stored as-is and is never changed.
    ///
    /// It could be different from the config fetched from the server and used at runtime.
    user_config: RestCatalogConfig,
    /// Runtime context, initialized on first use by `context()`.
    ctx: OnceCell<RestContext>,
    /// Storage factory for creating FileIO instances.
    storage_factory: Option<Arc<dyn StorageFactory>>,
}
355
356impl RestCatalog {
357    /// Creates a `RestCatalog` from a [`RestCatalogConfig`].
358    fn new(config: RestCatalogConfig, storage_factory: Option<Arc<dyn StorageFactory>>) -> Self {
359        Self {
360            user_config: config,
361            ctx: OnceCell::new(),
362            storage_factory,
363        }
364    }
365
366    /// Gets the [`RestContext`] from the catalog.
367    async fn context(&self) -> Result<&RestContext> {
368        self.ctx
369            .get_or_try_init(|| async {
370                let client = HttpClient::new(&self.user_config)?;
371                let catalog_config = RestCatalog::load_config(&client, &self.user_config).await?;
372                let config = self.user_config.clone().merge_with_config(catalog_config);
373                let client = client.update_with(&config)?;
374
375                Ok(RestContext { config, client })
376            })
377            .await
378    }
379
380    /// Load the runtime config from the server by `user_config`.
381    ///
382    /// It's required for a REST catalog to update its config after creation.
383    async fn load_config(
384        client: &HttpClient,
385        user_config: &RestCatalogConfig,
386    ) -> Result<CatalogConfig> {
387        let mut request_builder = client.request(Method::GET, user_config.config_endpoint());
388
389        if let Some(warehouse_location) = &user_config.warehouse {
390            request_builder = request_builder.query(&[("warehouse", warehouse_location)]);
391        }
392
393        let request = request_builder.build()?;
394
395        let http_response = client.query_catalog(request).await?;
396
397        match http_response.status() {
398            StatusCode::OK => deserialize_catalog_response(http_response).await,
399            _ => Err(deserialize_unexpected_catalog_error(
400                http_response,
401                client.disable_header_redaction(),
402            )
403            .await),
404        }
405    }
406
407    async fn load_file_io(
408        &self,
409        metadata_location: Option<&str>,
410        extra_config: Option<HashMap<String, String>>,
411    ) -> Result<FileIO> {
412        let mut props = self.context().await?.config.props.clone();
413        if let Some(config) = extra_config {
414            props.extend(config);
415        }
416
417        // If the warehouse is a logical identifier instead of a URL we don't want
418        // to raise an exception
419        let warehouse_path = match self.context().await?.config.warehouse.as_deref() {
420            Some(url) if Url::parse(url).is_ok() => Some(url),
421            Some(_) => None,
422            None => None,
423        };
424
425        if metadata_location.or(warehouse_path).is_none() {
426            return Err(Error::new(
427                ErrorKind::Unexpected,
428                "Unable to load file io, neither warehouse nor metadata location is set!",
429            ));
430        }
431
432        // Require a StorageFactory to be provided
433        let factory = self
434            .storage_factory
435            .clone()
436            .ok_or_else(|| {
437                Error::new(
438                    ErrorKind::Unexpected,
439                    "StorageFactory must be provided for RestCatalog. Use `with_storage_factory` to configure it.",
440                )
441            })?;
442
443        let file_io = FileIOBuilder::new(factory).with_props(props).build();
444
445        Ok(file_io)
446    }
447
448    /// Invalidate the current token without generating a new one. On the next request, the client
449    /// will attempt to generate a new token.
450    pub async fn invalidate_token(&self) -> Result<()> {
451        self.context().await?.client.invalidate_token().await
452    }
453
454    /// Invalidate the current token and set a new one. Generates a new token before invalidating
455    /// the current token, meaning the old token will be used until this function acquires the lock
456    /// and overwrites the token.
457    ///
458    /// If credential is invalid, or the request fails, this method will return an error and leave
459    /// the current token unchanged.
460    pub async fn regenerate_token(&self) -> Result<()> {
461        self.context().await?.client.regenerate_token().await
462    }
463}
464
465/// All requests and expected responses are derived from the REST catalog API spec:
466/// https://github.com/apache/iceberg/blob/main/open-api/rest-catalog-open-api.yaml
467#[async_trait]
468impl Catalog for RestCatalog {
469    async fn list_namespaces(
470        &self,
471        parent: Option<&NamespaceIdent>,
472    ) -> Result<Vec<NamespaceIdent>> {
473        let context = self.context().await?;
474        let endpoint = context.config.namespaces_endpoint();
475        let mut namespaces = Vec::new();
476        let mut next_token = None;
477
478        loop {
479            let mut request = context.client.request(Method::GET, endpoint.clone());
480
481            // Filter on `parent={namespace}` if a parent namespace exists.
482            if let Some(ns) = parent {
483                request = request.query(&[("parent", ns.to_url_string())]);
484            }
485
486            if let Some(token) = next_token {
487                request = request.query(&[("pageToken", token)]);
488            }
489
490            let http_response = context.client.query_catalog(request.build()?).await?;
491
492            match http_response.status() {
493                StatusCode::OK => {
494                    let response =
495                        deserialize_catalog_response::<ListNamespaceResponse>(http_response)
496                            .await?;
497
498                    namespaces.extend(response.namespaces);
499
500                    match response.next_page_token {
501                        Some(token) => next_token = Some(token),
502                        None => break,
503                    }
504                }
505                StatusCode::NOT_FOUND => {
506                    return Err(Error::new(
507                        ErrorKind::NamespaceNotFound,
508                        "The parent parameter of the namespace provided does not exist",
509                    ));
510                }
511                _ => {
512                    return Err(deserialize_unexpected_catalog_error(
513                        http_response,
514                        context.client.disable_header_redaction(),
515                    )
516                    .await);
517                }
518            }
519        }
520
521        Ok(namespaces)
522    }
523
524    async fn create_namespace(
525        &self,
526        namespace: &NamespaceIdent,
527        properties: HashMap<String, String>,
528    ) -> Result<Namespace> {
529        let context = self.context().await?;
530
531        let request = context
532            .client
533            .request(Method::POST, context.config.namespaces_endpoint())
534            .json(&CreateNamespaceRequest {
535                namespace: namespace.clone(),
536                properties,
537            })
538            .build()?;
539
540        let http_response = context.client.query_catalog(request).await?;
541
542        match http_response.status() {
543            StatusCode::OK => {
544                let response =
545                    deserialize_catalog_response::<NamespaceResponse>(http_response).await?;
546                Ok(Namespace::from(response))
547            }
548            StatusCode::CONFLICT => Err(Error::new(
549                ErrorKind::NamespaceAlreadyExists,
550                "Tried to create a namespace that already exists",
551            )),
552            _ => Err(deserialize_unexpected_catalog_error(
553                http_response,
554                context.client.disable_header_redaction(),
555            )
556            .await),
557        }
558    }
559
560    async fn get_namespace(&self, namespace: &NamespaceIdent) -> Result<Namespace> {
561        let context = self.context().await?;
562
563        let request = context
564            .client
565            .request(Method::GET, context.config.namespace_endpoint(namespace))
566            .build()?;
567
568        let http_response = context.client.query_catalog(request).await?;
569
570        match http_response.status() {
571            StatusCode::OK => {
572                let response =
573                    deserialize_catalog_response::<NamespaceResponse>(http_response).await?;
574                Ok(Namespace::from(response))
575            }
576            StatusCode::NOT_FOUND => Err(Error::new(
577                ErrorKind::NamespaceNotFound,
578                "Tried to get a namespace that does not exist",
579            )),
580            _ => Err(deserialize_unexpected_catalog_error(
581                http_response,
582                context.client.disable_header_redaction(),
583            )
584            .await),
585        }
586    }
587
588    async fn namespace_exists(&self, ns: &NamespaceIdent) -> Result<bool> {
589        let context = self.context().await?;
590
591        let request = context
592            .client
593            .request(Method::HEAD, context.config.namespace_endpoint(ns))
594            .build()?;
595
596        let http_response = context.client.query_catalog(request).await?;
597
598        match http_response.status() {
599            StatusCode::NO_CONTENT | StatusCode::OK => Ok(true),
600            StatusCode::NOT_FOUND => Ok(false),
601            _ => Err(deserialize_unexpected_catalog_error(
602                http_response,
603                context.client.disable_header_redaction(),
604            )
605            .await),
606        }
607    }
608
609    async fn update_namespace(
610        &self,
611        _namespace: &NamespaceIdent,
612        _properties: HashMap<String, String>,
613    ) -> Result<()> {
614        Err(Error::new(
615            ErrorKind::FeatureUnsupported,
616            "Updating namespace not supported yet!",
617        ))
618    }
619
620    async fn drop_namespace(&self, namespace: &NamespaceIdent) -> Result<()> {
621        let context = self.context().await?;
622
623        let request = context
624            .client
625            .request(Method::DELETE, context.config.namespace_endpoint(namespace))
626            .build()?;
627
628        let http_response = context.client.query_catalog(request).await?;
629
630        match http_response.status() {
631            StatusCode::NO_CONTENT | StatusCode::OK => Ok(()),
632            StatusCode::NOT_FOUND => Err(Error::new(
633                ErrorKind::NamespaceNotFound,
634                "Tried to drop a namespace that does not exist",
635            )),
636            _ => Err(deserialize_unexpected_catalog_error(
637                http_response,
638                context.client.disable_header_redaction(),
639            )
640            .await),
641        }
642    }
643
644    async fn list_tables(&self, namespace: &NamespaceIdent) -> Result<Vec<TableIdent>> {
645        let context = self.context().await?;
646        let endpoint = context.config.tables_endpoint(namespace);
647        let mut identifiers = Vec::new();
648        let mut next_token = None;
649
650        loop {
651            let mut request = context.client.request(Method::GET, endpoint.clone());
652
653            if let Some(token) = next_token {
654                request = request.query(&[("pageToken", token)]);
655            }
656
657            let http_response = context.client.query_catalog(request.build()?).await?;
658
659            match http_response.status() {
660                StatusCode::OK => {
661                    let response =
662                        deserialize_catalog_response::<ListTablesResponse>(http_response).await?;
663
664                    identifiers.extend(response.identifiers);
665
666                    match response.next_page_token {
667                        Some(token) => next_token = Some(token),
668                        None => break,
669                    }
670                }
671                StatusCode::NOT_FOUND => {
672                    return Err(Error::new(
673                        ErrorKind::NamespaceNotFound,
674                        "Tried to list tables of a namespace that does not exist",
675                    ));
676                }
677                _ => {
678                    return Err(deserialize_unexpected_catalog_error(
679                        http_response,
680                        context.client.disable_header_redaction(),
681                    )
682                    .await);
683                }
684            }
685        }
686
687        Ok(identifiers)
688    }
689
690    /// Create a new table inside the namespace.
691    ///
692    /// In the resulting table, if there are any config properties that
693    /// are present in both the response from the REST server and the
694    /// config provided when creating this `RestCatalog` instance then
695    /// the value provided locally to the `RestCatalog` will take precedence.
696    async fn create_table(
697        &self,
698        namespace: &NamespaceIdent,
699        creation: TableCreation,
700    ) -> Result<Table> {
701        let context = self.context().await?;
702
703        let table_ident = TableIdent::new(namespace.clone(), creation.name.clone());
704
705        let request = context
706            .client
707            .request(Method::POST, context.config.tables_endpoint(namespace))
708            .json(&CreateTableRequest {
709                name: creation.name,
710                location: creation.location,
711                schema: creation.schema,
712                partition_spec: creation.partition_spec,
713                write_order: creation.sort_order,
714                stage_create: Some(false),
715                properties: creation.properties,
716            })
717            .build()?;
718
719        let http_response = context.client.query_catalog(request).await?;
720
721        let response = match http_response.status() {
722            StatusCode::OK => {
723                deserialize_catalog_response::<LoadTableResult>(http_response).await?
724            }
725            StatusCode::NOT_FOUND => {
726                return Err(Error::new(
727                    ErrorKind::NamespaceNotFound,
728                    "Tried to create a table under a namespace that does not exist",
729                ));
730            }
731            StatusCode::CONFLICT => {
732                return Err(Error::new(
733                    ErrorKind::TableAlreadyExists,
734                    "The table already exists",
735                ));
736            }
737            _ => {
738                return Err(deserialize_unexpected_catalog_error(
739                    http_response,
740                    context.client.disable_header_redaction(),
741                )
742                .await);
743            }
744        };
745
746        let metadata_location = response.metadata_location.as_ref().ok_or(Error::new(
747            ErrorKind::DataInvalid,
748            "Metadata location missing in `create_table` response!",
749        ))?;
750
751        let config = response
752            .config
753            .into_iter()
754            .chain(self.user_config.props.clone())
755            .collect();
756
757        let file_io = self
758            .load_file_io(Some(metadata_location), Some(config))
759            .await?;
760
761        let table_builder = Table::builder()
762            .identifier(table_ident.clone())
763            .file_io(file_io)
764            .metadata(response.metadata);
765
766        if let Some(metadata_location) = response.metadata_location {
767            table_builder.metadata_location(metadata_location).build()
768        } else {
769            table_builder.build()
770        }
771    }
772
773    /// Load table from the catalog.
774    ///
775    /// If there are any config properties that are present in both the response from the REST
776    /// server and the config provided when creating this `RestCatalog` instance, then the value
777    /// provided locally to the `RestCatalog` will take precedence.
778    async fn load_table(&self, table_ident: &TableIdent) -> Result<Table> {
779        let context = self.context().await?;
780
781        let request = context
782            .client
783            .request(Method::GET, context.config.table_endpoint(table_ident))
784            .build()?;
785
786        let http_response = context.client.query_catalog(request).await?;
787
788        let response = match http_response.status() {
789            StatusCode::OK | StatusCode::NOT_MODIFIED => {
790                deserialize_catalog_response::<LoadTableResult>(http_response).await?
791            }
792            StatusCode::NOT_FOUND => {
793                return Err(Error::new(
794                    ErrorKind::TableNotFound,
795                    "Tried to load a table that does not exist",
796                ));
797            }
798            _ => {
799                return Err(deserialize_unexpected_catalog_error(
800                    http_response,
801                    context.client.disable_header_redaction(),
802                )
803                .await);
804            }
805        };
806
807        let config = response
808            .config
809            .into_iter()
810            .chain(self.user_config.props.clone())
811            .collect();
812
813        let file_io = self
814            .load_file_io(response.metadata_location.as_deref(), Some(config))
815            .await?;
816
817        let table_builder = Table::builder()
818            .identifier(table_ident.clone())
819            .file_io(file_io)
820            .metadata(response.metadata);
821
822        if let Some(metadata_location) = response.metadata_location {
823            table_builder.metadata_location(metadata_location).build()
824        } else {
825            table_builder.build()
826        }
827    }
828
829    /// Drop a table from the catalog.
830    async fn drop_table(&self, table: &TableIdent) -> Result<()> {
831        let context = self.context().await?;
832
833        let request = context
834            .client
835            .request(Method::DELETE, context.config.table_endpoint(table))
836            .build()?;
837
838        let http_response = context.client.query_catalog(request).await?;
839
840        match http_response.status() {
841            StatusCode::NO_CONTENT | StatusCode::OK => Ok(()),
842            StatusCode::NOT_FOUND => Err(Error::new(
843                ErrorKind::TableNotFound,
844                "Tried to drop a table that does not exist",
845            )),
846            _ => Err(deserialize_unexpected_catalog_error(
847                http_response,
848                context.client.disable_header_redaction(),
849            )
850            .await),
851        }
852    }
853
854    /// Check if a table exists in the catalog.
855    async fn table_exists(&self, table: &TableIdent) -> Result<bool> {
856        let context = self.context().await?;
857
858        let request = context
859            .client
860            .request(Method::HEAD, context.config.table_endpoint(table))
861            .build()?;
862
863        let http_response = context.client.query_catalog(request).await?;
864
865        match http_response.status() {
866            StatusCode::NO_CONTENT | StatusCode::OK => Ok(true),
867            StatusCode::NOT_FOUND => Ok(false),
868            _ => Err(deserialize_unexpected_catalog_error(
869                http_response,
870                context.client.disable_header_redaction(),
871            )
872            .await),
873        }
874    }
875
876    /// Rename a table in the catalog.
877    async fn rename_table(&self, src: &TableIdent, dest: &TableIdent) -> Result<()> {
878        let context = self.context().await?;
879
880        let request = context
881            .client
882            .request(Method::POST, context.config.rename_table_endpoint())
883            .json(&RenameTableRequest {
884                source: src.clone(),
885                destination: dest.clone(),
886            })
887            .build()?;
888
889        let http_response = context.client.query_catalog(request).await?;
890
891        match http_response.status() {
892            StatusCode::NO_CONTENT | StatusCode::OK => Ok(()),
893            StatusCode::NOT_FOUND => Err(Error::new(
894                ErrorKind::TableNotFound,
895                "Tried to rename a table that does not exist (is the namespace correct?)",
896            )),
897            StatusCode::CONFLICT => Err(Error::new(
898                ErrorKind::TableAlreadyExists,
899                "Tried to rename a table to a name that already exists",
900            )),
901            _ => Err(deserialize_unexpected_catalog_error(
902                http_response,
903                context.client.disable_header_redaction(),
904            )
905            .await),
906        }
907    }
908
909    async fn register_table(
910        &self,
911        table_ident: &TableIdent,
912        metadata_location: String,
913    ) -> Result<Table> {
914        let context = self.context().await?;
915
916        let request = context
917            .client
918            .request(
919                Method::POST,
920                context
921                    .config
922                    .register_table_endpoint(table_ident.namespace()),
923            )
924            .json(&RegisterTableRequest {
925                name: table_ident.name.clone(),
926                metadata_location: metadata_location.clone(),
927                overwrite: Some(false),
928            })
929            .build()?;
930
931        let http_response = context.client.query_catalog(request).await?;
932
933        let response: LoadTableResult = match http_response.status() {
934            StatusCode::OK => {
935                deserialize_catalog_response::<LoadTableResult>(http_response).await?
936            }
937            StatusCode::NOT_FOUND => {
938                return Err(Error::new(
939                    ErrorKind::NamespaceNotFound,
940                    "The namespace specified does not exist.",
941                ));
942            }
943            StatusCode::CONFLICT => {
944                return Err(Error::new(
945                    ErrorKind::TableAlreadyExists,
946                    "The given table already exists.",
947                ));
948            }
949            _ => {
950                return Err(deserialize_unexpected_catalog_error(
951                    http_response,
952                    context.client.disable_header_redaction(),
953                )
954                .await);
955            }
956        };
957
958        let metadata_location = response.metadata_location.as_ref().ok_or(Error::new(
959            ErrorKind::DataInvalid,
960            "Metadata location missing in `register_table` response!",
961        ))?;
962
963        let file_io = self.load_file_io(Some(metadata_location), None).await?;
964
965        Table::builder()
966            .identifier(table_ident.clone())
967            .file_io(file_io)
968            .metadata(response.metadata)
969            .metadata_location(metadata_location.clone())
970            .build()
971    }
972
973    async fn update_table(&self, mut commit: TableCommit) -> Result<Table> {
974        let context = self.context().await?;
975
976        let request = context
977            .client
978            .request(
979                Method::POST,
980                context.config.table_endpoint(commit.identifier()),
981            )
982            .json(&CommitTableRequest {
983                identifier: Some(commit.identifier().clone()),
984                requirements: commit.take_requirements(),
985                updates: commit.take_updates(),
986            })
987            .build()?;
988
989        let http_response = context.client.query_catalog(request).await?;
990
991        let response: CommitTableResponse = match http_response.status() {
992            StatusCode::OK => deserialize_catalog_response(http_response).await?,
993            StatusCode::NOT_FOUND => {
994                return Err(Error::new(
995                    ErrorKind::TableNotFound,
996                    "Tried to update a table that does not exist",
997                ));
998            }
999            StatusCode::CONFLICT => {
1000                return Err(Error::new(
1001                    ErrorKind::CatalogCommitConflicts,
1002                    "CatalogCommitConflicts, one or more requirements failed. The client may retry.",
1003                )
1004                .with_retryable(true));
1005            }
1006            StatusCode::INTERNAL_SERVER_ERROR => {
1007                return Err(Error::new(
1008                    ErrorKind::Unexpected,
1009                    "An unknown server-side problem occurred; the commit state is unknown.",
1010                ));
1011            }
1012            StatusCode::BAD_GATEWAY => {
1013                return Err(Error::new(
1014                    ErrorKind::Unexpected,
1015                    "A gateway or proxy received an invalid response from the upstream server; the commit state is unknown.",
1016                ));
1017            }
1018            StatusCode::GATEWAY_TIMEOUT => {
1019                return Err(Error::new(
1020                    ErrorKind::Unexpected,
1021                    "A server-side gateway timeout occurred; the commit state is unknown.",
1022                ));
1023            }
1024            _ => {
1025                return Err(deserialize_unexpected_catalog_error(
1026                    http_response,
1027                    context.client.disable_header_redaction(),
1028                )
1029                .await);
1030            }
1031        };
1032
1033        let file_io = self
1034            .load_file_io(Some(&response.metadata_location), None)
1035            .await?;
1036
1037        Table::builder()
1038            .identifier(commit.identifier().clone())
1039            .file_io(file_io)
1040            .metadata(response.metadata)
1041            .metadata_location(response.metadata_location)
1042            .build()
1043    }
1044}
1045
1046#[cfg(test)]
1047mod tests {
1048    use std::fs::File;
1049    use std::io::BufReader;
1050    use std::sync::Arc;
1051
1052    use chrono::{TimeZone, Utc};
1053    use iceberg::io::LocalFsStorageFactory;
1054    use iceberg::spec::{
1055        FormatVersion, NestedField, NullOrder, Operation, PrimitiveType, Schema, Snapshot,
1056        SnapshotLog, SortDirection, SortField, SortOrder, Summary, Transform, Type,
1057        UnboundPartitionField, UnboundPartitionSpec,
1058    };
1059    use iceberg::transaction::{ApplyTransactionAction, Transaction};
1060    use mockito::{Mock, Server, ServerGuard};
1061    use serde_json::json;
1062    use uuid::uuid;
1063
1064    use super::*;
1065
1066    #[tokio::test]
1067    async fn test_update_config() {
1068        let mut server = Server::new_async().await;
1069
1070        let config_mock = server
1071            .mock("GET", "/v1/config")
1072            .with_status(200)
1073            .with_body(
1074                r#"{
1075                "overrides": {
1076                    "warehouse": "s3://iceberg-catalog"
1077                },
1078                "defaults": {}
1079            }"#,
1080            )
1081            .create_async()
1082            .await;
1083
1084        let catalog = RestCatalog::new(
1085            RestCatalogConfig::builder().uri(server.url()).build(),
1086            Some(Arc::new(LocalFsStorageFactory)),
1087        );
1088
1089        assert_eq!(
1090            catalog
1091                .context()
1092                .await
1093                .unwrap()
1094                .config
1095                .props
1096                .get("warehouse"),
1097            Some(&"s3://iceberg-catalog".to_string())
1098        );
1099
1100        config_mock.assert_async().await;
1101    }
1102
1103    async fn create_config_mock(server: &mut ServerGuard) -> Mock {
1104        server
1105            .mock("GET", "/v1/config")
1106            .with_status(200)
1107            .with_body(
1108                r#"{
1109                "overrides": {
1110                    "warehouse": "s3://iceberg-catalog"
1111                },
1112                "defaults": {}
1113            }"#,
1114            )
1115            .create_async()
1116            .await
1117    }
1118
1119    async fn create_oauth_mock(server: &mut ServerGuard) -> Mock {
1120        create_oauth_mock_with_path(server, "/v1/oauth/tokens", "ey000000000000", 200).await
1121    }
1122
1123    async fn create_oauth_mock_with_path(
1124        server: &mut ServerGuard,
1125        path: &str,
1126        token: &str,
1127        status: usize,
1128    ) -> Mock {
1129        let body = format!(
1130            r#"{{
1131                "access_token": "{token}",
1132                "token_type": "Bearer",
1133                "issued_token_type": "urn:ietf:params:oauth:token-type:access_token",
1134                "expires_in": 86400
1135            }}"#
1136        );
1137        server
1138            .mock("POST", path)
1139            .with_status(status)
1140            .with_body(body)
1141            .expect(1)
1142            .create_async()
1143            .await
1144    }
1145
1146    #[tokio::test]
1147    async fn test_oauth() {
1148        let mut server = Server::new_async().await;
1149        let oauth_mock = create_oauth_mock(&mut server).await;
1150        let config_mock = create_config_mock(&mut server).await;
1151
1152        let mut props = HashMap::new();
1153        props.insert("credential".to_string(), "client1:secret1".to_string());
1154
1155        let catalog = RestCatalog::new(
1156            RestCatalogConfig::builder()
1157                .uri(server.url())
1158                .props(props)
1159                .build(),
1160            Some(Arc::new(LocalFsStorageFactory)),
1161        );
1162
1163        let token = catalog.context().await.unwrap().client.token().await;
1164        oauth_mock.assert_async().await;
1165        config_mock.assert_async().await;
1166        assert_eq!(token, Some("ey000000000000".to_string()));
1167    }
1168
1169    #[tokio::test]
1170    async fn test_oauth_with_optional_param() {
1171        let mut props = HashMap::new();
1172        props.insert("credential".to_string(), "client1:secret1".to_string());
1173        props.insert("scope".to_string(), "custom_scope".to_string());
1174        props.insert("audience".to_string(), "custom_audience".to_string());
1175        props.insert("resource".to_string(), "custom_resource".to_string());
1176
1177        let mut server = Server::new_async().await;
1178        let oauth_mock = server
1179            .mock("POST", "/v1/oauth/tokens")
1180            .match_body(mockito::Matcher::Regex("scope=custom_scope".to_string()))
1181            .match_body(mockito::Matcher::Regex(
1182                "audience=custom_audience".to_string(),
1183            ))
1184            .match_body(mockito::Matcher::Regex(
1185                "resource=custom_resource".to_string(),
1186            ))
1187            .with_status(200)
1188            .with_body(
1189                r#"{
1190                "access_token": "ey000000000000",
1191                "token_type": "Bearer",
1192                "issued_token_type": "urn:ietf:params:oauth:token-type:access_token",
1193                "expires_in": 86400
1194                }"#,
1195            )
1196            .expect(1)
1197            .create_async()
1198            .await;
1199
1200        let config_mock = create_config_mock(&mut server).await;
1201
1202        let catalog = RestCatalog::new(
1203            RestCatalogConfig::builder()
1204                .uri(server.url())
1205                .props(props)
1206                .build(),
1207            Some(Arc::new(LocalFsStorageFactory)),
1208        );
1209
1210        let token = catalog.context().await.unwrap().client.token().await;
1211
1212        oauth_mock.assert_async().await;
1213        config_mock.assert_async().await;
1214        assert_eq!(token, Some("ey000000000000".to_string()));
1215    }
1216
1217    #[tokio::test]
1218    async fn test_invalidate_token() {
1219        let mut server = Server::new_async().await;
1220        let oauth_mock = create_oauth_mock(&mut server).await;
1221        let config_mock = create_config_mock(&mut server).await;
1222
1223        let mut props = HashMap::new();
1224        props.insert("credential".to_string(), "client1:secret1".to_string());
1225
1226        let catalog = RestCatalog::new(
1227            RestCatalogConfig::builder()
1228                .uri(server.url())
1229                .props(props)
1230                .build(),
1231            Some(Arc::new(LocalFsStorageFactory)),
1232        );
1233
1234        let token = catalog.context().await.unwrap().client.token().await;
1235        oauth_mock.assert_async().await;
1236        config_mock.assert_async().await;
1237        assert_eq!(token, Some("ey000000000000".to_string()));
1238
1239        let oauth_mock =
1240            create_oauth_mock_with_path(&mut server, "/v1/oauth/tokens", "ey000000000001", 200)
1241                .await;
1242        catalog.invalidate_token().await.unwrap();
1243        let token = catalog.context().await.unwrap().client.token().await;
1244        oauth_mock.assert_async().await;
1245        assert_eq!(token, Some("ey000000000001".to_string()));
1246    }
1247
1248    #[tokio::test]
1249    async fn test_invalidate_token_failing_request() {
1250        let mut server = Server::new_async().await;
1251        let oauth_mock = create_oauth_mock(&mut server).await;
1252        let config_mock = create_config_mock(&mut server).await;
1253
1254        let mut props = HashMap::new();
1255        props.insert("credential".to_string(), "client1:secret1".to_string());
1256
1257        let catalog = RestCatalog::new(
1258            RestCatalogConfig::builder()
1259                .uri(server.url())
1260                .props(props)
1261                .build(),
1262            Some(Arc::new(LocalFsStorageFactory)),
1263        );
1264
1265        let token = catalog.context().await.unwrap().client.token().await;
1266        oauth_mock.assert_async().await;
1267        config_mock.assert_async().await;
1268        assert_eq!(token, Some("ey000000000000".to_string()));
1269
1270        let oauth_mock =
1271            create_oauth_mock_with_path(&mut server, "/v1/oauth/tokens", "ey000000000001", 500)
1272                .await;
1273        catalog.invalidate_token().await.unwrap();
1274        let token = catalog.context().await.unwrap().client.token().await;
1275        oauth_mock.assert_async().await;
1276        assert_eq!(token, None);
1277    }
1278
1279    #[tokio::test]
1280    async fn test_regenerate_token() {
1281        let mut server = Server::new_async().await;
1282        let oauth_mock = create_oauth_mock(&mut server).await;
1283        let config_mock = create_config_mock(&mut server).await;
1284
1285        let mut props = HashMap::new();
1286        props.insert("credential".to_string(), "client1:secret1".to_string());
1287
1288        let catalog = RestCatalog::new(
1289            RestCatalogConfig::builder()
1290                .uri(server.url())
1291                .props(props)
1292                .build(),
1293            Some(Arc::new(LocalFsStorageFactory)),
1294        );
1295
1296        let token = catalog.context().await.unwrap().client.token().await;
1297        oauth_mock.assert_async().await;
1298        config_mock.assert_async().await;
1299        assert_eq!(token, Some("ey000000000000".to_string()));
1300
1301        let oauth_mock =
1302            create_oauth_mock_with_path(&mut server, "/v1/oauth/tokens", "ey000000000001", 200)
1303                .await;
1304        catalog.regenerate_token().await.unwrap();
1305        oauth_mock.assert_async().await;
1306        let token = catalog.context().await.unwrap().client.token().await;
1307        assert_eq!(token, Some("ey000000000001".to_string()));
1308    }
1309
1310    #[tokio::test]
1311    async fn test_regenerate_token_failing_request() {
1312        let mut server = Server::new_async().await;
1313        let oauth_mock = create_oauth_mock(&mut server).await;
1314        let config_mock = create_config_mock(&mut server).await;
1315
1316        let mut props = HashMap::new();
1317        props.insert("credential".to_string(), "client1:secret1".to_string());
1318
1319        let catalog = RestCatalog::new(
1320            RestCatalogConfig::builder()
1321                .uri(server.url())
1322                .props(props)
1323                .build(),
1324            Some(Arc::new(LocalFsStorageFactory)),
1325        );
1326
1327        let token = catalog.context().await.unwrap().client.token().await;
1328        oauth_mock.assert_async().await;
1329        config_mock.assert_async().await;
1330        assert_eq!(token, Some("ey000000000000".to_string()));
1331
1332        let oauth_mock =
1333            create_oauth_mock_with_path(&mut server, "/v1/oauth/tokens", "ey000000000001", 500)
1334                .await;
1335        let invalidate_result = catalog.regenerate_token().await;
1336        assert!(invalidate_result.is_err());
1337        oauth_mock.assert_async().await;
1338        let token = catalog.context().await.unwrap().client.token().await;
1339
1340        // original token is left intact
1341        assert_eq!(token, Some("ey000000000000".to_string()));
1342    }
1343
1344    #[tokio::test]
1345    async fn test_http_headers() {
1346        let server = Server::new_async().await;
1347        let mut props = HashMap::new();
1348        props.insert("credential".to_string(), "client1:secret1".to_string());
1349
1350        let config = RestCatalogConfig::builder()
1351            .uri(server.url())
1352            .props(props)
1353            .build();
1354        let headers: HeaderMap = config.extra_headers().unwrap();
1355
1356        let expected_headers = HeaderMap::from_iter([
1357            (
1358                header::CONTENT_TYPE,
1359                HeaderValue::from_static("application/json"),
1360            ),
1361            (
1362                HeaderName::from_static("x-client-version"),
1363                HeaderValue::from_static(ICEBERG_REST_SPEC_VERSION),
1364            ),
1365            (
1366                header::USER_AGENT,
1367                HeaderValue::from_str(&format!("iceberg-rs/{CARGO_PKG_VERSION}")).unwrap(),
1368            ),
1369        ]);
1370        assert_eq!(headers, expected_headers);
1371    }
1372
1373    #[tokio::test]
1374    async fn test_http_headers_with_custom_headers() {
1375        let server = Server::new_async().await;
1376        let mut props = HashMap::new();
1377        props.insert("credential".to_string(), "client1:secret1".to_string());
1378        props.insert(
1379            "header.content-type".to_string(),
1380            "application/yaml".to_string(),
1381        );
1382        props.insert(
1383            "header.customized-header".to_string(),
1384            "some/value".to_string(),
1385        );
1386
1387        let config = RestCatalogConfig::builder()
1388            .uri(server.url())
1389            .props(props)
1390            .build();
1391        let headers: HeaderMap = config.extra_headers().unwrap();
1392
1393        let expected_headers = HeaderMap::from_iter([
1394            (
1395                header::CONTENT_TYPE,
1396                HeaderValue::from_static("application/yaml"),
1397            ),
1398            (
1399                HeaderName::from_static("x-client-version"),
1400                HeaderValue::from_static(ICEBERG_REST_SPEC_VERSION),
1401            ),
1402            (
1403                header::USER_AGENT,
1404                HeaderValue::from_str(&format!("iceberg-rs/{CARGO_PKG_VERSION}")).unwrap(),
1405            ),
1406            (
1407                HeaderName::from_static("customized-header"),
1408                HeaderValue::from_static("some/value"),
1409            ),
1410        ]);
1411        assert_eq!(headers, expected_headers);
1412    }
1413
1414    #[tokio::test]
1415    async fn test_oauth_with_oauth2_server_uri() {
1416        let mut server = Server::new_async().await;
1417        let config_mock = create_config_mock(&mut server).await;
1418
1419        let mut auth_server = Server::new_async().await;
1420        let auth_server_path = "/some/path";
1421        let oauth_mock =
1422            create_oauth_mock_with_path(&mut auth_server, auth_server_path, "ey000000000000", 200)
1423                .await;
1424
1425        let mut props = HashMap::new();
1426        props.insert("credential".to_string(), "client1:secret1".to_string());
1427        props.insert(
1428            "oauth2-server-uri".to_string(),
1429            format!("{}{}", auth_server.url(), auth_server_path).to_string(),
1430        );
1431
1432        let catalog = RestCatalog::new(
1433            RestCatalogConfig::builder()
1434                .uri(server.url())
1435                .props(props)
1436                .build(),
1437            Some(Arc::new(LocalFsStorageFactory)),
1438        );
1439
1440        let token = catalog.context().await.unwrap().client.token().await;
1441
1442        oauth_mock.assert_async().await;
1443        config_mock.assert_async().await;
1444        assert_eq!(token, Some("ey000000000000".to_string()));
1445    }
1446
1447    #[tokio::test]
1448    async fn test_config_override() {
1449        let mut server = Server::new_async().await;
1450        let mut redirect_server = Server::new_async().await;
1451        let new_uri = redirect_server.url();
1452
1453        let config_mock = server
1454            .mock("GET", "/v1/config")
1455            .with_status(200)
1456            .with_body(
1457                json!(
1458                    {
1459                        "overrides": {
1460                            "uri": new_uri,
1461                            "warehouse": "s3://iceberg-catalog",
1462                            "prefix": "ice/warehouses/my"
1463                        },
1464                        "defaults": {},
1465                    }
1466                )
1467                .to_string(),
1468            )
1469            .create_async()
1470            .await;
1471
1472        let list_ns_mock = redirect_server
1473            .mock("GET", "/v1/ice/warehouses/my/namespaces")
1474            .with_body(
1475                r#"{
1476                    "namespaces": []
1477                }"#,
1478            )
1479            .create_async()
1480            .await;
1481
1482        let catalog = RestCatalog::new(
1483            RestCatalogConfig::builder().uri(server.url()).build(),
1484            Some(Arc::new(LocalFsStorageFactory)),
1485        );
1486
1487        let _namespaces = catalog.list_namespaces(None).await.unwrap();
1488
1489        config_mock.assert_async().await;
1490        list_ns_mock.assert_async().await;
1491    }
1492
1493    #[tokio::test]
1494    async fn test_list_namespace() {
1495        let mut server = Server::new_async().await;
1496
1497        let config_mock = create_config_mock(&mut server).await;
1498
1499        let list_ns_mock = server
1500            .mock("GET", "/v1/namespaces")
1501            .with_body(
1502                r#"{
1503                "namespaces": [
1504                    ["ns1", "ns11"],
1505                    ["ns2"]
1506                ]
1507            }"#,
1508            )
1509            .create_async()
1510            .await;
1511
1512        let catalog = RestCatalog::new(
1513            RestCatalogConfig::builder().uri(server.url()).build(),
1514            Some(Arc::new(LocalFsStorageFactory)),
1515        );
1516
1517        let namespaces = catalog.list_namespaces(None).await.unwrap();
1518
1519        let expected_ns = vec![
1520            NamespaceIdent::from_vec(vec!["ns1".to_string(), "ns11".to_string()]).unwrap(),
1521            NamespaceIdent::from_vec(vec!["ns2".to_string()]).unwrap(),
1522        ];
1523
1524        assert_eq!(expected_ns, namespaces);
1525
1526        config_mock.assert_async().await;
1527        list_ns_mock.assert_async().await;
1528    }
1529
1530    #[tokio::test]
1531    async fn test_list_namespace_with_pagination() {
1532        let mut server = Server::new_async().await;
1533
1534        let config_mock = create_config_mock(&mut server).await;
1535
1536        let list_ns_mock_page1 = server
1537            .mock("GET", "/v1/namespaces")
1538            .with_body(
1539                r#"{
1540                "namespaces": [
1541                    ["ns1", "ns11"],
1542                    ["ns2"]
1543                ],
1544                "next-page-token": "token123"
1545            }"#,
1546            )
1547            .create_async()
1548            .await;
1549
1550        let list_ns_mock_page2 = server
1551            .mock("GET", "/v1/namespaces?pageToken=token123")
1552            .with_body(
1553                r#"{
1554                "namespaces": [
1555                    ["ns3"],
1556                    ["ns4", "ns41"]
1557                ]
1558            }"#,
1559            )
1560            .create_async()
1561            .await;
1562
1563        let catalog = RestCatalog::new(
1564            RestCatalogConfig::builder().uri(server.url()).build(),
1565            Some(Arc::new(LocalFsStorageFactory)),
1566        );
1567
1568        let namespaces = catalog.list_namespaces(None).await.unwrap();
1569
1570        let expected_ns = vec![
1571            NamespaceIdent::from_vec(vec!["ns1".to_string(), "ns11".to_string()]).unwrap(),
1572            NamespaceIdent::from_vec(vec!["ns2".to_string()]).unwrap(),
1573            NamespaceIdent::from_vec(vec!["ns3".to_string()]).unwrap(),
1574            NamespaceIdent::from_vec(vec!["ns4".to_string(), "ns41".to_string()]).unwrap(),
1575        ];
1576
1577        assert_eq!(expected_ns, namespaces);
1578
1579        config_mock.assert_async().await;
1580        list_ns_mock_page1.assert_async().await;
1581        list_ns_mock_page2.assert_async().await;
1582    }
1583
1584    #[tokio::test]
1585    async fn test_list_namespace_with_multiple_pages() {
1586        let mut server = Server::new_async().await;
1587
1588        let config_mock = create_config_mock(&mut server).await;
1589
1590        // Page 1
1591        let list_ns_mock_page1 = server
1592            .mock("GET", "/v1/namespaces")
1593            .with_body(
1594                r#"{
1595                "namespaces": [
1596                    ["ns1", "ns11"],
1597                    ["ns2"]
1598                ],
1599                "next-page-token": "page2"
1600            }"#,
1601            )
1602            .create_async()
1603            .await;
1604
1605        // Page 2
1606        let list_ns_mock_page2 = server
1607            .mock("GET", "/v1/namespaces?pageToken=page2")
1608            .with_body(
1609                r#"{
1610                "namespaces": [
1611                    ["ns3"],
1612                    ["ns4", "ns41"]
1613                ],
1614                "next-page-token": "page3"
1615            }"#,
1616            )
1617            .create_async()
1618            .await;
1619
1620        // Page 3
1621        let list_ns_mock_page3 = server
1622            .mock("GET", "/v1/namespaces?pageToken=page3")
1623            .with_body(
1624                r#"{
1625                "namespaces": [
1626                    ["ns5", "ns51", "ns511"]
1627                ],
1628                "next-page-token": "page4"
1629            }"#,
1630            )
1631            .create_async()
1632            .await;
1633
1634        // Page 4
1635        let list_ns_mock_page4 = server
1636            .mock("GET", "/v1/namespaces?pageToken=page4")
1637            .with_body(
1638                r#"{
1639                "namespaces": [
1640                    ["ns6"],
1641                    ["ns7"]
1642                ],
1643                "next-page-token": "page5"
1644            }"#,
1645            )
1646            .create_async()
1647            .await;
1648
1649        // Page 5 (final page)
1650        let list_ns_mock_page5 = server
1651            .mock("GET", "/v1/namespaces?pageToken=page5")
1652            .with_body(
1653                r#"{
1654                "namespaces": [
1655                    ["ns8", "ns81"]
1656                ]
1657            }"#,
1658            )
1659            .create_async()
1660            .await;
1661
1662        let catalog = RestCatalog::new(
1663            RestCatalogConfig::builder().uri(server.url()).build(),
1664            Some(Arc::new(LocalFsStorageFactory)),
1665        );
1666
1667        let namespaces = catalog.list_namespaces(None).await.unwrap();
1668
1669        let expected_ns = vec![
1670            NamespaceIdent::from_vec(vec!["ns1".to_string(), "ns11".to_string()]).unwrap(),
1671            NamespaceIdent::from_vec(vec!["ns2".to_string()]).unwrap(),
1672            NamespaceIdent::from_vec(vec!["ns3".to_string()]).unwrap(),
1673            NamespaceIdent::from_vec(vec!["ns4".to_string(), "ns41".to_string()]).unwrap(),
1674            NamespaceIdent::from_vec(vec![
1675                "ns5".to_string(),
1676                "ns51".to_string(),
1677                "ns511".to_string(),
1678            ])
1679            .unwrap(),
1680            NamespaceIdent::from_vec(vec!["ns6".to_string()]).unwrap(),
1681            NamespaceIdent::from_vec(vec!["ns7".to_string()]).unwrap(),
1682            NamespaceIdent::from_vec(vec!["ns8".to_string(), "ns81".to_string()]).unwrap(),
1683        ];
1684
1685        assert_eq!(expected_ns, namespaces);
1686
1687        // Verify all page requests were made
1688        config_mock.assert_async().await;
1689        list_ns_mock_page1.assert_async().await;
1690        list_ns_mock_page2.assert_async().await;
1691        list_ns_mock_page3.assert_async().await;
1692        list_ns_mock_page4.assert_async().await;
1693        list_ns_mock_page5.assert_async().await;
1694    }
1695
1696    #[tokio::test]
1697    async fn test_create_namespace() {
1698        let mut server = Server::new_async().await;
1699
1700        let config_mock = create_config_mock(&mut server).await;
1701
1702        let create_ns_mock = server
1703            .mock("POST", "/v1/namespaces")
1704            .with_body(
1705                r#"{
1706                "namespace": [ "ns1", "ns11"],
1707                "properties" : {
1708                    "key1": "value1"
1709                }
1710            }"#,
1711            )
1712            .create_async()
1713            .await;
1714
1715        let catalog = RestCatalog::new(
1716            RestCatalogConfig::builder().uri(server.url()).build(),
1717            Some(Arc::new(LocalFsStorageFactory)),
1718        );
1719
1720        let namespaces = catalog
1721            .create_namespace(
1722                &NamespaceIdent::from_vec(vec!["ns1".to_string(), "ns11".to_string()]).unwrap(),
1723                HashMap::from([("key1".to_string(), "value1".to_string())]),
1724            )
1725            .await
1726            .unwrap();
1727
1728        let expected_ns = Namespace::with_properties(
1729            NamespaceIdent::from_vec(vec!["ns1".to_string(), "ns11".to_string()]).unwrap(),
1730            HashMap::from([("key1".to_string(), "value1".to_string())]),
1731        );
1732
1733        assert_eq!(expected_ns, namespaces);
1734
1735        config_mock.assert_async().await;
1736        create_ns_mock.assert_async().await;
1737    }
1738
1739    #[tokio::test]
1740    async fn test_get_namespace() {
1741        let mut server = Server::new_async().await;
1742
1743        let config_mock = create_config_mock(&mut server).await;
1744
1745        let get_ns_mock = server
1746            .mock("GET", "/v1/namespaces/ns1")
1747            .with_body(
1748                r#"{
1749                "namespace": [ "ns1"],
1750                "properties" : {
1751                    "key1": "value1"
1752                }
1753            }"#,
1754            )
1755            .create_async()
1756            .await;
1757
1758        let catalog = RestCatalog::new(
1759            RestCatalogConfig::builder().uri(server.url()).build(),
1760            Some(Arc::new(LocalFsStorageFactory)),
1761        );
1762
1763        let namespaces = catalog
1764            .get_namespace(&NamespaceIdent::new("ns1".to_string()))
1765            .await
1766            .unwrap();
1767
1768        let expected_ns = Namespace::with_properties(
1769            NamespaceIdent::new("ns1".to_string()),
1770            HashMap::from([("key1".to_string(), "value1".to_string())]),
1771        );
1772
1773        assert_eq!(expected_ns, namespaces);
1774
1775        config_mock.assert_async().await;
1776        get_ns_mock.assert_async().await;
1777    }
1778
1779    #[tokio::test]
1780    async fn check_namespace_exists() {
1781        let mut server = Server::new_async().await;
1782
1783        let config_mock = create_config_mock(&mut server).await;
1784
1785        let get_ns_mock = server
1786            .mock("HEAD", "/v1/namespaces/ns1")
1787            .with_status(204)
1788            .create_async()
1789            .await;
1790
1791        let catalog = RestCatalog::new(
1792            RestCatalogConfig::builder().uri(server.url()).build(),
1793            Some(Arc::new(LocalFsStorageFactory)),
1794        );
1795
1796        assert!(
1797            catalog
1798                .namespace_exists(&NamespaceIdent::new("ns1".to_string()))
1799                .await
1800                .unwrap()
1801        );
1802
1803        config_mock.assert_async().await;
1804        get_ns_mock.assert_async().await;
1805    }
1806
1807    #[tokio::test]
1808    async fn test_drop_namespace() {
1809        let mut server = Server::new_async().await;
1810
1811        let config_mock = create_config_mock(&mut server).await;
1812
1813        let drop_ns_mock = server
1814            .mock("DELETE", "/v1/namespaces/ns1")
1815            .with_status(204)
1816            .create_async()
1817            .await;
1818
1819        let catalog = RestCatalog::new(
1820            RestCatalogConfig::builder().uri(server.url()).build(),
1821            Some(Arc::new(LocalFsStorageFactory)),
1822        );
1823
1824        catalog
1825            .drop_namespace(&NamespaceIdent::new("ns1".to_string()))
1826            .await
1827            .unwrap();
1828
1829        config_mock.assert_async().await;
1830        drop_ns_mock.assert_async().await;
1831    }
1832
1833    #[tokio::test]
1834    async fn test_list_tables() {
1835        let mut server = Server::new_async().await;
1836
1837        let config_mock = create_config_mock(&mut server).await;
1838
1839        let list_tables_mock = server
1840            .mock("GET", "/v1/namespaces/ns1/tables")
1841            .with_status(200)
1842            .with_body(
1843                r#"{
1844                "identifiers": [
1845                    {
1846                        "namespace": ["ns1"],
1847                        "name": "table1"
1848                    },
1849                    {
1850                        "namespace": ["ns1"],
1851                        "name": "table2"
1852                    }
1853                ]
1854            }"#,
1855            )
1856            .create_async()
1857            .await;
1858
1859        let catalog = RestCatalog::new(
1860            RestCatalogConfig::builder().uri(server.url()).build(),
1861            Some(Arc::new(LocalFsStorageFactory)),
1862        );
1863
1864        let tables = catalog
1865            .list_tables(&NamespaceIdent::new("ns1".to_string()))
1866            .await
1867            .unwrap();
1868
1869        let expected_tables = vec![
1870            TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table1".to_string()),
1871            TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table2".to_string()),
1872        ];
1873
1874        assert_eq!(tables, expected_tables);
1875
1876        config_mock.assert_async().await;
1877        list_tables_mock.assert_async().await;
1878    }
1879
1880    #[tokio::test]
1881    async fn test_list_tables_with_pagination() {
1882        let mut server = Server::new_async().await;
1883
1884        let config_mock = create_config_mock(&mut server).await;
1885
1886        let list_tables_mock_page1 = server
1887            .mock("GET", "/v1/namespaces/ns1/tables")
1888            .with_status(200)
1889            .with_body(
1890                r#"{
1891                "identifiers": [
1892                    {
1893                        "namespace": ["ns1"],
1894                        "name": "table1"
1895                    },
1896                    {
1897                        "namespace": ["ns1"],
1898                        "name": "table2"
1899                    }
1900                ],
1901                "next-page-token": "token456"
1902            }"#,
1903            )
1904            .create_async()
1905            .await;
1906
1907        let list_tables_mock_page2 = server
1908            .mock("GET", "/v1/namespaces/ns1/tables?pageToken=token456")
1909            .with_status(200)
1910            .with_body(
1911                r#"{
1912                "identifiers": [
1913                    {
1914                        "namespace": ["ns1"],
1915                        "name": "table3"
1916                    },
1917                    {
1918                        "namespace": ["ns1"],
1919                        "name": "table4"
1920                    }
1921                ]
1922            }"#,
1923            )
1924            .create_async()
1925            .await;
1926
1927        let catalog = RestCatalog::new(
1928            RestCatalogConfig::builder().uri(server.url()).build(),
1929            Some(Arc::new(LocalFsStorageFactory)),
1930        );
1931
1932        let tables = catalog
1933            .list_tables(&NamespaceIdent::new("ns1".to_string()))
1934            .await
1935            .unwrap();
1936
1937        let expected_tables = vec![
1938            TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table1".to_string()),
1939            TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table2".to_string()),
1940            TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table3".to_string()),
1941            TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table4".to_string()),
1942        ];
1943
1944        assert_eq!(tables, expected_tables);
1945
1946        config_mock.assert_async().await;
1947        list_tables_mock_page1.assert_async().await;
1948        list_tables_mock_page2.assert_async().await;
1949    }
1950
1951    #[tokio::test]
1952    async fn test_list_tables_with_multiple_pages() {
1953        let mut server = Server::new_async().await;
1954
1955        let config_mock = create_config_mock(&mut server).await;
1956
1957        // Page 1
1958        let list_tables_mock_page1 = server
1959            .mock("GET", "/v1/namespaces/ns1/tables")
1960            .with_status(200)
1961            .with_body(
1962                r#"{
1963                "identifiers": [
1964                    {
1965                        "namespace": ["ns1"],
1966                        "name": "table1"
1967                    },
1968                    {
1969                        "namespace": ["ns1"],
1970                        "name": "table2"
1971                    }
1972                ],
1973                "next-page-token": "page2"
1974            }"#,
1975            )
1976            .create_async()
1977            .await;
1978
1979        // Page 2
1980        let list_tables_mock_page2 = server
1981            .mock("GET", "/v1/namespaces/ns1/tables?pageToken=page2")
1982            .with_status(200)
1983            .with_body(
1984                r#"{
1985                "identifiers": [
1986                    {
1987                        "namespace": ["ns1"],
1988                        "name": "table3"
1989                    },
1990                    {
1991                        "namespace": ["ns1"],
1992                        "name": "table4"
1993                    }
1994                ],
1995                "next-page-token": "page3"
1996            }"#,
1997            )
1998            .create_async()
1999            .await;
2000
2001        // Page 3
2002        let list_tables_mock_page3 = server
2003            .mock("GET", "/v1/namespaces/ns1/tables?pageToken=page3")
2004            .with_status(200)
2005            .with_body(
2006                r#"{
2007                "identifiers": [
2008                    {
2009                        "namespace": ["ns1"],
2010                        "name": "table5"
2011                    }
2012                ],
2013                "next-page-token": "page4"
2014            }"#,
2015            )
2016            .create_async()
2017            .await;
2018
2019        // Page 4
2020        let list_tables_mock_page4 = server
2021            .mock("GET", "/v1/namespaces/ns1/tables?pageToken=page4")
2022            .with_status(200)
2023            .with_body(
2024                r#"{
2025                "identifiers": [
2026                    {
2027                        "namespace": ["ns1"],
2028                        "name": "table6"
2029                    },
2030                    {
2031                        "namespace": ["ns1"],
2032                        "name": "table7"
2033                    }
2034                ],
2035                "next-page-token": "page5"
2036            }"#,
2037            )
2038            .create_async()
2039            .await;
2040
2041        // Page 5 (final page)
2042        let list_tables_mock_page5 = server
2043            .mock("GET", "/v1/namespaces/ns1/tables?pageToken=page5")
2044            .with_status(200)
2045            .with_body(
2046                r#"{
2047                "identifiers": [
2048                    {
2049                        "namespace": ["ns1"],
2050                        "name": "table8"
2051                    }
2052                ]
2053            }"#,
2054            )
2055            .create_async()
2056            .await;
2057
2058        let catalog = RestCatalog::new(
2059            RestCatalogConfig::builder().uri(server.url()).build(),
2060            Some(Arc::new(LocalFsStorageFactory)),
2061        );
2062
2063        let tables = catalog
2064            .list_tables(&NamespaceIdent::new("ns1".to_string()))
2065            .await
2066            .unwrap();
2067
2068        let expected_tables = vec![
2069            TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table1".to_string()),
2070            TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table2".to_string()),
2071            TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table3".to_string()),
2072            TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table4".to_string()),
2073            TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table5".to_string()),
2074            TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table6".to_string()),
2075            TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table7".to_string()),
2076            TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table8".to_string()),
2077        ];
2078
2079        assert_eq!(tables, expected_tables);
2080
2081        // Verify all page requests were made
2082        config_mock.assert_async().await;
2083        list_tables_mock_page1.assert_async().await;
2084        list_tables_mock_page2.assert_async().await;
2085        list_tables_mock_page3.assert_async().await;
2086        list_tables_mock_page4.assert_async().await;
2087        list_tables_mock_page5.assert_async().await;
2088    }
2089
2090    #[tokio::test]
2091    async fn test_drop_tables() {
2092        let mut server = Server::new_async().await;
2093
2094        let config_mock = create_config_mock(&mut server).await;
2095
2096        let delete_table_mock = server
2097            .mock("DELETE", "/v1/namespaces/ns1/tables/table1")
2098            .with_status(204)
2099            .create_async()
2100            .await;
2101
2102        let catalog = RestCatalog::new(
2103            RestCatalogConfig::builder().uri(server.url()).build(),
2104            Some(Arc::new(LocalFsStorageFactory)),
2105        );
2106
2107        catalog
2108            .drop_table(&TableIdent::new(
2109                NamespaceIdent::new("ns1".to_string()),
2110                "table1".to_string(),
2111            ))
2112            .await
2113            .unwrap();
2114
2115        config_mock.assert_async().await;
2116        delete_table_mock.assert_async().await;
2117    }
2118
2119    #[tokio::test]
2120    async fn test_check_table_exists() {
2121        let mut server = Server::new_async().await;
2122
2123        let config_mock = create_config_mock(&mut server).await;
2124
2125        let check_table_exists_mock = server
2126            .mock("HEAD", "/v1/namespaces/ns1/tables/table1")
2127            .with_status(204)
2128            .create_async()
2129            .await;
2130
2131        let catalog = RestCatalog::new(
2132            RestCatalogConfig::builder().uri(server.url()).build(),
2133            Some(Arc::new(LocalFsStorageFactory)),
2134        );
2135
2136        assert!(
2137            catalog
2138                .table_exists(&TableIdent::new(
2139                    NamespaceIdent::new("ns1".to_string()),
2140                    "table1".to_string(),
2141                ))
2142                .await
2143                .unwrap()
2144        );
2145
2146        config_mock.assert_async().await;
2147        check_table_exists_mock.assert_async().await;
2148    }
2149
2150    #[tokio::test]
2151    async fn test_rename_table() {
2152        let mut server = Server::new_async().await;
2153
2154        let config_mock = create_config_mock(&mut server).await;
2155
2156        let rename_table_mock = server
2157            .mock("POST", "/v1/tables/rename")
2158            .with_status(204)
2159            .create_async()
2160            .await;
2161
2162        let catalog = RestCatalog::new(
2163            RestCatalogConfig::builder().uri(server.url()).build(),
2164            Some(Arc::new(LocalFsStorageFactory)),
2165        );
2166
2167        catalog
2168            .rename_table(
2169                &TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table1".to_string()),
2170                &TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table2".to_string()),
2171            )
2172            .await
2173            .unwrap();
2174
2175        config_mock.assert_async().await;
2176        rename_table_mock.assert_async().await;
2177    }
2178
2179    #[tokio::test]
2180    async fn test_load_table() {
2181        let mut server = Server::new_async().await;
2182
2183        let config_mock = create_config_mock(&mut server).await;
2184
2185        let rename_table_mock = server
2186            .mock("GET", "/v1/namespaces/ns1/tables/test1")
2187            .with_status(200)
2188            .with_body_from_file(format!(
2189                "{}/testdata/{}",
2190                env!("CARGO_MANIFEST_DIR"),
2191                "load_table_response.json"
2192            ))
2193            .create_async()
2194            .await;
2195
2196        let catalog = RestCatalog::new(
2197            RestCatalogConfig::builder().uri(server.url()).build(),
2198            Some(Arc::new(LocalFsStorageFactory)),
2199        );
2200
2201        let table = catalog
2202            .load_table(&TableIdent::new(
2203                NamespaceIdent::new("ns1".to_string()),
2204                "test1".to_string(),
2205            ))
2206            .await
2207            .unwrap();
2208
2209        assert_eq!(
2210            &TableIdent::from_strs(vec!["ns1", "test1"]).unwrap(),
2211            table.identifier()
2212        );
2213        assert_eq!(
2214            "s3://warehouse/database/table/metadata/00001-5f2f8166-244c-4eae-ac36-384ecdec81fc.gz.metadata.json",
2215            table.metadata_location().unwrap()
2216        );
2217        assert_eq!(FormatVersion::V1, table.metadata().format_version());
2218        assert_eq!("s3://warehouse/database/table", table.metadata().location());
2219        assert_eq!(
2220            uuid!("b55d9dda-6561-423a-8bfc-787980ce421f"),
2221            table.metadata().uuid()
2222        );
2223        assert_eq!(
2224            Utc.timestamp_millis_opt(1646787054459).unwrap(),
2225            table.metadata().last_updated_timestamp().unwrap()
2226        );
2227        assert_eq!(
2228            vec![&Arc::new(
2229                Schema::builder()
2230                    .with_fields(vec![
2231                        NestedField::optional(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
2232                        NestedField::optional(2, "data", Type::Primitive(PrimitiveType::String))
2233                            .into(),
2234                    ])
2235                    .build()
2236                    .unwrap()
2237            )],
2238            table.metadata().schemas_iter().collect::<Vec<_>>()
2239        );
2240        assert_eq!(
2241            &HashMap::from([
2242                ("owner".to_string(), "bryan".to_string()),
2243                (
2244                    "write.metadata.compression-codec".to_string(),
2245                    "gzip".to_string()
2246                )
2247            ]),
2248            table.metadata().properties()
2249        );
2250        assert_eq!(vec![&Arc::new(Snapshot::builder()
2251            .with_snapshot_id(3497810964824022504)
2252            .with_timestamp_ms(1646787054459)
2253            .with_manifest_list("s3://warehouse/database/table/metadata/snap-3497810964824022504-1-c4f68204-666b-4e50-a9df-b10c34bf6b82.avro")
2254            .with_sequence_number(0)
2255            .with_schema_id(0)
2256            .with_summary(Summary {
2257                operation: Operation::Append,
2258                additional_properties: HashMap::from_iter([
2259                    ("spark.app.id", "local-1646787004168"),
2260                    ("added-data-files", "1"),
2261                    ("added-records", "1"),
2262                    ("added-files-size", "697"),
2263                    ("changed-partition-count", "1"),
2264                    ("total-records", "1"),
2265                    ("total-files-size", "697"),
2266                    ("total-data-files", "1"),
2267                    ("total-delete-files", "0"),
2268                    ("total-position-deletes", "0"),
2269                    ("total-equality-deletes", "0")
2270                ].iter().map(|p| (p.0.to_string(), p.1.to_string()))),
2271            }).build()
2272        )], table.metadata().snapshots().collect::<Vec<_>>());
2273        assert_eq!(
2274            &[SnapshotLog {
2275                timestamp_ms: 1646787054459,
2276                snapshot_id: 3497810964824022504,
2277            }],
2278            table.metadata().history()
2279        );
2280        assert_eq!(
2281            vec![&Arc::new(SortOrder {
2282                order_id: 0,
2283                fields: vec![],
2284            })],
2285            table.metadata().sort_orders_iter().collect::<Vec<_>>()
2286        );
2287
2288        config_mock.assert_async().await;
2289        rename_table_mock.assert_async().await;
2290    }
2291
2292    #[tokio::test]
2293    async fn test_load_table_404() {
2294        let mut server = Server::new_async().await;
2295
2296        let config_mock = create_config_mock(&mut server).await;
2297
2298        let rename_table_mock = server
2299            .mock("GET", "/v1/namespaces/ns1/tables/test1")
2300            .with_status(404)
2301            .with_body(r#"
2302{
2303    "error": {
2304        "message": "Table does not exist: ns1.test1 in warehouse 8bcb0838-50fc-472d-9ddb-8feb89ef5f1e",
2305        "type": "NoSuchNamespaceErrorException",
2306        "code": 404
2307    }
2308}
2309            "#)
2310            .create_async()
2311            .await;
2312
2313        let catalog = RestCatalog::new(
2314            RestCatalogConfig::builder().uri(server.url()).build(),
2315            Some(Arc::new(LocalFsStorageFactory)),
2316        );
2317
2318        let table = catalog
2319            .load_table(&TableIdent::new(
2320                NamespaceIdent::new("ns1".to_string()),
2321                "test1".to_string(),
2322            ))
2323            .await;
2324
2325        assert!(table.is_err());
2326        assert!(table.err().unwrap().message().contains("does not exist"));
2327
2328        config_mock.assert_async().await;
2329        rename_table_mock.assert_async().await;
2330    }
2331
2332    #[tokio::test]
2333    async fn test_create_table() {
2334        let mut server = Server::new_async().await;
2335
2336        let config_mock = create_config_mock(&mut server).await;
2337
2338        let create_table_mock = server
2339            .mock("POST", "/v1/namespaces/ns1/tables")
2340            .with_status(200)
2341            .with_body_from_file(format!(
2342                "{}/testdata/{}",
2343                env!("CARGO_MANIFEST_DIR"),
2344                "create_table_response.json"
2345            ))
2346            .create_async()
2347            .await;
2348
2349        let catalog = RestCatalog::new(
2350            RestCatalogConfig::builder().uri(server.url()).build(),
2351            Some(Arc::new(LocalFsStorageFactory)),
2352        );
2353
2354        let table_creation = TableCreation::builder()
2355            .name("test1".to_string())
2356            .schema(
2357                Schema::builder()
2358                    .with_fields(vec![
2359                        NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String))
2360                            .into(),
2361                        NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(),
2362                        NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean))
2363                            .into(),
2364                    ])
2365                    .with_schema_id(1)
2366                    .with_identifier_field_ids(vec![2])
2367                    .build()
2368                    .unwrap(),
2369            )
2370            .properties(HashMap::from([("owner".to_string(), "testx".to_string())]))
2371            .partition_spec(
2372                UnboundPartitionSpec::builder()
2373                    .add_partition_fields(vec![
2374                        UnboundPartitionField::builder()
2375                            .source_id(1)
2376                            .transform(Transform::Truncate(3))
2377                            .name("id".to_string())
2378                            .build(),
2379                    ])
2380                    .unwrap()
2381                    .build(),
2382            )
2383            .sort_order(
2384                SortOrder::builder()
2385                    .with_sort_field(
2386                        SortField::builder()
2387                            .source_id(2)
2388                            .transform(Transform::Identity)
2389                            .direction(SortDirection::Ascending)
2390                            .null_order(NullOrder::First)
2391                            .build(),
2392                    )
2393                    .build_unbound()
2394                    .unwrap(),
2395            )
2396            .build();
2397
2398        let table = catalog
2399            .create_table(&NamespaceIdent::from_strs(["ns1"]).unwrap(), table_creation)
2400            .await
2401            .unwrap();
2402
2403        assert_eq!(
2404            &TableIdent::from_strs(vec!["ns1", "test1"]).unwrap(),
2405            table.identifier()
2406        );
2407        assert_eq!(
2408            "s3://warehouse/database/table/metadata.json",
2409            table.metadata_location().unwrap()
2410        );
2411        assert_eq!(FormatVersion::V1, table.metadata().format_version());
2412        assert_eq!("s3://warehouse/database/table", table.metadata().location());
2413        assert_eq!(
2414            uuid!("bf289591-dcc0-4234-ad4f-5c3eed811a29"),
2415            table.metadata().uuid()
2416        );
2417        assert_eq!(
2418            1657810967051,
2419            table
2420                .metadata()
2421                .last_updated_timestamp()
2422                .unwrap()
2423                .timestamp_millis()
2424        );
2425        assert_eq!(
2426            vec![&Arc::new(
2427                Schema::builder()
2428                    .with_fields(vec![
2429                        NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String))
2430                            .into(),
2431                        NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(),
2432                        NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean))
2433                            .into(),
2434                    ])
2435                    .with_schema_id(0)
2436                    .with_identifier_field_ids(vec![2])
2437                    .build()
2438                    .unwrap()
2439            )],
2440            table.metadata().schemas_iter().collect::<Vec<_>>()
2441        );
2442        assert_eq!(
2443            &HashMap::from([
2444                (
2445                    "write.delete.parquet.compression-codec".to_string(),
2446                    "zstd".to_string()
2447                ),
2448                (
2449                    "write.metadata.compression-codec".to_string(),
2450                    "gzip".to_string()
2451                ),
2452                (
2453                    "write.summary.partition-limit".to_string(),
2454                    "100".to_string()
2455                ),
2456                (
2457                    "write.parquet.compression-codec".to_string(),
2458                    "zstd".to_string()
2459                ),
2460            ]),
2461            table.metadata().properties()
2462        );
2463        assert!(table.metadata().current_snapshot().is_none());
2464        assert!(table.metadata().history().is_empty());
2465        assert_eq!(
2466            vec![&Arc::new(SortOrder {
2467                order_id: 0,
2468                fields: vec![],
2469            })],
2470            table.metadata().sort_orders_iter().collect::<Vec<_>>()
2471        );
2472
2473        config_mock.assert_async().await;
2474        create_table_mock.assert_async().await;
2475    }
2476
2477    #[tokio::test]
2478    async fn test_create_table_409() {
2479        let mut server = Server::new_async().await;
2480
2481        let config_mock = create_config_mock(&mut server).await;
2482
2483        let create_table_mock = server
2484            .mock("POST", "/v1/namespaces/ns1/tables")
2485            .with_status(409)
2486            .with_body(r#"
2487{
2488    "error": {
2489        "message": "Table already exists: ns1.test1 in warehouse 8bcb0838-50fc-472d-9ddb-8feb89ef5f1e",
2490        "type": "AlreadyExistsException",
2491        "code": 409
2492    }
2493}
2494            "#)
2495            .create_async()
2496            .await;
2497
2498        let catalog = RestCatalog::new(
2499            RestCatalogConfig::builder().uri(server.url()).build(),
2500            Some(Arc::new(LocalFsStorageFactory)),
2501        );
2502
2503        let table_creation = TableCreation::builder()
2504            .name("test1".to_string())
2505            .schema(
2506                Schema::builder()
2507                    .with_fields(vec![
2508                        NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String))
2509                            .into(),
2510                        NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(),
2511                        NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean))
2512                            .into(),
2513                    ])
2514                    .with_schema_id(1)
2515                    .with_identifier_field_ids(vec![2])
2516                    .build()
2517                    .unwrap(),
2518            )
2519            .properties(HashMap::from([("owner".to_string(), "testx".to_string())]))
2520            .build();
2521
2522        let table_result = catalog
2523            .create_table(&NamespaceIdent::from_strs(["ns1"]).unwrap(), table_creation)
2524            .await;
2525
2526        assert!(table_result.is_err());
2527        assert!(
2528            table_result
2529                .err()
2530                .unwrap()
2531                .message()
2532                .contains("already exists")
2533        );
2534
2535        config_mock.assert_async().await;
2536        create_table_mock.assert_async().await;
2537    }
2538
2539    #[tokio::test]
2540    async fn test_update_table() {
2541        let mut server = Server::new_async().await;
2542
2543        let config_mock = create_config_mock(&mut server).await;
2544
2545        let load_table_mock = server
2546            .mock("GET", "/v1/namespaces/ns1/tables/test1")
2547            .with_status(200)
2548            .with_body_from_file(format!(
2549                "{}/testdata/{}",
2550                env!("CARGO_MANIFEST_DIR"),
2551                "load_table_response.json"
2552            ))
2553            .create_async()
2554            .await;
2555
2556        let update_table_mock = server
2557            .mock("POST", "/v1/namespaces/ns1/tables/test1")
2558            .with_status(200)
2559            .with_body_from_file(format!(
2560                "{}/testdata/{}",
2561                env!("CARGO_MANIFEST_DIR"),
2562                "update_table_response.json"
2563            ))
2564            .create_async()
2565            .await;
2566
2567        let catalog = RestCatalog::new(
2568            RestCatalogConfig::builder().uri(server.url()).build(),
2569            Some(Arc::new(LocalFsStorageFactory)),
2570        );
2571
2572        let table1 = {
2573            let file = File::open(format!(
2574                "{}/testdata/{}",
2575                env!("CARGO_MANIFEST_DIR"),
2576                "create_table_response.json"
2577            ))
2578            .unwrap();
2579            let reader = BufReader::new(file);
2580            let resp = serde_json::from_reader::<_, LoadTableResult>(reader).unwrap();
2581
2582            Table::builder()
2583                .metadata(resp.metadata)
2584                .metadata_location(resp.metadata_location.unwrap())
2585                .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap())
2586                .file_io(FileIO::new_with_fs())
2587                .build()
2588                .unwrap()
2589        };
2590
2591        let tx = Transaction::new(&table1);
2592        let table = tx
2593            .upgrade_table_version()
2594            .set_format_version(FormatVersion::V2)
2595            .apply(tx)
2596            .unwrap()
2597            .commit(&catalog)
2598            .await
2599            .unwrap();
2600
2601        assert_eq!(
2602            &TableIdent::from_strs(vec!["ns1", "test1"]).unwrap(),
2603            table.identifier()
2604        );
2605        assert_eq!(
2606            "s3://warehouse/database/table/metadata.json",
2607            table.metadata_location().unwrap()
2608        );
2609        assert_eq!(FormatVersion::V2, table.metadata().format_version());
2610        assert_eq!("s3://warehouse/database/table", table.metadata().location());
2611        assert_eq!(
2612            uuid!("bf289591-dcc0-4234-ad4f-5c3eed811a29"),
2613            table.metadata().uuid()
2614        );
2615        assert_eq!(
2616            1657810967051,
2617            table
2618                .metadata()
2619                .last_updated_timestamp()
2620                .unwrap()
2621                .timestamp_millis()
2622        );
2623        assert_eq!(
2624            vec![&Arc::new(
2625                Schema::builder()
2626                    .with_fields(vec![
2627                        NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String))
2628                            .into(),
2629                        NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(),
2630                        NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean))
2631                            .into(),
2632                    ])
2633                    .with_schema_id(0)
2634                    .with_identifier_field_ids(vec![2])
2635                    .build()
2636                    .unwrap()
2637            )],
2638            table.metadata().schemas_iter().collect::<Vec<_>>()
2639        );
2640        assert_eq!(
2641            &HashMap::from([
2642                (
2643                    "write.delete.parquet.compression-codec".to_string(),
2644                    "zstd".to_string()
2645                ),
2646                (
2647                    "write.metadata.compression-codec".to_string(),
2648                    "gzip".to_string()
2649                ),
2650                (
2651                    "write.summary.partition-limit".to_string(),
2652                    "100".to_string()
2653                ),
2654                (
2655                    "write.parquet.compression-codec".to_string(),
2656                    "zstd".to_string()
2657                ),
2658            ]),
2659            table.metadata().properties()
2660        );
2661        assert!(table.metadata().current_snapshot().is_none());
2662        assert!(table.metadata().history().is_empty());
2663        assert_eq!(
2664            vec![&Arc::new(SortOrder {
2665                order_id: 0,
2666                fields: vec![],
2667            })],
2668            table.metadata().sort_orders_iter().collect::<Vec<_>>()
2669        );
2670
2671        config_mock.assert_async().await;
2672        update_table_mock.assert_async().await;
2673        load_table_mock.assert_async().await
2674    }
2675
2676    #[tokio::test]
2677    async fn test_update_table_404() {
2678        let mut server = Server::new_async().await;
2679
2680        let config_mock = create_config_mock(&mut server).await;
2681
2682        let load_table_mock = server
2683            .mock("GET", "/v1/namespaces/ns1/tables/test1")
2684            .with_status(200)
2685            .with_body_from_file(format!(
2686                "{}/testdata/{}",
2687                env!("CARGO_MANIFEST_DIR"),
2688                "load_table_response.json"
2689            ))
2690            .create_async()
2691            .await;
2692
2693        let update_table_mock = server
2694            .mock("POST", "/v1/namespaces/ns1/tables/test1")
2695            .with_status(404)
2696            .with_body(
2697                r#"
2698{
2699    "error": {
2700        "message": "The given table does not exist",
2701        "type": "NoSuchTableException",
2702        "code": 404
2703    }
2704}
2705            "#,
2706            )
2707            .create_async()
2708            .await;
2709
2710        let catalog = RestCatalog::new(
2711            RestCatalogConfig::builder().uri(server.url()).build(),
2712            Some(Arc::new(LocalFsStorageFactory)),
2713        );
2714
2715        let table1 = {
2716            let file = File::open(format!(
2717                "{}/testdata/{}",
2718                env!("CARGO_MANIFEST_DIR"),
2719                "create_table_response.json"
2720            ))
2721            .unwrap();
2722            let reader = BufReader::new(file);
2723            let resp = serde_json::from_reader::<_, LoadTableResult>(reader).unwrap();
2724
2725            Table::builder()
2726                .metadata(resp.metadata)
2727                .metadata_location(resp.metadata_location.unwrap())
2728                .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap())
2729                .file_io(FileIO::new_with_fs())
2730                .build()
2731                .unwrap()
2732        };
2733
2734        let tx = Transaction::new(&table1);
2735        let table_result = tx
2736            .upgrade_table_version()
2737            .set_format_version(FormatVersion::V2)
2738            .apply(tx)
2739            .unwrap()
2740            .commit(&catalog)
2741            .await;
2742
2743        assert!(table_result.is_err());
2744        assert!(
2745            table_result
2746                .err()
2747                .unwrap()
2748                .message()
2749                .contains("does not exist")
2750        );
2751
2752        config_mock.assert_async().await;
2753        update_table_mock.assert_async().await;
2754        load_table_mock.assert_async().await;
2755    }
2756
2757    #[tokio::test]
2758    async fn test_register_table() {
2759        let mut server = Server::new_async().await;
2760
2761        let config_mock = create_config_mock(&mut server).await;
2762
2763        let register_table_mock = server
2764            .mock("POST", "/v1/namespaces/ns1/register")
2765            .with_status(200)
2766            .with_body_from_file(format!(
2767                "{}/testdata/{}",
2768                env!("CARGO_MANIFEST_DIR"),
2769                "load_table_response.json"
2770            ))
2771            .create_async()
2772            .await;
2773
2774        let catalog = RestCatalog::new(
2775            RestCatalogConfig::builder().uri(server.url()).build(),
2776            Some(Arc::new(LocalFsStorageFactory)),
2777        );
2778        let table_ident =
2779            TableIdent::new(NamespaceIdent::new("ns1".to_string()), "test1".to_string());
2780        let metadata_location = String::from(
2781            "s3://warehouse/database/table/metadata/00001-5f2f8166-244c-4eae-ac36-384ecdec81fc.gz.metadata.json",
2782        );
2783
2784        let table = catalog
2785            .register_table(&table_ident, metadata_location)
2786            .await
2787            .unwrap();
2788
2789        assert_eq!(
2790            &TableIdent::from_strs(vec!["ns1", "test1"]).unwrap(),
2791            table.identifier()
2792        );
2793        assert_eq!(
2794            "s3://warehouse/database/table/metadata/00001-5f2f8166-244c-4eae-ac36-384ecdec81fc.gz.metadata.json",
2795            table.metadata_location().unwrap()
2796        );
2797
2798        config_mock.assert_async().await;
2799        register_table_mock.assert_async().await;
2800    }
2801
2802    #[tokio::test]
2803    async fn test_register_table_404() {
2804        let mut server = Server::new_async().await;
2805
2806        let config_mock = create_config_mock(&mut server).await;
2807
2808        let register_table_mock = server
2809            .mock("POST", "/v1/namespaces/ns1/register")
2810            .with_status(404)
2811            .with_body(
2812                r#"
2813{
2814    "error": {
2815        "message": "The namespace specified does not exist",
2816        "type": "NoSuchNamespaceErrorException",
2817        "code": 404
2818    }
2819}
2820            "#,
2821            )
2822            .create_async()
2823            .await;
2824
2825        let catalog = RestCatalog::new(
2826            RestCatalogConfig::builder().uri(server.url()).build(),
2827            Some(Arc::new(LocalFsStorageFactory)),
2828        );
2829
2830        let table_ident =
2831            TableIdent::new(NamespaceIdent::new("ns1".to_string()), "test1".to_string());
2832        let metadata_location = String::from(
2833            "s3://warehouse/database/table/metadata/00001-5f2f8166-244c-4eae-ac36-384ecdec81fc.gz.metadata.json",
2834        );
2835        let table = catalog
2836            .register_table(&table_ident, metadata_location)
2837            .await;
2838
2839        assert!(table.is_err());
2840        assert!(table.err().unwrap().message().contains("does not exist"));
2841
2842        config_mock.assert_async().await;
2843        register_table_mock.assert_async().await;
2844    }
2845
2846    #[tokio::test]
2847    async fn test_create_rest_catalog() {
2848        let builder = RestCatalogBuilder::default().with_client(Client::new());
2849
2850        let catalog = builder
2851            .load(
2852                "test",
2853                HashMap::from([
2854                    (
2855                        REST_CATALOG_PROP_URI.to_string(),
2856                        "http://localhost:8080".to_string(),
2857                    ),
2858                    ("a".to_string(), "b".to_string()),
2859                ]),
2860            )
2861            .await;
2862
2863        assert!(catalog.is_ok());
2864
2865        let catalog_config = catalog.unwrap().user_config;
2866        assert_eq!(catalog_config.name.as_deref(), Some("test"));
2867        assert_eq!(catalog_config.uri, "http://localhost:8080");
2868        assert_eq!(catalog_config.warehouse, None);
2869        assert!(catalog_config.client.is_some());
2870
2871        assert_eq!(catalog_config.props.get("a"), Some(&"b".to_string()));
2872        assert!(!catalog_config.props.contains_key(REST_CATALOG_PROP_URI));
2873    }
2874
2875    #[tokio::test]
2876    async fn test_create_rest_catalog_no_uri() {
2877        let builder = RestCatalogBuilder::default();
2878
2879        let catalog = builder
2880            .load(
2881                "test",
2882                HashMap::from([(
2883                    REST_CATALOG_PROP_WAREHOUSE.to_string(),
2884                    "s3://warehouse".to_string(),
2885                )]),
2886            )
2887            .await;
2888
2889        assert!(catalog.is_err());
2890        if let Err(err) = catalog {
2891            assert_eq!(err.kind(), ErrorKind::DataInvalid);
2892            assert_eq!(err.message(), "Catalog uri is required");
2893        }
2894    }
2895}