1use std::collections::{HashMap, HashSet};
19
20use bytes::Bytes;
21
22use super::validate_puffin_compression;
23use crate::Result;
24use crate::compression::CompressionCodec;
25use crate::io::{FileWrite, OutputFile};
26use crate::puffin::blob::Blob;
27use crate::puffin::metadata::{BlobMetadata, FileMetadata, Flag};
28
29pub struct PuffinWriter {
31 writer: Box<dyn FileWrite>,
32 is_header_written: bool,
33 num_bytes_written: u64,
34 written_blobs_metadata: Vec<BlobMetadata>,
35 properties: HashMap<String, String>,
36 footer_compression_codec: CompressionCodec,
37 flags: HashSet<Flag>,
38}
39
40impl PuffinWriter {
41 pub async fn new(
43 output_file: &OutputFile,
44 properties: HashMap<String, String>,
45 compress_footer: bool,
46 ) -> Result<Self> {
47 let mut flags = HashSet::<Flag>::new();
48 let footer_compression_codec = if compress_footer {
49 flags.insert(Flag::FooterPayloadCompressed);
50 CompressionCodec::Lz4
51 } else {
52 CompressionCodec::None
53 };
54
55 Ok(Self {
56 writer: output_file.writer().await?,
57 is_header_written: false,
58 num_bytes_written: 0,
59 written_blobs_metadata: Vec::new(),
60 properties,
61 footer_compression_codec,
62 flags,
63 })
64 }
65
66 pub async fn add(&mut self, blob: Blob, compression_codec: CompressionCodec) -> Result<()> {
68 validate_puffin_compression(compression_codec)?;
69
70 self.write_header_once().await?;
71
72 let offset = self.num_bytes_written;
73 let compressed_bytes: Bytes = compression_codec.compress(blob.data)?.into();
74 let length = compressed_bytes.len().try_into()?;
75 self.write(compressed_bytes).await?;
76 self.written_blobs_metadata.push(BlobMetadata {
77 r#type: blob.r#type,
78 fields: blob.fields,
79 snapshot_id: blob.snapshot_id,
80 sequence_number: blob.sequence_number,
81 offset,
82 length,
83 compression_codec,
84 properties: blob.properties,
85 });
86
87 Ok(())
88 }
89
90 pub async fn close(mut self) -> Result<()> {
92 self.write_header_once().await?;
93 self.write_footer().await?;
94 self.writer.close().await?;
95 Ok(())
96 }
97
98 async fn write(&mut self, bytes: Bytes) -> Result<()> {
99 let length = bytes.len();
100 self.writer.write(bytes).await?;
101 self.num_bytes_written += length as u64;
102 Ok(())
103 }
104
105 async fn write_header_once(&mut self) -> Result<()> {
106 if !self.is_header_written {
107 let bytes = Bytes::copy_from_slice(&FileMetadata::MAGIC);
108 self.write(bytes).await?;
109 self.is_header_written = true;
110 }
111 Ok(())
112 }
113
114 fn footer_payload_bytes(&self) -> Result<Vec<u8>> {
115 let file_metadata = FileMetadata {
116 blobs: self.written_blobs_metadata.clone(),
117 properties: self.properties.clone(),
118 };
119 let json = serde_json::to_string::<FileMetadata>(&file_metadata)?;
120 self.footer_compression_codec.compress(json.into_bytes())
121 }
122
123 fn flags_bytes(&self) -> [u8; FileMetadata::FOOTER_STRUCT_FLAGS_LENGTH as usize] {
124 let mut result = [0; FileMetadata::FOOTER_STRUCT_FLAGS_LENGTH as usize];
125 for flag in &self.flags {
126 let byte_idx: usize = flag.byte_idx().into();
127 result[byte_idx] |= 0x1 << flag.bit_idx();
128 }
129 result
130 }
131
132 async fn write_footer(&mut self) -> Result<()> {
133 let mut footer_payload_bytes = self.footer_payload_bytes()?;
134 let footer_payload_bytes_length = u32::to_le_bytes(footer_payload_bytes.len().try_into()?);
135
136 let mut footer_bytes = Vec::new();
137 footer_bytes.extend(&FileMetadata::MAGIC);
138 footer_bytes.append(&mut footer_payload_bytes);
139 footer_bytes.extend(footer_payload_bytes_length);
140 footer_bytes.extend(self.flags_bytes());
141 footer_bytes.extend(&FileMetadata::MAGIC);
142
143 self.write(footer_bytes.into()).await
144 }
145}
146
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use tempfile::TempDir;

    use crate::compression::CompressionCodec;
    use crate::io::{FileIO, InputFile, OutputFile};
    use crate::puffin::blob::Blob;
    use crate::puffin::metadata::FileMetadata;
    use crate::puffin::reader::PuffinReader;
    use crate::puffin::test_utils::{
        blob_0, blob_1, empty_footer_payload, empty_footer_payload_bytes, file_properties,
        java_empty_uncompressed_input_file, java_uncompressed_metric_input_file,
        java_zstd_compressed_metric_input_file, uncompressed_metric_file_metadata,
        zstd_compressed_metric_file_metadata,
    };
    use crate::puffin::writer::PuffinWriter;
    use crate::{ErrorKind, Result};

    // ---------------------------------------------------------------------
    // Helpers
    // ---------------------------------------------------------------------

    /// Writes `blobs` (each with its own codec) and `properties` into
    /// `temp_puffin.bin` inside `temp_dir`, with an uncompressed footer, and
    /// returns the resulting output file.
    async fn write_puffin_file(
        temp_dir: &TempDir,
        blobs: Vec<(Blob, CompressionCodec)>,
        properties: HashMap<String, String>,
    ) -> Result<OutputFile> {
        let file_io = FileIO::new_with_fs();

        let target = temp_dir.path().join("temp_puffin.bin");
        let output_file = file_io.new_output(target.to_str().unwrap())?;

        let mut puffin_writer = PuffinWriter::new(&output_file, properties, false).await?;
        for (blob, codec) in blobs {
            puffin_writer.add(blob, codec).await?;
        }
        puffin_writer.close().await?;

        Ok(output_file)
    }

    /// Reads back every blob listed in the file's metadata, in metadata order.
    async fn read_all_blobs_from_puffin_file(input_file: InputFile) -> Vec<Blob> {
        let reader = PuffinReader::new(input_file);
        let metadata = reader.file_metadata().await.unwrap().clone();

        let mut collected = Vec::with_capacity(metadata.blobs.len());
        for entry in &metadata.blobs {
            collected.push(reader.blob(entry).await.unwrap());
        }
        collected
    }

    /// Pairs every blob with the same compression codec.
    fn blobs_with_compression(
        blobs: Vec<Blob>,
        compression_codec: CompressionCodec,
    ) -> Vec<(Blob, CompressionCodec)> {
        let mut paired = Vec::with_capacity(blobs.len());
        for blob in blobs {
            paired.push((blob, compression_codec));
        }
        paired
    }

    /// Loads the complete contents of `input_file` into memory.
    async fn get_file_as_byte_vec(input_file: InputFile) -> Vec<u8> {
        let contents = input_file.read().await.unwrap();
        contents.to_vec()
    }

    /// Asserts that the written file and the reference file hold identical bytes.
    async fn assert_files_are_bit_identical(actual: OutputFile, expected: InputFile) {
        let written = get_file_as_byte_vec(actual.to_input_file()).await;
        let reference = get_file_as_byte_vec(expected).await;
        assert_eq!(written, reference);
    }

    // ---------------------------------------------------------------------
    // Round-trip tests
    // ---------------------------------------------------------------------

    /// An empty file round-trips to the empty footer payload and has exactly
    /// the expected size.
    #[tokio::test]
    async fn test_write_uncompressed_empty_file() {
        let temp_dir = TempDir::new().unwrap();

        let output_file = write_puffin_file(&temp_dir, Vec::new(), HashMap::new())
            .await
            .unwrap();
        let input_file = output_file.to_input_file();

        let metadata = FileMetadata::read(&input_file).await.unwrap();
        assert_eq!(metadata, empty_footer_payload());

        // Header magic, then the footer's leading magic; there are no blobs in between.
        let header_magic_len = FileMetadata::MAGIC_LENGTH as usize;
        let footer_magic_len = FileMetadata::MAGIC_LENGTH as usize;
        let payload_len = empty_footer_payload_bytes().len();
        let footer_struct_len = FileMetadata::FOOTER_STRUCT_LENGTH as usize;

        let file_len = input_file.read().await.unwrap().len();
        assert_eq!(
            file_len,
            header_magic_len + footer_magic_len + payload_len + footer_struct_len
        )
    }

    /// Uncompressed blobs round-trip through metadata and blob reads.
    #[tokio::test]
    async fn test_write_uncompressed_metric_data() {
        let temp_dir = TempDir::new().unwrap();
        let blobs = vec![blob_0(), blob_1()];
        let tagged_blobs = blobs_with_compression(blobs.clone(), CompressionCodec::None);

        let output_file = write_puffin_file(&temp_dir, tagged_blobs, file_properties())
            .await
            .unwrap();
        let input_file = output_file.to_input_file();

        let metadata = FileMetadata::read(&input_file).await.unwrap();
        assert_eq!(metadata, uncompressed_metric_file_metadata());

        let read_back = read_all_blobs_from_puffin_file(input_file).await;
        assert_eq!(read_back, blobs)
    }

    /// Zstd-compressed blobs round-trip through metadata and blob reads.
    #[tokio::test]
    async fn test_write_zstd_compressed_metric_data() {
        let temp_dir = TempDir::new().unwrap();
        let blobs = vec![blob_0(), blob_1()];
        let tagged_blobs = blobs_with_compression(blobs.clone(), CompressionCodec::zstd_default());

        let output_file = write_puffin_file(&temp_dir, tagged_blobs, file_properties())
            .await
            .unwrap();
        let input_file = output_file.to_input_file();

        let metadata = FileMetadata::read(&input_file).await.unwrap();
        assert_eq!(metadata, zstd_compressed_metric_file_metadata());

        let read_back = read_all_blobs_from_puffin_file(input_file).await;
        assert_eq!(read_back, blobs)
    }

    /// Writing LZ4-compressed blobs surfaces the unsupported-feature error.
    #[tokio::test]
    async fn test_write_lz4_compressed_metric_data() {
        let temp_dir = TempDir::new().unwrap();
        let blobs = vec![blob_0(), blob_1()];
        let tagged_blobs = blobs_with_compression(blobs.clone(), CompressionCodec::Lz4);

        let error = write_puffin_file(&temp_dir, tagged_blobs, file_properties())
            .await
            .unwrap_err();

        assert_eq!(
            error.to_string(),
            "FeatureUnsupported => LZ4 compression is not supported currently"
        );
    }

    // ---------------------------------------------------------------------
    // Compatibility with Java-generated reference files
    // ---------------------------------------------------------------------

    /// An empty uncompressed file matches the Java-generated reference byte for byte.
    #[tokio::test]
    async fn test_uncompressed_empty_puffin_file_is_bit_identical_to_java_generated_file() {
        let temp_dir = TempDir::new().unwrap();

        let written = write_puffin_file(&temp_dir, Vec::new(), HashMap::new())
            .await
            .unwrap();

        assert_files_are_bit_identical(written, java_empty_uncompressed_input_file()).await
    }

    /// Uncompressed metric data matches the Java-generated reference byte for byte.
    #[tokio::test]
    async fn test_uncompressed_metric_data_is_bit_identical_to_java_generated_file() {
        let temp_dir = TempDir::new().unwrap();
        let tagged_blobs =
            blobs_with_compression(vec![blob_0(), blob_1()], CompressionCodec::None);

        let written = write_puffin_file(&temp_dir, tagged_blobs, file_properties())
            .await
            .unwrap();

        assert_files_are_bit_identical(written, java_uncompressed_metric_input_file()).await
    }

    /// Zstd-compressed metric data matches the Java-generated reference byte for byte.
    #[tokio::test]
    async fn test_zstd_compressed_metric_data_is_bit_identical_to_java_generated_file() {
        let temp_dir = TempDir::new().unwrap();
        let tagged_blobs =
            blobs_with_compression(vec![blob_0(), blob_1()], CompressionCodec::zstd_default());

        let written = write_puffin_file(&temp_dir, tagged_blobs, file_properties())
            .await
            .unwrap();

        assert_files_are_bit_identical(written, java_zstd_compressed_metric_input_file()).await
    }

    // ---------------------------------------------------------------------
    // Rejected codecs
    // ---------------------------------------------------------------------

    /// Gzip is refused for Puffin blobs with a `DataInvalid` error naming the codec.
    #[tokio::test]
    async fn test_gzip_compression_rejected() {
        let temp_dir = TempDir::new().unwrap();
        let tagged_blobs = blobs_with_compression(vec![blob_0()], CompressionCodec::gzip_default());

        let outcome = write_puffin_file(&temp_dir, tagged_blobs, file_properties()).await;

        assert!(outcome.is_err());
        let err = outcome.unwrap_err();
        assert_eq!(err.kind(), ErrorKind::DataInvalid);

        let message = err.to_string();
        assert!(message.contains("gzip"));
        assert!(message.contains("is not supported for Puffin files"));
    }
}