mz_compute/render/join/mz_join_core.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! A fork of DD's `JoinCore::join_core`.
//!
//! Currently, compute rendering knows two implementations for linear joins:
//!
//!  * Differential's `JoinCore::join_core`
//!  * A Materialize fork thereof, called `mz_join_core`
//!
//! `mz_join_core` exists to solve a responsiveness problem with the DD implementation.
//! DD's join is only able to yield between keys. When computing a large cross-join or a highly
//! skewed join, this can result in loss of interactivity when the join operator refuses to yield
//! control for multiple seconds or longer, which in turn causes degraded user experience.
//! `mz_join_core` resolves the loss-of-interactivity issue by also yielding within keys.
//!
//! For the moment, we keep both implementations around, selectable through feature flags.
//! Eventually, we hope that `mz_join_core` proves itself sufficiently to become the only join
//! implementation.

use std::cell::Cell;
use std::cell::RefCell;
use std::cmp::Ordering;
use std::collections::VecDeque;
use std::marker::PhantomData;
use std::pin::Pin;
use std::rc::Rc;
use std::time::Instant;

use differential_dataflow::Data;
use differential_dataflow::consolidation::{consolidate_from, consolidate_updates};
use differential_dataflow::lattice::Lattice;
use differential_dataflow::operators::arrange::arrangement::Arranged;
use differential_dataflow::trace::{BatchReader, Cursor, TraceReader};
use mz_ore::future::yield_now;
use mz_repr::Diff;
use timely::container::{CapacityContainerBuilder, PushInto, SizableContainer};
use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::operators::generic::OutputBuilderSession;
use timely::dataflow::operators::{Capability, Operator};
use timely::dataflow::{Scope, Stream};
use timely::progress::timestamp::Timestamp;
use timely::{Container, PartialOrder};
use tracing::trace;

/// Joins two arranged collections with the same key type.
///
/// Each matching pair of records `(key, val1)` and `(key, val2)` is subjected to the `result` function,
/// which produces something implementing `IntoIterator`; the output collection will have an entry for
/// every value returned by the iterator.
pub(super) fn mz_join_core<G, Tr1, Tr2, L, I, YFn, C>(
    arranged1: Arranged<G, Tr1>,
    arranged2: Arranged<G, Tr2>,
    result: L,
    yield_fn: YFn,
) -> Stream<G, C>
where
    G: Scope,
    G::Timestamp: Lattice,
    Tr1: TraceReader<Time = G::Timestamp, Diff = Diff> + Clone + 'static,
    Tr2: for<'a> TraceReader<Key<'a> = Tr1::Key<'a>, Time = G::Timestamp, Diff = Diff>
        + Clone
        + 'static,
    L: FnMut(Tr1::Key<'_>, Tr1::Val<'_>, Tr2::Val<'_>) -> I + 'static,
    I: IntoIterator<Item: Data> + 'static,
    YFn: Fn(Instant, usize) -> bool + 'static,
    C: Container + SizableContainer + PushInto<(I::Item, G::Timestamp, Diff)> + Data,
{
    let scope = arranged1.stream.scope();
    let mut trace1 = arranged1.trace.clone();
    let mut trace2 = arranged2.trace.clone();

    arranged1.stream.binary_frontier(
        arranged2.stream,
        Pipeline,
        Pipeline,
        "Join",
        move |capability, info| {
            let operator_id = info.global_id;

            // Acquire an activator to reschedule the operator when it has unfinished work.
            let activator = scope.activator_for(info.address);

            // Our initial invariant is that for each trace, physical compaction is less than or equal to the trace's
            // upper bound. This invariant ensures that we can reference observed batch frontiers from the initial
            // acknowledged frontiers onward, as long as we maintain our physical compaction capabilities appropriately.
            // These assertions are tested as we load up the initial work for the two traces, and before the operator
            // is constructed.

            // Acknowledged frontier for each input.
            // These two are used exclusively to track batch boundaries on which we may want/need to call `cursor_through`.
            // They will drive our physical compaction of each trace, and we want to maintain at all times that each is beyond
            // the physical compaction frontier of its corresponding trace.
            // Should we ever *drop* a trace, these are 1. much harder to maintain correctly, but 2. no longer used.
            use timely::progress::frontier::Antichain;
            let mut acknowledged1 = Antichain::from_elem(<G::Timestamp>::minimum());
            let mut acknowledged2 = Antichain::from_elem(<G::Timestamp>::minimum());

            // Deferred work of batches from each input.
            let result_fn = Rc::new(RefCell::new(result));
            let mut todo1 = Work::<<Tr1::Batch as BatchReader>::Cursor, Tr2::Cursor, _, _>::new(
                Rc::clone(&result_fn),
            );
            let mut todo2 =
                Work::<Tr1::Cursor, <Tr2::Batch as BatchReader>::Cursor, _, _>::new(result_fn);

            // We'll unload the initial batches here, to put ourselves in a more deterministic state to start.
            trace1.map_batches(|batch1| {
                trace!(
                    operator_id,
                    input = 1,
                    lower = ?batch1.lower().elements(),
                    upper = ?batch1.upper().elements(),
                    size = batch1.len(),
                    "pre-loading batch",
                );

                acknowledged1.clone_from(batch1.upper());
                // No `todo1` work here, because we haven't accepted anything from `trace2` yet.
                // It is effectively "empty", because we choose to drain `trace1` before `trace2`.
                // Once we start streaming batches in, we will need to respond to new batches from
                // `input1` with logic that would have otherwise been here. Check out the next loop
                // for the structure.
            });
            // At this point, `acknowledged1` should exactly equal `trace1.read_upper()`, as they are both determined by
            // iterating through batches and capturing the upper bound. This is a great moment to assert that
            // `trace1`'s physical compaction frontier is not beyond the frontier of completed times in `trace1`.
            // TODO: in the case that this does not hold, instead "upgrade" the physical compaction frontier.
            assert!(PartialOrder::less_equal(
                &trace1.get_physical_compaction(),
                &acknowledged1.borrow()
            ));

            trace!(
                operator_id,
                input = 1,
                acknowledged1 = ?acknowledged1.elements(),
                "pre-loading finished",
            );

            // We capture batch2 cursors first and establish work second to avoid taking a `RefCell` lock
            // on both traces at the same time, as they could be the same trace and this would panic.
            let mut batch2_cursors = Vec::new();
            trace2.map_batches(|batch2| {
                trace!(
                    operator_id,
                    input = 2,
                    lower = ?batch2.lower().elements(),
                    upper = ?batch2.upper().elements(),
                    size = batch2.len(),
                    "pre-loading batch",
                );

                acknowledged2.clone_from(batch2.upper());
                batch2_cursors.push((batch2.cursor(), batch2.clone()));
            });
            // At this point, `acknowledged2` should exactly equal `trace2.read_upper()`, as they are both determined by
            // iterating through batches and capturing the upper bound. This is a great moment to assert that
            // `trace2`'s physical compaction frontier is not beyond the frontier of completed times in `trace2`.
            // TODO: in the case that this does not hold, instead "upgrade" the physical compaction frontier.
            assert!(PartialOrder::less_equal(
                &trace2.get_physical_compaction(),
                &acknowledged2.borrow()
            ));

            // Load up deferred work using trace2 cursors and batches captured just above.
            for (batch2_cursor, batch2) in batch2_cursors.into_iter() {
                trace!(
                    operator_id,
                    input = 2,
                    acknowledged1 = ?acknowledged1.elements(),
                    "deferring work for batch",
                );

                // It is safe to ask for `acknowledged1` because we have confirmed it to be in advance of the physical
                // compaction frontier.
                let (trace1_cursor, trace1_storage) =
                    trace1.cursor_through(acknowledged1.borrow()).unwrap();
                // We could downgrade the capability here, but doing so is a bit complicated mathematically.
                // TODO: downgrade the capability by searching out the one time in `batch2.lower()` and not
                // in `batch2.upper()`. Only necessary for non-empty batches, as empty batches may not have
                // that property.
                todo2.push(
                    trace1_cursor,
                    trace1_storage,
                    batch2_cursor,
                    batch2.clone(),
                    capability.clone(),
                );
            }

            trace!(
                operator_id,
                input = 2,
                acknowledged2 = ?acknowledged2.elements(),
                "pre-loading finished",
            );

            // Droppable handles to shared trace data structures.
            let mut trace1_option = Some(trace1);
            let mut trace2_option = Some(trace2);

            move |(input1, frontier1), (input2, frontier2), output| {
                // 1. Consuming input.
                //
                // The join computation repeatedly accepts batches of updates from each of its inputs.
                //
                // For each accepted batch, it prepares a work-item to join the batch against previously "accepted"
                // updates from its other input. It is important to track which updates have been accepted, because
                // we use a shared trace and there may be updates present that are in advance of this accepted bound.
                //
                // Batches are accepted: 1. in bulk at start-up (above), 2. as we observe them in the input stream,
                // and 3. if the trace can confirm a region of empty space directly following our accepted bound.
                // This last case is a consequence of our inability to transmit empty batches, as they may be formed
                // in the absence of timely dataflow capabilities.

                // Drain input 1, prepare work.
                input1.for_each(|capability, data| {
                    let trace2 = trace2_option
                        .as_mut()
                        .expect("we only drop a trace in response to the other input emptying");
                    let capability = capability.retain(0);
                    for batch1 in data.drain(..) {
                        // Ignore any pre-loaded data.
                        if PartialOrder::less_equal(&acknowledged1, batch1.lower()) {
                            trace!(
                                operator_id,
                                input = 1,
                                lower = ?batch1.lower().elements(),
                                upper = ?batch1.upper().elements(),
                                size = batch1.len(),
                                "loading batch",
                            );

                            if !batch1.is_empty() {
                                trace!(
                                    operator_id,
                                    input = 1,
                                    acknowledged2 = ?acknowledged2.elements(),
                                    "deferring work for batch",
                                );

                                // It is safe to ask for `acknowledged2` as we validated that it was at least `get_physical_compaction()`
                                // at start-up, and have held back physical compaction ever since.
                                let (trace2_cursor, trace2_storage) =
                                    trace2.cursor_through(acknowledged2.borrow()).unwrap();
                                let batch1_cursor = batch1.cursor();
                                todo1.push(
                                    batch1_cursor,
                                    batch1.clone(),
                                    trace2_cursor,
                                    trace2_storage,
                                    capability.clone(),
                                );
                            }

                            // To update `acknowledged1` we might presume that `batch1.lower` should equal it, but we
                            // may have skipped over empty batches. Still, the batches are in-order, and we should be
                            // able to just assume the most recent `batch1.upper`.
                            debug_assert!(PartialOrder::less_equal(&acknowledged1, batch1.upper()));
                            acknowledged1.clone_from(batch1.upper());

                            trace!(
                                operator_id,
                                input = 1,
                                acknowledged1 = ?acknowledged1.elements(),
                                "batch acknowledged",
                            );
                        }
                    }
                });

                // Drain input 2, prepare work.
                input2.for_each(|capability, data| {
                    let trace1 = trace1_option
                        .as_mut()
                        .expect("we only drop a trace in response to the other input emptying");
                    let capability = capability.retain(0);
                    for batch2 in data.drain(..) {
                        // Ignore any pre-loaded data.
                        if PartialOrder::less_equal(&acknowledged2, batch2.lower()) {
                            trace!(
                                operator_id,
                                input = 2,
                                lower = ?batch2.lower().elements(),
                                upper = ?batch2.upper().elements(),
                                size = batch2.len(),
                                "loading batch",
                            );

                            if !batch2.is_empty() {
                                trace!(
                                    operator_id,
                                    input = 2,
                                    acknowledged1 = ?acknowledged1.elements(),
                                    "deferring work for batch",
                                );

                                // It is safe to ask for `acknowledged1` as we validated that it was at least `get_physical_compaction()`
                                // at start-up, and have held back physical compaction ever since.
                                let (trace1_cursor, trace1_storage) =
                                    trace1.cursor_through(acknowledged1.borrow()).unwrap();
                                let batch2_cursor = batch2.cursor();
                                todo2.push(
                                    trace1_cursor,
                                    trace1_storage,
                                    batch2_cursor,
                                    batch2.clone(),
                                    capability.clone(),
                                );
                            }

                            // To update `acknowledged2` we might presume that `batch2.lower` should equal it, but we
                            // may have skipped over empty batches. Still, the batches are in-order, and we should be
                            // able to just assume the most recent `batch2.upper`.
                            debug_assert!(PartialOrder::less_equal(&acknowledged2, batch2.upper()));
                            acknowledged2.clone_from(batch2.upper());

                            trace!(
                                operator_id,
                                input = 2,
                                acknowledged2 = ?acknowledged2.elements(),
                                "batch acknowledged",
                            );
                        }
                    }
                });

                // Advance acknowledged frontiers through any empty regions that we may not receive as batches.
                if let Some(trace1) = trace1_option.as_mut() {
                    trace!(
                        operator_id,
                        input = 1,
                        acknowledged1 = ?acknowledged1.elements(),
                        "advancing trace upper",
                    );
                    trace1.advance_upper(&mut acknowledged1);
                }
                if let Some(trace2) = trace2_option.as_mut() {
                    trace!(
                        operator_id,
                        input = 2,
                        acknowledged2 = ?acknowledged2.elements(),
                        "advancing trace upper",
                    );
                    trace2.advance_upper(&mut acknowledged2);
                }

                // 2. Join computation.
                //
                // For each of the inputs, we do some amount of work (measured in terms of number
                // of output records produced). This is meant to yield control to allow downstream
                // operators to consume and reduce the output, but it is also meant to provide some
                // degree of responsiveness. There is a potential risk here that if we fall behind,
                // then the growing queues hold back physical compaction of the underlying traces,
                // which results in unintentionally quadratic processing time (each batch of either
                // input must scan all batches from the other input).

                // Perform some amount of outstanding work for input 1.
                trace!(
                    operator_id,
                    input = 1,
                    work_left = todo1.remaining(),
                    "starting work"
                );
                todo1.process(output, &yield_fn);
                trace!(
                    operator_id,
                    input = 1,
                    work_left = todo1.remaining(),
                    "ceasing work",
                );

                // Perform some amount of outstanding work for input 2.
                trace!(
                    operator_id,
                    input = 2,
                    work_left = todo2.remaining(),
                    "starting work"
                );
                todo2.process(output, &yield_fn);
                trace!(
                    operator_id,
                    input = 2,
                    work_left = todo2.remaining(),
                    "ceasing work",
                );

                // Re-activate operator if work remains.
                if !todo1.is_empty() || !todo2.is_empty() {
                    activator.activate();
                }

                // 3. Trace maintenance.
                //
                // Importantly, we use `input.frontier()` here rather than `acknowledged` to track
                // the progress of an input, because should we ever drop one of the traces we will
                // lose the ability to extract information from anything other than the input.
                // For example, if we dropped `trace2` we would not be able to use `advance_upper`
                // to keep `acknowledged2` up to date wrt empty batches, and would hold back logical
                // compaction of `trace1`.

                // Maintain `trace1`. Drop if `input2` is empty, or advance based on future needs.
                if let Some(trace1) = trace1_option.as_mut() {
                    if frontier2.is_empty() {
                        trace!(operator_id, input = 1, "dropping trace handle");
                        trace1_option = None;
                    } else {
                        trace!(
                            operator_id,
                            input = 1,
                            logical = ?*frontier2.frontier(),
                            physical = ?acknowledged1.elements(),
                            "advancing trace compaction",
                        );

                        // Allow `trace1` to compact logically up to the frontier we may yet receive,
                        // in the opposing input (`input2`). All `input2` times will be beyond this
                        // frontier, and joined times only need to be accurate when advanced to it.
                        trace1.set_logical_compaction(frontier2.frontier());
                        // Allow `trace1` to compact physically up to the upper bound of batches we
                        // have received in its input (`input1`). We will not require a cursor that
                        // is not beyond this bound.
                        trace1.set_physical_compaction(acknowledged1.borrow());
                    }
                }

                // Maintain `trace2`. Drop if `input1` is empty, or advance based on future needs.
                if let Some(trace2) = trace2_option.as_mut() {
                    if frontier1.is_empty() {
                        trace!(operator_id, input = 2, "dropping trace handle");
                        trace2_option = None;
                    } else {
                        trace!(
                            operator_id,
                            input = 2,
                            logical = ?*frontier1.frontier(),
                            physical = ?acknowledged2.elements(),
                            "advancing trace compaction",
                        );

                        // Allow `trace2` to compact logically up to the frontier we may yet receive,
                        // in the opposing input (`input1`). All `input1` times will be beyond this
                        // frontier, and joined times only need to be accurate when advanced to it.
                        trace2.set_logical_compaction(frontier1.frontier());
                        // Allow `trace2` to compact physically up to the upper bound of batches we
                        // have received in its input (`input2`). We will not require a cursor that
                        // is not beyond this bound.
                        trace2.set_physical_compaction(acknowledged2.borrow());
                    }
                }
            }
        },
    )
}
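
// Illustrative only: a sketch of how a caller might invoke `mz_join_core`, not actual rendering
// code. The `result` closure, the `make_output_row` helper, and the yield thresholds below are
// hypothetical. The yield function receives the `Instant` at which the current processing run
// started and the number of records produced so far, and returns whether the operator should
// yield control.
//
//     let joined = mz_join_core(
//         arranged1,
//         arranged2,
//         |_key, val1, val2| Some(make_output_row(val1, val2)),
//         |start, work| {
//             start.elapsed() >= std::time::Duration::from_millis(10) || work >= 1_000_000
//         },
//     );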

/// Work collected by the join operator.
///
/// The join operator enqueues new work here first, and then processes it at a controlled rate,
/// potentially yielding control to the Timely runtime in between. This allows it to avoid both
/// OOMs caused by buffering massive amounts of data at the output, and loss of interactivity.
///
/// Collected work can be worked off by calling the `process` method.
struct Work<C1, C2, D, L>
where
    C1: Cursor,
    C2: Cursor,
{
    /// Pending work.
    todo: VecDeque<(Pin<Box<dyn Future<Output = ()>>>, Capability<C1::Time>)>,
    /// A function that transforms raw join matches into join results.
    result_fn: Rc<RefCell<L>>,
    /// A buffer holding the join results.
    ///
    /// Written by the work futures, drained by `Work::process`.
    output: Rc<RefCell<Vec<(D, C1::Time, Diff)>>>,
    /// The number of join results produced by work futures.
    ///
    /// Used with `yield_fn` to inform when `Work::process` should yield.
    produced: Rc<Cell<usize>>,

    _cursors: PhantomData<(C1, C2)>,
}

impl<C1, C2, D, L, I> Work<C1, C2, D, L>
where
    C1: Cursor<Diff = Diff> + 'static,
    C2: for<'a> Cursor<Key<'a> = C1::Key<'a>, Time = C1::Time, Diff = Diff> + 'static,
    D: Data,
    L: FnMut(C1::Key<'_>, C1::Val<'_>, C2::Val<'_>) -> I + 'static,
    I: IntoIterator<Item = D> + 'static,
{
    fn new(result_fn: Rc<RefCell<L>>) -> Self {
        Self {
            todo: Default::default(),
            result_fn,
            output: Default::default(),
            produced: Default::default(),
            _cursors: PhantomData,
        }
    }

    /// Return the number of remaining work chunks.
    fn remaining(&self) -> usize {
        self.todo.len()
    }

    /// Return whether there is any work pending.
    fn is_empty(&self) -> bool {
        self.remaining() == 0
    }

    /// Append some pending work.
    fn push(
        &mut self,
        cursor1: C1,
        storage1: C1::Storage,
        cursor2: C2,
        storage2: C2::Storage,
        capability: Capability<C1::Time>,
    ) {
        let fut = self.start_work(
            cursor1,
            storage1,
            cursor2,
            storage2,
            capability.time().clone(),
        );

        self.todo.push_back((Box::pin(fut), capability));
    }

    /// Process pending work until none remains or `yield_fn` requests a yield.
    fn process<C, YFn>(
        &mut self,
        output: &mut OutputBuilderSession<'_, C1::Time, CapacityContainerBuilder<C>>,
        yield_fn: YFn,
    ) where
        C: Container + SizableContainer + PushInto<(D, C1::Time, Diff)> + Data,
        YFn: Fn(Instant, usize) -> bool,
    {
        let start_time = Instant::now();
        self.produced.set(0);

        let waker = futures::task::noop_waker();
        let mut ctx = std::task::Context::from_waker(&waker);

        while let Some((mut fut, cap)) = self.todo.pop_front() {
            // Drive the work future until it's done or it's time to yield.
            let mut done = false;
            let mut should_yield = false;
            while !done && !should_yield {
                done = fut.as_mut().poll(&mut ctx).is_ready();
                should_yield = yield_fn(start_time, self.produced.get());
            }

            // Drain the produced join results.
            let mut output_buf = self.output.borrow_mut();

            // Consolidating here is important when the join closure produces data that
            // consolidates well, for example when projecting columns.
            let old_len = output_buf.len();
            consolidate_updates(&mut output_buf);
            let recovered = old_len - output_buf.len();
            self.produced.update(|x| x - recovered);

            output.session(&cap).give_iterator(output_buf.drain(..));

            if done {
                // We have finished processing a chunk of work. Use this opportunity to replace
                // the output buffer with an empty one, so we don't keep excess memory allocated forever.
                *output_buf = Default::default();
            } else {
                // Still work to do in this chunk.
                self.todo.push_front((fut, cap));
            }

            if should_yield {
                break;
            }
        }
    }

    /// Start the work of joining the updates produced by the given cursors.
    ///
    /// This method returns a `Future` that can be polled to make progress on the join work.
    /// Returning a future allows us to implement the logic using async/await syntax where we can
    /// conveniently pause the work at any point by calling `yield_now().await`. We are allowed to
    /// hold references across yield points, which is something we wouldn't get with a hand-rolled
    /// state machine implementation.
    fn start_work(
        &self,
        mut cursor1: C1,
        storage1: C1::Storage,
        mut cursor2: C2,
        storage2: C2::Storage,
        meet: C1::Time,
    ) -> impl Future<Output = ()> + use<C1, C2, D, L, I> {
        let result_fn = Rc::clone(&self.result_fn);
        let output = Rc::clone(&self.output);
        let produced = Rc::clone(&self.produced);

        async move {
            let mut joiner = Joiner::new(result_fn, output, produced, meet);

            while let Some(key1) = cursor1.get_key(&storage1)
                && let Some(key2) = cursor2.get_key(&storage2)
            {
                match key1.cmp(&key2) {
                    Ordering::Less => cursor1.seek_key(&storage1, key2),
                    Ordering::Greater => cursor2.seek_key(&storage2, key1),
                    Ordering::Equal => {
                        joiner
                            .join_key(key1, &mut cursor1, &storage1, &mut cursor2, &storage2)
                            .await;

                        cursor1.step_key(&storage1);
                        cursor2.step_key(&storage2);
                    }
                }
            }
        }
    }
}

/// Type that knows how to perform the core join logic.
///
/// The joiner implements two join strategies:
///
///  * The "simple" strategy produces a match for each combination of (val1, time1, val2, time2)
///    found in the inputs. If there are multiple times in the input, it may produce matches for
///    times in which one of the values wasn't present. These matches cancel each other out, so the
///    result ends up correct.
///  * The "linear scan over times" strategy sorts the input data by time and then steps through
///    the input histories, producing matches for a pair of values only if both values were
///    present at the same time.
///
/// The linear scan strategy avoids redundant work and is much more efficient than the simple
/// strategy when many distinct times are present in the inputs. However, sorting the input data
/// incurs some overhead, so we still prefer the simple variant when the input data is small.
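///
/// To illustrate the cancellation in the simple strategy with made-up updates: suppose input 1
/// contains `(a, time=2, +1)` and input 2 contains `(b, time=0, +1)` and `(b, time=1, -1)`, so
/// `b` is present only during `[0, 1)`, before `a` appears. The simple strategy emits a match at
/// `join(2, 0) = 2` with diff `+1` and another at `join(2, 1) = 2` with diff `-1`; the two
/// updates cancel, leaving no net match, which is the correct result.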
struct Joiner<'a, C1, C2, D, L>
where
    C1: Cursor,
    C2: Cursor,
{
    /// A function that transforms raw join matches into join results.
    result_fn: Rc<RefCell<L>>,
    /// A buffer holding the join results.
    output: Rc<RefCell<Vec<(D, C1::Time, Diff)>>>,
    /// The number of join results produced.
    produced: Rc<Cell<usize>>,
    /// A time to which all join results should be advanced.
    meet: C1::Time,

    /// Buffer for edit histories from the first input.
    history1: ValueHistory<'a, C1>,
    /// Buffer for edit histories from the second input.
    history2: ValueHistory<'a, C2>,
}

impl<'a, C1, C2, D, L, I> Joiner<'a, C1, C2, D, L>
where
    C1: Cursor<Diff = Diff>,
    C2: Cursor<Key<'a> = C1::Key<'a>, Time = C1::Time, Diff = Diff>,
    D: Data,
    L: FnMut(C1::Key<'_>, C1::Val<'_>, C2::Val<'_>) -> I + 'static,
    I: IntoIterator<Item = D> + 'static,
{
    fn new(
        result_fn: Rc<RefCell<L>>,
        output: Rc<RefCell<Vec<(D, C1::Time, Diff)>>>,
        produced: Rc<Cell<usize>>,
        meet: C1::Time,
    ) -> Self {
        Self {
            result_fn,
            output,
            produced,
            meet,
            history1: ValueHistory::new(),
            history2: ValueHistory::new(),
        }
    }

    /// Produce matches for the values of a single key.
    async fn join_key(
        &mut self,
        key: C1::Key<'_>,
        cursor1: &mut C1,
        storage1: &'a C1::Storage,
        cursor2: &mut C2,
        storage2: &'a C2::Storage,
    ) {
        self.history1.edits.load(cursor1, storage1, &self.meet);
        self.history2.edits.load(cursor2, storage2, &self.meet);

        // If the input data is small, use the simple strategy.
        //
        // TODO: This conditional is taken directly from DD. We should check if it might make sense
        //       to do something different, like using the simple strategy always when the number
        //       of distinct times is small.
        if self.history1.edits.len() < 10 || self.history2.edits.len() < 10 {
            self.join_key_simple(key);
            yield_now().await;
        } else {
            self.join_key_linear_time_scan(key).await;
        }
    }

    /// Produce matches for the values of a single key, using the simple strategy.
    ///
    /// This strategy is only meant to be used for small inputs, so we don't bother including yield
    /// points or optimizations.
    fn join_key_simple(&self, key: C1::Key<'_>) {
        let mut result_fn = self.result_fn.borrow_mut();
        let mut output = self.output.borrow_mut();

        for (v1, t1, r1) in self.history1.edits.iter() {
            for (v2, t2, r2) in self.history2.edits.iter() {
                for data in result_fn(key, v1, v2) {
                    output.push((data, t1.join(t2), r1 * r2));
                    self.produced.update(|x| x + 1);
                }
            }
        }
    }

    /// Produce matches for the values of a single key, using a linear scan through times.
    async fn join_key_linear_time_scan(&mut self, key: C1::Key<'_>) {
        let history1 = &mut self.history1;
        let history2 = &mut self.history2;

        history1.replay();
        history2.replay();

        // TODO: It seems like there are probably a good deal of redundant `advance_past_by` calls
        //       in here. If a time is ever repeated, for example, the call will be identical
        //       and accomplish nothing. If only a single record has been added, it may not
        //       be worth the time to collapse (advance, re-sort) the data when a linear scan
        //       is sufficient.

        // Join the next entry in `history1`.
        let work_history1 = |history1: &mut ValueHistory<C1>, history2: &mut ValueHistory<C2>| {
            let mut result_fn = self.result_fn.borrow_mut();
            let mut output = self.output.borrow_mut();

            let (t1, meet, v1, r1) = history1.get().unwrap();
            history2.advance_past_by(meet);
            for &(v2, ref t2, r2) in &history2.past {
                for data in result_fn(key, v1, v2) {
                    output.push((data, t1.join(t2), r1 * r2));
                    self.produced.update(|x| x + 1);
                }
            }
            history1.step();
        };

        // Join the next entry in `history2`.
        let work_history2 = |history1: &mut ValueHistory<C1>, history2: &mut ValueHistory<C2>| {
            let mut result_fn = self.result_fn.borrow_mut();
            let mut output = self.output.borrow_mut();

            let (t2, meet, v2, r2) = history2.get().unwrap();
            history1.advance_past_by(meet);
            for &(v1, ref t1, r1) in &history1.past {
                for data in result_fn(key, v1, v2) {
                    output.push((data, t1.join(t2), r1 * r2));
                    self.produced.update(|x| x + 1);
                }
            }
            history2.step();
        };

        while let Some(time1) = history1.get_time()
            && let Some(time2) = history2.get_time()
        {
            if time1 < time2 {
                work_history1(history1, history2)
            } else {
                work_history2(history1, history2)
            };
            yield_now().await;
        }

        while !history1.is_empty() {
            work_history1(history1, history2);
            yield_now().await;
        }
        while !history2.is_empty() {
            work_history2(history1, history2);
            yield_now().await;
        }
    }
}

/// An accumulation of (value, time, diff) updates.
///
/// Deduplicated values are stored in `values`. Each entry includes the end index of the
/// corresponding range in `edits`. The edits stored for a value are consolidated.
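///
/// As an illustrative layout (with made-up values): `values = [(a, 2), (b, 5)]` means that
/// `edits[0..2]` holds the consolidated `(time, diff)` updates for value `a`, and `edits[2..5]`
/// holds those for value `b`.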
struct EditList<'a, C: Cursor> {
    values: Vec<(C::Val<'a>, usize)>,
    edits: Vec<(C::Time, Diff)>,
}

impl<'a, C> EditList<'a, C>
where
    C: Cursor<Diff = Diff>,
{
    fn len(&self) -> usize {
        self.edits.len()
    }

    /// Load the updates in the given cursor.
    ///
    /// Steps over values, but not over keys.
    fn load(&mut self, cursor: &mut C, storage: &'a C::Storage, meet: &C::Time) {
        self.values.clear();
        self.edits.clear();

        let mut edit_idx = 0;
        while let Some(value) = cursor.get_val(storage) {
            cursor.map_times(storage, |time, diff| {
                let mut time = C::owned_time(time);
                time.join_assign(meet);
                self.edits.push((time, C::owned_diff(diff)));
            });

            consolidate_from(&mut self.edits, edit_idx);

            if self.edits.len() > edit_idx {
                edit_idx = self.edits.len();
                self.values.push((value, edit_idx));
            }

            cursor.step_val(storage);
        }
    }

    /// Iterate over the contained updates.
    fn iter(&self) -> impl Iterator<Item = (C::Val<'a>, &C::Time, Diff)> {
        self.values
            .iter()
            .enumerate()
            .flat_map(|(idx, (value, end))| {
                let start = if idx == 0 { 0 } else { self.values[idx - 1].1 };
                let edits = &self.edits[start..*end];
                edits.iter().map(|(time, diff)| (*value, time, *diff))
            })
    }
}

/// A history for replaying updates in time order.
struct ValueHistory<'a, C: Cursor> {
    /// Unsorted updates to replay.
    edits: EditList<'a, C>,
    /// Time-sorted updates that have not been stepped over yet.
    ///
    /// Entries are (time, meet, value_idx, diff).
    future: Vec<(C::Time, C::Time, usize, Diff)>,
    /// Rolled-up updates that have been stepped over.
    past: Vec<(C::Val<'a>, C::Time, Diff)>,
}

impl<'a, C> ValueHistory<'a, C>
where
    C: Cursor,
{
    /// Create a new empty `ValueHistory`.
    fn new() -> Self {
        Self {
            edits: EditList {
                values: Default::default(),
                edits: Default::default(),
            },
            future: Default::default(),
            past: Default::default(),
        }
    }

    /// Return whether there are updates left to step over.
    fn is_empty(&self) -> bool {
        self.future.is_empty()
    }

    /// Return the next update.
    fn get(&self) -> Option<(&C::Time, &C::Time, C::Val<'a>, Diff)> {
        self.future.last().map(|(t, m, v, r)| {
            let (value, _) = self.edits.values[*v];
            (t, m, value, *r)
        })
    }

    /// Return the time of the next update.
    fn get_time(&self) -> Option<&C::Time> {
        self.future.last().map(|(t, _, _, _)| t)
    }

    /// Populate `future` with the updates stored in `edits`.
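    ///
    /// `future` is sorted in descending time order and consumed from the back. The second field
    /// of each entry is the meet of that entry's time with the times of all entries sorted before
    /// it, so when an entry is the next to be stepped over, that field is the meet of all times
    /// still in `future`; the join logic passes this meet to `advance_past_by` on the opposing
    /// history.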
    fn replay(&mut self) {
        self.future.clear();
        self.past.clear();

        let values = &self.edits.values;
        let edits = &self.edits.edits;
        for (idx, (_, end)) in values.iter().enumerate() {
            let start = if idx == 0 { 0 } else { values[idx - 1].1 };
            for edit_idx in start..*end {
                let (time, diff) = &edits[edit_idx];
                self.future.push((time.clone(), time.clone(), idx, *diff));
            }
        }

        self.future.sort_by(|x, y| y.cmp(x));

        for idx in 1..self.future.len() {
            self.future[idx].1 = self.future[idx].1.meet(&self.future[idx - 1].1);
        }
    }

    /// Advance the history by moving the next entry from `future` into `past`.
    fn step(&mut self) {
        let (time, _, value_idx, diff) = self.future.pop().unwrap();
        let (value, _) = self.edits.values[value_idx];
        self.past.push((value, time, diff));
    }

    /// Advance all times in `past` by `meet`.
    fn advance_past_by(&mut self, meet: &C::Time) {
        for (_, time, _) in &mut self.past {
            time.join_assign(meet);
        }
        consolidate_updates(&mut self.past);
    }
}