iceberg_storage_opendal/
s3.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::collections::HashMap;
19use std::sync::Arc;
20
21use async_trait::async_trait;
22use iceberg::io::{
23    CLIENT_REGION, S3_ACCESS_KEY_ID, S3_ALLOW_ANONYMOUS, S3_ASSUME_ROLE_ARN,
24    S3_ASSUME_ROLE_EXTERNAL_ID, S3_ASSUME_ROLE_SESSION_NAME, S3_DISABLE_CONFIG_LOAD,
25    S3_DISABLE_EC2_METADATA, S3_ENDPOINT, S3_PATH_STYLE_ACCESS, S3_REGION, S3_SECRET_ACCESS_KEY,
26    S3_SESSION_TOKEN, S3_SSE_KEY, S3_SSE_MD5, S3_SSE_TYPE,
27};
28use iceberg::{Error, ErrorKind, Result};
29use opendal::services::S3Config;
30use opendal::{Configurator, Operator};
31pub use reqsign::{AwsCredential, AwsCredentialLoad};
32use reqwest::Client;
33use url::Url;
34
35use crate::utils::{from_opendal_error, is_truthy};
36
37/// Parse iceberg props to s3 config.
38pub(crate) fn s3_config_parse(mut m: HashMap<String, String>) -> Result<S3Config> {
39    let mut cfg = S3Config::default();
40    // Match Iceberg `S3FileIOProperties.PATH_STYLE_ACCESS_DEFAULT = false`:
41    // virtual-host-style addressing is the spec default. opendal's own
42    // default is path-style, which disagrees with the Java SDK and breaks
43    // S3-compatible stores that only accept virtual-hosted-style URLs.
44    // Any explicit `s3.path-style-access` property below overrides this.
45    cfg.enable_virtual_host_style = true;
46    if let Some(endpoint) = m.remove(S3_ENDPOINT) {
47        cfg.endpoint = Some(endpoint);
48    };
49    if let Some(access_key_id) = m.remove(S3_ACCESS_KEY_ID) {
50        cfg.access_key_id = Some(access_key_id);
51    };
52    if let Some(secret_access_key) = m.remove(S3_SECRET_ACCESS_KEY) {
53        cfg.secret_access_key = Some(secret_access_key);
54    };
55    if let Some(session_token) = m.remove(S3_SESSION_TOKEN) {
56        cfg.session_token = Some(session_token);
57    };
58    if let Some(region) = m.remove(S3_REGION) {
59        cfg.region = Some(region);
60    };
61    if let Some(region) = m.remove(CLIENT_REGION) {
62        cfg.region = Some(region);
63    };
64    if let Some(path_style_access) = m.remove(S3_PATH_STYLE_ACCESS) {
65        cfg.enable_virtual_host_style = !is_truthy(path_style_access.to_lowercase().as_str());
66    };
67    if let Some(arn) = m.remove(S3_ASSUME_ROLE_ARN) {
68        cfg.role_arn = Some(arn);
69    }
70    if let Some(external_id) = m.remove(S3_ASSUME_ROLE_EXTERNAL_ID) {
71        cfg.external_id = Some(external_id);
72    };
73    if let Some(session_name) = m.remove(S3_ASSUME_ROLE_SESSION_NAME) {
74        cfg.role_session_name = Some(session_name);
75    };
76    let s3_sse_key = m.remove(S3_SSE_KEY);
77    if let Some(sse_type) = m.remove(S3_SSE_TYPE) {
78        match sse_type.to_lowercase().as_str() {
79            // No Server Side Encryption
80            "none" => {}
81            // S3 SSE-S3 encryption (S3 managed keys). https://docs.aws.amazon.com/AmazonS3/latest/dev/UsingServerSideEncryption.html
82            "s3" => {
83                cfg.server_side_encryption = Some("AES256".to_string());
84            }
85            // S3 SSE KMS, either using default or custom KMS key. https://docs.aws.amazon.com/AmazonS3/latest/dev/UsingKMSEncryption.html
86            "kms" => {
87                cfg.server_side_encryption = Some("aws:kms".to_string());
88                cfg.server_side_encryption_aws_kms_key_id = s3_sse_key;
89            }
90            // S3 SSE-C, using customer managed keys. https://docs.aws.amazon.com/AmazonS3/latest/dev/ServerSideEncryptionCustomerKeys.html
91            "custom" => {
92                cfg.server_side_encryption_customer_algorithm = Some("AES256".to_string());
93                cfg.server_side_encryption_customer_key = s3_sse_key;
94                cfg.server_side_encryption_customer_key_md5 = m.remove(S3_SSE_MD5);
95            }
96            _ => {
97                return Err(Error::new(
98                    ErrorKind::DataInvalid,
99                    format!(
100                        "Invalid {S3_SSE_TYPE}: {sse_type}. Expected one of (custom, kms, s3, none)"
101                    ),
102                ));
103            }
104        }
105    };
106
107    if let Some(allow_anonymous) = m.remove(S3_ALLOW_ANONYMOUS)
108        && is_truthy(allow_anonymous.to_lowercase().as_str())
109    {
110        cfg.allow_anonymous = true;
111    }
112    if let Some(disable_ec2_metadata) = m.remove(S3_DISABLE_EC2_METADATA)
113        && is_truthy(disable_ec2_metadata.to_lowercase().as_str())
114    {
115        cfg.disable_ec2_metadata = true;
116    };
117    if let Some(disable_config_load) = m.remove(S3_DISABLE_CONFIG_LOAD)
118        && is_truthy(disable_config_load.to_lowercase().as_str())
119    {
120        cfg.disable_config_load = true;
121    };
122
123    Ok(cfg)
124}
125
126/// Build new opendal operator from give path.
127pub(crate) fn s3_config_build(
128    cfg: &S3Config,
129    customized_credential_load: &Option<CustomAwsCredentialLoader>,
130    path: &str,
131) -> Result<Operator> {
132    let url = Url::parse(path)?;
133    let bucket = url.host_str().ok_or_else(|| {
134        Error::new(
135            ErrorKind::DataInvalid,
136            format!("Invalid s3 url: {path}, missing bucket"),
137        )
138    })?;
139
140    let mut builder = cfg
141        .clone()
142        .into_builder()
143        // Set bucket name.
144        .bucket(bucket);
145
146    if let Some(customized_credential_load) = customized_credential_load {
147        builder = builder
148            .customized_credential_load(customized_credential_load.clone().into_opendal_loader());
149    }
150
151    Ok(Operator::new(builder).map_err(from_opendal_error)?.finish())
152}
153
/// Custom AWS credential loader.
///
/// Wraps a shared [`AwsCredentialLoad`] implementation so that credentials can
/// be loaded from a custom source, such as the AWS SDK. The wrapped loader is
/// held behind an `Arc`, so clones of this type share the same underlying
/// loader.
///
/// This should be set as an extension on `FileIOBuilder`.
#[derive(Clone)]
pub struct CustomAwsCredentialLoader(Arc<dyn AwsCredentialLoad>);
160
161impl std::fmt::Debug for CustomAwsCredentialLoader {
162    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
163        f.debug_struct("CustomAwsCredentialLoader")
164            .finish_non_exhaustive()
165    }
166}
167
168impl CustomAwsCredentialLoader {
169    /// Create a new custom AWS credential loader.
170    pub fn new(loader: Arc<dyn AwsCredentialLoad>) -> Self {
171        Self(loader)
172    }
173
174    /// Convert this loader into an opendal compatible loader for customized AWS credentials.
175    pub fn into_opendal_loader(self) -> Box<dyn AwsCredentialLoad> {
176        Box::new(self)
177    }
178}
179
180#[async_trait]
181impl AwsCredentialLoad for CustomAwsCredentialLoader {
182    async fn load_credential(&self, client: Client) -> anyhow::Result<Option<AwsCredential>> {
183        self.0.load_credential(client).await
184    }
185}
186
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use iceberg::io::S3_PATH_STYLE_ACCESS;

    use super::s3_config_parse;

    /// Parses a property map holding at most the path-style property and
    /// reports whether virtual-host-style addressing ended up enabled.
    fn virtual_host_style_for(path_style: Option<&str>) -> bool {
        let props: HashMap<String, String> = path_style
            .map(|value| (S3_PATH_STYLE_ACCESS.to_string(), value.to_string()))
            .into_iter()
            .collect();
        let cfg = s3_config_parse(props).unwrap();
        cfg.enable_virtual_host_style
    }

    #[test]
    fn s3_config_parse_path_style_access() {
        // Match Iceberg S3FileIOProperties.PATH_STYLE_ACCESS_DEFAULT = false.
        let cases = [(None, true), (Some("false"), true), (Some("true"), false)];
        for (path_style, expect_virtual_host) in cases {
            assert_eq!(
                virtual_host_style_for(path_style),
                expect_virtual_host,
                "unexpected virtual-host-style setting for {path_style:?}"
            );
        }
    }
}