Skip to main content

mz_timely_util/
columnation.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License in the LICENSE file at the
6// root of this repository, or online at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16//! Columnation-based containers vendored from differential-dataflow.
17//!
18//! This module provides [`ColumnationStack`], a columnar container that stores data using the
19//! columnation library, along with [`ColumnationChunker`] for organizing streams into sorted
20//! chunks, and the [`ColInternalMerger`] [`Merger`] implementation needed by the merge batcher.
21
22use std::collections::VecDeque;
23use std::iter::FromIterator;
24
25use columnation::{Columnation, Region};
26use differential_dataflow::consolidation::consolidate_updates;
27use differential_dataflow::difference::Semigroup;
28use differential_dataflow::lattice::Lattice;
29use differential_dataflow::trace::implementations::merge_batcher::Merger;
30use differential_dataflow::trace::implementations::{BatchContainer, BuilderInput};
31use timely::container::{ContainerBuilder, DrainContainer, PushInto, SizableContainer};
32use timely::progress::Timestamp;
33use timely::progress::frontier::{Antichain, AntichainRef};
34use timely::{Accountable, PartialOrder};
35
36// ---------------------------------------------------------------------------
37// ColumnationStack
38// ---------------------------------------------------------------------------
39
40/// An append-only vector that stores records as columns.
41///
42/// This container maintains elements that might conventionally own
43/// memory allocations, but instead the pointers to those allocations
44/// reference larger regions of memory shared with multiple instances
45/// of the type. Elements can be retrieved as references, and care is
46/// taken when this type is dropped to ensure that the correct memory
47/// is returned (rather than the incorrect memory, from running the
48/// elements `Drop` implementations).
49pub struct ColumnationStack<T: Columnation> {
50    local: Vec<T>,
51    inner: T::InnerRegion,
52}
53
54impl<T: Columnation> ColumnationStack<T> {
55    /// Construct a [`ColumnationStack`], reserving space for `capacity` elements.
56    ///
57    /// Note that the associated region is not initialized to a specific capacity
58    /// because we can't generally know how much space would be required.
59    pub fn with_capacity(capacity: usize) -> Self {
60        Self {
61            local: Vec::with_capacity(capacity),
62            inner: T::InnerRegion::default(),
63        }
64    }
65
66    /// Ensures `Self` can absorb `items` without further allocations.
67    ///
68    /// The argument `items` may be cloned and iterated multiple times.
69    /// Please be careful if it contains side effects.
70    #[inline(always)]
71    pub fn reserve_items<'a, I>(&mut self, items: I)
72    where
73        I: Iterator<Item = &'a T> + Clone,
74        T: 'a,
75    {
76        self.local.reserve(items.clone().count());
77        self.inner.reserve_items(items);
78    }
79
80    /// Ensures `Self` can absorb `items` without further allocations.
81    ///
82    /// The argument `items` may be cloned and iterated multiple times.
83    /// Please be careful if it contains side effects.
84    #[inline(always)]
85    pub fn reserve_regions<'a, I>(&mut self, regions: I)
86    where
87        Self: 'a,
88        I: Iterator<Item = &'a Self> + Clone,
89    {
90        self.local
91            .reserve(regions.clone().map(|cs| cs.local.len()).sum());
92        self.inner.reserve_regions(regions.map(|cs| &cs.inner));
93    }
94
95    /// Copies an element into the region.
96    ///
97    /// The element can be read by indexing.
98    pub fn copy(&mut self, item: &T) {
99        unsafe {
100            self.local.push(self.inner.copy(item));
101        }
102    }
103
104    /// Empties the collection.
105    pub fn clear(&mut self) {
106        unsafe {
107            self.local.set_len(0);
108            self.inner.clear();
109        }
110    }
111
112    /// Retain elements that pass a predicate, from a specified offset.
113    ///
114    /// This method may or may not reclaim memory in the inner region.
115    pub fn retain_from<P: FnMut(&T) -> bool>(&mut self, index: usize, mut predicate: P) {
116        let mut write_position = index;
117        for position in index..self.local.len() {
118            if predicate(&self[position]) {
119                self.local.swap(position, write_position);
120                write_position += 1;
121            }
122        }
123        unsafe {
124            // Unsafety justified in that `write_position` is no greater than
125            // `self.local.len()` and so this exposes no invalid data.
126            self.local.set_len(write_position);
127        }
128    }
129
130    /// Unsafe access to `local` data. The slices store data that is backed by a region
131    /// allocation. Therefore, it is undefined behavior to mutate elements of the `local` slice.
132    ///
133    /// # Safety
134    /// Elements within `local` can be reordered, but not mutated, removed and/or dropped.
135    pub unsafe fn local(&mut self) -> &mut [T] {
136        &mut self.local[..]
137    }
138
139    /// Estimate the memory capacity in bytes.
140    #[inline]
141    pub fn heap_size(&self, mut callback: impl FnMut(usize, usize)) {
142        let size_of = std::mem::size_of::<T>();
143        callback(self.local.len() * size_of, self.local.capacity() * size_of);
144        self.inner.heap_size(callback);
145    }
146
147    /// Estimate the consumed memory capacity in bytes, summing both used and total capacity.
148    #[inline]
149    pub fn summed_heap_size(&self) -> (usize, usize) {
150        let (mut length, mut capacity) = (0, 0);
151        self.heap_size(|len, cap| {
152            length += len;
153            capacity += cap
154        });
155        (length, capacity)
156    }
157
158    /// The length in items.
159    #[inline]
160    pub fn len(&self) -> usize {
161        self.local.len()
162    }
163
164    /// Returns `true` if the stack is empty.
165    pub fn is_empty(&self) -> bool {
166        self.local.is_empty()
167    }
168
169    /// The capacity of the local vector.
170    #[inline]
171    pub fn capacity(&self) -> usize {
172        self.local.capacity()
173    }
174
175    /// Reserve space for `additional` elements.
176    #[inline]
177    pub fn reserve(&mut self, additional: usize) {
178        self.local.reserve(additional)
179    }
180}
181
182impl<A: Columnation, B: Columnation> ColumnationStack<(A, B)> {
183    /// Copies a destructured tuple `(A, B)` into this column stack.
184    pub fn copy_destructured(&mut self, t1: &A, t2: &B) {
185        unsafe {
186            self.local.push(self.inner.copy_destructured(t1, t2));
187        }
188    }
189}
190
191impl<A: Columnation, B: Columnation, C: Columnation> ColumnationStack<(A, B, C)> {
192    /// Copies a destructured tuple `(A, B, C)` into this column stack.
193    pub fn copy_destructured(&mut self, r0: &A, r1: &B, r2: &C) {
194        unsafe {
195            self.local.push(self.inner.copy_destructured(r0, r1, r2));
196        }
197    }
198}
199
200impl<T: Columnation> std::ops::Deref for ColumnationStack<T> {
201    type Target = [T];
202    #[inline(always)]
203    fn deref(&self) -> &Self::Target {
204        &self.local[..]
205    }
206}
207
208impl<T: Columnation> Drop for ColumnationStack<T> {
209    fn drop(&mut self) {
210        self.clear();
211    }
212}
213
214impl<T: Columnation> Default for ColumnationStack<T> {
215    fn default() -> Self {
216        Self {
217            local: Vec::new(),
218            inner: T::InnerRegion::default(),
219        }
220    }
221}
222
223impl<'a, A: 'a + Columnation> FromIterator<&'a A> for ColumnationStack<A> {
224    fn from_iter<I: IntoIterator<Item = &'a A>>(iter: I) -> Self {
225        let iter = iter.into_iter();
226        let mut c = ColumnationStack::<A>::with_capacity(iter.size_hint().0);
227        for element in iter {
228            c.copy(element);
229        }
230        c
231    }
232}
233
234impl<T: Columnation + PartialEq> PartialEq for ColumnationStack<T> {
235    fn eq(&self, other: &Self) -> bool {
236        PartialEq::eq(&self[..], &other[..])
237    }
238}
239
240impl<T: Columnation + Eq> Eq for ColumnationStack<T> {}
241
242impl<T: Columnation + std::fmt::Debug> std::fmt::Debug for ColumnationStack<T> {
243    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
244        self[..].fmt(f)
245    }
246}
247
248impl<T: Columnation> Clone for ColumnationStack<T> {
249    fn clone(&self) -> Self {
250        let mut new: Self = Default::default();
251        for item in &self[..] {
252            new.copy(item);
253        }
254        new
255    }
256
257    fn clone_from(&mut self, source: &Self) {
258        self.clear();
259        for item in &source[..] {
260            self.copy(item);
261        }
262    }
263}
264
265impl<T: Columnation> PushInto<T> for ColumnationStack<T> {
266    #[inline]
267    fn push_into(&mut self, item: T) {
268        self.copy(&item);
269    }
270}
271
272impl<T: Columnation> PushInto<&T> for ColumnationStack<T> {
273    #[inline]
274    fn push_into(&mut self, item: &T) {
275        self.copy(item);
276    }
277}
278
279impl<T: Columnation> PushInto<&&T> for ColumnationStack<T> {
280    #[inline]
281    fn push_into(&mut self, item: &&T) {
282        self.copy(*item);
283    }
284}
285
286// Container trait impls
287
288impl<T: Columnation> Accountable for ColumnationStack<T> {
289    #[inline]
290    fn record_count(&self) -> i64 {
291        i64::try_from(self.local.len()).unwrap()
292    }
293    #[inline]
294    fn is_empty(&self) -> bool {
295        self.local.is_empty()
296    }
297}
298
299impl<T: Columnation> DrainContainer for ColumnationStack<T> {
300    type Item<'a>
301        = &'a T
302    where
303        Self: 'a;
304    type DrainIter<'a>
305        = std::slice::Iter<'a, T>
306    where
307        Self: 'a;
308    #[inline]
309    fn drain(&mut self) -> Self::DrainIter<'_> {
310        (*self).iter()
311    }
312}
313
314impl<T: Columnation> SizableContainer for ColumnationStack<T> {
315    fn at_capacity(&self) -> bool {
316        self.len() == self.capacity()
317    }
318    fn ensure_capacity(&mut self, stash: &mut Option<Self>) {
319        if self.capacity() == 0 {
320            *self = stash.take().unwrap_or_default();
321            self.clear();
322        }
323        let preferred = timely::container::buffer::default_capacity::<T>();
324        if self.capacity() < preferred {
325            self.reserve(preferred - self.capacity());
326        }
327    }
328}
329
330impl<T: Clone + Ord + Columnation + 'static> BatchContainer for ColumnationStack<T> {
331    type Owned = T;
332    type ReadItem<'a> = &'a T;
333
334    #[inline(always)]
335    fn into_owned<'a>(item: Self::ReadItem<'a>) -> Self::Owned {
336        item.clone()
337    }
338    #[inline(always)]
339    fn clone_onto<'a>(item: Self::ReadItem<'a>, other: &mut Self::Owned) {
340        other.clone_from(item);
341    }
342
343    fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> {
344        item
345    }
346
347    fn push_ref(&mut self, item: Self::ReadItem<'_>) {
348        self.push_into(item)
349    }
350    fn push_own(&mut self, item: &Self::Owned) {
351        self.push_into(item)
352    }
353
354    fn clear(&mut self) {
355        self.clear()
356    }
357
358    fn with_capacity(size: usize) -> Self {
359        Self::with_capacity(size)
360    }
361    fn merge_capacity(cont1: &Self, cont2: &Self) -> Self {
362        let mut new = Self::default();
363        new.reserve_regions(std::iter::once(cont1).chain(std::iter::once(cont2)));
364        new
365    }
366    fn index(&self, index: usize) -> Self::ReadItem<'_> {
367        &self[index]
368    }
369    fn len(&self) -> usize {
370        self[..].len()
371    }
372}
373
374impl<K, V, T, R> BuilderInput<K, V> for ColumnationStack<((K::Owned, V::Owned), T, R)>
375where
376    K: for<'a> BatchContainer<
377            ReadItem<'a>: PartialEq<&'a K::Owned>,
378            Owned: Ord + Columnation + Clone + 'static,
379        >,
380    V: for<'a> BatchContainer<
381            ReadItem<'a>: PartialEq<&'a V::Owned>,
382            Owned: Ord + Columnation + Clone + 'static,
383        >,
384    T: Timestamp + Lattice + Columnation + Clone + 'static,
385    R: Ord + Clone + Semigroup + Columnation + 'static,
386{
387    type Key<'a> = &'a K::Owned;
388    type Val<'a> = &'a V::Owned;
389    type Time = T;
390    type Diff = R;
391
392    fn into_parts<'a>(
393        ((key, val), time, diff): Self::Item<'a>,
394    ) -> (Self::Key<'a>, Self::Val<'a>, Self::Time, Self::Diff) {
395        (key, val, time.clone(), diff.clone())
396    }
397
398    fn key_eq(this: &&K::Owned, other: K::ReadItem<'_>) -> bool {
399        K::reborrow(other) == *this
400    }
401
402    fn val_eq(this: &&V::Owned, other: V::ReadItem<'_>) -> bool {
403        V::reborrow(other) == *this
404    }
405
406    fn key_val_upd_counts(chain: &[Self]) -> (usize, usize, usize) {
407        let mut keys = 0;
408        let mut vals = 0;
409        let mut upds = 0;
410        let mut prev_keyval = None;
411        for link in chain.iter() {
412            for ((key, val), _, _) in link.iter() {
413                if let Some((p_key, p_val)) = prev_keyval {
414                    if p_key != key {
415                        keys += 1;
416                        vals += 1;
417                    } else if p_val != val {
418                        vals += 1;
419                    }
420                } else {
421                    keys += 1;
422                    vals += 1;
423                }
424                upds += 1;
425                prev_keyval = Some((key, val));
426            }
427        }
428        (keys, vals, upds)
429    }
430}
431
432// ---------------------------------------------------------------------------
433// ColumnationChunker
434// ---------------------------------------------------------------------------
435
436/// Chunk a stream of vectors into chains of columnation stacks.
437///
438/// This chunker accumulates into a `Vec` (not a `ColumnationStack`) for efficient
439/// in-place sorting and consolidation, then copies the consolidated results
440/// into `ColumnationStack` chunks. This avoids the cost of sorting through
441/// columnation indirection.
442pub struct ColumnationChunker<T: Columnation> {
443    pending: Vec<T>,
444    ready: VecDeque<ColumnationStack<T>>,
445    empty: Option<ColumnationStack<T>>,
446}
447
448impl<T: Columnation> Default for ColumnationChunker<T> {
449    fn default() -> Self {
450        Self {
451            pending: Vec::default(),
452            ready: VecDeque::default(),
453            empty: None,
454        }
455    }
456}
457
458impl<D, T, R> ColumnationChunker<(D, T, R)>
459where
460    D: Columnation + Ord,
461    T: Columnation + Ord,
462    R: Columnation + Semigroup,
463{
464    const BUFFER_SIZE_BYTES: usize = 64 << 10;
465
466    fn chunk_capacity() -> usize {
467        let size = std::mem::size_of::<(D, T, R)>();
468        if size == 0 {
469            Self::BUFFER_SIZE_BYTES
470        } else if size <= Self::BUFFER_SIZE_BYTES {
471            Self::BUFFER_SIZE_BYTES / size
472        } else {
473            1
474        }
475    }
476
477    fn form_chunk(&mut self) {
478        consolidate_updates(&mut self.pending);
479        if self.pending.len() >= Self::chunk_capacity() {
480            while self.pending.len() > Self::chunk_capacity() {
481                let mut chunk = ColumnationStack::with_capacity(Self::chunk_capacity());
482                for item in self.pending.drain(..chunk.capacity()) {
483                    chunk.copy(&item);
484                }
485                self.ready.push_back(chunk);
486            }
487        }
488    }
489}
490
491impl<'a, D, T, R> PushInto<&'a mut Vec<(D, T, R)>> for ColumnationChunker<(D, T, R)>
492where
493    D: Columnation + Ord + Clone,
494    T: Columnation + Ord + Clone,
495    R: Columnation + Semigroup + Clone,
496{
497    fn push_into(&mut self, container: &'a mut Vec<(D, T, R)>) {
498        if self.pending.capacity() < Self::chunk_capacity() * 2 {
499            self.pending
500                .reserve(Self::chunk_capacity() * 2 - self.pending.len());
501        }
502
503        let mut drain = container.drain(..).peekable();
504        while drain.peek().is_some() {
505            self.pending
506                .extend((&mut drain).take(self.pending.capacity() - self.pending.len()));
507            if self.pending.len() == self.pending.capacity() {
508                self.form_chunk();
509            }
510        }
511    }
512}
513
514impl<D, T, R> ContainerBuilder for ColumnationChunker<(D, T, R)>
515where
516    D: Columnation + Ord + Clone + 'static,
517    T: Columnation + Ord + Clone + 'static,
518    R: Columnation + Semigroup + Clone + 'static,
519{
520    type Container = ColumnationStack<(D, T, R)>;
521
522    fn extract(&mut self) -> Option<&mut Self::Container> {
523        if let Some(ready) = self.ready.pop_front() {
524            self.empty = Some(ready);
525            self.empty.as_mut()
526        } else {
527            None
528        }
529    }
530
531    fn finish(&mut self) -> Option<&mut Self::Container> {
532        consolidate_updates(&mut self.pending);
533        while !self.pending.is_empty() {
534            let mut chunk = ColumnationStack::with_capacity(Self::chunk_capacity());
535            for item in self
536                .pending
537                .drain(..std::cmp::min(self.pending.len(), chunk.capacity()))
538            {
539                chunk.copy(&item);
540            }
541            self.ready.push_back(chunk);
542        }
543        self.empty = self.ready.pop_front();
544        self.empty.as_mut()
545    }
546}
547
548// ---------------------------------------------------------------------------
549// ColInternalMerger: a `Merger` for `ColumnationStack` chunks
550// ---------------------------------------------------------------------------
551
552/// A [`Merger`] using internal iteration over [`ColumnationStack`] chunks.
553///
554/// Implements differential-dataflow's chunk-list merge-batcher interface by
555/// merging and splitting chains of sorted, consolidated `ColumnationStack`
556/// chunks. Elements are copied through `copy` / `copy_destructured` rather than
557/// moved out of their backing regions, which is why this operates by internal
558/// iteration rather than draining owned tuples like the stock `VecMerger`.
559pub struct ColInternalMerger<D, T, R> {
560    _marker: std::marker::PhantomData<(D, T, R)>,
561}
562
563impl<D, T, R> Default for ColInternalMerger<D, T, R> {
564    fn default() -> Self {
565        Self {
566            _marker: std::marker::PhantomData,
567        }
568    }
569}
570
571impl<D, T, R> ColInternalMerger<D, T, R>
572where
573    D: Ord + Columnation + Clone + 'static,
574    T: Ord + Columnation + Clone + PartialOrder + 'static,
575    R: Default + Semigroup + Columnation + Clone + 'static,
576{
577    /// Target chunk capacity in elements, mirroring [`ColumnationChunker`].
578    fn chunk_capacity() -> usize {
579        const BUFFER_SIZE_BYTES: usize = 64 << 10;
580        let size = std::mem::size_of::<(D, T, R)>();
581        if size == 0 {
582            BUFFER_SIZE_BYTES
583        } else if size <= BUFFER_SIZE_BYTES {
584            BUFFER_SIZE_BYTES / size
585        } else {
586            1
587        }
588    }
589
590    /// Acquire an empty chunk with the target capacity, recycling from `stash`.
591    fn empty(stash: &mut Vec<ColumnationStack<(D, T, R)>>) -> ColumnationStack<(D, T, R)> {
592        let target = Self::chunk_capacity();
593        match stash.pop() {
594            Some(mut chunk) if chunk.capacity() >= target => {
595                chunk.clear();
596                chunk
597            }
598            _ => ColumnationStack::with_capacity(target),
599        }
600    }
601
602    /// Recycle a consumed chunk into `stash`, retaining its allocations.
603    fn recycle(
604        mut chunk: ColumnationStack<(D, T, R)>,
605        stash: &mut Vec<ColumnationStack<(D, T, R)>>,
606    ) {
607        chunk.clear();
608        stash.push(chunk);
609    }
610
611    /// Drain remaining items from one side into `result`/`output`.
612    ///
613    /// Copies the partially-consumed head into `result` (swapping wholesale
614    /// when `result` is empty and the head untouched), then appends remaining
615    /// full chunks directly to `output` without copying.
616    fn drain_side(
617        head: &mut ColumnationStack<(D, T, R)>,
618        pos: &mut usize,
619        list: &mut std::vec::IntoIter<ColumnationStack<(D, T, R)>>,
620        result: &mut ColumnationStack<(D, T, R)>,
621        output: &mut Vec<ColumnationStack<(D, T, R)>>,
622        stash: &mut Vec<ColumnationStack<(D, T, R)>>,
623    ) {
624        // Copy the partially-consumed head into result.
625        if *pos < head[..].len() {
626            if result.is_empty() && *pos == 0 {
627                std::mem::swap(result, head);
628            } else {
629                for i in *pos..head[..].len() {
630                    result.copy(&head[i]);
631                }
632            }
633            *pos = head[..].len();
634        }
635        // Flush result before appending full chunks.
636        if !result.is_empty() {
637            output.push(std::mem::replace(result, Self::empty(stash)));
638        }
639        // Remaining full chunks go directly to output.
640        output.extend(list);
641    }
642}
643
644impl<D, T, R> Merger for ColInternalMerger<D, T, R>
645where
646    D: Ord + Columnation + Clone + 'static,
647    T: Ord + Columnation + Clone + PartialOrder + 'static,
648    R: Default + Semigroup + Columnation + Clone + 'static,
649{
650    type Chunk = ColumnationStack<(D, T, R)>;
651    type Time = T;
652
653    fn merge(
654        &mut self,
655        list1: Vec<Self::Chunk>,
656        list2: Vec<Self::Chunk>,
657        output: &mut Vec<Self::Chunk>,
658        stash: &mut Vec<Self::Chunk>,
659    ) {
660        use std::cmp::Ordering;
661
662        let mut list1 = list1.into_iter();
663        let mut list2 = list2.into_iter();
664        let mut head1 = list1.next().unwrap_or_default();
665        let mut head2 = list2.next().unwrap_or_default();
666        let mut pos1 = 0;
667        let mut pos2 = 0;
668        let mut result = Self::empty(stash);
669        let mut diff = R::default();
670
671        // Main merge loop: both sides have data.
672        while pos1 < head1[..].len() && pos2 < head2[..].len() {
673            // Tight inner loop bounded by the current heads and result capacity.
674            while pos1 < head1[..].len() && pos2 < head2[..].len() && !result.at_capacity() {
675                let (d1, t1, r1) = &head1[pos1];
676                let (d2, t2, r2) = &head2[pos2];
677                match (d1, t1).cmp(&(d2, t2)) {
678                    Ordering::Less => {
679                        result.copy(&head1[pos1]);
680                        pos1 += 1;
681                    }
682                    Ordering::Greater => {
683                        result.copy(&head2[pos2]);
684                        pos2 += 1;
685                    }
686                    Ordering::Equal => {
687                        diff.clone_from(r1);
688                        diff.plus_equals(r2);
689                        if !diff.is_zero() {
690                            result.copy_destructured(d1, t1, &diff);
691                        }
692                        pos1 += 1;
693                        pos2 += 1;
694                    }
695                }
696            }
697            if result.at_capacity() {
698                output.push(std::mem::replace(&mut result, Self::empty(stash)));
699            }
700            if pos1 >= head1[..].len() {
701                let old = std::mem::replace(&mut head1, list1.next().unwrap_or_default());
702                Self::recycle(old, stash);
703                pos1 = 0;
704            }
705            if pos2 >= head2[..].len() {
706                let old = std::mem::replace(&mut head2, list2.next().unwrap_or_default());
707                Self::recycle(old, stash);
708                pos2 = 0;
709            }
710        }
711
712        // After the loop at least one side is exhausted; draining it is a
713        // no-op, and the other contributes its sorted tail in order — the
714        // partial head by copy, remaining full chunks by passthrough.
715        Self::drain_side(
716            &mut head1,
717            &mut pos1,
718            &mut list1,
719            &mut result,
720            output,
721            stash,
722        );
723        Self::drain_side(
724            &mut head2,
725            &mut pos2,
726            &mut list2,
727            &mut result,
728            output,
729            stash,
730        );
731
732        if !result.is_empty() {
733            output.push(result);
734        }
735    }
736
737    fn extract(
738        &mut self,
739        merged: Vec<Self::Chunk>,
740        upper: AntichainRef<T>,
741        frontier: &mut Antichain<T>,
742        readied: &mut Vec<Self::Chunk>,
743        kept: &mut Vec<Self::Chunk>,
744        stash: &mut Vec<Self::Chunk>,
745    ) {
746        let mut keep = Self::empty(stash);
747        let mut ready = Self::empty(stash);
748
749        for chunk in merged {
750            let len = chunk[..].len();
751            for i in 0..len {
752                let (data, time, diff) = &chunk[i];
753                if upper.less_equal(time) {
754                    frontier.insert_with(time, |time| time.clone());
755                    keep.copy_destructured(data, time, diff);
756                } else {
757                    ready.copy_destructured(data, time, diff);
758                }
759                if keep.at_capacity() {
760                    kept.push(std::mem::replace(&mut keep, Self::empty(stash)));
761                }
762                if ready.at_capacity() {
763                    readied.push(std::mem::replace(&mut ready, Self::empty(stash)));
764                }
765            }
766            // Chunk fully consumed; recycle its allocations.
767            Self::recycle(chunk, stash);
768        }
769
770        if !keep.is_empty() {
771            kept.push(keep);
772        }
773        if !ready.is_empty() {
774            readied.push(ready);
775        }
776    }
777
778    fn account(chunk: &Self::Chunk) -> (usize, usize, usize, usize) {
779        let (mut size, mut capacity, mut allocations) = (0, 0, 0);
780        chunk.heap_size(|siz, cap| {
781            size += siz;
782            capacity += cap;
783            allocations += 1;
784        });
785        (chunk[..].len(), size, capacity, allocations)
786    }
787}