iceberg_storage_opendal/
s3.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::collections::HashMap;
19use std::sync::Arc;
20
21use iceberg::io::{
22    CLIENT_REGION, S3_ACCESS_KEY_ID, S3_ALLOW_ANONYMOUS, S3_ASSUME_ROLE_ARN,
23    S3_ASSUME_ROLE_EXTERNAL_ID, S3_ASSUME_ROLE_SESSION_NAME, S3_DISABLE_CONFIG_LOAD,
24    S3_DISABLE_EC2_METADATA, S3_ENDPOINT, S3_PATH_STYLE_ACCESS, S3_REGION, S3_SECRET_ACCESS_KEY,
25    S3_SESSION_TOKEN, S3_SSE_KEY, S3_SSE_MD5, S3_SSE_TYPE,
26};
27use iceberg::{Error, ErrorKind, Result};
28use opendal::services::S3Config;
29use opendal::{Configurator, Operator};
30/// AWS credentials: access key ID, secret access key, and optional session token.
31pub use reqsign_aws_v4::Credential as AwsCredential;
32/// Trait for types that can asynchronously supply [`AwsCredential`] to a [`CustomAwsCredentialLoader`].
33pub use reqsign_core::ProvideCredential;
34use reqsign_core::{ProvideCredentialChain, ProvideCredentialDyn};
35use url::Url;
36
37use crate::utils::{from_opendal_error, is_truthy};
38
39/// Parse iceberg props to s3 config.
40pub(crate) fn s3_config_parse(mut m: HashMap<String, String>) -> Result<S3Config> {
41    let mut cfg = S3Config::default();
42    // Match Iceberg `S3FileIOProperties.PATH_STYLE_ACCESS_DEFAULT = false`:
43    // virtual-host-style addressing is the spec default. opendal's own
44    // default is path-style, which disagrees with the Java SDK and breaks
45    // S3-compatible stores that only accept virtual-hosted-style URLs.
46    // Any explicit `s3.path-style-access` property below overrides this.
47    cfg.enable_virtual_host_style = true;
48    if let Some(endpoint) = m.remove(S3_ENDPOINT) {
49        cfg.endpoint = Some(endpoint);
50    };
51    if let Some(access_key_id) = m.remove(S3_ACCESS_KEY_ID) {
52        cfg.access_key_id = Some(access_key_id);
53    };
54    if let Some(secret_access_key) = m.remove(S3_SECRET_ACCESS_KEY) {
55        cfg.secret_access_key = Some(secret_access_key);
56    };
57    if let Some(session_token) = m.remove(S3_SESSION_TOKEN) {
58        cfg.session_token = Some(session_token);
59    };
60    if let Some(region) = m.remove(S3_REGION) {
61        cfg.region = Some(region);
62    };
63    if let Some(region) = m.remove(CLIENT_REGION) {
64        cfg.region = Some(region);
65    };
66    if let Some(path_style_access) = m.remove(S3_PATH_STYLE_ACCESS) {
67        cfg.enable_virtual_host_style = !is_truthy(path_style_access.to_lowercase().as_str());
68    };
69    if let Some(arn) = m.remove(S3_ASSUME_ROLE_ARN) {
70        cfg.role_arn = Some(arn);
71    }
72    if let Some(external_id) = m.remove(S3_ASSUME_ROLE_EXTERNAL_ID) {
73        cfg.external_id = Some(external_id);
74    };
75    if let Some(session_name) = m.remove(S3_ASSUME_ROLE_SESSION_NAME) {
76        cfg.role_session_name = Some(session_name);
77    };
78    let s3_sse_key = m.remove(S3_SSE_KEY);
79    if let Some(sse_type) = m.remove(S3_SSE_TYPE) {
80        match sse_type.to_lowercase().as_str() {
81            // No Server Side Encryption
82            "none" => {}
83            // S3 SSE-S3 encryption (S3 managed keys). https://docs.aws.amazon.com/AmazonS3/latest/dev/UsingServerSideEncryption.html
84            "s3" => {
85                cfg.server_side_encryption = Some("AES256".to_string());
86            }
87            // S3 SSE KMS, either using default or custom KMS key. https://docs.aws.amazon.com/AmazonS3/latest/dev/UsingKMSEncryption.html
88            "kms" => {
89                cfg.server_side_encryption = Some("aws:kms".to_string());
90                cfg.server_side_encryption_aws_kms_key_id = s3_sse_key;
91            }
92            // S3 SSE-C, using customer managed keys. https://docs.aws.amazon.com/AmazonS3/latest/dev/ServerSideEncryptionCustomerKeys.html
93            "custom" => {
94                cfg.server_side_encryption_customer_algorithm = Some("AES256".to_string());
95                cfg.server_side_encryption_customer_key = s3_sse_key;
96                cfg.server_side_encryption_customer_key_md5 = m.remove(S3_SSE_MD5);
97            }
98            _ => {
99                return Err(Error::new(
100                    ErrorKind::DataInvalid,
101                    format!(
102                        "Invalid {S3_SSE_TYPE}: {sse_type}. Expected one of (custom, kms, s3, none)"
103                    ),
104                ));
105            }
106        }
107    };
108
109    if let Some(allow_anonymous) = m.remove(S3_ALLOW_ANONYMOUS)
110        && is_truthy(allow_anonymous.to_lowercase().as_str())
111    {
112        cfg.allow_anonymous = true;
113    }
114    if let Some(disable_ec2_metadata) = m.remove(S3_DISABLE_EC2_METADATA)
115        && is_truthy(disable_ec2_metadata.to_lowercase().as_str())
116    {
117        cfg.disable_ec2_metadata = true;
118    };
119    if let Some(disable_config_load) = m.remove(S3_DISABLE_CONFIG_LOAD)
120        && is_truthy(disable_config_load.to_lowercase().as_str())
121    {
122        cfg.disable_config_load = true;
123    };
124
125    Ok(cfg)
126}
127
128/// Build new opendal operator from give path.
129pub(crate) fn s3_config_build(
130    cfg: &S3Config,
131    customized_credential_load: &Option<CustomAwsCredentialLoader>,
132    path: &str,
133) -> Result<Operator> {
134    let url = Url::parse(path)?;
135    let bucket = url.host_str().ok_or_else(|| {
136        Error::new(
137            ErrorKind::DataInvalid,
138            format!("Invalid s3 url: {path}, missing bucket"),
139        )
140    })?;
141
142    let mut builder = cfg
143        .clone()
144        .into_builder()
145        // Set bucket name.
146        .bucket(bucket);
147
148    if let Some(loader) = customized_credential_load {
149        let chain = ProvideCredentialChain::new().push(Arc::clone(&loader.0));
150        builder = builder.credential_provider_chain(chain);
151    }
152
153    Ok(Operator::new(builder).map_err(from_opendal_error)?.finish())
154}
155
156/// Custom AWS credential loader.
157///
158/// Wraps any [`ProvideCredential`] implementation for use with the S3 storage backend.
159/// Use [`CustomAwsCredentialLoader::new`] to create one, then pass it to
160/// [`OpenDalStorageFactory::S3`](crate::OpenDalStorageFactory).
161pub struct CustomAwsCredentialLoader(Arc<dyn ProvideCredentialDyn<Credential = AwsCredential>>);
162
163impl Clone for CustomAwsCredentialLoader {
164    fn clone(&self) -> Self {
165        Self(Arc::clone(&self.0))
166    }
167}
168
169impl std::fmt::Debug for CustomAwsCredentialLoader {
170    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
171        f.debug_struct("CustomAwsCredentialLoader")
172            .finish_non_exhaustive()
173    }
174}
175
176impl CustomAwsCredentialLoader {
177    /// Create a new custom AWS credential loader from any [`ProvideCredential`] implementation.
178    pub fn new(provider: impl ProvideCredential<Credential = AwsCredential> + 'static) -> Self {
179        Self(Arc::new(provider) as Arc<dyn ProvideCredentialDyn<Credential = AwsCredential>>)
180    }
181}
182
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use iceberg::io::S3_PATH_STYLE_ACCESS;

    use super::s3_config_parse;

    /// Parses a property map holding at most the path-style flag and reports
    /// whether the resulting config enables virtual-host-style addressing.
    fn virtual_host_style_for(path_style: Option<&str>) -> bool {
        let props: HashMap<String, String> = path_style
            .into_iter()
            .map(|value| (S3_PATH_STYLE_ACCESS.to_string(), value.to_string()))
            .collect();
        let cfg = s3_config_parse(props).unwrap();
        cfg.enable_virtual_host_style
    }

    #[test]
    fn s3_config_parse_path_style_access() {
        // Match Iceberg S3FileIOProperties.PATH_STYLE_ACCESS_DEFAULT = false.
        let cases = [(None, true), (Some("false"), true), (Some("true"), false)];
        for (path_style, expect_virtual_host) in cases {
            assert_eq!(
                virtual_host_style_for(path_style),
                expect_virtual_host,
                "unexpected addressing style for {path_style:?}"
            );
        }
    }
}