timely/dataflow/operators/core/to_stream.rs
1//! Conversion to the `StreamCore` type from iterators.
2
3use crate::container::{CapacityContainerBuilder, ContainerBuilder, SizableContainer, PushInto};
4use crate::Container;
5use crate::dataflow::operators::generic::operator::source;
6use crate::dataflow::{StreamCore, Scope};
7
8/// Converts to a timely [StreamCore], using a container builder.
9pub trait ToStreamBuilder<CB: ContainerBuilder> {
10    /// Converts to a timely [StreamCore], using the supplied container builder type.
11    ///
12    /// # Examples
13    ///
14    /// ```
15    /// use timely::dataflow::operators::core::{ToStreamBuilder, Capture};
16    /// use timely::dataflow::operators::core::capture::Extract;
17    /// use timely::container::CapacityContainerBuilder;
18    ///
19    /// let (data1, data2) = timely::example(|scope| {
20    ///     let data1 = ToStreamBuilder::<CapacityContainerBuilder<_>>::to_stream_with_builder(0..3, scope)
21    ///         .container::<Vec<_>>()
22    ///         .capture();
23    ///     let data2 = ToStreamBuilder::<CapacityContainerBuilder<_>>::to_stream_with_builder(vec![0,1,2], scope)
24    ///         .container::<Vec<_>>()
25    ///         .capture();
26    ///     (data1, data2)
27    /// });
28    ///
29    /// assert_eq!(data1.extract(), data2.extract());
30    /// ```
31    fn to_stream_with_builder<S: Scope>(self, scope: &mut S) -> StreamCore<S, CB::Container>;
32}
33
34impl<CB: ContainerBuilder, I: IntoIterator+'static> ToStreamBuilder<CB> for I where CB: PushInto<I::Item> {
35    fn to_stream_with_builder<S: Scope>(self, scope: &mut S) -> StreamCore<S, CB::Container> {
36
37        source::<_, CB, _, _>(scope, "ToStreamBuilder", |capability, info| {
38
39            // Acquire an activator, so that the operator can rescheduled itself.
40            let activator = scope.activator_for(info.address);
41
42            let mut iterator = self.into_iter().fuse();
43            let mut capability = Some(capability);
44
45            move |output| {
46
47                if let Some(element) = iterator.next() {
48                    let mut session = output.session_with_builder(capability.as_ref().unwrap());
49                    session.give(element);
50                    let n = 256 * crate::container::buffer::default_capacity::<I::Item>();
51                    session.give_iterator(iterator.by_ref().take(n - 1));
52                    activator.activate();
53                }
54                else {
55                    capability = None;
56                }
57            }
58        })
59    }
60}
61
62/// Converts to a timely [StreamCore]. Equivalent to [`ToStreamBuilder`] but
63/// uses a [`CapacityContainerBuilder`].
64pub trait ToStream<C> {
65    /// Converts to a timely [StreamCore].
66    ///
67    /// # Examples
68    ///
69    /// ```
70    /// use timely::dataflow::operators::core::{ToStream, Capture};
71    /// use timely::dataflow::operators::core::capture::Extract;
72    ///
73    /// let (data1, data2) = timely::example(|scope| {
74    ///     let data1 = (0..3).to_stream(scope).container::<Vec<_>>().capture();
75    ///     let data2 = vec![0,1,2].to_stream(scope).container::<Vec<_>>().capture();
76    ///     (data1, data2)
77    /// });
78    ///
79    /// assert_eq!(data1.extract(), data2.extract());
80    /// ```
81    fn to_stream<S: Scope>(self, scope: &mut S) -> StreamCore<S, C>;
82}
83
84impl<C: Container + SizableContainer, I: IntoIterator+'static> ToStream<C> for I where C: PushInto<I::Item> {
85    fn to_stream<S: Scope>(self, scope: &mut S) -> StreamCore<S, C> {
86        ToStreamBuilder::<CapacityContainerBuilder<C>>::to_stream_with_builder(self, scope)
87    }
88}