Skip to main content

mz_compute/render/join/
mz_join_core.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! A fork of DD's `JoinCore::join_core`.
11//!
12//! Currently, compute rendering knows two implementations for linear joins:
13//!
14//!  * Differential's `JoinCore::join_core`
15//!  * A Materialize fork thereof, called `mz_join_core`
16//!
17//! `mz_join_core` exists to solve a responsiveness problem with the DD implementation.
18//! DD's join is only able to yield between keys. When computing a large cross-join or a highly
19//! skewed join, this can result in loss of interactivity when the join operator refuses to yield
20//! control for multiple seconds or longer, which in turn causes degraded user experience.
21//! `mz_join_core` resolves the loss-of-interactivity issue by also yielding within keys.
22//!
23//! For the moment, we keep both implementations around, selectable through feature flags.
24//! Eventually, we hope that `mz_join_core` proves itself sufficiently to become the only join
25//! implementation.
26
27use std::cell::Cell;
28use std::cell::RefCell;
29use std::cmp::Ordering;
30use std::collections::VecDeque;
31use std::marker::PhantomData;
32use std::pin::Pin;
33use std::rc::Rc;
34use std::time::Instant;
35
36use differential_dataflow::Data;
37use differential_dataflow::consolidation::{consolidate_from, consolidate_updates};
38use differential_dataflow::lattice::Lattice;
39use differential_dataflow::operators::arrange::arrangement::Arranged;
40use differential_dataflow::trace::{BatchReader, Cursor, TraceReader};
41use mz_ore::future::yield_now;
42use mz_repr::Diff;
43use timely::container::{CapacityContainerBuilder, PushInto, SizableContainer};
44use timely::dataflow::Stream;
45use timely::dataflow::channels::pact::Pipeline;
46use timely::dataflow::operators::generic::OutputBuilderSession;
47use timely::dataflow::operators::{Capability, Operator};
48use timely::{Container, PartialOrder};
49use tracing::trace;
50
51/// Joins two arranged collections with the same key type.
52///
53/// Each matching pair of records `(key, val1)` and `(key, val2)` are subjected to the `result` function,
54/// which produces something implementing `IntoIterator`, where the output collection will have an entry for
55/// every value returned by the iterator.
56pub(super) fn mz_join_core<'scope, T, Tr1, Tr2, L, I, YFn, C>(
57    arranged1: Arranged<'scope, Tr1>,
58    arranged2: Arranged<'scope, Tr2>,
59    result: L,
60    yield_fn: YFn,
61) -> Stream<'scope, T, C>
62where
63    T: timely::progress::Timestamp + Lattice,
64    Tr1: TraceReader<Time = T, Diff = Diff> + Clone + 'static,
65    Tr2: for<'a> TraceReader<Key<'a> = Tr1::Key<'a>, Time = T, Diff = Diff> + Clone + 'static,
66    L: FnMut(Tr1::Key<'_>, Tr1::Val<'_>, Tr2::Val<'_>) -> I + 'static,
67    I: IntoIterator<Item: Data> + 'static,
68    YFn: Fn(Instant, usize) -> bool + 'static,
69    C: Container + SizableContainer + PushInto<(I::Item, T, Diff)> + Data,
70{
71    let scope = arranged1.stream.scope();
72    let mut trace1 = arranged1.trace.clone();
73    let mut trace2 = arranged2.trace.clone();
74
75    arranged1.stream.binary_frontier(
76        arranged2.stream,
77        Pipeline,
78        Pipeline,
79        "Join",
80        move |capability, info| {
81            let operator_id = info.global_id;
82
83            // Acquire an activator to reschedule the operator when it has unfinished work.
84            let activator = scope.activator_for(info.address);
85
86            // Our initial invariants are that for each trace, physical compaction is less or equal the trace's upper bound.
87            // These invariants ensure that we can reference observed batch frontiers from `_start_upper` onward, as long as
88            // we maintain our physical compaction capabilities appropriately. These assertions are tested as we load up the
89            // initial work for the two traces, and before the operator is constructed.
90
91            // Acknowledged frontier for each input.
92            // These two are used exclusively to track batch boundaries on which we may want/need to call `cursor_through`.
93            // They will drive our physical compaction of each trace, and we want to maintain at all times that each is beyond
94            // the physical compaction frontier of their corresponding trace.
95            // Should we ever *drop* a trace, these are 1. much harder to maintain correctly, but 2. no longer used.
96            use timely::progress::frontier::Antichain;
97            let mut acknowledged1 = Antichain::from_elem(<T>::minimum());
98            let mut acknowledged2 = Antichain::from_elem(<T>::minimum());
99
100            // deferred work of batches from each input.
101            let result_fn = Rc::new(RefCell::new(result));
102            let mut todo1 = Work::<<Tr1::Batch as BatchReader>::Cursor, Tr2::Cursor, _, _>::new(
103                Rc::clone(&result_fn),
104            );
105            let mut todo2 =
106                Work::<Tr1::Cursor, <Tr2::Batch as BatchReader>::Cursor, _, _>::new(result_fn);
107
108            // We'll unload the initial batches here, to put ourselves in a less non-deterministic state to start.
109            trace1.map_batches(|batch1| {
110                trace!(
111                    operator_id,
112                    input = 1,
113                    lower = ?batch1.lower().elements(),
114                    upper = ?batch1.upper().elements(),
115                    size = batch1.len(),
116                    "pre-loading batch",
117                );
118
119                acknowledged1.clone_from(batch1.upper());
120                // No `todo1` work here, because we haven't accepted anything into `batches2` yet.
121                // It is effectively "empty", because we choose to drain `trace1` before `trace2`.
122                // Once we start streaming batches in, we will need to respond to new batches from
123                // `input1` with logic that would have otherwise been here. Check out the next loop
124                // for the structure.
125            });
126            // At this point, `ack1` should exactly equal `trace1.read_upper()`, as they are both determined by
127            // iterating through batches and capturing the upper bound. This is a great moment to assert that
128            // `trace1`'s physical compaction frontier is before the frontier of completed times in `trace1`.
129            // TODO: in the case that this does not hold, instead "upgrade" the physical compaction frontier.
130            assert!(PartialOrder::less_equal(
131                &trace1.get_physical_compaction(),
132                &acknowledged1.borrow()
133            ));
134
135            trace!(
136                operator_id,
137                input = 1,
138                acknowledged1 = ?acknowledged1.elements(),
139                "pre-loading finished",
140            );
141
142            // We capture batch2 cursors first and establish work second to avoid taking a `RefCell` lock
143            // on both traces at the same time, as they could be the same trace and this would panic.
144            let mut batch2_cursors = Vec::new();
145            trace2.map_batches(|batch2| {
146                trace!(
147                    operator_id,
148                    input = 2,
149                    lower = ?batch2.lower().elements(),
150                    upper = ?batch2.upper().elements(),
151                    size = batch2.len(),
152                    "pre-loading batch",
153                );
154
155                acknowledged2.clone_from(batch2.upper());
156                batch2_cursors.push((batch2.cursor(), batch2.clone()));
157            });
158            // At this point, `ack2` should exactly equal `trace2.read_upper()`, as they are both determined by
159            // iterating through batches and capturing the upper bound. This is a great moment to assert that
160            // `trace2`'s physical compaction frontier is before the frontier of completed times in `trace2`.
161            // TODO: in the case that this does not hold, instead "upgrade" the physical compaction frontier.
162            assert!(PartialOrder::less_equal(
163                &trace2.get_physical_compaction(),
164                &acknowledged2.borrow()
165            ));
166
167            // Load up deferred work using trace2 cursors and batches captured just above.
168            for (batch2_cursor, batch2) in batch2_cursors.into_iter() {
169                trace!(
170                    operator_id,
171                    input = 2,
172                    acknowledged1 = ?acknowledged1.elements(),
173                    "deferring work for batch",
174                );
175
176                // It is safe to ask for `ack1` because we have confirmed it to be in advance of `distinguish_since`.
177                let (trace1_cursor, trace1_storage) =
178                    trace1.cursor_through(acknowledged1.borrow()).unwrap();
179                // We could downgrade the capability here, but doing so is a bit complicated mathematically.
180                // TODO: downgrade the capability by searching out the one time in `batch2.lower()` and not
181                // in `batch2.upper()`. Only necessary for non-empty batches, as empty batches may not have
182                // that property.
183                todo2.push(
184                    trace1_cursor,
185                    trace1_storage,
186                    batch2_cursor,
187                    batch2.clone(),
188                    capability.clone(),
189                );
190            }
191
192            trace!(
193                operator_id,
194                input = 2,
195                acknowledged2 = ?acknowledged2.elements(),
196                "pre-loading finished",
197            );
198
199            // Droppable handles to shared trace data structures.
200            let mut trace1_option = Some(trace1);
201            let mut trace2_option = Some(trace2);
202
203            move |(input1, frontier1), (input2, frontier2), output| {
204                // 1. Consuming input.
205                //
206                // The join computation repeatedly accepts batches of updates from each of its inputs.
207                //
208                // For each accepted batch, it prepares a work-item to join the batch against previously "accepted"
209                // updates from its other input. It is important to track which updates have been accepted, because
210                // we use a shared trace and there may be updates present that are in advance of this accepted bound.
211                //
212                // Batches are accepted: 1. in bulk at start-up (above), 2. as we observe them in the input stream,
213                // and 3. if the trace can confirm a region of empty space directly following our accepted bound.
214                // This last case is a consequence of our inability to transmit empty batches, as they may be formed
215                // in the absence of timely dataflow capabilities.
216
217                // Drain input 1, prepare work.
218                input1.for_each(|capability, data| {
219                    let trace2 = trace2_option
220                        .as_mut()
221                        .expect("we only drop a trace in response to the other input emptying");
222                    let capability = capability.retain(0);
223                    for batch1 in data.drain(..) {
224                        // Ignore any pre-loaded data.
225                        if PartialOrder::less_equal(&acknowledged1, batch1.lower()) {
226                            trace!(
227                                operator_id,
228                                input = 1,
229                                lower = ?batch1.lower().elements(),
230                                upper = ?batch1.upper().elements(),
231                                size = batch1.len(),
232                                "loading batch",
233                            );
234
235                            if !batch1.is_empty() {
236                                trace!(
237                                    operator_id,
238                                    input = 1,
239                                    acknowledged2 = ?acknowledged2.elements(),
240                                    "deferring work for batch",
241                                );
242
243                                // It is safe to ask for `ack2` as we validated that it was at least `get_physical_compaction()`
244                                // at start-up, and have held back physical compaction ever since.
245                                let (trace2_cursor, trace2_storage) =
246                                    trace2.cursor_through(acknowledged2.borrow()).unwrap();
247                                let batch1_cursor = batch1.cursor();
248                                todo1.push(
249                                    batch1_cursor,
250                                    batch1.clone(),
251                                    trace2_cursor,
252                                    trace2_storage,
253                                    capability.clone(),
254                                );
255                            }
256
257                            // To update `acknowledged1` we might presume that `batch1.lower` should equal it, but we
258                            // may have skipped over empty batches. Still, the batches are in-order, and we should be
259                            // able to just assume the most recent `batch1.upper`
260                            debug_assert!(PartialOrder::less_equal(&acknowledged1, batch1.upper()));
261                            acknowledged1.clone_from(batch1.upper());
262
263                            trace!(
264                                operator_id,
265                                input = 1,
266                                acknowledged1 = ?acknowledged1.elements(),
267                                "batch acknowledged",
268                            );
269                        }
270                    }
271                });
272
273                // Drain input 2, prepare work.
274                input2.for_each(|capability, data| {
275                    let trace1 = trace1_option
276                        .as_mut()
277                        .expect("we only drop a trace in response to the other input emptying");
278                    let capability = capability.retain(0);
279                    for batch2 in data.drain(..) {
280                        // Ignore any pre-loaded data.
281                        if PartialOrder::less_equal(&acknowledged2, batch2.lower()) {
282                            trace!(
283                                operator_id,
284                                input = 2,
285                                lower = ?batch2.lower().elements(),
286                                upper = ?batch2.upper().elements(),
287                                size = batch2.len(),
288                                "loading batch",
289                            );
290
291                            if !batch2.is_empty() {
292                                trace!(
293                                    operator_id,
294                                    input = 2,
295                                    acknowledged1 = ?acknowledged1.elements(),
296                                    "deferring work for batch",
297                                );
298
299                                // It is safe to ask for `ack1` as we validated that it was at least `get_physical_compaction()`
300                                // at start-up, and have held back physical compaction ever since.
301                                let (trace1_cursor, trace1_storage) =
302                                    trace1.cursor_through(acknowledged1.borrow()).unwrap();
303                                let batch2_cursor = batch2.cursor();
304                                todo2.push(
305                                    trace1_cursor,
306                                    trace1_storage,
307                                    batch2_cursor,
308                                    batch2.clone(),
309                                    capability.clone(),
310                                );
311                            }
312
313                            // To update `acknowledged2` we might presume that `batch2.lower` should equal it, but we
314                            // may have skipped over empty batches. Still, the batches are in-order, and we should be
315                            // able to just assume the most recent `batch2.upper`
316                            debug_assert!(PartialOrder::less_equal(&acknowledged2, batch2.upper()));
317                            acknowledged2.clone_from(batch2.upper());
318
319                            trace!(
320                                operator_id,
321                                input = 2,
322                                acknowledged2 = ?acknowledged2.elements(),
323                                "batch acknowledged",
324                            );
325                        }
326                    }
327                });
328
329                // Advance acknowledged frontiers through any empty regions that we may not receive as batches.
330                if let Some(trace1) = trace1_option.as_mut() {
331                    trace!(
332                        operator_id,
333                        input = 1,
334                        acknowledged1 = ?acknowledged1.elements(),
335                        "advancing trace upper",
336                    );
337                    trace1.advance_upper(&mut acknowledged1);
338                }
339                if let Some(trace2) = trace2_option.as_mut() {
340                    trace!(
341                        operator_id,
342                        input = 2,
343                        acknowledged2 = ?acknowledged2.elements(),
344                        "advancing trace upper",
345                    );
346                    trace2.advance_upper(&mut acknowledged2);
347                }
348
349                // 2. Join computation.
350                //
351                // For each of the inputs, we do some amount of work (measured in terms of number
352                // of output records produced). This is meant to yield control to allow downstream
353                // operators to consume and reduce the output, but it it also means to provide some
354                // degree of responsiveness. There is a potential risk here that if we fall behind
355                // then the increasing queues hold back physical compaction of the underlying traces
356                // which results in unintentionally quadratic processing time (each batch of either
357                // input must scan all batches from the other input).
358
359                // Perform some amount of outstanding work for input 1.
360                trace!(
361                    operator_id,
362                    input = 1,
363                    work_left = todo1.remaining(),
364                    "starting work"
365                );
366                todo1.process(output, &yield_fn);
367                trace!(
368                    operator_id,
369                    input = 1,
370                    work_left = todo1.remaining(),
371                    "ceasing work",
372                );
373
374                // Perform some amount of outstanding work for input 2.
375                trace!(
376                    operator_id,
377                    input = 2,
378                    work_left = todo2.remaining(),
379                    "starting work"
380                );
381                todo2.process(output, &yield_fn);
382                trace!(
383                    operator_id,
384                    input = 2,
385                    work_left = todo2.remaining(),
386                    "ceasing work",
387                );
388
389                // Re-activate operator if work remains.
390                if !todo1.is_empty() || !todo2.is_empty() {
391                    activator.activate();
392                }
393
394                // 3. Trace maintenance.
395                //
396                // Importantly, we use `input.frontier()` here rather than `acknowledged` to track
397                // the progress of an input, because should we ever drop one of the traces we will
398                // lose the ability to extract information from anything other than the input.
399                // For example, if we dropped `trace2` we would not be able to use `advance_upper`
400                // to keep `acknowledged2` up to date wrt empty batches, and would hold back logical
401                // compaction of `trace1`.
402
403                // Maintain `trace1`. Drop if `input2` is empty, or advance based on future needs.
404                if let Some(trace1) = trace1_option.as_mut() {
405                    if frontier2.is_empty() {
406                        trace!(operator_id, input = 1, "dropping trace handle");
407                        trace1_option = None;
408                    } else {
409                        trace!(
410                            operator_id,
411                            input = 1,
412                            logical = ?*frontier2.frontier(),
413                            physical = ?acknowledged1.elements(),
414                            "advancing trace compaction",
415                        );
416
417                        // Allow `trace1` to compact logically up to the frontier we may yet receive,
418                        // in the opposing input (`input2`). All `input2` times will be beyond this
419                        // frontier, and joined times only need to be accurate when advanced to it.
420                        trace1.set_logical_compaction(frontier2.frontier());
421                        // Allow `trace1` to compact physically up to the upper bound of batches we
422                        // have received in its input (`input1`). We will not require a cursor that
423                        // is not beyond this bound.
424                        trace1.set_physical_compaction(acknowledged1.borrow());
425                    }
426                }
427
428                // Maintain `trace2`. Drop if `input1` is empty, or advance based on future needs.
429                if let Some(trace2) = trace2_option.as_mut() {
430                    if frontier1.is_empty() {
431                        trace!(operator_id, input = 2, "dropping trace handle");
432                        trace2_option = None;
433                    } else {
434                        trace!(
435                            operator_id,
436                            input = 2,
437                            logical = ?*frontier1.frontier(),
438                            physical = ?acknowledged2.elements(),
439                            "advancing trace compaction",
440                        );
441
442                        // Allow `trace2` to compact logically up to the frontier we may yet receive,
443                        // in the opposing input (`input1`). All `input1` times will be beyond this
444                        // frontier, and joined times only need to be accurate when advanced to it.
445                        trace2.set_logical_compaction(frontier1.frontier());
446                        // Allow `trace2` to compact physically up to the upper bound of batches we
447                        // have received in its input (`input2`). We will not require a cursor that
448                        // is not beyond this bound.
449                        trace2.set_physical_compaction(acknowledged2.borrow());
450                    }
451                }
452            }
453        },
454    )
455}
456
457/// Work collected by the join operator.
458///
459/// The join operator enqueues new work here first, and then processes it at a controlled rate,
460/// potentially yielding control to the Timely runtime in between. This allows it to avoid OOMs,
461/// caused by buffering massive amounts of data at the output, and loss of interactivity.
462///
463/// Collected work can be reduced by calling the `process` method.
464struct Work<C1, C2, D, L>
465where
466    C1: Cursor,
467    C2: Cursor,
468{
469    /// Pending work.
470    todo: VecDeque<(Pin<Box<dyn Future<Output = ()>>>, Capability<C1::Time>)>,
471    /// A function that transforms raw join matches into join results.
472    result_fn: Rc<RefCell<L>>,
473    /// A buffer holding the join results.
474    ///
475    /// Written by the work futures, drained by `Work::process`.
476    output: Rc<RefCell<Vec<(D, C1::Time, Diff)>>>,
477    /// The number of join results produced by work futures.
478    ///
479    /// Used with `yield_fn` to inform when `Work::process` should yield.
480    produced: Rc<Cell<usize>>,
481
482    _cursors: PhantomData<(C1, C2)>,
483}
484
485impl<C1, C2, D, L, I> Work<C1, C2, D, L>
486where
487    C1: Cursor<Diff = Diff> + 'static,
488    C2: for<'a> Cursor<Key<'a> = C1::Key<'a>, Time = C1::Time, Diff = Diff> + 'static,
489    D: Data,
490    L: FnMut(C1::Key<'_>, C1::Val<'_>, C2::Val<'_>) -> I + 'static,
491    I: IntoIterator<Item = D> + 'static,
492{
493    fn new(result_fn: Rc<RefCell<L>>) -> Self {
494        Self {
495            todo: Default::default(),
496            result_fn,
497            output: Default::default(),
498            produced: Default::default(),
499            _cursors: PhantomData,
500        }
501    }
502
503    /// Return the amount of remaining work chunks.
504    fn remaining(&self) -> usize {
505        self.todo.len()
506    }
507
508    /// Return whether there is any work pending.
509    fn is_empty(&self) -> bool {
510        self.remaining() == 0
511    }
512
513    /// Append some pending work.
514    fn push(
515        &mut self,
516        cursor1: C1,
517        storage1: C1::Storage,
518        cursor2: C2,
519        storage2: C2::Storage,
520        capability: Capability<C1::Time>,
521    ) {
522        let fut = self.start_work(
523            cursor1,
524            storage1,
525            cursor2,
526            storage2,
527            capability.time().clone(),
528        );
529
530        self.todo.push_back((Box::pin(fut), capability));
531    }
532
533    /// Process pending work until none is remaining or `yield_fn` requests a yield.
534    fn process<C, YFn>(
535        &mut self,
536        output: &mut OutputBuilderSession<'_, C1::Time, CapacityContainerBuilder<C>>,
537        yield_fn: YFn,
538    ) where
539        C: Container + SizableContainer + PushInto<(D, C1::Time, Diff)> + Data,
540        YFn: Fn(Instant, usize) -> bool,
541    {
542        let start_time = Instant::now();
543        self.produced.set(0);
544
545        let waker = futures::task::noop_waker();
546        let mut ctx = std::task::Context::from_waker(&waker);
547
548        while let Some((mut fut, cap)) = self.todo.pop_front() {
549            // Drive the work future until it's done or it's time to yield.
550            let mut done = false;
551            let mut should_yield = false;
552            while !done && !should_yield {
553                done = fut.as_mut().poll(&mut ctx).is_ready();
554                should_yield = yield_fn(start_time, self.produced.get());
555            }
556
557            // Drain the produced join results.
558            let mut output_buf = self.output.borrow_mut();
559
560            // Consolidating here is important when the join closure produces data that
561            // consolidates well, for example when projecting columns.
562            let old_len = output_buf.len();
563            consolidate_updates(&mut output_buf);
564            let recovered = old_len - output_buf.len();
565            self.produced.update(|x| x - recovered);
566
567            output.session(&cap).give_iterator(output_buf.drain(..));
568
569            if done {
570                // We have finished processing a chunk of work. Use this opportunity to truncate
571                // the output buffer, so we don't keep excess memory allocated forever.
572                *output_buf = Default::default();
573            } else if !done {
574                // Still work to do in this chunk.
575                self.todo.push_front((fut, cap));
576            }
577
578            if should_yield {
579                break;
580            }
581        }
582    }
583
584    /// Start the work of joining the updates produced by the given cursors.
585    ///
586    /// This method returns a `Future` that can be polled to make progress on the join work.
587    /// Returning a future allows us to implement the logic using async/await syntax where we can
588    /// conveniently pause the work at any point by calling `yield_now().await`. We are allowed to
589    /// hold references across yield points, which is something we wouldn't get with a hand-rolled
590    /// state machine implementation.
591    fn start_work(
592        &self,
593        mut cursor1: C1,
594        storage1: C1::Storage,
595        mut cursor2: C2,
596        storage2: C2::Storage,
597        meet: C1::Time,
598    ) -> impl Future<Output = ()> + use<C1, C2, D, L, I> {
599        let result_fn = Rc::clone(&self.result_fn);
600        let output = Rc::clone(&self.output);
601        let produced = Rc::clone(&self.produced);
602
603        async move {
604            let mut joiner = Joiner::new(result_fn, output, produced, meet);
605
606            while let Some(key1) = cursor1.get_key(&storage1)
607                && let Some(key2) = cursor2.get_key(&storage2)
608            {
609                match key1.cmp(&key2) {
610                    Ordering::Less => cursor1.seek_key(&storage1, key2),
611                    Ordering::Greater => cursor2.seek_key(&storage2, key1),
612                    Ordering::Equal => {
613                        joiner
614                            .join_key(key1, &mut cursor1, &storage1, &mut cursor2, &storage2)
615                            .await;
616
617                        cursor1.step_key(&storage1);
618                        cursor2.step_key(&storage2);
619                    }
620                }
621            }
622        }
623    }
624}
625
626/// Type that knows how to perform the core join logic.
627///
628/// The joiner implements two join strategies:
629///
630///  * The "simple" strategy produces a match for each combination of (val1, time1, val2, time2)
631///    found in the inputs. If there are multiple times in the input, it may produce matches for
632///    times in which one of the values wasn't present. These matches cancel each other out, so the
633///    result ends up correct.
634///  * The "linear scan over times" strategy sorts the input data by time and then steps through
635///    the input histories, producing matches for a pair of values only if both values where
636///    present at the same time.
637///
638/// The linear scan strategy avoids redundant work and is much more efficient than the simple
639/// strategy when many distinct times are present in the inputs. However, sorting the input data
640/// incurs some overhead, so we still prefer the simple variant when the input data is small.
641struct Joiner<'a, C1, C2, D, L>
642where
643    C1: Cursor,
644    C2: Cursor,
645{
646    /// A function that transforms raw join matches into join results.
647    result_fn: Rc<RefCell<L>>,
648    /// A buffer holding the join results.
649    output: Rc<RefCell<Vec<(D, C1::Time, Diff)>>>,
650    /// The number of join results produced.
651    produced: Rc<Cell<usize>>,
652    /// A time to which all join results should be advanced.
653    meet: C1::Time,
654
655    /// Buffer for edit histories from the first input.
656    history1: ValueHistory<'a, C1>,
657    /// Buffer for edit histories from the second input.
658    history2: ValueHistory<'a, C2>,
659}
660
661impl<'a, C1, C2, D, L, I> Joiner<'a, C1, C2, D, L>
662where
663    C1: Cursor<Diff = Diff>,
664    C2: Cursor<Key<'a> = C1::Key<'a>, Time = C1::Time, Diff = Diff>,
665    D: Data,
666    L: FnMut(C1::Key<'_>, C1::Val<'_>, C2::Val<'_>) -> I + 'static,
667    I: IntoIterator<Item = D> + 'static,
668{
669    fn new(
670        result_fn: Rc<RefCell<L>>,
671        output: Rc<RefCell<Vec<(D, C1::Time, Diff)>>>,
672        produced: Rc<Cell<usize>>,
673        meet: C1::Time,
674    ) -> Self {
675        Self {
676            result_fn,
677            output,
678            produced,
679            meet,
680            history1: ValueHistory::new(),
681            history2: ValueHistory::new(),
682        }
683    }
684
685    /// Produce matches for the values of a single key.
686    async fn join_key(
687        &mut self,
688        key: C1::Key<'_>,
689        cursor1: &mut C1,
690        storage1: &'a C1::Storage,
691        cursor2: &mut C2,
692        storage2: &'a C2::Storage,
693    ) {
694        self.history1.edits.load(cursor1, storage1, &self.meet);
695        self.history2.edits.load(cursor2, storage2, &self.meet);
696
697        // If the input data is small, use the simple strategy.
698        //
699        // TODO: This conditional is taken directly from DD. We should check if it might make sense
700        //       to do something different, like using the simple strategy always when the number
701        //       of distinct times is small.
702        if self.history1.edits.len() < 10 || self.history2.edits.len() < 10 {
703            self.join_key_simple(key);
704            yield_now().await;
705        } else {
706            self.join_key_linear_time_scan(key).await;
707        }
708    }
709
710    /// Produce matches for the values of a single key, using the simple strategy.
711    ///
712    /// This strategy is only meant to be used for small inputs, so we don't bother including yield
713    /// points or optimizations.
714    fn join_key_simple(&self, key: C1::Key<'_>) {
715        let mut result_fn = self.result_fn.borrow_mut();
716        let mut output = self.output.borrow_mut();
717
718        for (v1, t1, r1) in self.history1.edits.iter() {
719            for (v2, t2, r2) in self.history2.edits.iter() {
720                for data in result_fn(key, v1, v2) {
721                    output.push((data, t1.join(t2), r1 * r2));
722                    self.produced.update(|x| x + 1);
723                }
724            }
725        }
726    }
727
728    /// Produce matches for the values of a single key, using a linear scan through times.
729    async fn join_key_linear_time_scan(&mut self, key: C1::Key<'_>) {
730        let history1 = &mut self.history1;
731        let history2 = &mut self.history2;
732
733        history1.replay();
734        history2.replay();
735
736        // TODO: It seems like there is probably a good deal of redundant `advance_buffer_by`
737        //       in here. If a time is ever repeated, for example, the call will be identical
738        //       and accomplish nothing. If only a single record has been added, it may not
739        //       be worth the time to collapse (advance, re-sort) the data when a linear scan
740        //       is sufficient.
741
742        // Join the next entry in `history1`.
743        let work_history1 = |history1: &mut ValueHistory<C1>, history2: &mut ValueHistory<C2>| {
744            let mut result_fn = self.result_fn.borrow_mut();
745            let mut output = self.output.borrow_mut();
746
747            let (t1, meet, v1, r1) = history1.get().unwrap();
748            history2.advance_past_by(meet);
749            for &(v2, ref t2, r2) in &history2.past {
750                for data in result_fn(key, v1, v2) {
751                    output.push((data, t1.join(t2), r1 * r2));
752                    self.produced.update(|x| x + 1);
753                }
754            }
755            history1.step();
756        };
757
758        // Join the next entry in `history2`.
759        let work_history2 = |history1: &mut ValueHistory<C1>, history2: &mut ValueHistory<C2>| {
760            let mut result_fn = self.result_fn.borrow_mut();
761            let mut output = self.output.borrow_mut();
762
763            let (t2, meet, v2, r2) = history2.get().unwrap();
764            history1.advance_past_by(meet);
765            for &(v1, ref t1, r1) in &history1.past {
766                for data in result_fn(key, v1, v2) {
767                    output.push((data, t1.join(t2), r1 * r2));
768                    self.produced.update(|x| x + 1);
769                }
770            }
771            history2.step();
772        };
773
774        while let Some(time1) = history1.get_time()
775            && let Some(time2) = history2.get_time()
776        {
777            if time1 < time2 {
778                work_history1(history1, history2)
779            } else {
780                work_history2(history1, history2)
781            };
782            yield_now().await;
783        }
784
785        while !history1.is_empty() {
786            work_history1(history1, history2);
787            yield_now().await;
788        }
789        while !history2.is_empty() {
790            work_history2(history1, history2);
791            yield_now().await;
792        }
793    }
794}
795
796/// An accumulation of (value, time, diff) updates.
797///
798/// Deduplicated values are stored in `values`. Each entry includes the end index of the
799/// corresponding range in `edits`. The edits stored for a value are consolidated.
800struct EditList<'a, C: Cursor> {
801    values: Vec<(C::Val<'a>, usize)>,
802    edits: Vec<(C::Time, Diff)>,
803}
804
805impl<'a, C> EditList<'a, C>
806where
807    C: Cursor<Diff = Diff>,
808{
809    fn len(&self) -> usize {
810        self.edits.len()
811    }
812
813    /// Load the updates in the given cursor.
814    ///
815    /// Steps over values, but not over keys.
816    fn load(&mut self, cursor: &mut C, storage: &'a C::Storage, meet: &C::Time) {
817        self.values.clear();
818        self.edits.clear();
819
820        let mut edit_idx = 0;
821        while let Some(value) = cursor.get_val(storage) {
822            cursor.map_times(storage, |time, diff| {
823                let mut time = C::owned_time(time);
824                time.join_assign(meet);
825                self.edits.push((time, C::owned_diff(diff)));
826            });
827
828            consolidate_from(&mut self.edits, edit_idx);
829
830            if self.edits.len() > edit_idx {
831                edit_idx = self.edits.len();
832                self.values.push((value, edit_idx));
833            }
834
835            cursor.step_val(storage);
836        }
837    }
838
839    /// Iterate over the contained updates.
840    fn iter(&self) -> impl Iterator<Item = (C::Val<'a>, &C::Time, Diff)> {
841        self.values
842            .iter()
843            .enumerate()
844            .flat_map(|(idx, (value, end))| {
845                let start = if idx == 0 { 0 } else { self.values[idx - 1].1 };
846                let edits = &self.edits[start..*end];
847                edits.iter().map(|(time, diff)| (*value, time, *diff))
848            })
849    }
850}
851
852/// A history for replaying updates in time order.
853struct ValueHistory<'a, C: Cursor> {
854    /// Unsorted updates to replay.
855    edits: EditList<'a, C>,
856    /// Time-sorted updates that have not been stepped over yet.
857    ///
858    /// Entries are (time, meet, value_idx, diff).
859    future: Vec<(C::Time, C::Time, usize, Diff)>,
860    /// Rolled-up updates that have been stepped over.
861    past: Vec<(C::Val<'a>, C::Time, Diff)>,
862}
863
864impl<'a, C> ValueHistory<'a, C>
865where
866    C: Cursor,
867{
868    /// Create a new empty `ValueHistory`.
869    fn new() -> Self {
870        Self {
871            edits: EditList {
872                values: Default::default(),
873                edits: Default::default(),
874            },
875            future: Default::default(),
876            past: Default::default(),
877        }
878    }
879
880    /// Return whether there are updates left to step over.
881    fn is_empty(&self) -> bool {
882        self.future.is_empty()
883    }
884
885    /// Return the next update.
886    fn get(&self) -> Option<(&C::Time, &C::Time, C::Val<'a>, Diff)> {
887        self.future.last().map(|(t, m, v, r)| {
888            let (value, _) = self.edits.values[*v];
889            (t, m, value, *r)
890        })
891    }
892
893    /// Return the time of the next update.
894    fn get_time(&self) -> Option<&C::Time> {
895        self.future.last().map(|(t, _, _, _)| t)
896    }
897
898    /// Populate `future` with the updates stored in `edits`.
899    fn replay(&mut self) {
900        self.future.clear();
901        self.past.clear();
902
903        let values = &self.edits.values;
904        let edits = &self.edits.edits;
905        for (idx, (_, end)) in values.iter().enumerate() {
906            let start = if idx == 0 { 0 } else { values[idx - 1].1 };
907            for edit_idx in start..*end {
908                let (time, diff) = &edits[edit_idx];
909                self.future.push((time.clone(), time.clone(), idx, *diff));
910            }
911        }
912
913        self.future.sort_by(|x, y| y.cmp(x));
914
915        for idx in 1..self.future.len() {
916            self.future[idx].1 = self.future[idx].1.meet(&self.future[idx - 1].1);
917        }
918    }
919
920    /// Advance the history by moving the next entry from `future` into `past`.
921    fn step(&mut self) {
922        let (time, _, value_idx, diff) = self.future.pop().unwrap();
923        let (value, _) = self.edits.values[value_idx];
924        self.past.push((value, time, diff));
925    }
926
927    /// Advance all times in `past` by `meet`.
928    fn advance_past_by(&mut self, meet: &C::Time) {
929        for (_, time, _) in &mut self.past {
930            time.join_assign(meet);
931        }
932        consolidate_updates(&mut self.past);
933    }
934}