1use std::collections::{HashMap, HashSet};
19
20use bytes::Bytes;
21
22use super::validate_puffin_compression;
23use crate::Result;
24use crate::compression::CompressionCodec;
25use crate::io::{FileWrite, OutputFile};
26use crate::puffin::blob::Blob;
27use crate::puffin::metadata::{BlobMetadata, FileMetadata, Flag};
28
29pub struct PuffinWriter {
31 writer: Box<dyn FileWrite>,
32 is_header_written: bool,
33 num_bytes_written: u64,
34 written_blobs_metadata: Vec<BlobMetadata>,
35 properties: HashMap<String, String>,
36 footer_compression_codec: CompressionCodec,
37 flags: HashSet<Flag>,
38}
39
40impl PuffinWriter {
41 pub async fn new(
43 output_file: &OutputFile,
44 properties: HashMap<String, String>,
45 compress_footer: bool,
46 ) -> Result<Self> {
47 let mut flags = HashSet::<Flag>::new();
48 let footer_compression_codec = if compress_footer {
49 flags.insert(Flag::FooterPayloadCompressed);
50 CompressionCodec::Lz4
51 } else {
52 CompressionCodec::None
53 };
54
55 Ok(Self {
56 writer: output_file.writer().await?,
57 is_header_written: false,
58 num_bytes_written: 0,
59 written_blobs_metadata: Vec::new(),
60 properties,
61 footer_compression_codec,
62 flags,
63 })
64 }
65
66 pub async fn add(&mut self, blob: Blob, compression_codec: CompressionCodec) -> Result<()> {
68 validate_puffin_compression(compression_codec)?;
69
70 self.write_header_once().await?;
71
72 let offset = self.num_bytes_written;
73 let compressed_bytes: Bytes = compression_codec.compress(blob.data)?.into();
74 let length = compressed_bytes.len().try_into()?;
75 self.write(compressed_bytes).await?;
76 self.written_blobs_metadata.push(BlobMetadata {
77 r#type: blob.r#type,
78 fields: blob.fields,
79 snapshot_id: blob.snapshot_id,
80 sequence_number: blob.sequence_number,
81 offset,
82 length,
83 compression_codec,
84 properties: blob.properties,
85 });
86
87 Ok(())
88 }
89
90 pub async fn close(mut self) -> Result<()> {
92 self.write_header_once().await?;
93 self.write_footer().await?;
94 self.writer.close().await?;
95 Ok(())
96 }
97
98 async fn write(&mut self, bytes: Bytes) -> Result<()> {
99 let length = bytes.len();
100 self.writer.write(bytes).await?;
101 self.num_bytes_written += length as u64;
102 Ok(())
103 }
104
105 async fn write_header_once(&mut self) -> Result<()> {
106 if !self.is_header_written {
107 let bytes = Bytes::copy_from_slice(&FileMetadata::MAGIC);
108 self.write(bytes).await?;
109 self.is_header_written = true;
110 }
111 Ok(())
112 }
113
114 fn footer_payload_bytes(&self) -> Result<Vec<u8>> {
115 let file_metadata = FileMetadata {
116 blobs: self.written_blobs_metadata.clone(),
117 properties: self.properties.clone(),
118 };
119 let json = serde_json::to_string::<FileMetadata>(&file_metadata)?;
120 self.footer_compression_codec.compress(json.into_bytes())
121 }
122
123 fn flags_bytes(&self) -> [u8; FileMetadata::FOOTER_STRUCT_FLAGS_LENGTH as usize] {
124 let mut result = [0; FileMetadata::FOOTER_STRUCT_FLAGS_LENGTH as usize];
125 for flag in &self.flags {
126 let byte_idx: usize = flag.byte_idx().into();
127 result[byte_idx] |= 0x1 << flag.bit_idx();
128 }
129 result
130 }
131
132 async fn write_footer(&mut self) -> Result<()> {
133 let mut footer_payload_bytes = self.footer_payload_bytes()?;
134 let footer_payload_bytes_length = u32::to_le_bytes(footer_payload_bytes.len().try_into()?);
135
136 let mut footer_bytes = Vec::new();
137 footer_bytes.extend(&FileMetadata::MAGIC);
138 footer_bytes.append(&mut footer_payload_bytes);
139 footer_bytes.extend(footer_payload_bytes_length);
140 footer_bytes.extend(self.flags_bytes());
141 footer_bytes.extend(&FileMetadata::MAGIC);
142
143 self.write(footer_bytes.into()).await
144 }
145}
146
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use tempfile::TempDir;

    use crate::compression::CompressionCodec;
    use crate::io::{FileIOBuilder, InputFile, OutputFile};
    use crate::puffin::blob::Blob;
    use crate::puffin::metadata::FileMetadata;
    use crate::puffin::reader::PuffinReader;
    use crate::puffin::test_utils::{
        blob_0, blob_1, empty_footer_payload, empty_footer_payload_bytes, file_properties,
        java_empty_uncompressed_input_file, java_uncompressed_metric_input_file,
        java_zstd_compressed_metric_input_file, uncompressed_metric_file_metadata,
        zstd_compressed_metric_file_metadata,
    };
    use crate::puffin::writer::PuffinWriter;
    use crate::{ErrorKind, Result};

    /// Writes `blobs` (each with its own codec) and `properties` into a Puffin file inside
    /// `temp_dir`, using an uncompressed footer, and returns the written file.
    async fn write_puffin_file(
        temp_dir: &TempDir,
        blobs: Vec<(Blob, CompressionCodec)>,
        properties: HashMap<String, String>,
    ) -> Result<OutputFile> {
        let file_io = FileIOBuilder::new_fs_io().build()?;
        let target = temp_dir.path().join("temp_puffin.bin");
        let output_file = file_io.new_output(target.to_str().unwrap())?;

        let mut puffin_writer = PuffinWriter::new(&output_file, properties, false).await?;
        for (blob, codec) in blobs {
            puffin_writer.add(blob, codec).await?;
        }
        puffin_writer.close().await?;

        Ok(output_file)
    }

    /// Reads back every blob listed in the file metadata of `input_file`, in metadata order.
    async fn read_all_blobs_from_puffin_file(input_file: InputFile) -> Vec<Blob> {
        let reader = PuffinReader::new(input_file);
        let metadata = reader.file_metadata().await.unwrap().clone();

        let mut collected = Vec::new();
        for entry in metadata.blobs {
            collected.push(reader.blob(&entry).await.unwrap());
        }
        collected
    }

    #[tokio::test]
    async fn test_write_uncompressed_empty_file() {
        let temp_dir = TempDir::new().unwrap();

        let written = write_puffin_file(&temp_dir, Vec::new(), HashMap::new())
            .await
            .unwrap();
        let input_file = written.to_input_file();

        let metadata = FileMetadata::read(&input_file).await.unwrap();
        assert_eq!(metadata, empty_footer_payload());

        let actual_len = input_file.read().await.unwrap().len();
        let magic_len = FileMetadata::MAGIC_LENGTH as usize;
        // Header magic, then the magic that opens the footer, then the payload and the
        // remaining fixed-size footer fields. There are no blob bytes in an empty file.
        let expected_len = magic_len
            + magic_len
            + empty_footer_payload_bytes().len()
            + FileMetadata::FOOTER_STRUCT_LENGTH as usize;
        assert_eq!(actual_len, expected_len);
    }

    /// Pairs every blob with the same `compression_codec`.
    fn blobs_with_compression(
        blobs: Vec<Blob>,
        compression_codec: CompressionCodec,
    ) -> Vec<(Blob, CompressionCodec)> {
        blobs
            .into_iter()
            .zip(std::iter::repeat(compression_codec))
            .collect()
    }

    #[tokio::test]
    async fn test_write_uncompressed_metric_data() {
        let temp_dir = TempDir::new().unwrap();
        let blobs = vec![blob_0(), blob_1()];
        let inputs = blobs_with_compression(blobs.clone(), CompressionCodec::None);

        let written = write_puffin_file(&temp_dir, inputs, file_properties())
            .await
            .unwrap();
        let input_file = written.to_input_file();

        let metadata = FileMetadata::read(&input_file).await.unwrap();
        assert_eq!(metadata, uncompressed_metric_file_metadata());

        let round_tripped = read_all_blobs_from_puffin_file(input_file).await;
        assert_eq!(round_tripped, blobs);
    }

    #[tokio::test]
    async fn test_write_zstd_compressed_metric_data() {
        let temp_dir = TempDir::new().unwrap();
        let blobs = vec![blob_0(), blob_1()];
        let inputs = blobs_with_compression(blobs.clone(), CompressionCodec::Zstd);

        let written = write_puffin_file(&temp_dir, inputs, file_properties())
            .await
            .unwrap();
        let input_file = written.to_input_file();

        let metadata = FileMetadata::read(&input_file).await.unwrap();
        assert_eq!(metadata, zstd_compressed_metric_file_metadata());

        let round_tripped = read_all_blobs_from_puffin_file(input_file).await;
        assert_eq!(round_tripped, blobs);
    }

    #[tokio::test]
    async fn test_write_lz4_compressed_metric_data() {
        let temp_dir = TempDir::new().unwrap();
        let blobs = vec![blob_0(), blob_1()];
        let inputs = blobs_with_compression(blobs.clone(), CompressionCodec::Lz4);

        let error = write_puffin_file(&temp_dir, inputs, file_properties())
            .await
            .unwrap_err();

        assert_eq!(
            error.to_string(),
            "FeatureUnsupported => LZ4 compression is not supported currently"
        );
    }

    /// Reads the whole of `input_file` into an owned byte vector.
    async fn get_file_as_byte_vec(input_file: InputFile) -> Vec<u8> {
        let contents = input_file.read().await.unwrap();
        contents.to_vec()
    }

    /// Asserts that the written file and the reference file contain exactly the same bytes.
    async fn assert_files_are_bit_identical(actual: OutputFile, expected: InputFile) {
        let actual_bytes = get_file_as_byte_vec(actual.to_input_file()).await;
        let expected_bytes = get_file_as_byte_vec(expected).await;
        assert_eq!(actual_bytes, expected_bytes);
    }

    #[tokio::test]
    async fn test_uncompressed_empty_puffin_file_is_bit_identical_to_java_generated_file() {
        let temp_dir = TempDir::new().unwrap();

        let written = write_puffin_file(&temp_dir, Vec::new(), HashMap::new())
            .await
            .unwrap();

        assert_files_are_bit_identical(written, java_empty_uncompressed_input_file()).await
    }

    #[tokio::test]
    async fn test_uncompressed_metric_data_is_bit_identical_to_java_generated_file() {
        let temp_dir = TempDir::new().unwrap();
        let inputs = blobs_with_compression(vec![blob_0(), blob_1()], CompressionCodec::None);

        let written = write_puffin_file(&temp_dir, inputs, file_properties())
            .await
            .unwrap();

        assert_files_are_bit_identical(written, java_uncompressed_metric_input_file()).await
    }

    #[tokio::test]
    async fn test_zstd_compressed_metric_data_is_bit_identical_to_java_generated_file() {
        let temp_dir = TempDir::new().unwrap();
        let inputs = blobs_with_compression(vec![blob_0(), blob_1()], CompressionCodec::Zstd);

        let written = write_puffin_file(&temp_dir, inputs, file_properties())
            .await
            .unwrap();

        assert_files_are_bit_identical(written, java_zstd_compressed_metric_input_file()).await
    }

    #[tokio::test]
    async fn test_gzip_compression_rejected() {
        let temp_dir = TempDir::new().unwrap();
        let inputs = blobs_with_compression(vec![blob_0()], CompressionCodec::Gzip);

        let outcome = write_puffin_file(&temp_dir, inputs, file_properties()).await;

        assert!(outcome.is_err());
        let error = outcome.unwrap_err();
        assert_eq!(error.kind(), ErrorKind::DataInvalid);

        let message = error.to_string();
        assert!(message.contains("Gzip"));
        assert!(message.contains("is not supported for Puffin files"));
    }
}