1mod _serde;
19
20mod data_file;
21pub use data_file::*;
22mod entry;
23pub use entry::*;
24mod metadata;
25pub use metadata::*;
26mod writer;
27use std::sync::Arc;
28
29use apache_avro::{Reader as AvroReader, from_value};
30pub use writer::*;
31
32use super::{
33 Datum, FormatVersion, ManifestContentType, PartitionSpec, PrimitiveType, Schema, Struct,
34 UNASSIGNED_SEQUENCE_NUMBER,
35};
36use crate::error::Result;
37use crate::{Error, ErrorKind};
38
/// A manifest file: its metadata together with the list of tracked
/// data-file entries, as parsed from (or written to) an Avro manifest.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct Manifest {
    // Metadata parsed from the Avro user metadata (schema, partition
    // spec, content type, format version).
    metadata: ManifestMetadata,
    // Reference-counted entries; `Manifest::new` wraps each owned
    // `ManifestEntry` in an `Arc`.
    entries: Vec<ManifestEntryRef>,
}
45
46impl Manifest {
47 pub(crate) fn try_from_avro_bytes(bs: &[u8]) -> Result<(ManifestMetadata, Vec<ManifestEntry>)> {
49 let reader = AvroReader::new(bs)?;
50
51 let meta = reader.user_metadata();
53 let metadata = ManifestMetadata::parse(meta)?;
54
55 let partition_type = metadata.partition_spec.partition_type(&metadata.schema)?;
57
58 let entries = match metadata.format_version {
59 FormatVersion::V1 => {
60 let schema = manifest_schema_v1(&partition_type)?;
61 let reader = AvroReader::with_schema(&schema, bs)?;
62 reader
63 .into_iter()
64 .map(|value| {
65 from_value::<_serde::ManifestEntryV1>(&value?)?.try_into(
66 metadata.partition_spec.spec_id(),
67 &partition_type,
68 &metadata.schema,
69 )
70 })
71 .collect::<Result<Vec<_>>>()?
72 }
73 FormatVersion::V2 => {
74 let schema = manifest_schema_v2(&partition_type)?;
75 let reader = AvroReader::with_schema(&schema, bs)?;
76 reader
77 .into_iter()
78 .map(|value| {
79 from_value::<_serde::ManifestEntryV2>(&value?)?.try_into(
80 metadata.partition_spec.spec_id(),
81 &partition_type,
82 &metadata.schema,
83 )
84 })
85 .collect::<Result<Vec<_>>>()?
86 }
87 };
88
89 Ok((metadata, entries))
90 }
91
92 pub fn parse_avro(bs: &[u8]) -> Result<Self> {
94 let (metadata, entries) = Self::try_from_avro_bytes(bs)?;
95 Ok(Self::new(metadata, entries))
96 }
97
98 pub fn entries(&self) -> &[ManifestEntryRef] {
100 &self.entries
101 }
102
103 pub fn metadata(&self) -> &ManifestMetadata {
105 &self.metadata
106 }
107
108 pub fn into_parts(self) -> (Vec<ManifestEntryRef>, ManifestMetadata) {
110 let Self { entries, metadata } = self;
111 (entries, metadata)
112 }
113
114 pub fn new(metadata: ManifestMetadata, entries: Vec<ManifestEntry>) -> Self {
116 Self {
117 metadata,
118 entries: entries.into_iter().map(Arc::new).collect(),
119 }
120 }
121}
122
123pub fn serialize_data_file_to_json(
125 data_file: DataFile,
126 partition_type: &super::StructType,
127 format_version: FormatVersion,
128) -> Result<String> {
129 let serde = _serde::DataFileSerde::try_from(data_file, partition_type, format_version)?;
130 serde_json::to_string(&serde).map_err(|e| {
131 Error::new(
132 ErrorKind::DataInvalid,
133 "Failed to serialize DataFile to JSON!".to_string(),
134 )
135 .with_source(e)
136 })
137}
138
139pub fn deserialize_data_file_from_json(
141 json: &str,
142 partition_spec_id: i32,
143 partition_type: &super::StructType,
144 schema: &Schema,
145) -> Result<DataFile> {
146 let serde = serde_json::from_str::<_serde::DataFileSerde>(json).map_err(|e| {
147 Error::new(
148 ErrorKind::DataInvalid,
149 "Failed to deserialize JSON to DataFile!".to_string(),
150 )
151 .with_source(e)
152 })?;
153
154 serde.try_into(partition_spec_id, partition_type, schema)
155}
156
157#[cfg(test)]
158mod tests {
159 use std::collections::HashMap;
160 use std::fs;
161 use std::sync::Arc;
162
163 use serde_json::Value;
164 use tempfile::TempDir;
165
166 use super::*;
167 use crate::io::FileIOBuilder;
168 use crate::spec::{Literal, NestedField, PrimitiveType, Struct, Transform, Type};
169
    // Round-trips an unpartitioned V2 manifest: build an entry in memory,
    // write it with `ManifestWriterBuilder`, then parse the Avro bytes
    // back and compare against the original entry.
    #[tokio::test]
    async fn test_parse_manifest_v2_unpartition() {
        // Schema with one column per primitive type exercised here.
        let schema = Arc::new(
            Schema::builder()
                .with_fields(vec![
                    Arc::new(NestedField::optional(
                        1,
                        "id",
                        Type::Primitive(PrimitiveType::Long),
                    )),
                    Arc::new(NestedField::optional(
                        2,
                        "v_int",
                        Type::Primitive(PrimitiveType::Int),
                    )),
                    Arc::new(NestedField::optional(
                        3,
                        "v_long",
                        Type::Primitive(PrimitiveType::Long),
                    )),
                    Arc::new(NestedField::optional(
                        4,
                        "v_float",
                        Type::Primitive(PrimitiveType::Float),
                    )),
                    Arc::new(NestedField::optional(
                        5,
                        "v_double",
                        Type::Primitive(PrimitiveType::Double),
                    )),
                    Arc::new(NestedField::optional(
                        6,
                        "v_varchar",
                        Type::Primitive(PrimitiveType::String),
                    )),
                    Arc::new(NestedField::optional(
                        7,
                        "v_bool",
                        Type::Primitive(PrimitiveType::Boolean),
                    )),
                    Arc::new(NestedField::optional(
                        8,
                        "v_date",
                        Type::Primitive(PrimitiveType::Date),
                    )),
                    Arc::new(NestedField::optional(
                        9,
                        "v_timestamp",
                        Type::Primitive(PrimitiveType::Timestamptz),
                    )),
                    Arc::new(NestedField::optional(
                        10,
                        "v_decimal",
                        Type::Primitive(PrimitiveType::Decimal {
                            precision: 36,
                            scale: 10,
                        }),
                    )),
                    Arc::new(NestedField::optional(
                        11,
                        "v_ts_ntz",
                        Type::Primitive(PrimitiveType::Timestamp),
                    )),
                    Arc::new(NestedField::optional(
                        12,
                        "v_ts_ns_ntz",
                        Type::Primitive(PrimitiveType::TimestampNs),
                    )),
                ])
                .build()
                .unwrap(),
        );
        let metadata = ManifestMetadata {
            schema_id: 0,
            schema: schema.clone(),
            // Unpartitioned: the spec declares no partition fields.
            partition_spec: PartitionSpec::builder(schema)
                .with_spec_id(0)
                .build()
                .unwrap(),
            content: ManifestContentType::Data,
            format_version: FormatVersion::V2,
        };
        let mut entries = vec![
            ManifestEntry {
                status: ManifestStatus::Added,
                // Left unset; the writer stamps its snapshot id (1) on write.
                snapshot_id: None,
                sequence_number: None,
                file_sequence_number: None,
                data_file: DataFile {content:DataContentType::Data,file_path:"s3a://icebergdata/demo/s1/t1/data/00000-0-ba56fbfa-f2ff-40c9-bb27-565ad6dc2be8-00000.parquet".to_string(),file_format:DataFileFormat::Parquet,partition:Struct::empty(),record_count:1,file_size_in_bytes:5442,column_sizes:HashMap::from([(0,73),(6,34),(2,73),(7,61),(3,61),(5,62),(9,79),(10,73),(1,61),(4,73),(8,73)]),value_counts:HashMap::from([(4,1),(5,1),(2,1),(0,1),(3,1),(6,1),(8,1),(1,1),(10,1),(7,1),(9,1)]),null_value_counts:HashMap::from([(1,0),(6,0),(2,0),(8,0),(0,0),(3,0),(5,0),(9,0),(7,0),(4,0),(10,0)]),nan_value_counts:HashMap::new(),lower_bounds:HashMap::new(),upper_bounds:HashMap::new(),key_metadata:None,split_offsets:vec![4],equality_ids:Some(Vec::new()),sort_order_id:None, partition_spec_id: 0,first_row_id: None,referenced_data_file: None,content_offset: None,content_size_in_bytes: None }
            }
        ];

        // Write the manifest to a temp file through the local-fs FileIO.
        let tmp_dir = TempDir::new().unwrap();
        let path = tmp_dir.path().join("test_manifest.avro");
        let io = FileIOBuilder::new_fs_io().build().unwrap();
        let output_file = io.new_output(path.to_str().unwrap()).unwrap();
        let mut writer = ManifestWriterBuilder::new(
            output_file,
            Some(1),
            None,
            metadata.schema.clone(),
            metadata.partition_spec.clone(),
        )
        .build_v2_data();
        for entry in &entries {
            writer.add_entry(entry.clone()).unwrap();
        }
        writer.write_manifest_file().await.unwrap();

        let actual_manifest =
            Manifest::parse_avro(fs::read(path).expect("read_file must succeed").as_slice())
                .unwrap();
        // The written entry carries the writer's snapshot id, so mirror
        // it in the expected value before comparing.
        entries[0].snapshot_id = Some(1);
        assert_eq!(actual_manifest, Manifest::new(metadata, entries));
    }
289
    // Round-trips a V2 manifest partitioned on two identity fields
    // (v_int, v_long) and also checks that a freshly written manifest
    // file reports unassigned sequence numbers.
    #[tokio::test]
    async fn test_parse_manifest_v2_partition() {
        let schema = Arc::new(
            Schema::builder()
                .with_fields(vec![
                    Arc::new(NestedField::optional(
                        1,
                        "id",
                        Type::Primitive(PrimitiveType::Long),
                    )),
                    Arc::new(NestedField::optional(
                        2,
                        "v_int",
                        Type::Primitive(PrimitiveType::Int),
                    )),
                    Arc::new(NestedField::optional(
                        3,
                        "v_long",
                        Type::Primitive(PrimitiveType::Long),
                    )),
                    Arc::new(NestedField::optional(
                        4,
                        "v_float",
                        Type::Primitive(PrimitiveType::Float),
                    )),
                    Arc::new(NestedField::optional(
                        5,
                        "v_double",
                        Type::Primitive(PrimitiveType::Double),
                    )),
                    Arc::new(NestedField::optional(
                        6,
                        "v_varchar",
                        Type::Primitive(PrimitiveType::String),
                    )),
                    Arc::new(NestedField::optional(
                        7,
                        "v_bool",
                        Type::Primitive(PrimitiveType::Boolean),
                    )),
                    Arc::new(NestedField::optional(
                        8,
                        "v_date",
                        Type::Primitive(PrimitiveType::Date),
                    )),
                    Arc::new(NestedField::optional(
                        9,
                        "v_timestamp",
                        Type::Primitive(PrimitiveType::Timestamptz),
                    )),
                    Arc::new(NestedField::optional(
                        10,
                        "v_decimal",
                        Type::Primitive(PrimitiveType::Decimal {
                            precision: 36,
                            scale: 10,
                        }),
                    )),
                    Arc::new(NestedField::optional(
                        11,
                        "v_ts_ntz",
                        Type::Primitive(PrimitiveType::Timestamp),
                    )),
                    Arc::new(NestedField::optional(
                        12,
                        "v_ts_ns_ntz",
                        Type::Primitive(PrimitiveType::TimestampNs),
                    )),
                ])
                .build()
                .unwrap(),
        );
        let metadata = ManifestMetadata {
            schema_id: 0,
            schema: schema.clone(),
            // Two identity partition fields over v_int and v_long.
            partition_spec: PartitionSpec::builder(schema)
                .with_spec_id(0)
                .add_partition_field("v_int", "v_int", Transform::Identity)
                .unwrap()
                .add_partition_field("v_long", "v_long", Transform::Identity)
                .unwrap()
                .build()
                .unwrap(),
            content: ManifestContentType::Data,
            format_version: FormatVersion::V2,
        };
        let mut entries = vec![ManifestEntry {
            status: ManifestStatus::Added,
            snapshot_id: None,
            sequence_number: None,
            file_sequence_number: None,
            data_file: DataFile {
                content: DataContentType::Data,
                file_format: DataFileFormat::Parquet,
                file_path: "s3a://icebergdata/demo/s1/t1/data/00000-0-378b56f5-5c52-4102-a2c2-f05f8a7cbe4a-00000.parquet".to_string(),
                // Partition tuple matching the (v_int, v_long) spec above.
                partition: Struct::from_iter(
                    vec![
                        Some(Literal::int(1)),
                        Some(Literal::long(1000)),
                    ]
                    .into_iter()
                ),
                record_count: 1,
                file_size_in_bytes: 5442,
                column_sizes: HashMap::from([
                    (0, 73),
                    (6, 34),
                    (2, 73),
                    (7, 61),
                    (3, 61),
                    (5, 62),
                    (9, 79),
                    (10, 73),
                    (1, 61),
                    (4, 73),
                    (8, 73)
                ]),
                value_counts: HashMap::from([
                    (4, 1),
                    (5, 1),
                    (2, 1),
                    (0, 1),
                    (3, 1),
                    (6, 1),
                    (8, 1),
                    (1, 1),
                    (10, 1),
                    (7, 1),
                    (9, 1)
                ]),
                null_value_counts: HashMap::from([
                    (1, 0),
                    (6, 0),
                    (2, 0),
                    (8, 0),
                    (0, 0),
                    (3, 0),
                    (5, 0),
                    (9, 0),
                    (7, 0),
                    (4, 0),
                    (10, 0)
                ]),
                nan_value_counts: HashMap::new(),
                lower_bounds: HashMap::new(),
                upper_bounds: HashMap::new(),
                key_metadata: None,
                split_offsets: vec![4],
                equality_ids: Some(Vec::new()),
                sort_order_id: None,
                partition_spec_id: 0,
                first_row_id: None,
                referenced_data_file: None,
                content_offset: None,
                content_size_in_bytes: None,
            },
        }];

        let tmp_dir = TempDir::new().unwrap();
        let path = tmp_dir.path().join("test_manifest.avro");
        let io = FileIOBuilder::new_fs_io().build().unwrap();
        let output_file = io.new_output(path.to_str().unwrap()).unwrap();
        let mut writer = ManifestWriterBuilder::new(
            output_file,
            Some(2),
            None,
            metadata.schema.clone(),
            metadata.partition_spec.clone(),
        )
        .build_v2_data();
        for entry in &entries {
            writer.add_entry(entry.clone()).unwrap();
        }
        let manifest_file = writer.write_manifest_file().await.unwrap();
        // No sequence numbers were assigned before writing, so the file
        // must report the unassigned sentinel for both.
        assert_eq!(manifest_file.sequence_number, UNASSIGNED_SEQUENCE_NUMBER);
        assert_eq!(
            manifest_file.min_sequence_number,
            UNASSIGNED_SEQUENCE_NUMBER
        );

        let actual_manifest =
            Manifest::parse_avro(fs::read(path).expect("read_file must succeed").as_slice())
                .unwrap();
        // Mirror the writer's snapshot id in the expected entry.
        entries[0].snapshot_id = Some(2);
        assert_eq!(actual_manifest, Manifest::new(metadata, entries));
    }
479
    // Round-trips an unpartitioned V1 manifest, including lower/upper
    // bounds for each column, through the V1 writer and the parser.
    #[tokio::test]
    async fn test_parse_manifest_v1_unpartition() {
        let schema = Arc::new(
            Schema::builder()
                .with_schema_id(1)
                .with_fields(vec![
                    Arc::new(NestedField::optional(
                        1,
                        "id",
                        Type::Primitive(PrimitiveType::Int),
                    )),
                    Arc::new(NestedField::optional(
                        2,
                        "data",
                        Type::Primitive(PrimitiveType::String),
                    )),
                    Arc::new(NestedField::optional(
                        3,
                        "comment",
                        Type::Primitive(PrimitiveType::String),
                    )),
                ])
                .build()
                .unwrap(),
        );
        let metadata = ManifestMetadata {
            schema_id: 1,
            schema: schema.clone(),
            partition_spec: PartitionSpec::builder(schema)
                .with_spec_id(0)
                .build()
                .unwrap(),
            content: ManifestContentType::Data,
            format_version: FormatVersion::V1,
        };
        let mut entries = vec![ManifestEntry {
            status: ManifestStatus::Added,
            // Initial ids/sequence numbers; snapshot_id is overwritten by
            // the writer (see the patch before the final assertion).
            snapshot_id: Some(0),
            sequence_number: Some(0),
            file_sequence_number: Some(0),
            data_file: DataFile {
                content: DataContentType::Data,
                file_path: "s3://testbucket/iceberg_data/iceberg_ctl/iceberg_db/iceberg_tbl/data/00000-7-45268d71-54eb-476c-b42c-942d880c04a1-00001.parquet".to_string(),
                file_format: DataFileFormat::Parquet,
                partition: Struct::empty(),
                record_count: 1,
                file_size_in_bytes: 875,
                column_sizes: HashMap::from([(1,47),(2,48),(3,52)]),
                value_counts: HashMap::from([(1,1),(2,1),(3,1)]),
                null_value_counts: HashMap::from([(1,0),(2,0),(3,0)]),
                nan_value_counts: HashMap::new(),
                lower_bounds: HashMap::from([(1,Datum::int(1)),(2,Datum::string("a")),(3,Datum::string("AC/DC"))]),
                upper_bounds: HashMap::from([(1,Datum::int(1)),(2,Datum::string("a")),(3,Datum::string("AC/DC"))]),
                key_metadata: None,
                split_offsets: vec![4],
                equality_ids: None,
                sort_order_id: Some(0),
                partition_spec_id: 0,
                first_row_id: None,
                referenced_data_file: None,
                content_offset: None,
                content_size_in_bytes: None,
            }
        }];

        let tmp_dir = TempDir::new().unwrap();
        let path = tmp_dir.path().join("test_manifest.avro");
        let io = FileIOBuilder::new_fs_io().build().unwrap();
        let output_file = io.new_output(path.to_str().unwrap()).unwrap();
        let mut writer = ManifestWriterBuilder::new(
            output_file,
            Some(3),
            None,
            metadata.schema.clone(),
            metadata.partition_spec.clone(),
        )
        .build_v1();
        for entry in &entries {
            writer.add_entry(entry.clone()).unwrap();
        }
        writer.write_manifest_file().await.unwrap();

        let actual_manifest =
            Manifest::parse_avro(fs::read(path).expect("read_file must succeed").as_slice())
                .unwrap();
        // Mirror the writer's snapshot id in the expected entry.
        entries[0].snapshot_id = Some(3);
        assert_eq!(actual_manifest, Manifest::new(metadata, entries));
    }
571
    // Round-trips a V1 manifest with one identity partition field and
    // verifies the partition summary (lower/upper bounds) the writer
    // records for that field.
    #[tokio::test]
    async fn test_parse_manifest_v1_partition() {
        let schema = Arc::new(
            Schema::builder()
                .with_fields(vec![
                    Arc::new(NestedField::optional(
                        1,
                        "id",
                        Type::Primitive(PrimitiveType::Long),
                    )),
                    Arc::new(NestedField::optional(
                        2,
                        "data",
                        Type::Primitive(PrimitiveType::String),
                    )),
                    Arc::new(NestedField::optional(
                        3,
                        "category",
                        Type::Primitive(PrimitiveType::String),
                    )),
                ])
                .build()
                .unwrap(),
        );
        let metadata = ManifestMetadata {
            schema_id: 0,
            schema: schema.clone(),
            partition_spec: PartitionSpec::builder(schema)
                .add_partition_field("category", "category", Transform::Identity)
                .unwrap()
                .build()
                .unwrap(),
            content: ManifestContentType::Data,
            format_version: FormatVersion::V1,
        };
        let mut entries = vec![
            ManifestEntry {
                status: ManifestStatus::Added,
                snapshot_id: Some(0),
                sequence_number: Some(0),
                file_sequence_number: Some(0),
                data_file: DataFile {
                    content: DataContentType::Data,
                    file_path: "s3://testbucket/prod/db/sample/data/category=x/00010-1-d5c93668-1e52-41ac-92a6-bba590cbf249-00001.parquet".to_string(),
                    file_format: DataFileFormat::Parquet,
                    // Single partition value for the "category" field.
                    partition: Struct::from_iter(
                        vec![
                            Some(
                                Literal::string("x"),
                            ),
                        ]
                        .into_iter()
                    ),
                    record_count: 1,
                    file_size_in_bytes: 874,
                    column_sizes: HashMap::from([(1, 46), (2, 48), (3, 48)]),
                    value_counts: HashMap::from([(1, 1), (2, 1), (3, 1)]),
                    null_value_counts: HashMap::from([(1, 0), (2, 0), (3, 0)]),
                    nan_value_counts: HashMap::new(),
                    lower_bounds: HashMap::from([
                        (1, Datum::long(1)),
                        (2, Datum::string("a")),
                        (3, Datum::string("x"))
                    ]),
                    upper_bounds: HashMap::from([
                        (1, Datum::long(1)),
                        (2, Datum::string("a")),
                        (3, Datum::string("x"))
                    ]),
                    key_metadata: None,
                    split_offsets: vec![4],
                    equality_ids: None,
                    sort_order_id: Some(0),
                    partition_spec_id: 0,
                    first_row_id: None,
                    referenced_data_file: None,
                    content_offset: None,
                    content_size_in_bytes: None,
                },
            }
        ];

        let tmp_dir = TempDir::new().unwrap();
        let path = tmp_dir.path().join("test_manifest.avro");
        let io = FileIOBuilder::new_fs_io().build().unwrap();
        let output_file = io.new_output(path.to_str().unwrap()).unwrap();
        let mut writer = ManifestWriterBuilder::new(
            output_file,
            Some(2),
            None,
            metadata.schema.clone(),
            metadata.partition_spec.clone(),
        )
        .build_v1();
        for entry in &entries {
            writer.add_entry(entry.clone()).unwrap();
        }
        let manifest_file = writer.write_manifest_file().await.unwrap();
        // With one data file, both bounds of the single partition field
        // collapse to that file's value "x".
        let partitions = manifest_file.partitions.unwrap();
        assert_eq!(partitions.len(), 1);
        assert_eq!(
            partitions[0].clone().lower_bound.unwrap(),
            Datum::string("x").to_bytes().unwrap()
        );
        assert_eq!(
            partitions[0].clone().upper_bound.unwrap(),
            Datum::string("x").to_bytes().unwrap()
        );

        let actual_manifest =
            Manifest::parse_avro(fs::read(path).expect("read_file must succeed").as_slice())
                .unwrap();
        // Mirror the writer's snapshot id in the expected entry.
        entries[0].snapshot_id = Some(2);
        assert_eq!(actual_manifest, Manifest::new(metadata, entries));
    }
690
    // Writes an entry whose stats reference a field id (3) that is not
    // part of the current schema, then checks what survives the
    // round-trip: the expected manifest keeps column_sizes for id 3 but
    // has no lower/upper bounds for it.
    #[tokio::test]
    async fn test_parse_manifest_with_schema_evolution() {
        let schema = Arc::new(
            Schema::builder()
                .with_fields(vec![
                    Arc::new(NestedField::optional(
                        1,
                        "id",
                        Type::Primitive(PrimitiveType::Long),
                    )),
                    Arc::new(NestedField::optional(
                        2,
                        "v_int",
                        Type::Primitive(PrimitiveType::Int),
                    )),
                ])
                .build()
                .unwrap(),
        );
        let metadata = ManifestMetadata {
            schema_id: 0,
            schema: schema.clone(),
            partition_spec: PartitionSpec::builder(schema)
                .with_spec_id(0)
                .build()
                .unwrap(),
            content: ManifestContentType::Data,
            format_version: FormatVersion::V2,
        };
        let entries = vec![ManifestEntry {
            status: ManifestStatus::Added,
            snapshot_id: None,
            sequence_number: None,
            file_sequence_number: None,
            data_file: DataFile {
                content: DataContentType::Data,
                file_format: DataFileFormat::Parquet,
                file_path: "s3a://icebergdata/demo/s1/t1/data/00000-0-378b56f5-5c52-4102-a2c2-f05f8a7cbe4a-00000.parquet".to_string(),
                partition: Struct::empty(),
                record_count: 1,
                file_size_in_bytes: 5442,
                // Note: field id 3 below does not exist in the schema.
                column_sizes: HashMap::from([
                    (1, 61),
                    (2, 73),
                    (3, 61),
                ]),
                value_counts: HashMap::default(),
                null_value_counts: HashMap::default(),
                nan_value_counts: HashMap::new(),
                lower_bounds: HashMap::from([
                    (1, Datum::long(1)),
                    (2, Datum::int(2)),
                    (3, Datum::string("x"))
                ]),
                upper_bounds: HashMap::from([
                    (1, Datum::long(1)),
                    (2, Datum::int(2)),
                    (3, Datum::string("x"))
                ]),
                key_metadata: None,
                split_offsets: vec![4],
                equality_ids: None,
                sort_order_id: None,
                partition_spec_id: 0,
                first_row_id: None,
                referenced_data_file: None,
                content_offset: None,
                content_size_in_bytes: None,
            },
        }];

        let tmp_dir = TempDir::new().unwrap();
        let path = tmp_dir.path().join("test_manifest.avro");
        let io = FileIOBuilder::new_fs_io().build().unwrap();
        let output_file = io.new_output(path.to_str().unwrap()).unwrap();
        let mut writer = ManifestWriterBuilder::new(
            output_file,
            Some(2),
            None,
            metadata.schema.clone(),
            metadata.partition_spec.clone(),
        )
        .build_v2_data();
        for entry in &entries {
            writer.add_entry(entry.clone()).unwrap();
        }
        writer.write_manifest_file().await.unwrap();

        let actual_manifest =
            Manifest::parse_avro(fs::read(path).expect("read_file must succeed").as_slice())
                .unwrap();

        let schema = Arc::new(
            Schema::builder()
                .with_fields(vec![
                    Arc::new(NestedField::optional(
                        1,
                        "id",
                        Type::Primitive(PrimitiveType::Long),
                    )),
                    Arc::new(NestedField::optional(
                        2,
                        "v_int",
                        Type::Primitive(PrimitiveType::Int),
                    )),
                ])
                .build()
                .unwrap(),
        );
        let expected_manifest = Manifest {
            metadata: ManifestMetadata {
                schema_id: 0,
                schema: schema.clone(),
                partition_spec: PartitionSpec::builder(schema).with_spec_id(0).build().unwrap(),
                content: ManifestContentType::Data,
                format_version: FormatVersion::V2,
            },
            entries: vec![Arc::new(ManifestEntry {
                status: ManifestStatus::Added,
                snapshot_id: Some(2),
                sequence_number: None,
                file_sequence_number: None,
                data_file: DataFile {
                    content: DataContentType::Data,
                    file_format: DataFileFormat::Parquet,
                    file_path: "s3a://icebergdata/demo/s1/t1/data/00000-0-378b56f5-5c52-4102-a2c2-f05f8a7cbe4a-00000.parquet".to_string(),
                    partition: Struct::empty(),
                    record_count: 1,
                    file_size_in_bytes: 5442,
                    // column_sizes still includes the removed field id 3 ...
                    column_sizes: HashMap::from([
                        (1, 61),
                        (2, 73),
                        (3, 61),
                    ]),
                    value_counts: HashMap::default(),
                    null_value_counts: HashMap::default(),
                    nan_value_counts: HashMap::new(),
                    // ... but the bounds only cover fields 1 and 2, which
                    // exist in the schema.
                    lower_bounds: HashMap::from([
                        (1, Datum::long(1)),
                        (2, Datum::int(2)),
                    ]),
                    upper_bounds: HashMap::from([
                        (1, Datum::long(1)),
                        (2, Datum::int(2)),
                    ]),
                    key_metadata: None,
                    split_offsets: vec![4],
                    equality_ids: None,
                    sort_order_id: None,
                    partition_spec_id: 0,
                    first_row_id: None,
                    referenced_data_file: None,
                    content_offset: None,
                    content_size_in_bytes: None,
                },
            })],
        };

        assert_eq!(actual_manifest, expected_manifest);
    }
856
    // Writes four entries with different partition tuples (including a
    // NaN float and a null) and checks the per-field partition summary
    // the writer produces: bounds, contains_null and contains_nan.
    #[tokio::test]
    async fn test_manifest_summary() {
        let schema = Arc::new(
            Schema::builder()
                .with_fields(vec![
                    Arc::new(NestedField::optional(
                        1,
                        "time",
                        Type::Primitive(PrimitiveType::Date),
                    )),
                    Arc::new(NestedField::optional(
                        2,
                        "v_float",
                        Type::Primitive(PrimitiveType::Float),
                    )),
                    Arc::new(NestedField::optional(
                        3,
                        "v_double",
                        Type::Primitive(PrimitiveType::Double),
                    )),
                ])
                .build()
                .unwrap(),
        );
        // Three partition fields: year(time), identity(v_float),
        // identity(v_double) — one summary per field is expected below.
        let partition_spec = PartitionSpec::builder(schema.clone())
            .with_spec_id(0)
            .add_partition_field("time", "year_of_time", Transform::Year)
            .unwrap()
            .add_partition_field("v_float", "f", Transform::Identity)
            .unwrap()
            .add_partition_field("v_double", "d", Transform::Identity)
            .unwrap()
            .build()
            .unwrap();
        let metadata = ManifestMetadata {
            schema_id: 0,
            schema,
            partition_spec,
            content: ManifestContentType::Data,
            format_version: FormatVersion::V2,
        };
        let entries = vec![
            ManifestEntry {
                status: ManifestStatus::Added,
                snapshot_id: None,
                sequence_number: None,
                file_sequence_number: None,
                data_file: DataFile {
                    content: DataContentType::Data,
                    file_path: "s3a://icebergdata/demo/s1/t1/data/00000-0-ba56fbfa-f2ff-40c9-bb27-565ad6dc2be8-00000.parquet".to_string(),
                    file_format: DataFileFormat::Parquet,
                    partition: Struct::from_iter(
                        vec![
                            Some(Literal::int(2021)),
                            Some(Literal::float(1.0)),
                            Some(Literal::double(2.0)),
                        ]
                    ),
                    record_count: 1,
                    file_size_in_bytes: 5442,
                    column_sizes: HashMap::from([(0,73),(6,34),(2,73),(7,61),(3,61),(5,62),(9,79),(10,73),(1,61),(4,73),(8,73)]),
                    value_counts: HashMap::from([(4,1),(5,1),(2,1),(0,1),(3,1),(6,1),(8,1),(1,1),(10,1),(7,1),(9,1)]),
                    null_value_counts: HashMap::from([(1,0),(6,0),(2,0),(8,0),(0,0),(3,0),(5,0),(9,0),(7,0),(4,0),(10,0)]),
                    nan_value_counts: HashMap::new(),
                    lower_bounds: HashMap::new(),
                    upper_bounds: HashMap::new(),
                    key_metadata: None,
                    split_offsets: vec![4],
                    equality_ids: None,
                    sort_order_id: None,
                    partition_spec_id: 0,
                    first_row_id: None,
                    referenced_data_file: None,
                    content_offset: None,
                    content_size_in_bytes: None,
                }
            },
            ManifestEntry {
                status: ManifestStatus::Added,
                snapshot_id: None,
                sequence_number: None,
                file_sequence_number: None,
                data_file: DataFile {
                    content: DataContentType::Data,
                    file_path: "s3a://icebergdata/demo/s1/t1/data/00000-0-ba56fbfa-f2ff-40c9-bb27-565ad6dc2be8-00000.parquet".to_string(),
                    file_format: DataFileFormat::Parquet,
                    partition: Struct::from_iter(
                        vec![
                            Some(Literal::int(1111)),
                            Some(Literal::float(15.5)),
                            Some(Literal::double(25.5)),
                        ]
                    ),
                    record_count: 1,
                    file_size_in_bytes: 5442,
                    column_sizes: HashMap::from([(0,73),(6,34),(2,73),(7,61),(3,61),(5,62),(9,79),(10,73),(1,61),(4,73),(8,73)]),
                    value_counts: HashMap::from([(4,1),(5,1),(2,1),(0,1),(3,1),(6,1),(8,1),(1,1),(10,1),(7,1),(9,1)]),
                    null_value_counts: HashMap::from([(1,0),(6,0),(2,0),(8,0),(0,0),(3,0),(5,0),(9,0),(7,0),(4,0),(10,0)]),
                    nan_value_counts: HashMap::new(),
                    lower_bounds: HashMap::new(),
                    upper_bounds: HashMap::new(),
                    key_metadata: None,
                    split_offsets: vec![4],
                    equality_ids: None,
                    sort_order_id: None,
                    partition_spec_id: 0,
                    first_row_id: None,
                    referenced_data_file: None,
                    content_offset: None,
                    content_size_in_bytes: None,
                }
            },
            // This entry carries a NaN float partition value.
            ManifestEntry {
                status: ManifestStatus::Added,
                snapshot_id: None,
                sequence_number: None,
                file_sequence_number: None,
                data_file: DataFile {
                    content: DataContentType::Data,
                    file_path: "s3a://icebergdata/demo/s1/t1/data/00000-0-ba56fbfa-f2ff-40c9-bb27-565ad6dc2be8-00000.parquet".to_string(),
                    file_format: DataFileFormat::Parquet,
                    partition: Struct::from_iter(
                        vec![
                            Some(Literal::int(1211)),
                            Some(Literal::float(f32::NAN)),
                            Some(Literal::double(1.0)),
                        ]
                    ),
                    record_count: 1,
                    file_size_in_bytes: 5442,
                    column_sizes: HashMap::from([(0,73),(6,34),(2,73),(7,61),(3,61),(5,62),(9,79),(10,73),(1,61),(4,73),(8,73)]),
                    value_counts: HashMap::from([(4,1),(5,1),(2,1),(0,1),(3,1),(6,1),(8,1),(1,1),(10,1),(7,1),(9,1)]),
                    null_value_counts: HashMap::from([(1,0),(6,0),(2,0),(8,0),(0,0),(3,0),(5,0),(9,0),(7,0),(4,0),(10,0)]),
                    nan_value_counts: HashMap::new(),
                    lower_bounds: HashMap::new(),
                    upper_bounds: HashMap::new(),
                    key_metadata: None,
                    split_offsets: vec![4],
                    equality_ids: None,
                    sort_order_id: None,
                    partition_spec_id: 0,
                    first_row_id: None,
                    referenced_data_file: None,
                    content_offset: None,
                    content_size_in_bytes: None,
                }
            },
            // This entry carries a null float partition value.
            ManifestEntry {
                status: ManifestStatus::Added,
                snapshot_id: None,
                sequence_number: None,
                file_sequence_number: None,
                data_file: DataFile {
                    content: DataContentType::Data,
                    file_path: "s3a://icebergdata/demo/s1/t1/data/00000-0-ba56fbfa-f2ff-40c9-bb27-565ad6dc2be8-00000.parquet".to_string(),
                    file_format: DataFileFormat::Parquet,
                    partition: Struct::from_iter(
                        vec![
                            Some(Literal::int(1111)),
                            None,
                            Some(Literal::double(11.0)),
                        ]
                    ),
                    record_count: 1,
                    file_size_in_bytes: 5442,
                    column_sizes: HashMap::from([(0,73),(6,34),(2,73),(7,61),(3,61),(5,62),(9,79),(10,73),(1,61),(4,73),(8,73)]),
                    value_counts: HashMap::from([(4,1),(5,1),(2,1),(0,1),(3,1),(6,1),(8,1),(1,1),(10,1),(7,1),(9,1)]),
                    null_value_counts: HashMap::from([(1,0),(6,0),(2,0),(8,0),(0,0),(3,0),(5,0),(9,0),(7,0),(4,0),(10,0)]),
                    nan_value_counts: HashMap::new(),
                    lower_bounds: HashMap::new(),
                    upper_bounds: HashMap::new(),
                    key_metadata: None,
                    split_offsets: vec![4],
                    equality_ids: None,
                    sort_order_id: None,
                    partition_spec_id: 0,
                    first_row_id: None,
                    referenced_data_file: None,
                    content_offset: None,
                    content_size_in_bytes: None,
                }
            },
        ];

        let tmp_dir = TempDir::new().unwrap();
        let path = tmp_dir.path().join("test_manifest.avro");
        let io = FileIOBuilder::new_fs_io().build().unwrap();
        let output_file = io.new_output(path.to_str().unwrap()).unwrap();
        let mut writer = ManifestWriterBuilder::new(
            output_file,
            Some(1),
            None,
            metadata.schema.clone(),
            metadata.partition_spec.clone(),
        )
        .build_v2_data();
        for entry in &entries {
            writer.add_entry(entry.clone()).unwrap();
        }
        let res = writer.write_manifest_file().await.unwrap();

        let partitions = res.partitions.unwrap();

        // One summary per partition field, in spec order.
        assert_eq!(partitions.len(), 3);
        // year_of_time: min 1111, max 2021, no nulls, no NaN.
        assert_eq!(
            partitions[0].clone().lower_bound.unwrap(),
            Datum::int(1111).to_bytes().unwrap()
        );
        assert_eq!(
            partitions[0].clone().upper_bound.unwrap(),
            Datum::int(2021).to_bytes().unwrap()
        );
        assert!(!partitions[0].clone().contains_null);
        assert_eq!(partitions[0].clone().contains_nan, Some(false));

        // f: NaN and null entries are excluded from the bounds but set
        // the contains_nan / contains_null flags.
        assert_eq!(
            partitions[1].clone().lower_bound.unwrap(),
            Datum::float(1.0).to_bytes().unwrap()
        );
        assert_eq!(
            partitions[1].clone().upper_bound.unwrap(),
            Datum::float(15.5).to_bytes().unwrap()
        );
        assert!(partitions[1].clone().contains_null);
        assert_eq!(partitions[1].clone().contains_nan, Some(true));

        // d: plain min/max over the four double values.
        assert_eq!(
            partitions[2].clone().lower_bound.unwrap(),
            Datum::double(1.0).to_bytes().unwrap()
        );
        assert_eq!(
            partitions[2].clone().upper_bound.unwrap(),
            Datum::double(25.5).to_bytes().unwrap()
        );
        assert!(!partitions[2].clone().contains_null);
        assert_eq!(partitions[2].clone().contains_nan, Some(false));
    }
1095
    // Serializes two DataFiles to JSON, checks the exact JSON produced,
    // then deserializes them back and verifies the round-trip is lossless.
    #[test]
    fn test_data_file_serialization() {
        let schema = Schema::builder()
            .with_schema_id(1)
            .with_identifier_field_ids(vec![1])
            .with_fields(vec![
                NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long)).into(),
                NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
            ])
            .build()
            .unwrap();

        let partition_spec = PartitionSpec::builder(schema.clone())
            .with_spec_id(1)
            .add_partition_field("id", "id_partition", Transform::Identity)
            .unwrap()
            .build()
            .unwrap();

        // Partition type drives serialization of the partition struct.
        let partition_type = partition_spec.partition_type(&schema).unwrap();

        let data_files = vec![
            DataFileBuilder::default()
                .content(DataContentType::Data)
                .file_format(DataFileFormat::Parquet)
                .file_path("path/to/file1.parquet".to_string())
                .file_size_in_bytes(1024)
                .record_count(100)
                .partition_spec_id(1)
                .partition(Struct::empty())
                .column_sizes(HashMap::from([(1, 512), (2, 1024)]))
                .value_counts(HashMap::from([(1, 100), (2, 500)]))
                .null_value_counts(HashMap::from([(1, 0), (2, 1)]))
                .build()
                .unwrap(),
            DataFileBuilder::default()
                .content(DataContentType::Data)
                .file_format(DataFileFormat::Parquet)
                .file_path("path/to/file2.parquet".to_string())
                .file_size_in_bytes(2048)
                .record_count(200)
                .partition_spec_id(1)
                .partition(Struct::empty())
                .column_sizes(HashMap::from([(1, 1024), (2, 2048)]))
                .value_counts(HashMap::from([(1, 200), (2, 600)]))
                .null_value_counts(HashMap::from([(1, 10), (2, 999)]))
                .build()
                .unwrap(),
        ];

        let serialized_files = data_files
            .clone()
            .into_iter()
            .map(|f| serialize_data_file_to_json(f, &partition_type, FormatVersion::V2).unwrap())
            .collect::<Vec<String>>();

        assert_eq!(serialized_files.len(), 2);
        // Compare as parsed JSON values so key order does not matter.
        let pretty_json1: Value = serde_json::from_str(serialized_files.first().unwrap()).unwrap();
        let pretty_json2: Value = serde_json::from_str(serialized_files.get(1).unwrap()).unwrap();
        let expected_serialized_file1 = serde_json::json!({
            "content": 0,
            "file_path": "path/to/file1.parquet",
            "file_format": "PARQUET",
            "partition": {},
            "record_count": 100,
            "file_size_in_bytes": 1024,
            "column_sizes": [
                { "key": 1, "value": 512 },
                { "key": 2, "value": 1024 }
            ],
            "value_counts": [
                { "key": 1, "value": 100 },
                { "key": 2, "value": 500 }
            ],
            "null_value_counts": [
                { "key": 1, "value": 0 },
                { "key": 2, "value": 1 }
            ],
            "nan_value_counts": [],
            "lower_bounds": [],
            "upper_bounds": [],
            "key_metadata": null,
            "split_offsets": [],
            "equality_ids": null,
            "sort_order_id": null,
            "first_row_id": null,
            "referenced_data_file": null,
            "content_offset": null,
            "content_size_in_bytes": null
        });
        let expected_serialized_file2 = serde_json::json!({
            "content": 0,
            "file_path": "path/to/file2.parquet",
            "file_format": "PARQUET",
            "partition": {},
            "record_count": 200,
            "file_size_in_bytes": 2048,
            "column_sizes": [
                { "key": 1, "value": 1024 },
                { "key": 2, "value": 2048 }
            ],
            "value_counts": [
                { "key": 1, "value": 200 },
                { "key": 2, "value": 600 }
            ],
            "null_value_counts": [
                { "key": 1, "value": 10 },
                { "key": 2, "value": 999 }
            ],
            "nan_value_counts": [],
            "lower_bounds": [],
            "upper_bounds": [],
            "key_metadata": null,
            "split_offsets": [],
            "equality_ids": null,
            "sort_order_id": null,
            "first_row_id": null,
            "referenced_data_file": null,
            "content_offset": null,
            "content_size_in_bytes": null
        });
        assert_eq!(pretty_json1, expected_serialized_file1);
        assert_eq!(pretty_json2, expected_serialized_file2);

        // Round-trip: deserializing the JSON must reproduce the inputs.
        let deserialized_files: Vec<DataFile> = serialized_files
            .into_iter()
            .map(|json| {
                deserialize_data_file_from_json(
                    &json,
                    partition_spec.spec_id(),
                    &partition_type,
                    &schema,
                )
                .unwrap()
            })
            .collect();

        assert_eq!(deserialized_files.len(), 2);
        let deserialized_data_file1 = deserialized_files.first().unwrap();
        let deserialized_data_file2 = deserialized_files.get(1).unwrap();
        let original_data_file1 = data_files.first().unwrap();
        let original_data_file2 = data_files.get(1).unwrap();

        assert_eq!(deserialized_data_file1, original_data_file1);
        assert_eq!(deserialized_data_file2, original_data_file2);
    }
1250}