// File: iceberg/spec/manifest/data_file.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::collections::HashMap;
19use std::io::{Read, Write};
20use std::str::FromStr;
21
22use apache_avro::{Reader as AvroReader, Writer as AvroWriter, from_value, to_value};
23use serde_derive::{Deserialize, Serialize};
24use serde_with::{DeserializeFromStr, SerializeDisplay};
25
26use super::_serde::DataFileSerde;
27use super::{
28    Datum, FormatVersion, Schema, data_file_schema_v1, data_file_schema_v2, data_file_schema_v3,
29};
30use crate::error::Result;
31use crate::spec::{DEFAULT_PARTITION_SPEC_ID, Struct, StructType};
32use crate::{Error, ErrorKind};
33
/// Data file carries data file path, partition tuple, metrics, …
#[derive(Debug, PartialEq, Clone, Eq, Builder)]
pub struct DataFile {
    /// field id: 134
    ///
    /// Type of content stored by the data file: data, equality deletes,
    /// or position deletes (all v1 files are data files)
    pub(crate) content: DataContentType,
    /// field id: 100
    ///
    /// Full URI for the file with FS scheme
    pub(crate) file_path: String,
    /// field id: 101
    ///
    /// String file format name, `avro`, `orc`, `parquet`, or `puffin`
    pub(crate) file_format: DataFileFormat,
    /// field id: 102
    ///
    /// Partition data tuple, schema based on the partition spec output using
    /// partition field ids for the struct field ids
    #[builder(default = "Struct::empty()")]
    pub(crate) partition: Struct,
    /// field id: 103
    ///
    /// Number of records in this file, or the cardinality of a deletion vector
    pub(crate) record_count: u64,
    /// field id: 104
    ///
    /// Total file size in bytes
    pub(crate) file_size_in_bytes: u64,
    /// field id: 108
    /// key field id: 117
    /// value field id: 118
    ///
    /// Map from column id to the total size on disk of all regions that
    /// store the column. Does not include bytes necessary to read other
    /// columns, like footers. Leave null for row-oriented formats (Avro)
    #[builder(default)]
    pub(crate) column_sizes: HashMap<i32, u64>,
    /// field id: 109
    /// key field id: 119
    /// value field id: 120
    ///
    /// Map from column id to number of values in the column (including null
    /// and NaN values)
    #[builder(default)]
    pub(crate) value_counts: HashMap<i32, u64>,
    /// field id: 110
    /// key field id: 121
    /// value field id: 122
    ///
    /// Map from column id to number of null values in the column
    #[builder(default)]
    pub(crate) null_value_counts: HashMap<i32, u64>,
    /// field id: 137
    /// key field id: 138
    /// value field id: 139
    ///
    /// Map from column id to number of NaN values in the column
    #[builder(default)]
    pub(crate) nan_value_counts: HashMap<i32, u64>,
    /// field id: 125
    /// key field id: 126
    /// value field id: 127
    ///
    /// Map from column id to lower bound in the column serialized as binary.
    /// Each value must be less than or equal to all non-null, non-NaN values
    /// in the column for the file.
    ///
    /// Reference:
    ///
    /// - [Binary single-value serialization](https://iceberg.apache.org/spec/#binary-single-value-serialization)
    #[builder(default)]
    pub(crate) lower_bounds: HashMap<i32, Datum>,
    /// field id: 128
    /// key field id: 129
    /// value field id: 130
    ///
    /// Map from column id to upper bound in the column serialized as binary.
    /// Each value must be greater than or equal to all non-null, non-NaN
    /// values in the column for the file.
    ///
    /// Reference:
    ///
    /// - [Binary single-value serialization](https://iceberg.apache.org/spec/#binary-single-value-serialization)
    #[builder(default)]
    pub(crate) upper_bounds: HashMap<i32, Datum>,
    /// field id: 131
    ///
    /// Implementation-specific key metadata for encryption
    #[builder(default)]
    pub(crate) key_metadata: Option<Vec<u8>>,
    /// field id: 132
    /// element field id: 133
    ///
    /// Split offsets for the data file. For example, all row group offsets
    /// in a Parquet file. Must be sorted ascending. Optional field that
    /// should be serialized as null when not present.
    #[builder(default)]
    pub(crate) split_offsets: Option<Vec<i64>>,
    /// field id: 135
    /// element field id: 136
    ///
    /// Field ids used to determine row equality in equality delete files.
    /// Required when content is EqualityDeletes and should be null
    /// otherwise. Fields with ids listed in this column must be present
    /// in the delete file
    #[builder(default)]
    pub(crate) equality_ids: Option<Vec<i32>>,
    /// field id: 140
    ///
    /// ID representing sort order for this file.
    ///
    /// If sort order ID is missing or unknown, then the order is assumed to
    /// be unsorted. Only data files and equality delete files should be
    /// written with a non-null order id. Position deletes are required to be
    /// sorted by file and position, not a table order, and should set sort
    /// order id to null. Readers must ignore sort order id for position
    /// delete files.
    #[builder(default, setter(strip_option))]
    pub(crate) sort_order_id: Option<i32>,
    /// field id: 142
    ///
    /// The _row_id for the first row in the data file.
    /// For more details, refer to
    /// <https://github.com/apache/iceberg/blob/main/format/spec.md#first-row-id-inheritance>
    #[builder(default)]
    pub(crate) first_row_id: Option<i64>,
    /// This field is not part of the Iceberg spec; it only exists in the
    /// in-memory representation, tracking which partition spec the
    /// `partition` tuple was written with.
    #[builder(default = "DEFAULT_PARTITION_SPEC_ID")]
    pub(crate) partition_spec_id: i32,
    /// field id: 143
    ///
    /// Fully qualified location (URI with FS scheme) of a data file that all deletes reference.
    /// Position delete metadata can use `referenced_data_file` when all deletes tracked by the
    /// entry are in a single data file. Setting the referenced file is required for deletion vectors.
    #[builder(default)]
    pub(crate) referenced_data_file: Option<String>,
    /// field id: 144
    ///
    /// The offset in the file where the content starts.
    /// The `content_offset` and `content_size_in_bytes` fields are used to reference a specific blob
    /// for direct access to a deletion vector. For deletion vectors, these values are required and must
    /// exactly match the `offset` and `length` stored in the Puffin footer for the deletion vector blob.
    #[builder(default)]
    pub(crate) content_offset: Option<i64>,
    /// field id: 145
    ///
    /// The length of a referenced content stored in the file; required if `content_offset` is present
    #[builder(default)]
    pub(crate) content_size_in_bytes: Option<i64>,
}
186
187impl DataFile {
188    /// Get the content type of the data file (data, equality deletes, or position deletes)
189    pub fn content_type(&self) -> DataContentType {
190        self.content
191    }
192    /// Get the file path as full URI with FS scheme
193    pub fn file_path(&self) -> &str {
194        &self.file_path
195    }
196    /// Get the file format of the file (avro, orc or parquet).
197    pub fn file_format(&self) -> DataFileFormat {
198        self.file_format
199    }
200    /// Get the partition values of the file.
201    pub fn partition(&self) -> &Struct {
202        &self.partition
203    }
204    /// Get the record count in the data file.
205    pub fn record_count(&self) -> u64 {
206        self.record_count
207    }
208    /// Get the file size in bytes.
209    pub fn file_size_in_bytes(&self) -> u64 {
210        self.file_size_in_bytes
211    }
212    /// Get the column sizes.
213    /// Map from column id to the total size on disk of all regions that
214    /// store the column. Does not include bytes necessary to read other
215    /// columns, like footers. Null for row-oriented formats (Avro)
216    pub fn column_sizes(&self) -> &HashMap<i32, u64> {
217        &self.column_sizes
218    }
219    /// Get the columns value counts for the data file.
220    /// Map from column id to number of values in the column (including null
221    /// and NaN values)
222    pub fn value_counts(&self) -> &HashMap<i32, u64> {
223        &self.value_counts
224    }
225    /// Get the null value counts of the data file.
226    /// Map from column id to number of null values in the column
227    pub fn null_value_counts(&self) -> &HashMap<i32, u64> {
228        &self.null_value_counts
229    }
230    /// Get the nan value counts of the data file.
231    /// Map from column id to number of NaN values in the column
232    pub fn nan_value_counts(&self) -> &HashMap<i32, u64> {
233        &self.nan_value_counts
234    }
235    /// Get the lower bounds of the data file values per column.
236    /// Map from column id to lower bound in the column serialized as binary.
237    pub fn lower_bounds(&self) -> &HashMap<i32, Datum> {
238        &self.lower_bounds
239    }
240    /// Get the upper bounds of the data file values per column.
241    /// Map from column id to upper bound in the column serialized as binary.
242    pub fn upper_bounds(&self) -> &HashMap<i32, Datum> {
243        &self.upper_bounds
244    }
245    /// Get the Implementation-specific key metadata for the data file.
246    pub fn key_metadata(&self) -> Option<&[u8]> {
247        self.key_metadata.as_deref()
248    }
249    /// Get the split offsets of the data file.
250    /// For example, all row group offsets in a Parquet file.
251    /// Returns `None` if no split offsets are present.
252    pub fn split_offsets(&self) -> Option<&[i64]> {
253        self.split_offsets.as_deref()
254    }
255    /// Get the equality ids of the data file.
256    /// Field ids used to determine row equality in equality delete files.
257    /// null when content is not EqualityDeletes.
258    pub fn equality_ids(&self) -> Option<Vec<i32>> {
259        self.equality_ids.clone()
260    }
261    /// Get the first row id in the data file.
262    pub fn first_row_id(&self) -> Option<i64> {
263        self.first_row_id
264    }
265    /// Get the sort order id of the data file.
266    /// Only data files and equality delete files should be
267    /// written with a non-null order id. Position deletes are required to be
268    /// sorted by file and position, not a table order, and should set sort
269    /// order id to null. Readers must ignore sort order id for position
270    /// delete files.
271    pub fn sort_order_id(&self) -> Option<i32> {
272        self.sort_order_id
273    }
274    /// Get the fully qualified referenced location for the corresponding data file.
275    /// Positional delete files could have the field set, and deletion vectors must the field set.
276    pub fn referenced_data_file(&self) -> Option<String> {
277        self.referenced_data_file.clone()
278    }
279    /// Get the offset in the file where the blob content starts.
280    /// Only meaningful for puffin blobs, and required for deletion vectors.
281    pub fn content_offset(&self) -> Option<i64> {
282        self.content_offset
283    }
284    /// Get the length of a puffin blob.
285    /// Only meaningful for puffin blobs, and required for deletion vectors.
286    pub fn content_size_in_bytes(&self) -> Option<i64> {
287        self.content_size_in_bytes
288    }
289}
290
291/// Convert data files to avro bytes and write to writer.
292/// Return the bytes written.
293pub fn write_data_files_to_avro<W: Write>(
294    writer: &mut W,
295    data_files: impl IntoIterator<Item = DataFile>,
296    partition_type: &StructType,
297    version: FormatVersion,
298) -> Result<usize> {
299    let avro_schema = match version {
300        FormatVersion::V1 => data_file_schema_v1(partition_type).unwrap(),
301        FormatVersion::V2 => data_file_schema_v2(partition_type).unwrap(),
302        FormatVersion::V3 => data_file_schema_v3(partition_type).unwrap(),
303    };
304    let mut writer = AvroWriter::new(&avro_schema, writer);
305
306    for data_file in data_files {
307        let value = to_value(DataFileSerde::try_from(
308            data_file,
309            partition_type,
310            FormatVersion::V1,
311        )?)?
312        .resolve(&avro_schema)?;
313        writer.append(value)?;
314    }
315
316    Ok(writer.flush()?)
317}
318
319/// Parse data files from avro bytes.
320pub fn read_data_files_from_avro<R: Read>(
321    reader: &mut R,
322    schema: &Schema,
323    partition_spec_id: i32,
324    partition_type: &StructType,
325    version: FormatVersion,
326) -> Result<Vec<DataFile>> {
327    let avro_schema = match version {
328        FormatVersion::V1 => data_file_schema_v1(partition_type).unwrap(),
329        FormatVersion::V2 => data_file_schema_v2(partition_type).unwrap(),
330        FormatVersion::V3 => data_file_schema_v3(partition_type).unwrap(),
331    };
332
333    let reader = AvroReader::with_schema(&avro_schema, reader)?;
334    reader
335        .into_iter()
336        .map(|value| {
337            from_value::<DataFileSerde>(&value?)?.try_into(
338                partition_spec_id,
339                partition_type,
340                schema,
341            )
342        })
343        .collect::<Result<Vec<_>>>()
344}
345
/// Type of content stored by the data file: data, equality deletes, or
/// position deletes (all v1 files are data files)
#[derive(Debug, PartialEq, Eq, Clone, Copy, Serialize, Deserialize, Default)]
pub enum DataContentType {
    /// value: 0
    ///
    /// Regular data rows. The default variant, since all v1 files are data files.
    #[default]
    Data = 0,
    /// value: 1
    ///
    /// Position delete file: deletes are tracked by data file path and row position.
    PositionDeletes = 1,
    /// value: 2
    ///
    /// Equality delete file: deletes are tracked by the values of the fields
    /// listed in the data file's `equality_ids`.
    EqualityDeletes = 2,
}
358
359impl TryFrom<i32> for DataContentType {
360    type Error = Error;
361
362    fn try_from(v: i32) -> Result<DataContentType> {
363        match v {
364            0 => Ok(DataContentType::Data),
365            1 => Ok(DataContentType::PositionDeletes),
366            2 => Ok(DataContentType::EqualityDeletes),
367            _ => Err(Error::new(
368                ErrorKind::DataInvalid,
369                format!("data content type {v} is invalid"),
370            )),
371        }
372    }
373}
374
/// Format of this data.
///
/// Serialized as its lowercase name via the `Display`/`FromStr`
/// implementations below (see the `SerializeDisplay`/`DeserializeFromStr`
/// derives).
#[derive(Debug, PartialEq, Eq, Clone, Copy, SerializeDisplay, DeserializeFromStr)]
pub enum DataFileFormat {
    /// Avro file format: <https://avro.apache.org/>
    Avro,
    /// Orc file format: <https://orc.apache.org/>
    Orc,
    /// Parquet file format: <https://parquet.apache.org/>
    Parquet,
    /// Puffin file format: <https://iceberg.apache.org/puffin-spec/>
    Puffin,
}
387
388impl FromStr for DataFileFormat {
389    type Err = Error;
390
391    fn from_str(s: &str) -> Result<Self> {
392        match s.to_lowercase().as_str() {
393            "avro" => Ok(Self::Avro),
394            "orc" => Ok(Self::Orc),
395            "parquet" => Ok(Self::Parquet),
396            "puffin" => Ok(Self::Puffin),
397            _ => Err(Error::new(
398                ErrorKind::DataInvalid,
399                format!("Unsupported data file format: {s}"),
400            )),
401        }
402    }
403}
404
405impl std::fmt::Display for DataFileFormat {
406    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
407        match self {
408            DataFileFormat::Avro => write!(f, "avro"),
409            DataFileFormat::Orc => write!(f, "orc"),
410            DataFileFormat::Parquet => write!(f, "parquet"),
411            DataFileFormat::Puffin => write!(f, "puffin"),
412        }
413    }
414}
415
#[cfg(test)]
mod test {
    use crate::spec::DataContentType;

    /// The default content type must be `Data`: in format v1 every file is
    /// a data file, so it is the only sensible default.
    #[test]
    fn test_data_content_type_default() {
        let default_type = DataContentType::default();
        assert_eq!(default_type, DataContentType::Data);
    }

    /// `Data` carries the explicit discriminant 0, matching the spec's
    /// content-type encoding.
    #[test]
    fn test_data_content_type_default_value() {
        let code = DataContentType::default() as i32;
        assert_eq!(code, 0);
    }
}