Skip to main content

timely/dataflow/operators/core/
to_stream.rs

1//! Conversion to the `Stream` type from iterators.
2
3use crate::container::{CapacityContainerBuilder, SizableContainer, PushInto};
4use crate::progress::Timestamp;
5use crate::{Container, ContainerBuilder};
6use crate::dataflow::operators::generic::operator::source;
7use crate::dataflow::{Stream, Scope};
8
9/// Converts to a timely [Stream], using a container builder.
10pub trait ToStreamBuilder {
11    /// The item type produced by this iterator-like source.
12    type Item;
13
14    /// Converts to a timely [Stream], using the supplied container builder type.
15    ///
16    /// # Examples
17    ///
18    /// ```
19    /// use timely::dataflow::operators::core::{ToStreamBuilder, Capture};
20    /// use timely::dataflow::operators::core::capture::Extract;
21    /// use timely::container::CapacityContainerBuilder;
22    ///
23    /// let (data1, data2) = timely::example(|scope| {
24    ///     let data1 = (0..3).to_stream_with_builder::<_, CapacityContainerBuilder<_>>(scope)
25    ///         .container::<Vec<_>>()
26    ///         .capture();
27    ///     let data2 = vec![0,1,2].to_stream_with_builder::<_, CapacityContainerBuilder<_>>(scope)
28    ///         .container::<Vec<_>>()
29    ///         .capture();
30    ///     (data1, data2)
31    /// });
32    ///
33    /// assert_eq!(data1.extract(), data2.extract());
34    /// ```
35    fn to_stream_with_builder<'scope, T: Timestamp, CB: ContainerBuilder>(self, scope: Scope<'scope, T>) -> Stream<'scope, T, CB::Container>
36    where
37        CB: PushInto<Self::Item>;
38}
39
40impl<I: IntoIterator+'static> ToStreamBuilder for I {
41    type Item = I::Item;
42
43    fn to_stream_with_builder<'scope, T: Timestamp, CB: ContainerBuilder>(self, scope: Scope<'scope, T>) -> Stream<'scope, T, CB::Container>
44    where
45        CB: PushInto<I::Item>
46    {
47        source::<_, CB, _, _>(scope, "ToStreamBuilder", |capability, info| {
48
49            // Acquire an activator, so that the operator can rescheduled itself.
50            let activator = scope.activator_for(info.address);
51
52            let mut iterator = self.into_iter().fuse();
53            let mut capability = Some(capability);
54
55            move |output| {
56
57                if let Some(element) = iterator.next() {
58                    let mut session = output.session_with_builder(capability.as_ref().unwrap());
59                    session.give(element);
60                    let n = 256 * crate::container::buffer::default_capacity::<I::Item>();
61                    session.give_iterator(iterator.by_ref().take(n - 1));
62                    activator.activate();
63                }
64                else {
65                    capability = None;
66                }
67            }
68        })
69    }
70}
71
72/// Converts to a timely [Stream]. Equivalent to [`ToStreamBuilder`] but
73/// uses a [`CapacityContainerBuilder`].
74pub trait ToStream<C> {
75    /// Converts to a timely [Stream].
76    ///
77    /// # Examples
78    ///
79    /// ```
80    /// use timely::dataflow::operators::core::{ToStream, Capture};
81    /// use timely::dataflow::operators::core::capture::Extract;
82    ///
83    /// let (data1, data2) = timely::example(|scope| {
84    ///     let data1 = (0..3).to_stream(scope).container::<Vec<_>>().capture();
85    ///     let data2 = vec![0,1,2].to_stream(scope).container::<Vec<_>>().capture();
86    ///     (data1, data2)
87    /// });
88    ///
89    /// assert_eq!(data1.extract(), data2.extract());
90    /// ```
91    fn to_stream<'scope, T: Timestamp>(self, scope: Scope<'scope, T>) -> Stream<'scope, T, C>;
92}
93
94impl<C: Container + SizableContainer, I: IntoIterator+'static> ToStream<C> for I where C: PushInto<I::Item> {
95    fn to_stream<'scope, T: Timestamp>(self, scope: Scope<'scope, T>) -> Stream<'scope, T, C> {
96        ToStreamBuilder::to_stream_with_builder::<_, CapacityContainerBuilder<C>>(self, scope)
97    }
98}