1use bytemuck::cast_slice_mut;
2use byteorder::{LittleEndian, ReadBytesExt};
3use core::convert::Infallible;
4use std::error::Error;
5use std::io::{self, SeekFrom};
6use std::mem;
7use std::ops::RangeInclusive;
8
9use crate::bitmap::container::Container;
10use crate::bitmap::serialization::{
11 NO_OFFSET_THRESHOLD, SERIAL_COOKIE, SERIAL_COOKIE_NO_RUNCONTAINER,
12};
13use crate::RoaringBitmap;
14
15use super::container::ARRAY_LIMIT;
16use super::store::{ArrayStore, BitmapStore, Store, BITMAP_LENGTH};
17
18impl RoaringBitmap {
19 pub fn intersection_with_serialized_unchecked<R>(&self, other: R) -> io::Result<RoaringBitmap>
45 where
46 R: io::Read + io::Seek,
47 {
48 RoaringBitmap::intersection_with_serialized_impl::<R, _, Infallible, _, Infallible>(
49 self,
50 other,
51 |values| Ok(ArrayStore::from_vec_unchecked(values)),
52 |len, values| Ok(BitmapStore::from_unchecked(len, values)),
53 )
54 }
55
56 fn intersection_with_serialized_impl<R, A, AErr, B, BErr>(
57 &self,
58 mut reader: R,
59 a: A,
60 b: B,
61 ) -> io::Result<RoaringBitmap>
62 where
63 R: io::Read + io::Seek,
64 A: Fn(Vec<u16>) -> Result<ArrayStore, AErr>,
65 AErr: Error + Send + Sync + 'static,
66 B: Fn(u64, Box<[u64; 1024]>) -> Result<BitmapStore, BErr>,
67 BErr: Error + Send + Sync + 'static,
68 {
69 let (size, has_offsets, has_run_containers) = {
71 let cookie = reader.read_u32::<LittleEndian>()?;
72 if cookie == SERIAL_COOKIE_NO_RUNCONTAINER {
73 (reader.read_u32::<LittleEndian>()? as usize, true, false)
74 } else if (cookie as u16) == SERIAL_COOKIE {
75 let size = ((cookie >> 16) + 1) as usize;
76 (size, size >= NO_OFFSET_THRESHOLD, true)
77 } else {
78 return Err(io::Error::other("unknown cookie value"));
79 }
80 };
81
82 let run_container_bitmap = if has_run_containers {
84 let mut bitmap = vec![0u8; size.div_ceil(8)];
85 reader.read_exact(&mut bitmap)?;
86 Some(bitmap)
87 } else {
88 None
89 };
90
91 if size > u16::MAX as usize + 1 {
92 return Err(io::Error::other("size is greater than supported"));
93 }
94
95 let mut descriptions = vec![[0; 2]; size];
97 reader.read_exact(cast_slice_mut(&mut descriptions))?;
98 descriptions.iter_mut().for_each(|[ref mut key, ref mut len]| {
99 *key = u16::from_le(*key);
100 *len = u16::from_le(*len);
101 });
102
103 if has_offsets {
104 let mut offsets = vec![0; size];
105 reader.read_exact(cast_slice_mut(&mut offsets))?;
106 offsets.iter_mut().for_each(|offset| *offset = u32::from_le(*offset));
107 return self.intersection_with_serialized_impl_with_offsets(
108 reader,
109 a,
110 b,
111 &descriptions,
112 &offsets,
113 run_container_bitmap.as_deref(),
114 );
115 }
116
117 let mut containers = Vec::new();
119 for (i, &[key, len_minus_one]) in descriptions.iter().enumerate() {
120 let container = match self.containers.binary_search_by_key(&key, |c| c.key) {
121 Ok(index) => self.containers.get(index),
122 Err(_) => None,
123 };
124 let cardinality = u64::from(len_minus_one) + 1;
125
126 let is_run_container =
128 run_container_bitmap.as_ref().is_some_and(|bm| bm[i / 8] & (1 << (i % 8)) != 0);
129
130 let store = if is_run_container {
131 let runs = reader.read_u16::<LittleEndian>()?;
132 match container {
133 Some(_) => {
134 let mut intervals = vec![[0, 0]; runs as usize];
135 reader.read_exact(cast_slice_mut(&mut intervals))?;
136 intervals.iter_mut().for_each(|[s, len]| {
137 *s = u16::from_le(*s);
138 *len = u16::from_le(*len);
139 });
140
141 let cardinality = intervals.iter().map(|[_, len]| *len as usize).sum();
142 let mut store = Store::with_capacity(cardinality);
143 intervals.into_iter().try_for_each(
144 |[s, len]| -> Result<(), io::ErrorKind> {
145 let end = s.checked_add(len).ok_or(io::ErrorKind::InvalidData)?;
146 store.insert_range(RangeInclusive::new(s, end));
147 Ok(())
148 },
149 )?;
150 store
151 }
152 None => {
153 let runs_size = mem::size_of::<u16>() * 2 * runs as usize;
154 reader.seek(SeekFrom::Current(runs_size as i64))?;
155 continue;
156 }
157 }
158 } else if cardinality <= ARRAY_LIMIT {
159 match container {
160 Some(_) => {
161 let mut values = vec![0; cardinality as usize];
162 reader.read_exact(cast_slice_mut(&mut values))?;
163 values.iter_mut().for_each(|n| *n = u16::from_le(*n));
164 let array =
165 a(values).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
166 Store::Array(array)
167 }
168 None => {
169 let array_size = mem::size_of::<u16>() * cardinality as usize;
170 reader.seek(SeekFrom::Current(array_size as i64))?;
171 continue;
172 }
173 }
174 } else {
175 match container {
176 Some(_) => {
177 let mut values = Box::new([0; BITMAP_LENGTH]);
178 reader.read_exact(cast_slice_mut(&mut values[..]))?;
179 values.iter_mut().for_each(|n| *n = u64::from_le(*n));
180 let bitmap = b(cardinality, values)
181 .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
182 Store::Bitmap(bitmap)
183 }
184 None => {
185 let bitmap_size = mem::size_of::<u64>() * BITMAP_LENGTH;
186 reader.seek(SeekFrom::Current(bitmap_size as i64))?;
187 continue;
188 }
189 }
190 };
191
192 if let Some(container) = container {
193 let mut other_container = Container { key, store };
194 other_container &= container;
195 if !other_container.is_empty() {
196 containers.push(other_container);
197 }
198 }
199 }
200
201 Ok(RoaringBitmap { containers })
202 }
203
204 fn intersection_with_serialized_impl_with_offsets<R, A, AErr, B, BErr>(
205 &self,
206 mut reader: R,
207 a: A,
208 b: B,
209 descriptions: &[[u16; 2]],
210 offsets: &[u32],
211 run_container_bitmap: Option<&[u8]>,
212 ) -> io::Result<RoaringBitmap>
213 where
214 R: io::Read + io::Seek,
215 A: Fn(Vec<u16>) -> Result<ArrayStore, AErr>,
216 AErr: Error + Send + Sync + 'static,
217 B: Fn(u64, Box<[u64; 1024]>) -> Result<BitmapStore, BErr>,
218 BErr: Error + Send + Sync + 'static,
219 {
220 let mut containers = Vec::new();
221 for container in &self.containers {
222 let i = match descriptions.binary_search_by_key(&container.key, |[k, _]| *k) {
223 Ok(index) => index,
224 Err(_) => continue,
225 };
226
227 reader.seek(SeekFrom::Start(offsets[i] as u64))?;
229
230 let [key, len_minus_one] = descriptions[i];
231 let cardinality = u64::from(len_minus_one) + 1;
232
233 let is_run_container =
235 run_container_bitmap.as_ref().is_some_and(|bm| bm[i / 8] & (1 << (i % 8)) != 0);
236
237 let store = if is_run_container {
238 let runs = reader.read_u16::<LittleEndian>().unwrap();
239 let mut intervals = vec![[0, 0]; runs as usize];
240 reader.read_exact(cast_slice_mut(&mut intervals)).unwrap();
241 intervals.iter_mut().for_each(|[s, len]| {
242 *s = u16::from_le(*s);
243 *len = u16::from_le(*len);
244 });
245
246 let cardinality = intervals.iter().map(|[_, len]| *len as usize).sum();
247 let mut store = Store::with_capacity(cardinality);
248 intervals.into_iter().try_for_each(|[s, len]| -> Result<(), io::ErrorKind> {
249 let end = s.checked_add(len).ok_or(io::ErrorKind::InvalidData)?;
250 store.insert_range(RangeInclusive::new(s, end));
251 Ok(())
252 })?;
253 store
254 } else if cardinality <= ARRAY_LIMIT {
255 let mut values = vec![0; cardinality as usize];
256 reader.read_exact(cast_slice_mut(&mut values)).unwrap();
257 values.iter_mut().for_each(|n| *n = u16::from_le(*n));
258 let array = a(values).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
259 Store::Array(array)
260 } else {
261 let mut values = Box::new([0; BITMAP_LENGTH]);
262 reader.read_exact(cast_slice_mut(&mut values[..])).unwrap();
263 values.iter_mut().for_each(|n| *n = u64::from_le(*n));
264 let bitmap = b(cardinality, values)
265 .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
266 Store::Bitmap(bitmap)
267 };
268
269 let mut other_container = Container { key, store };
270 other_container &= container;
271 if !other_container.is_empty() {
272 containers.push(other_container);
273 }
274 }
275
276 Ok(RoaringBitmap { containers })
277 }
278}
279
280#[cfg(test)]
281mod test {
282 use crate::RoaringBitmap;
283 use proptest::prelude::*;
284 use std::io::Cursor;
285
286 proptest! {
288 #[test]
289 fn intersection_with_serialized_eq_materialized_intersection(
290 a in RoaringBitmap::arbitrary(),
291 b in RoaringBitmap::arbitrary()
292 ) {
293 let mut serialized_bytes_b = Vec::new();
294 b.serialize_into(&mut serialized_bytes_b).unwrap();
295 let serialized_bytes_b = &serialized_bytes_b[..];
296
297 prop_assert_eq!(a.intersection_with_serialized_unchecked(Cursor::new(serialized_bytes_b)).unwrap(), a & b);
298 }
299 }
300}