iceberg/io/
storage_azdls.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::collections::HashMap;
19use std::fmt::Display;
20use std::str::FromStr;
21
22use opendal::Configurator;
23use opendal::services::AzdlsConfig;
24use url::Url;
25
26use crate::{Error, ErrorKind, Result, ensure_data_valid};
27
/// A connection string.
///
/// Connection strings are not supported yet: [`azdls_config_parse`] rejects
/// any properties containing this key with a `FeatureUnsupported` error.
const ADLS_CONNECTION_STRING: &str = "adls.connection-string";

/// The name of the storage account to connect to.
pub const ADLS_ACCOUNT_NAME: &str = "adls.account-name";

/// The account key used to authenticate against the account.
pub const ADLS_ACCOUNT_KEY: &str = "adls.account-key";

/// The shared access signature (SAS) token.
pub const ADLS_SAS_TOKEN: &str = "adls.sas-token";

/// The tenant ID, used for client-credentials (service principal) authentication.
pub const ADLS_TENANT_ID: &str = "adls.tenant-id";

/// The client ID, used for client-credentials (service principal) authentication.
pub const ADLS_CLIENT_ID: &str = "adls.client-id";

/// The client secret, used for client-credentials (service principal) authentication.
pub const ADLS_CLIENT_SECRET: &str = "adls.client-secret";

/// The authority host of the service principal.
/// - used for client_credentials authentication
/// - default value when unset: `https://login.microsoftonline.com`
pub const ADLS_AUTHORITY_HOST: &str = "adls.authority-host";
56
57/// Parses adls.* prefixed configuration properties.
58pub(crate) fn azdls_config_parse(mut properties: HashMap<String, String>) -> Result<AzdlsConfig> {
59    let mut config = AzdlsConfig::default();
60
61    if let Some(_conn_str) = properties.remove(ADLS_CONNECTION_STRING) {
62        return Err(Error::new(
63            ErrorKind::FeatureUnsupported,
64            "Azdls: connection string currently not supported",
65        ));
66    }
67
68    if let Some(account_name) = properties.remove(ADLS_ACCOUNT_NAME) {
69        config.account_name = Some(account_name);
70    }
71
72    if let Some(account_key) = properties.remove(ADLS_ACCOUNT_KEY) {
73        config.account_key = Some(account_key);
74    }
75
76    if let Some(sas_token) = properties.remove(ADLS_SAS_TOKEN) {
77        config.sas_token = Some(sas_token);
78    }
79
80    if let Some(tenant_id) = properties.remove(ADLS_TENANT_ID) {
81        config.tenant_id = Some(tenant_id);
82    }
83
84    if let Some(client_id) = properties.remove(ADLS_CLIENT_ID) {
85        config.client_id = Some(client_id);
86    }
87
88    if let Some(client_secret) = properties.remove(ADLS_CLIENT_SECRET) {
89        config.client_secret = Some(client_secret);
90    }
91
92    if let Some(authority_host) = properties.remove(ADLS_AUTHORITY_HOST) {
93        config.authority_host = Some(authority_host);
94    }
95
96    Ok(config)
97}
98
99/// Builds an OpenDAL operator from the AzdlsConfig and path.
100///
101/// The path is expected to include the scheme in a format like:
102/// `abfss://<myfs>@<myaccount>.dfs.core.windows.net/mydir/myfile.parquet`.
103pub(crate) fn azdls_create_operator<'a>(
104    absolute_path: &'a str,
105    config: &AzdlsConfig,
106    configured_scheme: &AzureStorageScheme,
107) -> Result<(opendal::Operator, &'a str)> {
108    let path = absolute_path.parse::<AzureStoragePath>()?;
109    match_path_with_config(&path, config, configured_scheme)?;
110
111    let op = azdls_config_build(config, &path)?;
112
113    // Paths to files in ADLS tend to be written in fully qualified form,
114    // including their filesystem and account name.
115    // OpenDAL's operator methods expect only the relative path, so we split it
116    // off and save it for later use.
117    let relative_path_len = path.path.len();
118    let (_, relative_path) = absolute_path.split_at(absolute_path.len() - relative_path_len);
119
120    Ok((op, relative_path))
121}
122
/// The URL scheme used to address a file in Azure Storage.
///
/// Note that the `abfs[s]` and `wasb[s]` variants have different implications:
/// - `abfs[s]` is used to refer to files in ADLS Gen2, backed by blob storage;
///   paths are expected to contain the `dfs` storage service.
/// - `wasb[s]` is used to refer to files in Blob Storage directly; paths are
///   expected to contain the `blob` storage service.
///
/// The variants ending in `s` (`abfss`, `wasbs`) map to HTTPS, the others to
/// plain-text HTTP (see [`AzureStorageScheme::as_http_scheme`]).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum AzureStorageScheme {
    /// `abfs://`: ADLS Gen2 over HTTP.
    Abfs,
    /// `abfss://`: ADLS Gen2 over HTTPS.
    Abfss,
    /// `wasb://`: Blob Storage over HTTP.
    Wasb,
    /// `wasbs://`: Blob Storage over HTTPS.
    Wasbs,
}

impl AzureStorageScheme {
    /// Returns the respective encrypted or plain-text HTTP scheme:
    /// `"https"` for `abfss`/`wasbs` and `"http"` for `abfs`/`wasb`.
    pub fn as_http_scheme(&self) -> &'static str {
        match self {
            AzureStorageScheme::Abfs | AzureStorageScheme::Wasb => "http",
            AzureStorageScheme::Abfss | AzureStorageScheme::Wasbs => "https",
        }
    }
}

impl Display for AzureStorageScheme {
    /// Writes the lowercase URL scheme, e.g. `abfss`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let scheme = match self {
            AzureStorageScheme::Abfs => "abfs",
            AzureStorageScheme::Abfss => "abfss",
            AzureStorageScheme::Wasb => "wasb",
            AzureStorageScheme::Wasbs => "wasbs",
        };
        f.write_str(scheme)
    }
}
156
157impl FromStr for AzureStorageScheme {
158    type Err = Error;
159
160    fn from_str(s: &str) -> Result<Self> {
161        match s {
162            "abfs" => Ok(AzureStorageScheme::Abfs),
163            "abfss" => Ok(AzureStorageScheme::Abfss),
164            "wasb" => Ok(AzureStorageScheme::Wasb),
165            "wasbs" => Ok(AzureStorageScheme::Wasbs),
166            _ => Err(Error::new(
167                ErrorKind::DataInvalid,
168                format!("Unexpected Azure Storage scheme: {}", s),
169            )),
170        }
171    }
172}
173
174/// Validates whether the given path matches what's configured for the backend.
175fn match_path_with_config(
176    path: &AzureStoragePath,
177    config: &AzdlsConfig,
178    configured_scheme: &AzureStorageScheme,
179) -> Result<()> {
180    ensure_data_valid!(
181        &path.scheme == configured_scheme,
182        "Storage::Azdls: Scheme mismatch: configured {}, passed {}",
183        configured_scheme,
184        path.scheme
185    );
186
187    if let Some(ref configured_account_name) = config.account_name {
188        ensure_data_valid!(
189            &path.account_name == configured_account_name,
190            "Storage::Azdls: Account name mismatch: configured {}, path {}",
191            configured_account_name,
192            path.account_name
193        );
194    }
195
196    if let Some(ref configured_endpoint) = config.endpoint {
197        let passed_http_scheme = path.scheme.as_http_scheme();
198        ensure_data_valid!(
199            configured_endpoint.starts_with(passed_http_scheme),
200            "Storage::Azdls: Endpoint {} does not use the expected http scheme {}.",
201            configured_endpoint,
202            passed_http_scheme
203        );
204
205        let ends_with_expected_suffix = configured_endpoint
206            .trim_end_matches('/')
207            .ends_with(&path.endpoint_suffix);
208        ensure_data_valid!(
209            ends_with_expected_suffix,
210            "Storage::Azdls: Endpoint suffix {} used with configured endpoint {}.",
211            path.endpoint_suffix,
212            configured_endpoint,
213        );
214    }
215
216    Ok(())
217}
218
219fn azdls_config_build(config: &AzdlsConfig, path: &AzureStoragePath) -> Result<opendal::Operator> {
220    let mut builder = config.clone().into_builder();
221
222    if config.endpoint.is_none() {
223        // If no endpoint is provided, we construct it from the fully-qualified path.
224        builder = builder.endpoint(&path.as_endpoint());
225    }
226    builder = builder.filesystem(&path.filesystem);
227
228    Ok(opendal::Operator::new(builder)?.finish())
229}
230
231/// Represents a fully qualified path to blob/ file in Azure Storage.
232#[derive(Debug, PartialEq)]
233struct AzureStoragePath {
234    /// The scheme of the URL, e.g., `abfss`, `abfs`, `wasbs`, or `wasb`.
235    scheme: AzureStorageScheme,
236
237    /// Under Blob Storage, this is considered the _container_.
238    filesystem: String,
239
240    account_name: String,
241
242    /// The endpoint suffix, e.g., `core.windows.net` for the public cloud
243    /// endpoint.
244    endpoint_suffix: String,
245
246    /// Path to the file.
247    ///
248    /// It is relative to the `root` of the `AzdlsConfig`.
249    path: String,
250}
251
252impl AzureStoragePath {
253    /// Converts the AzureStoragePath into a full endpoint URL.
254    ///
255    /// This is possible because the path is fully qualified.
256    fn as_endpoint(&self) -> String {
257        format!(
258            "{}://{}.dfs.{}",
259            self.scheme.as_http_scheme(),
260            self.account_name,
261            self.endpoint_suffix
262        )
263    }
264}
265
266impl FromStr for AzureStoragePath {
267    type Err = Error;
268
269    fn from_str(path: &str) -> Result<Self> {
270        let url = Url::parse(path)?;
271
272        let filesystem = url.username();
273        ensure_data_valid!(
274            !filesystem.is_empty(),
275            "AzureStoragePath: No container or filesystem name in path: {}",
276            path
277        );
278
279        let (account_name, storage_service, endpoint_suffix) = parse_azure_storage_endpoint(&url)?;
280        let scheme = validate_storage_and_scheme(storage_service, url.scheme())?;
281
282        Ok(AzureStoragePath {
283            scheme,
284            filesystem: filesystem.to_string(),
285            account_name: account_name.to_string(),
286            endpoint_suffix: endpoint_suffix.to_string(),
287            path: url.path().to_string(),
288        })
289    }
290}
291
292fn parse_azure_storage_endpoint(url: &Url) -> Result<(&str, &str, &str)> {
293    let host = url.host_str().ok_or(Error::new(
294        ErrorKind::DataInvalid,
295        "AzureStoragePath: No host",
296    ))?;
297
298    let (account_name, endpoint) = host.split_once('.').ok_or(Error::new(
299        ErrorKind::DataInvalid,
300        "AzureStoragePath: No account name",
301    ))?;
302    if account_name.is_empty() {
303        return Err(Error::new(
304            ErrorKind::DataInvalid,
305            "AzureStoragePath: No account name",
306        ));
307    }
308
309    let (storage, endpoint_suffix) = endpoint.split_once('.').ok_or(Error::new(
310        ErrorKind::DataInvalid,
311        "AzureStoragePath: No storage service",
312    ))?;
313
314    Ok((account_name, storage, endpoint_suffix))
315}
316
317fn validate_storage_and_scheme(
318    storage_service: &str,
319    scheme_str: &str,
320) -> Result<AzureStorageScheme> {
321    let scheme = scheme_str.parse::<AzureStorageScheme>()?;
322    match scheme {
323        AzureStorageScheme::Abfss | AzureStorageScheme::Abfs => {
324            ensure_data_valid!(
325                storage_service == "dfs",
326                "AzureStoragePath: Unexpected storage service for abfs[s]: {}",
327                storage_service
328            );
329            Ok(scheme)
330        }
331        AzureStorageScheme::Wasbs | AzureStorageScheme::Wasb => {
332            ensure_data_valid!(
333                storage_service == "blob",
334                "AzureStoragePath: Unexpected storage service for wasb[s]: {}",
335                storage_service
336            );
337            Ok(scheme)
338        }
339    }
340}
341
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use opendal::services::AzdlsConfig;

    use super::{AzureStoragePath, AzureStorageScheme, azdls_create_operator};
    use crate::io::azdls_config_parse;

    #[test]
    fn test_azdls_config_parse() {
        let test_cases = vec![
            (
                "account name and key",
                HashMap::from([
                    (super::ADLS_ACCOUNT_NAME.to_string(), "test".to_string()),
                    (super::ADLS_ACCOUNT_KEY.to_string(), "secret".to_string()),
                ]),
                Some(AzdlsConfig {
                    account_name: Some("test".to_string()),
                    account_key: Some("secret".to_string()),
                    ..Default::default()
                }),
            ),
            (
                "account name and SAS token",
                HashMap::from([
                    (super::ADLS_ACCOUNT_NAME.to_string(), "test".to_string()),
                    (super::ADLS_SAS_TOKEN.to_string(), "token".to_string()),
                ]),
                Some(AzdlsConfig {
                    account_name: Some("test".to_string()),
                    sas_token: Some("token".to_string()),
                    ..Default::default()
                }),
            ),
            (
                "account name and AAD credentials",
                HashMap::from([
                    (super::ADLS_ACCOUNT_NAME.to_string(), "test".to_string()),
                    (super::ADLS_CLIENT_ID.to_string(), "abcdef".to_string()),
                    (super::ADLS_CLIENT_SECRET.to_string(), "secret".to_string()),
                    (super::ADLS_TENANT_ID.to_string(), "12345".to_string()),
                ]),
                Some(AzdlsConfig {
                    account_name: Some("test".to_string()),
                    client_id: Some("abcdef".to_string()),
                    client_secret: Some("secret".to_string()),
                    tenant_id: Some("12345".to_string()),
                    ..Default::default()
                }),
            ),
            (
                "authority host",
                HashMap::from([
                    (super::ADLS_ACCOUNT_NAME.to_string(), "test".to_string()),
                    (
                        super::ADLS_AUTHORITY_HOST.to_string(),
                        "https://login.example.com".to_string(),
                    ),
                ]),
                Some(AzdlsConfig {
                    account_name: Some("test".to_string()),
                    authority_host: Some("https://login.example.com".to_string()),
                    ..Default::default()
                }),
            ),
            (
                "connection string is rejected",
                HashMap::from([(
                    super::ADLS_CONNECTION_STRING.to_string(),
                    "AccountName=test;AccountKey=secret".to_string(),
                )]),
                None,
            ),
        ];

        for (name, properties, expected) in test_cases {
            let config = azdls_config_parse(properties);
            match expected {
                Some(expected_config) => {
                    assert!(config.is_ok(), "Test case {} failed: {:?}", name, config);
                    assert_eq!(config.unwrap(), expected_config, "Test case: {}", name);
                }
                None => {
                    assert!(config.is_err(), "Test case {} expected error.", name);
                }
            }
        }
    }

    #[test]
    fn test_azdls_create_operator() {
        let test_cases = vec![
            (
                "basic",
                (
                    "abfss://myfs@myaccount.dfs.core.windows.net/path/to/file.parquet",
                    AzdlsConfig {
                        account_name: Some("myaccount".to_string()),
                        endpoint: Some("https://myaccount.dfs.core.windows.net".to_string()),
                        ..Default::default()
                    },
                    AzureStorageScheme::Abfss,
                ),
                Some(("myfs", "/path/to/file.parquet")),
            ),
            (
                "different account",
                (
                    "abfss://myfs@anotheraccount.dfs.core.windows.net/path/to/file.parquet",
                    AzdlsConfig {
                        account_name: Some("myaccount".to_string()),
                        endpoint: Some("https://myaccount.dfs.core.windows.net".to_string()),
                        ..Default::default()
                    },
                    AzureStorageScheme::Abfss,
                ),
                None,
            ),
            (
                "different scheme",
                (
                    "wasbs://myfs@myaccount.dfs.core.windows.net/path/to/file.parquet",
                    AzdlsConfig {
                        account_name: Some("myaccount".to_string()),
                        endpoint: Some("https://myaccount.dfs.core.windows.net".to_string()),
                        ..Default::default()
                    },
                    AzureStorageScheme::Abfss,
                ),
                None,
            ),
            (
                "different scheme on a valid blob path",
                (
                    "wasbs://myfs@myaccount.blob.core.windows.net/path/to/file.parquet",
                    AzdlsConfig {
                        account_name: Some("myaccount".to_string()),
                        endpoint: None,
                        ..Default::default()
                    },
                    AzureStorageScheme::Abfss,
                ),
                None,
            ),
            (
                "incompatible scheme for endpoint",
                (
                    "abfs://myfs@myaccount.dfs.core.windows.net/path/to/file.parquet",
                    AzdlsConfig {
                        account_name: Some("myaccount".to_string()),
                        endpoint: Some("http://myaccount.dfs.core.windows.net".to_string()),
                        ..Default::default()
                    },
                    AzureStorageScheme::Abfss,
                ),
                None,
            ),
            (
                "http endpoint for abfss path",
                (
                    "abfss://myfs@myaccount.dfs.core.windows.net/path/to/file.parquet",
                    AzdlsConfig {
                        account_name: Some("myaccount".to_string()),
                        endpoint: Some("http://myaccount.dfs.core.windows.net".to_string()),
                        ..Default::default()
                    },
                    AzureStorageScheme::Abfss,
                ),
                None,
            ),
            (
                "different endpoint suffix",
                (
                    "abfss://somefs@myaccount.dfs.core.windows.net/path/to/file.parquet",
                    AzdlsConfig {
                        account_name: Some("myaccount".to_string()),
                        endpoint: Some("https://myaccount.dfs.core.chinacloudapi.cn".to_string()),
                        ..Default::default()
                    },
                    AzureStorageScheme::Abfss,
                ),
                None,
            ),
            (
                "endpoint inferred from fully qualified path",
                (
                    "abfs://myfs@myaccount.dfs.core.windows.net/path/to/file.parquet",
                    AzdlsConfig {
                        filesystem: "myfs".to_string(),
                        account_name: Some("myaccount".to_string()),
                        endpoint: None,
                        ..Default::default()
                    },
                    AzureStorageScheme::Abfs,
                ),
                Some(("myfs", "/path/to/file.parquet")),
            ),
        ];

        for (name, input, expected) in test_cases {
            let result = azdls_create_operator(input.0, &input.1, &input.2);
            match expected {
                Some((expected_filesystem, expected_path)) => {
                    assert!(result.is_ok(), "Test case {} failed: {:?}", name, result);

                    let (op, relative_path) = result.unwrap();
                    assert_eq!(op.info().name(), expected_filesystem, "Test case: {}", name);
                    assert_eq!(relative_path, expected_path, "Test case: {}", name);
                }
                None => {
                    assert!(result.is_err(), "Test case {} expected error.", name);
                }
            }
        }
    }

    #[test]
    fn test_azure_storage_path_parse() {
        let test_cases = vec![
            (
                "succeeds",
                "abfss://somefs@myaccount.dfs.core.windows.net/path/to/file.parquet",
                Some(AzureStoragePath {
                    scheme: AzureStorageScheme::Abfss,
                    filesystem: "somefs".to_string(),
                    account_name: "myaccount".to_string(),
                    endpoint_suffix: "core.windows.net".to_string(),
                    path: "/path/to/file.parquet".to_string(),
                }),
            ),
            (
                "wasbs with blob service succeeds",
                "wasbs://somefs@myaccount.blob.core.windows.net/path/to/file.parquet",
                Some(AzureStoragePath {
                    scheme: AzureStorageScheme::Wasbs,
                    filesystem: "somefs".to_string(),
                    account_name: "myaccount".to_string(),
                    endpoint_suffix: "core.windows.net".to_string(),
                    path: "/path/to/file.parquet".to_string(),
                }),
            ),
            (
                "unexpected scheme",
                "adls://somefs@myaccount.dfs.core.windows.net/path/to/file.parquet",
                None,
            ),
            (
                "abfss with blob service",
                "abfss://somefs@myaccount.blob.core.windows.net/path/to/file.parquet",
                None,
            ),
            (
                "wasb with dfs service",
                "wasb://somefs@myaccount.dfs.core.windows.net/path/to/file.parquet",
                None,
            ),
            (
                "no filesystem",
                "abfss://myaccount.dfs.core.windows.net/path/to/file.parquet",
                None,
            ),
            (
                "no account name",
                "abfs://myfs@dfs.core.windows.net/path/to/file.parquet",
                None,
            ),
            (
                "no storage service",
                "abfs://myfs@myaccount/path/to/file.parquet",
                None,
            ),
        ];

        for (name, input, expected) in test_cases {
            let result = input.parse::<AzureStoragePath>();
            match expected {
                Some(expected_path) => {
                    assert!(result.is_ok(), "Test case {} failed: {:?}", name, result);
                    assert_eq!(result.unwrap(), expected_path, "Test case: {}", name);
                }
                None => {
                    assert!(result.is_err(), "Test case {} expected error.", name);
                }
            }
        }
    }

    #[test]
    fn test_azure_storage_path_endpoint() {
        let test_cases = vec![
            (
                "abfss uses https",
                AzureStoragePath {
                    scheme: AzureStorageScheme::Abfss,
                    filesystem: "myfs".to_string(),
                    account_name: "myaccount".to_string(),
                    endpoint_suffix: "core.windows.net".to_string(),
                    path: "/path/to/file.parquet".to_string(),
                },
                "https://myaccount.dfs.core.windows.net",
            ),
            (
                "abfs uses http",
                AzureStoragePath {
                    scheme: AzureStorageScheme::Abfs,
                    filesystem: "myfs".to_string(),
                    account_name: "myaccount".to_string(),
                    endpoint_suffix: "core.windows.net".to_string(),
                    path: "/path/to/file.parquet".to_string(),
                },
                "http://myaccount.dfs.core.windows.net",
            ),
            (
                "wasbs uses https and dfs",
                AzureStoragePath {
                    scheme: AzureStorageScheme::Wasbs,
                    filesystem: "myfs".to_string(),
                    account_name: "myaccount".to_string(),
                    endpoint_suffix: "core.windows.net".to_string(),
                    path: "/path/to/file.parquet".to_string(),
                },
                "https://myaccount.dfs.core.windows.net",
            ),
            (
                "wasb uses http and dfs",
                AzureStoragePath {
                    scheme: AzureStorageScheme::Wasb,
                    filesystem: "myfs".to_string(),
                    account_name: "myaccount".to_string(),
                    endpoint_suffix: "core.windows.net".to_string(),
                    path: "/path/to/file.parquet".to_string(),
                },
                "http://myaccount.dfs.core.windows.net",
            ),
        ];

        for (name, path, expected) in test_cases {
            let endpoint = path.as_endpoint();
            assert_eq!(endpoint, expected, "Test case: {}", name);
        }
    }
}