iceberg/writer/partitioning/unpartitioned_writer.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! This module provides the `UnpartitionedWriter` implementation.

use std::marker::PhantomData;

use crate::Result;
use crate::writer::{DefaultInput, DefaultOutput, IcebergWriter, IcebergWriterBuilder};

/// A simple wrapper around `IcebergWriterBuilder` for unpartitioned tables.
///
/// This writer lazily creates the underlying writer on the first write
/// operation and writes all data to a single file (or to a set of files if
/// the inner writer rolls over).
///
/// # Type Parameters
///
/// * `B` - The inner writer builder type
/// * `I` - Input type (defaults to `RecordBatch`)
/// * `O` - Output collection type (defaults to `Vec<DataFile>`)
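///
/// # Example
///
/// A minimal sketch of the intended call sequence. The builder wiring
/// (Parquet writer, rolling writer, location generator) is elided here;
/// the test module below shows a complete setup.
///
/// ```ignore
/// let mut writer = UnpartitionedWriter::new(data_file_writer_builder);
/// writer.write(batch).await?; // the inner writer is created lazily here
/// let data_files = writer.close().await?; // consumes the writer
/// ```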
pub struct UnpartitionedWriter<B, I = DefaultInput, O = DefaultOutput>
where
    B: IcebergWriterBuilder<I, O>,
    O: IntoIterator + FromIterator<<O as IntoIterator>::Item>,
    <O as IntoIterator>::Item: Clone,
{
    /// Builder used to lazily construct the inner writer.
    inner_builder: B,
    /// The inner writer; `None` until the first `write` call.
    writer: Option<B::R>,
    /// Output items accumulated so far; collected into `O` on `close`.
    output: Vec<<O as IntoIterator>::Item>,
    _phantom: PhantomData<I>,
}

impl<B, I, O> UnpartitionedWriter<B, I, O>
where
    B: IcebergWriterBuilder<I, O>,
    I: Send + 'static,
    O: IntoIterator + FromIterator<<O as IntoIterator>::Item>,
    <O as IntoIterator>::Item: Send + Clone,
{
    /// Create a new `UnpartitionedWriter`.
    pub fn new(inner_builder: B) -> Self {
        Self {
            inner_builder,
            writer: None,
            output: Vec::new(),
            _phantom: PhantomData,
        }
    }

    /// Write data to the writer.
    ///
    /// The underlying writer is lazily created on the first write operation.
    ///
    /// # Parameters
    ///
    /// * `input` - The input data to write
    ///
    /// # Returns
    ///
    /// `Ok(())` on success, or an error if the write operation fails.
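    ///
    /// # Example
    ///
    /// A sketch of repeated writes, assuming `writer` and the batches are set
    /// up as in the test module below. Only the first call builds the inner
    /// writer; later calls reuse it:
    ///
    /// ```ignore
    /// writer.write(batch1).await?; // builds the inner writer
    /// writer.write(batch2).await?; // reuses it
    /// ```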
    pub async fn write(&mut self, input: I) -> Result<()> {
        // Lazily create writer on first write
        if self.writer.is_none() {
            self.writer = Some(self.inner_builder.build(None).await?);
        }

        // Write directly to inner writer
        self.writer
            .as_mut()
            .expect("Writer should be initialized")
            .write(input)
            .await
    }

    /// Close the writer and return all written data files.
    ///
    /// This method consumes the writer to prevent further use.
    ///
    /// # Returns
    ///
    /// The accumulated output from all write operations, or an empty collection
    /// if no data was written.
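    ///
    /// # Example
    ///
    /// A sketch under the default type parameters, where the output collects
    /// into a `Vec<DataFile>`:
    ///
    /// ```ignore
    /// let data_files: Vec<DataFile> = writer.close().await?;
    /// ```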
    pub async fn close(mut self) -> Result<O> {
        // If `write` was never called, the inner writer was never built and
        // the accumulated output stays empty.
        if let Some(mut writer) = self.writer.take() {
            self.output.extend(writer.close().await?);
        }
        Ok(O::from_iter(self.output))
    }
}

#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::sync::Arc;

    use arrow_array::{Int32Array, RecordBatch, StringArray};
    use arrow_schema::{DataType, Field, Schema};
    use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
    use parquet::file::properties::WriterProperties;
    use tempfile::TempDir;

    use super::*;
    use crate::Result;
    use crate::io::FileIOBuilder;
    use crate::spec::{DataFileFormat, NestedField, PrimitiveType, Struct, Type};
    use crate::writer::base_writer::data_file_writer::DataFileWriterBuilder;
    use crate::writer::file_writer::ParquetWriterBuilder;
    use crate::writer::file_writer::location_generator::{
        DefaultFileNameGenerator, DefaultLocationGenerator,
    };
    use crate::writer::file_writer::rolling_writer::RollingFileWriterBuilder;

    #[tokio::test]
    async fn test_unpartitioned_writer() -> Result<()> {
        let temp_dir = TempDir::new()?;

        // Build Iceberg schema
        let schema = Arc::new(
            crate::spec::Schema::builder()
                .with_schema_id(1)
                .with_fields(vec![
                    NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
                    NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
                ])
                .build()?,
        );

        // Build Arrow schema
        let arrow_schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([(
                PARQUET_FIELD_ID_META_KEY.to_string(),
                "1".to_string(),
            )])),
            Field::new("name", DataType::Utf8, false).with_metadata(HashMap::from([(
                PARQUET_FIELD_ID_META_KEY.to_string(),
                "2".to_string(),
            )])),
        ]));

        // Build writer
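        // The builder stack is layered bottom-up: the Parquet builder writes
        // the physical files, the rolling builder starts a new file when the
        // size threshold is reached, and the data-file builder wraps the
        // result as `DataFile` metadata for the table.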
        let file_io = FileIOBuilder::new_fs_io().build()?;
        let location_gen = DefaultLocationGenerator::with_data_location(
            temp_dir.path().to_str().unwrap().to_string(),
        );
        let file_name_gen =
            DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
        let parquet_writer_builder =
            ParquetWriterBuilder::new(WriterProperties::builder().build(), schema.clone());
        let rolling_writer_builder = RollingFileWriterBuilder::new_with_default_file_size(
            parquet_writer_builder,
            file_io,
            location_gen,
            file_name_gen,
        );
        let writer_builder = DataFileWriterBuilder::new(rolling_writer_builder);

        let mut writer = UnpartitionedWriter::new(writer_builder);

        // Write two batches
        let batch1 = RecordBatch::try_new(arrow_schema.clone(), vec![
            Arc::new(Int32Array::from(vec![1, 2])),
            Arc::new(StringArray::from(vec!["Alice", "Bob"])),
        ])?;
        let batch2 = RecordBatch::try_new(arrow_schema, vec![
            Arc::new(Int32Array::from(vec![3, 4])),
            Arc::new(StringArray::from(vec!["Charlie", "Dave"])),
        ])?;

        writer.write(batch1).await?;
        writer.write(batch2).await?;

        let data_files = writer.close().await?;

        // Verify each file has an empty partition value, the Parquet format,
        // and all four written records
        assert!(!data_files.is_empty());
        for file in &data_files {
            assert_eq!(file.partition, Struct::empty());
            assert_eq!(file.file_format, DataFileFormat::Parquet);
            assert_eq!(file.record_count, 4);
        }

        Ok(())
    }
}