mz_compute/render/join/mz_join_core.rs
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! A fork of DD's `JoinCore::join_core`.
//!
//! Currently, compute rendering knows two implementations for linear joins:
//!
//! * Differential's `JoinCore::join_core`
//! * A Materialize fork thereof, called `mz_join_core`
//!
//! `mz_join_core` exists to solve a responsiveness problem with the DD implementation.
//! DD's join is only able to yield between keys. When computing a large cross-join or a highly
//! skewed join, this can result in loss of interactivity when the join operator refuses to yield
//! control for multiple seconds or longer, which in turn causes degraded user experience.
//!
//! `mz_join_core` currently fixes the yielding issue by omitting the merge-join matching strategy
//! implemented in DD's join implementation. This leaves only the nested loop strategy for which it
//! is easy to implement yielding within keys.
//!
//! While `mz_join_core` retains responsiveness in the face of cross-joins, it is also, due to its
//! sole reliance on nested-loop matching, significantly slower than DD's join for workloads that
//! have a large number of edits at different times. We consider these niche workloads for
//! Materialize today, due to the way source ingestion works, but that might change in the future.
//!
//! For the moment, we keep both implementations around, selectable through a feature flag.
//! We expect `mz_join_core` to be more useful in Materialize today, but being able to fall back to
//! DD's implementation provides a safety net in case that assumption is wrong.
//!
//! In the mid-term, we want to arrive at a single join implementation that is as efficient as DD's
//! join and as responsive as `mz_join_core`. Whether that means adding merge-join matching to
//! `mz_join_core` or adding better fueling to DD's join implementation is still TBD.
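//!
//! To make the yielding contract concrete: the `yield_fn` passed to `mz_join_core` receives the
//! `Instant` at which the current round of work started and the number of output tuples produced
//! so far, and returns whether the operator should stop and yield control. A minimal sketch of
//! such a policy (the thresholds below are made up for illustration and are not the values
//! Materialize configures):
//!
//! ```ignore
//! use std::time::{Duration, Instant};
//!
//! // Hypothetical yield policy: stop after 1M produced tuples or 100ms of work.
//! let yield_fn = |start: Instant, work: usize| {
//!     work >= 1_000_000 || start.elapsed() >= Duration::from_millis(100)
//! };
//! ```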

use std::cmp::Ordering;
use std::collections::VecDeque;
use std::time::Instant;

use differential_dataflow::Data;
use differential_dataflow::IntoOwned;
use differential_dataflow::consolidation::{consolidate, consolidate_updates};
use differential_dataflow::difference::Multiply;
use differential_dataflow::lattice::Lattice;
use differential_dataflow::operators::arrange::arrangement::Arranged;
use differential_dataflow::trace::{BatchReader, Cursor, TraceReader};
use mz_repr::Diff;
use timely::PartialOrder;
use timely::container::{CapacityContainerBuilder, PushInto, SizableContainer};
use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::channels::pushers::Tee;
use timely::dataflow::channels::pushers::buffer::Session;
use timely::dataflow::operators::generic::OutputHandleCore;
use timely::dataflow::operators::{Capability, Operator};
use timely::dataflow::{Scope, StreamCore};
use timely::progress::timestamp::Timestamp;
use tracing::trace;

use crate::render::context::ShutdownToken;

/// Joins two arranged collections with the same key type.
///
/// Each matching pair of records `(key, val1)` and `(key, val2)` is subjected to the `result` function,
/// which produces something implementing `IntoIterator`, where the output collection will have an entry
/// for every value returned by the iterator.
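///
/// Schematically (types and ownership details elided; this is a hypothetical sketch of the closure
/// shape, not code from an actual caller), `result` sees a matched key together with one value from
/// each input and returns an iterator of output records:
///
/// ```ignore
/// // Hypothetical result closure: emit one output row per matched (val1, val2) pair.
/// let result = |_key, val1, val2| Some((val1.into_owned(), val2.into_owned()));
/// ```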
pub(super) fn mz_join_core<G, Tr1, Tr2, L, I, YFn, C>(
    arranged1: &Arranged<G, Tr1>,
    arranged2: &Arranged<G, Tr2>,
    shutdown_token: ShutdownToken,
    mut result: L,
    yield_fn: YFn,
) -> StreamCore<G, C>
where
    G: Scope,
    G::Timestamp: Lattice,
    Tr1: TraceReader<Time = G::Timestamp, Diff = Diff> + Clone + 'static,
    Tr2: for<'a> TraceReader<Key<'a> = Tr1::Key<'a>, Time = G::Timestamp, Diff = Diff>
        + Clone
        + 'static,
    L: FnMut(Tr1::Key<'_>, Tr1::Val<'_>, Tr2::Val<'_>) -> I + 'static,
    I: IntoIterator,
    I::Item: Data,
    YFn: Fn(Instant, usize) -> bool + 'static,
    C: SizableContainer + PushInto<(I::Item, G::Timestamp, Diff)> + Data,
{
    let mut trace1 = arranged1.trace.clone();
    let mut trace2 = arranged2.trace.clone();

    arranged1.stream.binary_frontier(
        &arranged2.stream,
        Pipeline,
        Pipeline,
        "Join",
        move |capability, info| {
            let operator_id = info.global_id;

            // Acquire an activator to reschedule the operator when it has unfinished work.
            let activator = arranged1.stream.scope().activator_for(info.address);

            // Our initial invariants are that for each trace, physical compaction is less than or equal to
            // the trace's upper bound. These invariants ensure that we can reference observed batch frontiers
            // from the initial acknowledged frontiers onward, as long as we maintain our physical compaction
            // capabilities appropriately. These assertions are tested as we load up the initial work for the
            // two traces, and before the operator is constructed.

            // Acknowledged frontier for each input.
            // These two are used exclusively to track batch boundaries on which we may want/need to call `cursor_through`.
            // They will drive our physical compaction of each trace, and we want to maintain at all times that each is beyond
            // the physical compaction frontier of their corresponding trace.
            // Should we ever *drop* a trace, these are 1. much harder to maintain correctly, but 2. no longer used.
            use timely::progress::frontier::Antichain;
            let mut acknowledged1 = Antichain::from_elem(<G::Timestamp>::minimum());
            let mut acknowledged2 = Antichain::from_elem(<G::Timestamp>::minimum());

            // Deferred work of batches from each input.
            let mut todo1 = VecDeque::new();
            let mut todo2 = VecDeque::new();

            // We'll unload the initial batches here, to put ourselves in a more deterministic state to start.
            trace1.map_batches(|batch1| {
                trace!(
                    operator_id,
                    input = 1,
                    lower = ?batch1.lower().elements(),
                    upper = ?batch1.upper().elements(),
                    size = batch1.len(),
                    "pre-loading batch",
                );

                acknowledged1.clone_from(batch1.upper());
                // No `todo1` work here, because we haven't accepted anything from the second input yet.
                // It is effectively "empty", because we choose to drain `trace1` before `trace2`.
                // Once we start streaming batches in, we will need to respond to new batches from
                // `input1` with logic that would have otherwise been here. Check out the next loop
                // for the structure.
            });
            // At this point, `ack1` should exactly equal `trace1.read_upper()`, as they are both determined by
            // iterating through batches and capturing the upper bound. This is a great moment to assert that
            // `trace1`'s physical compaction frontier is before the frontier of completed times in `trace1`.
            // TODO: in the case that this does not hold, instead "upgrade" the physical compaction frontier.
            assert!(PartialOrder::less_equal(
                &trace1.get_physical_compaction(),
                &acknowledged1.borrow()
            ));

            trace!(
                operator_id,
                input = 1,
                acknowledged1 = ?acknowledged1.elements(),
                "pre-loading finished",
            );

            // We capture batch2 cursors first and establish work second to avoid taking a `RefCell` lock
            // on both traces at the same time, as they could be the same trace and this would panic.
            let mut batch2_cursors = Vec::new();
            trace2.map_batches(|batch2| {
                trace!(
                    operator_id,
                    input = 2,
                    lower = ?batch2.lower().elements(),
                    upper = ?batch2.upper().elements(),
                    size = batch2.len(),
                    "pre-loading batch",
                );

                acknowledged2.clone_from(batch2.upper());
                batch2_cursors.push((batch2.cursor(), batch2.clone()));
            });
            // At this point, `ack2` should exactly equal `trace2.read_upper()`, as they are both determined by
            // iterating through batches and capturing the upper bound. This is a great moment to assert that
            // `trace2`'s physical compaction frontier is before the frontier of completed times in `trace2`.
            // TODO: in the case that this does not hold, instead "upgrade" the physical compaction frontier.
            assert!(PartialOrder::less_equal(
                &trace2.get_physical_compaction(),
                &acknowledged2.borrow()
            ));

            // Load up deferred work using trace2 cursors and batches captured just above.
            for (batch2_cursor, batch2) in batch2_cursors.into_iter() {
                trace!(
                    operator_id,
                    input = 2,
                    acknowledged1 = ?acknowledged1.elements(),
                    "deferring work for batch",
                );

                // It is safe to ask for `ack1` because we have confirmed it to be in advance of `get_physical_compaction()`.
                let (trace1_cursor, trace1_storage) =
                    trace1.cursor_through(acknowledged1.borrow()).unwrap();
                // We could downgrade the capability here, but doing so is a bit complicated mathematically.
                // TODO: downgrade the capability by searching out the one time in `batch2.lower()` and not
                // in `batch2.upper()`. Only necessary for non-empty batches, as empty batches may not have
                // that property.
                todo2.push_back(Deferred::new(
                    trace1_cursor,
                    trace1_storage,
                    batch2_cursor,
                    batch2.clone(),
                    capability.clone(),
                ));
            }

            trace!(
                operator_id,
                input = 2,
                acknowledged2 = ?acknowledged2.elements(),
                "pre-loading finished",
            );

            // Droppable handles to shared trace data structures.
            let mut trace1_option = Some(trace1);
            let mut trace2_option = Some(trace2);

            move |input1, input2, output| {
                // If the dataflow is shutting down, discard all existing and future work.
                if shutdown_token.in_shutdown() {
                    trace!(operator_id, "shutting down");

                    // Discard data at the inputs.
                    input1.for_each(|_cap, _data| ());
                    input2.for_each(|_cap, _data| ());

                    // Discard queued work.
                    todo1 = Default::default();
                    todo2 = Default::default();

                    // Stop holding on to input traces.
                    trace1_option = None;
                    trace2_option = None;

                    return;
                }

                // 1. Consuming input.
                //
                // The join computation repeatedly accepts batches of updates from each of its inputs.
                //
                // For each accepted batch, it prepares a work-item to join the batch against previously "accepted"
                // updates from its other input. It is important to track which updates have been accepted, because
                // we use a shared trace and there may be updates present that are in advance of this accepted bound.
                //
                // Batches are accepted: 1. in bulk at start-up (above), 2. as we observe them in the input stream,
                // and 3. if the trace can confirm a region of empty space directly following our accepted bound.
                // This last case is a consequence of our inability to transmit empty batches, as they may be formed
                // in the absence of timely dataflow capabilities.

                // Drain input 1, prepare work.
                input1.for_each(|capability, data| {
                    let trace2 = trace2_option
                        .as_mut()
                        .expect("we only drop a trace in response to the other input emptying");
                    let capability = capability.retain();
                    for batch1 in data.drain(..) {
                        // Ignore any pre-loaded data.
                        if PartialOrder::less_equal(&acknowledged1, batch1.lower()) {
                            trace!(
                                operator_id,
                                input = 1,
                                lower = ?batch1.lower().elements(),
                                upper = ?batch1.upper().elements(),
                                size = batch1.len(),
                                "loading batch",
                            );

                            if !batch1.is_empty() {
                                trace!(
                                    operator_id,
                                    input = 1,
                                    acknowledged2 = ?acknowledged2.elements(),
                                    "deferring work for batch",
                                );

                                // It is safe to ask for `ack2` as we validated that it was at least `get_physical_compaction()`
                                // at start-up, and have held back physical compaction ever since.
                                let (trace2_cursor, trace2_storage) =
                                    trace2.cursor_through(acknowledged2.borrow()).unwrap();
                                let batch1_cursor = batch1.cursor();
                                todo1.push_back(Deferred::new(
                                    batch1_cursor,
                                    batch1.clone(),
                                    trace2_cursor,
                                    trace2_storage,
                                    capability.clone(),
                                ));
                            }

                            // To update `acknowledged1` we might presume that `batch1.lower` should equal it, but we
                            // may have skipped over empty batches. Still, the batches are in-order, and we can simply
                            // adopt the most recent `batch1.upper`.
                            debug_assert!(PartialOrder::less_equal(&acknowledged1, batch1.upper()));
                            acknowledged1.clone_from(batch1.upper());

                            trace!(
                                operator_id,
                                input = 1,
                                acknowledged1 = ?acknowledged1.elements(),
                                "batch acknowledged",
                            );
                        }
                    }
                });

                // Drain input 2, prepare work.
                input2.for_each(|capability, data| {
                    let trace1 = trace1_option
                        .as_mut()
                        .expect("we only drop a trace in response to the other input emptying");
                    let capability = capability.retain();
                    for batch2 in data.drain(..) {
                        // Ignore any pre-loaded data.
                        if PartialOrder::less_equal(&acknowledged2, batch2.lower()) {
                            trace!(
                                operator_id,
                                input = 2,
                                lower = ?batch2.lower().elements(),
                                upper = ?batch2.upper().elements(),
                                size = batch2.len(),
                                "loading batch",
                            );

                            if !batch2.is_empty() {
                                trace!(
                                    operator_id,
                                    input = 2,
                                    acknowledged1 = ?acknowledged1.elements(),
                                    "deferring work for batch",
                                );

                                // It is safe to ask for `ack1` as we validated that it was at least `get_physical_compaction()`
                                // at start-up, and have held back physical compaction ever since.
                                let (trace1_cursor, trace1_storage) =
                                    trace1.cursor_through(acknowledged1.borrow()).unwrap();
                                let batch2_cursor = batch2.cursor();
                                todo2.push_back(Deferred::new(
                                    trace1_cursor,
                                    trace1_storage,
                                    batch2_cursor,
                                    batch2.clone(),
                                    capability.clone(),
                                ));
                            }

                            // To update `acknowledged2` we might presume that `batch2.lower` should equal it, but we
                            // may have skipped over empty batches. Still, the batches are in-order, and we can simply
                            // adopt the most recent `batch2.upper`.
                            debug_assert!(PartialOrder::less_equal(&acknowledged2, batch2.upper()));
                            acknowledged2.clone_from(batch2.upper());

                            trace!(
                                operator_id,
                                input = 2,
                                acknowledged2 = ?acknowledged2.elements(),
                                "batch acknowledged",
                            );
                        }
                    }
                });

                // Advance acknowledged frontiers through any empty regions that we may not receive as batches.
                if let Some(trace1) = trace1_option.as_mut() {
                    trace!(
                        operator_id,
                        input = 1,
                        acknowledged1 = ?acknowledged1.elements(),
                        "advancing trace upper",
                    );
                    trace1.advance_upper(&mut acknowledged1);
                }
                if let Some(trace2) = trace2_option.as_mut() {
                    trace!(
                        operator_id,
                        input = 2,
                        acknowledged2 = ?acknowledged2.elements(),
                        "advancing trace upper",
                    );
                    trace2.advance_upper(&mut acknowledged2);
                }

                // 2. Join computation.
                //
                // For each of the inputs, we do some amount of work (measured in terms of number
                // of output records produced). This is meant to yield control to allow downstream
                // operators to consume and reduce the output, but it is also meant to provide some
                // degree of responsiveness. There is a potential risk here that if we fall behind
                // then the increasing queues hold back physical compaction of the underlying traces,
                // which results in unintentionally quadratic processing time (each batch of either
                // input must scan all batches from the other input).

                // Perform some amount of outstanding work.
                trace!(
                    operator_id,
                    input = 1,
                    work_left = todo1.len(),
                    "starting work",
                );

                let start_time = Instant::now();
                let mut work = 0;
                while !todo1.is_empty() && !yield_fn(start_time, work) {
                    todo1.front_mut().unwrap().work(
                        output,
                        &mut result,
                        |w| yield_fn(start_time, w),
                        &mut work,
                    );
                    if !todo1.front().unwrap().work_remains() {
                        todo1.pop_front();
                    }
                }

                trace!(
                    operator_id,
                    input = 1,
                    work_left = todo1.len(),
                    work_done = work,
                    elapsed = ?start_time.elapsed(),
                    "ceasing work",
                );

                // Perform some amount of outstanding work.
                trace!(
                    operator_id,
                    input = 2,
                    work_left = todo2.len(),
                    "starting work",
                );

                let start_time = Instant::now();
                let mut work = 0;
                while !todo2.is_empty() && !yield_fn(start_time, work) {
                    todo2.front_mut().unwrap().work(
                        output,
                        &mut result,
                        |w| yield_fn(start_time, w),
                        &mut work,
                    );
                    if !todo2.front().unwrap().work_remains() {
                        todo2.pop_front();
                    }
                }

                trace!(
                    operator_id,
                    input = 2,
                    work_left = todo2.len(),
                    work_done = work,
                    elapsed = ?start_time.elapsed(),
                    "ceasing work",
                );

                // Re-activate operator if work remains.
                if !todo1.is_empty() || !todo2.is_empty() {
                    activator.activate();
                }

                // 3. Trace maintenance.
                //
                // Importantly, we use `input.frontier()` here rather than `acknowledged` to track
                // the progress of an input, because should we ever drop one of the traces we will
                // lose the ability to extract information from anything other than the input.
                // For example, if we dropped `trace2` we would not be able to use `advance_upper`
                // to keep `acknowledged2` up to date wrt empty batches, and would hold back logical
                // compaction of `trace1`.

                // Maintain `trace1`. Drop if `input2` is empty, or advance based on future needs.
                if let Some(trace1) = trace1_option.as_mut() {
                    if input2.frontier().is_empty() {
                        trace!(operator_id, input = 1, "dropping trace handle");
                        trace1_option = None;
                    } else {
                        trace!(
                            operator_id,
                            input = 1,
                            logical = ?*input2.frontier().frontier(),
                            physical = ?acknowledged1.elements(),
                            "advancing trace compaction",
                        );

                        // Allow `trace1` to compact logically up to the frontier we may yet receive
                        // in the opposing input (`input2`). All `input2` times will be beyond this
                        // frontier, and joined times only need to be accurate when advanced to it.
                        trace1.set_logical_compaction(input2.frontier().frontier());
                        // Allow `trace1` to compact physically up to the upper bound of batches we
                        // have received in its input (`input1`). We will only ever require cursors
                        // beyond this bound.
                        trace1.set_physical_compaction(acknowledged1.borrow());
                    }
                }

                // Maintain `trace2`. Drop if `input1` is empty, or advance based on future needs.
                if let Some(trace2) = trace2_option.as_mut() {
                    if input1.frontier().is_empty() {
                        trace!(operator_id, input = 2, "dropping trace handle");
                        trace2_option = None;
                    } else {
                        trace!(
                            operator_id,
                            input = 2,
                            logical = ?*input1.frontier().frontier(),
                            physical = ?acknowledged2.elements(),
                            "advancing trace compaction",
                        );

                        // Allow `trace2` to compact logically up to the frontier we may yet receive
                        // in the opposing input (`input1`). All `input1` times will be beyond this
                        // frontier, and joined times only need to be accurate when advanced to it.
                        trace2.set_logical_compaction(input1.frontier().frontier());
                        // Allow `trace2` to compact physically up to the upper bound of batches we
                        // have received in its input (`input2`). We will only ever require cursors
                        // beyond this bound.
                        trace2.set_physical_compaction(acknowledged2.borrow());
                    }
                }
            }
        },
    )
}

/// Deferred join computation.
///
/// The structure wraps cursors which allow us to play out join computation at whatever rate we like.
/// This allows us to avoid producing and buffering massive amounts of data without giving the timely
/// dataflow system a chance to run operators that can consume and aggregate the data.
struct Deferred<T, C1, C2, D>
where
    T: Timestamp,
    C1: Cursor<Time = T, Diff = Diff>,
    C2: for<'a> Cursor<Key<'a> = C1::Key<'a>, Time = T, Diff = Diff>,
{
    cursor1: C1,
    storage1: C1::Storage,
    cursor2: C2,
    storage2: C2::Storage,
    capability: Capability<T>,
    done: bool,
    temp: Vec<(D, T, Diff)>,
}

impl<T, C1, C2, D> Deferred<T, C1, C2, D>
where
    T: Timestamp + Lattice,
    C1: Cursor<Time = T, Diff = Diff>,
    C2: for<'a> Cursor<Key<'a> = C1::Key<'a>, Time = T, Diff = Diff>,
    D: Data,
{
    fn new(
        cursor1: C1,
        storage1: C1::Storage,
        cursor2: C2,
        storage2: C2::Storage,
        capability: Capability<T>,
    ) -> Self {
        Deferred {
            cursor1,
            storage1,
            cursor2,
            storage2,
            capability,
            done: false,
            temp: Vec::new(),
        }
    }

    fn work_remains(&self) -> bool {
        !self.done
    }

    /// Process keys until `yield_fn` reports that enough output tuples have been produced, or the work is exhausted.
    fn work<L, I, YFn, C>(
        &mut self,
        output: &mut OutputHandleCore<T, CapacityContainerBuilder<C>, Tee<T, C>>,
        mut logic: L,
        yield_fn: YFn,
        work: &mut usize,
    ) where
        I: IntoIterator<Item = D>,
        L: FnMut(C1::Key<'_>, C1::Val<'_>, C2::Val<'_>) -> I,
        YFn: Fn(usize) -> bool,
        C: SizableContainer + PushInto<(D, T, Diff)> + Data,
    {
        let meet = self.capability.time();

        let mut session = output.session(&self.capability);

        let storage1 = &self.storage1;
        let storage2 = &self.storage2;

        let cursor1 = &mut self.cursor1;
        let cursor2 = &mut self.cursor2;

        let temp = &mut self.temp;

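        // `flush` consolidates the pending updates in `temp` and emits them into the output
        // session. It returns the number of updates eliminated by consolidation, which the caller
        // subtracts from `work` so that coalesced or cancelled updates do not count against the
        // yield budget.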
        let flush = |data: &mut Vec<_>, session: &mut Session<_, _, _>| {
            let old_len = data.len();
            // Consolidating here is important when the join closure produces data that
            // consolidates well, for example when projecting columns.
            consolidate_updates(data);
            let recovered = old_len - data.len();
            session.give_iterator(data.drain(..));
            recovered
        };

        assert_eq!(temp.len(), 0);

        let mut buffer = Vec::default();

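        // Walk both cursors through their shared key space: when the keys differ, seek the lagging
        // cursor forward; when they match, nested-loop over the values of both inputs for that key,
        // checking `yield_fn` as output accumulates.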
        while cursor1.key_valid(storage1) && cursor2.key_valid(storage2) {
            match cursor1.key(storage1).cmp(&cursor2.key(storage2)) {
                Ordering::Less => cursor1.seek_key(storage1, cursor2.key(storage2)),
                Ordering::Greater => cursor2.seek_key(storage2, cursor1.key(storage1)),
                Ordering::Equal => {
                    // Populate `temp` with the results, until we should yield.
                    let key = cursor2.key(storage2);
                    while let Some(val1) = cursor1.get_val(storage1) {
                        while let Some(val2) = cursor2.get_val(storage2) {
                            // Evaluate logic on `key, val1, val2`. Note the absence of time and diff.
                            let mut result = logic(key, val1, val2).into_iter().peekable();

                            // We can only produce output if the result returns something.
                            if let Some(first) = result.next() {
                                // Join times.
                                cursor1.map_times(storage1, |time1, diff1| {
                                    let mut time1 = time1.into_owned();
                                    time1.join_assign(meet);
                                    let diff1 = diff1.into_owned();
                                    cursor2.map_times(storage2, |time2, diff2| {
                                        let mut time2 = time2.into_owned();
                                        time2.join_assign(&time1);
                                        let diff = diff1.multiply(&diff2.into_owned());
                                        buffer.push((time2, diff));
                                    });
                                });
                                consolidate(&mut buffer);

                                // Special case no results, one result, and potentially many results
                                match (result.peek().is_some(), buffer.len()) {
                                    // Certainly no output
                                    (_, 0) => {}
                                    // Single element, single time
                                    (false, 1) => {
                                        let (time, diff) = buffer.pop().unwrap();
                                        temp.push((first, time, diff));
                                    }
                                    // Multiple elements or multiple times
                                    (_, _) => {
                                        for d in std::iter::once(first).chain(result) {
                                            temp.extend(buffer.iter().map(|(time, diff)| {
                                                (d.clone(), time.clone(), diff.clone())
                                            }))
                                        }
                                    }
                                }
                                buffer.clear();
                            }
                            cursor2.step_val(storage2);
                        }
                        cursor1.step_val(storage1);
                        cursor2.rewind_vals(storage2);

                        *work = work.saturating_add(temp.len());

                        if yield_fn(*work) {
                            // Returning here is only allowed because we leave the cursors in a
                            // state that will let us pick up the work correctly on the next
                            // invocation.
                            *work -= flush(temp, &mut session);
                            if yield_fn(*work) {
                                return;
                            }
                        }
                    }

                    cursor1.step_key(storage1);
                    cursor2.step_key(storage2);
                }
            }
        }

        if !temp.is_empty() {
            *work -= flush(temp, &mut session);
        }

        // We only get here after having iterated through all keys.
        self.done = true;
    }
}