1use std::sync::Arc;
19
20use arrow_array::ArrayRef;
21use arrow_schema::{DataType, TimeUnit};
22
23use super::TransformFunction;
24use crate::spec::{Datum, PrimitiveLiteral, PrimitiveType};
25
/// The Iceberg `bucket[N]` partition transform.
///
/// Source values are hashed with 32-bit Murmur3 and the non-negative part of the hash is
/// reduced modulo the configured bucket count.
#[derive(Debug)]
pub struct Bucket {
    /// Number of buckets (the `N` in `bucket[N]`).
    mod_n: u32,
}

impl Bucket {
    /// Builds a bucket transform that distributes values over `mod_n` buckets.
    pub fn new(mod_n: u32) -> Self {
        Bucket { mod_n }
    }
}
36
37impl Bucket {
38 #[inline]
40 fn hash_bytes(mut v: &[u8]) -> i32 {
41 murmur3::murmur3_32(&mut v, 0).unwrap() as i32
42 }
43
44 #[inline]
45 fn hash_int(v: i32) -> i32 {
46 Self::hash_long(v as i64)
47 }
48
49 #[inline]
50 fn hash_long(v: i64) -> i32 {
51 Self::hash_bytes(v.to_le_bytes().as_slice())
52 }
53
54 #[inline]
56 fn hash_date(v: i32) -> i32 {
57 Self::hash_int(v)
58 }
59
60 #[inline]
62 fn hash_time(v: i64) -> i32 {
63 Self::hash_long(v)
64 }
65
66 #[inline]
68 fn hash_timestamp(v: i64) -> i32 {
69 Self::hash_long(v)
70 }
71
72 #[inline]
73 fn hash_str(s: &str) -> i32 {
74 Self::hash_bytes(s.as_bytes())
75 }
76
77 #[inline]
80 fn hash_decimal(v: i128) -> i32 {
81 if v == 0 {
82 return Self::hash_bytes(&[0]);
83 }
84
85 let bytes = v.to_be_bytes();
86 let start = if v > 0 {
87 bytes
89 .windows(2)
90 .position(|w| w[0] != 0x00 || w[1] & 0x80 != 0)
91 .unwrap_or(15)
92 } else {
93 bytes
95 .windows(2)
96 .position(|w| w[0] != 0xFF || w[1] & 0x80 == 0)
97 .unwrap_or(15)
98 };
99
100 Self::hash_bytes(&bytes[start..])
101 }
102
103 #[inline]
106 fn bucket_n(&self, v: i32) -> i32 {
107 (v & i32::MAX) % (self.mod_n as i32)
108 }
109
110 #[inline]
111 fn bucket_int(&self, v: i32) -> i32 {
112 self.bucket_n(Self::hash_int(v))
113 }
114
115 #[inline]
116 fn bucket_long(&self, v: i64) -> i32 {
117 self.bucket_n(Self::hash_long(v))
118 }
119
120 #[inline]
121 fn bucket_decimal(&self, v: i128) -> i32 {
122 self.bucket_n(Self::hash_decimal(v))
123 }
124
125 #[inline]
126 fn bucket_date(&self, v: i32) -> i32 {
127 self.bucket_n(Self::hash_date(v))
128 }
129
130 #[inline]
131 fn bucket_time(&self, v: i64) -> i32 {
132 self.bucket_n(Self::hash_time(v))
133 }
134
135 #[inline]
136 fn bucket_timestamp(&self, v: i64) -> i32 {
137 self.bucket_n(Self::hash_timestamp(v))
138 }
139
140 #[inline]
141 fn bucket_str(&self, v: &str) -> i32 {
142 self.bucket_n(Self::hash_str(v))
143 }
144
145 #[inline]
146 fn bucket_bytes(&self, v: &[u8]) -> i32 {
147 self.bucket_n(Self::hash_bytes(v))
148 }
149}
150
151impl TransformFunction for Bucket {
152 fn transform(&self, input: ArrayRef) -> crate::Result<ArrayRef> {
153 let res: arrow_array::Int32Array = match input.data_type() {
154 DataType::Int32 => input
155 .as_any()
156 .downcast_ref::<arrow_array::Int32Array>()
157 .unwrap()
158 .unary(|v| self.bucket_int(v)),
159 DataType::Int64 => input
160 .as_any()
161 .downcast_ref::<arrow_array::Int64Array>()
162 .unwrap()
163 .unary(|v| self.bucket_long(v)),
164 DataType::Decimal128(_, _) => input
165 .as_any()
166 .downcast_ref::<arrow_array::Decimal128Array>()
167 .unwrap()
168 .unary(|v| self.bucket_decimal(v)),
169 DataType::Date32 => input
170 .as_any()
171 .downcast_ref::<arrow_array::Date32Array>()
172 .unwrap()
173 .unary(|v| self.bucket_date(v)),
174 DataType::Time64(TimeUnit::Microsecond) => input
175 .as_any()
176 .downcast_ref::<arrow_array::Time64MicrosecondArray>()
177 .unwrap()
178 .unary(|v| self.bucket_time(v)),
179 DataType::Timestamp(TimeUnit::Microsecond, _) => input
180 .as_any()
181 .downcast_ref::<arrow_array::TimestampMicrosecondArray>()
182 .unwrap()
183 .unary(|v| self.bucket_timestamp(v)),
184 DataType::Time64(TimeUnit::Nanosecond) => input
185 .as_any()
186 .downcast_ref::<arrow_array::Time64NanosecondArray>()
187 .unwrap()
188 .unary(|v| self.bucket_time(v / 1000)),
189 DataType::Timestamp(TimeUnit::Nanosecond, _) => input
190 .as_any()
191 .downcast_ref::<arrow_array::TimestampNanosecondArray>()
192 .unwrap()
193 .unary(|v| self.bucket_timestamp(v / 1000)),
194 DataType::Utf8 => arrow_array::Int32Array::from_iter(
195 input
196 .as_any()
197 .downcast_ref::<arrow_array::StringArray>()
198 .unwrap()
199 .iter()
200 .map(|v| v.map(|v| self.bucket_str(v))),
201 ),
202 DataType::LargeUtf8 => arrow_array::Int32Array::from_iter(
203 input
204 .as_any()
205 .downcast_ref::<arrow_array::LargeStringArray>()
206 .unwrap()
207 .iter()
208 .map(|v| v.map(|v| self.bucket_str(v))),
209 ),
210 DataType::Binary => arrow_array::Int32Array::from_iter(
211 input
212 .as_any()
213 .downcast_ref::<arrow_array::BinaryArray>()
214 .unwrap()
215 .iter()
216 .map(|v| v.map(|v| self.bucket_bytes(v))),
217 ),
218 DataType::LargeBinary => arrow_array::Int32Array::from_iter(
219 input
220 .as_any()
221 .downcast_ref::<arrow_array::LargeBinaryArray>()
222 .unwrap()
223 .iter()
224 .map(|v| v.map(|v| self.bucket_bytes(v))),
225 ),
226 DataType::FixedSizeBinary(_) => arrow_array::Int32Array::from_iter(
227 input
228 .as_any()
229 .downcast_ref::<arrow_array::FixedSizeBinaryArray>()
230 .unwrap()
231 .iter()
232 .map(|v| v.map(|v| self.bucket_bytes(v))),
233 ),
234 _ => {
235 return Err(crate::Error::new(
236 crate::ErrorKind::FeatureUnsupported,
237 format!(
238 "Unsupported data type for bucket transform: {:?}",
239 input.data_type()
240 ),
241 ));
242 }
243 };
244 Ok(Arc::new(res))
245 }
246
247 fn transform_literal(&self, input: &Datum) -> crate::Result<Option<Datum>> {
248 let val = match (input.data_type(), input.literal()) {
249 (PrimitiveType::Int, PrimitiveLiteral::Int(v)) => self.bucket_int(*v),
250 (PrimitiveType::Long, PrimitiveLiteral::Long(v)) => self.bucket_long(*v),
251 (PrimitiveType::Decimal { .. }, PrimitiveLiteral::Int128(v)) => self.bucket_decimal(*v),
252 (PrimitiveType::Date, PrimitiveLiteral::Int(v)) => self.bucket_date(*v),
253 (PrimitiveType::Time, PrimitiveLiteral::Long(v)) => self.bucket_time(*v),
254 (PrimitiveType::Timestamp, PrimitiveLiteral::Long(v)) => self.bucket_timestamp(*v),
255 (PrimitiveType::Timestamptz, PrimitiveLiteral::Long(v)) => self.bucket_timestamp(*v),
256 (PrimitiveType::TimestampNs, PrimitiveLiteral::Long(v)) => {
257 self.bucket_timestamp(*v / 1000)
258 }
259 (PrimitiveType::TimestamptzNs, PrimitiveLiteral::Long(v)) => {
260 self.bucket_timestamp(*v / 1000)
261 }
262 (PrimitiveType::String, PrimitiveLiteral::String(v)) => self.bucket_str(v.as_str()),
263 (PrimitiveType::Uuid, PrimitiveLiteral::UInt128(v)) => {
264 self.bucket_bytes(uuid::Uuid::from_u128(*v).as_ref())
265 }
266 (PrimitiveType::Binary, PrimitiveLiteral::Binary(v)) => self.bucket_bytes(v.as_ref()),
267 (PrimitiveType::Fixed(_), PrimitiveLiteral::Binary(v)) => self.bucket_bytes(v.as_ref()),
268 _ => {
269 return Err(crate::Error::new(
270 crate::ErrorKind::FeatureUnsupported,
271 format!(
272 "Unsupported data type for bucket transform: {:?}",
273 input.data_type()
274 ),
275 ));
276 }
277 };
278 Ok(Some(Datum::int(val)))
279 }
280}
281
#[cfg(test)]
mod test {
    use std::sync::Arc;

    use arrow_array::{ArrayRef, Int32Array, TimestampMicrosecondArray, TimestampNanosecondArray};
    use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime};

    use super::Bucket;
    use crate::Result;
    use crate::expr::PredicateOperator;
    use crate::spec::PrimitiveType::{
        Binary, Date, Decimal, Fixed, Int, Long, String as StringType, Time, Timestamp,
        TimestampNs, Timestamptz, TimestamptzNs, Uuid,
    };
    use crate::spec::Type::{Primitive, Struct};
    use crate::spec::decimal_utils::decimal_new;
    use crate::spec::{Datum, NestedField, PrimitiveType, StructType, Transform, Type};
    use crate::transform::TransformFunction;
    use crate::transform::test::{TestProjectionFixture, TestTransformFixture};

    /// Checks naming, order-satisfaction and result types of `bucket[8]`.
    #[test]
    fn test_bucket_transform() {
        let trans = Transform::Bucket(8);

        let fixture = TestTransformFixture {
            display: "bucket[8]".to_string(),
            json: r#""bucket[8]""#.to_string(),
            dedup_name: "bucket[8]".to_string(),
            preserves_order: false,
            satisfies_order_of: vec![
                (Transform::Bucket(8), true),
                (Transform::Bucket(4), false),
                (Transform::Void, false),
                (Transform::Day, false),
            ],
            trans_types: vec![
                (Primitive(Binary), Some(Primitive(Int))),
                (Primitive(Date), Some(Primitive(Int))),
                (
                    Primitive(Decimal {
                        precision: 8,
                        scale: 5,
                    }),
                    Some(Primitive(Int)),
                ),
                (Primitive(Fixed(8)), Some(Primitive(Int))),
                (Primitive(Int), Some(Primitive(Int))),
                (Primitive(Long), Some(Primitive(Int))),
                (Primitive(StringType), Some(Primitive(Int))),
                (Primitive(Uuid), Some(Primitive(Int))),
                (Primitive(Time), Some(Primitive(Int))),
                (Primitive(Timestamp), Some(Primitive(Int))),
                (Primitive(Timestamptz), Some(Primitive(Int))),
                (Primitive(TimestampNs), Some(Primitive(Int))),
                (Primitive(TimestamptzNs), Some(Primitive(Int))),
                (
                    Struct(StructType::new(vec![
                        NestedField::optional(1, "a", Primitive(Timestamp)).into(),
                    ])),
                    None,
                ),
            ],
        };

        fixture.assert_transform(trans);
    }

    /// Only equality and IN predicates on a UUID column project onto bucket ids.
    #[test]
    fn test_projection_bucket_uuid() -> Result<()> {
        let value = uuid::Uuid::from_u64_pair(123, 456);
        let another = uuid::Uuid::from_u64_pair(456, 123);

        let fixture = TestProjectionFixture::new(
            Transform::Bucket(10),
            "name",
            NestedField::required(1, "value", Type::Primitive(PrimitiveType::Uuid)),
        );

        fixture.assert_projection(
            &fixture.binary_predicate(PredicateOperator::Eq, Datum::uuid(value)),
            Some("name = 4"),
        )?;

        fixture.assert_projection(
            &fixture.binary_predicate(PredicateOperator::NotEq, Datum::uuid(value)),
            None,
        )?;

        fixture.assert_projection(
            &fixture.binary_predicate(PredicateOperator::LessThan, Datum::uuid(value)),
            None,
        )?;

        fixture.assert_projection(
            &fixture.binary_predicate(PredicateOperator::LessThanOrEq, Datum::uuid(value)),
            None,
        )?;

        fixture.assert_projection(
            &fixture.binary_predicate(PredicateOperator::GreaterThan, Datum::uuid(value)),
            None,
        )?;

        fixture.assert_projection(
            &fixture.binary_predicate(PredicateOperator::GreaterThanOrEq, Datum::uuid(value)),
            None,
        )?;

        fixture.assert_projection(
            &fixture.set_predicate(PredicateOperator::In, vec![
                Datum::uuid(value),
                Datum::uuid(another),
            ]),
            Some("name IN (4, 6)"),
        )?;

        fixture.assert_projection(
            &fixture.set_predicate(PredicateOperator::NotIn, vec![
                Datum::uuid(value),
                Datum::uuid(another),
            ]),
            None,
        )?;

        Ok(())
    }

    /// Only equality and IN predicates on a fixed column project onto bucket ids.
    #[test]
    fn test_projection_bucket_fixed() -> Result<()> {
        let value = "abcdefg".as_bytes().to_vec();
        let another = "abcdehij".as_bytes().to_vec();

        let fixture = TestProjectionFixture::new(
            Transform::Bucket(10),
            "name",
            NestedField::required(
                1,
                "value",
                Type::Primitive(PrimitiveType::Fixed(value.len() as u64)),
            ),
        );

        fixture.assert_projection(
            &fixture.binary_predicate(PredicateOperator::Eq, Datum::fixed(value.clone())),
            Some("name = 4"),
        )?;

        fixture.assert_projection(
            &fixture.binary_predicate(PredicateOperator::NotEq, Datum::fixed(value.clone())),
            None,
        )?;

        fixture.assert_projection(
            &fixture.binary_predicate(PredicateOperator::LessThan, Datum::fixed(value.clone())),
            None,
        )?;

        fixture.assert_projection(
            &fixture.binary_predicate(PredicateOperator::LessThanOrEq, Datum::fixed(value.clone())),
            None,
        )?;

        fixture.assert_projection(
            &fixture.binary_predicate(PredicateOperator::GreaterThan, Datum::fixed(value.clone())),
            None,
        )?;

        fixture.assert_projection(
            &fixture.binary_predicate(
                PredicateOperator::GreaterThanOrEq,
                Datum::fixed(value.clone()),
            ),
            None,
        )?;

        fixture.assert_projection(
            &fixture.set_predicate(PredicateOperator::In, vec![
                Datum::fixed(value.clone()),
                Datum::fixed(another.clone()),
            ]),
            Some("name IN (4, 6)"),
        )?;

        fixture.assert_projection(
            &fixture.set_predicate(PredicateOperator::NotIn, vec![
                Datum::fixed(value.clone()),
                Datum::fixed(another.clone()),
            ]),
            None,
        )?;

        Ok(())
    }

    /// Only equality and IN predicates on a string column project onto bucket ids.
    #[test]
    fn test_projection_bucket_string() -> Result<()> {
        let value = "abcdefg";
        let another = "abcdefgabc";

        let fixture = TestProjectionFixture::new(
            Transform::Bucket(10),
            "name",
            NestedField::required(1, "value", Type::Primitive(PrimitiveType::String)),
        );

        fixture.assert_projection(
            &fixture.binary_predicate(PredicateOperator::Eq, Datum::string(value)),
            Some("name = 4"),
        )?;

        fixture.assert_projection(
            &fixture.binary_predicate(PredicateOperator::NotEq, Datum::string(value)),
            None,
        )?;

        fixture.assert_projection(
            &fixture.binary_predicate(PredicateOperator::LessThan, Datum::string(value)),
            None,
        )?;

        fixture.assert_projection(
            &fixture.binary_predicate(PredicateOperator::LessThanOrEq, Datum::string(value)),
            None,
        )?;

        fixture.assert_projection(
            &fixture.binary_predicate(PredicateOperator::GreaterThan, Datum::string(value)),
            None,
        )?;

        fixture.assert_projection(
            &fixture.binary_predicate(PredicateOperator::GreaterThanOrEq, Datum::string(value)),
            None,
        )?;

        fixture.assert_projection(
            &fixture.set_predicate(PredicateOperator::In, vec![
                Datum::string(value),
                Datum::string(another),
            ]),
            Some("name IN (9, 4)"),
        )?;

        fixture.assert_projection(
            &fixture.set_predicate(PredicateOperator::NotIn, vec![
                Datum::string(value),
                Datum::string(another),
            ]),
            None,
        )?;

        Ok(())
    }

    /// Only equality and IN predicates on a decimal column project onto bucket ids.
    #[test]
    fn test_projection_bucket_decimal() -> Result<()> {
        let prev = "99.00";
        let curr = "100.00";
        let next = "101.00";

        let fixture = TestProjectionFixture::new(
            Transform::Bucket(10),
            "name",
            NestedField::required(
                1,
                "value",
                Type::Primitive(PrimitiveType::Decimal {
                    precision: 9,
                    scale: 2,
                }),
            ),
        );

        fixture.assert_projection(
            &fixture.binary_predicate(PredicateOperator::Eq, Datum::decimal_from_str(curr)?),
            Some("name = 2"),
        )?;

        fixture.assert_projection(
            &fixture.binary_predicate(PredicateOperator::NotEq, Datum::decimal_from_str(curr)?),
            None,
        )?;

        fixture.assert_projection(
            &fixture.binary_predicate(PredicateOperator::LessThan, Datum::decimal_from_str(curr)?),
            None,
        )?;

        fixture.assert_projection(
            &fixture.binary_predicate(
                PredicateOperator::LessThanOrEq,
                Datum::decimal_from_str(curr)?,
            ),
            None,
        )?;

        fixture.assert_projection(
            &fixture.binary_predicate(
                PredicateOperator::GreaterThan,
                Datum::decimal_from_str(curr)?,
            ),
            None,
        )?;

        fixture.assert_projection(
            &fixture.binary_predicate(
                PredicateOperator::GreaterThanOrEq,
                Datum::decimal_from_str(curr)?,
            ),
            None,
        )?;

        fixture.assert_projection(
            &fixture.set_predicate(PredicateOperator::In, vec![
                Datum::decimal_from_str(next)?,
                Datum::decimal_from_str(curr)?,
                Datum::decimal_from_str(prev)?,
            ]),
            Some("name IN (2, 6)"),
        )?;

        fixture.assert_projection(
            &fixture.set_predicate(PredicateOperator::NotIn, vec![
                Datum::decimal_from_str(curr)?,
                Datum::decimal_from_str(next)?,
            ]),
            None,
        )?;

        Ok(())
    }

    /// Only equality and IN predicates on a long column project onto bucket ids.
    #[test]
    fn test_projection_bucket_long() -> Result<()> {
        let value = 100;
        let fixture = TestProjectionFixture::new(
            Transform::Bucket(10),
            "name",
            NestedField::required(1, "value", Type::Primitive(PrimitiveType::Long)),
        );

        fixture.assert_projection(
            &fixture.binary_predicate(PredicateOperator::Eq, Datum::long(value)),
            Some("name = 6"),
        )?;

        fixture.assert_projection(
            &fixture.binary_predicate(PredicateOperator::NotEq, Datum::long(value)),
            None,
        )?;

        fixture.assert_projection(
            &fixture.binary_predicate(PredicateOperator::LessThan, Datum::long(value)),
            None,
        )?;

        fixture.assert_projection(
            &fixture.binary_predicate(PredicateOperator::LessThanOrEq, Datum::long(value)),
            None,
        )?;

        fixture.assert_projection(
            &fixture.binary_predicate(PredicateOperator::GreaterThan, Datum::long(value)),
            None,
        )?;

        fixture.assert_projection(
            &fixture.binary_predicate(PredicateOperator::GreaterThanOrEq, Datum::long(value)),
            None,
        )?;

        fixture.assert_projection(
            &fixture.set_predicate(PredicateOperator::In, vec![
                Datum::long(value - 1),
                Datum::long(value),
                Datum::long(value + 1),
            ]),
            Some("name IN (8, 7, 6)"),
        )?;

        fixture.assert_projection(
            &fixture.set_predicate(PredicateOperator::NotIn, vec![
                Datum::long(value),
                Datum::long(value + 1),
            ]),
            None,
        )?;

        Ok(())
    }

    /// Only equality and IN predicates on an int column project onto bucket ids.
    #[test]
    fn test_projection_bucket_integer() -> Result<()> {
        let value = 100;

        let fixture = TestProjectionFixture::new(
            Transform::Bucket(10),
            "name",
            NestedField::required(1, "value", Type::Primitive(PrimitiveType::Int)),
        );

        fixture.assert_projection(
            &fixture.binary_predicate(PredicateOperator::Eq, Datum::int(value)),
            Some("name = 6"),
        )?;

        fixture.assert_projection(
            &fixture.binary_predicate(PredicateOperator::NotEq, Datum::int(value)),
            None,
        )?;

        fixture.assert_projection(
            &fixture.binary_predicate(PredicateOperator::LessThan, Datum::int(value)),
            None,
        )?;

        fixture.assert_projection(
            &fixture.binary_predicate(PredicateOperator::LessThanOrEq, Datum::int(value)),
            None,
        )?;

        fixture.assert_projection(
            &fixture.binary_predicate(PredicateOperator::GreaterThan, Datum::int(value)),
            None,
        )?;

        fixture.assert_projection(
            &fixture.binary_predicate(PredicateOperator::GreaterThanOrEq, Datum::int(value)),
            None,
        )?;

        fixture.assert_projection(
            &fixture.set_predicate(PredicateOperator::In, vec![
                Datum::int(value - 1),
                Datum::int(value),
                Datum::int(value + 1),
            ]),
            Some("name IN (8, 7, 6)"),
        )?;

        fixture.assert_projection(
            &fixture.set_predicate(PredicateOperator::NotIn, vec![
                Datum::int(value),
                Datum::int(value + 1),
            ]),
            None,
        )?;

        Ok(())
    }

    /// Reference hash values per type (these appear to be the Iceberg spec's hash test
    /// vectors).
    #[test]
    fn test_hash() {
        assert_eq!(Bucket::hash_int(34), 2017239379);
        assert_eq!(Bucket::hash_long(34), 2017239379);
        assert_eq!(Bucket::hash_decimal(1420), -500754589);
        let date = NaiveDate::from_ymd_opt(2017, 11, 16).unwrap();
        assert_eq!(
            Bucket::hash_date(
                date.signed_duration_since(NaiveDate::from_ymd_opt(1970, 1, 1).unwrap())
                    .num_days() as i32
            ),
            -653330422
        );
        let time = NaiveTime::from_hms_opt(22, 31, 8).unwrap();
        assert_eq!(
            Bucket::hash_time(
                time.signed_duration_since(NaiveTime::from_hms_opt(0, 0, 0).unwrap())
                    .num_microseconds()
                    .unwrap()
            ),
            -662762989
        );
        let timestamp =
            NaiveDateTime::parse_from_str("2017-11-16 22:31:08", "%Y-%m-%d %H:%M:%S").unwrap();
        assert_eq!(
            Bucket::hash_timestamp(
                timestamp
                    .signed_duration_since(
                        NaiveDateTime::parse_from_str("1970-01-01 00:00:00", "%Y-%m-%d %H:%M:%S")
                            .unwrap()
                    )
                    .num_microseconds()
                    .unwrap()
            ),
            -2047944441
        );
        let timestamp = DateTime::parse_from_rfc3339("2017-11-16T14:31:08-08:00").unwrap();
        assert_eq!(
            Bucket::hash_timestamp(
                timestamp
                    .signed_duration_since(
                        DateTime::parse_from_rfc3339("1970-01-01T00:00:00-00:00").unwrap()
                    )
                    .num_microseconds()
                    .unwrap()
            ),
            -2047944441
        );
        assert_eq!(Bucket::hash_str("iceberg"), 1210000089);
        assert_eq!(
            Bucket::hash_bytes(
                [
                    0xF7, 0x9C, 0x3E, 0x09, 0x67, 0x7C, 0x4B, 0xBD, 0xA4, 0x79, 0x3F, 0x34, 0x9C,
                    0xB7, 0x85, 0xE7
                ]
                .as_ref()
            ),
            1488055340
        );
        assert_eq!(
            Bucket::hash_bytes([0x00, 0x01, 0x02, 0x03].as_ref()),
            -188683207
        );
    }

    /// Decimals must hash their minimal-length big-endian two's complement bytes,
    /// including the sign-extension edge cases around byte boundaries.
    #[test]
    fn test_hash_decimal_with_negative_value() {
        assert_eq!(Bucket::hash_decimal(1), -463810133);
        assert_eq!(Bucket::hash_decimal(-1), -43192051);

        // Single-byte encodings.
        assert_eq!(Bucket::hash_decimal(0), Bucket::hash_bytes(&[0x00]));
        assert_eq!(Bucket::hash_decimal(1), Bucket::hash_bytes(&[0x01]));
        assert_eq!(Bucket::hash_decimal(-1), Bucket::hash_bytes(&[0xFF]));
        assert_eq!(Bucket::hash_decimal(127), Bucket::hash_bytes(&[0x7F]));
        assert_eq!(Bucket::hash_decimal(-128), Bucket::hash_bytes(&[0x80]));

        // A sign byte must be kept when the next byte's high bit disagrees with the sign.
        assert_eq!(Bucket::hash_decimal(128), Bucket::hash_bytes(&[0x00, 0x80]));
        assert_eq!(
            Bucket::hash_decimal(-129),
            Bucket::hash_bytes(&[0xFF, 0x7F])
        );
    }

    #[test]
    fn test_int_literal() {
        let bucket = Bucket::new(10);
        assert_eq!(
            bucket.transform_literal(&Datum::int(34)).unwrap().unwrap(),
            Datum::int(9)
        );
    }

    #[test]
    fn test_long_literal() {
        let bucket = Bucket::new(10);
        assert_eq!(
            bucket.transform_literal(&Datum::long(34)).unwrap().unwrap(),
            Datum::int(9)
        );
    }

    #[test]
    fn test_decimal_literal() {
        let bucket = Bucket::new(10);
        assert_eq!(
            bucket
                .transform_literal(&Datum::decimal(decimal_new(1420, 0)).unwrap())
                .unwrap()
                .unwrap(),
            Datum::int(9)
        );
    }

    #[test]
    fn test_date_literal() {
        let bucket = Bucket::new(100);
        assert_eq!(
            bucket
                .transform_literal(&Datum::date(17486))
                .unwrap()
                .unwrap(),
            Datum::int(26)
        );
    }

    #[test]
    fn test_time_literal() {
        let bucket = Bucket::new(100);
        assert_eq!(
            bucket
                .transform_literal(&Datum::time_micros(81068000000).unwrap())
                .unwrap()
                .unwrap(),
            Datum::int(59)
        );
    }

    #[test]
    fn test_timestamp_literal() {
        let bucket = Bucket::new(100);
        assert_eq!(
            bucket
                .transform_literal(&Datum::timestamp_micros(1510871468000000))
                .unwrap()
                .unwrap(),
            Datum::int(7)
        );
    }

    #[test]
    fn test_str_literal() {
        let bucket = Bucket::new(100);
        assert_eq!(
            bucket
                .transform_literal(&Datum::string("iceberg"))
                .unwrap()
                .unwrap(),
            Datum::int(89)
        );
    }

    #[test]
    fn test_uuid_literal() {
        let bucket = Bucket::new(100);
        assert_eq!(
            bucket
                .transform_literal(&Datum::uuid(
                    "F79C3E09-677C-4BBD-A479-3F349CB785E7".parse().unwrap()
                ))
                .unwrap()
                .unwrap(),
            Datum::int(40)
        );
    }

    #[test]
    fn test_binary_literal() {
        let bucket = Bucket::new(128);
        assert_eq!(
            bucket
                .transform_literal(&Datum::binary(b"\x00\x01\x02\x03".to_vec()))
                .unwrap()
                .unwrap(),
            Datum::int(57)
        );
    }

    #[test]
    fn test_fixed_literal() {
        let bucket = Bucket::new(128);
        assert_eq!(
            bucket
                .transform_literal(&Datum::fixed(b"foo".to_vec()))
                .unwrap()
                .unwrap(),
            Datum::int(32)
        );
    }

    #[test]
    fn test_timestamptz_literal() {
        let bucket = Bucket::new(100);
        assert_eq!(
            bucket
                .transform_literal(&Datum::timestamptz_micros(1510871468000000))
                .unwrap()
                .unwrap(),
            Datum::int(7)
        );
    }

    /// A nanosecond timestamp buckets like the equivalent microsecond timestamp.
    #[test]
    fn test_timestamp_ns_literal() {
        let bucket = Bucket::new(100);
        let ns_value = 1510871468000000i64 * 1000;
        assert_eq!(
            bucket
                .transform_literal(&Datum::timestamp_nanos(ns_value))
                .unwrap()
                .unwrap(),
            Datum::int(7)
        );
    }

    /// A nanosecond timestamptz buckets like the equivalent microsecond timestamptz.
    #[test]
    fn test_timestamptz_ns_literal() {
        let bucket = Bucket::new(100);
        let ns_value = 1510871468000000i64 * 1000;
        assert_eq!(
            bucket
                .transform_literal(&Datum::timestamptz_nanos(ns_value))
                .unwrap()
                .unwrap(),
            Datum::int(7)
        );
    }

    /// The Arrow array path must agree between micro- and nanosecond timestamp arrays.
    #[test]
    fn test_transform_timestamp_nanos_and_micros_array_equivalence() {
        let bucket = Bucket::new(100);
        let micros_value = 1510871468000000;
        let nanos_value = micros_value * 1000;

        let micro_array = TimestampMicrosecondArray::from_iter_values(vec![micros_value]);
        let nano_array = TimestampNanosecondArray::from_iter_values(vec![nanos_value]);

        let transformed_micro: ArrayRef = bucket.transform(Arc::new(micro_array)).unwrap();
        let transformed_nano: ArrayRef = bucket.transform(Arc::new(nano_array)).unwrap();

        let micro_result = transformed_micro
            .as_any()
            .downcast_ref::<Int32Array>()
            .unwrap();
        let nano_result = transformed_nano
            .as_any()
            .downcast_ref::<Int32Array>()
            .unwrap();

        assert_eq!(micro_result.value(0), nano_result.value(0));
    }
}