1use std::collections::{HashMap, HashSet};
19
20use bytes::Bytes;
21use serde::{Deserialize, Serialize};
22
23use crate::compression::CompressionCodec;
24use crate::io::{FileRead, InputFile};
25use crate::{Error, ErrorKind, Result};
26
/// Well-known file property key (`"created-by"`) for recording the writer of the file.
pub const CREATED_BY_PROPERTY: &str = "created-by";
30
/// Metadata describing a single blob stored in a Puffin file.
///
/// Serialized to/from the kebab-case JSON representation used in the footer payload.
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Clone)]
#[serde(rename_all = "kebab-case")]
pub struct BlobMetadata {
    // Blob type identifier (`type` is a Rust keyword, hence the raw identifier).
    pub(crate) r#type: String,
    // Field IDs associated with this blob.
    pub(crate) fields: Vec<i32>,
    // Snapshot ID associated with this blob.
    pub(crate) snapshot_id: i64,
    // Sequence number associated with this blob.
    pub(crate) sequence_number: i64,
    // Byte offset of the blob within the file.
    pub(crate) offset: u64,
    // Length of the blob in bytes.
    pub(crate) length: u64,
    // Compression applied to the blob data; omitted from JSON when `None`.
    #[serde(skip_serializing_if = "CompressionCodec::is_none")]
    #[serde(default)]
    pub(crate) compression_codec: CompressionCodec,
    // Arbitrary key-value properties; omitted from JSON when empty.
    #[serde(skip_serializing_if = "HashMap::is_empty")]
    #[serde(default)]
    pub(crate) properties: HashMap<String, String>,
}
49
50impl BlobMetadata {
51 #[inline]
52 pub fn blob_type(&self) -> &str {
54 &self.r#type
55 }
56
57 #[inline]
58 pub fn fields(&self) -> &[i32] {
60 &self.fields
61 }
62
63 #[inline]
64 pub fn snapshot_id(&self) -> i64 {
66 self.snapshot_id
67 }
68
69 #[inline]
70 pub fn sequence_number(&self) -> i64 {
72 self.sequence_number
73 }
74
75 #[inline]
76 pub fn offset(&self) -> u64 {
78 self.offset
79 }
80
81 #[inline]
82 pub fn length(&self) -> u64 {
84 self.length
85 }
86
87 #[inline]
88 pub fn compression_codec(&self) -> CompressionCodec {
90 self.compression_codec
91 }
92
93 #[inline]
94 pub fn properties(&self) -> &HashMap<String, String> {
96 &self.properties
97 }
98}
99
/// A bit flag within the footer struct's 4-byte flags section.
///
/// The discriminant encodes the flag's position: byte index is `value / 8`,
/// bit index is `value % 8`.
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
pub(crate) enum Flag {
    /// Set when the footer payload is compressed (treated as LZ4 by the reader).
    FooterPayloadCompressed = 0,
}
104
105impl Flag {
106 pub(crate) fn byte_idx(self) -> u8 {
107 (self as u8) / 8
108 }
109
110 pub(crate) fn bit_idx(self) -> u8 {
111 (self as u8) % 8
112 }
113
114 fn matches(self, byte_idx: u8, bit_idx: u8) -> bool {
115 self.byte_idx() == byte_idx && self.bit_idx() == bit_idx
116 }
117
118 fn from(byte_idx: u8, bit_idx: u8) -> Result<Flag> {
119 if Flag::FooterPayloadCompressed.matches(byte_idx, bit_idx) {
120 Ok(Flag::FooterPayloadCompressed)
121 } else {
122 Err(Error::new(
123 ErrorKind::DataInvalid,
124 format!("Unknown flag byte {byte_idx} and bit {bit_idx} combination"),
125 ))
126 }
127 }
128}
129
/// Metadata stored in a Puffin file footer: the list of blobs the file
/// contains plus optional file-level properties.
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Clone)]
pub struct FileMetadata {
    // Metadata for each blob in the file.
    pub(crate) blobs: Vec<BlobMetadata>,
    // File-level key-value properties; omitted from JSON when empty.
    #[serde(skip_serializing_if = "HashMap::is_empty")]
    #[serde(default)]
    pub(crate) properties: HashMap<String, String>,
}
140
141impl FileMetadata {
142 pub(crate) const MAGIC_LENGTH: u8 = 4;
143 pub(crate) const MAGIC: [u8; FileMetadata::MAGIC_LENGTH as usize] = [0x50, 0x46, 0x41, 0x31];
144
145 const FOOTER_STRUCT_PAYLOAD_LENGTH_OFFSET: u8 = 0;
168 const FOOTER_STRUCT_PAYLOAD_LENGTH_LENGTH: u8 = 4;
169 const FOOTER_STRUCT_FLAGS_OFFSET: u8 = FileMetadata::FOOTER_STRUCT_PAYLOAD_LENGTH_OFFSET
170 + FileMetadata::FOOTER_STRUCT_PAYLOAD_LENGTH_LENGTH;
171 pub(crate) const FOOTER_STRUCT_FLAGS_LENGTH: u8 = 4;
172 const FOOTER_STRUCT_MAGIC_OFFSET: u8 =
173 FileMetadata::FOOTER_STRUCT_FLAGS_OFFSET + FileMetadata::FOOTER_STRUCT_FLAGS_LENGTH;
174 pub(crate) const FOOTER_STRUCT_LENGTH: u8 =
175 FileMetadata::FOOTER_STRUCT_MAGIC_OFFSET + FileMetadata::MAGIC_LENGTH;
176
177 pub fn new(blobs: Vec<BlobMetadata>, properties: HashMap<String, String>) -> Self {
179 Self { blobs, properties }
180 }
181
182 fn check_magic(bytes: &[u8]) -> Result<()> {
183 if bytes == FileMetadata::MAGIC {
184 Ok(())
185 } else {
186 Err(Error::new(
187 ErrorKind::DataInvalid,
188 format!(
189 "Bad magic value: {:?} should be {:?}",
190 bytes,
191 FileMetadata::MAGIC
192 ),
193 ))
194 }
195 }
196
197 async fn read_footer_payload_length(
198 file_read: &dyn FileRead,
199 input_file_length: u64,
200 ) -> Result<u32> {
201 let start = input_file_length - FileMetadata::FOOTER_STRUCT_LENGTH as u64;
202 let end = start + FileMetadata::FOOTER_STRUCT_PAYLOAD_LENGTH_LENGTH as u64;
203 let footer_payload_length_bytes = file_read.read(start..end).await?;
204 let mut buf = [0; 4];
205 buf.copy_from_slice(&footer_payload_length_bytes);
206 let footer_payload_length = u32::from_le_bytes(buf);
207 Ok(footer_payload_length)
208 }
209
210 async fn read_footer_bytes(
211 file_read: &dyn FileRead,
212 input_file_length: u64,
213 footer_payload_length: u32,
214 ) -> Result<Bytes> {
215 let footer_length = footer_payload_length as u64
216 + FileMetadata::FOOTER_STRUCT_LENGTH as u64
217 + FileMetadata::MAGIC_LENGTH as u64;
218 let start = input_file_length - footer_length;
219 let end = input_file_length;
220 file_read.read(start..end).await
221 }
222
223 fn decode_flags(footer_bytes: &[u8]) -> Result<HashSet<Flag>> {
224 let mut flags = HashSet::new();
225
226 for byte_idx in 0..FileMetadata::FOOTER_STRUCT_FLAGS_LENGTH {
227 let byte_offset = footer_bytes.len()
228 - FileMetadata::MAGIC_LENGTH as usize
229 - FileMetadata::FOOTER_STRUCT_FLAGS_LENGTH as usize
230 + byte_idx as usize;
231
232 let flag_byte = *footer_bytes.get(byte_offset).ok_or_else(|| {
233 Error::new(ErrorKind::DataInvalid, "Index range is out of bounds.")
234 })?;
235
236 for bit_idx in 0..8 {
237 if ((flag_byte >> bit_idx) & 1) != 0 {
238 let flag = Flag::from(byte_idx, bit_idx)?;
239 flags.insert(flag);
240 }
241 }
242 }
243
244 Ok(flags)
245 }
246
247 fn extract_footer_payload_as_str(
248 footer_bytes: &[u8],
249 footer_payload_length: u32,
250 ) -> Result<String> {
251 let flags = FileMetadata::decode_flags(footer_bytes)?;
252 let footer_compression_codec = if flags.contains(&Flag::FooterPayloadCompressed) {
253 CompressionCodec::Lz4
254 } else {
255 CompressionCodec::None
256 };
257
258 let start_offset = FileMetadata::MAGIC_LENGTH as usize;
259 let end_offset =
260 FileMetadata::MAGIC_LENGTH as usize + usize::try_from(footer_payload_length)?;
261 let footer_payload_bytes = footer_bytes
262 .get(start_offset..end_offset)
263 .ok_or_else(|| Error::new(ErrorKind::DataInvalid, "Index range is out of bounds."))?;
264 let decompressed_footer_payload_bytes =
265 footer_compression_codec.decompress(footer_payload_bytes.into())?;
266
267 String::from_utf8(decompressed_footer_payload_bytes).map_err(|src| {
268 Error::new(ErrorKind::DataInvalid, "Footer is not a valid UTF-8 string")
269 .with_source(src)
270 })
271 }
272
273 fn from_json_str(string: &str) -> Result<FileMetadata> {
274 serde_json::from_str::<FileMetadata>(string).map_err(|src| {
275 Error::new(ErrorKind::DataInvalid, "Given string is not valid JSON").with_source(src)
276 })
277 }
278
279 pub(crate) async fn read(input_file: &InputFile) -> Result<FileMetadata> {
281 let file_read = input_file.reader().await?;
282
283 let first_four_bytes = file_read.read(0..FileMetadata::MAGIC_LENGTH.into()).await?;
284 FileMetadata::check_magic(&first_four_bytes)?;
285
286 let input_file_length = input_file.metadata().await?.size;
287 let footer_payload_length =
288 FileMetadata::read_footer_payload_length(file_read.as_ref(), input_file_length).await?;
289 let footer_bytes = FileMetadata::read_footer_bytes(
290 file_read.as_ref(),
291 input_file_length,
292 footer_payload_length,
293 )
294 .await?;
295
296 let magic_length = FileMetadata::MAGIC_LENGTH as usize;
297 FileMetadata::check_magic(&footer_bytes[..magic_length])?;
299 FileMetadata::check_magic(&footer_bytes[footer_bytes.len() - magic_length..])?;
301
302 let footer_payload_str =
303 FileMetadata::extract_footer_payload_as_str(&footer_bytes, footer_payload_length)?;
304
305 FileMetadata::from_json_str(&footer_payload_str)
306 }
307
308 #[allow(dead_code)]
314 pub(crate) async fn read_with_prefetch(
315 input_file: &InputFile,
316 prefetch_hint: u8,
317 ) -> Result<FileMetadata> {
318 if prefetch_hint > 16 {
319 let input_file_length = input_file.metadata().await?.size;
320 let file_read = input_file.reader().await?;
321
322 if prefetch_hint as u64 > input_file_length {
324 return FileMetadata::read(input_file).await;
325 }
326
327 let start = input_file_length - prefetch_hint as u64;
329 let end = input_file_length;
330 let footer_bytes = file_read.read(start..end).await?;
331
332 let payload_length_start =
333 footer_bytes.len() - (FileMetadata::FOOTER_STRUCT_LENGTH as usize);
334 let payload_length_end =
335 payload_length_start + (FileMetadata::FOOTER_STRUCT_PAYLOAD_LENGTH_LENGTH as usize);
336 let payload_length_bytes = &footer_bytes[payload_length_start..payload_length_end];
337
338 let mut buf = [0; 4];
339 buf.copy_from_slice(payload_length_bytes);
340 let footer_payload_length = u32::from_le_bytes(buf);
341
342 let footer_length = (footer_payload_length as usize)
346 + FileMetadata::FOOTER_STRUCT_LENGTH as usize
347 + FileMetadata::MAGIC_LENGTH as usize;
348 if footer_length > prefetch_hint as usize {
349 return FileMetadata::read(input_file).await;
350 }
351
352 let footer_start = footer_bytes.len() - footer_length;
354 let footer_end = footer_bytes.len();
355 let footer_bytes = &footer_bytes[footer_start..footer_end];
356
357 let magic_length = FileMetadata::MAGIC_LENGTH as usize;
358 FileMetadata::check_magic(&footer_bytes[..magic_length])?;
360 FileMetadata::check_magic(&footer_bytes[footer_bytes.len() - magic_length..])?;
362
363 let footer_payload_str =
364 FileMetadata::extract_footer_payload_as_str(footer_bytes, footer_payload_length)?;
365 return FileMetadata::from_json_str(&footer_payload_str);
366 }
367
368 FileMetadata::read(input_file).await
369 }
370
371 #[inline]
372 pub fn blobs(&self) -> &[BlobMetadata] {
374 &self.blobs
375 }
376
377 #[inline]
378 pub fn properties(&self) -> &HashMap<String, String> {
380 &self.properties
381 }
382}
383
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use bytes::Bytes;
    use tempfile::TempDir;

    use crate::io::{FileIOBuilder, InputFile};
    use crate::puffin::metadata::{BlobMetadata, CompressionCodec, FileMetadata};
    use crate::puffin::test_utils::{
        empty_footer_payload, empty_footer_payload_bytes, empty_footer_payload_bytes_length_bytes,
        java_empty_uncompressed_input_file, java_uncompressed_metric_input_file,
        java_zstd_compressed_metric_input_file, uncompressed_metric_file_metadata,
        zstd_compressed_metric_file_metadata,
    };

    // Matches `FileMetadata::MAGIC` except for the last byte.
    const INVALID_MAGIC_VALUE: [u8; 4] = [80, 70, 65, 0];

    /// Writes `slice` to a fresh file under `temp_dir` and returns it as an
    /// `InputFile`.
    async fn input_file_with_bytes(temp_dir: &TempDir, slice: &[u8]) -> InputFile {
        let file_io = FileIOBuilder::new_fs_io().build().unwrap();

        let path_buf = temp_dir.path().join("abc.puffin");
        let temp_path = path_buf.to_str().unwrap();
        let output_file = file_io.new_output(temp_path).unwrap();

        output_file
            .write(Bytes::copy_from_slice(slice))
            .await
            .unwrap();

        output_file.to_input_file()
    }

    /// Builds a minimal, well-formed Puffin file around `payload_str`:
    /// file magic | footer magic | payload | payload length (LE) | flags (0) | magic.
    async fn input_file_with_payload(temp_dir: &TempDir, payload_str: &str) -> InputFile {
        let payload_bytes = payload_str.as_bytes();

        let mut bytes = vec![];
        bytes.extend(FileMetadata::MAGIC.to_vec());
        bytes.extend(FileMetadata::MAGIC.to_vec());
        bytes.extend(payload_bytes);
        bytes.extend(u32::to_le_bytes(payload_bytes.len() as u32));
        bytes.extend(vec![0, 0, 0, 0]);
        bytes.extend(FileMetadata::MAGIC);

        input_file_with_bytes(temp_dir, &bytes).await
    }

    #[tokio::test]
    async fn test_file_starting_with_invalid_magic_returns_error() {
        let temp_dir = TempDir::new().unwrap();

        // Corrupt the magic at the start of the file.
        let mut bytes = vec![];
        bytes.extend(INVALID_MAGIC_VALUE.to_vec());
        bytes.extend(FileMetadata::MAGIC.to_vec());
        bytes.extend(empty_footer_payload_bytes());
        bytes.extend(empty_footer_payload_bytes_length_bytes());
        bytes.extend(vec![0, 0, 0, 0]);
        bytes.extend(FileMetadata::MAGIC);

        let input_file = input_file_with_bytes(&temp_dir, &bytes).await;

        assert_eq!(
            FileMetadata::read(&input_file)
                .await
                .unwrap_err()
                .to_string(),
            "DataInvalid => Bad magic value: [80, 70, 65, 0] should be [80, 70, 65, 49]",
        )
    }

    #[tokio::test]
    async fn test_file_with_invalid_magic_at_start_of_footer_returns_error() {
        let temp_dir = TempDir::new().unwrap();

        // Corrupt the magic at the start of the footer.
        let mut bytes = vec![];
        bytes.extend(FileMetadata::MAGIC.to_vec());
        bytes.extend(INVALID_MAGIC_VALUE.to_vec());
        bytes.extend(empty_footer_payload_bytes());
        bytes.extend(empty_footer_payload_bytes_length_bytes());
        bytes.extend(vec![0, 0, 0, 0]);
        bytes.extend(FileMetadata::MAGIC);

        let input_file = input_file_with_bytes(&temp_dir, &bytes).await;

        assert_eq!(
            FileMetadata::read(&input_file)
                .await
                .unwrap_err()
                .to_string(),
            "DataInvalid => Bad magic value: [80, 70, 65, 0] should be [80, 70, 65, 49]",
        )
    }

    #[tokio::test]
    async fn test_file_ending_with_invalid_magic_returns_error() {
        let temp_dir = TempDir::new().unwrap();

        // Corrupt the magic at the very end of the file.
        let mut bytes = vec![];
        bytes.extend(FileMetadata::MAGIC.to_vec());
        bytes.extend(FileMetadata::MAGIC.to_vec());
        bytes.extend(empty_footer_payload_bytes());
        bytes.extend(empty_footer_payload_bytes_length_bytes());
        bytes.extend(vec![0, 0, 0, 0]);
        bytes.extend(INVALID_MAGIC_VALUE);

        let input_file = input_file_with_bytes(&temp_dir, &bytes).await;

        assert_eq!(
            FileMetadata::read(&input_file)
                .await
                .unwrap_err()
                .to_string(),
            "DataInvalid => Bad magic value: [80, 70, 65, 0] should be [80, 70, 65, 49]",
        )
    }

    #[tokio::test]
    async fn test_encoded_payload_length_larger_than_actual_payload_length_returns_error() {
        let temp_dir = TempDir::new().unwrap();

        // Over-stated payload length shifts the footer window one byte left,
        // so the footer's leading magic check fails.
        let mut bytes = vec![];
        bytes.extend(FileMetadata::MAGIC.to_vec());
        bytes.extend(FileMetadata::MAGIC.to_vec());
        bytes.extend(empty_footer_payload_bytes());
        bytes.extend(u32::to_le_bytes(
            empty_footer_payload_bytes().len() as u32 + 1,
        ));
        bytes.extend(vec![0, 0, 0, 0]);
        bytes.extend(FileMetadata::MAGIC.to_vec());

        let input_file = input_file_with_bytes(&temp_dir, &bytes).await;

        assert_eq!(
            FileMetadata::read(&input_file)
                .await
                .unwrap_err()
                .to_string(),
            "DataInvalid => Bad magic value: [49, 80, 70, 65] should be [80, 70, 65, 49]",
        )
    }

    #[tokio::test]
    async fn test_encoded_payload_length_smaller_than_actual_payload_length_returns_error() {
        let temp_dir = TempDir::new().unwrap();

        // Under-stated payload length shifts the footer window one byte right.
        let mut bytes = vec![];
        bytes.extend(FileMetadata::MAGIC.to_vec());
        bytes.extend(FileMetadata::MAGIC.to_vec());
        bytes.extend(empty_footer_payload_bytes());
        bytes.extend(u32::to_le_bytes(
            empty_footer_payload_bytes().len() as u32 - 1,
        ));
        bytes.extend(vec![0, 0, 0, 0]);
        bytes.extend(FileMetadata::MAGIC.to_vec());

        let input_file = input_file_with_bytes(&temp_dir, &bytes).await;

        assert_eq!(
            FileMetadata::read(&input_file)
                .await
                .unwrap_err()
                .to_string(),
            "DataInvalid => Bad magic value: [70, 65, 49, 123] should be [80, 70, 65, 49]",
        )
    }

    #[tokio::test]
    async fn test_lz4_compressed_footer_returns_error() {
        let temp_dir = TempDir::new().unwrap();

        // Set the footer-payload-compressed flag (bit 0 of flags byte 0).
        let mut bytes = vec![];
        bytes.extend(FileMetadata::MAGIC.to_vec());
        bytes.extend(FileMetadata::MAGIC.to_vec());
        bytes.extend(empty_footer_payload_bytes());
        bytes.extend(empty_footer_payload_bytes_length_bytes());
        bytes.extend(vec![0b00000001, 0, 0, 0]);
        bytes.extend(FileMetadata::MAGIC.to_vec());

        let input_file = input_file_with_bytes(&temp_dir, &bytes).await;

        assert_eq!(
            FileMetadata::read(&input_file)
                .await
                .unwrap_err()
                .to_string(),
            "FeatureUnsupported => LZ4 decompression is not supported currently",
        )
    }

    #[tokio::test]
    async fn test_unknown_byte_bit_combination_returns_error() {
        let temp_dir = TempDir::new().unwrap();

        // Set an undefined flag bit (byte 0, bit 1).
        let mut bytes = vec![];
        bytes.extend(FileMetadata::MAGIC.to_vec());
        bytes.extend(FileMetadata::MAGIC.to_vec());
        bytes.extend(empty_footer_payload_bytes());
        bytes.extend(empty_footer_payload_bytes_length_bytes());
        bytes.extend(vec![0b00000010, 0, 0, 0]);
        bytes.extend(FileMetadata::MAGIC.to_vec());

        let input_file = input_file_with_bytes(&temp_dir, &bytes).await;

        assert_eq!(
            FileMetadata::read(&input_file)
                .await
                .unwrap_err()
                .to_string(),
            "DataInvalid => Unknown flag byte 0 and bit 1 combination",
        )
    }

    #[tokio::test]
    async fn test_non_utf8_string_payload_returns_error() {
        let temp_dir = TempDir::new().unwrap();

        // [0, 159, 146, 150] is not valid UTF-8.
        let payload_bytes: [u8; 4] = [0, 159, 146, 150];
        let payload_bytes_length_bytes: [u8; 4] = u32::to_le_bytes(payload_bytes.len() as u32);

        let mut bytes = vec![];
        bytes.extend(FileMetadata::MAGIC.to_vec());
        bytes.extend(FileMetadata::MAGIC.to_vec());
        bytes.extend(payload_bytes);
        bytes.extend(payload_bytes_length_bytes);
        bytes.extend(vec![0, 0, 0, 0]);
        bytes.extend(FileMetadata::MAGIC.to_vec());

        let input_file = input_file_with_bytes(&temp_dir, &bytes).await;

        assert_eq!(
            FileMetadata::read(&input_file)
                .await
                .unwrap_err()
                .to_string(),
            "DataInvalid => Footer is not a valid UTF-8 string, source: invalid utf-8 sequence of 1 bytes from index 1",
        )
    }

    #[tokio::test]
    async fn test_minimal_valid_file_returns_file_metadata() {
        let temp_dir = TempDir::new().unwrap();

        let mut bytes = vec![];
        bytes.extend(FileMetadata::MAGIC.to_vec());
        bytes.extend(FileMetadata::MAGIC.to_vec());
        bytes.extend(empty_footer_payload_bytes());
        bytes.extend(empty_footer_payload_bytes_length_bytes());
        bytes.extend(vec![0, 0, 0, 0]);
        bytes.extend(FileMetadata::MAGIC);

        let input_file = input_file_with_bytes(&temp_dir, &bytes).await;

        assert_eq!(
            FileMetadata::read(&input_file).await.unwrap(),
            FileMetadata {
                blobs: vec![],
                properties: HashMap::new(),
            }
        )
    }

    #[tokio::test]
    async fn test_returns_file_metadata_property() {
        let temp_dir = TempDir::new().unwrap();

        let input_file = input_file_with_payload(
            &temp_dir,
            r#"{
                "blobs" : [ ],
                "properties" : {
                    "a property" : "a property value"
                }
            }"#,
        )
        .await;

        assert_eq!(
            FileMetadata::read(&input_file).await.unwrap(),
            FileMetadata {
                blobs: vec![],
                properties: {
                    let mut map = HashMap::new();
                    map.insert("a property".to_string(), "a property value".to_string());
                    map
                },
            }
        )
    }

    #[tokio::test]
    async fn test_returns_file_metadata_properties() {
        let temp_dir = TempDir::new().unwrap();

        let input_file = input_file_with_payload(
            &temp_dir,
            r#"{
                "blobs" : [ ],
                "properties" : {
                    "a property" : "a property value",
                    "another one": "also with value"
                }
            }"#,
        )
        .await;

        assert_eq!(
            FileMetadata::read(&input_file).await.unwrap(),
            FileMetadata {
                blobs: vec![],
                properties: {
                    let mut map = HashMap::new();
                    map.insert("a property".to_string(), "a property value".to_string());
                    map.insert("another one".to_string(), "also with value".to_string());
                    map
                },
            }
        )
    }

    #[tokio::test]
    async fn test_returns_error_if_blobs_field_is_missing() {
        let temp_dir = TempDir::new().unwrap();

        let input_file = input_file_with_payload(
            &temp_dir,
            r#"{
                "properties" : {}
            }"#,
        )
        .await;

        assert_eq!(
            FileMetadata::read(&input_file)
                .await
                .unwrap_err()
                .to_string(),
            format!(
                "DataInvalid => Given string is not valid JSON, source: missing field `blobs` at line 3 column 13"
            ),
        )
    }

    #[tokio::test]
    async fn test_returns_error_if_blobs_field_is_bad() {
        let temp_dir = TempDir::new().unwrap();

        let input_file = input_file_with_payload(
            &temp_dir,
            r#"{
                "blobs" : {}
            }"#,
        )
        .await;

        assert_eq!(
            FileMetadata::read(&input_file)
                .await
                .unwrap_err()
                .to_string(),
            format!(
                "DataInvalid => Given string is not valid JSON, source: invalid type: map, expected a sequence at line 2 column 26"
            ),
        )
    }

    #[tokio::test]
    async fn test_returns_blobs_metadatas() {
        let temp_dir = TempDir::new().unwrap();

        let input_file = input_file_with_payload(
            &temp_dir,
            r#"{
                "blobs" : [
                    {
                        "type" : "type-a",
                        "fields" : [ 1 ],
                        "snapshot-id" : 14,
                        "sequence-number" : 3,
                        "offset" : 4,
                        "length" : 16
                    },
                    {
                        "type" : "type-bbb",
                        "fields" : [ 2, 3, 4 ],
                        "snapshot-id" : 77,
                        "sequence-number" : 4,
                        "offset" : 21474836470000,
                        "length" : 79834
                    }
                ]
            }"#,
        )
        .await;

        assert_eq!(
            FileMetadata::read(&input_file).await.unwrap(),
            FileMetadata {
                blobs: vec![
                    BlobMetadata {
                        r#type: "type-a".to_string(),
                        fields: vec![1],
                        snapshot_id: 14,
                        sequence_number: 3,
                        offset: 4,
                        length: 16,
                        compression_codec: CompressionCodec::None,
                        properties: HashMap::new(),
                    },
                    BlobMetadata {
                        r#type: "type-bbb".to_string(),
                        fields: vec![2, 3, 4],
                        snapshot_id: 77,
                        sequence_number: 4,
                        offset: 21474836470000,
                        length: 79834,
                        compression_codec: CompressionCodec::None,
                        properties: HashMap::new(),
                    },
                ],
                properties: HashMap::new(),
            }
        )
    }

    #[tokio::test]
    async fn test_returns_properties_in_blob_metadata() {
        let temp_dir = TempDir::new().unwrap();

        let input_file = input_file_with_payload(
            &temp_dir,
            r#"{
                "blobs" : [
                    {
                        "type" : "type-a",
                        "fields" : [ 1 ],
                        "snapshot-id" : 14,
                        "sequence-number" : 3,
                        "offset" : 4,
                        "length" : 16,
                        "properties" : {
                            "some key" : "some value"
                        }
                    }
                ]
            }"#,
        )
        .await;

        assert_eq!(
            FileMetadata::read(&input_file).await.unwrap(),
            FileMetadata {
                blobs: vec![BlobMetadata {
                    r#type: "type-a".to_string(),
                    fields: vec![1],
                    snapshot_id: 14,
                    sequence_number: 3,
                    offset: 4,
                    length: 16,
                    compression_codec: CompressionCodec::None,
                    properties: {
                        let mut map = HashMap::new();
                        map.insert("some key".to_string(), "some value".to_string());
                        map
                    },
                }],
                properties: HashMap::new(),
            }
        )
    }

    #[tokio::test]
    async fn test_returns_error_if_blobs_fields_value_is_outside_i32_range() {
        let temp_dir = TempDir::new().unwrap();

        let out_of_i32_range_number: i64 = i32::MAX as i64 + 1;

        let input_file = input_file_with_payload(
            &temp_dir,
            &format!(
                r#"{{
                    "blobs" : [
                        {{
                            "type" : "type-a",
                            "fields" : [ {out_of_i32_range_number} ],
                            "snapshot-id" : 14,
                            "sequence-number" : 3,
                            "offset" : 4,
                            "length" : 16
                        }}
                    ]
                }}"#
            ),
        )
        .await;

        assert_eq!(
            FileMetadata::read(&input_file)
                .await
                .unwrap_err()
                .to_string(),
            format!(
                "DataInvalid => Given string is not valid JSON, source: invalid value: integer `{out_of_i32_range_number}`, expected i32 at line 5 column 51"
            ),
        )
    }

    #[tokio::test]
    async fn test_returns_errors_if_footer_payload_is_not_encoded_in_json_format() {
        let temp_dir = TempDir::new().unwrap();

        let input_file = input_file_with_payload(&temp_dir, r#""blobs" = []"#).await;

        assert_eq!(
            FileMetadata::read(&input_file)
                .await
                .unwrap_err()
                .to_string(),
            "DataInvalid => Given string is not valid JSON, source: invalid type: string \"blobs\", expected struct FileMetadata at line 1 column 7",
        )
    }

    // The following tests read fixture files produced by the Java implementation.

    #[tokio::test]
    async fn test_read_file_metadata_of_uncompressed_empty_file() {
        let input_file = java_empty_uncompressed_input_file();

        let file_metadata = FileMetadata::read(&input_file).await.unwrap();
        assert_eq!(file_metadata, empty_footer_payload())
    }

    #[tokio::test]
    async fn test_read_file_metadata_of_uncompressed_metric_data() {
        let input_file = java_uncompressed_metric_input_file();

        let file_metadata = FileMetadata::read(&input_file).await.unwrap();
        assert_eq!(file_metadata, uncompressed_metric_file_metadata())
    }

    #[tokio::test]
    async fn test_read_file_metadata_of_zstd_compressed_metric_data() {
        let input_file = java_zstd_compressed_metric_input_file();

        let file_metadata = FileMetadata::read_with_prefetch(&input_file, 64)
            .await
            .unwrap();
        assert_eq!(file_metadata, zstd_compressed_metric_file_metadata())
    }

    #[tokio::test]
    async fn test_read_file_metadata_of_empty_file_with_prefetching() {
        let input_file = java_empty_uncompressed_input_file();
        let file_metadata = FileMetadata::read_with_prefetch(&input_file, 64)
            .await
            .unwrap();

        assert_eq!(file_metadata, empty_footer_payload());
    }

    #[tokio::test]
    async fn test_read_file_metadata_of_uncompressed_metric_data_with_prefetching() {
        let input_file = java_uncompressed_metric_input_file();
        let file_metadata = FileMetadata::read_with_prefetch(&input_file, 64)
            .await
            .unwrap();

        assert_eq!(file_metadata, uncompressed_metric_file_metadata());
    }

    #[tokio::test]
    async fn test_read_file_metadata_of_zstd_compressed_metric_data_with_prefetching() {
        let input_file = java_zstd_compressed_metric_input_file();
        let file_metadata = FileMetadata::read_with_prefetch(&input_file, 64)
            .await
            .unwrap();

        assert_eq!(file_metadata, zstd_compressed_metric_file_metadata());
    }

    #[tokio::test]
    async fn test_gzip_compression_allowed_in_metadata() {
        let temp_dir = TempDir::new().unwrap();

        // Only the footer itself must be uncompressed; blob entries may
        // declare any codec, including gzip.
        let payload = r#"{
            "blobs": [
                {
                    "type": "test-type",
                    "fields": [1],
                    "snapshot-id": 1,
                    "sequence-number": 1,
                    "offset": 4,
                    "length": 10,
                    "compression-codec": "gzip"
                }
            ]
        }"#;

        let input_file = input_file_with_payload(&temp_dir, payload).await;

        let result = FileMetadata::read(&input_file).await;
        assert!(result.is_ok());
        let metadata = result.unwrap();
        assert_eq!(metadata.blobs.len(), 1);
        assert_eq!(metadata.blobs[0].compression_codec, CompressionCodec::Gzip);
    }
}