1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109
//! Write endpoint for a sequence of batches.
//!
//! A `TraceWriter` accepts a sequence of batches and distributes them
//! to both a shared trace and to a sequence of private queues.
use std::rc::{Rc, Weak};
use std::cell::RefCell;
use timely::progress::Antichain;
use crate::trace::{Trace, Batch, BatchReader};
use crate::trace::wrappers::rc::TraceBox;
use super::TraceAgentQueueWriter;
use super::TraceReplayInstruction;
/// Write endpoint for a sequence of batches.
///
/// A `TraceWriter` accepts a sequence of batches and distributes them
/// to both a shared trace and to a sequence of private queues.
pub struct TraceWriter<Tr>
where
Tr: Trace,
Tr::Batch: Batch,
{
/// Current upper limit.
upper: Antichain<Tr::Time>,
/// Shared trace, possibly absent (due to weakness).
trace: Weak<RefCell<TraceBox<Tr>>>,
/// A sequence of private queues into which batches are written.
queues: Rc<RefCell<Vec<TraceAgentQueueWriter<Tr>>>>,
}
impl<Tr> TraceWriter<Tr>
where
Tr: Trace,
Tr::Batch: Batch,
{
/// Creates a new `TraceWriter`.
pub fn new(
upper: Vec<Tr::Time>,
trace: Weak<RefCell<TraceBox<Tr>>>,
queues: Rc<RefCell<Vec<TraceAgentQueueWriter<Tr>>>>
) -> Self
{
let mut temp = Antichain::new();
temp.extend(upper);
Self { upper: temp, trace, queues }
}
/// Exerts merge effort, even without additional updates.
pub fn exert(&mut self) {
if let Some(trace) = self.trace.upgrade() {
trace.borrow_mut().trace.exert();
}
}
/// Advances the trace by `batch`.
///
/// The `hint` argument is either `None` in the case of an empty batch,
/// or is `Some(time)` for a time less or equal to all updates in the
/// batch and which is suitable for use as a capability.
pub fn insert(&mut self, batch: Tr::Batch, hint: Option<Tr::Time>) {
// Something is wrong if not a sequence.
if !(&self.upper == batch.lower()) {
println!("{:?} vs {:?}", self.upper, batch.lower());
}
assert!(&self.upper == batch.lower());
assert!(batch.lower() != batch.upper());
self.upper.clone_from(batch.upper());
// push information to each listener that still exists.
let mut borrow = self.queues.borrow_mut();
for queue in borrow.iter_mut() {
if let Some(pair) = queue.upgrade() {
pair.1.borrow_mut().push_back(TraceReplayInstruction::Batch(batch.clone(), hint.clone()));
pair.1.borrow_mut().push_back(TraceReplayInstruction::Frontier(batch.upper().clone()));
pair.0.activate();
}
}
borrow.retain(|w| w.upgrade().is_some());
// push data to the trace, if it still exists.
if let Some(trace) = self.trace.upgrade() {
trace.borrow_mut().trace.insert(batch);
}
}
/// Inserts an empty batch up to `upper`.
pub fn seal(&mut self, upper: Antichain<Tr::Time>) {
if self.upper != upper {
self.insert(Tr::Batch::empty(self.upper.clone(), upper), None);
}
}
}
impl<Tr> Drop for TraceWriter<Tr>
where
Tr: Trace,
Tr::Batch: Batch,
{
fn drop(&mut self) {
self.seal(Antichain::new())
}
}