timely/dataflow/operators/core/to_stream.rs
1//! Conversion to the `StreamCore` type from iterators.
2
3use crate::container::{CapacityContainerBuilder, ContainerBuilder, SizableContainer, PushInto};
4use crate::{Container, Data};
5use crate::dataflow::operators::generic::operator::source;
6use crate::dataflow::{StreamCore, Scope};
7
8/// Converts to a timely [StreamCore], using a container builder.
9pub trait ToStreamBuilder<CB: ContainerBuilder> {
10 /// Converts to a timely [StreamCore], using the supplied container builder type.
11 ///
12 /// # Examples
13 ///
14 /// ```
15 /// use timely::dataflow::operators::core::{ToStreamBuilder, Capture};
16 /// use timely::dataflow::operators::core::capture::Extract;
17 /// use timely::container::CapacityContainerBuilder;
18 ///
19 /// let (data1, data2) = timely::example(|scope| {
20 /// let data1 = ToStreamBuilder::<CapacityContainerBuilder<_>>::to_stream_with_builder(0..3, scope)
21 /// .container::<Vec<_>>()
22 /// .capture();
23 /// let data2 = ToStreamBuilder::<CapacityContainerBuilder<_>>::to_stream_with_builder(vec![0,1,2], scope)
24 /// .container::<Vec<_>>()
25 /// .capture();
26 /// (data1, data2)
27 /// });
28 ///
29 /// assert_eq!(data1.extract(), data2.extract());
30 /// ```
31 fn to_stream_with_builder<S: Scope>(self, scope: &mut S) -> StreamCore<S, CB::Container>;
32}
33
34impl<CB: ContainerBuilder, I: IntoIterator+'static> ToStreamBuilder<CB> for I where CB: PushInto<I::Item> {
35 fn to_stream_with_builder<S: Scope>(self, scope: &mut S) -> StreamCore<S, CB::Container> {
36
37 source::<_, CB, _, _>(scope, "ToStreamBuilder", |capability, info| {
38
39 // Acquire an activator, so that the operator can rescheduled itself.
40 let activator = scope.activator_for(info.address);
41
42 let mut iterator = self.into_iter().fuse();
43 let mut capability = Some(capability);
44
45 move |output| {
46
47 if let Some(element) = iterator.next() {
48 let mut session = output.session_with_builder(capability.as_ref().unwrap());
49 session.give(element);
50 let n = 256 * crate::container::buffer::default_capacity::<I::Item>();
51 session.give_iterator(iterator.by_ref().take(n - 1));
52 activator.activate();
53 }
54 else {
55 capability = None;
56 }
57 }
58 })
59 }
60}
61
62/// Converts to a timely [StreamCore]. Equivalent to [`ToStreamBuilder`] but
63/// uses a [`CapacityContainerBuilder`].
64pub trait ToStream<C: Container> {
65 /// Converts to a timely [StreamCore].
66 ///
67 /// # Examples
68 ///
69 /// ```
70 /// use timely::dataflow::operators::core::{ToStream, Capture};
71 /// use timely::dataflow::operators::core::capture::Extract;
72 ///
73 /// let (data1, data2) = timely::example(|scope| {
74 /// let data1 = (0..3).to_stream(scope).container::<Vec<_>>().capture();
75 /// let data2 = vec![0,1,2].to_stream(scope).container::<Vec<_>>().capture();
76 /// (data1, data2)
77 /// });
78 ///
79 /// assert_eq!(data1.extract(), data2.extract());
80 /// ```
81 fn to_stream<S: Scope>(self, scope: &mut S) -> StreamCore<S, C>;
82}
83
84impl<C: SizableContainer + Data, I: IntoIterator+'static> ToStream<C> for I where C: PushInto<I::Item> {
85 fn to_stream<S: Scope>(self, scope: &mut S) -> StreamCore<S, C> {
86 ToStreamBuilder::<CapacityContainerBuilder<C>>::to_stream_with_builder(self, scope)
87 }
88}