iceberg/spec/
snapshot.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18/*!
19 * Snapshots
20 */
21use std::collections::HashMap;
22use std::sync::Arc;
23
24use _serde::SnapshotV2;
25use chrono::{DateTime, Utc};
26use serde::{Deserialize, Serialize};
27use typed_builder::TypedBuilder;
28
29use super::table_metadata::SnapshotLog;
30use crate::error::{Result, timestamp_ms_to_utc};
31use crate::io::FileIO;
32use crate::spec::{ManifestList, SchemaId, SchemaRef, TableMetadata};
33use crate::{Error, ErrorKind};
34
/// The ref name of the main branch of the table.
pub const MAIN_BRANCH: &str = "main";
/// Placeholder for snapshot ID.
///
/// A field holding this value must be replaced with the actual snapshot ID before it is
/// committed.
pub const UNASSIGNED_SNAPSHOT_ID: i64 = -1;

/// Shared, reference-counted ([`Arc`]) handle to a [`Snapshot`].
pub type SnapshotRef = Arc<Snapshot>;
42#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
43#[serde(rename_all = "lowercase")]
44/// The operation field is used by some operations, like snapshot expiration, to skip processing certain snapshots.
45pub enum Operation {
46    /// Only data files were added and no files were removed.
47    Append,
48    /// Data and delete files were added and removed without changing table data;
49    /// i.e., compaction, changing the data file format, or relocating data files.
50    Replace,
51    /// Data and delete files were added and removed in a logical overwrite operation.
52    Overwrite,
53    /// Data files were removed and their contents logically deleted and/or delete files were added to delete rows.
54    Delete,
55}
56
57impl Operation {
58    /// Returns the string representation (lowercase) of the operation.
59    pub fn as_str(&self) -> &str {
60        match self {
61            Operation::Append => "append",
62            Operation::Replace => "replace",
63            Operation::Overwrite => "overwrite",
64            Operation::Delete => "delete",
65        }
66    }
67}
68
69#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
70/// Summarises the changes in the snapshot.
71pub struct Summary {
72    /// The type of operation in the snapshot
73    pub operation: Operation,
74    /// Other summary data.
75    #[serde(flatten)]
76    pub additional_properties: HashMap<String, String>,
77}
78
79impl Default for Operation {
80    fn default() -> Operation {
81        Self::Append
82    }
83}
84
85#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize, TypedBuilder)]
86#[serde(from = "SnapshotV2", into = "SnapshotV2")]
87#[builder(field_defaults(setter(prefix = "with_")))]
88/// A snapshot represents the state of a table at some time and is used to access the complete set of data files in the table.
89pub struct Snapshot {
90    /// A unique long ID
91    snapshot_id: i64,
92    /// The snapshot ID of the snapshot’s parent.
93    /// Omitted for any snapshot with no parent
94    #[builder(default = None)]
95    parent_snapshot_id: Option<i64>,
96    /// A monotonically increasing long that tracks the order of
97    /// changes to a table.
98    sequence_number: i64,
99    /// A timestamp when the snapshot was created, used for garbage
100    /// collection and table inspection
101    timestamp_ms: i64,
102    /// The location of a manifest list for this snapshot that
103    /// tracks manifest files with additional metadata.
104    /// Currently we only support manifest list file, and manifest files are not supported.
105    #[builder(setter(into))]
106    manifest_list: String,
107    /// A string map that summarizes the snapshot changes, including operation.
108    summary: Summary,
109    /// ID of the table’s current schema when the snapshot was created.
110    #[builder(setter(strip_option(fallback = schema_id_opt)), default = None)]
111    schema_id: Option<SchemaId>,
112}
113
114impl Snapshot {
115    /// Get the id of the snapshot
116    #[inline]
117    pub fn snapshot_id(&self) -> i64 {
118        self.snapshot_id
119    }
120
121    /// Get parent snapshot id.
122    #[inline]
123    pub fn parent_snapshot_id(&self) -> Option<i64> {
124        self.parent_snapshot_id
125    }
126
127    /// Get sequence_number of the snapshot. Is 0 for Iceberg V1 tables.
128    #[inline]
129    pub fn sequence_number(&self) -> i64 {
130        self.sequence_number
131    }
132    /// Get location of manifest_list file
133    #[inline]
134    pub fn manifest_list(&self) -> &str {
135        &self.manifest_list
136    }
137
138    /// Get summary of the snapshot
139    #[inline]
140    pub fn summary(&self) -> &Summary {
141        &self.summary
142    }
143    /// Get the timestamp of when the snapshot was created
144    #[inline]
145    pub fn timestamp(&self) -> Result<DateTime<Utc>> {
146        timestamp_ms_to_utc(self.timestamp_ms)
147    }
148
149    /// Get the timestamp of when the snapshot was created in milliseconds
150    #[inline]
151    pub fn timestamp_ms(&self) -> i64 {
152        self.timestamp_ms
153    }
154
155    /// Get the schema id of this snapshot.
156    #[inline]
157    pub fn schema_id(&self) -> Option<SchemaId> {
158        self.schema_id
159    }
160
161    /// Get the schema of this snapshot.
162    pub fn schema(&self, table_metadata: &TableMetadata) -> Result<SchemaRef> {
163        Ok(match self.schema_id() {
164            Some(schema_id) => table_metadata
165                .schema_by_id(schema_id)
166                .ok_or_else(|| {
167                    Error::new(
168                        ErrorKind::DataInvalid,
169                        format!("Schema with id {} not found", schema_id),
170                    )
171                })?
172                .clone(),
173            None => table_metadata.current_schema().clone(),
174        })
175    }
176
177    /// Get parent snapshot.
178    #[cfg(test)]
179    pub(crate) fn parent_snapshot(&self, table_metadata: &TableMetadata) -> Option<SnapshotRef> {
180        match self.parent_snapshot_id {
181            Some(id) => table_metadata.snapshot_by_id(id).cloned(),
182            None => None,
183        }
184    }
185
186    /// Load manifest list.
187    pub async fn load_manifest_list(
188        &self,
189        file_io: &FileIO,
190        table_metadata: &TableMetadata,
191    ) -> Result<ManifestList> {
192        let manifest_list_content = file_io.new_input(&self.manifest_list)?.read().await?;
193        ManifestList::parse_with_version(
194            &manifest_list_content,
195            // TODO: You don't really need the version since you could just project any Avro in
196            // the version that you'd like to get (probably always the latest)
197            table_metadata.format_version(),
198        )
199    }
200
201    #[allow(dead_code)]
202    pub(crate) fn log(&self) -> SnapshotLog {
203        SnapshotLog {
204            timestamp_ms: self.timestamp_ms,
205            snapshot_id: self.snapshot_id,
206        }
207    }
208}
209
210pub(super) mod _serde {
211    /// This is a helper module that defines types to help with serialization/deserialization.
212    /// For deserialization the input first gets read into either the [SnapshotV1] or [SnapshotV2] struct
213    /// and then converted into the [Snapshot] struct. Serialization works the other way around.
214    /// [SnapshotV1] and [SnapshotV2] are internal struct that are only used for serialization and deserialization.
215    use std::collections::HashMap;
216
217    use serde::{Deserialize, Serialize};
218
219    use super::{Operation, Snapshot, Summary};
220    use crate::Error;
221    use crate::spec::SchemaId;
222
223    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
224    #[serde(rename_all = "kebab-case")]
225    /// Defines the structure of a v2 snapshot for serialization/deserialization
226    pub(crate) struct SnapshotV2 {
227        pub snapshot_id: i64,
228        #[serde(skip_serializing_if = "Option::is_none")]
229        pub parent_snapshot_id: Option<i64>,
230        pub sequence_number: i64,
231        pub timestamp_ms: i64,
232        pub manifest_list: String,
233        pub summary: Summary,
234        #[serde(skip_serializing_if = "Option::is_none")]
235        pub schema_id: Option<SchemaId>,
236    }
237
238    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
239    #[serde(rename_all = "kebab-case")]
240    /// Defines the structure of a v1 snapshot for serialization/deserialization
241    pub(crate) struct SnapshotV1 {
242        pub snapshot_id: i64,
243        #[serde(skip_serializing_if = "Option::is_none")]
244        pub parent_snapshot_id: Option<i64>,
245        pub timestamp_ms: i64,
246        #[serde(skip_serializing_if = "Option::is_none")]
247        pub manifest_list: Option<String>,
248        #[serde(skip_serializing_if = "Option::is_none")]
249        pub manifests: Option<Vec<String>>,
250        #[serde(skip_serializing_if = "Option::is_none")]
251        pub summary: Option<Summary>,
252        #[serde(skip_serializing_if = "Option::is_none")]
253        pub schema_id: Option<SchemaId>,
254    }
255
256    impl From<SnapshotV2> for Snapshot {
257        fn from(v2: SnapshotV2) -> Self {
258            Snapshot {
259                snapshot_id: v2.snapshot_id,
260                parent_snapshot_id: v2.parent_snapshot_id,
261                sequence_number: v2.sequence_number,
262                timestamp_ms: v2.timestamp_ms,
263                manifest_list: v2.manifest_list,
264                summary: v2.summary,
265                schema_id: v2.schema_id,
266            }
267        }
268    }
269
270    impl From<Snapshot> for SnapshotV2 {
271        fn from(v2: Snapshot) -> Self {
272            SnapshotV2 {
273                snapshot_id: v2.snapshot_id,
274                parent_snapshot_id: v2.parent_snapshot_id,
275                sequence_number: v2.sequence_number,
276                timestamp_ms: v2.timestamp_ms,
277                manifest_list: v2.manifest_list,
278                summary: v2.summary,
279                schema_id: v2.schema_id,
280            }
281        }
282    }
283
284    impl TryFrom<SnapshotV1> for Snapshot {
285        type Error = Error;
286
287        fn try_from(v1: SnapshotV1) -> Result<Self, Self::Error> {
288            Ok(Snapshot {
289                snapshot_id: v1.snapshot_id,
290                parent_snapshot_id: v1.parent_snapshot_id,
291                sequence_number: 0,
292                timestamp_ms: v1.timestamp_ms,
293                manifest_list: match (v1.manifest_list, v1.manifests) {
294                    (Some(file), None) => file,
295                    (Some(_), Some(_)) => "Invalid v1 snapshot, when manifest list provided, manifest files should be omitted".to_string(),
296                    (None, _) => "Unsupported v1 snapshot, only manifest list is supported".to_string()
297                   },
298                summary: v1.summary.unwrap_or(Summary {
299                    operation: Operation::default(),
300                    additional_properties: HashMap::new(),
301                }),
302                schema_id: v1.schema_id,
303            })
304        }
305    }
306
307    impl From<Snapshot> for SnapshotV1 {
308        fn from(v2: Snapshot) -> Self {
309            SnapshotV1 {
310                snapshot_id: v2.snapshot_id,
311                parent_snapshot_id: v2.parent_snapshot_id,
312                timestamp_ms: v2.timestamp_ms,
313                manifest_list: Some(v2.manifest_list),
314                summary: Some(v2.summary),
315                schema_id: v2.schema_id,
316                manifests: None,
317            }
318        }
319    }
320}
321
322#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
323#[serde(rename_all = "kebab-case")]
324/// Iceberg tables keep track of branches and tags using snapshot references.
325pub struct SnapshotReference {
326    /// A reference’s snapshot ID. The tagged snapshot or latest snapshot of a branch.
327    pub snapshot_id: i64,
328    #[serde(flatten)]
329    /// Snapshot retention policy
330    pub retention: SnapshotRetention,
331}
332
333impl SnapshotReference {
334    /// Returns true if the snapshot reference is a branch.
335    pub fn is_branch(&self) -> bool {
336        matches!(self.retention, SnapshotRetention::Branch { .. })
337    }
338}
339
340impl SnapshotReference {
341    /// Create new snapshot reference
342    pub fn new(snapshot_id: i64, retention: SnapshotRetention) -> Self {
343        SnapshotReference {
344            snapshot_id,
345            retention,
346        }
347    }
348}
349
350#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
351#[serde(rename_all = "lowercase", tag = "type")]
352/// The snapshot expiration procedure removes snapshots from table metadata and applies the table’s retention policy.
353pub enum SnapshotRetention {
354    #[serde(rename_all = "kebab-case")]
355    /// Branches are mutable named references that can be updated by committing a new snapshot as
356    /// the branch’s referenced snapshot using the Commit Conflict Resolution and Retry procedures.
357    Branch {
358        /// A positive number for the minimum number of snapshots to keep in a branch while expiring snapshots.
359        /// Defaults to table property history.expire.min-snapshots-to-keep.
360        #[serde(skip_serializing_if = "Option::is_none")]
361        min_snapshots_to_keep: Option<i32>,
362        /// A positive number for the max age of snapshots to keep when expiring, including the latest snapshot.
363        /// Defaults to table property history.expire.max-snapshot-age-ms.
364        #[serde(skip_serializing_if = "Option::is_none")]
365        max_snapshot_age_ms: Option<i64>,
366        /// For snapshot references except the main branch, a positive number for the max age of the snapshot reference to keep while expiring snapshots.
367        /// Defaults to table property history.expire.max-ref-age-ms. The main branch never expires.
368        #[serde(skip_serializing_if = "Option::is_none")]
369        max_ref_age_ms: Option<i64>,
370    },
371    #[serde(rename_all = "kebab-case")]
372    /// Tags are labels for individual snapshots.
373    Tag {
374        /// For snapshot references except the main branch, a positive number for the max age of the snapshot reference to keep while expiring snapshots.
375        /// Defaults to table property history.expire.max-ref-age-ms. The main branch never expires.
376        #[serde(skip_serializing_if = "Option::is_none")]
377        max_ref_age_ms: Option<i64>,
378    },
379}
380
381impl SnapshotRetention {
382    /// Create a new branch retention policy
383    pub fn branch(
384        min_snapshots_to_keep: Option<i32>,
385        max_snapshot_age_ms: Option<i64>,
386        max_ref_age_ms: Option<i64>,
387    ) -> Self {
388        SnapshotRetention::Branch {
389            min_snapshots_to_keep,
390            max_snapshot_age_ms,
391            max_ref_age_ms,
392        }
393    }
394}
395
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use chrono::{TimeZone, Utc};

    use crate::spec::snapshot::_serde::SnapshotV1;
    use crate::spec::snapshot::{Operation, Snapshot, Summary};

    /// A v1 snapshot parsed from JSON exposes its fields through the `Snapshot` getters.
    #[test]
    fn schema() {
        let record = r#"
        {
            "snapshot-id": 3051729675574597004,
            "timestamp-ms": 1515100955770,
            "summary": {
                "operation": "append"
            },
            "manifest-list": "s3://b/wh/.../s1.avro",
            "schema-id": 0
        }
        "#;

        let parsed: SnapshotV1 = serde_json::from_str(record).unwrap();
        let snapshot = Snapshot::try_from(parsed).unwrap();

        assert_eq!(3051729675574597004, snapshot.snapshot_id());
        assert_eq!(1515100955770, snapshot.timestamp_ms());
        assert_eq!(
            Utc.timestamp_millis_opt(1515100955770).unwrap(),
            snapshot.timestamp().unwrap()
        );

        let expected_summary = Summary {
            operation: Operation::Append,
            additional_properties: HashMap::new(),
        };
        assert_eq!(expected_summary, *snapshot.summary());
        assert_eq!("s3://b/wh/.../s1.avro", snapshot.manifest_list());
    }

    /// Converting a fully populated v1 snapshot keeps every field and defaults the
    /// sequence number.
    #[test]
    fn test_snapshot_v1_to_v2_projection() {
        // Build a V1 snapshot; V1 has no sequence-number field.
        let summary = Summary {
            operation: Operation::Append,
            additional_properties: HashMap::from([
                ("added-files".to_string(), "5".to_string()),
                ("added-records".to_string(), "100".to_string()),
            ]),
        };
        let source = SnapshotV1 {
            snapshot_id: 1234567890,
            parent_snapshot_id: Some(987654321),
            timestamp_ms: 1515100955770,
            manifest_list: Some("s3://bucket/manifest-list.avro".to_string()),
            // V1 can have either manifest_list or manifests, but not both
            manifests: None,
            summary: Some(summary),
            schema_id: Some(1),
        };

        // The conversion should fill in defaults for fields V1 does not carry.
        let converted = Snapshot::try_from(source).unwrap();

        assert_eq!(
            converted.sequence_number(),
            0,
            "V1 snapshot sequence_number should default to 0"
        );

        // Everything else must survive the conversion unchanged.
        assert_eq!(converted.snapshot_id(), 1234567890);
        assert_eq!(converted.parent_snapshot_id(), Some(987654321));
        assert_eq!(converted.timestamp_ms(), 1515100955770);
        assert_eq!(converted.manifest_list(), "s3://bucket/manifest-list.avro");
        assert_eq!(converted.schema_id(), Some(1));

        let converted_summary = converted.summary();
        assert_eq!(converted_summary.operation, Operation::Append);
        assert_eq!(
            converted_summary.additional_properties.get("added-files"),
            Some(&"5".to_string())
        );
    }

    /// A v1 snapshot without a summary receives the default summary on conversion.
    #[test]
    fn test_snapshot_v1_to_v2_with_missing_summary() {
        let source = SnapshotV1 {
            snapshot_id: 1111111111,
            parent_snapshot_id: None,
            timestamp_ms: 1515100955770,
            manifest_list: Some("s3://bucket/manifest-list.avro".to_string()),
            manifests: None,
            // V1 summary is optional
            summary: None,
            schema_id: None,
        };

        let converted = Snapshot::try_from(source).unwrap();

        // Defaults applied by the conversion.
        assert_eq!(
            converted.sequence_number(),
            0,
            "V1 snapshot sequence_number should default to 0"
        );
        let default_summary = converted.summary();
        assert_eq!(
            default_summary.operation,
            Operation::Append,
            "Missing V1 summary should default to Append operation"
        );
        assert!(
            default_summary.additional_properties.is_empty(),
            "Default summary should have empty additional_properties"
        );

        // Remaining fields are carried over as-is.
        assert_eq!(converted.snapshot_id(), 1111111111);
        assert_eq!(converted.parent_snapshot_id(), None);
        assert_eq!(converted.schema_id(), None);
    }
}
528}