1use std::cmp::Ordering;
14use std::collections::VecDeque;
15use timely::Container;
16use timely::container::{ContainerBuilder, PushInto};
17use crate::{IntoOwned, Data};
18use crate::difference::{IsZero, Semigroup};
19
20pub fn consolidate<T: Ord, R: Semigroup>(vec: &mut Vec<(T, R)>) {
26 consolidate_from(vec, 0);
27}
28
29pub fn consolidate_from<T: Ord, R: Semigroup>(vec: &mut Vec<(T, R)>, offset: usize) {
35 let length = consolidate_slice(&mut vec[offset..]);
36 vec.truncate(offset + length);
37}
38
39pub fn consolidate_slice<T: Ord, R: Semigroup>(slice: &mut [(T, R)]) -> usize {
41
42 if slice.len() > 1 {
43
44 slice.sort_by(|x,y| x.0.cmp(&y.0));
47
48 let mut offset = 0;
50 let mut accum = slice[offset].1.clone();
51
52 for index in 1 .. slice.len() {
53 if slice[index].0 == slice[index-1].0 {
54 accum.plus_equals(&slice[index].1);
55 }
56 else {
57 if !accum.is_zero() {
58 slice.swap(offset, index-1);
59 slice[offset].1.clone_from(&accum);
60 offset += 1;
61 }
62 accum.clone_from(&slice[index].1);
63 }
64 }
65 if !accum.is_zero() {
66 slice.swap(offset, slice.len()-1);
67 slice[offset].1 = accum;
68 offset += 1;
69 }
70
71 offset
72 }
73 else {
74 slice.iter().filter(|x| !x.1.is_zero()).count()
75 }
76}
77
78pub fn consolidate_updates<D: Ord, T: Ord, R: Semigroup>(vec: &mut Vec<(D, T, R)>) {
84 consolidate_updates_from(vec, 0);
85}
86
87pub fn consolidate_updates_from<D: Ord, T: Ord, R: Semigroup>(vec: &mut Vec<(D, T, R)>, offset: usize) {
93 let length = consolidate_updates_slice(&mut vec[offset..]);
94 vec.truncate(offset + length);
95}
96
97pub fn consolidate_updates_slice<D: Ord, T: Ord, R: Semigroup>(slice: &mut [(D, T, R)]) -> usize {
99
100 if slice.len() > 1 {
101
102 slice.sort_unstable_by(|x,y| (&x.0, &x.1).cmp(&(&y.0, &y.1)));
105
106 let mut offset = 0;
108 let mut accum = slice[offset].2.clone();
109
110 for index in 1 .. slice.len() {
111 if (slice[index].0 == slice[index-1].0) && (slice[index].1 == slice[index-1].1) {
112 accum.plus_equals(&slice[index].2);
113 }
114 else {
115 if !accum.is_zero() {
116 slice.swap(offset, index-1);
117 slice[offset].2.clone_from(&accum);
118 offset += 1;
119 }
120 accum.clone_from(&slice[index].2);
121 }
122 }
123 if !accum.is_zero() {
124 slice.swap(offset, slice.len()-1);
125 slice[offset].2 = accum;
126 offset += 1;
127 }
128
129 offset
130 }
131 else {
132 slice.iter().filter(|x| !x.2.is_zero()).count()
133 }
134}
135
136
/// A container builder that consolidates updates before handing them out.
///
/// Pushed items accumulate in `current`. When that buffer fills up (or the
/// builder is finished) its contents are consolidated and moved, in chunks,
/// into `outbound`, from where they are extracted one container at a time.
#[derive(Default)]
pub struct ConsolidatingContainerBuilder<C> {
    /// Buffer receiving newly pushed items; not yet consolidated.
    current: C,
    /// Stash of containers kept around for reuse. The container most recently
    /// handed out by `extract` sits at the end of this list.
    empty: Vec<C>,
    /// Consolidated containers that are ready to be extracted, oldest first.
    outbound: VecDeque<C>,
}
145
146impl<D,T,R> ConsolidatingContainerBuilder<Vec<(D, T, R)>>
147where
148 D: Data,
149 T: Data,
150 R: Semigroup+'static,
151{
152 #[cold]
155 fn consolidate_and_flush_through(&mut self, multiple: usize) {
156 let preferred_capacity = timely::container::buffer::default_capacity::<(D, T, R)>();
157 consolidate_updates(&mut self.current);
158 let mut drain = self.current.drain(..(self.current.len()/multiple)*multiple).peekable();
159 while drain.peek().is_some() {
160 let mut container = self.empty.pop().unwrap_or_else(|| Vec::with_capacity(preferred_capacity));
161 container.clear();
162 container.extend((&mut drain).take(preferred_capacity));
163 self.outbound.push_back(container);
164 }
165 }
166}
167
168impl<D, T, R, P> PushInto<P> for ConsolidatingContainerBuilder<Vec<(D, T, R)>>
169where
170 D: Data,
171 T: Data,
172 R: Semigroup+'static,
173 Vec<(D, T, R)>: PushInto<P>,
174{
175 #[inline]
179 fn push_into(&mut self, item: P) {
180 let preferred_capacity = timely::container::buffer::default_capacity::<(D, T, R)>();
181 if self.current.capacity() < preferred_capacity * 2 {
182 self.current.reserve(preferred_capacity * 2 - self.current.capacity());
183 }
184 self.current.push_into(item);
185 if self.current.len() == self.current.capacity() {
186 self.consolidate_and_flush_through(preferred_capacity);
188 }
189 }
190}
191
192impl<D,T,R> ContainerBuilder for ConsolidatingContainerBuilder<Vec<(D, T, R)>>
193where
194 D: Data,
195 T: Data,
196 R: Semigroup+'static,
197{
198 type Container = Vec<(D,T,R)>;
199
200 #[inline]
201 fn extract(&mut self) -> Option<&mut Vec<(D,T,R)>> {
202 if let Some(container) = self.outbound.pop_front() {
203 self.empty.push(container);
204 self.empty.last_mut()
205 } else {
206 None
207 }
208 }
209
210 #[inline]
211 fn finish(&mut self) -> Option<&mut Vec<(D,T,R)>> {
212 if !self.current.is_empty() {
213 self.consolidate_and_flush_through(1);
215 self.empty.truncate(2);
218 }
219 self.extract()
220 }
221}
222
223pub trait ConsolidateLayout: Container {
230 type Key<'a>: Eq where Self: 'a;
232
233 type Diff<'a>: IntoOwned<'a, Owned = Self::DiffOwned> where Self: 'a;
235
236 type DiffOwned: for<'a> Semigroup<Self::Diff<'a>>;
238
239 fn into_parts(item: Self::Item<'_>) -> (Self::Key<'_>, Self::Diff<'_>);
241
242 fn push_with_diff(&mut self, key: Self::Key<'_>, diff: Self::DiffOwned);
252
253 fn cmp(item1: &Self::Item<'_>, item2: &Self::Item<'_>) -> Ordering;
255
256 fn consolidate_into(&mut self, target: &mut Self) {
258 let mut permutation = Vec::with_capacity(self.len());
260 permutation.extend(self.drain());
261 permutation.sort_by(|a, b| Self::cmp(a, b));
262
263 let mut iter = permutation.drain(..);
265 if let Some(item) = iter.next() {
266
267 let (k, d) = Self::into_parts(item);
268 let mut prev_key = k;
269 let mut prev_diff = d.into_owned();
270
271 for item in iter {
272 let (next_key, next_diff) = Self::into_parts(item);
273 if next_key == prev_key {
274 prev_diff.plus_equals(&next_diff);
275 }
276 else {
277 if !prev_diff.is_zero() {
278 target.push_with_diff(prev_key, prev_diff);
279 }
280 prev_key = next_key;
281 prev_diff = next_diff.into_owned();
282 }
283 }
284
285 if !prev_diff.is_zero() {
286 target.push_with_diff(prev_key, prev_diff);
287 }
288 }
289 }
290}
291
292impl<D, T, R> ConsolidateLayout for Vec<(D, T, R)>
293where
294 D: Ord + Clone + 'static,
295 T: Ord + Clone + 'static,
296 for<'a> R: Semigroup + IntoOwned<'a, Owned = R> + Clone + 'static,
297{
298 type Key<'a> = (D, T) where Self: 'a;
299 type Diff<'a> = R where Self: 'a;
300 type DiffOwned = R;
301
302 fn into_parts((data, time, diff): Self::Item<'_>) -> (Self::Key<'_>, Self::Diff<'_>) {
303 ((data, time), diff)
304 }
305
306 fn cmp<'a>(item1: &Self::Item<'_>, item2: &Self::Item<'_>) -> Ordering {
307 (&item1.0, &item1.1).cmp(&(&item2.0, &item2.1))
308 }
309
310 fn push_with_diff(&mut self, (data, time): Self::Key<'_>, diff: Self::DiffOwned) {
311 self.push((data, time, diff));
312 }
313
314 fn consolidate_into(&mut self, target: &mut Self) {
316 consolidate_updates(self);
317 std::mem::swap(self, target);
318 }
319}
320
#[cfg(test)]
mod tests {
    use super::*;

    /// `(key, diff)` pairs as used by the `consolidate` test.
    type Pairs = Vec<(&'static str, i32)>;
    /// `(data, time, diff)` triples as used by the `consolidate_updates` test.
    type Triples = Vec<(&'static str, i32, i32)>;

    #[test]
    fn test_consolidate() {
        // Each scenario is an input and the contents expected after consolidation.
        let scenarios: Vec<(Pairs, Pairs)> = vec![
            // The two "a" entries cancel; only "b" survives.
            (vec![("a", -1), ("b", -2), ("a", 1)], vec![("b", -2)]),
            // "a" cancels and "b" is zero to begin with.
            (vec![("a", -1), ("b", 0), ("a", 1)], vec![]),
            // A single zero entry is removed.
            (vec![("a", 0)], vec![]),
            // Several zero entries are all removed.
            (vec![("a", 0), ("b", 0)], vec![]),
            // Distinct non-zero entries are kept unchanged.
            (vec![("a", 1), ("b", 1)], vec![("a", 1), ("b", 1)]),
        ];

        for (input, expected) in scenarios {
            let mut actual = input;
            consolidate(&mut actual);
            assert_eq!(actual, expected);
        }
    }

    #[test]
    fn test_consolidate_updates() {
        // Each scenario is an input and the contents expected after consolidation.
        let scenarios: Vec<(Triples, Triples)> = vec![
            // The two ("a", 1) updates cancel; only "b" survives.
            (vec![("a", 1, -1), ("b", 1, -2), ("a", 1, 1)], vec![("b", 1, -2)]),
            // "a" cancels and "b" is zero to begin with.
            (vec![("a", 1, -1), ("b", 1, 0), ("a", 1, 1)], vec![]),
            // A single zero update is removed.
            (vec![("a", 1, 0)], vec![]),
            // Several zero updates are all removed.
            (vec![("a", 1, 0), ("b", 1, 0)], vec![]),
            // Distinct non-zero updates are kept unchanged.
            (vec![("a", 1, 1), ("b", 2, 1)], vec![("a", 1, 1), ("b", 2, 1)]),
        ];

        for (input, expected) in scenarios {
            let mut actual = input;
            consolidate_updates(&mut actual);
            assert_eq!(actual, expected);
        }
    }

    #[test]
    fn test_consolidating_container_builder() {
        const COUNT: usize = 1024;

        let mut builder = ConsolidatingContainerBuilder::<Vec<(usize, usize, usize)>>::default();

        // Updates with a zero diff must vanish entirely.
        for _ in 0..COUNT {
            builder.push_into((0, 0, 0));
        }
        assert_eq!(builder.extract(), None);
        assert_eq!(builder.finish(), None);

        // Distinct non-zero updates must all come back out.
        for i in 0..COUNT {
            builder.push_into((i, 0, 1));
        }

        let mut collected = Vec::default();
        while let Some(container) = builder.finish() {
            collected.extend(container.drain(..));
        }
        // Sort before checking so the test does not depend on output order.
        collected.sort();
        for (i, update) in collected[..COUNT].iter().enumerate() {
            assert_eq!((i, 0, 1), *update);
        }
    }

    #[test]
    fn test_consolidate_into() {
        let mut source = vec![(1, 1, 1), (2, 1, 1), (1, 1, -1)];
        let mut target = Vec::default();
        source.sort();
        source.consolidate_into(&mut target);
        assert_eq!(target, [(2, 1, 1)]);
    }

    /// Number of updates per repetition; much smaller in debug builds.
    const LEN: usize = if cfg!(debug_assertions) { 256 << 1 } else { 256 << 10 };
    /// Number of repetitions; much smaller in debug builds.
    const REPS: usize = if cfg!(debug_assertions) { 10 << 1 } else { 10 << 10 };

    /// Yields `LEN` updates where every group of four shares a key and has
    /// diffs -2, -1, 0 and 1.
    fn sample_updates() -> impl Iterator<Item = (usize, i32, isize)> {
        (0..LEN).map(|i| (i / 4, 1, -2isize + ((i % 4) as isize)))
    }

    #[test]
    fn test_consolidator_duration() {
        let mut input = Vec::with_capacity(LEN);
        let mut reference = Vec::with_capacity(LEN);
        let mut target = Vec::new();
        let mut duration = std::time::Duration::default();

        for _ in 0..REPS {
            input.clear();
            reference.clear();
            target.clear();
            input.extend(sample_updates());
            reference.extend(sample_updates());
            input.sort_by_key(|update| update.0);

            // Only the `consolidate_into` call is timed.
            let start = std::time::Instant::now();
            input.consolidate_into(&mut target);
            duration += start.elapsed();

            // The result must agree with plain `consolidate_updates`.
            consolidate_updates(&mut reference);
            assert_eq!(target, reference);
        }
        println!("elapsed consolidator {duration:?}");
    }

    #[test]
    fn test_consolidator_duration_vec() {
        let mut input = Vec::with_capacity(LEN);
        let mut duration = std::time::Duration::default();

        for _ in 0..REPS {
            input.clear();
            input.extend(sample_updates());
            input.sort_by_key(|update| update.0);

            // Only the `consolidate_updates` call is timed.
            let start = std::time::Instant::now();
            consolidate_updates(&mut input);
            duration += start.elapsed();
        }
        println!("elapsed vec {duration:?}");
    }
}