use std::rc::Rc;
use std::cell::RefCell;
use timely::progress::{Antichain, frontier::{AntichainRef, MutableAntichain}};
use crate::trace::TraceReader;
pub struct TraceBox<Tr: TraceReader> {
pub logical_compaction: MutableAntichain<Tr::Time>,
pub physical_compaction: MutableAntichain<Tr::Time>,
pub trace: Tr,
}
impl<Tr: TraceReader> TraceBox<Tr> {
pub fn new(mut trace: Tr) -> Self {
let mut logical_compaction = MutableAntichain::new();
logical_compaction.update_iter(trace.get_logical_compaction().iter().cloned().map(|t| (t,1)));
let mut physical_compaction = MutableAntichain::new();
physical_compaction.update_iter(trace.get_physical_compaction().iter().cloned().map(|t| (t,1)));
TraceBox {
logical_compaction,
physical_compaction,
trace,
}
}
#[inline]
pub fn adjust_logical_compaction(&mut self, lower: AntichainRef<Tr::Time>, upper: AntichainRef<Tr::Time>) {
self.logical_compaction.update_iter(upper.iter().cloned().map(|t| (t,1)));
self.logical_compaction.update_iter(lower.iter().cloned().map(|t| (t,-1)));
self.trace.set_logical_compaction(self.logical_compaction.frontier());
}
#[inline]
pub fn adjust_physical_compaction(&mut self, lower: AntichainRef<Tr::Time>, upper: AntichainRef<Tr::Time>) {
self.physical_compaction.update_iter(upper.iter().cloned().map(|t| (t,1)));
self.physical_compaction.update_iter(lower.iter().cloned().map(|t| (t,-1)));
self.trace.set_physical_compaction(self.physical_compaction.frontier());
}
}
pub struct TraceRc<Tr: TraceReader> {
logical_compaction: Antichain<Tr::Time>,
physical_compaction: Antichain<Tr::Time>,
pub wrapper: Rc<RefCell<TraceBox<Tr>>>,
}
impl<Tr: TraceReader> TraceReader for TraceRc<Tr> {
type Key<'a> = Tr::Key<'a>;
type Val<'a> = Tr::Val<'a>;
type Time = Tr::Time;
type TimeGat<'a> = Tr::TimeGat<'a>;
type Diff = Tr::Diff;
type DiffGat<'a> = Tr::DiffGat<'a>;
type Batch = Tr::Batch;
type Storage = Tr::Storage;
type Cursor = Tr::Cursor;
fn set_logical_compaction(&mut self, frontier: AntichainRef<Tr::Time>) {
self.wrapper.borrow_mut().adjust_logical_compaction(self.logical_compaction.borrow(), frontier);
self.logical_compaction = frontier.to_owned();
}
fn get_logical_compaction(&mut self) -> AntichainRef<Tr::Time> { self.logical_compaction.borrow() }
fn set_physical_compaction(&mut self, frontier: AntichainRef<Tr::Time>) {
self.wrapper.borrow_mut().adjust_physical_compaction(self.physical_compaction.borrow(), frontier);
self.physical_compaction = frontier.to_owned();
}
fn get_physical_compaction(&mut self) -> AntichainRef<Tr::Time> { self.physical_compaction.borrow() }
fn cursor_through(&mut self, frontier: AntichainRef<Tr::Time>) -> Option<(Tr::Cursor, Tr::Storage)> {
::std::cell::RefCell::borrow_mut(&self.wrapper).trace.cursor_through(frontier)
}
fn map_batches<F: FnMut(&Self::Batch)>(&self, f: F) {
::std::cell::RefCell::borrow(&self.wrapper).trace.map_batches(f)
}
}
impl<Tr: TraceReader> TraceRc<Tr> {
pub fn make_from(trace: Tr) -> (Self, Rc<RefCell<TraceBox<Tr>>>) {
let wrapped = Rc::new(RefCell::new(TraceBox::new(trace)));
let handle = TraceRc {
logical_compaction: wrapped.borrow().logical_compaction.frontier().to_owned(),
physical_compaction: wrapped.borrow().physical_compaction.frontier().to_owned(),
wrapper: wrapped.clone(),
};
(handle, wrapped)
}
}
impl<Tr: TraceReader> Clone for TraceRc<Tr> {
fn clone(&self) -> Self {
self.wrapper.borrow_mut().adjust_logical_compaction(Antichain::new().borrow(), self.logical_compaction.borrow());
self.wrapper.borrow_mut().adjust_physical_compaction(Antichain::new().borrow(), self.physical_compaction.borrow());
TraceRc {
logical_compaction: self.logical_compaction.clone(),
physical_compaction: self.physical_compaction.clone(),
wrapper: self.wrapper.clone(),
}
}
}
impl<Tr: TraceReader> Drop for TraceRc<Tr> {
fn drop(&mut self) {
self.wrapper.borrow_mut().adjust_logical_compaction(self.logical_compaction.borrow(), Antichain::new().borrow());
self.wrapper.borrow_mut().adjust_physical_compaction(self.physical_compaction.borrow(), Antichain::new().borrow());
self.logical_compaction = Antichain::new();
self.physical_compaction = Antichain::new();
}
}