1use std::collections::HashMap;
19use std::sync::Arc;
20
21use arrow_array::RecordBatch;
22use arrow_array::builder::{
23 BooleanBuilder, GenericListBuilder, ListBuilder, PrimitiveBuilder, StringBuilder, StructBuilder,
24};
25use arrow_array::types::{Int32Type, Int64Type};
26use arrow_schema::{DataType, Field, Fields};
27use futures::{StreamExt, stream};
28
29use crate::Result;
30use crate::arrow::schema_to_arrow_schema;
31use crate::scan::ArrowRecordBatchStream;
32use crate::spec::{Datum, FieldSummary, ListType, NestedField, PrimitiveType, StructType, Type};
33use crate::table::Table;
34
/// Metadata table listing the manifest files referenced by the table's
/// current snapshot, mirroring Iceberg's `manifests` metadata table.
pub struct ManifestsTable<'a> {
    /// The table whose manifest list is inspected.
    table: &'a Table,
}
39
40impl<'a> ManifestsTable<'a> {
41 pub fn new(table: &'a Table) -> Self {
43 Self { table }
44 }
45
46 pub fn schema(&self) -> crate::spec::Schema {
48 let fields = vec![
49 NestedField::new(14, "content", Type::Primitive(PrimitiveType::Int), true),
50 NestedField::new(1, "path", Type::Primitive(PrimitiveType::String), true),
51 NestedField::new(2, "length", Type::Primitive(PrimitiveType::Long), true),
52 NestedField::new(
53 3,
54 "partition_spec_id",
55 Type::Primitive(PrimitiveType::Int),
56 true,
57 ),
58 NestedField::new(
59 4,
60 "added_snapshot_id",
61 Type::Primitive(PrimitiveType::Long),
62 true,
63 ),
64 NestedField::new(
65 5,
66 "added_data_files_count",
67 Type::Primitive(PrimitiveType::Int),
68 true,
69 ),
70 NestedField::new(
71 6,
72 "existing_data_files_count",
73 Type::Primitive(PrimitiveType::Int),
74 true,
75 ),
76 NestedField::new(
77 7,
78 "deleted_data_files_count",
79 Type::Primitive(PrimitiveType::Int),
80 true,
81 ),
82 NestedField::new(
83 15,
84 "added_delete_files_count",
85 Type::Primitive(PrimitiveType::Int),
86 true,
87 ),
88 NestedField::new(
89 16,
90 "existing_delete_files_count",
91 Type::Primitive(PrimitiveType::Int),
92 true,
93 ),
94 NestedField::new(
95 17,
96 "deleted_delete_files_count",
97 Type::Primitive(PrimitiveType::Int),
98 true,
99 ),
100 NestedField::new(
101 8,
102 "partition_summaries",
103 Type::List(ListType {
104 element_field: Arc::new(NestedField::new(
105 9,
106 "item",
107 Type::Struct(StructType::new(vec![
108 Arc::new(NestedField::new(
109 10,
110 "contains_null",
111 Type::Primitive(PrimitiveType::Boolean),
112 true,
113 )),
114 Arc::new(NestedField::new(
115 11,
116 "contains_nan",
117 Type::Primitive(PrimitiveType::Boolean),
118 false,
119 )),
120 Arc::new(NestedField::new(
121 12,
122 "lower_bound",
123 Type::Primitive(PrimitiveType::String),
124 false,
125 )),
126 Arc::new(NestedField::new(
127 13,
128 "upper_bound",
129 Type::Primitive(PrimitiveType::String),
130 false,
131 )),
132 ])),
133 true,
134 )),
135 }),
136 true,
137 ),
138 ];
139
140 crate::spec::Schema::builder()
141 .with_fields(fields.into_iter().map(|f| f.into()))
142 .build()
143 .unwrap()
144 }
145
146 pub async fn scan(&self) -> Result<ArrowRecordBatchStream> {
148 let schema = schema_to_arrow_schema(&self.schema())?;
149
150 let mut content = PrimitiveBuilder::<Int32Type>::new();
151 let mut path = StringBuilder::new();
152 let mut length = PrimitiveBuilder::<Int64Type>::new();
153 let mut partition_spec_id = PrimitiveBuilder::<Int32Type>::new();
154 let mut added_snapshot_id = PrimitiveBuilder::<Int64Type>::new();
155 let mut added_data_files_count = PrimitiveBuilder::<Int32Type>::new();
156 let mut existing_data_files_count = PrimitiveBuilder::<Int32Type>::new();
157 let mut deleted_data_files_count = PrimitiveBuilder::<Int32Type>::new();
158 let mut added_delete_files_count = PrimitiveBuilder::<Int32Type>::new();
159 let mut existing_delete_files_count = PrimitiveBuilder::<Int32Type>::new();
160 let mut deleted_delete_files_count = PrimitiveBuilder::<Int32Type>::new();
161 let mut partition_summaries = self.partition_summary_builder()?;
162
163 if let Some(snapshot) = self.table.metadata().current_snapshot() {
164 let manifest_list = snapshot
165 .load_manifest_list(self.table.file_io(), &self.table.metadata_ref())
166 .await?;
167 for manifest in manifest_list.entries() {
168 content.append_value(manifest.content as i32);
169 path.append_value(manifest.manifest_path.clone());
170 length.append_value(manifest.manifest_length);
171 partition_spec_id.append_value(manifest.partition_spec_id);
172 added_snapshot_id.append_value(manifest.added_snapshot_id);
173 added_data_files_count.append_value(manifest.added_files_count.unwrap_or(0) as i32);
174 existing_data_files_count
175 .append_value(manifest.existing_files_count.unwrap_or(0) as i32);
176 deleted_data_files_count
177 .append_value(manifest.deleted_files_count.unwrap_or(0) as i32);
178 added_delete_files_count
179 .append_value(manifest.added_files_count.unwrap_or(0) as i32);
180 existing_delete_files_count
181 .append_value(manifest.existing_files_count.unwrap_or(0) as i32);
182 deleted_delete_files_count
183 .append_value(manifest.deleted_files_count.unwrap_or(0) as i32);
184
185 let spec = self
186 .table
187 .metadata()
188 .partition_spec_by_id(manifest.partition_spec_id)
189 .unwrap();
190 let spec_struct = spec
191 .partition_type(self.table.metadata().current_schema())
192 .unwrap();
193 self.append_partition_summaries(
194 &mut partition_summaries,
195 &manifest.partitions.clone().unwrap_or_else(Vec::new),
196 spec_struct,
197 );
198 }
199 }
200
201 let batch = RecordBatch::try_new(Arc::new(schema), vec![
202 Arc::new(content.finish()),
203 Arc::new(path.finish()),
204 Arc::new(length.finish()),
205 Arc::new(partition_spec_id.finish()),
206 Arc::new(added_snapshot_id.finish()),
207 Arc::new(added_data_files_count.finish()),
208 Arc::new(existing_data_files_count.finish()),
209 Arc::new(deleted_data_files_count.finish()),
210 Arc::new(added_delete_files_count.finish()),
211 Arc::new(existing_delete_files_count.finish()),
212 Arc::new(deleted_delete_files_count.finish()),
213 Arc::new(partition_summaries.finish()),
214 ])?;
215 Ok(stream::iter(vec![Ok(batch)]).boxed())
216 }
217
218 fn partition_summary_builder(&self) -> Result<GenericListBuilder<i32, StructBuilder>> {
219 let schema = schema_to_arrow_schema(&self.schema())?;
220 let partition_summary_fields =
221 match schema.field_with_name("partition_summaries")?.data_type() {
222 DataType::List(list_type) => match list_type.data_type() {
223 DataType::Struct(fields) => fields.to_vec(),
224 _ => unreachable!(),
225 },
226 _ => unreachable!(),
227 };
228
229 let partition_summaries = ListBuilder::new(StructBuilder::from_fields(
230 Fields::from(partition_summary_fields.clone()),
231 0,
232 ))
233 .with_field(Arc::new(
234 Field::new_struct("item", partition_summary_fields, false).with_metadata(
235 HashMap::from([("PARQUET:field_id".to_string(), "9".to_string())]),
236 ),
237 ));
238
239 Ok(partition_summaries)
240 }
241
242 fn append_partition_summaries(
243 &self,
244 builder: &mut GenericListBuilder<i32, StructBuilder>,
245 partitions: &[FieldSummary],
246 partition_struct: StructType,
247 ) {
248 let partition_summaries_builder = builder.values();
249 for (summary, field) in partitions.iter().zip(partition_struct.fields()) {
250 partition_summaries_builder
251 .field_builder::<BooleanBuilder>(0)
252 .unwrap()
253 .append_value(summary.contains_null);
254 partition_summaries_builder
255 .field_builder::<BooleanBuilder>(1)
256 .unwrap()
257 .append_option(summary.contains_nan);
258
259 partition_summaries_builder
260 .field_builder::<StringBuilder>(2)
261 .unwrap()
262 .append_option(summary.lower_bound.as_ref().map(|v| {
263 Datum::try_from_bytes(v, field.field_type.as_primitive_type().unwrap().clone())
264 .unwrap()
265 .to_string()
266 }));
267 partition_summaries_builder
268 .field_builder::<StringBuilder>(3)
269 .unwrap()
270 .append_option(summary.upper_bound.as_ref().map(|v| {
271 Datum::try_from_bytes(v, field.field_type.as_primitive_type().unwrap().clone())
272 .unwrap()
273 .to_string()
274 }));
275 partition_summaries_builder.append(true);
276 }
277 builder.append(true);
278 }
279}
280
#[cfg(test)]
mod tests {
    use expect_test::expect;
    use futures::TryStreamExt;

    use crate::scan::tests::TableTestFixture;
    use crate::test_utils::check_record_batches;

    /// End-to-end snapshot test: sets up a fixture table with manifest
    /// files, scans the manifests metadata table, and checks both the arrow
    /// schema (field names, types, nullability, PARQUET field ids) and the
    /// column contents against recorded `expect!` snapshots.
    ///
    /// `path` and `length` values are environment-dependent and therefore
    /// skipped; rows are keyed by `path` for comparison.
    #[tokio::test]
    async fn test_manifests_table() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        let record_batch = fixture.table.inspect().manifests().scan().await.unwrap();

        check_record_batches(
            record_batch.try_collect::<Vec<_>>().await.unwrap(),
            expect![[r#"
                Field { name: "content", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "14"} },
                Field { name: "path", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1"} },
                Field { name: "length", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "2"} },
                Field { name: "partition_spec_id", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "3"} },
                Field { name: "added_snapshot_id", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "4"} },
                Field { name: "added_data_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "5"} },
                Field { name: "existing_data_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "6"} },
                Field { name: "deleted_data_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "7"} },
                Field { name: "added_delete_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "15"} },
                Field { name: "existing_delete_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "16"} },
                Field { name: "deleted_delete_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "17"} },
                Field { name: "partition_summaries", data_type: List(Field { name: "item", data_type: Struct([Field { name: "contains_null", data_type: Boolean, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "10"} }, Field { name: "contains_nan", data_type: Boolean, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "11"} }, Field { name: "lower_bound", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "12"} }, Field { name: "upper_bound", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "13"} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "9"} }), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "8"} }"#]],
            expect![[r#"
                content: PrimitiveArray<Int32>
                [
                  0,
                ],
                path: (skipped),
                length: (skipped),
                partition_spec_id: PrimitiveArray<Int32>
                [
                  0,
                ],
                added_snapshot_id: PrimitiveArray<Int64>
                [
                  3055729675574597004,
                ],
                added_data_files_count: PrimitiveArray<Int32>
                [
                  1,
                ],
                existing_data_files_count: PrimitiveArray<Int32>
                [
                  1,
                ],
                deleted_data_files_count: PrimitiveArray<Int32>
                [
                  1,
                ],
                added_delete_files_count: PrimitiveArray<Int32>
                [
                  1,
                ],
                existing_delete_files_count: PrimitiveArray<Int32>
                [
                  1,
                ],
                deleted_delete_files_count: PrimitiveArray<Int32>
                [
                  1,
                ],
                partition_summaries: ListArray
                [
                  StructArray
                -- validity:
                [
                  valid,
                ]
                [
                -- child 0: "contains_null" (Boolean)
                BooleanArray
                [
                  false,
                ]
                -- child 1: "contains_nan" (Boolean)
                BooleanArray
                [
                  false,
                ]
                -- child 2: "lower_bound" (Utf8)
                StringArray
                [
                  "100",
                ]
                -- child 3: "upper_bound" (Utf8)
                StringArray
                [
                  "300",
                ]
                ],
                ]"#]],
            &["path", "length"],
            Some("path"),
        );
    }
}