use std::rc::{Rc, Weak};
use std::cell::RefCell;
use timely::progress::{Antichain, Timestamp};
use crate::trace::{Trace, Batch, BatchReader};
use crate::trace::wrappers::rc::TraceBox;
use super::TraceAgentQueueWriter;
use super::TraceReplayInstruction;
pub struct TraceWriter<Tr>
where
Tr: Trace,
Tr::Batch: Batch,
{
upper: Antichain<Tr::Time>,
trace: Weak<RefCell<TraceBox<Tr>>>,
queues: Rc<RefCell<Vec<TraceAgentQueueWriter<Tr>>>>,
}
impl<Tr> TraceWriter<Tr>
where
Tr: Trace,
Tr::Batch: Batch,
{
pub fn new(
upper: Vec<Tr::Time>,
trace: Weak<RefCell<TraceBox<Tr>>>,
queues: Rc<RefCell<Vec<TraceAgentQueueWriter<Tr>>>>
) -> Self
{
let mut temp = Antichain::new();
temp.extend(upper);
Self { upper: temp, trace, queues }
}
pub fn exert(&mut self) {
if let Some(trace) = self.trace.upgrade() {
trace.borrow_mut().trace.exert();
}
}
pub fn insert(&mut self, batch: Tr::Batch, hint: Option<Tr::Time>) {
if !(&self.upper == batch.lower()) {
println!("{:?} vs {:?}", self.upper, batch.lower());
}
assert!(&self.upper == batch.lower());
assert!(batch.lower() != batch.upper());
self.upper.clone_from(batch.upper());
let mut borrow = self.queues.borrow_mut();
for queue in borrow.iter_mut() {
if let Some(pair) = queue.upgrade() {
pair.1.borrow_mut().push_back(TraceReplayInstruction::Batch(batch.clone(), hint.clone()));
pair.1.borrow_mut().push_back(TraceReplayInstruction::Frontier(batch.upper().clone()));
pair.0.activate();
}
}
borrow.retain(|w| w.upgrade().is_some());
if let Some(trace) = self.trace.upgrade() {
trace.borrow_mut().trace.insert(batch);
}
}
pub fn seal(&mut self, upper: Antichain<Tr::Time>) {
if self.upper != upper {
use crate::trace::Builder;
let builder = Tr::Builder::new();
let batch = builder.done(self.upper.clone(), upper, Antichain::from_elem(Tr::Time::minimum()));
self.insert(batch, None);
}
}
}
impl<Tr> Drop for TraceWriter<Tr>
where
Tr: Trace,
Tr::Batch: Batch,
{
fn drop(&mut self) {
self.seal(Antichain::new())
}
}