iceberg_storage_opendal/
s3.rs1use std::collections::HashMap;
19use std::sync::Arc;
20
21use async_trait::async_trait;
22use iceberg::io::{
23 CLIENT_REGION, S3_ACCESS_KEY_ID, S3_ALLOW_ANONYMOUS, S3_ASSUME_ROLE_ARN,
24 S3_ASSUME_ROLE_EXTERNAL_ID, S3_ASSUME_ROLE_SESSION_NAME, S3_DISABLE_CONFIG_LOAD,
25 S3_DISABLE_EC2_METADATA, S3_ENDPOINT, S3_PATH_STYLE_ACCESS, S3_REGION, S3_SECRET_ACCESS_KEY,
26 S3_SESSION_TOKEN, S3_SSE_KEY, S3_SSE_MD5, S3_SSE_TYPE,
27};
28use iceberg::{Error, ErrorKind, Result};
29use opendal::services::S3Config;
30use opendal::{Configurator, Operator};
31pub use reqsign::{AwsCredential, AwsCredentialLoad};
32use reqwest::Client;
33use url::Url;
34
35use crate::utils::{from_opendal_error, is_truthy};
36
37pub(crate) fn s3_config_parse(mut m: HashMap<String, String>) -> Result<S3Config> {
39 let mut cfg = S3Config::default();
40 cfg.enable_virtual_host_style = true;
46 if let Some(endpoint) = m.remove(S3_ENDPOINT) {
47 cfg.endpoint = Some(endpoint);
48 };
49 if let Some(access_key_id) = m.remove(S3_ACCESS_KEY_ID) {
50 cfg.access_key_id = Some(access_key_id);
51 };
52 if let Some(secret_access_key) = m.remove(S3_SECRET_ACCESS_KEY) {
53 cfg.secret_access_key = Some(secret_access_key);
54 };
55 if let Some(session_token) = m.remove(S3_SESSION_TOKEN) {
56 cfg.session_token = Some(session_token);
57 };
58 if let Some(region) = m.remove(S3_REGION) {
59 cfg.region = Some(region);
60 };
61 if let Some(region) = m.remove(CLIENT_REGION) {
62 cfg.region = Some(region);
63 };
64 if let Some(path_style_access) = m.remove(S3_PATH_STYLE_ACCESS) {
65 cfg.enable_virtual_host_style = !is_truthy(path_style_access.to_lowercase().as_str());
66 };
67 if let Some(arn) = m.remove(S3_ASSUME_ROLE_ARN) {
68 cfg.role_arn = Some(arn);
69 }
70 if let Some(external_id) = m.remove(S3_ASSUME_ROLE_EXTERNAL_ID) {
71 cfg.external_id = Some(external_id);
72 };
73 if let Some(session_name) = m.remove(S3_ASSUME_ROLE_SESSION_NAME) {
74 cfg.role_session_name = Some(session_name);
75 };
76 let s3_sse_key = m.remove(S3_SSE_KEY);
77 if let Some(sse_type) = m.remove(S3_SSE_TYPE) {
78 match sse_type.to_lowercase().as_str() {
79 "none" => {}
81 "s3" => {
83 cfg.server_side_encryption = Some("AES256".to_string());
84 }
85 "kms" => {
87 cfg.server_side_encryption = Some("aws:kms".to_string());
88 cfg.server_side_encryption_aws_kms_key_id = s3_sse_key;
89 }
90 "custom" => {
92 cfg.server_side_encryption_customer_algorithm = Some("AES256".to_string());
93 cfg.server_side_encryption_customer_key = s3_sse_key;
94 cfg.server_side_encryption_customer_key_md5 = m.remove(S3_SSE_MD5);
95 }
96 _ => {
97 return Err(Error::new(
98 ErrorKind::DataInvalid,
99 format!(
100 "Invalid {S3_SSE_TYPE}: {sse_type}. Expected one of (custom, kms, s3, none)"
101 ),
102 ));
103 }
104 }
105 };
106
107 if let Some(allow_anonymous) = m.remove(S3_ALLOW_ANONYMOUS)
108 && is_truthy(allow_anonymous.to_lowercase().as_str())
109 {
110 cfg.allow_anonymous = true;
111 }
112 if let Some(disable_ec2_metadata) = m.remove(S3_DISABLE_EC2_METADATA)
113 && is_truthy(disable_ec2_metadata.to_lowercase().as_str())
114 {
115 cfg.disable_ec2_metadata = true;
116 };
117 if let Some(disable_config_load) = m.remove(S3_DISABLE_CONFIG_LOAD)
118 && is_truthy(disable_config_load.to_lowercase().as_str())
119 {
120 cfg.disable_config_load = true;
121 };
122
123 Ok(cfg)
124}
125
126pub(crate) fn s3_config_build(
128 cfg: &S3Config,
129 customized_credential_load: &Option<CustomAwsCredentialLoader>,
130 path: &str,
131) -> Result<Operator> {
132 let url = Url::parse(path)?;
133 let bucket = url.host_str().ok_or_else(|| {
134 Error::new(
135 ErrorKind::DataInvalid,
136 format!("Invalid s3 url: {path}, missing bucket"),
137 )
138 })?;
139
140 let mut builder = cfg
141 .clone()
142 .into_builder()
143 .bucket(bucket);
145
146 if let Some(customized_credential_load) = customized_credential_load {
147 builder = builder
148 .customized_credential_load(customized_credential_load.clone().into_opendal_loader());
149 }
150
151 Ok(Operator::new(builder).map_err(from_opendal_error)?.finish())
152}
153
154#[derive(Clone)]
159pub struct CustomAwsCredentialLoader(Arc<dyn AwsCredentialLoad>);
160
161impl std::fmt::Debug for CustomAwsCredentialLoader {
162 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
163 f.debug_struct("CustomAwsCredentialLoader")
164 .finish_non_exhaustive()
165 }
166}
167
168impl CustomAwsCredentialLoader {
169 pub fn new(loader: Arc<dyn AwsCredentialLoad>) -> Self {
171 Self(loader)
172 }
173
174 pub fn into_opendal_loader(self) -> Box<dyn AwsCredentialLoad> {
176 Box::new(self)
177 }
178}
179
180#[async_trait]
181impl AwsCredentialLoad for CustomAwsCredentialLoader {
182 async fn load_credential(&self, client: Client) -> anyhow::Result<Option<AwsCredential>> {
183 self.0.load_credential(client).await
184 }
185}
186
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use iceberg::ErrorKind;
    use iceberg::io::{CLIENT_REGION, S3_PATH_STYLE_ACCESS, S3_REGION, S3_SSE_KEY, S3_SSE_TYPE};

    use super::s3_config_parse;

    /// Builds an owned property map from `(key, value)` pairs.
    fn props(pairs: &[(&str, &str)]) -> HashMap<String, String> {
        pairs
            .iter()
            .map(|(key, value)| (key.to_string(), value.to_string()))
            .collect()
    }

    /// Parses a config where only `S3_PATH_STYLE_ACCESS` is (optionally) set and returns
    /// whether virtual-host-style addressing ended up enabled.
    fn parse_with(prop: Option<&str>) -> bool {
        let mut m = HashMap::new();
        if let Some(v) = prop {
            m.insert(S3_PATH_STYLE_ACCESS.to_string(), v.to_string());
        }
        s3_config_parse(m).unwrap().enable_virtual_host_style
    }

    #[test]
    fn s3_config_parse_path_style_access() {
        // Virtual-host style is the default and is only disabled by a truthy value.
        assert!(parse_with(None));
        assert!(parse_with(Some("false")));
        assert!(!parse_with(Some("true")));
        // The flag is lowercased before the truthiness check.
        assert!(!parse_with(Some("TRUE")));
    }

    #[test]
    fn s3_config_parse_client_region_overrides_s3_region() {
        let cfg = s3_config_parse(props(&[
            (S3_REGION, "us-east-1"),
            (CLIENT_REGION, "eu-west-1"),
        ]))
        .unwrap();
        assert_eq!(cfg.region.as_deref(), Some("eu-west-1"));

        let cfg = s3_config_parse(props(&[(S3_REGION, "us-east-1")])).unwrap();
        assert_eq!(cfg.region.as_deref(), Some("us-east-1"));
    }

    #[test]
    fn s3_config_parse_sse_types() {
        // SSE type matching is case-insensitive.
        let cfg = s3_config_parse(props(&[(S3_SSE_TYPE, "S3")])).unwrap();
        assert_eq!(cfg.server_side_encryption.as_deref(), Some("AES256"));

        let cfg =
            s3_config_parse(props(&[(S3_SSE_TYPE, "kms"), (S3_SSE_KEY, "key-id")])).unwrap();
        assert_eq!(cfg.server_side_encryption.as_deref(), Some("aws:kms"));
        assert_eq!(
            cfg.server_side_encryption_aws_kms_key_id.as_deref(),
            Some("key-id")
        );

        // `none` applies no encryption settings and ignores any provided key.
        let cfg =
            s3_config_parse(props(&[(S3_SSE_TYPE, "none"), (S3_SSE_KEY, "ignored")])).unwrap();
        assert!(cfg.server_side_encryption.is_none());
        assert!(cfg.server_side_encryption_aws_kms_key_id.is_none());
    }

    #[test]
    fn s3_config_parse_rejects_unknown_sse_type() {
        let err = s3_config_parse(props(&[(S3_SSE_TYPE, "bogus")])).unwrap_err();
        assert!(matches!(err.kind(), ErrorKind::DataInvalid));
    }
}