timely/dataflow/operators/core/
unordered_input.rs1use std::rc::Rc;
4use std::cell::RefCell;
5
6use crate::ContainerBuilder;
7
8use crate::scheduling::{Schedule, ActivateOnDrop};
9
10use crate::progress::{Operate, operate::SharedProgress, Timestamp};
11use crate::progress::Source;
12use crate::progress::ChangeBatch;
13use crate::progress::operate::Connectivity;
14use crate::dataflow::channels::pushers::{Counter, Tee, Output};
15use crate::dataflow::operators::generic::{OutputBuilder, OutputBuilderSession};
16use crate::dataflow::operators::{ActivateCapability, Capability};
17use crate::dataflow::{Scope, StreamCore};
18
19use crate::scheduling::Activations;
20
21pub trait UnorderedInput<G: Scope> {
23 fn new_unordered_input<CB: ContainerBuilder>(&mut self) -> ((UnorderedHandle<G::Timestamp, CB>, ActivateCapability<G::Timestamp>), StreamCore<G, CB::Container>);
79}
80
81impl<G: Scope> UnorderedInput<G> for G {
82 fn new_unordered_input<CB: ContainerBuilder>(&mut self) -> ((UnorderedHandle<G::Timestamp, CB>, ActivateCapability<G::Timestamp>), StreamCore<G, CB::Container>) {
83
84 let (output, registrar) = Tee::<G::Timestamp, CB::Container>::new();
85 let internal = Rc::new(RefCell::new(ChangeBatch::new()));
86 let cap = Capability::new(G::Timestamp::minimum(), Rc::clone(&internal));
88 let counter = Counter::new(output);
89 let produced = Rc::clone(counter.produced());
90 let counter = Output::new(counter, Rc::clone(&internal), 0);
91 let peers = self.peers();
92
93 let index = self.allocate_operator_index();
94 let address = self.addr_for_child(index);
95
96 let cap = ActivateCapability::new(cap, Rc::clone(&address), self.activations());
97
98 let helper = UnorderedHandle::new(counter, Rc::clone(&address), self.activations());
99
100 self.add_operator_with_index(Box::new(UnorderedOperator {
101 name: "UnorderedInput".to_owned(),
102 address,
103 shared_progress: Rc::new(RefCell::new(SharedProgress::new(0, 1))),
104 internal,
105 produced,
106 peers,
107 }), index);
108
109 ((helper, cap), StreamCore::new(Source::new(index, 0), registrar, self.clone()))
110 }
111}
112
113struct UnorderedOperator<T:Timestamp> {
114 name: String,
115 address: Rc<[usize]>,
116 shared_progress: Rc<RefCell<SharedProgress<T>>>,
117 internal: Rc<RefCell<ChangeBatch<T>>>,
118 produced: Rc<RefCell<ChangeBatch<T>>>,
119 peers: usize,
120}
121
122impl<T:Timestamp> Schedule for UnorderedOperator<T> {
123 fn name(&self) -> &str { &self.name }
124 fn path(&self) -> &[usize] { &self.address[..] }
125 fn schedule(&mut self) -> bool {
126 let shared_progress = &mut *self.shared_progress.borrow_mut();
127 self.internal.borrow_mut().drain_into(&mut shared_progress.internals[0]);
128 self.produced.borrow_mut().drain_into(&mut shared_progress.produceds[0]);
129 false
130 }
131}
132
133impl<T:Timestamp> Operate<T> for UnorderedOperator<T> {
134 fn inputs(&self) -> usize { 0 }
135 fn outputs(&self) -> usize { 1 }
136
137 fn get_internal_summary(&mut self) -> (Connectivity<<T as Timestamp>::Summary>, Rc<RefCell<SharedProgress<T>>>) {
138 let mut borrow = self.internal.borrow_mut();
139 for (time, count) in borrow.drain() {
140 self.shared_progress.borrow_mut().internals[0].update(time, count * (self.peers as i64));
141 }
142 (Vec::new(), Rc::clone(&self.shared_progress))
143 }
144
145 fn notify_me(&self) -> bool { false }
146}
147
148pub struct UnorderedHandle<T: Timestamp, CB: ContainerBuilder> {
150 output: OutputBuilder<T, CB>,
151 address: Rc<[usize]>,
152 activations: Rc<RefCell<Activations>>,
153}
154
155impl<T: Timestamp, CB: ContainerBuilder> UnorderedHandle<T, CB> {
156 fn new(output: Output<T, CB::Container>, address: Rc<[usize]>, activations: Rc<RefCell<Activations>>) -> Self {
157 Self {
158 output: OutputBuilder::from(output),
159 address,
160 activations,
161 }
162 }
163
164 #[inline]
166 pub fn activate(&mut self) -> ActivateOnDrop<OutputBuilderSession<'_, T, CB>> {
167 ActivateOnDrop::new(self.output.activate(), Rc::clone(&self.address), Rc::clone(&self.activations))
168 }
169}