mz_compute/render/join/mz_join_core.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! A fork of DD's `JoinCore::join_core`.
//!
//! Currently, compute rendering knows two implementations for linear joins:
//!
//!  * Differential's `JoinCore::join_core`
//!  * A Materialize fork thereof, called `mz_join_core`
//!
//! `mz_join_core` exists to solve a responsiveness problem with the DD implementation.
//! DD's join is only able to yield between keys. When computing a large cross-join or a highly
//! skewed join, this can result in loss of interactivity when the join operator refuses to yield
//! control for multiple seconds or longer, which in turn causes degraded user experience.
//!
//! `mz_join_core` currently fixes the yielding issue by omitting the merge-join matching strategy
//! implemented in DD's join implementation. This leaves only the nested-loop strategy, for which
//! it is easy to implement yielding within keys.
//!
//! While `mz_join_core` retains responsiveness in the face of cross-joins, it is also, due to its
//! sole reliance on nested-loop matching, significantly slower than DD's join for workloads that
//! have a large number of edits at different times. We consider these niche workloads for
//! Materialize today, due to the way source ingestion works, but that might change in the future.
//!
//! For the moment, we keep both implementations around, selectable through a feature flag.
//! We expect `mz_join_core` to be more useful in Materialize today, but being able to fall back to
//! DD's implementation provides a safety net in case that assumption is wrong.
//!
//! In the mid-term, we want to arrive at a single join implementation that is as efficient as
//! DD's join and as responsive as `mz_join_core`. Whether that means adding merge-join matching
//! to `mz_join_core` or adding better fueling to DD's join implementation is still TBD.
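//!
//! To illustrate why nested-loop matching makes within-key yielding easy, here is a simplified
//! sketch of the inner loop (illustrative pseudocode only, not the actual implementation below):
//!
//! ```ignore
//! for val1 in vals1 {
//!     for val2 in vals2 {
//!         emit(result(key, val1, val2));
//!         produced += 1;
//!         // The yield condition can be checked after every produced tuple, even in the
//!         // middle of a single key's matches; the cursors record where to resume.
//!         if yield_fn(start, produced) {
//!             return; // reschedule the operator and continue here later
//!         }
//!     }
//! }
//! ```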

use std::cmp::Ordering;
use std::collections::VecDeque;
use std::time::Instant;

use differential_dataflow::Data;
use differential_dataflow::IntoOwned;
use differential_dataflow::consolidation::{consolidate, consolidate_updates};
use differential_dataflow::difference::Multiply;
use differential_dataflow::lattice::Lattice;
use differential_dataflow::operators::arrange::arrangement::Arranged;
use differential_dataflow::trace::{BatchReader, Cursor, TraceReader};
use mz_repr::Diff;
use timely::PartialOrder;
use timely::container::{CapacityContainerBuilder, PushInto, SizableContainer};
use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::channels::pushers::Tee;
use timely::dataflow::channels::pushers::buffer::Session;
use timely::dataflow::operators::generic::OutputHandleCore;
use timely::dataflow::operators::{Capability, Operator};
use timely::dataflow::{Scope, StreamCore};
use timely::progress::timestamp::Timestamp;
use tracing::trace;

use crate::render::context::ShutdownToken;

/// Joins two arranged collections with the same key type.
///
/// Each matching pair of records `(key, val1)` and `(key, val2)` is subjected to the `result` function,
/// which produces something implementing `IntoIterator`, where the output collection will have an entry
/// for every value returned by the iterator.
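///
/// A minimal sketch of how a caller might supply `result` and `yield_fn` (the closures and
/// constants here are illustrative assumptions, not taken from actual call sites):
///
/// ```ignore
/// let joined = mz_join_core(
///     &arranged1,
///     &arranged2,
///     shutdown_token,
///     // Emit the pair of values for every key match.
///     |_key, val1, val2| Some((val1.to_owned(), val2.to_owned())),
///     // Yield after 100ms of work or 1M produced tuples, whichever comes first.
///     |start, work| start.elapsed() > Duration::from_millis(100) || work > 1_000_000,
/// );
/// ```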
pub(super) fn mz_join_core<G, Tr1, Tr2, L, I, YFn, C>(
    arranged1: &Arranged<G, Tr1>,
    arranged2: &Arranged<G, Tr2>,
    shutdown_token: ShutdownToken,
    mut result: L,
    yield_fn: YFn,
) -> StreamCore<G, C>
where
    G: Scope,
    G::Timestamp: Lattice,
    Tr1: TraceReader<Time = G::Timestamp, Diff = Diff> + Clone + 'static,
    Tr2: for<'a> TraceReader<Key<'a> = Tr1::Key<'a>, Time = G::Timestamp, Diff = Diff>
        + Clone
        + 'static,
    L: FnMut(Tr1::Key<'_>, Tr1::Val<'_>, Tr2::Val<'_>) -> I + 'static,
    I: IntoIterator,
    I::Item: Data,
    YFn: Fn(Instant, usize) -> bool + 'static,
    C: SizableContainer + PushInto<(I::Item, G::Timestamp, Diff)> + Data,
{
    let mut trace1 = arranged1.trace.clone();
    let mut trace2 = arranged2.trace.clone();

    arranged1.stream.binary_frontier(
        &arranged2.stream,
        Pipeline,
        Pipeline,
        "Join",
        move |capability, info| {
            let operator_id = info.global_id;

            // Acquire an activator to reschedule the operator when it has unfinished work.
            let activator = arranged1.stream.scope().activator_for(info.address);

            // Our initial invariants are that for each trace, physical compaction is less than or equal
            // to the trace's upper bound. These invariants ensure that we can reference observed batch
            // frontiers from `_start_upper` onward, as long as we maintain our physical compaction
            // capabilities appropriately. These assertions are tested as we load up the initial work
            // for the two traces, and before the operator is constructed.

            // Acknowledged frontier for each input.
            // These two are used exclusively to track batch boundaries on which we may want/need to call
            // `cursor_through`. They will drive our physical compaction of each trace, and we want to
            // maintain at all times that each is beyond the physical compaction frontier of its
            // corresponding trace.
            // Should we ever *drop* a trace, these are 1. much harder to maintain correctly, but 2. no longer used.
            use timely::progress::frontier::Antichain;
            let mut acknowledged1 = Antichain::from_elem(<G::Timestamp>::minimum());
            let mut acknowledged2 = Antichain::from_elem(<G::Timestamp>::minimum());

            // Deferred work of batches from each input.
            let mut todo1 = VecDeque::new();
            let mut todo2 = VecDeque::new();

            // We'll unload the initial batches here, to start from a more deterministic state.
            trace1.map_batches(|batch1| {
                trace!(
                    operator_id,
                    input = 1,
                    lower = ?batch1.lower().elements(),
                    upper = ?batch1.upper().elements(),
                    size = batch1.len(),
                    "pre-loading batch",
                );

                acknowledged1.clone_from(batch1.upper());
                // No `todo1` work here, because we haven't accepted any batches from `trace2` yet.
                // It is effectively "empty", because we choose to drain `trace1` before `trace2`.
                // Once we start streaming batches in, we will need to respond to new batches from
                // `input1` with logic that would have otherwise been here. Check out the next loop
                // for the structure.
            });
            // At this point, `acknowledged1` should exactly equal `trace1.read_upper()`, as they are both
            // determined by iterating through batches and capturing the upper bound. This is a great moment
            // to assert that `trace1`'s physical compaction frontier is before the frontier of completed
            // times in `trace1`.
            // TODO: in the case that this does not hold, instead "upgrade" the physical compaction frontier.
            assert!(PartialOrder::less_equal(
                &trace1.get_physical_compaction(),
                &acknowledged1.borrow()
            ));

            trace!(
                operator_id,
                input = 1,
                acknowledged1 = ?acknowledged1.elements(),
                "pre-loading finished",
            );

            // We capture batch2 cursors first and establish work second to avoid taking a `RefCell` lock
            // on both traces at the same time, as they could be the same trace and this would panic.
            let mut batch2_cursors = Vec::new();
            trace2.map_batches(|batch2| {
                trace!(
                    operator_id,
                    input = 2,
                    lower = ?batch2.lower().elements(),
                    upper = ?batch2.upper().elements(),
                    size = batch2.len(),
                    "pre-loading batch",
                );

                acknowledged2.clone_from(batch2.upper());
                batch2_cursors.push((batch2.cursor(), batch2.clone()));
            });
            // At this point, `acknowledged2` should exactly equal `trace2.read_upper()`, as they are both
            // determined by iterating through batches and capturing the upper bound. This is a great moment
            // to assert that `trace2`'s physical compaction frontier is before the frontier of completed
            // times in `trace2`.
            // TODO: in the case that this does not hold, instead "upgrade" the physical compaction frontier.
            assert!(PartialOrder::less_equal(
                &trace2.get_physical_compaction(),
                &acknowledged2.borrow()
            ));

            // Load up deferred work using trace2 cursors and batches captured just above.
            for (batch2_cursor, batch2) in batch2_cursors.into_iter() {
                trace!(
                    operator_id,
                    input = 2,
                    acknowledged1 = ?acknowledged1.elements(),
                    "deferring work for batch",
                );

                // It is safe to ask for `acknowledged1` because we have confirmed it to be in advance
                // of `trace1`'s physical compaction frontier.
                let (trace1_cursor, trace1_storage) =
                    trace1.cursor_through(acknowledged1.borrow()).unwrap();
                // We could downgrade the capability here, but doing so is a bit complicated mathematically.
                // TODO: downgrade the capability by searching out the one time in `batch2.lower()` and not
                // in `batch2.upper()`. Only necessary for non-empty batches, as empty batches may not have
                // that property.
                todo2.push_back(Deferred::new(
                    trace1_cursor,
                    trace1_storage,
                    batch2_cursor,
                    batch2.clone(),
                    capability.clone(),
                ));
            }

            trace!(
                operator_id,
                input = 2,
                acknowledged2 = ?acknowledged2.elements(),
                "pre-loading finished",
            );

            // Droppable handles to shared trace data structures.
            let mut trace1_option = Some(trace1);
            let mut trace2_option = Some(trace2);

            move |input1, input2, output| {
                // If the dataflow is shutting down, discard all existing and future work.
                if shutdown_token.in_shutdown() {
                    trace!(operator_id, "shutting down");

                    // Discard data at the inputs.
                    input1.for_each(|_cap, _data| ());
                    input2.for_each(|_cap, _data| ());

                    // Discard queued work.
                    todo1 = Default::default();
                    todo2 = Default::default();

                    // Stop holding on to input traces.
                    trace1_option = None;
                    trace2_option = None;

                    return;
                }

                // 1. Consuming input.
                //
                // The join computation repeatedly accepts batches of updates from each of its inputs.
                //
                // For each accepted batch, it prepares a work-item to join the batch against previously "accepted"
                // updates from its other input. It is important to track which updates have been accepted, because
                // we use a shared trace and there may be updates present that are in advance of this accepted bound.
                //
                // Batches are accepted: 1. in bulk at start-up (above), 2. as we observe them in the input stream,
                // and 3. if the trace can confirm a region of empty space directly following our accepted bound.
                // This last case is a consequence of our inability to transmit empty batches, as they may be formed
                // in the absence of timely dataflow capabilities.

                // Drain input 1, prepare work.
                input1.for_each(|capability, data| {
                    let trace2 = trace2_option
                        .as_mut()
                        .expect("we only drop a trace in response to the other input emptying");
                    let capability = capability.retain();
                    for batch1 in data.drain(..) {
                        // Ignore any pre-loaded data.
                        if PartialOrder::less_equal(&acknowledged1, batch1.lower()) {
                            trace!(
                                operator_id,
                                input = 1,
                                lower = ?batch1.lower().elements(),
                                upper = ?batch1.upper().elements(),
                                size = batch1.len(),
                                "loading batch",
                            );

                            if !batch1.is_empty() {
                                trace!(
                                    operator_id,
                                    input = 1,
                                    acknowledged2 = ?acknowledged2.elements(),
                                    "deferring work for batch",
                                );

                                // It is safe to ask for `acknowledged2` as we validated that it was at least
                                // `get_physical_compaction()` at start-up, and have held back physical
                                // compaction ever since.
                                let (trace2_cursor, trace2_storage) =
                                    trace2.cursor_through(acknowledged2.borrow()).unwrap();
                                let batch1_cursor = batch1.cursor();
                                todo1.push_back(Deferred::new(
                                    batch1_cursor,
                                    batch1.clone(),
                                    trace2_cursor,
                                    trace2_storage,
                                    capability.clone(),
                                ));
                            }

                            // To update `acknowledged1` we might presume that `batch1.lower` should equal it,
                            // but we may have skipped over empty batches. Still, the batches are in-order, and
                            // we can just adopt the most recent `batch1.upper`.
                            debug_assert!(PartialOrder::less_equal(&acknowledged1, batch1.upper()));
                            acknowledged1.clone_from(batch1.upper());

                            trace!(
                                operator_id,
                                input = 1,
                                acknowledged1 = ?acknowledged1.elements(),
                                "batch acknowledged",
                            );
                        }
                    }
                });

                // Drain input 2, prepare work.
                input2.for_each(|capability, data| {
                    let trace1 = trace1_option
                        .as_mut()
                        .expect("we only drop a trace in response to the other input emptying");
                    let capability = capability.retain();
                    for batch2 in data.drain(..) {
                        // Ignore any pre-loaded data.
                        if PartialOrder::less_equal(&acknowledged2, batch2.lower()) {
                            trace!(
                                operator_id,
                                input = 2,
                                lower = ?batch2.lower().elements(),
                                upper = ?batch2.upper().elements(),
                                size = batch2.len(),
                                "loading batch",
                            );

                            if !batch2.is_empty() {
                                trace!(
                                    operator_id,
                                    input = 2,
                                    acknowledged1 = ?acknowledged1.elements(),
                                    "deferring work for batch",
                                );

                                // It is safe to ask for `acknowledged1` as we validated that it was at least
                                // `get_physical_compaction()` at start-up, and have held back physical
                                // compaction ever since.
                                let (trace1_cursor, trace1_storage) =
                                    trace1.cursor_through(acknowledged1.borrow()).unwrap();
                                let batch2_cursor = batch2.cursor();
                                todo2.push_back(Deferred::new(
                                    trace1_cursor,
                                    trace1_storage,
                                    batch2_cursor,
                                    batch2.clone(),
                                    capability.clone(),
                                ));
                            }

                            // To update `acknowledged2` we might presume that `batch2.lower` should equal it,
                            // but we may have skipped over empty batches. Still, the batches are in-order, and
                            // we can just adopt the most recent `batch2.upper`.
                            debug_assert!(PartialOrder::less_equal(&acknowledged2, batch2.upper()));
                            acknowledged2.clone_from(batch2.upper());

                            trace!(
                                operator_id,
                                input = 2,
                                acknowledged2 = ?acknowledged2.elements(),
                                "batch acknowledged",
                            );
                        }
                    }
                });

                // Advance acknowledged frontiers through any empty regions that we may not receive as batches.
                if let Some(trace1) = trace1_option.as_mut() {
                    trace!(
                        operator_id,
                        input = 1,
                        acknowledged1 = ?acknowledged1.elements(),
                        "advancing trace upper",
                    );
                    trace1.advance_upper(&mut acknowledged1);
                }
                if let Some(trace2) = trace2_option.as_mut() {
                    trace!(
                        operator_id,
                        input = 2,
                        acknowledged2 = ?acknowledged2.elements(),
                        "advancing trace upper",
                    );
                    trace2.advance_upper(&mut acknowledged2);
                }


                // 2. Join computation.
                //
                // For each of the inputs, we do some amount of work (measured in terms of the number
                // of output records produced). This is meant to yield control to allow downstream
                // operators to consume and reduce the output, but it is also meant to provide some
                // degree of responsiveness. There is a potential risk here that if we fall behind,
                // the growing queues hold back physical compaction of the underlying traces, which
                // results in unintentionally quadratic processing time (each batch of either input
                // must scan all batches from the other input).

                // Perform some amount of outstanding work.
                trace!(
                    operator_id,
                    input = 1,
                    work_left = todo1.len(),
                    "starting work",
                );

                let start_time = Instant::now();
                let mut work = 0;
                while !todo1.is_empty() && !yield_fn(start_time, work) {
                    todo1.front_mut().unwrap().work(
                        output,
                        &mut result,
                        |w| yield_fn(start_time, w),
                        &mut work,
                    );
                    if !todo1.front().unwrap().work_remains() {
                        todo1.pop_front();
                    }
                }

                trace!(
                    operator_id,
                    input = 1,
                    work_left = todo1.len(),
                    work_done = work,
                    elapsed = ?start_time.elapsed(),
                    "ceasing work",
                );

                // Perform some amount of outstanding work.
                trace!(
                    operator_id,
                    input = 2,
                    work_left = todo2.len(),
                    "starting work",
                );

                let start_time = Instant::now();
                let mut work = 0;
                while !todo2.is_empty() && !yield_fn(start_time, work) {
                    todo2.front_mut().unwrap().work(
                        output,
                        &mut result,
                        |w| yield_fn(start_time, w),
                        &mut work,
                    );
                    if !todo2.front().unwrap().work_remains() {
                        todo2.pop_front();
                    }
                }

                trace!(
                    operator_id,
                    input = 2,
                    work_left = todo2.len(),
                    work_done = work,
                    elapsed = ?start_time.elapsed(),
                    "ceasing work",
                );

                // Re-activate the operator if work remains.
                if !todo1.is_empty() || !todo2.is_empty() {
                    activator.activate();
                }

                // 3. Trace maintenance.
                //
                // Importantly, we use `input.frontier()` here rather than `acknowledged` to track
                // the progress of an input, because should we ever drop one of the traces we will
                // lose the ability to extract information from anything other than the input.
                // For example, if we dropped `trace2` we would not be able to use `advance_upper`
                // to keep `acknowledged2` up to date with respect to empty batches, and would hold
                // back logical compaction of `trace1`.

                // Maintain `trace1`. Drop if `input2` is empty, or advance based on future needs.
                if let Some(trace1) = trace1_option.as_mut() {
                    if input2.frontier().is_empty() {
                        trace!(operator_id, input = 1, "dropping trace handle");
                        trace1_option = None;
                    } else {
                        trace!(
                            operator_id,
                            input = 1,
                            logical = ?*input2.frontier().frontier(),
                            physical = ?acknowledged1.elements(),
                            "advancing trace compaction",
                        );

                        // Allow `trace1` to compact logically up to the frontier we may yet receive
                        // in the opposing input (`input2`). All `input2` times will be beyond this
                        // frontier, and joined times only need to be accurate when advanced to it.
                        trace1.set_logical_compaction(input2.frontier().frontier());
                        // Allow `trace1` to compact physically up to the upper bound of batches we
                        // have received in its input (`input1`). We will not require a cursor that
                        // is not beyond this bound.
                        trace1.set_physical_compaction(acknowledged1.borrow());
                    }
                }

                // Maintain `trace2`. Drop if `input1` is empty, or advance based on future needs.
                if let Some(trace2) = trace2_option.as_mut() {
                    if input1.frontier().is_empty() {
                        trace!(operator_id, input = 2, "dropping trace handle");
                        trace2_option = None;
                    } else {
                        trace!(
                            operator_id,
                            input = 2,
                            logical = ?*input1.frontier().frontier(),
                            physical = ?acknowledged2.elements(),
                            "advancing trace compaction",
                        );

                        // Allow `trace2` to compact logically up to the frontier we may yet receive
                        // in the opposing input (`input1`). All `input1` times will be beyond this
                        // frontier, and joined times only need to be accurate when advanced to it.
                        trace2.set_logical_compaction(input1.frontier().frontier());
                        // Allow `trace2` to compact physically up to the upper bound of batches we
                        // have received in its input (`input2`). We will not require a cursor that
                        // is not beyond this bound.
                        trace2.set_physical_compaction(acknowledged2.borrow());
                    }
                }
            }
        },
    )
}

/// Deferred join computation.
///
/// The structure wraps cursors which allow us to play out the join computation at whatever rate
/// we like. This allows us to avoid producing and buffering massive amounts of data all at once,
/// which would deny the timely dataflow system a chance to run operators that can consume and
/// aggregate the data.
struct Deferred<T, C1, C2, D>
where
    T: Timestamp,
    C1: Cursor<Time = T, Diff = Diff>,
    C2: for<'a> Cursor<Key<'a> = C1::Key<'a>, Time = T, Diff = Diff>,
{
    cursor1: C1,
    storage1: C1::Storage,
    cursor2: C2,
    storage2: C2::Storage,
    capability: Capability<T>,
    done: bool,
    temp: Vec<(D, T, Diff)>,
}

impl<T, C1, C2, D> Deferred<T, C1, C2, D>
where
    T: Timestamp + Lattice,
    C1: Cursor<Time = T, Diff = Diff>,
    C2: for<'a> Cursor<Key<'a> = C1::Key<'a>, Time = T, Diff = Diff>,
    D: Data,
{
    fn new(
        cursor1: C1,
        storage1: C1::Storage,
        cursor2: C2,
        storage2: C2::Storage,
        capability: Capability<T>,
    ) -> Self {
        Deferred {
            cursor1,
            storage1,
            cursor2,
            storage2,
            capability,
            done: false,
            temp: Vec::new(),
        }
    }

    fn work_remains(&self) -> bool {
        !self.done
    }

    /// Process keys until `yield_fn` reports that enough output tuples have been produced, or the
    /// work is exhausted.
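    ///
    /// Produced tuples are accumulated in `self.temp` and flushed to the output session after
    /// consolidation; `work` counts the tuples produced so far, and is what the caller's
    /// `yield_fn` gets to inspect.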
    fn work<L, I, YFn, C>(
        &mut self,
        output: &mut OutputHandleCore<T, CapacityContainerBuilder<C>, Tee<T, C>>,
        mut logic: L,
        yield_fn: YFn,
        work: &mut usize,
    ) where
        I: IntoIterator<Item = D>,
        L: FnMut(C1::Key<'_>, C1::Val<'_>, C2::Val<'_>) -> I,
        YFn: Fn(usize) -> bool,
        C: SizableContainer + PushInto<(D, T, Diff)> + Data,
    {
        let meet = self.capability.time();

        let mut session = output.session(&self.capability);

        let storage1 = &self.storage1;
        let storage2 = &self.storage2;

        let cursor1 = &mut self.cursor1;
        let cursor2 = &mut self.cursor2;

        let temp = &mut self.temp;

        let flush = |data: &mut Vec<_>, session: &mut Session<_, _, _>| {
            let old_len = data.len();
            // Consolidating here is important when the join closure produces data that
            // consolidates well, for example when projecting columns.
            consolidate_updates(data);
            let recovered = old_len - data.len();
            session.give_iterator(data.drain(..));
            recovered
        };

        assert_eq!(temp.len(), 0);

        let mut buffer = Vec::default();

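        // Merge the two cursors over keys: seek the cursor with the smaller key forward until
        // the keys match, then run the nested-loop product over the values for that key.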
        while cursor1.key_valid(storage1) && cursor2.key_valid(storage2) {
            match cursor1.key(storage1).cmp(&cursor2.key(storage2)) {
                Ordering::Less => cursor1.seek_key(storage1, cursor2.key(storage2)),
                Ordering::Greater => cursor2.seek_key(storage2, cursor1.key(storage1)),
                Ordering::Equal => {
                    // Populate `temp` with the results, until we should yield.
                    let key = cursor2.key(storage2);
                    while let Some(val1) = cursor1.get_val(storage1) {
                        while let Some(val2) = cursor2.get_val(storage2) {
                            // Evaluate logic on `key, val1, val2`. Note the absence of time and diff.
                            let mut result = logic(key, val1, val2).into_iter().peekable();

                            // We can only produce output if the result returns something.
                            if let Some(first) = result.next() {
                                // Join times.
                                cursor1.map_times(storage1, |time1, diff1| {
                                    let mut time1 = time1.into_owned();
                                    time1.join_assign(meet);
                                    let diff1 = diff1.into_owned();
                                    cursor2.map_times(storage2, |time2, diff2| {
                                        let mut time2 = time2.into_owned();
                                        time2.join_assign(&time1);
                                        let diff = diff1.multiply(&diff2.into_owned());
                                        buffer.push((time2, diff));
                                    });
                                });
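                                // `buffer` now holds one update per pair of times: the time is
                                // the lattice join of `time1` (advanced to the capability) and
                                // `time2`, and the diff is the product of the two diffs.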
                                consolidate(&mut buffer);

                                // Special-case no results, one result, and potentially many results.
                                match (result.peek().is_some(), buffer.len()) {
                                    // Certainly no output.
                                    (_, 0) => {}
                                    // Single element, single time.
                                    (false, 1) => {
                                        let (time, diff) = buffer.pop().unwrap();
                                        temp.push((first, time, diff));
                                    }
                                    // Multiple elements or multiple times.
                                    (_, _) => {
                                        for d in std::iter::once(first).chain(result) {
                                            temp.extend(buffer.iter().map(|(time, diff)| {
                                                (d.clone(), time.clone(), diff.clone())
                                            }))
                                        }
                                    }
                                }
                                buffer.clear();
                            }
                            cursor2.step_val(storage2);
                        }
                        cursor1.step_val(storage1);
                        cursor2.rewind_vals(storage2);

                        *work = work.saturating_add(temp.len());

                        if yield_fn(*work) {
                            // Returning here is only allowed because we leave the cursors in a
                            // state that will let us pick up the work correctly on the next
                            // invocation.
                            *work -= flush(temp, &mut session);
                            if yield_fn(*work) {
                                return;
                            }
                        }
                    }

                    cursor1.step_key(storage1);
                    cursor2.step_key(storage2);
                }
            }
        }

        if !temp.is_empty() {
            *work -= flush(temp, &mut session);
        }

        // We only get here after having iterated through all keys.
        self.done = true;
    }
}