iceberg/io/storage_oss.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
use std::collections::HashMap;
use opendal::services::OssConfig;
use opendal::{Configurator, Operator};
use url::Url;
use crate::{Error, ErrorKind, Result};
// Configuration properties used when creating an Aliyun OSS Operator with OpenDAL:
// - `oss.endpoint`: The OSS service endpoint URL
// - `oss.access-key-id`: The access key ID for authentication
// - `oss.access-key-secret`: The access key secret for authentication
//
// This overview is a plain comment on purpose: written as `///` it would be
// rendered as part of the documentation of the first constant below.

/// Property key for the Aliyun OSS endpoint.
pub const OSS_ENDPOINT: &str = "oss.endpoint";
/// Property key for the Aliyun OSS access key id.
pub const OSS_ACCESS_KEY_ID: &str = "oss.access-key-id";
/// Property key for the Aliyun OSS access key secret.
pub const OSS_ACCESS_KEY_SECRET: &str = "oss.access-key-secret";
/// Translate iceberg properties into an [`OssConfig`].
///
/// Starts from `OssConfig::default()` and, for each of [`OSS_ENDPOINT`],
/// [`OSS_ACCESS_KEY_ID`] and [`OSS_ACCESS_KEY_SECRET`] that is present in `m`,
/// moves the value into the matching config field. Keys that are absent leave
/// the corresponding field at its default. Any other entries in `m` are
/// ignored.
///
/// As written, this function always returns `Ok`.
pub(crate) fn oss_config_parse(mut m: HashMap<String, String>) -> Result<OssConfig> {
    let mut config = OssConfig::default();

    // `name @ Some(_)` binds the whole `Option` only when the key was present,
    // so the removed value is moved into the field without re-wrapping it.
    if let endpoint @ Some(_) = m.remove(OSS_ENDPOINT) {
        config.endpoint = endpoint;
    }
    if let key_id @ Some(_) = m.remove(OSS_ACCESS_KEY_ID) {
        config.access_key_id = key_id;
    }
    if let key_secret @ Some(_) = m.remove(OSS_ACCESS_KEY_SECRET) {
        config.access_key_secret = key_secret;
    }

    Ok(config)
}
/// Build new opendal operator from give path.
pub(crate) fn oss_config_build(cfg: &OssConfig, path: &str) -> Result<Operator> {
let url = Url::parse(path)?;
let bucket = url.host_str().ok_or_else(|| {
Error::new(
ErrorKind::DataInvalid,
format!("Invalid oss url: {}, missing bucket", path),
)
})?;
let builder = cfg.clone().into_builder().bucket(bucket);
Ok(Operator::new(builder)?.finish())
}