iceberg/puffin/
writer.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::collections::{HashMap, HashSet};
19
20use bytes::Bytes;
21
22use super::validate_puffin_compression;
23use crate::Result;
24use crate::compression::CompressionCodec;
25use crate::io::{FileWrite, OutputFile};
26use crate::puffin::blob::Blob;
27use crate::puffin::metadata::{BlobMetadata, FileMetadata, Flag};
28
/// Puffin writer
///
/// Streams blobs into an [`OutputFile`] and, when [`PuffinWriter::close`] is
/// called, appends the footer that lists every blob written.
pub struct PuffinWriter {
    /// Underlying sink that receives every byte of the file.
    writer: Box<dyn FileWrite>,
    /// Whether the leading magic bytes have already been written.
    is_header_written: bool,
    /// Total number of bytes written so far; the next blob starts at this offset.
    num_bytes_written: u64,
    /// Metadata for each added blob, in insertion order; serialized into the footer.
    written_blobs_metadata: Vec<BlobMetadata>,
    /// File-level properties serialized into the footer payload.
    properties: HashMap<String, String>,
    /// Codec applied to the serialized footer payload.
    footer_compression_codec: CompressionCodec,
    /// Footer flags, encoded into the footer's fixed-size flags field.
    flags: HashSet<Flag>,
}
39
40impl PuffinWriter {
41    /// Returns a new Puffin writer
42    pub async fn new(
43        output_file: &OutputFile,
44        properties: HashMap<String, String>,
45        compress_footer: bool,
46    ) -> Result<Self> {
47        let mut flags = HashSet::<Flag>::new();
48        let footer_compression_codec = if compress_footer {
49            flags.insert(Flag::FooterPayloadCompressed);
50            CompressionCodec::Lz4
51        } else {
52            CompressionCodec::None
53        };
54
55        Ok(Self {
56            writer: output_file.writer().await?,
57            is_header_written: false,
58            num_bytes_written: 0,
59            written_blobs_metadata: Vec::new(),
60            properties,
61            footer_compression_codec,
62            flags,
63        })
64    }
65
66    /// Adds blob to Puffin file
67    pub async fn add(&mut self, blob: Blob, compression_codec: CompressionCodec) -> Result<()> {
68        validate_puffin_compression(compression_codec)?;
69
70        self.write_header_once().await?;
71
72        let offset = self.num_bytes_written;
73        let compressed_bytes: Bytes = compression_codec.compress(blob.data)?.into();
74        let length = compressed_bytes.len().try_into()?;
75        self.write(compressed_bytes).await?;
76        self.written_blobs_metadata.push(BlobMetadata {
77            r#type: blob.r#type,
78            fields: blob.fields,
79            snapshot_id: blob.snapshot_id,
80            sequence_number: blob.sequence_number,
81            offset,
82            length,
83            compression_codec,
84            properties: blob.properties,
85        });
86
87        Ok(())
88    }
89
90    /// Finalizes the Puffin file
91    pub async fn close(mut self) -> Result<()> {
92        self.write_header_once().await?;
93        self.write_footer().await?;
94        self.writer.close().await?;
95        Ok(())
96    }
97
98    async fn write(&mut self, bytes: Bytes) -> Result<()> {
99        let length = bytes.len();
100        self.writer.write(bytes).await?;
101        self.num_bytes_written += length as u64;
102        Ok(())
103    }
104
105    async fn write_header_once(&mut self) -> Result<()> {
106        if !self.is_header_written {
107            let bytes = Bytes::copy_from_slice(&FileMetadata::MAGIC);
108            self.write(bytes).await?;
109            self.is_header_written = true;
110        }
111        Ok(())
112    }
113
114    fn footer_payload_bytes(&self) -> Result<Vec<u8>> {
115        let file_metadata = FileMetadata {
116            blobs: self.written_blobs_metadata.clone(),
117            properties: self.properties.clone(),
118        };
119        let json = serde_json::to_string::<FileMetadata>(&file_metadata)?;
120        self.footer_compression_codec.compress(json.into_bytes())
121    }
122
123    fn flags_bytes(&self) -> [u8; FileMetadata::FOOTER_STRUCT_FLAGS_LENGTH as usize] {
124        let mut result = [0; FileMetadata::FOOTER_STRUCT_FLAGS_LENGTH as usize];
125        for flag in &self.flags {
126            let byte_idx: usize = flag.byte_idx().into();
127            result[byte_idx] |= 0x1 << flag.bit_idx();
128        }
129        result
130    }
131
132    async fn write_footer(&mut self) -> Result<()> {
133        let mut footer_payload_bytes = self.footer_payload_bytes()?;
134        let footer_payload_bytes_length = u32::to_le_bytes(footer_payload_bytes.len().try_into()?);
135
136        let mut footer_bytes = Vec::new();
137        footer_bytes.extend(&FileMetadata::MAGIC);
138        footer_bytes.append(&mut footer_payload_bytes);
139        footer_bytes.extend(footer_payload_bytes_length);
140        footer_bytes.extend(self.flags_bytes());
141        footer_bytes.extend(&FileMetadata::MAGIC);
142
143        self.write(footer_bytes.into()).await
144    }
145}
146
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use tempfile::TempDir;

    use crate::compression::CompressionCodec;
    use crate::io::{FileIOBuilder, InputFile, OutputFile};
    use crate::puffin::blob::Blob;
    use crate::puffin::metadata::FileMetadata;
    use crate::puffin::reader::PuffinReader;
    use crate::puffin::test_utils::{
        blob_0, blob_1, empty_footer_payload, empty_footer_payload_bytes, file_properties,
        java_empty_uncompressed_input_file, java_uncompressed_metric_input_file,
        java_zstd_compressed_metric_input_file, uncompressed_metric_file_metadata,
        zstd_compressed_metric_file_metadata,
    };
    use crate::puffin::writer::PuffinWriter;
    use crate::{ErrorKind, Result};

    /// Writes `blobs` (each paired with its own codec) and `properties` to
    /// `temp_puffin.bin` inside `temp_dir`, using an uncompressed footer.
    /// Returns the written file.
    async fn write_puffin_file(
        temp_dir: &TempDir,
        blobs: Vec<(Blob, CompressionCodec)>,
        properties: HashMap<String, String>,
    ) -> Result<OutputFile> {
        let file_io = FileIOBuilder::new_fs_io().build()?;

        let path_buf = temp_dir.path().join("temp_puffin.bin");
        let temp_path = path_buf.to_str().unwrap();
        let output_file = file_io.new_output(temp_path)?;

        // `false`: footer compression is not exercised by these tests.
        let mut writer = PuffinWriter::new(&output_file, properties, false).await?;
        for (blob, compression_codec) in blobs {
            writer.add(blob, compression_codec).await?;
        }
        writer.close().await?;

        Ok(output_file)
    }

    /// Reads back every blob listed in the file's footer, in footer order.
    async fn read_all_blobs_from_puffin_file(input_file: InputFile) -> Vec<Blob> {
        let puffin_reader = PuffinReader::new(input_file);
        let mut blobs = Vec::new();
        let blobs_metadata = puffin_reader.file_metadata().await.unwrap().clone().blobs;
        for blob_metadata in blobs_metadata {
            blobs.push(puffin_reader.blob(&blob_metadata).await.unwrap());
        }
        blobs
    }

    /// An empty file holds only the header magic and the footer.
    #[tokio::test]
    async fn test_write_uncompressed_empty_file() {
        let temp_dir = TempDir::new().unwrap();

        let input_file = write_puffin_file(&temp_dir, Vec::new(), HashMap::new())
            .await
            .unwrap()
            .to_input_file();

        assert_eq!(
            FileMetadata::read(&input_file).await.unwrap(),
            empty_footer_payload()
        );

        // Layout: header magic, then footer = magic + payload + fixed-length footer struct.
        assert_eq!(
            input_file.read().await.unwrap().len(),
            FileMetadata::MAGIC_LENGTH as usize
                // no blobs since puffin file is empty
                + FileMetadata::MAGIC_LENGTH as usize
                + empty_footer_payload_bytes().len()
                + FileMetadata::FOOTER_STRUCT_LENGTH as usize
        )
    }

    /// Pairs every blob with the same `compression_codec`.
    fn blobs_with_compression(
        blobs: Vec<Blob>,
        compression_codec: CompressionCodec,
    ) -> Vec<(Blob, CompressionCodec)> {
        blobs
            .into_iter()
            .map(|blob| (blob, compression_codec))
            .collect()
    }

    /// Round-trip: uncompressed blobs produce the expected footer and read back unchanged.
    #[tokio::test]
    async fn test_write_uncompressed_metric_data() {
        let temp_dir = TempDir::new().unwrap();
        let blobs = vec![blob_0(), blob_1()];
        let blobs_with_compression = blobs_with_compression(blobs.clone(), CompressionCodec::None);

        let input_file = write_puffin_file(&temp_dir, blobs_with_compression, file_properties())
            .await
            .unwrap()
            .to_input_file();

        assert_eq!(
            FileMetadata::read(&input_file).await.unwrap(),
            uncompressed_metric_file_metadata()
        );

        assert_eq!(read_all_blobs_from_puffin_file(input_file).await, blobs)
    }

    /// Round-trip: Zstd-compressed blobs produce the expected footer and decompress unchanged.
    #[tokio::test]
    async fn test_write_zstd_compressed_metric_data() {
        let temp_dir = TempDir::new().unwrap();
        let blobs = vec![blob_0(), blob_1()];
        let blobs_with_compression = blobs_with_compression(blobs.clone(), CompressionCodec::Zstd);

        let input_file = write_puffin_file(&temp_dir, blobs_with_compression, file_properties())
            .await
            .unwrap()
            .to_input_file();

        assert_eq!(
            FileMetadata::read(&input_file).await.unwrap(),
            zstd_compressed_metric_file_metadata()
        );

        assert_eq!(read_all_blobs_from_puffin_file(input_file).await, blobs)
    }

    /// LZ4 blob compression is currently expected to fail with `FeatureUnsupported`.
    #[tokio::test]
    async fn test_write_lz4_compressed_metric_data() {
        let temp_dir = TempDir::new().unwrap();
        let blobs = vec![blob_0(), blob_1()];
        let blobs_with_compression = blobs_with_compression(blobs.clone(), CompressionCodec::Lz4);

        assert_eq!(
            write_puffin_file(&temp_dir, blobs_with_compression, file_properties())
                .await
                .unwrap_err()
                .to_string(),
            "FeatureUnsupported => LZ4 compression is not supported currently"
        );
    }

    /// Reads the whole file into memory.
    async fn get_file_as_byte_vec(input_file: InputFile) -> Vec<u8> {
        input_file.read().await.unwrap().to_vec()
    }

    /// Asserts the Rust-written file matches a reference file byte for byte.
    async fn assert_files_are_bit_identical(actual: OutputFile, expected: InputFile) {
        let actual_bytes = get_file_as_byte_vec(actual.to_input_file()).await;
        let expected_bytes = get_file_as_byte_vec(expected).await;
        assert_eq!(actual_bytes, expected_bytes);
    }

    // The following tests compare against fixtures produced by the Java implementation,
    // so any change to the on-disk layout shows up as a byte mismatch.

    #[tokio::test]
    async fn test_uncompressed_empty_puffin_file_is_bit_identical_to_java_generated_file() {
        let temp_dir = TempDir::new().unwrap();

        assert_files_are_bit_identical(
            write_puffin_file(&temp_dir, Vec::new(), HashMap::new())
                .await
                .unwrap(),
            java_empty_uncompressed_input_file(),
        )
        .await
    }

    #[tokio::test]
    async fn test_uncompressed_metric_data_is_bit_identical_to_java_generated_file() {
        let temp_dir = TempDir::new().unwrap();
        let blobs = vec![blob_0(), blob_1()];
        let blobs_with_compression = blobs_with_compression(blobs, CompressionCodec::None);

        assert_files_are_bit_identical(
            write_puffin_file(&temp_dir, blobs_with_compression, file_properties())
                .await
                .unwrap(),
            java_uncompressed_metric_input_file(),
        )
        .await
    }

    #[tokio::test]
    async fn test_zstd_compressed_metric_data_is_bit_identical_to_java_generated_file() {
        let temp_dir = TempDir::new().unwrap();
        let blobs = vec![blob_0(), blob_1()];
        let blobs_with_compression = blobs_with_compression(blobs, CompressionCodec::Zstd);

        assert_files_are_bit_identical(
            write_puffin_file(&temp_dir, blobs_with_compression, file_properties())
                .await
                .unwrap(),
            java_zstd_compressed_metric_input_file(),
        )
        .await
    }

    /// Gzip is rejected up front by codec validation with a `DataInvalid` error.
    #[tokio::test]
    async fn test_gzip_compression_rejected() {
        let temp_dir = TempDir::new().unwrap();
        let blobs = vec![blob_0()];
        let blobs_with_compression = blobs_with_compression(blobs, CompressionCodec::Gzip);

        let result = write_puffin_file(&temp_dir, blobs_with_compression, file_properties()).await;

        assert!(result.is_err());
        let err = result.unwrap_err();
        assert_eq!(err.kind(), ErrorKind::DataInvalid);
        assert!(err.to_string().contains("Gzip"));
        assert!(
            err.to_string()
                .contains("is not supported for Puffin files")
        );
    }
}