iceberg/io/storage/opendal/
s3.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::collections::HashMap;
19use std::sync::Arc;
20
21use async_trait::async_trait;
22use opendal::services::S3Config;
23use opendal::{Configurator, Operator};
24pub use reqsign::{AwsCredential, AwsCredentialLoad};
25use reqwest::Client;
26use url::Url;
27
28use crate::io::{
29    CLIENT_REGION, S3_ACCESS_KEY_ID, S3_ALLOW_ANONYMOUS, S3_ASSUME_ROLE_ARN,
30    S3_ASSUME_ROLE_EXTERNAL_ID, S3_ASSUME_ROLE_SESSION_NAME, S3_DISABLE_CONFIG_LOAD,
31    S3_DISABLE_EC2_METADATA, S3_ENDPOINT, S3_PATH_STYLE_ACCESS, S3_REGION, S3_SECRET_ACCESS_KEY,
32    S3_SESSION_TOKEN, S3_SSE_KEY, S3_SSE_MD5, S3_SSE_TYPE, is_truthy,
33};
34use crate::{Error, ErrorKind, Result};
35
36/// Parse iceberg props to s3 config.
37pub(crate) fn s3_config_parse(mut m: HashMap<String, String>) -> Result<S3Config> {
38    let mut cfg = S3Config::default();
39    if let Some(endpoint) = m.remove(S3_ENDPOINT) {
40        cfg.endpoint = Some(endpoint);
41    };
42    if let Some(access_key_id) = m.remove(S3_ACCESS_KEY_ID) {
43        cfg.access_key_id = Some(access_key_id);
44    };
45    if let Some(secret_access_key) = m.remove(S3_SECRET_ACCESS_KEY) {
46        cfg.secret_access_key = Some(secret_access_key);
47    };
48    if let Some(session_token) = m.remove(S3_SESSION_TOKEN) {
49        cfg.session_token = Some(session_token);
50    };
51    if let Some(region) = m.remove(S3_REGION) {
52        cfg.region = Some(region);
53    };
54    if let Some(region) = m.remove(CLIENT_REGION) {
55        cfg.region = Some(region);
56    };
57    if let Some(path_style_access) = m.remove(S3_PATH_STYLE_ACCESS) {
58        cfg.enable_virtual_host_style = !is_truthy(path_style_access.to_lowercase().as_str());
59    };
60    if let Some(arn) = m.remove(S3_ASSUME_ROLE_ARN) {
61        cfg.role_arn = Some(arn);
62    }
63    if let Some(external_id) = m.remove(S3_ASSUME_ROLE_EXTERNAL_ID) {
64        cfg.external_id = Some(external_id);
65    };
66    if let Some(session_name) = m.remove(S3_ASSUME_ROLE_SESSION_NAME) {
67        cfg.role_session_name = Some(session_name);
68    };
69    let s3_sse_key = m.remove(S3_SSE_KEY);
70    if let Some(sse_type) = m.remove(S3_SSE_TYPE) {
71        match sse_type.to_lowercase().as_str() {
72            // No Server Side Encryption
73            "none" => {}
74            // S3 SSE-S3 encryption (S3 managed keys). https://docs.aws.amazon.com/AmazonS3/latest/dev/UsingServerSideEncryption.html
75            "s3" => {
76                cfg.server_side_encryption = Some("AES256".to_string());
77            }
78            // S3 SSE KMS, either using default or custom KMS key. https://docs.aws.amazon.com/AmazonS3/latest/dev/UsingKMSEncryption.html
79            "kms" => {
80                cfg.server_side_encryption = Some("aws:kms".to_string());
81                cfg.server_side_encryption_aws_kms_key_id = s3_sse_key;
82            }
83            // S3 SSE-C, using customer managed keys. https://docs.aws.amazon.com/AmazonS3/latest/dev/ServerSideEncryptionCustomerKeys.html
84            "custom" => {
85                cfg.server_side_encryption_customer_algorithm = Some("AES256".to_string());
86                cfg.server_side_encryption_customer_key = s3_sse_key;
87                cfg.server_side_encryption_customer_key_md5 = m.remove(S3_SSE_MD5);
88            }
89            _ => {
90                return Err(Error::new(
91                    ErrorKind::DataInvalid,
92                    format!(
93                        "Invalid {S3_SSE_TYPE}: {sse_type}. Expected one of (custom, kms, s3, none)"
94                    ),
95                ));
96            }
97        }
98    };
99
100    if let Some(allow_anonymous) = m.remove(S3_ALLOW_ANONYMOUS)
101        && is_truthy(allow_anonymous.to_lowercase().as_str())
102    {
103        cfg.allow_anonymous = true;
104    }
105    if let Some(disable_ec2_metadata) = m.remove(S3_DISABLE_EC2_METADATA)
106        && is_truthy(disable_ec2_metadata.to_lowercase().as_str())
107    {
108        cfg.disable_ec2_metadata = true;
109    };
110    if let Some(disable_config_load) = m.remove(S3_DISABLE_CONFIG_LOAD)
111        && is_truthy(disable_config_load.to_lowercase().as_str())
112    {
113        cfg.disable_config_load = true;
114    };
115
116    Ok(cfg)
117}
118
119/// Build new opendal operator from give path.
120pub(crate) fn s3_config_build(
121    cfg: &S3Config,
122    customized_credential_load: &Option<CustomAwsCredentialLoader>,
123    path: &str,
124) -> Result<Operator> {
125    let url = Url::parse(path)?;
126    let bucket = url.host_str().ok_or_else(|| {
127        Error::new(
128            ErrorKind::DataInvalid,
129            format!("Invalid s3 url: {path}, missing bucket"),
130        )
131    })?;
132
133    let mut builder = cfg
134        .clone()
135        .into_builder()
136        // Set bucket name.
137        .bucket(bucket);
138
139    if let Some(customized_credential_load) = customized_credential_load {
140        builder = builder
141            .customized_credential_load(customized_credential_load.clone().into_opendal_loader());
142    }
143
144    Ok(Operator::new(builder)?.finish())
145}
146
/// Custom AWS credential loader.
/// This can be used to load credentials from a custom source, such as the AWS SDK.
///
/// The wrapped loader is held behind an `Arc`, so cloning this value only bumps a
/// reference count. All clones forward credential requests to the same underlying loader.
///
/// This should be set as an extension on `FileIOBuilder`.
#[derive(Clone)]
pub struct CustomAwsCredentialLoader(Arc<dyn AwsCredentialLoad>);
153
154impl std::fmt::Debug for CustomAwsCredentialLoader {
155    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
156        f.debug_struct("CustomAwsCredentialLoader")
157            .finish_non_exhaustive()
158    }
159}
160
161impl CustomAwsCredentialLoader {
162    /// Create a new custom AWS credential loader.
163    pub fn new(loader: Arc<dyn AwsCredentialLoad>) -> Self {
164        Self(loader)
165    }
166
167    /// Convert this loader into an opendal compatible loader for customized AWS credentials.
168    pub fn into_opendal_loader(self) -> Box<dyn AwsCredentialLoad> {
169        Box::new(self)
170    }
171}
172
173#[async_trait]
174impl AwsCredentialLoad for CustomAwsCredentialLoader {
175    async fn load_credential(&self, client: Client) -> anyhow::Result<Option<AwsCredential>> {
176        self.0.load_credential(client).await
177    }
178}