1use std::sync::Arc;
26
27use once_cell::sync::Lazy;
28
29use crate::spec::{NestedField, NestedFieldRef, PrimitiveType, Type};
30use crate::{Error, ErrorKind, Result};
31
/// Reserved field ID of the `_file` metadata column (`i32::MAX - 1` = `2147483646`).
pub const RESERVED_FIELD_ID_FILE: i32 = i32::MAX - 1;

/// Reserved field ID of the `_pos` metadata column (`i32::MAX - 2` = `2147483645`).
pub const RESERVED_FIELD_ID_POS: i32 = i32::MAX - 2;

/// Reserved field ID of the `_deleted` metadata column (`i32::MAX - 3` = `2147483644`).
pub const RESERVED_FIELD_ID_DELETED: i32 = i32::MAX - 3;

/// Reserved field ID of the `_spec_id` metadata column (`i32::MAX - 4` = `2147483643`).
pub const RESERVED_FIELD_ID_SPEC_ID: i32 = i32::MAX - 4;

/// Reserved field ID of the `_partition` metadata column (`i32::MAX - 5` = `2147483642`).
pub const RESERVED_FIELD_ID_PARTITION: i32 = i32::MAX - 5;

/// Reserved field ID of the `file_path` column of position-based delete files
/// (`i32::MAX - 101` = `2147483546`).
pub const RESERVED_FIELD_ID_DELETE_FILE_PATH: i32 = i32::MAX - 101;

/// Reserved field ID of the `pos` column of position-based delete files
/// (`i32::MAX - 102` = `2147483545`).
pub const RESERVED_FIELD_ID_DELETE_FILE_POS: i32 = i32::MAX - 102;

// NOTE: `i32::MAX - 103` is not defined in this module.

/// Reserved field ID of the `_change_type` changelog column (`i32::MAX - 104` = `2147483543`).
pub const RESERVED_FIELD_ID_CHANGE_TYPE: i32 = i32::MAX - 104;

/// Reserved field ID of the `_change_ordinal` changelog column
/// (`i32::MAX - 105` = `2147483542`).
pub const RESERVED_FIELD_ID_CHANGE_ORDINAL: i32 = i32::MAX - 105;

/// Reserved field ID of the `_commit_snapshot_id` changelog column
/// (`i32::MAX - 106` = `2147483541`).
pub const RESERVED_FIELD_ID_COMMIT_SNAPSHOT_ID: i32 = i32::MAX - 106;

/// Reserved field ID of the `_row_id` row-lineage column (`i32::MAX - 107` = `2147483540`).
pub const RESERVED_FIELD_ID_ROW_ID: i32 = i32::MAX - 107;

/// Reserved field ID of the `_last_updated_sequence_number` row-lineage column
/// (`i32::MAX - 108` = `2147483539`).
pub const RESERVED_FIELD_ID_LAST_UPDATED_SEQUENCE_NUMBER: i32 = i32::MAX - 108;
67
/// Name of the metadata column holding the path of the file in which a row is stored.
pub const RESERVED_COL_NAME_FILE: &str = "_file";

/// Name of the metadata column holding a row's ordinal position in its source data file.
pub const RESERVED_COL_NAME_POS: &str = "_pos";

/// Name of the metadata column flagging whether a row has been deleted.
pub const RESERVED_COL_NAME_DELETED: &str = "_deleted";

/// Name of the metadata column holding the partition spec ID of the file containing a row.
pub const RESERVED_COL_NAME_SPEC_ID: &str = "_spec_id";

/// Name of the metadata column holding the partition to which a row belongs.
pub const RESERVED_COL_NAME_PARTITION: &str = "_partition";

/// Name of the data-file path column in position-based delete files.
///
/// Unlike the other reserved names this one has no leading underscore.
pub const RESERVED_COL_NAME_DELETE_FILE_PATH: &str = "file_path";

/// Name of the row-position column in position-based delete files.
///
/// Unlike the other reserved names this one has no leading underscore.
pub const RESERVED_COL_NAME_DELETE_FILE_POS: &str = "pos";

/// Name of the changelog column holding the record type of a change.
pub const RESERVED_COL_NAME_CHANGE_TYPE: &str = "_change_type";

/// Name of the changelog column holding the order of a change.
pub const RESERVED_COL_NAME_CHANGE_ORDINAL: &str = "_change_ordinal";

/// Name of the changelog column holding the snapshot ID in which a change occurred.
pub const RESERVED_COL_NAME_COMMIT_SNAPSHOT_ID: &str = "_commit_snapshot_id";

/// Name of the row-lineage column holding a row's unique ID.
pub const RESERVED_COL_NAME_ROW_ID: &str = "_row_id";

/// Name of the row-lineage column holding the sequence number that last updated a row.
pub const RESERVED_COL_NAME_LAST_UPDATED_SEQUENCE_NUMBER: &str = "_last_updated_sequence_number";
103
104static FILE_FIELD: Lazy<NestedFieldRef> = Lazy::new(|| {
107 Arc::new(
108 NestedField::required(
109 RESERVED_FIELD_ID_FILE,
110 RESERVED_COL_NAME_FILE,
111 Type::Primitive(PrimitiveType::String),
112 )
113 .with_doc("Path of the file in which a row is stored"),
114 )
115});
116
117static POS_FIELD: Lazy<NestedFieldRef> = Lazy::new(|| {
120 Arc::new(
121 NestedField::required(
122 RESERVED_FIELD_ID_POS,
123 RESERVED_COL_NAME_POS,
124 Type::Primitive(PrimitiveType::Long),
125 )
126 .with_doc("Ordinal position of a row in the source data file"),
127 )
128});
129
130static DELETED_FIELD: Lazy<NestedFieldRef> = Lazy::new(|| {
133 Arc::new(
134 NestedField::required(
135 RESERVED_FIELD_ID_DELETED,
136 RESERVED_COL_NAME_DELETED,
137 Type::Primitive(PrimitiveType::Boolean),
138 )
139 .with_doc("Whether the row has been deleted"),
140 )
141});
142
143static SPEC_ID_FIELD: Lazy<NestedFieldRef> = Lazy::new(|| {
146 Arc::new(
147 NestedField::required(
148 RESERVED_FIELD_ID_SPEC_ID,
149 RESERVED_COL_NAME_SPEC_ID,
150 Type::Primitive(PrimitiveType::Int),
151 )
152 .with_doc("Spec ID used to track the file containing a row"),
153 )
154});
155
156static DELETE_FILE_PATH_FIELD: Lazy<NestedFieldRef> = Lazy::new(|| {
159 Arc::new(
160 NestedField::required(
161 RESERVED_FIELD_ID_DELETE_FILE_PATH,
162 RESERVED_COL_NAME_DELETE_FILE_PATH,
163 Type::Primitive(PrimitiveType::String),
164 )
165 .with_doc("Path of a file, used in position-based delete files"),
166 )
167});
168
169static DELETE_FILE_POS_FIELD: Lazy<NestedFieldRef> = Lazy::new(|| {
172 Arc::new(
173 NestedField::required(
174 RESERVED_FIELD_ID_DELETE_FILE_POS,
175 RESERVED_COL_NAME_DELETE_FILE_POS,
176 Type::Primitive(PrimitiveType::Long),
177 )
178 .with_doc("Ordinal position of a row, used in position-based delete files"),
179 )
180});
181
182static CHANGE_TYPE_FIELD: Lazy<NestedFieldRef> = Lazy::new(|| {
185 Arc::new(
186 NestedField::required(
187 RESERVED_FIELD_ID_CHANGE_TYPE,
188 RESERVED_COL_NAME_CHANGE_TYPE,
189 Type::Primitive(PrimitiveType::String),
190 )
191 .with_doc(
192 "The record type in the changelog (INSERT, DELETE, UPDATE_BEFORE, or UPDATE_AFTER)",
193 ),
194 )
195});
196
197static CHANGE_ORDINAL_FIELD: Lazy<NestedFieldRef> = Lazy::new(|| {
200 Arc::new(
201 NestedField::required(
202 RESERVED_FIELD_ID_CHANGE_ORDINAL,
203 RESERVED_COL_NAME_CHANGE_ORDINAL,
204 Type::Primitive(PrimitiveType::Int),
205 )
206 .with_doc("The order of the change"),
207 )
208});
209
210static COMMIT_SNAPSHOT_ID_FIELD: Lazy<NestedFieldRef> = Lazy::new(|| {
213 Arc::new(
214 NestedField::required(
215 RESERVED_FIELD_ID_COMMIT_SNAPSHOT_ID,
216 RESERVED_COL_NAME_COMMIT_SNAPSHOT_ID,
217 Type::Primitive(PrimitiveType::Long),
218 )
219 .with_doc("The snapshot ID in which the change occurred"),
220 )
221});
222
223static ROW_ID_FIELD: Lazy<NestedFieldRef> = Lazy::new(|| {
226 Arc::new(
227 NestedField::required(
228 RESERVED_FIELD_ID_ROW_ID,
229 RESERVED_COL_NAME_ROW_ID,
230 Type::Primitive(PrimitiveType::Long),
231 )
232 .with_doc("A unique long assigned for row lineage"),
233 )
234});
235
236static LAST_UPDATED_SEQUENCE_NUMBER_FIELD: Lazy<NestedFieldRef> = Lazy::new(|| {
239 Arc::new(
240 NestedField::required(
241 RESERVED_FIELD_ID_LAST_UPDATED_SEQUENCE_NUMBER,
242 RESERVED_COL_NAME_LAST_UPDATED_SEQUENCE_NUMBER,
243 Type::Primitive(PrimitiveType::Long),
244 )
245 .with_doc("The sequence number which last updated this row"),
246 )
247});
248
249pub fn file_field() -> &'static NestedFieldRef {
254 &FILE_FIELD
255}
256
257pub fn pos_field() -> &'static NestedFieldRef {
262 &POS_FIELD
263}
264
265pub fn deleted_field() -> &'static NestedFieldRef {
270 &DELETED_FIELD
271}
272
273pub fn spec_id_field() -> &'static NestedFieldRef {
278 &SPEC_ID_FIELD
279}
280
281pub fn delete_file_path_field() -> &'static NestedFieldRef {
286 &DELETE_FILE_PATH_FIELD
287}
288
289pub fn delete_file_pos_field() -> &'static NestedFieldRef {
294 &DELETE_FILE_POS_FIELD
295}
296
297pub fn change_type_field() -> &'static NestedFieldRef {
302 &CHANGE_TYPE_FIELD
303}
304
305pub fn change_ordinal_field() -> &'static NestedFieldRef {
310 &CHANGE_ORDINAL_FIELD
311}
312
313pub fn commit_snapshot_id_field() -> &'static NestedFieldRef {
318 &COMMIT_SNAPSHOT_ID_FIELD
319}
320
321pub fn row_id_field() -> &'static NestedFieldRef {
326 &ROW_ID_FIELD
327}
328
329pub fn last_updated_sequence_number_field() -> &'static NestedFieldRef {
334 &LAST_UPDATED_SEQUENCE_NUMBER_FIELD
335}
336
337pub fn partition_field(partition_fields: Vec<NestedFieldRef>) -> NestedFieldRef {
370 use crate::spec::StructType;
371
372 Arc::new(
373 NestedField::required(
374 RESERVED_FIELD_ID_PARTITION,
375 RESERVED_COL_NAME_PARTITION,
376 Type::Struct(StructType::new(partition_fields)),
377 )
378 .with_doc("Partition to which a row belongs"),
379 )
380}
381
382pub fn get_metadata_field(field_id: i32) -> Result<&'static NestedFieldRef> {
394 match field_id {
395 RESERVED_FIELD_ID_FILE => Ok(file_field()),
396 RESERVED_FIELD_ID_POS => Ok(pos_field()),
397 RESERVED_FIELD_ID_DELETED => Ok(deleted_field()),
398 RESERVED_FIELD_ID_SPEC_ID => Ok(spec_id_field()),
399 RESERVED_FIELD_ID_PARTITION => Err(Error::new(
400 ErrorKind::Unexpected,
401 "The _partition field must be created using partition_field() with appropriate partition fields",
402 )),
403 RESERVED_FIELD_ID_DELETE_FILE_PATH => Ok(delete_file_path_field()),
404 RESERVED_FIELD_ID_DELETE_FILE_POS => Ok(delete_file_pos_field()),
405 RESERVED_FIELD_ID_CHANGE_TYPE => Ok(change_type_field()),
406 RESERVED_FIELD_ID_CHANGE_ORDINAL => Ok(change_ordinal_field()),
407 RESERVED_FIELD_ID_COMMIT_SNAPSHOT_ID => Ok(commit_snapshot_id_field()),
408 RESERVED_FIELD_ID_ROW_ID => Ok(row_id_field()),
409 RESERVED_FIELD_ID_LAST_UPDATED_SEQUENCE_NUMBER => Ok(last_updated_sequence_number_field()),
410 _ if is_metadata_field(field_id) => {
411 Err(Error::new(
413 ErrorKind::Unexpected,
414 format!(
415 "Metadata field ID {field_id} recognized but field definition not implemented"
416 ),
417 ))
418 }
419 _ => Err(Error::new(
420 ErrorKind::Unexpected,
421 format!("Field ID {field_id} is not a metadata field"),
422 )),
423 }
424}
425
426pub fn get_metadata_field_id(column_name: &str) -> Result<i32> {
434 match column_name {
435 RESERVED_COL_NAME_FILE => Ok(RESERVED_FIELD_ID_FILE),
436 RESERVED_COL_NAME_POS => Ok(RESERVED_FIELD_ID_POS),
437 RESERVED_COL_NAME_DELETED => Ok(RESERVED_FIELD_ID_DELETED),
438 RESERVED_COL_NAME_SPEC_ID => Ok(RESERVED_FIELD_ID_SPEC_ID),
439 RESERVED_COL_NAME_PARTITION => Ok(RESERVED_FIELD_ID_PARTITION),
440 RESERVED_COL_NAME_DELETE_FILE_PATH => Ok(RESERVED_FIELD_ID_DELETE_FILE_PATH),
441 RESERVED_COL_NAME_DELETE_FILE_POS => Ok(RESERVED_FIELD_ID_DELETE_FILE_POS),
442 RESERVED_COL_NAME_CHANGE_TYPE => Ok(RESERVED_FIELD_ID_CHANGE_TYPE),
443 RESERVED_COL_NAME_CHANGE_ORDINAL => Ok(RESERVED_FIELD_ID_CHANGE_ORDINAL),
444 RESERVED_COL_NAME_COMMIT_SNAPSHOT_ID => Ok(RESERVED_FIELD_ID_COMMIT_SNAPSHOT_ID),
445 RESERVED_COL_NAME_ROW_ID => Ok(RESERVED_FIELD_ID_ROW_ID),
446 RESERVED_COL_NAME_LAST_UPDATED_SEQUENCE_NUMBER => {
447 Ok(RESERVED_FIELD_ID_LAST_UPDATED_SEQUENCE_NUMBER)
448 }
449 _ => Err(Error::new(
450 ErrorKind::Unexpected,
451 format!("Unknown/unsupported metadata column name: {column_name}"),
452 )),
453 }
454}
455
456pub fn is_metadata_field(field_id: i32) -> bool {
464 matches!(
465 field_id,
466 RESERVED_FIELD_ID_FILE
467 | RESERVED_FIELD_ID_POS
468 | RESERVED_FIELD_ID_DELETED
469 | RESERVED_FIELD_ID_SPEC_ID
470 | RESERVED_FIELD_ID_PARTITION
471 | RESERVED_FIELD_ID_DELETE_FILE_PATH
472 | RESERVED_FIELD_ID_DELETE_FILE_POS
473 | RESERVED_FIELD_ID_CHANGE_TYPE
474 | RESERVED_FIELD_ID_CHANGE_ORDINAL
475 | RESERVED_FIELD_ID_COMMIT_SNAPSHOT_ID
476 | RESERVED_FIELD_ID_ROW_ID
477 | RESERVED_FIELD_ID_LAST_UPDATED_SEQUENCE_NUMBER
478 )
479}
480
481pub fn is_metadata_column_name(column_name: &str) -> bool {
489 get_metadata_field_id(column_name).is_ok()
490}
491
#[cfg(test)]
mod tests {
    use super::*;

    /// Names of every metadata column that has a static definition (all but `_partition`).
    const STATIC_COLUMN_NAMES: [&str; 11] = [
        RESERVED_COL_NAME_FILE,
        RESERVED_COL_NAME_POS,
        RESERVED_COL_NAME_DELETED,
        RESERVED_COL_NAME_SPEC_ID,
        RESERVED_COL_NAME_DELETE_FILE_PATH,
        RESERVED_COL_NAME_DELETE_FILE_POS,
        RESERVED_COL_NAME_CHANGE_TYPE,
        RESERVED_COL_NAME_CHANGE_ORDINAL,
        RESERVED_COL_NAME_COMMIT_SNAPSHOT_ID,
        RESERVED_COL_NAME_ROW_ID,
        RESERVED_COL_NAME_LAST_UPDATED_SEQUENCE_NUMBER,
    ];

    #[test]
    fn test_partition_field_creation() {
        let partition_fields = vec![
            Arc::new(NestedField::required(
                1000,
                "year",
                Type::Primitive(PrimitiveType::Int),
            )),
            Arc::new(NestedField::required(
                1001,
                "month",
                Type::Primitive(PrimitiveType::Int),
            )),
        ];

        let partition = partition_field(partition_fields);

        assert_eq!(partition.id, RESERVED_FIELD_ID_PARTITION);
        assert_eq!(partition.name, RESERVED_COL_NAME_PARTITION);
        assert!(partition.required);

        if let Type::Struct(struct_type) = partition.field_type.as_ref() {
            assert_eq!(struct_type.fields().len(), 2);
            assert_eq!(struct_type.fields()[0].name, "year");
            assert_eq!(struct_type.fields()[1].name, "month");
        } else {
            panic!("Expected struct type for _partition field");
        }
    }

    #[test]
    fn test_partition_field_id_recognized() {
        assert!(is_metadata_field(RESERVED_FIELD_ID_PARTITION));
    }

    #[test]
    fn test_partition_field_name_recognized() {
        assert_eq!(
            get_metadata_field_id(RESERVED_COL_NAME_PARTITION).unwrap(),
            RESERVED_FIELD_ID_PARTITION
        );
        assert!(is_metadata_column_name(RESERVED_COL_NAME_PARTITION));
    }

    #[test]
    fn test_get_metadata_field_returns_error_for_partition() {
        let result = get_metadata_field(RESERVED_FIELD_ID_PARTITION);

        assert!(result.is_err());
        assert!(
            result
                .unwrap_err()
                .to_string()
                .contains("partition_field()")
        );
    }

    #[test]
    fn test_all_metadata_field_ids() {
        assert!(get_metadata_field(RESERVED_FIELD_ID_FILE).is_ok());
        assert!(get_metadata_field(RESERVED_FIELD_ID_POS).is_ok());
        assert!(get_metadata_field(RESERVED_FIELD_ID_DELETED).is_ok());
        assert!(get_metadata_field(RESERVED_FIELD_ID_SPEC_ID).is_ok());
        assert!(get_metadata_field(RESERVED_FIELD_ID_DELETE_FILE_PATH).is_ok());
        assert!(get_metadata_field(RESERVED_FIELD_ID_DELETE_FILE_POS).is_ok());
        assert!(get_metadata_field(RESERVED_FIELD_ID_CHANGE_TYPE).is_ok());
        assert!(get_metadata_field(RESERVED_FIELD_ID_CHANGE_ORDINAL).is_ok());
        assert!(get_metadata_field(RESERVED_FIELD_ID_COMMIT_SNAPSHOT_ID).is_ok());
        assert!(get_metadata_field(RESERVED_FIELD_ID_ROW_ID).is_ok());
        assert!(get_metadata_field(RESERVED_FIELD_ID_LAST_UPDATED_SEQUENCE_NUMBER).is_ok());
    }

    /// The name -> ID table, the ID set, and the static field definitions must all agree.
    #[test]
    fn test_static_metadata_fields_round_trip() {
        for &name in STATIC_COLUMN_NAMES.iter() {
            let id = get_metadata_field_id(name).unwrap();
            assert!(is_metadata_field(id), "{name} maps to non-metadata id {id}");
            assert!(is_metadata_column_name(name));

            let field = get_metadata_field(id).unwrap();
            assert_eq!(field.id, id);
            assert_eq!(field.name, name);
        }
    }

    #[test]
    fn test_unknown_ids_and_names_are_rejected() {
        assert!(!is_metadata_field(1));
        assert!(get_metadata_field(1).is_err());
        assert!(!is_metadata_column_name("id"));
        assert!(get_metadata_field_id("id").is_err());
    }
}