mz_timely_util/reclock.rs
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License in the LICENSE file at the
// root of this repository, or online at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! ## Notation
//!
//! Collections are represented with capital letters (T, S, R), collection traces as bold letters
//! (𝐓, 𝐒, 𝐑), and difference traces as δ𝐓.
//!
//! Indexing a collection trace 𝐓 to obtain its version at `t` is written as 𝐓(t). Indexing a
//! collection to obtain the multiplicity of a record `x` is written as T\[x\]. These can be combined
//! to obtain the multiplicity of a record `x` at some version `t` as 𝐓(t)\[x\].
//!
//! ## Overview
//!
//! Reclocking transforms a source collection `S` that evolves with some timestamp `FromTime` into
//! a collection `T` that evolves with some other timestamp `IntoTime`. The reclocked collection T
//! contains all updates `u ∈ S` that are not beyond some `FromTime` frontier R(t). The collection
//! `R` is called the remap collection.
//!
//! More formally, for some arbitrary time `t` of `IntoTime` and some arbitrary record `x`, the
//! reclocked collection `T(t)[x]` is defined to be the `sum{δ𝐒(s)[x]: !(𝐑(t) ⪯ s)}`. Since this
//! holds for any record we can write the definition of Reclock(𝐒, 𝐑) as:
//!
//! > Reclock(𝐒, 𝐑) ≜ 𝐓: ∀ t ∈ IntoTime : 𝐓(t) = sum{δ𝐒(s): !(𝐑(t) ⪯ s)}
//!
//! In order for the reclocked collection `T` to have a sensible definition of progress we require
//! that `t1 ≤ t2 ⇒ 𝐑(t1) ⪯ 𝐑(t2)` where the first `≤` is the partial order of `IntoTime` and the
//! second one the partial order of `FromTime` antichains.
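/*!

The definition above can be illustrated with a minimal sketch. It is not the operator's
implementation: `FromTime` is assumed to be a hypothetical `(partition, offset)` pair that is
only comparable within a partition, a frontier is a plain slice of such times, and the names
`time_less_equal`, `frontier_less_equal`, and `reclocked_at` are made up for this example.

```rust
use std::collections::BTreeMap;

// Hypothetical `FromTime` for this sketch: a `(partition, offset)` pair.
type FromTime = (u64, u64);

/// Partial order assumed for the sketch: times compare only within one partition.
fn time_less_equal(a: &FromTime, b: &FromTime) -> bool {
    a.0 == b.0 && a.1 <= b.1
}

/// `frontier ⪯ s`: some element of the frontier is less than or equal to `s`.
fn frontier_less_equal(frontier: &[FromTime], s: &FromTime) -> bool {
    frontier.iter().any(|f| time_less_equal(f, s))
}

/// Accumulates 𝐓(t) = sum{δ𝐒(s): !(𝐑(t) ⪯ s)}, where `remap_at_t` plays the role of 𝐑(t).
fn reclocked_at(source: &[(char, FromTime, i64)], remap_at_t: &[FromTime]) -> Vec<(char, i64)> {
    let mut acc: BTreeMap<char, i64> = BTreeMap::new();
    for (data, s, diff) in source {
        // Only updates that are not beyond the frontier contribute to 𝐓(t).
        if !frontier_less_equal(remap_at_t, s) {
            *acc.entry(*data).or_insert(0) += *diff;
        }
    }
    // Drop records whose multiplicity cancelled out.
    acc.into_iter().filter(|(_, diff)| *diff != 0).collect()
}
```

With a source holding `'a'` at `(0, 1)` and `'b'` at `(0, 5)` and a frontier `𝐑(t) = {(0, 3)}`,
only `'a'` is part of `𝐓(t)` because `(0, 3) ⪯ (0, 5)`. An empty frontier admits every update.
*/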
//!
//! ## Total order simplification
//!
//! In order to simplify the implementation we will require that `IntoTime` is a total order. This
//! limitation can be lifted in the future but further elaboration on the mechanics of reclocking
//! is required to ensure a correct implementation.
//!
//! ## The difference trace
//!
//! By the definition of difference traces we have:
//!
//! ```text
//! δ𝐓(t) = 𝐓(t) - sum{δ𝐓(s): s < t}
//! ```
//!
//! Due to the total order assumption we only need to consider two cases.
//!
//! **Case 1:** `t` is the minimum timestamp
//!
//! In this case `sum{δ𝐓(s): s < t}` is the empty collection and so we obtain:
//!
//! ```text
//! δ𝐓(min) = 𝐓(min) = sum{δ𝐒(s): !(𝐑(min) ⪯ s)}
//! ```
//!
//! **Case 2:** `t` is a timestamp with a predecessor `prev`
//!
//! In this case `sum{δ𝐓(s): s < t}` is equal to `𝐓(prev)` because:
//!
//! ```text
//! sum{δ𝐓(s): s < t} = sum{δ𝐓(s): s ≤ prev} + sum{δ𝐓(s): prev < s < t}
//!                   = 𝐓(prev) + ∅
//!                   = 𝐓(prev)
//! ```
//!
//! And therefore the difference trace of T is:
//!
//! ```text
//! δ𝐓(t) = 𝐓(t) - 𝐓(prev)
//!       = sum{δ𝐒(s): !(𝐑(t) ⪯ s)} - sum{δ𝐒(s): !(𝐑(prev) ⪯ s)}
//!       = sum{δ𝐒(s): (𝐑(prev) ⪯ s) ∧ !(𝐑(t) ⪯ s)}
//! ```
//!
//! The last step holds because `𝐑(prev) ⪯ 𝐑(t)`: every `s` with `!(𝐑(prev) ⪯ s)` also satisfies
//! `!(𝐑(t) ⪯ s)`, so the second sum ranges over a subset of the first.
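/*!

As a rough illustration of the two cases, the sketch below selects the source updates that make
up δ𝐓(t). It assumes a hypothetical `(partition, offset)` representation of `FromTime` that is
only comparable within a partition; `frontier_less_equal` and `reclocked_diff_at` are names
invented for this example and are not part of the operator.

```rust
// Hypothetical `FromTime` for this sketch: a `(partition, offset)` pair.
type FromTime = (u64, u64);

/// `frontier ⪯ s`: some element of the frontier is less than or equal to `s`.
fn frontier_less_equal(frontier: &[FromTime], s: &FromTime) -> bool {
    frontier.iter().any(|f| f.0 == s.0 && f.1 <= s.1)
}

/// Selects the updates of δ𝐓(t), i.e. those with (𝐑(prev) ⪯ s) ∧ !(𝐑(t) ⪯ s).
/// `remap_prev` is `None` when `t` is the minimum timestamp (Case 1).
fn reclocked_diff_at<D: Clone>(
    source: &[(D, FromTime, i64)],
    remap_prev: Option<&[FromTime]>,
    remap_t: &[FromTime],
) -> Vec<(D, i64)> {
    source
        .iter()
        .filter(|(_, s, _)| {
            // Case 1 has no predecessor, so the first condition is vacuously true.
            let beyond_prev = remap_prev.map_or(true, |prev| frontier_less_equal(prev, s));
            beyond_prev && !frontier_less_equal(remap_t, s)
        })
        .map(|(data, _, diff)| (data.clone(), *diff))
        .collect()
}
```

Calling `reclocked_diff_at` once per `IntoTime`, with consecutive frontiers, partitions the
source updates: each update is selected for at most one `t`.
*/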
//!
//! ## Unique mapping property
//!
//! Given the definition above we can derive the fact that for any source difference δ𝐒(s) there is
//! at most one target timestamp t that it must be reclocked to. This property can be exploited by
//! the implementation of the operator as it can safely discard source updates once a matching
//! δ𝐓(t) has been found, making it "stateless" with respect to the source trace. A formal proof of
//! this property is [provided below](#unique-mapping-property-proof).
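/*!

One way to picture this property is a lookup that returns the single `IntoTime` for a given
source time. The sketch below is only illustrative: it assumes bindings are fully materialized
as `(IntoTime, frontier)` pairs sorted by `IntoTime` with advancing frontiers, that `FromTime`
is a hypothetical `(partition, offset)` pair comparable only within a partition, and the name
`reclock_time` is invented here.

```rust
// Hypothetical timestamp types for this sketch.
type FromTime = (u64, u64);
type IntoTime = u64;

/// `frontier ⪯ s`: some element of the frontier is less than or equal to `s`.
fn frontier_less_equal(frontier: &[FromTime], s: &FromTime) -> bool {
    frontier.iter().any(|f| f.0 == s.0 && f.1 <= s.1)
}

/// Returns the one `IntoTime` that an update at `s` is reclocked to, or `None` if no binding
/// known so far has a frontier that moved past `s`.
fn reclock_time(bindings: &[(IntoTime, Vec<FromTime>)], s: &FromTime) -> Option<IntoTime> {
    bindings
        .iter()
        // The first binding whose frontier is not less or equal to `s` is the match. Since
        // frontiers only advance, every earlier binding satisfies `frontier ⪯ s`.
        .find(|(_, frontier)| !frontier_less_equal(frontier, s))
        .map(|(t, _)| *t)
}
```

Once `reclock_time` yields `Some(t)` for an update, later bindings cannot change the answer,
which is why the update does not need to be retained.
*/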
//!
//! ## Operational description
//!
//! The operator follows a run-to-completion model where on each scheduling it completes all
//! outstanding work that can be completed.
//!
//! ### Unique mapping property proof
//!
//! This section contains the formal proof of the unique mapping property. The proof follows the
//! structured proof notation created by Leslie Lamport. Readers unfamiliar with structured proofs
//! can read about them here <https://lamport.azurewebsites.net/pubs/proof.pdf>.
//!
//! #### Statement
//!
//! AtMostOne(X, φ(x)) ≜ ∀ x1, x2 ∈ X : φ(x1) ∧ φ(x2) ⇒ x1 = x2
//!
//! * **THEOREM** UniqueMapping ≜
//!     * **ASSUME**
//!         * **NEW** (FromTime, ⪯) ∈ PartiallyOrderedTimestamps
//!         * **NEW** (IntoTime, ≤) ∈ TotallyOrderedTimestamps
//!         * **NEW** 𝐒 ∈ SetOfCollectionTraces(FromTime)
//!         * **NEW** 𝐑 ∈ SetOfCollectionTraces(IntoTime)
//!         * ∀ t ∈ IntoTime: 𝐑(t) ∈ SetOfAntichains(FromTime)
//!         * ∀ t1, t2 ∈ IntoTime: t1 ≤ t2 ⇒ 𝐑(t1) ⪯ 𝐑(t2)
//!         * **NEW** 𝐓 = Reclock(𝐒, 𝐑)
//!     * **PROVE** ∀ s ∈ FromTime : AtMostOne(IntoTime, δ𝐒(s) ∈ δ𝐓(x))
//!
//! #### Proof
//!
//! 1. **SUFFICES ASSUME** ∃ s ∈ FromTime: ¬AtMostOne(IntoTime, δ𝐒(s) ∈ δ𝐓(x))
//!     * **PROVE FALSE**
//!     * _By proof by contradiction._
//! 2. **PICK** s ∈ FromTime : ¬AtMostOne(IntoTime, δ𝐒(s) ∈ δ𝐓(x))
//!     * _Proof: Such a time exists by <1>1._
//! 3. ∃ t1, t2 ∈ IntoTime : t1 ≠ t2 ∧ δ𝐒(s) ∈ δ𝐓(t1) ∧ δ𝐒(s) ∈ δ𝐓(t2)
//!     1. ¬(∀ x1, x2 ∈ IntoTime : (δ𝐒(s) ∈ δ𝐓(x1)) ∧ (δ𝐒(s) ∈ δ𝐓(x2)) ⇒ x1 = x2)
//!         * _Proof: By <1>2 and definition of AtMostOne._
//!     2. Q.E.D
//!         * _Proof: By <2>1, quantifier negation rules, and theorem of propositional logic ¬(P ⇒ Q) ≡ P ∧ ¬Q._
//! 4. **PICK** t1, t2 ∈ IntoTime : t1 < t2 ∧ δ𝐒(s) ∈ δ𝐓(t1) ∧ δ𝐒(s) ∈ δ𝐓(t2)
//!     * _Proof: By <1>3. Assume t1 < t2 without loss of generality._
//! 5. ¬(𝐑(t1) ⪯ s)
//!     1. **CASE** t1 = min(IntoTime)
//!         1. δ𝐓(t1) = sum{δ𝐒(s): !(𝐑(t1) ⪯ s)}
//!             * _Proof: By definition of δ𝐓(min)._
//!         2. δ𝐒(s) ∈ δ𝐓(t1)
//!             * _Proof: By <1>4._
//!         3. Q.E.D
//!             * _Proof: By <3>1 and <3>2._
//!     2. **CASE** t1 > min(IntoTime)
//!         1. **PICK** t1_prev = Predecessor(t1)
//!             * _Proof: Predecessor exists because the set {t: t < t1} is non-empty since it must contain at least min(IntoTime)._
//!         2. δ𝐓(t1) = sum{δ𝐒(s): (𝐑(t1_prev) ⪯ s) ∧ !(𝐑(t1) ⪯ s)}
//!             * _Proof: By definition of δ𝐓(t)._
//!         3. δ𝐒(s) ∈ δ𝐓(t1)
//!             * _Proof: By <1>4._
//!         4. Q.E.D
//!             * _Proof: By <3>2 and <3>3._
//!     3. Q.E.D
//!         * _Proof: From cases <2>1 and <2>2 which are exhaustive._
//! 6. **PICK** t2_prev ∈ IntoTime : t2_prev = Predecessor(t2)
//!     * _Proof: Predecessor exists because by <1>4 the set {t: t < t2} is non-empty since it must contain at least t1._
//! 7. t1 ≤ t2_prev
//!     * _Proof: t1 ∈ {t: t < t2} and t2_prev is the maximum element of the set._
//! 8. 𝐑(t2_prev) ⪯ s
//!     1. t2 > min(IntoTime)
//!         * _Proof: By <1>4, since t1 < t2._
//!     2. **PICK** t2_prev = Predecessor(t2)
//!         * _Proof: Predecessor exists because the set {t: t < t2} is non-empty since it must contain at least min(IntoTime)._
//!     3. δ𝐓(t2) = sum{δ𝐒(s): (𝐑(t2_prev) ⪯ s) ∧ !(𝐑(t2) ⪯ s)}
//!         * _Proof: By definition of δ𝐓(t)._
//!     4. δ𝐒(s) ∈ δ𝐓(t2)
//!         * _Proof: By <1>4._
//!     5. Q.E.D
//!         * _Proof: By <2>3 and <2>4._
//! 9. 𝐑(t1) ⪯ 𝐑(t2_prev)
//!     * _Proof: By <1>7 and hypothesis on 𝐑._
//! 10. 𝐑(t1) ⪯ s
//!     * _Proof: By <1>8 and <1>9._
//! 11. Q.E.D
//!     * _Proof: By <1>5 and <1>10._
172
173use std::cmp::{Ordering, Reverse};
174use std::collections::VecDeque;
175use std::collections::binary_heap::{BinaryHeap, PeekMut};
176use std::iter::FromIterator;
177
178use differential_dataflow::difference::Semigroup;
179use differential_dataflow::lattice::Lattice;
180use differential_dataflow::{AsCollection, ExchangeData, VecCollection, consolidation};
181use mz_ore::Overflowing;
182use mz_ore::collections::CollectionExt;
183use timely::communication::{Pull, Push};
184use timely::dataflow::channels::pact::Pipeline;
185use timely::dataflow::operators::CapabilitySet;
186use timely::dataflow::operators::capture::Event;
187use timely::dataflow::operators::generic::OutputBuilder;
188use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
189use timely::order::{PartialOrder, TotalOrder};
190use timely::progress::frontier::{AntichainRef, MutableAntichain};
191use timely::progress::{Antichain, Timestamp};
192
193/// Constructs an operator that reclocks a `source` collection varying with some time `FromTime`
194/// into the corresponding `reclocked` collection varying over some time `IntoTime` using the
195/// provided `remap` collection.
196///
197/// In order for the operator to read the `source` collection a `Pusher` is returned which can be
198/// used with timely's capture facilities to connect a collection from a foreign scope to this
199/// operator.
200pub fn reclock<'scope, D, FromTime, IntoTime, R>(
201 remap_collection: VecCollection<'scope, IntoTime, FromTime, Overflowing<i64>>,
202 as_of: Antichain<IntoTime>,
203) -> (
204 Box<dyn Push<Event<FromTime, Vec<(D, FromTime, R)>>>>,
205 VecCollection<'scope, IntoTime, D, R>,
206)
207where
208 D: ExchangeData,
209 FromTime: Timestamp,
210 IntoTime: Timestamp + Lattice + TotalOrder,
211 R: Semigroup + 'static,
212{
213 let scope = remap_collection.scope();
214 let mut builder = OperatorBuilder::new("Reclock".into(), scope.clone());
215 // Here we create a channel that can be used to send data from a foreign scope into this
216 // operator. The channel is associated with this operator's address so that it is activated
217 // every time events are available for consumption. This mechanism is similar to Timely's input
218 // handles where data can be introduced into a timely scope from an exogenous source.
219 let info = builder.operator_info();
220 let channel_id = scope.worker().new_identifier();
221 let (pusher, mut events) = scope
222 .worker()
223 .pipeline::<Event<FromTime, Vec<(D, FromTime, R)>>>(channel_id, info.address);
224
225 let mut remap_input = builder.new_input(remap_collection.inner, Pipeline);
226 let (output, reclocked) = builder.new_output();
227 let mut output = OutputBuilder::from(output);
228
229 builder.build(move |caps| {
230 let mut capset = CapabilitySet::from_elem(caps.into_element());
231 capset.downgrade(&as_of.borrow());
232
233 // Received remap updates at times `into_time` greater or equal to `remap_input`'s input
234 // frontier. As the input frontier advances, we drop elements out of this priority queue
235 // and mint new associations.
236 let mut pending_remap: BinaryHeap<Reverse<(IntoTime, FromTime, i64)>> = BinaryHeap::new();
237 // A trace of `remap_input` that accumulates correctly for all times that are beyond
238 // `remap_since` and not beyond `remap_upper`. The updates in `remap_trace` are maintained
239 // in time order. An actual DD trace could be used here at the expense of a more
240 // complicated API to traverse it. This is left for future work if the naive trace
241 // maintenance implemented in this operator becomes problematic.
242 let mut remap_upper = Antichain::from_elem(IntoTime::minimum());
243 let mut remap_since = as_of.clone();
244 let mut remap_trace = VecDeque::new();
245
246 // A stash of source updates for which we don't know the corresponding binding yet.
247 let mut deferred_source_updates: Vec<ChainBatch<_, _, _>> = Vec::new();
248 // The frontier of the `events` input
249 let mut source_frontier = MutableAntichain::from_elem(FromTime::minimum());
250
251 let mut binding_buffer = Vec::new();
252
253 // Accumulation buffer for `remap_input` updates.
254 use timely::progress::ChangeBatch;
255 let mut remap_accum_buffer: ChangeBatch<(IntoTime, FromTime)> = ChangeBatch::new();
256
257 // The operator drains `remap_input` and organizes new bindings that are not beyond
258 // `remap_input`'s frontier into the time ordered `remap_trace`.
259 //
260 // All received data events can either be reclocked to a time included in the
261 // `remap_trace`, or deferred until new associations are minted. Each data event that
262 // happens at some `FromTime` is mapped to the first `IntoTime` whose associated antichain
263 // is not less or equal to the input `FromTime`.
264 //
265 // As progress events are received from the `events` input, we can advance our
266 // held capability to track the least `IntoTime` a newly received `FromTime` could possibly
267 // map to and also compact the maintained `remap_trace` to that time.
268 move |frontiers| {
269 let Some(cap) = capset.get(0).cloned() else {
270 return;
271 };
272 let mut output = output.activate();
273 let mut session = output.session(&cap);
274
275 // STEP 1. Accept new bindings into `pending_remap`.
276 // Advance all `into` times by `as_of`, and consolidate all updates at that frontier.
277 remap_input.for_each(|_, data| {
278 for (from, mut into, diff) in data.drain(..) {
279 into.advance_by(as_of.borrow());
280 remap_accum_buffer.update((into, from), diff.into_inner());
281 }
282 });
            // Drain consolidated bindings into the `pending_remap` heap.
            // Only do this once the `remap_input` frontier has passed `as_of`. For as long as
            // the input frontier is less than or equal to `as_of`, we have no finalized times.
286 if !PartialOrder::less_equal(&frontiers[0].frontier(), &as_of.borrow()) {
287 for ((into, from), diff) in remap_accum_buffer.drain() {
288 pending_remap.push(Reverse((into, from, diff)));
289 }
290 }
291
            // STEP 2. Extract bindings not beyond `remap_upper` and commit them into `remap_trace`.
293 let prev_remap_upper =
294 std::mem::replace(&mut remap_upper, frontiers[0].frontier().to_owned());
295 while let Some(update) = pending_remap.peek_mut() {
296 if !remap_upper.less_equal(&update.0.0) {
297 let Reverse((into, from, diff)) = PeekMut::pop(update);
298 remap_trace.push_back((from, into, diff));
299 } else {
300 break;
301 }
302 }
303
304 // STEP 3. Receive new data updates
            // The `events` input describes arbitrary progress and data over `FromTime`,
            // which must be translated to `IntoTime`. Each `FromTime` maps to the first
            // `IntoTime` whose associated `[FromTime]` frontier is not less or equal to
            // that `FromTime`. Received events that are not yet associated to an
            // `IntoTime` are collected, and formed into a "chain batch": a sequence of
            // chains that results from sorting the updates by `FromTime`, and then
            // segmenting the sequence at elements where the partial order on `FromTime` is
            // violated.
313 let mut stash = Vec::new();
314 // Consolidate progress updates before applying them to `source_frontier`, to avoid quadratic
315 // behavior in overload scenarios.
316 let mut change_batch = ChangeBatch::<FromTime, 2>::default();
317 while let Some(event) = events.pull() {
318 match event {
319 Event::Progress(changes) => {
320 change_batch.extend(changes.drain(..));
321 }
322 Event::Messages(_, data) => stash.append(data),
323 }
324 }
325 source_frontier.update_iter(change_batch.drain());
326 stash.sort_unstable_by(|(_, t1, _): &(D, FromTime, R), (_, t2, _)| t1.cmp(t2));
327 let mut new_source_updates = ChainBatch::from_iter(stash);
328
329 // STEP 4: Reclock new and deferred updates
330 // We are now ready to step through the remap bindings in time order and
331 // perform the following actions:
332 // 4.1. Match `new_source_updates` against the entirety of bindings contained
333 // in the trace.
334 // 4.2. Match `deferred_source_updates` against the bindings that were just
335 // added in the trace.
336 // 4.3. Reclock `source_frontier` to calculate the new since frontier of the
337 // remap trace.
338 //
339 // The steps above only make sense to perform if there are any times for which
340 // we can correctly accumulate the remap trace, which is what we check here.
341 if remap_since.iter().all(|t| !remap_upper.less_equal(t)) {
342 let mut cur_binding = MutableAntichain::new();
343
344 let mut remap = remap_trace.iter().peekable();
345 let mut reclocked_source_frontier = remap_upper.clone();
346
                // We go over all the times at which we might need to output data. These times
                // are restricted to the times at which there exists an update in `remap_trace`
                // and the minimum timestamp for the case where `remap_trace` is completely empty,
                // in which case the minimum timestamp maps to the empty `FromTime` frontier and
                // therefore all data events map to that minimum timestamp.
                //
                // The approach taken here will take time proportional to the number of elements in
                // `remap_trace`. During development an alternative approach was considered where
                // the updates in `remap_trace` are instead fully materialized into an ordered list
                // of antichains in which every data update can be binary searched into. There are
                // two concerns with this alternative approach that led to preferring this one:
                // 1. Materializing very wide antichains with small differences between them
                //    needs memory proportional to the number of bindings times the width of the
                //    antichain.
                // 2. It locks in the requirement of a totally ordered target timestamp since only
                //    in that case can one binary search a binding.
                // The linear scan is expected to be fine due to the run-to-completion nature of
                // the operator since its cost is amortized among the number of outstanding
                // updates.
366 let mut min_time = IntoTime::minimum();
367 min_time.advance_by(remap_since.borrow());
368 let mut prev_cur_time = None;
369 let mut interesting_times = std::iter::once(&min_time)
370 .chain(remap_trace.iter().map(|(_, t, _)| t))
371 .filter(|&v| {
372 let prev = prev_cur_time.replace(v);
373 prev != prev_cur_time
374 });
375 let mut frontier_reclocked = false;
376 while !(new_source_updates.is_empty()
377 && deferred_source_updates.is_empty()
378 && frontier_reclocked)
379 && let Some(cur_time) = interesting_times.next()
380 {
381 // 4.0. Load updates of `cur_time` from the trace into `cur_binding` to
382 // construct the `[FromTime]` frontier that `cur_time` maps to.
383 while let Some((t_from, _, diff)) = remap.next_if(|(_, t, _)| t == cur_time) {
384 binding_buffer.push((t_from.clone(), *diff));
385 }
386 cur_binding.update_iter(binding_buffer.drain(..));
387 let cur_binding = cur_binding.frontier();
388
389 // 4.1. Extract updates from `new_source_updates`
390 for (data, _, diff) in new_source_updates.extract(cur_binding) {
391 session.give((data, cur_time.clone(), diff));
392 }
393
394 // 4.2. Extract updates from `deferred_source_updates`.
395 // The deferred updates contain all updates that were not able to be
396 // reclocked with the bindings until `prev_remap_upper`. For this reason
397 // we only need to reconsider these updates when we start looking at new
                    // bindings, i.e. bindings that are beyond `prev_remap_upper`.
399 if prev_remap_upper.less_equal(cur_time) {
400 deferred_source_updates.retain_mut(|batch| {
401 for (data, _, diff) in batch.extract(cur_binding) {
402 session.give((data, cur_time.clone(), diff));
403 }
404 // Retain non-empty batches
405 !batch.is_empty()
406 })
407 }
408
409 // 4.3. Reclock `source_frontier`
410 // If any FromTime in source frontier could possibly be reclocked to this
411 // binding then we must maintain our capability to emit data at that time
412 // and not compact past it. Since we iterate over this loop in time order
413 // and IntoTime is a total order we only need to perform this step once.
414 // Once a `cur_time` is inserted into `reclocked_source_frontier` no more
415 // changes can be made to the frontier by inserting times later in the
416 // loop.
417 if !frontier_reclocked
418 && source_frontier
419 .frontier()
420 .iter()
421 .any(|t| !cur_binding.less_equal(t))
422 {
423 reclocked_source_frontier.insert(cur_time.clone());
424 frontier_reclocked = true;
425 }
426 }
427
428 // STEP 5. Downgrade capability and compact remap trace if our since frontier
429 // advanced
430 if PartialOrder::less_than(&remap_since, &reclocked_source_frontier) {
431 capset.downgrade(&reclocked_source_frontier.borrow());
432 remap_since = reclocked_source_frontier;
433
434 // The remap trace is stored in time order and T is a total order. The updates
435 // that can have their time advanced will form a prefix, which we extract here.
436 let mut advanced = vec![];
437 while !remap_trace.is_empty() && !remap_since.less_equal(&remap_trace[0].1) {
438 let (d, mut t, r) = remap_trace.pop_front().unwrap();
439 t.advance_by(remap_since.borrow());
440 advanced.push((d, t, r));
441 }
442 if !advanced.is_empty() {
443 // If we have updates whose time was advanced, further peel the prefix of
444 // updates that sit *at* the time of the since frontier.
445 while !remap_trace.is_empty() && remap_since.contains(&remap_trace[0].1) {
446 advanced.push(remap_trace.pop_front().unwrap());
447 }
448 consolidation::consolidate_updates(&mut advanced);
449 advanced.sort_unstable_by(|(_, t1, _): &(_, IntoTime, _), (_, t2, _)| {
450 t1.cmp(t2)
451 });
452 for u in advanced.into_iter().rev() {
453 remap_trace.push_front(u);
454 }
455 // If using less than a quarter of the capacity, shrink the container. To avoid having
456 // to resize the container on a subsequent push, shrink to 2x the length, which is
457 // what push would grow it to.
458 if remap_trace.len() < remap_trace.capacity() / 4 {
459 remap_trace.shrink_to(remap_trace.len() * 2);
460 }
461 }
462 }
463 }
464
465 // STEP 6. Tidy up deferred updates
            // Deferred updates are represented as a list of chain batches where each batch
            // contains at least two times the updates of the batch following it. This
            // organization leads to a logarithmic number of batches with respect to the
            // outstanding number of updates.
470 deferred_source_updates.sort_unstable_by_key(|b| Reverse(b.len()));
471 if !new_source_updates.is_empty() {
472 deferred_source_updates.push(new_source_updates);
473 }
474 let dsu = &mut deferred_source_updates;
475 while dsu.len() > 1 && (dsu[dsu.len() - 1].len() >= dsu[dsu.len() - 2].len() / 2) {
476 let a = dsu.pop().unwrap();
477 let b = dsu.pop().unwrap();
478 dsu.push(a.merge_with(b));
479 }
480
481 // If using less than a quarter of the capacity, shrink the container. To avoid having
482 // to resize the container on a subsequent push, shrink to 2x the length, which is
483 // what push would grow it to.
484 if deferred_source_updates.len() < deferred_source_updates.capacity() / 4 {
485 deferred_source_updates.shrink_to(deferred_source_updates.len() * 2);
486 }
487 }
488 });
489
490 (Box::new(pusher), reclocked.as_collection())
491}
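/*
The compaction policy of STEP 6 in `reclock` can be tried out in isolation with the simplified
sketch below. It assumes batches are plain `Vec<u64>` and that merging is concatenation followed
by a sort, which only stands in for `ChainBatch::merge_with`. The name `push_and_compact` is
hypothetical.

```rust
use std::cmp::Reverse;

/// Adds `new_batch` to `batches` and merges trailing batches until the last batch is smaller
/// than half of the one before it.
fn push_and_compact(batches: &mut Vec<Vec<u64>>, new_batch: Vec<u64>) {
    // Largest batches first, so that the smallest ones sit at the end of the list.
    batches.sort_unstable_by_key(|b| Reverse(b.len()));
    if !new_batch.is_empty() {
        batches.push(new_batch);
    }
    while batches.len() > 1
        && batches[batches.len() - 1].len() >= batches[batches.len() - 2].len() / 2
    {
        let mut a = batches.pop().unwrap();
        let b = batches.pop().unwrap();
        // Stand-in for a real merge: concatenate and re-sort.
        a.extend(b);
        a.sort_unstable();
        batches.push(a);
    }
}
```

Pushing eight single-element batches one at a time leaves two batches holding six and two
updates, rather than eight separate batches.
*/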
492
493/// A batch of differential updates that vary over some partial order. This type maintains the data
494/// as a set of chains that allows for efficient extraction of batches given a frontier.
495#[derive(Debug, PartialEq)]
496struct ChainBatch<D, T, R> {
497 /// A list of chains (sets of mutually comparable times) sorted by the partial order.
498 chains: Vec<VecDeque<(D, T, R)>>,
499}
500
501impl<D, T: Timestamp, R> ChainBatch<D, T, R> {
502 /// Extracts all updates with time not greater or equal to any time in `upper`.
503 fn extract<'a>(
504 &'a mut self,
505 upper: AntichainRef<'a, T>,
506 ) -> impl Iterator<Item = (D, T, R)> + 'a {
507 self.chains.retain(|chain| !chain.is_empty());
508 self.chains.iter_mut().flat_map(move |chain| {
509 // A chain is a sorted list of mutually comparable elements so we keep extracting
510 // elements that are not beyond upper.
511 std::iter::from_fn(move || {
512 let (_, into, _) = chain.front()?;
513 if !upper.less_equal(into) {
514 chain.pop_front()
515 } else {
516 None
517 }
518 })
519 })
520 }
521
522 fn merge_with(
523 mut self: ChainBatch<D, T, R>,
524 mut other: ChainBatch<D, T, R>,
525 ) -> ChainBatch<D, T, R>
526 where
527 D: ExchangeData,
528 T: Timestamp,
529 R: Semigroup,
530 {
531 let mut updates1 = self.chains.drain(..).flatten().peekable();
532 let mut updates2 = other.chains.drain(..).flatten().peekable();
533
534 let merged = std::iter::from_fn(|| {
535 match (updates1.peek(), updates2.peek()) {
536 (Some((d1, t1, _)), Some((d2, t2, _))) => {
537 match (t1, d1).cmp(&(t2, d2)) {
538 Ordering::Less => updates1.next(),
539 Ordering::Greater => updates2.next(),
540 // If the same (d, t) pair is found, consolidate their diffs
541 Ordering::Equal => {
542 let (d1, t1, mut r1) = updates1.next().unwrap();
543 while let Some((_, _, r)) =
544 updates1.next_if(|(d, t, _)| (d, t) == (&d1, &t1))
545 {
546 r1.plus_equals(&r);
547 }
548 while let Some((_, _, r)) =
549 updates2.next_if(|(d, t, _)| (d, t) == (&d1, &t1))
550 {
551 r1.plus_equals(&r);
552 }
553 Some((d1, t1, r1))
554 }
555 }
556 }
557 (Some(_), None) => updates1.next(),
558 (None, Some(_)) => updates2.next(),
559 (None, None) => None,
560 }
561 });
562
563 ChainBatch::from_iter(merged.filter(|(_, _, r)| !r.is_zero()))
564 }
565
566 /// Returns the number of updates in the batch.
567 fn len(&self) -> usize {
568 self.chains.iter().map(|chain| chain.len()).sum()
569 }
570
571 /// Returns true if the batch contains no updates.
572 fn is_empty(&self) -> bool {
573 self.len() == 0
574 }
575}
576
577impl<D, T: Timestamp, R> FromIterator<(D, T, R)> for ChainBatch<D, T, R> {
578 /// Computes the chain decomposition of updates according to the partial order `T`.
579 fn from_iter<I: IntoIterator<Item = (D, T, R)>>(updates: I) -> Self {
580 let mut chains = vec![];
581 let mut updates = updates.into_iter();
582 if let Some((d, t, r)) = updates.next() {
583 let mut chain = VecDeque::new();
584 chain.push_back((d, t, r));
585 for (d, t, r) in updates {
586 let prev_t = &chain[chain.len() - 1].1;
587 if !PartialOrder::less_equal(prev_t, &t) {
588 chains.push(chain);
589 chain = VecDeque::new();
590 }
591 chain.push_back((d, t, r));
592 }
593 chains.push(chain);
594 }
595 Self { chains }
596 }
597}
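/*
The chain decomposition performed by `from_iter` and the front-popping done by `extract` can be
sketched on plain data. This is an illustration under assumptions, not the code above: times are
hypothetical `(partition, offset)` pairs that are only comparable within a partition, frontiers
are slices, diffs are omitted, and `into_chains` and `extract` here are free functions written
for the example.

```rust
use std::collections::VecDeque;

// Hypothetical partially ordered time for this sketch.
type Time = (u64, u64);

fn time_less_equal(a: &Time, b: &Time) -> bool {
    a.0 == b.0 && a.1 <= b.1
}

/// Splits updates, already sorted by `Ord`, into chains of mutually comparable times by
/// starting a new chain whenever the partial order is violated.
fn into_chains(sorted: Vec<(char, Time)>) -> Vec<VecDeque<(char, Time)>> {
    let mut chains: Vec<VecDeque<(char, Time)>> = Vec::new();
    for update in sorted {
        let extends_last = chains
            .last()
            .and_then(|chain| chain.back())
            .map_or(false, |(_, prev)| time_less_equal(prev, &update.1));
        if !extends_last {
            chains.push(VecDeque::new());
        }
        chains.last_mut().unwrap().push_back(update);
    }
    chains
}

/// Removes and returns every update whose time is not beyond `upper`.
fn extract(chains: &mut Vec<VecDeque<(char, Time)>>, upper: &[Time]) -> Vec<(char, Time)> {
    let mut out = Vec::new();
    for chain in chains.iter_mut() {
        // Times only increase within a chain, so once the front is beyond `upper` the
        // remainder of the chain is beyond it as well.
        while let Some((_, t)) = chain.front() {
            if upper.iter().any(|u| time_less_equal(u, t)) {
                break;
            }
            out.push(chain.pop_front().unwrap());
        }
    }
    chains.retain(|chain| !chain.is_empty());
    out
}
```

Because each chain is ordered, extraction only ever inspects the front of a chain, so its cost is
proportional to the number of chains plus the number of extracted updates.
*/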
598
599#[cfg(test)]
600mod test {
601 use std::sync::atomic::AtomicUsize;
602 use std::sync::mpsc::{Receiver, TryRecvError};
603
604 use differential_dataflow::consolidation;
605 use differential_dataflow::input::{Input, InputSession};
606 use serde::{Deserialize, Serialize};
607 use timely::dataflow::operators::capture::{Event, Extract};
608 use timely::dataflow::operators::vec::UnorderedInput;
609 use timely::dataflow::operators::vec::unordered_input::UnorderedHandle;
610 use timely::dataflow::operators::{ActivateCapability, Capture};
611 use timely::progress::PathSummary;
612 use timely::progress::timestamp::Refines;
613 use timely::worker::Worker;
614
615 use crate::capture::PusherCapture;
616 use crate::order::Partitioned;
617
618 use super::*;
619
620 type Diff = Overflowing<i64>;
621 type FromTime = Partitioned<u64, u64>;
622 type IntoTime = u64;
623 type BindingHandle<FromTime> = InputSession<IntoTime, FromTime, Diff>;
624 type DataHandle<D, FromTime> = (
625 UnorderedHandle<FromTime, (D, FromTime, Diff)>,
626 ActivateCapability<FromTime>,
627 );
628 type ReclockedStream<D> = Receiver<Event<IntoTime, Vec<(D, IntoTime, Diff)>>>;
629
630 /// A helper function that sets up a dataflow program to test the reclocking operator. Each
631 /// test provides a test logic closure which accepts four arguments:
632 ///
633 /// * A reference to the worker that allows the test to step the computation
634 /// * A [`BindingHandle`] that allows the test to manipulate the remap bindings
635 /// * A [`DataHandle`] that allows the test to submit the data to be reclocked
636 /// * A [`ReclockedStream`] that allows observing the result of the reclocking process
637 ///
638 /// Note that the `DataHandle` contains a capability that should be dropped or downgraded before
639 /// calling [`step`] to process data at the time.
640 fn harness<FromTime, D, F, R>(as_of: Antichain<IntoTime>, test_logic: F) -> R
641 where
642 FromTime: Timestamp + Refines<()>,
643 D: ExchangeData,
644 F: FnOnce(
645 &mut Worker,
646 BindingHandle<FromTime>,
647 DataHandle<D, FromTime>,
648 ReclockedStream<D>,
649 ) -> R
650 + Send
651 + Sync
652 + 'static,
653 R: Send + 'static,
654 {
655 timely::execute_directly(move |worker| {
656 let (bindings, data, data_cap, reclocked) = worker.dataflow::<(), _, _>(|scope| {
657 let (bindings, data_pusher, reclocked) =
658 scope.scoped::<IntoTime, _, _>("IntoScope", move |scope| {
659 let (binding_handle, binding_collection) = scope.new_collection();
660 let (data_pusher, reclocked_collection) =
661 reclock(binding_collection, as_of);
662 let reclocked_capture = reclocked_collection.inner.capture();
663 (binding_handle, data_pusher, reclocked_capture)
664 });
665
666 let (data, data_cap) = scope.scoped::<FromTime, _, _>("FromScope", move |scope| {
667 let ((handle, cap), data) = scope.new_unordered_input::<(D, FromTime, Diff)>();
668 data.capture_into(PusherCapture(data_pusher));
669 (handle, cap)
670 });
671
672 (bindings, data, data_cap, reclocked)
673 });
674
675 test_logic(worker, bindings, (data, data_cap), reclocked)
676 })
677 }
678
679 /// Steps the worker four times which is the required number of times for both data and
680 /// frontier updates to propagate across the two scopes and into the probing channels.
681 fn step(worker: &mut Worker) {
682 for _ in 0..4 {
683 worker.step();
684 }
685 }
686
687 #[mz_ore::test]
688 fn basic_reclocking() {
689 let as_of = Antichain::from_elem(IntoTime::minimum());
690 harness::<FromTime, _, _, _>(
691 as_of,
692 |worker, bindings, (mut data, data_cap), reclocked| {
693 // Reclock everything at the minimum IntoTime
694 bindings.close();
695 data.activate()
696 .session(&data_cap)
697 .give(('a', Partitioned::minimum(), Diff::ONE));
698 drop(data_cap);
699 step(worker);
700 let extracted = reclocked.extract();
701 let expected = vec![(0, vec![('a', 0, Diff::ONE)])];
702 assert_eq!(extracted, expected);
703 },
704 )
705 }
706
707 /// Generates a `Partitioned<u64, u64>` Antichain where all the provided
708 /// partitions are at the specified offset and the gaps in between are filled with range
709 /// timestamps at offset zero.
710 fn partitioned_frontier<I>(items: I) -> Antichain<Partitioned<u64, u64>>
711 where
712 I: IntoIterator<Item = (u64, u64)>,
713 {
714 let mut frontier = Antichain::new();
715 let mut prev = 0;
716 for (pid, offset) in items {
717 if prev < pid {
718 frontier.insert(Partitioned::new_range(prev, pid - 1, 0));
719 }
720 frontier.insert(Partitioned::new_singleton(pid, offset));
721 prev = pid + 1
722 }
723 frontier.insert(Partitioned::new_range(prev, u64::MAX, 0));
724 frontier
725 }
726
727 #[mz_ore::test]
728 fn test_basic_usage() {
729 let as_of = Antichain::from_elem(IntoTime::minimum());
730 harness(
731 as_of,
732 |worker, mut bindings, (mut data, data_cap), reclocked| {
733 // Reclock offsets 1 and 3 to timestamp 1000
734 bindings.update_at(Partitioned::minimum(), 0, Diff::ONE);
735 bindings.update_at(Partitioned::minimum(), 1000, Diff::MINUS_ONE);
736 for time in partitioned_frontier([(0, 4)]) {
737 bindings.update_at(time, 1000, Diff::ONE);
738 }
739 bindings.advance_to(1001);
740 bindings.flush();
741 data.activate().session(&data_cap).give_iterator(
742 vec![
743 (1, Partitioned::new_singleton(0, 1), Diff::ONE),
744 (1, Partitioned::new_singleton(0, 1), Diff::ONE),
745 (3, Partitioned::new_singleton(0, 3), Diff::ONE),
746 ]
747 .into_iter(),
748 );
749
750 step(worker);
751 assert_eq!(
752 reclocked.try_recv(),
753 Ok(Event::Messages(
754 0u64,
755 vec![
756 (1, 1000, Diff::ONE),
757 (1, 1000, Diff::ONE),
758 (3, 1000, Diff::ONE)
759 ]
760 ))
761 );
762 assert_eq!(
763 reclocked.try_recv(),
764 Ok(Event::Progress(vec![(0, -1), (1000, 1)]))
765 );
766
                // Reclock more messages for offset 3 to the same timestamp
768 data.activate().session(&data_cap).give_iterator(
769 vec![
770 (3, Partitioned::new_singleton(0, 3), Diff::ONE),
771 (3, Partitioned::new_singleton(0, 3), Diff::ONE),
772 ]
773 .into_iter(),
774 );
775 step(worker);
776 assert_eq!(
777 reclocked.try_recv(),
778 Ok(Event::Messages(
779 1000u64,
780 vec![(3, 1000, Diff::ONE), (3, 1000, Diff::ONE)]
781 ))
782 );
783
784 // Drop the capability which should advance the reclocked frontier to 1001.
785 drop(data_cap);
786 step(worker);
787 assert_eq!(
788 reclocked.try_recv(),
789 Ok(Event::Progress(vec![(1000, -1), (1001, 1)]))
790 );
791 },
792 );
793 }

    #[mz_ore::test]
    fn test_reclock_frontier() {
        let as_of = Antichain::from_elem(IntoTime::minimum());
        harness::<_, (), _, _>(
            as_of,
            |worker, mut bindings, (_data, data_cap), reclocked| {
                // Initialize the bindings such that the minimum IntoTime contains the minimum
                // FromTime frontier.
                bindings.update_at(Partitioned::minimum(), 0, Diff::ONE);
                bindings.advance_to(1);
                bindings.flush();
                step(worker);
                assert_eq!(
                    reclocked.try_recv(),
                    Ok(Event::Progress(vec![(0, -1), (1, 1)]))
                );

                // Mint a couple of bindings for multiple partitions
                bindings.update_at(Partitioned::minimum(), 1000, Diff::MINUS_ONE);
                for time in partitioned_frontier([(1, 10)]) {
                    bindings.update_at(time.clone(), 1000, Diff::ONE);
                    bindings.update_at(time, 2000, Diff::MINUS_ONE);
                }
                for time in partitioned_frontier([(1, 10), (2, 10)]) {
                    bindings.update_at(time, 2000, Diff::ONE);
                }
                bindings.advance_to(2001);
                bindings.flush();

                // The initial frontier should now map to the minimum between the two partitions
                step(worker);
                step(worker);
                assert_eq!(
                    reclocked.try_recv(),
                    Ok(Event::Progress(vec![(1, -1), (1000, 1)]))
                );

                // Downgrade data frontier such that only one of the partitions is advanced
                let mut part1_cap = data_cap.delayed(&Partitioned::new_singleton(1, 9));
                let mut part2_cap = data_cap.delayed(&Partitioned::new_singleton(2, 0));
                let _rest_cap = data_cap.delayed(&Partitioned::new_range(3, u64::MAX, 0));
                drop(data_cap);
                step(worker);
                assert_eq!(reclocked.try_recv(), Err(TryRecvError::Empty));

                // Downgrade the data frontier past the first binding
                part1_cap.downgrade(&Partitioned::new_singleton(1, 10));
                step(worker);
                assert_eq!(
                    reclocked.try_recv(),
                    Ok(Event::Progress(vec![(1000, -1), (2000, 1)]))
                );

                // Downgrade the data frontier past the second binding
                part2_cap.downgrade(&Partitioned::new_singleton(2, 10));
                step(worker);
                assert_eq!(
                    reclocked.try_recv(),
                    Ok(Event::Progress(vec![(2000, -1), (2001, 1)]))
                );

                // Advance the binding frontier and confirm that we get to the next timestamp
                bindings.advance_to(3001);
                bindings.flush();
                step(worker);
                assert_eq!(
                    reclocked.try_recv(),
                    Ok(Event::Progress(vec![(2001, -1), (3001, 1)]))
                );
            },
        );
    }

    #[mz_ore::test]
    fn test_reclock() {
        let as_of = Antichain::from_elem(IntoTime::minimum());
        harness(
            as_of,
            |worker, mut bindings, (mut data, data_cap), reclocked| {
                // Initialize the bindings such that the minimum IntoTime contains the minimum
                // FromTime frontier.
                bindings.update_at(Partitioned::minimum(), 0, Diff::ONE);

                // Set up more precise capabilities for the rest of the test
                let mut part0_cap = data_cap.delayed(&Partitioned::new_singleton(0, 0));
                let rest_cap = data_cap.delayed(&Partitioned::new_range(1, u64::MAX, 0));
                drop(data_cap);

                // Reclock offsets 1 and 2 to timestamp 1000
                data.activate().session(&part0_cap).give_iterator(
                    vec![
                        (1, Partitioned::new_singleton(0, 1), Diff::ONE),
                        (2, Partitioned::new_singleton(0, 2), Diff::ONE),
                    ]
                    .into_iter(),
                );

                part0_cap.downgrade(&Partitioned::new_singleton(0, 3));
                bindings.update_at(Partitioned::minimum(), 1000, Diff::MINUS_ONE);
                bindings.update_at(part0_cap.time().clone(), 1000, Diff::ONE);
                bindings.update_at(rest_cap.time().clone(), 1000, Diff::ONE);
                bindings.advance_to(1001);
                bindings.flush();
                step(worker);
                assert_eq!(
                    reclocked.try_recv(),
                    Ok(Event::Messages(
                        0,
                        vec![(1, 1000, Diff::ONE), (2, 1000, Diff::ONE)]
                    ))
                );
                assert_eq!(
                    reclocked.try_recv(),
                    Ok(Event::Progress(vec![(0, -1), (1000, 1)]))
                );
                assert_eq!(
                    reclocked.try_recv(),
                    Ok(Event::Progress(vec![(1000, -1), (1001, 1)]))
                );

                // Reclock offsets 3 and 4 to timestamp 2000
                data.activate().session(&part0_cap).give_iterator(
                    vec![
                        (3, Partitioned::new_singleton(0, 3), Diff::ONE),
                        (3, Partitioned::new_singleton(0, 3), Diff::ONE),
                        (4, Partitioned::new_singleton(0, 4), Diff::ONE),
                    ]
                    .into_iter(),
                );
                bindings.update_at(part0_cap.time().clone(), 2000, Diff::MINUS_ONE);
                part0_cap.downgrade(&Partitioned::new_singleton(0, 5));
                bindings.update_at(part0_cap.time().clone(), 2000, Diff::ONE);
                bindings.advance_to(2001);
                bindings.flush();
                step(worker);
                assert_eq!(
                    reclocked.try_recv(),
                    Ok(Event::Messages(
                        1001,
                        vec![
                            (3, 2000, Diff::ONE),
                            (3, 2000, Diff::ONE),
                            (4, 2000, Diff::ONE)
                        ]
                    ))
                );
                assert_eq!(
                    reclocked.try_recv(),
                    Ok(Event::Progress(vec![(1001, -1), (2000, 1)]))
                );
                assert_eq!(
                    reclocked.try_recv(),
                    Ok(Event::Progress(vec![(2000, -1), (2001, 1)]))
                );
            },
        );
    }

    #[mz_ore::test]
    fn test_reclock_gh16318() {
        let as_of = Antichain::from_elem(IntoTime::minimum());
        harness(
            as_of,
            |worker, mut bindings, (mut data, data_cap), reclocked| {
                // Initialize the bindings such that the minimum IntoTime contains the minimum
                // FromTime frontier.
                bindings.update_at(Partitioned::minimum(), 0, Diff::ONE);
                // First mint bindings for partition 0 at timestamp 1000
                bindings.update_at(Partitioned::minimum(), 1000, Diff::MINUS_ONE);
                for time in partitioned_frontier([(0, 50)]) {
                    bindings.update_at(time, 1000, Diff::ONE);
                }
                // Then only for partition 1 at timestamp 2000
                for time in partitioned_frontier([(0, 50)]) {
                    bindings.update_at(time, 2000, Diff::MINUS_ONE);
                }
                for time in partitioned_frontier([(0, 50), (1, 50)]) {
                    bindings.update_at(time, 2000, Diff::ONE);
                }
                // Then again only for partition 0 at timestamp 3000
                for time in partitioned_frontier([(0, 50), (1, 50)]) {
                    bindings.update_at(time, 3000, Diff::MINUS_ONE);
                }
                for time in partitioned_frontier([(0, 100), (1, 50)]) {
                    bindings.update_at(time, 3000, Diff::ONE);
                }
                bindings.advance_to(3001);
                bindings.flush();

                // Reclocking (0, 50) must ignore the updates on the FromTime frontier that
                // happened at timestamp 2000 since those are completely unrelated
                data.activate().session(&data_cap).give((
                    50,
                    Partitioned::new_singleton(0, 50),
                    Diff::ONE,
                ));
                drop(data_cap);
                step(worker);
                assert_eq!(
                    reclocked.try_recv(),
                    Ok(Event::Messages(0, vec![(50, 3000, Diff::ONE),]))
                );
                assert_eq!(
                    reclocked.try_recv(),
                    Ok(Event::Progress(vec![(0, -1), (1000, 1)]))
                );
                assert_eq!(
                    reclocked.try_recv(),
                    Ok(Event::Progress(vec![(1000, -1), (3001, 1)]))
                );
            },
        );
    }

    /// Test that compact(reclock(remap, source)) == reclock(compact(remap), source)
    #[mz_ore::test]
    fn test_compaction() {
        let mut remap = vec![];
        remap.push((Partitioned::minimum(), 0, Diff::ONE));
        // Reclock offsets 1 and 2 to timestamp 1000
        remap.push((Partitioned::minimum(), 1000, Diff::MINUS_ONE));
        for time in partitioned_frontier([(0, 3)]) {
            remap.push((time, 1000, Diff::ONE));
        }
        // Reclock offsets 3 and 4 to timestamp 2000
        for time in partitioned_frontier([(0, 3)]) {
            remap.push((time, 2000, Diff::MINUS_ONE));
        }
        for time in partitioned_frontier([(0, 5)]) {
            remap.push((time, 2000, Diff::ONE));
        }

        let source_updates = vec![
            (1, Partitioned::new_singleton(0, 1), Diff::ONE),
            (2, Partitioned::new_singleton(0, 2), Diff::ONE),
            (3, Partitioned::new_singleton(0, 3), Diff::ONE),
            (4, Partitioned::new_singleton(0, 4), Diff::ONE),
        ];

        let since = Antichain::from_elem(1500);

        // Compute reclock(remap, source)
        let as_of = Antichain::from_elem(IntoTime::minimum());
        let remap1 = remap.clone();
        let source_updates1 = source_updates.clone();
        let reclock_remap = harness(
            as_of,
            move |worker, mut bindings, (mut data, data_cap), reclocked| {
                for (from_ts, into_ts, diff) in remap1 {
                    bindings.update_at(from_ts, into_ts, diff);
                }
                bindings.close();
                data.activate()
                    .session(&data_cap)
                    .give_iterator(source_updates1.iter().cloned());
                drop(data_cap);
                step(worker);
                reclocked.extract()
            },
        );
        // Compute compact(reclock(remap, source))
        let mut compact_reclock_remap = reclock_remap;
        for (t, updates) in compact_reclock_remap.iter_mut() {
            t.advance_by(since.borrow());
            for (_, t, _) in updates.iter_mut() {
                t.advance_by(since.borrow());
            }
        }

        // Compute compact(remap)
        let mut compact_remap = remap;
        for (_, t, _) in compact_remap.iter_mut() {
            t.advance_by(since.borrow());
        }
        consolidation::consolidate_updates(&mut compact_remap);
        // Compute reclock(compact(remap), source)
        let reclock_compact_remap = harness(
            since,
            move |worker, mut bindings, (mut data, data_cap), reclocked| {
                for (from_ts, into_ts, diff) in compact_remap {
                    bindings.update_at(from_ts, into_ts, diff);
                }
                bindings.close();
                data.activate()
                    .session(&data_cap)
                    .give_iterator(source_updates.iter().cloned());
                drop(data_cap);
                step(worker);
                reclocked.extract()
            },
        );

        let expected = vec![(
            1500,
            vec![
                (1, 1500, Diff::ONE),
                (2, 1500, Diff::ONE),
                (3, 2000, Diff::ONE),
                (4, 2000, Diff::ONE),
            ],
        )];
        assert_eq!(expected, reclock_compact_remap);
        assert_eq!(expected, compact_reclock_remap);
    }
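
    // Illustrative sketch, not part of the original suite. `test_compaction` expects offsets 1
    // and 2 (bound at 1000) to surface at 1500 while offsets 3 and 4 stay at 2000. Assuming
    // totally ordered `u64` times and a single-element `since` frontier, advancing a time by
    // `since` can be modeled as taking the maximum of the two. `sketch_advance_u64` is a
    // hypothetical helper introduced only for this illustration; it is not the `advance_by`
    // implementation used above.
    fn sketch_advance_u64(time: u64, since: u64) -> u64 {
        time.max(since)
    }

    #[test]
    fn sketch_compaction_of_u64_times() {
        let since = 1500;
        // Times before `since` are moved up to `since`...
        assert_eq!(sketch_advance_u64(1000, since), 1500);
        // ...while times at or beyond `since` are left unchanged.
        assert_eq!(sketch_advance_u64(1500, since), 1500);
        assert_eq!(sketch_advance_u64(2000, since), 2000);
    }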

    #[mz_ore::test]
    fn test_chainbatch_merge() {
        let a = ChainBatch::from_iter([('a', 0, 1)]);
        let b = ChainBatch::from_iter([('a', 0, -1), ('a', 1, 1)]);
        assert_eq!(a.merge_with(b), ChainBatch::from_iter([('a', 1, 1)]));
    }

    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // too slow
    fn test_binding_consolidation() {
        use std::sync::atomic::Ordering;

        #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
        struct Time(u64);

        // A counter of the number of active Time instances
        static INSTANCES: AtomicUsize = AtomicUsize::new(0);

        impl Time {
            fn new(time: u64) -> Self {
                INSTANCES.fetch_add(1, Ordering::Relaxed);
                Self(time)
            }
        }

        impl Clone for Time {
            fn clone(&self) -> Self {
                INSTANCES.fetch_add(1, Ordering::Relaxed);
                Self(self.0)
            }
        }

        impl Drop for Time {
            fn drop(&mut self) {
                INSTANCES.fetch_sub(1, Ordering::Relaxed);
            }
        }

        impl Timestamp for Time {
            type Summary = ();

            fn minimum() -> Self {
                Time::new(0)
            }
        }

        impl PathSummary<Time> for () {
            fn results_in(&self, src: &Time) -> Option<Time> {
                Some(src.clone())
            }

            fn followed_by(&self, _other: &()) -> Option<Self> {
                Some(())
            }
        }

        impl Refines<()> for Time {
            fn to_inner(_: ()) -> Self {
                Self::minimum()
            }
            fn to_outer(self) -> () {}
            fn summarize(_path: ()) {}
        }

        impl PartialOrder for Time {
            fn less_equal(&self, other: &Self) -> bool {
                self.0.less_equal(&other.0)
            }
        }

        let as_of = 1000;

        // Test that supplying a single big batch of unconsolidated bindings gets
        // consolidated after a single worker step.
        harness::<Time, u64, _, _>(
            Antichain::from_elem(as_of),
            move |worker, mut bindings, _, _| {
                step(worker);
                let instances_before = INSTANCES.load(Ordering::Relaxed);
                for ts in 0..as_of {
                    if ts > 0 {
                        bindings.update_at(Time::new(ts - 1), ts, Diff::MINUS_ONE);
                    }
                    bindings.update_at(Time::new(ts), ts, Diff::ONE);
                }
                bindings.advance_to(as_of);
                bindings.flush();
                step(worker);
                let instances_after = INSTANCES.load(Ordering::Relaxed);
                // The extra instances live in a ChangeBatch which considers compaction when more
                // than 32 elements are inside.
                assert!(instances_after - instances_before < 32);
            },
        );

        // Test that a slow feed of uncompacted bindings over multiple steps never leads to an
        // excessive number of bindings held in memory.
        harness::<Time, u64, _, _>(
            Antichain::from_elem(as_of),
            move |worker, mut bindings, _, _| {
                step(worker);
                let instances_before = INSTANCES.load(Ordering::Relaxed);
                for ts in 0..as_of {
                    if ts > 0 {
                        bindings.update_at(Time::new(ts - 1), ts, Diff::MINUS_ONE);
                    }
                    bindings.update_at(Time::new(ts), ts, Diff::ONE);
                    bindings.advance_to(ts + 1);
                    bindings.flush();
                    step(worker);
                    let instances_now = INSTANCES.load(Ordering::Relaxed);
                    // The extra instances live in a ChangeBatch which considers compaction when
                    // more than 32 elements are inside.
                    assert!(instances_now - instances_before < 32);
                }
            },
        );
    }

    #[cfg(feature = "count-allocations")]
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // too slow
    fn test_shrinking() {
        let as_of = 1000_u64;

        // This workflow accumulates updates in remap_trace, advances the source frontier,
        // and validates that memory was reclaimed. To avoid errant test failures due to
        // optimizations, this only validates that memory is reclaimed, not how much.
        harness::<FromTime, u64, _, _>(
            Antichain::from_elem(0),
            move |worker, mut bindings, (_data, mut data_cap), _| {
                let info1 = allocation_counter::measure(|| {
                    step(worker);
                    for ts in 0..as_of {
                        if ts > 0 {
                            bindings.update_at(
                                Partitioned::new_singleton(0, ts - 1),
                                ts,
                                Diff::MINUS_ONE,
                            );
                        }
                        bindings.update_at(Partitioned::new_singleton(0, ts), ts, Diff::ONE);
                        bindings.advance_to(ts + 1);
                        bindings.flush();
                        step(worker);
                    }
                });
                println!("info = {info1:?}");

                let info2 = allocation_counter::measure(|| {
                    data_cap.downgrade(&Partitioned::new_singleton(0, as_of));
                    step(worker);
                });
                println!("info = {info2:?}");
                assert!(info2.bytes_current < 0);
            },
        );
    }
}