mz_compute/render/join/mz_join_core_v2.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! A fork of DD's `JoinCore::join_core`.
//!
//! Currently, compute rendering knows three implementations for linear joins:
//!
//!  * Differential's `JoinCore::join_core`
//!  * A Materialize fork thereof, called `mz_join_core`
//!  * Another Materialize fork thereof, called `mz_join_core_v2`
//!
//! `mz_join_core` exists to solve a responsiveness problem with the DD implementation.
//! DD's join is only able to yield between keys. When computing a large cross-join or a highly
//! skewed join, this can result in loss of interactivity when the join operator refuses to yield
//! control for multiple seconds or longer, which in turn causes degraded user experience.
//!
//! `mz_join_core` resolves the yielding issue but compromises by being quadratic over the number
//! of distinct times in the input. `mz_join_core_v2` is an attempt to replace the quadratic
//! behavior by using a linear scan through times, copied from the DD implementation.
//!
//! For the moment, we keep all three implementations around, selectable through feature flags.
//! Eventually, we hope that `mz_join_core_v2` proves itself sufficiently to become the only join
//! implementation.

use std::cell::Cell;
use std::cell::RefCell;
use std::cmp::Ordering;
use std::collections::VecDeque;
use std::marker::PhantomData;
use std::pin::Pin;
use std::rc::Rc;
use std::time::Instant;

use differential_dataflow::Data;
use differential_dataflow::consolidation::{consolidate_from, consolidate_updates};
use differential_dataflow::lattice::Lattice;
use differential_dataflow::operators::arrange::arrangement::Arranged;
use differential_dataflow::trace::{BatchReader, Cursor, TraceReader};
use mz_ore::future::yield_now;
use mz_repr::Diff;
use timely::PartialOrder;
use timely::container::{CapacityContainerBuilder, PushInto, SizableContainer};
use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::channels::pushers::Tee;
use timely::dataflow::operators::generic::OutputHandleCore;
use timely::dataflow::operators::{Capability, Operator};
use timely::dataflow::{Scope, StreamCore};
use timely::progress::timestamp::Timestamp;
use tracing::trace;

use crate::render::context::ShutdownProbe;

/// Joins two arranged collections with the same key type.
///
/// Each matching pair of records `(key, val1)` and `(key, val2)` is subjected to the `result`
/// function, which produces something implementing `IntoIterator`, where the output collection
/// will have an entry for every value returned by the iterator.
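///
/// For illustration only, a `yield_fn` might bound both the time spent and the number of results
/// produced before the operator yields (a sketch; the policy and the thresholds below are made
/// up, the real ones are supplied by the caller):
///
/// ```ignore
/// use std::time::{Duration, Instant};
///
/// let yield_fn = |start: Instant, work: usize| {
///     start.elapsed() >= Duration::from_millis(100) || work >= 1_000_000
/// };
/// ```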
pub(super) fn mz_join_core<G, Tr1, Tr2, L, I, YFn, C>(
    arranged1: &Arranged<G, Tr1>,
    arranged2: &Arranged<G, Tr2>,
    shutdown_probe: ShutdownProbe,
    result: L,
    yield_fn: YFn,
) -> StreamCore<G, C>
where
    G: Scope,
    G::Timestamp: Lattice,
    Tr1: TraceReader<Time = G::Timestamp, Diff = Diff> + Clone + 'static,
    Tr2: for<'a> TraceReader<Key<'a> = Tr1::Key<'a>, Time = G::Timestamp, Diff = Diff>
        + Clone
        + 'static,
    L: FnMut(Tr1::Key<'_>, Tr1::Val<'_>, Tr2::Val<'_>) -> I + 'static,
    I: IntoIterator<Item: Data> + 'static,
    YFn: Fn(Instant, usize) -> bool + 'static,
    C: SizableContainer + PushInto<(I::Item, G::Timestamp, Diff)> + Data,
{
    let mut trace1 = arranged1.trace.clone();
    let mut trace2 = arranged2.trace.clone();

    arranged1.stream.binary_frontier(
        &arranged2.stream,
        Pipeline,
        Pipeline,
        "Join",
        move |capability, info| {
            let operator_id = info.global_id;

            // Acquire an activator to reschedule the operator when it has unfinished work.
            let activator = arranged1.stream.scope().activator_for(info.address);
            // Our initial invariants are that for each trace, physical compaction is less than or
            // equal to the trace's upper bound. These invariants ensure that we can reference
            // observed batch frontiers from `_start_upper` onward, as long as we maintain our
            // physical compaction capabilities appropriately. These assertions are tested as we
            // load up the initial work for the two traces, and before the operator is constructed.

            // Acknowledged frontier for each input.
            // These two are used exclusively to track batch boundaries on which we may want/need to call `cursor_through`.
            // They will drive our physical compaction of each trace, and we want to maintain at all times that each is beyond
            // the physical compaction frontier of their corresponding trace.
            // Should we ever *drop* a trace, these are 1. much harder to maintain correctly, but 2. no longer used.
            use timely::progress::frontier::Antichain;
            let mut acknowledged1 = Antichain::from_elem(<G::Timestamp>::minimum());
            let mut acknowledged2 = Antichain::from_elem(<G::Timestamp>::minimum());

            // Deferred work of batches from each input.
            let result_fn = Rc::new(RefCell::new(result));
            let mut todo1 = Work::<<Tr1::Batch as BatchReader>::Cursor, Tr2::Cursor, _, _>::new(
                Rc::clone(&result_fn),
            );
            let mut todo2 =
                Work::<Tr1::Cursor, <Tr2::Batch as BatchReader>::Cursor, _, _>::new(result_fn);

            // We'll unload the initial batches here, to put ourselves in a less non-deterministic state to start.
            trace1.map_batches(|batch1| {
                trace!(
                    operator_id,
                    input = 1,
                    lower = ?batch1.lower().elements(),
                    upper = ?batch1.upper().elements(),
                    size = batch1.len(),
                    "pre-loading batch",
                );

                acknowledged1.clone_from(batch1.upper());
                // No `todo1` work here, because we haven't accepted anything into `batches2` yet.
                // It is effectively "empty", because we choose to drain `trace1` before `trace2`.
                // Once we start streaming batches in, we will need to respond to new batches from
                // `input1` with logic that would have otherwise been here. Check out the next loop
                // for the structure.
            });
            // At this point, `acknowledged1` should exactly equal `trace1.read_upper()`, as they are
            // both determined by iterating through batches and capturing the upper bound. This is a
            // great moment to assert that `trace1`'s physical compaction frontier is before the
            // frontier of completed times in `trace1`.
            // TODO: in the case that this does not hold, instead "upgrade" the physical compaction frontier.
            assert!(PartialOrder::less_equal(
                &trace1.get_physical_compaction(),
                &acknowledged1.borrow()
            ));

            trace!(
                operator_id,
                input = 1,
                acknowledged1 = ?acknowledged1.elements(),
                "pre-loading finished",
            );

            // We capture batch2 cursors first and establish work second to avoid taking a `RefCell` lock
            // on both traces at the same time, as they could be the same trace and this would panic.
            let mut batch2_cursors = Vec::new();
            trace2.map_batches(|batch2| {
                trace!(
                    operator_id,
                    input = 2,
                    lower = ?batch2.lower().elements(),
                    upper = ?batch2.upper().elements(),
                    size = batch2.len(),
                    "pre-loading batch",
                );

                acknowledged2.clone_from(batch2.upper());
                batch2_cursors.push((batch2.cursor(), batch2.clone()));
            });
            // At this point, `acknowledged2` should exactly equal `trace2.read_upper()`, as they are
            // both determined by iterating through batches and capturing the upper bound. This is a
            // great moment to assert that `trace2`'s physical compaction frontier is before the
            // frontier of completed times in `trace2`.
            // TODO: in the case that this does not hold, instead "upgrade" the physical compaction frontier.
            assert!(PartialOrder::less_equal(
                &trace2.get_physical_compaction(),
                &acknowledged2.borrow()
            ));

            // Load up deferred work using trace2 cursors and batches captured just above.
            for (batch2_cursor, batch2) in batch2_cursors.into_iter() {
                trace!(
                    operator_id,
                    input = 2,
                    acknowledged1 = ?acknowledged1.elements(),
                    "deferring work for batch",
                );

                // It is safe to ask for `acknowledged1` because we have confirmed it to be in
                // advance of the trace's physical compaction frontier.
                let (trace1_cursor, trace1_storage) =
                    trace1.cursor_through(acknowledged1.borrow()).unwrap();
                // We could downgrade the capability here, but doing so is a bit complicated mathematically.
                // TODO: downgrade the capability by searching out the one time in `batch2.lower()` and not
                // in `batch2.upper()`. Only necessary for non-empty batches, as empty batches may not have
                // that property.
                todo2.push(
                    trace1_cursor,
                    trace1_storage,
                    batch2_cursor,
                    batch2.clone(),
                    capability.clone(),
                );
            }

            trace!(
                operator_id,
                input = 2,
                acknowledged2 = ?acknowledged2.elements(),
                "pre-loading finished",
            );

            // Droppable handles to shared trace data structures.
            let mut trace1_option = Some(trace1);
            let mut trace2_option = Some(trace2);

            move |input1, input2, output| {
                // If the dataflow is shutting down, discard all existing and future work.
                if shutdown_probe.in_shutdown() {
                    trace!(operator_id, "shutting down");

                    // Discard data at the inputs.
                    input1.for_each(|_cap, _data| ());
                    input2.for_each(|_cap, _data| ());

                    // Discard queued work.
                    todo1.discard();
                    todo2.discard();

                    // Stop holding on to input traces.
                    trace1_option = None;
                    trace2_option = None;

                    return;
                }

                // 1. Consuming input.
                //
                // The join computation repeatedly accepts batches of updates from each of its inputs.
                //
                // For each accepted batch, it prepares a work-item to join the batch against previously "accepted"
                // updates from its other input. It is important to track which updates have been accepted, because
                // we use a shared trace and there may be updates present that are in advance of this accepted bound.
                //
                // Batches are accepted: 1. in bulk at start-up (above), 2. as we observe them in the input stream,
                // and 3. if the trace can confirm a region of empty space directly following our accepted bound.
                // This last case is a consequence of our inability to transmit empty batches, as they may be formed
                // in the absence of timely dataflow capabilities.

                // Drain input 1, prepare work.
                input1.for_each(|capability, data| {
                    let trace2 = trace2_option
                        .as_mut()
                        .expect("we only drop a trace in response to the other input emptying");
                    let capability = capability.retain();
                    for batch1 in data.drain(..) {
                        // Ignore any pre-loaded data.
                        if PartialOrder::less_equal(&acknowledged1, batch1.lower()) {
                            trace!(
                                operator_id,
                                input = 1,
                                lower = ?batch1.lower().elements(),
                                upper = ?batch1.upper().elements(),
                                size = batch1.len(),
                                "loading batch",
                            );

                            if !batch1.is_empty() {
                                trace!(
                                    operator_id,
                                    input = 1,
                                    acknowledged2 = ?acknowledged2.elements(),
                                    "deferring work for batch",
                                );

                                // It is safe to ask for `acknowledged2` as we validated that it was
                                // at least `get_physical_compaction()` at start-up, and have held
                                // back physical compaction ever since.
                                let (trace2_cursor, trace2_storage) =
                                    trace2.cursor_through(acknowledged2.borrow()).unwrap();
                                let batch1_cursor = batch1.cursor();
                                todo1.push(
                                    batch1_cursor,
                                    batch1.clone(),
                                    trace2_cursor,
                                    trace2_storage,
                                    capability.clone(),
                                );
                            }

                            // To update `acknowledged1` we might presume that `batch1.lower` should equal it, but we
                            // may have skipped over empty batches. Still, the batches are in-order, and we should be
                            // able to just assume the most recent `batch1.upper`.
                            debug_assert!(PartialOrder::less_equal(&acknowledged1, batch1.upper()));
                            acknowledged1.clone_from(batch1.upper());

                            trace!(
                                operator_id,
                                input = 1,
                                acknowledged1 = ?acknowledged1.elements(),
                                "batch acknowledged",
                            );
                        }
                    }
                });

                // Drain input 2, prepare work.
                input2.for_each(|capability, data| {
                    let trace1 = trace1_option
                        .as_mut()
                        .expect("we only drop a trace in response to the other input emptying");
                    let capability = capability.retain();
                    for batch2 in data.drain(..) {
                        // Ignore any pre-loaded data.
                        if PartialOrder::less_equal(&acknowledged2, batch2.lower()) {
                            trace!(
                                operator_id,
                                input = 2,
                                lower = ?batch2.lower().elements(),
                                upper = ?batch2.upper().elements(),
                                size = batch2.len(),
                                "loading batch",
                            );

                            if !batch2.is_empty() {
                                trace!(
                                    operator_id,
                                    input = 2,
                                    acknowledged1 = ?acknowledged1.elements(),
                                    "deferring work for batch",
                                );

                                // It is safe to ask for `acknowledged1` as we validated that it was
                                // at least `get_physical_compaction()` at start-up, and have held
                                // back physical compaction ever since.
                                let (trace1_cursor, trace1_storage) =
                                    trace1.cursor_through(acknowledged1.borrow()).unwrap();
                                let batch2_cursor = batch2.cursor();
                                todo2.push(
                                    trace1_cursor,
                                    trace1_storage,
                                    batch2_cursor,
                                    batch2.clone(),
                                    capability.clone(),
                                );
                            }

                            // To update `acknowledged2` we might presume that `batch2.lower` should equal it, but we
                            // may have skipped over empty batches. Still, the batches are in-order, and we should be
                            // able to just assume the most recent `batch2.upper`.
                            debug_assert!(PartialOrder::less_equal(&acknowledged2, batch2.upper()));
                            acknowledged2.clone_from(batch2.upper());

                            trace!(
                                operator_id,
                                input = 2,
                                acknowledged2 = ?acknowledged2.elements(),
                                "batch acknowledged",
                            );
                        }
                    }
                });

                // Advance acknowledged frontiers through any empty regions that we may not receive as batches.
                if let Some(trace1) = trace1_option.as_mut() {
                    trace!(
                        operator_id,
                        input = 1,
                        acknowledged1 = ?acknowledged1.elements(),
                        "advancing trace upper",
                    );
                    trace1.advance_upper(&mut acknowledged1);
                }
                if let Some(trace2) = trace2_option.as_mut() {
                    trace!(
                        operator_id,
                        input = 2,
                        acknowledged2 = ?acknowledged2.elements(),
                        "advancing trace upper",
                    );
                    trace2.advance_upper(&mut acknowledged2);
                }

                // 2. Join computation.
                //
                // For each of the inputs, we do some amount of work (measured in terms of the
                // number of output records produced). This is meant to yield control to allow
                // downstream operators to consume and reduce the output, but it is also meant to
                // provide some degree of responsiveness. There is a potential risk here that if we
                // fall behind then the increasing queues hold back physical compaction of the
                // underlying traces, which results in unintentionally quadratic processing time
                // (each batch of either input must scan all batches from the other input).

                // Perform some amount of outstanding work for input 1.
                trace!(
                    operator_id,
                    input = 1,
                    work_left = todo1.remaining(),
                    "starting work"
                );
                todo1.process(output, &yield_fn);
                trace!(
                    operator_id,
                    input = 1,
                    work_left = todo1.remaining(),
                    "ceasing work",
                );

                // Perform some amount of outstanding work for input 2.
                trace!(
                    operator_id,
                    input = 2,
                    work_left = todo2.remaining(),
                    "starting work"
                );
                todo2.process(output, &yield_fn);
                trace!(
                    operator_id,
                    input = 2,
                    work_left = todo2.remaining(),
                    "ceasing work",
                );

                // Re-activate operator if work remains.
                if !todo1.is_empty() || !todo2.is_empty() {
                    activator.activate();
                }

                // 3. Trace maintenance.
                //
                // Importantly, we use `input.frontier()` here rather than `acknowledged` to track
                // the progress of an input, because should we ever drop one of the traces we will
                // lose the ability to extract information from anything other than the input.
                // For example, if we dropped `trace2` we would not be able to use `advance_upper`
                // to keep `acknowledged2` up to date with respect to empty batches, and would hold
                // back logical compaction of `trace1`.

                // Maintain `trace1`. Drop if `input2` is empty, or advance based on future needs.
                if let Some(trace1) = trace1_option.as_mut() {
                    if input2.frontier().is_empty() {
                        trace!(operator_id, input = 1, "dropping trace handle");
                        trace1_option = None;
                    } else {
                        trace!(
                            operator_id,
                            input = 1,
                            logical = ?*input2.frontier().frontier(),
                            physical = ?acknowledged1.elements(),
                            "advancing trace compaction",
                        );

                        // Allow `trace1` to compact logically up to the frontier we may yet receive
                        // in the opposing input (`input2`). All `input2` times will be beyond this
                        // frontier, and joined times only need to be accurate when advanced to it.
                        trace1.set_logical_compaction(input2.frontier().frontier());
                        // Allow `trace1` to compact physically up to the upper bound of batches we
                        // have received in its input (`input1`). Every cursor we require will be at
                        // or beyond this bound.
                        trace1.set_physical_compaction(acknowledged1.borrow());
                    }
                }

                // Maintain `trace2`. Drop if `input1` is empty, or advance based on future needs.
                if let Some(trace2) = trace2_option.as_mut() {
                    if input1.frontier().is_empty() {
                        trace!(operator_id, input = 2, "dropping trace handle");
                        trace2_option = None;
                    } else {
                        trace!(
                            operator_id,
                            input = 2,
                            logical = ?*input1.frontier().frontier(),
                            physical = ?acknowledged2.elements(),
                            "advancing trace compaction",
                        );

                        // Allow `trace2` to compact logically up to the frontier we may yet receive
                        // in the opposing input (`input1`). All `input1` times will be beyond this
                        // frontier, and joined times only need to be accurate when advanced to it.
                        trace2.set_logical_compaction(input1.frontier().frontier());
                        // Allow `trace2` to compact physically up to the upper bound of batches we
                        // have received in its input (`input2`). Every cursor we require will be at
                        // or beyond this bound.
                        trace2.set_physical_compaction(acknowledged2.borrow());
                    }
                }
            }
        },
    )
}

/// Work collected by the join operator.
///
/// The join operator enqueues new work here first, and then processes it at a controlled rate,
/// potentially yielding control to the Timely runtime in between. This allows it to avoid OOMs
/// caused by buffering massive amounts of data at the output, as well as loss of interactivity.
///
/// Collected work can be reduced by calling the `process` method.
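///
/// For illustration, the intended lifecycle looks roughly like this (a sketch; `batch_cursor`,
/// `trace_cursor`, and friends stand in for the values prepared by the operator above):
///
/// ```ignore
/// // One work chunk per accepted batch:
/// todo.push(batch_cursor, batch, trace_cursor, trace_storage, capability);
/// // Each operator invocation drains some of it, possibly leaving a remainder:
/// todo.process(output, &yield_fn);
/// ```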
struct Work<C1, C2, D, L>
where
    C1: Cursor,
    C2: Cursor,
{
    /// Pending work.
    todo: VecDeque<(Pin<Box<dyn Future<Output = ()>>>, Capability<C1::Time>)>,
    /// A function that transforms raw join matches into join results.
    result_fn: Rc<RefCell<L>>,
    /// A buffer holding the join results.
    ///
    /// Written by the work futures, drained by `Work::process`.
    output: Rc<RefCell<Vec<(D, C1::Time, Diff)>>>,
    /// The number of join results produced by work futures.
    ///
    /// Used with `yield_fn` to inform when `Work::process` should yield.
    produced: Rc<Cell<usize>>,

    _cursors: PhantomData<(C1, C2)>,
}

impl<C1, C2, D, L, I> Work<C1, C2, D, L>
where
    C1: Cursor<Diff = Diff> + 'static,
    C2: for<'a> Cursor<Key<'a> = C1::Key<'a>, Time = C1::Time, Diff = Diff> + 'static,
    D: Data,
    L: FnMut(C1::Key<'_>, C1::Val<'_>, C2::Val<'_>) -> I + 'static,
    I: IntoIterator<Item = D> + 'static,
{
    fn new(result_fn: Rc<RefCell<L>>) -> Self {
        Self {
            todo: Default::default(),
            result_fn,
            output: Default::default(),
            produced: Default::default(),
            _cursors: PhantomData,
        }
    }

    /// Return the number of remaining work chunks.
    fn remaining(&self) -> usize {
        self.todo.len()
    }

    /// Return whether there is any work pending.
    fn is_empty(&self) -> bool {
        self.remaining() == 0
    }

    /// Append some pending work.
    fn push(
        &mut self,
        cursor1: C1,
        storage1: C1::Storage,
        cursor2: C2,
        storage2: C2::Storage,
        capability: Capability<C1::Time>,
    ) {
        let fut = self.start_work(
            cursor1,
            storage1,
            cursor2,
            storage2,
            capability.time().clone(),
        );

        self.todo.push_back((Box::pin(fut), capability));
    }

    /// Discard all pending work.
    fn discard(&mut self) {
        self.todo = Default::default();
    }

    /// Process pending work until none is remaining or `yield_fn` requests a yield.
    fn process<C, YFn>(
        &mut self,
        output: &mut OutputHandleCore<C1::Time, CapacityContainerBuilder<C>, Tee<C1::Time, C>>,
        yield_fn: YFn,
    ) where
        C: SizableContainer + PushInto<(D, C1::Time, Diff)> + Data,
        YFn: Fn(Instant, usize) -> bool,
    {
        let start_time = Instant::now();
        self.produced.set(0);

        let waker = futures::task::noop_waker();
        let mut ctx = std::task::Context::from_waker(&waker);

        while let Some((mut fut, cap)) = self.todo.pop_front() {
            // Drive the work future until it's done or it's time to yield.
            let mut done = false;
            let mut should_yield = false;
            while !done && !should_yield {
                done = fut.as_mut().poll(&mut ctx).is_ready();
                should_yield = yield_fn(start_time, self.produced.get());
            }

            // Drain the produced join results.
            let mut output_buf = self.output.borrow_mut();

            // Consolidating here is important when the join closure produces data that
            // consolidates well, for example when projecting columns.
            let old_len = output_buf.len();
            consolidate_updates(&mut output_buf);
            let recovered = old_len - output_buf.len();
            self.produced.update(|x| x - recovered);

            output.session(&cap).give_iterator(output_buf.drain(..));

            if done {
                // We have finished processing a chunk of work. Use this opportunity to release
                // the output buffer's allocation, so we don't keep excess memory around forever.
                *output_buf = Default::default();
            } else {
                // Still work to do in this chunk.
                self.todo.push_front((fut, cap));
            }

            if should_yield {
                break;
            }
        }
    }

    /// Start the work of joining the updates produced by the given cursors.
    ///
    /// This method returns a `Future` that can be polled to make progress on the join work.
    /// Returning a future allows us to implement the logic using async/await syntax where we can
    /// conveniently pause the work at any point by calling `yield_now().await`. We are allowed to
    /// hold references across yield points, which is something we wouldn't get with a hand-rolled
    /// state machine implementation.
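    ///
    /// The overall shape is the familiar cooperative-yielding pattern (a sketch, not the actual
    /// code below):
    ///
    /// ```ignore
    /// async fn work() {
    ///     for chunk in chunks {
    ///         process(chunk);
    ///         yield_now().await; // hand control back to `Work::process`
    ///     }
    /// }
    /// ```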
    fn start_work(
        &self,
        mut cursor1: C1,
        storage1: C1::Storage,
        mut cursor2: C2,
        storage2: C2::Storage,
        meet: C1::Time,
    ) -> impl Future<Output = ()> + use<C1, C2, D, L, I> {
        let result_fn = Rc::clone(&self.result_fn);
        let output = Rc::clone(&self.output);
        let produced = Rc::clone(&self.produced);

        async move {
            let mut joiner = Joiner::new(result_fn, output, produced, meet);

            while let Some(key1) = cursor1.get_key(&storage1)
                && let Some(key2) = cursor2.get_key(&storage2)
            {
                match key1.cmp(&key2) {
                    Ordering::Less => cursor1.seek_key(&storage1, key2),
                    Ordering::Greater => cursor2.seek_key(&storage2, key1),
                    Ordering::Equal => {
                        joiner
                            .join_key(key1, &mut cursor1, &storage1, &mut cursor2, &storage2)
                            .await;

                        cursor1.step_key(&storage1);
                        cursor2.step_key(&storage2);
                    }
                }
            }
        }
    }
}

/// Type that knows how to perform the core join logic.
///
/// The joiner implements two join strategies:
///
///  * The "simple" strategy produces a match for each combination of (val1, time1, val2, time2)
///    found in the inputs. If there are multiple times in the input, it may produce matches for
///    times at which one of the values wasn't present. These matches cancel each other out, so the
///    result ends up correct.
///  * The "linear scan over times" strategy sorts the input data by time and then steps through
///    the input histories, producing matches for a pair of values only if both values were
///    present at the same time.
///
/// The linear scan strategy avoids redundant work and is much more efficient than the simple
/// strategy when many distinct times are present in the inputs. However, sorting the input data
/// incurs some overhead, so we still prefer the simple variant when the input data is small.
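///
/// As an illustration of the cancellation in the simple strategy (numbers made up): if for some
/// key `input1` contains `(a, 1, +1)` and `(a, 2, -1)` (so `a` is present only between times 1
/// and 2) while `input2` contains `(x, 2, +1)`, the simple strategy emits a match for `(a, x)`
/// at `join(1, 2) = 2` with diff `+1` and another at `join(2, 2) = 2` with diff `-1`. The two
/// cancel, correctly reflecting that `a` was already retracted when `x` arrived.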
struct Joiner<'a, C1, C2, D, L>
where
    C1: Cursor,
    C2: Cursor,
{
    /// A function that transforms raw join matches into join results.
    result_fn: Rc<RefCell<L>>,
    /// A buffer holding the join results.
    output: Rc<RefCell<Vec<(D, C1::Time, Diff)>>>,
    /// The number of join results produced.
    produced: Rc<Cell<usize>>,
    /// A time to which all join results should be advanced.
    meet: C1::Time,

    /// Buffer for edit histories from the first input.
    history1: ValueHistory<'a, C1>,
    /// Buffer for edit histories from the second input.
    history2: ValueHistory<'a, C2>,
}

impl<'a, C1, C2, D, L, I> Joiner<'a, C1, C2, D, L>
where
    C1: Cursor<Diff = Diff>,
    C2: Cursor<Key<'a> = C1::Key<'a>, Time = C1::Time, Diff = Diff>,
    D: Data,
    L: FnMut(C1::Key<'_>, C1::Val<'_>, C2::Val<'_>) -> I + 'static,
    I: IntoIterator<Item = D> + 'static,
{
    fn new(
        result_fn: Rc<RefCell<L>>,
        output: Rc<RefCell<Vec<(D, C1::Time, Diff)>>>,
        produced: Rc<Cell<usize>>,
        meet: C1::Time,
    ) -> Self {
        Self {
            result_fn,
            output,
            produced,
            meet,
            history1: ValueHistory::new(),
            history2: ValueHistory::new(),
        }
    }

    /// Produce matches for the values of a single key.
    async fn join_key(
        &mut self,
        key: C1::Key<'_>,
        cursor1: &mut C1,
        storage1: &'a C1::Storage,
        cursor2: &mut C2,
        storage2: &'a C2::Storage,
    ) {
        self.history1.edits.load(cursor1, storage1, &self.meet);
        self.history2.edits.load(cursor2, storage2, &self.meet);

        // If the input data is small, use the simple strategy.
        //
        // TODO: This conditional is taken directly from DD. We should check if it might make sense
        //       to do something different, like using the simple strategy always when the number
        //       of distinct times is small.
        if self.history1.edits.len() < 10 || self.history2.edits.len() < 10 {
            self.join_key_simple(key);
            yield_now().await;
        } else {
            self.join_key_linear_time_scan(key).await;
        }
    }

    /// Produce matches for the values of a single key, using the simple strategy.
    ///
    /// This strategy is only meant to be used for small inputs, so we don't bother including yield
    /// points or optimizations.
    fn join_key_simple(&self, key: C1::Key<'_>) {
        let mut result_fn = self.result_fn.borrow_mut();
        let mut output = self.output.borrow_mut();

        for (v1, t1, r1) in self.history1.edits.iter() {
            for (v2, t2, r2) in self.history2.edits.iter() {
                for data in result_fn(key, v1, v2) {
                    output.push((data, t1.join(t2), r1 * r2));
                    self.produced.update(|x| x + 1);
                }
            }
        }
    }

    /// Produce matches for the values of a single key, using a linear scan through times.
    async fn join_key_linear_time_scan(&mut self, key: C1::Key<'_>) {
        let history1 = &mut self.history1;
        let history2 = &mut self.history2;

        history1.replay();
        history2.replay();

        // TODO: It seems like there is probably a good deal of redundant `advance_past_by`
        //       in here. If a time is ever repeated, for example, the call will be identical
        //       and accomplish nothing. If only a single record has been added, it may not
        //       be worth the time to collapse (advance, re-sort) the data when a linear scan
        //       is sufficient.

        // Join the next entry in `history1`.
        let work_history1 = |history1: &mut ValueHistory<C1>, history2: &mut ValueHistory<C2>| {
            let mut result_fn = self.result_fn.borrow_mut();
            let mut output = self.output.borrow_mut();

            let (t1, meet, v1, r1) = history1.get().unwrap();
            history2.advance_past_by(meet);
            for &(v2, ref t2, r2) in &history2.past {
                for data in result_fn(key, v1, v2) {
                    output.push((data, t1.join(t2), r1 * r2));
                    self.produced.update(|x| x + 1);
                }
            }
            history1.step();
        };

        // Join the next entry in `history2`.
        let work_history2 = |history1: &mut ValueHistory<C1>, history2: &mut ValueHistory<C2>| {
            let mut result_fn = self.result_fn.borrow_mut();
            let mut output = self.output.borrow_mut();

            let (t2, meet, v2, r2) = history2.get().unwrap();
            history1.advance_past_by(meet);
            for &(v1, ref t1, r1) in &history1.past {
                for data in result_fn(key, v1, v2) {
                    output.push((data, t1.join(t2), r1 * r2));
                    self.produced.update(|x| x + 1);
                }
            }
            history2.step();
        };

        while let Some(time1) = history1.get_time()
            && let Some(time2) = history2.get_time()
        {
            if time1 < time2 {
                work_history1(history1, history2)
            } else {
                work_history2(history1, history2)
            };
            yield_now().await;
        }

        while !history1.is_empty() {
            work_history1(history1, history2);
            yield_now().await;
        }
        while !history2.is_empty() {
            work_history2(history1, history2);
            yield_now().await;
        }
    }
}

/// An accumulation of (value, time, diff) updates.
///
/// Deduplicated values are stored in `values`. Each entry includes the end index of the
/// corresponding range in `edits`. The edits stored for a value are consolidated.
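///
/// For example (illustrative data): `values = [(v1, 2), (v2, 5)]` means that `edits[0..2]` holds
/// the consolidated `(time, diff)` history of `v1`, and `edits[2..5]` that of `v2`.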
struct EditList<'a, C: Cursor> {
    values: Vec<(C::Val<'a>, usize)>,
    edits: Vec<(C::Time, Diff)>,
}

impl<'a, C> EditList<'a, C>
where
    C: Cursor<Diff = Diff>,
{
    fn len(&self) -> usize {
        self.edits.len()
    }

    /// Load the updates from the given cursor.
    ///
    /// Steps over values, but not over keys.
    fn load(&mut self, cursor: &mut C, storage: &'a C::Storage, meet: &C::Time) {
        self.values.clear();
        self.edits.clear();

        let mut edit_idx = 0;
        while let Some(value) = cursor.get_val(storage) {
            cursor.map_times(storage, |time, diff| {
                let mut time = C::owned_time(time);
                time.join_assign(meet);
                self.edits.push((time, C::owned_diff(diff)));
            });

            consolidate_from(&mut self.edits, edit_idx);

            if self.edits.len() > edit_idx {
                edit_idx = self.edits.len();
                self.values.push((value, edit_idx));
            }

            cursor.step_val(storage);
        }
    }

    /// Iterate over the contained updates.
    fn iter(&self) -> impl Iterator<Item = (C::Val<'a>, &C::Time, Diff)> {
        self.values
            .iter()
            .enumerate()
            .flat_map(|(idx, (value, end))| {
                let start = if idx == 0 { 0 } else { self.values[idx - 1].1 };
                let edits = &self.edits[start..*end];
                edits.iter().map(|(time, diff)| (*value, time, *diff))
            })
    }
}

/// A history for replaying updates in time order.
struct ValueHistory<'a, C: Cursor> {
    /// Unsorted updates to replay.
    edits: EditList<'a, C>,
    /// Time-sorted updates that have not been stepped over yet.
    ///
    /// Entries are `(time, meet, value_idx, diff)`, where `meet` is the meet of this entry's
    /// time and the times of all entries that will be replayed after it.
    future: Vec<(C::Time, C::Time, usize, Diff)>,
    /// Rolled-up updates that have been stepped over.
    past: Vec<(C::Val<'a>, C::Time, Diff)>,
}

impl<'a, C> ValueHistory<'a, C>
where
    C: Cursor,
{
    /// Create a new empty `ValueHistory`.
    fn new() -> Self {
        Self {
            edits: EditList {
                values: Default::default(),
                edits: Default::default(),
            },
            future: Default::default(),
            past: Default::default(),
        }
    }

    /// Return whether there are updates left to step over.
    fn is_empty(&self) -> bool {
        self.future.is_empty()
    }

    /// Return the next update.
    fn get(&self) -> Option<(&C::Time, &C::Time, C::Val<'a>, Diff)> {
        self.future.last().map(|(t, m, v, r)| {
            let (value, _) = self.edits.values[*v];
            (t, m, value, *r)
        })
    }

    /// Return the time of the next update.
    fn get_time(&self) -> Option<&C::Time> {
        self.future.last().map(|(t, _, _, _)| t)
    }

    /// Populate `future` with the updates stored in `edits`.
    fn replay(&mut self) {
        self.future.clear();
        self.past.clear();

        let values = &self.edits.values;
        let edits = &self.edits.edits;
        for (idx, (_, end)) in values.iter().enumerate() {
            let start = if idx == 0 { 0 } else { values[idx - 1].1 };
            for edit_idx in start..*end {
                let (time, diff) = &edits[edit_idx];
                self.future.push((time.clone(), time.clone(), idx, *diff));
            }
        }

        // Sort in descending order, so the entry with the least time sits at the end of
        // `future`, where `get` and `step` expect it.
        self.future.sort_by(|x, y| y.cmp(x));

        // Fill in the meet components: each entry's meet is the meet of its own time and the
        // times of all entries at smaller indexes, i.e. those that will be replayed after it.
        for idx in 1..self.future.len() {
            self.future[idx].1 = self.future[idx].1.meet(&self.future[idx - 1].1);
        }
    }

    /// Advance the history by moving the next entry from `future` into `past`.
    fn step(&mut self) {
        let (time, _, value_idx, diff) = self.future.pop().unwrap();
        let (value, _) = self.edits.values[value_idx];
        self.past.push((value, time, diff));
    }

    /// Advance all times in `past` by `meet`.
    fn advance_past_by(&mut self, meet: &C::Time) {
        for (_, time, _) in &mut self.past {
            time.join_assign(meet);
        }
        consolidate_updates(&mut self.past);
    }
}