// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Iceberg writer module.
//!
//! This module contains the generic writer trait and specific writer implementations. We categorize writers into two types:
//! 1. FileWriter: a writer for a physical file format (such as Parquet or ORC).
//! 2. IcebergWriter: a writer for a logical format provided by the iceberg table (such as data files, equality delete files, position delete files)
//!    or for other functionality (such as a partition writer or delta writer).
//!
//! An IcebergWriter uses a FileWriter to write the underlying physical file.
//!
//! The writer interface is designed to be extensible and flexible. Each writer is created and configured independently
//! and can be combined with others to build a writer with complex write logic. E.g. combine `FanoutPartitionWriter`, `DataFileWriter` and `ParquetWriter` to get
//! a writer that automatically splits the data according to the partition and writes it in the Parquet physical format.
//!
//! For this purpose, there are four traits corresponding to these writers:
//! - IcebergWriterBuilder
//! - IcebergWriter
//! - FileWriterBuilder
//! - FileWriter
//!
//! Users can create specific writer builders, combine them, and finally build the writer. Users can also implement the
//! writer traits for their own custom writers so that they integrate with the existing ones. (See the following example.)
//!
//! # Simple example of a data file writer using the Parquet physical format:
//! ```rust, no_run
//! use std::sync::Arc;
//!
//! use arrow_array::{ArrayRef, BooleanArray, Int32Array, RecordBatch, StringArray};
//! use async_trait::async_trait;
//! use iceberg::io::{FileIO, FileIOBuilder};
//! use iceberg::spec::DataFile;
//! use iceberg::transaction::Transaction;
//! use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
//! use iceberg::writer::file_writer::location_generator::{
//!     DefaultFileNameGenerator, DefaultLocationGenerator,
//! };
//! use iceberg::writer::file_writer::ParquetWriterBuilder;
//! use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
//! use iceberg::{Catalog, Result, TableIdent};
//! use iceberg_catalog_memory::MemoryCatalog;
//! use parquet::file::properties::WriterProperties;
//! #[tokio::main]
//! async fn main() -> Result<()> {
//!     // Build your file IO.
//!     let file_io = FileIOBuilder::new("memory").build()?;
//!     // Connect to a catalog.
//!     let catalog = MemoryCatalog::new(file_io, None);
//!     // Load table from catalog.
//!     let table = catalog
//!         .load_table(&TableIdent::from_strs(["hello", "world"])?)
//!         .await?;
//!     let location_generator = DefaultLocationGenerator::new(table.metadata().clone()).unwrap();
//!     let file_name_generator = DefaultFileNameGenerator::new(
//!         "test".to_string(),
//!         None,
//!         iceberg::spec::DataFileFormat::Parquet,
//!     );
//!
//!     // Create a parquet file writer builder. The parameters can be obtained from the table.
//!     let parquet_writer_builder = ParquetWriterBuilder::new(
//!         WriterProperties::default(),
//!         table.metadata().current_schema().clone(),
//!         table.file_io().clone(),
//!         location_generator.clone(),
//!         file_name_generator.clone(),
//!     );
//!     // Create a data file writer builder using the parquet file writer builder.
//!     let data_file_writer_builder = DataFileWriterBuilder::new(parquet_writer_builder, None);
//!     // Build the data file writer.
//!     let mut data_file_writer = data_file_writer_builder.build().await.unwrap();
//!
//!     // Write the data using data_file_writer...
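//!     // For illustration only: assume the table's schema has the columns
//!     // (id: int, name: string, flag: boolean); a real batch must match the
//!     // actual table schema.
//!     let id_col = Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef;
//!     let name_col = Arc::new(StringArray::from(vec!["a", "b", "c"])) as ArrayRef;
//!     let flag_col = Arc::new(BooleanArray::from(vec![true, false, true])) as ArrayRef;
//!     let batch = RecordBatch::try_from_iter(vec![
//!         ("id", id_col),
//!         ("name", name_col),
//!         ("flag", flag_col),
//!     ])
//!     .unwrap();
//!     data_file_writer.write(batch).await?;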
//!
//!     // Close the writer; it returns the written data files.
//!     let data_files = data_file_writer.close().await.unwrap();
//!
//!     Ok(())
//! }
//! ```
//!
//! # Custom writer to record latency
//! ```rust, no_run
//! use std::time::Instant;
//!
//! use arrow_array::RecordBatch;
//! use iceberg::io::FileIOBuilder;
//! use iceberg::spec::DataFile;
//! use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
//! use iceberg::writer::file_writer::location_generator::{
//!     DefaultFileNameGenerator, DefaultLocationGenerator,
//! };
//! use iceberg::writer::file_writer::ParquetWriterBuilder;
//! use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
//! use iceberg::{Catalog, Result, TableIdent};
//! use iceberg_catalog_memory::MemoryCatalog;
//! use parquet::file::properties::WriterProperties;
//!
//! #[derive(Clone)]
//! struct LatencyRecordWriterBuilder<B> {
//!     inner_writer_builder: B,
//! }
//!
//! impl<B: IcebergWriterBuilder> LatencyRecordWriterBuilder<B> {
//!     pub fn new(inner_writer_builder: B) -> Self {
//!         Self {
//!             inner_writer_builder,
//!         }
//!     }
//! }
//!
//! #[async_trait::async_trait]
//! impl<B: IcebergWriterBuilder> IcebergWriterBuilder for LatencyRecordWriterBuilder<B> {
//!     type R = LatencyRecordWriter<B::R>;
//!
//!     async fn build(self) -> Result<Self::R> {
//!         Ok(LatencyRecordWriter {
//!             inner_writer: self.inner_writer_builder.build().await?,
//!         })
//!     }
//! }
//! struct LatencyRecordWriter<W> {
//!     inner_writer: W,
//! }
//!
//! #[async_trait::async_trait]
//! impl<W: IcebergWriter> IcebergWriter for LatencyRecordWriter<W> {
//!     async fn write(&mut self, input: RecordBatch) -> Result<()> {
//!         let start = Instant::now();
//!         self.inner_writer.write(input).await?;
//!         let _latency = start.elapsed();
//!         // record latency...
//!         Ok(())
//!     }
//!
//!     async fn close(&mut self) -> Result<Vec<DataFile>> {
//!         let start = Instant::now();
//!         let res = self.inner_writer.close().await?;
//!         let _latency = start.elapsed();
//!         // record latency...
//!         Ok(res)
//!     }
//! }
//!
//! #[tokio::main]
//! async fn main() -> Result<()> {
//!     // Build your file IO.
//!     let file_io = FileIOBuilder::new("memory").build()?;
//!     // Connect to a catalog.
//!     let catalog = MemoryCatalog::new(file_io, None);
//!     // Load table from catalog.
//!     let table = catalog
//!         .load_table(&TableIdent::from_strs(["hello", "world"])?)
//!         .await?;
//!     let location_generator = DefaultLocationGenerator::new(table.metadata().clone()).unwrap();
//!     let file_name_generator = DefaultFileNameGenerator::new(
//!         "test".to_string(),
//!         None,
//!         iceberg::spec::DataFileFormat::Parquet,
//!     );
//!
//!     // Create a parquet file writer builder. The parameters can be obtained from the table.
//!     let parquet_writer_builder = ParquetWriterBuilder::new(
//!         WriterProperties::default(),
//!         table.metadata().current_schema().clone(),
//!         table.file_io().clone(),
//!         location_generator.clone(),
//!         file_name_generator.clone(),
//!     );
//!     // Create a data file writer builder using the parquet file writer builder.
//!     let data_file_writer_builder = DataFileWriterBuilder::new(parquet_writer_builder, None);
//!     // Create a latency record writer builder wrapping the data file writer builder.
//!     let latency_record_builder = LatencyRecordWriterBuilder::new(data_file_writer_builder);
//!     // Build the final writer.
//!     let mut latency_record_data_file_writer = latency_record_builder.build().await.unwrap();
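//!
//!     // The wrapped writer is used like any other IcebergWriter: every write
//!     // and close call goes through the latency-recording wrapper. (Write
//!     // calls are elided here.)
//!     let _data_files = latency_record_data_file_writer.close().await.unwrap();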
//!
//!     Ok(())
//! }
//! ```

pub mod base_writer;
pub mod file_writer;

use arrow_array::RecordBatch;

use crate::spec::DataFile;
use crate::Result;

type DefaultInput = RecordBatch;
type DefaultOutput = Vec<DataFile>;

/// The builder for an iceberg writer.
#[async_trait::async_trait]
pub trait IcebergWriterBuilder<I = DefaultInput, O = DefaultOutput>:
    Send + Clone + 'static
{
    /// The associated writer type.
    type R: IcebergWriter<I, O>;
    /// Build the iceberg writer.
    async fn build(self) -> Result<Self::R>;
}

/// The iceberg writer used to write data to an iceberg table.
#[async_trait::async_trait]
pub trait IcebergWriter<I = DefaultInput, O = DefaultOutput>: Send + 'static {
    /// Write data to the iceberg table.
    async fn write(&mut self, input: I) -> Result<()>;
    /// Close the writer and return the written data files.
    /// If close fails, the data written before may be lost, and the user may need to recreate the writer and rewrite the data.
    /// # NOTE
    /// After close, regardless of success or failure, the writer must not be used again, otherwise it will panic.
    async fn close(&mut self) -> Result<O>;
}

/// The current file status of the iceberg writer. It is implemented by writers that write to a single
/// file.
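///
/// # Example
///
/// A minimal, illustrative sketch of implementing this trait for a hypothetical
/// single-file writer; `MyFileWriter` and its fields are not part of this crate:
///
/// ```rust, no_run
/// use iceberg::writer::CurrentFileStatus;
///
/// struct MyFileWriter {
///     file_path: String,
///     row_num: usize,
///     written_size: usize,
/// }
///
/// impl CurrentFileStatus for MyFileWriter {
///     fn current_file_path(&self) -> String {
///         self.file_path.clone()
///     }
///     fn current_row_num(&self) -> usize {
///         self.row_num
///     }
///     fn current_written_size(&self) -> usize {
///         self.written_size
///     }
/// }
/// ```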
pub trait CurrentFileStatus {
    /// Get the current file path.
    fn current_file_path(&self) -> String;
    /// Get the current file row number.
    fn current_row_num(&self) -> usize;
    /// Get the current file written size.
    fn current_written_size(&self) -> usize;
}

#[cfg(test)]
mod tests {
    use arrow_array::RecordBatch;
    use arrow_schema::Schema;
    use arrow_select::concat::concat_batches;
    use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;

    use super::IcebergWriter;
    use crate::io::FileIO;
    use crate::spec::{DataFile, DataFileFormat};

    // This function is used to guarantee the trait can be used as an object-safe trait.
    async fn _guarantee_object_safe(mut w: Box<dyn IcebergWriter>) {
        let _ = w
            .write(RecordBatch::new_empty(Schema::empty().into()))
            .await;
        let _ = w.close().await;
    }

    // This function checks that:
    // - The data of the written parquet file is correct.
    // - The metadata of the data file is consistent with the written parquet file.
    pub(crate) async fn check_parquet_data_file(
        file_io: &FileIO,
        data_file: &DataFile,
        batch: &RecordBatch,
    ) {
        assert_eq!(data_file.file_format, DataFileFormat::Parquet);

        let input_file = file_io.new_input(data_file.file_path.clone()).unwrap();
        // read the written file
        let input_content = input_file.read().await.unwrap();
        let reader_builder =
            ParquetRecordBatchReaderBuilder::try_new(input_content.clone()).unwrap();

        // check data
        let reader = reader_builder.build().unwrap();
        let batches = reader.map(|batch| batch.unwrap()).collect::<Vec<_>>();
        let res = concat_batches(&batch.schema(), &batches).unwrap();
        assert_eq!(*batch, res);
    }
}