use crate::data_type::AsBytes;
use crate::errors::ParquetError;
use crate::file::metadata::ColumnChunkMetaData;
use crate::file::reader::ChunkReader;
use crate::format::{
BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash, BloomFilterHeader,
SplitBlockAlgorithm, Uncompressed, XxHash,
};
use crate::thrift::{TCompactSliceInputProtocol, TSerializable};
use bytes::Bytes;
use std::hash::Hasher;
use std::io::Write;
use std::sync::Arc;
use thrift::protocol::{TCompactOutputProtocol, TOutputProtocol};
use twox_hash::XxHash64;
/// Per-word multipliers used by [`Block::mask`]: word `i` of a mask is chosen from the top
/// five bits of the wrapping product `x * SALT[i]`.
///
/// The values match the salt constants listed in the Parquet split block bloom filter
/// specification; changing them changes which bits a hash maps to.
const SALT: [u32; 8] = [
    0x47b6137b_u32,
    0x44974d91_u32,
    0x8824ad5b_u32,
    0xa2b7289d_u32,
    0x705495c7_u32,
    0x2df1424b_u32,
    0x9efc4947_u32,
    0x5c6bfb31_u32,
];

/// One 256-bit bloom filter block, stored as eight 32-bit words.
#[derive(Debug, Copy, Clone)]
struct Block([u32; 8]);

impl Block {
    /// A block with no bits set.
    const ZERO: Block = Block([0; 8]);

    /// Builds the mask for `x`: eight words, each with exactly one bit set.
    fn mask(x: u32) -> Self {
        // `wrapping_mul` keeps the low 32 bits of the product; shifting right by 27 leaves
        // its top five bits, i.e. a bit position in `0..32`, so the shift below cannot overflow.
        Self(SALT.map(|salt| 1_u32 << (x.wrapping_mul(salt) >> 27)))
    }

    /// Returns the block as 32 bytes with every word in little-endian byte order.
    #[inline]
    #[cfg(target_endian = "little")]
    fn to_le_bytes(self) -> [u8; 32] {
        // Native order already is little-endian on this target.
        self.to_ne_bytes()
    }

    /// Returns the block as 32 bytes with every word in little-endian byte order.
    #[inline]
    #[cfg(not(target_endian = "little"))]
    fn to_le_bytes(self) -> [u8; 32] {
        self.swap_bytes().to_ne_bytes()
    }

    /// Returns the block as 32 bytes: the native-endian bytes of word 0, then word 1, and so on.
    #[inline]
    fn to_ne_bytes(self) -> [u8; 32] {
        // Safe per-word copy; produces the same bytes as reinterpreting the `[u32; 8]` in
        // memory without depending on the layout of the wrapper struct.
        let mut bytes = [0_u8; 32];
        for (dst, word) in bytes.chunks_exact_mut(4).zip(self.0) {
            dst.copy_from_slice(&word.to_ne_bytes());
        }
        bytes
    }

    /// Reverses the byte order of every word.
    #[inline]
    #[cfg(not(target_endian = "little"))]
    fn swap_bytes(self) -> Self {
        Self(self.0.map(u32::swap_bytes))
    }

    /// Sets the bits selected by `hash` (see [`Block::mask`]) in this block.
    fn insert(&mut self, hash: u32) {
        let mask = Self::mask(hash);
        for (word, bit) in self.0.iter_mut().zip(mask.0) {
            *word |= bit;
        }
    }

    /// Returns `true` when every bit selected by `hash` is set in this block.
    ///
    /// After `insert(hash)` this is always `true`; it may also be `true` for hashes that
    /// were never inserted if other insertions happened to set the same bits.
    fn check(&self, hash: u32) -> bool {
        let mask = Self::mask(hash);
        self.0
            .iter()
            .zip(mask.0)
            .all(|(word, bit)| word & bit != 0)
    }
}
impl std::ops::Index<usize> for Block {
type Output = u32;
#[inline]
fn index(&self, index: usize) -> &Self::Output {
self.0.index(index)
}
}
impl std::ops::IndexMut<usize> for Block {
#[inline]
fn index_mut(&mut self, index: usize) -> &mut Self::Output {
self.0.index_mut(index)
}
}
/// A split block bloom filter: a sequence of 256-bit [`Block`]s.
///
/// A value's 64-bit hash picks one block (from its upper 32 bits) and a bit pattern inside
/// that block (from its lower 32 bits); see `hash_to_block_index`, `insert` and `check`.
#[derive(Debug, Clone)]
pub struct Sbbf(Vec<Block>);
/// Number of bytes fetched to parse the bloom filter header when the column chunk metadata
/// does not record the bloom filter length (see `Sbbf::read_from_column_chunk`).
pub(crate) const SBBF_HEADER_SIZE_ESTIMATE: usize = 20;
/// Parses a bloom filter header from the start of `buffer` and returns it together with the
/// absolute position of the first byte after the header.
///
/// `offset` is the position `buffer` was read from; the returned position is `offset` plus
/// the number of bytes the header occupied.
///
/// # Errors
/// Propagates the error from [`read_bloom_filter_header_and_length`].
pub(crate) fn chunk_read_bloom_filter_header_and_offset(
    offset: u64,
    buffer: Bytes,
) -> Result<(BloomFilterHeader, u64), ParquetError> {
    read_bloom_filter_header_and_length(buffer)
        .map(|(header, header_length)| (header, offset + header_length))
}
/// Parses a bloom filter header from the start of `buffer`.
///
/// Returns the header and the number of bytes it occupied, computed as the buffer length
/// minus whatever the input protocol has left unread.
///
/// # Errors
/// Returns [`ParquetError::General`] if the header cannot be decoded.
#[inline]
pub(crate) fn read_bloom_filter_header_and_length(
    buffer: Bytes,
) -> Result<(BloomFilterHeader, u64), ParquetError> {
    let mut prot = TCompactSliceInputProtocol::new(buffer.as_ref());
    let header = match BloomFilterHeader::read_from_in_protocol(&mut prot) {
        Ok(header) => header,
        Err(e) => {
            return Err(ParquetError::General(format!(
                "Could not read bloom filter header: {e}"
            )))
        }
    };
    let consumed = buffer.len() - prot.as_slice().len();
    Ok((header, consumed as u64))
}
/// Smallest bitset size in bytes; 32 bytes is exactly one 256-bit block.
pub(crate) const BITSET_MIN_LENGTH: usize = 32;
/// Largest bitset size in bytes (128 MiB).
pub(crate) const BITSET_MAX_LENGTH: usize = 128 * 1024 * 1024;

/// Rounds a requested bitset size to one the filter will actually use.
///
/// The request is first limited to `BITSET_MIN_LENGTH..=BITSET_MAX_LENGTH` and then rounded
/// up to the next power of two. Both limits are powers of two, so the result stays inside
/// that range.
#[inline]
fn optimal_num_of_bytes(num_bytes: usize) -> usize {
    num_bytes
        .clamp(BITSET_MIN_LENGTH, BITSET_MAX_LENGTH)
        .next_power_of_two()
}
/// Estimates how many filter bits are needed to hold `ndv` distinct values at
/// false-positive probability `fpp`, using `-8 * ndv / ln(1 - fpp^(1/8))`.
///
/// The floating-point result is truncated toward zero and saturates at `usize::MAX`.
/// `ndv == 0` always yields 0.
///
/// When `fpp` is zero or so small that `1 - fpp^(1/8)` rounds to exactly 1.0, the logarithm
/// is zero. Dividing by it would give negative infinity, which the cast turns into 0, so
/// the strictest request would get the smallest filter. That case returns `usize::MAX`
/// instead, keeping the result monotonic as `fpp` shrinks.
#[inline]
fn num_of_bits_from_ndv_fpp(ndv: u64, fpp: f64) -> usize {
    if ndv == 0 {
        // The formula evaluates to zero (or NaN, which casts to zero) for every `fpp`.
        return 0;
    }
    let denominator = (1.0 - fpp.powf(1.0 / 8.0)).ln();
    if denominator == 0.0 {
        return usize::MAX;
    }
    let num_bits = -8.0 * ndv as f64 / denominator;
    // Float-to-integer `as` casts saturate, and NaN becomes 0.
    num_bits as usize
}
impl Sbbf {
    /// Creates an empty filter sized for `ndv` distinct values at false-positive
    /// probability `fpp`.
    ///
    /// # Errors
    /// Returns [`ParquetError::General`] unless `0.0 <= fpp < 1.0` (NaN is rejected too).
    pub(crate) fn new_with_ndv_fpp(ndv: u64, fpp: f64) -> Result<Self, ParquetError> {
        if !(0.0..1.0).contains(&fpp) {
            return Err(ParquetError::General(format!(
                "False positive probability must be between 0.0 and 1.0, got {fpp}"
            )));
        }
        let num_bits = num_of_bits_from_ndv_fpp(ndv, fpp);
        Ok(Self::new_with_num_of_bytes(num_bits / 8))
    }

    /// Creates an empty filter of roughly `num_bytes` bytes.
    ///
    /// The size actually used is `optimal_num_of_bytes(num_bytes)`.
    pub(crate) fn new_with_num_of_bytes(num_bytes: usize) -> Self {
        let num_bytes = optimal_num_of_bytes(num_bytes);
        // `optimal_num_of_bytes` returns a power of two of at least 32, so the size is a
        // whole number of 32-byte blocks. Building the zero blocks directly avoids allocating
        // and re-parsing a temporary all-zero byte buffer.
        Self(vec![Block::ZERO; num_bytes / (4 * 8)])
    }

    /// Builds a filter from a serialized bitset.
    ///
    /// Every 32 bytes become one block of eight little-endian 32-bit words. Trailing bytes
    /// that do not fill a whole block are ignored.
    pub(crate) fn new(bitset: &[u8]) -> Self {
        let data = bitset
            .chunks_exact(4 * 8)
            .map(|chunk| {
                let mut block = Block::ZERO;
                for (i, word) in chunk.chunks_exact(4).enumerate() {
                    // `chunks_exact(4)` only yields 4-byte slices, so the conversion to
                    // `[u8; 4]` cannot fail.
                    block[i] = u32::from_le_bytes(word.try_into().unwrap());
                }
                block
            })
            .collect::<Vec<Block>>();
        Self(data)
    }

    /// Writes the bloom filter header followed by the bitset to `writer`.
    ///
    /// # Errors
    /// Returns an error if the header cannot be serialized, the protocol cannot be flushed,
    /// or the bitset cannot be written.
    pub(crate) fn write<W: Write>(&self, mut writer: W) -> Result<(), ParquetError> {
        let mut protocol = TCompactOutputProtocol::new(&mut writer);
        let header = self.header();
        header.write_to_out_protocol(&mut protocol).map_err(|e| {
            ParquetError::General(format!("Could not write bloom filter header: {e}"))
        })?;
        // Flush before writing through `writer` directly so the header precedes the bitset.
        protocol.flush()?;
        self.write_bitset(&mut writer)?;
        Ok(())
    }

    /// Writes every block to `writer` as 32 little-endian bytes, in order.
    fn write_bitset<W: Write>(&self, mut writer: W) -> Result<(), ParquetError> {
        for block in &self.0 {
            writer
                .write_all(block.to_le_bytes().as_slice())
                .map_err(|e| {
                    ParquetError::General(format!("Could not write bloom filter bit set: {e}"))
                })?;
        }
        Ok(())
    }

    /// Builds the header describing this filter: its size in bytes plus the block
    /// algorithm, xxHash hashing and no compression.
    fn header(&self) -> BloomFilterHeader {
        BloomFilterHeader {
            // Eight words per block, four bytes per word.
            num_bytes: self.0.len() as i32 * 4 * 8,
            algorithm: BloomFilterAlgorithm::BLOCK(SplitBlockAlgorithm {}),
            hash: BloomFilterHash::XXHASH(XxHash {}),
            compression: BloomFilterCompression::UNCOMPRESSED(Uncompressed {}),
        }
    }

    /// Reads the bloom filter of a column chunk, if the metadata records one.
    ///
    /// Returns `Ok(None)` when the metadata has no bloom filter offset. When the metadata
    /// also records the bloom filter length, header and bitset are fetched in one read;
    /// otherwise `SBBF_HEADER_SIZE_ESTIMATE` bytes are fetched to parse the header and the
    /// bitset is fetched separately using the header's `num_bytes`.
    ///
    /// # Errors
    /// Returns [`ParquetError`] if the offset or a length is negative, if reading from
    /// `reader` fails, or if the header cannot be decoded.
    pub(crate) fn read_from_column_chunk<R: ChunkReader>(
        column_metadata: &ColumnChunkMetaData,
        reader: Arc<R>,
    ) -> Result<Option<Self>, ParquetError> {
        let offset: u64 = match column_metadata.bloom_filter_offset() {
            Some(offset) => offset
                .try_into()
                .map_err(|_| ParquetError::General("Bloom filter offset is invalid".to_string()))?,
            None => return Ok(None),
        };

        let buffer = match column_metadata.bloom_filter_length() {
            Some(length) => {
                // A checked conversion: a negative length from corrupt metadata must be
                // reported, not wrapped into an enormous read size.
                let length: usize = length.try_into().map_err(|_| {
                    ParquetError::General("Bloom filter length is invalid".to_string())
                })?;
                reader.get_bytes(offset, length)
            }
            None => reader.get_bytes(offset, SBBF_HEADER_SIZE_ESTIMATE),
        }?;

        let (header, bitset_offset) =
            chunk_read_bloom_filter_header_and_offset(offset, buffer.clone())?;

        // The three matches below do nothing at runtime. Being exhaustive over their single
        // variant, they stop compiling if another variant is ever added, which forces this
        // reader to be revisited.
        match header.algorithm {
            BloomFilterAlgorithm::BLOCK(_) => {}
        }
        match header.compression {
            BloomFilterCompression::UNCOMPRESSED(_) => {}
        }
        match header.hash {
            BloomFilterHash::XXHASH(_) => {}
        }

        let bitset = match column_metadata.bloom_filter_length() {
            // Header and bitset were fetched together: the bitset is whatever follows the header.
            Some(_) => buffer.slice((bitset_offset - offset) as usize..),
            None => {
                let bitset_length: usize = header.num_bytes.try_into().map_err(|_| {
                    ParquetError::General("Bloom filter length is invalid".to_string())
                })?;
                reader.get_bytes(bitset_offset, bitset_length)?
            }
        };
        Ok(Some(Self::new(&bitset)))
    }

    /// Maps a hash to a block index in `0..self.0.len()`.
    ///
    /// The upper 32 bits of `hash` are multiplied by the block count and the upper 32 bits
    /// of that product are kept.
    #[inline]
    fn hash_to_block_index(&self, hash: u64) -> usize {
        // Both factors are below 2^32 as long as the block count is, so the product fits in
        // a `u64` and `saturating_mul` never actually saturates.
        (((hash >> 32).saturating_mul(self.0.len() as u64)) >> 32) as usize
    }

    /// Inserts `value` into the filter.
    ///
    /// # Panics
    /// Panics if the filter has no blocks.
    pub fn insert<T: AsBytes + ?Sized>(&mut self, value: &T) {
        self.insert_hash(hash_as_bytes(value));
    }

    /// Inserts a precomputed hash: the upper half selects the block, the lower half the bits.
    fn insert_hash(&mut self, hash: u64) {
        let block_index = self.hash_to_block_index(hash);
        self.0[block_index].insert(hash as u32)
    }

    /// Returns `false` if `value` was definitely never inserted, `true` if it may have been.
    ///
    /// Accepts unsized values such as `str`, like [`Sbbf::insert`].
    ///
    /// # Panics
    /// Panics if the filter has no blocks.
    pub fn check<T: AsBytes + ?Sized>(&self, value: &T) -> bool {
        self.check_hash(hash_as_bytes(value))
    }

    /// Checks a precomputed hash: the upper half selects the block, the lower half the bits.
    fn check_hash(&self, hash: u64) -> bool {
        let block_index = self.hash_to_block_index(hash);
        self.0[block_index].check(hash as u32)
    }
}
/// Seed passed to `XxHash64` by `hash_as_bytes`.
const SEED: u64 = 0;
/// Hashes the byte representation of `value` with `XxHash64` seeded with `SEED`.
#[inline]
fn hash_as_bytes<A: AsBytes + ?Sized>(value: &A) -> u64 {
    let bytes = value.as_bytes();
    let mut state = XxHash64::with_seed(SEED);
    // Fully qualified calls: `std::io::Write` is also imported in this file and has a
    // `write` method of its own.
    Hasher::write(&mut state, bytes);
    Hasher::finish(&state)
}
#[cfg(test)]
mod tests {
    use super::*;

    /// The digest of the empty string is pinned to a fixed value.
    #[test]
    fn test_hash_bytes() {
        let digest = hash_as_bytes("");
        assert_eq!(digest, 17241709254077376921);
    }

    /// Every word of every mask has exactly one bit set.
    #[test]
    fn test_mask_set_quick_check() {
        for seed in 0..1_000_000 {
            let Block(words) = Block::mask(seed);
            assert!(words.iter().all(|word| word.count_ones() == 1));
        }
    }

    /// A hash inserted into an empty block is always found again.
    #[test]
    fn test_block_insert_and_check() {
        for hash in 0..1_000_000 {
            let mut fresh = Block::ZERO;
            fresh.insert(hash);
            assert!(fresh.check(hash));
        }
    }

    /// A value inserted into a filter is always found again.
    #[test]
    fn test_sbbf_insert_and_check() {
        let mut filter = Sbbf(vec![Block::ZERO; 1_000]);
        for value in 0..1_000_000 {
            filter.insert(&value);
            assert!(filter.check(&value));
        }
    }

    /// A single-block fixture bitset reports the keys "a0" through "a9" as present.
    #[test]
    fn test_with_fixture() {
        let bitset: &[u8] = &[
            200, 1, 80, 20, 64, 68, 8, 109, 6, 37, 4, 67, 144, 80, 96, 32, 8, 132, 43, 33, 0, 5,
            99, 65, 2, 0, 224, 44, 64, 78, 96, 4,
        ];
        let filter = Sbbf::new(bitset);
        (0..10i64)
            .map(|a| format!("a{a}"))
            .for_each(|key| assert!(filter.check(&key.as_str())));
    }

    /// A sample header decodes to the expected fields and occupies 15 bytes; the size
    /// estimate constant is pinned at 20.
    #[test]
    fn test_bloom_filter_header_size_assumption() {
        let raw: [u8; 16] = [21, 64, 28, 28, 0, 0, 28, 28, 0, 0, 28, 28, 0, 0, 0, 99];
        let (header, read_length) =
            read_bloom_filter_header_and_length(Bytes::copy_from_slice(&raw)).unwrap();
        assert_eq!(read_length, 15);
        assert_eq!(
            header.algorithm,
            BloomFilterAlgorithm::BLOCK(SplitBlockAlgorithm {})
        );
        assert_eq!(
            header.compression,
            BloomFilterCompression::UNCOMPRESSED(Uncompressed {})
        );
        assert_eq!(header.hash, BloomFilterHash::XXHASH(XxHash {}));
        assert_eq!(header.num_bytes, 32_i32);
        assert_eq!(20, SBBF_HEADER_SIZE_ESTIMATE);
    }

    /// Requested sizes are clamped to the allowed range and rounded up to a power of two.
    #[test]
    fn test_optimal_num_of_bytes() {
        let cases: &[(usize, usize)] = &[
            (0, 32),
            (9, 32),
            (31, 32),
            (32, 32),
            (33, 64),
            (99, 128),
            (1024, 1024),
            (999_000_000, 128 * 1024 * 1024),
        ];
        for &(requested, expected) in cases {
            assert_eq!(expected, optimal_num_of_bytes(requested));
        }
    }

    /// Bit counts for a table of (fpp, ndv) pairs match the pinned values.
    #[test]
    fn test_num_of_bits_from_ndv_fpp() {
        let cases: &[(f64, u64, u64)] = &[
            (0.1, 10, 57),
            (0.01, 10, 96),
            (0.001, 10, 146),
            (0.1, 100, 577),
            (0.01, 100, 968),
            (0.001, 100, 1460),
            (0.1, 1000, 5772),
            (0.01, 1000, 9681),
            (0.001, 1000, 14607),
            (0.1, 10000, 57725),
            (0.01, 10000, 96815),
            (0.001, 10000, 146076),
            (0.1, 100000, 577254),
            (0.01, 100000, 968152),
            (0.001, 100000, 1460769),
            (0.1, 1000000, 5772541),
            (0.01, 1000000, 9681526),
            (0.001, 1000000, 14607697),
            (1e-50, 1_000_000_000_000, 14226231280773240832),
        ];
        for &(fpp, ndv, expected_bits) in cases {
            assert_eq!(expected_bits, num_of_bits_from_ndv_fpp(ndv, fpp) as u64);
        }
    }
}