1use std::collections::HashMap;
21use std::str::FromStr;
22
23use apache_avro::types::Value;
24use apache_avro::{Reader, Writer, from_value};
25use bytes::Bytes;
26pub use serde_bytes::ByteBuf;
27use serde_derive::{Deserialize, Serialize};
28
29use self::_const_schema::{MANIFEST_LIST_AVRO_SCHEMA_V1, MANIFEST_LIST_AVRO_SCHEMA_V2};
30use self::_serde::{ManifestFileV1, ManifestFileV2};
31use super::{FormatVersion, Manifest};
32use crate::error::Result;
33use crate::io::{FileIO, OutputFile};
34use crate::{Error, ErrorKind};
35
36pub const UNASSIGNED_SEQUENCE_NUMBER: i64 = -1;
38
39#[derive(Debug, Clone, PartialEq)]
53pub struct ManifestList {
54 entries: Vec<ManifestFile>,
56}
57
58impl ManifestList {
59 pub fn parse_with_version(bs: &[u8], version: FormatVersion) -> Result<ManifestList> {
61 match version {
62 FormatVersion::V1 => {
63 let reader = Reader::with_schema(&MANIFEST_LIST_AVRO_SCHEMA_V1, bs)?;
64 let values = Value::Array(reader.collect::<std::result::Result<Vec<Value>, _>>()?);
65 from_value::<_serde::ManifestListV1>(&values)?.try_into()
66 }
67 FormatVersion::V2 => {
68 let reader = Reader::new(bs)?;
69 let values = Value::Array(reader.collect::<std::result::Result<Vec<Value>, _>>()?);
70 from_value::<_serde::ManifestListV2>(&values)?.try_into()
71 }
72 }
73 }
74
75 pub fn entries(&self) -> &[ManifestFile] {
77 &self.entries
78 }
79
80 pub fn consume_entries(self) -> impl IntoIterator<Item = ManifestFile> {
82 Box::new(self.entries.into_iter())
83 }
84}
85
86pub struct ManifestListWriter {
88 format_version: FormatVersion,
89 output_file: OutputFile,
90 avro_writer: Writer<'static, Vec<u8>>,
91 sequence_number: i64,
92 snapshot_id: i64,
93}
94
95impl std::fmt::Debug for ManifestListWriter {
96 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
97 f.debug_struct("ManifestListWriter")
98 .field("format_version", &self.format_version)
99 .field("output_file", &self.output_file)
100 .field("avro_writer", &self.avro_writer.schema())
101 .finish_non_exhaustive()
102 }
103}
104
105impl ManifestListWriter {
106 pub fn v1(output_file: OutputFile, snapshot_id: i64, parent_snapshot_id: Option<i64>) -> Self {
108 let mut metadata = HashMap::from_iter([
109 ("snapshot-id".to_string(), snapshot_id.to_string()),
110 ("format-version".to_string(), "1".to_string()),
111 ]);
112 if let Some(parent_snapshot_id) = parent_snapshot_id {
113 metadata.insert(
114 "parent-snapshot-id".to_string(),
115 parent_snapshot_id.to_string(),
116 );
117 }
118 Self::new(FormatVersion::V1, output_file, metadata, 0, snapshot_id)
119 }
120
121 pub fn v2(
123 output_file: OutputFile,
124 snapshot_id: i64,
125 parent_snapshot_id: Option<i64>,
126 sequence_number: i64,
127 ) -> Self {
128 let mut metadata = HashMap::from_iter([
129 ("snapshot-id".to_string(), snapshot_id.to_string()),
130 ("sequence-number".to_string(), sequence_number.to_string()),
131 ("format-version".to_string(), "2".to_string()),
132 ]);
133 metadata.insert(
134 "parent-snapshot-id".to_string(),
135 parent_snapshot_id
136 .map(|v| v.to_string())
137 .unwrap_or("null".to_string()),
138 );
139 Self::new(
140 FormatVersion::V2,
141 output_file,
142 metadata,
143 sequence_number,
144 snapshot_id,
145 )
146 }
147
148 fn new(
149 format_version: FormatVersion,
150 output_file: OutputFile,
151 metadata: HashMap<String, String>,
152 sequence_number: i64,
153 snapshot_id: i64,
154 ) -> Self {
155 let avro_schema = match format_version {
156 FormatVersion::V1 => &MANIFEST_LIST_AVRO_SCHEMA_V1,
157 FormatVersion::V2 => &MANIFEST_LIST_AVRO_SCHEMA_V2,
158 };
159 let mut avro_writer = Writer::new(avro_schema, Vec::new());
160 for (key, value) in metadata {
161 avro_writer
162 .add_user_metadata(key, value)
163 .expect("Avro metadata should be added to the writer before the first record.");
164 }
165 Self {
166 format_version,
167 output_file,
168 avro_writer,
169 sequence_number,
170 snapshot_id,
171 }
172 }
173
174 pub fn add_manifests(&mut self, manifests: impl Iterator<Item = ManifestFile>) -> Result<()> {
176 match self.format_version {
177 FormatVersion::V1 => {
178 for manifest in manifests {
179 let manifes: ManifestFileV1 = manifest.try_into()?;
180 self.avro_writer.append_ser(manifes)?;
181 }
182 }
183 FormatVersion::V2 => {
184 for mut manifest in manifests {
185 if manifest.sequence_number == UNASSIGNED_SEQUENCE_NUMBER {
186 if manifest.added_snapshot_id != self.snapshot_id {
187 return Err(Error::new(
188 ErrorKind::DataInvalid,
189 format!(
190 "Found unassigned sequence number for a manifest from snapshot {}.",
191 manifest.added_snapshot_id
192 ),
193 ));
194 }
195 manifest.sequence_number = self.sequence_number;
196 }
197 if manifest.min_sequence_number == UNASSIGNED_SEQUENCE_NUMBER {
198 if manifest.added_snapshot_id != self.snapshot_id {
199 return Err(Error::new(
200 ErrorKind::DataInvalid,
201 format!(
202 "Found unassigned sequence number for a manifest from snapshot {}.",
203 manifest.added_snapshot_id
204 ),
205 ));
206 }
207 manifest.min_sequence_number = self.sequence_number;
208 }
209 let manifest_entry: ManifestFileV2 = manifest.try_into()?;
210 self.avro_writer.append_ser(manifest_entry)?;
211 }
212 }
213 }
214 Ok(())
215 }
216
217 pub async fn close(self) -> Result<()> {
219 let data = self.avro_writer.into_inner()?;
220 let mut writer = self.output_file.writer().await?;
221 writer.write(Bytes::from(data)).await?;
222 writer.close().await?;
223 Ok(())
224 }
225}
226
227mod _const_schema {
229 use std::sync::Arc;
230
231 use apache_avro::Schema as AvroSchema;
232 use once_cell::sync::Lazy;
233
234 use crate::avro::schema_to_avro_schema;
235 use crate::spec::{
236 ListType, NestedField, NestedFieldRef, PrimitiveType, Schema, StructType, Type,
237 };
238
239 static MANIFEST_PATH: Lazy<NestedFieldRef> = {
240 Lazy::new(|| {
241 Arc::new(NestedField::required(
242 500,
243 "manifest_path",
244 Type::Primitive(PrimitiveType::String),
245 ))
246 })
247 };
248 static MANIFEST_LENGTH: Lazy<NestedFieldRef> = {
249 Lazy::new(|| {
250 Arc::new(NestedField::required(
251 501,
252 "manifest_length",
253 Type::Primitive(PrimitiveType::Long),
254 ))
255 })
256 };
257 static PARTITION_SPEC_ID: Lazy<NestedFieldRef> = {
258 Lazy::new(|| {
259 Arc::new(NestedField::required(
260 502,
261 "partition_spec_id",
262 Type::Primitive(PrimitiveType::Int),
263 ))
264 })
265 };
266 static CONTENT: Lazy<NestedFieldRef> = {
267 Lazy::new(|| {
268 Arc::new(NestedField::required(
269 517,
270 "content",
271 Type::Primitive(PrimitiveType::Int),
272 ))
273 })
274 };
275 static SEQUENCE_NUMBER: Lazy<NestedFieldRef> = {
276 Lazy::new(|| {
277 Arc::new(NestedField::required(
278 515,
279 "sequence_number",
280 Type::Primitive(PrimitiveType::Long),
281 ))
282 })
283 };
284 static MIN_SEQUENCE_NUMBER: Lazy<NestedFieldRef> = {
285 Lazy::new(|| {
286 Arc::new(NestedField::required(
287 516,
288 "min_sequence_number",
289 Type::Primitive(PrimitiveType::Long),
290 ))
291 })
292 };
293 static ADDED_SNAPSHOT_ID: Lazy<NestedFieldRef> = {
294 Lazy::new(|| {
295 Arc::new(NestedField::required(
296 503,
297 "added_snapshot_id",
298 Type::Primitive(PrimitiveType::Long),
299 ))
300 })
301 };
302 static ADDED_FILES_COUNT_V2: Lazy<NestedFieldRef> = {
303 Lazy::new(|| {
304 Arc::new(NestedField::required(
305 504,
306 "added_files_count",
307 Type::Primitive(PrimitiveType::Int),
308 ))
309 })
310 };
311 static ADDED_FILES_COUNT_V1: Lazy<NestedFieldRef> = {
312 Lazy::new(|| {
313 Arc::new(NestedField::optional(
314 504,
315 "added_data_files_count",
316 Type::Primitive(PrimitiveType::Int),
317 ))
318 })
319 };
320 static EXISTING_FILES_COUNT_V2: Lazy<NestedFieldRef> = {
321 Lazy::new(|| {
322 Arc::new(NestedField::required(
323 505,
324 "existing_files_count",
325 Type::Primitive(PrimitiveType::Int),
326 ))
327 })
328 };
329 static EXISTING_FILES_COUNT_V1: Lazy<NestedFieldRef> = {
330 Lazy::new(|| {
331 Arc::new(NestedField::optional(
332 505,
333 "existing_data_files_count",
334 Type::Primitive(PrimitiveType::Int),
335 ))
336 })
337 };
338 static DELETED_FILES_COUNT_V2: Lazy<NestedFieldRef> = {
339 Lazy::new(|| {
340 Arc::new(NestedField::required(
341 506,
342 "deleted_files_count",
343 Type::Primitive(PrimitiveType::Int),
344 ))
345 })
346 };
347 static DELETED_FILES_COUNT_V1: Lazy<NestedFieldRef> = {
348 Lazy::new(|| {
349 Arc::new(NestedField::optional(
350 506,
351 "deleted_data_files_count",
352 Type::Primitive(PrimitiveType::Int),
353 ))
354 })
355 };
356 static ADDED_ROWS_COUNT_V2: Lazy<NestedFieldRef> = {
357 Lazy::new(|| {
358 Arc::new(NestedField::required(
359 512,
360 "added_rows_count",
361 Type::Primitive(PrimitiveType::Long),
362 ))
363 })
364 };
365 static ADDED_ROWS_COUNT_V1: Lazy<NestedFieldRef> = {
366 Lazy::new(|| {
367 Arc::new(NestedField::optional(
368 512,
369 "added_rows_count",
370 Type::Primitive(PrimitiveType::Long),
371 ))
372 })
373 };
374 static EXISTING_ROWS_COUNT_V2: Lazy<NestedFieldRef> = {
375 Lazy::new(|| {
376 Arc::new(NestedField::required(
377 513,
378 "existing_rows_count",
379 Type::Primitive(PrimitiveType::Long),
380 ))
381 })
382 };
383 static EXISTING_ROWS_COUNT_V1: Lazy<NestedFieldRef> = {
384 Lazy::new(|| {
385 Arc::new(NestedField::optional(
386 513,
387 "existing_rows_count",
388 Type::Primitive(PrimitiveType::Long),
389 ))
390 })
391 };
392 static DELETED_ROWS_COUNT_V2: Lazy<NestedFieldRef> = {
393 Lazy::new(|| {
394 Arc::new(NestedField::required(
395 514,
396 "deleted_rows_count",
397 Type::Primitive(PrimitiveType::Long),
398 ))
399 })
400 };
401 static DELETED_ROWS_COUNT_V1: Lazy<NestedFieldRef> = {
402 Lazy::new(|| {
403 Arc::new(NestedField::optional(
404 514,
405 "deleted_rows_count",
406 Type::Primitive(PrimitiveType::Long),
407 ))
408 })
409 };
410 static PARTITIONS: Lazy<NestedFieldRef> = {
411 Lazy::new(|| {
412 let fields = vec![
414 Arc::new(NestedField::required(
415 509,
416 "contains_null",
417 Type::Primitive(PrimitiveType::Boolean),
418 )),
419 Arc::new(NestedField::optional(
420 518,
421 "contains_nan",
422 Type::Primitive(PrimitiveType::Boolean),
423 )),
424 Arc::new(NestedField::optional(
425 510,
426 "lower_bound",
427 Type::Primitive(PrimitiveType::Binary),
428 )),
429 Arc::new(NestedField::optional(
430 511,
431 "upper_bound",
432 Type::Primitive(PrimitiveType::Binary),
433 )),
434 ];
435 let element_field = Arc::new(NestedField::required(
436 508,
437 "r_508",
438 Type::Struct(StructType::new(fields)),
439 ));
440 Arc::new(NestedField::optional(
441 507,
442 "partitions",
443 Type::List(ListType { element_field }),
444 ))
445 })
446 };
447 static KEY_METADATA: Lazy<NestedFieldRef> = {
448 Lazy::new(|| {
449 Arc::new(NestedField::optional(
450 519,
451 "key_metadata",
452 Type::Primitive(PrimitiveType::Binary),
453 ))
454 })
455 };
456
457 static V1_SCHEMA: Lazy<Schema> = {
458 Lazy::new(|| {
459 let fields = vec![
460 MANIFEST_PATH.clone(),
461 MANIFEST_LENGTH.clone(),
462 PARTITION_SPEC_ID.clone(),
463 ADDED_SNAPSHOT_ID.clone(),
464 ADDED_FILES_COUNT_V1.clone().to_owned(),
465 EXISTING_FILES_COUNT_V1.clone(),
466 DELETED_FILES_COUNT_V1.clone(),
467 ADDED_ROWS_COUNT_V1.clone(),
468 EXISTING_ROWS_COUNT_V1.clone(),
469 DELETED_ROWS_COUNT_V1.clone(),
470 PARTITIONS.clone(),
471 KEY_METADATA.clone(),
472 ];
473 Schema::builder().with_fields(fields).build().unwrap()
474 })
475 };
476
477 static V2_SCHEMA: Lazy<Schema> = {
478 Lazy::new(|| {
479 let fields = vec![
480 MANIFEST_PATH.clone(),
481 MANIFEST_LENGTH.clone(),
482 PARTITION_SPEC_ID.clone(),
483 CONTENT.clone(),
484 SEQUENCE_NUMBER.clone(),
485 MIN_SEQUENCE_NUMBER.clone(),
486 ADDED_SNAPSHOT_ID.clone(),
487 ADDED_FILES_COUNT_V2.clone(),
488 EXISTING_FILES_COUNT_V2.clone(),
489 DELETED_FILES_COUNT_V2.clone(),
490 ADDED_ROWS_COUNT_V2.clone(),
491 EXISTING_ROWS_COUNT_V2.clone(),
492 DELETED_ROWS_COUNT_V2.clone(),
493 PARTITIONS.clone(),
494 KEY_METADATA.clone(),
495 ];
496 Schema::builder().with_fields(fields).build().unwrap()
497 })
498 };
499
500 pub(super) static MANIFEST_LIST_AVRO_SCHEMA_V1: Lazy<AvroSchema> =
501 Lazy::new(|| schema_to_avro_schema("manifest_file", &V1_SCHEMA).unwrap());
502
503 pub(super) static MANIFEST_LIST_AVRO_SCHEMA_V2: Lazy<AvroSchema> =
504 Lazy::new(|| schema_to_avro_schema("manifest_file", &V2_SCHEMA).unwrap());
505}
506
507#[derive(Debug, PartialEq, Clone, Eq, Hash)]
509pub struct ManifestFile {
510 pub manifest_path: String,
514 pub manifest_length: i64,
518 pub partition_spec_id: i32,
523 pub content: ManifestContentType,
528 pub sequence_number: i64,
533 pub min_sequence_number: i64,
538 pub added_snapshot_id: i64,
542 pub added_files_count: Option<u32>,
547 pub existing_files_count: Option<u32>,
552 pub deleted_files_count: Option<u32>,
557 pub added_rows_count: Option<u64>,
562 pub existing_rows_count: Option<u64>,
567 pub deleted_rows_count: Option<u64>,
572 pub partitions: Option<Vec<FieldSummary>>,
579 pub key_metadata: Option<Vec<u8>>,
583}
584
585impl ManifestFile {
586 pub fn has_added_files(&self) -> bool {
588 self.added_files_count.map(|c| c > 0).unwrap_or(true)
589 }
590
591 pub fn has_deleted_files(&self) -> bool {
593 self.deleted_files_count.map(|c| c > 0).unwrap_or(true)
594 }
595
596 pub fn has_existing_files(&self) -> bool {
598 self.existing_files_count.map(|c| c > 0).unwrap_or(true)
599 }
600}
601
602#[derive(Debug, PartialEq, Clone, Copy, Eq, Hash, Default)]
604pub enum ManifestContentType {
605 #[default]
607 Data = 0,
608 Deletes = 1,
610}
611
612impl FromStr for ManifestContentType {
613 type Err = Error;
614
615 fn from_str(s: &str) -> Result<Self> {
616 match s {
617 "data" => Ok(ManifestContentType::Data),
618 "deletes" => Ok(ManifestContentType::Deletes),
619 _ => Err(Error::new(
620 ErrorKind::DataInvalid,
621 format!("Invalid manifest content type: {s}"),
622 )),
623 }
624 }
625}
626
627impl std::fmt::Display for ManifestContentType {
628 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
629 match self {
630 ManifestContentType::Data => write!(f, "data"),
631 ManifestContentType::Deletes => write!(f, "deletes"),
632 }
633 }
634}
635
636impl TryFrom<i32> for ManifestContentType {
637 type Error = Error;
638
639 fn try_from(value: i32) -> std::result::Result<Self, Self::Error> {
640 match value {
641 0 => Ok(ManifestContentType::Data),
642 1 => Ok(ManifestContentType::Deletes),
643 _ => Err(Error::new(
644 crate::ErrorKind::DataInvalid,
645 format!(
646 "Invalid manifest content type. Expected 0 or 1, got {}",
647 value
648 ),
649 )),
650 }
651 }
652}
653
654impl ManifestFile {
655 pub async fn load_manifest(&self, file_io: &FileIO) -> Result<Manifest> {
659 let avro = file_io.new_input(&self.manifest_path)?.read().await?;
660
661 let (metadata, mut entries) = Manifest::try_from_avro_bytes(&avro)?;
662
663 for entry in &mut entries {
665 entry.inherit_data(self);
666 }
667
668 Ok(Manifest::new(metadata, entries))
669 }
670}
671
672#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, Default, Hash)]
676pub struct FieldSummary {
677 pub contains_null: bool,
682 pub contains_nan: Option<bool>,
686 pub lower_bound: Option<ByteBuf>,
690 pub upper_bound: Option<ByteBuf>,
694}
695
696pub(super) mod _serde {
701 pub use serde_bytes::ByteBuf;
702 use serde_derive::{Deserialize, Serialize};
703
704 use super::ManifestFile;
705 use crate::Error;
706 use crate::error::Result;
707 use crate::spec::FieldSummary;
708
709 #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
710 #[serde(transparent)]
711 pub(crate) struct ManifestListV2 {
712 entries: Vec<ManifestFileV2>,
713 }
714
715 #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
716 #[serde(transparent)]
717 pub(crate) struct ManifestListV1 {
718 entries: Vec<ManifestFileV1>,
719 }
720
721 impl ManifestListV2 {
722 pub fn try_into(self) -> Result<super::ManifestList> {
724 Ok(super::ManifestList {
725 entries: self
726 .entries
727 .into_iter()
728 .map(|v| v.try_into())
729 .collect::<Result<Vec<_>>>()?,
730 })
731 }
732 }
733
734 impl TryFrom<super::ManifestList> for ManifestListV2 {
735 type Error = Error;
736
737 fn try_from(value: super::ManifestList) -> std::result::Result<Self, Self::Error> {
738 Ok(Self {
739 entries: value
740 .entries
741 .into_iter()
742 .map(|v| v.try_into())
743 .collect::<std::result::Result<Vec<_>, _>>()?,
744 })
745 }
746 }
747
748 impl ManifestListV1 {
749 pub fn try_into(self) -> Result<super::ManifestList> {
751 Ok(super::ManifestList {
752 entries: self
753 .entries
754 .into_iter()
755 .map(|v| v.try_into())
756 .collect::<Result<Vec<_>>>()?,
757 })
758 }
759 }
760
761 impl TryFrom<super::ManifestList> for ManifestListV1 {
762 type Error = Error;
763
764 fn try_from(value: super::ManifestList) -> std::result::Result<Self, Self::Error> {
765 Ok(Self {
766 entries: value
767 .entries
768 .into_iter()
769 .map(|v| v.try_into())
770 .collect::<std::result::Result<Vec<_>, _>>()?,
771 })
772 }
773 }
774
775 #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
776 pub(super) struct ManifestFileV1 {
777 pub manifest_path: String,
778 pub manifest_length: i64,
779 pub partition_spec_id: i32,
780 pub added_snapshot_id: i64,
781 pub added_data_files_count: Option<i32>,
782 pub existing_data_files_count: Option<i32>,
783 pub deleted_data_files_count: Option<i32>,
784 pub added_rows_count: Option<i64>,
785 pub existing_rows_count: Option<i64>,
786 pub deleted_rows_count: Option<i64>,
787 pub partitions: Option<Vec<FieldSummary>>,
788 pub key_metadata: Option<ByteBuf>,
789 }
790
791 #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
795 pub(super) struct ManifestFileV2 {
796 pub manifest_path: String,
797 pub manifest_length: i64,
798 pub partition_spec_id: i32,
799 #[serde(default = "v2_default_content_for_v1")]
800 pub content: i32,
801 #[serde(default = "v2_default_sequence_number_for_v1")]
802 pub sequence_number: i64,
803 #[serde(default = "v2_default_min_sequence_number_for_v1")]
804 pub min_sequence_number: i64,
805 pub added_snapshot_id: i64,
806 #[serde(alias = "added_data_files_count", alias = "added_files_count")]
807 pub added_files_count: i32,
808 #[serde(alias = "existing_data_files_count", alias = "existing_files_count")]
809 pub existing_files_count: i32,
810 #[serde(alias = "deleted_data_files_count", alias = "deleted_files_count")]
811 pub deleted_files_count: i32,
812 pub added_rows_count: i64,
813 pub existing_rows_count: i64,
814 pub deleted_rows_count: i64,
815 pub partitions: Option<Vec<FieldSummary>>,
816 pub key_metadata: Option<ByteBuf>,
817 }
818
819 impl ManifestFileV2 {
820 pub fn try_into(self) -> Result<ManifestFile> {
822 Ok(ManifestFile {
823 manifest_path: self.manifest_path,
824 manifest_length: self.manifest_length,
825 partition_spec_id: self.partition_spec_id,
826 content: self.content.try_into()?,
827 sequence_number: self.sequence_number,
828 min_sequence_number: self.min_sequence_number,
829 added_snapshot_id: self.added_snapshot_id,
830 added_files_count: Some(self.added_files_count.try_into()?),
831 existing_files_count: Some(self.existing_files_count.try_into()?),
832 deleted_files_count: Some(self.deleted_files_count.try_into()?),
833 added_rows_count: Some(self.added_rows_count.try_into()?),
834 existing_rows_count: Some(self.existing_rows_count.try_into()?),
835 deleted_rows_count: Some(self.deleted_rows_count.try_into()?),
836 partitions: self.partitions,
837 key_metadata: self.key_metadata.map(|b| b.into_vec()),
838 })
839 }
840 }
841
842 fn v2_default_content_for_v1() -> i32 {
843 super::ManifestContentType::Data as i32
844 }
845
846 fn v2_default_sequence_number_for_v1() -> i64 {
847 0
848 }
849
850 fn v2_default_min_sequence_number_for_v1() -> i64 {
851 0
852 }
853
854 impl ManifestFileV1 {
855 pub fn try_into(self) -> Result<ManifestFile> {
857 Ok(ManifestFile {
858 manifest_path: self.manifest_path,
859 manifest_length: self.manifest_length,
860 partition_spec_id: self.partition_spec_id,
861 added_snapshot_id: self.added_snapshot_id,
862 added_files_count: self
863 .added_data_files_count
864 .map(TryInto::try_into)
865 .transpose()?,
866 existing_files_count: self
867 .existing_data_files_count
868 .map(TryInto::try_into)
869 .transpose()?,
870 deleted_files_count: self
871 .deleted_data_files_count
872 .map(TryInto::try_into)
873 .transpose()?,
874 added_rows_count: self.added_rows_count.map(TryInto::try_into).transpose()?,
875 existing_rows_count: self
876 .existing_rows_count
877 .map(TryInto::try_into)
878 .transpose()?,
879 deleted_rows_count: self.deleted_rows_count.map(TryInto::try_into).transpose()?,
880 partitions: self.partitions,
881 key_metadata: self.key_metadata.map(|b| b.into_vec()),
882 content: super::ManifestContentType::Data,
885 sequence_number: 0,
886 min_sequence_number: 0,
887 })
888 }
889 }
890
891 fn convert_to_serde_key_metadata(key_metadata: Option<Vec<u8>>) -> Option<ByteBuf> {
892 match key_metadata {
893 Some(metadata) if !metadata.is_empty() => Some(ByteBuf::from(metadata)),
894 _ => None,
895 }
896 }
897
898 impl TryFrom<ManifestFile> for ManifestFileV2 {
899 type Error = Error;
900
901 fn try_from(value: ManifestFile) -> std::result::Result<Self, Self::Error> {
902 let key_metadata = convert_to_serde_key_metadata(value.key_metadata);
903 Ok(Self {
904 manifest_path: value.manifest_path,
905 manifest_length: value.manifest_length,
906 partition_spec_id: value.partition_spec_id,
907 content: value.content as i32,
908 sequence_number: value.sequence_number,
909 min_sequence_number: value.min_sequence_number,
910 added_snapshot_id: value.added_snapshot_id,
911 added_files_count: value
912 .added_files_count
913 .ok_or_else(|| {
914 Error::new(
915 crate::ErrorKind::DataInvalid,
916 "added_data_files_count in ManifestFileV2 should be require",
917 )
918 })?
919 .try_into()?,
920 existing_files_count: value
921 .existing_files_count
922 .ok_or_else(|| {
923 Error::new(
924 crate::ErrorKind::DataInvalid,
925 "existing_data_files_count in ManifestFileV2 should be require",
926 )
927 })?
928 .try_into()?,
929 deleted_files_count: value
930 .deleted_files_count
931 .ok_or_else(|| {
932 Error::new(
933 crate::ErrorKind::DataInvalid,
934 "deleted_data_files_count in ManifestFileV2 should be require",
935 )
936 })?
937 .try_into()?,
938 added_rows_count: value
939 .added_rows_count
940 .ok_or_else(|| {
941 Error::new(
942 crate::ErrorKind::DataInvalid,
943 "added_rows_count in ManifestFileV2 should be require",
944 )
945 })?
946 .try_into()?,
947 existing_rows_count: value
948 .existing_rows_count
949 .ok_or_else(|| {
950 Error::new(
951 crate::ErrorKind::DataInvalid,
952 "existing_rows_count in ManifestFileV2 should be require",
953 )
954 })?
955 .try_into()?,
956 deleted_rows_count: value
957 .deleted_rows_count
958 .ok_or_else(|| {
959 Error::new(
960 crate::ErrorKind::DataInvalid,
961 "deleted_rows_count in ManifestFileV2 should be require",
962 )
963 })?
964 .try_into()?,
965 partitions: value.partitions,
966 key_metadata,
967 })
968 }
969 }
970
971 impl TryFrom<ManifestFile> for ManifestFileV1 {
972 type Error = Error;
973
974 fn try_from(value: ManifestFile) -> std::result::Result<Self, Self::Error> {
975 let key_metadata = convert_to_serde_key_metadata(value.key_metadata);
976 Ok(Self {
977 manifest_path: value.manifest_path,
978 manifest_length: value.manifest_length,
979 partition_spec_id: value.partition_spec_id,
980 added_snapshot_id: value.added_snapshot_id,
981 added_data_files_count: value
982 .added_files_count
983 .map(TryInto::try_into)
984 .transpose()?,
985 existing_data_files_count: value
986 .existing_files_count
987 .map(TryInto::try_into)
988 .transpose()?,
989 deleted_data_files_count: value
990 .deleted_files_count
991 .map(TryInto::try_into)
992 .transpose()?,
993 added_rows_count: value.added_rows_count.map(TryInto::try_into).transpose()?,
994 existing_rows_count: value
995 .existing_rows_count
996 .map(TryInto::try_into)
997 .transpose()?,
998 deleted_rows_count: value
999 .deleted_rows_count
1000 .map(TryInto::try_into)
1001 .transpose()?,
1002 partitions: value.partitions,
1003 key_metadata,
1004 })
1005 }
1006 }
1007}
1008
1009#[cfg(test)]
1010mod test {
1011 use std::fs;
1012
1013 use apache_avro::{Reader, Schema};
1014 use tempfile::TempDir;
1015
1016 use super::_serde::ManifestListV2;
1017 use crate::io::FileIOBuilder;
1018 use crate::spec::manifest_list::_serde::ManifestListV1;
1019 use crate::spec::{
1020 Datum, FieldSummary, ManifestContentType, ManifestFile, ManifestList, ManifestListWriter,
1021 UNASSIGNED_SEQUENCE_NUMBER,
1022 };
1023
1024 #[tokio::test]
1025 async fn test_parse_manifest_list_v1() {
1026 let manifest_list = ManifestList {
1027 entries: vec![
1028 ManifestFile {
1029 manifest_path: "/opt/bitnami/spark/warehouse/db/table/metadata/10d28031-9739-484c-92db-cdf2975cead4-m0.avro".to_string(),
1030 manifest_length: 5806,
1031 partition_spec_id: 0,
1032 content: ManifestContentType::Data,
1033 sequence_number: 0,
1034 min_sequence_number: 0,
1035 added_snapshot_id: 1646658105718557341,
1036 added_files_count: Some(3),
1037 existing_files_count: Some(0),
1038 deleted_files_count: Some(0),
1039 added_rows_count: Some(3),
1040 existing_rows_count: Some(0),
1041 deleted_rows_count: Some(0),
1042 partitions: Some(vec![]),
1043 key_metadata: None,
1044 }
1045 ]
1046 };
1047
1048 let file_io = FileIOBuilder::new_fs_io().build().unwrap();
1049
1050 let tmp_dir = TempDir::new().unwrap();
1051 let file_name = "simple_manifest_list_v1.avro";
1052 let full_path = format!("{}/{}", tmp_dir.path().to_str().unwrap(), file_name);
1053
1054 let mut writer = ManifestListWriter::v1(
1055 file_io.new_output(full_path.clone()).unwrap(),
1056 1646658105718557341,
1057 Some(1646658105718557341),
1058 );
1059
1060 writer
1061 .add_manifests(manifest_list.entries.clone().into_iter())
1062 .unwrap();
1063 writer.close().await.unwrap();
1064
1065 let bs = fs::read(full_path).expect("read_file must succeed");
1066
1067 let parsed_manifest_list =
1068 ManifestList::parse_with_version(&bs, crate::spec::FormatVersion::V1).unwrap();
1069
1070 assert_eq!(manifest_list, parsed_manifest_list);
1071 }
1072
1073 #[tokio::test]
1074 async fn test_parse_manifest_list_v2() {
1075 let manifest_list = ManifestList {
1076 entries: vec![
1077 ManifestFile {
1078 manifest_path: "s3a://icebergdata/demo/s1/t1/metadata/05ffe08b-810f-49b3-a8f4-e88fc99b254a-m0.avro".to_string(),
1079 manifest_length: 6926,
1080 partition_spec_id: 1,
1081 content: ManifestContentType::Data,
1082 sequence_number: 1,
1083 min_sequence_number: 1,
1084 added_snapshot_id: 377075049360453639,
1085 added_files_count: Some(1),
1086 existing_files_count: Some(0),
1087 deleted_files_count: Some(0),
1088 added_rows_count: Some(3),
1089 existing_rows_count: Some(0),
1090 deleted_rows_count: Some(0),
1091 partitions: Some(
1092 vec![FieldSummary { contains_null: false, contains_nan: Some(false), lower_bound: Some(Datum::long(1).to_bytes().unwrap()), upper_bound: Some(Datum::long(1).to_bytes().unwrap())}]
1093 ),
1094 key_metadata: None,
1095 },
1096 ManifestFile {
1097 manifest_path: "s3a://icebergdata/demo/s1/t1/metadata/05ffe08b-810f-49b3-a8f4-e88fc99b254a-m1.avro".to_string(),
1098 manifest_length: 6926,
1099 partition_spec_id: 2,
1100 content: ManifestContentType::Data,
1101 sequence_number: 1,
1102 min_sequence_number: 1,
1103 added_snapshot_id: 377075049360453639,
1104 added_files_count: Some(1),
1105 existing_files_count: Some(0),
1106 deleted_files_count: Some(0),
1107 added_rows_count: Some(3),
1108 existing_rows_count: Some(0),
1109 deleted_rows_count: Some(0),
1110 partitions: Some(
1111 vec![FieldSummary { contains_null: false, contains_nan: Some(false), lower_bound: Some(Datum::float(1.1).to_bytes().unwrap()), upper_bound: Some(Datum::float(2.1).to_bytes().unwrap())}]
1112 ),
1113 key_metadata: None,
1114 }
1115 ]
1116 };
1117
1118 let file_io = FileIOBuilder::new_fs_io().build().unwrap();
1119
1120 let tmp_dir = TempDir::new().unwrap();
1121 let file_name = "simple_manifest_list_v1.avro";
1122 let full_path = format!("{}/{}", tmp_dir.path().to_str().unwrap(), file_name);
1123
1124 let mut writer = ManifestListWriter::v2(
1125 file_io.new_output(full_path.clone()).unwrap(),
1126 1646658105718557341,
1127 Some(1646658105718557341),
1128 1,
1129 );
1130
1131 writer
1132 .add_manifests(manifest_list.entries.clone().into_iter())
1133 .unwrap();
1134 writer.close().await.unwrap();
1135
1136 let bs = fs::read(full_path).expect("read_file must succeed");
1137
1138 let parsed_manifest_list =
1139 ManifestList::parse_with_version(&bs, crate::spec::FormatVersion::V2).unwrap();
1140
1141 assert_eq!(manifest_list, parsed_manifest_list);
1142 }
1143
1144 #[test]
1145 fn test_serialize_manifest_list_v1() {
1146 let manifest_list:ManifestListV1 = ManifestList {
1147 entries: vec![ManifestFile {
1148 manifest_path: "/opt/bitnami/spark/warehouse/db/table/metadata/10d28031-9739-484c-92db-cdf2975cead4-m0.avro".to_string(),
1149 manifest_length: 5806,
1150 partition_spec_id: 0,
1151 content: ManifestContentType::Data,
1152 sequence_number: 0,
1153 min_sequence_number: 0,
1154 added_snapshot_id: 1646658105718557341,
1155 added_files_count: Some(3),
1156 existing_files_count: Some(0),
1157 deleted_files_count: Some(0),
1158 added_rows_count: Some(3),
1159 existing_rows_count: Some(0),
1160 deleted_rows_count: Some(0),
1161 partitions: None,
1162 key_metadata: None,
1163 }]
1164 }.try_into().unwrap();
1165 let result = serde_json::to_string(&manifest_list).unwrap();
1166 assert_eq!(
1167 result,
1168 r#"[{"manifest_path":"/opt/bitnami/spark/warehouse/db/table/metadata/10d28031-9739-484c-92db-cdf2975cead4-m0.avro","manifest_length":5806,"partition_spec_id":0,"added_snapshot_id":1646658105718557341,"added_data_files_count":3,"existing_data_files_count":0,"deleted_data_files_count":0,"added_rows_count":3,"existing_rows_count":0,"deleted_rows_count":0,"partitions":null,"key_metadata":null}]"#
1169 );
1170 }
1171
1172 #[test]
1173 fn test_serialize_manifest_list_v2() {
1174 let manifest_list:ManifestListV2 = ManifestList {
1175 entries: vec![ManifestFile {
1176 manifest_path: "s3a://icebergdata/demo/s1/t1/metadata/05ffe08b-810f-49b3-a8f4-e88fc99b254a-m0.avro".to_string(),
1177 manifest_length: 6926,
1178 partition_spec_id: 1,
1179 content: ManifestContentType::Data,
1180 sequence_number: 1,
1181 min_sequence_number: 1,
1182 added_snapshot_id: 377075049360453639,
1183 added_files_count: Some(1),
1184 existing_files_count: Some(0),
1185 deleted_files_count: Some(0),
1186 added_rows_count: Some(3),
1187 existing_rows_count: Some(0),
1188 deleted_rows_count: Some(0),
1189 partitions: Some(
1190 vec![FieldSummary { contains_null: false, contains_nan: Some(false), lower_bound: Some(Datum::long(1).to_bytes().unwrap()), upper_bound: Some(Datum::long(1).to_bytes().unwrap())}]
1191 ),
1192 key_metadata: None,
1193 }]
1194 }.try_into().unwrap();
1195 let result = serde_json::to_string(&manifest_list).unwrap();
1196 assert_eq!(
1197 result,
1198 r#"[{"manifest_path":"s3a://icebergdata/demo/s1/t1/metadata/05ffe08b-810f-49b3-a8f4-e88fc99b254a-m0.avro","manifest_length":6926,"partition_spec_id":1,"content":0,"sequence_number":1,"min_sequence_number":1,"added_snapshot_id":377075049360453639,"added_files_count":1,"existing_files_count":0,"deleted_files_count":0,"added_rows_count":3,"existing_rows_count":0,"deleted_rows_count":0,"partitions":[{"contains_null":false,"contains_nan":false,"lower_bound":[1,0,0,0,0,0,0,0],"upper_bound":[1,0,0,0,0,0,0,0]}],"key_metadata":null}]"#
1199 );
1200 }
1201
1202 #[tokio::test]
1203 async fn test_manifest_list_writer_v1() {
1204 let expected_manifest_list = ManifestList {
1205 entries: vec![ManifestFile {
1206 manifest_path: "/opt/bitnami/spark/warehouse/db/table/metadata/10d28031-9739-484c-92db-cdf2975cead4-m0.avro".to_string(),
1207 manifest_length: 5806,
1208 partition_spec_id: 1,
1209 content: ManifestContentType::Data,
1210 sequence_number: 0,
1211 min_sequence_number: 0,
1212 added_snapshot_id: 1646658105718557341,
1213 added_files_count: Some(3),
1214 existing_files_count: Some(0),
1215 deleted_files_count: Some(0),
1216 added_rows_count: Some(3),
1217 existing_rows_count: Some(0),
1218 deleted_rows_count: Some(0),
1219 partitions: Some(
1220 vec![FieldSummary { contains_null: false, contains_nan: Some(false), lower_bound: Some(Datum::long(1).to_bytes().unwrap()), upper_bound: Some(Datum::long(1).to_bytes().unwrap())}],
1221 ),
1222 key_metadata: None,
1223 }]
1224 };
1225
1226 let temp_dir = TempDir::new().unwrap();
1227 let path = temp_dir.path().join("manifest_list_v1.avro");
1228 let io = FileIOBuilder::new_fs_io().build().unwrap();
1229 let output_file = io.new_output(path.to_str().unwrap()).unwrap();
1230
1231 let mut writer = ManifestListWriter::v1(output_file, 1646658105718557341, Some(0));
1232 writer
1233 .add_manifests(expected_manifest_list.entries.clone().into_iter())
1234 .unwrap();
1235 writer.close().await.unwrap();
1236
1237 let bs = fs::read(path).unwrap();
1238
1239 let manifest_list =
1240 ManifestList::parse_with_version(&bs, crate::spec::FormatVersion::V1).unwrap();
1241 assert_eq!(manifest_list, expected_manifest_list);
1242
1243 temp_dir.close().unwrap();
1244 }
1245
1246 #[tokio::test]
1247 async fn test_manifest_list_writer_v2() {
1248 let snapshot_id = 377075049360453639;
1249 let seq_num = 1;
1250 let mut expected_manifest_list = ManifestList {
1251 entries: vec![ManifestFile {
1252 manifest_path: "s3a://icebergdata/demo/s1/t1/metadata/05ffe08b-810f-49b3-a8f4-e88fc99b254a-m0.avro".to_string(),
1253 manifest_length: 6926,
1254 partition_spec_id: 1,
1255 content: ManifestContentType::Data,
1256 sequence_number: UNASSIGNED_SEQUENCE_NUMBER,
1257 min_sequence_number: UNASSIGNED_SEQUENCE_NUMBER,
1258 added_snapshot_id: snapshot_id,
1259 added_files_count: Some(1),
1260 existing_files_count: Some(0),
1261 deleted_files_count: Some(0),
1262 added_rows_count: Some(3),
1263 existing_rows_count: Some(0),
1264 deleted_rows_count: Some(0),
1265 partitions: Some(
1266 vec![FieldSummary { contains_null: false, contains_nan: Some(false), lower_bound: Some(Datum::long(1).to_bytes().unwrap()), upper_bound: Some(Datum::long(1).to_bytes().unwrap())}]
1267 ),
1268 key_metadata: None,
1269 }]
1270 };
1271
1272 let temp_dir = TempDir::new().unwrap();
1273 let path = temp_dir.path().join("manifest_list_v2.avro");
1274 let io = FileIOBuilder::new_fs_io().build().unwrap();
1275 let output_file = io.new_output(path.to_str().unwrap()).unwrap();
1276
1277 let mut writer = ManifestListWriter::v2(output_file, snapshot_id, Some(0), seq_num);
1278 writer
1279 .add_manifests(expected_manifest_list.entries.clone().into_iter())
1280 .unwrap();
1281 writer.close().await.unwrap();
1282
1283 let bs = fs::read(path).unwrap();
1284 let manifest_list =
1285 ManifestList::parse_with_version(&bs, crate::spec::FormatVersion::V2).unwrap();
1286 expected_manifest_list.entries[0].sequence_number = seq_num;
1287 expected_manifest_list.entries[0].min_sequence_number = seq_num;
1288 assert_eq!(manifest_list, expected_manifest_list);
1289
1290 temp_dir.close().unwrap();
1291 }
1292
1293 #[tokio::test]
1294 async fn test_manifest_list_writer_v1_as_v2() {
1295 let expected_manifest_list = ManifestList {
1296 entries: vec![ManifestFile {
1297 manifest_path: "/opt/bitnami/spark/warehouse/db/table/metadata/10d28031-9739-484c-92db-cdf2975cead4-m0.avro".to_string(),
1298 manifest_length: 5806,
1299 partition_spec_id: 1,
1300 content: ManifestContentType::Data,
1301 sequence_number: 0,
1302 min_sequence_number: 0,
1303 added_snapshot_id: 1646658105718557341,
1304 added_files_count: Some(3),
1305 existing_files_count: Some(0),
1306 deleted_files_count: Some(0),
1307 added_rows_count: Some(3),
1308 existing_rows_count: Some(0),
1309 deleted_rows_count: Some(0),
1310 partitions: Some(
1311 vec![FieldSummary { contains_null: false, contains_nan: Some(false), lower_bound: Some(Datum::long(1).to_bytes().unwrap()), upper_bound: Some(Datum::long(1).to_bytes().unwrap())}]
1312 ),
1313 key_metadata: None,
1314 }]
1315 };
1316
1317 let temp_dir = TempDir::new().unwrap();
1318 let path = temp_dir.path().join("manifest_list_v1.avro");
1319 let io = FileIOBuilder::new_fs_io().build().unwrap();
1320 let output_file = io.new_output(path.to_str().unwrap()).unwrap();
1321
1322 let mut writer = ManifestListWriter::v1(output_file, 1646658105718557341, Some(0));
1323 writer
1324 .add_manifests(expected_manifest_list.entries.clone().into_iter())
1325 .unwrap();
1326 writer.close().await.unwrap();
1327
1328 let bs = fs::read(path).unwrap();
1329
1330 let manifest_list =
1331 ManifestList::parse_with_version(&bs, crate::spec::FormatVersion::V2).unwrap();
1332 assert_eq!(manifest_list, expected_manifest_list);
1333
1334 temp_dir.close().unwrap();
1335 }
1336
1337 #[tokio::test]
1338 async fn test_manifest_list_v2_deserializer_aliases() {
1339 let avro_1_path = "testdata/manifests_lists/manifest-list-v2-1.avro";
1341 let bs_1 = fs::read(avro_1_path).unwrap();
1342 let avro_1_fields = read_avro_schema_fields_as_str(bs_1.clone()).await;
1343 assert_eq!(
1344 avro_1_fields,
1345 "manifest_path, manifest_length, partition_spec_id, content, sequence_number, min_sequence_number, added_snapshot_id, added_data_files_count, existing_data_files_count, deleted_data_files_count, added_rows_count, existing_rows_count, deleted_rows_count, partitions"
1346 );
1347 let avro_2_path = "testdata/manifests_lists/manifest-list-v2-2.avro";
1349 let bs_2 = fs::read(avro_2_path).unwrap();
1350 let avro_2_fields = read_avro_schema_fields_as_str(bs_2.clone()).await;
1351 assert_eq!(
1352 avro_2_fields,
1353 "manifest_path, manifest_length, partition_spec_id, content, sequence_number, min_sequence_number, added_snapshot_id, added_files_count, existing_files_count, deleted_files_count, added_rows_count, existing_rows_count, deleted_rows_count, partitions"
1354 );
1355 let _manifest_list_1 =
1357 ManifestList::parse_with_version(&bs_1, crate::spec::FormatVersion::V2).unwrap();
1358 let _manifest_list_2 =
1359 ManifestList::parse_with_version(&bs_2, crate::spec::FormatVersion::V2).unwrap();
1360 }
1361
1362 async fn read_avro_schema_fields_as_str(bs: Vec<u8>) -> String {
1363 let reader = Reader::new(&bs[..]).unwrap();
1364 let schema = reader.writer_schema();
1365 let fields: String = match schema {
1366 Schema::Record(record) => record
1367 .fields
1368 .iter()
1369 .map(|field| field.name.clone())
1370 .collect::<Vec<String>>()
1371 .join(", "),
1372 _ => "".to_string(),
1373 };
1374 fields
1375 }
1376
1377 #[test]
1378 fn test_manifest_content_type_default() {
1379 assert_eq!(ManifestContentType::default(), ManifestContentType::Data);
1380 }
1381
1382 #[test]
1383 fn test_manifest_content_type_default_value() {
1384 assert_eq!(ManifestContentType::default() as i32, 0);
1385 }
1386
1387 #[test]
1388 fn test_manifest_file_v1_to_v2_projection() {
1389 use crate::spec::manifest_list::_serde::ManifestFileV1;
1390
1391 let v1_manifest = ManifestFileV1 {
1393 manifest_path: "/test/manifest.avro".to_string(),
1394 manifest_length: 5806,
1395 partition_spec_id: 0,
1396 added_snapshot_id: 1646658105718557341,
1397 added_data_files_count: Some(3),
1398 existing_data_files_count: Some(0),
1399 deleted_data_files_count: Some(0),
1400 added_rows_count: Some(3),
1401 existing_rows_count: Some(0),
1402 deleted_rows_count: Some(0),
1403 partitions: None,
1404 key_metadata: None,
1405 };
1406
1407 let v2_manifest: ManifestFile = v1_manifest.try_into().unwrap();
1409
1410 assert_eq!(
1412 v2_manifest.content,
1413 ManifestContentType::Data,
1414 "V1 manifest content should default to Data (0)"
1415 );
1416 assert_eq!(
1417 v2_manifest.sequence_number, 0,
1418 "V1 manifest sequence_number should default to 0"
1419 );
1420 assert_eq!(
1421 v2_manifest.min_sequence_number, 0,
1422 "V1 manifest min_sequence_number should default to 0"
1423 );
1424
1425 assert_eq!(v2_manifest.manifest_path, "/test/manifest.avro");
1427 assert_eq!(v2_manifest.manifest_length, 5806);
1428 assert_eq!(v2_manifest.partition_spec_id, 0);
1429 assert_eq!(v2_manifest.added_snapshot_id, 1646658105718557341);
1430 assert_eq!(v2_manifest.added_files_count, Some(3));
1431 assert_eq!(v2_manifest.existing_files_count, Some(0));
1432 assert_eq!(v2_manifest.deleted_files_count, Some(0));
1433 assert_eq!(v2_manifest.added_rows_count, Some(3));
1434 assert_eq!(v2_manifest.existing_rows_count, Some(0));
1435 assert_eq!(v2_manifest.deleted_rows_count, Some(0));
1436 assert_eq!(v2_manifest.partitions, None);
1437 assert_eq!(v2_manifest.key_metadata, None);
1438 }
1439}