iceberg/writer/file_writer/
location_generator.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! This module contains the location generator and file name generator for generating path of data file.
19
20use std::sync::Arc;
21use std::sync::atomic::AtomicU64;
22
23use crate::Result;
24use crate::spec::{DataFileFormat, PartitionKey, TableMetadata};
25
26/// `LocationGenerator` used to generate the location of data file.
27pub trait LocationGenerator: Clone + Send + Sync + 'static {
28    /// Generate an absolute path for the given file name that includes the partition path.
29    ///
30    /// # Arguments
31    ///
32    /// * `partition_key` - The partition key of the file. If None, generate a non-partitioned path.
33    /// * `file_name` - The name of the file
34    ///
35    /// # Returns
36    ///
37    /// An absolute path that includes the partition path, e.g.,
38    /// "/table/data/id=1/name=alice/part-00000.parquet"
39    /// or non-partitioned path:
40    /// "/table/data/part-00000.parquet"
41    fn generate_location(&self, partition_key: Option<&PartitionKey>, file_name: &str) -> String;
42}
43
/// Table property naming the directory that data files are written to.
const WRITE_DATA_LOCATION: &str = "write.data.path";
/// Fallback table property for the data directory, consulted only when
/// `write.data.path` is not set.
const WRITE_FOLDER_STORAGE_LOCATION: &str = "write.folder-storage.path";
/// Suffix appended to the table location when no data-directory property is configured.
const DEFAULT_DATA_DIR: &str = "/data";

/// `DefaultLocationGenerator` used to generate the data dir location of data file.
///
/// The location is generated based on the table location and the data location in table
/// properties.
#[derive(Clone, Debug)]
pub struct DefaultLocationGenerator {
    /// Base directory under which data files (and their partition directories) are placed.
    data_location: String,
}
54
55impl DefaultLocationGenerator {
56    /// Create a new `DefaultLocationGenerator`.
57    pub fn new(table_metadata: TableMetadata) -> Result<Self> {
58        let table_location = table_metadata.location();
59        let prop = table_metadata.properties();
60        let configured_data_location = prop
61            .get(WRITE_DATA_LOCATION)
62            .or(prop.get(WRITE_FOLDER_STORAGE_LOCATION));
63        let data_location = if let Some(data_location) = configured_data_location {
64            data_location.clone()
65        } else {
66            format!("{table_location}{DEFAULT_DATA_DIR}")
67        };
68        Ok(Self { data_location })
69    }
70
71    /// Create a new `DefaultLocationGenerator` with a specified data location.
72    ///
73    /// # Arguments
74    ///
75    /// * `data_location` - The data location to use for generating file locations.
76    pub fn with_data_location(data_location: String) -> Self {
77        Self { data_location }
78    }
79}
80
81impl LocationGenerator for DefaultLocationGenerator {
82    fn generate_location(&self, partition_key: Option<&PartitionKey>, file_name: &str) -> String {
83        if PartitionKey::is_effectively_none(partition_key) {
84            format!("{}/{}", self.data_location, file_name)
85        } else {
86            format!(
87                "{}/{}/{}",
88                self.data_location,
89                partition_key.unwrap().to_path(),
90                file_name
91            )
92        }
93    }
94}
95
/// `FileNameGenerator` used to generate file names for data files.
///
/// A generated name is a bare file name; it can be passed to [`LocationGenerator`] to build
/// the full location of the file.
pub trait FileNameGenerator: Clone + Send + Sync + 'static {
    /// Generate a file name (without any directory component).
    fn generate_file_name(&self) -> String;
}
101
102/// `DefaultFileNameGenerator` used to generate file name for data file. The file name can be
103/// passed to `LocationGenerator` to generate the location of the file.
104/// The file name format is "{prefix}-{file_count}[-{suffix}].{file_format}".
105#[derive(Clone, Debug)]
106pub struct DefaultFileNameGenerator {
107    prefix: String,
108    suffix: String,
109    format: String,
110    file_count: Arc<AtomicU64>,
111}
112
113impl DefaultFileNameGenerator {
114    /// Create a new `FileNameGenerator`.
115    pub fn new(prefix: String, suffix: Option<String>, format: DataFileFormat) -> Self {
116        let suffix = if let Some(suffix) = suffix {
117            format!("-{suffix}")
118        } else {
119            "".to_string()
120        };
121
122        Self {
123            prefix,
124            suffix,
125            format: format.to_string(),
126            file_count: Arc::new(AtomicU64::new(0)),
127        }
128    }
129}
130
131impl FileNameGenerator for DefaultFileNameGenerator {
132    fn generate_file_name(&self) -> String {
133        let file_id = self
134            .file_count
135            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
136        format!(
137            "{}-{:05}{}.{}",
138            self.prefix, file_id, self.suffix, self.format
139        )
140    }
141}
142
#[cfg(test)]
pub(crate) mod test {
    use std::collections::HashMap;
    use std::sync::Arc;

    use uuid::Uuid;

    use super::LocationGenerator;
    use crate::spec::{
        FormatVersion, Literal, NestedField, PartitionKey, PartitionSpec, PrimitiveType, Schema,
        Struct, StructType, TableMetadata, Transform, Type,
    };
    use crate::writer::file_writer::location_generator::{
        DefaultLocationGenerator, FileNameGenerator, WRITE_DATA_LOCATION,
        WRITE_FOLDER_STORAGE_LOCATION,
    };

    /// Minimal unpartitioned V2 table metadata located at `s3://data.db/table`, with no
    /// table properties.
    fn table_metadata_fixture() -> TableMetadata {
        TableMetadata {
            format_version: FormatVersion::V2,
            table_uuid: Uuid::parse_str("fb072c92-a02b-11e9-ae9c-1bb7bc9eca94").unwrap(),
            location: "s3://data.db/table".to_string(),
            last_updated_ms: 1515100955770,
            last_column_id: 1,
            schemas: HashMap::new(),
            current_schema_id: 1,
            partition_specs: HashMap::new(),
            default_spec: PartitionSpec::unpartition_spec().into(),
            default_partition_type: StructType::new(vec![]),
            last_partition_id: 1000,
            default_sort_order_id: 0,
            sort_orders: HashMap::from_iter(vec![]),
            snapshots: HashMap::default(),
            current_snapshot_id: None,
            last_sequence_number: 1,
            properties: HashMap::new(),
            snapshot_log: Vec::new(),
            metadata_log: vec![],
            refs: HashMap::new(),
            statistics: HashMap::new(),
            partition_statistics: HashMap::new(),
            encryption_keys: HashMap::new(),
            next_row_id: 0,
        }
    }

    #[test]
    fn test_default_location_generate() {
        let mut table_metadata = table_metadata_fixture();
        let file_name_generator = super::DefaultFileNameGenerator::new(
            "part".to_string(),
            Some("test".to_string()),
            crate::spec::DataFileFormat::Parquet,
        );

        // Build a generator from the metadata as it is right now and produce the next
        // (non-partitioned) location; each call advances the shared file counter.
        let next_location = |metadata: &TableMetadata| {
            DefaultLocationGenerator::new(metadata.clone())
                .unwrap()
                .generate_location(None, &file_name_generator.generate_file_name())
        };

        // No data-path property: files go under `<table location>/data`.
        assert_eq!(
            next_location(&table_metadata),
            "s3://data.db/table/data/part-00000-test.parquet"
        );

        // `write.folder-storage.path` overrides the default data directory.
        table_metadata.properties.insert(
            WRITE_FOLDER_STORAGE_LOCATION.to_string(),
            "s3://data.db/table/data_1".to_string(),
        );
        assert_eq!(
            next_location(&table_metadata),
            "s3://data.db/table/data_1/part-00001-test.parquet"
        );

        // `write.data.path` takes precedence over `write.folder-storage.path`.
        table_metadata.properties.insert(
            WRITE_DATA_LOCATION.to_string(),
            "s3://data.db/table/data_2".to_string(),
        );
        assert_eq!(
            next_location(&table_metadata),
            "s3://data.db/table/data_2/part-00002-test.parquet"
        );

        // The data directory does not have to live under the table location.
        table_metadata.properties.insert(
            WRITE_DATA_LOCATION.to_string(),
            "s3://data.db/data_3".to_string(),
        );
        assert_eq!(
            next_location(&table_metadata),
            "s3://data.db/data_3/part-00003-test.parquet"
        );
    }

    #[test]
    fn test_location_generate_with_partition() {
        // Schema with `id: int` and `name: string`, both identity-partitioned below.
        let schema = Arc::new(
            Schema::builder()
                .with_schema_id(1)
                .with_fields(vec![
                    NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
                    NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
                ])
                .build()
                .unwrap(),
        );
        let partition_spec = PartitionSpec::builder(schema.clone())
            .add_partition_field("id", "id", Transform::Identity)
            .unwrap()
            .add_partition_field("name", "name", Transform::Identity)
            .unwrap()
            .build()
            .unwrap();
        let partition_key = PartitionKey::new(
            partition_spec,
            schema,
            Struct::from_iter([Some(Literal::int(42)), Some(Literal::string("alice"))]),
        );
        let file_name = "data-00000.parquet";

        // An explicitly supplied data location becomes the base directory.
        let explicit = DefaultLocationGenerator::with_data_location("/base/path".to_string());
        assert_eq!(
            explicit.generate_location(Some(&partition_key), file_name),
            "/base/path/id=42/name=alice/data-00000.parquet"
        );

        // A metadata-derived generator falls back to `<table location>/data`.
        let mut table_metadata = table_metadata_fixture();
        table_metadata.last_column_id = 2;
        let derived = DefaultLocationGenerator::new(table_metadata).unwrap();
        assert_eq!(
            derived.generate_location(Some(&partition_key), file_name),
            "s3://data.db/table/data/id=42/name=alice/data-00000.parquet"
        );
    }
}