iceberg/spec/manifest/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18mod _serde;
19
20mod data_file;
21pub use data_file::*;
22mod entry;
23pub use entry::*;
24mod metadata;
25pub use metadata::*;
26mod writer;
27use std::sync::Arc;
28
29use apache_avro::{Reader as AvroReader, from_value};
30pub use writer::*;
31
32use super::{
33    Datum, FormatVersion, ManifestContentType, PartitionSpec, PrimitiveType, Schema, Struct,
34    UNASSIGNED_SEQUENCE_NUMBER,
35};
36use crate::error::Result;
37use crate::{Error, ErrorKind};
38
39/// A manifest contains metadata and a list of entries.
40#[derive(Debug, PartialEq, Eq, Clone)]
41pub struct Manifest {
42    metadata: ManifestMetadata,
43    entries: Vec<ManifestEntryRef>,
44}
45
46impl Manifest {
47    /// Parse manifest metadata and entries from bytes of avro file.
48    pub(crate) fn try_from_avro_bytes(bs: &[u8]) -> Result<(ManifestMetadata, Vec<ManifestEntry>)> {
49        let reader = AvroReader::new(bs)?;
50
51        // Parse manifest metadata
52        let meta = reader.user_metadata();
53        let metadata = ManifestMetadata::parse(meta)?;
54
55        // Parse manifest entries
56        let partition_type = metadata.partition_spec.partition_type(&metadata.schema)?;
57
58        let entries = match metadata.format_version {
59            FormatVersion::V1 => {
60                let schema = manifest_schema_v1(&partition_type)?;
61                let reader = AvroReader::with_schema(&schema, bs)?;
62                reader
63                    .into_iter()
64                    .map(|value| {
65                        from_value::<_serde::ManifestEntryV1>(&value?)?.try_into(
66                            metadata.partition_spec.spec_id(),
67                            &partition_type,
68                            &metadata.schema,
69                        )
70                    })
71                    .collect::<Result<Vec<_>>>()?
72            }
73            FormatVersion::V2 => {
74                let schema = manifest_schema_v2(&partition_type)?;
75                let reader = AvroReader::with_schema(&schema, bs)?;
76                reader
77                    .into_iter()
78                    .map(|value| {
79                        from_value::<_serde::ManifestEntryV2>(&value?)?.try_into(
80                            metadata.partition_spec.spec_id(),
81                            &partition_type,
82                            &metadata.schema,
83                        )
84                    })
85                    .collect::<Result<Vec<_>>>()?
86            }
87        };
88
89        Ok((metadata, entries))
90    }
91
92    /// Parse manifest from bytes of avro file.
93    pub fn parse_avro(bs: &[u8]) -> Result<Self> {
94        let (metadata, entries) = Self::try_from_avro_bytes(bs)?;
95        Ok(Self::new(metadata, entries))
96    }
97
98    /// Entries slice.
99    pub fn entries(&self) -> &[ManifestEntryRef] {
100        &self.entries
101    }
102
103    /// Get metadata.
104    pub fn metadata(&self) -> &ManifestMetadata {
105        &self.metadata
106    }
107
108    /// Consume this Manifest, returning its constituent parts
109    pub fn into_parts(self) -> (Vec<ManifestEntryRef>, ManifestMetadata) {
110        let Self { entries, metadata } = self;
111        (entries, metadata)
112    }
113
114    /// Constructor from [`ManifestMetadata`] and [`ManifestEntry`]s.
115    pub fn new(metadata: ManifestMetadata, entries: Vec<ManifestEntry>) -> Self {
116        Self {
117            metadata,
118            entries: entries.into_iter().map(Arc::new).collect(),
119        }
120    }
121}
122
123/// Serialize a DataFile to a JSON string.
124pub fn serialize_data_file_to_json(
125    data_file: DataFile,
126    partition_type: &super::StructType,
127    format_version: FormatVersion,
128) -> Result<String> {
129    let serde = _serde::DataFileSerde::try_from(data_file, partition_type, format_version)?;
130    serde_json::to_string(&serde).map_err(|e| {
131        Error::new(
132            ErrorKind::DataInvalid,
133            "Failed to serialize DataFile to JSON!".to_string(),
134        )
135        .with_source(e)
136    })
137}
138
139/// Deserialize a DataFile from a JSON string.
140pub fn deserialize_data_file_from_json(
141    json: &str,
142    partition_spec_id: i32,
143    partition_type: &super::StructType,
144    schema: &Schema,
145) -> Result<DataFile> {
146    let serde = serde_json::from_str::<_serde::DataFileSerde>(json).map_err(|e| {
147        Error::new(
148            ErrorKind::DataInvalid,
149            "Failed to deserialize JSON to DataFile!".to_string(),
150        )
151        .with_source(e)
152    })?;
153
154    serde.try_into(partition_spec_id, partition_type, schema)
155}
156
157#[cfg(test)]
158mod tests {
159    use std::collections::HashMap;
160    use std::fs;
161    use std::sync::Arc;
162
163    use serde_json::Value;
164    use tempfile::TempDir;
165
166    use super::*;
167    use crate::io::FileIOBuilder;
168    use crate::spec::{Literal, NestedField, PrimitiveType, Struct, Transform, Type};
169
170    #[tokio::test]
171    async fn test_parse_manifest_v2_unpartition() {
172        let schema = Arc::new(
173            Schema::builder()
174                .with_fields(vec![
175                    // id v_int v_long v_float v_double v_varchar v_bool v_date v_timestamp v_decimal v_ts_ntz
176                    Arc::new(NestedField::optional(
177                        1,
178                        "id",
179                        Type::Primitive(PrimitiveType::Long),
180                    )),
181                    Arc::new(NestedField::optional(
182                        2,
183                        "v_int",
184                        Type::Primitive(PrimitiveType::Int),
185                    )),
186                    Arc::new(NestedField::optional(
187                        3,
188                        "v_long",
189                        Type::Primitive(PrimitiveType::Long),
190                    )),
191                    Arc::new(NestedField::optional(
192                        4,
193                        "v_float",
194                        Type::Primitive(PrimitiveType::Float),
195                    )),
196                    Arc::new(NestedField::optional(
197                        5,
198                        "v_double",
199                        Type::Primitive(PrimitiveType::Double),
200                    )),
201                    Arc::new(NestedField::optional(
202                        6,
203                        "v_varchar",
204                        Type::Primitive(PrimitiveType::String),
205                    )),
206                    Arc::new(NestedField::optional(
207                        7,
208                        "v_bool",
209                        Type::Primitive(PrimitiveType::Boolean),
210                    )),
211                    Arc::new(NestedField::optional(
212                        8,
213                        "v_date",
214                        Type::Primitive(PrimitiveType::Date),
215                    )),
216                    Arc::new(NestedField::optional(
217                        9,
218                        "v_timestamp",
219                        Type::Primitive(PrimitiveType::Timestamptz),
220                    )),
221                    Arc::new(NestedField::optional(
222                        10,
223                        "v_decimal",
224                        Type::Primitive(PrimitiveType::Decimal {
225                            precision: 36,
226                            scale: 10,
227                        }),
228                    )),
229                    Arc::new(NestedField::optional(
230                        11,
231                        "v_ts_ntz",
232                        Type::Primitive(PrimitiveType::Timestamp),
233                    )),
234                    Arc::new(NestedField::optional(
235                        12,
236                        "v_ts_ns_ntz",
237                        Type::Primitive(PrimitiveType::TimestampNs),
238                    )),
239                ])
240                .build()
241                .unwrap(),
242        );
243        let metadata = ManifestMetadata {
244            schema_id: 0,
245            schema: schema.clone(),
246            partition_spec: PartitionSpec::builder(schema)
247                .with_spec_id(0)
248                .build()
249                .unwrap(),
250            content: ManifestContentType::Data,
251            format_version: FormatVersion::V2,
252        };
253        let mut entries = vec![
254                ManifestEntry {
255                    status: ManifestStatus::Added,
256                    snapshot_id: None,
257                    sequence_number: None,
258                    file_sequence_number: None,
259                    data_file: DataFile {content:DataContentType::Data,file_path:"s3a://icebergdata/demo/s1/t1/data/00000-0-ba56fbfa-f2ff-40c9-bb27-565ad6dc2be8-00000.parquet".to_string(),file_format:DataFileFormat::Parquet,partition:Struct::empty(),record_count:1,file_size_in_bytes:5442,column_sizes:HashMap::from([(0,73),(6,34),(2,73),(7,61),(3,61),(5,62),(9,79),(10,73),(1,61),(4,73),(8,73)]),value_counts:HashMap::from([(4,1),(5,1),(2,1),(0,1),(3,1),(6,1),(8,1),(1,1),(10,1),(7,1),(9,1)]),null_value_counts:HashMap::from([(1,0),(6,0),(2,0),(8,0),(0,0),(3,0),(5,0),(9,0),(7,0),(4,0),(10,0)]),nan_value_counts:HashMap::new(),lower_bounds:HashMap::new(),upper_bounds:HashMap::new(),key_metadata:None,split_offsets:vec![4],equality_ids:Some(Vec::new()),sort_order_id:None, partition_spec_id: 0,first_row_id: None,referenced_data_file: None,content_offset: None,content_size_in_bytes: None }
260                }
261            ];
262
263        // write manifest to file
264        let tmp_dir = TempDir::new().unwrap();
265        let path = tmp_dir.path().join("test_manifest.avro");
266        let io = FileIOBuilder::new_fs_io().build().unwrap();
267        let output_file = io.new_output(path.to_str().unwrap()).unwrap();
268        let mut writer = ManifestWriterBuilder::new(
269            output_file,
270            Some(1),
271            None,
272            metadata.schema.clone(),
273            metadata.partition_spec.clone(),
274        )
275        .build_v2_data();
276        for entry in &entries {
277            writer.add_entry(entry.clone()).unwrap();
278        }
279        writer.write_manifest_file().await.unwrap();
280
281        // read back the manifest file and check the content
282        let actual_manifest =
283            Manifest::parse_avro(fs::read(path).expect("read_file must succeed").as_slice())
284                .unwrap();
285        // The snapshot id is assigned when the entry is added to the manifest.
286        entries[0].snapshot_id = Some(1);
287        assert_eq!(actual_manifest, Manifest::new(metadata, entries));
288    }
289
290    #[tokio::test]
291    async fn test_parse_manifest_v2_partition() {
292        let schema = Arc::new(
293            Schema::builder()
294                .with_fields(vec![
295                    Arc::new(NestedField::optional(
296                        1,
297                        "id",
298                        Type::Primitive(PrimitiveType::Long),
299                    )),
300                    Arc::new(NestedField::optional(
301                        2,
302                        "v_int",
303                        Type::Primitive(PrimitiveType::Int),
304                    )),
305                    Arc::new(NestedField::optional(
306                        3,
307                        "v_long",
308                        Type::Primitive(PrimitiveType::Long),
309                    )),
310                    Arc::new(NestedField::optional(
311                        4,
312                        "v_float",
313                        Type::Primitive(PrimitiveType::Float),
314                    )),
315                    Arc::new(NestedField::optional(
316                        5,
317                        "v_double",
318                        Type::Primitive(PrimitiveType::Double),
319                    )),
320                    Arc::new(NestedField::optional(
321                        6,
322                        "v_varchar",
323                        Type::Primitive(PrimitiveType::String),
324                    )),
325                    Arc::new(NestedField::optional(
326                        7,
327                        "v_bool",
328                        Type::Primitive(PrimitiveType::Boolean),
329                    )),
330                    Arc::new(NestedField::optional(
331                        8,
332                        "v_date",
333                        Type::Primitive(PrimitiveType::Date),
334                    )),
335                    Arc::new(NestedField::optional(
336                        9,
337                        "v_timestamp",
338                        Type::Primitive(PrimitiveType::Timestamptz),
339                    )),
340                    Arc::new(NestedField::optional(
341                        10,
342                        "v_decimal",
343                        Type::Primitive(PrimitiveType::Decimal {
344                            precision: 36,
345                            scale: 10,
346                        }),
347                    )),
348                    Arc::new(NestedField::optional(
349                        11,
350                        "v_ts_ntz",
351                        Type::Primitive(PrimitiveType::Timestamp),
352                    )),
353                    Arc::new(NestedField::optional(
354                        12,
355                        "v_ts_ns_ntz",
356                        Type::Primitive(PrimitiveType::TimestampNs),
357                    )),
358                ])
359                .build()
360                .unwrap(),
361        );
362        let metadata = ManifestMetadata {
363            schema_id: 0,
364            schema: schema.clone(),
365            partition_spec: PartitionSpec::builder(schema)
366                .with_spec_id(0)
367                .add_partition_field("v_int", "v_int", Transform::Identity)
368                .unwrap()
369                .add_partition_field("v_long", "v_long", Transform::Identity)
370                .unwrap()
371                .build()
372                .unwrap(),
373            content: ManifestContentType::Data,
374            format_version: FormatVersion::V2,
375        };
376        let mut entries = vec![ManifestEntry {
377                status: ManifestStatus::Added,
378                snapshot_id: None,
379                sequence_number: None,
380                file_sequence_number: None,
381                data_file: DataFile {
382                    content: DataContentType::Data,
383                    file_format: DataFileFormat::Parquet,
384                    file_path: "s3a://icebergdata/demo/s1/t1/data/00000-0-378b56f5-5c52-4102-a2c2-f05f8a7cbe4a-00000.parquet".to_string(),
385                    partition: Struct::from_iter(
386                        vec![
387                            Some(Literal::int(1)),
388                            Some(Literal::long(1000)),
389                        ]
390                            .into_iter()
391                    ),
392                    record_count: 1,
393                    file_size_in_bytes: 5442,
394                    column_sizes: HashMap::from([
395                        (0, 73),
396                        (6, 34),
397                        (2, 73),
398                        (7, 61),
399                        (3, 61),
400                        (5, 62),
401                        (9, 79),
402                        (10, 73),
403                        (1, 61),
404                        (4, 73),
405                        (8, 73)
406                    ]),
407                    value_counts: HashMap::from([
408                        (4, 1),
409                        (5, 1),
410                        (2, 1),
411                        (0, 1),
412                        (3, 1),
413                        (6, 1),
414                        (8, 1),
415                        (1, 1),
416                        (10, 1),
417                        (7, 1),
418                        (9, 1)
419                    ]),
420                    null_value_counts: HashMap::from([
421                        (1, 0),
422                        (6, 0),
423                        (2, 0),
424                        (8, 0),
425                        (0, 0),
426                        (3, 0),
427                        (5, 0),
428                        (9, 0),
429                        (7, 0),
430                        (4, 0),
431                        (10, 0)
432                    ]),
433                    nan_value_counts: HashMap::new(),
434                    lower_bounds: HashMap::new(),
435                    upper_bounds: HashMap::new(),
436                    key_metadata: None,
437                    split_offsets: vec![4],
438                    equality_ids: Some(Vec::new()),
439                    sort_order_id: None,
440                    partition_spec_id: 0,
441                    first_row_id: None,
442                    referenced_data_file: None,
443                    content_offset: None,
444                    content_size_in_bytes: None,
445                },
446            }];
447
448        // write manifest to file and check the return manifest file.
449        let tmp_dir = TempDir::new().unwrap();
450        let path = tmp_dir.path().join("test_manifest.avro");
451        let io = FileIOBuilder::new_fs_io().build().unwrap();
452        let output_file = io.new_output(path.to_str().unwrap()).unwrap();
453        let mut writer = ManifestWriterBuilder::new(
454            output_file,
455            Some(2),
456            None,
457            metadata.schema.clone(),
458            metadata.partition_spec.clone(),
459        )
460        .build_v2_data();
461        for entry in &entries {
462            writer.add_entry(entry.clone()).unwrap();
463        }
464        let manifest_file = writer.write_manifest_file().await.unwrap();
465        assert_eq!(manifest_file.sequence_number, UNASSIGNED_SEQUENCE_NUMBER);
466        assert_eq!(
467            manifest_file.min_sequence_number,
468            UNASSIGNED_SEQUENCE_NUMBER
469        );
470
471        // read back the manifest file and check the content
472        let actual_manifest =
473            Manifest::parse_avro(fs::read(path).expect("read_file must succeed").as_slice())
474                .unwrap();
475        // The snapshot id is assigned when the entry is added to the manifest.
476        entries[0].snapshot_id = Some(2);
477        assert_eq!(actual_manifest, Manifest::new(metadata, entries));
478    }
479
480    #[tokio::test]
481    async fn test_parse_manifest_v1_unpartition() {
482        let schema = Arc::new(
483            Schema::builder()
484                .with_schema_id(1)
485                .with_fields(vec![
486                    Arc::new(NestedField::optional(
487                        1,
488                        "id",
489                        Type::Primitive(PrimitiveType::Int),
490                    )),
491                    Arc::new(NestedField::optional(
492                        2,
493                        "data",
494                        Type::Primitive(PrimitiveType::String),
495                    )),
496                    Arc::new(NestedField::optional(
497                        3,
498                        "comment",
499                        Type::Primitive(PrimitiveType::String),
500                    )),
501                ])
502                .build()
503                .unwrap(),
504        );
505        let metadata = ManifestMetadata {
506            schema_id: 1,
507            schema: schema.clone(),
508            partition_spec: PartitionSpec::builder(schema)
509                .with_spec_id(0)
510                .build()
511                .unwrap(),
512            content: ManifestContentType::Data,
513            format_version: FormatVersion::V1,
514        };
515        let mut entries = vec![ManifestEntry {
516                status: ManifestStatus::Added,
517                snapshot_id: Some(0),
518                sequence_number: Some(0),
519                file_sequence_number: Some(0),
520                data_file: DataFile {
521                    content: DataContentType::Data,
522                    file_path: "s3://testbucket/iceberg_data/iceberg_ctl/iceberg_db/iceberg_tbl/data/00000-7-45268d71-54eb-476c-b42c-942d880c04a1-00001.parquet".to_string(),
523                    file_format: DataFileFormat::Parquet,
524                    partition: Struct::empty(),
525                    record_count: 1,
526                    file_size_in_bytes: 875,
527                    column_sizes: HashMap::from([(1,47),(2,48),(3,52)]),
528                    value_counts: HashMap::from([(1,1),(2,1),(3,1)]),
529                    null_value_counts: HashMap::from([(1,0),(2,0),(3,0)]),
530                    nan_value_counts: HashMap::new(),
531                    lower_bounds: HashMap::from([(1,Datum::int(1)),(2,Datum::string("a")),(3,Datum::string("AC/DC"))]),
532                    upper_bounds: HashMap::from([(1,Datum::int(1)),(2,Datum::string("a")),(3,Datum::string("AC/DC"))]),
533                    key_metadata: None,
534                    split_offsets: vec![4],
535                    equality_ids: None,
536                    sort_order_id: Some(0),
537                    partition_spec_id: 0,
538                    first_row_id: None,
539                    referenced_data_file: None,
540                    content_offset: None,
541                    content_size_in_bytes: None,
542                }
543            }];
544
545        // write manifest to file
546        let tmp_dir = TempDir::new().unwrap();
547        let path = tmp_dir.path().join("test_manifest.avro");
548        let io = FileIOBuilder::new_fs_io().build().unwrap();
549        let output_file = io.new_output(path.to_str().unwrap()).unwrap();
550        let mut writer = ManifestWriterBuilder::new(
551            output_file,
552            Some(3),
553            None,
554            metadata.schema.clone(),
555            metadata.partition_spec.clone(),
556        )
557        .build_v1();
558        for entry in &entries {
559            writer.add_entry(entry.clone()).unwrap();
560        }
561        writer.write_manifest_file().await.unwrap();
562
563        // read back the manifest file and check the content
564        let actual_manifest =
565            Manifest::parse_avro(fs::read(path).expect("read_file must succeed").as_slice())
566                .unwrap();
567        // The snapshot id is assigned when the entry is added to the manifest.
568        entries[0].snapshot_id = Some(3);
569        assert_eq!(actual_manifest, Manifest::new(metadata, entries));
570    }
571
572    #[tokio::test]
573    async fn test_parse_manifest_v1_partition() {
574        let schema = Arc::new(
575            Schema::builder()
576                .with_fields(vec![
577                    Arc::new(NestedField::optional(
578                        1,
579                        "id",
580                        Type::Primitive(PrimitiveType::Long),
581                    )),
582                    Arc::new(NestedField::optional(
583                        2,
584                        "data",
585                        Type::Primitive(PrimitiveType::String),
586                    )),
587                    Arc::new(NestedField::optional(
588                        3,
589                        "category",
590                        Type::Primitive(PrimitiveType::String),
591                    )),
592                ])
593                .build()
594                .unwrap(),
595        );
596        let metadata = ManifestMetadata {
597            schema_id: 0,
598            schema: schema.clone(),
599            partition_spec: PartitionSpec::builder(schema)
600                .add_partition_field("category", "category", Transform::Identity)
601                .unwrap()
602                .build()
603                .unwrap(),
604            content: ManifestContentType::Data,
605            format_version: FormatVersion::V1,
606        };
607        let mut entries = vec![
608                ManifestEntry {
609                    status: ManifestStatus::Added,
610                    snapshot_id: Some(0),
611                    sequence_number: Some(0),
612                    file_sequence_number: Some(0),
613                    data_file: DataFile {
614                        content: DataContentType::Data,
615                        file_path: "s3://testbucket/prod/db/sample/data/category=x/00010-1-d5c93668-1e52-41ac-92a6-bba590cbf249-00001.parquet".to_string(),
616                        file_format: DataFileFormat::Parquet,
617                        partition: Struct::from_iter(
618                            vec![
619                                Some(
620                                    Literal::string("x"),
621                                ),
622                            ]
623                                .into_iter()
624                        ),
625                        record_count: 1,
626                        file_size_in_bytes: 874,
627                        column_sizes: HashMap::from([(1, 46), (2, 48), (3, 48)]),
628                        value_counts: HashMap::from([(1, 1), (2, 1), (3, 1)]),
629                        null_value_counts: HashMap::from([(1, 0), (2, 0), (3, 0)]),
630                        nan_value_counts: HashMap::new(),
631                        lower_bounds: HashMap::from([
632                        (1, Datum::long(1)),
633                        (2, Datum::string("a")),
634                        (3, Datum::string("x"))
635                        ]),
636                        upper_bounds: HashMap::from([
637                        (1, Datum::long(1)),
638                        (2, Datum::string("a")),
639                        (3, Datum::string("x"))
640                        ]),
641                        key_metadata: None,
642                        split_offsets: vec![4],
643                        equality_ids: None,
644                        sort_order_id: Some(0),
645                        partition_spec_id: 0,
646                        first_row_id: None,
647                        referenced_data_file: None,
648                        content_offset: None,
649                        content_size_in_bytes: None,
650                    },
651                }
652            ];
653
654        // write manifest to file
655        let tmp_dir = TempDir::new().unwrap();
656        let path = tmp_dir.path().join("test_manifest.avro");
657        let io = FileIOBuilder::new_fs_io().build().unwrap();
658        let output_file = io.new_output(path.to_str().unwrap()).unwrap();
659        let mut writer = ManifestWriterBuilder::new(
660            output_file,
661            Some(2),
662            None,
663            metadata.schema.clone(),
664            metadata.partition_spec.clone(),
665        )
666        .build_v1();
667        for entry in &entries {
668            writer.add_entry(entry.clone()).unwrap();
669        }
670        let manifest_file = writer.write_manifest_file().await.unwrap();
671        let partitions = manifest_file.partitions.unwrap();
672        assert_eq!(partitions.len(), 1);
673        assert_eq!(
674            partitions[0].clone().lower_bound.unwrap(),
675            Datum::string("x").to_bytes().unwrap()
676        );
677        assert_eq!(
678            partitions[0].clone().upper_bound.unwrap(),
679            Datum::string("x").to_bytes().unwrap()
680        );
681
682        // read back the manifest file and check the content
683        let actual_manifest =
684            Manifest::parse_avro(fs::read(path).expect("read_file must succeed").as_slice())
685                .unwrap();
686        // The snapshot id is assigned when the entry is added to the manifest.
687        entries[0].snapshot_id = Some(2);
688        assert_eq!(actual_manifest, Manifest::new(metadata, entries));
689    }
690
691    #[tokio::test]
692    async fn test_parse_manifest_with_schema_evolution() {
693        let schema = Arc::new(
694            Schema::builder()
695                .with_fields(vec![
696                    Arc::new(NestedField::optional(
697                        1,
698                        "id",
699                        Type::Primitive(PrimitiveType::Long),
700                    )),
701                    Arc::new(NestedField::optional(
702                        2,
703                        "v_int",
704                        Type::Primitive(PrimitiveType::Int),
705                    )),
706                ])
707                .build()
708                .unwrap(),
709        );
710        let metadata = ManifestMetadata {
711            schema_id: 0,
712            schema: schema.clone(),
713            partition_spec: PartitionSpec::builder(schema)
714                .with_spec_id(0)
715                .build()
716                .unwrap(),
717            content: ManifestContentType::Data,
718            format_version: FormatVersion::V2,
719        };
720        let entries = vec![ManifestEntry {
721                status: ManifestStatus::Added,
722                snapshot_id: None,
723                sequence_number: None,
724                file_sequence_number: None,
725                data_file: DataFile {
726                    content: DataContentType::Data,
727                    file_format: DataFileFormat::Parquet,
728                    file_path: "s3a://icebergdata/demo/s1/t1/data/00000-0-378b56f5-5c52-4102-a2c2-f05f8a7cbe4a-00000.parquet".to_string(),
729                    partition: Struct::empty(),
730                    record_count: 1,
731                    file_size_in_bytes: 5442,
732                    column_sizes: HashMap::from([
733                        (1, 61),
734                        (2, 73),
735                        (3, 61),
736                    ]),
737                    value_counts: HashMap::default(),
738                    null_value_counts: HashMap::default(),
739                    nan_value_counts: HashMap::new(),
740                    lower_bounds: HashMap::from([
741                        (1, Datum::long(1)),
742                        (2, Datum::int(2)),
743                        (3, Datum::string("x"))
744                    ]),
745                    upper_bounds: HashMap::from([
746                        (1, Datum::long(1)),
747                        (2, Datum::int(2)),
748                        (3, Datum::string("x"))
749                    ]),
750                    key_metadata: None,
751                    split_offsets: vec![4],
752                    equality_ids: None,
753                    sort_order_id: None,
754                    partition_spec_id: 0,
755                    first_row_id: None,
756                    referenced_data_file: None,
757                    content_offset: None,
758                    content_size_in_bytes: None,
759                },
760            }];
761
762        // write manifest to file
763        let tmp_dir = TempDir::new().unwrap();
764        let path = tmp_dir.path().join("test_manifest.avro");
765        let io = FileIOBuilder::new_fs_io().build().unwrap();
766        let output_file = io.new_output(path.to_str().unwrap()).unwrap();
767        let mut writer = ManifestWriterBuilder::new(
768            output_file,
769            Some(2),
770            None,
771            metadata.schema.clone(),
772            metadata.partition_spec.clone(),
773        )
774        .build_v2_data();
775        for entry in &entries {
776            writer.add_entry(entry.clone()).unwrap();
777        }
778        writer.write_manifest_file().await.unwrap();
779
780        // read back the manifest file and check the content
781        let actual_manifest =
782            Manifest::parse_avro(fs::read(path).expect("read_file must succeed").as_slice())
783                .unwrap();
784
785        // Compared with original manifest, the lower_bounds and upper_bounds no longer has data for field 3, and
786        // other parts should be same.
787        // The snapshot id is assigned when the entry is added to the manifest.
788        let schema = Arc::new(
789            Schema::builder()
790                .with_fields(vec![
791                    Arc::new(NestedField::optional(
792                        1,
793                        "id",
794                        Type::Primitive(PrimitiveType::Long),
795                    )),
796                    Arc::new(NestedField::optional(
797                        2,
798                        "v_int",
799                        Type::Primitive(PrimitiveType::Int),
800                    )),
801                ])
802                .build()
803                .unwrap(),
804        );
805        let expected_manifest = Manifest {
806            metadata: ManifestMetadata {
807                schema_id: 0,
808                schema: schema.clone(),
809                partition_spec: PartitionSpec::builder(schema).with_spec_id(0).build().unwrap(),
810                content: ManifestContentType::Data,
811                format_version: FormatVersion::V2,
812            },
813            entries: vec![Arc::new(ManifestEntry {
814                status: ManifestStatus::Added,
815                snapshot_id: Some(2),
816                sequence_number: None,
817                file_sequence_number: None,
818                data_file: DataFile {
819                    content: DataContentType::Data,
820                    file_format: DataFileFormat::Parquet,
821                    file_path: "s3a://icebergdata/demo/s1/t1/data/00000-0-378b56f5-5c52-4102-a2c2-f05f8a7cbe4a-00000.parquet".to_string(),
822                    partition: Struct::empty(),
823                    record_count: 1,
824                    file_size_in_bytes: 5442,
825                    column_sizes: HashMap::from([
826                        (1, 61),
827                        (2, 73),
828                        (3, 61),
829                    ]),
830                    value_counts: HashMap::default(),
831                    null_value_counts: HashMap::default(),
832                    nan_value_counts: HashMap::new(),
833                    lower_bounds: HashMap::from([
834                        (1, Datum::long(1)),
835                        (2, Datum::int(2)),
836                    ]),
837                    upper_bounds: HashMap::from([
838                        (1, Datum::long(1)),
839                        (2, Datum::int(2)),
840                    ]),
841                    key_metadata: None,
842                    split_offsets: vec![4],
843                    equality_ids: None,
844                    sort_order_id: None,
845                    partition_spec_id: 0,
846                    first_row_id: None,
847                    referenced_data_file: None,
848                    content_offset: None,
849                    content_size_in_bytes: None,
850                },
851            })],
852        };
853
854        assert_eq!(actual_manifest, expected_manifest);
855    }
856
857    #[tokio::test]
858    async fn test_manifest_summary() {
859        let schema = Arc::new(
860            Schema::builder()
861                .with_fields(vec![
862                    Arc::new(NestedField::optional(
863                        1,
864                        "time",
865                        Type::Primitive(PrimitiveType::Date),
866                    )),
867                    Arc::new(NestedField::optional(
868                        2,
869                        "v_float",
870                        Type::Primitive(PrimitiveType::Float),
871                    )),
872                    Arc::new(NestedField::optional(
873                        3,
874                        "v_double",
875                        Type::Primitive(PrimitiveType::Double),
876                    )),
877                ])
878                .build()
879                .unwrap(),
880        );
881        let partition_spec = PartitionSpec::builder(schema.clone())
882            .with_spec_id(0)
883            .add_partition_field("time", "year_of_time", Transform::Year)
884            .unwrap()
885            .add_partition_field("v_float", "f", Transform::Identity)
886            .unwrap()
887            .add_partition_field("v_double", "d", Transform::Identity)
888            .unwrap()
889            .build()
890            .unwrap();
891        let metadata = ManifestMetadata {
892            schema_id: 0,
893            schema,
894            partition_spec,
895            content: ManifestContentType::Data,
896            format_version: FormatVersion::V2,
897        };
898        let entries = vec![
899            ManifestEntry {
900                status: ManifestStatus::Added,
901                snapshot_id: None,
902                sequence_number: None,
903                file_sequence_number: None,
904                data_file: DataFile {
905                    content: DataContentType::Data,
906                    file_path: "s3a://icebergdata/demo/s1/t1/data/00000-0-ba56fbfa-f2ff-40c9-bb27-565ad6dc2be8-00000.parquet".to_string(),
907                    file_format: DataFileFormat::Parquet,
908                    partition: Struct::from_iter(
909                        vec![
910                            Some(Literal::int(2021)),
911                            Some(Literal::float(1.0)),
912                            Some(Literal::double(2.0)),
913                        ]
914                    ),
915                    record_count: 1,
916                    file_size_in_bytes: 5442,
917                    column_sizes: HashMap::from([(0,73),(6,34),(2,73),(7,61),(3,61),(5,62),(9,79),(10,73),(1,61),(4,73),(8,73)]),
918                    value_counts: HashMap::from([(4,1),(5,1),(2,1),(0,1),(3,1),(6,1),(8,1),(1,1),(10,1),(7,1),(9,1)]),
919                    null_value_counts: HashMap::from([(1,0),(6,0),(2,0),(8,0),(0,0),(3,0),(5,0),(9,0),(7,0),(4,0),(10,0)]),
920                    nan_value_counts: HashMap::new(),
921                    lower_bounds: HashMap::new(),
922                    upper_bounds: HashMap::new(),
923                    key_metadata: None,
924                    split_offsets: vec![4],
925                    equality_ids: None,
926                    sort_order_id: None,
927                    partition_spec_id: 0,
928                    first_row_id: None,
929                    referenced_data_file: None,
930                    content_offset: None,
931                    content_size_in_bytes: None,
932                }
933            },
934                ManifestEntry {
935                    status: ManifestStatus::Added,
936                    snapshot_id: None,
937                    sequence_number: None,
938                    file_sequence_number: None,
939                    data_file: DataFile {
940                        content: DataContentType::Data,
941                        file_path: "s3a://icebergdata/demo/s1/t1/data/00000-0-ba56fbfa-f2ff-40c9-bb27-565ad6dc2be8-00000.parquet".to_string(),
942                        file_format: DataFileFormat::Parquet,
943                        partition: Struct::from_iter(
944                            vec![
945                                Some(Literal::int(1111)),
946                                Some(Literal::float(15.5)),
947                                Some(Literal::double(25.5)),
948                            ]
949                        ),
950                        record_count: 1,
951                        file_size_in_bytes: 5442,
952                        column_sizes: HashMap::from([(0,73),(6,34),(2,73),(7,61),(3,61),(5,62),(9,79),(10,73),(1,61),(4,73),(8,73)]),
953                        value_counts: HashMap::from([(4,1),(5,1),(2,1),(0,1),(3,1),(6,1),(8,1),(1,1),(10,1),(7,1),(9,1)]),
954                        null_value_counts: HashMap::from([(1,0),(6,0),(2,0),(8,0),(0,0),(3,0),(5,0),(9,0),(7,0),(4,0),(10,0)]),
955                        nan_value_counts: HashMap::new(),
956                        lower_bounds: HashMap::new(),
957                        upper_bounds: HashMap::new(),
958                        key_metadata: None,
959                        split_offsets: vec![4],
960                        equality_ids: None,
961                        sort_order_id: None,
962                        partition_spec_id: 0,
963                        first_row_id: None,
964                        referenced_data_file: None,
965                        content_offset: None,
966                        content_size_in_bytes: None,
967                    }
968                },
969                ManifestEntry {
970                    status: ManifestStatus::Added,
971                    snapshot_id: None,
972                    sequence_number: None,
973                    file_sequence_number: None,
974                    data_file: DataFile {
975                        content: DataContentType::Data,
976                        file_path: "s3a://icebergdata/demo/s1/t1/data/00000-0-ba56fbfa-f2ff-40c9-bb27-565ad6dc2be8-00000.parquet".to_string(),
977                        file_format: DataFileFormat::Parquet,
978                        partition: Struct::from_iter(
979                            vec![
980                                Some(Literal::int(1211)),
981                                Some(Literal::float(f32::NAN)),
982                                Some(Literal::double(1.0)),
983                            ]
984                        ),
985                        record_count: 1,
986                        file_size_in_bytes: 5442,
987                        column_sizes: HashMap::from([(0,73),(6,34),(2,73),(7,61),(3,61),(5,62),(9,79),(10,73),(1,61),(4,73),(8,73)]),
988                        value_counts: HashMap::from([(4,1),(5,1),(2,1),(0,1),(3,1),(6,1),(8,1),(1,1),(10,1),(7,1),(9,1)]),
989                        null_value_counts: HashMap::from([(1,0),(6,0),(2,0),(8,0),(0,0),(3,0),(5,0),(9,0),(7,0),(4,0),(10,0)]),
990                        nan_value_counts: HashMap::new(),
991                        lower_bounds: HashMap::new(),
992                        upper_bounds: HashMap::new(),
993                        key_metadata: None,
994                        split_offsets: vec![4],
995                        equality_ids: None,
996                        sort_order_id: None,
997                        partition_spec_id: 0,
998                        first_row_id: None,
999                        referenced_data_file: None,
1000                        content_offset: None,
1001                        content_size_in_bytes: None,
1002                    }
1003                },
1004                ManifestEntry {
1005                    status: ManifestStatus::Added,
1006                    snapshot_id: None,
1007                    sequence_number: None,
1008                    file_sequence_number: None,
1009                    data_file: DataFile {
1010                        content: DataContentType::Data,
1011                        file_path: "s3a://icebergdata/demo/s1/t1/data/00000-0-ba56fbfa-f2ff-40c9-bb27-565ad6dc2be8-00000.parquet".to_string(),
1012                        file_format: DataFileFormat::Parquet,
1013                        partition: Struct::from_iter(
1014                            vec![
1015                                Some(Literal::int(1111)),
1016                                None,
1017                                Some(Literal::double(11.0)),
1018                            ]
1019                        ),
1020                        record_count: 1,
1021                        file_size_in_bytes: 5442,
1022                        column_sizes: HashMap::from([(0,73),(6,34),(2,73),(7,61),(3,61),(5,62),(9,79),(10,73),(1,61),(4,73),(8,73)]),
1023                        value_counts: HashMap::from([(4,1),(5,1),(2,1),(0,1),(3,1),(6,1),(8,1),(1,1),(10,1),(7,1),(9,1)]),
1024                        null_value_counts: HashMap::from([(1,0),(6,0),(2,0),(8,0),(0,0),(3,0),(5,0),(9,0),(7,0),(4,0),(10,0)]),
1025                        nan_value_counts: HashMap::new(),
1026                        lower_bounds: HashMap::new(),
1027                        upper_bounds: HashMap::new(),
1028                        key_metadata: None,
1029                        split_offsets: vec![4],
1030                        equality_ids: None,
1031                        sort_order_id: None,
1032                        partition_spec_id: 0,
1033                        first_row_id: None,
1034                        referenced_data_file: None,
1035                        content_offset: None,
1036                        content_size_in_bytes: None,
1037                    }
1038                },
1039        ];
1040
1041        // write manifest to file
1042        let tmp_dir = TempDir::new().unwrap();
1043        let path = tmp_dir.path().join("test_manifest.avro");
1044        let io = FileIOBuilder::new_fs_io().build().unwrap();
1045        let output_file = io.new_output(path.to_str().unwrap()).unwrap();
1046        let mut writer = ManifestWriterBuilder::new(
1047            output_file,
1048            Some(1),
1049            None,
1050            metadata.schema.clone(),
1051            metadata.partition_spec.clone(),
1052        )
1053        .build_v2_data();
1054        for entry in &entries {
1055            writer.add_entry(entry.clone()).unwrap();
1056        }
1057        let res = writer.write_manifest_file().await.unwrap();
1058
1059        let partitions = res.partitions.unwrap();
1060
1061        assert_eq!(partitions.len(), 3);
1062        assert_eq!(
1063            partitions[0].clone().lower_bound.unwrap(),
1064            Datum::int(1111).to_bytes().unwrap()
1065        );
1066        assert_eq!(
1067            partitions[0].clone().upper_bound.unwrap(),
1068            Datum::int(2021).to_bytes().unwrap()
1069        );
1070        assert!(!partitions[0].clone().contains_null);
1071        assert_eq!(partitions[0].clone().contains_nan, Some(false));
1072
1073        assert_eq!(
1074            partitions[1].clone().lower_bound.unwrap(),
1075            Datum::float(1.0).to_bytes().unwrap()
1076        );
1077        assert_eq!(
1078            partitions[1].clone().upper_bound.unwrap(),
1079            Datum::float(15.5).to_bytes().unwrap()
1080        );
1081        assert!(partitions[1].clone().contains_null);
1082        assert_eq!(partitions[1].clone().contains_nan, Some(true));
1083
1084        assert_eq!(
1085            partitions[2].clone().lower_bound.unwrap(),
1086            Datum::double(1.0).to_bytes().unwrap()
1087        );
1088        assert_eq!(
1089            partitions[2].clone().upper_bound.unwrap(),
1090            Datum::double(25.5).to_bytes().unwrap()
1091        );
1092        assert!(!partitions[2].clone().contains_null);
1093        assert_eq!(partitions[2].clone().contains_nan, Some(false));
1094    }
1095
#[test]
fn test_data_file_serialization() {
    // Two required columns; `id` is both the identifier field and the partition source.
    let schema = Schema::builder()
        .with_schema_id(1)
        .with_identifier_field_ids(vec![1])
        .with_fields(vec![
            Arc::new(NestedField::required(
                1,
                "id",
                Type::Primitive(PrimitiveType::Long),
            )),
            Arc::new(NestedField::required(
                2,
                "name",
                Type::Primitive(PrimitiveType::String),
            )),
        ])
        .build()
        .unwrap();

    // Identity partitioning on `id`, then the partition type derived from that spec.
    let partition_spec = PartitionSpec::builder(schema.clone())
        .with_spec_id(1)
        .add_partition_field("id", "id_partition", Transform::Identity)
        .unwrap()
        .build()
        .unwrap();
    let partition_type = partition_spec.partition_type(&schema).unwrap();

    // The two data files that go through the JSON round trip.
    let first_file = DataFileBuilder::default()
        .content(DataContentType::Data)
        .file_format(DataFileFormat::Parquet)
        .file_path("path/to/file1.parquet".to_string())
        .file_size_in_bytes(1024)
        .record_count(100)
        .partition_spec_id(1)
        .partition(Struct::empty())
        .column_sizes(HashMap::from([(1, 512), (2, 1024)]))
        .value_counts(HashMap::from([(1, 100), (2, 500)]))
        .null_value_counts(HashMap::from([(1, 0), (2, 1)]))
        .build()
        .unwrap();
    let second_file = DataFileBuilder::default()
        .content(DataContentType::Data)
        .file_format(DataFileFormat::Parquet)
        .file_path("path/to/file2.parquet".to_string())
        .file_size_in_bytes(2048)
        .record_count(200)
        .partition_spec_id(1)
        .partition(Struct::empty())
        .column_sizes(HashMap::from([(1, 1024), (2, 2048)]))
        .value_counts(HashMap::from([(1, 200), (2, 600)]))
        .null_value_counts(HashMap::from([(1, 10), (2, 999)]))
        .build()
        .unwrap();
    let data_files = vec![first_file, second_file];

    // Serialize a copy of every data file; the originals are compared against the
    // deserialized results at the end.
    let serialized_files: Vec<String> = data_files
        .iter()
        .cloned()
        .map(|file| serialize_data_file_to_json(file, &partition_type, FormatVersion::V2).unwrap())
        .collect();
    assert_eq!(serialized_files.len(), 2);

    // Template for the expected JSON document. Only the path, the record count, the
    // file size and the per-column metrics (for keys 1 and 2) differ between the files.
    let expected_json = |path: &str,
                         records: u64,
                         size: u64,
                         [size_1, size_2]: [u64; 2],
                         [values_1, values_2]: [u64; 2],
                         [nulls_1, nulls_2]: [u64; 2]| {
        serde_json::json!({
            "content": 0,
            "file_path": path,
            "file_format": "PARQUET",
            "partition": {},
            "record_count": records,
            "file_size_in_bytes": size,
            "column_sizes": [
                { "key": 1, "value": size_1 },
                { "key": 2, "value": size_2 }
            ],
            "value_counts": [
                { "key": 1, "value": values_1 },
                { "key": 2, "value": values_2 }
            ],
            "null_value_counts": [
                { "key": 1, "value": nulls_1 },
                { "key": 2, "value": nulls_2 }
            ],
            "nan_value_counts": [],
            "lower_bounds": [],
            "upper_bounds": [],
            "key_metadata": null,
            "split_offsets": [],
            "equality_ids": null,
            "sort_order_id": null,
            "first_row_id": null,
            "referenced_data_file": null,
            "content_offset": null,
            "content_size_in_bytes": null
        })
    };
    let expected_documents = [
        expected_json(
            "path/to/file1.parquet",
            100,
            1024,
            [512, 1024],
            [100, 500],
            [0, 1],
        ),
        expected_json(
            "path/to/file2.parquet",
            200,
            2048,
            [1024, 2048],
            [200, 600],
            [10, 999],
        ),
    ];
    for (serialized, expected) in serialized_files.iter().zip(&expected_documents) {
        let actual = serde_json::from_str::<Value>(serialized).unwrap();
        assert_eq!(&actual, expected);
    }

    // Deserialize the JSON strings back into DataFile objects.
    let deserialized_files: Vec<DataFile> = serialized_files
        .iter()
        .map(|json| {
            deserialize_data_file_from_json(
                json,
                partition_spec.spec_id(),
                &partition_type,
                &schema,
            )
            .unwrap()
        })
        .collect();

    // The round trip must reproduce each original data file.
    assert_eq!(deserialized_files.len(), 2);
    for (restored, original) in deserialized_files.iter().zip(&data_files) {
        assert_eq!(restored, original);
    }
}
1250}