differential_dataflow/capture.rs
//! Logic related to capture and replay of differential collections.
//!
//! This module defines a protocol for capturing and replaying differential collections
//! to streaming storage that may both duplicate and reorder messages. It records facts
//! about the collection that, once true, remain true: the exact changes data undergo
//! at each time, and the number of distinct updates at each time.
//!
//! The methods are parameterized by implementors of message sources and sinks (see the
//! `Writer` trait). For example implementations, consult the commented text at the end
//! of this file.

use std::time::Duration;
use serde::{Deserialize, Serialize};

/// A message in the CDC V2 protocol.
#[derive(Ord, PartialOrd, Eq, PartialEq, Debug, Clone, Serialize, Deserialize)]
pub enum Message<D, T, R> {
    /// A batch of updates that are certain to occur.
    ///
    /// Each triple is an irrevocable statement about a change that occurs.
    /// Each statement contains a datum, a time, and a difference, and asserts
    /// that the multiplicity of the datum changes at the time by the difference.
    Updates(Vec<(D, T, R)>),
    /// An irrevocable statement about the number of updates within a time interval.
    Progress(Progress<T>),
}

/// An irrevocable statement about the number of updates at times within an interval.
///
/// This statement covers all times beyond `lower` and not beyond `upper`.
/// Each element of `counts` is an irrevocable statement about the exact number of
/// distinct updates that occur at that time.
/// Times not present in `counts` have a count of zero.
#[derive(Ord, PartialOrd, Eq, PartialEq, Debug, Clone, Serialize, Deserialize)]
pub struct Progress<T> {
    /// The lower bound of times contained in this statement.
    pub lower: Vec<T>,
    /// The upper bound of times contained in this statement.
    pub upper: Vec<T>,
    /// All non-zero counts for times beyond `lower` and not beyond `upper`.
    pub counts: Vec<(T, usize)>,
}
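
// The example below is an illustrative sketch, not part of the protocol implementation:
// it shows how a producer might phrase the same facts as both update and progress
// messages. The concrete types (`String` data, `u64` times, `i64` differences) are
// assumptions made for the example only.
#[cfg(test)]
mod protocol_example {
    use super::{Message, Progress};

    #[test]
    fn construct_messages() {
        // Two updates at time 3 and one at time 4, announced as a batch.
        let updates: Message<String, u64, i64> = Message::Updates(vec![
            ("apple".to_string(), 3, 1),
            ("banana".to_string(), 3, 2),
            ("cherry".to_string(), 4, -1),
        ]);
        // A progress statement covering times in `[0, 5)`: exactly two distinct
        // updates occur at time 3, one at time 4, and none at any other covered time.
        let progress: Message<String, u64, i64> = Message::Progress(Progress {
            lower: vec![0],
            upper: vec![5],
            counts: vec![(3, 2), (4, 1)],
        });
        // Both kinds of message may be duplicated and reordered in transit; the
        // protocol remains decodable because each message is an irrevocable fact.
        assert!(matches!(updates, Message::Updates(_)));
        assert!(matches!(progress, Message::Progress(_)));
    }
}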

/// A simple sink for items of type `T`.
pub trait Writer<T> {
    /// Returns an amount of time to wait before retrying, or `None` if the item was accepted.
    fn poll(&mut self, item: &T) -> Option<Duration>;
    /// Indicates if the sink has committed all sent data and can be safely dropped.
    fn done(&self) -> bool;
}
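
// A minimal sketch of a `Writer` implementation, assuming an in-memory destination.
// Real sinks (such as the commented Kafka sink at the end of this file) would hand the
// item to an external system and use the returned `Duration` to signal backpressure.
#[cfg(test)]
mod writer_example {
    use super::Writer;
    use std::time::Duration;

    /// Records everything it is asked to write.
    struct VecWriter<T> {
        sent: Vec<T>,
    }

    impl<T: Clone> Writer<T> for VecWriter<T> {
        fn poll(&mut self, item: &T) -> Option<Duration> {
            // An in-memory vector can always accept the item immediately.
            self.sent.push(item.clone());
            None
        }
        fn done(&self) -> bool {
            // Nothing is ever in flight, so all sent data is already "committed".
            true
        }
    }

    #[test]
    fn vec_writer_accepts_everything() {
        let mut writer = VecWriter { sent: Vec::new() };
        assert_eq!(writer.poll(&"some update".to_string()), None);
        assert!(writer.done());
        assert_eq!(writer.sent.len(), 1);
    }
}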

/// A deduplicating, re-ordering iterator.
pub mod iterator {

    use super::{Message, Progress};
    use crate::lattice::Lattice;
    use std::hash::Hash;
    use timely::order::PartialOrder;
    use timely::progress::{
        frontier::{AntichainRef, MutableAntichain},
        Antichain,
        Timestamp,
    };

    /// A direct implementation of a deduplicating, re-ordering iterator.
    ///
    /// The iterator draws from a source that may contain arbitrary duplication and be
    /// arbitrarily out of order, yet it produces each update exactly once, in in-order
    /// batches. The iterator maintains a bounded memory footprint, proportional to the
    /// mismatch between the received updates and progress messages.
    pub struct Iter<I, D, T, R>
    where
        I: Iterator<Item = Message<D, T, R>>,
        T: Hash + Ord + Lattice + Clone,
        D: Hash + Eq,
        R: Hash + Eq,
    {
        /// Source of potentially duplicated, out of order cdc_v2 messages.
        iterator: I,
        /// Updates that have been received, but are still beyond `reported_frontier`.
        ///
        /// These updates are retained both so that they can eventually be transmitted
        /// and so that they can deduplicate updates that may still be received.
        updates: std::collections::HashSet<(D, T, R)>,
        /// Frontier through which the iterator has reported updates.
        ///
        /// All updates not beyond this frontier have been reported.
        /// Any information related to times not beyond this frontier can be discarded.
        ///
        /// This frontier tracks the meet of `progress_frontier` and `messages_frontier`,
        /// our two bounds on potential uncertainty in progress and update messages.
        reported_frontier: Antichain<T>,
        /// Frontier of accepted progress statements.
        ///
        /// All progress message counts for times not beyond this frontier have been
        /// incorporated into `messages_frontier`. This frontier also guides which
        /// received progress statements can be incorporated: those for which
        /// this frontier is beyond their lower bound.
        progress_frontier: Antichain<T>,
        /// Counts of outstanding messages at times.
        ///
        /// These counts track the difference between message counts at times announced
        /// by progress messages, and message counts at times received in distinct updates.
        messages_frontier: MutableAntichain<T>,
        /// Progress statements that are not yet actionable due to out-of-orderness.
        ///
        /// A progress statement becomes actionable once the progress frontier is beyond
        /// its lower frontier. This ensures that the [0, lower) interval is already
        /// incorporated, and that we will not leave a gap by incorporating the counts
        /// and reflecting the progress statement's upper frontier.
        progress_queue: Vec<Progress<T>>,
    }

    impl<D, T, R, I> Iterator for Iter<I, D, T, R>
    where
        I: Iterator<Item = Message<D, T, R>>,
        T: Hash + Ord + Lattice + Clone,
        D: Hash + Eq + Clone,
        R: Hash + Eq + Clone,
    {
        type Item = (Vec<(D, T, R)>, Antichain<T>);
        fn next(&mut self) -> Option<Self::Item> {
            // Each call to `next` should return some newly carved interval of time.
            // As such, we should read from our source until we find such a thing.
            //
            // An interval can be completed once our frontier of received progress
            // information and our frontier of unresolved counts have advanced.
            while let Some(message) = self.iterator.next() {
                match message {
                    Message::Updates(mut updates) => {
                        // Discard updates at reported times, or duplicates at unreported times.
                        updates.retain(|dtr| {
                            self.reported_frontier.less_equal(&dtr.1) && !self.updates.contains(dtr)
                        });
                        // Decrement our counts of accounted-for messages.
                        self.messages_frontier
                            .update_iter(updates.iter().map(|(_, t, _)| (t.clone(), -1)));
                        // Record the messages in our de-duplication collection.
                        self.updates.extend(updates.into_iter());
                    }
                    Message::Progress(progress) => {
                        // A progress statement may not be immediately actionable.
                        self.progress_queue.push(progress);
                    }
                }

                // Attempt to drain actionable progress messages.
                // A progress message is actionable if `self.progress_frontier` is greater or
                // equal to the message's lower bound.
                while let Some(position) = self.progress_queue.iter().position(|p| {
                    <_ as PartialOrder>::less_equal(
                        &AntichainRef::new(&p.lower),
                        &self.progress_frontier.borrow(),
                    )
                }) {
                    let mut progress = self.progress_queue.remove(position);
                    // Discard counts that have already been incorporated.
                    progress
                        .counts
                        .retain(|(time, _count)| self.progress_frontier.less_equal(time));
                    // Record any new reports of expected counts.
                    self.messages_frontier
                        .update_iter(progress.counts.drain(..).map(|(t, c)| (t, c as i64)));
                    // Extend the frontier to be times greater or equal to both progress.upper and self.progress_frontier.
                    let mut new_frontier = Antichain::new();
                    for time1 in progress.upper {
                        for time2 in self.progress_frontier.elements() {
                            new_frontier.insert(time1.join(time2));
                        }
                    }
                    self.progress_queue.retain(|p| {
                        !<_ as PartialOrder>::less_equal(
                            &AntichainRef::new(&p.upper),
                            &new_frontier.borrow(),
                        )
                    });
                    self.progress_frontier = new_frontier;
                }

                // Now check and see if our lower bound exceeds `self.reported_frontier`.
                let mut lower_bound = self.progress_frontier.clone();
                lower_bound.extend(self.messages_frontier.frontier().iter().cloned());
                if lower_bound != self.reported_frontier {
                    let to_publish = self
                        .updates
                        .iter()
                        .filter(|(_, t, _)| !lower_bound.less_equal(t))
                        .cloned()
                        .collect::<Vec<_>>();
                    self.updates.retain(|(_, t, _)| lower_bound.less_equal(t));
                    self.reported_frontier = lower_bound.clone();
                    return Some((to_publish, lower_bound));
                }
            }
            None
        }
    }

    impl<D, T, R, I> Iter<I, D, T, R>
    where
        I: Iterator<Item = Message<D, T, R>>,
        T: Hash + Ord + Lattice + Clone + Timestamp,
        D: Hash + Eq + Clone,
        R: Hash + Eq + Clone,
    {
        /// Construct a new re-ordering, deduplicating iterator.
        pub fn new(iterator: I) -> Self {
            Self {
                iterator,
                updates: std::collections::HashSet::new(),
                reported_frontier: Antichain::from_elem(T::minimum()),
                progress_frontier: Antichain::from_elem(T::minimum()),
                messages_frontier: MutableAntichain::new(),
                progress_queue: Vec::new(),
            }
        }
    }
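
    // A small end-to-end sketch of the deduplicating, re-ordering behavior. The message
    // contents below (string data, `u64` times, `i64` differences) are assumptions made
    // for the example: a progress statement arrives before the updates it accounts for,
    // and one update is duplicated, yet each update is yielded exactly once.
    #[cfg(test)]
    mod tests {
        use super::Iter;
        use super::super::{Message, Progress};

        #[test]
        fn deduplicates_and_reorders() {
            let messages: Vec<Message<String, u64, i64>> = vec![
                Message::Progress(Progress {
                    lower: vec![0],
                    upper: vec![5],
                    counts: vec![(3, 1), (4, 1)],
                }),
                Message::Updates(vec![("a".to_string(), 4, 1)]),
                Message::Updates(vec![("a".to_string(), 4, 1)]), // duplicate; must be ignored.
                Message::Updates(vec![("b".to_string(), 3, 1)]),
            ];
            let batches: Vec<_> = Iter::new(messages.into_iter()).collect();
            // Exactly two distinct updates are produced across all yielded batches.
            let produced: usize = batches.iter().map(|(updates, _)| updates.len()).sum();
            assert_eq!(produced, 2);
        }
    }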
}

/// Methods for recovering update streams from sources of messages.
pub mod source {

    use super::{Message, Progress};
    use crate::{lattice::Lattice, ExchangeData};
    use std::cell::RefCell;
    use std::hash::Hash;
    use std::rc::Rc;
    use std::marker::{Send, Sync};
    use std::sync::Arc;
    use timely::dataflow::{Scope, Stream, operators::{Capability, CapabilitySet}};
    use timely::progress::Timestamp;
    use timely::scheduling::SyncActivator;

    // TODO(guswynn): implement this generally in timely
    struct DropActivator {
        activator: Arc<SyncActivator>,
    }

    impl Drop for DropActivator {
        fn drop(&mut self) {
            // Best effort: failure to activate is ignored.
            let _ = self.activator.activate();
        }
    }

    /// Constructs a stream of updates from a source of messages.
    ///
    /// The stream is built in the supplied `scope` and continues to run until
    /// the returned `Box<dyn Any>` token is dropped. The `source_builder` argument
    /// is invoked with a `SyncActivator` that will re-activate the source.
    pub fn build<G, B, I, D, T, R>(
        scope: G,
        source_builder: B,
    ) -> (Box<dyn std::any::Any + Send + Sync>, Stream<G, (D, T, R)>)
    where
        G: Scope<Timestamp = T>,
        B: FnOnce(SyncActivator) -> I,
        I: Iterator<Item = Message<D, T, R>> + 'static,
        D: ExchangeData + Hash,
        T: ExchangeData + Hash + Timestamp + Lattice,
        R: ExchangeData + Hash,
    {
        // Read messages are either updates or progress messages.
        // Each may contain duplicates, and we must take care to deduplicate information before introducing it to an accumulation.
        // This includes both emitting updates, and setting expectations for update counts.
        //
        // Updates need to be deduplicated by (data, time), and we should exchange them by such.
        // Progress needs to be deduplicated by time, and we should exchange them by such.
        //
        // The first cut of this is a dataflow graph that looks like (flowing downward):
        //
        // 1. MESSAGES:
        //      Reads the `Message` stream; maintains capabilities.
        //      Sends `Updates` to the UPDATES stage by hash((data, time, diff)).
        //      Sends `Progress` to the PROGRESS stage by hash(time), each with lower and upper bounds.
        //      Shares capabilities with the downstream operator.
        // 2. UPDATES:
        //      Maintains and deduplicates updates.
        //      Ships updates once the frontier advances.
        //      Ships counts to the PROGRESS stage, by hash(time).
        // 3. PROGRESS:
        //      Maintains outstanding message counts by time. Tracks frontiers.
        //      Tracks lower bounds of messages and the progress frontier. Broadcasts changes to the FEEDBACK stage.
        // 4. FEEDBACK:
        //      Shares capabilities with MESSAGES; downgrades to track input from PROGRESS.
        //
        // Each of these stages can be arbitrarily data-parallel, and FEEDBACK *must* have the same parallelism as MESSAGES.
        // Limitations: MESSAGES must broadcast lower and upper bounds to PROGRESS, and PROGRESS must broadcast its changes
        // to FEEDBACK. This may mean that scaling up PROGRESS could introduce quadratic problems, though both of these
        // broadcast payloads are meant to be very small.

        use crate::hashable::Hashable;
        use timely::dataflow::channels::pact::Exchange;
        use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
        use timely::progress::frontier::MutableAntichain;
        use timely::progress::ChangeBatch;

        // Some message distribution logic depends on the number of workers.
        let workers = scope.peers();

        let mut token = None;
        // Frontier owned by the FEEDBACK operator and consulted by the MESSAGES operators.
        let mut antichain = MutableAntichain::new();
        antichain.update_iter(Some((T::minimum(), workers as i64)));
        let shared_frontier = Rc::new(RefCell::new(antichain));
        let shared_frontier2 = shared_frontier.clone();

        // Step 1: The MESSAGES operator.
        let mut messages_op = OperatorBuilder::new("CDCV2_Messages".to_string(), scope.clone());
        let address = messages_op.operator_info().address;
        let activator = scope.sync_activator_for(address.to_vec());
        let activator2 = scope.activator_for(Rc::clone(&address));
        let drop_activator = DropActivator { activator: Arc::new(scope.sync_activator_for(address.to_vec())) };
        let mut source = source_builder(activator);
        let (mut updates_out, updates) = messages_op.new_output();
        let (mut progress_out, progress) = messages_op.new_output();
        messages_op.build(|capabilities| {

            // A Weak that communicates whether the returned token has been dropped.
            let drop_activator_weak = Arc::downgrade(&drop_activator.activator);

            token = Some(drop_activator);

            // Read messages from some source; shuffle them to UPDATES and PROGRESS; share capability with FEEDBACK.
            let mut updates_caps = CapabilitySet::from_elem(capabilities[0].clone());
            let mut progress_caps = CapabilitySet::from_elem(capabilities[1].clone());
            // Capture the shared frontier to read out frontier updates to apply.
            let local_frontier = shared_frontier.clone();

            move |_frontiers| {
                // First check to ensure that we haven't been terminated by someone dropping our token.
                if drop_activator_weak.upgrade().is_none() {
                    // Give up our capabilities.
                    updates_caps.downgrade(&[]);
                    progress_caps.downgrade(&[]);
                    // Never continue, even if we are (erroneously) activated again.
                    return;
                }

                // Consult our shared frontier, and ensure capabilities are downgraded to it.
                let shared_frontier = local_frontier.borrow();
                updates_caps.downgrade(&shared_frontier.frontier());
                progress_caps.downgrade(&shared_frontier.frontier());

                // Next check to see if we have been terminated by the source being complete.
                if !updates_caps.is_empty() && !progress_caps.is_empty() {
                    let mut updates = updates_out.activate();
                    let mut progress = progress_out.activate();

                    // TODO(frank): this is a moment where multi-temporal capabilities need to be fixed up.
                    // Specifically, there may not be one capability valid for all updates.
                    let mut updates_session = updates.session(&updates_caps[0]);
                    let mut progress_session = progress.session(&progress_caps[0]);

                    // We presume the iterator yields (returns `None`) when it would otherwise block.
                    for message in source.by_ref() {
                        match message {
                            Message::Updates(mut updates) => {
                                updates_session.give_container(&mut updates);
                            }
                            Message::Progress(progress) => {
                                // We must send a copy of each progress message to all workers,
                                // but we can partition the counts across workers by timestamp.
                                let mut to_worker = vec![Vec::new(); workers];
                                for (time, count) in progress.counts {
                                    to_worker[(time.hashed() as usize) % workers]
                                        .push((time, count));
                                }
                                for (worker, counts) in to_worker.into_iter().enumerate() {
                                    progress_session.give((
                                        worker,
                                        Progress {
                                            lower: progress.lower.clone(),
                                            upper: progress.upper.clone(),
                                            counts,
                                        },
                                    ));
                                }
                            }
                        }
                    }
                }
            }
        });

        // Step 2: The UPDATES operator.
        let mut updates_op = OperatorBuilder::new("CDCV2_Updates".to_string(), scope.clone());
        let mut input = updates_op.new_input(&updates, Exchange::new(|x: &(D, T, R)| x.hashed()));
        let (mut changes_out, changes) = updates_op.new_output();
        let (mut counts_out, counts) = updates_op.new_output();
        updates_op.build(move |_capability| {
            // Deduplicates updates, and ships novel updates and the counts for each time.
            // For simplicity, this operator ships updates as they are discovered to be new.
            // This has the defect that on load we may have two copies of the data (shipped,
            // and here for deduplication).
            //
            // Filters may be pushed ahead of this operator, but because of deduplication we
            // may not push projections ahead of this operator (at least, not without fields
            // that are known to form keys, and even then only carefully).
            let mut pending = std::collections::HashMap::new();
            let mut change_batch = ChangeBatch::<T>::new();
            move |frontiers| {
                // Thin out the deduplication buffer.
                // This is the moment in a more advanced implementation where we might send
                // the data for the first time, maintaining only one copy of each update live
                // at a time in the system.
                pending.retain(|(_row, time), _diff| frontiers[0].less_equal(time));

                // Deduplicate newly received updates, sending new updates and timestamp counts.
                let mut changes = changes_out.activate();
                let mut counts = counts_out.activate();
                while let Some((capability, updates)) = input.next() {
                    let mut changes_session = changes.session(&capability);
                    let mut counts_session = counts.session(&capability);
                    for (data, time, diff) in updates.iter() {
                        if frontiers[0].less_equal(time) {
                            if let Some(prior) = pending.insert((data.clone(), time.clone()), diff.clone()) {
                                assert_eq!(&prior, diff);
                            } else {
                                change_batch.update(time.clone(), -1);
                                changes_session.give((data.clone(), time.clone(), diff.clone()));
                            }
                        }
                    }
                    if !change_batch.is_empty() {
                        counts_session.give_iterator(change_batch.drain());
                    }
                }
            }
        });

        // Step 3: The PROGRESS operator.
        let mut progress_op = OperatorBuilder::new("CDCV2_Progress".to_string(), scope.clone());
        let mut input = progress_op.new_input(
            &progress,
            Exchange::new(|x: &(usize, Progress<T>)| x.0 as u64),
        );
        let mut counts =
            progress_op.new_input(&counts, Exchange::new(|x: &(T, i64)| (x.0).hashed()));
        let (mut frontier_out, frontier) = progress_op.new_output();
        progress_op.build(move |_capability| {
            // Receive progress statements and deduplicated counts. Track the lower frontier of both and broadcast changes.

            use timely::order::PartialOrder;
            use timely::progress::{frontier::AntichainRef, Antichain};

            let mut progress_queue = Vec::new();
            let mut progress_frontier = Antichain::from_elem(T::minimum());
            let mut updates_frontier = MutableAntichain::new();
            let mut reported_frontier = Antichain::from_elem(T::minimum());

            move |_frontiers| {
                let mut frontier = frontier_out.activate();

                // If the frontier changes we need a capability to express that.
                // Any capability should work; the downstream listener doesn't care.
                let mut capability: Option<Capability<T>> = None;

                // Drain all relevant update counts into the mutable antichain tracking its frontier.
                while let Some((cap, counts)) = counts.next() {
                    updates_frontier.update_iter(counts.iter().cloned());
                    capability = Some(cap.retain());
                }
                // Drain all progress statements into the queue out of which we will work.
                while let Some((cap, progress)) = input.next() {
                    progress_queue.extend(progress.iter().map(|x| (x.1).clone()));
                    capability = Some(cap.retain());
                }

                // Extract and act on actionable progress messages.
                // A progress message is actionable if `progress_frontier` is beyond the message's lower bound.
                while let Some(position) = progress_queue.iter().position(|p| {
                    <_ as PartialOrder>::less_equal(
                        &AntichainRef::new(&p.lower),
                        &progress_frontier.borrow(),
                    )
                }) {
                    // Extract the progress statement.
                    let mut progress = progress_queue.remove(position);
                    // Discard counts that have already been incorporated.
                    progress
                        .counts
                        .retain(|(time, _count)| progress_frontier.less_equal(time));
                    // Record any new reports of expected counts.
                    updates_frontier
                        .update_iter(progress.counts.drain(..).map(|(t, c)| (t, c as i64)));
                    // Extend `progress_frontier` by `progress.upper`.
                    let mut new_frontier = Antichain::new();
                    for time1 in progress.upper {
                        for time2 in progress_frontier.elements() {
                            new_frontier.insert(time1.join(time2));
                        }
                    }
                    progress_frontier = new_frontier;
                }

                // Determine if the lower bound of the frontiers has advanced, and transmit updates if so.
                let mut lower_bound = progress_frontier.clone();
                lower_bound.extend(updates_frontier.frontier().iter().cloned());
                if lower_bound != reported_frontier {
                    let capability =
                        capability.expect("Changes occurred, without surfacing a capability");
                    let mut changes = ChangeBatch::new();
                    changes.extend(lower_bound.elements().iter().map(|t| (t.clone(), 1)));
                    changes.extend(reported_frontier.elements().iter().map(|t| (t.clone(), -1)));
                    let mut frontier_session = frontier.session(&capability);
                    for peer in 0..workers {
                        frontier_session.give((peer, changes.clone()));
                    }
                    reported_frontier = lower_bound.clone();
                }
            }
        });

        // Step 4: The FEEDBACK operator.
        let mut feedback_op = OperatorBuilder::new("CDCV2_Feedback".to_string(), scope.clone());
        let mut input = feedback_op.new_input(
            &frontier,
            Exchange::new(|x: &(usize, ChangeBatch<T>)| x.0 as u64),
        );
        feedback_op.build(move |_capability| {
            // Receive frontier changes and share the net result with MESSAGES.
            move |_frontiers| {
                let mut antichain = shared_frontier2.borrow_mut();
                let mut must_activate = false;
                while let Some((_cap, frontier_changes)) = input.next() {
                    for (_worker, input_changes) in frontier_changes.iter() {
                        // Apply the updates, and observe if the lower bound has changed.
                        if antichain.update_iter(input_changes.unstable_internal_updates().iter().cloned()).next().is_some() {
                            must_activate = true;
                        }
                    }
                }
                // If the lower bound has changed, we must activate MESSAGES.
                if must_activate { activator2.activate(); }
            }
        });

        (Box::new(token.unwrap()), changes)
    }
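
    // A minimal end-to-end sketch of `build`, replaying a fixed, already-complete message
    // sequence in a single-worker dataflow and capturing what comes out. The message
    // contents (string data, `u64` times, `i64` differences) and the use of
    // `execute_directly` are assumptions made for the example.
    #[cfg(test)]
    mod tests {
        use super::build;
        use super::super::{Message, Progress};
        use timely::dataflow::operators::capture::{Capture, Extract};

        #[test]
        fn replays_fixed_messages() {
            timely::execute_directly(|worker| {
                let (token, captured) = worker.dataflow::<u64, _, _>(|scope| {
                    let messages: Vec<Message<String, u64, i64>> = vec![
                        Message::Updates(vec![("a".to_string(), 0, 1)]),
                        // An empty `upper` closes out all times, which lets the dataflow shut down.
                        Message::Progress(Progress { lower: vec![0], upper: vec![], counts: vec![(0, 1)] }),
                    ];
                    let (token, stream) = build(scope.clone(), move |_activator| messages.into_iter());
                    (token, stream.capture())
                });
                // Run the dataflow to completion while the token is held, then inspect the capture.
                while worker.step() { }
                drop(token);
                assert_eq!(captured.extract(), vec![(0, vec![("a".to_string(), 0, 1)])]);
            });
        }
    }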
}

/// Methods for recording update streams to sinks of messages.
pub mod sink {

    use std::hash::Hash;
    use std::cell::RefCell;
    use std::rc::Weak;

    use serde::{Deserialize, Serialize};

    use timely::order::PartialOrder;
    use timely::progress::{Antichain, ChangeBatch, Timestamp};
    use timely::dataflow::{Scope, Stream};
    use timely::dataflow::channels::pact::{Exchange, Pipeline};
    use timely::dataflow::operators::generic::{FrontieredInputHandle, builder_rc::OperatorBuilder};

    use crate::{lattice::Lattice, ExchangeData};
    use super::{Writer, Message, Progress};

    /// Constructs a sink, for recording the updates in `stream`.
    ///
    /// It is crucial that `stream` has been consolidated before it is passed to this method,
    /// which will *not* perform the consolidation on the stream's behalf. If the stream has
    /// not been consolidated, the recorded output may not be correctly reconstructed by readers.
    pub fn build<G, BS, D, T, R>(
        stream: &Stream<G, (D, T, R)>,
        sink_hash: u64,
        updates_sink: Weak<RefCell<BS>>,
        progress_sink: Weak<RefCell<BS>>,
    ) where
        G: Scope<Timestamp = T>,
        BS: Writer<Message<D,T,R>> + 'static,
        D: ExchangeData + Hash + Serialize + for<'a> Deserialize<'a>,
        T: ExchangeData + Hash + Serialize + for<'a> Deserialize<'a> + Timestamp + Lattice,
        R: ExchangeData + Hash + Serialize + for<'a> Deserialize<'a>,
    {
        // First we record the updates that stream in.
        // We can simply record all updates, under the presumption that they have been consolidated,
        // and so any record we see is in fact guaranteed to happen.
        let mut builder = OperatorBuilder::new("UpdatesWriter".to_owned(), stream.scope());
        let reactivator = stream.scope().activator_for(builder.operator_info().address);
        let mut input = builder.new_input(stream, Pipeline);
        let (mut updates_out, updates) = builder.new_output();

        builder.build_reschedule(
            move |_capability| {
                let mut timestamps = <ChangeBatch<_>>::new();
                let mut send_queue = std::collections::VecDeque::new();
                move |_frontiers| {
                    let mut output = updates_out.activate();

                    // We want to drain inputs always...
                    input.for_each(|capability, updates| {
                        // Write each update out, and record the timestamp.
                        for (_data, time, _diff) in updates.iter() {
                            timestamps.update(time.clone(), 1);
                        }

                        // Now record the update to the writer.
                        send_queue.push_back(Message::Updates(std::mem::take(updates)));

                        // Transmit timestamp counts downstream.
                        output
                            .session(&capability)
                            .give_iterator(timestamps.drain());
                    });

                    // Drain whatever we can from the queue of messages to send.
                    // ... but we needn't do anything more if our sink is closed.
                    if let Some(sink) = updates_sink.upgrade() {
                        let mut sink = sink.borrow_mut();
                        while let Some(message) = send_queue.front() {
                            if let Some(duration) = sink.poll(message) {
                                // Reschedule after `duration` and then bail.
                                reactivator.activate_after(duration);
                                return true;
                            } else {
                                send_queue.pop_front();
                            }
                        }
                        // Signal incompleteness if messages remain to be sent.
                        !sink.done() || !send_queue.is_empty()
                    } else {
                        // We have been terminated, but may still receive data indefinitely.
                        send_queue.clear();
                        // Signal that there are no outstanding writes.
                        false
                    }
                }
            },
        );

        // We use a lower-level builder here to get access to the operator address, for rescheduling.
        let mut builder = OperatorBuilder::new("ProgressWriter".to_owned(), stream.scope());
        let reactivator = stream.scope().activator_for(builder.operator_info().address);
        let mut input = builder.new_input(&updates, Exchange::new(move |_| sink_hash));
        let should_write = stream.scope().index() == (sink_hash as usize) % stream.scope().peers();

        // We now record the numbers of updates at each timestamp between lower and upper bounds.
        // Track the advancing frontier, to know when to produce utterances.
        let mut frontier = Antichain::from_elem(T::minimum());
        // Track accumulated counts for timestamps.
        let mut timestamps = <ChangeBatch<_>>::new();
        // Stash for messages yet to send.
        let mut send_queue = std::collections::VecDeque::new();
        let mut retain = Vec::new();

        builder.build_reschedule(|_capabilities| {
            move |frontiers| {
                let mut input = FrontieredInputHandle::new(&mut input, &frontiers[0]);

                // We want to drain inputs no matter what.
                // We could do this after the next step, as we are certain these timestamps will
                // not be part of a closed frontier (as they have not yet been read). This has the
                // potential to make things speedier as we scan less and keep a smaller footprint.
                input.for_each(|_capability, counts| {
                    timestamps.extend(counts.iter().cloned());
                });

                if should_write {
                    if let Some(sink) = progress_sink.upgrade() {
                        let mut sink = sink.borrow_mut();

                        // If our frontier advances strictly, we have the opportunity to issue a progress statement.
                        if <_ as PartialOrder>::less_than(
                            &frontier.borrow(),
                            &input.frontier.frontier(),
                        ) {
                            let new_frontier = input.frontier.frontier();

                            // Extract the timestamp counts to announce.
                            let mut announce = Vec::new();
                            for (time, count) in timestamps.drain() {
                                if !new_frontier.less_equal(&time) {
                                    announce.push((time, count as usize));
                                } else {
                                    retain.push((time, count));
                                }
                            }
                            timestamps.extend(retain.drain(..));

                            // Announce the lower bound, upper bound, and timestamp counts.
                            let progress = Progress {
                                lower: frontier.elements().to_vec(),
                                upper: new_frontier.to_vec(),
                                counts: announce,
                            };
                            send_queue.push_back(Message::Progress(progress));

                            // Advance our frontier to track our progress utterance.
                            frontier = input.frontier.frontier().to_owned();

                            while let Some(message) = send_queue.front() {
                                if let Some(duration) = sink.poll(message) {
                                    // Reschedule after `duration` and then bail.
                                    reactivator.activate_after(duration);
                                    // Signal that work remains to be done.
                                    return true;
                                } else {
                                    send_queue.pop_front();
                                }
                            }
                        }
                        // Signal incompleteness if messages remain to be sent.
                        !sink.done() || !send_queue.is_empty()
                    } else {
                        timestamps.clear();
                        send_queue.clear();
                        // Signal that there are no outstanding writes.
                        false
                    }
                } else { false }
            }
        });
    }
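
    // A minimal sketch of recording a small, pre-consolidated stream into an in-memory
    // sink. The `MemoryWriter` type, the `u64`/`i64` time and difference types, and the
    // use of `execute_directly` are assumptions made for this example; any
    // `Writer<Message<D, T, R>>` implementation could stand in for `MemoryWriter`.
    #[cfg(test)]
    mod tests {
        use std::cell::RefCell;
        use std::rc::Rc;
        use std::time::Duration;

        use timely::dataflow::operators::ToStream;

        use super::build;
        use super::super::{Message, Writer};

        /// Collects every message it is asked to write.
        struct MemoryWriter<D, T, R> {
            messages: Vec<Message<D, T, R>>,
        }

        impl<D: Clone, T: Clone, R: Clone> Writer<Message<D, T, R>> for MemoryWriter<D, T, R> {
            fn poll(&mut self, item: &Message<D, T, R>) -> Option<Duration> {
                self.messages.push(item.clone());
                None
            }
            fn done(&self) -> bool { true }
        }

        #[test]
        fn records_updates_and_progress() {
            timely::execute_directly(|worker| {
                let sink = Rc::new(RefCell::new(MemoryWriter { messages: Vec::new() }));
                let (updates_sink, progress_sink) = (Rc::downgrade(&sink), Rc::downgrade(&sink));
                worker.dataflow::<u64, _, _>(|scope| {
                    // A tiny, already consolidated collection of updates.
                    let stream = vec![("a".to_string(), 0u64, 1i64), ("b".to_string(), 1, 1)]
                        .to_stream(scope);
                    build(&stream, 0, updates_sink, progress_sink);
                });
                // Run the dataflow to completion, then check what was recorded.
                while worker.step() { }
                let recorded = sink.borrow();
                assert!(recorded.messages.iter().any(|m| matches!(m, Message::Updates(_))));
                assert!(recorded.messages.iter().any(|m| matches!(m, Message::Progress(_))));
            });
        }
    }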
}

// pub mod kafka {

//     use serde::{Serialize, Deserialize};
//     use timely::scheduling::SyncActivator;
//     use rdkafka::{ClientContext, config::ClientConfig};
//     use rdkafka::consumer::{BaseConsumer, ConsumerContext};
//     use rdkafka::error::{KafkaError, RDKafkaError};
//     use super::BytesSink;

//     use std::hash::Hash;
//     use timely::progress::Timestamp;
//     use timely::dataflow::{Scope, Stream};
//     use crate::ExchangeData;
//     use crate::lattice::Lattice;

//     /// Creates a Kafka source from supplied configuration information.
//     pub fn create_source<G, D, T, R>(scope: G, addr: &str, topic: &str, group: &str) -> (Box<dyn std::any::Any>, Stream<G, (D, T, R)>)
//     where
//         G: Scope<Timestamp = T>,
//         D: ExchangeData + Hash + for<'a> serde::Deserialize<'a>,
//         T: ExchangeData + Hash + for<'a> serde::Deserialize<'a> + Timestamp + Lattice,
//         R: ExchangeData + Hash + for<'a> serde::Deserialize<'a>,
//     {
//         super::source::build(scope, |activator| {
//             let source = KafkaSource::new(addr, topic, group, activator);
//             // An iterator combinator that yields every "duration" even if more items exist.
//             // The implementation of such an iterator exists in the git history, or can be rewritten easily.
//             super::YieldingIter::new_from(Iter::<D,T,R>::new_from(source), std::time::Duration::from_millis(10))
//         })
//     }

//     pub fn create_sink<G, D, T, R>(stream: &Stream<G, (D, T, R)>, addr: &str, topic: &str) -> Box<dyn std::any::Any>
//     where
//         G: Scope<Timestamp = T>,
//         D: ExchangeData + Hash + Serialize + for<'a> Deserialize<'a>,
//         T: ExchangeData + Hash + Serialize + for<'a> Deserialize<'a> + Timestamp + Lattice,
//         R: ExchangeData + Hash + Serialize + for<'a> Deserialize<'a>,
//     {
//         use std::rc::Rc;
//         use std::cell::RefCell;
//         use crate::hashable::Hashable;

//         let sink = KafkaSink::new(addr, topic);
//         let result = Rc::new(RefCell::new(sink));
//         let sink_hash = (addr.to_string(), topic.to_string()).hashed();
//         super::sink::build(
//             &stream,
//             sink_hash,
//             Rc::downgrade(&result),
//             Rc::downgrade(&result),
//         );
//         Box::new(result)
//     }

//     pub struct KafkaSource {
//         consumer: BaseConsumer<ActivationConsumerContext>,
//     }

//     impl KafkaSource {
//         pub fn new(addr: &str, topic: &str, group: &str, activator: SyncActivator) -> Self {
//             let mut kafka_config = ClientConfig::new();
//             // This is mostly cargo-cult'd in from `source/kafka.rs`.
//             kafka_config.set("bootstrap.servers", &addr.to_string());
//             kafka_config
//                 .set("enable.auto.commit", "false")
//                 .set("auto.offset.reset", "earliest");

//             kafka_config.set("topic.metadata.refresh.interval.ms", "30000"); // 30 seconds
//             kafka_config.set("fetch.message.max.bytes", "134217728");
//             kafka_config.set("group.id", group);
//             kafka_config.set("isolation.level", "read_committed");
//             let activator = ActivationConsumerContext(activator);
//             let consumer = kafka_config.create_with_context::<_, BaseConsumer<_>>(activator).unwrap();
//             use rdkafka::consumer::Consumer;
//             consumer.subscribe(&[topic]).unwrap();
//             Self {
//                 consumer,
//             }
//         }
//     }

//     pub struct Iter<D, T, R> {
//         pub source: KafkaSource,
//         phantom: std::marker::PhantomData<(D, T, R)>,
//     }

//     impl<D, T, R> Iter<D, T, R> {
//         /// Constructs a new iterator from a bytes source.
//         pub fn new_from(source: KafkaSource) -> Self {
//             Self {
//                 source,
//                 phantom: std::marker::PhantomData,
//             }
//         }
//     }

//     impl<D, T, R> Iterator for Iter<D, T, R>
//     where
//         D: for<'a> Deserialize<'a>,
//         T: for<'a> Deserialize<'a>,
//         R: for<'a> Deserialize<'a>,
//     {
//         type Item = super::Message<D, T, R>;
//         fn next(&mut self) -> Option<Self::Item> {
//             use rdkafka::message::Message;
//             self.source
//                 .consumer
//                 .poll(std::time::Duration::from_millis(0))
//                 .and_then(|result| result.ok())
//                 .and_then(|message| {
//                     message.payload().and_then(|message| bincode::deserialize::<super::Message<D, T, R>>(message).ok())
//                 })
//         }
//     }

//     /// An implementation of [`ConsumerContext`] that unparks the wrapped thread
//     /// when the message queue switches from nonempty to empty.
//     struct ActivationConsumerContext(SyncActivator);

//     impl ClientContext for ActivationConsumerContext { }

//     impl ActivationConsumerContext {
//         fn activate(&self) {
//             self.0.activate().unwrap();
//         }
//     }

//     impl ConsumerContext for ActivationConsumerContext {
//         fn message_queue_nonempty_callback(&self) {
//             self.activate();
//         }
//     }

//     use std::time::Duration;
//     use rdkafka::producer::DefaultProducerContext;
//     use rdkafka::producer::{BaseRecord, ThreadedProducer};

//     pub struct KafkaSink {
//         topic: String,
//         producer: ThreadedProducer<DefaultProducerContext>,
//     }

//     impl KafkaSink {
//         pub fn new(addr: &str, topic: &str) -> Self {
//             let mut config = ClientConfig::new();
//             config.set("bootstrap.servers", &addr);
//             config.set("queue.buffering.max.kbytes", &format!("{}", 16 << 20));
//             config.set("queue.buffering.max.messages", &format!("{}", 10_000_000));
//             config.set("queue.buffering.max.ms", &format!("{}", 10));
//             let producer = config
//                 .create_with_context::<_, ThreadedProducer<_>>(DefaultProducerContext)
//                 .expect("creating kafka producer for kafka sinks failed");
//             Self {
//                 producer,
//                 topic: topic.to_string(),
//             }
//         }
//     }

//     impl BytesSink for KafkaSink {
//         fn poll(&mut self, bytes: &[u8]) -> Option<Duration> {
//             let record = BaseRecord::<[u8], _>::to(&self.topic).payload(bytes);

//             self.producer.send(record).err().map(|(e, _)| {
//                 if let KafkaError::MessageProduction(RDKafkaError::QueueFull) = e {
//                     Duration::from_secs(1)
//                 } else {
//                     // TODO(frank): report this error upwards so the user knows the sink is dead.
//                     Duration::from_secs(1)
//                 }
//             })
//         }
//         fn done(&self) -> bool {
//             self.producer.in_flight_count() == 0
//         }
//     }

// }
899// }