Skip to main content

iceberg/spec/
partition.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

/*!
 * Partitioning: partition fields, bound and unbound partition specs, their
 * builders, and partition keys.
 */
use std::collections::HashSet;
use std::sync::Arc;

use itertools::Itertools;
use serde::{Deserialize, Serialize};
use typed_builder::TypedBuilder;

use super::transform::Transform;
use super::{NestedField, Schema, SchemaRef, StructType};
use crate::spec::Struct;
use crate::{Error, ErrorKind, Result};

/// Partition field id that precedes the first auto-assigned id: builders start assigning
/// partition field ids at `UNPARTITIONED_LAST_ASSIGNED_ID + 1`, i.e. 1000.
pub(crate) const UNPARTITIONED_LAST_ASSIGNED_ID: i32 = 999;
/// Spec id used whenever no spec id has been set explicitly.
pub(crate) const DEFAULT_PARTITION_SPEC_ID: i32 = 0;

35/// Partition fields capture the transform from table data to partition values.
36#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, TypedBuilder)]
37#[serde(rename_all = "kebab-case")]
38pub struct PartitionField {
39    /// A source column id from the table’s schema
40    pub source_id: i32,
41    /// A partition field id that is used to identify a partition field and is unique within a partition spec.
42    /// In v2 table metadata, it is unique across all partition specs.
43    pub field_id: i32,
44    /// A partition name.
45    pub name: String,
46    /// A transform that is applied to the source column to produce a partition value.
47    pub transform: Transform,
48}
49
50impl PartitionField {
51    /// To unbound partition field
52    pub fn into_unbound(self) -> UnboundPartitionField {
53        self.into()
54    }
55}
56
57/// Reference to [`PartitionSpec`].
58pub type PartitionSpecRef = Arc<PartitionSpec>;
59/// Partition spec that defines how to produce a tuple of partition values from a record.
60///
61/// A [`PartitionSpec`] is originally obtained by binding an [`UnboundPartitionSpec`] to a schema and is
62/// only guaranteed to be valid for that schema. The main difference between [`PartitionSpec`] and
63/// [`UnboundPartitionSpec`] is that the former has field ids assigned,
64/// while field ids are optional for [`UnboundPartitionSpec`].
65#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
66#[serde(rename_all = "kebab-case")]
67pub struct PartitionSpec {
68    /// Identifier for PartitionSpec
69    spec_id: i32,
70    /// Details of the partition spec
71    fields: Vec<PartitionField>,
72}
73
74impl PartitionSpec {
75    /// Create a new partition spec builder with the given schema.
76    pub fn builder(schema: impl Into<SchemaRef>) -> PartitionSpecBuilder {
77        PartitionSpecBuilder::new(schema)
78    }
79
80    /// Fields of the partition spec
81    pub fn fields(&self) -> &[PartitionField] {
82        &self.fields
83    }
84
85    /// Spec id of the partition spec
86    pub fn spec_id(&self) -> i32 {
87        self.spec_id
88    }
89
90    /// Get a new unpartitioned partition spec
91    pub fn unpartition_spec() -> Self {
92        Self {
93            spec_id: DEFAULT_PARTITION_SPEC_ID,
94            fields: vec![],
95        }
96    }
97
98    /// Returns if the partition spec is unpartitioned.
99    ///
100    /// A [`PartitionSpec`] is unpartitioned if it has no fields or all fields are [`Transform::Void`] transform.
101    pub fn is_unpartitioned(&self) -> bool {
102        self.fields.is_empty() || self.fields.iter().all(|f| f.transform == Transform::Void)
103    }
104
105    /// Returns the partition type of this partition spec.
106    pub fn partition_type(&self, schema: &Schema) -> Result<StructType> {
107        PartitionSpecBuilder::partition_type(&self.fields, schema)
108    }
109
110    /// Convert to unbound partition spec
111    pub fn into_unbound(self) -> UnboundPartitionSpec {
112        self.into()
113    }
114
115    /// Change the spec id of the partition spec
116    pub fn with_spec_id(self, spec_id: i32) -> Self {
117        Self { spec_id, ..self }
118    }
119
120    /// Check if this partition spec has sequential partition ids.
121    /// Sequential ids start from 1000 and increment by 1 for each field.
122    /// This is required for spec version 1
123    pub fn has_sequential_ids(&self) -> bool {
124        has_sequential_ids(self.fields.iter().map(|f| f.field_id))
125    }
126
127    /// Get the highest field id in the partition spec.
128    pub fn highest_field_id(&self) -> Option<i32> {
129        self.fields.iter().map(|f| f.field_id).max()
130    }
131
132    /// Check if this partition spec is compatible with another partition spec.
133    ///
134    /// Returns true if the partition spec is equal to the other spec with partition field ids ignored and
135    /// spec_id ignored. The following must be identical:
136    /// * The number of fields
137    /// * Field order
138    /// * Field names
139    /// * Source column ids
140    /// * Transforms
141    pub fn is_compatible_with(&self, other: &PartitionSpec) -> bool {
142        if self.fields.len() != other.fields.len() {
143            return false;
144        }
145
146        for (this_field, other_field) in self.fields.iter().zip(other.fields.iter()) {
147            if this_field.source_id != other_field.source_id
148                || this_field.name != other_field.name
149                || this_field.transform != other_field.transform
150            {
151                return false;
152            }
153        }
154
155        true
156    }
157
158    /// Returns partition path string containing partition type and partition
159    /// value as key-value pairs.
160    pub fn partition_to_path(&self, data: &Struct, schema: SchemaRef) -> String {
161        let partition_type = self.partition_type(&schema).unwrap();
162        let field_types = partition_type.fields();
163
164        self.fields
165            .iter()
166            .enumerate()
167            .map(|(i, field)| {
168                let value = data[i].as_ref();
169                format!(
170                    "{}={}",
171                    field.name,
172                    field
173                        .transform
174                        .to_human_string(&field_types[i].field_type, value)
175                )
176            })
177            .join("/")
178    }
179}
180
181/// A partition key represents a specific partition in a table, containing the partition spec,
182/// schema, and the actual partition values.
183#[derive(Clone, Debug)]
184pub struct PartitionKey {
185    /// The partition spec that contains the partition fields.
186    spec: PartitionSpec,
187    /// The schema to which the partition spec is bound.
188    schema: SchemaRef,
189    /// Partition fields' values in struct.
190    data: Struct,
191}
192
193impl PartitionKey {
194    /// Creates a new partition key with the given spec, schema, and data.
195    pub fn new(spec: PartitionSpec, schema: SchemaRef, data: Struct) -> Self {
196        Self { spec, schema, data }
197    }
198
199    /// Creates a new partition key from another partition key, with a new data field.
200    pub fn copy_with_data(&self, data: Struct) -> Self {
201        Self {
202            spec: self.spec.clone(),
203            schema: self.schema.clone(),
204            data,
205        }
206    }
207
208    /// Generates a partition path based on the partition values.
209    pub fn to_path(&self) -> String {
210        self.spec.partition_to_path(&self.data, self.schema.clone())
211    }
212
213    /// Returns `true` if the partition key is absent (`None`)
214    /// or represents an unpartitioned spec.
215    pub fn is_effectively_none(partition_key: Option<&PartitionKey>) -> bool {
216        match partition_key {
217            None => true,
218            Some(pk) => pk.spec.is_unpartitioned(),
219        }
220    }
221
222    /// Returns the associated [`PartitionSpec`].
223    pub fn spec(&self) -> &PartitionSpec {
224        &self.spec
225    }
226
227    /// Returns the associated [`SchemaRef`].
228    pub fn schema(&self) -> &SchemaRef {
229        &self.schema
230    }
231
232    /// Returns the associated [`Struct`].
233    pub fn data(&self) -> &Struct {
234        &self.data
235    }
236}
237
238/// Reference to [`UnboundPartitionSpec`].
239pub type UnboundPartitionSpecRef = Arc<UnboundPartitionSpec>;
240/// Unbound partition field can be built without a schema and later bound to a schema.
241#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, TypedBuilder)]
242#[serde(rename_all = "kebab-case")]
243pub struct UnboundPartitionField {
244    /// A source column id from the table’s schema
245    pub source_id: i32,
246    /// A partition field id that is used to identify a partition field and is unique within a partition spec.
247    /// In v2 table metadata, it is unique across all partition specs.
248    #[builder(default, setter(strip_option(fallback = field_id_opt)))]
249    #[serde(skip_serializing_if = "Option::is_none")]
250    pub field_id: Option<i32>,
251    /// A partition name.
252    pub name: String,
253    /// A transform that is applied to the source column to produce a partition value.
254    pub transform: Transform,
255}
256
257/// Unbound partition spec can be built without a schema and later bound to a schema.
258/// They are used to transport schema information as part of the REST specification.
259/// The main difference to [`PartitionSpec`] is that the field ids are optional.
260#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, Default)]
261#[serde(rename_all = "kebab-case")]
262pub struct UnboundPartitionSpec {
263    /// Identifier for PartitionSpec
264    #[serde(skip_serializing_if = "Option::is_none")]
265    pub(crate) spec_id: Option<i32>,
266    /// Details of the partition spec
267    pub(crate) fields: Vec<UnboundPartitionField>,
268}
269
270impl UnboundPartitionSpec {
271    /// Create unbound partition spec builder
272    pub fn builder() -> UnboundPartitionSpecBuilder {
273        UnboundPartitionSpecBuilder::default()
274    }
275
276    /// Bind this unbound partition spec to a schema.
277    pub fn bind(self, schema: impl Into<SchemaRef>) -> Result<PartitionSpec> {
278        PartitionSpecBuilder::new_from_unbound(self, schema)?.build()
279    }
280
281    /// Spec id of the partition spec
282    pub fn spec_id(&self) -> Option<i32> {
283        self.spec_id
284    }
285
286    /// Fields of the partition spec
287    pub fn fields(&self) -> &[UnboundPartitionField] {
288        &self.fields
289    }
290
291    /// Change the spec id of the partition spec
292    pub fn with_spec_id(self, spec_id: i32) -> Self {
293        Self {
294            spec_id: Some(spec_id),
295            ..self
296        }
297    }
298}
299
300fn has_sequential_ids(field_ids: impl Iterator<Item = i32>) -> bool {
301    for (index, field_id) in field_ids.enumerate() {
302        let expected_id = (UNPARTITIONED_LAST_ASSIGNED_ID as i64)
303            .checked_add(1)
304            .and_then(|id| id.checked_add(index as i64))
305            .unwrap_or(i64::MAX);
306
307        if field_id as i64 != expected_id {
308            return false;
309        }
310    }
311
312    true
313}
314
315impl From<PartitionField> for UnboundPartitionField {
316    fn from(field: PartitionField) -> Self {
317        UnboundPartitionField {
318            source_id: field.source_id,
319            field_id: Some(field.field_id),
320            name: field.name,
321            transform: field.transform,
322        }
323    }
324}
325
326impl From<PartitionSpec> for UnboundPartitionSpec {
327    fn from(spec: PartitionSpec) -> Self {
328        UnboundPartitionSpec {
329            spec_id: Some(spec.spec_id),
330            fields: spec.fields.into_iter().map(Into::into).collect(),
331        }
332    }
333}
334
335/// Create a new UnboundPartitionSpec
336#[derive(Debug, Default)]
337pub struct UnboundPartitionSpecBuilder {
338    spec_id: Option<i32>,
339    fields: Vec<UnboundPartitionField>,
340}
341
342impl UnboundPartitionSpecBuilder {
343    /// Create a new partition spec builder with the given schema.
344    pub fn new() -> Self {
345        Self {
346            spec_id: None,
347            fields: vec![],
348        }
349    }
350
351    /// Set the spec id for the partition spec.
352    pub fn with_spec_id(mut self, spec_id: i32) -> Self {
353        self.spec_id = Some(spec_id);
354        self
355    }
356
357    /// Add a new partition field to the partition spec from an unbound partition field.
358    pub fn add_partition_field(
359        self,
360        source_id: i32,
361        target_name: impl ToString,
362        transformation: Transform,
363    ) -> Result<Self> {
364        let field = UnboundPartitionField {
365            source_id,
366            field_id: None,
367            name: target_name.to_string(),
368            transform: transformation,
369        };
370        self.add_partition_field_internal(field)
371    }
372
373    /// Add multiple partition fields to the partition spec.
374    pub fn add_partition_fields(
375        self,
376        fields: impl IntoIterator<Item = UnboundPartitionField>,
377    ) -> Result<Self> {
378        let mut builder = self;
379        for field in fields {
380            builder = builder.add_partition_field_internal(field)?;
381        }
382        Ok(builder)
383    }
384
385    fn add_partition_field_internal(mut self, field: UnboundPartitionField) -> Result<Self> {
386        self.check_name_set_and_unique(&field.name)?;
387        self.check_for_redundant_partitions(field.source_id, &field.transform)?;
388        if let Some(partition_field_id) = field.field_id {
389            self.check_partition_id_unique(partition_field_id)?;
390        }
391        self.fields.push(field);
392        Ok(self)
393    }
394
395    /// Build the unbound partition spec.
396    pub fn build(self) -> UnboundPartitionSpec {
397        UnboundPartitionSpec {
398            spec_id: self.spec_id,
399            fields: self.fields,
400        }
401    }
402}
403
404/// Create valid partition specs for a given schema.
405#[derive(Debug)]
406pub struct PartitionSpecBuilder {
407    spec_id: Option<i32>,
408    last_assigned_field_id: i32,
409    fields: Vec<UnboundPartitionField>,
410    schema: SchemaRef,
411}
412
413impl PartitionSpecBuilder {
414    /// Create a new partition spec builder with the given schema.
415    pub fn new(schema: impl Into<SchemaRef>) -> Self {
416        Self {
417            spec_id: None,
418            fields: vec![],
419            last_assigned_field_id: UNPARTITIONED_LAST_ASSIGNED_ID,
420            schema: schema.into(),
421        }
422    }
423
424    /// Create a new partition spec builder from an existing unbound partition spec.
425    pub fn new_from_unbound(
426        unbound: UnboundPartitionSpec,
427        schema: impl Into<SchemaRef>,
428    ) -> Result<Self> {
429        let mut builder =
430            Self::new(schema).with_spec_id(unbound.spec_id.unwrap_or(DEFAULT_PARTITION_SPEC_ID));
431
432        for field in unbound.fields {
433            builder = builder.add_unbound_field(field)?;
434        }
435        Ok(builder)
436    }
437
438    /// Set the last assigned field id for the partition spec.
439    ///
440    /// Set this field when a new partition spec is created for an existing TableMetaData.
441    /// As `field_id` must be unique in V2 metadata, this should be set to
442    /// the highest field id used previously.
443    pub fn with_last_assigned_field_id(mut self, last_assigned_field_id: i32) -> Self {
444        self.last_assigned_field_id = last_assigned_field_id;
445        self
446    }
447
448    /// Set the spec id for the partition spec.
449    pub fn with_spec_id(mut self, spec_id: i32) -> Self {
450        self.spec_id = Some(spec_id);
451        self
452    }
453
454    /// Add a new partition field to the partition spec.
455    pub fn add_partition_field(
456        self,
457        source_name: impl AsRef<str>,
458        target_name: impl Into<String>,
459        transform: Transform,
460    ) -> Result<Self> {
461        let source_id = self
462            .schema
463            .field_by_name(source_name.as_ref())
464            .ok_or_else(|| {
465                Error::new(
466                    ErrorKind::DataInvalid,
467                    format!(
468                        "Cannot find source column with name: {} in schema",
469                        source_name.as_ref()
470                    ),
471                )
472            })?
473            .id;
474        let field = UnboundPartitionField {
475            source_id,
476            field_id: None,
477            name: target_name.into(),
478            transform,
479        };
480
481        self.add_unbound_field(field)
482    }
483
484    /// Add a new partition field to the partition spec.
485    ///
486    /// If partition field id is set, it is used as the field id.
487    /// Otherwise, a new `field_id` is assigned.
488    pub fn add_unbound_field(mut self, field: UnboundPartitionField) -> Result<Self> {
489        self.check_name_set_and_unique(&field.name)?;
490        self.check_for_redundant_partitions(field.source_id, &field.transform)?;
491        Self::check_name_does_not_collide_with_schema(&field, &self.schema)?;
492        Self::check_transform_compatibility(&field, &self.schema)?;
493        if let Some(partition_field_id) = field.field_id {
494            self.check_partition_id_unique(partition_field_id)?;
495        }
496
497        // Non-fallible from here
498        self.fields.push(field);
499        Ok(self)
500    }
501
502    /// Wrapper around `with_unbound_fields` to add multiple partition fields.
503    pub fn add_unbound_fields(
504        self,
505        fields: impl IntoIterator<Item = UnboundPartitionField>,
506    ) -> Result<Self> {
507        let mut builder = self;
508        for field in fields {
509            builder = builder.add_unbound_field(field)?;
510        }
511        Ok(builder)
512    }
513
514    /// Build a bound partition spec with the given schema.
515    pub fn build(self) -> Result<PartitionSpec> {
516        let fields = Self::set_field_ids(self.fields, self.last_assigned_field_id)?;
517        Ok(PartitionSpec {
518            spec_id: self.spec_id.unwrap_or(DEFAULT_PARTITION_SPEC_ID),
519            fields,
520        })
521    }
522
523    fn set_field_ids(
524        fields: Vec<UnboundPartitionField>,
525        last_assigned_field_id: i32,
526    ) -> Result<Vec<PartitionField>> {
527        let mut last_assigned_field_id = last_assigned_field_id;
528        // Already assigned partition ids. If we see one of these during iteration,
529        // we skip it.
530        let assigned_ids = fields
531            .iter()
532            .filter_map(|f| f.field_id)
533            .collect::<std::collections::HashSet<_>>();
534
535        fn _check_add_1(prev: i32) -> Result<i32> {
536            prev.checked_add(1).ok_or_else(|| {
537                Error::new(
538                    ErrorKind::DataInvalid,
539                    "Cannot assign more partition ids. Overflow.",
540                )
541            })
542        }
543
544        let mut bound_fields = Vec::with_capacity(fields.len());
545        for field in fields.into_iter() {
546            let partition_field_id = if let Some(partition_field_id) = field.field_id {
547                last_assigned_field_id = std::cmp::max(last_assigned_field_id, partition_field_id);
548                partition_field_id
549            } else {
550                last_assigned_field_id = _check_add_1(last_assigned_field_id)?;
551                while assigned_ids.contains(&last_assigned_field_id) {
552                    last_assigned_field_id = _check_add_1(last_assigned_field_id)?;
553                }
554                last_assigned_field_id
555            };
556
557            bound_fields.push(PartitionField {
558                source_id: field.source_id,
559                field_id: partition_field_id,
560                name: field.name,
561                transform: field.transform,
562            })
563        }
564
565        Ok(bound_fields)
566    }
567
568    /// Returns the partition type of this partition spec.
569    fn partition_type(fields: &Vec<PartitionField>, schema: &Schema) -> Result<StructType> {
570        let mut struct_fields = Vec::with_capacity(fields.len());
571        for partition_field in fields {
572            let field = schema
573                .field_by_id(partition_field.source_id)
574                .ok_or_else(|| {
575                    Error::new(
576                        // This should never occur as check_transform_compatibility
577                        // already ensures that the source field exists in the schema
578                        ErrorKind::Unexpected,
579                        format!(
580                            "No column with source column id {} in schema {:?}",
581                            partition_field.source_id, schema
582                        ),
583                    )
584                })?;
585            let res_type = partition_field.transform.result_type(&field.field_type)?;
586            let field =
587                NestedField::optional(partition_field.field_id, &partition_field.name, res_type)
588                    .into();
589            struct_fields.push(field);
590        }
591        Ok(StructType::new(struct_fields))
592    }
593
594    /// Ensure that the partition name is unique among columns in the schema.
595    /// Duplicate names are allowed if:
596    /// 1. The column is sourced from the column with the same name.
597    /// 2. AND the transformation is identity
598    fn check_name_does_not_collide_with_schema(
599        field: &UnboundPartitionField,
600        schema: &Schema,
601    ) -> Result<()> {
602        match schema.field_by_name(field.name.as_str()) {
603            Some(schema_collision) => {
604                if field.transform == Transform::Identity {
605                    if schema_collision.id == field.source_id {
606                        Ok(())
607                    } else {
608                        Err(Error::new(
609                            ErrorKind::DataInvalid,
610                            format!(
611                                "Cannot create identity partition sourced from different field in schema. Field name '{}' has id `{}` in schema but partition source id is `{}`",
612                                field.name, schema_collision.id, field.source_id
613                            ),
614                        ))
615                    }
616                } else {
617                    Err(Error::new(
618                        ErrorKind::DataInvalid,
619                        format!(
620                            "Cannot create partition with name: '{}' that conflicts with schema field and is not an identity transform.",
621                            field.name
622                        ),
623                    ))
624                }
625            }
626            None => Ok(()),
627        }
628    }
629
630    /// Ensure that the transformation of the field is compatible with type of the field
631    /// in the schema. Implicitly also checks if the source field exists in the schema.
632    fn check_transform_compatibility(field: &UnboundPartitionField, schema: &Schema) -> Result<()> {
633        let schema_field = schema.field_by_id(field.source_id).ok_or_else(|| {
634            Error::new(
635                ErrorKind::DataInvalid,
636                format!(
637                    "Cannot find partition source field with id `{}` in schema",
638                    field.source_id
639                ),
640            )
641        })?;
642
643        if field.transform != Transform::Void {
644            if !schema_field.field_type.is_primitive() {
645                return Err(Error::new(
646                    ErrorKind::DataInvalid,
647                    format!(
648                        "Cannot partition by non-primitive source field: '{}'.",
649                        schema_field.field_type
650                    ),
651                ));
652            }
653
654            if field
655                .transform
656                .result_type(&schema_field.field_type)
657                .is_err()
658            {
659                return Err(Error::new(
660                    ErrorKind::DataInvalid,
661                    format!(
662                        "Invalid source type: '{}' for transform: '{}'.",
663                        schema_field.field_type,
664                        field.transform.dedup_name()
665                    ),
666                ));
667            }
668        }
669
670        Ok(())
671    }
672}
673
674/// Contains checks that are common to both PartitionSpecBuilder and UnboundPartitionSpecBuilder
675trait CorePartitionSpecValidator {
676    /// Ensure that the partition name is unique among the partition fields and is not empty.
677    fn check_name_set_and_unique(&self, name: &str) -> Result<()> {
678        if name.is_empty() {
679            return Err(Error::new(
680                ErrorKind::DataInvalid,
681                "Cannot use empty partition name",
682            ));
683        }
684
685        if self.fields().iter().any(|f| f.name == name) {
686            return Err(Error::new(
687                ErrorKind::DataInvalid,
688                format!("Cannot use partition name more than once: {name}"),
689            ));
690        }
691        Ok(())
692    }
693
694    /// For a single source-column transformations must be unique.
695    fn check_for_redundant_partitions(&self, source_id: i32, transform: &Transform) -> Result<()> {
696        let collision = self.fields().iter().find(|f| {
697            f.source_id == source_id && f.transform.dedup_name() == transform.dedup_name()
698        });
699
700        if let Some(collision) = collision {
701            Err(Error::new(
702                ErrorKind::DataInvalid,
703                format!(
704                    "Cannot add redundant partition with source id `{}` and transform `{}`. A partition with the same source id and transform already exists with name `{}`",
705                    source_id,
706                    transform.dedup_name(),
707                    collision.name
708                ),
709            ))
710        } else {
711            Ok(())
712        }
713    }
714
715    /// Check field / partition_id unique within the partition spec if set
716    fn check_partition_id_unique(&self, field_id: i32) -> Result<()> {
717        if self.fields().iter().any(|f| f.field_id == Some(field_id)) {
718            return Err(Error::new(
719                ErrorKind::DataInvalid,
720                format!("Cannot use field id more than once in one PartitionSpec: {field_id}"),
721            ));
722        }
723
724        Ok(())
725    }
726
727    fn fields(&self) -> &Vec<UnboundPartitionField>;
728}
729
730impl CorePartitionSpecValidator for PartitionSpecBuilder {
731    fn fields(&self) -> &Vec<UnboundPartitionField> {
732        &self.fields
733    }
734}
735
736impl CorePartitionSpecValidator for UnboundPartitionSpecBuilder {
737    fn fields(&self) -> &Vec<UnboundPartitionField> {
738        &self.fields
739    }
740}
741
742#[cfg(test)]
743mod tests {
744    use super::*;
745    use crate::spec::{Literal, PrimitiveType, Type};
746
747    #[test]
748    fn test_partition_spec() {
749        let spec = r#"
750        {
751        "spec-id": 1,
752        "fields": [ {
753            "source-id": 4,
754            "field-id": 1000,
755            "name": "ts_day",
756            "transform": "day"
757            }, {
758            "source-id": 1,
759            "field-id": 1001,
760            "name": "id_bucket",
761            "transform": "bucket[16]"
762            }, {
763            "source-id": 2,
764            "field-id": 1002,
765            "name": "id_truncate",
766            "transform": "truncate[4]"
767            } ]
768        }
769        "#;
770
771        let partition_spec: PartitionSpec = serde_json::from_str(spec).unwrap();
772        assert_eq!(4, partition_spec.fields[0].source_id);
773        assert_eq!(1000, partition_spec.fields[0].field_id);
774        assert_eq!("ts_day", partition_spec.fields[0].name);
775        assert_eq!(Transform::Day, partition_spec.fields[0].transform);
776
777        assert_eq!(1, partition_spec.fields[1].source_id);
778        assert_eq!(1001, partition_spec.fields[1].field_id);
779        assert_eq!("id_bucket", partition_spec.fields[1].name);
780        assert_eq!(Transform::Bucket(16), partition_spec.fields[1].transform);
781
782        assert_eq!(2, partition_spec.fields[2].source_id);
783        assert_eq!(1002, partition_spec.fields[2].field_id);
784        assert_eq!("id_truncate", partition_spec.fields[2].name);
785        assert_eq!(Transform::Truncate(4), partition_spec.fields[2].transform);
786    }
787
788    #[test]
789    fn test_is_unpartitioned() {
790        let schema = Schema::builder()
791            .with_fields(vec![
792                NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int))
793                    .into(),
794                NestedField::required(
795                    2,
796                    "name",
797                    Type::Primitive(crate::spec::PrimitiveType::String),
798                )
799                .into(),
800            ])
801            .build()
802            .unwrap();
803        let partition_spec = PartitionSpec::builder(schema.clone())
804            .with_spec_id(1)
805            .build()
806            .unwrap();
807        assert!(
808            partition_spec.is_unpartitioned(),
809            "Empty partition spec should be unpartitioned"
810        );
811
812        let partition_spec = PartitionSpec::builder(schema.clone())
813            .add_unbound_fields(vec![
814                UnboundPartitionField::builder()
815                    .source_id(1)
816                    .name("id".to_string())
817                    .transform(Transform::Identity)
818                    .build(),
819                UnboundPartitionField::builder()
820                    .source_id(2)
821                    .name("name_string".to_string())
822                    .transform(Transform::Void)
823                    .build(),
824            ])
825            .unwrap()
826            .with_spec_id(1)
827            .build()
828            .unwrap();
829        assert!(
830            !partition_spec.is_unpartitioned(),
831            "Partition spec with one non void transform should not be unpartitioned"
832        );
833
834        let partition_spec = PartitionSpec::builder(schema.clone())
835            .with_spec_id(1)
836            .add_unbound_fields(vec![
837                UnboundPartitionField::builder()
838                    .source_id(1)
839                    .name("id_void".to_string())
840                    .transform(Transform::Void)
841                    .build(),
842                UnboundPartitionField::builder()
843                    .source_id(2)
844                    .name("name_void".to_string())
845                    .transform(Transform::Void)
846                    .build(),
847            ])
848            .unwrap()
849            .build()
850            .unwrap();
851        assert!(
852            partition_spec.is_unpartitioned(),
853            "Partition spec with all void field should be unpartitioned"
854        );
855    }
856
857    #[test]
858    fn test_unbound_partition_spec() {
859        let spec = r#"
860		{
861		"spec-id": 1,
862		"fields": [ {
863			"source-id": 4,
864			"field-id": 1000,
865			"name": "ts_day",
866			"transform": "day"
867			}, {
868			"source-id": 1,
869			"field-id": 1001,
870			"name": "id_bucket",
871			"transform": "bucket[16]"
872			}, {
873			"source-id": 2,
874			"field-id": 1002,
875			"name": "id_truncate",
876			"transform": "truncate[4]"
877			} ]
878		}
879		"#;
880
881        let partition_spec: UnboundPartitionSpec = serde_json::from_str(spec).unwrap();
882        assert_eq!(Some(1), partition_spec.spec_id);
883
884        assert_eq!(4, partition_spec.fields[0].source_id);
885        assert_eq!(Some(1000), partition_spec.fields[0].field_id);
886        assert_eq!("ts_day", partition_spec.fields[0].name);
887        assert_eq!(Transform::Day, partition_spec.fields[0].transform);
888
889        assert_eq!(1, partition_spec.fields[1].source_id);
890        assert_eq!(Some(1001), partition_spec.fields[1].field_id);
891        assert_eq!("id_bucket", partition_spec.fields[1].name);
892        assert_eq!(Transform::Bucket(16), partition_spec.fields[1].transform);
893
894        assert_eq!(2, partition_spec.fields[2].source_id);
895        assert_eq!(Some(1002), partition_spec.fields[2].field_id);
896        assert_eq!("id_truncate", partition_spec.fields[2].name);
897        assert_eq!(Transform::Truncate(4), partition_spec.fields[2].transform);
898
899        let spec = r#"
900		{
901		"fields": [ {
902			"source-id": 4,
903			"name": "ts_day",
904			"transform": "day"
905			} ]
906		}
907		"#;
908        let partition_spec: UnboundPartitionSpec = serde_json::from_str(spec).unwrap();
909        assert_eq!(None, partition_spec.spec_id);
910
911        assert_eq!(4, partition_spec.fields[0].source_id);
912        assert_eq!(None, partition_spec.fields[0].field_id);
913        assert_eq!("ts_day", partition_spec.fields[0].name);
914        assert_eq!(Transform::Day, partition_spec.fields[0].transform);
915    }
916
917    #[test]
918    fn test_unbound_partition_spec_serialization_skips_none_fields() {
919        let spec = UnboundPartitionSpec::builder()
920            .add_partition_field(4, "ts_day".to_string(), Transform::Day)
921            .unwrap()
922            .build();
923
924        let value = serde_json::to_value(&spec).unwrap();
925        let object = value.as_object().unwrap();
926        assert!(!object.contains_key("spec-id"));
927        let field = object["fields"][0].as_object().unwrap();
928        assert!(!field.contains_key("field-id"));
929
930        let value = serde_json::to_value(spec.with_spec_id(1)).unwrap();
931        let object = value.as_object().unwrap();
932        assert_eq!(Some(&serde_json::json!(1)), object.get("spec-id"));
933
934        // Explicit nulls must still deserialize to `None` for backwards
935        // compatibility.
936        let spec: UnboundPartitionSpec = serde_json::from_str(
937            r#"{
938                "spec-id": null,
939                "fields": [
940                    {"source-id": 4, "name": "ts_day", "transform": "day", "field-id": null}
941                ]
942            }"#,
943        )
944        .unwrap();
945        assert_eq!(None, spec.spec_id);
946        assert_eq!(None, spec.fields[0].field_id);
947    }
948
949    #[test]
950    fn test_new_unpartition() {
951        let schema = Schema::builder()
952            .with_fields(vec![
953                NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int))
954                    .into(),
955                NestedField::required(
956                    2,
957                    "name",
958                    Type::Primitive(crate::spec::PrimitiveType::String),
959                )
960                .into(),
961            ])
962            .build()
963            .unwrap();
964        let partition_spec = PartitionSpec::builder(schema.clone())
965            .with_spec_id(0)
966            .build()
967            .unwrap();
968        let partition_type = partition_spec.partition_type(&schema).unwrap();
969        assert_eq!(0, partition_type.fields().len());
970
971        let unpartition_spec = PartitionSpec::unpartition_spec();
972        assert_eq!(partition_spec, unpartition_spec);
973    }
974
975    #[test]
976    fn test_partition_type() {
977        let spec = r#"
978            {
979            "spec-id": 1,
980            "fields": [ {
981                "source-id": 4,
982                "field-id": 1000,
983                "name": "ts_day",
984                "transform": "day"
985                }, {
986                "source-id": 1,
987                "field-id": 1001,
988                "name": "id_bucket",
989                "transform": "bucket[16]"
990                }, {
991                "source-id": 2,
992                "field-id": 1002,
993                "name": "id_truncate",
994                "transform": "truncate[4]"
995                } ]
996            }
997            "#;
998
999        let partition_spec: PartitionSpec = serde_json::from_str(spec).unwrap();
1000        let schema = Schema::builder()
1001            .with_fields(vec![
1002                NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int))
1003                    .into(),
1004                NestedField::required(
1005                    2,
1006                    "name",
1007                    Type::Primitive(crate::spec::PrimitiveType::String),
1008                )
1009                .into(),
1010                NestedField::required(
1011                    3,
1012                    "ts",
1013                    Type::Primitive(crate::spec::PrimitiveType::Timestamp),
1014                )
1015                .into(),
1016                NestedField::required(
1017                    4,
1018                    "ts_day",
1019                    Type::Primitive(crate::spec::PrimitiveType::Timestamp),
1020                )
1021                .into(),
1022                NestedField::required(
1023                    5,
1024                    "id_bucket",
1025                    Type::Primitive(crate::spec::PrimitiveType::Int),
1026                )
1027                .into(),
1028                NestedField::required(
1029                    6,
1030                    "id_truncate",
1031                    Type::Primitive(crate::spec::PrimitiveType::Int),
1032                )
1033                .into(),
1034            ])
1035            .build()
1036            .unwrap();
1037
1038        let partition_type = partition_spec.partition_type(&schema).unwrap();
1039        assert_eq!(3, partition_type.fields().len());
1040        assert_eq!(
1041            *partition_type.fields()[0],
1042            NestedField::optional(
1043                partition_spec.fields[0].field_id,
1044                &partition_spec.fields[0].name,
1045                Type::Primitive(crate::spec::PrimitiveType::Date)
1046            )
1047        );
1048        assert_eq!(
1049            *partition_type.fields()[1],
1050            NestedField::optional(
1051                partition_spec.fields[1].field_id,
1052                &partition_spec.fields[1].name,
1053                Type::Primitive(crate::spec::PrimitiveType::Int)
1054            )
1055        );
1056        assert_eq!(
1057            *partition_type.fields()[2],
1058            NestedField::optional(
1059                partition_spec.fields[2].field_id,
1060                &partition_spec.fields[2].name,
1061                Type::Primitive(crate::spec::PrimitiveType::String)
1062            )
1063        );
1064    }
1065
1066    #[test]
1067    fn test_partition_empty() {
1068        let spec = r#"
1069            {
1070            "spec-id": 1,
1071            "fields": []
1072            }
1073            "#;
1074
1075        let partition_spec: PartitionSpec = serde_json::from_str(spec).unwrap();
1076        let schema = Schema::builder()
1077            .with_fields(vec![
1078                NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int))
1079                    .into(),
1080                NestedField::required(
1081                    2,
1082                    "name",
1083                    Type::Primitive(crate::spec::PrimitiveType::String),
1084                )
1085                .into(),
1086                NestedField::required(
1087                    3,
1088                    "ts",
1089                    Type::Primitive(crate::spec::PrimitiveType::Timestamp),
1090                )
1091                .into(),
1092                NestedField::required(
1093                    4,
1094                    "ts_day",
1095                    Type::Primitive(crate::spec::PrimitiveType::Timestamp),
1096                )
1097                .into(),
1098                NestedField::required(
1099                    5,
1100                    "id_bucket",
1101                    Type::Primitive(crate::spec::PrimitiveType::Int),
1102                )
1103                .into(),
1104                NestedField::required(
1105                    6,
1106                    "id_truncate",
1107                    Type::Primitive(crate::spec::PrimitiveType::Int),
1108                )
1109                .into(),
1110            ])
1111            .build()
1112            .unwrap();
1113
1114        let partition_type = partition_spec.partition_type(&schema).unwrap();
1115        assert_eq!(0, partition_type.fields().len());
1116    }
1117
1118    #[test]
1119    fn test_partition_error() {
1120        let spec = r#"
1121        {
1122        "spec-id": 1,
1123        "fields": [ {
1124            "source-id": 4,
1125            "field-id": 1000,
1126            "name": "ts_day",
1127            "transform": "day"
1128            }, {
1129            "source-id": 1,
1130            "field-id": 1001,
1131            "name": "id_bucket",
1132            "transform": "bucket[16]"
1133            }, {
1134            "source-id": 2,
1135            "field-id": 1002,
1136            "name": "id_truncate",
1137            "transform": "truncate[4]"
1138            } ]
1139        }
1140        "#;
1141
1142        let partition_spec: PartitionSpec = serde_json::from_str(spec).unwrap();
1143        let schema = Schema::builder()
1144            .with_fields(vec![
1145                NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int))
1146                    .into(),
1147                NestedField::required(
1148                    2,
1149                    "name",
1150                    Type::Primitive(crate::spec::PrimitiveType::String),
1151                )
1152                .into(),
1153            ])
1154            .build()
1155            .unwrap();
1156
1157        assert!(partition_spec.partition_type(&schema).is_err());
1158    }
1159
1160    #[test]
1161    fn test_builder_disallow_duplicate_names() {
1162        UnboundPartitionSpec::builder()
1163            .add_partition_field(1, "ts_day".to_string(), Transform::Day)
1164            .unwrap()
1165            .add_partition_field(2, "ts_day".to_string(), Transform::Day)
1166            .unwrap_err();
1167    }
1168
1169    #[test]
1170    fn test_builder_disallow_duplicate_field_ids() {
1171        let schema = Schema::builder()
1172            .with_fields(vec![
1173                NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int))
1174                    .into(),
1175                NestedField::required(
1176                    2,
1177                    "name",
1178                    Type::Primitive(crate::spec::PrimitiveType::String),
1179                )
1180                .into(),
1181            ])
1182            .build()
1183            .unwrap();
1184        PartitionSpec::builder(schema.clone())
1185            .add_unbound_field(UnboundPartitionField {
1186                source_id: 1,
1187                field_id: Some(1000),
1188                name: "id".to_string(),
1189                transform: Transform::Identity,
1190            })
1191            .unwrap()
1192            .add_unbound_field(UnboundPartitionField {
1193                source_id: 2,
1194                field_id: Some(1000),
1195                name: "id_bucket".to_string(),
1196                transform: Transform::Bucket(16),
1197            })
1198            .unwrap_err();
1199    }
1200
1201    #[test]
1202    fn test_builder_auto_assign_field_ids() {
1203        let schema = Schema::builder()
1204            .with_fields(vec![
1205                NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int))
1206                    .into(),
1207                NestedField::required(
1208                    2,
1209                    "name",
1210                    Type::Primitive(crate::spec::PrimitiveType::String),
1211                )
1212                .into(),
1213                NestedField::required(
1214                    3,
1215                    "ts",
1216                    Type::Primitive(crate::spec::PrimitiveType::Timestamp),
1217                )
1218                .into(),
1219            ])
1220            .build()
1221            .unwrap();
1222        let spec = PartitionSpec::builder(schema.clone())
1223            .with_spec_id(1)
1224            .add_unbound_field(UnboundPartitionField {
1225                source_id: 1,
1226                name: "id".to_string(),
1227                transform: Transform::Identity,
1228                field_id: Some(1012),
1229            })
1230            .unwrap()
1231            .add_unbound_field(UnboundPartitionField {
1232                source_id: 2,
1233                name: "name_void".to_string(),
1234                transform: Transform::Void,
1235                field_id: None,
1236            })
1237            .unwrap()
1238            // Should keep its ID even if its lower
1239            .add_unbound_field(UnboundPartitionField {
1240                source_id: 3,
1241                name: "year".to_string(),
1242                transform: Transform::Year,
1243                field_id: Some(1),
1244            })
1245            .unwrap()
1246            .build()
1247            .unwrap();
1248
1249        assert_eq!(1012, spec.fields[0].field_id);
1250        assert_eq!(1013, spec.fields[1].field_id);
1251        assert_eq!(1, spec.fields[2].field_id);
1252    }
1253
1254    #[test]
1255    fn test_builder_valid_schema() {
1256        let schema = Schema::builder()
1257            .with_fields(vec![
1258                NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int))
1259                    .into(),
1260                NestedField::required(
1261                    2,
1262                    "name",
1263                    Type::Primitive(crate::spec::PrimitiveType::String),
1264                )
1265                .into(),
1266            ])
1267            .build()
1268            .unwrap();
1269
1270        PartitionSpec::builder(schema.clone())
1271            .with_spec_id(1)
1272            .build()
1273            .unwrap();
1274
1275        let spec = PartitionSpec::builder(schema.clone())
1276            .with_spec_id(1)
1277            .add_partition_field("id", "id_bucket[16]", Transform::Bucket(16))
1278            .unwrap()
1279            .build()
1280            .unwrap();
1281
1282        assert_eq!(spec, PartitionSpec {
1283            spec_id: 1,
1284            fields: vec![PartitionField {
1285                source_id: 1,
1286                field_id: 1000,
1287                name: "id_bucket[16]".to_string(),
1288                transform: Transform::Bucket(16),
1289            }],
1290        });
1291        assert_eq!(
1292            spec.partition_type(&schema).unwrap(),
1293            StructType::new(vec![
1294                NestedField::optional(1000, "id_bucket[16]", Type::Primitive(PrimitiveType::Int))
1295                    .into()
1296            ])
1297        )
1298    }
1299
1300    #[test]
1301    fn test_collision_with_schema_name() {
1302        let schema = Schema::builder()
1303            .with_fields(vec![
1304                NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int))
1305                    .into(),
1306            ])
1307            .build()
1308            .unwrap();
1309
1310        PartitionSpec::builder(schema.clone())
1311            .with_spec_id(1)
1312            .build()
1313            .unwrap();
1314
1315        let err = PartitionSpec::builder(schema)
1316            .with_spec_id(1)
1317            .add_unbound_field(UnboundPartitionField {
1318                source_id: 1,
1319                field_id: None,
1320                name: "id".to_string(),
1321                transform: Transform::Bucket(16),
1322            })
1323            .unwrap_err();
1324        assert!(err.message().contains("conflicts with schema"))
1325    }
1326
1327    #[test]
1328    fn test_builder_collision_is_ok_for_identity_transforms() {
1329        let schema = Schema::builder()
1330            .with_fields(vec![
1331                NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int))
1332                    .into(),
1333                NestedField::required(
1334                    2,
1335                    "number",
1336                    Type::Primitive(crate::spec::PrimitiveType::Int),
1337                )
1338                .into(),
1339            ])
1340            .build()
1341            .unwrap();
1342
1343        PartitionSpec::builder(schema.clone())
1344            .with_spec_id(1)
1345            .build()
1346            .unwrap();
1347
1348        PartitionSpec::builder(schema.clone())
1349            .with_spec_id(1)
1350            .add_unbound_field(UnboundPartitionField {
1351                source_id: 1,
1352                field_id: None,
1353                name: "id".to_string(),
1354                transform: Transform::Identity,
1355            })
1356            .unwrap()
1357            .build()
1358            .unwrap();
1359
1360        // Not OK for different source id
1361        PartitionSpec::builder(schema)
1362            .with_spec_id(1)
1363            .add_unbound_field(UnboundPartitionField {
1364                source_id: 2,
1365                field_id: None,
1366                name: "id".to_string(),
1367                transform: Transform::Identity,
1368            })
1369            .unwrap_err();
1370    }
1371
1372    #[test]
1373    fn test_builder_all_source_ids_must_exist() {
1374        let schema = Schema::builder()
1375            .with_fields(vec![
1376                NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int))
1377                    .into(),
1378                NestedField::required(
1379                    2,
1380                    "name",
1381                    Type::Primitive(crate::spec::PrimitiveType::String),
1382                )
1383                .into(),
1384                NestedField::required(
1385                    3,
1386                    "ts",
1387                    Type::Primitive(crate::spec::PrimitiveType::Timestamp),
1388                )
1389                .into(),
1390            ])
1391            .build()
1392            .unwrap();
1393
1394        // Valid
1395        PartitionSpec::builder(schema.clone())
1396            .with_spec_id(1)
1397            .add_unbound_fields(vec![
1398                UnboundPartitionField {
1399                    source_id: 1,
1400                    field_id: None,
1401                    name: "id_bucket".to_string(),
1402                    transform: Transform::Bucket(16),
1403                },
1404                UnboundPartitionField {
1405                    source_id: 2,
1406                    field_id: None,
1407                    name: "name".to_string(),
1408                    transform: Transform::Identity,
1409                },
1410            ])
1411            .unwrap()
1412            .build()
1413            .unwrap();
1414
1415        // Invalid
1416        PartitionSpec::builder(schema)
1417            .with_spec_id(1)
1418            .add_unbound_fields(vec![
1419                UnboundPartitionField {
1420                    source_id: 1,
1421                    field_id: None,
1422                    name: "id_bucket".to_string(),
1423                    transform: Transform::Bucket(16),
1424                },
1425                UnboundPartitionField {
1426                    source_id: 4,
1427                    field_id: None,
1428                    name: "name".to_string(),
1429                    transform: Transform::Identity,
1430                },
1431            ])
1432            .unwrap_err();
1433    }
1434
1435    #[test]
1436    fn test_builder_disallows_redundant() {
1437        let err = UnboundPartitionSpec::builder()
1438            .with_spec_id(1)
1439            .add_partition_field(1, "id_bucket[16]".to_string(), Transform::Bucket(16))
1440            .unwrap()
1441            .add_partition_field(
1442                1,
1443                "id_bucket_with_other_name".to_string(),
1444                Transform::Bucket(16),
1445            )
1446            .unwrap_err();
1447        assert!(err.message().contains("redundant partition"));
1448    }
1449
1450    #[test]
1451    fn test_builder_incompatible_transforms_disallowed() {
1452        let schema = Schema::builder()
1453            .with_fields(vec![
1454                NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int))
1455                    .into(),
1456            ])
1457            .build()
1458            .unwrap();
1459
1460        PartitionSpec::builder(schema)
1461            .with_spec_id(1)
1462            .add_unbound_field(UnboundPartitionField {
1463                source_id: 1,
1464                field_id: None,
1465                name: "id_year".to_string(),
1466                transform: Transform::Year,
1467            })
1468            .unwrap_err();
1469    }
1470
1471    #[test]
1472    fn test_build_unbound_specs_without_partition_id() {
1473        let spec = UnboundPartitionSpec::builder()
1474            .with_spec_id(1)
1475            .add_partition_fields(vec![UnboundPartitionField {
1476                source_id: 1,
1477                field_id: None,
1478                name: "id_bucket[16]".to_string(),
1479                transform: Transform::Bucket(16),
1480            }])
1481            .unwrap()
1482            .build();
1483
1484        assert_eq!(spec, UnboundPartitionSpec {
1485            spec_id: Some(1),
1486            fields: vec![UnboundPartitionField {
1487                source_id: 1,
1488                field_id: None,
1489                name: "id_bucket[16]".to_string(),
1490                transform: Transform::Bucket(16),
1491            }]
1492        });
1493    }
1494
1495    #[test]
1496    fn test_is_compatible_with() {
1497        let schema = Schema::builder()
1498            .with_fields(vec![
1499                NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int))
1500                    .into(),
1501                NestedField::required(
1502                    2,
1503                    "name",
1504                    Type::Primitive(crate::spec::PrimitiveType::String),
1505                )
1506                .into(),
1507            ])
1508            .build()
1509            .unwrap();
1510
1511        let partition_spec_1 = PartitionSpec::builder(schema.clone())
1512            .with_spec_id(1)
1513            .add_unbound_field(UnboundPartitionField {
1514                source_id: 1,
1515                field_id: None,
1516                name: "id_bucket".to_string(),
1517                transform: Transform::Bucket(16),
1518            })
1519            .unwrap()
1520            .build()
1521            .unwrap();
1522
1523        let partition_spec_2 = PartitionSpec::builder(schema)
1524            .with_spec_id(1)
1525            .add_unbound_field(UnboundPartitionField {
1526                source_id: 1,
1527                field_id: None,
1528                name: "id_bucket".to_string(),
1529                transform: Transform::Bucket(16),
1530            })
1531            .unwrap()
1532            .build()
1533            .unwrap();
1534
1535        assert!(partition_spec_1.is_compatible_with(&partition_spec_2));
1536    }
1537
1538    #[test]
1539    fn test_not_compatible_with_transform_different() {
1540        let schema = Schema::builder()
1541            .with_fields(vec![
1542                NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int))
1543                    .into(),
1544            ])
1545            .build()
1546            .unwrap();
1547
1548        let partition_spec_1 = PartitionSpec::builder(schema.clone())
1549            .with_spec_id(1)
1550            .add_unbound_field(UnboundPartitionField {
1551                source_id: 1,
1552                field_id: None,
1553                name: "id_bucket".to_string(),
1554                transform: Transform::Bucket(16),
1555            })
1556            .unwrap()
1557            .build()
1558            .unwrap();
1559
1560        let partition_spec_2 = PartitionSpec::builder(schema)
1561            .with_spec_id(1)
1562            .add_unbound_field(UnboundPartitionField {
1563                source_id: 1,
1564                field_id: None,
1565                name: "id_bucket".to_string(),
1566                transform: Transform::Bucket(32),
1567            })
1568            .unwrap()
1569            .build()
1570            .unwrap();
1571
1572        assert!(!partition_spec_1.is_compatible_with(&partition_spec_2));
1573    }
1574
1575    #[test]
1576    fn test_not_compatible_with_source_id_different() {
1577        let schema = Schema::builder()
1578            .with_fields(vec![
1579                NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int))
1580                    .into(),
1581                NestedField::required(
1582                    2,
1583                    "name",
1584                    Type::Primitive(crate::spec::PrimitiveType::String),
1585                )
1586                .into(),
1587            ])
1588            .build()
1589            .unwrap();
1590
1591        let partition_spec_1 = PartitionSpec::builder(schema.clone())
1592            .with_spec_id(1)
1593            .add_unbound_field(UnboundPartitionField {
1594                source_id: 1,
1595                field_id: None,
1596                name: "id_bucket".to_string(),
1597                transform: Transform::Bucket(16),
1598            })
1599            .unwrap()
1600            .build()
1601            .unwrap();
1602
1603        let partition_spec_2 = PartitionSpec::builder(schema)
1604            .with_spec_id(1)
1605            .add_unbound_field(UnboundPartitionField {
1606                source_id: 2,
1607                field_id: None,
1608                name: "id_bucket".to_string(),
1609                transform: Transform::Bucket(16),
1610            })
1611            .unwrap()
1612            .build()
1613            .unwrap();
1614
1615        assert!(!partition_spec_1.is_compatible_with(&partition_spec_2));
1616    }
1617
1618    #[test]
1619    fn test_not_compatible_with_order_different() {
1620        let schema = Schema::builder()
1621            .with_fields(vec![
1622                NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int))
1623                    .into(),
1624                NestedField::required(
1625                    2,
1626                    "name",
1627                    Type::Primitive(crate::spec::PrimitiveType::String),
1628                )
1629                .into(),
1630            ])
1631            .build()
1632            .unwrap();
1633
1634        let partition_spec_1 = PartitionSpec::builder(schema.clone())
1635            .with_spec_id(1)
1636            .add_unbound_field(UnboundPartitionField {
1637                source_id: 1,
1638                field_id: None,
1639                name: "id_bucket".to_string(),
1640                transform: Transform::Bucket(16),
1641            })
1642            .unwrap()
1643            .add_unbound_field(UnboundPartitionField {
1644                source_id: 2,
1645                field_id: None,
1646                name: "name".to_string(),
1647                transform: Transform::Identity,
1648            })
1649            .unwrap()
1650            .build()
1651            .unwrap();
1652
1653        let partition_spec_2 = PartitionSpec::builder(schema)
1654            .with_spec_id(1)
1655            .add_unbound_field(UnboundPartitionField {
1656                source_id: 2,
1657                field_id: None,
1658                name: "name".to_string(),
1659                transform: Transform::Identity,
1660            })
1661            .unwrap()
1662            .add_unbound_field(UnboundPartitionField {
1663                source_id: 1,
1664                field_id: None,
1665                name: "id_bucket".to_string(),
1666                transform: Transform::Bucket(16),
1667            })
1668            .unwrap()
1669            .build()
1670            .unwrap();
1671
1672        assert!(!partition_spec_1.is_compatible_with(&partition_spec_2));
1673    }
1674
1675    #[test]
1676    fn test_highest_field_id_unpartitioned() {
1677        let spec = PartitionSpec::builder(Schema::builder().with_fields(vec![]).build().unwrap())
1678            .with_spec_id(1)
1679            .build()
1680            .unwrap();
1681
1682        assert!(spec.highest_field_id().is_none());
1683    }
1684
1685    #[test]
1686    fn test_highest_field_id() {
1687        let schema = Schema::builder()
1688            .with_fields(vec![
1689                NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int))
1690                    .into(),
1691                NestedField::required(
1692                    2,
1693                    "name",
1694                    Type::Primitive(crate::spec::PrimitiveType::String),
1695                )
1696                .into(),
1697            ])
1698            .build()
1699            .unwrap();
1700
1701        let spec = PartitionSpec::builder(schema)
1702            .with_spec_id(1)
1703            .add_unbound_field(UnboundPartitionField {
1704                source_id: 1,
1705                field_id: Some(1001),
1706                name: "id".to_string(),
1707                transform: Transform::Identity,
1708            })
1709            .unwrap()
1710            .add_unbound_field(UnboundPartitionField {
1711                source_id: 2,
1712                field_id: Some(1000),
1713                name: "name".to_string(),
1714                transform: Transform::Identity,
1715            })
1716            .unwrap()
1717            .build()
1718            .unwrap();
1719
1720        assert_eq!(Some(1001), spec.highest_field_id());
1721    }
1722
1723    #[test]
1724    fn test_has_sequential_ids() {
1725        let schema = Schema::builder()
1726            .with_fields(vec![
1727                NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int))
1728                    .into(),
1729                NestedField::required(
1730                    2,
1731                    "name",
1732                    Type::Primitive(crate::spec::PrimitiveType::String),
1733                )
1734                .into(),
1735            ])
1736            .build()
1737            .unwrap();
1738
1739        let spec = PartitionSpec::builder(schema)
1740            .with_spec_id(1)
1741            .add_unbound_field(UnboundPartitionField {
1742                source_id: 1,
1743                field_id: Some(1000),
1744                name: "id".to_string(),
1745                transform: Transform::Identity,
1746            })
1747            .unwrap()
1748            .add_unbound_field(UnboundPartitionField {
1749                source_id: 2,
1750                field_id: Some(1001),
1751                name: "name".to_string(),
1752                transform: Transform::Identity,
1753            })
1754            .unwrap()
1755            .build()
1756            .unwrap();
1757
1758        assert_eq!(1000, spec.fields[0].field_id);
1759        assert_eq!(1001, spec.fields[1].field_id);
1760        assert!(spec.has_sequential_ids());
1761    }
1762
1763    #[test]
1764    fn test_sequential_ids_must_start_at_1000() {
1765        let schema = Schema::builder()
1766            .with_fields(vec![
1767                NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int))
1768                    .into(),
1769                NestedField::required(
1770                    2,
1771                    "name",
1772                    Type::Primitive(crate::spec::PrimitiveType::String),
1773                )
1774                .into(),
1775            ])
1776            .build()
1777            .unwrap();
1778
1779        let spec = PartitionSpec::builder(schema)
1780            .with_spec_id(1)
1781            .add_unbound_field(UnboundPartitionField {
1782                source_id: 1,
1783                field_id: Some(999),
1784                name: "id".to_string(),
1785                transform: Transform::Identity,
1786            })
1787            .unwrap()
1788            .add_unbound_field(UnboundPartitionField {
1789                source_id: 2,
1790                field_id: Some(1000),
1791                name: "name".to_string(),
1792                transform: Transform::Identity,
1793            })
1794            .unwrap()
1795            .build()
1796            .unwrap();
1797
1798        assert_eq!(999, spec.fields[0].field_id);
1799        assert_eq!(1000, spec.fields[1].field_id);
1800        assert!(!spec.has_sequential_ids());
1801    }
1802
1803    #[test]
1804    fn test_sequential_ids_must_have_no_gaps() {
1805        let schema = Schema::builder()
1806            .with_fields(vec![
1807                NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int))
1808                    .into(),
1809                NestedField::required(
1810                    2,
1811                    "name",
1812                    Type::Primitive(crate::spec::PrimitiveType::String),
1813                )
1814                .into(),
1815            ])
1816            .build()
1817            .unwrap();
1818
1819        let spec = PartitionSpec::builder(schema)
1820            .with_spec_id(1)
1821            .add_unbound_field(UnboundPartitionField {
1822                source_id: 1,
1823                field_id: Some(1000),
1824                name: "id".to_string(),
1825                transform: Transform::Identity,
1826            })
1827            .unwrap()
1828            .add_unbound_field(UnboundPartitionField {
1829                source_id: 2,
1830                field_id: Some(1002),
1831                name: "name".to_string(),
1832                transform: Transform::Identity,
1833            })
1834            .unwrap()
1835            .build()
1836            .unwrap();
1837
1838        assert_eq!(1000, spec.fields[0].field_id);
1839        assert_eq!(1002, spec.fields[1].field_id);
1840        assert!(!spec.has_sequential_ids());
1841    }
1842
1843    #[test]
1844    fn test_partition_to_path() {
1845        let schema = Schema::builder()
1846            .with_fields(vec![
1847                NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
1848                NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
1849                NestedField::required(3, "timestamp", Type::Primitive(PrimitiveType::Timestamp))
1850                    .into(),
1851                NestedField::required(4, "empty", Type::Primitive(PrimitiveType::String)).into(),
1852            ])
1853            .build()
1854            .unwrap();
1855
1856        let spec = PartitionSpec::builder(schema.clone())
1857            .add_partition_field("id", "id", Transform::Identity)
1858            .unwrap()
1859            .add_partition_field("name", "name", Transform::Identity)
1860            .unwrap()
1861            .add_partition_field("timestamp", "ts_hour", Transform::Hour)
1862            .unwrap()
1863            .add_partition_field("empty", "empty_void", Transform::Void)
1864            .unwrap()
1865            .build()
1866            .unwrap();
1867
1868        let data = Struct::from_iter([
1869            Some(Literal::int(42)),
1870            Some(Literal::string("alice")),
1871            Some(Literal::int(1000)),
1872            Some(Literal::string("empty")),
1873        ]);
1874
1875        assert_eq!(
1876            spec.partition_to_path(&data, schema.into()),
1877            "id=42/name=alice/ts_hour=1000/empty_void=null"
1878        );
1879    }
1880}