iceberg/writer/file_writer/
location_generator.rs1use std::sync::Arc;
21use std::sync::atomic::AtomicU64;
22
23use crate::Result;
24use crate::spec::{DataFileFormat, PartitionKey, TableMetadata};
25
26pub trait LocationGenerator: Clone + Send + Sync + 'static {
28 fn generate_location(&self, partition_key: Option<&PartitionKey>, file_name: &str) -> String;
42}
43
/// Table property holding an explicit data directory. `DefaultLocationGenerator::new`
/// consults this key first.
const WRITE_DATA_LOCATION: &str = "write.data.path";
/// Table property consulted by `DefaultLocationGenerator::new` only when
/// `write.data.path` is not set.
const WRITE_FOLDER_STORAGE_LOCATION: &str = "write.folder-storage.path";
/// Suffix appended to the table location when neither property above is set.
const DEFAULT_DATA_DIR: &str = "/data";
47
/// [`LocationGenerator`] that places every file under a single data directory,
/// optionally nested inside a partition path.
#[derive(Clone, Debug)]
pub struct DefaultLocationGenerator {
    /// Directory under which generated locations are placed; it is joined to the
    /// rest of the path with a `/` separator.
    data_location: String,
}
54
55impl DefaultLocationGenerator {
56 pub fn new(table_metadata: &TableMetadata) -> Result<Self> {
58 let table_location = table_metadata.location();
59 let prop = table_metadata.properties();
60 let configured_data_location = prop
61 .get(WRITE_DATA_LOCATION)
62 .or(prop.get(WRITE_FOLDER_STORAGE_LOCATION));
63 let data_location = if let Some(data_location) = configured_data_location {
64 data_location.clone()
65 } else {
66 format!("{table_location}{DEFAULT_DATA_DIR}")
67 };
68 Ok(Self { data_location })
69 }
70
71 pub fn with_data_location(data_location: String) -> Self {
77 Self { data_location }
78 }
79}
80
81impl LocationGenerator for DefaultLocationGenerator {
82 fn generate_location(&self, partition_key: Option<&PartitionKey>, file_name: &str) -> String {
83 if PartitionKey::is_effectively_none(partition_key) {
84 format!("{}/{}", self.data_location, file_name)
85 } else {
86 format!(
87 "{}/{}/{}",
88 self.data_location,
89 partition_key.unwrap().to_path(),
90 file_name
91 )
92 }
93 }
94}
95
/// Produces names for the files a writer creates.
///
/// Implementations must be cloneable and shareable across threads
/// (`Clone + Send + Sync + 'static`).
pub trait FileNameGenerator
where
    Self: Clone + Send + Sync + 'static,
{
    /// Returns the name to use for the next file.
    fn generate_file_name(&self) -> String;
}
101
/// [`FileNameGenerator`] that produces names of the form
/// `{prefix}-{counter:05}{suffix}.{format}`.
#[derive(Clone, Debug)]
pub struct DefaultFileNameGenerator {
    /// Leading part of every generated name.
    prefix: String,
    /// Trailing tag placed after the counter: `-{suffix}` when a suffix was
    /// supplied to `new`, otherwise empty.
    suffix: String,
    /// File extension, taken from the string form of the `DataFileFormat`
    /// passed to `new`.
    format: String,
    /// Counter incremented for every generated name. It is held in an `Arc`, so
    /// clones of this generator share the same counter.
    file_count: Arc<AtomicU64>,
}
112
113impl DefaultFileNameGenerator {
114 pub fn new(prefix: String, suffix: Option<String>, format: DataFileFormat) -> Self {
116 let suffix = if let Some(suffix) = suffix {
117 format!("-{suffix}")
118 } else {
119 "".to_string()
120 };
121
122 Self {
123 prefix,
124 suffix,
125 format: format.to_string(),
126 file_count: Arc::new(AtomicU64::new(0)),
127 }
128 }
129}
130
131impl FileNameGenerator for DefaultFileNameGenerator {
132 fn generate_file_name(&self) -> String {
133 let file_id = self
134 .file_count
135 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
136 format!(
137 "{}-{:05}{}.{}",
138 self.prefix, file_id, self.suffix, self.format
139 )
140 }
141}
142
#[cfg(test)]
pub(crate) mod test {
    use std::collections::HashMap;
    use std::sync::Arc;

    use uuid::Uuid;

    use super::{
        DefaultFileNameGenerator, DefaultLocationGenerator, FileNameGenerator, LocationGenerator,
        WRITE_DATA_LOCATION, WRITE_FOLDER_STORAGE_LOCATION,
    };
    use crate::spec::{
        DataFileFormat, FormatVersion, Literal, NestedField, PartitionKey, PartitionSpec,
        PrimitiveType, Schema, Struct, StructType, TableMetadata, Transform, Type,
    };

    /// Minimal unpartitioned V2 table metadata located at `s3://data.db/table`,
    /// with no table properties set.
    fn bare_table_metadata() -> TableMetadata {
        TableMetadata {
            format_version: FormatVersion::V2,
            table_uuid: Uuid::parse_str("fb072c92-a02b-11e9-ae9c-1bb7bc9eca94").unwrap(),
            location: "s3://data.db/table".to_string(),
            last_updated_ms: 1515100955770,
            last_column_id: 1,
            schemas: HashMap::new(),
            current_schema_id: 1,
            partition_specs: HashMap::new(),
            default_spec: PartitionSpec::unpartition_spec().into(),
            default_partition_type: StructType::new(vec![]),
            last_partition_id: 1000,
            default_sort_order_id: 0,
            sort_orders: HashMap::from_iter(vec![]),
            snapshots: HashMap::default(),
            current_snapshot_id: None,
            last_sequence_number: 1,
            properties: HashMap::new(),
            snapshot_log: Vec::new(),
            metadata_log: vec![],
            refs: HashMap::new(),
            statistics: HashMap::new(),
            partition_statistics: HashMap::new(),
            encryption_keys: HashMap::new(),
            next_row_id: 0,
        }
    }

    /// The data directory follows the property precedence: table-location default,
    /// then `write.folder-storage.path`, then `write.data.path` (which also wins
    /// when it is overwritten later).
    #[test]
    fn test_default_location_generate() {
        let mut table_metadata = bare_table_metadata();

        // One shared name generator, so the counter keeps increasing across steps.
        let file_name_generator = DefaultFileNameGenerator::new(
            "part".to_string(),
            Some("test".to_string()),
            DataFileFormat::Parquet,
        );

        // Each step optionally sets one property (cumulatively) and states the
        // location expected from a generator built afterwards.
        let steps = [
            (None, "s3://data.db/table/data/part-00000-test.parquet"),
            (
                Some((WRITE_FOLDER_STORAGE_LOCATION, "s3://data.db/table/data_1")),
                "s3://data.db/table/data_1/part-00001-test.parquet",
            ),
            (
                Some((WRITE_DATA_LOCATION, "s3://data.db/table/data_2")),
                "s3://data.db/table/data_2/part-00002-test.parquet",
            ),
            (
                Some((WRITE_DATA_LOCATION, "s3://data.db/data_3")),
                "s3://data.db/data_3/part-00003-test.parquet",
            ),
        ];

        for (property, expected) in steps {
            if let Some((key, value)) = property {
                table_metadata
                    .properties
                    .insert(key.to_string(), value.to_string());
            }

            let location_generator = DefaultLocationGenerator::new(&table_metadata).unwrap();
            let location = location_generator
                .generate_location(None, &file_name_generator.generate_file_name());
            assert_eq!(location, expected);
        }
    }

    /// A partition key contributes its path between the data directory and the
    /// file name, both for an explicit directory and for one derived from metadata.
    #[test]
    fn test_location_generate_with_partition() {
        let fields = vec![
            NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
            NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
        ];
        let schema = Arc::new(
            Schema::builder()
                .with_schema_id(1)
                .with_fields(fields)
                .build()
                .unwrap(),
        );

        // Identity-partition on both columns.
        let partition_spec = PartitionSpec::builder(schema.clone())
            .add_partition_field("id", "id", Transform::Identity)
            .unwrap()
            .add_partition_field("name", "name", Transform::Identity)
            .unwrap()
            .build()
            .unwrap();

        let partition_values =
            Struct::from_iter([Some(Literal::int(42)), Some(Literal::string("alice"))]);
        let partition_key = PartitionKey::new(partition_spec, schema, partition_values);

        let file_name = "data-00000.parquet";

        // Explicit data directory.
        let explicit_gen = DefaultLocationGenerator::with_data_location("/base/path".to_string());
        assert_eq!(
            explicit_gen.generate_location(Some(&partition_key), file_name),
            "/base/path/id=42/name=alice/data-00000.parquet"
        );

        // Data directory derived from table metadata.
        let table_metadata = TableMetadata {
            last_column_id: 2,
            ..bare_table_metadata()
        };
        let derived_gen = DefaultLocationGenerator::new(&table_metadata).unwrap();
        assert_eq!(
            derived_gen.generate_location(Some(&partition_key), file_name),
            "s3://data.db/table/data/id=42/name=alice/data-00000.parquet"
        );
    }
}