iceberg/puffin/
writer.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::collections::{HashMap, HashSet};
19
20use bytes::Bytes;
21
22use crate::Result;
23use crate::io::{FileWrite, OutputFile};
24use crate::puffin::blob::Blob;
25use crate::puffin::compression::CompressionCodec;
26use crate::puffin::metadata::{BlobMetadata, FileMetadata, Flag};
27
28/// Puffin writer
29pub struct PuffinWriter {
30    writer: Box<dyn FileWrite>,
31    is_header_written: bool,
32    num_bytes_written: u64,
33    written_blobs_metadata: Vec<BlobMetadata>,
34    properties: HashMap<String, String>,
35    footer_compression_codec: CompressionCodec,
36    flags: HashSet<Flag>,
37}
38
39impl PuffinWriter {
40    /// Returns a new Puffin writer
41    pub async fn new(
42        output_file: &OutputFile,
43        properties: HashMap<String, String>,
44        compress_footer: bool,
45    ) -> Result<Self> {
46        let mut flags = HashSet::<Flag>::new();
47        let footer_compression_codec = if compress_footer {
48            flags.insert(Flag::FooterPayloadCompressed);
49            CompressionCodec::Lz4
50        } else {
51            CompressionCodec::None
52        };
53
54        Ok(Self {
55            writer: output_file.writer().await?,
56            is_header_written: false,
57            num_bytes_written: 0,
58            written_blobs_metadata: Vec::new(),
59            properties,
60            footer_compression_codec,
61            flags,
62        })
63    }
64
65    /// Adds blob to Puffin file
66    pub async fn add(&mut self, blob: Blob, compression_codec: CompressionCodec) -> Result<()> {
67        self.write_header_once().await?;
68
69        let offset = self.num_bytes_written;
70        let compressed_bytes: Bytes = compression_codec.compress(blob.data)?.into();
71        let length = compressed_bytes.len().try_into()?;
72        self.write(compressed_bytes).await?;
73        self.written_blobs_metadata.push(BlobMetadata {
74            r#type: blob.r#type,
75            fields: blob.fields,
76            snapshot_id: blob.snapshot_id,
77            sequence_number: blob.sequence_number,
78            offset,
79            length,
80            compression_codec,
81            properties: blob.properties,
82        });
83
84        Ok(())
85    }
86
87    /// Finalizes the Puffin file
88    pub async fn close(mut self) -> Result<()> {
89        self.write_header_once().await?;
90        self.write_footer().await?;
91        self.writer.close().await?;
92        Ok(())
93    }
94
95    async fn write(&mut self, bytes: Bytes) -> Result<()> {
96        let length = bytes.len();
97        self.writer.write(bytes).await?;
98        self.num_bytes_written += length as u64;
99        Ok(())
100    }
101
102    async fn write_header_once(&mut self) -> Result<()> {
103        if !self.is_header_written {
104            let bytes = Bytes::copy_from_slice(&FileMetadata::MAGIC);
105            self.write(bytes).await?;
106            self.is_header_written = true;
107        }
108        Ok(())
109    }
110
111    fn footer_payload_bytes(&self) -> Result<Vec<u8>> {
112        let file_metadata = FileMetadata {
113            blobs: self.written_blobs_metadata.clone(),
114            properties: self.properties.clone(),
115        };
116        let json = serde_json::to_string::<FileMetadata>(&file_metadata)?;
117        let bytes = json.as_bytes();
118        self.footer_compression_codec.compress(bytes.to_vec())
119    }
120
121    fn flags_bytes(&self) -> [u8; FileMetadata::FOOTER_STRUCT_FLAGS_LENGTH as usize] {
122        let mut result = [0; FileMetadata::FOOTER_STRUCT_FLAGS_LENGTH as usize];
123        for flag in &self.flags {
124            let byte_idx: usize = flag.byte_idx().into();
125            result[byte_idx] |= 0x1 << flag.bit_idx();
126        }
127        result
128    }
129
130    async fn write_footer(&mut self) -> Result<()> {
131        let mut footer_payload_bytes = self.footer_payload_bytes()?;
132        let footer_payload_bytes_length = u32::to_le_bytes(footer_payload_bytes.len().try_into()?);
133
134        let mut footer_bytes = Vec::new();
135        footer_bytes.extend(&FileMetadata::MAGIC);
136        footer_bytes.append(&mut footer_payload_bytes);
137        footer_bytes.extend(footer_payload_bytes_length);
138        footer_bytes.extend(self.flags_bytes());
139        footer_bytes.extend(&FileMetadata::MAGIC);
140
141        self.write(footer_bytes.into()).await
142    }
143}
144
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use tempfile::TempDir;

    use crate::Result;
    use crate::io::{FileIOBuilder, InputFile, OutputFile};
    use crate::puffin::blob::Blob;
    use crate::puffin::compression::CompressionCodec;
    use crate::puffin::metadata::FileMetadata;
    use crate::puffin::reader::PuffinReader;
    use crate::puffin::test_utils::{
        blob_0, blob_1, empty_footer_payload, empty_footer_payload_bytes, file_properties,
        java_empty_uncompressed_input_file, java_uncompressed_metric_input_file,
        java_zstd_compressed_metric_input_file, uncompressed_metric_file_metadata,
        zstd_compressed_metric_file_metadata,
    };
    use crate::puffin::writer::PuffinWriter;

    /// Writes the given blobs and file properties to `temp_puffin.bin` inside `temp_dir`,
    /// with footer compression disabled, and returns the written output file.
    async fn write_puffin_file(
        temp_dir: &TempDir,
        blobs: Vec<(Blob, CompressionCodec)>,
        properties: HashMap<String, String>,
    ) -> Result<OutputFile> {
        let fs_io = FileIOBuilder::new_fs_io().build()?;

        let puffin_path = temp_dir.path().join("temp_puffin.bin");
        let output_file = fs_io.new_output(puffin_path.to_str().unwrap())?;

        let mut puffin_writer = PuffinWriter::new(&output_file, properties, false).await?;
        for (blob, codec) in blobs {
            puffin_writer.add(blob, codec).await?;
        }
        puffin_writer.close().await?;

        Ok(output_file)
    }

    /// Reads back every blob listed in the file metadata of `input_file`, in order.
    async fn read_all_blobs_from_puffin_file(input_file: InputFile) -> Vec<Blob> {
        let reader = PuffinReader::new(input_file);
        let all_blobs_metadata = reader.file_metadata().await.unwrap().clone().blobs;

        let mut read_blobs = Vec::new();
        for metadata in &all_blobs_metadata {
            let blob = reader.blob(metadata).await.unwrap();
            read_blobs.push(blob);
        }
        read_blobs
    }

    #[tokio::test]
    async fn test_write_uncompressed_empty_file() {
        let temp_dir = TempDir::new().unwrap();

        let output_file = write_puffin_file(&temp_dir, Vec::new(), HashMap::new())
            .await
            .unwrap();
        let input_file = output_file.to_input_file();

        let actual_metadata = FileMetadata::read(&input_file).await.unwrap();
        assert_eq!(actual_metadata, empty_footer_payload());

        let actual_file_length = input_file.read().await.unwrap().len();
        // Header magic, then (with no blobs in between, since the file is empty) the
        // footer: magic, payload and footer struct.
        let header_length = FileMetadata::MAGIC_LENGTH as usize;
        let footer_length = FileMetadata::MAGIC_LENGTH as usize
            + empty_footer_payload_bytes().len()
            + FileMetadata::FOOTER_STRUCT_LENGTH as usize;
        assert_eq!(actual_file_length, header_length + footer_length);
    }

    /// Pairs every blob with the same compression codec.
    fn blobs_with_compression(
        blobs: Vec<Blob>,
        compression_codec: CompressionCodec,
    ) -> Vec<(Blob, CompressionCodec)> {
        let mut paired = Vec::with_capacity(blobs.len());
        for blob in blobs {
            paired.push((blob, compression_codec));
        }
        paired
    }

    #[tokio::test]
    async fn test_write_uncompressed_metric_data() {
        let temp_dir = TempDir::new().unwrap();
        let expected_blobs = vec![blob_0(), blob_1()];
        let blobs_and_codecs =
            blobs_with_compression(expected_blobs.clone(), CompressionCodec::None);

        let output_file = write_puffin_file(&temp_dir, blobs_and_codecs, file_properties())
            .await
            .unwrap();
        let input_file = output_file.to_input_file();

        let actual_metadata = FileMetadata::read(&input_file).await.unwrap();
        assert_eq!(actual_metadata, uncompressed_metric_file_metadata());

        let actual_blobs = read_all_blobs_from_puffin_file(input_file).await;
        assert_eq!(actual_blobs, expected_blobs);
    }

    #[tokio::test]
    async fn test_write_zstd_compressed_metric_data() {
        let temp_dir = TempDir::new().unwrap();
        let expected_blobs = vec![blob_0(), blob_1()];
        let blobs_and_codecs =
            blobs_with_compression(expected_blobs.clone(), CompressionCodec::Zstd);

        let output_file = write_puffin_file(&temp_dir, blobs_and_codecs, file_properties())
            .await
            .unwrap();
        let input_file = output_file.to_input_file();

        let actual_metadata = FileMetadata::read(&input_file).await.unwrap();
        assert_eq!(actual_metadata, zstd_compressed_metric_file_metadata());

        let actual_blobs = read_all_blobs_from_puffin_file(input_file).await;
        assert_eq!(actual_blobs, expected_blobs);
    }

    #[tokio::test]
    async fn test_write_lz4_compressed_metric_data() {
        let temp_dir = TempDir::new().unwrap();
        let blobs_and_codecs =
            blobs_with_compression(vec![blob_0(), blob_1()], CompressionCodec::Lz4);

        let write_error = write_puffin_file(&temp_dir, blobs_and_codecs, file_properties())
            .await
            .unwrap_err();

        assert_eq!(
            write_error.to_string(),
            "FeatureUnsupported => LZ4 compression is not supported currently"
        );
    }

    /// Reads the whole of `input_file` into a byte vector.
    async fn get_file_as_byte_vec(input_file: InputFile) -> Vec<u8> {
        let contents = input_file.read().await.unwrap();
        contents.to_vec()
    }

    /// Asserts that the written file and the expected file contain exactly the same bytes.
    async fn assert_files_are_bit_identical(actual: OutputFile, expected: InputFile) {
        let written = get_file_as_byte_vec(actual.to_input_file()).await;
        let reference = get_file_as_byte_vec(expected).await;
        assert_eq!(written, reference);
    }

    #[tokio::test]
    async fn test_uncompressed_empty_puffin_file_is_bit_identical_to_java_generated_file() {
        let temp_dir = TempDir::new().unwrap();

        let written_file = write_puffin_file(&temp_dir, Vec::new(), HashMap::new())
            .await
            .unwrap();

        assert_files_are_bit_identical(written_file, java_empty_uncompressed_input_file()).await;
    }

    #[tokio::test]
    async fn test_uncompressed_metric_data_is_bit_identical_to_java_generated_file() {
        let temp_dir = TempDir::new().unwrap();
        let blobs_and_codecs =
            blobs_with_compression(vec![blob_0(), blob_1()], CompressionCodec::None);

        let written_file = write_puffin_file(&temp_dir, blobs_and_codecs, file_properties())
            .await
            .unwrap();

        assert_files_are_bit_identical(written_file, java_uncompressed_metric_input_file()).await;
    }

    #[tokio::test]
    async fn test_zstd_compressed_metric_data_is_bit_identical_to_java_generated_file() {
        let temp_dir = TempDir::new().unwrap();
        let blobs_and_codecs =
            blobs_with_compression(vec![blob_0(), blob_1()], CompressionCodec::Zstd);

        let written_file = write_puffin_file(&temp_dir, blobs_and_codecs, file_properties())
            .await
            .unwrap();

        assert_files_are_bit_identical(written_file, java_zstd_compressed_metric_input_file())
            .await;
    }
}