iceberg/transform/
bucket.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::sync::Arc;
19
20use arrow_array::ArrayRef;
21use arrow_schema::{DataType, TimeUnit};
22
23use super::TransformFunction;
24use crate::spec::{Datum, PrimitiveLiteral, PrimitiveType};
25
/// The Iceberg `bucket[N]` partition transform.
///
/// Values are hashed (see the `hash_*` helpers) and mapped onto bucket ids in `0..mod_n`.
/// The struct only carries the bucket count, so it is cheap to copy and compare.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Bucket {
    /// Number of buckets (`N` in `bucket[N]`); used as the divisor when assigning buckets.
    mod_n: u32,
}
30
31impl Bucket {
32    pub fn new(mod_n: u32) -> Self {
33        Self { mod_n }
34    }
35}
36
37impl Bucket {
38    /// When switch the hash function, we only need to change this function.
39    #[inline]
40    fn hash_bytes(mut v: &[u8]) -> i32 {
41        murmur3::murmur3_32(&mut v, 0).unwrap() as i32
42    }
43
44    #[inline]
45    fn hash_int(v: i32) -> i32 {
46        Self::hash_long(v as i64)
47    }
48
49    #[inline]
50    fn hash_long(v: i64) -> i32 {
51        Self::hash_bytes(v.to_le_bytes().as_slice())
52    }
53
54    /// v is days from unix epoch
55    #[inline]
56    fn hash_date(v: i32) -> i32 {
57        Self::hash_int(v)
58    }
59
60    /// v is microseconds from midnight
61    #[inline]
62    fn hash_time(v: i64) -> i32 {
63        Self::hash_long(v)
64    }
65
66    /// v is microseconds from unix epoch
67    #[inline]
68    fn hash_timestamp(v: i64) -> i32 {
69        Self::hash_long(v)
70    }
71
72    #[inline]
73    fn hash_str(s: &str) -> i32 {
74        Self::hash_bytes(s.as_bytes())
75    }
76
77    /// Decimal values are hashed using the minimum number of bytes required to hold the unscaled value as a two’s complement big-endian
78    /// ref: https://iceberg.apache.org/spec/#appendix-b-32-bit-hash-requirements
79    #[inline]
80    fn hash_decimal(v: i128) -> i32 {
81        if v == 0 {
82            return Self::hash_bytes(&[0]);
83        }
84
85        let bytes = v.to_be_bytes();
86        let start = if v > 0 {
87            // Positive: skip 0x00 unless next byte would appear negative
88            bytes
89                .windows(2)
90                .position(|w| w[0] != 0x00 || w[1] & 0x80 != 0)
91                .unwrap_or(15)
92        } else {
93            // Negative: skip 0xFF only if next byte stays negative
94            bytes
95                .windows(2)
96                .position(|w| w[0] != 0xFF || w[1] & 0x80 == 0)
97                .unwrap_or(15)
98        };
99
100        Self::hash_bytes(&bytes[start..])
101    }
102
103    /// def bucket_N(x) = (murmur3_x86_32_hash(x) & Integer.MAX_VALUE) % N
104    /// ref: https://iceberg.apache.org/spec/#partitioning
105    #[inline]
106    fn bucket_n(&self, v: i32) -> i32 {
107        (v & i32::MAX) % (self.mod_n as i32)
108    }
109
110    #[inline]
111    fn bucket_int(&self, v: i32) -> i32 {
112        self.bucket_n(Self::hash_int(v))
113    }
114
115    #[inline]
116    fn bucket_long(&self, v: i64) -> i32 {
117        self.bucket_n(Self::hash_long(v))
118    }
119
120    #[inline]
121    fn bucket_decimal(&self, v: i128) -> i32 {
122        self.bucket_n(Self::hash_decimal(v))
123    }
124
125    #[inline]
126    fn bucket_date(&self, v: i32) -> i32 {
127        self.bucket_n(Self::hash_date(v))
128    }
129
130    #[inline]
131    fn bucket_time(&self, v: i64) -> i32 {
132        self.bucket_n(Self::hash_time(v))
133    }
134
135    #[inline]
136    fn bucket_timestamp(&self, v: i64) -> i32 {
137        self.bucket_n(Self::hash_timestamp(v))
138    }
139
140    #[inline]
141    fn bucket_str(&self, v: &str) -> i32 {
142        self.bucket_n(Self::hash_str(v))
143    }
144
145    #[inline]
146    fn bucket_bytes(&self, v: &[u8]) -> i32 {
147        self.bucket_n(Self::hash_bytes(v))
148    }
149}
150
151impl TransformFunction for Bucket {
152    fn transform(&self, input: ArrayRef) -> crate::Result<ArrayRef> {
153        let res: arrow_array::Int32Array = match input.data_type() {
154            DataType::Int32 => input
155                .as_any()
156                .downcast_ref::<arrow_array::Int32Array>()
157                .unwrap()
158                .unary(|v| self.bucket_int(v)),
159            DataType::Int64 => input
160                .as_any()
161                .downcast_ref::<arrow_array::Int64Array>()
162                .unwrap()
163                .unary(|v| self.bucket_long(v)),
164            DataType::Decimal128(_, _) => input
165                .as_any()
166                .downcast_ref::<arrow_array::Decimal128Array>()
167                .unwrap()
168                .unary(|v| self.bucket_decimal(v)),
169            DataType::Date32 => input
170                .as_any()
171                .downcast_ref::<arrow_array::Date32Array>()
172                .unwrap()
173                .unary(|v| self.bucket_date(v)),
174            DataType::Time64(TimeUnit::Microsecond) => input
175                .as_any()
176                .downcast_ref::<arrow_array::Time64MicrosecondArray>()
177                .unwrap()
178                .unary(|v| self.bucket_time(v)),
179            DataType::Timestamp(TimeUnit::Microsecond, _) => input
180                .as_any()
181                .downcast_ref::<arrow_array::TimestampMicrosecondArray>()
182                .unwrap()
183                .unary(|v| self.bucket_timestamp(v)),
184            DataType::Time64(TimeUnit::Nanosecond) => input
185                .as_any()
186                .downcast_ref::<arrow_array::Time64NanosecondArray>()
187                .unwrap()
188                .unary(|v| self.bucket_time(v / 1000)),
189            DataType::Timestamp(TimeUnit::Nanosecond, _) => input
190                .as_any()
191                .downcast_ref::<arrow_array::TimestampNanosecondArray>()
192                .unwrap()
193                .unary(|v| self.bucket_timestamp(v / 1000)),
194            DataType::Utf8 => arrow_array::Int32Array::from_iter(
195                input
196                    .as_any()
197                    .downcast_ref::<arrow_array::StringArray>()
198                    .unwrap()
199                    .iter()
200                    .map(|v| v.map(|v| self.bucket_str(v))),
201            ),
202            DataType::LargeUtf8 => arrow_array::Int32Array::from_iter(
203                input
204                    .as_any()
205                    .downcast_ref::<arrow_array::LargeStringArray>()
206                    .unwrap()
207                    .iter()
208                    .map(|v| v.map(|v| self.bucket_str(v))),
209            ),
210            DataType::Binary => arrow_array::Int32Array::from_iter(
211                input
212                    .as_any()
213                    .downcast_ref::<arrow_array::BinaryArray>()
214                    .unwrap()
215                    .iter()
216                    .map(|v| v.map(|v| self.bucket_bytes(v))),
217            ),
218            DataType::LargeBinary => arrow_array::Int32Array::from_iter(
219                input
220                    .as_any()
221                    .downcast_ref::<arrow_array::LargeBinaryArray>()
222                    .unwrap()
223                    .iter()
224                    .map(|v| v.map(|v| self.bucket_bytes(v))),
225            ),
226            DataType::FixedSizeBinary(_) => arrow_array::Int32Array::from_iter(
227                input
228                    .as_any()
229                    .downcast_ref::<arrow_array::FixedSizeBinaryArray>()
230                    .unwrap()
231                    .iter()
232                    .map(|v| v.map(|v| self.bucket_bytes(v))),
233            ),
234            _ => {
235                return Err(crate::Error::new(
236                    crate::ErrorKind::FeatureUnsupported,
237                    format!(
238                        "Unsupported data type for bucket transform: {:?}",
239                        input.data_type()
240                    ),
241                ));
242            }
243        };
244        Ok(Arc::new(res))
245    }
246
247    fn transform_literal(&self, input: &Datum) -> crate::Result<Option<Datum>> {
248        let val = match (input.data_type(), input.literal()) {
249            (PrimitiveType::Int, PrimitiveLiteral::Int(v)) => self.bucket_int(*v),
250            (PrimitiveType::Long, PrimitiveLiteral::Long(v)) => self.bucket_long(*v),
251            (PrimitiveType::Decimal { .. }, PrimitiveLiteral::Int128(v)) => self.bucket_decimal(*v),
252            (PrimitiveType::Date, PrimitiveLiteral::Int(v)) => self.bucket_date(*v),
253            (PrimitiveType::Time, PrimitiveLiteral::Long(v)) => self.bucket_time(*v),
254            (PrimitiveType::Timestamp, PrimitiveLiteral::Long(v)) => self.bucket_timestamp(*v),
255            (PrimitiveType::Timestamptz, PrimitiveLiteral::Long(v)) => self.bucket_timestamp(*v),
256            (PrimitiveType::TimestampNs, PrimitiveLiteral::Long(v)) => {
257                self.bucket_timestamp(*v / 1000)
258            }
259            (PrimitiveType::TimestamptzNs, PrimitiveLiteral::Long(v)) => {
260                self.bucket_timestamp(*v / 1000)
261            }
262            (PrimitiveType::String, PrimitiveLiteral::String(v)) => self.bucket_str(v.as_str()),
263            (PrimitiveType::Uuid, PrimitiveLiteral::UInt128(v)) => {
264                self.bucket_bytes(uuid::Uuid::from_u128(*v).as_ref())
265            }
266            (PrimitiveType::Binary, PrimitiveLiteral::Binary(v)) => self.bucket_bytes(v.as_ref()),
267            (PrimitiveType::Fixed(_), PrimitiveLiteral::Binary(v)) => self.bucket_bytes(v.as_ref()),
268            _ => {
269                return Err(crate::Error::new(
270                    crate::ErrorKind::FeatureUnsupported,
271                    format!(
272                        "Unsupported data type for bucket transform: {:?}",
273                        input.data_type()
274                    ),
275                ));
276            }
277        };
278        Ok(Some(Datum::int(val)))
279    }
280}
281
#[cfg(test)]
mod test {
    use std::sync::Arc;

    use arrow_array::{ArrayRef, Int32Array, TimestampMicrosecondArray, TimestampNanosecondArray};
    use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime};

    use super::Bucket;
    use crate::Result;
    use crate::expr::PredicateOperator;
    use crate::spec::PrimitiveType::{
        Binary, Date, Decimal, Fixed, Int, Long, String as StringType, Time, Timestamp,
        TimestampNs, Timestamptz, TimestamptzNs, Uuid,
    };
    use crate::spec::Type::{Primitive, Struct};
    use crate::spec::decimal_utils::decimal_new;
    use crate::spec::{Datum, NestedField, PrimitiveType, StructType, Transform, Type};
    use crate::transform::TransformFunction;
    use crate::transform::test::{TestProjectionFixture, TestTransformFixture};

    #[test]
    fn test_bucket_transform() {
        let trans = Transform::Bucket(8);

        let fixture = TestTransformFixture {
            display: "bucket[8]".to_string(),
            json: r#""bucket[8]""#.to_string(),
            dedup_name: "bucket[8]".to_string(),
            preserves_order: false,
            satisfies_order_of: vec![
                (Transform::Bucket(8), true),
                (Transform::Bucket(4), false),
                (Transform::Void, false),
                (Transform::Day, false),
            ],
            trans_types: vec![
                (Primitive(Binary), Some(Primitive(Int))),
                (Primitive(Date), Some(Primitive(Int))),
                (
                    Primitive(Decimal {
                        precision: 8,
                        scale: 5,
                    }),
                    Some(Primitive(Int)),
                ),
                (Primitive(Fixed(8)), Some(Primitive(Int))),
                (Primitive(Int), Some(Primitive(Int))),
                (Primitive(Long), Some(Primitive(Int))),
                (Primitive(StringType), Some(Primitive(Int))),
                (Primitive(Uuid), Some(Primitive(Int))),
                (Primitive(Time), Some(Primitive(Int))),
                (Primitive(Timestamp), Some(Primitive(Int))),
                (Primitive(Timestamptz), Some(Primitive(Int))),
                (Primitive(TimestampNs), Some(Primitive(Int))),
                (Primitive(TimestamptzNs), Some(Primitive(Int))),
                (
                    Struct(StructType::new(vec![
                        NestedField::optional(1, "a", Primitive(Timestamp)).into(),
                    ])),
                    None,
                ),
            ],
        };

        fixture.assert_transform(trans);
    }

    #[test]
    fn test_projection_bucket_uuid() -> Result<()> {
        let value = uuid::Uuid::from_u64_pair(123, 456);
        let another = uuid::Uuid::from_u64_pair(456, 123);

        let fixture = TestProjectionFixture::new(
            Transform::Bucket(10),
            "name",
            NestedField::required(1, "value", Type::Primitive(PrimitiveType::Uuid)),
        );

        fixture.assert_projection(
            &fixture.binary_predicate(PredicateOperator::Eq, Datum::uuid(value)),
            Some("name = 4"),
        )?;

        fixture.assert_projection(
            &fixture.binary_predicate(PredicateOperator::NotEq, Datum::uuid(value)),
            None,
        )?;

        fixture.assert_projection(
            &fixture.binary_predicate(PredicateOperator::LessThan, Datum::uuid(value)),
            None,
        )?;

        fixture.assert_projection(
            &fixture.binary_predicate(PredicateOperator::LessThanOrEq, Datum::uuid(value)),
            None,
        )?;

        fixture.assert_projection(
            &fixture.binary_predicate(PredicateOperator::GreaterThan, Datum::uuid(value)),
            None,
        )?;

        fixture.assert_projection(
            &fixture.binary_predicate(PredicateOperator::GreaterThanOrEq, Datum::uuid(value)),
            None,
        )?;

        fixture.assert_projection(
            &fixture.set_predicate(PredicateOperator::In, vec![
                Datum::uuid(value),
                Datum::uuid(another),
            ]),
            Some("name IN (4, 6)"),
        )?;

        fixture.assert_projection(
            &fixture.set_predicate(PredicateOperator::NotIn, vec![
                Datum::uuid(value),
                Datum::uuid(another),
            ]),
            None,
        )?;

        Ok(())
    }

    #[test]
    fn test_projection_bucket_fixed() -> Result<()> {
        let value = "abcdefg".as_bytes().to_vec();
        let another = "abcdehij".as_bytes().to_vec();

        let fixture = TestProjectionFixture::new(
            Transform::Bucket(10),
            "name",
            NestedField::required(
                1,
                "value",
                Type::Primitive(PrimitiveType::Fixed(value.len() as u64)),
            ),
        );

        fixture.assert_projection(
            &fixture.binary_predicate(PredicateOperator::Eq, Datum::fixed(value.clone())),
            Some("name = 4"),
        )?;

        fixture.assert_projection(
            &fixture.binary_predicate(PredicateOperator::NotEq, Datum::fixed(value.clone())),
            None,
        )?;

        fixture.assert_projection(
            &fixture.binary_predicate(PredicateOperator::LessThan, Datum::fixed(value.clone())),
            None,
        )?;

        fixture.assert_projection(
            &fixture.binary_predicate(PredicateOperator::LessThanOrEq, Datum::fixed(value.clone())),
            None,
        )?;

        fixture.assert_projection(
            &fixture.binary_predicate(PredicateOperator::GreaterThan, Datum::fixed(value.clone())),
            None,
        )?;

        fixture.assert_projection(
            &fixture.binary_predicate(
                PredicateOperator::GreaterThanOrEq,
                Datum::fixed(value.clone()),
            ),
            None,
        )?;

        fixture.assert_projection(
            &fixture.set_predicate(PredicateOperator::In, vec![
                Datum::fixed(value.clone()),
                Datum::fixed(another.clone()),
            ]),
            Some("name IN (4, 6)"),
        )?;

        fixture.assert_projection(
            &fixture.set_predicate(PredicateOperator::NotIn, vec![
                Datum::fixed(value.clone()),
                Datum::fixed(another.clone()),
            ]),
            None,
        )?;

        Ok(())
    }

    #[test]
    fn test_projection_bucket_string() -> Result<()> {
        let value = "abcdefg";
        let another = "abcdefgabc";

        let fixture = TestProjectionFixture::new(
            Transform::Bucket(10),
            "name",
            NestedField::required(1, "value", Type::Primitive(PrimitiveType::String)),
        );

        fixture.assert_projection(
            &fixture.binary_predicate(PredicateOperator::Eq, Datum::string(value)),
            Some("name = 4"),
        )?;

        fixture.assert_projection(
            &fixture.binary_predicate(PredicateOperator::NotEq, Datum::string(value)),
            None,
        )?;

        fixture.assert_projection(
            &fixture.binary_predicate(PredicateOperator::LessThan, Datum::string(value)),
            None,
        )?;

        fixture.assert_projection(
            &fixture.binary_predicate(PredicateOperator::LessThanOrEq, Datum::string(value)),
            None,
        )?;

        fixture.assert_projection(
            &fixture.binary_predicate(PredicateOperator::GreaterThan, Datum::string(value)),
            None,
        )?;

        fixture.assert_projection(
            &fixture.binary_predicate(PredicateOperator::GreaterThanOrEq, Datum::string(value)),
            None,
        )?;

        fixture.assert_projection(
            &fixture.set_predicate(PredicateOperator::In, vec![
                Datum::string(value),
                Datum::string(another),
            ]),
            Some("name IN (9, 4)"),
        )?;

        fixture.assert_projection(
            &fixture.set_predicate(PredicateOperator::NotIn, vec![
                Datum::string(value),
                Datum::string(another),
            ]),
            None,
        )?;

        Ok(())
    }

    #[test]
    fn test_projection_bucket_decimal() -> Result<()> {
        let prev = "99.00";
        let curr = "100.00";
        let next = "101.00";

        let fixture = TestProjectionFixture::new(
            Transform::Bucket(10),
            "name",
            NestedField::required(
                1,
                "value",
                Type::Primitive(PrimitiveType::Decimal {
                    precision: 9,
                    scale: 2,
                }),
            ),
        );

        fixture.assert_projection(
            &fixture.binary_predicate(PredicateOperator::Eq, Datum::decimal_from_str(curr)?),
            Some("name = 2"),
        )?;

        fixture.assert_projection(
            &fixture.binary_predicate(PredicateOperator::NotEq, Datum::decimal_from_str(curr)?),
            None,
        )?;

        fixture.assert_projection(
            &fixture.binary_predicate(PredicateOperator::LessThan, Datum::decimal_from_str(curr)?),
            None,
        )?;

        fixture.assert_projection(
            &fixture.binary_predicate(
                PredicateOperator::LessThanOrEq,
                Datum::decimal_from_str(curr)?,
            ),
            None,
        )?;

        fixture.assert_projection(
            &fixture.binary_predicate(
                PredicateOperator::GreaterThan,
                Datum::decimal_from_str(curr)?,
            ),
            None,
        )?;

        fixture.assert_projection(
            &fixture.binary_predicate(
                PredicateOperator::GreaterThanOrEq,
                Datum::decimal_from_str(curr)?,
            ),
            None,
        )?;

        fixture.assert_projection(
            &fixture.set_predicate(PredicateOperator::In, vec![
                Datum::decimal_from_str(next)?,
                Datum::decimal_from_str(curr)?,
                Datum::decimal_from_str(prev)?,
            ]),
            Some("name IN (2, 6)"),
        )?;

        fixture.assert_projection(
            &fixture.set_predicate(PredicateOperator::NotIn, vec![
                Datum::decimal_from_str(curr)?,
                Datum::decimal_from_str(next)?,
            ]),
            None,
        )?;

        Ok(())
    }

    #[test]
    fn test_projection_bucket_long() -> Result<()> {
        let value = 100;
        let fixture = TestProjectionFixture::new(
            Transform::Bucket(10),
            "name",
            NestedField::required(1, "value", Type::Primitive(PrimitiveType::Long)),
        );

        fixture.assert_projection(
            &fixture.binary_predicate(PredicateOperator::Eq, Datum::long(value)),
            Some("name = 6"),
        )?;

        fixture.assert_projection(
            &fixture.binary_predicate(PredicateOperator::NotEq, Datum::long(value)),
            None,
        )?;

        fixture.assert_projection(
            &fixture.binary_predicate(PredicateOperator::LessThan, Datum::long(value)),
            None,
        )?;

        fixture.assert_projection(
            &fixture.binary_predicate(PredicateOperator::LessThanOrEq, Datum::long(value)),
            None,
        )?;

        fixture.assert_projection(
            &fixture.binary_predicate(PredicateOperator::GreaterThan, Datum::long(value)),
            None,
        )?;

        fixture.assert_projection(
            &fixture.binary_predicate(PredicateOperator::GreaterThanOrEq, Datum::long(value)),
            None,
        )?;

        fixture.assert_projection(
            &fixture.set_predicate(PredicateOperator::In, vec![
                Datum::long(value - 1),
                Datum::long(value),
                Datum::long(value + 1),
            ]),
            Some("name IN (8, 7, 6)"),
        )?;

        fixture.assert_projection(
            &fixture.set_predicate(PredicateOperator::NotIn, vec![
                Datum::long(value),
                Datum::long(value + 1),
            ]),
            None,
        )?;

        Ok(())
    }

    #[test]
    fn test_projection_bucket_integer() -> Result<()> {
        let value = 100;

        let fixture = TestProjectionFixture::new(
            Transform::Bucket(10),
            "name",
            NestedField::required(1, "value", Type::Primitive(PrimitiveType::Int)),
        );

        fixture.assert_projection(
            &fixture.binary_predicate(PredicateOperator::Eq, Datum::int(value)),
            Some("name = 6"),
        )?;

        fixture.assert_projection(
            &fixture.binary_predicate(PredicateOperator::NotEq, Datum::int(value)),
            None,
        )?;

        fixture.assert_projection(
            &fixture.binary_predicate(PredicateOperator::LessThan, Datum::int(value)),
            None,
        )?;

        fixture.assert_projection(
            &fixture.binary_predicate(PredicateOperator::LessThanOrEq, Datum::int(value)),
            None,
        )?;

        fixture.assert_projection(
            &fixture.binary_predicate(PredicateOperator::GreaterThan, Datum::int(value)),
            None,
        )?;

        fixture.assert_projection(
            &fixture.binary_predicate(PredicateOperator::GreaterThanOrEq, Datum::int(value)),
            None,
        )?;

        fixture.assert_projection(
            &fixture.set_predicate(PredicateOperator::In, vec![
                Datum::int(value - 1),
                Datum::int(value),
                Datum::int(value + 1),
            ]),
            Some("name IN (8, 7, 6)"),
        )?;

        fixture.assert_projection(
            &fixture.set_predicate(PredicateOperator::NotIn, vec![
                Datum::int(value),
                Datum::int(value + 1),
            ]),
            None,
        )?;

        Ok(())
    }

    #[test]
    fn test_hash() {
        // test int
        assert_eq!(Bucket::hash_int(34), 2017239379);
        // test long
        assert_eq!(Bucket::hash_long(34), 2017239379);
        // test decimal
        assert_eq!(Bucket::hash_decimal(1420), -500754589);
        // test date
        let date = NaiveDate::from_ymd_opt(2017, 11, 16).unwrap();
        assert_eq!(
            Bucket::hash_date(
                date.signed_duration_since(NaiveDate::from_ymd_opt(1970, 1, 1).unwrap())
                    .num_days() as i32
            ),
            -653330422
        );
        // test time
        let time = NaiveTime::from_hms_opt(22, 31, 8).unwrap();
        assert_eq!(
            Bucket::hash_time(
                time.signed_duration_since(NaiveTime::from_hms_opt(0, 0, 0).unwrap())
                    .num_microseconds()
                    .unwrap()
            ),
            -662762989
        );
        // test timestamp
        let timestamp =
            NaiveDateTime::parse_from_str("2017-11-16 22:31:08", "%Y-%m-%d %H:%M:%S").unwrap();
        assert_eq!(
            Bucket::hash_timestamp(
                timestamp
                    .signed_duration_since(
                        NaiveDateTime::parse_from_str("1970-01-01 00:00:00", "%Y-%m-%d %H:%M:%S")
                            .unwrap()
                    )
                    .num_microseconds()
                    .unwrap()
            ),
            -2047944441
        );
        // test timestamp with tz
        let timestamp = DateTime::parse_from_rfc3339("2017-11-16T14:31:08-08:00").unwrap();
        assert_eq!(
            Bucket::hash_timestamp(
                timestamp
                    .signed_duration_since(
                        DateTime::parse_from_rfc3339("1970-01-01T00:00:00-00:00").unwrap()
                    )
                    .num_microseconds()
                    .unwrap()
            ),
            -2047944441
        );
        // test str
        assert_eq!(Bucket::hash_str("iceberg"), 1210000089);
        // test uuid
        assert_eq!(
            Bucket::hash_bytes(
                [
                    0xF7, 0x9C, 0x3E, 0x09, 0x67, 0x7C, 0x4B, 0xBD, 0xA4, 0x79, 0x3F, 0x34, 0x9C,
                    0xB7, 0x85, 0xE7
                ]
                .as_ref()
            ),
            1488055340
        );
        // test fixed and binary
        assert_eq!(
            Bucket::hash_bytes([0x00, 0x01, 0x02, 0x03].as_ref()),
            -188683207
        );
    }

    #[test]
    fn test_hash_decimal_with_negative_value() {
        // Test cases from GitHub issue #1981
        assert_eq!(Bucket::hash_decimal(1), -463810133);
        assert_eq!(Bucket::hash_decimal(-1), -43192051);

        // Boundary values of the one-byte encoding must hash exactly that single byte.
        assert_eq!(Bucket::hash_decimal(0), Bucket::hash_bytes(&[0x00]));
        assert_eq!(Bucket::hash_decimal(127), Bucket::hash_bytes(&[0x7F]));
        assert_eq!(Bucket::hash_decimal(-128), Bucket::hash_bytes(&[0x80]));

        // Test minimum representation is used
        // -1 should hash as [0xFF] not [0xFF, 0xFF, ..., 0xFF]
        // 128 should hash as [0x00, 0x80] not [0x00, 0x00, ..., 0x80]
        assert_eq!(Bucket::hash_decimal(-1), Bucket::hash_bytes(&[0xFF]));
        assert_eq!(Bucket::hash_decimal(128), Bucket::hash_bytes(&[0x00, 0x80]));
        assert_eq!(
            Bucket::hash_decimal(-129),
            Bucket::hash_bytes(&[0xFF, 0x7F])
        );

        // The extremes of i128 need all 16 bytes.
        assert_eq!(
            Bucket::hash_decimal(i128::MAX),
            Bucket::hash_bytes(&i128::MAX.to_be_bytes())
        );
        assert_eq!(
            Bucket::hash_decimal(i128::MIN),
            Bucket::hash_bytes(&i128::MIN.to_be_bytes())
        );
    }

    #[test]
    fn test_int_literal() {
        let bucket = Bucket::new(10);
        assert_eq!(
            bucket.transform_literal(&Datum::int(34)).unwrap().unwrap(),
            Datum::int(9)
        );
    }

    #[test]
    fn test_long_literal() {
        let bucket = Bucket::new(10);
        assert_eq!(
            bucket.transform_literal(&Datum::long(34)).unwrap().unwrap(),
            Datum::int(9)
        );
    }

    #[test]
    fn test_decimal_literal() {
        let bucket = Bucket::new(10);
        assert_eq!(
            bucket
                .transform_literal(&Datum::decimal(decimal_new(1420, 0)).unwrap())
                .unwrap()
                .unwrap(),
            Datum::int(9)
        );
    }

    #[test]
    fn test_date_literal() {
        let bucket = Bucket::new(100);
        assert_eq!(
            bucket
                .transform_literal(&Datum::date(17486))
                .unwrap()
                .unwrap(),
            Datum::int(26)
        );
    }

    #[test]
    fn test_time_literal() {
        let bucket = Bucket::new(100);
        assert_eq!(
            bucket
                .transform_literal(&Datum::time_micros(81068000000).unwrap())
                .unwrap()
                .unwrap(),
            Datum::int(59)
        );
    }

    #[test]
    fn test_timestamp_literal() {
        let bucket = Bucket::new(100);
        assert_eq!(
            bucket
                .transform_literal(&Datum::timestamp_micros(1510871468000000))
                .unwrap()
                .unwrap(),
            Datum::int(7)
        );
    }

    #[test]
    fn test_str_literal() {
        let bucket = Bucket::new(100);
        assert_eq!(
            bucket
                .transform_literal(&Datum::string("iceberg"))
                .unwrap()
                .unwrap(),
            Datum::int(89)
        );
    }

    #[test]
    fn test_uuid_literal() {
        let bucket = Bucket::new(100);
        assert_eq!(
            bucket
                .transform_literal(&Datum::uuid(
                    "F79C3E09-677C-4BBD-A479-3F349CB785E7".parse().unwrap()
                ))
                .unwrap()
                .unwrap(),
            Datum::int(40)
        );
    }

    #[test]
    fn test_binary_literal() {
        let bucket = Bucket::new(128);
        assert_eq!(
            bucket
                .transform_literal(&Datum::binary(b"\x00\x01\x02\x03".to_vec()))
                .unwrap()
                .unwrap(),
            Datum::int(57)
        );
    }

    #[test]
    fn test_fixed_literal() {
        let bucket = Bucket::new(128);
        assert_eq!(
            bucket
                .transform_literal(&Datum::fixed(b"foo".to_vec()))
                .unwrap()
                .unwrap(),
            Datum::int(32)
        );
    }

    #[test]
    fn test_timestamptz_literal() {
        let bucket = Bucket::new(100);
        assert_eq!(
            bucket
                .transform_literal(&Datum::timestamptz_micros(1510871468000000))
                .unwrap()
                .unwrap(),
            Datum::int(7)
        );
    }

    #[test]
    fn test_timestamp_ns_literal() {
        let bucket = Bucket::new(100);
        let ns_value = 1510871468000000i64 * 1000;
        assert_eq!(
            bucket
                .transform_literal(&Datum::timestamp_nanos(ns_value))
                .unwrap()
                .unwrap(),
            Datum::int(7)
        );
    }

    #[test]
    fn test_timestamptz_ns_literal() {
        let bucket = Bucket::new(100);
        let ns_value = 1510871468000000i64 * 1000;
        assert_eq!(
            bucket
                .transform_literal(&Datum::timestamptz_nanos(ns_value))
                .unwrap()
                .unwrap(),
            Datum::int(7)
        );
    }

    #[test]
    fn test_transform_timestamp_nanos_and_micros_array_equivalence() {
        let bucket = Bucket::new(100);
        let micros_value = 1510871468000000;
        let nanos_value = micros_value * 1000;

        let micro_array = TimestampMicrosecondArray::from_iter_values(vec![micros_value]);
        let nano_array = TimestampNanosecondArray::from_iter_values(vec![nanos_value]);

        let transformed_micro: ArrayRef = bucket.transform(Arc::new(micro_array)).unwrap();
        let transformed_nano: ArrayRef = bucket.transform(Arc::new(nano_array)).unwrap();

        let micro_result = transformed_micro
            .as_any()
            .downcast_ref::<Int32Array>()
            .unwrap();
        let nano_result = transformed_nano
            .as_any()
            .downcast_ref::<Int32Array>()
            .unwrap();

        assert_eq!(micro_result.value(0), nano_result.value(0));
    }
}