1use std::any::Any;
21use std::collections::HashMap;
22use std::future::Future;
23use std::str::FromStr;
24
25use async_trait::async_trait;
26use iceberg::io::{self, FileIO};
27use iceberg::table::Table;
28use iceberg::{
29 Catalog, CatalogBuilder, Error, ErrorKind, Namespace, NamespaceIdent, Result, TableCommit,
30 TableCreation, TableIdent,
31};
32use itertools::Itertools;
33use reqwest::header::{
34 HeaderMap, HeaderName, HeaderValue, {self},
35};
36use reqwest::{Client, Method, StatusCode, Url};
37use tokio::sync::OnceCell;
38use typed_builder::TypedBuilder;
39
40use crate::client::{
41 HttpClient, deserialize_catalog_response, deserialize_unexpected_catalog_error,
42};
43use crate::types::{
44 CatalogConfig, CommitTableRequest, CommitTableResponse, CreateNamespaceRequest,
45 CreateTableRequest, ListNamespaceResponse, ListTablesResponse, LoadTableResult,
46 NamespaceResponse, RegisterTableRequest, RenameTableRequest,
47};
48
49pub const REST_CATALOG_PROP_URI: &str = "uri";
51pub const REST_CATALOG_PROP_WAREHOUSE: &str = "warehouse";
53pub const REST_CATALOG_PROP_DISABLE_HEADER_REDACTION: &str = "disable-header-redaction";
55
56const ICEBERG_REST_SPEC_VERSION: &str = "0.14.1";
57const CARGO_PKG_VERSION: &str = env!("CARGO_PKG_VERSION");
58const PATH_V1: &str = "v1";
59
60#[derive(Debug)]
62pub struct RestCatalogBuilder(RestCatalogConfig);
63
64impl Default for RestCatalogBuilder {
65 fn default() -> Self {
66 Self(RestCatalogConfig {
67 name: None,
68 uri: "".to_string(),
69 warehouse: None,
70 props: HashMap::new(),
71 client: None,
72 })
73 }
74}
75
76impl CatalogBuilder for RestCatalogBuilder {
77 type C = RestCatalog;
78
79 fn load(
80 mut self,
81 name: impl Into<String>,
82 props: HashMap<String, String>,
83 ) -> impl Future<Output = Result<Self::C>> + Send {
84 self.0.name = Some(name.into());
85
86 if props.contains_key(REST_CATALOG_PROP_URI) {
87 self.0.uri = props
88 .get(REST_CATALOG_PROP_URI)
89 .cloned()
90 .unwrap_or_default();
91 }
92
93 if props.contains_key(REST_CATALOG_PROP_WAREHOUSE) {
94 self.0.warehouse = props.get(REST_CATALOG_PROP_WAREHOUSE).cloned()
95 }
96
97 self.0.props = props
99 .into_iter()
100 .filter(|(k, _)| k != REST_CATALOG_PROP_URI && k != REST_CATALOG_PROP_WAREHOUSE)
101 .collect();
102
103 let result = {
104 if self.0.name.is_none() {
105 Err(Error::new(
106 ErrorKind::DataInvalid,
107 "Catalog name is required",
108 ))
109 } else if self.0.uri.is_empty() {
110 Err(Error::new(
111 ErrorKind::DataInvalid,
112 "Catalog uri is required",
113 ))
114 } else {
115 Ok(RestCatalog::new(self.0))
116 }
117 };
118
119 std::future::ready(result)
120 }
121}
122
123impl RestCatalogBuilder {
124 pub fn with_client(mut self, client: Client) -> Self {
126 self.0.client = Some(client);
127 self
128 }
129}
130
131#[derive(Clone, Debug, TypedBuilder)]
133pub(crate) struct RestCatalogConfig {
134 #[builder(default, setter(strip_option))]
135 name: Option<String>,
136
137 uri: String,
138
139 #[builder(default, setter(strip_option(fallback = warehouse_opt)))]
140 warehouse: Option<String>,
141
142 #[builder(default)]
143 props: HashMap<String, String>,
144
145 #[builder(default)]
146 client: Option<Client>,
147}
148
149impl RestCatalogConfig {
150 fn url_prefixed(&self, parts: &[&str]) -> String {
151 [&self.uri, PATH_V1]
152 .into_iter()
153 .chain(self.props.get("prefix").map(|s| &**s))
154 .chain(parts.iter().cloned())
155 .join("/")
156 }
157
158 fn config_endpoint(&self) -> String {
159 [&self.uri, PATH_V1, "config"].join("/")
160 }
161
162 pub(crate) fn get_token_endpoint(&self) -> String {
163 if let Some(oauth2_uri) = self.props.get("oauth2-server-uri") {
164 oauth2_uri.to_string()
165 } else {
166 [&self.uri, PATH_V1, "oauth", "tokens"].join("/")
167 }
168 }
169
170 fn namespaces_endpoint(&self) -> String {
171 self.url_prefixed(&["namespaces"])
172 }
173
174 fn namespace_endpoint(&self, ns: &NamespaceIdent) -> String {
175 self.url_prefixed(&["namespaces", &ns.to_url_string()])
176 }
177
178 fn tables_endpoint(&self, ns: &NamespaceIdent) -> String {
179 self.url_prefixed(&["namespaces", &ns.to_url_string(), "tables"])
180 }
181
182 fn rename_table_endpoint(&self) -> String {
183 self.url_prefixed(&["tables", "rename"])
184 }
185
186 fn register_table_endpoint(&self, ns: &NamespaceIdent) -> String {
187 self.url_prefixed(&["namespaces", &ns.to_url_string(), "register"])
188 }
189
190 fn table_endpoint(&self, table: &TableIdent) -> String {
191 self.url_prefixed(&[
192 "namespaces",
193 &table.namespace.to_url_string(),
194 "tables",
195 &table.name,
196 ])
197 }
198
199 pub(crate) fn client(&self) -> Option<Client> {
201 self.client.clone()
202 }
203
204 pub(crate) fn token(&self) -> Option<String> {
208 self.props.get("token").cloned()
209 }
210
211 pub(crate) fn credential(&self) -> Option<(Option<String>, String)> {
220 let cred = self.props.get("credential")?;
221
222 match cred.split_once(':') {
223 Some((client_id, client_secret)) => {
224 Some((Some(client_id.to_string()), client_secret.to_string()))
225 }
226 None => Some((None, cred.to_string())),
227 }
228 }
229
230 pub(crate) fn extra_headers(&self) -> Result<HeaderMap> {
237 let mut headers = HeaderMap::from_iter([
238 (
239 header::CONTENT_TYPE,
240 HeaderValue::from_static("application/json"),
241 ),
242 (
243 HeaderName::from_static("x-client-version"),
244 HeaderValue::from_static(ICEBERG_REST_SPEC_VERSION),
245 ),
246 (
247 header::USER_AGENT,
248 HeaderValue::from_str(&format!("iceberg-rs/{CARGO_PKG_VERSION}")).unwrap(),
249 ),
250 ]);
251
252 for (key, value) in self
253 .props
254 .iter()
255 .filter_map(|(k, v)| k.strip_prefix("header.").map(|k| (k, v)))
256 {
257 headers.insert(
258 HeaderName::from_str(key).map_err(|e| {
259 Error::new(
260 ErrorKind::DataInvalid,
261 format!("Invalid header name: {key}"),
262 )
263 .with_source(e)
264 })?,
265 HeaderValue::from_str(value).map_err(|e| {
266 Error::new(
267 ErrorKind::DataInvalid,
268 format!("Invalid header value: {value}"),
269 )
270 .with_source(e)
271 })?,
272 );
273 }
274
275 Ok(headers)
276 }
277
278 pub(crate) fn extra_oauth_params(&self) -> HashMap<String, String> {
280 let mut params = HashMap::new();
281
282 if let Some(scope) = self.props.get("scope") {
283 params.insert("scope".to_string(), scope.to_string());
284 } else {
285 params.insert("scope".to_string(), "catalog".to_string());
286 }
287
288 let optional_params = ["audience", "resource"];
289 for param_name in optional_params {
290 if let Some(value) = self.props.get(param_name) {
291 params.insert(param_name.to_string(), value.to_string());
292 }
293 }
294
295 params
296 }
297
298 pub(crate) fn disable_header_redaction(&self) -> bool {
303 self.props
304 .get(REST_CATALOG_PROP_DISABLE_HEADER_REDACTION)
305 .map(|v| v.eq_ignore_ascii_case("true"))
306 .unwrap_or(false)
307 }
308
309 pub(crate) fn merge_with_config(mut self, mut config: CatalogConfig) -> Self {
311 if let Some(uri) = config.overrides.remove("uri") {
312 self.uri = uri;
313 }
314
315 let mut props = config.defaults;
316 props.extend(self.props);
317 props.extend(config.overrides);
318
319 self.props = props;
320 self
321 }
322}
323
324#[derive(Debug)]
325struct RestContext {
326 client: HttpClient,
327 config: RestCatalogConfig,
331}
332
333#[derive(Debug)]
335pub struct RestCatalog {
336 user_config: RestCatalogConfig,
340 ctx: OnceCell<RestContext>,
341 file_io_extensions: io::Extensions,
343}
344
345impl RestCatalog {
346 fn new(config: RestCatalogConfig) -> Self {
348 Self {
349 user_config: config,
350 ctx: OnceCell::new(),
351 file_io_extensions: io::Extensions::default(),
352 }
353 }
354
355 pub fn with_file_io_extension<T: Any + Send + Sync>(mut self, ext: T) -> Self {
357 self.file_io_extensions.add(ext);
358 self
359 }
360
361 async fn context(&self) -> Result<&RestContext> {
363 self.ctx
364 .get_or_try_init(|| async {
365 let client = HttpClient::new(&self.user_config)?;
366 let catalog_config = RestCatalog::load_config(&client, &self.user_config).await?;
367 let config = self.user_config.clone().merge_with_config(catalog_config);
368 let client = client.update_with(&config)?;
369
370 Ok(RestContext { config, client })
371 })
372 .await
373 }
374
375 async fn load_config(
379 client: &HttpClient,
380 user_config: &RestCatalogConfig,
381 ) -> Result<CatalogConfig> {
382 let mut request_builder = client.request(Method::GET, user_config.config_endpoint());
383
384 if let Some(warehouse_location) = &user_config.warehouse {
385 request_builder = request_builder.query(&[("warehouse", warehouse_location)]);
386 }
387
388 let request = request_builder.build()?;
389
390 let http_response = client.query_catalog(request).await?;
391
392 match http_response.status() {
393 StatusCode::OK => deserialize_catalog_response(http_response).await,
394 _ => Err(deserialize_unexpected_catalog_error(
395 http_response,
396 client.disable_header_redaction(),
397 )
398 .await),
399 }
400 }
401
402 async fn load_file_io(
403 &self,
404 metadata_location: Option<&str>,
405 extra_config: Option<HashMap<String, String>>,
406 ) -> Result<FileIO> {
407 let mut props = self.context().await?.config.props.clone();
408 if let Some(config) = extra_config {
409 props.extend(config);
410 }
411
412 let warehouse_path = match self.context().await?.config.warehouse.as_deref() {
415 Some(url) if Url::parse(url).is_ok() => Some(url),
416 Some(_) => None,
417 None => None,
418 };
419
420 let file_io = match metadata_location.or(warehouse_path) {
421 Some(url) => FileIO::from_path(url)?
422 .with_props(props)
423 .with_extensions(self.file_io_extensions.clone())
424 .build()?,
425 None => {
426 return Err(Error::new(
427 ErrorKind::Unexpected,
428 "Unable to load file io, neither warehouse nor metadata location is set!",
429 ))?;
430 }
431 };
432
433 Ok(file_io)
434 }
435
436 pub async fn invalidate_token(&self) -> Result<()> {
439 self.context().await?.client.invalidate_token().await
440 }
441
442 pub async fn regenerate_token(&self) -> Result<()> {
449 self.context().await?.client.regenerate_token().await
450 }
451}
452
453#[async_trait]
456impl Catalog for RestCatalog {
457 async fn list_namespaces(
458 &self,
459 parent: Option<&NamespaceIdent>,
460 ) -> Result<Vec<NamespaceIdent>> {
461 let context = self.context().await?;
462 let endpoint = context.config.namespaces_endpoint();
463 let mut namespaces = Vec::new();
464 let mut next_token = None;
465
466 loop {
467 let mut request = context.client.request(Method::GET, endpoint.clone());
468
469 if let Some(ns) = parent {
471 request = request.query(&[("parent", ns.to_url_string())]);
472 }
473
474 if let Some(token) = next_token {
475 request = request.query(&[("pageToken", token)]);
476 }
477
478 let http_response = context.client.query_catalog(request.build()?).await?;
479
480 match http_response.status() {
481 StatusCode::OK => {
482 let response =
483 deserialize_catalog_response::<ListNamespaceResponse>(http_response)
484 .await?;
485
486 namespaces.extend(response.namespaces);
487
488 match response.next_page_token {
489 Some(token) => next_token = Some(token),
490 None => break,
491 }
492 }
493 StatusCode::NOT_FOUND => {
494 return Err(Error::new(
495 ErrorKind::Unexpected,
496 "The parent parameter of the namespace provided does not exist",
497 ));
498 }
499 _ => {
500 return Err(deserialize_unexpected_catalog_error(
501 http_response,
502 context.client.disable_header_redaction(),
503 )
504 .await);
505 }
506 }
507 }
508
509 Ok(namespaces)
510 }
511
512 async fn create_namespace(
513 &self,
514 namespace: &NamespaceIdent,
515 properties: HashMap<String, String>,
516 ) -> Result<Namespace> {
517 let context = self.context().await?;
518
519 let request = context
520 .client
521 .request(Method::POST, context.config.namespaces_endpoint())
522 .json(&CreateNamespaceRequest {
523 namespace: namespace.clone(),
524 properties,
525 })
526 .build()?;
527
528 let http_response = context.client.query_catalog(request).await?;
529
530 match http_response.status() {
531 StatusCode::OK => {
532 let response =
533 deserialize_catalog_response::<NamespaceResponse>(http_response).await?;
534 Ok(Namespace::from(response))
535 }
536 StatusCode::CONFLICT => Err(Error::new(
537 ErrorKind::Unexpected,
538 "Tried to create a namespace that already exists",
539 )),
540 _ => Err(deserialize_unexpected_catalog_error(
541 http_response,
542 context.client.disable_header_redaction(),
543 )
544 .await),
545 }
546 }
547
548 async fn get_namespace(&self, namespace: &NamespaceIdent) -> Result<Namespace> {
549 let context = self.context().await?;
550
551 let request = context
552 .client
553 .request(Method::GET, context.config.namespace_endpoint(namespace))
554 .build()?;
555
556 let http_response = context.client.query_catalog(request).await?;
557
558 match http_response.status() {
559 StatusCode::OK => {
560 let response =
561 deserialize_catalog_response::<NamespaceResponse>(http_response).await?;
562 Ok(Namespace::from(response))
563 }
564 StatusCode::NOT_FOUND => Err(Error::new(
565 ErrorKind::Unexpected,
566 "Tried to get a namespace that does not exist",
567 )),
568 _ => Err(deserialize_unexpected_catalog_error(
569 http_response,
570 context.client.disable_header_redaction(),
571 )
572 .await),
573 }
574 }
575
576 async fn namespace_exists(&self, ns: &NamespaceIdent) -> Result<bool> {
577 let context = self.context().await?;
578
579 let request = context
580 .client
581 .request(Method::HEAD, context.config.namespace_endpoint(ns))
582 .build()?;
583
584 let http_response = context.client.query_catalog(request).await?;
585
586 match http_response.status() {
587 StatusCode::NO_CONTENT | StatusCode::OK => Ok(true),
588 StatusCode::NOT_FOUND => Ok(false),
589 _ => Err(deserialize_unexpected_catalog_error(
590 http_response,
591 context.client.disable_header_redaction(),
592 )
593 .await),
594 }
595 }
596
597 async fn update_namespace(
598 &self,
599 _namespace: &NamespaceIdent,
600 _properties: HashMap<String, String>,
601 ) -> Result<()> {
602 Err(Error::new(
603 ErrorKind::FeatureUnsupported,
604 "Updating namespace not supported yet!",
605 ))
606 }
607
608 async fn drop_namespace(&self, namespace: &NamespaceIdent) -> Result<()> {
609 let context = self.context().await?;
610
611 let request = context
612 .client
613 .request(Method::DELETE, context.config.namespace_endpoint(namespace))
614 .build()?;
615
616 let http_response = context.client.query_catalog(request).await?;
617
618 match http_response.status() {
619 StatusCode::NO_CONTENT | StatusCode::OK => Ok(()),
620 StatusCode::NOT_FOUND => Err(Error::new(
621 ErrorKind::Unexpected,
622 "Tried to drop a namespace that does not exist",
623 )),
624 _ => Err(deserialize_unexpected_catalog_error(
625 http_response,
626 context.client.disable_header_redaction(),
627 )
628 .await),
629 }
630 }
631
632 async fn list_tables(&self, namespace: &NamespaceIdent) -> Result<Vec<TableIdent>> {
633 let context = self.context().await?;
634 let endpoint = context.config.tables_endpoint(namespace);
635 let mut identifiers = Vec::new();
636 let mut next_token = None;
637
638 loop {
639 let mut request = context.client.request(Method::GET, endpoint.clone());
640
641 if let Some(token) = next_token {
642 request = request.query(&[("pageToken", token)]);
643 }
644
645 let http_response = context.client.query_catalog(request.build()?).await?;
646
647 match http_response.status() {
648 StatusCode::OK => {
649 let response =
650 deserialize_catalog_response::<ListTablesResponse>(http_response).await?;
651
652 identifiers.extend(response.identifiers);
653
654 match response.next_page_token {
655 Some(token) => next_token = Some(token),
656 None => break,
657 }
658 }
659 StatusCode::NOT_FOUND => {
660 return Err(Error::new(
661 ErrorKind::Unexpected,
662 "Tried to list tables of a namespace that does not exist",
663 ));
664 }
665 _ => {
666 return Err(deserialize_unexpected_catalog_error(
667 http_response,
668 context.client.disable_header_redaction(),
669 )
670 .await);
671 }
672 }
673 }
674
675 Ok(identifiers)
676 }
677
678 async fn create_table(
685 &self,
686 namespace: &NamespaceIdent,
687 creation: TableCreation,
688 ) -> Result<Table> {
689 let context = self.context().await?;
690
691 let table_ident = TableIdent::new(namespace.clone(), creation.name.clone());
692
693 let request = context
694 .client
695 .request(Method::POST, context.config.tables_endpoint(namespace))
696 .json(&CreateTableRequest {
697 name: creation.name,
698 location: creation.location,
699 schema: creation.schema,
700 partition_spec: creation.partition_spec,
701 write_order: creation.sort_order,
702 stage_create: Some(false),
703 properties: creation.properties,
704 })
705 .build()?;
706
707 let http_response = context.client.query_catalog(request).await?;
708
709 let response = match http_response.status() {
710 StatusCode::OK => {
711 deserialize_catalog_response::<LoadTableResult>(http_response).await?
712 }
713 StatusCode::NOT_FOUND => {
714 return Err(Error::new(
715 ErrorKind::Unexpected,
716 "Tried to create a table under a namespace that does not exist",
717 ));
718 }
719 StatusCode::CONFLICT => {
720 return Err(Error::new(
721 ErrorKind::Unexpected,
722 "The table already exists",
723 ));
724 }
725 _ => {
726 return Err(deserialize_unexpected_catalog_error(
727 http_response,
728 context.client.disable_header_redaction(),
729 )
730 .await);
731 }
732 };
733
734 let metadata_location = response.metadata_location.as_ref().ok_or(Error::new(
735 ErrorKind::DataInvalid,
736 "Metadata location missing in `create_table` response!",
737 ))?;
738
739 let config = response
740 .config
741 .into_iter()
742 .chain(self.user_config.props.clone())
743 .collect();
744
745 let file_io = self
746 .load_file_io(Some(metadata_location), Some(config))
747 .await?;
748
749 let table_builder = Table::builder()
750 .identifier(table_ident.clone())
751 .file_io(file_io)
752 .metadata(response.metadata);
753
754 if let Some(metadata_location) = response.metadata_location {
755 table_builder.metadata_location(metadata_location).build()
756 } else {
757 table_builder.build()
758 }
759 }
760
761 async fn load_table(&self, table_ident: &TableIdent) -> Result<Table> {
767 let context = self.context().await?;
768
769 let request = context
770 .client
771 .request(Method::GET, context.config.table_endpoint(table_ident))
772 .build()?;
773
774 let http_response = context.client.query_catalog(request).await?;
775
776 let response = match http_response.status() {
777 StatusCode::OK | StatusCode::NOT_MODIFIED => {
778 deserialize_catalog_response::<LoadTableResult>(http_response).await?
779 }
780 StatusCode::NOT_FOUND => {
781 return Err(Error::new(
782 ErrorKind::Unexpected,
783 "Tried to load a table that does not exist",
784 ));
785 }
786 _ => {
787 return Err(deserialize_unexpected_catalog_error(
788 http_response,
789 context.client.disable_header_redaction(),
790 )
791 .await);
792 }
793 };
794
795 let config = response
796 .config
797 .into_iter()
798 .chain(self.user_config.props.clone())
799 .collect();
800
801 let file_io = self
802 .load_file_io(response.metadata_location.as_deref(), Some(config))
803 .await?;
804
805 let table_builder = Table::builder()
806 .identifier(table_ident.clone())
807 .file_io(file_io)
808 .metadata(response.metadata);
809
810 if let Some(metadata_location) = response.metadata_location {
811 table_builder.metadata_location(metadata_location).build()
812 } else {
813 table_builder.build()
814 }
815 }
816
817 async fn drop_table(&self, table: &TableIdent) -> Result<()> {
819 let context = self.context().await?;
820
821 let request = context
822 .client
823 .request(Method::DELETE, context.config.table_endpoint(table))
824 .build()?;
825
826 let http_response = context.client.query_catalog(request).await?;
827
828 match http_response.status() {
829 StatusCode::NO_CONTENT | StatusCode::OK => Ok(()),
830 StatusCode::NOT_FOUND => Err(Error::new(
831 ErrorKind::Unexpected,
832 "Tried to drop a table that does not exist",
833 )),
834 _ => Err(deserialize_unexpected_catalog_error(
835 http_response,
836 context.client.disable_header_redaction(),
837 )
838 .await),
839 }
840 }
841
842 async fn table_exists(&self, table: &TableIdent) -> Result<bool> {
844 let context = self.context().await?;
845
846 let request = context
847 .client
848 .request(Method::HEAD, context.config.table_endpoint(table))
849 .build()?;
850
851 let http_response = context.client.query_catalog(request).await?;
852
853 match http_response.status() {
854 StatusCode::NO_CONTENT | StatusCode::OK => Ok(true),
855 StatusCode::NOT_FOUND => Ok(false),
856 _ => Err(deserialize_unexpected_catalog_error(
857 http_response,
858 context.client.disable_header_redaction(),
859 )
860 .await),
861 }
862 }
863
864 async fn rename_table(&self, src: &TableIdent, dest: &TableIdent) -> Result<()> {
866 let context = self.context().await?;
867
868 let request = context
869 .client
870 .request(Method::POST, context.config.rename_table_endpoint())
871 .json(&RenameTableRequest {
872 source: src.clone(),
873 destination: dest.clone(),
874 })
875 .build()?;
876
877 let http_response = context.client.query_catalog(request).await?;
878
879 match http_response.status() {
880 StatusCode::NO_CONTENT | StatusCode::OK => Ok(()),
881 StatusCode::NOT_FOUND => Err(Error::new(
882 ErrorKind::Unexpected,
883 "Tried to rename a table that does not exist (is the namespace correct?)",
884 )),
885 StatusCode::CONFLICT => Err(Error::new(
886 ErrorKind::Unexpected,
887 "Tried to rename a table to a name that already exists",
888 )),
889 _ => Err(deserialize_unexpected_catalog_error(
890 http_response,
891 context.client.disable_header_redaction(),
892 )
893 .await),
894 }
895 }
896
897 async fn register_table(
898 &self,
899 table_ident: &TableIdent,
900 metadata_location: String,
901 ) -> Result<Table> {
902 let context = self.context().await?;
903
904 let request = context
905 .client
906 .request(
907 Method::POST,
908 context
909 .config
910 .register_table_endpoint(table_ident.namespace()),
911 )
912 .json(&RegisterTableRequest {
913 name: table_ident.name.clone(),
914 metadata_location: metadata_location.clone(),
915 overwrite: Some(false),
916 })
917 .build()?;
918
919 let http_response = context.client.query_catalog(request).await?;
920
921 let response: LoadTableResult = match http_response.status() {
922 StatusCode::OK => {
923 deserialize_catalog_response::<LoadTableResult>(http_response).await?
924 }
925 StatusCode::NOT_FOUND => {
926 return Err(Error::new(
927 ErrorKind::NamespaceNotFound,
928 "The namespace specified does not exist.",
929 ));
930 }
931 StatusCode::CONFLICT => {
932 return Err(Error::new(
933 ErrorKind::TableAlreadyExists,
934 "The given table already exists.",
935 ));
936 }
937 _ => {
938 return Err(deserialize_unexpected_catalog_error(
939 http_response,
940 context.client.disable_header_redaction(),
941 )
942 .await);
943 }
944 };
945
946 let metadata_location = response.metadata_location.as_ref().ok_or(Error::new(
947 ErrorKind::DataInvalid,
948 "Metadata location missing in `register_table` response!",
949 ))?;
950
951 let file_io = self.load_file_io(Some(metadata_location), None).await?;
952
953 Table::builder()
954 .identifier(table_ident.clone())
955 .file_io(file_io)
956 .metadata(response.metadata)
957 .metadata_location(metadata_location.clone())
958 .build()
959 }
960
961 async fn update_table(&self, mut commit: TableCommit) -> Result<Table> {
962 let context = self.context().await?;
963
964 let request = context
965 .client
966 .request(
967 Method::POST,
968 context.config.table_endpoint(commit.identifier()),
969 )
970 .json(&CommitTableRequest {
971 identifier: Some(commit.identifier().clone()),
972 requirements: commit.take_requirements(),
973 updates: commit.take_updates(),
974 })
975 .build()?;
976
977 let http_response = context.client.query_catalog(request).await?;
978
979 let response: CommitTableResponse = match http_response.status() {
980 StatusCode::OK => deserialize_catalog_response(http_response).await?,
981 StatusCode::NOT_FOUND => {
982 return Err(Error::new(
983 ErrorKind::TableNotFound,
984 "Tried to update a table that does not exist",
985 ));
986 }
987 StatusCode::CONFLICT => {
988 return Err(Error::new(
989 ErrorKind::CatalogCommitConflicts,
990 "CatalogCommitConflicts, one or more requirements failed. The client may retry.",
991 )
992 .with_retryable(true));
993 }
994 StatusCode::INTERNAL_SERVER_ERROR => {
995 return Err(Error::new(
996 ErrorKind::Unexpected,
997 "An unknown server-side problem occurred; the commit state is unknown.",
998 ));
999 }
1000 StatusCode::BAD_GATEWAY => {
1001 return Err(Error::new(
1002 ErrorKind::Unexpected,
1003 "A gateway or proxy received an invalid response from the upstream server; the commit state is unknown.",
1004 ));
1005 }
1006 StatusCode::GATEWAY_TIMEOUT => {
1007 return Err(Error::new(
1008 ErrorKind::Unexpected,
1009 "A server-side gateway timeout occurred; the commit state is unknown.",
1010 ));
1011 }
1012 _ => {
1013 return Err(deserialize_unexpected_catalog_error(
1014 http_response,
1015 context.client.disable_header_redaction(),
1016 )
1017 .await);
1018 }
1019 };
1020
1021 let file_io = self
1022 .load_file_io(Some(&response.metadata_location), None)
1023 .await?;
1024
1025 Table::builder()
1026 .identifier(commit.identifier().clone())
1027 .file_io(file_io)
1028 .metadata(response.metadata)
1029 .metadata_location(response.metadata_location)
1030 .build()
1031 }
1032}
1033
1034#[cfg(test)]
1035mod tests {
1036 use std::fs::File;
1037 use std::io::BufReader;
1038 use std::sync::Arc;
1039
1040 use chrono::{TimeZone, Utc};
1041 use iceberg::spec::{
1042 FormatVersion, NestedField, NullOrder, Operation, PrimitiveType, Schema, Snapshot,
1043 SnapshotLog, SortDirection, SortField, SortOrder, Summary, Transform, Type,
1044 UnboundPartitionField, UnboundPartitionSpec,
1045 };
1046 use iceberg::transaction::{ApplyTransactionAction, Transaction};
1047 use mockito::{Mock, Server, ServerGuard};
1048 use serde_json::json;
1049 use uuid::uuid;
1050
1051 use super::*;
1052
1053 #[tokio::test]
1054 async fn test_update_config() {
1055 let mut server = Server::new_async().await;
1056
1057 let config_mock = server
1058 .mock("GET", "/v1/config")
1059 .with_status(200)
1060 .with_body(
1061 r#"{
1062 "overrides": {
1063 "warehouse": "s3://iceberg-catalog"
1064 },
1065 "defaults": {}
1066 }"#,
1067 )
1068 .create_async()
1069 .await;
1070
1071 let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
1072
1073 assert_eq!(
1074 catalog
1075 .context()
1076 .await
1077 .unwrap()
1078 .config
1079 .props
1080 .get("warehouse"),
1081 Some(&"s3://iceberg-catalog".to_string())
1082 );
1083
1084 config_mock.assert_async().await;
1085 }
1086
1087 async fn create_config_mock(server: &mut ServerGuard) -> Mock {
1088 server
1089 .mock("GET", "/v1/config")
1090 .with_status(200)
1091 .with_body(
1092 r#"{
1093 "overrides": {
1094 "warehouse": "s3://iceberg-catalog"
1095 },
1096 "defaults": {}
1097 }"#,
1098 )
1099 .create_async()
1100 .await
1101 }
1102
1103 async fn create_oauth_mock(server: &mut ServerGuard) -> Mock {
1104 create_oauth_mock_with_path(server, "/v1/oauth/tokens", "ey000000000000", 200).await
1105 }
1106
1107 async fn create_oauth_mock_with_path(
1108 server: &mut ServerGuard,
1109 path: &str,
1110 token: &str,
1111 status: usize,
1112 ) -> Mock {
1113 let body = format!(
1114 r#"{{
1115 "access_token": "{token}",
1116 "token_type": "Bearer",
1117 "issued_token_type": "urn:ietf:params:oauth:token-type:access_token",
1118 "expires_in": 86400
1119 }}"#
1120 );
1121 server
1122 .mock("POST", path)
1123 .with_status(status)
1124 .with_body(body)
1125 .expect(1)
1126 .create_async()
1127 .await
1128 }
1129
1130 #[tokio::test]
1131 async fn test_oauth() {
1132 let mut server = Server::new_async().await;
1133 let oauth_mock = create_oauth_mock(&mut server).await;
1134 let config_mock = create_config_mock(&mut server).await;
1135
1136 let mut props = HashMap::new();
1137 props.insert("credential".to_string(), "client1:secret1".to_string());
1138
1139 let catalog = RestCatalog::new(
1140 RestCatalogConfig::builder()
1141 .uri(server.url())
1142 .props(props)
1143 .build(),
1144 );
1145
1146 let token = catalog.context().await.unwrap().client.token().await;
1147 oauth_mock.assert_async().await;
1148 config_mock.assert_async().await;
1149 assert_eq!(token, Some("ey000000000000".to_string()));
1150 }
1151
1152 #[tokio::test]
1153 async fn test_oauth_with_optional_param() {
1154 let mut props = HashMap::new();
1155 props.insert("credential".to_string(), "client1:secret1".to_string());
1156 props.insert("scope".to_string(), "custom_scope".to_string());
1157 props.insert("audience".to_string(), "custom_audience".to_string());
1158 props.insert("resource".to_string(), "custom_resource".to_string());
1159
1160 let mut server = Server::new_async().await;
1161 let oauth_mock = server
1162 .mock("POST", "/v1/oauth/tokens")
1163 .match_body(mockito::Matcher::Regex("scope=custom_scope".to_string()))
1164 .match_body(mockito::Matcher::Regex(
1165 "audience=custom_audience".to_string(),
1166 ))
1167 .match_body(mockito::Matcher::Regex(
1168 "resource=custom_resource".to_string(),
1169 ))
1170 .with_status(200)
1171 .with_body(
1172 r#"{
1173 "access_token": "ey000000000000",
1174 "token_type": "Bearer",
1175 "issued_token_type": "urn:ietf:params:oauth:token-type:access_token",
1176 "expires_in": 86400
1177 }"#,
1178 )
1179 .expect(1)
1180 .create_async()
1181 .await;
1182
1183 let config_mock = create_config_mock(&mut server).await;
1184
1185 let catalog = RestCatalog::new(
1186 RestCatalogConfig::builder()
1187 .uri(server.url())
1188 .props(props)
1189 .build(),
1190 );
1191
1192 let token = catalog.context().await.unwrap().client.token().await;
1193
1194 oauth_mock.assert_async().await;
1195 config_mock.assert_async().await;
1196 assert_eq!(token, Some("ey000000000000".to_string()));
1197 }
1198
1199 #[tokio::test]
1200 async fn test_invalidate_token() {
1201 let mut server = Server::new_async().await;
1202 let oauth_mock = create_oauth_mock(&mut server).await;
1203 let config_mock = create_config_mock(&mut server).await;
1204
1205 let mut props = HashMap::new();
1206 props.insert("credential".to_string(), "client1:secret1".to_string());
1207
1208 let catalog = RestCatalog::new(
1209 RestCatalogConfig::builder()
1210 .uri(server.url())
1211 .props(props)
1212 .build(),
1213 );
1214
1215 let token = catalog.context().await.unwrap().client.token().await;
1216 oauth_mock.assert_async().await;
1217 config_mock.assert_async().await;
1218 assert_eq!(token, Some("ey000000000000".to_string()));
1219
1220 let oauth_mock =
1221 create_oauth_mock_with_path(&mut server, "/v1/oauth/tokens", "ey000000000001", 200)
1222 .await;
1223 catalog.invalidate_token().await.unwrap();
1224 let token = catalog.context().await.unwrap().client.token().await;
1225 oauth_mock.assert_async().await;
1226 assert_eq!(token, Some("ey000000000001".to_string()));
1227 }
1228
1229 #[tokio::test]
1230 async fn test_invalidate_token_failing_request() {
1231 let mut server = Server::new_async().await;
1232 let oauth_mock = create_oauth_mock(&mut server).await;
1233 let config_mock = create_config_mock(&mut server).await;
1234
1235 let mut props = HashMap::new();
1236 props.insert("credential".to_string(), "client1:secret1".to_string());
1237
1238 let catalog = RestCatalog::new(
1239 RestCatalogConfig::builder()
1240 .uri(server.url())
1241 .props(props)
1242 .build(),
1243 );
1244
1245 let token = catalog.context().await.unwrap().client.token().await;
1246 oauth_mock.assert_async().await;
1247 config_mock.assert_async().await;
1248 assert_eq!(token, Some("ey000000000000".to_string()));
1249
1250 let oauth_mock =
1251 create_oauth_mock_with_path(&mut server, "/v1/oauth/tokens", "ey000000000001", 500)
1252 .await;
1253 catalog.invalidate_token().await.unwrap();
1254 let token = catalog.context().await.unwrap().client.token().await;
1255 oauth_mock.assert_async().await;
1256 assert_eq!(token, None);
1257 }
1258
1259 #[tokio::test]
1260 async fn test_regenerate_token() {
1261 let mut server = Server::new_async().await;
1262 let oauth_mock = create_oauth_mock(&mut server).await;
1263 let config_mock = create_config_mock(&mut server).await;
1264
1265 let mut props = HashMap::new();
1266 props.insert("credential".to_string(), "client1:secret1".to_string());
1267
1268 let catalog = RestCatalog::new(
1269 RestCatalogConfig::builder()
1270 .uri(server.url())
1271 .props(props)
1272 .build(),
1273 );
1274
1275 let token = catalog.context().await.unwrap().client.token().await;
1276 oauth_mock.assert_async().await;
1277 config_mock.assert_async().await;
1278 assert_eq!(token, Some("ey000000000000".to_string()));
1279
1280 let oauth_mock =
1281 create_oauth_mock_with_path(&mut server, "/v1/oauth/tokens", "ey000000000001", 200)
1282 .await;
1283 catalog.regenerate_token().await.unwrap();
1284 oauth_mock.assert_async().await;
1285 let token = catalog.context().await.unwrap().client.token().await;
1286 assert_eq!(token, Some("ey000000000001".to_string()));
1287 }
1288
1289 #[tokio::test]
1290 async fn test_regenerate_token_failing_request() {
1291 let mut server = Server::new_async().await;
1292 let oauth_mock = create_oauth_mock(&mut server).await;
1293 let config_mock = create_config_mock(&mut server).await;
1294
1295 let mut props = HashMap::new();
1296 props.insert("credential".to_string(), "client1:secret1".to_string());
1297
1298 let catalog = RestCatalog::new(
1299 RestCatalogConfig::builder()
1300 .uri(server.url())
1301 .props(props)
1302 .build(),
1303 );
1304
1305 let token = catalog.context().await.unwrap().client.token().await;
1306 oauth_mock.assert_async().await;
1307 config_mock.assert_async().await;
1308 assert_eq!(token, Some("ey000000000000".to_string()));
1309
1310 let oauth_mock =
1311 create_oauth_mock_with_path(&mut server, "/v1/oauth/tokens", "ey000000000001", 500)
1312 .await;
1313 let invalidate_result = catalog.regenerate_token().await;
1314 assert!(invalidate_result.is_err());
1315 oauth_mock.assert_async().await;
1316 let token = catalog.context().await.unwrap().client.token().await;
1317
1318 assert_eq!(token, Some("ey000000000000".to_string()));
1320 }
1321
1322 #[tokio::test]
1323 async fn test_http_headers() {
1324 let server = Server::new_async().await;
1325 let mut props = HashMap::new();
1326 props.insert("credential".to_string(), "client1:secret1".to_string());
1327
1328 let config = RestCatalogConfig::builder()
1329 .uri(server.url())
1330 .props(props)
1331 .build();
1332 let headers: HeaderMap = config.extra_headers().unwrap();
1333
1334 let expected_headers = HeaderMap::from_iter([
1335 (
1336 header::CONTENT_TYPE,
1337 HeaderValue::from_static("application/json"),
1338 ),
1339 (
1340 HeaderName::from_static("x-client-version"),
1341 HeaderValue::from_static(ICEBERG_REST_SPEC_VERSION),
1342 ),
1343 (
1344 header::USER_AGENT,
1345 HeaderValue::from_str(&format!("iceberg-rs/{CARGO_PKG_VERSION}")).unwrap(),
1346 ),
1347 ]);
1348 assert_eq!(headers, expected_headers);
1349 }
1350
1351 #[tokio::test]
1352 async fn test_http_headers_with_custom_headers() {
1353 let server = Server::new_async().await;
1354 let mut props = HashMap::new();
1355 props.insert("credential".to_string(), "client1:secret1".to_string());
1356 props.insert(
1357 "header.content-type".to_string(),
1358 "application/yaml".to_string(),
1359 );
1360 props.insert(
1361 "header.customized-header".to_string(),
1362 "some/value".to_string(),
1363 );
1364
1365 let config = RestCatalogConfig::builder()
1366 .uri(server.url())
1367 .props(props)
1368 .build();
1369 let headers: HeaderMap = config.extra_headers().unwrap();
1370
1371 let expected_headers = HeaderMap::from_iter([
1372 (
1373 header::CONTENT_TYPE,
1374 HeaderValue::from_static("application/yaml"),
1375 ),
1376 (
1377 HeaderName::from_static("x-client-version"),
1378 HeaderValue::from_static(ICEBERG_REST_SPEC_VERSION),
1379 ),
1380 (
1381 header::USER_AGENT,
1382 HeaderValue::from_str(&format!("iceberg-rs/{CARGO_PKG_VERSION}")).unwrap(),
1383 ),
1384 (
1385 HeaderName::from_static("customized-header"),
1386 HeaderValue::from_static("some/value"),
1387 ),
1388 ]);
1389 assert_eq!(headers, expected_headers);
1390 }
1391
1392 #[tokio::test]
1393 async fn test_oauth_with_oauth2_server_uri() {
1394 let mut server = Server::new_async().await;
1395 let config_mock = create_config_mock(&mut server).await;
1396
1397 let mut auth_server = Server::new_async().await;
1398 let auth_server_path = "/some/path";
1399 let oauth_mock =
1400 create_oauth_mock_with_path(&mut auth_server, auth_server_path, "ey000000000000", 200)
1401 .await;
1402
1403 let mut props = HashMap::new();
1404 props.insert("credential".to_string(), "client1:secret1".to_string());
1405 props.insert(
1406 "oauth2-server-uri".to_string(),
1407 format!("{}{}", auth_server.url(), auth_server_path).to_string(),
1408 );
1409
1410 let catalog = RestCatalog::new(
1411 RestCatalogConfig::builder()
1412 .uri(server.url())
1413 .props(props)
1414 .build(),
1415 );
1416
1417 let token = catalog.context().await.unwrap().client.token().await;
1418
1419 oauth_mock.assert_async().await;
1420 config_mock.assert_async().await;
1421 assert_eq!(token, Some("ey000000000000".to_string()));
1422 }
1423
1424 #[tokio::test]
1425 async fn test_config_override() {
1426 let mut server = Server::new_async().await;
1427 let mut redirect_server = Server::new_async().await;
1428 let new_uri = redirect_server.url();
1429
1430 let config_mock = server
1431 .mock("GET", "/v1/config")
1432 .with_status(200)
1433 .with_body(
1434 json!(
1435 {
1436 "overrides": {
1437 "uri": new_uri,
1438 "warehouse": "s3://iceberg-catalog",
1439 "prefix": "ice/warehouses/my"
1440 },
1441 "defaults": {},
1442 }
1443 )
1444 .to_string(),
1445 )
1446 .create_async()
1447 .await;
1448
1449 let list_ns_mock = redirect_server
1450 .mock("GET", "/v1/ice/warehouses/my/namespaces")
1451 .with_body(
1452 r#"{
1453 "namespaces": []
1454 }"#,
1455 )
1456 .create_async()
1457 .await;
1458
1459 let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
1460
1461 let _namespaces = catalog.list_namespaces(None).await.unwrap();
1462
1463 config_mock.assert_async().await;
1464 list_ns_mock.assert_async().await;
1465 }
1466
1467 #[tokio::test]
1468 async fn test_list_namespace() {
1469 let mut server = Server::new_async().await;
1470
1471 let config_mock = create_config_mock(&mut server).await;
1472
1473 let list_ns_mock = server
1474 .mock("GET", "/v1/namespaces")
1475 .with_body(
1476 r#"{
1477 "namespaces": [
1478 ["ns1", "ns11"],
1479 ["ns2"]
1480 ]
1481 }"#,
1482 )
1483 .create_async()
1484 .await;
1485
1486 let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
1487
1488 let namespaces = catalog.list_namespaces(None).await.unwrap();
1489
1490 let expected_ns = vec![
1491 NamespaceIdent::from_vec(vec!["ns1".to_string(), "ns11".to_string()]).unwrap(),
1492 NamespaceIdent::from_vec(vec!["ns2".to_string()]).unwrap(),
1493 ];
1494
1495 assert_eq!(expected_ns, namespaces);
1496
1497 config_mock.assert_async().await;
1498 list_ns_mock.assert_async().await;
1499 }
1500
1501 #[tokio::test]
1502 async fn test_list_namespace_with_pagination() {
1503 let mut server = Server::new_async().await;
1504
1505 let config_mock = create_config_mock(&mut server).await;
1506
1507 let list_ns_mock_page1 = server
1508 .mock("GET", "/v1/namespaces")
1509 .with_body(
1510 r#"{
1511 "namespaces": [
1512 ["ns1", "ns11"],
1513 ["ns2"]
1514 ],
1515 "next-page-token": "token123"
1516 }"#,
1517 )
1518 .create_async()
1519 .await;
1520
1521 let list_ns_mock_page2 = server
1522 .mock("GET", "/v1/namespaces?pageToken=token123")
1523 .with_body(
1524 r#"{
1525 "namespaces": [
1526 ["ns3"],
1527 ["ns4", "ns41"]
1528 ]
1529 }"#,
1530 )
1531 .create_async()
1532 .await;
1533
1534 let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
1535
1536 let namespaces = catalog.list_namespaces(None).await.unwrap();
1537
1538 let expected_ns = vec![
1539 NamespaceIdent::from_vec(vec!["ns1".to_string(), "ns11".to_string()]).unwrap(),
1540 NamespaceIdent::from_vec(vec!["ns2".to_string()]).unwrap(),
1541 NamespaceIdent::from_vec(vec!["ns3".to_string()]).unwrap(),
1542 NamespaceIdent::from_vec(vec!["ns4".to_string(), "ns41".to_string()]).unwrap(),
1543 ];
1544
1545 assert_eq!(expected_ns, namespaces);
1546
1547 config_mock.assert_async().await;
1548 list_ns_mock_page1.assert_async().await;
1549 list_ns_mock_page2.assert_async().await;
1550 }
1551
1552 #[tokio::test]
1553 async fn test_list_namespace_with_multiple_pages() {
1554 let mut server = Server::new_async().await;
1555
1556 let config_mock = create_config_mock(&mut server).await;
1557
1558 let list_ns_mock_page1 = server
1560 .mock("GET", "/v1/namespaces")
1561 .with_body(
1562 r#"{
1563 "namespaces": [
1564 ["ns1", "ns11"],
1565 ["ns2"]
1566 ],
1567 "next-page-token": "page2"
1568 }"#,
1569 )
1570 .create_async()
1571 .await;
1572
1573 let list_ns_mock_page2 = server
1575 .mock("GET", "/v1/namespaces?pageToken=page2")
1576 .with_body(
1577 r#"{
1578 "namespaces": [
1579 ["ns3"],
1580 ["ns4", "ns41"]
1581 ],
1582 "next-page-token": "page3"
1583 }"#,
1584 )
1585 .create_async()
1586 .await;
1587
1588 let list_ns_mock_page3 = server
1590 .mock("GET", "/v1/namespaces?pageToken=page3")
1591 .with_body(
1592 r#"{
1593 "namespaces": [
1594 ["ns5", "ns51", "ns511"]
1595 ],
1596 "next-page-token": "page4"
1597 }"#,
1598 )
1599 .create_async()
1600 .await;
1601
1602 let list_ns_mock_page4 = server
1604 .mock("GET", "/v1/namespaces?pageToken=page4")
1605 .with_body(
1606 r#"{
1607 "namespaces": [
1608 ["ns6"],
1609 ["ns7"]
1610 ],
1611 "next-page-token": "page5"
1612 }"#,
1613 )
1614 .create_async()
1615 .await;
1616
1617 let list_ns_mock_page5 = server
1619 .mock("GET", "/v1/namespaces?pageToken=page5")
1620 .with_body(
1621 r#"{
1622 "namespaces": [
1623 ["ns8", "ns81"]
1624 ]
1625 }"#,
1626 )
1627 .create_async()
1628 .await;
1629
1630 let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
1631
1632 let namespaces = catalog.list_namespaces(None).await.unwrap();
1633
1634 let expected_ns = vec![
1635 NamespaceIdent::from_vec(vec!["ns1".to_string(), "ns11".to_string()]).unwrap(),
1636 NamespaceIdent::from_vec(vec!["ns2".to_string()]).unwrap(),
1637 NamespaceIdent::from_vec(vec!["ns3".to_string()]).unwrap(),
1638 NamespaceIdent::from_vec(vec!["ns4".to_string(), "ns41".to_string()]).unwrap(),
1639 NamespaceIdent::from_vec(vec![
1640 "ns5".to_string(),
1641 "ns51".to_string(),
1642 "ns511".to_string(),
1643 ])
1644 .unwrap(),
1645 NamespaceIdent::from_vec(vec!["ns6".to_string()]).unwrap(),
1646 NamespaceIdent::from_vec(vec!["ns7".to_string()]).unwrap(),
1647 NamespaceIdent::from_vec(vec!["ns8".to_string(), "ns81".to_string()]).unwrap(),
1648 ];
1649
1650 assert_eq!(expected_ns, namespaces);
1651
1652 config_mock.assert_async().await;
1654 list_ns_mock_page1.assert_async().await;
1655 list_ns_mock_page2.assert_async().await;
1656 list_ns_mock_page3.assert_async().await;
1657 list_ns_mock_page4.assert_async().await;
1658 list_ns_mock_page5.assert_async().await;
1659 }
1660
1661 #[tokio::test]
1662 async fn test_create_namespace() {
1663 let mut server = Server::new_async().await;
1664
1665 let config_mock = create_config_mock(&mut server).await;
1666
1667 let create_ns_mock = server
1668 .mock("POST", "/v1/namespaces")
1669 .with_body(
1670 r#"{
1671 "namespace": [ "ns1", "ns11"],
1672 "properties" : {
1673 "key1": "value1"
1674 }
1675 }"#,
1676 )
1677 .create_async()
1678 .await;
1679
1680 let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
1681
1682 let namespaces = catalog
1683 .create_namespace(
1684 &NamespaceIdent::from_vec(vec!["ns1".to_string(), "ns11".to_string()]).unwrap(),
1685 HashMap::from([("key1".to_string(), "value1".to_string())]),
1686 )
1687 .await
1688 .unwrap();
1689
1690 let expected_ns = Namespace::with_properties(
1691 NamespaceIdent::from_vec(vec!["ns1".to_string(), "ns11".to_string()]).unwrap(),
1692 HashMap::from([("key1".to_string(), "value1".to_string())]),
1693 );
1694
1695 assert_eq!(expected_ns, namespaces);
1696
1697 config_mock.assert_async().await;
1698 create_ns_mock.assert_async().await;
1699 }
1700
1701 #[tokio::test]
1702 async fn test_get_namespace() {
1703 let mut server = Server::new_async().await;
1704
1705 let config_mock = create_config_mock(&mut server).await;
1706
1707 let get_ns_mock = server
1708 .mock("GET", "/v1/namespaces/ns1")
1709 .with_body(
1710 r#"{
1711 "namespace": [ "ns1"],
1712 "properties" : {
1713 "key1": "value1"
1714 }
1715 }"#,
1716 )
1717 .create_async()
1718 .await;
1719
1720 let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
1721
1722 let namespaces = catalog
1723 .get_namespace(&NamespaceIdent::new("ns1".to_string()))
1724 .await
1725 .unwrap();
1726
1727 let expected_ns = Namespace::with_properties(
1728 NamespaceIdent::new("ns1".to_string()),
1729 HashMap::from([("key1".to_string(), "value1".to_string())]),
1730 );
1731
1732 assert_eq!(expected_ns, namespaces);
1733
1734 config_mock.assert_async().await;
1735 get_ns_mock.assert_async().await;
1736 }
1737
1738 #[tokio::test]
1739 async fn check_namespace_exists() {
1740 let mut server = Server::new_async().await;
1741
1742 let config_mock = create_config_mock(&mut server).await;
1743
1744 let get_ns_mock = server
1745 .mock("HEAD", "/v1/namespaces/ns1")
1746 .with_status(204)
1747 .create_async()
1748 .await;
1749
1750 let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
1751
1752 assert!(
1753 catalog
1754 .namespace_exists(&NamespaceIdent::new("ns1".to_string()))
1755 .await
1756 .unwrap()
1757 );
1758
1759 config_mock.assert_async().await;
1760 get_ns_mock.assert_async().await;
1761 }
1762
1763 #[tokio::test]
1764 async fn test_drop_namespace() {
1765 let mut server = Server::new_async().await;
1766
1767 let config_mock = create_config_mock(&mut server).await;
1768
1769 let drop_ns_mock = server
1770 .mock("DELETE", "/v1/namespaces/ns1")
1771 .with_status(204)
1772 .create_async()
1773 .await;
1774
1775 let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
1776
1777 catalog
1778 .drop_namespace(&NamespaceIdent::new("ns1".to_string()))
1779 .await
1780 .unwrap();
1781
1782 config_mock.assert_async().await;
1783 drop_ns_mock.assert_async().await;
1784 }
1785
1786 #[tokio::test]
1787 async fn test_list_tables() {
1788 let mut server = Server::new_async().await;
1789
1790 let config_mock = create_config_mock(&mut server).await;
1791
1792 let list_tables_mock = server
1793 .mock("GET", "/v1/namespaces/ns1/tables")
1794 .with_status(200)
1795 .with_body(
1796 r#"{
1797 "identifiers": [
1798 {
1799 "namespace": ["ns1"],
1800 "name": "table1"
1801 },
1802 {
1803 "namespace": ["ns1"],
1804 "name": "table2"
1805 }
1806 ]
1807 }"#,
1808 )
1809 .create_async()
1810 .await;
1811
1812 let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
1813
1814 let tables = catalog
1815 .list_tables(&NamespaceIdent::new("ns1".to_string()))
1816 .await
1817 .unwrap();
1818
1819 let expected_tables = vec![
1820 TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table1".to_string()),
1821 TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table2".to_string()),
1822 ];
1823
1824 assert_eq!(tables, expected_tables);
1825
1826 config_mock.assert_async().await;
1827 list_tables_mock.assert_async().await;
1828 }
1829
1830 #[tokio::test]
1831 async fn test_list_tables_with_pagination() {
1832 let mut server = Server::new_async().await;
1833
1834 let config_mock = create_config_mock(&mut server).await;
1835
1836 let list_tables_mock_page1 = server
1837 .mock("GET", "/v1/namespaces/ns1/tables")
1838 .with_status(200)
1839 .with_body(
1840 r#"{
1841 "identifiers": [
1842 {
1843 "namespace": ["ns1"],
1844 "name": "table1"
1845 },
1846 {
1847 "namespace": ["ns1"],
1848 "name": "table2"
1849 }
1850 ],
1851 "next-page-token": "token456"
1852 }"#,
1853 )
1854 .create_async()
1855 .await;
1856
1857 let list_tables_mock_page2 = server
1858 .mock("GET", "/v1/namespaces/ns1/tables?pageToken=token456")
1859 .with_status(200)
1860 .with_body(
1861 r#"{
1862 "identifiers": [
1863 {
1864 "namespace": ["ns1"],
1865 "name": "table3"
1866 },
1867 {
1868 "namespace": ["ns1"],
1869 "name": "table4"
1870 }
1871 ]
1872 }"#,
1873 )
1874 .create_async()
1875 .await;
1876
1877 let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
1878
1879 let tables = catalog
1880 .list_tables(&NamespaceIdent::new("ns1".to_string()))
1881 .await
1882 .unwrap();
1883
1884 let expected_tables = vec![
1885 TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table1".to_string()),
1886 TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table2".to_string()),
1887 TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table3".to_string()),
1888 TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table4".to_string()),
1889 ];
1890
1891 assert_eq!(tables, expected_tables);
1892
1893 config_mock.assert_async().await;
1894 list_tables_mock_page1.assert_async().await;
1895 list_tables_mock_page2.assert_async().await;
1896 }
1897
1898 #[tokio::test]
1899 async fn test_list_tables_with_multiple_pages() {
1900 let mut server = Server::new_async().await;
1901
1902 let config_mock = create_config_mock(&mut server).await;
1903
1904 let list_tables_mock_page1 = server
1906 .mock("GET", "/v1/namespaces/ns1/tables")
1907 .with_status(200)
1908 .with_body(
1909 r#"{
1910 "identifiers": [
1911 {
1912 "namespace": ["ns1"],
1913 "name": "table1"
1914 },
1915 {
1916 "namespace": ["ns1"],
1917 "name": "table2"
1918 }
1919 ],
1920 "next-page-token": "page2"
1921 }"#,
1922 )
1923 .create_async()
1924 .await;
1925
1926 let list_tables_mock_page2 = server
1928 .mock("GET", "/v1/namespaces/ns1/tables?pageToken=page2")
1929 .with_status(200)
1930 .with_body(
1931 r#"{
1932 "identifiers": [
1933 {
1934 "namespace": ["ns1"],
1935 "name": "table3"
1936 },
1937 {
1938 "namespace": ["ns1"],
1939 "name": "table4"
1940 }
1941 ],
1942 "next-page-token": "page3"
1943 }"#,
1944 )
1945 .create_async()
1946 .await;
1947
1948 let list_tables_mock_page3 = server
1950 .mock("GET", "/v1/namespaces/ns1/tables?pageToken=page3")
1951 .with_status(200)
1952 .with_body(
1953 r#"{
1954 "identifiers": [
1955 {
1956 "namespace": ["ns1"],
1957 "name": "table5"
1958 }
1959 ],
1960 "next-page-token": "page4"
1961 }"#,
1962 )
1963 .create_async()
1964 .await;
1965
1966 let list_tables_mock_page4 = server
1968 .mock("GET", "/v1/namespaces/ns1/tables?pageToken=page4")
1969 .with_status(200)
1970 .with_body(
1971 r#"{
1972 "identifiers": [
1973 {
1974 "namespace": ["ns1"],
1975 "name": "table6"
1976 },
1977 {
1978 "namespace": ["ns1"],
1979 "name": "table7"
1980 }
1981 ],
1982 "next-page-token": "page5"
1983 }"#,
1984 )
1985 .create_async()
1986 .await;
1987
1988 let list_tables_mock_page5 = server
1990 .mock("GET", "/v1/namespaces/ns1/tables?pageToken=page5")
1991 .with_status(200)
1992 .with_body(
1993 r#"{
1994 "identifiers": [
1995 {
1996 "namespace": ["ns1"],
1997 "name": "table8"
1998 }
1999 ]
2000 }"#,
2001 )
2002 .create_async()
2003 .await;
2004
2005 let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
2006
2007 let tables = catalog
2008 .list_tables(&NamespaceIdent::new("ns1".to_string()))
2009 .await
2010 .unwrap();
2011
2012 let expected_tables = vec![
2013 TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table1".to_string()),
2014 TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table2".to_string()),
2015 TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table3".to_string()),
2016 TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table4".to_string()),
2017 TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table5".to_string()),
2018 TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table6".to_string()),
2019 TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table7".to_string()),
2020 TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table8".to_string()),
2021 ];
2022
2023 assert_eq!(tables, expected_tables);
2024
2025 config_mock.assert_async().await;
2027 list_tables_mock_page1.assert_async().await;
2028 list_tables_mock_page2.assert_async().await;
2029 list_tables_mock_page3.assert_async().await;
2030 list_tables_mock_page4.assert_async().await;
2031 list_tables_mock_page5.assert_async().await;
2032 }
2033
2034 #[tokio::test]
2035 async fn test_drop_tables() {
2036 let mut server = Server::new_async().await;
2037
2038 let config_mock = create_config_mock(&mut server).await;
2039
2040 let delete_table_mock = server
2041 .mock("DELETE", "/v1/namespaces/ns1/tables/table1")
2042 .with_status(204)
2043 .create_async()
2044 .await;
2045
2046 let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
2047
2048 catalog
2049 .drop_table(&TableIdent::new(
2050 NamespaceIdent::new("ns1".to_string()),
2051 "table1".to_string(),
2052 ))
2053 .await
2054 .unwrap();
2055
2056 config_mock.assert_async().await;
2057 delete_table_mock.assert_async().await;
2058 }
2059
2060 #[tokio::test]
2061 async fn test_check_table_exists() {
2062 let mut server = Server::new_async().await;
2063
2064 let config_mock = create_config_mock(&mut server).await;
2065
2066 let check_table_exists_mock = server
2067 .mock("HEAD", "/v1/namespaces/ns1/tables/table1")
2068 .with_status(204)
2069 .create_async()
2070 .await;
2071
2072 let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
2073
2074 assert!(
2075 catalog
2076 .table_exists(&TableIdent::new(
2077 NamespaceIdent::new("ns1".to_string()),
2078 "table1".to_string(),
2079 ))
2080 .await
2081 .unwrap()
2082 );
2083
2084 config_mock.assert_async().await;
2085 check_table_exists_mock.assert_async().await;
2086 }
2087
2088 #[tokio::test]
2089 async fn test_rename_table() {
2090 let mut server = Server::new_async().await;
2091
2092 let config_mock = create_config_mock(&mut server).await;
2093
2094 let rename_table_mock = server
2095 .mock("POST", "/v1/tables/rename")
2096 .with_status(204)
2097 .create_async()
2098 .await;
2099
2100 let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
2101
2102 catalog
2103 .rename_table(
2104 &TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table1".to_string()),
2105 &TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table2".to_string()),
2106 )
2107 .await
2108 .unwrap();
2109
2110 config_mock.assert_async().await;
2111 rename_table_mock.assert_async().await;
2112 }
2113
2114 #[tokio::test]
2115 async fn test_load_table() {
2116 let mut server = Server::new_async().await;
2117
2118 let config_mock = create_config_mock(&mut server).await;
2119
2120 let rename_table_mock = server
2121 .mock("GET", "/v1/namespaces/ns1/tables/test1")
2122 .with_status(200)
2123 .with_body_from_file(format!(
2124 "{}/testdata/{}",
2125 env!("CARGO_MANIFEST_DIR"),
2126 "load_table_response.json"
2127 ))
2128 .create_async()
2129 .await;
2130
2131 let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
2132
2133 let table = catalog
2134 .load_table(&TableIdent::new(
2135 NamespaceIdent::new("ns1".to_string()),
2136 "test1".to_string(),
2137 ))
2138 .await
2139 .unwrap();
2140
2141 assert_eq!(
2142 &TableIdent::from_strs(vec!["ns1", "test1"]).unwrap(),
2143 table.identifier()
2144 );
2145 assert_eq!(
2146 "s3://warehouse/database/table/metadata/00001-5f2f8166-244c-4eae-ac36-384ecdec81fc.gz.metadata.json",
2147 table.metadata_location().unwrap()
2148 );
2149 assert_eq!(FormatVersion::V1, table.metadata().format_version());
2150 assert_eq!("s3://warehouse/database/table", table.metadata().location());
2151 assert_eq!(
2152 uuid!("b55d9dda-6561-423a-8bfc-787980ce421f"),
2153 table.metadata().uuid()
2154 );
2155 assert_eq!(
2156 Utc.timestamp_millis_opt(1646787054459).unwrap(),
2157 table.metadata().last_updated_timestamp().unwrap()
2158 );
2159 assert_eq!(
2160 vec![&Arc::new(
2161 Schema::builder()
2162 .with_fields(vec![
2163 NestedField::optional(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
2164 NestedField::optional(2, "data", Type::Primitive(PrimitiveType::String))
2165 .into(),
2166 ])
2167 .build()
2168 .unwrap()
2169 )],
2170 table.metadata().schemas_iter().collect::<Vec<_>>()
2171 );
2172 assert_eq!(
2173 &HashMap::from([
2174 ("owner".to_string(), "bryan".to_string()),
2175 (
2176 "write.metadata.compression-codec".to_string(),
2177 "gzip".to_string()
2178 )
2179 ]),
2180 table.metadata().properties()
2181 );
2182 assert_eq!(vec![&Arc::new(Snapshot::builder()
2183 .with_snapshot_id(3497810964824022504)
2184 .with_timestamp_ms(1646787054459)
2185 .with_manifest_list("s3://warehouse/database/table/metadata/snap-3497810964824022504-1-c4f68204-666b-4e50-a9df-b10c34bf6b82.avro")
2186 .with_sequence_number(0)
2187 .with_schema_id(0)
2188 .with_summary(Summary {
2189 operation: Operation::Append,
2190 additional_properties: HashMap::from_iter([
2191 ("spark.app.id", "local-1646787004168"),
2192 ("added-data-files", "1"),
2193 ("added-records", "1"),
2194 ("added-files-size", "697"),
2195 ("changed-partition-count", "1"),
2196 ("total-records", "1"),
2197 ("total-files-size", "697"),
2198 ("total-data-files", "1"),
2199 ("total-delete-files", "0"),
2200 ("total-position-deletes", "0"),
2201 ("total-equality-deletes", "0")
2202 ].iter().map(|p| (p.0.to_string(), p.1.to_string()))),
2203 }).build()
2204 )], table.metadata().snapshots().collect::<Vec<_>>());
2205 assert_eq!(
2206 &[SnapshotLog {
2207 timestamp_ms: 1646787054459,
2208 snapshot_id: 3497810964824022504,
2209 }],
2210 table.metadata().history()
2211 );
2212 assert_eq!(
2213 vec![&Arc::new(SortOrder {
2214 order_id: 0,
2215 fields: vec![],
2216 })],
2217 table.metadata().sort_orders_iter().collect::<Vec<_>>()
2218 );
2219
2220 config_mock.assert_async().await;
2221 rename_table_mock.assert_async().await;
2222 }
2223
2224 #[tokio::test]
2225 async fn test_load_table_404() {
2226 let mut server = Server::new_async().await;
2227
2228 let config_mock = create_config_mock(&mut server).await;
2229
2230 let rename_table_mock = server
2231 .mock("GET", "/v1/namespaces/ns1/tables/test1")
2232 .with_status(404)
2233 .with_body(r#"
2234{
2235 "error": {
2236 "message": "Table does not exist: ns1.test1 in warehouse 8bcb0838-50fc-472d-9ddb-8feb89ef5f1e",
2237 "type": "NoSuchNamespaceErrorException",
2238 "code": 404
2239 }
2240}
2241 "#)
2242 .create_async()
2243 .await;
2244
2245 let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
2246
2247 let table = catalog
2248 .load_table(&TableIdent::new(
2249 NamespaceIdent::new("ns1".to_string()),
2250 "test1".to_string(),
2251 ))
2252 .await;
2253
2254 assert!(table.is_err());
2255 assert!(table.err().unwrap().message().contains("does not exist"));
2256
2257 config_mock.assert_async().await;
2258 rename_table_mock.assert_async().await;
2259 }
2260
2261 #[tokio::test]
2262 async fn test_create_table() {
2263 let mut server = Server::new_async().await;
2264
2265 let config_mock = create_config_mock(&mut server).await;
2266
2267 let create_table_mock = server
2268 .mock("POST", "/v1/namespaces/ns1/tables")
2269 .with_status(200)
2270 .with_body_from_file(format!(
2271 "{}/testdata/{}",
2272 env!("CARGO_MANIFEST_DIR"),
2273 "create_table_response.json"
2274 ))
2275 .create_async()
2276 .await;
2277
2278 let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
2279
2280 let table_creation = TableCreation::builder()
2281 .name("test1".to_string())
2282 .schema(
2283 Schema::builder()
2284 .with_fields(vec![
2285 NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String))
2286 .into(),
2287 NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(),
2288 NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean))
2289 .into(),
2290 ])
2291 .with_schema_id(1)
2292 .with_identifier_field_ids(vec![2])
2293 .build()
2294 .unwrap(),
2295 )
2296 .properties(HashMap::from([("owner".to_string(), "testx".to_string())]))
2297 .partition_spec(
2298 UnboundPartitionSpec::builder()
2299 .add_partition_fields(vec![
2300 UnboundPartitionField::builder()
2301 .source_id(1)
2302 .transform(Transform::Truncate(3))
2303 .name("id".to_string())
2304 .build(),
2305 ])
2306 .unwrap()
2307 .build(),
2308 )
2309 .sort_order(
2310 SortOrder::builder()
2311 .with_sort_field(
2312 SortField::builder()
2313 .source_id(2)
2314 .transform(Transform::Identity)
2315 .direction(SortDirection::Ascending)
2316 .null_order(NullOrder::First)
2317 .build(),
2318 )
2319 .build_unbound()
2320 .unwrap(),
2321 )
2322 .build();
2323
2324 let table = catalog
2325 .create_table(&NamespaceIdent::from_strs(["ns1"]).unwrap(), table_creation)
2326 .await
2327 .unwrap();
2328
2329 assert_eq!(
2330 &TableIdent::from_strs(vec!["ns1", "test1"]).unwrap(),
2331 table.identifier()
2332 );
2333 assert_eq!(
2334 "s3://warehouse/database/table/metadata.json",
2335 table.metadata_location().unwrap()
2336 );
2337 assert_eq!(FormatVersion::V1, table.metadata().format_version());
2338 assert_eq!("s3://warehouse/database/table", table.metadata().location());
2339 assert_eq!(
2340 uuid!("bf289591-dcc0-4234-ad4f-5c3eed811a29"),
2341 table.metadata().uuid()
2342 );
2343 assert_eq!(
2344 1657810967051,
2345 table
2346 .metadata()
2347 .last_updated_timestamp()
2348 .unwrap()
2349 .timestamp_millis()
2350 );
2351 assert_eq!(
2352 vec![&Arc::new(
2353 Schema::builder()
2354 .with_fields(vec![
2355 NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String))
2356 .into(),
2357 NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(),
2358 NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean))
2359 .into(),
2360 ])
2361 .with_schema_id(0)
2362 .with_identifier_field_ids(vec![2])
2363 .build()
2364 .unwrap()
2365 )],
2366 table.metadata().schemas_iter().collect::<Vec<_>>()
2367 );
2368 assert_eq!(
2369 &HashMap::from([
2370 (
2371 "write.delete.parquet.compression-codec".to_string(),
2372 "zstd".to_string()
2373 ),
2374 (
2375 "write.metadata.compression-codec".to_string(),
2376 "gzip".to_string()
2377 ),
2378 (
2379 "write.summary.partition-limit".to_string(),
2380 "100".to_string()
2381 ),
2382 (
2383 "write.parquet.compression-codec".to_string(),
2384 "zstd".to_string()
2385 ),
2386 ]),
2387 table.metadata().properties()
2388 );
2389 assert!(table.metadata().current_snapshot().is_none());
2390 assert!(table.metadata().history().is_empty());
2391 assert_eq!(
2392 vec![&Arc::new(SortOrder {
2393 order_id: 0,
2394 fields: vec![],
2395 })],
2396 table.metadata().sort_orders_iter().collect::<Vec<_>>()
2397 );
2398
2399 config_mock.assert_async().await;
2400 create_table_mock.assert_async().await;
2401 }
2402
2403 #[tokio::test]
2404 async fn test_create_table_409() {
2405 let mut server = Server::new_async().await;
2406
2407 let config_mock = create_config_mock(&mut server).await;
2408
2409 let create_table_mock = server
2410 .mock("POST", "/v1/namespaces/ns1/tables")
2411 .with_status(409)
2412 .with_body(r#"
2413{
2414 "error": {
2415 "message": "Table already exists: ns1.test1 in warehouse 8bcb0838-50fc-472d-9ddb-8feb89ef5f1e",
2416 "type": "AlreadyExistsException",
2417 "code": 409
2418 }
2419}
2420 "#)
2421 .create_async()
2422 .await;
2423
2424 let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
2425
2426 let table_creation = TableCreation::builder()
2427 .name("test1".to_string())
2428 .schema(
2429 Schema::builder()
2430 .with_fields(vec![
2431 NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String))
2432 .into(),
2433 NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(),
2434 NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean))
2435 .into(),
2436 ])
2437 .with_schema_id(1)
2438 .with_identifier_field_ids(vec![2])
2439 .build()
2440 .unwrap(),
2441 )
2442 .properties(HashMap::from([("owner".to_string(), "testx".to_string())]))
2443 .build();
2444
2445 let table_result = catalog
2446 .create_table(&NamespaceIdent::from_strs(["ns1"]).unwrap(), table_creation)
2447 .await;
2448
2449 assert!(table_result.is_err());
2450 assert!(
2451 table_result
2452 .err()
2453 .unwrap()
2454 .message()
2455 .contains("already exists")
2456 );
2457
2458 config_mock.assert_async().await;
2459 create_table_mock.assert_async().await;
2460 }
2461
2462 #[tokio::test]
2463 async fn test_update_table() {
2464 let mut server = Server::new_async().await;
2465
2466 let config_mock = create_config_mock(&mut server).await;
2467
2468 let load_table_mock = server
2469 .mock("GET", "/v1/namespaces/ns1/tables/test1")
2470 .with_status(200)
2471 .with_body_from_file(format!(
2472 "{}/testdata/{}",
2473 env!("CARGO_MANIFEST_DIR"),
2474 "load_table_response.json"
2475 ))
2476 .create_async()
2477 .await;
2478
2479 let update_table_mock = server
2480 .mock("POST", "/v1/namespaces/ns1/tables/test1")
2481 .with_status(200)
2482 .with_body_from_file(format!(
2483 "{}/testdata/{}",
2484 env!("CARGO_MANIFEST_DIR"),
2485 "update_table_response.json"
2486 ))
2487 .create_async()
2488 .await;
2489
2490 let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
2491
2492 let table1 = {
2493 let file = File::open(format!(
2494 "{}/testdata/{}",
2495 env!("CARGO_MANIFEST_DIR"),
2496 "create_table_response.json"
2497 ))
2498 .unwrap();
2499 let reader = BufReader::new(file);
2500 let resp = serde_json::from_reader::<_, LoadTableResult>(reader).unwrap();
2501
2502 Table::builder()
2503 .metadata(resp.metadata)
2504 .metadata_location(resp.metadata_location.unwrap())
2505 .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap())
2506 .file_io(FileIO::from_path("/tmp").unwrap().build().unwrap())
2507 .build()
2508 .unwrap()
2509 };
2510
2511 let tx = Transaction::new(&table1);
2512 let table = tx
2513 .upgrade_table_version()
2514 .set_format_version(FormatVersion::V2)
2515 .apply(tx)
2516 .unwrap()
2517 .commit(&catalog)
2518 .await
2519 .unwrap();
2520
2521 assert_eq!(
2522 &TableIdent::from_strs(vec!["ns1", "test1"]).unwrap(),
2523 table.identifier()
2524 );
2525 assert_eq!(
2526 "s3://warehouse/database/table/metadata.json",
2527 table.metadata_location().unwrap()
2528 );
2529 assert_eq!(FormatVersion::V2, table.metadata().format_version());
2530 assert_eq!("s3://warehouse/database/table", table.metadata().location());
2531 assert_eq!(
2532 uuid!("bf289591-dcc0-4234-ad4f-5c3eed811a29"),
2533 table.metadata().uuid()
2534 );
2535 assert_eq!(
2536 1657810967051,
2537 table
2538 .metadata()
2539 .last_updated_timestamp()
2540 .unwrap()
2541 .timestamp_millis()
2542 );
2543 assert_eq!(
2544 vec![&Arc::new(
2545 Schema::builder()
2546 .with_fields(vec![
2547 NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String))
2548 .into(),
2549 NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(),
2550 NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean))
2551 .into(),
2552 ])
2553 .with_schema_id(0)
2554 .with_identifier_field_ids(vec![2])
2555 .build()
2556 .unwrap()
2557 )],
2558 table.metadata().schemas_iter().collect::<Vec<_>>()
2559 );
2560 assert_eq!(
2561 &HashMap::from([
2562 (
2563 "write.delete.parquet.compression-codec".to_string(),
2564 "zstd".to_string()
2565 ),
2566 (
2567 "write.metadata.compression-codec".to_string(),
2568 "gzip".to_string()
2569 ),
2570 (
2571 "write.summary.partition-limit".to_string(),
2572 "100".to_string()
2573 ),
2574 (
2575 "write.parquet.compression-codec".to_string(),
2576 "zstd".to_string()
2577 ),
2578 ]),
2579 table.metadata().properties()
2580 );
2581 assert!(table.metadata().current_snapshot().is_none());
2582 assert!(table.metadata().history().is_empty());
2583 assert_eq!(
2584 vec![&Arc::new(SortOrder {
2585 order_id: 0,
2586 fields: vec![],
2587 })],
2588 table.metadata().sort_orders_iter().collect::<Vec<_>>()
2589 );
2590
2591 config_mock.assert_async().await;
2592 update_table_mock.assert_async().await;
2593 load_table_mock.assert_async().await
2594 }
2595
2596 #[tokio::test]
2597 async fn test_update_table_404() {
2598 let mut server = Server::new_async().await;
2599
2600 let config_mock = create_config_mock(&mut server).await;
2601
2602 let load_table_mock = server
2603 .mock("GET", "/v1/namespaces/ns1/tables/test1")
2604 .with_status(200)
2605 .with_body_from_file(format!(
2606 "{}/testdata/{}",
2607 env!("CARGO_MANIFEST_DIR"),
2608 "load_table_response.json"
2609 ))
2610 .create_async()
2611 .await;
2612
2613 let update_table_mock = server
2614 .mock("POST", "/v1/namespaces/ns1/tables/test1")
2615 .with_status(404)
2616 .with_body(
2617 r#"
2618{
2619 "error": {
2620 "message": "The given table does not exist",
2621 "type": "NoSuchTableException",
2622 "code": 404
2623 }
2624}
2625 "#,
2626 )
2627 .create_async()
2628 .await;
2629
2630 let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
2631
2632 let table1 = {
2633 let file = File::open(format!(
2634 "{}/testdata/{}",
2635 env!("CARGO_MANIFEST_DIR"),
2636 "create_table_response.json"
2637 ))
2638 .unwrap();
2639 let reader = BufReader::new(file);
2640 let resp = serde_json::from_reader::<_, LoadTableResult>(reader).unwrap();
2641
2642 Table::builder()
2643 .metadata(resp.metadata)
2644 .metadata_location(resp.metadata_location.unwrap())
2645 .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap())
2646 .file_io(FileIO::from_path("/tmp").unwrap().build().unwrap())
2647 .build()
2648 .unwrap()
2649 };
2650
2651 let tx = Transaction::new(&table1);
2652 let table_result = tx
2653 .upgrade_table_version()
2654 .set_format_version(FormatVersion::V2)
2655 .apply(tx)
2656 .unwrap()
2657 .commit(&catalog)
2658 .await;
2659
2660 assert!(table_result.is_err());
2661 assert!(
2662 table_result
2663 .err()
2664 .unwrap()
2665 .message()
2666 .contains("does not exist")
2667 );
2668
2669 config_mock.assert_async().await;
2670 update_table_mock.assert_async().await;
2671 load_table_mock.assert_async().await;
2672 }
2673
2674 #[tokio::test]
2675 async fn test_register_table() {
2676 let mut server = Server::new_async().await;
2677
2678 let config_mock = create_config_mock(&mut server).await;
2679
2680 let register_table_mock = server
2681 .mock("POST", "/v1/namespaces/ns1/register")
2682 .with_status(200)
2683 .with_body_from_file(format!(
2684 "{}/testdata/{}",
2685 env!("CARGO_MANIFEST_DIR"),
2686 "load_table_response.json"
2687 ))
2688 .create_async()
2689 .await;
2690
2691 let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
2692 let table_ident =
2693 TableIdent::new(NamespaceIdent::new("ns1".to_string()), "test1".to_string());
2694 let metadata_location = String::from(
2695 "s3://warehouse/database/table/metadata/00001-5f2f8166-244c-4eae-ac36-384ecdec81fc.gz.metadata.json",
2696 );
2697
2698 let table = catalog
2699 .register_table(&table_ident, metadata_location)
2700 .await
2701 .unwrap();
2702
2703 assert_eq!(
2704 &TableIdent::from_strs(vec!["ns1", "test1"]).unwrap(),
2705 table.identifier()
2706 );
2707 assert_eq!(
2708 "s3://warehouse/database/table/metadata/00001-5f2f8166-244c-4eae-ac36-384ecdec81fc.gz.metadata.json",
2709 table.metadata_location().unwrap()
2710 );
2711
2712 config_mock.assert_async().await;
2713 register_table_mock.assert_async().await;
2714 }
2715
2716 #[tokio::test]
2717 async fn test_register_table_404() {
2718 let mut server = Server::new_async().await;
2719
2720 let config_mock = create_config_mock(&mut server).await;
2721
2722 let register_table_mock = server
2723 .mock("POST", "/v1/namespaces/ns1/register")
2724 .with_status(404)
2725 .with_body(
2726 r#"
2727{
2728 "error": {
2729 "message": "The namespace specified does not exist",
2730 "type": "NoSuchNamespaceErrorException",
2731 "code": 404
2732 }
2733}
2734 "#,
2735 )
2736 .create_async()
2737 .await;
2738
2739 let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
2740
2741 let table_ident =
2742 TableIdent::new(NamespaceIdent::new("ns1".to_string()), "test1".to_string());
2743 let metadata_location = String::from(
2744 "s3://warehouse/database/table/metadata/00001-5f2f8166-244c-4eae-ac36-384ecdec81fc.gz.metadata.json",
2745 );
2746 let table = catalog
2747 .register_table(&table_ident, metadata_location)
2748 .await;
2749
2750 assert!(table.is_err());
2751 assert!(table.err().unwrap().message().contains("does not exist"));
2752
2753 config_mock.assert_async().await;
2754 register_table_mock.assert_async().await;
2755 }
2756
2757 #[tokio::test]
2758 async fn test_create_rest_catalog() {
2759 let builder = RestCatalogBuilder::default().with_client(Client::new());
2760
2761 let catalog = builder
2762 .load(
2763 "test",
2764 HashMap::from([
2765 (
2766 REST_CATALOG_PROP_URI.to_string(),
2767 "http://localhost:8080".to_string(),
2768 ),
2769 ("a".to_string(), "b".to_string()),
2770 ]),
2771 )
2772 .await;
2773
2774 assert!(catalog.is_ok());
2775
2776 let catalog_config = catalog.unwrap().user_config;
2777 assert_eq!(catalog_config.name.as_deref(), Some("test"));
2778 assert_eq!(catalog_config.uri, "http://localhost:8080");
2779 assert_eq!(catalog_config.warehouse, None);
2780 assert!(catalog_config.client.is_some());
2781
2782 assert_eq!(catalog_config.props.get("a"), Some(&"b".to_string()));
2783 assert!(!catalog_config.props.contains_key(REST_CATALOG_PROP_URI));
2784 }
2785
2786 #[tokio::test]
2787 async fn test_create_rest_catalog_no_uri() {
2788 let builder = RestCatalogBuilder::default();
2789
2790 let catalog = builder
2791 .load(
2792 "test",
2793 HashMap::from([(
2794 REST_CATALOG_PROP_WAREHOUSE.to_string(),
2795 "s3://warehouse".to_string(),
2796 )]),
2797 )
2798 .await;
2799
2800 assert!(catalog.is_err());
2801 if let Err(err) = catalog {
2802 assert_eq!(err.kind(), ErrorKind::DataInvalid);
2803 assert_eq!(err.message(), "Catalog uri is required");
2804 }
2805 }
2806}