// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License in the LICENSE file at the
// root of this repository, or online at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! ## Notation
//!
//! Collections are represented with capital letters (T, S, R), collection traces as bold letters
//! (𝐓, 𝐒, 𝐑), and difference traces as δ𝐓.
//!
//! Indexing a collection trace 𝐓 to obtain its version at `t` is written as 𝐓(t). Indexing a
//! collection to obtain the multiplicity of a record `x` is written as T\[x\]. These can be combined
//! to obtain the multiplicity of a record `x` at some version `t` as 𝐓(t)\[x\].
//!
//! ## Overview
//!
//! Reclocking transforms a source collection `S` that evolves with some timestamp `FromTime` into
//! a collection `T` that evolves with some other timestamp `IntoTime`. The reclocked collection T
//! contains all updates `u ∈ S` that are not beyond some `FromTime` frontier R(t). The collection
//! `R` is called the remap collection.
//!
//! More formally, for some arbitrary time `t` of `IntoTime` and some arbitrary record `x`, the
//! reclocked collection `T(t)[x]` is defined to be `sum{δ𝐒(s)[x]: !(𝐑(t) ⪯ s)}`. Since this
//! holds for any record we can write the definition of Reclock(𝐒, 𝐑) as:
//!
//! > Reclock(𝐒, 𝐑) ≜ 𝐓: ∀ t ∈ IntoTime : 𝐓(t) = sum{δ𝐒(s): !(𝐑(t) ⪯ s)}
//!
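//! As a concrete example (with hypothetical bindings), let `FromTime` and `IntoTime` both be
//! `u64`, let the source differences be δ𝐒(0) = {(a, +1)}, δ𝐒(1) = {(b, +1)} and
//! δ𝐒(2) = {(c, +1)}, and let the remap collection define the frontiers below. The definition
//! then yields:
//!
//! ```text
//!     𝐑(0) = {1}    𝐓(0) = δ𝐒(0)                 = {(a, +1)}
//!     𝐑(1) = {2}    𝐓(1) = δ𝐒(0) + δ𝐒(1)         = {(a, +1), (b, +1)}
//!     𝐑(2) = {3}    𝐓(2) = δ𝐒(0) + δ𝐒(1) + δ𝐒(2) = {(a, +1), (b, +1), (c, +1)}
//! ```
//!
//! At each `t`, 𝐓(t) accumulates exactly those source differences δ𝐒(s) for which
//! !(𝐑(t) ⪯ s), i.e. those at times not yet beyond the frontier 𝐑(t).
//!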
//! In order for the reclocked collection `T` to have a sensible definition of progress we require
//! that `t1 ≤ t2 ⇒ 𝐑(t1) ⪯ 𝐑(t2)` where the first `≤` is the partial order of `IntoTime` and the
//! second one the partial order of `FromTime` antichains.
//!
//! ## Total order simplification
//!
//! In order to simplify the implementation we will require that `IntoTime` is a total order. This
//! limitation can be lifted in the future but further elaboration on the mechanics of reclocking
//! is required to ensure a correct implementation.
//!
//! ## The difference trace
//!
//! By the definition of difference traces we have:
//!
//! ```text
//!     δ𝐓(t) = 𝐓(t) - sum{δ𝐓(s): s < t}
//! ```
//!
//! Due to the total order assumption we only need to consider two cases.
//!
//! **Case 1:** `t` is the minimum timestamp
//!
//! In this case `sum{δ𝐓(s): s < t}` is the empty set and so we obtain:
//!
//! ```text
//!     δ𝐓(min) = 𝐓(min) = sum{δ𝐒(s): !(𝐑(min) ⪯ s)}
//! ```
//!
//! **Case 2:** `t` is a timestamp with a predecessor `prev`
//!
//! In this case `sum{δ𝐓(s): s < t}` is equal to `𝐓(prev)` because:
//!
//! ```text
//!     sum{δ𝐓(s): s < t} = sum{δ𝐓(s): s ≤ prev} + sum{δ𝐓(s): prev < s < t}
//!                       = 𝐓(prev) + ∅
//!                       = 𝐓(prev)
//! ```
//!
//! And therefore the difference trace of 𝐓 is:
//!
//! ```text
//!     δ𝐓(t) = 𝐓(t) - 𝐓(prev)
//!           = sum{δ𝐒(s): !(𝐑(t) ⪯ s)} - sum{δ𝐒(s): !(𝐑(prev) ⪯ s)}
//!           = sum{δ𝐒(s): (𝐑(prev) ⪯ s) ∧ !(𝐑(t) ⪯ s)}
//! ```
//!
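//! As a small worked instance of Case 2 (with hypothetical bindings): if 𝐑(prev) = {2} and
//! 𝐑(t) = {4}, then δ𝐓(t) collects exactly the source differences at times 2 and 3:
//!
//! ```text
//!     δ𝐓(t) = sum{δ𝐒(s): ({2} ⪯ s) ∧ !({4} ⪯ s)}
//!           = δ𝐒(2) + δ𝐒(3)
//! ```
//!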
//! ## Unique mapping property
//!
//! Given the definition above we can derive the fact that for any source difference δ𝐒(s) there is
//! at most one target timestamp t that it must be reclocked to. This property can be exploited by
//! the implementation of the operator as it can safely discard source updates once a matching
//! δ𝐓(t) has been found, making it "stateless" with respect to the source trace. A formal proof of
//! this property is [provided below](#unique-mapping-property-proof).
//!
//! ## Operational description
//!
//! The operator follows a run-to-completion model where on each scheduling it completes all
//! outstanding work that can be completed.
//!
//! ### Unique mapping property proof
//!
//! This section contains the formal proof of the unique mapping property. The proof follows the
//! structured proof notation created by Leslie Lamport. Readers unfamiliar with structured proofs
//! can read about them here: <https://lamport.azurewebsites.net/pubs/proof.pdf>.
//!
//! #### Statement
//!
//! AtMostOne(X, φ(x)) ≜ ∀ x1, x2 ∈ X : φ(x1) ∧ φ(x2) ⇒ x1 = x2
//!
//! * **THEOREM** UniqueMapping ≜
//!     * **ASSUME**
//!         * **NEW** (FromTime, ⪯) ∈ PartiallyOrderedTimestamps
//!         * **NEW** (IntoTime, ≤) ∈ TotallyOrderedTimestamps
//!         * **NEW** 𝐒 ∈ SetOfCollectionTraces(FromTime)
//!         * **NEW** 𝐑 ∈ SetOfCollectionTraces(IntoTime)
//!         * ∀ t ∈ IntoTime: 𝐑(t) ∈ SetOfAntichains(FromTime)
//!         * ∀ t1, t2 ∈ IntoTime: t1 ≤ t2 ⇒ 𝐑(t1) ⪯ 𝐑(t2)
//!         * **NEW** 𝐓 = Reclock(𝐒, 𝐑)
//!     * **PROVE**  ∀ s ∈ FromTime : AtMostOne(IntoTime, δ𝐒(s) ∈ δ𝐓(x))
//!
//! #### Proof
//!
//! 1. **SUFFICES ASSUME** ∃ s ∈ FromTime: ¬AtMostOne(IntoTime, δ𝐒(s) ∈ δ𝐓(x))
//!     * **PROVE FALSE**
//!     * _By proof by contradiction._
//! 2. **PICK** s ∈ FromTime : ¬AtMostOne(IntoTime, δ𝐒(s) ∈ δ𝐓(x))
//!    * _Proof: Such a time exists by <1>1._
//! 3. ∃ t1, t2 ∈ IntoTime : t1 ≠ t2 ∧ δ𝐒(s) ∈ δ𝐓(t1) ∧ δ𝐒(s) ∈ δ𝐓(t2)
//!     1. ¬(∀ x1, x2 ∈ X : (δ𝐒(s) ∈ δ𝐓(x1)) ∧ (δ𝐒(s) ∈ δ𝐓(x2)) ⇒ x1 = x2)
//!         * _Proof: By <1>2 and definition of AtMostOne._
//!     2. Q.E.D
//!         * _Proof: By <2>1, quantifier negation rules, and theorem of propositional logic ¬(P ⇒ Q) ≡ P ∧ ¬Q._
//! 4. **PICK** t1, t2 ∈ IntoTime : t1 < t2 ∧ δ𝐒(s) ∈ δ𝐓(t1) ∧ δ𝐒(s) ∈ δ𝐓(t2)
//!    * _Proof: By <1>3. Assume t1 < t2 without loss of generality._
//! 5. ¬(𝐑(t1) ⪯ s)
//!     1. **CASE** t1 = min(IntoTime)
//!         1. δ𝐓(t1) = sum{δ𝐒(s): !(𝐑(t1) ⪯ s)}
//!             * _Proof: By definition of δ𝐓(min)._
//!         2. δ𝐒(s) ∈ δ𝐓(t1)
//!             * _Proof: By <1>4._
//!         3. Q.E.D
//!             * _Proof: By <3>1 and <3>2._
//!     2. **CASE** t1 > min(IntoTime)
//!         1. **PICK** t1_prev = Predecessor(t1)
//!             * _Proof: Predecessor exists because the set {t: t < t1} is non-empty since it must contain at least min(IntoTime)._
//!         2. δ𝐓(t1) = sum{δ𝐒(s): (𝐑(t1_prev) ⪯ s) ∧ !(𝐑(t1) ⪯ s)}
//!             * _Proof: By definition of δ𝐓(t)._
//!         3. δ𝐒(s) ∈ δ𝐓(t1)
//!             * _Proof: By <1>4._
//!         4. Q.E.D
//!             * _Proof: By <3>2 and <3>3._
//!     3. Q.E.D
//!         * _Proof: From cases <2>1 and <2>2 which are exhaustive._
//! 6. **PICK** t2_prev ∈ IntoTime : t2_prev = Predecessor(t2)
//!    * _Proof: Predecessor exists because by <1>4 the set {t: t < t2} is non-empty since it must contain at least t1._
//! 7. t1 ≤ t2_prev
//!    * _Proof: t1 ∈ {t: t < t2} and t2_prev is the maximum element of the set._
//! 8. 𝐑(t2_prev) ⪯ s
//!     1. t2 > min(IntoTime)
//!         * _Proof: By <1>4, since min(IntoTime) ≤ t1 < t2._
//!     2. δ𝐓(t2) = sum{δ𝐒(s): (𝐑(t2_prev) ⪯ s) ∧ !(𝐑(t2) ⪯ s)}
//!         * _Proof: By definition of δ𝐓(t) and <1>6._
//!     3. δ𝐒(s) ∈ δ𝐓(t2)
//!         * _Proof: By <1>4._
//!     4. Q.E.D
//!         * _Proof: By <2>2 and <2>3._
//! 9. 𝐑(t1) ⪯ 𝐑(t2_prev)
//!     * _Proof: By <1>7 and hypothesis on R._
//! 10. 𝐑(t1) ⪯ s
//!     * _Proof: By <1>8 and <1>9._
//! 11. Q.E.D
//!     * _Proof: By <1>5 and <1>10._

use std::cmp::{Ordering, Reverse};
use std::collections::VecDeque;
use std::collections::binary_heap::{BinaryHeap, PeekMut};
use std::iter::FromIterator;

use differential_dataflow::difference::Semigroup;
use differential_dataflow::lattice::Lattice;
use differential_dataflow::{AsCollection, ExchangeData, VecCollection, consolidation};
use mz_ore::Overflowing;
use mz_ore::collections::CollectionExt;
use timely::communication::{Pull, Push};
use timely::dataflow::Scope;
use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::operators::CapabilitySet;
use timely::dataflow::operators::capture::Event;
use timely::dataflow::operators::generic::OutputBuilder;
use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
use timely::order::{PartialOrder, TotalOrder};
use timely::progress::frontier::{AntichainRef, MutableAntichain};
use timely::progress::{Antichain, Timestamp};

/// Constructs an operator that reclocks a `source` collection varying with some time `FromTime`
/// into the corresponding `reclocked` collection varying over some time `IntoTime` using the
/// provided `remap` collection.
///
/// In order for the operator to read the `source` collection a `Pusher` is returned which can be
/// used with timely's capture facilities to connect a collection from a foreign scope to this
/// operator.
pub fn reclock<G, D, FromTime, IntoTime, R>(
    remap_collection: VecCollection<G, FromTime, Overflowing<i64>>,
    as_of: Antichain<G::Timestamp>,
) -> (
    Box<dyn Push<Event<FromTime, Vec<(D, FromTime, R)>>>>,
    VecCollection<G, D, R>,
)
where
    G: Scope<Timestamp = IntoTime>,
    D: ExchangeData,
    FromTime: Timestamp,
    IntoTime: Timestamp + Lattice + TotalOrder,
    R: Semigroup + 'static,
{
    let mut scope = remap_collection.scope();
    let mut builder = OperatorBuilder::new("Reclock".into(), scope.clone());
    // Here we create a channel that can be used to send data from a foreign scope into this
    // operator. The channel is associated with this operator's address so that it is activated
    // every time events are available for consumption. This mechanism is similar to Timely's input
    // handles where data can be introduced into a timely scope from an exogenous source.
    let info = builder.operator_info();
    let channel_id = scope.new_identifier();
    let (pusher, mut events) =
        scope.pipeline::<Event<FromTime, Vec<(D, FromTime, R)>>>(channel_id, info.address);

    let mut remap_input = builder.new_input(remap_collection.inner, Pipeline);
    let (output, reclocked) = builder.new_output();
    let mut output = OutputBuilder::from(output);

    builder.build(move |caps| {
        let mut capset = CapabilitySet::from_elem(caps.into_element());
        capset.downgrade(&as_of.borrow());

        // Remap updates received at `into` times at or beyond `remap_input`'s frontier. As the
        // input frontier advances, we drop elements out of this priority queue and mint new
        // associations.
        let mut pending_remap: BinaryHeap<Reverse<(IntoTime, FromTime, i64)>> = BinaryHeap::new();
        // A trace of `remap_input` that accumulates correctly for all times that are beyond
        // `remap_since` and not beyond `remap_upper`. The updates in `remap_trace` are maintained
        // in time order. An actual DD trace could be used here at the expense of a more
        // complicated API to traverse it. This is left for future work if the naive trace
        // maintenance implemented in this operator becomes problematic.
        let mut remap_upper = Antichain::from_elem(IntoTime::minimum());
        let mut remap_since = as_of.clone();
        let mut remap_trace = Vec::new();

        // A stash of source updates for which we don't know the corresponding binding yet.
        let mut deferred_source_updates: Vec<ChainBatch<_, _, _>> = Vec::new();
        // The frontier of the `events` input.
        let mut source_frontier = MutableAntichain::new_bottom(FromTime::minimum());

        let mut binding_buffer = Vec::new();

        // Accumulation buffer for `remap_input` updates.
        use timely::progress::ChangeBatch;
        let mut remap_accum_buffer: ChangeBatch<(IntoTime, FromTime)> = ChangeBatch::new();

        // The operator drains `remap_input` and organizes new bindings that are not beyond
        // `remap_input`'s frontier into the time-ordered `remap_trace`.
        //
        // All received data events can either be reclocked to a time included in the
        // `remap_trace`, or deferred until new associations are minted. Each data event that
        // happens at some `FromTime` is mapped to the first `IntoTime` whose associated antichain
        // is not less than or equal to the input `FromTime`.
        //
        // As progress events are received from the `events` input, we can advance our held
        // capability to track the least `IntoTime` a newly received `FromTime` could possibly
        // map to, and also compact the maintained `remap_trace` to that time.
        move |frontiers| {
            let Some(cap) = capset.get(0).cloned() else {
                return;
            };
            let mut output = output.activate();
            let mut session = output.session(&cap);

            // STEP 1. Accept new bindings into `pending_remap`.
            // Advance all `into` times by `as_of`, and consolidate all updates at that frontier.
            remap_input.for_each(|_, data| {
                for (from, mut into, diff) in data.drain(..) {
                    into.advance_by(as_of.borrow());
                    remap_accum_buffer.update((into, from), diff.into_inner());
                }
            });
            // Drain consolidated bindings into the `pending_remap` heap.
            // Only do this once the `remap_input` frontier has passed `as_of`. For as long as the
            // input frontier is less than or equal to `as_of`, we have no finalized times.
            if !PartialOrder::less_equal(&frontiers[0].frontier(), &as_of.borrow()) {
                for ((into, from), diff) in remap_accum_buffer.drain() {
                    pending_remap.push(Reverse((into, from, diff)));
                }
            }

            // STEP 2. Extract bindings not beyond `remap_upper` and commit them into `remap_trace`.
            let prev_remap_upper =
                std::mem::replace(&mut remap_upper, frontiers[0].frontier().to_owned());
            while let Some(update) = pending_remap.peek_mut() {
                if !remap_upper.less_equal(&update.0.0) {
                    let Reverse((into, from, diff)) = PeekMut::pop(update);
                    remap_trace.push((from, into, diff));
                } else {
                    break;
                }
            }

            // STEP 3. Receive new data updates.
            //         The `events` input describes arbitrary progress and data over `FromTime`,
            //         which must be translated to `IntoTime`. Each `FromTime` is mapped to the
            //         first `IntoTime` whose associated `[FromTime]` antichain is not less than
            //         or equal to the input `FromTime`. Received events that are not yet
            //         associated to an `IntoTime` are collected, and formed into a "chain batch":
            //         a sequence of chains that results from sorting the updates by `FromTime`,
            //         and then segmenting the sequence at elements where the partial order on
            //         `FromTime` is violated.
            let mut stash = Vec::new();
            // Consolidate progress updates before applying them to `source_frontier`, to avoid
            // quadratic behavior in overload scenarios.
            let mut change_batch = ChangeBatch::<FromTime, 2>::default();
            while let Some(event) = events.pull() {
                match event {
                    Event::Progress(changes) => {
                        change_batch.extend(changes.drain(..));
                    }
                    Event::Messages(_, data) => stash.append(data),
                }
            }
            source_frontier.update_iter(change_batch.drain());
            stash.sort_unstable_by(|(_, t1, _): &(D, FromTime, R), (_, t2, _)| t1.cmp(t2));
            let mut new_source_updates = ChainBatch::from_iter(stash);

            // STEP 4: Reclock new and deferred updates.
            //         We are now ready to step through the remap bindings in time order and
            //         perform the following actions:
            //         4.1. Match `new_source_updates` against the entirety of bindings contained
            //              in the trace.
            //         4.2. Match `deferred_source_updates` against the bindings that were just
            //              added in the trace.
            //         4.3. Reclock `source_frontier` to calculate the new since frontier of the
            //              remap trace.
            //
            //         The steps above only make sense to perform if there are any times for which
            //         we can correctly accumulate the remap trace, which is what we check here.
            if remap_since.iter().all(|t| !remap_upper.less_equal(t)) {
                let mut cur_binding = MutableAntichain::new();

                let mut remap = remap_trace.iter().peekable();
                let mut reclocked_source_frontier = remap_upper.clone();

                // We go over all the times at which we might need to output data. These times are
                // restricted to the times at which there exists an update in `remap_trace`, and
                // the minimum timestamp for the case where `remap_trace` is completely empty, in
                // which case the minimum timestamp maps to the empty `FromTime` frontier and
                // therefore all data events map to that minimum timestamp.
                //
                // The approach taken here will take time proportional to the number of elements in
                // `remap_trace`. During development an alternative approach was considered where
                // the updates in `remap_trace` are instead fully materialized into an ordered list
                // of antichains in which every data update can be binary searched into. There are
                // two concerns with this alternative approach that led to preferring this one:
                // 1. Materializing very wide antichains with small differences between them
                //    needs memory proportional to the number of bindings times the width of the
                //    antichain.
                // 2. It locks in the requirement of a totally ordered target timestamp since only
                //    in that case can one binary search a binding.
                // The linear scan is expected to be fine due to the run-to-completion nature of
                // the operator since its cost is amortized among the number of outstanding
                // updates.
                let mut min_time = IntoTime::minimum();
                min_time.advance_by(remap_since.borrow());
                let mut prev_cur_time = None;
                let mut interesting_times = std::iter::once(&min_time)
                    .chain(remap_trace.iter().map(|(_, t, _)| t))
                    .filter(|&v| {
                        let prev = prev_cur_time.replace(v);
                        prev != prev_cur_time
                    });
                let mut frontier_reclocked = false;
                while !(new_source_updates.is_empty()
                    && deferred_source_updates.is_empty()
                    && frontier_reclocked)
                    && let Some(cur_time) = interesting_times.next()
                {
                    // 4.0. Load updates of `cur_time` from the trace into `cur_binding` to
                    //      construct the `[FromTime]` frontier that `cur_time` maps to.
                    while let Some((t_from, _, diff)) = remap.next_if(|(_, t, _)| t == cur_time) {
                        binding_buffer.push((t_from.clone(), *diff));
                    }
                    cur_binding.update_iter(binding_buffer.drain(..));
                    let cur_binding = cur_binding.frontier();

                    // 4.1. Extract updates from `new_source_updates`.
                    for (data, _, diff) in new_source_updates.extract(cur_binding) {
                        session.give((data, cur_time.clone(), diff));
                    }

                    // 4.2. Extract updates from `deferred_source_updates`.
                    //      The deferred updates contain all updates that could not be reclocked
                    //      with the bindings up to `prev_remap_upper`. For this reason we only
                    //      need to reconsider these updates when we start looking at new bindings,
                    //      i.e. bindings that are beyond `prev_remap_upper`.
                    if prev_remap_upper.less_equal(cur_time) {
                        deferred_source_updates.retain_mut(|batch| {
                            for (data, _, diff) in batch.extract(cur_binding) {
                                session.give((data, cur_time.clone(), diff));
                            }
                            // Retain non-empty batches.
                            !batch.is_empty()
                        })
                    }

                    // 4.3. Reclock `source_frontier`.
                    //      If any FromTime in the source frontier could possibly be reclocked to
                    //      this binding then we must maintain our capability to emit data at that
                    //      time and not compact past it. Since we iterate over this loop in time
                    //      order and IntoTime is a total order we only need to perform this step
                    //      once. Once a `cur_time` is inserted into `reclocked_source_frontier`
                    //      no more changes can be made to the frontier by inserting times later
                    //      in the loop.
                    if !frontier_reclocked
                        && source_frontier
                            .frontier()
                            .iter()
                            .any(|t| !cur_binding.less_equal(t))
                    {
                        reclocked_source_frontier.insert(cur_time.clone());
                        frontier_reclocked = true;
                    }
                }

                // STEP 5. Downgrade capability and compact remap trace.
                capset.downgrade(&reclocked_source_frontier.borrow());
                remap_since = reclocked_source_frontier;
                for (_, t, _) in remap_trace.iter_mut() {
                    t.advance_by(remap_since.borrow());
                }
                consolidation::consolidate_updates(&mut remap_trace);
                remap_trace
                    .sort_unstable_by(|(_, t1, _): &(_, IntoTime, _), (_, t2, _)| t1.cmp(t2));

                // If using less than a quarter of the capacity, shrink the container. To avoid
                // having to resize the container on a subsequent push, shrink to 2x the length,
                // which is what push would grow it to.
                if remap_trace.len() < remap_trace.capacity() / 4 {
                    remap_trace.shrink_to(remap_trace.len() * 2);
                }
            }

            // STEP 6. Tidy up deferred updates.
            //         Deferred updates are represented as a list of chain batches where each batch
            //         contains at least twice the updates of the batch that follows it. This
            //         organization leads to a logarithmic number of batches with respect to the
            //         outstanding number of updates.
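            //
            //         As a worked illustration (hypothetical batch sizes, ignoring any
            //         consolidation during merges): starting from batches of sizes [16, 8, 3],
            //         pushing a new batch of size 5 triggers a cascade of merges,
            //         [16, 8, 3, 5] -> [16, 8, 8] -> [16, 16] -> [32], restoring the invariant
            //         that each batch is less than half the size of the one before it.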
            deferred_source_updates.sort_unstable_by_key(|b| Reverse(b.len()));
            if !new_source_updates.is_empty() {
                deferred_source_updates.push(new_source_updates);
            }
            let dsu = &mut deferred_source_updates;
            while dsu.len() > 1 && (dsu[dsu.len() - 1].len() >= dsu[dsu.len() - 2].len() / 2) {
                let a = dsu.pop().unwrap();
                let b = dsu.pop().unwrap();
                dsu.push(a.merge_with(b));
            }

            // If using less than a quarter of the capacity, shrink the container. To avoid having
            // to resize the container on a subsequent push, shrink to 2x the length, which is
            // what push would grow it to.
            if deferred_source_updates.len() < deferred_source_updates.capacity() / 4 {
                deferred_source_updates.shrink_to(deferred_source_updates.len() * 2);
            }
        }
    });

    (Box::new(pusher), reclocked.as_collection())
}

/// A batch of differential updates that vary over some partial order. This type maintains the data
/// as a set of chains that allows for efficient extraction of batches given a frontier.
#[derive(Debug, PartialEq)]
struct ChainBatch<D, T, R> {
    /// A list of chains (sets of mutually comparable times) sorted by the partial order.
    chains: Vec<VecDeque<(D, T, R)>>,
}

impl<D, T: Timestamp, R> ChainBatch<D, T, R> {
    /// Extracts all updates with time not greater than or equal to any time in `upper`.
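    ///
    /// For example (hypothetical values), if a chain holds updates at times `[0, 1, 2, 3]` and
    /// `upper = {2}`, then the updates at times `0` and `1` are extracted: the scan of the chain
    /// stops at the first element `t` for which `2 ≤ t`.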
    fn extract<'a>(
        &'a mut self,
        upper: AntichainRef<'a, T>,
    ) -> impl Iterator<Item = (D, T, R)> + 'a {
        self.chains.retain(|chain| !chain.is_empty());
        self.chains.iter_mut().flat_map(move |chain| {
            // A chain is a sorted list of mutually comparable elements so we keep extracting
            // elements that are not beyond upper.
            std::iter::from_fn(move || {
                let (_, into, _) = chain.front()?;
                if !upper.less_equal(into) {
                    chain.pop_front()
                } else {
                    None
                }
            })
        })
    }

    fn merge_with(
        mut self: ChainBatch<D, T, R>,
        mut other: ChainBatch<D, T, R>,
    ) -> ChainBatch<D, T, R>
    where
        D: ExchangeData,
        T: Timestamp,
        R: Semigroup,
    {
        let mut updates1 = self.chains.drain(..).flatten().peekable();
        let mut updates2 = other.chains.drain(..).flatten().peekable();

        let merged = std::iter::from_fn(|| {
            match (updates1.peek(), updates2.peek()) {
                (Some((d1, t1, _)), Some((d2, t2, _))) => {
                    match (t1, d1).cmp(&(t2, d2)) {
                        Ordering::Less => updates1.next(),
                        Ordering::Greater => updates2.next(),
                        // If the same (d, t) pair is found, consolidate their diffs
                        Ordering::Equal => {
                            let (d1, t1, mut r1) = updates1.next().unwrap();
                            while let Some((_, _, r)) =
                                updates1.next_if(|(d, t, _)| (d, t) == (&d1, &t1))
                            {
                                r1.plus_equals(&r);
                            }
                            while let Some((_, _, r)) =
                                updates2.next_if(|(d, t, _)| (d, t) == (&d1, &t1))
                            {
                                r1.plus_equals(&r);
                            }
                            Some((d1, t1, r1))
                        }
                    }
                }
                (Some(_), None) => updates1.next(),
                (None, Some(_)) => updates2.next(),
                (None, None) => None,
            }
        });

        ChainBatch::from_iter(merged.filter(|(_, _, r)| !r.is_zero()))
    }

    /// Returns the number of updates in the batch.
    fn len(&self) -> usize {
        self.chains.iter().map(|chain| chain.len()).sum()
    }

    /// Returns true if the batch contains no updates.
    fn is_empty(&self) -> bool {
        self.len() == 0
    }
}

impl<D, T: Timestamp, R> FromIterator<(D, T, R)> for ChainBatch<D, T, R> {
    /// Computes the chain decomposition of updates according to the partial order `T`.
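    ///
    /// As a small illustrative example (hypothetical inputs), take `T` to be a product order
    /// where `(a, b) ⪯ (c, d)` iff `a ≤ c` and `b ≤ d`. A sequence of updates sorted by their
    /// totally ordered `Ord` representation decomposes as:
    ///
    /// ```text
    ///     input times:  (1,1), (1,2), (2,0), (2,2)
    ///     chains:       [(1,1), (1,2)]  [(2,0), (2,2)]
    /// ```
    ///
    /// The sequence is segmented at `(2,0)` because `(1,2) ⪯ (2,0)` does not hold.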
    fn from_iter<I: IntoIterator<Item = (D, T, R)>>(updates: I) -> Self {
        let mut chains = vec![];
        let mut updates = updates.into_iter();
        if let Some((d, t, r)) = updates.next() {
            let mut chain = VecDeque::new();
            chain.push_back((d, t, r));
            for (d, t, r) in updates {
                let prev_t = &chain[chain.len() - 1].1;
                if !PartialOrder::less_equal(prev_t, &t) {
                    chains.push(chain);
                    chain = VecDeque::new();
                }
                chain.push_back((d, t, r));
            }
            chains.push(chain);
        }
        Self { chains }
    }
}
580
581#[cfg(test)]
582mod test {
583    use std::sync::atomic::AtomicUsize;
584    use std::sync::mpsc::{Receiver, TryRecvError};
585
586    use differential_dataflow::consolidation;
587    use differential_dataflow::input::{Input, InputSession};
588    use serde::{Deserialize, Serialize};
589    use timely::communication::allocator::Thread;
590    use timely::dataflow::operators::capture::{Event, Extract};
591    use timely::dataflow::operators::vec::UnorderedInput;
592    use timely::dataflow::operators::vec::unordered_input::UnorderedHandle;
593    use timely::dataflow::operators::{ActivateCapability, Capture};
594    use timely::progress::PathSummary;
595    use timely::progress::timestamp::Refines;
596    use timely::worker::Worker;
597
598    use crate::capture::PusherCapture;
599    use crate::order::Partitioned;
600
601    use super::*;
602
603    type Diff = Overflowing<i64>;
604    type FromTime = Partitioned<u64, u64>;
605    type IntoTime = u64;
606    type BindingHandle<FromTime> = InputSession<IntoTime, FromTime, Diff>;
607    type DataHandle<D, FromTime> = (
608        UnorderedHandle<FromTime, (D, FromTime, Diff)>,
609        ActivateCapability<FromTime>,
610    );
611    type ReclockedStream<D> = Receiver<Event<IntoTime, Vec<(D, IntoTime, Diff)>>>;
612
    /// A helper function that sets up a dataflow program to test the reclocking operator. Each
    /// test provides a test logic closure which accepts four arguments:
    ///
    /// * A reference to the worker that allows the test to step the computation
    /// * A [`BindingHandle`] that allows the test to manipulate the remap bindings
    /// * A [`DataHandle`] that allows the test to submit the data to be reclocked
    /// * A [`ReclockedStream`] that allows observing the result of the reclocking process
    ///
    /// Note that the `DataHandle` contains a capability that should be dropped or downgraded
    /// before calling [`step`] in order to process data at that time.
    fn harness<FromTime, D, F, R>(as_of: Antichain<IntoTime>, test_logic: F) -> R
    where
        FromTime: Timestamp + Refines<()>,
        D: ExchangeData,
        F: FnOnce(
                &mut Worker<Thread>,
                BindingHandle<FromTime>,
                DataHandle<D, FromTime>,
                ReclockedStream<D>,
            ) -> R
            + Send
            + Sync
            + 'static,
        R: Send + 'static,
    {
        timely::execute_directly(move |worker| {
            let (bindings, data, data_cap, reclocked) = worker.dataflow::<(), _, _>(|scope| {
                let (bindings, data_pusher, reclocked) =
                    scope.scoped::<IntoTime, _, _>("IntoScope", move |scope| {
                        let (binding_handle, binding_collection) = scope.new_collection();
                        let (data_pusher, reclocked_collection) =
                            reclock(binding_collection, as_of);
                        let reclocked_capture = reclocked_collection.inner.capture();
                        (binding_handle, data_pusher, reclocked_capture)
                    });

                let (data, data_cap) = scope.scoped::<FromTime, _, _>("FromScope", move |scope| {
                    let ((handle, cap), data) = scope.new_unordered_input::<(D, FromTime, Diff)>();
                    data.capture_into(PusherCapture(data_pusher));
                    (handle, cap)
                });

                (bindings, data, data_cap, reclocked)
            });

            test_logic(worker, bindings, (data, data_cap), reclocked)
        })
    }

    /// Steps the worker four times, which is the number of steps required for both data and
    /// frontier updates to propagate across the two scopes and into the probing channels.
    fn step(worker: &mut Worker<Thread>) {
        for _ in 0..4 {
            worker.step();
        }
    }

    #[mz_ore::test]
    fn basic_reclocking() {
        let as_of = Antichain::from_elem(IntoTime::minimum());
        harness::<FromTime, _, _, _>(
            as_of,
            |worker, bindings, (mut data, data_cap), reclocked| {
                // Reclock everything at the minimum IntoTime
                bindings.close();
                data.activate()
                    .session(&data_cap)
                    .give(('a', Partitioned::minimum(), Diff::ONE));
                drop(data_cap);
                step(worker);
                let extracted = reclocked.extract();
                let expected = vec![(0, vec![('a', 0, Diff::ONE)])];
                assert_eq!(extracted, expected);
            },
        )
    }

    /// Generates a `Partitioned<u64, u64>` antichain where each of the provided partitions is
    /// at its specified offset and the gaps in between are filled with range timestamps at
    /// offset zero.
    fn partitioned_frontier<I>(items: I) -> Antichain<Partitioned<u64, u64>>
    where
        I: IntoIterator<Item = (u64, u64)>,
    {
        let mut frontier = Antichain::new();
        let mut prev = 0;
        for (pid, offset) in items {
            if prev < pid {
                frontier.insert(Partitioned::new_range(prev, pid - 1, 0));
            }
            frontier.insert(Partitioned::new_singleton(pid, offset));
            prev = pid + 1;
        }
        frontier.insert(Partitioned::new_range(prev, u64::MAX, 0));
        frontier
    }

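    /// An illustrative, self-contained model of the reclocking rule that the tests in this
    /// module exercise. It simplifies `FromTime` to a totally ordered `u64` in place of
    /// `Partitioned` (an assumption for illustration only, not the operator's API): each
    /// source update lands at the first `IntoTime` whose remap frontier has moved beyond the
    /// update's `FromTime`.
    fn reclock_model(remap: &[(u64, u64)], source: &[(char, u64)]) -> Vec<(char, u64)> {
        let mut output = Vec::new();
        for &(datum, from_ts) in source {
            // Find the first binding `(into_ts, frontier)` with `frontier > from_ts`, i.e.
            // the first `IntoTime` at which the update is no longer beyond the remap
            // frontier. Updates beyond every recorded frontier stay pending and are omitted.
            if let Some(&(into_ts, _)) = remap.iter().find(|&&(_, frontier)| frontier > from_ts) {
                output.push((datum, into_ts));
            }
        }
        output
    }
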
    #[mz_ore::test]
    fn test_basic_usage() {
        let as_of = Antichain::from_elem(IntoTime::minimum());
        harness(
            as_of,
            |worker, mut bindings, (mut data, data_cap), reclocked| {
                // Reclock offsets 1 and 3 to timestamp 1000
                bindings.update_at(Partitioned::minimum(), 0, Diff::ONE);
                bindings.update_at(Partitioned::minimum(), 1000, Diff::MINUS_ONE);
                for time in partitioned_frontier([(0, 4)]) {
                    bindings.update_at(time, 1000, Diff::ONE);
                }
                bindings.advance_to(1001);
                bindings.flush();
                data.activate().session(&data_cap).give_iterator(
                    vec![
                        (1, Partitioned::new_singleton(0, 1), Diff::ONE),
                        (1, Partitioned::new_singleton(0, 1), Diff::ONE),
                        (3, Partitioned::new_singleton(0, 3), Diff::ONE),
                    ]
                    .into_iter(),
                );

                step(worker);
                assert_eq!(
                    reclocked.try_recv(),
                    Ok(Event::Messages(
                        0u64,
                        vec![
                            (1, 1000, Diff::ONE),
                            (1, 1000, Diff::ONE),
                            (3, 1000, Diff::ONE)
                        ]
                    ))
                );
                assert_eq!(
                    reclocked.try_recv(),
                    Ok(Event::Progress(vec![(0, -1), (1000, 1)]))
                );

                // Reclock more messages for offset 3 to the same timestamp
                data.activate().session(&data_cap).give_iterator(
                    vec![
                        (3, Partitioned::new_singleton(0, 3), Diff::ONE),
                        (3, Partitioned::new_singleton(0, 3), Diff::ONE),
                    ]
                    .into_iter(),
                );
                step(worker);
                assert_eq!(
                    reclocked.try_recv(),
                    Ok(Event::Messages(
                        1000u64,
                        vec![(3, 1000, Diff::ONE), (3, 1000, Diff::ONE)]
                    ))
                );

                // Drop the capability, which should advance the reclocked frontier to 1001.
                drop(data_cap);
                step(worker);
                assert_eq!(
                    reclocked.try_recv(),
                    Ok(Event::Progress(vec![(1000, -1), (1001, 1)]))
                );
            },
        );
    }

    #[mz_ore::test]
    fn test_reclock_frontier() {
        let as_of = Antichain::from_elem(IntoTime::minimum());
        harness::<_, (), _, _>(
            as_of,
            |worker, mut bindings, (_data, data_cap), reclocked| {
                // Initialize the bindings such that the minimum IntoTime contains the minimum
                // FromTime frontier.
                bindings.update_at(Partitioned::minimum(), 0, Diff::ONE);
                bindings.advance_to(1);
                bindings.flush();
                step(worker);
                assert_eq!(
                    reclocked.try_recv(),
                    Ok(Event::Progress(vec![(0, -1), (1, 1)]))
                );

                // Mint a couple of bindings for multiple partitions
                bindings.update_at(Partitioned::minimum(), 1000, Diff::MINUS_ONE);
                for time in partitioned_frontier([(1, 10)]) {
                    bindings.update_at(time.clone(), 1000, Diff::ONE);
                    bindings.update_at(time, 2000, Diff::MINUS_ONE);
                }
                for time in partitioned_frontier([(1, 10), (2, 10)]) {
                    bindings.update_at(time, 2000, Diff::ONE);
                }
                bindings.advance_to(2001);
                bindings.flush();

                // The initial frontier should now map to the minimum between the two partitions
                step(worker);
                step(worker);
                assert_eq!(
                    reclocked.try_recv(),
                    Ok(Event::Progress(vec![(1, -1), (1000, 1)]))
                );

                // Downgrade the data frontier such that only one of the partitions is advanced
                let mut part1_cap = data_cap.delayed(&Partitioned::new_singleton(1, 9));
                let mut part2_cap = data_cap.delayed(&Partitioned::new_singleton(2, 0));
                let _rest_cap = data_cap.delayed(&Partitioned::new_range(3, u64::MAX, 0));
                drop(data_cap);
                step(worker);
                assert_eq!(reclocked.try_recv(), Err(TryRecvError::Empty));

                // Downgrade the data frontier past the first binding
                part1_cap.downgrade(&Partitioned::new_singleton(1, 10));
                step(worker);
                assert_eq!(
                    reclocked.try_recv(),
                    Ok(Event::Progress(vec![(1000, -1), (2000, 1)]))
                );

                // Downgrade the data frontier past the second binding
                part2_cap.downgrade(&Partitioned::new_singleton(2, 10));
                step(worker);
                assert_eq!(
                    reclocked.try_recv(),
                    Ok(Event::Progress(vec![(2000, -1), (2001, 1)]))
                );

                // Advance the binding frontier and confirm that we get to the next timestamp
                bindings.advance_to(3001);
                bindings.flush();
                step(worker);
                assert_eq!(
                    reclocked.try_recv(),
                    Ok(Event::Progress(vec![(2001, -1), (3001, 1)]))
                );
            },
        );
    }

    #[mz_ore::test]
    fn test_reclock() {
        let as_of = Antichain::from_elem(IntoTime::minimum());
        harness(
            as_of,
            |worker, mut bindings, (mut data, data_cap), reclocked| {
                // Initialize the bindings such that the minimum IntoTime contains the minimum
                // FromTime frontier.
                bindings.update_at(Partitioned::minimum(), 0, Diff::ONE);

                // Set up more precise capabilities for the rest of the test
                let mut part0_cap = data_cap.delayed(&Partitioned::new_singleton(0, 0));
                let rest_cap = data_cap.delayed(&Partitioned::new_range(1, u64::MAX, 0));
                drop(data_cap);

                // Reclock offsets 1 and 2 to timestamp 1000
                data.activate().session(&part0_cap).give_iterator(
                    vec![
                        (1, Partitioned::new_singleton(0, 1), Diff::ONE),
                        (2, Partitioned::new_singleton(0, 2), Diff::ONE),
                    ]
                    .into_iter(),
                );

                part0_cap.downgrade(&Partitioned::new_singleton(0, 3));
                bindings.update_at(Partitioned::minimum(), 1000, Diff::MINUS_ONE);
                bindings.update_at(part0_cap.time().clone(), 1000, Diff::ONE);
                bindings.update_at(rest_cap.time().clone(), 1000, Diff::ONE);
                bindings.advance_to(1001);
                bindings.flush();
                step(worker);
                assert_eq!(
                    reclocked.try_recv(),
                    Ok(Event::Messages(
                        0,
                        vec![(1, 1000, Diff::ONE), (2, 1000, Diff::ONE)]
                    ))
                );
                assert_eq!(
                    reclocked.try_recv(),
                    Ok(Event::Progress(vec![(0, -1), (1000, 1)]))
                );
                assert_eq!(
                    reclocked.try_recv(),
                    Ok(Event::Progress(vec![(1000, -1), (1001, 1)]))
                );

                // Reclock offsets 3 and 4 to timestamp 2000
                data.activate().session(&part0_cap).give_iterator(
                    vec![
                        (3, Partitioned::new_singleton(0, 3), Diff::ONE),
                        (3, Partitioned::new_singleton(0, 3), Diff::ONE),
                        (4, Partitioned::new_singleton(0, 4), Diff::ONE),
                    ]
                    .into_iter(),
                );
                bindings.update_at(part0_cap.time().clone(), 2000, Diff::MINUS_ONE);
                part0_cap.downgrade(&Partitioned::new_singleton(0, 5));
                bindings.update_at(part0_cap.time().clone(), 2000, Diff::ONE);
                bindings.advance_to(2001);
                bindings.flush();
                step(worker);
                assert_eq!(
                    reclocked.try_recv(),
                    Ok(Event::Messages(
                        1001,
                        vec![
                            (3, 2000, Diff::ONE),
                            (3, 2000, Diff::ONE),
                            (4, 2000, Diff::ONE)
                        ]
                    ))
                );
                assert_eq!(
                    reclocked.try_recv(),
                    Ok(Event::Progress(vec![(1001, -1), (2000, 1)]))
                );
                assert_eq!(
                    reclocked.try_recv(),
                    Ok(Event::Progress(vec![(2000, -1), (2001, 1)]))
                );
            },
        );
    }

    #[mz_ore::test]
    fn test_reclock_gh16318() {
        let as_of = Antichain::from_elem(IntoTime::minimum());
        harness(
            as_of,
            |worker, mut bindings, (mut data, data_cap), reclocked| {
                // Initialize the bindings such that the minimum IntoTime contains the minimum
                // FromTime frontier.
                bindings.update_at(Partitioned::minimum(), 0, Diff::ONE);
                // First mint bindings for partition 0 at timestamp 1000
                bindings.update_at(Partitioned::minimum(), 1000, Diff::MINUS_ONE);
                for time in partitioned_frontier([(0, 50)]) {
                    bindings.update_at(time, 1000, Diff::ONE);
                }
                // Then only for partition 1 at timestamp 2000
                for time in partitioned_frontier([(0, 50)]) {
                    bindings.update_at(time, 2000, Diff::MINUS_ONE);
                }
                for time in partitioned_frontier([(0, 50), (1, 50)]) {
                    bindings.update_at(time, 2000, Diff::ONE);
                }
                // Then again only for partition 0 at timestamp 3000
                for time in partitioned_frontier([(0, 50), (1, 50)]) {
                    bindings.update_at(time, 3000, Diff::MINUS_ONE);
                }
                for time in partitioned_frontier([(0, 100), (1, 50)]) {
                    bindings.update_at(time, 3000, Diff::ONE);
                }
                bindings.advance_to(3001);
                bindings.flush();

                // Reclocking (0, 50) must ignore the updates on the FromTime frontier that
                // happened at timestamp 2000 since those are completely unrelated
                data.activate().session(&data_cap).give((
                    50,
                    Partitioned::new_singleton(0, 50),
                    Diff::ONE,
                ));
                drop(data_cap);
                step(worker);
                assert_eq!(
                    reclocked.try_recv(),
                    Ok(Event::Messages(0, vec![(50, 3000, Diff::ONE)]))
                );
                assert_eq!(
                    reclocked.try_recv(),
                    Ok(Event::Progress(vec![(0, -1), (1000, 1)]))
                );
                assert_eq!(
                    reclocked.try_recv(),
                    Ok(Event::Progress(vec![(1000, -1), (3001, 1)]))
                );
            },
        );
    }

    /// Test that compact(reclock(remap, source)) == reclock(compact(remap), source)
    #[mz_ore::test]
    fn test_compaction() {
        let mut remap = vec![];
        remap.push((Partitioned::minimum(), 0, Diff::ONE));
        // Reclock offsets 1 and 2 to timestamp 1000
        remap.push((Partitioned::minimum(), 1000, Diff::MINUS_ONE));
        for time in partitioned_frontier([(0, 3)]) {
            remap.push((time, 1000, Diff::ONE));
        }
        // Reclock offsets 3 and 4 to timestamp 2000
        for time in partitioned_frontier([(0, 3)]) {
            remap.push((time, 2000, Diff::MINUS_ONE));
        }
        for time in partitioned_frontier([(0, 5)]) {
            remap.push((time, 2000, Diff::ONE));
        }

        let source_updates = vec![
            (1, Partitioned::new_singleton(0, 1), Diff::ONE),
            (2, Partitioned::new_singleton(0, 2), Diff::ONE),
            (3, Partitioned::new_singleton(0, 3), Diff::ONE),
            (4, Partitioned::new_singleton(0, 4), Diff::ONE),
        ];

        let since = Antichain::from_elem(1500);

        // Compute reclock(remap, source)
        let as_of = Antichain::from_elem(IntoTime::minimum());
        let remap1 = remap.clone();
        let source_updates1 = source_updates.clone();
        let reclock_remap = harness(
            as_of,
            move |worker, mut bindings, (mut data, data_cap), reclocked| {
                for (from_ts, into_ts, diff) in remap1 {
                    bindings.update_at(from_ts, into_ts, diff);
                }
                bindings.close();
                data.activate()
                    .session(&data_cap)
                    .give_iterator(source_updates1.iter().cloned());
                drop(data_cap);
                step(worker);
                reclocked.extract()
            },
        );
        // Compute compact(reclock(remap, source))
        let mut compact_reclock_remap = reclock_remap;
        for (t, updates) in compact_reclock_remap.iter_mut() {
            t.advance_by(since.borrow());
            for (_, t, _) in updates.iter_mut() {
                t.advance_by(since.borrow());
            }
        }

        // Compute compact(remap)
        let mut compact_remap = remap;
        for (_, t, _) in compact_remap.iter_mut() {
            t.advance_by(since.borrow());
        }
        consolidation::consolidate_updates(&mut compact_remap);
        // Compute reclock(compact(remap), source)
        let reclock_compact_remap = harness(
            since,
            move |worker, mut bindings, (mut data, data_cap), reclocked| {
                for (from_ts, into_ts, diff) in compact_remap {
                    bindings.update_at(from_ts, into_ts, diff);
                }
                bindings.close();
                data.activate()
                    .session(&data_cap)
                    .give_iterator(source_updates.iter().cloned());
                drop(data_cap);
                step(worker);
                reclocked.extract()
            },
        );

        let expected = vec![(
            1500,
            vec![
                (1, 1500, Diff::ONE),
                (2, 1500, Diff::ONE),
                (3, 2000, Diff::ONE),
                (4, 2000, Diff::ONE),
            ],
        )];
        assert_eq!(expected, reclock_compact_remap);
        assert_eq!(expected, compact_reclock_remap);
    }

    #[mz_ore::test]
    fn test_chainbatch_merge() {
        let a = ChainBatch::from_iter([('a', 0, 1)]);
        let b = ChainBatch::from_iter([('a', 0, -1), ('a', 1, 1)]);
        assert_eq!(a.merge_with(b), ChainBatch::from_iter([('a', 1, 1)]));
    }

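    /// A self-contained sketch of the chain decomposition that `ChainBatch` construction
    /// performs: a run of updates that is sorted, but only partially ordered, is cut into a
    /// new chain whenever consecutive times are not ordered, so that within each chain the
    /// times are totally ordered. The product order on `(u8, u8)` pairs stands in for a
    /// general partial order here (an assumption for illustration only).
    fn chain_decomposition(times: &[(u8, u8)]) -> Vec<Vec<(u8, u8)>> {
        let less_equal = |a: &(u8, u8), b: &(u8, u8)| a.0 <= b.0 && a.1 <= b.1;
        let mut chains = Vec::new();
        let mut chain: Vec<(u8, u8)> = Vec::new();
        for &t in times {
            // Start a new chain whenever the previous time does not come before `t`.
            if let Some(prev) = chain.last() {
                if !less_equal(prev, &t) {
                    chains.push(std::mem::take(&mut chain));
                }
            }
            chain.push(t);
        }
        if !chain.is_empty() {
            chains.push(chain);
        }
        chains
    }
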
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // too slow
    fn test_binding_consolidation() {
        use std::sync::atomic::Ordering;

        #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
        struct Time(u64);

        // A counter of the number of active Time instances
        static INSTANCES: AtomicUsize = AtomicUsize::new(0);

        impl Time {
            fn new(time: u64) -> Self {
                INSTANCES.fetch_add(1, Ordering::Relaxed);
                Self(time)
            }
        }

        impl Clone for Time {
            fn clone(&self) -> Self {
                INSTANCES.fetch_add(1, Ordering::Relaxed);
                Self(self.0)
            }
        }

        impl Drop for Time {
            fn drop(&mut self) {
                INSTANCES.fetch_sub(1, Ordering::Relaxed);
            }
        }

        impl Timestamp for Time {
            type Summary = ();

            fn minimum() -> Self {
                Time::new(0)
            }
        }

        impl PathSummary<Time> for () {
            fn results_in(&self, src: &Time) -> Option<Time> {
                Some(src.clone())
            }

            fn followed_by(&self, _other: &()) -> Option<Self> {
                Some(())
            }
        }

        impl Refines<()> for Time {
            fn to_inner(_: ()) -> Self {
                Self::minimum()
            }
            fn to_outer(self) -> () {}
            fn summarize(_path: ()) {}
        }

        impl PartialOrder for Time {
            fn less_equal(&self, other: &Self) -> bool {
                self.0.less_equal(&other.0)
            }
        }

        let as_of = 1000;

        // Test that supplying a single big batch of unconsolidated bindings gets
        // consolidated after a single worker step.
        harness::<Time, u64, _, _>(
            Antichain::from_elem(as_of),
            move |worker, mut bindings, _, _| {
                step(worker);
                let instances_before = INSTANCES.load(Ordering::Relaxed);
                for ts in 0..as_of {
                    if ts > 0 {
                        bindings.update_at(Time::new(ts - 1), ts, Diff::MINUS_ONE);
                    }
                    bindings.update_at(Time::new(ts), ts, Diff::ONE);
                }
                bindings.advance_to(as_of);
                bindings.flush();
                step(worker);
                let instances_after = INSTANCES.load(Ordering::Relaxed);
                // The extra instances live in a ChangeBatch, which considers compaction when
                // more than 32 elements are inside.
                assert!(instances_after - instances_before < 32);
            },
        );

        // Test that a slow feed of uncompacted bindings over multiple steps never leads to an
        // excessive number of bindings held in memory.
        harness::<Time, u64, _, _>(
            Antichain::from_elem(as_of),
            move |worker, mut bindings, _, _| {
                step(worker);
                let instances_before = INSTANCES.load(Ordering::Relaxed);
                for ts in 0..as_of {
                    if ts > 0 {
                        bindings.update_at(Time::new(ts - 1), ts, Diff::MINUS_ONE);
                    }
                    bindings.update_at(Time::new(ts), ts, Diff::ONE);
                    bindings.advance_to(ts + 1);
                    bindings.flush();
                    step(worker);
                    let instances_now = INSTANCES.load(Ordering::Relaxed);
                    // The extra instances live in a ChangeBatch, which considers compaction
                    // when more than 32 elements are inside.
                    assert!(instances_now - instances_before < 32);
                }
            },
        );
    }

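    /// A toy model of the consolidation that keeps the binding footprint bounded in the test
    /// above: diffs accumulate per time and entries that cancel out are dropped, so a stream
    /// of paired insertions and retractions never grows the compacted state. The threshold
    /// behavior of timely's `ChangeBatch` (compacting once enough elements accumulate) is
    /// deliberately not modeled here; this is an illustration only.
    fn consolidate_model(updates: &[(u64, i64)]) -> Vec<(u64, i64)> {
        let mut acc = std::collections::BTreeMap::new();
        for &(time, diff) in updates {
            // Sum the diffs for each time.
            *acc.entry(time).or_insert(0) += diff;
        }
        // Entries whose diffs cancelled out completely are dropped.
        acc.into_iter().filter(|&(_, diff)| diff != 0).collect()
    }
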
    #[cfg(feature = "count-allocations")]
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // too slow
    fn test_shrinking() {
        let as_of = 1000_u64;

        // This workflow accumulates updates in remap_trace, advances the source frontier,
        // and validates that memory was reclaimed. To avoid errant test failures due to
        // optimizations, this only validates that memory is reclaimed, not how much.
        harness::<FromTime, u64, _, _>(
            Antichain::from_elem(0),
            move |worker, mut bindings, (_data, mut data_cap), _| {
                let info1 = allocation_counter::measure(|| {
                    step(worker);
                    for ts in 0..as_of {
                        if ts > 0 {
                            bindings.update_at(
                                Partitioned::new_singleton(0, ts - 1),
                                ts,
                                Diff::MINUS_ONE,
                            );
                        }
                        bindings.update_at(Partitioned::new_singleton(0, ts), ts, Diff::ONE);
                        bindings.advance_to(ts + 1);
                        bindings.flush();
                        step(worker);
                    }
                });
                println!("info = {info1:?}");

                let info2 = allocation_counter::measure(|| {
                    data_cap.downgrade(&Partitioned::new_singleton(0, as_of));
                    step(worker);
                });
                println!("info = {info2:?}");
                assert!(info2.bytes_current < 0);
            },
        );
    }
}