1use std::collections::{HashMap, HashSet};
19use std::future::Future;
20use std::ops::RangeFrom;
21
22use uuid::Uuid;
23
24use crate::error::Result;
25use crate::spec::{
26 DataFile, DataFileFormat, FormatVersion, MAIN_BRANCH, ManifestContentType, ManifestEntry,
27 ManifestFile, ManifestListWriter, ManifestWriter, ManifestWriterBuilder, Operation, Snapshot,
28 SnapshotReference, SnapshotRetention, SnapshotSummaryCollector, Struct, StructType, Summary,
29 TableProperties, update_snapshot_summaries,
30};
31use crate::table::Table;
32use crate::transaction::ActionCommit;
33use crate::{Error, ErrorKind, TableRequirement, TableUpdate};
34
35const META_ROOT_PATH: &str = "metadata";
36
37pub(crate) trait SnapshotProduceOperation: Send + Sync {
63 fn operation(&self) -> Operation;
68
69 #[allow(unused)]
71 fn delete_entries(
72 &self,
73 snapshot_produce: &SnapshotProducer,
74 ) -> impl Future<Output = Result<Vec<ManifestEntry>>> + Send;
75
76 fn existing_manifest(
85 &self,
86 snapshot_produce: &SnapshotProducer<'_>,
87 ) -> impl Future<Output = Result<Vec<ManifestFile>>> + Send;
88}
89
90pub(crate) struct DefaultManifestProcess;
91
92impl ManifestProcess for DefaultManifestProcess {
93 fn process_manifests(
94 &self,
95 _snapshot_produce: &SnapshotProducer<'_>,
96 manifests: Vec<ManifestFile>,
97 ) -> Vec<ManifestFile> {
98 manifests
99 }
100}
101
102pub(crate) trait ManifestProcess: Send + Sync {
103 fn process_manifests(
104 &self,
105 snapshot_produce: &SnapshotProducer<'_>,
106 manifests: Vec<ManifestFile>,
107 ) -> Vec<ManifestFile>;
108}
109
110pub(crate) struct SnapshotProducer<'a> {
111 pub(crate) table: &'a Table,
112 snapshot_id: i64,
113 commit_uuid: Uuid,
114 key_metadata: Option<Vec<u8>>,
115 snapshot_properties: HashMap<String, String>,
116 added_data_files: Vec<DataFile>,
117 manifest_counter: RangeFrom<u64>,
121}
122
123impl<'a> SnapshotProducer<'a> {
124 pub(crate) fn new(
125 table: &'a Table,
126 commit_uuid: Uuid,
127 key_metadata: Option<Vec<u8>>,
128 snapshot_properties: HashMap<String, String>,
129 added_data_files: Vec<DataFile>,
130 ) -> Self {
131 Self {
132 table,
133 snapshot_id: Self::generate_unique_snapshot_id(table),
134 commit_uuid,
135 key_metadata,
136 snapshot_properties,
137 added_data_files,
138 manifest_counter: (0..),
139 }
140 }
141
142 pub(crate) fn validate_added_data_files(&self) -> Result<()> {
143 for data_file in &self.added_data_files {
144 if data_file.content_type() != crate::spec::DataContentType::Data {
145 return Err(Error::new(
146 ErrorKind::DataInvalid,
147 "Only data content type is allowed for fast append",
148 ));
149 }
150 if self.table.metadata().default_partition_spec_id() != data_file.partition_spec_id {
152 return Err(Error::new(
153 ErrorKind::DataInvalid,
154 "Data file partition spec id does not match table default partition spec id",
155 ));
156 }
157 Self::validate_partition_value(
158 data_file.partition(),
159 self.table.metadata().default_partition_type(),
160 )?;
161 }
162
163 Ok(())
164 }
165
166 pub(crate) async fn validate_duplicate_files(&self) -> Result<()> {
167 let new_files: HashSet<&str> = self
168 .added_data_files
169 .iter()
170 .map(|df| df.file_path.as_str())
171 .collect();
172
173 let mut referenced_files = Vec::new();
174 if let Some(current_snapshot) = self.table.metadata().current_snapshot() {
175 let manifest_list = current_snapshot
176 .load_manifest_list(self.table.file_io(), &self.table.metadata_ref())
177 .await?;
178 for manifest_list_entry in manifest_list.entries() {
179 let manifest = manifest_list_entry
180 .load_manifest(self.table.file_io())
181 .await?;
182 for entry in manifest.entries() {
183 let file_path = entry.file_path();
184 if new_files.contains(file_path) && entry.is_alive() {
185 referenced_files.push(file_path.to_string());
186 }
187 }
188 }
189 }
190
191 if !referenced_files.is_empty() {
192 return Err(Error::new(
193 ErrorKind::DataInvalid,
194 format!(
195 "Cannot add files that are already referenced by table, files: {}",
196 referenced_files.join(", ")
197 ),
198 ));
199 }
200
201 Ok(())
202 }
203
204 fn generate_unique_snapshot_id(table: &Table) -> i64 {
205 let generate_random_id = || -> i64 {
206 let (lhs, rhs) = Uuid::new_v4().as_u64_pair();
207 let snapshot_id = (lhs ^ rhs) as i64;
208 if snapshot_id < 0 {
209 -snapshot_id
210 } else {
211 snapshot_id
212 }
213 };
214 let mut snapshot_id = generate_random_id();
215
216 while table
217 .metadata()
218 .snapshots()
219 .any(|s| s.snapshot_id() == snapshot_id)
220 {
221 snapshot_id = generate_random_id();
222 }
223 snapshot_id
224 }
225
226 fn new_manifest_writer(&mut self, content: ManifestContentType) -> Result<ManifestWriter> {
227 let new_manifest_path = format!(
228 "{}/{}/{}-m{}.{}",
229 self.table.metadata().location(),
230 META_ROOT_PATH,
231 self.commit_uuid,
232 self.manifest_counter.next().unwrap(),
233 DataFileFormat::Avro
234 );
235 let output_file = self.table.file_io().new_output(new_manifest_path)?;
236 let builder = ManifestWriterBuilder::new(
237 output_file,
238 Some(self.snapshot_id),
239 self.key_metadata.clone(),
240 self.table.metadata().current_schema().clone(),
241 self.table
242 .metadata()
243 .default_partition_spec()
244 .as_ref()
245 .clone(),
246 );
247 match self.table.metadata().format_version() {
248 FormatVersion::V1 => Ok(builder.build_v1()),
249 FormatVersion::V2 => match content {
250 ManifestContentType::Data => Ok(builder.build_v2_data()),
251 ManifestContentType::Deletes => Ok(builder.build_v2_deletes()),
252 },
253 FormatVersion::V3 => match content {
254 ManifestContentType::Data => Ok(builder.build_v3_data()),
255 ManifestContentType::Deletes => Ok(builder.build_v3_deletes()),
256 },
257 }
258 }
259
260 fn validate_partition_value(
262 partition_value: &Struct,
263 partition_type: &StructType,
264 ) -> Result<()> {
265 if partition_value.fields().len() != partition_type.fields().len() {
266 return Err(Error::new(
267 ErrorKind::DataInvalid,
268 "Partition value is not compatible with partition type",
269 ));
270 }
271
272 for (value, field) in partition_value.fields().iter().zip(partition_type.fields()) {
273 let field = field.field_type.as_primitive_type().ok_or_else(|| {
274 Error::new(
275 ErrorKind::Unexpected,
276 "Partition field should only be primitive type.",
277 )
278 })?;
279 if let Some(value) = value
280 && !field.compatible(&value.as_primitive_literal().unwrap())
281 {
282 return Err(Error::new(
283 ErrorKind::DataInvalid,
284 "Partition value is not compatible partition type",
285 ));
286 }
287 }
288 Ok(())
289 }
290
291 async fn write_added_manifest(&mut self) -> Result<ManifestFile> {
293 let added_data_files = std::mem::take(&mut self.added_data_files);
294 if added_data_files.is_empty() {
295 return Err(Error::new(
296 ErrorKind::PreconditionFailed,
297 "No added data files found when write an added manifest file",
298 ));
299 }
300
301 let snapshot_id = self.snapshot_id;
302 let format_version = self.table.metadata().format_version();
303 let manifest_entries = added_data_files.into_iter().map(|data_file| {
304 let builder = ManifestEntry::builder()
305 .status(crate::spec::ManifestStatus::Added)
306 .data_file(data_file);
307 if format_version == FormatVersion::V1 {
308 builder.snapshot_id(snapshot_id).build()
309 } else {
310 builder.build()
313 }
314 });
315 let mut writer = self.new_manifest_writer(ManifestContentType::Data)?;
316 for entry in manifest_entries {
317 writer.add_entry(entry)?;
318 }
319 writer.write_manifest_file().await
320 }
321
322 async fn manifest_file<OP: SnapshotProduceOperation, MP: ManifestProcess>(
323 &mut self,
324 snapshot_produce_operation: &OP,
325 manifest_process: &MP,
326 ) -> Result<Vec<ManifestFile>> {
327 if self.added_data_files.is_empty() && self.snapshot_properties.is_empty() {
333 return Err(Error::new(
334 ErrorKind::PreconditionFailed,
335 "No added data files or added snapshot properties found when write a manifest file",
336 ));
337 }
338
339 let existing_manifests = snapshot_produce_operation.existing_manifest(self).await?;
340 let mut manifest_files = existing_manifests;
341
342 if !self.added_data_files.is_empty() {
344 let added_manifest = self.write_added_manifest().await?;
345 manifest_files.push(added_manifest);
346 }
347
348 let manifest_files = manifest_process.process_manifests(self, manifest_files);
352 Ok(manifest_files)
353 }
354
355 fn summary<OP: SnapshotProduceOperation>(
357 &self,
358 snapshot_produce_operation: &OP,
359 ) -> Result<Summary> {
360 let mut summary_collector = SnapshotSummaryCollector::default();
361 let table_metadata = self.table.metadata_ref();
362
363 let partition_summary_limit = if let Some(limit) = table_metadata
364 .properties()
365 .get(TableProperties::PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT)
366 {
367 if let Ok(limit) = limit.parse::<u64>() {
368 limit
369 } else {
370 TableProperties::PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT
371 }
372 } else {
373 TableProperties::PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT
374 };
375
376 summary_collector.set_partition_summary_limit(partition_summary_limit);
377
378 for data_file in &self.added_data_files {
379 summary_collector.add_file(
380 data_file,
381 table_metadata.current_schema().clone(),
382 table_metadata.default_partition_spec().clone(),
383 );
384 }
385
386 let previous_snapshot = table_metadata
387 .snapshot_by_id(self.snapshot_id)
388 .and_then(|snapshot| snapshot.parent_snapshot_id())
389 .and_then(|parent_id| table_metadata.snapshot_by_id(parent_id));
390
391 let mut additional_properties = summary_collector.build();
392 additional_properties.extend(self.snapshot_properties.clone());
393
394 let summary = Summary {
395 operation: snapshot_produce_operation.operation(),
396 additional_properties,
397 };
398
399 update_snapshot_summaries(
400 summary,
401 previous_snapshot.map(|s| s.summary()),
402 snapshot_produce_operation.operation() == Operation::Overwrite,
403 )
404 }
405
406 fn generate_manifest_list_file_path(&self, attempt: i64) -> String {
407 format!(
408 "{}/{}/snap-{}-{}-{}.{}",
409 self.table.metadata().location(),
410 META_ROOT_PATH,
411 self.snapshot_id,
412 attempt,
413 self.commit_uuid,
414 DataFileFormat::Avro
415 )
416 }
417
418 pub(crate) async fn commit<OP: SnapshotProduceOperation, MP: ManifestProcess>(
420 mut self,
421 snapshot_produce_operation: OP,
422 process: MP,
423 ) -> Result<ActionCommit> {
424 let manifest_list_path = self.generate_manifest_list_file_path(0);
425 let next_seq_num = self.table.metadata().next_sequence_number();
426 let first_row_id = self.table.metadata().next_row_id();
427 let mut manifest_list_writer = match self.table.metadata().format_version() {
428 FormatVersion::V1 => ManifestListWriter::v1(
429 self.table
430 .file_io()
431 .new_output(manifest_list_path.clone())?,
432 self.snapshot_id,
433 self.table.metadata().current_snapshot_id(),
434 ),
435 FormatVersion::V2 => ManifestListWriter::v2(
436 self.table
437 .file_io()
438 .new_output(manifest_list_path.clone())?,
439 self.snapshot_id,
440 self.table.metadata().current_snapshot_id(),
441 next_seq_num,
442 ),
443 FormatVersion::V3 => ManifestListWriter::v3(
444 self.table
445 .file_io()
446 .new_output(manifest_list_path.clone())?,
447 self.snapshot_id,
448 self.table.metadata().current_snapshot_id(),
449 next_seq_num,
450 Some(first_row_id),
451 ),
452 };
453
454 let summary = self.summary(&snapshot_produce_operation).map_err(|err| {
458 Error::new(ErrorKind::Unexpected, "Failed to create snapshot summary.").with_source(err)
459 })?;
460
461 let new_manifests = self
462 .manifest_file(&snapshot_produce_operation, &process)
463 .await?;
464
465 manifest_list_writer.add_manifests(new_manifests.into_iter())?;
466 let writer_next_row_id = manifest_list_writer.next_row_id();
467 manifest_list_writer.close().await?;
468
469 let commit_ts = chrono::Utc::now().timestamp_millis();
470 let new_snapshot = Snapshot::builder()
471 .with_manifest_list(manifest_list_path)
472 .with_snapshot_id(self.snapshot_id)
473 .with_parent_snapshot_id(self.table.metadata().current_snapshot_id())
474 .with_sequence_number(next_seq_num)
475 .with_summary(summary)
476 .with_schema_id(self.table.metadata().current_schema_id())
477 .with_timestamp_ms(commit_ts);
478
479 let new_snapshot = if let Some(writer_next_row_id) = writer_next_row_id {
480 let assigned_rows = writer_next_row_id - self.table.metadata().next_row_id();
481 new_snapshot
482 .with_row_range(first_row_id, assigned_rows)
483 .build()
484 } else {
485 new_snapshot.build()
486 };
487
488 let updates = vec![
489 TableUpdate::AddSnapshot {
490 snapshot: new_snapshot,
491 },
492 TableUpdate::SetSnapshotRef {
493 ref_name: MAIN_BRANCH.to_string(),
494 reference: SnapshotReference::new(
495 self.snapshot_id,
496 SnapshotRetention::branch(None, None, None),
497 ),
498 },
499 ];
500
501 let requirements = vec![
502 TableRequirement::UuidMatch {
503 uuid: self.table.metadata().uuid(),
504 },
505 TableRequirement::RefSnapshotIdMatch {
506 r#ref: MAIN_BRANCH.to_string(),
507 snapshot_id: self.table.metadata().current_snapshot_id(),
508 },
509 ];
510
511 Ok(ActionCommit::new(updates, requirements))
512 }
513}