1use std::collections::HashMap;
21use std::future::Future;
22use std::str::FromStr;
23use std::sync::Arc;
24
25use async_trait::async_trait;
26use iceberg::io::{FileIO, FileIOBuilder, StorageFactory};
27use iceberg::table::Table;
28use iceberg::{
29 Catalog, CatalogBuilder, Error, ErrorKind, Namespace, NamespaceIdent, Result, TableCommit,
30 TableCreation, TableIdent,
31};
32use itertools::Itertools;
33use reqwest::header::{
34 HeaderMap, HeaderName, HeaderValue, {self},
35};
36use reqwest::{Client, Method, StatusCode, Url};
37use tokio::sync::OnceCell;
38use typed_builder::TypedBuilder;
39
40use crate::client::{
41 HttpClient, deserialize_catalog_response, deserialize_unexpected_catalog_error,
42};
43use crate::types::{
44 CatalogConfig, CommitTableRequest, CommitTableResponse, CreateNamespaceRequest,
45 CreateTableRequest, ListNamespaceResponse, ListTablesResponse, LoadTableResult,
46 NamespaceResponse, RegisterTableRequest, RenameTableRequest,
47};
48
/// Property key under which the REST catalog URI is passed to the builder.
pub const REST_CATALOG_PROP_URI: &str = "uri";
/// Property key under which the warehouse is passed to the builder.
pub const REST_CATALOG_PROP_WAREHOUSE: &str = "warehouse";
/// Property key that disables header redaction when its value equals `true`
/// (compared ASCII case-insensitively).
pub const REST_CATALOG_PROP_DISABLE_HEADER_REDACTION: &str = "disable-header-redaction";

/// Value sent in the `x-client-version` request header.
const ICEBERG_REST_SPEC_VERSION: &str = "0.14.1";
/// Version of this crate, captured at compile time; used in the `User-Agent` header.
const CARGO_PKG_VERSION: &str = env!("CARGO_PKG_VERSION");
/// Path segment placed directly after the catalog URI in the endpoint URLs built by
/// `RestCatalogConfig`.
const PATH_V1: &str = "v1";
59
/// Builder for [`RestCatalog`].
///
/// Holds the catalog configuration and an optional storage factory until the catalog is
/// created through `CatalogBuilder::load`.
#[derive(Debug)]
pub struct RestCatalogBuilder {
    /// Configuration that `load` and `with_client` fill in.
    config: RestCatalogConfig,
    /// Storage factory passed on to the catalog; `None` until `with_storage_factory` is called.
    storage_factory: Option<Arc<dyn StorageFactory>>,
}
66
67impl Default for RestCatalogBuilder {
68 fn default() -> Self {
69 Self {
70 config: RestCatalogConfig {
71 name: None,
72 uri: "".to_string(),
73 warehouse: None,
74 props: HashMap::new(),
75 client: None,
76 },
77 storage_factory: None,
78 }
79 }
80}
81
82impl CatalogBuilder for RestCatalogBuilder {
83 type C = RestCatalog;
84
85 fn with_storage_factory(mut self, storage_factory: Arc<dyn StorageFactory>) -> Self {
86 self.storage_factory = Some(storage_factory);
87 self
88 }
89
90 fn load(
91 mut self,
92 name: impl Into<String>,
93 props: HashMap<String, String>,
94 ) -> impl Future<Output = Result<Self::C>> + Send {
95 self.config.name = Some(name.into());
96
97 if props.contains_key(REST_CATALOG_PROP_URI) {
98 self.config.uri = props
99 .get(REST_CATALOG_PROP_URI)
100 .cloned()
101 .unwrap_or_default();
102 }
103
104 if props.contains_key(REST_CATALOG_PROP_WAREHOUSE) {
105 self.config.warehouse = props.get(REST_CATALOG_PROP_WAREHOUSE).cloned()
106 }
107
108 self.config.props = props
110 .into_iter()
111 .filter(|(k, _)| k != REST_CATALOG_PROP_URI && k != REST_CATALOG_PROP_WAREHOUSE)
112 .collect();
113
114 let result = {
115 if self.config.name.is_none() {
116 Err(Error::new(
117 ErrorKind::DataInvalid,
118 "Catalog name is required",
119 ))
120 } else if self.config.uri.is_empty() {
121 Err(Error::new(
122 ErrorKind::DataInvalid,
123 "Catalog uri is required",
124 ))
125 } else {
126 Ok(RestCatalog::new(self.config, self.storage_factory))
127 }
128 };
129
130 std::future::ready(result)
131 }
132}
133
134impl RestCatalogBuilder {
135 pub fn with_client(mut self, client: Client) -> Self {
137 self.config.client = Some(client);
138 self
139 }
140}
141
/// Configuration of a REST catalog: where it lives and how to talk to it.
#[derive(Clone, Debug, TypedBuilder)]
pub(crate) struct RestCatalogConfig {
    /// Catalog name; set by `RestCatalogBuilder::load`.
    #[builder(default, setter(strip_option))]
    name: Option<String>,

    /// Base URI of the catalog; all endpoint URLs are derived from it.
    uri: String,

    /// Warehouse value; sent as the `warehouse` query parameter when the server
    /// configuration is loaded.
    #[builder(default, setter(strip_option(fallback = warehouse_opt)))]
    warehouse: Option<String>,

    /// Remaining string properties (e.g. `prefix`, `token`, `credential`, `header.*`).
    #[builder(default)]
    props: HashMap<String, String>,

    /// Optional caller-supplied `reqwest` client.
    #[builder(default)]
    client: Option<Client>,
}
159
160impl RestCatalogConfig {
161 fn url_prefixed(&self, parts: &[&str]) -> String {
162 [&self.uri, PATH_V1]
163 .into_iter()
164 .chain(self.props.get("prefix").map(|s| &**s))
165 .chain(parts.iter().cloned())
166 .join("/")
167 }
168
169 fn config_endpoint(&self) -> String {
170 [&self.uri, PATH_V1, "config"].join("/")
171 }
172
173 pub(crate) fn get_token_endpoint(&self) -> String {
174 if let Some(oauth2_uri) = self.props.get("oauth2-server-uri") {
175 oauth2_uri.to_string()
176 } else {
177 [&self.uri, PATH_V1, "oauth", "tokens"].join("/")
178 }
179 }
180
181 fn namespaces_endpoint(&self) -> String {
182 self.url_prefixed(&["namespaces"])
183 }
184
185 fn namespace_endpoint(&self, ns: &NamespaceIdent) -> String {
186 self.url_prefixed(&["namespaces", &ns.to_url_string()])
187 }
188
189 fn tables_endpoint(&self, ns: &NamespaceIdent) -> String {
190 self.url_prefixed(&["namespaces", &ns.to_url_string(), "tables"])
191 }
192
193 fn rename_table_endpoint(&self) -> String {
194 self.url_prefixed(&["tables", "rename"])
195 }
196
197 fn register_table_endpoint(&self, ns: &NamespaceIdent) -> String {
198 self.url_prefixed(&["namespaces", &ns.to_url_string(), "register"])
199 }
200
201 fn table_endpoint(&self, table: &TableIdent) -> String {
202 self.url_prefixed(&[
203 "namespaces",
204 &table.namespace.to_url_string(),
205 "tables",
206 &table.name,
207 ])
208 }
209
210 pub(crate) fn client(&self) -> Option<Client> {
212 self.client.clone()
213 }
214
215 pub(crate) fn token(&self) -> Option<String> {
219 self.props.get("token").cloned()
220 }
221
222 pub(crate) fn credential(&self) -> Option<(Option<String>, String)> {
231 let cred = self.props.get("credential")?;
232
233 match cred.split_once(':') {
234 Some((client_id, client_secret)) => {
235 Some((Some(client_id.to_string()), client_secret.to_string()))
236 }
237 None => Some((None, cred.to_string())),
238 }
239 }
240
241 pub(crate) fn extra_headers(&self) -> Result<HeaderMap> {
248 let mut headers = HeaderMap::from_iter([
249 (
250 header::CONTENT_TYPE,
251 HeaderValue::from_static("application/json"),
252 ),
253 (
254 HeaderName::from_static("x-client-version"),
255 HeaderValue::from_static(ICEBERG_REST_SPEC_VERSION),
256 ),
257 (
258 header::USER_AGENT,
259 HeaderValue::from_str(&format!("iceberg-rs/{CARGO_PKG_VERSION}")).unwrap(),
260 ),
261 ]);
262
263 for (key, value) in self
264 .props
265 .iter()
266 .filter_map(|(k, v)| k.strip_prefix("header.").map(|k| (k, v)))
267 {
268 headers.insert(
269 HeaderName::from_str(key).map_err(|e| {
270 Error::new(
271 ErrorKind::DataInvalid,
272 format!("Invalid header name: {key}"),
273 )
274 .with_source(e)
275 })?,
276 HeaderValue::from_str(value).map_err(|e| {
277 Error::new(
278 ErrorKind::DataInvalid,
279 format!("Invalid header value: {value}"),
280 )
281 .with_source(e)
282 })?,
283 );
284 }
285
286 Ok(headers)
287 }
288
289 pub(crate) fn extra_oauth_params(&self) -> HashMap<String, String> {
291 let mut params = HashMap::new();
292
293 if let Some(scope) = self.props.get("scope") {
294 params.insert("scope".to_string(), scope.to_string());
295 } else {
296 params.insert("scope".to_string(), "catalog".to_string());
297 }
298
299 let optional_params = ["audience", "resource"];
300 for param_name in optional_params {
301 if let Some(value) = self.props.get(param_name) {
302 params.insert(param_name.to_string(), value.to_string());
303 }
304 }
305
306 params
307 }
308
309 pub(crate) fn disable_header_redaction(&self) -> bool {
314 self.props
315 .get(REST_CATALOG_PROP_DISABLE_HEADER_REDACTION)
316 .map(|v| v.eq_ignore_ascii_case("true"))
317 .unwrap_or(false)
318 }
319
320 pub(crate) fn merge_with_config(mut self, mut config: CatalogConfig) -> Self {
322 if let Some(uri) = config.overrides.remove("uri") {
323 self.uri = uri;
324 }
325
326 let mut props = config.defaults;
327 props.extend(self.props);
328 props.extend(config.overrides);
329
330 self.props = props;
331 self
332 }
333}
334
/// State produced by `RestCatalog::context`: the HTTP client together with the
/// configuration obtained by merging the user configuration with the server's response.
#[derive(Debug)]
struct RestContext {
    /// Client used for all catalog requests.
    client: HttpClient,
    /// User configuration merged with the configuration returned by the server.
    config: RestCatalogConfig,
}
343
/// Catalog implementation that talks to a REST catalog server.
#[derive(Debug)]
pub struct RestCatalog {
    /// Configuration as supplied when the catalog was created.
    user_config: RestCatalogConfig,
    /// Context (client + merged configuration), initialised on the first call to `context`.
    ctx: OnceCell<RestContext>,
    /// Factory used to build `FileIO` instances; `load_file_io` fails when it is `None`.
    storage_factory: Option<Arc<dyn StorageFactory>>,
}
355
356impl RestCatalog {
357 fn new(config: RestCatalogConfig, storage_factory: Option<Arc<dyn StorageFactory>>) -> Self {
359 Self {
360 user_config: config,
361 ctx: OnceCell::new(),
362 storage_factory,
363 }
364 }
365
366 async fn delete_table(&self, table: &TableIdent, purge: bool) -> Result<()> {
368 let context = self.context().await?;
369
370 let mut request_builder = context
371 .client
372 .request(Method::DELETE, context.config.table_endpoint(table));
373
374 if purge {
375 request_builder = request_builder.query(&[("purgeRequested", "true")]);
376 }
377
378 let request = request_builder.build()?;
379 let http_response = context.client.query_catalog(request).await?;
380
381 match http_response.status() {
382 StatusCode::NO_CONTENT | StatusCode::OK => Ok(()),
383 StatusCode::NOT_FOUND => Err(Error::new(
384 ErrorKind::TableNotFound,
385 "Tried to drop a table that does not exist",
386 )),
387 _ => Err(deserialize_unexpected_catalog_error(
388 http_response,
389 context.client.disable_header_redaction(),
390 )
391 .await),
392 }
393 }
394
395 async fn context(&self) -> Result<&RestContext> {
397 self.ctx
398 .get_or_try_init(|| async {
399 let client = HttpClient::new(&self.user_config)?;
400 let catalog_config = RestCatalog::load_config(&client, &self.user_config).await?;
401 let config = self.user_config.clone().merge_with_config(catalog_config);
402 let client = client.update_with(&config)?;
403
404 Ok(RestContext { config, client })
405 })
406 .await
407 }
408
409 async fn load_config(
413 client: &HttpClient,
414 user_config: &RestCatalogConfig,
415 ) -> Result<CatalogConfig> {
416 let mut request_builder = client.request(Method::GET, user_config.config_endpoint());
417
418 if let Some(warehouse_location) = &user_config.warehouse {
419 request_builder = request_builder.query(&[("warehouse", warehouse_location)]);
420 }
421
422 let request = request_builder.build()?;
423
424 let http_response = client.query_catalog(request).await?;
425
426 match http_response.status() {
427 StatusCode::OK => deserialize_catalog_response(http_response).await,
428 _ => Err(deserialize_unexpected_catalog_error(
429 http_response,
430 client.disable_header_redaction(),
431 )
432 .await),
433 }
434 }
435
436 async fn load_file_io(
437 &self,
438 metadata_location: Option<&str>,
439 extra_config: Option<HashMap<String, String>>,
440 ) -> Result<FileIO> {
441 let mut props = self.context().await?.config.props.clone();
442 if let Some(config) = extra_config {
443 props.extend(config);
444 }
445
446 let warehouse_path = match self.context().await?.config.warehouse.as_deref() {
449 Some(url) if Url::parse(url).is_ok() => Some(url),
450 Some(_) => None,
451 None => None,
452 };
453
454 if metadata_location.or(warehouse_path).is_none() {
455 return Err(Error::new(
456 ErrorKind::Unexpected,
457 "Unable to load file io, neither warehouse nor metadata location is set!",
458 ));
459 }
460
461 let factory = self
463 .storage_factory
464 .clone()
465 .ok_or_else(|| {
466 Error::new(
467 ErrorKind::Unexpected,
468 "StorageFactory must be provided for RestCatalog. Use `with_storage_factory` to configure it.",
469 )
470 })?;
471
472 let file_io = FileIOBuilder::new(factory).with_props(props).build();
473
474 Ok(file_io)
475 }
476
477 pub async fn invalidate_token(&self) -> Result<()> {
480 self.context().await?.client.invalidate_token().await
481 }
482
483 pub async fn regenerate_token(&self) -> Result<()> {
490 self.context().await?.client.regenerate_token().await
491 }
492}
493
494#[async_trait]
497impl Catalog for RestCatalog {
498 async fn list_namespaces(
499 &self,
500 parent: Option<&NamespaceIdent>,
501 ) -> Result<Vec<NamespaceIdent>> {
502 let context = self.context().await?;
503 let endpoint = context.config.namespaces_endpoint();
504 let mut namespaces = Vec::new();
505 let mut next_token = None;
506
507 loop {
508 let mut request = context.client.request(Method::GET, endpoint.clone());
509
510 if let Some(ns) = parent {
512 request = request.query(&[("parent", ns.to_url_string())]);
513 }
514
515 if let Some(token) = next_token {
516 request = request.query(&[("pageToken", token)]);
517 }
518
519 let http_response = context.client.query_catalog(request.build()?).await?;
520
521 match http_response.status() {
522 StatusCode::OK => {
523 let response =
524 deserialize_catalog_response::<ListNamespaceResponse>(http_response)
525 .await?;
526
527 namespaces.extend(response.namespaces);
528
529 match response.next_page_token {
530 Some(token) => next_token = Some(token),
531 None => break,
532 }
533 }
534 StatusCode::NOT_FOUND => {
535 return Err(Error::new(
536 ErrorKind::NamespaceNotFound,
537 "The parent parameter of the namespace provided does not exist",
538 ));
539 }
540 _ => {
541 return Err(deserialize_unexpected_catalog_error(
542 http_response,
543 context.client.disable_header_redaction(),
544 )
545 .await);
546 }
547 }
548 }
549
550 Ok(namespaces)
551 }
552
553 async fn create_namespace(
554 &self,
555 namespace: &NamespaceIdent,
556 properties: HashMap<String, String>,
557 ) -> Result<Namespace> {
558 let context = self.context().await?;
559
560 let request = context
561 .client
562 .request(Method::POST, context.config.namespaces_endpoint())
563 .json(&CreateNamespaceRequest {
564 namespace: namespace.clone(),
565 properties,
566 })
567 .build()?;
568
569 let http_response = context.client.query_catalog(request).await?;
570
571 match http_response.status() {
572 StatusCode::OK => {
573 let response =
574 deserialize_catalog_response::<NamespaceResponse>(http_response).await?;
575 Ok(Namespace::from(response))
576 }
577 StatusCode::CONFLICT => Err(Error::new(
578 ErrorKind::NamespaceAlreadyExists,
579 "Tried to create a namespace that already exists",
580 )),
581 _ => Err(deserialize_unexpected_catalog_error(
582 http_response,
583 context.client.disable_header_redaction(),
584 )
585 .await),
586 }
587 }
588
589 async fn get_namespace(&self, namespace: &NamespaceIdent) -> Result<Namespace> {
590 let context = self.context().await?;
591
592 let request = context
593 .client
594 .request(Method::GET, context.config.namespace_endpoint(namespace))
595 .build()?;
596
597 let http_response = context.client.query_catalog(request).await?;
598
599 match http_response.status() {
600 StatusCode::OK => {
601 let response =
602 deserialize_catalog_response::<NamespaceResponse>(http_response).await?;
603 Ok(Namespace::from(response))
604 }
605 StatusCode::NOT_FOUND => Err(Error::new(
606 ErrorKind::NamespaceNotFound,
607 "Tried to get a namespace that does not exist",
608 )),
609 _ => Err(deserialize_unexpected_catalog_error(
610 http_response,
611 context.client.disable_header_redaction(),
612 )
613 .await),
614 }
615 }
616
617 async fn namespace_exists(&self, ns: &NamespaceIdent) -> Result<bool> {
618 let context = self.context().await?;
619
620 let request = context
621 .client
622 .request(Method::HEAD, context.config.namespace_endpoint(ns))
623 .build()?;
624
625 let http_response = context.client.query_catalog(request).await?;
626
627 match http_response.status() {
628 StatusCode::NO_CONTENT | StatusCode::OK => Ok(true),
629 StatusCode::NOT_FOUND => Ok(false),
630 _ => Err(deserialize_unexpected_catalog_error(
631 http_response,
632 context.client.disable_header_redaction(),
633 )
634 .await),
635 }
636 }
637
638 async fn update_namespace(
639 &self,
640 _namespace: &NamespaceIdent,
641 _properties: HashMap<String, String>,
642 ) -> Result<()> {
643 Err(Error::new(
644 ErrorKind::FeatureUnsupported,
645 "Updating namespace not supported yet!",
646 ))
647 }
648
649 async fn drop_namespace(&self, namespace: &NamespaceIdent) -> Result<()> {
650 let context = self.context().await?;
651
652 let request = context
653 .client
654 .request(Method::DELETE, context.config.namespace_endpoint(namespace))
655 .build()?;
656
657 let http_response = context.client.query_catalog(request).await?;
658
659 match http_response.status() {
660 StatusCode::NO_CONTENT | StatusCode::OK => Ok(()),
661 StatusCode::NOT_FOUND => Err(Error::new(
662 ErrorKind::NamespaceNotFound,
663 "Tried to drop a namespace that does not exist",
664 )),
665 _ => Err(deserialize_unexpected_catalog_error(
666 http_response,
667 context.client.disable_header_redaction(),
668 )
669 .await),
670 }
671 }
672
673 async fn list_tables(&self, namespace: &NamespaceIdent) -> Result<Vec<TableIdent>> {
674 let context = self.context().await?;
675 let endpoint = context.config.tables_endpoint(namespace);
676 let mut identifiers = Vec::new();
677 let mut next_token = None;
678
679 loop {
680 let mut request = context.client.request(Method::GET, endpoint.clone());
681
682 if let Some(token) = next_token {
683 request = request.query(&[("pageToken", token)]);
684 }
685
686 let http_response = context.client.query_catalog(request.build()?).await?;
687
688 match http_response.status() {
689 StatusCode::OK => {
690 let response =
691 deserialize_catalog_response::<ListTablesResponse>(http_response).await?;
692
693 identifiers.extend(response.identifiers);
694
695 match response.next_page_token {
696 Some(token) => next_token = Some(token),
697 None => break,
698 }
699 }
700 StatusCode::NOT_FOUND => {
701 return Err(Error::new(
702 ErrorKind::NamespaceNotFound,
703 "Tried to list tables of a namespace that does not exist",
704 ));
705 }
706 _ => {
707 return Err(deserialize_unexpected_catalog_error(
708 http_response,
709 context.client.disable_header_redaction(),
710 )
711 .await);
712 }
713 }
714 }
715
716 Ok(identifiers)
717 }
718
719 async fn create_table(
726 &self,
727 namespace: &NamespaceIdent,
728 creation: TableCreation,
729 ) -> Result<Table> {
730 let context = self.context().await?;
731
732 let table_ident = TableIdent::new(namespace.clone(), creation.name.clone());
733
734 let request = context
735 .client
736 .request(Method::POST, context.config.tables_endpoint(namespace))
737 .json(&CreateTableRequest {
738 name: creation.name,
739 location: creation.location,
740 schema: creation.schema,
741 partition_spec: creation.partition_spec,
742 write_order: creation.sort_order,
743 stage_create: Some(false),
744 properties: creation.properties,
745 })
746 .build()?;
747
748 let http_response = context.client.query_catalog(request).await?;
749
750 let response = match http_response.status() {
751 StatusCode::OK => {
752 deserialize_catalog_response::<LoadTableResult>(http_response).await?
753 }
754 StatusCode::NOT_FOUND => {
755 return Err(Error::new(
756 ErrorKind::NamespaceNotFound,
757 "Tried to create a table under a namespace that does not exist",
758 ));
759 }
760 StatusCode::CONFLICT => {
761 return Err(Error::new(
762 ErrorKind::TableAlreadyExists,
763 "The table already exists",
764 ));
765 }
766 _ => {
767 return Err(deserialize_unexpected_catalog_error(
768 http_response,
769 context.client.disable_header_redaction(),
770 )
771 .await);
772 }
773 };
774
775 let metadata_location = response.metadata_location.as_ref().ok_or(Error::new(
776 ErrorKind::DataInvalid,
777 "Metadata location missing in `create_table` response!",
778 ))?;
779
780 let config = response
781 .config
782 .into_iter()
783 .chain(self.user_config.props.clone())
784 .collect();
785
786 let file_io = self
787 .load_file_io(Some(metadata_location), Some(config))
788 .await?;
789
790 let table_builder = Table::builder()
791 .identifier(table_ident.clone())
792 .file_io(file_io)
793 .metadata(response.metadata);
794
795 if let Some(metadata_location) = response.metadata_location {
796 table_builder.metadata_location(metadata_location).build()
797 } else {
798 table_builder.build()
799 }
800 }
801
802 async fn load_table(&self, table_ident: &TableIdent) -> Result<Table> {
808 let context = self.context().await?;
809
810 let request = context
811 .client
812 .request(Method::GET, context.config.table_endpoint(table_ident))
813 .build()?;
814
815 let http_response = context.client.query_catalog(request).await?;
816
817 let response = match http_response.status() {
818 StatusCode::OK | StatusCode::NOT_MODIFIED => {
819 deserialize_catalog_response::<LoadTableResult>(http_response).await?
820 }
821 StatusCode::NOT_FOUND => {
822 return Err(Error::new(
823 ErrorKind::TableNotFound,
824 "Tried to load a table that does not exist",
825 ));
826 }
827 _ => {
828 return Err(deserialize_unexpected_catalog_error(
829 http_response,
830 context.client.disable_header_redaction(),
831 )
832 .await);
833 }
834 };
835
836 let config = response
837 .config
838 .into_iter()
839 .chain(self.user_config.props.clone())
840 .collect();
841
842 let file_io = self
843 .load_file_io(response.metadata_location.as_deref(), Some(config))
844 .await?;
845
846 let table_builder = Table::builder()
847 .identifier(table_ident.clone())
848 .file_io(file_io)
849 .metadata(response.metadata);
850
851 if let Some(metadata_location) = response.metadata_location {
852 table_builder.metadata_location(metadata_location).build()
853 } else {
854 table_builder.build()
855 }
856 }
857
858 async fn drop_table(&self, table: &TableIdent) -> Result<()> {
860 self.delete_table(table, false).await
861 }
862
863 async fn purge_table(&self, table: &TableIdent) -> Result<()> {
866 self.delete_table(table, true).await
867 }
868
869 async fn table_exists(&self, table: &TableIdent) -> Result<bool> {
871 let context = self.context().await?;
872
873 let request = context
874 .client
875 .request(Method::HEAD, context.config.table_endpoint(table))
876 .build()?;
877
878 let http_response = context.client.query_catalog(request).await?;
879
880 match http_response.status() {
881 StatusCode::NO_CONTENT | StatusCode::OK => Ok(true),
882 StatusCode::NOT_FOUND => Ok(false),
883 _ => Err(deserialize_unexpected_catalog_error(
884 http_response,
885 context.client.disable_header_redaction(),
886 )
887 .await),
888 }
889 }
890
891 async fn rename_table(&self, src: &TableIdent, dest: &TableIdent) -> Result<()> {
893 let context = self.context().await?;
894
895 let request = context
896 .client
897 .request(Method::POST, context.config.rename_table_endpoint())
898 .json(&RenameTableRequest {
899 source: src.clone(),
900 destination: dest.clone(),
901 })
902 .build()?;
903
904 let http_response = context.client.query_catalog(request).await?;
905
906 match http_response.status() {
907 StatusCode::NO_CONTENT | StatusCode::OK => Ok(()),
908 StatusCode::NOT_FOUND => Err(Error::new(
909 ErrorKind::TableNotFound,
910 "Tried to rename a table that does not exist (is the namespace correct?)",
911 )),
912 StatusCode::CONFLICT => Err(Error::new(
913 ErrorKind::TableAlreadyExists,
914 "Tried to rename a table to a name that already exists",
915 )),
916 _ => Err(deserialize_unexpected_catalog_error(
917 http_response,
918 context.client.disable_header_redaction(),
919 )
920 .await),
921 }
922 }
923
924 async fn register_table(
925 &self,
926 table_ident: &TableIdent,
927 metadata_location: String,
928 ) -> Result<Table> {
929 let context = self.context().await?;
930
931 let request = context
932 .client
933 .request(
934 Method::POST,
935 context
936 .config
937 .register_table_endpoint(table_ident.namespace()),
938 )
939 .json(&RegisterTableRequest {
940 name: table_ident.name.clone(),
941 metadata_location: metadata_location.clone(),
942 overwrite: Some(false),
943 })
944 .build()?;
945
946 let http_response = context.client.query_catalog(request).await?;
947
948 let response: LoadTableResult = match http_response.status() {
949 StatusCode::OK => {
950 deserialize_catalog_response::<LoadTableResult>(http_response).await?
951 }
952 StatusCode::NOT_FOUND => {
953 return Err(Error::new(
954 ErrorKind::NamespaceNotFound,
955 "The namespace specified does not exist.",
956 ));
957 }
958 StatusCode::CONFLICT => {
959 return Err(Error::new(
960 ErrorKind::TableAlreadyExists,
961 "The given table already exists.",
962 ));
963 }
964 _ => {
965 return Err(deserialize_unexpected_catalog_error(
966 http_response,
967 context.client.disable_header_redaction(),
968 )
969 .await);
970 }
971 };
972
973 let metadata_location = response.metadata_location.as_ref().ok_or(Error::new(
974 ErrorKind::DataInvalid,
975 "Metadata location missing in `register_table` response!",
976 ))?;
977
978 let file_io = self.load_file_io(Some(metadata_location), None).await?;
979
980 Table::builder()
981 .identifier(table_ident.clone())
982 .file_io(file_io)
983 .metadata(response.metadata)
984 .metadata_location(metadata_location.clone())
985 .build()
986 }
987
988 async fn update_table(&self, mut commit: TableCommit) -> Result<Table> {
989 let context = self.context().await?;
990
991 let request = context
992 .client
993 .request(
994 Method::POST,
995 context.config.table_endpoint(commit.identifier()),
996 )
997 .json(&CommitTableRequest {
998 identifier: Some(commit.identifier().clone()),
999 requirements: commit.take_requirements(),
1000 updates: commit.take_updates(),
1001 })
1002 .build()?;
1003
1004 let http_response = context.client.query_catalog(request).await?;
1005
1006 let response: CommitTableResponse = match http_response.status() {
1007 StatusCode::OK => deserialize_catalog_response(http_response).await?,
1008 StatusCode::NOT_FOUND => {
1009 return Err(Error::new(
1010 ErrorKind::TableNotFound,
1011 "Tried to update a table that does not exist",
1012 ));
1013 }
1014 StatusCode::CONFLICT => {
1015 return Err(Error::new(
1016 ErrorKind::CatalogCommitConflicts,
1017 "CatalogCommitConflicts, one or more requirements failed. The client may retry.",
1018 )
1019 .with_retryable(true));
1020 }
1021 StatusCode::INTERNAL_SERVER_ERROR => {
1022 return Err(Error::new(
1023 ErrorKind::Unexpected,
1024 "An unknown server-side problem occurred; the commit state is unknown.",
1025 ));
1026 }
1027 StatusCode::BAD_GATEWAY => {
1028 return Err(Error::new(
1029 ErrorKind::Unexpected,
1030 "A gateway or proxy received an invalid response from the upstream server; the commit state is unknown.",
1031 ));
1032 }
1033 StatusCode::GATEWAY_TIMEOUT => {
1034 return Err(Error::new(
1035 ErrorKind::Unexpected,
1036 "A server-side gateway timeout occurred; the commit state is unknown.",
1037 ));
1038 }
1039 _ => {
1040 return Err(deserialize_unexpected_catalog_error(
1041 http_response,
1042 context.client.disable_header_redaction(),
1043 )
1044 .await);
1045 }
1046 };
1047
1048 let file_io = self
1049 .load_file_io(Some(&response.metadata_location), None)
1050 .await?;
1051
1052 Table::builder()
1053 .identifier(commit.identifier().clone())
1054 .file_io(file_io)
1055 .metadata(response.metadata)
1056 .metadata_location(response.metadata_location)
1057 .build()
1058 }
1059}
1060
1061#[cfg(test)]
1062mod tests {
1063 use std::fs::File;
1064 use std::io::BufReader;
1065 use std::sync::Arc;
1066
1067 use chrono::{TimeZone, Utc};
1068 use iceberg::io::LocalFsStorageFactory;
1069 use iceberg::spec::{
1070 FormatVersion, NestedField, NullOrder, Operation, PrimitiveType, Schema, Snapshot,
1071 SnapshotLog, SortDirection, SortField, SortOrder, Summary, Transform, Type,
1072 UnboundPartitionField, UnboundPartitionSpec,
1073 };
1074 use iceberg::transaction::{ApplyTransactionAction, Transaction};
1075 use mockito::{Mock, Server, ServerGuard};
1076 use serde_json::json;
1077 use uuid::uuid;
1078
1079 use super::*;
1080
1081 #[tokio::test]
1082 async fn test_update_config() {
1083 let mut server = Server::new_async().await;
1084
1085 let config_mock = server
1086 .mock("GET", "/v1/config")
1087 .with_status(200)
1088 .with_body(
1089 r#"{
1090 "overrides": {
1091 "warehouse": "s3://iceberg-catalog"
1092 },
1093 "defaults": {}
1094 }"#,
1095 )
1096 .create_async()
1097 .await;
1098
1099 let catalog = RestCatalog::new(
1100 RestCatalogConfig::builder().uri(server.url()).build(),
1101 Some(Arc::new(LocalFsStorageFactory)),
1102 );
1103
1104 assert_eq!(
1105 catalog
1106 .context()
1107 .await
1108 .unwrap()
1109 .config
1110 .props
1111 .get("warehouse"),
1112 Some(&"s3://iceberg-catalog".to_string())
1113 );
1114
1115 config_mock.assert_async().await;
1116 }
1117
    /// Registers a mock for `GET /v1/config` on `server` that answers `200` with a
    /// `warehouse` override of `s3://iceberg-catalog` and empty defaults.
    async fn create_config_mock(server: &mut ServerGuard) -> Mock {
        server
            .mock("GET", "/v1/config")
            .with_status(200)
            .with_body(
                r#"{
                "overrides": {
                    "warehouse": "s3://iceberg-catalog"
                },
                "defaults": {}
            }"#,
            )
            .create_async()
            .await
    }
1133
1134 async fn create_oauth_mock(server: &mut ServerGuard) -> Mock {
1135 create_oauth_mock_with_path(server, "/v1/oauth/tokens", "ey000000000000", 200).await
1136 }
1137
    /// Registers a mock for `POST <path>` on `server` that answers with `status` and a
    /// bearer-token JSON body containing `token`; the mock expects exactly one request.
    async fn create_oauth_mock_with_path(
        server: &mut ServerGuard,
        path: &str,
        token: &str,
        status: usize,
    ) -> Mock {
        // `{{` / `}}` are escaped braces of `format!`; only `{token}` is interpolated.
        let body = format!(
            r#"{{
                "access_token": "{token}",
                "token_type": "Bearer",
                "issued_token_type": "urn:ietf:params:oauth:token-type:access_token",
                "expires_in": 86400
                }}"#
        );
        server
            .mock("POST", path)
            .with_status(status)
            .with_body(body)
            .expect(1)
            .create_async()
            .await
    }
1160
1161 #[tokio::test]
1162 async fn test_oauth() {
1163 let mut server = Server::new_async().await;
1164 let oauth_mock = create_oauth_mock(&mut server).await;
1165 let config_mock = create_config_mock(&mut server).await;
1166
1167 let mut props = HashMap::new();
1168 props.insert("credential".to_string(), "client1:secret1".to_string());
1169
1170 let catalog = RestCatalog::new(
1171 RestCatalogConfig::builder()
1172 .uri(server.url())
1173 .props(props)
1174 .build(),
1175 Some(Arc::new(LocalFsStorageFactory)),
1176 );
1177
1178 let token = catalog.context().await.unwrap().client.token().await;
1179 oauth_mock.assert_async().await;
1180 config_mock.assert_async().await;
1181 assert_eq!(token, Some("ey000000000000".to_string()));
1182 }
1183
1184 #[tokio::test]
1185 async fn test_oauth_with_optional_param() {
1186 let mut props = HashMap::new();
1187 props.insert("credential".to_string(), "client1:secret1".to_string());
1188 props.insert("scope".to_string(), "custom_scope".to_string());
1189 props.insert("audience".to_string(), "custom_audience".to_string());
1190 props.insert("resource".to_string(), "custom_resource".to_string());
1191
1192 let mut server = Server::new_async().await;
1193 let oauth_mock = server
1194 .mock("POST", "/v1/oauth/tokens")
1195 .match_body(mockito::Matcher::Regex("scope=custom_scope".to_string()))
1196 .match_body(mockito::Matcher::Regex(
1197 "audience=custom_audience".to_string(),
1198 ))
1199 .match_body(mockito::Matcher::Regex(
1200 "resource=custom_resource".to_string(),
1201 ))
1202 .with_status(200)
1203 .with_body(
1204 r#"{
1205 "access_token": "ey000000000000",
1206 "token_type": "Bearer",
1207 "issued_token_type": "urn:ietf:params:oauth:token-type:access_token",
1208 "expires_in": 86400
1209 }"#,
1210 )
1211 .expect(1)
1212 .create_async()
1213 .await;
1214
1215 let config_mock = create_config_mock(&mut server).await;
1216
1217 let catalog = RestCatalog::new(
1218 RestCatalogConfig::builder()
1219 .uri(server.url())
1220 .props(props)
1221 .build(),
1222 Some(Arc::new(LocalFsStorageFactory)),
1223 );
1224
1225 let token = catalog.context().await.unwrap().client.token().await;
1226
1227 oauth_mock.assert_async().await;
1228 config_mock.assert_async().await;
1229 assert_eq!(token, Some("ey000000000000".to_string()));
1230 }
1231
1232 #[tokio::test]
1233 async fn test_invalidate_token() {
1234 let mut server = Server::new_async().await;
1235 let oauth_mock = create_oauth_mock(&mut server).await;
1236 let config_mock = create_config_mock(&mut server).await;
1237
1238 let mut props = HashMap::new();
1239 props.insert("credential".to_string(), "client1:secret1".to_string());
1240
1241 let catalog = RestCatalog::new(
1242 RestCatalogConfig::builder()
1243 .uri(server.url())
1244 .props(props)
1245 .build(),
1246 Some(Arc::new(LocalFsStorageFactory)),
1247 );
1248
1249 let token = catalog.context().await.unwrap().client.token().await;
1250 oauth_mock.assert_async().await;
1251 config_mock.assert_async().await;
1252 assert_eq!(token, Some("ey000000000000".to_string()));
1253
1254 let oauth_mock =
1255 create_oauth_mock_with_path(&mut server, "/v1/oauth/tokens", "ey000000000001", 200)
1256 .await;
1257 catalog.invalidate_token().await.unwrap();
1258 let token = catalog.context().await.unwrap().client.token().await;
1259 oauth_mock.assert_async().await;
1260 assert_eq!(token, Some("ey000000000001".to_string()));
1261 }
1262
1263 #[tokio::test]
1264 async fn test_invalidate_token_failing_request() {
1265 let mut server = Server::new_async().await;
1266 let oauth_mock = create_oauth_mock(&mut server).await;
1267 let config_mock = create_config_mock(&mut server).await;
1268
1269 let mut props = HashMap::new();
1270 props.insert("credential".to_string(), "client1:secret1".to_string());
1271
1272 let catalog = RestCatalog::new(
1273 RestCatalogConfig::builder()
1274 .uri(server.url())
1275 .props(props)
1276 .build(),
1277 Some(Arc::new(LocalFsStorageFactory)),
1278 );
1279
1280 let token = catalog.context().await.unwrap().client.token().await;
1281 oauth_mock.assert_async().await;
1282 config_mock.assert_async().await;
1283 assert_eq!(token, Some("ey000000000000".to_string()));
1284
1285 let oauth_mock =
1286 create_oauth_mock_with_path(&mut server, "/v1/oauth/tokens", "ey000000000001", 500)
1287 .await;
1288 catalog.invalidate_token().await.unwrap();
1289 let token = catalog.context().await.unwrap().client.token().await;
1290 oauth_mock.assert_async().await;
1291 assert_eq!(token, None);
1292 }
1293
1294 #[tokio::test]
1295 async fn test_regenerate_token() {
1296 let mut server = Server::new_async().await;
1297 let oauth_mock = create_oauth_mock(&mut server).await;
1298 let config_mock = create_config_mock(&mut server).await;
1299
1300 let mut props = HashMap::new();
1301 props.insert("credential".to_string(), "client1:secret1".to_string());
1302
1303 let catalog = RestCatalog::new(
1304 RestCatalogConfig::builder()
1305 .uri(server.url())
1306 .props(props)
1307 .build(),
1308 Some(Arc::new(LocalFsStorageFactory)),
1309 );
1310
1311 let token = catalog.context().await.unwrap().client.token().await;
1312 oauth_mock.assert_async().await;
1313 config_mock.assert_async().await;
1314 assert_eq!(token, Some("ey000000000000".to_string()));
1315
1316 let oauth_mock =
1317 create_oauth_mock_with_path(&mut server, "/v1/oauth/tokens", "ey000000000001", 200)
1318 .await;
1319 catalog.regenerate_token().await.unwrap();
1320 oauth_mock.assert_async().await;
1321 let token = catalog.context().await.unwrap().client.token().await;
1322 assert_eq!(token, Some("ey000000000001".to_string()));
1323 }
1324
1325 #[tokio::test]
1326 async fn test_regenerate_token_failing_request() {
1327 let mut server = Server::new_async().await;
1328 let oauth_mock = create_oauth_mock(&mut server).await;
1329 let config_mock = create_config_mock(&mut server).await;
1330
1331 let mut props = HashMap::new();
1332 props.insert("credential".to_string(), "client1:secret1".to_string());
1333
1334 let catalog = RestCatalog::new(
1335 RestCatalogConfig::builder()
1336 .uri(server.url())
1337 .props(props)
1338 .build(),
1339 Some(Arc::new(LocalFsStorageFactory)),
1340 );
1341
1342 let token = catalog.context().await.unwrap().client.token().await;
1343 oauth_mock.assert_async().await;
1344 config_mock.assert_async().await;
1345 assert_eq!(token, Some("ey000000000000".to_string()));
1346
1347 let oauth_mock =
1348 create_oauth_mock_with_path(&mut server, "/v1/oauth/tokens", "ey000000000001", 500)
1349 .await;
1350 let invalidate_result = catalog.regenerate_token().await;
1351 assert!(invalidate_result.is_err());
1352 oauth_mock.assert_async().await;
1353 let token = catalog.context().await.unwrap().client.token().await;
1354
1355 assert_eq!(token, Some("ey000000000000".to_string()));
1357 }
1358
1359 #[tokio::test]
1360 async fn test_http_headers() {
1361 let server = Server::new_async().await;
1362 let mut props = HashMap::new();
1363 props.insert("credential".to_string(), "client1:secret1".to_string());
1364
1365 let config = RestCatalogConfig::builder()
1366 .uri(server.url())
1367 .props(props)
1368 .build();
1369 let headers: HeaderMap = config.extra_headers().unwrap();
1370
1371 let expected_headers = HeaderMap::from_iter([
1372 (
1373 header::CONTENT_TYPE,
1374 HeaderValue::from_static("application/json"),
1375 ),
1376 (
1377 HeaderName::from_static("x-client-version"),
1378 HeaderValue::from_static(ICEBERG_REST_SPEC_VERSION),
1379 ),
1380 (
1381 header::USER_AGENT,
1382 HeaderValue::from_str(&format!("iceberg-rs/{CARGO_PKG_VERSION}")).unwrap(),
1383 ),
1384 ]);
1385 assert_eq!(headers, expected_headers);
1386 }
1387
1388 #[tokio::test]
1389 async fn test_http_headers_with_custom_headers() {
1390 let server = Server::new_async().await;
1391 let mut props = HashMap::new();
1392 props.insert("credential".to_string(), "client1:secret1".to_string());
1393 props.insert(
1394 "header.content-type".to_string(),
1395 "application/yaml".to_string(),
1396 );
1397 props.insert(
1398 "header.customized-header".to_string(),
1399 "some/value".to_string(),
1400 );
1401
1402 let config = RestCatalogConfig::builder()
1403 .uri(server.url())
1404 .props(props)
1405 .build();
1406 let headers: HeaderMap = config.extra_headers().unwrap();
1407
1408 let expected_headers = HeaderMap::from_iter([
1409 (
1410 header::CONTENT_TYPE,
1411 HeaderValue::from_static("application/yaml"),
1412 ),
1413 (
1414 HeaderName::from_static("x-client-version"),
1415 HeaderValue::from_static(ICEBERG_REST_SPEC_VERSION),
1416 ),
1417 (
1418 header::USER_AGENT,
1419 HeaderValue::from_str(&format!("iceberg-rs/{CARGO_PKG_VERSION}")).unwrap(),
1420 ),
1421 (
1422 HeaderName::from_static("customized-header"),
1423 HeaderValue::from_static("some/value"),
1424 ),
1425 ]);
1426 assert_eq!(headers, expected_headers);
1427 }
1428
1429 #[tokio::test]
1430 async fn test_oauth_with_oauth2_server_uri() {
1431 let mut server = Server::new_async().await;
1432 let config_mock = create_config_mock(&mut server).await;
1433
1434 let mut auth_server = Server::new_async().await;
1435 let auth_server_path = "/some/path";
1436 let oauth_mock =
1437 create_oauth_mock_with_path(&mut auth_server, auth_server_path, "ey000000000000", 200)
1438 .await;
1439
1440 let mut props = HashMap::new();
1441 props.insert("credential".to_string(), "client1:secret1".to_string());
1442 props.insert(
1443 "oauth2-server-uri".to_string(),
1444 format!("{}{}", auth_server.url(), auth_server_path).to_string(),
1445 );
1446
1447 let catalog = RestCatalog::new(
1448 RestCatalogConfig::builder()
1449 .uri(server.url())
1450 .props(props)
1451 .build(),
1452 Some(Arc::new(LocalFsStorageFactory)),
1453 );
1454
1455 let token = catalog.context().await.unwrap().client.token().await;
1456
1457 oauth_mock.assert_async().await;
1458 config_mock.assert_async().await;
1459 assert_eq!(token, Some("ey000000000000".to_string()));
1460 }
1461
1462 #[tokio::test]
1463 async fn test_config_override() {
1464 let mut server = Server::new_async().await;
1465 let mut redirect_server = Server::new_async().await;
1466 let new_uri = redirect_server.url();
1467
1468 let config_mock = server
1469 .mock("GET", "/v1/config")
1470 .with_status(200)
1471 .with_body(
1472 json!(
1473 {
1474 "overrides": {
1475 "uri": new_uri,
1476 "warehouse": "s3://iceberg-catalog",
1477 "prefix": "ice/warehouses/my"
1478 },
1479 "defaults": {},
1480 }
1481 )
1482 .to_string(),
1483 )
1484 .create_async()
1485 .await;
1486
1487 let list_ns_mock = redirect_server
1488 .mock("GET", "/v1/ice/warehouses/my/namespaces")
1489 .with_body(
1490 r#"{
1491 "namespaces": []
1492 }"#,
1493 )
1494 .create_async()
1495 .await;
1496
1497 let catalog = RestCatalog::new(
1498 RestCatalogConfig::builder().uri(server.url()).build(),
1499 Some(Arc::new(LocalFsStorageFactory)),
1500 );
1501
1502 let _namespaces = catalog.list_namespaces(None).await.unwrap();
1503
1504 config_mock.assert_async().await;
1505 list_ns_mock.assert_async().await;
1506 }
1507
1508 #[tokio::test]
1509 async fn test_list_namespace() {
1510 let mut server = Server::new_async().await;
1511
1512 let config_mock = create_config_mock(&mut server).await;
1513
1514 let list_ns_mock = server
1515 .mock("GET", "/v1/namespaces")
1516 .with_body(
1517 r#"{
1518 "namespaces": [
1519 ["ns1", "ns11"],
1520 ["ns2"]
1521 ]
1522 }"#,
1523 )
1524 .create_async()
1525 .await;
1526
1527 let catalog = RestCatalog::new(
1528 RestCatalogConfig::builder().uri(server.url()).build(),
1529 Some(Arc::new(LocalFsStorageFactory)),
1530 );
1531
1532 let namespaces = catalog.list_namespaces(None).await.unwrap();
1533
1534 let expected_ns = vec![
1535 NamespaceIdent::from_vec(vec!["ns1".to_string(), "ns11".to_string()]).unwrap(),
1536 NamespaceIdent::from_vec(vec!["ns2".to_string()]).unwrap(),
1537 ];
1538
1539 assert_eq!(expected_ns, namespaces);
1540
1541 config_mock.assert_async().await;
1542 list_ns_mock.assert_async().await;
1543 }
1544
1545 #[tokio::test]
1546 async fn test_list_namespace_with_pagination() {
1547 let mut server = Server::new_async().await;
1548
1549 let config_mock = create_config_mock(&mut server).await;
1550
1551 let list_ns_mock_page1 = server
1552 .mock("GET", "/v1/namespaces")
1553 .with_body(
1554 r#"{
1555 "namespaces": [
1556 ["ns1", "ns11"],
1557 ["ns2"]
1558 ],
1559 "next-page-token": "token123"
1560 }"#,
1561 )
1562 .create_async()
1563 .await;
1564
1565 let list_ns_mock_page2 = server
1566 .mock("GET", "/v1/namespaces?pageToken=token123")
1567 .with_body(
1568 r#"{
1569 "namespaces": [
1570 ["ns3"],
1571 ["ns4", "ns41"]
1572 ]
1573 }"#,
1574 )
1575 .create_async()
1576 .await;
1577
1578 let catalog = RestCatalog::new(
1579 RestCatalogConfig::builder().uri(server.url()).build(),
1580 Some(Arc::new(LocalFsStorageFactory)),
1581 );
1582
1583 let namespaces = catalog.list_namespaces(None).await.unwrap();
1584
1585 let expected_ns = vec![
1586 NamespaceIdent::from_vec(vec!["ns1".to_string(), "ns11".to_string()]).unwrap(),
1587 NamespaceIdent::from_vec(vec!["ns2".to_string()]).unwrap(),
1588 NamespaceIdent::from_vec(vec!["ns3".to_string()]).unwrap(),
1589 NamespaceIdent::from_vec(vec!["ns4".to_string(), "ns41".to_string()]).unwrap(),
1590 ];
1591
1592 assert_eq!(expected_ns, namespaces);
1593
1594 config_mock.assert_async().await;
1595 list_ns_mock_page1.assert_async().await;
1596 list_ns_mock_page2.assert_async().await;
1597 }
1598
1599 #[tokio::test]
1600 async fn test_list_namespace_with_multiple_pages() {
1601 let mut server = Server::new_async().await;
1602
1603 let config_mock = create_config_mock(&mut server).await;
1604
1605 let list_ns_mock_page1 = server
1607 .mock("GET", "/v1/namespaces")
1608 .with_body(
1609 r#"{
1610 "namespaces": [
1611 ["ns1", "ns11"],
1612 ["ns2"]
1613 ],
1614 "next-page-token": "page2"
1615 }"#,
1616 )
1617 .create_async()
1618 .await;
1619
1620 let list_ns_mock_page2 = server
1622 .mock("GET", "/v1/namespaces?pageToken=page2")
1623 .with_body(
1624 r#"{
1625 "namespaces": [
1626 ["ns3"],
1627 ["ns4", "ns41"]
1628 ],
1629 "next-page-token": "page3"
1630 }"#,
1631 )
1632 .create_async()
1633 .await;
1634
1635 let list_ns_mock_page3 = server
1637 .mock("GET", "/v1/namespaces?pageToken=page3")
1638 .with_body(
1639 r#"{
1640 "namespaces": [
1641 ["ns5", "ns51", "ns511"]
1642 ],
1643 "next-page-token": "page4"
1644 }"#,
1645 )
1646 .create_async()
1647 .await;
1648
1649 let list_ns_mock_page4 = server
1651 .mock("GET", "/v1/namespaces?pageToken=page4")
1652 .with_body(
1653 r#"{
1654 "namespaces": [
1655 ["ns6"],
1656 ["ns7"]
1657 ],
1658 "next-page-token": "page5"
1659 }"#,
1660 )
1661 .create_async()
1662 .await;
1663
1664 let list_ns_mock_page5 = server
1666 .mock("GET", "/v1/namespaces?pageToken=page5")
1667 .with_body(
1668 r#"{
1669 "namespaces": [
1670 ["ns8", "ns81"]
1671 ]
1672 }"#,
1673 )
1674 .create_async()
1675 .await;
1676
1677 let catalog = RestCatalog::new(
1678 RestCatalogConfig::builder().uri(server.url()).build(),
1679 Some(Arc::new(LocalFsStorageFactory)),
1680 );
1681
1682 let namespaces = catalog.list_namespaces(None).await.unwrap();
1683
1684 let expected_ns = vec![
1685 NamespaceIdent::from_vec(vec!["ns1".to_string(), "ns11".to_string()]).unwrap(),
1686 NamespaceIdent::from_vec(vec!["ns2".to_string()]).unwrap(),
1687 NamespaceIdent::from_vec(vec!["ns3".to_string()]).unwrap(),
1688 NamespaceIdent::from_vec(vec!["ns4".to_string(), "ns41".to_string()]).unwrap(),
1689 NamespaceIdent::from_vec(vec![
1690 "ns5".to_string(),
1691 "ns51".to_string(),
1692 "ns511".to_string(),
1693 ])
1694 .unwrap(),
1695 NamespaceIdent::from_vec(vec!["ns6".to_string()]).unwrap(),
1696 NamespaceIdent::from_vec(vec!["ns7".to_string()]).unwrap(),
1697 NamespaceIdent::from_vec(vec!["ns8".to_string(), "ns81".to_string()]).unwrap(),
1698 ];
1699
1700 assert_eq!(expected_ns, namespaces);
1701
1702 config_mock.assert_async().await;
1704 list_ns_mock_page1.assert_async().await;
1705 list_ns_mock_page2.assert_async().await;
1706 list_ns_mock_page3.assert_async().await;
1707 list_ns_mock_page4.assert_async().await;
1708 list_ns_mock_page5.assert_async().await;
1709 }
1710
1711 #[tokio::test]
1712 async fn test_create_namespace() {
1713 let mut server = Server::new_async().await;
1714
1715 let config_mock = create_config_mock(&mut server).await;
1716
1717 let create_ns_mock = server
1718 .mock("POST", "/v1/namespaces")
1719 .with_body(
1720 r#"{
1721 "namespace": [ "ns1", "ns11"],
1722 "properties" : {
1723 "key1": "value1"
1724 }
1725 }"#,
1726 )
1727 .create_async()
1728 .await;
1729
1730 let catalog = RestCatalog::new(
1731 RestCatalogConfig::builder().uri(server.url()).build(),
1732 Some(Arc::new(LocalFsStorageFactory)),
1733 );
1734
1735 let namespaces = catalog
1736 .create_namespace(
1737 &NamespaceIdent::from_vec(vec!["ns1".to_string(), "ns11".to_string()]).unwrap(),
1738 HashMap::from([("key1".to_string(), "value1".to_string())]),
1739 )
1740 .await
1741 .unwrap();
1742
1743 let expected_ns = Namespace::with_properties(
1744 NamespaceIdent::from_vec(vec!["ns1".to_string(), "ns11".to_string()]).unwrap(),
1745 HashMap::from([("key1".to_string(), "value1".to_string())]),
1746 );
1747
1748 assert_eq!(expected_ns, namespaces);
1749
1750 config_mock.assert_async().await;
1751 create_ns_mock.assert_async().await;
1752 }
1753
1754 #[tokio::test]
1755 async fn test_get_namespace() {
1756 let mut server = Server::new_async().await;
1757
1758 let config_mock = create_config_mock(&mut server).await;
1759
1760 let get_ns_mock = server
1761 .mock("GET", "/v1/namespaces/ns1")
1762 .with_body(
1763 r#"{
1764 "namespace": [ "ns1"],
1765 "properties" : {
1766 "key1": "value1"
1767 }
1768 }"#,
1769 )
1770 .create_async()
1771 .await;
1772
1773 let catalog = RestCatalog::new(
1774 RestCatalogConfig::builder().uri(server.url()).build(),
1775 Some(Arc::new(LocalFsStorageFactory)),
1776 );
1777
1778 let namespaces = catalog
1779 .get_namespace(&NamespaceIdent::new("ns1".to_string()))
1780 .await
1781 .unwrap();
1782
1783 let expected_ns = Namespace::with_properties(
1784 NamespaceIdent::new("ns1".to_string()),
1785 HashMap::from([("key1".to_string(), "value1".to_string())]),
1786 );
1787
1788 assert_eq!(expected_ns, namespaces);
1789
1790 config_mock.assert_async().await;
1791 get_ns_mock.assert_async().await;
1792 }
1793
1794 #[tokio::test]
1795 async fn check_namespace_exists() {
1796 let mut server = Server::new_async().await;
1797
1798 let config_mock = create_config_mock(&mut server).await;
1799
1800 let get_ns_mock = server
1801 .mock("HEAD", "/v1/namespaces/ns1")
1802 .with_status(204)
1803 .create_async()
1804 .await;
1805
1806 let catalog = RestCatalog::new(
1807 RestCatalogConfig::builder().uri(server.url()).build(),
1808 Some(Arc::new(LocalFsStorageFactory)),
1809 );
1810
1811 assert!(
1812 catalog
1813 .namespace_exists(&NamespaceIdent::new("ns1".to_string()))
1814 .await
1815 .unwrap()
1816 );
1817
1818 config_mock.assert_async().await;
1819 get_ns_mock.assert_async().await;
1820 }
1821
1822 #[tokio::test]
1823 async fn test_drop_namespace() {
1824 let mut server = Server::new_async().await;
1825
1826 let config_mock = create_config_mock(&mut server).await;
1827
1828 let drop_ns_mock = server
1829 .mock("DELETE", "/v1/namespaces/ns1")
1830 .with_status(204)
1831 .create_async()
1832 .await;
1833
1834 let catalog = RestCatalog::new(
1835 RestCatalogConfig::builder().uri(server.url()).build(),
1836 Some(Arc::new(LocalFsStorageFactory)),
1837 );
1838
1839 catalog
1840 .drop_namespace(&NamespaceIdent::new("ns1".to_string()))
1841 .await
1842 .unwrap();
1843
1844 config_mock.assert_async().await;
1845 drop_ns_mock.assert_async().await;
1846 }
1847
1848 #[tokio::test]
1849 async fn test_list_tables() {
1850 let mut server = Server::new_async().await;
1851
1852 let config_mock = create_config_mock(&mut server).await;
1853
1854 let list_tables_mock = server
1855 .mock("GET", "/v1/namespaces/ns1/tables")
1856 .with_status(200)
1857 .with_body(
1858 r#"{
1859 "identifiers": [
1860 {
1861 "namespace": ["ns1"],
1862 "name": "table1"
1863 },
1864 {
1865 "namespace": ["ns1"],
1866 "name": "table2"
1867 }
1868 ]
1869 }"#,
1870 )
1871 .create_async()
1872 .await;
1873
1874 let catalog = RestCatalog::new(
1875 RestCatalogConfig::builder().uri(server.url()).build(),
1876 Some(Arc::new(LocalFsStorageFactory)),
1877 );
1878
1879 let tables = catalog
1880 .list_tables(&NamespaceIdent::new("ns1".to_string()))
1881 .await
1882 .unwrap();
1883
1884 let expected_tables = vec![
1885 TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table1".to_string()),
1886 TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table2".to_string()),
1887 ];
1888
1889 assert_eq!(tables, expected_tables);
1890
1891 config_mock.assert_async().await;
1892 list_tables_mock.assert_async().await;
1893 }
1894
1895 #[tokio::test]
1896 async fn test_list_tables_with_pagination() {
1897 let mut server = Server::new_async().await;
1898
1899 let config_mock = create_config_mock(&mut server).await;
1900
1901 let list_tables_mock_page1 = server
1902 .mock("GET", "/v1/namespaces/ns1/tables")
1903 .with_status(200)
1904 .with_body(
1905 r#"{
1906 "identifiers": [
1907 {
1908 "namespace": ["ns1"],
1909 "name": "table1"
1910 },
1911 {
1912 "namespace": ["ns1"],
1913 "name": "table2"
1914 }
1915 ],
1916 "next-page-token": "token456"
1917 }"#,
1918 )
1919 .create_async()
1920 .await;
1921
1922 let list_tables_mock_page2 = server
1923 .mock("GET", "/v1/namespaces/ns1/tables?pageToken=token456")
1924 .with_status(200)
1925 .with_body(
1926 r#"{
1927 "identifiers": [
1928 {
1929 "namespace": ["ns1"],
1930 "name": "table3"
1931 },
1932 {
1933 "namespace": ["ns1"],
1934 "name": "table4"
1935 }
1936 ]
1937 }"#,
1938 )
1939 .create_async()
1940 .await;
1941
1942 let catalog = RestCatalog::new(
1943 RestCatalogConfig::builder().uri(server.url()).build(),
1944 Some(Arc::new(LocalFsStorageFactory)),
1945 );
1946
1947 let tables = catalog
1948 .list_tables(&NamespaceIdent::new("ns1".to_string()))
1949 .await
1950 .unwrap();
1951
1952 let expected_tables = vec![
1953 TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table1".to_string()),
1954 TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table2".to_string()),
1955 TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table3".to_string()),
1956 TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table4".to_string()),
1957 ];
1958
1959 assert_eq!(tables, expected_tables);
1960
1961 config_mock.assert_async().await;
1962 list_tables_mock_page1.assert_async().await;
1963 list_tables_mock_page2.assert_async().await;
1964 }
1965
1966 #[tokio::test]
1967 async fn test_list_tables_with_multiple_pages() {
1968 let mut server = Server::new_async().await;
1969
1970 let config_mock = create_config_mock(&mut server).await;
1971
1972 let list_tables_mock_page1 = server
1974 .mock("GET", "/v1/namespaces/ns1/tables")
1975 .with_status(200)
1976 .with_body(
1977 r#"{
1978 "identifiers": [
1979 {
1980 "namespace": ["ns1"],
1981 "name": "table1"
1982 },
1983 {
1984 "namespace": ["ns1"],
1985 "name": "table2"
1986 }
1987 ],
1988 "next-page-token": "page2"
1989 }"#,
1990 )
1991 .create_async()
1992 .await;
1993
1994 let list_tables_mock_page2 = server
1996 .mock("GET", "/v1/namespaces/ns1/tables?pageToken=page2")
1997 .with_status(200)
1998 .with_body(
1999 r#"{
2000 "identifiers": [
2001 {
2002 "namespace": ["ns1"],
2003 "name": "table3"
2004 },
2005 {
2006 "namespace": ["ns1"],
2007 "name": "table4"
2008 }
2009 ],
2010 "next-page-token": "page3"
2011 }"#,
2012 )
2013 .create_async()
2014 .await;
2015
2016 let list_tables_mock_page3 = server
2018 .mock("GET", "/v1/namespaces/ns1/tables?pageToken=page3")
2019 .with_status(200)
2020 .with_body(
2021 r#"{
2022 "identifiers": [
2023 {
2024 "namespace": ["ns1"],
2025 "name": "table5"
2026 }
2027 ],
2028 "next-page-token": "page4"
2029 }"#,
2030 )
2031 .create_async()
2032 .await;
2033
2034 let list_tables_mock_page4 = server
2036 .mock("GET", "/v1/namespaces/ns1/tables?pageToken=page4")
2037 .with_status(200)
2038 .with_body(
2039 r#"{
2040 "identifiers": [
2041 {
2042 "namespace": ["ns1"],
2043 "name": "table6"
2044 },
2045 {
2046 "namespace": ["ns1"],
2047 "name": "table7"
2048 }
2049 ],
2050 "next-page-token": "page5"
2051 }"#,
2052 )
2053 .create_async()
2054 .await;
2055
2056 let list_tables_mock_page5 = server
2058 .mock("GET", "/v1/namespaces/ns1/tables?pageToken=page5")
2059 .with_status(200)
2060 .with_body(
2061 r#"{
2062 "identifiers": [
2063 {
2064 "namespace": ["ns1"],
2065 "name": "table8"
2066 }
2067 ]
2068 }"#,
2069 )
2070 .create_async()
2071 .await;
2072
2073 let catalog = RestCatalog::new(
2074 RestCatalogConfig::builder().uri(server.url()).build(),
2075 Some(Arc::new(LocalFsStorageFactory)),
2076 );
2077
2078 let tables = catalog
2079 .list_tables(&NamespaceIdent::new("ns1".to_string()))
2080 .await
2081 .unwrap();
2082
2083 let expected_tables = vec![
2084 TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table1".to_string()),
2085 TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table2".to_string()),
2086 TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table3".to_string()),
2087 TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table4".to_string()),
2088 TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table5".to_string()),
2089 TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table6".to_string()),
2090 TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table7".to_string()),
2091 TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table8".to_string()),
2092 ];
2093
2094 assert_eq!(tables, expected_tables);
2095
2096 config_mock.assert_async().await;
2098 list_tables_mock_page1.assert_async().await;
2099 list_tables_mock_page2.assert_async().await;
2100 list_tables_mock_page3.assert_async().await;
2101 list_tables_mock_page4.assert_async().await;
2102 list_tables_mock_page5.assert_async().await;
2103 }
2104
2105 #[tokio::test]
2106 async fn test_drop_tables() {
2107 let mut server = Server::new_async().await;
2108
2109 let config_mock = create_config_mock(&mut server).await;
2110
2111 let delete_table_mock = server
2112 .mock("DELETE", "/v1/namespaces/ns1/tables/table1")
2113 .with_status(204)
2114 .create_async()
2115 .await;
2116
2117 let catalog = RestCatalog::new(
2118 RestCatalogConfig::builder().uri(server.url()).build(),
2119 Some(Arc::new(LocalFsStorageFactory)),
2120 );
2121
2122 catalog
2123 .drop_table(&TableIdent::new(
2124 NamespaceIdent::new("ns1".to_string()),
2125 "table1".to_string(),
2126 ))
2127 .await
2128 .unwrap();
2129
2130 config_mock.assert_async().await;
2131 delete_table_mock.assert_async().await;
2132 }
2133
2134 #[tokio::test]
2135 async fn test_check_table_exists() {
2136 let mut server = Server::new_async().await;
2137
2138 let config_mock = create_config_mock(&mut server).await;
2139
2140 let check_table_exists_mock = server
2141 .mock("HEAD", "/v1/namespaces/ns1/tables/table1")
2142 .with_status(204)
2143 .create_async()
2144 .await;
2145
2146 let catalog = RestCatalog::new(
2147 RestCatalogConfig::builder().uri(server.url()).build(),
2148 Some(Arc::new(LocalFsStorageFactory)),
2149 );
2150
2151 assert!(
2152 catalog
2153 .table_exists(&TableIdent::new(
2154 NamespaceIdent::new("ns1".to_string()),
2155 "table1".to_string(),
2156 ))
2157 .await
2158 .unwrap()
2159 );
2160
2161 config_mock.assert_async().await;
2162 check_table_exists_mock.assert_async().await;
2163 }
2164
2165 #[tokio::test]
2166 async fn test_rename_table() {
2167 let mut server = Server::new_async().await;
2168
2169 let config_mock = create_config_mock(&mut server).await;
2170
2171 let rename_table_mock = server
2172 .mock("POST", "/v1/tables/rename")
2173 .with_status(204)
2174 .create_async()
2175 .await;
2176
2177 let catalog = RestCatalog::new(
2178 RestCatalogConfig::builder().uri(server.url()).build(),
2179 Some(Arc::new(LocalFsStorageFactory)),
2180 );
2181
2182 catalog
2183 .rename_table(
2184 &TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table1".to_string()),
2185 &TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table2".to_string()),
2186 )
2187 .await
2188 .unwrap();
2189
2190 config_mock.assert_async().await;
2191 rename_table_mock.assert_async().await;
2192 }
2193
2194 #[tokio::test]
2195 async fn test_load_table() {
2196 let mut server = Server::new_async().await;
2197
2198 let config_mock = create_config_mock(&mut server).await;
2199
2200 let rename_table_mock = server
2201 .mock("GET", "/v1/namespaces/ns1/tables/test1")
2202 .with_status(200)
2203 .with_body_from_file(format!(
2204 "{}/testdata/{}",
2205 env!("CARGO_MANIFEST_DIR"),
2206 "load_table_response.json"
2207 ))
2208 .create_async()
2209 .await;
2210
2211 let catalog = RestCatalog::new(
2212 RestCatalogConfig::builder().uri(server.url()).build(),
2213 Some(Arc::new(LocalFsStorageFactory)),
2214 );
2215
2216 let table = catalog
2217 .load_table(&TableIdent::new(
2218 NamespaceIdent::new("ns1".to_string()),
2219 "test1".to_string(),
2220 ))
2221 .await
2222 .unwrap();
2223
2224 assert_eq!(
2225 &TableIdent::from_strs(vec!["ns1", "test1"]).unwrap(),
2226 table.identifier()
2227 );
2228 assert_eq!(
2229 "s3://warehouse/database/table/metadata/00001-5f2f8166-244c-4eae-ac36-384ecdec81fc.gz.metadata.json",
2230 table.metadata_location().unwrap()
2231 );
2232 assert_eq!(FormatVersion::V1, table.metadata().format_version());
2233 assert_eq!("s3://warehouse/database/table", table.metadata().location());
2234 assert_eq!(
2235 uuid!("b55d9dda-6561-423a-8bfc-787980ce421f"),
2236 table.metadata().uuid()
2237 );
2238 assert_eq!(
2239 Utc.timestamp_millis_opt(1646787054459).unwrap(),
2240 table.metadata().last_updated_timestamp().unwrap()
2241 );
2242 assert_eq!(
2243 vec![&Arc::new(
2244 Schema::builder()
2245 .with_fields(vec![
2246 NestedField::optional(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
2247 NestedField::optional(2, "data", Type::Primitive(PrimitiveType::String))
2248 .into(),
2249 ])
2250 .build()
2251 .unwrap()
2252 )],
2253 table.metadata().schemas_iter().collect::<Vec<_>>()
2254 );
2255 assert_eq!(
2256 &HashMap::from([
2257 ("owner".to_string(), "bryan".to_string()),
2258 (
2259 "write.metadata.compression-codec".to_string(),
2260 "gzip".to_string()
2261 )
2262 ]),
2263 table.metadata().properties()
2264 );
2265 assert_eq!(vec![&Arc::new(Snapshot::builder()
2266 .with_snapshot_id(3497810964824022504)
2267 .with_timestamp_ms(1646787054459)
2268 .with_manifest_list("s3://warehouse/database/table/metadata/snap-3497810964824022504-1-c4f68204-666b-4e50-a9df-b10c34bf6b82.avro")
2269 .with_sequence_number(0)
2270 .with_schema_id(0)
2271 .with_summary(Summary {
2272 operation: Operation::Append,
2273 additional_properties: HashMap::from_iter([
2274 ("spark.app.id", "local-1646787004168"),
2275 ("added-data-files", "1"),
2276 ("added-records", "1"),
2277 ("added-files-size", "697"),
2278 ("changed-partition-count", "1"),
2279 ("total-records", "1"),
2280 ("total-files-size", "697"),
2281 ("total-data-files", "1"),
2282 ("total-delete-files", "0"),
2283 ("total-position-deletes", "0"),
2284 ("total-equality-deletes", "0")
2285 ].iter().map(|p| (p.0.to_string(), p.1.to_string()))),
2286 }).build()
2287 )], table.metadata().snapshots().collect::<Vec<_>>());
2288 assert_eq!(
2289 &[SnapshotLog {
2290 timestamp_ms: 1646787054459,
2291 snapshot_id: 3497810964824022504,
2292 }],
2293 table.metadata().history()
2294 );
2295 assert_eq!(
2296 vec![&Arc::new(SortOrder {
2297 order_id: 0,
2298 fields: vec![],
2299 })],
2300 table.metadata().sort_orders_iter().collect::<Vec<_>>()
2301 );
2302
2303 config_mock.assert_async().await;
2304 rename_table_mock.assert_async().await;
2305 }
2306
2307 #[tokio::test]
2308 async fn test_load_table_404() {
2309 let mut server = Server::new_async().await;
2310
2311 let config_mock = create_config_mock(&mut server).await;
2312
2313 let rename_table_mock = server
2314 .mock("GET", "/v1/namespaces/ns1/tables/test1")
2315 .with_status(404)
2316 .with_body(r#"
2317{
2318 "error": {
2319 "message": "Table does not exist: ns1.test1 in warehouse 8bcb0838-50fc-472d-9ddb-8feb89ef5f1e",
2320 "type": "NoSuchNamespaceErrorException",
2321 "code": 404
2322 }
2323}
2324 "#)
2325 .create_async()
2326 .await;
2327
2328 let catalog = RestCatalog::new(
2329 RestCatalogConfig::builder().uri(server.url()).build(),
2330 Some(Arc::new(LocalFsStorageFactory)),
2331 );
2332
2333 let table = catalog
2334 .load_table(&TableIdent::new(
2335 NamespaceIdent::new("ns1".to_string()),
2336 "test1".to_string(),
2337 ))
2338 .await;
2339
2340 assert!(table.is_err());
2341 assert!(table.err().unwrap().message().contains("does not exist"));
2342
2343 config_mock.assert_async().await;
2344 rename_table_mock.assert_async().await;
2345 }
2346
2347 #[tokio::test]
2348 async fn test_create_table() {
2349 let mut server = Server::new_async().await;
2350
2351 let config_mock = create_config_mock(&mut server).await;
2352
2353 let create_table_mock = server
2354 .mock("POST", "/v1/namespaces/ns1/tables")
2355 .with_status(200)
2356 .with_body_from_file(format!(
2357 "{}/testdata/{}",
2358 env!("CARGO_MANIFEST_DIR"),
2359 "create_table_response.json"
2360 ))
2361 .create_async()
2362 .await;
2363
2364 let catalog = RestCatalog::new(
2365 RestCatalogConfig::builder().uri(server.url()).build(),
2366 Some(Arc::new(LocalFsStorageFactory)),
2367 );
2368
2369 let table_creation = TableCreation::builder()
2370 .name("test1".to_string())
2371 .schema(
2372 Schema::builder()
2373 .with_fields(vec![
2374 NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String))
2375 .into(),
2376 NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(),
2377 NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean))
2378 .into(),
2379 ])
2380 .with_schema_id(1)
2381 .with_identifier_field_ids(vec![2])
2382 .build()
2383 .unwrap(),
2384 )
2385 .properties(HashMap::from([("owner".to_string(), "testx".to_string())]))
2386 .partition_spec(
2387 UnboundPartitionSpec::builder()
2388 .add_partition_fields(vec![
2389 UnboundPartitionField::builder()
2390 .source_id(1)
2391 .transform(Transform::Truncate(3))
2392 .name("id".to_string())
2393 .build(),
2394 ])
2395 .unwrap()
2396 .build(),
2397 )
2398 .sort_order(
2399 SortOrder::builder()
2400 .with_sort_field(
2401 SortField::builder()
2402 .source_id(2)
2403 .transform(Transform::Identity)
2404 .direction(SortDirection::Ascending)
2405 .null_order(NullOrder::First)
2406 .build(),
2407 )
2408 .build_unbound()
2409 .unwrap(),
2410 )
2411 .build();
2412
2413 let table = catalog
2414 .create_table(&NamespaceIdent::from_strs(["ns1"]).unwrap(), table_creation)
2415 .await
2416 .unwrap();
2417
2418 assert_eq!(
2419 &TableIdent::from_strs(vec!["ns1", "test1"]).unwrap(),
2420 table.identifier()
2421 );
2422 assert_eq!(
2423 "s3://warehouse/database/table/metadata.json",
2424 table.metadata_location().unwrap()
2425 );
2426 assert_eq!(FormatVersion::V1, table.metadata().format_version());
2427 assert_eq!("s3://warehouse/database/table", table.metadata().location());
2428 assert_eq!(
2429 uuid!("bf289591-dcc0-4234-ad4f-5c3eed811a29"),
2430 table.metadata().uuid()
2431 );
2432 assert_eq!(
2433 1657810967051,
2434 table
2435 .metadata()
2436 .last_updated_timestamp()
2437 .unwrap()
2438 .timestamp_millis()
2439 );
2440 assert_eq!(
2441 vec![&Arc::new(
2442 Schema::builder()
2443 .with_fields(vec![
2444 NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String))
2445 .into(),
2446 NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(),
2447 NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean))
2448 .into(),
2449 ])
2450 .with_schema_id(0)
2451 .with_identifier_field_ids(vec![2])
2452 .build()
2453 .unwrap()
2454 )],
2455 table.metadata().schemas_iter().collect::<Vec<_>>()
2456 );
2457 assert_eq!(
2458 &HashMap::from([
2459 (
2460 "write.delete.parquet.compression-codec".to_string(),
2461 "zstd".to_string()
2462 ),
2463 (
2464 "write.metadata.compression-codec".to_string(),
2465 "gzip".to_string()
2466 ),
2467 (
2468 "write.summary.partition-limit".to_string(),
2469 "100".to_string()
2470 ),
2471 (
2472 "write.parquet.compression-codec".to_string(),
2473 "zstd".to_string()
2474 ),
2475 ]),
2476 table.metadata().properties()
2477 );
2478 assert!(table.metadata().current_snapshot().is_none());
2479 assert!(table.metadata().history().is_empty());
2480 assert_eq!(
2481 vec![&Arc::new(SortOrder {
2482 order_id: 0,
2483 fields: vec![],
2484 })],
2485 table.metadata().sort_orders_iter().collect::<Vec<_>>()
2486 );
2487
2488 config_mock.assert_async().await;
2489 create_table_mock.assert_async().await;
2490 }
2491
2492 #[tokio::test]
2493 async fn test_create_table_409() {
2494 let mut server = Server::new_async().await;
2495
2496 let config_mock = create_config_mock(&mut server).await;
2497
2498 let create_table_mock = server
2499 .mock("POST", "/v1/namespaces/ns1/tables")
2500 .with_status(409)
2501 .with_body(r#"
2502{
2503 "error": {
2504 "message": "Table already exists: ns1.test1 in warehouse 8bcb0838-50fc-472d-9ddb-8feb89ef5f1e",
2505 "type": "AlreadyExistsException",
2506 "code": 409
2507 }
2508}
2509 "#)
2510 .create_async()
2511 .await;
2512
2513 let catalog = RestCatalog::new(
2514 RestCatalogConfig::builder().uri(server.url()).build(),
2515 Some(Arc::new(LocalFsStorageFactory)),
2516 );
2517
2518 let table_creation = TableCreation::builder()
2519 .name("test1".to_string())
2520 .schema(
2521 Schema::builder()
2522 .with_fields(vec![
2523 NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String))
2524 .into(),
2525 NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(),
2526 NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean))
2527 .into(),
2528 ])
2529 .with_schema_id(1)
2530 .with_identifier_field_ids(vec![2])
2531 .build()
2532 .unwrap(),
2533 )
2534 .properties(HashMap::from([("owner".to_string(), "testx".to_string())]))
2535 .build();
2536
2537 let table_result = catalog
2538 .create_table(&NamespaceIdent::from_strs(["ns1"]).unwrap(), table_creation)
2539 .await;
2540
2541 assert!(table_result.is_err());
2542 assert!(
2543 table_result
2544 .err()
2545 .unwrap()
2546 .message()
2547 .contains("already exists")
2548 );
2549
2550 config_mock.assert_async().await;
2551 create_table_mock.assert_async().await;
2552 }
2553
2554 #[tokio::test]
2555 async fn test_update_table() {
2556 let mut server = Server::new_async().await;
2557
2558 let config_mock = create_config_mock(&mut server).await;
2559
2560 let load_table_mock = server
2561 .mock("GET", "/v1/namespaces/ns1/tables/test1")
2562 .with_status(200)
2563 .with_body_from_file(format!(
2564 "{}/testdata/{}",
2565 env!("CARGO_MANIFEST_DIR"),
2566 "load_table_response.json"
2567 ))
2568 .create_async()
2569 .await;
2570
2571 let update_table_mock = server
2572 .mock("POST", "/v1/namespaces/ns1/tables/test1")
2573 .with_status(200)
2574 .with_body_from_file(format!(
2575 "{}/testdata/{}",
2576 env!("CARGO_MANIFEST_DIR"),
2577 "update_table_response.json"
2578 ))
2579 .create_async()
2580 .await;
2581
2582 let catalog = RestCatalog::new(
2583 RestCatalogConfig::builder().uri(server.url()).build(),
2584 Some(Arc::new(LocalFsStorageFactory)),
2585 );
2586
2587 let table1 = {
2588 let file = File::open(format!(
2589 "{}/testdata/{}",
2590 env!("CARGO_MANIFEST_DIR"),
2591 "create_table_response.json"
2592 ))
2593 .unwrap();
2594 let reader = BufReader::new(file);
2595 let resp = serde_json::from_reader::<_, LoadTableResult>(reader).unwrap();
2596
2597 Table::builder()
2598 .metadata(resp.metadata)
2599 .metadata_location(resp.metadata_location.unwrap())
2600 .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap())
2601 .file_io(FileIO::new_with_fs())
2602 .build()
2603 .unwrap()
2604 };
2605
2606 let tx = Transaction::new(&table1);
2607 let table = tx
2608 .upgrade_table_version()
2609 .set_format_version(FormatVersion::V2)
2610 .apply(tx)
2611 .unwrap()
2612 .commit(&catalog)
2613 .await
2614 .unwrap();
2615
2616 assert_eq!(
2617 &TableIdent::from_strs(vec!["ns1", "test1"]).unwrap(),
2618 table.identifier()
2619 );
2620 assert_eq!(
2621 "s3://warehouse/database/table/metadata.json",
2622 table.metadata_location().unwrap()
2623 );
2624 assert_eq!(FormatVersion::V2, table.metadata().format_version());
2625 assert_eq!("s3://warehouse/database/table", table.metadata().location());
2626 assert_eq!(
2627 uuid!("bf289591-dcc0-4234-ad4f-5c3eed811a29"),
2628 table.metadata().uuid()
2629 );
2630 assert_eq!(
2631 1657810967051,
2632 table
2633 .metadata()
2634 .last_updated_timestamp()
2635 .unwrap()
2636 .timestamp_millis()
2637 );
2638 assert_eq!(
2639 vec![&Arc::new(
2640 Schema::builder()
2641 .with_fields(vec![
2642 NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String))
2643 .into(),
2644 NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(),
2645 NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean))
2646 .into(),
2647 ])
2648 .with_schema_id(0)
2649 .with_identifier_field_ids(vec![2])
2650 .build()
2651 .unwrap()
2652 )],
2653 table.metadata().schemas_iter().collect::<Vec<_>>()
2654 );
2655 assert_eq!(
2656 &HashMap::from([
2657 (
2658 "write.delete.parquet.compression-codec".to_string(),
2659 "zstd".to_string()
2660 ),
2661 (
2662 "write.metadata.compression-codec".to_string(),
2663 "gzip".to_string()
2664 ),
2665 (
2666 "write.summary.partition-limit".to_string(),
2667 "100".to_string()
2668 ),
2669 (
2670 "write.parquet.compression-codec".to_string(),
2671 "zstd".to_string()
2672 ),
2673 ]),
2674 table.metadata().properties()
2675 );
2676 assert!(table.metadata().current_snapshot().is_none());
2677 assert!(table.metadata().history().is_empty());
2678 assert_eq!(
2679 vec![&Arc::new(SortOrder {
2680 order_id: 0,
2681 fields: vec![],
2682 })],
2683 table.metadata().sort_orders_iter().collect::<Vec<_>>()
2684 );
2685
2686 config_mock.assert_async().await;
2687 update_table_mock.assert_async().await;
2688 load_table_mock.assert_async().await
2689 }
2690
2691 #[tokio::test]
2692 async fn test_update_table_404() {
2693 let mut server = Server::new_async().await;
2694
2695 let config_mock = create_config_mock(&mut server).await;
2696
2697 let load_table_mock = server
2698 .mock("GET", "/v1/namespaces/ns1/tables/test1")
2699 .with_status(200)
2700 .with_body_from_file(format!(
2701 "{}/testdata/{}",
2702 env!("CARGO_MANIFEST_DIR"),
2703 "load_table_response.json"
2704 ))
2705 .create_async()
2706 .await;
2707
2708 let update_table_mock = server
2709 .mock("POST", "/v1/namespaces/ns1/tables/test1")
2710 .with_status(404)
2711 .with_body(
2712 r#"
2713{
2714 "error": {
2715 "message": "The given table does not exist",
2716 "type": "NoSuchTableException",
2717 "code": 404
2718 }
2719}
2720 "#,
2721 )
2722 .create_async()
2723 .await;
2724
2725 let catalog = RestCatalog::new(
2726 RestCatalogConfig::builder().uri(server.url()).build(),
2727 Some(Arc::new(LocalFsStorageFactory)),
2728 );
2729
2730 let table1 = {
2731 let file = File::open(format!(
2732 "{}/testdata/{}",
2733 env!("CARGO_MANIFEST_DIR"),
2734 "create_table_response.json"
2735 ))
2736 .unwrap();
2737 let reader = BufReader::new(file);
2738 let resp = serde_json::from_reader::<_, LoadTableResult>(reader).unwrap();
2739
2740 Table::builder()
2741 .metadata(resp.metadata)
2742 .metadata_location(resp.metadata_location.unwrap())
2743 .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap())
2744 .file_io(FileIO::new_with_fs())
2745 .build()
2746 .unwrap()
2747 };
2748
2749 let tx = Transaction::new(&table1);
2750 let table_result = tx
2751 .upgrade_table_version()
2752 .set_format_version(FormatVersion::V2)
2753 .apply(tx)
2754 .unwrap()
2755 .commit(&catalog)
2756 .await;
2757
2758 assert!(table_result.is_err());
2759 assert!(
2760 table_result
2761 .err()
2762 .unwrap()
2763 .message()
2764 .contains("does not exist")
2765 );
2766
2767 config_mock.assert_async().await;
2768 update_table_mock.assert_async().await;
2769 load_table_mock.assert_async().await;
2770 }
2771
2772 #[tokio::test]
2773 async fn test_register_table() {
2774 let mut server = Server::new_async().await;
2775
2776 let config_mock = create_config_mock(&mut server).await;
2777
2778 let register_table_mock = server
2779 .mock("POST", "/v1/namespaces/ns1/register")
2780 .with_status(200)
2781 .with_body_from_file(format!(
2782 "{}/testdata/{}",
2783 env!("CARGO_MANIFEST_DIR"),
2784 "load_table_response.json"
2785 ))
2786 .create_async()
2787 .await;
2788
2789 let catalog = RestCatalog::new(
2790 RestCatalogConfig::builder().uri(server.url()).build(),
2791 Some(Arc::new(LocalFsStorageFactory)),
2792 );
2793 let table_ident =
2794 TableIdent::new(NamespaceIdent::new("ns1".to_string()), "test1".to_string());
2795 let metadata_location = String::from(
2796 "s3://warehouse/database/table/metadata/00001-5f2f8166-244c-4eae-ac36-384ecdec81fc.gz.metadata.json",
2797 );
2798
2799 let table = catalog
2800 .register_table(&table_ident, metadata_location)
2801 .await
2802 .unwrap();
2803
2804 assert_eq!(
2805 &TableIdent::from_strs(vec!["ns1", "test1"]).unwrap(),
2806 table.identifier()
2807 );
2808 assert_eq!(
2809 "s3://warehouse/database/table/metadata/00001-5f2f8166-244c-4eae-ac36-384ecdec81fc.gz.metadata.json",
2810 table.metadata_location().unwrap()
2811 );
2812
2813 config_mock.assert_async().await;
2814 register_table_mock.assert_async().await;
2815 }
2816
2817 #[tokio::test]
2818 async fn test_register_table_404() {
2819 let mut server = Server::new_async().await;
2820
2821 let config_mock = create_config_mock(&mut server).await;
2822
2823 let register_table_mock = server
2824 .mock("POST", "/v1/namespaces/ns1/register")
2825 .with_status(404)
2826 .with_body(
2827 r#"
2828{
2829 "error": {
2830 "message": "The namespace specified does not exist",
2831 "type": "NoSuchNamespaceErrorException",
2832 "code": 404
2833 }
2834}
2835 "#,
2836 )
2837 .create_async()
2838 .await;
2839
2840 let catalog = RestCatalog::new(
2841 RestCatalogConfig::builder().uri(server.url()).build(),
2842 Some(Arc::new(LocalFsStorageFactory)),
2843 );
2844
2845 let table_ident =
2846 TableIdent::new(NamespaceIdent::new("ns1".to_string()), "test1".to_string());
2847 let metadata_location = String::from(
2848 "s3://warehouse/database/table/metadata/00001-5f2f8166-244c-4eae-ac36-384ecdec81fc.gz.metadata.json",
2849 );
2850 let table = catalog
2851 .register_table(&table_ident, metadata_location)
2852 .await;
2853
2854 assert!(table.is_err());
2855 assert!(table.err().unwrap().message().contains("does not exist"));
2856
2857 config_mock.assert_async().await;
2858 register_table_mock.assert_async().await;
2859 }
2860
2861 #[tokio::test]
2862 async fn test_create_rest_catalog() {
2863 let builder = RestCatalogBuilder::default().with_client(Client::new());
2864
2865 let catalog = builder
2866 .load(
2867 "test",
2868 HashMap::from([
2869 (
2870 REST_CATALOG_PROP_URI.to_string(),
2871 "http://localhost:8080".to_string(),
2872 ),
2873 ("a".to_string(), "b".to_string()),
2874 ]),
2875 )
2876 .await;
2877
2878 assert!(catalog.is_ok());
2879
2880 let catalog_config = catalog.unwrap().user_config;
2881 assert_eq!(catalog_config.name.as_deref(), Some("test"));
2882 assert_eq!(catalog_config.uri, "http://localhost:8080");
2883 assert_eq!(catalog_config.warehouse, None);
2884 assert!(catalog_config.client.is_some());
2885
2886 assert_eq!(catalog_config.props.get("a"), Some(&"b".to_string()));
2887 assert!(!catalog_config.props.contains_key(REST_CATALOG_PROP_URI));
2888 }
2889
2890 #[tokio::test]
2891 async fn test_create_rest_catalog_no_uri() {
2892 let builder = RestCatalogBuilder::default();
2893
2894 let catalog = builder
2895 .load(
2896 "test",
2897 HashMap::from([(
2898 REST_CATALOG_PROP_WAREHOUSE.to_string(),
2899 "s3://warehouse".to_string(),
2900 )]),
2901 )
2902 .await;
2903
2904 assert!(catalog.is_err());
2905 if let Err(err) = catalog {
2906 assert_eq!(err.kind(), ErrorKind::DataInvalid);
2907 assert_eq!(err.message(), "Catalog uri is required");
2908 }
2909 }
2910}