1use std::sync::Arc;
22
23use itertools::Itertools;
24use serde::{Deserialize, Serialize};
25use typed_builder::TypedBuilder;
26
27use super::transform::Transform;
28use super::{NestedField, Schema, SchemaRef, StructType};
29use crate::spec::Struct;
30use crate::{Error, ErrorKind, Result};
31
/// Starting value for the "last assigned partition field id" of a fresh
/// [`PartitionSpecBuilder`]; automatically assigned field ids continue after it.
pub(crate) const UNPARTITIONED_LAST_ASSIGNED_ID: i32 = 999;
/// Spec id used when a spec is built without an explicit id.
pub(crate) const DEFAULT_PARTITION_SPEC_ID: i32 = 0;
34
/// A single field of a bound [`PartitionSpec`].
///
/// Serialized with kebab-case keys (`source-id`, `field-id`, ...).
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, TypedBuilder)]
#[serde(rename_all = "kebab-case")]
pub struct PartitionField {
    /// Id of the schema column this partition field is derived from.
    pub source_id: i32,
    /// Id of the partition field itself; it becomes the field id inside the
    /// partition struct type.
    pub field_id: i32,
    /// Name of the partition field.
    pub name: String,
    /// Transform applied to the source column.
    pub transform: Transform,
}
49
50impl PartitionField {
51 pub fn into_unbound(self) -> UnboundPartitionField {
53 self.into()
54 }
55}
56
/// Shared, reference-counted handle to a [`PartitionSpec`].
pub type PartitionSpecRef = Arc<PartitionSpec>;
/// A partition spec whose fields all carry a concrete field id.
///
/// Instances are created through [`PartitionSpec::builder`],
/// [`UnboundPartitionSpec::bind`] or deserialization.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
#[serde(rename_all = "kebab-case")]
pub struct PartitionSpec {
    /// Identifier of this spec.
    spec_id: i32,
    /// Partition fields, in declaration order.
    fields: Vec<PartitionField>,
}
73
74impl PartitionSpec {
75 pub fn builder(schema: impl Into<SchemaRef>) -> PartitionSpecBuilder {
77 PartitionSpecBuilder::new(schema)
78 }
79
80 pub fn fields(&self) -> &[PartitionField] {
82 &self.fields
83 }
84
85 pub fn spec_id(&self) -> i32 {
87 self.spec_id
88 }
89
90 pub fn unpartition_spec() -> Self {
92 Self {
93 spec_id: DEFAULT_PARTITION_SPEC_ID,
94 fields: vec![],
95 }
96 }
97
98 pub fn is_unpartitioned(&self) -> bool {
102 self.fields.is_empty() || self.fields.iter().all(|f| f.transform == Transform::Void)
103 }
104
105 pub fn partition_type(&self, schema: &Schema) -> Result<StructType> {
107 PartitionSpecBuilder::partition_type(&self.fields, schema)
108 }
109
110 pub fn into_unbound(self) -> UnboundPartitionSpec {
112 self.into()
113 }
114
115 pub fn with_spec_id(self, spec_id: i32) -> Self {
117 Self { spec_id, ..self }
118 }
119
120 pub fn has_sequential_ids(&self) -> bool {
124 has_sequential_ids(self.fields.iter().map(|f| f.field_id))
125 }
126
127 pub fn highest_field_id(&self) -> Option<i32> {
129 self.fields.iter().map(|f| f.field_id).max()
130 }
131
132 pub fn is_compatible_with(&self, other: &PartitionSpec) -> bool {
142 if self.fields.len() != other.fields.len() {
143 return false;
144 }
145
146 for (this_field, other_field) in self.fields.iter().zip(other.fields.iter()) {
147 if this_field.source_id != other_field.source_id
148 || this_field.name != other_field.name
149 || this_field.transform != other_field.transform
150 {
151 return false;
152 }
153 }
154
155 true
156 }
157
158 pub fn partition_to_path(&self, data: &Struct, schema: SchemaRef) -> String {
161 let partition_type = self.partition_type(&schema).unwrap();
162 let field_types = partition_type.fields();
163
164 self.fields
165 .iter()
166 .enumerate()
167 .map(|(i, field)| {
168 let value = data[i].as_ref();
169 format!(
170 "{}={}",
171 field.name,
172 field
173 .transform
174 .to_human_string(&field_types[i].field_type, value)
175 )
176 })
177 .join("/")
178 }
179}
180
/// Bundles partition values with the spec and schema needed to interpret them.
#[derive(Clone, Debug)]
pub struct PartitionKey {
    /// Partition spec the values belong to.
    spec: PartitionSpec,
    /// Schema used to derive the partition type of `spec`.
    schema: SchemaRef,
    /// Partition values, indexed by field position in `spec`.
    data: Struct,
}
192
193impl PartitionKey {
194 pub fn new(spec: PartitionSpec, schema: SchemaRef, data: Struct) -> Self {
196 Self { spec, schema, data }
197 }
198
199 pub fn copy_with_data(&self, data: Struct) -> Self {
201 Self {
202 spec: self.spec.clone(),
203 schema: self.schema.clone(),
204 data,
205 }
206 }
207
208 pub fn to_path(&self) -> String {
210 self.spec.partition_to_path(&self.data, self.schema.clone())
211 }
212
213 pub fn is_effectively_none(partition_key: Option<&PartitionKey>) -> bool {
216 match partition_key {
217 None => true,
218 Some(pk) => pk.spec.is_unpartitioned(),
219 }
220 }
221
222 pub fn spec(&self) -> &PartitionSpec {
224 &self.spec
225 }
226
227 pub fn schema(&self) -> &SchemaRef {
229 &self.schema
230 }
231
232 pub fn data(&self) -> &Struct {
234 &self.data
235 }
236}
237
/// Shared, reference-counted handle to an [`UnboundPartitionSpec`].
pub type UnboundPartitionSpecRef = Arc<UnboundPartitionSpec>;
/// A partition field whose field id may not have been assigned yet.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, TypedBuilder)]
#[serde(rename_all = "kebab-case")]
pub struct UnboundPartitionField {
    /// Id of the schema column this partition field is derived from.
    pub source_id: i32,
    /// Optional id of the partition field. Defaults to `None` in the typed
    /// builder and is left out of the serialized form when `None`.
    #[builder(default, setter(strip_option(fallback = field_id_opt)))]
    #[serde(skip_serializing_if = "Option::is_none")]
    pub field_id: Option<i32>,
    /// Name of the partition field.
    pub name: String,
    /// Transform applied to the source column.
    pub transform: Transform,
}
256
/// A partition spec that is not tied to a schema yet; its spec id and field
/// ids are optional. Use [`UnboundPartitionSpec::bind`] to obtain a
/// [`PartitionSpec`].
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, Default)]
#[serde(rename_all = "kebab-case")]
pub struct UnboundPartitionSpec {
    /// Optional spec id; left out of the serialized form when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub(crate) spec_id: Option<i32>,
    /// Partition fields, in declaration order.
    pub(crate) fields: Vec<UnboundPartitionField>,
}
269
270impl UnboundPartitionSpec {
271 pub fn builder() -> UnboundPartitionSpecBuilder {
273 UnboundPartitionSpecBuilder::default()
274 }
275
276 pub fn bind(self, schema: impl Into<SchemaRef>) -> Result<PartitionSpec> {
278 PartitionSpecBuilder::new_from_unbound(self, schema)?.build()
279 }
280
281 pub fn spec_id(&self) -> Option<i32> {
283 self.spec_id
284 }
285
286 pub fn fields(&self) -> &[UnboundPartitionField] {
288 &self.fields
289 }
290
291 pub fn with_spec_id(self, spec_id: i32) -> Self {
293 Self {
294 spec_id: Some(spec_id),
295 ..self
296 }
297 }
298}
299
300fn has_sequential_ids(field_ids: impl Iterator<Item = i32>) -> bool {
301 for (index, field_id) in field_ids.enumerate() {
302 let expected_id = (UNPARTITIONED_LAST_ASSIGNED_ID as i64)
303 .checked_add(1)
304 .and_then(|id| id.checked_add(index as i64))
305 .unwrap_or(i64::MAX);
306
307 if field_id as i64 != expected_id {
308 return false;
309 }
310 }
311
312 true
313}
314
315impl From<PartitionField> for UnboundPartitionField {
316 fn from(field: PartitionField) -> Self {
317 UnboundPartitionField {
318 source_id: field.source_id,
319 field_id: Some(field.field_id),
320 name: field.name,
321 transform: field.transform,
322 }
323 }
324}
325
326impl From<PartitionSpec> for UnboundPartitionSpec {
327 fn from(spec: PartitionSpec) -> Self {
328 UnboundPartitionSpec {
329 spec_id: Some(spec.spec_id),
330 fields: spec.fields.into_iter().map(Into::into).collect(),
331 }
332 }
333}
334
/// Builder for [`UnboundPartitionSpec`]. Fields are validated as they are
/// added, without reference to a schema.
#[derive(Debug, Default)]
pub struct UnboundPartitionSpecBuilder {
    /// Spec id to use, if any.
    spec_id: Option<i32>,
    /// Fields accepted so far, in insertion order.
    fields: Vec<UnboundPartitionField>,
}
341
342impl UnboundPartitionSpecBuilder {
343 pub fn new() -> Self {
345 Self {
346 spec_id: None,
347 fields: vec![],
348 }
349 }
350
351 pub fn with_spec_id(mut self, spec_id: i32) -> Self {
353 self.spec_id = Some(spec_id);
354 self
355 }
356
357 pub fn add_partition_field(
359 self,
360 source_id: i32,
361 target_name: impl ToString,
362 transformation: Transform,
363 ) -> Result<Self> {
364 let field = UnboundPartitionField {
365 source_id,
366 field_id: None,
367 name: target_name.to_string(),
368 transform: transformation,
369 };
370 self.add_partition_field_internal(field)
371 }
372
373 pub fn add_partition_fields(
375 self,
376 fields: impl IntoIterator<Item = UnboundPartitionField>,
377 ) -> Result<Self> {
378 let mut builder = self;
379 for field in fields {
380 builder = builder.add_partition_field_internal(field)?;
381 }
382 Ok(builder)
383 }
384
385 fn add_partition_field_internal(mut self, field: UnboundPartitionField) -> Result<Self> {
386 self.check_name_set_and_unique(&field.name)?;
387 self.check_for_redundant_partitions(field.source_id, &field.transform)?;
388 if let Some(partition_field_id) = field.field_id {
389 self.check_partition_id_unique(partition_field_id)?;
390 }
391 self.fields.push(field);
392 Ok(self)
393 }
394
395 pub fn build(self) -> UnboundPartitionSpec {
397 UnboundPartitionSpec {
398 spec_id: self.spec_id,
399 fields: self.fields,
400 }
401 }
402}
403
/// Builder for [`PartitionSpec`]. Fields are validated against a schema as
/// they are added; missing field ids are assigned in `build`.
#[derive(Debug)]
pub struct PartitionSpecBuilder {
    /// Spec id to use; the default spec id is used when `None`.
    spec_id: Option<i32>,
    /// Highest field id considered taken before ids are assigned.
    last_assigned_field_id: i32,
    /// Fields accepted so far, in insertion order.
    fields: Vec<UnboundPartitionField>,
    /// Schema the fields are validated against.
    schema: SchemaRef,
}
412
413impl PartitionSpecBuilder {
414 pub fn new(schema: impl Into<SchemaRef>) -> Self {
416 Self {
417 spec_id: None,
418 fields: vec![],
419 last_assigned_field_id: UNPARTITIONED_LAST_ASSIGNED_ID,
420 schema: schema.into(),
421 }
422 }
423
424 pub fn new_from_unbound(
426 unbound: UnboundPartitionSpec,
427 schema: impl Into<SchemaRef>,
428 ) -> Result<Self> {
429 let mut builder =
430 Self::new(schema).with_spec_id(unbound.spec_id.unwrap_or(DEFAULT_PARTITION_SPEC_ID));
431
432 for field in unbound.fields {
433 builder = builder.add_unbound_field(field)?;
434 }
435 Ok(builder)
436 }
437
438 pub fn with_last_assigned_field_id(mut self, last_assigned_field_id: i32) -> Self {
444 self.last_assigned_field_id = last_assigned_field_id;
445 self
446 }
447
448 pub fn with_spec_id(mut self, spec_id: i32) -> Self {
450 self.spec_id = Some(spec_id);
451 self
452 }
453
454 pub fn add_partition_field(
456 self,
457 source_name: impl AsRef<str>,
458 target_name: impl Into<String>,
459 transform: Transform,
460 ) -> Result<Self> {
461 let source_id = self
462 .schema
463 .field_by_name(source_name.as_ref())
464 .ok_or_else(|| {
465 Error::new(
466 ErrorKind::DataInvalid,
467 format!(
468 "Cannot find source column with name: {} in schema",
469 source_name.as_ref()
470 ),
471 )
472 })?
473 .id;
474 let field = UnboundPartitionField {
475 source_id,
476 field_id: None,
477 name: target_name.into(),
478 transform,
479 };
480
481 self.add_unbound_field(field)
482 }
483
484 pub fn add_unbound_field(mut self, field: UnboundPartitionField) -> Result<Self> {
489 self.check_name_set_and_unique(&field.name)?;
490 self.check_for_redundant_partitions(field.source_id, &field.transform)?;
491 Self::check_name_does_not_collide_with_schema(&field, &self.schema)?;
492 Self::check_transform_compatibility(&field, &self.schema)?;
493 if let Some(partition_field_id) = field.field_id {
494 self.check_partition_id_unique(partition_field_id)?;
495 }
496
497 self.fields.push(field);
499 Ok(self)
500 }
501
502 pub fn add_unbound_fields(
504 self,
505 fields: impl IntoIterator<Item = UnboundPartitionField>,
506 ) -> Result<Self> {
507 let mut builder = self;
508 for field in fields {
509 builder = builder.add_unbound_field(field)?;
510 }
511 Ok(builder)
512 }
513
514 pub fn build(self) -> Result<PartitionSpec> {
516 let fields = Self::set_field_ids(self.fields, self.last_assigned_field_id)?;
517 Ok(PartitionSpec {
518 spec_id: self.spec_id.unwrap_or(DEFAULT_PARTITION_SPEC_ID),
519 fields,
520 })
521 }
522
523 fn set_field_ids(
524 fields: Vec<UnboundPartitionField>,
525 last_assigned_field_id: i32,
526 ) -> Result<Vec<PartitionField>> {
527 let mut last_assigned_field_id = last_assigned_field_id;
528 let assigned_ids = fields
531 .iter()
532 .filter_map(|f| f.field_id)
533 .collect::<std::collections::HashSet<_>>();
534
535 fn _check_add_1(prev: i32) -> Result<i32> {
536 prev.checked_add(1).ok_or_else(|| {
537 Error::new(
538 ErrorKind::DataInvalid,
539 "Cannot assign more partition ids. Overflow.",
540 )
541 })
542 }
543
544 let mut bound_fields = Vec::with_capacity(fields.len());
545 for field in fields.into_iter() {
546 let partition_field_id = if let Some(partition_field_id) = field.field_id {
547 last_assigned_field_id = std::cmp::max(last_assigned_field_id, partition_field_id);
548 partition_field_id
549 } else {
550 last_assigned_field_id = _check_add_1(last_assigned_field_id)?;
551 while assigned_ids.contains(&last_assigned_field_id) {
552 last_assigned_field_id = _check_add_1(last_assigned_field_id)?;
553 }
554 last_assigned_field_id
555 };
556
557 bound_fields.push(PartitionField {
558 source_id: field.source_id,
559 field_id: partition_field_id,
560 name: field.name,
561 transform: field.transform,
562 })
563 }
564
565 Ok(bound_fields)
566 }
567
568 fn partition_type(fields: &Vec<PartitionField>, schema: &Schema) -> Result<StructType> {
570 let mut struct_fields = Vec::with_capacity(fields.len());
571 for partition_field in fields {
572 let field = schema
573 .field_by_id(partition_field.source_id)
574 .ok_or_else(|| {
575 Error::new(
576 ErrorKind::Unexpected,
579 format!(
580 "No column with source column id {} in schema {:?}",
581 partition_field.source_id, schema
582 ),
583 )
584 })?;
585 let res_type = partition_field.transform.result_type(&field.field_type)?;
586 let field =
587 NestedField::optional(partition_field.field_id, &partition_field.name, res_type)
588 .into();
589 struct_fields.push(field);
590 }
591 Ok(StructType::new(struct_fields))
592 }
593
594 fn check_name_does_not_collide_with_schema(
599 field: &UnboundPartitionField,
600 schema: &Schema,
601 ) -> Result<()> {
602 match schema.field_by_name(field.name.as_str()) {
603 Some(schema_collision) => {
604 if field.transform == Transform::Identity {
605 if schema_collision.id == field.source_id {
606 Ok(())
607 } else {
608 Err(Error::new(
609 ErrorKind::DataInvalid,
610 format!(
611 "Cannot create identity partition sourced from different field in schema. Field name '{}' has id `{}` in schema but partition source id is `{}`",
612 field.name, schema_collision.id, field.source_id
613 ),
614 ))
615 }
616 } else {
617 Err(Error::new(
618 ErrorKind::DataInvalid,
619 format!(
620 "Cannot create partition with name: '{}' that conflicts with schema field and is not an identity transform.",
621 field.name
622 ),
623 ))
624 }
625 }
626 None => Ok(()),
627 }
628 }
629
630 fn check_transform_compatibility(field: &UnboundPartitionField, schema: &Schema) -> Result<()> {
633 let schema_field = schema.field_by_id(field.source_id).ok_or_else(|| {
634 Error::new(
635 ErrorKind::DataInvalid,
636 format!(
637 "Cannot find partition source field with id `{}` in schema",
638 field.source_id
639 ),
640 )
641 })?;
642
643 if field.transform != Transform::Void {
644 if !schema_field.field_type.is_primitive() {
645 return Err(Error::new(
646 ErrorKind::DataInvalid,
647 format!(
648 "Cannot partition by non-primitive source field: '{}'.",
649 schema_field.field_type
650 ),
651 ));
652 }
653
654 if field
655 .transform
656 .result_type(&schema_field.field_type)
657 .is_err()
658 {
659 return Err(Error::new(
660 ErrorKind::DataInvalid,
661 format!(
662 "Invalid source type: '{}' for transform: '{}'.",
663 schema_field.field_type,
664 field.transform.dedup_name()
665 ),
666 ));
667 }
668 }
669
670 Ok(())
671 }
672}
673
674trait CorePartitionSpecValidator {
676 fn check_name_set_and_unique(&self, name: &str) -> Result<()> {
678 if name.is_empty() {
679 return Err(Error::new(
680 ErrorKind::DataInvalid,
681 "Cannot use empty partition name",
682 ));
683 }
684
685 if self.fields().iter().any(|f| f.name == name) {
686 return Err(Error::new(
687 ErrorKind::DataInvalid,
688 format!("Cannot use partition name more than once: {name}"),
689 ));
690 }
691 Ok(())
692 }
693
694 fn check_for_redundant_partitions(&self, source_id: i32, transform: &Transform) -> Result<()> {
696 let collision = self.fields().iter().find(|f| {
697 f.source_id == source_id && f.transform.dedup_name() == transform.dedup_name()
698 });
699
700 if let Some(collision) = collision {
701 Err(Error::new(
702 ErrorKind::DataInvalid,
703 format!(
704 "Cannot add redundant partition with source id `{}` and transform `{}`. A partition with the same source id and transform already exists with name `{}`",
705 source_id,
706 transform.dedup_name(),
707 collision.name
708 ),
709 ))
710 } else {
711 Ok(())
712 }
713 }
714
715 fn check_partition_id_unique(&self, field_id: i32) -> Result<()> {
717 if self.fields().iter().any(|f| f.field_id == Some(field_id)) {
718 return Err(Error::new(
719 ErrorKind::DataInvalid,
720 format!("Cannot use field id more than once in one PartitionSpec: {field_id}"),
721 ));
722 }
723
724 Ok(())
725 }
726
727 fn fields(&self) -> &Vec<UnboundPartitionField>;
728}
729
impl CorePartitionSpecValidator for PartitionSpecBuilder {
    /// Exposes the builder's accepted fields to the shared validation rules.
    fn fields(&self) -> &Vec<UnboundPartitionField> {
        &self.fields
    }
}
735
impl CorePartitionSpecValidator for UnboundPartitionSpecBuilder {
    /// Exposes the builder's accepted fields to the shared validation rules.
    fn fields(&self) -> &Vec<UnboundPartitionField> {
        &self.fields
    }
}
741
742#[cfg(test)]
743mod tests {
744 use super::*;
745 use crate::spec::{Literal, PrimitiveType, Type};
746
747 #[test]
748 fn test_partition_spec() {
749 let spec = r#"
750 {
751 "spec-id": 1,
752 "fields": [ {
753 "source-id": 4,
754 "field-id": 1000,
755 "name": "ts_day",
756 "transform": "day"
757 }, {
758 "source-id": 1,
759 "field-id": 1001,
760 "name": "id_bucket",
761 "transform": "bucket[16]"
762 }, {
763 "source-id": 2,
764 "field-id": 1002,
765 "name": "id_truncate",
766 "transform": "truncate[4]"
767 } ]
768 }
769 "#;
770
771 let partition_spec: PartitionSpec = serde_json::from_str(spec).unwrap();
772 assert_eq!(4, partition_spec.fields[0].source_id);
773 assert_eq!(1000, partition_spec.fields[0].field_id);
774 assert_eq!("ts_day", partition_spec.fields[0].name);
775 assert_eq!(Transform::Day, partition_spec.fields[0].transform);
776
777 assert_eq!(1, partition_spec.fields[1].source_id);
778 assert_eq!(1001, partition_spec.fields[1].field_id);
779 assert_eq!("id_bucket", partition_spec.fields[1].name);
780 assert_eq!(Transform::Bucket(16), partition_spec.fields[1].transform);
781
782 assert_eq!(2, partition_spec.fields[2].source_id);
783 assert_eq!(1002, partition_spec.fields[2].field_id);
784 assert_eq!("id_truncate", partition_spec.fields[2].name);
785 assert_eq!(Transform::Truncate(4), partition_spec.fields[2].transform);
786 }
787
788 #[test]
789 fn test_is_unpartitioned() {
790 let schema = Schema::builder()
791 .with_fields(vec![
792 NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int))
793 .into(),
794 NestedField::required(
795 2,
796 "name",
797 Type::Primitive(crate::spec::PrimitiveType::String),
798 )
799 .into(),
800 ])
801 .build()
802 .unwrap();
803 let partition_spec = PartitionSpec::builder(schema.clone())
804 .with_spec_id(1)
805 .build()
806 .unwrap();
807 assert!(
808 partition_spec.is_unpartitioned(),
809 "Empty partition spec should be unpartitioned"
810 );
811
812 let partition_spec = PartitionSpec::builder(schema.clone())
813 .add_unbound_fields(vec![
814 UnboundPartitionField::builder()
815 .source_id(1)
816 .name("id".to_string())
817 .transform(Transform::Identity)
818 .build(),
819 UnboundPartitionField::builder()
820 .source_id(2)
821 .name("name_string".to_string())
822 .transform(Transform::Void)
823 .build(),
824 ])
825 .unwrap()
826 .with_spec_id(1)
827 .build()
828 .unwrap();
829 assert!(
830 !partition_spec.is_unpartitioned(),
831 "Partition spec with one non void transform should not be unpartitioned"
832 );
833
834 let partition_spec = PartitionSpec::builder(schema.clone())
835 .with_spec_id(1)
836 .add_unbound_fields(vec![
837 UnboundPartitionField::builder()
838 .source_id(1)
839 .name("id_void".to_string())
840 .transform(Transform::Void)
841 .build(),
842 UnboundPartitionField::builder()
843 .source_id(2)
844 .name("name_void".to_string())
845 .transform(Transform::Void)
846 .build(),
847 ])
848 .unwrap()
849 .build()
850 .unwrap();
851 assert!(
852 partition_spec.is_unpartitioned(),
853 "Partition spec with all void field should be unpartitioned"
854 );
855 }
856
857 #[test]
858 fn test_unbound_partition_spec() {
859 let spec = r#"
860 {
861 "spec-id": 1,
862 "fields": [ {
863 "source-id": 4,
864 "field-id": 1000,
865 "name": "ts_day",
866 "transform": "day"
867 }, {
868 "source-id": 1,
869 "field-id": 1001,
870 "name": "id_bucket",
871 "transform": "bucket[16]"
872 }, {
873 "source-id": 2,
874 "field-id": 1002,
875 "name": "id_truncate",
876 "transform": "truncate[4]"
877 } ]
878 }
879 "#;
880
881 let partition_spec: UnboundPartitionSpec = serde_json::from_str(spec).unwrap();
882 assert_eq!(Some(1), partition_spec.spec_id);
883
884 assert_eq!(4, partition_spec.fields[0].source_id);
885 assert_eq!(Some(1000), partition_spec.fields[0].field_id);
886 assert_eq!("ts_day", partition_spec.fields[0].name);
887 assert_eq!(Transform::Day, partition_spec.fields[0].transform);
888
889 assert_eq!(1, partition_spec.fields[1].source_id);
890 assert_eq!(Some(1001), partition_spec.fields[1].field_id);
891 assert_eq!("id_bucket", partition_spec.fields[1].name);
892 assert_eq!(Transform::Bucket(16), partition_spec.fields[1].transform);
893
894 assert_eq!(2, partition_spec.fields[2].source_id);
895 assert_eq!(Some(1002), partition_spec.fields[2].field_id);
896 assert_eq!("id_truncate", partition_spec.fields[2].name);
897 assert_eq!(Transform::Truncate(4), partition_spec.fields[2].transform);
898
899 let spec = r#"
900 {
901 "fields": [ {
902 "source-id": 4,
903 "name": "ts_day",
904 "transform": "day"
905 } ]
906 }
907 "#;
908 let partition_spec: UnboundPartitionSpec = serde_json::from_str(spec).unwrap();
909 assert_eq!(None, partition_spec.spec_id);
910
911 assert_eq!(4, partition_spec.fields[0].source_id);
912 assert_eq!(None, partition_spec.fields[0].field_id);
913 assert_eq!("ts_day", partition_spec.fields[0].name);
914 assert_eq!(Transform::Day, partition_spec.fields[0].transform);
915 }
916
917 #[test]
918 fn test_unbound_partition_spec_serialization_skips_none_fields() {
919 let spec = UnboundPartitionSpec::builder()
920 .add_partition_field(4, "ts_day".to_string(), Transform::Day)
921 .unwrap()
922 .build();
923
924 let value = serde_json::to_value(&spec).unwrap();
925 let object = value.as_object().unwrap();
926 assert!(!object.contains_key("spec-id"));
927 let field = object["fields"][0].as_object().unwrap();
928 assert!(!field.contains_key("field-id"));
929
930 let value = serde_json::to_value(spec.with_spec_id(1)).unwrap();
931 let object = value.as_object().unwrap();
932 assert_eq!(Some(&serde_json::json!(1)), object.get("spec-id"));
933
934 let spec: UnboundPartitionSpec = serde_json::from_str(
937 r#"{
938 "spec-id": null,
939 "fields": [
940 {"source-id": 4, "name": "ts_day", "transform": "day", "field-id": null}
941 ]
942 }"#,
943 )
944 .unwrap();
945 assert_eq!(None, spec.spec_id);
946 assert_eq!(None, spec.fields[0].field_id);
947 }
948
949 #[test]
950 fn test_new_unpartition() {
951 let schema = Schema::builder()
952 .with_fields(vec![
953 NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int))
954 .into(),
955 NestedField::required(
956 2,
957 "name",
958 Type::Primitive(crate::spec::PrimitiveType::String),
959 )
960 .into(),
961 ])
962 .build()
963 .unwrap();
964 let partition_spec = PartitionSpec::builder(schema.clone())
965 .with_spec_id(0)
966 .build()
967 .unwrap();
968 let partition_type = partition_spec.partition_type(&schema).unwrap();
969 assert_eq!(0, partition_type.fields().len());
970
971 let unpartition_spec = PartitionSpec::unpartition_spec();
972 assert_eq!(partition_spec, unpartition_spec);
973 }
974
975 #[test]
976 fn test_partition_type() {
977 let spec = r#"
978 {
979 "spec-id": 1,
980 "fields": [ {
981 "source-id": 4,
982 "field-id": 1000,
983 "name": "ts_day",
984 "transform": "day"
985 }, {
986 "source-id": 1,
987 "field-id": 1001,
988 "name": "id_bucket",
989 "transform": "bucket[16]"
990 }, {
991 "source-id": 2,
992 "field-id": 1002,
993 "name": "id_truncate",
994 "transform": "truncate[4]"
995 } ]
996 }
997 "#;
998
999 let partition_spec: PartitionSpec = serde_json::from_str(spec).unwrap();
1000 let schema = Schema::builder()
1001 .with_fields(vec![
1002 NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int))
1003 .into(),
1004 NestedField::required(
1005 2,
1006 "name",
1007 Type::Primitive(crate::spec::PrimitiveType::String),
1008 )
1009 .into(),
1010 NestedField::required(
1011 3,
1012 "ts",
1013 Type::Primitive(crate::spec::PrimitiveType::Timestamp),
1014 )
1015 .into(),
1016 NestedField::required(
1017 4,
1018 "ts_day",
1019 Type::Primitive(crate::spec::PrimitiveType::Timestamp),
1020 )
1021 .into(),
1022 NestedField::required(
1023 5,
1024 "id_bucket",
1025 Type::Primitive(crate::spec::PrimitiveType::Int),
1026 )
1027 .into(),
1028 NestedField::required(
1029 6,
1030 "id_truncate",
1031 Type::Primitive(crate::spec::PrimitiveType::Int),
1032 )
1033 .into(),
1034 ])
1035 .build()
1036 .unwrap();
1037
1038 let partition_type = partition_spec.partition_type(&schema).unwrap();
1039 assert_eq!(3, partition_type.fields().len());
1040 assert_eq!(
1041 *partition_type.fields()[0],
1042 NestedField::optional(
1043 partition_spec.fields[0].field_id,
1044 &partition_spec.fields[0].name,
1045 Type::Primitive(crate::spec::PrimitiveType::Date)
1046 )
1047 );
1048 assert_eq!(
1049 *partition_type.fields()[1],
1050 NestedField::optional(
1051 partition_spec.fields[1].field_id,
1052 &partition_spec.fields[1].name,
1053 Type::Primitive(crate::spec::PrimitiveType::Int)
1054 )
1055 );
1056 assert_eq!(
1057 *partition_type.fields()[2],
1058 NestedField::optional(
1059 partition_spec.fields[2].field_id,
1060 &partition_spec.fields[2].name,
1061 Type::Primitive(crate::spec::PrimitiveType::String)
1062 )
1063 );
1064 }
1065
1066 #[test]
1067 fn test_partition_empty() {
1068 let spec = r#"
1069 {
1070 "spec-id": 1,
1071 "fields": []
1072 }
1073 "#;
1074
1075 let partition_spec: PartitionSpec = serde_json::from_str(spec).unwrap();
1076 let schema = Schema::builder()
1077 .with_fields(vec![
1078 NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int))
1079 .into(),
1080 NestedField::required(
1081 2,
1082 "name",
1083 Type::Primitive(crate::spec::PrimitiveType::String),
1084 )
1085 .into(),
1086 NestedField::required(
1087 3,
1088 "ts",
1089 Type::Primitive(crate::spec::PrimitiveType::Timestamp),
1090 )
1091 .into(),
1092 NestedField::required(
1093 4,
1094 "ts_day",
1095 Type::Primitive(crate::spec::PrimitiveType::Timestamp),
1096 )
1097 .into(),
1098 NestedField::required(
1099 5,
1100 "id_bucket",
1101 Type::Primitive(crate::spec::PrimitiveType::Int),
1102 )
1103 .into(),
1104 NestedField::required(
1105 6,
1106 "id_truncate",
1107 Type::Primitive(crate::spec::PrimitiveType::Int),
1108 )
1109 .into(),
1110 ])
1111 .build()
1112 .unwrap();
1113
1114 let partition_type = partition_spec.partition_type(&schema).unwrap();
1115 assert_eq!(0, partition_type.fields().len());
1116 }
1117
1118 #[test]
1119 fn test_partition_error() {
1120 let spec = r#"
1121 {
1122 "spec-id": 1,
1123 "fields": [ {
1124 "source-id": 4,
1125 "field-id": 1000,
1126 "name": "ts_day",
1127 "transform": "day"
1128 }, {
1129 "source-id": 1,
1130 "field-id": 1001,
1131 "name": "id_bucket",
1132 "transform": "bucket[16]"
1133 }, {
1134 "source-id": 2,
1135 "field-id": 1002,
1136 "name": "id_truncate",
1137 "transform": "truncate[4]"
1138 } ]
1139 }
1140 "#;
1141
1142 let partition_spec: PartitionSpec = serde_json::from_str(spec).unwrap();
1143 let schema = Schema::builder()
1144 .with_fields(vec![
1145 NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int))
1146 .into(),
1147 NestedField::required(
1148 2,
1149 "name",
1150 Type::Primitive(crate::spec::PrimitiveType::String),
1151 )
1152 .into(),
1153 ])
1154 .build()
1155 .unwrap();
1156
1157 assert!(partition_spec.partition_type(&schema).is_err());
1158 }
1159
1160 #[test]
1161 fn test_builder_disallow_duplicate_names() {
1162 UnboundPartitionSpec::builder()
1163 .add_partition_field(1, "ts_day".to_string(), Transform::Day)
1164 .unwrap()
1165 .add_partition_field(2, "ts_day".to_string(), Transform::Day)
1166 .unwrap_err();
1167 }
1168
1169 #[test]
1170 fn test_builder_disallow_duplicate_field_ids() {
1171 let schema = Schema::builder()
1172 .with_fields(vec![
1173 NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int))
1174 .into(),
1175 NestedField::required(
1176 2,
1177 "name",
1178 Type::Primitive(crate::spec::PrimitiveType::String),
1179 )
1180 .into(),
1181 ])
1182 .build()
1183 .unwrap();
1184 PartitionSpec::builder(schema.clone())
1185 .add_unbound_field(UnboundPartitionField {
1186 source_id: 1,
1187 field_id: Some(1000),
1188 name: "id".to_string(),
1189 transform: Transform::Identity,
1190 })
1191 .unwrap()
1192 .add_unbound_field(UnboundPartitionField {
1193 source_id: 2,
1194 field_id: Some(1000),
1195 name: "id_bucket".to_string(),
1196 transform: Transform::Bucket(16),
1197 })
1198 .unwrap_err();
1199 }
1200
1201 #[test]
1202 fn test_builder_auto_assign_field_ids() {
1203 let schema = Schema::builder()
1204 .with_fields(vec![
1205 NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int))
1206 .into(),
1207 NestedField::required(
1208 2,
1209 "name",
1210 Type::Primitive(crate::spec::PrimitiveType::String),
1211 )
1212 .into(),
1213 NestedField::required(
1214 3,
1215 "ts",
1216 Type::Primitive(crate::spec::PrimitiveType::Timestamp),
1217 )
1218 .into(),
1219 ])
1220 .build()
1221 .unwrap();
1222 let spec = PartitionSpec::builder(schema.clone())
1223 .with_spec_id(1)
1224 .add_unbound_field(UnboundPartitionField {
1225 source_id: 1,
1226 name: "id".to_string(),
1227 transform: Transform::Identity,
1228 field_id: Some(1012),
1229 })
1230 .unwrap()
1231 .add_unbound_field(UnboundPartitionField {
1232 source_id: 2,
1233 name: "name_void".to_string(),
1234 transform: Transform::Void,
1235 field_id: None,
1236 })
1237 .unwrap()
1238 .add_unbound_field(UnboundPartitionField {
1240 source_id: 3,
1241 name: "year".to_string(),
1242 transform: Transform::Year,
1243 field_id: Some(1),
1244 })
1245 .unwrap()
1246 .build()
1247 .unwrap();
1248
1249 assert_eq!(1012, spec.fields[0].field_id);
1250 assert_eq!(1013, spec.fields[1].field_id);
1251 assert_eq!(1, spec.fields[2].field_id);
1252 }
1253
1254 #[test]
1255 fn test_builder_valid_schema() {
1256 let schema = Schema::builder()
1257 .with_fields(vec![
1258 NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int))
1259 .into(),
1260 NestedField::required(
1261 2,
1262 "name",
1263 Type::Primitive(crate::spec::PrimitiveType::String),
1264 )
1265 .into(),
1266 ])
1267 .build()
1268 .unwrap();
1269
1270 PartitionSpec::builder(schema.clone())
1271 .with_spec_id(1)
1272 .build()
1273 .unwrap();
1274
1275 let spec = PartitionSpec::builder(schema.clone())
1276 .with_spec_id(1)
1277 .add_partition_field("id", "id_bucket[16]", Transform::Bucket(16))
1278 .unwrap()
1279 .build()
1280 .unwrap();
1281
1282 assert_eq!(spec, PartitionSpec {
1283 spec_id: 1,
1284 fields: vec![PartitionField {
1285 source_id: 1,
1286 field_id: 1000,
1287 name: "id_bucket[16]".to_string(),
1288 transform: Transform::Bucket(16),
1289 }],
1290 });
1291 assert_eq!(
1292 spec.partition_type(&schema).unwrap(),
1293 StructType::new(vec![
1294 NestedField::optional(1000, "id_bucket[16]", Type::Primitive(PrimitiveType::Int))
1295 .into()
1296 ])
1297 )
1298 }
1299
1300 #[test]
1301 fn test_collision_with_schema_name() {
1302 let schema = Schema::builder()
1303 .with_fields(vec![
1304 NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int))
1305 .into(),
1306 ])
1307 .build()
1308 .unwrap();
1309
1310 PartitionSpec::builder(schema.clone())
1311 .with_spec_id(1)
1312 .build()
1313 .unwrap();
1314
1315 let err = PartitionSpec::builder(schema)
1316 .with_spec_id(1)
1317 .add_unbound_field(UnboundPartitionField {
1318 source_id: 1,
1319 field_id: None,
1320 name: "id".to_string(),
1321 transform: Transform::Bucket(16),
1322 })
1323 .unwrap_err();
1324 assert!(err.message().contains("conflicts with schema"))
1325 }
1326
/// An identity partition field may reuse a schema column's name when that
/// column is its own source, but not when it is sourced from another column.
#[test]
fn test_builder_collision_is_ok_for_identity_transforms() {
    let int_type = || Type::Primitive(crate::spec::PrimitiveType::Int);
    let columns = vec![
        NestedField::required(1, "id", int_type()).into(),
        NestedField::required(2, "number", int_type()).into(),
    ];
    let schema = Schema::builder().with_fields(columns).build().unwrap();

    // Every candidate is an identity field named `id`; only the source column varies.
    let identity_named_id = |source_id| UnboundPartitionField {
        source_id,
        field_id: None,
        name: "id".to_owned(),
        transform: Transform::Identity,
    };

    // Baseline: a spec without any partition field builds successfully.
    let without_fields = PartitionSpec::builder(schema.clone())
        .with_spec_id(1)
        .build();
    without_fields.unwrap();

    // Column 1 is `id`, so the name matches the field's own source column.
    let same_source = PartitionSpec::builder(schema.clone())
        .with_spec_id(1)
        .add_unbound_field(identity_named_id(1));
    same_source.unwrap().build().unwrap();

    // Column 2 is `number`, so the name `id` belongs to a different column.
    let other_source = PartitionSpec::builder(schema)
        .with_spec_id(1)
        .add_unbound_field(identity_named_id(2));
    assert!(other_source.is_err());
}
1371
/// Adding partition fields succeeds when every source id is a schema column
/// and fails when one of them is not.
#[test]
fn test_builder_all_source_ids_must_exist() {
    let columns = vec![
        NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int)).into(),
        NestedField::required(2, "name", Type::Primitive(crate::spec::PrimitiveType::String))
            .into(),
        NestedField::required(3, "ts", Type::Primitive(crate::spec::PrimitiveType::Timestamp))
            .into(),
    ];
    let schema = Schema::builder().with_fields(columns).build().unwrap();

    // Both attempts bucket column 1; they differ only in the identity field's source.
    let fields_with_identity_source = |identity_source| {
        vec![
            UnboundPartitionField {
                source_id: 1,
                field_id: None,
                name: String::from("id_bucket"),
                transform: Transform::Bucket(16),
            },
            UnboundPartitionField {
                source_id: identity_source,
                field_id: None,
                name: String::from("name"),
                transform: Transform::Identity,
            },
        ]
    };

    // Source id 2 is the `name` column, so the spec builds.
    let existing_sources = PartitionSpec::builder(schema.clone())
        .with_spec_id(1)
        .add_unbound_fields(fields_with_identity_source(2));
    existing_sources.unwrap().build().unwrap();

    // The schema declares ids 1 to 3 only, so source id 4 must be rejected.
    let missing_source = PartitionSpec::builder(schema)
        .with_spec_id(1)
        .add_unbound_fields(fields_with_identity_source(4));
    assert!(missing_source.is_err());
}
1434
/// Applying the same transform to the same source column twice, even under a
/// different partition name, is reported as a redundant partition.
#[test]
fn test_builder_disallows_redundant() {
    let with_first_bucket = UnboundPartitionSpec::builder()
        .with_spec_id(1)
        .add_partition_field(1, "id_bucket[16]".to_string(), Transform::Bucket(16))
        .unwrap();

    // Same source id and same `Bucket(16)` transform; only the name is new.
    let duplicate_name = "id_bucket_with_other_name".to_string();
    let err = with_first_bucket
        .add_partition_field(1, duplicate_name, Transform::Bucket(16))
        .unwrap_err();

    assert!(err.message().contains("redundant partition"));
}
1449
/// Adding a `Year` transform over an `Int` source column is expected to fail.
#[test]
fn test_builder_incompatible_transforms_disallowed() {
    let int_type = Type::Primitive(crate::spec::PrimitiveType::Int);
    let schema = Schema::builder()
        .with_fields(vec![NestedField::required(1, "id", int_type).into()])
        .build()
        .unwrap();

    let year_of_int = UnboundPartitionField {
        name: String::from("id_year"),
        transform: Transform::Year,
        source_id: 1,
        field_id: None,
    };

    let attempt = PartitionSpec::builder(schema)
        .with_spec_id(1)
        .add_unbound_field(year_of_int);
    assert!(attempt.is_err());
}
1470
/// An unbound spec keeps a field without a partition field id exactly as given.
#[test]
fn test_build_unbound_specs_without_partition_id() {
    // Factory for the single field under test; note that `field_id` is left unset.
    let bucket_field = || UnboundPartitionField {
        source_id: 1,
        field_id: None,
        name: String::from("id_bucket[16]"),
        transform: Transform::Bucket(16),
    };

    let spec = UnboundPartitionSpec::builder()
        .with_spec_id(1)
        .add_partition_fields(vec![bucket_field()])
        .unwrap()
        .build();

    let expected = UnboundPartitionSpec {
        spec_id: Some(1),
        fields: vec![bucket_field()],
    };
    assert_eq!(spec, expected);
}
1494
/// Two specs built from the same schema with the same single field are compatible.
#[test]
fn test_is_compatible_with() {
    let columns = vec![
        NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int)).into(),
        NestedField::required(2, "name", Type::Primitive(crate::spec::PrimitiveType::String))
            .into(),
    ];
    let schema = Schema::builder().with_fields(columns).build().unwrap();

    // Both specs are produced by the very same recipe.
    let build_bucket_spec = |table_schema: Schema| {
        let bucket_on_id = UnboundPartitionField {
            source_id: 1,
            field_id: None,
            name: String::from("id_bucket"),
            transform: Transform::Bucket(16),
        };
        PartitionSpec::builder(table_schema)
            .with_spec_id(1)
            .add_unbound_field(bucket_on_id)
            .unwrap()
            .build()
            .unwrap()
    };

    let partition_spec_1 = build_bucket_spec(schema.clone());
    let partition_spec_2 = build_bucket_spec(schema);

    assert!(partition_spec_1.is_compatible_with(&partition_spec_2));
}
1537
/// Specs whose only difference is the transform of a field are not compatible.
#[test]
fn test_not_compatible_with_transform_different() {
    let int_type = Type::Primitive(crate::spec::PrimitiveType::Int);
    let schema = Schema::builder()
        .with_fields(vec![NestedField::required(1, "id", int_type).into()])
        .build()
        .unwrap();

    // Same source column and partition name every time; the transform is the variable.
    let spec_with_transform = |transform: Transform| {
        let field = UnboundPartitionField {
            source_id: 1,
            field_id: None,
            name: String::from("id_bucket"),
            transform,
        };
        PartitionSpec::builder(schema.clone())
            .with_spec_id(1)
            .add_unbound_field(field)
            .unwrap()
            .build()
            .unwrap()
    };

    let partition_spec_1 = spec_with_transform(Transform::Bucket(16));
    let partition_spec_2 = spec_with_transform(Transform::Bucket(32));

    assert!(!partition_spec_1.is_compatible_with(&partition_spec_2));
}
1574
/// Specs whose only difference is the source column of a field are not compatible.
#[test]
fn test_not_compatible_with_source_id_different() {
    let columns = vec![
        NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int)).into(),
        NestedField::required(2, "name", Type::Primitive(crate::spec::PrimitiveType::String))
            .into(),
    ];
    let schema = Schema::builder().with_fields(columns).build().unwrap();

    // Same partition name and transform every time; the source column is the variable.
    let spec_sourced_from = |source_id| {
        let field = UnboundPartitionField {
            source_id,
            field_id: None,
            name: String::from("id_bucket"),
            transform: Transform::Bucket(16),
        };
        PartitionSpec::builder(schema.clone())
            .with_spec_id(1)
            .add_unbound_field(field)
            .unwrap()
            .build()
            .unwrap()
    };

    let partition_spec_1 = spec_sourced_from(1);
    let partition_spec_2 = spec_sourced_from(2);

    assert!(!partition_spec_1.is_compatible_with(&partition_spec_2));
}
1617
/// Specs holding the same two fields in opposite order are not compatible.
#[test]
fn test_not_compatible_with_order_different() {
    let columns = vec![
        NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int)).into(),
        NestedField::required(2, "name", Type::Primitive(crate::spec::PrimitiveType::String))
            .into(),
    ];
    let schema = Schema::builder().with_fields(columns).build().unwrap();

    // Factories for the two fields shared by both specs.
    let id_bucket = || UnboundPartitionField {
        source_id: 1,
        field_id: None,
        name: String::from("id_bucket"),
        transform: Transform::Bucket(16),
    };
    let name_identity = || UnboundPartitionField {
        source_id: 2,
        field_id: None,
        name: String::from("name"),
        transform: Transform::Identity,
    };

    // First spec: bucket field, then identity field.
    let partition_spec_1 = PartitionSpec::builder(schema.clone())
        .with_spec_id(1)
        .add_unbound_field(id_bucket())
        .unwrap()
        .add_unbound_field(name_identity())
        .unwrap()
        .build()
        .unwrap();

    // Second spec: identity field, then bucket field.
    let partition_spec_2 = PartitionSpec::builder(schema)
        .with_spec_id(1)
        .add_unbound_field(name_identity())
        .unwrap()
        .add_unbound_field(id_bucket())
        .unwrap()
        .build()
        .unwrap();

    assert!(!partition_spec_1.is_compatible_with(&partition_spec_2));
}
1674
/// A spec built without any partition field reports no highest field id.
#[test]
fn test_highest_field_id_unpartitioned() {
    let empty_schema = Schema::builder().with_fields(vec![]).build().unwrap();

    let spec = PartitionSpec::builder(empty_schema)
        .with_spec_id(1)
        .build()
        .unwrap();

    assert_eq!(spec.highest_field_id(), None);
}
1684
/// The highest field id is the maximum over all fields, not the id of the last one added.
#[test]
fn test_highest_field_id() {
    let columns = vec![
        NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int)).into(),
        NestedField::required(2, "name", Type::Primitive(crate::spec::PrimitiveType::String))
            .into(),
    ];
    let schema = Schema::builder().with_fields(columns).build().unwrap();

    let identity_field = |source_id, field_id, name: &str| UnboundPartitionField {
        source_id,
        field_id: Some(field_id),
        name: name.to_string(),
        transform: Transform::Identity,
    };

    // The larger id (1001) is added first, the smaller one (1000) second.
    let spec = PartitionSpec::builder(schema)
        .with_spec_id(1)
        .add_unbound_field(identity_field(1, 1001, "id"))
        .unwrap()
        .add_unbound_field(identity_field(2, 1000, "name"))
        .unwrap()
        .build()
        .unwrap();

    assert_eq!(spec.highest_field_id(), Some(1001));
}
1722
/// Field ids 1000 and 1001, in that order, count as sequential.
#[test]
fn test_has_sequential_ids() {
    let columns = vec![
        NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int)).into(),
        NestedField::required(2, "name", Type::Primitive(crate::spec::PrimitiveType::String))
            .into(),
    ];
    let schema = Schema::builder().with_fields(columns).build().unwrap();

    let identity_field = |source_id, field_id, name: &str| UnboundPartitionField {
        source_id,
        field_id: Some(field_id),
        name: name.to_string(),
        transform: Transform::Identity,
    };

    let spec = PartitionSpec::builder(schema)
        .with_spec_id(1)
        .add_unbound_field(identity_field(1, 1000, "id"))
        .unwrap()
        .add_unbound_field(identity_field(2, 1001, "name"))
        .unwrap()
        .build()
        .unwrap();

    // The explicitly requested ids must be kept as given.
    let built_fields = spec.fields();
    assert_eq!(built_fields[0].field_id, 1000);
    assert_eq!(built_fields[1].field_id, 1001);
    assert!(spec.has_sequential_ids());
}
1762
/// Consecutive field ids that begin at 999 instead of 1000 do not count as sequential.
#[test]
fn test_sequential_ids_must_start_at_1000() {
    let columns = vec![
        NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int)).into(),
        NestedField::required(2, "name", Type::Primitive(crate::spec::PrimitiveType::String))
            .into(),
    ];
    let schema = Schema::builder().with_fields(columns).build().unwrap();

    let identity_field = |source_id, field_id, name: &str| UnboundPartitionField {
        source_id,
        field_id: Some(field_id),
        name: name.to_string(),
        transform: Transform::Identity,
    };

    let spec = PartitionSpec::builder(schema)
        .with_spec_id(1)
        .add_unbound_field(identity_field(1, 999, "id"))
        .unwrap()
        .add_unbound_field(identity_field(2, 1000, "name"))
        .unwrap()
        .build()
        .unwrap();

    // The explicitly requested ids must be kept as given.
    let built_fields = spec.fields();
    assert_eq!(built_fields[0].field_id, 999);
    assert_eq!(built_fields[1].field_id, 1000);
    assert!(!spec.has_sequential_ids());
}
1802
/// Field ids 1000 and 1002 leave a gap and therefore do not count as sequential.
#[test]
fn test_sequential_ids_must_have_no_gaps() {
    let columns = vec![
        NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int)).into(),
        NestedField::required(2, "name", Type::Primitive(crate::spec::PrimitiveType::String))
            .into(),
    ];
    let schema = Schema::builder().with_fields(columns).build().unwrap();

    let identity_field = |source_id, field_id, name: &str| UnboundPartitionField {
        source_id,
        field_id: Some(field_id),
        name: name.to_string(),
        transform: Transform::Identity,
    };

    let spec = PartitionSpec::builder(schema)
        .with_spec_id(1)
        .add_unbound_field(identity_field(1, 1000, "id"))
        .unwrap()
        .add_unbound_field(identity_field(2, 1002, "name"))
        .unwrap()
        .build()
        .unwrap();

    // The explicitly requested ids must be kept as given.
    let built_fields = spec.fields();
    assert_eq!(built_fields[0].field_id, 1000);
    assert_eq!(built_fields[1].field_id, 1002);
    assert!(!spec.has_sequential_ids());
}
1842
/// A partition tuple is rendered as `name=value` segments joined by `/`, in
/// partition-field order; the void-transformed field renders as `null`.
#[test]
fn test_partition_to_path() {
    let columns = vec![
        NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
        NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
        NestedField::required(3, "timestamp", Type::Primitive(PrimitiveType::Timestamp)).into(),
        NestedField::required(4, "empty", Type::Primitive(PrimitiveType::String)).into(),
    ];
    let schema = Schema::builder().with_fields(columns).build().unwrap();

    // (source column, partition field name, transform), listed in partition order.
    let partition_fields = [
        ("id", "id", Transform::Identity),
        ("name", "name", Transform::Identity),
        ("timestamp", "ts_hour", Transform::Hour),
        ("empty", "empty_void", Transform::Void),
    ];
    let mut builder = PartitionSpec::builder(schema.clone());
    for (source_name, target_name, transform) in partition_fields {
        builder = builder
            .add_partition_field(source_name, target_name, transform)
            .unwrap();
    }
    let spec = builder.build().unwrap();

    // One value per partition field, in the same order as the fields above.
    let data = Struct::from_iter([
        Some(Literal::int(42)),
        Some(Literal::string("alice")),
        Some(Literal::int(1000)),
        Some(Literal::string("empty")),
    ]);

    let expected = "id=42/name=alice/ts_hour=1000/empty_void=null";
    assert_eq!(spec.partition_to_path(&data, schema.into()), expected);
}
1880}