// iceberg/spec/manifest/data_file.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::collections::HashMap;
19use std::io::{Read, Write};
20use std::str::FromStr;
21
22use apache_avro::{Reader as AvroReader, Writer as AvroWriter, from_value, to_value};
23use serde_derive::{Deserialize, Serialize};
24use serde_with::{DeserializeFromStr, SerializeDisplay};
25
26use super::_serde::DataFileSerde;
27use super::{Datum, FormatVersion, Schema, data_file_schema_v1, data_file_schema_v2};
28use crate::error::Result;
29use crate::spec::{Struct, StructType};
30use crate::{Error, ErrorKind};
31
/// Data file carries the file path, partition tuple, metrics (record count,
/// file size, per-column statistics), and other file-level metadata tracked
/// by a manifest entry.
///
/// The `field id` noted on each member is the Iceberg spec field id used
/// when (de)serializing this struct to/from Avro manifests.
#[derive(Debug, PartialEq, Clone, Eq, Builder)]
pub struct DataFile {
    /// field id: 134
    ///
    /// Type of content stored by the data file: data, equality deletes,
    /// or position deletes (all v1 files are data files)
    pub(crate) content: DataContentType,
    /// field id: 100
    ///
    /// Full URI for the file with FS scheme
    pub(crate) file_path: String,
    /// field id: 101
    ///
    /// String file format name, `avro`, `orc`, `parquet`, or `puffin`
    pub(crate) file_format: DataFileFormat,
    /// field id: 102
    ///
    /// Partition data tuple, schema based on the partition spec output using
    /// partition field ids for the struct field ids
    pub(crate) partition: Struct,
    /// field id: 103
    ///
    /// Number of records in this file, or the cardinality of a deletion vector
    pub(crate) record_count: u64,
    /// field id: 104
    ///
    /// Total file size in bytes
    pub(crate) file_size_in_bytes: u64,
    /// field id: 108
    /// key field id: 117
    /// value field id: 118
    ///
    /// Map from column id to the total size on disk of all regions that
    /// store the column. Does not include bytes necessary to read other
    /// columns, like footers. Leave null for row-oriented formats (Avro)
    #[builder(default)]
    pub(crate) column_sizes: HashMap<i32, u64>,
    /// field id: 109
    /// key field id: 119
    /// value field id: 120
    ///
    /// Map from column id to number of values in the column (including null
    /// and NaN values)
    #[builder(default)]
    pub(crate) value_counts: HashMap<i32, u64>,
    /// field id: 110
    /// key field id: 121
    /// value field id: 122
    ///
    /// Map from column id to number of null values in the column
    #[builder(default)]
    pub(crate) null_value_counts: HashMap<i32, u64>,
    /// field id: 137
    /// key field id: 138
    /// value field id: 139
    ///
    /// Map from column id to number of NaN values in the column
    #[builder(default)]
    pub(crate) nan_value_counts: HashMap<i32, u64>,
    /// field id: 125
    /// key field id: 126
    /// value field id: 127
    ///
    /// Map from column id to lower bound in the column serialized as binary.
    /// Each value must be less than or equal to all non-null, non-NaN values
    /// in the column for the file.
    ///
    /// Reference:
    ///
    /// - [Binary single-value serialization](https://iceberg.apache.org/spec/#binary-single-value-serialization)
    #[builder(default)]
    pub(crate) lower_bounds: HashMap<i32, Datum>,
    /// field id: 128
    /// key field id: 129
    /// value field id: 130
    ///
    /// Map from column id to upper bound in the column serialized as binary.
    /// Each value must be greater than or equal to all non-null, non-NaN
    /// values in the column for the file.
    ///
    /// Reference:
    ///
    /// - [Binary single-value serialization](https://iceberg.apache.org/spec/#binary-single-value-serialization)
    #[builder(default)]
    pub(crate) upper_bounds: HashMap<i32, Datum>,
    /// field id: 131
    ///
    /// Implementation-specific key metadata for encryption
    #[builder(default)]
    pub(crate) key_metadata: Option<Vec<u8>>,
    /// field id: 132
    /// element field id: 133
    ///
    /// Split offsets for the data file. For example, all row group offsets
    /// in a Parquet file. Must be sorted ascending
    #[builder(default)]
    pub(crate) split_offsets: Vec<i64>,
    /// field id: 135
    /// element field id: 136
    ///
    /// Field ids used to determine row equality in equality delete files.
    /// Required when content is EqualityDeletes and should be null
    /// otherwise. Fields with ids listed in this column must be present
    /// in the delete file
    #[builder(default)]
    pub(crate) equality_ids: Option<Vec<i32>>,
    /// field id: 140
    ///
    /// ID representing sort order for this file.
    ///
    /// If sort order ID is missing or unknown, then the order is assumed to
    /// be unsorted. Only data files and equality delete files should be
    /// written with a non-null order id. Position deletes are required to be
    /// sorted by file and position, not a table order, and should set sort
    /// order id to null. Readers must ignore sort order id for position
    /// delete files.
    #[builder(default, setter(strip_option))]
    pub(crate) sort_order_id: Option<i32>,
    /// field id: 142
    ///
    /// The `_row_id` for the first row in the data file.
    /// For more details, refer to
    /// <https://github.com/apache/iceberg/blob/main/format/spec.md#first-row-id-inheritance>
    #[builder(default)]
    pub(crate) first_row_id: Option<i64>,
    /// Not part of the Iceberg spec: an in-memory-only field recording which
    /// partition spec this file was written with. It is never serialized to
    /// manifests.
    pub(crate) partition_spec_id: i32,
    /// field id: 143
    ///
    /// Fully qualified location (URI with FS scheme) of a data file that all deletes reference.
    /// Position delete metadata can use `referenced_data_file` when all deletes tracked by the
    /// entry are in a single data file. Setting the referenced file is required for deletion vectors.
    #[builder(default)]
    pub(crate) referenced_data_file: Option<String>,
    /// field id: 144
    ///
    /// The offset in the file where the content starts.
    /// The `content_offset` and `content_size_in_bytes` fields are used to reference a specific blob
    /// for direct access to a deletion vector. For deletion vectors, these values are required and must
    /// exactly match the `offset` and `length` stored in the Puffin footer for the deletion vector blob.
    #[builder(default)]
    pub(crate) content_offset: Option<i64>,
    /// field id: 145
    ///
    /// The length of a referenced content stored in the file; required if `content_offset` is present
    #[builder(default)]
    pub(crate) content_size_in_bytes: Option<i64>,
}
181
182impl DataFile {
183    /// Get the content type of the data file (data, equality deletes, or position deletes)
184    pub fn content_type(&self) -> DataContentType {
185        self.content
186    }
187    /// Get the file path as full URI with FS scheme
188    pub fn file_path(&self) -> &str {
189        &self.file_path
190    }
191    /// Get the file format of the file (avro, orc or parquet).
192    pub fn file_format(&self) -> DataFileFormat {
193        self.file_format
194    }
195    /// Get the partition values of the file.
196    pub fn partition(&self) -> &Struct {
197        &self.partition
198    }
199    /// Get the record count in the data file.
200    pub fn record_count(&self) -> u64 {
201        self.record_count
202    }
203    /// Get the file size in bytes.
204    pub fn file_size_in_bytes(&self) -> u64 {
205        self.file_size_in_bytes
206    }
207    /// Get the column sizes.
208    /// Map from column id to the total size on disk of all regions that
209    /// store the column. Does not include bytes necessary to read other
210    /// columns, like footers. Null for row-oriented formats (Avro)
211    pub fn column_sizes(&self) -> &HashMap<i32, u64> {
212        &self.column_sizes
213    }
214    /// Get the columns value counts for the data file.
215    /// Map from column id to number of values in the column (including null
216    /// and NaN values)
217    pub fn value_counts(&self) -> &HashMap<i32, u64> {
218        &self.value_counts
219    }
220    /// Get the null value counts of the data file.
221    /// Map from column id to number of null values in the column
222    pub fn null_value_counts(&self) -> &HashMap<i32, u64> {
223        &self.null_value_counts
224    }
225    /// Get the nan value counts of the data file.
226    /// Map from column id to number of NaN values in the column
227    pub fn nan_value_counts(&self) -> &HashMap<i32, u64> {
228        &self.nan_value_counts
229    }
230    /// Get the lower bounds of the data file values per column.
231    /// Map from column id to lower bound in the column serialized as binary.
232    pub fn lower_bounds(&self) -> &HashMap<i32, Datum> {
233        &self.lower_bounds
234    }
235    /// Get the upper bounds of the data file values per column.
236    /// Map from column id to upper bound in the column serialized as binary.
237    pub fn upper_bounds(&self) -> &HashMap<i32, Datum> {
238        &self.upper_bounds
239    }
240    /// Get the Implementation-specific key metadata for the data file.
241    pub fn key_metadata(&self) -> Option<&[u8]> {
242        self.key_metadata.as_deref()
243    }
244    /// Get the split offsets of the data file.
245    /// For example, all row group offsets in a Parquet file.
246    pub fn split_offsets(&self) -> &[i64] {
247        &self.split_offsets
248    }
249    /// Get the equality ids of the data file.
250    /// Field ids used to determine row equality in equality delete files.
251    /// null when content is not EqualityDeletes.
252    pub fn equality_ids(&self) -> Option<Vec<i32>> {
253        self.equality_ids.clone()
254    }
255    /// Get the first row id in the data file.
256    pub fn first_row_id(&self) -> Option<i64> {
257        self.first_row_id
258    }
259    /// Get the sort order id of the data file.
260    /// Only data files and equality delete files should be
261    /// written with a non-null order id. Position deletes are required to be
262    /// sorted by file and position, not a table order, and should set sort
263    /// order id to null. Readers must ignore sort order id for position
264    /// delete files.
265    pub fn sort_order_id(&self) -> Option<i32> {
266        self.sort_order_id
267    }
268    /// Get the fully qualified referenced location for the corresponding data file.
269    /// Positional delete files could have the field set, and deletion vectors must the field set.
270    pub fn referenced_data_file(&self) -> Option<String> {
271        self.referenced_data_file.clone()
272    }
273    /// Get the offset in the file where the blob content starts.
274    /// Only meaningful for puffin blobs, and required for deletion vectors.
275    pub fn content_offset(&self) -> Option<i64> {
276        self.content_offset
277    }
278    /// Get the length of a puffin blob.
279    /// Only meaningful for puffin blobs, and required for deletion vectors.
280    pub fn content_size_in_bytes(&self) -> Option<i64> {
281        self.content_size_in_bytes
282    }
283}
284
285/// Convert data files to avro bytes and write to writer.
286/// Return the bytes written.
287pub fn write_data_files_to_avro<W: Write>(
288    writer: &mut W,
289    data_files: impl IntoIterator<Item = DataFile>,
290    partition_type: &StructType,
291    version: FormatVersion,
292) -> Result<usize> {
293    let avro_schema = match version {
294        FormatVersion::V1 => data_file_schema_v1(partition_type).unwrap(),
295        FormatVersion::V2 => data_file_schema_v2(partition_type).unwrap(),
296    };
297    let mut writer = AvroWriter::new(&avro_schema, writer);
298
299    for data_file in data_files {
300        let value = to_value(DataFileSerde::try_from(
301            data_file,
302            partition_type,
303            FormatVersion::V1,
304        )?)?
305        .resolve(&avro_schema)?;
306        writer.append(value)?;
307    }
308
309    Ok(writer.flush()?)
310}
311
312/// Parse data files from avro bytes.
313pub fn read_data_files_from_avro<R: Read>(
314    reader: &mut R,
315    schema: &Schema,
316    partition_spec_id: i32,
317    partition_type: &StructType,
318    version: FormatVersion,
319) -> Result<Vec<DataFile>> {
320    let avro_schema = match version {
321        FormatVersion::V1 => data_file_schema_v1(partition_type).unwrap(),
322        FormatVersion::V2 => data_file_schema_v2(partition_type).unwrap(),
323    };
324
325    let reader = AvroReader::with_schema(&avro_schema, reader)?;
326    reader
327        .into_iter()
328        .map(|value| {
329            from_value::<DataFileSerde>(&value?)?.try_into(
330                partition_spec_id,
331                partition_type,
332                schema,
333            )
334        })
335        .collect::<Result<Vec<_>>>()
336}
337
/// Type of content stored by the data file: data, equality deletes, or
/// position deletes (all v1 files are data files)
#[derive(Debug, PartialEq, Eq, Clone, Copy, Serialize, Deserialize, Default)]
pub enum DataContentType {
    /// value: 0
    ///
    /// Regular data rows; the default, and the only content type in v1.
    #[default]
    Data = 0,
    /// value: 1
    ///
    /// Position delete file: rows are deleted by data file and row position.
    PositionDeletes = 1,
    /// value: 2
    ///
    /// Equality delete file: rows are deleted by matching values of the
    /// file's equality field ids.
    EqualityDeletes = 2,
}
350
351impl TryFrom<i32> for DataContentType {
352    type Error = Error;
353
354    fn try_from(v: i32) -> Result<DataContentType> {
355        match v {
356            0 => Ok(DataContentType::Data),
357            1 => Ok(DataContentType::PositionDeletes),
358            2 => Ok(DataContentType::EqualityDeletes),
359            _ => Err(Error::new(
360                ErrorKind::DataInvalid,
361                format!("data content type {} is invalid", v),
362            )),
363        }
364    }
365}
366
/// Format of this data.
///
/// Serialized via its [`std::fmt::Display`] / [`FromStr`] implementations
/// (lowercase names), per the `SerializeDisplay` / `DeserializeFromStr`
/// derives.
#[derive(Debug, PartialEq, Eq, Clone, Copy, SerializeDisplay, DeserializeFromStr)]
pub enum DataFileFormat {
    /// Avro file format: <https://avro.apache.org/>
    Avro,
    /// Orc file format: <https://orc.apache.org/>
    Orc,
    /// Parquet file format: <https://parquet.apache.org/>
    Parquet,
    /// Puffin file format: <https://iceberg.apache.org/puffin-spec/>
    Puffin,
}
379
380impl FromStr for DataFileFormat {
381    type Err = Error;
382
383    fn from_str(s: &str) -> Result<Self> {
384        match s.to_lowercase().as_str() {
385            "avro" => Ok(Self::Avro),
386            "orc" => Ok(Self::Orc),
387            "parquet" => Ok(Self::Parquet),
388            "puffin" => Ok(Self::Puffin),
389            _ => Err(Error::new(
390                ErrorKind::DataInvalid,
391                format!("Unsupported data file format: {}", s),
392            )),
393        }
394    }
395}
396
397impl std::fmt::Display for DataFileFormat {
398    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
399        match self {
400            DataFileFormat::Avro => write!(f, "avro"),
401            DataFileFormat::Orc => write!(f, "orc"),
402            DataFileFormat::Parquet => write!(f, "parquet"),
403            DataFileFormat::Puffin => write!(f, "puffin"),
404        }
405    }
406}
407
#[cfg(test)]
mod test {
    use crate::spec::DataContentType;

    /// The default content type must be `Data`, matching the v1 rule that
    /// all files are data files.
    #[test]
    fn test_data_content_type_default() {
        let default_content = DataContentType::default();
        assert!(matches!(default_content, DataContentType::Data));
    }

    /// The default variant must map to discriminant 0, the spec value for
    /// data content.
    #[test]
    fn test_data_content_type_default_value() {
        let code = DataContentType::default() as i32;
        assert_eq!(code, 0);
    }
}