timely/dataflow/operators/core/to_stream.rs
1//! Conversion to the `Stream` type from iterators.
2
3use crate::container::{CapacityContainerBuilder, SizableContainer, PushInto};
4use crate::progress::Timestamp;
5use crate::{Container, ContainerBuilder};
6use crate::dataflow::operators::generic::operator::source;
7use crate::dataflow::{Stream, Scope};
8
9/// Converts to a timely [Stream], using a container builder.
10pub trait ToStreamBuilder {
11 /// The item type produced by this iterator-like source.
12 type Item;
13
14 /// Converts to a timely [Stream], using the supplied container builder type.
15 ///
16 /// # Examples
17 ///
18 /// ```
19 /// use timely::dataflow::operators::core::{ToStreamBuilder, Capture};
20 /// use timely::dataflow::operators::core::capture::Extract;
21 /// use timely::container::CapacityContainerBuilder;
22 ///
23 /// let (data1, data2) = timely::example(|scope| {
24 /// let data1 = (0..3).to_stream_with_builder::<_, CapacityContainerBuilder<_>>(scope)
25 /// .container::<Vec<_>>()
26 /// .capture();
27 /// let data2 = vec![0,1,2].to_stream_with_builder::<_, CapacityContainerBuilder<_>>(scope)
28 /// .container::<Vec<_>>()
29 /// .capture();
30 /// (data1, data2)
31 /// });
32 ///
33 /// assert_eq!(data1.extract(), data2.extract());
34 /// ```
35 fn to_stream_with_builder<'scope, T: Timestamp, CB: ContainerBuilder>(self, scope: Scope<'scope, T>) -> Stream<'scope, T, CB::Container>
36 where
37 CB: PushInto<Self::Item>;
38}
39
40impl<I: IntoIterator+'static> ToStreamBuilder for I {
41 type Item = I::Item;
42
43 fn to_stream_with_builder<'scope, T: Timestamp, CB: ContainerBuilder>(self, scope: Scope<'scope, T>) -> Stream<'scope, T, CB::Container>
44 where
45 CB: PushInto<I::Item>
46 {
47 source::<_, CB, _, _>(scope, "ToStreamBuilder", |capability, info| {
48
49 // Acquire an activator, so that the operator can rescheduled itself.
50 let activator = scope.activator_for(info.address);
51
52 let mut iterator = self.into_iter().fuse();
53 let mut capability = Some(capability);
54
55 move |output| {
56
57 if let Some(element) = iterator.next() {
58 let mut session = output.session_with_builder(capability.as_ref().unwrap());
59 session.give(element);
60 let n = 256 * crate::container::buffer::default_capacity::<I::Item>();
61 session.give_iterator(iterator.by_ref().take(n - 1));
62 activator.activate();
63 }
64 else {
65 capability = None;
66 }
67 }
68 })
69 }
70}
71
72/// Converts to a timely [Stream]. Equivalent to [`ToStreamBuilder`] but
73/// uses a [`CapacityContainerBuilder`].
74pub trait ToStream<C> {
75 /// Converts to a timely [Stream].
76 ///
77 /// # Examples
78 ///
79 /// ```
80 /// use timely::dataflow::operators::core::{ToStream, Capture};
81 /// use timely::dataflow::operators::core::capture::Extract;
82 ///
83 /// let (data1, data2) = timely::example(|scope| {
84 /// let data1 = (0..3).to_stream(scope).container::<Vec<_>>().capture();
85 /// let data2 = vec![0,1,2].to_stream(scope).container::<Vec<_>>().capture();
86 /// (data1, data2)
87 /// });
88 ///
89 /// assert_eq!(data1.extract(), data2.extract());
90 /// ```
91 fn to_stream<'scope, T: Timestamp>(self, scope: Scope<'scope, T>) -> Stream<'scope, T, C>;
92}
93
94impl<C: Container + SizableContainer, I: IntoIterator+'static> ToStream<C> for I where C: PushInto<I::Item> {
95 fn to_stream<'scope, T: Timestamp>(self, scope: Scope<'scope, T>) -> Stream<'scope, T, C> {
96 ToStreamBuilder::to_stream_with_builder::<_, CapacityContainerBuilder<C>>(self, scope)
97 }
98}