iceberg/inspect/manifests.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
use std::collections::HashMap;
use std::sync::Arc;

use arrow_array::RecordBatch;
use arrow_array::builder::{
    BooleanBuilder, GenericListBuilder, ListBuilder, PrimitiveBuilder, StringBuilder, StructBuilder,
};
use arrow_array::types::{Int32Type, Int64Type};
use arrow_schema::{DataType, Field, Fields};
use futures::{StreamExt, stream};

use crate::{Error, ErrorKind, Result};
use crate::arrow::schema_to_arrow_schema;
use crate::scan::ArrowRecordBatchStream;
use crate::spec::{
    Datum, FieldSummary, ListType, ManifestContentType, NestedField, PrimitiveType, StructType,
    Type,
};
use crate::table::Table;
34
35/// Manifests table.
36pub struct ManifestsTable<'a> {
37    table: &'a Table,
38}
39
40impl<'a> ManifestsTable<'a> {
41    /// Create a new Manifests table instance.
42    pub fn new(table: &'a Table) -> Self {
43        Self { table }
44    }
45
46    /// Returns the iceberg schema of the manifests table.
47    pub fn schema(&self) -> crate::spec::Schema {
48        let fields = vec![
49            NestedField::new(14, "content", Type::Primitive(PrimitiveType::Int), true),
50            NestedField::new(1, "path", Type::Primitive(PrimitiveType::String), true),
51            NestedField::new(2, "length", Type::Primitive(PrimitiveType::Long), true),
52            NestedField::new(
53                3,
54                "partition_spec_id",
55                Type::Primitive(PrimitiveType::Int),
56                true,
57            ),
58            NestedField::new(
59                4,
60                "added_snapshot_id",
61                Type::Primitive(PrimitiveType::Long),
62                true,
63            ),
64            NestedField::new(
65                5,
66                "added_data_files_count",
67                Type::Primitive(PrimitiveType::Int),
68                true,
69            ),
70            NestedField::new(
71                6,
72                "existing_data_files_count",
73                Type::Primitive(PrimitiveType::Int),
74                true,
75            ),
76            NestedField::new(
77                7,
78                "deleted_data_files_count",
79                Type::Primitive(PrimitiveType::Int),
80                true,
81            ),
82            NestedField::new(
83                15,
84                "added_delete_files_count",
85                Type::Primitive(PrimitiveType::Int),
86                true,
87            ),
88            NestedField::new(
89                16,
90                "existing_delete_files_count",
91                Type::Primitive(PrimitiveType::Int),
92                true,
93            ),
94            NestedField::new(
95                17,
96                "deleted_delete_files_count",
97                Type::Primitive(PrimitiveType::Int),
98                true,
99            ),
100            NestedField::new(
101                8,
102                "partition_summaries",
103                Type::List(ListType {
104                    element_field: Arc::new(NestedField::new(
105                        9,
106                        "item",
107                        Type::Struct(StructType::new(vec![
108                            Arc::new(NestedField::new(
109                                10,
110                                "contains_null",
111                                Type::Primitive(PrimitiveType::Boolean),
112                                true,
113                            )),
114                            Arc::new(NestedField::new(
115                                11,
116                                "contains_nan",
117                                Type::Primitive(PrimitiveType::Boolean),
118                                false,
119                            )),
120                            Arc::new(NestedField::new(
121                                12,
122                                "lower_bound",
123                                Type::Primitive(PrimitiveType::String),
124                                false,
125                            )),
126                            Arc::new(NestedField::new(
127                                13,
128                                "upper_bound",
129                                Type::Primitive(PrimitiveType::String),
130                                false,
131                            )),
132                        ])),
133                        true,
134                    )),
135                }),
136                true,
137            ),
138        ];
139
140        crate::spec::Schema::builder()
141            .with_fields(fields.into_iter().map(|f| f.into()))
142            .build()
143            .unwrap()
144    }
145
146    /// Scans the manifests table.
147    pub async fn scan(&self) -> Result<ArrowRecordBatchStream> {
148        let schema = schema_to_arrow_schema(&self.schema())?;
149
150        let mut content = PrimitiveBuilder::<Int32Type>::new();
151        let mut path = StringBuilder::new();
152        let mut length = PrimitiveBuilder::<Int64Type>::new();
153        let mut partition_spec_id = PrimitiveBuilder::<Int32Type>::new();
154        let mut added_snapshot_id = PrimitiveBuilder::<Int64Type>::new();
155        let mut added_data_files_count = PrimitiveBuilder::<Int32Type>::new();
156        let mut existing_data_files_count = PrimitiveBuilder::<Int32Type>::new();
157        let mut deleted_data_files_count = PrimitiveBuilder::<Int32Type>::new();
158        let mut added_delete_files_count = PrimitiveBuilder::<Int32Type>::new();
159        let mut existing_delete_files_count = PrimitiveBuilder::<Int32Type>::new();
160        let mut deleted_delete_files_count = PrimitiveBuilder::<Int32Type>::new();
161        let mut partition_summaries = self.partition_summary_builder()?;
162
163        if let Some(snapshot) = self.table.metadata().current_snapshot() {
164            let manifest_list = snapshot
165                .load_manifest_list(self.table.file_io(), &self.table.metadata_ref())
166                .await?;
167            for manifest in manifest_list.entries() {
168                content.append_value(manifest.content as i32);
169                path.append_value(manifest.manifest_path.clone());
170                length.append_value(manifest.manifest_length);
171                partition_spec_id.append_value(manifest.partition_spec_id);
172                added_snapshot_id.append_value(manifest.added_snapshot_id);
173                added_data_files_count.append_value(manifest.added_files_count.unwrap_or(0) as i32);
174                existing_data_files_count
175                    .append_value(manifest.existing_files_count.unwrap_or(0) as i32);
176                deleted_data_files_count
177                    .append_value(manifest.deleted_files_count.unwrap_or(0) as i32);
178                added_delete_files_count
179                    .append_value(manifest.added_files_count.unwrap_or(0) as i32);
180                existing_delete_files_count
181                    .append_value(manifest.existing_files_count.unwrap_or(0) as i32);
182                deleted_delete_files_count
183                    .append_value(manifest.deleted_files_count.unwrap_or(0) as i32);
184
185                let spec = self
186                    .table
187                    .metadata()
188                    .partition_spec_by_id(manifest.partition_spec_id)
189                    .unwrap();
190                let spec_struct = spec
191                    .partition_type(self.table.metadata().current_schema())
192                    .unwrap();
193                self.append_partition_summaries(
194                    &mut partition_summaries,
195                    &manifest.partitions.clone().unwrap_or_else(Vec::new),
196                    spec_struct,
197                );
198            }
199        }
200
201        let batch = RecordBatch::try_new(Arc::new(schema), vec![
202            Arc::new(content.finish()),
203            Arc::new(path.finish()),
204            Arc::new(length.finish()),
205            Arc::new(partition_spec_id.finish()),
206            Arc::new(added_snapshot_id.finish()),
207            Arc::new(added_data_files_count.finish()),
208            Arc::new(existing_data_files_count.finish()),
209            Arc::new(deleted_data_files_count.finish()),
210            Arc::new(added_delete_files_count.finish()),
211            Arc::new(existing_delete_files_count.finish()),
212            Arc::new(deleted_delete_files_count.finish()),
213            Arc::new(partition_summaries.finish()),
214        ])?;
215        Ok(stream::iter(vec![Ok(batch)]).boxed())
216    }
217
218    fn partition_summary_builder(&self) -> Result<GenericListBuilder<i32, StructBuilder>> {
219        let schema = schema_to_arrow_schema(&self.schema())?;
220        let partition_summary_fields =
221            match schema.field_with_name("partition_summaries")?.data_type() {
222                DataType::List(list_type) => match list_type.data_type() {
223                    DataType::Struct(fields) => fields.to_vec(),
224                    _ => unreachable!(),
225                },
226                _ => unreachable!(),
227            };
228
229        let partition_summaries = ListBuilder::new(StructBuilder::from_fields(
230            Fields::from(partition_summary_fields.clone()),
231            0,
232        ))
233        .with_field(Arc::new(
234            Field::new_struct("item", partition_summary_fields, false).with_metadata(
235                HashMap::from([("PARQUET:field_id".to_string(), "9".to_string())]),
236            ),
237        ));
238
239        Ok(partition_summaries)
240    }
241
242    fn append_partition_summaries(
243        &self,
244        builder: &mut GenericListBuilder<i32, StructBuilder>,
245        partitions: &[FieldSummary],
246        partition_struct: StructType,
247    ) {
248        let partition_summaries_builder = builder.values();
249        for (summary, field) in partitions.iter().zip(partition_struct.fields()) {
250            partition_summaries_builder
251                .field_builder::<BooleanBuilder>(0)
252                .unwrap()
253                .append_value(summary.contains_null);
254            partition_summaries_builder
255                .field_builder::<BooleanBuilder>(1)
256                .unwrap()
257                .append_option(summary.contains_nan);
258
259            partition_summaries_builder
260                .field_builder::<StringBuilder>(2)
261                .unwrap()
262                .append_option(summary.lower_bound.as_ref().map(|v| {
263                    Datum::try_from_bytes(v, field.field_type.as_primitive_type().unwrap().clone())
264                        .unwrap()
265                        .to_string()
266                }));
267            partition_summaries_builder
268                .field_builder::<StringBuilder>(3)
269                .unwrap()
270                .append_option(summary.upper_bound.as_ref().map(|v| {
271                    Datum::try_from_bytes(v, field.field_type.as_primitive_type().unwrap().clone())
272                        .unwrap()
273                        .to_string()
274                }));
275            partition_summaries_builder.append(true);
276        }
277        builder.append(true);
278    }
279}
280
281#[cfg(test)]
282mod tests {
283    use expect_test::expect;
284    use futures::TryStreamExt;
285
286    use crate::scan::tests::TableTestFixture;
287    use crate::test_utils::check_record_batches;
288
289    #[tokio::test]
290    async fn test_manifests_table() {
291        let mut fixture = TableTestFixture::new();
292        fixture.setup_manifest_files().await;
293
294        let record_batch = fixture.table.inspect().manifests().scan().await.unwrap();
295
296        check_record_batches(
297            record_batch.try_collect::<Vec<_>>().await.unwrap(),
298            expect![[r#"
299                Field { name: "content", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "14"} },
300                Field { name: "path", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1"} },
301                Field { name: "length", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "2"} },
302                Field { name: "partition_spec_id", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "3"} },
303                Field { name: "added_snapshot_id", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "4"} },
304                Field { name: "added_data_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "5"} },
305                Field { name: "existing_data_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "6"} },
306                Field { name: "deleted_data_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "7"} },
307                Field { name: "added_delete_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "15"} },
308                Field { name: "existing_delete_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "16"} },
309                Field { name: "deleted_delete_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "17"} },
310                Field { name: "partition_summaries", data_type: List(Field { name: "item", data_type: Struct([Field { name: "contains_null", data_type: Boolean, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "10"} }, Field { name: "contains_nan", data_type: Boolean, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "11"} }, Field { name: "lower_bound", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "12"} }, Field { name: "upper_bound", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "13"} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "9"} }), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "8"} }"#]],
311            expect![[r#"
312                content: PrimitiveArray<Int32>
313                [
314                  0,
315                ],
316                path: (skipped),
317                length: (skipped),
318                partition_spec_id: PrimitiveArray<Int32>
319                [
320                  0,
321                ],
322                added_snapshot_id: PrimitiveArray<Int64>
323                [
324                  3055729675574597004,
325                ],
326                added_data_files_count: PrimitiveArray<Int32>
327                [
328                  1,
329                ],
330                existing_data_files_count: PrimitiveArray<Int32>
331                [
332                  1,
333                ],
334                deleted_data_files_count: PrimitiveArray<Int32>
335                [
336                  1,
337                ],
338                added_delete_files_count: PrimitiveArray<Int32>
339                [
340                  1,
341                ],
342                existing_delete_files_count: PrimitiveArray<Int32>
343                [
344                  1,
345                ],
346                deleted_delete_files_count: PrimitiveArray<Int32>
347                [
348                  1,
349                ],
350                partition_summaries: ListArray
351                [
352                  StructArray
353                -- validity:
354                [
355                  valid,
356                ]
357                [
358                -- child 0: "contains_null" (Boolean)
359                BooleanArray
360                [
361                  false,
362                ]
363                -- child 1: "contains_nan" (Boolean)
364                BooleanArray
365                [
366                  false,
367                ]
368                -- child 2: "lower_bound" (Utf8)
369                StringArray
370                [
371                  "100",
372                ]
373                -- child 3: "upper_bound" (Utf8)
374                StringArray
375                [
376                  "300",
377                ]
378                ],
379                ]"#]],
380            &["path", "length"],
381            Some("path"),
382        );
383    }
384}