differential_dataflow/capture.rs

//! Logic related to capture and replay of differential collections.
//!
//! This module defines a protocol for capturing and replaying differential collections
//! to streaming storage that may both duplicate and reorder messages. It records facts
//! about the collection that, once true, stay true, such as the exact changes data undergo
//! at each time, and the number of distinct updates at each time.
//!
//! The methods are parameterized by implementors of byte sources and byte sinks. For
//! example implementations of these traits, consult the commented text at the end of
//! this file.

use std::time::Duration;
use serde::{Deserialize, Serialize};

/// A message in the CDC V2 protocol.
#[derive(Ord, PartialOrd, Eq, PartialEq, Debug, Clone, Serialize, Deserialize)]
pub enum Message<D, T, R> {
    /// A batch of updates that are certain to occur.
    ///
    /// Each triple is an irrevocable statement about a change that occurs.
    /// Each statement contains a datum, a time, and a difference, and asserts
    /// that the multiplicity of the datum changes at the time by the difference.
    Updates(Vec<(D, T, R)>),
    /// An irrevocable statement about the number of updates within a time interval.
    Progress(Progress<T>),
}

/// An irrevocable statement about the number of updates at times within an interval.
///
/// This statement covers all times beyond `lower` and not beyond `upper`.
/// Each element of `counts` is an irrevocable statement about the exact number of
/// distinct updates that occur at that time.
/// Times not present in `counts` have a count of zero.
#[derive(Ord, PartialOrd, Eq, PartialEq, Debug, Clone, Serialize, Deserialize)]
pub struct Progress<T> {
    /// The lower bound of times contained in this statement.
    pub lower: Vec<T>,
    /// The upper bound of times contained in this statement.
    pub upper: Vec<T>,
    /// All non-zero counts for times beyond `lower` and not beyond `upper`.
    pub counts: Vec<(T, usize)>,
}
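
// For orientation, a hedged sketch of the protocol in action (the concrete types and
// literals below are illustrative, not prescribed): a collection that adds datum "a"
// at time 0 and datum "b" at time 1 might be captured as the messages below. The
// duplicated batch is harmless, because readers deduplicate, and the progress statement
// pins down the exact number of updates at each time between the frontiers `[0]` and `[2]`.
//
//     let messages: Vec<Message<&str, usize, i64>> = vec![
//         Message::Updates(vec![("a", 0, 1), ("b", 1, 1)]),
//         Message::Updates(vec![("a", 0, 1), ("b", 1, 1)]), // a duplicate re-delivery
//         Message::Progress(Progress {
//             lower: vec![0],
//             upper: vec![2],
//             counts: vec![(0, 1), (1, 1)],
//         }),
//     ];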

/// A simple sink for items, such as encoded byte slices.
pub trait Writer<T> {
    /// Returns an amount of time to wait before retrying, or `None` for success.
    fn poll(&mut self, item: &T) -> Option<Duration>;
    /// Indicates if the sink has committed all sent data and can be safely dropped.
    fn done(&self) -> bool;
}
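
// As a hedged illustration of the `Writer` contract (this type is hypothetical and not
// part of the module), an in-memory sink could accept each item immediately, returning
// `None` from `poll`, and report `done` as soon as nothing remains unacknowledged:
//
//     struct VecWriter<T> {
//         items: Vec<T>,
//     }
//
//     impl<T: Clone> Writer<T> for VecWriter<T> {
//         fn poll(&mut self, item: &T) -> Option<Duration> {
//             // Accept unconditionally; a real sink would return `Some(duration)`
//             // to ask the caller to retry the same item after `duration`.
//             self.items.push(item.clone());
//             None
//         }
//         fn done(&self) -> bool {
//             // Nothing is buffered or in flight, so the sink can be dropped safely.
//             true
//         }
//     }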

/// A deduplicating, re-ordering iterator.
pub mod iterator {

    use super::{Message, Progress};
    use crate::lattice::Lattice;
    use std::hash::Hash;
    use timely::order::PartialOrder;
    use timely::progress::{
        frontier::{AntichainRef, MutableAntichain},
        Antichain,
        Timestamp,
    };

    /// A direct implementation of a deduplicating, re-ordering iterator.
    ///
    /// The iterator draws from a source that may have arbitrary duplication, be arbitrarily out of order,
    /// and yet produces each update once, with in-order batches. The iterator maintains a bounded memory
    /// footprint, proportional to the mismatch between the received updates and progress messages.
    pub struct Iter<I, D, T, R>
    where
        I: Iterator<Item = Message<D, T, R>>,
        D: Hash + Eq,
        T: Hash + Eq + Ord + Lattice + Clone,
        R: Hash + Eq,
    {
        /// Source of potentially duplicated, out of order cdc_v2 messages.
        iterator: I,
        /// Updates that have been received, but are still beyond `reported_frontier`.
        ///
        /// These updates are retained both so that they can eventually be transmitted,
        /// and so that they can be used to deduplicate updates that may still be received.
        updates: std::collections::HashSet<(D, T, R)>,
        /// Frontier through which the iterator has reported updates.
        ///
        /// All updates not beyond this frontier have been reported.
        /// Any information related to times not beyond this frontier can be discarded.
        ///
        /// This frontier tracks the meet of `progress_frontier` and `messages_frontier`,
        /// our two bounds on potential uncertainty in progress and update messages.
        reported_frontier: Antichain<T>,
        /// Frontier of accepted progress statements.
        ///
        /// All progress message counts for times not beyond this frontier have been
        /// incorporated into `messages_frontier`. This frontier also guides which
        /// received progress statements can be incorporated: those for which
        /// this frontier is beyond their lower bound.
        progress_frontier: Antichain<T>,
        /// Counts of outstanding messages at times.
        ///
        /// These counts track the difference between message counts at times announced
        /// by progress messages, and message counts at times received in distinct updates.
        messages_frontier: MutableAntichain<T>,
        /// Progress statements that are not yet actionable due to out-of-order delivery.
        ///
        /// A progress statement becomes actionable once the progress frontier is beyond
        /// its lower frontier. This ensures that the [0, lower) interval is already
        /// incorporated, and that we will not leave a gap by incorporating the counts
        /// and reflecting the progress statement's upper frontier.
        progress_queue: Vec<Progress<T>>,
    }
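
    // A small, hedged example of the accounting above: a progress statement announcing
    // two updates at time 5 contributes `(5, +2)` to `messages_frontier`, and each
    // distinct update received at time 5 contributes `(5, -1)`. Only once the
    // accumulation at time 5 returns to zero can the frontier pass time 5, at which
    // point the updates there are complete and can be reported.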

    impl<D, T, R, I> Iterator for Iter<I, D, T, R>
    where
        I: Iterator<Item = Message<D, T, R>>,
        T: Hash + Ord + Lattice + Clone,
        D: Hash + Eq + Clone,
        R: Hash + Eq + Clone,
    {
        type Item = (Vec<(D, T, R)>, Antichain<T>);
        fn next(&mut self) -> Option<Self::Item> {
            // Each call to `next` should return some newly carved interval of time.
            // As such, we should read from our source until we find such a thing.
            //
            // An interval can be completed once our frontier of received progress
            // information and our frontier of unresolved counts have advanced.
            while let Some(message) = self.iterator.next() {
                match message {
                    Message::Updates(mut updates) => {
                        // Discard updates at reported times, or duplicates at unreported times.
                        updates.retain(|dtr| {
                            self.reported_frontier.less_equal(&dtr.1) && !self.updates.contains(dtr)
                        });
                        // Decrement our counts of accounted-for messages.
                        self.messages_frontier
                            .update_iter(updates.iter().map(|(_, t, _)| (t.clone(), -1)));
                        // Record the messages in our de-duplication collection.
                        self.updates.extend(updates.into_iter());
                    }
                    Message::Progress(progress) => {
                        // A progress statement may not be immediately actionable.
                        self.progress_queue.push(progress);
                    }
                }

                // Attempt to drain actionable progress messages.
                // A progress message is actionable if `self.progress_frontier` is greater or
                // equal to the message's lower bound.
                while let Some(position) = self.progress_queue.iter().position(|p| {
                    <_ as PartialOrder>::less_equal(
                        &AntichainRef::new(&p.lower),
                        &self.progress_frontier.borrow(),
                    )
                }) {
                    let mut progress = self.progress_queue.remove(position);
                    // Discard counts that have already been incorporated.
                    progress
                        .counts
                        .retain(|(time, _count)| self.progress_frontier.less_equal(time));
                    // Record any new reports of expected counts.
                    self.messages_frontier
                        .update_iter(progress.counts.drain(..).map(|(t, c)| (t, c as i64)));
                    // Extend the frontier to be times greater or equal to both progress.upper and self.progress_frontier.
                    let mut new_frontier = Antichain::new();
                    for time1 in progress.upper {
                        for time2 in self.progress_frontier.elements() {
                            new_frontier.insert(time1.join(time2));
                        }
                    }
                    self.progress_queue.retain(|p| {
                        !<_ as PartialOrder>::less_equal(
                            &AntichainRef::new(&p.upper),
                            &new_frontier.borrow(),
                        )
                    });
                    self.progress_frontier = new_frontier;
                }

                // Now check and see if our lower bound exceeds `self.reported_frontier`.
                let mut lower_bound = self.progress_frontier.clone();
                lower_bound.extend(self.messages_frontier.frontier().iter().cloned());
                if lower_bound != self.reported_frontier {
                    let to_publish = self
                        .updates
                        .iter()
                        .filter(|(_, t, _)| !lower_bound.less_equal(t))
                        .cloned()
                        .collect::<Vec<_>>();
                    self.updates.retain(|(_, t, _)| lower_bound.less_equal(t));
                    self.reported_frontier = lower_bound.clone();
                    return Some((to_publish, lower_bound));
                }
            }
            None
        }
    }

    impl<D, T, R, I> Iter<I, D, T, R>
    where
        I: Iterator<Item = Message<D, T, R>>,
        T: Hash + Ord + Lattice + Clone + Timestamp,
        D: Hash + Eq + Clone,
        R: Hash + Eq + Clone,
    {
        /// Construct a new re-ordering, deduplicating iterator.
        pub fn new(iterator: I) -> Self {
            Self {
                iterator,
                updates: std::collections::HashSet::new(),
                reported_frontier: Antichain::from_elem(T::minimum()),
                progress_frontier: Antichain::from_elem(T::minimum()),
                messages_frontier: MutableAntichain::new(),
                progress_queue: Vec::new(),
            }
        }
    }
}
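
// A hedged usage sketch for `iterator::Iter` (types and values here are illustrative):
// feeding it duplicated, out-of-order messages yields each update exactly once, paired
// with the frontier through which updates are now complete.
//
//     use timely::progress::Antichain;
//
//     let messages = vec![
//         Message::Updates(vec![("a", 0usize, 1i64), ("b", 1, 1)]),
//         Message::Updates(vec![("a", 0, 1), ("b", 1, 1)]), // duplicate; discarded on arrival
//         Message::Progress(Progress { lower: vec![0], upper: vec![2], counts: vec![(0, 1), (1, 1)] }),
//     ];
//     let mut iter = iterator::Iter::new(messages.into_iter());
//     let (mut updates, frontier) = iter.next().expect("the interval [0, 2) should be complete");
//     updates.sort();
//     assert_eq!(updates, vec![("a", 0, 1), ("b", 1, 1)]);
//     assert_eq!(frontier, Antichain::from_elem(2));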

/// Methods for recovering update streams from binary bundles.
pub mod source {

    use super::{Message, Progress};
    use crate::{lattice::Lattice, ExchangeData};
    use std::cell::RefCell;
    use std::hash::Hash;
    use std::rc::Rc;
    use std::marker::{Send, Sync};
    use std::sync::Arc;
    use timely::dataflow::{Scope, Stream, operators::{Capability, CapabilitySet}};
    use timely::dataflow::operators::generic::OutputBuilder;
    use timely::progress::Timestamp;
    use timely::scheduling::SyncActivator;

    // TODO(guswynn): implement this generally in timely
    struct DropActivator {
        activator: Arc<SyncActivator>,
    }

    impl Drop for DropActivator {
        fn drop(&mut self) {
            // Best effort: failure to activate is ignored.
            let _ = self.activator.activate();
        }
    }

    /// Constructs a stream of updates from a source of messages.
    ///
    /// The stream is built in the supplied `scope` and continues to run until
    /// the returned `Box<Any>` token is dropped. The `source_builder` argument
    /// is invoked with a `SyncActivator` that will re-activate the source.
    pub fn build<G, B, I, D, T, R>(
        scope: G,
        source_builder: B,
    ) -> (Box<dyn std::any::Any + Send + Sync>, Stream<G, (D, T, R)>)
    where
        G: Scope<Timestamp = T>,
        B: FnOnce(SyncActivator) -> I,
        I: Iterator<Item = Message<D, T, R>> + 'static,
        D: ExchangeData + Hash,
        T: ExchangeData + Hash + Timestamp + Lattice,
        R: ExchangeData + Hash,
    {
        // Messages read from the source are either updates or progress messages.
        // Each may contain duplicates, and we must take care to deduplicate information before introducing it to an accumulation.
        // This includes both emitting updates, and setting expectations for update counts.
        //
        // Updates need to be deduplicated by (data, time), and we should exchange them by such.
        // Progress needs to be deduplicated by time, and we should exchange them by such.
        //
        // The first cut of this is a dataflow graph that looks like (flowing downward)
        //
        // 1. MESSAGES:
        //      Reads `Message` stream; maintains capabilities.
        //      Sends `Updates` to UPDATES stage by hash((data, time, diff)).
        //      Sends `Progress` to PROGRESS stage by hash(time), each with lower, upper bounds.
        //      Shares capabilities with downstream operator.
        // 2. UPDATES:
        //      Maintains and deduplicates updates.
        //      Ships updates once frontier advances.
        //      Ships counts to PROGRESS stage, by hash(time).
        // 3. PROGRESS:
        //      Maintains outstanding message counts by time. Tracks frontiers.
        //      Tracks lower bounds of messages and progress frontier. Broadcasts changes to FEEDBACK stage.
        // 4. FEEDBACK:
        //      Shares capabilities with MESSAGES; downgrades to track input from PROGRESS.
        //
        // Each of these stages can be arbitrarily data-parallel, and FEEDBACK *must* have the same parallelism as MESSAGES.
        // Limitations: MESSAGES must broadcast lower and upper bounds to PROGRESS and PROGRESS must broadcast its changes
        // to FEEDBACK. This may mean that scaling up PROGRESS could introduce quadratic problems. Though, both of these
        // broadcast things are meant to be very reduced data.

        use crate::hashable::Hashable;
        use timely::dataflow::channels::pact::Exchange;
        use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
        use timely::progress::frontier::MutableAntichain;
        use timely::progress::ChangeBatch;

        // Some message distribution logic depends on the number of workers.
        let workers = scope.peers();

        let mut token = None;
        // Frontier owned by the FEEDBACK operator and consulted by the MESSAGES operators.
        let mut antichain = MutableAntichain::new();
        antichain.update_iter(Some((T::minimum(), workers as i64)));
        let shared_frontier = Rc::new(RefCell::new(antichain));
        let shared_frontier2 = shared_frontier.clone();

        // Step 1: The MESSAGES operator.
        let mut messages_op = OperatorBuilder::new("CDCV2_Messages".to_string(), scope.clone());
        let address = messages_op.operator_info().address;
        let activator = scope.sync_activator_for(address.to_vec());
        let activator2 = scope.activator_for(Rc::clone(&address));
        let drop_activator = DropActivator { activator: Arc::new(scope.sync_activator_for(address.to_vec())) };
        let mut source = source_builder(activator);
        let (updates_out, updates) = messages_op.new_output();
        let mut updates_out = OutputBuilder::from(updates_out);
        let (progress_out, progress) = messages_op.new_output();
        let mut progress_out = OutputBuilder::from(progress_out);

        messages_op.build(|capabilities| {

            // A Weak that communicates whether the returned token has been dropped.
            let drop_activator_weak = Arc::downgrade(&drop_activator.activator);

            token = Some(drop_activator);

            // Read messages from some source; shuffle them to UPDATES and PROGRESS; share capability with FEEDBACK.
            let mut updates_caps = CapabilitySet::from_elem(capabilities[0].clone());
            let mut progress_caps = CapabilitySet::from_elem(capabilities[1].clone());
            // Capture the shared frontier to read out frontier updates to apply.
            let local_frontier = shared_frontier.clone();
            //
            move |_frontiers| {
                // First check to ensure that we haven't been terminated by someone dropping our tokens.
                if drop_activator_weak.upgrade().is_none() {
                    // Give up our capabilities
                    updates_caps.downgrade(&[]);
                    progress_caps.downgrade(&[]);
                    // never continue, even if we are (erroneously) activated again.
                    return;
                }

                // Consult our shared frontier, and ensure capabilities are downgraded to it.
                let shared_frontier = local_frontier.borrow();
                updates_caps.downgrade(&shared_frontier.frontier());
                progress_caps.downgrade(&shared_frontier.frontier());

                // Next check to see if we have been terminated by the source being complete.
                if !updates_caps.is_empty() && !progress_caps.is_empty() {
                    let mut updates = updates_out.activate();
                    let mut progress = progress_out.activate();

                    // TODO(frank): this is a moment where multi-temporal capabilities need to be fixed up.
                    // Specifically, there may not be one capability valid for all updates.
                    let mut updates_session = updates.session(&updates_caps[0]);
                    let mut progress_session = progress.session(&progress_caps[0]);

                    // We presume the iterator will yield if appropriate.
                    for message in source.by_ref() {
                        match message {
                            Message::Updates(mut updates) => {
                                updates_session.give_container(&mut updates);
                            }
                            Message::Progress(progress) => {
                                // We must send a copy of each progress message to all workers,
                                // but we can partition the counts across workers by timestamp.
                                let mut to_worker = vec![Vec::new(); workers];
                                for (time, count) in progress.counts {
                                    to_worker[(time.hashed() as usize) % workers]
                                        .push((time, count));
                                }
                                for (worker, counts) in to_worker.into_iter().enumerate() {
                                    progress_session.give((
                                        worker,
                                        Progress {
                                            lower: progress.lower.clone(),
                                            upper: progress.upper.clone(),
                                            counts,
                                        },
                                    ));
                                }
                            }
                        }
                    }
                }
            }
        });

        // Step 2: The UPDATES operator.
        let mut updates_op = OperatorBuilder::new("CDCV2_Updates".to_string(), scope.clone());
        let mut input = updates_op.new_input(&updates, Exchange::new(|x: &(D, T, R)| x.hashed()));
        let (changes_out, changes) = updates_op.new_output();
        let mut changes_out = OutputBuilder::from(changes_out);
        let (counts_out, counts) = updates_op.new_output();
        let mut counts_out = OutputBuilder::from(counts_out);

        updates_op.build(move |_capability| {
            // Deduplicates updates, and ships novel updates and the counts for each time.
            // For simplicity, this operator ships updates as they are discovered to be new.
            // This has the defect that on load we may have two copies of the data (shipped,
            // and here for deduplication).
            //
            // Filters may be pushed ahead of this operator, but because of deduplication we
            // may not push projections ahead of this operator (at least, not without fields
            // that are known to form keys, and even then only carefully).
            let mut pending = std::collections::HashMap::new();
            let mut change_batch = ChangeBatch::<T>::new();
            move |frontiers| {
                // Thin out deduplication buffer.
                // This is the moment in a more advanced implementation where we might send
                // the data for the first time, maintaining only one copy of each update live
                // at a time in the system.
                pending.retain(|(_row, time), _diff| frontiers[0].less_equal(time));

                // Deduplicate newly received updates, sending new updates and timestamp counts.
                let mut changes = changes_out.activate();
                let mut counts = counts_out.activate();
                while let Some((capability, updates)) = input.next() {
                    let mut changes_session = changes.session(&capability);
                    let mut counts_session = counts.session(&capability);
                    for (data, time, diff) in updates.iter() {
                        if frontiers[0].less_equal(time) {
                            if let Some(prior) = pending.insert((data.clone(), time.clone()), diff.clone()) {
                                assert_eq!(&prior, diff);
                            } else {
                                change_batch.update(time.clone(), -1);
                                changes_session.give((data.clone(), time.clone(), diff.clone()));
                            }
                        }
                    }
                    if !change_batch.is_empty() {
                        counts_session.give_iterator(change_batch.drain());
                    }
                }
            }
        });

        // Step 3: The PROGRESS operator.
        let mut progress_op = OperatorBuilder::new("CDCV2_Progress".to_string(), scope.clone());
        let mut input = progress_op.new_input(
            &progress,
            Exchange::new(|x: &(usize, Progress<T>)| x.0 as u64),
        );
        let mut counts =
            progress_op.new_input(&counts, Exchange::new(|x: &(T, i64)| (x.0).hashed()));
        let (frontier_out, frontier) = progress_op.new_output();
        let mut frontier_out = OutputBuilder::from(frontier_out);
        progress_op.build(move |_capability| {
            // Receive progress statements and deduplicated counts. Track the lower frontier of both and broadcast changes.

            use timely::order::PartialOrder;
            use timely::progress::{frontier::AntichainRef, Antichain};

            let mut progress_queue = Vec::new();
            let mut progress_frontier = Antichain::from_elem(T::minimum());
            let mut updates_frontier = MutableAntichain::new();
            let mut reported_frontier = Antichain::from_elem(T::minimum());

            move |_frontiers| {
                let mut frontier = frontier_out.activate();

                // If the frontier changes we need a capability to express that.
                // Any capability should work; the downstream listener doesn't care.
                let mut capability: Option<Capability<T>> = None;

                // Drain all relevant update counts into the mutable antichain tracking its frontier.
                while let Some((cap, counts)) = counts.next() {
                    updates_frontier.update_iter(counts.iter().cloned());
                    capability = Some(cap.retain());
                }
                // Drain all progress statements into the queue out of which we will work.
                while let Some((cap, progress)) = input.next() {
                    progress_queue.extend(progress.iter().map(|x| (x.1).clone()));
                    capability = Some(cap.retain());
                }

                // Extract and act on actionable progress messages.
                // A progress message is actionable if `progress_frontier` is beyond the message's lower bound.
                while let Some(position) = progress_queue.iter().position(|p| {
                    <_ as PartialOrder>::less_equal(
                        &AntichainRef::new(&p.lower),
                        &progress_frontier.borrow(),
                    )
                }) {
                    // Extract progress statement.
                    let mut progress = progress_queue.remove(position);
                    // Discard counts that have already been incorporated.
                    progress
                        .counts
                        .retain(|(time, _count)| progress_frontier.less_equal(time));
                    // Record any new reports of expected counts.
                    updates_frontier
                        .update_iter(progress.counts.drain(..).map(|(t, c)| (t, c as i64)));
                    // Extend progress_frontier by progress.upper.
                    let mut new_frontier = Antichain::new();
                    for time1 in progress.upper {
                        for time2 in progress_frontier.elements() {
                            new_frontier.insert(time1.join(time2));
                        }
                    }
                    progress_frontier = new_frontier;
                }

                // Determine if the lower bound of the frontiers has advanced, and transmit updates if so.
                let mut lower_bound = progress_frontier.clone();
                lower_bound.extend(updates_frontier.frontier().iter().cloned());
                if lower_bound != reported_frontier {
                    let capability =
                        capability.expect("Changes occurred, without surfacing a capability");
                    let mut changes = ChangeBatch::new();
                    changes.extend(lower_bound.elements().iter().map(|t| (t.clone(), 1)));
                    changes.extend(reported_frontier.elements().iter().map(|t| (t.clone(), -1)));
                    let mut frontier_session = frontier.session(&capability);
                    for peer in 0..workers {
                        frontier_session.give((peer, changes.clone()));
                    }
                    reported_frontier = lower_bound.clone();
                }
            }
        });

        // Step 4: The FEEDBACK operator.
        let mut feedback_op = OperatorBuilder::new("CDCV2_Feedback".to_string(), scope.clone());
        let mut input = feedback_op.new_input(
            &frontier,
            Exchange::new(|x: &(usize, ChangeBatch<T>)| x.0 as u64),
        );
        feedback_op.build(move |_capability| {
            // Receive frontier changes and share the net result with MESSAGES.
            move |_frontiers| {
                let mut antichain = shared_frontier2.borrow_mut();
                let mut must_activate = false;
                while let Some((_cap, frontier_changes)) = input.next() {
                    for (_self, input_changes) in frontier_changes.iter() {
                        // Apply the updates, and observe if the lower bound has changed.
                        if antichain.update_iter(input_changes.unstable_internal_updates().iter().cloned()).next().is_some() {
                            must_activate = true;
                        }
                    }
                }
                // If the lower bound has changed, we must activate MESSAGES.
                if must_activate { activator2.activate(); }
            }
        });

        (Box::new(token.unwrap()), changes)
    }
}
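
// A hedged usage sketch for `source::build` (the `scope` and `messages` bindings are
// assumed to exist in the caller): the builder closure receives a `SyncActivator` with
// which the underlying source can wake this operator, and the returned token keeps the
// dataflow alive until it is dropped.
//
//     let (token, stream) = source::build(scope.clone(), |activator| {
//         // Hand `activator` to whatever produces messages so it can request
//         // re-activation; here we simply replay a fixed vector of messages.
//         let _ = activator;
//         messages.into_iter()
//     });
//     // Hold `token` for as long as the stream should continue to run.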

/// Methods for recording update streams to binary bundles.
pub mod sink {

    use std::hash::Hash;
    use std::cell::RefCell;
    use std::rc::Weak;

    use serde::{Deserialize, Serialize};

    use timely::order::PartialOrder;
    use timely::progress::{Antichain, ChangeBatch, Timestamp};
    use timely::dataflow::{Scope, Stream};
    use timely::dataflow::channels::pact::{Exchange, Pipeline};
    use timely::dataflow::operators::generic::{builder_rc::OperatorBuilder, OutputBuilder};

    use crate::{lattice::Lattice, ExchangeData};
    use super::{Writer, Message, Progress};

    /// Constructs a sink for recording the updates in `stream`.
    ///
    /// It is crucial that `stream` has been consolidated before calling this method,
    /// which will *not* perform the consolidation on the stream's behalf. If this is
    /// not done, the recorded output may not be correctly reconstructed by readers.
    pub fn build<G, BS, D, T, R>(
        stream: &Stream<G, (D, T, R)>,
        sink_hash: u64,
        updates_sink: Weak<RefCell<BS>>,
        progress_sink: Weak<RefCell<BS>>,
    ) where
        G: Scope<Timestamp = T>,
        BS: Writer<Message<D, T, R>> + 'static,
        D: ExchangeData + Hash + Serialize + for<'a> Deserialize<'a>,
        T: ExchangeData + Hash + Serialize + for<'a> Deserialize<'a> + Timestamp + Lattice,
        R: ExchangeData + Hash + Serialize + for<'a> Deserialize<'a>,
    {
        // First we record the updates that stream in.
        // We can simply record all updates, under the presumption that they have been consolidated,
        // and so any record we see is in fact guaranteed to happen.
        let mut builder = OperatorBuilder::new("UpdatesWriter".to_owned(), stream.scope());
        let reactivator = stream.scope().activator_for(builder.operator_info().address);
        let mut input = builder.new_input(stream, Pipeline);
        let (updates_out, updates) = builder.new_output();
        let mut updates_out = OutputBuilder::from(updates_out);

        builder.build_reschedule(
            move |_capability| {
                let mut timestamps = <ChangeBatch<_>>::new();
                let mut send_queue = std::collections::VecDeque::new();
                move |_frontiers| {
                    let mut output = updates_out.activate();

                    // We want to drain inputs always...
                    input.for_each(|capability, updates| {
                        // Write each update out, and record the timestamp.
                        for (_data, time, _diff) in updates.iter() {
                            timestamps.update(time.clone(), 1);
                        }

                        // Now record the update to the writer.
                        send_queue.push_back(Message::Updates(std::mem::take(updates)));

                        // Transmit timestamp counts downstream.
                        output
                            .session(&capability)
                            .give_iterator(timestamps.drain());
                    });

                    // Drain whatever we can from the queue of messages to send.
                    // ... but needn't do anything more if our sink is closed.
                    if let Some(sink) = updates_sink.upgrade() {
                        let mut sink = sink.borrow_mut();
                        while let Some(message) = send_queue.front() {
                            if let Some(duration) = sink.poll(message) {
                                // Reschedule after `duration` and then bail.
                                reactivator.activate_after(duration);
                                return true;
                            } else {
                                send_queue.pop_front();
                            }
                        }
                        // Signal incompleteness if messages remain to be sent.
                        !sink.done() || !send_queue.is_empty()
                    } else {
                        // We have been terminated, but may still receive data indefinitely.
                        send_queue.clear();
                        // Signal that there are no outstanding writes.
                        false
                    }
                }
            },
        );

        // We use a lower-level builder here to get access to the operator address, for rescheduling.
        let mut builder = OperatorBuilder::new("ProgressWriter".to_owned(), stream.scope());
        let reactivator = stream.scope().activator_for(builder.operator_info().address);
        let mut input = builder.new_input(&updates, Exchange::new(move |_| sink_hash));
        let should_write = stream.scope().index() == (sink_hash as usize) % stream.scope().peers();

        // We now record the numbers of updates at each timestamp between lower and upper bounds.
        // Track the advancing frontier, to know when to produce utterances.
        let mut frontier = Antichain::from_elem(T::minimum());
        // Track accumulated counts for timestamps.
        let mut timestamps = <ChangeBatch<_>>::new();
        // Stash for serialized data yet to send.
        let mut send_queue = std::collections::VecDeque::new();
        let mut retain = Vec::new();

        builder.build_reschedule(|_capabilities| {
            move |frontiers| {

                // We want to drain inputs no matter what.
                // We could do this after the next step, as we are certain these timestamps will
                // not be part of a closed frontier (as they have not yet been read). This has the
                // potential to make things speedier as we scan less and keep a smaller footprint.
                input.for_each(|_capability, counts| {
                    timestamps.extend(counts.iter().cloned());
                });

                if should_write {
                    if let Some(sink) = progress_sink.upgrade() {
                        let mut sink = sink.borrow_mut();

                        // If our frontier advances strictly, we have the opportunity to issue a progress statement.
                        if <_ as PartialOrder>::less_than(
                            &frontier.borrow(),
                            &frontiers[0].frontier(),
                        ) {
                            let new_frontier = frontiers[0].frontier();

                            // Extract the timestamp counts to announce.
                            let mut announce = Vec::new();
                            for (time, count) in timestamps.drain() {
                                if !new_frontier.less_equal(&time) {
                                    announce.push((time, count as usize));
                                } else {
                                    retain.push((time, count));
                                }
                            }
                            timestamps.extend(retain.drain(..));

                            // Announce the lower bound, upper bound, and timestamp counts.
                            let progress = Progress {
                                lower: frontier.elements().to_vec(),
                                upper: new_frontier.to_vec(),
                                counts: announce,
                            };
                            send_queue.push_back(Message::Progress(progress));

                            // Advance our frontier to track our progress utterance.
                            frontier = frontiers[0].frontier().to_owned();

                            while let Some(message) = send_queue.front() {
                                if let Some(duration) = sink.poll(message) {
                                    // Reschedule after `duration` and then bail.
                                    reactivator.activate_after(duration);
                                    // Signal that work remains to be done.
                                    return true;
                                } else {
                                    send_queue.pop_front();
                                }
                            }
                        }
                        // Signal incompleteness if messages remain to be sent.
                        !sink.done() || !send_queue.is_empty()
                    } else {
                        timestamps.clear();
                        send_queue.clear();
                        // Signal that there are no outstanding writes.
                        false
                    }
                } else { false }
            }
        });
    }
}
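
// A hedged usage sketch for `sink::build` (the `stream`, `some_writer`, and `sink_hash`
// bindings are assumed to exist in the caller): both sinks are passed as `Weak`
// references so that dropping the owning `Rc` winds the operators down, and `sink_hash`
// selects the single worker that records progress statements.
//
//     let writer = Rc::new(RefCell::new(some_writer));
//     sink::build(
//         &stream,      // a consolidated stream of (data, time, diff) updates
//         sink_hash,    // for example, a hash of the storage address and topic
//         Rc::downgrade(&writer),
//         Rc::downgrade(&writer),
//     );
//     // Keep `writer` alive; dropping it causes the sink operators to shut down.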

// pub mod kafka {

//     use serde::{Serialize, Deserialize};
//     use timely::scheduling::SyncActivator;
//     use rdkafka::{ClientContext, config::ClientConfig};
//     use rdkafka::consumer::{BaseConsumer, ConsumerContext};
//     use rdkafka::error::{KafkaError, RDKafkaError};
//     use super::BytesSink;

//     use std::hash::Hash;
//     use timely::progress::Timestamp;
//     use timely::dataflow::{Scope, Stream};
//     use crate::ExchangeData;
//     use crate::lattice::Lattice;

//     /// Creates a Kafka source from supplied configuration information.
//     pub fn create_source<G, D, T, R>(scope: G, addr: &str, topic: &str, group: &str) -> (Box<dyn std::any::Any>, Stream<G, (D, T, R)>)
//     where
//         G: Scope<Timestamp = T>,
//         D: ExchangeData + Hash + for<'a> serde::Deserialize<'a>,
//         T: ExchangeData + Hash + for<'a> serde::Deserialize<'a> + Timestamp + Lattice,
//         R: ExchangeData + Hash + for<'a> serde::Deserialize<'a>,
//     {
//         super::source::build(scope, |activator| {
//             let source = KafkaSource::new(addr, topic, group, activator);
//             // An iterator combinator that yields every "duration" even if more items exist.
//             // The implementation of such an iterator exists in the git history, or can be rewritten easily.
//             super::YieldingIter::new_from(Iter::<D,T,R>::new_from(source), std::time::Duration::from_millis(10))
//         })
//     }

//     pub fn create_sink<G, D, T, R>(stream: &Stream<G, (D, T, R)>, addr: &str, topic: &str) -> Box<dyn std::any::Any>
//     where
//         G: Scope<Timestamp = T>,
//         D: ExchangeData + Hash + Serialize + for<'a> Deserialize<'a>,
//         T: ExchangeData + Hash + Serialize + for<'a> Deserialize<'a> + Timestamp + Lattice,
//         R: ExchangeData + Hash + Serialize + for<'a> Deserialize<'a>,
//     {
//         use std::rc::Rc;
//         use std::cell::RefCell;
//         use crate::hashable::Hashable;

//         let sink = KafkaSink::new(addr, topic);
//         let result = Rc::new(RefCell::new(sink));
//         let sink_hash = (addr.to_string(), topic.to_string()).hashed();
//         super::sink::build(
//             &stream,
//             sink_hash,
//             Rc::downgrade(&result),
//             Rc::downgrade(&result),
//         );
//         Box::new(result)

//     }

//     pub struct KafkaSource {
//         consumer: BaseConsumer<ActivationConsumerContext>,
//     }

//     impl KafkaSource {
//         pub fn new(addr: &str, topic: &str, group: &str, activator: SyncActivator) -> Self {
//             let mut kafka_config = ClientConfig::new();
//             // This is mostly cargo-cult'd in from `source/kafka.rs`.
//             kafka_config.set("bootstrap.servers", &addr.to_string());
//             kafka_config
//                 .set("enable.auto.commit", "false")
//                 .set("auto.offset.reset", "earliest");

//             kafka_config.set("topic.metadata.refresh.interval.ms", "30000"); // 30 seconds
//             kafka_config.set("fetch.message.max.bytes", "134217728");
//             kafka_config.set("group.id", group);
//             kafka_config.set("isolation.level", "read_committed");
//             let activator = ActivationConsumerContext(activator);
//             let consumer = kafka_config.create_with_context::<_, BaseConsumer<_>>(activator).unwrap();
//             use rdkafka::consumer::Consumer;
//             consumer.subscribe(&[topic]).unwrap();
//             Self {
//                 consumer,
//             }
//         }
//     }

//     pub struct Iter<D, T, R> {
//         pub source: KafkaSource,
//         phantom: std::marker::PhantomData<(D, T, R)>,
//     }

//     impl<D, T, R> Iter<D, T, R> {
//         /// Constructs a new iterator from a bytes source.
//         pub fn new_from(source: KafkaSource) -> Self {
//             Self {
//                 source,
//                 phantom: std::marker::PhantomData,
//             }
//         }
//     }

//     impl<D, T, R> Iterator for Iter<D, T, R>
//     where
//         D: for<'a> Deserialize<'a>,
//         T: for<'a> Deserialize<'a>,
//         R: for<'a> Deserialize<'a>,
//     {
//         type Item = super::Message<D, T, R>;
//         fn next(&mut self) -> Option<Self::Item> {
//             use rdkafka::message::Message;
//             self.source
//                 .consumer
//                 .poll(std::time::Duration::from_millis(0))
//                 .and_then(|result| result.ok())
//                 .and_then(|message| {
//                     message.payload().and_then(|message| bincode::deserialize::<super::Message<D, T, R>>(message).ok())
//                 })
//         }
//     }

//     /// An implementation of [`ConsumerContext`] that unparks the wrapped thread
//     /// when the message queue switches from empty to nonempty.
//     struct ActivationConsumerContext(SyncActivator);

//     impl ClientContext for ActivationConsumerContext { }

//     impl ActivationConsumerContext {
//         fn activate(&self) {
//             self.0.activate().unwrap();
//         }
//     }

//     impl ConsumerContext for ActivationConsumerContext {
//         fn message_queue_nonempty_callback(&self) {
//             self.activate();
//         }
//     }

//     use std::time::Duration;
//     use rdkafka::producer::DefaultProducerContext;
//     use rdkafka::producer::{BaseRecord, ThreadedProducer};

//     pub struct KafkaSink {
//         topic: String,
//         producer: ThreadedProducer<DefaultProducerContext>,
//     }

//     impl KafkaSink {
//         pub fn new(addr: &str, topic: &str) -> Self {
//             let mut config = ClientConfig::new();
//             config.set("bootstrap.servers", &addr);
//             config.set("queue.buffering.max.kbytes", &format!("{}", 16 << 20));
//             config.set("queue.buffering.max.messages", &format!("{}", 10_000_000));
//             config.set("queue.buffering.max.ms", &format!("{}", 10));
//             let producer = config
//                 .create_with_context::<_, ThreadedProducer<_>>(DefaultProducerContext)
//                 .expect("creating kafka producer for kafka sinks failed");
//             Self {
//                 producer,
//                 topic: topic.to_string(),
//             }
//         }
//     }

//     impl BytesSink for KafkaSink {
//         fn poll(&mut self, bytes: &[u8]) -> Option<Duration> {
//             let record = BaseRecord::<[u8], _>::to(&self.topic).payload(bytes);

//             self.producer.send(record).err().map(|(e, _)| {
//                 if let KafkaError::MessageProduction(RDKafkaError::QueueFull) = e {
//                     Duration::from_secs(1)
//                 } else {
//                     // TODO(frank): report this error upwards so the user knows the sink is dead.
//                     Duration::from_secs(1)
//                 }
//             })
//         }
//         fn done(&self) -> bool {
//             self.producer.in_flight_count() == 0
//         }
//     }

// }