1use std::collections::HashMap;
19use std::sync::Arc;
20
21use async_trait::async_trait;
22use opendal::services::S3Config;
23use opendal::{Configurator, Operator};
24pub use reqsign::{AwsCredential, AwsCredentialLoad};
25use reqwest::Client;
26use url::Url;
27
28use crate::io::is_truthy;
29use crate::{Error, ErrorKind, Result};
30
/// Property key for the S3 endpoint; its value is copied into `S3Config::endpoint`.
pub const S3_ENDPOINT: &str = "s3.endpoint";
/// Property key for the access key id; its value is copied into `S3Config::access_key_id`.
pub const S3_ACCESS_KEY_ID: &str = "s3.access-key-id";
/// Property key for the secret access key; its value is copied into
/// `S3Config::secret_access_key`.
pub const S3_SECRET_ACCESS_KEY: &str = "s3.secret-access-key";
/// Property key for the session token; its value is copied into `S3Config::session_token`.
pub const S3_SESSION_TOKEN: &str = "s3.session-token";
/// Property key for the region; its value is copied into `S3Config::region`.
/// When [`CLIENT_REGION`] is also present, that value takes precedence.
pub const S3_REGION: &str = "s3.region";
/// Alternative property key for the region. It is applied after [`S3_REGION`],
/// so it overrides that key when both are set.
pub const CLIENT_REGION: &str = "client.region";
/// Property key toggling path-style access. `S3Config::enable_virtual_host_style`
/// is set to the negation of the (lowercased) value's truthiness as decided by `is_truthy`.
pub const S3_PATH_STYLE_ACCESS: &str = "s3.path-style-access";
/// Property key selecting the server-side encryption mode. Matched case-insensitively
/// against `none`, `s3`, `kms` and `custom`; any other value is rejected by
/// `s3_config_parse`.
pub const S3_SSE_TYPE: &str = "s3.sse.type";
/// Property key for the encryption key. Used as the KMS key id when the SSE type is
/// `kms`, and as the customer key when the SSE type is `custom`.
pub const S3_SSE_KEY: &str = "s3.sse.key";
/// Property key for the customer key MD5. Only read when the SSE type is `custom`.
pub const S3_SSE_MD5: &str = "s3.sse.md5";
/// Property key for the role ARN; its value is copied into `S3Config::role_arn`.
pub const S3_ASSUME_ROLE_ARN: &str = "client.assume-role.arn";
/// Property key for the assume-role external id; its value is copied into
/// `S3Config::external_id`.
pub const S3_ASSUME_ROLE_EXTERNAL_ID: &str = "client.assume-role.external-id";
/// Property key for the assume-role session name; its value is copied into
/// `S3Config::role_session_name`.
pub const S3_ASSUME_ROLE_SESSION_NAME: &str = "client.assume-role.session-name";
/// Property key that, when its (lowercased) value is truthy per `is_truthy`,
/// sets `S3Config::allow_anonymous` to `true`.
pub const S3_ALLOW_ANONYMOUS: &str = "s3.allow-anonymous";
/// Property key that, when its (lowercased) value is truthy per `is_truthy`,
/// sets `S3Config::disable_ec2_metadata` to `true`.
pub const S3_DISABLE_EC2_METADATA: &str = "s3.disable-ec2-metadata";
/// Property key that, when its (lowercased) value is truthy per `is_truthy`,
/// sets `S3Config::disable_config_load` to `true`.
pub const S3_DISABLE_CONFIG_LOAD: &str = "s3.disable-config-load";
72
73pub(crate) fn s3_config_parse(mut m: HashMap<String, String>) -> Result<S3Config> {
75 let mut cfg = S3Config::default();
76 if let Some(endpoint) = m.remove(S3_ENDPOINT) {
77 cfg.endpoint = Some(endpoint);
78 };
79 if let Some(access_key_id) = m.remove(S3_ACCESS_KEY_ID) {
80 cfg.access_key_id = Some(access_key_id);
81 };
82 if let Some(secret_access_key) = m.remove(S3_SECRET_ACCESS_KEY) {
83 cfg.secret_access_key = Some(secret_access_key);
84 };
85 if let Some(session_token) = m.remove(S3_SESSION_TOKEN) {
86 cfg.session_token = Some(session_token);
87 };
88 if let Some(region) = m.remove(S3_REGION) {
89 cfg.region = Some(region);
90 };
91 if let Some(region) = m.remove(CLIENT_REGION) {
92 cfg.region = Some(region);
93 };
94 if let Some(path_style_access) = m.remove(S3_PATH_STYLE_ACCESS) {
95 cfg.enable_virtual_host_style = !is_truthy(path_style_access.to_lowercase().as_str());
96 };
97 if let Some(arn) = m.remove(S3_ASSUME_ROLE_ARN) {
98 cfg.role_arn = Some(arn);
99 }
100 if let Some(external_id) = m.remove(S3_ASSUME_ROLE_EXTERNAL_ID) {
101 cfg.external_id = Some(external_id);
102 };
103 if let Some(session_name) = m.remove(S3_ASSUME_ROLE_SESSION_NAME) {
104 cfg.role_session_name = Some(session_name);
105 };
106 let s3_sse_key = m.remove(S3_SSE_KEY);
107 if let Some(sse_type) = m.remove(S3_SSE_TYPE) {
108 match sse_type.to_lowercase().as_str() {
109 "none" => {}
111 "s3" => {
113 cfg.server_side_encryption = Some("AES256".to_string());
114 }
115 "kms" => {
117 cfg.server_side_encryption = Some("aws:kms".to_string());
118 cfg.server_side_encryption_aws_kms_key_id = s3_sse_key;
119 }
120 "custom" => {
122 cfg.server_side_encryption_customer_algorithm = Some("AES256".to_string());
123 cfg.server_side_encryption_customer_key = s3_sse_key;
124 cfg.server_side_encryption_customer_key_md5 = m.remove(S3_SSE_MD5);
125 }
126 _ => {
127 return Err(Error::new(
128 ErrorKind::DataInvalid,
129 format!(
130 "Invalid {S3_SSE_TYPE}: {sse_type}. Expected one of (custom, kms, s3, none)"
131 ),
132 ));
133 }
134 }
135 };
136
137 if let Some(allow_anonymous) = m.remove(S3_ALLOW_ANONYMOUS) {
138 if is_truthy(allow_anonymous.to_lowercase().as_str()) {
139 cfg.allow_anonymous = true;
140 }
141 }
142 if let Some(disable_ec2_metadata) = m.remove(S3_DISABLE_EC2_METADATA) {
143 if is_truthy(disable_ec2_metadata.to_lowercase().as_str()) {
144 cfg.disable_ec2_metadata = true;
145 }
146 };
147 if let Some(disable_config_load) = m.remove(S3_DISABLE_CONFIG_LOAD) {
148 if is_truthy(disable_config_load.to_lowercase().as_str()) {
149 cfg.disable_config_load = true;
150 }
151 };
152
153 Ok(cfg)
154}
155
156pub(crate) fn s3_config_build(
158 cfg: &S3Config,
159 customized_credential_load: &Option<CustomAwsCredentialLoader>,
160 path: &str,
161) -> Result<Operator> {
162 let url = Url::parse(path)?;
163 let bucket = url.host_str().ok_or_else(|| {
164 Error::new(
165 ErrorKind::DataInvalid,
166 format!("Invalid s3 url: {path}, missing bucket"),
167 )
168 })?;
169
170 let mut builder = cfg
171 .clone()
172 .into_builder()
173 .bucket(bucket);
175
176 if let Some(customized_credential_load) = customized_credential_load {
177 builder = builder
178 .customized_credential_load(customized_credential_load.clone().into_opendal_loader());
179 }
180
181 Ok(Operator::new(builder)?.finish())
182}
183
/// Cloneable newtype around a shared (`Arc`) [`AwsCredentialLoad`] trait object.
///
/// Cloning only clones the `Arc`, so all clones refer to the same underlying loader.
#[derive(Clone)]
pub struct CustomAwsCredentialLoader(Arc<dyn AwsCredentialLoad>);
190
191impl std::fmt::Debug for CustomAwsCredentialLoader {
192 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
193 f.debug_struct("CustomAwsCredentialLoader")
194 .finish_non_exhaustive()
195 }
196}
197
198impl CustomAwsCredentialLoader {
199 pub fn new(loader: Arc<dyn AwsCredentialLoad>) -> Self {
201 Self(loader)
202 }
203
204 pub fn into_opendal_loader(self) -> Box<dyn AwsCredentialLoad> {
206 Box::new(self)
207 }
208}
209
210#[async_trait]
211impl AwsCredentialLoad for CustomAwsCredentialLoader {
212 async fn load_credential(&self, client: Client) -> anyhow::Result<Option<AwsCredential>> {
213 self.0.load_credential(client).await
214 }
215}