1use std::sync::{Arc, OnceLock};
21
22use crate::{
23 basic::LogicalType, errors::ParquetError, geospatial::statistics::GeospatialStatistics,
24 schema::types::ColumnDescPtr,
25};
26
27pub fn try_new_geo_stats_accumulator(
33 descr: &ColumnDescPtr,
34) -> Option<Box<dyn GeoStatsAccumulator>> {
35 if !matches!(
36 descr.logical_type_ref(),
37 Some(LogicalType::Geometry { .. }) | Some(LogicalType::Geography { .. })
38 ) {
39 return None;
40 }
41
42 Some(
43 ACCUMULATOR_FACTORY
44 .get_or_init(|| Arc::new(DefaultGeoStatsAccumulatorFactory::default()))
45 .new_accumulator(descr),
46 )
47}
48
49pub fn init_geo_stats_accumulator_factory(
56 factory: Arc<dyn GeoStatsAccumulatorFactory>,
57) -> Result<(), ParquetError> {
58 if ACCUMULATOR_FACTORY.set(factory).is_err() {
59 Err(ParquetError::General(
60 "Global GeoStatsAccumulatorFactory already set".to_string(),
61 ))
62 } else {
63 Ok(())
64 }
65}
66
/// Global factory used by [`try_new_geo_stats_accumulator`].
///
/// Written at most once: either explicitly through
/// [`init_geo_stats_accumulator_factory`] or lazily (with
/// [`DefaultGeoStatsAccumulatorFactory`]) the first time an accumulator is
/// requested for a Geometry/Geography column.
static ACCUMULATOR_FACTORY: OnceLock<Arc<dyn GeoStatsAccumulatorFactory>> = OnceLock::new();
69
/// Factory for [`GeoStatsAccumulator`] instances.
///
/// A single implementation is stored globally (see
/// [`init_geo_stats_accumulator_factory`]), which is why implementations
/// must be `Send + Sync`.
pub trait GeoStatsAccumulatorFactory: Send + Sync {
    /// Creates a new accumulator for the column described by `descr`.
    ///
    /// [`try_new_geo_stats_accumulator`] only calls this for columns whose
    /// logical type is Geometry or Geography.
    fn new_accumulator(&self, descr: &ColumnDescPtr) -> Box<dyn GeoStatsAccumulator>;
}
80
/// Accumulates [`GeospatialStatistics`] from a sequence of values supplied
/// as byte slices.
pub trait GeoStatsAccumulator: Send {
    /// Reports whether this accumulator is currently able to produce
    /// statistics.
    ///
    /// Of the implementations in this file, [`VoidGeoStatsAccumulator`]
    /// always returns `false`, and `ParquetGeoStatsAccumulator` returns
    /// `false` after it has been given input it failed to process, until
    /// the next call to [`Self::finish`].
    fn is_valid(&self) -> bool;

    /// Feeds one value into the accumulator.
    ///
    /// NOTE(review): `wkb` is presumably a single well-known binary (WKB)
    /// encoded geometry; the trait itself does not enforce this.
    fn update_wkb(&mut self, wkb: &[u8]);

    /// Produces the statistics accumulated so far, or `None` if no
    /// statistics are available.
    ///
    /// The `ParquetGeoStatsAccumulator` implementation in this file also
    /// resets its internal state here so that it can be reused for another
    /// round of accumulation.
    fn finish(&mut self) -> Option<Box<GeospatialStatistics>>;
}
105
/// Default [`GeoStatsAccumulatorFactory`], used when no other factory has
/// been installed with [`init_geo_stats_accumulator_factory`].
///
/// With the `geospatial` feature enabled it creates a
/// `ParquetGeoStatsAccumulator` for Geometry columns and a
/// [`VoidGeoStatsAccumulator`] for everything else; without the feature it
/// always creates a [`VoidGeoStatsAccumulator`].
#[derive(Debug, Default)]
pub struct DefaultGeoStatsAccumulatorFactory {}
117
118impl GeoStatsAccumulatorFactory for DefaultGeoStatsAccumulatorFactory {
119 fn new_accumulator(&self, _descr: &ColumnDescPtr) -> Box<dyn GeoStatsAccumulator> {
120 #[cfg(feature = "geospatial")]
121 if let Some(crate::basic::LogicalType::Geometry { .. }) = _descr.logical_type_ref() {
122 Box::new(ParquetGeoStatsAccumulator::default())
123 } else {
124 Box::new(VoidGeoStatsAccumulator::default())
125 }
126
127 #[cfg(not(feature = "geospatial"))]
128 return Box::new(VoidGeoStatsAccumulator::default());
129 }
130}
131
/// A [`GeoStatsAccumulator`] that never computes any statistics: it is never
/// valid, ignores all input, and always finishes with `None`.
#[derive(Debug, Default)]
pub struct VoidGeoStatsAccumulator {}
135
impl GeoStatsAccumulator for VoidGeoStatsAccumulator {
    /// Always `false`: this accumulator never produces statistics.
    fn is_valid(&self) -> bool {
        false
    }

    /// Ignores the input entirely.
    fn update_wkb(&mut self, _wkb: &[u8]) {}

    /// Always `None`: there is nothing to report.
    fn finish(&mut self) -> Option<Box<GeospatialStatistics>> {
        None
    }
}
147
/// A [`GeoStatsAccumulator`] backed by
/// `parquet_geospatial::bounding::GeometryBounder`.
///
/// Only available when the `geospatial` feature is enabled.
#[cfg(feature = "geospatial")]
#[derive(Debug)]
pub struct ParquetGeoStatsAccumulator {
    // Accumulates bounds and geometry types across calls to `update_wkb`;
    // replaced by an empty bounder on every `finish`.
    bounder: parquet_geospatial::bounding::GeometryBounder,
    // Set when the bounder reported an error for some input; cleared again
    // by `finish`.
    invalid: bool,
}
159
#[cfg(feature = "geospatial")]
impl Default for ParquetGeoStatsAccumulator {
    /// Creates a valid accumulator with an empty bounder.
    fn default() -> Self {
        use parquet_geospatial::bounding::GeometryBounder;

        Self {
            invalid: false,
            bounder: GeometryBounder::empty(),
        }
    }
}
169
#[cfg(feature = "geospatial")]
impl GeoStatsAccumulator for ParquetGeoStatsAccumulator {
    /// Returns `false` once any input since the last [`Self::finish`] could
    /// not be processed by the bounder, `true` otherwise.
    fn is_valid(&self) -> bool {
        !self.invalid
    }

    /// Feeds `wkb` to the bounder; a failure marks the accumulator invalid
    /// until the next [`Self::finish`].
    fn update_wkb(&mut self, wkb: &[u8]) {
        // The bounder is always called; an error only ever sets the flag,
        // it never clears it.
        self.invalid |= self.bounder.update_wkb(wkb).is_err();
    }

    /// Returns the statistics accumulated since the previous call and resets
    /// the accumulator for reuse.
    ///
    /// * If any input was rejected, returns `None`.
    /// * Otherwise returns statistics whose bounding box is `None` when the
    ///   X or Y interval is empty, and whose geometry types are `None` when
    ///   no types were recorded. Z and M ranges are attached to the bounding
    ///   box only when their intervals are non-empty.
    fn finish(&mut self) -> Option<Box<GeospatialStatistics>> {
        use crate::geospatial::bounding_box::BoundingBox;
        use parquet_geospatial::bounding::GeometryBounder;
        use parquet_geospatial::interval::IntervalTrait;

        // Take the accumulated state out and leave a fresh, valid
        // accumulator behind, regardless of which path is taken below.
        let was_invalid = std::mem::take(&mut self.invalid);
        let bounder = std::mem::replace(&mut self.bounder, GeometryBounder::empty());

        if was_invalid {
            return None;
        }

        let has_xy = !bounder.x().is_empty() && !bounder.y().is_empty();
        let bbox = if has_xy {
            let xy = BoundingBox::new(
                bounder.x().lo(),
                bounder.x().hi(),
                bounder.y().lo(),
                bounder.y().hi(),
            );

            let xyz = if bounder.z().is_empty() {
                xy
            } else {
                xy.with_zrange(bounder.z().lo(), bounder.z().hi())
            };

            let xyzm = if bounder.m().is_empty() {
                xyz
            } else {
                xyz.with_mrange(bounder.m().lo(), bounder.m().hi())
            };

            Some(xyzm)
        } else {
            None
        };

        // An empty list of types is reported as "no types" rather than as
        // an empty collection.
        let geometry_types = Some(bounder.geometry_types()).filter(|types| !types.is_empty());

        Some(Box::new(GeospatialStatistics::new(bbox, geometry_types)))
    }
}
228
#[cfg(test)]
mod test {
    use super::*;

    // The void accumulator is never valid, accepts arbitrary bytes without
    // effect, and never yields statistics.
    #[test]
    fn test_void_accumulator() {
        let mut accumulator = VoidGeoStatsAccumulator {};
        assert!(!accumulator.is_valid());
        accumulator.update_wkb(&[0x01, 0x02, 0x03]);
        assert!(accumulator.finish().is_none());
    }

    // Exercises `try_new_geo_stats_accumulator` with the default factory for
    // Geometry, Geography and non-geospatial columns.
    #[cfg(feature = "geospatial")]
    #[test]
    fn test_default_accumulator_geospatial_factory() {
        use std::sync::Arc;

        use parquet_geospatial::testing::wkb_point_xy;

        use crate::{
            basic::LogicalType,
            geospatial::bounding_box::BoundingBox,
            schema::types::{ColumnDescriptor, ColumnPath, Type},
        };

        // A Geometry column gets a working accumulator.
        let parquet_type = Type::primitive_type_builder("geom", crate::basic::Type::BYTE_ARRAY)
            .with_logical_type(Some(LogicalType::Geometry { crs: None }))
            .build()
            .unwrap();
        let column_descr =
            ColumnDescriptor::new(Arc::new(parquet_type), 0, 0, ColumnPath::new(vec![]));
        let mut accumulator = try_new_geo_stats_accumulator(&Arc::new(column_descr)).unwrap();

        assert!(accumulator.is_valid());
        accumulator.update_wkb(&wkb_point_xy(1.0, 2.0));
        accumulator.update_wkb(&wkb_point_xy(11.0, 12.0));
        let stats = accumulator.finish().unwrap();
        assert_eq!(
            stats.bounding_box().unwrap(),
            &BoundingBox::new(1.0, 11.0, 2.0, 12.0)
        );

        // A Geography column gets an accumulator that is never valid and
        // yields no statistics (the void accumulator).
        let parquet_type = Type::primitive_type_builder("geom", crate::basic::Type::BYTE_ARRAY)
            .with_logical_type(Some(LogicalType::Geography {
                crs: None,
                algorithm: None,
            }))
            .build()
            .unwrap();
        let column_descr =
            ColumnDescriptor::new(Arc::new(parquet_type), 0, 0, ColumnPath::new(vec![]));
        let mut accumulator = try_new_geo_stats_accumulator(&Arc::new(column_descr)).unwrap();

        assert!(!accumulator.is_valid());
        assert!(accumulator.finish().is_none());

        // A column without a Geometry/Geography logical type gets no
        // accumulator at all.
        let parquet_type = Type::primitive_type_builder("geom", crate::basic::Type::BYTE_ARRAY)
            .build()
            .unwrap();
        let column_descr =
            ColumnDescriptor::new(Arc::new(parquet_type), 0, 0, ColumnPath::new(vec![]));
        assert!(try_new_geo_stats_accumulator(&Arc::new(column_descr)).is_none());

        // The calls above already initialized the global factory lazily, so
        // installing one explicitly must now fail.
        assert!(
            init_geo_stats_accumulator_factory(Arc::new(
                DefaultGeoStatsAccumulatorFactory::default()
            ))
            .is_err()
        )
    }

    // Exercises several accumulate/finish rounds on a single
    // `ParquetGeoStatsAccumulator`; each round depends on `finish` having
    // reset the state left by the previous one.
    #[cfg(feature = "geospatial")]
    #[test]
    fn test_geometry_accumulator() {
        use parquet_geospatial::testing::{wkb_point_xy, wkb_point_xyzm};

        use crate::geospatial::bounding_box::BoundingBox;

        let mut accumulator = ParquetGeoStatsAccumulator::default();

        // A fresh instance is valid and bounds its input.
        assert!(accumulator.is_valid());
        accumulator.update_wkb(&wkb_point_xy(1.0, 2.0));
        accumulator.update_wkb(&wkb_point_xy(11.0, 12.0));
        let stats = accumulator.finish().unwrap();
        assert_eq!(stats.geospatial_types().unwrap(), &vec![1]);
        assert_eq!(
            stats.bounding_box().unwrap(),
            &BoundingBox::new(1.0, 11.0, 2.0, 12.0)
        );

        // finish() reset the bounder: values from the first round do not
        // influence the bounds of the second round.
        assert!(accumulator.is_valid());
        accumulator.update_wkb(&wkb_point_xy(21.0, 22.0));
        accumulator.update_wkb(&wkb_point_xy(31.0, 32.0));
        let stats = accumulator.finish().unwrap();
        assert_eq!(stats.geospatial_types().unwrap(), &vec![1]);
        assert_eq!(
            stats.bounding_box().unwrap(),
            &BoundingBox::new(21.0, 31.0, 22.0, 32.0)
        );

        // Input that cannot be processed makes the accumulator invalid and
        // suppresses the statistics for this round.
        assert!(accumulator.is_valid());
        accumulator.update_wkb(&wkb_point_xy(41.0, 42.0));
        accumulator.update_wkb("these bytes are not WKB".as_bytes());
        assert!(!accumulator.is_valid());
        assert!(accumulator.finish().is_none());

        // finish() also cleared the invalid flag, so later rounds work again.
        assert!(accumulator.is_valid());
        accumulator.update_wkb(&wkb_point_xy(41.0, 42.0));
        accumulator.update_wkb(&wkb_point_xy(51.0, 52.0));
        let stats = accumulator.finish().unwrap();
        assert_eq!(stats.geospatial_types().unwrap(), &vec![1]);
        assert_eq!(
            stats.bounding_box().unwrap(),
            &BoundingBox::new(41.0, 51.0, 42.0, 52.0)
        );

        // With no input at all, statistics are still returned but carry
        // neither geometry types nor a bounding box.
        assert!(accumulator.is_valid());
        let stats = accumulator.finish().unwrap();
        assert!(stats.geospatial_types().is_none());
        assert!(stats.bounding_box().is_none());

        // A point with NaN coordinates contributes its geometry type but
        // produces no bounding box.
        assert!(accumulator.is_valid());
        accumulator.update_wkb(&wkb_point_xy(f64::NAN, f64::NAN));
        let stats = accumulator.finish().unwrap();
        assert_eq!(stats.geospatial_types().unwrap(), &vec![1]);
        assert!(stats.bounding_box().is_none());

        // When Z and M values are present they are reported in the bounding
        // box, and the geometry type code reflects the XYZM point type.
        assert!(accumulator.is_valid());
        accumulator.update_wkb(&wkb_point_xyzm(1.0, 2.0, 3.0, 4.0));
        accumulator.update_wkb(&wkb_point_xyzm(5.0, 6.0, 7.0, 8.0));
        let stats = accumulator.finish().unwrap();
        assert_eq!(stats.geospatial_types().unwrap(), &vec![3001]);
        assert_eq!(
            stats.bounding_box().unwrap(),
            &BoundingBox::new(1.0, 5.0, 2.0, 6.0)
                .with_zrange(3.0, 7.0)
                .with_mrange(4.0, 8.0)
        );
    }
}