iceberg/io/
storage_gcs.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17//! Google Cloud Storage properties
18
19use std::collections::HashMap;
20
21use opendal::Operator;
22use opendal::services::GcsConfig;
23use url::Url;
24
25use crate::io::is_truthy;
26use crate::{Error, ErrorKind, Result};
27
28// Reference: https://github.com/apache/iceberg/blob/main/gcp/src/main/java/org/apache/iceberg/gcp/GCPProperties.java
29
30/// Google Cloud Project ID.
///
/// NOTE(review): `gcs_config_parse` below does not read this key.
31pub const GCS_PROJECT_ID: &str = "gcs.project-id";
32/// Google Cloud Storage endpoint; `gcs_config_parse` copies it into `GcsConfig::endpoint`.
33pub const GCS_SERVICE_PATH: &str = "gcs.service.path";
34/// Google Cloud user project.
///
/// NOTE(review): `gcs_config_parse` below does not read this key.
35pub const GCS_USER_PROJECT: &str = "gcs.user-project";
36/// Allow unauthenticated requests.
///
/// `gcs_config_parse` reacts to this key by enabling anonymous access and by
/// disabling both VM-metadata and config-file/env credential loading.
37pub const GCS_NO_AUTH: &str = "gcs.no-auth";
38/// Google Cloud Storage credentials JSON string, base64 encoded.
39///
40/// E.g. `base64::prelude::BASE64_STANDARD.encode(serde_json::to_string(credential).as_bytes())`
///
/// `gcs_config_parse` stores the value unchanged in `GcsConfig::credential`.
41pub const GCS_CREDENTIALS_JSON: &str = "gcs.credentials-json";
42/// Google Cloud Storage OAuth2 token; `gcs_config_parse` stores it unchanged in `GcsConfig::token`.
43pub const GCS_TOKEN: &str = "gcs.oauth2.token";
44
45/// Option to skip signing requests (e.g. for public buckets/folders).
46pub const GCS_ALLOW_ANONYMOUS: &str = "gcs.allow-anonymous";
47/// Option to skip loading the credential from GCE metadata server (typically used in conjunction with `GCS_ALLOW_ANONYMOUS`).
48pub const GCS_DISABLE_VM_METADATA: &str = "gcs.disable-vm-metadata";
49/// Option to skip loading configuration from config file and the env.
50pub const GCS_DISABLE_CONFIG_LOAD: &str = "gcs.disable-config-load";
51
52/// Parse iceberg properties to [`GcsConfig`].
53pub(crate) fn gcs_config_parse(mut m: HashMap<String, String>) -> Result<GcsConfig> {
54    let mut cfg = GcsConfig::default();
55
56    if let Some(cred) = m.remove(GCS_CREDENTIALS_JSON) {
57        cfg.credential = Some(cred);
58    }
59
60    if let Some(token) = m.remove(GCS_TOKEN) {
61        cfg.token = Some(token);
62    }
63
64    if let Some(endpoint) = m.remove(GCS_SERVICE_PATH) {
65        cfg.endpoint = Some(endpoint);
66    }
67
68    if m.remove(GCS_NO_AUTH).is_some() {
69        cfg.allow_anonymous = true;
70        cfg.disable_vm_metadata = true;
71        cfg.disable_config_load = true;
72    }
73
74    if let Some(allow_anonymous) = m.remove(GCS_ALLOW_ANONYMOUS) {
75        if is_truthy(allow_anonymous.to_lowercase().as_str()) {
76            cfg.allow_anonymous = true;
77        }
78    }
79    if let Some(disable_ec2_metadata) = m.remove(GCS_DISABLE_VM_METADATA) {
80        if is_truthy(disable_ec2_metadata.to_lowercase().as_str()) {
81            cfg.disable_vm_metadata = true;
82        }
83    };
84    if let Some(disable_config_load) = m.remove(GCS_DISABLE_CONFIG_LOAD) {
85        if is_truthy(disable_config_load.to_lowercase().as_str()) {
86            cfg.disable_config_load = true;
87        }
88    };
89
90    Ok(cfg)
91}
92
93/// Build a new OpenDAL [`Operator`] based on a provided [`GcsConfig`].
94pub(crate) fn gcs_config_build(cfg: &GcsConfig, path: &str) -> Result<Operator> {
95    let url = Url::parse(path)?;
96    let bucket = url.host_str().ok_or_else(|| {
97        Error::new(
98            ErrorKind::DataInvalid,
99            format!("Invalid gcs url: {}, bucket is required", path),
100        )
101    })?;
102
103    let mut cfg = cfg.clone();
104    cfg.bucket = bucket.to_string();
105    Ok(Operator::from_config(cfg)?.finish())
106}