differential_dataflow/operators/arrange/
writer.rs1use std::rc::{Rc, Weak};
7use std::cell::RefCell;
8
9use timely::progress::Antichain;
10
11use crate::trace::{Trace, Batch, BatchReader};
12use crate::trace::wrappers::rc::TraceBox;
13
14
15use super::TraceAgentQueueWriter;
16use super::TraceReplayInstruction;
17
18pub struct TraceWriter<Tr: Trace> {
23 upper: Antichain<Tr::Time>,
25 trace: Weak<RefCell<TraceBox<Tr>>>,
27 queues: Rc<RefCell<Vec<TraceAgentQueueWriter<Tr>>>>,
29}
30
31impl<Tr: Trace> TraceWriter<Tr> {
32 pub fn new(
34 upper: Vec<Tr::Time>,
35 trace: Weak<RefCell<TraceBox<Tr>>>,
36 queues: Rc<RefCell<Vec<TraceAgentQueueWriter<Tr>>>>
37 ) -> Self
38 {
39 let mut temp = Antichain::new();
40 temp.extend(upper);
41 Self { upper: temp, trace, queues }
42 }
43
44 pub fn exert(&mut self) {
46 if let Some(trace) = self.trace.upgrade() {
47 trace.borrow_mut().trace.exert();
48 }
49 }
50
51 pub fn insert(&mut self, batch: Tr::Batch, hint: Option<Tr::Time>) {
57
58 if !(&self.upper == batch.lower()) {
60 println!("{:?} vs {:?}", self.upper, batch.lower());
61 }
62 assert!(&self.upper == batch.lower());
63 assert!(batch.lower() != batch.upper());
64
65 self.upper.clone_from(batch.upper());
66
67 let mut borrow = self.queues.borrow_mut();
69 for queue in borrow.iter_mut() {
70 if let Some(pair) = queue.upgrade() {
71 pair.1.borrow_mut().push_back(TraceReplayInstruction::Batch(batch.clone(), hint.clone()));
72 pair.1.borrow_mut().push_back(TraceReplayInstruction::Frontier(batch.upper().clone()));
73 pair.0.activate();
74 }
75 }
76 borrow.retain(|w| w.upgrade().is_some());
77
78 if let Some(trace) = self.trace.upgrade() {
80 trace.borrow_mut().trace.insert(batch);
81 }
82
83 }
84
85 pub fn seal(&mut self, upper: Antichain<Tr::Time>) {
87 if self.upper != upper {
88 self.insert(Tr::Batch::empty(self.upper.clone(), upper), None);
89 }
90 }
91}
92
93impl<Tr: Trace> Drop for TraceWriter<Tr> {
94 fn drop(&mut self) {
95 self.seal(Antichain::new())
96 }
97}