timely/dataflow/channels/pushers/
buffer.rs1use crate::communication::Push;
5use crate::container::{ContainerBuilder, CapacityContainerBuilder, PushInto};
6use crate::dataflow::channels::Message;
7use crate::dataflow::operators::Capability;
8use crate::progress::Timestamp;
9use crate::{Container, Data};
10
/// Staging area that batches records for a single timestamp before they are
/// handed to a pusher.
///
/// A session opened on the buffer records the time it sends at; pushed items
/// are accumulated by the container builder and leave through the pusher.
#[derive(Debug)]
pub struct Buffer<T, CB, P> {
    /// Time of the most recently opened session; `None` until one is opened.
    time: Option<T>,
    /// Builder that accumulates pushed items into containers.
    builder: CB,
    /// Destination that receives each completed container.
    pusher: P,
}
24
25impl<T, CB: Default, P> Buffer<T, CB, P> {
26 pub fn new(pusher: P) -> Self {
28 Self {
29 time: None,
30 builder: Default::default(),
31 pusher,
32 }
33 }
34
35 pub fn inner(&mut self) -> &mut P { &mut self.pusher }
39
40 pub fn builder(&self) -> &CB {
43 &self.builder
44 }
45}
46
47impl<T, C: Container + Data, P: Push<Message<T, C>>> Buffer<T, CapacityContainerBuilder<C>, P> where T: Eq+Clone {
48 #[inline]
50 pub fn session(&mut self, time: &T) -> Session<'_, T, CapacityContainerBuilder<C>, P> {
51 self.session_with_builder(time)
52 }
53
54 #[inline]
56 pub fn autoflush_session(&mut self, cap: Capability<T>) -> AutoflushSession<'_, T, CapacityContainerBuilder<C>, P> where T: Timestamp {
57 self.autoflush_session_with_builder(cap)
58 }
59}
60
61impl<T, CB: ContainerBuilder, P: Push<Message<T, CB::Container>>> Buffer<T, CB, P> where T: Eq+Clone {
62 pub fn session_with_builder(&mut self, time: &T) -> Session<'_, T, CB, P> {
64 if let Some(true) = self.time.as_ref().map(|x| x != time) { self.flush(); }
65 self.time = Some(time.clone());
66 Session { buffer: self }
67 }
68
69 pub fn autoflush_session_with_builder(&mut self, cap: Capability<T>) -> AutoflushSession<'_, T, CB, P> where T: Timestamp {
71 if let Some(true) = self.time.as_ref().map(|x| x != cap.time()) { self.flush(); }
72 self.time = Some(cap.time().clone());
73 AutoflushSession {
74 buffer: self,
75 _capability: cap,
76 }
77 }
78}
79
80impl<T, CB: ContainerBuilder, P: Push<Message<T, CB::Container>>> Buffer<T, CB, P> where T: Eq+Clone {
81 pub fn cease(&mut self) {
83 self.flush();
84 self.builder.relax();
85 self.pusher.push(&mut None);
86 }
87
88 #[inline]
90 fn extract_and_send(&mut self) {
91 while let Some(container) = self.builder.extract() {
92 let time = self.time.as_ref().unwrap().clone();
93 Message::push_at(container, time, &mut self.pusher);
94 }
95 }
96
97 #[inline]
99 fn flush(&mut self) {
100 while let Some(container) = self.builder.finish() {
101 let time = self.time.as_ref().unwrap().clone();
102 Message::push_at(container, time, &mut self.pusher);
103 }
104 }
105
106 fn give_container(&mut self, container: &mut CB::Container) {
112 if !container.is_empty() {
113 self.flush();
114 let time = self.time.as_ref().unwrap().clone();
115 Message::push_at(container, time, &mut self.pusher);
116 }
117 }
118
119 #[inline]
121 fn push_internal<D>(&mut self, item: D) where CB: PushInto<D> {
122 self.builder.push_into(item);
123 self.extract_and_send();
124 }
125}
126
127pub struct Session<'a, T, CB, P> {
133 buffer: &'a mut Buffer<T, CB, P>,
134}
135
136impl<'a, T, CB: ContainerBuilder, P> Session<'a, T, CB, P>
137where
138 T: Eq + Clone + 'a,
139 P: Push<Message<T, CB::Container>> + 'a,
140{
141 pub fn give_container(&mut self, container: &mut CB::Container) {
144 self.buffer.give_container(container)
145 }
146}
147
148impl<'a, T, CB, P> Session<'a, T, CB, P>
149where
150 T: Eq + Clone + 'a,
151 CB: ContainerBuilder + 'a,
152 P: Push<Message<T, CB::Container>> + 'a
153{
154 pub fn builder(&self) -> &CB {
157 self.buffer.builder()
158 }
159
160 #[inline]
162 pub fn give<D>(&mut self, data: D) where CB: PushInto<D> {
163 self.push_into(data);
164 }
165
166 #[inline]
168 pub fn give_iterator<I>(&mut self, iter: I)
169 where
170 I: Iterator,
171 CB: PushInto<I::Item>,
172 {
173 for item in iter {
174 self.push_into(item);
175 }
176 }
177}
178
179impl<'a, T, CB, P, D> PushInto<D> for Session<'a, T, CB, P>
180where
181 T: Eq + Clone + 'a,
182 CB: ContainerBuilder + PushInto<D> + 'a,
183 P: Push<Message<T, CB::Container>> + 'a,
184{
185 #[inline]
186 fn push_into(&mut self, item: D) {
187 self.buffer.push_internal(item);
188 }
189}
190
191pub struct AutoflushSession<'a, T, CB, P>
193where
194 T: Timestamp + 'a,
195 CB: ContainerBuilder + 'a,
196 P: Push<Message<T, CB::Container>> + 'a,
197{
198 buffer: &'a mut Buffer<T, CB, P>,
200 _capability: Capability<T>,
202}
203
204impl<'a, T, CB, P> AutoflushSession<'a, T, CB, P>
205where
206 T: Timestamp + 'a,
207 CB: ContainerBuilder + 'a,
208 P: Push<Message<T, CB::Container>> + 'a,
209{
210 #[inline]
212 pub fn give<D>(&mut self, data: D)
213 where
214 CB: PushInto<D>,
215 {
216 self.push_into(data);
217 }
218
219 #[inline]
221 pub fn give_iterator<I, D>(&mut self, iter: I)
222 where
223 I: Iterator<Item=D>,
224 CB: PushInto<D>,
225 {
226 for item in iter {
227 self.push_into(item);
228 }
229 }
230}
231impl<'a, T, CB, P, D> PushInto<D> for AutoflushSession<'a, T, CB, P>
232where
233 T: Timestamp + 'a,
234 CB: ContainerBuilder + PushInto<D> + 'a,
235 P: Push<Message<T, CB::Container>> + 'a,
236{
237 #[inline]
238 fn push_into(&mut self, item: D) {
239 self.buffer.push_internal(item);
240 }
241}
242
243impl<'a, T, CB, P> Drop for AutoflushSession<'a, T, CB, P>
244where
245 T: Timestamp + 'a,
246 CB: ContainerBuilder + 'a,
247 P: Push<Message<T, CB::Container>> + 'a,
248{
249 fn drop(&mut self) {
250 self.buffer.cease();
251 }
252}