mz_timely_util/reclock.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License in the LICENSE file at the
// root of this repository, or online at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! ## Notation
//!
//! Collections are represented with capital letters (T, S, R), collection traces as bold letters
//! (𝐓, 𝐒, 𝐑), and difference traces as δ𝐓.
//!
//! Indexing a collection trace 𝐓 to obtain its version at `t` is written as 𝐓(t). Indexing a
//! collection to obtain the multiplicity of a record `x` is written as T\[x\]. These can be combined
//! to obtain the multiplicity of a record `x` at some version `t` as 𝐓(t)\[x\].
//!
//! ## Overview
//!
//! Reclocking transforms a source collection `S` that evolves with some timestamp `FromTime` into
//! a collection `T` that evolves with some other timestamp `IntoTime`. The reclocked collection T
//! contains all updates `u ∈ S` that are not beyond some `FromTime` frontier R(t). The collection
//! `R` is called the remap collection.
//!
//! More formally, for some arbitrary time `t` of `IntoTime` and some arbitrary record `x`, the
//! reclocked collection `T(t)[x]` is defined to be the `sum{δ𝐒(s)[x]: !(𝐑(t) ⪯ s)}`. Since this
//! holds for any record we can write the definition of Reclock(𝐒, 𝐑) as:
//!
//! > Reclock(𝐒, 𝐑) ≜ 𝐓: ∀ t ∈ IntoTime : 𝐓(t) = sum{δ𝐒(s): !(𝐑(t) ⪯ s)}
//!
//! In order for the reclocked collection `T` to have a sensible definition of progress we require
//! that `t1 ≤ t2 ⇒ 𝐑(t1) ⪯ 𝐑(t2)`, where the first `≤` is the partial order of `IntoTime` and the
//! second one the partial order of `FromTime` antichains.
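//!
//! As a small worked example (with hypothetical values), suppose `FromTime` is a single integer
//! offset, `IntoTime` is milliseconds, and the remap collection contains the bindings
//! 𝐑(0) = {0}, 𝐑(1000) = {3}, and 𝐑(2000) = {5}. Then:
//!
//! ```text
//!     𝐓(0)    = sum{δ𝐒(s): !({0} ⪯ s)} = ∅
//!     𝐓(1000) = sum{δ𝐒(s): !({3} ⪯ s)} = δ𝐒(0) + δ𝐒(1) + δ𝐒(2)
//!     𝐓(2000) = sum{δ𝐒(s): !({5} ⪯ s)} = δ𝐒(0) + δ𝐒(1) + δ𝐒(2) + δ𝐒(3) + δ𝐒(4)
//! ```
//!
//! so the source updates at offsets 0 through 2 are reclocked to time 1000 and those at offsets
//! 3 and 4 to time 2000.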
//!
//! ## Total order simplification
//!
//! In order to simplify the implementation we will require that `IntoTime` is a total order. This
//! limitation can be lifted in the future but further elaboration on the mechanics of reclocking
//! is required to ensure a correct implementation.
//!
//! ## The difference trace
//!
//! By the definition of difference traces we have:
//!
//! ```text
//!     δ𝐓(t) = 𝐓(t) - sum{δ𝐓(s): s < t}
//! ```
//!
//! Due to the total order assumption we only need to consider two cases.
//!
//! **Case 1:** `t` is the minimum timestamp
//!
//! In this case `sum{δ𝐓(s): s < t}` is a sum over the empty set and so we obtain:
//!
//! ```text
//!     δ𝐓(min) = 𝐓(min) = sum{δ𝐒(s): !(𝐑(min) ⪯ s)}
//! ```
//!
//! **Case 2:** `t` is a timestamp with a predecessor `prev`
//!
//! In this case `sum{δ𝐓(s): s < t}` is equal to `𝐓(prev)` because:
//!
//! ```text
//!     sum{δ𝐓(s): s < t} = sum{δ𝐓(s): s ≤ prev} + sum{δ𝐓(s): prev < s < t}
//!                       = 𝐓(prev) + ∅
//!                       = 𝐓(prev)
//! ```
//!
//! And therefore the difference trace of 𝐓 is:
//!
//! ```text
//!     δ𝐓(t) = 𝐓(t) - 𝐓(prev)
//!           = sum{δ𝐒(s): !(𝐑(t) ⪯ s)} - sum{δ𝐒(s): !(𝐑(prev) ⪯ s)}
//!           = sum{δ𝐒(s): (𝐑(prev) ⪯ s) ∧ !(𝐑(t) ⪯ s)}
//! ```
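//!
//! As a concrete (hypothetical) instance, if `FromTime` is a single integer offset with
//! 𝐑(prev) = {3} and 𝐑(t) = {5}, then δ𝐓(t) consists of exactly the source updates at offsets 3
//! and 4:
//!
//! ```text
//!     δ𝐓(t) = sum{δ𝐒(s): ({3} ⪯ s) ∧ !({5} ⪯ s)} = δ𝐒(3) + δ𝐒(4)
//! ```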
//!
//! ## Unique mapping property
//!
//! Given the definition above we can derive the fact that for any source difference δ𝐒(s) there is
//! at most one target timestamp t that it must be reclocked to. This property can be exploited by
//! the implementation of the operator as it can safely discard source updates once a matching
//! δ𝐓(t) has been found, making it "stateless" with respect to the source trace. A formal proof of
//! this property is [provided below](#unique-mapping-property-proof).
//!
//! ## Operational description
//!
//! The operator follows a run-to-completion model where on each scheduling it completes all
//! outstanding work that can be completed.
//!
//! ### Unique mapping property proof
//!
//! This section contains a formal proof of the unique mapping property. The proof follows the
//! structured proof notation created by Leslie Lamport. Readers unfamiliar with structured proofs
//! can read about them here <https://lamport.azurewebsites.net/pubs/proof.pdf>.
//!
//! #### Statement
//!
//! AtMostOne(X, φ(x)) ≜ ∀ x1, x2 ∈ X : φ(x1) ∧ φ(x2) ⇒ x1 = x2
//!
//! * **THEOREM** UniqueMapping ≜
//!     * **ASSUME**
//!         * **NEW** (FromTime, ⪯) ∈ PartiallyOrderedTimestamps
//!         * **NEW** (IntoTime, ≤) ∈ TotallyOrderedTimestamps
//!         * **NEW** 𝐒 ∈ SetOfCollectionTraces(FromTime)
//!         * **NEW** 𝐑 ∈ SetOfCollectionTraces(IntoTime)
//!         * ∀ t ∈ IntoTime: 𝐑(t) ∈ SetOfAntichains(FromTime)
//!         * ∀ t1, t2 ∈ IntoTime: t1 ≤ t2 ⇒ 𝐑(t1) ⪯ 𝐑(t2)
//!         * **NEW** 𝐓 = Reclock(𝐒, 𝐑)
//!     * **PROVE**  ∀ s ∈ FromTime : AtMostOne(IntoTime, δ𝐒(s) ∈ δ𝐓(x))
//!
//! #### Proof
//!
//! 1. **SUFFICES ASSUME** ∃ s ∈ FromTime: ¬AtMostOne(IntoTime, δ𝐒(s) ∈ δ𝐓(x))
//!     * **PROVE FALSE**
//!     * _By proof by contradiction._
//! 2. **PICK** s ∈ FromTime : ¬AtMostOne(IntoTime, δ𝐒(s) ∈ δ𝐓(x))
//!    * _Proof: Such a time exists by <1>1._
//! 3. ∃ t1, t2 ∈ IntoTime : t1 ≠ t2 ∧ δ𝐒(s) ∈ δ𝐓(t1) ∧ δ𝐒(s) ∈ δ𝐓(t2)
//!     1. ¬(∀ x1, x2 ∈ IntoTime : (δ𝐒(s) ∈ δ𝐓(x1)) ∧ (δ𝐒(s) ∈ δ𝐓(x2)) ⇒ x1 = x2)
//!         * _Proof: By <1>2 and the definition of AtMostOne._
//!     2. Q.E.D
//!         * _Proof: By <2>1, quantifier negation rules, and the theorem of propositional logic ¬(P ⇒ Q) ≡ P ∧ ¬Q._
//! 4. **PICK** t1, t2 ∈ IntoTime : t1 < t2 ∧ δ𝐒(s) ∈ δ𝐓(t1) ∧ δ𝐒(s) ∈ δ𝐓(t2)
//!    * _Proof: By <1>3. Assume t1 < t2 without loss of generality._
//! 5. ¬(𝐑(t1) ⪯ s)
//!     1. **CASE** t1 = min(IntoTime)
//!         1. δ𝐓(t1) = sum{δ𝐒(s): !(𝐑(t1) ⪯ s)}
//!             * _Proof: By definition of δ𝐓(min)._
//!         2. δ𝐒(s) ∈ δ𝐓(t1)
//!             * _Proof: By <1>4._
//!         3. Q.E.D
//!             * _Proof: By <3>1 and <3>2._
//!     2. **CASE** t1 > min(IntoTime)
//!         1. **PICK** t1_prev = Predecessor(t1)
//!             * _Proof: The predecessor exists because the set {t: t < t1} is non-empty since it must contain at least min(IntoTime)._
//!         2. δ𝐓(t1) = sum{δ𝐒(s): (𝐑(t1_prev) ⪯ s) ∧ !(𝐑(t1) ⪯ s)}
//!             * _Proof: By definition of δ𝐓(t)._
//!         3. δ𝐒(s) ∈ δ𝐓(t1)
//!             * _Proof: By <1>4._
//!         4. Q.E.D
//!             * _Proof: By <3>2 and <3>3._
//!     3. Q.E.D
//!         * _Proof: From cases <2>1 and <2>2, which are exhaustive._
//! 6. **PICK** t2_prev ∈ IntoTime : t2_prev = Predecessor(t2)
//!    * _Proof: The predecessor exists because by <1>4 the set {t: t < t2} is non-empty since it must contain at least t1._
//! 7. t1 ≤ t2_prev
//!    * _Proof: t1 ∈ {t: t < t2} and t2_prev is the maximum element of that set._
//! 8. 𝐑(t2_prev) ⪯ s
//!     1. t2 > min(IntoTime)
//!         * _Proof: By <1>4, since min(IntoTime) ≤ t1 < t2._
//!     2. δ𝐓(t2) = sum{δ𝐒(s): (𝐑(t2_prev) ⪯ s) ∧ !(𝐑(t2) ⪯ s)}
//!         * _Proof: By definition of δ𝐓(t) and <1>6._
//!     3. δ𝐒(s) ∈ δ𝐓(t2)
//!         * _Proof: By <1>4._
//!     4. Q.E.D
//!         * _Proof: By <2>2 and <2>3._
//! 9. 𝐑(t1) ⪯ 𝐑(t2_prev)
//!     * _Proof: By <1>7 and the hypothesis on 𝐑._
//! 10. 𝐑(t1) ⪯ s
//!     * _Proof: By <1>8 and <1>9._
//! 11. Q.E.D
//!     * _Proof: By <1>5 and <1>10._

use std::cmp::{Ordering, Reverse};
use std::collections::VecDeque;
use std::collections::binary_heap::{BinaryHeap, PeekMut};
use std::iter::FromIterator;

use differential_dataflow::difference::Semigroup;
use differential_dataflow::lattice::Lattice;
use differential_dataflow::{AsCollection, Collection, ExchangeData, consolidation};
use mz_ore::Overflowing;
use mz_ore::collections::CollectionExt;
use timely::communication::{Pull, Push};
use timely::dataflow::Scope;
use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::operators::CapabilitySet;
use timely::dataflow::operators::capture::Event;
use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
use timely::order::{PartialOrder, TotalOrder};
use timely::progress::frontier::{AntichainRef, MutableAntichain};
use timely::progress::{Antichain, Timestamp};

/// Constructs an operator that reclocks a `source` collection varying with some time `FromTime`
/// into the corresponding `reclocked` collection varying over some time `IntoTime` using the
/// provided `remap` collection.
///
/// In order for the operator to read the `source` collection, a `Pusher` is returned which can be
/// used with timely's capture facilities to connect a collection from a foreign scope to this
/// operator.
pub fn reclock<G, D, FromTime, IntoTime, R>(
    remap_collection: &Collection<G, FromTime, Overflowing<i64>>,
    as_of: Antichain<G::Timestamp>,
) -> (
    Box<dyn Push<Event<FromTime, Vec<(D, FromTime, R)>>>>,
    Collection<G, D, R>,
)
where
    G: Scope<Timestamp = IntoTime>,
    D: ExchangeData,
    FromTime: Timestamp,
    IntoTime: Timestamp + Lattice + TotalOrder,
    R: Semigroup + 'static,
{
    let mut scope = remap_collection.scope();
    let mut builder = OperatorBuilder::new("Reclock".into(), scope.clone());
    // Here we create a channel that can be used to send data from a foreign scope into this
    // operator. The channel is associated with this operator's address so that it is activated
    // every time events are available for consumption. This mechanism is similar to Timely's input
    // handles where data can be introduced into a timely scope from an exogenous source.
    let info = builder.operator_info();
    let channel_id = scope.new_identifier();
    let (pusher, mut events) =
        scope.pipeline::<Event<FromTime, Vec<(D, FromTime, R)>>>(channel_id, info.address);

    let mut remap_input = builder.new_input(&remap_collection.inner, Pipeline);
    let (mut output, reclocked) = builder.new_output();

    builder.build(move |caps| {
        let mut capset = CapabilitySet::from_elem(caps.into_element());
        capset.downgrade(&as_of.borrow());

        // Remap updates received at times `into_time` greater or equal to `remap_input`'s input
        // frontier. As the input frontier advances, we pop elements out of this priority queue
        // and mint new associations.
        let mut pending_remap: BinaryHeap<Reverse<(IntoTime, FromTime, i64)>> = BinaryHeap::new();
        // A trace of `remap_input` that accumulates correctly for all times that are beyond
        // `remap_since` and not beyond `remap_upper`. The updates in `remap_trace` are maintained
        // in time order. An actual DD trace could be used here at the expense of a more
        // complicated API to traverse it. This is left for future work if the naive trace
        // maintenance implemented in this operator becomes problematic.
        let mut remap_upper = Antichain::from_elem(IntoTime::minimum());
        let mut remap_since = as_of.clone();
        let mut remap_trace = Vec::new();

        // A stash of source updates for which we don't know the corresponding binding yet.
        let mut deferred_source_updates: Vec<ChainBatch<_, _, _>> = Vec::new();
        // The frontier of the `events` input.
        let mut source_frontier = MutableAntichain::new_bottom(FromTime::minimum());

        let mut binding_buffer = Vec::new();
        let mut interesting_times = Vec::new();

        // Accumulation buffer for `remap_input` updates.
        use timely::progress::ChangeBatch;
        let mut remap_accum_buffer: ChangeBatch<(IntoTime, FromTime)> = ChangeBatch::new();

        // The operator drains `remap_input` and organizes new bindings that are not beyond
        // `remap_input`'s frontier into the time ordered `remap_trace`.
        //
        // All received data events can either be reclocked to a time included in the
        // `remap_trace`, or deferred until new associations are minted. Each data event that
        // happens at some `FromTime` is mapped to the first `IntoTime` whose associated antichain
        // is not less or equal to the input `FromTime`.
        //
        // As progress events are received from the `events` input, we can advance our
        // held capability to track the least `IntoTime` a newly received `FromTime` could possibly
        // map to and also compact the maintained `remap_trace` to that time.
        move |frontiers| {
            let Some(cap) = capset.get(0) else {
                return;
            };
            let mut output = output.activate();
            let mut session = output.session(cap);

            // STEP 1. Accept new bindings into `pending_remap`.
            // Advance all `into` times by `as_of`, and consolidate all updates at that frontier.
            while let Some((_, data)) = remap_input.next() {
                for (from, mut into, diff) in data.drain(..) {
                    into.advance_by(as_of.borrow());
                    remap_accum_buffer.update((into, from), diff.into_inner());
                }
            }
            // Drain consolidated bindings into the `pending_remap` heap.
            // Only do this once the `remap_input` frontier has passed `as_of`.
            // For as long as the input frontier is less-equal `as_of`, we have no finalized times.
            if !PartialOrder::less_equal(&frontiers[0].frontier(), &as_of.borrow()) {
                for ((into, from), diff) in remap_accum_buffer.drain() {
                    pending_remap.push(Reverse((into, from, diff)));
                }
            }

            // STEP 2. Extract bindings not beyond `remap_upper` and commit them into `remap_trace`.
            let prev_remap_upper =
                std::mem::replace(&mut remap_upper, frontiers[0].frontier().to_owned());
            while let Some(update) = pending_remap.peek_mut() {
                if !remap_upper.less_equal(&update.0.0) {
                    let Reverse((into, from, diff)) = PeekMut::pop(update);
                    remap_trace.push((from, into, diff));
                } else {
                    break;
                }
            }

            // STEP 3. Receive new data updates
            //         The `events` input describes arbitrary progress and data over `FromTime`,
            //         which must be translated to `IntoTime`. Each `FromTime` is mapped to the
            //         first `IntoTime` whose associated `[FromTime]` frontier is not less or
            //         equal to the input `FromTime`. Received events that are not yet associated
            //         to an `IntoTime` are collected, and formed into a "chain batch": a sequence
            //         of chains that results from sorting the updates by `FromTime`, and then
            //         segmenting the sequence at elements where the partial order on `FromTime`
            //         is violated.
            let mut stash = Vec::new();
            while let Some(event) = events.pull() {
                match event {
                    Event::Progress(changes) => {
                        source_frontier.update_iter(changes.drain(..));
                    }
                    Event::Messages(_, data) => stash.append(data),
                }
            }
            stash.sort_unstable_by(|(_, t1, _): &(D, FromTime, R), (_, t2, _)| t1.cmp(t2));
            let mut new_source_updates = ChainBatch::from_iter(stash);

            // STEP 4: Reclock new and deferred updates
            //         We are now ready to step through the remap bindings in time order and
            //         perform the following actions:
            //         4.1. Match `new_source_updates` against the entirety of bindings contained
            //              in the trace.
            //         4.2. Match `deferred_source_updates` against the bindings that were just
            //              added in the trace.
            //         4.3. Reclock `source_frontier` to calculate the new since frontier of the
            //              remap trace.
            //
            //         The steps above only make sense to perform if there are any times for which
            //         we can correctly accumulate the remap trace, which is what we check here.
            if remap_since.iter().all(|t| !remap_upper.less_equal(t)) {
                let mut cur_binding = MutableAntichain::new();

                let mut remap = remap_trace.iter().peekable();
                let mut reclocked_source_frontier = remap_upper.clone();

                // We go over all the times at which we might need to output data. These times are
                // restricted to the times at which there exists an update in `remap_trace`, plus
                // the minimum timestamp for the case where `remap_trace` is completely empty, in
                // which case the minimum timestamp maps to the empty `FromTime` frontier and
                // therefore all data events map to that minimum timestamp.
                //
                // The approach taken here will take time proportional to the number of elements in
                // `remap_trace`. During development an alternative approach was considered where
                // the updates in `remap_trace` are instead fully materialized into an ordered list
                // of antichains in which every data update can be binary searched into. There are
                // two concerns with this alternative approach that led to preferring this one:
                // 1. Materializing very wide antichains with small differences between them
                //    needs memory proportional to the number of bindings times the width of the
                //    antichain.
                // 2. It locks in the requirement of a totally ordered target timestamp since only
                //    in that case can one binary search a binding.
                // The linear scan is expected to be fine due to the run-to-completion nature of
                // the operator since its cost is amortized among the number of outstanding
                // updates.
                let mut min_time = IntoTime::minimum();
                min_time.advance_by(remap_since.borrow());
                interesting_times.push(min_time);
                interesting_times.extend(remap_trace.iter().map(|(_, t, _)| t.clone()));
                interesting_times.dedup();
                for cur_time in interesting_times.drain(..) {
                    // 4.0. Load updates of `cur_time` from the trace into `cur_binding` to
                    //      construct the `[FromTime]` frontier that `cur_time` maps to.
                    while let Some((t_from, _, diff)) = remap.next_if(|(_, t, _)| t == &cur_time) {
                        binding_buffer.push((t_from.clone(), *diff));
                    }
                    cur_binding.update_iter(binding_buffer.drain(..));
                    let cur_binding = cur_binding.frontier();

                    // 4.1. Extract updates from `new_source_updates`
                    for (data, _, diff) in new_source_updates.extract(cur_binding) {
                        session.give((data, cur_time.clone(), diff));
                    }

                    // 4.2. Extract updates from `deferred_source_updates`.
                    //      The deferred updates contain all updates that could not be reclocked
                    //      with the bindings until `prev_remap_upper`. For this reason we only
                    //      need to reconsider these updates when we start looking at new
                    //      bindings, i.e. bindings that are beyond `prev_remap_upper`.
                    if prev_remap_upper.less_equal(&cur_time) {
                        deferred_source_updates.retain_mut(|batch| {
                            for (data, _, diff) in batch.extract(cur_binding) {
                                session.give((data, cur_time.clone(), diff));
                            }
                            // Retain non-empty batches
                            !batch.is_empty()
                        })
                    }

                    // 4.3. Reclock `source_frontier`
                    //      If any FromTime in the source frontier could possibly be reclocked to
                    //      this binding then we must maintain our capability to emit data at that
                    //      time and not compact past it.
                    if source_frontier
                        .frontier()
                        .iter()
                        .any(|t| !cur_binding.less_equal(t))
                    {
                        reclocked_source_frontier.insert(cur_time);
                    }
                }

                // STEP 5. Downgrade capability and compact remap trace
                capset.downgrade(&reclocked_source_frontier.borrow());
                remap_since = reclocked_source_frontier;
                for (_, t, _) in remap_trace.iter_mut() {
                    t.advance_by(remap_since.borrow());
                }
                consolidation::consolidate_updates(&mut remap_trace);
                remap_trace
                    .sort_unstable_by(|(_, t1, _): &(_, IntoTime, _), (_, t2, _)| t1.cmp(t2));
            }

            // STEP 6. Tidy up deferred updates
            //         Deferred updates are represented as a list of chain batches where each
            //         batch contains at least twice the updates of the batch that follows it.
            //         This organization leads to a logarithmic number of batches with respect to
            //         the outstanding number of updates.
            deferred_source_updates.sort_unstable_by_key(|b| Reverse(b.len()));
            if !new_source_updates.is_empty() {
                deferred_source_updates.push(new_source_updates);
            }
            let dsu = &mut deferred_source_updates;
            while dsu.len() > 1 && (dsu[dsu.len() - 1].len() >= dsu[dsu.len() - 2].len() / 2) {
                let a = dsu.pop().unwrap();
                let b = dsu.pop().unwrap();
                dsu.push(a.merge_with(b));
            }
        }
    });

    (Box::new(pusher), reclocked.as_collection())
}

/// A batch of differential updates that vary over some partial order. This type maintains the data
/// as a set of chains that allows for efficient extraction of batches given a frontier.
#[derive(Debug, PartialEq)]
struct ChainBatch<D, T, R> {
    /// A list of chains (sets of mutually comparable times) sorted by the partial order.
    chains: Vec<VecDeque<(D, T, R)>>,
}

impl<D, T: Timestamp, R> ChainBatch<D, T, R> {
    /// Extracts all updates with time not greater or equal to any time in `upper`.
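    ///
    /// For example (hypothetical, totally ordered times), if a chain holds updates at times
    /// 1, 2, and 3:
    ///
    /// ```text
    ///     extract({3}) yields the updates at times 1 and 2, since !({3} ≤ 1) and !({3} ≤ 2),
    ///     while the update at time 3 stays in the chain because {3} ≤ 3.
    /// ```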
    fn extract<'a>(
        &'a mut self,
        upper: AntichainRef<'a, T>,
    ) -> impl Iterator<Item = (D, T, R)> + 'a {
        self.chains.retain(|chain| !chain.is_empty());
        self.chains.iter_mut().flat_map(move |chain| {
            // A chain is a sorted list of mutually comparable elements so we keep extracting
            // elements that are not beyond upper.
            std::iter::from_fn(move || {
                let (_, into, _) = chain.front()?;
                if !upper.less_equal(into) {
                    chain.pop_front()
                } else {
                    None
                }
            })
        })
    }

    fn merge_with(
        mut self: ChainBatch<D, T, R>,
        mut other: ChainBatch<D, T, R>,
    ) -> ChainBatch<D, T, R>
    where
        D: ExchangeData,
        T: Timestamp,
        R: Semigroup,
    {
        let mut updates1 = self.chains.drain(..).flatten().peekable();
        let mut updates2 = other.chains.drain(..).flatten().peekable();

        let merged = std::iter::from_fn(|| {
            match (updates1.peek(), updates2.peek()) {
                (Some((d1, t1, _)), Some((d2, t2, _))) => {
                    match (t1, d1).cmp(&(t2, d2)) {
                        Ordering::Less => updates1.next(),
                        Ordering::Greater => updates2.next(),
                        // If the same (d, t) pair is found, consolidate their diffs
                        Ordering::Equal => {
                            let (d1, t1, mut r1) = updates1.next().unwrap();
                            while let Some((_, _, r)) =
                                updates1.next_if(|(d, t, _)| (d, t) == (&d1, &t1))
                            {
                                r1.plus_equals(&r);
                            }
                            while let Some((_, _, r)) =
                                updates2.next_if(|(d, t, _)| (d, t) == (&d1, &t1))
                            {
                                r1.plus_equals(&r);
                            }
                            Some((d1, t1, r1))
                        }
                    }
                }
                (Some(_), None) => updates1.next(),
                (None, Some(_)) => updates2.next(),
                (None, None) => None,
            }
        });

        ChainBatch::from_iter(merged.filter(|(_, _, r)| !r.is_zero()))
    }

    /// Returns the number of updates in the batch.
    fn len(&self) -> usize {
        self.chains.iter().map(|chain| chain.len()).sum()
    }

    /// Returns true if the batch contains no updates.
    fn is_empty(&self) -> bool {
        self.len() == 0
    }
}

impl<D, T: Timestamp, R> FromIterator<(D, T, R)> for ChainBatch<D, T, R> {
    /// Computes the chain decomposition of updates according to the partial order `T`.
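    ///
    /// For example (hypothetical product-order times), the sorted updates at times
    /// (0, 0), (0, 1), (1, 0) decompose into two chains, because (0, 1) and (1, 0) are
    /// incomparable:
    ///
    /// ```text
    ///     chain 1: (0, 0), (0, 1)
    ///     chain 2: (1, 0)
    /// ```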
    fn from_iter<I: IntoIterator<Item = (D, T, R)>>(updates: I) -> Self {
        let mut chains = vec![];
        let mut updates = updates.into_iter();
        if let Some((d, t, r)) = updates.next() {
            let mut chain = VecDeque::new();
            chain.push_back((d, t, r));
            for (d, t, r) in updates {
                let prev_t = &chain[chain.len() - 1].1;
                if !PartialOrder::less_equal(prev_t, &t) {
                    chains.push(chain);
                    chain = VecDeque::new();
                }
                chain.push_back((d, t, r));
            }
            chains.push(chain);
        }
        Self { chains }
    }
}

#[cfg(test)]
mod test {
    use std::sync::atomic::AtomicUsize;
    use std::sync::mpsc::{Receiver, TryRecvError};

    use differential_dataflow::consolidation;
    use differential_dataflow::input::{Input, InputSession};
    use serde::{Deserialize, Serialize};
    use timely::communication::allocator::Thread;
    use timely::dataflow::operators::capture::{Event, Extract};
    use timely::dataflow::operators::unordered_input::UnorderedHandle;
    use timely::dataflow::operators::{ActivateCapability, Capture, UnorderedInput};
    use timely::progress::PathSummary;
    use timely::progress::timestamp::Refines;
    use timely::worker::Worker;

    use crate::capture::PusherCapture;
    use crate::order::Partitioned;

    use super::*;

    type Diff = Overflowing<i64>;
    type FromTime = Partitioned<u64, u64>;
    type IntoTime = u64;
    type BindingHandle<FromTime> = InputSession<IntoTime, FromTime, Diff>;
    type DataHandle<D, FromTime> = (
        UnorderedHandle<FromTime, (D, FromTime, Diff)>,
        ActivateCapability<FromTime>,
    );
    type ReclockedStream<D> = Receiver<Event<IntoTime, Vec<(D, IntoTime, Diff)>>>;

    /// A helper function that sets up a dataflow program to test the reclocking operator. Each
    /// test provides a test logic closure which accepts four arguments:
    ///
    /// * A reference to the worker that allows the test to step the computation
    /// * A `BindingHandle` that allows the test to manipulate the remap bindings
    /// * A `DataHandle` that allows the test to submit the data to be reclocked
    /// * A `ReclockedStream` that allows observing the result of the reclocking process
    fn harness<FromTime, D, F, R>(as_of: Antichain<IntoTime>, test_logic: F) -> R
    where
        FromTime: Timestamp + Refines<()>,
        D: ExchangeData,
        F: FnOnce(
                &mut Worker<Thread>,
                BindingHandle<FromTime>,
                DataHandle<D, FromTime>,
                ReclockedStream<D>,
            ) -> R
            + Send
            + Sync
            + 'static,
        R: Send + 'static,
    {
        timely::execute_directly(move |worker| {
            let (bindings, data, data_cap, reclocked) = worker.dataflow::<(), _, _>(|scope| {
                let (bindings, data_pusher, reclocked) =
                    scope.scoped::<IntoTime, _, _>("IntoScope", move |scope| {
                        let (binding_handle, binding_collection) = scope.new_collection();
                        let (data_pusher, reclocked_collection) =
                            reclock(&binding_collection, as_of);
                        let reclocked_capture = reclocked_collection.inner.capture();
                        (binding_handle, data_pusher, reclocked_capture)
                    });

                let (data, data_cap) = scope.scoped::<FromTime, _, _>("FromScope", move |scope| {
                    let ((handle, cap), data) = scope.new_unordered_input();
                    data.capture_into(PusherCapture(data_pusher));
                    (handle, cap)
                });

                (bindings, data, data_cap, reclocked)
            });

            test_logic(worker, bindings, (data, data_cap), reclocked)
        })
    }

    /// Steps the worker four times, which is the required number of times for both data and
    /// frontier updates to propagate across the two scopes and into the probing channels.
    fn step(worker: &mut Worker<Thread>) {
        for _ in 0..4 {
            worker.step();
        }
    }
631
632    #[mz_ore::test]
633    fn basic_reclocking() {
634        let as_of = Antichain::from_elem(IntoTime::minimum());
635        harness::<FromTime, _, _, _>(
636            as_of,
637            |worker, bindings, (mut data, data_cap), reclocked| {
638                // Reclock everything at the minimum IntoTime
639                bindings.close();
640                data.session(data_cap)
641                    .give(('a', Partitioned::minimum(), Diff::ONE));
642                step(worker);
643                let extracted = reclocked.extract();
644                let expected = vec![(0, vec![('a', 0, Diff::ONE)])];
645                assert_eq!(extracted, expected);
646            },
647        )
648    }
649
650    /// Generates a `Partitioned<u64, u64>` `Antichain` where each of the provided partitions is at
651    /// its specified offset and the gaps in between are filled with range timestamps at offset zero.
652    /// For example, `partitioned_frontier([(1, 10)])` contains `[0, 0]` at 0, `1` at 10, and `[2, u64::MAX]` at 0.
653    fn partitioned_frontier<I>(items: I) -> Antichain<Partitioned<u64, u64>>
654    where
655        I: IntoIterator<Item = (u64, u64)>,
656    {
657        let mut frontier = Antichain::new();
658        let mut prev = 0;
659        for (pid, offset) in items {
660            if prev < pid {
661                frontier.insert(Partitioned::new_range(prev, pid - 1, 0));
662            }
663            frontier.insert(Partitioned::new_singleton(pid, offset));
664            prev = pid + 1;
665        }
666        frontier.insert(Partitioned::new_range(prev, u64::MAX, 0));
667        frontier
668    }
669
670    #[mz_ore::test]
671    fn test_basic_usage() {
672        let as_of = Antichain::from_elem(IntoTime::minimum());
673        harness(
674            as_of,
675            |worker, mut bindings, (mut data, data_cap), reclocked| {
676                // Reclock offsets 1 and 3 to timestamp 1000
677                bindings.update_at(Partitioned::minimum(), 0, Diff::ONE);
678                bindings.update_at(Partitioned::minimum(), 1000, Diff::MINUS_ONE);
679                for time in partitioned_frontier([(0, 4)]) {
680                    bindings.update_at(time, 1000, Diff::ONE);
681                }
682                bindings.advance_to(1001);
683                bindings.flush();
684                data.session(data_cap.clone()).give_iterator(
685                    vec![
686                        (1, Partitioned::new_singleton(0, 1), Diff::ONE),
687                        (1, Partitioned::new_singleton(0, 1), Diff::ONE),
688                        (3, Partitioned::new_singleton(0, 3), Diff::ONE),
689                    ]
690                    .into_iter(),
691                );
692
693                step(worker);
694                assert_eq!(
695                    reclocked.try_recv(),
696                    Ok(Event::Messages(
697                        0u64,
698                        vec![
699                            (1, 1000, Diff::ONE),
700                            (1, 1000, Diff::ONE),
701                            (3, 1000, Diff::ONE)
702                        ]
703                    ))
704                );
705                assert_eq!(
706                    reclocked.try_recv(),
707                    Ok(Event::Progress(vec![(0, -1), (1000, 1)]))
708                );
709
710                // Reclock more messages for offset 3 to the same timestamp
711                data.session(data_cap.clone()).give_iterator(
712                    vec![
713                        (3, Partitioned::new_singleton(0, 3), Diff::ONE),
714                        (3, Partitioned::new_singleton(0, 3), Diff::ONE),
715                    ]
716                    .into_iter(),
717                );
718                step(worker);
719                assert_eq!(
720                    reclocked.try_recv(),
721                    Ok(Event::Messages(
722                        1000u64,
723                        vec![(3, 1000, Diff::ONE), (3, 1000, Diff::ONE)]
724                    ))
725                );
726
727                // Drop the capability, which should advance the reclocked frontier to 1001.
728                drop(data_cap);
729                step(worker);
730                assert_eq!(
731                    reclocked.try_recv(),
732                    Ok(Event::Progress(vec![(1000, -1), (1001, 1)]))
733                );
734            },
735        );
736    }
737
738    #[mz_ore::test]
739    fn test_reclock_frontier() {
740        let as_of = Antichain::from_elem(IntoTime::minimum());
741        harness::<_, (), _, _>(
742            as_of,
743            |worker, mut bindings, (_data, data_cap), reclocked| {
744                // Initialize the bindings such that the minimum IntoTime contains the minimum FromTime
745                // frontier.
746                bindings.update_at(Partitioned::minimum(), 0, Diff::ONE);
747                bindings.advance_to(1);
748                bindings.flush();
749                step(worker);
750                assert_eq!(
751                    reclocked.try_recv(),
752                    Ok(Event::Progress(vec![(0, -1), (1, 1)]))
753                );
754
755                // Mint a couple of bindings for multiple partitions
756                bindings.update_at(Partitioned::minimum(), 1000, Diff::MINUS_ONE);
757                for time in partitioned_frontier([(1, 10)]) {
758                    bindings.update_at(time.clone(), 1000, Diff::ONE);
759                    bindings.update_at(time, 2000, Diff::MINUS_ONE);
760                }
761                for time in partitioned_frontier([(1, 10), (2, 10)]) {
762                    bindings.update_at(time, 2000, Diff::ONE);
763                }
764                bindings.advance_to(2001);
765                bindings.flush();
766
767                // The initial frontier should now map to the minimum between the two partitions
768                step(worker);
769                step(worker);
770                assert_eq!(
771                    reclocked.try_recv(),
772                    Ok(Event::Progress(vec![(1, -1), (1000, 1)]))
773                );
774
775                // Downgrade data frontier such that only one of the partitions is advanced
776                let mut part1_cap = data_cap.delayed(&Partitioned::new_singleton(1, 9));
777                let mut part2_cap = data_cap.delayed(&Partitioned::new_singleton(2, 0));
778                let _rest_cap = data_cap.delayed(&Partitioned::new_range(3, u64::MAX, 0));
779                drop(data_cap);
780                step(worker);
781                assert_eq!(reclocked.try_recv(), Err(TryRecvError::Empty));
782
783                // Downgrade the data frontier past the first binding
784                part1_cap.downgrade(&Partitioned::new_singleton(1, 10));
785                step(worker);
786                assert_eq!(
787                    reclocked.try_recv(),
788                    Ok(Event::Progress(vec![(1000, -1), (2000, 1)]))
789                );
790
791                // Downgrade the data frontier past the second binding
792                part2_cap.downgrade(&Partitioned::new_singleton(2, 10));
793                step(worker);
794                assert_eq!(
795                    reclocked.try_recv(),
796                    Ok(Event::Progress(vec![(2000, -1), (2001, 1)]))
797                );
798
799                // Advance the binding frontier and confirm that we get to the next timestamp
800                bindings.advance_to(3001);
801                bindings.flush();
802                step(worker);
803                assert_eq!(
804                    reclocked.try_recv(),
805                    Ok(Event::Progress(vec![(2001, -1), (3001, 1)]))
806                );
807            },
808        );
809    }
810
811    #[mz_ore::test]
812    fn test_reclock() {
813        let as_of = Antichain::from_elem(IntoTime::minimum());
814        harness(
815            as_of,
816            |worker, mut bindings, (mut data, data_cap), reclocked| {
817                // Initialize the bindings such that the minimum IntoTime contains the minimum FromTime
818                // frontier.
819                bindings.update_at(Partitioned::minimum(), 0, Diff::ONE);
820
821                // Set up more precise capabilities for the rest of the test
822                let mut part0_cap = data_cap.delayed(&Partitioned::new_singleton(0, 0));
823                let rest_cap = data_cap.delayed(&Partitioned::new_range(1, u64::MAX, 0));
824                drop(data_cap);
825
826                // Reclock offsets 1 and 2 to timestamp 1000
827                data.session(part0_cap.clone()).give_iterator(
828                    vec![
829                        (1, Partitioned::new_singleton(0, 1), Diff::ONE),
830                        (2, Partitioned::new_singleton(0, 2), Diff::ONE),
831                    ]
832                    .into_iter(),
833                );
834
835                part0_cap.downgrade(&Partitioned::new_singleton(0, 3));
836                bindings.update_at(Partitioned::minimum(), 1000, Diff::MINUS_ONE);
837                bindings.update_at(part0_cap.time().clone(), 1000, Diff::ONE);
838                bindings.update_at(rest_cap.time().clone(), 1000, Diff::ONE);
839                bindings.advance_to(1001);
840                bindings.flush();
841                step(worker);
842                assert_eq!(
843                    reclocked.try_recv(),
844                    Ok(Event::Messages(
845                        0,
846                        vec![(1, 1000, Diff::ONE), (2, 1000, Diff::ONE)]
847                    ))
848                );
849                assert_eq!(
850                    reclocked.try_recv(),
851                    Ok(Event::Progress(vec![(0, -1), (1000, 1)]))
852                );
853                assert_eq!(
854                    reclocked.try_recv(),
855                    Ok(Event::Progress(vec![(1000, -1), (1001, 1)]))
856                );
857
858                // Reclock offsets 3 and 4 to timestamp 2000
859                data.session(part0_cap.clone()).give_iterator(
860                    vec![
861                        (3, Partitioned::new_singleton(0, 3), Diff::ONE),
862                        (3, Partitioned::new_singleton(0, 3), Diff::ONE),
863                        (4, Partitioned::new_singleton(0, 4), Diff::ONE),
864                    ]
865                    .into_iter(),
866                );
867                bindings.update_at(part0_cap.time().clone(), 2000, Diff::MINUS_ONE);
868                part0_cap.downgrade(&Partitioned::new_singleton(0, 5));
869                bindings.update_at(part0_cap.time().clone(), 2000, Diff::ONE);
870                bindings.advance_to(2001);
871                bindings.flush();
872                step(worker);
873                assert_eq!(
874                    reclocked.try_recv(),
875                    Ok(Event::Messages(
876                        1001,
877                        vec![
878                            (3, 2000, Diff::ONE),
879                            (3, 2000, Diff::ONE),
880                            (4, 2000, Diff::ONE)
881                        ]
882                    ))
883                );
884                assert_eq!(
885                    reclocked.try_recv(),
886                    Ok(Event::Progress(vec![(1001, -1), (2000, 1)]))
887                );
888                assert_eq!(
889                    reclocked.try_recv(),
890                    Ok(Event::Progress(vec![(2000, -1), (2001, 1)]))
891                );
892            },
893        );
894    }
895
896    #[mz_ore::test]
897    fn test_reclock_gh16318() {
898        let as_of = Antichain::from_elem(IntoTime::minimum());
899        harness(
900            as_of,
901            |worker, mut bindings, (mut data, data_cap), reclocked| {
902                // Initialize the bindings such that the minimum IntoTime contains the minimum FromTime
903                // frontier.
904                bindings.update_at(Partitioned::minimum(), 0, Diff::ONE);
905                // First mint bindings for partition 0 at timestamp 1000
906                bindings.update_at(Partitioned::minimum(), 1000, Diff::MINUS_ONE);
907                for time in partitioned_frontier([(0, 50)]) {
908                    bindings.update_at(time, 1000, Diff::ONE);
909                }
910                // Then only for partition 1 at timestamp 2000
911                for time in partitioned_frontier([(0, 50)]) {
912                    bindings.update_at(time, 2000, Diff::MINUS_ONE);
913                }
914                for time in partitioned_frontier([(0, 50), (1, 50)]) {
915                    bindings.update_at(time, 2000, Diff::ONE);
916                }
917                // Then again only for partition 0 at timestamp 3000
918                for time in partitioned_frontier([(0, 50), (1, 50)]) {
919                    bindings.update_at(time, 3000, Diff::MINUS_ONE);
920                }
921                for time in partitioned_frontier([(0, 100), (1, 50)]) {
922                    bindings.update_at(time, 3000, Diff::ONE);
923                }
924                bindings.advance_to(3001);
925                bindings.flush();
926
927                // Reclocking (0, 50) must ignore the updates on the FromTime frontier that happened
928                // at timestamp 2000, since those are completely unrelated.
929                data.session(data_cap)
930                    .give((50, Partitioned::new_singleton(0, 50), Diff::ONE));
931                step(worker);
932                assert_eq!(
933                    reclocked.try_recv(),
934                    Ok(Event::Messages(0, vec![(50, 3000, Diff::ONE)]))
935                );
936                assert_eq!(
937                    reclocked.try_recv(),
938                    Ok(Event::Progress(vec![(0, -1), (1000, 1)]))
939                );
940                assert_eq!(
941                    reclocked.try_recv(),
942                    Ok(Event::Progress(vec![(1000, -1), (3001, 1)]))
943                );
944            },
945        );
946    }
947
948    /// Test that compact(reclock(remap, source)) == reclock(compact(remap), source)
949    #[mz_ore::test]
950    fn test_compaction() {
951        let mut remap = vec![];
952        remap.push((Partitioned::minimum(), 0, Diff::ONE));
953        // Reclock offsets 1 and 2 to timestamp 1000
954        remap.push((Partitioned::minimum(), 1000, Diff::MINUS_ONE));
955        for time in partitioned_frontier([(0, 3)]) {
956            remap.push((time, 1000, Diff::ONE));
957        }
958        // Reclock offsets 3 and 4 to timestamp 2000
959        for time in partitioned_frontier([(0, 3)]) {
960            remap.push((time, 2000, Diff::MINUS_ONE));
961        }
962        for time in partitioned_frontier([(0, 5)]) {
963            remap.push((time, 2000, Diff::ONE));
964        }
965
966        let source_updates = vec![
967            (1, Partitioned::new_singleton(0, 1), Diff::ONE),
968            (2, Partitioned::new_singleton(0, 2), Diff::ONE),
969            (3, Partitioned::new_singleton(0, 3), Diff::ONE),
970            (4, Partitioned::new_singleton(0, 4), Diff::ONE),
971        ];
972
973        let since = Antichain::from_elem(1500);
974
975        // Compute reclock(remap, source)
976        let as_of = Antichain::from_elem(IntoTime::minimum());
977        let remap1 = remap.clone();
978        let source_updates1 = source_updates.clone();
979        let reclock_remap = harness(
980            as_of,
981            move |worker, mut bindings, (mut data, data_cap), reclocked| {
982                for (from_ts, into_ts, diff) in remap1 {
983                    bindings.update_at(from_ts, into_ts, diff);
984                }
985                bindings.close();
986                data.session(data_cap)
987                    .give_iterator(source_updates1.iter().cloned());
988                step(worker);
989                reclocked.extract()
990            },
991        );
992        // Compute compact(reclock(remap, source))
993        let mut compact_reclock_remap = reclock_remap;
994        for (t, updates) in compact_reclock_remap.iter_mut() {
995            t.advance_by(since.borrow());
996            for (_, t, _) in updates.iter_mut() {
997                t.advance_by(since.borrow());
998            }
999        }
1000
1001        // Compute compact(remap)
1002        let mut compact_remap = remap;
1003        for (_, t, _) in compact_remap.iter_mut() {
1004            t.advance_by(since.borrow());
1005        }
1006        consolidation::consolidate_updates(&mut compact_remap);
1007        // Compute reclock(compact(remap), source)
1008        let reclock_compact_remap = harness(
1009            since,
1010            move |worker, mut bindings, (mut data, data_cap), reclocked| {
1011                for (from_ts, into_ts, diff) in compact_remap {
1012                    bindings.update_at(from_ts, into_ts, diff);
1013                }
1014                bindings.close();
1015                data.session(data_cap)
1016                    .give_iterator(source_updates.iter().cloned());
1017                step(worker);
1018                reclocked.extract()
1019            },
1020        );
1021
1022        let expected = vec![(
1023            1500,
1024            vec![
1025                (1, 1500, Diff::ONE),
1026                (2, 1500, Diff::ONE),
1027                (3, 2000, Diff::ONE),
1028                (4, 2000, Diff::ONE),
1029            ],
1030        )];
1031        assert_eq!(expected, reclock_compact_remap);
1032        assert_eq!(expected, compact_reclock_remap);
1033    }
1034
1035    #[mz_ore::test]
1036    fn test_chainbatch_merge() {
1037        let a = ChainBatch::from_iter([('a', 0, 1)]);
1038        let b = ChainBatch::from_iter([('a', 0, -1), ('a', 1, 1)]);
1039        assert_eq!(a.merge_with(b), ChainBatch::from_iter([('a', 1, 1)]));
1040    }
1041
1042    #[mz_ore::test]
1043    #[cfg_attr(miri, ignore)] // too slow
1044    fn test_binding_consolidation() {
1045        use std::sync::atomic::Ordering;
1046
1047        #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
1048        struct Time(u64);
1049
1050        // A counter of the number of active Time instances
1051        static INSTANCES: AtomicUsize = AtomicUsize::new(0);
1052
1053        impl Time {
1054            fn new(time: u64) -> Self {
1055                INSTANCES.fetch_add(1, Ordering::Relaxed);
1056                Self(time)
1057            }
1058        }
1059
1060        impl Clone for Time {
1061            fn clone(&self) -> Self {
1062                INSTANCES.fetch_add(1, Ordering::Relaxed);
1063                Self(self.0)
1064            }
1065        }
1066
1067        impl Drop for Time {
1068            fn drop(&mut self) {
1069                INSTANCES.fetch_sub(1, Ordering::Relaxed);
1070            }
1071        }
1072
1073        impl Timestamp for Time {
1074            type Summary = ();
1075
1076            fn minimum() -> Self {
1077                Time::new(0)
1078            }
1079        }
1080
1081        impl PathSummary<Time> for () {
1082            fn results_in(&self, src: &Time) -> Option<Time> {
1083                Some(src.clone())
1084            }
1085
1086            fn followed_by(&self, _other: &()) -> Option<Self> {
1087                Some(())
1088            }
1089        }
1090
1091        impl Refines<()> for Time {
1092            fn to_inner(_: ()) -> Self {
1093                Self::minimum()
1094            }
1095            fn to_outer(self) -> () {}
1096            fn summarize(_path: ()) {}
1097        }
1098
1099        impl PartialOrder for Time {
1100            fn less_equal(&self, other: &Self) -> bool {
1101                self.0.less_equal(&other.0)
1102            }
1103        }
1104
1105        let as_of = 1000;
1106
1107        // Test that a single big batch of unconsolidated bindings gets consolidated
1108        // after a single worker step.
1109        harness::<Time, u64, _, _>(
1110            Antichain::from_elem(as_of),
1111            move |worker, mut bindings, _, _| {
1112                step(worker);
1113                let instances_before = INSTANCES.load(Ordering::Relaxed);
1114                for ts in 0..as_of {
1115                    if ts > 0 {
1116                        bindings.update_at(Time::new(ts - 1), ts, Diff::MINUS_ONE);
1117                    }
1118                    bindings.update_at(Time::new(ts), ts, Diff::ONE);
1119                }
1120                bindings.advance_to(as_of);
1121                bindings.flush();
1122                step(worker);
1123                let instances_after = INSTANCES.load(Ordering::Relaxed);
1124                // The extra instances live in a ChangeBatch, which considers compaction when more
1125                // than 32 elements are inside.
1126                assert!(instances_after - instances_before < 32);
1127            },
1128        );
1129
1130        // Test that a slow feed of uncompacted bindings over multiple steps never leads to an
1131        // excessive number of bindings held in memory.
1132        harness::<Time, u64, _, _>(
1133            Antichain::from_elem(as_of),
1134            move |worker, mut bindings, _, _| {
1135                step(worker);
1136                let instances_before = INSTANCES.load(Ordering::Relaxed);
1137                for ts in 0..as_of {
1138                    if ts > 0 {
1139                        bindings.update_at(Time::new(ts - 1), ts, Diff::MINUS_ONE);
1140                    }
1141                    bindings.update_at(Time::new(ts), ts, Diff::ONE);
1142                    bindings.advance_to(ts + 1);
1143                    bindings.flush();
1144                    step(worker);
1145                    let instances_now = INSTANCES.load(Ordering::Relaxed);
1146                    // The extra instances live in a ChangeBatch, which considers compaction when
1147                    // more than 32 elements are inside.
1148                    assert!(instances_now - instances_before < 32);
1149                }
1150            },
1151        );
1152    }
1153}