1mod cache;
21use cache::*;
22mod context;
23use context::*;
24mod task;
25
26use std::sync::Arc;
27
28use arrow_array::RecordBatch;
29use futures::channel::mpsc::{Sender, channel};
30use futures::stream::BoxStream;
31use futures::{SinkExt, StreamExt, TryStreamExt};
32pub use task::*;
33
34use crate::arrow::ArrowReaderBuilder;
35pub use crate::arrow::{ScanMetrics, ScanResult};
36use crate::delete_file_index::DeleteFileIndex;
37use crate::expr::visitors::inclusive_metrics_evaluator::InclusiveMetricsEvaluator;
38use crate::expr::{Bind, BoundPredicate, Predicate};
39use crate::io::FileIO;
40use crate::metadata_columns::{get_metadata_field_id, is_metadata_column_name};
41use crate::runtime::Runtime;
42use crate::spec::{DataContentType, SnapshotRef};
43use crate::table::Table;
44use crate::util::available_parallelism;
45use crate::{Error, ErrorKind, Result};
46
/// A boxed, `'static` stream whose items are `Result<RecordBatch>`.
pub type ArrowRecordBatchStream = BoxStream<'static, Result<RecordBatch>>;
49
/// Builder that collects scan options and produces a [`TableScan`].
pub struct TableScanBuilder<'a> {
    /// Table the scan is built for.
    table: &'a Table,
    /// Names of the selected columns. `None` means every top-level field of the
    /// snapshot schema; `Some(vec![])` means no columns.
    column_names: Option<Vec<String>>,
    /// Snapshot to scan. `None` means the table's current snapshot.
    snapshot_id: Option<i64>,
    /// Optional batch size forwarded to the Arrow reader.
    batch_size: Option<usize>,
    /// Case-sensitivity flag passed on to the plan context.
    case_sensitive: bool,
    /// Optional row filter, stored after `rewrite_not()` has been applied.
    filter: Option<Predicate>,
    /// Concurrency limit forwarded to the Arrow reader for data files.
    concurrency_limit_data_files: usize,
    /// Concurrency limit used when processing manifest entries.
    concurrency_limit_manifest_entries: usize,
    /// Concurrency limit used when fetching manifest files.
    concurrency_limit_manifest_files: usize,
    /// Flag forwarded to the Arrow reader's row-group filtering option.
    row_group_filtering_enabled: bool,
    /// Flag forwarded to the Arrow reader's row-selection option.
    row_selection_enabled: bool,
}
65
66impl<'a> TableScanBuilder<'a> {
67 pub(crate) fn new(table: &'a Table) -> Self {
68 let num_cpus = available_parallelism().get();
69
70 Self {
71 table,
72 column_names: None,
73 snapshot_id: None,
74 batch_size: None,
75 case_sensitive: true,
76 filter: None,
77 concurrency_limit_data_files: num_cpus,
78 concurrency_limit_manifest_entries: num_cpus,
79 concurrency_limit_manifest_files: num_cpus,
80 row_group_filtering_enabled: true,
81 row_selection_enabled: false,
82 }
83 }
84
85 pub fn with_batch_size(mut self, batch_size: Option<usize>) -> Self {
88 self.batch_size = batch_size;
89 self
90 }
91
92 pub fn with_case_sensitive(mut self, case_sensitive: bool) -> Self {
94 self.case_sensitive = case_sensitive;
95 self
96 }
97
98 pub fn with_filter(mut self, predicate: Predicate) -> Self {
100 self.filter = Some(predicate.rewrite_not());
103 self
104 }
105
106 pub fn select_all(mut self) -> Self {
108 self.column_names = None;
109 self
110 }
111
112 pub fn select_empty(mut self) -> Self {
114 self.column_names = Some(vec![]);
115 self
116 }
117
118 pub fn select(mut self, column_names: impl IntoIterator<Item = impl ToString>) -> Self {
120 self.column_names = Some(
121 column_names
122 .into_iter()
123 .map(|item| item.to_string())
124 .collect(),
125 );
126 self
127 }
128
129 pub fn snapshot_id(mut self, snapshot_id: i64) -> Self {
131 self.snapshot_id = Some(snapshot_id);
132 self
133 }
134
135 pub fn with_concurrency_limit(mut self, limit: usize) -> Self {
138 self.concurrency_limit_manifest_files = limit;
139 self.concurrency_limit_manifest_entries = limit;
140 self.concurrency_limit_data_files = limit;
141 self
142 }
143
144 pub fn with_data_file_concurrency_limit(mut self, limit: usize) -> Self {
146 self.concurrency_limit_data_files = limit;
147 self
148 }
149
150 pub fn with_manifest_entry_concurrency_limit(mut self, limit: usize) -> Self {
152 self.concurrency_limit_manifest_entries = limit;
153 self
154 }
155
156 pub fn with_row_group_filtering_enabled(mut self, row_group_filtering_enabled: bool) -> Self {
165 self.row_group_filtering_enabled = row_group_filtering_enabled;
166 self
167 }
168
169 pub fn with_row_selection_enabled(mut self, row_selection_enabled: bool) -> Self {
184 self.row_selection_enabled = row_selection_enabled;
185 self
186 }
187
188 pub fn build(self) -> Result<TableScan> {
190 let snapshot = match self.snapshot_id {
191 Some(snapshot_id) => self
192 .table
193 .metadata()
194 .snapshot_by_id(snapshot_id)
195 .ok_or_else(|| {
196 Error::new(
197 ErrorKind::DataInvalid,
198 format!("Snapshot with id {snapshot_id} not found"),
199 )
200 })?
201 .clone(),
202 None => {
203 let Some(current_snapshot_id) = self.table.metadata().current_snapshot() else {
204 return Ok(TableScan {
205 batch_size: self.batch_size,
206 column_names: self.column_names,
207 file_io: self.table.file_io().clone(),
208 plan_context: None,
209 concurrency_limit_data_files: self.concurrency_limit_data_files,
210 concurrency_limit_manifest_entries: self.concurrency_limit_manifest_entries,
211 concurrency_limit_manifest_files: self.concurrency_limit_manifest_files,
212 row_group_filtering_enabled: self.row_group_filtering_enabled,
213 row_selection_enabled: self.row_selection_enabled,
214 runtime: self.table.runtime().clone(),
215 });
216 };
217 current_snapshot_id.clone()
218 }
219 };
220
221 let schema = snapshot.schema(self.table.metadata())?;
222
223 if let Some(column_names) = self.column_names.as_ref() {
225 for column_name in column_names {
226 if is_metadata_column_name(column_name) {
228 continue;
229 }
230 if schema.field_by_name(column_name).is_none() {
231 return Err(Error::new(
232 ErrorKind::DataInvalid,
233 format!("Column {column_name} not found in table. Schema: {schema}"),
234 ));
235 }
236 }
237 }
238
239 let mut field_ids = vec![];
240 let column_names = self.column_names.clone().unwrap_or_else(|| {
241 schema
242 .as_struct()
243 .fields()
244 .iter()
245 .map(|f| f.name.clone())
246 .collect()
247 });
248
249 for column_name in column_names.iter() {
250 if is_metadata_column_name(column_name) {
252 field_ids.push(get_metadata_field_id(column_name)?);
253 continue;
254 }
255
256 let field_id = schema.field_id_by_name(column_name).ok_or_else(|| {
257 Error::new(
258 ErrorKind::DataInvalid,
259 format!("Column {column_name} not found in table. Schema: {schema}"),
260 )
261 })?;
262
263 schema
264 .as_struct()
265 .field_by_id(field_id)
266 .ok_or_else(|| {
267 Error::new(
268 ErrorKind::FeatureUnsupported,
269 format!(
270 "Column {column_name} is not a direct child of schema but a nested field, which is not supported now. Schema: {schema}"
271 ),
272 )
273 })?;
274
275 field_ids.push(field_id);
276 }
277
278 let snapshot_bound_predicate = if let Some(ref predicates) = self.filter {
279 Some(predicates.bind(schema.clone(), true)?)
280 } else {
281 None
282 };
283
284 let plan_context = PlanContext {
285 snapshot,
286 table_metadata: self.table.metadata_ref(),
287 snapshot_schema: schema,
288 case_sensitive: self.case_sensitive,
289 predicate: self.filter.map(Arc::new),
290 snapshot_bound_predicate: snapshot_bound_predicate.map(Arc::new),
291 object_cache: self.table.object_cache(),
292 field_ids: Arc::new(field_ids),
293 partition_filter_cache: Arc::new(PartitionFilterCache::new()),
294 manifest_evaluator_cache: Arc::new(ManifestEvaluatorCache::new()),
295 expression_evaluator_cache: Arc::new(ExpressionEvaluatorCache::new()),
296 };
297
298 Ok(TableScan {
299 batch_size: self.batch_size,
300 column_names: self.column_names,
301 file_io: self.table.file_io().clone(),
302 plan_context: Some(plan_context),
303 concurrency_limit_data_files: self.concurrency_limit_data_files,
304 concurrency_limit_manifest_entries: self.concurrency_limit_manifest_entries,
305 concurrency_limit_manifest_files: self.concurrency_limit_manifest_files,
306 row_group_filtering_enabled: self.row_group_filtering_enabled,
307 row_selection_enabled: self.row_selection_enabled,
308 runtime: self.table.runtime().clone(),
309 })
310 }
311}
312
/// A configured table scan, produced by [`TableScanBuilder::build`].
#[derive(Debug)]
pub struct TableScan {
    /// Planning state for the selected snapshot. `None` when the table had no
    /// current snapshot at build time; `plan_files` then yields an empty stream.
    plan_context: Option<PlanContext>,
    /// Optional batch size forwarded to the Arrow reader.
    batch_size: Option<usize>,
    /// File IO handed to the Arrow reader.
    file_io: FileIO,
    /// Column names selected on the builder (`None` means all columns).
    column_names: Option<Vec<String>>,
    /// Concurrency limit used when fetching manifest files.
    concurrency_limit_manifest_files: usize,

    /// Concurrency limit used when processing manifest entries.
    concurrency_limit_manifest_entries: usize,

    /// Concurrency limit forwarded to the Arrow reader for data files.
    concurrency_limit_data_files: usize,

    /// Flags forwarded unchanged to the Arrow reader builder.
    row_group_filtering_enabled: bool,
    row_selection_enabled: bool,

    /// Runtime on which planning tasks are spawned.
    runtime: Runtime,
}
340
impl TableScan {
    /// Plans the scan and returns a stream of `Result<FileScanTask>` items.
    ///
    /// Returns an empty stream when the scan has no plan context. Otherwise the
    /// manifest list is loaded and three background tasks are spawned: one that
    /// fetches manifests and streams their entries, one that processes delete
    /// manifest entries, and one that processes data manifest entries. Errors
    /// raised inside those tasks are delivered as `Err` items on the returned
    /// stream.
    ///
    /// # Errors
    ///
    /// Fails directly if the manifest list cannot be loaded or the manifest
    /// file contexts cannot be built.
    pub async fn plan_files(&self) -> Result<FileScanTaskStream> {
        let Some(plan_context) = self.plan_context.as_ref() else {
            return Ok(Box::pin(futures::stream::empty()));
        };

        let concurrency_limit_manifest_files = self.concurrency_limit_manifest_files;
        let concurrency_limit_manifest_entries = self.concurrency_limit_manifest_entries;

        // Bounded channels carrying manifest entry contexts, one for data
        // manifests and one for delete manifests.
        let (manifest_entry_data_ctx_tx, manifest_entry_data_ctx_rx) =
            channel(concurrency_limit_manifest_files);
        let (manifest_entry_delete_ctx_tx, manifest_entry_delete_ctx_rx) =
            channel(concurrency_limit_manifest_files);

        // Bounded channel carrying the resulting file scan tasks to the caller.
        let (file_scan_task_tx, file_scan_task_rx) = channel(concurrency_limit_manifest_entries);

        let (delete_file_idx, delete_file_tx) = DeleteFileIndex::new(self.runtime.clone());

        let manifest_list = plan_context.get_manifest_list().await?;

        let manifest_file_contexts = plan_context.build_manifest_file_contexts(
            manifest_list,
            manifest_entry_data_ctx_tx,
            delete_file_idx.clone(),
            manifest_entry_delete_ctx_tx,
        )?;

        // Each background task gets its own sender clone so it can report a
        // failure on the output stream.
        let mut channel_for_manifest_error = file_scan_task_tx.clone();
        let mut channel_for_data_manifest_entry_error = file_scan_task_tx.clone();
        let mut channel_for_delete_manifest_entry_error = file_scan_task_tx.clone();

        let rt = self.runtime.clone();

        // Task 1 (io handle): fetch manifests concurrently and stream their entries.
        rt.io().spawn(async move {
            let result = futures::stream::iter(manifest_file_contexts)
                .try_for_each_concurrent(concurrency_limit_manifest_files, |ctx| async move {
                    ctx.fetch_manifest_and_stream_manifest_entries().await
                })
                .await;

            // A failed send is ignored on purpose (`let _`).
            if let Err(error) = result {
                let _ = channel_for_manifest_error.send(Err(error)).await;
            }
        });

        {
            let rt = rt.clone();
            let rt_inner = rt.clone();
            // Task 2 (cpu handle): process delete manifest entries, each in its
            // own spawned sub-task.
            rt.cpu().spawn(async move {
                let result = manifest_entry_delete_ctx_rx
                    .map(|me_ctx| Ok((me_ctx, delete_file_tx.clone())))
                    .try_for_each_concurrent(
                        concurrency_limit_manifest_entries,
                        |(manifest_entry_context, tx)| {
                            let rt_inner = rt_inner.clone();
                            async move {
                                // `?` surfaces a failure of the spawned task itself;
                                // the processing result is the block's value.
                                rt_inner
                                    .cpu()
                                    .spawn(async move {
                                        Self::process_delete_manifest_entry(
                                            manifest_entry_context,
                                            tx,
                                        )
                                        .await
                                    })
                                    .await?
                            }
                        },
                    )
                    .await;

                if let Err(error) = result {
                    let _ = channel_for_delete_manifest_entry_error
                        .send(Err(error))
                        .await;
                }
            });
        }

        {
            let rt_inner = rt.clone();
            // Task 3 (cpu handle): process data manifest entries and emit file
            // scan tasks. `file_scan_task_tx` is moved into this task.
            rt.cpu().spawn(async move {
                let result = manifest_entry_data_ctx_rx
                    .map(|me_ctx| Ok((me_ctx, file_scan_task_tx.clone())))
                    .try_for_each_concurrent(
                        concurrency_limit_manifest_entries,
                        |(manifest_entry_context, tx)| {
                            let rt_inner = rt_inner.clone();
                            async move {
                                rt_inner
                                    .cpu()
                                    .spawn(async move {
                                        Self::process_data_manifest_entry(
                                            manifest_entry_context,
                                            tx,
                                        )
                                        .await
                                    })
                                    .await?
                            }
                        },
                    )
                    .await;

                if let Err(error) = result {
                    let _ = channel_for_data_manifest_entry_error.send(Err(error)).await;
                }
            });
        }

        Ok(file_scan_task_rx.boxed())
    }

    /// Plans the scan and returns its result as an [`ArrowRecordBatchStream`].
    ///
    /// Builds an Arrow reader from this scan's file IO, runtime, data-file
    /// concurrency limit, row-group filtering and row-selection flags (plus the
    /// batch size when set), and feeds it the tasks from [`Self::plan_files`].
    pub async fn to_arrow(&self) -> Result<ArrowRecordBatchStream> {
        let mut arrow_reader_builder =
            ArrowReaderBuilder::new(self.file_io.clone(), self.runtime.clone())
                .with_data_file_concurrency_limit(self.concurrency_limit_data_files)
                .with_row_group_filtering_enabled(self.row_group_filtering_enabled)
                .with_row_selection_enabled(self.row_selection_enabled);

        if let Some(batch_size) = self.batch_size {
            arrow_reader_builder = arrow_reader_builder.with_batch_size(batch_size);
        }

        arrow_reader_builder
            .build()
            .read(self.plan_files().await?)
            .map(|result| result.stream())
    }

    /// Returns the selected column names, or `None` when all columns are selected.
    pub fn column_names(&self) -> Option<&[String]> {
        self.column_names.as_deref()
    }

    /// Returns the snapshot held by the plan context, or `None` when the scan
    /// has no plan context.
    pub fn snapshot(&self) -> Option<&SnapshotRef> {
        self.plan_context.as_ref().map(|x| &x.snapshot)
    }

    /// Processes one entry of a data manifest.
    ///
    /// Entries that are not alive are skipped, as are entries rejected by the
    /// partition expression evaluator or the inclusive metrics evaluator when
    /// bound predicates are present. Every surviving entry is converted into a
    /// `FileScanTask` and sent on `file_scan_task_tx`.
    ///
    /// # Errors
    ///
    /// `FeatureUnsupported` if the entry's content type is not `Data`, plus any
    /// evaluator, conversion or channel-send error.
    async fn process_data_manifest_entry(
        manifest_entry_context: ManifestEntryContext,
        mut file_scan_task_tx: Sender<Result<FileScanTask>>,
    ) -> Result<()> {
        // Skip entries that are not alive.
        if !manifest_entry_context.manifest_entry.is_alive() {
            return Ok(());
        }

        // A data manifest must only contain data files.
        if manifest_entry_context.manifest_entry.content_type() != DataContentType::Data {
            return Err(Error::new(
                ErrorKind::FeatureUnsupported,
                "Encountered an entry for a delete file in a data file manifest",
            ));
        }

        if let Some(ref bound_predicates) = manifest_entry_context.bound_predicates {
            let BoundPredicates {
                snapshot_bound_predicate,
                partition_bound_predicate,
            } = bound_predicates.as_ref();

            let expression_evaluator_cache =
                manifest_entry_context.expression_evaluator_cache.as_ref();

            let expression_evaluator = expression_evaluator_cache.get(
                manifest_entry_context.partition_spec_id,
                partition_bound_predicate,
            )?;

            // Skip the file when the partition predicate rejects it.
            if !expression_evaluator.eval(manifest_entry_context.manifest_entry.data_file())? {
                return Ok(());
            }

            // Skip the file when the metrics evaluator rejects it.
            if !InclusiveMetricsEvaluator::eval(
                snapshot_bound_predicate,
                manifest_entry_context.manifest_entry.data_file(),
                false,
            )? {
                return Ok(());
            }
        }

        file_scan_task_tx
            .send(Ok(manifest_entry_context.into_file_scan_task().await?))
            .await?;

        Ok(())
    }

    /// Processes one entry of a delete manifest.
    ///
    /// Entries that are not alive are skipped, as are entries rejected by the
    /// partition expression evaluator when bound predicates are present. Every
    /// surviving entry is sent on `delete_file_ctx_tx` as a `DeleteFileContext`.
    ///
    /// # Errors
    ///
    /// `FeatureUnsupported` if the entry's content type is `Data`, plus any
    /// evaluator or channel-send error.
    async fn process_delete_manifest_entry(
        manifest_entry_context: ManifestEntryContext,
        mut delete_file_ctx_tx: Sender<DeleteFileContext>,
    ) -> Result<()> {
        // Skip entries that are not alive.
        if !manifest_entry_context.manifest_entry.is_alive() {
            return Ok(());
        }

        // A delete manifest must not contain data files.
        if manifest_entry_context.manifest_entry.content_type() == DataContentType::Data {
            return Err(Error::new(
                ErrorKind::FeatureUnsupported,
                "Encountered an entry for a data file in a delete manifest",
            ));
        }

        if let Some(ref bound_predicates) = manifest_entry_context.bound_predicates {
            let expression_evaluator_cache =
                manifest_entry_context.expression_evaluator_cache.as_ref();

            let expression_evaluator = expression_evaluator_cache.get(
                manifest_entry_context.partition_spec_id,
                &bound_predicates.partition_bound_predicate,
            )?;

            // Skip the file when the partition predicate rejects it.
            if !expression_evaluator.eval(manifest_entry_context.manifest_entry.data_file())? {
                return Ok(());
            }
        }

        delete_file_ctx_tx
            .send(DeleteFileContext {
                manifest_entry: manifest_entry_context.manifest_entry.clone(),
                partition_spec_id: manifest_entry_context.partition_spec_id,
            })
            .await?;

        Ok(())
    }
}
591
/// Pair of bound predicates carried by a manifest entry context.
pub(crate) struct BoundPredicates {
    /// Predicate handed to the expression evaluator cache when filtering by partition.
    partition_bound_predicate: BoundPredicate,
    /// Predicate handed to the inclusive metrics evaluator for data files.
    snapshot_bound_predicate: BoundPredicate,
}
596
597#[cfg(test)]
598pub mod tests {
599 #![allow(missing_docs)]
601
602 use std::collections::HashMap;
603 use std::fs;
604 use std::fs::File;
605 use std::sync::Arc;
606
607 use arrow_array::cast::AsArray;
608 use arrow_array::{
609 Array, ArrayRef, BooleanArray, Float64Array, Int32Array, Int64Array, RecordBatch,
610 StringArray,
611 };
612 use futures::{TryStreamExt, stream};
613 use minijinja::value::Value;
614 use minijinja::{AutoEscape, Environment, context};
615 use parquet::arrow::{ArrowWriter, PARQUET_FIELD_ID_META_KEY};
616 use parquet::basic::Compression;
617 use parquet::file::properties::WriterProperties;
618 use tempfile::TempDir;
619 use uuid::Uuid;
620
621 use crate::TableIdent;
622 use crate::arrow::ArrowReaderBuilder;
623 use crate::expr::{BoundPredicate, Reference};
624 use crate::io::{FileIO, OutputFile};
625 use crate::metadata_columns::RESERVED_COL_NAME_FILE;
626 use crate::scan::FileScanTask;
627 use crate::spec::{
628 DataContentType, DataFileBuilder, DataFileFormat, Datum, Literal, ManifestEntry,
629 ManifestListWriter, ManifestStatus, ManifestWriterBuilder, NestedField, PartitionSpec,
630 PrimitiveType, Schema, Struct, StructType, TableMetadata, Type,
631 };
632 use crate::table::Table;
633 use crate::test_utils::test_runtime;
634
635 fn render_template(template: &str, ctx: Value) -> String {
636 let mut env = Environment::new();
637 env.set_auto_escape_callback(|_| AutoEscape::None);
638 env.render_str(template, ctx).unwrap()
639 }
640
    /// Test fixture bundling a [`Table`] with the location string used for its files.
    pub struct TableTestFixture {
        /// Table root path as a string; data and manifest files are written beneath it.
        pub table_location: String,
        /// Table built from the test metadata.
        pub table: Table,
    }
645
646 impl TableTestFixture {
647 #[allow(clippy::new_without_default)]
648 pub fn new() -> Self {
649 let tmp_dir = TempDir::new().unwrap();
650 let table_location = tmp_dir.path().join("table1");
651 let manifest_list1_location = table_location.join("metadata/manifests_list_1.avro");
652 let manifest_list2_location = table_location.join("metadata/manifests_list_2.avro");
653 let table_metadata1_location = table_location.join("metadata/v1.json");
654
655 let file_io = FileIO::new_with_fs();
656
657 let table_metadata = {
658 let template_json_str = fs::read_to_string(format!(
659 "{}/testdata/example_table_metadata_v2.json",
660 env!("CARGO_MANIFEST_DIR")
661 ))
662 .unwrap();
663 let metadata_json = render_template(&template_json_str, context! {
664 table_location => &table_location,
665 manifest_list_1_location => &manifest_list1_location,
666 manifest_list_2_location => &manifest_list2_location,
667 table_metadata_1_location => &table_metadata1_location,
668 });
669 serde_json::from_str::<TableMetadata>(&metadata_json).unwrap()
670 };
671
672 let table = Table::builder()
673 .metadata(table_metadata)
674 .identifier(TableIdent::from_strs(["db", "table1"]).unwrap())
675 .file_io(file_io.clone())
676 .metadata_location(table_metadata1_location.as_os_str().to_str().unwrap())
677 .runtime(test_runtime())
678 .build()
679 .unwrap();
680
681 Self {
682 table_location: table_location.to_str().unwrap().to_string(),
683 table,
684 }
685 }
686
687 #[allow(clippy::new_without_default)]
688 pub fn new_empty() -> Self {
689 let tmp_dir = TempDir::new().unwrap();
690 let table_location = tmp_dir.path().join("table1");
691 let table_metadata1_location = table_location.join("metadata/v1.json");
692
693 let file_io = FileIO::new_with_fs();
694
695 let table_metadata = {
696 let template_json_str = fs::read_to_string(format!(
697 "{}/testdata/example_empty_table_metadata_v2.json",
698 env!("CARGO_MANIFEST_DIR")
699 ))
700 .unwrap();
701 let metadata_json = render_template(&template_json_str, context! {
702 table_location => &table_location,
703 table_metadata_1_location => &table_metadata1_location,
704 });
705 serde_json::from_str::<TableMetadata>(&metadata_json).unwrap()
706 };
707
708 let table = Table::builder()
709 .metadata(table_metadata)
710 .identifier(TableIdent::from_strs(["db", "table1"]).unwrap())
711 .file_io(file_io.clone())
712 .metadata_location(table_metadata1_location.as_os_str().to_str().unwrap())
713 .runtime(test_runtime())
714 .build()
715 .unwrap();
716
717 Self {
718 table_location: table_location.to_str().unwrap().to_string(),
719 table,
720 }
721 }
722
723 pub fn new_with_deep_history() -> Self {
727 let tmp_dir = TempDir::new().unwrap();
728 let table_location = tmp_dir.path().join("table1");
729 let table_metadata1_location = table_location.join("metadata/v1.json");
730
731 let file_io = FileIO::new_with_fs();
732
733 let table_metadata = {
734 let json_str = fs::read_to_string(format!(
735 "{}/testdata/example_table_metadata_v2_deep_history.json",
736 env!("CARGO_MANIFEST_DIR")
737 ))
738 .unwrap();
739 serde_json::from_str::<TableMetadata>(&json_str).unwrap()
740 };
741
742 let table = Table::builder()
743 .metadata(table_metadata)
744 .identifier(TableIdent::from_strs(["db", "table1"]).unwrap())
745 .file_io(file_io.clone())
746 .metadata_location(table_metadata1_location.as_os_str().to_str().unwrap())
747 .runtime(test_runtime())
748 .build()
749 .unwrap();
750
751 Self {
752 table_location: table_location.to_str().unwrap().to_string(),
753 table,
754 }
755 }
756
757 pub fn new_unpartitioned() -> Self {
758 let tmp_dir = TempDir::new().unwrap();
759 let table_location = tmp_dir.path().join("table1");
760 let manifest_list1_location = table_location.join("metadata/manifests_list_1.avro");
761 let manifest_list2_location = table_location.join("metadata/manifests_list_2.avro");
762 let table_metadata1_location = table_location.join("metadata/v1.json");
763
764 let file_io = FileIO::new_with_fs();
765
766 let mut table_metadata = {
767 let template_json_str = fs::read_to_string(format!(
768 "{}/testdata/example_table_metadata_v2.json",
769 env!("CARGO_MANIFEST_DIR")
770 ))
771 .unwrap();
772 let metadata_json = render_template(&template_json_str, context! {
773 table_location => &table_location,
774 manifest_list_1_location => &manifest_list1_location,
775 manifest_list_2_location => &manifest_list2_location,
776 table_metadata_1_location => &table_metadata1_location,
777 });
778 serde_json::from_str::<TableMetadata>(&metadata_json).unwrap()
779 };
780
781 table_metadata.default_spec = Arc::new(PartitionSpec::unpartition_spec());
782 table_metadata.partition_specs.clear();
783 table_metadata.default_partition_type = StructType::new(vec![]);
784 table_metadata
785 .partition_specs
786 .insert(0, table_metadata.default_spec.clone());
787
788 let table = Table::builder()
789 .metadata(table_metadata)
790 .identifier(TableIdent::from_strs(["db", "table1"]).unwrap())
791 .file_io(file_io.clone())
792 .metadata_location(table_metadata1_location.to_str().unwrap())
793 .runtime(test_runtime())
794 .build()
795 .unwrap();
796
797 Self {
798 table_location: table_location.to_str().unwrap().to_string(),
799 table,
800 }
801 }
802
803 fn next_manifest_file(&self) -> OutputFile {
804 self.table
805 .file_io()
806 .new_output(format!(
807 "{}/metadata/manifest_{}.avro",
808 self.table_location,
809 Uuid::new_v4()
810 ))
811 .unwrap()
812 }
813
814 pub async fn setup_manifest_files(&mut self) {
815 let current_snapshot = self.table.metadata().current_snapshot().unwrap();
816 let parent_snapshot = current_snapshot
817 .parent_snapshot(self.table.metadata())
818 .unwrap();
819 let current_schema = current_snapshot.schema(self.table.metadata()).unwrap();
820 let current_partition_spec = self.table.metadata().default_partition_spec();
821
822 let parquet_file_size = self.write_parquet_data_files();
824
825 let mut writer = ManifestWriterBuilder::new(
826 self.next_manifest_file(),
827 Some(current_snapshot.snapshot_id()),
828 None,
829 current_schema.clone(),
830 current_partition_spec.as_ref().clone(),
831 )
832 .build_v2_data();
833 writer
834 .add_entry(
835 ManifestEntry::builder()
836 .status(ManifestStatus::Added)
837 .data_file(
838 DataFileBuilder::default()
839 .partition_spec_id(0)
840 .content(DataContentType::Data)
841 .file_path(format!("{}/1.parquet", &self.table_location))
842 .file_format(DataFileFormat::Parquet)
843 .file_size_in_bytes(parquet_file_size)
844 .record_count(1)
845 .partition(Struct::from_iter([Some(Literal::long(100))]))
846 .key_metadata(None)
847 .build()
848 .unwrap(),
849 )
850 .build(),
851 )
852 .unwrap();
853 writer
854 .add_delete_entry(
855 ManifestEntry::builder()
856 .status(ManifestStatus::Deleted)
857 .snapshot_id(parent_snapshot.snapshot_id())
858 .sequence_number(parent_snapshot.sequence_number())
859 .file_sequence_number(parent_snapshot.sequence_number())
860 .data_file(
861 DataFileBuilder::default()
862 .partition_spec_id(0)
863 .content(DataContentType::Data)
864 .file_path(format!("{}/2.parquet", &self.table_location))
865 .file_format(DataFileFormat::Parquet)
866 .file_size_in_bytes(parquet_file_size)
867 .record_count(1)
868 .partition(Struct::from_iter([Some(Literal::long(200))]))
869 .build()
870 .unwrap(),
871 )
872 .build(),
873 )
874 .unwrap();
875 writer
876 .add_existing_entry(
877 ManifestEntry::builder()
878 .status(ManifestStatus::Existing)
879 .snapshot_id(parent_snapshot.snapshot_id())
880 .sequence_number(parent_snapshot.sequence_number())
881 .file_sequence_number(parent_snapshot.sequence_number())
882 .data_file(
883 DataFileBuilder::default()
884 .partition_spec_id(0)
885 .content(DataContentType::Data)
886 .file_path(format!("{}/3.parquet", &self.table_location))
887 .file_format(DataFileFormat::Parquet)
888 .file_size_in_bytes(parquet_file_size)
889 .record_count(1)
890 .partition(Struct::from_iter([Some(Literal::long(300))]))
891 .build()
892 .unwrap(),
893 )
894 .build(),
895 )
896 .unwrap();
897 let data_file_manifest = writer.write_manifest_file().await.unwrap();
898
899 let manifest_list_writer = self
901 .table
902 .file_io()
903 .new_output(current_snapshot.manifest_list())
904 .unwrap()
905 .writer()
906 .await
907 .unwrap();
908 let mut manifest_list_write = ManifestListWriter::v2(
909 manifest_list_writer,
910 current_snapshot.snapshot_id(),
911 current_snapshot.parent_snapshot_id(),
912 current_snapshot.sequence_number(),
913 );
914 manifest_list_write
915 .add_manifests(vec![data_file_manifest].into_iter())
916 .unwrap();
917 manifest_list_write.close().await.unwrap();
918 }
919
920 fn write_parquet_data_files(&self) -> u64 {
923 std::fs::create_dir_all(&self.table_location).unwrap();
924
925 let schema = {
926 let fields = vec![
927 arrow_schema::Field::new("x", arrow_schema::DataType::Int64, false)
928 .with_metadata(HashMap::from([(
929 PARQUET_FIELD_ID_META_KEY.to_string(),
930 "1".to_string(),
931 )])),
932 arrow_schema::Field::new("y", arrow_schema::DataType::Int64, false)
933 .with_metadata(HashMap::from([(
934 PARQUET_FIELD_ID_META_KEY.to_string(),
935 "2".to_string(),
936 )])),
937 arrow_schema::Field::new("z", arrow_schema::DataType::Int64, false)
938 .with_metadata(HashMap::from([(
939 PARQUET_FIELD_ID_META_KEY.to_string(),
940 "3".to_string(),
941 )])),
942 arrow_schema::Field::new("a", arrow_schema::DataType::Utf8, false)
943 .with_metadata(HashMap::from([(
944 PARQUET_FIELD_ID_META_KEY.to_string(),
945 "4".to_string(),
946 )])),
947 arrow_schema::Field::new("dbl", arrow_schema::DataType::Float64, false)
948 .with_metadata(HashMap::from([(
949 PARQUET_FIELD_ID_META_KEY.to_string(),
950 "5".to_string(),
951 )])),
952 arrow_schema::Field::new("i32", arrow_schema::DataType::Int32, false)
953 .with_metadata(HashMap::from([(
954 PARQUET_FIELD_ID_META_KEY.to_string(),
955 "6".to_string(),
956 )])),
957 arrow_schema::Field::new("i64", arrow_schema::DataType::Int64, false)
958 .with_metadata(HashMap::from([(
959 PARQUET_FIELD_ID_META_KEY.to_string(),
960 "7".to_string(),
961 )])),
962 arrow_schema::Field::new("bool", arrow_schema::DataType::Boolean, false)
963 .with_metadata(HashMap::from([(
964 PARQUET_FIELD_ID_META_KEY.to_string(),
965 "8".to_string(),
966 )])),
967 ];
968 Arc::new(arrow_schema::Schema::new(fields))
969 };
970 let col1 = Arc::new(Int64Array::from_iter_values(vec![1; 1024])) as ArrayRef;
972
973 let mut values = vec![2; 512];
974 values.append(vec![3; 200].as_mut());
975 values.append(vec![4; 300].as_mut());
976 values.append(vec![5; 12].as_mut());
977
978 let col2 = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;
980
981 let mut values = vec![3; 512];
982 values.append(vec![4; 512].as_mut());
983
984 let col3 = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;
986
987 let mut values = vec!["Apache"; 512];
989 values.append(vec!["Iceberg"; 512].as_mut());
990 let col4 = Arc::new(StringArray::from_iter_values(values)) as ArrayRef;
991
992 let mut values = vec![100.0f64; 512];
994 values.append(vec![150.0f64; 12].as_mut());
995 values.append(vec![200.0f64; 500].as_mut());
996 let col5 = Arc::new(Float64Array::from_iter_values(values)) as ArrayRef;
997
998 let mut values = vec![100i32; 512];
1000 values.append(vec![150i32; 12].as_mut());
1001 values.append(vec![200i32; 500].as_mut());
1002 let col6 = Arc::new(Int32Array::from_iter_values(values)) as ArrayRef;
1003
1004 let mut values = vec![100i64; 512];
1006 values.append(vec![150i64; 12].as_mut());
1007 values.append(vec![200i64; 500].as_mut());
1008 let col7 = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;
1009
1010 let mut values = vec![false; 512];
1012 values.append(vec![true; 512].as_mut());
1013 let values: BooleanArray = values.into();
1014 let col8 = Arc::new(values) as ArrayRef;
1015
1016 let to_write = RecordBatch::try_new(schema.clone(), vec![
1017 col1, col2, col3, col4, col5, col6, col7, col8,
1018 ])
1019 .unwrap();
1020
1021 let props = WriterProperties::builder()
1023 .set_compression(Compression::SNAPPY)
1024 .build();
1025
1026 for n in 1..=3 {
1027 let file = File::create(format!("{}/{}.parquet", &self.table_location, n)).unwrap();
1028 let mut writer =
1029 ArrowWriter::try_new(file, to_write.schema(), Some(props.clone())).unwrap();
1030
1031 writer.write(&to_write).expect("Writing batch");
1032
1033 writer.close().unwrap();
1035 }
1036
1037 std::fs::metadata(format!("{}/1.parquet", &self.table_location))
1038 .unwrap()
1039 .len()
1040 }
1041
1042 pub async fn setup_unpartitioned_manifest_files(&mut self) {
1043 let current_snapshot = self.table.metadata().current_snapshot().unwrap();
1044 let parent_snapshot = current_snapshot
1045 .parent_snapshot(self.table.metadata())
1046 .unwrap();
1047 let current_schema = current_snapshot.schema(self.table.metadata()).unwrap();
1048 let current_partition_spec = Arc::new(PartitionSpec::unpartition_spec());
1049
1050 let parquet_file_size = self.write_parquet_data_files();
1052
1053 let mut writer = ManifestWriterBuilder::new(
1055 self.next_manifest_file(),
1056 Some(current_snapshot.snapshot_id()),
1057 None,
1058 current_schema.clone(),
1059 current_partition_spec.as_ref().clone(),
1060 )
1061 .build_v2_data();
1062
1063 let empty_partition = Struct::empty();
1065
1066 writer
1067 .add_entry(
1068 ManifestEntry::builder()
1069 .status(ManifestStatus::Added)
1070 .data_file(
1071 DataFileBuilder::default()
1072 .partition_spec_id(0)
1073 .content(DataContentType::Data)
1074 .file_path(format!("{}/1.parquet", &self.table_location))
1075 .file_format(DataFileFormat::Parquet)
1076 .file_size_in_bytes(parquet_file_size)
1077 .record_count(1)
1078 .partition(empty_partition.clone())
1079 .key_metadata(None)
1080 .build()
1081 .unwrap(),
1082 )
1083 .build(),
1084 )
1085 .unwrap();
1086
1087 writer
1088 .add_delete_entry(
1089 ManifestEntry::builder()
1090 .status(ManifestStatus::Deleted)
1091 .snapshot_id(parent_snapshot.snapshot_id())
1092 .sequence_number(parent_snapshot.sequence_number())
1093 .file_sequence_number(parent_snapshot.sequence_number())
1094 .data_file(
1095 DataFileBuilder::default()
1096 .partition_spec_id(0)
1097 .content(DataContentType::Data)
1098 .file_path(format!("{}/2.parquet", &self.table_location))
1099 .file_format(DataFileFormat::Parquet)
1100 .file_size_in_bytes(parquet_file_size)
1101 .record_count(1)
1102 .partition(empty_partition.clone())
1103 .build()
1104 .unwrap(),
1105 )
1106 .build(),
1107 )
1108 .unwrap();
1109
1110 writer
1111 .add_existing_entry(
1112 ManifestEntry::builder()
1113 .status(ManifestStatus::Existing)
1114 .snapshot_id(parent_snapshot.snapshot_id())
1115 .sequence_number(parent_snapshot.sequence_number())
1116 .file_sequence_number(parent_snapshot.sequence_number())
1117 .data_file(
1118 DataFileBuilder::default()
1119 .partition_spec_id(0)
1120 .content(DataContentType::Data)
1121 .file_path(format!("{}/3.parquet", &self.table_location))
1122 .file_format(DataFileFormat::Parquet)
1123 .file_size_in_bytes(parquet_file_size)
1124 .record_count(1)
1125 .partition(empty_partition.clone())
1126 .build()
1127 .unwrap(),
1128 )
1129 .build(),
1130 )
1131 .unwrap();
1132
1133 let data_file_manifest = writer.write_manifest_file().await.unwrap();
1134
1135 let manifest_list_writer = self
1137 .table
1138 .file_io()
1139 .new_output(current_snapshot.manifest_list())
1140 .unwrap()
1141 .writer()
1142 .await
1143 .unwrap();
1144 let mut manifest_list_write = ManifestListWriter::v2(
1145 manifest_list_writer,
1146 current_snapshot.snapshot_id(),
1147 current_snapshot.parent_snapshot_id(),
1148 current_snapshot.sequence_number(),
1149 );
1150 manifest_list_write
1151 .add_manifests(vec![data_file_manifest].into_iter())
1152 .unwrap();
1153 manifest_list_write.close().await.unwrap();
1154 }
1155
1156 pub async fn setup_deadlock_manifests(&mut self) {
1157 let current_snapshot = self.table.metadata().current_snapshot().unwrap();
1158 let _parent_snapshot = current_snapshot
1159 .parent_snapshot(self.table.metadata())
1160 .unwrap();
1161 let current_schema = current_snapshot.schema(self.table.metadata()).unwrap();
1162 let current_partition_spec = self.table.metadata().default_partition_spec();
1163
1164 let mut writer = ManifestWriterBuilder::new(
1166 self.next_manifest_file(),
1167 Some(current_snapshot.snapshot_id()),
1168 None,
1169 current_schema.clone(),
1170 current_partition_spec.as_ref().clone(),
1171 )
1172 .build_v2_data();
1173
1174 for i in 0..10 {
1176 writer
1177 .add_entry(
1178 ManifestEntry::builder()
1179 .status(ManifestStatus::Added)
1180 .data_file(
1181 DataFileBuilder::default()
1182 .partition_spec_id(0)
1183 .content(DataContentType::Data)
1184 .file_path(format!("{}/{}.parquet", &self.table_location, i))
1185 .file_format(DataFileFormat::Parquet)
1186 .file_size_in_bytes(100)
1187 .record_count(1)
1188 .partition(Struct::from_iter([Some(Literal::long(100))]))
1189 .key_metadata(None)
1190 .build()
1191 .unwrap(),
1192 )
1193 .build(),
1194 )
1195 .unwrap();
1196 }
1197 let data_manifest = writer.write_manifest_file().await.unwrap();
1198
1199 let mut writer = ManifestWriterBuilder::new(
1201 self.next_manifest_file(),
1202 Some(current_snapshot.snapshot_id()),
1203 None,
1204 current_schema.clone(),
1205 current_partition_spec.as_ref().clone(),
1206 )
1207 .build_v2_deletes();
1208
1209 writer
1210 .add_entry(
1211 ManifestEntry::builder()
1212 .status(ManifestStatus::Added)
1213 .data_file(
1214 DataFileBuilder::default()
1215 .partition_spec_id(0)
1216 .content(DataContentType::PositionDeletes)
1217 .file_path(format!("{}/del.parquet", &self.table_location))
1218 .file_format(DataFileFormat::Parquet)
1219 .file_size_in_bytes(100)
1220 .record_count(1)
1221 .partition(Struct::from_iter([Some(Literal::long(100))]))
1222 .build()
1223 .unwrap(),
1224 )
1225 .build(),
1226 )
1227 .unwrap();
1228 let delete_manifest = writer.write_manifest_file().await.unwrap();
1229
1230 let manifest_list_writer = self
1233 .table
1234 .file_io()
1235 .new_output(current_snapshot.manifest_list())
1236 .unwrap()
1237 .writer()
1238 .await
1239 .unwrap();
1240 let mut manifest_list_write = ManifestListWriter::v2(
1241 manifest_list_writer,
1242 current_snapshot.snapshot_id(),
1243 current_snapshot.parent_snapshot_id(),
1244 current_snapshot.sequence_number(),
1245 );
1246 manifest_list_write
1247 .add_manifests(vec![data_manifest, delete_manifest].into_iter())
1248 .unwrap();
1249 manifest_list_write.close().await.unwrap();
1250 }
1251 }
1252
1253 #[tokio::test]
1254 async fn test_table_scan_columns() {
1255 let table = TableTestFixture::new().table;
1256
1257 let table_scan = table.scan().select(["x", "y"]).build().unwrap();
1258 assert_eq!(
1259 Some(vec!["x".to_string(), "y".to_string()]),
1260 table_scan.column_names
1261 );
1262
1263 let table_scan = table
1264 .scan()
1265 .select(["x", "y"])
1266 .select(["z"])
1267 .build()
1268 .unwrap();
1269 assert_eq!(Some(vec!["z".to_string()]), table_scan.column_names);
1270 }
1271
1272 #[tokio::test]
1273 async fn test_select_all() {
1274 let table = TableTestFixture::new().table;
1275
1276 let table_scan = table.scan().select_all().build().unwrap();
1277 assert!(table_scan.column_names.is_none());
1278 }
1279
/// Building a scan fails for the selection `x, y, z, a, b`.
#[test]
fn test_select_no_exist_column() {
    let table = TableTestFixture::new().table;

    let build_result = table.scan().select(["x", "y", "z", "a", "b"]).build();

    assert!(build_result.is_err());
}
1287
1288 #[tokio::test]
1289 async fn test_table_scan_default_snapshot_id() {
1290 let table = TableTestFixture::new().table;
1291
1292 let table_scan = table.scan().build().unwrap();
1293 assert_eq!(
1294 table.metadata().current_snapshot().unwrap().snapshot_id(),
1295 table_scan.snapshot().unwrap().snapshot_id()
1296 );
1297 }
1298
/// Building a scan with snapshot id `1024` is rejected.
#[test]
fn test_table_scan_non_exist_snapshot_id() {
    let table = TableTestFixture::new().table;

    let build_result = table.scan().snapshot_id(1024).build();

    assert!(build_result.is_err());
}
1306
1307 #[tokio::test]
1308 async fn test_table_scan_with_snapshot_id() {
1309 let table = TableTestFixture::new().table;
1310
1311 let table_scan = table
1312 .scan()
1313 .snapshot_id(3051729675574597004)
1314 .with_row_selection_enabled(true)
1315 .build()
1316 .unwrap();
1317 assert_eq!(
1318 table_scan.snapshot().unwrap().snapshot_id(),
1319 3051729675574597004
1320 );
1321 }
1322
1323 #[tokio::test]
1324 async fn test_plan_files_on_table_without_any_snapshots() {
1325 let table = TableTestFixture::new_empty().table;
1326 let batch_stream = table.scan().build().unwrap().to_arrow().await.unwrap();
1327 let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1328 assert!(batches.is_empty());
1329 }
1330
1331 #[tokio::test]
1332 async fn test_plan_files_no_deletions() {
1333 let mut fixture = TableTestFixture::new();
1334 fixture.setup_manifest_files().await;
1335
1336 let table_scan = fixture
1338 .table
1339 .scan()
1340 .with_row_selection_enabled(true)
1341 .build()
1342 .unwrap();
1343
1344 let mut tasks = table_scan
1345 .plan_files()
1346 .await
1347 .unwrap()
1348 .try_fold(vec![], |mut acc, task| async move {
1349 acc.push(task);
1350 Ok(acc)
1351 })
1352 .await
1353 .unwrap();
1354
1355 assert_eq!(tasks.len(), 2);
1356
1357 tasks.sort_by_key(|t| t.data_file_path.to_string());
1358
1359 assert_eq!(
1361 tasks[0].data_file_path,
1362 format!("{}/1.parquet", &fixture.table_location)
1363 );
1364
1365 assert_eq!(
1367 tasks[1].data_file_path,
1368 format!("{}/3.parquet", &fixture.table_location)
1369 );
1370 }
1371
1372 #[tokio::test]
1373 async fn test_open_parquet_no_deletions() {
1374 let mut fixture = TableTestFixture::new();
1375 fixture.setup_manifest_files().await;
1376
1377 let table_scan = fixture
1379 .table
1380 .scan()
1381 .with_row_selection_enabled(true)
1382 .build()
1383 .unwrap();
1384
1385 let batch_stream = table_scan.to_arrow().await.unwrap();
1386
1387 let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1388
1389 let col = batches[0].column_by_name("x").unwrap();
1390
1391 let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
1392 assert_eq!(int64_arr.value(0), 1);
1393 }
1394
1395 #[tokio::test]
1396 async fn test_open_parquet_no_deletions_by_separate_reader() {
1397 let mut fixture = TableTestFixture::new();
1398 fixture.setup_manifest_files().await;
1399
1400 let table_scan = fixture
1402 .table
1403 .scan()
1404 .with_row_selection_enabled(true)
1405 .build()
1406 .unwrap();
1407
1408 let mut plan_task: Vec<_> = table_scan
1409 .plan_files()
1410 .await
1411 .unwrap()
1412 .try_collect()
1413 .await
1414 .unwrap();
1415 assert_eq!(plan_task.len(), 2);
1416
1417 let reader = ArrowReaderBuilder::new(
1418 fixture.table.file_io().clone(),
1419 fixture.table.runtime().clone(),
1420 )
1421 .build();
1422 let batch_stream = reader
1423 .clone()
1424 .read(Box::pin(stream::iter(vec![Ok(plan_task.remove(0))])))
1425 .unwrap()
1426 .stream();
1427 let batch_1: Vec<_> = batch_stream.try_collect().await.unwrap();
1428
1429 let reader = ArrowReaderBuilder::new(
1430 fixture.table.file_io().clone(),
1431 fixture.table.runtime().clone(),
1432 )
1433 .build();
1434 let batch_stream = reader
1435 .read(Box::pin(stream::iter(vec![Ok(plan_task.remove(0))])))
1436 .unwrap()
1437 .stream();
1438 let batch_2: Vec<_> = batch_stream.try_collect().await.unwrap();
1439
1440 assert_eq!(batch_1, batch_2);
1441 }
1442
1443 #[tokio::test]
1444 async fn test_open_parquet_with_projection() {
1445 let mut fixture = TableTestFixture::new();
1446 fixture.setup_manifest_files().await;
1447
1448 let table_scan = fixture
1450 .table
1451 .scan()
1452 .select(["x", "z"])
1453 .with_row_selection_enabled(true)
1454 .build()
1455 .unwrap();
1456
1457 let batch_stream = table_scan.to_arrow().await.unwrap();
1458
1459 let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1460
1461 assert_eq!(batches[0].num_columns(), 2);
1462
1463 let col1 = batches[0].column_by_name("x").unwrap();
1464 let int64_arr = col1.as_any().downcast_ref::<Int64Array>().unwrap();
1465 assert_eq!(int64_arr.value(0), 1);
1466
1467 let col2 = batches[0].column_by_name("z").unwrap();
1468 let int64_arr = col2.as_any().downcast_ref::<Int64Array>().unwrap();
1469 assert_eq!(int64_arr.value(0), 3);
1470
1471 let table_scan = fixture.table.scan().select_empty().build().unwrap();
1473 let batch_stream = table_scan.to_arrow().await.unwrap();
1474 let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1475
1476 assert_eq!(batches[0].num_columns(), 0);
1477 assert_eq!(batches[0].num_rows(), 1024);
1478 }
1479
1480 #[tokio::test]
1481 async fn test_filter_on_arrow_lt() {
1482 let mut fixture = TableTestFixture::new();
1483 fixture.setup_manifest_files().await;
1484
1485 let mut builder = fixture.table.scan();
1487 let predicate = Reference::new("y").less_than(Datum::long(3));
1488 builder = builder
1489 .with_filter(predicate)
1490 .with_row_selection_enabled(true);
1491 let table_scan = builder.build().unwrap();
1492
1493 let batch_stream = table_scan.to_arrow().await.unwrap();
1494
1495 let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1496
1497 assert_eq!(batches[0].num_rows(), 512);
1498
1499 let col = batches[0].column_by_name("x").unwrap();
1500 let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
1501 assert_eq!(int64_arr.value(0), 1);
1502
1503 let col = batches[0].column_by_name("y").unwrap();
1504 let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
1505 assert_eq!(int64_arr.value(0), 2);
1506 }
1507
1508 #[tokio::test]
1509 async fn test_filter_on_arrow_gt_eq() {
1510 let mut fixture = TableTestFixture::new();
1511 fixture.setup_manifest_files().await;
1512
1513 let mut builder = fixture.table.scan();
1515 let predicate = Reference::new("y").greater_than_or_equal_to(Datum::long(5));
1516 builder = builder
1517 .with_filter(predicate)
1518 .with_row_selection_enabled(true);
1519 let table_scan = builder.build().unwrap();
1520
1521 let batch_stream = table_scan.to_arrow().await.unwrap();
1522
1523 let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1524
1525 assert_eq!(batches[0].num_rows(), 12);
1526
1527 let col = batches[0].column_by_name("x").unwrap();
1528 let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
1529 assert_eq!(int64_arr.value(0), 1);
1530
1531 let col = batches[0].column_by_name("y").unwrap();
1532 let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
1533 assert_eq!(int64_arr.value(0), 5);
1534 }
1535
1536 #[tokio::test]
1537 async fn test_filter_double_eq() {
1538 let mut fixture = TableTestFixture::new();
1539 fixture.setup_manifest_files().await;
1540
1541 let mut builder = fixture.table.scan();
1543 let predicate = Reference::new("dbl").equal_to(Datum::double(150.0f64));
1544 builder = builder
1545 .with_filter(predicate)
1546 .with_row_selection_enabled(true);
1547 let table_scan = builder.build().unwrap();
1548
1549 let batch_stream = table_scan.to_arrow().await.unwrap();
1550
1551 let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1552
1553 assert_eq!(batches.len(), 2);
1554 assert_eq!(batches[0].num_rows(), 12);
1555
1556 let col = batches[0].column_by_name("dbl").unwrap();
1557 let f64_arr = col.as_any().downcast_ref::<Float64Array>().unwrap();
1558 assert_eq!(f64_arr.value(1), 150.0f64);
1559 }
1560
1561 #[tokio::test]
1562 async fn test_filter_int_eq() {
1563 let mut fixture = TableTestFixture::new();
1564 fixture.setup_manifest_files().await;
1565
1566 let mut builder = fixture.table.scan();
1568 let predicate = Reference::new("i32").equal_to(Datum::int(150i32));
1569 builder = builder
1570 .with_filter(predicate)
1571 .with_row_selection_enabled(true);
1572 let table_scan = builder.build().unwrap();
1573
1574 let batch_stream = table_scan.to_arrow().await.unwrap();
1575
1576 let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1577
1578 assert_eq!(batches.len(), 2);
1579 assert_eq!(batches[0].num_rows(), 12);
1580
1581 let col = batches[0].column_by_name("i32").unwrap();
1582 let i32_arr = col.as_any().downcast_ref::<Int32Array>().unwrap();
1583 assert_eq!(i32_arr.value(1), 150i32);
1584 }
1585
1586 #[tokio::test]
1587 async fn test_filter_long_eq() {
1588 let mut fixture = TableTestFixture::new();
1589 fixture.setup_manifest_files().await;
1590
1591 let mut builder = fixture.table.scan();
1593 let predicate = Reference::new("i64").equal_to(Datum::long(150i64));
1594 builder = builder
1595 .with_filter(predicate)
1596 .with_row_selection_enabled(true);
1597 let table_scan = builder.build().unwrap();
1598
1599 let batch_stream = table_scan.to_arrow().await.unwrap();
1600
1601 let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1602
1603 assert_eq!(batches.len(), 2);
1604 assert_eq!(batches[0].num_rows(), 12);
1605
1606 let col = batches[0].column_by_name("i64").unwrap();
1607 let i64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
1608 assert_eq!(i64_arr.value(1), 150i64);
1609 }
1610
1611 #[tokio::test]
1612 async fn test_filter_bool_eq() {
1613 let mut fixture = TableTestFixture::new();
1614 fixture.setup_manifest_files().await;
1615
1616 let mut builder = fixture.table.scan();
1618 let predicate = Reference::new("bool").equal_to(Datum::bool(true));
1619 builder = builder
1620 .with_filter(predicate)
1621 .with_row_selection_enabled(true);
1622 let table_scan = builder.build().unwrap();
1623
1624 let batch_stream = table_scan.to_arrow().await.unwrap();
1625
1626 let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1627
1628 assert_eq!(batches.len(), 2);
1629 assert_eq!(batches[0].num_rows(), 512);
1630
1631 let col = batches[0].column_by_name("bool").unwrap();
1632 let bool_arr = col.as_any().downcast_ref::<BooleanArray>().unwrap();
1633 assert!(bool_arr.value(1));
1634 }
1635
1636 #[tokio::test]
1637 async fn test_filter_on_arrow_is_null() {
1638 let mut fixture = TableTestFixture::new();
1639 fixture.setup_manifest_files().await;
1640
1641 let mut builder = fixture.table.scan();
1643 let predicate = Reference::new("y").is_null();
1644 builder = builder
1645 .with_filter(predicate)
1646 .with_row_selection_enabled(true);
1647 let table_scan = builder.build().unwrap();
1648
1649 let batch_stream = table_scan.to_arrow().await.unwrap();
1650
1651 let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1652 assert_eq!(batches.len(), 0);
1653 }
1654
1655 #[tokio::test]
1656 async fn test_filter_on_arrow_is_not_null() {
1657 let mut fixture = TableTestFixture::new();
1658 fixture.setup_manifest_files().await;
1659
1660 let mut builder = fixture.table.scan();
1662 let predicate = Reference::new("y").is_not_null();
1663 builder = builder
1664 .with_filter(predicate)
1665 .with_row_selection_enabled(true);
1666 let table_scan = builder.build().unwrap();
1667
1668 let batch_stream = table_scan.to_arrow().await.unwrap();
1669
1670 let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1671 assert_eq!(batches[0].num_rows(), 1024);
1672 }
1673
1674 #[tokio::test]
1675 async fn test_filter_on_arrow_lt_and_gt() {
1676 let mut fixture = TableTestFixture::new();
1677 fixture.setup_manifest_files().await;
1678
1679 let mut builder = fixture.table.scan();
1681 let predicate = Reference::new("y")
1682 .less_than(Datum::long(5))
1683 .and(Reference::new("z").greater_than_or_equal_to(Datum::long(4)));
1684 builder = builder
1685 .with_filter(predicate)
1686 .with_row_selection_enabled(true);
1687 let table_scan = builder.build().unwrap();
1688
1689 let batch_stream = table_scan.to_arrow().await.unwrap();
1690
1691 let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1692 assert_eq!(batches[0].num_rows(), 500);
1693
1694 let col = batches[0].column_by_name("x").unwrap();
1695 let expected_x = Arc::new(Int64Array::from_iter_values(vec![1; 500])) as ArrayRef;
1696 assert_eq!(col, &expected_x);
1697
1698 let col = batches[0].column_by_name("y").unwrap();
1699 let mut values = vec![];
1700 values.append(vec![3; 200].as_mut());
1701 values.append(vec![4; 300].as_mut());
1702 let expected_y = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;
1703 assert_eq!(col, &expected_y);
1704
1705 let col = batches[0].column_by_name("z").unwrap();
1706 let expected_z = Arc::new(Int64Array::from_iter_values(vec![4; 500])) as ArrayRef;
1707 assert_eq!(col, &expected_z);
1708 }
1709
1710 #[tokio::test]
1711 async fn test_filter_on_arrow_lt_or_gt() {
1712 let mut fixture = TableTestFixture::new();
1713 fixture.setup_manifest_files().await;
1714
1715 let mut builder = fixture.table.scan();
1717 let predicate = Reference::new("y")
1718 .less_than(Datum::long(5))
1719 .or(Reference::new("z").greater_than_or_equal_to(Datum::long(4)));
1720 builder = builder
1721 .with_filter(predicate)
1722 .with_row_selection_enabled(true);
1723 let table_scan = builder.build().unwrap();
1724
1725 let batch_stream = table_scan.to_arrow().await.unwrap();
1726
1727 let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1728 assert_eq!(batches[0].num_rows(), 1024);
1729
1730 let col = batches[0].column_by_name("x").unwrap();
1731 let expected_x = Arc::new(Int64Array::from_iter_values(vec![1; 1024])) as ArrayRef;
1732 assert_eq!(col, &expected_x);
1733
1734 let col = batches[0].column_by_name("y").unwrap();
1735 let mut values = vec![2; 512];
1736 values.append(vec![3; 200].as_mut());
1737 values.append(vec![4; 300].as_mut());
1738 values.append(vec![5; 12].as_mut());
1739 let expected_y = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;
1740 assert_eq!(col, &expected_y);
1741
1742 let col = batches[0].column_by_name("z").unwrap();
1743 let mut values = vec![3; 512];
1744 values.append(vec![4; 512].as_mut());
1745 let expected_z = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;
1746 assert_eq!(col, &expected_z);
1747 }
1748
1749 #[tokio::test]
1750 async fn test_filter_on_arrow_startswith() {
1751 let mut fixture = TableTestFixture::new();
1752 fixture.setup_manifest_files().await;
1753
1754 let mut builder = fixture.table.scan();
1756 let predicate = Reference::new("a").starts_with(Datum::string("Ice"));
1757 builder = builder
1758 .with_filter(predicate)
1759 .with_row_selection_enabled(true);
1760 let table_scan = builder.build().unwrap();
1761
1762 let batch_stream = table_scan.to_arrow().await.unwrap();
1763
1764 let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1765
1766 assert_eq!(batches[0].num_rows(), 512);
1767
1768 let col = batches[0].column_by_name("a").unwrap();
1769 let string_arr = col.as_any().downcast_ref::<StringArray>().unwrap();
1770 assert_eq!(string_arr.value(0), "Iceberg");
1771 }
1772
1773 #[tokio::test]
1774 async fn test_filter_on_arrow_not_startswith() {
1775 let mut fixture = TableTestFixture::new();
1776 fixture.setup_manifest_files().await;
1777
1778 let mut builder = fixture.table.scan();
1780 let predicate = Reference::new("a").not_starts_with(Datum::string("Ice"));
1781 builder = builder
1782 .with_filter(predicate)
1783 .with_row_selection_enabled(true);
1784 let table_scan = builder.build().unwrap();
1785
1786 let batch_stream = table_scan.to_arrow().await.unwrap();
1787
1788 let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1789
1790 assert_eq!(batches[0].num_rows(), 512);
1791
1792 let col = batches[0].column_by_name("a").unwrap();
1793 let string_arr = col.as_any().downcast_ref::<StringArray>().unwrap();
1794 assert_eq!(string_arr.value(0), "Apache");
1795 }
1796
1797 #[tokio::test]
1798 async fn test_filter_on_arrow_in() {
1799 let mut fixture = TableTestFixture::new();
1800 fixture.setup_manifest_files().await;
1801
1802 let mut builder = fixture.table.scan();
1804 let predicate =
1805 Reference::new("a").is_in([Datum::string("Sioux"), Datum::string("Iceberg")]);
1806 builder = builder
1807 .with_filter(predicate)
1808 .with_row_selection_enabled(true);
1809 let table_scan = builder.build().unwrap();
1810
1811 let batch_stream = table_scan.to_arrow().await.unwrap();
1812
1813 let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1814
1815 assert_eq!(batches[0].num_rows(), 512);
1816
1817 let col = batches[0].column_by_name("a").unwrap();
1818 let string_arr = col.as_any().downcast_ref::<StringArray>().unwrap();
1819 assert_eq!(string_arr.value(0), "Iceberg");
1820 }
1821
1822 #[tokio::test]
1823 async fn test_filter_on_arrow_not_in() {
1824 let mut fixture = TableTestFixture::new();
1825 fixture.setup_manifest_files().await;
1826
1827 let mut builder = fixture.table.scan();
1829 let predicate =
1830 Reference::new("a").is_not_in([Datum::string("Sioux"), Datum::string("Iceberg")]);
1831 builder = builder
1832 .with_filter(predicate)
1833 .with_row_selection_enabled(true);
1834 let table_scan = builder.build().unwrap();
1835
1836 let batch_stream = table_scan.to_arrow().await.unwrap();
1837
1838 let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1839
1840 assert_eq!(batches[0].num_rows(), 512);
1841
1842 let col = batches[0].column_by_name("a").unwrap();
1843 let string_arr = col.as_any().downcast_ref::<StringArray>().unwrap();
1844 assert_eq!(string_arr.value(0), "Apache");
1845 }
1846
/// A `FileScanTask` survives a JSON round trip, both without and with a bound predicate.
#[test]
fn test_file_scan_task_serialize_deserialize() {
    // Serializes `task` to JSON, parses it back, and compares the fields checked below.
    fn assert_round_trip(task: FileScanTask) {
        let json = serde_json::to_string(&task).unwrap();
        let restored: FileScanTask = serde_json::from_str(&json).unwrap();

        assert_eq!(task.data_file_path, restored.data_file_path);
        assert_eq!(task.start, restored.start);
        assert_eq!(task.length, restored.length);
        assert_eq!(task.project_field_ids, restored.project_field_ids);
        assert_eq!(task.predicate, restored.predicate);
        assert_eq!(task.schema, restored.schema);
    }

    // Single required binary field named `x` with id 1.
    let binary_field = Arc::new(NestedField::required(
        1,
        "x",
        Type::Primitive(PrimitiveType::Binary),
    ));
    let schema = Arc::new(
        Schema::builder()
            .with_fields(vec![binary_field])
            .build()
            .unwrap(),
    );

    // Parquet task with a record count and no predicate.
    let parquet_task = FileScanTask::builder()
        .with_data_file_path("data_file_path".to_string())
        .with_file_size_in_bytes(0)
        .with_start(0)
        .with_length(100)
        .with_project_field_ids(vec![1, 2, 3])
        .with_schema(schema.clone())
        .with_record_count(Some(100))
        .with_data_file_format(DataFileFormat::Parquet)
        .with_case_sensitive(false)
        .build();
    assert_round_trip(parquet_task);

    // Avro task carrying an `AlwaysTrue` predicate.
    let avro_task = FileScanTask::builder()
        .with_data_file_path("data_file_path".to_string())
        .with_file_size_in_bytes(0)
        .with_start(0)
        .with_length(100)
        .with_project_field_ids(vec![1, 2, 3])
        .with_predicate(Some(BoundPredicate::AlwaysTrue))
        .with_schema(schema)
        .with_data_file_format(DataFileFormat::Avro)
        .with_case_sensitive(false)
        .build();
    assert_round_trip(avro_task);
}
1899
1900 #[tokio::test]
1901 async fn test_select_with_file_column() {
1902 use arrow_array::cast::AsArray;
1903
1904 let mut fixture = TableTestFixture::new();
1905 fixture.setup_manifest_files().await;
1906
1907 let table_scan = fixture
1909 .table
1910 .scan()
1911 .select(["x", RESERVED_COL_NAME_FILE])
1912 .with_row_selection_enabled(true)
1913 .build()
1914 .unwrap();
1915
1916 let batch_stream = table_scan.to_arrow().await.unwrap();
1917 let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1918
1919 assert_eq!(batches[0].num_columns(), 2);
1921
1922 let x_col = batches[0].column_by_name("x").unwrap();
1924 let x_arr = x_col.as_primitive::<arrow_array::types::Int64Type>();
1925 assert_eq!(x_arr.value(0), 1);
1926
1927 let file_col = batches[0].column_by_name(RESERVED_COL_NAME_FILE);
1929 assert!(
1930 file_col.is_some(),
1931 "_file column should be present in the batch"
1932 );
1933
1934 let file_col = file_col.unwrap();
1936 assert!(
1937 matches!(
1938 file_col.data_type(),
1939 arrow_schema::DataType::RunEndEncoded(_, _)
1940 ),
1941 "_file column should use RunEndEncoded type"
1942 );
1943
1944 let run_array = file_col
1946 .as_any()
1947 .downcast_ref::<arrow_array::RunArray<arrow_array::types::Int32Type>>()
1948 .expect("_file column should be a RunArray");
1949
1950 let values = run_array.values();
1951 let string_values = values.as_string::<i32>();
1952 assert_eq!(string_values.len(), 1, "Should have a single file path");
1953
1954 let file_path = string_values.value(0);
1955 assert!(
1956 file_path.ends_with(".parquet"),
1957 "File path should end with .parquet, got: {file_path}"
1958 );
1959 }
1960
1961 #[tokio::test]
1962 async fn test_select_file_column_position() {
1963 let mut fixture = TableTestFixture::new();
1964 fixture.setup_manifest_files().await;
1965
1966 let table_scan = fixture
1968 .table
1969 .scan()
1970 .select(["x", RESERVED_COL_NAME_FILE, "z"])
1971 .with_row_selection_enabled(true)
1972 .build()
1973 .unwrap();
1974
1975 let batch_stream = table_scan.to_arrow().await.unwrap();
1976 let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1977
1978 assert_eq!(batches[0].num_columns(), 3);
1979
1980 let schema = batches[0].schema();
1982 assert_eq!(schema.field(0).name(), "x");
1983 assert_eq!(schema.field(1).name(), RESERVED_COL_NAME_FILE);
1984 assert_eq!(schema.field(2).name(), "z");
1985
1986 assert!(batches[0].column_by_name("x").is_some());
1988 assert!(batches[0].column_by_name(RESERVED_COL_NAME_FILE).is_some());
1989 assert!(batches[0].column_by_name("z").is_some());
1990 }
1991
1992 #[tokio::test]
1993 async fn test_select_file_column_only() {
1994 let mut fixture = TableTestFixture::new();
1995 fixture.setup_manifest_files().await;
1996
1997 let table_scan = fixture
1999 .table
2000 .scan()
2001 .select([RESERVED_COL_NAME_FILE])
2002 .with_row_selection_enabled(true)
2003 .build()
2004 .unwrap();
2005
2006 let batch_stream = table_scan.to_arrow().await.unwrap();
2007 let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
2008
2009 assert_eq!(batches[0].num_columns(), 1);
2011
2012 let schema = batches[0].schema();
2014 assert_eq!(schema.field(0).name(), RESERVED_COL_NAME_FILE);
2015
2016 let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
2020 assert_eq!(total_rows, 2048);
2021 }
2022
2023 #[tokio::test]
2024 async fn test_file_column_with_multiple_files() {
2025 use std::collections::HashSet;
2026
2027 let mut fixture = TableTestFixture::new();
2028 fixture.setup_manifest_files().await;
2029
2030 let table_scan = fixture
2032 .table
2033 .scan()
2034 .select(["x", RESERVED_COL_NAME_FILE])
2035 .with_row_selection_enabled(true)
2036 .build()
2037 .unwrap();
2038
2039 let batch_stream = table_scan.to_arrow().await.unwrap();
2040 let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
2041
2042 let mut file_paths = HashSet::new();
2044 for batch in &batches {
2045 let file_col = batch.column_by_name(RESERVED_COL_NAME_FILE).unwrap();
2046 let run_array = file_col
2047 .as_any()
2048 .downcast_ref::<arrow_array::RunArray<arrow_array::types::Int32Type>>()
2049 .expect("_file column should be a RunArray");
2050
2051 let values = run_array.values();
2052 let string_values = values.as_string::<i32>();
2053 for i in 0..string_values.len() {
2054 file_paths.insert(string_values.value(i).to_string());
2055 }
2056 }
2057
2058 assert!(!file_paths.is_empty(), "Should have at least one file path");
2060
2061 for path in &file_paths {
2063 assert!(
2064 path.ends_with(".parquet"),
2065 "All file paths should end with .parquet, got: {path}"
2066 );
2067 }
2068 }
2069
2070 #[tokio::test]
2071 async fn test_file_column_at_start() {
2072 let mut fixture = TableTestFixture::new();
2073 fixture.setup_manifest_files().await;
2074
2075 let table_scan = fixture
2077 .table
2078 .scan()
2079 .select([RESERVED_COL_NAME_FILE, "x", "y"])
2080 .with_row_selection_enabled(true)
2081 .build()
2082 .unwrap();
2083
2084 let batch_stream = table_scan.to_arrow().await.unwrap();
2085 let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
2086
2087 assert_eq!(batches[0].num_columns(), 3);
2088
2089 let schema = batches[0].schema();
2091 assert_eq!(schema.field(0).name(), RESERVED_COL_NAME_FILE);
2092 assert_eq!(schema.field(1).name(), "x");
2093 assert_eq!(schema.field(2).name(), "y");
2094 }
2095
2096 #[tokio::test]
2097 async fn test_file_column_at_end() {
2098 let mut fixture = TableTestFixture::new();
2099 fixture.setup_manifest_files().await;
2100
2101 let table_scan = fixture
2103 .table
2104 .scan()
2105 .select(["x", "y", RESERVED_COL_NAME_FILE])
2106 .with_row_selection_enabled(true)
2107 .build()
2108 .unwrap();
2109
2110 let batch_stream = table_scan.to_arrow().await.unwrap();
2111 let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
2112
2113 assert_eq!(batches[0].num_columns(), 3);
2114
2115 let schema = batches[0].schema();
2117 assert_eq!(schema.field(0).name(), "x");
2118 assert_eq!(schema.field(1).name(), "y");
2119 assert_eq!(schema.field(2).name(), RESERVED_COL_NAME_FILE);
2120 }
2121
2122 #[tokio::test]
2123 async fn test_select_with_repeated_column_names() {
2124 let mut fixture = TableTestFixture::new();
2125 fixture.setup_manifest_files().await;
2126
2127 let table_scan = fixture
2130 .table
2131 .scan()
2132 .select([
2133 "x",
2134 RESERVED_COL_NAME_FILE,
2135 "x", "y",
2137 RESERVED_COL_NAME_FILE, "y", ])
2140 .with_row_selection_enabled(true)
2141 .build()
2142 .unwrap();
2143
2144 let batch_stream = table_scan.to_arrow().await.unwrap();
2145 let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
2146
2147 assert_eq!(
2149 batches[0].num_columns(),
2150 6,
2151 "Should have exactly 6 columns with duplicates"
2152 );
2153
2154 let schema = batches[0].schema();
2155
2156 assert_eq!(schema.field(0).name(), "x", "Column 0 should be x");
2158 assert_eq!(
2159 schema.field(1).name(),
2160 RESERVED_COL_NAME_FILE,
2161 "Column 1 should be _file"
2162 );
2163 assert_eq!(
2164 schema.field(2).name(),
2165 "x",
2166 "Column 2 should be x (duplicate)"
2167 );
2168 assert_eq!(schema.field(3).name(), "y", "Column 3 should be y");
2169 assert_eq!(
2170 schema.field(4).name(),
2171 RESERVED_COL_NAME_FILE,
2172 "Column 4 should be _file (duplicate)"
2173 );
2174 assert_eq!(
2175 schema.field(5).name(),
2176 "y",
2177 "Column 5 should be y (duplicate)"
2178 );
2179
2180 assert!(
2182 matches!(schema.field(0).data_type(), arrow_schema::DataType::Int64),
2183 "Column x should be Int64"
2184 );
2185 assert!(
2186 matches!(schema.field(2).data_type(), arrow_schema::DataType::Int64),
2187 "Column x (duplicate) should be Int64"
2188 );
2189 assert!(
2190 matches!(schema.field(3).data_type(), arrow_schema::DataType::Int64),
2191 "Column y should be Int64"
2192 );
2193 assert!(
2194 matches!(schema.field(5).data_type(), arrow_schema::DataType::Int64),
2195 "Column y (duplicate) should be Int64"
2196 );
2197 assert!(
2198 matches!(
2199 schema.field(1).data_type(),
2200 arrow_schema::DataType::RunEndEncoded(_, _)
2201 ),
2202 "_file column should use RunEndEncoded type"
2203 );
2204 assert!(
2205 matches!(
2206 schema.field(4).data_type(),
2207 arrow_schema::DataType::RunEndEncoded(_, _)
2208 ),
2209 "_file column (duplicate) should use RunEndEncoded type"
2210 );
2211 }
2212
2213 #[tokio::test]
2214 async fn test_scan_deadlock() {
2215 let mut fixture = TableTestFixture::new();
2216 fixture.setup_deadlock_manifests().await;
2217
2218 let table_scan = fixture
2225 .table
2226 .scan()
2227 .with_concurrency_limit(1)
2228 .build()
2229 .unwrap();
2230
2231 let result = tokio::time::timeout(std::time::Duration::from_secs(5), async {
2234 table_scan
2235 .plan_files()
2236 .await
2237 .unwrap()
2238 .try_collect::<Vec<_>>()
2239 .await
2240 })
2241 .await;
2242
2243 assert!(result.is_ok(), "Scan timed out - deadlock detected");
2245 }
2246}