use std::collections::HashMap;
use std::sync::Arc;

use arrow_array::RecordBatch;
use arrow_array::builder::{
    BooleanBuilder, GenericListBuilder, ListBuilder, PrimitiveBuilder, StringBuilder, StructBuilder,
};
use arrow_array::types::{Int32Type, Int64Type};
use arrow_schema::{DataType, Field, Fields};
use futures::{StreamExt, stream};

use crate::arrow::schema_to_arrow_schema;
use crate::scan::ArrowRecordBatchStream;
use crate::spec::{Datum, FieldSummary, ListType, NestedField, PrimitiveType, StructType, Type};
use crate::table::Table;
use crate::{Error, ErrorKind, Result};
34
35pub struct ManifestsTable<'a> {
37 table: &'a Table,
38}
39
40impl<'a> ManifestsTable<'a> {
41 pub fn new(table: &'a Table) -> Self {
43 Self { table }
44 }
45
46 pub fn schema(&self) -> crate::spec::Schema {
48 let fields = vec![
49 NestedField::new(14, "content", Type::Primitive(PrimitiveType::Int), true),
50 NestedField::new(1, "path", Type::Primitive(PrimitiveType::String), true),
51 NestedField::new(2, "length", Type::Primitive(PrimitiveType::Long), true),
52 NestedField::new(
53 3,
54 "partition_spec_id",
55 Type::Primitive(PrimitiveType::Int),
56 true,
57 ),
58 NestedField::new(
59 4,
60 "added_snapshot_id",
61 Type::Primitive(PrimitiveType::Long),
62 true,
63 ),
64 NestedField::new(
65 5,
66 "added_data_files_count",
67 Type::Primitive(PrimitiveType::Int),
68 true,
69 ),
70 NestedField::new(
71 6,
72 "existing_data_files_count",
73 Type::Primitive(PrimitiveType::Int),
74 true,
75 ),
76 NestedField::new(
77 7,
78 "deleted_data_files_count",
79 Type::Primitive(PrimitiveType::Int),
80 true,
81 ),
82 NestedField::new(
83 15,
84 "added_delete_files_count",
85 Type::Primitive(PrimitiveType::Int),
86 true,
87 ),
88 NestedField::new(
89 16,
90 "existing_delete_files_count",
91 Type::Primitive(PrimitiveType::Int),
92 true,
93 ),
94 NestedField::new(
95 17,
96 "deleted_delete_files_count",
97 Type::Primitive(PrimitiveType::Int),
98 true,
99 ),
100 NestedField::new(
101 8,
102 "partition_summaries",
103 Type::List(ListType {
104 element_field: Arc::new(NestedField::new(
105 9,
106 "item",
107 Type::Struct(StructType::new(vec![
108 Arc::new(NestedField::new(
109 10,
110 "contains_null",
111 Type::Primitive(PrimitiveType::Boolean),
112 true,
113 )),
114 Arc::new(NestedField::new(
115 11,
116 "contains_nan",
117 Type::Primitive(PrimitiveType::Boolean),
118 false,
119 )),
120 Arc::new(NestedField::new(
121 12,
122 "lower_bound",
123 Type::Primitive(PrimitiveType::String),
124 false,
125 )),
126 Arc::new(NestedField::new(
127 13,
128 "upper_bound",
129 Type::Primitive(PrimitiveType::String),
130 false,
131 )),
132 ])),
133 true,
134 )),
135 }),
136 true,
137 ),
138 ];
139
140 crate::spec::Schema::builder()
141 .with_fields(fields.into_iter().map(|f| f.into()))
142 .build()
143 .unwrap()
144 }
145
146 pub async fn scan(&self) -> Result<ArrowRecordBatchStream> {
148 let schema = schema_to_arrow_schema(&self.schema())?;
149
150 let mut content = PrimitiveBuilder::<Int32Type>::new();
151 let mut path = StringBuilder::new();
152 let mut length = PrimitiveBuilder::<Int64Type>::new();
153 let mut partition_spec_id = PrimitiveBuilder::<Int32Type>::new();
154 let mut added_snapshot_id = PrimitiveBuilder::<Int64Type>::new();
155 let mut added_data_files_count = PrimitiveBuilder::<Int32Type>::new();
156 let mut existing_data_files_count = PrimitiveBuilder::<Int32Type>::new();
157 let mut deleted_data_files_count = PrimitiveBuilder::<Int32Type>::new();
158 let mut added_delete_files_count = PrimitiveBuilder::<Int32Type>::new();
159 let mut existing_delete_files_count = PrimitiveBuilder::<Int32Type>::new();
160 let mut deleted_delete_files_count = PrimitiveBuilder::<Int32Type>::new();
161 let mut partition_summaries = self.partition_summary_builder()?;
162
163 if let Some(snapshot) = self.table.metadata().current_snapshot() {
164 let manifest_list = self.table.manifest_list_reader(snapshot).load().await?;
165 for manifest in manifest_list.entries() {
166 content.append_value(manifest.content as i32);
167 path.append_value(manifest.manifest_path.clone());
168 length.append_value(manifest.manifest_length);
169 partition_spec_id.append_value(manifest.partition_spec_id);
170 added_snapshot_id.append_value(manifest.added_snapshot_id);
171 added_data_files_count.append_value(manifest.added_files_count.unwrap_or(0) as i32);
172 existing_data_files_count
173 .append_value(manifest.existing_files_count.unwrap_or(0) as i32);
174 deleted_data_files_count
175 .append_value(manifest.deleted_files_count.unwrap_or(0) as i32);
176 added_delete_files_count
177 .append_value(manifest.added_files_count.unwrap_or(0) as i32);
178 existing_delete_files_count
179 .append_value(manifest.existing_files_count.unwrap_or(0) as i32);
180 deleted_delete_files_count
181 .append_value(manifest.deleted_files_count.unwrap_or(0) as i32);
182
183 let spec = self
184 .table
185 .metadata()
186 .partition_spec_by_id(manifest.partition_spec_id)
187 .unwrap();
188 let spec_struct = spec
189 .partition_type(self.table.metadata().current_schema())
190 .unwrap();
191 self.append_partition_summaries(
192 &mut partition_summaries,
193 &manifest.partitions.clone().unwrap_or_else(Vec::new),
194 spec_struct,
195 );
196 }
197 }
198
199 let batch = RecordBatch::try_new(Arc::new(schema), vec![
200 Arc::new(content.finish()),
201 Arc::new(path.finish()),
202 Arc::new(length.finish()),
203 Arc::new(partition_spec_id.finish()),
204 Arc::new(added_snapshot_id.finish()),
205 Arc::new(added_data_files_count.finish()),
206 Arc::new(existing_data_files_count.finish()),
207 Arc::new(deleted_data_files_count.finish()),
208 Arc::new(added_delete_files_count.finish()),
209 Arc::new(existing_delete_files_count.finish()),
210 Arc::new(deleted_delete_files_count.finish()),
211 Arc::new(partition_summaries.finish()),
212 ])?;
213 Ok(stream::iter(vec![Ok(batch)]).boxed())
214 }
215
216 fn partition_summary_builder(&self) -> Result<GenericListBuilder<i32, StructBuilder>> {
217 let schema = schema_to_arrow_schema(&self.schema())?;
218 let partition_summary_fields =
219 match schema.field_with_name("partition_summaries")?.data_type() {
220 DataType::List(list_type) => match list_type.data_type() {
221 DataType::Struct(fields) => fields.to_vec(),
222 _ => unreachable!(),
223 },
224 _ => unreachable!(),
225 };
226
227 let partition_summaries = ListBuilder::new(StructBuilder::from_fields(
228 Fields::from(partition_summary_fields.clone()),
229 0,
230 ))
231 .with_field(Arc::new(
232 Field::new_struct("item", partition_summary_fields, false).with_metadata(
233 HashMap::from([("PARQUET:field_id".to_string(), "9".to_string())]),
234 ),
235 ));
236
237 Ok(partition_summaries)
238 }
239
240 fn append_partition_summaries(
241 &self,
242 builder: &mut GenericListBuilder<i32, StructBuilder>,
243 partitions: &[FieldSummary],
244 partition_struct: StructType,
245 ) {
246 let partition_summaries_builder = builder.values();
247 for (summary, field) in partitions.iter().zip(partition_struct.fields()) {
248 partition_summaries_builder
249 .field_builder::<BooleanBuilder>(0)
250 .unwrap()
251 .append_value(summary.contains_null);
252 partition_summaries_builder
253 .field_builder::<BooleanBuilder>(1)
254 .unwrap()
255 .append_option(summary.contains_nan);
256
257 partition_summaries_builder
258 .field_builder::<StringBuilder>(2)
259 .unwrap()
260 .append_option(summary.lower_bound.as_ref().map(|v| {
261 Datum::try_from_bytes(v, field.field_type.as_primitive_type().unwrap().clone())
262 .unwrap()
263 .to_string()
264 }));
265 partition_summaries_builder
266 .field_builder::<StringBuilder>(3)
267 .unwrap()
268 .append_option(summary.upper_bound.as_ref().map(|v| {
269 Datum::try_from_bytes(v, field.field_type.as_primitive_type().unwrap().clone())
270 .unwrap()
271 .to_string()
272 }));
273 partition_summaries_builder.append(true);
274 }
275 builder.append(true);
276 }
277}
278
#[cfg(test)]
mod tests {
    use expect_test::expect;
    use futures::TryStreamExt;

    use crate::scan::tests::TableTestFixture;
    use crate::test_utils::check_record_batches;

    /// Scans the manifests table of the fixture table and compares the Arrow
    /// schema and the column contents against inline snapshots.
    ///
    /// The `path` and `length` columns are excluded from the data comparison
    /// (they are listed in the skip slice passed to `check_record_batches`).
    #[tokio::test]
    async fn test_manifests_table() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        let record_batch = fixture.table.inspect().manifests().scan().await.unwrap();

        // NOTE(review): the nested indentation inside the data snapshot was
        // restored to match Arrow's `Debug` layout (two spaces before each
        // element). If it does not match, regenerate both snapshots with
        // `UPDATE_EXPECT=1 cargo test`.
        check_record_batches(
            record_batch.try_collect::<Vec<_>>().await.unwrap(),
            expect![[r#"
                Field { "content": Int32, metadata: {"PARQUET:field_id": "14"} },
                Field { "path": Utf8, metadata: {"PARQUET:field_id": "1"} },
                Field { "length": Int64, metadata: {"PARQUET:field_id": "2"} },
                Field { "partition_spec_id": Int32, metadata: {"PARQUET:field_id": "3"} },
                Field { "added_snapshot_id": Int64, metadata: {"PARQUET:field_id": "4"} },
                Field { "added_data_files_count": Int32, metadata: {"PARQUET:field_id": "5"} },
                Field { "existing_data_files_count": Int32, metadata: {"PARQUET:field_id": "6"} },
                Field { "deleted_data_files_count": Int32, metadata: {"PARQUET:field_id": "7"} },
                Field { "added_delete_files_count": Int32, metadata: {"PARQUET:field_id": "15"} },
                Field { "existing_delete_files_count": Int32, metadata: {"PARQUET:field_id": "16"} },
                Field { "deleted_delete_files_count": Int32, metadata: {"PARQUET:field_id": "17"} },
                Field { "partition_summaries": List(non-null Struct("contains_null": non-null Boolean, metadata: {"PARQUET:field_id": "10"}, "contains_nan": Boolean, metadata: {"PARQUET:field_id": "11"}, "lower_bound": Utf8, metadata: {"PARQUET:field_id": "12"}, "upper_bound": Utf8, metadata: {"PARQUET:field_id": "13"}), metadata: {"PARQUET:field_id": "9"}), metadata: {"PARQUET:field_id": "8"} }"#]],
            expect![[r#"
                content: PrimitiveArray<Int32>
                [
                  0,
                ],
                path: (skipped),
                length: (skipped),
                partition_spec_id: PrimitiveArray<Int32>
                [
                  0,
                ],
                added_snapshot_id: PrimitiveArray<Int64>
                [
                  3055729675574597004,
                ],
                added_data_files_count: PrimitiveArray<Int32>
                [
                  1,
                ],
                existing_data_files_count: PrimitiveArray<Int32>
                [
                  1,
                ],
                deleted_data_files_count: PrimitiveArray<Int32>
                [
                  1,
                ],
                added_delete_files_count: PrimitiveArray<Int32>
                [
                  1,
                ],
                existing_delete_files_count: PrimitiveArray<Int32>
                [
                  1,
                ],
                deleted_delete_files_count: PrimitiveArray<Int32>
                [
                  1,
                ],
                partition_summaries: ListArray
                [
                  StructArray
                -- validity:
                [
                  valid,
                ]
                [
                -- child 0: "contains_null" (Boolean)
                BooleanArray
                [
                  false,
                ]
                -- child 1: "contains_nan" (Boolean)
                BooleanArray
                [
                  false,
                ]
                -- child 2: "lower_bound" (Utf8)
                StringArray
                [
                  "100",
                ]
                -- child 3: "upper_bound" (Utf8)
                StringArray
                [
                  "300",
                ]
                ],
                ]"#]],
            &["path", "length"],
            Some("path"),
        );
    }
}