1use std::cmp::Ordering;
14use std::collections::VecDeque;
15use timely::container::{ContainerBuilder, DrainContainer, PushInto};
16use crate::Data;
17use crate::difference::{IsZero, Semigroup};
18
19#[inline]
25pub fn consolidate<T: Ord, R: Semigroup>(vec: &mut Vec<(T, R)>) {
26 consolidate_from(vec, 0);
27}
28
29#[inline]
35pub fn consolidate_from<T: Ord, R: Semigroup>(vec: &mut Vec<(T, R)>, offset: usize) {
36 let length = consolidate_slice(&mut vec[offset..]);
37 vec.truncate(offset + length);
38}
39
40#[inline]
42pub fn consolidate_slice<T: Ord, R: Semigroup>(slice: &mut [(T, R)]) -> usize {
43 if slice.len() > 1 {
44 consolidate_slice_slow(slice)
45 }
46 else {
47 slice.iter().filter(|x| !x.1.is_zero()).count()
48 }
49}
50
51fn consolidate_slice_slow<T: Ord, R: Semigroup>(slice: &mut [(T, R)]) -> usize {
53 slice.sort_by(|x,y| x.0.cmp(&y.0));
56
57 let mut offset = 0;
59 let mut accum = slice[offset].1.clone();
60
61 for index in 1 .. slice.len() {
62 if slice[index].0 == slice[index-1].0 {
63 accum.plus_equals(&slice[index].1);
64 }
65 else {
66 if !accum.is_zero() {
67 slice.swap(offset, index-1);
68 slice[offset].1.clone_from(&accum);
69 offset += 1;
70 }
71 accum.clone_from(&slice[index].1);
72 }
73 }
74 if !accum.is_zero() {
75 slice.swap(offset, slice.len()-1);
76 slice[offset].1 = accum;
77 offset += 1;
78 }
79
80 offset
81}
82
83#[inline]
89pub fn consolidate_updates<D: Ord, T: Ord, R: Semigroup>(vec: &mut Vec<(D, T, R)>) {
90 consolidate_updates_from(vec, 0);
91}
92
93#[inline]
99pub fn consolidate_updates_from<D: Ord, T: Ord, R: Semigroup>(vec: &mut Vec<(D, T, R)>, offset: usize) {
100 let length = consolidate_updates_slice(&mut vec[offset..]);
101 vec.truncate(offset + length);
102}
103
104#[inline]
106pub fn consolidate_updates_slice<D: Ord, T: Ord, R: Semigroup>(slice: &mut [(D, T, R)]) -> usize {
107
108 if slice.len() > 1 {
109 consolidate_updates_slice_slow(slice)
110 }
111 else {
112 slice.iter().filter(|x| !x.2.is_zero()).count()
113 }
114}
115
116fn consolidate_updates_slice_slow<D: Ord, T: Ord, R: Semigroup>(slice: &mut [(D, T, R)]) -> usize {
118 slice.sort_unstable_by(|x,y| (&x.0, &x.1).cmp(&(&y.0, &y.1)));
121
122 let mut offset = 0;
124 let mut accum = slice[offset].2.clone();
125
126 for index in 1 .. slice.len() {
127 if (slice[index].0 == slice[index-1].0) && (slice[index].1 == slice[index-1].1) {
128 accum.plus_equals(&slice[index].2);
129 }
130 else {
131 if !accum.is_zero() {
132 slice.swap(offset, index-1);
133 slice[offset].2.clone_from(&accum);
134 offset += 1;
135 }
136 accum.clone_from(&slice[index].2);
137 }
138 }
139 if !accum.is_zero() {
140 slice.swap(offset, slice.len()-1);
141 slice[offset].2 = accum;
142 offset += 1;
143 }
144
145 offset
146}
147
148
/// A container builder that consolidates pushed updates before handing out containers.
///
/// Pushed items accumulate in `current`; completed containers queue up in
/// `outbound` until they are extracted.
#[derive(Default)]
pub struct ConsolidatingContainerBuilder<C> {
    /// Staging buffer that receives pushed items and is consolidated before flushing.
    current: C,
    /// Containers that have already been handed out and are kept for reuse.
    empty: Vec<C>,
    /// Completed containers waiting to be extracted, oldest first.
    outbound: VecDeque<C>,
}
157
158impl<D,T,R> ConsolidatingContainerBuilder<Vec<(D, T, R)>>
159where
160 D: Data,
161 T: Data,
162 R: Semigroup+'static,
163{
164 #[cold]
167 fn consolidate_and_flush_through(&mut self, multiple: usize) {
168 let preferred_capacity = timely::container::buffer::default_capacity::<(D, T, R)>();
169 consolidate_updates(&mut self.current);
170 let mut drain = self.current.drain(..(self.current.len()/multiple)*multiple).peekable();
171 while drain.peek().is_some() {
172 let mut container = self.empty.pop().unwrap_or_else(|| Vec::with_capacity(preferred_capacity));
173 container.clear();
174 container.extend((&mut drain).take(preferred_capacity));
175 self.outbound.push_back(container);
176 }
177 }
178}
179
180impl<D, T, R, P> PushInto<P> for ConsolidatingContainerBuilder<Vec<(D, T, R)>>
181where
182 D: Data,
183 T: Data,
184 R: Semigroup+'static,
185 Vec<(D, T, R)>: PushInto<P>,
186{
187 #[inline]
191 fn push_into(&mut self, item: P) {
192 let preferred_capacity = timely::container::buffer::default_capacity::<(D, T, R)>();
193 if self.current.capacity() < preferred_capacity * 2 {
194 self.current.reserve(preferred_capacity * 2 - self.current.capacity());
195 }
196 self.current.push_into(item);
197 if self.current.len() == self.current.capacity() {
198 self.consolidate_and_flush_through(preferred_capacity);
200 }
201 }
202}
203
204impl<D,T,R> ContainerBuilder for ConsolidatingContainerBuilder<Vec<(D, T, R)>>
205where
206 D: Data,
207 T: Data,
208 R: Semigroup+'static,
209{
210 type Container = Vec<(D,T,R)>;
211
212 #[inline]
213 fn extract(&mut self) -> Option<&mut Vec<(D,T,R)>> {
214 if let Some(container) = self.outbound.pop_front() {
215 self.empty.push(container);
216 self.empty.last_mut()
217 } else {
218 None
219 }
220 }
221
222 #[inline]
223 fn finish(&mut self) -> Option<&mut Vec<(D,T,R)>> {
224 if !self.current.is_empty() {
225 self.consolidate_and_flush_through(1);
227 self.empty.truncate(2);
230 }
231 self.extract()
232 }
233}
234
235pub trait ConsolidateLayout: DrainContainer {
242 type Key<'a>: Eq where Self: 'a;
244
245 type Diff<'a>;
247
248 type DiffOwned: for<'a> Semigroup<Self::Diff<'a>>;
250
251 fn owned_diff(diff: Self::Diff<'_>) -> Self::DiffOwned;
253
254 fn into_parts(item: Self::Item<'_>) -> (Self::Key<'_>, Self::Diff<'_>);
256
257 fn push_with_diff(&mut self, key: Self::Key<'_>, diff: Self::DiffOwned);
267
268 fn cmp(item1: &Self::Item<'_>, item2: &Self::Item<'_>) -> Ordering;
270
271 fn len(&self) -> usize;
273
274 fn clear(&mut self);
276
277 fn consolidate_into(&mut self, target: &mut Self) {
279 let mut permutation = Vec::with_capacity(self.len());
281 permutation.extend(self.drain());
282 permutation.sort_by(|a, b| Self::cmp(a, b));
283
284 let mut iter = permutation.drain(..);
286 if let Some(item) = iter.next() {
287
288 let (k, d) = Self::into_parts(item);
289 let mut prev_key = k;
290 let mut prev_diff = Self::owned_diff(d);
291
292 for item in iter {
293 let (next_key, next_diff) = Self::into_parts(item);
294 if next_key == prev_key {
295 prev_diff.plus_equals(&next_diff);
296 }
297 else {
298 if !prev_diff.is_zero() {
299 target.push_with_diff(prev_key, prev_diff);
300 }
301 prev_key = next_key;
302 prev_diff = Self::owned_diff(next_diff);
303 }
304 }
305
306 if !prev_diff.is_zero() {
307 target.push_with_diff(prev_key, prev_diff);
308 }
309 }
310 }
311}
312
313impl<D, T, R> ConsolidateLayout for Vec<(D, T, R)>
314where
315 D: Ord + Clone + 'static,
316 T: Ord + Clone + 'static,
317 R: Semigroup + Clone + 'static,
318{
319 type Key<'a> = (D, T) where Self: 'a;
320 type Diff<'a> = R where Self: 'a;
321 type DiffOwned = R;
322
323 fn owned_diff(diff: Self::Diff<'_>) -> Self::DiffOwned { diff }
324
325 fn into_parts((data, time, diff): Self::Item<'_>) -> (Self::Key<'_>, Self::Diff<'_>) {
326 ((data, time), diff)
327 }
328
329 fn cmp<'a>(item1: &Self::Item<'_>, item2: &Self::Item<'_>) -> Ordering {
330 (&item1.0, &item1.1).cmp(&(&item2.0, &item2.1))
331 }
332
333 fn push_with_diff(&mut self, (data, time): Self::Key<'_>, diff: Self::DiffOwned) {
334 self.push((data, time, diff));
335 }
336
337 #[inline] fn len(&self) -> usize { Vec::len(self) }
338 #[inline] fn clear(&mut self) { Vec::clear(self) }
339
340 fn consolidate_into(&mut self, target: &mut Self) {
342 consolidate_updates(self);
343 std::mem::swap(self, target);
344 }
345}
346
#[cfg(test)]
mod tests {
    use super::*;

    /// `consolidate` sums diffs per key, drops zero sums, and leaves keys in order.
    #[test]
    fn test_consolidate() {
        let cases = vec![
            // Opposite diffs for "a" cancel, leaving only "b".
            (vec![("a", -1), ("b", -2), ("a", 1)], vec![("b", -2)]),
            // "a" cancels and "b" is zero to begin with.
            (vec![("a", -1), ("b", 0), ("a", 1)], vec![]),
            // A single zero entry is dropped.
            (vec![("a", 0)], vec![]),
            // Several zero entries are all dropped.
            (vec![("a", 0), ("b", 0)], vec![]),
            // Already-consolidated input is unchanged.
            (vec![("a", 1), ("b", 1)], vec![("a", 1), ("b", 1)]),
        ];

        for (mut actual, expected) in cases {
            consolidate(&mut actual);
            assert_eq!(actual, expected);
        }
    }

    /// `consolidate_updates` behaves like `consolidate`, keyed on `(data, time)`.
    #[test]
    fn test_consolidate_updates() {
        let cases = vec![
            // Opposite diffs for ("a", 1) cancel, leaving only "b".
            (vec![("a", 1, -1), ("b", 1, -2), ("a", 1, 1)], vec![("b", 1, -2)]),
            // "a" cancels and "b" is zero to begin with.
            (vec![("a", 1, -1), ("b", 1, 0), ("a", 1, 1)], vec![]),
            // A single zero update is dropped.
            (vec![("a", 1, 0)], vec![]),
            // Several zero updates are all dropped.
            (vec![("a", 1, 0), ("b", 1, 0)], vec![]),
            // Already-consolidated input is unchanged.
            (vec![("a", 1, 1), ("b", 2, 1)], vec![("a", 1, 1), ("b", 2, 1)]),
        ];

        for (mut actual, expected) in cases {
            consolidate_updates(&mut actual);
            assert_eq!(actual, expected);
        }
    }

    /// The builder emits nothing for zero diffs and every update otherwise.
    #[test]
    fn test_consolidating_container_builder() {
        let mut builder = ConsolidatingContainerBuilder::<Vec<(usize, usize, usize)>>::default();

        // Zero diffs consolidate away entirely, so no container is ever produced.
        for _ in 0..1024 {
            builder.push_into((0, 0, 0));
        }
        assert!(builder.extract().is_none());
        assert!(builder.finish().is_none());

        // Distinct keys with non-zero diffs must all come back out.
        for i in 0..1024 {
            builder.push_into((i, 0, 1));
        }

        let mut collected = Vec::new();
        while let Some(container) = builder.finish() {
            collected.extend(container.drain(..));
        }
        collected.sort();

        for index in 0..1024 {
            assert_eq!((index, 0, 1), collected[index]);
        }
    }

    /// `consolidate_into` leaves the consolidated updates in the target.
    #[test]
    fn test_consolidate_into() {
        let mut source = vec![(1, 1, 1), (2, 1, 1), (1, 1, -1)];
        source.sort();

        let mut target = Vec::default();
        source.consolidate_into(&mut target);

        assert_eq!(target, [(2, 1, 1)]);
    }

    /// Number of updates per repetition; kept small in debug builds.
    const LEN: usize = if cfg!(debug_assertions) { 256 << 1 } else { 256 << 10 };
    /// Number of timed repetitions; kept small in debug builds.
    const REPS: usize = if cfg!(debug_assertions) { 10 << 1 } else { 10 << 10 };

    /// Yields `LEN` updates where each run of four shares a key and carries diffs -2, -1, 0, 1.
    fn sample_updates() -> impl Iterator<Item = (usize, i32, isize)> {
        (0..LEN).map(|i| (i / 4, 1, -2isize + ((i % 4) as isize)))
    }

    /// Times `consolidate_into` and checks it agrees with `consolidate_updates`.
    #[test]
    fn test_consolidator_duration() {
        let mut via_layout = Vec::with_capacity(LEN);
        let mut via_updates = Vec::with_capacity(LEN);
        let mut target = Vec::new();
        let mut duration = std::time::Duration::default();

        for _ in 0..REPS {
            via_layout.clear();
            via_updates.clear();
            target.clear();
            via_layout.extend(sample_updates());
            via_updates.extend(sample_updates());
            via_layout.sort_by(|x, y| x.0.cmp(&y.0));

            // Only the consolidation itself is timed.
            let start = std::time::Instant::now();
            via_layout.consolidate_into(&mut target);
            duration += start.elapsed();

            consolidate_updates(&mut via_updates);
            assert_eq!(target, via_updates);
        }
        println!("elapsed consolidator {duration:?}");
    }

    /// Times `consolidate_updates` on the same workload for comparison.
    #[test]
    fn test_consolidator_duration_vec() {
        let mut updates = Vec::with_capacity(LEN);
        let mut duration = std::time::Duration::default();

        for _ in 0..REPS {
            updates.clear();
            updates.extend(sample_updates());
            updates.sort_by(|x, y| x.0.cmp(&y.0));

            // Only the consolidation itself is timed.
            let start = std::time::Instant::now();
            consolidate_updates(&mut updates);
            duration += start.elapsed();
        }
        println!("elapsed vec {duration:?}");
    }
}