mz_timely_util/reclock.rs
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License in the LICENSE file at the
// root of this repository, or online at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! ## Notation
//!
//! Collections are represented with capital letters (T, S, R), collection traces as bold letters
//! (𝐓, 𝐒, 𝐑), and difference traces as δ𝐓.
//!
//! Indexing a collection trace 𝐓 to obtain its version at `t` is written as 𝐓(t). Indexing a
//! collection to obtain the multiplicity of a record `x` is written as T\[x\]. These can be combined
//! to obtain the multiplicity of a record `x` at some version `t` as 𝐓(t)\[x\].
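//!
//! For example, if the trace 𝐓 at version `t = 5` contains the record `x` with multiplicity 2,
//! then 𝐓(5) is that version of the collection and 𝐓(5)\[x\] = 2.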
//!
//! ## Overview
//!
//! Reclocking transforms a source collection `S` that evolves with some timestamp `FromTime` into
//! a collection `T` that evolves with some other timestamp `IntoTime`. The reclocked collection T
//! contains all updates `u ∈ S` that are not beyond some `FromTime` frontier R(t). The collection
//! `R` is called the remap collection.
//!
//! More formally, for some arbitrary time `t` of `IntoTime` and some arbitrary record `x`, the
//! reclocked collection `T(t)[x]` is defined to be the `sum{δ𝐒(s)[x]: !(𝐑(t) ⪯ s)}`. Since this
//! holds for any record we can write the definition of Reclock(𝐒, 𝐑) as:
//!
//! > Reclock(𝐒, 𝐑) ≜ 𝐓: ∀ t ∈ IntoTime : 𝐓(t) = sum{δ𝐒(s): !(𝐑(t) ⪯ s)}
//!
//! In order for the reclocked collection `T` to have a sensible definition of progress we require
//! that `t1 ≤ t2 ⇒ 𝐑(t1) ⪯ 𝐑(t2)` where the first `≤` is the partial order of `IntoTime` and the
//! second one the partial order of `FromTime` antichains.
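//!
//! As a concrete (hypothetical) example, let `FromTime` be `u64` source offsets, let `IntoTime`
//! be `u64` milliseconds, and suppose 𝐑(0) = {0} and 𝐑(t) = {3} for all t ≥ 1000. An update of a
//! record `x` at offset 1 satisfies 𝐑(0) ⪯ 1, so it is excluded from 𝐓(0), but !(𝐑(1000) ⪯ 1),
//! so it is included in 𝐓(t) for every t ≥ 1000.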
//!
//! ## Total order simplification
//!
//! In order to simplify the implementation we will require that `IntoTime` is a total order. This
//! limitation can be lifted in the future but further elaboration on the mechanics of reclocking
//! is required to ensure a correct implementation.
//!
//! ## The difference trace
//!
//! By the definition of difference traces we have:
//!
//! ```text
//! δ𝐓(t) = T(t) - sum{δ𝐓(s): s < t}
//! ```
//!
//! Due to the total order assumption we only need to consider two cases.
//!
//! **Case 1:** `t` is the minimum timestamp
//!
//! In this case `sum{δ𝐓(s): s < t}` is the empty set and so we obtain:
//!
//! ```text
//! δ𝐓(min) = T(min) = sum{δ𝐒(s): !(𝐑(min) ⪯ s)}
//! ```
//!
//! **Case 2:** `t` is a timestamp with a predecessor `prev`
//!
//! In this case `sum{δ𝐓(s): s < t}` is equal to `T(prev)` because:
//!
//! ```text
//! sum{δ𝐓(s): s < t} = sum{δ𝐓(s): s ≤ prev} + sum{δ𝐓(s): prev < s < t}
//!                   = T(prev) + ∅
//!                   = T(prev)
//! ```
//!
//! And therefore the difference trace of T is:
//!
//! ```text
//! δ𝐓(t) = 𝐓(t) - 𝐓(prev)
//!       = sum{δ𝐒(s): !(𝐑(t) ⪯ s)} - sum{δ𝐒(s): !(𝐑(prev) ⪯ s)}
//!       = sum{δ𝐒(s): (𝐑(prev) ⪯ s) ∧ !(𝐑(t) ⪯ s)}
//! ```
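//!
//! Continuing the hypothetical example from the overview, the predecessor of t = 1000 still maps
//! to {0}, so δ𝐓(1000) = sum{δ𝐒(s): ({0} ⪯ s) ∧ !({3} ⪯ s)}: exactly the source updates at
//! offsets 0, 1, and 2.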
//!
//! ## Unique mapping property
//!
//! Given the definition above we can derive the fact that for any source difference δ𝐒(s) there is
//! at most one target timestamp t that it must be reclocked to. This property can be exploited by
//! the implementation of the operator as it can safely discard source updates once a matching
//! δ𝐓(t) has been found, making it "stateless" with respect to the source trace. A formal proof of
//! this property is [provided below](#unique-mapping-property-proof).
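//!
//! In the running example, the update at offset 1 appears in δ𝐓(1000) and in no other difference,
//! so once it has been emitted at 1000 the operator never needs to reconsider it.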
//!
//! ## Operational description
//!
//! The operator follows a run-to-completion model where on each scheduling it completes all
//! outstanding work that can be completed.
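//!
//! Concretely, on each activation the operator ingests new remap bindings, commits those that are
//! no longer beyond the `remap_input` frontier, reclocks new and deferred source updates against
//! them, downgrades its held capability, and compacts its maintained state. The numbered STEP
//! comments in the implementation below walk through these phases.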
//!
//! ### Unique mapping property proof
//!
//! This section contains the formal proof of the unique mapping property. The proof follows the
//! structured proof notation created by Leslie Lamport. Readers unfamiliar with structured proofs
//! can read about them here <https://lamport.azurewebsites.net/pubs/proof.pdf>.
//!
//! #### Statement
//!
//! AtMostOne(X, φ(x)) ≜ ∀ x1, x2 ∈ X : φ(x1) ∧ φ(x2) ⇒ x1 = x2
//!
//! * **THEOREM** UniqueMapping ≜
//!   * **ASSUME**
//!     * **NEW** (FromTime, ⪯) ∈ PartiallyOrderedTimestamps
//!     * **NEW** (IntoTime, ≤) ∈ TotallyOrderedTimestamps
//!     * **NEW** 𝐒 ∈ SetOfCollectionTraces(FromTime)
//!     * **NEW** 𝐑 ∈ SetOfCollectionTraces(IntoTime)
//!       * ∀ t ∈ IntoTime: 𝐑(t) ∈ SetOfAntichains(FromTime)
//!       * ∀ t1, t2 ∈ IntoTime: t1 ≤ t2 ⇒ 𝐑(t1) ⪯ 𝐑(t2)
//!     * **NEW** 𝐓 = Reclock(𝐒, 𝐑)
//!   * **PROVE** ∀ s ∈ FromTime : AtMostOne(IntoTime, δ𝐒(s) ∈ δ𝐓(x))
//!
//! #### Proof
//!
//! 1. **SUFFICES ASSUME** ∃ s ∈ FromTime: ¬AtMostOne(IntoTime, δ𝐒(s) ∈ δ𝐓(x))
//!    * **PROVE FALSE**
//!    * _By proof by contradiction._
//! 2. **PICK** s ∈ FromTime : ¬AtMostOne(IntoTime, δ𝐒(s) ∈ δ𝐓(x))
//!    * _Proof: Such a time exists by <1>1._
//! 3. ∃ t1, t2 ∈ IntoTime : t1 ≠ t2 ∧ δ𝐒(s) ∈ δ𝐓(t1) ∧ δ𝐒(s) ∈ δ𝐓(t2)
//!    1. ¬(∀ x1, x2 ∈ IntoTime : (δ𝐒(s) ∈ δ𝐓(x1)) ∧ (δ𝐒(s) ∈ δ𝐓(x2)) ⇒ x1 = x2)
//!       * _Proof: By <1>2 and the definition of AtMostOne._
//!    2. Q.E.D
//!       * _Proof: By <2>1, quantifier negation rules, and the theorem of propositional logic ¬(P ⇒ Q) ≡ P ∧ ¬Q._
//! 4. **PICK** t1, t2 ∈ IntoTime : t1 < t2 ∧ δ𝐒(s) ∈ δ𝐓(t1) ∧ δ𝐒(s) ∈ δ𝐓(t2)
//!    * _Proof: By <1>3. Assume t1 < t2 without loss of generality._
//! 5. ¬(𝐑(t1) ⪯ s)
//!    1. **CASE** t1 = min(IntoTime)
//!       1. δ𝐓(t1) = sum{δ𝐒(s): !(𝐑(t1) ⪯ s)}
//!          * _Proof: By definition of δ𝐓(min)._
//!       2. δ𝐒(s) ∈ δ𝐓(t1)
//!          * _Proof: By <1>4._
//!       3. Q.E.D
//!          * _Proof: By <3>1 and <3>2._
//!    2. **CASE** t1 > min(IntoTime)
//!       1. **PICK** t1_prev = Predecessor(t1)
//!          * _Proof: Predecessor exists because the set {t: t < t1} is non-empty since it must contain at least min(IntoTime)._
//!       2. δ𝐓(t1) = sum{δ𝐒(s): (𝐑(t1_prev) ⪯ s) ∧ !(𝐑(t1) ⪯ s)}
//!          * _Proof: By definition of δ𝐓(t)._
//!       3. δ𝐒(s) ∈ δ𝐓(t1)
//!          * _Proof: By <1>4._
//!       4. Q.E.D
//!          * _Proof: By <3>2 and <3>3._
//!    3. Q.E.D
//!       * _Proof: From cases <2>1 and <2>2, which are exhaustive._
//! 6. **PICK** t2_prev ∈ IntoTime : t2_prev = Predecessor(t2)
//!    * _Proof: Predecessor exists because by <1>4 the set {t: t < t2} is non-empty since it must contain at least t1._
//! 7. t1 ≤ t2_prev
//!    * _Proof: t1 ∈ {t: t < t2} and t2_prev is the maximum element of that set._
//! 8. 𝐑(t2_prev) ⪯ s
//!    1. t2 > min(IntoTime)
//!       * _Proof: By <1>4, since min(IntoTime) ≤ t1 < t2._
//!    2. δ𝐓(t2) = sum{δ𝐒(s): (𝐑(t2_prev) ⪯ s) ∧ !(𝐑(t2) ⪯ s)}
//!       * _Proof: By <2>1, <1>6, and the definition of δ𝐓(t)._
//!    3. δ𝐒(s) ∈ δ𝐓(t2)
//!       * _Proof: By <1>4._
//!    4. Q.E.D
//!       * _Proof: By <2>2 and <2>3._
//! 9. 𝐑(t1) ⪯ 𝐑(t2_prev)
//!    * _Proof: By <1>7 and the hypothesis on 𝐑._
//! 10. 𝐑(t1) ⪯ s
//!     * _Proof: By <1>8 and <1>9._
//! 11. Q.E.D
//!     * _Proof: By <1>5 and <1>10._

use std::cmp::{Ordering, Reverse};
use std::collections::VecDeque;
use std::collections::binary_heap::{BinaryHeap, PeekMut};
use std::iter::FromIterator;

use differential_dataflow::difference::Semigroup;
use differential_dataflow::lattice::Lattice;
use differential_dataflow::{AsCollection, Collection, ExchangeData, consolidation};
use mz_ore::Overflowing;
use mz_ore::collections::CollectionExt;
use timely::communication::{Pull, Push};
use timely::dataflow::Scope;
use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::operators::CapabilitySet;
use timely::dataflow::operators::capture::Event;
use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
use timely::order::{PartialOrder, TotalOrder};
use timely::progress::frontier::{AntichainRef, MutableAntichain};
use timely::progress::{Antichain, Timestamp};

/// Constructs an operator that reclocks a `source` collection varying with some time `FromTime`
/// into the corresponding `reclocked` collection varying over some time `IntoTime` using the
/// provided `remap` collection.
///
/// In order for the operator to read the `source` collection a `Pusher` is returned which can be
/// used with timely's capture facilities to connect a collection from a foreign scope to this
/// operator.
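///
/// A minimal wiring sketch, mirroring the test harness at the bottom of this file (the
/// surrounding scopes and handle names are illustrative, not part of this API):
///
/// ```ignore
/// // In a scope timestamped with `IntoTime`:
/// let (remap_handle, remap_collection) = scope.new_collection();
/// let (pusher, reclocked) = reclock(&remap_collection, as_of);
/// // In a scope timestamped with `FromTime`, feed the source stream to the operator:
/// source_stream.capture_into(PusherCapture(pusher));
/// ```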
pub fn reclock<G, D, FromTime, IntoTime, R>(
    remap_collection: &Collection<G, FromTime, Overflowing<i64>>,
    as_of: Antichain<G::Timestamp>,
) -> (
    Box<dyn Push<Event<FromTime, Vec<(D, FromTime, R)>>>>,
    Collection<G, D, R>,
)
where
    G: Scope<Timestamp = IntoTime>,
    D: ExchangeData,
    FromTime: Timestamp,
    IntoTime: Timestamp + Lattice + TotalOrder,
    R: Semigroup + 'static,
{
    let mut scope = remap_collection.scope();
    let mut builder = OperatorBuilder::new("Reclock".into(), scope.clone());
    // Here we create a channel that can be used to send data from a foreign scope into this
    // operator. The channel is associated with this operator's address so that it is activated
    // every time events are available for consumption. This mechanism is similar to Timely's
    // input handles where data can be introduced into a timely scope from an exogenous source.
    let info = builder.operator_info();
    let channel_id = scope.new_identifier();
    let (pusher, mut events) =
        scope.pipeline::<Event<FromTime, Vec<(D, FromTime, R)>>>(channel_id, info.address);

    let mut remap_input = builder.new_input(&remap_collection.inner, Pipeline);
    let (mut output, reclocked) = builder.new_output();

    builder.build(move |caps| {
        let mut capset = CapabilitySet::from_elem(caps.into_element());
        capset.downgrade(&as_of.borrow());

        // Remap updates received at times `into_time` greater than or equal to `remap_input`'s
        // frontier. As the input frontier advances, we pop elements off this priority queue and
        // mint new associations.
        let mut pending_remap: BinaryHeap<Reverse<(IntoTime, FromTime, i64)>> = BinaryHeap::new();
        // A trace of `remap_input` that accumulates correctly for all times that are beyond
        // `remap_since` and not beyond `remap_upper`. The updates in `remap_trace` are maintained
        // in time order. An actual DD trace could be used here at the expense of a more
        // complicated API to traverse it. This is left for future work if the naive trace
        // maintenance implemented in this operator becomes problematic.
        let mut remap_upper = Antichain::from_elem(IntoTime::minimum());
        let mut remap_since = as_of.clone();
        let mut remap_trace = Vec::new();

        // A stash of source updates for which we don't know the corresponding binding yet.
        let mut deferred_source_updates: Vec<ChainBatch<_, _, _>> = Vec::new();
        // The frontier of the `events` input
        let mut source_frontier = MutableAntichain::new_bottom(FromTime::minimum());

        let mut binding_buffer = Vec::new();
        let mut interesting_times = Vec::new();

        // Accumulation buffer for `remap_input` updates.
        use timely::progress::ChangeBatch;
        let mut remap_accum_buffer: ChangeBatch<(IntoTime, FromTime)> = ChangeBatch::new();

        // The operator drains `remap_input` and organizes new bindings that are not beyond
        // `remap_input`'s frontier into the time ordered `remap_trace`.
        //
        // All received data events can either be reclocked to a time included in the
        // `remap_trace`, or deferred until new associations are minted. Each data event that
        // happens at some `FromTime` is mapped to the first `IntoTime` whose associated antichain
        // is not less or equal to the input `FromTime`.
        //
        // As progress events are received from the `events` input, we can advance our held
        // capability to track the least `IntoTime` a newly received `FromTime` could possibly map
        // to and also compact the maintained `remap_trace` to that time.
        move |frontiers| {
            let Some(cap) = capset.get(0) else {
                return;
            };
            let mut output = output.activate();
            let mut session = output.session(cap);

            // STEP 1. Accept new bindings into `pending_remap`.
            // Advance all `into` times by `as_of`, and consolidate all updates at that frontier.
            while let Some((_, data)) = remap_input.next() {
                for (from, mut into, diff) in data.drain(..) {
                    into.advance_by(as_of.borrow());
                    remap_accum_buffer.update((into, from), diff.into_inner());
                }
            }
            // Drain consolidated bindings into the `pending_remap` heap, but only once the
            // `remap_input` frontier is no longer less than or equal to `as_of`. For as long as
            // the input frontier is less-equal `as_of` we have no finalized times.
            if !PartialOrder::less_equal(&frontiers[0].frontier(), &as_of.borrow()) {
                for ((into, from), diff) in remap_accum_buffer.drain() {
                    pending_remap.push(Reverse((into, from, diff)));
                }
            }

            // STEP 2. Extract bindings not beyond the new `remap_upper` and commit them into
            // `remap_trace`.
            let prev_remap_upper =
                std::mem::replace(&mut remap_upper, frontiers[0].frontier().to_owned());
            while let Some(update) = pending_remap.peek_mut() {
                if !remap_upper.less_equal(&update.0.0) {
                    let Reverse((into, from, diff)) = PeekMut::pop(update);
                    remap_trace.push((from, into, diff));
                } else {
                    break;
                }
            }

            // STEP 3. Receive new data updates
            // The `events` input describes arbitrary progress and data over `FromTime`,
            // which must be translated to `IntoTime`. Each `FromTime` maps to the first
            // `IntoTime` whose associated `[FromTime]` frontier is not less or equal to
            // the input `FromTime`. Received events that are not yet associated to an
            // `IntoTime` are collected, and formed into a "chain batch": a sequence of
            // chains that results from sorting the updates by `FromTime`, and then
            // segmenting the sequence at elements where the partial order on `FromTime` is
            // violated.
            let mut stash = Vec::new();
            while let Some(event) = events.pull() {
                match event {
                    Event::Progress(changes) => {
                        source_frontier.update_iter(changes.drain(..));
                    }
                    Event::Messages(_, data) => stash.append(data),
                }
            }
            stash.sort_unstable_by(|(_, t1, _): &(D, FromTime, R), (_, t2, _)| t1.cmp(t2));
            let mut new_source_updates = ChainBatch::from_iter(stash);

            // STEP 4: Reclock new and deferred updates
            // We are now ready to step through the remap bindings in time order and
            // perform the following actions:
            // 4.1. Match `new_source_updates` against the entirety of bindings contained
            //      in the trace.
            // 4.2. Match `deferred_source_updates` against the bindings that were just
            //      added in the trace.
            // 4.3. Reclock `source_frontier` to calculate the new since frontier of the
            //      remap trace.
            //
            // The steps above only make sense to perform if there are any times for which
            // we can correctly accumulate the remap trace, which is what we check here.
            if remap_since.iter().all(|t| !remap_upper.less_equal(t)) {
                let mut cur_binding = MutableAntichain::new();

                let mut remap = remap_trace.iter().peekable();
                let mut reclocked_source_frontier = remap_upper.clone();

                // We go over all times at which we might need to output data. These times
                // are restricted to the times at which there exists an update in `remap_trace`
                // and the minimum timestamp for the case where `remap_trace` is completely empty,
                // in which case the minimum timestamp maps to the empty `FromTime` frontier and
                // therefore all data events map to that minimum timestamp.
                //
                // The approach taken here will take time proportional to the number of elements in
                // `remap_trace`. During development an alternative approach was considered where
                // the updates in `remap_trace` are instead fully materialized into an ordered list
                // of antichains in which every data update can be binary searched into. There are
                // two concerns with this alternative approach that led to preferring this one:
                // 1. Materializing very wide antichains with small differences between them
                //    needs memory proportional to the number of bindings times the width of the
                //    antichain.
                // 2. It locks in the requirement of a totally ordered target timestamp since only
                //    in that case can one binary search a binding.
                // The linear scan is expected to be fine due to the run-to-completion nature of
                // the operator since its cost is amortized among the number of outstanding
                // updates.
                let mut min_time = IntoTime::minimum();
                min_time.advance_by(remap_since.borrow());
                interesting_times.push(min_time);
                interesting_times.extend(remap_trace.iter().map(|(_, t, _)| t.clone()));
                interesting_times.dedup();
                for cur_time in interesting_times.drain(..) {
                    // 4.0. Load updates of `cur_time` from the trace into `cur_binding` to
                    //      construct the `[FromTime]` frontier that `cur_time` maps to.
                    while let Some((t_from, _, diff)) = remap.next_if(|(_, t, _)| t == &cur_time) {
                        binding_buffer.push((t_from.clone(), *diff));
                    }
                    cur_binding.update_iter(binding_buffer.drain(..));
                    let cur_binding = cur_binding.frontier();

                    // 4.1. Extract updates from `new_source_updates`
                    for (data, _, diff) in new_source_updates.extract(cur_binding) {
                        session.give((data, cur_time.clone(), diff));
                    }

                    // 4.2. Extract updates from `deferred_source_updates`.
                    //      The deferred updates contain all updates that were not able to be
                    //      reclocked with the bindings until `prev_remap_upper`. For this reason
                    //      we only need to reconsider these updates when we start looking at new
                    //      bindings, i.e. bindings that are beyond `prev_remap_upper`.
                    if prev_remap_upper.less_equal(&cur_time) {
                        deferred_source_updates.retain_mut(|batch| {
                            for (data, _, diff) in batch.extract(cur_binding) {
                                session.give((data, cur_time.clone(), diff));
                            }
                            // Retain non-empty batches
                            !batch.is_empty()
                        })
                    }

                    // 4.3. Reclock `source_frontier`
                    //      If any FromTime in source frontier could possibly be reclocked to this
                    //      binding then we must maintain our capability to emit data at that time
                    //      and not compact past it.
                    if source_frontier
                        .frontier()
                        .iter()
                        .any(|t| !cur_binding.less_equal(t))
                    {
                        reclocked_source_frontier.insert(cur_time);
                    }
                }

                // STEP 5. Downgrade capability and compact remap trace
                capset.downgrade(&reclocked_source_frontier.borrow());
                remap_since = reclocked_source_frontier;
                for (_, t, _) in remap_trace.iter_mut() {
                    t.advance_by(remap_since.borrow());
                }
                consolidation::consolidate_updates(&mut remap_trace);
                remap_trace
                    .sort_unstable_by(|(_, t1, _): &(_, IntoTime, _), (_, t2, _)| t1.cmp(t2));
            }

            // STEP 6. Tidy up deferred updates
            // Deferred updates are represented as a list of chain batches where each batch
            // contains roughly twice the updates of the batch that follows it. This
            // organization leads to a logarithmic number of batches with respect to the
            // outstanding number of updates.
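            // For example (ignoring consolidation during merges): with deferred batch lengths
            // [8, 4, 2], appending a new batch of length 1 triggers cascading merges in the
            // loop below, leaving a single batch of length 15.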
            deferred_source_updates.sort_unstable_by_key(|b| Reverse(b.len()));
            if !new_source_updates.is_empty() {
                deferred_source_updates.push(new_source_updates);
            }
            let dsu = &mut deferred_source_updates;
            while dsu.len() > 1 && (dsu[dsu.len() - 1].len() >= dsu[dsu.len() - 2].len() / 2) {
                let a = dsu.pop().unwrap();
                let b = dsu.pop().unwrap();
                dsu.push(a.merge_with(b));
            }
        }
    });

    (Box::new(pusher), reclocked.as_collection())
}

/// A batch of differential updates that vary over some partial order. This type maintains the data
/// as a set of chains that allows for efficient extraction of batches given a frontier.
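///
/// For example, with hypothetical partitioned times where `p0:i` and `p1:j` are mutually
/// incomparable, the sorted updates `[(a, p0:1), (b, p0:2), (c, p1:1)]` decompose into the two
/// chains `[(a, p0:1), (b, p0:2)]` and `[(c, p1:1)]`.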
#[derive(Debug, PartialEq)]
struct ChainBatch<D, T, R> {
    /// A list of chains (sets of mutually comparable times) sorted by the partial order.
    chains: Vec<VecDeque<(D, T, R)>>,
}

impl<D, T: Timestamp, R> ChainBatch<D, T, R> {
    /// Extracts all updates with time not greater or equal to any time in `upper`.
    fn extract<'a>(
        &'a mut self,
        upper: AntichainRef<'a, T>,
    ) -> impl Iterator<Item = (D, T, R)> + 'a {
        self.chains.retain(|chain| !chain.is_empty());
        self.chains.iter_mut().flat_map(move |chain| {
            // A chain is a sorted list of mutually comparable elements so we keep extracting
            // elements that are not beyond upper.
            std::iter::from_fn(move || {
                let (_, into, _) = chain.front()?;
                if !upper.less_equal(into) {
                    chain.pop_front()
                } else {
                    None
                }
            })
        })
    }

    fn merge_with(
        mut self: ChainBatch<D, T, R>,
        mut other: ChainBatch<D, T, R>,
    ) -> ChainBatch<D, T, R>
    where
        D: ExchangeData,
        T: Timestamp,
        R: Semigroup,
    {
        let mut updates1 = self.chains.drain(..).flatten().peekable();
        let mut updates2 = other.chains.drain(..).flatten().peekable();

        let merged = std::iter::from_fn(|| {
            match (updates1.peek(), updates2.peek()) {
                (Some((d1, t1, _)), Some((d2, t2, _))) => {
                    match (t1, d1).cmp(&(t2, d2)) {
                        Ordering::Less => updates1.next(),
                        Ordering::Greater => updates2.next(),
                        // If the same (d, t) pair is found, consolidate their diffs
                        Ordering::Equal => {
                            let (d1, t1, mut r1) = updates1.next().unwrap();
                            while let Some((_, _, r)) =
                                updates1.next_if(|(d, t, _)| (d, t) == (&d1, &t1))
                            {
                                r1.plus_equals(&r);
                            }
                            while let Some((_, _, r)) =
                                updates2.next_if(|(d, t, _)| (d, t) == (&d1, &t1))
                            {
                                r1.plus_equals(&r);
                            }
                            Some((d1, t1, r1))
                        }
                    }
                }
                (Some(_), None) => updates1.next(),
                (None, Some(_)) => updates2.next(),
                (None, None) => None,
            }
        });

        ChainBatch::from_iter(merged.filter(|(_, _, r)| !r.is_zero()))
    }

    /// Returns the number of updates in the batch.
    fn len(&self) -> usize {
        self.chains.iter().map(|chain| chain.len()).sum()
    }

    /// Returns true if the batch contains no updates.
    fn is_empty(&self) -> bool {
        self.len() == 0
    }
}

impl<D, T: Timestamp, R> FromIterator<(D, T, R)> for ChainBatch<D, T, R> {
    /// Computes the chain decomposition of updates according to the partial order `T`.
    fn from_iter<I: IntoIterator<Item = (D, T, R)>>(updates: I) -> Self {
        let mut chains = vec![];
        let mut updates = updates.into_iter();
        if let Some((d, t, r)) = updates.next() {
            let mut chain = VecDeque::new();
            chain.push_back((d, t, r));
            for (d, t, r) in updates {
                let prev_t = &chain[chain.len() - 1].1;
                if !PartialOrder::less_equal(prev_t, &t) {
                    chains.push(chain);
                    chain = VecDeque::new();
                }
                chain.push_back((d, t, r));
            }
            chains.push(chain);
        }
        Self { chains }
    }
}

#[cfg(test)]
mod test {
    use std::sync::atomic::AtomicUsize;
    use std::sync::mpsc::{Receiver, TryRecvError};

    use differential_dataflow::consolidation;
    use differential_dataflow::input::{Input, InputSession};
    use serde::{Deserialize, Serialize};
    use timely::communication::allocator::Thread;
    use timely::dataflow::operators::capture::{Event, Extract};
    use timely::dataflow::operators::unordered_input::UnorderedHandle;
    use timely::dataflow::operators::{ActivateCapability, Capture, UnorderedInput};
    use timely::progress::PathSummary;
    use timely::progress::timestamp::Refines;
    use timely::worker::Worker;

    use crate::capture::PusherCapture;
    use crate::order::Partitioned;

    use super::*;

    type Diff = Overflowing<i64>;
    type FromTime = Partitioned<u64, u64>;
    type IntoTime = u64;
    type BindingHandle<FromTime> = InputSession<IntoTime, FromTime, Diff>;
    type DataHandle<D, FromTime> = (
        UnorderedHandle<FromTime, (D, FromTime, Diff)>,
        ActivateCapability<FromTime>,
    );
    type ReclockedStream<D> = Receiver<Event<IntoTime, Vec<(D, IntoTime, Diff)>>>;

    /// A helper function that sets up a dataflow program to test the reclocking operator. Each
    /// test provides a test logic closure which accepts four arguments:
    ///
    /// * A reference to the worker that allows the test to step the computation
    /// * A `BindingHandle` that allows the test to manipulate the remap bindings
    /// * A `DataHandle` that allows the test to submit the data to be reclocked
    /// * A `ReclockedStream` that allows observing the result of the reclocking process
    fn harness<FromTime, D, F, R>(as_of: Antichain<IntoTime>, test_logic: F) -> R
    where
        FromTime: Timestamp + Refines<()>,
        D: ExchangeData,
        F: FnOnce(
                &mut Worker<Thread>,
                BindingHandle<FromTime>,
                DataHandle<D, FromTime>,
                ReclockedStream<D>,
            ) -> R
            + Send
            + Sync
            + 'static,
        R: Send + 'static,
    {
        timely::execute_directly(move |worker| {
            let (bindings, data, data_cap, reclocked) = worker.dataflow::<(), _, _>(|scope| {
                let (bindings, data_pusher, reclocked) =
                    scope.scoped::<IntoTime, _, _>("IntoScope", move |scope| {
                        let (binding_handle, binding_collection) = scope.new_collection();
                        let (data_pusher, reclocked_collection) =
                            reclock(&binding_collection, as_of);
                        let reclocked_capture = reclocked_collection.inner.capture();
                        (binding_handle, data_pusher, reclocked_capture)
                    });

                let (data, data_cap) = scope.scoped::<FromTime, _, _>("FromScope", move |scope| {
                    let ((handle, cap), data) = scope.new_unordered_input();
                    data.capture_into(PusherCapture(data_pusher));
                    (handle, cap)
                });

                (bindings, data, data_cap, reclocked)
            });

            test_logic(worker, bindings, (data, data_cap), reclocked)
        })
    }

    /// Steps the worker four times, which is the required number of times for both data and
    /// frontier updates to propagate across the two scopes and into the probing channels.
    fn step(worker: &mut Worker<Thread>) {
        for _ in 0..4 {
            worker.step();
        }
    }

    #[mz_ore::test]
    fn basic_reclocking() {
        let as_of = Antichain::from_elem(IntoTime::minimum());
        harness::<FromTime, _, _, _>(
            as_of,
            |worker, bindings, (mut data, data_cap), reclocked| {
                // Reclock everything at the minimum IntoTime
                bindings.close();
                data.session(data_cap)
                    .give(('a', Partitioned::minimum(), Diff::ONE));
                step(worker);
                let extracted = reclocked.extract();
                let expected = vec![(0, vec![('a', 0, Diff::ONE)])];
                assert_eq!(extracted, expected);
            },
        )
    }

    /// Generates a `Partitioned<u64, u64>` antichain where each provided partition is at its
    /// specified offset and the gaps in between are filled with range timestamps at offset zero.
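    ///
    /// For example, `partitioned_frontier([(1, 10)])` contains a range timestamp for partitions
    /// 0..=0 at offset 0, a singleton timestamp for partition 1 at offset 10, and a range
    /// timestamp for partitions 2..=u64::MAX at offset 0.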
    fn partitioned_frontier<I>(items: I) -> Antichain<Partitioned<u64, u64>>
    where
        I: IntoIterator<Item = (u64, u64)>,
    {
        let mut frontier = Antichain::new();
        let mut prev = 0;
        for (pid, offset) in items {
            if prev < pid {
                frontier.insert(Partitioned::new_range(prev, pid - 1, 0));
            }
            frontier.insert(Partitioned::new_singleton(pid, offset));
            prev = pid + 1
        }
        frontier.insert(Partitioned::new_range(prev, u64::MAX, 0));
        frontier
    }

    #[mz_ore::test]
    fn test_basic_usage() {
        let as_of = Antichain::from_elem(IntoTime::minimum());
        harness(
            as_of,
            |worker, mut bindings, (mut data, data_cap), reclocked| {
                // Reclock offsets 1 and 3 to timestamp 1000
                bindings.update_at(Partitioned::minimum(), 0, Diff::ONE);
                bindings.update_at(Partitioned::minimum(), 1000, Diff::MINUS_ONE);
                for time in partitioned_frontier([(0, 4)]) {
                    bindings.update_at(time, 1000, Diff::ONE);
                }
                bindings.advance_to(1001);
                bindings.flush();
                data.session(data_cap.clone()).give_iterator(
                    vec![
                        (1, Partitioned::new_singleton(0, 1), Diff::ONE),
                        (1, Partitioned::new_singleton(0, 1), Diff::ONE),
                        (3, Partitioned::new_singleton(0, 3), Diff::ONE),
                    ]
                    .into_iter(),
                );

                step(worker);
                assert_eq!(
                    reclocked.try_recv(),
                    Ok(Event::Messages(
                        0u64,
                        vec![
                            (1, 1000, Diff::ONE),
                            (1, 1000, Diff::ONE),
                            (3, 1000, Diff::ONE)
                        ]
                    ))
                );
                assert_eq!(
                    reclocked.try_recv(),
                    Ok(Event::Progress(vec![(0, -1), (1000, 1)]))
                );

                // Reclock more messages for offset 3 to the same timestamp
                data.session(data_cap.clone()).give_iterator(
                    vec![
                        (3, Partitioned::new_singleton(0, 3), Diff::ONE),
                        (3, Partitioned::new_singleton(0, 3), Diff::ONE),
                    ]
                    .into_iter(),
                );
                step(worker);
                assert_eq!(
                    reclocked.try_recv(),
                    Ok(Event::Messages(
                        1000u64,
                        vec![(3, 1000, Diff::ONE), (3, 1000, Diff::ONE)]
                    ))
                );

                // Drop the capability, which should advance the reclocked frontier to 1001.
                drop(data_cap);
                step(worker);
                assert_eq!(
                    reclocked.try_recv(),
                    Ok(Event::Progress(vec![(1000, -1), (1001, 1)]))
                );
            },
        );
    }

    #[mz_ore::test]
    fn test_reclock_frontier() {
        let as_of = Antichain::from_elem(IntoTime::minimum());
        harness::<_, (), _, _>(
            as_of,
            |worker, mut bindings, (_data, data_cap), reclocked| {
                // Initialize the bindings such that the minimum IntoTime contains the minimum
                // FromTime frontier.
                bindings.update_at(Partitioned::minimum(), 0, Diff::ONE);
                bindings.advance_to(1);
                bindings.flush();
                step(worker);
                assert_eq!(
                    reclocked.try_recv(),
                    Ok(Event::Progress(vec![(0, -1), (1, 1)]))
                );

                // Mint a couple of bindings for multiple partitions
                bindings.update_at(Partitioned::minimum(), 1000, Diff::MINUS_ONE);
                for time in partitioned_frontier([(1, 10)]) {
                    bindings.update_at(time.clone(), 1000, Diff::ONE);
                    bindings.update_at(time, 2000, Diff::MINUS_ONE);
                }
                for time in partitioned_frontier([(1, 10), (2, 10)]) {
                    bindings.update_at(time, 2000, Diff::ONE);
                }
                bindings.advance_to(2001);
                bindings.flush();

                // The initial frontier should now map to the minimum between the two partitions
                step(worker);
                step(worker);
                assert_eq!(
                    reclocked.try_recv(),
                    Ok(Event::Progress(vec![(1, -1), (1000, 1)]))
                );

                // Downgrade the data frontier such that only one of the partitions is advanced
                let mut part1_cap = data_cap.delayed(&Partitioned::new_singleton(1, 9));
                let mut part2_cap = data_cap.delayed(&Partitioned::new_singleton(2, 0));
                let _rest_cap = data_cap.delayed(&Partitioned::new_range(3, u64::MAX, 0));
                drop(data_cap);
                step(worker);
                assert_eq!(reclocked.try_recv(), Err(TryRecvError::Empty));

                // Downgrade the data frontier past the first binding
                part1_cap.downgrade(&Partitioned::new_singleton(1, 10));
                step(worker);
                assert_eq!(
                    reclocked.try_recv(),
                    Ok(Event::Progress(vec![(1000, -1), (2000, 1)]))
                );

                // Downgrade the data frontier past the second binding
                part2_cap.downgrade(&Partitioned::new_singleton(2, 10));
                step(worker);
                assert_eq!(
                    reclocked.try_recv(),
                    Ok(Event::Progress(vec![(2000, -1), (2001, 1)]))
                );

                // Advance the binding frontier and confirm that we get to the next timestamp
                bindings.advance_to(3001);
                bindings.flush();
                step(worker);
                assert_eq!(
                    reclocked.try_recv(),
                    Ok(Event::Progress(vec![(2001, -1), (3001, 1)]))
                );
            },
        );
    }

    #[mz_ore::test]
    fn test_reclock() {
        let as_of = Antichain::from_elem(IntoTime::minimum());
        harness(
            as_of,
            |worker, mut bindings, (mut data, data_cap), reclocked| {
                // Initialize the bindings such that the minimum IntoTime contains the minimum
                // FromTime frontier.
                bindings.update_at(Partitioned::minimum(), 0, Diff::ONE);

                // Setup more precise capabilities for the rest of the test
                let mut part0_cap = data_cap.delayed(&Partitioned::new_singleton(0, 0));
                let rest_cap = data_cap.delayed(&Partitioned::new_range(1, u64::MAX, 0));
                drop(data_cap);

                // Reclock offsets 1 and 2 to timestamp 1000
                data.session(part0_cap.clone()).give_iterator(
                    vec![
                        (1, Partitioned::new_singleton(0, 1), Diff::ONE),
                        (2, Partitioned::new_singleton(0, 2), Diff::ONE),
                    ]
                    .into_iter(),
                );

                part0_cap.downgrade(&Partitioned::new_singleton(0, 3));
                bindings.update_at(Partitioned::minimum(), 1000, Diff::MINUS_ONE);
                bindings.update_at(part0_cap.time().clone(), 1000, Diff::ONE);
                bindings.update_at(rest_cap.time().clone(), 1000, Diff::ONE);
                bindings.advance_to(1001);
                bindings.flush();
                step(worker);
                assert_eq!(
                    reclocked.try_recv(),
                    Ok(Event::Messages(
                        0,
                        vec![(1, 1000, Diff::ONE), (2, 1000, Diff::ONE)]
                    ))
                );
                assert_eq!(
                    reclocked.try_recv(),
                    Ok(Event::Progress(vec![(0, -1), (1000, 1)]))
                );
                assert_eq!(
                    reclocked.try_recv(),
                    Ok(Event::Progress(vec![(1000, -1), (1001, 1)]))
                );

                // Reclock offsets 3 and 4 to timestamp 2000
                data.session(part0_cap.clone()).give_iterator(
                    vec![
                        (3, Partitioned::new_singleton(0, 3), Diff::ONE),
                        (3, Partitioned::new_singleton(0, 3), Diff::ONE),
                        (4, Partitioned::new_singleton(0, 4), Diff::ONE),
                    ]
                    .into_iter(),
                );
                bindings.update_at(part0_cap.time().clone(), 2000, Diff::MINUS_ONE);
                part0_cap.downgrade(&Partitioned::new_singleton(0, 5));
                bindings.update_at(part0_cap.time().clone(), 2000, Diff::ONE);
                bindings.advance_to(2001);
                bindings.flush();
                step(worker);
                assert_eq!(
                    reclocked.try_recv(),
                    Ok(Event::Messages(
                        1001,
                        vec![
                            (3, 2000, Diff::ONE),
                            (3, 2000, Diff::ONE),
                            (4, 2000, Diff::ONE)
                        ]
                    ))
                );
                assert_eq!(
                    reclocked.try_recv(),
                    Ok(Event::Progress(vec![(1001, -1), (2000, 1)]))
                );
                assert_eq!(
                    reclocked.try_recv(),
                    Ok(Event::Progress(vec![(2000, -1), (2001, 1)]))
                );
            },
        );
    }

    #[mz_ore::test]
    fn test_reclock_gh16318() {
        let as_of = Antichain::from_elem(IntoTime::minimum());
        harness(
            as_of,
            |worker, mut bindings, (mut data, data_cap), reclocked| {
                // Initialize the bindings such that the minimum IntoTime contains the minimum
                // FromTime frontier.
                bindings.update_at(Partitioned::minimum(), 0, Diff::ONE);
                // First mint bindings for 0 at timestamp 1000
                bindings.update_at(Partitioned::minimum(), 1000, Diff::MINUS_ONE);
                for time in partitioned_frontier([(0, 50)]) {
                    bindings.update_at(time, 1000, Diff::ONE);
                }
                // Then only for 1 at timestamp 2000
                for time in partitioned_frontier([(0, 50)]) {
                    bindings.update_at(time, 2000, Diff::MINUS_ONE);
                }
                for time in partitioned_frontier([(0, 50), (1, 50)]) {
                    bindings.update_at(time, 2000, Diff::ONE);
                }
                // Then again only for 0 at timestamp 3000
                for time in partitioned_frontier([(0, 50), (1, 50)]) {
                    bindings.update_at(time, 3000, Diff::MINUS_ONE);
                }
                for time in partitioned_frontier([(0, 100), (1, 50)]) {
                    bindings.update_at(time, 3000, Diff::ONE);
                }
                bindings.advance_to(3001);
                bindings.flush();

                // Reclocking (0, 50) must ignore the updates on the FromTime frontier that
                // happened at timestamp 2000 since those are completely unrelated
                data.session(data_cap)
                    .give((50, Partitioned::new_singleton(0, 50), Diff::ONE));
                step(worker);
                assert_eq!(
                    reclocked.try_recv(),
                    Ok(Event::Messages(0, vec![(50, 3000, Diff::ONE)]))
                );
                assert_eq!(
                    reclocked.try_recv(),
                    Ok(Event::Progress(vec![(0, -1), (1000, 1)]))
                );
                assert_eq!(
                    reclocked.try_recv(),
                    Ok(Event::Progress(vec![(1000, -1), (3001, 1)]))
                );
            },
        );
    }

    /// Test that compact(reclock(remap, source)) == reclock(compact(remap), source)
    #[mz_ore::test]
    fn test_compaction() {
        let mut remap = vec![];
        remap.push((Partitioned::minimum(), 0, Diff::ONE));
        // Reclock offsets 1 and 2 to timestamp 1000
        remap.push((Partitioned::minimum(), 1000, Diff::MINUS_ONE));
        for time in partitioned_frontier([(0, 3)]) {
            remap.push((time, 1000, Diff::ONE));
        }
        // Reclock offsets 3 and 4 to timestamp 2000
        for time in partitioned_frontier([(0, 3)]) {
            remap.push((time, 2000, Diff::MINUS_ONE));
        }
        for time in partitioned_frontier([(0, 5)]) {
            remap.push((time, 2000, Diff::ONE));
        }

        let source_updates = vec![
            (1, Partitioned::new_singleton(0, 1), Diff::ONE),
            (2, Partitioned::new_singleton(0, 2), Diff::ONE),
            (3, Partitioned::new_singleton(0, 3), Diff::ONE),
            (4, Partitioned::new_singleton(0, 4), Diff::ONE),
        ];

        let since = Antichain::from_elem(1500);

        // Compute reclock(remap, source)
        let as_of = Antichain::from_elem(IntoTime::minimum());
        let remap1 = remap.clone();
        let source_updates1 = source_updates.clone();
        let reclock_remap = harness(
            as_of,
            move |worker, mut bindings, (mut data, data_cap), reclocked| {
                for (from_ts, into_ts, diff) in remap1 {
                    bindings.update_at(from_ts, into_ts, diff);
                }
                bindings.close();
                data.session(data_cap)
                    .give_iterator(source_updates1.iter().cloned());
                step(worker);
                reclocked.extract()
            },
        );
        // Compute compact(reclock(remap, source))
        let mut compact_reclock_remap = reclock_remap;
        for (t, updates) in compact_reclock_remap.iter_mut() {
            t.advance_by(since.borrow());
            for (_, t, _) in updates.iter_mut() {
                t.advance_by(since.borrow());
            }
        }

        // Compute compact(remap)
        let mut compact_remap = remap;
        for (_, t, _) in compact_remap.iter_mut() {
            t.advance_by(since.borrow());
        }
        consolidation::consolidate_updates(&mut compact_remap);
        // Compute reclock(compact(remap), source)
        let reclock_compact_remap = harness(
            since,
            move |worker, mut bindings, (mut data, data_cap), reclocked| {
                for (from_ts, into_ts, diff) in compact_remap {
                    bindings.update_at(from_ts, into_ts, diff);
                }
                bindings.close();
                data.session(data_cap)
                    .give_iterator(source_updates.iter().cloned());
                step(worker);
                reclocked.extract()
            },
        );

        let expected = vec![(
            1500,
            vec![
                (1, 1500, Diff::ONE),
                (2, 1500, Diff::ONE),
                (3, 2000, Diff::ONE),
                (4, 2000, Diff::ONE),
            ],
        )];
        assert_eq!(expected, reclock_compact_remap);
        assert_eq!(expected, compact_reclock_remap);
    }

    #[mz_ore::test]
    fn test_chainbatch_merge() {
        let a = ChainBatch::from_iter([('a', 0, 1)]);
        let b = ChainBatch::from_iter([('a', 0, -1), ('a', 1, 1)]);
        assert_eq!(a.merge_with(b), ChainBatch::from_iter([('a', 1, 1)]));
    }

    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // too slow
    fn test_binding_consolidation() {
        use std::sync::atomic::Ordering;

        #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
        struct Time(u64);

        // A counter of the number of active Time instances
        static INSTANCES: AtomicUsize = AtomicUsize::new(0);

        impl Time {
            fn new(time: u64) -> Self {
                INSTANCES.fetch_add(1, Ordering::Relaxed);
                Self(time)
            }
        }

        impl Clone for Time {
            fn clone(&self) -> Self {
                INSTANCES.fetch_add(1, Ordering::Relaxed);
                Self(self.0)
            }
        }

        impl Drop for Time {
            fn drop(&mut self) {
                INSTANCES.fetch_sub(1, Ordering::Relaxed);
            }
        }

        impl Timestamp for Time {
            type Summary = ();

            fn minimum() -> Self {
                Time::new(0)
            }
        }

        impl PathSummary<Time> for () {
            fn results_in(&self, src: &Time) -> Option<Time> {
                Some(src.clone())
            }

            fn followed_by(&self, _other: &()) -> Option<Self> {
                Some(())
            }
        }

        impl Refines<()> for Time {
            fn to_inner(_: ()) -> Self {
                Self::minimum()
            }
            fn to_outer(self) -> () {}
            fn summarize(_path: ()) {}
        }

        impl PartialOrder for Time {
            fn less_equal(&self, other: &Self) -> bool {
                self.0.less_equal(&other.0)
            }
        }

        let as_of = 1000;

        // Test that supplying a single big batch of unconsolidated bindings gets
        // consolidated after a single worker step.
        harness::<Time, u64, _, _>(
            Antichain::from_elem(as_of),
            move |worker, mut bindings, _, _| {
                step(worker);
                let instances_before = INSTANCES.load(Ordering::Relaxed);
                for ts in 0..as_of {
                    if ts > 0 {
                        bindings.update_at(Time::new(ts - 1), ts, Diff::MINUS_ONE);
                    }
                    bindings.update_at(Time::new(ts), ts, Diff::ONE);
                }
                bindings.advance_to(as_of);
                bindings.flush();
                step(worker);
                let instances_after = INSTANCES.load(Ordering::Relaxed);
                // The extra instances live in a ChangeBatch which considers compaction when more
                // than 32 elements are inside.
                assert!(instances_after - instances_before < 32);
            },
        );

        // Test that a slow feed of uncompacted bindings over multiple steps never leads to an
        // excessive number of bindings held in memory.
        harness::<Time, u64, _, _>(
            Antichain::from_elem(as_of),
            move |worker, mut bindings, _, _| {
                step(worker);
                let instances_before = INSTANCES.load(Ordering::Relaxed);
                for ts in 0..as_of {
                    if ts > 0 {
                        bindings.update_at(Time::new(ts - 1), ts, Diff::MINUS_ONE);
                    }
                    bindings.update_at(Time::new(ts), ts, Diff::ONE);
                    bindings.advance_to(ts + 1);
                    bindings.flush();
                    step(worker);
                    let instances_now = INSTANCES.load(Ordering::Relaxed);
                    // The extra instances live in a ChangeBatch which considers compaction when
                    // more than 32 elements are inside.
                    assert!(instances_now - instances_before < 32);
                }
            },
        );
    }
}