1//! Buffering and session mechanisms to provide the appearance of record-at-a-time sending,
2//! with the performance of batched sends.
34use crate::communication::Push;
5use crate::container::{ContainerBuilder, CapacityContainerBuilder, PushInto};
6use crate::dataflow::channels::Message;
7use crate::dataflow::operators::Capability;
8use crate::progress::Timestamp;
9use crate::{Container, Data};
/// Accumulates records destined for one timestamp so they can be shipped in batches.
///
/// A `Buffer` is meant to be driven through a session (`session`, `session_with_builder`,
/// or their autoflush variants): opening a session at a time flushes whatever is still
/// pending for a *different* time and yields a handle that sends at the requested time.
#[derive(Debug)]
pub struct Buffer<T, CB, P> {
    /// The time most recently opened by a session; `None` until a session is first opened.
    time: Option<T>,
    /// Container builder collecting the records to be sent at `self.time`.
    builder: CB,
    /// Downstream destination that receives the finished containers.
    pusher: P,
}
2425impl<T, CB: Default, P> Buffer<T, CB, P> {
26/// Creates a new `Buffer`.
27pub fn new(pusher: P) -> Self {
28Self {
29 time: None,
30 builder: Default::default(),
31 pusher,
32 }
33 }
3435/// Returns a reference to the inner `P: Push` type.
36 ///
37 /// This is currently used internally, and should not be used without some care.
38pub fn inner(&mut self) -> &mut P { &mut self.pusher }
3940/// Access the builder. Immutable access to prevent races with flushing
41 /// the underlying buffer.
42pub fn builder(&self) -> &CB {
43&self.builder
44 }
45}
4647impl<T, C: Container + Data, P: Push<Message<T, C>>> Buffer<T, CapacityContainerBuilder<C>, P> where T: Eq+Clone {
48/// Returns a `Session`, which accepts data to send at the associated time
49#[inline]
50pub fn session(&mut self, time: &T) -> Session<T, CapacityContainerBuilder<C>, P> {
51self.session_with_builder(time)
52 }
5354/// Allocates a new `AutoflushSession` which flushes itself on drop.
55#[inline]
56pub fn autoflush_session(&mut self, cap: Capability<T>) -> AutoflushSession<T, CapacityContainerBuilder<C>, P> where T: Timestamp {
57self.autoflush_session_with_builder(cap)
58 }
59}
6061impl<T, CB: ContainerBuilder, P: Push<Message<T, CB::Container>>> Buffer<T, CB, P> where T: Eq+Clone {
62/// Returns a `Session`, which accepts data to send at the associated time
63pub fn session_with_builder(&mut self, time: &T) -> Session<T, CB, P> {
64if let Some(true) = self.time.as_ref().map(|x| x != time) { self.flush(); }
65self.time = Some(time.clone());
66 Session { buffer: self }
67 }
6869/// Allocates a new `AutoflushSession` which flushes itself on drop.
70pub fn autoflush_session_with_builder(&mut self, cap: Capability<T>) -> AutoflushSession<T, CB, P> where T: Timestamp {
71if let Some(true) = self.time.as_ref().map(|x| x != cap.time()) { self.flush(); }
72self.time = Some(cap.time().clone());
73 AutoflushSession {
74 buffer: self,
75 _capability: cap,
76 }
77 }
78}
7980impl<T, CB: ContainerBuilder, P: Push<Message<T, CB::Container>>> Buffer<T, CB, P> where T: Eq+Clone {
81/// Flushes all data and pushes a `None` to `self.pusher`, indicating a flush.
82pub fn cease(&mut self) {
83self.flush();
84self.pusher.push(&mut None);
85 }
8687/// Extract pending data from the builder, but not forcing a flush.
88#[inline]
89fn extract_and_send(&mut self) {
90while let Some(container) = self.builder.extract() {
91let time = self.time.as_ref().unwrap().clone();
92 Message::push_at(container, time, &mut self.pusher);
93 }
94 }
9596/// Flush the builder, forcing all its contents to be written.
97#[inline]
98fn flush(&mut self) {
99while let Some(container) = self.builder.finish() {
100let time = self.time.as_ref().unwrap().clone();
101 Message::push_at(container, time, &mut self.pusher);
102 }
103 }
104105/// Gives an entire container at the current time. Maintains FIFO order with previously pushed
106 /// data. Only intended to be used through [`Session::give_container`].
107// TODO: This method could exist without a container builder, but we can't express this as a
108 // buffer always requires a container builder. We could expose the buffer's underlying pusher
109 // directly, but this would bypass the buffer's time tracking.
110fn give_container(&mut self, container: &mut CB::Container) {
111if !container.is_empty() {
112self.flush();
113let time = self.time.as_ref().unwrap().clone();
114 Message::push_at(container, time, &mut self.pusher);
115 }
116 }
117118/// An internal implementation of push that should only be called by sessions.
119#[inline]
120fn push_internal<D>(&mut self, item: D) where CB: PushInto<D> {
121self.builder.push_into(item);
122self.extract_and_send();
123 }
124}
125126/// An output session for sending records at a specified time.
127///
128/// The `Session` struct provides the user-facing interface to an operator output, namely
129/// the `Buffer` type. A `Session` wraps a session of output at a specified time, and
130/// avoids what would otherwise be a constant cost of checking timestamp equality.
131pub struct Session<'a, T, CB, P> {
132 buffer: &'a mut Buffer<T, CB, P>,
133}
134135impl<'a, T, CB: ContainerBuilder, P> Session<'a, T, CB, P>
136where
137T: Eq + Clone + 'a,
138 P: Push<Message<T, CB::Container>> + 'a,
139{
140/// Provide a container at the time specified by the [Session]. Maintains FIFO order with
141 /// previously pushed data.
142pub fn give_container(&mut self, container: &mut CB::Container) {
143self.buffer.give_container(container)
144 }
145}
146147impl<'a, T, CB, P> Session<'a, T, CB, P>
148where
149T: Eq + Clone + 'a,
150 CB: ContainerBuilder + 'a,
151 P: Push<Message<T, CB::Container>> + 'a
152{
153/// Access the builder. Immutable access to prevent races with flushing
154 /// the underlying buffer.
155pub fn builder(&self) -> &CB {
156self.buffer.builder()
157 }
158159/// Provides one record at the time specified by the `Session`.
160#[inline]
161pub fn give<D>(&mut self, data: D) where CB: PushInto<D> {
162self.push_into(data);
163 }
164165/// Provides an iterator of records at the time specified by the `Session`.
166#[inline]
167pub fn give_iterator<I>(&mut self, iter: I)
168where
169I: Iterator,
170 CB: PushInto<I::Item>,
171 {
172for item in iter {
173self.push_into(item);
174 }
175 }
176}
177178impl<'a, T, CB, P, D> PushInto<D> for Session<'a, T, CB, P>
179where
180T: Eq + Clone + 'a,
181 CB: ContainerBuilder + PushInto<D> + 'a,
182 P: Push<Message<T, CB::Container>> + 'a,
183{
184#[inline]
185fn push_into(&mut self, item: D) {
186self.buffer.push_internal(item);
187 }
188}
189190/// A session which will flush itself when dropped.
191pub struct AutoflushSession<'a, T, CB, P>
192where
193T: Timestamp + 'a,
194 CB: ContainerBuilder + 'a,
195 P: Push<Message<T, CB::Container>> + 'a,
196{
197/// A reference to the underlying buffer.
198buffer: &'a mut Buffer<T, CB, P>,
199/// The capability being used to send the data.
200_capability: Capability<T>,
201}
202203impl<'a, T, CB, P> AutoflushSession<'a, T, CB, P>
204where
205T: Timestamp + 'a,
206 CB: ContainerBuilder + 'a,
207 P: Push<Message<T, CB::Container>> + 'a,
208{
209/// Transmits a single record.
210#[inline]
211pub fn give<D>(&mut self, data: D)
212where
213CB: PushInto<D>,
214 {
215self.push_into(data);
216 }
217218/// Transmits records produced by an iterator.
219#[inline]
220pub fn give_iterator<I, D>(&mut self, iter: I)
221where
222I: Iterator<Item=D>,
223 CB: PushInto<D>,
224 {
225for item in iter {
226self.push_into(item);
227 }
228 }
229}
230impl<'a, T, CB, P, D> PushInto<D> for AutoflushSession<'a, T, CB, P>
231where
232T: Timestamp + 'a,
233 CB: ContainerBuilder + PushInto<D> + 'a,
234 P: Push<Message<T, CB::Container>> + 'a,
235{
236#[inline]
237fn push_into(&mut self, item: D) {
238self.buffer.push_internal(item);
239 }
240}
241242impl<'a, T, CB, P> Drop for AutoflushSession<'a, T, CB, P>
243where
244T: Timestamp + 'a,
245 CB: ContainerBuilder + 'a,
246 P: Push<Message<T, CB::Container>> + 'a,
247{
248fn drop(&mut self) {
249self.buffer.cease();
250 }
251}