iceberg/io/
storage_s3.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::collections::HashMap;
19use std::sync::Arc;
20
21use async_trait::async_trait;
22use opendal::services::S3Config;
23use opendal::{Configurator, Operator};
24pub use reqsign::{AwsCredential, AwsCredentialLoad};
25use reqwest::Client;
26use url::Url;
27
28use crate::io::is_truthy;
29use crate::{Error, ErrorKind, Result};
30
// The following are arguments for [S3 file IO](https://py.iceberg.apache.org/configuration/#s3).

/// S3 endpoint.
pub const S3_ENDPOINT: &str = "s3.endpoint";
/// S3 access key id.
pub const S3_ACCESS_KEY_ID: &str = "s3.access-key-id";
/// S3 secret access key.
pub const S3_SECRET_ACCESS_KEY: &str = "s3.secret-access-key";
/// S3 session token.
///
/// This is required when using temporary credentials.
pub const S3_SESSION_TOKEN: &str = "s3.session-token";
/// S3 region.
///
/// When [`CLIENT_REGION`] is also set, that value is used instead of this one.
pub const S3_REGION: &str = "s3.region";
/// Region to use for the S3 client.
///
/// This takes precedence over [`S3_REGION`].
pub const CLIENT_REGION: &str = "client.region";
/// S3 Path Style Access.
///
/// The value is lower-cased and checked with `is_truthy`: a truthy value turns
/// virtual-host-style addressing off, any other value turns it on.
pub const S3_PATH_STYLE_ACCESS: &str = "s3.path-style-access";
/// S3 Server Side Encryption Type.
///
/// Accepted values, compared case-insensitively, are `none`, `s3`, `kms` and `custom`.
/// Any other value is rejected as invalid when the properties are parsed.
pub const S3_SSE_TYPE: &str = "s3.sse.type";
/// S3 Server Side Encryption Key.
///
/// If S3 encryption type is kms, input is a KMS Key ID.
/// In case this property is not set, default key "aws/s3" is used.
/// If encryption type is custom, input is a custom base-64 AES256 symmetric key.
pub const S3_SSE_KEY: &str = "s3.sse.key";
/// S3 Server Side Encryption MD5.
///
/// Only read when [`S3_SSE_TYPE`] is `custom`.
pub const S3_SSE_MD5: &str = "s3.sse.md5";
/// If set, all AWS clients will assume a role of the given ARN, instead of using the default
/// credential chain.
pub const S3_ASSUME_ROLE_ARN: &str = "client.assume-role.arn";
/// Optional external ID used to assume an IAM role.
pub const S3_ASSUME_ROLE_EXTERNAL_ID: &str = "client.assume-role.external-id";
/// Optional session name used to assume an IAM role.
pub const S3_ASSUME_ROLE_SESSION_NAME: &str = "client.assume-role.session-name";
/// Option to skip signing requests (e.g. for public buckets/folders).
pub const S3_ALLOW_ANONYMOUS: &str = "s3.allow-anonymous";
/// Option to skip loading the credential from EC2 metadata (typically used in conjunction with
/// [`S3_ALLOW_ANONYMOUS`]).
pub const S3_DISABLE_EC2_METADATA: &str = "s3.disable-ec2-metadata";
/// Option to skip loading configuration from config file and the env.
pub const S3_DISABLE_CONFIG_LOAD: &str = "s3.disable-config-load";
72
73/// Parse iceberg props to s3 config.
74pub(crate) fn s3_config_parse(mut m: HashMap<String, String>) -> Result<S3Config> {
75    let mut cfg = S3Config::default();
76    if let Some(endpoint) = m.remove(S3_ENDPOINT) {
77        cfg.endpoint = Some(endpoint);
78    };
79    if let Some(access_key_id) = m.remove(S3_ACCESS_KEY_ID) {
80        cfg.access_key_id = Some(access_key_id);
81    };
82    if let Some(secret_access_key) = m.remove(S3_SECRET_ACCESS_KEY) {
83        cfg.secret_access_key = Some(secret_access_key);
84    };
85    if let Some(session_token) = m.remove(S3_SESSION_TOKEN) {
86        cfg.session_token = Some(session_token);
87    };
88    if let Some(region) = m.remove(S3_REGION) {
89        cfg.region = Some(region);
90    };
91    if let Some(region) = m.remove(CLIENT_REGION) {
92        cfg.region = Some(region);
93    };
94    if let Some(path_style_access) = m.remove(S3_PATH_STYLE_ACCESS) {
95        cfg.enable_virtual_host_style = !is_truthy(path_style_access.to_lowercase().as_str());
96    };
97    if let Some(arn) = m.remove(S3_ASSUME_ROLE_ARN) {
98        cfg.role_arn = Some(arn);
99    }
100    if let Some(external_id) = m.remove(S3_ASSUME_ROLE_EXTERNAL_ID) {
101        cfg.external_id = Some(external_id);
102    };
103    if let Some(session_name) = m.remove(S3_ASSUME_ROLE_SESSION_NAME) {
104        cfg.role_session_name = Some(session_name);
105    };
106    let s3_sse_key = m.remove(S3_SSE_KEY);
107    if let Some(sse_type) = m.remove(S3_SSE_TYPE) {
108        match sse_type.to_lowercase().as_str() {
109            // No Server Side Encryption
110            "none" => {}
111            // S3 SSE-S3 encryption (S3 managed keys). https://docs.aws.amazon.com/AmazonS3/latest/dev/UsingServerSideEncryption.html
112            "s3" => {
113                cfg.server_side_encryption = Some("AES256".to_string());
114            }
115            // S3 SSE KMS, either using default or custom KMS key. https://docs.aws.amazon.com/AmazonS3/latest/dev/UsingKMSEncryption.html
116            "kms" => {
117                cfg.server_side_encryption = Some("aws:kms".to_string());
118                cfg.server_side_encryption_aws_kms_key_id = s3_sse_key;
119            }
120            // S3 SSE-C, using customer managed keys. https://docs.aws.amazon.com/AmazonS3/latest/dev/ServerSideEncryptionCustomerKeys.html
121            "custom" => {
122                cfg.server_side_encryption_customer_algorithm = Some("AES256".to_string());
123                cfg.server_side_encryption_customer_key = s3_sse_key;
124                cfg.server_side_encryption_customer_key_md5 = m.remove(S3_SSE_MD5);
125            }
126            _ => {
127                return Err(Error::new(
128                    ErrorKind::DataInvalid,
129                    format!(
130                        "Invalid {S3_SSE_TYPE}: {sse_type}. Expected one of (custom, kms, s3, none)"
131                    ),
132                ));
133            }
134        }
135    };
136
137    if let Some(allow_anonymous) = m.remove(S3_ALLOW_ANONYMOUS) {
138        if is_truthy(allow_anonymous.to_lowercase().as_str()) {
139            cfg.allow_anonymous = true;
140        }
141    }
142    if let Some(disable_ec2_metadata) = m.remove(S3_DISABLE_EC2_METADATA) {
143        if is_truthy(disable_ec2_metadata.to_lowercase().as_str()) {
144            cfg.disable_ec2_metadata = true;
145        }
146    };
147    if let Some(disable_config_load) = m.remove(S3_DISABLE_CONFIG_LOAD) {
148        if is_truthy(disable_config_load.to_lowercase().as_str()) {
149            cfg.disable_config_load = true;
150        }
151    };
152
153    Ok(cfg)
154}
155
156/// Build new opendal operator from give path.
157pub(crate) fn s3_config_build(
158    cfg: &S3Config,
159    customized_credential_load: &Option<CustomAwsCredentialLoader>,
160    path: &str,
161) -> Result<Operator> {
162    let url = Url::parse(path)?;
163    let bucket = url.host_str().ok_or_else(|| {
164        Error::new(
165            ErrorKind::DataInvalid,
166            format!("Invalid s3 url: {path}, missing bucket"),
167        )
168    })?;
169
170    let mut builder = cfg
171        .clone()
172        .into_builder()
173        // Set bucket name.
174        .bucket(bucket);
175
176    if let Some(customized_credential_load) = customized_credential_load {
177        builder = builder
178            .customized_credential_load(customized_credential_load.clone().into_opendal_loader());
179    }
180
181    Ok(Operator::new(builder)?.finish())
182}
183
184/// Custom AWS credential loader.
185/// This can be used to load credentials from a custom source, such as the AWS SDK.
186///
187/// This should be set as an extension on `FileIOBuilder`.
188#[derive(Clone)]
189pub struct CustomAwsCredentialLoader(Arc<dyn AwsCredentialLoad>);
190
191impl std::fmt::Debug for CustomAwsCredentialLoader {
192    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
193        f.debug_struct("CustomAwsCredentialLoader")
194            .finish_non_exhaustive()
195    }
196}
197
198impl CustomAwsCredentialLoader {
199    /// Create a new custom AWS credential loader.
200    pub fn new(loader: Arc<dyn AwsCredentialLoad>) -> Self {
201        Self(loader)
202    }
203
204    /// Convert this loader into an opendal compatible loader for customized AWS credentials.
205    pub fn into_opendal_loader(self) -> Box<dyn AwsCredentialLoad> {
206        Box::new(self)
207    }
208}
209
210#[async_trait]
211impl AwsCredentialLoad for CustomAwsCredentialLoader {
212    async fn load_credential(&self, client: Client) -> anyhow::Result<Option<AwsCredential>> {
213        self.0.load_credential(client).await
214    }
215}