use std::rc::Rc;
use timely::container::columnation::{TimelyStack};
use timely::container::flatcontainer::{FlatStack, RegionPreference};
use timely::container::flatcontainer::impls::tuple::{TupleABCRegion, TupleABRegion};
use crate::trace::implementations::chunker::{ColumnationChunker, ContainerChunker, VecChunker};
use crate::trace::implementations::spine_fueled::Spine;
use crate::trace::implementations::merge_batcher::{MergeBatcher, VecMerger};
use crate::trace::implementations::merge_batcher_col::ColumnationMerger;
use crate::trace::implementations::merge_batcher_flat::FlatcontainerMerger;
use crate::trace::rc_blanket_impls::RcBuilder;
use super::{Update, Layout, Vector, TStack, Preferred, FlatLayout};
pub use self::val_batch::{OrdValBatch, OrdValBuilder};
pub use self::key_batch::{OrdKeyBatch, OrdKeyBuilder};
pub type OrdValSpine<K, V, T, R> = Spine<Rc<OrdValBatch<Vector<((K,V),T,R)>>>>;
pub type OrdValBatcher<K, V, T, R> = MergeBatcher<Vec<((K,V),T,R)>, VecChunker<((K,V),T,R)>, VecMerger<((K, V), T, R)>>;
pub type RcOrdValBuilder<K, V, T, R> = RcBuilder<OrdValBuilder<Vector<((K,V),T,R)>, Vec<((K,V),T,R)>>>;
pub type ColValSpine<K, V, T, R> = Spine<Rc<OrdValBatch<TStack<((K,V),T,R)>>>>;
pub type ColValBatcher<K, V, T, R> = MergeBatcher<Vec<((K,V),T,R)>, ColumnationChunker<((K,V),T,R)>, ColumnationMerger<((K,V),T,R)>>;
pub type ColValBuilder<K, V, T, R> = RcBuilder<OrdValBuilder<TStack<((K,V),T,R)>, TimelyStack<((K,V),T,R)>>>;
pub type FlatValSpine<L> = Spine<Rc<OrdValBatch<L>>>;
pub type FlatValBatcher<R, C> = MergeBatcher<C, ContainerChunker<FlatStack<R>>, FlatcontainerMerger<R>>;
pub type FlatValBuilder<L, R> = RcBuilder<OrdValBuilder<L, FlatStack<R>>>;
pub type FlatValSpineDefault<K, V, T, R> = FlatValSpine<
FlatLayout<<K as RegionPreference>::Region, <V as RegionPreference>::Region, <T as RegionPreference>::Region, <R as RegionPreference>::Region>,
>;
pub type FlatValBatcherDefault<K, V, T, R, C> = FlatValBatcher<TupleABCRegion<TupleABRegion<<K as RegionPreference>::Region, <V as RegionPreference>::Region>, <T as RegionPreference>::Region, <R as RegionPreference>::Region>, C>;
pub type FlatValBuilderDefault<K, V, T, R> = FlatValBuilder<FlatLayout<<K as RegionPreference>::Region, <V as RegionPreference>::Region, <T as RegionPreference>::Region, <R as RegionPreference>::Region>, TupleABCRegion<TupleABRegion<<K as RegionPreference>::Region, <V as RegionPreference>::Region>, <T as RegionPreference>::Region, <R as RegionPreference>::Region>>;
pub type OrdKeySpine<K, T, R> = Spine<Rc<OrdKeyBatch<Vector<((K,()),T,R)>>>>;
pub type OrdKeyBatcher<K, T, R> = MergeBatcher<Vec<((K,()),T,R)>, VecChunker<((K,()),T,R)>, VecMerger<((K, ()), T, R)>>;
pub type RcOrdKeyBuilder<K, T, R> = RcBuilder<OrdKeyBuilder<Vector<((K,()),T,R)>, Vec<((K,()),T,R)>>>;
pub type ColKeySpine<K, T, R> = Spine<Rc<OrdKeyBatch<TStack<((K,()),T,R)>>>>;
pub type ColKeyBatcher<K, T, R> = MergeBatcher<Vec<((K,()),T,R)>, ColumnationChunker<((K,()),T,R)>, ColumnationMerger<((K,()),T,R)>>;
pub type ColKeyBuilder<K, T, R> = RcBuilder<OrdKeyBuilder<TStack<((K,()),T,R)>, TimelyStack<((K,()),T,R)>>>;
pub type FlatKeySpine<L> = Spine<Rc<OrdKeyBatch<L>>>;
pub type FlatKeyBatcher<R, C> = MergeBatcher<C, ContainerChunker<FlatStack<R>>, FlatcontainerMerger<R>>;
pub type FlatKeyBuilder<L, R> = RcBuilder<OrdKeyBuilder<L, FlatStack<R>>>;
pub type FlatKeySpineDefault<K,T,R> = FlatKeySpine<
FlatLayout<<K as RegionPreference>::Region, <() as RegionPreference>::Region, <T as RegionPreference>::Region, <R as RegionPreference>::Region>,
>;
pub type FlatKeyBatcherDefault<K, T, R, C> = FlatValBatcher<TupleABCRegion<TupleABRegion<<K as RegionPreference>::Region, <() as RegionPreference>::Region>, <T as RegionPreference>::Region, <R as RegionPreference>::Region>, C>;
pub type FlatKeyBuilderDefault<K, T, R> = FlatKeyBuilder<FlatLayout<<K as RegionPreference>::Region, <() as RegionPreference>::Region, <T as RegionPreference>::Region, <R as RegionPreference>::Region>, TupleABCRegion<TupleABRegion<<K as RegionPreference>::Region, <() as RegionPreference>::Region>, <T as RegionPreference>::Region, <R as RegionPreference>::Region>>;
pub type PreferredSpine<K, V, T, R> = Spine<Rc<OrdValBatch<Preferred<K,V,T,R>>>>;
pub type PreferredBatcher<K, V, T, R> = MergeBatcher<Vec<((<K as ToOwned>::Owned,<V as ToOwned>::Owned),T,R)>, ColumnationChunker<((<K as ToOwned>::Owned,<V as ToOwned>::Owned),T,R)>, ColumnationMerger<((<K as ToOwned>::Owned,<V as ToOwned>::Owned),T,R)>>;
pub type PreferredBuilder<K, V, T, R> = RcBuilder<OrdValBuilder<Preferred<K,V,T,R>, TimelyStack<((<K as ToOwned>::Owned,<V as ToOwned>::Owned),T,R)>>>;
mod val_batch {
use std::marker::PhantomData;
use serde::{Deserialize, Serialize};
use timely::container::PushInto;
use timely::progress::{Antichain, frontier::AntichainRef};
use crate::trace::{Batch, BatchReader, Builder, Cursor, Description, Merger};
use crate::trace::implementations::{BatchContainer, BuilderInput};
use crate::trace::cursor::IntoOwned;
use super::{Layout, Update};
#[derive(Debug, Serialize, Deserialize)]
pub struct OrdValStorage<L: Layout> {
pub keys: L::KeyContainer,
pub keys_offs: L::OffsetContainer,
pub vals: L::ValContainer,
pub vals_offs: L::OffsetContainer,
pub times: L::TimeContainer,
pub diffs: L::DiffContainer,
}
impl<L: Layout> OrdValStorage<L> {
fn values_for_key(&self, index: usize) -> (usize, usize) {
(self.keys_offs.index(index), self.keys_offs.index(index+1))
}
fn updates_for_value(&self, index: usize) -> (usize, usize) {
let mut lower = self.vals_offs.index(index);
let upper = self.vals_offs.index(index+1);
if lower == upper {
assert!(lower > 0);
lower -= 1;
}
(lower, upper)
}
}
#[derive(Serialize, Deserialize)]
#[serde(bound = "
L::KeyContainer: Serialize + for<'a> Deserialize<'a>,
L::ValContainer: Serialize + for<'a> Deserialize<'a>,
L::OffsetContainer: Serialize + for<'a> Deserialize<'a>,
L::TimeContainer: Serialize + for<'a> Deserialize<'a>,
L::DiffContainer: Serialize + for<'a> Deserialize<'a>,
")]
pub struct OrdValBatch<L: Layout> {
pub storage: OrdValStorage<L>,
pub description: Description<<L::Target as Update>::Time>,
pub updates: usize,
}
impl<L: Layout> BatchReader for OrdValBatch<L> {
type Key<'a> = <L::KeyContainer as BatchContainer>::ReadItem<'a>;
type Val<'a> = <L::ValContainer as BatchContainer>::ReadItem<'a>;
type Time = <L::Target as Update>::Time;
type TimeGat<'a> = <L::TimeContainer as BatchContainer>::ReadItem<'a>;
type Diff = <L::Target as Update>::Diff;
type DiffGat<'a> = <L::DiffContainer as BatchContainer>::ReadItem<'a>;
type Cursor = OrdValCursor<L>;
fn cursor(&self) -> Self::Cursor {
OrdValCursor {
key_cursor: 0,
val_cursor: 0,
phantom: PhantomData,
}
}
fn len(&self) -> usize {
self.updates
}
fn description(&self) -> &Description<<L::Target as Update>::Time> { &self.description }
}
impl<L: Layout> Batch for OrdValBatch<L> {
type Merger = OrdValMerger<L>;
fn begin_merge(&self, other: &Self, compaction_frontier: AntichainRef<<L::Target as Update>::Time>) -> Self::Merger {
OrdValMerger::new(self, other, compaction_frontier)
}
fn empty(lower: Antichain<Self::Time>, upper: Antichain<Self::Time>) -> Self {
use timely::progress::Timestamp;
Self {
storage: OrdValStorage {
keys: L::KeyContainer::with_capacity(0),
keys_offs: L::OffsetContainer::with_capacity(0),
vals: L::ValContainer::with_capacity(0),
vals_offs: L::OffsetContainer::with_capacity(0),
times: L::TimeContainer::with_capacity(0),
diffs: L::DiffContainer::with_capacity(0),
},
description: Description::new(lower, upper, Antichain::from_elem(Self::Time::minimum())),
updates: 0,
}
}
}
pub struct OrdValMerger<L: Layout> {
key_cursor1: usize,
key_cursor2: usize,
result: OrdValStorage<L>,
description: Description<<L::Target as Update>::Time>,
update_stash: Vec<(<L::Target as Update>::Time, <L::Target as Update>::Diff)>,
singletons: usize,
}
impl<L: Layout> Merger<OrdValBatch<L>> for OrdValMerger<L>
where
OrdValBatch<L>: Batch<Time=<L::Target as Update>::Time>,
for<'a> <L::TimeContainer as BatchContainer>::ReadItem<'a> : IntoOwned<'a, Owned = <L::Target as Update>::Time>,
for<'a> <L::DiffContainer as BatchContainer>::ReadItem<'a> : IntoOwned<'a, Owned = <L::Target as Update>::Diff>,
{
fn new(batch1: &OrdValBatch<L>, batch2: &OrdValBatch<L>, compaction_frontier: AntichainRef<<L::Target as Update>::Time>) -> Self {
assert!(batch1.upper() == batch2.lower());
use crate::lattice::Lattice;
let mut since = batch1.description().since().join(batch2.description().since());
since = since.join(&compaction_frontier.to_owned());
let description = Description::new(batch1.lower().clone(), batch2.upper().clone(), since);
let batch1 = &batch1.storage;
let batch2 = &batch2.storage;
let mut storage = OrdValStorage {
keys: L::KeyContainer::merge_capacity(&batch1.keys, &batch2.keys),
keys_offs: L::OffsetContainer::with_capacity(batch1.keys_offs.len() + batch2.keys_offs.len()),
vals: L::ValContainer::merge_capacity(&batch1.vals, &batch2.vals),
vals_offs: L::OffsetContainer::with_capacity(batch1.vals_offs.len() + batch2.vals_offs.len()),
times: L::TimeContainer::merge_capacity(&batch1.times, &batch2.times),
diffs: L::DiffContainer::merge_capacity(&batch1.diffs, &batch2.diffs),
};
let keys_offs: &mut L::OffsetContainer = &mut storage.keys_offs;
keys_offs.push(0);
let vals_offs: &mut L::OffsetContainer = &mut storage.vals_offs;
vals_offs.push(0);
OrdValMerger {
key_cursor1: 0,
key_cursor2: 0,
result: storage,
description,
update_stash: Vec::new(),
singletons: 0,
}
}
fn done(self) -> OrdValBatch<L> {
OrdValBatch {
updates: self.result.times.len() + self.singletons,
storage: self.result,
description: self.description,
}
}
fn work(&mut self, source1: &OrdValBatch<L>, source2: &OrdValBatch<L>, fuel: &mut isize) {
let starting_updates = self.result.times.len();
let mut effort = 0isize;
while self.key_cursor1 < source1.storage.keys.len() && self.key_cursor2 < source2.storage.keys.len() && effort < *fuel {
self.merge_key(&source1.storage, &source2.storage);
effort = (self.result.times.len() - starting_updates) as isize;
}
while self.key_cursor1 < source1.storage.keys.len() && effort < *fuel {
self.copy_key(&source1.storage, self.key_cursor1);
self.key_cursor1 += 1;
effort = (self.result.times.len() - starting_updates) as isize;
}
while self.key_cursor2 < source2.storage.keys.len() && effort < *fuel {
self.copy_key(&source2.storage, self.key_cursor2);
self.key_cursor2 += 1;
effort = (self.result.times.len() - starting_updates) as isize;
}
*fuel -= effort;
}
}
impl<L: Layout> OrdValMerger<L> {
fn copy_key(&mut self, source: &OrdValStorage<L>, cursor: usize) {
let init_vals = self.result.vals.len();
let (mut lower, upper) = source.values_for_key(cursor);
while lower < upper {
self.stash_updates_for_val(source, lower);
if let Some(off) = self.consolidate_updates() {
self.result.vals_offs.push(off);
self.result.vals.push(source.vals.index(lower));
}
lower += 1;
}
if self.result.vals.len() > init_vals {
self.result.keys.push(source.keys.index(cursor));
self.result.keys_offs.push(self.result.vals.len());
}
}
fn merge_key(&mut self, source1: &OrdValStorage<L>, source2: &OrdValStorage<L>) {
use ::std::cmp::Ordering;
match source1.keys.index(self.key_cursor1).cmp(&source2.keys.index(self.key_cursor2)) {
Ordering::Less => {
self.copy_key(source1, self.key_cursor1);
self.key_cursor1 += 1;
},
Ordering::Equal => {
let (lower1, upper1) = source1.values_for_key(self.key_cursor1);
let (lower2, upper2) = source2.values_for_key(self.key_cursor2);
if let Some(off) = self.merge_vals((source1, lower1, upper1), (source2, lower2, upper2)) {
self.result.keys.push(source1.keys.index(self.key_cursor1));
self.result.keys_offs.push(off);
}
self.key_cursor1 += 1;
self.key_cursor2 += 1;
},
Ordering::Greater => {
self.copy_key(source2, self.key_cursor2);
self.key_cursor2 += 1;
},
}
}
fn merge_vals(
&mut self,
(source1, mut lower1, upper1): (&OrdValStorage<L>, usize, usize),
(source2, mut lower2, upper2): (&OrdValStorage<L>, usize, usize),
) -> Option<usize> {
let init_vals = self.result.vals.len();
while lower1 < upper1 && lower2 < upper2 {
use ::std::cmp::Ordering;
match source1.vals.index(lower1).cmp(&source2.vals.index(lower2)) {
Ordering::Less => {
self.stash_updates_for_val(source1, lower1);
if let Some(off) = self.consolidate_updates() {
self.result.vals_offs.push(off);
self.result.vals.push(source1.vals.index(lower1));
}
lower1 += 1;
},
Ordering::Equal => {
self.stash_updates_for_val(source1, lower1);
self.stash_updates_for_val(source2, lower2);
if let Some(off) = self.consolidate_updates() {
self.result.vals_offs.push(off);
self.result.vals.push(source1.vals.index(lower1));
}
lower1 += 1;
lower2 += 1;
},
Ordering::Greater => {
self.stash_updates_for_val(source2, lower2);
if let Some(off) = self.consolidate_updates() {
self.result.vals_offs.push(off);
self.result.vals.push(source2.vals.index(lower2));
}
lower2 += 1;
},
}
}
while lower1 < upper1 {
self.stash_updates_for_val(source1, lower1);
if let Some(off) = self.consolidate_updates() {
self.result.vals_offs.push(off);
self.result.vals.push(source1.vals.index(lower1));
}
lower1 += 1;
}
while lower2 < upper2 {
self.stash_updates_for_val(source2, lower2);
if let Some(off) = self.consolidate_updates() {
self.result.vals_offs.push(off);
self.result.vals.push(source2.vals.index(lower2));
}
lower2 += 1;
}
if self.result.vals.len() > init_vals {
Some(self.result.vals.len())
} else {
None
}
}
fn stash_updates_for_val(&mut self, source: &OrdValStorage<L>, index: usize) {
let (lower, upper) = source.updates_for_value(index);
for i in lower .. upper {
let time = source.times.index(i);
let diff = source.diffs.index(i);
use crate::lattice::Lattice;
let mut new_time: <L::Target as Update>::Time = time.into_owned();
new_time.advance_by(self.description.since().borrow());
self.update_stash.push((new_time, diff.into_owned()));
}
}
fn consolidate_updates(&mut self) -> Option<usize> {
use crate::consolidation;
consolidation::consolidate(&mut self.update_stash);
if !self.update_stash.is_empty() {
let time_diff = self.result.times.last().zip(self.result.diffs.last());
let last_eq = self.update_stash.last().zip(time_diff).map(|((t1, d1), (t2, d2))| {
let t1 = <<L::TimeContainer as BatchContainer>::ReadItem<'_> as IntoOwned>::borrow_as(t1);
let d1 = <<L::DiffContainer as BatchContainer>::ReadItem<'_> as IntoOwned>::borrow_as(d1);
t1.eq(&t2) && d1.eq(&d2)
});
if self.update_stash.len() == 1 && last_eq.unwrap_or(false) {
self.update_stash.clear();
self.singletons += 1;
}
else {
for (time, diff) in self.update_stash.drain(..) {
self.result.times.push(time);
self.result.diffs.push(diff);
}
}
Some(self.result.times.len())
} else {
None
}
}
}
pub struct OrdValCursor<L: Layout> {
key_cursor: usize,
val_cursor: usize,
phantom: PhantomData<L>,
}
impl<L: Layout> Cursor for OrdValCursor<L> {
type Key<'a> = <L::KeyContainer as BatchContainer>::ReadItem<'a>;
type Val<'a> = <L::ValContainer as BatchContainer>::ReadItem<'a>;
type Time = <L::Target as Update>::Time;
type TimeGat<'a> = <L::TimeContainer as BatchContainer>::ReadItem<'a>;
type Diff = <L::Target as Update>::Diff;
type DiffGat<'a> = <L::DiffContainer as BatchContainer>::ReadItem<'a>;
type Storage = OrdValBatch<L>;
fn key<'a>(&self, storage: &'a OrdValBatch<L>) -> Self::Key<'a> { storage.storage.keys.index(self.key_cursor) }
fn val<'a>(&self, storage: &'a OrdValBatch<L>) -> Self::Val<'a> { storage.storage.vals.index(self.val_cursor) }
fn map_times<L2: FnMut(Self::TimeGat<'_>, Self::DiffGat<'_>)>(&mut self, storage: &OrdValBatch<L>, mut logic: L2) {
let (lower, upper) = storage.storage.updates_for_value(self.val_cursor);
for index in lower .. upper {
let time = storage.storage.times.index(index);
let diff = storage.storage.diffs.index(index);
logic(time, diff);
}
}
fn key_valid(&self, storage: &OrdValBatch<L>) -> bool { self.key_cursor < storage.storage.keys.len() }
fn val_valid(&self, storage: &OrdValBatch<L>) -> bool { self.val_cursor < storage.storage.values_for_key(self.key_cursor).1 }
fn step_key(&mut self, storage: &OrdValBatch<L>){
self.key_cursor += 1;
if self.key_valid(storage) {
self.rewind_vals(storage);
}
else {
self.key_cursor = storage.storage.keys.len();
}
}
fn seek_key(&mut self, storage: &OrdValBatch<L>, key: Self::Key<'_>) {
self.key_cursor += storage.storage.keys.advance(self.key_cursor, storage.storage.keys.len(), |x| <L::KeyContainer as BatchContainer>::reborrow(x).lt(&<L::KeyContainer as BatchContainer>::reborrow(key)));
if self.key_valid(storage) {
self.rewind_vals(storage);
}
}
fn step_val(&mut self, storage: &OrdValBatch<L>) {
self.val_cursor += 1;
if !self.val_valid(storage) {
self.val_cursor = storage.storage.values_for_key(self.key_cursor).1;
}
}
fn seek_val(&mut self, storage: &OrdValBatch<L>, val: Self::Val<'_>) {
self.val_cursor += storage.storage.vals.advance(self.val_cursor, storage.storage.values_for_key(self.key_cursor).1, |x| <L::ValContainer as BatchContainer>::reborrow(x).lt(&<L::ValContainer as BatchContainer>::reborrow(val)));
}
fn rewind_keys(&mut self, storage: &OrdValBatch<L>) {
self.key_cursor = 0;
if self.key_valid(storage) {
self.rewind_vals(storage)
}
}
fn rewind_vals(&mut self, storage: &OrdValBatch<L>) {
self.val_cursor = storage.storage.values_for_key(self.key_cursor).0;
}
}
pub struct OrdValBuilder<L: Layout, CI> {
result: OrdValStorage<L>,
singleton: Option<(<L::Target as Update>::Time, <L::Target as Update>::Diff)>,
singletons: usize,
_marker: PhantomData<CI>,
}
impl<L: Layout, CI> OrdValBuilder<L, CI> {
fn push_update(&mut self, time: <L::Target as Update>::Time, diff: <L::Target as Update>::Diff) {
if self.result.times.last().map(|t| t == <<L::TimeContainer as BatchContainer>::ReadItem<'_> as IntoOwned>::borrow_as(&time)) == Some(true) &&
self.result.diffs.last().map(|d| d == <<L::DiffContainer as BatchContainer>::ReadItem<'_> as IntoOwned>::borrow_as(&diff)) == Some(true)
{
assert!(self.singleton.is_none());
self.singleton = Some((time, diff));
}
else {
if let Some((time, diff)) = self.singleton.take() {
self.result.times.push(time);
self.result.diffs.push(diff);
}
self.result.times.push(time);
self.result.diffs.push(diff);
}
}
}
impl<L, CI> Builder for OrdValBuilder<L, CI>
where
L: Layout,
CI: for<'a> BuilderInput<L::KeyContainer, L::ValContainer, Time=<L::Target as Update>::Time, Diff=<L::Target as Update>::Diff>,
for<'a> L::KeyContainer: PushInto<CI::Key<'a>>,
for<'a> L::ValContainer: PushInto<CI::Val<'a>>,
for<'a> <L::TimeContainer as BatchContainer>::ReadItem<'a> : IntoOwned<'a, Owned = <L::Target as Update>::Time>,
for<'a> <L::DiffContainer as BatchContainer>::ReadItem<'a> : IntoOwned<'a, Owned = <L::Target as Update>::Diff>,
{
type Input = CI;
type Time = <L::Target as Update>::Time;
type Output = OrdValBatch<L>;
fn with_capacity(keys: usize, vals: usize, upds: usize) -> Self {
Self {
result: OrdValStorage {
keys: L::KeyContainer::with_capacity(keys),
keys_offs: L::OffsetContainer::with_capacity(keys + 1),
vals: L::ValContainer::with_capacity(vals),
vals_offs: L::OffsetContainer::with_capacity(vals + 1),
times: L::TimeContainer::with_capacity(upds),
diffs: L::DiffContainer::with_capacity(upds),
},
singleton: None,
singletons: 0,
_marker: PhantomData,
}
}
#[inline]
fn push(&mut self, chunk: &mut Self::Input) {
for item in chunk.drain() {
let (key, val, time, diff) = CI::into_parts(item);
if self.result.keys.last().map(|k| CI::key_eq(&key, k)).unwrap_or(false) {
if self.result.vals.last().map(|v| CI::val_eq(&val, v)).unwrap_or(false) {
self.push_update(time, diff);
} else {
self.result.vals_offs.push(self.result.times.len());
if self.singleton.take().is_some() { self.singletons += 1; }
self.push_update(time, diff);
self.result.vals.push(val);
}
} else {
self.result.vals_offs.push(self.result.times.len());
if self.singleton.take().is_some() { self.singletons += 1; }
self.result.keys_offs.push(self.result.vals.len());
self.push_update(time, diff);
self.result.vals.push(val);
self.result.keys.push(key);
}
}
}
#[inline(never)]
fn done(mut self, description: Description<Self::Time>) -> OrdValBatch<L> {
self.result.vals_offs.push(self.result.times.len());
if self.singleton.take().is_some() { self.singletons += 1; }
self.result.keys_offs.push(self.result.vals.len());
OrdValBatch {
updates: self.result.times.len() + self.singletons,
storage: self.result,
description,
}
}
fn seal(chain: &mut Vec<Self::Input>, description: Description<Self::Time>) -> Self::Output {
let (keys, vals, upds) = Self::Input::key_val_upd_counts(&chain[..]);
let mut builder = Self::with_capacity(keys, vals, upds);
for mut chunk in chain.drain(..) {
builder.push(&mut chunk);
}
builder.done(description)
}
}
}
mod key_batch {
use std::marker::PhantomData;
use serde::{Deserialize, Serialize};
use timely::container::PushInto;
use timely::progress::{Antichain, frontier::AntichainRef};
use crate::trace::{Batch, BatchReader, Builder, Cursor, Description, Merger};
use crate::trace::implementations::{BatchContainer, BuilderInput};
use crate::trace::cursor::IntoOwned;
use super::{Layout, Update};
#[derive(Debug, Serialize, Deserialize)]
pub struct OrdKeyStorage<L: Layout> {
pub keys: L::KeyContainer,
pub keys_offs: L::OffsetContainer,
pub times: L::TimeContainer,
pub diffs: L::DiffContainer,
}
impl<L: Layout> OrdKeyStorage<L> {
fn updates_for_key(&self, index: usize) -> (usize, usize) {
let mut lower = self.keys_offs.index(index);
let upper = self.keys_offs.index(index+1);
if lower == upper {
assert!(lower > 0);
lower -= 1;
}
(lower, upper)
}
}
#[derive(Serialize, Deserialize)]
#[serde(bound = "
L::KeyContainer: Serialize + for<'a> Deserialize<'a>,
L::OffsetContainer: Serialize + for<'a> Deserialize<'a>,
L::TimeContainer: Serialize + for<'a> Deserialize<'a>,
L::DiffContainer: Serialize + for<'a> Deserialize<'a>,
")]
pub struct OrdKeyBatch<L: Layout> {
pub storage: OrdKeyStorage<L>,
pub description: Description<<L::Target as Update>::Time>,
pub updates: usize,
}
impl<L: Layout> BatchReader for OrdKeyBatch<L> {
type Key<'a> = <L::KeyContainer as BatchContainer>::ReadItem<'a>;
type Val<'a> = &'a ();
type Time = <L::Target as Update>::Time;
type TimeGat<'a> = <L::TimeContainer as BatchContainer>::ReadItem<'a>;
type Diff = <L::Target as Update>::Diff;
type DiffGat<'a> = <L::DiffContainer as BatchContainer>::ReadItem<'a>;
type Cursor = OrdKeyCursor<L>;
fn cursor(&self) -> Self::Cursor {
OrdKeyCursor {
key_cursor: 0,
val_stepped: false,
phantom: std::marker::PhantomData,
}
}
fn len(&self) -> usize {
self.updates
}
fn description(&self) -> &Description<<L::Target as Update>::Time> { &self.description }
}
impl<L: Layout> Batch for OrdKeyBatch<L> {
type Merger = OrdKeyMerger<L>;
fn begin_merge(&self, other: &Self, compaction_frontier: AntichainRef<<L::Target as Update>::Time>) -> Self::Merger {
OrdKeyMerger::new(self, other, compaction_frontier)
}
fn empty(lower: Antichain<Self::Time>, upper: Antichain<Self::Time>) -> Self {
use timely::progress::Timestamp;
Self {
storage: OrdKeyStorage {
keys: L::KeyContainer::with_capacity(0),
keys_offs: L::OffsetContainer::with_capacity(0),
times: L::TimeContainer::with_capacity(0),
diffs: L::DiffContainer::with_capacity(0),
},
description: Description::new(lower, upper, Antichain::from_elem(Self::Time::minimum())),
updates: 0,
}
}
}
pub struct OrdKeyMerger<L: Layout> {
key_cursor1: usize,
key_cursor2: usize,
result: OrdKeyStorage<L>,
description: Description<<L::Target as Update>::Time>,
update_stash: Vec<(<L::Target as Update>::Time, <L::Target as Update>::Diff)>,
singletons: usize,
}
impl<L: Layout> Merger<OrdKeyBatch<L>> for OrdKeyMerger<L>
where
OrdKeyBatch<L>: Batch<Time=<L::Target as Update>::Time>,
for<'a> <L::TimeContainer as BatchContainer>::ReadItem<'a> : IntoOwned<'a, Owned = <L::Target as Update>::Time>,
for<'a> <L::DiffContainer as BatchContainer>::ReadItem<'a> : IntoOwned<'a, Owned = <L::Target as Update>::Diff>,
{
fn new(batch1: &OrdKeyBatch<L>, batch2: &OrdKeyBatch<L>, compaction_frontier: AntichainRef<<L::Target as Update>::Time>) -> Self {
assert!(batch1.upper() == batch2.lower());
use crate::lattice::Lattice;
let mut since = batch1.description().since().join(batch2.description().since());
since = since.join(&compaction_frontier.to_owned());
let description = Description::new(batch1.lower().clone(), batch2.upper().clone(), since);
let batch1 = &batch1.storage;
let batch2 = &batch2.storage;
let mut storage = OrdKeyStorage {
keys: L::KeyContainer::merge_capacity(&batch1.keys, &batch2.keys),
keys_offs: L::OffsetContainer::with_capacity(batch1.keys_offs.len() + batch2.keys_offs.len()),
times: L::TimeContainer::merge_capacity(&batch1.times, &batch2.times),
diffs: L::DiffContainer::merge_capacity(&batch1.diffs, &batch2.diffs),
};
let keys_offs: &mut L::OffsetContainer = &mut storage.keys_offs;
keys_offs.push(0);
OrdKeyMerger {
key_cursor1: 0,
key_cursor2: 0,
result: storage,
description,
update_stash: Vec::new(),
singletons: 0,
}
}
fn done(self) -> OrdKeyBatch<L> {
OrdKeyBatch {
updates: self.result.times.len() + self.singletons,
storage: self.result,
description: self.description,
}
}
fn work(&mut self, source1: &OrdKeyBatch<L>, source2: &OrdKeyBatch<L>, fuel: &mut isize) {
let starting_updates = self.result.times.len();
let mut effort = 0isize;
while self.key_cursor1 < source1.storage.keys.len() && self.key_cursor2 < source2.storage.keys.len() && effort < *fuel {
self.merge_key(&source1.storage, &source2.storage);
effort = (self.result.times.len() - starting_updates) as isize;
}
while self.key_cursor1 < source1.storage.keys.len() && effort < *fuel {
self.copy_key(&source1.storage, self.key_cursor1);
self.key_cursor1 += 1;
effort = (self.result.times.len() - starting_updates) as isize;
}
while self.key_cursor2 < source2.storage.keys.len() && effort < *fuel {
self.copy_key(&source2.storage, self.key_cursor2);
self.key_cursor2 += 1;
effort = (self.result.times.len() - starting_updates) as isize;
}
*fuel -= effort;
}
}
impl<L: Layout> OrdKeyMerger<L> {
fn copy_key(&mut self, source: &OrdKeyStorage<L>, cursor: usize) {
self.stash_updates_for_key(source, cursor);
if let Some(off) = self.consolidate_updates() {
self.result.keys_offs.push(off);
self.result.keys.push(source.keys.index(cursor));
}
}
fn merge_key(&mut self, source1: &OrdKeyStorage<L>, source2: &OrdKeyStorage<L>) {
use ::std::cmp::Ordering;
match source1.keys.index(self.key_cursor1).cmp(&source2.keys.index(self.key_cursor2)) {
Ordering::Less => {
self.copy_key(source1, self.key_cursor1);
self.key_cursor1 += 1;
},
Ordering::Equal => {
self.stash_updates_for_key(source1, self.key_cursor1);
self.stash_updates_for_key(source2, self.key_cursor2);
if let Some(off) = self.consolidate_updates() {
self.result.keys_offs.push(off);
self.result.keys.push(source1.keys.index(self.key_cursor1));
}
self.key_cursor1 += 1;
self.key_cursor2 += 1;
},
Ordering::Greater => {
self.copy_key(source2, self.key_cursor2);
self.key_cursor2 += 1;
},
}
}
fn stash_updates_for_key(&mut self, source: &OrdKeyStorage<L>, index: usize) {
let (lower, upper) = source.updates_for_key(index);
for i in lower .. upper {
let time = source.times.index(i);
let diff = source.diffs.index(i);
use crate::lattice::Lattice;
let mut new_time = time.into_owned();
new_time.advance_by(self.description.since().borrow());
self.update_stash.push((new_time, diff.into_owned()));
}
}
fn consolidate_updates(&mut self) -> Option<usize> {
use crate::consolidation;
consolidation::consolidate(&mut self.update_stash);
if !self.update_stash.is_empty() {
let time_diff = self.result.times.last().zip(self.result.diffs.last());
let last_eq = self.update_stash.last().zip(time_diff).map(|((t1, d1), (t2, d2))| {
let t1 = <<L::TimeContainer as BatchContainer>::ReadItem<'_> as IntoOwned>::borrow_as(t1);
let d1 = <<L::DiffContainer as BatchContainer>::ReadItem<'_> as IntoOwned>::borrow_as(d1);
t1.eq(&t2) && d1.eq(&d2)
});
if self.update_stash.len() == 1 && last_eq.unwrap_or(false) {
self.update_stash.clear();
self.singletons += 1;
}
else {
for (time, diff) in self.update_stash.drain(..) {
self.result.times.push(time);
self.result.diffs.push(diff);
}
}
Some(self.result.times.len())
} else {
None
}
}
}
pub struct OrdKeyCursor<L: Layout> {
key_cursor: usize,
val_stepped: bool,
phantom: PhantomData<L>,
}
impl<L: Layout> Cursor for OrdKeyCursor<L> {
type Key<'a> = <L::KeyContainer as BatchContainer>::ReadItem<'a>;
type Val<'a> = &'a ();
type Time = <L::Target as Update>::Time;
type TimeGat<'a> = <L::TimeContainer as BatchContainer>::ReadItem<'a>;
type Diff = <L::Target as Update>::Diff;
type DiffGat<'a> = <L::DiffContainer as BatchContainer>::ReadItem<'a>;
type Storage = OrdKeyBatch<L>;
fn key<'a>(&self, storage: &'a Self::Storage) -> Self::Key<'a> { storage.storage.keys.index(self.key_cursor) }
fn val<'a>(&self, _storage: &'a Self::Storage) -> &'a () { &() }
fn map_times<L2: FnMut(Self::TimeGat<'_>, Self::DiffGat<'_>)>(&mut self, storage: &Self::Storage, mut logic: L2) {
let (lower, upper) = storage.storage.updates_for_key(self.key_cursor);
for index in lower .. upper {
let time = storage.storage.times.index(index);
let diff = storage.storage.diffs.index(index);
logic(time, diff);
}
}
fn key_valid(&self, storage: &Self::Storage) -> bool { self.key_cursor < storage.storage.keys.len() }
fn val_valid(&self, _storage: &Self::Storage) -> bool { !self.val_stepped }
fn step_key(&mut self, storage: &Self::Storage){
self.key_cursor += 1;
if self.key_valid(storage) {
self.rewind_vals(storage);
}
else {
self.key_cursor = storage.storage.keys.len();
}
}
fn seek_key(&mut self, storage: &Self::Storage, key: Self::Key<'_>) {
self.key_cursor += storage.storage.keys.advance(self.key_cursor, storage.storage.keys.len(), |x| <L::KeyContainer as BatchContainer>::reborrow(x).lt(&<L::KeyContainer as BatchContainer>::reborrow(key)));
if self.key_valid(storage) {
self.rewind_vals(storage);
}
}
fn step_val(&mut self, _storage: &Self::Storage) {
self.val_stepped = true;
}
fn seek_val(&mut self, _storage: &Self::Storage, _val: Self::Val<'_>) { }
fn rewind_keys(&mut self, storage: &Self::Storage) {
self.key_cursor = 0;
if self.key_valid(storage) {
self.rewind_vals(storage)
}
}
fn rewind_vals(&mut self, _storage: &Self::Storage) {
self.val_stepped = false;
}
}
pub struct OrdKeyBuilder<L: Layout, CI> {
result: OrdKeyStorage<L>,
singleton: Option<(<L::Target as Update>::Time, <L::Target as Update>::Diff)>,
singletons: usize,
_marker: PhantomData<CI>,
}
impl<L: Layout, CI> OrdKeyBuilder<L, CI> {
fn push_update(&mut self, time: <L::Target as Update>::Time, diff: <L::Target as Update>::Diff) {
let t1 = <<L::TimeContainer as BatchContainer>::ReadItem<'_> as IntoOwned>::borrow_as(&time);
let d1 = <<L::DiffContainer as BatchContainer>::ReadItem<'_> as IntoOwned>::borrow_as(&diff);
if self.result.times.last().map(|t| t == t1).unwrap_or(false) && self.result.diffs.last().map(|d| d == d1).unwrap_or(false) {
assert!(self.singleton.is_none());
self.singleton = Some((time, diff));
}
else {
if let Some((time, diff)) = self.singleton.take() {
self.result.times.push(time);
self.result.diffs.push(diff);
}
self.result.times.push(time);
self.result.diffs.push(diff);
}
}
}
impl<L: Layout, CI> Builder for OrdKeyBuilder<L, CI>
where
L: Layout,
CI: for<'a> BuilderInput<L::KeyContainer, L::ValContainer, Time=<L::Target as Update>::Time, Diff=<L::Target as Update>::Diff>,
for<'a> L::KeyContainer: PushInto<CI::Key<'a>>,
for<'a> <L::TimeContainer as BatchContainer>::ReadItem<'a> : IntoOwned<'a, Owned = <L::Target as Update>::Time>,
for<'a> <L::DiffContainer as BatchContainer>::ReadItem<'a> : IntoOwned<'a, Owned = <L::Target as Update>::Diff>,
{
type Input = CI;
type Time = <L::Target as Update>::Time;
type Output = OrdKeyBatch<L>;
fn with_capacity(keys: usize, _vals: usize, upds: usize) -> Self {
Self {
result: OrdKeyStorage {
keys: L::KeyContainer::with_capacity(keys),
keys_offs: L::OffsetContainer::with_capacity(keys + 1),
times: L::TimeContainer::with_capacity(upds),
diffs: L::DiffContainer::with_capacity(upds),
},
singleton: None,
singletons: 0,
_marker: PhantomData,
}
}
#[inline]
fn push(&mut self, chunk: &mut Self::Input) {
for item in chunk.drain() {
let (key, _val, time, diff) = CI::into_parts(item);
if self.result.keys.last().map(|k| CI::key_eq(&key, k)).unwrap_or(false) {
self.push_update(time, diff);
} else {
self.result.keys_offs.push(self.result.times.len());
if self.singleton.take().is_some() { self.singletons += 1; }
self.push_update(time, diff);
self.result.keys.push(key);
}
}
}
#[inline(never)]
fn done(mut self, description: Description<Self::Time>) -> OrdKeyBatch<L> {
self.result.keys_offs.push(self.result.times.len());
if self.singleton.take().is_some() { self.singletons += 1; }
OrdKeyBatch {
updates: self.result.times.len() + self.singletons,
storage: self.result,
description,
}
}
fn seal(chain: &mut Vec<Self::Input>, description: Description<Self::Time>) -> Self::Output {
let (keys, vals, upds) = Self::Input::key_val_upd_counts(&chain[..]);
let mut builder = Self::with_capacity(keys, vals, upds);
for mut chunk in chain.drain(..) {
builder.push(&mut chunk);
}
builder.done(description)
}
}
}