iceberg/puffin/
metadata.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::collections::{HashMap, HashSet};
19
20use bytes::Bytes;
21use serde::{Deserialize, Serialize};
22
23use crate::io::{FileRead, InputFile};
24use crate::puffin::compression::CompressionCodec;
25use crate::{Error, ErrorKind, Result};
26
/// File-level property key naming the application (and its version) that wrote the file.
///
/// Example value: `"Trino version 381"`.
pub const CREATED_BY_PROPERTY: &str = "created-by";
30
31/// Metadata about a blob.
32/// For more information, see: https://iceberg.apache.org/puffin-spec/#blobmetadata
33#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Clone)]
34#[serde(rename_all = "kebab-case")]
35pub struct BlobMetadata {
36    pub(crate) r#type: String,
37    pub(crate) fields: Vec<i32>,
38    pub(crate) snapshot_id: i64,
39    pub(crate) sequence_number: i64,
40    pub(crate) offset: u64,
41    pub(crate) length: u64,
42    #[serde(skip_serializing_if = "CompressionCodec::is_none")]
43    #[serde(default)]
44    pub(crate) compression_codec: CompressionCodec,
45    #[serde(skip_serializing_if = "HashMap::is_empty")]
46    #[serde(default)]
47    pub(crate) properties: HashMap<String, String>,
48}
49
50impl BlobMetadata {
51    #[inline]
52    /// See blob types: https://iceberg.apache.org/puffin-spec/#blob-types
53    pub fn blob_type(&self) -> &str {
54        &self.r#type
55    }
56
57    #[inline]
58    /// List of field IDs the blob was computed for; the order of items is used to compute sketches stored in the blob.
59    pub fn fields(&self) -> &[i32] {
60        &self.fields
61    }
62
63    #[inline]
64    /// ID of the Iceberg table's snapshot the blob was computed from
65    pub fn snapshot_id(&self) -> i64 {
66        self.snapshot_id
67    }
68
69    #[inline]
70    /// Sequence number of the Iceberg table's snapshot the blob was computed from
71    pub fn sequence_number(&self) -> i64 {
72        self.sequence_number
73    }
74
75    #[inline]
76    /// The offset in the file where the blob contents start
77    pub fn offset(&self) -> u64 {
78        self.offset
79    }
80
81    #[inline]
82    /// The length of the blob stored in the file (after compression, if compressed)
83    pub fn length(&self) -> u64 {
84        self.length
85    }
86
87    #[inline]
88    /// The compression codec used to compress the data
89    pub fn compression_codec(&self) -> CompressionCodec {
90        self.compression_codec
91    }
92
93    #[inline]
94    /// Arbitrary meta-information about the blob
95    pub fn properties(&self) -> &HashMap<String, String> {
96        &self.properties
97    }
98}
99
/// Flags that can be set in the 4-byte `Flags` field of a Puffin footer.
///
/// A variant's discriminant is its absolute bit position inside that field
/// (byte index = value / 8, bit index = value % 8).
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
pub(crate) enum Flag {
    /// The footer payload is compressed (decoded as LZ4 when reading).
    FooterPayloadCompressed = 0,
}
104
105impl Flag {
106    pub(crate) fn byte_idx(self) -> u8 {
107        (self as u8) / 8
108    }
109
110    pub(crate) fn bit_idx(self) -> u8 {
111        (self as u8) % 8
112    }
113
114    fn matches(self, byte_idx: u8, bit_idx: u8) -> bool {
115        self.byte_idx() == byte_idx && self.bit_idx() == bit_idx
116    }
117
118    fn from(byte_idx: u8, bit_idx: u8) -> Result<Flag> {
119        if Flag::FooterPayloadCompressed.matches(byte_idx, bit_idx) {
120            Ok(Flag::FooterPayloadCompressed)
121        } else {
122            Err(Error::new(
123                ErrorKind::DataInvalid,
124                format!("Unknown flag byte {byte_idx} and bit {bit_idx} combination"),
125            ))
126        }
127    }
128}
129
130/// Metadata about a puffin file.
131///
132/// For more information, see: https://iceberg.apache.org/puffin-spec/#filemetadata
133#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Clone)]
134pub struct FileMetadata {
135    pub(crate) blobs: Vec<BlobMetadata>,
136    #[serde(skip_serializing_if = "HashMap::is_empty")]
137    #[serde(default)]
138    pub(crate) properties: HashMap<String, String>,
139}
140
141impl FileMetadata {
142    pub(crate) const MAGIC_LENGTH: u8 = 4;
143    pub(crate) const MAGIC: [u8; FileMetadata::MAGIC_LENGTH as usize] = [0x50, 0x46, 0x41, 0x31];
144
145    /// We use the term FOOTER_STRUCT to refer to the fixed-length portion of the Footer.
146    /// The structure of the Footer specification is illustrated below:
147    ///
148    /// ```text                                             
149    ///        Footer
150    ///        ┌────────────────────┐                 
151    ///        │  Magic (4 bytes)   │                 
152    ///        │                    │                 
153    ///        ├────────────────────┤                 
154    ///        │   FooterPayload    │                 
155    ///        │  (PAYLOAD_LENGTH)  │                 
156    ///        ├────────────────────┤ ◀─┐             
157    ///        │ FooterPayloadSize  │   │             
158    ///        │     (4 bytes)      │   │             
159    ///        ├────────────────────┤                 
160    ///        │  Flags (4 bytes)   │  FOOTER_STRUCT  
161    ///        │                    │                 
162    ///        ├────────────────────┤   │             
163    ///        │  Magic (4 bytes)   │   │             
164    ///        │                    │   │             
165    ///        └────────────────────┘ ◀─┘  
166    /// ```                      
167    const FOOTER_STRUCT_PAYLOAD_LENGTH_OFFSET: u8 = 0;
168    const FOOTER_STRUCT_PAYLOAD_LENGTH_LENGTH: u8 = 4;
169    const FOOTER_STRUCT_FLAGS_OFFSET: u8 = FileMetadata::FOOTER_STRUCT_PAYLOAD_LENGTH_OFFSET
170        + FileMetadata::FOOTER_STRUCT_PAYLOAD_LENGTH_LENGTH;
171    pub(crate) const FOOTER_STRUCT_FLAGS_LENGTH: u8 = 4;
172    const FOOTER_STRUCT_MAGIC_OFFSET: u8 =
173        FileMetadata::FOOTER_STRUCT_FLAGS_OFFSET + FileMetadata::FOOTER_STRUCT_FLAGS_LENGTH;
174    pub(crate) const FOOTER_STRUCT_LENGTH: u8 =
175        FileMetadata::FOOTER_STRUCT_MAGIC_OFFSET + FileMetadata::MAGIC_LENGTH;
176
177    /// Constructs new puffin `FileMetadata`
178    pub fn new(blobs: Vec<BlobMetadata>, properties: HashMap<String, String>) -> Self {
179        Self { blobs, properties }
180    }
181
182    fn check_magic(bytes: &[u8]) -> Result<()> {
183        if bytes == FileMetadata::MAGIC {
184            Ok(())
185        } else {
186            Err(Error::new(
187                ErrorKind::DataInvalid,
188                format!(
189                    "Bad magic value: {:?} should be {:?}",
190                    bytes,
191                    FileMetadata::MAGIC
192                ),
193            ))
194        }
195    }
196
197    async fn read_footer_payload_length(
198        file_read: &dyn FileRead,
199        input_file_length: u64,
200    ) -> Result<u32> {
201        let start = input_file_length - FileMetadata::FOOTER_STRUCT_LENGTH as u64;
202        let end = start + FileMetadata::FOOTER_STRUCT_PAYLOAD_LENGTH_LENGTH as u64;
203        let footer_payload_length_bytes = file_read.read(start..end).await?;
204        let mut buf = [0; 4];
205        buf.copy_from_slice(&footer_payload_length_bytes);
206        let footer_payload_length = u32::from_le_bytes(buf);
207        Ok(footer_payload_length)
208    }
209
210    async fn read_footer_bytes(
211        file_read: &dyn FileRead,
212        input_file_length: u64,
213        footer_payload_length: u32,
214    ) -> Result<Bytes> {
215        let footer_length = footer_payload_length as u64
216            + FileMetadata::FOOTER_STRUCT_LENGTH as u64
217            + FileMetadata::MAGIC_LENGTH as u64;
218        let start = input_file_length - footer_length;
219        let end = input_file_length;
220        file_read.read(start..end).await
221    }
222
223    fn decode_flags(footer_bytes: &[u8]) -> Result<HashSet<Flag>> {
224        let mut flags = HashSet::new();
225
226        for byte_idx in 0..FileMetadata::FOOTER_STRUCT_FLAGS_LENGTH {
227            let byte_offset = footer_bytes.len()
228                - FileMetadata::MAGIC_LENGTH as usize
229                - FileMetadata::FOOTER_STRUCT_FLAGS_LENGTH as usize
230                + byte_idx as usize;
231
232            let flag_byte = *footer_bytes.get(byte_offset).ok_or_else(|| {
233                Error::new(ErrorKind::DataInvalid, "Index range is out of bounds.")
234            })?;
235
236            for bit_idx in 0..8 {
237                if ((flag_byte >> bit_idx) & 1) != 0 {
238                    let flag = Flag::from(byte_idx, bit_idx)?;
239                    flags.insert(flag);
240                }
241            }
242        }
243
244        Ok(flags)
245    }
246
247    fn extract_footer_payload_as_str(
248        footer_bytes: &[u8],
249        footer_payload_length: u32,
250    ) -> Result<String> {
251        let flags = FileMetadata::decode_flags(footer_bytes)?;
252        let footer_compression_codec = if flags.contains(&Flag::FooterPayloadCompressed) {
253            CompressionCodec::Lz4
254        } else {
255            CompressionCodec::None
256        };
257
258        let start_offset = FileMetadata::MAGIC_LENGTH as usize;
259        let end_offset =
260            FileMetadata::MAGIC_LENGTH as usize + usize::try_from(footer_payload_length)?;
261        let footer_payload_bytes = footer_bytes
262            .get(start_offset..end_offset)
263            .ok_or_else(|| Error::new(ErrorKind::DataInvalid, "Index range is out of bounds."))?;
264        let decompressed_footer_payload_bytes =
265            footer_compression_codec.decompress(footer_payload_bytes.into())?;
266
267        String::from_utf8(decompressed_footer_payload_bytes).map_err(|src| {
268            Error::new(ErrorKind::DataInvalid, "Footer is not a valid UTF-8 string")
269                .with_source(src)
270        })
271    }
272
273    fn from_json_str(string: &str) -> Result<FileMetadata> {
274        serde_json::from_str::<FileMetadata>(string).map_err(|src| {
275            Error::new(ErrorKind::DataInvalid, "Given string is not valid JSON").with_source(src)
276        })
277    }
278
279    /// Returns the file metadata about a Puffin file
280    pub(crate) async fn read(input_file: &InputFile) -> Result<FileMetadata> {
281        let file_read = input_file.reader().await?;
282
283        let first_four_bytes = file_read.read(0..FileMetadata::MAGIC_LENGTH.into()).await?;
284        FileMetadata::check_magic(&first_four_bytes)?;
285
286        let input_file_length = input_file.metadata().await?.size;
287        let footer_payload_length =
288            FileMetadata::read_footer_payload_length(&file_read, input_file_length).await?;
289        let footer_bytes =
290            FileMetadata::read_footer_bytes(&file_read, input_file_length, footer_payload_length)
291                .await?;
292
293        let magic_length = FileMetadata::MAGIC_LENGTH as usize;
294        // check first four bytes of footer
295        FileMetadata::check_magic(&footer_bytes[..magic_length])?;
296        // check last four bytes of footer
297        FileMetadata::check_magic(&footer_bytes[footer_bytes.len() - magic_length..])?;
298
299        let footer_payload_str =
300            FileMetadata::extract_footer_payload_as_str(&footer_bytes, footer_payload_length)?;
301
302        FileMetadata::from_json_str(&footer_payload_str)
303    }
304
305    /// Reads file_metadata in puffin file with a prefetch hint
306    ///
307    /// `prefetch_hint` is used to try to fetch the entire footer in one read. If
308    /// the entire footer isn't fetched in one read the function will call the regular
309    /// read option.
310    #[allow(dead_code)]
311    pub(crate) async fn read_with_prefetch(
312        input_file: &InputFile,
313        prefetch_hint: u8,
314    ) -> Result<FileMetadata> {
315        if prefetch_hint > 16 {
316            let input_file_length = input_file.metadata().await?.size;
317            let file_read = input_file.reader().await?;
318
319            // Hint cannot be larger than input file
320            if prefetch_hint as u64 > input_file_length {
321                return FileMetadata::read(input_file).await;
322            }
323
324            // Read footer based on prefetchi hint
325            let start = input_file_length - prefetch_hint as u64;
326            let end = input_file_length;
327            let footer_bytes = file_read.read(start..end).await?;
328
329            let payload_length_start =
330                footer_bytes.len() - (FileMetadata::FOOTER_STRUCT_LENGTH as usize);
331            let payload_length_end =
332                payload_length_start + (FileMetadata::FOOTER_STRUCT_PAYLOAD_LENGTH_LENGTH as usize);
333            let payload_length_bytes = &footer_bytes[payload_length_start..payload_length_end];
334
335            let mut buf = [0; 4];
336            buf.copy_from_slice(payload_length_bytes);
337            let footer_payload_length = u32::from_le_bytes(buf);
338
339            // If the (footer payload length + FOOTER_STRUCT_LENGTH + MAGIC_LENGTH) is greater
340            // than the fetched footer then you can have it read regularly from a read with no
341            // prefetch while passing in the footer_payload_length.
342            let footer_length = (footer_payload_length as usize)
343                + FileMetadata::FOOTER_STRUCT_LENGTH as usize
344                + FileMetadata::MAGIC_LENGTH as usize;
345            if footer_length > prefetch_hint as usize {
346                return FileMetadata::read(input_file).await;
347            }
348
349            // Read footer bytes
350            let footer_start = footer_bytes.len() - footer_length;
351            let footer_end = footer_bytes.len();
352            let footer_bytes = &footer_bytes[footer_start..footer_end];
353
354            let magic_length = FileMetadata::MAGIC_LENGTH as usize;
355            // check first four bytes of footer
356            FileMetadata::check_magic(&footer_bytes[..magic_length])?;
357            // check last four bytes of footer
358            FileMetadata::check_magic(&footer_bytes[footer_bytes.len() - magic_length..])?;
359
360            let footer_payload_str =
361                FileMetadata::extract_footer_payload_as_str(footer_bytes, footer_payload_length)?;
362            return FileMetadata::from_json_str(&footer_payload_str);
363        }
364
365        FileMetadata::read(input_file).await
366    }
367
368    #[inline]
369    /// Metadata about blobs in file
370    pub fn blobs(&self) -> &[BlobMetadata] {
371        &self.blobs
372    }
373
374    #[inline]
375    /// Arbitrary meta-information, like writer identification/version.
376    pub fn properties(&self) -> &HashMap<String, String> {
377        &self.properties
378    }
379}
380
381#[cfg(test)]
382mod tests {
383    use std::collections::HashMap;
384
385    use bytes::Bytes;
386    use tempfile::TempDir;
387
388    use crate::io::{FileIOBuilder, InputFile};
389    use crate::puffin::metadata::{BlobMetadata, CompressionCodec, FileMetadata};
390    use crate::puffin::test_utils::{
391        empty_footer_payload, empty_footer_payload_bytes, empty_footer_payload_bytes_length_bytes,
392        java_empty_uncompressed_input_file, java_uncompressed_metric_input_file,
393        java_zstd_compressed_metric_input_file, uncompressed_metric_file_metadata,
394        zstd_compressed_metric_file_metadata,
395    };
396
397    const INVALID_MAGIC_VALUE: [u8; 4] = [80, 70, 65, 0];
398
399    async fn input_file_with_bytes(temp_dir: &TempDir, slice: &[u8]) -> InputFile {
400        let file_io = FileIOBuilder::new_fs_io().build().unwrap();
401
402        let path_buf = temp_dir.path().join("abc.puffin");
403        let temp_path = path_buf.to_str().unwrap();
404        let output_file = file_io.new_output(temp_path).unwrap();
405
406        output_file
407            .write(Bytes::copy_from_slice(slice))
408            .await
409            .unwrap();
410
411        output_file.to_input_file()
412    }
413
414    async fn input_file_with_payload(temp_dir: &TempDir, payload_str: &str) -> InputFile {
415        let payload_bytes = payload_str.as_bytes();
416
417        let mut bytes = vec![];
418        bytes.extend(FileMetadata::MAGIC.to_vec());
419        bytes.extend(FileMetadata::MAGIC.to_vec());
420        bytes.extend(payload_bytes);
421        bytes.extend(u32::to_le_bytes(payload_bytes.len() as u32));
422        bytes.extend(vec![0, 0, 0, 0]);
423        bytes.extend(FileMetadata::MAGIC);
424
425        input_file_with_bytes(temp_dir, &bytes).await
426    }
427
428    #[tokio::test]
429    async fn test_file_starting_with_invalid_magic_returns_error() {
430        let temp_dir = TempDir::new().unwrap();
431
432        let mut bytes = vec![];
433        bytes.extend(INVALID_MAGIC_VALUE.to_vec());
434        bytes.extend(FileMetadata::MAGIC.to_vec());
435        bytes.extend(empty_footer_payload_bytes());
436        bytes.extend(empty_footer_payload_bytes_length_bytes());
437        bytes.extend(vec![0, 0, 0, 0]);
438        bytes.extend(FileMetadata::MAGIC);
439
440        let input_file = input_file_with_bytes(&temp_dir, &bytes).await;
441
442        assert_eq!(
443            FileMetadata::read(&input_file)
444                .await
445                .unwrap_err()
446                .to_string(),
447            "DataInvalid => Bad magic value: [80, 70, 65, 0] should be [80, 70, 65, 49]",
448        )
449    }
450
451    #[tokio::test]
452    async fn test_file_with_invalid_magic_at_start_of_footer_returns_error() {
453        let temp_dir = TempDir::new().unwrap();
454
455        let mut bytes = vec![];
456        bytes.extend(FileMetadata::MAGIC.to_vec());
457        bytes.extend(INVALID_MAGIC_VALUE.to_vec());
458        bytes.extend(empty_footer_payload_bytes());
459        bytes.extend(empty_footer_payload_bytes_length_bytes());
460        bytes.extend(vec![0, 0, 0, 0]);
461        bytes.extend(FileMetadata::MAGIC);
462
463        let input_file = input_file_with_bytes(&temp_dir, &bytes).await;
464
465        assert_eq!(
466            FileMetadata::read(&input_file)
467                .await
468                .unwrap_err()
469                .to_string(),
470            "DataInvalid => Bad magic value: [80, 70, 65, 0] should be [80, 70, 65, 49]",
471        )
472    }
473
474    #[tokio::test]
475    async fn test_file_ending_with_invalid_magic_returns_error() {
476        let temp_dir = TempDir::new().unwrap();
477
478        let mut bytes = vec![];
479        bytes.extend(FileMetadata::MAGIC.to_vec());
480        bytes.extend(FileMetadata::MAGIC.to_vec());
481        bytes.extend(empty_footer_payload_bytes());
482        bytes.extend(empty_footer_payload_bytes_length_bytes());
483        bytes.extend(vec![0, 0, 0, 0]);
484        bytes.extend(INVALID_MAGIC_VALUE);
485
486        let input_file = input_file_with_bytes(&temp_dir, &bytes).await;
487
488        assert_eq!(
489            FileMetadata::read(&input_file)
490                .await
491                .unwrap_err()
492                .to_string(),
493            "DataInvalid => Bad magic value: [80, 70, 65, 0] should be [80, 70, 65, 49]",
494        )
495    }
496
497    #[tokio::test]
498    async fn test_encoded_payload_length_larger_than_actual_payload_length_returns_error() {
499        let temp_dir = TempDir::new().unwrap();
500
501        let mut bytes = vec![];
502        bytes.extend(FileMetadata::MAGIC.to_vec());
503        bytes.extend(FileMetadata::MAGIC.to_vec());
504        bytes.extend(empty_footer_payload_bytes());
505        bytes.extend(u32::to_le_bytes(
506            empty_footer_payload_bytes().len() as u32 + 1,
507        ));
508        bytes.extend(vec![0, 0, 0, 0]);
509        bytes.extend(FileMetadata::MAGIC.to_vec());
510
511        let input_file = input_file_with_bytes(&temp_dir, &bytes).await;
512
513        assert_eq!(
514            FileMetadata::read(&input_file)
515                .await
516                .unwrap_err()
517                .to_string(),
518            "DataInvalid => Bad magic value: [49, 80, 70, 65] should be [80, 70, 65, 49]",
519        )
520    }
521
522    #[tokio::test]
523    async fn test_encoded_payload_length_smaller_than_actual_payload_length_returns_error() {
524        let temp_dir = TempDir::new().unwrap();
525
526        let mut bytes = vec![];
527        bytes.extend(FileMetadata::MAGIC.to_vec());
528        bytes.extend(FileMetadata::MAGIC.to_vec());
529        bytes.extend(empty_footer_payload_bytes());
530        bytes.extend(u32::to_le_bytes(
531            empty_footer_payload_bytes().len() as u32 - 1,
532        ));
533        bytes.extend(vec![0, 0, 0, 0]);
534        bytes.extend(FileMetadata::MAGIC.to_vec());
535
536        let input_file = input_file_with_bytes(&temp_dir, &bytes).await;
537
538        assert_eq!(
539            FileMetadata::read(&input_file)
540                .await
541                .unwrap_err()
542                .to_string(),
543            "DataInvalid => Bad magic value: [70, 65, 49, 123] should be [80, 70, 65, 49]",
544        )
545    }
546
547    #[tokio::test]
548    async fn test_lz4_compressed_footer_returns_error() {
549        let temp_dir = TempDir::new().unwrap();
550
551        let mut bytes = vec![];
552        bytes.extend(FileMetadata::MAGIC.to_vec());
553        bytes.extend(FileMetadata::MAGIC.to_vec());
554        bytes.extend(empty_footer_payload_bytes());
555        bytes.extend(empty_footer_payload_bytes_length_bytes());
556        bytes.extend(vec![0b00000001, 0, 0, 0]);
557        bytes.extend(FileMetadata::MAGIC.to_vec());
558
559        let input_file = input_file_with_bytes(&temp_dir, &bytes).await;
560
561        assert_eq!(
562            FileMetadata::read(&input_file)
563                .await
564                .unwrap_err()
565                .to_string(),
566            "FeatureUnsupported => LZ4 decompression is not supported currently",
567        )
568    }
569
570    #[tokio::test]
571    async fn test_unknown_byte_bit_combination_returns_error() {
572        let temp_dir = TempDir::new().unwrap();
573
574        let mut bytes = vec![];
575        bytes.extend(FileMetadata::MAGIC.to_vec());
576        bytes.extend(FileMetadata::MAGIC.to_vec());
577        bytes.extend(empty_footer_payload_bytes());
578        bytes.extend(empty_footer_payload_bytes_length_bytes());
579        bytes.extend(vec![0b00000010, 0, 0, 0]);
580        bytes.extend(FileMetadata::MAGIC.to_vec());
581
582        let input_file = input_file_with_bytes(&temp_dir, &bytes).await;
583
584        assert_eq!(
585            FileMetadata::read(&input_file)
586                .await
587                .unwrap_err()
588                .to_string(),
589            "DataInvalid => Unknown flag byte 0 and bit 1 combination",
590        )
591    }
592
593    #[tokio::test]
594    async fn test_non_utf8_string_payload_returns_error() {
595        let temp_dir = TempDir::new().unwrap();
596
597        let payload_bytes: [u8; 4] = [0, 159, 146, 150];
598        let payload_bytes_length_bytes: [u8; 4] = u32::to_le_bytes(payload_bytes.len() as u32);
599
600        let mut bytes = vec![];
601        bytes.extend(FileMetadata::MAGIC.to_vec());
602        bytes.extend(FileMetadata::MAGIC.to_vec());
603        bytes.extend(payload_bytes);
604        bytes.extend(payload_bytes_length_bytes);
605        bytes.extend(vec![0, 0, 0, 0]);
606        bytes.extend(FileMetadata::MAGIC.to_vec());
607
608        let input_file = input_file_with_bytes(&temp_dir, &bytes).await;
609
610        assert_eq!(
611            FileMetadata::read(&input_file)
612                .await
613                .unwrap_err()
614                .to_string(),
615            "DataInvalid => Footer is not a valid UTF-8 string, source: invalid utf-8 sequence of 1 bytes from index 1",
616        )
617    }
618
619    #[tokio::test]
620    async fn test_minimal_valid_file_returns_file_metadata() {
621        let temp_dir = TempDir::new().unwrap();
622
623        let mut bytes = vec![];
624        bytes.extend(FileMetadata::MAGIC.to_vec());
625        bytes.extend(FileMetadata::MAGIC.to_vec());
626        bytes.extend(empty_footer_payload_bytes());
627        bytes.extend(empty_footer_payload_bytes_length_bytes());
628        bytes.extend(vec![0, 0, 0, 0]);
629        bytes.extend(FileMetadata::MAGIC);
630
631        let input_file = input_file_with_bytes(&temp_dir, &bytes).await;
632
633        assert_eq!(
634            FileMetadata::read(&input_file).await.unwrap(),
635            FileMetadata {
636                blobs: vec![],
637                properties: HashMap::new(),
638            }
639        )
640    }
641
642    #[tokio::test]
643    async fn test_returns_file_metadata_property() {
644        let temp_dir = TempDir::new().unwrap();
645
646        let input_file = input_file_with_payload(
647            &temp_dir,
648            r#"{
649                "blobs" : [ ],
650                "properties" : {
651                    "a property" : "a property value"
652                }
653            }"#,
654        )
655        .await;
656
657        assert_eq!(
658            FileMetadata::read(&input_file).await.unwrap(),
659            FileMetadata {
660                blobs: vec![],
661                properties: {
662                    let mut map = HashMap::new();
663                    map.insert("a property".to_string(), "a property value".to_string());
664                    map
665                },
666            }
667        )
668    }
669
670    #[tokio::test]
671    async fn test_returns_file_metadata_properties() {
672        let temp_dir = TempDir::new().unwrap();
673
674        let input_file = input_file_with_payload(
675            &temp_dir,
676            r#"{
677                "blobs" : [ ],
678                "properties" : {
679                    "a property" : "a property value",
680                    "another one": "also with value"
681                }
682            }"#,
683        )
684        .await;
685
686        assert_eq!(
687            FileMetadata::read(&input_file).await.unwrap(),
688            FileMetadata {
689                blobs: vec![],
690                properties: {
691                    let mut map = HashMap::new();
692                    map.insert("a property".to_string(), "a property value".to_string());
693                    map.insert("another one".to_string(), "also with value".to_string());
694                    map
695                },
696            }
697        )
698    }
699
700    #[tokio::test]
701    async fn test_returns_error_if_blobs_field_is_missing() {
702        let temp_dir = TempDir::new().unwrap();
703
704        let input_file = input_file_with_payload(
705            &temp_dir,
706            r#"{
707                "properties" : {}
708            }"#,
709        )
710        .await;
711
712        assert_eq!(
713            FileMetadata::read(&input_file)
714                .await
715                .unwrap_err()
716                .to_string(),
717            format!(
718                "DataInvalid => Given string is not valid JSON, source: missing field `blobs` at line 3 column 13"
719            ),
720        )
721    }
722
723    #[tokio::test]
724    async fn test_returns_error_if_blobs_field_is_bad() {
725        let temp_dir = TempDir::new().unwrap();
726
727        let input_file = input_file_with_payload(
728            &temp_dir,
729            r#"{
730                "blobs" : {}
731            }"#,
732        )
733        .await;
734
735        assert_eq!(
736            FileMetadata::read(&input_file)
737                .await
738                .unwrap_err()
739                .to_string(),
740            format!(
741                "DataInvalid => Given string is not valid JSON, source: invalid type: map, expected a sequence at line 2 column 26"
742            ),
743        )
744    }
745
746    #[tokio::test]
747    async fn test_returns_blobs_metadatas() {
748        let temp_dir = TempDir::new().unwrap();
749
750        let input_file = input_file_with_payload(
751            &temp_dir,
752            r#"{
753                "blobs" : [
754                    {
755                        "type" : "type-a",
756                        "fields" : [ 1 ],
757                        "snapshot-id" : 14,
758                        "sequence-number" : 3,
759                        "offset" : 4,
760                        "length" : 16
761                    },
762                    {
763                        "type" : "type-bbb",
764                        "fields" : [ 2, 3, 4 ],
765                        "snapshot-id" : 77,
766                        "sequence-number" : 4,
767                        "offset" : 21474836470000,
768                        "length" : 79834
769                    }
770                ]
771            }"#,
772        )
773        .await;
774
775        assert_eq!(
776            FileMetadata::read(&input_file).await.unwrap(),
777            FileMetadata {
778                blobs: vec![
779                    BlobMetadata {
780                        r#type: "type-a".to_string(),
781                        fields: vec![1],
782                        snapshot_id: 14,
783                        sequence_number: 3,
784                        offset: 4,
785                        length: 16,
786                        compression_codec: CompressionCodec::None,
787                        properties: HashMap::new(),
788                    },
789                    BlobMetadata {
790                        r#type: "type-bbb".to_string(),
791                        fields: vec![2, 3, 4],
792                        snapshot_id: 77,
793                        sequence_number: 4,
794                        offset: 21474836470000,
795                        length: 79834,
796                        compression_codec: CompressionCodec::None,
797                        properties: HashMap::new(),
798                    },
799                ],
800                properties: HashMap::new(),
801            }
802        )
803    }
804
805    #[tokio::test]
806    async fn test_returns_properties_in_blob_metadata() {
807        let temp_dir = TempDir::new().unwrap();
808
809        let input_file = input_file_with_payload(
810            &temp_dir,
811            r#"{
812                "blobs" : [
813                    {
814                        "type" : "type-a",
815                        "fields" : [ 1 ],
816                        "snapshot-id" : 14,
817                        "sequence-number" : 3,
818                        "offset" : 4,
819                        "length" : 16,
820                        "properties" : {
821                            "some key" : "some value"
822                        }
823                    }
824                ]
825            }"#,
826        )
827        .await;
828
829        assert_eq!(
830            FileMetadata::read(&input_file).await.unwrap(),
831            FileMetadata {
832                blobs: vec![BlobMetadata {
833                    r#type: "type-a".to_string(),
834                    fields: vec![1],
835                    snapshot_id: 14,
836                    sequence_number: 3,
837                    offset: 4,
838                    length: 16,
839                    compression_codec: CompressionCodec::None,
840                    properties: {
841                        let mut map = HashMap::new();
842                        map.insert("some key".to_string(), "some value".to_string());
843                        map
844                    },
845                }],
846                properties: HashMap::new(),
847            }
848        )
849    }
850
851    #[tokio::test]
852    async fn test_returns_error_if_blobs_fields_value_is_outside_i32_range() {
853        let temp_dir = TempDir::new().unwrap();
854
855        let out_of_i32_range_number: i64 = i32::MAX as i64 + 1;
856
857        let input_file = input_file_with_payload(
858            &temp_dir,
859            &format!(
860                r#"{{
861                    "blobs" : [
862                        {{
863                            "type" : "type-a",
864                            "fields" : [ {out_of_i32_range_number} ],
865                            "snapshot-id" : 14,
866                            "sequence-number" : 3,
867                            "offset" : 4,
868                            "length" : 16
869                        }}
870                    ]
871                }}"#
872            ),
873        )
874        .await;
875
876        assert_eq!(
877            FileMetadata::read(&input_file)
878                .await
879                .unwrap_err()
880                .to_string(),
881            format!(
882                "DataInvalid => Given string is not valid JSON, source: invalid value: integer `{out_of_i32_range_number}`, expected i32 at line 5 column 51"
883            ),
884        )
885    }
886
887    #[tokio::test]
888    async fn test_returns_errors_if_footer_payload_is_not_encoded_in_json_format() {
889        let temp_dir = TempDir::new().unwrap();
890
891        let input_file = input_file_with_payload(&temp_dir, r#""blobs" = []"#).await;
892
893        assert_eq!(
894            FileMetadata::read(&input_file)
895                .await
896                .unwrap_err()
897                .to_string(),
898            "DataInvalid => Given string is not valid JSON, source: invalid type: string \"blobs\", expected struct FileMetadata at line 1 column 7",
899        )
900    }
901
902    #[tokio::test]
903    async fn test_read_file_metadata_of_uncompressed_empty_file() {
904        let input_file = java_empty_uncompressed_input_file();
905
906        let file_metadata = FileMetadata::read(&input_file).await.unwrap();
907        assert_eq!(file_metadata, empty_footer_payload())
908    }
909
910    #[tokio::test]
911    async fn test_read_file_metadata_of_uncompressed_metric_data() {
912        let input_file = java_uncompressed_metric_input_file();
913
914        let file_metadata = FileMetadata::read(&input_file).await.unwrap();
915        assert_eq!(file_metadata, uncompressed_metric_file_metadata())
916    }
917
918    #[tokio::test]
919    async fn test_read_file_metadata_of_zstd_compressed_metric_data() {
920        let input_file = java_zstd_compressed_metric_input_file();
921
922        let file_metadata = FileMetadata::read_with_prefetch(&input_file, 64)
923            .await
924            .unwrap();
925        assert_eq!(file_metadata, zstd_compressed_metric_file_metadata())
926    }
927
928    #[tokio::test]
929    async fn test_read_file_metadata_of_empty_file_with_prefetching() {
930        let input_file = java_empty_uncompressed_input_file();
931        let file_metadata = FileMetadata::read_with_prefetch(&input_file, 64)
932            .await
933            .unwrap();
934
935        assert_eq!(file_metadata, empty_footer_payload());
936    }
937
938    #[tokio::test]
939    async fn test_read_file_metadata_of_uncompressed_metric_data_with_prefetching() {
940        let input_file = java_uncompressed_metric_input_file();
941        let file_metadata = FileMetadata::read_with_prefetch(&input_file, 64)
942            .await
943            .unwrap();
944
945        assert_eq!(file_metadata, uncompressed_metric_file_metadata());
946    }
947
948    #[tokio::test]
949    async fn test_read_file_metadata_of_zstd_compressed_metric_data_with_prefetching() {
950        let input_file = java_zstd_compressed_metric_input_file();
951        let file_metadata = FileMetadata::read_with_prefetch(&input_file, 64)
952            .await
953            .unwrap();
954
955        assert_eq!(file_metadata, zstd_compressed_metric_file_metadata());
956    }
957}