parquet/file/metadata/reader.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
17
18use std::{io::Read, ops::Range, sync::Arc};
19
20use bytes::Bytes;
21
22use crate::basic::ColumnOrder;
23use crate::errors::{ParquetError, Result};
24use crate::file::metadata::{FileMetaData, ParquetMetaData, RowGroupMetaData};
25use crate::file::page_index::index::Index;
26use crate::file::page_index::index_reader::{acc_range, decode_column_index, decode_offset_index};
27use crate::file::reader::ChunkReader;
28use crate::file::{FOOTER_SIZE, PARQUET_MAGIC};
29use crate::format::{ColumnOrder as TColumnOrder, FileMetaData as TFileMetaData};
30use crate::schema::types;
31use crate::schema::types::SchemaDescriptor;
32use crate::thrift::{TCompactSliceInputProtocol, TSerializable};
33
34#[cfg(all(feature = "async", feature = "arrow"))]
35use crate::arrow::async_reader::MetadataFetch;
36
/// Reads the [`ParquetMetaData`] from a byte stream.
///
/// See [`crate::file::metadata::ParquetMetaDataWriter#output-format`] for a description of
/// the Parquet metadata.
///
/// Parquet metadata is not necessarily contiguous in the files: part is stored
/// in the footer (the last bytes of the file), but other portions (such as the
/// PageIndex) can be stored elsewhere.
///
/// This reader handles reading the footer as well as the non contiguous parts
/// of the metadata such as the page indexes; excluding Bloom Filters.
///
/// # Example
/// ```no_run
/// # use parquet::file::metadata::ParquetMetaDataReader;
/// # fn open_parquet_file(path: &str) -> std::fs::File { unimplemented!(); }
/// // read parquet metadata including page indexes from a file
/// let file = open_parquet_file("some_path.parquet");
/// let mut reader = ParquetMetaDataReader::new()
///     .with_page_indexes(true);
/// reader.try_parse(&file).unwrap();
/// let metadata = reader.finish().unwrap();
/// assert!(metadata.column_index().is_some());
/// assert!(metadata.offset_index().is_some());
/// ```
#[derive(Default)]
pub struct ParquetMetaDataReader {
    // Parsed metadata, populated by the `try_parse*` / `try_load*` methods and
    // handed out (taken) by `finish()`.
    metadata: Option<ParquetMetaData>,
    // Whether to also decode the ColumnIndex page index structure.
    column_index: bool,
    // Whether to also decode the OffsetIndex page index structure.
    offset_index: bool,
    // Optional caller-provided estimate of the total metadata size; used only by
    // the async `try_load()` path to reduce the number of fetches.
    prefetch_hint: Option<usize>,
    // Size of the serialized thrift metadata plus the 8 byte footer. Only set if
    // `self.parse_metadata` is called.
    metadata_size: Option<usize>,
}
72
73impl ParquetMetaDataReader {
74    /// Create a new [`ParquetMetaDataReader`]
75    pub fn new() -> Self {
76        Default::default()
77    }
78
79    /// Create a new [`ParquetMetaDataReader`] populated with a [`ParquetMetaData`] struct
80    /// obtained via other means.
81    pub fn new_with_metadata(metadata: ParquetMetaData) -> Self {
82        Self {
83            metadata: Some(metadata),
84            ..Default::default()
85        }
86    }
87
88    /// Enable or disable reading the page index structures described in
89    /// "[Parquet page index]: Layout to Support Page Skipping". Equivalent to:
90    /// `self.with_column_indexes(val).with_offset_indexes(val)`
91    ///
92    /// [Parquet page index]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
93    pub fn with_page_indexes(self, val: bool) -> Self {
94        self.with_column_indexes(val).with_offset_indexes(val)
95    }
96
97    /// Enable or disable reading the Parquet [ColumnIndex] structure.
98    ///
99    /// [ColumnIndex]:  https://github.com/apache/parquet-format/blob/master/PageIndex.md
100    pub fn with_column_indexes(mut self, val: bool) -> Self {
101        self.column_index = val;
102        self
103    }
104
105    /// Enable or disable reading the Parquet [OffsetIndex] structure.
106    ///
107    /// [OffsetIndex]:  https://github.com/apache/parquet-format/blob/master/PageIndex.md
108    pub fn with_offset_indexes(mut self, val: bool) -> Self {
109        self.offset_index = val;
110        self
111    }
112
113    /// Provide a hint as to the number of bytes needed to fully parse the [`ParquetMetaData`].
114    /// Only used for the asynchronous [`Self::try_load()`] method.
115    ///
116    /// By default, the reader will first fetch the last 8 bytes of the input file to obtain the
117    /// size of the footer metadata. A second fetch will be performed to obtain the needed bytes.
118    /// After parsing the footer metadata, a third fetch will be performed to obtain the bytes
119    /// needed to decode the page index structures, if they have been requested. To avoid
120    /// unnecessary fetches, `prefetch` can be set to an estimate of the number of bytes needed
121    /// to fully decode the [`ParquetMetaData`], which can reduce the number of fetch requests and
122    /// reduce latency. Setting `prefetch` too small will not trigger an error, but will result
123    /// in extra fetches being performed.
124    pub fn with_prefetch_hint(mut self, prefetch: Option<usize>) -> Self {
125        self.prefetch_hint = prefetch;
126        self
127    }
128
129    /// Indicates whether this reader has a [`ParquetMetaData`] internally.
130    pub fn has_metadata(&self) -> bool {
131        self.metadata.is_some()
132    }
133
134    /// Return the parsed [`ParquetMetaData`] struct, leaving `None` in its place.
135    pub fn finish(&mut self) -> Result<ParquetMetaData> {
136        self.metadata
137            .take()
138            .ok_or_else(|| general_err!("could not parse parquet metadata"))
139    }
140
141    /// Given a [`ChunkReader`], parse and return the [`ParquetMetaData`] in a single pass.
142    ///
143    /// If `reader` is [`Bytes`] based, then the buffer must contain sufficient bytes to complete
144    /// the request, and must include the Parquet footer. If page indexes are desired, the buffer
145    /// must contain the entire file, or [`Self::try_parse_sized()`] should be used.
146    ///
147    /// This call will consume `self`.
148    ///
149    /// # Example
150    /// ```no_run
151    /// # use parquet::file::metadata::ParquetMetaDataReader;
152    /// # fn open_parquet_file(path: &str) -> std::fs::File { unimplemented!(); }
153    /// // read parquet metadata including page indexes
154    /// let file = open_parquet_file("some_path.parquet");
155    /// let metadata = ParquetMetaDataReader::new()
156    ///     .with_page_indexes(true)
157    ///     .parse_and_finish(&file).unwrap();
158    /// ```
159    pub fn parse_and_finish<R: ChunkReader>(mut self, reader: &R) -> Result<ParquetMetaData> {
160        self.try_parse(reader)?;
161        self.finish()
162    }
163
164    /// Attempts to parse the footer metadata (and optionally page indexes) given a [`ChunkReader`].
165    ///
166    /// If `reader` is [`Bytes`] based, then the buffer must contain sufficient bytes to complete
167    /// the request, and must include the Parquet footer. If page indexes are desired, the buffer
168    /// must contain the entire file, or [`Self::try_parse_sized()`] should be used.
169    pub fn try_parse<R: ChunkReader>(&mut self, reader: &R) -> Result<()> {
170        self.try_parse_sized(reader, reader.len() as usize)
171    }
172
173    /// Same as [`Self::try_parse()`], but provide the original file size in the case that `reader`
174    /// is a [`Bytes`] struct that does not contain the entire file. This information is necessary
175    /// when the page indexes are desired. `reader` must have access to the Parquet footer.
176    ///
177    /// Using this function also allows for retrying with a larger buffer.
178    ///
179    /// # Errors
180    ///
181    /// This function will return [`ParquetError::IndexOutOfBound`] in the event `reader` does not
182    /// provide enough data to fully parse the metadata (see example below).
183    ///
184    /// Other errors returned include [`ParquetError::General`] and [`ParquetError::EOF`].
185    ///
186    /// # Example
187    /// ```no_run
188    /// # use parquet::file::metadata::ParquetMetaDataReader;
189    /// # use parquet::errors::ParquetError;
190    /// # use crate::parquet::file::reader::Length;
191    /// # fn get_bytes(file: &std::fs::File, range: std::ops::Range<usize>) -> bytes::Bytes { unimplemented!(); }
192    /// # fn open_parquet_file(path: &str) -> std::fs::File { unimplemented!(); }
193    /// let file = open_parquet_file("some_path.parquet");
194    /// let len = file.len() as usize;
195    /// let bytes = get_bytes(&file, 1000..len);
196    /// let mut reader = ParquetMetaDataReader::new().with_page_indexes(true);
197    /// match reader.try_parse_sized(&bytes, len) {
198    ///     Ok(_) => (),
199    ///     Err(ParquetError::IndexOutOfBound(needed, _)) => {
200    ///         let bytes = get_bytes(&file, len - needed..len);
201    ///         reader.try_parse_sized(&bytes, len).unwrap();
202    ///     }
203    ///     _ => panic!("unexpected error")
204    /// }
205    /// let metadata = reader.finish().unwrap();
206    /// ```
207    pub fn try_parse_sized<R: ChunkReader>(&mut self, reader: &R, file_size: usize) -> Result<()> {
208        self.metadata = match self.parse_metadata(reader) {
209            Ok(metadata) => Some(metadata),
210            // FIXME: throughout this module ParquetError::IndexOutOfBound is used to indicate the
211            // need for more data. This is not it's intended use. The plan is to add a NeedMoreData
212            // value to the enum, but this would be a breaking change. This will be done as
213            // 54.0.0 draws nearer.
214            // https://github.com/apache/arrow-rs/issues/6447
215            Err(ParquetError::IndexOutOfBound(needed, _)) => {
216                // If reader is the same length as `file_size` then presumably there is no more to
217                // read, so return an EOF error.
218                if file_size == reader.len() as usize || needed > file_size {
219                    return Err(eof_err!(
220                        "Parquet file too small. Size is {} but need {}",
221                        file_size,
222                        needed
223                    ));
224                } else {
225                    // Ask for a larger buffer
226                    return Err(ParquetError::IndexOutOfBound(needed, file_size));
227                }
228            }
229            Err(e) => return Err(e),
230        };
231
232        // we can return if page indexes aren't requested
233        if !self.column_index && !self.offset_index {
234            return Ok(());
235        }
236
237        self.read_page_indexes_sized(reader, file_size)
238    }
239
240    /// Read the page index structures when a [`ParquetMetaData`] has already been obtained.
241    /// See [`Self::new_with_metadata()`] and [`Self::has_metadata()`].
242    pub fn read_page_indexes<R: ChunkReader>(&mut self, reader: &R) -> Result<()> {
243        self.read_page_indexes_sized(reader, reader.len() as usize)
244    }
245
246    /// Read the page index structures when a [`ParquetMetaData`] has already been obtained.
247    /// This variant is used when `reader` cannot access the entire Parquet file (e.g. it is
248    /// a [`Bytes`] struct containing the tail of the file).
249    /// See [`Self::new_with_metadata()`] and [`Self::has_metadata()`].
250    pub fn read_page_indexes_sized<R: ChunkReader>(
251        &mut self,
252        reader: &R,
253        file_size: usize,
254    ) -> Result<()> {
255        if self.metadata.is_none() {
256            return Err(general_err!(
257                "Tried to read page indexes without ParquetMetaData metadata"
258            ));
259        }
260
261        // FIXME: there are differing implementations in the case where page indexes are missing
262        // from the file. `MetadataLoader` will leave them as `None`, while the parser in
263        // `index_reader::read_columns_indexes` returns a vector of empty vectors.
264        // It is best for this function to replicate the latter behavior for now, but in a future
265        // breaking release, the two paths to retrieve metadata should be made consistent. Note that this is only
266        // an issue if the user requested page indexes, so there is no need to provide empty
267        // vectors in `try_parse_sized()`.
268        // https://github.com/apache/arrow-rs/issues/6447
269
270        // Get bounds needed for page indexes (if any are present in the file).
271        let Some(range) = self.range_for_page_index() else {
272            self.empty_page_indexes();
273            return Ok(());
274        };
275
276        // Check to see if needed range is within `file_range`. Checking `range.end` seems
277        // redundant, but it guards against `range_for_page_index()` returning garbage.
278        let file_range = file_size.saturating_sub(reader.len() as usize)..file_size;
279        if !(file_range.contains(&range.start) && file_range.contains(&range.end)) {
280            // Requested range starts beyond EOF
281            if range.end > file_size {
282                return Err(eof_err!(
283                    "Parquet file too small. Range {:?} is beyond file bounds {file_size}",
284                    range
285                ));
286            } else {
287                // Ask for a larger buffer
288                return Err(ParquetError::IndexOutOfBound(
289                    file_size - range.start,
290                    file_size,
291                ));
292            }
293        }
294
295        // Perform extra sanity check to make sure `range` and the footer metadata don't
296        // overlap.
297        if let Some(metadata_size) = self.metadata_size {
298            let metadata_range = file_size.saturating_sub(metadata_size)..file_size;
299            if range.end > metadata_range.start {
300                return Err(eof_err!(
301                    "Parquet file too small. Page index range {:?} overlaps with file metadata {:?}",
302                    range,
303                    metadata_range
304                ));
305            }
306        }
307
308        let bytes_needed = range.end - range.start;
309        let bytes = reader.get_bytes((range.start - file_range.start) as u64, bytes_needed)?;
310        let offset = range.start;
311
312        self.parse_column_index(&bytes, offset)?;
313        self.parse_offset_index(&bytes, offset)?;
314
315        Ok(())
316    }
317
318    /// Given a [`MetadataFetch`], parse and return the [`ParquetMetaData`] in a single pass.
319    ///
320    /// This call will consume `self`.
321    ///
322    /// See [`Self::with_prefetch_hint`] for a discussion of how to reduce the number of fetches
323    /// performed by this function.
324    #[cfg(all(feature = "async", feature = "arrow"))]
325    pub async fn load_and_finish<F: MetadataFetch>(
326        mut self,
327        fetch: F,
328        file_size: usize,
329    ) -> Result<ParquetMetaData> {
330        self.try_load(fetch, file_size).await?;
331        self.finish()
332    }
333
334    /// Attempts to (asynchronously) parse the footer metadata (and optionally page indexes)
335    /// given a [`MetadataFetch`].
336    ///
337    /// See [`Self::with_prefetch_hint`] for a discussion of how to reduce the number of fetches
338    /// performed by this function.
339    #[cfg(all(feature = "async", feature = "arrow"))]
340    pub async fn try_load<F: MetadataFetch>(
341        &mut self,
342        mut fetch: F,
343        file_size: usize,
344    ) -> Result<()> {
345        let (metadata, remainder) =
346            Self::load_metadata(&mut fetch, file_size, self.get_prefetch_size()).await?;
347
348        self.metadata = Some(metadata);
349
350        // we can return if page indexes aren't requested
351        if !self.column_index && !self.offset_index {
352            return Ok(());
353        }
354
355        self.load_page_index_with_remainder(fetch, remainder).await
356    }
357
358    /// Asynchronously fetch the page index structures when a [`ParquetMetaData`] has already
359    /// been obtained. See [`Self::new_with_metadata()`].
360    #[cfg(all(feature = "async", feature = "arrow"))]
361    pub async fn load_page_index<F: MetadataFetch>(&mut self, fetch: F) -> Result<()> {
362        self.load_page_index_with_remainder(fetch, None).await
363    }
364
365    #[cfg(all(feature = "async", feature = "arrow"))]
366    async fn load_page_index_with_remainder<F: MetadataFetch>(
367        &mut self,
368        mut fetch: F,
369        remainder: Option<(usize, Bytes)>,
370    ) -> Result<()> {
371        if self.metadata.is_none() {
372            return Err(general_err!("Footer metadata is not present"));
373        }
374
375        // Get bounds needed for page indexes (if any are present in the file).
376        let range = self.range_for_page_index();
377        let range = match range {
378            Some(range) => range,
379            None => return Ok(()),
380        };
381
382        let bytes = match &remainder {
383            Some((remainder_start, remainder)) if *remainder_start <= range.start => {
384                let offset = range.start - *remainder_start;
385                remainder.slice(offset..range.end - *remainder_start + offset)
386            }
387            // Note: this will potentially fetch data already in remainder, this keeps things simple
388            _ => fetch.fetch(range.start..range.end).await?,
389        };
390
391        // Sanity check
392        assert_eq!(bytes.len(), range.end - range.start);
393        let offset = range.start;
394
395        self.parse_column_index(&bytes, offset)?;
396        self.parse_offset_index(&bytes, offset)?;
397
398        Ok(())
399    }
400
401    fn parse_column_index(&mut self, bytes: &Bytes, start_offset: usize) -> Result<()> {
402        let metadata = self.metadata.as_mut().unwrap();
403        if self.column_index {
404            let index = metadata
405                .row_groups()
406                .iter()
407                .map(|x| {
408                    x.columns()
409                        .iter()
410                        .map(|c| match c.column_index_range() {
411                            Some(r) => decode_column_index(
412                                &bytes[r.start - start_offset..r.end - start_offset],
413                                c.column_type(),
414                            ),
415                            None => Ok(Index::NONE),
416                        })
417                        .collect::<Result<Vec<_>>>()
418                })
419                .collect::<Result<Vec<_>>>()?;
420            metadata.set_column_index(Some(index));
421        }
422        Ok(())
423    }
424
425    fn parse_offset_index(&mut self, bytes: &Bytes, start_offset: usize) -> Result<()> {
426        let metadata = self.metadata.as_mut().unwrap();
427        if self.offset_index {
428            let index = metadata
429                .row_groups()
430                .iter()
431                .map(|x| {
432                    x.columns()
433                        .iter()
434                        .map(|c| match c.offset_index_range() {
435                            Some(r) => decode_offset_index(
436                                &bytes[r.start - start_offset..r.end - start_offset],
437                            ),
438                            None => Err(general_err!("missing offset index")),
439                        })
440                        .collect::<Result<Vec<_>>>()
441                })
442                .collect::<Result<Vec<_>>>()?;
443
444            metadata.set_offset_index(Some(index));
445        }
446        Ok(())
447    }
448
449    /// Set the column_index and offset_indexes to empty `Vec` for backwards compatibility
450    ///
451    /// See <https://github.com/apache/arrow-rs/pull/6451>  for details
452    fn empty_page_indexes(&mut self) {
453        let metadata = self.metadata.as_mut().unwrap();
454        let num_row_groups = metadata.num_row_groups();
455        if self.column_index {
456            metadata.set_column_index(Some(vec![vec![]; num_row_groups]));
457        }
458        if self.offset_index {
459            metadata.set_offset_index(Some(vec![vec![]; num_row_groups]));
460        }
461    }
462
463    fn range_for_page_index(&self) -> Option<Range<usize>> {
464        // sanity check
465        self.metadata.as_ref()?;
466
467        // Get bounds needed for page indexes (if any are present in the file).
468        let mut range = None;
469        let metadata = self.metadata.as_ref().unwrap();
470        for c in metadata.row_groups().iter().flat_map(|r| r.columns()) {
471            if self.column_index {
472                range = acc_range(range, c.column_index_range());
473            }
474            if self.offset_index {
475                range = acc_range(range, c.offset_index_range());
476            }
477        }
478        range
479    }
480
481    // One-shot parse of footer.
482    // Side effect: this will set `self.metadata_size`
483    fn parse_metadata<R: ChunkReader>(&mut self, chunk_reader: &R) -> Result<ParquetMetaData> {
484        // check file is large enough to hold footer
485        let file_size = chunk_reader.len();
486        if file_size < (FOOTER_SIZE as u64) {
487            return Err(ParquetError::IndexOutOfBound(
488                FOOTER_SIZE,
489                file_size as usize,
490            ));
491        }
492
493        let mut footer = [0_u8; 8];
494        chunk_reader
495            .get_read(file_size - 8)?
496            .read_exact(&mut footer)?;
497
498        let metadata_len = Self::decode_footer(&footer)?;
499        let footer_metadata_len = FOOTER_SIZE + metadata_len;
500        self.metadata_size = Some(footer_metadata_len);
501
502        if footer_metadata_len > file_size as usize {
503            return Err(ParquetError::IndexOutOfBound(
504                footer_metadata_len,
505                file_size as usize,
506            ));
507        }
508
509        let start = file_size - footer_metadata_len as u64;
510        Self::decode_metadata(chunk_reader.get_bytes(start, metadata_len)?.as_ref())
511    }
512
513    /// Return the number of bytes to read in the initial pass. If `prefetch_size` has
514    /// been provided, then return that value if it is larger than the size of the Parquet
515    /// file footer (8 bytes). Otherwise returns `8`.
516    #[cfg(all(feature = "async", feature = "arrow"))]
517    fn get_prefetch_size(&self) -> usize {
518        if let Some(prefetch) = self.prefetch_hint {
519            if prefetch > FOOTER_SIZE {
520                return prefetch;
521            }
522        }
523        FOOTER_SIZE
524    }
525
526    #[cfg(all(feature = "async", feature = "arrow"))]
527    async fn load_metadata<F: MetadataFetch>(
528        fetch: &mut F,
529        file_size: usize,
530        prefetch: usize,
531    ) -> Result<(ParquetMetaData, Option<(usize, Bytes)>)> {
532        if file_size < FOOTER_SIZE {
533            return Err(eof_err!("file size of {} is less than footer", file_size));
534        }
535
536        // If a size hint is provided, read more than the minimum size
537        // to try and avoid a second fetch.
538        // Note: prefetch > file_size is ok since we're using saturating_sub.
539        let footer_start = file_size.saturating_sub(prefetch);
540
541        let suffix = fetch.fetch(footer_start..file_size).await?;
542        let suffix_len = suffix.len();
543        let fetch_len = file_size - footer_start;
544        if suffix_len < fetch_len {
545            return Err(eof_err!(
546                "metadata requires {} bytes, but could only read {}",
547                fetch_len,
548                suffix_len
549            ));
550        }
551
552        let mut footer = [0; FOOTER_SIZE];
553        footer.copy_from_slice(&suffix[suffix_len - FOOTER_SIZE..suffix_len]);
554
555        let length = Self::decode_footer(&footer)?;
556
557        if file_size < length + FOOTER_SIZE {
558            return Err(eof_err!(
559                "file size of {} is less than footer + metadata {}",
560                file_size,
561                length + FOOTER_SIZE
562            ));
563        }
564
565        // Did not fetch the entire file metadata in the initial read, need to make a second request
566        if length > suffix_len - FOOTER_SIZE {
567            let metadata_start = file_size - length - FOOTER_SIZE;
568            let meta = fetch.fetch(metadata_start..file_size - FOOTER_SIZE).await?;
569            Ok((Self::decode_metadata(&meta)?, None))
570        } else {
571            let metadata_start = file_size - length - FOOTER_SIZE - footer_start;
572            let slice = &suffix[metadata_start..suffix_len - FOOTER_SIZE];
573            Ok((
574                Self::decode_metadata(slice)?,
575                Some((footer_start, suffix.slice(..metadata_start))),
576            ))
577        }
578    }
579
580    /// Decodes the Parquet footer returning the metadata length in bytes
581    ///
582    /// A parquet footer is 8 bytes long and has the following layout:
583    /// * 4 bytes for the metadata length
584    /// * 4 bytes for the magic bytes 'PAR1'
585    ///
586    /// ```text
587    /// +-----+--------+
588    /// | len | 'PAR1' |
589    /// +-----+--------+
590    /// ```
591    pub fn decode_footer(slice: &[u8; FOOTER_SIZE]) -> Result<usize> {
592        // check this is indeed a parquet file
593        if slice[4..] != PARQUET_MAGIC {
594            return Err(general_err!("Invalid Parquet file. Corrupt footer"));
595        }
596
597        // get the metadata length from the footer
598        let metadata_len = u32::from_le_bytes(slice[..4].try_into().unwrap());
599        // u32 won't be larger than usize in most cases
600        Ok(metadata_len as usize)
601    }
602
603    /// Decodes [`ParquetMetaData`] from the provided bytes.
604    ///
605    /// Typically this is used to decode the metadata from the end of a parquet
606    /// file. The format of `buf` is the Thift compact binary protocol, as specified
607    /// by the [Parquet Spec].
608    ///
609    /// [Parquet Spec]: https://github.com/apache/parquet-format#metadata
610    pub fn decode_metadata(buf: &[u8]) -> Result<ParquetMetaData> {
611        let mut prot = TCompactSliceInputProtocol::new(buf);
612        let t_file_metadata: TFileMetaData = TFileMetaData::read_from_in_protocol(&mut prot)
613            .map_err(|e| general_err!("Could not parse metadata: {}", e))?;
614        let schema = types::from_thrift(&t_file_metadata.schema)?;
615        let schema_descr = Arc::new(SchemaDescriptor::new(schema));
616        let mut row_groups = Vec::new();
617        for rg in t_file_metadata.row_groups {
618            row_groups.push(RowGroupMetaData::from_thrift(schema_descr.clone(), rg)?);
619        }
620        let column_orders = Self::parse_column_orders(t_file_metadata.column_orders, &schema_descr);
621
622        let file_metadata = FileMetaData::new(
623            t_file_metadata.version,
624            t_file_metadata.num_rows,
625            t_file_metadata.created_by,
626            t_file_metadata.key_value_metadata,
627            schema_descr,
628            column_orders,
629        );
630        Ok(ParquetMetaData::new(file_metadata, row_groups))
631    }
632
633    /// Parses column orders from Thrift definition.
634    /// If no column orders are defined, returns `None`.
635    fn parse_column_orders(
636        t_column_orders: Option<Vec<TColumnOrder>>,
637        schema_descr: &SchemaDescriptor,
638    ) -> Option<Vec<ColumnOrder>> {
639        match t_column_orders {
640            Some(orders) => {
641                // Should always be the case
642                assert_eq!(
643                    orders.len(),
644                    schema_descr.num_columns(),
645                    "Column order length mismatch"
646                );
647                let mut res = Vec::new();
648                for (i, column) in schema_descr.columns().iter().enumerate() {
649                    match orders[i] {
650                        TColumnOrder::TYPEORDER(_) => {
651                            let sort_order = ColumnOrder::get_sort_order(
652                                column.logical_type(),
653                                column.converted_type(),
654                                column.physical_type(),
655                            );
656                            res.push(ColumnOrder::TYPE_DEFINED_ORDER(sort_order));
657                        }
658                    }
659                }
660                Some(res)
661            }
662            None => None,
663        }
664    }
665}
666
667#[cfg(test)]
668mod tests {
669    use super::*;
670    use bytes::Bytes;
671
672    use crate::basic::SortOrder;
673    use crate::basic::Type;
674    use crate::file::reader::Length;
675    use crate::format::TypeDefinedOrder;
676    use crate::schema::types::Type as SchemaType;
677    use crate::util::test_common::file_util::get_test_file;
678
679    #[test]
680    fn test_parse_metadata_size_smaller_than_footer() {
681        let test_file = tempfile::tempfile().unwrap();
682        let err = ParquetMetaDataReader::new()
683            .parse_metadata(&test_file)
684            .unwrap_err();
685        assert!(matches!(err, ParquetError::IndexOutOfBound(8, _)));
686    }
687
688    #[test]
689    fn test_parse_metadata_corrupt_footer() {
690        let data = Bytes::from(vec![1, 2, 3, 4, 5, 6, 7, 8]);
691        let reader_result = ParquetMetaDataReader::new().parse_metadata(&data);
692        assert_eq!(
693            reader_result.unwrap_err().to_string(),
694            "Parquet error: Invalid Parquet file. Corrupt footer"
695        );
696    }
697
698    #[test]
699    fn test_parse_metadata_invalid_start() {
700        let test_file = Bytes::from(vec![255, 0, 0, 0, b'P', b'A', b'R', b'1']);
701        let err = ParquetMetaDataReader::new()
702            .parse_metadata(&test_file)
703            .unwrap_err();
704        assert!(matches!(err, ParquetError::IndexOutOfBound(263, _)));
705    }
706
707    #[test]
708    fn test_metadata_column_orders_parse() {
709        // Define simple schema, we do not need to provide logical types.
710        let fields = vec![
711            Arc::new(
712                SchemaType::primitive_type_builder("col1", Type::INT32)
713                    .build()
714                    .unwrap(),
715            ),
716            Arc::new(
717                SchemaType::primitive_type_builder("col2", Type::FLOAT)
718                    .build()
719                    .unwrap(),
720            ),
721        ];
722        let schema = SchemaType::group_type_builder("schema")
723            .with_fields(fields)
724            .build()
725            .unwrap();
726        let schema_descr = SchemaDescriptor::new(Arc::new(schema));
727
728        let t_column_orders = Some(vec![
729            TColumnOrder::TYPEORDER(TypeDefinedOrder::new()),
730            TColumnOrder::TYPEORDER(TypeDefinedOrder::new()),
731        ]);
732
733        assert_eq!(
734            ParquetMetaDataReader::parse_column_orders(t_column_orders, &schema_descr),
735            Some(vec![
736                ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED),
737                ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED)
738            ])
739        );
740
741        // Test when no column orders are defined.
742        assert_eq!(
743            ParquetMetaDataReader::parse_column_orders(None, &schema_descr),
744            None
745        );
746    }
747
748    #[test]
749    #[should_panic(expected = "Column order length mismatch")]
750    fn test_metadata_column_orders_len_mismatch() {
751        let schema = SchemaType::group_type_builder("schema").build().unwrap();
752        let schema_descr = SchemaDescriptor::new(Arc::new(schema));
753
754        let t_column_orders = Some(vec![TColumnOrder::TYPEORDER(TypeDefinedOrder::new())]);
755
756        ParquetMetaDataReader::parse_column_orders(t_column_orders, &schema_descr);
757    }
758
    /// Exercises `try_parse` / `try_parse_sized` against progressively smaller
    /// suffixes of a real file, checking that the reader either succeeds or
    /// reports how many trailing bytes it actually needs.
    ///
    /// NOTE(review): the hard-coded offsets below (323583, 323584, 452505,
    /// 452510) are byte positions within "alltypes_tiny_pages.parquet";
    /// 323583 is presumably where the page-index data begins and 452505
    /// where the footer metadata begins — confirm if the fixture changes.
    #[test]
    fn test_try_parse() {
        let file = get_test_file("alltypes_tiny_pages.parquet");
        let len = file.len() as usize;

        let mut reader = ParquetMetaDataReader::new().with_page_indexes(true);

        // Helper: materialize an arbitrary byte range of the test file.
        let bytes_for_range = |range: Range<usize>| {
            file.get_bytes(range.start as u64, range.end - range.start)
                .unwrap()
        };

        // read entire file
        let bytes = bytes_for_range(0..len);
        reader.try_parse(&bytes).unwrap();
        let metadata = reader.finish().unwrap();
        assert!(metadata.column_index.is_some());
        assert!(metadata.offset_index.is_some());

        // read more than enough of file
        let bytes = bytes_for_range(320000..len);
        reader.try_parse_sized(&bytes, len).unwrap();
        let metadata = reader.finish().unwrap();
        assert!(metadata.column_index.is_some());
        assert!(metadata.offset_index.is_some());

        // exactly enough
        let bytes = bytes_for_range(323583..len);
        reader.try_parse_sized(&bytes, len).unwrap();
        let metadata = reader.finish().unwrap();
        assert!(metadata.column_index.is_some());
        assert!(metadata.offset_index.is_some());

        // not enough for page index
        let bytes = bytes_for_range(323584..len);
        // should fail
        match reader.try_parse_sized(&bytes, len).unwrap_err() {
            // expected error, try again with provided bounds
            ParquetError::IndexOutOfBound(needed, _) => {
                // The error reports how many trailing bytes are required;
                // retrying with exactly that suffix must succeed.
                let bytes = bytes_for_range(len - needed..len);
                reader.try_parse_sized(&bytes, len).unwrap();
                let metadata = reader.finish().unwrap();
                assert!(metadata.column_index.is_some());
                assert!(metadata.offset_index.is_some());
            }
            _ => panic!("unexpected error"),
        };

        // not enough for page index but lie about file size
        let bytes = bytes_for_range(323584..len);
        let reader_result = reader.try_parse_sized(&bytes, len - 323584).unwrap_err();
        assert_eq!(
            reader_result.to_string(),
            "EOF: Parquet file too small. Range 323583..452504 is beyond file bounds 130649"
        );

        // not enough for file metadata
        let mut reader = ParquetMetaDataReader::new();
        let bytes = bytes_for_range(452505..len);
        // should fail
        match reader.try_parse_sized(&bytes, len).unwrap_err() {
            // expected error, try again with provided bounds
            ParquetError::IndexOutOfBound(needed, _) => {
                let bytes = bytes_for_range(len - needed..len);
                reader.try_parse_sized(&bytes, len).unwrap();
                reader.finish().unwrap();
            }
            _ => panic!("unexpected error"),
        };

        // not enough for file metadata but use try_parse()
        // (try_parse treats the slice as the whole file, so it reports an
        // absolute size shortfall rather than a retryable IndexOutOfBound)
        let reader_result = reader.try_parse(&bytes).unwrap_err();
        assert_eq!(
            reader_result.to_string(),
            "EOF: Parquet file too small. Size is 1728 but need 1729"
        );

        // read head of file rather than tail
        let bytes = bytes_for_range(0..1000);
        let reader_result = reader.try_parse_sized(&bytes, len).unwrap_err();
        assert_eq!(
            reader_result.to_string(),
            "Parquet error: Invalid Parquet file. Corrupt footer"
        );

        // lie about file size
        let bytes = bytes_for_range(452510..len);
        let reader_result = reader.try_parse_sized(&bytes, len - 452505).unwrap_err();
        assert_eq!(
            reader_result.to_string(),
            "EOF: Parquet file too small. Size is 1728 but need 1729"
        );
    }
852}
853
#[cfg(all(feature = "async", feature = "arrow", test))]
mod async_tests {
    //! Tests for the asynchronous metadata loading path
    //! ([`ParquetMetaDataReader::try_load`] / `load_and_finish`), including
    //! how the prefetch hint affects the number of fetch round trips.
    use super::*;
    use bytes::Bytes;
    use futures::future::BoxFuture;
    use futures::FutureExt;
    use std::fs::File;
    use std::future::Future;
    use std::io::{Read, Seek, SeekFrom};
    use std::ops::Range;
    use std::sync::atomic::{AtomicUsize, Ordering};

    use crate::arrow::async_reader::MetadataFetch;
    use crate::file::reader::Length;
    use crate::util::test_common::file_util::get_test_file;

    /// Adapter that turns a plain closure into a [`MetadataFetch`]
    /// implementation so tests can serve (and count) byte-range requests.
    struct MetadataFetchFn<F>(F);

    impl<F, Fut> MetadataFetch for MetadataFetchFn<F>
    where
        F: FnMut(Range<usize>) -> Fut + Send,
        Fut: Future<Output = Result<Bytes>> + Send,
    {
        fn fetch(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>> {
            async move { self.0(range).await }.boxed()
        }
    }

    /// Synchronously reads `range` from `file` and returns it as `Bytes`.
    fn read_range(file: &mut File, range: Range<usize>) -> Result<Bytes> {
        file.seek(SeekFrom::Start(range.start as _))?;
        let len = range.end - range.start;
        let mut buf = Vec::with_capacity(len);
        // `take` caps the read at exactly `len` bytes, even at EOF.
        file.take(len as _).read_to_end(&mut buf)?;
        Ok(buf.into())
    }

    /// Loads metadata for a file without page indexes and verifies the
    /// number of fetch round trips for various prefetch hints.
    ///
    /// NOTE(review): the hint value 428 presumably equals the exact footer
    /// metadata size of "nulls.snappy.parquet" — confirm if the fixture is
    /// ever regenerated.
    #[tokio::test]
    async fn test_simple() {
        let mut file = get_test_file("nulls.snappy.parquet");
        let len = file.len() as usize;

        // Ground truth: parse synchronously from the full file.
        let expected = ParquetMetaDataReader::new()
            .parse_and_finish(&file)
            .unwrap();
        let expected = expected.file_metadata().schema();
        let fetch_count = AtomicUsize::new(0);

        // Every fetch increments the counter so round trips can be asserted.
        let mut fetch = |range| {
            fetch_count.fetch_add(1, Ordering::SeqCst);
            futures::future::ready(read_range(&mut file, range))
        };

        // No hint: one fetch for the footer, one for the metadata.
        let input = MetadataFetchFn(&mut fetch);
        let actual = ParquetMetaDataReader::new()
            .load_and_finish(input, len)
            .await
            .unwrap();
        assert_eq!(actual.file_metadata().schema(), expected);
        assert_eq!(fetch_count.load(Ordering::SeqCst), 2);

        // Metadata hint too small - below footer size
        fetch_count.store(0, Ordering::SeqCst);
        let input = MetadataFetchFn(&mut fetch);
        let actual = ParquetMetaDataReader::new()
            .with_prefetch_hint(Some(7))
            .load_and_finish(input, len)
            .await
            .unwrap();
        assert_eq!(actual.file_metadata().schema(), expected);
        assert_eq!(fetch_count.load(Ordering::SeqCst), 2);

        // Metadata hint too small
        fetch_count.store(0, Ordering::SeqCst);
        let input = MetadataFetchFn(&mut fetch);
        let actual = ParquetMetaDataReader::new()
            .with_prefetch_hint(Some(10))
            .load_and_finish(input, len)
            .await
            .unwrap();
        assert_eq!(actual.file_metadata().schema(), expected);
        assert_eq!(fetch_count.load(Ordering::SeqCst), 2);

        // Metadata hint too large: over-fetching still resolves in one trip.
        fetch_count.store(0, Ordering::SeqCst);
        let input = MetadataFetchFn(&mut fetch);
        let actual = ParquetMetaDataReader::new()
            .with_prefetch_hint(Some(500))
            .load_and_finish(input, len)
            .await
            .unwrap();
        assert_eq!(actual.file_metadata().schema(), expected);
        assert_eq!(fetch_count.load(Ordering::SeqCst), 1);

        // Metadata hint exactly correct
        fetch_count.store(0, Ordering::SeqCst);
        let input = MetadataFetchFn(&mut fetch);
        let actual = ParquetMetaDataReader::new()
            .with_prefetch_hint(Some(428))
            .load_and_finish(input, len)
            .await
            .unwrap();
        assert_eq!(actual.file_metadata().schema(), expected);
        assert_eq!(fetch_count.load(Ordering::SeqCst), 1);

        // A claimed file size smaller than the 8-byte footer fails up front.
        let input = MetadataFetchFn(&mut fetch);
        let err = ParquetMetaDataReader::new()
            .load_and_finish(input, 4)
            .await
            .unwrap_err()
            .to_string();
        assert_eq!(err, "EOF: file size of 4 is less than footer");

        // A wrong (too small) file size makes the footer bytes garbage.
        let input = MetadataFetchFn(&mut fetch);
        let err = ParquetMetaDataReader::new()
            .load_and_finish(input, 20)
            .await
            .unwrap_err()
            .to_string();
        assert_eq!(err, "Parquet error: Invalid Parquet file. Corrupt footer");
    }

    /// Loads metadata plus page indexes and verifies the fetch count for
    /// various prefetch hints.
    ///
    /// NOTE(review): the hint values below presumably relate to
    /// "alltypes_tiny_pages.parquet": 1729 the footer metadata size, 130650
    /// the metadata-plus-page-index size (130649 being one byte short) —
    /// confirm against the fixture before changing.
    #[tokio::test]
    async fn test_page_index() {
        let mut file = get_test_file("alltypes_tiny_pages.parquet");
        let len = file.len() as usize;
        let fetch_count = AtomicUsize::new(0);
        let mut fetch = |range| {
            fetch_count.fetch_add(1, Ordering::SeqCst);
            futures::future::ready(read_range(&mut file, range))
        };

        // No hint: footer, metadata, and page index each cost a fetch.
        let f = MetadataFetchFn(&mut fetch);
        let mut loader = ParquetMetaDataReader::new().with_page_indexes(true);
        loader.try_load(f, len).await.unwrap();
        assert_eq!(fetch_count.load(Ordering::SeqCst), 3);
        let metadata = loader.finish().unwrap();
        assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());

        // Prefetch just footer exactly
        fetch_count.store(0, Ordering::SeqCst);
        let f = MetadataFetchFn(&mut fetch);
        let mut loader = ParquetMetaDataReader::new()
            .with_page_indexes(true)
            .with_prefetch_hint(Some(1729));
        loader.try_load(f, len).await.unwrap();
        assert_eq!(fetch_count.load(Ordering::SeqCst), 2);
        let metadata = loader.finish().unwrap();
        assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());

        // Prefetch more than footer but not enough
        fetch_count.store(0, Ordering::SeqCst);
        let f = MetadataFetchFn(&mut fetch);
        let mut loader = ParquetMetaDataReader::new()
            .with_page_indexes(true)
            .with_prefetch_hint(Some(130649));
        loader.try_load(f, len).await.unwrap();
        assert_eq!(fetch_count.load(Ordering::SeqCst), 2);
        let metadata = loader.finish().unwrap();
        assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());

        // Prefetch exactly enough: a single fetch covers everything.
        fetch_count.store(0, Ordering::SeqCst);
        let f = MetadataFetchFn(&mut fetch);
        let metadata = ParquetMetaDataReader::new()
            .with_page_indexes(true)
            .with_prefetch_hint(Some(130650))
            .load_and_finish(f, len)
            .await
            .unwrap();
        assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
        assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());
    }
}