use std::cmp::Ordering;
use std::collections::VecDeque;
use timely::Container;
use timely::container::{ContainerBuilder, PushInto};
use timely::container::flatcontainer::{FlatStack, Push, Region};
use timely::container::flatcontainer::impls::tuple::{TupleABCRegion, TupleABRegion};
use crate::Data;
use crate::difference::{IsZero, Semigroup};
use crate::trace::cursor::IntoOwned;
pub fn consolidate<T: Ord, R: Semigroup>(vec: &mut Vec<(T, R)>) {
consolidate_from(vec, 0);
}
/// Consolidates only the tail `vec[offset..]`, leaving `vec[..offset]` untouched.
///
/// The tail is sorted by key, equal keys are merged, zero diffs are dropped,
/// and the vector is shortened so that it ends right after the last
/// surviving entry.
///
/// # Panics
///
/// Panics if `offset > vec.len()` (the slice index is out of range).
pub fn consolidate_from<T: Ord, R: Semigroup>(vec: &mut Vec<(T, R)>, offset: usize) {
    let tail = &mut vec[offset..];
    let retained = consolidate_slice(tail);
    vec.truncate(offset + retained);
}
/// Sorts `slice` by key and compacts it in place, returning the number of
/// valid entries at the front.
///
/// After the call, `slice[..n]` (with `n` the return value) holds one entry
/// per distinct key whose accumulated diff is non-zero, in ascending key
/// order. The contents of `slice[n..]` are unspecified leftovers.
///
/// For slices of length zero or one nothing is moved; the return value is
/// simply the count of entries with a non-zero diff.
pub fn consolidate_slice<T: Ord, R: Semigroup>(slice: &mut [(T, R)]) -> usize {
    // Nothing to sort or merge: just report whether the lone entry survives.
    if slice.len() <= 1 {
        return slice.iter().filter(|entry| !entry.1.is_zero()).count();
    }

    slice.sort_by(|lhs, rhs| lhs.0.cmp(&rhs.0));

    let last = slice.len() - 1;
    // `write` is both the count of emitted entries and the next output slot.
    let mut write = 0;
    // Running sum of the diffs in the current run of equal keys.
    let mut total = slice[0].1.clone();

    for read in 1..=last {
        if slice[read].0 == slice[read - 1].0 {
            total.plus_equals(&slice[read].1);
            continue;
        }

        // The key changed: emit the finished run if it did not cancel out.
        // `write <= read - 1`, so the swap never disturbs unread entries.
        if !total.is_zero() {
            slice.swap(write, read - 1);
            slice[write].1.clone_from(&total);
            write += 1;
        }
        total.clone_from(&slice[read].1);
    }

    // Emit the final run.
    if !total.is_zero() {
        slice.swap(write, last);
        slice[write].1 = total;
        write += 1;
    }

    write
}
pub fn consolidate_updates<D: Ord, T: Ord, R: Semigroup>(vec: &mut Vec<(D, T, R)>) {
consolidate_updates_from(vec, 0);
}
/// Consolidates only the tail `vec[offset..]` of a vector of
/// `(data, time, diff)` updates, leaving `vec[..offset]` untouched.
///
/// # Panics
///
/// Panics if `offset > vec.len()` (the slice index is out of range).
pub fn consolidate_updates_from<D: Ord, T: Ord, R: Semigroup>(vec: &mut Vec<(D, T, R)>, offset: usize) {
    let tail = &mut vec[offset..];
    let retained = consolidate_updates_slice(tail);
    vec.truncate(offset + retained);
}
/// Sorts `slice` by `(data, time)` and compacts it in place, returning the
/// number of valid updates at the front.
///
/// After the call, `slice[..n]` (with `n` the return value) holds one update
/// per distinct `(data, time)` pair whose accumulated diff is non-zero, in
/// ascending order. The contents of `slice[n..]` are unspecified leftovers.
///
/// For slices of length zero or one nothing is moved; the return value is
/// simply the count of updates with a non-zero diff.
pub fn consolidate_updates_slice<D: Ord, T: Ord, R: Semigroup>(slice: &mut [(D, T, R)]) -> usize {
    // Nothing to sort or merge: just report whether the lone update survives.
    if slice.len() <= 1 {
        return slice.iter().filter(|update| !update.2.is_zero()).count();
    }

    // Lexicographic order on (data, time); stability is not required here.
    slice.sort_unstable_by(|lhs, rhs| lhs.0.cmp(&rhs.0).then_with(|| lhs.1.cmp(&rhs.1)));

    let last = slice.len() - 1;
    // `write` is both the count of emitted updates and the next output slot.
    let mut write = 0;
    // Running sum of the diffs in the current run of equal (data, time).
    let mut total = slice[0].2.clone();

    for read in 1..=last {
        if (slice[read].0 == slice[read - 1].0) && (slice[read].1 == slice[read - 1].1) {
            total.plus_equals(&slice[read].2);
            continue;
        }

        // The (data, time) pair changed: emit the finished run unless it
        // cancelled out. `write <= read - 1`, so unread updates stay put.
        if !total.is_zero() {
            slice.swap(write, read - 1);
            slice[write].2.clone_from(&total);
            write += 1;
        }
        total.clone_from(&slice[read].2);
    }

    // Emit the final run.
    if !total.is_zero() {
        slice.swap(write, last);
        slice[write].2 = total;
        write += 1;
    }

    write
}
/// A container builder that buffers pushed updates, consolidates them, and
/// hands them out as containers.
///
/// The `PushInto` and `ContainerBuilder` implementations in this file are
/// provided for `C = Vec<(D, T, R)>`.
#[derive(Default)]
pub struct ConsolidatingContainerBuilder<C>{
/// Buffer that receives pushed items; it is consolidated before being
/// drained into outbound containers.
current: C,
/// Stash of containers available for reuse: flushing pops from here, and
/// `extract` parks the container it hands out here.
empty: Vec<C>,
/// Queue of filled containers waiting to be returned by `extract`, oldest
/// first.
outbound: VecDeque<C>,
}
impl<D, T, R> ConsolidatingContainerBuilder<Vec<(D, T, R)>>
where
    D: Data,
    T: Data,
    R: Semigroup + 'static,
{
    /// Consolidates `self.current` and moves a prefix of it into `self.outbound`.
    ///
    /// The prefix length is the largest multiple of `multiple` that does not
    /// exceed the consolidated length, so `multiple == 1` flushes everything.
    /// The prefix is split into containers of at most the default capacity,
    /// reusing containers from `self.empty` where available.
    ///
    /// `multiple` must be non-zero (it is used as a divisor).
    #[cold]
    fn consolidate_and_flush_through(&mut self, multiple: usize) {
        let chunk_len = timely::container::buffer::default_capacity::<(D, T, R)>();
        consolidate_updates(&mut self.current);

        // Round the consolidated length down to a whole number of `multiple`s.
        let flush_len = (self.current.len() / multiple) * multiple;
        let mut pending = self.current.drain(..flush_len).peekable();

        while pending.peek().is_some() {
            let mut chunk = match self.empty.pop() {
                Some(recycled) => recycled,
                None => Vec::with_capacity(chunk_len),
            };
            // A recycled container may still hold stale elements.
            chunk.clear();
            chunk.extend(pending.by_ref().take(chunk_len));
            self.outbound.push_back(chunk);
        }
    }
}
impl<D, T, R, P> PushInto<P> for ConsolidatingContainerBuilder<Vec<(D, T, R)>>
where
    D: Data,
    T: Data,
    R: Semigroup + 'static,
    Vec<(D, T, R)>: PushInto<P>,
{
    /// Pushes `item` into the internal buffer.
    ///
    /// The buffer is sized to hold twice the default container capacity. Once
    /// it is full, it is consolidated and every complete container's worth of
    /// updates is moved to the outbound queue.
    #[inline]
    fn push_into(&mut self, item: P) {
        let preferred_capacity = timely::container::buffer::default_capacity::<(D, T, R)>();
        let target_capacity = preferred_capacity * 2;
        if self.current.capacity() < target_capacity {
            // `Vec::reserve` guarantees `capacity >= len + additional`, so the
            // shortfall must be measured from `len()`, not from `capacity()`.
            // `len <= capacity < target_capacity`, so this cannot underflow.
            self.current.reserve(target_capacity - self.current.len());
        }
        self.current.push_into(item);
        if self.current.len() == self.current.capacity() {
            // Flush complete containers; a partial remainder stays buffered.
            self.consolidate_and_flush_through(preferred_capacity);
        }
    }
}
impl<D, T, R> ContainerBuilder for ConsolidatingContainerBuilder<Vec<(D, T, R)>>
where
    D: Data,
    T: Data,
    R: Semigroup + 'static,
{
    type Container = Vec<(D,T,R)>;

    /// Returns the oldest ready container, or `None` if none is queued.
    ///
    /// The returned container is parked in `self.empty` so that it can be
    /// reused by a later flush.
    #[inline]
    fn extract(&mut self) -> Option<&mut Vec<(D,T,R)>> {
        let ready = self.outbound.pop_front()?;
        self.empty.push(ready);
        self.empty.last_mut()
    }

    /// Flushes any buffered updates and returns the next ready container.
    ///
    /// If the buffer is non-empty, all of it is consolidated and queued, and
    /// the stash of reusable containers is cut down to at most two.
    #[inline]
    fn finish(&mut self) -> Option<&mut Vec<(D,T,R)>> {
        let has_buffered = !self.current.is_empty();
        if has_buffered {
            // A multiple of one flushes every remaining update.
            self.consolidate_and_flush_through(1);
            self.empty.truncate(2);
        }
        self.extract()
    }
}
/// Describes how a container's items split into a key and a diff, so that
/// items with equal keys can be merged by summing their diffs.
pub trait ConsolidateLayout: Container {
    /// Key portion of an item; items whose keys compare equal are merged.
    type Key<'a>: Eq where Self: 'a;

    /// Diff portion of an item, convertible into `Self::DiffOwned`.
    type Diff<'a>: IntoOwned<'a, Owned = Self::DiffOwned> where Self: 'a;

    /// Owned accumulator to which diffs of type `Self::Diff` can be added.
    type DiffOwned: for<'a> Semigroup<Self::Diff<'a>>;

    /// Splits an item into its key and its diff.
    fn into_parts(item: Self::Item<'_>) -> (Self::Key<'_>, Self::Diff<'_>);

    /// Appends an element built from `key` and `diff` to this container.
    fn push_with_diff(&mut self, key: Self::Key<'_>, diff: Self::DiffOwned);

    /// Orders two items; used to sort before merging.
    fn cmp(item1: &Self::Item<'_>, item2: &Self::Item<'_>) -> Ordering;

    /// Drains `self`, sorts the items with `Self::cmp`, sums the diffs of
    /// consecutive items with equal keys, and pushes every key whose
    /// accumulated diff is non-zero into `target`.
    fn consolidate_into(&mut self, target: &mut Self) {
        let mut sorted = Vec::with_capacity(self.len());
        sorted.extend(self.drain());
        sorted.sort_by(|a, b| Self::cmp(a, b));

        let mut items = sorted.drain(..);
        let Some(first) = items.next() else {
            // Nothing was drained, so there is nothing to emit.
            return;
        };

        // State for the run of equal keys currently being accumulated.
        let (mut run_key, first_diff) = Self::into_parts(first);
        let mut run_diff = first_diff.into_owned();

        for item in items {
            let (key, diff) = Self::into_parts(item);
            if key == run_key {
                run_diff.plus_equals(&diff);
            } else {
                if !run_diff.is_zero() {
                    target.push_with_diff(run_key, run_diff);
                }
                run_key = key;
                run_diff = diff.into_owned();
            }
        }

        // Emit the final run.
        if !run_diff.is_zero() {
            target.push_with_diff(run_key, run_diff);
        }
    }
}
impl<D, T, R> ConsolidateLayout for Vec<(D, T, R)>
where
    D: Ord + Clone + 'static,
    T: Ord + Clone + 'static,
    for<'a> R: Semigroup + IntoOwned<'a, Owned = R> + Clone + 'static,
{
    type Key<'a> = (D, T) where Self: 'a;
    type Diff<'a> = R where Self: 'a;
    type DiffOwned = R;

    /// Splits `(data, time, diff)` into the key `(data, time)` and the diff.
    fn into_parts((data, time, diff): Self::Item<'_>) -> (Self::Key<'_>, Self::Diff<'_>) {
        ((data, time), diff)
    }

    /// Orders updates by `(data, time)`, ignoring the diff.
    fn cmp<'a>(item1: &Self::Item<'_>, item2: &Self::Item<'_>) -> Ordering {
        (&item1.0, &item1.1).cmp(&(&item2.0, &item2.1))
    }

    /// Appends `(data, time, diff)` to the vector.
    fn push_with_diff(&mut self, (data, time): Self::Key<'_>, diff: Self::DiffOwned) {
        self.push((data, time, diff));
    }

    /// Consolidates `self` in place and moves the result into `target`.
    ///
    /// When `target` is empty the two vectors are swapped, which hands over
    /// the consolidated allocation without copying and leaves `self` with
    /// `target`'s empty buffer. When `target` already holds elements, the
    /// consolidated updates are appended after them, matching the trait's
    /// default implementation, which only ever pushes into `target`. In both
    /// cases `self` is empty afterwards.
    fn consolidate_into(&mut self, target: &mut Self) {
        consolidate_updates(self);
        if target.is_empty() {
            std::mem::swap(self, target);
        } else {
            // A plain swap here would move `target`'s existing elements back
            // into `self` instead of keeping them in `target`.
            target.append(self);
        }
    }
}
impl<K, V, T, R> ConsolidateLayout for FlatStack<TupleABCRegion<TupleABRegion<K, V>, T, R>>
where
    for<'a> K: Region + Push<<K as Region>::ReadItem<'a>> + Clone + 'static,
    for<'a> K::ReadItem<'a>: Ord + Copy,
    for<'a> V: Region + Push<<V as Region>::ReadItem<'a>> + Clone + 'static,
    for<'a> V::ReadItem<'a>: Ord + Copy,
    for<'a> T: Region + Push<<T as Region>::ReadItem<'a>> + Clone + 'static,
    for<'a> T::ReadItem<'a>: Ord + Copy,
    R: Region + Push<<R as Region>::Owned> + Clone + 'static,
    for<'a> R::Owned: Semigroup<R::ReadItem<'a>>,
{
    type Key<'a> = (K::ReadItem<'a>, V::ReadItem<'a>, T::ReadItem<'a>) where Self: 'a;
    type Diff<'a> = R::ReadItem<'a> where Self: 'a;
    type DiffOwned = R::Owned;

    /// Flattens `((key, val), time, diff)` into the key `(key, val, time)`
    /// and the diff.
    fn into_parts(((k, v), t, d): Self::Item<'_>) -> (Self::Key<'_>, Self::Diff<'_>) {
        ((k, v, t), d)
    }

    /// Orders items lexicographically by `(key, val, time)`, ignoring the diff.
    fn cmp<'a>(((key1, val1), time1, _diff1): &Self::Item<'_>, ((key2, val2), time2, _diff2): &Self::Item<'_>) -> Ordering {
        // Reborrow every component so both sides end up with the same type.
        let lhs = (K::reborrow(*key1), V::reborrow(*val1), T::reborrow(*time1));
        let rhs = (K::reborrow(*key2), V::reborrow(*val2), T::reborrow(*time2));
        lhs.cmp(&rhs)
    }

    /// Copies `((key, value), time, diff)` into the stack.
    fn push_with_diff(&mut self, (k, v, t): Self::Key<'_>, diff: Self::DiffOwned) {
        self.copy(((k, v), t, diff));
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// `consolidate` merges equal keys and drops keys whose diffs sum to zero.
    #[test]
    fn test_consolidate() {
        // Each case is (input, expected result after consolidation).
        let test_cases = vec![
            (
                vec![("a", -1), ("b", -2), ("a", 1)],
                vec![("b", -2)],
            ),
            (
                vec![("a", -1), ("b", 0), ("a", 1)],
                vec![],
            ),
            (
                vec![("a", 0)],
                vec![],
            ),
            (
                vec![("a", 0), ("b", 0)],
                vec![],
            ),
            (
                vec![("a", 1), ("b", 1)],
                vec![("a", 1), ("b", 1)],
            ),
        ];

        for (mut input, output) in test_cases {
            consolidate(&mut input);
            assert_eq!(input, output);
        }
    }

    /// `consolidate_updates` merges equal `(data, time)` pairs and drops
    /// pairs whose diffs sum to zero.
    #[test]
    fn test_consolidate_updates() {
        // Each case is (input, expected result after consolidation).
        let test_cases = vec![
            (
                vec![("a", 1, -1), ("b", 1, -2), ("a", 1, 1)],
                vec![("b", 1, -2)],
            ),
            (
                vec![("a", 1, -1), ("b", 1, 0), ("a", 1, 1)],
                vec![],
            ),
            (
                vec![("a", 1, 0)],
                vec![],
            ),
            (
                vec![("a", 1, 0), ("b", 1, 0)],
                vec![],
            ),
            (
                vec![("a", 1, 1), ("b", 2, 1)],
                vec![("a", 1, 1), ("b", 2, 1)],
            ),
        ];

        for (mut input, output) in test_cases {
            consolidate_updates(&mut input);
            assert_eq!(input, output);
        }
    }

    /// The builder emits nothing for zero-diff updates and emits every
    /// distinct non-zero update exactly once.
    #[test]
    fn test_consolidating_container_builder() {
        let mut ccb = <ConsolidatingContainerBuilder<Vec<(usize, usize, usize)>>>::default();

        // Updates with a zero diff must never surface.
        for _ in 0..1024 {
            ccb.push_into((0, 0, 0));
        }
        assert_eq!(ccb.extract(), None);
        assert_eq!(ccb.finish(), None);

        // 1024 distinct updates must all come back out.
        for i in 0..1024 {
            ccb.push_into((i, 0, 1));
        }

        let mut collected = Vec::default();
        while let Some(container) = ccb.finish() {
            collected.append(container);
        }
        collected.sort();

        // Without the length check, surplus trailing elements would go unnoticed.
        assert_eq!(collected.len(), 1024);
        for i in 0..1024 {
            assert_eq!((i, 0, 1), collected[i]);
        }
    }

    /// `consolidate_into` places the consolidated updates in `target`.
    #[test]
    fn test_consolidate_into() {
        let mut data = vec![(1, 1, 1), (2, 1, 1), (1, 1, -1)];
        let mut target = Vec::default();
        data.sort();
        data.consolidate_into(&mut target);
        assert_eq!(target, [(2, 1, 1)]);
    }

    // Workload sizes for the timing tests: large without debug assertions,
    // small with them so the tests stay quick.
    #[cfg(not(debug_assertions))]
    const LEN: usize = 256 << 10;
    #[cfg(not(debug_assertions))]
    const REPS: usize = 10 << 10;
    #[cfg(debug_assertions)]
    const LEN: usize = 256 << 1;
    #[cfg(debug_assertions)]
    const REPS: usize = 10 << 1;

    /// Times `consolidate_into` and checks it agrees with `consolidate_updates`.
    #[test]
    fn test_consolidator_duration() {
        let mut data = Vec::with_capacity(LEN);
        let mut data2 = Vec::with_capacity(LEN);
        let mut target = Vec::new();
        let mut duration = std::time::Duration::default();
        for _ in 0..REPS {
            data.clear();
            data2.clear();
            target.clear();
            // Four updates per key with diffs -2, -1, 0, 1.
            data.extend((0..LEN).map(|i| (i/4, 1, -2isize + ((i % 4) as isize))));
            data2.extend((0..LEN).map(|i| (i/4, 1, -2isize + ((i % 4) as isize))));
            data.sort_by(|x,y| x.0.cmp(&y.0));
            // Only the consolidation itself is timed.
            let start = std::time::Instant::now();
            data.consolidate_into(&mut target);
            duration += start.elapsed();

            consolidate_updates(&mut data2);
            assert_eq!(target, data2);
        }
        println!("elapsed consolidator {duration:?}");
    }

    /// Times `consolidate_updates` on the same workload for comparison.
    #[test]
    fn test_consolidator_duration_vec() {
        let mut data = Vec::with_capacity(LEN);
        let mut duration = std::time::Duration::default();
        for _ in 0..REPS {
            data.clear();
            data.extend((0..LEN).map(|i| (i/4, 1, -2isize + ((i % 4) as isize))));
            data.sort_by(|x,y| x.0.cmp(&y.0));
            // Only the consolidation itself is timed.
            let start = std::time::Instant::now();
            consolidate_updates(&mut data);
            duration += start.elapsed();
        }
        println!("elapsed vec {duration:?}");
    }
}