parquet/geospatial/
accumulator.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! This module provides implementations and traits for building [`GeospatialStatistics`].
19
20use std::sync::{Arc, OnceLock};
21
22use crate::{
23    basic::LogicalType, errors::ParquetError, geospatial::statistics::GeospatialStatistics,
24    schema::types::ColumnDescPtr,
25};
26
27/// Create a new [`GeoStatsAccumulator`] instance if `descr` represents a Geometry or
28/// Geography [`LogicalType`]
29///
30/// Returns a suitable [`GeoStatsAccumulator`] if `descr` represents a non-geospatial type
31/// or `None` otherwise.
32pub fn try_new_geo_stats_accumulator(
33    descr: &ColumnDescPtr,
34) -> Option<Box<dyn GeoStatsAccumulator>> {
35    if !matches!(
36        descr.logical_type_ref(),
37        Some(LogicalType::Geometry { .. }) | Some(LogicalType::Geography { .. })
38    ) {
39        return None;
40    }
41
42    Some(
43        ACCUMULATOR_FACTORY
44            .get_or_init(|| Arc::new(DefaultGeoStatsAccumulatorFactory::default()))
45            .new_accumulator(descr),
46    )
47}
48
49/// Initialize the global [`GeoStatsAccumulatorFactory`]
50///
51/// This may only be done once before any calls to [`try_new_geo_stats_accumulator`].
52/// Clients may use this to implement support for builds of the Parquet crate without
53/// geospatial support or to implement support for Geography bounding using external
54/// dependencies.
55pub fn init_geo_stats_accumulator_factory(
56    factory: Arc<dyn GeoStatsAccumulatorFactory>,
57) -> Result<(), ParquetError> {
58    if ACCUMULATOR_FACTORY.set(factory).is_err() {
59        Err(ParquetError::General(
60            "Global GeoStatsAccumulatorFactory already set".to_string(),
61        ))
62    } else {
63        Ok(())
64    }
65}
66
/// Global accumulator factory instance
///
/// Set either explicitly by [`init_geo_stats_accumulator_factory`] or lazily, with a
/// [`DefaultGeoStatsAccumulatorFactory`], by the first call to
/// [`try_new_geo_stats_accumulator`] for a geospatial column. Because it is a `OnceLock`,
/// the first value stored wins and any later `set` attempt is rejected.
static ACCUMULATOR_FACTORY: OnceLock<Arc<dyn GeoStatsAccumulatorFactory>> = OnceLock::new();
69
/// Factory for [`GeospatialStatistics`] accumulators
///
/// The GeoStatsAccumulatorFactory is a trait implemented by the global factory that
/// generates new instances of a [`GeoStatsAccumulator`] when constructing new
/// encoders for a Geometry or Geography logical type.
///
/// Implementations must be `Send + Sync` because a single instance is shared through a
/// global `static` (`Arc<dyn GeoStatsAccumulatorFactory>` in a `OnceLock`).
pub trait GeoStatsAccumulatorFactory: Send + Sync {
    /// Create a new [`GeoStatsAccumulator`] appropriate for the logical type of a given
    /// [`ColumnDescPtr`]
    ///
    /// When invoked through [`try_new_geo_stats_accumulator`], `descr` always has a
    /// Geometry or Geography logical type; other callers may not guarantee this.
    fn new_accumulator(&self, descr: &ColumnDescPtr) -> Box<dyn GeoStatsAccumulator>;
}
80
81/// Dynamic [`GeospatialStatistics`] accumulator
82///
83/// The GeoStatsAccumulator is a trait whose implementors can ingest the (non-null)
84/// elements of a column and return compliant [`GeospatialStatistics`] (or `None`).
85/// When built with geospatial support this will usually be the
86/// [`ParquetGeoStatsAccumulator`]
87pub trait GeoStatsAccumulator: Send {
88    /// Returns true if this instance can return [`GeospatialStatistics`] from
89    /// [`GeoStatsAccumulator::finish`].
90    ///
91    /// This method returns false when this crate is built without geospatial support
92    /// (i.e., from the [`VoidGeoStatsAccumulator`]) or if the accumulator encountered
93    /// invalid or unsupported elements for which it cannot compute valid statistics.
94    fn is_valid(&self) -> bool;
95
96    /// Update with a single slice of WKB-encoded values
97    ///
98    /// This method is infallible; however, in the event of improperly encoded values,
99    /// implementations must ensure that [`GeoStatsAccumulator::finish`] returns `None`.
100    fn update_wkb(&mut self, wkb: &[u8]);
101
102    /// Compute the final statistics and reset internal state
103    fn finish(&mut self) -> Option<Box<GeospatialStatistics>>;
104}
105
106/// Default accumulator for [`GeospatialStatistics`]
107///
108/// When this crate is built with geospatial support, this factory constructs a
109/// [`ParquetGeoStatsAccumulator`] that ensures Geometry columns are written with
110/// statistics when statistics for that column are enabled. Otherwise, this factory
111/// returns a [`VoidGeoStatsAccumulator`] that never adds any geospatial statistics.
112///
113/// Bounding for Geography columns is not currently implemented by parquet-geospatial
114/// and this factory will always return a [`VoidGeoStatsAccumulator`].
115#[derive(Debug, Default)]
116pub struct DefaultGeoStatsAccumulatorFactory {}
117
118impl GeoStatsAccumulatorFactory for DefaultGeoStatsAccumulatorFactory {
119    fn new_accumulator(&self, _descr: &ColumnDescPtr) -> Box<dyn GeoStatsAccumulator> {
120        #[cfg(feature = "geospatial")]
121        if let Some(crate::basic::LogicalType::Geometry { .. }) = _descr.logical_type_ref() {
122            Box::new(ParquetGeoStatsAccumulator::default())
123        } else {
124            Box::new(VoidGeoStatsAccumulator::default())
125        }
126
127        #[cfg(not(feature = "geospatial"))]
128        return Box::new(VoidGeoStatsAccumulator::default());
129    }
130}
131
132/// A [`GeoStatsAccumulator`] that never computes any [`GeospatialStatistics`]
133#[derive(Debug, Default)]
134pub struct VoidGeoStatsAccumulator {}
135
136impl GeoStatsAccumulator for VoidGeoStatsAccumulator {
137    fn is_valid(&self) -> bool {
138        false
139    }
140
141    fn update_wkb(&mut self, _wkb: &[u8]) {}
142
143    fn finish(&mut self) -> Option<Box<GeospatialStatistics>> {
144        None
145    }
146}
147
/// A [`GeoStatsAccumulator`] that uses the parquet-geospatial crate to compute Geometry statistics
///
/// Note that this accumulator only supports Geometry types and will return invalid statistics for
/// non-point Geography input ([`GeoStatsAccumulatorFactory::new_accumulator`] is responsible
/// for ensuring an appropriate accumulator based on the logical type).
#[cfg(feature = "geospatial")]
#[derive(Debug)]
pub struct ParquetGeoStatsAccumulator {
    /// Running x/y/z/m intervals and geometry types seen since the last `finish()`
    bounder: parquet_geospatial::bounding::GeometryBounder,
    /// Set when `bounder` rejects an input; suppresses statistics until the next `finish()`
    invalid: bool,
}
159
#[cfg(feature = "geospatial")]
impl Default for ParquetGeoStatsAccumulator {
    /// Create an accumulator that has not seen any input yet: an empty bounder and no
    /// recorded invalid values.
    fn default() -> Self {
        let bounder = parquet_geospatial::bounding::GeometryBounder::empty();
        Self {
            invalid: false,
            bounder,
        }
    }
}
169
#[cfg(feature = "geospatial")]
impl GeoStatsAccumulator for ParquetGeoStatsAccumulator {
    fn is_valid(&self) -> bool {
        !self.invalid
    }

    fn update_wkb(&mut self, wkb: &[u8]) {
        // Every value is fed to the bounder; a single failure latches `invalid` until
        // the next call to `finish()`.
        let failed = self.bounder.update_wkb(wkb).is_err();
        self.invalid |= failed;
    }

    fn finish(&mut self) -> Option<Box<GeospatialStatistics>> {
        use parquet_geospatial::bounding::GeometryBounder;
        use parquet_geospatial::interval::IntervalTrait;

        use crate::geospatial::bounding_box::BoundingBox;

        // Move the accumulated state out first so that every return path (including the
        // invalid one) leaves a freshly reset accumulator behind.
        let bounder = std::mem::replace(&mut self.bounder, GeometryBounder::empty());
        let saw_invalid_input = std::mem::take(&mut self.invalid);
        if saw_invalid_input {
            return None;
        }

        // A bounding box requires both an x and a y range; z and m are optional extras.
        let has_xy = !bounder.x().is_empty() && !bounder.y().is_empty();
        let bbox = has_xy.then(|| {
            let mut bbox = BoundingBox::new(
                bounder.x().lo(),
                bounder.x().hi(),
                bounder.y().lo(),
                bounder.y().hi(),
            );

            if !bounder.z().is_empty() {
                bbox = bbox.with_zrange(bounder.z().lo(), bounder.z().hi());
            }

            if !bounder.m().is_empty() {
                bbox = bbox.with_mrange(bounder.m().lo(), bounder.m().hi());
            }

            bbox
        });

        // An empty list of geometry types is reported as absent rather than empty.
        let types = bounder.geometry_types();
        let geometry_types = (!types.is_empty()).then_some(types);

        Some(Box::new(GeospatialStatistics::new(bbox, geometry_types)))
    }
}
228
#[cfg(test)]
mod test {
    use super::*;

    /// The void accumulator ignores all input and never produces statistics.
    #[test]
    fn test_void_accumulator() {
        let mut accumulator = VoidGeoStatsAccumulator {};
        assert!(!accumulator.is_valid());
        accumulator.update_wkb(&[0x01, 0x02, 0x03]);
        assert!(accumulator.finish().is_none());
    }

    /// The default global factory hands out the expected accumulator for each logical
    /// type and, once used, blocks registration of a custom factory.
    #[cfg(feature = "geospatial")]
    #[test]
    fn test_default_accumulator_geospatial_factory() {
        use std::sync::Arc;

        use parquet_geospatial::testing::wkb_point_xy;

        use crate::{
            basic::LogicalType,
            geospatial::bounding_box::BoundingBox,
            schema::types::{ColumnDescriptor, ColumnPath, Type},
        };

        // Check that we have a working accumulator for Geometry
        let parquet_type = Type::primitive_type_builder("geom", crate::basic::Type::BYTE_ARRAY)
            .with_logical_type(Some(LogicalType::Geometry { crs: None }))
            .build()
            .unwrap();
        let column_descr =
            ColumnDescriptor::new(Arc::new(parquet_type), 0, 0, ColumnPath::new(vec![]));
        let mut accumulator = try_new_geo_stats_accumulator(&Arc::new(column_descr)).unwrap();

        assert!(accumulator.is_valid());
        accumulator.update_wkb(&wkb_point_xy(1.0, 2.0));
        accumulator.update_wkb(&wkb_point_xy(11.0, 12.0));
        let stats = accumulator.finish().unwrap();
        assert_eq!(
            stats.bounding_box().unwrap(),
            &BoundingBox::new(1.0, 11.0, 2.0, 12.0)
        );

        // Check that we have a void accumulator for Geography
        let parquet_type = Type::primitive_type_builder("geom", crate::basic::Type::BYTE_ARRAY)
            .with_logical_type(Some(LogicalType::Geography {
                crs: None,
                algorithm: None,
            }))
            .build()
            .unwrap();
        let column_descr =
            ColumnDescriptor::new(Arc::new(parquet_type), 0, 0, ColumnPath::new(vec![]));
        let mut accumulator = try_new_geo_stats_accumulator(&Arc::new(column_descr)).unwrap();

        assert!(!accumulator.is_valid());
        assert!(accumulator.finish().is_none());

        // Check that we return None if the type is not geometry or geography
        let parquet_type = Type::primitive_type_builder("geom", crate::basic::Type::BYTE_ARRAY)
            .build()
            .unwrap();
        let column_descr =
            ColumnDescriptor::new(Arc::new(parquet_type), 0, 0, ColumnPath::new(vec![]));
        assert!(try_new_geo_stats_accumulator(&Arc::new(column_descr)).is_none());

        // We should not be able to initialize a global accumulator factory after at least
        // one accumulator has been created (doing so installs the default factory)
        assert!(
            init_geo_stats_accumulator_factory(Arc::new(
                DefaultGeoStatsAccumulatorFactory::default()
            ))
            .is_err()
        );
    }

    /// The parquet-geospatial accumulator bounds input, resets on finish(), and
    /// recovers after invalid input.
    #[cfg(feature = "geospatial")]
    #[test]
    fn test_geometry_accumulator() {
        use parquet_geospatial::testing::{wkb_point_xy, wkb_point_xyzm};

        use crate::geospatial::bounding_box::BoundingBox;

        let mut accumulator = ParquetGeoStatsAccumulator::default();

        // A fresh instance should be able to bound input
        assert!(accumulator.is_valid());
        accumulator.update_wkb(&wkb_point_xy(1.0, 2.0));
        accumulator.update_wkb(&wkb_point_xy(11.0, 12.0));
        let stats = accumulator.finish().unwrap();
        assert_eq!(stats.geospatial_types().unwrap(), &vec![1]);
        assert_eq!(
            stats.bounding_box().unwrap(),
            &BoundingBox::new(1.0, 11.0, 2.0, 12.0)
        );

        // finish() should have reset the bounder such that the first values
        // aren't included when computing the next set of statistics.
        assert!(accumulator.is_valid());
        accumulator.update_wkb(&wkb_point_xy(21.0, 22.0));
        accumulator.update_wkb(&wkb_point_xy(31.0, 32.0));
        let stats = accumulator.finish().unwrap();
        assert_eq!(stats.geospatial_types().unwrap(), &vec![1]);
        assert_eq!(
            stats.bounding_box().unwrap(),
            &BoundingBox::new(21.0, 31.0, 22.0, 32.0)
        );

        // When an accumulator encounters invalid input, it reports is_valid() false
        // and does not compute subsequent statistics
        assert!(accumulator.is_valid());
        accumulator.update_wkb(&wkb_point_xy(41.0, 42.0));
        accumulator.update_wkb("these bytes are not WKB".as_bytes());
        assert!(!accumulator.is_valid());
        assert!(accumulator.finish().is_none());

        // Subsequent rounds of accumulation should work as expected
        assert!(accumulator.is_valid());
        accumulator.update_wkb(&wkb_point_xy(41.0, 42.0));
        accumulator.update_wkb(&wkb_point_xy(51.0, 52.0));
        let stats = accumulator.finish().unwrap();
        assert_eq!(stats.geospatial_types().unwrap(), &vec![1]);
        assert_eq!(
            stats.bounding_box().unwrap(),
            &BoundingBox::new(41.0, 51.0, 42.0, 52.0)
        );

        // When there was no input at all (occurs in the all null case), both geometry
        // types and bounding box will be None. This is because Parquet Thrift statistics
        // have no mechanism to communicate "empty". (The all null situation may be determined
        // from the null count in this case).
        assert!(accumulator.is_valid());
        let stats = accumulator.finish().unwrap();
        assert!(stats.geospatial_types().is_none());
        assert!(stats.bounding_box().is_none());

        // When there was 100% "empty" input (i.e., non-null geometries without
        // coordinates), there should be statistics with geometry types but no
        // bounding box.
        assert!(accumulator.is_valid());
        accumulator.update_wkb(&wkb_point_xy(f64::NAN, f64::NAN));
        let stats = accumulator.finish().unwrap();
        assert_eq!(stats.geospatial_types().unwrap(), &vec![1]);
        assert!(stats.bounding_box().is_none());

        // If Z and/or M are present, they should be reported in the bounding box
        assert!(accumulator.is_valid());
        accumulator.update_wkb(&wkb_point_xyzm(1.0, 2.0, 3.0, 4.0));
        accumulator.update_wkb(&wkb_point_xyzm(5.0, 6.0, 7.0, 8.0));
        let stats = accumulator.finish().unwrap();
        assert_eq!(stats.geospatial_types().unwrap(), &vec![3001]);
        assert_eq!(
            stats.bounding_box().unwrap(),
            &BoundingBox::new(1.0, 5.0, 2.0, 6.0)
                .with_zrange(3.0, 7.0)
                .with_mrange(4.0, 8.0)
        );
    }
}