iceberg/writer/partitioning/unpartitioned_writer.rs

use std::marker::PhantomData;

use crate::Result;
use crate::writer::{DefaultInput, DefaultOutput, IcebergWriter, IcebergWriterBuilder};

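/// A writer for unpartitioned tables.
///
/// All input shares a single inner writer, which is created lazily and
/// built with `None` as the partition key since the table has no
/// partition spec.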
pub struct UnpartitionedWriter<B, I = DefaultInput, O = DefaultOutput>
where
    B: IcebergWriterBuilder<I, O>,
    O: IntoIterator + FromIterator<<O as IntoIterator>::Item>,
    <O as IntoIterator>::Item: Clone,
{
    inner_builder: B,
    writer: Option<B::R>,
    output: Vec<<O as IntoIterator>::Item>,
    _phantom: PhantomData<I>,
}

impl<B, I, O> UnpartitionedWriter<B, I, O>
where
    B: IcebergWriterBuilder<I, O>,
    I: Send + 'static,
    O: IntoIterator + FromIterator<<O as IntoIterator>::Item>,
    <O as IntoIterator>::Item: Send + Clone,
{
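    /// Creates a new `UnpartitionedWriter` from an inner writer builder;
    /// the inner writer itself is not built until the first `write`.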
    pub fn new(inner_builder: B) -> Self {
        Self {
            inner_builder,
            writer: None,
            output: Vec::new(),
            _phantom: PhantomData,
        }
    }

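    /// Writes `input` through the inner writer, building it on first use.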
    pub async fn write(&mut self, input: I) -> Result<()> {
        // Build the inner writer lazily so no file is created when
        // nothing is ever written; `None` means no partition key.
        if self.writer.is_none() {
            self.writer = Some(self.inner_builder.build(None).await?);
        }

        self.writer
            .as_mut()
            .expect("Writer should be initialized")
            .write(input)
            .await
    }

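    /// Closes the inner writer, if one was ever built, and returns the
    /// collected output; the result is empty when nothing was written.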
    pub async fn close(mut self) -> Result<O> {
        // `take()` consumes the inner writer, so its output is collected
        // exactly once.
        if let Some(mut writer) = self.writer.take() {
            self.output.extend(writer.close().await?);
        }
        Ok(O::from_iter(self.output))
    }
}

#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::sync::Arc;

    use arrow_array::{Int32Array, RecordBatch, StringArray};
    use arrow_schema::{DataType, Field, Schema};
    use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
    use parquet::file::properties::WriterProperties;
    use tempfile::TempDir;

    use super::*;
    use crate::Result;
    use crate::io::FileIOBuilder;
    use crate::spec::{DataFileFormat, NestedField, PrimitiveType, Struct, Type};
    use crate::writer::base_writer::data_file_writer::DataFileWriterBuilder;
    use crate::writer::file_writer::ParquetWriterBuilder;
    use crate::writer::file_writer::location_generator::{
        DefaultFileNameGenerator, DefaultLocationGenerator,
    };
    use crate::writer::file_writer::rolling_writer::RollingFileWriterBuilder;

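    // End-to-end check: write two record batches through an
    // UnpartitionedWriter backed by a rolling Parquet data-file writer,
    // then verify the emitted data files.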
    #[tokio::test]
    async fn test_unpartitioned_writer() -> Result<()> {
        let temp_dir = TempDir::new()?;

        let schema = Arc::new(
            crate::spec::Schema::builder()
                .with_schema_id(1)
                .with_fields(vec![
                    NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
                    NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
                ])
                .build()?,
        );

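        // The Arrow schema mirrors the Iceberg schema; the
        // PARQUET_FIELD_ID_META_KEY metadata ties each Arrow field to its
        // Iceberg field ID.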
        let arrow_schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([(
                PARQUET_FIELD_ID_META_KEY.to_string(),
                "1".to_string(),
            )])),
            Field::new("name", DataType::Utf8, false).with_metadata(HashMap::from([(
                PARQUET_FIELD_ID_META_KEY.to_string(),
                "2".to_string(),
            )])),
        ]));

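        // Assemble the writer stack: local-filesystem FileIO, location and
        // file-name generators, a Parquet writer, a rolling writer around
        // it, and a data-file writer on top.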
        let file_io = FileIOBuilder::new_fs_io().build()?;
        let location_gen = DefaultLocationGenerator::with_data_location(
            temp_dir.path().to_str().unwrap().to_string(),
        );
        let file_name_gen =
            DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
        let parquet_writer_builder =
            ParquetWriterBuilder::new(WriterProperties::builder().build(), schema.clone());
        let rolling_writer_builder = RollingFileWriterBuilder::new_with_default_file_size(
            parquet_writer_builder,
            file_io,
            location_gen,
            file_name_gen,
        );
        let writer_builder = DataFileWriterBuilder::new(rolling_writer_builder);

        let mut writer = UnpartitionedWriter::new(writer_builder);

        let batch1 = RecordBatch::try_new(arrow_schema.clone(), vec![
            Arc::new(Int32Array::from(vec![1, 2])),
            Arc::new(StringArray::from(vec!["Alice", "Bob"])),
        ])?;
        let batch2 = RecordBatch::try_new(arrow_schema, vec![
            Arc::new(Int32Array::from(vec![3, 4])),
            Arc::new(StringArray::from(vec!["Charlie", "Dave"])),
        ])?;

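        // Both batches go through the same writer; the first call to
        // `write` builds the inner writer.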
        writer.write(batch1).await?;
        writer.write(batch2).await?;

        let data_files = writer.close().await?;

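        // Files from an unpartitioned writer carry the empty partition
        // struct; the two batches total four records.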
        assert!(!data_files.is_empty());
        for file in &data_files {
            assert_eq!(file.partition, Struct::empty());
            assert_eq!(file.file_format, DataFileFormat::Parquet);
            assert_eq!(file.record_count, 4);
        }

        Ok(())
    }
}