1use std::sync::Arc;
22
23use itertools::Itertools;
24use serde::{Deserialize, Serialize};
25use typed_builder::TypedBuilder;
26
27use super::transform::Transform;
28use super::{NestedField, Schema, SchemaRef, StructType};
29use crate::spec::Struct;
30use crate::{Error, ErrorKind, Result};
31
/// Initial "last assigned" partition field id of a fresh [`PartitionSpecBuilder`];
/// the first auto-assigned partition field id is therefore this value plus one.
pub(crate) const UNPARTITIONED_LAST_ASSIGNED_ID: i32 = 999;
/// Spec id used by [`PartitionSpec::unpartition_spec`] and as the fallback whenever
/// a builder was not given an explicit spec id.
pub(crate) const DEFAULT_PARTITION_SPEC_ID: i32 = 0;
34
/// A partition field that carries a concrete partition field id.
///
/// Serialized with kebab-case keys (`source-id`, `field-id`, `name`, `transform`).
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, TypedBuilder)]
#[serde(rename_all = "kebab-case")]
pub struct PartitionField {
    /// Id of the schema column the partition value is derived from.
    pub source_id: i32,
    /// Id of this partition field; becomes the field id inside the partition struct type.
    pub field_id: i32,
    /// Name of the partition field.
    pub name: String,
    /// Transform applied to the source column to obtain the partition value.
    pub transform: Transform,
}
49
50impl PartitionField {
51 pub fn into_unbound(self) -> UnboundPartitionField {
53 self.into()
54 }
55}
56
/// Reference-counted handle to a [`PartitionSpec`].
pub type PartitionSpecRef = Arc<PartitionSpec>;
/// A partition spec whose fields all carry concrete partition field ids.
///
/// Instances are created through [`PartitionSpec::builder`],
/// [`UnboundPartitionSpec::bind`], [`PartitionSpec::unpartition_spec`] or
/// deserialization (kebab-case keys: `spec-id`, `fields`).
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
#[serde(rename_all = "kebab-case")]
pub struct PartitionSpec {
    /// Identifier of this spec.
    spec_id: i32,
    /// Partition fields, in the order they were added.
    fields: Vec<PartitionField>,
}
73
74impl PartitionSpec {
75 pub fn builder(schema: impl Into<SchemaRef>) -> PartitionSpecBuilder {
77 PartitionSpecBuilder::new(schema)
78 }
79
80 pub fn fields(&self) -> &[PartitionField] {
82 &self.fields
83 }
84
85 pub fn spec_id(&self) -> i32 {
87 self.spec_id
88 }
89
90 pub fn unpartition_spec() -> Self {
92 Self {
93 spec_id: DEFAULT_PARTITION_SPEC_ID,
94 fields: vec![],
95 }
96 }
97
98 pub fn is_unpartitioned(&self) -> bool {
102 self.fields.is_empty() || self.fields.iter().all(|f| f.transform == Transform::Void)
103 }
104
105 pub fn partition_type(&self, schema: &Schema) -> Result<StructType> {
107 PartitionSpecBuilder::partition_type(&self.fields, schema)
108 }
109
110 pub fn into_unbound(self) -> UnboundPartitionSpec {
112 self.into()
113 }
114
115 pub fn with_spec_id(self, spec_id: i32) -> Self {
117 Self { spec_id, ..self }
118 }
119
120 pub fn has_sequential_ids(&self) -> bool {
124 has_sequential_ids(self.fields.iter().map(|f| f.field_id))
125 }
126
127 pub fn highest_field_id(&self) -> Option<i32> {
129 self.fields.iter().map(|f| f.field_id).max()
130 }
131
132 pub fn is_compatible_with(&self, other: &PartitionSpec) -> bool {
142 if self.fields.len() != other.fields.len() {
143 return false;
144 }
145
146 for (this_field, other_field) in self.fields.iter().zip(other.fields.iter()) {
147 if this_field.source_id != other_field.source_id
148 || this_field.name != other_field.name
149 || this_field.transform != other_field.transform
150 {
151 return false;
152 }
153 }
154
155 true
156 }
157
158 pub fn partition_to_path(&self, data: &Struct, schema: SchemaRef) -> String {
161 let partition_type = self.partition_type(&schema).unwrap();
162 let field_types = partition_type.fields();
163
164 self.fields
165 .iter()
166 .enumerate()
167 .map(|(i, field)| {
168 let value = data[i].as_ref();
169 format!(
170 "{}={}",
171 field.name,
172 field
173 .transform
174 .to_human_string(&field_types[i].field_type, value)
175 )
176 })
177 .join("/")
178 }
179}
180
/// Bundles partition values with the spec and schema needed to interpret them.
#[derive(Clone, Debug)]
pub struct PartitionKey {
    /// Partition spec describing the fields of `data`.
    spec: PartitionSpec,
    /// Schema used to resolve the spec's source columns.
    schema: SchemaRef,
    /// Partition values, indexed by field position when rendered to a path.
    data: Struct,
}
192
193impl PartitionKey {
194 pub fn new(spec: PartitionSpec, schema: SchemaRef, data: Struct) -> Self {
196 Self { spec, schema, data }
197 }
198
199 pub fn copy_with_data(&self, data: Struct) -> Self {
201 Self {
202 spec: self.spec.clone(),
203 schema: self.schema.clone(),
204 data,
205 }
206 }
207
208 pub fn to_path(&self) -> String {
210 self.spec.partition_to_path(&self.data, self.schema.clone())
211 }
212
213 pub fn is_effectively_none(partition_key: Option<&PartitionKey>) -> bool {
216 match partition_key {
217 None => true,
218 Some(pk) => pk.spec.is_unpartitioned(),
219 }
220 }
221
222 pub fn spec(&self) -> &PartitionSpec {
224 &self.spec
225 }
226
227 pub fn schema(&self) -> &SchemaRef {
229 &self.schema
230 }
231
232 pub fn data(&self) -> &Struct {
234 &self.data
235 }
236}
237
/// Reference-counted handle to an [`UnboundPartitionSpec`].
pub type UnboundPartitionSpecRef = Arc<UnboundPartitionSpec>;
/// A partition field whose partition field id may still be unassigned.
///
/// Serialized with kebab-case keys (`source-id`, `field-id`, `name`, `transform`).
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, TypedBuilder)]
#[serde(rename_all = "kebab-case")]
pub struct UnboundPartitionField {
    /// Id of the schema column the partition value is derived from.
    pub source_id: i32,
    /// Partition field id, if already known. When `None`, an id is assigned when
    /// the field is turned into a [`PartitionField`] by [`PartitionSpecBuilder::build`].
    /// Left as `None` when not set on the typed builder.
    #[builder(default, setter(strip_option(fallback = field_id_opt)))]
    pub field_id: Option<i32>,
    /// Name of the partition field.
    pub name: String,
    /// Transform applied to the source column to obtain the partition value.
    pub transform: Transform,
}
255
/// A partition spec that is not yet bound to a schema; its spec id and the ids
/// of its fields are optional.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, Default)]
#[serde(rename_all = "kebab-case")]
pub struct UnboundPartitionSpec {
    /// Identifier of the spec, if one has been chosen.
    pub(crate) spec_id: Option<i32>,
    /// Partition fields, in the order they were added.
    pub(crate) fields: Vec<UnboundPartitionField>,
}
267
268impl UnboundPartitionSpec {
269 pub fn builder() -> UnboundPartitionSpecBuilder {
271 UnboundPartitionSpecBuilder::default()
272 }
273
274 pub fn bind(self, schema: impl Into<SchemaRef>) -> Result<PartitionSpec> {
276 PartitionSpecBuilder::new_from_unbound(self, schema)?.build()
277 }
278
279 pub fn spec_id(&self) -> Option<i32> {
281 self.spec_id
282 }
283
284 pub fn fields(&self) -> &[UnboundPartitionField] {
286 &self.fields
287 }
288
289 pub fn with_spec_id(self, spec_id: i32) -> Self {
291 Self {
292 spec_id: Some(spec_id),
293 ..self
294 }
295 }
296}
297
298fn has_sequential_ids(field_ids: impl Iterator<Item = i32>) -> bool {
299 for (index, field_id) in field_ids.enumerate() {
300 let expected_id = (UNPARTITIONED_LAST_ASSIGNED_ID as i64)
301 .checked_add(1)
302 .and_then(|id| id.checked_add(index as i64))
303 .unwrap_or(i64::MAX);
304
305 if field_id as i64 != expected_id {
306 return false;
307 }
308 }
309
310 true
311}
312
313impl From<PartitionField> for UnboundPartitionField {
314 fn from(field: PartitionField) -> Self {
315 UnboundPartitionField {
316 source_id: field.source_id,
317 field_id: Some(field.field_id),
318 name: field.name,
319 transform: field.transform,
320 }
321 }
322}
323
324impl From<PartitionSpec> for UnboundPartitionSpec {
325 fn from(spec: PartitionSpec) -> Self {
326 UnboundPartitionSpec {
327 spec_id: Some(spec.spec_id),
328 fields: spec.fields.into_iter().map(Into::into).collect(),
329 }
330 }
331}
332
/// Builder for [`UnboundPartitionSpec`]. Fields are validated as they are added,
/// without reference to a schema.
#[derive(Debug, Default)]
pub struct UnboundPartitionSpecBuilder {
    /// Spec id to give the built spec, if any.
    spec_id: Option<i32>,
    /// Fields accepted so far, in insertion order.
    fields: Vec<UnboundPartitionField>,
}
339
340impl UnboundPartitionSpecBuilder {
341 pub fn new() -> Self {
343 Self {
344 spec_id: None,
345 fields: vec![],
346 }
347 }
348
349 pub fn with_spec_id(mut self, spec_id: i32) -> Self {
351 self.spec_id = Some(spec_id);
352 self
353 }
354
355 pub fn add_partition_field(
357 self,
358 source_id: i32,
359 target_name: impl ToString,
360 transformation: Transform,
361 ) -> Result<Self> {
362 let field = UnboundPartitionField {
363 source_id,
364 field_id: None,
365 name: target_name.to_string(),
366 transform: transformation,
367 };
368 self.add_partition_field_internal(field)
369 }
370
371 pub fn add_partition_fields(
373 self,
374 fields: impl IntoIterator<Item = UnboundPartitionField>,
375 ) -> Result<Self> {
376 let mut builder = self;
377 for field in fields {
378 builder = builder.add_partition_field_internal(field)?;
379 }
380 Ok(builder)
381 }
382
383 fn add_partition_field_internal(mut self, field: UnboundPartitionField) -> Result<Self> {
384 self.check_name_set_and_unique(&field.name)?;
385 self.check_for_redundant_partitions(field.source_id, &field.transform)?;
386 if let Some(partition_field_id) = field.field_id {
387 self.check_partition_id_unique(partition_field_id)?;
388 }
389 self.fields.push(field);
390 Ok(self)
391 }
392
393 pub fn build(self) -> UnboundPartitionSpec {
395 UnboundPartitionSpec {
396 spec_id: self.spec_id,
397 fields: self.fields,
398 }
399 }
400}
401
/// Builder for [`PartitionSpec`]. Fields are validated against a schema as they
/// are added, and missing partition field ids are assigned in `build`.
#[derive(Debug)]
pub struct PartitionSpecBuilder {
    /// Spec id to give the built spec, if any.
    spec_id: Option<i32>,
    /// Id after which automatically assigned partition field ids continue.
    last_assigned_field_id: i32,
    /// Fields accepted so far, in insertion order.
    fields: Vec<UnboundPartitionField>,
    /// Schema the fields are validated against.
    schema: SchemaRef,
}
410
411impl PartitionSpecBuilder {
412 pub fn new(schema: impl Into<SchemaRef>) -> Self {
414 Self {
415 spec_id: None,
416 fields: vec![],
417 last_assigned_field_id: UNPARTITIONED_LAST_ASSIGNED_ID,
418 schema: schema.into(),
419 }
420 }
421
422 pub fn new_from_unbound(
424 unbound: UnboundPartitionSpec,
425 schema: impl Into<SchemaRef>,
426 ) -> Result<Self> {
427 let mut builder =
428 Self::new(schema).with_spec_id(unbound.spec_id.unwrap_or(DEFAULT_PARTITION_SPEC_ID));
429
430 for field in unbound.fields {
431 builder = builder.add_unbound_field(field)?;
432 }
433 Ok(builder)
434 }
435
436 pub fn with_last_assigned_field_id(mut self, last_assigned_field_id: i32) -> Self {
442 self.last_assigned_field_id = last_assigned_field_id;
443 self
444 }
445
446 pub fn with_spec_id(mut self, spec_id: i32) -> Self {
448 self.spec_id = Some(spec_id);
449 self
450 }
451
452 pub fn add_partition_field(
454 self,
455 source_name: impl AsRef<str>,
456 target_name: impl Into<String>,
457 transform: Transform,
458 ) -> Result<Self> {
459 let source_id = self
460 .schema
461 .field_by_name(source_name.as_ref())
462 .ok_or_else(|| {
463 Error::new(
464 ErrorKind::DataInvalid,
465 format!(
466 "Cannot find source column with name: {} in schema",
467 source_name.as_ref()
468 ),
469 )
470 })?
471 .id;
472 let field = UnboundPartitionField {
473 source_id,
474 field_id: None,
475 name: target_name.into(),
476 transform,
477 };
478
479 self.add_unbound_field(field)
480 }
481
482 pub fn add_unbound_field(mut self, field: UnboundPartitionField) -> Result<Self> {
487 self.check_name_set_and_unique(&field.name)?;
488 self.check_for_redundant_partitions(field.source_id, &field.transform)?;
489 Self::check_name_does_not_collide_with_schema(&field, &self.schema)?;
490 Self::check_transform_compatibility(&field, &self.schema)?;
491 if let Some(partition_field_id) = field.field_id {
492 self.check_partition_id_unique(partition_field_id)?;
493 }
494
495 self.fields.push(field);
497 Ok(self)
498 }
499
500 pub fn add_unbound_fields(
502 self,
503 fields: impl IntoIterator<Item = UnboundPartitionField>,
504 ) -> Result<Self> {
505 let mut builder = self;
506 for field in fields {
507 builder = builder.add_unbound_field(field)?;
508 }
509 Ok(builder)
510 }
511
512 pub fn build(self) -> Result<PartitionSpec> {
514 let fields = Self::set_field_ids(self.fields, self.last_assigned_field_id)?;
515 Ok(PartitionSpec {
516 spec_id: self.spec_id.unwrap_or(DEFAULT_PARTITION_SPEC_ID),
517 fields,
518 })
519 }
520
521 fn set_field_ids(
522 fields: Vec<UnboundPartitionField>,
523 last_assigned_field_id: i32,
524 ) -> Result<Vec<PartitionField>> {
525 let mut last_assigned_field_id = last_assigned_field_id;
526 let assigned_ids = fields
529 .iter()
530 .filter_map(|f| f.field_id)
531 .collect::<std::collections::HashSet<_>>();
532
533 fn _check_add_1(prev: i32) -> Result<i32> {
534 prev.checked_add(1).ok_or_else(|| {
535 Error::new(
536 ErrorKind::DataInvalid,
537 "Cannot assign more partition ids. Overflow.",
538 )
539 })
540 }
541
542 let mut bound_fields = Vec::with_capacity(fields.len());
543 for field in fields.into_iter() {
544 let partition_field_id = if let Some(partition_field_id) = field.field_id {
545 last_assigned_field_id = std::cmp::max(last_assigned_field_id, partition_field_id);
546 partition_field_id
547 } else {
548 last_assigned_field_id = _check_add_1(last_assigned_field_id)?;
549 while assigned_ids.contains(&last_assigned_field_id) {
550 last_assigned_field_id = _check_add_1(last_assigned_field_id)?;
551 }
552 last_assigned_field_id
553 };
554
555 bound_fields.push(PartitionField {
556 source_id: field.source_id,
557 field_id: partition_field_id,
558 name: field.name,
559 transform: field.transform,
560 })
561 }
562
563 Ok(bound_fields)
564 }
565
566 fn partition_type(fields: &Vec<PartitionField>, schema: &Schema) -> Result<StructType> {
568 let mut struct_fields = Vec::with_capacity(fields.len());
569 for partition_field in fields {
570 let field = schema
571 .field_by_id(partition_field.source_id)
572 .ok_or_else(|| {
573 Error::new(
574 ErrorKind::Unexpected,
577 format!(
578 "No column with source column id {} in schema {:?}",
579 partition_field.source_id, schema
580 ),
581 )
582 })?;
583 let res_type = partition_field.transform.result_type(&field.field_type)?;
584 let field =
585 NestedField::optional(partition_field.field_id, &partition_field.name, res_type)
586 .into();
587 struct_fields.push(field);
588 }
589 Ok(StructType::new(struct_fields))
590 }
591
592 fn check_name_does_not_collide_with_schema(
597 field: &UnboundPartitionField,
598 schema: &Schema,
599 ) -> Result<()> {
600 match schema.field_by_name(field.name.as_str()) {
601 Some(schema_collision) => {
602 if field.transform == Transform::Identity {
603 if schema_collision.id == field.source_id {
604 Ok(())
605 } else {
606 Err(Error::new(
607 ErrorKind::DataInvalid,
608 format!(
609 "Cannot create identity partition sourced from different field in schema. Field name '{}' has id `{}` in schema but partition source id is `{}`",
610 field.name, schema_collision.id, field.source_id
611 ),
612 ))
613 }
614 } else {
615 Err(Error::new(
616 ErrorKind::DataInvalid,
617 format!(
618 "Cannot create partition with name: '{}' that conflicts with schema field and is not an identity transform.",
619 field.name
620 ),
621 ))
622 }
623 }
624 None => Ok(()),
625 }
626 }
627
628 fn check_transform_compatibility(field: &UnboundPartitionField, schema: &Schema) -> Result<()> {
631 let schema_field = schema.field_by_id(field.source_id).ok_or_else(|| {
632 Error::new(
633 ErrorKind::DataInvalid,
634 format!(
635 "Cannot find partition source field with id `{}` in schema",
636 field.source_id
637 ),
638 )
639 })?;
640
641 if field.transform != Transform::Void {
642 if !schema_field.field_type.is_primitive() {
643 return Err(Error::new(
644 ErrorKind::DataInvalid,
645 format!(
646 "Cannot partition by non-primitive source field: '{}'.",
647 schema_field.field_type
648 ),
649 ));
650 }
651
652 if field
653 .transform
654 .result_type(&schema_field.field_type)
655 .is_err()
656 {
657 return Err(Error::new(
658 ErrorKind::DataInvalid,
659 format!(
660 "Invalid source type: '{}' for transform: '{}'.",
661 schema_field.field_type,
662 field.transform.dedup_name()
663 ),
664 ));
665 }
666 }
667
668 Ok(())
669 }
670}
671
672trait CorePartitionSpecValidator {
674 fn check_name_set_and_unique(&self, name: &str) -> Result<()> {
676 if name.is_empty() {
677 return Err(Error::new(
678 ErrorKind::DataInvalid,
679 "Cannot use empty partition name",
680 ));
681 }
682
683 if self.fields().iter().any(|f| f.name == name) {
684 return Err(Error::new(
685 ErrorKind::DataInvalid,
686 format!("Cannot use partition name more than once: {name}"),
687 ));
688 }
689 Ok(())
690 }
691
692 fn check_for_redundant_partitions(&self, source_id: i32, transform: &Transform) -> Result<()> {
694 let collision = self.fields().iter().find(|f| {
695 f.source_id == source_id && f.transform.dedup_name() == transform.dedup_name()
696 });
697
698 if let Some(collision) = collision {
699 Err(Error::new(
700 ErrorKind::DataInvalid,
701 format!(
702 "Cannot add redundant partition with source id `{}` and transform `{}`. A partition with the same source id and transform already exists with name `{}`",
703 source_id,
704 transform.dedup_name(),
705 collision.name
706 ),
707 ))
708 } else {
709 Ok(())
710 }
711 }
712
713 fn check_partition_id_unique(&self, field_id: i32) -> Result<()> {
715 if self.fields().iter().any(|f| f.field_id == Some(field_id)) {
716 return Err(Error::new(
717 ErrorKind::DataInvalid,
718 format!("Cannot use field id more than once in one PartitionSpec: {field_id}"),
719 ));
720 }
721
722 Ok(())
723 }
724
725 fn fields(&self) -> &Vec<UnboundPartitionField>;
726}
727
/// Gives the shared validation checks access to the builder's accepted fields.
impl CorePartitionSpecValidator for PartitionSpecBuilder {
    fn fields(&self) -> &Vec<UnboundPartitionField> {
        &self.fields
    }
}
733
/// Gives the shared validation checks access to the builder's accepted fields.
impl CorePartitionSpecValidator for UnboundPartitionSpecBuilder {
    fn fields(&self) -> &Vec<UnboundPartitionField> {
        &self.fields
    }
}
739
740#[cfg(test)]
741mod tests {
742 use super::*;
743 use crate::spec::{Literal, PrimitiveType, Type};
744
745 #[test]
746 fn test_partition_spec() {
747 let spec = r#"
748 {
749 "spec-id": 1,
750 "fields": [ {
751 "source-id": 4,
752 "field-id": 1000,
753 "name": "ts_day",
754 "transform": "day"
755 }, {
756 "source-id": 1,
757 "field-id": 1001,
758 "name": "id_bucket",
759 "transform": "bucket[16]"
760 }, {
761 "source-id": 2,
762 "field-id": 1002,
763 "name": "id_truncate",
764 "transform": "truncate[4]"
765 } ]
766 }
767 "#;
768
769 let partition_spec: PartitionSpec = serde_json::from_str(spec).unwrap();
770 assert_eq!(4, partition_spec.fields[0].source_id);
771 assert_eq!(1000, partition_spec.fields[0].field_id);
772 assert_eq!("ts_day", partition_spec.fields[0].name);
773 assert_eq!(Transform::Day, partition_spec.fields[0].transform);
774
775 assert_eq!(1, partition_spec.fields[1].source_id);
776 assert_eq!(1001, partition_spec.fields[1].field_id);
777 assert_eq!("id_bucket", partition_spec.fields[1].name);
778 assert_eq!(Transform::Bucket(16), partition_spec.fields[1].transform);
779
780 assert_eq!(2, partition_spec.fields[2].source_id);
781 assert_eq!(1002, partition_spec.fields[2].field_id);
782 assert_eq!("id_truncate", partition_spec.fields[2].name);
783 assert_eq!(Transform::Truncate(4), partition_spec.fields[2].transform);
784 }
785
786 #[test]
787 fn test_is_unpartitioned() {
788 let schema = Schema::builder()
789 .with_fields(vec![
790 NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int))
791 .into(),
792 NestedField::required(
793 2,
794 "name",
795 Type::Primitive(crate::spec::PrimitiveType::String),
796 )
797 .into(),
798 ])
799 .build()
800 .unwrap();
801 let partition_spec = PartitionSpec::builder(schema.clone())
802 .with_spec_id(1)
803 .build()
804 .unwrap();
805 assert!(
806 partition_spec.is_unpartitioned(),
807 "Empty partition spec should be unpartitioned"
808 );
809
810 let partition_spec = PartitionSpec::builder(schema.clone())
811 .add_unbound_fields(vec![
812 UnboundPartitionField::builder()
813 .source_id(1)
814 .name("id".to_string())
815 .transform(Transform::Identity)
816 .build(),
817 UnboundPartitionField::builder()
818 .source_id(2)
819 .name("name_string".to_string())
820 .transform(Transform::Void)
821 .build(),
822 ])
823 .unwrap()
824 .with_spec_id(1)
825 .build()
826 .unwrap();
827 assert!(
828 !partition_spec.is_unpartitioned(),
829 "Partition spec with one non void transform should not be unpartitioned"
830 );
831
832 let partition_spec = PartitionSpec::builder(schema.clone())
833 .with_spec_id(1)
834 .add_unbound_fields(vec![
835 UnboundPartitionField::builder()
836 .source_id(1)
837 .name("id_void".to_string())
838 .transform(Transform::Void)
839 .build(),
840 UnboundPartitionField::builder()
841 .source_id(2)
842 .name("name_void".to_string())
843 .transform(Transform::Void)
844 .build(),
845 ])
846 .unwrap()
847 .build()
848 .unwrap();
849 assert!(
850 partition_spec.is_unpartitioned(),
851 "Partition spec with all void field should be unpartitioned"
852 );
853 }
854
855 #[test]
856 fn test_unbound_partition_spec() {
857 let spec = r#"
858 {
859 "spec-id": 1,
860 "fields": [ {
861 "source-id": 4,
862 "field-id": 1000,
863 "name": "ts_day",
864 "transform": "day"
865 }, {
866 "source-id": 1,
867 "field-id": 1001,
868 "name": "id_bucket",
869 "transform": "bucket[16]"
870 }, {
871 "source-id": 2,
872 "field-id": 1002,
873 "name": "id_truncate",
874 "transform": "truncate[4]"
875 } ]
876 }
877 "#;
878
879 let partition_spec: UnboundPartitionSpec = serde_json::from_str(spec).unwrap();
880 assert_eq!(Some(1), partition_spec.spec_id);
881
882 assert_eq!(4, partition_spec.fields[0].source_id);
883 assert_eq!(Some(1000), partition_spec.fields[0].field_id);
884 assert_eq!("ts_day", partition_spec.fields[0].name);
885 assert_eq!(Transform::Day, partition_spec.fields[0].transform);
886
887 assert_eq!(1, partition_spec.fields[1].source_id);
888 assert_eq!(Some(1001), partition_spec.fields[1].field_id);
889 assert_eq!("id_bucket", partition_spec.fields[1].name);
890 assert_eq!(Transform::Bucket(16), partition_spec.fields[1].transform);
891
892 assert_eq!(2, partition_spec.fields[2].source_id);
893 assert_eq!(Some(1002), partition_spec.fields[2].field_id);
894 assert_eq!("id_truncate", partition_spec.fields[2].name);
895 assert_eq!(Transform::Truncate(4), partition_spec.fields[2].transform);
896
897 let spec = r#"
898 {
899 "fields": [ {
900 "source-id": 4,
901 "name": "ts_day",
902 "transform": "day"
903 } ]
904 }
905 "#;
906 let partition_spec: UnboundPartitionSpec = serde_json::from_str(spec).unwrap();
907 assert_eq!(None, partition_spec.spec_id);
908
909 assert_eq!(4, partition_spec.fields[0].source_id);
910 assert_eq!(None, partition_spec.fields[0].field_id);
911 assert_eq!("ts_day", partition_spec.fields[0].name);
912 assert_eq!(Transform::Day, partition_spec.fields[0].transform);
913 }
914
915 #[test]
916 fn test_new_unpartition() {
917 let schema = Schema::builder()
918 .with_fields(vec![
919 NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int))
920 .into(),
921 NestedField::required(
922 2,
923 "name",
924 Type::Primitive(crate::spec::PrimitiveType::String),
925 )
926 .into(),
927 ])
928 .build()
929 .unwrap();
930 let partition_spec = PartitionSpec::builder(schema.clone())
931 .with_spec_id(0)
932 .build()
933 .unwrap();
934 let partition_type = partition_spec.partition_type(&schema).unwrap();
935 assert_eq!(0, partition_type.fields().len());
936
937 let unpartition_spec = PartitionSpec::unpartition_spec();
938 assert_eq!(partition_spec, unpartition_spec);
939 }
940
941 #[test]
942 fn test_partition_type() {
943 let spec = r#"
944 {
945 "spec-id": 1,
946 "fields": [ {
947 "source-id": 4,
948 "field-id": 1000,
949 "name": "ts_day",
950 "transform": "day"
951 }, {
952 "source-id": 1,
953 "field-id": 1001,
954 "name": "id_bucket",
955 "transform": "bucket[16]"
956 }, {
957 "source-id": 2,
958 "field-id": 1002,
959 "name": "id_truncate",
960 "transform": "truncate[4]"
961 } ]
962 }
963 "#;
964
965 let partition_spec: PartitionSpec = serde_json::from_str(spec).unwrap();
966 let schema = Schema::builder()
967 .with_fields(vec![
968 NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int))
969 .into(),
970 NestedField::required(
971 2,
972 "name",
973 Type::Primitive(crate::spec::PrimitiveType::String),
974 )
975 .into(),
976 NestedField::required(
977 3,
978 "ts",
979 Type::Primitive(crate::spec::PrimitiveType::Timestamp),
980 )
981 .into(),
982 NestedField::required(
983 4,
984 "ts_day",
985 Type::Primitive(crate::spec::PrimitiveType::Timestamp),
986 )
987 .into(),
988 NestedField::required(
989 5,
990 "id_bucket",
991 Type::Primitive(crate::spec::PrimitiveType::Int),
992 )
993 .into(),
994 NestedField::required(
995 6,
996 "id_truncate",
997 Type::Primitive(crate::spec::PrimitiveType::Int),
998 )
999 .into(),
1000 ])
1001 .build()
1002 .unwrap();
1003
1004 let partition_type = partition_spec.partition_type(&schema).unwrap();
1005 assert_eq!(3, partition_type.fields().len());
1006 assert_eq!(
1007 *partition_type.fields()[0],
1008 NestedField::optional(
1009 partition_spec.fields[0].field_id,
1010 &partition_spec.fields[0].name,
1011 Type::Primitive(crate::spec::PrimitiveType::Date)
1012 )
1013 );
1014 assert_eq!(
1015 *partition_type.fields()[1],
1016 NestedField::optional(
1017 partition_spec.fields[1].field_id,
1018 &partition_spec.fields[1].name,
1019 Type::Primitive(crate::spec::PrimitiveType::Int)
1020 )
1021 );
1022 assert_eq!(
1023 *partition_type.fields()[2],
1024 NestedField::optional(
1025 partition_spec.fields[2].field_id,
1026 &partition_spec.fields[2].name,
1027 Type::Primitive(crate::spec::PrimitiveType::String)
1028 )
1029 );
1030 }
1031
1032 #[test]
1033 fn test_partition_empty() {
1034 let spec = r#"
1035 {
1036 "spec-id": 1,
1037 "fields": []
1038 }
1039 "#;
1040
1041 let partition_spec: PartitionSpec = serde_json::from_str(spec).unwrap();
1042 let schema = Schema::builder()
1043 .with_fields(vec![
1044 NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int))
1045 .into(),
1046 NestedField::required(
1047 2,
1048 "name",
1049 Type::Primitive(crate::spec::PrimitiveType::String),
1050 )
1051 .into(),
1052 NestedField::required(
1053 3,
1054 "ts",
1055 Type::Primitive(crate::spec::PrimitiveType::Timestamp),
1056 )
1057 .into(),
1058 NestedField::required(
1059 4,
1060 "ts_day",
1061 Type::Primitive(crate::spec::PrimitiveType::Timestamp),
1062 )
1063 .into(),
1064 NestedField::required(
1065 5,
1066 "id_bucket",
1067 Type::Primitive(crate::spec::PrimitiveType::Int),
1068 )
1069 .into(),
1070 NestedField::required(
1071 6,
1072 "id_truncate",
1073 Type::Primitive(crate::spec::PrimitiveType::Int),
1074 )
1075 .into(),
1076 ])
1077 .build()
1078 .unwrap();
1079
1080 let partition_type = partition_spec.partition_type(&schema).unwrap();
1081 assert_eq!(0, partition_type.fields().len());
1082 }
1083
1084 #[test]
1085 fn test_partition_error() {
1086 let spec = r#"
1087 {
1088 "spec-id": 1,
1089 "fields": [ {
1090 "source-id": 4,
1091 "field-id": 1000,
1092 "name": "ts_day",
1093 "transform": "day"
1094 }, {
1095 "source-id": 1,
1096 "field-id": 1001,
1097 "name": "id_bucket",
1098 "transform": "bucket[16]"
1099 }, {
1100 "source-id": 2,
1101 "field-id": 1002,
1102 "name": "id_truncate",
1103 "transform": "truncate[4]"
1104 } ]
1105 }
1106 "#;
1107
1108 let partition_spec: PartitionSpec = serde_json::from_str(spec).unwrap();
1109 let schema = Schema::builder()
1110 .with_fields(vec![
1111 NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int))
1112 .into(),
1113 NestedField::required(
1114 2,
1115 "name",
1116 Type::Primitive(crate::spec::PrimitiveType::String),
1117 )
1118 .into(),
1119 ])
1120 .build()
1121 .unwrap();
1122
1123 assert!(partition_spec.partition_type(&schema).is_err());
1124 }
1125
1126 #[test]
1127 fn test_builder_disallow_duplicate_names() {
1128 UnboundPartitionSpec::builder()
1129 .add_partition_field(1, "ts_day".to_string(), Transform::Day)
1130 .unwrap()
1131 .add_partition_field(2, "ts_day".to_string(), Transform::Day)
1132 .unwrap_err();
1133 }
1134
1135 #[test]
1136 fn test_builder_disallow_duplicate_field_ids() {
1137 let schema = Schema::builder()
1138 .with_fields(vec![
1139 NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int))
1140 .into(),
1141 NestedField::required(
1142 2,
1143 "name",
1144 Type::Primitive(crate::spec::PrimitiveType::String),
1145 )
1146 .into(),
1147 ])
1148 .build()
1149 .unwrap();
1150 PartitionSpec::builder(schema.clone())
1151 .add_unbound_field(UnboundPartitionField {
1152 source_id: 1,
1153 field_id: Some(1000),
1154 name: "id".to_string(),
1155 transform: Transform::Identity,
1156 })
1157 .unwrap()
1158 .add_unbound_field(UnboundPartitionField {
1159 source_id: 2,
1160 field_id: Some(1000),
1161 name: "id_bucket".to_string(),
1162 transform: Transform::Bucket(16),
1163 })
1164 .unwrap_err();
1165 }
1166
1167 #[test]
1168 fn test_builder_auto_assign_field_ids() {
1169 let schema = Schema::builder()
1170 .with_fields(vec![
1171 NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int))
1172 .into(),
1173 NestedField::required(
1174 2,
1175 "name",
1176 Type::Primitive(crate::spec::PrimitiveType::String),
1177 )
1178 .into(),
1179 NestedField::required(
1180 3,
1181 "ts",
1182 Type::Primitive(crate::spec::PrimitiveType::Timestamp),
1183 )
1184 .into(),
1185 ])
1186 .build()
1187 .unwrap();
1188 let spec = PartitionSpec::builder(schema.clone())
1189 .with_spec_id(1)
1190 .add_unbound_field(UnboundPartitionField {
1191 source_id: 1,
1192 name: "id".to_string(),
1193 transform: Transform::Identity,
1194 field_id: Some(1012),
1195 })
1196 .unwrap()
1197 .add_unbound_field(UnboundPartitionField {
1198 source_id: 2,
1199 name: "name_void".to_string(),
1200 transform: Transform::Void,
1201 field_id: None,
1202 })
1203 .unwrap()
1204 .add_unbound_field(UnboundPartitionField {
1206 source_id: 3,
1207 name: "year".to_string(),
1208 transform: Transform::Year,
1209 field_id: Some(1),
1210 })
1211 .unwrap()
1212 .build()
1213 .unwrap();
1214
1215 assert_eq!(1012, spec.fields[0].field_id);
1216 assert_eq!(1013, spec.fields[1].field_id);
1217 assert_eq!(1, spec.fields[2].field_id);
1218 }
1219
1220 #[test]
1221 fn test_builder_valid_schema() {
1222 let schema = Schema::builder()
1223 .with_fields(vec![
1224 NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int))
1225 .into(),
1226 NestedField::required(
1227 2,
1228 "name",
1229 Type::Primitive(crate::spec::PrimitiveType::String),
1230 )
1231 .into(),
1232 ])
1233 .build()
1234 .unwrap();
1235
1236 PartitionSpec::builder(schema.clone())
1237 .with_spec_id(1)
1238 .build()
1239 .unwrap();
1240
1241 let spec = PartitionSpec::builder(schema.clone())
1242 .with_spec_id(1)
1243 .add_partition_field("id", "id_bucket[16]", Transform::Bucket(16))
1244 .unwrap()
1245 .build()
1246 .unwrap();
1247
1248 assert_eq!(spec, PartitionSpec {
1249 spec_id: 1,
1250 fields: vec![PartitionField {
1251 source_id: 1,
1252 field_id: 1000,
1253 name: "id_bucket[16]".to_string(),
1254 transform: Transform::Bucket(16),
1255 }],
1256 });
1257 assert_eq!(
1258 spec.partition_type(&schema).unwrap(),
1259 StructType::new(vec![
1260 NestedField::optional(1000, "id_bucket[16]", Type::Primitive(PrimitiveType::Int))
1261 .into()
1262 ])
1263 )
1264 }
1265
1266 #[test]
1267 fn test_collision_with_schema_name() {
1268 let schema = Schema::builder()
1269 .with_fields(vec![
1270 NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int))
1271 .into(),
1272 ])
1273 .build()
1274 .unwrap();
1275
1276 PartitionSpec::builder(schema.clone())
1277 .with_spec_id(1)
1278 .build()
1279 .unwrap();
1280
1281 let err = PartitionSpec::builder(schema)
1282 .with_spec_id(1)
1283 .add_unbound_field(UnboundPartitionField {
1284 source_id: 1,
1285 field_id: None,
1286 name: "id".to_string(),
1287 transform: Transform::Bucket(16),
1288 })
1289 .unwrap_err();
1290 assert!(err.message().contains("conflicts with schema"))
1291 }
1292
1293 #[test]
1294 fn test_builder_collision_is_ok_for_identity_transforms() {
1295 let schema = Schema::builder()
1296 .with_fields(vec![
1297 NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int))
1298 .into(),
1299 NestedField::required(
1300 2,
1301 "number",
1302 Type::Primitive(crate::spec::PrimitiveType::Int),
1303 )
1304 .into(),
1305 ])
1306 .build()
1307 .unwrap();
1308
1309 PartitionSpec::builder(schema.clone())
1310 .with_spec_id(1)
1311 .build()
1312 .unwrap();
1313
1314 PartitionSpec::builder(schema.clone())
1315 .with_spec_id(1)
1316 .add_unbound_field(UnboundPartitionField {
1317 source_id: 1,
1318 field_id: None,
1319 name: "id".to_string(),
1320 transform: Transform::Identity,
1321 })
1322 .unwrap()
1323 .build()
1324 .unwrap();
1325
1326 PartitionSpec::builder(schema)
1328 .with_spec_id(1)
1329 .add_unbound_field(UnboundPartitionField {
1330 source_id: 2,
1331 field_id: None,
1332 name: "id".to_string(),
1333 transform: Transform::Identity,
1334 })
1335 .unwrap_err();
1336 }
1337
/// Adding partition fields fails when one of them references a source id
/// that is not a column of the schema.
#[test]
fn test_builder_all_source_ids_must_exist() {
    let columns = vec![
        NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int)).into(),
        NestedField::required(2, "name", Type::Primitive(crate::spec::PrimitiveType::String))
            .into(),
        NestedField::required(3, "ts", Type::Primitive(crate::spec::PrimitiveType::Timestamp))
            .into(),
    ];
    let schema = Schema::builder().with_fields(columns).build().unwrap();

    // Both cases request the same two fields; only the source of "name" varies.
    let requested_fields = |name_source_id| {
        vec![
            UnboundPartitionField {
                source_id: 1,
                field_id: None,
                name: String::from("id_bucket"),
                transform: Transform::Bucket(16),
            },
            UnboundPartitionField {
                source_id: name_source_id,
                field_id: None,
                name: String::from("name"),
                transform: Transform::Identity,
            },
        ]
    };

    // Source ids 1 and 2 are both present in the schema.
    PartitionSpec::builder(schema.clone())
        .with_spec_id(1)
        .add_unbound_fields(requested_fields(2))
        .unwrap()
        .build()
        .unwrap();

    // Source id 4 is not a schema column, so the fields are refused.
    PartitionSpec::builder(schema)
        .with_spec_id(1)
        .add_unbound_fields(requested_fields(4))
        .unwrap_err();
}
1400
/// Adding a second field with the same source id and transform under a
/// different name is reported as a redundant partition.
#[test]
fn test_builder_disallows_redundant() {
    let with_first_bucket = UnboundPartitionSpec::builder()
        .with_spec_id(1)
        .add_partition_field(1, String::from("id_bucket[16]"), Transform::Bucket(16))
        .unwrap();

    // Same source id and same transform, only the name differs.
    let second_attempt = with_first_bucket.add_partition_field(
        1,
        String::from("id_bucket_with_other_name"),
        Transform::Bucket(16),
    );

    let err = second_attempt.unwrap_err();
    assert!(err.message().contains("redundant partition"));
}
1415
/// A `Year` transform on an `Int` source column is refused by the builder.
#[test]
fn test_builder_incompatible_transforms_disallowed() {
    let columns = vec![
        NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int)).into(),
    ];
    let schema = Schema::builder().with_fields(columns).build().unwrap();

    let year_of_int = UnboundPartitionField {
        source_id: 1,
        field_id: None,
        name: String::from("id_year"),
        transform: Transform::Year,
    };

    let builder = PartitionSpec::builder(schema).with_spec_id(1);
    builder.add_unbound_field(year_of_int).unwrap_err();
}
1436
/// An unbound spec keeps `field_id: None` on its fields; building it does not assign ids.
#[test]
fn test_build_unbound_specs_without_partition_id() {
    // Produces the single bucket field used both as input and as expectation.
    let bucket_field = || UnboundPartitionField {
        source_id: 1,
        field_id: None,
        name: String::from("id_bucket[16]"),
        transform: Transform::Bucket(16),
    };

    let spec = UnboundPartitionSpec::builder()
        .with_spec_id(1)
        .add_partition_fields(vec![bucket_field()])
        .unwrap()
        .build();

    let expected = UnboundPartitionSpec {
        spec_id: Some(1),
        fields: vec![bucket_field()],
    };
    assert_eq!(spec, expected);
}
1460
/// Two specs built from identical inputs are compatible with each other.
#[test]
fn test_is_compatible_with() {
    let columns = vec![
        NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int)).into(),
        NestedField::required(2, "name", Type::Primitive(crate::spec::PrimitiveType::String))
            .into(),
    ];
    let schema = Schema::builder().with_fields(columns).build().unwrap();

    // Both sides are built the same way: one 16-way bucket on column 1.
    let bucketed_by_id = |schema: Schema| {
        PartitionSpec::builder(schema)
            .with_spec_id(1)
            .add_unbound_field(UnboundPartitionField {
                source_id: 1,
                field_id: None,
                name: String::from("id_bucket"),
                transform: Transform::Bucket(16),
            })
            .unwrap()
            .build()
            .unwrap()
    };

    let partition_spec_1 = bucketed_by_id(schema.clone());
    let partition_spec_2 = bucketed_by_id(schema);

    assert!(partition_spec_1.is_compatible_with(&partition_spec_2));
}
1503
/// Specs that differ only in the transform of a field are not compatible.
#[test]
fn test_not_compatible_with_transform_different() {
    let columns = vec![
        NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int)).into(),
    ];
    let schema = Schema::builder().with_fields(columns).build().unwrap();

    // Everything but the transform is held constant between the two specs.
    let spec_with_transform = |schema: Schema, transform: Transform| {
        PartitionSpec::builder(schema)
            .with_spec_id(1)
            .add_unbound_field(UnboundPartitionField {
                source_id: 1,
                field_id: None,
                name: String::from("id_bucket"),
                transform,
            })
            .unwrap()
            .build()
            .unwrap()
    };

    let partition_spec_1 = spec_with_transform(schema.clone(), Transform::Bucket(16));
    let partition_spec_2 = spec_with_transform(schema, Transform::Bucket(32));

    assert!(!partition_spec_1.is_compatible_with(&partition_spec_2));
}
1540
/// Specs that differ only in the source column of a field are not compatible.
#[test]
fn test_not_compatible_with_source_id_different() {
    let columns = vec![
        NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int)).into(),
        NestedField::required(2, "name", Type::Primitive(crate::spec::PrimitiveType::String))
            .into(),
    ];
    let schema = Schema::builder().with_fields(columns).build().unwrap();

    // Same name and transform on both sides; only the source id varies.
    let spec_sourced_from = |schema: Schema, source_id| {
        PartitionSpec::builder(schema)
            .with_spec_id(1)
            .add_unbound_field(UnboundPartitionField {
                source_id,
                field_id: None,
                name: String::from("id_bucket"),
                transform: Transform::Bucket(16),
            })
            .unwrap()
            .build()
            .unwrap()
    };

    let partition_spec_1 = spec_sourced_from(schema.clone(), 1);
    let partition_spec_2 = spec_sourced_from(schema, 2);

    assert!(!partition_spec_1.is_compatible_with(&partition_spec_2));
}
1583
/// Specs holding the same fields in a different order are not compatible.
#[test]
fn test_not_compatible_with_order_different() {
    let columns = vec![
        NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int)).into(),
        NestedField::required(2, "name", Type::Primitive(crate::spec::PrimitiveType::String))
            .into(),
    ];
    let schema = Schema::builder().with_fields(columns).build().unwrap();

    // Factories for the two fields, so each spec receives its own fresh values.
    let bucket_on_id = || UnboundPartitionField {
        source_id: 1,
        field_id: None,
        name: String::from("id_bucket"),
        transform: Transform::Bucket(16),
    };
    let identity_on_name = || UnboundPartitionField {
        source_id: 2,
        field_id: None,
        name: String::from("name"),
        transform: Transform::Identity,
    };

    // First spec: bucket field, then identity field.
    let partition_spec_1 = PartitionSpec::builder(schema.clone())
        .with_spec_id(1)
        .add_unbound_field(bucket_on_id())
        .unwrap()
        .add_unbound_field(identity_on_name())
        .unwrap()
        .build()
        .unwrap();

    // Second spec: the same two fields, added in the opposite order.
    let partition_spec_2 = PartitionSpec::builder(schema)
        .with_spec_id(1)
        .add_unbound_field(identity_on_name())
        .unwrap()
        .add_unbound_field(bucket_on_id())
        .unwrap()
        .build()
        .unwrap();

    assert!(!partition_spec_1.is_compatible_with(&partition_spec_2));
}
1640
/// A spec without partition fields reports no highest field id.
#[test]
fn test_highest_field_id_unpartitioned() {
    let empty_schema = Schema::builder().with_fields(vec![]).build().unwrap();

    let spec = PartitionSpec::builder(empty_schema)
        .with_spec_id(1)
        .build()
        .unwrap();

    let highest = spec.highest_field_id();
    assert!(highest.is_none());
}
1650
/// `highest_field_id` returns the largest field id, even when it is not the last field added.
#[test]
fn test_highest_field_id() {
    let columns = vec![
        NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int)).into(),
        NestedField::required(2, "name", Type::Primitive(crate::spec::PrimitiveType::String))
            .into(),
    ];
    let schema = Schema::builder().with_fields(columns).build().unwrap();

    // The larger id (1001) is deliberately added first.
    let id_field = UnboundPartitionField {
        source_id: 1,
        field_id: Some(1001),
        name: String::from("id"),
        transform: Transform::Identity,
    };
    let name_field = UnboundPartitionField {
        source_id: 2,
        field_id: Some(1000),
        name: String::from("name"),
        transform: Transform::Identity,
    };

    let spec = PartitionSpec::builder(schema)
        .with_spec_id(1)
        .add_unbound_field(id_field)
        .unwrap()
        .add_unbound_field(name_field)
        .unwrap()
        .build()
        .unwrap();

    assert_eq!(Some(1001), spec.highest_field_id());
}
1688
/// Field ids 1000 and 1001 are kept as given and count as sequential.
#[test]
fn test_has_sequential_ids() {
    let columns = vec![
        NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int)).into(),
        NestedField::required(2, "name", Type::Primitive(crate::spec::PrimitiveType::String))
            .into(),
    ];
    let schema = Schema::builder().with_fields(columns).build().unwrap();

    let id_field = UnboundPartitionField {
        source_id: 1,
        field_id: Some(1000),
        name: String::from("id"),
        transform: Transform::Identity,
    };
    let name_field = UnboundPartitionField {
        source_id: 2,
        field_id: Some(1001),
        name: String::from("name"),
        transform: Transform::Identity,
    };

    let spec = PartitionSpec::builder(schema)
        .with_spec_id(1)
        .add_unbound_field(id_field)
        .unwrap()
        .add_unbound_field(name_field)
        .unwrap()
        .build()
        .unwrap();

    // The explicitly requested ids must survive the build untouched.
    let built_fields = spec.fields();
    assert_eq!(1000, built_fields[0].field_id);
    assert_eq!(1001, built_fields[1].field_id);
    assert!(spec.has_sequential_ids());
}
1728
/// Consecutive field ids that begin at 999 are not considered sequential.
#[test]
fn test_sequential_ids_must_start_at_1000() {
    let columns = vec![
        NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int)).into(),
        NestedField::required(2, "name", Type::Primitive(crate::spec::PrimitiveType::String))
            .into(),
    ];
    let schema = Schema::builder().with_fields(columns).build().unwrap();

    let id_field = UnboundPartitionField {
        source_id: 1,
        field_id: Some(999),
        name: String::from("id"),
        transform: Transform::Identity,
    };
    let name_field = UnboundPartitionField {
        source_id: 2,
        field_id: Some(1000),
        name: String::from("name"),
        transform: Transform::Identity,
    };

    let spec = PartitionSpec::builder(schema)
        .with_spec_id(1)
        .add_unbound_field(id_field)
        .unwrap()
        .add_unbound_field(name_field)
        .unwrap()
        .build()
        .unwrap();

    // Ids are gap-free (999, 1000) yet the check still reports false.
    let built_fields = spec.fields();
    assert_eq!(999, built_fields[0].field_id);
    assert_eq!(1000, built_fields[1].field_id);
    assert!(!spec.has_sequential_ids());
}
1768
/// Field ids with a gap between them (1000, 1002) are not considered sequential.
#[test]
fn test_sequential_ids_must_have_no_gaps() {
    let columns = vec![
        NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int)).into(),
        NestedField::required(2, "name", Type::Primitive(crate::spec::PrimitiveType::String))
            .into(),
    ];
    let schema = Schema::builder().with_fields(columns).build().unwrap();

    let id_field = UnboundPartitionField {
        source_id: 1,
        field_id: Some(1000),
        name: String::from("id"),
        transform: Transform::Identity,
    };
    let name_field = UnboundPartitionField {
        source_id: 2,
        field_id: Some(1002),
        name: String::from("name"),
        transform: Transform::Identity,
    };

    let spec = PartitionSpec::builder(schema)
        .with_spec_id(1)
        .add_unbound_field(id_field)
        .unwrap()
        .add_unbound_field(name_field)
        .unwrap()
        .build()
        .unwrap();

    // Id 1001 is skipped, which is what makes the sequence non-sequential.
    let built_fields = spec.fields();
    assert_eq!(1000, built_fields[0].field_id);
    assert_eq!(1002, built_fields[1].field_id);
    assert!(!spec.has_sequential_ids());
}
1808
/// `partition_to_path` renders one `name=value` segment per partition field, joined by `/`.
#[test]
fn test_partition_to_path() {
    let columns = vec![
        NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
        NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
        NestedField::required(3, "timestamp", Type::Primitive(PrimitiveType::Timestamp)).into(),
        NestedField::required(4, "empty", Type::Primitive(PrimitiveType::String)).into(),
    ];
    let schema = Schema::builder().with_fields(columns).build().unwrap();

    // One partition field per column: two identities, an hour transform and a void transform.
    let builder = PartitionSpec::builder(schema.clone());
    let builder = builder
        .add_partition_field("id", "id", Transform::Identity)
        .unwrap();
    let builder = builder
        .add_partition_field("name", "name", Transform::Identity)
        .unwrap();
    let builder = builder
        .add_partition_field("timestamp", "ts_hour", Transform::Hour)
        .unwrap();
    let builder = builder
        .add_partition_field("empty", "empty_void", Transform::Void)
        .unwrap();
    let spec = builder.build().unwrap();

    // Partition values, in the same order as the partition fields above.
    let partition_values = [
        Some(Literal::int(42)),
        Some(Literal::string("alice")),
        Some(Literal::int(1000)),
        Some(Literal::string("empty")),
    ];
    let data = Struct::from_iter(partition_values);

    // The void-transformed field is rendered as `null` although a value was supplied.
    let rendered = spec.partition_to_path(&data, schema.into());
    assert_eq!(rendered, "id=42/name=alice/ts_hour=1000/empty_void=null");
}
1846}