differential_dataflow/
capture.rs

//! Logic related to capture and replay of differential collections.
//!
//! This module defines a protocol for capturing and replaying differential collections
//! to streaming storage that may both duplicate and reorder messages. It records facts
//! about the collection that, once true, stay true: the exact changes the data undergo
//! at each time, and the number of distinct updates at each time.
//!
//! The methods are parameterized by implementors of byte sources and byte sinks. For
//! example implementations of these traits, consult the commented text at the end of
//! this file.
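//!
//! As a brief illustration of the protocol (the concrete types here are
//! illustrative assumptions, not requirements), a writer might record the
//! following sequence:
//!
//! ```ignore
//! use differential_dataflow::capture::{Message, Progress};
//!
//! // Three updates at times 0 and 1, then a progress statement covering
//! // times in [0, 2): exactly two distinct updates at time 0, one at time 1.
//! let messages: Vec<Message<String, u64, i64>> = vec![
//!     Message::Updates(vec![
//!         ("alice".to_string(), 0, 1),
//!         ("bob".to_string(), 0, 1),
//!         ("alice".to_string(), 1, -1),
//!     ]),
//!     Message::Progress(Progress {
//!         lower: vec![0],
//!         upper: vec![2],
//!         counts: vec![(0, 2), (1, 1)],
//!     }),
//! ];
//! ```
//!
//! A reader may receive these messages duplicated and arbitrarily reordered, and
//! can still reconstruct the exact collection once the progress statement is
//! incorporated.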

use std::time::Duration;
use serde::{Deserialize, Serialize};

/// A message in the CDC V2 protocol.
#[derive(Ord, PartialOrd, Eq, PartialEq, Debug, Clone, Serialize, Deserialize)]
pub enum Message<D, T, R> {
    /// A batch of updates that are certain to occur.
    ///
    /// Each triple is an irrevocable statement about a change that occurs.
    /// Each statement contains a datum, a time, and a difference, and asserts
    /// that the multiplicity of the datum changes at the time by the difference.
    Updates(Vec<(D, T, R)>),
    /// An irrevocable statement about the number of updates within a time interval.
    Progress(Progress<T>),
}

/// An irrevocable statement about the number of updates at times within an interval.
///
/// This statement covers all times beyond `lower` and not beyond `upper`.
/// Each element of `counts` is an irrevocable statement about the exact number of
/// distinct updates that occur at that time.
/// Times not present in `counts` have a count of zero.
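///
/// For example (a hypothetical instance, assuming integer timestamps), the statement
///
/// ```ignore
/// Progress { lower: vec![0], upper: vec![3], counts: vec![(1, 2)] }
/// ```
///
/// asserts that of the times `0`, `1`, and `2`, exactly two distinct updates
/// occur, both at time `1`; times `0` and `2` see no updates at all.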
#[derive(Ord, PartialOrd, Eq, PartialEq, Debug, Clone, Serialize, Deserialize)]
pub struct Progress<T> {
    /// The lower bound of times contained in this statement.
    pub lower: Vec<T>,
    /// The upper bound of times contained in this statement.
    pub upper: Vec<T>,
    /// All non-zero counts for times beyond `lower` and not beyond `upper`.
    pub counts: Vec<(T, usize)>,
}

/// A simple sink for items of type `T` (for example, byte slices or protocol messages).
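///
/// A minimal sketch of an implementor, assuming an in-memory `Vec` as a
/// stand-in for real streaming storage:
///
/// ```ignore
/// use std::time::Duration;
///
/// struct VecWriter<T> { store: Vec<T> }
///
/// impl<T: Clone> Writer<T> for VecWriter<T> {
///     fn poll(&mut self, item: &T) -> Option<Duration> {
///         // An in-memory store never needs a retry.
///         self.store.push(item.clone());
///         None
///     }
///     fn done(&self) -> bool {
///         // Every accepted item is immediately "committed".
///         true
///     }
/// }
/// ```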
pub trait Writer<T> {
    /// Returns an amount of time to wait before retrying, or `None` for success.
    fn poll(&mut self, item: &T) -> Option<Duration>;
    /// Indicates if the sink has committed all sent data and can be safely dropped.
    fn done(&self) -> bool;
}

/// A deduplicating, re-ordering iterator.
pub mod iterator {

    use super::{Message, Progress};
    use crate::lattice::Lattice;
    use std::hash::Hash;
    use timely::order::PartialOrder;
    use timely::progress::{
        frontier::{AntichainRef, MutableAntichain},
        Antichain,
        Timestamp,
    };

    /// A direct implementation of a deduplicating, re-ordering iterator.
    ///
    /// The iterator draws from a source that may have arbitrary duplication and be arbitrarily
    /// out of order, and yet produces each update once, with in-order batches. The iterator
    /// maintains a bounded memory footprint, proportional to the mismatch between the received
    /// updates and progress messages.
    pub struct Iter<I, D, T, R>
    where
        I: Iterator<Item = Message<D, T, R>>,
        T: Hash + Ord + Lattice + Clone,
        D: Hash + Eq,
        R: Hash + Eq,
    {
        /// Source of potentially duplicated, out of order cdc_v2 messages.
        iterator: I,
        /// Updates that have been received, but are still beyond `reported_frontier`.
        ///
        /// These updates are retained both so that they can eventually be transmitted,
        /// and so that they can be used to deduplicate updates that may still be received.
        updates: std::collections::HashSet<(D, T, R)>,
        /// Frontier through which the iterator has reported updates.
        ///
        /// All updates not beyond this frontier have been reported.
        /// Any information related to times not beyond this frontier can be discarded.
        ///
        /// This frontier tracks the meet of `progress_frontier` and `messages_frontier`,
        /// our two bounds on potential uncertainty in progress and update messages.
        reported_frontier: Antichain<T>,
        /// Frontier of accepted progress statements.
        ///
        /// All progress message counts for times not beyond this frontier have been
        /// incorporated in to `messages_frontier`. This frontier also guides which
        /// received progress statements can be incorporated: those for which this
        /// frontier is greater than or equal to their lower bound.
        progress_frontier: Antichain<T>,
        /// Counts of outstanding messages at times.
        ///
        /// These counts track the difference between message counts at times announced
        /// by progress messages, and message counts at times received in distinct updates.
        messages_frontier: MutableAntichain<T>,
        /// Progress statements that are not yet actionable due to out-of-orderness.
        ///
        /// A progress statement becomes actionable once the progress frontier is beyond
        /// its lower frontier. This ensures that the [0, lower) interval is already
        /// incorporated, and that we will not leave a gap by incorporating the counts
        /// and reflecting the progress statement's upper frontier.
        progress_queue: Vec<Progress<T>>,
    }

    impl<D, T, R, I> Iterator for Iter<I, D, T, R>
    where
        I: Iterator<Item = Message<D, T, R>>,
        T: Hash + Ord + Lattice + Clone,
        D: Hash + Eq + Clone,
        R: Hash + Eq + Clone,
    {
        type Item = (Vec<(D, T, R)>, Antichain<T>);
        fn next(&mut self) -> Option<Self::Item> {
            // Each call to `next` should return some newly carved interval of time.
            // As such, we should read from our source until we find such a thing.
            //
            // An interval can be completed once our frontier of received progress
            // information and our frontier of unresolved counts have advanced.
            while let Some(message) = self.iterator.next() {
                match message {
                    Message::Updates(mut updates) => {
                        // Discard updates at reported times, or duplicates at unreported times.
                        updates.retain(|dtr| {
                            self.reported_frontier.less_equal(&dtr.1) && !self.updates.contains(dtr)
                        });
                        // Decrement our counts of accounted-for messages.
                        self.messages_frontier
                            .update_iter(updates.iter().map(|(_, t, _)| (t.clone(), -1)));
                        // Record the messages in our de-duplication collection.
                        self.updates.extend(updates.into_iter());
                    }
                    Message::Progress(progress) => {
                        // A progress statement may not be immediately actionable.
                        self.progress_queue.push(progress);
                    }
                }

                // Attempt to drain actionable progress messages.
                // A progress message is actionable if `self.progress_frontier` is greater or
                // equal to the message's lower bound.
                while let Some(position) = self.progress_queue.iter().position(|p| {
                    <_ as PartialOrder>::less_equal(
                        &AntichainRef::new(&p.lower),
                        &self.progress_frontier.borrow(),
                    )
                }) {
                    let mut progress = self.progress_queue.remove(position);
                    // Discard counts that have already been incorporated.
                    progress
                        .counts
                        .retain(|(time, _count)| self.progress_frontier.less_equal(time));
                    // Record any new reports of expected counts.
                    self.messages_frontier
                        .update_iter(progress.counts.drain(..).map(|(t, c)| (t, c as i64)));
                    // Extend the frontier to be times greater or equal to both progress.upper and self.progress_frontier.
                    let mut new_frontier = Antichain::new();
                    for time1 in progress.upper {
                        for time2 in self.progress_frontier.elements() {
                            new_frontier.insert(time1.join(time2));
                        }
                    }
                    self.progress_queue.retain(|p| {
                        !<_ as PartialOrder>::less_equal(
                            &AntichainRef::new(&p.upper),
                            &new_frontier.borrow(),
                        )
                    });
                    self.progress_frontier = new_frontier;
                }

                // Now check and see if our lower bound exceeds `self.reported_frontier`.
                let mut lower_bound = self.progress_frontier.clone();
                lower_bound.extend(self.messages_frontier.frontier().iter().cloned());
                if lower_bound != self.reported_frontier {
                    let to_publish = self
                        .updates
                        .iter()
                        .filter(|(_, t, _)| !lower_bound.less_equal(t))
                        .cloned()
                        .collect::<Vec<_>>();
                    self.updates.retain(|(_, t, _)| lower_bound.less_equal(t));
                    self.reported_frontier = lower_bound.clone();
                    return Some((to_publish, lower_bound));
                }
            }
            None
        }
    }

    impl<D, T, R, I> Iter<I, D, T, R>
    where
        I: Iterator<Item = Message<D, T, R>>,
        T: Hash + Ord + Lattice + Clone + Timestamp,
        D: Hash + Eq + Clone,
        R: Hash + Eq + Clone,
    {
        /// Construct a new re-ordering, deduplicating iterator.
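        ///
        /// A sketch of intended use, assuming a source of potentially duplicated
        /// and reordered `Message` values (here a plain `Vec`):
        ///
        /// ```ignore
        /// let messages: Vec<Message<String, u64, i64>> = vec![/* from storage */];
        /// // Each yielded batch contains novel updates, together with a frontier
        /// // through which all updates have now been reported.
        /// for (updates, frontier) in Iter::new(messages.into_iter()) {
        ///     println!("updates {:?} settled through {:?}", updates, frontier);
        /// }
        /// ```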
        pub fn new(iterator: I) -> Self {
            Self {
                iterator,
                updates: std::collections::HashSet::new(),
                reported_frontier: Antichain::from_elem(T::minimum()),
                progress_frontier: Antichain::from_elem(T::minimum()),
                messages_frontier: MutableAntichain::new(),
                progress_queue: Vec::new(),
            }
        }
    }
}

/// Methods for recovering update streams from binary bundles.
pub mod source {

    use super::{Message, Progress};
    use crate::{lattice::Lattice, ExchangeData};
    use std::cell::RefCell;
    use std::hash::Hash;
    use std::rc::Rc;
    use std::sync::Arc;
    use timely::dataflow::{Scope, Stream, operators::{Capability, CapabilitySet}};
    use timely::progress::Timestamp;
    use timely::scheduling::SyncActivator;

    // TODO(guswynn): implement this generally in timely
    struct DropActivator {
        activator: Arc<SyncActivator>,
    }

    impl Drop for DropActivator {
        fn drop(&mut self) {
            // Best effort: failure to activate is ignored.
            let _ = self.activator.activate();
        }
    }

    /// Constructs a stream of updates from a source of messages.
    ///
    /// The stream is built in the supplied `scope` and continues to run until
    /// the returned `Box<dyn Any>` token is dropped. The `source_builder` argument
    /// is invoked with a `SyncActivator` that will re-activate the source.
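    ///
    /// A sketch of intended use (`my_messages` is a hypothetical constructor of
    /// an iterator yielding `Message` values):
    ///
    /// ```ignore
    /// let (token, stream) = build(scope.clone(), |activator| {
    ///     // The activator may be sent to another thread, which can use it
    ///     // to wake the source when new messages become available.
    ///     my_messages(activator)
    /// });
    /// // Hold on to `token`; dropping it shuts the source down.
    /// ```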
    pub fn build<G, B, I, D, T, R>(
        scope: G,
        source_builder: B,
    ) -> (Box<dyn std::any::Any + Send + Sync>, Stream<G, (D, T, R)>)
    where
        G: Scope<Timestamp = T>,
        B: FnOnce(SyncActivator) -> I,
        I: Iterator<Item = Message<D, T, R>> + 'static,
        D: ExchangeData + Hash,
        T: ExchangeData + Hash + Timestamp + Lattice,
        R: ExchangeData + Hash,
    {
        // Read messages are either updates or progress messages.
        // Each may contain duplicates, and we must take care to deduplicate information before introducing it to an accumulation.
        // This includes both emitting updates, and setting expectations for update counts.
        //
        // Updates need to be deduplicated by (data, time), and we should exchange them by such.
        // Progress needs to be deduplicated by time, and we should exchange them by such.
        //
        // The first cut of this is a dataflow graph that looks like (flowing downward)
        //
        // 1. MESSAGES:
        //      Reads `Message` stream; maintains capabilities.
        //      Sends `Updates` to UPDATES stage by hash((data, time, diff)).
        //      Sends `Progress` to PROGRESS stage by hash(time), each with lower, upper bounds.
        //      Shares capabilities with downstream operator.
        // 2. UPDATES:
        //      Maintains and deduplicates updates.
        //      Ships updates once frontier advances.
        //      Ships counts to PROGRESS stage, by hash(time).
        // 3. PROGRESS:
        //      Maintains outstanding message counts by time. Tracks frontiers.
        //      Tracks lower bounds of messages and progress frontier. Broadcasts changes to FEEDBACK stage.
        // 4. FEEDBACK:
        //      Shares capabilities with MESSAGES; downgrades to track input from PROGRESS.
        //
        // Each of these stages can be arbitrarily data-parallel, and FEEDBACK *must* have the same parallelism as MESSAGES.
        // Limitations: MESSAGES must broadcast lower and upper bounds to PROGRESS, and PROGRESS must broadcast its changes
        // to FEEDBACK. This may mean that scaling up PROGRESS could introduce quadratic problems, though both of these
        // broadcast payloads are meant to be very small.

        use crate::hashable::Hashable;
        use timely::dataflow::channels::pact::Exchange;
        use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
        use timely::progress::frontier::MutableAntichain;
        use timely::progress::ChangeBatch;

        // Some message distribution logic depends on the number of workers.
        let workers = scope.peers();

        let mut token = None;
        // Frontier owned by the FEEDBACK operator and consulted by the MESSAGES operators.
        let mut antichain = MutableAntichain::new();
        antichain.update_iter(Some((T::minimum(), workers as i64)));
        let shared_frontier = Rc::new(RefCell::new(antichain));
        let shared_frontier2 = shared_frontier.clone();

        // Step 1: The MESSAGES operator.
        let mut messages_op = OperatorBuilder::new("CDCV2_Messages".to_string(), scope.clone());
        let address = messages_op.operator_info().address;
        let activator = scope.sync_activator_for(address.to_vec());
        let activator2 = scope.activator_for(Rc::clone(&address));
        let drop_activator = DropActivator { activator: Arc::new(scope.sync_activator_for(address.to_vec())) };
        let mut source = source_builder(activator);
        let (mut updates_out, updates) = messages_op.new_output();
        let (mut progress_out, progress) = messages_op.new_output();
        messages_op.build(|capabilities| {

            // A Weak that communicates whether the returned token has been dropped.
            let drop_activator_weak = Arc::downgrade(&drop_activator.activator);

            token = Some(drop_activator);

            // Read messages from some source; shuffle them to UPDATES and PROGRESS; share capability with FEEDBACK.
            let mut updates_caps = CapabilitySet::from_elem(capabilities[0].clone());
            let mut progress_caps = CapabilitySet::from_elem(capabilities[1].clone());
            // Capture the shared frontier to read out frontier updates to apply.
            let local_frontier = shared_frontier.clone();

            move |_frontiers| {
                // First check to ensure that we haven't been terminated by someone dropping our tokens.
                if drop_activator_weak.upgrade().is_none() {
                    // Give up our capabilities.
                    updates_caps.downgrade(&[]);
                    progress_caps.downgrade(&[]);
                    // Never continue, even if we are (erroneously) activated again.
                    return;
                }

                // Consult our shared frontier, and ensure capabilities are downgraded to it.
                let shared_frontier = local_frontier.borrow();
                updates_caps.downgrade(&shared_frontier.frontier());
                progress_caps.downgrade(&shared_frontier.frontier());

                // Next check to see if we have been terminated by the source being complete.
                if !updates_caps.is_empty() && !progress_caps.is_empty() {
                    let mut updates = updates_out.activate();
                    let mut progress = progress_out.activate();

                    // TODO(frank): this is a moment where multi-temporal capabilities need to be fixed up.
                    // Specifically, there may not be one capability valid for all updates.
                    let mut updates_session = updates.session(&updates_caps[0]);
                    let mut progress_session = progress.session(&progress_caps[0]);

                    // We presume the iterator will yield if appropriate.
                    for message in source.by_ref() {
                        match message {
                            Message::Updates(mut updates) => {
                                updates_session.give_container(&mut updates);
                            }
                            Message::Progress(progress) => {
                                // We must send a copy of each progress message to all workers,
                                // but we can partition the counts across workers by timestamp.
                                let mut to_worker = vec![Vec::new(); workers];
                                for (time, count) in progress.counts {
                                    to_worker[(time.hashed() as usize) % workers]
                                        .push((time, count));
                                }
                                for (worker, counts) in to_worker.into_iter().enumerate() {
                                    progress_session.give((
                                        worker,
                                        Progress {
                                            lower: progress.lower.clone(),
                                            upper: progress.upper.clone(),
                                            counts,
                                        },
                                    ));
                                }
                            }
                        }
                    }
                }
            }
        });

        // Step 2: The UPDATES operator.
        let mut updates_op = OperatorBuilder::new("CDCV2_Updates".to_string(), scope.clone());
        let mut input = updates_op.new_input(&updates, Exchange::new(|x: &(D, T, R)| x.hashed()));
        let (mut changes_out, changes) = updates_op.new_output();
        let (mut counts_out, counts) = updates_op.new_output();
        updates_op.build(move |_capability| {
            // Deduplicates updates, and ships novel updates and the counts for each time.
            // For simplicity, this operator ships updates as they are discovered to be new.
            // This has the defect that on load we may have two copies of the data (shipped,
            // and here for deduplication).
            //
            // Filters may be pushed ahead of this operator, but because of deduplication we
            // may not push projections ahead of this operator (at least, not without fields
            // that are known to form keys, and even then only carefully).
            let mut pending = std::collections::HashMap::new();
            let mut change_batch = ChangeBatch::<T>::new();
            move |frontiers| {
                // Thin out the deduplication buffer.
                // This is the moment in a more advanced implementation where we might send
                // the data for the first time, maintaining only one copy of each update live
                // at a time in the system.
                pending.retain(|(_row, time), _diff| frontiers[0].less_equal(time));

                // Deduplicate newly received updates, sending new updates and timestamp counts.
                let mut changes = changes_out.activate();
                let mut counts = counts_out.activate();
                while let Some((capability, updates)) = input.next() {
                    let mut changes_session = changes.session(&capability);
                    let mut counts_session = counts.session(&capability);
                    for (data, time, diff) in updates.iter() {
                        if frontiers[0].less_equal(time) {
                            if let Some(prior) = pending.insert((data.clone(), time.clone()), diff.clone()) {
                                assert_eq!(&prior, diff);
                            } else {
                                change_batch.update(time.clone(), -1);
                                changes_session.give((data.clone(), time.clone(), diff.clone()));
                            }
                        }
                    }
                    if !change_batch.is_empty() {
                        counts_session.give_iterator(change_batch.drain());
                    }
                }
            }
        });

        // Step 3: The PROGRESS operator.
        let mut progress_op = OperatorBuilder::new("CDCV2_Progress".to_string(), scope.clone());
        let mut input = progress_op.new_input(
            &progress,
            Exchange::new(|x: &(usize, Progress<T>)| x.0 as u64),
        );
        let mut counts =
            progress_op.new_input(&counts, Exchange::new(|x: &(T, i64)| (x.0).hashed()));
        let (mut frontier_out, frontier) = progress_op.new_output();
        progress_op.build(move |_capability| {
            // Receive progress statements and deduplicated counts. Track the lower frontier of both, and broadcast changes.

            use timely::order::PartialOrder;
            use timely::progress::{frontier::AntichainRef, Antichain};

            let mut progress_queue = Vec::new();
            let mut progress_frontier = Antichain::from_elem(T::minimum());
            let mut updates_frontier = MutableAntichain::new();
            let mut reported_frontier = Antichain::from_elem(T::minimum());

            move |_frontiers| {
                let mut frontier = frontier_out.activate();

                // If the frontier changes we need a capability to express that.
                // Any capability should work; the downstream listener doesn't care.
                let mut capability: Option<Capability<T>> = None;

                // Drain all relevant update counts in to the mutable antichain tracking its frontier.
                while let Some((cap, counts)) = counts.next() {
                    updates_frontier.update_iter(counts.iter().cloned());
                    capability = Some(cap.retain());
                }
                // Drain all progress statements into the queue out of which we will work.
                while let Some((cap, progress)) = input.next() {
                    progress_queue.extend(progress.iter().map(|x| (x.1).clone()));
                    capability = Some(cap.retain());
                }

                // Extract and act on actionable progress messages.
                // A progress message is actionable if `progress_frontier` is greater than or
                // equal to the message's lower bound.
                while let Some(position) = progress_queue.iter().position(|p| {
                    <_ as PartialOrder>::less_equal(
                        &AntichainRef::new(&p.lower),
                        &progress_frontier.borrow(),
                    )
                }) {
                    // Extract the progress statement.
                    let mut progress = progress_queue.remove(position);
                    // Discard counts that have already been incorporated.
                    progress
                        .counts
                        .retain(|(time, _count)| progress_frontier.less_equal(time));
                    // Record any new reports of expected counts.
                    updates_frontier
                        .update_iter(progress.counts.drain(..).map(|(t, c)| (t, c as i64)));
                    // Extend `progress_frontier` to the join with `progress.upper`.
                    let mut new_frontier = Antichain::new();
                    for time1 in progress.upper {
                        for time2 in progress_frontier.elements() {
                            new_frontier.insert(time1.join(time2));
                        }
                    }
                    progress_frontier = new_frontier;
                }

                // Determine if the lower bound of the frontiers has advanced, and transmit updates if so.
                let mut lower_bound = progress_frontier.clone();
                lower_bound.extend(updates_frontier.frontier().iter().cloned());
                if lower_bound != reported_frontier {
                    let capability =
                        capability.expect("Changes occurred, without surfacing a capability");
                    let mut changes = ChangeBatch::new();
                    changes.extend(lower_bound.elements().iter().map(|t| (t.clone(), 1)));
                    changes.extend(reported_frontier.elements().iter().map(|t| (t.clone(), -1)));
                    let mut frontier_session = frontier.session(&capability);
                    for peer in 0..workers {
                        frontier_session.give((peer, changes.clone()));
                    }
                    reported_frontier = lower_bound.clone();
                }
            }
        });

        // Step 4: The FEEDBACK operator.
        let mut feedback_op = OperatorBuilder::new("CDCV2_Feedback".to_string(), scope.clone());
        let mut input = feedback_op.new_input(
            &frontier,
            Exchange::new(|x: &(usize, ChangeBatch<T>)| x.0 as u64),
        );
        feedback_op.build(move |_capability| {
            // Receive frontier changes and share the net result with MESSAGES.
            move |_frontiers| {
                let mut antichain = shared_frontier2.borrow_mut();
                let mut must_activate = false;
                while let Some((_cap, frontier_changes)) = input.next() {
                    for (_worker, input_changes) in frontier_changes.iter() {
                        // Apply the updates, and observe if the lower bound has changed.
                        if antichain.update_iter(input_changes.unstable_internal_updates().iter().cloned()).next().is_some() {
                            must_activate = true;
                        }
                    }
                }
                // If the lower bound has changed, we must activate MESSAGES.
                if must_activate { activator2.activate(); }
            }
        });

        (Box::new(token.unwrap()), changes)
    }
}

/// Methods for recording update streams to binary bundles.
pub mod sink {

    use std::hash::Hash;
    use std::cell::RefCell;
    use std::rc::Weak;

    use serde::{Deserialize, Serialize};

    use timely::order::PartialOrder;
    use timely::progress::{Antichain, ChangeBatch, Timestamp};
    use timely::dataflow::{Scope, Stream};
    use timely::dataflow::channels::pact::{Exchange, Pipeline};
    use timely::dataflow::operators::generic::{FrontieredInputHandle, builder_rc::OperatorBuilder};

    use crate::{lattice::Lattice, ExchangeData};
    use super::{Writer, Message, Progress};

    /// Constructs a sink for recording the updates in `stream`.
    ///
    /// It is crucial that `stream` has been consolidated before this method, which
    /// will *not* perform the consolidation on the stream's behalf. If consolidation
    /// is not performed before calling this method, the recorded output may not be
    /// correctly reconstructed by readers.
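    ///
    /// A sketch of intended use, assuming a hypothetical `Writer` implementor
    /// `MyWriter`, shared by both the updates and progress sinks:
    ///
    /// ```ignore
    /// use std::cell::RefCell;
    /// use std::rc::Rc;
    ///
    /// let writer = Rc::new(RefCell::new(MyWriter::new()));
    /// // `sink_hash` is any stable identifier; it selects the one worker that
    /// // writes progress statements.
    /// let sink_hash = 42;
    /// build(&stream, sink_hash, Rc::downgrade(&writer), Rc::downgrade(&writer));
    /// // Keep `writer` alive for as long as the sink should keep writing;
    /// // dropping it shuts the recording operators down.
    /// ```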
    pub fn build<G, BS, D, T, R>(
        stream: &Stream<G, (D, T, R)>,
        sink_hash: u64,
        updates_sink: Weak<RefCell<BS>>,
        progress_sink: Weak<RefCell<BS>>,
    ) where
        G: Scope<Timestamp = T>,
        BS: Writer<Message<D,T,R>> + 'static,
        D: ExchangeData + Hash + Serialize + for<'a> Deserialize<'a>,
        T: ExchangeData + Hash + Serialize + for<'a> Deserialize<'a> + Timestamp + Lattice,
        R: ExchangeData + Hash + Serialize + for<'a> Deserialize<'a>,
    {
        // First we record the updates that stream in.
        // We can simply record all updates, under the presumption that they have been consolidated,
        // and so any record we see is in fact guaranteed to happen.
        let mut builder = OperatorBuilder::new("UpdatesWriter".to_owned(), stream.scope());
        let reactivator = stream.scope().activator_for(builder.operator_info().address);
        let mut input = builder.new_input(stream, Pipeline);
        let (mut updates_out, updates) = builder.new_output();

        builder.build_reschedule(
            move |_capability| {
                let mut timestamps = <ChangeBatch<_>>::new();
                let mut send_queue = std::collections::VecDeque::new();
                move |_frontiers| {
                    let mut output = updates_out.activate();

                    // We want to drain inputs always...
                    input.for_each(|capability, updates| {
                        // Write each update out, and record the timestamp.
                        for (_data, time, _diff) in updates.iter() {
                            timestamps.update(time.clone(), 1);
                        }

                        // Now record the update to the writer.
                        send_queue.push_back(Message::Updates(std::mem::take(updates)));

                        // Transmit timestamp counts downstream.
                        output
                            .session(&capability)
                            .give_iterator(timestamps.drain());
                    });

                    // Drain whatever we can from the queue of messages to send,
                    // but we needn't do anything more if our sink is closed.
                    if let Some(sink) = updates_sink.upgrade() {
                        let mut sink = sink.borrow_mut();
                        while let Some(message) = send_queue.front() {
                            if let Some(duration) = sink.poll(message) {
                                // Reschedule after `duration` and then bail.
                                reactivator.activate_after(duration);
                                return true;
                            } else {
                                send_queue.pop_front();
                            }
                        }
                        // Signal incompleteness if messages remain to be sent.
                        !sink.done() || !send_queue.is_empty()
                    } else {
                        // We have been terminated, but may still receive indefinite data.
                        send_queue.clear();
                        // Signal that there are no outstanding writes.
                        false
                    }
                }
            },
        );

        // We use a lower-level builder here to get access to the operator address, for rescheduling.
        let mut builder = OperatorBuilder::new("ProgressWriter".to_owned(), stream.scope());
        let reactivator = stream.scope().activator_for(builder.operator_info().address);
        let mut input = builder.new_input(&updates, Exchange::new(move |_| sink_hash));
        let should_write = stream.scope().index() == (sink_hash as usize) % stream.scope().peers();

        // We now record the numbers of updates at each timestamp between lower and upper bounds.
        // Track the advancing frontier, to know when to produce utterances.
        let mut frontier = Antichain::from_elem(T::minimum());
        // Track accumulated counts for timestamps.
        let mut timestamps = <ChangeBatch<_>>::new();
        // Stash for messages yet to send.
        let mut send_queue = std::collections::VecDeque::new();
        let mut retain = Vec::new();

        builder.build_reschedule(|_capabilities| {
            move |frontiers| {
                let mut input = FrontieredInputHandle::new(&mut input, &frontiers[0]);

                // We want to drain inputs no matter what.
                // We could do this after the next step, as we are certain these timestamps will
                // not be part of a closed frontier (as they have not yet been read). This has the
                // potential to make things speedier as we scan less and keep a smaller footprint.
                input.for_each(|_capability, counts| {
                    timestamps.extend(counts.iter().cloned());
                });

                if should_write {
                    if let Some(sink) = progress_sink.upgrade() {
                        let mut sink = sink.borrow_mut();

                        // If our frontier advances strictly, we have the opportunity to issue a progress statement.
                        if <_ as PartialOrder>::less_than(
                            &frontier.borrow(),
                            &input.frontier.frontier(),
                        ) {
                            let new_frontier = input.frontier.frontier();

                            // Extract the timestamp counts to announce.
                            let mut announce = Vec::new();
                            for (time, count) in timestamps.drain() {
                                if !new_frontier.less_equal(&time) {
                                    announce.push((time, count as usize));
                                } else {
                                    retain.push((time, count));
                                }
                            }
                            timestamps.extend(retain.drain(..));

                            // Announce the lower bound, upper bound, and timestamp counts.
                            let progress = Progress {
                                lower: frontier.elements().to_vec(),
                                upper: new_frontier.to_vec(),
                                counts: announce,
                            };
                            send_queue.push_back(Message::Progress(progress));

                            // Advance our frontier to track our progress utterance.
                            frontier = input.frontier.frontier().to_owned();

                            while let Some(message) = send_queue.front() {
                                if let Some(duration) = sink.poll(message) {
                                    // Reschedule after `duration` and then bail.
                                    reactivator.activate_after(duration);
                                    // Signal that work remains to be done.
                                    return true;
                                } else {
                                    send_queue.pop_front();
                                }
                            }
                        }
                        // Signal incompleteness if messages remain to be sent.
                        !sink.done() || !send_queue.is_empty()
                    } else {
                        timestamps.clear();
                        send_queue.clear();
                        // Signal that there are no outstanding writes.
                        false
                    }
                } else { false }
            }
        });
    }
}

// pub mod kafka {

//     use serde::{Serialize, Deserialize};
//     use timely::scheduling::SyncActivator;
//     use rdkafka::{ClientContext, config::ClientConfig};
//     use rdkafka::consumer::{BaseConsumer, ConsumerContext};
//     use rdkafka::error::{KafkaError, RDKafkaError};
//     use super::BytesSink;

//     use std::hash::Hash;
//     use timely::progress::Timestamp;
//     use timely::dataflow::{Scope, Stream};
//     use crate::ExchangeData;
//     use crate::lattice::Lattice;

//     /// Creates a Kafka source from supplied configuration information.
//     pub fn create_source<G, D, T, R>(scope: G, addr: &str, topic: &str, group: &str) -> (Box<dyn std::any::Any>, Stream<G, (D, T, R)>)
//     where
//         G: Scope<Timestamp = T>,
//         D: ExchangeData + Hash + for<'a> serde::Deserialize<'a>,
//         T: ExchangeData + Hash + for<'a> serde::Deserialize<'a> + Timestamp + Lattice,
//         R: ExchangeData + Hash + for<'a> serde::Deserialize<'a>,
//     {
//         super::source::build(scope, |activator| {
//             let source = KafkaSource::new(addr, topic, group, activator);
//             // An iterator combinator that yields every "duration" even if more items exist.
//             // The implementation of such an iterator exists in the git history, or can be rewritten easily.
//             super::YieldingIter::new_from(Iter::<D,T,R>::new_from(source), std::time::Duration::from_millis(10))
//         })
//     }

//     pub fn create_sink<G, D, T, R>(stream: &Stream<G, (D, T, R)>, addr: &str, topic: &str) -> Box<dyn std::any::Any>
//     where
//         G: Scope<Timestamp = T>,
//         D: ExchangeData + Hash + Serialize + for<'a> Deserialize<'a>,
//         T: ExchangeData + Hash + Serialize + for<'a> Deserialize<'a> + Timestamp + Lattice,
//         R: ExchangeData + Hash + Serialize + for<'a> Deserialize<'a>,
//     {
//         use std::rc::Rc;
//         use std::cell::RefCell;
//         use crate::hashable::Hashable;

//         let sink = KafkaSink::new(addr, topic);
//         let result = Rc::new(RefCell::new(sink));
//         let sink_hash = (addr.to_string(), topic.to_string()).hashed();
//         super::sink::build(
//             &stream,
//             sink_hash,
//             Rc::downgrade(&result),
//             Rc::downgrade(&result),
//         );
//         Box::new(result)
//     }

//     pub struct KafkaSource {
//         consumer: BaseConsumer<ActivationConsumerContext>,
//     }

//     impl KafkaSource {
//         pub fn new(addr: &str, topic: &str, group: &str, activator: SyncActivator) -> Self {
//             let mut kafka_config = ClientConfig::new();
//             // This is mostly cargo-cult'd in from `source/kafka.rs`.
//             kafka_config.set("bootstrap.servers", &addr.to_string());
//             kafka_config
//                 .set("enable.auto.commit", "false")
//                 .set("auto.offset.reset", "earliest");

//             kafka_config.set("topic.metadata.refresh.interval.ms", "30000"); // 30 seconds
//             kafka_config.set("fetch.message.max.bytes", "134217728");
//             kafka_config.set("group.id", group);
//             kafka_config.set("isolation.level", "read_committed");
//             let activator = ActivationConsumerContext(activator);
//             let consumer = kafka_config.create_with_context::<_, BaseConsumer<_>>(activator).unwrap();
//             use rdkafka::consumer::Consumer;
//             consumer.subscribe(&[topic]).unwrap();
//             Self {
//                 consumer,
//             }
//         }
//     }

//     pub struct Iter<D, T, R> {
//         pub source: KafkaSource,
//         phantom: std::marker::PhantomData<(D, T, R)>,
//     }

//     impl<D, T, R> Iter<D, T, R> {
//         /// Constructs a new iterator from a bytes source.
//         pub fn new_from(source: KafkaSource) -> Self {
//             Self {
//                 source,
//                 phantom: std::marker::PhantomData,
//             }
//         }
//     }

//     impl<D, T, R> Iterator for Iter<D, T, R>
//     where
//         D: for<'a> Deserialize<'a>,
//         T: for<'a> Deserialize<'a>,
//         R: for<'a> Deserialize<'a>,
//     {
//         type Item = super::Message<D, T, R>;
//         fn next(&mut self) -> Option<Self::Item> {
//             use rdkafka::message::Message;
//             self.source
//                 .consumer
//                 .poll(std::time::Duration::from_millis(0))
//                 .and_then(|result| result.ok())
//                 .and_then(|message| {
//                     message.payload().and_then(|message| bincode::deserialize::<super::Message<D, T, R>>(message).ok())
//                 })
//         }
//     }

//     /// An implementation of [`ConsumerContext`] that activates the source
//     /// when the message queue switches from empty to nonempty.
//     struct ActivationConsumerContext(SyncActivator);

//     impl ClientContext for ActivationConsumerContext { }

//     impl ActivationConsumerContext {
//         fn activate(&self) {
//             self.0.activate().unwrap();
//         }
//     }

//     impl ConsumerContext for ActivationConsumerContext {
//         fn message_queue_nonempty_callback(&self) {
//             self.activate();
//         }
//     }

//     use std::time::Duration;
//     use rdkafka::producer::DefaultProducerContext;
//     use rdkafka::producer::{BaseRecord, ThreadedProducer};

//     pub struct KafkaSink {
//         topic: String,
//         producer: ThreadedProducer<DefaultProducerContext>,
//     }

//     impl KafkaSink {
//         pub fn new(addr: &str, topic: &str) -> Self {
//             let mut config = ClientConfig::new();
//             config.set("bootstrap.servers", &addr);
//             config.set("queue.buffering.max.kbytes", &format!("{}", 16 << 20));
//             config.set("queue.buffering.max.messages", &format!("{}", 10_000_000));
//             config.set("queue.buffering.max.ms", &format!("{}", 10));
//             let producer = config
//                 .create_with_context::<_, ThreadedProducer<_>>(DefaultProducerContext)
//                 .expect("creating kafka producer for kafka sinks failed");
//             Self {
//                 producer,
//                 topic: topic.to_string(),
//             }
//         }
//     }

//     impl BytesSink for KafkaSink {
//         fn poll(&mut self, bytes: &[u8]) -> Option<Duration> {
//             let record = BaseRecord::<[u8], _>::to(&self.topic).payload(bytes);

//             self.producer.send(record).err().map(|(e, _)| {
//                 if let KafkaError::MessageProduction(RDKafkaError::QueueFull) = e {
//                     Duration::from_secs(1)
//                 } else {
//                     // TODO(frank): report this error upwards so the user knows the sink is dead.
//                     Duration::from_secs(1)
//                 }
//             })
//         }
//         fn done(&self) -> bool {
//             self.producer.in_flight_count() == 0
//         }
//     }

// }