1use tokio::sync::OnceCell;
19
20use super::validate_puffin_compression;
21use crate::Result;
22use crate::io::{FileRead, InputFile};
23use crate::puffin::blob::Blob;
24use crate::puffin::metadata::{BlobMetadata, FileMetadata};
25
/// Reader for a single Puffin file: exposes the file-level metadata and the
/// individual blobs stored in the file.
pub struct PuffinReader {
    /// File that metadata and blob bytes are read from.
    input_file: InputFile,
    /// Cache for the file metadata; starts empty and is populated by the
    /// first successful call to `file_metadata`.
    file_metadata: OnceCell<FileMetadata>,
}
31
32impl PuffinReader {
33 pub fn new(input_file: InputFile) -> Self {
35 Self {
36 input_file,
37 file_metadata: OnceCell::new(),
38 }
39 }
40
41 pub async fn file_metadata(&self) -> Result<&FileMetadata> {
43 self.file_metadata
44 .get_or_try_init(|| FileMetadata::read(&self.input_file))
45 .await
46 }
47
48 pub async fn blob(&self, blob_metadata: &BlobMetadata) -> Result<Blob> {
50 validate_puffin_compression(blob_metadata.compression_codec)?;
51
52 let file_read = self.input_file.reader().await?;
53 let start = blob_metadata.offset;
54 let end = start + blob_metadata.length;
55 let bytes = file_read.read(start..end).await?;
56 let data = blob_metadata.compression_codec.decompress(bytes.to_vec())?;
57
58 Ok(Blob {
59 r#type: blob_metadata.r#type.clone(),
60 fields: blob_metadata.fields.clone(),
61 snapshot_id: blob_metadata.snapshot_id,
62 sequence_number: blob_metadata.sequence_number,
63 data,
64 properties: blob_metadata.properties.clone(),
65 })
66 }
67}
68
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use crate::ErrorKind;
    use crate::compression::CompressionCodec;
    use crate::puffin::metadata::BlobMetadata;
    use crate::puffin::reader::PuffinReader;
    use crate::puffin::test_utils::{
        blob_0, blob_1, java_uncompressed_metric_input_file,
        java_zstd_compressed_metric_input_file, uncompressed_metric_file_metadata,
        zstd_compressed_metric_file_metadata,
    };

    /// Reads the uncompressed fixture file and checks its metadata and both blobs.
    #[tokio::test]
    async fn test_puffin_reader_uncompressed_metric_data() {
        let reader = PuffinReader::new(java_uncompressed_metric_input_file());

        // Clone so the metadata can be used while `reader` is borrowed again below.
        let metadata = reader.file_metadata().await.unwrap().clone();
        assert_eq!(metadata, uncompressed_metric_file_metadata());

        let first = reader.blob(metadata.blobs.first().unwrap()).await.unwrap();
        assert_eq!(first, blob_0());

        let second = reader.blob(metadata.blobs.get(1).unwrap()).await.unwrap();
        assert_eq!(second, blob_1());
    }

    /// Reads the zstd-compressed fixture file and checks that the blobs
    /// decompress to the same contents as the expected blobs.
    #[tokio::test]
    async fn test_puffin_reader_zstd_compressed_metric_data() {
        let reader = PuffinReader::new(java_zstd_compressed_metric_input_file());

        // Clone so the metadata can be used while `reader` is borrowed again below.
        let metadata = reader.file_metadata().await.unwrap().clone();
        assert_eq!(metadata, zstd_compressed_metric_file_metadata());

        let first = reader.blob(metadata.blobs.first().unwrap()).await.unwrap();
        assert_eq!(first, blob_0());

        let second = reader.blob(metadata.blobs.get(1).unwrap()).await.unwrap();
        assert_eq!(second, blob_1());
    }

    /// A blob whose metadata declares Gzip must be rejected by `blob` with a
    /// `DataInvalid` error that names the codec.
    #[tokio::test]
    async fn test_gzip_compression_rejected_on_blob_access() {
        let reader = PuffinReader::new(java_uncompressed_metric_input_file());

        // Hand-built metadata: only the codec matters for this test.
        let gzip_blob_metadata = BlobMetadata {
            r#type: "test-type".to_string(),
            fields: vec![1],
            snapshot_id: 1,
            sequence_number: 1,
            offset: 4,
            length: 10,
            compression_codec: CompressionCodec::Gzip,
            properties: HashMap::new(),
        };

        let result = reader.blob(&gzip_blob_metadata).await;
        assert!(result.is_err());

        let err = result.unwrap_err();
        assert_eq!(err.kind(), ErrorKind::DataInvalid);

        let message = err.to_string();
        assert!(message.contains("Gzip"));
        assert!(message.contains("is not supported for Puffin files"));
    }
}