iceberg/spec/manifest/
writer.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
17
18use std::cmp::min;
19
20use apache_avro::{Writer as AvroWriter, to_value};
21use bytes::Bytes;
22use itertools::Itertools;
23use serde_json::to_vec;
24
25use super::{
26    Datum, FormatVersion, ManifestContentType, PartitionSpec, PrimitiveType,
27    UNASSIGNED_SEQUENCE_NUMBER,
28};
29use crate::error::Result;
30use crate::io::OutputFile;
31use crate::spec::manifest::_serde::{ManifestEntryV1, ManifestEntryV2};
32use crate::spec::manifest::{manifest_schema_v1, manifest_schema_v2};
33use crate::spec::{
34    DataContentType, DataFile, FieldSummary, ManifestEntry, ManifestFile, ManifestMetadata,
35    ManifestStatus, PrimitiveLiteral, SchemaRef, StructType, UNASSIGNED_SNAPSHOT_ID,
36};
37use crate::{Error, ErrorKind};
38
39/// The builder used to create a [`ManifestWriter`].
40pub struct ManifestWriterBuilder {
41    output: OutputFile,
42    snapshot_id: Option<i64>,
43    key_metadata: Option<Vec<u8>>,
44    schema: SchemaRef,
45    partition_spec: PartitionSpec,
46}
47
48impl ManifestWriterBuilder {
49    /// Create a new builder.
50    pub fn new(
51        output: OutputFile,
52        snapshot_id: Option<i64>,
53        key_metadata: Option<Vec<u8>>,
54        schema: SchemaRef,
55        partition_spec: PartitionSpec,
56    ) -> Self {
57        Self {
58            output,
59            snapshot_id,
60            key_metadata,
61            schema,
62            partition_spec,
63        }
64    }
65
66    /// Build a [`ManifestWriter`] for format version 1.
67    pub fn build_v1(self) -> ManifestWriter {
68        let metadata = ManifestMetadata::builder()
69            .schema_id(self.schema.schema_id())
70            .schema(self.schema)
71            .partition_spec(self.partition_spec)
72            .format_version(FormatVersion::V1)
73            .content(ManifestContentType::Data)
74            .build();
75        ManifestWriter::new(self.output, self.snapshot_id, self.key_metadata, metadata)
76    }
77
78    /// Build a [`ManifestWriter`] for format version 2, data content.
79    pub fn build_v2_data(self) -> ManifestWriter {
80        let metadata = ManifestMetadata::builder()
81            .schema_id(self.schema.schema_id())
82            .schema(self.schema)
83            .partition_spec(self.partition_spec)
84            .format_version(FormatVersion::V2)
85            .content(ManifestContentType::Data)
86            .build();
87        ManifestWriter::new(self.output, self.snapshot_id, self.key_metadata, metadata)
88    }
89
90    /// Build a [`ManifestWriter`] for format version 2, deletes content.
91    pub fn build_v2_deletes(self) -> ManifestWriter {
92        let metadata = ManifestMetadata::builder()
93            .schema_id(self.schema.schema_id())
94            .schema(self.schema)
95            .partition_spec(self.partition_spec)
96            .format_version(FormatVersion::V2)
97            .content(ManifestContentType::Deletes)
98            .build();
99        ManifestWriter::new(self.output, self.snapshot_id, self.key_metadata, metadata)
100    }
101}
102
103/// A manifest writer.
104pub struct ManifestWriter {
105    output: OutputFile,
106
107    snapshot_id: Option<i64>,
108
109    added_files: u32,
110    added_rows: u64,
111    existing_files: u32,
112    existing_rows: u64,
113    deleted_files: u32,
114    deleted_rows: u64,
115
116    min_seq_num: Option<i64>,
117
118    key_metadata: Option<Vec<u8>>,
119
120    manifest_entries: Vec<ManifestEntry>,
121
122    metadata: ManifestMetadata,
123}
124
125impl ManifestWriter {
126    /// Create a new manifest writer.
127    pub(crate) fn new(
128        output: OutputFile,
129        snapshot_id: Option<i64>,
130        key_metadata: Option<Vec<u8>>,
131        metadata: ManifestMetadata,
132    ) -> Self {
133        Self {
134            output,
135            snapshot_id,
136            added_files: 0,
137            added_rows: 0,
138            existing_files: 0,
139            existing_rows: 0,
140            deleted_files: 0,
141            deleted_rows: 0,
142            min_seq_num: None,
143            key_metadata,
144            manifest_entries: Vec::new(),
145            metadata,
146        }
147    }
148
149    fn construct_partition_summaries(
150        &mut self,
151        partition_type: &StructType,
152    ) -> Result<Vec<FieldSummary>> {
153        let mut field_stats: Vec<_> = partition_type
154            .fields()
155            .iter()
156            .map(|f| PartitionFieldStats::new(f.field_type.as_primitive_type().unwrap().clone()))
157            .collect();
158        for partition in self.manifest_entries.iter().map(|e| &e.data_file.partition) {
159            for (literal, stat) in partition.iter().zip_eq(field_stats.iter_mut()) {
160                let primitive_literal = literal.map(|v| v.as_primitive_literal().unwrap());
161                stat.update(primitive_literal)?;
162            }
163        }
164        Ok(field_stats.into_iter().map(|stat| stat.finish()).collect())
165    }
166
167    fn check_data_file(&self, data_file: &DataFile) -> Result<()> {
168        match self.metadata.content {
169            ManifestContentType::Data => {
170                if data_file.content != DataContentType::Data {
171                    return Err(Error::new(
172                        ErrorKind::DataInvalid,
173                        format!(
174                            "Date file at path {} with manifest content type `data`, should have DataContentType `Data`, but has `{:?}`",
175                            data_file.file_path(),
176                            data_file.content
177                        ),
178                    ));
179                }
180            }
181            ManifestContentType::Deletes => {
182                if data_file.content != DataContentType::EqualityDeletes
183                    && data_file.content != DataContentType::PositionDeletes
184                {
185                    return Err(Error::new(
186                        ErrorKind::DataInvalid,
187                        format!(
188                            "Date file at path {} with manifest content type `deletes`, should have DataContentType `Data`, but has `{:?}`",
189                            data_file.file_path(),
190                            data_file.content
191                        ),
192                    ));
193                }
194            }
195        }
196        Ok(())
197    }
198
199    /// Add a new manifest entry. This method will update following status of the entry:
200    /// - Update the entry status to `Added`
201    /// - Set the snapshot id to the current snapshot id
202    /// - Set the sequence number to `None` if it is invalid(smaller than 0)
203    /// - Set the file sequence number to `None`
204    pub(crate) fn add_entry(&mut self, mut entry: ManifestEntry) -> Result<()> {
205        self.check_data_file(&entry.data_file)?;
206        if entry.sequence_number().is_some_and(|n| n >= 0) {
207            entry.status = ManifestStatus::Added;
208            entry.snapshot_id = self.snapshot_id;
209            entry.file_sequence_number = None;
210        } else {
211            entry.status = ManifestStatus::Added;
212            entry.snapshot_id = self.snapshot_id;
213            entry.sequence_number = None;
214            entry.file_sequence_number = None;
215        };
216        self.add_entry_inner(entry)?;
217        Ok(())
218    }
219
220    /// Add file as an added entry with a specific sequence number. The entry's snapshot ID will be this manifest's snapshot ID. The entry's data sequence
221    /// number will be the provided data sequence number. The entry's file sequence number will be
222    /// assigned at commit.
223    pub fn add_file(&mut self, data_file: DataFile, sequence_number: i64) -> Result<()> {
224        self.check_data_file(&data_file)?;
225        let entry = ManifestEntry {
226            status: ManifestStatus::Added,
227            snapshot_id: self.snapshot_id,
228            sequence_number: (sequence_number >= 0).then_some(sequence_number),
229            file_sequence_number: None,
230            data_file,
231        };
232        self.add_entry_inner(entry)?;
233        Ok(())
234    }
235
236    /// Add a delete manifest entry. This method will update following status of the entry:
237    /// - Update the entry status to `Deleted`
238    /// - Set the snapshot id to the current snapshot id
239    ///
240    /// # TODO
241    /// Remove this allow later
242    #[allow(dead_code)]
243    pub(crate) fn add_delete_entry(&mut self, mut entry: ManifestEntry) -> Result<()> {
244        self.check_data_file(&entry.data_file)?;
245        entry.status = ManifestStatus::Deleted;
246        entry.snapshot_id = self.snapshot_id;
247        self.add_entry_inner(entry)?;
248        Ok(())
249    }
250
251    /// Add a file as delete manifest entry. The entry's snapshot ID will be this manifest's snapshot ID.
252    /// However, the original data and file sequence numbers of the file must be preserved when
253    /// the file is marked as deleted.
254    pub fn add_delete_file(
255        &mut self,
256        data_file: DataFile,
257        sequence_number: i64,
258        file_sequence_number: Option<i64>,
259    ) -> Result<()> {
260        self.check_data_file(&data_file)?;
261        let entry = ManifestEntry {
262            status: ManifestStatus::Deleted,
263            snapshot_id: self.snapshot_id,
264            sequence_number: Some(sequence_number),
265            file_sequence_number,
266            data_file,
267        };
268        self.add_entry_inner(entry)?;
269        Ok(())
270    }
271
272    /// Add an existing manifest entry. This method will update following status of the entry:
273    /// - Update the entry status to `Existing`
274    ///
275    /// # TODO
276    /// Remove this allow later
277    #[allow(dead_code)]
278    pub(crate) fn add_existing_entry(&mut self, mut entry: ManifestEntry) -> Result<()> {
279        self.check_data_file(&entry.data_file)?;
280        entry.status = ManifestStatus::Existing;
281        self.add_entry_inner(entry)?;
282        Ok(())
283    }
284
285    /// Add an file as existing manifest entry. The original data and file sequence numbers, snapshot ID,
286    /// which were assigned at commit, must be preserved when adding an existing entry.
287    pub fn add_existing_file(
288        &mut self,
289        data_file: DataFile,
290        snapshot_id: i64,
291        sequence_number: i64,
292        file_sequence_number: Option<i64>,
293    ) -> Result<()> {
294        self.check_data_file(&data_file)?;
295        let entry = ManifestEntry {
296            status: ManifestStatus::Existing,
297            snapshot_id: Some(snapshot_id),
298            sequence_number: Some(sequence_number),
299            file_sequence_number,
300            data_file,
301        };
302        self.add_entry_inner(entry)?;
303        Ok(())
304    }
305
306    fn add_entry_inner(&mut self, entry: ManifestEntry) -> Result<()> {
307        // Check if the entry has sequence number
308        if (entry.status == ManifestStatus::Deleted || entry.status == ManifestStatus::Existing)
309            && (entry.sequence_number.is_none() || entry.file_sequence_number.is_none())
310        {
311            return Err(Error::new(
312                ErrorKind::DataInvalid,
313                "Manifest entry with status Existing or Deleted should have sequence number",
314            ));
315        }
316
317        // Update the statistics
318        match entry.status {
319            ManifestStatus::Added => {
320                self.added_files += 1;
321                self.added_rows += entry.data_file.record_count;
322            }
323            ManifestStatus::Deleted => {
324                self.deleted_files += 1;
325                self.deleted_rows += entry.data_file.record_count;
326            }
327            ManifestStatus::Existing => {
328                self.existing_files += 1;
329                self.existing_rows += entry.data_file.record_count;
330            }
331        }
332        if entry.is_alive() {
333            if let Some(seq_num) = entry.sequence_number {
334                self.min_seq_num = Some(self.min_seq_num.map_or(seq_num, |v| min(v, seq_num)));
335            }
336        }
337        self.manifest_entries.push(entry);
338        Ok(())
339    }
340
341    /// Write manifest file and return it.
342    pub async fn write_manifest_file(mut self) -> Result<ManifestFile> {
343        // Create the avro writer
344        let partition_type = self
345            .metadata
346            .partition_spec
347            .partition_type(&self.metadata.schema)?;
348        let table_schema = &self.metadata.schema;
349        let avro_schema = match self.metadata.format_version {
350            FormatVersion::V1 => manifest_schema_v1(&partition_type)?,
351            FormatVersion::V2 => manifest_schema_v2(&partition_type)?,
352        };
353        let mut avro_writer = AvroWriter::new(&avro_schema, Vec::new());
354        avro_writer.add_user_metadata(
355            "schema".to_string(),
356            to_vec(table_schema).map_err(|err| {
357                Error::new(ErrorKind::DataInvalid, "Fail to serialize table schema")
358                    .with_source(err)
359            })?,
360        )?;
361        avro_writer.add_user_metadata(
362            "schema-id".to_string(),
363            table_schema.schema_id().to_string(),
364        )?;
365        avro_writer.add_user_metadata(
366            "partition-spec".to_string(),
367            to_vec(&self.metadata.partition_spec.fields()).map_err(|err| {
368                Error::new(ErrorKind::DataInvalid, "Fail to serialize partition spec")
369                    .with_source(err)
370            })?,
371        )?;
372        avro_writer.add_user_metadata(
373            "partition-spec-id".to_string(),
374            self.metadata.partition_spec.spec_id().to_string(),
375        )?;
376        avro_writer.add_user_metadata(
377            "format-version".to_string(),
378            (self.metadata.format_version as u8).to_string(),
379        )?;
380        if self.metadata.format_version == FormatVersion::V2 {
381            avro_writer
382                .add_user_metadata("content".to_string(), self.metadata.content.to_string())?;
383        }
384
385        let partition_summary = self.construct_partition_summaries(&partition_type)?;
386        // Write manifest entries
387        for entry in std::mem::take(&mut self.manifest_entries) {
388            let value = match self.metadata.format_version {
389                FormatVersion::V1 => to_value(ManifestEntryV1::try_from(entry, &partition_type)?)?
390                    .resolve(&avro_schema)?,
391                FormatVersion::V2 => to_value(ManifestEntryV2::try_from(entry, &partition_type)?)?
392                    .resolve(&avro_schema)?,
393            };
394
395            avro_writer.append(value)?;
396        }
397
398        let content = avro_writer.into_inner()?;
399        let length = content.len();
400        self.output.write(Bytes::from(content)).await?;
401
402        Ok(ManifestFile {
403            manifest_path: self.output.location().to_string(),
404            manifest_length: length as i64,
405            partition_spec_id: self.metadata.partition_spec.spec_id(),
406            content: self.metadata.content,
407            // sequence_number and min_sequence_number with UNASSIGNED_SEQUENCE_NUMBER will be replace with
408            // real sequence number in `ManifestListWriter`.
409            sequence_number: UNASSIGNED_SEQUENCE_NUMBER,
410            min_sequence_number: self.min_seq_num.unwrap_or(UNASSIGNED_SEQUENCE_NUMBER),
411            added_snapshot_id: self.snapshot_id.unwrap_or(UNASSIGNED_SNAPSHOT_ID),
412            added_files_count: Some(self.added_files),
413            existing_files_count: Some(self.existing_files),
414            deleted_files_count: Some(self.deleted_files),
415            added_rows_count: Some(self.added_rows),
416            existing_rows_count: Some(self.existing_rows),
417            deleted_rows_count: Some(self.deleted_rows),
418            partitions: Some(partition_summary),
419            key_metadata: self.key_metadata,
420        })
421    }
422}
423
424struct PartitionFieldStats {
425    partition_type: PrimitiveType,
426
427    contains_null: bool,
428    contains_nan: Option<bool>,
429    lower_bound: Option<Datum>,
430    upper_bound: Option<Datum>,
431}
432
433impl PartitionFieldStats {
434    pub(crate) fn new(partition_type: PrimitiveType) -> Self {
435        Self {
436            partition_type,
437            contains_null: false,
438            contains_nan: Some(false),
439            upper_bound: None,
440            lower_bound: None,
441        }
442    }
443
444    pub(crate) fn update(&mut self, value: Option<PrimitiveLiteral>) -> Result<()> {
445        let Some(value) = value else {
446            self.contains_null = true;
447            return Ok(());
448        };
449        if !self.partition_type.compatible(&value) {
450            return Err(Error::new(
451                ErrorKind::DataInvalid,
452                "value is not compatible with type",
453            ));
454        }
455        let value = Datum::new(self.partition_type.clone(), value);
456
457        if value.is_nan() {
458            self.contains_nan = Some(true);
459            return Ok(());
460        }
461
462        self.lower_bound = Some(self.lower_bound.take().map_or(value.clone(), |original| {
463            if value < original {
464                value.clone()
465            } else {
466                original
467            }
468        }));
469        self.upper_bound = Some(self.upper_bound.take().map_or(value.clone(), |original| {
470            if value > original { value } else { original }
471        }));
472
473        Ok(())
474    }
475
476    pub(crate) fn finish(self) -> FieldSummary {
477        FieldSummary {
478            contains_null: self.contains_null,
479            contains_nan: self.contains_nan,
480            upper_bound: self.upper_bound.map(|v| v.to_bytes().unwrap()),
481            lower_bound: self.lower_bound.map(|v| v.to_bytes().unwrap()),
482        }
483    }
484}
485
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::fs;
    use std::sync::Arc;

    use tempfile::TempDir;

    use super::*;
    use crate::io::FileIOBuilder;
    use crate::spec::{DataFileFormat, Manifest, NestedField, PrimitiveType, Schema, Struct, Type};

    /// Two-column (`id: int`, `name: string`) schema shared by the tests.
    fn test_schema() -> SchemaRef {
        Arc::new(
            Schema::builder()
                .with_fields(vec![
                    Arc::new(NestedField::optional(
                        1,
                        "id",
                        Type::Primitive(PrimitiveType::Int),
                    )),
                    Arc::new(NestedField::optional(
                        2,
                        "name",
                        Type::Primitive(PrimitiveType::String),
                    )),
                ])
                .build()
                .unwrap(),
        )
    }

    /// An unpartitioned, single-record Parquet data file used as the payload of every entry.
    fn test_data_file() -> DataFile {
        DataFile {
            content: DataContentType::Data,
            file_path: "s3a://icebergdata/demo/s1/t1/data/00000-0-ba56fbfa-f2ff-40c9-bb27-565ad6dc2be8-00000.parquet".to_string(),
            file_format: DataFileFormat::Parquet,
            partition: Struct::empty(),
            record_count: 1,
            file_size_in_bytes: 5442,
            column_sizes: HashMap::from([(1, 61), (2, 73)]),
            value_counts: HashMap::from([(1, 1), (2, 1)]),
            null_value_counts: HashMap::from([(1, 0), (2, 0)]),
            nan_value_counts: HashMap::new(),
            lower_bounds: HashMap::new(),
            upper_bounds: HashMap::new(),
            key_metadata: Some(Vec::new()),
            split_offsets: vec![4],
            equality_ids: None,
            sort_order_id: None,
            partition_spec_id: 0,
            first_row_id: None,
            referenced_data_file: None,
            content_offset: None,
            content_size_in_bytes: None,
        }
    }

    /// A manifest entry with data and file sequence number 1 wrapping [`test_data_file`].
    fn test_entry(status: ManifestStatus, snapshot_id: Option<i64>) -> ManifestEntry {
        ManifestEntry {
            status,
            snapshot_id,
            sequence_number: Some(1),
            file_sequence_number: Some(1),
            data_file: test_data_file(),
        }
    }

    #[tokio::test]
    async fn test_add_delete_existing() {
        let schema = test_schema();
        let metadata = ManifestMetadata {
            schema_id: 0,
            schema: schema.clone(),
            partition_spec: PartitionSpec::builder(schema)
                .with_spec_id(0)
                .build()
                .unwrap(),
            content: ManifestContentType::Data,
            format_version: FormatVersion::V2,
        };
        let mut entries = vec![
            test_entry(ManifestStatus::Added, None),
            test_entry(ManifestStatus::Deleted, Some(1)),
            test_entry(ManifestStatus::Existing, Some(1)),
        ];

        // write manifest to file
        let tmp_dir = TempDir::new().unwrap();
        let path = tmp_dir.path().join("test_manifest.avro");
        let io = FileIOBuilder::new_fs_io().build().unwrap();
        let output_file = io.new_output(path.to_str().unwrap()).unwrap();
        let mut writer = ManifestWriterBuilder::new(
            output_file,
            Some(3),
            None,
            metadata.schema.clone(),
            metadata.partition_spec.clone(),
        )
        .build_v2_data();
        writer.add_entry(entries[0].clone()).unwrap();
        writer.add_delete_entry(entries[1].clone()).unwrap();
        writer.add_existing_entry(entries[2].clone()).unwrap();
        let manifest_file = writer.write_manifest_file().await.unwrap();

        // The returned ManifestFile must describe what was actually written.
        let bytes = fs::read(&path).expect("read_file must succeed");
        assert_eq!(manifest_file.manifest_length, bytes.len() as i64);
        assert_eq!(manifest_file.partition_spec_id, 0);
        assert_eq!(manifest_file.content, ManifestContentType::Data);
        assert_eq!(manifest_file.added_snapshot_id, 3);
        assert_eq!(manifest_file.sequence_number, UNASSIGNED_SEQUENCE_NUMBER);
        // Every entry carries data sequence number 1.
        assert_eq!(manifest_file.min_sequence_number, 1);
        assert_eq!(manifest_file.added_files_count, Some(1));
        assert_eq!(manifest_file.existing_files_count, Some(1));
        assert_eq!(manifest_file.deleted_files_count, Some(1));
        assert_eq!(manifest_file.added_rows_count, Some(1));
        assert_eq!(manifest_file.existing_rows_count, Some(1));
        assert_eq!(manifest_file.deleted_rows_count, Some(1));
        // Unpartitioned spec: a summary list is present but empty.
        assert_eq!(manifest_file.partitions.map(|p| p.len()), Some(0));

        // read back the manifest file and check the content
        let actual_manifest = Manifest::parse_avro(bytes.as_slice()).unwrap();

        // Added and deleted entries get this manifest's snapshot id; existing entries keep
        // their original one.
        entries[0].snapshot_id = Some(3);
        entries[1].snapshot_id = Some(3);
        // Only the added entry has its file sequence number reset to None; deleted and
        // existing entries keep theirs.
        entries[0].file_sequence_number = None;
        assert_eq!(actual_manifest, Manifest::new(metadata, entries));
    }

    #[tokio::test]
    async fn test_deletes_manifest_rejects_data_file() {
        let schema = test_schema();
        let partition_spec = PartitionSpec::builder(schema.clone())
            .with_spec_id(0)
            .build()
            .unwrap();

        let tmp_dir = TempDir::new().unwrap();
        let path = tmp_dir.path().join("test_delete_manifest.avro");
        let io = FileIOBuilder::new_fs_io().build().unwrap();
        let output_file = io.new_output(path.to_str().unwrap()).unwrap();
        let mut writer =
            ManifestWriterBuilder::new(output_file, Some(3), None, schema, partition_spec)
                .build_v2_deletes();

        // A plain data file must not be accepted into a deletes manifest.
        let err = writer.add_file(test_data_file(), 1).unwrap_err();
        assert!(matches!(err.kind(), ErrorKind::DataInvalid));
    }
}