1use std::io::{Read, Write};
21
22use flate2::Compression;
23use flate2::read::GzDecoder;
24use flate2::write::GzEncoder;
25use serde::{Deserialize, Serialize};
26
27use crate::{Error, ErrorKind, Result};
28
/// Compression codec that can be applied to an in-memory byte buffer.
///
/// With serde, each variant is (de)serialized under its lowercase name
/// because of `rename_all = "lowercase"`. The default codec is
/// [`CompressionCodec::None`].
#[derive(Debug, PartialEq, Eq, Clone, Copy, Default, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum CompressionCodec {
    /// No compression: `compress` and `decompress` return the input unchanged.
    #[default]
    None,
    /// LZ4. Not implemented in this module: both `compress` and `decompress`
    /// return an `ErrorKind::FeatureUnsupported` error for this variant.
    Lz4,
    /// Zstandard, handled through the `zstd` crate's streaming API.
    Zstd,
    /// Gzip, handled through `flate2`'s `GzEncoder` / `GzDecoder`.
    Gzip,
}
43
44impl CompressionCodec {
45 pub(crate) fn decompress(&self, bytes: Vec<u8>) -> Result<Vec<u8>> {
46 match self {
47 CompressionCodec::None => Ok(bytes),
48 CompressionCodec::Lz4 => Err(Error::new(
49 ErrorKind::FeatureUnsupported,
50 "LZ4 decompression is not supported currently",
51 )),
52 CompressionCodec::Zstd => Ok(zstd::stream::decode_all(&bytes[..])?),
53 CompressionCodec::Gzip => {
54 let mut decoder = GzDecoder::new(&bytes[..]);
55 let mut decompressed = Vec::new();
56 decoder.read_to_end(&mut decompressed)?;
57 Ok(decompressed)
58 }
59 }
60 }
61
62 pub(crate) fn compress(&self, bytes: Vec<u8>) -> Result<Vec<u8>> {
63 match self {
64 CompressionCodec::None => Ok(bytes),
65 CompressionCodec::Lz4 => Err(Error::new(
66 ErrorKind::FeatureUnsupported,
67 "LZ4 compression is not supported currently",
68 )),
69 CompressionCodec::Zstd => {
70 let writer = Vec::<u8>::new();
71 let mut encoder = zstd::stream::Encoder::new(writer, 3)?;
72 encoder.include_checksum(true)?;
73 encoder.set_pledged_src_size(Some(bytes.len().try_into()?))?;
74 std::io::copy(&mut &bytes[..], &mut encoder)?;
75 Ok(encoder.finish()?)
76 }
77 CompressionCodec::Gzip => {
78 let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
79 encoder.write_all(&bytes)?;
80 Ok(encoder.finish()?)
81 }
82 }
83 }
84
85 pub(crate) fn is_none(&self) -> bool {
86 matches!(self, CompressionCodec::None)
87 }
88}
89
#[cfg(test)]
mod tests {
    use super::CompressionCodec;

    // These tests never await anything, so they are plain synchronous `#[test]`
    // functions rather than `#[tokio::test]` ones; no async runtime is needed.

    /// `None` must leave the payload unchanged in both directions.
    #[test]
    fn test_compression_codec_none() {
        let payload = vec![0_u8; 100];
        let codec = CompressionCodec::None;

        let compressed = codec.compress(payload.clone()).unwrap();
        assert_eq!(payload, compressed);

        let decompressed = codec.decompress(compressed).unwrap();
        assert_eq!(payload, decompressed);
    }

    /// Every supported codec must shrink a highly repetitive payload and
    /// round-trip it back to the original bytes.
    #[test]
    fn test_compression_codec_compress() {
        let payload = vec![0_u8; 100];

        for codec in [CompressionCodec::Zstd, CompressionCodec::Gzip] {
            let compressed = codec.compress(payload.clone()).unwrap();
            assert!(compressed.len() < payload.len());

            let decompressed = codec.decompress(compressed).unwrap();
            assert_eq!(decompressed, payload);
        }
    }

    /// Unsupported codecs must fail in both directions with the expected message.
    #[test]
    fn test_compression_codec_unsupported() {
        let unsupported_codecs = [(CompressionCodec::Lz4, "LZ4")];
        let payload = vec![0_u8; 100];

        for (codec, name) in unsupported_codecs {
            assert_eq!(
                codec.compress(payload.clone()).unwrap_err().to_string(),
                format!("FeatureUnsupported => {name} compression is not supported currently"),
            );

            assert_eq!(
                codec.decompress(payload.clone()).unwrap_err().to_string(),
                format!("FeatureUnsupported => {name} decompression is not supported currently"),
            );
        }
    }
}