iceberg/spec/
manifest_list.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! ManifestList for Iceberg.
19
20use std::collections::HashMap;
21use std::str::FromStr;
22
23use apache_avro::types::Value;
24use apache_avro::{Reader, Writer, from_value};
25use bytes::Bytes;
26pub use serde_bytes::ByteBuf;
27use serde_derive::{Deserialize, Serialize};
28
29use self::_const_schema::{MANIFEST_LIST_AVRO_SCHEMA_V1, MANIFEST_LIST_AVRO_SCHEMA_V2};
30use self::_serde::{ManifestFileV1, ManifestFileV2};
31use super::{FormatVersion, Manifest};
32use crate::error::Result;
33use crate::io::{FileIO, OutputFile};
34use crate::{Error, ErrorKind};
35
/// Placeholder for a sequence number that has not been assigned yet.
///
/// A field holding this value must be replaced with the actual sequence number before it is
/// written.
pub const UNASSIGNED_SEQUENCE_NUMBER: i64 = -1;
38
/// Snapshots are embedded in table metadata, but the list of manifests for a
/// snapshot is stored in a separate manifest list file.
///
/// A new manifest list is written for each attempt to commit a snapshot
/// because the list of manifests always changes to produce a new snapshot.
/// When a manifest list is written, the (optimistic) sequence number of the
/// snapshot is written for all new manifest files tracked by the list.
///
/// A manifest list includes summary metadata that can be used to avoid
/// scanning all of the manifests in a snapshot when planning a table scan.
/// This includes the number of added, existing, and deleted files, and a
/// summary of values for each field of the partition spec used to write the
/// manifest.
#[derive(Debug, Clone, PartialEq)]
pub struct ManifestList {
    /// Entries in a manifest list, one [`ManifestFile`] per tracked manifest.
    entries: Vec<ManifestFile>,
}
57
58impl ManifestList {
59    /// Parse manifest list from bytes.
60    pub fn parse_with_version(bs: &[u8], version: FormatVersion) -> Result<ManifestList> {
61        match version {
62            FormatVersion::V1 => {
63                let reader = Reader::with_schema(&MANIFEST_LIST_AVRO_SCHEMA_V1, bs)?;
64                let values = Value::Array(reader.collect::<std::result::Result<Vec<Value>, _>>()?);
65                from_value::<_serde::ManifestListV1>(&values)?.try_into()
66            }
67            FormatVersion::V2 => {
68                let reader = Reader::new(bs)?;
69                let values = Value::Array(reader.collect::<std::result::Result<Vec<Value>, _>>()?);
70                from_value::<_serde::ManifestListV2>(&values)?.try_into()
71            }
72        }
73    }
74
75    /// Get the entries in the manifest list.
76    pub fn entries(&self) -> &[ManifestFile] {
77        &self.entries
78    }
79
80    /// Take ownership of the entries in the manifest list, consuming it
81    pub fn consume_entries(self) -> impl IntoIterator<Item = ManifestFile> {
82        Box::new(self.entries.into_iter())
83    }
84}
85
/// A manifest list writer.
///
/// Records are buffered in memory by the Avro writer and only written to the
/// output file when [`ManifestListWriter::close`] is called.
pub struct ManifestListWriter {
    /// Format version of the manifest list; selects the Avro schema and the entry layout.
    format_version: FormatVersion,
    /// Destination the encoded manifest list is written to on close.
    output_file: OutputFile,
    /// Avro writer that accumulates the encoded records in an in-memory buffer.
    avro_writer: Writer<'static, Vec<u8>>,
    /// Sequence number substituted for [`UNASSIGNED_SEQUENCE_NUMBER`] placeholders (v2 only).
    sequence_number: i64,
    /// ID of the snapshot this manifest list is written for.
    snapshot_id: i64,
}
94
95impl std::fmt::Debug for ManifestListWriter {
96    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
97        f.debug_struct("ManifestListWriter")
98            .field("format_version", &self.format_version)
99            .field("output_file", &self.output_file)
100            .field("avro_writer", &self.avro_writer.schema())
101            .finish_non_exhaustive()
102    }
103}
104
105impl ManifestListWriter {
106    /// Construct a v1 [`ManifestListWriter`] that writes to a provided [`OutputFile`].
107    pub fn v1(output_file: OutputFile, snapshot_id: i64, parent_snapshot_id: Option<i64>) -> Self {
108        let mut metadata = HashMap::from_iter([
109            ("snapshot-id".to_string(), snapshot_id.to_string()),
110            ("format-version".to_string(), "1".to_string()),
111        ]);
112        if let Some(parent_snapshot_id) = parent_snapshot_id {
113            metadata.insert(
114                "parent-snapshot-id".to_string(),
115                parent_snapshot_id.to_string(),
116            );
117        }
118        Self::new(FormatVersion::V1, output_file, metadata, 0, snapshot_id)
119    }
120
121    /// Construct a v2 [`ManifestListWriter`] that writes to a provided [`OutputFile`].
122    pub fn v2(
123        output_file: OutputFile,
124        snapshot_id: i64,
125        parent_snapshot_id: Option<i64>,
126        sequence_number: i64,
127    ) -> Self {
128        let mut metadata = HashMap::from_iter([
129            ("snapshot-id".to_string(), snapshot_id.to_string()),
130            ("sequence-number".to_string(), sequence_number.to_string()),
131            ("format-version".to_string(), "2".to_string()),
132        ]);
133        metadata.insert(
134            "parent-snapshot-id".to_string(),
135            parent_snapshot_id
136                .map(|v| v.to_string())
137                .unwrap_or("null".to_string()),
138        );
139        Self::new(
140            FormatVersion::V2,
141            output_file,
142            metadata,
143            sequence_number,
144            snapshot_id,
145        )
146    }
147
148    fn new(
149        format_version: FormatVersion,
150        output_file: OutputFile,
151        metadata: HashMap<String, String>,
152        sequence_number: i64,
153        snapshot_id: i64,
154    ) -> Self {
155        let avro_schema = match format_version {
156            FormatVersion::V1 => &MANIFEST_LIST_AVRO_SCHEMA_V1,
157            FormatVersion::V2 => &MANIFEST_LIST_AVRO_SCHEMA_V2,
158        };
159        let mut avro_writer = Writer::new(avro_schema, Vec::new());
160        for (key, value) in metadata {
161            avro_writer
162                .add_user_metadata(key, value)
163                .expect("Avro metadata should be added to the writer before the first record.");
164        }
165        Self {
166            format_version,
167            output_file,
168            avro_writer,
169            sequence_number,
170            snapshot_id,
171        }
172    }
173
174    /// Append manifests to be written.
175    pub fn add_manifests(&mut self, manifests: impl Iterator<Item = ManifestFile>) -> Result<()> {
176        match self.format_version {
177            FormatVersion::V1 => {
178                for manifest in manifests {
179                    let manifes: ManifestFileV1 = manifest.try_into()?;
180                    self.avro_writer.append_ser(manifes)?;
181                }
182            }
183            FormatVersion::V2 => {
184                for mut manifest in manifests {
185                    if manifest.sequence_number == UNASSIGNED_SEQUENCE_NUMBER {
186                        if manifest.added_snapshot_id != self.snapshot_id {
187                            return Err(Error::new(
188                                ErrorKind::DataInvalid,
189                                format!(
190                                    "Found unassigned sequence number for a manifest from snapshot {}.",
191                                    manifest.added_snapshot_id
192                                ),
193                            ));
194                        }
195                        manifest.sequence_number = self.sequence_number;
196                    }
197                    if manifest.min_sequence_number == UNASSIGNED_SEQUENCE_NUMBER {
198                        if manifest.added_snapshot_id != self.snapshot_id {
199                            return Err(Error::new(
200                                ErrorKind::DataInvalid,
201                                format!(
202                                    "Found unassigned sequence number for a manifest from snapshot {}.",
203                                    manifest.added_snapshot_id
204                                ),
205                            ));
206                        }
207                        manifest.min_sequence_number = self.sequence_number;
208                    }
209                    let manifest_entry: ManifestFileV2 = manifest.try_into()?;
210                    self.avro_writer.append_ser(manifest_entry)?;
211                }
212            }
213        }
214        Ok(())
215    }
216
217    /// Write the manifest list to the output file.
218    pub async fn close(self) -> Result<()> {
219        let data = self.avro_writer.into_inner()?;
220        let mut writer = self.output_file.writer().await?;
221        writer.write(Bytes::from(data)).await?;
222        writer.close().await?;
223        Ok(())
224    }
225}
226
227/// This is a helper module that defines the schema field of the manifest list entry.
228mod _const_schema {
229    use std::sync::Arc;
230
231    use apache_avro::Schema as AvroSchema;
232    use once_cell::sync::Lazy;
233
234    use crate::avro::schema_to_avro_schema;
235    use crate::spec::{
236        ListType, NestedField, NestedFieldRef, PrimitiveType, Schema, StructType, Type,
237    };
238
239    static MANIFEST_PATH: Lazy<NestedFieldRef> = {
240        Lazy::new(|| {
241            Arc::new(NestedField::required(
242                500,
243                "manifest_path",
244                Type::Primitive(PrimitiveType::String),
245            ))
246        })
247    };
248    static MANIFEST_LENGTH: Lazy<NestedFieldRef> = {
249        Lazy::new(|| {
250            Arc::new(NestedField::required(
251                501,
252                "manifest_length",
253                Type::Primitive(PrimitiveType::Long),
254            ))
255        })
256    };
257    static PARTITION_SPEC_ID: Lazy<NestedFieldRef> = {
258        Lazy::new(|| {
259            Arc::new(NestedField::required(
260                502,
261                "partition_spec_id",
262                Type::Primitive(PrimitiveType::Int),
263            ))
264        })
265    };
266    static CONTENT: Lazy<NestedFieldRef> = {
267        Lazy::new(|| {
268            Arc::new(NestedField::required(
269                517,
270                "content",
271                Type::Primitive(PrimitiveType::Int),
272            ))
273        })
274    };
275    static SEQUENCE_NUMBER: Lazy<NestedFieldRef> = {
276        Lazy::new(|| {
277            Arc::new(NestedField::required(
278                515,
279                "sequence_number",
280                Type::Primitive(PrimitiveType::Long),
281            ))
282        })
283    };
284    static MIN_SEQUENCE_NUMBER: Lazy<NestedFieldRef> = {
285        Lazy::new(|| {
286            Arc::new(NestedField::required(
287                516,
288                "min_sequence_number",
289                Type::Primitive(PrimitiveType::Long),
290            ))
291        })
292    };
293    static ADDED_SNAPSHOT_ID: Lazy<NestedFieldRef> = {
294        Lazy::new(|| {
295            Arc::new(NestedField::required(
296                503,
297                "added_snapshot_id",
298                Type::Primitive(PrimitiveType::Long),
299            ))
300        })
301    };
302    static ADDED_FILES_COUNT_V2: Lazy<NestedFieldRef> = {
303        Lazy::new(|| {
304            Arc::new(NestedField::required(
305                504,
306                "added_files_count",
307                Type::Primitive(PrimitiveType::Int),
308            ))
309        })
310    };
311    static ADDED_FILES_COUNT_V1: Lazy<NestedFieldRef> = {
312        Lazy::new(|| {
313            Arc::new(NestedField::optional(
314                504,
315                "added_data_files_count",
316                Type::Primitive(PrimitiveType::Int),
317            ))
318        })
319    };
320    static EXISTING_FILES_COUNT_V2: Lazy<NestedFieldRef> = {
321        Lazy::new(|| {
322            Arc::new(NestedField::required(
323                505,
324                "existing_files_count",
325                Type::Primitive(PrimitiveType::Int),
326            ))
327        })
328    };
329    static EXISTING_FILES_COUNT_V1: Lazy<NestedFieldRef> = {
330        Lazy::new(|| {
331            Arc::new(NestedField::optional(
332                505,
333                "existing_data_files_count",
334                Type::Primitive(PrimitiveType::Int),
335            ))
336        })
337    };
338    static DELETED_FILES_COUNT_V2: Lazy<NestedFieldRef> = {
339        Lazy::new(|| {
340            Arc::new(NestedField::required(
341                506,
342                "deleted_files_count",
343                Type::Primitive(PrimitiveType::Int),
344            ))
345        })
346    };
347    static DELETED_FILES_COUNT_V1: Lazy<NestedFieldRef> = {
348        Lazy::new(|| {
349            Arc::new(NestedField::optional(
350                506,
351                "deleted_data_files_count",
352                Type::Primitive(PrimitiveType::Int),
353            ))
354        })
355    };
356    static ADDED_ROWS_COUNT_V2: Lazy<NestedFieldRef> = {
357        Lazy::new(|| {
358            Arc::new(NestedField::required(
359                512,
360                "added_rows_count",
361                Type::Primitive(PrimitiveType::Long),
362            ))
363        })
364    };
365    static ADDED_ROWS_COUNT_V1: Lazy<NestedFieldRef> = {
366        Lazy::new(|| {
367            Arc::new(NestedField::optional(
368                512,
369                "added_rows_count",
370                Type::Primitive(PrimitiveType::Long),
371            ))
372        })
373    };
374    static EXISTING_ROWS_COUNT_V2: Lazy<NestedFieldRef> = {
375        Lazy::new(|| {
376            Arc::new(NestedField::required(
377                513,
378                "existing_rows_count",
379                Type::Primitive(PrimitiveType::Long),
380            ))
381        })
382    };
383    static EXISTING_ROWS_COUNT_V1: Lazy<NestedFieldRef> = {
384        Lazy::new(|| {
385            Arc::new(NestedField::optional(
386                513,
387                "existing_rows_count",
388                Type::Primitive(PrimitiveType::Long),
389            ))
390        })
391    };
392    static DELETED_ROWS_COUNT_V2: Lazy<NestedFieldRef> = {
393        Lazy::new(|| {
394            Arc::new(NestedField::required(
395                514,
396                "deleted_rows_count",
397                Type::Primitive(PrimitiveType::Long),
398            ))
399        })
400    };
401    static DELETED_ROWS_COUNT_V1: Lazy<NestedFieldRef> = {
402        Lazy::new(|| {
403            Arc::new(NestedField::optional(
404                514,
405                "deleted_rows_count",
406                Type::Primitive(PrimitiveType::Long),
407            ))
408        })
409    };
410    static PARTITIONS: Lazy<NestedFieldRef> = {
411        Lazy::new(|| {
412            // element type
413            let fields = vec![
414                Arc::new(NestedField::required(
415                    509,
416                    "contains_null",
417                    Type::Primitive(PrimitiveType::Boolean),
418                )),
419                Arc::new(NestedField::optional(
420                    518,
421                    "contains_nan",
422                    Type::Primitive(PrimitiveType::Boolean),
423                )),
424                Arc::new(NestedField::optional(
425                    510,
426                    "lower_bound",
427                    Type::Primitive(PrimitiveType::Binary),
428                )),
429                Arc::new(NestedField::optional(
430                    511,
431                    "upper_bound",
432                    Type::Primitive(PrimitiveType::Binary),
433                )),
434            ];
435            let element_field = Arc::new(NestedField::required(
436                508,
437                "r_508",
438                Type::Struct(StructType::new(fields)),
439            ));
440            Arc::new(NestedField::optional(
441                507,
442                "partitions",
443                Type::List(ListType { element_field }),
444            ))
445        })
446    };
447    static KEY_METADATA: Lazy<NestedFieldRef> = {
448        Lazy::new(|| {
449            Arc::new(NestedField::optional(
450                519,
451                "key_metadata",
452                Type::Primitive(PrimitiveType::Binary),
453            ))
454        })
455    };
456
457    static V1_SCHEMA: Lazy<Schema> = {
458        Lazy::new(|| {
459            let fields = vec![
460                MANIFEST_PATH.clone(),
461                MANIFEST_LENGTH.clone(),
462                PARTITION_SPEC_ID.clone(),
463                ADDED_SNAPSHOT_ID.clone(),
464                ADDED_FILES_COUNT_V1.clone().to_owned(),
465                EXISTING_FILES_COUNT_V1.clone(),
466                DELETED_FILES_COUNT_V1.clone(),
467                ADDED_ROWS_COUNT_V1.clone(),
468                EXISTING_ROWS_COUNT_V1.clone(),
469                DELETED_ROWS_COUNT_V1.clone(),
470                PARTITIONS.clone(),
471                KEY_METADATA.clone(),
472            ];
473            Schema::builder().with_fields(fields).build().unwrap()
474        })
475    };
476
477    static V2_SCHEMA: Lazy<Schema> = {
478        Lazy::new(|| {
479            let fields = vec![
480                MANIFEST_PATH.clone(),
481                MANIFEST_LENGTH.clone(),
482                PARTITION_SPEC_ID.clone(),
483                CONTENT.clone(),
484                SEQUENCE_NUMBER.clone(),
485                MIN_SEQUENCE_NUMBER.clone(),
486                ADDED_SNAPSHOT_ID.clone(),
487                ADDED_FILES_COUNT_V2.clone(),
488                EXISTING_FILES_COUNT_V2.clone(),
489                DELETED_FILES_COUNT_V2.clone(),
490                ADDED_ROWS_COUNT_V2.clone(),
491                EXISTING_ROWS_COUNT_V2.clone(),
492                DELETED_ROWS_COUNT_V2.clone(),
493                PARTITIONS.clone(),
494                KEY_METADATA.clone(),
495            ];
496            Schema::builder().with_fields(fields).build().unwrap()
497        })
498    };
499
500    pub(super) static MANIFEST_LIST_AVRO_SCHEMA_V1: Lazy<AvroSchema> =
501        Lazy::new(|| schema_to_avro_schema("manifest_file", &V1_SCHEMA).unwrap());
502
503    pub(super) static MANIFEST_LIST_AVRO_SCHEMA_V2: Lazy<AvroSchema> =
504        Lazy::new(|| schema_to_avro_schema("manifest_file", &V2_SCHEMA).unwrap());
505}
506
/// Entry in a manifest list.
///
/// Each entry describes one manifest file. The `field: N` notes below give the
/// field ID used for the corresponding column in the manifest list schema.
#[derive(Debug, PartialEq, Clone, Eq, Hash)]
pub struct ManifestFile {
    /// field: 500
    ///
    /// Location of the manifest file
    pub manifest_path: String,
    /// field: 501
    ///
    /// Length of the manifest file in bytes
    pub manifest_length: i64,
    /// field: 502
    ///
    /// ID of a partition spec used to write the manifest; must be listed
    /// in table metadata partition-specs
    pub partition_spec_id: i32,
    /// field: 517
    ///
    /// The type of files tracked by the manifest, either data or delete
    /// files; 0 for all v1 manifests
    pub content: ManifestContentType,
    /// field: 515
    ///
    /// The sequence number when the manifest was added to the table; use 0
    /// when reading v1 manifest lists
    pub sequence_number: i64,
    /// field: 516
    ///
    /// The minimum data sequence number of all live data or delete files in
    /// the manifest; use 0 when reading v1 manifest lists
    pub min_sequence_number: i64,
    /// field: 503
    ///
    /// ID of the snapshot where the manifest file was added
    pub added_snapshot_id: i64,
    /// field: 504
    ///
    /// Number of entries in the manifest that have status ADDED; when null
    /// this is assumed to be non-zero
    pub added_files_count: Option<u32>,
    /// field: 505
    ///
    /// Number of entries in the manifest that have status EXISTING (0);
    /// when null this is assumed to be non-zero
    pub existing_files_count: Option<u32>,
    /// field: 506
    ///
    /// Number of entries in the manifest that have status DELETED (2);
    /// when null this is assumed to be non-zero
    pub deleted_files_count: Option<u32>,
    /// field: 512
    ///
    /// Number of rows in all files in the manifest that have status
    /// ADDED; when null this is assumed to be non-zero
    pub added_rows_count: Option<u64>,
    /// field: 513
    ///
    /// Number of rows in all files in the manifest that have status
    /// EXISTING; when null this is assumed to be non-zero
    pub existing_rows_count: Option<u64>,
    /// field: 514
    ///
    /// Number of rows in all files in the manifest that have status
    /// DELETED; when null this is assumed to be non-zero
    pub deleted_rows_count: Option<u64>,
    /// field: 507
    /// element_field: 508
    ///
    /// A list of field summaries for each partition field in the spec. Each
    /// field in the list corresponds to a field in the manifest file's
    /// partition spec.
    pub partitions: Option<Vec<FieldSummary>>,
    /// field: 519
    ///
    /// Implementation-specific key metadata for encryption
    pub key_metadata: Option<Vec<u8>>,
}
584
585impl ManifestFile {
586    /// Checks if the manifest file has any added files.
587    pub fn has_added_files(&self) -> bool {
588        self.added_files_count.map(|c| c > 0).unwrap_or(true)
589    }
590
591    /// Checks whether this manifest contains entries with DELETED status.
592    pub fn has_deleted_files(&self) -> bool {
593        self.deleted_files_count.map(|c| c > 0).unwrap_or(true)
594    }
595
596    /// Checks if the manifest file has any existed files.
597    pub fn has_existing_files(&self) -> bool {
598        self.existing_files_count.map(|c| c > 0).unwrap_or(true)
599    }
600}
601
/// The type of files tracked by the manifest, either data or delete files; Data(0) for all v1 manifests
///
/// The discriminant of each variant is the integer stored in the `content` field of a
/// manifest list entry.
#[derive(Debug, PartialEq, Clone, Copy, Eq, Hash, Default)]
pub enum ManifestContentType {
    /// The manifest content is data. This is the default.
    #[default]
    Data = 0,
    /// The manifest content is deletes.
    Deletes = 1,
}
611
612impl FromStr for ManifestContentType {
613    type Err = Error;
614
615    fn from_str(s: &str) -> Result<Self> {
616        match s {
617            "data" => Ok(ManifestContentType::Data),
618            "deletes" => Ok(ManifestContentType::Deletes),
619            _ => Err(Error::new(
620                ErrorKind::DataInvalid,
621                format!("Invalid manifest content type: {s}"),
622            )),
623        }
624    }
625}
626
627impl std::fmt::Display for ManifestContentType {
628    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
629        match self {
630            ManifestContentType::Data => write!(f, "data"),
631            ManifestContentType::Deletes => write!(f, "deletes"),
632        }
633    }
634}
635
636impl TryFrom<i32> for ManifestContentType {
637    type Error = Error;
638
639    fn try_from(value: i32) -> std::result::Result<Self, Self::Error> {
640        match value {
641            0 => Ok(ManifestContentType::Data),
642            1 => Ok(ManifestContentType::Deletes),
643            _ => Err(Error::new(
644                crate::ErrorKind::DataInvalid,
645                format!(
646                    "Invalid manifest content type. Expected 0 or 1, got {}",
647                    value
648                ),
649            )),
650        }
651    }
652}
653
654impl ManifestFile {
655    /// Load [`Manifest`].
656    ///
657    /// This method will also initialize inherited values of [`ManifestEntry`], such as `sequence_number`.
658    pub async fn load_manifest(&self, file_io: &FileIO) -> Result<Manifest> {
659        let avro = file_io.new_input(&self.manifest_path)?.read().await?;
660
661        let (metadata, mut entries) = Manifest::try_from_avro_bytes(&avro)?;
662
663        // Let entries inherit values from the manifest list entry.
664        for entry in &mut entries {
665            entry.inherit_data(self);
666        }
667
668        Ok(Manifest::new(metadata, entries))
669    }
670}
671
/// Field summary for partition field in the spec.
///
/// Each field in the list corresponds to a field in the manifest file's partition spec.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, Default, Hash)]
pub struct FieldSummary {
    /// field: 509
    ///
    /// Whether the manifest contains at least one partition with a null
    /// value for the field
    pub contains_null: bool,
    /// field: 518
    ///
    /// Whether the manifest contains at least one partition with a NaN
    /// value for the field
    pub contains_nan: Option<bool>,
    /// field: 510
    ///
    /// The minimum value for the field in the manifest's
    /// partitions, as serialized bytes.
    pub lower_bound: Option<ByteBuf>,
    /// field: 511
    ///
    /// The maximum value for the field in the manifest's
    /// partitions, as serialized bytes.
    pub upper_bound: Option<ByteBuf>,
}
695
/// This is a helper module that defines types to help with serialization/deserialization.
/// For deserialization the input first gets read into either the [ManifestFileV1] or [ManifestFileV2] struct
/// and then converted into the [ManifestFile] struct. Serialization works the other way around.
/// [ManifestFileV1] and [ManifestFileV2] are internal structs that are only used for serialization and deserialization.
700pub(super) mod _serde {
701    pub use serde_bytes::ByteBuf;
702    use serde_derive::{Deserialize, Serialize};
703
704    use super::ManifestFile;
705    use crate::Error;
706    use crate::error::Result;
707    use crate::spec::FieldSummary;
708
    /// Serde representation of a v2 manifest list.
    ///
    /// Because of `#[serde(transparent)]` it is (de)serialized exactly like its
    /// inner vector of [ManifestFileV2] entries.
    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
    #[serde(transparent)]
    pub(crate) struct ManifestListV2 {
        /// Entries of the manifest list in their v2 serde form.
        entries: Vec<ManifestFileV2>,
    }
714
    /// Serde representation of a v1 manifest list.
    ///
    /// Because of `#[serde(transparent)]` it is (de)serialized exactly like its
    /// inner vector of [ManifestFileV1] entries.
    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
    #[serde(transparent)]
    pub(crate) struct ManifestListV1 {
        /// Entries of the manifest list in their v1 serde form.
        entries: Vec<ManifestFileV1>,
    }
720
721    impl ManifestListV2 {
722        /// Converts the [ManifestListV2] into a [ManifestList].
723        pub fn try_into(self) -> Result<super::ManifestList> {
724            Ok(super::ManifestList {
725                entries: self
726                    .entries
727                    .into_iter()
728                    .map(|v| v.try_into())
729                    .collect::<Result<Vec<_>>>()?,
730            })
731        }
732    }
733
734    impl TryFrom<super::ManifestList> for ManifestListV2 {
735        type Error = Error;
736
737        fn try_from(value: super::ManifestList) -> std::result::Result<Self, Self::Error> {
738            Ok(Self {
739                entries: value
740                    .entries
741                    .into_iter()
742                    .map(|v| v.try_into())
743                    .collect::<std::result::Result<Vec<_>, _>>()?,
744            })
745        }
746    }
747
748    impl ManifestListV1 {
749        /// Converts the [ManifestListV1] into a [ManifestList].
750        pub fn try_into(self) -> Result<super::ManifestList> {
751            Ok(super::ManifestList {
752                entries: self
753                    .entries
754                    .into_iter()
755                    .map(|v| v.try_into())
756                    .collect::<Result<Vec<_>>>()?,
757            })
758        }
759    }
760
761    impl TryFrom<super::ManifestList> for ManifestListV1 {
762        type Error = Error;
763
764        fn try_from(value: super::ManifestList) -> std::result::Result<Self, Self::Error> {
765            Ok(Self {
766                entries: value
767                    .entries
768                    .into_iter()
769                    .map(|v| v.try_into())
770                    .collect::<std::result::Result<Vec<_>, _>>()?,
771            })
772        }
773    }
774
    /// Serde representation of a v1 manifest list entry.
    ///
    /// Field names match the column names of the v1 manifest list schema. All
    /// file and row counts are optional in this version.
    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
    pub(super) struct ManifestFileV1 {
        pub manifest_path: String,
        pub manifest_length: i64,
        pub partition_spec_id: i32,
        pub added_snapshot_id: i64,
        pub added_data_files_count: Option<i32>,
        pub existing_data_files_count: Option<i32>,
        pub deleted_data_files_count: Option<i32>,
        pub added_rows_count: Option<i64>,
        pub existing_rows_count: Option<i64>,
        pub deleted_rows_count: Option<i64>,
        pub partitions: Option<Vec<FieldSummary>>,
        pub key_metadata: Option<ByteBuf>,
    }
790
    // Aliases were added to fields that were renamed in Iceberg 1.5.0 (https://github.com/apache/iceberg/pull/5338), in order to support both conventions/versions.
    // In the current implementation deserialization is done using field names, and therefore these fields may appear under either name.
    // See the issue that raised this: https://github.com/apache/iceberg-rust/issues/338
    /// Serde representation of a v2 manifest list entry.
    ///
    /// `content`, `sequence_number` and `min_sequence_number` fall back to
    /// default values when they are missing from the input.
    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
    pub(super) struct ManifestFileV2 {
        pub manifest_path: String,
        pub manifest_length: i64,
        pub partition_spec_id: i32,
        // Missing in the input: defaults to the integer code of `ManifestContentType::Data`.
        #[serde(default = "v2_default_content_for_v1")]
        pub content: i32,
        // Missing in the input: defaults to 0.
        #[serde(default = "v2_default_sequence_number_for_v1")]
        pub sequence_number: i64,
        // Missing in the input: defaults to 0.
        #[serde(default = "v2_default_min_sequence_number_for_v1")]
        pub min_sequence_number: i64,
        pub added_snapshot_id: i64,
        // Also accepted under the older `*_data_files_count` name.
        #[serde(alias = "added_data_files_count", alias = "added_files_count")]
        pub added_files_count: i32,
        // Also accepted under the older `*_data_files_count` name.
        #[serde(alias = "existing_data_files_count", alias = "existing_files_count")]
        pub existing_files_count: i32,
        // Also accepted under the older `*_data_files_count` name.
        #[serde(alias = "deleted_data_files_count", alias = "deleted_files_count")]
        pub deleted_files_count: i32,
        pub added_rows_count: i64,
        pub existing_rows_count: i64,
        pub deleted_rows_count: i64,
        pub partitions: Option<Vec<FieldSummary>>,
        pub key_metadata: Option<ByteBuf>,
    }
818
819    impl ManifestFileV2 {
820        /// Converts the [ManifestFileV2] into a [ManifestFile].
821        pub fn try_into(self) -> Result<ManifestFile> {
822            Ok(ManifestFile {
823                manifest_path: self.manifest_path,
824                manifest_length: self.manifest_length,
825                partition_spec_id: self.partition_spec_id,
826                content: self.content.try_into()?,
827                sequence_number: self.sequence_number,
828                min_sequence_number: self.min_sequence_number,
829                added_snapshot_id: self.added_snapshot_id,
830                added_files_count: Some(self.added_files_count.try_into()?),
831                existing_files_count: Some(self.existing_files_count.try_into()?),
832                deleted_files_count: Some(self.deleted_files_count.try_into()?),
833                added_rows_count: Some(self.added_rows_count.try_into()?),
834                existing_rows_count: Some(self.existing_rows_count.try_into()?),
835                deleted_rows_count: Some(self.deleted_rows_count.try_into()?),
836                partitions: self.partitions,
837                key_metadata: self.key_metadata.map(|b| b.into_vec()),
838            })
839        }
840    }
841
842    fn v2_default_content_for_v1() -> i32 {
843        super::ManifestContentType::Data as i32
844    }
845
/// Serde default for the `sequence_number` field of [ManifestFileV2],
/// used when the field is absent from the data being read.
fn v2_default_sequence_number_for_v1() -> i64 {
    const V1_SEQUENCE_NUMBER: i64 = 0;
    V1_SEQUENCE_NUMBER
}
849
/// Serde default for the `min_sequence_number` field of [ManifestFileV2],
/// used when the field is absent from the data being read.
fn v2_default_min_sequence_number_for_v1() -> i64 {
    const V1_MIN_SEQUENCE_NUMBER: i64 = 0;
    V1_MIN_SEQUENCE_NUMBER
}
853
854    impl ManifestFileV1 {
855        /// Converts the [ManifestFileV1] into a [ManifestFile].
856        pub fn try_into(self) -> Result<ManifestFile> {
857            Ok(ManifestFile {
858                manifest_path: self.manifest_path,
859                manifest_length: self.manifest_length,
860                partition_spec_id: self.partition_spec_id,
861                added_snapshot_id: self.added_snapshot_id,
862                added_files_count: self
863                    .added_data_files_count
864                    .map(TryInto::try_into)
865                    .transpose()?,
866                existing_files_count: self
867                    .existing_data_files_count
868                    .map(TryInto::try_into)
869                    .transpose()?,
870                deleted_files_count: self
871                    .deleted_data_files_count
872                    .map(TryInto::try_into)
873                    .transpose()?,
874                added_rows_count: self.added_rows_count.map(TryInto::try_into).transpose()?,
875                existing_rows_count: self
876                    .existing_rows_count
877                    .map(TryInto::try_into)
878                    .transpose()?,
879                deleted_rows_count: self.deleted_rows_count.map(TryInto::try_into).transpose()?,
880                partitions: self.partitions,
881                key_metadata: self.key_metadata.map(|b| b.into_vec()),
882                // as ref: https://iceberg.apache.org/spec/#partitioning
883                // use 0 when reading v1 manifest lists
884                content: super::ManifestContentType::Data,
885                sequence_number: 0,
886                min_sequence_number: 0,
887            })
888        }
889    }
890
891    fn convert_to_serde_key_metadata(key_metadata: Option<Vec<u8>>) -> Option<ByteBuf> {
892        match key_metadata {
893            Some(metadata) if !metadata.is_empty() => Some(ByteBuf::from(metadata)),
894            _ => None,
895        }
896    }
897
898    impl TryFrom<ManifestFile> for ManifestFileV2 {
899        type Error = Error;
900
901        fn try_from(value: ManifestFile) -> std::result::Result<Self, Self::Error> {
902            let key_metadata = convert_to_serde_key_metadata(value.key_metadata);
903            Ok(Self {
904                manifest_path: value.manifest_path,
905                manifest_length: value.manifest_length,
906                partition_spec_id: value.partition_spec_id,
907                content: value.content as i32,
908                sequence_number: value.sequence_number,
909                min_sequence_number: value.min_sequence_number,
910                added_snapshot_id: value.added_snapshot_id,
911                added_files_count: value
912                    .added_files_count
913                    .ok_or_else(|| {
914                        Error::new(
915                            crate::ErrorKind::DataInvalid,
916                            "added_data_files_count in ManifestFileV2 should be require",
917                        )
918                    })?
919                    .try_into()?,
920                existing_files_count: value
921                    .existing_files_count
922                    .ok_or_else(|| {
923                        Error::new(
924                            crate::ErrorKind::DataInvalid,
925                            "existing_data_files_count in ManifestFileV2 should be require",
926                        )
927                    })?
928                    .try_into()?,
929                deleted_files_count: value
930                    .deleted_files_count
931                    .ok_or_else(|| {
932                        Error::new(
933                            crate::ErrorKind::DataInvalid,
934                            "deleted_data_files_count in ManifestFileV2 should be require",
935                        )
936                    })?
937                    .try_into()?,
938                added_rows_count: value
939                    .added_rows_count
940                    .ok_or_else(|| {
941                        Error::new(
942                            crate::ErrorKind::DataInvalid,
943                            "added_rows_count in ManifestFileV2 should be require",
944                        )
945                    })?
946                    .try_into()?,
947                existing_rows_count: value
948                    .existing_rows_count
949                    .ok_or_else(|| {
950                        Error::new(
951                            crate::ErrorKind::DataInvalid,
952                            "existing_rows_count in ManifestFileV2 should be require",
953                        )
954                    })?
955                    .try_into()?,
956                deleted_rows_count: value
957                    .deleted_rows_count
958                    .ok_or_else(|| {
959                        Error::new(
960                            crate::ErrorKind::DataInvalid,
961                            "deleted_rows_count in ManifestFileV2 should be require",
962                        )
963                    })?
964                    .try_into()?,
965                partitions: value.partitions,
966                key_metadata,
967            })
968        }
969    }
970
971    impl TryFrom<ManifestFile> for ManifestFileV1 {
972        type Error = Error;
973
974        fn try_from(value: ManifestFile) -> std::result::Result<Self, Self::Error> {
975            let key_metadata = convert_to_serde_key_metadata(value.key_metadata);
976            Ok(Self {
977                manifest_path: value.manifest_path,
978                manifest_length: value.manifest_length,
979                partition_spec_id: value.partition_spec_id,
980                added_snapshot_id: value.added_snapshot_id,
981                added_data_files_count: value
982                    .added_files_count
983                    .map(TryInto::try_into)
984                    .transpose()?,
985                existing_data_files_count: value
986                    .existing_files_count
987                    .map(TryInto::try_into)
988                    .transpose()?,
989                deleted_data_files_count: value
990                    .deleted_files_count
991                    .map(TryInto::try_into)
992                    .transpose()?,
993                added_rows_count: value.added_rows_count.map(TryInto::try_into).transpose()?,
994                existing_rows_count: value
995                    .existing_rows_count
996                    .map(TryInto::try_into)
997                    .transpose()?,
998                deleted_rows_count: value
999                    .deleted_rows_count
1000                    .map(TryInto::try_into)
1001                    .transpose()?,
1002                partitions: value.partitions,
1003                key_metadata,
1004            })
1005        }
1006    }
1007}
1008
1009#[cfg(test)]
1010mod test {
1011    use std::fs;
1012
1013    use apache_avro::{Reader, Schema};
1014    use tempfile::TempDir;
1015
1016    use super::_serde::ManifestListV2;
1017    use crate::io::FileIOBuilder;
1018    use crate::spec::manifest_list::_serde::ManifestListV1;
1019    use crate::spec::{
1020        Datum, FieldSummary, ManifestContentType, ManifestFile, ManifestList, ManifestListWriter,
1021        UNASSIGNED_SEQUENCE_NUMBER,
1022    };
1023
1024    #[tokio::test]
1025    async fn test_parse_manifest_list_v1() {
1026        let manifest_list = ManifestList {
1027            entries: vec![
1028                ManifestFile {
1029                    manifest_path: "/opt/bitnami/spark/warehouse/db/table/metadata/10d28031-9739-484c-92db-cdf2975cead4-m0.avro".to_string(),
1030                    manifest_length: 5806,
1031                    partition_spec_id: 0,
1032                    content: ManifestContentType::Data,
1033                    sequence_number: 0,
1034                    min_sequence_number: 0,
1035                    added_snapshot_id: 1646658105718557341,
1036                    added_files_count: Some(3),
1037                    existing_files_count: Some(0),
1038                    deleted_files_count: Some(0),
1039                    added_rows_count: Some(3),
1040                    existing_rows_count: Some(0),
1041                    deleted_rows_count: Some(0),
1042                    partitions: Some(vec![]),
1043                    key_metadata: None,
1044                }
1045            ]
1046        };
1047
1048        let file_io = FileIOBuilder::new_fs_io().build().unwrap();
1049
1050        let tmp_dir = TempDir::new().unwrap();
1051        let file_name = "simple_manifest_list_v1.avro";
1052        let full_path = format!("{}/{}", tmp_dir.path().to_str().unwrap(), file_name);
1053
1054        let mut writer = ManifestListWriter::v1(
1055            file_io.new_output(full_path.clone()).unwrap(),
1056            1646658105718557341,
1057            Some(1646658105718557341),
1058        );
1059
1060        writer
1061            .add_manifests(manifest_list.entries.clone().into_iter())
1062            .unwrap();
1063        writer.close().await.unwrap();
1064
1065        let bs = fs::read(full_path).expect("read_file must succeed");
1066
1067        let parsed_manifest_list =
1068            ManifestList::parse_with_version(&bs, crate::spec::FormatVersion::V1).unwrap();
1069
1070        assert_eq!(manifest_list, parsed_manifest_list);
1071    }
1072
1073    #[tokio::test]
1074    async fn test_parse_manifest_list_v2() {
1075        let manifest_list = ManifestList {
1076            entries: vec![
1077                ManifestFile {
1078                    manifest_path: "s3a://icebergdata/demo/s1/t1/metadata/05ffe08b-810f-49b3-a8f4-e88fc99b254a-m0.avro".to_string(),
1079                    manifest_length: 6926,
1080                    partition_spec_id: 1,
1081                    content: ManifestContentType::Data,
1082                    sequence_number: 1,
1083                    min_sequence_number: 1,
1084                    added_snapshot_id: 377075049360453639,
1085                    added_files_count: Some(1),
1086                    existing_files_count: Some(0),
1087                    deleted_files_count: Some(0),
1088                    added_rows_count: Some(3),
1089                    existing_rows_count: Some(0),
1090                    deleted_rows_count: Some(0),
1091                    partitions: Some(
1092                        vec![FieldSummary { contains_null: false, contains_nan: Some(false), lower_bound: Some(Datum::long(1).to_bytes().unwrap()), upper_bound: Some(Datum::long(1).to_bytes().unwrap())}]
1093                    ),
1094                    key_metadata: None,
1095                },
1096                ManifestFile {
1097                    manifest_path: "s3a://icebergdata/demo/s1/t1/metadata/05ffe08b-810f-49b3-a8f4-e88fc99b254a-m1.avro".to_string(),
1098                    manifest_length: 6926,
1099                    partition_spec_id: 2,
1100                    content: ManifestContentType::Data,
1101                    sequence_number: 1,
1102                    min_sequence_number: 1,
1103                    added_snapshot_id: 377075049360453639,
1104                    added_files_count: Some(1),
1105                    existing_files_count: Some(0),
1106                    deleted_files_count: Some(0),
1107                    added_rows_count: Some(3),
1108                    existing_rows_count: Some(0),
1109                    deleted_rows_count: Some(0),
1110                    partitions: Some(
1111                        vec![FieldSummary { contains_null: false, contains_nan: Some(false), lower_bound: Some(Datum::float(1.1).to_bytes().unwrap()), upper_bound: Some(Datum::float(2.1).to_bytes().unwrap())}]
1112                    ),
1113                    key_metadata: None,
1114                }
1115            ]
1116        };
1117
1118        let file_io = FileIOBuilder::new_fs_io().build().unwrap();
1119
1120        let tmp_dir = TempDir::new().unwrap();
1121        let file_name = "simple_manifest_list_v1.avro";
1122        let full_path = format!("{}/{}", tmp_dir.path().to_str().unwrap(), file_name);
1123
1124        let mut writer = ManifestListWriter::v2(
1125            file_io.new_output(full_path.clone()).unwrap(),
1126            1646658105718557341,
1127            Some(1646658105718557341),
1128            1,
1129        );
1130
1131        writer
1132            .add_manifests(manifest_list.entries.clone().into_iter())
1133            .unwrap();
1134        writer.close().await.unwrap();
1135
1136        let bs = fs::read(full_path).expect("read_file must succeed");
1137
1138        let parsed_manifest_list =
1139            ManifestList::parse_with_version(&bs, crate::spec::FormatVersion::V2).unwrap();
1140
1141        assert_eq!(manifest_list, parsed_manifest_list);
1142    }
1143
/// Converts a [ManifestList] into its V1 serde form and checks the exact
/// JSON produced, including the V1 `*_data_files_count` field names.
#[test]
fn test_serialize_manifest_list_v1() {
    let entry = ManifestFile {
        manifest_path: "/opt/bitnami/spark/warehouse/db/table/metadata/10d28031-9739-484c-92db-cdf2975cead4-m0.avro".to_string(),
        manifest_length: 5806,
        partition_spec_id: 0,
        content: ManifestContentType::Data,
        sequence_number: 0,
        min_sequence_number: 0,
        added_snapshot_id: 1646658105718557341,
        added_files_count: Some(3),
        existing_files_count: Some(0),
        deleted_files_count: Some(0),
        added_rows_count: Some(3),
        existing_rows_count: Some(0),
        deleted_rows_count: Some(0),
        partitions: None,
        key_metadata: None,
    };
    let source_list = ManifestList {
        entries: vec![entry],
    };

    let manifest_list: ManifestListV1 = source_list.try_into().unwrap();
    let result = serde_json::to_string(&manifest_list).unwrap();

    assert_eq!(
        result,
        r#"[{"manifest_path":"/opt/bitnami/spark/warehouse/db/table/metadata/10d28031-9739-484c-92db-cdf2975cead4-m0.avro","manifest_length":5806,"partition_spec_id":0,"added_snapshot_id":1646658105718557341,"added_data_files_count":3,"existing_data_files_count":0,"deleted_data_files_count":0,"added_rows_count":3,"existing_rows_count":0,"deleted_rows_count":0,"partitions":null,"key_metadata":null}]"#
    );
}
1171
/// Converts a [ManifestList] into its V2 serde form and checks the exact
/// JSON produced, including content, sequence numbers and partition bounds.
#[test]
fn test_serialize_manifest_list_v2() {
    let partition_summary = FieldSummary {
        contains_null: false,
        contains_nan: Some(false),
        lower_bound: Some(Datum::long(1).to_bytes().unwrap()),
        upper_bound: Some(Datum::long(1).to_bytes().unwrap()),
    };
    let entry = ManifestFile {
        manifest_path: "s3a://icebergdata/demo/s1/t1/metadata/05ffe08b-810f-49b3-a8f4-e88fc99b254a-m0.avro".to_string(),
        manifest_length: 6926,
        partition_spec_id: 1,
        content: ManifestContentType::Data,
        sequence_number: 1,
        min_sequence_number: 1,
        added_snapshot_id: 377075049360453639,
        added_files_count: Some(1),
        existing_files_count: Some(0),
        deleted_files_count: Some(0),
        added_rows_count: Some(3),
        existing_rows_count: Some(0),
        deleted_rows_count: Some(0),
        partitions: Some(vec![partition_summary]),
        key_metadata: None,
    };
    let source_list = ManifestList {
        entries: vec![entry],
    };

    let manifest_list: ManifestListV2 = source_list.try_into().unwrap();
    let result = serde_json::to_string(&manifest_list).unwrap();

    assert_eq!(
        result,
        r#"[{"manifest_path":"s3a://icebergdata/demo/s1/t1/metadata/05ffe08b-810f-49b3-a8f4-e88fc99b254a-m0.avro","manifest_length":6926,"partition_spec_id":1,"content":0,"sequence_number":1,"min_sequence_number":1,"added_snapshot_id":377075049360453639,"added_files_count":1,"existing_files_count":0,"deleted_files_count":0,"added_rows_count":3,"existing_rows_count":0,"deleted_rows_count":0,"partitions":[{"contains_null":false,"contains_nan":false,"lower_bound":[1,0,0,0,0,0,0,0],"upper_bound":[1,0,0,0,0,0,0,0]}],"key_metadata":null}]"#
    );
}
1201
1202    #[tokio::test]
1203    async fn test_manifest_list_writer_v1() {
1204        let expected_manifest_list = ManifestList {
1205            entries: vec![ManifestFile {
1206                manifest_path: "/opt/bitnami/spark/warehouse/db/table/metadata/10d28031-9739-484c-92db-cdf2975cead4-m0.avro".to_string(),
1207                manifest_length: 5806,
1208                partition_spec_id: 1,
1209                content: ManifestContentType::Data,
1210                sequence_number: 0,
1211                min_sequence_number: 0,
1212                added_snapshot_id: 1646658105718557341,
1213                added_files_count: Some(3),
1214                existing_files_count: Some(0),
1215                deleted_files_count: Some(0),
1216                added_rows_count: Some(3),
1217                existing_rows_count: Some(0),
1218                deleted_rows_count: Some(0),
1219                partitions: Some(
1220                    vec![FieldSummary { contains_null: false, contains_nan: Some(false), lower_bound: Some(Datum::long(1).to_bytes().unwrap()), upper_bound: Some(Datum::long(1).to_bytes().unwrap())}],
1221                ),
1222                key_metadata: None,
1223            }]
1224        };
1225
1226        let temp_dir = TempDir::new().unwrap();
1227        let path = temp_dir.path().join("manifest_list_v1.avro");
1228        let io = FileIOBuilder::new_fs_io().build().unwrap();
1229        let output_file = io.new_output(path.to_str().unwrap()).unwrap();
1230
1231        let mut writer = ManifestListWriter::v1(output_file, 1646658105718557341, Some(0));
1232        writer
1233            .add_manifests(expected_manifest_list.entries.clone().into_iter())
1234            .unwrap();
1235        writer.close().await.unwrap();
1236
1237        let bs = fs::read(path).unwrap();
1238
1239        let manifest_list =
1240            ManifestList::parse_with_version(&bs, crate::spec::FormatVersion::V1).unwrap();
1241        assert_eq!(manifest_list, expected_manifest_list);
1242
1243        temp_dir.close().unwrap();
1244    }
1245
1246    #[tokio::test]
1247    async fn test_manifest_list_writer_v2() {
1248        let snapshot_id = 377075049360453639;
1249        let seq_num = 1;
1250        let mut expected_manifest_list = ManifestList {
1251            entries: vec![ManifestFile {
1252                manifest_path: "s3a://icebergdata/demo/s1/t1/metadata/05ffe08b-810f-49b3-a8f4-e88fc99b254a-m0.avro".to_string(),
1253                manifest_length: 6926,
1254                partition_spec_id: 1,
1255                content: ManifestContentType::Data,
1256                sequence_number: UNASSIGNED_SEQUENCE_NUMBER,
1257                min_sequence_number: UNASSIGNED_SEQUENCE_NUMBER,
1258                added_snapshot_id: snapshot_id,
1259                added_files_count: Some(1),
1260                existing_files_count: Some(0),
1261                deleted_files_count: Some(0),
1262                added_rows_count: Some(3),
1263                existing_rows_count: Some(0),
1264                deleted_rows_count: Some(0),
1265                partitions: Some(
1266                    vec![FieldSummary { contains_null: false, contains_nan: Some(false), lower_bound: Some(Datum::long(1).to_bytes().unwrap()), upper_bound: Some(Datum::long(1).to_bytes().unwrap())}]
1267                ),
1268                key_metadata: None,
1269            }]
1270        };
1271
1272        let temp_dir = TempDir::new().unwrap();
1273        let path = temp_dir.path().join("manifest_list_v2.avro");
1274        let io = FileIOBuilder::new_fs_io().build().unwrap();
1275        let output_file = io.new_output(path.to_str().unwrap()).unwrap();
1276
1277        let mut writer = ManifestListWriter::v2(output_file, snapshot_id, Some(0), seq_num);
1278        writer
1279            .add_manifests(expected_manifest_list.entries.clone().into_iter())
1280            .unwrap();
1281        writer.close().await.unwrap();
1282
1283        let bs = fs::read(path).unwrap();
1284        let manifest_list =
1285            ManifestList::parse_with_version(&bs, crate::spec::FormatVersion::V2).unwrap();
1286        expected_manifest_list.entries[0].sequence_number = seq_num;
1287        expected_manifest_list.entries[0].min_sequence_number = seq_num;
1288        assert_eq!(manifest_list, expected_manifest_list);
1289
1290        temp_dir.close().unwrap();
1291    }
1292
1293    #[tokio::test]
1294    async fn test_manifest_list_writer_v1_as_v2() {
1295        let expected_manifest_list = ManifestList {
1296            entries: vec![ManifestFile {
1297                manifest_path: "/opt/bitnami/spark/warehouse/db/table/metadata/10d28031-9739-484c-92db-cdf2975cead4-m0.avro".to_string(),
1298                manifest_length: 5806,
1299                partition_spec_id: 1,
1300                content: ManifestContentType::Data,
1301                sequence_number: 0,
1302                min_sequence_number: 0,
1303                added_snapshot_id: 1646658105718557341,
1304                added_files_count: Some(3),
1305                existing_files_count: Some(0),
1306                deleted_files_count: Some(0),
1307                added_rows_count: Some(3),
1308                existing_rows_count: Some(0),
1309                deleted_rows_count: Some(0),
1310                partitions: Some(
1311                    vec![FieldSummary { contains_null: false, contains_nan: Some(false), lower_bound: Some(Datum::long(1).to_bytes().unwrap()), upper_bound: Some(Datum::long(1).to_bytes().unwrap())}]
1312                ),
1313                key_metadata: None,
1314            }]
1315        };
1316
1317        let temp_dir = TempDir::new().unwrap();
1318        let path = temp_dir.path().join("manifest_list_v1.avro");
1319        let io = FileIOBuilder::new_fs_io().build().unwrap();
1320        let output_file = io.new_output(path.to_str().unwrap()).unwrap();
1321
1322        let mut writer = ManifestListWriter::v1(output_file, 1646658105718557341, Some(0));
1323        writer
1324            .add_manifests(expected_manifest_list.entries.clone().into_iter())
1325            .unwrap();
1326        writer.close().await.unwrap();
1327
1328        let bs = fs::read(path).unwrap();
1329
1330        let manifest_list =
1331            ManifestList::parse_with_version(&bs, crate::spec::FormatVersion::V2).unwrap();
1332        assert_eq!(manifest_list, expected_manifest_list);
1333
1334        temp_dir.close().unwrap();
1335    }
1336
1337    #[tokio::test]
1338    async fn test_manifest_list_v2_deserializer_aliases() {
1339        // reading avro manifest file generated by iceberg 1.4.0
1340        let avro_1_path = "testdata/manifests_lists/manifest-list-v2-1.avro";
1341        let bs_1 = fs::read(avro_1_path).unwrap();
1342        let avro_1_fields = read_avro_schema_fields_as_str(bs_1.clone()).await;
1343        assert_eq!(
1344            avro_1_fields,
1345            "manifest_path, manifest_length, partition_spec_id, content, sequence_number, min_sequence_number, added_snapshot_id, added_data_files_count, existing_data_files_count, deleted_data_files_count, added_rows_count, existing_rows_count, deleted_rows_count, partitions"
1346        );
1347        // reading avro manifest file generated by iceberg 1.5.0
1348        let avro_2_path = "testdata/manifests_lists/manifest-list-v2-2.avro";
1349        let bs_2 = fs::read(avro_2_path).unwrap();
1350        let avro_2_fields = read_avro_schema_fields_as_str(bs_2.clone()).await;
1351        assert_eq!(
1352            avro_2_fields,
1353            "manifest_path, manifest_length, partition_spec_id, content, sequence_number, min_sequence_number, added_snapshot_id, added_files_count, existing_files_count, deleted_files_count, added_rows_count, existing_rows_count, deleted_rows_count, partitions"
1354        );
1355        // deserializing both files to ManifestList struct
1356        let _manifest_list_1 =
1357            ManifestList::parse_with_version(&bs_1, crate::spec::FormatVersion::V2).unwrap();
1358        let _manifest_list_2 =
1359            ManifestList::parse_with_version(&bs_2, crate::spec::FormatVersion::V2).unwrap();
1360    }
1361
1362    async fn read_avro_schema_fields_as_str(bs: Vec<u8>) -> String {
1363        let reader = Reader::new(&bs[..]).unwrap();
1364        let schema = reader.writer_schema();
1365        let fields: String = match schema {
1366            Schema::Record(record) => record
1367                .fields
1368                .iter()
1369                .map(|field| field.name.clone())
1370                .collect::<Vec<String>>()
1371                .join(", "),
1372            _ => "".to_string(),
1373        };
1374        fields
1375    }
1376
/// The default manifest content type must be `Data`.
#[test]
fn test_manifest_content_type_default() {
    let default_content = ManifestContentType::default();
    assert_eq!(default_content, ManifestContentType::Data);
}
1381
/// The default manifest content type must have the integer value 0.
#[test]
fn test_manifest_content_type_default_value() {
    let default_discriminant = ManifestContentType::default() as i32;
    assert_eq!(default_discriminant, 0);
}
1386
/// Converts a V1 serde manifest file into a [ManifestFile] and checks that
/// the fields V1 lacks receive their defaults while all others are preserved.
#[test]
fn test_manifest_file_v1_to_v2_projection() {
    use crate::spec::manifest_list::_serde::ManifestFileV1;

    // A V1 entry: no content type and no sequence numbers.
    let legacy_entry = ManifestFileV1 {
        manifest_path: "/test/manifest.avro".to_string(),
        manifest_length: 5806,
        partition_spec_id: 0,
        added_snapshot_id: 1646658105718557341,
        added_data_files_count: Some(3),
        existing_data_files_count: Some(0),
        deleted_data_files_count: Some(0),
        added_rows_count: Some(3),
        existing_rows_count: Some(0),
        deleted_rows_count: Some(0),
        partitions: None,
        key_metadata: None,
    };

    let projected: ManifestFile = legacy_entry.try_into().unwrap();

    // Fields missing from V1 must come back with their defaults.
    assert_eq!(
        projected.content,
        ManifestContentType::Data,
        "V1 manifest content should default to Data (0)"
    );
    assert_eq!(
        projected.sequence_number, 0,
        "V1 manifest sequence_number should default to 0"
    );
    assert_eq!(
        projected.min_sequence_number, 0,
        "V1 manifest min_sequence_number should default to 0"
    );

    // Everything V1 does carry must pass through unchanged.
    assert_eq!(projected.manifest_path, "/test/manifest.avro");
    assert_eq!(projected.manifest_length, 5806);
    assert_eq!(projected.partition_spec_id, 0);
    assert_eq!(projected.added_snapshot_id, 1646658105718557341);
    assert_eq!(projected.added_files_count, Some(3));
    assert_eq!(projected.existing_files_count, Some(0));
    assert_eq!(projected.deleted_files_count, Some(0));
    assert_eq!(projected.added_rows_count, Some(3));
    assert_eq!(projected.existing_rows_count, Some(0));
    assert_eq!(projected.deleted_rows_count, Some(0));
    assert_eq!(projected.partitions, None);
    assert_eq!(projected.key_metadata, None);
}
1439}