1use std::collections::{HashMap, HashSet};
19
20use bytes::Bytes;
21use serde::{Deserialize, Serialize};
22
23use crate::compression::CompressionCodec;
24use crate::io::{FileRead, InputFile};
25use crate::{Error, ErrorKind, Result};
26
/// Property key `"created-by"`.
///
/// NOTE(review): presumably the key under which the writing application identifies itself in
/// the file-level properties of [`FileMetadata`]; nothing in this file reads it, so confirm
/// against the Puffin specification.
pub const CREATED_BY_PROPERTY: &str = "created-by";
30
31#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Clone)]
34#[serde(rename_all = "kebab-case")]
35pub struct BlobMetadata {
36 pub(crate) r#type: String,
37 pub(crate) fields: Vec<i32>,
38 pub(crate) snapshot_id: i64,
39 pub(crate) sequence_number: i64,
40 pub(crate) offset: u64,
41 pub(crate) length: u64,
42 #[serde(skip_serializing_if = "CompressionCodec::is_none")]
43 #[serde(default)]
44 pub(crate) compression_codec: CompressionCodec,
45 #[serde(skip_serializing_if = "HashMap::is_empty")]
46 #[serde(default)]
47 pub(crate) properties: HashMap<String, String>,
48}
49
50impl BlobMetadata {
51 #[inline]
52 pub fn blob_type(&self) -> &str {
54 &self.r#type
55 }
56
57 #[inline]
58 pub fn fields(&self) -> &[i32] {
60 &self.fields
61 }
62
63 #[inline]
64 pub fn snapshot_id(&self) -> i64 {
66 self.snapshot_id
67 }
68
69 #[inline]
70 pub fn sequence_number(&self) -> i64 {
72 self.sequence_number
73 }
74
75 #[inline]
76 pub fn offset(&self) -> u64 {
78 self.offset
79 }
80
81 #[inline]
82 pub fn length(&self) -> u64 {
84 self.length
85 }
86
87 #[inline]
88 pub fn compression_codec(&self) -> CompressionCodec {
90 self.compression_codec
91 }
92
93 #[inline]
94 pub fn properties(&self) -> &HashMap<String, String> {
96 &self.properties
97 }
98}
99
/// Flags that may be set in the footer flag bytes.
///
/// Each discriminant is the flag's bit position counted across the flag bytes: position `p`
/// lives in byte `p / 8`, bit `p % 8`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub(crate) enum Flag {
    /// Marks the footer payload as compressed (bit position 0).
    FooterPayloadCompressed = 0,
}
104
105impl Flag {
106 pub(crate) fn byte_idx(self) -> u8 {
107 (self as u8) / 8
108 }
109
110 pub(crate) fn bit_idx(self) -> u8 {
111 (self as u8) % 8
112 }
113
114 fn matches(self, byte_idx: u8, bit_idx: u8) -> bool {
115 self.byte_idx() == byte_idx && self.bit_idx() == bit_idx
116 }
117
118 fn from(byte_idx: u8, bit_idx: u8) -> Result<Flag> {
119 if Flag::FooterPayloadCompressed.matches(byte_idx, bit_idx) {
120 Ok(Flag::FooterPayloadCompressed)
121 } else {
122 Err(Error::new(
123 ErrorKind::DataInvalid,
124 format!("Unknown flag byte {byte_idx} and bit {bit_idx} combination"),
125 ))
126 }
127 }
128}
129
130#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Clone)]
134pub struct FileMetadata {
135 pub(crate) blobs: Vec<BlobMetadata>,
136 #[serde(skip_serializing_if = "HashMap::is_empty")]
137 #[serde(default)]
138 pub(crate) properties: HashMap<String, String>,
139}
140
141impl FileMetadata {
142 pub(crate) const MAGIC_LENGTH: u8 = 4;
143 pub(crate) const MAGIC: [u8; FileMetadata::MAGIC_LENGTH as usize] = [0x50, 0x46, 0x41, 0x31];
144
145 const FOOTER_STRUCT_PAYLOAD_LENGTH_OFFSET: u8 = 0;
168 const FOOTER_STRUCT_PAYLOAD_LENGTH_LENGTH: u8 = 4;
169 const FOOTER_STRUCT_FLAGS_OFFSET: u8 = FileMetadata::FOOTER_STRUCT_PAYLOAD_LENGTH_OFFSET
170 + FileMetadata::FOOTER_STRUCT_PAYLOAD_LENGTH_LENGTH;
171 pub(crate) const FOOTER_STRUCT_FLAGS_LENGTH: u8 = 4;
172 const FOOTER_STRUCT_MAGIC_OFFSET: u8 =
173 FileMetadata::FOOTER_STRUCT_FLAGS_OFFSET + FileMetadata::FOOTER_STRUCT_FLAGS_LENGTH;
174 pub(crate) const FOOTER_STRUCT_LENGTH: u8 =
175 FileMetadata::FOOTER_STRUCT_MAGIC_OFFSET + FileMetadata::MAGIC_LENGTH;
176
177 pub fn new(blobs: Vec<BlobMetadata>, properties: HashMap<String, String>) -> Self {
179 Self { blobs, properties }
180 }
181
182 fn check_magic(bytes: &[u8]) -> Result<()> {
183 if bytes == FileMetadata::MAGIC {
184 Ok(())
185 } else {
186 Err(Error::new(
187 ErrorKind::DataInvalid,
188 format!(
189 "Bad magic value: {:?} should be {:?}",
190 bytes,
191 FileMetadata::MAGIC
192 ),
193 ))
194 }
195 }
196
197 async fn read_footer_payload_length(
198 file_read: &dyn FileRead,
199 input_file_length: u64,
200 ) -> Result<u32> {
201 let start = input_file_length - FileMetadata::FOOTER_STRUCT_LENGTH as u64;
202 let end = start + FileMetadata::FOOTER_STRUCT_PAYLOAD_LENGTH_LENGTH as u64;
203 let footer_payload_length_bytes = file_read.read(start..end).await?;
204 let mut buf = [0; 4];
205 buf.copy_from_slice(&footer_payload_length_bytes);
206 let footer_payload_length = u32::from_le_bytes(buf);
207 Ok(footer_payload_length)
208 }
209
210 async fn read_footer_bytes(
211 file_read: &dyn FileRead,
212 input_file_length: u64,
213 footer_payload_length: u32,
214 ) -> Result<Bytes> {
215 let footer_length = footer_payload_length as u64
216 + FileMetadata::FOOTER_STRUCT_LENGTH as u64
217 + FileMetadata::MAGIC_LENGTH as u64;
218 let start = input_file_length - footer_length;
219 let end = input_file_length;
220 file_read.read(start..end).await
221 }
222
223 fn decode_flags(footer_bytes: &[u8]) -> Result<HashSet<Flag>> {
224 let mut flags = HashSet::new();
225
226 for byte_idx in 0..FileMetadata::FOOTER_STRUCT_FLAGS_LENGTH {
227 let byte_offset = footer_bytes.len()
228 - FileMetadata::MAGIC_LENGTH as usize
229 - FileMetadata::FOOTER_STRUCT_FLAGS_LENGTH as usize
230 + byte_idx as usize;
231
232 let flag_byte = *footer_bytes.get(byte_offset).ok_or_else(|| {
233 Error::new(ErrorKind::DataInvalid, "Index range is out of bounds.")
234 })?;
235
236 for bit_idx in 0..8 {
237 if ((flag_byte >> bit_idx) & 1) != 0 {
238 let flag = Flag::from(byte_idx, bit_idx)?;
239 flags.insert(flag);
240 }
241 }
242 }
243
244 Ok(flags)
245 }
246
247 fn extract_footer_payload_as_str(
248 footer_bytes: &[u8],
249 footer_payload_length: u32,
250 ) -> Result<String> {
251 let flags = FileMetadata::decode_flags(footer_bytes)?;
252 let footer_compression_codec = if flags.contains(&Flag::FooterPayloadCompressed) {
253 CompressionCodec::Lz4
254 } else {
255 CompressionCodec::None
256 };
257
258 let start_offset = FileMetadata::MAGIC_LENGTH as usize;
259 let end_offset =
260 FileMetadata::MAGIC_LENGTH as usize + usize::try_from(footer_payload_length)?;
261 let footer_payload_bytes = footer_bytes
262 .get(start_offset..end_offset)
263 .ok_or_else(|| Error::new(ErrorKind::DataInvalid, "Index range is out of bounds."))?;
264 let decompressed_footer_payload_bytes =
265 footer_compression_codec.decompress(footer_payload_bytes.into())?;
266
267 String::from_utf8(decompressed_footer_payload_bytes).map_err(|src| {
268 Error::new(ErrorKind::DataInvalid, "Footer is not a valid UTF-8 string")
269 .with_source(src)
270 })
271 }
272
273 fn from_json_str(string: &str) -> Result<FileMetadata> {
274 serde_json::from_str::<FileMetadata>(string).map_err(|src| {
275 Error::new(ErrorKind::DataInvalid, "Given string is not valid JSON").with_source(src)
276 })
277 }
278
279 pub(crate) async fn read(input_file: &InputFile) -> Result<FileMetadata> {
281 let file_read = input_file.reader().await?;
282
283 let first_four_bytes = file_read.read(0..FileMetadata::MAGIC_LENGTH.into()).await?;
284 FileMetadata::check_magic(&first_four_bytes)?;
285
286 let input_file_length = input_file.metadata().await?.size;
287 let footer_payload_length =
288 FileMetadata::read_footer_payload_length(file_read.as_ref(), input_file_length).await?;
289 let footer_bytes = FileMetadata::read_footer_bytes(
290 file_read.as_ref(),
291 input_file_length,
292 footer_payload_length,
293 )
294 .await?;
295
296 let magic_length = FileMetadata::MAGIC_LENGTH as usize;
297 FileMetadata::check_magic(&footer_bytes[..magic_length])?;
299 FileMetadata::check_magic(&footer_bytes[footer_bytes.len() - magic_length..])?;
301
302 let footer_payload_str =
303 FileMetadata::extract_footer_payload_as_str(&footer_bytes, footer_payload_length)?;
304
305 FileMetadata::from_json_str(&footer_payload_str)
306 }
307
308 #[allow(dead_code)]
314 pub(crate) async fn read_with_prefetch(
315 input_file: &InputFile,
316 prefetch_hint: u8,
317 ) -> Result<FileMetadata> {
318 if prefetch_hint > 16 {
319 let input_file_length = input_file.metadata().await?.size;
320 let file_read = input_file.reader().await?;
321
322 if prefetch_hint as u64 > input_file_length {
324 return FileMetadata::read(input_file).await;
325 }
326
327 let first_four_bytes = file_read.read(0..FileMetadata::MAGIC_LENGTH.into()).await?;
329 FileMetadata::check_magic(&first_four_bytes)?;
330
331 let start = input_file_length - prefetch_hint as u64;
333 let end = input_file_length;
334 let footer_bytes = file_read.read(start..end).await?;
335
336 let payload_length_start =
337 footer_bytes.len() - (FileMetadata::FOOTER_STRUCT_LENGTH as usize);
338 let payload_length_end =
339 payload_length_start + (FileMetadata::FOOTER_STRUCT_PAYLOAD_LENGTH_LENGTH as usize);
340 let payload_length_bytes = &footer_bytes[payload_length_start..payload_length_end];
341
342 let mut buf = [0; 4];
343 buf.copy_from_slice(payload_length_bytes);
344 let footer_payload_length = u32::from_le_bytes(buf);
345
346 let footer_length = (footer_payload_length as usize)
350 + FileMetadata::FOOTER_STRUCT_LENGTH as usize
351 + FileMetadata::MAGIC_LENGTH as usize;
352 if footer_length > prefetch_hint as usize {
353 return FileMetadata::read(input_file).await;
354 }
355
356 let footer_start = footer_bytes.len() - footer_length;
358 let footer_end = footer_bytes.len();
359 let footer_bytes = &footer_bytes[footer_start..footer_end];
360
361 let magic_length = FileMetadata::MAGIC_LENGTH as usize;
362 FileMetadata::check_magic(&footer_bytes[..magic_length])?;
364 FileMetadata::check_magic(&footer_bytes[footer_bytes.len() - magic_length..])?;
366
367 let footer_payload_str =
368 FileMetadata::extract_footer_payload_as_str(footer_bytes, footer_payload_length)?;
369 return FileMetadata::from_json_str(&footer_payload_str);
370 }
371
372 FileMetadata::read(input_file).await
373 }
374
375 #[inline]
376 pub fn blobs(&self) -> &[BlobMetadata] {
378 &self.blobs
379 }
380
381 #[inline]
382 pub fn properties(&self) -> &HashMap<String, String> {
384 &self.properties
385 }
386}
387
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use bytes::Bytes;
    use tempfile::TempDir;

    use crate::ErrorKind;
    use crate::io::{FileIO, InputFile};
    use crate::puffin::metadata::{BlobMetadata, CompressionCodec, FileMetadata};
    use crate::puffin::test_utils::{
        empty_footer_payload, empty_footer_payload_bytes, empty_footer_payload_bytes_length_bytes,
        java_empty_uncompressed_input_file, java_uncompressed_metric_input_file,
        java_zstd_compressed_metric_input_file, uncompressed_metric_file_metadata,
        zstd_compressed_metric_file_metadata,
    };

    /// The real magic with its last byte replaced by zero.
    const INVALID_MAGIC_VALUE: [u8; 4] = [80, 70, 65, 0];
    /// Footer flag bytes with no flag set.
    const NO_FLAGS: [u8; 4] = [0, 0, 0, 0];
    /// Message produced when `INVALID_MAGIC_VALUE` is found where the magic is expected.
    const BAD_MAGIC_MESSAGE: &str =
        "DataInvalid => Bad magic value: [80, 70, 65, 0] should be [80, 70, 65, 49]";

    /// Concatenates the pieces of a blob-less Puffin file in on-disk order.
    fn puffin_file_bytes(
        header_magic: &[u8],
        footer_start_magic: &[u8],
        payload: &[u8],
        payload_length_bytes: &[u8],
        flags: &[u8],
        footer_end_magic: &[u8],
    ) -> Vec<u8> {
        [
            header_magic,
            footer_start_magic,
            payload,
            payload_length_bytes,
            flags,
            footer_end_magic,
        ]
        .concat()
    }

    /// Writes `slice` to `abc.puffin` inside `temp_dir` and returns it as an input file.
    async fn input_file_with_bytes(temp_dir: &TempDir, slice: &[u8]) -> InputFile {
        let file_io = FileIO::new_with_fs();

        let path_buf = temp_dir.path().join("abc.puffin");
        let temp_path = path_buf.to_str().unwrap();
        let output_file = file_io.new_output(temp_path).unwrap();

        output_file
            .write(Bytes::copy_from_slice(slice))
            .await
            .unwrap();

        output_file.to_input_file()
    }

    /// Writes a well-formed, uncompressed, blob-less file whose footer payload is `payload_str`.
    async fn input_file_with_payload(temp_dir: &TempDir, payload_str: &str) -> InputFile {
        let payload_bytes = payload_str.as_bytes();
        let bytes = puffin_file_bytes(
            &FileMetadata::MAGIC,
            &FileMetadata::MAGIC,
            payload_bytes,
            &u32::to_le_bytes(payload_bytes.len() as u32),
            &NO_FLAGS,
            &FileMetadata::MAGIC,
        );

        input_file_with_bytes(temp_dir, &bytes).await
    }

    /// Runs `FileMetadata::read` and returns the rendered error it is expected to produce.
    async fn read_error_message(input_file: &InputFile) -> String {
        FileMetadata::read(input_file)
            .await
            .unwrap_err()
            .to_string()
    }

    #[tokio::test]
    async fn test_file_starting_with_invalid_magic_returns_error() {
        let temp_dir = TempDir::new().unwrap();
        let bytes = puffin_file_bytes(
            &INVALID_MAGIC_VALUE,
            &FileMetadata::MAGIC,
            &empty_footer_payload_bytes(),
            &empty_footer_payload_bytes_length_bytes(),
            &NO_FLAGS,
            &FileMetadata::MAGIC,
        );
        let input_file = input_file_with_bytes(&temp_dir, &bytes).await;

        assert_eq!(read_error_message(&input_file).await, BAD_MAGIC_MESSAGE);
    }

    #[tokio::test]
    async fn test_file_with_invalid_magic_at_start_of_footer_returns_error() {
        let temp_dir = TempDir::new().unwrap();
        let bytes = puffin_file_bytes(
            &FileMetadata::MAGIC,
            &INVALID_MAGIC_VALUE,
            &empty_footer_payload_bytes(),
            &empty_footer_payload_bytes_length_bytes(),
            &NO_FLAGS,
            &FileMetadata::MAGIC,
        );
        let input_file = input_file_with_bytes(&temp_dir, &bytes).await;

        assert_eq!(read_error_message(&input_file).await, BAD_MAGIC_MESSAGE);
    }

    #[tokio::test]
    async fn test_file_ending_with_invalid_magic_returns_error() {
        let temp_dir = TempDir::new().unwrap();
        let bytes = puffin_file_bytes(
            &FileMetadata::MAGIC,
            &FileMetadata::MAGIC,
            &empty_footer_payload_bytes(),
            &empty_footer_payload_bytes_length_bytes(),
            &NO_FLAGS,
            &INVALID_MAGIC_VALUE,
        );
        let input_file = input_file_with_bytes(&temp_dir, &bytes).await;

        assert_eq!(read_error_message(&input_file).await, BAD_MAGIC_MESSAGE);
    }

    #[tokio::test]
    async fn test_encoded_payload_length_larger_than_actual_payload_length_returns_error() {
        let temp_dir = TempDir::new().unwrap();
        let bytes = puffin_file_bytes(
            &FileMetadata::MAGIC,
            &FileMetadata::MAGIC,
            &empty_footer_payload_bytes(),
            &u32::to_le_bytes(empty_footer_payload_bytes().len() as u32 + 1),
            &NO_FLAGS,
            &FileMetadata::MAGIC,
        );
        let input_file = input_file_with_bytes(&temp_dir, &bytes).await;

        assert_eq!(
            read_error_message(&input_file).await,
            "DataInvalid => Bad magic value: [49, 80, 70, 65] should be [80, 70, 65, 49]",
        );
    }

    #[tokio::test]
    async fn test_encoded_payload_length_smaller_than_actual_payload_length_returns_error() {
        let temp_dir = TempDir::new().unwrap();
        let bytes = puffin_file_bytes(
            &FileMetadata::MAGIC,
            &FileMetadata::MAGIC,
            &empty_footer_payload_bytes(),
            &u32::to_le_bytes(empty_footer_payload_bytes().len() as u32 - 1),
            &NO_FLAGS,
            &FileMetadata::MAGIC,
        );
        let input_file = input_file_with_bytes(&temp_dir, &bytes).await;

        assert_eq!(
            read_error_message(&input_file).await,
            "DataInvalid => Bad magic value: [70, 65, 49, 123] should be [80, 70, 65, 49]",
        );
    }

    #[tokio::test]
    async fn test_lz4_compressed_footer_returns_error() {
        let temp_dir = TempDir::new().unwrap();
        let bytes = puffin_file_bytes(
            &FileMetadata::MAGIC,
            &FileMetadata::MAGIC,
            &empty_footer_payload_bytes(),
            &empty_footer_payload_bytes_length_bytes(),
            &[0b00000001, 0, 0, 0],
            &FileMetadata::MAGIC,
        );
        let input_file = input_file_with_bytes(&temp_dir, &bytes).await;

        assert_eq!(
            read_error_message(&input_file).await,
            "FeatureUnsupported => LZ4 decompression is not supported currently",
        );
    }

    #[tokio::test]
    async fn test_unknown_byte_bit_combination_returns_error() {
        let temp_dir = TempDir::new().unwrap();
        let bytes = puffin_file_bytes(
            &FileMetadata::MAGIC,
            &FileMetadata::MAGIC,
            &empty_footer_payload_bytes(),
            &empty_footer_payload_bytes_length_bytes(),
            &[0b00000010, 0, 0, 0],
            &FileMetadata::MAGIC,
        );
        let input_file = input_file_with_bytes(&temp_dir, &bytes).await;

        assert_eq!(
            read_error_message(&input_file).await,
            "DataInvalid => Unknown flag byte 0 and bit 1 combination",
        );
    }

    #[tokio::test]
    async fn test_non_utf8_string_payload_returns_error() {
        let temp_dir = TempDir::new().unwrap();

        let payload_bytes: [u8; 4] = [0, 159, 146, 150];
        let bytes = puffin_file_bytes(
            &FileMetadata::MAGIC,
            &FileMetadata::MAGIC,
            &payload_bytes,
            &u32::to_le_bytes(payload_bytes.len() as u32),
            &NO_FLAGS,
            &FileMetadata::MAGIC,
        );
        let input_file = input_file_with_bytes(&temp_dir, &bytes).await;

        assert_eq!(
            read_error_message(&input_file).await,
            "DataInvalid => Footer is not a valid UTF-8 string, source: invalid utf-8 sequence of 1 bytes from index 1",
        );
    }

    #[tokio::test]
    async fn test_minimal_valid_file_returns_file_metadata() {
        let temp_dir = TempDir::new().unwrap();
        let bytes = puffin_file_bytes(
            &FileMetadata::MAGIC,
            &FileMetadata::MAGIC,
            &empty_footer_payload_bytes(),
            &empty_footer_payload_bytes_length_bytes(),
            &NO_FLAGS,
            &FileMetadata::MAGIC,
        );
        let input_file = input_file_with_bytes(&temp_dir, &bytes).await;

        assert_eq!(
            FileMetadata::read(&input_file).await.unwrap(),
            FileMetadata {
                blobs: vec![],
                properties: HashMap::new(),
            }
        );
    }

    #[tokio::test]
    async fn test_returns_file_metadata_property() {
        let temp_dir = TempDir::new().unwrap();

        let input_file = input_file_with_payload(
            &temp_dir,
            r#"{
                "blobs" : [ ],
                "properties" : {
                    "a property" : "a property value"
                }
            }"#,
        )
        .await;

        assert_eq!(
            FileMetadata::read(&input_file).await.unwrap(),
            FileMetadata {
                blobs: vec![],
                properties: HashMap::from([(
                    "a property".to_string(),
                    "a property value".to_string(),
                )]),
            }
        );
    }

    #[tokio::test]
    async fn test_returns_file_metadata_properties() {
        let temp_dir = TempDir::new().unwrap();

        let input_file = input_file_with_payload(
            &temp_dir,
            r#"{
                "blobs" : [ ],
                "properties" : {
                    "a property" : "a property value",
                    "another one": "also with value"
                }
            }"#,
        )
        .await;

        assert_eq!(
            FileMetadata::read(&input_file).await.unwrap(),
            FileMetadata {
                blobs: vec![],
                properties: HashMap::from([
                    ("a property".to_string(), "a property value".to_string()),
                    ("another one".to_string(), "also with value".to_string()),
                ]),
            }
        );
    }

    // The expected line/column below depends on the exact whitespace of the raw string.
    #[tokio::test]
    async fn test_returns_error_if_blobs_field_is_missing() {
        let temp_dir = TempDir::new().unwrap();

        let input_file = input_file_with_payload(
            &temp_dir,
            r#"{
                "properties" : {}
            }"#,
        )
        .await;

        assert_eq!(
            read_error_message(&input_file).await,
            "DataInvalid => Given string is not valid JSON, source: missing field `blobs` at line 3 column 13",
        );
    }

    // The expected line/column below depends on the exact whitespace of the raw string.
    #[tokio::test]
    async fn test_returns_error_if_blobs_field_is_bad() {
        let temp_dir = TempDir::new().unwrap();

        let input_file = input_file_with_payload(
            &temp_dir,
            r#"{
                "blobs" : {}
            }"#,
        )
        .await;

        assert_eq!(
            read_error_message(&input_file).await,
            "DataInvalid => Given string is not valid JSON, source: invalid type: map, expected a sequence at line 2 column 26",
        );
    }

    #[tokio::test]
    async fn test_returns_blobs_metadatas() {
        let temp_dir = TempDir::new().unwrap();

        let input_file = input_file_with_payload(
            &temp_dir,
            r#"{
                "blobs" : [
                    {
                        "type" : "type-a",
                        "fields" : [ 1 ],
                        "snapshot-id" : 14,
                        "sequence-number" : 3,
                        "offset" : 4,
                        "length" : 16
                    },
                    {
                        "type" : "type-bbb",
                        "fields" : [ 2, 3, 4 ],
                        "snapshot-id" : 77,
                        "sequence-number" : 4,
                        "offset" : 21474836470000,
                        "length" : 79834
                    }
                ]
            }"#,
        )
        .await;

        assert_eq!(
            FileMetadata::read(&input_file).await.unwrap(),
            FileMetadata {
                blobs: vec![
                    BlobMetadata {
                        r#type: "type-a".to_string(),
                        fields: vec![1],
                        snapshot_id: 14,
                        sequence_number: 3,
                        offset: 4,
                        length: 16,
                        compression_codec: CompressionCodec::None,
                        properties: HashMap::new(),
                    },
                    BlobMetadata {
                        r#type: "type-bbb".to_string(),
                        fields: vec![2, 3, 4],
                        snapshot_id: 77,
                        sequence_number: 4,
                        offset: 21474836470000,
                        length: 79834,
                        compression_codec: CompressionCodec::None,
                        properties: HashMap::new(),
                    },
                ],
                properties: HashMap::new(),
            }
        );
    }

    #[tokio::test]
    async fn test_returns_properties_in_blob_metadata() {
        let temp_dir = TempDir::new().unwrap();

        let input_file = input_file_with_payload(
            &temp_dir,
            r#"{
                "blobs" : [
                    {
                        "type" : "type-a",
                        "fields" : [ 1 ],
                        "snapshot-id" : 14,
                        "sequence-number" : 3,
                        "offset" : 4,
                        "length" : 16,
                        "properties" : {
                            "some key" : "some value"
                        }
                    }
                ]
            }"#,
        )
        .await;

        assert_eq!(
            FileMetadata::read(&input_file).await.unwrap(),
            FileMetadata {
                blobs: vec![BlobMetadata {
                    r#type: "type-a".to_string(),
                    fields: vec![1],
                    snapshot_id: 14,
                    sequence_number: 3,
                    offset: 4,
                    length: 16,
                    compression_codec: CompressionCodec::None,
                    properties: HashMap::from([(
                        "some key".to_string(),
                        "some value".to_string(),
                    )]),
                }],
                properties: HashMap::new(),
            }
        );
    }

    // The expected line/column below depends on the exact whitespace of the raw string.
    #[tokio::test]
    async fn test_returns_error_if_blobs_fields_value_is_outside_i32_range() {
        let temp_dir = TempDir::new().unwrap();

        let out_of_i32_range_number: i64 = i32::MAX as i64 + 1;

        let input_file = input_file_with_payload(
            &temp_dir,
            &format!(
                r#"{{
                    "blobs" : [
                        {{
                            "type" : "type-a",
                            "fields" : [ {out_of_i32_range_number} ],
                            "snapshot-id" : 14,
                            "sequence-number" : 3,
                            "offset" : 4,
                            "length" : 16
                        }}
                    ]
                }}"#
            ),
        )
        .await;

        assert_eq!(
            read_error_message(&input_file).await,
            format!(
                "DataInvalid => Given string is not valid JSON, source: invalid value: integer `{out_of_i32_range_number}`, expected i32 at line 5 column 51"
            ),
        );
    }

    #[tokio::test]
    async fn test_returns_errors_if_footer_payload_is_not_encoded_in_json_format() {
        let temp_dir = TempDir::new().unwrap();

        let input_file = input_file_with_payload(&temp_dir, r#""blobs" = []"#).await;

        assert_eq!(
            read_error_message(&input_file).await,
            "DataInvalid => Given string is not valid JSON, source: invalid type: string \"blobs\", expected struct FileMetadata at line 1 column 7",
        );
    }

    #[tokio::test]
    async fn test_read_file_metadata_of_uncompressed_empty_file() {
        let input_file = java_empty_uncompressed_input_file();

        let file_metadata = FileMetadata::read(&input_file).await.unwrap();
        assert_eq!(file_metadata, empty_footer_payload());
    }

    #[tokio::test]
    async fn test_read_file_metadata_of_uncompressed_metric_data() {
        let input_file = java_uncompressed_metric_input_file();

        let file_metadata = FileMetadata::read(&input_file).await.unwrap();
        assert_eq!(file_metadata, uncompressed_metric_file_metadata());
    }

    // Exercises the plain `read` path; the prefetching variant has its own test below.
    #[tokio::test]
    async fn test_read_file_metadata_of_zstd_compressed_metric_data() {
        let input_file = java_zstd_compressed_metric_input_file();

        let file_metadata = FileMetadata::read(&input_file).await.unwrap();
        assert_eq!(file_metadata, zstd_compressed_metric_file_metadata());
    }

    #[tokio::test]
    async fn test_read_file_metadata_of_empty_file_with_prefetching() {
        let input_file = java_empty_uncompressed_input_file();
        let file_metadata = FileMetadata::read_with_prefetch(&input_file, 64)
            .await
            .unwrap();

        assert_eq!(file_metadata, empty_footer_payload());
    }

    #[tokio::test]
    async fn test_read_file_metadata_of_uncompressed_metric_data_with_prefetching() {
        let input_file = java_uncompressed_metric_input_file();
        let file_metadata = FileMetadata::read_with_prefetch(&input_file, 64)
            .await
            .unwrap();

        assert_eq!(file_metadata, uncompressed_metric_file_metadata());
    }

    #[tokio::test]
    async fn test_read_file_metadata_of_zstd_compressed_metric_data_with_prefetching() {
        let input_file = java_zstd_compressed_metric_input_file();
        let file_metadata = FileMetadata::read_with_prefetch(&input_file, 64)
            .await
            .unwrap();

        assert_eq!(file_metadata, zstd_compressed_metric_file_metadata());
    }

    #[tokio::test]
    async fn test_read_with_incorrect_header_magic() {
        let temp_dir = TempDir::new().unwrap();

        let prefetch_hint: u8 = 64;
        // Four zero bytes where the header magic belongs, followed by `prefetch_hint` zero
        // filler bytes, then a valid footer with an empty payload.
        let zeroed_header_and_filler = vec![0u8; 4 + usize::from(prefetch_hint)];
        let bytes = puffin_file_bytes(
            &zeroed_header_and_filler,
            &FileMetadata::MAGIC,
            &empty_footer_payload_bytes(),
            &empty_footer_payload_bytes_length_bytes(),
            &NO_FLAGS,
            &FileMetadata::MAGIC,
        );

        let input_file = input_file_with_bytes(&temp_dir, &bytes).await;

        assert_eq!(
            FileMetadata::read(&input_file).await.unwrap_err().kind(),
            ErrorKind::DataInvalid,
        );
        assert_eq!(
            FileMetadata::read_with_prefetch(&input_file, prefetch_hint)
                .await
                .unwrap_err()
                .kind(),
            ErrorKind::DataInvalid,
        );
    }

    #[tokio::test]
    async fn test_gzip_compression_allowed_in_metadata() {
        let temp_dir = TempDir::new().unwrap();

        let payload = r#"{
            "blobs": [
                {
                    "type": "test-type",
                    "fields": [1],
                    "snapshot-id": 1,
                    "sequence-number": 1,
                    "offset": 4,
                    "length": 10,
                    "compression-codec": "gzip"
                }
            ]
        }"#;

        let input_file = input_file_with_payload(&temp_dir, payload).await;

        // Reading only parses the footer, so a gzip codec on a blob must be accepted here.
        let metadata = FileMetadata::read(&input_file).await.unwrap();
        assert_eq!(metadata.blobs.len(), 1);
        assert_eq!(
            metadata.blobs[0].compression_codec,
            CompressionCodec::gzip_default()
        );
    }
}