iceberg/inspect/
manifests.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::collections::HashMap;
19use std::sync::Arc;
20
21use arrow_array::RecordBatch;
22use arrow_array::builder::{
23    BooleanBuilder, GenericListBuilder, ListBuilder, PrimitiveBuilder, StringBuilder, StructBuilder,
24};
25use arrow_array::types::{Int32Type, Int64Type};
26use arrow_schema::{DataType, Field, Fields};
27use futures::{StreamExt, stream};
28
29use crate::Result;
30use crate::arrow::schema_to_arrow_schema;
31use crate::scan::ArrowRecordBatchStream;
32use crate::spec::{Datum, FieldSummary, ListType, NestedField, PrimitiveType, StructType, Type};
33use crate::table::Table;
34
/// Manifests table.
///
/// A metadata view over a [`Table`] that lists the manifest files referenced
/// by the manifest list of the table's current snapshot, one row per manifest.
/// The view only borrows the table, so it cannot outlive it.
pub struct ManifestsTable<'a> {
    /// The table whose metadata and manifest list are inspected.
    table: &'a Table,
}
39
40impl<'a> ManifestsTable<'a> {
41    /// Create a new Manifests table instance.
42    pub fn new(table: &'a Table) -> Self {
43        Self { table }
44    }
45
46    /// Returns the iceberg schema of the manifests table.
47    pub fn schema(&self) -> crate::spec::Schema {
48        let fields = vec![
49            NestedField::new(14, "content", Type::Primitive(PrimitiveType::Int), true),
50            NestedField::new(1, "path", Type::Primitive(PrimitiveType::String), true),
51            NestedField::new(2, "length", Type::Primitive(PrimitiveType::Long), true),
52            NestedField::new(
53                3,
54                "partition_spec_id",
55                Type::Primitive(PrimitiveType::Int),
56                true,
57            ),
58            NestedField::new(
59                4,
60                "added_snapshot_id",
61                Type::Primitive(PrimitiveType::Long),
62                true,
63            ),
64            NestedField::new(
65                5,
66                "added_data_files_count",
67                Type::Primitive(PrimitiveType::Int),
68                true,
69            ),
70            NestedField::new(
71                6,
72                "existing_data_files_count",
73                Type::Primitive(PrimitiveType::Int),
74                true,
75            ),
76            NestedField::new(
77                7,
78                "deleted_data_files_count",
79                Type::Primitive(PrimitiveType::Int),
80                true,
81            ),
82            NestedField::new(
83                15,
84                "added_delete_files_count",
85                Type::Primitive(PrimitiveType::Int),
86                true,
87            ),
88            NestedField::new(
89                16,
90                "existing_delete_files_count",
91                Type::Primitive(PrimitiveType::Int),
92                true,
93            ),
94            NestedField::new(
95                17,
96                "deleted_delete_files_count",
97                Type::Primitive(PrimitiveType::Int),
98                true,
99            ),
100            NestedField::new(
101                8,
102                "partition_summaries",
103                Type::List(ListType {
104                    element_field: Arc::new(NestedField::new(
105                        9,
106                        "item",
107                        Type::Struct(StructType::new(vec![
108                            Arc::new(NestedField::new(
109                                10,
110                                "contains_null",
111                                Type::Primitive(PrimitiveType::Boolean),
112                                true,
113                            )),
114                            Arc::new(NestedField::new(
115                                11,
116                                "contains_nan",
117                                Type::Primitive(PrimitiveType::Boolean),
118                                false,
119                            )),
120                            Arc::new(NestedField::new(
121                                12,
122                                "lower_bound",
123                                Type::Primitive(PrimitiveType::String),
124                                false,
125                            )),
126                            Arc::new(NestedField::new(
127                                13,
128                                "upper_bound",
129                                Type::Primitive(PrimitiveType::String),
130                                false,
131                            )),
132                        ])),
133                        true,
134                    )),
135                }),
136                true,
137            ),
138        ];
139
140        crate::spec::Schema::builder()
141            .with_fields(fields.into_iter().map(|f| f.into()))
142            .build()
143            .unwrap()
144    }
145
146    /// Scans the manifests table.
147    pub async fn scan(&self) -> Result<ArrowRecordBatchStream> {
148        let schema = schema_to_arrow_schema(&self.schema())?;
149
150        let mut content = PrimitiveBuilder::<Int32Type>::new();
151        let mut path = StringBuilder::new();
152        let mut length = PrimitiveBuilder::<Int64Type>::new();
153        let mut partition_spec_id = PrimitiveBuilder::<Int32Type>::new();
154        let mut added_snapshot_id = PrimitiveBuilder::<Int64Type>::new();
155        let mut added_data_files_count = PrimitiveBuilder::<Int32Type>::new();
156        let mut existing_data_files_count = PrimitiveBuilder::<Int32Type>::new();
157        let mut deleted_data_files_count = PrimitiveBuilder::<Int32Type>::new();
158        let mut added_delete_files_count = PrimitiveBuilder::<Int32Type>::new();
159        let mut existing_delete_files_count = PrimitiveBuilder::<Int32Type>::new();
160        let mut deleted_delete_files_count = PrimitiveBuilder::<Int32Type>::new();
161        let mut partition_summaries = self.partition_summary_builder()?;
162
163        if let Some(snapshot) = self.table.metadata().current_snapshot() {
164            let manifest_list = self.table.manifest_list_reader(snapshot).load().await?;
165            for manifest in manifest_list.entries() {
166                content.append_value(manifest.content as i32);
167                path.append_value(manifest.manifest_path.clone());
168                length.append_value(manifest.manifest_length);
169                partition_spec_id.append_value(manifest.partition_spec_id);
170                added_snapshot_id.append_value(manifest.added_snapshot_id);
171                added_data_files_count.append_value(manifest.added_files_count.unwrap_or(0) as i32);
172                existing_data_files_count
173                    .append_value(manifest.existing_files_count.unwrap_or(0) as i32);
174                deleted_data_files_count
175                    .append_value(manifest.deleted_files_count.unwrap_or(0) as i32);
176                added_delete_files_count
177                    .append_value(manifest.added_files_count.unwrap_or(0) as i32);
178                existing_delete_files_count
179                    .append_value(manifest.existing_files_count.unwrap_or(0) as i32);
180                deleted_delete_files_count
181                    .append_value(manifest.deleted_files_count.unwrap_or(0) as i32);
182
183                let spec = self
184                    .table
185                    .metadata()
186                    .partition_spec_by_id(manifest.partition_spec_id)
187                    .unwrap();
188                let spec_struct = spec
189                    .partition_type(self.table.metadata().current_schema())
190                    .unwrap();
191                self.append_partition_summaries(
192                    &mut partition_summaries,
193                    &manifest.partitions.clone().unwrap_or_else(Vec::new),
194                    spec_struct,
195                );
196            }
197        }
198
199        let batch = RecordBatch::try_new(Arc::new(schema), vec![
200            Arc::new(content.finish()),
201            Arc::new(path.finish()),
202            Arc::new(length.finish()),
203            Arc::new(partition_spec_id.finish()),
204            Arc::new(added_snapshot_id.finish()),
205            Arc::new(added_data_files_count.finish()),
206            Arc::new(existing_data_files_count.finish()),
207            Arc::new(deleted_data_files_count.finish()),
208            Arc::new(added_delete_files_count.finish()),
209            Arc::new(existing_delete_files_count.finish()),
210            Arc::new(deleted_delete_files_count.finish()),
211            Arc::new(partition_summaries.finish()),
212        ])?;
213        Ok(stream::iter(vec![Ok(batch)]).boxed())
214    }
215
216    fn partition_summary_builder(&self) -> Result<GenericListBuilder<i32, StructBuilder>> {
217        let schema = schema_to_arrow_schema(&self.schema())?;
218        let partition_summary_fields =
219            match schema.field_with_name("partition_summaries")?.data_type() {
220                DataType::List(list_type) => match list_type.data_type() {
221                    DataType::Struct(fields) => fields.to_vec(),
222                    _ => unreachable!(),
223                },
224                _ => unreachable!(),
225            };
226
227        let partition_summaries = ListBuilder::new(StructBuilder::from_fields(
228            Fields::from(partition_summary_fields.clone()),
229            0,
230        ))
231        .with_field(Arc::new(
232            Field::new_struct("item", partition_summary_fields, false).with_metadata(
233                HashMap::from([("PARQUET:field_id".to_string(), "9".to_string())]),
234            ),
235        ));
236
237        Ok(partition_summaries)
238    }
239
240    fn append_partition_summaries(
241        &self,
242        builder: &mut GenericListBuilder<i32, StructBuilder>,
243        partitions: &[FieldSummary],
244        partition_struct: StructType,
245    ) {
246        let partition_summaries_builder = builder.values();
247        for (summary, field) in partitions.iter().zip(partition_struct.fields()) {
248            partition_summaries_builder
249                .field_builder::<BooleanBuilder>(0)
250                .unwrap()
251                .append_value(summary.contains_null);
252            partition_summaries_builder
253                .field_builder::<BooleanBuilder>(1)
254                .unwrap()
255                .append_option(summary.contains_nan);
256
257            partition_summaries_builder
258                .field_builder::<StringBuilder>(2)
259                .unwrap()
260                .append_option(summary.lower_bound.as_ref().map(|v| {
261                    Datum::try_from_bytes(v, field.field_type.as_primitive_type().unwrap().clone())
262                        .unwrap()
263                        .to_string()
264                }));
265            partition_summaries_builder
266                .field_builder::<StringBuilder>(3)
267                .unwrap()
268                .append_option(summary.upper_bound.as_ref().map(|v| {
269                    Datum::try_from_bytes(v, field.field_type.as_primitive_type().unwrap().clone())
270                        .unwrap()
271                        .to_string()
272                }));
273            partition_summaries_builder.append(true);
274        }
275        builder.append(true);
276    }
277}
278
279#[cfg(test)]
280mod tests {
281    use expect_test::expect;
282    use futures::TryStreamExt;
283
284    use crate::scan::tests::TableTestFixture;
285    use crate::test_utils::check_record_batches;
286
287    #[tokio::test]
288    async fn test_manifests_table() {
289        let mut fixture = TableTestFixture::new();
290        fixture.setup_manifest_files().await;
291
292        let record_batch = fixture.table.inspect().manifests().scan().await.unwrap();
293
294        check_record_batches(
295            record_batch.try_collect::<Vec<_>>().await.unwrap(),
296            expect![[r#"
297                Field { "content": Int32, metadata: {"PARQUET:field_id": "14"} },
298                Field { "path": Utf8, metadata: {"PARQUET:field_id": "1"} },
299                Field { "length": Int64, metadata: {"PARQUET:field_id": "2"} },
300                Field { "partition_spec_id": Int32, metadata: {"PARQUET:field_id": "3"} },
301                Field { "added_snapshot_id": Int64, metadata: {"PARQUET:field_id": "4"} },
302                Field { "added_data_files_count": Int32, metadata: {"PARQUET:field_id": "5"} },
303                Field { "existing_data_files_count": Int32, metadata: {"PARQUET:field_id": "6"} },
304                Field { "deleted_data_files_count": Int32, metadata: {"PARQUET:field_id": "7"} },
305                Field { "added_delete_files_count": Int32, metadata: {"PARQUET:field_id": "15"} },
306                Field { "existing_delete_files_count": Int32, metadata: {"PARQUET:field_id": "16"} },
307                Field { "deleted_delete_files_count": Int32, metadata: {"PARQUET:field_id": "17"} },
308                Field { "partition_summaries": List(non-null Struct("contains_null": non-null Boolean, metadata: {"PARQUET:field_id": "10"}, "contains_nan": Boolean, metadata: {"PARQUET:field_id": "11"}, "lower_bound": Utf8, metadata: {"PARQUET:field_id": "12"}, "upper_bound": Utf8, metadata: {"PARQUET:field_id": "13"}), metadata: {"PARQUET:field_id": "9"}), metadata: {"PARQUET:field_id": "8"} }"#]],
309            expect![[r#"
310                content: PrimitiveArray<Int32>
311                [
312                  0,
313                ],
314                path: (skipped),
315                length: (skipped),
316                partition_spec_id: PrimitiveArray<Int32>
317                [
318                  0,
319                ],
320                added_snapshot_id: PrimitiveArray<Int64>
321                [
322                  3055729675574597004,
323                ],
324                added_data_files_count: PrimitiveArray<Int32>
325                [
326                  1,
327                ],
328                existing_data_files_count: PrimitiveArray<Int32>
329                [
330                  1,
331                ],
332                deleted_data_files_count: PrimitiveArray<Int32>
333                [
334                  1,
335                ],
336                added_delete_files_count: PrimitiveArray<Int32>
337                [
338                  1,
339                ],
340                existing_delete_files_count: PrimitiveArray<Int32>
341                [
342                  1,
343                ],
344                deleted_delete_files_count: PrimitiveArray<Int32>
345                [
346                  1,
347                ],
348                partition_summaries: ListArray
349                [
350                  StructArray
351                -- validity:
352                [
353                  valid,
354                ]
355                [
356                -- child 0: "contains_null" (Boolean)
357                BooleanArray
358                [
359                  false,
360                ]
361                -- child 1: "contains_nan" (Boolean)
362                BooleanArray
363                [
364                  false,
365                ]
366                -- child 2: "lower_bound" (Utf8)
367                StringArray
368                [
369                  "100",
370                ]
371                -- child 3: "upper_bound" (Utf8)
372                StringArray
373                [
374                  "300",
375                ]
376                ],
377                ]"#]],
378            &["path", "length"],
379            Some("path"),
380        );
381    }
382}