mz_compute/render/join/mz_join_core.rs
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! A fork of DD's `JoinCore::join_core`.
//!
//! Currently, compute rendering knows two implementations for linear joins:
//!
//! * Differential's `JoinCore::join_core`
//! * A Materialize fork thereof, called `mz_join_core`
//!
//! `mz_join_core` exists to solve a responsiveness problem with the DD implementation.
//! DD's join is only able to yield between keys. When computing a large cross-join or a highly
//! skewed join, this can result in loss of interactivity when the join operator refuses to yield
//! control for multiple seconds or longer, which in turn causes degraded user experience.
//! `mz_join_core` resolves the loss-of-interactivity issue by also yielding within keys.
//!
//! For the moment, we keep both implementations around, selectable through feature flags.
//! Eventually, we hope that `mz_join_core` proves itself sufficiently to become the only join
//! implementation.

use std::cell::Cell;
use std::cell::RefCell;
use std::cmp::Ordering;
use std::collections::VecDeque;
use std::marker::PhantomData;
use std::pin::Pin;
use std::rc::Rc;
use std::time::Instant;

use differential_dataflow::Data;
use differential_dataflow::consolidation::{consolidate_from, consolidate_updates};
use differential_dataflow::lattice::Lattice;
use differential_dataflow::operators::arrange::arrangement::Arranged;
use differential_dataflow::trace::{BatchReader, Cursor, TraceReader};
use mz_ore::future::yield_now;
use mz_repr::Diff;
use timely::container::{CapacityContainerBuilder, PushInto, SizableContainer};
use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::operators::generic::OutputBuilderSession;
use timely::dataflow::operators::{Capability, Operator};
use timely::dataflow::{Scope, StreamCore};
use timely::progress::timestamp::Timestamp;
use timely::{Container, PartialOrder};
use tracing::trace;

/// Joins two arranged collections with the same key type.
///
/// Each matching pair of records `(key, val1)` and `(key, val2)` is subjected to the `result` function,
/// which produces something implementing `IntoIterator`, where the output collection will have an entry
/// for every value returned by the iterator.
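///
/// As an illustrative sketch only (`make_output_row` is a hypothetical helper and the
/// `Duration`-based yield policy is an assumption, not the exact policy Materialize uses), a
/// caller might write:
///
/// ```ignore
/// let joined = mz_join_core(
///     &arranged1,
///     &arranged2,
///     // Emit one output record per matching pair of values.
///     |_key, val1, val2| Some(make_output_row(val1, val2)),
///     // Yield after 10ms of work or 1M produced records, whichever comes first.
///     |start, work| start.elapsed() > Duration::from_millis(10) || work > 1_000_000,
/// );
/// ```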
pub(super) fn mz_join_core<G, Tr1, Tr2, L, I, YFn, C>(
    arranged1: &Arranged<G, Tr1>,
    arranged2: &Arranged<G, Tr2>,
    result: L,
    yield_fn: YFn,
) -> StreamCore<G, C>
where
    G: Scope,
    G::Timestamp: Lattice,
    Tr1: TraceReader<Time = G::Timestamp, Diff = Diff> + Clone + 'static,
    Tr2: for<'a> TraceReader<Key<'a> = Tr1::Key<'a>, Time = G::Timestamp, Diff = Diff>
        + Clone
        + 'static,
    L: FnMut(Tr1::Key<'_>, Tr1::Val<'_>, Tr2::Val<'_>) -> I + 'static,
    I: IntoIterator<Item: Data> + 'static,
    YFn: Fn(Instant, usize) -> bool + 'static,
    C: Container + SizableContainer + PushInto<(I::Item, G::Timestamp, Diff)> + Data,
{
    let mut trace1 = arranged1.trace.clone();
    let mut trace2 = arranged2.trace.clone();

    arranged1.stream.binary_frontier(
        &arranged2.stream,
        Pipeline,
        Pipeline,
        "Join",
        move |capability, info| {
            let operator_id = info.global_id;

            // Acquire an activator to reschedule the operator when it has unfinished work.
            let activator = arranged1.stream.scope().activator_for(info.address);

            // Our initial invariants are that for each trace, physical compaction is less than or equal to the trace's upper bound.
            // These invariants ensure that we can reference observed batch frontiers from `_start_upper` onward, as long as
            // we maintain our physical compaction capabilities appropriately. These assertions are tested as we load up the
            // initial work for the two traces, and before the operator is constructed.

            // Acknowledged frontier for each input.
            // These two are used exclusively to track batch boundaries on which we may want/need to call `cursor_through`.
            // They will drive our physical compaction of each trace, and we want to maintain at all times that each is beyond
            // the physical compaction frontier of their corresponding trace.
            // Should we ever *drop* a trace, these are 1. much harder to maintain correctly, but 2. no longer used.
            use timely::progress::frontier::Antichain;
            let mut acknowledged1 = Antichain::from_elem(<G::Timestamp>::minimum());
            let mut acknowledged2 = Antichain::from_elem(<G::Timestamp>::minimum());

            // deferred work of batches from each input.
            let result_fn = Rc::new(RefCell::new(result));
            let mut todo1 = Work::<<Tr1::Batch as BatchReader>::Cursor, Tr2::Cursor, _, _>::new(
                Rc::clone(&result_fn),
            );
            let mut todo2 =
                Work::<Tr1::Cursor, <Tr2::Batch as BatchReader>::Cursor, _, _>::new(result_fn);

            // We'll unload the initial batches here, to put ourselves in a less non-deterministic state to start.
            trace1.map_batches(|batch1| {
                trace!(
                    operator_id,
                    input = 1,
                    lower = ?batch1.lower().elements(),
                    upper = ?batch1.upper().elements(),
                    size = batch1.len(),
                    "pre-loading batch",
                );

                acknowledged1.clone_from(batch1.upper());
                // No `todo1` work here, because we haven't accepted anything into `batches2` yet.
                // It is effectively "empty", because we choose to drain `trace1` before `trace2`.
                // Once we start streaming batches in, we will need to respond to new batches from
                // `input1` with logic that would have otherwise been here. Check out the next loop
                // for the structure.
            });
            // At this point, `ack1` should exactly equal `trace1.read_upper()`, as they are both determined by
            // iterating through batches and capturing the upper bound. This is a great moment to assert that
            // `trace1`'s physical compaction frontier is before the frontier of completed times in `trace1`.
            // TODO: in the case that this does not hold, instead "upgrade" the physical compaction frontier.
            assert!(PartialOrder::less_equal(
                &trace1.get_physical_compaction(),
                &acknowledged1.borrow()
            ));

            trace!(
                operator_id,
                input = 1,
                acknowledged1 = ?acknowledged1.elements(),
                "pre-loading finished",
            );

            // We capture batch2 cursors first and establish work second to avoid taking a `RefCell` lock
            // on both traces at the same time, as they could be the same trace and this would panic.
            let mut batch2_cursors = Vec::new();
            trace2.map_batches(|batch2| {
                trace!(
                    operator_id,
                    input = 2,
                    lower = ?batch2.lower().elements(),
                    upper = ?batch2.upper().elements(),
                    size = batch2.len(),
                    "pre-loading batch",
                );

                acknowledged2.clone_from(batch2.upper());
                batch2_cursors.push((batch2.cursor(), batch2.clone()));
            });
            // At this point, `ack2` should exactly equal `trace2.read_upper()`, as they are both determined by
            // iterating through batches and capturing the upper bound. This is a great moment to assert that
            // `trace2`'s physical compaction frontier is before the frontier of completed times in `trace2`.
            // TODO: in the case that this does not hold, instead "upgrade" the physical compaction frontier.
            assert!(PartialOrder::less_equal(
                &trace2.get_physical_compaction(),
                &acknowledged2.borrow()
            ));

            // Load up deferred work using trace2 cursors and batches captured just above.
            for (batch2_cursor, batch2) in batch2_cursors.into_iter() {
                trace!(
                    operator_id,
                    input = 2,
                    acknowledged1 = ?acknowledged1.elements(),
                    "deferring work for batch",
                );

                // It is safe to ask for `ack1` because we have confirmed it to be in advance of `distinguish_since`.
                let (trace1_cursor, trace1_storage) =
                    trace1.cursor_through(acknowledged1.borrow()).unwrap();
                // We could downgrade the capability here, but doing so is a bit complicated mathematically.
                // TODO: downgrade the capability by searching out the one time in `batch2.lower()` and not
                // in `batch2.upper()`. Only necessary for non-empty batches, as empty batches may not have
                // that property.
                todo2.push(
                    trace1_cursor,
                    trace1_storage,
                    batch2_cursor,
                    batch2.clone(),
                    capability.clone(),
                );
            }

            trace!(
                operator_id,
                input = 2,
                acknowledged2 = ?acknowledged2.elements(),
                "pre-loading finished",
            );

            // Droppable handles to shared trace data structures.
            let mut trace1_option = Some(trace1);
            let mut trace2_option = Some(trace2);

            move |(input1, frontier1), (input2, frontier2), output| {
                // 1. Consuming input.
                //
                // The join computation repeatedly accepts batches of updates from each of its inputs.
                //
                // For each accepted batch, it prepares a work-item to join the batch against previously "accepted"
                // updates from its other input. It is important to track which updates have been accepted, because
                // we use a shared trace and there may be updates present that are in advance of this accepted bound.
                //
                // Batches are accepted: 1. in bulk at start-up (above), 2. as we observe them in the input stream,
                // and 3. if the trace can confirm a region of empty space directly following our accepted bound.
                // This last case is a consequence of our inability to transmit empty batches, as they may be formed
                // in the absence of timely dataflow capabilities.

                // Drain input 1, prepare work.
                input1.for_each(|capability, data| {
                    let trace2 = trace2_option
                        .as_mut()
                        .expect("we only drop a trace in response to the other input emptying");
                    let capability = capability.retain();
                    for batch1 in data.drain(..) {
                        // Ignore any pre-loaded data.
                        if PartialOrder::less_equal(&acknowledged1, batch1.lower()) {
                            trace!(
                                operator_id,
                                input = 1,
                                lower = ?batch1.lower().elements(),
                                upper = ?batch1.upper().elements(),
                                size = batch1.len(),
                                "loading batch",
                            );

                            if !batch1.is_empty() {
                                trace!(
                                    operator_id,
                                    input = 1,
                                    acknowledged2 = ?acknowledged2.elements(),
                                    "deferring work for batch",
                                );

                                // It is safe to ask for `ack2` as we validated that it was at least `get_physical_compaction()`
                                // at start-up, and have held back physical compaction ever since.
                                let (trace2_cursor, trace2_storage) =
                                    trace2.cursor_through(acknowledged2.borrow()).unwrap();
                                let batch1_cursor = batch1.cursor();
                                todo1.push(
                                    batch1_cursor,
                                    batch1.clone(),
                                    trace2_cursor,
                                    trace2_storage,
                                    capability.clone(),
                                );
                            }

                            // To update `acknowledged1` we might presume that `batch1.lower` should equal it, but we
                            // may have skipped over empty batches. Still, the batches are in-order, and we should be
                            // able to just assume the most recent `batch1.upper`.
                            debug_assert!(PartialOrder::less_equal(&acknowledged1, batch1.upper()));
                            acknowledged1.clone_from(batch1.upper());

                            trace!(
                                operator_id,
                                input = 1,
                                acknowledged1 = ?acknowledged1.elements(),
                                "batch acknowledged",
                            );
                        }
                    }
                });

                // Drain input 2, prepare work.
                input2.for_each(|capability, data| {
                    let trace1 = trace1_option
                        .as_mut()
                        .expect("we only drop a trace in response to the other input emptying");
                    let capability = capability.retain();
                    for batch2 in data.drain(..) {
                        // Ignore any pre-loaded data.
                        if PartialOrder::less_equal(&acknowledged2, batch2.lower()) {
                            trace!(
                                operator_id,
                                input = 2,
                                lower = ?batch2.lower().elements(),
                                upper = ?batch2.upper().elements(),
                                size = batch2.len(),
                                "loading batch",
                            );

                            if !batch2.is_empty() {
                                trace!(
                                    operator_id,
                                    input = 2,
                                    acknowledged1 = ?acknowledged1.elements(),
                                    "deferring work for batch",
                                );

                                // It is safe to ask for `ack1` as we validated that it was at least `get_physical_compaction()`
                                // at start-up, and have held back physical compaction ever since.
                                let (trace1_cursor, trace1_storage) =
                                    trace1.cursor_through(acknowledged1.borrow()).unwrap();
                                let batch2_cursor = batch2.cursor();
                                todo2.push(
                                    trace1_cursor,
                                    trace1_storage,
                                    batch2_cursor,
                                    batch2.clone(),
                                    capability.clone(),
                                );
                            }

                            // To update `acknowledged2` we might presume that `batch2.lower` should equal it, but we
                            // may have skipped over empty batches. Still, the batches are in-order, and we should be
                            // able to just assume the most recent `batch2.upper`.
                            debug_assert!(PartialOrder::less_equal(&acknowledged2, batch2.upper()));
                            acknowledged2.clone_from(batch2.upper());

                            trace!(
                                operator_id,
                                input = 2,
                                acknowledged2 = ?acknowledged2.elements(),
                                "batch acknowledged",
                            );
                        }
                    }
                });

                // Advance acknowledged frontiers through any empty regions that we may not receive as batches.
                if let Some(trace1) = trace1_option.as_mut() {
                    trace!(
                        operator_id,
                        input = 1,
                        acknowledged1 = ?acknowledged1.elements(),
                        "advancing trace upper",
                    );
                    trace1.advance_upper(&mut acknowledged1);
                }
                if let Some(trace2) = trace2_option.as_mut() {
                    trace!(
                        operator_id,
                        input = 2,
                        acknowledged2 = ?acknowledged2.elements(),
                        "advancing trace upper",
                    );
                    trace2.advance_upper(&mut acknowledged2);
                }

                // 2. Join computation.
                //
                // For each of the inputs, we do some amount of work (measured in terms of number
                // of output records produced). This is meant to yield control to allow downstream
                // operators to consume and reduce the output, but it is also meant to provide some
                // degree of responsiveness. There is a potential risk here that if we fall behind
                // then the increasing queues hold back physical compaction of the underlying traces,
                // which results in unintentionally quadratic processing time (each batch of either
                // input must scan all batches from the other input).

                // Perform some amount of outstanding work for input 1.
                trace!(
                    operator_id,
                    input = 1,
                    work_left = todo1.remaining(),
                    "starting work"
                );
                todo1.process(output, &yield_fn);
                trace!(
                    operator_id,
                    input = 1,
                    work_left = todo1.remaining(),
                    "ceasing work",
                );

                // Perform some amount of outstanding work for input 2.
                trace!(
                    operator_id,
                    input = 2,
                    work_left = todo2.remaining(),
                    "starting work"
                );
                todo2.process(output, &yield_fn);
                trace!(
                    operator_id,
                    input = 2,
                    work_left = todo2.remaining(),
                    "ceasing work",
                );

                // Re-activate operator if work remains.
                if !todo1.is_empty() || !todo2.is_empty() {
                    activator.activate();
                }

                // 3. Trace maintenance.
                //
                // Importantly, we use `input.frontier()` here rather than `acknowledged` to track
                // the progress of an input, because should we ever drop one of the traces we will
                // lose the ability to extract information from anything other than the input.
                // For example, if we dropped `trace2` we would not be able to use `advance_upper`
                // to keep `acknowledged2` up to date wrt empty batches, and would hold back logical
                // compaction of `trace1`.

                // Maintain `trace1`. Drop if `input2` is empty, or advance based on future needs.
                if let Some(trace1) = trace1_option.as_mut() {
                    if frontier2.is_empty() {
                        trace!(operator_id, input = 1, "dropping trace handle");
                        trace1_option = None;
                    } else {
                        trace!(
                            operator_id,
                            input = 1,
                            logical = ?*frontier2.frontier(),
                            physical = ?acknowledged1.elements(),
                            "advancing trace compaction",
                        );

                        // Allow `trace1` to compact logically up to the frontier we may yet receive,
                        // in the opposing input (`input2`). All `input2` times will be beyond this
                        // frontier, and joined times only need to be accurate when advanced to it.
                        trace1.set_logical_compaction(frontier2.frontier());
                        // Allow `trace1` to compact physically up to the upper bound of batches we
                        // have received in its input (`input1`). We will not require a cursor that
                        // is not beyond this bound.
                        trace1.set_physical_compaction(acknowledged1.borrow());
                    }
                }

                // Maintain `trace2`. Drop if `input1` is empty, or advance based on future needs.
                if let Some(trace2) = trace2_option.as_mut() {
                    if frontier1.is_empty() {
                        trace!(operator_id, input = 2, "dropping trace handle");
                        trace2_option = None;
                    } else {
                        trace!(
                            operator_id,
                            input = 2,
                            logical = ?*frontier1.frontier(),
                            physical = ?acknowledged2.elements(),
                            "advancing trace compaction",
                        );

                        // Allow `trace2` to compact logically up to the frontier we may yet receive,
                        // in the opposing input (`input1`). All `input1` times will be beyond this
                        // frontier, and joined times only need to be accurate when advanced to it.
                        trace2.set_logical_compaction(frontier1.frontier());
                        // Allow `trace2` to compact physically up to the upper bound of batches we
                        // have received in its input (`input2`). We will not require a cursor that
                        // is not beyond this bound.
                        trace2.set_physical_compaction(acknowledged2.borrow());
                    }
                }
            }
        },
    )
}

/// Work collected by the join operator.
///
/// The join operator enqueues new work here first, and then processes it at a controlled rate,
/// potentially yielding control to the Timely runtime in between. This allows it to avoid both
/// OOMs caused by buffering massive amounts of data at the output and loss of interactivity.
///
/// Collected work can be reduced by calling the `process` method.
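///
/// A hypothetical sketch of the intended call pattern (names are illustrative; the operator body
/// above shows the real call sites):
///
/// ```ignore
/// work.push(cursor1, storage1, cursor2, storage2, capability); // enqueue a chunk of work
/// work.process(output, &yield_fn); // drain some of it, yielding when `yield_fn` says so
/// ```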
struct Work<C1, C2, D, L>
where
    C1: Cursor,
    C2: Cursor,
{
    /// Pending work.
    todo: VecDeque<(Pin<Box<dyn Future<Output = ()>>>, Capability<C1::Time>)>,
    /// A function that transforms raw join matches into join results.
    result_fn: Rc<RefCell<L>>,
    /// A buffer holding the join results.
    ///
    /// Written by the work futures, drained by `Work::process`.
    output: Rc<RefCell<Vec<(D, C1::Time, Diff)>>>,
    /// The number of join results produced by work futures.
    ///
    /// Used with `yield_fn` to inform when `Work::process` should yield.
    produced: Rc<Cell<usize>>,

    _cursors: PhantomData<(C1, C2)>,
}

impl<C1, C2, D, L, I> Work<C1, C2, D, L>
where
    C1: Cursor<Diff = Diff> + 'static,
    C2: for<'a> Cursor<Key<'a> = C1::Key<'a>, Time = C1::Time, Diff = Diff> + 'static,
    D: Data,
    L: FnMut(C1::Key<'_>, C1::Val<'_>, C2::Val<'_>) -> I + 'static,
    I: IntoIterator<Item = D> + 'static,
{
    fn new(result_fn: Rc<RefCell<L>>) -> Self {
        Self {
            todo: Default::default(),
            result_fn,
            output: Default::default(),
            produced: Default::default(),
            _cursors: PhantomData,
        }
    }

    /// Return the number of remaining work chunks.
    fn remaining(&self) -> usize {
        self.todo.len()
    }

    /// Return whether there is no work pending.
    fn is_empty(&self) -> bool {
        self.remaining() == 0
    }

    /// Append some pending work.
    fn push(
        &mut self,
        cursor1: C1,
        storage1: C1::Storage,
        cursor2: C2,
        storage2: C2::Storage,
        capability: Capability<C1::Time>,
    ) {
        let fut = self.start_work(
            cursor1,
            storage1,
            cursor2,
            storage2,
            capability.time().clone(),
        );

        self.todo.push_back((Box::pin(fut), capability));
    }

    /// Process pending work until none is remaining or `yield_fn` requests a yield.
    fn process<C, YFn>(
        &mut self,
        output: &mut OutputBuilderSession<'_, C1::Time, CapacityContainerBuilder<C>>,
        yield_fn: YFn,
    ) where
        C: Container + SizableContainer + PushInto<(D, C1::Time, Diff)> + Data,
        YFn: Fn(Instant, usize) -> bool,
    {
        let start_time = Instant::now();
        self.produced.set(0);

        let waker = futures::task::noop_waker();
        let mut ctx = std::task::Context::from_waker(&waker);

        while let Some((mut fut, cap)) = self.todo.pop_front() {
            // Drive the work future until it's done or it's time to yield.
            let mut done = false;
            let mut should_yield = false;
            while !done && !should_yield {
                done = fut.as_mut().poll(&mut ctx).is_ready();
                should_yield = yield_fn(start_time, self.produced.get());
            }

            // Drain the produced join results.
            let mut output_buf = self.output.borrow_mut();

            // Consolidating here is important when the join closure produces data that
            // consolidates well, for example when projecting columns.
            let old_len = output_buf.len();
            consolidate_updates(&mut output_buf);
            let recovered = old_len - output_buf.len();
            self.produced.update(|x| x - recovered);

            output.session(&cap).give_iterator(output_buf.drain(..));

            if done {
                // We have finished processing a chunk of work. Use this opportunity to truncate
                // the output buffer, so we don't keep excess memory allocated forever.
                *output_buf = Default::default();
            } else {
                // Still work to do in this chunk.
                self.todo.push_front((fut, cap));
            }

            if should_yield {
                break;
            }
        }
    }

    /// Start the work of joining the updates produced by the given cursors.
    ///
    /// This method returns a `Future` that can be polled to make progress on the join work.
    /// Returning a future allows us to implement the logic using async/await syntax where we can
    /// conveniently pause the work at any point by calling `yield_now().await`. We are allowed to
    /// hold references across yield points, which is something we wouldn't get with a hand-rolled
    /// state machine implementation.
    fn start_work(
        &self,
        mut cursor1: C1,
        storage1: C1::Storage,
        mut cursor2: C2,
        storage2: C2::Storage,
        meet: C1::Time,
    ) -> impl Future<Output = ()> + use<C1, C2, D, L, I> {
        let result_fn = Rc::clone(&self.result_fn);
        let output = Rc::clone(&self.output);
        let produced = Rc::clone(&self.produced);

        async move {
            let mut joiner = Joiner::new(result_fn, output, produced, meet);

            while let Some(key1) = cursor1.get_key(&storage1)
                && let Some(key2) = cursor2.get_key(&storage2)
            {
                match key1.cmp(&key2) {
                    Ordering::Less => cursor1.seek_key(&storage1, key2),
                    Ordering::Greater => cursor2.seek_key(&storage2, key1),
                    Ordering::Equal => {
                        joiner
                            .join_key(key1, &mut cursor1, &storage1, &mut cursor2, &storage2)
                            .await;

                        cursor1.step_key(&storage1);
                        cursor2.step_key(&storage2);
                    }
                }
            }
        }
    }
}

/// Type that knows how to perform the core join logic.
///
/// The joiner implements two join strategies:
///
/// * The "simple" strategy produces a match for each combination of (val1, time1, val2, time2)
///   found in the inputs. If there are multiple times in the input, it may produce matches for
///   times in which one of the values wasn't present. These matches cancel each other out, so the
///   result ends up correct.
/// * The "linear scan over times" strategy sorts the input data by time and then steps through
///   the input histories, producing matches for a pair of values only if both values were
///   present at the same time.
///
/// The linear scan strategy avoids redundant work and is much more efficient than the simple
/// strategy when many distinct times are present in the inputs. However, sorting the input data
/// incurs some overhead, so we still prefer the simple variant when the input data is small.
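///
/// A small hypothetical illustration of the simple strategy's cancellation, assuming `t0 <= t1`:
/// if input 1 holds `(a, t1, +1)` and input 2 holds `(x, t0, +1)`, `(x, t1, -1)`, and
/// `(y, t1, +1)`, the simple strategy emits `(a ⋈ x, t1, +1)`, `(a ⋈ x, t1, -1)`, and
/// `(a ⋈ y, t1, +1)`; the first two cancel, leaving only the match actually present at `t1`.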
struct Joiner<'a, C1, C2, D, L>
where
    C1: Cursor,
    C2: Cursor,
{
    /// A function that transforms raw join matches into join results.
    result_fn: Rc<RefCell<L>>,
    /// A buffer holding the join results.
    output: Rc<RefCell<Vec<(D, C1::Time, Diff)>>>,
    /// The number of join results produced.
    produced: Rc<Cell<usize>>,
    /// A time to which all join results should be advanced.
    meet: C1::Time,

    /// Buffer for edit histories from the first input.
    history1: ValueHistory<'a, C1>,
    /// Buffer for edit histories from the second input.
    history2: ValueHistory<'a, C2>,
}

impl<'a, C1, C2, D, L, I> Joiner<'a, C1, C2, D, L>
where
    C1: Cursor<Diff = Diff>,
    C2: Cursor<Key<'a> = C1::Key<'a>, Time = C1::Time, Diff = Diff>,
    D: Data,
    L: FnMut(C1::Key<'_>, C1::Val<'_>, C2::Val<'_>) -> I + 'static,
    I: IntoIterator<Item = D> + 'static,
{
    fn new(
        result_fn: Rc<RefCell<L>>,
        output: Rc<RefCell<Vec<(D, C1::Time, Diff)>>>,
        produced: Rc<Cell<usize>>,
        meet: C1::Time,
    ) -> Self {
        Self {
            result_fn,
            output,
            produced,
            meet,
            history1: ValueHistory::new(),
            history2: ValueHistory::new(),
        }
    }

    /// Produce matches for the values of a single key.
    async fn join_key(
        &mut self,
        key: C1::Key<'_>,
        cursor1: &mut C1,
        storage1: &'a C1::Storage,
        cursor2: &mut C2,
        storage2: &'a C2::Storage,
    ) {
        self.history1.edits.load(cursor1, storage1, &self.meet);
        self.history2.edits.load(cursor2, storage2, &self.meet);

        // If the input data is small, use the simple strategy.
        //
        // TODO: This conditional is taken directly from DD. We should check if it might make sense
        //       to do something different, like using the simple strategy always when the number
        //       of distinct times is small.
        if self.history1.edits.len() < 10 || self.history2.edits.len() < 10 {
            self.join_key_simple(key);
            yield_now().await;
        } else {
            self.join_key_linear_time_scan(key).await;
        }
    }

    /// Produce matches for the values of a single key, using the simple strategy.
    ///
    /// This strategy is only meant to be used for small inputs, so we don't bother including yield
    /// points or optimizations.
    fn join_key_simple(&self, key: C1::Key<'_>) {
        let mut result_fn = self.result_fn.borrow_mut();
        let mut output = self.output.borrow_mut();

        for (v1, t1, r1) in self.history1.edits.iter() {
            for (v2, t2, r2) in self.history2.edits.iter() {
                for data in result_fn(key, v1, v2) {
                    output.push((data, t1.join(t2), r1 * r2));
                    self.produced.update(|x| x + 1);
                }
            }
        }
    }

    /// Produce matches for the values of a single key, using a linear scan through times.
    async fn join_key_linear_time_scan(&mut self, key: C1::Key<'_>) {
        let history1 = &mut self.history1;
        let history2 = &mut self.history2;

        history1.replay();
        history2.replay();

        // TODO: It seems like there is probably a good deal of redundant `advance_buffer_by`
        //       in here. If a time is ever repeated, for example, the call will be identical
        //       and accomplish nothing. If only a single record has been added, it may not
        //       be worth the time to collapse (advance, re-sort) the data when a linear scan
        //       is sufficient.

        // Join the next entry in `history1`.
        let work_history1 = |history1: &mut ValueHistory<C1>, history2: &mut ValueHistory<C2>| {
            let mut result_fn = self.result_fn.borrow_mut();
            let mut output = self.output.borrow_mut();

            let (t1, meet, v1, r1) = history1.get().unwrap();
            history2.advance_past_by(meet);
            for &(v2, ref t2, r2) in &history2.past {
                for data in result_fn(key, v1, v2) {
                    output.push((data, t1.join(t2), r1 * r2));
                    self.produced.update(|x| x + 1);
                }
            }
            history1.step();
        };

        // Join the next entry in `history2`.
        let work_history2 = |history1: &mut ValueHistory<C1>, history2: &mut ValueHistory<C2>| {
            let mut result_fn = self.result_fn.borrow_mut();
            let mut output = self.output.borrow_mut();

            let (t2, meet, v2, r2) = history2.get().unwrap();
            history1.advance_past_by(meet);
            for &(v1, ref t1, r1) in &history1.past {
                for data in result_fn(key, v1, v2) {
                    output.push((data, t1.join(t2), r1 * r2));
                    self.produced.update(|x| x + 1);
                }
            }
            history2.step();
        };

        while let Some(time1) = history1.get_time()
            && let Some(time2) = history2.get_time()
        {
            if time1 < time2 {
                work_history1(history1, history2)
            } else {
                work_history2(history1, history2)
            };
            yield_now().await;
        }

        while !history1.is_empty() {
            work_history1(history1, history2);
            yield_now().await;
        }
        while !history2.is_empty() {
            work_history2(history1, history2);
            yield_now().await;
        }
    }
}

/// An accumulation of (value, time, diff) updates.
///
/// Deduplicated values are stored in `values`. Each entry includes the end index of the
/// corresponding range in `edits`. The edits stored for a value are consolidated.
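///
/// For example (hypothetical contents), `values = [(a, 2), (b, 3)]` together with
/// `edits = [(t1, +1), (t2, -1), (t3, +1)]` means that value `a` owns `edits[0..2]` and value `b`
/// owns `edits[2..3]`.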
struct EditList<'a, C: Cursor> {
    values: Vec<(C::Val<'a>, usize)>,
    edits: Vec<(C::Time, Diff)>,
}

impl<'a, C> EditList<'a, C>
where
    C: Cursor<Diff = Diff>,
{
    fn len(&self) -> usize {
        self.edits.len()
    }

    /// Load the updates in the given cursor.
    ///
    /// Steps over values, but not over keys.
    fn load(&mut self, cursor: &mut C, storage: &'a C::Storage, meet: &C::Time) {
        self.values.clear();
        self.edits.clear();

        let mut edit_idx = 0;
        while let Some(value) = cursor.get_val(storage) {
            cursor.map_times(storage, |time, diff| {
                let mut time = C::owned_time(time);
                time.join_assign(meet);
                self.edits.push((time, C::owned_diff(diff)));
            });

            consolidate_from(&mut self.edits, edit_idx);

            if self.edits.len() > edit_idx {
                edit_idx = self.edits.len();
                self.values.push((value, edit_idx));
            }

            cursor.step_val(storage);
        }
    }

    /// Iterate over the contained updates.
    fn iter(&self) -> impl Iterator<Item = (C::Val<'a>, &C::Time, Diff)> {
        self.values
            .iter()
            .enumerate()
            .flat_map(|(idx, (value, end))| {
                let start = if idx == 0 { 0 } else { self.values[idx - 1].1 };
                let edits = &self.edits[start..*end];
                edits.iter().map(|(time, diff)| (*value, time, *diff))
            })
    }
}

/// A history for replaying updates in time order.
struct ValueHistory<'a, C: Cursor> {
    /// Unsorted updates to replay.
    edits: EditList<'a, C>,
    /// Time-sorted updates that have not been stepped over yet.
    ///
    /// Entries are (time, meet, value_idx, diff).
    future: Vec<(C::Time, C::Time, usize, Diff)>,
    /// Rolled-up updates that have been stepped over.
    past: Vec<(C::Val<'a>, C::Time, Diff)>,
}

impl<'a, C> ValueHistory<'a, C>
where
    C: Cursor,
{
    /// Create a new empty `ValueHistory`.
    fn new() -> Self {
        Self {
            edits: EditList {
                values: Default::default(),
                edits: Default::default(),
            },
            future: Default::default(),
            past: Default::default(),
        }
    }

    /// Return whether there are no updates left to step over.
    fn is_empty(&self) -> bool {
        self.future.is_empty()
    }

    /// Return the next update.
    fn get(&self) -> Option<(&C::Time, &C::Time, C::Val<'a>, Diff)> {
        self.future.last().map(|(t, m, v, r)| {
            let (value, _) = self.edits.values[*v];
            (t, m, value, *r)
        })
    }

    /// Return the time of the next update.
    fn get_time(&self) -> Option<&C::Time> {
        self.future.last().map(|(t, _, _, _)| t)
    }

    /// Populate `future` with the updates stored in `edits`.
    fn replay(&mut self) {
        self.future.clear();
        self.past.clear();

        let values = &self.edits.values;
        let edits = &self.edits.edits;
        for (idx, (_, end)) in values.iter().enumerate() {
            let start = if idx == 0 { 0 } else { values[idx - 1].1 };
            for edit_idx in start..*end {
                let (time, diff) = &edits[edit_idx];
                self.future.push((time.clone(), time.clone(), idx, *diff));
            }
        }

        self.future.sort_by(|x, y| y.cmp(x));

        for idx in 1..self.future.len() {
            self.future[idx].1 = self.future[idx].1.meet(&self.future[idx - 1].1);
        }
    }

    /// Advance the history by moving the next entry from `future` into `past`.
    fn step(&mut self) {
        let (time, _, value_idx, diff) = self.future.pop().unwrap();
        let (value, _) = self.edits.values[value_idx];
        self.past.push((value, time, diff));
    }

    /// Advance all times in `past` by `meet`.
    fn advance_past_by(&mut self, meet: &C::Time) {
        for (_, time, _) in &mut self.past {
            time.join_assign(meet);
        }
        consolidate_updates(&mut self.past);
    }
}