timely/dataflow/operators/generic/
handles.rs1use std::rc::Rc;
7use std::cell::RefCell;
8use std::collections::VecDeque;
9
10use crate::progress::Timestamp;
11use crate::progress::ChangeBatch;
12use crate::progress::operate::PortConnectivity;
13use crate::dataflow::channels::pullers::Counter as PullCounter;
14use crate::dataflow::channels::Message;
15use crate::communication::Pull;
16use crate::{Container, ContainerBuilder, Accountable};
17use crate::container::{CapacityContainerBuilder, PushInto};
18
19use crate::dataflow::operators::InputCapability;
20use crate::dataflow::operators::capability::CapabilityTrait;
21
22pub struct InputHandleCore<T: Timestamp, C, P: Pull<Message<T, C>>> {
24 pull_counter: PullCounter<T, C, P>,
25 internal: Rc<RefCell<Vec<Rc<RefCell<ChangeBatch<T>>>>>>,
26 summaries: Rc<RefCell<PortConnectivity<T::Summary>>>,
31 staging: VecDeque<(InputCapability<T>, C)>,
33 staged: Vec<C>,
34}
35
36impl<T: Timestamp, C: Accountable, P: Pull<Message<T, C>>> InputHandleCore<T, C, P> {
37 #[inline]
41 fn next(&mut self) -> Option<(InputCapability<T>, &mut C)> {
42 let internal = &self.internal;
43 let summaries = &self.summaries;
44 self.pull_counter.next_guarded().map(|(guard, bundle)| {
45 (InputCapability::new(Rc::clone(internal), Rc::clone(summaries), guard), &mut bundle.data)
46 })
47 }
48 pub fn for_each<F>(&mut self, mut logic: F) where F: FnMut(InputCapability<T>, &mut C) {
53 while let Some((cap, data)) = self.next() { logic(cap, data); }
54 }
55 pub fn for_each_time<F>(&mut self, mut logic: F) where F: FnMut(InputCapability<T>, std::slice::IterMut::<C>), C: Default {
57 while let Some((cap, data)) = self.next() {
58 let data = std::mem::take(data);
59 self.staging.push_back((cap, data));
60 }
61 self.staging.make_contiguous().sort_unstable_by(|x,y| x.0.time().cmp(&y.0.time()));
62
63 while let Some((cap, data)) = self.staging.pop_front() {
64 self.staged.push(data);
65 let more = self.staging.iter().take_while(|(c,_)| c.time() == cap.time()).count();
66 self.staged.extend(self.staging.drain(..more).map(|(_,d)| d));
67 logic(cap, self.staged.iter_mut());
68 self.staged.clear();
70 }
71 }
72}
73
74pub fn new_input_handle<T: Timestamp, C: Accountable, P: Pull<Message<T, C>>>(
77 pull_counter: PullCounter<T, C, P>,
78 internal: Rc<RefCell<Vec<Rc<RefCell<ChangeBatch<T>>>>>>,
79 summaries: Rc<RefCell<PortConnectivity<T::Summary>>>,
80) -> InputHandleCore<T, C, P> {
81 InputHandleCore {
82 pull_counter,
83 internal,
84 summaries,
85 staging: Default::default(),
86 staged: Default::default(),
87 }
88}
89
90pub struct OutputBuilder<T: Timestamp, CB: ContainerBuilder> {
92 output: crate::dataflow::channels::pushers::Output<T, CB::Container>,
93 builder: CB,
94}
95
96impl<T: Timestamp, CB: ContainerBuilder> OutputBuilder<T, CB> {
97 pub fn from(output: crate::dataflow::channels::pushers::Output<T, CB::Container>) -> Self {
99 Self { output, builder: CB::default() }
100 }
101 pub fn activate<'a>(&'a mut self) -> OutputBuilderSession<'a, T, CB> {
103 OutputBuilderSession {
104 session: self.output.activate(),
105 builder: &mut self.builder,
106 }
107 }
108}
109
110pub struct OutputBuilderSession<'a, T: Timestamp, CB: ContainerBuilder> {
112 session: crate::dataflow::channels::pushers::OutputSession<'a, T, CB::Container>,
113 builder: &'a mut CB,
114}
115
116impl<'a, T: Timestamp, CB: ContainerBuilder> OutputBuilderSession<'a, T, CB> {
117 #[inline]
119 pub fn output_index(&self) -> usize { self.session.output_index()}
120
121 pub fn session_with_builder<'b, CT: CapabilityTrait<T>>(&'b mut self, capability: &'b CT) -> Session<'a, 'b, T, CB, CT> where 'a: 'b {
126 debug_assert!(self.session.valid(capability));
127 Session {
128 buffer: self,
129 capability,
130 }
131 }
132}
133
134impl<'a, T: Timestamp, C: Container> OutputBuilderSession<'a, T, CapacityContainerBuilder<C>> {
135 pub fn session<'b, CT: CapabilityTrait<T>>(&'b mut self, capability: &'b CT) -> Session<'a, 'b, T, CapacityContainerBuilder<C>, CT> where 'a: 'b {
140 debug_assert!(self.session.valid(capability));
141 Session {
142 buffer: self,
143 capability,
144 }
145 }
146}
147
148pub struct Session<'a: 'b, 'b, T: Timestamp, CB: ContainerBuilder, CT: CapabilityTrait<T>> {
150 buffer: &'b mut OutputBuilderSession<'a, T, CB>,
151 capability: &'b CT,
152}
153
154impl<'a: 'b, 'b, T: Timestamp, CB: ContainerBuilder, CT: CapabilityTrait<T>> Session<'a, 'b, T, CB, CT> {
155
156 pub fn builder(&mut self) -> &mut CB { &mut self.buffer.builder }
158
159 #[inline] pub fn give<D>(&mut self, data: D) where CB: PushInto<D> {
161 self.buffer.builder.push_into(data);
162 self.extract_and_send();
163 }
164 #[inline] pub fn give_iterator<I>(&mut self, iter: I) where I: Iterator, CB: PushInto<I::Item> {
166 for item in iter { self.buffer.builder.push_into(item); }
167 self.extract_and_send();
168 }
169 #[inline] pub fn give_container(&mut self, container: &mut CB::Container) {
171 self.buffer.session.give(&self.capability, container);
172 }
173 #[inline] pub fn give_containers<'c>(&mut self, containers: impl Iterator<Item = &'c mut CB::Container>) {
175 for container in containers { self.buffer.session.give(&self.capability, container); }
176 }
177
178 pub fn extract_and_send(&mut self) {
180 while let Some(container) = self.buffer.builder.extract() {
181 self.buffer.session.give(&self.capability, container);
182 }
183 }
184 pub fn flush(&mut self) {
186 while let Some(container) = self.buffer.builder.finish() {
187 self.buffer.session.give(&self.capability, container);
188 }
189 }
190}
191
192impl<'a: 'b, 'b, T: Timestamp, CB: ContainerBuilder, CT: CapabilityTrait<T>> Drop for Session<'a, 'b, T, CB, CT> {
193 fn drop(&mut self) { self.flush() }
194}