use std::sync::Arc;
use serde::{Deserialize, Serialize};
use typed_builder::TypedBuilder;
use super::transform::Transform;
use super::{NestedField, Schema, SchemaRef, StructType};
use crate::{Error, ErrorKind, Result};
/// Starting value of the builder's "last assigned" partition field id; the first
/// automatically assigned partition field id is this value plus one.
pub(crate) const UNPARTITIONED_LAST_ASSIGNED_ID: i32 = 999;
/// Spec id used whenever a partition spec is built without an explicit spec id.
pub(crate) const DEFAULT_PARTITION_SPEC_ID: i32 = 0;
/// Shared, reference-counted [`BoundPartitionSpec`].
pub type BoundPartitionSpecRef = Arc<BoundPartitionSpec>;
/// A partition field whose partition field id has been assigned.
///
/// Serialized and deserialized with kebab-case keys (`source-id`, `field-id`, ...).
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, TypedBuilder)]
#[serde(rename_all = "kebab-case")]
pub struct PartitionField {
/// Id of the schema field this partition field is derived from.
pub source_id: i32,
/// Id of this partition field; also used as the field id inside the partition struct type.
pub field_id: i32,
/// Name of the partition field.
pub name: String,
/// Transform applied to the source field's type to obtain the partition field's type.
pub transform: Transform,
}
impl PartitionField {
pub fn into_unbound(self) -> UnboundPartitionField {
self.into()
}
}
/// A partition spec together with the schema it belongs to and the partition struct type
/// derived from that schema.
///
/// Instances are created through [`PartitionSpecBuilder`] or
/// [`BoundPartitionSpec::unpartition_spec`].
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct BoundPartitionSpec {
// Identifier of this spec.
spec_id: i32,
// Partition fields, each carrying an assigned field id.
fields: Vec<PartitionField>,
// Schema the source ids of `fields` refer to.
schema: SchemaRef,
// Struct type with one optional field per partition field; computed when the spec is built.
partition_type: StructType,
}
/// Shared, reference-counted [`SchemalessPartitionSpec`].
pub type SchemalessPartitionSpecRef = Arc<SchemalessPartitionSpec>;
/// A partition spec whose fields have assigned ids but which holds no schema.
///
/// Unlike [`BoundPartitionSpec`] it can be (de)serialized directly; keys are kebab-case
/// (`spec-id`, `fields`). Use [`SchemalessPartitionSpec::bind`] to attach a schema.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
#[serde(rename_all = "kebab-case")]
pub struct SchemalessPartitionSpec {
// Identifier of this spec.
spec_id: i32,
// Partition fields, each carrying an assigned field id.
fields: Vec<PartitionField>,
}
impl BoundPartitionSpec {
pub fn builder(schema: impl Into<SchemaRef>) -> PartitionSpecBuilder {
PartitionSpecBuilder::new(schema)
}
pub fn unpartition_spec(schema: impl Into<SchemaRef>) -> Self {
Self {
spec_id: DEFAULT_PARTITION_SPEC_ID,
fields: vec![],
schema: schema.into(),
partition_type: StructType::new(vec![]),
}
}
pub fn spec_id(&self) -> i32 {
self.spec_id
}
pub fn fields(&self) -> &[PartitionField] {
&self.fields
}
pub fn schema(&self) -> &Schema {
&self.schema
}
pub fn schema_ref(&self) -> &SchemaRef {
&self.schema
}
pub fn is_unpartitioned(&self) -> bool {
self.fields.is_empty() || self.fields.iter().all(|f| f.transform == Transform::Void)
}
pub fn into_unbound(self) -> UnboundPartitionSpec {
self.into()
}
pub fn into_schemaless(self) -> SchemalessPartitionSpec {
self.into()
}
pub fn has_sequential_ids(&self) -> bool {
has_sequential_ids(self.fields.iter().map(|f| f.field_id))
}
pub fn highest_field_id(&self) -> Option<i32> {
self.fields.iter().map(|f| f.field_id).max()
}
pub fn partition_type(&self) -> &StructType {
&self.partition_type
}
pub fn is_compatible_with_schemaless(&self, other: &SchemalessPartitionSpec) -> bool {
if self.fields.len() != other.fields.len() {
return false;
}
for (this_field, other_field) in self.fields.iter().zip(other.fields.iter()) {
if this_field.source_id != other_field.source_id
|| this_field.name != other_field.name
|| this_field.transform != other_field.transform
{
return false;
}
}
true
}
}
impl SchemalessPartitionSpec {
pub fn fields(&self) -> &[PartitionField] {
&self.fields
}
pub fn spec_id(&self) -> i32 {
self.spec_id
}
pub fn bind(self, schema: impl Into<SchemaRef>) -> Result<BoundPartitionSpec> {
PartitionSpecBuilder::new_from_unbound(self.into_unbound(), schema)?.build()
}
pub fn unpartition_spec() -> Self {
Self {
spec_id: DEFAULT_PARTITION_SPEC_ID,
fields: vec![],
}
}
pub fn partition_type(&self, schema: &Schema) -> Result<StructType> {
PartitionSpecBuilder::partition_type(&self.fields, schema)
}
pub fn into_unbound(self) -> UnboundPartitionSpec {
self.into()
}
}
/// Shared, reference-counted [`UnboundPartitionSpec`].
pub type UnboundPartitionSpecRef = Arc<UnboundPartitionSpec>;
/// A partition field whose partition field id may not have been assigned yet.
///
/// Serialized and deserialized with kebab-case keys (`source-id`, `field-id`, ...).
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, TypedBuilder)]
#[serde(rename_all = "kebab-case")]
pub struct UnboundPartitionField {
/// Id of the schema field this partition field is derived from.
pub source_id: i32,
/// Partition field id; when `None`, `PartitionSpecBuilder::build` assigns one.
#[builder(default, setter(strip_option))]
pub field_id: Option<i32>,
/// Name of the partition field.
pub name: String,
/// Transform applied to the source field.
pub transform: Transform,
}
/// A partition spec that is not tied to a schema and whose spec id and field ids are optional.
///
/// Use [`UnboundPartitionSpec::bind`] to validate it against a schema and obtain a
/// [`BoundPartitionSpec`].
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, Default)]
#[serde(rename_all = "kebab-case")]
pub struct UnboundPartitionSpec {
// Optional spec id; binding falls back to `DEFAULT_PARTITION_SPEC_ID` when it is `None`.
pub(crate) spec_id: Option<i32>,
// Partition fields, with or without assigned field ids.
pub(crate) fields: Vec<UnboundPartitionField>,
}
impl UnboundPartitionSpec {
    /// Starts an empty [`UnboundPartitionSpecBuilder`].
    pub fn builder() -> UnboundPartitionSpecBuilder {
        UnboundPartitionSpecBuilder::default()
    }

    /// Binds the spec to `schema`, running all builder validations.
    ///
    /// # Errors
    /// Returns the error of the first field that fails validation, or of building the spec.
    pub fn bind(self, schema: impl Into<SchemaRef>) -> Result<BoundPartitionSpec> {
        let builder = PartitionSpecBuilder::new_from_unbound(self, schema)?;
        builder.build()
    }

    /// Returns the spec id, if one was set.
    pub fn spec_id(&self) -> Option<i32> {
        self.spec_id
    }

    /// Returns the partition fields in declaration order.
    pub fn fields(&self) -> &[UnboundPartitionField] {
        &self.fields
    }

    /// Returns the same spec with its spec id replaced by `spec_id`.
    pub fn with_spec_id(self, spec_id: i32) -> Self {
        let mut updated = self;
        updated.spec_id = Some(spec_id);
        updated
    }
}
/// Returns `true` when `field_ids` yields exactly `UNPARTITIONED_LAST_ASSIGNED_ID + 1`,
/// `UNPARTITIONED_LAST_ASSIGNED_ID + 2`, ... in iteration order.
///
/// An empty iterator is considered sequential. Iteration stops at the first mismatch.
fn has_sequential_ids(field_ids: impl Iterator<Item = i32>) -> bool {
    // Work in i64 so the expected id can never wrap, whatever the position.
    let first_expected = i64::from(UNPARTITIONED_LAST_ASSIGNED_ID).checked_add(1);
    field_ids.enumerate().all(|(position, actual)| {
        let expected = first_expected
            .and_then(|first| first.checked_add(position as i64))
            .unwrap_or(i64::MAX);
        i64::from(actual) == expected
    })
}
impl From<PartitionField> for UnboundPartitionField {
fn from(field: PartitionField) -> Self {
UnboundPartitionField {
source_id: field.source_id,
field_id: Some(field.field_id),
name: field.name,
transform: field.transform,
}
}
}
impl From<BoundPartitionSpec> for UnboundPartitionSpec {
fn from(spec: BoundPartitionSpec) -> Self {
UnboundPartitionSpec {
spec_id: Some(spec.spec_id),
fields: spec.fields.into_iter().map(Into::into).collect(),
}
}
}
impl From<SchemalessPartitionSpec> for UnboundPartitionSpec {
fn from(spec: SchemalessPartitionSpec) -> Self {
UnboundPartitionSpec {
spec_id: Some(spec.spec_id),
fields: spec.fields.into_iter().map(Into::into).collect(),
}
}
}
impl From<BoundPartitionSpec> for SchemalessPartitionSpec {
fn from(spec: BoundPartitionSpec) -> Self {
SchemalessPartitionSpec {
spec_id: spec.spec_id,
fields: spec.fields,
}
}
}
/// Builder for [`UnboundPartitionSpec`].
///
/// Fields are validated as they are added (non-empty unique name, no redundant
/// source/transform pair, unique explicit field id); no schema is involved.
#[derive(Debug, Default)]
pub struct UnboundPartitionSpecBuilder {
// Spec id to put on the built spec, if any.
spec_id: Option<i32>,
// Fields accepted so far, in insertion order.
fields: Vec<UnboundPartitionField>,
}
impl UnboundPartitionSpecBuilder {
    /// Creates a builder with no spec id and no fields.
    pub fn new() -> Self {
        Self::default()
    }

    /// Sets the spec id of the spec being built.
    pub fn with_spec_id(self, spec_id: i32) -> Self {
        Self {
            spec_id: Some(spec_id),
            ..self
        }
    }

    /// Adds a partition field without an explicit field id.
    ///
    /// # Errors
    /// Fails when the name is empty or already used, or when a field with the same
    /// source id and transform already exists.
    pub fn add_partition_field(
        self,
        source_id: i32,
        target_name: impl ToString,
        transformation: Transform,
    ) -> Result<Self> {
        self.add_partition_field_internal(UnboundPartitionField {
            source_id,
            field_id: None,
            name: target_name.to_string(),
            transform: transformation,
        })
    }

    /// Adds several partition fields in order, stopping at the first invalid one.
    ///
    /// # Errors
    /// Returns the validation error of the first rejected field.
    pub fn add_partition_fields(
        self,
        fields: impl IntoIterator<Item = UnboundPartitionField>,
    ) -> Result<Self> {
        fields
            .into_iter()
            .try_fold(self, |builder, field| {
                builder.add_partition_field_internal(field)
            })
    }

    /// Validates `field` against the fields added so far and appends it.
    fn add_partition_field_internal(mut self, field: UnboundPartitionField) -> Result<Self> {
        self.check_name_set_and_unique(&field.name)?;
        self.check_for_redundant_partitions(field.source_id, &field.transform)?;
        // Only explicitly provided ids can clash; missing ids are assigned later.
        match field.field_id {
            Some(explicit_id) => self.check_partition_id_unique(explicit_id)?,
            None => {}
        }
        self.fields.push(field);
        Ok(self)
    }

    /// Finishes the builder and returns the [`UnboundPartitionSpec`].
    pub fn build(self) -> UnboundPartitionSpec {
        let Self { spec_id, fields } = self;
        UnboundPartitionSpec { spec_id, fields }
    }
}
/// Builder for [`BoundPartitionSpec`].
///
/// Fields are validated against the schema as they are added; missing partition field ids
/// are assigned when `build` is called.
#[derive(Debug)]
pub struct PartitionSpecBuilder {
// Spec id of the spec being built; `build` falls back to `DEFAULT_PARTITION_SPEC_ID`.
spec_id: Option<i32>,
// Automatically assigned field ids start right after this value.
last_assigned_field_id: i32,
// Fields accepted so far, in insertion order.
fields: Vec<UnboundPartitionField>,
// Schema the fields are validated against.
schema: SchemaRef,
}
impl PartitionSpecBuilder {
/// Creates an empty builder for `schema`.
///
/// No spec id is set and automatic field id assignment starts right after
/// `UNPARTITIONED_LAST_ASSIGNED_ID`.
pub fn new(schema: impl Into<SchemaRef>) -> Self {
    let schema = schema.into();
    Self {
        spec_id: None,
        last_assigned_field_id: UNPARTITIONED_LAST_ASSIGNED_ID,
        fields: Vec::new(),
        schema,
    }
}
pub fn new_from_unbound(
unbound: UnboundPartitionSpec,
schema: impl Into<SchemaRef>,
) -> Result<Self> {
let mut builder =
Self::new(schema).with_spec_id(unbound.spec_id.unwrap_or(DEFAULT_PARTITION_SPEC_ID));
for field in unbound.fields {
builder = builder.add_unbound_field(field)?;
}
Ok(builder)
}
pub fn with_last_assigned_field_id(mut self, last_assigned_field_id: i32) -> Self {
self.last_assigned_field_id = last_assigned_field_id;
self
}
pub fn with_spec_id(mut self, spec_id: i32) -> Self {
self.spec_id = Some(spec_id);
self
}
pub fn add_partition_field(
self,
source_name: impl AsRef<str>,
target_name: impl Into<String>,
transform: Transform,
) -> Result<Self> {
let source_id = self
.schema
.field_by_name(source_name.as_ref())
.ok_or_else(|| {
Error::new(
ErrorKind::DataInvalid,
format!(
"Cannot find source column with name: {} in schema",
source_name.as_ref()
),
)
})?
.id;
let field = UnboundPartitionField {
source_id,
field_id: None,
name: target_name.into(),
transform,
};
self.add_unbound_field(field)
}
/// Validates `field` against the schema and the fields added so far, then appends it.
///
/// Checks run in this order: name is non-empty and unique, no existing field has the
/// same source id and transform, the name does not clash with a schema column, the
/// transform is applicable to the source column, an explicit field id is unused.
///
/// # Errors
/// Returns the error of the first check that fails.
pub fn add_unbound_field(mut self, field: UnboundPartitionField) -> Result<Self> {
    // Checks that only look at the fields already in the builder.
    self.check_name_set_and_unique(&field.name)?;
    self.check_for_redundant_partitions(field.source_id, &field.transform)?;

    // Checks that need the schema.
    Self::check_name_does_not_collide_with_schema(&field, &self.schema)?;
    Self::check_transform_compatibility(&field, &self.schema)?;

    // Only explicitly provided ids can clash; missing ids are assigned in `build`.
    match field.field_id {
        Some(explicit_id) => self.check_partition_id_unique(explicit_id)?,
        None => {}
    }

    self.fields.push(field);
    Ok(self)
}
/// Adds several fields in order via `add_unbound_field`, stopping at the first invalid one.
///
/// # Errors
/// Returns the validation error of the first rejected field.
pub fn add_unbound_fields(
    self,
    fields: impl IntoIterator<Item = UnboundPartitionField>,
) -> Result<Self> {
    fields
        .into_iter()
        .try_fold(self, |builder, field| builder.add_unbound_field(field))
}
pub fn build(self) -> Result<BoundPartitionSpec> {
let fields = Self::set_field_ids(self.fields, self.last_assigned_field_id)?;
let partition_type = Self::partition_type(&fields, &self.schema)?;
Ok(BoundPartitionSpec {
spec_id: self.spec_id.unwrap_or(DEFAULT_PARTITION_SPEC_ID),
fields,
partition_type,
schema: self.schema,
})
}
fn set_field_ids(
fields: Vec<UnboundPartitionField>,
last_assigned_field_id: i32,
) -> Result<Vec<PartitionField>> {
let mut last_assigned_field_id = last_assigned_field_id;
let assigned_ids = fields
.iter()
.filter_map(|f| f.field_id)
.collect::<std::collections::HashSet<_>>();
fn _check_add_1(prev: i32) -> Result<i32> {
prev.checked_add(1).ok_or_else(|| {
Error::new(
ErrorKind::DataInvalid,
"Cannot assign more partition ids. Overflow.",
)
})
}
let mut bound_fields = Vec::with_capacity(fields.len());
for field in fields.into_iter() {
let partition_field_id = if let Some(partition_field_id) = field.field_id {
last_assigned_field_id = std::cmp::max(last_assigned_field_id, partition_field_id);
partition_field_id
} else {
last_assigned_field_id = _check_add_1(last_assigned_field_id)?;
while assigned_ids.contains(&last_assigned_field_id) {
last_assigned_field_id = _check_add_1(last_assigned_field_id)?;
}
last_assigned_field_id
};
bound_fields.push(PartitionField {
source_id: field.source_id,
field_id: partition_field_id,
name: field.name,
transform: field.transform,
})
}
Ok(bound_fields)
}
/// Derives the partition struct type for `fields` against `schema`.
///
/// Each partition field becomes an optional nested field that carries the partition
/// field's id and name; its type is the result type of the transform applied to the
/// source column's type.
///
/// `fields` is taken as a slice so callers can pass any contiguous sequence of
/// partition fields; existing `&Vec<PartitionField>` arguments coerce automatically.
///
/// # Errors
/// Returns `ErrorKind::Unexpected` when a source id is not present in `schema`, or the
/// error of `Transform::result_type` when the transform does not apply to the source type.
fn partition_type(fields: &[PartitionField], schema: &Schema) -> Result<StructType> {
    let mut struct_fields = Vec::with_capacity(fields.len());
    for partition_field in fields {
        let field = schema
            .field_by_id(partition_field.source_id)
            .ok_or_else(|| {
                Error::new(
                    ErrorKind::Unexpected,
                    format!(
                        "No column with source column id {} in schema {:?}",
                        partition_field.source_id, schema
                    ),
                )
            })?;
        let res_type = partition_field.transform.result_type(&field.field_type)?;
        let field =
            NestedField::optional(partition_field.field_id, &partition_field.name, res_type)
                .into();
        struct_fields.push(field);
    }
    Ok(StructType::new(struct_fields))
}
fn check_name_does_not_collide_with_schema(
field: &UnboundPartitionField,
schema: &Schema,
) -> Result<()> {
match schema.field_by_name(field.name.as_str()) {
Some(schema_collision) => {
if field.transform == Transform::Identity {
if schema_collision.id == field.source_id {
Ok(())
} else {
Err(Error::new(
ErrorKind::DataInvalid,
format!(
"Cannot create identity partition sourced from different field in schema. Field name '{}' has id `{}` in schema but partition source id is `{}`",
field.name, schema_collision.id, field.source_id
),
))
}
} else {
Err(Error::new(
ErrorKind::DataInvalid,
format!(
"Cannot create partition with name: '{}' that conflicts with schema field and is not an identity transform.",
field.name
),
))
}
}
None => Ok(()),
}
}
fn check_transform_compatibility(field: &UnboundPartitionField, schema: &Schema) -> Result<()> {
let schema_field = schema.field_by_id(field.source_id).ok_or_else(|| {
Error::new(
ErrorKind::DataInvalid,
format!(
"Cannot find partition source field with id `{}` in schema",
field.source_id
),
)
})?;
if field.transform != Transform::Void {
if !schema_field.field_type.is_primitive() {
return Err(Error::new(
ErrorKind::DataInvalid,
format!(
"Cannot partition by non-primitive source field: '{}'.",
schema_field.field_type
),
));
}
if field
.transform
.result_type(&schema_field.field_type)
.is_err()
{
return Err(Error::new(
ErrorKind::DataInvalid,
format!(
"Invalid source type: '{}' for transform: '{}'.",
schema_field.field_type,
field.transform.dedup_name()
),
));
}
}
Ok(())
}
}
trait CorePartitionSpecValidator {
fn check_name_set_and_unique(&self, name: &str) -> Result<()> {
if name.is_empty() {
return Err(Error::new(
ErrorKind::DataInvalid,
"Cannot use empty partition name",
));
}
if self.fields().iter().any(|f| f.name == name) {
return Err(Error::new(
ErrorKind::DataInvalid,
format!("Cannot use partition name more than once: {}", name),
));
}
Ok(())
}
fn check_for_redundant_partitions(&self, source_id: i32, transform: &Transform) -> Result<()> {
let collision = self.fields().iter().find(|f| {
f.source_id == source_id && f.transform.dedup_name() == transform.dedup_name()
});
if let Some(collision) = collision {
Err(Error::new(
ErrorKind::DataInvalid,
format!(
"Cannot add redundant partition with source id `{}` and transform `{}`. A partition with the same source id and transform already exists with name `{}`",
source_id, transform.dedup_name(), collision.name
),
))
} else {
Ok(())
}
}
fn check_partition_id_unique(&self, field_id: i32) -> Result<()> {
if self.fields().iter().any(|f| f.field_id == Some(field_id)) {
return Err(Error::new(
ErrorKind::DataInvalid,
format!(
"Cannot use field id more than once in one PartitionSpec: {}",
field_id
),
));
}
Ok(())
}
fn fields(&self) -> &Vec<UnboundPartitionField>;
}
impl CorePartitionSpecValidator for PartitionSpecBuilder {
    /// Exposes the fields accepted by this builder so far.
    fn fields(&self) -> &Vec<UnboundPartitionField> {
        let Self { fields, .. } = self;
        fields
    }
}
impl CorePartitionSpecValidator for UnboundPartitionSpecBuilder {
    /// Exposes the fields accepted by this builder so far.
    fn fields(&self) -> &Vec<UnboundPartitionField> {
        let Self { fields, .. } = self;
        fields
    }
}
#[cfg(test)]
mod tests {
use super::*;
use crate::spec::{PrimitiveType, Type};
// Deserializes a schemaless spec from JSON and checks every field attribute.
#[test]
fn test_partition_spec() {
    let json = r#"
{
"spec-id": 1,
"fields": [ {
"source-id": 4,
"field-id": 1000,
"name": "ts_day",
"transform": "day"
}, {
"source-id": 1,
"field-id": 1001,
"name": "id_bucket",
"transform": "bucket[16]"
}, {
"source-id": 2,
"field-id": 1002,
"name": "id_truncate",
"transform": "truncate[4]"
} ]
}
"#;
    let parsed: SchemalessPartitionSpec = serde_json::from_str(json).unwrap();

    // (source id, field id, name, transform) expected at each position.
    let expected = vec![
        (4, 1000, "ts_day", Transform::Day),
        (1, 1001, "id_bucket", Transform::Bucket(16)),
        (2, 1002, "id_truncate", Transform::Truncate(4)),
    ];
    for (index, (source_id, field_id, name, transform)) in expected.into_iter().enumerate() {
        let field = &parsed.fields[index];
        assert_eq!(source_id, field.source_id);
        assert_eq!(field_id, field.field_id);
        assert_eq!(name, field.name);
        assert_eq!(transform, field.transform);
    }
}
// A spec is unpartitioned when it has no fields or only void transforms.
#[test]
fn test_is_unpartitioned() {
    let schema = Schema::builder()
        .with_fields(vec![
            NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
            NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
        ])
        .build()
        .unwrap();
    // Builds a field without an explicit field id through the typed builder.
    let unbound = |source_id: i32, name: &str, transform: Transform| {
        UnboundPartitionField::builder()
            .source_id(source_id)
            .name(name.to_string())
            .transform(transform)
            .build()
    };

    let empty_spec = BoundPartitionSpec::builder(schema.clone())
        .with_spec_id(1)
        .build()
        .unwrap();
    assert!(
        empty_spec.is_unpartitioned(),
        "Empty partition spec should be unpartitioned"
    );

    let mixed_spec = BoundPartitionSpec::builder(schema.clone())
        .add_unbound_fields(vec![
            unbound(1, "id", Transform::Identity),
            unbound(2, "name_string", Transform::Void),
        ])
        .unwrap()
        .with_spec_id(1)
        .build()
        .unwrap();
    assert!(
        !mixed_spec.is_unpartitioned(),
        "Partition spec with one non void transform should not be unpartitioned"
    );

    let void_spec = BoundPartitionSpec::builder(schema.clone())
        .with_spec_id(1)
        .add_unbound_fields(vec![
            unbound(1, "id_void", Transform::Void),
            unbound(2, "name_void", Transform::Void),
        ])
        .unwrap()
        .build()
        .unwrap();
    assert!(
        void_spec.is_unpartitioned(),
        "Partition spec with all void field should be unpartitioned"
    );
}
// Deserializes unbound specs with and without spec id / field ids.
#[test]
fn test_unbound_partition_spec() {
    let full_json = r#"
{
"spec-id": 1,
"fields": [ {
"source-id": 4,
"field-id": 1000,
"name": "ts_day",
"transform": "day"
}, {
"source-id": 1,
"field-id": 1001,
"name": "id_bucket",
"transform": "bucket[16]"
}, {
"source-id": 2,
"field-id": 1002,
"name": "id_truncate",
"transform": "truncate[4]"
} ]
}
"#;
    let full: UnboundPartitionSpec = serde_json::from_str(full_json).unwrap();
    assert_eq!(Some(1), full.spec_id);

    // (source id, field id, name, transform) expected at each position.
    let expected = vec![
        (4, 1000, "ts_day", Transform::Day),
        (1, 1001, "id_bucket", Transform::Bucket(16)),
        (2, 1002, "id_truncate", Transform::Truncate(4)),
    ];
    for (index, (source_id, field_id, name, transform)) in expected.into_iter().enumerate() {
        let field = &full.fields[index];
        assert_eq!(source_id, field.source_id);
        assert_eq!(Some(field_id), field.field_id);
        assert_eq!(name, field.name);
        assert_eq!(transform, field.transform);
    }

    let minimal_json = r#"
{
"fields": [ {
"source-id": 4,
"name": "ts_day",
"transform": "day"
} ]
}
"#;
    let minimal: UnboundPartitionSpec = serde_json::from_str(minimal_json).unwrap();
    assert_eq!(None, minimal.spec_id);

    let only_field = &minimal.fields[0];
    assert_eq!(4, only_field.source_id);
    assert_eq!(None, only_field.field_id);
    assert_eq!("ts_day", only_field.name);
    assert_eq!(Transform::Day, only_field.transform);
}
// An empty builder with spec id 0 yields the same spec as `unpartition_spec`.
#[test]
fn test_new_unpartition() {
    let schema = Schema::builder()
        .with_fields(vec![
            NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
            NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
        ])
        .build()
        .unwrap();

    let built = BoundPartitionSpec::builder(schema.clone())
        .with_spec_id(0)
        .build()
        .unwrap();
    assert_eq!(0, built.partition_type().fields().len());

    let unpartitioned = BoundPartitionSpec::unpartition_spec(schema);
    assert_eq!(built, unpartitioned);
}
// The partition type has one optional field per partition field, typed by the transform.
#[test]
fn test_partition_type() {
    let json = r#"
{
"spec-id": 1,
"fields": [ {
"source-id": 4,
"field-id": 1000,
"name": "ts_day",
"transform": "day"
}, {
"source-id": 1,
"field-id": 1001,
"name": "id_bucket",
"transform": "bucket[16]"
}, {
"source-id": 2,
"field-id": 1002,
"name": "id_truncate",
"transform": "truncate[4]"
} ]
}
"#;
    let partition_spec: SchemalessPartitionSpec = serde_json::from_str(json).unwrap();

    let schema = Schema::builder()
        .with_fields(vec![
            NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
            NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
            NestedField::required(3, "ts", Type::Primitive(PrimitiveType::Timestamp)).into(),
            NestedField::required(4, "ts_day", Type::Primitive(PrimitiveType::Timestamp)).into(),
            NestedField::required(5, "id_bucket", Type::Primitive(PrimitiveType::Int)).into(),
            NestedField::required(6, "id_truncate", Type::Primitive(PrimitiveType::Int)).into(),
        ])
        .build()
        .unwrap();

    let partition_type = partition_spec.partition_type(&schema).unwrap();
    assert_eq!(3, partition_type.fields().len());

    // Result type expected for the partition field at each position.
    let expected_types = vec![
        PrimitiveType::Date,
        PrimitiveType::Int,
        PrimitiveType::String,
    ];
    for (index, primitive) in expected_types.into_iter().enumerate() {
        assert_eq!(
            *partition_type.fields()[index],
            NestedField::optional(
                partition_spec.fields[index].field_id,
                &partition_spec.fields[index].name,
                Type::Primitive(primitive)
            )
        );
    }
}
// A spec without fields produces an empty partition type.
#[test]
fn test_partition_empty() {
    let json = r#"
{
"spec-id": 1,
"fields": []
}
"#;
    let partition_spec: SchemalessPartitionSpec = serde_json::from_str(json).unwrap();

    let schema = Schema::builder()
        .with_fields(vec![
            NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
            NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
            NestedField::required(3, "ts", Type::Primitive(PrimitiveType::Timestamp)).into(),
            NestedField::required(4, "ts_day", Type::Primitive(PrimitiveType::Timestamp)).into(),
            NestedField::required(5, "id_bucket", Type::Primitive(PrimitiveType::Int)).into(),
            NestedField::required(6, "id_truncate", Type::Primitive(PrimitiveType::Int)).into(),
        ])
        .build()
        .unwrap();

    let partition_type = partition_spec.partition_type(&schema).unwrap();
    assert_eq!(0, partition_type.fields().len());
}
// Computing the partition type fails when a source id is missing from the schema.
#[test]
fn test_partition_error() {
    let json = r#"
{
"spec-id": 1,
"fields": [ {
"source-id": 4,
"field-id": 1000,
"name": "ts_day",
"transform": "day"
}, {
"source-id": 1,
"field-id": 1001,
"name": "id_bucket",
"transform": "bucket[16]"
}, {
"source-id": 2,
"field-id": 1002,
"name": "id_truncate",
"transform": "truncate[4]"
} ]
}
"#;
    let partition_spec: SchemalessPartitionSpec = serde_json::from_str(json).unwrap();

    // The schema has no column with id 4, which the first partition field refers to.
    let schema = Schema::builder()
        .with_fields(vec![
            NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
            NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
        ])
        .build()
        .unwrap();

    let outcome = partition_spec.partition_type(&schema);
    assert!(outcome.is_err());
}
// Round-tripping bound -> schemaless -> bound keeps field ids and the spec id.
#[test]
fn test_schemaless_bind_schema_keeps_field_ids_and_spec_id() {
    let schema: Schema = Schema::builder()
        .with_fields(vec![
            NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
            NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
        ])
        .build()
        .unwrap();

    let identity_field = UnboundPartitionField {
        source_id: 1,
        field_id: Some(1010),
        name: "id".to_string(),
        transform: Transform::Identity,
    };
    let void_field = UnboundPartitionField {
        source_id: 2,
        field_id: Some(1001),
        name: "name_void".to_string(),
        transform: Transform::Void,
    };
    let partition_spec = BoundPartitionSpec::builder(schema.clone())
        .with_spec_id(99)
        .add_unbound_field(identity_field)
        .unwrap()
        .add_unbound_field(void_field)
        .unwrap()
        .build()
        .unwrap();

    let schemaless_partition_spec = SchemalessPartitionSpec::from(partition_spec.clone());
    let bound_partition_spec = schemaless_partition_spec.bind(schema).unwrap();

    assert_eq!(partition_spec, bound_partition_spec);
    assert_eq!(partition_spec.fields[0].field_id, 1010);
    assert_eq!(partition_spec.fields[1].field_id, 1001);
    assert_eq!(bound_partition_spec.spec_id(), 99);
}
// Adding a second field with an already used name is rejected.
#[test]
fn test_builder_disallow_duplicate_names() {
    let with_first = UnboundPartitionSpec::builder()
        .add_partition_field(1, "ts_day".to_string(), Transform::Day)
        .unwrap();
    with_first
        .add_partition_field(2, "ts_day".to_string(), Transform::Day)
        .unwrap_err();
}
// Two fields may not carry the same explicit partition field id.
#[test]
fn test_builder_disallow_duplicate_field_ids() {
    let schema = Schema::builder()
        .with_fields(vec![
            NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
            NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
        ])
        .build()
        .unwrap();

    let first = UnboundPartitionField {
        source_id: 1,
        field_id: Some(1000),
        name: "id".to_string(),
        transform: Transform::Identity,
    };
    let clashing = UnboundPartitionField {
        source_id: 2,
        field_id: Some(1000),
        name: "id_bucket".to_string(),
        transform: Transform::Bucket(16),
    };

    BoundPartitionSpec::builder(schema.clone())
        .add_unbound_field(first)
        .unwrap()
        .add_unbound_field(clashing)
        .unwrap_err();
}
// Missing field ids are assigned above the highest id seen so far; explicit ids are kept.
#[test]
fn test_builder_auto_assign_field_ids() {
    let schema = Schema::builder()
        .with_fields(vec![
            NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
            NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
            NestedField::required(3, "ts", Type::Primitive(PrimitiveType::Timestamp)).into(),
        ])
        .build()
        .unwrap();

    let high_explicit = UnboundPartitionField {
        source_id: 1,
        name: "id".to_string(),
        transform: Transform::Identity,
        field_id: Some(1012),
    };
    let unassigned = UnboundPartitionField {
        source_id: 2,
        name: "name_void".to_string(),
        transform: Transform::Void,
        field_id: None,
    };
    let low_explicit = UnboundPartitionField {
        source_id: 3,
        name: "year".to_string(),
        transform: Transform::Year,
        field_id: Some(1),
    };

    let spec = BoundPartitionSpec::builder(schema.clone())
        .with_spec_id(1)
        .add_unbound_field(high_explicit)
        .unwrap()
        .add_unbound_field(unassigned)
        .unwrap()
        .add_unbound_field(low_explicit)
        .unwrap()
        .build()
        .unwrap();

    assert_eq!(1012, spec.fields[0].field_id);
    assert_eq!(1013, spec.fields[1].field_id);
    assert_eq!(1, spec.fields[2].field_id);
}
// Building against a valid schema yields the expected fields and partition type.
#[test]
fn test_builder_valid_schema() {
    let schema = Schema::builder()
        .with_fields(vec![
            NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
            NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
        ])
        .build()
        .unwrap();

    // An empty spec must build without error.
    BoundPartitionSpec::builder(schema.clone())
        .with_spec_id(1)
        .build()
        .unwrap();

    let spec = BoundPartitionSpec::builder(schema.clone())
        .with_spec_id(1)
        .add_partition_field("id", "id_bucket[16]", Transform::Bucket(16))
        .unwrap()
        .build()
        .unwrap();

    let expected_field = PartitionField {
        source_id: 1,
        field_id: 1000,
        name: "id_bucket[16]".to_string(),
        transform: Transform::Bucket(16),
    };
    let expected_type = StructType::new(vec![NestedField::optional(
        1000,
        "id_bucket[16]",
        Type::Primitive(PrimitiveType::Int),
    )
    .into()]);
    let expected = BoundPartitionSpec {
        spec_id: 1,
        schema: schema.into(),
        fields: vec![expected_field],
        partition_type: expected_type,
    };
    assert_eq!(spec, expected);
}
// A non-identity partition field may not reuse the name of a schema column.
#[test]
fn test_collision_with_schema_name() {
    let schema = Schema::builder()
        .with_fields(vec![NestedField::required(
            1,
            "id",
            Type::Primitive(PrimitiveType::Int),
        )
        .into()])
        .build()
        .unwrap();

    // An empty spec must build without error.
    BoundPartitionSpec::builder(schema.clone())
        .with_spec_id(1)
        .build()
        .unwrap();

    let colliding = UnboundPartitionField {
        source_id: 1,
        field_id: None,
        name: "id".to_string(),
        transform: Transform::Bucket(16),
    };
    let err = BoundPartitionSpec::builder(schema)
        .with_spec_id(1)
        .add_unbound_field(colliding)
        .unwrap_err();
    assert!(err.message().contains("conflicts with schema"))
}
// An identity partition may share a column's name only when it is sourced from that column.
#[test]
fn test_builder_collision_is_ok_for_identity_transforms() {
    let schema = Schema::builder()
        .with_fields(vec![
            NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
            NestedField::required(2, "number", Type::Primitive(PrimitiveType::Int)).into(),
        ])
        .build()
        .unwrap();
    // Identity field named "id" sourced from the given column id.
    let identity_named_id = |source_id: i32| UnboundPartitionField {
        source_id,
        field_id: None,
        name: "id".to_string(),
        transform: Transform::Identity,
    };

    // An empty spec must build without error.
    BoundPartitionSpec::builder(schema.clone())
        .with_spec_id(1)
        .build()
        .unwrap();

    // Same name and same source column: accepted.
    BoundPartitionSpec::builder(schema.clone())
        .with_spec_id(1)
        .add_unbound_field(identity_named_id(1))
        .unwrap()
        .build()
        .unwrap();

    // Same name but a different source column: rejected.
    BoundPartitionSpec::builder(schema)
        .with_spec_id(1)
        .add_unbound_field(identity_named_id(2))
        .unwrap_err();
}
// Every partition field must refer to a column id that exists in the schema.
#[test]
fn test_builder_all_source_ids_must_exist() {
    let schema = Schema::builder()
        .with_fields(vec![
            NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
            NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
            NestedField::required(3, "ts", Type::Primitive(PrimitiveType::Timestamp)).into(),
        ])
        .build()
        .unwrap();
    // Field without an explicit partition field id.
    let unbound = |source_id: i32, name: &str, transform: Transform| UnboundPartitionField {
        source_id,
        field_id: None,
        name: name.to_string(),
        transform,
    };

    // All source ids exist: accepted.
    BoundPartitionSpec::builder(schema.clone())
        .with_spec_id(1)
        .add_unbound_fields(vec![
            unbound(1, "id_bucket", Transform::Bucket(16)),
            unbound(2, "name", Transform::Identity),
        ])
        .unwrap()
        .build()
        .unwrap();

    // Source id 4 is not part of the schema: rejected.
    BoundPartitionSpec::builder(schema)
        .with_spec_id(1)
        .add_unbound_fields(vec![
            unbound(1, "id_bucket", Transform::Bucket(16)),
            unbound(4, "name", Transform::Identity),
        ])
        .unwrap_err();
}
// The same source id and transform may not be added twice, even under another name.
#[test]
fn test_builder_disallows_redundant() {
    let with_first = UnboundPartitionSpec::builder()
        .with_spec_id(1)
        .add_partition_field(1, "id_bucket[16]".to_string(), Transform::Bucket(16))
        .unwrap();

    let err = with_first
        .add_partition_field(
            1,
            "id_bucket_with_other_name".to_string(),
            Transform::Bucket(16),
        )
        .unwrap_err();
    assert!(err.message().contains("redundant partition"));
}
// A transform that does not apply to the source column's type is rejected.
#[test]
fn test_builder_incompatible_transforms_disallowed() {
    let schema = Schema::builder()
        .with_fields(vec![NestedField::required(
            1,
            "id",
            Type::Primitive(PrimitiveType::Int),
        )
        .into()])
        .build()
        .unwrap();

    // `year` cannot be applied to an int column.
    let year_of_int = UnboundPartitionField {
        source_id: 1,
        field_id: None,
        name: "id_year".to_string(),
        transform: Transform::Year,
    };
    BoundPartitionSpec::builder(schema)
        .with_spec_id(1)
        .add_unbound_field(year_of_int)
        .unwrap_err();
}
// Fields without a partition field id survive the unbound builder unchanged.
#[test]
fn test_build_unbound_specs_without_partition_id() {
    let bucket_field = UnboundPartitionField {
        source_id: 1,
        field_id: None,
        name: "id_bucket[16]".to_string(),
        transform: Transform::Bucket(16),
    };

    let spec = UnboundPartitionSpec::builder()
        .with_spec_id(1)
        .add_partition_fields(vec![bucket_field.clone()])
        .unwrap()
        .build();

    let expected = UnboundPartitionSpec {
        spec_id: Some(1),
        fields: vec![bucket_field],
    };
    assert_eq!(spec, expected);
}
// Two specs with identical fields are compatible.
#[test]
fn test_is_compatible_with() {
    let schema = Schema::builder()
        .with_fields(vec![
            NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
            NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
        ])
        .build()
        .unwrap();
    let id_bucket = || UnboundPartitionField {
        source_id: 1,
        field_id: None,
        name: "id_bucket".to_string(),
        transform: Transform::Bucket(16),
    };

    let partition_spec_1 = BoundPartitionSpec::builder(schema.clone())
        .with_spec_id(1)
        .add_unbound_field(id_bucket())
        .unwrap()
        .build()
        .unwrap();
    let partition_spec_2 = BoundPartitionSpec::builder(schema)
        .with_spec_id(1)
        .add_unbound_field(id_bucket())
        .unwrap()
        .build()
        .unwrap();

    let other = partition_spec_2.into_schemaless();
    assert!(partition_spec_1.is_compatible_with_schemaless(&other));
}
// Specs that differ only in a transform parameter are not compatible.
#[test]
fn test_not_compatible_with_transform_different() {
    let schema = Schema::builder()
        .with_fields(vec![NestedField::required(
            1,
            "id",
            Type::Primitive(PrimitiveType::Int),
        )
        .into()])
        .build()
        .unwrap();
    let id_bucket = |buckets: u32| UnboundPartitionField {
        source_id: 1,
        field_id: None,
        name: "id_bucket".to_string(),
        transform: Transform::Bucket(buckets),
    };

    let partition_spec_1 = BoundPartitionSpec::builder(schema.clone())
        .with_spec_id(1)
        .add_unbound_field(id_bucket(16))
        .unwrap()
        .build()
        .unwrap();
    let partition_spec_2 = BoundPartitionSpec::builder(schema)
        .with_spec_id(1)
        .add_unbound_field(id_bucket(32))
        .unwrap()
        .build()
        .unwrap();

    let other = partition_spec_2.into_schemaless();
    assert!(!partition_spec_1.is_compatible_with_schemaless(&other));
}
// Specs that differ only in a field's source id are not compatible.
#[test]
fn test_not_compatible_with_source_id_different() {
    let schema = Schema::builder()
        .with_fields(vec![
            NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
            NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
        ])
        .build()
        .unwrap();
    let bucket_of = |source_id: i32| UnboundPartitionField {
        source_id,
        field_id: None,
        name: "id_bucket".to_string(),
        transform: Transform::Bucket(16),
    };

    let partition_spec_1 = BoundPartitionSpec::builder(schema.clone())
        .with_spec_id(1)
        .add_unbound_field(bucket_of(1))
        .unwrap()
        .build()
        .unwrap();
    let partition_spec_2 = BoundPartitionSpec::builder(schema)
        .with_spec_id(1)
        .add_unbound_field(bucket_of(2))
        .unwrap()
        .build()
        .unwrap();

    let other = partition_spec_2.into_schemaless();
    assert!(!partition_spec_1.is_compatible_with_schemaless(&other));
}
/// Two specs holding the same pair of partition fields, added in opposite
/// order, must not be reported as compatible.
#[test]
fn test_not_compatible_with_order_different() {
    let id_column =
        NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int));
    let name_column = NestedField::required(
        2,
        "name",
        Type::Primitive(crate::spec::PrimitiveType::String),
    );
    let schema = Schema::builder()
        .with_fields(vec![id_column.into(), name_column.into()])
        .build()
        .unwrap();

    // Factories so each spec receives its own freshly built field values.
    let id_bucket = || UnboundPartitionField {
        source_id: 1,
        field_id: None,
        name: "id_bucket".to_string(),
        transform: Transform::Bucket(16),
    };
    let name_identity = || UnboundPartitionField {
        source_id: 2,
        field_id: None,
        name: "name".to_string(),
        transform: Transform::Identity,
    };

    // First spec: bucket(id) followed by identity(name).
    let bound_spec = BoundPartitionSpec::builder(schema.clone())
        .with_spec_id(1)
        .add_unbound_field(id_bucket())
        .unwrap()
        .add_unbound_field(name_identity())
        .unwrap()
        .build()
        .unwrap();

    // Second spec: the same fields, added in the reverse order.
    let schemaless_spec = BoundPartitionSpec::builder(schema)
        .with_spec_id(1)
        .add_unbound_field(name_identity())
        .unwrap()
        .add_unbound_field(id_bucket())
        .unwrap()
        .build()
        .unwrap()
        .into_schemaless();

    let compatible = bound_spec.is_compatible_with_schemaless(&schemaless_spec);
    assert!(!compatible);
}
/// A spec built without any partition fields reports no highest field id.
#[test]
fn test_highest_field_id_unpartitioned() {
    let empty_schema = Schema::builder().with_fields(vec![]).build().unwrap();
    let unpartitioned = BoundPartitionSpec::builder(empty_schema)
        .with_spec_id(1)
        .build()
        .unwrap();

    assert!(unpartitioned.highest_field_id().is_none());
}
/// The highest field id is reported even when the field carrying it is not
/// the last one added (ids here are 1001 followed by 1000).
#[test]
fn test_highest_field_id() {
    let id_column =
        NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int));
    let name_column = NestedField::required(
        2,
        "name",
        Type::Primitive(crate::spec::PrimitiveType::String),
    );
    let schema = Schema::builder()
        .with_fields(vec![id_column.into(), name_column.into()])
        .build()
        .unwrap();

    let id_partition = UnboundPartitionField {
        source_id: 1,
        field_id: Some(1001),
        name: "id".to_string(),
        transform: Transform::Identity,
    };
    let name_partition = UnboundPartitionField {
        source_id: 2,
        field_id: Some(1000),
        name: "name".to_string(),
        transform: Transform::Identity,
    };

    let spec = BoundPartitionSpec::builder(schema)
        .with_spec_id(1)
        .add_unbound_field(id_partition)
        .unwrap()
        .add_unbound_field(name_partition)
        .unwrap()
        .build()
        .unwrap();

    assert_eq!(Some(1001), spec.highest_field_id());
}
/// Explicit field ids 1000 and 1001 are kept by the builder and the
/// resulting spec is reported as having sequential ids.
#[test]
fn test_has_sequential_ids() {
    let id_column =
        NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int));
    let name_column = NestedField::required(
        2,
        "name",
        Type::Primitive(crate::spec::PrimitiveType::String),
    );
    let schema = Schema::builder()
        .with_fields(vec![id_column.into(), name_column.into()])
        .build()
        .unwrap();

    // Identity partition on `source_id` with an explicitly assigned field id.
    let identity_field = |source_id, field_id, name: &str| UnboundPartitionField {
        source_id,
        field_id: Some(field_id),
        name: name.to_string(),
        transform: Transform::Identity,
    };

    let spec = BoundPartitionSpec::builder(schema)
        .with_spec_id(1)
        .add_unbound_field(identity_field(1, 1000, "id"))
        .unwrap()
        .add_unbound_field(identity_field(2, 1001, "name"))
        .unwrap()
        .build()
        .unwrap();

    let bound_fields = spec.fields();
    assert_eq!(1000, bound_fields[0].field_id);
    assert_eq!(1001, bound_fields[1].field_id);
    assert!(spec.has_sequential_ids());
}
/// Consecutive field ids that begin at 999 rather than 1000 are kept by the
/// builder, but the spec is not reported as having sequential ids.
#[test]
fn test_sequential_ids_must_start_at_1000() {
    let id_column =
        NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int));
    let name_column = NestedField::required(
        2,
        "name",
        Type::Primitive(crate::spec::PrimitiveType::String),
    );
    let schema = Schema::builder()
        .with_fields(vec![id_column.into(), name_column.into()])
        .build()
        .unwrap();

    let id_partition = UnboundPartitionField {
        source_id: 1,
        field_id: Some(999),
        name: "id".to_string(),
        transform: Transform::Identity,
    };
    let name_partition = UnboundPartitionField {
        source_id: 2,
        field_id: Some(1000),
        name: "name".to_string(),
        transform: Transform::Identity,
    };

    let spec = BoundPartitionSpec::builder(schema)
        .with_spec_id(1)
        .add_unbound_field(id_partition)
        .unwrap()
        .add_unbound_field(name_partition)
        .unwrap()
        .build()
        .unwrap();

    let bound_fields = spec.fields();
    assert_eq!(999, bound_fields[0].field_id);
    assert_eq!(1000, bound_fields[1].field_id);
    assert!(!spec.has_sequential_ids());
}
/// Field ids 1000 and 1002 leave a gap at 1001; the builder keeps them as
/// given, but the spec is not reported as having sequential ids.
#[test]
fn test_sequential_ids_must_have_no_gaps() {
    let id_column =
        NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int));
    let name_column = NestedField::required(
        2,
        "name",
        Type::Primitive(crate::spec::PrimitiveType::String),
    );
    let schema = Schema::builder()
        .with_fields(vec![id_column.into(), name_column.into()])
        .build()
        .unwrap();

    // Identity partition on `source_id` with an explicitly assigned field id.
    let identity_field = |source_id, field_id, name: &str| UnboundPartitionField {
        source_id,
        field_id: Some(field_id),
        name: name.to_string(),
        transform: Transform::Identity,
    };

    let spec = BoundPartitionSpec::builder(schema)
        .with_spec_id(1)
        .add_unbound_field(identity_field(1, 1000, "id"))
        .unwrap()
        .add_unbound_field(identity_field(2, 1002, "name"))
        .unwrap()
        .build()
        .unwrap();

    let bound_fields = spec.fields();
    assert_eq!(1000, bound_fields[0].field_id);
    assert_eq!(1002, bound_fields[1].field_id);
    assert!(!spec.has_sequential_ids());
}
}