//! Table scan API.

mod cache;
use cache::*;
mod context;
use context::*;
mod task;
use std::sync::Arc;
use arrow_array::RecordBatch;
use futures::channel::mpsc::{channel, Sender};
use futures::stream::BoxStream;
use futures::{SinkExt, StreamExt, TryStreamExt};
pub use task::*;
use crate::arrow::ArrowReaderBuilder;
use crate::delete_file_index::DeleteFileIndex;
use crate::expr::visitors::inclusive_metrics_evaluator::InclusiveMetricsEvaluator;
use crate::expr::{Bind, BoundPredicate, Predicate};
use crate::io::FileIO;
use crate::runtime::spawn;
use crate::spec::{DataContentType, SnapshotRef};
use crate::table::Table;
use crate::utils::available_parallelism;
use crate::{Error, ErrorKind, Result};
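/// A stream of Arrow [`RecordBatch`]es.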
pub type ArrowRecordBatchStream = BoxStream<'static, Result<RecordBatch>>;
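/// Builds a [`TableScan`], configuring column projection, snapshot selection,
/// filtering, and concurrency limits.
///
/// # Example
///
/// A minimal usage sketch (not compiled as a doctest); it assumes an existing
/// [`Table`] named `table` and uses hypothetical column names:
///
/// ```ignore
/// use futures::TryStreamExt;
///
/// let stream = table
///     .scan()
///     .select(["id", "name"]) // hypothetical column names
///     .with_batch_size(Some(1024))
///     .build()?
///     .to_arrow()
///     .await?;
/// let batches: Vec<_> = stream.try_collect().await?;
/// ```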
pub struct TableScanBuilder<'a> {
table: &'a Table,
column_names: Option<Vec<String>>,
snapshot_id: Option<i64>,
batch_size: Option<usize>,
case_sensitive: bool,
filter: Option<Predicate>,
concurrency_limit_data_files: usize,
concurrency_limit_manifest_entries: usize,
concurrency_limit_manifest_files: usize,
row_group_filtering_enabled: bool,
row_selection_enabled: bool,
delete_file_processing_enabled: bool,
}
impl<'a> TableScanBuilder<'a> {
pub(crate) fn new(table: &'a Table) -> Self {
let num_cpus = available_parallelism().get();
Self {
table,
column_names: None,
snapshot_id: None,
batch_size: None,
case_sensitive: true,
filter: None,
concurrency_limit_data_files: num_cpus,
concurrency_limit_manifest_entries: num_cpus,
concurrency_limit_manifest_files: num_cpus,
row_group_filtering_enabled: true,
row_selection_enabled: false,
delete_file_processing_enabled: false,
}
}
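    /// Sets the desired Arrow batch size for the scan, overriding the reader's default.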
pub fn with_batch_size(mut self, batch_size: Option<usize>) -> Self {
self.batch_size = batch_size;
self
}
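    /// Sets whether column name matching is case sensitive (defaults to true).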
pub fn with_case_sensitive(mut self, case_sensitive: bool) -> Self {
self.case_sensitive = case_sensitive;
self
}
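    /// Specifies a [`Predicate`] to filter the scan with. The predicate is
    /// rewritten with `rewrite_not` so that no `Not` nodes remain when it is
    /// later bound and evaluated.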
pub fn with_filter(mut self, predicate: Predicate) -> Self {
self.filter = Some(predicate.rewrite_not());
self
}
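    /// Selects all columns of the table.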
pub fn select_all(mut self) -> Self {
self.column_names = None;
self
}
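    /// Selects no columns; the resulting batches carry row counts but no columns.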
pub fn select_empty(mut self) -> Self {
self.column_names = Some(vec![]);
self
}
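    /// Selects the given top-level columns by name.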
pub fn select(mut self, column_names: impl IntoIterator<Item = impl ToString>) -> Self {
self.column_names = Some(
column_names
.into_iter()
.map(|item| item.to_string())
.collect(),
);
self
}
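    /// Sets the snapshot to scan. When not set, the current snapshot is used.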
pub fn snapshot_id(mut self, snapshot_id: i64) -> Self {
self.snapshot_id = Some(snapshot_id);
self
}
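    /// Sets a single concurrency limit for manifest files, manifest entries,
    /// and data files.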
pub fn with_concurrency_limit(mut self, limit: usize) -> Self {
self.concurrency_limit_manifest_files = limit;
self.concurrency_limit_manifest_entries = limit;
self.concurrency_limit_data_files = limit;
self
}
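    /// Sets the data file concurrency limit for this scan.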
pub fn with_data_file_concurrency_limit(mut self, limit: usize) -> Self {
self.concurrency_limit_data_files = limit;
self
}
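    /// Sets the manifest entry concurrency limit for this scan.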
pub fn with_manifest_entry_concurrency_limit(mut self, limit: usize) -> Self {
self.concurrency_limit_manifest_entries = limit;
self
}
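    /// Determines whether to enable Parquet row group filtering (enabled by default).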
pub fn with_row_group_filtering_enabled(mut self, row_group_filtering_enabled: bool) -> Self {
self.row_group_filtering_enabled = row_group_filtering_enabled;
self
}
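    /// Determines whether to enable row selection (disabled by default).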
pub fn with_row_selection_enabled(mut self, row_selection_enabled: bool) -> Self {
self.row_selection_enabled = row_selection_enabled;
self
}
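    /// Determines whether delete files are processed as part of the scan
    /// (disabled by default).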
pub fn with_delete_file_processing_enabled(
mut self,
delete_file_processing_enabled: bool,
) -> Self {
self.delete_file_processing_enabled = delete_file_processing_enabled;
self
}
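    /// Builds the [`TableScan`], validating the selected snapshot and columns.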
pub fn build(self) -> Result<TableScan> {
let snapshot = match self.snapshot_id {
Some(snapshot_id) => self
.table
.metadata()
.snapshot_by_id(snapshot_id)
.ok_or_else(|| {
Error::new(
ErrorKind::DataInvalid,
format!("Snapshot with id {} not found", snapshot_id),
)
})?
.clone(),
None => {
                let Some(current_snapshot) = self.table.metadata().current_snapshot() else {
return Ok(TableScan {
batch_size: self.batch_size,
column_names: self.column_names,
file_io: self.table.file_io().clone(),
plan_context: None,
concurrency_limit_data_files: self.concurrency_limit_data_files,
concurrency_limit_manifest_entries: self.concurrency_limit_manifest_entries,
concurrency_limit_manifest_files: self.concurrency_limit_manifest_files,
row_group_filtering_enabled: self.row_group_filtering_enabled,
row_selection_enabled: self.row_selection_enabled,
delete_file_processing_enabled: self.delete_file_processing_enabled,
});
};
                current_snapshot.clone()
}
};
let schema = snapshot.schema(self.table.metadata())?;
if let Some(column_names) = self.column_names.as_ref() {
for column_name in column_names {
if schema.field_by_name(column_name).is_none() {
return Err(Error::new(
ErrorKind::DataInvalid,
format!(
"Column {} not found in table. Schema: {}",
column_name, schema
),
));
}
}
}
let mut field_ids = vec![];
let column_names = self.column_names.clone().unwrap_or_else(|| {
schema
.as_struct()
.fields()
.iter()
.map(|f| f.name.clone())
.collect()
});
for column_name in column_names.iter() {
let field_id = schema.field_id_by_name(column_name).ok_or_else(|| {
Error::new(
ErrorKind::DataInvalid,
format!(
"Column {} not found in table. Schema: {}",
column_name, schema
),
)
})?;
schema
.as_struct()
.field_by_id(field_id)
.ok_or_else(|| {
Error::new(
ErrorKind::FeatureUnsupported,
format!(
"Column {} is not a direct child of schema but a nested field, which is not supported now. Schema: {}",
column_name, schema
),
)
})?;
field_ids.push(field_id);
}
let snapshot_bound_predicate = if let Some(ref predicates) = self.filter {
Some(predicates.bind(schema.clone(), true)?)
} else {
None
};
let plan_context = PlanContext {
snapshot,
table_metadata: self.table.metadata_ref(),
snapshot_schema: schema,
case_sensitive: self.case_sensitive,
predicate: self.filter.map(Arc::new),
snapshot_bound_predicate: snapshot_bound_predicate.map(Arc::new),
object_cache: self.table.object_cache(),
field_ids: Arc::new(field_ids),
partition_filter_cache: Arc::new(PartitionFilterCache::new()),
manifest_evaluator_cache: Arc::new(ManifestEvaluatorCache::new()),
expression_evaluator_cache: Arc::new(ExpressionEvaluatorCache::new()),
};
Ok(TableScan {
batch_size: self.batch_size,
column_names: self.column_names,
file_io: self.table.file_io().clone(),
plan_context: Some(plan_context),
concurrency_limit_data_files: self.concurrency_limit_data_files,
concurrency_limit_manifest_entries: self.concurrency_limit_manifest_entries,
concurrency_limit_manifest_files: self.concurrency_limit_manifest_files,
row_group_filtering_enabled: self.row_group_filtering_enabled,
row_selection_enabled: self.row_selection_enabled,
delete_file_processing_enabled: self.delete_file_processing_enabled,
})
}
}
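/// A scan of an Iceberg table at a specific snapshot, created via
/// [`TableScanBuilder::build`].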
#[derive(Debug)]
pub struct TableScan {
plan_context: Option<PlanContext>,
batch_size: Option<usize>,
file_io: FileIO,
column_names: Option<Vec<String>>,
concurrency_limit_manifest_files: usize,
concurrency_limit_manifest_entries: usize,
concurrency_limit_data_files: usize,
row_group_filtering_enabled: bool,
row_selection_enabled: bool,
delete_file_processing_enabled: bool,
}
impl TableScan {
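    /// Returns a stream of [`FileScanTask`]s for this scan, produced by
    /// concurrently reading the snapshot's manifest list, manifests, and
    /// manifest entries while applying partition and metrics pruning.
    ///
    /// A sketch of consuming the tasks with a standalone [`ArrowReaderBuilder`],
    /// mirroring the tests in this module (`scan` and `file_io` are assumed to
    /// exist; not compiled as a doctest):
    ///
    /// ```ignore
    /// use futures::{stream, TryStreamExt};
    ///
    /// let tasks: Vec<_> = scan.plan_files().await?.try_collect().await?;
    /// let reader = ArrowReaderBuilder::new(file_io.clone()).build();
    /// let batches: Vec<_> = reader
    ///     .read(Box::pin(stream::iter(tasks.into_iter().map(Ok))))
    ///     .await?
    ///     .try_collect()
    ///     .await?;
    /// ```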
pub async fn plan_files(&self) -> Result<FileScanTaskStream> {
let Some(plan_context) = self.plan_context.as_ref() else {
return Ok(Box::pin(futures::stream::empty()));
};
let concurrency_limit_manifest_files = self.concurrency_limit_manifest_files;
let concurrency_limit_manifest_entries = self.concurrency_limit_manifest_entries;
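        // Channels used to stream ManifestEntryContexts between the stages of
        // the file planning operation.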
let (manifest_entry_data_ctx_tx, manifest_entry_data_ctx_rx) =
channel(concurrency_limit_manifest_files);
let (manifest_entry_delete_ctx_tx, manifest_entry_delete_ctx_rx) =
channel(concurrency_limit_manifest_files);
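        // Channel used to stream the resulting FileScanTasks back to the caller.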
let (file_scan_task_tx, file_scan_task_rx) = channel(concurrency_limit_manifest_entries);
let delete_file_idx_and_tx: Option<(DeleteFileIndex, Sender<DeleteFileContext>)> =
if self.delete_file_processing_enabled {
Some(DeleteFileIndex::new())
} else {
None
};
let manifest_list = plan_context.get_manifest_list().await?;
let manifest_file_contexts = plan_context.build_manifest_file_contexts(
manifest_list,
manifest_entry_data_ctx_tx,
delete_file_idx_and_tx.as_ref().map(|(delete_file_idx, _)| {
(delete_file_idx.clone(), manifest_entry_delete_ctx_tx)
}),
)?;
let mut channel_for_manifest_error = file_scan_task_tx.clone();
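        // Concurrently load manifests and stream their entries into the
        // per-content-type channels.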
spawn(async move {
let result = futures::stream::iter(manifest_file_contexts)
.try_for_each_concurrent(concurrency_limit_manifest_files, |ctx| async move {
ctx.fetch_manifest_and_stream_manifest_entries().await
})
.await;
if let Err(error) = result {
let _ = channel_for_manifest_error.send(Err(error)).await;
}
});
let mut channel_for_data_manifest_entry_error = file_scan_task_tx.clone();
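        // When delete file processing is enabled, populate the DeleteFileIndex
        // from the delete manifest entries before data files are processed;
        // the spawned task is awaited so the index is complete first.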
if let Some((_, delete_file_tx)) = delete_file_idx_and_tx {
let mut channel_for_delete_manifest_entry_error = file_scan_task_tx.clone();
spawn(async move {
let result = manifest_entry_delete_ctx_rx
.map(|me_ctx| Ok((me_ctx, delete_file_tx.clone())))
.try_for_each_concurrent(
concurrency_limit_manifest_entries,
|(manifest_entry_context, tx)| async move {
spawn(async move {
Self::process_delete_manifest_entry(manifest_entry_context, tx)
.await
})
.await
},
)
.await;
if let Err(error) = result {
let _ = channel_for_delete_manifest_entry_error
.send(Err(error))
.await;
}
})
.await;
}
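        // Process the data file manifest entries in parallel, applying
        // partition and metrics filters and emitting FileScanTasks.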
spawn(async move {
let result = manifest_entry_data_ctx_rx
.map(|me_ctx| Ok((me_ctx, file_scan_task_tx.clone())))
.try_for_each_concurrent(
concurrency_limit_manifest_entries,
|(manifest_entry_context, tx)| async move {
spawn(async move {
Self::process_data_manifest_entry(manifest_entry_context, tx).await
})
.await
},
)
.await;
if let Err(error) = result {
let _ = channel_for_data_manifest_entry_error.send(Err(error)).await;
}
});
Ok(file_scan_task_rx.boxed())
}
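    /// Returns an [`ArrowRecordBatchStream`] that reads the planned files
    /// with the configured concurrency, filtering, and batch size.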
pub async fn to_arrow(&self) -> Result<ArrowRecordBatchStream> {
let mut arrow_reader_builder = ArrowReaderBuilder::new(self.file_io.clone())
.with_data_file_concurrency_limit(self.concurrency_limit_data_files)
.with_row_group_filtering_enabled(self.row_group_filtering_enabled)
.with_row_selection_enabled(self.row_selection_enabled);
if let Some(batch_size) = self.batch_size {
arrow_reader_builder = arrow_reader_builder.with_batch_size(batch_size);
}
arrow_reader_builder
.build()
.read(self.plan_files().await?)
.await
}
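    /// Returns the selected column names, or `None` if all columns are selected.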
pub fn column_names(&self) -> Option<&[String]> {
self.column_names.as_deref()
}
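    /// Returns the snapshot being scanned, or `None` for a table without snapshots.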
pub fn snapshot(&self) -> Option<&SnapshotRef> {
self.plan_context.as_ref().map(|x| &x.snapshot)
}
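    /// Filters a single data manifest entry with the bound partition and
    /// metrics predicates and, if it survives, sends a [`FileScanTask`] downstream.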
async fn process_data_manifest_entry(
manifest_entry_context: ManifestEntryContext,
mut file_scan_task_tx: Sender<Result<FileScanTask>>,
) -> Result<()> {
if !manifest_entry_context.manifest_entry.is_alive() {
return Ok(());
}
if manifest_entry_context.manifest_entry.content_type() != DataContentType::Data {
return Err(Error::new(
ErrorKind::FeatureUnsupported,
"Encountered an entry for a delete file in a data file manifest",
));
}
if let Some(ref bound_predicates) = manifest_entry_context.bound_predicates {
let BoundPredicates {
snapshot_bound_predicate,
partition_bound_predicate,
} = bound_predicates.as_ref();
let expression_evaluator_cache =
manifest_entry_context.expression_evaluator_cache.as_ref();
let expression_evaluator = expression_evaluator_cache.get(
manifest_entry_context.partition_spec_id,
partition_bound_predicate,
)?;
if !expression_evaluator.eval(manifest_entry_context.manifest_entry.data_file())? {
return Ok(());
}
if !InclusiveMetricsEvaluator::eval(
snapshot_bound_predicate,
manifest_entry_context.manifest_entry.data_file(),
false,
)? {
return Ok(());
}
}
file_scan_task_tx
.send(Ok(manifest_entry_context.into_file_scan_task().await?))
.await?;
Ok(())
}
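    /// Filters a single delete manifest entry with the bound partition
    /// predicate and, if it survives, sends a [`DeleteFileContext`] downstream
    /// for inclusion in the delete file index.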
async fn process_delete_manifest_entry(
manifest_entry_context: ManifestEntryContext,
mut delete_file_ctx_tx: Sender<DeleteFileContext>,
) -> Result<()> {
if !manifest_entry_context.manifest_entry.is_alive() {
return Ok(());
}
if manifest_entry_context.manifest_entry.content_type() == DataContentType::Data {
return Err(Error::new(
ErrorKind::FeatureUnsupported,
"Encountered an entry for a data file in a delete manifest",
));
}
if let Some(ref bound_predicates) = manifest_entry_context.bound_predicates {
let expression_evaluator_cache =
manifest_entry_context.expression_evaluator_cache.as_ref();
let expression_evaluator = expression_evaluator_cache.get(
manifest_entry_context.partition_spec_id,
&bound_predicates.partition_bound_predicate,
)?;
if !expression_evaluator.eval(manifest_entry_context.manifest_entry.data_file())? {
return Ok(());
}
}
delete_file_ctx_tx
.send(DeleteFileContext {
manifest_entry: manifest_entry_context.manifest_entry.clone(),
partition_spec_id: manifest_entry_context.partition_spec_id,
})
.await?;
Ok(())
}
}
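/// Holds the scan predicate in its two bound forms: one bound to the partition
/// type and one bound to the snapshot schema.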
pub(crate) struct BoundPredicates {
partition_bound_predicate: BoundPredicate,
snapshot_bound_predicate: BoundPredicate,
}
#[cfg(test)]
pub mod tests {
#![allow(missing_docs)]
use std::collections::HashMap;
use std::fs;
use std::fs::File;
use std::sync::Arc;
use arrow_array::{
ArrayRef, BooleanArray, Float64Array, Int32Array, Int64Array, RecordBatch, StringArray,
};
use futures::{stream, TryStreamExt};
use parquet::arrow::{ArrowWriter, PARQUET_FIELD_ID_META_KEY};
use parquet::basic::Compression;
use parquet::file::properties::WriterProperties;
use tempfile::TempDir;
use tera::{Context, Tera};
use uuid::Uuid;
use crate::arrow::ArrowReaderBuilder;
use crate::expr::{BoundPredicate, Reference};
use crate::io::{FileIO, OutputFile};
use crate::scan::FileScanTask;
use crate::spec::{
DataContentType, DataFileBuilder, DataFileFormat, Datum, Literal, ManifestEntry,
ManifestListWriter, ManifestStatus, ManifestWriterBuilder, NestedField, PartitionSpec,
PrimitiveType, Schema, Struct, StructType, TableMetadata, Type,
};
use crate::table::Table;
use crate::TableIdent;
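    /// Test fixture providing a [`Table`] backed by a temporary directory.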
pub struct TableTestFixture {
pub table_location: String,
pub table: Table,
}
impl TableTestFixture {
#[allow(clippy::new_without_default)]
pub fn new() -> Self {
let tmp_dir = TempDir::new().unwrap();
let table_location = tmp_dir.path().join("table1");
let manifest_list1_location = table_location.join("metadata/manifests_list_1.avro");
let manifest_list2_location = table_location.join("metadata/manifests_list_2.avro");
let table_metadata1_location = table_location.join("metadata/v1.json");
let file_io = FileIO::from_path(table_location.as_os_str().to_str().unwrap())
.unwrap()
.build()
.unwrap();
let table_metadata = {
let template_json_str = fs::read_to_string(format!(
"{}/testdata/example_table_metadata_v2.json",
env!("CARGO_MANIFEST_DIR")
))
.unwrap();
let mut context = Context::new();
context.insert("table_location", &table_location);
context.insert("manifest_list_1_location", &manifest_list1_location);
context.insert("manifest_list_2_location", &manifest_list2_location);
context.insert("table_metadata_1_location", &table_metadata1_location);
let metadata_json = Tera::one_off(&template_json_str, &context, false).unwrap();
serde_json::from_str::<TableMetadata>(&metadata_json).unwrap()
};
let table = Table::builder()
.metadata(table_metadata)
.identifier(TableIdent::from_strs(["db", "table1"]).unwrap())
.file_io(file_io.clone())
.metadata_location(table_metadata1_location.as_os_str().to_str().unwrap())
.build()
.unwrap();
Self {
table_location: table_location.to_str().unwrap().to_string(),
table,
}
}
#[allow(clippy::new_without_default)]
pub fn new_empty() -> Self {
let tmp_dir = TempDir::new().unwrap();
let table_location = tmp_dir.path().join("table1");
let table_metadata1_location = table_location.join("metadata/v1.json");
let file_io = FileIO::from_path(table_location.as_os_str().to_str().unwrap())
.unwrap()
.build()
.unwrap();
let table_metadata = {
let template_json_str = fs::read_to_string(format!(
"{}/testdata/example_empty_table_metadata_v2.json",
env!("CARGO_MANIFEST_DIR")
))
.unwrap();
let mut context = Context::new();
context.insert("table_location", &table_location);
context.insert("table_metadata_1_location", &table_metadata1_location);
let metadata_json = Tera::one_off(&template_json_str, &context, false).unwrap();
serde_json::from_str::<TableMetadata>(&metadata_json).unwrap()
};
let table = Table::builder()
.metadata(table_metadata)
.identifier(TableIdent::from_strs(["db", "table1"]).unwrap())
.file_io(file_io.clone())
.metadata_location(table_metadata1_location.as_os_str().to_str().unwrap())
.build()
.unwrap();
Self {
table_location: table_location.to_str().unwrap().to_string(),
table,
}
}
pub fn new_unpartitioned() -> Self {
let tmp_dir = TempDir::new().unwrap();
let table_location = tmp_dir.path().join("table1");
let manifest_list1_location = table_location.join("metadata/manifests_list_1.avro");
let manifest_list2_location = table_location.join("metadata/manifests_list_2.avro");
let table_metadata1_location = table_location.join("metadata/v1.json");
let file_io = FileIO::from_path(table_location.to_str().unwrap())
.unwrap()
.build()
.unwrap();
let mut table_metadata = {
let template_json_str = fs::read_to_string(format!(
"{}/testdata/example_table_metadata_v2.json",
env!("CARGO_MANIFEST_DIR")
))
.unwrap();
let mut context = Context::new();
context.insert("table_location", &table_location);
context.insert("manifest_list_1_location", &manifest_list1_location);
context.insert("manifest_list_2_location", &manifest_list2_location);
context.insert("table_metadata_1_location", &table_metadata1_location);
let metadata_json = Tera::one_off(&template_json_str, &context, false).unwrap();
serde_json::from_str::<TableMetadata>(&metadata_json).unwrap()
};
table_metadata.default_spec = Arc::new(PartitionSpec::unpartition_spec());
table_metadata.partition_specs.clear();
table_metadata.default_partition_type = StructType::new(vec![]);
table_metadata
.partition_specs
.insert(0, table_metadata.default_spec.clone());
let table = Table::builder()
.metadata(table_metadata)
.identifier(TableIdent::from_strs(["db", "table1"]).unwrap())
.file_io(file_io.clone())
.metadata_location(table_metadata1_location.to_str().unwrap())
.build()
.unwrap();
Self {
table_location: table_location.to_str().unwrap().to_string(),
table,
}
}
fn next_manifest_file(&self) -> OutputFile {
self.table
.file_io()
.new_output(format!(
"{}/metadata/manifest_{}.avro",
self.table_location,
Uuid::new_v4()
))
.unwrap()
}
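        /// Writes a data manifest with added, deleted, and existing entries,
        /// a manifest list for the current snapshot, and three Parquet files
        /// with 1024 rows each.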
pub async fn setup_manifest_files(&mut self) {
let current_snapshot = self.table.metadata().current_snapshot().unwrap();
let parent_snapshot = current_snapshot
.parent_snapshot(self.table.metadata())
.unwrap();
let current_schema = current_snapshot.schema(self.table.metadata()).unwrap();
let current_partition_spec = self.table.metadata().default_partition_spec();
let mut writer = ManifestWriterBuilder::new(
self.next_manifest_file(),
Some(current_snapshot.snapshot_id()),
vec![],
current_schema.clone(),
current_partition_spec.as_ref().clone(),
)
.build_v2_data();
writer
.add_entry(
ManifestEntry::builder()
.status(ManifestStatus::Added)
.data_file(
DataFileBuilder::default()
.partition_spec_id(0)
.content(DataContentType::Data)
.file_path(format!("{}/1.parquet", &self.table_location))
.file_format(DataFileFormat::Parquet)
.file_size_in_bytes(100)
.record_count(1)
.partition(Struct::from_iter([Some(Literal::long(100))]))
.key_metadata(None)
.build()
.unwrap(),
)
.build(),
)
.unwrap();
writer
.add_delete_entry(
ManifestEntry::builder()
.status(ManifestStatus::Deleted)
.snapshot_id(parent_snapshot.snapshot_id())
.sequence_number(parent_snapshot.sequence_number())
.file_sequence_number(parent_snapshot.sequence_number())
.data_file(
DataFileBuilder::default()
.partition_spec_id(0)
.content(DataContentType::Data)
.file_path(format!("{}/2.parquet", &self.table_location))
.file_format(DataFileFormat::Parquet)
.file_size_in_bytes(100)
.record_count(1)
.partition(Struct::from_iter([Some(Literal::long(200))]))
.build()
.unwrap(),
)
.build(),
)
.unwrap();
writer
.add_existing_entry(
ManifestEntry::builder()
.status(ManifestStatus::Existing)
.snapshot_id(parent_snapshot.snapshot_id())
.sequence_number(parent_snapshot.sequence_number())
.file_sequence_number(parent_snapshot.sequence_number())
.data_file(
DataFileBuilder::default()
.partition_spec_id(0)
.content(DataContentType::Data)
.file_path(format!("{}/3.parquet", &self.table_location))
.file_format(DataFileFormat::Parquet)
.file_size_in_bytes(100)
.record_count(1)
.partition(Struct::from_iter([Some(Literal::long(300))]))
.build()
.unwrap(),
)
.build(),
)
.unwrap();
let data_file_manifest = writer.write_manifest_file().await.unwrap();
let mut manifest_list_write = ManifestListWriter::v2(
self.table
.file_io()
.new_output(current_snapshot.manifest_list())
.unwrap(),
current_snapshot.snapshot_id(),
current_snapshot.parent_snapshot_id(),
current_snapshot.sequence_number(),
);
manifest_list_write
.add_manifests(vec![data_file_manifest].into_iter())
.unwrap();
manifest_list_write.close().await.unwrap();
let schema = {
let fields = vec![
arrow_schema::Field::new("x", arrow_schema::DataType::Int64, false)
.with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
"1".to_string(),
)])),
arrow_schema::Field::new("y", arrow_schema::DataType::Int64, false)
.with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
"2".to_string(),
)])),
arrow_schema::Field::new("z", arrow_schema::DataType::Int64, false)
.with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
"3".to_string(),
)])),
arrow_schema::Field::new("a", arrow_schema::DataType::Utf8, false)
.with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
"4".to_string(),
)])),
arrow_schema::Field::new("dbl", arrow_schema::DataType::Float64, false)
.with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
"5".to_string(),
)])),
arrow_schema::Field::new("i32", arrow_schema::DataType::Int32, false)
.with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
"6".to_string(),
)])),
arrow_schema::Field::new("i64", arrow_schema::DataType::Int64, false)
.with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
"7".to_string(),
)])),
arrow_schema::Field::new("bool", arrow_schema::DataType::Boolean, false)
.with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
"8".to_string(),
)])),
];
Arc::new(arrow_schema::Schema::new(fields))
};
let col1 = Arc::new(Int64Array::from_iter_values(vec![1; 1024])) as ArrayRef;
let mut values = vec![2; 512];
values.append(vec![3; 200].as_mut());
values.append(vec![4; 300].as_mut());
values.append(vec![5; 12].as_mut());
let col2 = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;
let mut values = vec![3; 512];
values.append(vec![4; 512].as_mut());
let col3 = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;
let mut values = vec!["Apache"; 512];
values.append(vec!["Iceberg"; 512].as_mut());
let col4 = Arc::new(StringArray::from_iter_values(values)) as ArrayRef;
let mut values = vec![100.0f64; 512];
values.append(vec![150.0f64; 12].as_mut());
values.append(vec![200.0f64; 500].as_mut());
let col5 = Arc::new(Float64Array::from_iter_values(values)) as ArrayRef;
let mut values = vec![100i32; 512];
values.append(vec![150i32; 12].as_mut());
values.append(vec![200i32; 500].as_mut());
let col6 = Arc::new(Int32Array::from_iter_values(values)) as ArrayRef;
let mut values = vec![100i64; 512];
values.append(vec![150i64; 12].as_mut());
values.append(vec![200i64; 500].as_mut());
let col7 = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;
let mut values = vec![false; 512];
values.append(vec![true; 512].as_mut());
let values: BooleanArray = values.into();
let col8 = Arc::new(values) as ArrayRef;
let to_write = RecordBatch::try_new(schema.clone(), vec![
col1, col2, col3, col4, col5, col6, col7, col8,
])
.unwrap();
let props = WriterProperties::builder()
.set_compression(Compression::SNAPPY)
.build();
for n in 1..=3 {
let file = File::create(format!("{}/{}.parquet", &self.table_location, n)).unwrap();
let mut writer =
ArrowWriter::try_new(file, to_write.schema(), Some(props.clone())).unwrap();
writer.write(&to_write).expect("Writing batch");
writer.close().unwrap();
}
}
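        /// Same as [`Self::setup_manifest_files`], but with an unpartitioned
        /// spec and empty partition values.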
pub async fn setup_unpartitioned_manifest_files(&mut self) {
let current_snapshot = self.table.metadata().current_snapshot().unwrap();
let parent_snapshot = current_snapshot
.parent_snapshot(self.table.metadata())
.unwrap();
let current_schema = current_snapshot.schema(self.table.metadata()).unwrap();
let current_partition_spec = Arc::new(PartitionSpec::unpartition_spec());
let mut writer = ManifestWriterBuilder::new(
self.next_manifest_file(),
Some(current_snapshot.snapshot_id()),
vec![],
current_schema.clone(),
current_partition_spec.as_ref().clone(),
)
.build_v2_data();
let empty_partition = Struct::empty();
writer
.add_entry(
ManifestEntry::builder()
.status(ManifestStatus::Added)
.data_file(
DataFileBuilder::default()
.partition_spec_id(0)
.content(DataContentType::Data)
.file_path(format!("{}/1.parquet", &self.table_location))
.file_format(DataFileFormat::Parquet)
.file_size_in_bytes(100)
.record_count(1)
.partition(empty_partition.clone())
.key_metadata(None)
.build()
.unwrap(),
)
.build(),
)
.unwrap();
writer
.add_delete_entry(
ManifestEntry::builder()
.status(ManifestStatus::Deleted)
.snapshot_id(parent_snapshot.snapshot_id())
.sequence_number(parent_snapshot.sequence_number())
.file_sequence_number(parent_snapshot.sequence_number())
.data_file(
DataFileBuilder::default()
.partition_spec_id(0)
.content(DataContentType::Data)
.file_path(format!("{}/2.parquet", &self.table_location))
.file_format(DataFileFormat::Parquet)
.file_size_in_bytes(100)
.record_count(1)
.partition(empty_partition.clone())
.build()
.unwrap(),
)
.build(),
)
.unwrap();
writer
.add_existing_entry(
ManifestEntry::builder()
.status(ManifestStatus::Existing)
.snapshot_id(parent_snapshot.snapshot_id())
.sequence_number(parent_snapshot.sequence_number())
.file_sequence_number(parent_snapshot.sequence_number())
.data_file(
DataFileBuilder::default()
.partition_spec_id(0)
.content(DataContentType::Data)
.file_path(format!("{}/3.parquet", &self.table_location))
.file_format(DataFileFormat::Parquet)
.file_size_in_bytes(100)
.record_count(1)
.partition(empty_partition.clone())
.build()
.unwrap(),
)
.build(),
)
.unwrap();
let data_file_manifest = writer.write_manifest_file().await.unwrap();
let mut manifest_list_write = ManifestListWriter::v2(
self.table
.file_io()
.new_output(current_snapshot.manifest_list())
.unwrap(),
current_snapshot.snapshot_id(),
current_snapshot.parent_snapshot_id(),
current_snapshot.sequence_number(),
);
manifest_list_write
.add_manifests(vec![data_file_manifest].into_iter())
.unwrap();
manifest_list_write.close().await.unwrap();
let schema = {
let fields = vec![
arrow_schema::Field::new("x", arrow_schema::DataType::Int64, false)
.with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
"1".to_string(),
)])),
arrow_schema::Field::new("y", arrow_schema::DataType::Int64, false)
.with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
"2".to_string(),
)])),
arrow_schema::Field::new("z", arrow_schema::DataType::Int64, false)
.with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
"3".to_string(),
)])),
arrow_schema::Field::new("a", arrow_schema::DataType::Utf8, false)
.with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
"4".to_string(),
)])),
arrow_schema::Field::new("dbl", arrow_schema::DataType::Float64, false)
.with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
"5".to_string(),
)])),
arrow_schema::Field::new("i32", arrow_schema::DataType::Int32, false)
.with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
"6".to_string(),
)])),
arrow_schema::Field::new("i64", arrow_schema::DataType::Int64, false)
.with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
"7".to_string(),
)])),
arrow_schema::Field::new("bool", arrow_schema::DataType::Boolean, false)
.with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
"8".to_string(),
)])),
];
Arc::new(arrow_schema::Schema::new(fields))
};
let col1 = Arc::new(Int64Array::from_iter_values(vec![1; 1024])) as ArrayRef;
let mut values = vec![2; 512];
values.append(vec![3; 200].as_mut());
values.append(vec![4; 300].as_mut());
values.append(vec![5; 12].as_mut());
let col2 = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;
let mut values = vec![3; 512];
values.append(vec![4; 512].as_mut());
let col3 = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;
let mut values = vec!["Apache"; 512];
values.append(vec!["Iceberg"; 512].as_mut());
let col4 = Arc::new(StringArray::from_iter_values(values)) as ArrayRef;
let mut values = vec![100.0f64; 512];
values.append(vec![150.0f64; 12].as_mut());
values.append(vec![200.0f64; 500].as_mut());
let col5 = Arc::new(Float64Array::from_iter_values(values)) as ArrayRef;
let mut values = vec![100i32; 512];
values.append(vec![150i32; 12].as_mut());
values.append(vec![200i32; 500].as_mut());
let col6 = Arc::new(Int32Array::from_iter_values(values)) as ArrayRef;
let mut values = vec![100i64; 512];
values.append(vec![150i64; 12].as_mut());
values.append(vec![200i64; 500].as_mut());
let col7 = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;
let mut values = vec![false; 512];
values.append(vec![true; 512].as_mut());
let values: BooleanArray = values.into();
let col8 = Arc::new(values) as ArrayRef;
let to_write = RecordBatch::try_new(schema.clone(), vec![
col1, col2, col3, col4, col5, col6, col7, col8,
])
.unwrap();
let props = WriterProperties::builder()
.set_compression(Compression::SNAPPY)
.build();
for n in 1..=3 {
let file = File::create(format!("{}/{}.parquet", &self.table_location, n)).unwrap();
let mut writer =
ArrowWriter::try_new(file, to_write.schema(), Some(props.clone())).unwrap();
writer.write(&to_write).expect("Writing batch");
writer.close().unwrap();
}
}
}
#[test]
fn test_table_scan_columns() {
let table = TableTestFixture::new().table;
let table_scan = table.scan().select(["x", "y"]).build().unwrap();
assert_eq!(
Some(vec!["x".to_string(), "y".to_string()]),
table_scan.column_names
);
let table_scan = table
.scan()
.select(["x", "y"])
.select(["z"])
.build()
.unwrap();
assert_eq!(Some(vec!["z".to_string()]), table_scan.column_names);
}
#[test]
fn test_select_all() {
let table = TableTestFixture::new().table;
let table_scan = table.scan().select_all().build().unwrap();
assert!(table_scan.column_names.is_none());
}
#[test]
fn test_select_no_exist_column() {
let table = TableTestFixture::new().table;
let table_scan = table.scan().select(["x", "y", "z", "a", "b"]).build();
assert!(table_scan.is_err());
}
#[test]
fn test_table_scan_default_snapshot_id() {
let table = TableTestFixture::new().table;
let table_scan = table.scan().build().unwrap();
assert_eq!(
table.metadata().current_snapshot().unwrap().snapshot_id(),
table_scan.snapshot().unwrap().snapshot_id()
);
}
#[test]
fn test_table_scan_non_exist_snapshot_id() {
let table = TableTestFixture::new().table;
let table_scan = table.scan().snapshot_id(1024).build();
assert!(table_scan.is_err());
}
#[test]
fn test_table_scan_with_snapshot_id() {
let table = TableTestFixture::new().table;
let table_scan = table
.scan()
.snapshot_id(3051729675574597004)
.with_row_selection_enabled(true)
.build()
.unwrap();
assert_eq!(
table_scan.snapshot().unwrap().snapshot_id(),
3051729675574597004
);
}
#[tokio::test]
async fn test_plan_files_on_table_without_any_snapshots() {
let table = TableTestFixture::new_empty().table;
let batch_stream = table.scan().build().unwrap().to_arrow().await.unwrap();
let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
assert!(batches.is_empty());
}
#[tokio::test]
async fn test_plan_files_no_deletions() {
let mut fixture = TableTestFixture::new();
fixture.setup_manifest_files().await;
let table_scan = fixture
.table
.scan()
.with_row_selection_enabled(true)
.build()
.unwrap();
let mut tasks = table_scan
.plan_files()
.await
.unwrap()
.try_fold(vec![], |mut acc, task| async move {
acc.push(task);
Ok(acc)
})
.await
.unwrap();
assert_eq!(tasks.len(), 2);
tasks.sort_by_key(|t| t.data_file_path.to_string());
assert_eq!(
tasks[0].data_file_path,
format!("{}/1.parquet", &fixture.table_location)
);
assert_eq!(
tasks[1].data_file_path,
format!("{}/3.parquet", &fixture.table_location)
);
}
#[tokio::test]
async fn test_open_parquet_no_deletions() {
let mut fixture = TableTestFixture::new();
fixture.setup_manifest_files().await;
let table_scan = fixture
.table
.scan()
.with_row_selection_enabled(true)
.build()
.unwrap();
let batch_stream = table_scan.to_arrow().await.unwrap();
let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
let col = batches[0].column_by_name("x").unwrap();
let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
assert_eq!(int64_arr.value(0), 1);
}
#[tokio::test]
async fn test_open_parquet_no_deletions_by_separate_reader() {
let mut fixture = TableTestFixture::new();
fixture.setup_manifest_files().await;
let table_scan = fixture
.table
.scan()
.with_row_selection_enabled(true)
.build()
.unwrap();
let mut plan_task: Vec<_> = table_scan
.plan_files()
.await
.unwrap()
.try_collect()
.await
.unwrap();
assert_eq!(plan_task.len(), 2);
let reader = ArrowReaderBuilder::new(fixture.table.file_io().clone()).build();
let batch_stream = reader
.clone()
.read(Box::pin(stream::iter(vec![Ok(plan_task.remove(0))])))
.await
.unwrap();
let batch_1: Vec<_> = batch_stream.try_collect().await.unwrap();
let reader = ArrowReaderBuilder::new(fixture.table.file_io().clone()).build();
let batch_stream = reader
.read(Box::pin(stream::iter(vec![Ok(plan_task.remove(0))])))
.await
.unwrap();
let batch_2: Vec<_> = batch_stream.try_collect().await.unwrap();
assert_eq!(batch_1, batch_2);
}
#[tokio::test]
async fn test_open_parquet_with_projection() {
let mut fixture = TableTestFixture::new();
fixture.setup_manifest_files().await;
let table_scan = fixture
.table
.scan()
.select(["x", "z"])
.with_row_selection_enabled(true)
.build()
.unwrap();
let batch_stream = table_scan.to_arrow().await.unwrap();
let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
assert_eq!(batches[0].num_columns(), 2);
let col1 = batches[0].column_by_name("x").unwrap();
let int64_arr = col1.as_any().downcast_ref::<Int64Array>().unwrap();
assert_eq!(int64_arr.value(0), 1);
let col2 = batches[0].column_by_name("z").unwrap();
let int64_arr = col2.as_any().downcast_ref::<Int64Array>().unwrap();
assert_eq!(int64_arr.value(0), 3);
let table_scan = fixture.table.scan().select_empty().build().unwrap();
let batch_stream = table_scan.to_arrow().await.unwrap();
let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
assert_eq!(batches[0].num_columns(), 0);
assert_eq!(batches[0].num_rows(), 1024);
}
#[tokio::test]
async fn test_filter_on_arrow_lt() {
let mut fixture = TableTestFixture::new();
fixture.setup_manifest_files().await;
let mut builder = fixture.table.scan();
let predicate = Reference::new("y").less_than(Datum::long(3));
builder = builder
.with_filter(predicate)
.with_row_selection_enabled(true);
let table_scan = builder.build().unwrap();
let batch_stream = table_scan.to_arrow().await.unwrap();
let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
assert_eq!(batches[0].num_rows(), 512);
let col = batches[0].column_by_name("x").unwrap();
let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
assert_eq!(int64_arr.value(0), 1);
let col = batches[0].column_by_name("y").unwrap();
let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
assert_eq!(int64_arr.value(0), 2);
}
#[tokio::test]
async fn test_filter_on_arrow_gt_eq() {
let mut fixture = TableTestFixture::new();
fixture.setup_manifest_files().await;
let mut builder = fixture.table.scan();
let predicate = Reference::new("y").greater_than_or_equal_to(Datum::long(5));
builder = builder
.with_filter(predicate)
.with_row_selection_enabled(true);
let table_scan = builder.build().unwrap();
let batch_stream = table_scan.to_arrow().await.unwrap();
let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
assert_eq!(batches[0].num_rows(), 12);
let col = batches[0].column_by_name("x").unwrap();
let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
assert_eq!(int64_arr.value(0), 1);
let col = batches[0].column_by_name("y").unwrap();
let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
assert_eq!(int64_arr.value(0), 5);
}
#[tokio::test]
async fn test_filter_double_eq() {
let mut fixture = TableTestFixture::new();
fixture.setup_manifest_files().await;
let mut builder = fixture.table.scan();
let predicate = Reference::new("dbl").equal_to(Datum::double(150.0f64));
builder = builder
.with_filter(predicate)
.with_row_selection_enabled(true);
let table_scan = builder.build().unwrap();
let batch_stream = table_scan.to_arrow().await.unwrap();
let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
assert_eq!(batches.len(), 2);
assert_eq!(batches[0].num_rows(), 12);
let col = batches[0].column_by_name("dbl").unwrap();
let f64_arr = col.as_any().downcast_ref::<Float64Array>().unwrap();
assert_eq!(f64_arr.value(1), 150.0f64);
}
#[tokio::test]
async fn test_filter_int_eq() {
let mut fixture = TableTestFixture::new();
fixture.setup_manifest_files().await;
let mut builder = fixture.table.scan();
let predicate = Reference::new("i32").equal_to(Datum::int(150i32));
builder = builder
.with_filter(predicate)
.with_row_selection_enabled(true);
let table_scan = builder.build().unwrap();
let batch_stream = table_scan.to_arrow().await.unwrap();
let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
assert_eq!(batches.len(), 2);
assert_eq!(batches[0].num_rows(), 12);
let col = batches[0].column_by_name("i32").unwrap();
let i32_arr = col.as_any().downcast_ref::<Int32Array>().unwrap();
assert_eq!(i32_arr.value(1), 150i32);
}
#[tokio::test]
async fn test_filter_long_eq() {
let mut fixture = TableTestFixture::new();
fixture.setup_manifest_files().await;
let mut builder = fixture.table.scan();
let predicate = Reference::new("i64").equal_to(Datum::long(150i64));
builder = builder
.with_filter(predicate)
.with_row_selection_enabled(true);
let table_scan = builder.build().unwrap();
let batch_stream = table_scan.to_arrow().await.unwrap();
let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
assert_eq!(batches.len(), 2);
assert_eq!(batches[0].num_rows(), 12);
let col = batches[0].column_by_name("i64").unwrap();
let i64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
assert_eq!(i64_arr.value(1), 150i64);
}
#[tokio::test]
async fn test_filter_bool_eq() {
let mut fixture = TableTestFixture::new();
fixture.setup_manifest_files().await;
let mut builder = fixture.table.scan();
let predicate = Reference::new("bool").equal_to(Datum::bool(true));
builder = builder
.with_filter(predicate)
.with_row_selection_enabled(true);
let table_scan = builder.build().unwrap();
let batch_stream = table_scan.to_arrow().await.unwrap();
let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
assert_eq!(batches.len(), 2);
assert_eq!(batches[0].num_rows(), 512);
let col = batches[0].column_by_name("bool").unwrap();
let bool_arr = col.as_any().downcast_ref::<BooleanArray>().unwrap();
assert!(bool_arr.value(1));
}
#[tokio::test]
async fn test_filter_on_arrow_is_null() {
let mut fixture = TableTestFixture::new();
fixture.setup_manifest_files().await;
let mut builder = fixture.table.scan();
let predicate = Reference::new("y").is_null();
builder = builder
.with_filter(predicate)
.with_row_selection_enabled(true);
let table_scan = builder.build().unwrap();
let batch_stream = table_scan.to_arrow().await.unwrap();
let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
assert_eq!(batches.len(), 0);
}
#[tokio::test]
async fn test_filter_on_arrow_is_not_null() {
let mut fixture = TableTestFixture::new();
fixture.setup_manifest_files().await;
let mut builder = fixture.table.scan();
let predicate = Reference::new("y").is_not_null();
builder = builder
.with_filter(predicate)
.with_row_selection_enabled(true);
let table_scan = builder.build().unwrap();
let batch_stream = table_scan.to_arrow().await.unwrap();
let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
assert_eq!(batches[0].num_rows(), 1024);
}
#[tokio::test]
async fn test_filter_on_arrow_lt_and_gt() {
let mut fixture = TableTestFixture::new();
fixture.setup_manifest_files().await;
let mut builder = fixture.table.scan();
let predicate = Reference::new("y")
.less_than(Datum::long(5))
.and(Reference::new("z").greater_than_or_equal_to(Datum::long(4)));
builder = builder
.with_filter(predicate)
.with_row_selection_enabled(true);
let table_scan = builder.build().unwrap();
let batch_stream = table_scan.to_arrow().await.unwrap();
let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
assert_eq!(batches[0].num_rows(), 500);
let col = batches[0].column_by_name("x").unwrap();
let expected_x = Arc::new(Int64Array::from_iter_values(vec![1; 500])) as ArrayRef;
assert_eq!(col, &expected_x);
let col = batches[0].column_by_name("y").unwrap();
let mut values = vec![];
values.append(vec![3; 200].as_mut());
values.append(vec![4; 300].as_mut());
let expected_y = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;
assert_eq!(col, &expected_y);
let col = batches[0].column_by_name("z").unwrap();
let expected_z = Arc::new(Int64Array::from_iter_values(vec![4; 500])) as ArrayRef;
assert_eq!(col, &expected_z);
}
#[tokio::test]
async fn test_filter_on_arrow_lt_or_gt() {
let mut fixture = TableTestFixture::new();
fixture.setup_manifest_files().await;
let mut builder = fixture.table.scan();
let predicate = Reference::new("y")
.less_than(Datum::long(5))
.or(Reference::new("z").greater_than_or_equal_to(Datum::long(4)));
builder = builder
.with_filter(predicate)
.with_row_selection_enabled(true);
let table_scan = builder.build().unwrap();
let batch_stream = table_scan.to_arrow().await.unwrap();
let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
assert_eq!(batches[0].num_rows(), 1024);
let col = batches[0].column_by_name("x").unwrap();
let expected_x = Arc::new(Int64Array::from_iter_values(vec![1; 1024])) as ArrayRef;
assert_eq!(col, &expected_x);
let col = batches[0].column_by_name("y").unwrap();
let mut values = vec![2; 512];
values.append(vec![3; 200].as_mut());
values.append(vec![4; 300].as_mut());
values.append(vec![5; 12].as_mut());
let expected_y = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;
assert_eq!(col, &expected_y);
let col = batches[0].column_by_name("z").unwrap();
let mut values = vec![3; 512];
values.append(vec![4; 512].as_mut());
let expected_z = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;
assert_eq!(col, &expected_z);
}
#[tokio::test]
async fn test_filter_on_arrow_startswith() {
let mut fixture = TableTestFixture::new();
fixture.setup_manifest_files().await;
let mut builder = fixture.table.scan();
let predicate = Reference::new("a").starts_with(Datum::string("Ice"));
builder = builder
.with_filter(predicate)
.with_row_selection_enabled(true);
let table_scan = builder.build().unwrap();
let batch_stream = table_scan.to_arrow().await.unwrap();
let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
assert_eq!(batches[0].num_rows(), 512);
let col = batches[0].column_by_name("a").unwrap();
let string_arr = col.as_any().downcast_ref::<StringArray>().unwrap();
assert_eq!(string_arr.value(0), "Iceberg");
}
#[tokio::test]
async fn test_filter_on_arrow_not_startswith() {
let mut fixture = TableTestFixture::new();
fixture.setup_manifest_files().await;
let mut builder = fixture.table.scan();
let predicate = Reference::new("a").not_starts_with(Datum::string("Ice"));
builder = builder
.with_filter(predicate)
.with_row_selection_enabled(true);
let table_scan = builder.build().unwrap();
let batch_stream = table_scan.to_arrow().await.unwrap();
let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
assert_eq!(batches[0].num_rows(), 512);
let col = batches[0].column_by_name("a").unwrap();
let string_arr = col.as_any().downcast_ref::<StringArray>().unwrap();
assert_eq!(string_arr.value(0), "Apache");
}
#[tokio::test]
async fn test_filter_on_arrow_in() {
let mut fixture = TableTestFixture::new();
fixture.setup_manifest_files().await;
let mut builder = fixture.table.scan();
let predicate =
Reference::new("a").is_in([Datum::string("Sioux"), Datum::string("Iceberg")]);
builder = builder
.with_filter(predicate)
.with_row_selection_enabled(true);
let table_scan = builder.build().unwrap();
let batch_stream = table_scan.to_arrow().await.unwrap();
let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
assert_eq!(batches[0].num_rows(), 512);
let col = batches[0].column_by_name("a").unwrap();
let string_arr = col.as_any().downcast_ref::<StringArray>().unwrap();
assert_eq!(string_arr.value(0), "Iceberg");
}
#[tokio::test]
async fn test_filter_on_arrow_not_in() {
let mut fixture = TableTestFixture::new();
fixture.setup_manifest_files().await;
let mut builder = fixture.table.scan();
let predicate =
Reference::new("a").is_not_in([Datum::string("Sioux"), Datum::string("Iceberg")]);
builder = builder
.with_filter(predicate)
.with_row_selection_enabled(true);
let table_scan = builder.build().unwrap();
let batch_stream = table_scan.to_arrow().await.unwrap();
let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
assert_eq!(batches[0].num_rows(), 512);
let col = batches[0].column_by_name("a").unwrap();
let string_arr = col.as_any().downcast_ref::<StringArray>().unwrap();
assert_eq!(string_arr.value(0), "Apache");
}
#[test]
fn test_file_scan_task_serialize_deserialize() {
let test_fn = |task: FileScanTask| {
let serialized = serde_json::to_string(&task).unwrap();
let deserialized: FileScanTask = serde_json::from_str(&serialized).unwrap();
assert_eq!(task.data_file_path, deserialized.data_file_path);
assert_eq!(task.start, deserialized.start);
assert_eq!(task.length, deserialized.length);
assert_eq!(task.project_field_ids, deserialized.project_field_ids);
assert_eq!(task.predicate, deserialized.predicate);
assert_eq!(task.schema, deserialized.schema);
};
let schema = Arc::new(
Schema::builder()
.with_fields(vec![Arc::new(NestedField::required(
1,
"x",
Type::Primitive(PrimitiveType::Binary),
))])
.build()
.unwrap(),
);
let task = FileScanTask {
data_file_path: "data_file_path".to_string(),
data_file_content: DataContentType::Data,
start: 0,
length: 100,
project_field_ids: vec![1, 2, 3],
predicate: None,
schema: schema.clone(),
record_count: Some(100),
data_file_format: DataFileFormat::Parquet,
deletes: vec![],
};
test_fn(task);
let task = FileScanTask {
data_file_path: "data_file_path".to_string(),
data_file_content: DataContentType::Data,
start: 0,
length: 100,
project_field_ids: vec![1, 2, 3],
predicate: Some(BoundPredicate::AlwaysTrue),
schema,
record_count: None,
data_file_format: DataFileFormat::Avro,
deletes: vec![],
};
test_fn(task);
}
}