timely/dataflow/channels/pushers/buffer.rs

//! Buffering and session mechanisms to provide the appearance of record-at-a-time sending,
//! with the performance of batched sends.
3
4use crate::communication::Push;
5use crate::container::{ContainerBuilder, CapacityContainerBuilder, PushInto};
6use crate::dataflow::channels::Message;
7use crate::dataflow::operators::Capability;
8use crate::progress::Timestamp;
9use crate::{Container, Data};
10
/// Accumulates records that share a timestamp so they can be sent downstream in batches.
///
/// Obtain a [`Session`] by calling `session` with a time. If a different time is currently
/// open, any data buffered for it is flushed first; the returned session then accepts
/// records for the requested time.
#[derive(Debug)]
pub struct Buffer<T, CB, P> {
    /// The time at which buffered data is sent; `None` until a session has been opened.
    time: Option<T>,
    /// Builds containers from pushed records; their contents are sent at `self.time`.
    builder: CB,
    /// Downstream destination for completed containers.
    pusher: P,
}
24
25impl<T, CB: Default, P> Buffer<T, CB, P> {
26    /// Creates a new `Buffer`.
27    pub fn new(pusher: P) -> Self {
28        Self {
29            time: None,
30            builder: Default::default(),
31            pusher,
32        }
33    }
34
35    /// Returns a reference to the inner `P: Push` type.
36    ///
37    /// This is currently used internally, and should not be used without some care.
38    pub fn inner(&mut self) -> &mut P { &mut self.pusher }
39
40    /// Access the builder. Immutable access to prevent races with flushing
41    /// the underlying buffer.
42    pub fn builder(&self) -> &CB {
43        &self.builder
44    }
45}
46
47impl<T, C: Container + Data, P: Push<Message<T, C>>> Buffer<T, CapacityContainerBuilder<C>, P> where T: Eq+Clone {
48    /// Returns a `Session`, which accepts data to send at the associated time
49    #[inline]
50    pub fn session(&mut self, time: &T) -> Session<'_, T, CapacityContainerBuilder<C>, P> {
51        self.session_with_builder(time)
52    }
53
54    /// Allocates a new `AutoflushSession` which flushes itself on drop.
55    #[inline]
56    pub fn autoflush_session(&mut self, cap: Capability<T>) -> AutoflushSession<'_, T, CapacityContainerBuilder<C>, P> where T: Timestamp {
57        self.autoflush_session_with_builder(cap)
58    }
59}
60
61impl<T, CB: ContainerBuilder, P: Push<Message<T, CB::Container>>> Buffer<T, CB, P> where T: Eq+Clone {
62    /// Returns a `Session`, which accepts data to send at the associated time
63    pub fn session_with_builder(&mut self, time: &T) -> Session<'_, T, CB, P> {
64        if let Some(true) = self.time.as_ref().map(|x| x != time) { self.flush(); }
65        self.time = Some(time.clone());
66        Session { buffer: self }
67    }
68
69    /// Allocates a new `AutoflushSession` which flushes itself on drop.
70    pub fn autoflush_session_with_builder(&mut self, cap: Capability<T>) -> AutoflushSession<'_, T, CB, P> where T: Timestamp {
71        if let Some(true) = self.time.as_ref().map(|x| x != cap.time()) { self.flush(); }
72        self.time = Some(cap.time().clone());
73        AutoflushSession {
74            buffer: self,
75            _capability: cap,
76        }
77    }
78}
79
80impl<T, CB: ContainerBuilder, P: Push<Message<T, CB::Container>>> Buffer<T, CB, P> where T: Eq+Clone {
81    /// Flushes all data and pushes a `None` to `self.pusher`, indicating a flush.
82    pub fn cease(&mut self) {
83        self.flush();
84        self.builder.relax();
85        self.pusher.push(&mut None);
86    }
87
88    /// Extract pending data from the builder, but not forcing a flush.
89    #[inline]
90    fn extract_and_send(&mut self) {
91        while let Some(container) = self.builder.extract() {
92            let time = self.time.as_ref().unwrap().clone();
93            Message::push_at(container, time, &mut self.pusher);
94        }
95    }
96
97    /// Flush the builder, forcing all its contents to be written.
98    #[inline]
99    fn flush(&mut self) {
100        while let Some(container) = self.builder.finish() {
101            let time = self.time.as_ref().unwrap().clone();
102            Message::push_at(container, time, &mut self.pusher);
103        }
104    }
105
106    /// Gives an entire container at the current time. Maintains FIFO order with previously pushed
107    /// data. Only intended to be used through [`Session::give_container`].
108    // TODO: This method could exist without a container builder, but we can't express this as a
109    // buffer always requires a container builder. We could expose the buffer's underlying pusher
110    // directly, but this would bypass the buffer's time tracking.
111    fn give_container(&mut self, container: &mut CB::Container) {
112        if !container.is_empty() {
113            self.flush();
114            let time = self.time.as_ref().unwrap().clone();
115            Message::push_at(container, time, &mut self.pusher);
116        }
117    }
118    
119    /// An internal implementation of push that should only be called by sessions.
120    #[inline]
121    fn push_internal<D>(&mut self, item: D) where CB: PushInto<D> {
122        self.builder.push_into(item);
123        self.extract_and_send();
124    }
125}
126
127/// An output session for sending records at a specified time.
128///
129/// The `Session` struct provides the user-facing interface to an operator output, namely
130/// the `Buffer` type. A `Session` wraps a session of output at a specified time, and
131/// avoids what would otherwise be a constant cost of checking timestamp equality.
132pub struct Session<'a, T, CB, P> {
133    buffer: &'a mut Buffer<T, CB, P>,
134}
135
136impl<'a, T, CB: ContainerBuilder, P> Session<'a, T, CB, P>
137where
138    T: Eq + Clone + 'a,
139    P: Push<Message<T, CB::Container>> + 'a,
140{
141    /// Provide a container at the time specified by the [Session]. Maintains FIFO order with
142    /// previously pushed data.
143    pub fn give_container(&mut self, container: &mut CB::Container) {
144        self.buffer.give_container(container)
145    }
146}
147
148impl<'a, T, CB, P> Session<'a, T, CB, P>
149where
150    T: Eq + Clone + 'a,
151    CB: ContainerBuilder + 'a,
152    P: Push<Message<T, CB::Container>> + 'a
153{
154    /// Access the builder. Immutable access to prevent races with flushing
155    /// the underlying buffer.
156    pub fn builder(&self) -> &CB {
157        self.buffer.builder()
158    }
159
160    /// Provides one record at the time specified by the `Session`.
161    #[inline]
162    pub fn give<D>(&mut self, data: D) where CB: PushInto<D> {
163        self.push_into(data);
164    }
165
166    /// Provides an iterator of records at the time specified by the `Session`.
167    #[inline]
168    pub fn give_iterator<I>(&mut self, iter: I)
169    where
170        I: Iterator,
171        CB: PushInto<I::Item>,
172    {
173        for item in iter {
174            self.push_into(item);
175        }
176    }
177}
178
179impl<'a, T, CB, P, D> PushInto<D> for Session<'a, T, CB, P>
180where
181    T: Eq + Clone + 'a,
182    CB: ContainerBuilder + PushInto<D> + 'a,
183    P: Push<Message<T, CB::Container>> + 'a,
184{
185    #[inline]
186    fn push_into(&mut self, item: D) {
187        self.buffer.push_internal(item);
188    }
189}
190
191/// A session which will flush itself when dropped.
192pub struct AutoflushSession<'a, T, CB, P>
193where
194    T: Timestamp + 'a,
195    CB: ContainerBuilder + 'a,
196    P: Push<Message<T, CB::Container>> + 'a,
197{
198    /// A reference to the underlying buffer.
199    buffer: &'a mut Buffer<T, CB, P>,
200    /// The capability being used to send the data.
201    _capability: Capability<T>,
202}
203
204impl<'a, T, CB, P> AutoflushSession<'a, T, CB, P>
205where
206    T: Timestamp + 'a,
207    CB: ContainerBuilder + 'a,
208    P: Push<Message<T, CB::Container>> + 'a,
209{
210    /// Transmits a single record.
211    #[inline]
212    pub fn give<D>(&mut self, data: D)
213    where
214        CB: PushInto<D>,
215    {
216        self.push_into(data);
217    }
218
219    /// Transmits records produced by an iterator.
220    #[inline]
221    pub fn give_iterator<I, D>(&mut self, iter: I)
222    where
223        I: Iterator<Item=D>,
224        CB: PushInto<D>,
225    {
226        for item in iter {
227            self.push_into(item);
228        }
229    }
230}
231impl<'a, T, CB, P, D> PushInto<D> for AutoflushSession<'a, T, CB, P>
232where
233    T: Timestamp + 'a,
234    CB: ContainerBuilder + PushInto<D> + 'a,
235    P: Push<Message<T, CB::Container>> + 'a,
236{
237    #[inline]
238    fn push_into(&mut self, item: D) {
239        self.buffer.push_internal(item);
240    }
241}
242
243impl<'a, T, CB, P> Drop for AutoflushSession<'a, T, CB, P>
244where
245    T: Timestamp + 'a,
246    CB: ContainerBuilder + 'a,
247    P: Push<Message<T, CB::Container>> + 'a,
248{
249    fn drop(&mut self) {
250        self.buffer.cease();
251    }
252}