iceberg/spec/manifest/data_file.rs
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::collections::HashMap;
use std::io::{Read, Write};
use std::str::FromStr;

use apache_avro::{Reader as AvroReader, Writer as AvroWriter, from_value, to_value};
use derive_builder::Builder;
use serde_derive::{Deserialize, Serialize};
use serde_with::{DeserializeFromStr, SerializeDisplay};

use super::_serde::DataFileSerde;
use super::{
    Datum, FormatVersion, Schema, data_file_schema_v1, data_file_schema_v2, data_file_schema_v3,
};
use crate::error::Result;
use crate::spec::{DEFAULT_PARTITION_SPEC_ID, Struct, StructType};
use crate::{Error, ErrorKind};

/// Data file carries data file path, partition tuple, metrics, …
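///
/// A minimal construction sketch, not taken from the upstream docs: it uses
/// the `derive_builder`-generated [`DataFileBuilder`], and the S3 path is
/// hypothetical. Fenced as `ignore` so it is not compiled as a doctest.
///
/// ```ignore
/// let data_file = DataFileBuilder::default()
///     .content(DataContentType::Data)
///     .file_path("s3://bucket/table/data/00000-0.parquet".to_string())
///     .file_format(DataFileFormat::Parquet)
///     .record_count(100)
///     .file_size_in_bytes(1024)
///     .build()
///     .unwrap();
/// assert_eq!(data_file.content_type(), DataContentType::Data);
/// ```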
#[derive(Debug, PartialEq, Clone, Eq, Builder)]
pub struct DataFile {
    /// field id: 134
    ///
    /// Type of content stored by the data file: data, equality deletes,
    /// or position deletes (all v1 files are data files)
    pub(crate) content: DataContentType,
    /// field id: 100
    ///
    /// Full URI for the file with FS scheme
    pub(crate) file_path: String,
    /// field id: 101
    ///
    /// String file format name, `avro`, `orc`, `parquet`, or `puffin`
    pub(crate) file_format: DataFileFormat,
    /// field id: 102
    ///
    /// Partition data tuple, schema based on the partition spec output using
    /// partition field ids for the struct field ids
    #[builder(default = "Struct::empty()")]
    pub(crate) partition: Struct,
    /// field id: 103
    ///
    /// Number of records in this file, or the cardinality of a deletion vector
    pub(crate) record_count: u64,
    /// field id: 104
    ///
    /// Total file size in bytes
    pub(crate) file_size_in_bytes: u64,
    /// field id: 108
    /// key field id: 117
    /// value field id: 118
    ///
    /// Map from column id to the total size on disk of all regions that
    /// store the column. Does not include bytes necessary to read other
    /// columns, like footers. Leave null for row-oriented formats (Avro)
    #[builder(default)]
    pub(crate) column_sizes: HashMap<i32, u64>,
    /// field id: 109
    /// key field id: 119
    /// value field id: 120
    ///
    /// Map from column id to number of values in the column (including null
    /// and NaN values)
    #[builder(default)]
    pub(crate) value_counts: HashMap<i32, u64>,
    /// field id: 110
    /// key field id: 121
    /// value field id: 122
    ///
    /// Map from column id to number of null values in the column
    #[builder(default)]
    pub(crate) null_value_counts: HashMap<i32, u64>,
    /// field id: 137
    /// key field id: 138
    /// value field id: 139
    ///
    /// Map from column id to number of NaN values in the column
    #[builder(default)]
    pub(crate) nan_value_counts: HashMap<i32, u64>,
    /// field id: 125
    /// key field id: 126
    /// value field id: 127
    ///
    /// Map from column id to lower bound in the column serialized as binary.
    /// Each value must be less than or equal to all non-null, non-NaN values
    /// in the column for the file.
    ///
    /// Reference:
    ///
    /// - [Binary single-value serialization](https://iceberg.apache.org/spec/#binary-single-value-serialization)
    #[builder(default)]
    pub(crate) lower_bounds: HashMap<i32, Datum>,
    /// field id: 128
    /// key field id: 129
    /// value field id: 130
    ///
    /// Map from column id to upper bound in the column serialized as binary.
    /// Each value must be greater than or equal to all non-null, non-NaN
    /// values in the column for the file.
    ///
    /// Reference:
    ///
    /// - [Binary single-value serialization](https://iceberg.apache.org/spec/#binary-single-value-serialization)
    #[builder(default)]
    pub(crate) upper_bounds: HashMap<i32, Datum>,
    /// field id: 131
    ///
    /// Implementation-specific key metadata for encryption
    #[builder(default)]
    pub(crate) key_metadata: Option<Vec<u8>>,
    /// field id: 132
    /// element field id: 133
    ///
    /// Split offsets for the data file. For example, all row group offsets
    /// in a Parquet file. Must be sorted ascending. Optional field that
    /// should be serialized as null when not present.
    #[builder(default)]
    pub(crate) split_offsets: Option<Vec<i64>>,
    /// field id: 135
    /// element field id: 136
    ///
    /// Field ids used to determine row equality in equality delete files.
    /// Required when content is `EqualityDeletes` and should be null
    /// otherwise. Fields with ids listed in this column must be present
    /// in the delete file.
    #[builder(default)]
    pub(crate) equality_ids: Option<Vec<i32>>,
    /// field id: 140
    ///
    /// ID representing sort order for this file.
    ///
    /// If sort order ID is missing or unknown, then the order is assumed to
    /// be unsorted. Only data files and equality delete files should be
    /// written with a non-null order id. Position deletes are required to be
    /// sorted by file and position, not a table order, and should set sort
    /// order id to null. Readers must ignore sort order id for position
    /// delete files.
    #[builder(default, setter(strip_option))]
    pub(crate) sort_order_id: Option<i32>,
    /// field id: 142
    ///
    /// The `_row_id` for the first row in the data file.
    /// For more details, refer to
    /// <https://github.com/apache/iceberg/blob/main/format/spec.md#first-row-id-inheritance>
    #[builder(default)]
    pub(crate) first_row_id: Option<i64>,
    /// This field is not part of the spec. It only exists in the in-memory
    /// representation and is used during processing.
    #[builder(default = "DEFAULT_PARTITION_SPEC_ID")]
    pub(crate) partition_spec_id: i32,
    /// field id: 143
    ///
    /// Fully qualified location (URI with FS scheme) of a data file that all deletes reference.
    /// Position delete metadata can use `referenced_data_file` when all deletes tracked by the
    /// entry are in a single data file. Setting the referenced file is required for deletion vectors.
    #[builder(default)]
    pub(crate) referenced_data_file: Option<String>,
    /// field id: 144
    ///
    /// The offset in the file where the content starts.
    /// The `content_offset` and `content_size_in_bytes` fields are used to reference a specific blob
    /// for direct access to a deletion vector. For deletion vectors, these values are required and must
    /// exactly match the `offset` and `length` stored in the Puffin footer for the deletion vector blob.
    #[builder(default)]
    pub(crate) content_offset: Option<i64>,
    /// field id: 145
    ///
    /// The length of the referenced content stored in the file; required if `content_offset` is present
    #[builder(default)]
    pub(crate) content_size_in_bytes: Option<i64>,
}

impl DataFile {
    /// Get the content type of the data file (data, equality deletes, or position deletes)
    pub fn content_type(&self) -> DataContentType {
        self.content
    }
    /// Get the file path as full URI with FS scheme
    pub fn file_path(&self) -> &str {
        &self.file_path
    }
    /// Get the file format of the file (avro, orc, parquet, or puffin).
    pub fn file_format(&self) -> DataFileFormat {
        self.file_format
    }
    /// Get the partition values of the file.
    pub fn partition(&self) -> &Struct {
        &self.partition
    }
    /// Get the record count in the data file.
    pub fn record_count(&self) -> u64 {
        self.record_count
    }
    /// Get the file size in bytes.
    pub fn file_size_in_bytes(&self) -> u64 {
        self.file_size_in_bytes
    }
    /// Get the column sizes.
    /// Map from column id to the total size on disk of all regions that
    /// store the column. Does not include bytes necessary to read other
    /// columns, like footers. Null for row-oriented formats (Avro)
    pub fn column_sizes(&self) -> &HashMap<i32, u64> {
        &self.column_sizes
    }
    /// Get the column value counts for the data file.
    /// Map from column id to number of values in the column (including null
    /// and NaN values)
    pub fn value_counts(&self) -> &HashMap<i32, u64> {
        &self.value_counts
    }
    /// Get the null value counts of the data file.
    /// Map from column id to number of null values in the column
    pub fn null_value_counts(&self) -> &HashMap<i32, u64> {
        &self.null_value_counts
    }
    /// Get the NaN value counts of the data file.
    /// Map from column id to number of NaN values in the column
    pub fn nan_value_counts(&self) -> &HashMap<i32, u64> {
        &self.nan_value_counts
    }
    /// Get the lower bounds of the data file values per column.
    /// Map from column id to lower bound in the column serialized as binary.
    pub fn lower_bounds(&self) -> &HashMap<i32, Datum> {
        &self.lower_bounds
    }
    /// Get the upper bounds of the data file values per column.
    /// Map from column id to upper bound in the column serialized as binary.
    pub fn upper_bounds(&self) -> &HashMap<i32, Datum> {
        &self.upper_bounds
    }
    /// Get the implementation-specific key metadata for the data file.
    pub fn key_metadata(&self) -> Option<&[u8]> {
        self.key_metadata.as_deref()
    }
    /// Get the split offsets of the data file.
    /// For example, all row group offsets in a Parquet file.
    /// Returns `None` if no split offsets are present.
    pub fn split_offsets(&self) -> Option<&[i64]> {
        self.split_offsets.as_deref()
    }
    /// Get the equality ids of the data file.
    /// Field ids used to determine row equality in equality delete files.
    /// Returns `None` when content is not `EqualityDeletes`.
    pub fn equality_ids(&self) -> Option<Vec<i32>> {
        self.equality_ids.clone()
    }
    /// Get the first row id in the data file.
    pub fn first_row_id(&self) -> Option<i64> {
        self.first_row_id
    }
    /// Get the sort order id of the data file.
    /// Only data files and equality delete files should be
    /// written with a non-null order id. Position deletes are required to be
    /// sorted by file and position, not a table order, and should set sort
    /// order id to null. Readers must ignore sort order id for position
    /// delete files.
    pub fn sort_order_id(&self) -> Option<i32> {
        self.sort_order_id
    }
    /// Get the fully qualified referenced location for the corresponding data file.
    /// Position delete files may have this field set, and deletion vectors must have it set.
    pub fn referenced_data_file(&self) -> Option<String> {
        self.referenced_data_file.clone()
    }
    /// Get the offset in the file where the blob content starts.
    /// Only meaningful for Puffin blobs, and required for deletion vectors.
    pub fn content_offset(&self) -> Option<i64> {
        self.content_offset
    }
    /// Get the length of a Puffin blob.
    /// Only meaningful for Puffin blobs, and required for deletion vectors.
    pub fn content_size_in_bytes(&self) -> Option<i64> {
        self.content_size_in_bytes
    }
}

/// Convert data files to avro bytes and write to writer.
/// Returns the number of bytes written.
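///
/// A hedged usage sketch, not taken from the upstream docs: `data_file` is
/// assumed to be an existing [`DataFile`], and the empty `StructType` models
/// an unpartitioned table. Fenced as `ignore` so it is not compiled as a
/// doctest.
///
/// ```ignore
/// let partition_type = StructType::new(vec![]); // unpartitioned
/// let mut buffer = Vec::new();
/// let bytes_written = write_data_files_to_avro(
///     &mut buffer,
///     vec![data_file],
///     &partition_type,
///     FormatVersion::V2,
/// )?;
/// ```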
pub fn write_data_files_to_avro<W: Write>(
    writer: &mut W,
    data_files: impl IntoIterator<Item = DataFile>,
    partition_type: &StructType,
    version: FormatVersion,
) -> Result<usize> {
    let avro_schema = match version {
        FormatVersion::V1 => data_file_schema_v1(partition_type)?,
        FormatVersion::V2 => data_file_schema_v2(partition_type)?,
        FormatVersion::V3 => data_file_schema_v3(partition_type)?,
    };
    let mut writer = AvroWriter::new(&avro_schema, writer);

    for data_file in data_files {
        // Serialize with the same format version that was used to select the
        // Avro schema above.
        let value = to_value(DataFileSerde::try_from(data_file, partition_type, version)?)?
            .resolve(&avro_schema)?;
        writer.append(value)?;
    }

    Ok(writer.flush()?)
}

/// Parse data files from avro bytes.
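///
/// A hedged usage sketch, not taken from the upstream docs: `buffer`,
/// `schema`, and `partition_type` are assumed to come from the surrounding
/// context (e.g. the writer sketch above). Fenced as `ignore` so it is not
/// compiled as a doctest.
///
/// ```ignore
/// let data_files = read_data_files_from_avro(
///     &mut buffer.as_slice(),
///     &schema,
///     DEFAULT_PARTITION_SPEC_ID,
///     &partition_type,
///     FormatVersion::V2,
/// )?;
/// ```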
pub fn read_data_files_from_avro<R: Read>(
    reader: &mut R,
    schema: &Schema,
    partition_spec_id: i32,
    partition_type: &StructType,
    version: FormatVersion,
) -> Result<Vec<DataFile>> {
    let avro_schema = match version {
        FormatVersion::V1 => data_file_schema_v1(partition_type)?,
        FormatVersion::V2 => data_file_schema_v2(partition_type)?,
        FormatVersion::V3 => data_file_schema_v3(partition_type)?,
    };

    let reader = AvroReader::with_schema(&avro_schema, reader)?;
    reader
        .into_iter()
        .map(|value| {
            from_value::<DataFileSerde>(&value?)?.try_into(
                partition_spec_id,
                partition_type,
                schema,
            )
        })
        .collect::<Result<Vec<_>>>()
}

/// Type of content stored by the data file: data, equality deletes, or
/// position deletes (all v1 files are data files)
#[derive(Debug, PartialEq, Eq, Clone, Copy, Serialize, Deserialize, Default)]
pub enum DataContentType {
    /// value: 0
    #[default]
    Data = 0,
    /// value: 1
    PositionDeletes = 1,
    /// value: 2
    EqualityDeletes = 2,
}

impl TryFrom<i32> for DataContentType {
    type Error = Error;

    fn try_from(v: i32) -> Result<DataContentType> {
        match v {
            0 => Ok(DataContentType::Data),
            1 => Ok(DataContentType::PositionDeletes),
            2 => Ok(DataContentType::EqualityDeletes),
            _ => Err(Error::new(
                ErrorKind::DataInvalid,
                format!("data content type {v} is invalid"),
            )),
        }
    }
}

/// Format of this data.
#[derive(Debug, PartialEq, Eq, Clone, Copy, SerializeDisplay, DeserializeFromStr)]
pub enum DataFileFormat {
    /// Avro file format: <https://avro.apache.org/>
    Avro,
    /// Orc file format: <https://orc.apache.org/>
    Orc,
    /// Parquet file format: <https://parquet.apache.org/>
    Parquet,
    /// Puffin file format: <https://iceberg.apache.org/puffin-spec/>
    Puffin,
}

impl FromStr for DataFileFormat {
    type Err = Error;

    fn from_str(s: &str) -> Result<Self> {
        match s.to_lowercase().as_str() {
            "avro" => Ok(Self::Avro),
            "orc" => Ok(Self::Orc),
            "parquet" => Ok(Self::Parquet),
            "puffin" => Ok(Self::Puffin),
            _ => Err(Error::new(
                ErrorKind::DataInvalid,
                format!("Unsupported data file format: {s}"),
            )),
        }
    }
}

impl std::fmt::Display for DataFileFormat {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            DataFileFormat::Avro => write!(f, "avro"),
            DataFileFormat::Orc => write!(f, "orc"),
            DataFileFormat::Parquet => write!(f, "parquet"),
            DataFileFormat::Puffin => write!(f, "puffin"),
        }
    }
}

#[cfg(test)]
mod test {
    use crate::spec::DataContentType;

    #[test]
    fn test_data_content_type_default() {
        assert_eq!(DataContentType::default(), DataContentType::Data);
    }

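    // Added sanity check, not from the upstream tests: exercises the
    // TryFrom<i32> conversion implemented in this module.
    #[test]
    fn test_data_content_type_try_from() {
        assert_eq!(DataContentType::try_from(0).unwrap(), DataContentType::Data);
        assert_eq!(
            DataContentType::try_from(1).unwrap(),
            DataContentType::PositionDeletes
        );
        assert_eq!(
            DataContentType::try_from(2).unwrap(),
            DataContentType::EqualityDeletes
        );
        // Values outside 0..=2 are rejected with a DataInvalid error.
        assert!(DataContentType::try_from(3).is_err());
    }
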
    #[test]
    fn test_data_content_type_default_value() {
        assert_eq!(DataContentType::default() as i32, 0);
    }
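
    // Added sanity check, not from the upstream tests: round-trips
    // DataFileFormat through the case-insensitive FromStr impl and the
    // lowercase Display impl defined in this module.
    #[test]
    fn test_data_file_format_parse_and_display() {
        use std::str::FromStr;

        use super::DataFileFormat;

        for (input, expected) in [
            ("avro", DataFileFormat::Avro),
            ("ORC", DataFileFormat::Orc),
            ("Parquet", DataFileFormat::Parquet),
            ("puffin", DataFileFormat::Puffin),
        ] {
            let parsed = DataFileFormat::from_str(input).unwrap();
            assert_eq!(parsed, expected);
            // Display always renders the canonical lowercase name.
            assert_eq!(parsed.to_string(), input.to_lowercase());
        }
        assert!(DataFileFormat::from_str("csv").is_err());
    }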
}