1use std::any::Any;
21use std::collections::HashMap;
22use std::future::Future;
23use std::str::FromStr;
24
25use async_trait::async_trait;
26use iceberg::io::{self, FileIO};
27use iceberg::table::Table;
28use iceberg::{
29 Catalog, CatalogBuilder, Error, ErrorKind, Namespace, NamespaceIdent, Result, TableCommit,
30 TableCreation, TableIdent,
31};
32use itertools::Itertools;
33use reqwest::header::{
34 HeaderMap, HeaderName, HeaderValue, {self},
35};
36use reqwest::{Client, Method, StatusCode, Url};
37use tokio::sync::OnceCell;
38use typed_builder::TypedBuilder;
39
40use crate::client::{
41 HttpClient, deserialize_catalog_response, deserialize_unexpected_catalog_error,
42};
43use crate::types::{
44 CatalogConfig, CommitTableRequest, CommitTableResponse, CreateNamespaceRequest,
45 CreateTableRequest, ListNamespaceResponse, ListTablesResponse, LoadTableResult,
46 NamespaceResponse, RegisterTableRequest, RenameTableRequest,
47};
48
49pub const REST_CATALOG_PROP_URI: &str = "uri";
51pub const REST_CATALOG_PROP_WAREHOUSE: &str = "warehouse";
53
54const ICEBERG_REST_SPEC_VERSION: &str = "0.14.1";
55const CARGO_PKG_VERSION: &str = env!("CARGO_PKG_VERSION");
56const PATH_V1: &str = "v1";
57
58#[derive(Debug)]
60pub struct RestCatalogBuilder(RestCatalogConfig);
61
62impl Default for RestCatalogBuilder {
63 fn default() -> Self {
64 Self(RestCatalogConfig {
65 name: None,
66 uri: "".to_string(),
67 warehouse: None,
68 props: HashMap::new(),
69 client: None,
70 })
71 }
72}
73
74impl CatalogBuilder for RestCatalogBuilder {
75 type C = RestCatalog;
76
77 fn load(
78 mut self,
79 name: impl Into<String>,
80 props: HashMap<String, String>,
81 ) -> impl Future<Output = Result<Self::C>> + Send {
82 self.0.name = Some(name.into());
83
84 if props.contains_key(REST_CATALOG_PROP_URI) {
85 self.0.uri = props
86 .get(REST_CATALOG_PROP_URI)
87 .cloned()
88 .unwrap_or_default();
89 }
90
91 if props.contains_key(REST_CATALOG_PROP_WAREHOUSE) {
92 self.0.warehouse = props.get(REST_CATALOG_PROP_WAREHOUSE).cloned()
93 }
94
95 self.0.props = props
97 .into_iter()
98 .filter(|(k, _)| k != REST_CATALOG_PROP_URI && k != REST_CATALOG_PROP_WAREHOUSE)
99 .collect();
100
101 let result = {
102 if self.0.name.is_none() {
103 Err(Error::new(
104 ErrorKind::DataInvalid,
105 "Catalog name is required",
106 ))
107 } else if self.0.uri.is_empty() {
108 Err(Error::new(
109 ErrorKind::DataInvalid,
110 "Catalog uri is required",
111 ))
112 } else {
113 Ok(RestCatalog::new(self.0))
114 }
115 };
116
117 std::future::ready(result)
118 }
119}
120
121impl RestCatalogBuilder {
122 pub fn with_client(mut self, client: Client) -> Self {
124 self.0.client = Some(client);
125 self
126 }
127}
128
129#[derive(Clone, Debug, TypedBuilder)]
131pub(crate) struct RestCatalogConfig {
132 #[builder(default, setter(strip_option))]
133 name: Option<String>,
134
135 uri: String,
136
137 #[builder(default, setter(strip_option(fallback = warehouse_opt)))]
138 warehouse: Option<String>,
139
140 #[builder(default)]
141 props: HashMap<String, String>,
142
143 #[builder(default)]
144 client: Option<Client>,
145}
146
147impl RestCatalogConfig {
148 fn url_prefixed(&self, parts: &[&str]) -> String {
149 [&self.uri, PATH_V1]
150 .into_iter()
151 .chain(self.props.get("prefix").map(|s| &**s))
152 .chain(parts.iter().cloned())
153 .join("/")
154 }
155
156 fn config_endpoint(&self) -> String {
157 [&self.uri, PATH_V1, "config"].join("/")
158 }
159
160 pub(crate) fn get_token_endpoint(&self) -> String {
161 if let Some(oauth2_uri) = self.props.get("oauth2-server-uri") {
162 oauth2_uri.to_string()
163 } else {
164 [&self.uri, PATH_V1, "oauth", "tokens"].join("/")
165 }
166 }
167
168 fn namespaces_endpoint(&self) -> String {
169 self.url_prefixed(&["namespaces"])
170 }
171
172 fn namespace_endpoint(&self, ns: &NamespaceIdent) -> String {
173 self.url_prefixed(&["namespaces", &ns.to_url_string()])
174 }
175
176 fn tables_endpoint(&self, ns: &NamespaceIdent) -> String {
177 self.url_prefixed(&["namespaces", &ns.to_url_string(), "tables"])
178 }
179
180 fn rename_table_endpoint(&self) -> String {
181 self.url_prefixed(&["tables", "rename"])
182 }
183
184 fn register_table_endpoint(&self, ns: &NamespaceIdent) -> String {
185 self.url_prefixed(&["namespaces", &ns.to_url_string(), "register"])
186 }
187
188 fn table_endpoint(&self, table: &TableIdent) -> String {
189 self.url_prefixed(&[
190 "namespaces",
191 &table.namespace.to_url_string(),
192 "tables",
193 &table.name,
194 ])
195 }
196
197 pub(crate) fn client(&self) -> Option<Client> {
199 self.client.clone()
200 }
201
202 pub(crate) fn token(&self) -> Option<String> {
206 self.props.get("token").cloned()
207 }
208
209 pub(crate) fn credential(&self) -> Option<(Option<String>, String)> {
218 let cred = self.props.get("credential")?;
219
220 match cred.split_once(':') {
221 Some((client_id, client_secret)) => {
222 Some((Some(client_id.to_string()), client_secret.to_string()))
223 }
224 None => Some((None, cred.to_string())),
225 }
226 }
227
228 pub(crate) fn extra_headers(&self) -> Result<HeaderMap> {
235 let mut headers = HeaderMap::from_iter([
236 (
237 header::CONTENT_TYPE,
238 HeaderValue::from_static("application/json"),
239 ),
240 (
241 HeaderName::from_static("x-client-version"),
242 HeaderValue::from_static(ICEBERG_REST_SPEC_VERSION),
243 ),
244 (
245 header::USER_AGENT,
246 HeaderValue::from_str(&format!("iceberg-rs/{CARGO_PKG_VERSION}")).unwrap(),
247 ),
248 ]);
249
250 for (key, value) in self
251 .props
252 .iter()
253 .filter_map(|(k, v)| k.strip_prefix("header.").map(|k| (k, v)))
254 {
255 headers.insert(
256 HeaderName::from_str(key).map_err(|e| {
257 Error::new(
258 ErrorKind::DataInvalid,
259 format!("Invalid header name: {key}"),
260 )
261 .with_source(e)
262 })?,
263 HeaderValue::from_str(value).map_err(|e| {
264 Error::new(
265 ErrorKind::DataInvalid,
266 format!("Invalid header value: {value}"),
267 )
268 .with_source(e)
269 })?,
270 );
271 }
272
273 Ok(headers)
274 }
275
276 pub(crate) fn extra_oauth_params(&self) -> HashMap<String, String> {
278 let mut params = HashMap::new();
279
280 if let Some(scope) = self.props.get("scope") {
281 params.insert("scope".to_string(), scope.to_string());
282 } else {
283 params.insert("scope".to_string(), "catalog".to_string());
284 }
285
286 let optional_params = ["audience", "resource"];
287 for param_name in optional_params {
288 if let Some(value) = self.props.get(param_name) {
289 params.insert(param_name.to_string(), value.to_string());
290 }
291 }
292
293 params
294 }
295
296 pub(crate) fn merge_with_config(mut self, mut config: CatalogConfig) -> Self {
298 if let Some(uri) = config.overrides.remove("uri") {
299 self.uri = uri;
300 }
301
302 let mut props = config.defaults;
303 props.extend(self.props);
304 props.extend(config.overrides);
305
306 self.props = props;
307 self
308 }
309}
310
311#[derive(Debug)]
312struct RestContext {
313 client: HttpClient,
314 config: RestCatalogConfig,
318}
319
320#[derive(Debug)]
322pub struct RestCatalog {
323 user_config: RestCatalogConfig,
327 ctx: OnceCell<RestContext>,
328 file_io_extensions: io::Extensions,
330}
331
332impl RestCatalog {
333 fn new(config: RestCatalogConfig) -> Self {
335 Self {
336 user_config: config,
337 ctx: OnceCell::new(),
338 file_io_extensions: io::Extensions::default(),
339 }
340 }
341
342 pub fn with_file_io_extension<T: Any + Send + Sync>(mut self, ext: T) -> Self {
344 self.file_io_extensions.add(ext);
345 self
346 }
347
348 async fn context(&self) -> Result<&RestContext> {
350 self.ctx
351 .get_or_try_init(|| async {
352 let client = HttpClient::new(&self.user_config)?;
353 let catalog_config = RestCatalog::load_config(&client, &self.user_config).await?;
354 let config = self.user_config.clone().merge_with_config(catalog_config);
355 let client = client.update_with(&config)?;
356
357 Ok(RestContext { config, client })
358 })
359 .await
360 }
361
362 async fn load_config(
366 client: &HttpClient,
367 user_config: &RestCatalogConfig,
368 ) -> Result<CatalogConfig> {
369 let mut request_builder = client.request(Method::GET, user_config.config_endpoint());
370
371 if let Some(warehouse_location) = &user_config.warehouse {
372 request_builder = request_builder.query(&[("warehouse", warehouse_location)]);
373 }
374
375 let request = request_builder.build()?;
376
377 let http_response = client.query_catalog(request).await?;
378
379 match http_response.status() {
380 StatusCode::OK => deserialize_catalog_response(http_response).await,
381 _ => Err(deserialize_unexpected_catalog_error(http_response).await),
382 }
383 }
384
385 async fn load_file_io(
386 &self,
387 metadata_location: Option<&str>,
388 extra_config: Option<HashMap<String, String>>,
389 ) -> Result<FileIO> {
390 let mut props = self.context().await?.config.props.clone();
391 if let Some(config) = extra_config {
392 props.extend(config);
393 }
394
395 let warehouse_path = match self.context().await?.config.warehouse.as_deref() {
398 Some(url) if Url::parse(url).is_ok() => Some(url),
399 Some(_) => None,
400 None => None,
401 };
402
403 let file_io = match metadata_location.or(warehouse_path) {
404 Some(url) => FileIO::from_path(url)?
405 .with_props(props)
406 .with_extensions(self.file_io_extensions.clone())
407 .build()?,
408 None => {
409 return Err(Error::new(
410 ErrorKind::Unexpected,
411 "Unable to load file io, neither warehouse nor metadata location is set!",
412 ))?;
413 }
414 };
415
416 Ok(file_io)
417 }
418
419 pub async fn invalidate_token(&self) -> Result<()> {
422 self.context().await?.client.invalidate_token().await
423 }
424
425 pub async fn regenerate_token(&self) -> Result<()> {
432 self.context().await?.client.regenerate_token().await
433 }
434}
435
436#[async_trait]
439impl Catalog for RestCatalog {
440 async fn list_namespaces(
441 &self,
442 parent: Option<&NamespaceIdent>,
443 ) -> Result<Vec<NamespaceIdent>> {
444 let context = self.context().await?;
445 let endpoint = context.config.namespaces_endpoint();
446 let mut namespaces = Vec::new();
447 let mut next_token = None;
448
449 loop {
450 let mut request = context.client.request(Method::GET, endpoint.clone());
451
452 if let Some(ns) = parent {
454 request = request.query(&[("parent", ns.to_url_string())]);
455 }
456
457 if let Some(token) = next_token {
458 request = request.query(&[("pageToken", token)]);
459 }
460
461 let http_response = context.client.query_catalog(request.build()?).await?;
462
463 match http_response.status() {
464 StatusCode::OK => {
465 let response =
466 deserialize_catalog_response::<ListNamespaceResponse>(http_response)
467 .await?;
468
469 namespaces.extend(response.namespaces);
470
471 match response.next_page_token {
472 Some(token) => next_token = Some(token),
473 None => break,
474 }
475 }
476 StatusCode::NOT_FOUND => {
477 return Err(Error::new(
478 ErrorKind::Unexpected,
479 "The parent parameter of the namespace provided does not exist",
480 ));
481 }
482 _ => return Err(deserialize_unexpected_catalog_error(http_response).await),
483 }
484 }
485
486 Ok(namespaces)
487 }
488
489 async fn create_namespace(
490 &self,
491 namespace: &NamespaceIdent,
492 properties: HashMap<String, String>,
493 ) -> Result<Namespace> {
494 let context = self.context().await?;
495
496 let request = context
497 .client
498 .request(Method::POST, context.config.namespaces_endpoint())
499 .json(&CreateNamespaceRequest {
500 namespace: namespace.clone(),
501 properties,
502 })
503 .build()?;
504
505 let http_response = context.client.query_catalog(request).await?;
506
507 match http_response.status() {
508 StatusCode::OK => {
509 let response =
510 deserialize_catalog_response::<NamespaceResponse>(http_response).await?;
511 Ok(Namespace::from(response))
512 }
513 StatusCode::CONFLICT => Err(Error::new(
514 ErrorKind::Unexpected,
515 "Tried to create a namespace that already exists",
516 )),
517 _ => Err(deserialize_unexpected_catalog_error(http_response).await),
518 }
519 }
520
521 async fn get_namespace(&self, namespace: &NamespaceIdent) -> Result<Namespace> {
522 let context = self.context().await?;
523
524 let request = context
525 .client
526 .request(Method::GET, context.config.namespace_endpoint(namespace))
527 .build()?;
528
529 let http_response = context.client.query_catalog(request).await?;
530
531 match http_response.status() {
532 StatusCode::OK => {
533 let response =
534 deserialize_catalog_response::<NamespaceResponse>(http_response).await?;
535 Ok(Namespace::from(response))
536 }
537 StatusCode::NOT_FOUND => Err(Error::new(
538 ErrorKind::Unexpected,
539 "Tried to get a namespace that does not exist",
540 )),
541 _ => Err(deserialize_unexpected_catalog_error(http_response).await),
542 }
543 }
544
545 async fn namespace_exists(&self, ns: &NamespaceIdent) -> Result<bool> {
546 let context = self.context().await?;
547
548 let request = context
549 .client
550 .request(Method::HEAD, context.config.namespace_endpoint(ns))
551 .build()?;
552
553 let http_response = context.client.query_catalog(request).await?;
554
555 match http_response.status() {
556 StatusCode::NO_CONTENT | StatusCode::OK => Ok(true),
557 StatusCode::NOT_FOUND => Ok(false),
558 _ => Err(deserialize_unexpected_catalog_error(http_response).await),
559 }
560 }
561
562 async fn update_namespace(
563 &self,
564 _namespace: &NamespaceIdent,
565 _properties: HashMap<String, String>,
566 ) -> Result<()> {
567 Err(Error::new(
568 ErrorKind::FeatureUnsupported,
569 "Updating namespace not supported yet!",
570 ))
571 }
572
573 async fn drop_namespace(&self, namespace: &NamespaceIdent) -> Result<()> {
574 let context = self.context().await?;
575
576 let request = context
577 .client
578 .request(Method::DELETE, context.config.namespace_endpoint(namespace))
579 .build()?;
580
581 let http_response = context.client.query_catalog(request).await?;
582
583 match http_response.status() {
584 StatusCode::NO_CONTENT | StatusCode::OK => Ok(()),
585 StatusCode::NOT_FOUND => Err(Error::new(
586 ErrorKind::Unexpected,
587 "Tried to drop a namespace that does not exist",
588 )),
589 _ => Err(deserialize_unexpected_catalog_error(http_response).await),
590 }
591 }
592
593 async fn list_tables(&self, namespace: &NamespaceIdent) -> Result<Vec<TableIdent>> {
594 let context = self.context().await?;
595 let endpoint = context.config.tables_endpoint(namespace);
596 let mut identifiers = Vec::new();
597 let mut next_token = None;
598
599 loop {
600 let mut request = context.client.request(Method::GET, endpoint.clone());
601
602 if let Some(token) = next_token {
603 request = request.query(&[("pageToken", token)]);
604 }
605
606 let http_response = context.client.query_catalog(request.build()?).await?;
607
608 match http_response.status() {
609 StatusCode::OK => {
610 let response =
611 deserialize_catalog_response::<ListTablesResponse>(http_response).await?;
612
613 identifiers.extend(response.identifiers);
614
615 match response.next_page_token {
616 Some(token) => next_token = Some(token),
617 None => break,
618 }
619 }
620 StatusCode::NOT_FOUND => {
621 return Err(Error::new(
622 ErrorKind::Unexpected,
623 "Tried to list tables of a namespace that does not exist",
624 ));
625 }
626 _ => return Err(deserialize_unexpected_catalog_error(http_response).await),
627 }
628 }
629
630 Ok(identifiers)
631 }
632
633 async fn create_table(
640 &self,
641 namespace: &NamespaceIdent,
642 creation: TableCreation,
643 ) -> Result<Table> {
644 let context = self.context().await?;
645
646 let table_ident = TableIdent::new(namespace.clone(), creation.name.clone());
647
648 let request = context
649 .client
650 .request(Method::POST, context.config.tables_endpoint(namespace))
651 .json(&CreateTableRequest {
652 name: creation.name,
653 location: creation.location,
654 schema: creation.schema,
655 partition_spec: creation.partition_spec,
656 write_order: creation.sort_order,
657 stage_create: Some(false),
658 properties: creation.properties,
659 })
660 .build()?;
661
662 let http_response = context.client.query_catalog(request).await?;
663
664 let response = match http_response.status() {
665 StatusCode::OK => {
666 deserialize_catalog_response::<LoadTableResult>(http_response).await?
667 }
668 StatusCode::NOT_FOUND => {
669 return Err(Error::new(
670 ErrorKind::Unexpected,
671 "Tried to create a table under a namespace that does not exist",
672 ));
673 }
674 StatusCode::CONFLICT => {
675 return Err(Error::new(
676 ErrorKind::Unexpected,
677 "The table already exists",
678 ));
679 }
680 _ => return Err(deserialize_unexpected_catalog_error(http_response).await),
681 };
682
683 let metadata_location = response.metadata_location.as_ref().ok_or(Error::new(
684 ErrorKind::DataInvalid,
685 "Metadata location missing in `create_table` response!",
686 ))?;
687
688 let config = response
689 .config
690 .into_iter()
691 .chain(self.user_config.props.clone())
692 .collect();
693
694 let file_io = self
695 .load_file_io(Some(metadata_location), Some(config))
696 .await?;
697
698 let table_builder = Table::builder()
699 .identifier(table_ident.clone())
700 .file_io(file_io)
701 .metadata(response.metadata);
702
703 if let Some(metadata_location) = response.metadata_location {
704 table_builder.metadata_location(metadata_location).build()
705 } else {
706 table_builder.build()
707 }
708 }
709
710 async fn load_table(&self, table_ident: &TableIdent) -> Result<Table> {
716 let context = self.context().await?;
717
718 let request = context
719 .client
720 .request(Method::GET, context.config.table_endpoint(table_ident))
721 .build()?;
722
723 let http_response = context.client.query_catalog(request).await?;
724
725 let response = match http_response.status() {
726 StatusCode::OK | StatusCode::NOT_MODIFIED => {
727 deserialize_catalog_response::<LoadTableResult>(http_response).await?
728 }
729 StatusCode::NOT_FOUND => {
730 return Err(Error::new(
731 ErrorKind::Unexpected,
732 "Tried to load a table that does not exist",
733 ));
734 }
735 _ => return Err(deserialize_unexpected_catalog_error(http_response).await),
736 };
737
738 let config = response
739 .config
740 .into_iter()
741 .chain(self.user_config.props.clone())
742 .collect();
743
744 let file_io = self
745 .load_file_io(response.metadata_location.as_deref(), Some(config))
746 .await?;
747
748 let table_builder = Table::builder()
749 .identifier(table_ident.clone())
750 .file_io(file_io)
751 .metadata(response.metadata);
752
753 if let Some(metadata_location) = response.metadata_location {
754 table_builder.metadata_location(metadata_location).build()
755 } else {
756 table_builder.build()
757 }
758 }
759
760 async fn drop_table(&self, table: &TableIdent) -> Result<()> {
762 let context = self.context().await?;
763
764 let request = context
765 .client
766 .request(Method::DELETE, context.config.table_endpoint(table))
767 .build()?;
768
769 let http_response = context.client.query_catalog(request).await?;
770
771 match http_response.status() {
772 StatusCode::NO_CONTENT | StatusCode::OK => Ok(()),
773 StatusCode::NOT_FOUND => Err(Error::new(
774 ErrorKind::Unexpected,
775 "Tried to drop a table that does not exist",
776 )),
777 _ => Err(deserialize_unexpected_catalog_error(http_response).await),
778 }
779 }
780
781 async fn table_exists(&self, table: &TableIdent) -> Result<bool> {
783 let context = self.context().await?;
784
785 let request = context
786 .client
787 .request(Method::HEAD, context.config.table_endpoint(table))
788 .build()?;
789
790 let http_response = context.client.query_catalog(request).await?;
791
792 match http_response.status() {
793 StatusCode::NO_CONTENT | StatusCode::OK => Ok(true),
794 StatusCode::NOT_FOUND => Ok(false),
795 _ => Err(deserialize_unexpected_catalog_error(http_response).await),
796 }
797 }
798
799 async fn rename_table(&self, src: &TableIdent, dest: &TableIdent) -> Result<()> {
801 let context = self.context().await?;
802
803 let request = context
804 .client
805 .request(Method::POST, context.config.rename_table_endpoint())
806 .json(&RenameTableRequest {
807 source: src.clone(),
808 destination: dest.clone(),
809 })
810 .build()?;
811
812 let http_response = context.client.query_catalog(request).await?;
813
814 match http_response.status() {
815 StatusCode::NO_CONTENT | StatusCode::OK => Ok(()),
816 StatusCode::NOT_FOUND => Err(Error::new(
817 ErrorKind::Unexpected,
818 "Tried to rename a table that does not exist (is the namespace correct?)",
819 )),
820 StatusCode::CONFLICT => Err(Error::new(
821 ErrorKind::Unexpected,
822 "Tried to rename a table to a name that already exists",
823 )),
824 _ => Err(deserialize_unexpected_catalog_error(http_response).await),
825 }
826 }
827
828 async fn register_table(
829 &self,
830 table_ident: &TableIdent,
831 metadata_location: String,
832 ) -> Result<Table> {
833 let context = self.context().await?;
834
835 let request = context
836 .client
837 .request(
838 Method::POST,
839 context
840 .config
841 .register_table_endpoint(table_ident.namespace()),
842 )
843 .json(&RegisterTableRequest {
844 name: table_ident.name.clone(),
845 metadata_location: metadata_location.clone(),
846 overwrite: Some(false),
847 })
848 .build()?;
849
850 let http_response = context.client.query_catalog(request).await?;
851
852 let response: LoadTableResult = match http_response.status() {
853 StatusCode::OK => {
854 deserialize_catalog_response::<LoadTableResult>(http_response).await?
855 }
856 StatusCode::NOT_FOUND => {
857 return Err(Error::new(
858 ErrorKind::NamespaceNotFound,
859 "The namespace specified does not exist.",
860 ));
861 }
862 StatusCode::CONFLICT => {
863 return Err(Error::new(
864 ErrorKind::TableAlreadyExists,
865 "The given table already exists.",
866 ));
867 }
868 _ => return Err(deserialize_unexpected_catalog_error(http_response).await),
869 };
870
871 let metadata_location = response.metadata_location.as_ref().ok_or(Error::new(
872 ErrorKind::DataInvalid,
873 "Metadata location missing in `register_table` response!",
874 ))?;
875
876 let file_io = self.load_file_io(Some(metadata_location), None).await?;
877
878 Table::builder()
879 .identifier(table_ident.clone())
880 .file_io(file_io)
881 .metadata(response.metadata)
882 .metadata_location(metadata_location.clone())
883 .build()
884 }
885
886 async fn update_table(&self, mut commit: TableCommit) -> Result<Table> {
887 let context = self.context().await?;
888
889 let request = context
890 .client
891 .request(
892 Method::POST,
893 context.config.table_endpoint(commit.identifier()),
894 )
895 .json(&CommitTableRequest {
896 identifier: Some(commit.identifier().clone()),
897 requirements: commit.take_requirements(),
898 updates: commit.take_updates(),
899 })
900 .build()?;
901
902 let http_response = context.client.query_catalog(request).await?;
903
904 let response: CommitTableResponse = match http_response.status() {
905 StatusCode::OK => deserialize_catalog_response(http_response).await?,
906 StatusCode::NOT_FOUND => {
907 return Err(Error::new(
908 ErrorKind::TableNotFound,
909 "Tried to update a table that does not exist",
910 ));
911 }
912 StatusCode::CONFLICT => {
913 return Err(Error::new(
914 ErrorKind::CatalogCommitConflicts,
915 "CatalogCommitConflicts, one or more requirements failed. The client may retry.",
916 )
917 .with_retryable(true));
918 }
919 StatusCode::INTERNAL_SERVER_ERROR => {
920 return Err(Error::new(
921 ErrorKind::Unexpected,
922 "An unknown server-side problem occurred; the commit state is unknown.",
923 ));
924 }
925 StatusCode::BAD_GATEWAY => {
926 return Err(Error::new(
927 ErrorKind::Unexpected,
928 "A gateway or proxy received an invalid response from the upstream server; the commit state is unknown.",
929 ));
930 }
931 StatusCode::GATEWAY_TIMEOUT => {
932 return Err(Error::new(
933 ErrorKind::Unexpected,
934 "A server-side gateway timeout occurred; the commit state is unknown.",
935 ));
936 }
937 _ => return Err(deserialize_unexpected_catalog_error(http_response).await),
938 };
939
940 let file_io = self
941 .load_file_io(Some(&response.metadata_location), None)
942 .await?;
943
944 Table::builder()
945 .identifier(commit.identifier().clone())
946 .file_io(file_io)
947 .metadata(response.metadata)
948 .metadata_location(response.metadata_location)
949 .build()
950 }
951}
952
953#[cfg(test)]
954mod tests {
955 use std::fs::File;
956 use std::io::BufReader;
957 use std::sync::Arc;
958
959 use chrono::{TimeZone, Utc};
960 use iceberg::spec::{
961 FormatVersion, NestedField, NullOrder, Operation, PrimitiveType, Schema, Snapshot,
962 SnapshotLog, SortDirection, SortField, SortOrder, Summary, Transform, Type,
963 UnboundPartitionField, UnboundPartitionSpec,
964 };
965 use iceberg::transaction::{ApplyTransactionAction, Transaction};
966 use mockito::{Mock, Server, ServerGuard};
967 use serde_json::json;
968 use uuid::uuid;
969
970 use super::*;
971
972 #[tokio::test]
973 async fn test_update_config() {
974 let mut server = Server::new_async().await;
975
976 let config_mock = server
977 .mock("GET", "/v1/config")
978 .with_status(200)
979 .with_body(
980 r#"{
981 "overrides": {
982 "warehouse": "s3://iceberg-catalog"
983 },
984 "defaults": {}
985 }"#,
986 )
987 .create_async()
988 .await;
989
990 let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
991
992 assert_eq!(
993 catalog
994 .context()
995 .await
996 .unwrap()
997 .config
998 .props
999 .get("warehouse"),
1000 Some(&"s3://iceberg-catalog".to_string())
1001 );
1002
1003 config_mock.assert_async().await;
1004 }
1005
1006 async fn create_config_mock(server: &mut ServerGuard) -> Mock {
1007 server
1008 .mock("GET", "/v1/config")
1009 .with_status(200)
1010 .with_body(
1011 r#"{
1012 "overrides": {
1013 "warehouse": "s3://iceberg-catalog"
1014 },
1015 "defaults": {}
1016 }"#,
1017 )
1018 .create_async()
1019 .await
1020 }
1021
1022 async fn create_oauth_mock(server: &mut ServerGuard) -> Mock {
1023 create_oauth_mock_with_path(server, "/v1/oauth/tokens", "ey000000000000", 200).await
1024 }
1025
1026 async fn create_oauth_mock_with_path(
1027 server: &mut ServerGuard,
1028 path: &str,
1029 token: &str,
1030 status: usize,
1031 ) -> Mock {
1032 let body = format!(
1033 r#"{{
1034 "access_token": "{token}",
1035 "token_type": "Bearer",
1036 "issued_token_type": "urn:ietf:params:oauth:token-type:access_token",
1037 "expires_in": 86400
1038 }}"#
1039 );
1040 server
1041 .mock("POST", path)
1042 .with_status(status)
1043 .with_body(body)
1044 .expect(1)
1045 .create_async()
1046 .await
1047 }
1048
1049 #[tokio::test]
1050 async fn test_oauth() {
1051 let mut server = Server::new_async().await;
1052 let oauth_mock = create_oauth_mock(&mut server).await;
1053 let config_mock = create_config_mock(&mut server).await;
1054
1055 let mut props = HashMap::new();
1056 props.insert("credential".to_string(), "client1:secret1".to_string());
1057
1058 let catalog = RestCatalog::new(
1059 RestCatalogConfig::builder()
1060 .uri(server.url())
1061 .props(props)
1062 .build(),
1063 );
1064
1065 let token = catalog.context().await.unwrap().client.token().await;
1066 oauth_mock.assert_async().await;
1067 config_mock.assert_async().await;
1068 assert_eq!(token, Some("ey000000000000".to_string()));
1069 }
1070
1071 #[tokio::test]
1072 async fn test_oauth_with_optional_param() {
1073 let mut props = HashMap::new();
1074 props.insert("credential".to_string(), "client1:secret1".to_string());
1075 props.insert("scope".to_string(), "custom_scope".to_string());
1076 props.insert("audience".to_string(), "custom_audience".to_string());
1077 props.insert("resource".to_string(), "custom_resource".to_string());
1078
1079 let mut server = Server::new_async().await;
1080 let oauth_mock = server
1081 .mock("POST", "/v1/oauth/tokens")
1082 .match_body(mockito::Matcher::Regex("scope=custom_scope".to_string()))
1083 .match_body(mockito::Matcher::Regex(
1084 "audience=custom_audience".to_string(),
1085 ))
1086 .match_body(mockito::Matcher::Regex(
1087 "resource=custom_resource".to_string(),
1088 ))
1089 .with_status(200)
1090 .with_body(
1091 r#"{
1092 "access_token": "ey000000000000",
1093 "token_type": "Bearer",
1094 "issued_token_type": "urn:ietf:params:oauth:token-type:access_token",
1095 "expires_in": 86400
1096 }"#,
1097 )
1098 .expect(1)
1099 .create_async()
1100 .await;
1101
1102 let config_mock = create_config_mock(&mut server).await;
1103
1104 let catalog = RestCatalog::new(
1105 RestCatalogConfig::builder()
1106 .uri(server.url())
1107 .props(props)
1108 .build(),
1109 );
1110
1111 let token = catalog.context().await.unwrap().client.token().await;
1112
1113 oauth_mock.assert_async().await;
1114 config_mock.assert_async().await;
1115 assert_eq!(token, Some("ey000000000000".to_string()));
1116 }
1117
1118 #[tokio::test]
1119 async fn test_invalidate_token() {
1120 let mut server = Server::new_async().await;
1121 let oauth_mock = create_oauth_mock(&mut server).await;
1122 let config_mock = create_config_mock(&mut server).await;
1123
1124 let mut props = HashMap::new();
1125 props.insert("credential".to_string(), "client1:secret1".to_string());
1126
1127 let catalog = RestCatalog::new(
1128 RestCatalogConfig::builder()
1129 .uri(server.url())
1130 .props(props)
1131 .build(),
1132 );
1133
1134 let token = catalog.context().await.unwrap().client.token().await;
1135 oauth_mock.assert_async().await;
1136 config_mock.assert_async().await;
1137 assert_eq!(token, Some("ey000000000000".to_string()));
1138
1139 let oauth_mock =
1140 create_oauth_mock_with_path(&mut server, "/v1/oauth/tokens", "ey000000000001", 200)
1141 .await;
1142 catalog.invalidate_token().await.unwrap();
1143 let token = catalog.context().await.unwrap().client.token().await;
1144 oauth_mock.assert_async().await;
1145 assert_eq!(token, Some("ey000000000001".to_string()));
1146 }
1147
1148 #[tokio::test]
1149 async fn test_invalidate_token_failing_request() {
1150 let mut server = Server::new_async().await;
1151 let oauth_mock = create_oauth_mock(&mut server).await;
1152 let config_mock = create_config_mock(&mut server).await;
1153
1154 let mut props = HashMap::new();
1155 props.insert("credential".to_string(), "client1:secret1".to_string());
1156
1157 let catalog = RestCatalog::new(
1158 RestCatalogConfig::builder()
1159 .uri(server.url())
1160 .props(props)
1161 .build(),
1162 );
1163
1164 let token = catalog.context().await.unwrap().client.token().await;
1165 oauth_mock.assert_async().await;
1166 config_mock.assert_async().await;
1167 assert_eq!(token, Some("ey000000000000".to_string()));
1168
1169 let oauth_mock =
1170 create_oauth_mock_with_path(&mut server, "/v1/oauth/tokens", "ey000000000001", 500)
1171 .await;
1172 catalog.invalidate_token().await.unwrap();
1173 let token = catalog.context().await.unwrap().client.token().await;
1174 oauth_mock.assert_async().await;
1175 assert_eq!(token, None);
1176 }
1177
1178 #[tokio::test]
1179 async fn test_regenerate_token() {
1180 let mut server = Server::new_async().await;
1181 let oauth_mock = create_oauth_mock(&mut server).await;
1182 let config_mock = create_config_mock(&mut server).await;
1183
1184 let mut props = HashMap::new();
1185 props.insert("credential".to_string(), "client1:secret1".to_string());
1186
1187 let catalog = RestCatalog::new(
1188 RestCatalogConfig::builder()
1189 .uri(server.url())
1190 .props(props)
1191 .build(),
1192 );
1193
1194 let token = catalog.context().await.unwrap().client.token().await;
1195 oauth_mock.assert_async().await;
1196 config_mock.assert_async().await;
1197 assert_eq!(token, Some("ey000000000000".to_string()));
1198
1199 let oauth_mock =
1200 create_oauth_mock_with_path(&mut server, "/v1/oauth/tokens", "ey000000000001", 200)
1201 .await;
1202 catalog.regenerate_token().await.unwrap();
1203 oauth_mock.assert_async().await;
1204 let token = catalog.context().await.unwrap().client.token().await;
1205 assert_eq!(token, Some("ey000000000001".to_string()));
1206 }
1207
1208 #[tokio::test]
1209 async fn test_regenerate_token_failing_request() {
1210 let mut server = Server::new_async().await;
1211 let oauth_mock = create_oauth_mock(&mut server).await;
1212 let config_mock = create_config_mock(&mut server).await;
1213
1214 let mut props = HashMap::new();
1215 props.insert("credential".to_string(), "client1:secret1".to_string());
1216
1217 let catalog = RestCatalog::new(
1218 RestCatalogConfig::builder()
1219 .uri(server.url())
1220 .props(props)
1221 .build(),
1222 );
1223
1224 let token = catalog.context().await.unwrap().client.token().await;
1225 oauth_mock.assert_async().await;
1226 config_mock.assert_async().await;
1227 assert_eq!(token, Some("ey000000000000".to_string()));
1228
1229 let oauth_mock =
1230 create_oauth_mock_with_path(&mut server, "/v1/oauth/tokens", "ey000000000001", 500)
1231 .await;
1232 let invalidate_result = catalog.regenerate_token().await;
1233 assert!(invalidate_result.is_err());
1234 oauth_mock.assert_async().await;
1235 let token = catalog.context().await.unwrap().client.token().await;
1236
1237 assert_eq!(token, Some("ey000000000000".to_string()));
1239 }
1240
1241 #[tokio::test]
1242 async fn test_http_headers() {
1243 let server = Server::new_async().await;
1244 let mut props = HashMap::new();
1245 props.insert("credential".to_string(), "client1:secret1".to_string());
1246
1247 let config = RestCatalogConfig::builder()
1248 .uri(server.url())
1249 .props(props)
1250 .build();
1251 let headers: HeaderMap = config.extra_headers().unwrap();
1252
1253 let expected_headers = HeaderMap::from_iter([
1254 (
1255 header::CONTENT_TYPE,
1256 HeaderValue::from_static("application/json"),
1257 ),
1258 (
1259 HeaderName::from_static("x-client-version"),
1260 HeaderValue::from_static(ICEBERG_REST_SPEC_VERSION),
1261 ),
1262 (
1263 header::USER_AGENT,
1264 HeaderValue::from_str(&format!("iceberg-rs/{CARGO_PKG_VERSION}")).unwrap(),
1265 ),
1266 ]);
1267 assert_eq!(headers, expected_headers);
1268 }
1269
1270 #[tokio::test]
1271 async fn test_http_headers_with_custom_headers() {
1272 let server = Server::new_async().await;
1273 let mut props = HashMap::new();
1274 props.insert("credential".to_string(), "client1:secret1".to_string());
1275 props.insert(
1276 "header.content-type".to_string(),
1277 "application/yaml".to_string(),
1278 );
1279 props.insert(
1280 "header.customized-header".to_string(),
1281 "some/value".to_string(),
1282 );
1283
1284 let config = RestCatalogConfig::builder()
1285 .uri(server.url())
1286 .props(props)
1287 .build();
1288 let headers: HeaderMap = config.extra_headers().unwrap();
1289
1290 let expected_headers = HeaderMap::from_iter([
1291 (
1292 header::CONTENT_TYPE,
1293 HeaderValue::from_static("application/yaml"),
1294 ),
1295 (
1296 HeaderName::from_static("x-client-version"),
1297 HeaderValue::from_static(ICEBERG_REST_SPEC_VERSION),
1298 ),
1299 (
1300 header::USER_AGENT,
1301 HeaderValue::from_str(&format!("iceberg-rs/{CARGO_PKG_VERSION}")).unwrap(),
1302 ),
1303 (
1304 HeaderName::from_static("customized-header"),
1305 HeaderValue::from_static("some/value"),
1306 ),
1307 ]);
1308 assert_eq!(headers, expected_headers);
1309 }
1310
1311 #[tokio::test]
1312 async fn test_oauth_with_oauth2_server_uri() {
1313 let mut server = Server::new_async().await;
1314 let config_mock = create_config_mock(&mut server).await;
1315
1316 let mut auth_server = Server::new_async().await;
1317 let auth_server_path = "/some/path";
1318 let oauth_mock =
1319 create_oauth_mock_with_path(&mut auth_server, auth_server_path, "ey000000000000", 200)
1320 .await;
1321
1322 let mut props = HashMap::new();
1323 props.insert("credential".to_string(), "client1:secret1".to_string());
1324 props.insert(
1325 "oauth2-server-uri".to_string(),
1326 format!("{}{}", auth_server.url(), auth_server_path).to_string(),
1327 );
1328
1329 let catalog = RestCatalog::new(
1330 RestCatalogConfig::builder()
1331 .uri(server.url())
1332 .props(props)
1333 .build(),
1334 );
1335
1336 let token = catalog.context().await.unwrap().client.token().await;
1337
1338 oauth_mock.assert_async().await;
1339 config_mock.assert_async().await;
1340 assert_eq!(token, Some("ey000000000000".to_string()));
1341 }
1342
1343 #[tokio::test]
1344 async fn test_config_override() {
1345 let mut server = Server::new_async().await;
1346 let mut redirect_server = Server::new_async().await;
1347 let new_uri = redirect_server.url();
1348
1349 let config_mock = server
1350 .mock("GET", "/v1/config")
1351 .with_status(200)
1352 .with_body(
1353 json!(
1354 {
1355 "overrides": {
1356 "uri": new_uri,
1357 "warehouse": "s3://iceberg-catalog",
1358 "prefix": "ice/warehouses/my"
1359 },
1360 "defaults": {},
1361 }
1362 )
1363 .to_string(),
1364 )
1365 .create_async()
1366 .await;
1367
1368 let list_ns_mock = redirect_server
1369 .mock("GET", "/v1/ice/warehouses/my/namespaces")
1370 .with_body(
1371 r#"{
1372 "namespaces": []
1373 }"#,
1374 )
1375 .create_async()
1376 .await;
1377
1378 let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
1379
1380 let _namespaces = catalog.list_namespaces(None).await.unwrap();
1381
1382 config_mock.assert_async().await;
1383 list_ns_mock.assert_async().await;
1384 }
1385
1386 #[tokio::test]
1387 async fn test_list_namespace() {
1388 let mut server = Server::new_async().await;
1389
1390 let config_mock = create_config_mock(&mut server).await;
1391
1392 let list_ns_mock = server
1393 .mock("GET", "/v1/namespaces")
1394 .with_body(
1395 r#"{
1396 "namespaces": [
1397 ["ns1", "ns11"],
1398 ["ns2"]
1399 ]
1400 }"#,
1401 )
1402 .create_async()
1403 .await;
1404
1405 let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
1406
1407 let namespaces = catalog.list_namespaces(None).await.unwrap();
1408
1409 let expected_ns = vec![
1410 NamespaceIdent::from_vec(vec!["ns1".to_string(), "ns11".to_string()]).unwrap(),
1411 NamespaceIdent::from_vec(vec!["ns2".to_string()]).unwrap(),
1412 ];
1413
1414 assert_eq!(expected_ns, namespaces);
1415
1416 config_mock.assert_async().await;
1417 list_ns_mock.assert_async().await;
1418 }
1419
1420 #[tokio::test]
1421 async fn test_list_namespace_with_pagination() {
1422 let mut server = Server::new_async().await;
1423
1424 let config_mock = create_config_mock(&mut server).await;
1425
1426 let list_ns_mock_page1 = server
1427 .mock("GET", "/v1/namespaces")
1428 .with_body(
1429 r#"{
1430 "namespaces": [
1431 ["ns1", "ns11"],
1432 ["ns2"]
1433 ],
1434 "next-page-token": "token123"
1435 }"#,
1436 )
1437 .create_async()
1438 .await;
1439
1440 let list_ns_mock_page2 = server
1441 .mock("GET", "/v1/namespaces?pageToken=token123")
1442 .with_body(
1443 r#"{
1444 "namespaces": [
1445 ["ns3"],
1446 ["ns4", "ns41"]
1447 ]
1448 }"#,
1449 )
1450 .create_async()
1451 .await;
1452
1453 let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
1454
1455 let namespaces = catalog.list_namespaces(None).await.unwrap();
1456
1457 let expected_ns = vec![
1458 NamespaceIdent::from_vec(vec!["ns1".to_string(), "ns11".to_string()]).unwrap(),
1459 NamespaceIdent::from_vec(vec!["ns2".to_string()]).unwrap(),
1460 NamespaceIdent::from_vec(vec!["ns3".to_string()]).unwrap(),
1461 NamespaceIdent::from_vec(vec!["ns4".to_string(), "ns41".to_string()]).unwrap(),
1462 ];
1463
1464 assert_eq!(expected_ns, namespaces);
1465
1466 config_mock.assert_async().await;
1467 list_ns_mock_page1.assert_async().await;
1468 list_ns_mock_page2.assert_async().await;
1469 }
1470
1471 #[tokio::test]
1472 async fn test_list_namespace_with_multiple_pages() {
1473 let mut server = Server::new_async().await;
1474
1475 let config_mock = create_config_mock(&mut server).await;
1476
1477 let list_ns_mock_page1 = server
1479 .mock("GET", "/v1/namespaces")
1480 .with_body(
1481 r#"{
1482 "namespaces": [
1483 ["ns1", "ns11"],
1484 ["ns2"]
1485 ],
1486 "next-page-token": "page2"
1487 }"#,
1488 )
1489 .create_async()
1490 .await;
1491
1492 let list_ns_mock_page2 = server
1494 .mock("GET", "/v1/namespaces?pageToken=page2")
1495 .with_body(
1496 r#"{
1497 "namespaces": [
1498 ["ns3"],
1499 ["ns4", "ns41"]
1500 ],
1501 "next-page-token": "page3"
1502 }"#,
1503 )
1504 .create_async()
1505 .await;
1506
1507 let list_ns_mock_page3 = server
1509 .mock("GET", "/v1/namespaces?pageToken=page3")
1510 .with_body(
1511 r#"{
1512 "namespaces": [
1513 ["ns5", "ns51", "ns511"]
1514 ],
1515 "next-page-token": "page4"
1516 }"#,
1517 )
1518 .create_async()
1519 .await;
1520
1521 let list_ns_mock_page4 = server
1523 .mock("GET", "/v1/namespaces?pageToken=page4")
1524 .with_body(
1525 r#"{
1526 "namespaces": [
1527 ["ns6"],
1528 ["ns7"]
1529 ],
1530 "next-page-token": "page5"
1531 }"#,
1532 )
1533 .create_async()
1534 .await;
1535
1536 let list_ns_mock_page5 = server
1538 .mock("GET", "/v1/namespaces?pageToken=page5")
1539 .with_body(
1540 r#"{
1541 "namespaces": [
1542 ["ns8", "ns81"]
1543 ]
1544 }"#,
1545 )
1546 .create_async()
1547 .await;
1548
1549 let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
1550
1551 let namespaces = catalog.list_namespaces(None).await.unwrap();
1552
1553 let expected_ns = vec![
1554 NamespaceIdent::from_vec(vec!["ns1".to_string(), "ns11".to_string()]).unwrap(),
1555 NamespaceIdent::from_vec(vec!["ns2".to_string()]).unwrap(),
1556 NamespaceIdent::from_vec(vec!["ns3".to_string()]).unwrap(),
1557 NamespaceIdent::from_vec(vec!["ns4".to_string(), "ns41".to_string()]).unwrap(),
1558 NamespaceIdent::from_vec(vec![
1559 "ns5".to_string(),
1560 "ns51".to_string(),
1561 "ns511".to_string(),
1562 ])
1563 .unwrap(),
1564 NamespaceIdent::from_vec(vec!["ns6".to_string()]).unwrap(),
1565 NamespaceIdent::from_vec(vec!["ns7".to_string()]).unwrap(),
1566 NamespaceIdent::from_vec(vec!["ns8".to_string(), "ns81".to_string()]).unwrap(),
1567 ];
1568
1569 assert_eq!(expected_ns, namespaces);
1570
1571 config_mock.assert_async().await;
1573 list_ns_mock_page1.assert_async().await;
1574 list_ns_mock_page2.assert_async().await;
1575 list_ns_mock_page3.assert_async().await;
1576 list_ns_mock_page4.assert_async().await;
1577 list_ns_mock_page5.assert_async().await;
1578 }
1579
1580 #[tokio::test]
1581 async fn test_create_namespace() {
1582 let mut server = Server::new_async().await;
1583
1584 let config_mock = create_config_mock(&mut server).await;
1585
1586 let create_ns_mock = server
1587 .mock("POST", "/v1/namespaces")
1588 .with_body(
1589 r#"{
1590 "namespace": [ "ns1", "ns11"],
1591 "properties" : {
1592 "key1": "value1"
1593 }
1594 }"#,
1595 )
1596 .create_async()
1597 .await;
1598
1599 let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
1600
1601 let namespaces = catalog
1602 .create_namespace(
1603 &NamespaceIdent::from_vec(vec!["ns1".to_string(), "ns11".to_string()]).unwrap(),
1604 HashMap::from([("key1".to_string(), "value1".to_string())]),
1605 )
1606 .await
1607 .unwrap();
1608
1609 let expected_ns = Namespace::with_properties(
1610 NamespaceIdent::from_vec(vec!["ns1".to_string(), "ns11".to_string()]).unwrap(),
1611 HashMap::from([("key1".to_string(), "value1".to_string())]),
1612 );
1613
1614 assert_eq!(expected_ns, namespaces);
1615
1616 config_mock.assert_async().await;
1617 create_ns_mock.assert_async().await;
1618 }
1619
1620 #[tokio::test]
1621 async fn test_get_namespace() {
1622 let mut server = Server::new_async().await;
1623
1624 let config_mock = create_config_mock(&mut server).await;
1625
1626 let get_ns_mock = server
1627 .mock("GET", "/v1/namespaces/ns1")
1628 .with_body(
1629 r#"{
1630 "namespace": [ "ns1"],
1631 "properties" : {
1632 "key1": "value1"
1633 }
1634 }"#,
1635 )
1636 .create_async()
1637 .await;
1638
1639 let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
1640
1641 let namespaces = catalog
1642 .get_namespace(&NamespaceIdent::new("ns1".to_string()))
1643 .await
1644 .unwrap();
1645
1646 let expected_ns = Namespace::with_properties(
1647 NamespaceIdent::new("ns1".to_string()),
1648 HashMap::from([("key1".to_string(), "value1".to_string())]),
1649 );
1650
1651 assert_eq!(expected_ns, namespaces);
1652
1653 config_mock.assert_async().await;
1654 get_ns_mock.assert_async().await;
1655 }
1656
1657 #[tokio::test]
1658 async fn check_namespace_exists() {
1659 let mut server = Server::new_async().await;
1660
1661 let config_mock = create_config_mock(&mut server).await;
1662
1663 let get_ns_mock = server
1664 .mock("HEAD", "/v1/namespaces/ns1")
1665 .with_status(204)
1666 .create_async()
1667 .await;
1668
1669 let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
1670
1671 assert!(
1672 catalog
1673 .namespace_exists(&NamespaceIdent::new("ns1".to_string()))
1674 .await
1675 .unwrap()
1676 );
1677
1678 config_mock.assert_async().await;
1679 get_ns_mock.assert_async().await;
1680 }
1681
1682 #[tokio::test]
1683 async fn test_drop_namespace() {
1684 let mut server = Server::new_async().await;
1685
1686 let config_mock = create_config_mock(&mut server).await;
1687
1688 let drop_ns_mock = server
1689 .mock("DELETE", "/v1/namespaces/ns1")
1690 .with_status(204)
1691 .create_async()
1692 .await;
1693
1694 let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
1695
1696 catalog
1697 .drop_namespace(&NamespaceIdent::new("ns1".to_string()))
1698 .await
1699 .unwrap();
1700
1701 config_mock.assert_async().await;
1702 drop_ns_mock.assert_async().await;
1703 }
1704
1705 #[tokio::test]
1706 async fn test_list_tables() {
1707 let mut server = Server::new_async().await;
1708
1709 let config_mock = create_config_mock(&mut server).await;
1710
1711 let list_tables_mock = server
1712 .mock("GET", "/v1/namespaces/ns1/tables")
1713 .with_status(200)
1714 .with_body(
1715 r#"{
1716 "identifiers": [
1717 {
1718 "namespace": ["ns1"],
1719 "name": "table1"
1720 },
1721 {
1722 "namespace": ["ns1"],
1723 "name": "table2"
1724 }
1725 ]
1726 }"#,
1727 )
1728 .create_async()
1729 .await;
1730
1731 let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
1732
1733 let tables = catalog
1734 .list_tables(&NamespaceIdent::new("ns1".to_string()))
1735 .await
1736 .unwrap();
1737
1738 let expected_tables = vec![
1739 TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table1".to_string()),
1740 TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table2".to_string()),
1741 ];
1742
1743 assert_eq!(tables, expected_tables);
1744
1745 config_mock.assert_async().await;
1746 list_tables_mock.assert_async().await;
1747 }
1748
1749 #[tokio::test]
1750 async fn test_list_tables_with_pagination() {
1751 let mut server = Server::new_async().await;
1752
1753 let config_mock = create_config_mock(&mut server).await;
1754
1755 let list_tables_mock_page1 = server
1756 .mock("GET", "/v1/namespaces/ns1/tables")
1757 .with_status(200)
1758 .with_body(
1759 r#"{
1760 "identifiers": [
1761 {
1762 "namespace": ["ns1"],
1763 "name": "table1"
1764 },
1765 {
1766 "namespace": ["ns1"],
1767 "name": "table2"
1768 }
1769 ],
1770 "next-page-token": "token456"
1771 }"#,
1772 )
1773 .create_async()
1774 .await;
1775
1776 let list_tables_mock_page2 = server
1777 .mock("GET", "/v1/namespaces/ns1/tables?pageToken=token456")
1778 .with_status(200)
1779 .with_body(
1780 r#"{
1781 "identifiers": [
1782 {
1783 "namespace": ["ns1"],
1784 "name": "table3"
1785 },
1786 {
1787 "namespace": ["ns1"],
1788 "name": "table4"
1789 }
1790 ]
1791 }"#,
1792 )
1793 .create_async()
1794 .await;
1795
1796 let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
1797
1798 let tables = catalog
1799 .list_tables(&NamespaceIdent::new("ns1".to_string()))
1800 .await
1801 .unwrap();
1802
1803 let expected_tables = vec![
1804 TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table1".to_string()),
1805 TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table2".to_string()),
1806 TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table3".to_string()),
1807 TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table4".to_string()),
1808 ];
1809
1810 assert_eq!(tables, expected_tables);
1811
1812 config_mock.assert_async().await;
1813 list_tables_mock_page1.assert_async().await;
1814 list_tables_mock_page2.assert_async().await;
1815 }
1816
1817 #[tokio::test]
1818 async fn test_list_tables_with_multiple_pages() {
1819 let mut server = Server::new_async().await;
1820
1821 let config_mock = create_config_mock(&mut server).await;
1822
1823 let list_tables_mock_page1 = server
1825 .mock("GET", "/v1/namespaces/ns1/tables")
1826 .with_status(200)
1827 .with_body(
1828 r#"{
1829 "identifiers": [
1830 {
1831 "namespace": ["ns1"],
1832 "name": "table1"
1833 },
1834 {
1835 "namespace": ["ns1"],
1836 "name": "table2"
1837 }
1838 ],
1839 "next-page-token": "page2"
1840 }"#,
1841 )
1842 .create_async()
1843 .await;
1844
1845 let list_tables_mock_page2 = server
1847 .mock("GET", "/v1/namespaces/ns1/tables?pageToken=page2")
1848 .with_status(200)
1849 .with_body(
1850 r#"{
1851 "identifiers": [
1852 {
1853 "namespace": ["ns1"],
1854 "name": "table3"
1855 },
1856 {
1857 "namespace": ["ns1"],
1858 "name": "table4"
1859 }
1860 ],
1861 "next-page-token": "page3"
1862 }"#,
1863 )
1864 .create_async()
1865 .await;
1866
1867 let list_tables_mock_page3 = server
1869 .mock("GET", "/v1/namespaces/ns1/tables?pageToken=page3")
1870 .with_status(200)
1871 .with_body(
1872 r#"{
1873 "identifiers": [
1874 {
1875 "namespace": ["ns1"],
1876 "name": "table5"
1877 }
1878 ],
1879 "next-page-token": "page4"
1880 }"#,
1881 )
1882 .create_async()
1883 .await;
1884
1885 let list_tables_mock_page4 = server
1887 .mock("GET", "/v1/namespaces/ns1/tables?pageToken=page4")
1888 .with_status(200)
1889 .with_body(
1890 r#"{
1891 "identifiers": [
1892 {
1893 "namespace": ["ns1"],
1894 "name": "table6"
1895 },
1896 {
1897 "namespace": ["ns1"],
1898 "name": "table7"
1899 }
1900 ],
1901 "next-page-token": "page5"
1902 }"#,
1903 )
1904 .create_async()
1905 .await;
1906
1907 let list_tables_mock_page5 = server
1909 .mock("GET", "/v1/namespaces/ns1/tables?pageToken=page5")
1910 .with_status(200)
1911 .with_body(
1912 r#"{
1913 "identifiers": [
1914 {
1915 "namespace": ["ns1"],
1916 "name": "table8"
1917 }
1918 ]
1919 }"#,
1920 )
1921 .create_async()
1922 .await;
1923
1924 let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
1925
1926 let tables = catalog
1927 .list_tables(&NamespaceIdent::new("ns1".to_string()))
1928 .await
1929 .unwrap();
1930
1931 let expected_tables = vec![
1932 TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table1".to_string()),
1933 TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table2".to_string()),
1934 TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table3".to_string()),
1935 TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table4".to_string()),
1936 TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table5".to_string()),
1937 TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table6".to_string()),
1938 TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table7".to_string()),
1939 TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table8".to_string()),
1940 ];
1941
1942 assert_eq!(tables, expected_tables);
1943
1944 config_mock.assert_async().await;
1946 list_tables_mock_page1.assert_async().await;
1947 list_tables_mock_page2.assert_async().await;
1948 list_tables_mock_page3.assert_async().await;
1949 list_tables_mock_page4.assert_async().await;
1950 list_tables_mock_page5.assert_async().await;
1951 }
1952
1953 #[tokio::test]
1954 async fn test_drop_tables() {
1955 let mut server = Server::new_async().await;
1956
1957 let config_mock = create_config_mock(&mut server).await;
1958
1959 let delete_table_mock = server
1960 .mock("DELETE", "/v1/namespaces/ns1/tables/table1")
1961 .with_status(204)
1962 .create_async()
1963 .await;
1964
1965 let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
1966
1967 catalog
1968 .drop_table(&TableIdent::new(
1969 NamespaceIdent::new("ns1".to_string()),
1970 "table1".to_string(),
1971 ))
1972 .await
1973 .unwrap();
1974
1975 config_mock.assert_async().await;
1976 delete_table_mock.assert_async().await;
1977 }
1978
1979 #[tokio::test]
1980 async fn test_check_table_exists() {
1981 let mut server = Server::new_async().await;
1982
1983 let config_mock = create_config_mock(&mut server).await;
1984
1985 let check_table_exists_mock = server
1986 .mock("HEAD", "/v1/namespaces/ns1/tables/table1")
1987 .with_status(204)
1988 .create_async()
1989 .await;
1990
1991 let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
1992
1993 assert!(
1994 catalog
1995 .table_exists(&TableIdent::new(
1996 NamespaceIdent::new("ns1".to_string()),
1997 "table1".to_string(),
1998 ))
1999 .await
2000 .unwrap()
2001 );
2002
2003 config_mock.assert_async().await;
2004 check_table_exists_mock.assert_async().await;
2005 }
2006
2007 #[tokio::test]
2008 async fn test_rename_table() {
2009 let mut server = Server::new_async().await;
2010
2011 let config_mock = create_config_mock(&mut server).await;
2012
2013 let rename_table_mock = server
2014 .mock("POST", "/v1/tables/rename")
2015 .with_status(204)
2016 .create_async()
2017 .await;
2018
2019 let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
2020
2021 catalog
2022 .rename_table(
2023 &TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table1".to_string()),
2024 &TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table2".to_string()),
2025 )
2026 .await
2027 .unwrap();
2028
2029 config_mock.assert_async().await;
2030 rename_table_mock.assert_async().await;
2031 }
2032
2033 #[tokio::test]
2034 async fn test_load_table() {
2035 let mut server = Server::new_async().await;
2036
2037 let config_mock = create_config_mock(&mut server).await;
2038
2039 let rename_table_mock = server
2040 .mock("GET", "/v1/namespaces/ns1/tables/test1")
2041 .with_status(200)
2042 .with_body_from_file(format!(
2043 "{}/testdata/{}",
2044 env!("CARGO_MANIFEST_DIR"),
2045 "load_table_response.json"
2046 ))
2047 .create_async()
2048 .await;
2049
2050 let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
2051
2052 let table = catalog
2053 .load_table(&TableIdent::new(
2054 NamespaceIdent::new("ns1".to_string()),
2055 "test1".to_string(),
2056 ))
2057 .await
2058 .unwrap();
2059
2060 assert_eq!(
2061 &TableIdent::from_strs(vec!["ns1", "test1"]).unwrap(),
2062 table.identifier()
2063 );
2064 assert_eq!(
2065 "s3://warehouse/database/table/metadata/00001-5f2f8166-244c-4eae-ac36-384ecdec81fc.gz.metadata.json",
2066 table.metadata_location().unwrap()
2067 );
2068 assert_eq!(FormatVersion::V1, table.metadata().format_version());
2069 assert_eq!("s3://warehouse/database/table", table.metadata().location());
2070 assert_eq!(
2071 uuid!("b55d9dda-6561-423a-8bfc-787980ce421f"),
2072 table.metadata().uuid()
2073 );
2074 assert_eq!(
2075 Utc.timestamp_millis_opt(1646787054459).unwrap(),
2076 table.metadata().last_updated_timestamp().unwrap()
2077 );
2078 assert_eq!(
2079 vec![&Arc::new(
2080 Schema::builder()
2081 .with_fields(vec![
2082 NestedField::optional(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
2083 NestedField::optional(2, "data", Type::Primitive(PrimitiveType::String))
2084 .into(),
2085 ])
2086 .build()
2087 .unwrap()
2088 )],
2089 table.metadata().schemas_iter().collect::<Vec<_>>()
2090 );
2091 assert_eq!(
2092 &HashMap::from([
2093 ("owner".to_string(), "bryan".to_string()),
2094 (
2095 "write.metadata.compression-codec".to_string(),
2096 "gzip".to_string()
2097 )
2098 ]),
2099 table.metadata().properties()
2100 );
2101 assert_eq!(vec![&Arc::new(Snapshot::builder()
2102 .with_snapshot_id(3497810964824022504)
2103 .with_timestamp_ms(1646787054459)
2104 .with_manifest_list("s3://warehouse/database/table/metadata/snap-3497810964824022504-1-c4f68204-666b-4e50-a9df-b10c34bf6b82.avro")
2105 .with_sequence_number(0)
2106 .with_schema_id(0)
2107 .with_summary(Summary {
2108 operation: Operation::Append,
2109 additional_properties: HashMap::from_iter([
2110 ("spark.app.id", "local-1646787004168"),
2111 ("added-data-files", "1"),
2112 ("added-records", "1"),
2113 ("added-files-size", "697"),
2114 ("changed-partition-count", "1"),
2115 ("total-records", "1"),
2116 ("total-files-size", "697"),
2117 ("total-data-files", "1"),
2118 ("total-delete-files", "0"),
2119 ("total-position-deletes", "0"),
2120 ("total-equality-deletes", "0")
2121 ].iter().map(|p| (p.0.to_string(), p.1.to_string()))),
2122 }).build()
2123 )], table.metadata().snapshots().collect::<Vec<_>>());
2124 assert_eq!(
2125 &[SnapshotLog {
2126 timestamp_ms: 1646787054459,
2127 snapshot_id: 3497810964824022504,
2128 }],
2129 table.metadata().history()
2130 );
2131 assert_eq!(
2132 vec![&Arc::new(SortOrder {
2133 order_id: 0,
2134 fields: vec![],
2135 })],
2136 table.metadata().sort_orders_iter().collect::<Vec<_>>()
2137 );
2138
2139 config_mock.assert_async().await;
2140 rename_table_mock.assert_async().await;
2141 }
2142
2143 #[tokio::test]
2144 async fn test_load_table_404() {
2145 let mut server = Server::new_async().await;
2146
2147 let config_mock = create_config_mock(&mut server).await;
2148
2149 let rename_table_mock = server
2150 .mock("GET", "/v1/namespaces/ns1/tables/test1")
2151 .with_status(404)
2152 .with_body(r#"
2153{
2154 "error": {
2155 "message": "Table does not exist: ns1.test1 in warehouse 8bcb0838-50fc-472d-9ddb-8feb89ef5f1e",
2156 "type": "NoSuchNamespaceErrorException",
2157 "code": 404
2158 }
2159}
2160 "#)
2161 .create_async()
2162 .await;
2163
2164 let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
2165
2166 let table = catalog
2167 .load_table(&TableIdent::new(
2168 NamespaceIdent::new("ns1".to_string()),
2169 "test1".to_string(),
2170 ))
2171 .await;
2172
2173 assert!(table.is_err());
2174 assert!(table.err().unwrap().message().contains("does not exist"));
2175
2176 config_mock.assert_async().await;
2177 rename_table_mock.assert_async().await;
2178 }
2179
2180 #[tokio::test]
2181 async fn test_create_table() {
2182 let mut server = Server::new_async().await;
2183
2184 let config_mock = create_config_mock(&mut server).await;
2185
2186 let create_table_mock = server
2187 .mock("POST", "/v1/namespaces/ns1/tables")
2188 .with_status(200)
2189 .with_body_from_file(format!(
2190 "{}/testdata/{}",
2191 env!("CARGO_MANIFEST_DIR"),
2192 "create_table_response.json"
2193 ))
2194 .create_async()
2195 .await;
2196
2197 let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
2198
2199 let table_creation = TableCreation::builder()
2200 .name("test1".to_string())
2201 .schema(
2202 Schema::builder()
2203 .with_fields(vec![
2204 NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String))
2205 .into(),
2206 NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(),
2207 NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean))
2208 .into(),
2209 ])
2210 .with_schema_id(1)
2211 .with_identifier_field_ids(vec![2])
2212 .build()
2213 .unwrap(),
2214 )
2215 .properties(HashMap::from([("owner".to_string(), "testx".to_string())]))
2216 .partition_spec(
2217 UnboundPartitionSpec::builder()
2218 .add_partition_fields(vec![
2219 UnboundPartitionField::builder()
2220 .source_id(1)
2221 .transform(Transform::Truncate(3))
2222 .name("id".to_string())
2223 .build(),
2224 ])
2225 .unwrap()
2226 .build(),
2227 )
2228 .sort_order(
2229 SortOrder::builder()
2230 .with_sort_field(
2231 SortField::builder()
2232 .source_id(2)
2233 .transform(Transform::Identity)
2234 .direction(SortDirection::Ascending)
2235 .null_order(NullOrder::First)
2236 .build(),
2237 )
2238 .build_unbound()
2239 .unwrap(),
2240 )
2241 .build();
2242
2243 let table = catalog
2244 .create_table(&NamespaceIdent::from_strs(["ns1"]).unwrap(), table_creation)
2245 .await
2246 .unwrap();
2247
2248 assert_eq!(
2249 &TableIdent::from_strs(vec!["ns1", "test1"]).unwrap(),
2250 table.identifier()
2251 );
2252 assert_eq!(
2253 "s3://warehouse/database/table/metadata.json",
2254 table.metadata_location().unwrap()
2255 );
2256 assert_eq!(FormatVersion::V1, table.metadata().format_version());
2257 assert_eq!("s3://warehouse/database/table", table.metadata().location());
2258 assert_eq!(
2259 uuid!("bf289591-dcc0-4234-ad4f-5c3eed811a29"),
2260 table.metadata().uuid()
2261 );
2262 assert_eq!(
2263 1657810967051,
2264 table
2265 .metadata()
2266 .last_updated_timestamp()
2267 .unwrap()
2268 .timestamp_millis()
2269 );
2270 assert_eq!(
2271 vec![&Arc::new(
2272 Schema::builder()
2273 .with_fields(vec![
2274 NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String))
2275 .into(),
2276 NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(),
2277 NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean))
2278 .into(),
2279 ])
2280 .with_schema_id(0)
2281 .with_identifier_field_ids(vec![2])
2282 .build()
2283 .unwrap()
2284 )],
2285 table.metadata().schemas_iter().collect::<Vec<_>>()
2286 );
2287 assert_eq!(
2288 &HashMap::from([
2289 (
2290 "write.delete.parquet.compression-codec".to_string(),
2291 "zstd".to_string()
2292 ),
2293 (
2294 "write.metadata.compression-codec".to_string(),
2295 "gzip".to_string()
2296 ),
2297 (
2298 "write.summary.partition-limit".to_string(),
2299 "100".to_string()
2300 ),
2301 (
2302 "write.parquet.compression-codec".to_string(),
2303 "zstd".to_string()
2304 ),
2305 ]),
2306 table.metadata().properties()
2307 );
2308 assert!(table.metadata().current_snapshot().is_none());
2309 assert!(table.metadata().history().is_empty());
2310 assert_eq!(
2311 vec![&Arc::new(SortOrder {
2312 order_id: 0,
2313 fields: vec![],
2314 })],
2315 table.metadata().sort_orders_iter().collect::<Vec<_>>()
2316 );
2317
2318 config_mock.assert_async().await;
2319 create_table_mock.assert_async().await;
2320 }
2321
2322 #[tokio::test]
2323 async fn test_create_table_409() {
2324 let mut server = Server::new_async().await;
2325
2326 let config_mock = create_config_mock(&mut server).await;
2327
2328 let create_table_mock = server
2329 .mock("POST", "/v1/namespaces/ns1/tables")
2330 .with_status(409)
2331 .with_body(r#"
2332{
2333 "error": {
2334 "message": "Table already exists: ns1.test1 in warehouse 8bcb0838-50fc-472d-9ddb-8feb89ef5f1e",
2335 "type": "AlreadyExistsException",
2336 "code": 409
2337 }
2338}
2339 "#)
2340 .create_async()
2341 .await;
2342
2343 let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
2344
2345 let table_creation = TableCreation::builder()
2346 .name("test1".to_string())
2347 .schema(
2348 Schema::builder()
2349 .with_fields(vec![
2350 NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String))
2351 .into(),
2352 NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(),
2353 NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean))
2354 .into(),
2355 ])
2356 .with_schema_id(1)
2357 .with_identifier_field_ids(vec![2])
2358 .build()
2359 .unwrap(),
2360 )
2361 .properties(HashMap::from([("owner".to_string(), "testx".to_string())]))
2362 .build();
2363
2364 let table_result = catalog
2365 .create_table(&NamespaceIdent::from_strs(["ns1"]).unwrap(), table_creation)
2366 .await;
2367
2368 assert!(table_result.is_err());
2369 assert!(
2370 table_result
2371 .err()
2372 .unwrap()
2373 .message()
2374 .contains("already exists")
2375 );
2376
2377 config_mock.assert_async().await;
2378 create_table_mock.assert_async().await;
2379 }
2380
2381 #[tokio::test]
2382 async fn test_update_table() {
2383 let mut server = Server::new_async().await;
2384
2385 let config_mock = create_config_mock(&mut server).await;
2386
2387 let load_table_mock = server
2388 .mock("GET", "/v1/namespaces/ns1/tables/test1")
2389 .with_status(200)
2390 .with_body_from_file(format!(
2391 "{}/testdata/{}",
2392 env!("CARGO_MANIFEST_DIR"),
2393 "load_table_response.json"
2394 ))
2395 .create_async()
2396 .await;
2397
2398 let update_table_mock = server
2399 .mock("POST", "/v1/namespaces/ns1/tables/test1")
2400 .with_status(200)
2401 .with_body_from_file(format!(
2402 "{}/testdata/{}",
2403 env!("CARGO_MANIFEST_DIR"),
2404 "update_table_response.json"
2405 ))
2406 .create_async()
2407 .await;
2408
2409 let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
2410
2411 let table1 = {
2412 let file = File::open(format!(
2413 "{}/testdata/{}",
2414 env!("CARGO_MANIFEST_DIR"),
2415 "create_table_response.json"
2416 ))
2417 .unwrap();
2418 let reader = BufReader::new(file);
2419 let resp = serde_json::from_reader::<_, LoadTableResult>(reader).unwrap();
2420
2421 Table::builder()
2422 .metadata(resp.metadata)
2423 .metadata_location(resp.metadata_location.unwrap())
2424 .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap())
2425 .file_io(FileIO::from_path("/tmp").unwrap().build().unwrap())
2426 .build()
2427 .unwrap()
2428 };
2429
2430 let tx = Transaction::new(&table1);
2431 let table = tx
2432 .upgrade_table_version()
2433 .set_format_version(FormatVersion::V2)
2434 .apply(tx)
2435 .unwrap()
2436 .commit(&catalog)
2437 .await
2438 .unwrap();
2439
2440 assert_eq!(
2441 &TableIdent::from_strs(vec!["ns1", "test1"]).unwrap(),
2442 table.identifier()
2443 );
2444 assert_eq!(
2445 "s3://warehouse/database/table/metadata.json",
2446 table.metadata_location().unwrap()
2447 );
2448 assert_eq!(FormatVersion::V2, table.metadata().format_version());
2449 assert_eq!("s3://warehouse/database/table", table.metadata().location());
2450 assert_eq!(
2451 uuid!("bf289591-dcc0-4234-ad4f-5c3eed811a29"),
2452 table.metadata().uuid()
2453 );
2454 assert_eq!(
2455 1657810967051,
2456 table
2457 .metadata()
2458 .last_updated_timestamp()
2459 .unwrap()
2460 .timestamp_millis()
2461 );
2462 assert_eq!(
2463 vec![&Arc::new(
2464 Schema::builder()
2465 .with_fields(vec![
2466 NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String))
2467 .into(),
2468 NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(),
2469 NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean))
2470 .into(),
2471 ])
2472 .with_schema_id(0)
2473 .with_identifier_field_ids(vec![2])
2474 .build()
2475 .unwrap()
2476 )],
2477 table.metadata().schemas_iter().collect::<Vec<_>>()
2478 );
2479 assert_eq!(
2480 &HashMap::from([
2481 (
2482 "write.delete.parquet.compression-codec".to_string(),
2483 "zstd".to_string()
2484 ),
2485 (
2486 "write.metadata.compression-codec".to_string(),
2487 "gzip".to_string()
2488 ),
2489 (
2490 "write.summary.partition-limit".to_string(),
2491 "100".to_string()
2492 ),
2493 (
2494 "write.parquet.compression-codec".to_string(),
2495 "zstd".to_string()
2496 ),
2497 ]),
2498 table.metadata().properties()
2499 );
2500 assert!(table.metadata().current_snapshot().is_none());
2501 assert!(table.metadata().history().is_empty());
2502 assert_eq!(
2503 vec![&Arc::new(SortOrder {
2504 order_id: 0,
2505 fields: vec![],
2506 })],
2507 table.metadata().sort_orders_iter().collect::<Vec<_>>()
2508 );
2509
2510 config_mock.assert_async().await;
2511 update_table_mock.assert_async().await;
2512 load_table_mock.assert_async().await
2513 }
2514
2515 #[tokio::test]
2516 async fn test_update_table_404() {
2517 let mut server = Server::new_async().await;
2518
2519 let config_mock = create_config_mock(&mut server).await;
2520
2521 let load_table_mock = server
2522 .mock("GET", "/v1/namespaces/ns1/tables/test1")
2523 .with_status(200)
2524 .with_body_from_file(format!(
2525 "{}/testdata/{}",
2526 env!("CARGO_MANIFEST_DIR"),
2527 "load_table_response.json"
2528 ))
2529 .create_async()
2530 .await;
2531
2532 let update_table_mock = server
2533 .mock("POST", "/v1/namespaces/ns1/tables/test1")
2534 .with_status(404)
2535 .with_body(
2536 r#"
2537{
2538 "error": {
2539 "message": "The given table does not exist",
2540 "type": "NoSuchTableException",
2541 "code": 404
2542 }
2543}
2544 "#,
2545 )
2546 .create_async()
2547 .await;
2548
2549 let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
2550
2551 let table1 = {
2552 let file = File::open(format!(
2553 "{}/testdata/{}",
2554 env!("CARGO_MANIFEST_DIR"),
2555 "create_table_response.json"
2556 ))
2557 .unwrap();
2558 let reader = BufReader::new(file);
2559 let resp = serde_json::from_reader::<_, LoadTableResult>(reader).unwrap();
2560
2561 Table::builder()
2562 .metadata(resp.metadata)
2563 .metadata_location(resp.metadata_location.unwrap())
2564 .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap())
2565 .file_io(FileIO::from_path("/tmp").unwrap().build().unwrap())
2566 .build()
2567 .unwrap()
2568 };
2569
2570 let tx = Transaction::new(&table1);
2571 let table_result = tx
2572 .upgrade_table_version()
2573 .set_format_version(FormatVersion::V2)
2574 .apply(tx)
2575 .unwrap()
2576 .commit(&catalog)
2577 .await;
2578
2579 assert!(table_result.is_err());
2580 assert!(
2581 table_result
2582 .err()
2583 .unwrap()
2584 .message()
2585 .contains("does not exist")
2586 );
2587
2588 config_mock.assert_async().await;
2589 update_table_mock.assert_async().await;
2590 load_table_mock.assert_async().await;
2591 }
2592
2593 #[tokio::test]
2594 async fn test_register_table() {
2595 let mut server = Server::new_async().await;
2596
2597 let config_mock = create_config_mock(&mut server).await;
2598
2599 let register_table_mock = server
2600 .mock("POST", "/v1/namespaces/ns1/register")
2601 .with_status(200)
2602 .with_body_from_file(format!(
2603 "{}/testdata/{}",
2604 env!("CARGO_MANIFEST_DIR"),
2605 "load_table_response.json"
2606 ))
2607 .create_async()
2608 .await;
2609
2610 let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
2611 let table_ident =
2612 TableIdent::new(NamespaceIdent::new("ns1".to_string()), "test1".to_string());
2613 let metadata_location = String::from(
2614 "s3://warehouse/database/table/metadata/00001-5f2f8166-244c-4eae-ac36-384ecdec81fc.gz.metadata.json",
2615 );
2616
2617 let table = catalog
2618 .register_table(&table_ident, metadata_location)
2619 .await
2620 .unwrap();
2621
2622 assert_eq!(
2623 &TableIdent::from_strs(vec!["ns1", "test1"]).unwrap(),
2624 table.identifier()
2625 );
2626 assert_eq!(
2627 "s3://warehouse/database/table/metadata/00001-5f2f8166-244c-4eae-ac36-384ecdec81fc.gz.metadata.json",
2628 table.metadata_location().unwrap()
2629 );
2630
2631 config_mock.assert_async().await;
2632 register_table_mock.assert_async().await;
2633 }
2634
2635 #[tokio::test]
2636 async fn test_register_table_404() {
2637 let mut server = Server::new_async().await;
2638
2639 let config_mock = create_config_mock(&mut server).await;
2640
2641 let register_table_mock = server
2642 .mock("POST", "/v1/namespaces/ns1/register")
2643 .with_status(404)
2644 .with_body(
2645 r#"
2646{
2647 "error": {
2648 "message": "The namespace specified does not exist",
2649 "type": "NoSuchNamespaceErrorException",
2650 "code": 404
2651 }
2652}
2653 "#,
2654 )
2655 .create_async()
2656 .await;
2657
2658 let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
2659
2660 let table_ident =
2661 TableIdent::new(NamespaceIdent::new("ns1".to_string()), "test1".to_string());
2662 let metadata_location = String::from(
2663 "s3://warehouse/database/table/metadata/00001-5f2f8166-244c-4eae-ac36-384ecdec81fc.gz.metadata.json",
2664 );
2665 let table = catalog
2666 .register_table(&table_ident, metadata_location)
2667 .await;
2668
2669 assert!(table.is_err());
2670 assert!(table.err().unwrap().message().contains("does not exist"));
2671
2672 config_mock.assert_async().await;
2673 register_table_mock.assert_async().await;
2674 }
2675
2676 #[tokio::test]
2677 async fn test_create_rest_catalog() {
2678 let builder = RestCatalogBuilder::default().with_client(Client::new());
2679
2680 let catalog = builder
2681 .load(
2682 "test",
2683 HashMap::from([
2684 (
2685 REST_CATALOG_PROP_URI.to_string(),
2686 "http://localhost:8080".to_string(),
2687 ),
2688 ("a".to_string(), "b".to_string()),
2689 ]),
2690 )
2691 .await;
2692
2693 assert!(catalog.is_ok());
2694
2695 let catalog_config = catalog.unwrap().user_config;
2696 assert_eq!(catalog_config.name.as_deref(), Some("test"));
2697 assert_eq!(catalog_config.uri, "http://localhost:8080");
2698 assert_eq!(catalog_config.warehouse, None);
2699 assert!(catalog_config.client.is_some());
2700
2701 assert_eq!(catalog_config.props.get("a"), Some(&"b".to_string()));
2702 assert!(!catalog_config.props.contains_key(REST_CATALOG_PROP_URI));
2703 }
2704
2705 #[tokio::test]
2706 async fn test_create_rest_catalog_no_uri() {
2707 let builder = RestCatalogBuilder::default();
2708
2709 let catalog = builder
2710 .load(
2711 "test",
2712 HashMap::from([(
2713 REST_CATALOG_PROP_WAREHOUSE.to_string(),
2714 "s3://warehouse".to_string(),
2715 )]),
2716 )
2717 .await;
2718
2719 assert!(catalog.is_err());
2720 if let Err(err) = catalog {
2721 assert_eq!(err.kind(), ErrorKind::DataInvalid);
2722 assert_eq!(err.message(), "Catalog uri is required");
2723 }
2724 }
2725}