iceberg/writer/mod.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Iceberg writer module.
//!
//! This module contains the generic writer traits and specific writer implementations. Writers fall into two categories:
//! 1. FileWriter: writes a physical file format (such as Parquet or ORC).
//! 2. IcebergWriter: writes a logical format provided by the Iceberg table (such as data files, equality delete files, and position delete files)
//!    or provides other functionality (such as partition writers and delta writers).
//!
//! An IcebergWriter uses an inner FileWriter to write the physical files.
//!
//! The writer interface is designed to be extensible and flexible. Writers can be independently configured
//! and composed to support complex write logic. For example, by combining `FanoutWriter`, `DataFileWriter`, and `ParquetWriter`,
//! you can build a writer that automatically partitions the data and writes it in the Parquet format.
//!
//! For this purpose, there are four traits corresponding to these writers:
//! - IcebergWriterBuilder
//! - IcebergWriter
//! - FileWriterBuilder
//! - FileWriter
//!
//! Users can create specific writer builders, combine them, and build the final writer.
//! They can also define custom writers by implementing the `IcebergWriter` trait,
//! allowing seamless integration with existing writers. (See the example below.)
//!
//! # Simple example: a data file writer using the Parquet physical format
//! ```rust, no_run
//! use std::collections::HashMap;
//! use std::sync::Arc;
//!
//! use arrow_array::{ArrayRef, BooleanArray, Int32Array, RecordBatch, StringArray};
//! use async_trait::async_trait;
//! use iceberg::io::{FileIO, FileIOBuilder};
//! use iceberg::spec::DataFile;
//! use iceberg::transaction::Transaction;
//! use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
//! use iceberg::writer::file_writer::ParquetWriterBuilder;
//! use iceberg::writer::file_writer::location_generator::{
//!     DefaultFileNameGenerator, DefaultLocationGenerator,
//! };
//! use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
//! use iceberg::{Catalog, CatalogBuilder, MemoryCatalog, Result, TableIdent};
//! use parquet::file::properties::WriterProperties;
//!
//! #[tokio::main]
//! async fn main() -> Result<()> {
//!     // Connect to a catalog.
//!     use iceberg::memory::{MEMORY_CATALOG_WAREHOUSE, MemoryCatalogBuilder};
//!     use iceberg::writer::file_writer::rolling_writer::{
//!         RollingFileWriter, RollingFileWriterBuilder,
//!     };
//!     let catalog = MemoryCatalogBuilder::default()
//!         .load(
//!             "memory",
//!             HashMap::from([(
//!                 MEMORY_CATALOG_WAREHOUSE.to_string(),
//!                 "file:///path/to/warehouse".to_string(),
//!             )]),
//!         )
//!         .await?;
//!     // Add customized code to create a table first.
//!
//!     // Load table from catalog.
//!     let table = catalog
//!         .load_table(&TableIdent::from_strs(["hello", "world"])?)
//!         .await?;
//!     let location_generator = DefaultLocationGenerator::new(table.metadata().clone()).unwrap();
//!     let file_name_generator = DefaultFileNameGenerator::new(
//!         "test".to_string(),
//!         None,
//!         iceberg::spec::DataFileFormat::Parquet,
//!     );
//!
//!     // Create a parquet file writer builder. The parameters can be obtained from the table.
//!     let schema = table.metadata().current_schema().clone();
//!     let parquet_writer_builder = ParquetWriterBuilder::new(
//!         WriterProperties::default(),
//!         schema.clone(),
//!     );
//!
//!     // Create a rolling file writer builder using the parquet file writer builder.
//!     let rolling_file_writer_builder = RollingFileWriterBuilder::new_with_default_file_size(
//!         parquet_writer_builder,
//!         schema,
//!         table.file_io().clone(),
//!         location_generator.clone(),
//!         file_name_generator.clone(),
//!     );
//!
//!     // Create a data file writer builder using the rolling file writer builder.
//!     let data_file_writer_builder = DataFileWriterBuilder::new(rolling_file_writer_builder);
//!     // Build the data file writer.
//!     let mut data_file_writer = data_file_writer_builder.build(None).await?;
//!
//!     // Write data using the data file writer.
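//!     // For illustration, write a small hypothetical batch. The column names and types
//!     // below are assumptions; in real code the Arrow schema of the batch must match the
//!     // table's Iceberg schema.
//!     let batch = RecordBatch::try_from_iter(vec![
//!         ("id", Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef),
//!         ("name", Arc::new(StringArray::from(vec!["a", "b", "c"])) as ArrayRef),
//!         ("active", Arc::new(BooleanArray::from(vec![true, false, true])) as ArrayRef),
//!     ])
//!     .unwrap();
//!     data_file_writer.write(batch).await?;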
//!
//!     // Close the writer; it returns the written data files.
//!     let data_files = data_file_writer.close().await.unwrap();
//!
//!     Ok(())
//! }
//! ```
//!
//! # Custom writer to record latency
//! ```rust, no_run
//! use std::collections::HashMap;
//! use std::time::Instant;
//!
//! use arrow_array::RecordBatch;
//! use iceberg::io::FileIOBuilder;
//! use iceberg::memory::MemoryCatalogBuilder;
//! use iceberg::spec::{DataFile, PartitionKey};
//! use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
//! use iceberg::writer::file_writer::ParquetWriterBuilder;
//! use iceberg::writer::file_writer::location_generator::{
//!     DefaultFileNameGenerator, DefaultLocationGenerator,
//! };
//! use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
//! use iceberg::{Catalog, CatalogBuilder, MemoryCatalog, Result, TableIdent};
//! use parquet::file::properties::WriterProperties;
//!
//! #[derive(Clone)]
//! struct LatencyRecordWriterBuilder<B> {
//!     inner_writer_builder: B,
//! }
//!
//! impl<B: IcebergWriterBuilder> LatencyRecordWriterBuilder<B> {
//!     pub fn new(inner_writer_builder: B) -> Self {
//!         Self {
//!             inner_writer_builder,
//!         }
//!     }
//! }
//!
//! #[async_trait::async_trait]
//! impl<B: IcebergWriterBuilder> IcebergWriterBuilder for LatencyRecordWriterBuilder<B> {
//!     type R = LatencyRecordWriter<B::R>;
//!
//!     async fn build(&self, partition_key: Option<PartitionKey>) -> Result<Self::R> {
//!         Ok(LatencyRecordWriter {
//!             inner_writer: self.inner_writer_builder.build(partition_key).await?,
//!         })
//!     }
//! }
//!
//! struct LatencyRecordWriter<W> {
//!     inner_writer: W,
//! }
//!
//! #[async_trait::async_trait]
//! impl<W: IcebergWriter> IcebergWriter for LatencyRecordWriter<W> {
//!     async fn write(&mut self, input: RecordBatch) -> Result<()> {
//!         let start = Instant::now();
//!         self.inner_writer.write(input).await?;
//!         let _latency = start.elapsed();
//!         // record latency...
//!         Ok(())
//!     }
//!
//!     async fn close(&mut self) -> Result<Vec<DataFile>> {
//!         let start = Instant::now();
//!         let res = self.inner_writer.close().await?;
//!         let _latency = start.elapsed();
//!         // record latency...
//!         Ok(res)
//!     }
//! }
//!
//! #[tokio::main]
//! async fn main() -> Result<()> {
//!     // Connect to a catalog.
//!     use iceberg::memory::MEMORY_CATALOG_WAREHOUSE;
//!     use iceberg::spec::{Literal, PartitionKey, Struct};
//!     use iceberg::writer::file_writer::rolling_writer::{
//!         RollingFileWriter, RollingFileWriterBuilder,
//!     };
//!
//!     let catalog = MemoryCatalogBuilder::default()
//!         .load(
//!             "memory",
//!             HashMap::from([(
//!                 MEMORY_CATALOG_WAREHOUSE.to_string(),
//!                 "file:///path/to/warehouse".to_string(),
//!             )]),
//!         )
//!         .await?;
//!
//!     // Add customized code to create a table first.
//!
//!     // Load table from catalog.
//!     let table = catalog
//!         .load_table(&TableIdent::from_strs(["hello", "world"])?)
//!         .await?;
//!     let partition_key = PartitionKey::new(
//!         table.metadata().default_partition_spec().as_ref().clone(),
//!         table.metadata().current_schema().clone(),
//!         Struct::from_iter(vec![Some(Literal::string("Seattle"))]),
//!     );
//!     let location_generator = DefaultLocationGenerator::new(table.metadata().clone()).unwrap();
//!     let file_name_generator = DefaultFileNameGenerator::new(
//!         "test".to_string(),
//!         None,
//!         iceberg::spec::DataFileFormat::Parquet,
//!     );
//!
//!     // Create a parquet file writer builder. The parameters can be obtained from the table.
//!     let schema = table.metadata().current_schema().clone();
//!     let parquet_writer_builder = ParquetWriterBuilder::new(
//!         WriterProperties::default(),
//!         schema.clone(),
//!     );
//!
//!     // Create a rolling file writer builder with an explicit target file size.
//!     let rolling_file_writer_builder = RollingFileWriterBuilder::new(
//!         parquet_writer_builder,
//!         schema,
//!         512 * 1024 * 1024,
//!         table.file_io().clone(),
//!         location_generator.clone(),
//!         file_name_generator.clone(),
//!     );
//!
//!     // Create a data file writer builder using the rolling file writer builder.
//!     let data_file_writer_builder = DataFileWriterBuilder::new(rolling_file_writer_builder);
//!     // Create a latency record writer builder using the data file writer builder.
//!     let latency_record_builder = LatencyRecordWriterBuilder::new(data_file_writer_builder);
//!     // Build the final writer.
//!     let mut latency_record_data_file_writer = latency_record_builder
//!         .build(Some(partition_key))
//!         .await
//!         .unwrap();
//!
//!     Ok(())
//! }
//! ```
//!
//! # Adding Partitioning to Data File Writers
//!
//! You can wrap a `DataFileWriter` with partitioning writers to handle partitioned tables.
//! Iceberg provides two partitioning strategies:
//!
//! ## FanoutWriter - For Unsorted Data
//!
//! Wraps the data file writer to handle unsorted data by maintaining multiple active writers.
//! Use this when your data is not pre-sorted by partition key. Writes to different partitions
//! can happen in any order, even interleaved.
//!
//! ```rust, no_run
//! # // Same setup as the simple example above...
//! # use iceberg::memory::{MEMORY_CATALOG_WAREHOUSE, MemoryCatalogBuilder};
//! # use iceberg::writer::file_writer::rolling_writer::RollingFileWriterBuilder;
//! # use iceberg::{Catalog, CatalogBuilder, Result, TableIdent};
//! # use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
//! # use iceberg::writer::file_writer::ParquetWriterBuilder;
//! # use iceberg::writer::file_writer::location_generator::{
//! #     DefaultFileNameGenerator, DefaultLocationGenerator,
//! # };
//! # use parquet::file::properties::WriterProperties;
//! # use std::collections::HashMap;
//! # #[tokio::main]
//! # async fn main() -> Result<()> {
//! # let catalog = MemoryCatalogBuilder::default()
//! #     .load("memory", HashMap::from([(MEMORY_CATALOG_WAREHOUSE.to_string(), "file:///path/to/warehouse".to_string())]))
//! #     .await?;
//! # let table = catalog.load_table(&TableIdent::from_strs(["hello", "world"])?).await?;
//! # let location_generator = DefaultLocationGenerator::new(table.metadata().clone()).unwrap();
//! # let file_name_generator = DefaultFileNameGenerator::new("test".to_string(), None, iceberg::spec::DataFileFormat::Parquet);
//! # let schema = table.metadata().current_schema().clone();
//! # let parquet_writer_builder = ParquetWriterBuilder::new(WriterProperties::default(), schema.clone());
//! # let rolling_writer_builder = RollingFileWriterBuilder::new_with_default_file_size(
//! #     parquet_writer_builder, schema, table.file_io().clone(), location_generator, file_name_generator);
//! # let data_file_writer_builder = DataFileWriterBuilder::new(rolling_writer_builder);
//!
//! // Wrap the data file writer with FanoutWriter for partitioning
//! use iceberg::writer::partitioning::fanout_writer::FanoutWriter;
//! use iceberg::writer::partitioning::PartitioningWriter;
//! use iceberg::spec::{Literal, PartitionKey, Struct};
//!
//! let mut fanout_writer = FanoutWriter::new(data_file_writer_builder);
//!
//! // Create partition keys for different regions
//! let schema = table.metadata().current_schema().clone();
//! let partition_spec = table.metadata().default_partition_spec().as_ref().clone();
//!
//! let partition_key_us = PartitionKey::new(
//!     partition_spec.clone(),
//!     schema.clone(),
//!     Struct::from_iter([Some(Literal::string("US"))]),
//! );
//!
//! let partition_key_eu = PartitionKey::new(
//!     partition_spec.clone(),
//!     schema.clone(),
//!     Struct::from_iter([Some(Literal::string("EU"))]),
//! );
//!
//! // Write to different partitions in any order - can interleave partition writes
//! // fanout_writer.write(partition_key_us.clone(), batch_us1).await?;
//! // fanout_writer.write(partition_key_eu.clone(), batch_eu1).await?;
//! // fanout_writer.write(partition_key_us.clone(), batch_us2).await?; // Back to US - OK!
//! // fanout_writer.write(partition_key_eu.clone(), batch_eu2).await?; // Back to EU - OK!
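//!
//! // For illustration, one concrete write with a hypothetical single-column batch
//! // (in real code the batch's Arrow schema must match the table schema):
//! let batch_us = arrow_array::RecordBatch::try_from_iter(vec![(
//!     "city",
//!     std::sync::Arc::new(arrow_array::StringArray::from(vec!["Seattle"])) as arrow_array::ArrayRef,
//! )])
//! .unwrap();
//! fanout_writer.write(partition_key_us.clone(), batch_us).await?;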
//!
//! let data_files = fanout_writer.close().await?;
//! # Ok(())
//! # }
//! ```
//!
//! ## ClusteredWriter - For Sorted Data
//!
//! Wraps the data file writer for pre-sorted data. It is more memory-efficient because it
//! maintains only one active writer at a time, but it requires input sorted by partition key.
//!
//! ```rust, no_run
//! # // Same setup as the simple example above...
//! # use iceberg::memory::{MEMORY_CATALOG_WAREHOUSE, MemoryCatalogBuilder};
//! # use iceberg::writer::file_writer::rolling_writer::RollingFileWriterBuilder;
//! # use iceberg::{Catalog, CatalogBuilder, Result, TableIdent};
//! # use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
//! # use iceberg::writer::file_writer::ParquetWriterBuilder;
//! # use iceberg::writer::file_writer::location_generator::{
//! #     DefaultFileNameGenerator, DefaultLocationGenerator,
//! # };
//! # use parquet::file::properties::WriterProperties;
//! # use std::collections::HashMap;
//! # #[tokio::main]
//! # async fn main() -> Result<()> {
//! # let catalog = MemoryCatalogBuilder::default()
//! #     .load("memory", HashMap::from([(MEMORY_CATALOG_WAREHOUSE.to_string(), "file:///path/to/warehouse".to_string())]))
//! #     .await?;
//! # let table = catalog.load_table(&TableIdent::from_strs(["hello", "world"])?).await?;
//! # let location_generator = DefaultLocationGenerator::new(table.metadata().clone()).unwrap();
//! # let file_name_generator = DefaultFileNameGenerator::new("test".to_string(), None, iceberg::spec::DataFileFormat::Parquet);
//! # let schema = table.metadata().current_schema().clone();
//! # let parquet_writer_builder = ParquetWriterBuilder::new(WriterProperties::default(), schema.clone());
//! # let rolling_writer_builder = RollingFileWriterBuilder::new_with_default_file_size(
//! #     parquet_writer_builder, schema, table.file_io().clone(), location_generator, file_name_generator);
//! # let data_file_writer_builder = DataFileWriterBuilder::new(rolling_writer_builder);
//!
//! // Wrap the data file writer with ClusteredWriter for sorted partitioning
//! use iceberg::writer::partitioning::clustered_writer::ClusteredWriter;
//! use iceberg::writer::partitioning::PartitioningWriter;
//! use iceberg::spec::{Literal, PartitionKey, Struct};
//!
//! let mut clustered_writer = ClusteredWriter::new(data_file_writer_builder);
//!
//! // Create partition keys (must write in sorted order)
//! let schema = table.metadata().current_schema().clone();
//! let partition_spec = table.metadata().default_partition_spec().as_ref().clone();
//!
//! let partition_key_asia = PartitionKey::new(
//!     partition_spec.clone(),
//!     schema.clone(),
//!     Struct::from_iter([Some(Literal::string("ASIA"))]),
//! );
//!
//! let partition_key_eu = PartitionKey::new(
//!     partition_spec.clone(),
//!     schema.clone(),
//!     Struct::from_iter([Some(Literal::string("EU"))]),
//! );
//!
//! let partition_key_us = PartitionKey::new(
//!     partition_spec.clone(),
//!     schema.clone(),
//!     Struct::from_iter([Some(Literal::string("US"))]),
//! );
//!
//! // Write to partitions in sorted order (ASIA -> EU -> US)
//! // clustered_writer.write(partition_key_asia, batch_asia).await?;
//! // clustered_writer.write(partition_key_eu, batch_eu).await?;
//! // clustered_writer.write(partition_key_us, batch_us).await?;
//! // Writing back to ASIA would fail since data must be sorted!
//!
//! let data_files = clustered_writer.close().await?;
//! # Ok(())
//! # }
//! ```

pub mod base_writer;
pub mod combined_writer;
pub mod file_writer;
pub mod partitioning;

use arrow_array::RecordBatch;

use crate::Result;
use crate::spec::{DataFile, PartitionKey, SchemaRef};

type DefaultInput = RecordBatch;
type DefaultOutput = Vec<DataFile>;

/// The builder for an Iceberg writer.
#[async_trait::async_trait]
pub trait IcebergWriterBuilder<I = DefaultInput, O = DefaultOutput>: Send + Sync + 'static {
    /// The associated writer type.
    type R: IcebergWriter<I, O>;
    /// Build the iceberg writer with an optional partition key.
    async fn build(&self, partition_key: Option<PartitionKey>) -> Result<Self::R>;
}

/// The Iceberg writer used to write data to an Iceberg table.
#[async_trait::async_trait]
pub trait IcebergWriter<I = DefaultInput, O = DefaultOutput>: Send + 'static {
    /// Write data to the Iceberg table.
    async fn write(&mut self, input: I) -> Result<()>;
    /// Close the writer and return the written data files.
    /// If close fails, data written earlier may be lost. The user may need to recreate the writer and rewrite the data.
    /// # NOTE
    /// After close, regardless of success or failure, the writer must not be used again; otherwise it will panic.
    async fn close(&mut self) -> Result<O>;
}

/// The current file status of the Iceberg writer.
/// This is implemented for writers that write a single file at a time.
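///
/// # Example
///
/// A minimal sketch (not part of the crate) showing how the reported status could be used,
/// e.g. to decide when a wrapper should roll over to a new file. The helper function and the
/// target size parameter are hypothetical.
///
/// ```rust, no_run
/// use iceberg::writer::CurrentFileStatus;
///
/// // Hypothetical helper: roll to a new file once the in-progress file reaches a target size.
/// fn should_roll(writer: &impl CurrentFileStatus, target_size_bytes: usize) -> bool {
///     writer.current_written_size() >= target_size_bytes
/// }
/// ```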
pub trait CurrentFileStatus {
    /// Get the current file path.
    fn current_file_path(&self) -> String;
    /// Get the current file row number.
    fn current_row_num(&self) -> usize;
    /// Get the current file written size.
    fn current_written_size(&self) -> usize;
    /// Get the current schema used by the writer.
    fn current_schema(&self) -> SchemaRef;
}

#[cfg(test)]
mod tests {
    use arrow_array::RecordBatch;
    use arrow_schema::Schema;
    use arrow_select::concat::concat_batches;
    use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;

    use super::IcebergWriter;
    use crate::io::FileIO;
    use crate::spec::{DataFile, DataFileFormat};

    // This function guarantees that the trait can be used as an object-safe trait.
    async fn _guarantee_object_safe(mut w: Box<dyn IcebergWriter>) {
        let _ = w
            .write(RecordBatch::new_empty(Schema::empty().into()))
            .await;
        let _ = w.close().await;
    }

    // This function checks that:
    // - the data of the written parquet file is correct;
    // - the metadata of the data file is consistent with the written parquet file.
    pub(crate) async fn check_parquet_data_file(
        file_io: &FileIO,
        data_file: &DataFile,
        batch: &RecordBatch,
    ) {
        assert_eq!(data_file.file_format, DataFileFormat::Parquet);

        let input_file = file_io.new_input(data_file.file_path.clone()).unwrap();
        // Read the written file.
        let input_content = input_file.read().await.unwrap();
        let reader_builder =
            ParquetRecordBatchReaderBuilder::try_new(input_content.clone()).unwrap();

        // Check the data.
        let reader = reader_builder.build().unwrap();
        let batches = reader.map(|batch| batch.unwrap()).collect::<Vec<_>>();
        let res = concat_batches(&batch.schema(), &batches).unwrap();
        assert_eq!(*batch, res);
    }
}