1use std::collections::HashMap;
21use std::future::Future;
22use std::str::FromStr;
23use std::sync::Arc;
24
25use async_trait::async_trait;
26use iceberg::io::{FileIO, FileIOBuilder, StorageFactory};
27use iceberg::table::Table;
28use iceberg::{
29 Catalog, CatalogBuilder, Error, ErrorKind, Namespace, NamespaceIdent, Result, TableCommit,
30 TableCreation, TableIdent,
31};
32use itertools::Itertools;
33use reqwest::header::{
34 HeaderMap, HeaderName, HeaderValue, {self},
35};
36use reqwest::{Client, Method, StatusCode, Url};
37use tokio::sync::OnceCell;
38use typed_builder::TypedBuilder;
39
40use crate::client::{
41 HttpClient, deserialize_catalog_response, deserialize_unexpected_catalog_error,
42};
43use crate::types::{
44 CatalogConfig, CommitTableRequest, CommitTableResponse, CreateNamespaceRequest,
45 CreateTableRequest, ListNamespaceResponse, ListTablesResponse, LoadTableResult,
46 NamespaceResponse, RegisterTableRequest, RenameTableRequest,
47};
48
/// Property key from which the catalog builder reads the catalog URI.
pub const REST_CATALOG_PROP_URI: &str = "uri";
/// Property key from which the catalog builder reads the optional warehouse.
pub const REST_CATALOG_PROP_WAREHOUSE: &str = "warehouse";
/// Property key read by `disable_header_redaction`; only the value `true`
/// (ASCII case-insensitive) turns the flag on.
pub const REST_CATALOG_PROP_DISABLE_HEADER_REDACTION: &str = "disable-header-redaction";

/// Value sent in the `x-client-version` request header.
const ICEBERG_REST_SPEC_VERSION: &str = "0.14.1";
/// Version of this crate, embedded in the `User-Agent` request header.
const CARGO_PKG_VERSION: &str = env!("CARGO_PKG_VERSION");
/// Path segment placed directly after the base URI when building endpoint URLs.
const PATH_V1: &str = "v1";
59
/// Builder that assembles a [`RestCatalog`] from a name and a property map.
#[derive(Debug)]
pub struct RestCatalogBuilder {
    /// Configuration accumulated so far; handed to the catalog on `load`.
    config: RestCatalogConfig,
    /// Optional storage factory forwarded to the catalog on `load`.
    storage_factory: Option<Arc<dyn StorageFactory>>,
}
66
67impl Default for RestCatalogBuilder {
68 fn default() -> Self {
69 Self {
70 config: RestCatalogConfig {
71 name: None,
72 uri: "".to_string(),
73 warehouse: None,
74 props: HashMap::new(),
75 client: None,
76 },
77 storage_factory: None,
78 }
79 }
80}
81
82impl CatalogBuilder for RestCatalogBuilder {
83 type C = RestCatalog;
84
85 fn with_storage_factory(mut self, storage_factory: Arc<dyn StorageFactory>) -> Self {
86 self.storage_factory = Some(storage_factory);
87 self
88 }
89
90 fn load(
91 mut self,
92 name: impl Into<String>,
93 props: HashMap<String, String>,
94 ) -> impl Future<Output = Result<Self::C>> + Send {
95 self.config.name = Some(name.into());
96
97 if props.contains_key(REST_CATALOG_PROP_URI) {
98 self.config.uri = props
99 .get(REST_CATALOG_PROP_URI)
100 .cloned()
101 .unwrap_or_default();
102 }
103
104 if props.contains_key(REST_CATALOG_PROP_WAREHOUSE) {
105 self.config.warehouse = props.get(REST_CATALOG_PROP_WAREHOUSE).cloned()
106 }
107
108 self.config.props = props
110 .into_iter()
111 .filter(|(k, _)| k != REST_CATALOG_PROP_URI && k != REST_CATALOG_PROP_WAREHOUSE)
112 .collect();
113
114 let result = {
115 if self.config.name.is_none() {
116 Err(Error::new(
117 ErrorKind::DataInvalid,
118 "Catalog name is required",
119 ))
120 } else if self.config.uri.is_empty() {
121 Err(Error::new(
122 ErrorKind::DataInvalid,
123 "Catalog uri is required",
124 ))
125 } else {
126 Ok(RestCatalog::new(self.config, self.storage_factory))
127 }
128 };
129
130 std::future::ready(result)
131 }
132}
133
134impl RestCatalogBuilder {
135 pub fn with_client(mut self, client: Client) -> Self {
137 self.config.client = Some(client);
138 self
139 }
140}
141
/// Configuration of a REST catalog: base URI, optional warehouse, free-form
/// properties and an optional pre-built HTTP client.
#[derive(Clone, Debug, TypedBuilder)]
pub(crate) struct RestCatalogConfig {
    /// Optional catalog name.
    #[builder(default, setter(strip_option))]
    name: Option<String>,

    /// Base URI that every endpoint URL is built from.
    uri: String,

    /// Optional warehouse; sent as the `warehouse` query parameter when the
    /// server configuration is loaded.
    #[builder(default, setter(strip_option(fallback = warehouse_opt)))]
    warehouse: Option<String>,

    /// Free-form properties (e.g. `prefix`, `token`, `credential`, `header.*`).
    #[builder(default)]
    props: HashMap<String, String>,

    /// Optional caller-supplied `reqwest` client.
    #[builder(default)]
    client: Option<Client>,
}
159
160impl RestCatalogConfig {
161 fn url_prefixed(&self, parts: &[&str]) -> String {
162 [&self.uri, PATH_V1]
163 .into_iter()
164 .chain(self.props.get("prefix").map(|s| &**s))
165 .chain(parts.iter().cloned())
166 .join("/")
167 }
168
169 fn config_endpoint(&self) -> String {
170 [&self.uri, PATH_V1, "config"].join("/")
171 }
172
173 pub(crate) fn get_token_endpoint(&self) -> String {
174 if let Some(oauth2_uri) = self.props.get("oauth2-server-uri") {
175 oauth2_uri.to_string()
176 } else {
177 [&self.uri, PATH_V1, "oauth", "tokens"].join("/")
178 }
179 }
180
181 fn namespaces_endpoint(&self) -> String {
182 self.url_prefixed(&["namespaces"])
183 }
184
185 fn namespace_endpoint(&self, ns: &NamespaceIdent) -> String {
186 self.url_prefixed(&["namespaces", &ns.to_url_string()])
187 }
188
189 fn tables_endpoint(&self, ns: &NamespaceIdent) -> String {
190 self.url_prefixed(&["namespaces", &ns.to_url_string(), "tables"])
191 }
192
193 fn rename_table_endpoint(&self) -> String {
194 self.url_prefixed(&["tables", "rename"])
195 }
196
197 fn register_table_endpoint(&self, ns: &NamespaceIdent) -> String {
198 self.url_prefixed(&["namespaces", &ns.to_url_string(), "register"])
199 }
200
201 fn table_endpoint(&self, table: &TableIdent) -> String {
202 self.url_prefixed(&[
203 "namespaces",
204 &table.namespace.to_url_string(),
205 "tables",
206 &table.name,
207 ])
208 }
209
210 pub(crate) fn client(&self) -> Option<Client> {
212 self.client.clone()
213 }
214
215 pub(crate) fn token(&self) -> Option<String> {
219 self.props.get("token").cloned()
220 }
221
222 pub(crate) fn credential(&self) -> Option<(Option<String>, String)> {
231 let cred = self.props.get("credential")?;
232
233 match cred.split_once(':') {
234 Some((client_id, client_secret)) => {
235 Some((Some(client_id.to_string()), client_secret.to_string()))
236 }
237 None => Some((None, cred.to_string())),
238 }
239 }
240
241 pub(crate) fn extra_headers(&self) -> Result<HeaderMap> {
248 let mut headers = HeaderMap::from_iter([
249 (
250 header::CONTENT_TYPE,
251 HeaderValue::from_static("application/json"),
252 ),
253 (
254 HeaderName::from_static("x-client-version"),
255 HeaderValue::from_static(ICEBERG_REST_SPEC_VERSION),
256 ),
257 (
258 header::USER_AGENT,
259 HeaderValue::from_str(&format!("iceberg-rs/{CARGO_PKG_VERSION}")).unwrap(),
260 ),
261 ]);
262
263 for (key, value) in self
264 .props
265 .iter()
266 .filter_map(|(k, v)| k.strip_prefix("header.").map(|k| (k, v)))
267 {
268 headers.insert(
269 HeaderName::from_str(key).map_err(|e| {
270 Error::new(
271 ErrorKind::DataInvalid,
272 format!("Invalid header name: {key}"),
273 )
274 .with_source(e)
275 })?,
276 HeaderValue::from_str(value).map_err(|e| {
277 Error::new(
278 ErrorKind::DataInvalid,
279 format!("Invalid header value: {value}"),
280 )
281 .with_source(e)
282 })?,
283 );
284 }
285
286 Ok(headers)
287 }
288
289 pub(crate) fn extra_oauth_params(&self) -> HashMap<String, String> {
291 let mut params = HashMap::new();
292
293 if let Some(scope) = self.props.get("scope") {
294 params.insert("scope".to_string(), scope.to_string());
295 } else {
296 params.insert("scope".to_string(), "catalog".to_string());
297 }
298
299 let optional_params = ["audience", "resource"];
300 for param_name in optional_params {
301 if let Some(value) = self.props.get(param_name) {
302 params.insert(param_name.to_string(), value.to_string());
303 }
304 }
305
306 params
307 }
308
309 pub(crate) fn disable_header_redaction(&self) -> bool {
314 self.props
315 .get(REST_CATALOG_PROP_DISABLE_HEADER_REDACTION)
316 .map(|v| v.eq_ignore_ascii_case("true"))
317 .unwrap_or(false)
318 }
319
320 pub(crate) fn merge_with_config(mut self, mut config: CatalogConfig) -> Self {
322 if let Some(uri) = config.overrides.remove("uri") {
323 self.uri = uri;
324 }
325
326 let mut props = config.defaults;
327 props.extend(self.props);
328 props.extend(config.overrides);
329
330 self.props = props;
331 self
332 }
333}
334
/// State produced by the first successful call to `RestCatalog::context`.
#[derive(Debug)]
struct RestContext {
    /// HTTP client updated with the merged configuration.
    client: HttpClient,
    /// User configuration merged with the configuration returned by the server.
    config: RestCatalogConfig,
}
343
/// Catalog implementation that talks to a REST catalog server.
#[derive(Debug)]
pub struct RestCatalog {
    /// Configuration supplied when the catalog was constructed.
    user_config: RestCatalogConfig,
    /// Lazily initialised client and merged configuration; see `context`.
    ctx: OnceCell<RestContext>,
    /// Optional storage factory; required by `load_file_io`.
    storage_factory: Option<Arc<dyn StorageFactory>>,
}
355
356impl RestCatalog {
357 fn new(config: RestCatalogConfig, storage_factory: Option<Arc<dyn StorageFactory>>) -> Self {
359 Self {
360 user_config: config,
361 ctx: OnceCell::new(),
362 storage_factory,
363 }
364 }
365
366 async fn context(&self) -> Result<&RestContext> {
368 self.ctx
369 .get_or_try_init(|| async {
370 let client = HttpClient::new(&self.user_config)?;
371 let catalog_config = RestCatalog::load_config(&client, &self.user_config).await?;
372 let config = self.user_config.clone().merge_with_config(catalog_config);
373 let client = client.update_with(&config)?;
374
375 Ok(RestContext { config, client })
376 })
377 .await
378 }
379
380 async fn load_config(
384 client: &HttpClient,
385 user_config: &RestCatalogConfig,
386 ) -> Result<CatalogConfig> {
387 let mut request_builder = client.request(Method::GET, user_config.config_endpoint());
388
389 if let Some(warehouse_location) = &user_config.warehouse {
390 request_builder = request_builder.query(&[("warehouse", warehouse_location)]);
391 }
392
393 let request = request_builder.build()?;
394
395 let http_response = client.query_catalog(request).await?;
396
397 match http_response.status() {
398 StatusCode::OK => deserialize_catalog_response(http_response).await,
399 _ => Err(deserialize_unexpected_catalog_error(
400 http_response,
401 client.disable_header_redaction(),
402 )
403 .await),
404 }
405 }
406
407 async fn load_file_io(
408 &self,
409 metadata_location: Option<&str>,
410 extra_config: Option<HashMap<String, String>>,
411 ) -> Result<FileIO> {
412 let mut props = self.context().await?.config.props.clone();
413 if let Some(config) = extra_config {
414 props.extend(config);
415 }
416
417 let warehouse_path = match self.context().await?.config.warehouse.as_deref() {
420 Some(url) if Url::parse(url).is_ok() => Some(url),
421 Some(_) => None,
422 None => None,
423 };
424
425 if metadata_location.or(warehouse_path).is_none() {
426 return Err(Error::new(
427 ErrorKind::Unexpected,
428 "Unable to load file io, neither warehouse nor metadata location is set!",
429 ));
430 }
431
432 let factory = self
434 .storage_factory
435 .clone()
436 .ok_or_else(|| {
437 Error::new(
438 ErrorKind::Unexpected,
439 "StorageFactory must be provided for RestCatalog. Use `with_storage_factory` to configure it.",
440 )
441 })?;
442
443 let file_io = FileIOBuilder::new(factory).with_props(props).build();
444
445 Ok(file_io)
446 }
447
448 pub async fn invalidate_token(&self) -> Result<()> {
451 self.context().await?.client.invalidate_token().await
452 }
453
454 pub async fn regenerate_token(&self) -> Result<()> {
461 self.context().await?.client.regenerate_token().await
462 }
463}
464
465#[async_trait]
468impl Catalog for RestCatalog {
469 async fn list_namespaces(
470 &self,
471 parent: Option<&NamespaceIdent>,
472 ) -> Result<Vec<NamespaceIdent>> {
473 let context = self.context().await?;
474 let endpoint = context.config.namespaces_endpoint();
475 let mut namespaces = Vec::new();
476 let mut next_token = None;
477
478 loop {
479 let mut request = context.client.request(Method::GET, endpoint.clone());
480
481 if let Some(ns) = parent {
483 request = request.query(&[("parent", ns.to_url_string())]);
484 }
485
486 if let Some(token) = next_token {
487 request = request.query(&[("pageToken", token)]);
488 }
489
490 let http_response = context.client.query_catalog(request.build()?).await?;
491
492 match http_response.status() {
493 StatusCode::OK => {
494 let response =
495 deserialize_catalog_response::<ListNamespaceResponse>(http_response)
496 .await?;
497
498 namespaces.extend(response.namespaces);
499
500 match response.next_page_token {
501 Some(token) => next_token = Some(token),
502 None => break,
503 }
504 }
505 StatusCode::NOT_FOUND => {
506 return Err(Error::new(
507 ErrorKind::NamespaceNotFound,
508 "The parent parameter of the namespace provided does not exist",
509 ));
510 }
511 _ => {
512 return Err(deserialize_unexpected_catalog_error(
513 http_response,
514 context.client.disable_header_redaction(),
515 )
516 .await);
517 }
518 }
519 }
520
521 Ok(namespaces)
522 }
523
524 async fn create_namespace(
525 &self,
526 namespace: &NamespaceIdent,
527 properties: HashMap<String, String>,
528 ) -> Result<Namespace> {
529 let context = self.context().await?;
530
531 let request = context
532 .client
533 .request(Method::POST, context.config.namespaces_endpoint())
534 .json(&CreateNamespaceRequest {
535 namespace: namespace.clone(),
536 properties,
537 })
538 .build()?;
539
540 let http_response = context.client.query_catalog(request).await?;
541
542 match http_response.status() {
543 StatusCode::OK => {
544 let response =
545 deserialize_catalog_response::<NamespaceResponse>(http_response).await?;
546 Ok(Namespace::from(response))
547 }
548 StatusCode::CONFLICT => Err(Error::new(
549 ErrorKind::NamespaceAlreadyExists,
550 "Tried to create a namespace that already exists",
551 )),
552 _ => Err(deserialize_unexpected_catalog_error(
553 http_response,
554 context.client.disable_header_redaction(),
555 )
556 .await),
557 }
558 }
559
560 async fn get_namespace(&self, namespace: &NamespaceIdent) -> Result<Namespace> {
561 let context = self.context().await?;
562
563 let request = context
564 .client
565 .request(Method::GET, context.config.namespace_endpoint(namespace))
566 .build()?;
567
568 let http_response = context.client.query_catalog(request).await?;
569
570 match http_response.status() {
571 StatusCode::OK => {
572 let response =
573 deserialize_catalog_response::<NamespaceResponse>(http_response).await?;
574 Ok(Namespace::from(response))
575 }
576 StatusCode::NOT_FOUND => Err(Error::new(
577 ErrorKind::NamespaceNotFound,
578 "Tried to get a namespace that does not exist",
579 )),
580 _ => Err(deserialize_unexpected_catalog_error(
581 http_response,
582 context.client.disable_header_redaction(),
583 )
584 .await),
585 }
586 }
587
588 async fn namespace_exists(&self, ns: &NamespaceIdent) -> Result<bool> {
589 let context = self.context().await?;
590
591 let request = context
592 .client
593 .request(Method::HEAD, context.config.namespace_endpoint(ns))
594 .build()?;
595
596 let http_response = context.client.query_catalog(request).await?;
597
598 match http_response.status() {
599 StatusCode::NO_CONTENT | StatusCode::OK => Ok(true),
600 StatusCode::NOT_FOUND => Ok(false),
601 _ => Err(deserialize_unexpected_catalog_error(
602 http_response,
603 context.client.disable_header_redaction(),
604 )
605 .await),
606 }
607 }
608
609 async fn update_namespace(
610 &self,
611 _namespace: &NamespaceIdent,
612 _properties: HashMap<String, String>,
613 ) -> Result<()> {
614 Err(Error::new(
615 ErrorKind::FeatureUnsupported,
616 "Updating namespace not supported yet!",
617 ))
618 }
619
620 async fn drop_namespace(&self, namespace: &NamespaceIdent) -> Result<()> {
621 let context = self.context().await?;
622
623 let request = context
624 .client
625 .request(Method::DELETE, context.config.namespace_endpoint(namespace))
626 .build()?;
627
628 let http_response = context.client.query_catalog(request).await?;
629
630 match http_response.status() {
631 StatusCode::NO_CONTENT | StatusCode::OK => Ok(()),
632 StatusCode::NOT_FOUND => Err(Error::new(
633 ErrorKind::NamespaceNotFound,
634 "Tried to drop a namespace that does not exist",
635 )),
636 _ => Err(deserialize_unexpected_catalog_error(
637 http_response,
638 context.client.disable_header_redaction(),
639 )
640 .await),
641 }
642 }
643
644 async fn list_tables(&self, namespace: &NamespaceIdent) -> Result<Vec<TableIdent>> {
645 let context = self.context().await?;
646 let endpoint = context.config.tables_endpoint(namespace);
647 let mut identifiers = Vec::new();
648 let mut next_token = None;
649
650 loop {
651 let mut request = context.client.request(Method::GET, endpoint.clone());
652
653 if let Some(token) = next_token {
654 request = request.query(&[("pageToken", token)]);
655 }
656
657 let http_response = context.client.query_catalog(request.build()?).await?;
658
659 match http_response.status() {
660 StatusCode::OK => {
661 let response =
662 deserialize_catalog_response::<ListTablesResponse>(http_response).await?;
663
664 identifiers.extend(response.identifiers);
665
666 match response.next_page_token {
667 Some(token) => next_token = Some(token),
668 None => break,
669 }
670 }
671 StatusCode::NOT_FOUND => {
672 return Err(Error::new(
673 ErrorKind::NamespaceNotFound,
674 "Tried to list tables of a namespace that does not exist",
675 ));
676 }
677 _ => {
678 return Err(deserialize_unexpected_catalog_error(
679 http_response,
680 context.client.disable_header_redaction(),
681 )
682 .await);
683 }
684 }
685 }
686
687 Ok(identifiers)
688 }
689
690 async fn create_table(
697 &self,
698 namespace: &NamespaceIdent,
699 creation: TableCreation,
700 ) -> Result<Table> {
701 let context = self.context().await?;
702
703 let table_ident = TableIdent::new(namespace.clone(), creation.name.clone());
704
705 let request = context
706 .client
707 .request(Method::POST, context.config.tables_endpoint(namespace))
708 .json(&CreateTableRequest {
709 name: creation.name,
710 location: creation.location,
711 schema: creation.schema,
712 partition_spec: creation.partition_spec,
713 write_order: creation.sort_order,
714 stage_create: Some(false),
715 properties: creation.properties,
716 })
717 .build()?;
718
719 let http_response = context.client.query_catalog(request).await?;
720
721 let response = match http_response.status() {
722 StatusCode::OK => {
723 deserialize_catalog_response::<LoadTableResult>(http_response).await?
724 }
725 StatusCode::NOT_FOUND => {
726 return Err(Error::new(
727 ErrorKind::NamespaceNotFound,
728 "Tried to create a table under a namespace that does not exist",
729 ));
730 }
731 StatusCode::CONFLICT => {
732 return Err(Error::new(
733 ErrorKind::TableAlreadyExists,
734 "The table already exists",
735 ));
736 }
737 _ => {
738 return Err(deserialize_unexpected_catalog_error(
739 http_response,
740 context.client.disable_header_redaction(),
741 )
742 .await);
743 }
744 };
745
746 let metadata_location = response.metadata_location.as_ref().ok_or(Error::new(
747 ErrorKind::DataInvalid,
748 "Metadata location missing in `create_table` response!",
749 ))?;
750
751 let config = response
752 .config
753 .into_iter()
754 .chain(self.user_config.props.clone())
755 .collect();
756
757 let file_io = self
758 .load_file_io(Some(metadata_location), Some(config))
759 .await?;
760
761 let table_builder = Table::builder()
762 .identifier(table_ident.clone())
763 .file_io(file_io)
764 .metadata(response.metadata);
765
766 if let Some(metadata_location) = response.metadata_location {
767 table_builder.metadata_location(metadata_location).build()
768 } else {
769 table_builder.build()
770 }
771 }
772
773 async fn load_table(&self, table_ident: &TableIdent) -> Result<Table> {
779 let context = self.context().await?;
780
781 let request = context
782 .client
783 .request(Method::GET, context.config.table_endpoint(table_ident))
784 .build()?;
785
786 let http_response = context.client.query_catalog(request).await?;
787
788 let response = match http_response.status() {
789 StatusCode::OK | StatusCode::NOT_MODIFIED => {
790 deserialize_catalog_response::<LoadTableResult>(http_response).await?
791 }
792 StatusCode::NOT_FOUND => {
793 return Err(Error::new(
794 ErrorKind::TableNotFound,
795 "Tried to load a table that does not exist",
796 ));
797 }
798 _ => {
799 return Err(deserialize_unexpected_catalog_error(
800 http_response,
801 context.client.disable_header_redaction(),
802 )
803 .await);
804 }
805 };
806
807 let config = response
808 .config
809 .into_iter()
810 .chain(self.user_config.props.clone())
811 .collect();
812
813 let file_io = self
814 .load_file_io(response.metadata_location.as_deref(), Some(config))
815 .await?;
816
817 let table_builder = Table::builder()
818 .identifier(table_ident.clone())
819 .file_io(file_io)
820 .metadata(response.metadata);
821
822 if let Some(metadata_location) = response.metadata_location {
823 table_builder.metadata_location(metadata_location).build()
824 } else {
825 table_builder.build()
826 }
827 }
828
829 async fn drop_table(&self, table: &TableIdent) -> Result<()> {
831 let context = self.context().await?;
832
833 let request = context
834 .client
835 .request(Method::DELETE, context.config.table_endpoint(table))
836 .build()?;
837
838 let http_response = context.client.query_catalog(request).await?;
839
840 match http_response.status() {
841 StatusCode::NO_CONTENT | StatusCode::OK => Ok(()),
842 StatusCode::NOT_FOUND => Err(Error::new(
843 ErrorKind::TableNotFound,
844 "Tried to drop a table that does not exist",
845 )),
846 _ => Err(deserialize_unexpected_catalog_error(
847 http_response,
848 context.client.disable_header_redaction(),
849 )
850 .await),
851 }
852 }
853
854 async fn table_exists(&self, table: &TableIdent) -> Result<bool> {
856 let context = self.context().await?;
857
858 let request = context
859 .client
860 .request(Method::HEAD, context.config.table_endpoint(table))
861 .build()?;
862
863 let http_response = context.client.query_catalog(request).await?;
864
865 match http_response.status() {
866 StatusCode::NO_CONTENT | StatusCode::OK => Ok(true),
867 StatusCode::NOT_FOUND => Ok(false),
868 _ => Err(deserialize_unexpected_catalog_error(
869 http_response,
870 context.client.disable_header_redaction(),
871 )
872 .await),
873 }
874 }
875
876 async fn rename_table(&self, src: &TableIdent, dest: &TableIdent) -> Result<()> {
878 let context = self.context().await?;
879
880 let request = context
881 .client
882 .request(Method::POST, context.config.rename_table_endpoint())
883 .json(&RenameTableRequest {
884 source: src.clone(),
885 destination: dest.clone(),
886 })
887 .build()?;
888
889 let http_response = context.client.query_catalog(request).await?;
890
891 match http_response.status() {
892 StatusCode::NO_CONTENT | StatusCode::OK => Ok(()),
893 StatusCode::NOT_FOUND => Err(Error::new(
894 ErrorKind::TableNotFound,
895 "Tried to rename a table that does not exist (is the namespace correct?)",
896 )),
897 StatusCode::CONFLICT => Err(Error::new(
898 ErrorKind::TableAlreadyExists,
899 "Tried to rename a table to a name that already exists",
900 )),
901 _ => Err(deserialize_unexpected_catalog_error(
902 http_response,
903 context.client.disable_header_redaction(),
904 )
905 .await),
906 }
907 }
908
909 async fn register_table(
910 &self,
911 table_ident: &TableIdent,
912 metadata_location: String,
913 ) -> Result<Table> {
914 let context = self.context().await?;
915
916 let request = context
917 .client
918 .request(
919 Method::POST,
920 context
921 .config
922 .register_table_endpoint(table_ident.namespace()),
923 )
924 .json(&RegisterTableRequest {
925 name: table_ident.name.clone(),
926 metadata_location: metadata_location.clone(),
927 overwrite: Some(false),
928 })
929 .build()?;
930
931 let http_response = context.client.query_catalog(request).await?;
932
933 let response: LoadTableResult = match http_response.status() {
934 StatusCode::OK => {
935 deserialize_catalog_response::<LoadTableResult>(http_response).await?
936 }
937 StatusCode::NOT_FOUND => {
938 return Err(Error::new(
939 ErrorKind::NamespaceNotFound,
940 "The namespace specified does not exist.",
941 ));
942 }
943 StatusCode::CONFLICT => {
944 return Err(Error::new(
945 ErrorKind::TableAlreadyExists,
946 "The given table already exists.",
947 ));
948 }
949 _ => {
950 return Err(deserialize_unexpected_catalog_error(
951 http_response,
952 context.client.disable_header_redaction(),
953 )
954 .await);
955 }
956 };
957
958 let metadata_location = response.metadata_location.as_ref().ok_or(Error::new(
959 ErrorKind::DataInvalid,
960 "Metadata location missing in `register_table` response!",
961 ))?;
962
963 let file_io = self.load_file_io(Some(metadata_location), None).await?;
964
965 Table::builder()
966 .identifier(table_ident.clone())
967 .file_io(file_io)
968 .metadata(response.metadata)
969 .metadata_location(metadata_location.clone())
970 .build()
971 }
972
973 async fn update_table(&self, mut commit: TableCommit) -> Result<Table> {
974 let context = self.context().await?;
975
976 let request = context
977 .client
978 .request(
979 Method::POST,
980 context.config.table_endpoint(commit.identifier()),
981 )
982 .json(&CommitTableRequest {
983 identifier: Some(commit.identifier().clone()),
984 requirements: commit.take_requirements(),
985 updates: commit.take_updates(),
986 })
987 .build()?;
988
989 let http_response = context.client.query_catalog(request).await?;
990
991 let response: CommitTableResponse = match http_response.status() {
992 StatusCode::OK => deserialize_catalog_response(http_response).await?,
993 StatusCode::NOT_FOUND => {
994 return Err(Error::new(
995 ErrorKind::TableNotFound,
996 "Tried to update a table that does not exist",
997 ));
998 }
999 StatusCode::CONFLICT => {
1000 return Err(Error::new(
1001 ErrorKind::CatalogCommitConflicts,
1002 "CatalogCommitConflicts, one or more requirements failed. The client may retry.",
1003 )
1004 .with_retryable(true));
1005 }
1006 StatusCode::INTERNAL_SERVER_ERROR => {
1007 return Err(Error::new(
1008 ErrorKind::Unexpected,
1009 "An unknown server-side problem occurred; the commit state is unknown.",
1010 ));
1011 }
1012 StatusCode::BAD_GATEWAY => {
1013 return Err(Error::new(
1014 ErrorKind::Unexpected,
1015 "A gateway or proxy received an invalid response from the upstream server; the commit state is unknown.",
1016 ));
1017 }
1018 StatusCode::GATEWAY_TIMEOUT => {
1019 return Err(Error::new(
1020 ErrorKind::Unexpected,
1021 "A server-side gateway timeout occurred; the commit state is unknown.",
1022 ));
1023 }
1024 _ => {
1025 return Err(deserialize_unexpected_catalog_error(
1026 http_response,
1027 context.client.disable_header_redaction(),
1028 )
1029 .await);
1030 }
1031 };
1032
1033 let file_io = self
1034 .load_file_io(Some(&response.metadata_location), None)
1035 .await?;
1036
1037 Table::builder()
1038 .identifier(commit.identifier().clone())
1039 .file_io(file_io)
1040 .metadata(response.metadata)
1041 .metadata_location(response.metadata_location)
1042 .build()
1043 }
1044}
1045
1046#[cfg(test)]
1047mod tests {
1048 use std::fs::File;
1049 use std::io::BufReader;
1050 use std::sync::Arc;
1051
1052 use chrono::{TimeZone, Utc};
1053 use iceberg::io::LocalFsStorageFactory;
1054 use iceberg::spec::{
1055 FormatVersion, NestedField, NullOrder, Operation, PrimitiveType, Schema, Snapshot,
1056 SnapshotLog, SortDirection, SortField, SortOrder, Summary, Transform, Type,
1057 UnboundPartitionField, UnboundPartitionSpec,
1058 };
1059 use iceberg::transaction::{ApplyTransactionAction, Transaction};
1060 use mockito::{Mock, Server, ServerGuard};
1061 use serde_json::json;
1062 use uuid::uuid;
1063
1064 use super::*;
1065
1066 #[tokio::test]
1067 async fn test_update_config() {
1068 let mut server = Server::new_async().await;
1069
1070 let config_mock = server
1071 .mock("GET", "/v1/config")
1072 .with_status(200)
1073 .with_body(
1074 r#"{
1075 "overrides": {
1076 "warehouse": "s3://iceberg-catalog"
1077 },
1078 "defaults": {}
1079 }"#,
1080 )
1081 .create_async()
1082 .await;
1083
1084 let catalog = RestCatalog::new(
1085 RestCatalogConfig::builder().uri(server.url()).build(),
1086 Some(Arc::new(LocalFsStorageFactory)),
1087 );
1088
1089 assert_eq!(
1090 catalog
1091 .context()
1092 .await
1093 .unwrap()
1094 .config
1095 .props
1096 .get("warehouse"),
1097 Some(&"s3://iceberg-catalog".to_string())
1098 );
1099
1100 config_mock.assert_async().await;
1101 }
1102
1103 async fn create_config_mock(server: &mut ServerGuard) -> Mock {
1104 server
1105 .mock("GET", "/v1/config")
1106 .with_status(200)
1107 .with_body(
1108 r#"{
1109 "overrides": {
1110 "warehouse": "s3://iceberg-catalog"
1111 },
1112 "defaults": {}
1113 }"#,
1114 )
1115 .create_async()
1116 .await
1117 }
1118
1119 async fn create_oauth_mock(server: &mut ServerGuard) -> Mock {
1120 create_oauth_mock_with_path(server, "/v1/oauth/tokens", "ey000000000000", 200).await
1121 }
1122
1123 async fn create_oauth_mock_with_path(
1124 server: &mut ServerGuard,
1125 path: &str,
1126 token: &str,
1127 status: usize,
1128 ) -> Mock {
1129 let body = format!(
1130 r#"{{
1131 "access_token": "{token}",
1132 "token_type": "Bearer",
1133 "issued_token_type": "urn:ietf:params:oauth:token-type:access_token",
1134 "expires_in": 86400
1135 }}"#
1136 );
1137 server
1138 .mock("POST", path)
1139 .with_status(status)
1140 .with_body(body)
1141 .expect(1)
1142 .create_async()
1143 .await
1144 }
1145
1146 #[tokio::test]
1147 async fn test_oauth() {
1148 let mut server = Server::new_async().await;
1149 let oauth_mock = create_oauth_mock(&mut server).await;
1150 let config_mock = create_config_mock(&mut server).await;
1151
1152 let mut props = HashMap::new();
1153 props.insert("credential".to_string(), "client1:secret1".to_string());
1154
1155 let catalog = RestCatalog::new(
1156 RestCatalogConfig::builder()
1157 .uri(server.url())
1158 .props(props)
1159 .build(),
1160 Some(Arc::new(LocalFsStorageFactory)),
1161 );
1162
1163 let token = catalog.context().await.unwrap().client.token().await;
1164 oauth_mock.assert_async().await;
1165 config_mock.assert_async().await;
1166 assert_eq!(token, Some("ey000000000000".to_string()));
1167 }
1168
1169 #[tokio::test]
1170 async fn test_oauth_with_optional_param() {
1171 let mut props = HashMap::new();
1172 props.insert("credential".to_string(), "client1:secret1".to_string());
1173 props.insert("scope".to_string(), "custom_scope".to_string());
1174 props.insert("audience".to_string(), "custom_audience".to_string());
1175 props.insert("resource".to_string(), "custom_resource".to_string());
1176
1177 let mut server = Server::new_async().await;
1178 let oauth_mock = server
1179 .mock("POST", "/v1/oauth/tokens")
1180 .match_body(mockito::Matcher::Regex("scope=custom_scope".to_string()))
1181 .match_body(mockito::Matcher::Regex(
1182 "audience=custom_audience".to_string(),
1183 ))
1184 .match_body(mockito::Matcher::Regex(
1185 "resource=custom_resource".to_string(),
1186 ))
1187 .with_status(200)
1188 .with_body(
1189 r#"{
1190 "access_token": "ey000000000000",
1191 "token_type": "Bearer",
1192 "issued_token_type": "urn:ietf:params:oauth:token-type:access_token",
1193 "expires_in": 86400
1194 }"#,
1195 )
1196 .expect(1)
1197 .create_async()
1198 .await;
1199
1200 let config_mock = create_config_mock(&mut server).await;
1201
1202 let catalog = RestCatalog::new(
1203 RestCatalogConfig::builder()
1204 .uri(server.url())
1205 .props(props)
1206 .build(),
1207 Some(Arc::new(LocalFsStorageFactory)),
1208 );
1209
1210 let token = catalog.context().await.unwrap().client.token().await;
1211
1212 oauth_mock.assert_async().await;
1213 config_mock.assert_async().await;
1214 assert_eq!(token, Some("ey000000000000".to_string()));
1215 }
1216
1217 #[tokio::test]
1218 async fn test_invalidate_token() {
1219 let mut server = Server::new_async().await;
1220 let oauth_mock = create_oauth_mock(&mut server).await;
1221 let config_mock = create_config_mock(&mut server).await;
1222
1223 let mut props = HashMap::new();
1224 props.insert("credential".to_string(), "client1:secret1".to_string());
1225
1226 let catalog = RestCatalog::new(
1227 RestCatalogConfig::builder()
1228 .uri(server.url())
1229 .props(props)
1230 .build(),
1231 Some(Arc::new(LocalFsStorageFactory)),
1232 );
1233
1234 let token = catalog.context().await.unwrap().client.token().await;
1235 oauth_mock.assert_async().await;
1236 config_mock.assert_async().await;
1237 assert_eq!(token, Some("ey000000000000".to_string()));
1238
1239 let oauth_mock =
1240 create_oauth_mock_with_path(&mut server, "/v1/oauth/tokens", "ey000000000001", 200)
1241 .await;
1242 catalog.invalidate_token().await.unwrap();
1243 let token = catalog.context().await.unwrap().client.token().await;
1244 oauth_mock.assert_async().await;
1245 assert_eq!(token, Some("ey000000000001".to_string()));
1246 }
1247
1248 #[tokio::test]
1249 async fn test_invalidate_token_failing_request() {
1250 let mut server = Server::new_async().await;
1251 let oauth_mock = create_oauth_mock(&mut server).await;
1252 let config_mock = create_config_mock(&mut server).await;
1253
1254 let mut props = HashMap::new();
1255 props.insert("credential".to_string(), "client1:secret1".to_string());
1256
1257 let catalog = RestCatalog::new(
1258 RestCatalogConfig::builder()
1259 .uri(server.url())
1260 .props(props)
1261 .build(),
1262 Some(Arc::new(LocalFsStorageFactory)),
1263 );
1264
1265 let token = catalog.context().await.unwrap().client.token().await;
1266 oauth_mock.assert_async().await;
1267 config_mock.assert_async().await;
1268 assert_eq!(token, Some("ey000000000000".to_string()));
1269
1270 let oauth_mock =
1271 create_oauth_mock_with_path(&mut server, "/v1/oauth/tokens", "ey000000000001", 500)
1272 .await;
1273 catalog.invalidate_token().await.unwrap();
1274 let token = catalog.context().await.unwrap().client.token().await;
1275 oauth_mock.assert_async().await;
1276 assert_eq!(token, None);
1277 }
1278
1279 #[tokio::test]
1280 async fn test_regenerate_token() {
1281 let mut server = Server::new_async().await;
1282 let oauth_mock = create_oauth_mock(&mut server).await;
1283 let config_mock = create_config_mock(&mut server).await;
1284
1285 let mut props = HashMap::new();
1286 props.insert("credential".to_string(), "client1:secret1".to_string());
1287
1288 let catalog = RestCatalog::new(
1289 RestCatalogConfig::builder()
1290 .uri(server.url())
1291 .props(props)
1292 .build(),
1293 Some(Arc::new(LocalFsStorageFactory)),
1294 );
1295
1296 let token = catalog.context().await.unwrap().client.token().await;
1297 oauth_mock.assert_async().await;
1298 config_mock.assert_async().await;
1299 assert_eq!(token, Some("ey000000000000".to_string()));
1300
1301 let oauth_mock =
1302 create_oauth_mock_with_path(&mut server, "/v1/oauth/tokens", "ey000000000001", 200)
1303 .await;
1304 catalog.regenerate_token().await.unwrap();
1305 oauth_mock.assert_async().await;
1306 let token = catalog.context().await.unwrap().client.token().await;
1307 assert_eq!(token, Some("ey000000000001".to_string()));
1308 }
1309
1310 #[tokio::test]
1311 async fn test_regenerate_token_failing_request() {
1312 let mut server = Server::new_async().await;
1313 let oauth_mock = create_oauth_mock(&mut server).await;
1314 let config_mock = create_config_mock(&mut server).await;
1315
1316 let mut props = HashMap::new();
1317 props.insert("credential".to_string(), "client1:secret1".to_string());
1318
1319 let catalog = RestCatalog::new(
1320 RestCatalogConfig::builder()
1321 .uri(server.url())
1322 .props(props)
1323 .build(),
1324 Some(Arc::new(LocalFsStorageFactory)),
1325 );
1326
1327 let token = catalog.context().await.unwrap().client.token().await;
1328 oauth_mock.assert_async().await;
1329 config_mock.assert_async().await;
1330 assert_eq!(token, Some("ey000000000000".to_string()));
1331
1332 let oauth_mock =
1333 create_oauth_mock_with_path(&mut server, "/v1/oauth/tokens", "ey000000000001", 500)
1334 .await;
1335 let invalidate_result = catalog.regenerate_token().await;
1336 assert!(invalidate_result.is_err());
1337 oauth_mock.assert_async().await;
1338 let token = catalog.context().await.unwrap().client.token().await;
1339
1340 assert_eq!(token, Some("ey000000000000".to_string()));
1342 }
1343
1344 #[tokio::test]
1345 async fn test_http_headers() {
1346 let server = Server::new_async().await;
1347 let mut props = HashMap::new();
1348 props.insert("credential".to_string(), "client1:secret1".to_string());
1349
1350 let config = RestCatalogConfig::builder()
1351 .uri(server.url())
1352 .props(props)
1353 .build();
1354 let headers: HeaderMap = config.extra_headers().unwrap();
1355
1356 let expected_headers = HeaderMap::from_iter([
1357 (
1358 header::CONTENT_TYPE,
1359 HeaderValue::from_static("application/json"),
1360 ),
1361 (
1362 HeaderName::from_static("x-client-version"),
1363 HeaderValue::from_static(ICEBERG_REST_SPEC_VERSION),
1364 ),
1365 (
1366 header::USER_AGENT,
1367 HeaderValue::from_str(&format!("iceberg-rs/{CARGO_PKG_VERSION}")).unwrap(),
1368 ),
1369 ]);
1370 assert_eq!(headers, expected_headers);
1371 }
1372
1373 #[tokio::test]
1374 async fn test_http_headers_with_custom_headers() {
1375 let server = Server::new_async().await;
1376 let mut props = HashMap::new();
1377 props.insert("credential".to_string(), "client1:secret1".to_string());
1378 props.insert(
1379 "header.content-type".to_string(),
1380 "application/yaml".to_string(),
1381 );
1382 props.insert(
1383 "header.customized-header".to_string(),
1384 "some/value".to_string(),
1385 );
1386
1387 let config = RestCatalogConfig::builder()
1388 .uri(server.url())
1389 .props(props)
1390 .build();
1391 let headers: HeaderMap = config.extra_headers().unwrap();
1392
1393 let expected_headers = HeaderMap::from_iter([
1394 (
1395 header::CONTENT_TYPE,
1396 HeaderValue::from_static("application/yaml"),
1397 ),
1398 (
1399 HeaderName::from_static("x-client-version"),
1400 HeaderValue::from_static(ICEBERG_REST_SPEC_VERSION),
1401 ),
1402 (
1403 header::USER_AGENT,
1404 HeaderValue::from_str(&format!("iceberg-rs/{CARGO_PKG_VERSION}")).unwrap(),
1405 ),
1406 (
1407 HeaderName::from_static("customized-header"),
1408 HeaderValue::from_static("some/value"),
1409 ),
1410 ]);
1411 assert_eq!(headers, expected_headers);
1412 }
1413
1414 #[tokio::test]
1415 async fn test_oauth_with_oauth2_server_uri() {
1416 let mut server = Server::new_async().await;
1417 let config_mock = create_config_mock(&mut server).await;
1418
1419 let mut auth_server = Server::new_async().await;
1420 let auth_server_path = "/some/path";
1421 let oauth_mock =
1422 create_oauth_mock_with_path(&mut auth_server, auth_server_path, "ey000000000000", 200)
1423 .await;
1424
1425 let mut props = HashMap::new();
1426 props.insert("credential".to_string(), "client1:secret1".to_string());
1427 props.insert(
1428 "oauth2-server-uri".to_string(),
1429 format!("{}{}", auth_server.url(), auth_server_path).to_string(),
1430 );
1431
1432 let catalog = RestCatalog::new(
1433 RestCatalogConfig::builder()
1434 .uri(server.url())
1435 .props(props)
1436 .build(),
1437 Some(Arc::new(LocalFsStorageFactory)),
1438 );
1439
1440 let token = catalog.context().await.unwrap().client.token().await;
1441
1442 oauth_mock.assert_async().await;
1443 config_mock.assert_async().await;
1444 assert_eq!(token, Some("ey000000000000".to_string()));
1445 }
1446
1447 #[tokio::test]
1448 async fn test_config_override() {
1449 let mut server = Server::new_async().await;
1450 let mut redirect_server = Server::new_async().await;
1451 let new_uri = redirect_server.url();
1452
1453 let config_mock = server
1454 .mock("GET", "/v1/config")
1455 .with_status(200)
1456 .with_body(
1457 json!(
1458 {
1459 "overrides": {
1460 "uri": new_uri,
1461 "warehouse": "s3://iceberg-catalog",
1462 "prefix": "ice/warehouses/my"
1463 },
1464 "defaults": {},
1465 }
1466 )
1467 .to_string(),
1468 )
1469 .create_async()
1470 .await;
1471
1472 let list_ns_mock = redirect_server
1473 .mock("GET", "/v1/ice/warehouses/my/namespaces")
1474 .with_body(
1475 r#"{
1476 "namespaces": []
1477 }"#,
1478 )
1479 .create_async()
1480 .await;
1481
1482 let catalog = RestCatalog::new(
1483 RestCatalogConfig::builder().uri(server.url()).build(),
1484 Some(Arc::new(LocalFsStorageFactory)),
1485 );
1486
1487 let _namespaces = catalog.list_namespaces(None).await.unwrap();
1488
1489 config_mock.assert_async().await;
1490 list_ns_mock.assert_async().await;
1491 }
1492
1493 #[tokio::test]
1494 async fn test_list_namespace() {
1495 let mut server = Server::new_async().await;
1496
1497 let config_mock = create_config_mock(&mut server).await;
1498
1499 let list_ns_mock = server
1500 .mock("GET", "/v1/namespaces")
1501 .with_body(
1502 r#"{
1503 "namespaces": [
1504 ["ns1", "ns11"],
1505 ["ns2"]
1506 ]
1507 }"#,
1508 )
1509 .create_async()
1510 .await;
1511
1512 let catalog = RestCatalog::new(
1513 RestCatalogConfig::builder().uri(server.url()).build(),
1514 Some(Arc::new(LocalFsStorageFactory)),
1515 );
1516
1517 let namespaces = catalog.list_namespaces(None).await.unwrap();
1518
1519 let expected_ns = vec![
1520 NamespaceIdent::from_vec(vec!["ns1".to_string(), "ns11".to_string()]).unwrap(),
1521 NamespaceIdent::from_vec(vec!["ns2".to_string()]).unwrap(),
1522 ];
1523
1524 assert_eq!(expected_ns, namespaces);
1525
1526 config_mock.assert_async().await;
1527 list_ns_mock.assert_async().await;
1528 }
1529
1530 #[tokio::test]
1531 async fn test_list_namespace_with_pagination() {
1532 let mut server = Server::new_async().await;
1533
1534 let config_mock = create_config_mock(&mut server).await;
1535
1536 let list_ns_mock_page1 = server
1537 .mock("GET", "/v1/namespaces")
1538 .with_body(
1539 r#"{
1540 "namespaces": [
1541 ["ns1", "ns11"],
1542 ["ns2"]
1543 ],
1544 "next-page-token": "token123"
1545 }"#,
1546 )
1547 .create_async()
1548 .await;
1549
1550 let list_ns_mock_page2 = server
1551 .mock("GET", "/v1/namespaces?pageToken=token123")
1552 .with_body(
1553 r#"{
1554 "namespaces": [
1555 ["ns3"],
1556 ["ns4", "ns41"]
1557 ]
1558 }"#,
1559 )
1560 .create_async()
1561 .await;
1562
1563 let catalog = RestCatalog::new(
1564 RestCatalogConfig::builder().uri(server.url()).build(),
1565 Some(Arc::new(LocalFsStorageFactory)),
1566 );
1567
1568 let namespaces = catalog.list_namespaces(None).await.unwrap();
1569
1570 let expected_ns = vec![
1571 NamespaceIdent::from_vec(vec!["ns1".to_string(), "ns11".to_string()]).unwrap(),
1572 NamespaceIdent::from_vec(vec!["ns2".to_string()]).unwrap(),
1573 NamespaceIdent::from_vec(vec!["ns3".to_string()]).unwrap(),
1574 NamespaceIdent::from_vec(vec!["ns4".to_string(), "ns41".to_string()]).unwrap(),
1575 ];
1576
1577 assert_eq!(expected_ns, namespaces);
1578
1579 config_mock.assert_async().await;
1580 list_ns_mock_page1.assert_async().await;
1581 list_ns_mock_page2.assert_async().await;
1582 }
1583
1584 #[tokio::test]
1585 async fn test_list_namespace_with_multiple_pages() {
1586 let mut server = Server::new_async().await;
1587
1588 let config_mock = create_config_mock(&mut server).await;
1589
1590 let list_ns_mock_page1 = server
1592 .mock("GET", "/v1/namespaces")
1593 .with_body(
1594 r#"{
1595 "namespaces": [
1596 ["ns1", "ns11"],
1597 ["ns2"]
1598 ],
1599 "next-page-token": "page2"
1600 }"#,
1601 )
1602 .create_async()
1603 .await;
1604
1605 let list_ns_mock_page2 = server
1607 .mock("GET", "/v1/namespaces?pageToken=page2")
1608 .with_body(
1609 r#"{
1610 "namespaces": [
1611 ["ns3"],
1612 ["ns4", "ns41"]
1613 ],
1614 "next-page-token": "page3"
1615 }"#,
1616 )
1617 .create_async()
1618 .await;
1619
1620 let list_ns_mock_page3 = server
1622 .mock("GET", "/v1/namespaces?pageToken=page3")
1623 .with_body(
1624 r#"{
1625 "namespaces": [
1626 ["ns5", "ns51", "ns511"]
1627 ],
1628 "next-page-token": "page4"
1629 }"#,
1630 )
1631 .create_async()
1632 .await;
1633
1634 let list_ns_mock_page4 = server
1636 .mock("GET", "/v1/namespaces?pageToken=page4")
1637 .with_body(
1638 r#"{
1639 "namespaces": [
1640 ["ns6"],
1641 ["ns7"]
1642 ],
1643 "next-page-token": "page5"
1644 }"#,
1645 )
1646 .create_async()
1647 .await;
1648
1649 let list_ns_mock_page5 = server
1651 .mock("GET", "/v1/namespaces?pageToken=page5")
1652 .with_body(
1653 r#"{
1654 "namespaces": [
1655 ["ns8", "ns81"]
1656 ]
1657 }"#,
1658 )
1659 .create_async()
1660 .await;
1661
1662 let catalog = RestCatalog::new(
1663 RestCatalogConfig::builder().uri(server.url()).build(),
1664 Some(Arc::new(LocalFsStorageFactory)),
1665 );
1666
1667 let namespaces = catalog.list_namespaces(None).await.unwrap();
1668
1669 let expected_ns = vec![
1670 NamespaceIdent::from_vec(vec!["ns1".to_string(), "ns11".to_string()]).unwrap(),
1671 NamespaceIdent::from_vec(vec!["ns2".to_string()]).unwrap(),
1672 NamespaceIdent::from_vec(vec!["ns3".to_string()]).unwrap(),
1673 NamespaceIdent::from_vec(vec!["ns4".to_string(), "ns41".to_string()]).unwrap(),
1674 NamespaceIdent::from_vec(vec![
1675 "ns5".to_string(),
1676 "ns51".to_string(),
1677 "ns511".to_string(),
1678 ])
1679 .unwrap(),
1680 NamespaceIdent::from_vec(vec!["ns6".to_string()]).unwrap(),
1681 NamespaceIdent::from_vec(vec!["ns7".to_string()]).unwrap(),
1682 NamespaceIdent::from_vec(vec!["ns8".to_string(), "ns81".to_string()]).unwrap(),
1683 ];
1684
1685 assert_eq!(expected_ns, namespaces);
1686
1687 config_mock.assert_async().await;
1689 list_ns_mock_page1.assert_async().await;
1690 list_ns_mock_page2.assert_async().await;
1691 list_ns_mock_page3.assert_async().await;
1692 list_ns_mock_page4.assert_async().await;
1693 list_ns_mock_page5.assert_async().await;
1694 }
1695
1696 #[tokio::test]
1697 async fn test_create_namespace() {
1698 let mut server = Server::new_async().await;
1699
1700 let config_mock = create_config_mock(&mut server).await;
1701
1702 let create_ns_mock = server
1703 .mock("POST", "/v1/namespaces")
1704 .with_body(
1705 r#"{
1706 "namespace": [ "ns1", "ns11"],
1707 "properties" : {
1708 "key1": "value1"
1709 }
1710 }"#,
1711 )
1712 .create_async()
1713 .await;
1714
1715 let catalog = RestCatalog::new(
1716 RestCatalogConfig::builder().uri(server.url()).build(),
1717 Some(Arc::new(LocalFsStorageFactory)),
1718 );
1719
1720 let namespaces = catalog
1721 .create_namespace(
1722 &NamespaceIdent::from_vec(vec!["ns1".to_string(), "ns11".to_string()]).unwrap(),
1723 HashMap::from([("key1".to_string(), "value1".to_string())]),
1724 )
1725 .await
1726 .unwrap();
1727
1728 let expected_ns = Namespace::with_properties(
1729 NamespaceIdent::from_vec(vec!["ns1".to_string(), "ns11".to_string()]).unwrap(),
1730 HashMap::from([("key1".to_string(), "value1".to_string())]),
1731 );
1732
1733 assert_eq!(expected_ns, namespaces);
1734
1735 config_mock.assert_async().await;
1736 create_ns_mock.assert_async().await;
1737 }
1738
1739 #[tokio::test]
1740 async fn test_get_namespace() {
1741 let mut server = Server::new_async().await;
1742
1743 let config_mock = create_config_mock(&mut server).await;
1744
1745 let get_ns_mock = server
1746 .mock("GET", "/v1/namespaces/ns1")
1747 .with_body(
1748 r#"{
1749 "namespace": [ "ns1"],
1750 "properties" : {
1751 "key1": "value1"
1752 }
1753 }"#,
1754 )
1755 .create_async()
1756 .await;
1757
1758 let catalog = RestCatalog::new(
1759 RestCatalogConfig::builder().uri(server.url()).build(),
1760 Some(Arc::new(LocalFsStorageFactory)),
1761 );
1762
1763 let namespaces = catalog
1764 .get_namespace(&NamespaceIdent::new("ns1".to_string()))
1765 .await
1766 .unwrap();
1767
1768 let expected_ns = Namespace::with_properties(
1769 NamespaceIdent::new("ns1".to_string()),
1770 HashMap::from([("key1".to_string(), "value1".to_string())]),
1771 );
1772
1773 assert_eq!(expected_ns, namespaces);
1774
1775 config_mock.assert_async().await;
1776 get_ns_mock.assert_async().await;
1777 }
1778
1779 #[tokio::test]
1780 async fn check_namespace_exists() {
1781 let mut server = Server::new_async().await;
1782
1783 let config_mock = create_config_mock(&mut server).await;
1784
1785 let get_ns_mock = server
1786 .mock("HEAD", "/v1/namespaces/ns1")
1787 .with_status(204)
1788 .create_async()
1789 .await;
1790
1791 let catalog = RestCatalog::new(
1792 RestCatalogConfig::builder().uri(server.url()).build(),
1793 Some(Arc::new(LocalFsStorageFactory)),
1794 );
1795
1796 assert!(
1797 catalog
1798 .namespace_exists(&NamespaceIdent::new("ns1".to_string()))
1799 .await
1800 .unwrap()
1801 );
1802
1803 config_mock.assert_async().await;
1804 get_ns_mock.assert_async().await;
1805 }
1806
1807 #[tokio::test]
1808 async fn test_drop_namespace() {
1809 let mut server = Server::new_async().await;
1810
1811 let config_mock = create_config_mock(&mut server).await;
1812
1813 let drop_ns_mock = server
1814 .mock("DELETE", "/v1/namespaces/ns1")
1815 .with_status(204)
1816 .create_async()
1817 .await;
1818
1819 let catalog = RestCatalog::new(
1820 RestCatalogConfig::builder().uri(server.url()).build(),
1821 Some(Arc::new(LocalFsStorageFactory)),
1822 );
1823
1824 catalog
1825 .drop_namespace(&NamespaceIdent::new("ns1".to_string()))
1826 .await
1827 .unwrap();
1828
1829 config_mock.assert_async().await;
1830 drop_ns_mock.assert_async().await;
1831 }
1832
1833 #[tokio::test]
1834 async fn test_list_tables() {
1835 let mut server = Server::new_async().await;
1836
1837 let config_mock = create_config_mock(&mut server).await;
1838
1839 let list_tables_mock = server
1840 .mock("GET", "/v1/namespaces/ns1/tables")
1841 .with_status(200)
1842 .with_body(
1843 r#"{
1844 "identifiers": [
1845 {
1846 "namespace": ["ns1"],
1847 "name": "table1"
1848 },
1849 {
1850 "namespace": ["ns1"],
1851 "name": "table2"
1852 }
1853 ]
1854 }"#,
1855 )
1856 .create_async()
1857 .await;
1858
1859 let catalog = RestCatalog::new(
1860 RestCatalogConfig::builder().uri(server.url()).build(),
1861 Some(Arc::new(LocalFsStorageFactory)),
1862 );
1863
1864 let tables = catalog
1865 .list_tables(&NamespaceIdent::new("ns1".to_string()))
1866 .await
1867 .unwrap();
1868
1869 let expected_tables = vec![
1870 TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table1".to_string()),
1871 TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table2".to_string()),
1872 ];
1873
1874 assert_eq!(tables, expected_tables);
1875
1876 config_mock.assert_async().await;
1877 list_tables_mock.assert_async().await;
1878 }
1879
1880 #[tokio::test]
1881 async fn test_list_tables_with_pagination() {
1882 let mut server = Server::new_async().await;
1883
1884 let config_mock = create_config_mock(&mut server).await;
1885
1886 let list_tables_mock_page1 = server
1887 .mock("GET", "/v1/namespaces/ns1/tables")
1888 .with_status(200)
1889 .with_body(
1890 r#"{
1891 "identifiers": [
1892 {
1893 "namespace": ["ns1"],
1894 "name": "table1"
1895 },
1896 {
1897 "namespace": ["ns1"],
1898 "name": "table2"
1899 }
1900 ],
1901 "next-page-token": "token456"
1902 }"#,
1903 )
1904 .create_async()
1905 .await;
1906
1907 let list_tables_mock_page2 = server
1908 .mock("GET", "/v1/namespaces/ns1/tables?pageToken=token456")
1909 .with_status(200)
1910 .with_body(
1911 r#"{
1912 "identifiers": [
1913 {
1914 "namespace": ["ns1"],
1915 "name": "table3"
1916 },
1917 {
1918 "namespace": ["ns1"],
1919 "name": "table4"
1920 }
1921 ]
1922 }"#,
1923 )
1924 .create_async()
1925 .await;
1926
1927 let catalog = RestCatalog::new(
1928 RestCatalogConfig::builder().uri(server.url()).build(),
1929 Some(Arc::new(LocalFsStorageFactory)),
1930 );
1931
1932 let tables = catalog
1933 .list_tables(&NamespaceIdent::new("ns1".to_string()))
1934 .await
1935 .unwrap();
1936
1937 let expected_tables = vec![
1938 TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table1".to_string()),
1939 TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table2".to_string()),
1940 TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table3".to_string()),
1941 TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table4".to_string()),
1942 ];
1943
1944 assert_eq!(tables, expected_tables);
1945
1946 config_mock.assert_async().await;
1947 list_tables_mock_page1.assert_async().await;
1948 list_tables_mock_page2.assert_async().await;
1949 }
1950
1951 #[tokio::test]
1952 async fn test_list_tables_with_multiple_pages() {
1953 let mut server = Server::new_async().await;
1954
1955 let config_mock = create_config_mock(&mut server).await;
1956
1957 let list_tables_mock_page1 = server
1959 .mock("GET", "/v1/namespaces/ns1/tables")
1960 .with_status(200)
1961 .with_body(
1962 r#"{
1963 "identifiers": [
1964 {
1965 "namespace": ["ns1"],
1966 "name": "table1"
1967 },
1968 {
1969 "namespace": ["ns1"],
1970 "name": "table2"
1971 }
1972 ],
1973 "next-page-token": "page2"
1974 }"#,
1975 )
1976 .create_async()
1977 .await;
1978
1979 let list_tables_mock_page2 = server
1981 .mock("GET", "/v1/namespaces/ns1/tables?pageToken=page2")
1982 .with_status(200)
1983 .with_body(
1984 r#"{
1985 "identifiers": [
1986 {
1987 "namespace": ["ns1"],
1988 "name": "table3"
1989 },
1990 {
1991 "namespace": ["ns1"],
1992 "name": "table4"
1993 }
1994 ],
1995 "next-page-token": "page3"
1996 }"#,
1997 )
1998 .create_async()
1999 .await;
2000
2001 let list_tables_mock_page3 = server
2003 .mock("GET", "/v1/namespaces/ns1/tables?pageToken=page3")
2004 .with_status(200)
2005 .with_body(
2006 r#"{
2007 "identifiers": [
2008 {
2009 "namespace": ["ns1"],
2010 "name": "table5"
2011 }
2012 ],
2013 "next-page-token": "page4"
2014 }"#,
2015 )
2016 .create_async()
2017 .await;
2018
2019 let list_tables_mock_page4 = server
2021 .mock("GET", "/v1/namespaces/ns1/tables?pageToken=page4")
2022 .with_status(200)
2023 .with_body(
2024 r#"{
2025 "identifiers": [
2026 {
2027 "namespace": ["ns1"],
2028 "name": "table6"
2029 },
2030 {
2031 "namespace": ["ns1"],
2032 "name": "table7"
2033 }
2034 ],
2035 "next-page-token": "page5"
2036 }"#,
2037 )
2038 .create_async()
2039 .await;
2040
2041 let list_tables_mock_page5 = server
2043 .mock("GET", "/v1/namespaces/ns1/tables?pageToken=page5")
2044 .with_status(200)
2045 .with_body(
2046 r#"{
2047 "identifiers": [
2048 {
2049 "namespace": ["ns1"],
2050 "name": "table8"
2051 }
2052 ]
2053 }"#,
2054 )
2055 .create_async()
2056 .await;
2057
2058 let catalog = RestCatalog::new(
2059 RestCatalogConfig::builder().uri(server.url()).build(),
2060 Some(Arc::new(LocalFsStorageFactory)),
2061 );
2062
2063 let tables = catalog
2064 .list_tables(&NamespaceIdent::new("ns1".to_string()))
2065 .await
2066 .unwrap();
2067
2068 let expected_tables = vec![
2069 TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table1".to_string()),
2070 TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table2".to_string()),
2071 TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table3".to_string()),
2072 TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table4".to_string()),
2073 TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table5".to_string()),
2074 TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table6".to_string()),
2075 TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table7".to_string()),
2076 TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table8".to_string()),
2077 ];
2078
2079 assert_eq!(tables, expected_tables);
2080
2081 config_mock.assert_async().await;
2083 list_tables_mock_page1.assert_async().await;
2084 list_tables_mock_page2.assert_async().await;
2085 list_tables_mock_page3.assert_async().await;
2086 list_tables_mock_page4.assert_async().await;
2087 list_tables_mock_page5.assert_async().await;
2088 }
2089
2090 #[tokio::test]
2091 async fn test_drop_tables() {
2092 let mut server = Server::new_async().await;
2093
2094 let config_mock = create_config_mock(&mut server).await;
2095
2096 let delete_table_mock = server
2097 .mock("DELETE", "/v1/namespaces/ns1/tables/table1")
2098 .with_status(204)
2099 .create_async()
2100 .await;
2101
2102 let catalog = RestCatalog::new(
2103 RestCatalogConfig::builder().uri(server.url()).build(),
2104 Some(Arc::new(LocalFsStorageFactory)),
2105 );
2106
2107 catalog
2108 .drop_table(&TableIdent::new(
2109 NamespaceIdent::new("ns1".to_string()),
2110 "table1".to_string(),
2111 ))
2112 .await
2113 .unwrap();
2114
2115 config_mock.assert_async().await;
2116 delete_table_mock.assert_async().await;
2117 }
2118
2119 #[tokio::test]
2120 async fn test_check_table_exists() {
2121 let mut server = Server::new_async().await;
2122
2123 let config_mock = create_config_mock(&mut server).await;
2124
2125 let check_table_exists_mock = server
2126 .mock("HEAD", "/v1/namespaces/ns1/tables/table1")
2127 .with_status(204)
2128 .create_async()
2129 .await;
2130
2131 let catalog = RestCatalog::new(
2132 RestCatalogConfig::builder().uri(server.url()).build(),
2133 Some(Arc::new(LocalFsStorageFactory)),
2134 );
2135
2136 assert!(
2137 catalog
2138 .table_exists(&TableIdent::new(
2139 NamespaceIdent::new("ns1".to_string()),
2140 "table1".to_string(),
2141 ))
2142 .await
2143 .unwrap()
2144 );
2145
2146 config_mock.assert_async().await;
2147 check_table_exists_mock.assert_async().await;
2148 }
2149
2150 #[tokio::test]
2151 async fn test_rename_table() {
2152 let mut server = Server::new_async().await;
2153
2154 let config_mock = create_config_mock(&mut server).await;
2155
2156 let rename_table_mock = server
2157 .mock("POST", "/v1/tables/rename")
2158 .with_status(204)
2159 .create_async()
2160 .await;
2161
2162 let catalog = RestCatalog::new(
2163 RestCatalogConfig::builder().uri(server.url()).build(),
2164 Some(Arc::new(LocalFsStorageFactory)),
2165 );
2166
2167 catalog
2168 .rename_table(
2169 &TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table1".to_string()),
2170 &TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table2".to_string()),
2171 )
2172 .await
2173 .unwrap();
2174
2175 config_mock.assert_async().await;
2176 rename_table_mock.assert_async().await;
2177 }
2178
2179 #[tokio::test]
2180 async fn test_load_table() {
2181 let mut server = Server::new_async().await;
2182
2183 let config_mock = create_config_mock(&mut server).await;
2184
2185 let rename_table_mock = server
2186 .mock("GET", "/v1/namespaces/ns1/tables/test1")
2187 .with_status(200)
2188 .with_body_from_file(format!(
2189 "{}/testdata/{}",
2190 env!("CARGO_MANIFEST_DIR"),
2191 "load_table_response.json"
2192 ))
2193 .create_async()
2194 .await;
2195
2196 let catalog = RestCatalog::new(
2197 RestCatalogConfig::builder().uri(server.url()).build(),
2198 Some(Arc::new(LocalFsStorageFactory)),
2199 );
2200
2201 let table = catalog
2202 .load_table(&TableIdent::new(
2203 NamespaceIdent::new("ns1".to_string()),
2204 "test1".to_string(),
2205 ))
2206 .await
2207 .unwrap();
2208
2209 assert_eq!(
2210 &TableIdent::from_strs(vec!["ns1", "test1"]).unwrap(),
2211 table.identifier()
2212 );
2213 assert_eq!(
2214 "s3://warehouse/database/table/metadata/00001-5f2f8166-244c-4eae-ac36-384ecdec81fc.gz.metadata.json",
2215 table.metadata_location().unwrap()
2216 );
2217 assert_eq!(FormatVersion::V1, table.metadata().format_version());
2218 assert_eq!("s3://warehouse/database/table", table.metadata().location());
2219 assert_eq!(
2220 uuid!("b55d9dda-6561-423a-8bfc-787980ce421f"),
2221 table.metadata().uuid()
2222 );
2223 assert_eq!(
2224 Utc.timestamp_millis_opt(1646787054459).unwrap(),
2225 table.metadata().last_updated_timestamp().unwrap()
2226 );
2227 assert_eq!(
2228 vec![&Arc::new(
2229 Schema::builder()
2230 .with_fields(vec![
2231 NestedField::optional(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
2232 NestedField::optional(2, "data", Type::Primitive(PrimitiveType::String))
2233 .into(),
2234 ])
2235 .build()
2236 .unwrap()
2237 )],
2238 table.metadata().schemas_iter().collect::<Vec<_>>()
2239 );
2240 assert_eq!(
2241 &HashMap::from([
2242 ("owner".to_string(), "bryan".to_string()),
2243 (
2244 "write.metadata.compression-codec".to_string(),
2245 "gzip".to_string()
2246 )
2247 ]),
2248 table.metadata().properties()
2249 );
2250 assert_eq!(vec![&Arc::new(Snapshot::builder()
2251 .with_snapshot_id(3497810964824022504)
2252 .with_timestamp_ms(1646787054459)
2253 .with_manifest_list("s3://warehouse/database/table/metadata/snap-3497810964824022504-1-c4f68204-666b-4e50-a9df-b10c34bf6b82.avro")
2254 .with_sequence_number(0)
2255 .with_schema_id(0)
2256 .with_summary(Summary {
2257 operation: Operation::Append,
2258 additional_properties: HashMap::from_iter([
2259 ("spark.app.id", "local-1646787004168"),
2260 ("added-data-files", "1"),
2261 ("added-records", "1"),
2262 ("added-files-size", "697"),
2263 ("changed-partition-count", "1"),
2264 ("total-records", "1"),
2265 ("total-files-size", "697"),
2266 ("total-data-files", "1"),
2267 ("total-delete-files", "0"),
2268 ("total-position-deletes", "0"),
2269 ("total-equality-deletes", "0")
2270 ].iter().map(|p| (p.0.to_string(), p.1.to_string()))),
2271 }).build()
2272 )], table.metadata().snapshots().collect::<Vec<_>>());
2273 assert_eq!(
2274 &[SnapshotLog {
2275 timestamp_ms: 1646787054459,
2276 snapshot_id: 3497810964824022504,
2277 }],
2278 table.metadata().history()
2279 );
2280 assert_eq!(
2281 vec![&Arc::new(SortOrder {
2282 order_id: 0,
2283 fields: vec![],
2284 })],
2285 table.metadata().sort_orders_iter().collect::<Vec<_>>()
2286 );
2287
2288 config_mock.assert_async().await;
2289 rename_table_mock.assert_async().await;
2290 }
2291
2292 #[tokio::test]
2293 async fn test_load_table_404() {
2294 let mut server = Server::new_async().await;
2295
2296 let config_mock = create_config_mock(&mut server).await;
2297
2298 let rename_table_mock = server
2299 .mock("GET", "/v1/namespaces/ns1/tables/test1")
2300 .with_status(404)
2301 .with_body(r#"
2302{
2303 "error": {
2304 "message": "Table does not exist: ns1.test1 in warehouse 8bcb0838-50fc-472d-9ddb-8feb89ef5f1e",
2305 "type": "NoSuchNamespaceErrorException",
2306 "code": 404
2307 }
2308}
2309 "#)
2310 .create_async()
2311 .await;
2312
2313 let catalog = RestCatalog::new(
2314 RestCatalogConfig::builder().uri(server.url()).build(),
2315 Some(Arc::new(LocalFsStorageFactory)),
2316 );
2317
2318 let table = catalog
2319 .load_table(&TableIdent::new(
2320 NamespaceIdent::new("ns1".to_string()),
2321 "test1".to_string(),
2322 ))
2323 .await;
2324
2325 assert!(table.is_err());
2326 assert!(table.err().unwrap().message().contains("does not exist"));
2327
2328 config_mock.assert_async().await;
2329 rename_table_mock.assert_async().await;
2330 }
2331
2332 #[tokio::test]
2333 async fn test_create_table() {
2334 let mut server = Server::new_async().await;
2335
2336 let config_mock = create_config_mock(&mut server).await;
2337
2338 let create_table_mock = server
2339 .mock("POST", "/v1/namespaces/ns1/tables")
2340 .with_status(200)
2341 .with_body_from_file(format!(
2342 "{}/testdata/{}",
2343 env!("CARGO_MANIFEST_DIR"),
2344 "create_table_response.json"
2345 ))
2346 .create_async()
2347 .await;
2348
2349 let catalog = RestCatalog::new(
2350 RestCatalogConfig::builder().uri(server.url()).build(),
2351 Some(Arc::new(LocalFsStorageFactory)),
2352 );
2353
2354 let table_creation = TableCreation::builder()
2355 .name("test1".to_string())
2356 .schema(
2357 Schema::builder()
2358 .with_fields(vec![
2359 NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String))
2360 .into(),
2361 NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(),
2362 NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean))
2363 .into(),
2364 ])
2365 .with_schema_id(1)
2366 .with_identifier_field_ids(vec![2])
2367 .build()
2368 .unwrap(),
2369 )
2370 .properties(HashMap::from([("owner".to_string(), "testx".to_string())]))
2371 .partition_spec(
2372 UnboundPartitionSpec::builder()
2373 .add_partition_fields(vec![
2374 UnboundPartitionField::builder()
2375 .source_id(1)
2376 .transform(Transform::Truncate(3))
2377 .name("id".to_string())
2378 .build(),
2379 ])
2380 .unwrap()
2381 .build(),
2382 )
2383 .sort_order(
2384 SortOrder::builder()
2385 .with_sort_field(
2386 SortField::builder()
2387 .source_id(2)
2388 .transform(Transform::Identity)
2389 .direction(SortDirection::Ascending)
2390 .null_order(NullOrder::First)
2391 .build(),
2392 )
2393 .build_unbound()
2394 .unwrap(),
2395 )
2396 .build();
2397
2398 let table = catalog
2399 .create_table(&NamespaceIdent::from_strs(["ns1"]).unwrap(), table_creation)
2400 .await
2401 .unwrap();
2402
2403 assert_eq!(
2404 &TableIdent::from_strs(vec!["ns1", "test1"]).unwrap(),
2405 table.identifier()
2406 );
2407 assert_eq!(
2408 "s3://warehouse/database/table/metadata.json",
2409 table.metadata_location().unwrap()
2410 );
2411 assert_eq!(FormatVersion::V1, table.metadata().format_version());
2412 assert_eq!("s3://warehouse/database/table", table.metadata().location());
2413 assert_eq!(
2414 uuid!("bf289591-dcc0-4234-ad4f-5c3eed811a29"),
2415 table.metadata().uuid()
2416 );
2417 assert_eq!(
2418 1657810967051,
2419 table
2420 .metadata()
2421 .last_updated_timestamp()
2422 .unwrap()
2423 .timestamp_millis()
2424 );
2425 assert_eq!(
2426 vec![&Arc::new(
2427 Schema::builder()
2428 .with_fields(vec![
2429 NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String))
2430 .into(),
2431 NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(),
2432 NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean))
2433 .into(),
2434 ])
2435 .with_schema_id(0)
2436 .with_identifier_field_ids(vec![2])
2437 .build()
2438 .unwrap()
2439 )],
2440 table.metadata().schemas_iter().collect::<Vec<_>>()
2441 );
2442 assert_eq!(
2443 &HashMap::from([
2444 (
2445 "write.delete.parquet.compression-codec".to_string(),
2446 "zstd".to_string()
2447 ),
2448 (
2449 "write.metadata.compression-codec".to_string(),
2450 "gzip".to_string()
2451 ),
2452 (
2453 "write.summary.partition-limit".to_string(),
2454 "100".to_string()
2455 ),
2456 (
2457 "write.parquet.compression-codec".to_string(),
2458 "zstd".to_string()
2459 ),
2460 ]),
2461 table.metadata().properties()
2462 );
2463 assert!(table.metadata().current_snapshot().is_none());
2464 assert!(table.metadata().history().is_empty());
2465 assert_eq!(
2466 vec![&Arc::new(SortOrder {
2467 order_id: 0,
2468 fields: vec![],
2469 })],
2470 table.metadata().sort_orders_iter().collect::<Vec<_>>()
2471 );
2472
2473 config_mock.assert_async().await;
2474 create_table_mock.assert_async().await;
2475 }
2476
2477 #[tokio::test]
2478 async fn test_create_table_409() {
2479 let mut server = Server::new_async().await;
2480
2481 let config_mock = create_config_mock(&mut server).await;
2482
2483 let create_table_mock = server
2484 .mock("POST", "/v1/namespaces/ns1/tables")
2485 .with_status(409)
2486 .with_body(r#"
2487{
2488 "error": {
2489 "message": "Table already exists: ns1.test1 in warehouse 8bcb0838-50fc-472d-9ddb-8feb89ef5f1e",
2490 "type": "AlreadyExistsException",
2491 "code": 409
2492 }
2493}
2494 "#)
2495 .create_async()
2496 .await;
2497
2498 let catalog = RestCatalog::new(
2499 RestCatalogConfig::builder().uri(server.url()).build(),
2500 Some(Arc::new(LocalFsStorageFactory)),
2501 );
2502
2503 let table_creation = TableCreation::builder()
2504 .name("test1".to_string())
2505 .schema(
2506 Schema::builder()
2507 .with_fields(vec![
2508 NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String))
2509 .into(),
2510 NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(),
2511 NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean))
2512 .into(),
2513 ])
2514 .with_schema_id(1)
2515 .with_identifier_field_ids(vec![2])
2516 .build()
2517 .unwrap(),
2518 )
2519 .properties(HashMap::from([("owner".to_string(), "testx".to_string())]))
2520 .build();
2521
2522 let table_result = catalog
2523 .create_table(&NamespaceIdent::from_strs(["ns1"]).unwrap(), table_creation)
2524 .await;
2525
2526 assert!(table_result.is_err());
2527 assert!(
2528 table_result
2529 .err()
2530 .unwrap()
2531 .message()
2532 .contains("already exists")
2533 );
2534
2535 config_mock.assert_async().await;
2536 create_table_mock.assert_async().await;
2537 }
2538
2539 #[tokio::test]
2540 async fn test_update_table() {
2541 let mut server = Server::new_async().await;
2542
2543 let config_mock = create_config_mock(&mut server).await;
2544
2545 let load_table_mock = server
2546 .mock("GET", "/v1/namespaces/ns1/tables/test1")
2547 .with_status(200)
2548 .with_body_from_file(format!(
2549 "{}/testdata/{}",
2550 env!("CARGO_MANIFEST_DIR"),
2551 "load_table_response.json"
2552 ))
2553 .create_async()
2554 .await;
2555
2556 let update_table_mock = server
2557 .mock("POST", "/v1/namespaces/ns1/tables/test1")
2558 .with_status(200)
2559 .with_body_from_file(format!(
2560 "{}/testdata/{}",
2561 env!("CARGO_MANIFEST_DIR"),
2562 "update_table_response.json"
2563 ))
2564 .create_async()
2565 .await;
2566
2567 let catalog = RestCatalog::new(
2568 RestCatalogConfig::builder().uri(server.url()).build(),
2569 Some(Arc::new(LocalFsStorageFactory)),
2570 );
2571
2572 let table1 = {
2573 let file = File::open(format!(
2574 "{}/testdata/{}",
2575 env!("CARGO_MANIFEST_DIR"),
2576 "create_table_response.json"
2577 ))
2578 .unwrap();
2579 let reader = BufReader::new(file);
2580 let resp = serde_json::from_reader::<_, LoadTableResult>(reader).unwrap();
2581
2582 Table::builder()
2583 .metadata(resp.metadata)
2584 .metadata_location(resp.metadata_location.unwrap())
2585 .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap())
2586 .file_io(FileIO::new_with_fs())
2587 .build()
2588 .unwrap()
2589 };
2590
2591 let tx = Transaction::new(&table1);
2592 let table = tx
2593 .upgrade_table_version()
2594 .set_format_version(FormatVersion::V2)
2595 .apply(tx)
2596 .unwrap()
2597 .commit(&catalog)
2598 .await
2599 .unwrap();
2600
2601 assert_eq!(
2602 &TableIdent::from_strs(vec!["ns1", "test1"]).unwrap(),
2603 table.identifier()
2604 );
2605 assert_eq!(
2606 "s3://warehouse/database/table/metadata.json",
2607 table.metadata_location().unwrap()
2608 );
2609 assert_eq!(FormatVersion::V2, table.metadata().format_version());
2610 assert_eq!("s3://warehouse/database/table", table.metadata().location());
2611 assert_eq!(
2612 uuid!("bf289591-dcc0-4234-ad4f-5c3eed811a29"),
2613 table.metadata().uuid()
2614 );
2615 assert_eq!(
2616 1657810967051,
2617 table
2618 .metadata()
2619 .last_updated_timestamp()
2620 .unwrap()
2621 .timestamp_millis()
2622 );
2623 assert_eq!(
2624 vec![&Arc::new(
2625 Schema::builder()
2626 .with_fields(vec![
2627 NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String))
2628 .into(),
2629 NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(),
2630 NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean))
2631 .into(),
2632 ])
2633 .with_schema_id(0)
2634 .with_identifier_field_ids(vec![2])
2635 .build()
2636 .unwrap()
2637 )],
2638 table.metadata().schemas_iter().collect::<Vec<_>>()
2639 );
2640 assert_eq!(
2641 &HashMap::from([
2642 (
2643 "write.delete.parquet.compression-codec".to_string(),
2644 "zstd".to_string()
2645 ),
2646 (
2647 "write.metadata.compression-codec".to_string(),
2648 "gzip".to_string()
2649 ),
2650 (
2651 "write.summary.partition-limit".to_string(),
2652 "100".to_string()
2653 ),
2654 (
2655 "write.parquet.compression-codec".to_string(),
2656 "zstd".to_string()
2657 ),
2658 ]),
2659 table.metadata().properties()
2660 );
2661 assert!(table.metadata().current_snapshot().is_none());
2662 assert!(table.metadata().history().is_empty());
2663 assert_eq!(
2664 vec![&Arc::new(SortOrder {
2665 order_id: 0,
2666 fields: vec![],
2667 })],
2668 table.metadata().sort_orders_iter().collect::<Vec<_>>()
2669 );
2670
2671 config_mock.assert_async().await;
2672 update_table_mock.assert_async().await;
2673 load_table_mock.assert_async().await
2674 }
2675
2676 #[tokio::test]
2677 async fn test_update_table_404() {
2678 let mut server = Server::new_async().await;
2679
2680 let config_mock = create_config_mock(&mut server).await;
2681
2682 let load_table_mock = server
2683 .mock("GET", "/v1/namespaces/ns1/tables/test1")
2684 .with_status(200)
2685 .with_body_from_file(format!(
2686 "{}/testdata/{}",
2687 env!("CARGO_MANIFEST_DIR"),
2688 "load_table_response.json"
2689 ))
2690 .create_async()
2691 .await;
2692
2693 let update_table_mock = server
2694 .mock("POST", "/v1/namespaces/ns1/tables/test1")
2695 .with_status(404)
2696 .with_body(
2697 r#"
2698{
2699 "error": {
2700 "message": "The given table does not exist",
2701 "type": "NoSuchTableException",
2702 "code": 404
2703 }
2704}
2705 "#,
2706 )
2707 .create_async()
2708 .await;
2709
2710 let catalog = RestCatalog::new(
2711 RestCatalogConfig::builder().uri(server.url()).build(),
2712 Some(Arc::new(LocalFsStorageFactory)),
2713 );
2714
2715 let table1 = {
2716 let file = File::open(format!(
2717 "{}/testdata/{}",
2718 env!("CARGO_MANIFEST_DIR"),
2719 "create_table_response.json"
2720 ))
2721 .unwrap();
2722 let reader = BufReader::new(file);
2723 let resp = serde_json::from_reader::<_, LoadTableResult>(reader).unwrap();
2724
2725 Table::builder()
2726 .metadata(resp.metadata)
2727 .metadata_location(resp.metadata_location.unwrap())
2728 .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap())
2729 .file_io(FileIO::new_with_fs())
2730 .build()
2731 .unwrap()
2732 };
2733
2734 let tx = Transaction::new(&table1);
2735 let table_result = tx
2736 .upgrade_table_version()
2737 .set_format_version(FormatVersion::V2)
2738 .apply(tx)
2739 .unwrap()
2740 .commit(&catalog)
2741 .await;
2742
2743 assert!(table_result.is_err());
2744 assert!(
2745 table_result
2746 .err()
2747 .unwrap()
2748 .message()
2749 .contains("does not exist")
2750 );
2751
2752 config_mock.assert_async().await;
2753 update_table_mock.assert_async().await;
2754 load_table_mock.assert_async().await;
2755 }
2756
2757 #[tokio::test]
2758 async fn test_register_table() {
2759 let mut server = Server::new_async().await;
2760
2761 let config_mock = create_config_mock(&mut server).await;
2762
2763 let register_table_mock = server
2764 .mock("POST", "/v1/namespaces/ns1/register")
2765 .with_status(200)
2766 .with_body_from_file(format!(
2767 "{}/testdata/{}",
2768 env!("CARGO_MANIFEST_DIR"),
2769 "load_table_response.json"
2770 ))
2771 .create_async()
2772 .await;
2773
2774 let catalog = RestCatalog::new(
2775 RestCatalogConfig::builder().uri(server.url()).build(),
2776 Some(Arc::new(LocalFsStorageFactory)),
2777 );
2778 let table_ident =
2779 TableIdent::new(NamespaceIdent::new("ns1".to_string()), "test1".to_string());
2780 let metadata_location = String::from(
2781 "s3://warehouse/database/table/metadata/00001-5f2f8166-244c-4eae-ac36-384ecdec81fc.gz.metadata.json",
2782 );
2783
2784 let table = catalog
2785 .register_table(&table_ident, metadata_location)
2786 .await
2787 .unwrap();
2788
2789 assert_eq!(
2790 &TableIdent::from_strs(vec!["ns1", "test1"]).unwrap(),
2791 table.identifier()
2792 );
2793 assert_eq!(
2794 "s3://warehouse/database/table/metadata/00001-5f2f8166-244c-4eae-ac36-384ecdec81fc.gz.metadata.json",
2795 table.metadata_location().unwrap()
2796 );
2797
2798 config_mock.assert_async().await;
2799 register_table_mock.assert_async().await;
2800 }
2801
2802 #[tokio::test]
2803 async fn test_register_table_404() {
2804 let mut server = Server::new_async().await;
2805
2806 let config_mock = create_config_mock(&mut server).await;
2807
2808 let register_table_mock = server
2809 .mock("POST", "/v1/namespaces/ns1/register")
2810 .with_status(404)
2811 .with_body(
2812 r#"
2813{
2814 "error": {
2815 "message": "The namespace specified does not exist",
2816 "type": "NoSuchNamespaceErrorException",
2817 "code": 404
2818 }
2819}
2820 "#,
2821 )
2822 .create_async()
2823 .await;
2824
2825 let catalog = RestCatalog::new(
2826 RestCatalogConfig::builder().uri(server.url()).build(),
2827 Some(Arc::new(LocalFsStorageFactory)),
2828 );
2829
2830 let table_ident =
2831 TableIdent::new(NamespaceIdent::new("ns1".to_string()), "test1".to_string());
2832 let metadata_location = String::from(
2833 "s3://warehouse/database/table/metadata/00001-5f2f8166-244c-4eae-ac36-384ecdec81fc.gz.metadata.json",
2834 );
2835 let table = catalog
2836 .register_table(&table_ident, metadata_location)
2837 .await;
2838
2839 assert!(table.is_err());
2840 assert!(table.err().unwrap().message().contains("does not exist"));
2841
2842 config_mock.assert_async().await;
2843 register_table_mock.assert_async().await;
2844 }
2845
2846 #[tokio::test]
2847 async fn test_create_rest_catalog() {
2848 let builder = RestCatalogBuilder::default().with_client(Client::new());
2849
2850 let catalog = builder
2851 .load(
2852 "test",
2853 HashMap::from([
2854 (
2855 REST_CATALOG_PROP_URI.to_string(),
2856 "http://localhost:8080".to_string(),
2857 ),
2858 ("a".to_string(), "b".to_string()),
2859 ]),
2860 )
2861 .await;
2862
2863 assert!(catalog.is_ok());
2864
2865 let catalog_config = catalog.unwrap().user_config;
2866 assert_eq!(catalog_config.name.as_deref(), Some("test"));
2867 assert_eq!(catalog_config.uri, "http://localhost:8080");
2868 assert_eq!(catalog_config.warehouse, None);
2869 assert!(catalog_config.client.is_some());
2870
2871 assert_eq!(catalog_config.props.get("a"), Some(&"b".to_string()));
2872 assert!(!catalog_config.props.contains_key(REST_CATALOG_PROP_URI));
2873 }
2874
2875 #[tokio::test]
2876 async fn test_create_rest_catalog_no_uri() {
2877 let builder = RestCatalogBuilder::default();
2878
2879 let catalog = builder
2880 .load(
2881 "test",
2882 HashMap::from([(
2883 REST_CATALOG_PROP_WAREHOUSE.to_string(),
2884 "s3://warehouse".to_string(),
2885 )]),
2886 )
2887 .await;
2888
2889 assert!(catalog.is_err());
2890 if let Err(err) = catalog {
2891 assert_eq!(err.kind(), ErrorKind::DataInvalid);
2892 assert_eq!(err.message(), "Catalog uri is required");
2893 }
2894 }
2895}