iceberg/io/storage/opendal/
s3.rs1use std::collections::HashMap;
19use std::sync::Arc;
20
21use async_trait::async_trait;
22use opendal::services::S3Config;
23use opendal::{Configurator, Operator};
24pub use reqsign::{AwsCredential, AwsCredentialLoad};
25use reqwest::Client;
26use url::Url;
27
28use crate::io::{
29 CLIENT_REGION, S3_ACCESS_KEY_ID, S3_ALLOW_ANONYMOUS, S3_ASSUME_ROLE_ARN,
30 S3_ASSUME_ROLE_EXTERNAL_ID, S3_ASSUME_ROLE_SESSION_NAME, S3_DISABLE_CONFIG_LOAD,
31 S3_DISABLE_EC2_METADATA, S3_ENDPOINT, S3_PATH_STYLE_ACCESS, S3_REGION, S3_SECRET_ACCESS_KEY,
32 S3_SESSION_TOKEN, S3_SSE_KEY, S3_SSE_MD5, S3_SSE_TYPE, is_truthy,
33};
34use crate::{Error, ErrorKind, Result};
35
36pub(crate) fn s3_config_parse(mut m: HashMap<String, String>) -> Result<S3Config> {
38 let mut cfg = S3Config::default();
39 if let Some(endpoint) = m.remove(S3_ENDPOINT) {
40 cfg.endpoint = Some(endpoint);
41 };
42 if let Some(access_key_id) = m.remove(S3_ACCESS_KEY_ID) {
43 cfg.access_key_id = Some(access_key_id);
44 };
45 if let Some(secret_access_key) = m.remove(S3_SECRET_ACCESS_KEY) {
46 cfg.secret_access_key = Some(secret_access_key);
47 };
48 if let Some(session_token) = m.remove(S3_SESSION_TOKEN) {
49 cfg.session_token = Some(session_token);
50 };
51 if let Some(region) = m.remove(S3_REGION) {
52 cfg.region = Some(region);
53 };
54 if let Some(region) = m.remove(CLIENT_REGION) {
55 cfg.region = Some(region);
56 };
57 if let Some(path_style_access) = m.remove(S3_PATH_STYLE_ACCESS) {
58 cfg.enable_virtual_host_style = !is_truthy(path_style_access.to_lowercase().as_str());
59 };
60 if let Some(arn) = m.remove(S3_ASSUME_ROLE_ARN) {
61 cfg.role_arn = Some(arn);
62 }
63 if let Some(external_id) = m.remove(S3_ASSUME_ROLE_EXTERNAL_ID) {
64 cfg.external_id = Some(external_id);
65 };
66 if let Some(session_name) = m.remove(S3_ASSUME_ROLE_SESSION_NAME) {
67 cfg.role_session_name = Some(session_name);
68 };
69 let s3_sse_key = m.remove(S3_SSE_KEY);
70 if let Some(sse_type) = m.remove(S3_SSE_TYPE) {
71 match sse_type.to_lowercase().as_str() {
72 "none" => {}
74 "s3" => {
76 cfg.server_side_encryption = Some("AES256".to_string());
77 }
78 "kms" => {
80 cfg.server_side_encryption = Some("aws:kms".to_string());
81 cfg.server_side_encryption_aws_kms_key_id = s3_sse_key;
82 }
83 "custom" => {
85 cfg.server_side_encryption_customer_algorithm = Some("AES256".to_string());
86 cfg.server_side_encryption_customer_key = s3_sse_key;
87 cfg.server_side_encryption_customer_key_md5 = m.remove(S3_SSE_MD5);
88 }
89 _ => {
90 return Err(Error::new(
91 ErrorKind::DataInvalid,
92 format!(
93 "Invalid {S3_SSE_TYPE}: {sse_type}. Expected one of (custom, kms, s3, none)"
94 ),
95 ));
96 }
97 }
98 };
99
100 if let Some(allow_anonymous) = m.remove(S3_ALLOW_ANONYMOUS)
101 && is_truthy(allow_anonymous.to_lowercase().as_str())
102 {
103 cfg.allow_anonymous = true;
104 }
105 if let Some(disable_ec2_metadata) = m.remove(S3_DISABLE_EC2_METADATA)
106 && is_truthy(disable_ec2_metadata.to_lowercase().as_str())
107 {
108 cfg.disable_ec2_metadata = true;
109 };
110 if let Some(disable_config_load) = m.remove(S3_DISABLE_CONFIG_LOAD)
111 && is_truthy(disable_config_load.to_lowercase().as_str())
112 {
113 cfg.disable_config_load = true;
114 };
115
116 Ok(cfg)
117}
118
119pub(crate) fn s3_config_build(
121 cfg: &S3Config,
122 customized_credential_load: &Option<CustomAwsCredentialLoader>,
123 path: &str,
124) -> Result<Operator> {
125 let url = Url::parse(path)?;
126 let bucket = url.host_str().ok_or_else(|| {
127 Error::new(
128 ErrorKind::DataInvalid,
129 format!("Invalid s3 url: {path}, missing bucket"),
130 )
131 })?;
132
133 let mut builder = cfg
134 .clone()
135 .into_builder()
136 .bucket(bucket);
138
139 if let Some(customized_credential_load) = customized_credential_load {
140 builder = builder
141 .customized_credential_load(customized_credential_load.clone().into_opendal_loader());
142 }
143
144 Ok(Operator::new(builder)?.finish())
145}
146
147#[derive(Clone)]
152pub struct CustomAwsCredentialLoader(Arc<dyn AwsCredentialLoad>);
153
154impl std::fmt::Debug for CustomAwsCredentialLoader {
155 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
156 f.debug_struct("CustomAwsCredentialLoader")
157 .finish_non_exhaustive()
158 }
159}
160
161impl CustomAwsCredentialLoader {
162 pub fn new(loader: Arc<dyn AwsCredentialLoad>) -> Self {
164 Self(loader)
165 }
166
167 pub fn into_opendal_loader(self) -> Box<dyn AwsCredentialLoad> {
169 Box::new(self)
170 }
171}
172
173#[async_trait]
174impl AwsCredentialLoad for CustomAwsCredentialLoader {
175 async fn load_credential(&self, client: Client) -> anyhow::Result<Option<AwsCredential>> {
176 self.0.load_credential(client).await
177 }
178}