iceberg/puffin/
reader.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use tokio::sync::OnceCell;
19
20use super::validate_puffin_compression;
21use crate::Result;
22use crate::io::{FileRead, InputFile};
23use crate::puffin::blob::Blob;
24use crate::puffin::metadata::{BlobMetadata, FileMetadata};
25
26/// Puffin reader
27pub struct PuffinReader {
28    input_file: InputFile,
29    file_metadata: OnceCell<FileMetadata>,
30}
31
32impl PuffinReader {
33    /// Returns a new Puffin reader
34    pub fn new(input_file: InputFile) -> Self {
35        Self {
36            input_file,
37            file_metadata: OnceCell::new(),
38        }
39    }
40
41    /// Returns file metadata
42    pub async fn file_metadata(&self) -> Result<&FileMetadata> {
43        self.file_metadata
44            .get_or_try_init(|| FileMetadata::read(&self.input_file))
45            .await
46    }
47
48    /// Returns blob
49    pub async fn blob(&self, blob_metadata: &BlobMetadata) -> Result<Blob> {
50        validate_puffin_compression(blob_metadata.compression_codec)?;
51
52        let file_read = self.input_file.reader().await?;
53        let start = blob_metadata.offset;
54        let end = start + blob_metadata.length;
55        let bytes = file_read.read(start..end).await?;
56        let data = blob_metadata.compression_codec.decompress(bytes.to_vec())?;
57
58        Ok(Blob {
59            r#type: blob_metadata.r#type.clone(),
60            fields: blob_metadata.fields.clone(),
61            snapshot_id: blob_metadata.snapshot_id,
62            sequence_number: blob_metadata.sequence_number,
63            data,
64            properties: blob_metadata.properties.clone(),
65        })
66    }
67}
68
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use crate::ErrorKind;
    use crate::compression::CompressionCodec;
    use crate::puffin::metadata::BlobMetadata;
    use crate::puffin::reader::PuffinReader;
    use crate::puffin::test_utils::{
        blob_0, blob_1, java_uncompressed_metric_input_file,
        java_zstd_compressed_metric_input_file, uncompressed_metric_file_metadata,
        zstd_compressed_metric_file_metadata,
    };

    /// Reads the uncompressed fixture and checks its metadata and both blobs.
    #[tokio::test]
    async fn test_puffin_reader_uncompressed_metric_data() {
        let puffin_reader = PuffinReader::new(java_uncompressed_metric_input_file());

        // Clone so the metadata no longer borrows from the reader's cache.
        let file_metadata = puffin_reader.file_metadata().await.unwrap().clone();
        assert_eq!(file_metadata, uncompressed_metric_file_metadata());

        let first_metadata = file_metadata.blobs.first().unwrap();
        let first_blob = puffin_reader.blob(first_metadata).await.unwrap();
        assert_eq!(first_blob, blob_0());

        let second_metadata = file_metadata.blobs.get(1).unwrap();
        let second_blob = puffin_reader.blob(second_metadata).await.unwrap();
        assert_eq!(second_blob, blob_1());
    }

    /// Reads the zstd-compressed fixture and checks its metadata and both blobs.
    #[tokio::test]
    async fn test_puffin_reader_zstd_compressed_metric_data() {
        let puffin_reader = PuffinReader::new(java_zstd_compressed_metric_input_file());

        // Clone so the metadata no longer borrows from the reader's cache.
        let file_metadata = puffin_reader.file_metadata().await.unwrap().clone();
        assert_eq!(file_metadata, zstd_compressed_metric_file_metadata());

        let first_metadata = file_metadata.blobs.first().unwrap();
        let first_blob = puffin_reader.blob(first_metadata).await.unwrap();
        assert_eq!(first_blob, blob_0());

        let second_metadata = file_metadata.blobs.get(1).unwrap();
        let second_blob = puffin_reader.blob(second_metadata).await.unwrap();
        assert_eq!(second_blob, blob_1());
    }

    /// A blob whose metadata declares Gzip compression must be rejected with
    /// a `DataInvalid` error when it is accessed.
    #[tokio::test]
    async fn test_gzip_compression_rejected_on_blob_access() {
        // Backed by a real puffin file.
        let reader = PuffinReader::new(java_uncompressed_metric_input_file());

        // Hand-built metadata whose only notable property is the Gzip codec.
        let gzip_blob_metadata = BlobMetadata {
            r#type: "test-type".to_string(),
            fields: vec![1],
            snapshot_id: 1,
            sequence_number: 1,
            offset: 4,
            length: 10,
            compression_codec: CompressionCodec::Gzip,
            properties: HashMap::new(),
        };

        // `unwrap_err` panics if the read unexpectedly succeeds.
        let err = reader.blob(&gzip_blob_metadata).await.unwrap_err();
        assert_eq!(err.kind(), ErrorKind::DataInvalid);

        let message = err.to_string();
        assert!(message.contains("Gzip"));
        assert!(message.contains("is not supported for Puffin files"));
    }
}