iceberg/puffin/
reader.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use tokio::sync::OnceCell;
19
20use crate::Result;
21use crate::io::{FileRead, InputFile};
22use crate::puffin::blob::Blob;
23use crate::puffin::metadata::{BlobMetadata, FileMetadata};
24
25/// Puffin reader
26pub struct PuffinReader {
27    input_file: InputFile,
28    file_metadata: OnceCell<FileMetadata>,
29}
30
31impl PuffinReader {
32    /// Returns a new Puffin reader
33    pub fn new(input_file: InputFile) -> Self {
34        Self {
35            input_file,
36            file_metadata: OnceCell::new(),
37        }
38    }
39
40    /// Returns file metadata
41    pub async fn file_metadata(&self) -> Result<&FileMetadata> {
42        self.file_metadata
43            .get_or_try_init(|| FileMetadata::read(&self.input_file))
44            .await
45    }
46
47    /// Returns blob
48    pub async fn blob(&self, blob_metadata: &BlobMetadata) -> Result<Blob> {
49        let file_read = self.input_file.reader().await?;
50        let start = blob_metadata.offset;
51        let end = start + blob_metadata.length;
52        let bytes = file_read.read(start..end).await?.to_vec();
53        let data = blob_metadata.compression_codec.decompress(bytes)?;
54
55        Ok(Blob {
56            r#type: blob_metadata.r#type.clone(),
57            fields: blob_metadata.fields.clone(),
58            snapshot_id: blob_metadata.snapshot_id,
59            sequence_number: blob_metadata.sequence_number,
60            data,
61            properties: blob_metadata.properties.clone(),
62        })
63    }
64}
65
#[cfg(test)]
mod tests {
    use crate::io::InputFile;
    use crate::puffin::metadata::FileMetadata;
    use crate::puffin::reader::PuffinReader;
    use crate::puffin::test_utils::{
        blob_0, blob_1, java_uncompressed_metric_input_file,
        java_zstd_compressed_metric_input_file, uncompressed_metric_file_metadata,
        zstd_compressed_metric_file_metadata,
    };

    /// Opens `input_file` with a [`PuffinReader`] and checks two things:
    /// - its footer equals `expected_metadata`;
    /// - its first two blobs decode to `blob_0()` and `blob_1()`, in that order.
    async fn assert_reads_metric_blobs(input_file: InputFile, expected_metadata: FileMetadata) {
        let reader = PuffinReader::new(input_file);

        let metadata = reader.file_metadata().await.unwrap();
        assert_eq!(metadata, &expected_metadata);

        let expected_blobs = [blob_0(), blob_1()];
        for (index, expected_blob) in expected_blobs.into_iter().enumerate() {
            let blob_metadata = metadata.blobs.get(index).unwrap();
            assert_eq!(reader.blob(blob_metadata).await.unwrap(), expected_blob);
        }
    }

    #[tokio::test]
    async fn test_puffin_reader_uncompressed_metric_data() {
        assert_reads_metric_blobs(
            java_uncompressed_metric_input_file(),
            uncompressed_metric_file_metadata(),
        )
        .await;
    }

    #[tokio::test]
    async fn test_puffin_reader_zstd_compressed_metric_data() {
        assert_reads_metric_blobs(
            java_zstd_compressed_metric_input_file(),
            zstd_compressed_metric_file_metadata(),
        )
        .await;
    }
}