iceberg/spec/manifest/data_file.rs
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::collections::HashMap;
use std::io::{Read, Write};
use std::str::FromStr;

use apache_avro::{Reader as AvroReader, Writer as AvroWriter, from_value, to_value};
use derive_builder::Builder;
use serde_derive::{Deserialize, Serialize};
use serde_with::{DeserializeFromStr, SerializeDisplay};

use super::_serde::DataFileSerde;
use super::{Datum, FormatVersion, Schema, data_file_schema_v1, data_file_schema_v2};
use crate::error::Result;
use crate::spec::{Struct, StructType};
use crate::{Error, ErrorKind};

/// Data file carries data file path, partition tuple, metrics, …
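///
/// # Example
///
/// A minimal sketch of constructing a `DataFile` via the derived builder,
/// assuming the `derive_builder`-generated `DataFileBuilder` and an
/// unpartitioned table (empty partition tuple). It is illustrative rather
/// than a prescribed API, so it is marked `ignore`.
///
/// ```ignore
/// use iceberg::spec::{DataContentType, DataFileFormat, Struct};
///
/// let data_file = DataFileBuilder::default()
///     .content(DataContentType::Data)
///     .file_path("s3://bucket/path/to/file.parquet".to_string())
///     .file_format(DataFileFormat::Parquet)
///     .partition(Struct::empty())
///     .record_count(100)
///     .file_size_in_bytes(1024)
///     .partition_spec_id(0)
///     .build()
///     .unwrap();
/// ```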
#[derive(Debug, PartialEq, Clone, Eq, Builder)]
pub struct DataFile {
    /// field id: 134
    ///
    /// Type of content stored by the data file: data, equality deletes,
    /// or position deletes (all v1 files are data files)
    pub(crate) content: DataContentType,
    /// field id: 100
    ///
    /// Full URI for the file with FS scheme
    pub(crate) file_path: String,
    /// field id: 101
    ///
    /// String file format name, `avro`, `orc`, `parquet`, or `puffin`
    pub(crate) file_format: DataFileFormat,
    /// field id: 102
    ///
    /// Partition data tuple, schema based on the partition spec output using
    /// partition field ids for the struct field ids
    pub(crate) partition: Struct,
    /// field id: 103
    ///
    /// Number of records in this file, or the cardinality of a deletion vector
    pub(crate) record_count: u64,
    /// field id: 104
    ///
    /// Total file size in bytes
    pub(crate) file_size_in_bytes: u64,
    /// field id: 108
    /// key field id: 117
    /// value field id: 118
    ///
    /// Map from column id to the total size on disk of all regions that
    /// store the column. Does not include bytes necessary to read other
    /// columns, like footers. Leave null for row-oriented formats (Avro)
    #[builder(default)]
    pub(crate) column_sizes: HashMap<i32, u64>,
    /// field id: 109
    /// key field id: 119
    /// value field id: 120
    ///
    /// Map from column id to number of values in the column (including null
    /// and NaN values)
    #[builder(default)]
    pub(crate) value_counts: HashMap<i32, u64>,
    /// field id: 110
    /// key field id: 121
    /// value field id: 122
    ///
    /// Map from column id to number of null values in the column
    #[builder(default)]
    pub(crate) null_value_counts: HashMap<i32, u64>,
    /// field id: 137
    /// key field id: 138
    /// value field id: 139
    ///
    /// Map from column id to number of NaN values in the column
    #[builder(default)]
    pub(crate) nan_value_counts: HashMap<i32, u64>,
    /// field id: 125
    /// key field id: 126
    /// value field id: 127
    ///
    /// Map from column id to lower bound in the column serialized as binary.
    /// Each value must be less than or equal to all non-null, non-NaN values
    /// in the column for the file.
    ///
    /// Reference:
    ///
    /// - [Binary single-value serialization](https://iceberg.apache.org/spec/#binary-single-value-serialization)
    #[builder(default)]
    pub(crate) lower_bounds: HashMap<i32, Datum>,
    /// field id: 128
    /// key field id: 129
    /// value field id: 130
    ///
    /// Map from column id to upper bound in the column serialized as binary.
    /// Each value must be greater than or equal to all non-null, non-NaN
    /// values in the column for the file.
    ///
    /// Reference:
    ///
    /// - [Binary single-value serialization](https://iceberg.apache.org/spec/#binary-single-value-serialization)
    #[builder(default)]
    pub(crate) upper_bounds: HashMap<i32, Datum>,
    /// field id: 131
    ///
    /// Implementation-specific key metadata for encryption
    #[builder(default)]
    pub(crate) key_metadata: Option<Vec<u8>>,
    /// field id: 132
    /// element field id: 133
    ///
    /// Split offsets for the data file. For example, all row group offsets
    /// in a Parquet file. Must be sorted ascending
    #[builder(default)]
    pub(crate) split_offsets: Vec<i64>,
    /// field id: 135
    /// element field id: 136
    ///
    /// Field ids used to determine row equality in equality delete files.
    /// Required when content is EqualityDeletes and should be null
    /// otherwise. Fields with ids listed in this column must be present
    /// in the delete file
    #[builder(default)]
    pub(crate) equality_ids: Option<Vec<i32>>,
    /// field id: 140
    ///
    /// ID representing sort order for this file.
    ///
    /// If sort order ID is missing or unknown, then the order is assumed to
    /// be unsorted. Only data files and equality delete files should be
    /// written with a non-null order id. Position deletes are required to be
    /// sorted by file and position, not a table order, and should set sort
    /// order id to null. Readers must ignore sort order id for position
    /// delete files.
    #[builder(default, setter(strip_option))]
    pub(crate) sort_order_id: Option<i32>,
    /// field id: 142
    ///
    /// The `_row_id` for the first row in the data file. For more details, refer to
    /// <https://github.com/apache/iceberg/blob/main/format/spec.md#first-row-id-inheritance>
    #[builder(default)]
    pub(crate) first_row_id: Option<i64>,
    /// This field is not part of the spec; it is kept only in the in-memory
    /// representation used during processing.
    pub(crate) partition_spec_id: i32,
    /// field id: 143
    ///
    /// Fully qualified location (URI with FS scheme) of a data file that all deletes reference.
    /// Position delete metadata can use `referenced_data_file` when all deletes tracked by the
    /// entry are in a single data file. Setting the referenced file is required for deletion vectors.
    #[builder(default)]
    pub(crate) referenced_data_file: Option<String>,
    /// field id: 144
    ///
    /// The offset in the file where the content starts.
    /// The `content_offset` and `content_size_in_bytes` fields are used to reference a specific blob
    /// for direct access to a deletion vector. For deletion vectors, these values are required and must
    /// exactly match the `offset` and `length` stored in the Puffin footer for the deletion vector blob.
    #[builder(default)]
    pub(crate) content_offset: Option<i64>,
    /// field id: 145
    ///
    /// The length of a referenced content stored in the file; required if `content_offset` is present
    #[builder(default)]
    pub(crate) content_size_in_bytes: Option<i64>,
}

impl DataFile {
    /// Get the content type of the data file (data, equality deletes, or position deletes)
    pub fn content_type(&self) -> DataContentType {
        self.content
    }
    /// Get the file path as full URI with FS scheme
    pub fn file_path(&self) -> &str {
        &self.file_path
    }
    /// Get the file format of the file (avro, orc, parquet, or puffin).
    pub fn file_format(&self) -> DataFileFormat {
        self.file_format
    }
    /// Get the partition values of the file.
    pub fn partition(&self) -> &Struct {
        &self.partition
    }
    /// Get the record count in the data file.
    pub fn record_count(&self) -> u64 {
        self.record_count
    }
    /// Get the file size in bytes.
    pub fn file_size_in_bytes(&self) -> u64 {
        self.file_size_in_bytes
    }
    /// Get the column sizes.
    /// Map from column id to the total size on disk of all regions that
    /// store the column. Does not include bytes necessary to read other
    /// columns, like footers. Null for row-oriented formats (Avro)
    pub fn column_sizes(&self) -> &HashMap<i32, u64> {
        &self.column_sizes
    }
    /// Get the columns value counts for the data file.
    /// Map from column id to number of values in the column (including null
    /// and NaN values)
    pub fn value_counts(&self) -> &HashMap<i32, u64> {
        &self.value_counts
    }
    /// Get the null value counts of the data file.
    /// Map from column id to number of null values in the column
    pub fn null_value_counts(&self) -> &HashMap<i32, u64> {
        &self.null_value_counts
    }
    /// Get the nan value counts of the data file.
    /// Map from column id to number of NaN values in the column
    pub fn nan_value_counts(&self) -> &HashMap<i32, u64> {
        &self.nan_value_counts
    }
    /// Get the lower bounds of the data file values per column.
    /// Map from column id to lower bound in the column serialized as binary.
    pub fn lower_bounds(&self) -> &HashMap<i32, Datum> {
        &self.lower_bounds
    }
    /// Get the upper bounds of the data file values per column.
    /// Map from column id to upper bound in the column serialized as binary.
    pub fn upper_bounds(&self) -> &HashMap<i32, Datum> {
        &self.upper_bounds
    }
    /// Get the implementation-specific key metadata for the data file.
    pub fn key_metadata(&self) -> Option<&[u8]> {
        self.key_metadata.as_deref()
    }
    /// Get the split offsets of the data file.
    /// For example, all row group offsets in a Parquet file.
    pub fn split_offsets(&self) -> &[i64] {
        &self.split_offsets
    }
    /// Get the equality ids of the data file.
    /// Field ids used to determine row equality in equality delete files.
    /// null when content is not EqualityDeletes.
    pub fn equality_ids(&self) -> Option<Vec<i32>> {
        self.equality_ids.clone()
    }
    /// Get the first row id in the data file.
    pub fn first_row_id(&self) -> Option<i64> {
        self.first_row_id
    }
    /// Get the sort order id of the data file.
    /// Only data files and equality delete files should be
    /// written with a non-null order id. Position deletes are required to be
    /// sorted by file and position, not a table order, and should set sort
    /// order id to null. Readers must ignore sort order id for position
    /// delete files.
    pub fn sort_order_id(&self) -> Option<i32> {
        self.sort_order_id
    }
    /// Get the fully qualified referenced location for the corresponding data file.
    /// Position delete files may have this field set; deletion vectors must have it set.
    pub fn referenced_data_file(&self) -> Option<String> {
        self.referenced_data_file.clone()
    }
    /// Get the offset in the file where the blob content starts.
    /// Only meaningful for Puffin blobs, and required for deletion vectors.
    pub fn content_offset(&self) -> Option<i64> {
        self.content_offset
    }
    /// Get the length of a Puffin blob.
    /// Only meaningful for Puffin blobs, and required for deletion vectors.
    pub fn content_size_in_bytes(&self) -> Option<i64> {
        self.content_size_in_bytes
    }
}

/// Convert data files to Avro bytes and write them to `writer`.
/// Returns the number of bytes written.
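///
/// # Example
///
/// A hedged sketch of serializing a batch of data files for an unpartitioned
/// table. `StructType::new(vec![])` as the empty partition type and the
/// pre-built `data_file` are assumptions for illustration, so the example is
/// marked `ignore`.
///
/// ```ignore
/// use iceberg::spec::{FormatVersion, StructType};
///
/// let partition_type = StructType::new(vec![]);
/// let mut buffer = Vec::new();
/// let bytes_written = write_data_files_to_avro(
///     &mut buffer,
///     vec![data_file],
///     &partition_type,
///     FormatVersion::V2,
/// )?;
/// ```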
pub fn write_data_files_to_avro<W: Write>(
    writer: &mut W,
    data_files: impl IntoIterator<Item = DataFile>,
    partition_type: &StructType,
    version: FormatVersion,
) -> Result<usize> {
    let avro_schema = match version {
        FormatVersion::V1 => data_file_schema_v1(partition_type)?,
        FormatVersion::V2 => data_file_schema_v2(partition_type)?,
    };
    let mut writer = AvroWriter::new(&avro_schema, writer);

    for data_file in data_files {
        // Serialize with the same format version used to select the Avro
        // schema above; hardcoding V1 here would break V2 writes.
        let value = to_value(DataFileSerde::try_from(data_file, partition_type, version)?)?
            .resolve(&avro_schema)?;
        writer.append(value)?;
    }

    Ok(writer.flush()?)
}

/// Parse data files from Avro bytes.
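///
/// # Example
///
/// A hedged sketch of reading the files back; `table_schema`,
/// `partition_type`, and `buffer` are assumed to come from the surrounding
/// context (e.g. the write example above), so the example is marked `ignore`.
///
/// ```ignore
/// use iceberg::spec::FormatVersion;
///
/// let data_files = read_data_files_from_avro(
///     &mut buffer.as_slice(),
///     &table_schema,
///     0, // partition_spec_id
///     &partition_type,
///     FormatVersion::V2,
/// )?;
/// ```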
pub fn read_data_files_from_avro<R: Read>(
    reader: &mut R,
    schema: &Schema,
    partition_spec_id: i32,
    partition_type: &StructType,
    version: FormatVersion,
) -> Result<Vec<DataFile>> {
    let avro_schema = match version {
        FormatVersion::V1 => data_file_schema_v1(partition_type)?,
        FormatVersion::V2 => data_file_schema_v2(partition_type)?,
    };

    let reader = AvroReader::with_schema(&avro_schema, reader)?;
    reader
        .into_iter()
        .map(|value| {
            from_value::<DataFileSerde>(&value?)?.try_into(
                partition_spec_id,
                partition_type,
                schema,
            )
        })
        .collect::<Result<Vec<_>>>()
}

/// Type of content stored by the data file: data, equality deletes, or
/// position deletes (all v1 files are data files)
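///
/// # Example
///
/// A small sketch of the integer mapping used on disk; the `try_from`
/// round-trip below mirrors this module's `TryFrom<i32>` impl.
///
/// ```ignore
/// use iceberg::spec::DataContentType;
///
/// assert_eq!(DataContentType::try_from(0)?, DataContentType::Data);
/// assert_eq!(DataContentType::PositionDeletes as i32, 1);
/// ```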
#[derive(Debug, PartialEq, Eq, Clone, Copy, Serialize, Deserialize, Default)]
pub enum DataContentType {
    /// value: 0
    #[default]
    Data = 0,
    /// value: 1
    PositionDeletes = 1,
    /// value: 2
    EqualityDeletes = 2,
}

impl TryFrom<i32> for DataContentType {
    type Error = Error;

    fn try_from(v: i32) -> Result<DataContentType> {
        match v {
            0 => Ok(DataContentType::Data),
            1 => Ok(DataContentType::PositionDeletes),
            2 => Ok(DataContentType::EqualityDeletes),
            _ => Err(Error::new(
                ErrorKind::DataInvalid,
                format!("data content type {} is invalid", v),
            )),
        }
    }
}

/// Format of this data.
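///
/// # Example
///
/// Parsing is case-insensitive and `Display` emits the lowercase name, so the
/// two form a round trip (see the unit test at the bottom of this file):
///
/// ```ignore
/// use iceberg::spec::DataFileFormat;
///
/// let format: DataFileFormat = "PARQUET".parse()?;
/// assert_eq!(format.to_string(), "parquet");
/// ```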
#[derive(Debug, PartialEq, Eq, Clone, Copy, SerializeDisplay, DeserializeFromStr)]
pub enum DataFileFormat {
    /// Avro file format: <https://avro.apache.org/>
    Avro,
    /// Orc file format: <https://orc.apache.org/>
    Orc,
    /// Parquet file format: <https://parquet.apache.org/>
    Parquet,
    /// Puffin file format: <https://iceberg.apache.org/puffin-spec/>
    Puffin,
}

impl FromStr for DataFileFormat {
    type Err = Error;

    fn from_str(s: &str) -> Result<Self> {
        match s.to_lowercase().as_str() {
            "avro" => Ok(Self::Avro),
            "orc" => Ok(Self::Orc),
            "parquet" => Ok(Self::Parquet),
            "puffin" => Ok(Self::Puffin),
            _ => Err(Error::new(
                ErrorKind::DataInvalid,
                format!("Unsupported data file format: {}", s),
            )),
        }
    }
}

impl std::fmt::Display for DataFileFormat {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            DataFileFormat::Avro => write!(f, "avro"),
            DataFileFormat::Orc => write!(f, "orc"),
            DataFileFormat::Parquet => write!(f, "parquet"),
            DataFileFormat::Puffin => write!(f, "puffin"),
        }
    }
}

#[cfg(test)]
mod test {
    use crate::spec::DataContentType;

    #[test]
    fn test_data_content_type_default() {
        assert_eq!(DataContentType::default(), DataContentType::Data);
    }

    #[test]
    fn test_data_content_type_default_value() {
        assert_eq!(DataContentType::default() as i32, 0);
    }
420}