use std::collections::{HashMap, HashSet};
use std::ops::Range;
use std::str::FromStr;
use std::sync::Arc;
use arrow_arith::boolean::{and, is_not_null, is_null, not, or};
use arrow_array::{Array, ArrayRef, BooleanArray, RecordBatch};
use arrow_ord::cmp::{eq, gt, gt_eq, lt, lt_eq, neq};
use arrow_schema::{
ArrowError, DataType, FieldRef, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef,
};
use arrow_string::like::starts_with;
use bytes::Bytes;
use fnv::FnvHashSet;
use futures::future::BoxFuture;
use futures::{try_join, FutureExt, StreamExt, TryFutureExt, TryStreamExt};
use parquet::arrow::arrow_reader::{ArrowPredicateFn, ArrowReaderOptions, RowFilter, RowSelection};
use parquet::arrow::async_reader::AsyncFileReader;
use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask, PARQUET_FIELD_ID_META_KEY};
use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
use parquet::schema::types::{SchemaDescriptor, Type as ParquetType};
use crate::arrow::record_batch_transformer::RecordBatchTransformer;
use crate::arrow::{arrow_schema_to_schema, get_arrow_datum};
use crate::error::Result;
use crate::expr::visitors::bound_predicate_visitor::{visit, BoundPredicateVisitor};
use crate::expr::visitors::page_index_evaluator::PageIndexEvaluator;
use crate::expr::visitors::row_group_metrics_evaluator::RowGroupMetricsEvaluator;
use crate::expr::{BoundPredicate, BoundReference};
use crate::io::{FileIO, FileMetadata, FileRead};
use crate::scan::{ArrowRecordBatchStream, FileScanTask, FileScanTaskStream};
use crate::spec::{Datum, NestedField, PrimitiveType, Schema, Type};
use crate::utils::available_parallelism;
use crate::{Error, ErrorKind};
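/// Builder for [`ArrowReader`].
///
/// A minimal usage sketch (illustrative only; assumes a `FileIO` instance and
/// a `FileScanTaskStream` named `file_io` and `tasks` are already in scope):
///
/// ```ignore
/// let reader = ArrowReaderBuilder::new(file_io)
///     .with_batch_size(1024)
///     .with_row_group_filtering_enabled(true)
///     .build();
/// let batch_stream = reader.read(tasks).await?;
/// ```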
pub struct ArrowReaderBuilder {
batch_size: Option<usize>,
file_io: FileIO,
concurrency_limit_data_files: usize,
row_group_filtering_enabled: bool,
row_selection_enabled: bool,
}
impl ArrowReaderBuilder {
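    /// Create a new `ArrowReaderBuilder`; the data file concurrency limit
    /// defaults to the available parallelism.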
pub(crate) fn new(file_io: FileIO) -> Self {
let num_cpus = available_parallelism().get();
ArrowReaderBuilder {
batch_size: None,
file_io,
concurrency_limit_data_files: num_cpus,
row_group_filtering_enabled: true,
row_selection_enabled: false,
}
}
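    /// Sets the maximum number of data files that are read concurrently.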
pub fn with_data_file_concurrency_limit(mut self, val: usize) -> Self {
self.concurrency_limit_data_files = val;
self
}
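    /// Sets the desired number of rows per record batch; if unset, the
    /// Parquet reader's default batch size is used.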
pub fn with_batch_size(mut self, batch_size: usize) -> Self {
self.batch_size = Some(batch_size);
self
}
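    /// Determines whether to evaluate the scan predicate against row group
    /// statistics, pruning row groups that cannot contain matching rows.
    /// Enabled by default.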
pub fn with_row_group_filtering_enabled(mut self, row_group_filtering_enabled: bool) -> Self {
self.row_group_filtering_enabled = row_group_filtering_enabled;
self
}
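    /// Determines whether to use the Parquet page index to skip ranges of
    /// rows within the remaining row groups. Disabled by default.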
pub fn with_row_selection_enabled(mut self, row_selection_enabled: bool) -> Self {
self.row_selection_enabled = row_selection_enabled;
self
}
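    /// Build the [`ArrowReader`].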
pub fn build(self) -> ArrowReader {
ArrowReader {
batch_size: self.batch_size,
file_io: self.file_io,
concurrency_limit_data_files: self.concurrency_limit_data_files,
row_group_filtering_enabled: self.row_group_filtering_enabled,
row_selection_enabled: self.row_selection_enabled,
}
}
}
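/// Reads data files from an Iceberg table scan into Arrow record batches.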
#[derive(Clone)]
pub struct ArrowReader {
batch_size: Option<usize>,
file_io: FileIO,
concurrency_limit_data_files: usize,
row_group_filtering_enabled: bool,
row_selection_enabled: bool,
}
impl ArrowReader {
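    /// Take a stream of `FileScanTask`s and read all the files, returning a
    /// stream of Arrow `RecordBatch`es. Up to `concurrency_limit_data_files`
    /// files are processed concurrently.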
pub async fn read(self, tasks: FileScanTaskStream) -> Result<ArrowRecordBatchStream> {
let file_io = self.file_io.clone();
let batch_size = self.batch_size;
let concurrency_limit_data_files = self.concurrency_limit_data_files;
let row_group_filtering_enabled = self.row_group_filtering_enabled;
let row_selection_enabled = self.row_selection_enabled;
let stream = tasks
.map_ok(move |task| {
let file_io = file_io.clone();
Self::process_file_scan_task(
task,
batch_size,
file_io,
row_group_filtering_enabled,
row_selection_enabled,
)
})
.map_err(|err| {
                Error::new(ErrorKind::Unexpected, "file scan task generation failed").with_source(err)
})
.try_buffer_unordered(concurrency_limit_data_files)
.try_flatten_unordered(concurrency_limit_data_files);
Ok(Box::pin(stream) as ArrowRecordBatchStream)
}
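    /// Process a single file scan task: open the Parquet file, project the
    /// requested columns, and, when a predicate is present, apply a row
    /// filter plus optional row-group pruning and row selection.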
async fn process_file_scan_task(
task: FileScanTask,
batch_size: Option<usize>,
file_io: FileIO,
row_group_filtering_enabled: bool,
row_selection_enabled: bool,
) -> Result<ArrowRecordBatchStream> {
let parquet_file = file_io.new_input(&task.data_file_path)?;
let (parquet_metadata, parquet_reader) =
try_join!(parquet_file.metadata(), parquet_file.reader())?;
let parquet_file_reader = ArrowFileReader::new(parquet_metadata, parquet_reader);
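        // The page index is only needed for row selection, so skip loading it
        // otherwise to avoid the extra I/O.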
let should_load_page_index = row_selection_enabled && task.predicate.is_some();
let mut record_batch_stream_builder = ParquetRecordBatchStreamBuilder::new_with_options(
parquet_file_reader,
ArrowReaderOptions::new().with_page_index(should_load_page_index),
)
.await?;
let projection_mask = Self::get_arrow_projection_mask(
&task.project_field_ids,
&task.schema,
record_batch_stream_builder.parquet_schema(),
record_batch_stream_builder.schema(),
)?;
record_batch_stream_builder = record_batch_stream_builder.with_projection(projection_mask);
let mut record_batch_transformer =
RecordBatchTransformer::build(task.schema_ref(), task.project_field_ids());
if let Some(batch_size) = batch_size {
record_batch_stream_builder = record_batch_stream_builder.with_batch_size(batch_size);
}
if let Some(predicate) = &task.predicate {
let (iceberg_field_ids, field_id_map) = Self::build_field_id_set_and_map(
record_batch_stream_builder.parquet_schema(),
predicate,
)?;
let row_filter = Self::get_row_filter(
predicate,
record_batch_stream_builder.parquet_schema(),
&iceberg_field_ids,
&field_id_map,
)?;
record_batch_stream_builder = record_batch_stream_builder.with_row_filter(row_filter);
let mut selected_row_groups = None;
if row_group_filtering_enabled {
let result = Self::get_selected_row_group_indices(
predicate,
record_batch_stream_builder.metadata(),
&field_id_map,
&task.schema,
)?;
selected_row_groups = Some(result);
}
if row_selection_enabled {
let row_selection = Self::get_row_selection(
predicate,
record_batch_stream_builder.metadata(),
&selected_row_groups,
&field_id_map,
&task.schema,
)?;
record_batch_stream_builder =
record_batch_stream_builder.with_row_selection(row_selection);
}
if let Some(selected_row_groups) = selected_row_groups {
record_batch_stream_builder =
record_batch_stream_builder.with_row_groups(selected_row_groups);
}
}
let record_batch_stream =
record_batch_stream_builder
.build()?
.map(move |batch| match batch {
Ok(batch) => record_batch_transformer.process_record_batch(batch),
Err(err) => Err(err.into()),
});
Ok(Box::pin(record_batch_stream) as ArrowRecordBatchStream)
}
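    /// Collects the Iceberg field ids referenced by the predicate and builds
    /// a map from field id to leaf column index in the Parquet schema.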
fn build_field_id_set_and_map(
parquet_schema: &SchemaDescriptor,
predicate: &BoundPredicate,
) -> Result<(HashSet<i32>, HashMap<i32, usize>)> {
let mut collector = CollectFieldIdVisitor {
field_ids: HashSet::default(),
};
visit(&mut collector, predicate)?;
let iceberg_field_ids = collector.field_ids();
let field_id_map = build_field_id_map(parquet_schema)?;
Ok((iceberg_field_ids, field_id_map))
}
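    /// Recursively collects the ids of all primitive (leaf) fields reachable
    /// from `field` into `field_ids`.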
fn include_leaf_field_id(field: &NestedField, field_ids: &mut Vec<i32>) {
match field.field_type.as_ref() {
Type::Primitive(_) => {
field_ids.push(field.id);
}
Type::Struct(struct_type) => {
for nested_field in struct_type.fields() {
Self::include_leaf_field_id(nested_field, field_ids);
}
}
Type::List(list_type) => {
Self::include_leaf_field_id(&list_type.element_field, field_ids);
}
Type::Map(map_type) => {
Self::include_leaf_field_id(&map_type.key_field, field_ids);
Self::include_leaf_field_id(&map_type.value_field, field_ids);
}
}
}
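    /// Computes a `ProjectionMask` selecting the Parquet leaf columns that
    /// back the requested Iceberg field ids, validating that any type
    /// difference between the file and the task schema is a legal promotion.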
fn get_arrow_projection_mask(
field_ids: &[i32],
iceberg_schema_of_task: &Schema,
parquet_schema: &SchemaDescriptor,
arrow_schema: &ArrowSchemaRef,
) -> Result<ProjectionMask> {
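        // Permitted promotions from the file's type to the projected type:
        // identical types, int -> long, float -> double, decimal precision
        // widening at the same scale, and fixed(16) -> uuid.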
fn type_promotion_is_valid(
file_type: Option<&PrimitiveType>,
projected_type: Option<&PrimitiveType>,
) -> bool {
match (file_type, projected_type) {
(Some(lhs), Some(rhs)) if lhs == rhs => true,
(Some(PrimitiveType::Int), Some(PrimitiveType::Long)) => true,
(Some(PrimitiveType::Float), Some(PrimitiveType::Double)) => true,
(
Some(PrimitiveType::Decimal {
precision: file_precision,
scale: file_scale,
}),
Some(PrimitiveType::Decimal {
precision: requested_precision,
scale: requested_scale,
}),
) if requested_precision >= file_precision && file_scale == requested_scale => true,
(Some(PrimitiveType::Fixed(16)), Some(PrimitiveType::Uuid)) => true,
_ => false,
}
}
let mut leaf_field_ids = vec![];
for field_id in field_ids {
let field = iceberg_schema_of_task.field_by_id(*field_id);
if let Some(field) = field {
Self::include_leaf_field_id(field, &mut leaf_field_ids);
}
}
if leaf_field_ids.is_empty() {
Ok(ProjectionMask::all())
} else {
let mut column_map = HashMap::new();
let fields = arrow_schema.fields();
let mut projected_fields: HashMap<FieldRef, i32> = HashMap::new();
let projected_arrow_schema = ArrowSchema::new_with_metadata(
fields.filter_leaves(|_, f| {
f.metadata()
.get(PARQUET_FIELD_ID_META_KEY)
.and_then(|field_id| i32::from_str(field_id).ok())
.map_or(false, |field_id| {
projected_fields.insert((*f).clone(), field_id);
leaf_field_ids.contains(&field_id)
})
}),
arrow_schema.metadata().clone(),
);
let iceberg_schema = arrow_schema_to_schema(&projected_arrow_schema)?;
fields.filter_leaves(|idx, field| {
let Some(field_id) = projected_fields.get(field).cloned() else {
return false;
};
let iceberg_field = iceberg_schema_of_task.field_by_id(field_id);
let parquet_iceberg_field = iceberg_schema.field_by_id(field_id);
if iceberg_field.is_none() || parquet_iceberg_field.is_none() {
return false;
}
if !type_promotion_is_valid(
parquet_iceberg_field
.unwrap()
.field_type
.as_primitive_type(),
iceberg_field.unwrap().field_type.as_primitive_type(),
) {
return false;
}
column_map.insert(field_id, idx);
true
});
if column_map.len() != leaf_field_ids.len() {
let missing_fields = leaf_field_ids
.iter()
.filter(|field_id| !column_map.contains_key(field_id))
.collect::<Vec<_>>();
return Err(Error::new(
ErrorKind::DataInvalid,
format!(
"Parquet schema {} and Iceberg schema {} do not match.",
iceberg_schema, iceberg_schema_of_task
),
)
.with_context("column_map", format! {"{:?}", column_map})
.with_context("field_ids", format! {"{:?}", leaf_field_ids})
.with_context("missing_fields", format! {"{:?}", missing_fields}));
}
let mut indices = vec![];
for field_id in leaf_field_ids {
if let Some(col_idx) = column_map.get(&field_id) {
indices.push(*col_idx);
} else {
return Err(Error::new(
ErrorKind::DataInvalid,
format!("Field {} is not found in Parquet schema.", field_id),
));
}
}
Ok(ProjectionMask::leaves(parquet_schema, indices))
}
}
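    /// Converts the bound predicate into an Arrow `RowFilter` evaluated over
    /// only the leaf columns that the predicate references.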
fn get_row_filter(
predicates: &BoundPredicate,
parquet_schema: &SchemaDescriptor,
iceberg_field_ids: &HashSet<i32>,
field_id_map: &HashMap<i32, usize>,
) -> Result<RowFilter> {
let mut column_indices = iceberg_field_ids
.iter()
.filter_map(|field_id| field_id_map.get(field_id).cloned())
.collect::<Vec<_>>();
column_indices.sort();
let mut converter = PredicateConverter {
parquet_schema,
column_map: field_id_map,
column_indices: &column_indices,
};
let projection_mask = ProjectionMask::leaves(parquet_schema, column_indices.clone());
let predicate_func = visit(&mut converter, predicates)?;
let arrow_predicate = ArrowPredicateFn::new(projection_mask, predicate_func);
Ok(RowFilter::new(vec![Box::new(arrow_predicate)]))
}
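    /// Evaluates the predicate against each row group's metrics, returning
    /// the indices of row groups that may contain matching rows.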
fn get_selected_row_group_indices(
predicate: &BoundPredicate,
parquet_metadata: &Arc<ParquetMetaData>,
field_id_map: &HashMap<i32, usize>,
snapshot_schema: &Schema,
) -> Result<Vec<usize>> {
let row_groups_metadata = parquet_metadata.row_groups();
let mut results = Vec::with_capacity(row_groups_metadata.len());
for (idx, row_group_metadata) in row_groups_metadata.iter().enumerate() {
if RowGroupMetricsEvaluator::eval(
predicate,
row_group_metadata,
field_id_map,
snapshot_schema,
)? {
results.push(idx);
}
}
Ok(results)
}
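    /// Uses the Parquet page index to build a `RowSelection` that skips pages
    /// which cannot contain matching rows, restricted to the selected row
    /// groups when row-group filtering ran first.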
fn get_row_selection(
predicate: &BoundPredicate,
parquet_metadata: &Arc<ParquetMetaData>,
selected_row_groups: &Option<Vec<usize>>,
field_id_map: &HashMap<i32, usize>,
snapshot_schema: &Schema,
) -> Result<RowSelection> {
let Some(column_index) = parquet_metadata.column_index() else {
return Err(Error::new(
ErrorKind::Unexpected,
"Parquet file metadata does not contain a column index",
));
};
let Some(offset_index) = parquet_metadata.offset_index() else {
return Err(Error::new(
ErrorKind::Unexpected,
"Parquet file metadata does not contain an offset index",
));
};
let mut selected_row_groups_idx = 0;
let page_index = column_index
.iter()
.enumerate()
.zip(offset_index)
.zip(parquet_metadata.row_groups());
let mut results = Vec::new();
for (((idx, column_index), offset_index), row_group_metadata) in page_index {
            if let Some(selected_row_groups) = selected_row_groups {
                // Only process row groups that survived row-group filtering.
                // `get` avoids an out-of-bounds panic when the selection is empty.
                match selected_row_groups.get(selected_row_groups_idx) {
                    Some(&selected_idx) if idx == selected_idx => {
                        selected_row_groups_idx += 1;
                    }
                    _ => continue,
                }
            }
let selections_for_page = PageIndexEvaluator::eval(
predicate,
column_index,
offset_index,
row_group_metadata,
field_id_map,
snapshot_schema,
)?;
results.push(selections_for_page);
if let Some(selected_row_groups) = selected_row_groups {
if selected_row_groups_idx == selected_row_groups.len() {
break;
}
}
}
Ok(results.into_iter().flatten().collect::<Vec<_>>().into())
}
}
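/// Builds a map of Iceberg field id to Parquet leaf column index from the
/// field-id metadata embedded in the Parquet schema.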
fn build_field_id_map(parquet_schema: &SchemaDescriptor) -> Result<HashMap<i32, usize>> {
let mut column_map = HashMap::new();
for (idx, field) in parquet_schema.columns().iter().enumerate() {
let field_type = field.self_type();
match field_type {
ParquetType::PrimitiveType { basic_info, .. } => {
if !basic_info.has_id() {
return Err(Error::new(
ErrorKind::DataInvalid,
format!(
"Leave column idx: {}, name: {}, type {:?} in schema doesn't have field id",
idx,
basic_info.name(),
field_type
),
));
}
column_map.insert(basic_info.id(), idx);
}
ParquetType::GroupType { .. } => {
return Err(Error::new(
ErrorKind::DataInvalid,
format!(
"Leave column in schema should be primitive type but got {:?}",
field_type
),
));
}
};
}
Ok(column_map)
}
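/// A visitor that collects the field ids of every column referenced by a
/// bound predicate.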
struct CollectFieldIdVisitor {
field_ids: HashSet<i32>,
}
impl CollectFieldIdVisitor {
fn field_ids(self) -> HashSet<i32> {
self.field_ids
}
}
impl BoundPredicateVisitor for CollectFieldIdVisitor {
type T = ();
fn always_true(&mut self) -> Result<()> {
Ok(())
}
fn always_false(&mut self) -> Result<()> {
Ok(())
}
fn and(&mut self, _lhs: (), _rhs: ()) -> Result<()> {
Ok(())
}
fn or(&mut self, _lhs: (), _rhs: ()) -> Result<()> {
Ok(())
}
fn not(&mut self, _inner: ()) -> Result<()> {
Ok(())
}
fn is_null(&mut self, reference: &BoundReference, _predicate: &BoundPredicate) -> Result<()> {
self.field_ids.insert(reference.field().id);
Ok(())
}
fn not_null(&mut self, reference: &BoundReference, _predicate: &BoundPredicate) -> Result<()> {
self.field_ids.insert(reference.field().id);
Ok(())
}
fn is_nan(&mut self, reference: &BoundReference, _predicate: &BoundPredicate) -> Result<()> {
self.field_ids.insert(reference.field().id);
Ok(())
}
fn not_nan(&mut self, reference: &BoundReference, _predicate: &BoundPredicate) -> Result<()> {
self.field_ids.insert(reference.field().id);
Ok(())
}
fn less_than(
&mut self,
reference: &BoundReference,
_literal: &Datum,
_predicate: &BoundPredicate,
) -> Result<()> {
self.field_ids.insert(reference.field().id);
Ok(())
}
fn less_than_or_eq(
&mut self,
reference: &BoundReference,
_literal: &Datum,
_predicate: &BoundPredicate,
) -> Result<()> {
self.field_ids.insert(reference.field().id);
Ok(())
}
fn greater_than(
&mut self,
reference: &BoundReference,
_literal: &Datum,
_predicate: &BoundPredicate,
) -> Result<()> {
self.field_ids.insert(reference.field().id);
Ok(())
}
fn greater_than_or_eq(
&mut self,
reference: &BoundReference,
_literal: &Datum,
_predicate: &BoundPredicate,
) -> Result<()> {
self.field_ids.insert(reference.field().id);
Ok(())
}
fn eq(
&mut self,
reference: &BoundReference,
_literal: &Datum,
_predicate: &BoundPredicate,
) -> Result<()> {
self.field_ids.insert(reference.field().id);
Ok(())
}
fn not_eq(
&mut self,
reference: &BoundReference,
_literal: &Datum,
_predicate: &BoundPredicate,
) -> Result<()> {
self.field_ids.insert(reference.field().id);
Ok(())
}
fn starts_with(
&mut self,
reference: &BoundReference,
_literal: &Datum,
_predicate: &BoundPredicate,
) -> Result<()> {
self.field_ids.insert(reference.field().id);
Ok(())
}
fn not_starts_with(
&mut self,
reference: &BoundReference,
_literal: &Datum,
_predicate: &BoundPredicate,
) -> Result<()> {
self.field_ids.insert(reference.field().id);
Ok(())
}
fn r#in(
&mut self,
reference: &BoundReference,
_literals: &FnvHashSet<Datum>,
_predicate: &BoundPredicate,
) -> Result<()> {
self.field_ids.insert(reference.field().id);
Ok(())
}
fn not_in(
&mut self,
reference: &BoundReference,
_literals: &FnvHashSet<Datum>,
_predicate: &BoundPredicate,
) -> Result<()> {
self.field_ids.insert(reference.field().id);
Ok(())
}
}
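/// Converts a bound predicate into a boxed function that evaluates the
/// predicate against a `RecordBatch`, resolving field ids to the positions
/// of the projected columns.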
struct PredicateConverter<'a> {
pub parquet_schema: &'a SchemaDescriptor,
pub column_map: &'a HashMap<i32, usize>,
pub column_indices: &'a Vec<usize>,
}
impl PredicateConverter<'_> {
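    /// Resolves a bound reference to its positional index within the
    /// projected columns, returning `Ok(None)` when the referenced field is
    /// absent from the file (for example, a column added after the file was
    /// written).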
fn bound_reference(&mut self, reference: &BoundReference) -> Result<Option<usize>> {
if let Some(column_idx) = self.column_map.get(&reference.field().id) {
if self.parquet_schema.get_column_root_idx(*column_idx) != *column_idx {
return Err(Error::new(
ErrorKind::DataInvalid,
format!(
"Leave column `{}` in predicates isn't a root column in Parquet schema.",
reference.field().name
),
));
}
            let index = self
                .column_indices
                .iter()
                .position(|&idx| idx == *column_idx)
                .ok_or(Error::new(
                    ErrorKind::DataInvalid,
                    format!(
                        "Leaf column `{}` in predicates cannot be found in the required column indices.",
                        reference.field().name
                    ),
                ))?;
Ok(Some(index))
} else {
Ok(None)
}
}
fn build_always_true(&self) -> Result<Box<PredicateResult>> {
Ok(Box::new(|batch| {
Ok(BooleanArray::from(vec![true; batch.num_rows()]))
}))
}
fn build_always_false(&self) -> Result<Box<PredicateResult>> {
Ok(Box::new(|batch| {
Ok(BooleanArray::from(vec![false; batch.num_rows()]))
}))
}
}
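/// Extracts the column at `column_idx` from the batch; struct columns are
/// not yet supported in predicate evaluation.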
fn project_column(
batch: &RecordBatch,
column_idx: usize,
) -> std::result::Result<ArrayRef, ArrowError> {
let column = batch.column(column_idx);
match column.data_type() {
DataType::Struct(_) => Err(ArrowError::SchemaError(
"Does not support struct column yet.".to_string(),
)),
_ => Ok(column.clone()),
}
}
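/// A fallible function that evaluates a predicate over a `RecordBatch`,
/// producing one boolean per row.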
type PredicateResult =
dyn FnMut(RecordBatch) -> std::result::Result<BooleanArray, ArrowError> + Send + 'static;
impl<'a> BoundPredicateVisitor for PredicateConverter<'a> {
type T = Box<PredicateResult>;
fn always_true(&mut self) -> Result<Box<PredicateResult>> {
self.build_always_true()
}
fn always_false(&mut self) -> Result<Box<PredicateResult>> {
self.build_always_false()
}
fn and(
&mut self,
mut lhs: Box<PredicateResult>,
mut rhs: Box<PredicateResult>,
) -> Result<Box<PredicateResult>> {
Ok(Box::new(move |batch| {
let left = lhs(batch.clone())?;
let right = rhs(batch)?;
and(&left, &right)
}))
}
fn or(
&mut self,
mut lhs: Box<PredicateResult>,
mut rhs: Box<PredicateResult>,
) -> Result<Box<PredicateResult>> {
Ok(Box::new(move |batch| {
let left = lhs(batch.clone())?;
let right = rhs(batch)?;
or(&left, &right)
}))
}
fn not(&mut self, mut inner: Box<PredicateResult>) -> Result<Box<PredicateResult>> {
Ok(Box::new(move |batch| {
let pred_ret = inner(batch)?;
not(&pred_ret)
}))
}
fn is_null(
&mut self,
reference: &BoundReference,
_predicate: &BoundPredicate,
) -> Result<Box<PredicateResult>> {
if let Some(idx) = self.bound_reference(reference)? {
Ok(Box::new(move |batch| {
let column = project_column(&batch, idx)?;
is_null(&column)
}))
} else {
self.build_always_true()
}
}
fn not_null(
&mut self,
reference: &BoundReference,
_predicate: &BoundPredicate,
) -> Result<Box<PredicateResult>> {
if let Some(idx) = self.bound_reference(reference)? {
Ok(Box::new(move |batch| {
let column = project_column(&batch, idx)?;
is_not_null(&column)
}))
} else {
self.build_always_false()
}
}
fn is_nan(
&mut self,
reference: &BoundReference,
_predicate: &BoundPredicate,
) -> Result<Box<PredicateResult>> {
if self.bound_reference(reference)?.is_some() {
self.build_always_true()
} else {
self.build_always_false()
}
}
fn not_nan(
&mut self,
reference: &BoundReference,
_predicate: &BoundPredicate,
) -> Result<Box<PredicateResult>> {
if self.bound_reference(reference)?.is_some() {
self.build_always_false()
} else {
self.build_always_true()
}
}
fn less_than(
&mut self,
reference: &BoundReference,
literal: &Datum,
_predicate: &BoundPredicate,
) -> Result<Box<PredicateResult>> {
if let Some(idx) = self.bound_reference(reference)? {
let literal = get_arrow_datum(literal)?;
Ok(Box::new(move |batch| {
let left = project_column(&batch, idx)?;
lt(&left, literal.as_ref())
}))
} else {
self.build_always_true()
}
}
fn less_than_or_eq(
&mut self,
reference: &BoundReference,
literal: &Datum,
_predicate: &BoundPredicate,
) -> Result<Box<PredicateResult>> {
if let Some(idx) = self.bound_reference(reference)? {
let literal = get_arrow_datum(literal)?;
Ok(Box::new(move |batch| {
let left = project_column(&batch, idx)?;
lt_eq(&left, literal.as_ref())
}))
} else {
self.build_always_true()
}
}
fn greater_than(
&mut self,
reference: &BoundReference,
literal: &Datum,
_predicate: &BoundPredicate,
) -> Result<Box<PredicateResult>> {
if let Some(idx) = self.bound_reference(reference)? {
let literal = get_arrow_datum(literal)?;
Ok(Box::new(move |batch| {
let left = project_column(&batch, idx)?;
gt(&left, literal.as_ref())
}))
} else {
self.build_always_false()
}
}
fn greater_than_or_eq(
&mut self,
reference: &BoundReference,
literal: &Datum,
_predicate: &BoundPredicate,
) -> Result<Box<PredicateResult>> {
if let Some(idx) = self.bound_reference(reference)? {
let literal = get_arrow_datum(literal)?;
Ok(Box::new(move |batch| {
let left = project_column(&batch, idx)?;
gt_eq(&left, literal.as_ref())
}))
} else {
self.build_always_false()
}
}
fn eq(
&mut self,
reference: &BoundReference,
literal: &Datum,
_predicate: &BoundPredicate,
) -> Result<Box<PredicateResult>> {
if let Some(idx) = self.bound_reference(reference)? {
let literal = get_arrow_datum(literal)?;
Ok(Box::new(move |batch| {
let left = project_column(&batch, idx)?;
eq(&left, literal.as_ref())
}))
} else {
self.build_always_false()
}
}
fn not_eq(
&mut self,
reference: &BoundReference,
literal: &Datum,
_predicate: &BoundPredicate,
) -> Result<Box<PredicateResult>> {
if let Some(idx) = self.bound_reference(reference)? {
let literal = get_arrow_datum(literal)?;
Ok(Box::new(move |batch| {
let left = project_column(&batch, idx)?;
neq(&left, literal.as_ref())
}))
} else {
self.build_always_false()
}
}
fn starts_with(
&mut self,
reference: &BoundReference,
literal: &Datum,
_predicate: &BoundPredicate,
) -> Result<Box<PredicateResult>> {
if let Some(idx) = self.bound_reference(reference)? {
let literal = get_arrow_datum(literal)?;
Ok(Box::new(move |batch| {
let left = project_column(&batch, idx)?;
starts_with(&left, literal.as_ref())
}))
} else {
self.build_always_false()
}
}
fn not_starts_with(
&mut self,
reference: &BoundReference,
literal: &Datum,
_predicate: &BoundPredicate,
) -> Result<Box<PredicateResult>> {
if let Some(idx) = self.bound_reference(reference)? {
let literal = get_arrow_datum(literal)?;
Ok(Box::new(move |batch| {
let left = project_column(&batch, idx)?;
not(&starts_with(&left, literal.as_ref())?)
}))
} else {
self.build_always_true()
}
}
fn r#in(
&mut self,
reference: &BoundReference,
literals: &FnvHashSet<Datum>,
_predicate: &BoundPredicate,
) -> Result<Box<PredicateResult>> {
if let Some(idx) = self.bound_reference(reference)? {
            let literals: Vec<_> = literals
                .iter()
                .map(get_arrow_datum)
                .collect::<Result<Vec<_>>>()?;
Ok(Box::new(move |batch| {
let left = project_column(&batch, idx)?;
let mut acc = BooleanArray::from(vec![false; batch.num_rows()]);
for literal in &literals {
                    acc = or(&acc, &eq(&left, literal.as_ref())?)?;
}
Ok(acc)
}))
} else {
self.build_always_false()
}
}
fn not_in(
&mut self,
reference: &BoundReference,
literals: &FnvHashSet<Datum>,
_predicate: &BoundPredicate,
) -> Result<Box<PredicateResult>> {
if let Some(idx) = self.bound_reference(reference)? {
            let literals: Vec<_> = literals
                .iter()
                .map(get_arrow_datum)
                .collect::<Result<Vec<_>>>()?;
Ok(Box::new(move |batch| {
let left = project_column(&batch, idx)?;
let mut acc = BooleanArray::from(vec![true; batch.num_rows()]);
for literal in &literals {
                    acc = and(&acc, &neq(&left, literal.as_ref())?)?;
}
Ok(acc)
}))
} else {
self.build_always_true()
}
}
}
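/// Adapts a [`FileRead`] into the `AsyncFileReader` interface required by the
/// Arrow Parquet reader.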
struct ArrowFileReader<R: FileRead> {
meta: FileMetadata,
r: R,
}
impl<R: FileRead> ArrowFileReader<R> {
fn new(meta: FileMetadata, r: R) -> Self {
Self { meta, r }
}
}
impl<R: FileRead> AsyncFileReader for ArrowFileReader<R> {
fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
Box::pin(
self.r
.read(range.start as _..range.end as _)
.map_err(|err| parquet::errors::ParquetError::External(Box::new(err))),
)
}
fn get_metadata(&mut self) -> BoxFuture<'_, parquet::errors::Result<Arc<ParquetMetaData>>> {
async move {
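            // The file size is already known from `FileMetadata`, so the
            // footer can be located without an extra read to discover it.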
let reader = ParquetMetaDataReader::new();
let size = self.meta.size as usize;
let meta = reader.load_and_finish(self, size).await?;
Ok(Arc::new(meta))
}
.boxed()
}
}
#[cfg(test)]
mod tests {
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use arrow_schema::{DataType, Field, Schema as ArrowSchema, TimeUnit};
use parquet::arrow::ProjectionMask;
use parquet::schema::parser::parse_message_type;
use parquet::schema::types::SchemaDescriptor;
use crate::arrow::reader::{CollectFieldIdVisitor, PARQUET_FIELD_ID_META_KEY};
use crate::arrow::ArrowReader;
use crate::expr::visitors::bound_predicate_visitor::visit;
use crate::expr::{Bind, Reference};
use crate::spec::{NestedField, PrimitiveType, Schema, SchemaRef, Type};
use crate::ErrorKind;
fn table_schema_simple() -> SchemaRef {
Arc::new(
Schema::builder()
.with_schema_id(1)
.with_identifier_field_ids(vec![2])
.with_fields(vec![
NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String)).into(),
NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(),
NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean)).into(),
NestedField::optional(4, "qux", Type::Primitive(PrimitiveType::Float)).into(),
])
.build()
.unwrap(),
)
}
#[test]
fn test_collect_field_id() {
let schema = table_schema_simple();
let expr = Reference::new("qux").is_null();
let bound_expr = expr.bind(schema, true).unwrap();
let mut visitor = CollectFieldIdVisitor {
field_ids: HashSet::default(),
};
visit(&mut visitor, &bound_expr).unwrap();
let mut expected = HashSet::default();
expected.insert(4_i32);
assert_eq!(visitor.field_ids, expected);
}
#[test]
fn test_collect_field_id_with_and() {
let schema = table_schema_simple();
let expr = Reference::new("qux")
.is_null()
.and(Reference::new("baz").is_null());
let bound_expr = expr.bind(schema, true).unwrap();
let mut visitor = CollectFieldIdVisitor {
field_ids: HashSet::default(),
};
visit(&mut visitor, &bound_expr).unwrap();
let mut expected = HashSet::default();
expected.insert(4_i32);
expected.insert(3);
assert_eq!(visitor.field_ids, expected);
}
#[test]
fn test_collect_field_id_with_or() {
let schema = table_schema_simple();
let expr = Reference::new("qux")
.is_null()
.or(Reference::new("baz").is_null());
let bound_expr = expr.bind(schema, true).unwrap();
let mut visitor = CollectFieldIdVisitor {
field_ids: HashSet::default(),
};
visit(&mut visitor, &bound_expr).unwrap();
let mut expected = HashSet::default();
expected.insert(4_i32);
expected.insert(3);
assert_eq!(visitor.field_ids, expected);
}
#[test]
fn test_arrow_projection_mask() {
let schema = Arc::new(
Schema::builder()
.with_schema_id(1)
.with_identifier_field_ids(vec![1])
.with_fields(vec![
NestedField::required(1, "c1", Type::Primitive(PrimitiveType::String)).into(),
NestedField::optional(2, "c2", Type::Primitive(PrimitiveType::Int)).into(),
NestedField::optional(
3,
"c3",
Type::Primitive(PrimitiveType::Decimal {
precision: 38,
scale: 3,
}),
)
.into(),
])
.build()
.unwrap(),
);
let arrow_schema = Arc::new(ArrowSchema::new(vec![
Field::new("c1", DataType::Utf8, false).with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
"1".to_string(),
)])),
Field::new("c2", DataType::Duration(TimeUnit::Microsecond), true).with_metadata(
HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "2".to_string())]),
),
Field::new("c3", DataType::Decimal128(39, 3), true).with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
"3".to_string(),
)])),
]));
let message_type = "
message schema {
required binary c1 (STRING) = 1;
optional int32 c2 (INTEGER(8,true)) = 2;
optional fixed_len_byte_array(17) c3 (DECIMAL(39,3)) = 3;
}
";
let parquet_type = parse_message_type(message_type).expect("should parse schema");
let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_type));
let err = ArrowReader::get_arrow_projection_mask(
&[1, 2, 3],
&schema,
&parquet_schema,
&arrow_schema,
)
.unwrap_err();
assert_eq!(err.kind(), ErrorKind::DataInvalid);
assert_eq!(
err.to_string(),
"DataInvalid => Unsupported Arrow data type: Duration(Microsecond)".to_string()
);
let err = ArrowReader::get_arrow_projection_mask(
&[1, 3],
&schema,
&parquet_schema,
&arrow_schema,
)
.unwrap_err();
assert_eq!(err.kind(), ErrorKind::DataInvalid);
assert_eq!(
err.to_string(),
"DataInvalid => Failed to create decimal type, source: DataInvalid => Decimals with precision larger than 38 are not supported: 39".to_string()
);
let mask =
ArrowReader::get_arrow_projection_mask(&[1], &schema, &parquet_schema, &arrow_schema)
.expect("Some ProjectionMask");
assert_eq!(mask, ProjectionMask::leaves(&parquet_schema, vec![0]));
}
}