mz_compute/render/join/
mz_join_core.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! A fork of DD's `JoinCore::join_core`.
11//!
12//! Currently, compute rendering knows two implementations for linear joins:
13//!
14//!  * Differential's `JoinCore::join_core`
15//!  * A Materialize fork thereof, called `mz_join_core`
16//!
17//! `mz_join_core` exists to solve a responsiveness problem with the DD implementation.
18//! DD's join is only able to yield between keys. When computing a large cross-join or a highly
19//! skewed join, this can result in loss of interactivity when the join operator refuses to yield
20//! control for multiple seconds or longer, which in turn causes degraded user experience.
21//! `mz_join_core` resolves the loss-of-interactivity issue by also yielding within keys.
22//!
23//! For the moment, we keep both implementations around, selectable through feature flags.
24//! Eventually, we hope that `mz_join_core` proves itself sufficiently to become the only join
25//! implementation.
26
27use std::cell::Cell;
28use std::cell::RefCell;
29use std::cmp::Ordering;
30use std::collections::VecDeque;
31use std::marker::PhantomData;
32use std::pin::Pin;
33use std::rc::Rc;
34use std::time::Instant;
35
36use differential_dataflow::Data;
37use differential_dataflow::consolidation::{consolidate_from, consolidate_updates};
38use differential_dataflow::lattice::Lattice;
39use differential_dataflow::operators::arrange::arrangement::Arranged;
40use differential_dataflow::trace::{BatchReader, Cursor, TraceReader};
41use mz_ore::future::yield_now;
42use mz_repr::Diff;
43use timely::container::{CapacityContainerBuilder, PushInto, SizableContainer};
44use timely::dataflow::channels::pact::Pipeline;
45use timely::dataflow::channels::pushers::Tee;
46use timely::dataflow::operators::generic::OutputHandleCore;
47use timely::dataflow::operators::{Capability, Operator};
48use timely::dataflow::{Scope, StreamCore};
49use timely::progress::timestamp::Timestamp;
50use timely::{Container, PartialOrder};
51use tracing::trace;
52
53use crate::render::context::ShutdownProbe;
54
55/// Joins two arranged collections with the same key type.
56///
57/// Each matching pair of records `(key, val1)` and `(key, val2)` are subjected to the `result` function,
58/// which produces something implementing `IntoIterator`, where the output collection will have an entry for
59/// every value returned by the iterator.
60pub(super) fn mz_join_core<G, Tr1, Tr2, L, I, YFn, C>(
61    arranged1: &Arranged<G, Tr1>,
62    arranged2: &Arranged<G, Tr2>,
63    shutdown_probe: ShutdownProbe,
64    result: L,
65    yield_fn: YFn,
66) -> StreamCore<G, C>
67where
68    G: Scope,
69    G::Timestamp: Lattice,
70    Tr1: TraceReader<Time = G::Timestamp, Diff = Diff> + Clone + 'static,
71    Tr2: for<'a> TraceReader<Key<'a> = Tr1::Key<'a>, Time = G::Timestamp, Diff = Diff>
72        + Clone
73        + 'static,
74    L: FnMut(Tr1::Key<'_>, Tr1::Val<'_>, Tr2::Val<'_>) -> I + 'static,
75    I: IntoIterator<Item: Data> + 'static,
76    YFn: Fn(Instant, usize) -> bool + 'static,
77    C: Container + SizableContainer + PushInto<(I::Item, G::Timestamp, Diff)> + Data,
78{
79    let mut trace1 = arranged1.trace.clone();
80    let mut trace2 = arranged2.trace.clone();
81
82    arranged1.stream.binary_frontier(
83        &arranged2.stream,
84        Pipeline,
85        Pipeline,
86        "Join",
87        move |capability, info| {
88            let operator_id = info.global_id;
89
90            // Acquire an activator to reschedule the operator when it has unfinished work.
91            let activator = arranged1.stream.scope().activator_for(info.address);
92
93            // Our initial invariants are that for each trace, physical compaction is less or equal the trace's upper bound.
94            // These invariants ensure that we can reference observed batch frontiers from `_start_upper` onward, as long as
95            // we maintain our physical compaction capabilities appropriately. These assertions are tested as we load up the
96            // initial work for the two traces, and before the operator is constructed.
97
98            // Acknowledged frontier for each input.
99            // These two are used exclusively to track batch boundaries on which we may want/need to call `cursor_through`.
100            // They will drive our physical compaction of each trace, and we want to maintain at all times that each is beyond
101            // the physical compaction frontier of their corresponding trace.
102            // Should we ever *drop* a trace, these are 1. much harder to maintain correctly, but 2. no longer used.
103            use timely::progress::frontier::Antichain;
104            let mut acknowledged1 = Antichain::from_elem(<G::Timestamp>::minimum());
105            let mut acknowledged2 = Antichain::from_elem(<G::Timestamp>::minimum());
106
107            // deferred work of batches from each input.
108            let result_fn = Rc::new(RefCell::new(result));
109            let mut todo1 = Work::<<Tr1::Batch as BatchReader>::Cursor, Tr2::Cursor, _, _>::new(
110                Rc::clone(&result_fn),
111            );
112            let mut todo2 =
113                Work::<Tr1::Cursor, <Tr2::Batch as BatchReader>::Cursor, _, _>::new(result_fn);
114
115            // We'll unload the initial batches here, to put ourselves in a less non-deterministic state to start.
116            trace1.map_batches(|batch1| {
117                trace!(
118                    operator_id,
119                    input = 1,
120                    lower = ?batch1.lower().elements(),
121                    upper = ?batch1.upper().elements(),
122                    size = batch1.len(),
123                    "pre-loading batch",
124                );
125
126                acknowledged1.clone_from(batch1.upper());
127                // No `todo1` work here, because we haven't accepted anything into `batches2` yet.
128                // It is effectively "empty", because we choose to drain `trace1` before `trace2`.
129                // Once we start streaming batches in, we will need to respond to new batches from
130                // `input1` with logic that would have otherwise been here. Check out the next loop
131                // for the structure.
132            });
133            // At this point, `ack1` should exactly equal `trace1.read_upper()`, as they are both determined by
134            // iterating through batches and capturing the upper bound. This is a great moment to assert that
135            // `trace1`'s physical compaction frontier is before the frontier of completed times in `trace1`.
136            // TODO: in the case that this does not hold, instead "upgrade" the physical compaction frontier.
137            assert!(PartialOrder::less_equal(
138                &trace1.get_physical_compaction(),
139                &acknowledged1.borrow()
140            ));
141
142            trace!(
143                operator_id,
144                input = 1,
145                acknowledged1 = ?acknowledged1.elements(),
146                "pre-loading finished",
147            );
148
149            // We capture batch2 cursors first and establish work second to avoid taking a `RefCell` lock
150            // on both traces at the same time, as they could be the same trace and this would panic.
151            let mut batch2_cursors = Vec::new();
152            trace2.map_batches(|batch2| {
153                trace!(
154                    operator_id,
155                    input = 2,
156                    lower = ?batch2.lower().elements(),
157                    upper = ?batch2.upper().elements(),
158                    size = batch2.len(),
159                    "pre-loading batch",
160                );
161
162                acknowledged2.clone_from(batch2.upper());
163                batch2_cursors.push((batch2.cursor(), batch2.clone()));
164            });
165            // At this point, `ack2` should exactly equal `trace2.read_upper()`, as they are both determined by
166            // iterating through batches and capturing the upper bound. This is a great moment to assert that
167            // `trace2`'s physical compaction frontier is before the frontier of completed times in `trace2`.
168            // TODO: in the case that this does not hold, instead "upgrade" the physical compaction frontier.
169            assert!(PartialOrder::less_equal(
170                &trace2.get_physical_compaction(),
171                &acknowledged2.borrow()
172            ));
173
174            // Load up deferred work using trace2 cursors and batches captured just above.
175            for (batch2_cursor, batch2) in batch2_cursors.into_iter() {
176                trace!(
177                    operator_id,
178                    input = 2,
179                    acknowledged1 = ?acknowledged1.elements(),
180                    "deferring work for batch",
181                );
182
183                // It is safe to ask for `ack1` because we have confirmed it to be in advance of `distinguish_since`.
184                let (trace1_cursor, trace1_storage) =
185                    trace1.cursor_through(acknowledged1.borrow()).unwrap();
186                // We could downgrade the capability here, but doing so is a bit complicated mathematically.
187                // TODO: downgrade the capability by searching out the one time in `batch2.lower()` and not
188                // in `batch2.upper()`. Only necessary for non-empty batches, as empty batches may not have
189                // that property.
190                todo2.push(
191                    trace1_cursor,
192                    trace1_storage,
193                    batch2_cursor,
194                    batch2.clone(),
195                    capability.clone(),
196                );
197            }
198
199            trace!(
200                operator_id,
201                input = 2,
202                acknowledged2 = ?acknowledged2.elements(),
203                "pre-loading finished",
204            );
205
206            // Droppable handles to shared trace data structures.
207            let mut trace1_option = Some(trace1);
208            let mut trace2_option = Some(trace2);
209
210            move |input1, input2, output| {
211                // If the dataflow is shutting down, discard all existing and future work.
212                if shutdown_probe.in_shutdown() {
213                    trace!(operator_id, "shutting down");
214
215                    // Discard data at the inputs.
216                    input1.for_each(|_cap, _data| ());
217                    input2.for_each(|_cap, _data| ());
218
219                    // Discard queued work.
220                    todo1.discard();
221                    todo2.discard();
222
223                    // Stop holding on to input traces.
224                    trace1_option = None;
225                    trace2_option = None;
226
227                    return;
228                }
229
230                // 1. Consuming input.
231                //
232                // The join computation repeatedly accepts batches of updates from each of its inputs.
233                //
234                // For each accepted batch, it prepares a work-item to join the batch against previously "accepted"
235                // updates from its other input. It is important to track which updates have been accepted, because
236                // we use a shared trace and there may be updates present that are in advance of this accepted bound.
237                //
238                // Batches are accepted: 1. in bulk at start-up (above), 2. as we observe them in the input stream,
239                // and 3. if the trace can confirm a region of empty space directly following our accepted bound.
240                // This last case is a consequence of our inability to transmit empty batches, as they may be formed
241                // in the absence of timely dataflow capabilities.
242
243                // Drain input 1, prepare work.
244                input1.for_each(|capability, data| {
245                    let trace2 = trace2_option
246                        .as_mut()
247                        .expect("we only drop a trace in response to the other input emptying");
248                    let capability = capability.retain();
249                    for batch1 in data.drain(..) {
250                        // Ignore any pre-loaded data.
251                        if PartialOrder::less_equal(&acknowledged1, batch1.lower()) {
252                            trace!(
253                                operator_id,
254                                input = 1,
255                                lower = ?batch1.lower().elements(),
256                                upper = ?batch1.upper().elements(),
257                                size = batch1.len(),
258                                "loading batch",
259                            );
260
261                            if !batch1.is_empty() {
262                                trace!(
263                                    operator_id,
264                                    input = 1,
265                                    acknowledged2 = ?acknowledged2.elements(),
266                                    "deferring work for batch",
267                                );
268
269                                // It is safe to ask for `ack2` as we validated that it was at least `get_physical_compaction()`
270                                // at start-up, and have held back physical compaction ever since.
271                                let (trace2_cursor, trace2_storage) =
272                                    trace2.cursor_through(acknowledged2.borrow()).unwrap();
273                                let batch1_cursor = batch1.cursor();
274                                todo1.push(
275                                    batch1_cursor,
276                                    batch1.clone(),
277                                    trace2_cursor,
278                                    trace2_storage,
279                                    capability.clone(),
280                                );
281                            }
282
283                            // To update `acknowledged1` we might presume that `batch1.lower` should equal it, but we
284                            // may have skipped over empty batches. Still, the batches are in-order, and we should be
285                            // able to just assume the most recent `batch1.upper`
286                            debug_assert!(PartialOrder::less_equal(&acknowledged1, batch1.upper()));
287                            acknowledged1.clone_from(batch1.upper());
288
289                            trace!(
290                                operator_id,
291                                input = 1,
292                                acknowledged1 = ?acknowledged1.elements(),
293                                "batch acknowledged",
294                            );
295                        }
296                    }
297                });
298
299                // Drain input 2, prepare work.
300                input2.for_each(|capability, data| {
301                    let trace1 = trace1_option
302                        .as_mut()
303                        .expect("we only drop a trace in response to the other input emptying");
304                    let capability = capability.retain();
305                    for batch2 in data.drain(..) {
306                        // Ignore any pre-loaded data.
307                        if PartialOrder::less_equal(&acknowledged2, batch2.lower()) {
308                            trace!(
309                                operator_id,
310                                input = 2,
311                                lower = ?batch2.lower().elements(),
312                                upper = ?batch2.upper().elements(),
313                                size = batch2.len(),
314                                "loading batch",
315                            );
316
317                            if !batch2.is_empty() {
318                                trace!(
319                                    operator_id,
320                                    input = 2,
321                                    acknowledged1 = ?acknowledged1.elements(),
322                                    "deferring work for batch",
323                                );
324
325                                // It is safe to ask for `ack1` as we validated that it was at least `get_physical_compaction()`
326                                // at start-up, and have held back physical compaction ever since.
327                                let (trace1_cursor, trace1_storage) =
328                                    trace1.cursor_through(acknowledged1.borrow()).unwrap();
329                                let batch2_cursor = batch2.cursor();
330                                todo2.push(
331                                    trace1_cursor,
332                                    trace1_storage,
333                                    batch2_cursor,
334                                    batch2.clone(),
335                                    capability.clone(),
336                                );
337                            }
338
339                            // To update `acknowledged2` we might presume that `batch2.lower` should equal it, but we
340                            // may have skipped over empty batches. Still, the batches are in-order, and we should be
341                            // able to just assume the most recent `batch2.upper`
342                            debug_assert!(PartialOrder::less_equal(&acknowledged2, batch2.upper()));
343                            acknowledged2.clone_from(batch2.upper());
344
345                            trace!(
346                                operator_id,
347                                input = 2,
348                                acknowledged2 = ?acknowledged2.elements(),
349                                "batch acknowledged",
350                            );
351                        }
352                    }
353                });
354
355                // Advance acknowledged frontiers through any empty regions that we may not receive as batches.
356                if let Some(trace1) = trace1_option.as_mut() {
357                    trace!(
358                        operator_id,
359                        input = 1,
360                        acknowledged1 = ?acknowledged1.elements(),
361                        "advancing trace upper",
362                    );
363                    trace1.advance_upper(&mut acknowledged1);
364                }
365                if let Some(trace2) = trace2_option.as_mut() {
366                    trace!(
367                        operator_id,
368                        input = 2,
369                        acknowledged2 = ?acknowledged2.elements(),
370                        "advancing trace upper",
371                    );
372                    trace2.advance_upper(&mut acknowledged2);
373                }
374
375                // 2. Join computation.
376                //
377                // For each of the inputs, we do some amount of work (measured in terms of number
378                // of output records produced). This is meant to yield control to allow downstream
379                // operators to consume and reduce the output, but it it also means to provide some
380                // degree of responsiveness. There is a potential risk here that if we fall behind
381                // then the increasing queues hold back physical compaction of the underlying traces
382                // which results in unintentionally quadratic processing time (each batch of either
383                // input must scan all batches from the other input).
384
385                // Perform some amount of outstanding work for input 1.
386                trace!(
387                    operator_id,
388                    input = 1,
389                    work_left = todo1.remaining(),
390                    "starting work"
391                );
392                todo1.process(output, &yield_fn);
393                trace!(
394                    operator_id,
395                    input = 1,
396                    work_left = todo1.remaining(),
397                    "ceasing work",
398                );
399
400                // Perform some amount of outstanding work for input 2.
401                trace!(
402                    operator_id,
403                    input = 2,
404                    work_left = todo2.remaining(),
405                    "starting work"
406                );
407                todo2.process(output, &yield_fn);
408                trace!(
409                    operator_id,
410                    input = 2,
411                    work_left = todo2.remaining(),
412                    "ceasing work",
413                );
414
415                // Re-activate operator if work remains.
416                if !todo1.is_empty() || !todo2.is_empty() {
417                    activator.activate();
418                }
419
420                // 3. Trace maintenance.
421                //
422                // Importantly, we use `input.frontier()` here rather than `acknowledged` to track
423                // the progress of an input, because should we ever drop one of the traces we will
424                // lose the ability to extract information from anything other than the input.
425                // For example, if we dropped `trace2` we would not be able to use `advance_upper`
426                // to keep `acknowledged2` up to date wrt empty batches, and would hold back logical
427                // compaction of `trace1`.
428
429                // Maintain `trace1`. Drop if `input2` is empty, or advance based on future needs.
430                if let Some(trace1) = trace1_option.as_mut() {
431                    if input2.frontier().is_empty() {
432                        trace!(operator_id, input = 1, "dropping trace handle");
433                        trace1_option = None;
434                    } else {
435                        trace!(
436                            operator_id,
437                            input = 1,
438                            logical = ?*input2.frontier().frontier(),
439                            physical = ?acknowledged1.elements(),
440                            "advancing trace compaction",
441                        );
442
443                        // Allow `trace1` to compact logically up to the frontier we may yet receive,
444                        // in the opposing input (`input2`). All `input2` times will be beyond this
445                        // frontier, and joined times only need to be accurate when advanced to it.
446                        trace1.set_logical_compaction(input2.frontier().frontier());
447                        // Allow `trace1` to compact physically up to the upper bound of batches we
448                        // have received in its input (`input1`). We will not require a cursor that
449                        // is not beyond this bound.
450                        trace1.set_physical_compaction(acknowledged1.borrow());
451                    }
452                }
453
454                // Maintain `trace2`. Drop if `input1` is empty, or advance based on future needs.
455                if let Some(trace2) = trace2_option.as_mut() {
456                    if input1.frontier().is_empty() {
457                        trace!(operator_id, input = 2, "dropping trace handle");
458                        trace2_option = None;
459                    } else {
460                        trace!(
461                            operator_id,
462                            input = 2,
463                            logical = ?*input1.frontier().frontier(),
464                            physical = ?acknowledged2.elements(),
465                            "advancing trace compaction",
466                        );
467
468                        // Allow `trace2` to compact logically up to the frontier we may yet receive,
469                        // in the opposing input (`input1`). All `input1` times will be beyond this
470                        // frontier, and joined times only need to be accurate when advanced to it.
471                        trace2.set_logical_compaction(input1.frontier().frontier());
472                        // Allow `trace2` to compact physically up to the upper bound of batches we
473                        // have received in its input (`input2`). We will not require a cursor that
474                        // is not beyond this bound.
475                        trace2.set_physical_compaction(acknowledged2.borrow());
476                    }
477                }
478            }
479        },
480    )
481}
482
483/// Work collected by the join operator.
484///
485/// The join operator enqueues new work here first, and then processes it at a controlled rate,
486/// potentially yielding control to the Timely runtime in between. This allows it to avoid OOMs,
487/// caused by buffering massive amounts of data at the output, and loss of interactivity.
488///
489/// Collected work can be reduced by calling the `process` method.
490struct Work<C1, C2, D, L>
491where
492    C1: Cursor,
493    C2: Cursor,
494{
495    /// Pending work.
496    todo: VecDeque<(Pin<Box<dyn Future<Output = ()>>>, Capability<C1::Time>)>,
497    /// A function that transforms raw join matches into join results.
498    result_fn: Rc<RefCell<L>>,
499    /// A buffer holding the join results.
500    ///
501    /// Written by the work futures, drained by `Work::process`.
502    output: Rc<RefCell<Vec<(D, C1::Time, Diff)>>>,
503    /// The number of join results produced by work futures.
504    ///
505    /// Used with `yield_fn` to inform when `Work::process` should yield.
506    produced: Rc<Cell<usize>>,
507
508    _cursors: PhantomData<(C1, C2)>,
509}
510
511impl<C1, C2, D, L, I> Work<C1, C2, D, L>
512where
513    C1: Cursor<Diff = Diff> + 'static,
514    C2: for<'a> Cursor<Key<'a> = C1::Key<'a>, Time = C1::Time, Diff = Diff> + 'static,
515    D: Data,
516    L: FnMut(C1::Key<'_>, C1::Val<'_>, C2::Val<'_>) -> I + 'static,
517    I: IntoIterator<Item = D> + 'static,
518{
519    fn new(result_fn: Rc<RefCell<L>>) -> Self {
520        Self {
521            todo: Default::default(),
522            result_fn,
523            output: Default::default(),
524            produced: Default::default(),
525            _cursors: PhantomData,
526        }
527    }
528
529    /// Return the amount of remaining work chunks.
530    fn remaining(&self) -> usize {
531        self.todo.len()
532    }
533
534    /// Return whether there is any work pending.
535    fn is_empty(&self) -> bool {
536        self.remaining() == 0
537    }
538
539    /// Append some pending work.
540    fn push(
541        &mut self,
542        cursor1: C1,
543        storage1: C1::Storage,
544        cursor2: C2,
545        storage2: C2::Storage,
546        capability: Capability<C1::Time>,
547    ) {
548        let fut = self.start_work(
549            cursor1,
550            storage1,
551            cursor2,
552            storage2,
553            capability.time().clone(),
554        );
555
556        self.todo.push_back((Box::pin(fut), capability));
557    }
558
559    /// Discard all pending work.
560    fn discard(&mut self) {
561        self.todo = Default::default();
562    }
563
564    /// Process pending work until none is remaining or `yield_fn` requests a yield.
565    fn process<C, YFn>(
566        &mut self,
567        output: &mut OutputHandleCore<C1::Time, CapacityContainerBuilder<C>, Tee<C1::Time, C>>,
568        yield_fn: YFn,
569    ) where
570        C: Container + SizableContainer + PushInto<(D, C1::Time, Diff)> + Data,
571        YFn: Fn(Instant, usize) -> bool,
572    {
573        let start_time = Instant::now();
574        self.produced.set(0);
575
576        let waker = futures::task::noop_waker();
577        let mut ctx = std::task::Context::from_waker(&waker);
578
579        while let Some((mut fut, cap)) = self.todo.pop_front() {
580            // Drive the work future until it's done or it's time to yield.
581            let mut done = false;
582            let mut should_yield = false;
583            while !done && !should_yield {
584                done = fut.as_mut().poll(&mut ctx).is_ready();
585                should_yield = yield_fn(start_time, self.produced.get());
586            }
587
588            // Drain the produced join results.
589            let mut output_buf = self.output.borrow_mut();
590
591            // Consolidating here is important when the join closure produces data that
592            // consolidates well, for example when projecting columns.
593            let old_len = output_buf.len();
594            consolidate_updates(&mut output_buf);
595            let recovered = old_len - output_buf.len();
596            self.produced.update(|x| x - recovered);
597
598            output.session(&cap).give_iterator(output_buf.drain(..));
599
600            if done {
601                // We have finished processing a chunk of work. Use this opportunity to truncate
602                // the output buffer, so we don't keep excess memory allocated forever.
603                *output_buf = Default::default();
604            } else if !done {
605                // Still work to do in this chunk.
606                self.todo.push_front((fut, cap));
607            }
608
609            if should_yield {
610                break;
611            }
612        }
613    }
614
615    /// Start the work of joining the updates produced by the given cursors.
616    ///
617    /// This method returns a `Future` that can be polled to make progress on the join work.
618    /// Returning a future allows us to implement the logic using async/await syntax where we can
619    /// conveniently pause the work at any point by calling `yield_now().await`. We are allowed to
620    /// hold references across yield points, which is something we wouldn't get with a hand-rolled
621    /// state machine implementation.
622    fn start_work(
623        &self,
624        mut cursor1: C1,
625        storage1: C1::Storage,
626        mut cursor2: C2,
627        storage2: C2::Storage,
628        meet: C1::Time,
629    ) -> impl Future<Output = ()> + use<C1, C2, D, L, I> {
630        let result_fn = Rc::clone(&self.result_fn);
631        let output = Rc::clone(&self.output);
632        let produced = Rc::clone(&self.produced);
633
634        async move {
635            let mut joiner = Joiner::new(result_fn, output, produced, meet);
636
637            while let Some(key1) = cursor1.get_key(&storage1)
638                && let Some(key2) = cursor2.get_key(&storage2)
639            {
640                match key1.cmp(&key2) {
641                    Ordering::Less => cursor1.seek_key(&storage1, key2),
642                    Ordering::Greater => cursor2.seek_key(&storage2, key1),
643                    Ordering::Equal => {
644                        joiner
645                            .join_key(key1, &mut cursor1, &storage1, &mut cursor2, &storage2)
646                            .await;
647
648                        cursor1.step_key(&storage1);
649                        cursor2.step_key(&storage2);
650                    }
651                }
652            }
653        }
654    }
655}
656
657/// Type that knows how to perform the core join logic.
658///
659/// The joiner implements two join strategies:
660///
661///  * The "simple" strategy produces a match for each combination of (val1, time1, val2, time2)
662///    found in the inputs. If there are multiple times in the input, it may produce matches for
663///    times in which one of the values wasn't present. These matches cancel each other out, so the
664///    result ends up correct.
665///  * The "linear scan over times" strategy sorts the input data by time and then steps through
666///    the input histories, producing matches for a pair of values only if both values where
667///    present at the same time.
668///
669/// The linear scan strategy avoids redundant work and is much more efficient than the simple
670/// strategy when many distinct times are present in the inputs. However, sorting the input data
671/// incurs some overhead, so we still prefer the simple variant when the input data is small.
672struct Joiner<'a, C1, C2, D, L>
673where
674    C1: Cursor,
675    C2: Cursor,
676{
677    /// A function that transforms raw join matches into join results.
678    result_fn: Rc<RefCell<L>>,
679    /// A buffer holding the join results.
680    output: Rc<RefCell<Vec<(D, C1::Time, Diff)>>>,
681    /// The number of join results produced.
682    produced: Rc<Cell<usize>>,
683    /// A time to which all join results should be advanced.
684    meet: C1::Time,
685
686    /// Buffer for edit histories from the first input.
687    history1: ValueHistory<'a, C1>,
688    /// Buffer for edit histories from the second input.
689    history2: ValueHistory<'a, C2>,
690}
691
692impl<'a, C1, C2, D, L, I> Joiner<'a, C1, C2, D, L>
693where
694    C1: Cursor<Diff = Diff>,
695    C2: Cursor<Key<'a> = C1::Key<'a>, Time = C1::Time, Diff = Diff>,
696    D: Data,
697    L: FnMut(C1::Key<'_>, C1::Val<'_>, C2::Val<'_>) -> I + 'static,
698    I: IntoIterator<Item = D> + 'static,
699{
700    fn new(
701        result_fn: Rc<RefCell<L>>,
702        output: Rc<RefCell<Vec<(D, C1::Time, Diff)>>>,
703        produced: Rc<Cell<usize>>,
704        meet: C1::Time,
705    ) -> Self {
706        Self {
707            result_fn,
708            output,
709            produced,
710            meet,
711            history1: ValueHistory::new(),
712            history2: ValueHistory::new(),
713        }
714    }
715
716    /// Produce matches for the values of a single key.
717    async fn join_key(
718        &mut self,
719        key: C1::Key<'_>,
720        cursor1: &mut C1,
721        storage1: &'a C1::Storage,
722        cursor2: &mut C2,
723        storage2: &'a C2::Storage,
724    ) {
725        self.history1.edits.load(cursor1, storage1, &self.meet);
726        self.history2.edits.load(cursor2, storage2, &self.meet);
727
728        // If the input data is small, use the simple strategy.
729        //
730        // TODO: This conditional is taken directly from DD. We should check if it might make sense
731        //       to do something different, like using the simple strategy always when the number
732        //       of distinct times is small.
733        if self.history1.edits.len() < 10 || self.history2.edits.len() < 10 {
734            self.join_key_simple(key);
735            yield_now().await;
736        } else {
737            self.join_key_linear_time_scan(key).await;
738        }
739    }
740
741    /// Produce matches for the values of a single key, using the simple strategy.
742    ///
743    /// This strategy is only meant to be used for small inputs, so we don't bother including yield
744    /// points or optimizations.
745    fn join_key_simple(&self, key: C1::Key<'_>) {
746        let mut result_fn = self.result_fn.borrow_mut();
747        let mut output = self.output.borrow_mut();
748
749        for (v1, t1, r1) in self.history1.edits.iter() {
750            for (v2, t2, r2) in self.history2.edits.iter() {
751                for data in result_fn(key, v1, v2) {
752                    output.push((data, t1.join(t2), r1 * r2));
753                    self.produced.update(|x| x + 1);
754                }
755            }
756        }
757    }
758
759    /// Produce matches for the values of a single key, using a linear scan through times.
760    async fn join_key_linear_time_scan(&mut self, key: C1::Key<'_>) {
761        let history1 = &mut self.history1;
762        let history2 = &mut self.history2;
763
764        history1.replay();
765        history2.replay();
766
767        // TODO: It seems like there is probably a good deal of redundant `advance_buffer_by`
768        //       in here. If a time is ever repeated, for example, the call will be identical
769        //       and accomplish nothing. If only a single record has been added, it may not
770        //       be worth the time to collapse (advance, re-sort) the data when a linear scan
771        //       is sufficient.
772
773        // Join the next entry in `history1`.
774        let work_history1 = |history1: &mut ValueHistory<C1>, history2: &mut ValueHistory<C2>| {
775            let mut result_fn = self.result_fn.borrow_mut();
776            let mut output = self.output.borrow_mut();
777
778            let (t1, meet, v1, r1) = history1.get().unwrap();
779            history2.advance_past_by(meet);
780            for &(v2, ref t2, r2) in &history2.past {
781                for data in result_fn(key, v1, v2) {
782                    output.push((data, t1.join(t2), r1 * r2));
783                    self.produced.update(|x| x + 1);
784                }
785            }
786            history1.step();
787        };
788
789        // Join the next entry in `history2`.
790        let work_history2 = |history1: &mut ValueHistory<C1>, history2: &mut ValueHistory<C2>| {
791            let mut result_fn = self.result_fn.borrow_mut();
792            let mut output = self.output.borrow_mut();
793
794            let (t2, meet, v2, r2) = history2.get().unwrap();
795            history1.advance_past_by(meet);
796            for &(v1, ref t1, r1) in &history1.past {
797                for data in result_fn(key, v1, v2) {
798                    output.push((data, t1.join(t2), r1 * r2));
799                    self.produced.update(|x| x + 1);
800                }
801            }
802            history2.step();
803        };
804
805        while let Some(time1) = history1.get_time()
806            && let Some(time2) = history2.get_time()
807        {
808            if time1 < time2 {
809                work_history1(history1, history2)
810            } else {
811                work_history2(history1, history2)
812            };
813            yield_now().await;
814        }
815
816        while !history1.is_empty() {
817            work_history1(history1, history2);
818            yield_now().await;
819        }
820        while !history2.is_empty() {
821            work_history2(history1, history2);
822            yield_now().await;
823        }
824    }
825}
826
827/// An accumulation of (value, time, diff) updates.
828///
829/// Deduplicated values are stored in `values`. Each entry includes the end index of the
830/// corresponding range in `edits`. The edits stored for a value are consolidated.
831struct EditList<'a, C: Cursor> {
832    values: Vec<(C::Val<'a>, usize)>,
833    edits: Vec<(C::Time, Diff)>,
834}
835
836impl<'a, C> EditList<'a, C>
837where
838    C: Cursor<Diff = Diff>,
839{
840    fn len(&self) -> usize {
841        self.edits.len()
842    }
843
844    /// Load the updates in the given cursor.
845    ///
846    /// Steps over values, but not over keys.
847    fn load(&mut self, cursor: &mut C, storage: &'a C::Storage, meet: &C::Time) {
848        self.values.clear();
849        self.edits.clear();
850
851        let mut edit_idx = 0;
852        while let Some(value) = cursor.get_val(storage) {
853            cursor.map_times(storage, |time, diff| {
854                let mut time = C::owned_time(time);
855                time.join_assign(meet);
856                self.edits.push((time, C::owned_diff(diff)));
857            });
858
859            consolidate_from(&mut self.edits, edit_idx);
860
861            if self.edits.len() > edit_idx {
862                edit_idx = self.edits.len();
863                self.values.push((value, edit_idx));
864            }
865
866            cursor.step_val(storage);
867        }
868    }
869
870    /// Iterate over the contained updates.
871    fn iter(&self) -> impl Iterator<Item = (C::Val<'a>, &C::Time, Diff)> {
872        self.values
873            .iter()
874            .enumerate()
875            .flat_map(|(idx, (value, end))| {
876                let start = if idx == 0 { 0 } else { self.values[idx - 1].1 };
877                let edits = &self.edits[start..*end];
878                edits.iter().map(|(time, diff)| (*value, time, *diff))
879            })
880    }
881}
882
883/// A history for replaying updates in time order.
884struct ValueHistory<'a, C: Cursor> {
885    /// Unsorted updates to replay.
886    edits: EditList<'a, C>,
887    /// Time-sorted updates that have not been stepped over yet.
888    ///
889    /// Entries are (time, meet, value_idx, diff).
890    future: Vec<(C::Time, C::Time, usize, Diff)>,
891    /// Rolled-up updates that have been stepped over.
892    past: Vec<(C::Val<'a>, C::Time, Diff)>,
893}
894
895impl<'a, C> ValueHistory<'a, C>
896where
897    C: Cursor,
898{
899    /// Create a new empty `ValueHistory`.
900    fn new() -> Self {
901        Self {
902            edits: EditList {
903                values: Default::default(),
904                edits: Default::default(),
905            },
906            future: Default::default(),
907            past: Default::default(),
908        }
909    }
910
911    /// Return whether there are updates left to step over.
912    fn is_empty(&self) -> bool {
913        self.future.is_empty()
914    }
915
916    /// Return the next update.
917    fn get(&self) -> Option<(&C::Time, &C::Time, C::Val<'a>, Diff)> {
918        self.future.last().map(|(t, m, v, r)| {
919            let (value, _) = self.edits.values[*v];
920            (t, m, value, *r)
921        })
922    }
923
924    /// Return the time of the next update.
925    fn get_time(&self) -> Option<&C::Time> {
926        self.future.last().map(|(t, _, _, _)| t)
927    }
928
929    /// Populate `future` with the updates stored in `edits`.
930    fn replay(&mut self) {
931        self.future.clear();
932        self.past.clear();
933
934        let values = &self.edits.values;
935        let edits = &self.edits.edits;
936        for (idx, (_, end)) in values.iter().enumerate() {
937            let start = if idx == 0 { 0 } else { values[idx - 1].1 };
938            for edit_idx in start..*end {
939                let (time, diff) = &edits[edit_idx];
940                self.future.push((time.clone(), time.clone(), idx, *diff));
941            }
942        }
943
944        self.future.sort_by(|x, y| y.cmp(x));
945
946        for idx in 1..self.future.len() {
947            self.future[idx].1 = self.future[idx].1.meet(&self.future[idx - 1].1);
948        }
949    }
950
951    /// Advance the history by moving the next entry from `future` into `past`.
952    fn step(&mut self) {
953        let (time, _, value_idx, diff) = self.future.pop().unwrap();
954        let (value, _) = self.edits.values[value_idx];
955        self.past.push((value, time, diff));
956    }
957
958    /// Advance all times in `past` by `meet`.
959    fn advance_past_by(&mut self, meet: &C::Time) {
960        for (_, time, _) in &mut self.past {
961            time.join_assign(meet);
962        }
963        consolidate_updates(&mut self.past);
964    }
965}