iceberg/spec/
partition.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18/*!
19 * Partitioning
20 */
21use std::sync::Arc;
22
23use itertools::Itertools;
24use serde::{Deserialize, Serialize};
25use typed_builder::TypedBuilder;
26
27use super::transform::Transform;
28use super::{NestedField, Schema, SchemaRef, StructType};
29use crate::spec::Struct;
30use crate::{Error, ErrorKind, Result};
31
/// Baseline for partition field id assignment.
///
/// `PartitionSpecBuilder::new` starts its `last_assigned_field_id` counter at this value, and
/// `has_sequential_ids` expects the first partition field id to be this value plus one.
pub(crate) const UNPARTITIONED_LAST_ASSIGNED_ID: i32 = 999;
/// Spec id used by `PartitionSpec::unpartition_spec` and as the fallback whenever a
/// partition spec is built without an explicit spec id.
pub(crate) const DEFAULT_PARTITION_SPEC_ID: i32 = 0;
34
/// Partition fields capture the transform from table data to partition values.
///
/// This is the bound form of a partition field: `field_id` is always present.
/// Use [`PartitionField::into_unbound`] to obtain the variant whose field id is optional.
/// Serialized with kebab-case keys (`source-id`, `field-id`, `name`, `transform`).
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, TypedBuilder)]
#[serde(rename_all = "kebab-case")]
pub struct PartitionField {
    /// A source column id from the table's schema.
    pub source_id: i32,
    /// A partition field id that is used to identify a partition field and is unique within a partition spec.
    /// In v2 table metadata, it is unique across all partition specs.
    pub field_id: i32,
    /// A partition name.
    pub name: String,
    /// A transform that is applied to the source column to produce a partition value.
    pub transform: Transform,
}
49
50impl PartitionField {
51    /// To unbound partition field
52    pub fn into_unbound(self) -> UnboundPartitionField {
53        self.into()
54    }
55}
56
/// Reference to [`PartitionSpec`].
pub type PartitionSpecRef = Arc<PartitionSpec>;
/// Partition spec that defines how to produce a tuple of partition values from a record.
///
/// A [`PartitionSpec`] is originally obtained by binding an [`UnboundPartitionSpec`] to a schema and is
/// only guaranteed to be valid for that schema. The main difference between [`PartitionSpec`] and
/// [`UnboundPartitionSpec`] is that the former has field ids assigned,
/// while field ids are optional for [`UnboundPartitionSpec`].
///
/// Both fields are private: read them through [`PartitionSpec::spec_id`] and
/// [`PartitionSpec::fields`]. Serialized with kebab-case keys (`spec-id`, `fields`).
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
#[serde(rename_all = "kebab-case")]
pub struct PartitionSpec {
    /// Identifier for PartitionSpec
    spec_id: i32,
    /// Details of the partition spec, in partition-tuple order.
    fields: Vec<PartitionField>,
}
73
74impl PartitionSpec {
75    /// Create a new partition spec builder with the given schema.
76    pub fn builder(schema: impl Into<SchemaRef>) -> PartitionSpecBuilder {
77        PartitionSpecBuilder::new(schema)
78    }
79
80    /// Fields of the partition spec
81    pub fn fields(&self) -> &[PartitionField] {
82        &self.fields
83    }
84
85    /// Spec id of the partition spec
86    pub fn spec_id(&self) -> i32 {
87        self.spec_id
88    }
89
90    /// Get a new unpartitioned partition spec
91    pub fn unpartition_spec() -> Self {
92        Self {
93            spec_id: DEFAULT_PARTITION_SPEC_ID,
94            fields: vec![],
95        }
96    }
97
98    /// Returns if the partition spec is unpartitioned.
99    ///
100    /// A [`PartitionSpec`] is unpartitioned if it has no fields or all fields are [`Transform::Void`] transform.
101    pub fn is_unpartitioned(&self) -> bool {
102        self.fields.is_empty() || self.fields.iter().all(|f| f.transform == Transform::Void)
103    }
104
105    /// Returns the partition type of this partition spec.
106    pub fn partition_type(&self, schema: &Schema) -> Result<StructType> {
107        PartitionSpecBuilder::partition_type(&self.fields, schema)
108    }
109
110    /// Convert to unbound partition spec
111    pub fn into_unbound(self) -> UnboundPartitionSpec {
112        self.into()
113    }
114
115    /// Change the spec id of the partition spec
116    pub fn with_spec_id(self, spec_id: i32) -> Self {
117        Self { spec_id, ..self }
118    }
119
120    /// Check if this partition spec has sequential partition ids.
121    /// Sequential ids start from 1000 and increment by 1 for each field.
122    /// This is required for spec version 1
123    pub fn has_sequential_ids(&self) -> bool {
124        has_sequential_ids(self.fields.iter().map(|f| f.field_id))
125    }
126
127    /// Get the highest field id in the partition spec.
128    pub fn highest_field_id(&self) -> Option<i32> {
129        self.fields.iter().map(|f| f.field_id).max()
130    }
131
132    /// Check if this partition spec is compatible with another partition spec.
133    ///
134    /// Returns true if the partition spec is equal to the other spec with partition field ids ignored and
135    /// spec_id ignored. The following must be identical:
136    /// * The number of fields
137    /// * Field order
138    /// * Field names
139    /// * Source column ids
140    /// * Transforms
141    pub fn is_compatible_with(&self, other: &PartitionSpec) -> bool {
142        if self.fields.len() != other.fields.len() {
143            return false;
144        }
145
146        for (this_field, other_field) in self.fields.iter().zip(other.fields.iter()) {
147            if this_field.source_id != other_field.source_id
148                || this_field.name != other_field.name
149                || this_field.transform != other_field.transform
150            {
151                return false;
152            }
153        }
154
155        true
156    }
157
158    /// Returns partition path string containing partition type and partition
159    /// value as key-value pairs.
160    pub fn partition_to_path(&self, data: &Struct, schema: SchemaRef) -> String {
161        let partition_type = self.partition_type(&schema).unwrap();
162        let field_types = partition_type.fields();
163
164        self.fields
165            .iter()
166            .enumerate()
167            .map(|(i, field)| {
168                let value = data[i].as_ref();
169                format!(
170                    "{}={}",
171                    field.name,
172                    field
173                        .transform
174                        .to_human_string(&field_types[i].field_type, value)
175                )
176            })
177            .join("/")
178    }
179}
180
/// A partition key represents a specific partition in a table, containing the partition spec,
/// schema, and the actual partition values.
///
/// All fields are private; read them through [`PartitionKey::spec`],
/// [`PartitionKey::schema`] and [`PartitionKey::data`].
#[derive(Clone, Debug)]
pub struct PartitionKey {
    /// The partition spec that contains the partition fields.
    spec: PartitionSpec,
    /// The schema to which the partition spec is bound.
    schema: SchemaRef,
    /// Partition fields' values in struct.
    data: Struct,
}
192
193impl PartitionKey {
194    /// Creates a new partition key with the given spec, schema, and data.
195    pub fn new(spec: PartitionSpec, schema: SchemaRef, data: Struct) -> Self {
196        Self { spec, schema, data }
197    }
198
199    /// Creates a new partition key from another partition key, with a new data field.
200    pub fn copy_with_data(&self, data: Struct) -> Self {
201        Self {
202            spec: self.spec.clone(),
203            schema: self.schema.clone(),
204            data,
205        }
206    }
207
208    /// Generates a partition path based on the partition values.
209    pub fn to_path(&self) -> String {
210        self.spec.partition_to_path(&self.data, self.schema.clone())
211    }
212
213    /// Returns `true` if the partition key is absent (`None`)
214    /// or represents an unpartitioned spec.
215    pub fn is_effectively_none(partition_key: Option<&PartitionKey>) -> bool {
216        match partition_key {
217            None => true,
218            Some(pk) => pk.spec.is_unpartitioned(),
219        }
220    }
221
222    /// Returns the associated [`PartitionSpec`].
223    pub fn spec(&self) -> &PartitionSpec {
224        &self.spec
225    }
226
227    /// Returns the associated [`SchemaRef`].
228    pub fn schema(&self) -> &SchemaRef {
229        &self.schema
230    }
231
232    /// Returns the associated [`Struct`].
233    pub fn data(&self) -> &Struct {
234        &self.data
235    }
236}
237
/// Reference to [`UnboundPartitionSpec`].
pub type UnboundPartitionSpecRef = Arc<UnboundPartitionSpec>;
/// Unbound partition field can be built without a schema and later bound to a schema.
///
/// Differs from [`PartitionField`] only in that `field_id` is optional.
/// Serialized with kebab-case keys (`source-id`, `field-id`, `name`, `transform`).
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, TypedBuilder)]
#[serde(rename_all = "kebab-case")]
pub struct UnboundPartitionField {
    /// A source column id from the table's schema.
    pub source_id: i32,
    /// A partition field id that is used to identify a partition field and is unique within a partition spec.
    /// In v2 table metadata, it is unique across all partition specs.
    ///
    /// Optional: the typed builder leaves it as `None` when no field id setter is called.
    #[builder(default, setter(strip_option(fallback = field_id_opt)))]
    pub field_id: Option<i32>,
    /// A partition name.
    pub name: String,
    /// A transform that is applied to the source column to produce a partition value.
    pub transform: Transform,
}
255
/// Unbound partition spec can be built without a schema and later bound to a schema.
/// They are used to transport schema information as part of the REST specification.
/// The main difference to [`PartitionSpec`] is that the field ids are optional.
///
/// The spec id is optional as well. The `Default` value has no spec id and no fields.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, Default)]
#[serde(rename_all = "kebab-case")]
pub struct UnboundPartitionSpec {
    /// Identifier for PartitionSpec; `None` when no id has been chosen yet.
    pub(crate) spec_id: Option<i32>,
    /// Details of the partition spec
    pub(crate) fields: Vec<UnboundPartitionField>,
}
267
268impl UnboundPartitionSpec {
269    /// Create unbound partition spec builder
270    pub fn builder() -> UnboundPartitionSpecBuilder {
271        UnboundPartitionSpecBuilder::default()
272    }
273
274    /// Bind this unbound partition spec to a schema.
275    pub fn bind(self, schema: impl Into<SchemaRef>) -> Result<PartitionSpec> {
276        PartitionSpecBuilder::new_from_unbound(self, schema)?.build()
277    }
278
279    /// Spec id of the partition spec
280    pub fn spec_id(&self) -> Option<i32> {
281        self.spec_id
282    }
283
284    /// Fields of the partition spec
285    pub fn fields(&self) -> &[UnboundPartitionField] {
286        &self.fields
287    }
288
289    /// Change the spec id of the partition spec
290    pub fn with_spec_id(self, spec_id: i32) -> Self {
291        Self {
292            spec_id: Some(spec_id),
293            ..self
294        }
295    }
296}
297
298fn has_sequential_ids(field_ids: impl Iterator<Item = i32>) -> bool {
299    for (index, field_id) in field_ids.enumerate() {
300        let expected_id = (UNPARTITIONED_LAST_ASSIGNED_ID as i64)
301            .checked_add(1)
302            .and_then(|id| id.checked_add(index as i64))
303            .unwrap_or(i64::MAX);
304
305        if field_id as i64 != expected_id {
306            return false;
307        }
308    }
309
310    true
311}
312
313impl From<PartitionField> for UnboundPartitionField {
314    fn from(field: PartitionField) -> Self {
315        UnboundPartitionField {
316            source_id: field.source_id,
317            field_id: Some(field.field_id),
318            name: field.name,
319            transform: field.transform,
320        }
321    }
322}
323
324impl From<PartitionSpec> for UnboundPartitionSpec {
325    fn from(spec: PartitionSpec) -> Self {
326        UnboundPartitionSpec {
327            spec_id: Some(spec.spec_id),
328            fields: spec.fields.into_iter().map(Into::into).collect(),
329        }
330    }
331}
332
/// Create a new UnboundPartitionSpec
///
/// Collects unbound partition fields and an optional spec id. No schema is involved.
#[derive(Debug, Default)]
pub struct UnboundPartitionSpecBuilder {
    /// Spec id to give the built spec; `None` until `with_spec_id` is called.
    spec_id: Option<i32>,
    /// Fields accepted so far, in insertion order.
    fields: Vec<UnboundPartitionField>,
}
339
340impl UnboundPartitionSpecBuilder {
341    /// Create a new partition spec builder with the given schema.
342    pub fn new() -> Self {
343        Self {
344            spec_id: None,
345            fields: vec![],
346        }
347    }
348
349    /// Set the spec id for the partition spec.
350    pub fn with_spec_id(mut self, spec_id: i32) -> Self {
351        self.spec_id = Some(spec_id);
352        self
353    }
354
355    /// Add a new partition field to the partition spec from an unbound partition field.
356    pub fn add_partition_field(
357        self,
358        source_id: i32,
359        target_name: impl ToString,
360        transformation: Transform,
361    ) -> Result<Self> {
362        let field = UnboundPartitionField {
363            source_id,
364            field_id: None,
365            name: target_name.to_string(),
366            transform: transformation,
367        };
368        self.add_partition_field_internal(field)
369    }
370
371    /// Add multiple partition fields to the partition spec.
372    pub fn add_partition_fields(
373        self,
374        fields: impl IntoIterator<Item = UnboundPartitionField>,
375    ) -> Result<Self> {
376        let mut builder = self;
377        for field in fields {
378            builder = builder.add_partition_field_internal(field)?;
379        }
380        Ok(builder)
381    }
382
383    fn add_partition_field_internal(mut self, field: UnboundPartitionField) -> Result<Self> {
384        self.check_name_set_and_unique(&field.name)?;
385        self.check_for_redundant_partitions(field.source_id, &field.transform)?;
386        if let Some(partition_field_id) = field.field_id {
387            self.check_partition_id_unique(partition_field_id)?;
388        }
389        self.fields.push(field);
390        Ok(self)
391    }
392
393    /// Build the unbound partition spec.
394    pub fn build(self) -> UnboundPartitionSpec {
395        UnboundPartitionSpec {
396            spec_id: self.spec_id,
397            fields: self.fields,
398        }
399    }
400}
401
/// Create valid partition specs for a given schema.
///
/// Fields are collected in unbound form; missing field ids are assigned when the
/// builder is built.
#[derive(Debug)]
pub struct PartitionSpecBuilder {
    /// Spec id to give the built spec; the default spec id is used when `None`.
    spec_id: Option<i32>,
    /// Starting point for assigning ids to fields that do not carry one.
    last_assigned_field_id: i32,
    /// Fields accepted so far, in insertion order.
    fields: Vec<UnboundPartitionField>,
    /// Schema the fields are validated against.
    schema: SchemaRef,
}
410
411impl PartitionSpecBuilder {
412    /// Create a new partition spec builder with the given schema.
413    pub fn new(schema: impl Into<SchemaRef>) -> Self {
414        Self {
415            spec_id: None,
416            fields: vec![],
417            last_assigned_field_id: UNPARTITIONED_LAST_ASSIGNED_ID,
418            schema: schema.into(),
419        }
420    }
421
422    /// Create a new partition spec builder from an existing unbound partition spec.
423    pub fn new_from_unbound(
424        unbound: UnboundPartitionSpec,
425        schema: impl Into<SchemaRef>,
426    ) -> Result<Self> {
427        let mut builder =
428            Self::new(schema).with_spec_id(unbound.spec_id.unwrap_or(DEFAULT_PARTITION_SPEC_ID));
429
430        for field in unbound.fields {
431            builder = builder.add_unbound_field(field)?;
432        }
433        Ok(builder)
434    }
435
436    /// Set the last assigned field id for the partition spec.
437    ///
438    /// Set this field when a new partition spec is created for an existing TableMetaData.
439    /// As `field_id` must be unique in V2 metadata, this should be set to
440    /// the highest field id used previously.
441    pub fn with_last_assigned_field_id(mut self, last_assigned_field_id: i32) -> Self {
442        self.last_assigned_field_id = last_assigned_field_id;
443        self
444    }
445
446    /// Set the spec id for the partition spec.
447    pub fn with_spec_id(mut self, spec_id: i32) -> Self {
448        self.spec_id = Some(spec_id);
449        self
450    }
451
452    /// Add a new partition field to the partition spec.
453    pub fn add_partition_field(
454        self,
455        source_name: impl AsRef<str>,
456        target_name: impl Into<String>,
457        transform: Transform,
458    ) -> Result<Self> {
459        let source_id = self
460            .schema
461            .field_by_name(source_name.as_ref())
462            .ok_or_else(|| {
463                Error::new(
464                    ErrorKind::DataInvalid,
465                    format!(
466                        "Cannot find source column with name: {} in schema",
467                        source_name.as_ref()
468                    ),
469                )
470            })?
471            .id;
472        let field = UnboundPartitionField {
473            source_id,
474            field_id: None,
475            name: target_name.into(),
476            transform,
477        };
478
479        self.add_unbound_field(field)
480    }
481
482    /// Add a new partition field to the partition spec.
483    ///
484    /// If partition field id is set, it is used as the field id.
485    /// Otherwise, a new `field_id` is assigned.
486    pub fn add_unbound_field(mut self, field: UnboundPartitionField) -> Result<Self> {
487        self.check_name_set_and_unique(&field.name)?;
488        self.check_for_redundant_partitions(field.source_id, &field.transform)?;
489        Self::check_name_does_not_collide_with_schema(&field, &self.schema)?;
490        Self::check_transform_compatibility(&field, &self.schema)?;
491        if let Some(partition_field_id) = field.field_id {
492            self.check_partition_id_unique(partition_field_id)?;
493        }
494
495        // Non-fallible from here
496        self.fields.push(field);
497        Ok(self)
498    }
499
500    /// Wrapper around `with_unbound_fields` to add multiple partition fields.
501    pub fn add_unbound_fields(
502        self,
503        fields: impl IntoIterator<Item = UnboundPartitionField>,
504    ) -> Result<Self> {
505        let mut builder = self;
506        for field in fields {
507            builder = builder.add_unbound_field(field)?;
508        }
509        Ok(builder)
510    }
511
512    /// Build a bound partition spec with the given schema.
513    pub fn build(self) -> Result<PartitionSpec> {
514        let fields = Self::set_field_ids(self.fields, self.last_assigned_field_id)?;
515        Ok(PartitionSpec {
516            spec_id: self.spec_id.unwrap_or(DEFAULT_PARTITION_SPEC_ID),
517            fields,
518        })
519    }
520
521    fn set_field_ids(
522        fields: Vec<UnboundPartitionField>,
523        last_assigned_field_id: i32,
524    ) -> Result<Vec<PartitionField>> {
525        let mut last_assigned_field_id = last_assigned_field_id;
526        // Already assigned partition ids. If we see one of these during iteration,
527        // we skip it.
528        let assigned_ids = fields
529            .iter()
530            .filter_map(|f| f.field_id)
531            .collect::<std::collections::HashSet<_>>();
532
533        fn _check_add_1(prev: i32) -> Result<i32> {
534            prev.checked_add(1).ok_or_else(|| {
535                Error::new(
536                    ErrorKind::DataInvalid,
537                    "Cannot assign more partition ids. Overflow.",
538                )
539            })
540        }
541
542        let mut bound_fields = Vec::with_capacity(fields.len());
543        for field in fields.into_iter() {
544            let partition_field_id = if let Some(partition_field_id) = field.field_id {
545                last_assigned_field_id = std::cmp::max(last_assigned_field_id, partition_field_id);
546                partition_field_id
547            } else {
548                last_assigned_field_id = _check_add_1(last_assigned_field_id)?;
549                while assigned_ids.contains(&last_assigned_field_id) {
550                    last_assigned_field_id = _check_add_1(last_assigned_field_id)?;
551                }
552                last_assigned_field_id
553            };
554
555            bound_fields.push(PartitionField {
556                source_id: field.source_id,
557                field_id: partition_field_id,
558                name: field.name,
559                transform: field.transform,
560            })
561        }
562
563        Ok(bound_fields)
564    }
565
566    /// Returns the partition type of this partition spec.
567    fn partition_type(fields: &Vec<PartitionField>, schema: &Schema) -> Result<StructType> {
568        let mut struct_fields = Vec::with_capacity(fields.len());
569        for partition_field in fields {
570            let field = schema
571                .field_by_id(partition_field.source_id)
572                .ok_or_else(|| {
573                    Error::new(
574                        // This should never occur as check_transform_compatibility
575                        // already ensures that the source field exists in the schema
576                        ErrorKind::Unexpected,
577                        format!(
578                            "No column with source column id {} in schema {:?}",
579                            partition_field.source_id, schema
580                        ),
581                    )
582                })?;
583            let res_type = partition_field.transform.result_type(&field.field_type)?;
584            let field =
585                NestedField::optional(partition_field.field_id, &partition_field.name, res_type)
586                    .into();
587            struct_fields.push(field);
588        }
589        Ok(StructType::new(struct_fields))
590    }
591
592    /// Ensure that the partition name is unique among columns in the schema.
593    /// Duplicate names are allowed if:
594    /// 1. The column is sourced from the column with the same name.
595    /// 2. AND the transformation is identity
596    fn check_name_does_not_collide_with_schema(
597        field: &UnboundPartitionField,
598        schema: &Schema,
599    ) -> Result<()> {
600        match schema.field_by_name(field.name.as_str()) {
601            Some(schema_collision) => {
602                if field.transform == Transform::Identity {
603                    if schema_collision.id == field.source_id {
604                        Ok(())
605                    } else {
606                        Err(Error::new(
607                            ErrorKind::DataInvalid,
608                            format!(
609                                "Cannot create identity partition sourced from different field in schema. Field name '{}' has id `{}` in schema but partition source id is `{}`",
610                                field.name, schema_collision.id, field.source_id
611                            ),
612                        ))
613                    }
614                } else {
615                    Err(Error::new(
616                        ErrorKind::DataInvalid,
617                        format!(
618                            "Cannot create partition with name: '{}' that conflicts with schema field and is not an identity transform.",
619                            field.name
620                        ),
621                    ))
622                }
623            }
624            None => Ok(()),
625        }
626    }
627
628    /// Ensure that the transformation of the field is compatible with type of the field
629    /// in the schema. Implicitly also checks if the source field exists in the schema.
630    fn check_transform_compatibility(field: &UnboundPartitionField, schema: &Schema) -> Result<()> {
631        let schema_field = schema.field_by_id(field.source_id).ok_or_else(|| {
632            Error::new(
633                ErrorKind::DataInvalid,
634                format!(
635                    "Cannot find partition source field with id `{}` in schema",
636                    field.source_id
637                ),
638            )
639        })?;
640
641        if field.transform != Transform::Void {
642            if !schema_field.field_type.is_primitive() {
643                return Err(Error::new(
644                    ErrorKind::DataInvalid,
645                    format!(
646                        "Cannot partition by non-primitive source field: '{}'.",
647                        schema_field.field_type
648                    ),
649                ));
650            }
651
652            if field
653                .transform
654                .result_type(&schema_field.field_type)
655                .is_err()
656            {
657                return Err(Error::new(
658                    ErrorKind::DataInvalid,
659                    format!(
660                        "Invalid source type: '{}' for transform: '{}'.",
661                        schema_field.field_type,
662                        field.transform.dedup_name()
663                    ),
664                ));
665            }
666        }
667
668        Ok(())
669    }
670}
671
672/// Contains checks that are common to both PartitionSpecBuilder and UnboundPartitionSpecBuilder
673trait CorePartitionSpecValidator {
674    /// Ensure that the partition name is unique among the partition fields and is not empty.
675    fn check_name_set_and_unique(&self, name: &str) -> Result<()> {
676        if name.is_empty() {
677            return Err(Error::new(
678                ErrorKind::DataInvalid,
679                "Cannot use empty partition name",
680            ));
681        }
682
683        if self.fields().iter().any(|f| f.name == name) {
684            return Err(Error::new(
685                ErrorKind::DataInvalid,
686                format!("Cannot use partition name more than once: {name}"),
687            ));
688        }
689        Ok(())
690    }
691
692    /// For a single source-column transformations must be unique.
693    fn check_for_redundant_partitions(&self, source_id: i32, transform: &Transform) -> Result<()> {
694        let collision = self.fields().iter().find(|f| {
695            f.source_id == source_id && f.transform.dedup_name() == transform.dedup_name()
696        });
697
698        if let Some(collision) = collision {
699            Err(Error::new(
700                ErrorKind::DataInvalid,
701                format!(
702                    "Cannot add redundant partition with source id `{}` and transform `{}`. A partition with the same source id and transform already exists with name `{}`",
703                    source_id,
704                    transform.dedup_name(),
705                    collision.name
706                ),
707            ))
708        } else {
709            Ok(())
710        }
711    }
712
713    /// Check field / partition_id unique within the partition spec if set
714    fn check_partition_id_unique(&self, field_id: i32) -> Result<()> {
715        if self.fields().iter().any(|f| f.field_id == Some(field_id)) {
716            return Err(Error::new(
717                ErrorKind::DataInvalid,
718                format!("Cannot use field id more than once in one PartitionSpec: {field_id}"),
719            ));
720        }
721
722        Ok(())
723    }
724
725    fn fields(&self) -> &Vec<UnboundPartitionField>;
726}
727
/// Gives [`PartitionSpecBuilder`] the shared name, redundancy and field id checks.
impl CorePartitionSpecValidator for PartitionSpecBuilder {
    /// Exposes the builder's collected fields to the shared checks.
    fn fields(&self) -> &Vec<UnboundPartitionField> {
        &self.fields
    }
}
733
/// Gives [`UnboundPartitionSpecBuilder`] the shared name, redundancy and field id checks.
impl CorePartitionSpecValidator for UnboundPartitionSpecBuilder {
    /// Exposes the builder's collected fields to the shared checks.
    fn fields(&self) -> &Vec<UnboundPartitionField> {
        &self.fields
    }
}
739
740#[cfg(test)]
741mod tests {
742    use super::*;
743    use crate::spec::{Literal, PrimitiveType, Type};
744
745    #[test]
746    fn test_partition_spec() {
747        let spec = r#"
748        {
749        "spec-id": 1,
750        "fields": [ {
751            "source-id": 4,
752            "field-id": 1000,
753            "name": "ts_day",
754            "transform": "day"
755            }, {
756            "source-id": 1,
757            "field-id": 1001,
758            "name": "id_bucket",
759            "transform": "bucket[16]"
760            }, {
761            "source-id": 2,
762            "field-id": 1002,
763            "name": "id_truncate",
764            "transform": "truncate[4]"
765            } ]
766        }
767        "#;
768
769        let partition_spec: PartitionSpec = serde_json::from_str(spec).unwrap();
770        assert_eq!(4, partition_spec.fields[0].source_id);
771        assert_eq!(1000, partition_spec.fields[0].field_id);
772        assert_eq!("ts_day", partition_spec.fields[0].name);
773        assert_eq!(Transform::Day, partition_spec.fields[0].transform);
774
775        assert_eq!(1, partition_spec.fields[1].source_id);
776        assert_eq!(1001, partition_spec.fields[1].field_id);
777        assert_eq!("id_bucket", partition_spec.fields[1].name);
778        assert_eq!(Transform::Bucket(16), partition_spec.fields[1].transform);
779
780        assert_eq!(2, partition_spec.fields[2].source_id);
781        assert_eq!(1002, partition_spec.fields[2].field_id);
782        assert_eq!("id_truncate", partition_spec.fields[2].name);
783        assert_eq!(Transform::Truncate(4), partition_spec.fields[2].transform);
784    }
785
786    #[test]
787    fn test_is_unpartitioned() {
788        let schema = Schema::builder()
789            .with_fields(vec![
790                NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int))
791                    .into(),
792                NestedField::required(
793                    2,
794                    "name",
795                    Type::Primitive(crate::spec::PrimitiveType::String),
796                )
797                .into(),
798            ])
799            .build()
800            .unwrap();
801        let partition_spec = PartitionSpec::builder(schema.clone())
802            .with_spec_id(1)
803            .build()
804            .unwrap();
805        assert!(
806            partition_spec.is_unpartitioned(),
807            "Empty partition spec should be unpartitioned"
808        );
809
810        let partition_spec = PartitionSpec::builder(schema.clone())
811            .add_unbound_fields(vec![
812                UnboundPartitionField::builder()
813                    .source_id(1)
814                    .name("id".to_string())
815                    .transform(Transform::Identity)
816                    .build(),
817                UnboundPartitionField::builder()
818                    .source_id(2)
819                    .name("name_string".to_string())
820                    .transform(Transform::Void)
821                    .build(),
822            ])
823            .unwrap()
824            .with_spec_id(1)
825            .build()
826            .unwrap();
827        assert!(
828            !partition_spec.is_unpartitioned(),
829            "Partition spec with one non void transform should not be unpartitioned"
830        );
831
832        let partition_spec = PartitionSpec::builder(schema.clone())
833            .with_spec_id(1)
834            .add_unbound_fields(vec![
835                UnboundPartitionField::builder()
836                    .source_id(1)
837                    .name("id_void".to_string())
838                    .transform(Transform::Void)
839                    .build(),
840                UnboundPartitionField::builder()
841                    .source_id(2)
842                    .name("name_void".to_string())
843                    .transform(Transform::Void)
844                    .build(),
845            ])
846            .unwrap()
847            .build()
848            .unwrap();
849        assert!(
850            partition_spec.is_unpartitioned(),
851            "Partition spec with all void field should be unpartitioned"
852        );
853    }
854
855    #[test]
856    fn test_unbound_partition_spec() {
857        let spec = r#"
858		{
859		"spec-id": 1,
860		"fields": [ {
861			"source-id": 4,
862			"field-id": 1000,
863			"name": "ts_day",
864			"transform": "day"
865			}, {
866			"source-id": 1,
867			"field-id": 1001,
868			"name": "id_bucket",
869			"transform": "bucket[16]"
870			}, {
871			"source-id": 2,
872			"field-id": 1002,
873			"name": "id_truncate",
874			"transform": "truncate[4]"
875			} ]
876		}
877		"#;
878
879        let partition_spec: UnboundPartitionSpec = serde_json::from_str(spec).unwrap();
880        assert_eq!(Some(1), partition_spec.spec_id);
881
882        assert_eq!(4, partition_spec.fields[0].source_id);
883        assert_eq!(Some(1000), partition_spec.fields[0].field_id);
884        assert_eq!("ts_day", partition_spec.fields[0].name);
885        assert_eq!(Transform::Day, partition_spec.fields[0].transform);
886
887        assert_eq!(1, partition_spec.fields[1].source_id);
888        assert_eq!(Some(1001), partition_spec.fields[1].field_id);
889        assert_eq!("id_bucket", partition_spec.fields[1].name);
890        assert_eq!(Transform::Bucket(16), partition_spec.fields[1].transform);
891
892        assert_eq!(2, partition_spec.fields[2].source_id);
893        assert_eq!(Some(1002), partition_spec.fields[2].field_id);
894        assert_eq!("id_truncate", partition_spec.fields[2].name);
895        assert_eq!(Transform::Truncate(4), partition_spec.fields[2].transform);
896
897        let spec = r#"
898		{
899		"fields": [ {
900			"source-id": 4,
901			"name": "ts_day",
902			"transform": "day"
903			} ]
904		}
905		"#;
906        let partition_spec: UnboundPartitionSpec = serde_json::from_str(spec).unwrap();
907        assert_eq!(None, partition_spec.spec_id);
908
909        assert_eq!(4, partition_spec.fields[0].source_id);
910        assert_eq!(None, partition_spec.fields[0].field_id);
911        assert_eq!("ts_day", partition_spec.fields[0].name);
912        assert_eq!(Transform::Day, partition_spec.fields[0].transform);
913    }
914
915    #[test]
916    fn test_new_unpartition() {
917        let schema = Schema::builder()
918            .with_fields(vec![
919                NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int))
920                    .into(),
921                NestedField::required(
922                    2,
923                    "name",
924                    Type::Primitive(crate::spec::PrimitiveType::String),
925                )
926                .into(),
927            ])
928            .build()
929            .unwrap();
930        let partition_spec = PartitionSpec::builder(schema.clone())
931            .with_spec_id(0)
932            .build()
933            .unwrap();
934        let partition_type = partition_spec.partition_type(&schema).unwrap();
935        assert_eq!(0, partition_type.fields().len());
936
937        let unpartition_spec = PartitionSpec::unpartition_spec();
938        assert_eq!(partition_spec, unpartition_spec);
939    }
940
941    #[test]
942    fn test_partition_type() {
943        let spec = r#"
944            {
945            "spec-id": 1,
946            "fields": [ {
947                "source-id": 4,
948                "field-id": 1000,
949                "name": "ts_day",
950                "transform": "day"
951                }, {
952                "source-id": 1,
953                "field-id": 1001,
954                "name": "id_bucket",
955                "transform": "bucket[16]"
956                }, {
957                "source-id": 2,
958                "field-id": 1002,
959                "name": "id_truncate",
960                "transform": "truncate[4]"
961                } ]
962            }
963            "#;
964
965        let partition_spec: PartitionSpec = serde_json::from_str(spec).unwrap();
966        let schema = Schema::builder()
967            .with_fields(vec![
968                NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int))
969                    .into(),
970                NestedField::required(
971                    2,
972                    "name",
973                    Type::Primitive(crate::spec::PrimitiveType::String),
974                )
975                .into(),
976                NestedField::required(
977                    3,
978                    "ts",
979                    Type::Primitive(crate::spec::PrimitiveType::Timestamp),
980                )
981                .into(),
982                NestedField::required(
983                    4,
984                    "ts_day",
985                    Type::Primitive(crate::spec::PrimitiveType::Timestamp),
986                )
987                .into(),
988                NestedField::required(
989                    5,
990                    "id_bucket",
991                    Type::Primitive(crate::spec::PrimitiveType::Int),
992                )
993                .into(),
994                NestedField::required(
995                    6,
996                    "id_truncate",
997                    Type::Primitive(crate::spec::PrimitiveType::Int),
998                )
999                .into(),
1000            ])
1001            .build()
1002            .unwrap();
1003
1004        let partition_type = partition_spec.partition_type(&schema).unwrap();
1005        assert_eq!(3, partition_type.fields().len());
1006        assert_eq!(
1007            *partition_type.fields()[0],
1008            NestedField::optional(
1009                partition_spec.fields[0].field_id,
1010                &partition_spec.fields[0].name,
1011                Type::Primitive(crate::spec::PrimitiveType::Date)
1012            )
1013        );
1014        assert_eq!(
1015            *partition_type.fields()[1],
1016            NestedField::optional(
1017                partition_spec.fields[1].field_id,
1018                &partition_spec.fields[1].name,
1019                Type::Primitive(crate::spec::PrimitiveType::Int)
1020            )
1021        );
1022        assert_eq!(
1023            *partition_type.fields()[2],
1024            NestedField::optional(
1025                partition_spec.fields[2].field_id,
1026                &partition_spec.fields[2].name,
1027                Type::Primitive(crate::spec::PrimitiveType::String)
1028            )
1029        );
1030    }
1031
1032    #[test]
1033    fn test_partition_empty() {
1034        let spec = r#"
1035            {
1036            "spec-id": 1,
1037            "fields": []
1038            }
1039            "#;
1040
1041        let partition_spec: PartitionSpec = serde_json::from_str(spec).unwrap();
1042        let schema = Schema::builder()
1043            .with_fields(vec![
1044                NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int))
1045                    .into(),
1046                NestedField::required(
1047                    2,
1048                    "name",
1049                    Type::Primitive(crate::spec::PrimitiveType::String),
1050                )
1051                .into(),
1052                NestedField::required(
1053                    3,
1054                    "ts",
1055                    Type::Primitive(crate::spec::PrimitiveType::Timestamp),
1056                )
1057                .into(),
1058                NestedField::required(
1059                    4,
1060                    "ts_day",
1061                    Type::Primitive(crate::spec::PrimitiveType::Timestamp),
1062                )
1063                .into(),
1064                NestedField::required(
1065                    5,
1066                    "id_bucket",
1067                    Type::Primitive(crate::spec::PrimitiveType::Int),
1068                )
1069                .into(),
1070                NestedField::required(
1071                    6,
1072                    "id_truncate",
1073                    Type::Primitive(crate::spec::PrimitiveType::Int),
1074                )
1075                .into(),
1076            ])
1077            .build()
1078            .unwrap();
1079
1080        let partition_type = partition_spec.partition_type(&schema).unwrap();
1081        assert_eq!(0, partition_type.fields().len());
1082    }
1083
1084    #[test]
1085    fn test_partition_error() {
1086        let spec = r#"
1087        {
1088        "spec-id": 1,
1089        "fields": [ {
1090            "source-id": 4,
1091            "field-id": 1000,
1092            "name": "ts_day",
1093            "transform": "day"
1094            }, {
1095            "source-id": 1,
1096            "field-id": 1001,
1097            "name": "id_bucket",
1098            "transform": "bucket[16]"
1099            }, {
1100            "source-id": 2,
1101            "field-id": 1002,
1102            "name": "id_truncate",
1103            "transform": "truncate[4]"
1104            } ]
1105        }
1106        "#;
1107
1108        let partition_spec: PartitionSpec = serde_json::from_str(spec).unwrap();
1109        let schema = Schema::builder()
1110            .with_fields(vec![
1111                NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int))
1112                    .into(),
1113                NestedField::required(
1114                    2,
1115                    "name",
1116                    Type::Primitive(crate::spec::PrimitiveType::String),
1117                )
1118                .into(),
1119            ])
1120            .build()
1121            .unwrap();
1122
1123        assert!(partition_spec.partition_type(&schema).is_err());
1124    }
1125
1126    #[test]
1127    fn test_builder_disallow_duplicate_names() {
1128        UnboundPartitionSpec::builder()
1129            .add_partition_field(1, "ts_day".to_string(), Transform::Day)
1130            .unwrap()
1131            .add_partition_field(2, "ts_day".to_string(), Transform::Day)
1132            .unwrap_err();
1133    }
1134
1135    #[test]
1136    fn test_builder_disallow_duplicate_field_ids() {
1137        let schema = Schema::builder()
1138            .with_fields(vec![
1139                NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int))
1140                    .into(),
1141                NestedField::required(
1142                    2,
1143                    "name",
1144                    Type::Primitive(crate::spec::PrimitiveType::String),
1145                )
1146                .into(),
1147            ])
1148            .build()
1149            .unwrap();
1150        PartitionSpec::builder(schema.clone())
1151            .add_unbound_field(UnboundPartitionField {
1152                source_id: 1,
1153                field_id: Some(1000),
1154                name: "id".to_string(),
1155                transform: Transform::Identity,
1156            })
1157            .unwrap()
1158            .add_unbound_field(UnboundPartitionField {
1159                source_id: 2,
1160                field_id: Some(1000),
1161                name: "id_bucket".to_string(),
1162                transform: Transform::Bucket(16),
1163            })
1164            .unwrap_err();
1165    }
1166
1167    #[test]
1168    fn test_builder_auto_assign_field_ids() {
1169        let schema = Schema::builder()
1170            .with_fields(vec![
1171                NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int))
1172                    .into(),
1173                NestedField::required(
1174                    2,
1175                    "name",
1176                    Type::Primitive(crate::spec::PrimitiveType::String),
1177                )
1178                .into(),
1179                NestedField::required(
1180                    3,
1181                    "ts",
1182                    Type::Primitive(crate::spec::PrimitiveType::Timestamp),
1183                )
1184                .into(),
1185            ])
1186            .build()
1187            .unwrap();
1188        let spec = PartitionSpec::builder(schema.clone())
1189            .with_spec_id(1)
1190            .add_unbound_field(UnboundPartitionField {
1191                source_id: 1,
1192                name: "id".to_string(),
1193                transform: Transform::Identity,
1194                field_id: Some(1012),
1195            })
1196            .unwrap()
1197            .add_unbound_field(UnboundPartitionField {
1198                source_id: 2,
1199                name: "name_void".to_string(),
1200                transform: Transform::Void,
1201                field_id: None,
1202            })
1203            .unwrap()
1204            // Should keep its ID even if its lower
1205            .add_unbound_field(UnboundPartitionField {
1206                source_id: 3,
1207                name: "year".to_string(),
1208                transform: Transform::Year,
1209                field_id: Some(1),
1210            })
1211            .unwrap()
1212            .build()
1213            .unwrap();
1214
1215        assert_eq!(1012, spec.fields[0].field_id);
1216        assert_eq!(1013, spec.fields[1].field_id);
1217        assert_eq!(1, spec.fields[2].field_id);
1218    }
1219
1220    #[test]
1221    fn test_builder_valid_schema() {
1222        let schema = Schema::builder()
1223            .with_fields(vec![
1224                NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int))
1225                    .into(),
1226                NestedField::required(
1227                    2,
1228                    "name",
1229                    Type::Primitive(crate::spec::PrimitiveType::String),
1230                )
1231                .into(),
1232            ])
1233            .build()
1234            .unwrap();
1235
1236        PartitionSpec::builder(schema.clone())
1237            .with_spec_id(1)
1238            .build()
1239            .unwrap();
1240
1241        let spec = PartitionSpec::builder(schema.clone())
1242            .with_spec_id(1)
1243            .add_partition_field("id", "id_bucket[16]", Transform::Bucket(16))
1244            .unwrap()
1245            .build()
1246            .unwrap();
1247
1248        assert_eq!(spec, PartitionSpec {
1249            spec_id: 1,
1250            fields: vec![PartitionField {
1251                source_id: 1,
1252                field_id: 1000,
1253                name: "id_bucket[16]".to_string(),
1254                transform: Transform::Bucket(16),
1255            }],
1256        });
1257        assert_eq!(
1258            spec.partition_type(&schema).unwrap(),
1259            StructType::new(vec![
1260                NestedField::optional(1000, "id_bucket[16]", Type::Primitive(PrimitiveType::Int))
1261                    .into()
1262            ])
1263        )
1264    }
1265
1266    #[test]
1267    fn test_collision_with_schema_name() {
1268        let schema = Schema::builder()
1269            .with_fields(vec![
1270                NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int))
1271                    .into(),
1272            ])
1273            .build()
1274            .unwrap();
1275
1276        PartitionSpec::builder(schema.clone())
1277            .with_spec_id(1)
1278            .build()
1279            .unwrap();
1280
1281        let err = PartitionSpec::builder(schema)
1282            .with_spec_id(1)
1283            .add_unbound_field(UnboundPartitionField {
1284                source_id: 1,
1285                field_id: None,
1286                name: "id".to_string(),
1287                transform: Transform::Bucket(16),
1288            })
1289            .unwrap_err();
1290        assert!(err.message().contains("conflicts with schema"))
1291    }
1292
1293    #[test]
1294    fn test_builder_collision_is_ok_for_identity_transforms() {
1295        let schema = Schema::builder()
1296            .with_fields(vec![
1297                NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int))
1298                    .into(),
1299                NestedField::required(
1300                    2,
1301                    "number",
1302                    Type::Primitive(crate::spec::PrimitiveType::Int),
1303                )
1304                .into(),
1305            ])
1306            .build()
1307            .unwrap();
1308
1309        PartitionSpec::builder(schema.clone())
1310            .with_spec_id(1)
1311            .build()
1312            .unwrap();
1313
1314        PartitionSpec::builder(schema.clone())
1315            .with_spec_id(1)
1316            .add_unbound_field(UnboundPartitionField {
1317                source_id: 1,
1318                field_id: None,
1319                name: "id".to_string(),
1320                transform: Transform::Identity,
1321            })
1322            .unwrap()
1323            .build()
1324            .unwrap();
1325
1326        // Not OK for different source id
1327        PartitionSpec::builder(schema)
1328            .with_spec_id(1)
1329            .add_unbound_field(UnboundPartitionField {
1330                source_id: 2,
1331                field_id: None,
1332                name: "id".to_string(),
1333                transform: Transform::Identity,
1334            })
1335            .unwrap_err();
1336    }
1337
1338    #[test]
1339    fn test_builder_all_source_ids_must_exist() {
1340        let schema = Schema::builder()
1341            .with_fields(vec![
1342                NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int))
1343                    .into(),
1344                NestedField::required(
1345                    2,
1346                    "name",
1347                    Type::Primitive(crate::spec::PrimitiveType::String),
1348                )
1349                .into(),
1350                NestedField::required(
1351                    3,
1352                    "ts",
1353                    Type::Primitive(crate::spec::PrimitiveType::Timestamp),
1354                )
1355                .into(),
1356            ])
1357            .build()
1358            .unwrap();
1359
1360        // Valid
1361        PartitionSpec::builder(schema.clone())
1362            .with_spec_id(1)
1363            .add_unbound_fields(vec![
1364                UnboundPartitionField {
1365                    source_id: 1,
1366                    field_id: None,
1367                    name: "id_bucket".to_string(),
1368                    transform: Transform::Bucket(16),
1369                },
1370                UnboundPartitionField {
1371                    source_id: 2,
1372                    field_id: None,
1373                    name: "name".to_string(),
1374                    transform: Transform::Identity,
1375                },
1376            ])
1377            .unwrap()
1378            .build()
1379            .unwrap();
1380
1381        // Invalid
1382        PartitionSpec::builder(schema)
1383            .with_spec_id(1)
1384            .add_unbound_fields(vec![
1385                UnboundPartitionField {
1386                    source_id: 1,
1387                    field_id: None,
1388                    name: "id_bucket".to_string(),
1389                    transform: Transform::Bucket(16),
1390                },
1391                UnboundPartitionField {
1392                    source_id: 4,
1393                    field_id: None,
1394                    name: "name".to_string(),
1395                    transform: Transform::Identity,
1396                },
1397            ])
1398            .unwrap_err();
1399    }
1400
1401    #[test]
1402    fn test_builder_disallows_redundant() {
1403        let err = UnboundPartitionSpec::builder()
1404            .with_spec_id(1)
1405            .add_partition_field(1, "id_bucket[16]".to_string(), Transform::Bucket(16))
1406            .unwrap()
1407            .add_partition_field(
1408                1,
1409                "id_bucket_with_other_name".to_string(),
1410                Transform::Bucket(16),
1411            )
1412            .unwrap_err();
1413        assert!(err.message().contains("redundant partition"));
1414    }
1415
1416    #[test]
1417    fn test_builder_incompatible_transforms_disallowed() {
1418        let schema = Schema::builder()
1419            .with_fields(vec![
1420                NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int))
1421                    .into(),
1422            ])
1423            .build()
1424            .unwrap();
1425
1426        PartitionSpec::builder(schema)
1427            .with_spec_id(1)
1428            .add_unbound_field(UnboundPartitionField {
1429                source_id: 1,
1430                field_id: None,
1431                name: "id_year".to_string(),
1432                transform: Transform::Year,
1433            })
1434            .unwrap_err();
1435    }
1436
1437    #[test]
1438    fn test_build_unbound_specs_without_partition_id() {
1439        let spec = UnboundPartitionSpec::builder()
1440            .with_spec_id(1)
1441            .add_partition_fields(vec![UnboundPartitionField {
1442                source_id: 1,
1443                field_id: None,
1444                name: "id_bucket[16]".to_string(),
1445                transform: Transform::Bucket(16),
1446            }])
1447            .unwrap()
1448            .build();
1449
1450        assert_eq!(spec, UnboundPartitionSpec {
1451            spec_id: Some(1),
1452            fields: vec![UnboundPartitionField {
1453                source_id: 1,
1454                field_id: None,
1455                name: "id_bucket[16]".to_string(),
1456                transform: Transform::Bucket(16),
1457            }]
1458        });
1459    }
1460
1461    #[test]
1462    fn test_is_compatible_with() {
1463        let schema = Schema::builder()
1464            .with_fields(vec![
1465                NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int))
1466                    .into(),
1467                NestedField::required(
1468                    2,
1469                    "name",
1470                    Type::Primitive(crate::spec::PrimitiveType::String),
1471                )
1472                .into(),
1473            ])
1474            .build()
1475            .unwrap();
1476
1477        let partition_spec_1 = PartitionSpec::builder(schema.clone())
1478            .with_spec_id(1)
1479            .add_unbound_field(UnboundPartitionField {
1480                source_id: 1,
1481                field_id: None,
1482                name: "id_bucket".to_string(),
1483                transform: Transform::Bucket(16),
1484            })
1485            .unwrap()
1486            .build()
1487            .unwrap();
1488
1489        let partition_spec_2 = PartitionSpec::builder(schema)
1490            .with_spec_id(1)
1491            .add_unbound_field(UnboundPartitionField {
1492                source_id: 1,
1493                field_id: None,
1494                name: "id_bucket".to_string(),
1495                transform: Transform::Bucket(16),
1496            })
1497            .unwrap()
1498            .build()
1499            .unwrap();
1500
1501        assert!(partition_spec_1.is_compatible_with(&partition_spec_2));
1502    }
1503
1504    #[test]
1505    fn test_not_compatible_with_transform_different() {
1506        let schema = Schema::builder()
1507            .with_fields(vec![
1508                NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int))
1509                    .into(),
1510            ])
1511            .build()
1512            .unwrap();
1513
1514        let partition_spec_1 = PartitionSpec::builder(schema.clone())
1515            .with_spec_id(1)
1516            .add_unbound_field(UnboundPartitionField {
1517                source_id: 1,
1518                field_id: None,
1519                name: "id_bucket".to_string(),
1520                transform: Transform::Bucket(16),
1521            })
1522            .unwrap()
1523            .build()
1524            .unwrap();
1525
1526        let partition_spec_2 = PartitionSpec::builder(schema)
1527            .with_spec_id(1)
1528            .add_unbound_field(UnboundPartitionField {
1529                source_id: 1,
1530                field_id: None,
1531                name: "id_bucket".to_string(),
1532                transform: Transform::Bucket(32),
1533            })
1534            .unwrap()
1535            .build()
1536            .unwrap();
1537
1538        assert!(!partition_spec_1.is_compatible_with(&partition_spec_2));
1539    }
1540
1541    #[test]
1542    fn test_not_compatible_with_source_id_different() {
1543        let schema = Schema::builder()
1544            .with_fields(vec![
1545                NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int))
1546                    .into(),
1547                NestedField::required(
1548                    2,
1549                    "name",
1550                    Type::Primitive(crate::spec::PrimitiveType::String),
1551                )
1552                .into(),
1553            ])
1554            .build()
1555            .unwrap();
1556
1557        let partition_spec_1 = PartitionSpec::builder(schema.clone())
1558            .with_spec_id(1)
1559            .add_unbound_field(UnboundPartitionField {
1560                source_id: 1,
1561                field_id: None,
1562                name: "id_bucket".to_string(),
1563                transform: Transform::Bucket(16),
1564            })
1565            .unwrap()
1566            .build()
1567            .unwrap();
1568
1569        let partition_spec_2 = PartitionSpec::builder(schema)
1570            .with_spec_id(1)
1571            .add_unbound_field(UnboundPartitionField {
1572                source_id: 2,
1573                field_id: None,
1574                name: "id_bucket".to_string(),
1575                transform: Transform::Bucket(16),
1576            })
1577            .unwrap()
1578            .build()
1579            .unwrap();
1580
1581        assert!(!partition_spec_1.is_compatible_with(&partition_spec_2));
1582    }
1583
1584    #[test]
1585    fn test_not_compatible_with_order_different() {
1586        let schema = Schema::builder()
1587            .with_fields(vec![
1588                NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int))
1589                    .into(),
1590                NestedField::required(
1591                    2,
1592                    "name",
1593                    Type::Primitive(crate::spec::PrimitiveType::String),
1594                )
1595                .into(),
1596            ])
1597            .build()
1598            .unwrap();
1599
1600        let partition_spec_1 = PartitionSpec::builder(schema.clone())
1601            .with_spec_id(1)
1602            .add_unbound_field(UnboundPartitionField {
1603                source_id: 1,
1604                field_id: None,
1605                name: "id_bucket".to_string(),
1606                transform: Transform::Bucket(16),
1607            })
1608            .unwrap()
1609            .add_unbound_field(UnboundPartitionField {
1610                source_id: 2,
1611                field_id: None,
1612                name: "name".to_string(),
1613                transform: Transform::Identity,
1614            })
1615            .unwrap()
1616            .build()
1617            .unwrap();
1618
1619        let partition_spec_2 = PartitionSpec::builder(schema)
1620            .with_spec_id(1)
1621            .add_unbound_field(UnboundPartitionField {
1622                source_id: 2,
1623                field_id: None,
1624                name: "name".to_string(),
1625                transform: Transform::Identity,
1626            })
1627            .unwrap()
1628            .add_unbound_field(UnboundPartitionField {
1629                source_id: 1,
1630                field_id: None,
1631                name: "id_bucket".to_string(),
1632                transform: Transform::Bucket(16),
1633            })
1634            .unwrap()
1635            .build()
1636            .unwrap();
1637
1638        assert!(!partition_spec_1.is_compatible_with(&partition_spec_2));
1639    }
1640
1641    #[test]
1642    fn test_highest_field_id_unpartitioned() {
1643        let spec = PartitionSpec::builder(Schema::builder().with_fields(vec![]).build().unwrap())
1644            .with_spec_id(1)
1645            .build()
1646            .unwrap();
1647
1648        assert!(spec.highest_field_id().is_none());
1649    }
1650
1651    #[test]
1652    fn test_highest_field_id() {
1653        let schema = Schema::builder()
1654            .with_fields(vec![
1655                NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int))
1656                    .into(),
1657                NestedField::required(
1658                    2,
1659                    "name",
1660                    Type::Primitive(crate::spec::PrimitiveType::String),
1661                )
1662                .into(),
1663            ])
1664            .build()
1665            .unwrap();
1666
1667        let spec = PartitionSpec::builder(schema)
1668            .with_spec_id(1)
1669            .add_unbound_field(UnboundPartitionField {
1670                source_id: 1,
1671                field_id: Some(1001),
1672                name: "id".to_string(),
1673                transform: Transform::Identity,
1674            })
1675            .unwrap()
1676            .add_unbound_field(UnboundPartitionField {
1677                source_id: 2,
1678                field_id: Some(1000),
1679                name: "name".to_string(),
1680                transform: Transform::Identity,
1681            })
1682            .unwrap()
1683            .build()
1684            .unwrap();
1685
1686        assert_eq!(Some(1001), spec.highest_field_id());
1687    }
1688
1689    #[test]
1690    fn test_has_sequential_ids() {
1691        let schema = Schema::builder()
1692            .with_fields(vec![
1693                NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int))
1694                    .into(),
1695                NestedField::required(
1696                    2,
1697                    "name",
1698                    Type::Primitive(crate::spec::PrimitiveType::String),
1699                )
1700                .into(),
1701            ])
1702            .build()
1703            .unwrap();
1704
1705        let spec = PartitionSpec::builder(schema)
1706            .with_spec_id(1)
1707            .add_unbound_field(UnboundPartitionField {
1708                source_id: 1,
1709                field_id: Some(1000),
1710                name: "id".to_string(),
1711                transform: Transform::Identity,
1712            })
1713            .unwrap()
1714            .add_unbound_field(UnboundPartitionField {
1715                source_id: 2,
1716                field_id: Some(1001),
1717                name: "name".to_string(),
1718                transform: Transform::Identity,
1719            })
1720            .unwrap()
1721            .build()
1722            .unwrap();
1723
1724        assert_eq!(1000, spec.fields[0].field_id);
1725        assert_eq!(1001, spec.fields[1].field_id);
1726        assert!(spec.has_sequential_ids());
1727    }
1728
1729    #[test]
1730    fn test_sequential_ids_must_start_at_1000() {
1731        let schema = Schema::builder()
1732            .with_fields(vec![
1733                NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int))
1734                    .into(),
1735                NestedField::required(
1736                    2,
1737                    "name",
1738                    Type::Primitive(crate::spec::PrimitiveType::String),
1739                )
1740                .into(),
1741            ])
1742            .build()
1743            .unwrap();
1744
1745        let spec = PartitionSpec::builder(schema)
1746            .with_spec_id(1)
1747            .add_unbound_field(UnboundPartitionField {
1748                source_id: 1,
1749                field_id: Some(999),
1750                name: "id".to_string(),
1751                transform: Transform::Identity,
1752            })
1753            .unwrap()
1754            .add_unbound_field(UnboundPartitionField {
1755                source_id: 2,
1756                field_id: Some(1000),
1757                name: "name".to_string(),
1758                transform: Transform::Identity,
1759            })
1760            .unwrap()
1761            .build()
1762            .unwrap();
1763
1764        assert_eq!(999, spec.fields[0].field_id);
1765        assert_eq!(1000, spec.fields[1].field_id);
1766        assert!(!spec.has_sequential_ids());
1767    }
1768
1769    #[test]
1770    fn test_sequential_ids_must_have_no_gaps() {
1771        let schema = Schema::builder()
1772            .with_fields(vec![
1773                NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int))
1774                    .into(),
1775                NestedField::required(
1776                    2,
1777                    "name",
1778                    Type::Primitive(crate::spec::PrimitiveType::String),
1779                )
1780                .into(),
1781            ])
1782            .build()
1783            .unwrap();
1784
1785        let spec = PartitionSpec::builder(schema)
1786            .with_spec_id(1)
1787            .add_unbound_field(UnboundPartitionField {
1788                source_id: 1,
1789                field_id: Some(1000),
1790                name: "id".to_string(),
1791                transform: Transform::Identity,
1792            })
1793            .unwrap()
1794            .add_unbound_field(UnboundPartitionField {
1795                source_id: 2,
1796                field_id: Some(1002),
1797                name: "name".to_string(),
1798                transform: Transform::Identity,
1799            })
1800            .unwrap()
1801            .build()
1802            .unwrap();
1803
1804        assert_eq!(1000, spec.fields[0].field_id);
1805        assert_eq!(1002, spec.fields[1].field_id);
1806        assert!(!spec.has_sequential_ids());
1807    }
1808
1809    #[test]
1810    fn test_partition_to_path() {
1811        let schema = Schema::builder()
1812            .with_fields(vec![
1813                NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
1814                NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
1815                NestedField::required(3, "timestamp", Type::Primitive(PrimitiveType::Timestamp))
1816                    .into(),
1817                NestedField::required(4, "empty", Type::Primitive(PrimitiveType::String)).into(),
1818            ])
1819            .build()
1820            .unwrap();
1821
1822        let spec = PartitionSpec::builder(schema.clone())
1823            .add_partition_field("id", "id", Transform::Identity)
1824            .unwrap()
1825            .add_partition_field("name", "name", Transform::Identity)
1826            .unwrap()
1827            .add_partition_field("timestamp", "ts_hour", Transform::Hour)
1828            .unwrap()
1829            .add_partition_field("empty", "empty_void", Transform::Void)
1830            .unwrap()
1831            .build()
1832            .unwrap();
1833
1834        let data = Struct::from_iter([
1835            Some(Literal::int(42)),
1836            Some(Literal::string("alice")),
1837            Some(Literal::int(1000)),
1838            Some(Literal::string("empty")),
1839        ]);
1840
1841        assert_eq!(
1842            spec.partition_to_path(&data, schema.into()),
1843            "id=42/name=alice/ts_hour=1000/empty_void=null"
1844        );
1845    }
1846}