// iceberg/writer/file_writer/location_generator.rs

use std::sync::Arc;
use std::sync::atomic::AtomicU64;

use crate::Result;
use crate::spec::{DataFileFormat, PartitionKey, TableMetadata};

/// `LocationGenerator` decides the path at which a data file is written.
pub trait LocationGenerator: Clone + Send + Sync + 'static {
    /// Generates a location for a file with the given name, nested under the
    /// partition path when a partition key is provided.
    fn generate_location(&self, partition_key: Option<&PartitionKey>, file_name: &str) -> String;
}

const WRITE_DATA_LOCATION: &str = "write.data.path";
const WRITE_FOLDER_STORAGE_LOCATION: &str = "write.folder-storage.path";
const DEFAULT_DATA_DIR: &str = "/data";

/// Default implementation of `LocationGenerator`: places every file under a
/// single data directory, derived from table metadata or given directly.
#[derive(Clone, Debug)]
pub struct DefaultLocationGenerator {
    /// Base directory under which all generated locations are placed.
    data_location: String,
}

impl DefaultLocationGenerator {
    /// Creates a generator from table metadata. The data location comes from the
    /// `write.data.path` property, then `write.folder-storage.path`, and finally
    /// falls back to `<table location>/data`.
    pub fn new(table_metadata: TableMetadata) -> Result<Self> {
        let table_location = table_metadata.location();
        let prop = table_metadata.properties();
        let configured_data_location = prop
            .get(WRITE_DATA_LOCATION)
            .or(prop.get(WRITE_FOLDER_STORAGE_LOCATION));
        let data_location = if let Some(data_location) = configured_data_location {
            data_location.clone()
        } else {
            format!("{table_location}{DEFAULT_DATA_DIR}")
        };
        Ok(Self { data_location })
    }

    /// Creates a generator that writes directly under the given data location.
    pub fn with_data_location(data_location: String) -> Self {
        Self { data_location }
    }
}

impl LocationGenerator for DefaultLocationGenerator {
    fn generate_location(&self, partition_key: Option<&PartitionKey>, file_name: &str) -> String {
        if PartitionKey::is_effectively_none(partition_key) {
            // Unpartitioned data goes directly under the data location.
            format!("{}/{}", self.data_location, file_name)
        } else {
            // Partitioned data is nested under the partition path, e.g.
            // `<data_location>/id=42/name=alice/<file_name>`.
            format!(
                "{}/{}/{}",
                self.data_location,
                partition_key.unwrap().to_path(),
                file_name
            )
        }
    }
}

/// `FileNameGenerator` produces the file names passed to a `LocationGenerator`.
pub trait FileNameGenerator: Clone + Send + Sync + 'static {
    /// Generates the next file name.
    fn generate_file_name(&self) -> String;
}

/// Default implementation of `FileNameGenerator`: produces names of the form
/// `{prefix}-{count:05}{suffix}.{format}`, where the suffix is empty or `-{suffix}`.
#[derive(Clone, Debug)]
pub struct DefaultFileNameGenerator {
    prefix: String,
    suffix: String,
    format: String,
    /// Shared counter, so clones of this generator continue the same sequence.
    file_count: Arc<AtomicU64>,
}

impl DefaultFileNameGenerator {
    /// Creates a `DefaultFileNameGenerator` with the given prefix, optional
    /// suffix, and file format.
    pub fn new(prefix: String, suffix: Option<String>, format: DataFileFormat) -> Self {
        let suffix = if let Some(suffix) = suffix {
            format!("-{suffix}")
        } else {
            "".to_string()
        };

        Self {
            prefix,
            suffix,
            format: format.to_string(),
            file_count: Arc::new(AtomicU64::new(0)),
        }
    }
}

impl FileNameGenerator for DefaultFileNameGenerator {
    fn generate_file_name(&self) -> String {
        // Relaxed ordering is enough here: the counter only needs to hand out
        // unique, increasing ids; it does not synchronize other memory.
        let file_id = self
            .file_count
            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
        format!(
            "{}-{:05}{}.{}",
            self.prefix, file_id, self.suffix, self.format
        )
    }
}

#[cfg(test)]
pub(crate) mod test {
    use std::collections::HashMap;
    use std::sync::Arc;

    use uuid::Uuid;

    use super::LocationGenerator;
    use crate::spec::{
        FormatVersion, Literal, NestedField, PartitionKey, PartitionSpec, PrimitiveType, Schema,
        Struct, StructType, TableMetadata, Transform, Type,
    };
    use crate::writer::file_writer::location_generator::{
        DefaultLocationGenerator, FileNameGenerator, WRITE_DATA_LOCATION,
        WRITE_FOLDER_STORAGE_LOCATION,
    };

    #[test]
    fn test_default_location_generate() {
        let mut table_metadata = TableMetadata {
            format_version: FormatVersion::V2,
            table_uuid: Uuid::parse_str("fb072c92-a02b-11e9-ae9c-1bb7bc9eca94").unwrap(),
            location: "s3://data.db/table".to_string(),
            last_updated_ms: 1515100955770,
            last_column_id: 1,
            schemas: HashMap::new(),
            current_schema_id: 1,
            partition_specs: HashMap::new(),
            default_spec: PartitionSpec::unpartition_spec().into(),
            default_partition_type: StructType::new(vec![]),
            last_partition_id: 1000,
            default_sort_order_id: 0,
            sort_orders: HashMap::from_iter(vec![]),
            snapshots: HashMap::default(),
            current_snapshot_id: None,
            last_sequence_number: 1,
            properties: HashMap::new(),
            snapshot_log: Vec::new(),
            metadata_log: vec![],
            refs: HashMap::new(),
            statistics: HashMap::new(),
            partition_statistics: HashMap::new(),
            encryption_keys: HashMap::new(),
            next_row_id: 0,
        };

        let file_name_generator = super::DefaultFileNameGenerator::new(
            "part".to_string(),
            Some("test".to_string()),
            crate::spec::DataFileFormat::Parquet,
        );

        // Without any location property, files go under `<table location>/data`.
        let location_generator =
            super::DefaultLocationGenerator::new(table_metadata.clone()).unwrap();
        let location =
            location_generator.generate_location(None, &file_name_generator.generate_file_name());
        assert_eq!(location, "s3://data.db/table/data/part-00000-test.parquet");

        // `write.folder-storage.path` overrides the default data directory.
        table_metadata.properties.insert(
            WRITE_FOLDER_STORAGE_LOCATION.to_string(),
            "s3://data.db/table/data_1".to_string(),
        );
        let location_generator =
            super::DefaultLocationGenerator::new(table_metadata.clone()).unwrap();
        let location =
            location_generator.generate_location(None, &file_name_generator.generate_file_name());
        assert_eq!(
            location,
            "s3://data.db/table/data_1/part-00001-test.parquet"
        );

        // `write.data.path` takes precedence over `write.folder-storage.path`.
        table_metadata.properties.insert(
            WRITE_DATA_LOCATION.to_string(),
            "s3://data.db/table/data_2".to_string(),
        );
        let location_generator =
            super::DefaultLocationGenerator::new(table_metadata.clone()).unwrap();
        let location =
            location_generator.generate_location(None, &file_name_generator.generate_file_name());
        assert_eq!(
            location,
            "s3://data.db/table/data_2/part-00002-test.parquet"
        );

        // The data location does not have to live under the table location.
        table_metadata.properties.insert(
            WRITE_DATA_LOCATION.to_string(),
            "s3://data.db/data_3".to_string(),
        );
        let location_generator =
            super::DefaultLocationGenerator::new(table_metadata.clone()).unwrap();
        let location =
            location_generator.generate_location(None, &file_name_generator.generate_file_name());
        assert_eq!(location, "s3://data.db/data_3/part-00003-test.parquet");
    }
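
    // Illustrative test added in this write-up (not part of the original
    // module). It exercises behavior that follows directly from the code
    // above: an omitted suffix leaves no extra `-...` segment, and because the
    // counter is an `Arc<AtomicU64>`, clones continue the same sequence.
    #[test]
    fn test_file_name_generator_without_suffix() {
        let file_name_generator = super::DefaultFileNameGenerator::new(
            "part".to_string(),
            None,
            crate::spec::DataFileFormat::Parquet,
        );
        // No suffix: `{prefix}-{count:05}.{format}`.
        assert_eq!(file_name_generator.generate_file_name(), "part-00000.parquet");

        // A clone shares the counter, so numbering continues rather than restarting.
        let cloned = file_name_generator.clone();
        assert_eq!(cloned.generate_file_name(), "part-00001.parquet");
    }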

    #[test]
    fn test_location_generate_with_partition() {
        let schema = Arc::new(
            Schema::builder()
                .with_schema_id(1)
                .with_fields(vec![
                    NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
                    NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
                ])
                .build()
                .unwrap(),
        );

        let partition_spec = PartitionSpec::builder(schema.clone())
            .add_partition_field("id", "id", Transform::Identity)
            .unwrap()
            .add_partition_field("name", "name", Transform::Identity)
            .unwrap()
            .build()
            .unwrap();

        let partition_data =
            Struct::from_iter([Some(Literal::int(42)), Some(Literal::string("alice"))]);

        let partition_key = PartitionKey::new(partition_spec, schema, partition_data);

        // With an explicit data location, the partition path is appended to it.
        let location_gen = DefaultLocationGenerator::with_data_location("/base/path".to_string());
        let file_name = "data-00000.parquet";
        let location = location_gen.generate_location(Some(&partition_key), file_name);
        assert_eq!(location, "/base/path/id=42/name=alice/data-00000.parquet");

        let table_metadata = TableMetadata {
            format_version: FormatVersion::V2,
            table_uuid: Uuid::parse_str("fb072c92-a02b-11e9-ae9c-1bb7bc9eca94").unwrap(),
            location: "s3://data.db/table".to_string(),
            last_updated_ms: 1515100955770,
            last_column_id: 2,
            schemas: HashMap::new(),
            current_schema_id: 1,
            partition_specs: HashMap::new(),
            default_spec: PartitionSpec::unpartition_spec().into(),
            default_partition_type: StructType::new(vec![]),
            last_partition_id: 1000,
            default_sort_order_id: 0,
            sort_orders: HashMap::from_iter(vec![]),
            snapshots: HashMap::default(),
            current_snapshot_id: None,
            last_sequence_number: 1,
            properties: HashMap::new(),
            snapshot_log: Vec::new(),
            metadata_log: vec![],
            refs: HashMap::new(),
            statistics: HashMap::new(),
            partition_statistics: HashMap::new(),
            encryption_keys: HashMap::new(),
            next_row_id: 0,
        };

        // With table metadata, the partition path is appended to the derived
        // `<table location>/data` directory.
        let default_location_gen = super::DefaultLocationGenerator::new(table_metadata).unwrap();
        let location = default_location_gen.generate_location(Some(&partition_key), file_name);
        assert_eq!(
            location,
            "s3://data.db/table/data/id=42/name=alice/data-00000.parquet"
        );
    }
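
    // Illustrative test added in this write-up (not part of the original
    // module): an explicit data location combined with an unpartitioned write
    // (a `None` partition key) yields a plain `<data_location>/<file_name>` path.
    #[test]
    fn test_with_data_location_unpartitioned() {
        let location_gen = DefaultLocationGenerator::with_data_location("/base/path".to_string());
        let location = location_gen.generate_location(None, "data-00000.parquet");
        assert_eq!(location, "/base/path/data-00000.parquet");
    }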
}