iceberg/writer/mod.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Iceberg writer module.
19//!
//! This module contains the generic writer traits and specific writer implementations. Writers fall into two categories:
//! 1. FileWriter: writer for a physical file format (such as Parquet, ORC).
//! 2. IcebergWriter: writer for a logical format provided by the Iceberg table (such as data file, equality delete file, position delete file)
//!    or other functionality (such as partition writer, delta writer).
24//!
25//! The IcebergWriter will use the inner FileWriter to write physical files.
26//!
27//! The writer interface is designed to be extensible and flexible. Writers can be independently configured
//! and composed to support complex write logic. For example, by combining `FanoutWriter`, `DataFileWriter`, and `ParquetWriter`,
29//! you can build a writer that automatically partitions the data and writes it in the Parquet format.
30//!
//! For this purpose, there are four traits corresponding to these writers:
32//! - IcebergWriterBuilder
33//! - IcebergWriter
34//! - FileWriterBuilder
35//! - FileWriter
36//!
37//! Users can create specific writer builders, combine them, and build the final writer.
//! They can also define custom writers by implementing the `IcebergWriter` trait,
39//! allowing seamless integration with existing writers. (See the example below.)
40//!
//! # Simple example for the data file writer using the Parquet physical format:
42//! ```rust, no_run
43//! use std::collections::HashMap;
44//! use std::sync::Arc;
45//!
46//! use arrow_array::{ArrayRef, BooleanArray, Int32Array, RecordBatch, StringArray};
47//! use async_trait::async_trait;
48//! use iceberg::io::{FileIO, FileIOBuilder};
49//! use iceberg::spec::DataFile;
50//! use iceberg::transaction::Transaction;
51//! use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
52//! use iceberg::writer::file_writer::ParquetWriterBuilder;
53//! use iceberg::writer::file_writer::location_generator::{
54//! DefaultFileNameGenerator, DefaultLocationGenerator,
55//! };
56//! use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
57//! use iceberg::{Catalog, CatalogBuilder, MemoryCatalog, Result, TableIdent};
58//! use parquet::file::properties::WriterProperties;
59//! #[tokio::main]
60//! async fn main() -> Result<()> {
61//! // Connect to a catalog.
62//! use iceberg::memory::{MEMORY_CATALOG_WAREHOUSE, MemoryCatalogBuilder};
63//! use iceberg::writer::file_writer::rolling_writer::{
64//! RollingFileWriter, RollingFileWriterBuilder,
65//! };
66//! let catalog = MemoryCatalogBuilder::default()
67//! .load(
68//! "memory",
69//! HashMap::from([(
70//! MEMORY_CATALOG_WAREHOUSE.to_string(),
71//! "file:///path/to/warehouse".to_string(),
72//! )]),
73//! )
74//! .await?;
75//! // Add customized code to create a table first.
76//!
77//! // Load table from catalog.
78//! let table = catalog
79//! .load_table(&TableIdent::from_strs(["hello", "world"])?)
80//! .await?;
81//! let location_generator = DefaultLocationGenerator::new(table.metadata().clone()).unwrap();
82//! let file_name_generator = DefaultFileNameGenerator::new(
83//! "test".to_string(),
84//! None,
85//! iceberg::spec::DataFileFormat::Parquet,
86//! );
87//!
//!     // Create a parquet file writer builder. The parameters can be obtained from the table.
89//! let schema = table.metadata().current_schema().clone();
90//! let parquet_writer_builder =
91//! ParquetWriterBuilder::new(WriterProperties::default(), schema.clone());
92//!
93//! // Create a rolling file writer using parquet file writer builder.
94//! let rolling_file_writer_builder = RollingFileWriterBuilder::new_with_default_file_size(
95//! parquet_writer_builder,
96//! schema,
97//! table.file_io().clone(),
98//! location_generator.clone(),
99//! file_name_generator.clone(),
100//! );
101//!
//!     // Create a data file writer builder using the rolling file writer builder.
103//! let data_file_writer_builder = DataFileWriterBuilder::new(rolling_file_writer_builder);
104//! // Build the data file writer
105//! let mut data_file_writer = data_file_writer_builder.build(None).await?;
106//!
107//! // Write the data using data_file_writer...
108//!
//!     // Close the writer; it returns the written data files.
110//! let data_files = data_file_writer.close().await.unwrap();
111//!
112//! Ok(())
113//! }
114//! ```
115//!
116//! # Custom writer to record latency
117//! ```rust, no_run
118//! use std::collections::HashMap;
119//! use std::time::Instant;
120//!
121//! use arrow_array::RecordBatch;
122//! use iceberg::io::FileIOBuilder;
123//! use iceberg::memory::MemoryCatalogBuilder;
124//! use iceberg::spec::{DataFile, PartitionKey};
125//! use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
126//! use iceberg::writer::file_writer::ParquetWriterBuilder;
127//! use iceberg::writer::file_writer::location_generator::{
128//! DefaultFileNameGenerator, DefaultLocationGenerator,
129//! };
130//! use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
131//! use iceberg::{Catalog, CatalogBuilder, MemoryCatalog, Result, TableIdent};
132//! use parquet::file::properties::WriterProperties;
133//!
134//! #[derive(Clone)]
135//! struct LatencyRecordWriterBuilder<B> {
136//! inner_writer_builder: B,
137//! }
138//!
139//! impl<B: IcebergWriterBuilder> LatencyRecordWriterBuilder<B> {
140//! pub fn new(inner_writer_builder: B) -> Self {
141//! Self {
142//! inner_writer_builder,
143//! }
144//! }
145//! }
146//!
147//! #[async_trait::async_trait]
148//! impl<B: IcebergWriterBuilder> IcebergWriterBuilder for LatencyRecordWriterBuilder<B> {
149//! type R = LatencyRecordWriter<B::R>;
150//!
151//! async fn build(&self, partition_key: Option<PartitionKey>) -> Result<Self::R> {
152//! Ok(LatencyRecordWriter {
153//! inner_writer: self.inner_writer_builder.build(partition_key).await?,
154//! })
155//! }
156//! }
157//! struct LatencyRecordWriter<W> {
158//! inner_writer: W,
159//! }
160//!
161//! #[async_trait::async_trait]
162//! impl<W: IcebergWriter> IcebergWriter for LatencyRecordWriter<W> {
163//! async fn write(&mut self, input: RecordBatch) -> Result<()> {
164//! let start = Instant::now();
165//! self.inner_writer.write(input).await?;
166//! let _latency = start.elapsed();
167//! // record latency...
168//! Ok(())
169//! }
170//!
171//! async fn close(&mut self) -> Result<Vec<DataFile>> {
172//! let start = Instant::now();
173//! let res = self.inner_writer.close().await?;
174//! let _latency = start.elapsed();
175//! // record latency...
176//! Ok(res)
177//! }
178//! }
179//!
180//! #[tokio::main]
181//! async fn main() -> Result<()> {
182//! // Connect to a catalog.
183//! use iceberg::memory::MEMORY_CATALOG_WAREHOUSE;
184//! use iceberg::spec::{Literal, PartitionKey, Struct};
185//! use iceberg::writer::file_writer::rolling_writer::{
186//! RollingFileWriter, RollingFileWriterBuilder,
187//! };
188//!
189//! let catalog = MemoryCatalogBuilder::default()
190//! .load(
191//! "memory",
192//! HashMap::from([(
193//! MEMORY_CATALOG_WAREHOUSE.to_string(),
194//! "file:///path/to/warehouse".to_string(),
195//! )]),
196//! )
197//! .await?;
198//!
199//! // Add customized code to create a table first.
200//!
201//! // Load table from catalog.
202//! let table = catalog
203//! .load_table(&TableIdent::from_strs(["hello", "world"])?)
204//! .await?;
205//! let partition_key = PartitionKey::new(
206//! table.metadata().default_partition_spec().as_ref().clone(),
207//! table.metadata().current_schema().clone(),
208//! Struct::from_iter(vec![Some(Literal::string("Seattle"))]),
209//! );
210//! let location_generator = DefaultLocationGenerator::new(table.metadata().clone()).unwrap();
211//! let file_name_generator = DefaultFileNameGenerator::new(
212//! "test".to_string(),
213//! None,
214//! iceberg::spec::DataFileFormat::Parquet,
215//! );
216//!
//!     // Create a parquet file writer builder. The parameters can be obtained from the table.
218//! let schema = table.metadata().current_schema().clone();
219//! let parquet_writer_builder =
220//! ParquetWriterBuilder::new(WriterProperties::default(), schema.clone());
221//!
222//! // Create a rolling file writer
223//! let rolling_file_writer_builder = RollingFileWriterBuilder::new(
224//! parquet_writer_builder,
225//! schema,
226//! 512 * 1024 * 1024,
227//! table.file_io().clone(),
228//! location_generator.clone(),
229//! file_name_generator.clone(),
230//! );
231//!
232//! // Create a data file writer builder using rolling file writer.
233//! let data_file_writer_builder = DataFileWriterBuilder::new(rolling_file_writer_builder);
234//! // Create latency record writer using data file writer builder.
235//! let latency_record_builder = LatencyRecordWriterBuilder::new(data_file_writer_builder);
236//! // Build the final writer
237//! let mut latency_record_data_file_writer = latency_record_builder
238//! .build(Some(partition_key))
239//! .await
240//! .unwrap();
241//!
242//! Ok(())
243//! }
244//! ```
245//!
246//! # Adding Partitioning to Data File Writers
247//!
248//! You can wrap a `DataFileWriter` with partitioning writers to handle partitioned tables.
249//! Iceberg provides two partitioning strategies:
250//!
251//! ## FanoutWriter - For Unsorted Data
252//!
253//! Wraps the data file writer to handle unsorted data by maintaining multiple active writers.
254//! Use this when your data is not pre-sorted by partition key. Writes to different partitions
255//! can happen in any order, even interleaved.
256//!
257//! ```rust, no_run
258//! # // Same setup as the simple example above...
259//! # use iceberg::memory::{MEMORY_CATALOG_WAREHOUSE, MemoryCatalogBuilder};
260//! # use iceberg::writer::file_writer::rolling_writer::RollingFileWriterBuilder;
261//! # use iceberg::{Catalog, CatalogBuilder, Result, TableIdent};
262//! # use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
263//! # use iceberg::writer::file_writer::ParquetWriterBuilder;
264//! # use iceberg::writer::file_writer::location_generator::{
265//! # DefaultFileNameGenerator, DefaultLocationGenerator,
266//! # };
267//! # use parquet::file::properties::WriterProperties;
268//! # use std::collections::HashMap;
269//! # #[tokio::main]
270//! # async fn main() -> Result<()> {
271//! # let catalog = MemoryCatalogBuilder::default()
272//! # .load("memory", HashMap::from([(MEMORY_CATALOG_WAREHOUSE.to_string(), "file:///path/to/warehouse".to_string())]))
273//! # .await?;
274//! # let table = catalog.load_table(&TableIdent::from_strs(["hello", "world"])?).await?;
275//! # let location_generator = DefaultLocationGenerator::new(table.metadata().clone()).unwrap();
276//! # let file_name_generator = DefaultFileNameGenerator::new("test".to_string(), None, iceberg::spec::DataFileFormat::Parquet);
277//! # let schema = table.metadata().current_schema().clone();
278//! # let parquet_writer_builder = ParquetWriterBuilder::new(WriterProperties::default(), schema.clone());
279//! # let rolling_writer_builder = RollingFileWriterBuilder::new_with_default_file_size(
280//! # parquet_writer_builder, schema, table.file_io().clone(), location_generator, file_name_generator);
281//! # let data_file_writer_builder = DataFileWriterBuilder::new(rolling_writer_builder);
282//!
283//! // Wrap the data file writer with FanoutWriter for partitioning
284//! use iceberg::writer::partitioning::fanout_writer::FanoutWriter;
285//! use iceberg::writer::partitioning::PartitioningWriter;
286//! use iceberg::spec::{Literal, PartitionKey, Struct};
287//!
288//! let mut fanout_writer = FanoutWriter::new(data_file_writer_builder);
289//!
290//! // Create partition keys for different regions
291//! let schema = table.metadata().current_schema().clone();
292//! let partition_spec = table.metadata().default_partition_spec().as_ref().clone();
293//!
294//! let partition_key_us = PartitionKey::new(
295//! partition_spec.clone(),
296//! schema.clone(),
297//! Struct::from_iter([Some(Literal::string("US"))]),
298//! );
299//!
300//! let partition_key_eu = PartitionKey::new(
301//! partition_spec.clone(),
302//! schema.clone(),
303//! Struct::from_iter([Some(Literal::string("EU"))]),
304//! );
305//!
306//! // Write to different partitions in any order - can interleave partition writes
307//! // fanout_writer.write(partition_key_us.clone(), batch_us1).await?;
308//! // fanout_writer.write(partition_key_eu.clone(), batch_eu1).await?;
309//! // fanout_writer.write(partition_key_us.clone(), batch_us2).await?; // Back to US - OK!
310//! // fanout_writer.write(partition_key_eu.clone(), batch_eu2).await?; // Back to EU - OK!
311//!
312//! let data_files = fanout_writer.close().await?;
313//! # Ok(())
314//! # }
315//! ```
316//!
317//! ## ClusteredWriter - For Sorted Data
318//!
319//! Wraps the data file writer for pre-sorted data. More memory efficient as it maintains
320//! only one active writer at a time, but requires input sorted by partition key.
321//!
322//! ```rust, no_run
323//! # // Same setup as the simple example above...
324//! # use iceberg::memory::{MEMORY_CATALOG_WAREHOUSE, MemoryCatalogBuilder};
325//! # use iceberg::writer::file_writer::rolling_writer::RollingFileWriterBuilder;
326//! # use iceberg::{Catalog, CatalogBuilder, Result, TableIdent};
327//! # use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
328//! # use iceberg::writer::file_writer::ParquetWriterBuilder;
329//! # use iceberg::writer::file_writer::location_generator::{
330//! # DefaultFileNameGenerator, DefaultLocationGenerator,
331//! # };
332//! # use parquet::file::properties::WriterProperties;
333//! # use std::collections::HashMap;
334//! # #[tokio::main]
335//! # async fn main() -> Result<()> {
336//! # let catalog = MemoryCatalogBuilder::default()
337//! # .load("memory", HashMap::from([(MEMORY_CATALOG_WAREHOUSE.to_string(), "file:///path/to/warehouse".to_string())]))
338//! # .await?;
339//! # let table = catalog.load_table(&TableIdent::from_strs(["hello", "world"])?).await?;
340//! # let location_generator = DefaultLocationGenerator::new(table.metadata().clone()).unwrap();
341//! # let file_name_generator = DefaultFileNameGenerator::new("test".to_string(), None, iceberg::spec::DataFileFormat::Parquet);
342//! # let schema = table.metadata().current_schema().clone();
343//! # let parquet_writer_builder = ParquetWriterBuilder::new(WriterProperties::default(), schema.clone());
344//! # let rolling_writer_builder = RollingFileWriterBuilder::new_with_default_file_size(
345//! # parquet_writer_builder, schema, table.file_io().clone(), location_generator, file_name_generator);
346//! # let data_file_writer_builder = DataFileWriterBuilder::new(rolling_writer_builder);
347//!
348//! // Wrap the data file writer with ClusteredWriter for sorted partitioning
349//! use iceberg::writer::partitioning::clustered_writer::ClusteredWriter;
350//! use iceberg::writer::partitioning::PartitioningWriter;
351//! use iceberg::spec::{Literal, PartitionKey, Struct};
352//!
353//! let mut clustered_writer = ClusteredWriter::new(data_file_writer_builder);
354//!
355//! // Create partition keys (must write in sorted order)
356//! let schema = table.metadata().current_schema().clone();
357//! let partition_spec = table.metadata().default_partition_spec().as_ref().clone();
358//!
359//! let partition_key_asia = PartitionKey::new(
360//! partition_spec.clone(),
361//! schema.clone(),
362//! Struct::from_iter([Some(Literal::string("ASIA"))]),
363//! );
364//!
365//! let partition_key_eu = PartitionKey::new(
366//! partition_spec.clone(),
367//! schema.clone(),
368//! Struct::from_iter([Some(Literal::string("EU"))]),
369//! );
370//!
371//! let partition_key_us = PartitionKey::new(
372//! partition_spec.clone(),
373//! schema.clone(),
374//! Struct::from_iter([Some(Literal::string("US"))]),
375//! );
376//!
377//! // Write to partitions in sorted order (ASIA -> EU -> US)
378//! // clustered_writer.write(partition_key_asia, batch_asia).await?;
379//! // clustered_writer.write(partition_key_eu, batch_eu).await?;
380//! // clustered_writer.write(partition_key_us, batch_us).await?;
381//! // Writing back to ASIA would fail since data must be sorted!
382//!
383//! let data_files = clustered_writer.close().await?;
384//!
//! # Ok(())
//! # }
387//! ```
388
389pub mod base_writer;
390pub mod combined_writer;
391pub mod file_writer;
392pub mod partitioning;
393
394use arrow_array::RecordBatch;
395
396use crate::Result;
397use crate::spec::{DataFile, PartitionKey};
398
399type DefaultInput = RecordBatch;
400type DefaultOutput = Vec<DataFile>;
401
402/// The builder for iceberg writer.
403#[async_trait::async_trait]
404pub trait IcebergWriterBuilder<I = DefaultInput, O = DefaultOutput>: Send + Sync + 'static {
405 /// The associated writer type.
406 type R: IcebergWriter<I, O>;
407 /// Build the iceberg writer with an optional partition key.
408 async fn build(&self, partition_key: Option<PartitionKey>) -> Result<Self::R>;
409}
410
411/// The iceberg writer used to write data to iceberg table.
412#[async_trait::async_trait]
413pub trait IcebergWriter<I = DefaultInput, O = DefaultOutput>: Send + 'static {
414 /// Write data to iceberg table.
415 async fn write(&mut self, input: I) -> Result<()>;
416 /// Close the writer and return the written data files.
417 /// If close failed, the data written before maybe be lost. User may need to recreate the writer and rewrite the data again.
418 /// # NOTE
419 /// After close, regardless of success or failure, the writer should never be used again, otherwise the writer will panic.
420 async fn close(&mut self) -> Result<O>;
421}
422
423/// The current file status of the Iceberg writer.
424/// This is implemented for writers that write a single file at a time.
425pub trait CurrentFileStatus {
426 /// Get the current file path.
427 fn current_file_path(&self) -> String;
428 /// Get the current file row number.
429 fn current_row_num(&self) -> usize;
430 /// Get the current file written size.
431 fn current_written_size(&self) -> usize;
432 /// Get the current schema used by the writer.
433 fn current_schema(&self) -> crate::spec::SchemaRef;
434}
435
#[cfg(test)]
mod tests {
    use arrow_array::RecordBatch;
    use arrow_schema::Schema;
    use arrow_select::concat::concat_batches;
    use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;

    use super::IcebergWriter;
    use crate::io::FileIO;
    use crate::spec::{DataFile, DataFileFormat};

    // Compile-time guarantee that `IcebergWriter` can be used as a trait object:
    // this function only has to type-check, and nothing in this module calls it.
    async fn _guarantee_object_safe(mut w: Box<dyn IcebergWriter>) {
        let _ = w
            .write(RecordBatch::new_empty(Schema::empty().into()))
            .await;
        let _ = w.close().await;
    }

    // This function checks that:
    // - the data file is recorded with the Parquet file format, and
    // - the data read back from the written parquet file equals `batch`.
    //
    // Any read or decode failure panics, which fails the calling test.
    pub(crate) async fn check_parquet_data_file(
        file_io: &FileIO,
        data_file: &DataFile,
        batch: &RecordBatch,
    ) {
        assert_eq!(data_file.file_format, DataFileFormat::Parquet);

        // Read the written file back.
        let input_file = file_io.new_input(data_file.file_path.clone()).unwrap();
        let input_content = input_file.read().await.unwrap();
        // `input_content` is not used afterwards, so it is moved instead of cloned.
        let reader = ParquetRecordBatchReaderBuilder::try_new(input_content)
            .unwrap()
            .build()
            .unwrap();

        // Collect every batch the reader yields and concatenate them before comparing,
        // so the comparison does not depend on how the reader splits the rows.
        let read_batches = reader
            .map(|read_batch| read_batch.unwrap())
            .collect::<Vec<_>>();
        let res = concat_batches(&batch.schema(), &read_batches).unwrap();
        assert_eq!(*batch, res);
    }
}
477}