mz_compute/render/join/mz_join_core.rs
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! A fork of DD's `JoinCore::join_core`.
//!
//! Currently, compute rendering knows three implementations for linear joins:
//!
//! * Differential's `JoinCore::join_core`
//! * A Materialize fork thereof, called `mz_join_core`
//! * Another Materialize fork thereof, called `mz_join_core_v2`
//!
//! `mz_join_core` exists to solve a responsiveness problem with the DD implementation.
//! DD's join is only able to yield between keys. When computing a large cross-join or a highly
//! skewed join, this can result in loss of interactivity when the join operator refuses to yield
//! control for multiple seconds or longer, which in turn causes degraded user experience.
//!
//! `mz_join_core` currently fixes the yielding issue by omitting the linear scan through times
//! implemented in DD's join implementation. This leaves only the quadratic strategy, for which it
//! is easy to implement yielding within keys.
//!
//! While `mz_join_core` retains responsiveness in the face of cross-joins, it is also significantly
//! slower than DD's join for workloads that have a large number of edits at different times.
//! `mz_join_core_v2` resolves this by adding support for the DD join's linear scan through times.
//!
//! For the moment, we keep all three implementations around, selectable through feature flags.
//! Eventually, we hope that `mz_join_core_v2` proves itself sufficiently to become the only join
//! implementation.
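//!
//! How eagerly the operator yields is controlled by the caller-provided `yield_fn`. As an
//! illustrative sketch (the thresholds below are hypothetical, not the values Materialize
//! configures), a yield function that cedes control after 1 ms of wall-clock time or 1,000,000
//! produced records could look like this:
//!
//! ```ignore
//! // Illustrative thresholds only.
//! let yield_fn = |start: std::time::Instant, work: usize| {
//!     work >= 1_000_000 || start.elapsed() >= std::time::Duration::from_millis(1)
//! };
//! ```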

use std::cmp::Ordering;
use std::collections::VecDeque;
use std::time::Instant;

use differential_dataflow::Data;
use differential_dataflow::consolidation::{consolidate, consolidate_updates};
use differential_dataflow::difference::Multiply;
use differential_dataflow::lattice::Lattice;
use differential_dataflow::operators::arrange::arrangement::Arranged;
use differential_dataflow::trace::{BatchReader, Cursor, TraceReader};
use mz_repr::Diff;
use timely::PartialOrder;
use timely::container::{CapacityContainerBuilder, PushInto, SizableContainer};
use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::channels::pushers::Tee;
use timely::dataflow::channels::pushers::buffer::Session;
use timely::dataflow::operators::generic::OutputHandleCore;
use timely::dataflow::operators::{Capability, Operator};
use timely::dataflow::{Scope, StreamCore};
use timely::progress::timestamp::Timestamp;
use tracing::trace;

use crate::render::context::ShutdownProbe;

/// Joins two arranged collections with the same key type.
///
/// Each matching pair of records `(key, val1)` and `(key, val2)` is subjected to the `result` function,
/// which produces something implementing `IntoIterator`, where the output collection will have an entry for
/// every value returned by the iterator.
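///
/// For illustration, a `result` closure that simply pairs up owned copies of the matched values
/// could look like the hypothetical sketch below; the closures actually passed in are supplied by
/// the caller and are typically more involved.
///
/// ```ignore
/// // `Key` and `Val` stand in for the caller's concrete key and value types.
/// let result = |_key: &Key, val1: &Val, val2: &Val| Some((val1.clone(), val2.clone()));
/// ```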
pub(super) fn mz_join_core<G, Tr1, Tr2, L, I, YFn, C>(
    arranged1: &Arranged<G, Tr1>,
    arranged2: &Arranged<G, Tr2>,
    shutdown_probe: ShutdownProbe,
    mut result: L,
    yield_fn: YFn,
) -> StreamCore<G, C>
where
    G: Scope,
    G::Timestamp: Lattice,
    Tr1: TraceReader<Time = G::Timestamp, Diff = Diff> + Clone + 'static,
    Tr2: for<'a> TraceReader<Key<'a> = Tr1::Key<'a>, Time = G::Timestamp, Diff = Diff>
        + Clone
        + 'static,
    L: FnMut(Tr1::Key<'_>, Tr1::Val<'_>, Tr2::Val<'_>) -> I + 'static,
    I: IntoIterator,
    I::Item: Data,
    YFn: Fn(Instant, usize) -> bool + 'static,
    C: SizableContainer + PushInto<(I::Item, G::Timestamp, Diff)> + Data,
{
    let mut trace1 = arranged1.trace.clone();
    let mut trace2 = arranged2.trace.clone();

    arranged1.stream.binary_frontier(
        &arranged2.stream,
        Pipeline,
        Pipeline,
        "Join",
        move |capability, info| {
            let operator_id = info.global_id;

            // Acquire an activator to reschedule the operator when it has unfinished work.
            let activator = arranged1.stream.scope().activator_for(info.address);

            // Our initial invariants are that for each trace, physical compaction is less than or equal to the trace's
            // upper bound. These invariants ensure that we can reference observed batch frontiers from `_start_upper`
            // onward, as long as we maintain our physical compaction capabilities appropriately. These assertions are
            // tested as we load up the initial work for the two traces, and before the operator is constructed.

            // Acknowledged frontier for each input.
            // These two are used exclusively to track batch boundaries on which we may want/need to call `cursor_through`.
            // They will drive our physical compaction of each trace, and we want to maintain at all times that each is beyond
            // the physical compaction frontier of their corresponding trace.
            // Should we ever *drop* a trace, these are 1. much harder to maintain correctly, but 2. no longer used.
            use timely::progress::frontier::Antichain;
            let mut acknowledged1 = Antichain::from_elem(<G::Timestamp>::minimum());
            let mut acknowledged2 = Antichain::from_elem(<G::Timestamp>::minimum());

            // deferred work of batches from each input.
            let mut todo1 = VecDeque::new();
            let mut todo2 = VecDeque::new();

            // We'll unload the initial batches here, to put ourselves in a less non-deterministic state to start.
            trace1.map_batches(|batch1| {
                trace!(
                    operator_id,
                    input = 1,
                    lower = ?batch1.lower().elements(),
                    upper = ?batch1.upper().elements(),
                    size = batch1.len(),
                    "pre-loading batch",
                );

                acknowledged1.clone_from(batch1.upper());
                // No `todo1` work here, because we haven't accepted anything into `batches2` yet.
                // It is effectively "empty", because we choose to drain `trace1` before `trace2`.
                // Once we start streaming batches in, we will need to respond to new batches from
                // `input1` with logic that would have otherwise been here. Check out the next loop
                // for the structure.
            });
            // At this point, `ack1` should exactly equal `trace1.read_upper()`, as they are both determined by
            // iterating through batches and capturing the upper bound. This is a great moment to assert that
            // `trace1`'s physical compaction frontier is before the frontier of completed times in `trace1`.
            // TODO: in the case that this does not hold, instead "upgrade" the physical compaction frontier.
            assert!(PartialOrder::less_equal(
                &trace1.get_physical_compaction(),
                &acknowledged1.borrow()
            ));

            trace!(
                operator_id,
                input = 1,
                acknowledged1 = ?acknowledged1.elements(),
                "pre-loading finished",
            );

            // We capture batch2 cursors first and establish work second to avoid taking a `RefCell` lock
            // on both traces at the same time, as they could be the same trace and this would panic.
            let mut batch2_cursors = Vec::new();
            trace2.map_batches(|batch2| {
                trace!(
                    operator_id,
                    input = 2,
                    lower = ?batch2.lower().elements(),
                    upper = ?batch2.upper().elements(),
                    size = batch2.len(),
                    "pre-loading batch",
                );

                acknowledged2.clone_from(batch2.upper());
                batch2_cursors.push((batch2.cursor(), batch2.clone()));
            });
            // At this point, `ack2` should exactly equal `trace2.read_upper()`, as they are both determined by
            // iterating through batches and capturing the upper bound. This is a great moment to assert that
            // `trace2`'s physical compaction frontier is before the frontier of completed times in `trace2`.
            // TODO: in the case that this does not hold, instead "upgrade" the physical compaction frontier.
            assert!(PartialOrder::less_equal(
                &trace2.get_physical_compaction(),
                &acknowledged2.borrow()
            ));

            // Load up deferred work using trace2 cursors and batches captured just above.
            for (batch2_cursor, batch2) in batch2_cursors.into_iter() {
                trace!(
                    operator_id,
                    input = 2,
                    acknowledged1 = ?acknowledged1.elements(),
                    "deferring work for batch",
                );

                // It is safe to ask for `ack1` because we have confirmed it to be in advance of `distinguish_since`.
                let (trace1_cursor, trace1_storage) =
                    trace1.cursor_through(acknowledged1.borrow()).unwrap();
                // We could downgrade the capability here, but doing so is a bit complicated mathematically.
                // TODO: downgrade the capability by searching out the one time in `batch2.lower()` and not
                // in `batch2.upper()`. Only necessary for non-empty batches, as empty batches may not have
                // that property.
                todo2.push_back(Deferred::new(
                    trace1_cursor,
                    trace1_storage,
                    batch2_cursor,
                    batch2.clone(),
                    capability.clone(),
                ));
            }

            trace!(
                operator_id,
                input = 2,
                acknowledged2 = ?acknowledged2.elements(),
                "pre-loading finished",
            );

            // Droppable handles to shared trace data structures.
            let mut trace1_option = Some(trace1);
            let mut trace2_option = Some(trace2);

            move |input1, input2, output| {
                // If the dataflow is shutting down, discard all existing and future work.
                if shutdown_probe.in_shutdown() {
                    trace!(operator_id, "shutting down");

                    // Discard data at the inputs.
                    input1.for_each(|_cap, _data| ());
                    input2.for_each(|_cap, _data| ());

                    // Discard queued work.
                    todo1 = Default::default();
                    todo2 = Default::default();

                    // Stop holding on to input traces.
                    trace1_option = None;
                    trace2_option = None;

                    return;
                }

                // 1. Consuming input.
                //
                // The join computation repeatedly accepts batches of updates from each of its inputs.
                //
                // For each accepted batch, it prepares a work-item to join the batch against previously "accepted"
                // updates from its other input. It is important to track which updates have been accepted, because
                // we use a shared trace and there may be updates present that are in advance of this accepted bound.
                //
                // Batches are accepted: 1. in bulk at start-up (above), 2. as we observe them in the input stream,
                // and 3. if the trace can confirm a region of empty space directly following our accepted bound.
                // This last case is a consequence of our inability to transmit empty batches, as they may be formed
                // in the absence of timely dataflow capabilities.

                // Drain input 1, prepare work.
                input1.for_each(|capability, data| {
                    let trace2 = trace2_option
                        .as_mut()
                        .expect("we only drop a trace in response to the other input emptying");
                    let capability = capability.retain();
                    for batch1 in data.drain(..) {
                        // Ignore any pre-loaded data.
                        if PartialOrder::less_equal(&acknowledged1, batch1.lower()) {
                            trace!(
                                operator_id,
                                input = 1,
                                lower = ?batch1.lower().elements(),
                                upper = ?batch1.upper().elements(),
                                size = batch1.len(),
                                "loading batch",
                            );

                            if !batch1.is_empty() {
                                trace!(
                                    operator_id,
                                    input = 1,
                                    acknowledged2 = ?acknowledged2.elements(),
                                    "deferring work for batch",
                                );

                                // It is safe to ask for `ack2` as we validated that it was at least `get_physical_compaction()`
                                // at start-up, and have held back physical compaction ever since.
                                let (trace2_cursor, trace2_storage) =
                                    trace2.cursor_through(acknowledged2.borrow()).unwrap();
                                let batch1_cursor = batch1.cursor();
                                todo1.push_back(Deferred::new(
                                    batch1_cursor,
                                    batch1.clone(),
                                    trace2_cursor,
                                    trace2_storage,
                                    capability.clone(),
                                ));
                            }

                            // To update `acknowledged1` we might presume that `batch1.lower` should equal it, but we
                            // may have skipped over empty batches. Still, the batches are in-order, and we should be
                            // able to just assume the most recent `batch1.upper`
                            debug_assert!(PartialOrder::less_equal(&acknowledged1, batch1.upper()));
                            acknowledged1.clone_from(batch1.upper());

                            trace!(
                                operator_id,
                                input = 1,
                                acknowledged1 = ?acknowledged1.elements(),
                                "batch acknowledged",
                            );
                        }
                    }
                });

                // Drain input 2, prepare work.
                input2.for_each(|capability, data| {
                    let trace1 = trace1_option
                        .as_mut()
                        .expect("we only drop a trace in response to the other input emptying");
                    let capability = capability.retain();
                    for batch2 in data.drain(..) {
                        // Ignore any pre-loaded data.
                        if PartialOrder::less_equal(&acknowledged2, batch2.lower()) {
                            trace!(
                                operator_id,
                                input = 2,
                                lower = ?batch2.lower().elements(),
                                upper = ?batch2.upper().elements(),
                                size = batch2.len(),
                                "loading batch",
                            );

                            if !batch2.is_empty() {
                                trace!(
                                    operator_id,
                                    input = 2,
                                    acknowledged1 = ?acknowledged1.elements(),
                                    "deferring work for batch",
                                );

                                // It is safe to ask for `ack1` as we validated that it was at least `get_physical_compaction()`
                                // at start-up, and have held back physical compaction ever since.
                                let (trace1_cursor, trace1_storage) =
                                    trace1.cursor_through(acknowledged1.borrow()).unwrap();
                                let batch2_cursor = batch2.cursor();
                                todo2.push_back(Deferred::new(
                                    trace1_cursor,
                                    trace1_storage,
                                    batch2_cursor,
                                    batch2.clone(),
                                    capability.clone(),
                                ));
                            }

                            // To update `acknowledged2` we might presume that `batch2.lower` should equal it, but we
                            // may have skipped over empty batches. Still, the batches are in-order, and we should be
                            // able to just assume the most recent `batch2.upper`
                            debug_assert!(PartialOrder::less_equal(&acknowledged2, batch2.upper()));
                            acknowledged2.clone_from(batch2.upper());

                            trace!(
                                operator_id,
                                input = 2,
                                acknowledged2 = ?acknowledged2.elements(),
                                "batch acknowledged",
                            );
                        }
                    }
                });

                // Advance acknowledged frontiers through any empty regions that we may not receive as batches.
                if let Some(trace1) = trace1_option.as_mut() {
                    trace!(
                        operator_id,
                        input = 1,
                        acknowledged1 = ?acknowledged1.elements(),
                        "advancing trace upper",
                    );
                    trace1.advance_upper(&mut acknowledged1);
                }
                if let Some(trace2) = trace2_option.as_mut() {
                    trace!(
                        operator_id,
                        input = 2,
                        acknowledged2 = ?acknowledged2.elements(),
                        "advancing trace upper",
                    );
                    trace2.advance_upper(&mut acknowledged2);
                }

                // 2. Join computation.
                //
                // For each of the inputs, we do some amount of work (measured in terms of number
                // of output records produced). This is meant to yield control to allow downstream
                // operators to consume and reduce the output, but it is also meant to provide some
                // degree of responsiveness. There is a potential risk here that if we fall behind
                // then the increasing queues hold back physical compaction of the underlying traces,
                // which results in unintentionally quadratic processing time (each batch of either
                // input must scan all batches from the other input).

                // Perform some amount of outstanding work.
                trace!(
                    operator_id,
                    input = 1,
                    work_left = todo1.len(),
                    "starting work",
                );

                let start_time = Instant::now();
                let mut work = 0;
                while !todo1.is_empty() && !yield_fn(start_time, work) {
                    todo1.front_mut().unwrap().work(
                        output,
                        &mut result,
                        |w| yield_fn(start_time, w),
                        &mut work,
                    );
                    if !todo1.front().unwrap().work_remains() {
                        todo1.pop_front();
                    }
                }

                trace!(
                    operator_id,
                    input = 1,
                    work_left = todo1.len(),
                    work_done = work,
                    elapsed = ?start_time.elapsed(),
                    "ceasing work",
                );

                // Perform some amount of outstanding work.
                trace!(
                    operator_id,
                    input = 2,
                    work_left = todo2.len(),
                    "starting work",
                );

                let start_time = Instant::now();
                let mut work = 0;
                while !todo2.is_empty() && !yield_fn(start_time, work) {
                    todo2.front_mut().unwrap().work(
                        output,
                        &mut result,
                        |w| yield_fn(start_time, w),
                        &mut work,
                    );
                    if !todo2.front().unwrap().work_remains() {
                        todo2.pop_front();
                    }
                }

                trace!(
                    operator_id,
                    input = 2,
                    work_left = todo2.len(),
                    work_done = work,
                    elapsed = ?start_time.elapsed(),
                    "ceasing work",
                );

                // Re-activate operator if work remains.
                if !todo1.is_empty() || !todo2.is_empty() {
                    activator.activate();
                }

                // 3. Trace maintenance.
                //
                // Importantly, we use `input.frontier()` here rather than `acknowledged` to track
                // the progress of an input, because should we ever drop one of the traces we will
                // lose the ability to extract information from anything other than the input.
                // For example, if we dropped `trace2` we would not be able to use `advance_upper`
                // to keep `acknowledged2` up to date wrt empty batches, and would hold back logical
                // compaction of `trace1`.

                // Maintain `trace1`. Drop if `input2` is empty, or advance based on future needs.
                if let Some(trace1) = trace1_option.as_mut() {
                    if input2.frontier().is_empty() {
                        trace!(operator_id, input = 1, "dropping trace handle");
                        trace1_option = None;
                    } else {
                        trace!(
                            operator_id,
                            input = 1,
                            logical = ?*input2.frontier().frontier(),
                            physical = ?acknowledged1.elements(),
                            "advancing trace compaction",
                        );

                        // Allow `trace1` to compact logically up to the frontier we may yet receive,
                        // in the opposing input (`input2`). All `input2` times will be beyond this
                        // frontier, and joined times only need to be accurate when advanced to it.
                        trace1.set_logical_compaction(input2.frontier().frontier());
                        // Allow `trace1` to compact physically up to the upper bound of batches we
                        // have received in its input (`input1`). We will not require a cursor that
                        // is not beyond this bound.
                        trace1.set_physical_compaction(acknowledged1.borrow());
                    }
                }

                // Maintain `trace2`. Drop if `input1` is empty, or advance based on future needs.
                if let Some(trace2) = trace2_option.as_mut() {
                    if input1.frontier().is_empty() {
                        trace!(operator_id, input = 2, "dropping trace handle");
                        trace2_option = None;
                    } else {
                        trace!(
                            operator_id,
                            input = 2,
                            logical = ?*input1.frontier().frontier(),
                            physical = ?acknowledged2.elements(),
                            "advancing trace compaction",
                        );

                        // Allow `trace2` to compact logically up to the frontier we may yet receive,
                        // in the opposing input (`input1`). All `input1` times will be beyond this
                        // frontier, and joined times only need to be accurate when advanced to it.
                        trace2.set_logical_compaction(input1.frontier().frontier());
                        // Allow `trace2` to compact physically up to the upper bound of batches we
                        // have received in its input (`input2`). We will not require a cursor that
                        // is not beyond this bound.
                        trace2.set_physical_compaction(acknowledged2.borrow());
                    }
                }
            }
        },
    )
}

/// Deferred join computation.
///
/// The structure wraps cursors which allow us to play out join computation at whatever rate we like.
/// This allows us to avoid producing and buffering massive amounts of data without giving the timely
/// dataflow system a chance to run operators that can consume and aggregate the data.
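///
/// A `Deferred` is created from a cursor over a newly accepted batch and a cursor over the other
/// input's trace. The operator then drains it incrementally, calling `work` until `work_remains`
/// returns `false`, yielding in between as dictated by `yield_fn`.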
struct Deferred<C1, C2, D>
where
    C1: Cursor<Diff = Diff>,
    C2: for<'a> Cursor<Key<'a> = C1::Key<'a>, Time = C1::Time, Diff = Diff>,
{
    cursor1: C1,
    storage1: C1::Storage,
    cursor2: C2,
    storage2: C2::Storage,
    capability: Capability<C1::Time>,
    done: bool,
    temp: Vec<(D, C1::Time, Diff)>,
}

impl<C1, C2, D> Deferred<C1, C2, D>
where
    C1: Cursor<Diff = Diff>,
    C2: for<'a> Cursor<Key<'a> = C1::Key<'a>, Time = C1::Time, Diff = Diff>,
    D: Data,
{
    fn new(
        cursor1: C1,
        storage1: C1::Storage,
        cursor2: C2,
        storage2: C2::Storage,
        capability: Capability<C1::Time>,
    ) -> Self {
        Deferred {
            cursor1,
            storage1,
            cursor2,
            storage2,
            capability,
            done: false,
            temp: Vec::new(),
        }
    }

    fn work_remains(&self) -> bool {
        !self.done
    }

    /// Process keys until `yield_fn` indicates that enough output tuples have been produced, or the work is exhausted.
    fn work<L, I, YFn, C>(
        &mut self,
        output: &mut OutputHandleCore<C1::Time, CapacityContainerBuilder<C>, Tee<C1::Time, C>>,
        mut logic: L,
        yield_fn: YFn,
        work: &mut usize,
    ) where
        I: IntoIterator<Item = D>,
        L: FnMut(C1::Key<'_>, C1::Val<'_>, C2::Val<'_>) -> I,
        YFn: Fn(usize) -> bool,
        C: SizableContainer + PushInto<(D, C1::Time, Diff)> + Data,
    {
        let meet = self.capability.time();

        let mut session = output.session(&self.capability);

        let storage1 = &self.storage1;
        let storage2 = &self.storage2;

        let cursor1 = &mut self.cursor1;
        let cursor2 = &mut self.cursor2;

        let temp = &mut self.temp;

        let flush = |data: &mut Vec<_>, session: &mut Session<_, _, _>| {
            let old_len = data.len();
            // Consolidating here is important when the join closure produces data that
            // consolidates well, for example when projecting columns.
            consolidate_updates(data);
            let recovered = old_len - data.len();
            session.give_iterator(data.drain(..));
            recovered
        };

        assert_eq!(temp.len(), 0);

        let mut buffer = Vec::default();

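        // Merge-join over the two cursors: seek the lagging cursor forward to the other's key and,
        // for each key present on both sides, enumerate the full cross-product of values (the
        // "quadratic strategy" described in the module docs), checking `yield_fn` as results
        // accumulate so we can yield even within a single key.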
        while cursor1.key_valid(storage1) && cursor2.key_valid(storage2) {
            match cursor1.key(storage1).cmp(&cursor2.key(storage2)) {
                Ordering::Less => cursor1.seek_key(storage1, cursor2.key(storage2)),
                Ordering::Greater => cursor2.seek_key(storage2, cursor1.key(storage1)),
                Ordering::Equal => {
                    // Populate `temp` with the results, until we should yield.
                    let key = cursor2.key(storage2);
                    while let Some(val1) = cursor1.get_val(storage1) {
                        while let Some(val2) = cursor2.get_val(storage2) {
                            // Evaluate logic on `key, val1, val2`. Note the absence of time and diff.
                            let mut result = logic(key, val1, val2).into_iter().peekable();

                            // We can only produce output if the closure returned at least one element.
                            if let Some(first) = result.next() {
                                // Join times.
                                cursor1.map_times(storage1, |time1, diff1| {
                                    let mut time1 = C1::owned_time(time1);
                                    time1.join_assign(meet);
                                    let diff1 = C1::owned_diff(diff1);
                                    cursor2.map_times(storage2, |time2, diff2| {
                                        let mut time2 = C2::owned_time(time2);
                                        time2.join_assign(&time1);
                                        let diff = diff1.multiply(&C2::owned_diff(diff2));
                                        buffer.push((time2, diff));
                                    });
                                });
                                consolidate(&mut buffer);

                                // Special case no results, one result, and potentially many results
                                match (result.peek().is_some(), buffer.len()) {
                                    // Certainly no output
                                    (_, 0) => {}
                                    // Single element, single time
                                    (false, 1) => {
                                        let (time, diff) = buffer.pop().unwrap();
                                        temp.push((first, time, diff));
                                    }
                                    // Multiple elements or multiple times
                                    (_, _) => {
                                        for d in std::iter::once(first).chain(result) {
                                            temp.extend(buffer.iter().map(|(time, diff)| {
                                                (d.clone(), time.clone(), diff.clone())
                                            }))
                                        }
                                    }
                                }
                                buffer.clear();
                            }
                            cursor2.step_val(storage2);
                        }
                        cursor1.step_val(storage1);
                        cursor2.rewind_vals(storage2);

                        *work = work.saturating_add(temp.len());

                        if yield_fn(*work) {
                            // Returning here is only allowed because we leave the cursors in a
                            // state that will let us pick up the work correctly on the next
                            // invocation.
                            *work -= flush(temp, &mut session);
                            if yield_fn(*work) {
                                return;
                            }
                        }
                    }

                    cursor1.step_key(storage1);
                    cursor2.step_key(storage2);
                }
            }
        }

        if !temp.is_empty() {
            *work -= flush(temp, &mut session);
        }

        // We only get here after having iterated through all keys.
        self.done = true;
    }
}
681}