mz_compute/render/join/mz_join_core.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! A fork of DD's `JoinCore::join_core`.
//!
//! Currently, compute rendering knows three implementations for linear joins:
//!
//!  * Differential's `JoinCore::join_core`
//!  * A Materialize fork thereof, called `mz_join_core`
//!  * Another Materialize fork thereof, called `mz_join_core_v2`
//!
//! `mz_join_core` exists to solve a responsiveness problem with the DD implementation.
//! DD's join is only able to yield between keys. When computing a large cross-join or a highly
//! skewed join, this can result in loss of interactivity when the join operator refuses to yield
//! control for multiple seconds or longer, which in turn causes degraded user experience.
//!
//! `mz_join_core` currently fixes the yielding issue by omitting the linear scan through times
//! implemented in DD's join. This leaves only the quadratic strategy, for which it is easy to
//! implement yielding within keys.
//!
//! While `mz_join_core` retains responsiveness in the face of cross-joins, it is also
//! significantly slower than DD's join for workloads that have a large number of edits at
//! different times. `mz_join_core_v2` resolves this by adding support for the DD join's linear
//! scan through times.
//!
//! For the moment, we keep all three implementations around, selectable through feature flags.
//! Eventually, we hope that `mz_join_core_v2` proves itself sufficiently to become the only join
//! implementation.
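//!
//! The `yield_fn` given to `mz_join_core` decides when the operator should yield, based on the
//! time at which it started working and the number of output tuples produced so far. A minimal
//! sketch of such a function (the concrete budgets are made up for illustration):
//!
//! ```ignore
//! use std::time::{Duration, Instant};
//!
//! let yield_fn = |start: Instant, work: usize| {
//!     // Yield after a bounded amount of produced output or elapsed wall-clock time.
//!     work >= 1_000_000 || start.elapsed() >= Duration::from_millis(100)
//! };
//! ```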

use std::cmp::Ordering;
use std::collections::VecDeque;
use std::time::Instant;

use differential_dataflow::Data;
use differential_dataflow::consolidation::{consolidate, consolidate_updates};
use differential_dataflow::difference::Multiply;
use differential_dataflow::lattice::Lattice;
use differential_dataflow::operators::arrange::arrangement::Arranged;
use differential_dataflow::trace::{BatchReader, Cursor, TraceReader};
use mz_repr::Diff;
use timely::PartialOrder;
use timely::container::{CapacityContainerBuilder, PushInto, SizableContainer};
use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::channels::pushers::Tee;
use timely::dataflow::channels::pushers::buffer::Session;
use timely::dataflow::operators::generic::OutputHandleCore;
use timely::dataflow::operators::{Capability, Operator};
use timely::dataflow::{Scope, StreamCore};
use timely::progress::timestamp::Timestamp;
use tracing::trace;

use crate::render::context::ShutdownProbe;

/// Joins two arranged collections with the same key type.
///
/// Each matching pair of records `(key, val1)` and `(key, val2)` is subjected to the `result`
/// function, which produces something implementing `IntoIterator`, where the output collection
/// will have an entry for every value returned by the iterator.
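///
/// A sketch of a `result` closure that simply pairs up the matched values (the `Row` value type
/// here is a hypothetical instantiation; the real types are determined by the traces):
///
/// ```ignore
/// let result = |_key: &Row, val1: &Row, val2: &Row| {
///     // Returning an iterator lets a single match emit zero, one, or many output records.
///     Some((val1.clone(), val2.clone()))
/// };
/// ```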
pub(super) fn mz_join_core<G, Tr1, Tr2, L, I, YFn, C>(
    arranged1: &Arranged<G, Tr1>,
    arranged2: &Arranged<G, Tr2>,
    shutdown_probe: ShutdownProbe,
    mut result: L,
    yield_fn: YFn,
) -> StreamCore<G, C>
where
    G: Scope,
    G::Timestamp: Lattice,
    Tr1: TraceReader<Time = G::Timestamp, Diff = Diff> + Clone + 'static,
    Tr2: for<'a> TraceReader<Key<'a> = Tr1::Key<'a>, Time = G::Timestamp, Diff = Diff>
        + Clone
        + 'static,
    L: FnMut(Tr1::Key<'_>, Tr1::Val<'_>, Tr2::Val<'_>) -> I + 'static,
    I: IntoIterator,
    I::Item: Data,
    YFn: Fn(Instant, usize) -> bool + 'static,
    C: SizableContainer + PushInto<(I::Item, G::Timestamp, Diff)> + Data,
{
    let mut trace1 = arranged1.trace.clone();
    let mut trace2 = arranged2.trace.clone();

    arranged1.stream.binary_frontier(
        &arranged2.stream,
        Pipeline,
        Pipeline,
        "Join",
        move |capability, info| {
            let operator_id = info.global_id;

            // Acquire an activator to reschedule the operator when it has unfinished work.
            let activator = arranged1.stream.scope().activator_for(info.address);

            // Our initial invariants are that for each trace, physical compaction is less than or equal to the trace's upper bound.
            // These invariants ensure that we can reference observed batch frontiers from `_start_upper` onward, as long as
            // we maintain our physical compaction capabilities appropriately. These assertions are tested as we load up the
            // initial work for the two traces, and before the operator is constructed.

            // Acknowledged frontier for each input.
            // These two are used exclusively to track batch boundaries on which we may want/need to call `cursor_through`.
            // They will drive our physical compaction of each trace, and we want to maintain at all times that each is beyond
            // the physical compaction frontier of their corresponding trace.
            // Should we ever *drop* a trace, these are 1. much harder to maintain correctly, but 2. no longer used.
            use timely::progress::frontier::Antichain;
            let mut acknowledged1 = Antichain::from_elem(<G::Timestamp>::minimum());
            let mut acknowledged2 = Antichain::from_elem(<G::Timestamp>::minimum());

            // Deferred work of batches from each input.
            let mut todo1 = VecDeque::new();
            let mut todo2 = VecDeque::new();

            // We'll unload the initial batches here, to put ourselves in a less non-deterministic state to start.
            trace1.map_batches(|batch1| {
                trace!(
                    operator_id,
                    input = 1,
                    lower = ?batch1.lower().elements(),
                    upper = ?batch1.upper().elements(),
                    size = batch1.len(),
                    "pre-loading batch",
                );

                acknowledged1.clone_from(batch1.upper());
                // No `todo1` work here, because we haven't accepted anything into `batches2` yet.
                // It is effectively "empty", because we choose to drain `trace1` before `trace2`.
                // Once we start streaming batches in, we will need to respond to new batches from
                // `input1` with logic that would have otherwise been here. Check out the next loop
                // for the structure.
            });
            // At this point, `ack1` should exactly equal `trace1.read_upper()`, as they are both determined by
            // iterating through batches and capturing the upper bound. This is a great moment to assert that
            // `trace1`'s physical compaction frontier is before the frontier of completed times in `trace1`.
            // TODO: in the case that this does not hold, instead "upgrade" the physical compaction frontier.
            assert!(PartialOrder::less_equal(
                &trace1.get_physical_compaction(),
                &acknowledged1.borrow()
            ));

            trace!(
                operator_id,
                input = 1,
                acknowledged1 = ?acknowledged1.elements(),
                "pre-loading finished",
            );

            // We capture batch2 cursors first and establish work second to avoid taking a `RefCell` lock
            // on both traces at the same time, as they could be the same trace and this would panic.
            let mut batch2_cursors = Vec::new();
            trace2.map_batches(|batch2| {
                trace!(
                    operator_id,
                    input = 2,
                    lower = ?batch2.lower().elements(),
                    upper = ?batch2.upper().elements(),
                    size = batch2.len(),
                    "pre-loading batch",
                );

                acknowledged2.clone_from(batch2.upper());
                batch2_cursors.push((batch2.cursor(), batch2.clone()));
            });
            // At this point, `ack2` should exactly equal `trace2.read_upper()`, as they are both determined by
            // iterating through batches and capturing the upper bound. This is a great moment to assert that
            // `trace2`'s physical compaction frontier is before the frontier of completed times in `trace2`.
            // TODO: in the case that this does not hold, instead "upgrade" the physical compaction frontier.
            assert!(PartialOrder::less_equal(
                &trace2.get_physical_compaction(),
                &acknowledged2.borrow()
            ));

            // Load up deferred work using trace2 cursors and batches captured just above.
            for (batch2_cursor, batch2) in batch2_cursors.into_iter() {
                trace!(
                    operator_id,
                    input = 2,
                    acknowledged1 = ?acknowledged1.elements(),
                    "deferring work for batch",
                );

                // It is safe to ask for `ack1` because we have confirmed it to be in advance of `distinguish_since`.
                let (trace1_cursor, trace1_storage) =
                    trace1.cursor_through(acknowledged1.borrow()).unwrap();
                // We could downgrade the capability here, but doing so is a bit complicated mathematically.
                // TODO: downgrade the capability by searching out the one time in `batch2.lower()` and not
                // in `batch2.upper()`. Only necessary for non-empty batches, as empty batches may not have
                // that property.
                todo2.push_back(Deferred::new(
                    trace1_cursor,
                    trace1_storage,
                    batch2_cursor,
                    batch2.clone(),
                    capability.clone(),
                ));
            }

            trace!(
                operator_id,
                input = 2,
                acknowledged2 = ?acknowledged2.elements(),
                "pre-loading finished",
            );

            // Droppable handles to shared trace data structures.
            let mut trace1_option = Some(trace1);
            let mut trace2_option = Some(trace2);

            move |input1, input2, output| {
                // If the dataflow is shutting down, discard all existing and future work.
                if shutdown_probe.in_shutdown() {
                    trace!(operator_id, "shutting down");

                    // Discard data at the inputs.
                    input1.for_each(|_cap, _data| ());
                    input2.for_each(|_cap, _data| ());

                    // Discard queued work.
                    todo1 = Default::default();
                    todo2 = Default::default();

                    // Stop holding on to input traces.
                    trace1_option = None;
                    trace2_option = None;

                    return;
                }

                // 1. Consuming input.
                //
                // The join computation repeatedly accepts batches of updates from each of its inputs.
                //
                // For each accepted batch, it prepares a work-item to join the batch against previously "accepted"
                // updates from its other input. It is important to track which updates have been accepted, because
                // we use a shared trace and there may be updates present that are in advance of this accepted bound.
                //
                // Batches are accepted: 1. in bulk at start-up (above), 2. as we observe them in the input stream,
                // and 3. if the trace can confirm a region of empty space directly following our accepted bound.
                // This last case is a consequence of our inability to transmit empty batches, as they may be formed
                // in the absence of timely dataflow capabilities.

                // Drain input 1, prepare work.
                input1.for_each(|capability, data| {
                    let trace2 = trace2_option
                        .as_mut()
                        .expect("we only drop a trace in response to the other input emptying");
                    let capability = capability.retain();
                    for batch1 in data.drain(..) {
                        // Ignore any pre-loaded data.
                        if PartialOrder::less_equal(&acknowledged1, batch1.lower()) {
                            trace!(
                                operator_id,
                                input = 1,
                                lower = ?batch1.lower().elements(),
                                upper = ?batch1.upper().elements(),
                                size = batch1.len(),
                                "loading batch",
                            );

                            if !batch1.is_empty() {
                                trace!(
                                    operator_id,
                                    input = 1,
                                    acknowledged2 = ?acknowledged2.elements(),
                                    "deferring work for batch",
                                );

                                // It is safe to ask for `ack2` as we validated that it was at least `get_physical_compaction()`
                                // at start-up, and have held back physical compaction ever since.
                                let (trace2_cursor, trace2_storage) =
                                    trace2.cursor_through(acknowledged2.borrow()).unwrap();
                                let batch1_cursor = batch1.cursor();
                                todo1.push_back(Deferred::new(
                                    batch1_cursor,
                                    batch1.clone(),
                                    trace2_cursor,
                                    trace2_storage,
                                    capability.clone(),
                                ));
                            }

                            // To update `acknowledged1` we might presume that `batch1.lower` should equal it, but we
                            // may have skipped over empty batches. Still, the batches are in-order, and we should be
                            // able to just assume the most recent `batch1.upper`.
                            debug_assert!(PartialOrder::less_equal(&acknowledged1, batch1.upper()));
                            acknowledged1.clone_from(batch1.upper());

                            trace!(
                                operator_id,
                                input = 1,
                                acknowledged1 = ?acknowledged1.elements(),
                                "batch acknowledged",
                            );
                        }
                    }
                });

                // Drain input 2, prepare work.
                input2.for_each(|capability, data| {
                    let trace1 = trace1_option
                        .as_mut()
                        .expect("we only drop a trace in response to the other input emptying");
                    let capability = capability.retain();
                    for batch2 in data.drain(..) {
                        // Ignore any pre-loaded data.
                        if PartialOrder::less_equal(&acknowledged2, batch2.lower()) {
                            trace!(
                                operator_id,
                                input = 2,
                                lower = ?batch2.lower().elements(),
                                upper = ?batch2.upper().elements(),
                                size = batch2.len(),
                                "loading batch",
                            );

                            if !batch2.is_empty() {
                                trace!(
                                    operator_id,
                                    input = 2,
                                    acknowledged1 = ?acknowledged1.elements(),
                                    "deferring work for batch",
                                );

                                // It is safe to ask for `ack1` as we validated that it was at least `get_physical_compaction()`
                                // at start-up, and have held back physical compaction ever since.
                                let (trace1_cursor, trace1_storage) =
                                    trace1.cursor_through(acknowledged1.borrow()).unwrap();
                                let batch2_cursor = batch2.cursor();
                                todo2.push_back(Deferred::new(
                                    trace1_cursor,
                                    trace1_storage,
                                    batch2_cursor,
                                    batch2.clone(),
                                    capability.clone(),
                                ));
                            }

                            // To update `acknowledged2` we might presume that `batch2.lower` should equal it, but we
                            // may have skipped over empty batches. Still, the batches are in-order, and we should be
                            // able to just assume the most recent `batch2.upper`.
                            debug_assert!(PartialOrder::less_equal(&acknowledged2, batch2.upper()));
                            acknowledged2.clone_from(batch2.upper());

                            trace!(
                                operator_id,
                                input = 2,
                                acknowledged2 = ?acknowledged2.elements(),
                                "batch acknowledged",
                            );
                        }
                    }
                });

                // Advance acknowledged frontiers through any empty regions that we may not receive as batches.
                if let Some(trace1) = trace1_option.as_mut() {
                    trace!(
                        operator_id,
                        input = 1,
                        acknowledged1 = ?acknowledged1.elements(),
                        "advancing trace upper",
                    );
                    trace1.advance_upper(&mut acknowledged1);
                }
                if let Some(trace2) = trace2_option.as_mut() {
                    trace!(
                        operator_id,
                        input = 2,
                        acknowledged2 = ?acknowledged2.elements(),
                        "advancing trace upper",
                    );
                    trace2.advance_upper(&mut acknowledged2);
                }

                // 2. Join computation.
                //
                // For each of the inputs, we do some amount of work (measured in terms of number
                // of output records produced). This is meant to yield control to allow downstream
                // operators to consume and reduce the output, but it is also meant to provide some
                // degree of responsiveness. There is a potential risk here that if we fall behind
                // then the increasing queues hold back physical compaction of the underlying traces
                // which results in unintentionally quadratic processing time (each batch of either
                // input must scan all batches from the other input).

                // Perform some amount of outstanding work.
                trace!(
                    operator_id,
                    input = 1,
                    work_left = todo1.len(),
                    "starting work",
                );

                let start_time = Instant::now();
                let mut work = 0;
                while !todo1.is_empty() && !yield_fn(start_time, work) {
                    todo1.front_mut().unwrap().work(
                        output,
                        &mut result,
                        |w| yield_fn(start_time, w),
                        &mut work,
                    );
                    if !todo1.front().unwrap().work_remains() {
                        todo1.pop_front();
                    }
                }

                trace!(
                    operator_id,
                    input = 1,
                    work_left = todo1.len(),
                    work_done = work,
                    elapsed = ?start_time.elapsed(),
                    "ceasing work",
                );

                // Perform some amount of outstanding work.
                trace!(
                    operator_id,
                    input = 2,
                    work_left = todo2.len(),
                    "starting work",
                );

                let start_time = Instant::now();
                let mut work = 0;
                while !todo2.is_empty() && !yield_fn(start_time, work) {
                    todo2.front_mut().unwrap().work(
                        output,
                        &mut result,
                        |w| yield_fn(start_time, w),
                        &mut work,
                    );
                    if !todo2.front().unwrap().work_remains() {
                        todo2.pop_front();
                    }
                }

                trace!(
                    operator_id,
                    input = 2,
                    work_left = todo2.len(),
                    work_done = work,
                    elapsed = ?start_time.elapsed(),
                    "ceasing work",
                );

                // Re-activate operator if work remains.
                if !todo1.is_empty() || !todo2.is_empty() {
                    activator.activate();
                }

                // 3. Trace maintenance.
                //
                // Importantly, we use `input.frontier()` here rather than `acknowledged` to track
                // the progress of an input, because should we ever drop one of the traces we will
                // lose the ability to extract information from anything other than the input.
                // For example, if we dropped `trace2` we would not be able to use `advance_upper`
                // to keep `acknowledged2` up to date wrt empty batches, and would hold back logical
                // compaction of `trace1`.

                // Maintain `trace1`. Drop if `input2` is empty, or advance based on future needs.
                if let Some(trace1) = trace1_option.as_mut() {
                    if input2.frontier().is_empty() {
                        trace!(operator_id, input = 1, "dropping trace handle");
                        trace1_option = None;
                    } else {
                        trace!(
                            operator_id,
                            input = 1,
                            logical = ?*input2.frontier().frontier(),
                            physical = ?acknowledged1.elements(),
                            "advancing trace compaction",
                        );

                        // Allow `trace1` to compact logically up to the frontier we may yet receive,
                        // in the opposing input (`input2`). All `input2` times will be beyond this
                        // frontier, and joined times only need to be accurate when advanced to it.
                        trace1.set_logical_compaction(input2.frontier().frontier());
                        // Allow `trace1` to compact physically up to the upper bound of batches we
                        // have received in its input (`input1`). We will not require a cursor that
                        // is not beyond this bound.
                        trace1.set_physical_compaction(acknowledged1.borrow());
                    }
                }

                // Maintain `trace2`. Drop if `input1` is empty, or advance based on future needs.
                if let Some(trace2) = trace2_option.as_mut() {
                    if input1.frontier().is_empty() {
                        trace!(operator_id, input = 2, "dropping trace handle");
                        trace2_option = None;
                    } else {
                        trace!(
                            operator_id,
                            input = 2,
                            logical = ?*input1.frontier().frontier(),
                            physical = ?acknowledged2.elements(),
                            "advancing trace compaction",
                        );

                        // Allow `trace2` to compact logically up to the frontier we may yet receive,
                        // in the opposing input (`input1`). All `input1` times will be beyond this
                        // frontier, and joined times only need to be accurate when advanced to it.
                        trace2.set_logical_compaction(input1.frontier().frontier());
                        // Allow `trace2` to compact physically up to the upper bound of batches we
                        // have received in its input (`input2`). We will not require a cursor that
                        // is not beyond this bound.
                        trace2.set_physical_compaction(acknowledged2.borrow());
                    }
                }
            }
        },
    )
}

/// Deferred join computation.
///
/// The structure wraps cursors which allow us to play out join computation at whatever rate we
/// like. This allows us to avoid producing and buffering massive amounts of data without giving
/// the timely dataflow system a chance to run operators that can consume and aggregate the data.
struct Deferred<C1, C2, D>
where
    C1: Cursor<Diff = Diff>,
    C2: for<'a> Cursor<Key<'a> = C1::Key<'a>, Time = C1::Time, Diff = Diff>,
{
    cursor1: C1,
    storage1: C1::Storage,
    cursor2: C2,
    storage2: C2::Storage,
    capability: Capability<C1::Time>,
    done: bool,
    temp: Vec<(D, C1::Time, Diff)>,
}

impl<C1, C2, D> Deferred<C1, C2, D>
where
    C1: Cursor<Diff = Diff>,
    C2: for<'a> Cursor<Key<'a> = C1::Key<'a>, Time = C1::Time, Diff = Diff>,
    D: Data,
{
    fn new(
        cursor1: C1,
        storage1: C1::Storage,
        cursor2: C2,
        storage2: C2::Storage,
        capability: Capability<C1::Time>,
    ) -> Self {
        Deferred {
            cursor1,
            storage1,
            cursor2,
            storage2,
            capability,
            done: false,
            temp: Vec::new(),
        }
    }

    fn work_remains(&self) -> bool {
        !self.done
    }

    /// Process keys until `yield_fn` indicates that enough output tuples have been produced, or
    /// the work is exhausted.
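    ///
    /// The cursors are advanced in merge-join fashion: each is sought forward to the other's
    /// current key, and for every matching key the cross product of values is passed to `logic`,
    /// with the corresponding times joined and diffs multiplied. Results are staged in `self.temp`,
    /// consolidated, and flushed to the output session whenever `yield_fn` asks us to stop, and
    /// once more when the work is exhausted.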
    fn work<L, I, YFn, C>(
        &mut self,
        output: &mut OutputHandleCore<C1::Time, CapacityContainerBuilder<C>, Tee<C1::Time, C>>,
        mut logic: L,
        yield_fn: YFn,
        work: &mut usize,
    ) where
        I: IntoIterator<Item = D>,
        L: FnMut(C1::Key<'_>, C1::Val<'_>, C2::Val<'_>) -> I,
        YFn: Fn(usize) -> bool,
        C: SizableContainer + PushInto<(D, C1::Time, Diff)> + Data,
    {
        let meet = self.capability.time();

        let mut session = output.session(&self.capability);

        let storage1 = &self.storage1;
        let storage2 = &self.storage2;

        let cursor1 = &mut self.cursor1;
        let cursor2 = &mut self.cursor2;

        let temp = &mut self.temp;

        let flush = |data: &mut Vec<_>, session: &mut Session<_, _, _>| {
            let old_len = data.len();
            // Consolidating here is important when the join closure produces data that
            // consolidates well, for example when projecting columns.
            consolidate_updates(data);
            let recovered = old_len - data.len();
            session.give_iterator(data.drain(..));
            recovered
        };

        assert_eq!(temp.len(), 0);

        let mut buffer = Vec::default();

        while cursor1.key_valid(storage1) && cursor2.key_valid(storage2) {
            match cursor1.key(storage1).cmp(&cursor2.key(storage2)) {
                Ordering::Less => cursor1.seek_key(storage1, cursor2.key(storage2)),
                Ordering::Greater => cursor2.seek_key(storage2, cursor1.key(storage1)),
                Ordering::Equal => {
                    // Populate `temp` with the results, until we should yield.
                    let key = cursor2.key(storage2);
                    while let Some(val1) = cursor1.get_val(storage1) {
                        while let Some(val2) = cursor2.get_val(storage2) {
                            // Evaluate logic on `key, val1, val2`. Note the absence of time and diff.
                            let mut result = logic(key, val1, val2).into_iter().peekable();

                            // We can only produce output if the result returns something.
                            if let Some(first) = result.next() {
                                // Join times.
                                cursor1.map_times(storage1, |time1, diff1| {
                                    let mut time1 = C1::owned_time(time1);
                                    time1.join_assign(meet);
                                    let diff1 = C1::owned_diff(diff1);
                                    cursor2.map_times(storage2, |time2, diff2| {
                                        let mut time2 = C2::owned_time(time2);
                                        time2.join_assign(&time1);
                                        let diff = diff1.multiply(&C2::owned_diff(diff2));
                                        buffer.push((time2, diff));
                                    });
                                });
                                consolidate(&mut buffer);

                                // Special case no results, one result, and potentially many results
                                match (result.peek().is_some(), buffer.len()) {
                                    // Certainly no output
                                    (_, 0) => {}
                                    // Single element, single time
                                    (false, 1) => {
                                        let (time, diff) = buffer.pop().unwrap();
                                        temp.push((first, time, diff));
                                    }
                                    // Multiple elements or multiple times
                                    (_, _) => {
                                        for d in std::iter::once(first).chain(result) {
                                            temp.extend(buffer.iter().map(|(time, diff)| {
                                                (d.clone(), time.clone(), diff.clone())
                                            }))
                                        }
                                    }
                                }
                                buffer.clear();
                            }
                            cursor2.step_val(storage2);
                        }
                        cursor1.step_val(storage1);
                        cursor2.rewind_vals(storage2);

                        *work = work.saturating_add(temp.len());

                        if yield_fn(*work) {
                            // Returning here is only allowed because we leave the cursors in a
                            // state that will let us pick up the work correctly on the next
                            // invocation.
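                            // `flush` consolidates `temp` before emitting, so credit back any
                            // updates that consolidation removed; `work` should reflect what we
                            // actually sent downstream.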
                            *work -= flush(temp, &mut session);
                            if yield_fn(*work) {
                                return;
                            }
                        }
                    }

                    cursor1.step_key(storage1);
                    cursor2.step_key(storage2);
                }
            }
        }

        if !temp.is_empty() {
            *work -= flush(temp, &mut session);
        }

        // We only get here after having iterated through all keys.
        self.done = true;
    }
}