1use std::cmp::Ordering;
14use std::collections::VecDeque;
15use timely::Container;
16use timely::container::{ContainerBuilder, PushInto};
17use crate::Data;
18use crate::difference::{IsZero, Semigroup};
19
20#[inline]
26pub fn consolidate<T: Ord, R: Semigroup>(vec: &mut Vec<(T, R)>) {
27 consolidate_from(vec, 0);
28}
29
30#[inline]
36pub fn consolidate_from<T: Ord, R: Semigroup>(vec: &mut Vec<(T, R)>, offset: usize) {
37 let length = consolidate_slice(&mut vec[offset..]);
38 vec.truncate(offset + length);
39}
40
41#[inline]
43pub fn consolidate_slice<T: Ord, R: Semigroup>(slice: &mut [(T, R)]) -> usize {
44 if slice.len() > 1 {
45 consolidate_slice_slow(slice)
46 }
47 else {
48 slice.iter().filter(|x| !x.1.is_zero()).count()
49 }
50}
51
52fn consolidate_slice_slow<T: Ord, R: Semigroup>(slice: &mut [(T, R)]) -> usize {
54 slice.sort_by(|x,y| x.0.cmp(&y.0));
57
58 let mut offset = 0;
60 let mut accum = slice[offset].1.clone();
61
62 for index in 1 .. slice.len() {
63 if slice[index].0 == slice[index-1].0 {
64 accum.plus_equals(&slice[index].1);
65 }
66 else {
67 if !accum.is_zero() {
68 slice.swap(offset, index-1);
69 slice[offset].1.clone_from(&accum);
70 offset += 1;
71 }
72 accum.clone_from(&slice[index].1);
73 }
74 }
75 if !accum.is_zero() {
76 slice.swap(offset, slice.len()-1);
77 slice[offset].1 = accum;
78 offset += 1;
79 }
80
81 offset
82}
83
84#[inline]
90pub fn consolidate_updates<D: Ord, T: Ord, R: Semigroup>(vec: &mut Vec<(D, T, R)>) {
91 consolidate_updates_from(vec, 0);
92}
93
94#[inline]
100pub fn consolidate_updates_from<D: Ord, T: Ord, R: Semigroup>(vec: &mut Vec<(D, T, R)>, offset: usize) {
101 let length = consolidate_updates_slice(&mut vec[offset..]);
102 vec.truncate(offset + length);
103}
104
105#[inline]
107pub fn consolidate_updates_slice<D: Ord, T: Ord, R: Semigroup>(slice: &mut [(D, T, R)]) -> usize {
108
109 if slice.len() > 1 {
110 consolidate_updates_slice_slow(slice)
111 }
112 else {
113 slice.iter().filter(|x| !x.2.is_zero()).count()
114 }
115}
116
117fn consolidate_updates_slice_slow<D: Ord, T: Ord, R: Semigroup>(slice: &mut [(D, T, R)]) -> usize {
119 slice.sort_unstable_by(|x,y| (&x.0, &x.1).cmp(&(&y.0, &y.1)));
122
123 let mut offset = 0;
125 let mut accum = slice[offset].2.clone();
126
127 for index in 1 .. slice.len() {
128 if (slice[index].0 == slice[index-1].0) && (slice[index].1 == slice[index-1].1) {
129 accum.plus_equals(&slice[index].2);
130 }
131 else {
132 if !accum.is_zero() {
133 slice.swap(offset, index-1);
134 slice[offset].2.clone_from(&accum);
135 offset += 1;
136 }
137 accum.clone_from(&slice[index].2);
138 }
139 }
140 if !accum.is_zero() {
141 slice.swap(offset, slice.len()-1);
142 slice[offset].2 = accum;
143 offset += 1;
144 }
145
146 offset
147}
148
149
/// A container builder that buffers pushed items and consolidates them before
/// handing out finished containers.
///
/// The implementations in this file are provided for `C = Vec<(D, T, R)>`.
#[derive(Default)]
pub struct ConsolidatingContainerBuilder<C>{
    /// Buffer that receives pushed items until it is consolidated and flushed.
    current: C,
    /// Containers that were already handed out and are kept around for reuse.
    empty: Vec<C>,
    /// Finished containers waiting to be returned by `extract`.
    outbound: VecDeque<C>,
}
158
159impl<D,T,R> ConsolidatingContainerBuilder<Vec<(D, T, R)>>
160where
161 D: Data,
162 T: Data,
163 R: Semigroup+'static,
164{
165 #[cold]
168 fn consolidate_and_flush_through(&mut self, multiple: usize) {
169 let preferred_capacity = timely::container::buffer::default_capacity::<(D, T, R)>();
170 consolidate_updates(&mut self.current);
171 let mut drain = self.current.drain(..(self.current.len()/multiple)*multiple).peekable();
172 while drain.peek().is_some() {
173 let mut container = self.empty.pop().unwrap_or_else(|| Vec::with_capacity(preferred_capacity));
174 container.clear();
175 container.extend((&mut drain).take(preferred_capacity));
176 self.outbound.push_back(container);
177 }
178 }
179}
180
181impl<D, T, R, P> PushInto<P> for ConsolidatingContainerBuilder<Vec<(D, T, R)>>
182where
183 D: Data,
184 T: Data,
185 R: Semigroup+'static,
186 Vec<(D, T, R)>: PushInto<P>,
187{
188 #[inline]
192 fn push_into(&mut self, item: P) {
193 let preferred_capacity = timely::container::buffer::default_capacity::<(D, T, R)>();
194 if self.current.capacity() < preferred_capacity * 2 {
195 self.current.reserve(preferred_capacity * 2 - self.current.capacity());
196 }
197 self.current.push_into(item);
198 if self.current.len() == self.current.capacity() {
199 self.consolidate_and_flush_through(preferred_capacity);
201 }
202 }
203}
204
205impl<D,T,R> ContainerBuilder for ConsolidatingContainerBuilder<Vec<(D, T, R)>>
206where
207 D: Data,
208 T: Data,
209 R: Semigroup+'static,
210{
211 type Container = Vec<(D,T,R)>;
212
213 #[inline]
214 fn extract(&mut self) -> Option<&mut Vec<(D,T,R)>> {
215 if let Some(container) = self.outbound.pop_front() {
216 self.empty.push(container);
217 self.empty.last_mut()
218 } else {
219 None
220 }
221 }
222
223 #[inline]
224 fn finish(&mut self) -> Option<&mut Vec<(D,T,R)>> {
225 if !self.current.is_empty() {
226 self.consolidate_and_flush_through(1);
228 self.empty.truncate(2);
231 }
232 self.extract()
233 }
234}
235
236pub trait ConsolidateLayout: Container {
243 type Key<'a>: Eq where Self: 'a;
245
246 type Diff<'a>;
248
249 type DiffOwned: for<'a> Semigroup<Self::Diff<'a>>;
251
252 fn owned_diff(diff: Self::Diff<'_>) -> Self::DiffOwned;
254
255 fn into_parts(item: Self::Item<'_>) -> (Self::Key<'_>, Self::Diff<'_>);
257
258 fn push_with_diff(&mut self, key: Self::Key<'_>, diff: Self::DiffOwned);
268
269 fn cmp(item1: &Self::Item<'_>, item2: &Self::Item<'_>) -> Ordering;
271
272 fn consolidate_into(&mut self, target: &mut Self) {
274 let mut permutation = Vec::with_capacity(self.len());
276 permutation.extend(self.drain());
277 permutation.sort_by(|a, b| Self::cmp(a, b));
278
279 let mut iter = permutation.drain(..);
281 if let Some(item) = iter.next() {
282
283 let (k, d) = Self::into_parts(item);
284 let mut prev_key = k;
285 let mut prev_diff = Self::owned_diff(d);
286
287 for item in iter {
288 let (next_key, next_diff) = Self::into_parts(item);
289 if next_key == prev_key {
290 prev_diff.plus_equals(&next_diff);
291 }
292 else {
293 if !prev_diff.is_zero() {
294 target.push_with_diff(prev_key, prev_diff);
295 }
296 prev_key = next_key;
297 prev_diff = Self::owned_diff(next_diff);
298 }
299 }
300
301 if !prev_diff.is_zero() {
302 target.push_with_diff(prev_key, prev_diff);
303 }
304 }
305 }
306}
307
308impl<D, T, R> ConsolidateLayout for Vec<(D, T, R)>
309where
310 D: Ord + Clone + 'static,
311 T: Ord + Clone + 'static,
312 R: Semigroup + Clone + 'static,
313{
314 type Key<'a> = (D, T) where Self: 'a;
315 type Diff<'a> = R where Self: 'a;
316 type DiffOwned = R;
317
318 fn owned_diff(diff: Self::Diff<'_>) -> Self::DiffOwned { diff }
319
320 fn into_parts((data, time, diff): Self::Item<'_>) -> (Self::Key<'_>, Self::Diff<'_>) {
321 ((data, time), diff)
322 }
323
324 fn cmp<'a>(item1: &Self::Item<'_>, item2: &Self::Item<'_>) -> Ordering {
325 (&item1.0, &item1.1).cmp(&(&item2.0, &item2.1))
326 }
327
328 fn push_with_diff(&mut self, (data, time): Self::Key<'_>, diff: Self::DiffOwned) {
329 self.push((data, time, diff));
330 }
331
332 fn consolidate_into(&mut self, target: &mut Self) {
334 consolidate_updates(self);
335 std::mem::swap(self, target);
336 }
337}
338
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::{Duration, Instant};

    /// `consolidate` merges equal keys and drops keys whose diffs cancel out.
    #[test]
    fn test_consolidate() {
        let cases = vec![
            (vec![("a", -1), ("b", -2), ("a", 1)], vec![("b", -2)]),
            (vec![("a", -1), ("b", 0), ("a", 1)], vec![]),
            (vec![("a", 0)], vec![]),
            (vec![("a", 0), ("b", 0)], vec![]),
            (vec![("a", 1), ("b", 1)], vec![("a", 1), ("b", 1)]),
        ];

        for (mut actual, expected) in cases {
            consolidate(&mut actual);
            assert_eq!(actual, expected);
        }
    }

    /// `consolidate_updates` merges equal `(data, time)` pairs and drops cancelled ones.
    #[test]
    fn test_consolidate_updates() {
        let cases = vec![
            (vec![("a", 1, -1), ("b", 1, -2), ("a", 1, 1)], vec![("b", 1, -2)]),
            (vec![("a", 1, -1), ("b", 1, 0), ("a", 1, 1)], vec![]),
            (vec![("a", 1, 0)], vec![]),
            (vec![("a", 1, 0), ("b", 1, 0)], vec![]),
            (vec![("a", 1, 1), ("b", 2, 1)], vec![("a", 1, 1), ("b", 2, 1)]),
        ];

        for (mut actual, expected) in cases {
            consolidate_updates(&mut actual);
            assert_eq!(actual, expected);
        }
    }

    /// The builder emits nothing for all-zero input and every distinct update otherwise.
    #[test]
    fn test_consolidating_container_builder() {
        let mut builder = <ConsolidatingContainerBuilder<Vec<(usize, usize, usize)>>>::default();

        // Updates with a zero diff consolidate away entirely.
        for _ in 0..1024 {
            builder.push_into((0, 0, 0));
        }
        assert!(builder.extract().is_none());
        assert!(builder.finish().is_none());

        // Distinct updates with a non-zero diff must all come out again.
        for index in 0..1024 {
            builder.push_into((index, 0, 1));
        }

        let mut collected = Vec::default();
        while let Some(chunk) = builder.finish() {
            collected.append(chunk);
        }
        // Sort before comparing so the check does not depend on output order.
        collected.sort();
        let expected: Vec<(usize, usize, usize)> = (0..1024).map(|index| (index, 0, 1)).collect();
        assert_eq!(&collected[..1024], &expected[..]);
    }

    /// `consolidate_into` on a vector leaves the consolidated updates in the target.
    #[test]
    fn test_consolidate_into() {
        let mut source = vec![(1, 1, 1), (2, 1, 1), (1, 1, -1)];
        let mut sink = Vec::new();
        source.sort();
        source.consolidate_into(&mut sink);
        assert_eq!(sink, vec![(2, 1, 1)]);
    }

    // Workload sizes: small in debug builds, large otherwise.
    const LEN: usize = if cfg!(debug_assertions) { 256 << 1 } else { 256 << 10 };
    const REPS: usize = if cfg!(debug_assertions) { 10 << 1 } else { 10 << 10 };

    /// Update number `i` of the timing workload: four updates per key, diffs -2..=1.
    fn sample(i: usize) -> (usize, i32, isize) {
        (i / 4, 1, -2isize + ((i % 4) as isize))
    }

    /// Times `consolidate_into` and checks it agrees with `consolidate_updates`.
    #[test]
    fn test_consolidator_duration() {
        let mut input = Vec::with_capacity(LEN);
        let mut reference = Vec::with_capacity(LEN);
        let mut output = Vec::new();
        let mut duration = Duration::default();

        for _ in 0..REPS {
            input.clear();
            reference.clear();
            output.clear();
            input.extend((0..LEN).map(sample));
            reference.extend((0..LEN).map(sample));
            input.sort_by(|lhs, rhs| lhs.0.cmp(&rhs.0));

            let started = Instant::now();
            input.consolidate_into(&mut output);
            duration += started.elapsed();

            consolidate_updates(&mut reference);
            assert_eq!(output, reference);
        }
        println!("elapsed consolidator {duration:?}");
    }

    /// Times `consolidate_updates` on the same workload.
    #[test]
    fn test_consolidator_duration_vec() {
        let mut input = Vec::with_capacity(LEN);
        let mut duration = Duration::default();

        for _ in 0..REPS {
            input.clear();
            input.extend((0..LEN).map(sample));
            input.sort_by(|lhs, rhs| lhs.0.cmp(&rhs.0));

            let started = Instant::now();
            consolidate_updates(&mut input);
            duration += started.elapsed();
        }
        println!("elapsed vec {duration:?}");
    }
}