iceberg/writer/mod.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Iceberg writer module.
//!
//! This module contains the generic writer traits and specific writer implementations. We categorize the writers into two types:
//! 1. FileWriter: writes a physical file format (such as Parquet or ORC).
//! 2. IcebergWriter: writes a logical format provided by the Iceberg table (such as data files, equality delete files, and position delete files)
//!    or provides other functionality (such as partition writers and delta writers).
//!
//! An IcebergWriter uses an inner FileWriter to write the physical files.
//!
//! The writer interface is designed to be extensible and flexible. Writers can be independently configured
//! and composed to support complex write logic. For example, by combining `FanoutPartitionWriter`, `DataFileWriter`, and `ParquetWriter`,
//! you can build a writer that automatically partitions the data and writes it in the Parquet format.
//!
//! For this purpose, there are four traits corresponding to these writers:
//! - IcebergWriterBuilder
//! - IcebergWriter
//! - FileWriterBuilder
//! - FileWriter
//!
//! Users can create specific writer builders, combine them, and build the final writer.
//! They can also define custom writers by implementing the `IcebergWriter` trait,
//! allowing seamless integration with existing writers. (See the example below.)
//!
//! # Simple example of a data file writer using the Parquet physical format:
//! ```rust, no_run
//! use std::collections::HashMap;
//! use std::sync::Arc;
//!
//! use arrow_array::{ArrayRef, BooleanArray, Int32Array, RecordBatch, StringArray};
//! use async_trait::async_trait;
//! use iceberg::io::{FileIO, FileIOBuilder};
//! use iceberg::spec::DataFile;
//! use iceberg::transaction::Transaction;
//! use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
//! use iceberg::writer::file_writer::ParquetWriterBuilder;
//! use iceberg::writer::file_writer::location_generator::{
//!     DefaultFileNameGenerator, DefaultLocationGenerator,
//! };
//! use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
//! use iceberg::{Catalog, CatalogBuilder, MemoryCatalog, Result, TableIdent};
//! use parquet::file::properties::WriterProperties;
//!
//! #[tokio::main]
//! async fn main() -> Result<()> {
//!     // Connect to a catalog.
//!     use iceberg::memory::{MEMORY_CATALOG_WAREHOUSE, MemoryCatalogBuilder};
//!     let catalog = MemoryCatalogBuilder::default()
//!         .load(
//!             "memory",
//!             HashMap::from([(
//!                 MEMORY_CATALOG_WAREHOUSE.to_string(),
//!                 "file:///path/to/warehouse".to_string(),
//!             )]),
//!         )
//!         .await?;
//!     // Add customized code to create a table first.
//!
//!     // Load the table from the catalog.
//!     let table = catalog
//!         .load_table(&TableIdent::from_strs(["hello", "world"])?)
//!         .await?;
//!     let location_generator = DefaultLocationGenerator::new(table.metadata().clone()).unwrap();
//!     let file_name_generator = DefaultFileNameGenerator::new(
//!         "test".to_string(),
//!         None,
//!         iceberg::spec::DataFileFormat::Parquet,
//!     );
//!
//!     // Create a parquet file writer builder. The parameters can be obtained from the table.
//!     let parquet_writer_builder = ParquetWriterBuilder::new(
//!         WriterProperties::default(),
//!         table.metadata().current_schema().clone(),
//!         None,
//!         table.file_io().clone(),
//!         location_generator.clone(),
//!         file_name_generator.clone(),
//!     );
//!     // Create a data file writer builder using the parquet file writer builder.
//!     let data_file_writer_builder = DataFileWriterBuilder::new(parquet_writer_builder, None, 0);
//!     // Build the data file writer.
//!     let mut data_file_writer = data_file_writer_builder.build().await.unwrap();
//!
//!     // Write the data using data_file_writer...
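//!     // For instance (a minimal sketch; the column names and types below are
//!     // illustrative and must match the schema of the table you created):
//!     let id_col: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
//!     let name_col: ArrayRef = Arc::new(StringArray::from(vec!["a", "b", "c"]));
//!     let batch = RecordBatch::try_from_iter(vec![("id", id_col), ("name", name_col)]).unwrap();
//!     data_file_writer.write(batch).await?;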
//!
//!     // Close the writer; it returns the written data files.
//!     let data_files = data_file_writer.close().await.unwrap();
//!
//!     Ok(())
//! }
//! ```
//!
//! # Custom writer to record latency
//! ```rust, no_run
//! use std::collections::HashMap;
//! use std::time::Instant;
//!
//! use arrow_array::RecordBatch;
//! use iceberg::io::FileIOBuilder;
//! use iceberg::memory::MemoryCatalogBuilder;
//! use iceberg::spec::DataFile;
//! use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
//! use iceberg::writer::file_writer::ParquetWriterBuilder;
//! use iceberg::writer::file_writer::location_generator::{
//!     DefaultFileNameGenerator, DefaultLocationGenerator,
//! };
//! use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
//! use iceberg::{Catalog, CatalogBuilder, MemoryCatalog, Result, TableIdent};
//! use parquet::file::properties::WriterProperties;
//!
//! #[derive(Clone)]
//! struct LatencyRecordWriterBuilder<B> {
//!     inner_writer_builder: B,
//! }
//!
//! impl<B: IcebergWriterBuilder> LatencyRecordWriterBuilder<B> {
//!     pub fn new(inner_writer_builder: B) -> Self {
//!         Self {
//!             inner_writer_builder,
//!         }
//!     }
//! }
//!
//! #[async_trait::async_trait]
//! impl<B: IcebergWriterBuilder> IcebergWriterBuilder for LatencyRecordWriterBuilder<B> {
//!     type R = LatencyRecordWriter<B::R>;
//!
//!     async fn build(self) -> Result<Self::R> {
//!         Ok(LatencyRecordWriter {
//!             inner_writer: self.inner_writer_builder.build().await?,
//!         })
//!     }
//! }
//!
//! struct LatencyRecordWriter<W> {
//!     inner_writer: W,
//! }
//!
//! #[async_trait::async_trait]
//! impl<W: IcebergWriter> IcebergWriter for LatencyRecordWriter<W> {
//!     async fn write(&mut self, input: RecordBatch) -> Result<()> {
//!         let start = Instant::now();
//!         self.inner_writer.write(input).await?;
//!         let _latency = start.elapsed();
//!         // record latency...
//!         Ok(())
//!     }
//!
//!     async fn close(&mut self) -> Result<Vec<DataFile>> {
//!         let start = Instant::now();
//!         let res = self.inner_writer.close().await?;
//!         let _latency = start.elapsed();
//!         // record latency...
//!         Ok(res)
//!     }
//! }
//!
//! #[tokio::main]
//! async fn main() -> Result<()> {
//!     // Connect to a catalog.
//!     use iceberg::memory::MEMORY_CATALOG_WAREHOUSE;
//!     use iceberg::spec::{Literal, PartitionKey, Struct};
//!
//!     let catalog = MemoryCatalogBuilder::default()
//!         .load(
//!             "memory",
//!             HashMap::from([(
//!                 MEMORY_CATALOG_WAREHOUSE.to_string(),
//!                 "file:///path/to/warehouse".to_string(),
//!             )]),
//!         )
//!         .await?;
//!
//!     // Add customized code to create a table first.
//!
//!     // Load the table from the catalog.
//!     let table = catalog
//!         .load_table(&TableIdent::from_strs(["hello", "world"])?)
//!         .await?;
//!     let partition_key = PartitionKey::new(
//!         table.metadata().default_partition_spec().as_ref().clone(),
//!         table.metadata().current_schema().clone(),
//!         Struct::from_iter(vec![Some(Literal::string("Seattle"))]),
//!     );
//!     let location_generator = DefaultLocationGenerator::new(table.metadata().clone()).unwrap();
//!     let file_name_generator = DefaultFileNameGenerator::new(
//!         "test".to_string(),
//!         None,
//!         iceberg::spec::DataFileFormat::Parquet,
//!     );
//!
//!     // Create a parquet file writer builder. The parameters can be obtained from the table.
//!     let parquet_writer_builder = ParquetWriterBuilder::new(
//!         WriterProperties::default(),
//!         table.metadata().current_schema().clone(),
//!         Some(partition_key),
//!         table.file_io().clone(),
//!         location_generator.clone(),
//!         file_name_generator.clone(),
//!     );
//!     // Create a data file writer builder using the parquet file writer builder.
//!     let data_file_writer_builder = DataFileWriterBuilder::new(parquet_writer_builder, None, 0);
//!     // Create a latency record writer builder using the data file writer builder.
//!     let latency_record_builder = LatencyRecordWriterBuilder::new(data_file_writer_builder);
//!     // Build the final writer.
//!     let mut latency_record_data_file_writer = latency_record_builder.build().await.unwrap();
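//!     // Use latency_record_data_file_writer like any other `IcebergWriter`: every
//!     // `write` and `close` call is forwarded to the inner data file writer while
//!     // its latency is recorded.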
//!
//!     Ok(())
//! }
//! ```

pub mod base_writer;
pub mod combined_writer;
pub mod file_writer;

use arrow_array::RecordBatch;

use crate::Result;
use crate::spec::{DataFile, SchemaRef};

type DefaultInput = RecordBatch;
type DefaultOutput = Vec<DataFile>;
237
238/// The builder for iceberg writer.
239#[async_trait::async_trait]
240pub trait IcebergWriterBuilder<I = DefaultInput, O = DefaultOutput>:
241    Send + Clone + 'static
242{
243    /// The associated writer type.
244    type R: IcebergWriter<I, O>;
245    /// Build the iceberg writer.
246    async fn build(self) -> Result<Self::R>;
247}
248
249/// The iceberg writer used to write data to iceberg table.
250#[async_trait::async_trait]
251pub trait IcebergWriter<I = DefaultInput, O = DefaultOutput>: Send + 'static {
252    /// Write data to iceberg table.
253    async fn write(&mut self, input: I) -> Result<()>;
254    /// Close the writer and return the written data files.
255    /// If close failed, the data written before maybe be lost. User may need to recreate the writer and rewrite the data again.
256    /// # NOTE
257    /// After close, regardless of success or failure, the writer should never be used again, otherwise the writer will panic.
258    async fn close(&mut self) -> Result<O>;
259}
260
/// The current file status of the Iceberg writer.
/// This is implemented for writers that write a single file at a time.
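///
/// For example, a caller can combine it with [`IcebergWriter`] to decide when the
/// current file is large enough to be closed (a minimal sketch; `should_roll` and the
/// 512 MiB threshold are illustrative, not part of the crate):
///
/// ```rust, no_run
/// use iceberg::writer::{CurrentFileStatus, IcebergWriter};
///
/// fn should_roll<W: IcebergWriter + CurrentFileStatus>(writer: &W) -> bool {
///     // Roll to a new file once the current one exceeds the target size.
///     writer.current_written_size() >= 512 * 1024 * 1024
/// }
/// ```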
pub trait CurrentFileStatus {
    /// Get the current file path.
    fn current_file_path(&self) -> String;
    /// Get the number of rows written to the current file.
    fn current_row_num(&self) -> usize;
    /// Get the number of bytes written to the current file so far.
    fn current_written_size(&self) -> usize;
    /// Get the current schema used by the writer.
    fn current_schema(&self) -> SchemaRef;
}

#[cfg(test)]
mod tests {
    use arrow_array::RecordBatch;
    use arrow_schema::Schema;
    use arrow_select::concat::concat_batches;
    use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;

    use super::IcebergWriter;
    use crate::io::FileIO;
    use crate::spec::{DataFile, DataFileFormat};

    // This function guarantees that the trait is object safe.
    async fn _guarantee_object_safe(mut w: Box<dyn IcebergWriter>) {
        let _ = w
            .write(RecordBatch::new_empty(Schema::empty().into()))
            .await;
        let _ = w.close().await;
    }

    // This function checks that:
    // - the data of the written parquet file is correct;
    // - the metadata of the data file is consistent with the written parquet file.
    pub(crate) async fn check_parquet_data_file(
        file_io: &FileIO,
        data_file: &DataFile,
        batch: &RecordBatch,
    ) {
        assert_eq!(data_file.file_format, DataFileFormat::Parquet);

        let input_file = file_io.new_input(data_file.file_path.clone()).unwrap();
        // Read the written file.
        let input_content = input_file.read().await.unwrap();
        let reader_builder =
            ParquetRecordBatchReaderBuilder::try_new(input_content.clone()).unwrap();

        // Check the data.
        let reader = reader_builder.build().unwrap();
        let batches = reader.map(|batch| batch.unwrap()).collect::<Vec<_>>();
        let res = concat_batches(&batch.schema(), &batches).unwrap();
        assert_eq!(*batch, res);
    }
}
315}