1use std::collections::HashMap;
22use std::sync::Arc;
23
24use _serde::SnapshotV2;
25use chrono::{DateTime, Utc};
26use serde::{Deserialize, Serialize};
27use typed_builder::TypedBuilder;
28
29use super::table_metadata::SnapshotLog;
30use crate::error::{Result, timestamp_ms_to_utc};
31use crate::io::FileIO;
32use crate::spec::{ManifestList, SchemaId, SchemaRef, TableMetadata};
33use crate::{Error, ErrorKind};
34
/// Name of the main branch reference: `"main"`.
pub const MAIN_BRANCH: &str = "main";
/// Placeholder snapshot id (`-1`).
///
/// NOTE(review): presumably marks a snapshot whose id has not been assigned
/// yet — confirm against the callers.
pub const UNASSIGNED_SNAPSHOT_ID: i64 = -1;

/// Reference-counted ([`Arc`]) handle to a [`Snapshot`].
pub type SnapshotRef = Arc<Snapshot>;
/// The kind of operation recorded in a snapshot [`Summary`].
///
/// Serde (de)serializes each variant under its lowercase name
/// (`rename_all = "lowercase"`).
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
#[serde(rename_all = "lowercase")]
pub enum Operation {
    /// Serialized as `"append"`.
    Append,
    /// Serialized as `"replace"`.
    Replace,
    /// Serialized as `"overwrite"`.
    Overwrite,
    /// Serialized as `"delete"`.
    Delete,
}
56
57impl Operation {
58 pub fn as_str(&self) -> &str {
60 match self {
61 Operation::Append => "append",
62 Operation::Replace => "replace",
63 Operation::Overwrite => "overwrite",
64 Operation::Delete => "delete",
65 }
66 }
67}
68
/// Summary attached to a snapshot: the operation plus free-form properties.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
pub struct Summary {
    /// The operation this summary describes.
    pub operation: Operation,
    /// Remaining string key/value entries. Because of `#[serde(flatten)]`
    /// they are (de)serialized at the same level as `operation` rather than
    /// in a nested object.
    #[serde(flatten)]
    pub additional_properties: HashMap<String, String>,
}
78
79impl Default for Operation {
80 fn default() -> Operation {
81 Self::Append
82 }
83}
84
/// A table snapshot: its id, optional parent, sequence number, timestamp,
/// manifest list location, summary and optional schema id.
///
/// Serde (de)serialization is delegated to [`SnapshotV2`] via the
/// `from`/`into` container attributes. A builder is generated by
/// [`TypedBuilder`]; its setters carry the `with_` prefix.
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize, TypedBuilder)]
#[serde(from = "SnapshotV2", into = "SnapshotV2")]
#[builder(field_defaults(setter(prefix = "with_")))]
pub struct Snapshot {
    /// Id of this snapshot.
    snapshot_id: i64,
    /// Id of the parent snapshot, if any. The builder defaults this to `None`.
    #[builder(default = None)]
    parent_snapshot_id: Option<i64>,
    /// Sequence number of this snapshot (set to `0` when converting from the
    /// v1 representation).
    sequence_number: i64,
    /// Timestamp in milliseconds; see `timestamp()` for the UTC conversion.
    timestamp_ms: i64,
    /// Location of the manifest list file. The builder setter accepts any
    /// value that converts `Into<String>`.
    #[builder(setter(into))]
    manifest_list: String,
    /// Summary of the operation recorded by this snapshot.
    summary: Summary,
    /// Optional schema id. The builder setter takes a bare `SchemaId`
    /// (`strip_option`), with a fallback setter configured as `schema_id_opt`;
    /// the default is `None`.
    #[builder(setter(strip_option(fallback = schema_id_opt)), default = None)]
    schema_id: Option<SchemaId>,
}
113
114impl Snapshot {
115 #[inline]
117 pub fn snapshot_id(&self) -> i64 {
118 self.snapshot_id
119 }
120
121 #[inline]
123 pub fn parent_snapshot_id(&self) -> Option<i64> {
124 self.parent_snapshot_id
125 }
126
127 #[inline]
129 pub fn sequence_number(&self) -> i64 {
130 self.sequence_number
131 }
132 #[inline]
134 pub fn manifest_list(&self) -> &str {
135 &self.manifest_list
136 }
137
138 #[inline]
140 pub fn summary(&self) -> &Summary {
141 &self.summary
142 }
143 #[inline]
145 pub fn timestamp(&self) -> Result<DateTime<Utc>> {
146 timestamp_ms_to_utc(self.timestamp_ms)
147 }
148
149 #[inline]
151 pub fn timestamp_ms(&self) -> i64 {
152 self.timestamp_ms
153 }
154
155 #[inline]
157 pub fn schema_id(&self) -> Option<SchemaId> {
158 self.schema_id
159 }
160
161 pub fn schema(&self, table_metadata: &TableMetadata) -> Result<SchemaRef> {
163 Ok(match self.schema_id() {
164 Some(schema_id) => table_metadata
165 .schema_by_id(schema_id)
166 .ok_or_else(|| {
167 Error::new(
168 ErrorKind::DataInvalid,
169 format!("Schema with id {} not found", schema_id),
170 )
171 })?
172 .clone(),
173 None => table_metadata.current_schema().clone(),
174 })
175 }
176
177 #[cfg(test)]
179 pub(crate) fn parent_snapshot(&self, table_metadata: &TableMetadata) -> Option<SnapshotRef> {
180 match self.parent_snapshot_id {
181 Some(id) => table_metadata.snapshot_by_id(id).cloned(),
182 None => None,
183 }
184 }
185
186 pub async fn load_manifest_list(
188 &self,
189 file_io: &FileIO,
190 table_metadata: &TableMetadata,
191 ) -> Result<ManifestList> {
192 let manifest_list_content = file_io.new_input(&self.manifest_list)?.read().await?;
193 ManifestList::parse_with_version(
194 &manifest_list_content,
195 table_metadata.format_version(),
198 )
199 }
200
201 #[allow(dead_code)]
202 pub(crate) fn log(&self) -> SnapshotLog {
203 SnapshotLog {
204 timestamp_ms: self.timestamp_ms,
205 snapshot_id: self.snapshot_id,
206 }
207 }
208}
209
210pub(super) mod _serde {
211 use std::collections::HashMap;
216
217 use serde::{Deserialize, Serialize};
218
219 use super::{Operation, Snapshot, Summary};
220 use crate::Error;
221 use crate::spec::SchemaId;
222
223 #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
224 #[serde(rename_all = "kebab-case")]
225 pub(crate) struct SnapshotV2 {
227 pub snapshot_id: i64,
228 #[serde(skip_serializing_if = "Option::is_none")]
229 pub parent_snapshot_id: Option<i64>,
230 pub sequence_number: i64,
231 pub timestamp_ms: i64,
232 pub manifest_list: String,
233 pub summary: Summary,
234 #[serde(skip_serializing_if = "Option::is_none")]
235 pub schema_id: Option<SchemaId>,
236 }
237
238 #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
239 #[serde(rename_all = "kebab-case")]
240 pub(crate) struct SnapshotV1 {
242 pub snapshot_id: i64,
243 #[serde(skip_serializing_if = "Option::is_none")]
244 pub parent_snapshot_id: Option<i64>,
245 pub timestamp_ms: i64,
246 #[serde(skip_serializing_if = "Option::is_none")]
247 pub manifest_list: Option<String>,
248 #[serde(skip_serializing_if = "Option::is_none")]
249 pub manifests: Option<Vec<String>>,
250 #[serde(skip_serializing_if = "Option::is_none")]
251 pub summary: Option<Summary>,
252 #[serde(skip_serializing_if = "Option::is_none")]
253 pub schema_id: Option<SchemaId>,
254 }
255
256 impl From<SnapshotV2> for Snapshot {
257 fn from(v2: SnapshotV2) -> Self {
258 Snapshot {
259 snapshot_id: v2.snapshot_id,
260 parent_snapshot_id: v2.parent_snapshot_id,
261 sequence_number: v2.sequence_number,
262 timestamp_ms: v2.timestamp_ms,
263 manifest_list: v2.manifest_list,
264 summary: v2.summary,
265 schema_id: v2.schema_id,
266 }
267 }
268 }
269
270 impl From<Snapshot> for SnapshotV2 {
271 fn from(v2: Snapshot) -> Self {
272 SnapshotV2 {
273 snapshot_id: v2.snapshot_id,
274 parent_snapshot_id: v2.parent_snapshot_id,
275 sequence_number: v2.sequence_number,
276 timestamp_ms: v2.timestamp_ms,
277 manifest_list: v2.manifest_list,
278 summary: v2.summary,
279 schema_id: v2.schema_id,
280 }
281 }
282 }
283
284 impl TryFrom<SnapshotV1> for Snapshot {
285 type Error = Error;
286
287 fn try_from(v1: SnapshotV1) -> Result<Self, Self::Error> {
288 Ok(Snapshot {
289 snapshot_id: v1.snapshot_id,
290 parent_snapshot_id: v1.parent_snapshot_id,
291 sequence_number: 0,
292 timestamp_ms: v1.timestamp_ms,
293 manifest_list: match (v1.manifest_list, v1.manifests) {
294 (Some(file), None) => file,
295 (Some(_), Some(_)) => "Invalid v1 snapshot, when manifest list provided, manifest files should be omitted".to_string(),
296 (None, _) => "Unsupported v1 snapshot, only manifest list is supported".to_string()
297 },
298 summary: v1.summary.unwrap_or(Summary {
299 operation: Operation::default(),
300 additional_properties: HashMap::new(),
301 }),
302 schema_id: v1.schema_id,
303 })
304 }
305 }
306
307 impl From<Snapshot> for SnapshotV1 {
308 fn from(v2: Snapshot) -> Self {
309 SnapshotV1 {
310 snapshot_id: v2.snapshot_id,
311 parent_snapshot_id: v2.parent_snapshot_id,
312 timestamp_ms: v2.timestamp_ms,
313 manifest_list: Some(v2.manifest_list),
314 summary: Some(v2.summary),
315 schema_id: v2.schema_id,
316 manifests: None,
317 }
318 }
319 }
320}
321
/// A named reference to a snapshot together with its retention settings.
///
/// Field names are kebab-case on the wire.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
#[serde(rename_all = "kebab-case")]
pub struct SnapshotReference {
    /// Id of the snapshot this reference points to.
    pub snapshot_id: i64,
    /// Retention settings; `#[serde(flatten)]` places their fields at the
    /// same level as `snapshot-id` instead of in a nested object.
    #[serde(flatten)]
    pub retention: SnapshotRetention,
}
332
333impl SnapshotReference {
334 pub fn is_branch(&self) -> bool {
336 matches!(self.retention, SnapshotRetention::Branch { .. })
337 }
338}
339
340impl SnapshotReference {
341 pub fn new(snapshot_id: i64, retention: SnapshotRetention) -> Self {
343 SnapshotReference {
344 snapshot_id,
345 retention,
346 }
347 }
348}
349
/// Retention settings of a snapshot reference.
///
/// Serialized as an internally tagged enum: the `type` field carries the
/// lowercase variant name (`"branch"` or `"tag"`). Variant fields use
/// kebab-case names and are omitted from the output when `None`.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
#[serde(rename_all = "lowercase", tag = "type")]
pub enum SnapshotRetention {
    /// Retention settings of a branch reference.
    #[serde(rename_all = "kebab-case")]
    Branch {
        /// Serialized as `min-snapshots-to-keep`.
        #[serde(skip_serializing_if = "Option::is_none")]
        min_snapshots_to_keep: Option<i32>,
        /// Serialized as `max-snapshot-age-ms`.
        #[serde(skip_serializing_if = "Option::is_none")]
        max_snapshot_age_ms: Option<i64>,
        /// Serialized as `max-ref-age-ms`.
        #[serde(skip_serializing_if = "Option::is_none")]
        max_ref_age_ms: Option<i64>,
    },
    /// Retention settings of a tag reference.
    #[serde(rename_all = "kebab-case")]
    Tag {
        /// Serialized as `max-ref-age-ms`.
        #[serde(skip_serializing_if = "Option::is_none")]
        max_ref_age_ms: Option<i64>,
    },
}
380
381impl SnapshotRetention {
382 pub fn branch(
384 min_snapshots_to_keep: Option<i32>,
385 max_snapshot_age_ms: Option<i64>,
386 max_ref_age_ms: Option<i64>,
387 ) -> Self {
388 SnapshotRetention::Branch {
389 min_snapshots_to_keep,
390 max_snapshot_age_ms,
391 max_ref_age_ms,
392 }
393 }
394}
395
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use chrono::{TimeZone, Utc};

    use crate::spec::snapshot::_serde::SnapshotV1;
    use crate::spec::snapshot::{Operation, Snapshot, Summary};

    /// Parses a v1 snapshot from JSON and checks the converted `Snapshot`.
    #[test]
    fn schema() {
        let record = r#"
        {
            "snapshot-id": 3051729675574597004,
            "timestamp-ms": 1515100955770,
            "summary": {
                "operation": "append"
            },
            "manifest-list": "s3://b/wh/.../s1.avro",
            "schema-id": 0
        }
        "#;

        let parsed: SnapshotV1 = serde_json::from_str(record).unwrap();
        let result = Snapshot::try_from(parsed).unwrap();

        let expected_timestamp = Utc.timestamp_millis_opt(1515100955770).unwrap();
        let expected_summary = Summary {
            operation: Operation::Append,
            additional_properties: HashMap::new(),
        };

        assert_eq!(3051729675574597004, result.snapshot_id());
        assert_eq!(expected_timestamp, result.timestamp().unwrap());
        assert_eq!(1515100955770, result.timestamp_ms());
        assert_eq!(expected_summary, *result.summary());
        assert_eq!("s3://b/wh/.../s1.avro", result.manifest_list());
    }

    /// A fully populated v1 snapshot keeps its fields and gets sequence number 0.
    #[test]
    fn test_snapshot_v1_to_v2_projection() {
        let properties = HashMap::from([
            ("added-files".to_string(), "5".to_string()),
            ("added-records".to_string(), "100".to_string()),
        ]);
        let source = SnapshotV1 {
            snapshot_id: 1234567890,
            parent_snapshot_id: Some(987654321),
            timestamp_ms: 1515100955770,
            manifest_list: Some("s3://bucket/manifest-list.avro".to_string()),
            manifests: None,
            summary: Some(Summary {
                operation: Operation::Append,
                additional_properties: properties,
            }),
            schema_id: Some(1),
        };

        let converted = Snapshot::try_from(source).unwrap();

        assert_eq!(
            converted.sequence_number(),
            0,
            "V1 snapshot sequence_number should default to 0"
        );

        assert_eq!(converted.snapshot_id(), 1234567890);
        assert_eq!(converted.parent_snapshot_id(), Some(987654321));
        assert_eq!(converted.timestamp_ms(), 1515100955770);
        assert_eq!(converted.manifest_list(), "s3://bucket/manifest-list.avro");
        assert_eq!(converted.schema_id(), Some(1));

        let summary = converted.summary();
        assert_eq!(summary.operation, Operation::Append);
        assert_eq!(
            summary.additional_properties.get("added-files"),
            Some(&"5".to_string())
        );
    }

    /// A v1 snapshot without a summary falls back to the default summary.
    #[test]
    fn test_snapshot_v1_to_v2_with_missing_summary() {
        let source = SnapshotV1 {
            snapshot_id: 1111111111,
            parent_snapshot_id: None,
            timestamp_ms: 1515100955770,
            manifest_list: Some("s3://bucket/manifest-list.avro".to_string()),
            manifests: None,
            summary: None,
            schema_id: None,
        };

        let converted = Snapshot::try_from(source).unwrap();
        let summary = converted.summary();

        assert_eq!(
            converted.sequence_number(),
            0,
            "V1 snapshot sequence_number should default to 0"
        );
        assert_eq!(
            summary.operation,
            Operation::Append,
            "Missing V1 summary should default to Append operation"
        );
        assert!(
            summary.additional_properties.is_empty(),
            "Default summary should have empty additional_properties"
        );

        assert_eq!(converted.snapshot_id(), 1111111111);
        assert_eq!(converted.parent_snapshot_id(), None);
        assert_eq!(converted.schema_id(), None);
    }
}