differential_dataflow/capture.rs
//! Logic related to capture and replay of differential collections.
//!
//! This module defines a protocol for capturing and replaying differential collections
//! to streaming storage that may both duplicate and reorder messages. It records facts
//! about the collection that once true stay true, such as the exact changes data undergo
//! at each time, and the number of distinct updates at each time.
//!
//! The methods are parameterized by implementors of byte sources and byte sinks. For
//! example implementations of these traits, consult the commented text at the end of
//! this file.
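//!
//! As a hedged illustration (a sketch, not a doctest from this crate), a capture of a
//! small collection might consist of an `Updates` message followed by a `Progress`
//! message that closes out the interval `[0, 5)`:
//!
//! ```ignore
//! use differential_dataflow::capture::{Message, Progress};
//!
//! // Three distinct updates, all at time 3.
//! let updates = Message::<String, u64, i64>::Updates(vec![
//!     ("frank".to_string(), 3, 1),
//!     ("frank".to_string(), 3, -1),
//!     ("mcsherry".to_string(), 3, 1),
//! ]);
//! // An irrevocable statement: exactly three distinct updates occur at time 3,
//! // and no updates occur at any other time in [0, 5).
//! let progress = Message::<String, u64, i64>::Progress(Progress {
//!     lower: vec![0],
//!     upper: vec![5],
//!     counts: vec![(3, 3)],
//! });
//! ```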

use std::time::Duration;
use serde::{Deserialize, Serialize};

/// A message in the CDC V2 protocol.
#[derive(Ord, PartialOrd, Eq, PartialEq, Debug, Clone, Serialize, Deserialize)]
pub enum Message<D, T, R> {
    /// A batch of updates that are certain to occur.
    ///
    /// Each triple is an irrevocable statement about a change that occurs.
    /// Each statement contains a datum, a time, and a difference, and asserts
    /// that the multiplicity of the datum changes at the time by the difference.
    Updates(Vec<(D, T, R)>),
    /// An irrevocable statement about the number of updates within a time interval.
    Progress(Progress<T>),
}

/// An irrevocable statement about the number of updates at times within an interval.
///
/// This statement covers all times beyond `lower` and not beyond `upper`.
/// Each element of `counts` is an irrevocable statement about the exact number of
/// distinct updates that occur at that time.
/// Times not present in `counts` have a count of zero.
#[derive(Ord, PartialOrd, Eq, PartialEq, Debug, Clone, Serialize, Deserialize)]
pub struct Progress<T> {
    /// The lower bound of times contained in this statement.
    pub lower: Vec<T>,
    /// The upper bound of times contained in this statement.
    pub upper: Vec<T>,
    /// All non-zero counts for times beyond `lower` and not beyond `upper`.
    pub counts: Vec<(T, usize)>,
}

/// A simple sink for items (for example, encoded byte slices).
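///
/// As a minimal sketch (assuming a hypothetical `VecWriter` type, not provided by this
/// module), an in-memory implementation that accepts every item immediately might look like:
///
/// ```ignore
/// use std::time::Duration;
///
/// pub struct VecWriter<T>(pub Vec<T>);
///
/// impl<T: Clone> Writer<T> for VecWriter<T> {
///     fn poll(&mut self, item: &T) -> Option<Duration> {
///         // Accept the item immediately; a real sink would return `Some(duration)`
///         // while its transport is backpressured, asking the caller to retry later.
///         self.0.push(item.clone());
///         None
///     }
///     fn done(&self) -> bool {
///         // Everything accepted is already "committed" in memory.
///         true
///     }
/// }
/// ```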
pub trait Writer<T> {
    /// Returns an amount of time to wait before retrying, or `None` for success.
    fn poll(&mut self, item: &T) -> Option<Duration>;
    /// Indicates if the sink has committed all sent data and can be safely dropped.
    fn done(&self) -> bool;
}

/// A deduplicating, re-ordering iterator.
pub mod iterator {

    use super::{Message, Progress};
    use crate::lattice::Lattice;
    use std::hash::Hash;
    use timely::order::PartialOrder;
    use timely::progress::{
        frontier::{AntichainRef, MutableAntichain},
        Antichain,
        Timestamp,
    };

    /// A direct implementation of a deduplicating, re-ordering iterator.
    ///
    /// The iterator draws from a source that may have arbitrary duplication, be arbitrarily out of order,
    /// and yet produces each update once, with in-order batches. The iterator maintains a bounded memory
    /// footprint, proportional to the mismatch between the received updates and progress messages.
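    ///
    /// A sketch of its use (not a doctest from this crate), with `u64` times and `i64` diffs,
    /// where both an update and a progress statement arrive in duplicate:
    ///
    /// ```ignore
    /// let messages = vec![
    ///     Message::Updates(vec![("data".to_string(), 0u64, 1i64)]),
    ///     Message::Updates(vec![("data".to_string(), 0u64, 1i64)]),
    ///     Message::Progress(Progress { lower: vec![0], upper: vec![1], counts: vec![(0, 1)] }),
    ///     Message::Progress(Progress { lower: vec![0], upper: vec![1], counts: vec![(0, 1)] }),
    /// ];
    /// let mut iter = Iter::new(messages.into_iter());
    /// // One deduplicated batch, reported once the interval `[0, 1)` is complete.
    /// let (updates, frontier) = iter.next().expect("a completed interval");
    /// assert_eq!(updates, vec![("data".to_string(), 0, 1)]);
    /// assert_eq!(frontier.elements(), &[1]);
    /// // The duplicated progress statement adds nothing further.
    /// assert!(iter.next().is_none());
    /// ```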
    pub struct Iter<I, D, T, R>
    where
        I: Iterator<Item = Message<D, T, R>>,
        T: Hash + Ord + Lattice + Clone,
        D: Hash + Eq,
        T: Hash + Eq,
        R: Hash + Eq,
    {
        /// Source of potentially duplicated, out of order cdc_v2 messages.
        iterator: I,
        /// Updates that have been received, but are still beyond `reported_frontier`.
        ///
        /// These updates are retained both so that they can eventually be transmitted,
        /// but also so that they can deduplicate updates that may still be received.
        updates: std::collections::HashSet<(D, T, R)>,
        /// Frontier through which the iterator has reported updates.
        ///
        /// All updates not beyond this frontier have been reported.
        /// Any information related to times not beyond this frontier can be discarded.
        ///
        /// This frontier tracks the meet of `progress_frontier` and `messages_frontier`,
        /// our two bounds on potential uncertainty in progress and update messages.
        reported_frontier: Antichain<T>,
        /// Frontier of accepted progress statements.
        ///
        /// All progress message counts for times not beyond this frontier have been
        /// incorporated into `messages_frontier`. This frontier also guides which
        /// received progress statements can be incorporated: those for which
        /// this frontier is beyond their lower bound.
        progress_frontier: Antichain<T>,
        /// Counts of outstanding messages at times.
        ///
        /// These counts track the difference between message counts at times announced
        /// by progress messages, and message counts at times received in distinct updates.
        messages_frontier: MutableAntichain<T>,
        /// Progress statements that are not yet actionable due to out-of-orderness.
        ///
        /// A progress statement becomes actionable once the progress frontier is beyond
        /// its lower frontier. This ensures that the [0, lower) interval is already
        /// incorporated, and that we will not leave a gap by incorporating the counts
        /// and reflecting the progress statement's upper frontier.
        progress_queue: Vec<Progress<T>>,
    }

    impl<D, T, R, I> Iterator for Iter<I, D, T, R>
    where
        I: Iterator<Item = Message<D, T, R>>,
        T: Hash + Ord + Lattice + Clone,
        D: Hash + Eq + Clone,
        R: Hash + Eq + Clone,
    {
        type Item = (Vec<(D, T, R)>, Antichain<T>);
        fn next(&mut self) -> Option<Self::Item> {
            // Each call to `next` should return some newly carved interval of time.
            // As such, we should read from our source until we find such a thing.
            //
            // An interval can be completed once our frontier of received progress
            // information and our frontier of unresolved counts have advanced.
            while let Some(message) = self.iterator.next() {
                match message {
                    Message::Updates(mut updates) => {
                        // Discard updates at reported times, or duplicates at unreported times.
                        updates.retain(|dtr| {
                            self.reported_frontier.less_equal(&dtr.1) && !self.updates.contains(dtr)
                        });
                        // Decrement our counts of accounted-for messages.
                        self.messages_frontier
                            .update_iter(updates.iter().map(|(_, t, _)| (t.clone(), -1)));
                        // Record the messages in our de-duplication collection.
                        self.updates.extend(updates.into_iter());
                    }
                    Message::Progress(progress) => {
                        // A progress statement may not be immediately actionable.
                        self.progress_queue.push(progress);
                    }
                }

                // Attempt to drain actionable progress messages.
                // A progress message is actionable if `self.progress_frontier` is greater or
                // equal to the message's lower bound.
                while let Some(position) = self.progress_queue.iter().position(|p| {
                    <_ as PartialOrder>::less_equal(
                        &AntichainRef::new(&p.lower),
                        &self.progress_frontier.borrow(),
                    )
                }) {
                    let mut progress = self.progress_queue.remove(position);
                    // Discard counts that have already been incorporated.
                    progress
                        .counts
                        .retain(|(time, _count)| self.progress_frontier.less_equal(time));
                    // Record any new reports of expected counts.
                    self.messages_frontier
                        .update_iter(progress.counts.drain(..).map(|(t, c)| (t, c as i64)));
                    // Extend the frontier to be times greater or equal to both progress.upper and self.progress_frontier.
                    let mut new_frontier = Antichain::new();
                    for time1 in progress.upper {
                        for time2 in self.progress_frontier.elements() {
                            new_frontier.insert(time1.join(time2));
                        }
                    }
                    self.progress_queue.retain(|p| {
                        !<_ as PartialOrder>::less_equal(
                            &AntichainRef::new(&p.upper),
                            &new_frontier.borrow(),
                        )
                    });
                    self.progress_frontier = new_frontier;
                }

                // Now check and see if our lower bound exceeds `self.reported_frontier`.
                let mut lower_bound = self.progress_frontier.clone();
                lower_bound.extend(self.messages_frontier.frontier().iter().cloned());
                if lower_bound != self.reported_frontier {
                    let to_publish = self
                        .updates
                        .iter()
                        .filter(|(_, t, _)| !lower_bound.less_equal(t))
                        .cloned()
                        .collect::<Vec<_>>();
                    self.updates.retain(|(_, t, _)| lower_bound.less_equal(t));
                    self.reported_frontier = lower_bound.clone();
                    return Some((to_publish, lower_bound));
                }
            }
            None
        }
    }

    impl<D, T, R, I> Iter<I, D, T, R>
    where
        I: Iterator<Item = Message<D, T, R>>,
        T: Hash + Ord + Lattice + Clone + Timestamp,
        D: Hash + Eq + Clone,
        R: Hash + Eq + Clone,
    {
        /// Construct a new re-ordering, deduplicating iterator.
        pub fn new(iterator: I) -> Self {
            Self {
                iterator,
                updates: std::collections::HashSet::new(),
                reported_frontier: Antichain::from_elem(T::minimum()),
                progress_frontier: Antichain::from_elem(T::minimum()),
                messages_frontier: MutableAntichain::new(),
                progress_queue: Vec::new(),
            }
        }
    }
}

/// Methods for recovering update streams from binary bundles.
pub mod source {

    use super::{Message, Progress};
    use crate::{lattice::Lattice, ExchangeData};
    use std::cell::RefCell;
    use std::hash::Hash;
    use std::rc::Rc;
    use std::marker::{Send, Sync};
    use std::sync::Arc;
    use timely::dataflow::{Scope, Stream, operators::{Capability, CapabilitySet}};
    use timely::dataflow::operators::generic::OutputBuilder;
    use timely::progress::Timestamp;
    use timely::scheduling::SyncActivator;

    // TODO(guswynn): implement this generally in timely
    struct DropActivator {
        activator: Arc<SyncActivator>,
    }

    impl Drop for DropActivator {
        fn drop(&mut self) {
            // Best effort: failure to activate is ignored.
            let _ = self.activator.activate();
        }
    }

    /// Constructs a stream of updates from a source of messages.
    ///
    /// The stream is built in the supplied `scope` and continues to run until
    /// the returned `Box<Any>` token is dropped. The `source_builder` argument
    /// is invoked with a `SyncActivator` that will re-activate the source.
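    ///
    /// A sketch of its use inside a timely worker (assuming a hypothetical
    /// `my_message_source` that yields `Message<D, T, R>` values and retains the
    /// provided `SyncActivator` to signal when new input is available):
    ///
    /// ```ignore
    /// worker.dataflow(|scope| {
    ///     let (token, updates) = source::build(scope.clone(), |activator| {
    ///         my_message_source(activator)
    ///     });
    ///     updates.inspect(|(d, t, r)| println!("{:?} changed by {:?} at {:?}", d, r, t));
    ///     token // hold on to the token; dropping it shuts the source down
    /// });
    /// ```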
    pub fn build<G, B, I, D, T, R>(
        scope: G,
        source_builder: B,
    ) -> (Box<dyn std::any::Any + Send + Sync>, Stream<G, (D, T, R)>)
    where
        G: Scope<Timestamp = T>,
        B: FnOnce(SyncActivator) -> I,
        I: Iterator<Item = Message<D, T, R>> + 'static,
        D: ExchangeData + Hash,
        T: ExchangeData + Hash + Timestamp + Lattice,
        R: ExchangeData + Hash,
    {
        // Read messages are either updates or progress messages.
        // Each may contain duplicates, and we must take care to deduplicate information before introducing it to an accumulation.
        // This includes both emitting updates, and setting expectations for update counts.
        //
        // Updates need to be deduplicated by (data, time), and we should exchange them by such.
        // Progress needs to be deduplicated by time, and we should exchange them by such.
        //
        // The first cut of this is a dataflow graph that looks like (flowing downward)
        //
        // 1. MESSAGES:
        //      Reads `Message` stream; maintains capabilities.
        //      Sends `Updates` to UPDATES stage by hash((data, time, diff)).
        //      Sends `Progress` to PROGRESS stage by hash(time), each with lower, upper bounds.
        //      Shares capabilities with downstream operator.
        // 2. UPDATES:
        //      Maintains and deduplicates updates.
        //      Ships updates once frontier advances.
        //      Ships counts to PROGRESS stage, by hash(time).
        // 3. PROGRESS:
        //      Maintains outstanding message counts by time. Tracks frontiers.
        //      Tracks lower bounds of messages and progress frontier. Broadcasts changes to FEEDBACK stage.
        // 4. FEEDBACK:
        //      Shares capabilities with MESSAGES; downgrades to track input from PROGRESS.
        //
        // Each of these stages can be arbitrarily data-parallel, and FEEDBACK *must* have the same parallelism as MESSAGES.
        // Limitations: MESSAGES must broadcast lower and upper bounds to PROGRESS, and PROGRESS must broadcast its changes
        // to FEEDBACK. This may mean that scaling up PROGRESS could introduce quadratic problems, though both of these
        // broadcasts are meant to carry very little data.

        use crate::hashable::Hashable;
        use timely::dataflow::channels::pact::Exchange;
        use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
        use timely::progress::frontier::MutableAntichain;
        use timely::progress::ChangeBatch;

        // Some message distribution logic depends on the number of workers.
        let workers = scope.peers();

        let mut token = None;
        // Frontier owned by the FEEDBACK operator and consulted by the MESSAGES operators.
        let mut antichain = MutableAntichain::new();
        antichain.update_iter(Some((T::minimum(), workers as i64)));
        let shared_frontier = Rc::new(RefCell::new(antichain));
        let shared_frontier2 = shared_frontier.clone();

        // Step 1: The MESSAGES operator.
        let mut messages_op = OperatorBuilder::new("CDCV2_Messages".to_string(), scope.clone());
        let address = messages_op.operator_info().address;
        let activator = scope.sync_activator_for(address.to_vec());
        let activator2 = scope.activator_for(Rc::clone(&address));
        let drop_activator = DropActivator { activator: Arc::new(scope.sync_activator_for(address.to_vec())) };
        let mut source = source_builder(activator);
        let (updates_out, updates) = messages_op.new_output();
        let mut updates_out = OutputBuilder::from(updates_out);
        let (progress_out, progress) = messages_op.new_output();
        let mut progress_out = OutputBuilder::from(progress_out);

        messages_op.build(|capabilities| {

            // A Weak that communicates whether the returned token has been dropped.
            let drop_activator_weak = Arc::downgrade(&drop_activator.activator);

            token = Some(drop_activator);

            // Read messages from some source; shuffle them to UPDATES and PROGRESS; share capability with FEEDBACK.
            let mut updates_caps = CapabilitySet::from_elem(capabilities[0].clone());
            let mut progress_caps = CapabilitySet::from_elem(capabilities[1].clone());
            // Capture the shared frontier to read out frontier updates to apply.
            let local_frontier = shared_frontier.clone();

            move |_frontiers| {
                // First check to ensure that we haven't been terminated by someone dropping our tokens.
                if drop_activator_weak.upgrade().is_none() {
                    // Give up our capabilities.
                    updates_caps.downgrade(&[]);
                    progress_caps.downgrade(&[]);
                    // Never continue, even if we are (erroneously) activated again.
                    return;
                }

                // Consult our shared frontier, and ensure capabilities are downgraded to it.
                let shared_frontier = local_frontier.borrow();
                updates_caps.downgrade(&shared_frontier.frontier());
                progress_caps.downgrade(&shared_frontier.frontier());

                // Next check to see if we have been terminated by the source being complete.
                if !updates_caps.is_empty() && !progress_caps.is_empty() {
                    let mut updates = updates_out.activate();
                    let mut progress = progress_out.activate();

                    // TODO(frank): this is a moment where multi-temporal capabilities need to be fixed up.
                    // Specifically, there may not be one capability valid for all updates.
                    let mut updates_session = updates.session(&updates_caps[0]);
                    let mut progress_session = progress.session(&progress_caps[0]);

                    // We presume the iterator will yield if appropriate.
                    for message in source.by_ref() {
                        match message {
                            Message::Updates(mut updates) => {
                                updates_session.give_container(&mut updates);
                            }
                            Message::Progress(progress) => {
                                // We must send a copy of each progress message to all workers,
                                // but we can partition the counts across workers by timestamp.
                                let mut to_worker = vec![Vec::new(); workers];
                                for (time, count) in progress.counts {
                                    to_worker[(time.hashed() as usize) % workers]
                                        .push((time, count));
                                }
                                for (worker, counts) in to_worker.into_iter().enumerate() {
                                    progress_session.give((
                                        worker,
                                        Progress {
                                            lower: progress.lower.clone(),
                                            upper: progress.upper.clone(),
                                            counts,
                                        },
                                    ));
                                }
                            }
                        }
                    }
                }
            }
        });

        // Step 2: The UPDATES operator.
        let mut updates_op = OperatorBuilder::new("CDCV2_Updates".to_string(), scope.clone());
        let mut input = updates_op.new_input(&updates, Exchange::new(|x: &(D, T, R)| x.hashed()));
        let (changes_out, changes) = updates_op.new_output();
        let mut changes_out = OutputBuilder::from(changes_out);
        let (counts_out, counts) = updates_op.new_output();
        let mut counts_out = OutputBuilder::from(counts_out);

        updates_op.build(move |_capability| {
            // Deduplicates updates, and ships novel updates and the counts for each time.
            // For simplicity, this operator ships updates as they are discovered to be new.
            // This has the defect that on load we may have two copies of the data (shipped,
            // and here for deduplication).
            //
            // Filters may be pushed ahead of this operator, but because of deduplication we
            // may not push projections ahead of this operator (at least, not without fields
            // that are known to form keys, and even then only carefully).
            let mut pending = std::collections::HashMap::new();
            let mut change_batch = ChangeBatch::<T>::new();
            move |frontiers| {
                // Thin out deduplication buffer.
                // This is the moment in a more advanced implementation where we might send
                // the data for the first time, maintaining only one copy of each update live
                // at a time in the system.
                pending.retain(|(_row, time), _diff| frontiers[0].less_equal(time));

                // Deduplicate newly received updates, sending new updates and timestamp counts.
                let mut changes = changes_out.activate();
                let mut counts = counts_out.activate();
                while let Some((capability, updates)) = input.next() {
                    let mut changes_session = changes.session(&capability);
                    let mut counts_session = counts.session(&capability);
                    for (data, time, diff) in updates.iter() {
                        if frontiers[0].less_equal(time) {
                            if let Some(prior) = pending.insert((data.clone(), time.clone()), diff.clone()) {
                                assert_eq!(&prior, diff);
                            } else {
                                change_batch.update(time.clone(), -1);
                                changes_session.give((data.clone(), time.clone(), diff.clone()));
                            }
                        }
                    }
                    if !change_batch.is_empty() {
                        counts_session.give_iterator(change_batch.drain());
                    }
                }
            }
        });

        // Step 3: The PROGRESS operator.
        let mut progress_op = OperatorBuilder::new("CDCV2_Progress".to_string(), scope.clone());
        let mut input = progress_op.new_input(
            &progress,
            Exchange::new(|x: &(usize, Progress<T>)| x.0 as u64),
        );
        let mut counts =
            progress_op.new_input(&counts, Exchange::new(|x: &(T, i64)| (x.0).hashed()));
        let (frontier_out, frontier) = progress_op.new_output();
        let mut frontier_out = OutputBuilder::from(frontier_out);
        progress_op.build(move |_capability| {
            // Receive progress statements, deduplicated counts. Track lower frontier of both and broadcast changes.

            use timely::order::PartialOrder;
            use timely::progress::{frontier::AntichainRef, Antichain};

            let mut progress_queue = Vec::new();
            let mut progress_frontier = Antichain::from_elem(T::minimum());
            let mut updates_frontier = MutableAntichain::new();
            let mut reported_frontier = Antichain::from_elem(T::minimum());

            move |_frontiers| {
                let mut frontier = frontier_out.activate();

                // If the frontier changes we need a capability to express that.
                // Any capability should work; the downstream listener doesn't care.
                let mut capability: Option<Capability<T>> = None;

                // Drain all relevant update counts into the mutable antichain tracking its frontier.
                while let Some((cap, counts)) = counts.next() {
                    updates_frontier.update_iter(counts.iter().cloned());
                    capability = Some(cap.retain());
                }
                // Drain all progress statements into the queue out of which we will work.
                while let Some((cap, progress)) = input.next() {
                    progress_queue.extend(progress.iter().map(|x| (x.1).clone()));
                    capability = Some(cap.retain());
                }

                // Extract and act on actionable progress messages.
                // A progress message is actionable if `progress_frontier` is beyond the message's lower bound.
                while let Some(position) = progress_queue.iter().position(|p| {
                    <_ as PartialOrder>::less_equal(
                        &AntichainRef::new(&p.lower),
                        &progress_frontier.borrow(),
                    )
                }) {
                    // Extract progress statement.
                    let mut progress = progress_queue.remove(position);
                    // Discard counts that have already been incorporated.
                    progress
                        .counts
                        .retain(|(time, _count)| progress_frontier.less_equal(time));
                    // Record any new reports of expected counts.
                    updates_frontier
                        .update_iter(progress.counts.drain(..).map(|(t, c)| (t, c as i64)));
                    // Extend `progress_frontier` by `progress.upper`.
                    let mut new_frontier = Antichain::new();
                    for time1 in progress.upper {
                        for time2 in progress_frontier.elements() {
                            new_frontier.insert(time1.join(time2));
                        }
                    }
                    progress_frontier = new_frontier;
                }

                // Determine if the lower bound of frontiers has advanced, and transmit updates if so.
                let mut lower_bound = progress_frontier.clone();
                lower_bound.extend(updates_frontier.frontier().iter().cloned());
                if lower_bound != reported_frontier {
                    let capability =
                        capability.expect("Changes occurred, without surfacing a capability");
                    let mut changes = ChangeBatch::new();
                    changes.extend(lower_bound.elements().iter().map(|t| (t.clone(), 1)));
                    changes.extend(reported_frontier.elements().iter().map(|t| (t.clone(), -1)));
                    let mut frontier_session = frontier.session(&capability);
                    for peer in 0..workers {
                        frontier_session.give((peer, changes.clone()));
                    }
                    reported_frontier = lower_bound.clone();
                }
            }
        });

        // Step 4: The FEEDBACK operator.
        let mut feedback_op = OperatorBuilder::new("CDCV2_Feedback".to_string(), scope.clone());
        let mut input = feedback_op.new_input(
            &frontier,
            Exchange::new(|x: &(usize, ChangeBatch<T>)| x.0 as u64),
        );
        feedback_op.build(move |_capability| {
            // Receive frontier changes and share the net result with MESSAGES.
            move |_frontiers| {
                let mut antichain = shared_frontier2.borrow_mut();
                let mut must_activate = false;
                while let Some((_cap, frontier_changes)) = input.next() {
                    for (_self, input_changes) in frontier_changes.iter() {
                        // Apply the updates, and observe if the lower bound has changed.
                        if antichain.update_iter(input_changes.unstable_internal_updates().iter().cloned()).next().is_some() {
                            must_activate = true;
                        }
                    }
                }
                // If the lower bound has changed, we must activate MESSAGES.
                if must_activate { activator2.activate(); }
            }
        });

        (Box::new(token.unwrap()), changes)
    }
}

/// Methods for recording update streams to binary bundles.
pub mod sink {

    use std::hash::Hash;
    use std::cell::RefCell;
    use std::rc::Weak;

    use serde::{Deserialize, Serialize};

    use timely::order::PartialOrder;
    use timely::progress::{Antichain, ChangeBatch, Timestamp};
    use timely::dataflow::{Scope, Stream};
    use timely::dataflow::channels::pact::{Exchange, Pipeline};
    use timely::dataflow::operators::generic::{builder_rc::OperatorBuilder, OutputBuilder};

    use crate::{lattice::Lattice, ExchangeData};
    use super::{Writer, Message, Progress};

    /// Constructs a sink, for recording the updates in `stream`.
    ///
    /// It is crucial that `stream` has been consolidated before this method, which
    /// will *not* perform the consolidation on the stream's behalf. If this is not
    /// performed before calling the method, the recorded output may not be correctly
    /// reconstructed by readers.
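    ///
    /// A sketch of wiring a consolidated stream into a `Writer`-backed sink, mirroring the
    /// commented Kafka example at the end of this file (`MyWriter` is hypothetical):
    ///
    /// ```ignore
    /// use std::rc::Rc;
    /// use std::cell::RefCell;
    ///
    /// let writer = Rc::new(RefCell::new(MyWriter::new()));
    /// sink::build(
    ///     &stream,
    ///     42,                     // `sink_hash`: selects the worker that records progress
    ///     Rc::downgrade(&writer), // sink for update messages
    ///     Rc::downgrade(&writer), // sink for progress messages
    /// );
    /// // Keep `writer` alive for as long as recording should continue;
    /// // dropping the last strong reference shuts the sink down.
    /// ```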
    pub fn build<G, BS, D, T, R>(
        stream: &Stream<G, (D, T, R)>,
        sink_hash: u64,
        updates_sink: Weak<RefCell<BS>>,
        progress_sink: Weak<RefCell<BS>>,
    ) where
        G: Scope<Timestamp = T>,
        BS: Writer<Message<D, T, R>> + 'static,
        D: ExchangeData + Hash + Serialize + for<'a> Deserialize<'a>,
        T: ExchangeData + Hash + Serialize + for<'a> Deserialize<'a> + Timestamp + Lattice,
        R: ExchangeData + Hash + Serialize + for<'a> Deserialize<'a>,
    {
        // First we record the updates that stream in.
        // We can simply record all updates, under the presumption that they have been consolidated
        // and so any record we see is in fact guaranteed to happen.
        let mut builder = OperatorBuilder::new("UpdatesWriter".to_owned(), stream.scope());
        let reactivator = stream.scope().activator_for(builder.operator_info().address);
        let mut input = builder.new_input(stream, Pipeline);
        let (updates_out, updates) = builder.new_output();
        let mut updates_out = OutputBuilder::from(updates_out);

        builder.build_reschedule(
            move |_capability| {
                let mut timestamps = <ChangeBatch<_>>::new();
                let mut send_queue = std::collections::VecDeque::new();
                move |_frontiers| {
                    let mut output = updates_out.activate();

                    // We want to drain inputs always...
                    input.for_each(|capability, updates| {
                        // Write each update out, and record the timestamp.
                        for (_data, time, _diff) in updates.iter() {
                            timestamps.update(time.clone(), 1);
                        }

                        // Now record the update to the writer.
                        send_queue.push_back(Message::Updates(std::mem::take(updates)));

                        // Transmit timestamp counts downstream.
                        output
                            .session(&capability)
                            .give_iterator(timestamps.drain());
                    });

                    // Drain whatever we can from the queue of bytes to send.
                    // ... but needn't do anything more if our sink is closed.
                    if let Some(sink) = updates_sink.upgrade() {
                        let mut sink = sink.borrow_mut();
                        while let Some(message) = send_queue.front() {
                            if let Some(duration) = sink.poll(message) {
                                // Reschedule after `duration` and then bail.
                                reactivator.activate_after(duration);
                                return true;
                            } else {
                                send_queue.pop_front();
                            }
                        }
                        // Signal incompleteness if messages remain to be sent.
                        !sink.done() || !send_queue.is_empty()
                    } else {
                        // We have been terminated, but may still receive indefinite data.
                        send_queue.clear();
                        // Signal that there are no outstanding writes.
                        false
                    }
                }
            },
        );

        // We use a lower-level builder here to get access to the operator address, for rescheduling.
        let mut builder = OperatorBuilder::new("ProgressWriter".to_owned(), stream.scope());
        let reactivator = stream.scope().activator_for(builder.operator_info().address);
        let mut input = builder.new_input(&updates, Exchange::new(move |_| sink_hash));
        let should_write = stream.scope().index() == (sink_hash as usize) % stream.scope().peers();

        // We now record the numbers of updates at each timestamp between lower and upper bounds.
        // Track the advancing frontier, to know when to produce utterances.
        let mut frontier = Antichain::from_elem(T::minimum());
        // Track accumulated counts for timestamps.
        let mut timestamps = <ChangeBatch<_>>::new();
        // Stash for serialized data yet to send.
        let mut send_queue = std::collections::VecDeque::new();
        let mut retain = Vec::new();

        builder.build_reschedule(|_capabilities| {
            move |frontiers| {

                // We want to drain inputs no matter what.
                // We could do this after the next step, as we are certain these timestamps will
                // not be part of a closed frontier (as they have not yet been read). This has the
                // potential to make things speedier as we scan less and keep a smaller footprint.
                input.for_each(|_capability, counts| {
                    timestamps.extend(counts.iter().cloned());
                });

                if should_write {
                    if let Some(sink) = progress_sink.upgrade() {
                        let mut sink = sink.borrow_mut();

                        // If our frontier advances strictly, we have the opportunity to issue a progress statement.
                        if <_ as PartialOrder>::less_than(
                            &frontier.borrow(),
                            &frontiers[0].frontier(),
                        ) {
                            let new_frontier = frontiers[0].frontier();

                            // Extract the timestamp counts to announce.
                            let mut announce = Vec::new();
                            for (time, count) in timestamps.drain() {
                                if !new_frontier.less_equal(&time) {
                                    announce.push((time, count as usize));
                                } else {
                                    retain.push((time, count));
                                }
                            }
                            timestamps.extend(retain.drain(..));

                            // Announce the lower bound, upper bound, and timestamp counts.
                            let progress = Progress {
                                lower: frontier.elements().to_vec(),
                                upper: new_frontier.to_vec(),
                                counts: announce,
                            };
                            send_queue.push_back(Message::Progress(progress));

                            // Advance our frontier to track our progress utterance.
                            frontier = frontiers[0].frontier().to_owned();

                            while let Some(message) = send_queue.front() {
                                if let Some(duration) = sink.poll(message) {
                                    // Reschedule after `duration` and then bail.
                                    reactivator.activate_after(duration);
                                    // Signal that work remains to be done.
                                    return true;
                                } else {
                                    send_queue.pop_front();
                                }
                            }
                        }
                        // Signal incompleteness if messages remain to be sent.
                        !sink.done() || !send_queue.is_empty()
                    } else {
                        timestamps.clear();
                        send_queue.clear();
                        // Signal that there are no outstanding writes.
                        false
                    }
                } else {
                    false
                }
            }
        });
    }
}

// pub mod kafka {

//     use serde::{Serialize, Deserialize};
//     use timely::scheduling::SyncActivator;
//     use rdkafka::{ClientContext, config::ClientConfig};
//     use rdkafka::consumer::{BaseConsumer, ConsumerContext};
//     use rdkafka::error::{KafkaError, RDKafkaError};
//     use super::BytesSink;

//     use std::hash::Hash;
//     use timely::progress::Timestamp;
//     use timely::dataflow::{Scope, Stream};
//     use crate::ExchangeData;
//     use crate::lattice::Lattice;

//     /// Creates a Kafka source from supplied configuration information.
//     pub fn create_source<G, D, T, R>(scope: G, addr: &str, topic: &str, group: &str) -> (Box<dyn std::any::Any>, Stream<G, (D, T, R)>)
//     where
//         G: Scope<Timestamp = T>,
//         D: ExchangeData + Hash + for<'a> serde::Deserialize<'a>,
//         T: ExchangeData + Hash + for<'a> serde::Deserialize<'a> + Timestamp + Lattice,
//         R: ExchangeData + Hash + for<'a> serde::Deserialize<'a>,
//     {
//         super::source::build(scope, |activator| {
//             let source = KafkaSource::new(addr, topic, group, activator);
//             // An iterator combinator that yields every "duration" even if more items exist.
//             // The implementation of such an iterator exists in the git history, or can be rewritten easily.
//             super::YieldingIter::new_from(Iter::<D, T, R>::new_from(source), std::time::Duration::from_millis(10))
//         })
//     }

//     pub fn create_sink<G, D, T, R>(stream: &Stream<G, (D, T, R)>, addr: &str, topic: &str) -> Box<dyn std::any::Any>
//     where
//         G: Scope<Timestamp = T>,
//         D: ExchangeData + Hash + Serialize + for<'a> Deserialize<'a>,
//         T: ExchangeData + Hash + Serialize + for<'a> Deserialize<'a> + Timestamp + Lattice,
//         R: ExchangeData + Hash + Serialize + for<'a> Deserialize<'a>,
//     {
//         use std::rc::Rc;
//         use std::cell::RefCell;
//         use crate::hashable::Hashable;

//         let sink = KafkaSink::new(addr, topic);
//         let result = Rc::new(RefCell::new(sink));
//         let sink_hash = (addr.to_string(), topic.to_string()).hashed();
//         super::sink::build(
//             &stream,
//             sink_hash,
//             Rc::downgrade(&result),
//             Rc::downgrade(&result),
//         );
//         Box::new(result)
//     }

//     pub struct KafkaSource {
//         consumer: BaseConsumer<ActivationConsumerContext>,
//     }

//     impl KafkaSource {
//         pub fn new(addr: &str, topic: &str, group: &str, activator: SyncActivator) -> Self {
//             let mut kafka_config = ClientConfig::new();
//             // This is mostly cargo-cult'd in from `source/kafka.rs`.
//             kafka_config.set("bootstrap.servers", &addr.to_string());
//             kafka_config
//                 .set("enable.auto.commit", "false")
//                 .set("auto.offset.reset", "earliest");

//             kafka_config.set("topic.metadata.refresh.interval.ms", "30000"); // 30 seconds
//             kafka_config.set("fetch.message.max.bytes", "134217728");
//             kafka_config.set("group.id", group);
//             kafka_config.set("isolation.level", "read_committed");
//             let activator = ActivationConsumerContext(activator);
//             let consumer = kafka_config.create_with_context::<_, BaseConsumer<_>>(activator).unwrap();
//             use rdkafka::consumer::Consumer;
//             consumer.subscribe(&[topic]).unwrap();
//             Self {
//                 consumer,
//             }
//         }
//     }

//     pub struct Iter<D, T, R> {
//         pub source: KafkaSource,
//         phantom: std::marker::PhantomData<(D, T, R)>,
//     }

//     impl<D, T, R> Iter<D, T, R> {
//         /// Constructs a new iterator from a bytes source.
//         pub fn new_from(source: KafkaSource) -> Self {
//             Self {
//                 source,
//                 phantom: std::marker::PhantomData,
//             }
//         }
//     }

//     impl<D, T, R> Iterator for Iter<D, T, R>
//     where
//         D: for<'a> Deserialize<'a>,
//         T: for<'a> Deserialize<'a>,
//         R: for<'a> Deserialize<'a>,
//     {
//         type Item = super::Message<D, T, R>;
//         fn next(&mut self) -> Option<Self::Item> {
//             use rdkafka::message::Message;
//             self.source
//                 .consumer
//                 .poll(std::time::Duration::from_millis(0))
//                 .and_then(|result| result.ok())
//                 .and_then(|message| {
//                     message.payload().and_then(|message| bincode::deserialize::<super::Message<D, T, R>>(message).ok())
//                 })
//         }
//     }

//     /// An implementation of [`ConsumerContext`] that unparks the wrapped thread
//     /// when the message queue switches from empty to nonempty.
//     struct ActivationConsumerContext(SyncActivator);

//     impl ClientContext for ActivationConsumerContext { }

//     impl ActivationConsumerContext {
//         fn activate(&self) {
//             self.0.activate().unwrap();
//         }
//     }

//     impl ConsumerContext for ActivationConsumerContext {
//         fn message_queue_nonempty_callback(&self) {
//             self.activate();
//         }
//     }

//     use std::time::Duration;
//     use rdkafka::producer::DefaultProducerContext;
//     use rdkafka::producer::{BaseRecord, ThreadedProducer};

//     pub struct KafkaSink {
//         topic: String,
//         producer: ThreadedProducer<DefaultProducerContext>,
//     }

//     impl KafkaSink {
//         pub fn new(addr: &str, topic: &str) -> Self {
//             let mut config = ClientConfig::new();
//             config.set("bootstrap.servers", &addr);
//             config.set("queue.buffering.max.kbytes", &format!("{}", 16 << 20));
//             config.set("queue.buffering.max.messages", &format!("{}", 10_000_000));
//             config.set("queue.buffering.max.ms", &format!("{}", 10));
//             let producer = config
//                 .create_with_context::<_, ThreadedProducer<_>>(DefaultProducerContext)
//                 .expect("creating kafka producer for kafka sinks failed");
//             Self {
//                 producer,
//                 topic: topic.to_string(),
//             }
//         }
//     }

//     impl BytesSink for KafkaSink {
//         fn poll(&mut self, bytes: &[u8]) -> Option<Duration> {
//             let record = BaseRecord::<[u8], _>::to(&self.topic).payload(bytes);

//             self.producer.send(record).err().map(|(e, _)| {
//                 if let KafkaError::MessageProduction(RDKafkaError::QueueFull) = e {
//                     Duration::from_secs(1)
//                 } else {
//                     // TODO(frank): report this error upwards so the user knows the sink is dead.
//                     Duration::from_secs(1)
//                 }
//             })
//         }
//         fn done(&self) -> bool {
//             self.producer.in_flight_count() == 0
//         }
//     }

// }