iceberg/puffin/
metadata.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::collections::{HashMap, HashSet};
19
20use bytes::Bytes;
21use serde::{Deserialize, Serialize};
22
23use crate::io::{FileRead, InputFile};
24use crate::puffin::compression::CompressionCodec;
25use crate::{Error, ErrorKind, Result};
26
/// Key of the property holding a human-readable identification of the application that
/// wrote the file, along with its version.
///
/// Example value: `"Trino version 381"`.
pub const CREATED_BY_PROPERTY: &str = "created-by";
30
31/// Metadata about a blob.
32/// For more information, see: https://iceberg.apache.org/puffin-spec/#blobmetadata
33#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Clone)]
34#[serde(rename_all = "kebab-case")]
35pub struct BlobMetadata {
36    pub(crate) r#type: String,
37    pub(crate) fields: Vec<i32>,
38    pub(crate) snapshot_id: i64,
39    pub(crate) sequence_number: i64,
40    pub(crate) offset: u64,
41    pub(crate) length: u64,
42    #[serde(skip_serializing_if = "CompressionCodec::is_none")]
43    #[serde(default)]
44    pub(crate) compression_codec: CompressionCodec,
45    #[serde(skip_serializing_if = "HashMap::is_empty")]
46    #[serde(default)]
47    pub(crate) properties: HashMap<String, String>,
48}
49
50impl BlobMetadata {
51    #[inline]
52    /// See blob types: https://iceberg.apache.org/puffin-spec/#blob-types
53    pub fn blob_type(&self) -> &str {
54        &self.r#type
55    }
56
57    #[inline]
58    /// List of field IDs the blob was computed for; the order of items is used to compute sketches stored in the blob.
59    pub fn fields(&self) -> &[i32] {
60        &self.fields
61    }
62
63    #[inline]
64    /// ID of the Iceberg table's snapshot the blob was computed from
65    pub fn snapshot_id(&self) -> i64 {
66        self.snapshot_id
67    }
68
69    #[inline]
70    /// Sequence number of the Iceberg table's snapshot the blob was computed from
71    pub fn sequence_number(&self) -> i64 {
72        self.sequence_number
73    }
74
75    #[inline]
76    /// The offset in the file where the blob contents start
77    pub fn offset(&self) -> u64 {
78        self.offset
79    }
80
81    #[inline]
82    /// The length of the blob stored in the file (after compression, if compressed)
83    pub fn length(&self) -> u64 {
84        self.length
85    }
86
87    #[inline]
88    /// The compression codec used to compress the data
89    pub fn compression_codec(&self) -> CompressionCodec {
90        self.compression_codec
91    }
92
93    #[inline]
94    /// Arbitrary meta-information about the blob
95    pub fn properties(&self) -> &HashMap<String, String> {
96        &self.properties
97    }
98}
99
/// Flags that may be set in the footer's flag bytes.
///
/// Each discriminant is the flag's absolute bit position across the flag bytes, i.e.
/// `byte_index * 8 + bit_index`.
#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)]
pub(crate) enum Flag {
    /// The footer payload is compressed (byte 0, bit 0).
    FooterPayloadCompressed = 0,
}
104
105impl Flag {
106    pub(crate) fn byte_idx(self) -> u8 {
107        (self as u8) / 8
108    }
109
110    pub(crate) fn bit_idx(self) -> u8 {
111        (self as u8) % 8
112    }
113
114    fn matches(self, byte_idx: u8, bit_idx: u8) -> bool {
115        self.byte_idx() == byte_idx && self.bit_idx() == bit_idx
116    }
117
118    fn from(byte_idx: u8, bit_idx: u8) -> Result<Flag> {
119        if Flag::FooterPayloadCompressed.matches(byte_idx, bit_idx) {
120            Ok(Flag::FooterPayloadCompressed)
121        } else {
122            Err(Error::new(
123                ErrorKind::DataInvalid,
124                format!(
125                    "Unknown flag byte {} and bit {} combination",
126                    byte_idx, bit_idx
127                ),
128            ))
129        }
130    }
131}
132
133/// Metadata about a puffin file.
134///
135/// For more information, see: https://iceberg.apache.org/puffin-spec/#filemetadata
136#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Clone)]
137pub struct FileMetadata {
138    pub(crate) blobs: Vec<BlobMetadata>,
139    #[serde(skip_serializing_if = "HashMap::is_empty")]
140    #[serde(default)]
141    pub(crate) properties: HashMap<String, String>,
142}
143
144impl FileMetadata {
145    pub(crate) const MAGIC_LENGTH: u8 = 4;
146    pub(crate) const MAGIC: [u8; FileMetadata::MAGIC_LENGTH as usize] = [0x50, 0x46, 0x41, 0x31];
147
148    /// We use the term FOOTER_STRUCT to refer to the fixed-length portion of the Footer.
149    /// The structure of the Footer specification is illustrated below:
150    ///
151    /// ```text                                             
152    ///        Footer
153    ///        ┌────────────────────┐                 
154    ///        │  Magic (4 bytes)   │                 
155    ///        │                    │                 
156    ///        ├────────────────────┤                 
157    ///        │   FooterPayload    │                 
158    ///        │  (PAYLOAD_LENGTH)  │                 
159    ///        ├────────────────────┤ ◀─┐             
160    ///        │ FooterPayloadSize  │   │             
161    ///        │     (4 bytes)      │   │             
162    ///        ├────────────────────┤                 
163    ///        │  Flags (4 bytes)   │  FOOTER_STRUCT  
164    ///        │                    │                 
165    ///        ├────────────────────┤   │             
166    ///        │  Magic (4 bytes)   │   │             
167    ///        │                    │   │             
168    ///        └────────────────────┘ ◀─┘  
169    /// ```                      
170    const FOOTER_STRUCT_PAYLOAD_LENGTH_OFFSET: u8 = 0;
171    const FOOTER_STRUCT_PAYLOAD_LENGTH_LENGTH: u8 = 4;
172    const FOOTER_STRUCT_FLAGS_OFFSET: u8 = FileMetadata::FOOTER_STRUCT_PAYLOAD_LENGTH_OFFSET
173        + FileMetadata::FOOTER_STRUCT_PAYLOAD_LENGTH_LENGTH;
174    pub(crate) const FOOTER_STRUCT_FLAGS_LENGTH: u8 = 4;
175    const FOOTER_STRUCT_MAGIC_OFFSET: u8 =
176        FileMetadata::FOOTER_STRUCT_FLAGS_OFFSET + FileMetadata::FOOTER_STRUCT_FLAGS_LENGTH;
177    pub(crate) const FOOTER_STRUCT_LENGTH: u8 =
178        FileMetadata::FOOTER_STRUCT_MAGIC_OFFSET + FileMetadata::MAGIC_LENGTH;
179
180    /// Constructs new puffin `FileMetadata`
181    pub fn new(blobs: Vec<BlobMetadata>, properties: HashMap<String, String>) -> Self {
182        Self { blobs, properties }
183    }
184
185    fn check_magic(bytes: &[u8]) -> Result<()> {
186        if bytes == FileMetadata::MAGIC {
187            Ok(())
188        } else {
189            Err(Error::new(
190                ErrorKind::DataInvalid,
191                format!(
192                    "Bad magic value: {:?} should be {:?}",
193                    bytes,
194                    FileMetadata::MAGIC
195                ),
196            ))
197        }
198    }
199
200    async fn read_footer_payload_length(
201        file_read: &dyn FileRead,
202        input_file_length: u64,
203    ) -> Result<u32> {
204        let start = input_file_length - FileMetadata::FOOTER_STRUCT_LENGTH as u64;
205        let end = start + FileMetadata::FOOTER_STRUCT_PAYLOAD_LENGTH_LENGTH as u64;
206        let footer_payload_length_bytes = file_read.read(start..end).await?;
207        let mut buf = [0; 4];
208        buf.copy_from_slice(&footer_payload_length_bytes);
209        let footer_payload_length = u32::from_le_bytes(buf);
210        Ok(footer_payload_length)
211    }
212
213    async fn read_footer_bytes(
214        file_read: &dyn FileRead,
215        input_file_length: u64,
216        footer_payload_length: u32,
217    ) -> Result<Bytes> {
218        let footer_length = footer_payload_length as u64
219            + FileMetadata::FOOTER_STRUCT_LENGTH as u64
220            + FileMetadata::MAGIC_LENGTH as u64;
221        let start = input_file_length - footer_length;
222        let end = input_file_length;
223        file_read.read(start..end).await
224    }
225
226    fn decode_flags(footer_bytes: &[u8]) -> Result<HashSet<Flag>> {
227        let mut flags = HashSet::new();
228
229        for byte_idx in 0..FileMetadata::FOOTER_STRUCT_FLAGS_LENGTH {
230            let byte_offset = footer_bytes.len()
231                - FileMetadata::MAGIC_LENGTH as usize
232                - FileMetadata::FOOTER_STRUCT_FLAGS_LENGTH as usize
233                + byte_idx as usize;
234
235            let flag_byte = *footer_bytes.get(byte_offset).ok_or_else(|| {
236                Error::new(ErrorKind::DataInvalid, "Index range is out of bounds.")
237            })?;
238
239            for bit_idx in 0..8 {
240                if ((flag_byte >> bit_idx) & 1) != 0 {
241                    let flag = Flag::from(byte_idx, bit_idx)?;
242                    flags.insert(flag);
243                }
244            }
245        }
246
247        Ok(flags)
248    }
249
250    fn extract_footer_payload_as_str(
251        footer_bytes: &[u8],
252        footer_payload_length: u32,
253    ) -> Result<String> {
254        let flags = FileMetadata::decode_flags(footer_bytes)?;
255        let footer_compression_codec = if flags.contains(&Flag::FooterPayloadCompressed) {
256            CompressionCodec::Lz4
257        } else {
258            CompressionCodec::None
259        };
260
261        let start_offset = FileMetadata::MAGIC_LENGTH as usize;
262        let end_offset =
263            FileMetadata::MAGIC_LENGTH as usize + usize::try_from(footer_payload_length)?;
264        let footer_payload_bytes = footer_bytes
265            .get(start_offset..end_offset)
266            .ok_or_else(|| Error::new(ErrorKind::DataInvalid, "Index range is out of bounds."))?;
267        let decompressed_footer_payload_bytes =
268            footer_compression_codec.decompress(footer_payload_bytes.into())?;
269
270        String::from_utf8(decompressed_footer_payload_bytes).map_err(|src| {
271            Error::new(ErrorKind::DataInvalid, "Footer is not a valid UTF-8 string")
272                .with_source(src)
273        })
274    }
275
276    fn from_json_str(string: &str) -> Result<FileMetadata> {
277        serde_json::from_str::<FileMetadata>(string).map_err(|src| {
278            Error::new(ErrorKind::DataInvalid, "Given string is not valid JSON").with_source(src)
279        })
280    }
281
282    /// Returns the file metadata about a Puffin file
283    pub(crate) async fn read(input_file: &InputFile) -> Result<FileMetadata> {
284        let file_read = input_file.reader().await?;
285
286        let first_four_bytes = file_read.read(0..FileMetadata::MAGIC_LENGTH.into()).await?;
287        FileMetadata::check_magic(&first_four_bytes)?;
288
289        let input_file_length = input_file.metadata().await?.size;
290        let footer_payload_length =
291            FileMetadata::read_footer_payload_length(&file_read, input_file_length).await?;
292        let footer_bytes =
293            FileMetadata::read_footer_bytes(&file_read, input_file_length, footer_payload_length)
294                .await?;
295
296        let magic_length = FileMetadata::MAGIC_LENGTH as usize;
297        // check first four bytes of footer
298        FileMetadata::check_magic(&footer_bytes[..magic_length])?;
299        // check last four bytes of footer
300        FileMetadata::check_magic(&footer_bytes[footer_bytes.len() - magic_length..])?;
301
302        let footer_payload_str =
303            FileMetadata::extract_footer_payload_as_str(&footer_bytes, footer_payload_length)?;
304
305        FileMetadata::from_json_str(&footer_payload_str)
306    }
307
308    /// Reads file_metadata in puffin file with a prefetch hint
309    ///
310    /// `prefetch_hint` is used to try to fetch the entire footer in one read. If
311    /// the entire footer isn't fetched in one read the function will call the regular
312    /// read option.
313    #[allow(dead_code)]
314    pub(crate) async fn read_with_prefetch(
315        input_file: &InputFile,
316        prefetch_hint: u8,
317    ) -> Result<FileMetadata> {
318        if prefetch_hint > 16 {
319            let input_file_length = input_file.metadata().await?.size;
320            let file_read = input_file.reader().await?;
321
322            // Hint cannot be larger than input file
323            if prefetch_hint as u64 > input_file_length {
324                return FileMetadata::read(input_file).await;
325            }
326
327            // Read footer based on prefetchi hint
328            let start = input_file_length - prefetch_hint as u64;
329            let end = input_file_length;
330            let footer_bytes = file_read.read(start..end).await?;
331
332            let payload_length_start =
333                footer_bytes.len() - (FileMetadata::FOOTER_STRUCT_LENGTH as usize);
334            let payload_length_end =
335                payload_length_start + (FileMetadata::FOOTER_STRUCT_PAYLOAD_LENGTH_LENGTH as usize);
336            let payload_length_bytes = &footer_bytes[payload_length_start..payload_length_end];
337
338            let mut buf = [0; 4];
339            buf.copy_from_slice(payload_length_bytes);
340            let footer_payload_length = u32::from_le_bytes(buf);
341
342            // If the (footer payload length + FOOTER_STRUCT_LENGTH + MAGIC_LENGTH) is greater
343            // than the fetched footer then you can have it read regularly from a read with no
344            // prefetch while passing in the footer_payload_length.
345            let footer_length = (footer_payload_length as usize)
346                + FileMetadata::FOOTER_STRUCT_LENGTH as usize
347                + FileMetadata::MAGIC_LENGTH as usize;
348            if footer_length > prefetch_hint as usize {
349                return FileMetadata::read(input_file).await;
350            }
351
352            // Read footer bytes
353            let footer_start = footer_bytes.len() - footer_length;
354            let footer_end = footer_bytes.len();
355            let footer_bytes = &footer_bytes[footer_start..footer_end];
356
357            let magic_length = FileMetadata::MAGIC_LENGTH as usize;
358            // check first four bytes of footer
359            FileMetadata::check_magic(&footer_bytes[..magic_length])?;
360            // check last four bytes of footer
361            FileMetadata::check_magic(&footer_bytes[footer_bytes.len() - magic_length..])?;
362
363            let footer_payload_str =
364                FileMetadata::extract_footer_payload_as_str(footer_bytes, footer_payload_length)?;
365            return FileMetadata::from_json_str(&footer_payload_str);
366        }
367
368        FileMetadata::read(input_file).await
369    }
370
371    #[inline]
372    /// Metadata about blobs in file
373    pub fn blobs(&self) -> &[BlobMetadata] {
374        &self.blobs
375    }
376
377    #[inline]
378    /// Arbitrary meta-information, like writer identification/version.
379    pub fn properties(&self) -> &HashMap<String, String> {
380        &self.properties
381    }
382}
383
384#[cfg(test)]
385mod tests {
386    use std::collections::HashMap;
387
388    use bytes::Bytes;
389    use tempfile::TempDir;
390
391    use crate::io::{FileIOBuilder, InputFile};
392    use crate::puffin::metadata::{BlobMetadata, CompressionCodec, FileMetadata};
393    use crate::puffin::test_utils::{
394        empty_footer_payload, empty_footer_payload_bytes, empty_footer_payload_bytes_length_bytes,
395        java_empty_uncompressed_input_file, java_uncompressed_metric_input_file,
396        java_zstd_compressed_metric_input_file, uncompressed_metric_file_metadata,
397        zstd_compressed_metric_file_metadata,
398    };
399
400    const INVALID_MAGIC_VALUE: [u8; 4] = [80, 70, 65, 0];
401
402    async fn input_file_with_bytes(temp_dir: &TempDir, slice: &[u8]) -> InputFile {
403        let file_io = FileIOBuilder::new_fs_io().build().unwrap();
404
405        let path_buf = temp_dir.path().join("abc.puffin");
406        let temp_path = path_buf.to_str().unwrap();
407        let output_file = file_io.new_output(temp_path).unwrap();
408
409        output_file
410            .write(Bytes::copy_from_slice(slice))
411            .await
412            .unwrap();
413
414        output_file.to_input_file()
415    }
416
417    async fn input_file_with_payload(temp_dir: &TempDir, payload_str: &str) -> InputFile {
418        let payload_bytes = payload_str.as_bytes();
419
420        let mut bytes = vec![];
421        bytes.extend(FileMetadata::MAGIC.to_vec());
422        bytes.extend(FileMetadata::MAGIC.to_vec());
423        bytes.extend(payload_bytes);
424        bytes.extend(u32::to_le_bytes(payload_bytes.len() as u32));
425        bytes.extend(vec![0, 0, 0, 0]);
426        bytes.extend(FileMetadata::MAGIC);
427
428        input_file_with_bytes(temp_dir, &bytes).await
429    }
430
431    #[tokio::test]
432    async fn test_file_starting_with_invalid_magic_returns_error() {
433        let temp_dir = TempDir::new().unwrap();
434
435        let mut bytes = vec![];
436        bytes.extend(INVALID_MAGIC_VALUE.to_vec());
437        bytes.extend(FileMetadata::MAGIC.to_vec());
438        bytes.extend(empty_footer_payload_bytes());
439        bytes.extend(empty_footer_payload_bytes_length_bytes());
440        bytes.extend(vec![0, 0, 0, 0]);
441        bytes.extend(FileMetadata::MAGIC);
442
443        let input_file = input_file_with_bytes(&temp_dir, &bytes).await;
444
445        assert_eq!(
446            FileMetadata::read(&input_file)
447                .await
448                .unwrap_err()
449                .to_string(),
450            "DataInvalid => Bad magic value: [80, 70, 65, 0] should be [80, 70, 65, 49]",
451        )
452    }
453
454    #[tokio::test]
455    async fn test_file_with_invalid_magic_at_start_of_footer_returns_error() {
456        let temp_dir = TempDir::new().unwrap();
457
458        let mut bytes = vec![];
459        bytes.extend(FileMetadata::MAGIC.to_vec());
460        bytes.extend(INVALID_MAGIC_VALUE.to_vec());
461        bytes.extend(empty_footer_payload_bytes());
462        bytes.extend(empty_footer_payload_bytes_length_bytes());
463        bytes.extend(vec![0, 0, 0, 0]);
464        bytes.extend(FileMetadata::MAGIC);
465
466        let input_file = input_file_with_bytes(&temp_dir, &bytes).await;
467
468        assert_eq!(
469            FileMetadata::read(&input_file)
470                .await
471                .unwrap_err()
472                .to_string(),
473            "DataInvalid => Bad magic value: [80, 70, 65, 0] should be [80, 70, 65, 49]",
474        )
475    }
476
477    #[tokio::test]
478    async fn test_file_ending_with_invalid_magic_returns_error() {
479        let temp_dir = TempDir::new().unwrap();
480
481        let mut bytes = vec![];
482        bytes.extend(FileMetadata::MAGIC.to_vec());
483        bytes.extend(FileMetadata::MAGIC.to_vec());
484        bytes.extend(empty_footer_payload_bytes());
485        bytes.extend(empty_footer_payload_bytes_length_bytes());
486        bytes.extend(vec![0, 0, 0, 0]);
487        bytes.extend(INVALID_MAGIC_VALUE);
488
489        let input_file = input_file_with_bytes(&temp_dir, &bytes).await;
490
491        assert_eq!(
492            FileMetadata::read(&input_file)
493                .await
494                .unwrap_err()
495                .to_string(),
496            "DataInvalid => Bad magic value: [80, 70, 65, 0] should be [80, 70, 65, 49]",
497        )
498    }
499
500    #[tokio::test]
501    async fn test_encoded_payload_length_larger_than_actual_payload_length_returns_error() {
502        let temp_dir = TempDir::new().unwrap();
503
504        let mut bytes = vec![];
505        bytes.extend(FileMetadata::MAGIC.to_vec());
506        bytes.extend(FileMetadata::MAGIC.to_vec());
507        bytes.extend(empty_footer_payload_bytes());
508        bytes.extend(u32::to_le_bytes(
509            empty_footer_payload_bytes().len() as u32 + 1,
510        ));
511        bytes.extend(vec![0, 0, 0, 0]);
512        bytes.extend(FileMetadata::MAGIC.to_vec());
513
514        let input_file = input_file_with_bytes(&temp_dir, &bytes).await;
515
516        assert_eq!(
517            FileMetadata::read(&input_file)
518                .await
519                .unwrap_err()
520                .to_string(),
521            "DataInvalid => Bad magic value: [49, 80, 70, 65] should be [80, 70, 65, 49]",
522        )
523    }
524
525    #[tokio::test]
526    async fn test_encoded_payload_length_smaller_than_actual_payload_length_returns_error() {
527        let temp_dir = TempDir::new().unwrap();
528
529        let mut bytes = vec![];
530        bytes.extend(FileMetadata::MAGIC.to_vec());
531        bytes.extend(FileMetadata::MAGIC.to_vec());
532        bytes.extend(empty_footer_payload_bytes());
533        bytes.extend(u32::to_le_bytes(
534            empty_footer_payload_bytes().len() as u32 - 1,
535        ));
536        bytes.extend(vec![0, 0, 0, 0]);
537        bytes.extend(FileMetadata::MAGIC.to_vec());
538
539        let input_file = input_file_with_bytes(&temp_dir, &bytes).await;
540
541        assert_eq!(
542            FileMetadata::read(&input_file)
543                .await
544                .unwrap_err()
545                .to_string(),
546            "DataInvalid => Bad magic value: [70, 65, 49, 123] should be [80, 70, 65, 49]",
547        )
548    }
549
550    #[tokio::test]
551    async fn test_lz4_compressed_footer_returns_error() {
552        let temp_dir = TempDir::new().unwrap();
553
554        let mut bytes = vec![];
555        bytes.extend(FileMetadata::MAGIC.to_vec());
556        bytes.extend(FileMetadata::MAGIC.to_vec());
557        bytes.extend(empty_footer_payload_bytes());
558        bytes.extend(empty_footer_payload_bytes_length_bytes());
559        bytes.extend(vec![0b00000001, 0, 0, 0]);
560        bytes.extend(FileMetadata::MAGIC.to_vec());
561
562        let input_file = input_file_with_bytes(&temp_dir, &bytes).await;
563
564        assert_eq!(
565            FileMetadata::read(&input_file)
566                .await
567                .unwrap_err()
568                .to_string(),
569            "FeatureUnsupported => LZ4 decompression is not supported currently",
570        )
571    }
572
573    #[tokio::test]
574    async fn test_unknown_byte_bit_combination_returns_error() {
575        let temp_dir = TempDir::new().unwrap();
576
577        let mut bytes = vec![];
578        bytes.extend(FileMetadata::MAGIC.to_vec());
579        bytes.extend(FileMetadata::MAGIC.to_vec());
580        bytes.extend(empty_footer_payload_bytes());
581        bytes.extend(empty_footer_payload_bytes_length_bytes());
582        bytes.extend(vec![0b00000010, 0, 0, 0]);
583        bytes.extend(FileMetadata::MAGIC.to_vec());
584
585        let input_file = input_file_with_bytes(&temp_dir, &bytes).await;
586
587        assert_eq!(
588            FileMetadata::read(&input_file)
589                .await
590                .unwrap_err()
591                .to_string(),
592            "DataInvalid => Unknown flag byte 0 and bit 1 combination",
593        )
594    }
595
596    #[tokio::test]
597    async fn test_non_utf8_string_payload_returns_error() {
598        let temp_dir = TempDir::new().unwrap();
599
600        let payload_bytes: [u8; 4] = [0, 159, 146, 150];
601        let payload_bytes_length_bytes: [u8; 4] = u32::to_le_bytes(payload_bytes.len() as u32);
602
603        let mut bytes = vec![];
604        bytes.extend(FileMetadata::MAGIC.to_vec());
605        bytes.extend(FileMetadata::MAGIC.to_vec());
606        bytes.extend(payload_bytes);
607        bytes.extend(payload_bytes_length_bytes);
608        bytes.extend(vec![0, 0, 0, 0]);
609        bytes.extend(FileMetadata::MAGIC.to_vec());
610
611        let input_file = input_file_with_bytes(&temp_dir, &bytes).await;
612
613        assert_eq!(
614            FileMetadata::read(&input_file)
615                .await
616                .unwrap_err()
617                .to_string(),
618            "DataInvalid => Footer is not a valid UTF-8 string, source: invalid utf-8 sequence of 1 bytes from index 1",
619        )
620    }
621
622    #[tokio::test]
623    async fn test_minimal_valid_file_returns_file_metadata() {
624        let temp_dir = TempDir::new().unwrap();
625
626        let mut bytes = vec![];
627        bytes.extend(FileMetadata::MAGIC.to_vec());
628        bytes.extend(FileMetadata::MAGIC.to_vec());
629        bytes.extend(empty_footer_payload_bytes());
630        bytes.extend(empty_footer_payload_bytes_length_bytes());
631        bytes.extend(vec![0, 0, 0, 0]);
632        bytes.extend(FileMetadata::MAGIC);
633
634        let input_file = input_file_with_bytes(&temp_dir, &bytes).await;
635
636        assert_eq!(
637            FileMetadata::read(&input_file).await.unwrap(),
638            FileMetadata {
639                blobs: vec![],
640                properties: HashMap::new(),
641            }
642        )
643    }
644
645    #[tokio::test]
646    async fn test_returns_file_metadata_property() {
647        let temp_dir = TempDir::new().unwrap();
648
649        let input_file = input_file_with_payload(
650            &temp_dir,
651            r#"{
652                "blobs" : [ ],
653                "properties" : {
654                    "a property" : "a property value"
655                }
656            }"#,
657        )
658        .await;
659
660        assert_eq!(
661            FileMetadata::read(&input_file).await.unwrap(),
662            FileMetadata {
663                blobs: vec![],
664                properties: {
665                    let mut map = HashMap::new();
666                    map.insert("a property".to_string(), "a property value".to_string());
667                    map
668                },
669            }
670        )
671    }
672
673    #[tokio::test]
674    async fn test_returns_file_metadata_properties() {
675        let temp_dir = TempDir::new().unwrap();
676
677        let input_file = input_file_with_payload(
678            &temp_dir,
679            r#"{
680                "blobs" : [ ],
681                "properties" : {
682                    "a property" : "a property value",
683                    "another one": "also with value"
684                }
685            }"#,
686        )
687        .await;
688
689        assert_eq!(
690            FileMetadata::read(&input_file).await.unwrap(),
691            FileMetadata {
692                blobs: vec![],
693                properties: {
694                    let mut map = HashMap::new();
695                    map.insert("a property".to_string(), "a property value".to_string());
696                    map.insert("another one".to_string(), "also with value".to_string());
697                    map
698                },
699            }
700        )
701    }
702
703    #[tokio::test]
704    async fn test_returns_error_if_blobs_field_is_missing() {
705        let temp_dir = TempDir::new().unwrap();
706
707        let input_file = input_file_with_payload(
708            &temp_dir,
709            r#"{
710                "properties" : {}
711            }"#,
712        )
713        .await;
714
715        assert_eq!(
716            FileMetadata::read(&input_file)
717                .await
718                .unwrap_err()
719                .to_string(),
720            format!(
721                "DataInvalid => Given string is not valid JSON, source: missing field `blobs` at line 3 column 13"
722            ),
723        )
724    }
725
726    #[tokio::test]
727    async fn test_returns_error_if_blobs_field_is_bad() {
728        let temp_dir = TempDir::new().unwrap();
729
730        let input_file = input_file_with_payload(
731            &temp_dir,
732            r#"{
733                "blobs" : {}
734            }"#,
735        )
736        .await;
737
738        assert_eq!(
739            FileMetadata::read(&input_file)
740                .await
741                .unwrap_err()
742                .to_string(),
743            format!(
744                "DataInvalid => Given string is not valid JSON, source: invalid type: map, expected a sequence at line 2 column 26"
745            ),
746        )
747    }
748
749    #[tokio::test]
750    async fn test_returns_blobs_metadatas() {
751        let temp_dir = TempDir::new().unwrap();
752
753        let input_file = input_file_with_payload(
754            &temp_dir,
755            r#"{
756                "blobs" : [
757                    {
758                        "type" : "type-a",
759                        "fields" : [ 1 ],
760                        "snapshot-id" : 14,
761                        "sequence-number" : 3,
762                        "offset" : 4,
763                        "length" : 16
764                    },
765                    {
766                        "type" : "type-bbb",
767                        "fields" : [ 2, 3, 4 ],
768                        "snapshot-id" : 77,
769                        "sequence-number" : 4,
770                        "offset" : 21474836470000,
771                        "length" : 79834
772                    }
773                ]
774            }"#,
775        )
776        .await;
777
778        assert_eq!(
779            FileMetadata::read(&input_file).await.unwrap(),
780            FileMetadata {
781                blobs: vec![
782                    BlobMetadata {
783                        r#type: "type-a".to_string(),
784                        fields: vec![1],
785                        snapshot_id: 14,
786                        sequence_number: 3,
787                        offset: 4,
788                        length: 16,
789                        compression_codec: CompressionCodec::None,
790                        properties: HashMap::new(),
791                    },
792                    BlobMetadata {
793                        r#type: "type-bbb".to_string(),
794                        fields: vec![2, 3, 4],
795                        snapshot_id: 77,
796                        sequence_number: 4,
797                        offset: 21474836470000,
798                        length: 79834,
799                        compression_codec: CompressionCodec::None,
800                        properties: HashMap::new(),
801                    },
802                ],
803                properties: HashMap::new(),
804            }
805        )
806    }
807
808    #[tokio::test]
809    async fn test_returns_properties_in_blob_metadata() {
810        let temp_dir = TempDir::new().unwrap();
811
812        let input_file = input_file_with_payload(
813            &temp_dir,
814            r#"{
815                "blobs" : [
816                    {
817                        "type" : "type-a",
818                        "fields" : [ 1 ],
819                        "snapshot-id" : 14,
820                        "sequence-number" : 3,
821                        "offset" : 4,
822                        "length" : 16,
823                        "properties" : {
824                            "some key" : "some value"
825                        }
826                    }
827                ]
828            }"#,
829        )
830        .await;
831
832        assert_eq!(
833            FileMetadata::read(&input_file).await.unwrap(),
834            FileMetadata {
835                blobs: vec![BlobMetadata {
836                    r#type: "type-a".to_string(),
837                    fields: vec![1],
838                    snapshot_id: 14,
839                    sequence_number: 3,
840                    offset: 4,
841                    length: 16,
842                    compression_codec: CompressionCodec::None,
843                    properties: {
844                        let mut map = HashMap::new();
845                        map.insert("some key".to_string(), "some value".to_string());
846                        map
847                    },
848                }],
849                properties: HashMap::new(),
850            }
851        )
852    }
853
854    #[tokio::test]
855    async fn test_returns_error_if_blobs_fields_value_is_outside_i32_range() {
856        let temp_dir = TempDir::new().unwrap();
857
858        let out_of_i32_range_number: i64 = i32::MAX as i64 + 1;
859
860        let input_file = input_file_with_payload(
861            &temp_dir,
862            &format!(
863                r#"{{
864                    "blobs" : [
865                        {{
866                            "type" : "type-a",
867                            "fields" : [ {} ],
868                            "snapshot-id" : 14,
869                            "sequence-number" : 3,
870                            "offset" : 4,
871                            "length" : 16
872                        }}
873                    ]
874                }}"#,
875                out_of_i32_range_number
876            ),
877        )
878        .await;
879
880        assert_eq!(
881            FileMetadata::read(&input_file)
882                .await
883                .unwrap_err()
884                .to_string(),
885            format!(
886                "DataInvalid => Given string is not valid JSON, source: invalid value: integer `{}`, expected i32 at line 5 column 51",
887                out_of_i32_range_number
888            ),
889        )
890    }
891
892    #[tokio::test]
893    async fn test_returns_errors_if_footer_payload_is_not_encoded_in_json_format() {
894        let temp_dir = TempDir::new().unwrap();
895
896        let input_file = input_file_with_payload(&temp_dir, r#""blobs" = []"#).await;
897
898        assert_eq!(
899            FileMetadata::read(&input_file)
900                .await
901                .unwrap_err()
902                .to_string(),
903            "DataInvalid => Given string is not valid JSON, source: invalid type: string \"blobs\", expected struct FileMetadata at line 1 column 7",
904        )
905    }
906
907    #[tokio::test]
908    async fn test_read_file_metadata_of_uncompressed_empty_file() {
909        let input_file = java_empty_uncompressed_input_file();
910
911        let file_metadata = FileMetadata::read(&input_file).await.unwrap();
912        assert_eq!(file_metadata, empty_footer_payload())
913    }
914
915    #[tokio::test]
916    async fn test_read_file_metadata_of_uncompressed_metric_data() {
917        let input_file = java_uncompressed_metric_input_file();
918
919        let file_metadata = FileMetadata::read(&input_file).await.unwrap();
920        assert_eq!(file_metadata, uncompressed_metric_file_metadata())
921    }
922
923    #[tokio::test]
924    async fn test_read_file_metadata_of_zstd_compressed_metric_data() {
925        let input_file = java_zstd_compressed_metric_input_file();
926
927        let file_metadata = FileMetadata::read_with_prefetch(&input_file, 64)
928            .await
929            .unwrap();
930        assert_eq!(file_metadata, zstd_compressed_metric_file_metadata())
931    }
932
933    #[tokio::test]
934    async fn test_read_file_metadata_of_empty_file_with_prefetching() {
935        let input_file = java_empty_uncompressed_input_file();
936        let file_metadata = FileMetadata::read_with_prefetch(&input_file, 64)
937            .await
938            .unwrap();
939
940        assert_eq!(file_metadata, empty_footer_payload());
941    }
942
943    #[tokio::test]
944    async fn test_read_file_metadata_of_uncompressed_metric_data_with_prefetching() {
945        let input_file = java_uncompressed_metric_input_file();
946        let file_metadata = FileMetadata::read_with_prefetch(&input_file, 64)
947            .await
948            .unwrap();
949
950        assert_eq!(file_metadata, uncompressed_metric_file_metadata());
951    }
952
953    #[tokio::test]
954    async fn test_read_file_metadata_of_zstd_compressed_metric_data_with_prefetching() {
955        let input_file = java_zstd_compressed_metric_input_file();
956        let file_metadata = FileMetadata::read_with_prefetch(&input_file, 64)
957            .await
958            .unwrap();
959
960        assert_eq!(file_metadata, zstd_compressed_metric_file_metadata());
961    }
962}