iceberg/
compression.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
//! Compression codecs for compressing and decompressing in-memory byte buffers.
19
20use std::io::{Read, Write};
21
22use flate2::Compression;
23use flate2::read::GzDecoder;
24use flate2::write::GzEncoder;
25use serde::{Deserialize, Serialize};
26
27use crate::{Error, ErrorKind, Result};
28
29/// Data compression formats
30#[derive(Debug, PartialEq, Eq, Clone, Copy, Default, Serialize, Deserialize)]
31#[serde(rename_all = "lowercase")]
32pub enum CompressionCodec {
33    #[default]
34    /// No compression
35    None,
36    /// LZ4 single compression frame with content size present
37    Lz4,
38    /// Zstandard single compression frame with content size present
39    Zstd,
40    /// Gzip compression
41    Gzip,
42}
43
44impl CompressionCodec {
45    pub(crate) fn decompress(&self, bytes: Vec<u8>) -> Result<Vec<u8>> {
46        match self {
47            CompressionCodec::None => Ok(bytes),
48            CompressionCodec::Lz4 => Err(Error::new(
49                ErrorKind::FeatureUnsupported,
50                "LZ4 decompression is not supported currently",
51            )),
52            CompressionCodec::Zstd => Ok(zstd::stream::decode_all(&bytes[..])?),
53            CompressionCodec::Gzip => {
54                let mut decoder = GzDecoder::new(&bytes[..]);
55                let mut decompressed = Vec::new();
56                decoder.read_to_end(&mut decompressed)?;
57                Ok(decompressed)
58            }
59        }
60    }
61
62    pub(crate) fn compress(&self, bytes: Vec<u8>) -> Result<Vec<u8>> {
63        match self {
64            CompressionCodec::None => Ok(bytes),
65            CompressionCodec::Lz4 => Err(Error::new(
66                ErrorKind::FeatureUnsupported,
67                "LZ4 compression is not supported currently",
68            )),
69            CompressionCodec::Zstd => {
70                let writer = Vec::<u8>::new();
71                let mut encoder = zstd::stream::Encoder::new(writer, 3)?;
72                encoder.include_checksum(true)?;
73                encoder.set_pledged_src_size(Some(bytes.len().try_into()?))?;
74                std::io::copy(&mut &bytes[..], &mut encoder)?;
75                Ok(encoder.finish()?)
76            }
77            CompressionCodec::Gzip => {
78                let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
79                encoder.write_all(&bytes)?;
80                Ok(encoder.finish()?)
81            }
82        }
83    }
84
85    pub(crate) fn is_none(&self) -> bool {
86        matches!(self, CompressionCodec::None)
87    }
88}
89
#[cfg(test)]
mod tests {
    use super::CompressionCodec;

    // The codec API is fully synchronous, so these are plain `#[test]`
    // functions; no async runtime is needed to drive them.

    /// `None` must pass data through unchanged in both directions.
    #[test]
    fn test_compression_codec_none() {
        let bytes_vec = [0_u8; 100].to_vec();

        let codec = CompressionCodec::None;
        let compressed = codec.compress(bytes_vec.clone()).unwrap();
        assert_eq!(bytes_vec, compressed);
        let decompressed = codec.decompress(compressed).unwrap();
        assert_eq!(bytes_vec, decompressed);
    }

    /// Supported codecs must shrink a highly repetitive buffer and round-trip
    /// it back to the original bytes.
    #[test]
    fn test_compression_codec_compress() {
        let bytes_vec = [0_u8; 100].to_vec();

        let compression_codecs = [CompressionCodec::Zstd, CompressionCodec::Gzip];

        for codec in compression_codecs {
            let compressed = codec.compress(bytes_vec.clone()).unwrap();
            assert!(compressed.len() < bytes_vec.len());
            let decompressed = codec.decompress(compressed).unwrap();
            assert_eq!(decompressed, bytes_vec);
        }
    }

    /// Unsupported codecs must report `FeatureUnsupported` for both
    /// compression and decompression.
    #[test]
    fn test_compression_codec_unsupported() {
        let unsupported_codecs = [(CompressionCodec::Lz4, "LZ4")];
        let bytes_vec = [0_u8; 100].to_vec();

        for (codec, name) in unsupported_codecs {
            assert_eq!(
                codec.compress(bytes_vec.clone()).unwrap_err().to_string(),
                format!("FeatureUnsupported => {name} compression is not supported currently"),
            );

            assert_eq!(
                codec.decompress(bytes_vec.clone()).unwrap_err().to_string(),
                format!("FeatureUnsupported => {name} decompression is not supported currently"),
            );
        }
    }

    /// `is_none` must be true for `None` and false for every other codec.
    #[test]
    fn test_compression_codec_is_none() {
        assert!(CompressionCodec::None.is_none());

        let real_codecs = [
            CompressionCodec::Lz4,
            CompressionCodec::Zstd,
            CompressionCodec::Gzip,
        ];
        for codec in real_codecs {
            assert!(!codec.is_none());
        }
    }
}