use std::rc::Rc;
use timely::dataflow::Scope;
use timely::dataflow::operators::Map;
use timely::progress::frontier::AntichainRef;
use crate::operators::arrange::Arranged;
use crate::trace::{TraceReader, BatchReader, Description};
use crate::trace::cursor::Cursor;
use crate::trace::cursor::IntoOwned;
pub fn freeze<G, T, F>(arranged: &Arranged<G, T>, func: F) -> Arranged<G, TraceFreeze<T, F>>
where
G: Scope<Timestamp=T::Time>,
T: TraceReader+Clone,
F: Fn(T::TimeGat<'_>)->Option<T::Time>+'static,
{
let func1 = Rc::new(func);
let func2 = func1.clone();
Arranged {
stream: arranged.stream.map(move |bw| BatchFreeze::make_from(bw, func1.clone())),
trace: TraceFreeze::make_from(arranged.trace.clone(), func2),
}
}
pub struct TraceFreeze<Tr, F>
where
Tr: TraceReader,
F: Fn(Tr::TimeGat<'_>)->Option<Tr::Time>,
{
trace: Tr,
func: Rc<F>,
}
impl<Tr,F> Clone for TraceFreeze<Tr, F>
where
Tr: TraceReader+Clone,
F: Fn(Tr::TimeGat<'_>)->Option<Tr::Time>,
{
fn clone(&self) -> Self {
TraceFreeze {
trace: self.trace.clone(),
func: self.func.clone(),
}
}
}
impl<Tr, F> TraceReader for TraceFreeze<Tr, F>
where
Tr: TraceReader,
Tr::Batch: Clone,
F: Fn(Tr::TimeGat<'_>)->Option<Tr::Time>+'static,
{
type Key<'a> = Tr::Key<'a>;
type Val<'a> = Tr::Val<'a>;
type Time = Tr::Time;
type TimeGat<'a> = Tr::TimeGat<'a>;
type Diff = Tr::Diff;
type DiffGat<'a> = Tr::DiffGat<'a>;
type Batch = BatchFreeze<Tr::Batch, F>;
type Storage = Tr::Storage;
type Cursor = CursorFreeze<Tr::Cursor, F>;
fn map_batches<F2: FnMut(&Self::Batch)>(&self, mut f: F2) {
let func = &self.func;
self.trace.map_batches(|batch| {
f(&Self::Batch::make_from(batch.clone(), func.clone()));
})
}
fn set_logical_compaction(&mut self, frontier: AntichainRef<Tr::Time>) { self.trace.set_logical_compaction(frontier) }
fn get_logical_compaction(&mut self) -> AntichainRef<Tr::Time> { self.trace.get_logical_compaction() }
fn set_physical_compaction(&mut self, frontier: AntichainRef<Tr::Time>) { self.trace.set_physical_compaction(frontier) }
fn get_physical_compaction(&mut self) -> AntichainRef<Tr::Time> { self.trace.get_physical_compaction() }
fn cursor_through(&mut self, upper: AntichainRef<Tr::Time>) -> Option<(Self::Cursor, Self::Storage)> {
let func = &self.func;
self.trace.cursor_through(upper)
.map(|(cursor, storage)| (CursorFreeze::new(cursor, func.clone()), storage))
}
}
impl<Tr, F> TraceFreeze<Tr, F>
where
Tr: TraceReader,
Tr::Batch: Clone,
F: Fn(Tr::TimeGat<'_>)->Option<Tr::Time>,
{
pub fn make_from(trace: Tr, func: Rc<F>) -> Self {
Self { trace, func }
}
}
pub struct BatchFreeze<B, F> {
batch: B,
func: Rc<F>,
}
impl<B: Clone, F> Clone for BatchFreeze<B, F> {
fn clone(&self) -> Self {
BatchFreeze {
batch: self.batch.clone(),
func: self.func.clone(),
}
}
}
impl<B, F> BatchReader for BatchFreeze<B, F>
where
B: BatchReader,
F: Fn(B::TimeGat<'_>)->Option<B::Time>,
{
type Key<'a> = B::Key<'a>;
type Val<'a> = B::Val<'a>;
type Time = B::Time;
type TimeGat<'a> = B::TimeGat<'a>;
type Diff = B::Diff;
type DiffGat<'a> = B::DiffGat<'a>;
type Cursor = BatchCursorFreeze<B::Cursor, F>;
fn cursor(&self) -> Self::Cursor {
BatchCursorFreeze::new(self.batch.cursor(), self.func.clone())
}
fn len(&self) -> usize { self.batch.len() }
fn description(&self) -> &Description<B::Time> { self.batch.description() }
}
impl<B, F> BatchFreeze<B, F>
where
B: BatchReader,
F: Fn(B::TimeGat<'_>)->Option<B::Time>
{
pub fn make_from(batch: B, func: Rc<F>) -> Self {
Self { batch, func }
}
}
pub struct CursorFreeze<C, F> {
cursor: C,
func: Rc<F>,
}
impl<C, F> CursorFreeze<C, F> {
fn new(cursor: C, func: Rc<F>) -> Self {
Self { cursor, func }
}
}
impl<C, F> Cursor for CursorFreeze<C, F>
where
C: Cursor,
F: Fn(C::TimeGat<'_>)->Option<C::Time>,
{
type Key<'a> = C::Key<'a>;
type Val<'a> = C::Val<'a>;
type Time = C::Time;
type TimeGat<'a> = C::TimeGat<'a>;
type Diff = C::Diff;
type DiffGat<'a> = C::DiffGat<'a>;
type Storage = C::Storage;
#[inline] fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.key_valid(storage) }
#[inline] fn val_valid(&self, storage: &Self::Storage) -> bool { self.cursor.val_valid(storage) }
#[inline] fn key<'a>(&self, storage: &'a Self::Storage) -> Self::Key<'a> { self.cursor.key(storage) }
#[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a> { self.cursor.val(storage) }
#[inline] fn map_times<L: FnMut(Self::TimeGat<'_>, Self::DiffGat<'_>)>(&mut self, storage: &Self::Storage, mut logic: L) {
let func = &self.func;
self.cursor.map_times(storage, |time, diff| {
if let Some(time) = func(time) {
logic(<Self::TimeGat<'_> as IntoOwned>::borrow_as(&time), diff);
}
})
}
#[inline] fn step_key(&mut self, storage: &Self::Storage) { self.cursor.step_key(storage) }
#[inline] fn seek_key(&mut self, storage: &Self::Storage, key: Self::Key<'_>) { self.cursor.seek_key(storage, key) }
#[inline] fn step_val(&mut self, storage: &Self::Storage) { self.cursor.step_val(storage) }
#[inline] fn seek_val(&mut self, storage: &Self::Storage, val: Self::Val<'_>) { self.cursor.seek_val(storage, val) }
#[inline] fn rewind_keys(&mut self, storage: &Self::Storage) { self.cursor.rewind_keys(storage) }
#[inline] fn rewind_vals(&mut self, storage: &Self::Storage) { self.cursor.rewind_vals(storage) }
}
pub struct BatchCursorFreeze<C, F> {
cursor: C,
func: Rc<F>,
}
impl<C, F> BatchCursorFreeze<C, F> {
fn new(cursor: C, func: Rc<F>) -> Self {
Self { cursor, func }
}
}
impl<C: Cursor, F> Cursor for BatchCursorFreeze<C, F>
where
F: Fn(C::TimeGat<'_>)->Option<C::Time>,
{
type Key<'a> = C::Key<'a>;
type Val<'a> = C::Val<'a>;
type Time = C::Time;
type TimeGat<'a> = C::TimeGat<'a>;
type Diff = C::Diff;
type DiffGat<'a> = C::DiffGat<'a>;
type Storage = BatchFreeze<C::Storage, F>;
#[inline] fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.key_valid(&storage.batch) }
#[inline] fn val_valid(&self, storage: &Self::Storage) -> bool { self.cursor.val_valid(&storage.batch) }
#[inline] fn key<'a>(&self, storage: &'a Self::Storage) -> Self::Key<'a> { self.cursor.key(&storage.batch) }
#[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a> { self.cursor.val(&storage.batch) }
#[inline] fn map_times<L: FnMut(Self::TimeGat<'_>, Self::DiffGat<'_>)>(&mut self, storage: &Self::Storage, mut logic: L) {
let func = &self.func;
self.cursor.map_times(&storage.batch, |time, diff| {
if let Some(time) = func(time) {
logic(<Self::TimeGat<'_> as IntoOwned>::borrow_as(&time), diff);
}
})
}
#[inline] fn step_key(&mut self, storage: &Self::Storage) { self.cursor.step_key(&storage.batch) }
#[inline] fn seek_key(&mut self, storage: &Self::Storage, key: Self::Key<'_>) { self.cursor.seek_key(&storage.batch, key) }
#[inline] fn step_val(&mut self, storage: &Self::Storage) { self.cursor.step_val(&storage.batch) }
#[inline] fn seek_val(&mut self, storage: &Self::Storage, val: Self::Val<'_>) { self.cursor.seek_val(&storage.batch, val) }
#[inline] fn rewind_keys(&mut self, storage: &Self::Storage) { self.cursor.rewind_keys(&storage.batch) }
#[inline] fn rewind_vals(&mut self, storage: &Self::Storage) { self.cursor.rewind_vals(&storage.batch) }
}