1use std::collections::{HashMap, HashSet};
19
20use bytes::Bytes;
21
22use crate::Result;
23use crate::io::{FileWrite, OutputFile};
24use crate::puffin::blob::Blob;
25use crate::puffin::compression::CompressionCodec;
26use crate::puffin::metadata::{BlobMetadata, FileMetadata, Flag};
27
28pub struct PuffinWriter {
30 writer: Box<dyn FileWrite>,
31 is_header_written: bool,
32 num_bytes_written: u64,
33 written_blobs_metadata: Vec<BlobMetadata>,
34 properties: HashMap<String, String>,
35 footer_compression_codec: CompressionCodec,
36 flags: HashSet<Flag>,
37}
38
39impl PuffinWriter {
40 pub async fn new(
42 output_file: &OutputFile,
43 properties: HashMap<String, String>,
44 compress_footer: bool,
45 ) -> Result<Self> {
46 let mut flags = HashSet::<Flag>::new();
47 let footer_compression_codec = if compress_footer {
48 flags.insert(Flag::FooterPayloadCompressed);
49 CompressionCodec::Lz4
50 } else {
51 CompressionCodec::None
52 };
53
54 Ok(Self {
55 writer: output_file.writer().await?,
56 is_header_written: false,
57 num_bytes_written: 0,
58 written_blobs_metadata: Vec::new(),
59 properties,
60 footer_compression_codec,
61 flags,
62 })
63 }
64
65 pub async fn add(&mut self, blob: Blob, compression_codec: CompressionCodec) -> Result<()> {
67 self.write_header_once().await?;
68
69 let offset = self.num_bytes_written;
70 let compressed_bytes: Bytes = compression_codec.compress(blob.data)?.into();
71 let length = compressed_bytes.len().try_into()?;
72 self.write(compressed_bytes).await?;
73 self.written_blobs_metadata.push(BlobMetadata {
74 r#type: blob.r#type,
75 fields: blob.fields,
76 snapshot_id: blob.snapshot_id,
77 sequence_number: blob.sequence_number,
78 offset,
79 length,
80 compression_codec,
81 properties: blob.properties,
82 });
83
84 Ok(())
85 }
86
87 pub async fn close(mut self) -> Result<()> {
89 self.write_header_once().await?;
90 self.write_footer().await?;
91 self.writer.close().await?;
92 Ok(())
93 }
94
95 async fn write(&mut self, bytes: Bytes) -> Result<()> {
96 let length = bytes.len();
97 self.writer.write(bytes).await?;
98 self.num_bytes_written += length as u64;
99 Ok(())
100 }
101
102 async fn write_header_once(&mut self) -> Result<()> {
103 if !self.is_header_written {
104 let bytes = Bytes::copy_from_slice(&FileMetadata::MAGIC);
105 self.write(bytes).await?;
106 self.is_header_written = true;
107 }
108 Ok(())
109 }
110
111 fn footer_payload_bytes(&self) -> Result<Vec<u8>> {
112 let file_metadata = FileMetadata {
113 blobs: self.written_blobs_metadata.clone(),
114 properties: self.properties.clone(),
115 };
116 let json = serde_json::to_string::<FileMetadata>(&file_metadata)?;
117 let bytes = json.as_bytes();
118 self.footer_compression_codec.compress(bytes.to_vec())
119 }
120
121 fn flags_bytes(&self) -> [u8; FileMetadata::FOOTER_STRUCT_FLAGS_LENGTH as usize] {
122 let mut result = [0; FileMetadata::FOOTER_STRUCT_FLAGS_LENGTH as usize];
123 for flag in &self.flags {
124 let byte_idx: usize = flag.byte_idx().into();
125 result[byte_idx] |= 0x1 << flag.bit_idx();
126 }
127 result
128 }
129
130 async fn write_footer(&mut self) -> Result<()> {
131 let mut footer_payload_bytes = self.footer_payload_bytes()?;
132 let footer_payload_bytes_length = u32::to_le_bytes(footer_payload_bytes.len().try_into()?);
133
134 let mut footer_bytes = Vec::new();
135 footer_bytes.extend(&FileMetadata::MAGIC);
136 footer_bytes.append(&mut footer_payload_bytes);
137 footer_bytes.extend(footer_payload_bytes_length);
138 footer_bytes.extend(self.flags_bytes());
139 footer_bytes.extend(&FileMetadata::MAGIC);
140
141 self.write(footer_bytes.into()).await
142 }
143}
144
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use tempfile::TempDir;

    use crate::Result;
    use crate::io::{FileIOBuilder, InputFile, OutputFile};
    use crate::puffin::blob::Blob;
    use crate::puffin::compression::CompressionCodec;
    use crate::puffin::metadata::FileMetadata;
    use crate::puffin::reader::PuffinReader;
    use crate::puffin::test_utils::{
        blob_0, blob_1, empty_footer_payload, empty_footer_payload_bytes, file_properties,
        java_empty_uncompressed_input_file, java_uncompressed_metric_input_file,
        java_zstd_compressed_metric_input_file, uncompressed_metric_file_metadata,
        zstd_compressed_metric_file_metadata,
    };
    use crate::puffin::writer::PuffinWriter;

    /// Writes `blobs` (each with its own codec) and `properties` to `temp_puffin.bin` inside
    /// `temp_dir`, using an uncompressed footer, and returns the resulting output file.
    async fn write_puffin_file(
        temp_dir: &TempDir,
        blobs: Vec<(Blob, CompressionCodec)>,
        properties: HashMap<String, String>,
    ) -> Result<OutputFile> {
        let fs_io = FileIOBuilder::new_fs_io().build()?;

        let target = temp_dir.path().join("temp_puffin.bin");
        let output_file = fs_io.new_output(target.to_str().unwrap())?;

        let mut puffin_writer = PuffinWriter::new(&output_file, properties, false).await?;
        for (blob, codec) in blobs {
            puffin_writer.add(blob, codec).await?;
        }
        puffin_writer.close().await?;

        Ok(output_file)
    }

    /// Reads back every blob listed in the file metadata of `input_file`, in metadata order.
    async fn read_all_blobs_from_puffin_file(input_file: InputFile) -> Vec<Blob> {
        let reader = PuffinReader::new(input_file);
        let all_metadata = reader.file_metadata().await.unwrap().blobs.clone();

        let mut blobs = Vec::new();
        for metadata in &all_metadata {
            blobs.push(reader.blob(metadata).await.unwrap());
        }
        blobs
    }

    #[tokio::test]
    async fn test_write_uncompressed_empty_file() {
        let temp_dir = TempDir::new().unwrap();

        let written = write_puffin_file(&temp_dir, Vec::new(), HashMap::new())
            .await
            .unwrap();
        let input_file = written.to_input_file();

        let file_metadata = FileMetadata::read(&input_file).await.unwrap();
        assert_eq!(file_metadata, empty_footer_payload());

        let actual_len = input_file.read().await.unwrap().len();
        // No blobs were added, so the file is just the header magic followed by the footer.
        let header_len = FileMetadata::MAGIC_LENGTH as usize;
        let footer_len = FileMetadata::MAGIC_LENGTH as usize
            + empty_footer_payload_bytes().len()
            + FileMetadata::FOOTER_STRUCT_LENGTH as usize;
        assert_eq!(actual_len, header_len + footer_len);
    }

    /// Pairs every blob with the same `compression_codec`.
    fn blobs_with_compression(
        blobs: Vec<Blob>,
        compression_codec: CompressionCodec,
    ) -> Vec<(Blob, CompressionCodec)> {
        blobs
            .into_iter()
            .zip(std::iter::repeat(compression_codec))
            .collect()
    }

    #[tokio::test]
    async fn test_write_uncompressed_metric_data() {
        let temp_dir = TempDir::new().unwrap();
        let blobs = vec![blob_0(), blob_1()];
        let inputs = blobs_with_compression(blobs.clone(), CompressionCodec::None);

        let written = write_puffin_file(&temp_dir, inputs, file_properties())
            .await
            .unwrap();
        let input_file = written.to_input_file();

        let file_metadata = FileMetadata::read(&input_file).await.unwrap();
        assert_eq!(file_metadata, uncompressed_metric_file_metadata());

        let read_back = read_all_blobs_from_puffin_file(input_file).await;
        assert_eq!(read_back, blobs);
    }

    #[tokio::test]
    async fn test_write_zstd_compressed_metric_data() {
        let temp_dir = TempDir::new().unwrap();
        let blobs = vec![blob_0(), blob_1()];
        let inputs = blobs_with_compression(blobs.clone(), CompressionCodec::Zstd);

        let written = write_puffin_file(&temp_dir, inputs, file_properties())
            .await
            .unwrap();
        let input_file = written.to_input_file();

        let file_metadata = FileMetadata::read(&input_file).await.unwrap();
        assert_eq!(file_metadata, zstd_compressed_metric_file_metadata());

        let read_back = read_all_blobs_from_puffin_file(input_file).await;
        assert_eq!(read_back, blobs);
    }

    #[tokio::test]
    async fn test_write_lz4_compressed_metric_data() {
        let temp_dir = TempDir::new().unwrap();
        let blobs = vec![blob_0(), blob_1()];
        let inputs = blobs_with_compression(blobs.clone(), CompressionCodec::Lz4);

        // Writing LZ4-compressed blobs is expected to be rejected.
        let error = write_puffin_file(&temp_dir, inputs, file_properties())
            .await
            .unwrap_err();
        assert_eq!(
            error.to_string(),
            "FeatureUnsupported => LZ4 compression is not supported currently"
        );
    }

    /// Reads the whole of `input_file` into an owned byte vector.
    async fn get_file_as_byte_vec(input_file: InputFile) -> Vec<u8> {
        let contents = input_file.read().await.unwrap();
        contents.to_vec()
    }

    /// Asserts that the file we wrote and the reference file contain exactly the same bytes.
    async fn assert_files_are_bit_identical(actual: OutputFile, expected: InputFile) {
        let written = get_file_as_byte_vec(actual.to_input_file()).await;
        let reference = get_file_as_byte_vec(expected).await;
        assert_eq!(written, reference);
    }

    #[tokio::test]
    async fn test_uncompressed_empty_puffin_file_is_bit_identical_to_java_generated_file() {
        let temp_dir = TempDir::new().unwrap();

        let written = write_puffin_file(&temp_dir, Vec::new(), HashMap::new())
            .await
            .unwrap();

        assert_files_are_bit_identical(written, java_empty_uncompressed_input_file()).await;
    }

    #[tokio::test]
    async fn test_uncompressed_metric_data_is_bit_identical_to_java_generated_file() {
        let temp_dir = TempDir::new().unwrap();
        let inputs = blobs_with_compression(vec![blob_0(), blob_1()], CompressionCodec::None);

        let written = write_puffin_file(&temp_dir, inputs, file_properties())
            .await
            .unwrap();

        assert_files_are_bit_identical(written, java_uncompressed_metric_input_file()).await;
    }

    #[tokio::test]
    async fn test_zstd_compressed_metric_data_is_bit_identical_to_java_generated_file() {
        let temp_dir = TempDir::new().unwrap();
        let inputs = blobs_with_compression(vec![blob_0(), blob_1()], CompressionCodec::Zstd);

        let written = write_puffin_file(&temp_dir, inputs, file_properties())
            .await
            .unwrap();

        assert_files_are_bit_identical(written, java_zstd_compressed_metric_input_file()).await;
    }
}