1use std::cmp::min;
19
20use apache_avro::{Writer as AvroWriter, to_value};
21use bytes::Bytes;
22use itertools::Itertools;
23use serde_json::to_vec;
24
25use super::{
26 Datum, FormatVersion, ManifestContentType, PartitionSpec, PrimitiveType,
27 UNASSIGNED_SEQUENCE_NUMBER,
28};
29use crate::error::Result;
30use crate::io::OutputFile;
31use crate::spec::manifest::_serde::{ManifestEntryV1, ManifestEntryV2};
32use crate::spec::manifest::{manifest_schema_v1, manifest_schema_v2};
33use crate::spec::{
34 DataContentType, DataFile, FieldSummary, ManifestEntry, ManifestFile, ManifestMetadata,
35 ManifestStatus, PrimitiveLiteral, SchemaRef, StructType, UNASSIGNED_SNAPSHOT_ID,
36};
37use crate::{Error, ErrorKind};
38
/// Collects the inputs needed to create a [`ManifestWriter`].
///
/// The manifest format version and content type are not stored here; they are
/// selected by which `build_*` method is called.
pub struct ManifestWriterBuilder {
    /// Destination the serialized manifest bytes are written to.
    output: OutputFile,
    /// Snapshot id handed to the writer (stamped onto added/deleted entries).
    snapshot_id: Option<i64>,
    /// Optional key metadata, passed through to the resulting `ManifestFile`.
    key_metadata: Option<Vec<u8>>,
    /// Table schema recorded in the manifest metadata.
    schema: SchemaRef,
    /// Partition spec recorded in the manifest metadata; also used to derive the
    /// partition type when writing.
    partition_spec: PartitionSpec,
}
47
48impl ManifestWriterBuilder {
49 pub fn new(
51 output: OutputFile,
52 snapshot_id: Option<i64>,
53 key_metadata: Option<Vec<u8>>,
54 schema: SchemaRef,
55 partition_spec: PartitionSpec,
56 ) -> Self {
57 Self {
58 output,
59 snapshot_id,
60 key_metadata,
61 schema,
62 partition_spec,
63 }
64 }
65
66 pub fn build_v1(self) -> ManifestWriter {
68 let metadata = ManifestMetadata::builder()
69 .schema_id(self.schema.schema_id())
70 .schema(self.schema)
71 .partition_spec(self.partition_spec)
72 .format_version(FormatVersion::V1)
73 .content(ManifestContentType::Data)
74 .build();
75 ManifestWriter::new(self.output, self.snapshot_id, self.key_metadata, metadata)
76 }
77
78 pub fn build_v2_data(self) -> ManifestWriter {
80 let metadata = ManifestMetadata::builder()
81 .schema_id(self.schema.schema_id())
82 .schema(self.schema)
83 .partition_spec(self.partition_spec)
84 .format_version(FormatVersion::V2)
85 .content(ManifestContentType::Data)
86 .build();
87 ManifestWriter::new(self.output, self.snapshot_id, self.key_metadata, metadata)
88 }
89
90 pub fn build_v2_deletes(self) -> ManifestWriter {
92 let metadata = ManifestMetadata::builder()
93 .schema_id(self.schema.schema_id())
94 .schema(self.schema)
95 .partition_spec(self.partition_spec)
96 .format_version(FormatVersion::V2)
97 .content(ManifestContentType::Deletes)
98 .build();
99 ManifestWriter::new(self.output, self.snapshot_id, self.key_metadata, metadata)
100 }
101}
102
/// Buffers manifest entries and serializes them into a single Avro manifest file.
///
/// Entries are held in memory until `write_manifest_file` is called, which writes
/// them to `output` and returns the corresponding `ManifestFile` summary.
pub struct ManifestWriter {
    /// Destination the serialized manifest bytes are written to.
    output: OutputFile,

    /// Snapshot id stamped onto entries added through the "added"/"deleted" paths and
    /// reported as the manifest's `added_snapshot_id`.
    snapshot_id: Option<i64>,

    // Per-status counters (number of files and sum of their `record_count`), updated for
    // every buffered entry.
    added_files: u32,
    added_rows: u64,
    existing_files: u32,
    existing_rows: u64,
    deleted_files: u32,
    deleted_rows: u64,

    /// Smallest data sequence number seen among live (non-deleted) entries that carry one;
    /// `None` until such an entry is added.
    min_seq_num: Option<i64>,

    /// Optional key metadata, passed through to the resulting `ManifestFile`.
    key_metadata: Option<Vec<u8>>,

    /// Entries buffered in insertion order until the manifest is written.
    manifest_entries: Vec<ManifestEntry>,

    /// Schema, partition spec, format version and content type of the manifest.
    metadata: ManifestMetadata,
}
124
125impl ManifestWriter {
126 pub(crate) fn new(
128 output: OutputFile,
129 snapshot_id: Option<i64>,
130 key_metadata: Option<Vec<u8>>,
131 metadata: ManifestMetadata,
132 ) -> Self {
133 Self {
134 output,
135 snapshot_id,
136 added_files: 0,
137 added_rows: 0,
138 existing_files: 0,
139 existing_rows: 0,
140 deleted_files: 0,
141 deleted_rows: 0,
142 min_seq_num: None,
143 key_metadata,
144 manifest_entries: Vec::new(),
145 metadata,
146 }
147 }
148
149 fn construct_partition_summaries(
150 &mut self,
151 partition_type: &StructType,
152 ) -> Result<Vec<FieldSummary>> {
153 let mut field_stats: Vec<_> = partition_type
154 .fields()
155 .iter()
156 .map(|f| PartitionFieldStats::new(f.field_type.as_primitive_type().unwrap().clone()))
157 .collect();
158 for partition in self.manifest_entries.iter().map(|e| &e.data_file.partition) {
159 for (literal, stat) in partition.iter().zip_eq(field_stats.iter_mut()) {
160 let primitive_literal = literal.map(|v| v.as_primitive_literal().unwrap());
161 stat.update(primitive_literal)?;
162 }
163 }
164 Ok(field_stats.into_iter().map(|stat| stat.finish()).collect())
165 }
166
167 fn check_data_file(&self, data_file: &DataFile) -> Result<()> {
168 match self.metadata.content {
169 ManifestContentType::Data => {
170 if data_file.content != DataContentType::Data {
171 return Err(Error::new(
172 ErrorKind::DataInvalid,
173 format!(
174 "Date file at path {} with manifest content type `data`, should have DataContentType `Data`, but has `{:?}`",
175 data_file.file_path(),
176 data_file.content
177 ),
178 ));
179 }
180 }
181 ManifestContentType::Deletes => {
182 if data_file.content != DataContentType::EqualityDeletes
183 && data_file.content != DataContentType::PositionDeletes
184 {
185 return Err(Error::new(
186 ErrorKind::DataInvalid,
187 format!(
188 "Date file at path {} with manifest content type `deletes`, should have DataContentType `Data`, but has `{:?}`",
189 data_file.file_path(),
190 data_file.content
191 ),
192 ));
193 }
194 }
195 }
196 Ok(())
197 }
198
199 pub(crate) fn add_entry(&mut self, mut entry: ManifestEntry) -> Result<()> {
205 self.check_data_file(&entry.data_file)?;
206 if entry.sequence_number().is_some_and(|n| n >= 0) {
207 entry.status = ManifestStatus::Added;
208 entry.snapshot_id = self.snapshot_id;
209 entry.file_sequence_number = None;
210 } else {
211 entry.status = ManifestStatus::Added;
212 entry.snapshot_id = self.snapshot_id;
213 entry.sequence_number = None;
214 entry.file_sequence_number = None;
215 };
216 self.add_entry_inner(entry)?;
217 Ok(())
218 }
219
220 pub fn add_file(&mut self, data_file: DataFile, sequence_number: i64) -> Result<()> {
224 self.check_data_file(&data_file)?;
225 let entry = ManifestEntry {
226 status: ManifestStatus::Added,
227 snapshot_id: self.snapshot_id,
228 sequence_number: (sequence_number >= 0).then_some(sequence_number),
229 file_sequence_number: None,
230 data_file,
231 };
232 self.add_entry_inner(entry)?;
233 Ok(())
234 }
235
236 #[allow(dead_code)]
243 pub(crate) fn add_delete_entry(&mut self, mut entry: ManifestEntry) -> Result<()> {
244 self.check_data_file(&entry.data_file)?;
245 entry.status = ManifestStatus::Deleted;
246 entry.snapshot_id = self.snapshot_id;
247 self.add_entry_inner(entry)?;
248 Ok(())
249 }
250
251 pub fn add_delete_file(
255 &mut self,
256 data_file: DataFile,
257 sequence_number: i64,
258 file_sequence_number: Option<i64>,
259 ) -> Result<()> {
260 self.check_data_file(&data_file)?;
261 let entry = ManifestEntry {
262 status: ManifestStatus::Deleted,
263 snapshot_id: self.snapshot_id,
264 sequence_number: Some(sequence_number),
265 file_sequence_number,
266 data_file,
267 };
268 self.add_entry_inner(entry)?;
269 Ok(())
270 }
271
272 #[allow(dead_code)]
278 pub(crate) fn add_existing_entry(&mut self, mut entry: ManifestEntry) -> Result<()> {
279 self.check_data_file(&entry.data_file)?;
280 entry.status = ManifestStatus::Existing;
281 self.add_entry_inner(entry)?;
282 Ok(())
283 }
284
285 pub fn add_existing_file(
288 &mut self,
289 data_file: DataFile,
290 snapshot_id: i64,
291 sequence_number: i64,
292 file_sequence_number: Option<i64>,
293 ) -> Result<()> {
294 self.check_data_file(&data_file)?;
295 let entry = ManifestEntry {
296 status: ManifestStatus::Existing,
297 snapshot_id: Some(snapshot_id),
298 sequence_number: Some(sequence_number),
299 file_sequence_number,
300 data_file,
301 };
302 self.add_entry_inner(entry)?;
303 Ok(())
304 }
305
306 fn add_entry_inner(&mut self, entry: ManifestEntry) -> Result<()> {
307 if (entry.status == ManifestStatus::Deleted || entry.status == ManifestStatus::Existing)
309 && (entry.sequence_number.is_none() || entry.file_sequence_number.is_none())
310 {
311 return Err(Error::new(
312 ErrorKind::DataInvalid,
313 "Manifest entry with status Existing or Deleted should have sequence number",
314 ));
315 }
316
317 match entry.status {
319 ManifestStatus::Added => {
320 self.added_files += 1;
321 self.added_rows += entry.data_file.record_count;
322 }
323 ManifestStatus::Deleted => {
324 self.deleted_files += 1;
325 self.deleted_rows += entry.data_file.record_count;
326 }
327 ManifestStatus::Existing => {
328 self.existing_files += 1;
329 self.existing_rows += entry.data_file.record_count;
330 }
331 }
332 if entry.is_alive() {
333 if let Some(seq_num) = entry.sequence_number {
334 self.min_seq_num = Some(self.min_seq_num.map_or(seq_num, |v| min(v, seq_num)));
335 }
336 }
337 self.manifest_entries.push(entry);
338 Ok(())
339 }
340
341 pub async fn write_manifest_file(mut self) -> Result<ManifestFile> {
343 let partition_type = self
345 .metadata
346 .partition_spec
347 .partition_type(&self.metadata.schema)?;
348 let table_schema = &self.metadata.schema;
349 let avro_schema = match self.metadata.format_version {
350 FormatVersion::V1 => manifest_schema_v1(&partition_type)?,
351 FormatVersion::V2 => manifest_schema_v2(&partition_type)?,
352 };
353 let mut avro_writer = AvroWriter::new(&avro_schema, Vec::new());
354 avro_writer.add_user_metadata(
355 "schema".to_string(),
356 to_vec(table_schema).map_err(|err| {
357 Error::new(ErrorKind::DataInvalid, "Fail to serialize table schema")
358 .with_source(err)
359 })?,
360 )?;
361 avro_writer.add_user_metadata(
362 "schema-id".to_string(),
363 table_schema.schema_id().to_string(),
364 )?;
365 avro_writer.add_user_metadata(
366 "partition-spec".to_string(),
367 to_vec(&self.metadata.partition_spec.fields()).map_err(|err| {
368 Error::new(ErrorKind::DataInvalid, "Fail to serialize partition spec")
369 .with_source(err)
370 })?,
371 )?;
372 avro_writer.add_user_metadata(
373 "partition-spec-id".to_string(),
374 self.metadata.partition_spec.spec_id().to_string(),
375 )?;
376 avro_writer.add_user_metadata(
377 "format-version".to_string(),
378 (self.metadata.format_version as u8).to_string(),
379 )?;
380 if self.metadata.format_version == FormatVersion::V2 {
381 avro_writer
382 .add_user_metadata("content".to_string(), self.metadata.content.to_string())?;
383 }
384
385 let partition_summary = self.construct_partition_summaries(&partition_type)?;
386 for entry in std::mem::take(&mut self.manifest_entries) {
388 let value = match self.metadata.format_version {
389 FormatVersion::V1 => to_value(ManifestEntryV1::try_from(entry, &partition_type)?)?
390 .resolve(&avro_schema)?,
391 FormatVersion::V2 => to_value(ManifestEntryV2::try_from(entry, &partition_type)?)?
392 .resolve(&avro_schema)?,
393 };
394
395 avro_writer.append(value)?;
396 }
397
398 let content = avro_writer.into_inner()?;
399 let length = content.len();
400 self.output.write(Bytes::from(content)).await?;
401
402 Ok(ManifestFile {
403 manifest_path: self.output.location().to_string(),
404 manifest_length: length as i64,
405 partition_spec_id: self.metadata.partition_spec.spec_id(),
406 content: self.metadata.content,
407 sequence_number: UNASSIGNED_SEQUENCE_NUMBER,
410 min_sequence_number: self.min_seq_num.unwrap_or(UNASSIGNED_SEQUENCE_NUMBER),
411 added_snapshot_id: self.snapshot_id.unwrap_or(UNASSIGNED_SNAPSHOT_ID),
412 added_files_count: Some(self.added_files),
413 existing_files_count: Some(self.existing_files),
414 deleted_files_count: Some(self.deleted_files),
415 added_rows_count: Some(self.added_rows),
416 existing_rows_count: Some(self.existing_rows),
417 deleted_rows_count: Some(self.deleted_rows),
418 partitions: Some(partition_summary),
419 key_metadata: self.key_metadata,
420 })
421 }
422}
423
/// Running statistics for one partition field, folded into a `FieldSummary` by `finish`.
struct PartitionFieldStats {
    /// Primitive type of the partition field; values are checked against it.
    partition_type: PrimitiveType,

    /// Set once any null value has been seen.
    contains_null: bool,
    /// Starts as `Some(false)`; becomes `Some(true)` once a NaN value has been seen.
    contains_nan: Option<bool>,
    /// Smallest non-null, non-NaN value seen so far.
    lower_bound: Option<Datum>,
    /// Largest non-null, non-NaN value seen so far.
    upper_bound: Option<Datum>,
}
432
433impl PartitionFieldStats {
434 pub(crate) fn new(partition_type: PrimitiveType) -> Self {
435 Self {
436 partition_type,
437 contains_null: false,
438 contains_nan: Some(false),
439 upper_bound: None,
440 lower_bound: None,
441 }
442 }
443
444 pub(crate) fn update(&mut self, value: Option<PrimitiveLiteral>) -> Result<()> {
445 let Some(value) = value else {
446 self.contains_null = true;
447 return Ok(());
448 };
449 if !self.partition_type.compatible(&value) {
450 return Err(Error::new(
451 ErrorKind::DataInvalid,
452 "value is not compatible with type",
453 ));
454 }
455 let value = Datum::new(self.partition_type.clone(), value);
456
457 if value.is_nan() {
458 self.contains_nan = Some(true);
459 return Ok(());
460 }
461
462 self.lower_bound = Some(self.lower_bound.take().map_or(value.clone(), |original| {
463 if value < original {
464 value.clone()
465 } else {
466 original
467 }
468 }));
469 self.upper_bound = Some(self.upper_bound.take().map_or(value.clone(), |original| {
470 if value > original { value } else { original }
471 }));
472
473 Ok(())
474 }
475
476 pub(crate) fn finish(self) -> FieldSummary {
477 FieldSummary {
478 contains_null: self.contains_null,
479 contains_nan: self.contains_nan,
480 upper_bound: self.upper_bound.map(|v| v.to_bytes().unwrap()),
481 lower_bound: self.lower_bound.map(|v| v.to_bytes().unwrap()),
482 }
483 }
484}
485
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::fs;
    use std::sync::Arc;

    use tempfile::TempDir;

    use super::*;
    use crate::io::FileIOBuilder;
    use crate::spec::{DataFileFormat, Manifest, NestedField, PrimitiveType, Schema, Struct, Type};

    /// Builds an entry over one fixed Parquet data file. Only the entry-level `status` and
    /// `snapshot_id` differ between callers; both sequence numbers are `Some(1)`.
    fn sample_entry(status: ManifestStatus, snapshot_id: Option<i64>) -> ManifestEntry {
        let data_file = DataFile {
            content: DataContentType::Data,
            file_path: "s3a://icebergdata/demo/s1/t1/data/00000-0-ba56fbfa-f2ff-40c9-bb27-565ad6dc2be8-00000.parquet".to_string(),
            file_format: DataFileFormat::Parquet,
            partition: Struct::empty(),
            record_count: 1,
            file_size_in_bytes: 5442,
            column_sizes: HashMap::from([(1, 61), (2, 73)]),
            value_counts: HashMap::from([(1, 1), (2, 1)]),
            null_value_counts: HashMap::from([(1, 0), (2, 0)]),
            nan_value_counts: HashMap::new(),
            lower_bounds: HashMap::new(),
            upper_bounds: HashMap::new(),
            key_metadata: Some(Vec::new()),
            split_offsets: vec![4],
            equality_ids: None,
            sort_order_id: None,
            partition_spec_id: 0,
            first_row_id: None,
            referenced_data_file: None,
            content_offset: None,
            content_size_in_bytes: None,
        };
        ManifestEntry {
            status,
            snapshot_id,
            sequence_number: Some(1),
            file_sequence_number: Some(1),
            data_file,
        }
    }

    #[tokio::test]
    async fn test_add_delete_existing() {
        let fields = vec![
            Arc::new(NestedField::optional(
                1,
                "id",
                Type::Primitive(PrimitiveType::Int),
            )),
            Arc::new(NestedField::optional(
                2,
                "name",
                Type::Primitive(PrimitiveType::String),
            )),
        ];
        let schema = Arc::new(Schema::builder().with_fields(fields).build().unwrap());
        let metadata = ManifestMetadata {
            schema_id: 0,
            schema: schema.clone(),
            partition_spec: PartitionSpec::builder(schema)
                .with_spec_id(0)
                .build()
                .unwrap(),
            content: ManifestContentType::Data,
            format_version: FormatVersion::V2,
        };
        let entries = vec![
            sample_entry(ManifestStatus::Added, None),
            sample_entry(ManifestStatus::Deleted, Some(1)),
            sample_entry(ManifestStatus::Existing, Some(1)),
        ];

        let tmp_dir = TempDir::new().unwrap();
        let path = tmp_dir.path().join("test_manifest.avro");
        let io = FileIOBuilder::new_fs_io().build().unwrap();
        let output_file = io.new_output(path.to_str().unwrap()).unwrap();
        let mut writer = ManifestWriterBuilder::new(
            output_file,
            Some(3),
            None,
            metadata.schema.clone(),
            metadata.partition_spec.clone(),
        )
        .build_v2_data();
        writer.add_entry(entries[0].clone()).unwrap();
        writer.add_delete_entry(entries[1].clone()).unwrap();
        writer.add_existing_entry(entries[2].clone()).unwrap();
        writer.write_manifest_file().await.unwrap();

        let bytes = fs::read(path).expect("read_file must succeed");
        let actual_manifest = Manifest::parse_avro(bytes.as_slice()).unwrap();

        // The writer stamps its own snapshot id (3) onto the added and deleted entries and
        // clears the added entry's file sequence number; the existing entry is unchanged.
        let mut expected_entries = entries;
        expected_entries[0].snapshot_id = Some(3);
        expected_entries[0].file_sequence_number = None;
        expected_entries[1].snapshot_id = Some(3);
        assert_eq!(actual_manifest, Manifest::new(metadata, expected_entries));
    }
}