1use std::collections::HashMap;
19use std::fmt::Display;
20use std::str::FromStr;
21
22use opendal::Configurator;
23use opendal::services::AzdlsConfig;
24use url::Url;
25
26use crate::{Error, ErrorKind, Result, ensure_data_valid};
27
/// Property key for an Azure Storage connection string.
///
/// Connection strings are not supported yet: `azdls_config_parse` returns a
/// `FeatureUnsupported` error whenever this key is present.
const ADLS_CONNECTION_STRING: &str = "adls.connection-string";

/// Property key whose value `azdls_config_parse` stores in `AzdlsConfig::account_name`.
pub const ADLS_ACCOUNT_NAME: &str = "adls.account-name";

/// Property key whose value `azdls_config_parse` stores in `AzdlsConfig::account_key`.
pub const ADLS_ACCOUNT_KEY: &str = "adls.account-key";

/// Property key whose value `azdls_config_parse` stores in `AzdlsConfig::sas_token`.
pub const ADLS_SAS_TOKEN: &str = "adls.sas-token";

/// Property key whose value `azdls_config_parse` stores in `AzdlsConfig::tenant_id`.
pub const ADLS_TENANT_ID: &str = "adls.tenant-id";

/// Property key whose value `azdls_config_parse` stores in `AzdlsConfig::client_id`.
pub const ADLS_CLIENT_ID: &str = "adls.client-id";

/// Property key whose value `azdls_config_parse` stores in `AzdlsConfig::client_secret`.
pub const ADLS_CLIENT_SECRET: &str = "adls.client-secret";

/// Property key whose value `azdls_config_parse` stores in `AzdlsConfig::authority_host`.
pub const ADLS_AUTHORITY_HOST: &str = "adls.authority-host";
56
57pub(crate) fn azdls_config_parse(mut properties: HashMap<String, String>) -> Result<AzdlsConfig> {
59 let mut config = AzdlsConfig::default();
60
61 if let Some(_conn_str) = properties.remove(ADLS_CONNECTION_STRING) {
62 return Err(Error::new(
63 ErrorKind::FeatureUnsupported,
64 "Azdls: connection string currently not supported",
65 ));
66 }
67
68 if let Some(account_name) = properties.remove(ADLS_ACCOUNT_NAME) {
69 config.account_name = Some(account_name);
70 }
71
72 if let Some(account_key) = properties.remove(ADLS_ACCOUNT_KEY) {
73 config.account_key = Some(account_key);
74 }
75
76 if let Some(sas_token) = properties.remove(ADLS_SAS_TOKEN) {
77 config.sas_token = Some(sas_token);
78 }
79
80 if let Some(tenant_id) = properties.remove(ADLS_TENANT_ID) {
81 config.tenant_id = Some(tenant_id);
82 }
83
84 if let Some(client_id) = properties.remove(ADLS_CLIENT_ID) {
85 config.client_id = Some(client_id);
86 }
87
88 if let Some(client_secret) = properties.remove(ADLS_CLIENT_SECRET) {
89 config.client_secret = Some(client_secret);
90 }
91
92 if let Some(authority_host) = properties.remove(ADLS_AUTHORITY_HOST) {
93 config.authority_host = Some(authority_host);
94 }
95
96 Ok(config)
97}
98
99pub(crate) fn azdls_create_operator<'a>(
104 absolute_path: &'a str,
105 config: &AzdlsConfig,
106 configured_scheme: &AzureStorageScheme,
107) -> Result<(opendal::Operator, &'a str)> {
108 let path = absolute_path.parse::<AzureStoragePath>()?;
109 match_path_with_config(&path, config, configured_scheme)?;
110
111 let op = azdls_config_build(config, &path)?;
112
113 let relative_path_len = path.path.len();
118 let (_, relative_path) = absolute_path.split_at(absolute_path.len() - relative_path_len);
119
120 Ok((op, relative_path))
121}
122
/// URL schemes accepted for Azure Storage paths.
///
/// The variants correspond to the lowercase scheme strings `abfs`, `abfss`,
/// `wasb` and `wasbs` (see the `Display` and `FromStr` implementations).
#[derive(Debug, PartialEq)]
pub(crate) enum AzureStorageScheme {
    /// `abfs`; mapped to plain `http` by `as_http_scheme`.
    Abfs,
    /// `abfss`; mapped to `https` by `as_http_scheme`.
    Abfss,
    /// `wasb`; mapped to plain `http` by `as_http_scheme`.
    Wasb,
    /// `wasbs`; mapped to `https` by `as_http_scheme`.
    Wasbs,
}
135
136impl AzureStorageScheme {
137 pub fn as_http_scheme(&self) -> &str {
139 match self {
140 AzureStorageScheme::Abfs | AzureStorageScheme::Wasb => "http",
141 AzureStorageScheme::Abfss | AzureStorageScheme::Wasbs => "https",
142 }
143 }
144}
145
146impl Display for AzureStorageScheme {
147 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
148 match self {
149 AzureStorageScheme::Abfs => write!(f, "abfs"),
150 AzureStorageScheme::Abfss => write!(f, "abfss"),
151 AzureStorageScheme::Wasb => write!(f, "wasb"),
152 AzureStorageScheme::Wasbs => write!(f, "wasbs"),
153 }
154 }
155}
156
157impl FromStr for AzureStorageScheme {
158 type Err = Error;
159
160 fn from_str(s: &str) -> Result<Self> {
161 match s {
162 "abfs" => Ok(AzureStorageScheme::Abfs),
163 "abfss" => Ok(AzureStorageScheme::Abfss),
164 "wasb" => Ok(AzureStorageScheme::Wasb),
165 "wasbs" => Ok(AzureStorageScheme::Wasbs),
166 _ => Err(Error::new(
167 ErrorKind::DataInvalid,
168 format!("Unexpected Azure Storage scheme: {}", s),
169 )),
170 }
171 }
172}
173
174fn match_path_with_config(
176 path: &AzureStoragePath,
177 config: &AzdlsConfig,
178 configured_scheme: &AzureStorageScheme,
179) -> Result<()> {
180 ensure_data_valid!(
181 &path.scheme == configured_scheme,
182 "Storage::Azdls: Scheme mismatch: configured {}, passed {}",
183 configured_scheme,
184 path.scheme
185 );
186
187 if let Some(ref configured_account_name) = config.account_name {
188 ensure_data_valid!(
189 &path.account_name == configured_account_name,
190 "Storage::Azdls: Account name mismatch: configured {}, path {}",
191 configured_account_name,
192 path.account_name
193 );
194 }
195
196 if let Some(ref configured_endpoint) = config.endpoint {
197 let passed_http_scheme = path.scheme.as_http_scheme();
198 ensure_data_valid!(
199 configured_endpoint.starts_with(passed_http_scheme),
200 "Storage::Azdls: Endpoint {} does not use the expected http scheme {}.",
201 configured_endpoint,
202 passed_http_scheme
203 );
204
205 let ends_with_expected_suffix = configured_endpoint
206 .trim_end_matches('/')
207 .ends_with(&path.endpoint_suffix);
208 ensure_data_valid!(
209 ends_with_expected_suffix,
210 "Storage::Azdls: Endpoint suffix {} used with configured endpoint {}.",
211 path.endpoint_suffix,
212 configured_endpoint,
213 );
214 }
215
216 Ok(())
217}
218
219fn azdls_config_build(config: &AzdlsConfig, path: &AzureStoragePath) -> Result<opendal::Operator> {
220 let mut builder = config.clone().into_builder();
221
222 if config.endpoint.is_none() {
223 builder = builder.endpoint(&path.as_endpoint());
225 }
226 builder = builder.filesystem(&path.filesystem);
227
228 Ok(opendal::Operator::new(builder)?.finish())
229}
230
/// The components of a fully qualified Azure Storage URL such as
/// `abfss://somefs@myaccount.dfs.core.windows.net/path/to/file.parquet`.
#[derive(Debug, PartialEq)]
struct AzureStoragePath {
    /// Scheme of the URL (`abfss` in the example).
    scheme: AzureStorageScheme,

    /// Filesystem or container name, taken from the URL's username part
    /// (`somefs` in the example).
    filesystem: String,

    /// Storage account name, the first label of the host (`myaccount`).
    account_name: String,

    /// Remainder of the host after the account name and the storage service
    /// label (`core.windows.net`).
    endpoint_suffix: String,

    /// Path component of the URL as reported by the URL parser
    /// (`/path/to/file.parquet`).
    path: String,
}
251
252impl AzureStoragePath {
253 fn as_endpoint(&self) -> String {
257 format!(
258 "{}://{}.dfs.{}",
259 self.scheme.as_http_scheme(),
260 self.account_name,
261 self.endpoint_suffix
262 )
263 }
264}
265
266impl FromStr for AzureStoragePath {
267 type Err = Error;
268
269 fn from_str(path: &str) -> Result<Self> {
270 let url = Url::parse(path)?;
271
272 let filesystem = url.username();
273 ensure_data_valid!(
274 !filesystem.is_empty(),
275 "AzureStoragePath: No container or filesystem name in path: {}",
276 path
277 );
278
279 let (account_name, storage_service, endpoint_suffix) = parse_azure_storage_endpoint(&url)?;
280 let scheme = validate_storage_and_scheme(storage_service, url.scheme())?;
281
282 Ok(AzureStoragePath {
283 scheme,
284 filesystem: filesystem.to_string(),
285 account_name: account_name.to_string(),
286 endpoint_suffix: endpoint_suffix.to_string(),
287 path: url.path().to_string(),
288 })
289 }
290}
291
292fn parse_azure_storage_endpoint(url: &Url) -> Result<(&str, &str, &str)> {
293 let host = url.host_str().ok_or(Error::new(
294 ErrorKind::DataInvalid,
295 "AzureStoragePath: No host",
296 ))?;
297
298 let (account_name, endpoint) = host.split_once('.').ok_or(Error::new(
299 ErrorKind::DataInvalid,
300 "AzureStoragePath: No account name",
301 ))?;
302 if account_name.is_empty() {
303 return Err(Error::new(
304 ErrorKind::DataInvalid,
305 "AzureStoragePath: No account name",
306 ));
307 }
308
309 let (storage, endpoint_suffix) = endpoint.split_once('.').ok_or(Error::new(
310 ErrorKind::DataInvalid,
311 "AzureStoragePath: No storage service",
312 ))?;
313
314 Ok((account_name, storage, endpoint_suffix))
315}
316
317fn validate_storage_and_scheme(
318 storage_service: &str,
319 scheme_str: &str,
320) -> Result<AzureStorageScheme> {
321 let scheme = scheme_str.parse::<AzureStorageScheme>()?;
322 match scheme {
323 AzureStorageScheme::Abfss | AzureStorageScheme::Abfs => {
324 ensure_data_valid!(
325 storage_service == "dfs",
326 "AzureStoragePath: Unexpected storage service for abfs[s]: {}",
327 storage_service
328 );
329 Ok(scheme)
330 }
331 AzureStorageScheme::Wasbs | AzureStorageScheme::Wasb => {
332 ensure_data_valid!(
333 storage_service == "blob",
334 "AzureStoragePath: Unexpected storage service for wasb[s]: {}",
335 storage_service
336 );
337 Ok(scheme)
338 }
339 }
340}
341
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use opendal::services::AzdlsConfig;

    use super::{AzureStoragePath, AzureStorageScheme, azdls_create_operator};
    use crate::io::azdls_config_parse;

    /// Property maps are translated into the expected `AzdlsConfig`, and the
    /// unsupported connection string is rejected.
    #[test]
    fn test_azdls_config_parse() {
        let test_cases = vec![
            (
                "account name and key",
                HashMap::from([
                    (super::ADLS_ACCOUNT_NAME.to_string(), "test".to_string()),
                    (super::ADLS_ACCOUNT_KEY.to_string(), "secret".to_string()),
                ]),
                Some(AzdlsConfig {
                    account_name: Some("test".to_string()),
                    account_key: Some("secret".to_string()),
                    ..Default::default()
                }),
            ),
            (
                "account name and SAS token",
                HashMap::from([
                    (super::ADLS_ACCOUNT_NAME.to_string(), "test".to_string()),
                    (super::ADLS_SAS_TOKEN.to_string(), "token".to_string()),
                ]),
                Some(AzdlsConfig {
                    account_name: Some("test".to_string()),
                    sas_token: Some("token".to_string()),
                    ..Default::default()
                }),
            ),
            (
                "account name and AAD credentials",
                HashMap::from([
                    (super::ADLS_ACCOUNT_NAME.to_string(), "test".to_string()),
                    (super::ADLS_CLIENT_ID.to_string(), "abcdef".to_string()),
                    (super::ADLS_CLIENT_SECRET.to_string(), "secret".to_string()),
                    (super::ADLS_TENANT_ID.to_string(), "12345".to_string()),
                ]),
                Some(AzdlsConfig {
                    account_name: Some("test".to_string()),
                    client_id: Some("abcdef".to_string()),
                    client_secret: Some("secret".to_string()),
                    tenant_id: Some("12345".to_string()),
                    ..Default::default()
                }),
            ),
            (
                // Exercises the error branch: connection strings are rejected.
                "connection string is unsupported",
                HashMap::from([
                    (
                        super::ADLS_CONNECTION_STRING.to_string(),
                        "AccountName=test;AccountKey=secret".to_string(),
                    ),
                    (super::ADLS_ACCOUNT_NAME.to_string(), "test".to_string()),
                ]),
                None,
            ),
        ];

        for (name, properties, expected) in test_cases {
            let config = azdls_config_parse(properties);
            match expected {
                Some(expected_config) => {
                    assert!(config.is_ok(), "Test case {} failed: {:?}", name, config);
                    assert_eq!(config.unwrap(), expected_config, "Test case: {}", name);
                }
                None => {
                    assert!(config.is_err(), "Test case {} expected error.", name);
                }
            }
        }
    }

    /// Operators are only created for paths that agree with the configuration.
    #[test]
    fn test_azdls_create_operator() {
        let test_cases = vec![
            (
                "basic",
                (
                    "abfss://myfs@myaccount.dfs.core.windows.net/path/to/file.parquet",
                    AzdlsConfig {
                        account_name: Some("myaccount".to_string()),
                        endpoint: Some("https://myaccount.dfs.core.windows.net".to_string()),
                        ..Default::default()
                    },
                    AzureStorageScheme::Abfss,
                ),
                Some(("myfs", "/path/to/file.parquet")),
            ),
            (
                "different account",
                (
                    "abfss://myfs@anotheraccount.dfs.core.windows.net/path/to/file.parquet",
                    AzdlsConfig {
                        account_name: Some("myaccount".to_string()),
                        endpoint: Some("https://myaccount.dfs.core.windows.net".to_string()),
                        ..Default::default()
                    },
                    AzureStorageScheme::Abfss,
                ),
                None,
            ),
            (
                // Valid `abfs` path, but the configuration expects `abfss`.
                "different scheme",
                (
                    "abfs://myfs@myaccount.dfs.core.windows.net/path/to/file.parquet",
                    AzdlsConfig {
                        account_name: Some("myaccount".to_string()),
                        endpoint: Some("https://myaccount.dfs.core.windows.net".to_string()),
                        ..Default::default()
                    },
                    AzureStorageScheme::Abfss,
                ),
                None,
            ),
            (
                // `wasbs` must be used with the `blob` service, so the path
                // itself is already invalid.
                "scheme not matching storage service",
                (
                    "wasbs://myfs@myaccount.dfs.core.windows.net/path/to/file.parquet",
                    AzdlsConfig {
                        account_name: Some("myaccount".to_string()),
                        endpoint: Some("https://myaccount.dfs.core.windows.net".to_string()),
                        ..Default::default()
                    },
                    AzureStorageScheme::Abfss,
                ),
                None,
            ),
            (
                // Scheme and account match, so only the endpoint check can
                // reject this: `abfss` requires an https endpoint.
                "incompatible scheme for endpoint",
                (
                    "abfss://myfs@myaccount.dfs.core.windows.net/path/to/file.parquet",
                    AzdlsConfig {
                        account_name: Some("myaccount".to_string()),
                        endpoint: Some("http://myaccount.dfs.core.windows.net".to_string()),
                        ..Default::default()
                    },
                    AzureStorageScheme::Abfss,
                ),
                None,
            ),
            (
                "different endpoint suffix",
                (
                    "abfss://somefs@myaccount.dfs.core.windows.net/path/to/file.parquet",
                    AzdlsConfig {
                        account_name: Some("myaccount".to_string()),
                        endpoint: Some("https://myaccount.dfs.core.chinacloudapi.cn".to_string()),
                        ..Default::default()
                    },
                    AzureStorageScheme::Abfss,
                ),
                None,
            ),
            (
                "endpoint inferred from fully qualified path",
                (
                    "abfs://myfs@myaccount.dfs.core.windows.net/path/to/file.parquet",
                    AzdlsConfig {
                        filesystem: "myfs".to_string(),
                        account_name: Some("myaccount".to_string()),
                        endpoint: None,
                        ..Default::default()
                    },
                    AzureStorageScheme::Abfs,
                ),
                Some(("myfs", "/path/to/file.parquet")),
            ),
        ];

        for (name, input, expected) in test_cases {
            let result = azdls_create_operator(input.0, &input.1, &input.2);
            match expected {
                Some((expected_filesystem, expected_path)) => {
                    assert!(result.is_ok(), "Test case {} failed: {:?}", name, result);

                    let (op, relative_path) = result.unwrap();
                    assert_eq!(op.info().name(), expected_filesystem);
                    assert_eq!(relative_path, expected_path);
                }
                None => {
                    assert!(result.is_err(), "Test case {} expected error.", name);
                }
            }
        }
    }

    /// Fully qualified paths are split into their components, and malformed
    /// ones are rejected.
    #[test]
    fn test_azure_storage_path_parse() {
        let test_cases = vec![
            (
                "succeeds",
                "abfss://somefs@myaccount.dfs.core.windows.net/path/to/file.parquet",
                Some(AzureStoragePath {
                    scheme: AzureStorageScheme::Abfss,
                    filesystem: "somefs".to_string(),
                    account_name: "myaccount".to_string(),
                    endpoint_suffix: "core.windows.net".to_string(),
                    path: "/path/to/file.parquet".to_string(),
                }),
            ),
            (
                "wasbs with blob service succeeds",
                "wasbs://mycontainer@myaccount.blob.core.windows.net/path/to/file.parquet",
                Some(AzureStoragePath {
                    scheme: AzureStorageScheme::Wasbs,
                    filesystem: "mycontainer".to_string(),
                    account_name: "myaccount".to_string(),
                    endpoint_suffix: "core.windows.net".to_string(),
                    path: "/path/to/file.parquet".to_string(),
                }),
            ),
            (
                "unexpected scheme",
                "adls://somefs@myaccount.dfs.core.windows.net/path/to/file.parquet",
                None,
            ),
            (
                "abfss with blob service",
                "abfss://somefs@myaccount.blob.core.windows.net/path/to/file.parquet",
                None,
            ),
            (
                "wasbs with dfs service",
                "wasbs://somefs@myaccount.dfs.core.windows.net/path/to/file.parquet",
                None,
            ),
            (
                "no filesystem",
                "abfss://myaccount.dfs.core.windows.net/path/to/file.parquet",
                None,
            ),
            (
                "no account name",
                "abfs://myfs@dfs.core.windows.net/path/to/file.parquet",
                None,
            ),
            (
                "no storage service",
                "abfss://somefs@myaccount.dfs/path/to/file.parquet",
                None,
            ),
        ];

        for (name, input, expected) in test_cases {
            let result = input.parse::<AzureStoragePath>();
            match expected {
                Some(expected_path) => {
                    assert!(result.is_ok(), "Test case {} failed: {:?}", name, result);
                    assert_eq!(result.unwrap(), expected_path, "Test case: {}", name);
                }
                None => {
                    assert!(result.is_err(), "Test case {} expected error.", name);
                }
            }
        }
    }

    /// The derived endpoint uses the scheme's HTTP variant and the `dfs`
    /// service label.
    #[test]
    fn test_azure_storage_path_endpoint() {
        let test_cases = vec![
            (
                "abfss uses https",
                AzureStoragePath {
                    scheme: AzureStorageScheme::Abfss,
                    filesystem: "myfs".to_string(),
                    account_name: "myaccount".to_string(),
                    endpoint_suffix: "core.windows.net".to_string(),
                    path: "/path/to/file.parquet".to_string(),
                },
                "https://myaccount.dfs.core.windows.net",
            ),
            (
                "abfs uses http",
                AzureStoragePath {
                    scheme: AzureStorageScheme::Abfs,
                    filesystem: "myfs".to_string(),
                    account_name: "myaccount".to_string(),
                    endpoint_suffix: "core.windows.net".to_string(),
                    path: "/path/to/file.parquet".to_string(),
                },
                "http://myaccount.dfs.core.windows.net",
            ),
            (
                "wasbs uses https and dfs",
                AzureStoragePath {
                    scheme: AzureStorageScheme::Wasbs,
                    filesystem: "myfs".to_string(),
                    account_name: "myaccount".to_string(),
                    endpoint_suffix: "core.windows.net".to_string(),
                    path: "/path/to/file.parquet".to_string(),
                },
                "https://myaccount.dfs.core.windows.net",
            ),
        ];

        for (name, path, expected) in test_cases {
            let endpoint = path.as_endpoint();
            assert_eq!(endpoint, expected, "Test case: {}", name);
        }
    }
}