iceberg/puffin/
compression.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use serde::{Deserialize, Serialize};
19
20use crate::{Error, ErrorKind, Result};
21
22/// Data compression formats
23#[derive(Debug, PartialEq, Eq, Clone, Copy, Default, Serialize, Deserialize)]
24#[serde(rename_all = "lowercase")]
25pub enum CompressionCodec {
26    #[default]
27    /// No compression
28    None,
29    /// LZ4 single compression frame with content size present
30    Lz4,
31    /// Zstandard single compression frame with content size present
32    Zstd,
33}
34
35impl CompressionCodec {
36    pub(crate) fn decompress(&self, bytes: Vec<u8>) -> Result<Vec<u8>> {
37        match self {
38            CompressionCodec::None => Ok(bytes),
39            CompressionCodec::Lz4 => Err(Error::new(
40                ErrorKind::FeatureUnsupported,
41                "LZ4 decompression is not supported currently",
42            )),
43            CompressionCodec::Zstd => {
44                let decompressed = zstd::stream::decode_all(&bytes[..])?;
45                Ok(decompressed)
46            }
47        }
48    }
49
50    pub(crate) fn compress(&self, bytes: Vec<u8>) -> Result<Vec<u8>> {
51        match self {
52            CompressionCodec::None => Ok(bytes),
53            CompressionCodec::Lz4 => Err(Error::new(
54                ErrorKind::FeatureUnsupported,
55                "LZ4 compression is not supported currently",
56            )),
57            CompressionCodec::Zstd => {
58                let writer = Vec::<u8>::new();
59                let mut encoder = zstd::stream::Encoder::new(writer, 3)?;
60                encoder.include_checksum(true)?;
61                encoder.set_pledged_src_size(Some(bytes.len().try_into()?))?;
62                std::io::copy(&mut &bytes[..], &mut encoder)?;
63                let compressed = encoder.finish()?;
64                Ok(compressed)
65            }
66        }
67    }
68
69    pub(crate) fn is_none(&self) -> bool {
70        matches!(self, CompressionCodec::None)
71    }
72}
73
#[cfg(test)]
mod tests {
    // None of these tests await anything, so plain `#[test]` is used instead of spinning up
    // a tokio runtime per test.
    use super::CompressionCodec;

    /// `None` must pass bytes through untouched in both directions.
    #[test]
    fn test_compression_codec_none() {
        let compression_codec = CompressionCodec::None;
        let bytes_vec = [0_u8; 100].to_vec();

        let compressed = compression_codec.compress(bytes_vec.clone()).unwrap();
        assert_eq!(bytes_vec, compressed);

        let decompressed = compression_codec.decompress(compressed.clone()).unwrap();
        assert_eq!(compressed, decompressed);
    }

    /// LZ4 is not implemented yet; both directions must report `FeatureUnsupported`.
    #[test]
    fn test_compression_codec_lz4() {
        let compression_codec = CompressionCodec::Lz4;
        let bytes_vec = [0_u8; 100].to_vec();

        assert_eq!(
            compression_codec
                .compress(bytes_vec.clone())
                .unwrap_err()
                .to_string(),
            "FeatureUnsupported => LZ4 compression is not supported currently",
        );

        assert_eq!(
            compression_codec
                .decompress(bytes_vec.clone())
                .unwrap_err()
                .to_string(),
            "FeatureUnsupported => LZ4 decompression is not supported currently",
        );
    }

    /// Highly compressible input must shrink and round-trip exactly.
    #[test]
    fn test_compression_codec_zstd() {
        let compression_codec = CompressionCodec::Zstd;
        let bytes_vec = [0_u8; 100].to_vec();

        let compressed = compression_codec.compress(bytes_vec.clone()).unwrap();
        assert!(compressed.len() < bytes_vec.len());

        let decompressed = compression_codec.decompress(compressed.clone()).unwrap();
        assert_eq!(decompressed, bytes_vec);
    }

    /// An empty payload still produces a zstd frame, which decodes back to nothing.
    #[test]
    fn test_compression_codec_zstd_empty_input() {
        let compression_codec = CompressionCodec::Zstd;

        let compressed = compression_codec.compress(Vec::new()).unwrap();
        assert!(!compressed.is_empty());

        let decompressed = compression_codec.decompress(compressed).unwrap();
        assert!(decompressed.is_empty());
    }

    /// Bytes that are not a zstd frame must surface as an error, not as silent garbage.
    #[test]
    fn test_compression_codec_zstd_invalid_input() {
        let compression_codec = CompressionCodec::Zstd;

        assert!(compression_codec.decompress(vec![0xFF_u8; 16]).is_err());
    }
}