1use std::any::Any;
21use std::collections::HashMap;
22use std::future::Future;
23use std::str::FromStr;
24
25use async_trait::async_trait;
26use iceberg::io::{self, FileIO};
27use iceberg::table::Table;
28use iceberg::{
29 Catalog, CatalogBuilder, Error, ErrorKind, Namespace, NamespaceIdent, Result, TableCommit,
30 TableCreation, TableIdent,
31};
32use itertools::Itertools;
33use reqwest::header::{
34 HeaderMap, HeaderName, HeaderValue, {self},
35};
36use reqwest::{Client, Method, StatusCode, Url};
37use tokio::sync::OnceCell;
38use typed_builder::TypedBuilder;
39
40use crate::client::{
41 HttpClient, deserialize_catalog_response, deserialize_unexpected_catalog_error,
42};
43use crate::types::{
44 CatalogConfig, CommitTableRequest, CommitTableResponse, CreateTableRequest,
45 ListNamespaceResponse, ListTableResponse, LoadTableResponse, NamespaceSerde,
46 RegisterTableRequest, RenameTableRequest,
47};
48
49pub const REST_CATALOG_PROP_URI: &str = "uri";
51pub const REST_CATALOG_PROP_WAREHOUSE: &str = "warehouse";
53
54const ICEBERG_REST_SPEC_VERSION: &str = "0.14.1";
55const CARGO_PKG_VERSION: &str = env!("CARGO_PKG_VERSION");
56const PATH_V1: &str = "v1";
57
58#[derive(Debug)]
60pub struct RestCatalogBuilder(RestCatalogConfig);
61
62impl Default for RestCatalogBuilder {
63 fn default() -> Self {
64 Self(RestCatalogConfig {
65 name: None,
66 uri: "".to_string(),
67 warehouse: None,
68 props: HashMap::new(),
69 client: None,
70 })
71 }
72}
73
74impl CatalogBuilder for RestCatalogBuilder {
75 type C = RestCatalog;
76
77 fn load(
78 mut self,
79 name: impl Into<String>,
80 props: HashMap<String, String>,
81 ) -> impl Future<Output = Result<Self::C>> + Send {
82 self.0.name = Some(name.into());
83
84 if props.contains_key(REST_CATALOG_PROP_URI) {
85 self.0.uri = props
86 .get(REST_CATALOG_PROP_URI)
87 .cloned()
88 .unwrap_or_default();
89 }
90
91 if props.contains_key(REST_CATALOG_PROP_WAREHOUSE) {
92 self.0.warehouse = props.get(REST_CATALOG_PROP_WAREHOUSE).cloned()
93 }
94
95 self.0.props = props
97 .into_iter()
98 .filter(|(k, _)| k != REST_CATALOG_PROP_URI && k != REST_CATALOG_PROP_WAREHOUSE)
99 .collect();
100
101 let result = {
102 if self.0.name.is_none() {
103 Err(Error::new(
104 ErrorKind::DataInvalid,
105 "Catalog name is required",
106 ))
107 } else if self.0.uri.is_empty() {
108 Err(Error::new(
109 ErrorKind::DataInvalid,
110 "Catalog uri is required",
111 ))
112 } else {
113 Ok(RestCatalog::new(self.0))
114 }
115 };
116
117 std::future::ready(result)
118 }
119}
120
121impl RestCatalogBuilder {
122 pub fn with_client(mut self, client: Client) -> Self {
124 self.0.client = Some(client);
125 self
126 }
127}
128
129#[derive(Clone, Debug, TypedBuilder)]
131pub(crate) struct RestCatalogConfig {
132 #[builder(default, setter(strip_option))]
133 name: Option<String>,
134
135 uri: String,
136
137 #[builder(default, setter(strip_option(fallback = warehouse_opt)))]
138 warehouse: Option<String>,
139
140 #[builder(default)]
141 props: HashMap<String, String>,
142
143 #[builder(default)]
144 client: Option<Client>,
145}
146
147impl RestCatalogConfig {
148 fn url_prefixed(&self, parts: &[&str]) -> String {
149 [&self.uri, PATH_V1]
150 .into_iter()
151 .chain(self.props.get("prefix").map(|s| &**s))
152 .chain(parts.iter().cloned())
153 .join("/")
154 }
155
156 fn config_endpoint(&self) -> String {
157 [&self.uri, PATH_V1, "config"].join("/")
158 }
159
160 pub(crate) fn get_token_endpoint(&self) -> String {
161 if let Some(oauth2_uri) = self.props.get("oauth2-server-uri") {
162 oauth2_uri.to_string()
163 } else {
164 [&self.uri, PATH_V1, "oauth", "tokens"].join("/")
165 }
166 }
167
168 fn namespaces_endpoint(&self) -> String {
169 self.url_prefixed(&["namespaces"])
170 }
171
172 fn namespace_endpoint(&self, ns: &NamespaceIdent) -> String {
173 self.url_prefixed(&["namespaces", &ns.to_url_string()])
174 }
175
176 fn tables_endpoint(&self, ns: &NamespaceIdent) -> String {
177 self.url_prefixed(&["namespaces", &ns.to_url_string(), "tables"])
178 }
179
180 fn rename_table_endpoint(&self) -> String {
181 self.url_prefixed(&["tables", "rename"])
182 }
183
184 fn register_table_endpoint(&self, ns: &NamespaceIdent) -> String {
185 self.url_prefixed(&["namespaces", &ns.to_url_string(), "register"])
186 }
187
188 fn table_endpoint(&self, table: &TableIdent) -> String {
189 self.url_prefixed(&[
190 "namespaces",
191 &table.namespace.to_url_string(),
192 "tables",
193 &table.name,
194 ])
195 }
196
197 pub(crate) fn client(&self) -> Option<Client> {
199 self.client.clone()
200 }
201
202 pub(crate) fn token(&self) -> Option<String> {
206 self.props.get("token").cloned()
207 }
208
209 pub(crate) fn credential(&self) -> Option<(Option<String>, String)> {
218 let cred = self.props.get("credential")?;
219
220 match cred.split_once(':') {
221 Some((client_id, client_secret)) => {
222 Some((Some(client_id.to_string()), client_secret.to_string()))
223 }
224 None => Some((None, cred.to_string())),
225 }
226 }
227
228 pub(crate) fn extra_headers(&self) -> Result<HeaderMap> {
235 let mut headers = HeaderMap::from_iter([
236 (
237 header::CONTENT_TYPE,
238 HeaderValue::from_static("application/json"),
239 ),
240 (
241 HeaderName::from_static("x-client-version"),
242 HeaderValue::from_static(ICEBERG_REST_SPEC_VERSION),
243 ),
244 (
245 header::USER_AGENT,
246 HeaderValue::from_str(&format!("iceberg-rs/{CARGO_PKG_VERSION}")).unwrap(),
247 ),
248 ]);
249
250 for (key, value) in self
251 .props
252 .iter()
253 .filter_map(|(k, v)| k.strip_prefix("header.").map(|k| (k, v)))
254 {
255 headers.insert(
256 HeaderName::from_str(key).map_err(|e| {
257 Error::new(
258 ErrorKind::DataInvalid,
259 format!("Invalid header name: {key}"),
260 )
261 .with_source(e)
262 })?,
263 HeaderValue::from_str(value).map_err(|e| {
264 Error::new(
265 ErrorKind::DataInvalid,
266 format!("Invalid header value: {value}"),
267 )
268 .with_source(e)
269 })?,
270 );
271 }
272
273 Ok(headers)
274 }
275
276 pub(crate) fn extra_oauth_params(&self) -> HashMap<String, String> {
278 let mut params = HashMap::new();
279
280 if let Some(scope) = self.props.get("scope") {
281 params.insert("scope".to_string(), scope.to_string());
282 } else {
283 params.insert("scope".to_string(), "catalog".to_string());
284 }
285
286 let optional_params = ["audience", "resource"];
287 for param_name in optional_params {
288 if let Some(value) = self.props.get(param_name) {
289 params.insert(param_name.to_string(), value.to_string());
290 }
291 }
292
293 params
294 }
295
296 pub(crate) fn merge_with_config(mut self, mut config: CatalogConfig) -> Self {
298 if let Some(uri) = config.overrides.remove("uri") {
299 self.uri = uri;
300 }
301
302 let mut props = config.defaults;
303 props.extend(self.props);
304 props.extend(config.overrides);
305
306 self.props = props;
307 self
308 }
309}
310
311#[derive(Debug)]
312struct RestContext {
313 client: HttpClient,
314 config: RestCatalogConfig,
318}
319
320#[derive(Debug)]
322pub struct RestCatalog {
323 user_config: RestCatalogConfig,
327 ctx: OnceCell<RestContext>,
328 file_io_extensions: io::Extensions,
330}
331
332impl RestCatalog {
333 fn new(config: RestCatalogConfig) -> Self {
335 Self {
336 user_config: config,
337 ctx: OnceCell::new(),
338 file_io_extensions: io::Extensions::default(),
339 }
340 }
341
342 pub fn with_file_io_extension<T: Any + Send + Sync>(mut self, ext: T) -> Self {
344 self.file_io_extensions.add(ext);
345 self
346 }
347
348 async fn context(&self) -> Result<&RestContext> {
350 self.ctx
351 .get_or_try_init(|| async {
352 let client = HttpClient::new(&self.user_config)?;
353 let catalog_config = RestCatalog::load_config(&client, &self.user_config).await?;
354 let config = self.user_config.clone().merge_with_config(catalog_config);
355 let client = client.update_with(&config)?;
356
357 Ok(RestContext { config, client })
358 })
359 .await
360 }
361
362 async fn load_config(
366 client: &HttpClient,
367 user_config: &RestCatalogConfig,
368 ) -> Result<CatalogConfig> {
369 let mut request_builder = client.request(Method::GET, user_config.config_endpoint());
370
371 if let Some(warehouse_location) = &user_config.warehouse {
372 request_builder = request_builder.query(&[("warehouse", warehouse_location)]);
373 }
374
375 let request = request_builder.build()?;
376
377 let http_response = client.query_catalog(request).await?;
378
379 match http_response.status() {
380 StatusCode::OK => deserialize_catalog_response(http_response).await,
381 _ => Err(deserialize_unexpected_catalog_error(http_response).await),
382 }
383 }
384
385 async fn load_file_io(
386 &self,
387 metadata_location: Option<&str>,
388 extra_config: Option<HashMap<String, String>>,
389 ) -> Result<FileIO> {
390 let mut props = self.context().await?.config.props.clone();
391 if let Some(config) = extra_config {
392 props.extend(config);
393 }
394
395 let warehouse_path = match self.context().await?.config.warehouse.as_deref() {
398 Some(url) if Url::parse(url).is_ok() => Some(url),
399 Some(_) => None,
400 None => None,
401 };
402
403 let file_io = match metadata_location.or(warehouse_path) {
404 Some(url) => FileIO::from_path(url)?
405 .with_props(props)
406 .with_extensions(self.file_io_extensions.clone())
407 .build()?,
408 None => {
409 return Err(Error::new(
410 ErrorKind::Unexpected,
411 "Unable to load file io, neither warehouse nor metadata location is set!",
412 ))?;
413 }
414 };
415
416 Ok(file_io)
417 }
418
419 pub async fn invalidate_token(&self) -> Result<()> {
422 self.context().await?.client.invalidate_token().await
423 }
424
425 pub async fn regenerate_token(&self) -> Result<()> {
432 self.context().await?.client.regenerate_token().await
433 }
434}
435
436#[async_trait]
439impl Catalog for RestCatalog {
440 async fn list_namespaces(
441 &self,
442 parent: Option<&NamespaceIdent>,
443 ) -> Result<Vec<NamespaceIdent>> {
444 let context = self.context().await?;
445 let endpoint = context.config.namespaces_endpoint();
446 let mut namespaces = Vec::new();
447 let mut next_token = None;
448
449 loop {
450 let mut request = context.client.request(Method::GET, endpoint.clone());
451
452 if let Some(ns) = parent {
454 request = request.query(&[("parent", ns.to_url_string())]);
455 }
456
457 if let Some(token) = next_token {
458 request = request.query(&[("pageToken", token)]);
459 }
460
461 let http_response = context.client.query_catalog(request.build()?).await?;
462
463 match http_response.status() {
464 StatusCode::OK => {
465 let response =
466 deserialize_catalog_response::<ListNamespaceResponse>(http_response)
467 .await?;
468
469 let ns_identifiers = response
470 .namespaces
471 .into_iter()
472 .map(NamespaceIdent::from_vec)
473 .collect::<Result<Vec<NamespaceIdent>>>()?;
474
475 namespaces.extend(ns_identifiers);
476
477 match response.next_page_token {
478 Some(token) => next_token = Some(token),
479 None => break,
480 }
481 }
482 StatusCode::NOT_FOUND => {
483 return Err(Error::new(
484 ErrorKind::Unexpected,
485 "The parent parameter of the namespace provided does not exist",
486 ));
487 }
488 _ => return Err(deserialize_unexpected_catalog_error(http_response).await),
489 }
490 }
491
492 Ok(namespaces)
493 }
494
495 async fn create_namespace(
496 &self,
497 namespace: &NamespaceIdent,
498 properties: HashMap<String, String>,
499 ) -> Result<Namespace> {
500 let context = self.context().await?;
501
502 let request = context
503 .client
504 .request(Method::POST, context.config.namespaces_endpoint())
505 .json(&NamespaceSerde {
506 namespace: namespace.as_ref().clone(),
507 properties: Some(properties),
508 })
509 .build()?;
510
511 let http_response = context.client.query_catalog(request).await?;
512
513 match http_response.status() {
514 StatusCode::OK => {
515 let response =
516 deserialize_catalog_response::<NamespaceSerde>(http_response).await?;
517 Namespace::try_from(response)
518 }
519 StatusCode::CONFLICT => Err(Error::new(
520 ErrorKind::Unexpected,
521 "Tried to create a namespace that already exists",
522 )),
523 _ => Err(deserialize_unexpected_catalog_error(http_response).await),
524 }
525 }
526
527 async fn get_namespace(&self, namespace: &NamespaceIdent) -> Result<Namespace> {
528 let context = self.context().await?;
529
530 let request = context
531 .client
532 .request(Method::GET, context.config.namespace_endpoint(namespace))
533 .build()?;
534
535 let http_response = context.client.query_catalog(request).await?;
536
537 match http_response.status() {
538 StatusCode::OK => {
539 let response =
540 deserialize_catalog_response::<NamespaceSerde>(http_response).await?;
541 Namespace::try_from(response)
542 }
543 StatusCode::NOT_FOUND => Err(Error::new(
544 ErrorKind::Unexpected,
545 "Tried to get a namespace that does not exist",
546 )),
547 _ => Err(deserialize_unexpected_catalog_error(http_response).await),
548 }
549 }
550
551 async fn namespace_exists(&self, ns: &NamespaceIdent) -> Result<bool> {
552 let context = self.context().await?;
553
554 let request = context
555 .client
556 .request(Method::HEAD, context.config.namespace_endpoint(ns))
557 .build()?;
558
559 let http_response = context.client.query_catalog(request).await?;
560
561 match http_response.status() {
562 StatusCode::NO_CONTENT | StatusCode::OK => Ok(true),
563 StatusCode::NOT_FOUND => Ok(false),
564 _ => Err(deserialize_unexpected_catalog_error(http_response).await),
565 }
566 }
567
568 async fn update_namespace(
569 &self,
570 _namespace: &NamespaceIdent,
571 _properties: HashMap<String, String>,
572 ) -> Result<()> {
573 Err(Error::new(
574 ErrorKind::FeatureUnsupported,
575 "Updating namespace not supported yet!",
576 ))
577 }
578
579 async fn drop_namespace(&self, namespace: &NamespaceIdent) -> Result<()> {
580 let context = self.context().await?;
581
582 let request = context
583 .client
584 .request(Method::DELETE, context.config.namespace_endpoint(namespace))
585 .build()?;
586
587 let http_response = context.client.query_catalog(request).await?;
588
589 match http_response.status() {
590 StatusCode::NO_CONTENT | StatusCode::OK => Ok(()),
591 StatusCode::NOT_FOUND => Err(Error::new(
592 ErrorKind::Unexpected,
593 "Tried to drop a namespace that does not exist",
594 )),
595 _ => Err(deserialize_unexpected_catalog_error(http_response).await),
596 }
597 }
598
599 async fn list_tables(&self, namespace: &NamespaceIdent) -> Result<Vec<TableIdent>> {
600 let context = self.context().await?;
601 let endpoint = context.config.tables_endpoint(namespace);
602 let mut identifiers = Vec::new();
603 let mut next_token = None;
604
605 loop {
606 let mut request = context.client.request(Method::GET, endpoint.clone());
607
608 if let Some(token) = next_token {
609 request = request.query(&[("pageToken", token)]);
610 }
611
612 let http_response = context.client.query_catalog(request.build()?).await?;
613
614 match http_response.status() {
615 StatusCode::OK => {
616 let response =
617 deserialize_catalog_response::<ListTableResponse>(http_response).await?;
618
619 identifiers.extend(response.identifiers);
620
621 match response.next_page_token {
622 Some(token) => next_token = Some(token),
623 None => break,
624 }
625 }
626 StatusCode::NOT_FOUND => {
627 return Err(Error::new(
628 ErrorKind::Unexpected,
629 "Tried to list tables of a namespace that does not exist",
630 ));
631 }
632 _ => return Err(deserialize_unexpected_catalog_error(http_response).await),
633 }
634 }
635
636 Ok(identifiers)
637 }
638
639 async fn create_table(
646 &self,
647 namespace: &NamespaceIdent,
648 creation: TableCreation,
649 ) -> Result<Table> {
650 let context = self.context().await?;
651
652 let table_ident = TableIdent::new(namespace.clone(), creation.name.clone());
653
654 let request = context
655 .client
656 .request(Method::POST, context.config.tables_endpoint(namespace))
657 .json(&CreateTableRequest {
658 name: creation.name,
659 location: creation.location,
660 schema: creation.schema,
661 partition_spec: creation.partition_spec,
662 write_order: creation.sort_order,
663 stage_create: Some(false),
664 properties: if creation.properties.is_empty() {
665 None
666 } else {
667 Some(creation.properties)
668 },
669 })
670 .build()?;
671
672 let http_response = context.client.query_catalog(request).await?;
673
674 let response = match http_response.status() {
675 StatusCode::OK => {
676 deserialize_catalog_response::<LoadTableResponse>(http_response).await?
677 }
678 StatusCode::NOT_FOUND => {
679 return Err(Error::new(
680 ErrorKind::Unexpected,
681 "Tried to create a table under a namespace that does not exist",
682 ));
683 }
684 StatusCode::CONFLICT => {
685 return Err(Error::new(
686 ErrorKind::Unexpected,
687 "The table already exists",
688 ));
689 }
690 _ => return Err(deserialize_unexpected_catalog_error(http_response).await),
691 };
692
693 let metadata_location = response.metadata_location.as_ref().ok_or(Error::new(
694 ErrorKind::DataInvalid,
695 "Metadata location missing in `create_table` response!",
696 ))?;
697
698 let config = response
699 .config
700 .unwrap_or_default()
701 .into_iter()
702 .chain(self.user_config.props.clone())
703 .collect();
704
705 let file_io = self
706 .load_file_io(Some(metadata_location), Some(config))
707 .await?;
708
709 let table_builder = Table::builder()
710 .identifier(table_ident.clone())
711 .file_io(file_io)
712 .metadata(response.metadata);
713
714 if let Some(metadata_location) = response.metadata_location {
715 table_builder.metadata_location(metadata_location).build()
716 } else {
717 table_builder.build()
718 }
719 }
720
721 async fn load_table(&self, table_ident: &TableIdent) -> Result<Table> {
727 let context = self.context().await?;
728
729 let request = context
730 .client
731 .request(Method::GET, context.config.table_endpoint(table_ident))
732 .build()?;
733
734 let http_response = context.client.query_catalog(request).await?;
735
736 let response = match http_response.status() {
737 StatusCode::OK | StatusCode::NOT_MODIFIED => {
738 deserialize_catalog_response::<LoadTableResponse>(http_response).await?
739 }
740 StatusCode::NOT_FOUND => {
741 return Err(Error::new(
742 ErrorKind::Unexpected,
743 "Tried to load a table that does not exist",
744 ));
745 }
746 _ => return Err(deserialize_unexpected_catalog_error(http_response).await),
747 };
748
749 let config = response
750 .config
751 .unwrap_or_default()
752 .into_iter()
753 .chain(self.user_config.props.clone())
754 .collect();
755
756 let file_io = self
757 .load_file_io(response.metadata_location.as_deref(), Some(config))
758 .await?;
759
760 let table_builder = Table::builder()
761 .identifier(table_ident.clone())
762 .file_io(file_io)
763 .metadata(response.metadata);
764
765 if let Some(metadata_location) = response.metadata_location {
766 table_builder.metadata_location(metadata_location).build()
767 } else {
768 table_builder.build()
769 }
770 }
771
772 async fn drop_table(&self, table: &TableIdent) -> Result<()> {
774 let context = self.context().await?;
775
776 let request = context
777 .client
778 .request(Method::DELETE, context.config.table_endpoint(table))
779 .build()?;
780
781 let http_response = context.client.query_catalog(request).await?;
782
783 match http_response.status() {
784 StatusCode::NO_CONTENT | StatusCode::OK => Ok(()),
785 StatusCode::NOT_FOUND => Err(Error::new(
786 ErrorKind::Unexpected,
787 "Tried to drop a table that does not exist",
788 )),
789 _ => Err(deserialize_unexpected_catalog_error(http_response).await),
790 }
791 }
792
793 async fn table_exists(&self, table: &TableIdent) -> Result<bool> {
795 let context = self.context().await?;
796
797 let request = context
798 .client
799 .request(Method::HEAD, context.config.table_endpoint(table))
800 .build()?;
801
802 let http_response = context.client.query_catalog(request).await?;
803
804 match http_response.status() {
805 StatusCode::NO_CONTENT | StatusCode::OK => Ok(true),
806 StatusCode::NOT_FOUND => Ok(false),
807 _ => Err(deserialize_unexpected_catalog_error(http_response).await),
808 }
809 }
810
811 async fn rename_table(&self, src: &TableIdent, dest: &TableIdent) -> Result<()> {
813 let context = self.context().await?;
814
815 let request = context
816 .client
817 .request(Method::POST, context.config.rename_table_endpoint())
818 .json(&RenameTableRequest {
819 source: src.clone(),
820 destination: dest.clone(),
821 })
822 .build()?;
823
824 let http_response = context.client.query_catalog(request).await?;
825
826 match http_response.status() {
827 StatusCode::NO_CONTENT | StatusCode::OK => Ok(()),
828 StatusCode::NOT_FOUND => Err(Error::new(
829 ErrorKind::Unexpected,
830 "Tried to rename a table that does not exist (is the namespace correct?)",
831 )),
832 StatusCode::CONFLICT => Err(Error::new(
833 ErrorKind::Unexpected,
834 "Tried to rename a table to a name that already exists",
835 )),
836 _ => Err(deserialize_unexpected_catalog_error(http_response).await),
837 }
838 }
839
840 async fn register_table(
841 &self,
842 table_ident: &TableIdent,
843 metadata_location: String,
844 ) -> Result<Table> {
845 let context = self.context().await?;
846
847 let request = context
848 .client
849 .request(
850 Method::POST,
851 context
852 .config
853 .register_table_endpoint(table_ident.namespace()),
854 )
855 .json(&RegisterTableRequest {
856 name: table_ident.name.clone(),
857 metadata_location: metadata_location.clone(),
858 overwrite: Some(false),
859 })
860 .build()?;
861
862 let http_response = context.client.query_catalog(request).await?;
863
864 let response: LoadTableResponse = match http_response.status() {
865 StatusCode::OK => {
866 deserialize_catalog_response::<LoadTableResponse>(http_response).await?
867 }
868 StatusCode::NOT_FOUND => {
869 return Err(Error::new(
870 ErrorKind::NamespaceNotFound,
871 "The namespace specified does not exist.",
872 ));
873 }
874 StatusCode::CONFLICT => {
875 return Err(Error::new(
876 ErrorKind::TableAlreadyExists,
877 "The given table already exists.",
878 ));
879 }
880 _ => return Err(deserialize_unexpected_catalog_error(http_response).await),
881 };
882
883 let metadata_location = response.metadata_location.as_ref().ok_or(Error::new(
884 ErrorKind::DataInvalid,
885 "Metadata location missing in `register_table` response!",
886 ))?;
887
888 let file_io = self.load_file_io(Some(metadata_location), None).await?;
889
890 Table::builder()
891 .identifier(table_ident.clone())
892 .file_io(file_io)
893 .metadata(response.metadata)
894 .metadata_location(metadata_location.clone())
895 .build()
896 }
897
898 async fn update_table(&self, mut commit: TableCommit) -> Result<Table> {
899 let context = self.context().await?;
900
901 let request = context
902 .client
903 .request(
904 Method::POST,
905 context.config.table_endpoint(commit.identifier()),
906 )
907 .json(&CommitTableRequest {
908 identifier: commit.identifier().clone(),
909 requirements: commit.take_requirements(),
910 updates: commit.take_updates(),
911 })
912 .build()?;
913
914 let http_response = context.client.query_catalog(request).await?;
915
916 let response: CommitTableResponse = match http_response.status() {
917 StatusCode::OK => deserialize_catalog_response(http_response).await?,
918 StatusCode::NOT_FOUND => {
919 return Err(Error::new(
920 ErrorKind::TableNotFound,
921 "Tried to update a table that does not exist",
922 ));
923 }
924 StatusCode::CONFLICT => {
925 return Err(Error::new(
926 ErrorKind::CatalogCommitConflicts,
927 "CatalogCommitConflicts, one or more requirements failed. The client may retry.",
928 )
929 .with_retryable(true));
930 }
931 StatusCode::INTERNAL_SERVER_ERROR => {
932 return Err(Error::new(
933 ErrorKind::Unexpected,
934 "An unknown server-side problem occurred; the commit state is unknown.",
935 ));
936 }
937 StatusCode::BAD_GATEWAY => {
938 return Err(Error::new(
939 ErrorKind::Unexpected,
940 "A gateway or proxy received an invalid response from the upstream server; the commit state is unknown.",
941 ));
942 }
943 StatusCode::GATEWAY_TIMEOUT => {
944 return Err(Error::new(
945 ErrorKind::Unexpected,
946 "A server-side gateway timeout occurred; the commit state is unknown.",
947 ));
948 }
949 _ => return Err(deserialize_unexpected_catalog_error(http_response).await),
950 };
951
952 let file_io = self
953 .load_file_io(Some(&response.metadata_location), None)
954 .await?;
955
956 Table::builder()
957 .identifier(commit.identifier().clone())
958 .file_io(file_io)
959 .metadata(response.metadata)
960 .metadata_location(response.metadata_location)
961 .build()
962 }
963}
964
965#[cfg(test)]
966mod tests {
967 use std::fs::File;
968 use std::io::BufReader;
969 use std::sync::Arc;
970
971 use chrono::{TimeZone, Utc};
972 use iceberg::spec::{
973 FormatVersion, NestedField, NullOrder, Operation, PrimitiveType, Schema, Snapshot,
974 SnapshotLog, SortDirection, SortField, SortOrder, Summary, Transform, Type,
975 UnboundPartitionField, UnboundPartitionSpec,
976 };
977 use iceberg::transaction::{ApplyTransactionAction, Transaction};
978 use mockito::{Mock, Server, ServerGuard};
979 use serde_json::json;
980 use uuid::uuid;
981
982 use super::*;
983
984 #[tokio::test]
985 async fn test_update_config() {
986 let mut server = Server::new_async().await;
987
988 let config_mock = server
989 .mock("GET", "/v1/config")
990 .with_status(200)
991 .with_body(
992 r#"{
993 "overrides": {
994 "warehouse": "s3://iceberg-catalog"
995 },
996 "defaults": {}
997 }"#,
998 )
999 .create_async()
1000 .await;
1001
1002 let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
1003
1004 assert_eq!(
1005 catalog
1006 .context()
1007 .await
1008 .unwrap()
1009 .config
1010 .props
1011 .get("warehouse"),
1012 Some(&"s3://iceberg-catalog".to_string())
1013 );
1014
1015 config_mock.assert_async().await;
1016 }
1017
1018 async fn create_config_mock(server: &mut ServerGuard) -> Mock {
1019 server
1020 .mock("GET", "/v1/config")
1021 .with_status(200)
1022 .with_body(
1023 r#"{
1024 "overrides": {
1025 "warehouse": "s3://iceberg-catalog"
1026 },
1027 "defaults": {}
1028 }"#,
1029 )
1030 .create_async()
1031 .await
1032 }
1033
1034 async fn create_oauth_mock(server: &mut ServerGuard) -> Mock {
1035 create_oauth_mock_with_path(server, "/v1/oauth/tokens", "ey000000000000", 200).await
1036 }
1037
1038 async fn create_oauth_mock_with_path(
1039 server: &mut ServerGuard,
1040 path: &str,
1041 token: &str,
1042 status: usize,
1043 ) -> Mock {
1044 let body = format!(
1045 r#"{{
1046 "access_token": "{token}",
1047 "token_type": "Bearer",
1048 "issued_token_type": "urn:ietf:params:oauth:token-type:access_token",
1049 "expires_in": 86400
1050 }}"#
1051 );
1052 server
1053 .mock("POST", path)
1054 .with_status(status)
1055 .with_body(body)
1056 .expect(1)
1057 .create_async()
1058 .await
1059 }
1060
1061 #[tokio::test]
1062 async fn test_oauth() {
1063 let mut server = Server::new_async().await;
1064 let oauth_mock = create_oauth_mock(&mut server).await;
1065 let config_mock = create_config_mock(&mut server).await;
1066
1067 let mut props = HashMap::new();
1068 props.insert("credential".to_string(), "client1:secret1".to_string());
1069
1070 let catalog = RestCatalog::new(
1071 RestCatalogConfig::builder()
1072 .uri(server.url())
1073 .props(props)
1074 .build(),
1075 );
1076
1077 let token = catalog.context().await.unwrap().client.token().await;
1078 oauth_mock.assert_async().await;
1079 config_mock.assert_async().await;
1080 assert_eq!(token, Some("ey000000000000".to_string()));
1081 }
1082
1083 #[tokio::test]
1084 async fn test_oauth_with_optional_param() {
1085 let mut props = HashMap::new();
1086 props.insert("credential".to_string(), "client1:secret1".to_string());
1087 props.insert("scope".to_string(), "custom_scope".to_string());
1088 props.insert("audience".to_string(), "custom_audience".to_string());
1089 props.insert("resource".to_string(), "custom_resource".to_string());
1090
1091 let mut server = Server::new_async().await;
1092 let oauth_mock = server
1093 .mock("POST", "/v1/oauth/tokens")
1094 .match_body(mockito::Matcher::Regex("scope=custom_scope".to_string()))
1095 .match_body(mockito::Matcher::Regex(
1096 "audience=custom_audience".to_string(),
1097 ))
1098 .match_body(mockito::Matcher::Regex(
1099 "resource=custom_resource".to_string(),
1100 ))
1101 .with_status(200)
1102 .with_body(
1103 r#"{
1104 "access_token": "ey000000000000",
1105 "token_type": "Bearer",
1106 "issued_token_type": "urn:ietf:params:oauth:token-type:access_token",
1107 "expires_in": 86400
1108 }"#,
1109 )
1110 .expect(1)
1111 .create_async()
1112 .await;
1113
1114 let config_mock = create_config_mock(&mut server).await;
1115
1116 let catalog = RestCatalog::new(
1117 RestCatalogConfig::builder()
1118 .uri(server.url())
1119 .props(props)
1120 .build(),
1121 );
1122
1123 let token = catalog.context().await.unwrap().client.token().await;
1124
1125 oauth_mock.assert_async().await;
1126 config_mock.assert_async().await;
1127 assert_eq!(token, Some("ey000000000000".to_string()));
1128 }
1129
1130 #[tokio::test]
1131 async fn test_invalidate_token() {
1132 let mut server = Server::new_async().await;
1133 let oauth_mock = create_oauth_mock(&mut server).await;
1134 let config_mock = create_config_mock(&mut server).await;
1135
1136 let mut props = HashMap::new();
1137 props.insert("credential".to_string(), "client1:secret1".to_string());
1138
1139 let catalog = RestCatalog::new(
1140 RestCatalogConfig::builder()
1141 .uri(server.url())
1142 .props(props)
1143 .build(),
1144 );
1145
1146 let token = catalog.context().await.unwrap().client.token().await;
1147 oauth_mock.assert_async().await;
1148 config_mock.assert_async().await;
1149 assert_eq!(token, Some("ey000000000000".to_string()));
1150
1151 let oauth_mock =
1152 create_oauth_mock_with_path(&mut server, "/v1/oauth/tokens", "ey000000000001", 200)
1153 .await;
1154 catalog.invalidate_token().await.unwrap();
1155 let token = catalog.context().await.unwrap().client.token().await;
1156 oauth_mock.assert_async().await;
1157 assert_eq!(token, Some("ey000000000001".to_string()));
1158 }
1159
1160 #[tokio::test]
1161 async fn test_invalidate_token_failing_request() {
1162 let mut server = Server::new_async().await;
1163 let oauth_mock = create_oauth_mock(&mut server).await;
1164 let config_mock = create_config_mock(&mut server).await;
1165
1166 let mut props = HashMap::new();
1167 props.insert("credential".to_string(), "client1:secret1".to_string());
1168
1169 let catalog = RestCatalog::new(
1170 RestCatalogConfig::builder()
1171 .uri(server.url())
1172 .props(props)
1173 .build(),
1174 );
1175
1176 let token = catalog.context().await.unwrap().client.token().await;
1177 oauth_mock.assert_async().await;
1178 config_mock.assert_async().await;
1179 assert_eq!(token, Some("ey000000000000".to_string()));
1180
1181 let oauth_mock =
1182 create_oauth_mock_with_path(&mut server, "/v1/oauth/tokens", "ey000000000001", 500)
1183 .await;
1184 catalog.invalidate_token().await.unwrap();
1185 let token = catalog.context().await.unwrap().client.token().await;
1186 oauth_mock.assert_async().await;
1187 assert_eq!(token, None);
1188 }
1189
1190 #[tokio::test]
1191 async fn test_regenerate_token() {
1192 let mut server = Server::new_async().await;
1193 let oauth_mock = create_oauth_mock(&mut server).await;
1194 let config_mock = create_config_mock(&mut server).await;
1195
1196 let mut props = HashMap::new();
1197 props.insert("credential".to_string(), "client1:secret1".to_string());
1198
1199 let catalog = RestCatalog::new(
1200 RestCatalogConfig::builder()
1201 .uri(server.url())
1202 .props(props)
1203 .build(),
1204 );
1205
1206 let token = catalog.context().await.unwrap().client.token().await;
1207 oauth_mock.assert_async().await;
1208 config_mock.assert_async().await;
1209 assert_eq!(token, Some("ey000000000000".to_string()));
1210
1211 let oauth_mock =
1212 create_oauth_mock_with_path(&mut server, "/v1/oauth/tokens", "ey000000000001", 200)
1213 .await;
1214 catalog.regenerate_token().await.unwrap();
1215 oauth_mock.assert_async().await;
1216 let token = catalog.context().await.unwrap().client.token().await;
1217 assert_eq!(token, Some("ey000000000001".to_string()));
1218 }
1219
1220 #[tokio::test]
1221 async fn test_regenerate_token_failing_request() {
1222 let mut server = Server::new_async().await;
1223 let oauth_mock = create_oauth_mock(&mut server).await;
1224 let config_mock = create_config_mock(&mut server).await;
1225
1226 let mut props = HashMap::new();
1227 props.insert("credential".to_string(), "client1:secret1".to_string());
1228
1229 let catalog = RestCatalog::new(
1230 RestCatalogConfig::builder()
1231 .uri(server.url())
1232 .props(props)
1233 .build(),
1234 );
1235
1236 let token = catalog.context().await.unwrap().client.token().await;
1237 oauth_mock.assert_async().await;
1238 config_mock.assert_async().await;
1239 assert_eq!(token, Some("ey000000000000".to_string()));
1240
1241 let oauth_mock =
1242 create_oauth_mock_with_path(&mut server, "/v1/oauth/tokens", "ey000000000001", 500)
1243 .await;
1244 let invalidate_result = catalog.regenerate_token().await;
1245 assert!(invalidate_result.is_err());
1246 oauth_mock.assert_async().await;
1247 let token = catalog.context().await.unwrap().client.token().await;
1248
1249 assert_eq!(token, Some("ey000000000000".to_string()));
1251 }
1252
1253 #[tokio::test]
1254 async fn test_http_headers() {
1255 let server = Server::new_async().await;
1256 let mut props = HashMap::new();
1257 props.insert("credential".to_string(), "client1:secret1".to_string());
1258
1259 let config = RestCatalogConfig::builder()
1260 .uri(server.url())
1261 .props(props)
1262 .build();
1263 let headers: HeaderMap = config.extra_headers().unwrap();
1264
1265 let expected_headers = HeaderMap::from_iter([
1266 (
1267 header::CONTENT_TYPE,
1268 HeaderValue::from_static("application/json"),
1269 ),
1270 (
1271 HeaderName::from_static("x-client-version"),
1272 HeaderValue::from_static(ICEBERG_REST_SPEC_VERSION),
1273 ),
1274 (
1275 header::USER_AGENT,
1276 HeaderValue::from_str(&format!("iceberg-rs/{CARGO_PKG_VERSION}")).unwrap(),
1277 ),
1278 ]);
1279 assert_eq!(headers, expected_headers);
1280 }
1281
1282 #[tokio::test]
1283 async fn test_http_headers_with_custom_headers() {
1284 let server = Server::new_async().await;
1285 let mut props = HashMap::new();
1286 props.insert("credential".to_string(), "client1:secret1".to_string());
1287 props.insert(
1288 "header.content-type".to_string(),
1289 "application/yaml".to_string(),
1290 );
1291 props.insert(
1292 "header.customized-header".to_string(),
1293 "some/value".to_string(),
1294 );
1295
1296 let config = RestCatalogConfig::builder()
1297 .uri(server.url())
1298 .props(props)
1299 .build();
1300 let headers: HeaderMap = config.extra_headers().unwrap();
1301
1302 let expected_headers = HeaderMap::from_iter([
1303 (
1304 header::CONTENT_TYPE,
1305 HeaderValue::from_static("application/yaml"),
1306 ),
1307 (
1308 HeaderName::from_static("x-client-version"),
1309 HeaderValue::from_static(ICEBERG_REST_SPEC_VERSION),
1310 ),
1311 (
1312 header::USER_AGENT,
1313 HeaderValue::from_str(&format!("iceberg-rs/{CARGO_PKG_VERSION}")).unwrap(),
1314 ),
1315 (
1316 HeaderName::from_static("customized-header"),
1317 HeaderValue::from_static("some/value"),
1318 ),
1319 ]);
1320 assert_eq!(headers, expected_headers);
1321 }
1322
1323 #[tokio::test]
1324 async fn test_oauth_with_oauth2_server_uri() {
1325 let mut server = Server::new_async().await;
1326 let config_mock = create_config_mock(&mut server).await;
1327
1328 let mut auth_server = Server::new_async().await;
1329 let auth_server_path = "/some/path";
1330 let oauth_mock =
1331 create_oauth_mock_with_path(&mut auth_server, auth_server_path, "ey000000000000", 200)
1332 .await;
1333
1334 let mut props = HashMap::new();
1335 props.insert("credential".to_string(), "client1:secret1".to_string());
1336 props.insert(
1337 "oauth2-server-uri".to_string(),
1338 format!("{}{}", auth_server.url(), auth_server_path).to_string(),
1339 );
1340
1341 let catalog = RestCatalog::new(
1342 RestCatalogConfig::builder()
1343 .uri(server.url())
1344 .props(props)
1345 .build(),
1346 );
1347
1348 let token = catalog.context().await.unwrap().client.token().await;
1349
1350 oauth_mock.assert_async().await;
1351 config_mock.assert_async().await;
1352 assert_eq!(token, Some("ey000000000000".to_string()));
1353 }
1354
1355 #[tokio::test]
1356 async fn test_config_override() {
1357 let mut server = Server::new_async().await;
1358 let mut redirect_server = Server::new_async().await;
1359 let new_uri = redirect_server.url();
1360
1361 let config_mock = server
1362 .mock("GET", "/v1/config")
1363 .with_status(200)
1364 .with_body(
1365 json!(
1366 {
1367 "overrides": {
1368 "uri": new_uri,
1369 "warehouse": "s3://iceberg-catalog",
1370 "prefix": "ice/warehouses/my"
1371 },
1372 "defaults": {},
1373 }
1374 )
1375 .to_string(),
1376 )
1377 .create_async()
1378 .await;
1379
1380 let list_ns_mock = redirect_server
1381 .mock("GET", "/v1/ice/warehouses/my/namespaces")
1382 .with_body(
1383 r#"{
1384 "namespaces": []
1385 }"#,
1386 )
1387 .create_async()
1388 .await;
1389
1390 let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
1391
1392 let _namespaces = catalog.list_namespaces(None).await.unwrap();
1393
1394 config_mock.assert_async().await;
1395 list_ns_mock.assert_async().await;
1396 }
1397
1398 #[tokio::test]
1399 async fn test_list_namespace() {
1400 let mut server = Server::new_async().await;
1401
1402 let config_mock = create_config_mock(&mut server).await;
1403
1404 let list_ns_mock = server
1405 .mock("GET", "/v1/namespaces")
1406 .with_body(
1407 r#"{
1408 "namespaces": [
1409 ["ns1", "ns11"],
1410 ["ns2"]
1411 ]
1412 }"#,
1413 )
1414 .create_async()
1415 .await;
1416
1417 let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
1418
1419 let namespaces = catalog.list_namespaces(None).await.unwrap();
1420
1421 let expected_ns = vec![
1422 NamespaceIdent::from_vec(vec!["ns1".to_string(), "ns11".to_string()]).unwrap(),
1423 NamespaceIdent::from_vec(vec!["ns2".to_string()]).unwrap(),
1424 ];
1425
1426 assert_eq!(expected_ns, namespaces);
1427
1428 config_mock.assert_async().await;
1429 list_ns_mock.assert_async().await;
1430 }
1431
1432 #[tokio::test]
1433 async fn test_list_namespace_with_pagination() {
1434 let mut server = Server::new_async().await;
1435
1436 let config_mock = create_config_mock(&mut server).await;
1437
1438 let list_ns_mock_page1 = server
1439 .mock("GET", "/v1/namespaces")
1440 .with_body(
1441 r#"{
1442 "namespaces": [
1443 ["ns1", "ns11"],
1444 ["ns2"]
1445 ],
1446 "next-page-token": "token123"
1447 }"#,
1448 )
1449 .create_async()
1450 .await;
1451
1452 let list_ns_mock_page2 = server
1453 .mock("GET", "/v1/namespaces?pageToken=token123")
1454 .with_body(
1455 r#"{
1456 "namespaces": [
1457 ["ns3"],
1458 ["ns4", "ns41"]
1459 ]
1460 }"#,
1461 )
1462 .create_async()
1463 .await;
1464
1465 let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
1466
1467 let namespaces = catalog.list_namespaces(None).await.unwrap();
1468
1469 let expected_ns = vec![
1470 NamespaceIdent::from_vec(vec!["ns1".to_string(), "ns11".to_string()]).unwrap(),
1471 NamespaceIdent::from_vec(vec!["ns2".to_string()]).unwrap(),
1472 NamespaceIdent::from_vec(vec!["ns3".to_string()]).unwrap(),
1473 NamespaceIdent::from_vec(vec!["ns4".to_string(), "ns41".to_string()]).unwrap(),
1474 ];
1475
1476 assert_eq!(expected_ns, namespaces);
1477
1478 config_mock.assert_async().await;
1479 list_ns_mock_page1.assert_async().await;
1480 list_ns_mock_page2.assert_async().await;
1481 }
1482
1483 #[tokio::test]
1484 async fn test_list_namespace_with_multiple_pages() {
1485 let mut server = Server::new_async().await;
1486
1487 let config_mock = create_config_mock(&mut server).await;
1488
1489 let list_ns_mock_page1 = server
1491 .mock("GET", "/v1/namespaces")
1492 .with_body(
1493 r#"{
1494 "namespaces": [
1495 ["ns1", "ns11"],
1496 ["ns2"]
1497 ],
1498 "next-page-token": "page2"
1499 }"#,
1500 )
1501 .create_async()
1502 .await;
1503
1504 let list_ns_mock_page2 = server
1506 .mock("GET", "/v1/namespaces?pageToken=page2")
1507 .with_body(
1508 r#"{
1509 "namespaces": [
1510 ["ns3"],
1511 ["ns4", "ns41"]
1512 ],
1513 "next-page-token": "page3"
1514 }"#,
1515 )
1516 .create_async()
1517 .await;
1518
1519 let list_ns_mock_page3 = server
1521 .mock("GET", "/v1/namespaces?pageToken=page3")
1522 .with_body(
1523 r#"{
1524 "namespaces": [
1525 ["ns5", "ns51", "ns511"]
1526 ],
1527 "next-page-token": "page4"
1528 }"#,
1529 )
1530 .create_async()
1531 .await;
1532
1533 let list_ns_mock_page4 = server
1535 .mock("GET", "/v1/namespaces?pageToken=page4")
1536 .with_body(
1537 r#"{
1538 "namespaces": [
1539 ["ns6"],
1540 ["ns7"]
1541 ],
1542 "next-page-token": "page5"
1543 }"#,
1544 )
1545 .create_async()
1546 .await;
1547
1548 let list_ns_mock_page5 = server
1550 .mock("GET", "/v1/namespaces?pageToken=page5")
1551 .with_body(
1552 r#"{
1553 "namespaces": [
1554 ["ns8", "ns81"]
1555 ]
1556 }"#,
1557 )
1558 .create_async()
1559 .await;
1560
1561 let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
1562
1563 let namespaces = catalog.list_namespaces(None).await.unwrap();
1564
1565 let expected_ns = vec![
1566 NamespaceIdent::from_vec(vec!["ns1".to_string(), "ns11".to_string()]).unwrap(),
1567 NamespaceIdent::from_vec(vec!["ns2".to_string()]).unwrap(),
1568 NamespaceIdent::from_vec(vec!["ns3".to_string()]).unwrap(),
1569 NamespaceIdent::from_vec(vec!["ns4".to_string(), "ns41".to_string()]).unwrap(),
1570 NamespaceIdent::from_vec(vec![
1571 "ns5".to_string(),
1572 "ns51".to_string(),
1573 "ns511".to_string(),
1574 ])
1575 .unwrap(),
1576 NamespaceIdent::from_vec(vec!["ns6".to_string()]).unwrap(),
1577 NamespaceIdent::from_vec(vec!["ns7".to_string()]).unwrap(),
1578 NamespaceIdent::from_vec(vec!["ns8".to_string(), "ns81".to_string()]).unwrap(),
1579 ];
1580
1581 assert_eq!(expected_ns, namespaces);
1582
1583 config_mock.assert_async().await;
1585 list_ns_mock_page1.assert_async().await;
1586 list_ns_mock_page2.assert_async().await;
1587 list_ns_mock_page3.assert_async().await;
1588 list_ns_mock_page4.assert_async().await;
1589 list_ns_mock_page5.assert_async().await;
1590 }
1591
1592 #[tokio::test]
1593 async fn test_create_namespace() {
1594 let mut server = Server::new_async().await;
1595
1596 let config_mock = create_config_mock(&mut server).await;
1597
1598 let create_ns_mock = server
1599 .mock("POST", "/v1/namespaces")
1600 .with_body(
1601 r#"{
1602 "namespace": [ "ns1", "ns11"],
1603 "properties" : {
1604 "key1": "value1"
1605 }
1606 }"#,
1607 )
1608 .create_async()
1609 .await;
1610
1611 let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
1612
1613 let namespaces = catalog
1614 .create_namespace(
1615 &NamespaceIdent::from_vec(vec!["ns1".to_string(), "ns11".to_string()]).unwrap(),
1616 HashMap::from([("key1".to_string(), "value1".to_string())]),
1617 )
1618 .await
1619 .unwrap();
1620
1621 let expected_ns = Namespace::with_properties(
1622 NamespaceIdent::from_vec(vec!["ns1".to_string(), "ns11".to_string()]).unwrap(),
1623 HashMap::from([("key1".to_string(), "value1".to_string())]),
1624 );
1625
1626 assert_eq!(expected_ns, namespaces);
1627
1628 config_mock.assert_async().await;
1629 create_ns_mock.assert_async().await;
1630 }
1631
1632 #[tokio::test]
1633 async fn test_get_namespace() {
1634 let mut server = Server::new_async().await;
1635
1636 let config_mock = create_config_mock(&mut server).await;
1637
1638 let get_ns_mock = server
1639 .mock("GET", "/v1/namespaces/ns1")
1640 .with_body(
1641 r#"{
1642 "namespace": [ "ns1"],
1643 "properties" : {
1644 "key1": "value1"
1645 }
1646 }"#,
1647 )
1648 .create_async()
1649 .await;
1650
1651 let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
1652
1653 let namespaces = catalog
1654 .get_namespace(&NamespaceIdent::new("ns1".to_string()))
1655 .await
1656 .unwrap();
1657
1658 let expected_ns = Namespace::with_properties(
1659 NamespaceIdent::new("ns1".to_string()),
1660 HashMap::from([("key1".to_string(), "value1".to_string())]),
1661 );
1662
1663 assert_eq!(expected_ns, namespaces);
1664
1665 config_mock.assert_async().await;
1666 get_ns_mock.assert_async().await;
1667 }
1668
1669 #[tokio::test]
1670 async fn check_namespace_exists() {
1671 let mut server = Server::new_async().await;
1672
1673 let config_mock = create_config_mock(&mut server).await;
1674
1675 let get_ns_mock = server
1676 .mock("HEAD", "/v1/namespaces/ns1")
1677 .with_status(204)
1678 .create_async()
1679 .await;
1680
1681 let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
1682
1683 assert!(
1684 catalog
1685 .namespace_exists(&NamespaceIdent::new("ns1".to_string()))
1686 .await
1687 .unwrap()
1688 );
1689
1690 config_mock.assert_async().await;
1691 get_ns_mock.assert_async().await;
1692 }
1693
1694 #[tokio::test]
1695 async fn test_drop_namespace() {
1696 let mut server = Server::new_async().await;
1697
1698 let config_mock = create_config_mock(&mut server).await;
1699
1700 let drop_ns_mock = server
1701 .mock("DELETE", "/v1/namespaces/ns1")
1702 .with_status(204)
1703 .create_async()
1704 .await;
1705
1706 let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
1707
1708 catalog
1709 .drop_namespace(&NamespaceIdent::new("ns1".to_string()))
1710 .await
1711 .unwrap();
1712
1713 config_mock.assert_async().await;
1714 drop_ns_mock.assert_async().await;
1715 }
1716
1717 #[tokio::test]
1718 async fn test_list_tables() {
1719 let mut server = Server::new_async().await;
1720
1721 let config_mock = create_config_mock(&mut server).await;
1722
1723 let list_tables_mock = server
1724 .mock("GET", "/v1/namespaces/ns1/tables")
1725 .with_status(200)
1726 .with_body(
1727 r#"{
1728 "identifiers": [
1729 {
1730 "namespace": ["ns1"],
1731 "name": "table1"
1732 },
1733 {
1734 "namespace": ["ns1"],
1735 "name": "table2"
1736 }
1737 ]
1738 }"#,
1739 )
1740 .create_async()
1741 .await;
1742
1743 let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
1744
1745 let tables = catalog
1746 .list_tables(&NamespaceIdent::new("ns1".to_string()))
1747 .await
1748 .unwrap();
1749
1750 let expected_tables = vec![
1751 TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table1".to_string()),
1752 TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table2".to_string()),
1753 ];
1754
1755 assert_eq!(tables, expected_tables);
1756
1757 config_mock.assert_async().await;
1758 list_tables_mock.assert_async().await;
1759 }
1760
1761 #[tokio::test]
1762 async fn test_list_tables_with_pagination() {
1763 let mut server = Server::new_async().await;
1764
1765 let config_mock = create_config_mock(&mut server).await;
1766
1767 let list_tables_mock_page1 = server
1768 .mock("GET", "/v1/namespaces/ns1/tables")
1769 .with_status(200)
1770 .with_body(
1771 r#"{
1772 "identifiers": [
1773 {
1774 "namespace": ["ns1"],
1775 "name": "table1"
1776 },
1777 {
1778 "namespace": ["ns1"],
1779 "name": "table2"
1780 }
1781 ],
1782 "next-page-token": "token456"
1783 }"#,
1784 )
1785 .create_async()
1786 .await;
1787
1788 let list_tables_mock_page2 = server
1789 .mock("GET", "/v1/namespaces/ns1/tables?pageToken=token456")
1790 .with_status(200)
1791 .with_body(
1792 r#"{
1793 "identifiers": [
1794 {
1795 "namespace": ["ns1"],
1796 "name": "table3"
1797 },
1798 {
1799 "namespace": ["ns1"],
1800 "name": "table4"
1801 }
1802 ]
1803 }"#,
1804 )
1805 .create_async()
1806 .await;
1807
1808 let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
1809
1810 let tables = catalog
1811 .list_tables(&NamespaceIdent::new("ns1".to_string()))
1812 .await
1813 .unwrap();
1814
1815 let expected_tables = vec![
1816 TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table1".to_string()),
1817 TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table2".to_string()),
1818 TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table3".to_string()),
1819 TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table4".to_string()),
1820 ];
1821
1822 assert_eq!(tables, expected_tables);
1823
1824 config_mock.assert_async().await;
1825 list_tables_mock_page1.assert_async().await;
1826 list_tables_mock_page2.assert_async().await;
1827 }
1828
1829 #[tokio::test]
1830 async fn test_list_tables_with_multiple_pages() {
1831 let mut server = Server::new_async().await;
1832
1833 let config_mock = create_config_mock(&mut server).await;
1834
1835 let list_tables_mock_page1 = server
1837 .mock("GET", "/v1/namespaces/ns1/tables")
1838 .with_status(200)
1839 .with_body(
1840 r#"{
1841 "identifiers": [
1842 {
1843 "namespace": ["ns1"],
1844 "name": "table1"
1845 },
1846 {
1847 "namespace": ["ns1"],
1848 "name": "table2"
1849 }
1850 ],
1851 "next-page-token": "page2"
1852 }"#,
1853 )
1854 .create_async()
1855 .await;
1856
1857 let list_tables_mock_page2 = server
1859 .mock("GET", "/v1/namespaces/ns1/tables?pageToken=page2")
1860 .with_status(200)
1861 .with_body(
1862 r#"{
1863 "identifiers": [
1864 {
1865 "namespace": ["ns1"],
1866 "name": "table3"
1867 },
1868 {
1869 "namespace": ["ns1"],
1870 "name": "table4"
1871 }
1872 ],
1873 "next-page-token": "page3"
1874 }"#,
1875 )
1876 .create_async()
1877 .await;
1878
1879 let list_tables_mock_page3 = server
1881 .mock("GET", "/v1/namespaces/ns1/tables?pageToken=page3")
1882 .with_status(200)
1883 .with_body(
1884 r#"{
1885 "identifiers": [
1886 {
1887 "namespace": ["ns1"],
1888 "name": "table5"
1889 }
1890 ],
1891 "next-page-token": "page4"
1892 }"#,
1893 )
1894 .create_async()
1895 .await;
1896
1897 let list_tables_mock_page4 = server
1899 .mock("GET", "/v1/namespaces/ns1/tables?pageToken=page4")
1900 .with_status(200)
1901 .with_body(
1902 r#"{
1903 "identifiers": [
1904 {
1905 "namespace": ["ns1"],
1906 "name": "table6"
1907 },
1908 {
1909 "namespace": ["ns1"],
1910 "name": "table7"
1911 }
1912 ],
1913 "next-page-token": "page5"
1914 }"#,
1915 )
1916 .create_async()
1917 .await;
1918
1919 let list_tables_mock_page5 = server
1921 .mock("GET", "/v1/namespaces/ns1/tables?pageToken=page5")
1922 .with_status(200)
1923 .with_body(
1924 r#"{
1925 "identifiers": [
1926 {
1927 "namespace": ["ns1"],
1928 "name": "table8"
1929 }
1930 ]
1931 }"#,
1932 )
1933 .create_async()
1934 .await;
1935
1936 let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
1937
1938 let tables = catalog
1939 .list_tables(&NamespaceIdent::new("ns1".to_string()))
1940 .await
1941 .unwrap();
1942
1943 let expected_tables = vec![
1944 TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table1".to_string()),
1945 TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table2".to_string()),
1946 TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table3".to_string()),
1947 TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table4".to_string()),
1948 TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table5".to_string()),
1949 TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table6".to_string()),
1950 TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table7".to_string()),
1951 TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table8".to_string()),
1952 ];
1953
1954 assert_eq!(tables, expected_tables);
1955
1956 config_mock.assert_async().await;
1958 list_tables_mock_page1.assert_async().await;
1959 list_tables_mock_page2.assert_async().await;
1960 list_tables_mock_page3.assert_async().await;
1961 list_tables_mock_page4.assert_async().await;
1962 list_tables_mock_page5.assert_async().await;
1963 }
1964
1965 #[tokio::test]
1966 async fn test_drop_tables() {
1967 let mut server = Server::new_async().await;
1968
1969 let config_mock = create_config_mock(&mut server).await;
1970
1971 let delete_table_mock = server
1972 .mock("DELETE", "/v1/namespaces/ns1/tables/table1")
1973 .with_status(204)
1974 .create_async()
1975 .await;
1976
1977 let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
1978
1979 catalog
1980 .drop_table(&TableIdent::new(
1981 NamespaceIdent::new("ns1".to_string()),
1982 "table1".to_string(),
1983 ))
1984 .await
1985 .unwrap();
1986
1987 config_mock.assert_async().await;
1988 delete_table_mock.assert_async().await;
1989 }
1990
1991 #[tokio::test]
1992 async fn test_check_table_exists() {
1993 let mut server = Server::new_async().await;
1994
1995 let config_mock = create_config_mock(&mut server).await;
1996
1997 let check_table_exists_mock = server
1998 .mock("HEAD", "/v1/namespaces/ns1/tables/table1")
1999 .with_status(204)
2000 .create_async()
2001 .await;
2002
2003 let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
2004
2005 assert!(
2006 catalog
2007 .table_exists(&TableIdent::new(
2008 NamespaceIdent::new("ns1".to_string()),
2009 "table1".to_string(),
2010 ))
2011 .await
2012 .unwrap()
2013 );
2014
2015 config_mock.assert_async().await;
2016 check_table_exists_mock.assert_async().await;
2017 }
2018
2019 #[tokio::test]
2020 async fn test_rename_table() {
2021 let mut server = Server::new_async().await;
2022
2023 let config_mock = create_config_mock(&mut server).await;
2024
2025 let rename_table_mock = server
2026 .mock("POST", "/v1/tables/rename")
2027 .with_status(204)
2028 .create_async()
2029 .await;
2030
2031 let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
2032
2033 catalog
2034 .rename_table(
2035 &TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table1".to_string()),
2036 &TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table2".to_string()),
2037 )
2038 .await
2039 .unwrap();
2040
2041 config_mock.assert_async().await;
2042 rename_table_mock.assert_async().await;
2043 }
2044
2045 #[tokio::test]
2046 async fn test_load_table() {
2047 let mut server = Server::new_async().await;
2048
2049 let config_mock = create_config_mock(&mut server).await;
2050
2051 let rename_table_mock = server
2052 .mock("GET", "/v1/namespaces/ns1/tables/test1")
2053 .with_status(200)
2054 .with_body_from_file(format!(
2055 "{}/testdata/{}",
2056 env!("CARGO_MANIFEST_DIR"),
2057 "load_table_response.json"
2058 ))
2059 .create_async()
2060 .await;
2061
2062 let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
2063
2064 let table = catalog
2065 .load_table(&TableIdent::new(
2066 NamespaceIdent::new("ns1".to_string()),
2067 "test1".to_string(),
2068 ))
2069 .await
2070 .unwrap();
2071
2072 assert_eq!(
2073 &TableIdent::from_strs(vec!["ns1", "test1"]).unwrap(),
2074 table.identifier()
2075 );
2076 assert_eq!(
2077 "s3://warehouse/database/table/metadata/00001-5f2f8166-244c-4eae-ac36-384ecdec81fc.gz.metadata.json",
2078 table.metadata_location().unwrap()
2079 );
2080 assert_eq!(FormatVersion::V1, table.metadata().format_version());
2081 assert_eq!("s3://warehouse/database/table", table.metadata().location());
2082 assert_eq!(
2083 uuid!("b55d9dda-6561-423a-8bfc-787980ce421f"),
2084 table.metadata().uuid()
2085 );
2086 assert_eq!(
2087 Utc.timestamp_millis_opt(1646787054459).unwrap(),
2088 table.metadata().last_updated_timestamp().unwrap()
2089 );
2090 assert_eq!(
2091 vec![&Arc::new(
2092 Schema::builder()
2093 .with_fields(vec![
2094 NestedField::optional(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
2095 NestedField::optional(2, "data", Type::Primitive(PrimitiveType::String))
2096 .into(),
2097 ])
2098 .build()
2099 .unwrap()
2100 )],
2101 table.metadata().schemas_iter().collect::<Vec<_>>()
2102 );
2103 assert_eq!(
2104 &HashMap::from([
2105 ("owner".to_string(), "bryan".to_string()),
2106 (
2107 "write.metadata.compression-codec".to_string(),
2108 "gzip".to_string()
2109 )
2110 ]),
2111 table.metadata().properties()
2112 );
2113 assert_eq!(vec![&Arc::new(Snapshot::builder()
2114 .with_snapshot_id(3497810964824022504)
2115 .with_timestamp_ms(1646787054459)
2116 .with_manifest_list("s3://warehouse/database/table/metadata/snap-3497810964824022504-1-c4f68204-666b-4e50-a9df-b10c34bf6b82.avro")
2117 .with_sequence_number(0)
2118 .with_schema_id(0)
2119 .with_summary(Summary {
2120 operation: Operation::Append,
2121 additional_properties: HashMap::from_iter([
2122 ("spark.app.id", "local-1646787004168"),
2123 ("added-data-files", "1"),
2124 ("added-records", "1"),
2125 ("added-files-size", "697"),
2126 ("changed-partition-count", "1"),
2127 ("total-records", "1"),
2128 ("total-files-size", "697"),
2129 ("total-data-files", "1"),
2130 ("total-delete-files", "0"),
2131 ("total-position-deletes", "0"),
2132 ("total-equality-deletes", "0")
2133 ].iter().map(|p| (p.0.to_string(), p.1.to_string()))),
2134 }).build()
2135 )], table.metadata().snapshots().collect::<Vec<_>>());
2136 assert_eq!(
2137 &[SnapshotLog {
2138 timestamp_ms: 1646787054459,
2139 snapshot_id: 3497810964824022504,
2140 }],
2141 table.metadata().history()
2142 );
2143 assert_eq!(
2144 vec![&Arc::new(SortOrder {
2145 order_id: 0,
2146 fields: vec![],
2147 })],
2148 table.metadata().sort_orders_iter().collect::<Vec<_>>()
2149 );
2150
2151 config_mock.assert_async().await;
2152 rename_table_mock.assert_async().await;
2153 }
2154
2155 #[tokio::test]
2156 async fn test_load_table_404() {
2157 let mut server = Server::new_async().await;
2158
2159 let config_mock = create_config_mock(&mut server).await;
2160
2161 let rename_table_mock = server
2162 .mock("GET", "/v1/namespaces/ns1/tables/test1")
2163 .with_status(404)
2164 .with_body(r#"
2165{
2166 "error": {
2167 "message": "Table does not exist: ns1.test1 in warehouse 8bcb0838-50fc-472d-9ddb-8feb89ef5f1e",
2168 "type": "NoSuchNamespaceErrorException",
2169 "code": 404
2170 }
2171}
2172 "#)
2173 .create_async()
2174 .await;
2175
2176 let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
2177
2178 let table = catalog
2179 .load_table(&TableIdent::new(
2180 NamespaceIdent::new("ns1".to_string()),
2181 "test1".to_string(),
2182 ))
2183 .await;
2184
2185 assert!(table.is_err());
2186 assert!(table.err().unwrap().message().contains("does not exist"));
2187
2188 config_mock.assert_async().await;
2189 rename_table_mock.assert_async().await;
2190 }
2191
2192 #[tokio::test]
2193 async fn test_create_table() {
2194 let mut server = Server::new_async().await;
2195
2196 let config_mock = create_config_mock(&mut server).await;
2197
2198 let create_table_mock = server
2199 .mock("POST", "/v1/namespaces/ns1/tables")
2200 .with_status(200)
2201 .with_body_from_file(format!(
2202 "{}/testdata/{}",
2203 env!("CARGO_MANIFEST_DIR"),
2204 "create_table_response.json"
2205 ))
2206 .create_async()
2207 .await;
2208
2209 let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
2210
2211 let table_creation = TableCreation::builder()
2212 .name("test1".to_string())
2213 .schema(
2214 Schema::builder()
2215 .with_fields(vec![
2216 NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String))
2217 .into(),
2218 NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(),
2219 NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean))
2220 .into(),
2221 ])
2222 .with_schema_id(1)
2223 .with_identifier_field_ids(vec![2])
2224 .build()
2225 .unwrap(),
2226 )
2227 .properties(HashMap::from([("owner".to_string(), "testx".to_string())]))
2228 .partition_spec(
2229 UnboundPartitionSpec::builder()
2230 .add_partition_fields(vec![
2231 UnboundPartitionField::builder()
2232 .source_id(1)
2233 .transform(Transform::Truncate(3))
2234 .name("id".to_string())
2235 .build(),
2236 ])
2237 .unwrap()
2238 .build(),
2239 )
2240 .sort_order(
2241 SortOrder::builder()
2242 .with_sort_field(
2243 SortField::builder()
2244 .source_id(2)
2245 .transform(Transform::Identity)
2246 .direction(SortDirection::Ascending)
2247 .null_order(NullOrder::First)
2248 .build(),
2249 )
2250 .build_unbound()
2251 .unwrap(),
2252 )
2253 .build();
2254
2255 let table = catalog
2256 .create_table(&NamespaceIdent::from_strs(["ns1"]).unwrap(), table_creation)
2257 .await
2258 .unwrap();
2259
2260 assert_eq!(
2261 &TableIdent::from_strs(vec!["ns1", "test1"]).unwrap(),
2262 table.identifier()
2263 );
2264 assert_eq!(
2265 "s3://warehouse/database/table/metadata.json",
2266 table.metadata_location().unwrap()
2267 );
2268 assert_eq!(FormatVersion::V1, table.metadata().format_version());
2269 assert_eq!("s3://warehouse/database/table", table.metadata().location());
2270 assert_eq!(
2271 uuid!("bf289591-dcc0-4234-ad4f-5c3eed811a29"),
2272 table.metadata().uuid()
2273 );
2274 assert_eq!(
2275 1657810967051,
2276 table
2277 .metadata()
2278 .last_updated_timestamp()
2279 .unwrap()
2280 .timestamp_millis()
2281 );
2282 assert_eq!(
2283 vec![&Arc::new(
2284 Schema::builder()
2285 .with_fields(vec![
2286 NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String))
2287 .into(),
2288 NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(),
2289 NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean))
2290 .into(),
2291 ])
2292 .with_schema_id(0)
2293 .with_identifier_field_ids(vec![2])
2294 .build()
2295 .unwrap()
2296 )],
2297 table.metadata().schemas_iter().collect::<Vec<_>>()
2298 );
2299 assert_eq!(
2300 &HashMap::from([
2301 (
2302 "write.delete.parquet.compression-codec".to_string(),
2303 "zstd".to_string()
2304 ),
2305 (
2306 "write.metadata.compression-codec".to_string(),
2307 "gzip".to_string()
2308 ),
2309 (
2310 "write.summary.partition-limit".to_string(),
2311 "100".to_string()
2312 ),
2313 (
2314 "write.parquet.compression-codec".to_string(),
2315 "zstd".to_string()
2316 ),
2317 ]),
2318 table.metadata().properties()
2319 );
2320 assert!(table.metadata().current_snapshot().is_none());
2321 assert!(table.metadata().history().is_empty());
2322 assert_eq!(
2323 vec![&Arc::new(SortOrder {
2324 order_id: 0,
2325 fields: vec![],
2326 })],
2327 table.metadata().sort_orders_iter().collect::<Vec<_>>()
2328 );
2329
2330 config_mock.assert_async().await;
2331 create_table_mock.assert_async().await;
2332 }
2333
2334 #[tokio::test]
2335 async fn test_create_table_409() {
2336 let mut server = Server::new_async().await;
2337
2338 let config_mock = create_config_mock(&mut server).await;
2339
2340 let create_table_mock = server
2341 .mock("POST", "/v1/namespaces/ns1/tables")
2342 .with_status(409)
2343 .with_body(r#"
2344{
2345 "error": {
2346 "message": "Table already exists: ns1.test1 in warehouse 8bcb0838-50fc-472d-9ddb-8feb89ef5f1e",
2347 "type": "AlreadyExistsException",
2348 "code": 409
2349 }
2350}
2351 "#)
2352 .create_async()
2353 .await;
2354
2355 let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
2356
2357 let table_creation = TableCreation::builder()
2358 .name("test1".to_string())
2359 .schema(
2360 Schema::builder()
2361 .with_fields(vec![
2362 NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String))
2363 .into(),
2364 NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(),
2365 NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean))
2366 .into(),
2367 ])
2368 .with_schema_id(1)
2369 .with_identifier_field_ids(vec![2])
2370 .build()
2371 .unwrap(),
2372 )
2373 .properties(HashMap::from([("owner".to_string(), "testx".to_string())]))
2374 .build();
2375
2376 let table_result = catalog
2377 .create_table(&NamespaceIdent::from_strs(["ns1"]).unwrap(), table_creation)
2378 .await;
2379
2380 assert!(table_result.is_err());
2381 assert!(
2382 table_result
2383 .err()
2384 .unwrap()
2385 .message()
2386 .contains("already exists")
2387 );
2388
2389 config_mock.assert_async().await;
2390 create_table_mock.assert_async().await;
2391 }
2392
2393 #[tokio::test]
2394 async fn test_update_table() {
2395 let mut server = Server::new_async().await;
2396
2397 let config_mock = create_config_mock(&mut server).await;
2398
2399 let load_table_mock = server
2400 .mock("GET", "/v1/namespaces/ns1/tables/test1")
2401 .with_status(200)
2402 .with_body_from_file(format!(
2403 "{}/testdata/{}",
2404 env!("CARGO_MANIFEST_DIR"),
2405 "load_table_response.json"
2406 ))
2407 .create_async()
2408 .await;
2409
2410 let update_table_mock = server
2411 .mock("POST", "/v1/namespaces/ns1/tables/test1")
2412 .with_status(200)
2413 .with_body_from_file(format!(
2414 "{}/testdata/{}",
2415 env!("CARGO_MANIFEST_DIR"),
2416 "update_table_response.json"
2417 ))
2418 .create_async()
2419 .await;
2420
2421 let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
2422
2423 let table1 = {
2424 let file = File::open(format!(
2425 "{}/testdata/{}",
2426 env!("CARGO_MANIFEST_DIR"),
2427 "create_table_response.json"
2428 ))
2429 .unwrap();
2430 let reader = BufReader::new(file);
2431 let resp = serde_json::from_reader::<_, LoadTableResponse>(reader).unwrap();
2432
2433 Table::builder()
2434 .metadata(resp.metadata)
2435 .metadata_location(resp.metadata_location.unwrap())
2436 .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap())
2437 .file_io(FileIO::from_path("/tmp").unwrap().build().unwrap())
2438 .build()
2439 .unwrap()
2440 };
2441
2442 let tx = Transaction::new(&table1);
2443 let table = tx
2444 .upgrade_table_version()
2445 .set_format_version(FormatVersion::V2)
2446 .apply(tx)
2447 .unwrap()
2448 .commit(&catalog)
2449 .await
2450 .unwrap();
2451
2452 assert_eq!(
2453 &TableIdent::from_strs(vec!["ns1", "test1"]).unwrap(),
2454 table.identifier()
2455 );
2456 assert_eq!(
2457 "s3://warehouse/database/table/metadata.json",
2458 table.metadata_location().unwrap()
2459 );
2460 assert_eq!(FormatVersion::V2, table.metadata().format_version());
2461 assert_eq!("s3://warehouse/database/table", table.metadata().location());
2462 assert_eq!(
2463 uuid!("bf289591-dcc0-4234-ad4f-5c3eed811a29"),
2464 table.metadata().uuid()
2465 );
2466 assert_eq!(
2467 1657810967051,
2468 table
2469 .metadata()
2470 .last_updated_timestamp()
2471 .unwrap()
2472 .timestamp_millis()
2473 );
2474 assert_eq!(
2475 vec![&Arc::new(
2476 Schema::builder()
2477 .with_fields(vec![
2478 NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String))
2479 .into(),
2480 NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(),
2481 NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean))
2482 .into(),
2483 ])
2484 .with_schema_id(0)
2485 .with_identifier_field_ids(vec![2])
2486 .build()
2487 .unwrap()
2488 )],
2489 table.metadata().schemas_iter().collect::<Vec<_>>()
2490 );
2491 assert_eq!(
2492 &HashMap::from([
2493 (
2494 "write.delete.parquet.compression-codec".to_string(),
2495 "zstd".to_string()
2496 ),
2497 (
2498 "write.metadata.compression-codec".to_string(),
2499 "gzip".to_string()
2500 ),
2501 (
2502 "write.summary.partition-limit".to_string(),
2503 "100".to_string()
2504 ),
2505 (
2506 "write.parquet.compression-codec".to_string(),
2507 "zstd".to_string()
2508 ),
2509 ]),
2510 table.metadata().properties()
2511 );
2512 assert!(table.metadata().current_snapshot().is_none());
2513 assert!(table.metadata().history().is_empty());
2514 assert_eq!(
2515 vec![&Arc::new(SortOrder {
2516 order_id: 0,
2517 fields: vec![],
2518 })],
2519 table.metadata().sort_orders_iter().collect::<Vec<_>>()
2520 );
2521
2522 config_mock.assert_async().await;
2523 update_table_mock.assert_async().await;
2524 load_table_mock.assert_async().await
2525 }
2526
2527 #[tokio::test]
2528 async fn test_update_table_404() {
2529 let mut server = Server::new_async().await;
2530
2531 let config_mock = create_config_mock(&mut server).await;
2532
2533 let load_table_mock = server
2534 .mock("GET", "/v1/namespaces/ns1/tables/test1")
2535 .with_status(200)
2536 .with_body_from_file(format!(
2537 "{}/testdata/{}",
2538 env!("CARGO_MANIFEST_DIR"),
2539 "load_table_response.json"
2540 ))
2541 .create_async()
2542 .await;
2543
2544 let update_table_mock = server
2545 .mock("POST", "/v1/namespaces/ns1/tables/test1")
2546 .with_status(404)
2547 .with_body(
2548 r#"
2549{
2550 "error": {
2551 "message": "The given table does not exist",
2552 "type": "NoSuchTableException",
2553 "code": 404
2554 }
2555}
2556 "#,
2557 )
2558 .create_async()
2559 .await;
2560
2561 let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
2562
2563 let table1 = {
2564 let file = File::open(format!(
2565 "{}/testdata/{}",
2566 env!("CARGO_MANIFEST_DIR"),
2567 "create_table_response.json"
2568 ))
2569 .unwrap();
2570 let reader = BufReader::new(file);
2571 let resp = serde_json::from_reader::<_, LoadTableResponse>(reader).unwrap();
2572
2573 Table::builder()
2574 .metadata(resp.metadata)
2575 .metadata_location(resp.metadata_location.unwrap())
2576 .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap())
2577 .file_io(FileIO::from_path("/tmp").unwrap().build().unwrap())
2578 .build()
2579 .unwrap()
2580 };
2581
2582 let tx = Transaction::new(&table1);
2583 let table_result = tx
2584 .upgrade_table_version()
2585 .set_format_version(FormatVersion::V2)
2586 .apply(tx)
2587 .unwrap()
2588 .commit(&catalog)
2589 .await;
2590
2591 assert!(table_result.is_err());
2592 assert!(
2593 table_result
2594 .err()
2595 .unwrap()
2596 .message()
2597 .contains("does not exist")
2598 );
2599
2600 config_mock.assert_async().await;
2601 update_table_mock.assert_async().await;
2602 load_table_mock.assert_async().await;
2603 }
2604
2605 #[tokio::test]
2606 async fn test_register_table() {
2607 let mut server = Server::new_async().await;
2608
2609 let config_mock = create_config_mock(&mut server).await;
2610
2611 let register_table_mock = server
2612 .mock("POST", "/v1/namespaces/ns1/register")
2613 .with_status(200)
2614 .with_body_from_file(format!(
2615 "{}/testdata/{}",
2616 env!("CARGO_MANIFEST_DIR"),
2617 "load_table_response.json"
2618 ))
2619 .create_async()
2620 .await;
2621
2622 let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
2623 let table_ident =
2624 TableIdent::new(NamespaceIdent::new("ns1".to_string()), "test1".to_string());
2625 let metadata_location = String::from(
2626 "s3://warehouse/database/table/metadata/00001-5f2f8166-244c-4eae-ac36-384ecdec81fc.gz.metadata.json",
2627 );
2628
2629 let table = catalog
2630 .register_table(&table_ident, metadata_location)
2631 .await
2632 .unwrap();
2633
2634 assert_eq!(
2635 &TableIdent::from_strs(vec!["ns1", "test1"]).unwrap(),
2636 table.identifier()
2637 );
2638 assert_eq!(
2639 "s3://warehouse/database/table/metadata/00001-5f2f8166-244c-4eae-ac36-384ecdec81fc.gz.metadata.json",
2640 table.metadata_location().unwrap()
2641 );
2642
2643 config_mock.assert_async().await;
2644 register_table_mock.assert_async().await;
2645 }
2646
2647 #[tokio::test]
2648 async fn test_register_table_404() {
2649 let mut server = Server::new_async().await;
2650
2651 let config_mock = create_config_mock(&mut server).await;
2652
2653 let register_table_mock = server
2654 .mock("POST", "/v1/namespaces/ns1/register")
2655 .with_status(404)
2656 .with_body(
2657 r#"
2658{
2659 "error": {
2660 "message": "The namespace specified does not exist",
2661 "type": "NoSuchNamespaceErrorException",
2662 "code": 404
2663 }
2664}
2665 "#,
2666 )
2667 .create_async()
2668 .await;
2669
2670 let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
2671
2672 let table_ident =
2673 TableIdent::new(NamespaceIdent::new("ns1".to_string()), "test1".to_string());
2674 let metadata_location = String::from(
2675 "s3://warehouse/database/table/metadata/00001-5f2f8166-244c-4eae-ac36-384ecdec81fc.gz.metadata.json",
2676 );
2677 let table = catalog
2678 .register_table(&table_ident, metadata_location)
2679 .await;
2680
2681 assert!(table.is_err());
2682 assert!(table.err().unwrap().message().contains("does not exist"));
2683
2684 config_mock.assert_async().await;
2685 register_table_mock.assert_async().await;
2686 }
2687
2688 #[tokio::test]
2689 async fn test_create_rest_catalog() {
2690 let builder = RestCatalogBuilder::default().with_client(Client::new());
2691
2692 let catalog = builder
2693 .load(
2694 "test",
2695 HashMap::from([
2696 (
2697 REST_CATALOG_PROP_URI.to_string(),
2698 "http://localhost:8080".to_string(),
2699 ),
2700 ("a".to_string(), "b".to_string()),
2701 ]),
2702 )
2703 .await;
2704
2705 assert!(catalog.is_ok());
2706
2707 let catalog_config = catalog.unwrap().user_config;
2708 assert_eq!(catalog_config.name.as_deref(), Some("test"));
2709 assert_eq!(catalog_config.uri, "http://localhost:8080");
2710 assert_eq!(catalog_config.warehouse, None);
2711 assert!(catalog_config.client.is_some());
2712
2713 assert_eq!(catalog_config.props.get("a"), Some(&"b".to_string()));
2714 assert!(!catalog_config.props.contains_key(REST_CATALOG_PROP_URI));
2715 }
2716
2717 #[tokio::test]
2718 async fn test_create_rest_catalog_no_uri() {
2719 let builder = RestCatalogBuilder::default();
2720
2721 let catalog = builder
2722 .load(
2723 "test",
2724 HashMap::from([(
2725 REST_CATALOG_PROP_WAREHOUSE.to_string(),
2726 "s3://warehouse".to_string(),
2727 )]),
2728 )
2729 .await;
2730
2731 assert!(catalog.is_err());
2732 if let Err(err) = catalog {
2733 assert_eq!(err.kind(), ErrorKind::DataInvalid);
2734 assert_eq!(err.message(), "Catalog uri is required");
2735 }
2736 }
2737}