1use std::collections::HashMap;
21use std::future::Future;
22use std::str::FromStr;
23use std::sync::Arc;
24
25use async_trait::async_trait;
26use iceberg::io::{FileIO, FileIOBuilder, StorageFactory};
27use iceberg::table::Table;
28use iceberg::{
29 Catalog, CatalogBuilder, Error, ErrorKind, Namespace, NamespaceIdent, Result, Runtime,
30 TableCommit, TableCreation, TableIdent,
31};
32use itertools::Itertools;
33use reqwest::header::{
34 HeaderMap, HeaderName, HeaderValue, {self},
35};
36use reqwest::{Client, Method, StatusCode, Url};
37use tokio::sync::OnceCell;
38use typed_builder::TypedBuilder;
39
40use crate::client::{
41 HttpClient, deserialize_catalog_response, deserialize_unexpected_catalog_error,
42};
43use crate::types::{
44 CatalogConfig, CommitTableRequest, CommitTableResponse, CreateNamespaceRequest,
45 CreateTableRequest, ListNamespaceResponse, ListTablesResponse, LoadTableResult,
46 NamespaceResponse, RegisterTableRequest, RenameTableRequest,
47};
48
/// Property key for the base URI of the REST catalog; `load` copies its value into the config's `uri`.
pub const REST_CATALOG_PROP_URI: &str = "uri";
/// Property key for the warehouse; when set it is sent as the `warehouse` query parameter of the config request.
pub const REST_CATALOG_PROP_WAREHOUSE: &str = "warehouse";
/// Property key read by `disable_header_redaction`; only the value `true` (ASCII case-insensitive) enables it.
pub const REST_CATALOG_PROP_DISABLE_HEADER_REDACTION: &str = "disable-header-redaction";

/// Value sent in the `x-client-version` request header.
const ICEBERG_REST_SPEC_VERSION: &str = "0.14.1";
/// Version of this crate as reported by Cargo; used in the `User-Agent` header.
const CARGO_PKG_VERSION: &str = env!("CARGO_PKG_VERSION");
/// Path segment placed directly after the catalog URI in every endpoint.
const PATH_V1: &str = "v1";
59
/// Builder for [`RestCatalog`].
#[derive(Debug)]
pub struct RestCatalogBuilder {
    /// Configuration accumulated so far; handed to the catalog by `load`.
    config: RestCatalogConfig,
    /// Optional storage factory, set through `with_storage_factory`.
    storage_factory: Option<Arc<dyn StorageFactory>>,
    /// Optional runtime; `load` falls back to `Runtime::current()` when unset.
    runtime: Option<Runtime>,
}
67
68impl Default for RestCatalogBuilder {
69 fn default() -> Self {
70 Self {
71 config: RestCatalogConfig {
72 name: None,
73 uri: "".to_string(),
74 warehouse: None,
75 props: HashMap::new(),
76 client: None,
77 },
78 storage_factory: None,
79 runtime: None,
80 }
81 }
82}
83
84impl CatalogBuilder for RestCatalogBuilder {
85 type C = RestCatalog;
86
87 fn with_storage_factory(mut self, storage_factory: Arc<dyn StorageFactory>) -> Self {
88 self.storage_factory = Some(storage_factory);
89 self
90 }
91
92 fn with_runtime(mut self, runtime: Runtime) -> Self {
93 self.runtime = Some(runtime);
94 self
95 }
96
97 fn load(
98 mut self,
99 name: impl Into<String>,
100 props: HashMap<String, String>,
101 ) -> impl Future<Output = Result<Self::C>> + Send {
102 self.config.name = Some(name.into());
103
104 if props.contains_key(REST_CATALOG_PROP_URI) {
105 self.config.uri = props
106 .get(REST_CATALOG_PROP_URI)
107 .cloned()
108 .unwrap_or_default();
109 }
110
111 if props.contains_key(REST_CATALOG_PROP_WAREHOUSE) {
112 self.config.warehouse = props.get(REST_CATALOG_PROP_WAREHOUSE).cloned()
113 }
114
115 self.config.props = props
117 .into_iter()
118 .filter(|(k, _)| k != REST_CATALOG_PROP_URI && k != REST_CATALOG_PROP_WAREHOUSE)
119 .collect();
120
121 let result = {
122 if self.config.name.is_none() {
123 Err(Error::new(
124 ErrorKind::DataInvalid,
125 "Catalog name is required",
126 ))
127 } else if self.config.uri.is_empty() {
128 Err(Error::new(
129 ErrorKind::DataInvalid,
130 "Catalog uri is required",
131 ))
132 } else {
133 let runtime = self.runtime.unwrap_or_else(Runtime::current);
134 Ok(RestCatalog::new(self.config, self.storage_factory, runtime))
135 }
136 };
137
138 std::future::ready(result)
139 }
140}
141
142impl RestCatalogBuilder {
143 pub fn with_client(mut self, client: Client) -> Self {
145 self.config.client = Some(client);
146 self
147 }
148}
149
/// Configuration of a REST catalog: connection details plus free-form properties.
#[derive(Clone, Debug, TypedBuilder)]
pub(crate) struct RestCatalogConfig {
    /// Catalog name; set by `RestCatalogBuilder::load`.
    #[builder(default, setter(strip_option))]
    name: Option<String>,

    /// Base URI that every endpoint is built from.
    uri: String,

    /// Optional warehouse, sent as the `warehouse` query parameter of the config request.
    #[builder(default, setter(strip_option(fallback = warehouse_opt)))]
    warehouse: Option<String>,

    /// Free-form properties (for example `prefix`, `token`, `credential`, `header.*`).
    #[builder(default)]
    props: HashMap<String, String>,

    /// Optional user-supplied HTTP client.
    #[builder(default)]
    client: Option<Client>,
}
167
168impl RestCatalogConfig {
169 fn url_prefixed(&self, parts: &[&str]) -> String {
170 [&self.uri, PATH_V1]
171 .into_iter()
172 .chain(self.props.get("prefix").map(|s| &**s))
173 .chain(parts.iter().cloned())
174 .join("/")
175 }
176
177 fn config_endpoint(&self) -> String {
178 [&self.uri, PATH_V1, "config"].join("/")
179 }
180
181 pub(crate) fn get_token_endpoint(&self) -> String {
182 if let Some(oauth2_uri) = self.props.get("oauth2-server-uri") {
183 oauth2_uri.to_string()
184 } else {
185 [&self.uri, PATH_V1, "oauth", "tokens"].join("/")
186 }
187 }
188
189 fn namespaces_endpoint(&self) -> String {
190 self.url_prefixed(&["namespaces"])
191 }
192
193 fn namespace_endpoint(&self, ns: &NamespaceIdent) -> String {
194 self.url_prefixed(&["namespaces", &ns.to_url_string()])
195 }
196
197 fn tables_endpoint(&self, ns: &NamespaceIdent) -> String {
198 self.url_prefixed(&["namespaces", &ns.to_url_string(), "tables"])
199 }
200
201 fn rename_table_endpoint(&self) -> String {
202 self.url_prefixed(&["tables", "rename"])
203 }
204
205 fn register_table_endpoint(&self, ns: &NamespaceIdent) -> String {
206 self.url_prefixed(&["namespaces", &ns.to_url_string(), "register"])
207 }
208
209 fn table_endpoint(&self, table: &TableIdent) -> String {
210 self.url_prefixed(&[
211 "namespaces",
212 &table.namespace.to_url_string(),
213 "tables",
214 &table.name,
215 ])
216 }
217
    /// Returns a clone of the user-supplied HTTP client, or `None` when none was configured.
    pub(crate) fn client(&self) -> Option<Client> {
        self.client.clone()
    }
222
    /// Returns a copy of the `token` property, or `None` when it is not set.
    pub(crate) fn token(&self) -> Option<String> {
        self.props.get("token").cloned()
    }
229
230 pub(crate) fn credential(&self) -> Option<(Option<String>, String)> {
239 let cred = self.props.get("credential")?;
240
241 match cred.split_once(':') {
242 Some((client_id, client_secret)) => {
243 Some((Some(client_id.to_string()), client_secret.to_string()))
244 }
245 None => Some((None, cred.to_string())),
246 }
247 }
248
249 pub(crate) fn extra_headers(&self) -> Result<HeaderMap> {
256 let mut headers = HeaderMap::from_iter([
257 (
258 header::CONTENT_TYPE,
259 HeaderValue::from_static("application/json"),
260 ),
261 (
262 HeaderName::from_static("x-client-version"),
263 HeaderValue::from_static(ICEBERG_REST_SPEC_VERSION),
264 ),
265 (
266 header::USER_AGENT,
267 HeaderValue::from_str(&format!("iceberg-rs/{CARGO_PKG_VERSION}")).unwrap(),
268 ),
269 ]);
270
271 for (key, value) in self
272 .props
273 .iter()
274 .filter_map(|(k, v)| k.strip_prefix("header.").map(|k| (k, v)))
275 {
276 headers.insert(
277 HeaderName::from_str(key).map_err(|e| {
278 Error::new(
279 ErrorKind::DataInvalid,
280 format!("Invalid header name: {key}"),
281 )
282 .with_source(e)
283 })?,
284 HeaderValue::from_str(value).map_err(|e| {
285 Error::new(
286 ErrorKind::DataInvalid,
287 format!("Invalid header value: {value}"),
288 )
289 .with_source(e)
290 })?,
291 );
292 }
293
294 Ok(headers)
295 }
296
297 pub(crate) fn extra_oauth_params(&self) -> HashMap<String, String> {
299 let mut params = HashMap::new();
300
301 if let Some(scope) = self.props.get("scope") {
302 params.insert("scope".to_string(), scope.to_string());
303 } else {
304 params.insert("scope".to_string(), "catalog".to_string());
305 }
306
307 let optional_params = ["audience", "resource"];
308 for param_name in optional_params {
309 if let Some(value) = self.props.get(param_name) {
310 params.insert(param_name.to_string(), value.to_string());
311 }
312 }
313
314 params
315 }
316
317 pub(crate) fn disable_header_redaction(&self) -> bool {
322 self.props
323 .get(REST_CATALOG_PROP_DISABLE_HEADER_REDACTION)
324 .map(|v| v.eq_ignore_ascii_case("true"))
325 .unwrap_or(false)
326 }
327
328 pub(crate) fn merge_with_config(mut self, mut config: CatalogConfig) -> Self {
330 if let Some(uri) = config.overrides.remove("uri") {
331 self.uri = uri;
332 }
333
334 let mut props = config.defaults;
335 props.extend(self.props);
336 props.extend(config.overrides);
337
338 self.props = props;
339 self
340 }
341}
342
/// State created on first use by `RestCatalog::context`.
#[derive(Debug)]
struct RestContext {
    /// HTTP client updated with the merged configuration.
    client: HttpClient,
    /// User configuration merged with the configuration returned by the server.
    config: RestCatalogConfig,
}
351
/// Catalog implementation that talks to a REST catalog server.
#[derive(Debug)]
pub struct RestCatalog {
    /// Configuration as supplied by the user, before merging with the server's configuration.
    user_config: RestCatalogConfig,
    /// Context (client plus merged configuration), initialized on first call to `context`.
    ctx: OnceCell<RestContext>,
    /// Storage factory used to build `FileIO`; `load_file_io` fails when it is `None`.
    storage_factory: Option<Arc<dyn StorageFactory>>,
    /// Runtime handed to every `Table` this catalog builds.
    runtime: Runtime,
}
364
365impl RestCatalog {
    /// Creates a catalog from the given parts with an empty, not yet
    /// initialized context cell.
    fn new(
        config: RestCatalogConfig,
        storage_factory: Option<Arc<dyn StorageFactory>>,
        runtime: Runtime,
    ) -> Self {
        Self {
            user_config: config,
            ctx: OnceCell::new(),
            storage_factory,
            runtime,
        }
    }
379
380 async fn delete_table(&self, table: &TableIdent, purge: bool) -> Result<()> {
382 let context = self.context().await?;
383
384 let mut request_builder = context
385 .client
386 .request(Method::DELETE, context.config.table_endpoint(table));
387
388 if purge {
389 request_builder = request_builder.query(&[("purgeRequested", "true")]);
390 }
391
392 let request = request_builder.build()?;
393 let http_response = context.client.query_catalog(request).await?;
394
395 match http_response.status() {
396 StatusCode::NO_CONTENT | StatusCode::OK => Ok(()),
397 StatusCode::NOT_FOUND => Err(Error::new(
398 ErrorKind::TableNotFound,
399 "Tried to drop a table that does not exist",
400 )),
401 _ => Err(deserialize_unexpected_catalog_error(
402 http_response,
403 context.client.disable_header_redaction(),
404 )
405 .await),
406 }
407 }
408
409 async fn context(&self) -> Result<&RestContext> {
411 self.ctx
412 .get_or_try_init(|| async {
413 let client = HttpClient::new(&self.user_config)?;
414 let catalog_config = RestCatalog::load_config(&client, &self.user_config).await?;
415 let config = self.user_config.clone().merge_with_config(catalog_config);
416 let client = client.update_with(&config)?;
417
418 Ok(RestContext { config, client })
419 })
420 .await
421 }
422
423 async fn load_config(
427 client: &HttpClient,
428 user_config: &RestCatalogConfig,
429 ) -> Result<CatalogConfig> {
430 let mut request_builder = client.request(Method::GET, user_config.config_endpoint());
431
432 if let Some(warehouse_location) = &user_config.warehouse {
433 request_builder = request_builder.query(&[("warehouse", warehouse_location)]);
434 }
435
436 let request = request_builder.build()?;
437
438 let http_response = client.query_catalog(request).await?;
439
440 match http_response.status() {
441 StatusCode::OK => deserialize_catalog_response(http_response).await,
442 _ => Err(deserialize_unexpected_catalog_error(
443 http_response,
444 client.disable_header_redaction(),
445 )
446 .await),
447 }
448 }
449
450 async fn load_file_io(
451 &self,
452 metadata_location: Option<&str>,
453 extra_config: Option<HashMap<String, String>>,
454 ) -> Result<FileIO> {
455 let mut props = self.context().await?.config.props.clone();
456 if let Some(config) = extra_config {
457 props.extend(config);
458 }
459
460 let warehouse_path = match self.context().await?.config.warehouse.as_deref() {
463 Some(url) if Url::parse(url).is_ok() => Some(url),
464 Some(_) => None,
465 None => None,
466 };
467
468 if metadata_location.or(warehouse_path).is_none() {
469 return Err(Error::new(
470 ErrorKind::Unexpected,
471 "Unable to load file io, neither warehouse nor metadata location is set!",
472 ));
473 }
474
475 let factory = self
477 .storage_factory
478 .clone()
479 .ok_or_else(|| {
480 Error::new(
481 ErrorKind::Unexpected,
482 "StorageFactory must be provided for RestCatalog. Use `with_storage_factory` to configure it.",
483 )
484 })?;
485
486 let file_io = FileIOBuilder::new(factory).with_props(props).build();
487
488 Ok(file_io)
489 }
490
491 pub async fn invalidate_token(&self) -> Result<()> {
494 self.context().await?.client.invalidate_token().await
495 }
496
497 pub async fn regenerate_token(&self) -> Result<()> {
504 self.context().await?.client.regenerate_token().await
505 }
506}
507
508#[async_trait]
511impl Catalog for RestCatalog {
512 async fn list_namespaces(
513 &self,
514 parent: Option<&NamespaceIdent>,
515 ) -> Result<Vec<NamespaceIdent>> {
516 let context = self.context().await?;
517 let endpoint = context.config.namespaces_endpoint();
518 let mut namespaces = Vec::new();
519 let mut next_token = None;
520
521 loop {
522 let mut request = context.client.request(Method::GET, endpoint.clone());
523
524 if let Some(ns) = parent {
526 request = request.query(&[("parent", ns.to_url_string())]);
527 }
528
529 if let Some(token) = next_token {
530 request = request.query(&[("pageToken", token)]);
531 }
532
533 let http_response = context.client.query_catalog(request.build()?).await?;
534
535 match http_response.status() {
536 StatusCode::OK => {
537 let response =
538 deserialize_catalog_response::<ListNamespaceResponse>(http_response)
539 .await?;
540
541 namespaces.extend(response.namespaces);
542
543 match response.next_page_token {
544 Some(token) => next_token = Some(token),
545 None => break,
546 }
547 }
548 StatusCode::NOT_FOUND => {
549 return Err(Error::new(
550 ErrorKind::NamespaceNotFound,
551 "The parent parameter of the namespace provided does not exist",
552 ));
553 }
554 _ => {
555 return Err(deserialize_unexpected_catalog_error(
556 http_response,
557 context.client.disable_header_redaction(),
558 )
559 .await);
560 }
561 }
562 }
563
564 Ok(namespaces)
565 }
566
567 async fn create_namespace(
568 &self,
569 namespace: &NamespaceIdent,
570 properties: HashMap<String, String>,
571 ) -> Result<Namespace> {
572 let context = self.context().await?;
573
574 let request = context
575 .client
576 .request(Method::POST, context.config.namespaces_endpoint())
577 .json(&CreateNamespaceRequest {
578 namespace: namespace.clone(),
579 properties,
580 })
581 .build()?;
582
583 let http_response = context.client.query_catalog(request).await?;
584
585 match http_response.status() {
586 StatusCode::OK => {
587 let response =
588 deserialize_catalog_response::<NamespaceResponse>(http_response).await?;
589 Ok(Namespace::from(response))
590 }
591 StatusCode::CONFLICT => Err(Error::new(
592 ErrorKind::NamespaceAlreadyExists,
593 "Tried to create a namespace that already exists",
594 )),
595 _ => Err(deserialize_unexpected_catalog_error(
596 http_response,
597 context.client.disable_header_redaction(),
598 )
599 .await),
600 }
601 }
602
603 async fn get_namespace(&self, namespace: &NamespaceIdent) -> Result<Namespace> {
604 let context = self.context().await?;
605
606 let request = context
607 .client
608 .request(Method::GET, context.config.namespace_endpoint(namespace))
609 .build()?;
610
611 let http_response = context.client.query_catalog(request).await?;
612
613 match http_response.status() {
614 StatusCode::OK => {
615 let response =
616 deserialize_catalog_response::<NamespaceResponse>(http_response).await?;
617 Ok(Namespace::from(response))
618 }
619 StatusCode::NOT_FOUND => Err(Error::new(
620 ErrorKind::NamespaceNotFound,
621 "Tried to get a namespace that does not exist",
622 )),
623 _ => Err(deserialize_unexpected_catalog_error(
624 http_response,
625 context.client.disable_header_redaction(),
626 )
627 .await),
628 }
629 }
630
631 async fn namespace_exists(&self, ns: &NamespaceIdent) -> Result<bool> {
632 let context = self.context().await?;
633
634 let request = context
635 .client
636 .request(Method::HEAD, context.config.namespace_endpoint(ns))
637 .build()?;
638
639 let http_response = context.client.query_catalog(request).await?;
640
641 match http_response.status() {
642 StatusCode::NO_CONTENT | StatusCode::OK => Ok(true),
643 StatusCode::NOT_FOUND => Ok(false),
644 _ => Err(deserialize_unexpected_catalog_error(
645 http_response,
646 context.client.disable_header_redaction(),
647 )
648 .await),
649 }
650 }
651
    /// Not implemented: always returns `ErrorKind::FeatureUnsupported`
    /// without contacting the server.
    async fn update_namespace(
        &self,
        _namespace: &NamespaceIdent,
        _properties: HashMap<String, String>,
    ) -> Result<()> {
        Err(Error::new(
            ErrorKind::FeatureUnsupported,
            "Updating namespace not supported yet!",
        ))
    }
662
663 async fn drop_namespace(&self, namespace: &NamespaceIdent) -> Result<()> {
664 let context = self.context().await?;
665
666 let request = context
667 .client
668 .request(Method::DELETE, context.config.namespace_endpoint(namespace))
669 .build()?;
670
671 let http_response = context.client.query_catalog(request).await?;
672
673 match http_response.status() {
674 StatusCode::NO_CONTENT | StatusCode::OK => Ok(()),
675 StatusCode::NOT_FOUND => Err(Error::new(
676 ErrorKind::NamespaceNotFound,
677 "Tried to drop a namespace that does not exist",
678 )),
679 _ => Err(deserialize_unexpected_catalog_error(
680 http_response,
681 context.client.disable_header_redaction(),
682 )
683 .await),
684 }
685 }
686
687 async fn list_tables(&self, namespace: &NamespaceIdent) -> Result<Vec<TableIdent>> {
688 let context = self.context().await?;
689 let endpoint = context.config.tables_endpoint(namespace);
690 let mut identifiers = Vec::new();
691 let mut next_token = None;
692
693 loop {
694 let mut request = context.client.request(Method::GET, endpoint.clone());
695
696 if let Some(token) = next_token {
697 request = request.query(&[("pageToken", token)]);
698 }
699
700 let http_response = context.client.query_catalog(request.build()?).await?;
701
702 match http_response.status() {
703 StatusCode::OK => {
704 let response =
705 deserialize_catalog_response::<ListTablesResponse>(http_response).await?;
706
707 identifiers.extend(response.identifiers);
708
709 match response.next_page_token {
710 Some(token) => next_token = Some(token),
711 None => break,
712 }
713 }
714 StatusCode::NOT_FOUND => {
715 return Err(Error::new(
716 ErrorKind::NamespaceNotFound,
717 "Tried to list tables of a namespace that does not exist",
718 ));
719 }
720 _ => {
721 return Err(deserialize_unexpected_catalog_error(
722 http_response,
723 context.client.disable_header_redaction(),
724 )
725 .await);
726 }
727 }
728 }
729
730 Ok(identifiers)
731 }
732
733 async fn create_table(
740 &self,
741 namespace: &NamespaceIdent,
742 creation: TableCreation,
743 ) -> Result<Table> {
744 let context = self.context().await?;
745
746 let table_ident = TableIdent::new(namespace.clone(), creation.name.clone());
747
748 let request = context
749 .client
750 .request(Method::POST, context.config.tables_endpoint(namespace))
751 .json(&CreateTableRequest {
752 name: creation.name,
753 location: creation.location,
754 schema: creation.schema,
755 partition_spec: creation.partition_spec,
756 write_order: creation.sort_order,
757 stage_create: Some(false),
758 properties: creation.properties,
759 })
760 .build()?;
761
762 let http_response = context.client.query_catalog(request).await?;
763
764 let response = match http_response.status() {
765 StatusCode::OK => {
766 deserialize_catalog_response::<LoadTableResult>(http_response).await?
767 }
768 StatusCode::NOT_FOUND => {
769 return Err(Error::new(
770 ErrorKind::NamespaceNotFound,
771 "Tried to create a table under a namespace that does not exist",
772 ));
773 }
774 StatusCode::CONFLICT => {
775 return Err(Error::new(
776 ErrorKind::TableAlreadyExists,
777 "The table already exists",
778 ));
779 }
780 _ => {
781 return Err(deserialize_unexpected_catalog_error(
782 http_response,
783 context.client.disable_header_redaction(),
784 )
785 .await);
786 }
787 };
788
789 let metadata_location = response.metadata_location.as_ref().ok_or(Error::new(
790 ErrorKind::DataInvalid,
791 "Metadata location missing in `create_table` response!",
792 ))?;
793
794 let config = response
795 .config
796 .into_iter()
797 .chain(self.user_config.props.clone())
798 .collect();
799
800 let file_io = self
801 .load_file_io(Some(metadata_location), Some(config))
802 .await?;
803
804 let table_builder = Table::builder()
805 .identifier(table_ident.clone())
806 .file_io(file_io)
807 .metadata(response.metadata)
808 .runtime(self.runtime.clone());
809
810 if let Some(metadata_location) = response.metadata_location {
811 table_builder.metadata_location(metadata_location).build()
812 } else {
813 table_builder.build()
814 }
815 }
816
817 async fn load_table(&self, table_ident: &TableIdent) -> Result<Table> {
823 let context = self.context().await?;
824
825 let request = context
826 .client
827 .request(Method::GET, context.config.table_endpoint(table_ident))
828 .build()?;
829
830 let http_response = context.client.query_catalog(request).await?;
831
832 let response = match http_response.status() {
833 StatusCode::OK | StatusCode::NOT_MODIFIED => {
834 deserialize_catalog_response::<LoadTableResult>(http_response).await?
835 }
836 StatusCode::NOT_FOUND => {
837 return Err(Error::new(
838 ErrorKind::TableNotFound,
839 "Tried to load a table that does not exist",
840 ));
841 }
842 _ => {
843 return Err(deserialize_unexpected_catalog_error(
844 http_response,
845 context.client.disable_header_redaction(),
846 )
847 .await);
848 }
849 };
850
851 let config = response
852 .config
853 .into_iter()
854 .chain(self.user_config.props.clone())
855 .collect();
856
857 let file_io = self
858 .load_file_io(response.metadata_location.as_deref(), Some(config))
859 .await?;
860
861 let table_builder = Table::builder()
862 .identifier(table_ident.clone())
863 .file_io(file_io)
864 .metadata(response.metadata)
865 .runtime(self.runtime.clone());
866
867 if let Some(metadata_location) = response.metadata_location {
868 table_builder.metadata_location(metadata_location).build()
869 } else {
870 table_builder.build()
871 }
872 }
873
    /// Drops `table` without requesting a purge (delegates to `delete_table` with `purge = false`).
    async fn drop_table(&self, table: &TableIdent) -> Result<()> {
        self.delete_table(table, false).await
    }
878
    /// Drops `table` and requests a purge (delegates to `delete_table` with `purge = true`).
    async fn purge_table(&self, table: &TableIdent) -> Result<()> {
        self.delete_table(table, true).await
    }
884
885 async fn table_exists(&self, table: &TableIdent) -> Result<bool> {
887 let context = self.context().await?;
888
889 let request = context
890 .client
891 .request(Method::HEAD, context.config.table_endpoint(table))
892 .build()?;
893
894 let http_response = context.client.query_catalog(request).await?;
895
896 match http_response.status() {
897 StatusCode::NO_CONTENT | StatusCode::OK => Ok(true),
898 StatusCode::NOT_FOUND => Ok(false),
899 _ => Err(deserialize_unexpected_catalog_error(
900 http_response,
901 context.client.disable_header_redaction(),
902 )
903 .await),
904 }
905 }
906
907 async fn rename_table(&self, src: &TableIdent, dest: &TableIdent) -> Result<()> {
909 let context = self.context().await?;
910
911 let request = context
912 .client
913 .request(Method::POST, context.config.rename_table_endpoint())
914 .json(&RenameTableRequest {
915 source: src.clone(),
916 destination: dest.clone(),
917 })
918 .build()?;
919
920 let http_response = context.client.query_catalog(request).await?;
921
922 match http_response.status() {
923 StatusCode::NO_CONTENT | StatusCode::OK => Ok(()),
924 StatusCode::NOT_FOUND => Err(Error::new(
925 ErrorKind::TableNotFound,
926 "Tried to rename a table that does not exist (is the namespace correct?)",
927 )),
928 StatusCode::CONFLICT => Err(Error::new(
929 ErrorKind::TableAlreadyExists,
930 "Tried to rename a table to a name that already exists",
931 )),
932 _ => Err(deserialize_unexpected_catalog_error(
933 http_response,
934 context.client.disable_header_redaction(),
935 )
936 .await),
937 }
938 }
939
940 async fn register_table(
941 &self,
942 table_ident: &TableIdent,
943 metadata_location: String,
944 ) -> Result<Table> {
945 let context = self.context().await?;
946
947 let request = context
948 .client
949 .request(
950 Method::POST,
951 context
952 .config
953 .register_table_endpoint(table_ident.namespace()),
954 )
955 .json(&RegisterTableRequest {
956 name: table_ident.name.clone(),
957 metadata_location: metadata_location.clone(),
958 overwrite: Some(false),
959 })
960 .build()?;
961
962 let http_response = context.client.query_catalog(request).await?;
963
964 let response: LoadTableResult = match http_response.status() {
965 StatusCode::OK => {
966 deserialize_catalog_response::<LoadTableResult>(http_response).await?
967 }
968 StatusCode::NOT_FOUND => {
969 return Err(Error::new(
970 ErrorKind::NamespaceNotFound,
971 "The namespace specified does not exist.",
972 ));
973 }
974 StatusCode::CONFLICT => {
975 return Err(Error::new(
976 ErrorKind::TableAlreadyExists,
977 "The given table already exists.",
978 ));
979 }
980 _ => {
981 return Err(deserialize_unexpected_catalog_error(
982 http_response,
983 context.client.disable_header_redaction(),
984 )
985 .await);
986 }
987 };
988
989 let metadata_location = response.metadata_location.as_ref().ok_or(Error::new(
990 ErrorKind::DataInvalid,
991 "Metadata location missing in `register_table` response!",
992 ))?;
993
994 let file_io = self.load_file_io(Some(metadata_location), None).await?;
995
996 Table::builder()
997 .identifier(table_ident.clone())
998 .file_io(file_io)
999 .metadata(response.metadata)
1000 .metadata_location(metadata_location.clone())
1001 .runtime(self.runtime.clone())
1002 .build()
1003 }
1004
1005 async fn update_table(&self, mut commit: TableCommit) -> Result<Table> {
1006 let context = self.context().await?;
1007
1008 let request = context
1009 .client
1010 .request(
1011 Method::POST,
1012 context.config.table_endpoint(commit.identifier()),
1013 )
1014 .json(&CommitTableRequest {
1015 identifier: Some(commit.identifier().clone()),
1016 requirements: commit.take_requirements(),
1017 updates: commit.take_updates(),
1018 })
1019 .build()?;
1020
1021 let http_response = context.client.query_catalog(request).await?;
1022
1023 let response: CommitTableResponse = match http_response.status() {
1024 StatusCode::OK => deserialize_catalog_response(http_response).await?,
1025 StatusCode::NOT_FOUND => {
1026 return Err(Error::new(
1027 ErrorKind::TableNotFound,
1028 "Tried to update a table that does not exist",
1029 ));
1030 }
1031 StatusCode::CONFLICT => {
1032 return Err(Error::new(
1033 ErrorKind::CatalogCommitConflicts,
1034 "CatalogCommitConflicts, one or more requirements failed. The client may retry.",
1035 )
1036 .with_retryable(true));
1037 }
1038 StatusCode::INTERNAL_SERVER_ERROR => {
1039 return Err(Error::new(
1040 ErrorKind::Unexpected,
1041 "An unknown server-side problem occurred; the commit state is unknown.",
1042 ));
1043 }
1044 StatusCode::BAD_GATEWAY => {
1045 return Err(Error::new(
1046 ErrorKind::Unexpected,
1047 "A gateway or proxy received an invalid response from the upstream server; the commit state is unknown.",
1048 ));
1049 }
1050 StatusCode::GATEWAY_TIMEOUT => {
1051 return Err(Error::new(
1052 ErrorKind::Unexpected,
1053 "A server-side gateway timeout occurred; the commit state is unknown.",
1054 ));
1055 }
1056 _ => {
1057 return Err(deserialize_unexpected_catalog_error(
1058 http_response,
1059 context.client.disable_header_redaction(),
1060 )
1061 .await);
1062 }
1063 };
1064
1065 let file_io = self
1066 .load_file_io(Some(&response.metadata_location), None)
1067 .await?;
1068
1069 Table::builder()
1070 .identifier(commit.identifier().clone())
1071 .file_io(file_io)
1072 .metadata(response.metadata)
1073 .metadata_location(response.metadata_location)
1074 .runtime(self.runtime.clone())
1075 .build()
1076 }
1077}
1078
1079#[cfg(test)]
1080mod tests {
1081 use std::fs::File;
1082 use std::io::BufReader;
1083 use std::sync::Arc;
1084
1085 use chrono::{TimeZone, Utc};
1086 use iceberg::io::LocalFsStorageFactory;
1087 use iceberg::spec::{
1088 FormatVersion, NestedField, NullOrder, Operation, PrimitiveType, Schema, Snapshot,
1089 SnapshotLog, SortDirection, SortField, SortOrder, Summary, Transform, Type,
1090 UnboundPartitionField, UnboundPartitionSpec,
1091 };
1092 use iceberg::test_utils::test_runtime;
1093 use iceberg::transaction::{ApplyTransactionAction, Transaction};
1094 use mockito::{Mock, Server, ServerGuard};
1095 use serde_json::json;
1096 use uuid::uuid;
1097
1098 use super::*;
1099
1100 #[tokio::test]
1101 async fn test_update_config() {
1102 let mut server = Server::new_async().await;
1103
1104 let config_mock = server
1105 .mock("GET", "/v1/config")
1106 .with_status(200)
1107 .with_body(
1108 r#"{
1109 "overrides": {
1110 "warehouse": "s3://iceberg-catalog"
1111 },
1112 "defaults": {}
1113 }"#,
1114 )
1115 .create_async()
1116 .await;
1117
1118 let catalog = RestCatalog::new(
1119 RestCatalogConfig::builder().uri(server.url()).build(),
1120 Some(Arc::new(LocalFsStorageFactory)),
1121 Runtime::current(),
1122 );
1123
1124 assert_eq!(
1125 catalog
1126 .context()
1127 .await
1128 .unwrap()
1129 .config
1130 .props
1131 .get("warehouse"),
1132 Some(&"s3://iceberg-catalog".to_string())
1133 );
1134
1135 config_mock.assert_async().await;
1136 }
1137
1138 async fn create_config_mock(server: &mut ServerGuard) -> Mock {
1139 server
1140 .mock("GET", "/v1/config")
1141 .with_status(200)
1142 .with_body(
1143 r#"{
1144 "overrides": {
1145 "warehouse": "s3://iceberg-catalog"
1146 },
1147 "defaults": {}
1148 }"#,
1149 )
1150 .create_async()
1151 .await
1152 }
1153
1154 async fn create_oauth_mock(server: &mut ServerGuard) -> Mock {
1155 create_oauth_mock_with_path(server, "/v1/oauth/tokens", "ey000000000000", 200).await
1156 }
1157
1158 async fn create_oauth_mock_with_path(
1159 server: &mut ServerGuard,
1160 path: &str,
1161 token: &str,
1162 status: usize,
1163 ) -> Mock {
1164 let body = format!(
1165 r#"{{
1166 "access_token": "{token}",
1167 "token_type": "Bearer",
1168 "issued_token_type": "urn:ietf:params:oauth:token-type:access_token",
1169 "expires_in": 86400
1170 }}"#
1171 );
1172 server
1173 .mock("POST", path)
1174 .with_status(status)
1175 .with_body(body)
1176 .expect(1)
1177 .create_async()
1178 .await
1179 }
1180
1181 #[tokio::test]
1182 async fn test_oauth() {
1183 let mut server = Server::new_async().await;
1184 let oauth_mock = create_oauth_mock(&mut server).await;
1185 let config_mock = create_config_mock(&mut server).await;
1186
1187 let mut props = HashMap::new();
1188 props.insert("credential".to_string(), "client1:secret1".to_string());
1189
1190 let catalog = RestCatalog::new(
1191 RestCatalogConfig::builder()
1192 .uri(server.url())
1193 .props(props)
1194 .build(),
1195 Some(Arc::new(LocalFsStorageFactory)),
1196 Runtime::current(),
1197 );
1198
1199 let token = catalog.context().await.unwrap().client.token().await;
1200 oauth_mock.assert_async().await;
1201 config_mock.assert_async().await;
1202 assert_eq!(token, Some("ey000000000000".to_string()));
1203 }
1204
1205 #[tokio::test]
1206 async fn test_oauth_with_optional_param() {
1207 let mut props = HashMap::new();
1208 props.insert("credential".to_string(), "client1:secret1".to_string());
1209 props.insert("scope".to_string(), "custom_scope".to_string());
1210 props.insert("audience".to_string(), "custom_audience".to_string());
1211 props.insert("resource".to_string(), "custom_resource".to_string());
1212
1213 let mut server = Server::new_async().await;
1214 let oauth_mock = server
1215 .mock("POST", "/v1/oauth/tokens")
1216 .match_body(mockito::Matcher::Regex("scope=custom_scope".to_string()))
1217 .match_body(mockito::Matcher::Regex(
1218 "audience=custom_audience".to_string(),
1219 ))
1220 .match_body(mockito::Matcher::Regex(
1221 "resource=custom_resource".to_string(),
1222 ))
1223 .with_status(200)
1224 .with_body(
1225 r#"{
1226 "access_token": "ey000000000000",
1227 "token_type": "Bearer",
1228 "issued_token_type": "urn:ietf:params:oauth:token-type:access_token",
1229 "expires_in": 86400
1230 }"#,
1231 )
1232 .expect(1)
1233 .create_async()
1234 .await;
1235
1236 let config_mock = create_config_mock(&mut server).await;
1237
1238 let catalog = RestCatalog::new(
1239 RestCatalogConfig::builder()
1240 .uri(server.url())
1241 .props(props)
1242 .build(),
1243 Some(Arc::new(LocalFsStorageFactory)),
1244 Runtime::current(),
1245 );
1246
1247 let token = catalog.context().await.unwrap().client.token().await;
1248
1249 oauth_mock.assert_async().await;
1250 config_mock.assert_async().await;
1251 assert_eq!(token, Some("ey000000000000".to_string()));
1252 }
1253
1254 #[tokio::test]
1255 async fn test_invalidate_token() {
1256 let mut server = Server::new_async().await;
1257 let oauth_mock = create_oauth_mock(&mut server).await;
1258 let config_mock = create_config_mock(&mut server).await;
1259
1260 let mut props = HashMap::new();
1261 props.insert("credential".to_string(), "client1:secret1".to_string());
1262
1263 let catalog = RestCatalog::new(
1264 RestCatalogConfig::builder()
1265 .uri(server.url())
1266 .props(props)
1267 .build(),
1268 Some(Arc::new(LocalFsStorageFactory)),
1269 Runtime::current(),
1270 );
1271
1272 let token = catalog.context().await.unwrap().client.token().await;
1273 oauth_mock.assert_async().await;
1274 config_mock.assert_async().await;
1275 assert_eq!(token, Some("ey000000000000".to_string()));
1276
1277 let oauth_mock =
1278 create_oauth_mock_with_path(&mut server, "/v1/oauth/tokens", "ey000000000001", 200)
1279 .await;
1280 catalog.invalidate_token().await.unwrap();
1281 let token = catalog.context().await.unwrap().client.token().await;
1282 oauth_mock.assert_async().await;
1283 assert_eq!(token, Some("ey000000000001".to_string()));
1284 }
1285
1286 #[tokio::test]
1287 async fn test_invalidate_token_failing_request() {
1288 let mut server = Server::new_async().await;
1289 let oauth_mock = create_oauth_mock(&mut server).await;
1290 let config_mock = create_config_mock(&mut server).await;
1291
1292 let mut props = HashMap::new();
1293 props.insert("credential".to_string(), "client1:secret1".to_string());
1294
1295 let catalog = RestCatalog::new(
1296 RestCatalogConfig::builder()
1297 .uri(server.url())
1298 .props(props)
1299 .build(),
1300 Some(Arc::new(LocalFsStorageFactory)),
1301 Runtime::current(),
1302 );
1303
1304 let token = catalog.context().await.unwrap().client.token().await;
1305 oauth_mock.assert_async().await;
1306 config_mock.assert_async().await;
1307 assert_eq!(token, Some("ey000000000000".to_string()));
1308
1309 let oauth_mock =
1310 create_oauth_mock_with_path(&mut server, "/v1/oauth/tokens", "ey000000000001", 500)
1311 .await;
1312 catalog.invalidate_token().await.unwrap();
1313 let token = catalog.context().await.unwrap().client.token().await;
1314 oauth_mock.assert_async().await;
1315 assert_eq!(token, None);
1316 }
1317
1318 #[tokio::test]
1319 async fn test_regenerate_token() {
1320 let mut server = Server::new_async().await;
1321 let oauth_mock = create_oauth_mock(&mut server).await;
1322 let config_mock = create_config_mock(&mut server).await;
1323
1324 let mut props = HashMap::new();
1325 props.insert("credential".to_string(), "client1:secret1".to_string());
1326
1327 let catalog = RestCatalog::new(
1328 RestCatalogConfig::builder()
1329 .uri(server.url())
1330 .props(props)
1331 .build(),
1332 Some(Arc::new(LocalFsStorageFactory)),
1333 Runtime::current(),
1334 );
1335
1336 let token = catalog.context().await.unwrap().client.token().await;
1337 oauth_mock.assert_async().await;
1338 config_mock.assert_async().await;
1339 assert_eq!(token, Some("ey000000000000".to_string()));
1340
1341 let oauth_mock =
1342 create_oauth_mock_with_path(&mut server, "/v1/oauth/tokens", "ey000000000001", 200)
1343 .await;
1344 catalog.regenerate_token().await.unwrap();
1345 oauth_mock.assert_async().await;
1346 let token = catalog.context().await.unwrap().client.token().await;
1347 assert_eq!(token, Some("ey000000000001".to_string()));
1348 }
1349
1350 #[tokio::test]
1351 async fn test_regenerate_token_failing_request() {
1352 let mut server = Server::new_async().await;
1353 let oauth_mock = create_oauth_mock(&mut server).await;
1354 let config_mock = create_config_mock(&mut server).await;
1355
1356 let mut props = HashMap::new();
1357 props.insert("credential".to_string(), "client1:secret1".to_string());
1358
1359 let catalog = RestCatalog::new(
1360 RestCatalogConfig::builder()
1361 .uri(server.url())
1362 .props(props)
1363 .build(),
1364 Some(Arc::new(LocalFsStorageFactory)),
1365 Runtime::current(),
1366 );
1367
1368 let token = catalog.context().await.unwrap().client.token().await;
1369 oauth_mock.assert_async().await;
1370 config_mock.assert_async().await;
1371 assert_eq!(token, Some("ey000000000000".to_string()));
1372
1373 let oauth_mock =
1374 create_oauth_mock_with_path(&mut server, "/v1/oauth/tokens", "ey000000000001", 500)
1375 .await;
1376 let invalidate_result = catalog.regenerate_token().await;
1377 assert!(invalidate_result.is_err());
1378 oauth_mock.assert_async().await;
1379 let token = catalog.context().await.unwrap().client.token().await;
1380
1381 assert_eq!(token, Some("ey000000000000".to_string()));
1383 }
1384
1385 #[tokio::test]
1386 async fn test_http_headers() {
1387 let server = Server::new_async().await;
1388 let mut props = HashMap::new();
1389 props.insert("credential".to_string(), "client1:secret1".to_string());
1390
1391 let config = RestCatalogConfig::builder()
1392 .uri(server.url())
1393 .props(props)
1394 .build();
1395 let headers: HeaderMap = config.extra_headers().unwrap();
1396
1397 let expected_headers = HeaderMap::from_iter([
1398 (
1399 header::CONTENT_TYPE,
1400 HeaderValue::from_static("application/json"),
1401 ),
1402 (
1403 HeaderName::from_static("x-client-version"),
1404 HeaderValue::from_static(ICEBERG_REST_SPEC_VERSION),
1405 ),
1406 (
1407 header::USER_AGENT,
1408 HeaderValue::from_str(&format!("iceberg-rs/{CARGO_PKG_VERSION}")).unwrap(),
1409 ),
1410 ]);
1411 assert_eq!(headers, expected_headers);
1412 }
1413
1414 #[tokio::test]
1415 async fn test_http_headers_with_custom_headers() {
1416 let server = Server::new_async().await;
1417 let mut props = HashMap::new();
1418 props.insert("credential".to_string(), "client1:secret1".to_string());
1419 props.insert(
1420 "header.content-type".to_string(),
1421 "application/yaml".to_string(),
1422 );
1423 props.insert(
1424 "header.customized-header".to_string(),
1425 "some/value".to_string(),
1426 );
1427
1428 let config = RestCatalogConfig::builder()
1429 .uri(server.url())
1430 .props(props)
1431 .build();
1432 let headers: HeaderMap = config.extra_headers().unwrap();
1433
1434 let expected_headers = HeaderMap::from_iter([
1435 (
1436 header::CONTENT_TYPE,
1437 HeaderValue::from_static("application/yaml"),
1438 ),
1439 (
1440 HeaderName::from_static("x-client-version"),
1441 HeaderValue::from_static(ICEBERG_REST_SPEC_VERSION),
1442 ),
1443 (
1444 header::USER_AGENT,
1445 HeaderValue::from_str(&format!("iceberg-rs/{CARGO_PKG_VERSION}")).unwrap(),
1446 ),
1447 (
1448 HeaderName::from_static("customized-header"),
1449 HeaderValue::from_static("some/value"),
1450 ),
1451 ]);
1452 assert_eq!(headers, expected_headers);
1453 }
1454
1455 #[tokio::test]
1456 async fn test_oauth_with_oauth2_server_uri() {
1457 let mut server = Server::new_async().await;
1458 let config_mock = create_config_mock(&mut server).await;
1459
1460 let mut auth_server = Server::new_async().await;
1461 let auth_server_path = "/some/path";
1462 let oauth_mock =
1463 create_oauth_mock_with_path(&mut auth_server, auth_server_path, "ey000000000000", 200)
1464 .await;
1465
1466 let mut props = HashMap::new();
1467 props.insert("credential".to_string(), "client1:secret1".to_string());
1468 props.insert(
1469 "oauth2-server-uri".to_string(),
1470 format!("{}{}", auth_server.url(), auth_server_path).to_string(),
1471 );
1472
1473 let catalog = RestCatalog::new(
1474 RestCatalogConfig::builder()
1475 .uri(server.url())
1476 .props(props)
1477 .build(),
1478 Some(Arc::new(LocalFsStorageFactory)),
1479 Runtime::current(),
1480 );
1481
1482 let token = catalog.context().await.unwrap().client.token().await;
1483
1484 oauth_mock.assert_async().await;
1485 config_mock.assert_async().await;
1486 assert_eq!(token, Some("ey000000000000".to_string()));
1487 }
1488
1489 #[tokio::test]
1490 async fn test_config_override() {
1491 let mut server = Server::new_async().await;
1492 let mut redirect_server = Server::new_async().await;
1493 let new_uri = redirect_server.url();
1494
1495 let config_mock = server
1496 .mock("GET", "/v1/config")
1497 .with_status(200)
1498 .with_body(
1499 json!(
1500 {
1501 "overrides": {
1502 "uri": new_uri,
1503 "warehouse": "s3://iceberg-catalog",
1504 "prefix": "ice/warehouses/my"
1505 },
1506 "defaults": {},
1507 }
1508 )
1509 .to_string(),
1510 )
1511 .create_async()
1512 .await;
1513
1514 let list_ns_mock = redirect_server
1515 .mock("GET", "/v1/ice/warehouses/my/namespaces")
1516 .with_body(
1517 r#"{
1518 "namespaces": []
1519 }"#,
1520 )
1521 .create_async()
1522 .await;
1523
1524 let catalog = RestCatalog::new(
1525 RestCatalogConfig::builder().uri(server.url()).build(),
1526 Some(Arc::new(LocalFsStorageFactory)),
1527 Runtime::current(),
1528 );
1529
1530 let _namespaces = catalog.list_namespaces(None).await.unwrap();
1531
1532 config_mock.assert_async().await;
1533 list_ns_mock.assert_async().await;
1534 }
1535
1536 #[tokio::test]
1537 async fn test_list_namespace() {
1538 let mut server = Server::new_async().await;
1539
1540 let config_mock = create_config_mock(&mut server).await;
1541
1542 let list_ns_mock = server
1543 .mock("GET", "/v1/namespaces")
1544 .with_body(
1545 r#"{
1546 "namespaces": [
1547 ["ns1", "ns11"],
1548 ["ns2"]
1549 ]
1550 }"#,
1551 )
1552 .create_async()
1553 .await;
1554
1555 let catalog = RestCatalog::new(
1556 RestCatalogConfig::builder().uri(server.url()).build(),
1557 Some(Arc::new(LocalFsStorageFactory)),
1558 Runtime::current(),
1559 );
1560
1561 let namespaces = catalog.list_namespaces(None).await.unwrap();
1562
1563 let expected_ns = vec![
1564 NamespaceIdent::from_vec(vec!["ns1".to_string(), "ns11".to_string()]).unwrap(),
1565 NamespaceIdent::from_vec(vec!["ns2".to_string()]).unwrap(),
1566 ];
1567
1568 assert_eq!(expected_ns, namespaces);
1569
1570 config_mock.assert_async().await;
1571 list_ns_mock.assert_async().await;
1572 }
1573
1574 #[tokio::test]
1575 async fn test_list_namespace_with_pagination() {
1576 let mut server = Server::new_async().await;
1577
1578 let config_mock = create_config_mock(&mut server).await;
1579
1580 let list_ns_mock_page1 = server
1581 .mock("GET", "/v1/namespaces")
1582 .with_body(
1583 r#"{
1584 "namespaces": [
1585 ["ns1", "ns11"],
1586 ["ns2"]
1587 ],
1588 "next-page-token": "token123"
1589 }"#,
1590 )
1591 .create_async()
1592 .await;
1593
1594 let list_ns_mock_page2 = server
1595 .mock("GET", "/v1/namespaces?pageToken=token123")
1596 .with_body(
1597 r#"{
1598 "namespaces": [
1599 ["ns3"],
1600 ["ns4", "ns41"]
1601 ]
1602 }"#,
1603 )
1604 .create_async()
1605 .await;
1606
1607 let catalog = RestCatalog::new(
1608 RestCatalogConfig::builder().uri(server.url()).build(),
1609 Some(Arc::new(LocalFsStorageFactory)),
1610 Runtime::current(),
1611 );
1612
1613 let namespaces = catalog.list_namespaces(None).await.unwrap();
1614
1615 let expected_ns = vec![
1616 NamespaceIdent::from_vec(vec!["ns1".to_string(), "ns11".to_string()]).unwrap(),
1617 NamespaceIdent::from_vec(vec!["ns2".to_string()]).unwrap(),
1618 NamespaceIdent::from_vec(vec!["ns3".to_string()]).unwrap(),
1619 NamespaceIdent::from_vec(vec!["ns4".to_string(), "ns41".to_string()]).unwrap(),
1620 ];
1621
1622 assert_eq!(expected_ns, namespaces);
1623
1624 config_mock.assert_async().await;
1625 list_ns_mock_page1.assert_async().await;
1626 list_ns_mock_page2.assert_async().await;
1627 }
1628
1629 #[tokio::test]
1630 async fn test_list_namespace_with_multiple_pages() {
1631 let mut server = Server::new_async().await;
1632
1633 let config_mock = create_config_mock(&mut server).await;
1634
1635 let list_ns_mock_page1 = server
1637 .mock("GET", "/v1/namespaces")
1638 .with_body(
1639 r#"{
1640 "namespaces": [
1641 ["ns1", "ns11"],
1642 ["ns2"]
1643 ],
1644 "next-page-token": "page2"
1645 }"#,
1646 )
1647 .create_async()
1648 .await;
1649
1650 let list_ns_mock_page2 = server
1652 .mock("GET", "/v1/namespaces?pageToken=page2")
1653 .with_body(
1654 r#"{
1655 "namespaces": [
1656 ["ns3"],
1657 ["ns4", "ns41"]
1658 ],
1659 "next-page-token": "page3"
1660 }"#,
1661 )
1662 .create_async()
1663 .await;
1664
1665 let list_ns_mock_page3 = server
1667 .mock("GET", "/v1/namespaces?pageToken=page3")
1668 .with_body(
1669 r#"{
1670 "namespaces": [
1671 ["ns5", "ns51", "ns511"]
1672 ],
1673 "next-page-token": "page4"
1674 }"#,
1675 )
1676 .create_async()
1677 .await;
1678
1679 let list_ns_mock_page4 = server
1681 .mock("GET", "/v1/namespaces?pageToken=page4")
1682 .with_body(
1683 r#"{
1684 "namespaces": [
1685 ["ns6"],
1686 ["ns7"]
1687 ],
1688 "next-page-token": "page5"
1689 }"#,
1690 )
1691 .create_async()
1692 .await;
1693
1694 let list_ns_mock_page5 = server
1696 .mock("GET", "/v1/namespaces?pageToken=page5")
1697 .with_body(
1698 r#"{
1699 "namespaces": [
1700 ["ns8", "ns81"]
1701 ]
1702 }"#,
1703 )
1704 .create_async()
1705 .await;
1706
1707 let catalog = RestCatalog::new(
1708 RestCatalogConfig::builder().uri(server.url()).build(),
1709 Some(Arc::new(LocalFsStorageFactory)),
1710 Runtime::current(),
1711 );
1712
1713 let namespaces = catalog.list_namespaces(None).await.unwrap();
1714
1715 let expected_ns = vec![
1716 NamespaceIdent::from_vec(vec!["ns1".to_string(), "ns11".to_string()]).unwrap(),
1717 NamespaceIdent::from_vec(vec!["ns2".to_string()]).unwrap(),
1718 NamespaceIdent::from_vec(vec!["ns3".to_string()]).unwrap(),
1719 NamespaceIdent::from_vec(vec!["ns4".to_string(), "ns41".to_string()]).unwrap(),
1720 NamespaceIdent::from_vec(vec![
1721 "ns5".to_string(),
1722 "ns51".to_string(),
1723 "ns511".to_string(),
1724 ])
1725 .unwrap(),
1726 NamespaceIdent::from_vec(vec!["ns6".to_string()]).unwrap(),
1727 NamespaceIdent::from_vec(vec!["ns7".to_string()]).unwrap(),
1728 NamespaceIdent::from_vec(vec!["ns8".to_string(), "ns81".to_string()]).unwrap(),
1729 ];
1730
1731 assert_eq!(expected_ns, namespaces);
1732
1733 config_mock.assert_async().await;
1735 list_ns_mock_page1.assert_async().await;
1736 list_ns_mock_page2.assert_async().await;
1737 list_ns_mock_page3.assert_async().await;
1738 list_ns_mock_page4.assert_async().await;
1739 list_ns_mock_page5.assert_async().await;
1740 }
1741
1742 #[tokio::test]
1743 async fn test_create_namespace() {
1744 let mut server = Server::new_async().await;
1745
1746 let config_mock = create_config_mock(&mut server).await;
1747
1748 let create_ns_mock = server
1749 .mock("POST", "/v1/namespaces")
1750 .with_body(
1751 r#"{
1752 "namespace": [ "ns1", "ns11"],
1753 "properties" : {
1754 "key1": "value1"
1755 }
1756 }"#,
1757 )
1758 .create_async()
1759 .await;
1760
1761 let catalog = RestCatalog::new(
1762 RestCatalogConfig::builder().uri(server.url()).build(),
1763 Some(Arc::new(LocalFsStorageFactory)),
1764 Runtime::current(),
1765 );
1766
1767 let namespaces = catalog
1768 .create_namespace(
1769 &NamespaceIdent::from_vec(vec!["ns1".to_string(), "ns11".to_string()]).unwrap(),
1770 HashMap::from([("key1".to_string(), "value1".to_string())]),
1771 )
1772 .await
1773 .unwrap();
1774
1775 let expected_ns = Namespace::with_properties(
1776 NamespaceIdent::from_vec(vec!["ns1".to_string(), "ns11".to_string()]).unwrap(),
1777 HashMap::from([("key1".to_string(), "value1".to_string())]),
1778 );
1779
1780 assert_eq!(expected_ns, namespaces);
1781
1782 config_mock.assert_async().await;
1783 create_ns_mock.assert_async().await;
1784 }
1785
1786 #[tokio::test]
1787 async fn test_get_namespace() {
1788 let mut server = Server::new_async().await;
1789
1790 let config_mock = create_config_mock(&mut server).await;
1791
1792 let get_ns_mock = server
1793 .mock("GET", "/v1/namespaces/ns1")
1794 .with_body(
1795 r#"{
1796 "namespace": [ "ns1"],
1797 "properties" : {
1798 "key1": "value1"
1799 }
1800 }"#,
1801 )
1802 .create_async()
1803 .await;
1804
1805 let catalog = RestCatalog::new(
1806 RestCatalogConfig::builder().uri(server.url()).build(),
1807 Some(Arc::new(LocalFsStorageFactory)),
1808 Runtime::current(),
1809 );
1810
1811 let namespaces = catalog
1812 .get_namespace(&NamespaceIdent::new("ns1".to_string()))
1813 .await
1814 .unwrap();
1815
1816 let expected_ns = Namespace::with_properties(
1817 NamespaceIdent::new("ns1".to_string()),
1818 HashMap::from([("key1".to_string(), "value1".to_string())]),
1819 );
1820
1821 assert_eq!(expected_ns, namespaces);
1822
1823 config_mock.assert_async().await;
1824 get_ns_mock.assert_async().await;
1825 }
1826
1827 #[tokio::test]
1828 async fn check_namespace_exists() {
1829 let mut server = Server::new_async().await;
1830
1831 let config_mock = create_config_mock(&mut server).await;
1832
1833 let get_ns_mock = server
1834 .mock("HEAD", "/v1/namespaces/ns1")
1835 .with_status(204)
1836 .create_async()
1837 .await;
1838
1839 let catalog = RestCatalog::new(
1840 RestCatalogConfig::builder().uri(server.url()).build(),
1841 Some(Arc::new(LocalFsStorageFactory)),
1842 Runtime::current(),
1843 );
1844
1845 assert!(
1846 catalog
1847 .namespace_exists(&NamespaceIdent::new("ns1".to_string()))
1848 .await
1849 .unwrap()
1850 );
1851
1852 config_mock.assert_async().await;
1853 get_ns_mock.assert_async().await;
1854 }
1855
1856 #[tokio::test]
1857 async fn test_drop_namespace() {
1858 let mut server = Server::new_async().await;
1859
1860 let config_mock = create_config_mock(&mut server).await;
1861
1862 let drop_ns_mock = server
1863 .mock("DELETE", "/v1/namespaces/ns1")
1864 .with_status(204)
1865 .create_async()
1866 .await;
1867
1868 let catalog = RestCatalog::new(
1869 RestCatalogConfig::builder().uri(server.url()).build(),
1870 Some(Arc::new(LocalFsStorageFactory)),
1871 Runtime::current(),
1872 );
1873
1874 catalog
1875 .drop_namespace(&NamespaceIdent::new("ns1".to_string()))
1876 .await
1877 .unwrap();
1878
1879 config_mock.assert_async().await;
1880 drop_ns_mock.assert_async().await;
1881 }
1882
1883 #[tokio::test]
1884 async fn test_list_tables() {
1885 let mut server = Server::new_async().await;
1886
1887 let config_mock = create_config_mock(&mut server).await;
1888
1889 let list_tables_mock = server
1890 .mock("GET", "/v1/namespaces/ns1/tables")
1891 .with_status(200)
1892 .with_body(
1893 r#"{
1894 "identifiers": [
1895 {
1896 "namespace": ["ns1"],
1897 "name": "table1"
1898 },
1899 {
1900 "namespace": ["ns1"],
1901 "name": "table2"
1902 }
1903 ]
1904 }"#,
1905 )
1906 .create_async()
1907 .await;
1908
1909 let catalog = RestCatalog::new(
1910 RestCatalogConfig::builder().uri(server.url()).build(),
1911 Some(Arc::new(LocalFsStorageFactory)),
1912 Runtime::current(),
1913 );
1914
1915 let tables = catalog
1916 .list_tables(&NamespaceIdent::new("ns1".to_string()))
1917 .await
1918 .unwrap();
1919
1920 let expected_tables = vec![
1921 TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table1".to_string()),
1922 TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table2".to_string()),
1923 ];
1924
1925 assert_eq!(tables, expected_tables);
1926
1927 config_mock.assert_async().await;
1928 list_tables_mock.assert_async().await;
1929 }
1930
1931 #[tokio::test]
1932 async fn test_list_tables_with_pagination() {
1933 let mut server = Server::new_async().await;
1934
1935 let config_mock = create_config_mock(&mut server).await;
1936
1937 let list_tables_mock_page1 = server
1938 .mock("GET", "/v1/namespaces/ns1/tables")
1939 .with_status(200)
1940 .with_body(
1941 r#"{
1942 "identifiers": [
1943 {
1944 "namespace": ["ns1"],
1945 "name": "table1"
1946 },
1947 {
1948 "namespace": ["ns1"],
1949 "name": "table2"
1950 }
1951 ],
1952 "next-page-token": "token456"
1953 }"#,
1954 )
1955 .create_async()
1956 .await;
1957
1958 let list_tables_mock_page2 = server
1959 .mock("GET", "/v1/namespaces/ns1/tables?pageToken=token456")
1960 .with_status(200)
1961 .with_body(
1962 r#"{
1963 "identifiers": [
1964 {
1965 "namespace": ["ns1"],
1966 "name": "table3"
1967 },
1968 {
1969 "namespace": ["ns1"],
1970 "name": "table4"
1971 }
1972 ]
1973 }"#,
1974 )
1975 .create_async()
1976 .await;
1977
1978 let catalog = RestCatalog::new(
1979 RestCatalogConfig::builder().uri(server.url()).build(),
1980 Some(Arc::new(LocalFsStorageFactory)),
1981 Runtime::current(),
1982 );
1983
1984 let tables = catalog
1985 .list_tables(&NamespaceIdent::new("ns1".to_string()))
1986 .await
1987 .unwrap();
1988
1989 let expected_tables = vec![
1990 TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table1".to_string()),
1991 TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table2".to_string()),
1992 TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table3".to_string()),
1993 TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table4".to_string()),
1994 ];
1995
1996 assert_eq!(tables, expected_tables);
1997
1998 config_mock.assert_async().await;
1999 list_tables_mock_page1.assert_async().await;
2000 list_tables_mock_page2.assert_async().await;
2001 }
2002
2003 #[tokio::test]
2004 async fn test_list_tables_with_multiple_pages() {
2005 let mut server = Server::new_async().await;
2006
2007 let config_mock = create_config_mock(&mut server).await;
2008
2009 let list_tables_mock_page1 = server
2011 .mock("GET", "/v1/namespaces/ns1/tables")
2012 .with_status(200)
2013 .with_body(
2014 r#"{
2015 "identifiers": [
2016 {
2017 "namespace": ["ns1"],
2018 "name": "table1"
2019 },
2020 {
2021 "namespace": ["ns1"],
2022 "name": "table2"
2023 }
2024 ],
2025 "next-page-token": "page2"
2026 }"#,
2027 )
2028 .create_async()
2029 .await;
2030
2031 let list_tables_mock_page2 = server
2033 .mock("GET", "/v1/namespaces/ns1/tables?pageToken=page2")
2034 .with_status(200)
2035 .with_body(
2036 r#"{
2037 "identifiers": [
2038 {
2039 "namespace": ["ns1"],
2040 "name": "table3"
2041 },
2042 {
2043 "namespace": ["ns1"],
2044 "name": "table4"
2045 }
2046 ],
2047 "next-page-token": "page3"
2048 }"#,
2049 )
2050 .create_async()
2051 .await;
2052
2053 let list_tables_mock_page3 = server
2055 .mock("GET", "/v1/namespaces/ns1/tables?pageToken=page3")
2056 .with_status(200)
2057 .with_body(
2058 r#"{
2059 "identifiers": [
2060 {
2061 "namespace": ["ns1"],
2062 "name": "table5"
2063 }
2064 ],
2065 "next-page-token": "page4"
2066 }"#,
2067 )
2068 .create_async()
2069 .await;
2070
2071 let list_tables_mock_page4 = server
2073 .mock("GET", "/v1/namespaces/ns1/tables?pageToken=page4")
2074 .with_status(200)
2075 .with_body(
2076 r#"{
2077 "identifiers": [
2078 {
2079 "namespace": ["ns1"],
2080 "name": "table6"
2081 },
2082 {
2083 "namespace": ["ns1"],
2084 "name": "table7"
2085 }
2086 ],
2087 "next-page-token": "page5"
2088 }"#,
2089 )
2090 .create_async()
2091 .await;
2092
2093 let list_tables_mock_page5 = server
2095 .mock("GET", "/v1/namespaces/ns1/tables?pageToken=page5")
2096 .with_status(200)
2097 .with_body(
2098 r#"{
2099 "identifiers": [
2100 {
2101 "namespace": ["ns1"],
2102 "name": "table8"
2103 }
2104 ]
2105 }"#,
2106 )
2107 .create_async()
2108 .await;
2109
2110 let catalog = RestCatalog::new(
2111 RestCatalogConfig::builder().uri(server.url()).build(),
2112 Some(Arc::new(LocalFsStorageFactory)),
2113 Runtime::current(),
2114 );
2115
2116 let tables = catalog
2117 .list_tables(&NamespaceIdent::new("ns1".to_string()))
2118 .await
2119 .unwrap();
2120
2121 let expected_tables = vec![
2122 TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table1".to_string()),
2123 TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table2".to_string()),
2124 TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table3".to_string()),
2125 TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table4".to_string()),
2126 TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table5".to_string()),
2127 TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table6".to_string()),
2128 TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table7".to_string()),
2129 TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table8".to_string()),
2130 ];
2131
2132 assert_eq!(tables, expected_tables);
2133
2134 config_mock.assert_async().await;
2136 list_tables_mock_page1.assert_async().await;
2137 list_tables_mock_page2.assert_async().await;
2138 list_tables_mock_page3.assert_async().await;
2139 list_tables_mock_page4.assert_async().await;
2140 list_tables_mock_page5.assert_async().await;
2141 }
2142
2143 #[tokio::test]
2144 async fn test_drop_tables() {
2145 let mut server = Server::new_async().await;
2146
2147 let config_mock = create_config_mock(&mut server).await;
2148
2149 let delete_table_mock = server
2150 .mock("DELETE", "/v1/namespaces/ns1/tables/table1")
2151 .with_status(204)
2152 .create_async()
2153 .await;
2154
2155 let catalog = RestCatalog::new(
2156 RestCatalogConfig::builder().uri(server.url()).build(),
2157 Some(Arc::new(LocalFsStorageFactory)),
2158 Runtime::current(),
2159 );
2160
2161 catalog
2162 .drop_table(&TableIdent::new(
2163 NamespaceIdent::new("ns1".to_string()),
2164 "table1".to_string(),
2165 ))
2166 .await
2167 .unwrap();
2168
2169 config_mock.assert_async().await;
2170 delete_table_mock.assert_async().await;
2171 }
2172
2173 #[tokio::test]
2174 async fn test_check_table_exists() {
2175 let mut server = Server::new_async().await;
2176
2177 let config_mock = create_config_mock(&mut server).await;
2178
2179 let check_table_exists_mock = server
2180 .mock("HEAD", "/v1/namespaces/ns1/tables/table1")
2181 .with_status(204)
2182 .create_async()
2183 .await;
2184
2185 let catalog = RestCatalog::new(
2186 RestCatalogConfig::builder().uri(server.url()).build(),
2187 Some(Arc::new(LocalFsStorageFactory)),
2188 Runtime::current(),
2189 );
2190
2191 assert!(
2192 catalog
2193 .table_exists(&TableIdent::new(
2194 NamespaceIdent::new("ns1".to_string()),
2195 "table1".to_string(),
2196 ))
2197 .await
2198 .unwrap()
2199 );
2200
2201 config_mock.assert_async().await;
2202 check_table_exists_mock.assert_async().await;
2203 }
2204
2205 #[tokio::test]
2206 async fn test_rename_table() {
2207 let mut server = Server::new_async().await;
2208
2209 let config_mock = create_config_mock(&mut server).await;
2210
2211 let rename_table_mock = server
2212 .mock("POST", "/v1/tables/rename")
2213 .with_status(204)
2214 .create_async()
2215 .await;
2216
2217 let catalog = RestCatalog::new(
2218 RestCatalogConfig::builder().uri(server.url()).build(),
2219 Some(Arc::new(LocalFsStorageFactory)),
2220 Runtime::current(),
2221 );
2222
2223 catalog
2224 .rename_table(
2225 &TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table1".to_string()),
2226 &TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table2".to_string()),
2227 )
2228 .await
2229 .unwrap();
2230
2231 config_mock.assert_async().await;
2232 rename_table_mock.assert_async().await;
2233 }
2234
2235 #[tokio::test]
2236 async fn test_load_table() {
2237 let mut server = Server::new_async().await;
2238
2239 let config_mock = create_config_mock(&mut server).await;
2240
2241 let rename_table_mock = server
2242 .mock("GET", "/v1/namespaces/ns1/tables/test1")
2243 .with_status(200)
2244 .with_body_from_file(format!(
2245 "{}/testdata/{}",
2246 env!("CARGO_MANIFEST_DIR"),
2247 "load_table_response.json"
2248 ))
2249 .create_async()
2250 .await;
2251
2252 let catalog = RestCatalog::new(
2253 RestCatalogConfig::builder().uri(server.url()).build(),
2254 Some(Arc::new(LocalFsStorageFactory)),
2255 Runtime::current(),
2256 );
2257
2258 let table = catalog
2259 .load_table(&TableIdent::new(
2260 NamespaceIdent::new("ns1".to_string()),
2261 "test1".to_string(),
2262 ))
2263 .await
2264 .unwrap();
2265
2266 assert_eq!(
2267 &TableIdent::from_strs(vec!["ns1", "test1"]).unwrap(),
2268 table.identifier()
2269 );
2270 assert_eq!(
2271 "s3://warehouse/database/table/metadata/00001-5f2f8166-244c-4eae-ac36-384ecdec81fc.gz.metadata.json",
2272 table.metadata_location().unwrap()
2273 );
2274 assert_eq!(FormatVersion::V1, table.metadata().format_version());
2275 assert_eq!("s3://warehouse/database/table", table.metadata().location());
2276 assert_eq!(
2277 uuid!("b55d9dda-6561-423a-8bfc-787980ce421f"),
2278 table.metadata().uuid()
2279 );
2280 assert_eq!(
2281 Utc.timestamp_millis_opt(1646787054459).unwrap(),
2282 table.metadata().last_updated_timestamp().unwrap()
2283 );
2284 assert_eq!(
2285 vec![&Arc::new(
2286 Schema::builder()
2287 .with_fields(vec![
2288 NestedField::optional(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
2289 NestedField::optional(2, "data", Type::Primitive(PrimitiveType::String))
2290 .into(),
2291 ])
2292 .build()
2293 .unwrap()
2294 )],
2295 table.metadata().schemas_iter().collect::<Vec<_>>()
2296 );
2297 assert_eq!(
2298 &HashMap::from([
2299 ("owner".to_string(), "bryan".to_string()),
2300 (
2301 "write.metadata.compression-codec".to_string(),
2302 "gzip".to_string()
2303 )
2304 ]),
2305 table.metadata().properties()
2306 );
2307 assert_eq!(vec![&Arc::new(Snapshot::builder()
2308 .with_snapshot_id(3497810964824022504)
2309 .with_timestamp_ms(1646787054459)
2310 .with_manifest_list("s3://warehouse/database/table/metadata/snap-3497810964824022504-1-c4f68204-666b-4e50-a9df-b10c34bf6b82.avro")
2311 .with_sequence_number(0)
2312 .with_schema_id(0)
2313 .with_summary(Summary {
2314 operation: Operation::Append,
2315 additional_properties: HashMap::from_iter([
2316 ("spark.app.id", "local-1646787004168"),
2317 ("added-data-files", "1"),
2318 ("added-records", "1"),
2319 ("added-files-size", "697"),
2320 ("changed-partition-count", "1"),
2321 ("total-records", "1"),
2322 ("total-files-size", "697"),
2323 ("total-data-files", "1"),
2324 ("total-delete-files", "0"),
2325 ("total-position-deletes", "0"),
2326 ("total-equality-deletes", "0")
2327 ].iter().map(|p| (p.0.to_string(), p.1.to_string()))),
2328 }).build()
2329 )], table.metadata().snapshots().collect::<Vec<_>>());
2330 assert_eq!(
2331 &[SnapshotLog {
2332 timestamp_ms: 1646787054459,
2333 snapshot_id: 3497810964824022504,
2334 }],
2335 table.metadata().history()
2336 );
2337 assert_eq!(
2338 vec![&Arc::new(SortOrder {
2339 order_id: 0,
2340 fields: vec![],
2341 })],
2342 table.metadata().sort_orders_iter().collect::<Vec<_>>()
2343 );
2344
2345 config_mock.assert_async().await;
2346 rename_table_mock.assert_async().await;
2347 }
2348
2349 #[tokio::test]
2350 async fn test_load_table_404() {
2351 let mut server = Server::new_async().await;
2352
2353 let config_mock = create_config_mock(&mut server).await;
2354
2355 let rename_table_mock = server
2356 .mock("GET", "/v1/namespaces/ns1/tables/test1")
2357 .with_status(404)
2358 .with_body(r#"
2359{
2360 "error": {
2361 "message": "Table does not exist: ns1.test1 in warehouse 8bcb0838-50fc-472d-9ddb-8feb89ef5f1e",
2362 "type": "NoSuchNamespaceErrorException",
2363 "code": 404
2364 }
2365}
2366 "#)
2367 .create_async()
2368 .await;
2369
2370 let catalog = RestCatalog::new(
2371 RestCatalogConfig::builder().uri(server.url()).build(),
2372 Some(Arc::new(LocalFsStorageFactory)),
2373 Runtime::current(),
2374 );
2375
2376 let table = catalog
2377 .load_table(&TableIdent::new(
2378 NamespaceIdent::new("ns1".to_string()),
2379 "test1".to_string(),
2380 ))
2381 .await;
2382
2383 assert!(table.is_err());
2384 assert!(table.err().unwrap().message().contains("does not exist"));
2385
2386 config_mock.assert_async().await;
2387 rename_table_mock.assert_async().await;
2388 }
2389
2390 #[tokio::test]
2391 async fn test_create_table() {
2392 let mut server = Server::new_async().await;
2393
2394 let config_mock = create_config_mock(&mut server).await;
2395
2396 let create_table_mock = server
2397 .mock("POST", "/v1/namespaces/ns1/tables")
2398 .with_status(200)
2399 .with_body_from_file(format!(
2400 "{}/testdata/{}",
2401 env!("CARGO_MANIFEST_DIR"),
2402 "create_table_response.json"
2403 ))
2404 .create_async()
2405 .await;
2406
2407 let catalog = RestCatalog::new(
2408 RestCatalogConfig::builder().uri(server.url()).build(),
2409 Some(Arc::new(LocalFsStorageFactory)),
2410 Runtime::current(),
2411 );
2412
2413 let table_creation = TableCreation::builder()
2414 .name("test1".to_string())
2415 .schema(
2416 Schema::builder()
2417 .with_fields(vec![
2418 NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String))
2419 .into(),
2420 NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(),
2421 NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean))
2422 .into(),
2423 ])
2424 .with_schema_id(1)
2425 .with_identifier_field_ids(vec![2])
2426 .build()
2427 .unwrap(),
2428 )
2429 .properties(HashMap::from([("owner".to_string(), "testx".to_string())]))
2430 .partition_spec(
2431 UnboundPartitionSpec::builder()
2432 .add_partition_fields(vec![
2433 UnboundPartitionField::builder()
2434 .source_id(1)
2435 .transform(Transform::Truncate(3))
2436 .name("id".to_string())
2437 .build(),
2438 ])
2439 .unwrap()
2440 .build(),
2441 )
2442 .sort_order(
2443 SortOrder::builder()
2444 .with_sort_field(
2445 SortField::builder()
2446 .source_id(2)
2447 .transform(Transform::Identity)
2448 .direction(SortDirection::Ascending)
2449 .null_order(NullOrder::First)
2450 .build(),
2451 )
2452 .build_unbound()
2453 .unwrap(),
2454 )
2455 .build();
2456
2457 let table = catalog
2458 .create_table(&NamespaceIdent::from_strs(["ns1"]).unwrap(), table_creation)
2459 .await
2460 .unwrap();
2461
2462 assert_eq!(
2463 &TableIdent::from_strs(vec!["ns1", "test1"]).unwrap(),
2464 table.identifier()
2465 );
2466 assert_eq!(
2467 "s3://warehouse/database/table/metadata.json",
2468 table.metadata_location().unwrap()
2469 );
2470 assert_eq!(FormatVersion::V1, table.metadata().format_version());
2471 assert_eq!("s3://warehouse/database/table", table.metadata().location());
2472 assert_eq!(
2473 uuid!("bf289591-dcc0-4234-ad4f-5c3eed811a29"),
2474 table.metadata().uuid()
2475 );
2476 assert_eq!(
2477 1657810967051,
2478 table
2479 .metadata()
2480 .last_updated_timestamp()
2481 .unwrap()
2482 .timestamp_millis()
2483 );
2484 assert_eq!(
2485 vec![&Arc::new(
2486 Schema::builder()
2487 .with_fields(vec![
2488 NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String))
2489 .into(),
2490 NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(),
2491 NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean))
2492 .into(),
2493 ])
2494 .with_schema_id(0)
2495 .with_identifier_field_ids(vec![2])
2496 .build()
2497 .unwrap()
2498 )],
2499 table.metadata().schemas_iter().collect::<Vec<_>>()
2500 );
2501 assert_eq!(
2502 &HashMap::from([
2503 (
2504 "write.delete.parquet.compression-codec".to_string(),
2505 "zstd".to_string()
2506 ),
2507 (
2508 "write.metadata.compression-codec".to_string(),
2509 "gzip".to_string()
2510 ),
2511 (
2512 "write.summary.partition-limit".to_string(),
2513 "100".to_string()
2514 ),
2515 (
2516 "write.parquet.compression-codec".to_string(),
2517 "zstd".to_string()
2518 ),
2519 ]),
2520 table.metadata().properties()
2521 );
2522 assert!(table.metadata().current_snapshot().is_none());
2523 assert!(table.metadata().history().is_empty());
2524 assert_eq!(
2525 vec![&Arc::new(SortOrder {
2526 order_id: 0,
2527 fields: vec![],
2528 })],
2529 table.metadata().sort_orders_iter().collect::<Vec<_>>()
2530 );
2531
2532 config_mock.assert_async().await;
2533 create_table_mock.assert_async().await;
2534 }
2535
2536 #[tokio::test]
2537 async fn test_create_table_409() {
2538 let mut server = Server::new_async().await;
2539
2540 let config_mock = create_config_mock(&mut server).await;
2541
2542 let create_table_mock = server
2543 .mock("POST", "/v1/namespaces/ns1/tables")
2544 .with_status(409)
2545 .with_body(r#"
2546{
2547 "error": {
2548 "message": "Table already exists: ns1.test1 in warehouse 8bcb0838-50fc-472d-9ddb-8feb89ef5f1e",
2549 "type": "AlreadyExistsException",
2550 "code": 409
2551 }
2552}
2553 "#)
2554 .create_async()
2555 .await;
2556
2557 let catalog = RestCatalog::new(
2558 RestCatalogConfig::builder().uri(server.url()).build(),
2559 Some(Arc::new(LocalFsStorageFactory)),
2560 Runtime::current(),
2561 );
2562
2563 let table_creation = TableCreation::builder()
2564 .name("test1".to_string())
2565 .schema(
2566 Schema::builder()
2567 .with_fields(vec![
2568 NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String))
2569 .into(),
2570 NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(),
2571 NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean))
2572 .into(),
2573 ])
2574 .with_schema_id(1)
2575 .with_identifier_field_ids(vec![2])
2576 .build()
2577 .unwrap(),
2578 )
2579 .properties(HashMap::from([("owner".to_string(), "testx".to_string())]))
2580 .build();
2581
2582 let table_result = catalog
2583 .create_table(&NamespaceIdent::from_strs(["ns1"]).unwrap(), table_creation)
2584 .await;
2585
2586 assert!(table_result.is_err());
2587 assert!(
2588 table_result
2589 .err()
2590 .unwrap()
2591 .message()
2592 .contains("already exists")
2593 );
2594
2595 config_mock.assert_async().await;
2596 create_table_mock.assert_async().await;
2597 }
2598
2599 #[tokio::test]
2600 async fn test_update_table() {
2601 let mut server = Server::new_async().await;
2602
2603 let config_mock = create_config_mock(&mut server).await;
2604
2605 let load_table_mock = server
2606 .mock("GET", "/v1/namespaces/ns1/tables/test1")
2607 .with_status(200)
2608 .with_body_from_file(format!(
2609 "{}/testdata/{}",
2610 env!("CARGO_MANIFEST_DIR"),
2611 "load_table_response.json"
2612 ))
2613 .create_async()
2614 .await;
2615
2616 let update_table_mock = server
2617 .mock("POST", "/v1/namespaces/ns1/tables/test1")
2618 .with_status(200)
2619 .with_body_from_file(format!(
2620 "{}/testdata/{}",
2621 env!("CARGO_MANIFEST_DIR"),
2622 "update_table_response.json"
2623 ))
2624 .create_async()
2625 .await;
2626
2627 let catalog = RestCatalog::new(
2628 RestCatalogConfig::builder().uri(server.url()).build(),
2629 Some(Arc::new(LocalFsStorageFactory)),
2630 Runtime::current(),
2631 );
2632
2633 let table1 = {
2634 let file = File::open(format!(
2635 "{}/testdata/{}",
2636 env!("CARGO_MANIFEST_DIR"),
2637 "create_table_response.json"
2638 ))
2639 .unwrap();
2640 let reader = BufReader::new(file);
2641 let resp = serde_json::from_reader::<_, LoadTableResult>(reader).unwrap();
2642
2643 Table::builder()
2644 .metadata(resp.metadata)
2645 .metadata_location(resp.metadata_location.unwrap())
2646 .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap())
2647 .file_io(FileIO::new_with_fs())
2648 .runtime(test_runtime())
2649 .build()
2650 .unwrap()
2651 };
2652
2653 let tx = Transaction::new(&table1);
2654 let table = tx
2655 .upgrade_table_version()
2656 .set_format_version(FormatVersion::V2)
2657 .apply(tx)
2658 .unwrap()
2659 .commit(&catalog)
2660 .await
2661 .unwrap();
2662
2663 assert_eq!(
2664 &TableIdent::from_strs(vec!["ns1", "test1"]).unwrap(),
2665 table.identifier()
2666 );
2667 assert_eq!(
2668 "s3://warehouse/database/table/metadata.json",
2669 table.metadata_location().unwrap()
2670 );
2671 assert_eq!(FormatVersion::V2, table.metadata().format_version());
2672 assert_eq!("s3://warehouse/database/table", table.metadata().location());
2673 assert_eq!(
2674 uuid!("bf289591-dcc0-4234-ad4f-5c3eed811a29"),
2675 table.metadata().uuid()
2676 );
2677 assert_eq!(
2678 1657810967051,
2679 table
2680 .metadata()
2681 .last_updated_timestamp()
2682 .unwrap()
2683 .timestamp_millis()
2684 );
2685 assert_eq!(
2686 vec![&Arc::new(
2687 Schema::builder()
2688 .with_fields(vec![
2689 NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String))
2690 .into(),
2691 NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(),
2692 NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean))
2693 .into(),
2694 ])
2695 .with_schema_id(0)
2696 .with_identifier_field_ids(vec![2])
2697 .build()
2698 .unwrap()
2699 )],
2700 table.metadata().schemas_iter().collect::<Vec<_>>()
2701 );
2702 assert_eq!(
2703 &HashMap::from([
2704 (
2705 "write.delete.parquet.compression-codec".to_string(),
2706 "zstd".to_string()
2707 ),
2708 (
2709 "write.metadata.compression-codec".to_string(),
2710 "gzip".to_string()
2711 ),
2712 (
2713 "write.summary.partition-limit".to_string(),
2714 "100".to_string()
2715 ),
2716 (
2717 "write.parquet.compression-codec".to_string(),
2718 "zstd".to_string()
2719 ),
2720 ]),
2721 table.metadata().properties()
2722 );
2723 assert!(table.metadata().current_snapshot().is_none());
2724 assert!(table.metadata().history().is_empty());
2725 assert_eq!(
2726 vec![&Arc::new(SortOrder {
2727 order_id: 0,
2728 fields: vec![],
2729 })],
2730 table.metadata().sort_orders_iter().collect::<Vec<_>>()
2731 );
2732
2733 config_mock.assert_async().await;
2734 update_table_mock.assert_async().await;
2735 load_table_mock.assert_async().await
2736 }
2737
2738 #[tokio::test]
2739 async fn test_update_table_404() {
2740 let mut server = Server::new_async().await;
2741
2742 let config_mock = create_config_mock(&mut server).await;
2743
2744 let load_table_mock = server
2745 .mock("GET", "/v1/namespaces/ns1/tables/test1")
2746 .with_status(200)
2747 .with_body_from_file(format!(
2748 "{}/testdata/{}",
2749 env!("CARGO_MANIFEST_DIR"),
2750 "load_table_response.json"
2751 ))
2752 .create_async()
2753 .await;
2754
2755 let update_table_mock = server
2756 .mock("POST", "/v1/namespaces/ns1/tables/test1")
2757 .with_status(404)
2758 .with_body(
2759 r#"
2760{
2761 "error": {
2762 "message": "The given table does not exist",
2763 "type": "NoSuchTableException",
2764 "code": 404
2765 }
2766}
2767 "#,
2768 )
2769 .create_async()
2770 .await;
2771
2772 let catalog = RestCatalog::new(
2773 RestCatalogConfig::builder().uri(server.url()).build(),
2774 Some(Arc::new(LocalFsStorageFactory)),
2775 Runtime::current(),
2776 );
2777
2778 let table1 = {
2779 let file = File::open(format!(
2780 "{}/testdata/{}",
2781 env!("CARGO_MANIFEST_DIR"),
2782 "create_table_response.json"
2783 ))
2784 .unwrap();
2785 let reader = BufReader::new(file);
2786 let resp = serde_json::from_reader::<_, LoadTableResult>(reader).unwrap();
2787
2788 Table::builder()
2789 .metadata(resp.metadata)
2790 .metadata_location(resp.metadata_location.unwrap())
2791 .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap())
2792 .file_io(FileIO::new_with_fs())
2793 .runtime(test_runtime())
2794 .build()
2795 .unwrap()
2796 };
2797
2798 let tx = Transaction::new(&table1);
2799 let table_result = tx
2800 .upgrade_table_version()
2801 .set_format_version(FormatVersion::V2)
2802 .apply(tx)
2803 .unwrap()
2804 .commit(&catalog)
2805 .await;
2806
2807 assert!(table_result.is_err());
2808 assert!(
2809 table_result
2810 .err()
2811 .unwrap()
2812 .message()
2813 .contains("does not exist")
2814 );
2815
2816 config_mock.assert_async().await;
2817 update_table_mock.assert_async().await;
2818 load_table_mock.assert_async().await;
2819 }
2820
2821 #[tokio::test]
2822 async fn test_register_table() {
2823 let mut server = Server::new_async().await;
2824
2825 let config_mock = create_config_mock(&mut server).await;
2826
2827 let register_table_mock = server
2828 .mock("POST", "/v1/namespaces/ns1/register")
2829 .with_status(200)
2830 .with_body_from_file(format!(
2831 "{}/testdata/{}",
2832 env!("CARGO_MANIFEST_DIR"),
2833 "load_table_response.json"
2834 ))
2835 .create_async()
2836 .await;
2837
2838 let catalog = RestCatalog::new(
2839 RestCatalogConfig::builder().uri(server.url()).build(),
2840 Some(Arc::new(LocalFsStorageFactory)),
2841 Runtime::current(),
2842 );
2843 let table_ident =
2844 TableIdent::new(NamespaceIdent::new("ns1".to_string()), "test1".to_string());
2845 let metadata_location = String::from(
2846 "s3://warehouse/database/table/metadata/00001-5f2f8166-244c-4eae-ac36-384ecdec81fc.gz.metadata.json",
2847 );
2848
2849 let table = catalog
2850 .register_table(&table_ident, metadata_location)
2851 .await
2852 .unwrap();
2853
2854 assert_eq!(
2855 &TableIdent::from_strs(vec!["ns1", "test1"]).unwrap(),
2856 table.identifier()
2857 );
2858 assert_eq!(
2859 "s3://warehouse/database/table/metadata/00001-5f2f8166-244c-4eae-ac36-384ecdec81fc.gz.metadata.json",
2860 table.metadata_location().unwrap()
2861 );
2862
2863 config_mock.assert_async().await;
2864 register_table_mock.assert_async().await;
2865 }
2866
2867 #[tokio::test]
2868 async fn test_register_table_404() {
2869 let mut server = Server::new_async().await;
2870
2871 let config_mock = create_config_mock(&mut server).await;
2872
2873 let register_table_mock = server
2874 .mock("POST", "/v1/namespaces/ns1/register")
2875 .with_status(404)
2876 .with_body(
2877 r#"
2878{
2879 "error": {
2880 "message": "The namespace specified does not exist",
2881 "type": "NoSuchNamespaceErrorException",
2882 "code": 404
2883 }
2884}
2885 "#,
2886 )
2887 .create_async()
2888 .await;
2889
2890 let catalog = RestCatalog::new(
2891 RestCatalogConfig::builder().uri(server.url()).build(),
2892 Some(Arc::new(LocalFsStorageFactory)),
2893 Runtime::current(),
2894 );
2895
2896 let table_ident =
2897 TableIdent::new(NamespaceIdent::new("ns1".to_string()), "test1".to_string());
2898 let metadata_location = String::from(
2899 "s3://warehouse/database/table/metadata/00001-5f2f8166-244c-4eae-ac36-384ecdec81fc.gz.metadata.json",
2900 );
2901 let table = catalog
2902 .register_table(&table_ident, metadata_location)
2903 .await;
2904
2905 assert!(table.is_err());
2906 assert!(table.err().unwrap().message().contains("does not exist"));
2907
2908 config_mock.assert_async().await;
2909 register_table_mock.assert_async().await;
2910 }
2911
2912 #[tokio::test]
2913 async fn test_create_rest_catalog() {
2914 let builder = RestCatalogBuilder::default().with_client(Client::new());
2915
2916 let catalog = builder
2917 .load(
2918 "test",
2919 HashMap::from([
2920 (
2921 REST_CATALOG_PROP_URI.to_string(),
2922 "http://localhost:8080".to_string(),
2923 ),
2924 ("a".to_string(), "b".to_string()),
2925 ]),
2926 )
2927 .await;
2928
2929 assert!(catalog.is_ok());
2930
2931 let catalog_config = catalog.unwrap().user_config;
2932 assert_eq!(catalog_config.name.as_deref(), Some("test"));
2933 assert_eq!(catalog_config.uri, "http://localhost:8080");
2934 assert_eq!(catalog_config.warehouse, None);
2935 assert!(catalog_config.client.is_some());
2936
2937 assert_eq!(catalog_config.props.get("a"), Some(&"b".to_string()));
2938 assert!(!catalog_config.props.contains_key(REST_CATALOG_PROP_URI));
2939 }
2940
2941 #[tokio::test]
2942 async fn test_create_rest_catalog_no_uri() {
2943 let builder = RestCatalogBuilder::default();
2944
2945 let catalog = builder
2946 .load(
2947 "test",
2948 HashMap::from([(
2949 REST_CATALOG_PROP_WAREHOUSE.to_string(),
2950 "s3://warehouse".to_string(),
2951 )]),
2952 )
2953 .await;
2954
2955 assert!(catalog.is_err());
2956 if let Err(err) = catalog {
2957 assert_eq!(err.kind(), ErrorKind::DataInvalid);
2958 assert_eq!(err.message(), "Catalog uri is required");
2959 }
2960 }
2961}