iceberg/writer/partitioning/fanout_writer.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! This module provides the `FanoutWriter` implementation.

use std::collections::HashMap;
use std::marker::PhantomData;

use async_trait::async_trait;

use crate::spec::{PartitionKey, Struct};
use crate::writer::partitioning::PartitioningWriter;
use crate::writer::{DefaultInput, DefaultOutput, IcebergWriter, IcebergWriterBuilder};
use crate::{Error, ErrorKind, Result};

/// A writer that can write data to multiple partitions simultaneously.
///
/// Unlike `ClusteredWriter`, which expects sorted input and keeps only one writer active
/// at a time, `FanoutWriter` handles unsorted data by maintaining one active writer per
/// partition in a map. This allows writing to any partition at any time, at the cost of
/// more memory, since every writer stays open until the fanout writer is closed.
///
/// # Type Parameters
///
/// * `B` - The inner writer builder type
/// * `I` - Input type (defaults to `RecordBatch`)
/// * `O` - Output collection type (defaults to `Vec<DataFile>`)
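///
/// # Example
///
/// A minimal sketch of the intended call pattern, mirroring the tests below
/// (construction of the inner `data_file_writer_builder`, the partition keys,
/// and the record batches is elided; see `test_fanout_writer_multiple_partitions`):
///
/// ```ignore
/// let mut writer = FanoutWriter::new(data_file_writer_builder);
/// // Partitions may arrive in any order; one writer stays open per partition.
/// writer.write(partition_key_us.clone(), batch_us1).await?;
/// writer.write(partition_key_eu.clone(), batch_eu1).await?;
/// writer.write(partition_key_us.clone(), batch_us2).await?;
/// let data_files = writer.close().await?;
/// ```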
pub struct FanoutWriter<B, I = DefaultInput, O = DefaultOutput>
where
    B: IcebergWriterBuilder<I, O>,
    O: IntoIterator + FromIterator<<O as IntoIterator>::Item>,
    <O as IntoIterator>::Item: Clone,
{
    inner_builder: B,
    partition_writers: HashMap<Struct, B::R>,
    output: Vec<<O as IntoIterator>::Item>,
    _phantom: PhantomData<I>,
}

impl<B, I, O> FanoutWriter<B, I, O>
where
    B: IcebergWriterBuilder<I, O>,
    I: Send + 'static,
    O: IntoIterator + FromIterator<<O as IntoIterator>::Item>,
    <O as IntoIterator>::Item: Send + Clone,
{
    /// Create a new `FanoutWriter`.
    pub fn new(inner_builder: B) -> Self {
        Self {
            inner_builder,
            partition_writers: HashMap::new(),
            output: Vec::new(),
            _phantom: PhantomData,
        }
    }

    /// Get or create a writer for the specified partition.
    async fn get_or_create_writer(&mut self, partition_key: &PartitionKey) -> Result<&mut B::R> {
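        // The inner writer is built with an async call, so the map's
        // `entry(...).or_insert_with(...)` can't be used here. Instead, check
        // for the key, build and insert the writer if it's missing, and then
        // look it up again; no borrow of the map is held across the `.await`.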
        if !self.partition_writers.contains_key(partition_key.data()) {
            let writer = self
                .inner_builder
                .build(Some(partition_key.clone()))
                .await?;
            self.partition_writers
                .insert(partition_key.data().clone(), writer);
        }

        self.partition_writers
            .get_mut(partition_key.data())
            .ok_or_else(|| {
                Error::new(
                    ErrorKind::Unexpected,
                    "Failed to get partition writer after creation",
                )
            })
    }
}

#[async_trait]
impl<B, I, O> PartitioningWriter<I, O> for FanoutWriter<B, I, O>
where
    B: IcebergWriterBuilder<I, O>,
    I: Send + 'static,
    O: IntoIterator + FromIterator<<O as IntoIterator>::Item> + Send + 'static,
    <O as IntoIterator>::Item: Send + Clone,
{
    async fn write(&mut self, partition_key: PartitionKey, input: I) -> Result<()> {
        let writer = self.get_or_create_writer(&partition_key).await?;
        writer.write(input).await
    }

    async fn close(mut self) -> Result<O> {
        // Close all partition writers
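        // (`HashMap` iteration order is unspecified, so the order of the
        // collected data files across partitions is arbitrary.)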
        for (_, mut writer) in self.partition_writers {
            self.output.extend(writer.close().await?);
        }

        // Collect all output items into the output collection type
        Ok(O::from_iter(self.output))
    }
}

#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::sync::Arc;

    use arrow_array::{Int32Array, RecordBatch, StringArray};
    use arrow_schema::{DataType, Field, Schema};
    use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
    use parquet::file::properties::WriterProperties;
    use tempfile::TempDir;

    use super::*;
    use crate::io::FileIOBuilder;
    use crate::spec::{
        DataFileFormat, Literal, NestedField, PartitionKey, PartitionSpec, PrimitiveType, Struct,
        Type,
    };
    use crate::writer::base_writer::data_file_writer::DataFileWriterBuilder;
    use crate::writer::file_writer::ParquetWriterBuilder;
    use crate::writer::file_writer::location_generator::{
        DefaultFileNameGenerator, DefaultLocationGenerator,
    };
    use crate::writer::file_writer::rolling_writer::RollingFileWriterBuilder;

    #[tokio::test]
    async fn test_fanout_writer_single_partition() -> Result<()> {
        let temp_dir = TempDir::new()?;
        let file_io = FileIOBuilder::new_fs_io().build()?;
        let location_gen = DefaultLocationGenerator::with_data_location(
            temp_dir.path().to_str().unwrap().to_string(),
        );
        let file_name_gen =
            DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);

        // Create schema with partition field
        let schema = Arc::new(
            crate::spec::Schema::builder()
                .with_schema_id(1)
                .with_fields(vec![
                    NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
                    NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
                    NestedField::required(3, "region", Type::Primitive(PrimitiveType::String))
                        .into(),
                ])
                .build()?,
        );

        // Create partition spec - using the same pattern as data_file_writer tests
        let partition_spec = PartitionSpec::builder(schema.clone()).build()?;
        let partition_value = Struct::from_iter([Some(Literal::string("US"))]);
        let partition_key =
            PartitionKey::new(partition_spec, schema.clone(), partition_value.clone());

        // Create writer builder
        let parquet_writer_builder =
            ParquetWriterBuilder::new(WriterProperties::builder().build(), schema.clone());

        // Create rolling file writer builder
        let rolling_writer_builder = RollingFileWriterBuilder::new_with_default_file_size(
            parquet_writer_builder,
            file_io.clone(),
            location_gen,
            file_name_gen,
        );

        // Create data file writer builder
        let data_file_writer_builder = DataFileWriterBuilder::new(rolling_writer_builder);

        // Create fanout writer
        let mut writer = FanoutWriter::new(data_file_writer_builder);

        // Create test data with proper field ID metadata
        let arrow_schema = Schema::new(vec![
            Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([(
                PARQUET_FIELD_ID_META_KEY.to_string(),
                1.to_string(),
            )])),
            Field::new("name", DataType::Utf8, false).with_metadata(HashMap::from([(
                PARQUET_FIELD_ID_META_KEY.to_string(),
                2.to_string(),
            )])),
            Field::new("region", DataType::Utf8, false).with_metadata(HashMap::from([(
                PARQUET_FIELD_ID_META_KEY.to_string(),
                3.to_string(),
            )])),
        ]);

        let batch1 = RecordBatch::try_new(Arc::new(arrow_schema.clone()), vec![
            Arc::new(Int32Array::from(vec![1, 2])),
            Arc::new(StringArray::from(vec!["Alice", "Bob"])),
            Arc::new(StringArray::from(vec!["US", "US"])),
        ])?;

        let batch2 = RecordBatch::try_new(Arc::new(arrow_schema.clone()), vec![
            Arc::new(Int32Array::from(vec![3, 4])),
            Arc::new(StringArray::from(vec!["Charlie", "Dave"])),
            Arc::new(StringArray::from(vec!["US", "US"])),
        ])?;

        // Write data to the same partition
        writer.write(partition_key.clone(), batch1).await?;
        writer.write(partition_key.clone(), batch2).await?;

        // Close writer and get data files
        let data_files = writer.close().await?;

        // Verify at least one file was created
        assert!(
            !data_files.is_empty(),
            "Expected at least one data file to be created"
        );

        // Verify that all data files have the correct partition value
        for data_file in &data_files {
            assert_eq!(data_file.partition, partition_value);
        }

        Ok(())
    }

    #[tokio::test]
    async fn test_fanout_writer_multiple_partitions() -> Result<()> {
        let temp_dir = TempDir::new()?;
        let file_io = FileIOBuilder::new_fs_io().build()?;
        let location_gen = DefaultLocationGenerator::with_data_location(
            temp_dir.path().to_str().unwrap().to_string(),
        );
        let file_name_gen =
            DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);

        // Create schema with partition field
        let schema = Arc::new(
            crate::spec::Schema::builder()
                .with_schema_id(1)
                .with_fields(vec![
                    NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
                    NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
                    NestedField::required(3, "region", Type::Primitive(PrimitiveType::String))
                        .into(),
                ])
                .build()?,
        );

        // Create partition spec
        let partition_spec = PartitionSpec::builder(schema.clone()).build()?;

        // Create partition keys for different regions
        let partition_value_us = Struct::from_iter([Some(Literal::string("US"))]);
        let partition_key_us = PartitionKey::new(
            partition_spec.clone(),
            schema.clone(),
            partition_value_us.clone(),
        );

        let partition_value_eu = Struct::from_iter([Some(Literal::string("EU"))]);
        let partition_key_eu = PartitionKey::new(
            partition_spec.clone(),
            schema.clone(),
            partition_value_eu.clone(),
        );

        let partition_value_asia = Struct::from_iter([Some(Literal::string("ASIA"))]);
        let partition_key_asia = PartitionKey::new(
            partition_spec.clone(),
            schema.clone(),
            partition_value_asia.clone(),
        );

        // Create writer builder
        let parquet_writer_builder =
            ParquetWriterBuilder::new(WriterProperties::builder().build(), schema.clone());

        // Create rolling file writer builder
        let rolling_writer_builder = RollingFileWriterBuilder::new_with_default_file_size(
            parquet_writer_builder,
            file_io.clone(),
            location_gen,
            file_name_gen,
        );

        // Create data file writer builder
        let data_file_writer_builder = DataFileWriterBuilder::new(rolling_writer_builder);

        // Create fanout writer
        let mut writer = FanoutWriter::new(data_file_writer_builder);

        // Create test data with proper field ID metadata
        let arrow_schema = Schema::new(vec![
            Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([(
                PARQUET_FIELD_ID_META_KEY.to_string(),
                1.to_string(),
            )])),
            Field::new("name", DataType::Utf8, false).with_metadata(HashMap::from([(
                PARQUET_FIELD_ID_META_KEY.to_string(),
                2.to_string(),
            )])),
            Field::new("region", DataType::Utf8, false).with_metadata(HashMap::from([(
                PARQUET_FIELD_ID_META_KEY.to_string(),
                3.to_string(),
            )])),
        ]);

        // Create batches for different partitions
        let batch_us1 = RecordBatch::try_new(Arc::new(arrow_schema.clone()), vec![
            Arc::new(Int32Array::from(vec![1, 2])),
            Arc::new(StringArray::from(vec!["Alice", "Bob"])),
            Arc::new(StringArray::from(vec!["US", "US"])),
        ])?;

        let batch_eu1 = RecordBatch::try_new(Arc::new(arrow_schema.clone()), vec![
            Arc::new(Int32Array::from(vec![3, 4])),
            Arc::new(StringArray::from(vec!["Charlie", "Dave"])),
            Arc::new(StringArray::from(vec!["EU", "EU"])),
        ])?;

        let batch_us2 = RecordBatch::try_new(Arc::new(arrow_schema.clone()), vec![
            Arc::new(Int32Array::from(vec![5])),
            Arc::new(StringArray::from(vec!["Eve"])),
            Arc::new(StringArray::from(vec!["US"])),
        ])?;

        let batch_asia1 = RecordBatch::try_new(Arc::new(arrow_schema.clone()), vec![
            Arc::new(Int32Array::from(vec![6, 7])),
            Arc::new(StringArray::from(vec!["Frank", "Grace"])),
            Arc::new(StringArray::from(vec!["ASIA", "ASIA"])),
        ])?;

        // Write data in mixed partition order to demonstrate fanout capability
        // This is the key difference from ClusteredWriter - we can write to any partition at any time
        writer.write(partition_key_us.clone(), batch_us1).await?;
        writer.write(partition_key_eu.clone(), batch_eu1).await?;
        writer.write(partition_key_us.clone(), batch_us2).await?; // Back to US partition
        writer
            .write(partition_key_asia.clone(), batch_asia1)
            .await?;

        // Close writer and get data files
        let data_files = writer.close().await?;

        // Verify files were created for all partitions
        assert!(
            data_files.len() >= 3,
            "Expected at least 3 data files (one per partition), got {}",
            data_files.len()
        );

        // Verify that we have files for each partition
        let mut partitions_found = std::collections::HashSet::new();
        for data_file in &data_files {
            partitions_found.insert(data_file.partition.clone());
        }

        assert!(
            partitions_found.contains(&partition_value_us),
            "Missing US partition"
        );
        assert!(
            partitions_found.contains(&partition_value_eu),
            "Missing EU partition"
        );
        assert!(
            partitions_found.contains(&partition_value_asia),
            "Missing ASIA partition"
        );

        Ok(())
    }
}