iceberg/puffin/
metadata.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::collections::{HashMap, HashSet};
19
20use bytes::Bytes;
21use serde::{Deserialize, Serialize};
22
23use crate::compression::CompressionCodec;
24use crate::io::{FileRead, InputFile};
25use crate::{Error, ErrorKind, Result};
26
/// Property key naming the application that wrote the Puffin file, together with its version,
/// as free-form human-readable text — for example `"Trino version 381"`.
pub const CREATED_BY_PROPERTY: &str = "created-by";
30
31/// Metadata about a blob.
32/// For more information, see: https://iceberg.apache.org/puffin-spec/#blobmetadata
33#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Clone)]
34#[serde(rename_all = "kebab-case")]
35pub struct BlobMetadata {
36    pub(crate) r#type: String,
37    pub(crate) fields: Vec<i32>,
38    pub(crate) snapshot_id: i64,
39    pub(crate) sequence_number: i64,
40    pub(crate) offset: u64,
41    pub(crate) length: u64,
42    #[serde(skip_serializing_if = "CompressionCodec::is_none")]
43    #[serde(default)]
44    pub(crate) compression_codec: CompressionCodec,
45    #[serde(skip_serializing_if = "HashMap::is_empty")]
46    #[serde(default)]
47    pub(crate) properties: HashMap<String, String>,
48}
49
50impl BlobMetadata {
51    #[inline]
52    /// See blob types: https://iceberg.apache.org/puffin-spec/#blob-types
53    pub fn blob_type(&self) -> &str {
54        &self.r#type
55    }
56
57    #[inline]
58    /// List of field IDs the blob was computed for; the order of items is used to compute sketches stored in the blob.
59    pub fn fields(&self) -> &[i32] {
60        &self.fields
61    }
62
63    #[inline]
64    /// ID of the Iceberg table's snapshot the blob was computed from
65    pub fn snapshot_id(&self) -> i64 {
66        self.snapshot_id
67    }
68
69    #[inline]
70    /// Sequence number of the Iceberg table's snapshot the blob was computed from
71    pub fn sequence_number(&self) -> i64 {
72        self.sequence_number
73    }
74
75    #[inline]
76    /// The offset in the file where the blob contents start
77    pub fn offset(&self) -> u64 {
78        self.offset
79    }
80
81    #[inline]
82    /// The length of the blob stored in the file (after compression, if compressed)
83    pub fn length(&self) -> u64 {
84        self.length
85    }
86
87    #[inline]
88    /// The compression codec used to compress the data
89    pub fn compression_codec(&self) -> CompressionCodec {
90        self.compression_codec
91    }
92
93    #[inline]
94    /// Arbitrary meta-information about the blob
95    pub fn properties(&self) -> &HashMap<String, String> {
96        &self.properties
97    }
98}
99
/// Bit flags carried in the Puffin footer's `Flags` field.
///
/// Each discriminant is the flag's absolute bit position: `value / 8` selects the byte and
/// `value % 8` the bit within it.
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
pub(crate) enum Flag {
    /// Byte 0, bit 0: the footer payload is compressed.
    FooterPayloadCompressed = 0,
}
104
105impl Flag {
106    pub(crate) fn byte_idx(self) -> u8 {
107        (self as u8) / 8
108    }
109
110    pub(crate) fn bit_idx(self) -> u8 {
111        (self as u8) % 8
112    }
113
114    fn matches(self, byte_idx: u8, bit_idx: u8) -> bool {
115        self.byte_idx() == byte_idx && self.bit_idx() == bit_idx
116    }
117
118    fn from(byte_idx: u8, bit_idx: u8) -> Result<Flag> {
119        if Flag::FooterPayloadCompressed.matches(byte_idx, bit_idx) {
120            Ok(Flag::FooterPayloadCompressed)
121        } else {
122            Err(Error::new(
123                ErrorKind::DataInvalid,
124                format!("Unknown flag byte {byte_idx} and bit {bit_idx} combination"),
125            ))
126        }
127    }
128}
129
130/// Metadata about a puffin file.
131///
132/// For more information, see: https://iceberg.apache.org/puffin-spec/#filemetadata
133#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Clone)]
134pub struct FileMetadata {
135    pub(crate) blobs: Vec<BlobMetadata>,
136    #[serde(skip_serializing_if = "HashMap::is_empty")]
137    #[serde(default)]
138    pub(crate) properties: HashMap<String, String>,
139}
140
141impl FileMetadata {
142    pub(crate) const MAGIC_LENGTH: u8 = 4;
143    pub(crate) const MAGIC: [u8; FileMetadata::MAGIC_LENGTH as usize] = [0x50, 0x46, 0x41, 0x31];
144
145    /// We use the term FOOTER_STRUCT to refer to the fixed-length portion of the Footer.
146    /// The structure of the Footer specification is illustrated below:
147    ///
148    /// ```text                                             
149    ///        Footer
150    ///        ┌────────────────────┐                 
151    ///        │  Magic (4 bytes)   │                 
152    ///        │                    │                 
153    ///        ├────────────────────┤                 
154    ///        │   FooterPayload    │                 
155    ///        │  (PAYLOAD_LENGTH)  │                 
156    ///        ├────────────────────┤ ◀─┐             
157    ///        │ FooterPayloadSize  │   │             
158    ///        │     (4 bytes)      │   │             
159    ///        ├────────────────────┤                 
160    ///        │  Flags (4 bytes)   │  FOOTER_STRUCT  
161    ///        │                    │                 
162    ///        ├────────────────────┤   │             
163    ///        │  Magic (4 bytes)   │   │             
164    ///        │                    │   │             
165    ///        └────────────────────┘ ◀─┘  
166    /// ```                      
167    const FOOTER_STRUCT_PAYLOAD_LENGTH_OFFSET: u8 = 0;
168    const FOOTER_STRUCT_PAYLOAD_LENGTH_LENGTH: u8 = 4;
169    const FOOTER_STRUCT_FLAGS_OFFSET: u8 = FileMetadata::FOOTER_STRUCT_PAYLOAD_LENGTH_OFFSET
170        + FileMetadata::FOOTER_STRUCT_PAYLOAD_LENGTH_LENGTH;
171    pub(crate) const FOOTER_STRUCT_FLAGS_LENGTH: u8 = 4;
172    const FOOTER_STRUCT_MAGIC_OFFSET: u8 =
173        FileMetadata::FOOTER_STRUCT_FLAGS_OFFSET + FileMetadata::FOOTER_STRUCT_FLAGS_LENGTH;
174    pub(crate) const FOOTER_STRUCT_LENGTH: u8 =
175        FileMetadata::FOOTER_STRUCT_MAGIC_OFFSET + FileMetadata::MAGIC_LENGTH;
176
177    /// Constructs new puffin `FileMetadata`
178    pub fn new(blobs: Vec<BlobMetadata>, properties: HashMap<String, String>) -> Self {
179        Self { blobs, properties }
180    }
181
182    fn check_magic(bytes: &[u8]) -> Result<()> {
183        if bytes == FileMetadata::MAGIC {
184            Ok(())
185        } else {
186            Err(Error::new(
187                ErrorKind::DataInvalid,
188                format!(
189                    "Bad magic value: {:?} should be {:?}",
190                    bytes,
191                    FileMetadata::MAGIC
192                ),
193            ))
194        }
195    }
196
197    async fn read_footer_payload_length(
198        file_read: &dyn FileRead,
199        input_file_length: u64,
200    ) -> Result<u32> {
201        let start = input_file_length - FileMetadata::FOOTER_STRUCT_LENGTH as u64;
202        let end = start + FileMetadata::FOOTER_STRUCT_PAYLOAD_LENGTH_LENGTH as u64;
203        let footer_payload_length_bytes = file_read.read(start..end).await?;
204        let mut buf = [0; 4];
205        buf.copy_from_slice(&footer_payload_length_bytes);
206        let footer_payload_length = u32::from_le_bytes(buf);
207        Ok(footer_payload_length)
208    }
209
210    async fn read_footer_bytes(
211        file_read: &dyn FileRead,
212        input_file_length: u64,
213        footer_payload_length: u32,
214    ) -> Result<Bytes> {
215        let footer_length = footer_payload_length as u64
216            + FileMetadata::FOOTER_STRUCT_LENGTH as u64
217            + FileMetadata::MAGIC_LENGTH as u64;
218        let start = input_file_length - footer_length;
219        let end = input_file_length;
220        file_read.read(start..end).await
221    }
222
223    fn decode_flags(footer_bytes: &[u8]) -> Result<HashSet<Flag>> {
224        let mut flags = HashSet::new();
225
226        for byte_idx in 0..FileMetadata::FOOTER_STRUCT_FLAGS_LENGTH {
227            let byte_offset = footer_bytes.len()
228                - FileMetadata::MAGIC_LENGTH as usize
229                - FileMetadata::FOOTER_STRUCT_FLAGS_LENGTH as usize
230                + byte_idx as usize;
231
232            let flag_byte = *footer_bytes.get(byte_offset).ok_or_else(|| {
233                Error::new(ErrorKind::DataInvalid, "Index range is out of bounds.")
234            })?;
235
236            for bit_idx in 0..8 {
237                if ((flag_byte >> bit_idx) & 1) != 0 {
238                    let flag = Flag::from(byte_idx, bit_idx)?;
239                    flags.insert(flag);
240                }
241            }
242        }
243
244        Ok(flags)
245    }
246
247    fn extract_footer_payload_as_str(
248        footer_bytes: &[u8],
249        footer_payload_length: u32,
250    ) -> Result<String> {
251        let flags = FileMetadata::decode_flags(footer_bytes)?;
252        let footer_compression_codec = if flags.contains(&Flag::FooterPayloadCompressed) {
253            CompressionCodec::Lz4
254        } else {
255            CompressionCodec::None
256        };
257
258        let start_offset = FileMetadata::MAGIC_LENGTH as usize;
259        let end_offset =
260            FileMetadata::MAGIC_LENGTH as usize + usize::try_from(footer_payload_length)?;
261        let footer_payload_bytes = footer_bytes
262            .get(start_offset..end_offset)
263            .ok_or_else(|| Error::new(ErrorKind::DataInvalid, "Index range is out of bounds."))?;
264        let decompressed_footer_payload_bytes =
265            footer_compression_codec.decompress(footer_payload_bytes.into())?;
266
267        String::from_utf8(decompressed_footer_payload_bytes).map_err(|src| {
268            Error::new(ErrorKind::DataInvalid, "Footer is not a valid UTF-8 string")
269                .with_source(src)
270        })
271    }
272
273    fn from_json_str(string: &str) -> Result<FileMetadata> {
274        serde_json::from_str::<FileMetadata>(string).map_err(|src| {
275            Error::new(ErrorKind::DataInvalid, "Given string is not valid JSON").with_source(src)
276        })
277    }
278
279    /// Returns the file metadata about a Puffin file
280    pub(crate) async fn read(input_file: &InputFile) -> Result<FileMetadata> {
281        let file_read = input_file.reader().await?;
282
283        let first_four_bytes = file_read.read(0..FileMetadata::MAGIC_LENGTH.into()).await?;
284        FileMetadata::check_magic(&first_four_bytes)?;
285
286        let input_file_length = input_file.metadata().await?.size;
287        let footer_payload_length =
288            FileMetadata::read_footer_payload_length(file_read.as_ref(), input_file_length).await?;
289        let footer_bytes = FileMetadata::read_footer_bytes(
290            file_read.as_ref(),
291            input_file_length,
292            footer_payload_length,
293        )
294        .await?;
295
296        let magic_length = FileMetadata::MAGIC_LENGTH as usize;
297        // check first four bytes of footer
298        FileMetadata::check_magic(&footer_bytes[..magic_length])?;
299        // check last four bytes of footer
300        FileMetadata::check_magic(&footer_bytes[footer_bytes.len() - magic_length..])?;
301
302        let footer_payload_str =
303            FileMetadata::extract_footer_payload_as_str(&footer_bytes, footer_payload_length)?;
304
305        FileMetadata::from_json_str(&footer_payload_str)
306    }
307
308    /// Reads file_metadata in puffin file with a prefetch hint
309    ///
310    /// `prefetch_hint` is used to try to fetch the entire footer in one read. If
311    /// the entire footer isn't fetched in one read the function will call the regular
312    /// read option.
313    #[allow(dead_code)]
314    pub(crate) async fn read_with_prefetch(
315        input_file: &InputFile,
316        prefetch_hint: u8,
317    ) -> Result<FileMetadata> {
318        if prefetch_hint > 16 {
319            let input_file_length = input_file.metadata().await?.size;
320            let file_read = input_file.reader().await?;
321
322            // Hint cannot be larger than input file
323            if prefetch_hint as u64 > input_file_length {
324                return FileMetadata::read(input_file).await;
325            }
326
327            // Validate file header magic
328            let first_four_bytes = file_read.read(0..FileMetadata::MAGIC_LENGTH.into()).await?;
329            FileMetadata::check_magic(&first_four_bytes)?;
330
331            // Read footer based on prefetch hint
332            let start = input_file_length - prefetch_hint as u64;
333            let end = input_file_length;
334            let footer_bytes = file_read.read(start..end).await?;
335
336            let payload_length_start =
337                footer_bytes.len() - (FileMetadata::FOOTER_STRUCT_LENGTH as usize);
338            let payload_length_end =
339                payload_length_start + (FileMetadata::FOOTER_STRUCT_PAYLOAD_LENGTH_LENGTH as usize);
340            let payload_length_bytes = &footer_bytes[payload_length_start..payload_length_end];
341
342            let mut buf = [0; 4];
343            buf.copy_from_slice(payload_length_bytes);
344            let footer_payload_length = u32::from_le_bytes(buf);
345
346            // If the (footer payload length + FOOTER_STRUCT_LENGTH + MAGIC_LENGTH) is greater
347            // than the fetched footer then you can have it read regularly from a read with no
348            // prefetch while passing in the footer_payload_length.
349            let footer_length = (footer_payload_length as usize)
350                + FileMetadata::FOOTER_STRUCT_LENGTH as usize
351                + FileMetadata::MAGIC_LENGTH as usize;
352            if footer_length > prefetch_hint as usize {
353                return FileMetadata::read(input_file).await;
354            }
355
356            // Read footer bytes
357            let footer_start = footer_bytes.len() - footer_length;
358            let footer_end = footer_bytes.len();
359            let footer_bytes = &footer_bytes[footer_start..footer_end];
360
361            let magic_length = FileMetadata::MAGIC_LENGTH as usize;
362            // check first four bytes of footer
363            FileMetadata::check_magic(&footer_bytes[..magic_length])?;
364            // check last four bytes of footer
365            FileMetadata::check_magic(&footer_bytes[footer_bytes.len() - magic_length..])?;
366
367            let footer_payload_str =
368                FileMetadata::extract_footer_payload_as_str(footer_bytes, footer_payload_length)?;
369            return FileMetadata::from_json_str(&footer_payload_str);
370        }
371
372        FileMetadata::read(input_file).await
373    }
374
375    #[inline]
376    /// Metadata about blobs in file
377    pub fn blobs(&self) -> &[BlobMetadata] {
378        &self.blobs
379    }
380
381    #[inline]
382    /// Arbitrary meta-information, like writer identification/version.
383    pub fn properties(&self) -> &HashMap<String, String> {
384        &self.properties
385    }
386}
387
388#[cfg(test)]
389mod tests {
390    use std::collections::HashMap;
391
392    use bytes::Bytes;
393    use tempfile::TempDir;
394
395    use crate::ErrorKind;
396    use crate::io::{FileIO, InputFile};
397    use crate::puffin::metadata::{BlobMetadata, CompressionCodec, FileMetadata};
398    use crate::puffin::test_utils::{
399        empty_footer_payload, empty_footer_payload_bytes, empty_footer_payload_bytes_length_bytes,
400        java_empty_uncompressed_input_file, java_uncompressed_metric_input_file,
401        java_zstd_compressed_metric_input_file, uncompressed_metric_file_metadata,
402        zstd_compressed_metric_file_metadata,
403    };
404
405    const INVALID_MAGIC_VALUE: [u8; 4] = [80, 70, 65, 0];
406
407    async fn input_file_with_bytes(temp_dir: &TempDir, slice: &[u8]) -> InputFile {
408        let file_io = FileIO::new_with_fs();
409
410        let path_buf = temp_dir.path().join("abc.puffin");
411        let temp_path = path_buf.to_str().unwrap();
412        let output_file = file_io.new_output(temp_path).unwrap();
413
414        output_file
415            .write(Bytes::copy_from_slice(slice))
416            .await
417            .unwrap();
418
419        output_file.to_input_file()
420    }
421
422    async fn input_file_with_payload(temp_dir: &TempDir, payload_str: &str) -> InputFile {
423        let payload_bytes = payload_str.as_bytes();
424
425        let mut bytes = vec![];
426        bytes.extend(FileMetadata::MAGIC.to_vec());
427        bytes.extend(FileMetadata::MAGIC.to_vec());
428        bytes.extend(payload_bytes);
429        bytes.extend(u32::to_le_bytes(payload_bytes.len() as u32));
430        bytes.extend(vec![0, 0, 0, 0]);
431        bytes.extend(FileMetadata::MAGIC);
432
433        input_file_with_bytes(temp_dir, &bytes).await
434    }
435
436    #[tokio::test]
437    async fn test_file_starting_with_invalid_magic_returns_error() {
438        let temp_dir = TempDir::new().unwrap();
439
440        let mut bytes = vec![];
441        bytes.extend(INVALID_MAGIC_VALUE.to_vec());
442        bytes.extend(FileMetadata::MAGIC.to_vec());
443        bytes.extend(empty_footer_payload_bytes());
444        bytes.extend(empty_footer_payload_bytes_length_bytes());
445        bytes.extend(vec![0, 0, 0, 0]);
446        bytes.extend(FileMetadata::MAGIC);
447
448        let input_file = input_file_with_bytes(&temp_dir, &bytes).await;
449
450        assert_eq!(
451            FileMetadata::read(&input_file)
452                .await
453                .unwrap_err()
454                .to_string(),
455            "DataInvalid => Bad magic value: [80, 70, 65, 0] should be [80, 70, 65, 49]",
456        )
457    }
458
459    #[tokio::test]
460    async fn test_file_with_invalid_magic_at_start_of_footer_returns_error() {
461        let temp_dir = TempDir::new().unwrap();
462
463        let mut bytes = vec![];
464        bytes.extend(FileMetadata::MAGIC.to_vec());
465        bytes.extend(INVALID_MAGIC_VALUE.to_vec());
466        bytes.extend(empty_footer_payload_bytes());
467        bytes.extend(empty_footer_payload_bytes_length_bytes());
468        bytes.extend(vec![0, 0, 0, 0]);
469        bytes.extend(FileMetadata::MAGIC);
470
471        let input_file = input_file_with_bytes(&temp_dir, &bytes).await;
472
473        assert_eq!(
474            FileMetadata::read(&input_file)
475                .await
476                .unwrap_err()
477                .to_string(),
478            "DataInvalid => Bad magic value: [80, 70, 65, 0] should be [80, 70, 65, 49]",
479        )
480    }
481
482    #[tokio::test]
483    async fn test_file_ending_with_invalid_magic_returns_error() {
484        let temp_dir = TempDir::new().unwrap();
485
486        let mut bytes = vec![];
487        bytes.extend(FileMetadata::MAGIC.to_vec());
488        bytes.extend(FileMetadata::MAGIC.to_vec());
489        bytes.extend(empty_footer_payload_bytes());
490        bytes.extend(empty_footer_payload_bytes_length_bytes());
491        bytes.extend(vec![0, 0, 0, 0]);
492        bytes.extend(INVALID_MAGIC_VALUE);
493
494        let input_file = input_file_with_bytes(&temp_dir, &bytes).await;
495
496        assert_eq!(
497            FileMetadata::read(&input_file)
498                .await
499                .unwrap_err()
500                .to_string(),
501            "DataInvalid => Bad magic value: [80, 70, 65, 0] should be [80, 70, 65, 49]",
502        )
503    }
504
505    #[tokio::test]
506    async fn test_encoded_payload_length_larger_than_actual_payload_length_returns_error() {
507        let temp_dir = TempDir::new().unwrap();
508
509        let mut bytes = vec![];
510        bytes.extend(FileMetadata::MAGIC.to_vec());
511        bytes.extend(FileMetadata::MAGIC.to_vec());
512        bytes.extend(empty_footer_payload_bytes());
513        bytes.extend(u32::to_le_bytes(
514            empty_footer_payload_bytes().len() as u32 + 1,
515        ));
516        bytes.extend(vec![0, 0, 0, 0]);
517        bytes.extend(FileMetadata::MAGIC.to_vec());
518
519        let input_file = input_file_with_bytes(&temp_dir, &bytes).await;
520
521        assert_eq!(
522            FileMetadata::read(&input_file)
523                .await
524                .unwrap_err()
525                .to_string(),
526            "DataInvalid => Bad magic value: [49, 80, 70, 65] should be [80, 70, 65, 49]",
527        )
528    }
529
530    #[tokio::test]
531    async fn test_encoded_payload_length_smaller_than_actual_payload_length_returns_error() {
532        let temp_dir = TempDir::new().unwrap();
533
534        let mut bytes = vec![];
535        bytes.extend(FileMetadata::MAGIC.to_vec());
536        bytes.extend(FileMetadata::MAGIC.to_vec());
537        bytes.extend(empty_footer_payload_bytes());
538        bytes.extend(u32::to_le_bytes(
539            empty_footer_payload_bytes().len() as u32 - 1,
540        ));
541        bytes.extend(vec![0, 0, 0, 0]);
542        bytes.extend(FileMetadata::MAGIC.to_vec());
543
544        let input_file = input_file_with_bytes(&temp_dir, &bytes).await;
545
546        assert_eq!(
547            FileMetadata::read(&input_file)
548                .await
549                .unwrap_err()
550                .to_string(),
551            "DataInvalid => Bad magic value: [70, 65, 49, 123] should be [80, 70, 65, 49]",
552        )
553    }
554
555    #[tokio::test]
556    async fn test_lz4_compressed_footer_returns_error() {
557        let temp_dir = TempDir::new().unwrap();
558
559        let mut bytes = vec![];
560        bytes.extend(FileMetadata::MAGIC.to_vec());
561        bytes.extend(FileMetadata::MAGIC.to_vec());
562        bytes.extend(empty_footer_payload_bytes());
563        bytes.extend(empty_footer_payload_bytes_length_bytes());
564        bytes.extend(vec![0b00000001, 0, 0, 0]);
565        bytes.extend(FileMetadata::MAGIC.to_vec());
566
567        let input_file = input_file_with_bytes(&temp_dir, &bytes).await;
568
569        assert_eq!(
570            FileMetadata::read(&input_file)
571                .await
572                .unwrap_err()
573                .to_string(),
574            "FeatureUnsupported => LZ4 decompression is not supported currently",
575        )
576    }
577
578    #[tokio::test]
579    async fn test_unknown_byte_bit_combination_returns_error() {
580        let temp_dir = TempDir::new().unwrap();
581
582        let mut bytes = vec![];
583        bytes.extend(FileMetadata::MAGIC.to_vec());
584        bytes.extend(FileMetadata::MAGIC.to_vec());
585        bytes.extend(empty_footer_payload_bytes());
586        bytes.extend(empty_footer_payload_bytes_length_bytes());
587        bytes.extend(vec![0b00000010, 0, 0, 0]);
588        bytes.extend(FileMetadata::MAGIC.to_vec());
589
590        let input_file = input_file_with_bytes(&temp_dir, &bytes).await;
591
592        assert_eq!(
593            FileMetadata::read(&input_file)
594                .await
595                .unwrap_err()
596                .to_string(),
597            "DataInvalid => Unknown flag byte 0 and bit 1 combination",
598        )
599    }
600
601    #[tokio::test]
602    async fn test_non_utf8_string_payload_returns_error() {
603        let temp_dir = TempDir::new().unwrap();
604
605        let payload_bytes: [u8; 4] = [0, 159, 146, 150];
606        let payload_bytes_length_bytes: [u8; 4] = u32::to_le_bytes(payload_bytes.len() as u32);
607
608        let mut bytes = vec![];
609        bytes.extend(FileMetadata::MAGIC.to_vec());
610        bytes.extend(FileMetadata::MAGIC.to_vec());
611        bytes.extend(payload_bytes);
612        bytes.extend(payload_bytes_length_bytes);
613        bytes.extend(vec![0, 0, 0, 0]);
614        bytes.extend(FileMetadata::MAGIC.to_vec());
615
616        let input_file = input_file_with_bytes(&temp_dir, &bytes).await;
617
618        assert_eq!(
619            FileMetadata::read(&input_file)
620                .await
621                .unwrap_err()
622                .to_string(),
623            "DataInvalid => Footer is not a valid UTF-8 string, source: invalid utf-8 sequence of 1 bytes from index 1",
624        )
625    }
626
627    #[tokio::test]
628    async fn test_minimal_valid_file_returns_file_metadata() {
629        let temp_dir = TempDir::new().unwrap();
630
631        let mut bytes = vec![];
632        bytes.extend(FileMetadata::MAGIC.to_vec());
633        bytes.extend(FileMetadata::MAGIC.to_vec());
634        bytes.extend(empty_footer_payload_bytes());
635        bytes.extend(empty_footer_payload_bytes_length_bytes());
636        bytes.extend(vec![0, 0, 0, 0]);
637        bytes.extend(FileMetadata::MAGIC);
638
639        let input_file = input_file_with_bytes(&temp_dir, &bytes).await;
640
641        assert_eq!(
642            FileMetadata::read(&input_file).await.unwrap(),
643            FileMetadata {
644                blobs: vec![],
645                properties: HashMap::new(),
646            }
647        )
648    }
649
650    #[tokio::test]
651    async fn test_returns_file_metadata_property() {
652        let temp_dir = TempDir::new().unwrap();
653
654        let input_file = input_file_with_payload(
655            &temp_dir,
656            r#"{
657                "blobs" : [ ],
658                "properties" : {
659                    "a property" : "a property value"
660                }
661            }"#,
662        )
663        .await;
664
665        assert_eq!(
666            FileMetadata::read(&input_file).await.unwrap(),
667            FileMetadata {
668                blobs: vec![],
669                properties: {
670                    let mut map = HashMap::new();
671                    map.insert("a property".to_string(), "a property value".to_string());
672                    map
673                },
674            }
675        )
676    }
677
678    #[tokio::test]
679    async fn test_returns_file_metadata_properties() {
680        let temp_dir = TempDir::new().unwrap();
681
682        let input_file = input_file_with_payload(
683            &temp_dir,
684            r#"{
685                "blobs" : [ ],
686                "properties" : {
687                    "a property" : "a property value",
688                    "another one": "also with value"
689                }
690            }"#,
691        )
692        .await;
693
694        assert_eq!(
695            FileMetadata::read(&input_file).await.unwrap(),
696            FileMetadata {
697                blobs: vec![],
698                properties: {
699                    let mut map = HashMap::new();
700                    map.insert("a property".to_string(), "a property value".to_string());
701                    map.insert("another one".to_string(), "also with value".to_string());
702                    map
703                },
704            }
705        )
706    }
707
708    #[tokio::test]
709    async fn test_returns_error_if_blobs_field_is_missing() {
710        let temp_dir = TempDir::new().unwrap();
711
712        let input_file = input_file_with_payload(
713            &temp_dir,
714            r#"{
715                "properties" : {}
716            }"#,
717        )
718        .await;
719
720        assert_eq!(
721            FileMetadata::read(&input_file)
722                .await
723                .unwrap_err()
724                .to_string(),
725            format!(
726                "DataInvalid => Given string is not valid JSON, source: missing field `blobs` at line 3 column 13"
727            ),
728        )
729    }
730
731    #[tokio::test]
732    async fn test_returns_error_if_blobs_field_is_bad() {
733        let temp_dir = TempDir::new().unwrap();
734
735        let input_file = input_file_with_payload(
736            &temp_dir,
737            r#"{
738                "blobs" : {}
739            }"#,
740        )
741        .await;
742
743        assert_eq!(
744            FileMetadata::read(&input_file)
745                .await
746                .unwrap_err()
747                .to_string(),
748            format!(
749                "DataInvalid => Given string is not valid JSON, source: invalid type: map, expected a sequence at line 2 column 26"
750            ),
751        )
752    }
753
754    #[tokio::test]
755    async fn test_returns_blobs_metadatas() {
756        let temp_dir = TempDir::new().unwrap();
757
758        let input_file = input_file_with_payload(
759            &temp_dir,
760            r#"{
761                "blobs" : [
762                    {
763                        "type" : "type-a",
764                        "fields" : [ 1 ],
765                        "snapshot-id" : 14,
766                        "sequence-number" : 3,
767                        "offset" : 4,
768                        "length" : 16
769                    },
770                    {
771                        "type" : "type-bbb",
772                        "fields" : [ 2, 3, 4 ],
773                        "snapshot-id" : 77,
774                        "sequence-number" : 4,
775                        "offset" : 21474836470000,
776                        "length" : 79834
777                    }
778                ]
779            }"#,
780        )
781        .await;
782
783        assert_eq!(
784            FileMetadata::read(&input_file).await.unwrap(),
785            FileMetadata {
786                blobs: vec![
787                    BlobMetadata {
788                        r#type: "type-a".to_string(),
789                        fields: vec![1],
790                        snapshot_id: 14,
791                        sequence_number: 3,
792                        offset: 4,
793                        length: 16,
794                        compression_codec: CompressionCodec::None,
795                        properties: HashMap::new(),
796                    },
797                    BlobMetadata {
798                        r#type: "type-bbb".to_string(),
799                        fields: vec![2, 3, 4],
800                        snapshot_id: 77,
801                        sequence_number: 4,
802                        offset: 21474836470000,
803                        length: 79834,
804                        compression_codec: CompressionCodec::None,
805                        properties: HashMap::new(),
806                    },
807                ],
808                properties: HashMap::new(),
809            }
810        )
811    }
812
813    #[tokio::test]
814    async fn test_returns_properties_in_blob_metadata() {
815        let temp_dir = TempDir::new().unwrap();
816
817        let input_file = input_file_with_payload(
818            &temp_dir,
819            r#"{
820                "blobs" : [
821                    {
822                        "type" : "type-a",
823                        "fields" : [ 1 ],
824                        "snapshot-id" : 14,
825                        "sequence-number" : 3,
826                        "offset" : 4,
827                        "length" : 16,
828                        "properties" : {
829                            "some key" : "some value"
830                        }
831                    }
832                ]
833            }"#,
834        )
835        .await;
836
837        assert_eq!(
838            FileMetadata::read(&input_file).await.unwrap(),
839            FileMetadata {
840                blobs: vec![BlobMetadata {
841                    r#type: "type-a".to_string(),
842                    fields: vec![1],
843                    snapshot_id: 14,
844                    sequence_number: 3,
845                    offset: 4,
846                    length: 16,
847                    compression_codec: CompressionCodec::None,
848                    properties: {
849                        let mut map = HashMap::new();
850                        map.insert("some key".to_string(), "some value".to_string());
851                        map
852                    },
853                }],
854                properties: HashMap::new(),
855            }
856        )
857    }
858
859    #[tokio::test]
860    async fn test_returns_error_if_blobs_fields_value_is_outside_i32_range() {
861        let temp_dir = TempDir::new().unwrap();
862
863        let out_of_i32_range_number: i64 = i32::MAX as i64 + 1;
864
865        let input_file = input_file_with_payload(
866            &temp_dir,
867            &format!(
868                r#"{{
869                    "blobs" : [
870                        {{
871                            "type" : "type-a",
872                            "fields" : [ {out_of_i32_range_number} ],
873                            "snapshot-id" : 14,
874                            "sequence-number" : 3,
875                            "offset" : 4,
876                            "length" : 16
877                        }}
878                    ]
879                }}"#
880            ),
881        )
882        .await;
883
884        assert_eq!(
885            FileMetadata::read(&input_file)
886                .await
887                .unwrap_err()
888                .to_string(),
889            format!(
890                "DataInvalid => Given string is not valid JSON, source: invalid value: integer `{out_of_i32_range_number}`, expected i32 at line 5 column 51"
891            ),
892        )
893    }
894
895    #[tokio::test]
896    async fn test_returns_errors_if_footer_payload_is_not_encoded_in_json_format() {
897        let temp_dir = TempDir::new().unwrap();
898
899        let input_file = input_file_with_payload(&temp_dir, r#""blobs" = []"#).await;
900
901        assert_eq!(
902            FileMetadata::read(&input_file)
903                .await
904                .unwrap_err()
905                .to_string(),
906            "DataInvalid => Given string is not valid JSON, source: invalid type: string \"blobs\", expected struct FileMetadata at line 1 column 7",
907        )
908    }
909
910    #[tokio::test]
911    async fn test_read_file_metadata_of_uncompressed_empty_file() {
912        let input_file = java_empty_uncompressed_input_file();
913
914        let file_metadata = FileMetadata::read(&input_file).await.unwrap();
915        assert_eq!(file_metadata, empty_footer_payload())
916    }
917
918    #[tokio::test]
919    async fn test_read_file_metadata_of_uncompressed_metric_data() {
920        let input_file = java_uncompressed_metric_input_file();
921
922        let file_metadata = FileMetadata::read(&input_file).await.unwrap();
923        assert_eq!(file_metadata, uncompressed_metric_file_metadata())
924    }
925
926    #[tokio::test]
927    async fn test_read_file_metadata_of_zstd_compressed_metric_data() {
928        let input_file = java_zstd_compressed_metric_input_file();
929
930        let file_metadata = FileMetadata::read_with_prefetch(&input_file, 64)
931            .await
932            .unwrap();
933        assert_eq!(file_metadata, zstd_compressed_metric_file_metadata())
934    }
935
936    #[tokio::test]
937    async fn test_read_file_metadata_of_empty_file_with_prefetching() {
938        let input_file = java_empty_uncompressed_input_file();
939        let file_metadata = FileMetadata::read_with_prefetch(&input_file, 64)
940            .await
941            .unwrap();
942
943        assert_eq!(file_metadata, empty_footer_payload());
944    }
945
946    #[tokio::test]
947    async fn test_read_file_metadata_of_uncompressed_metric_data_with_prefetching() {
948        let input_file = java_uncompressed_metric_input_file();
949        let file_metadata = FileMetadata::read_with_prefetch(&input_file, 64)
950            .await
951            .unwrap();
952
953        assert_eq!(file_metadata, uncompressed_metric_file_metadata());
954    }
955
956    #[tokio::test]
957    async fn test_read_file_metadata_of_zstd_compressed_metric_data_with_prefetching() {
958        let input_file = java_zstd_compressed_metric_input_file();
959        let file_metadata = FileMetadata::read_with_prefetch(&input_file, 64)
960            .await
961            .unwrap();
962
963        assert_eq!(file_metadata, zstd_compressed_metric_file_metadata());
964    }
965
966    #[tokio::test]
967    async fn test_read_with_incorrect_header_magic() {
968        let temp_dir = TempDir::new().unwrap();
969
970        let prefetch_hint: u8 = 64;
971        let mut bytes = vec![];
972        // Invalid header magic
973        bytes.extend([0x00, 0x00, 0x00, 0x00]);
974        // Intentionally keep file size larger than prefetch_hint.
975        bytes.extend(vec![0u8; prefetch_hint as usize]);
976        // Valid footer: magic + payload + footer struct
977        bytes.extend(FileMetadata::MAGIC);
978        bytes.extend(empty_footer_payload_bytes());
979        bytes.extend(empty_footer_payload_bytes_length_bytes());
980        bytes.extend(vec![0, 0, 0, 0]); // flags
981        bytes.extend(FileMetadata::MAGIC);
982
983        let input_file = input_file_with_bytes(&temp_dir, &bytes).await;
984
985        assert_eq!(
986            FileMetadata::read(&input_file).await.unwrap_err().kind(),
987            ErrorKind::DataInvalid,
988        );
989        assert_eq!(
990            FileMetadata::read_with_prefetch(&input_file, prefetch_hint)
991                .await
992                .unwrap_err()
993                .kind(),
994            ErrorKind::DataInvalid,
995        );
996    }
997
998    #[tokio::test]
999    async fn test_gzip_compression_allowed_in_metadata() {
1000        let temp_dir = TempDir::new().unwrap();
1001
1002        // Create a JSON payload with Gzip compression codec
1003        // Metadata should be readable, but accessing the blob will fail
1004        let payload = r#"{
1005            "blobs": [
1006                {
1007                    "type": "test-type",
1008                    "fields": [1],
1009                    "snapshot-id": 1,
1010                    "sequence-number": 1,
1011                    "offset": 4,
1012                    "length": 10,
1013                    "compression-codec": "gzip"
1014                }
1015            ]
1016        }"#;
1017
1018        let input_file = input_file_with_payload(&temp_dir, payload).await;
1019
1020        // Reading metadata should succeed (lazy validation)
1021        let result = FileMetadata::read(&input_file).await;
1022        assert!(result.is_ok());
1023        let metadata = result.unwrap();
1024        assert_eq!(metadata.blobs.len(), 1);
1025        assert_eq!(
1026            metadata.blobs[0].compression_codec,
1027            CompressionCodec::gzip_default()
1028        );
1029    }
1030}