iceberg_storage_opendal/
s3.rs1use std::collections::HashMap;
19use std::sync::Arc;
20
21use iceberg::io::{
22 CLIENT_REGION, S3_ACCESS_KEY_ID, S3_ALLOW_ANONYMOUS, S3_ASSUME_ROLE_ARN,
23 S3_ASSUME_ROLE_EXTERNAL_ID, S3_ASSUME_ROLE_SESSION_NAME, S3_DISABLE_CONFIG_LOAD,
24 S3_DISABLE_EC2_METADATA, S3_ENDPOINT, S3_PATH_STYLE_ACCESS, S3_REGION, S3_SECRET_ACCESS_KEY,
25 S3_SESSION_TOKEN, S3_SSE_KEY, S3_SSE_MD5, S3_SSE_TYPE,
26};
27use iceberg::{Error, ErrorKind, Result};
28use opendal::services::S3Config;
29use opendal::{Configurator, Operator};
30pub use reqsign_aws_v4::Credential as AwsCredential;
32pub use reqsign_core::ProvideCredential;
34use reqsign_core::{ProvideCredentialChain, ProvideCredentialDyn};
35use url::Url;
36
37use crate::utils::{from_opendal_error, is_truthy};
38
39pub(crate) fn s3_config_parse(mut m: HashMap<String, String>) -> Result<S3Config> {
41 let mut cfg = S3Config::default();
42 cfg.enable_virtual_host_style = true;
48 if let Some(endpoint) = m.remove(S3_ENDPOINT) {
49 cfg.endpoint = Some(endpoint);
50 };
51 if let Some(access_key_id) = m.remove(S3_ACCESS_KEY_ID) {
52 cfg.access_key_id = Some(access_key_id);
53 };
54 if let Some(secret_access_key) = m.remove(S3_SECRET_ACCESS_KEY) {
55 cfg.secret_access_key = Some(secret_access_key);
56 };
57 if let Some(session_token) = m.remove(S3_SESSION_TOKEN) {
58 cfg.session_token = Some(session_token);
59 };
60 if let Some(region) = m.remove(S3_REGION) {
61 cfg.region = Some(region);
62 };
63 if let Some(region) = m.remove(CLIENT_REGION) {
64 cfg.region = Some(region);
65 };
66 if let Some(path_style_access) = m.remove(S3_PATH_STYLE_ACCESS) {
67 cfg.enable_virtual_host_style = !is_truthy(path_style_access.to_lowercase().as_str());
68 };
69 if let Some(arn) = m.remove(S3_ASSUME_ROLE_ARN) {
70 cfg.role_arn = Some(arn);
71 }
72 if let Some(external_id) = m.remove(S3_ASSUME_ROLE_EXTERNAL_ID) {
73 cfg.external_id = Some(external_id);
74 };
75 if let Some(session_name) = m.remove(S3_ASSUME_ROLE_SESSION_NAME) {
76 cfg.role_session_name = Some(session_name);
77 };
78 let s3_sse_key = m.remove(S3_SSE_KEY);
79 if let Some(sse_type) = m.remove(S3_SSE_TYPE) {
80 match sse_type.to_lowercase().as_str() {
81 "none" => {}
83 "s3" => {
85 cfg.server_side_encryption = Some("AES256".to_string());
86 }
87 "kms" => {
89 cfg.server_side_encryption = Some("aws:kms".to_string());
90 cfg.server_side_encryption_aws_kms_key_id = s3_sse_key;
91 }
92 "custom" => {
94 cfg.server_side_encryption_customer_algorithm = Some("AES256".to_string());
95 cfg.server_side_encryption_customer_key = s3_sse_key;
96 cfg.server_side_encryption_customer_key_md5 = m.remove(S3_SSE_MD5);
97 }
98 _ => {
99 return Err(Error::new(
100 ErrorKind::DataInvalid,
101 format!(
102 "Invalid {S3_SSE_TYPE}: {sse_type}. Expected one of (custom, kms, s3, none)"
103 ),
104 ));
105 }
106 }
107 };
108
109 if let Some(allow_anonymous) = m.remove(S3_ALLOW_ANONYMOUS)
110 && is_truthy(allow_anonymous.to_lowercase().as_str())
111 {
112 cfg.allow_anonymous = true;
113 }
114 if let Some(disable_ec2_metadata) = m.remove(S3_DISABLE_EC2_METADATA)
115 && is_truthy(disable_ec2_metadata.to_lowercase().as_str())
116 {
117 cfg.disable_ec2_metadata = true;
118 };
119 if let Some(disable_config_load) = m.remove(S3_DISABLE_CONFIG_LOAD)
120 && is_truthy(disable_config_load.to_lowercase().as_str())
121 {
122 cfg.disable_config_load = true;
123 };
124
125 Ok(cfg)
126}
127
128pub(crate) fn s3_config_build(
130 cfg: &S3Config,
131 customized_credential_load: &Option<CustomAwsCredentialLoader>,
132 path: &str,
133) -> Result<Operator> {
134 let url = Url::parse(path)?;
135 let bucket = url.host_str().ok_or_else(|| {
136 Error::new(
137 ErrorKind::DataInvalid,
138 format!("Invalid s3 url: {path}, missing bucket"),
139 )
140 })?;
141
142 let mut builder = cfg
143 .clone()
144 .into_builder()
145 .bucket(bucket);
147
148 if let Some(loader) = customized_credential_load {
149 let chain = ProvideCredentialChain::new().push(Arc::clone(&loader.0));
150 builder = builder.credential_provider_chain(chain);
151 }
152
153 Ok(Operator::new(builder).map_err(from_opendal_error)?.finish())
154}
155
156pub struct CustomAwsCredentialLoader(Arc<dyn ProvideCredentialDyn<Credential = AwsCredential>>);
162
163impl Clone for CustomAwsCredentialLoader {
164 fn clone(&self) -> Self {
165 Self(Arc::clone(&self.0))
166 }
167}
168
169impl std::fmt::Debug for CustomAwsCredentialLoader {
170 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
171 f.debug_struct("CustomAwsCredentialLoader")
172 .finish_non_exhaustive()
173 }
174}
175
176impl CustomAwsCredentialLoader {
177 pub fn new(provider: impl ProvideCredential<Credential = AwsCredential> + 'static) -> Self {
179 Self(Arc::new(provider) as Arc<dyn ProvideCredentialDyn<Credential = AwsCredential>>)
180 }
181}
182
183#[cfg(test)]
184mod tests {
185 use std::collections::HashMap;
186
187 use iceberg::io::S3_PATH_STYLE_ACCESS;
188
189 use super::s3_config_parse;
190
191 fn parse_with(prop: Option<&str>) -> bool {
192 let mut props = HashMap::new();
193 if let Some(v) = prop {
194 props.insert(S3_PATH_STYLE_ACCESS.to_string(), v.to_string());
195 }
196 s3_config_parse(props).unwrap().enable_virtual_host_style
197 }
198
199 #[test]
200 fn s3_config_parse_path_style_access() {
201 assert!(parse_with(None));
203 assert!(parse_with(Some("false")));
204 assert!(!parse_with(Some("true")));
205 }
206}