timely/dataflow/operators/core/
unordered_input.rs
1use std::rc::Rc;
4use std::cell::RefCell;
5use crate::{Container, Data};
6use crate::container::{ContainerBuilder, CapacityContainerBuilder};
7
8use crate::scheduling::{Schedule, ActivateOnDrop};
9
10use crate::progress::{Operate, operate::SharedProgress, Timestamp};
11use crate::progress::Source;
12use crate::progress::ChangeBatch;
13use crate::progress::operate::Connectivity;
14use crate::dataflow::channels::pushers::{Counter, Tee};
15use crate::dataflow::channels::pushers::buffer::{Buffer as PushBuffer, AutoflushSession};
16
17use crate::dataflow::operators::{ActivateCapability, Capability};
18
19use crate::dataflow::{Scope, StreamCore};
20
21pub trait UnorderedInput<G: Scope> {
23 fn new_unordered_input<CB: ContainerBuilder>(&mut self) -> ((UnorderedHandle<G::Timestamp, CB>, ActivateCapability<G::Timestamp>), StreamCore<G, CB::Container>);
79}
80
81impl<G: Scope> UnorderedInput<G> for G {
82 fn new_unordered_input<CB: ContainerBuilder>(&mut self) -> ((UnorderedHandle<G::Timestamp, CB>, ActivateCapability<G::Timestamp>), StreamCore<G, CB::Container>) {
83
84 let (output, registrar) = Tee::<G::Timestamp, CB::Container>::new();
85 let internal = Rc::new(RefCell::new(ChangeBatch::new()));
86 let cap = Capability::new(G::Timestamp::minimum(), Rc::clone(&internal));
88 let counter = Counter::new(output);
89 let produced = Rc::clone(counter.produced());
90 let peers = self.peers();
91
92 let index = self.allocate_operator_index();
93 let address = self.addr_for_child(index);
94
95 let cap = ActivateCapability::new(cap, Rc::clone(&address), self.activations());
96
97 let helper = UnorderedHandle::new(counter);
98
99 self.add_operator_with_index(Box::new(UnorderedOperator {
100 name: "UnorderedInput".to_owned(),
101 address,
102 shared_progress: Rc::new(RefCell::new(SharedProgress::new(0, 1))),
103 internal,
104 produced,
105 peers,
106 }), index);
107
108 ((helper, cap), StreamCore::new(Source::new(index, 0), registrar, self.clone()))
109 }
110}
111
112struct UnorderedOperator<T:Timestamp> {
113 name: String,
114 address: Rc<[usize]>,
115 shared_progress: Rc<RefCell<SharedProgress<T>>>,
116 internal: Rc<RefCell<ChangeBatch<T>>>,
117 produced: Rc<RefCell<ChangeBatch<T>>>,
118 peers: usize,
119}
120
121impl<T:Timestamp> Schedule for UnorderedOperator<T> {
122 fn name(&self) -> &str { &self.name }
123 fn path(&self) -> &[usize] { &self.address[..] }
124 fn schedule(&mut self) -> bool {
125 let shared_progress = &mut *self.shared_progress.borrow_mut();
126 self.internal.borrow_mut().drain_into(&mut shared_progress.internals[0]);
127 self.produced.borrow_mut().drain_into(&mut shared_progress.produceds[0]);
128 false
129 }
130}
131
132impl<T:Timestamp> Operate<T> for UnorderedOperator<T> {
133 fn inputs(&self) -> usize { 0 }
134 fn outputs(&self) -> usize { 1 }
135
136 fn get_internal_summary(&mut self) -> (Connectivity<<T as Timestamp>::Summary>, Rc<RefCell<SharedProgress<T>>>) {
137 let mut borrow = self.internal.borrow_mut();
138 for (time, count) in borrow.drain() {
139 self.shared_progress.borrow_mut().internals[0].update(time, count * (self.peers as i64));
140 }
141 (Vec::new(), Rc::clone(&self.shared_progress))
142 }
143
144 fn notify_me(&self) -> bool { false }
145}
146
147#[derive(Debug)]
149pub struct UnorderedHandle<T: Timestamp, CB: ContainerBuilder> {
150 buffer: PushBuffer<T, CB, Counter<T, CB::Container, Tee<T, CB::Container>>>,
151}
152
153impl<T: Timestamp, CB: ContainerBuilder> UnorderedHandle<T, CB> {
154 fn new(pusher: Counter<T, CB::Container, Tee<T, CB::Container>>) -> UnorderedHandle<T, CB> {
155 UnorderedHandle {
156 buffer: PushBuffer::new(pusher),
157 }
158 }
159
160 #[inline]
162 pub fn session_with_builder(&mut self, cap: ActivateCapability<T>) -> ActivateOnDrop<AutoflushSession<T, CB, Counter<T, CB::Container, Tee<T, CB::Container>>>> {
163 ActivateOnDrop::new(self.buffer.autoflush_session_with_builder(cap.capability.clone()), Rc::clone(&cap.address), Rc::clone(&cap.activations))
164 }
165}
166
167impl<T: Timestamp, C: Container + Data> UnorderedHandle<T, CapacityContainerBuilder<C>> {
168 #[inline]
170 pub fn session(&mut self, cap: ActivateCapability<T>) -> ActivateOnDrop<AutoflushSession<T, CapacityContainerBuilder<C>, Counter<T, C, Tee<T, C>>>> {
171 self.session_with_builder(cap)
172 }
173}