iceberg/writer/mod.rs
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Iceberg writer module.
//!
//! This module contains the generic writer traits and specific writer implementations. We categorize writers into two types:
//! 1. FileWriter: writes a physical file format (such as Parquet or ORC).
//! 2. IcebergWriter: writes a logical format provided by the Iceberg table (such as data files, equality delete files, and position delete files),
//!    or provides other functionality (such as a partition writer or delta writer).
//!
//! An IcebergWriter uses an inner FileWriter to write the physical files.
//!
//! The writer interface is designed to be extensible and flexible. Writers can be independently configured
//! and composed to support complex write logic. E.g. by combining `FanoutPartitionWriter`, `DataFileWriter`, and `ParquetWriter`,
//! you can build a writer that automatically partitions the data and writes it in the Parquet format.
//!
//! For this purpose, there are four traits corresponding to these writers:
//! - IcebergWriterBuilder
//! - IcebergWriter
//! - FileWriterBuilder
//! - FileWriter
//!
//! Users can create specific writer builders, combine them, and build the final writer.
//! They can also define custom writers by implementing the `IcebergWriter` trait,
//! allowing seamless integration with existing writers. (See the example below.)
//!
//! # Simple example of a data file writer using the Parquet physical format:
//! ```rust, no_run
//! use std::collections::HashMap;
//! use std::sync::Arc;
//!
//! use arrow_array::{ArrayRef, BooleanArray, Int32Array, RecordBatch, StringArray};
//! use async_trait::async_trait;
//! use iceberg::io::{FileIO, FileIOBuilder};
//! use iceberg::spec::DataFile;
//! use iceberg::transaction::Transaction;
//! use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
//! use iceberg::writer::file_writer::ParquetWriterBuilder;
//! use iceberg::writer::file_writer::location_generator::{
//!     DefaultFileNameGenerator, DefaultLocationGenerator,
//! };
//! use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
//! use iceberg::{Catalog, CatalogBuilder, MemoryCatalog, Result, TableIdent};
//! use parquet::file::properties::WriterProperties;
//! #[tokio::main]
//! async fn main() -> Result<()> {
//!     // Connect to a catalog.
//!     use iceberg::memory::{MEMORY_CATALOG_WAREHOUSE, MemoryCatalogBuilder};
//!     let catalog = MemoryCatalogBuilder::default()
//!         .load(
//!             "memory",
//!             HashMap::from([(
//!                 MEMORY_CATALOG_WAREHOUSE.to_string(),
//!                 "file:///path/to/warehouse".to_string(),
//!             )]),
//!         )
//!         .await?;
//!     // Add customized code to create a table first.
//!
//!     // Load table from catalog.
//!     let table = catalog
//!         .load_table(&TableIdent::from_strs(["hello", "world"])?)
//!         .await?;
//!     let location_generator = DefaultLocationGenerator::new(table.metadata().clone()).unwrap();
//!     let file_name_generator = DefaultFileNameGenerator::new(
//!         "test".to_string(),
//!         None,
//!         iceberg::spec::DataFileFormat::Parquet,
//!     );
//!
//!     // Create a parquet file writer builder. The parameters can be obtained from the table.
//!     let parquet_writer_builder = ParquetWriterBuilder::new(
//!         WriterProperties::default(),
//!         table.metadata().current_schema().clone(),
//!         None,
//!         table.file_io().clone(),
//!         location_generator.clone(),
//!         file_name_generator.clone(),
//!     );
//!     // Create a data file writer builder using the parquet file writer builder.
//!     let data_file_writer_builder = DataFileWriterBuilder::new(parquet_writer_builder, None, 0);
//!     // Build the data file writer.
//!     let mut data_file_writer = data_file_writer_builder.build().await.unwrap();
//!
//!     // Write the data using data_file_writer...
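//!     // For illustration only: build a small `RecordBatch` (the column layout is
//!     // assumed to match the table schema) and hand it to the writer.
//!     let col = Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef;
//!     let batch = RecordBatch::try_from_iter(vec![("id", col)]).unwrap();
//!     data_file_writer.write(batch).await?;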
//!
//!     // Close the writer; it returns the written data files.
//!     let data_files = data_file_writer.close().await.unwrap();
//!
//!     Ok(())
//! }
//! ```
//!
//! # Custom writer to record latency
//! ```rust, no_run
//! use std::collections::HashMap;
//! use std::time::Instant;
//!
//! use arrow_array::RecordBatch;
//! use iceberg::io::FileIOBuilder;
//! use iceberg::memory::MemoryCatalogBuilder;
//! use iceberg::spec::DataFile;
//! use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
//! use iceberg::writer::file_writer::ParquetWriterBuilder;
//! use iceberg::writer::file_writer::location_generator::{
//!     DefaultFileNameGenerator, DefaultLocationGenerator,
//! };
//! use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
//! use iceberg::{Catalog, CatalogBuilder, MemoryCatalog, Result, TableIdent};
//! use parquet::file::properties::WriterProperties;
//!
//! #[derive(Clone)]
//! struct LatencyRecordWriterBuilder<B> {
//!     inner_writer_builder: B,
//! }
//!
//! impl<B: IcebergWriterBuilder> LatencyRecordWriterBuilder<B> {
//!     pub fn new(inner_writer_builder: B) -> Self {
//!         Self {
//!             inner_writer_builder,
//!         }
//!     }
//! }
//!
//! #[async_trait::async_trait]
//! impl<B: IcebergWriterBuilder> IcebergWriterBuilder for LatencyRecordWriterBuilder<B> {
//!     type R = LatencyRecordWriter<B::R>;
//!
//!     async fn build(self) -> Result<Self::R> {
//!         Ok(LatencyRecordWriter {
//!             inner_writer: self.inner_writer_builder.build().await?,
//!         })
//!     }
//! }
//! struct LatencyRecordWriter<W> {
//!     inner_writer: W,
//! }
//!
//! #[async_trait::async_trait]
//! impl<W: IcebergWriter> IcebergWriter for LatencyRecordWriter<W> {
//!     async fn write(&mut self, input: RecordBatch) -> Result<()> {
//!         let start = Instant::now();
//!         self.inner_writer.write(input).await?;
//!         let _latency = start.elapsed();
//!         // record latency...
//!         Ok(())
//!     }
//!
//!     async fn close(&mut self) -> Result<Vec<DataFile>> {
//!         let start = Instant::now();
//!         let res = self.inner_writer.close().await?;
//!         let _latency = start.elapsed();
//!         // record latency...
//!         Ok(res)
//!     }
//! }
//!
//! #[tokio::main]
//! async fn main() -> Result<()> {
//!     // Connect to a catalog.
//!     use iceberg::memory::MEMORY_CATALOG_WAREHOUSE;
//!     use iceberg::spec::{Literal, PartitionKey, Struct};
//!
//!     let catalog = MemoryCatalogBuilder::default()
//!         .load(
//!             "memory",
//!             HashMap::from([(
//!                 MEMORY_CATALOG_WAREHOUSE.to_string(),
//!                 "file:///path/to/warehouse".to_string(),
//!             )]),
//!         )
//!         .await?;
//!
//!     // Add customized code to create a table first.
//!
//!     // Load table from catalog.
//!     let table = catalog
//!         .load_table(&TableIdent::from_strs(["hello", "world"])?)
//!         .await?;
//!     let partition_key = PartitionKey::new(
//!         table.metadata().default_partition_spec().as_ref().clone(),
//!         table.metadata().current_schema().clone(),
//!         Struct::from_iter(vec![Some(Literal::string("Seattle"))]),
//!     );
//!     let location_generator = DefaultLocationGenerator::new(table.metadata().clone()).unwrap();
//!     let file_name_generator = DefaultFileNameGenerator::new(
//!         "test".to_string(),
//!         None,
//!         iceberg::spec::DataFileFormat::Parquet,
//!     );
//!
//!     // Create a parquet file writer builder. The parameters can be obtained from the table.
//!     let parquet_writer_builder = ParquetWriterBuilder::new(
//!         WriterProperties::default(),
//!         table.metadata().current_schema().clone(),
//!         Some(partition_key),
//!         table.file_io().clone(),
//!         location_generator.clone(),
//!         file_name_generator.clone(),
//!     );
//!     // Create a data file writer builder using the parquet file writer builder.
//!     let data_file_writer_builder = DataFileWriterBuilder::new(parquet_writer_builder, None, 0);
//!     // Create a latency record writer builder using the data file writer builder.
//!     let latency_record_builder = LatencyRecordWriterBuilder::new(data_file_writer_builder);
//!     // Build the final writer.
//!     let mut latency_record_data_file_writer = latency_record_builder.build().await.unwrap();
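//!
//!     // For illustration only: write a small `RecordBatch` (the column layout is
//!     // assumed to match the table schema) through the latency-recording writer,
//!     // then close it to collect the produced data files.
//!     use std::sync::Arc;
//!     use arrow_array::{ArrayRef, Int32Array};
//!     let col = Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef;
//!     let batch = RecordBatch::try_from_iter(vec![("id", col)]).unwrap();
//!     latency_record_data_file_writer.write(batch).await?;
//!     let _data_files = latency_record_data_file_writer.close().await?;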
//!
//!     Ok(())
//! }
//! ```

pub mod base_writer;
pub mod combined_writer;
pub mod file_writer;

use arrow_array::RecordBatch;

use crate::Result;
use crate::spec::{DataFile, SchemaRef};

type DefaultInput = RecordBatch;
type DefaultOutput = Vec<DataFile>;

/// The builder for an Iceberg writer.
#[async_trait::async_trait]
pub trait IcebergWriterBuilder<I = DefaultInput, O = DefaultOutput>:
    Send + Clone + 'static
{
    /// The associated writer type.
    type R: IcebergWriter<I, O>;
    /// Build the Iceberg writer.
    async fn build(self) -> Result<Self::R>;
}

/// The Iceberg writer used to write data to an Iceberg table.
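///
/// # Example
///
/// A minimal sketch of driving any `IcebergWriter` through this trait. The
/// helper function and its batches are illustrative; the batches are assumed
/// to already match the table schema.
///
/// ```rust, no_run
/// use arrow_array::RecordBatch;
/// use iceberg::Result;
/// use iceberg::spec::DataFile;
/// use iceberg::writer::IcebergWriter;
///
/// // Write every batch, then close the writer and return the produced data files.
/// async fn write_all(
///     mut writer: impl IcebergWriter,
///     batches: Vec<RecordBatch>,
/// ) -> Result<Vec<DataFile>> {
///     for batch in batches {
///         writer.write(batch).await?;
///     }
///     writer.close().await
/// }
/// ```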
#[async_trait::async_trait]
pub trait IcebergWriter<I = DefaultInput, O = DefaultOutput>: Send + 'static {
    /// Write data to the Iceberg table.
    async fn write(&mut self, input: I) -> Result<()>;
    /// Close the writer and return the written data files.
    /// If closing fails, the previously written data may be lost; the user may need to recreate the writer and rewrite the data.
    /// # NOTE
    /// After close is called, regardless of success or failure, the writer must not be used again; otherwise it will panic.
    async fn close(&mut self) -> Result<O>;
}

/// The current file status of the Iceberg writer.
/// This is implemented for writers that write a single file at a time.
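///
/// # Example
///
/// A minimal sketch of a rollover check. The helper function, the threshold,
/// and the rollover policy are illustrative, not part of this crate:
///
/// ```rust, no_run
/// use iceberg::writer::CurrentFileStatus;
///
/// // Decide whether the caller should close the current file and start a new one
/// // once the written size reaches `target_size`.
/// fn should_roll_over(status: &impl CurrentFileStatus, target_size: usize) -> bool {
///     status.current_written_size() >= target_size
/// }
/// ```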
pub trait CurrentFileStatus {
    /// Get the current file path.
    fn current_file_path(&self) -> String;
    /// Get the number of rows written to the current file.
    fn current_row_num(&self) -> usize;
    /// Get the written size of the current file.
    fn current_written_size(&self) -> usize;
    /// Get the current schema used by the writer.
    fn current_schema(&self) -> SchemaRef;
}

#[cfg(test)]
mod tests {
    use arrow_array::RecordBatch;
    use arrow_schema::Schema;
    use arrow_select::concat::concat_batches;
    use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;

    use super::IcebergWriter;
    use crate::io::FileIO;
    use crate::spec::{DataFile, DataFileFormat};

    // This function is used to guarantee that the trait can be used as an object-safe trait.
    async fn _guarantee_object_safe(mut w: Box<dyn IcebergWriter>) {
        let _ = w
            .write(RecordBatch::new_empty(Schema::empty().into()))
            .await;
        let _ = w.close().await;
    }

    // This function checks:
    // 1. The data of the written parquet file is correct.
    // 2. The metadata of the data file is consistent with the written parquet file.
    pub(crate) async fn check_parquet_data_file(
        file_io: &FileIO,
        data_file: &DataFile,
        batch: &RecordBatch,
    ) {
        assert_eq!(data_file.file_format, DataFileFormat::Parquet);

        let input_file = file_io.new_input(data_file.file_path.clone()).unwrap();
        // Read the written file.
        let input_content = input_file.read().await.unwrap();
        let reader_builder =
            ParquetRecordBatchReaderBuilder::try_new(input_content.clone()).unwrap();

        // Check the data.
        let reader = reader_builder.build().unwrap();
        let batches = reader.map(|batch| batch.unwrap()).collect::<Vec<_>>();
        let res = concat_batches(&batch.schema(), &batches).unwrap();
        assert_eq!(*batch, res);
    }
315}