1use std::sync::Arc;
19
20use futures::stream::BoxStream;
21use serde::{Deserialize, Serialize, Serializer};
22use typed_builder::TypedBuilder;
23
24use crate::Result;
25use crate::expr::BoundPredicate;
26use crate::spec::{
27 DataContentType, DataFileFormat, ManifestEntryRef, NameMapping, PartitionSpec, Schema,
28 SchemaRef, Struct,
29};
30
/// A boxed, `'static` stream of [`FileScanTask`]s, each item wrapped in the crate's [`Result`].
pub type FileScanTaskStream = BoxStream<'static, Result<FileScanTask>>;
33
34fn serialize_not_implemented<S, T>(_: &T, _: S) -> std::result::Result<S::Ok, S::Error>
37where S: Serializer {
38 Err(serde::ser::Error::custom(
39 "Serialization not implemented for this field",
40 ))
41}
42
43fn deserialize_not_implemented<'de, D, T>(_: D) -> std::result::Result<T, D::Error>
46where D: serde::Deserializer<'de> {
47 Err(serde::de::Error::custom(
48 "Deserialization not implemented for this field",
49 ))
50}
51
52#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, TypedBuilder)]
54#[builder(field_defaults(setter(prefix = "with_")))]
55pub struct FileScanTask {
56 pub file_size_in_bytes: u64,
59 pub start: u64,
61 pub length: u64,
63 #[builder(default)]
68 pub record_count: Option<u64>,
69
70 pub data_file_path: String,
72
73 pub data_file_format: DataFileFormat,
75
76 pub schema: SchemaRef,
78 pub project_field_ids: Vec<i32>,
80 #[serde(skip_serializing_if = "Option::is_none")]
82 #[builder(default)]
83 pub predicate: Option<BoundPredicate>,
84
85 #[builder(default)]
87 pub deletes: Vec<FileScanTaskDeleteFile>,
88
89 #[serde(default)]
93 #[serde(skip_serializing_if = "Option::is_none")]
94 #[serde(serialize_with = "serialize_not_implemented")]
95 #[serde(deserialize_with = "deserialize_not_implemented")]
96 #[builder(default)]
97 pub partition: Option<Struct>,
98
99 #[serde(default)]
103 #[serde(skip_serializing_if = "Option::is_none")]
104 #[serde(serialize_with = "serialize_not_implemented")]
105 #[serde(deserialize_with = "deserialize_not_implemented")]
106 #[builder(default)]
107 pub partition_spec: Option<Arc<PartitionSpec>>,
108
109 #[serde(default)]
113 #[serde(skip_serializing_if = "Option::is_none")]
114 #[serde(serialize_with = "serialize_not_implemented")]
115 #[serde(deserialize_with = "deserialize_not_implemented")]
116 #[builder(default)]
117 pub name_mapping: Option<Arc<NameMapping>>,
118
119 pub case_sensitive: bool,
121}
122
123impl FileScanTask {
124 pub fn data_file_path(&self) -> &str {
126 &self.data_file_path
127 }
128
129 pub fn project_field_ids(&self) -> &[i32] {
131 &self.project_field_ids
132 }
133
134 pub fn predicate(&self) -> Option<&BoundPredicate> {
136 self.predicate.as_ref()
137 }
138
139 pub fn schema(&self) -> &Schema {
141 &self.schema
142 }
143
144 pub fn schema_ref(&self) -> SchemaRef {
146 self.schema.clone()
147 }
148}
149
/// Crate-internal pairing of a delete file's manifest entry with a partition spec id.
///
/// Converted into a [`FileScanTaskDeleteFile`] through its `From<&DeleteFileContext>` impl.
#[derive(Debug)]
pub(crate) struct DeleteFileContext {
    /// Manifest entry describing the delete file (path, size, content type, equality ids).
    pub(crate) manifest_entry: ManifestEntryRef,
    /// Partition spec id copied verbatim into the resulting `FileScanTaskDeleteFile`.
    pub(crate) partition_spec_id: i32,
}
155
156impl From<&DeleteFileContext> for FileScanTaskDeleteFile {
157 fn from(ctx: &DeleteFileContext) -> Self {
158 FileScanTaskDeleteFile::builder()
159 .with_file_path(ctx.manifest_entry.file_path().to_string())
160 .with_file_size_in_bytes(ctx.manifest_entry.file_size_in_bytes())
161 .with_file_type(ctx.manifest_entry.content_type())
162 .with_partition_spec_id(ctx.partition_spec_id)
163 .with_equality_ids(ctx.manifest_entry.data_file.equality_ids.clone())
164 .build()
165 }
166}
167
/// A delete file attached to a [`FileScanTask`] via its `deletes` list.
///
/// Built with the generated `TypedBuilder`; every setter is prefixed with `with_`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, TypedBuilder)]
#[builder(field_defaults(setter(prefix = "with_")))]
pub struct FileScanTaskDeleteFile {
    /// Path of the delete file.
    pub file_path: String,

    /// Size of the delete file in bytes.
    pub file_size_in_bytes: u64,

    /// Content type of the file, taken from the manifest entry's `content_type()`.
    pub file_type: DataContentType,

    /// Partition spec id associated with this delete file.
    pub partition_spec_id: i32,

    /// Equality field ids, if any. The builder defaults it to `None`.
    #[builder(default)]
    pub equality_ids: Option<Vec<i32>>,
}