mz_compute/render/continual_task.rs
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! A continual task presents as something like a `TRIGGER`: it watches some
//! _input_ and whenever it changes at time `T`, executes a SQL txn, writing to
//! some _output_ at the same time `T`. It can also read anything in materialize
//! as a _reference_, most notably including the output.
//!
//! Only reacting to new inputs (and not the full history) makes a CT's
//! rehydration time independent of the size of the inputs (NB this is not true
//! for references), enabling things like writing UPSERT on top of an
//! append-only shard in SQL (ignore the obvious bug with my upsert impl):
//!
//! ```sql
//! CREATE CONTINUAL TASK upsert (key INT, val INT) ON INPUT append_only AS (
//!     DELETE FROM upsert WHERE key IN (SELECT key FROM append_only);
//!     INSERT INTO upsert SELECT key, max(val) FROM append_only GROUP BY key;
//! )
//! ```
//!
//! Unlike a materialized view, the continual task does not update outputs if
//! references later change. This enables things like auditing:
//!
//! ```sql
//! CREATE CONTINUAL TASK audit_log (count INT8) ON INPUT anomalies AS (
//!     INSERT INTO audit_log SELECT * FROM anomalies;
//! )
//! ```
//!
//! Rough implementation overview:
//! - A CT is created and starts at some `start_ts`, and is optionally later
//!   dropped and stopped at some `end_ts`.
//! - A CT takes one or more _input_s. These must be persist shards (i.e. TABLE,
//!   SOURCE, MV, but not VIEW).
//! - A CT has one or more _output_s. The outputs are (initially) owned by the
//!   task and cannot be written to by other parts of the system.
//! - The task is run for each time one of the inputs changes, starting at
//!   `start_ts`.
//! - It is given the changes in its inputs at time `T` as diffs.
//!   - These are presented as two SQL relations with just the inserts/deletes.
//!   - NB: A full collection for the input can always be recovered by also
//!     using the input as a "reference" (see below) and applying the diffs.
//! - The task logic is expressed as a SQL transaction that does all reads at
//!   `T` and commits all writes at `T`.
//!   - The notable exception to this is self-referential reads of the CT
//!     output. See below for how that works.
//! - This logic can _reference_ any nameable object in the system, not just the
//!   inputs.
//!   - However, the logic/transaction can mutate only the outputs.
//! - Summary of differences between inputs and references:
//!   - The task receives snapshot + changes for references (like regular
//!     dataflow inputs today) but only changes for inputs.
//!   - The task only produces output in response to changes in the inputs, not
//!     in response to changes in the references.
//! - Instead of re-evaluating the task logic from scratch for each input time,
//!   we maintain the collection representing desired writes to the output(s) as
//!   a dataflow.
//! - The task dataflow is tied to a `CLUSTER` and runs on each `REPLICA`.
//!   - HA strategy: multi-replica clusters race to commit and the losers throw
//!     away the result.
//!
//! ## Self-References
//!
//! Self-references must be handled differently from other reads. When computing
//! the proposed write to some output at `T`, we can only know the contents of
//! it through `T-1` (the exclusive upper is `T`).
//!
//! We address this by initially assuming that the output contains no changes at
//! `T`, then evaluating each of the statements in order, allowing them to see
//! the proposed output changes made by the previous statements. By default,
//! this is stopped after one iteration and proposed output diffs are committed
//! if possible. (We could also add options for iterating to a fixpoint,
//! stop/error after N iters, etc.) Then to compute the changes at `T+1`, we
//! read in what was actually written to the output at `T` (maybe some other
//! replica wrote something different) and begin again.
//!
//! The above is very similar to how timely/differential dataflow iteration
//! works, except that our feedback loop goes through persist and the loop
//! timestamp is already `mz_repr::Timestamp`.
//!
//! This is implemented as follows:
//! - `let I = persist_source(self-reference)`
//! - Transform `I` such that the contents at `T-1` are presented at `T` (i.e.
//!   initially assume `T` is unchanged from `T-1`).
//! - TODO(ct3): Actually implement the following.
//!   - In an iteration sub-scope:
//!     - Bring `I` into the sub-scope and `let proposed = Variable`.
//!     - We need a collection that at `(T, 0)` is always the contents of `I` at
//!       `T`, but at `(T, 1...)` contains the proposed diffs by the CT logic. We
//!       can construct it by concatenating `I` with `proposed` except that we
//!       also need to retract everything in `proposed` for the next `(T+1, 0)`
//!       (because `I` is the source of truth for what actually committed).
//!     - `let R = retract_at_next_outer_ts(proposed)`
//!     - `let result = logic(concat(I, proposed, R))`
//!     - `proposed.set(result)`
//!   - Then we return `proposed.leave()` for attempted write to persist.
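//!
//! As a small illustrative example (values made up): suppose the output
//! committed `{a}` at `T-1` and the CT logic, given the input diffs at `T`,
//! proposes inserting `b`. The self-reference presents `{a}` at `T` (the
//! step_forward of the `T-1` contents), the logic produces the diff `+b` at
//! `T`, and we attempt to write it to the output shard. To compute the changes
//! at `T+1`, we read what was actually committed at `T` (perhaps another
//! replica won the race) and begin again from that.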
//!
//! ## As Ofs and Output Uppers
//!
//! - A continual task is first created with an initial as_of `I`. It is
//!   initially rendered at as_of `I==A`, but as it makes progress, it may be
//!   rendered at later as_ofs `I<A`.
//! - It is required that the output collection springs into existence at `I`
//!   (i.e. receives the initial contents at `I`).
//!   - For a snapshot CT, the full contents of the input at `I` are run through
//!     the CT logic and written at `I`.
//!   - For a non-snapshot CT, the collection is defined to be empty at `I`
//!     (i.e. if the input happened to be written exactly at `I`, we'd ignore
//!     it) and then starts writing at `I+1`.
//! - As documented in [DataflowDescription::as_of], `A` is the time we render
//!   the inputs.
//!   - An MV with an as_of of `A` will have its inputs rendered at `A`, and `A`
//!     is also the first time it could write.
//!   - A CT is the same on the initial render (`I==A`), but on renders after it
//!     has made progress (`I<A`) the first time that it could potentially
//!     write is `A+1`. This is because a persist_source started with
//!     SnapshotMode::Exclude can only start emitting diffs at `as_of+1`.
//! - As a result, we hold back the since on inputs to be strictly less than
//!   the upper of the output. (This is only necessary for CTs, but we also do
//!   it for MVs to avoid the special case.)
//! - For CT "inputs" (which are disallowed from being the output), we render
//!   the persist_source with as_of `A`.
//!   - When `I==A` we include the snapshot iff the snapshot option is used.
//!   - When `I<A` we always exclude the snapshot. It would be unnecessary, and
//!     excluding it is an absolutely critical performance optimization that
//!     keeps CT rehydration times independent of input size.
//! - For CT "references", we render the persist_source with as_of `A` and
//!   always include the snapshot.
//!   - There is one subtlety: self-references on the initial render. We need
//!     the contents to be available at `A-1`, so that we can do the
//!     step_forward described above to get them at `A`. However, the collection
//!     springs into existence at `I`, so when `I==A`, we're not allowed to
//!     read it as_of `A-1` (the since of the shard may have advanced past
//!     that). We address this by rendering the persist_source as normal at
//!     `A`. On startup, persist_source immediately downgrades its frontier to
//!     `A` (making `A-1` readable). Combined with step_forward, this is
//!     enough to unblock the CT self-reference. We do however have to tweak
//!     the `suppress_early_progress` operator to use `A-1` instead of `A` for
//!     this case.
//!   - On subsequent renders, self-references work as normal.

use std::any::Any;
use std::cell::RefCell;
use std::collections::BTreeSet;
use std::rc::Rc;
use std::sync::Arc;

use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
use differential_dataflow::difference::Semigroup;
use differential_dataflow::lattice::Lattice;
use differential_dataflow::{AsCollection, Hashable, VecCollection};
use futures::{Future, FutureExt, StreamExt};
use mz_compute_types::dataflows::DataflowDescription;
use mz_compute_types::sinks::{ComputeSinkConnection, ComputeSinkDesc, ContinualTaskConnection};
use mz_ore::cast::CastFrom;
use mz_ore::collections::HashMap;
use mz_persist_client::Diagnostics;
use mz_persist_client::error::UpperMismatch;
use mz_persist_client::operators::shard_source::SnapshotMode;
use mz_persist_client::write::WriteHandle;
use mz_persist_types::codec_impls::UnitSchema;
use mz_repr::{Diff, GlobalId, Row, Timestamp};
use mz_storage_types::StorageDiff;
use mz_storage_types::controller::CollectionMetadata;
use mz_storage_types::errors::DataflowError;
use mz_storage_types::sources::SourceData;
use mz_timely_util::builder_async::{Button, Event, OperatorBuilder as AsyncOperatorBuilder};
use mz_timely_util::operator::CollectionExt;
use mz_timely_util::probe;
use mz_timely_util::probe::ProbeNotify;
use timely::PartialOrder;
use timely::dataflow::channels::pact::{Exchange, Pipeline};
use timely::dataflow::operators::generic::OutputBuilder;
use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
use timely::dataflow::operators::vec::{Filter, Map};
use timely::dataflow::operators::{FrontierNotificator, Operator};
use timely::dataflow::{ProbeHandle, Scope};
use timely::progress::frontier::AntichainRef;
use timely::progress::operate::FrontierInterest;
use timely::progress::{Antichain, Timestamp as _};
use tracing::debug;

use crate::compute_state::ComputeState;
use crate::render::StartSignal;
use crate::render::sinks::SinkRender;
use crate::sink::ConsolidatingVec;

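/// Per-dataflow state for rendering a continual task, if the dataflow contains
/// one. It is built from the dataflow's sink exports, consulted while rendering
/// each source, and the `ct_times` collections gathered here are later handed
/// to the CT sink.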
pub(crate) struct ContinualTaskCtx<'scope> {
    name: Option<String>,
    dataflow_as_of: Option<Antichain<Timestamp>>,
    inputs_with_snapshot: Option<bool>,
    ct_inputs: BTreeSet<GlobalId>,
    ct_outputs: BTreeSet<GlobalId>,
    pub ct_times: Vec<VecCollection<'scope, Timestamp, (), Diff>>,
}

/// An encapsulation of the transformation logic necessary on data coming into a
/// continual task.
///
/// NB: In continual task jargon, an "input" contains diffs and a "reference" is
/// a normal source/collection.
pub(crate) enum ContinualTaskSourceTransformer {
    /// A collection containing, at each time T, exactly the inserts at time T
    /// in the collection being transformed.
    ///
    /// For example:
    /// - Input: {} at 0, {1} at 1, {1} at 2, ...
    /// - Output: {} at 0, {1} at 1, {} at 2, ...
    ///
    /// We'll presumably have the same for deletes eventually, but it's not
    /// exposed in the SQL frontend yet.
    InsertsInput {
        source_id: GlobalId,
        with_snapshot: bool,
    },
    /// A self-reference to the continual task's output. This is essentially a
    /// timely feedback loop via the persist shard. See the module rustdoc for
    /// how this works.
    SelfReference { source_id: GlobalId },
    /// A normal collection (no-op transformation).
    NormalReference,
}

impl ContinualTaskSourceTransformer {
    /// The persist_source `SnapshotMode` to use when reading this source.
    pub fn snapshot_mode(&self) -> SnapshotMode {
        use ContinualTaskSourceTransformer::*;
        match self {
            InsertsInput {
                with_snapshot: false,
                ..
            } => SnapshotMode::Exclude,
            InsertsInput {
                with_snapshot: true,
                ..
            }
            | SelfReference { .. }
            | NormalReference => SnapshotMode::Include,
        }
    }

    /// Returns the as_of to use with the suppress_early_progress operator for
    /// this source. See the module rustdoc for context.
    pub fn suppress_early_progress_as_of(
        &self,
        as_of: Antichain<Timestamp>,
    ) -> Antichain<Timestamp> {
        use ContinualTaskSourceTransformer::*;
        match self {
            InsertsInput { .. } => as_of,
            SelfReference { .. } => as_of
                .iter()
                .map(|x| x.step_back().unwrap_or_else(Timestamp::minimum))
                .collect(),
            NormalReference => as_of,
        }
    }

    /// Performs the necessary transformation on the source collection.
    ///
    /// Returns the transformed "oks" and "errs" collections. Also returns the
    /// appropriate `ct_times` collection used to inform the sink which times
    /// were changed in the inputs.
    pub fn transform<'s>(
        &self,
        oks: VecCollection<'s, Timestamp, Row, Diff>,
        errs: VecCollection<'s, Timestamp, DataflowError, Diff>,
    ) -> (
        VecCollection<'s, Timestamp, Row, Diff>,
        VecCollection<'s, Timestamp, DataflowError, Diff>,
        VecCollection<'s, Timestamp, (), Diff>,
    ) {
        use ContinualTaskSourceTransformer::*;
        match self {
            // Make a collection s.t., for each time T in the input, the output
            // contains the inserts at T.
            InsertsInput { source_id, .. } => {
                let name = source_id.to_string();
                // Keep only the inserts.
                let oks = oks.inner.filter(|(_, _, diff)| diff.is_positive());
                // Grab the original times for use in the sink operator.
                let (oks, times) = oks.as_collection().times_extract(&name);
                // Then retract everything at the next timestamp.
                let oks = oks.inner.flat_map(|(row, ts, diff)| {
                    let retract_ts = ts.step_forward();
                    let negation = -diff;
                    [(row.clone(), ts, diff), (row, retract_ts, negation)]
                });
                (oks.as_collection(), errs, times)
            }
            NormalReference => {
                let times = VecCollection::empty(oks.scope());
                (oks, errs, times)
            }
            // When computing a self-referential output at `T`, start by
            // assuming there are no changes from the contents at `T-1`. See the
            // module rustdoc for how this fits into the larger picture.
            SelfReference { source_id } => {
                let name = source_id.to_string();
                let times = VecCollection::empty(oks.scope());
                // step_forward will panic at runtime if it receives a data or
                // capability with a time that cannot be stepped forward (i.e.
                // because it is already the max). We're safe here because this
                // is stepping `T-1` forward to `T`.
                let oks = oks.step_forward(&name);
                let errs = errs.step_forward(&name);
                (oks, errs, times)
            }
        }
    }
}

impl<'scope> ContinualTaskCtx<'scope> {
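    /// Extracts the continual task information, if any, from the given
    /// dataflow description.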
    pub fn new<P, S>(dataflow: &DataflowDescription<P, S>) -> Self {
        let mut name = None;
        let mut ct_inputs = BTreeSet::new();
        let mut ct_outputs = BTreeSet::new();
        let mut inputs_with_snapshot = None;
        for (sink_id, sink) in &dataflow.sink_exports {
            match &sink.connection {
                ComputeSinkConnection::ContinualTask(ContinualTaskConnection {
                    input_id, ..
                }) => {
                    ct_outputs.insert(*sink_id);
                    ct_inputs.insert(*input_id);
                    // There's only one CT sink per dataflow at this point.
                    assert_eq!(name, None);
                    name = Some(sink_id.to_string());
                    assert_eq!(inputs_with_snapshot, None);
                    match (
                        sink.with_snapshot,
                        dataflow.as_of.as_ref(),
                        dataflow.initial_storage_as_of.as_ref(),
                    ) {
                        // User specified no snapshot when creating the CT.
                        (false, _, _) => inputs_with_snapshot = Some(false),
                        // User specified a snapshot but we're past the initial
                        // as_of.
                        (true, Some(as_of), Some(initial_as_of))
                            if PartialOrder::less_than(initial_as_of, as_of) =>
                        {
                            inputs_with_snapshot = Some(false)
                        }
                        // User specified a snapshot and we're either at the
                        // initial creation, or we don't know (builtin CTs). If
                        // we don't know, it's always safe to fall back to
                        // snapshotting, at worst it's wasted work and will get
                        // filtered.
                        (true, _, _) => inputs_with_snapshot = Some(true),
                    }
                }
                _ => continue,
            }
        }
        let mut ret = ContinualTaskCtx {
            name,
            dataflow_as_of: None,
            inputs_with_snapshot,
            ct_inputs,
            ct_outputs,
            ct_times: Vec::new(),
        };
        // Only clone the as_of if we're in a CT dataflow.
        if ret.is_ct_dataflow() {
            ret.dataflow_as_of = dataflow.as_of.clone();
            // Sanity check that we have a name if we're in a CT dataflow.
            assert!(ret.name.is_some());
        }
        ret
    }

    pub fn is_ct_dataflow(&self) -> bool {
        // Inputs are non-empty iff outputs are non-empty.
        assert_eq!(self.ct_inputs.is_empty(), self.ct_outputs.is_empty());
        !self.ct_outputs.is_empty()
    }

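    /// Returns the transformation to apply to the given source when rendering
    /// it into this dataflow, or `None` if this is not a CT dataflow.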
    pub fn get_ct_source_transformer(
        &self,
        source_id: GlobalId,
    ) -> Option<ContinualTaskSourceTransformer> {
        let Some(inputs_with_snapshot) = self.inputs_with_snapshot else {
            return None;
        };
        let transformer = match (
            self.ct_inputs.contains(&source_id),
            self.ct_outputs.contains(&source_id),
        ) {
            (false, false) => ContinualTaskSourceTransformer::NormalReference,
            (false, true) => ContinualTaskSourceTransformer::SelfReference { source_id },
            (true, false) => ContinualTaskSourceTransformer::InsertsInput {
                source_id,
                with_snapshot: inputs_with_snapshot,
            },
            (true, true) => panic!("ct output is not allowed to be an input"),
        };
        Some(transformer)
    }

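    /// Returns a single collection of the times at which this CT's inputs
    /// changed, for handing to the CT sink, or `None` if this is not a CT
    /// dataflow.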
    pub fn input_times(
        &self,
        scope: Scope<'scope, Timestamp>,
    ) -> Option<VecCollection<'scope, Timestamp, (), Diff>> {
        // We have a name iff this is a CT dataflow.
        assert_eq!(self.is_ct_dataflow(), self.name.is_some());
        let Some(name) = self.name.as_ref() else {
            return None;
        };
        // Note that self.ct_times might be empty (if the user didn't reference
        // the input), but this still does the correct, though maybe useless,
        // thing: no diffs coming into the input means no times to write at.
        let ct_times =
            differential_dataflow::collection::concatenate(scope, self.ct_times.iter().cloned());
        // Reduce this down to one update per-time-per-worker before exchanging
        // it, so we don't waste work on unnecessarily high data volumes.
        let ct_times = ct_times.times_reduce(name);
        Some(ct_times)
    }
}

impl<'scope> SinkRender<'scope> for ContinualTaskConnection<CollectionMetadata> {
    fn render_sink(
        &self,
        compute_state: &mut ComputeState,
        _sink: &ComputeSinkDesc<CollectionMetadata>,
        sink_id: GlobalId,
        as_of: Antichain<Timestamp>,
        start_signal: StartSignal,
        oks: VecCollection<'scope, Timestamp, Row, Diff>,
        errs: VecCollection<'scope, Timestamp, DataflowError, Diff>,
        append_times: Option<VecCollection<'scope, Timestamp, (), Diff>>,
        flow_control_probe: &probe::Handle<Timestamp>,
    ) -> Option<Rc<dyn Any>> {
        let name = sink_id.to_string();

        let to_append = oks
            .map(|x| SourceData(Ok(x)))
            .concat(errs.map(|x| SourceData(Err(x))));
        let append_times = append_times.expect("should be provided by ContinualTaskCtx");

        let write_handle = {
            let clients = Arc::clone(&compute_state.persist_clients);
            let metadata = self.storage_metadata.clone();
            let handle_purpose = format!("ct_sink({})", name);
            async move {
                let client = clients
                    .open(metadata.persist_location)
                    .await
                    .expect("valid location");
                client
                    .open_writer(
                        metadata.data_shard,
                        metadata.relation_desc.into(),
                        UnitSchema.into(),
                        Diagnostics {
                            shard_name: sink_id.to_string(),
                            handle_purpose,
                        },
                    )
                    .await
                    .expect("codecs should match")
            }
        };

        let collection = compute_state.expect_collection_mut(sink_id);
        let probe = ProbeHandle::default();
        let to_append = to_append
            .probe_with(&probe)
            .inner
            .probe_notify_with(vec![flow_control_probe.clone()])
            .as_collection();
        collection.compute_probe = Some(probe);
        let sink_write_frontier = Rc::new(RefCell::new(Antichain::from_elem(Timestamp::minimum())));
        collection.sink_write_frontier = Some(Rc::clone(&sink_write_frontier));

        // TODO(ct1): Obey `compute_state.read_only_rx`
        //
        // Seemingly, the read-only env needs to tail the output shard and keep
        // historical updates around until it sees that the output frontier
        // advances beyond their times.
        let sink_button = continual_task_sink(
            &name,
            to_append,
            append_times,
            as_of,
            write_handle,
            start_signal,
            sink_write_frontier,
        );
        Some(Rc::new(sink_button.press_on_drop()))
    }
}

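/// Renders the sink operator that writes a continual task's output to its
/// persist shard, committing data only at the times at which its inputs
/// changed.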
fn continual_task_sink<'scope>(
    name: &str,
    to_append: VecCollection<'scope, Timestamp, SourceData, Diff>,
    append_times: VecCollection<'scope, Timestamp, (), Diff>,
    as_of: Antichain<Timestamp>,
    write_handle: impl Future<Output = WriteHandle<SourceData, (), Timestamp, StorageDiff>>
        + Send
        + 'static,
    start_signal: StartSignal,
    output_frontier: Rc<RefCell<Antichain<Timestamp>>>,
) -> Button {
    let scope = to_append.scope();
    let mut op = AsyncOperatorBuilder::new(format!("ct_sink({})", name), scope.clone());

    // TODO(ct2): This all works perfectly well data parallel (assuming we
    // broadcast the append_times). We just need to hook it up to the
    // multi-worker persist-sink, but that requires some refactoring. This would
    // also remove the need for this to be an async timely operator.
    let active_worker = name.hashed();
    let to_append_input =
        op.new_input_for_many(to_append.inner, Exchange::new(move |_| active_worker), []);
    let append_times_input = op.new_input_for_many(
        append_times.inner,
        Exchange::new(move |_| active_worker),
        [],
    );

    let active_worker = usize::cast_from(active_worker) % scope.peers() == scope.index();
    let button = op.build(move |_capabilities| async move {
        if !active_worker {
            output_frontier.borrow_mut().clear();
            return;
        }

        // SUBTLE: The start_signal below may not be unblocked by the compute
        // controller until it thinks the inputs are "ready" (i.e. readable at
        // the as_of), but if the CT is self-referential, one of the inputs will
        // be the output (which starts at `T::minimum()`, not the as_of). To
        // break this cycle, before we even get the start signal, go ahead and
        // advance the output's (exclusive) upper to the first time that this CT
        // might write: `as_of+1`. Because we don't want this to happen on
        // restarts, only do it if the upper is `T::minimum()`.
        let mut write_handle = write_handle.await;
        {
            let res = write_handle
                .compare_and_append_batch(
                    &mut [],
                    Antichain::from_elem(Timestamp::minimum()),
                    as_of.clone(),
                    true,
                )
                .await
                .expect("usage was valid");
            match res {
                // We advanced the upper.
                Ok(()) => {}
                // Someone else advanced the upper.
                Err(UpperMismatch { .. }) => {}
            }
        }

        let () = start_signal.await;

        #[derive(Debug)]
        enum OpEvent<C> {
            ToAppend(Event<Timestamp, C, Vec<(SourceData, Timestamp, Diff)>>),
            AppendTimes(Event<Timestamp, C, Vec<((), Timestamp, Diff)>>),
        }

        impl<C: std::fmt::Debug> OpEvent<C> {
            fn apply(self, state: &mut SinkState<SourceData, Timestamp>) {
                debug!("ct_sink event {:?}", self);
                match self {
                    OpEvent::ToAppend(Event::Data(_cap, x)) => {
                        for (k, t, d) in x {
                            state.to_append.push(((k, t), d));
                        }
                    }
                    OpEvent::ToAppend(Event::Progress(x)) => state.to_append_progress = x,
                    OpEvent::AppendTimes(Event::Data(_cap, x)) => state
                        .append_times
                        .extend(x.into_iter().map(|((), t, _d)| t)),
                    OpEvent::AppendTimes(Event::Progress(x)) => state.append_times_progress = x,
                }
            }
        }

        let to_insert_input = to_append_input.map(OpEvent::ToAppend);
        let append_times_input = append_times_input.map(OpEvent::AppendTimes);
        let mut op_inputs = futures::stream::select(to_insert_input, append_times_input);

        let mut state = SinkState::new();
        loop {
            // Loop until we've processed all the work we can.
            loop {
                if PartialOrder::less_than(&*output_frontier.borrow(), &state.output_progress) {
                    output_frontier.borrow_mut().clear();
                    output_frontier
                        .borrow_mut()
                        .extend(state.output_progress.iter().cloned());
                }

                debug!("ct_sink about to process {:?}", state);
                let Some((new_upper, to_append)) = state.process() else {
                    break;
                };
                debug!("ct_sink got write {:?}: {:?}", new_upper, to_append);
                state.output_progress =
                    truncating_compare_and_append(&mut write_handle, to_append, new_upper).await;
            }

            // Then try to generate some more work by reading inputs.
            let Some(event) = op_inputs.next().await else {
                // Inputs exhausted, shutting down.
                output_frontier.borrow_mut().clear();
                return;
            };
            event.apply(&mut state);
            // Also drain any other events that may be ready.
            while let Some(Some(event)) = op_inputs.next().now_or_never() {
                event.apply(&mut state);
            }
        }
    });

    button
}

/// Writes the given data to the shard, truncating it as necessary.
///
/// Returns the latest known upper for the shard.
async fn truncating_compare_and_append(
    write_handle: &mut WriteHandle<SourceData, (), Timestamp, StorageDiff>,
    to_append: Vec<((&SourceData, &()), &Timestamp, StorageDiff)>,
    new_upper: Antichain<Timestamp>,
) -> Antichain<Timestamp> {
    let mut expected_upper = write_handle.shared_upper();
    loop {
        if !PartialOrder::less_than(&expected_upper, &new_upper) {
            debug!("ct_sink skipping {:?}", new_upper.elements());
            return expected_upper;
        }
        let res = write_handle
            .compare_and_append(&to_append, expected_upper.clone(), new_upper.clone())
            .await
            .expect("usage was valid");
        debug!(
            "ct_sink write res {:?}-{:?}: {:?}",
            expected_upper.elements(),
            new_upper.elements(),
            res
        );
        match res {
            Ok(()) => return new_upper,
            Err(err) => {
                expected_upper = err.current;
                continue;
            }
        }
    }
}

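/// In-progress state of the CT sink: the data to append, the times at which to
/// append it, and lower bounds on how far the inputs and the output shard have
/// progressed.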
#[derive(Debug)]
struct SinkState<D, T> {
    /// The known times at which we're going to write data to the output. This
    /// is guaranteed to include all times < append_times_progress, except that
    /// times < output_progress may have been truncated.
    append_times: BTreeSet<T>,
    append_times_progress: Antichain<T>,

    /// The data we've collected to append to the output. This is often
    /// compacted to advancing times and is expected to be ~empty in the steady
    /// state.
    to_append: ConsolidatingVec<(D, T)>,
    to_append_progress: Antichain<T>,

    /// A lower bound on the upper of the output.
    output_progress: Antichain<T>,
}

impl<D: Ord> SinkState<D, Timestamp> {
    fn new() -> Self {
        SinkState {
            append_times: BTreeSet::new(),
            append_times_progress: Antichain::from_elem(Timestamp::minimum()),
            to_append: ConsolidatingVec::new(128, 0),
            to_append_progress: Antichain::from_elem(Timestamp::minimum()),
            output_progress: Antichain::from_elem(Timestamp::minimum()),
        }
    }

    /// Returns data to write to the output, if any, and the new upper to use.
    fn process(
        &mut self,
    ) -> Option<(
        Antichain<Timestamp>,
        Vec<((&D, &()), &Timestamp, StorageDiff)>,
    )> {
        // We can only append at times >= the output_progress, so pop off
        // anything unnecessary.
        while let Some(x) = self.append_times.first() {
            if self.output_progress.less_equal(x) {
                break;
            }
            self.append_times.pop_first();
        }

        // Find the smallest append_time before append_times_progress. This is
        // the next time we might need to write data at. Note that we can only
        // act on append_times once the progress has passed them, because they
        // could come out of order.
        let write_ts = match self.append_times.first() {
            Some(x) if !self.append_times_progress.less_equal(x) => x,
            Some(_) | None => {
                // The CT sink's contract is that it only writes data at times
                // we received an input diff. There are none in
                // `[output_progress, append_times_progress)`, so we can go
                // ahead and advance the upper of the output, if it's not
                // already.
                //
                // We could instead ensure liveness by basing this off of
                // to_append, but for any CTs reading the output (expected to be
                // a common case) we'd end up looping each timestamp through
                // persist one-by-one.
                if PartialOrder::less_than(&self.output_progress, &self.append_times_progress) {
                    return Some((self.append_times_progress.clone(), Vec::new()));
                }
                // Otherwise, nothing to do!
                return None;
            }
        };

        if self.to_append_progress.less_equal(write_ts) {
            // Don't have all the necessary data yet.
            if self.output_progress.less_than(write_ts) {
                // We can advance the output upper up to the write_ts. For
                // self-referential CTs this might be necessary to ensure
                // dataflow progress.
                return Some((Antichain::from_elem(write_ts.clone()), Vec::new()));
            }
            return None;
        }

        // Time to write some data! Produce the collection as of write_ts by
        // advancing timestamps, consolidating, and filtering out anything at
        // future timestamps.
        let as_of = std::slice::from_ref(write_ts);
        for ((_, t), _) in self.to_append.iter_mut() {
            t.advance_by(AntichainRef::new(as_of))
        }
        // TODO(ct2): Metrics for vec len and cap.
        self.to_append.consolidate();

        let append_data = self
            .to_append
            .iter()
            .filter_map(|((k, t), d)| (t <= write_ts).then_some(((k, &()), t, d.into_inner())))
            .collect();
        Some((Antichain::from_elem(write_ts.step_forward()), append_data))
    }
}

trait StepForward<D, R> {
    /// Translates a collection one timestamp "forward" (i.e. `T` -> `T+1` as
    /// defined by `TimestampManipulation::step_forward`).
    ///
    /// This includes:
    /// - The differential timestamps in each data.
    /// - The capabilities paired with that data.
    /// - (As a consequence of the previous) the output frontier is one step
    ///   forward of the input frontier.
    ///
    /// The caller is responsible for ensuring that all data and capabilities
    /// given to this operator can be stepped forward without panicking,
    /// otherwise the operator will panic at runtime.
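    ///
    /// For example, an update `(x, 3, +1)` arriving with input frontier `[4]`
    /// is emitted as `(x, 4, +1)` with the output frontier at `[5]`.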
    fn step_forward(self, name: &str) -> Self;
}

impl<'scope, D, R> StepForward<D, R> for VecCollection<'scope, Timestamp, D, R>
where
    D: Clone + 'static,
    R: Semigroup + 'static,
{
    fn step_forward(self, name: &str) -> VecCollection<'scope, Timestamp, D, R> {
        let name = format!("ct_step_forward({})", name);
        let mut builder = OperatorBuilder::new(name, self.scope());
        let (output, output_stream) = builder.new_output();
        let mut output = OutputBuilder::from(output);

        // We step forward (by one) each data timestamp and capability. As a
        // result the output's frontier is guaranteed to be one past the input
        // frontier, so make this promise to timely.
        let step_forward_summary = Timestamp::from(1);
        let mut input = builder.new_input_connection(
            self.inner,
            Pipeline,
            [(0, Antichain::from_elem(step_forward_summary))],
        );
        builder.set_notify_for(0, FrontierInterest::Never);
        builder.build(move |_caps| {
            move |_frontiers| {
                let mut output = output.activate();
                input.for_each(|cap, data| {
                    for (_, ts, _) in data.iter_mut() {
                        *ts = ts.step_forward();
                    }
                    let cap = cap.delayed(&cap.time().step_forward(), 0);
                    output.session(&cap).give_container(data);
                });
            }
        });

        output_stream.as_collection()
    }
}

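/// Extracts the times at which a collection changes, while also passing the
/// data through unchanged (and, notably, without cloning it).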
trait TimesExtract<D, R> {
    type Times;
    fn times_extract(self, name: &str) -> (Self, Self::Times)
    where
        Self: Sized;
}

impl<'scope, D, R> TimesExtract<D, R> for VecCollection<'scope, Timestamp, D, R>
where
    D: Clone + 'static,
    R: Semigroup + 'static + std::fmt::Debug,
{
    type Times = VecCollection<'scope, Timestamp, (), R>;
    fn times_extract(
        self,
        name: &str,
    ) -> (
        VecCollection<'scope, Timestamp, D, R>,
        VecCollection<'scope, Timestamp, (), R>,
    ) {
        let name = format!("ct_times_extract({})", name);
        let mut builder = OperatorBuilder::new(name, self.scope());
        let (passthrough, passthrough_stream) = builder.new_output();
        let mut passthrough = OutputBuilder::from(passthrough);
        let (times, times_stream) = builder.new_output();
        let mut times = OutputBuilder::<_, ConsolidatingContainerBuilder<_>>::from(times);
        let mut input = builder.new_input(self.inner, Pipeline);
        builder.set_notify_for(0, FrontierInterest::Never);
        builder.build(|_caps| {
            move |_frontiers| {
                let mut passthrough = passthrough.activate();
                let mut times = times.activate();
                input.for_each_time(|time, data| {
                    let mut times_session = times.session_with_builder(&time);
                    let mut passthrough_session = passthrough.session(&time);
                    for data in data {
                        let times_iter =
                            data.iter().map(|(_data, ts, diff)| ((), *ts, diff.clone()));
                        times_session.give_iterator(times_iter);
                        passthrough_session.give_container(data);
                    }
                });
            }
        });
        (
            passthrough_stream.as_collection(),
            times_stream.as_collection(),
        )
    }
}

trait TimesReduce<R> {
    /// This is essentially a specialized impl of consolidate, with a HashMap
    /// instead of the Trace.
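    ///
    /// For example, unit updates at times `[3, 2, 1, 2, 3, 3]`, each with a
    /// diff of 1, reduce to one update per time: diff 1 at `1`, 2 at `2`, and
    /// 3 at `3`.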
    fn times_reduce(self, name: &str) -> Self;
}

impl<'scope, R> TimesReduce<R> for VecCollection<'scope, Timestamp, (), R>
where
    R: Semigroup + 'static + std::fmt::Debug,
{
    fn times_reduce(self, name: &str) -> VecCollection<'scope, Timestamp, (), R> {
        let name = format!("ct_times_reduce({})", name);
        self.inner
            .unary_frontier(Pipeline, &name, |_caps, _info| {
                let mut notificator = FrontierNotificator::default();
                let mut stash = HashMap::<_, R>::new();
                move |(input, frontier), output| {
                    input.for_each(|cap, data| {
                        for ((), ts, diff) in data.drain(..) {
                            notificator.notify_at(cap.delayed(&ts, 0));
                            if let Some(sum) = stash.get_mut(&ts) {
                                sum.plus_equals(&diff);
                            } else {
                                stash.insert(ts, diff);
                            }
                        }
                    });
                    notificator.for_each(&[frontier], |cap, _not| {
                        if let Some(diff) = stash.remove(cap.time()) {
                            output.session(&cap).give(((), cap.time().clone(), diff));
                        }
                    });
                }
            })
            .as_collection()
    }
}

#[cfg(test)]
mod tests {
    use differential_dataflow::AsCollection;
    use mz_repr::Timestamp;
    use timely::Config;
    use timely::dataflow::ProbeHandle;
    use timely::dataflow::operators::capture::Extract;
    use timely::dataflow::operators::{Capture, Input, ToStream};
    use timely::progress::Antichain;

    use super::*;

    #[mz_ore::test]
    fn step_forward() {
        timely::execute(Config::thread(), |worker| {
            let (mut input, probe, output) = worker.dataflow(|scope| {
                let (handle, input) = scope.new_input();
                let probe = ProbeHandle::<Timestamp>::new();
                let output = input
                    .as_collection()
                    .step_forward("test")
                    .probe_with(&probe)
                    .inner
                    .capture();
                (handle, probe, output)
            });

            let mut expected = Vec::new();
            for i in 0u64..10 {
                let in_ts = Timestamp::new(i);
                let out_ts = in_ts.step_forward();
                input.send((i, in_ts, 1));
                input.advance_to(in_ts.step_forward());

                // We should get the data out advanced by `step_forward` and
                // also, crucially, the output frontier should do the same (i.e.
                // this is why we can't simply use `VecCollection::delay`).
                worker.step_while(|| probe.less_than(&out_ts.step_forward()));
                expected.push((i, out_ts, 1));
            }
            // Closing the input should allow the output to advance and the
            // dataflow to shut down.
            input.close();
            while worker.step() {}

            let actual = output
                .extract()
                .into_iter()
                .flat_map(|x| x.1)
                .collect::<Vec<_>>();
            assert_eq!(actual, expected);
        })
        .unwrap();
    }

    #[mz_ore::test]
    fn times_extract() {
        struct PanicOnClone;

        impl Clone for PanicOnClone {
            fn clone(&self) -> Self {
                panic!("boom")
            }
        }

        let output = timely::execute_directly(|worker| {
            worker.dataflow(|scope| {
                let input = [
                    (PanicOnClone, Timestamp::new(0), 0),
                    (PanicOnClone, Timestamp::new(1), 1),
                    (PanicOnClone, Timestamp::new(1), 1),
                    (PanicOnClone, Timestamp::new(2), -2),
                    (PanicOnClone, Timestamp::new(2), 1),
                ]
                .to_stream(scope)
                .as_collection();
                let (_passthrough, times) = input.times_extract("test");
                times.inner.capture()
            })
        });
        let expected = vec![((), Timestamp::new(1), 2), ((), Timestamp::new(2), -1)];
        let actual = output
            .extract()
            .into_iter()
            .flat_map(|x| x.1)
            .collect::<Vec<_>>();
        assert_eq!(actual, expected);
    }

    #[mz_ore::test]
    fn times_reduce() {
        let output = timely::execute_directly(|worker| {
            worker.dataflow(|scope| {
                let input = [
                    ((), Timestamp::new(3), 1),
                    ((), Timestamp::new(2), 1),
                    ((), Timestamp::new(1), 1),
                    ((), Timestamp::new(2), 1),
                    ((), Timestamp::new(3), 1),
                    ((), Timestamp::new(3), 1),
                ]
                .to_stream(scope)
                .as_collection();
                input.times_reduce("test").inner.capture()
            })
        });
        let expected = vec![
            ((), Timestamp::new(1), 1),
            ((), Timestamp::new(2), 2),
            ((), Timestamp::new(3), 3),
        ];
        let actual = output
            .extract()
            .into_iter()
            .flat_map(|x| x.1)
            .collect::<Vec<_>>();
        assert_eq!(actual, expected);
    }

    #[mz_ore::test]
    fn ct_sink_state() {
        #[track_caller]
        fn assert_noop(state: &mut super::SinkState<&'static str, Timestamp>) {
            if let Some(ret) = state.process() {
                panic!("should be nothing to write: {:?}", ret);
            }
        }

        #[track_caller]
        fn assert_write(
            state: &mut super::SinkState<&'static str, Timestamp>,
            expected_upper: u64,
            expected_append: &[&str],
        ) {
            let (new_upper, to_append) = state.process().expect("should be something to write");
            assert_eq!(
                new_upper,
                Antichain::from_elem(Timestamp::new(expected_upper))
            );
            let to_append = to_append
                .into_iter()
                .map(|((k, ()), _ts, _diff)| *k)
                .collect::<Vec<_>>();
            assert_eq!(to_append, expected_append);
        }

        let mut s = super::SinkState::new();

        // Nothing to do at the initial state.
        assert_noop(&mut s);

        // Getting data to append is not enough to do anything yet.
        s.to_append.push((("a", 1.into()), Diff::ONE));
        s.to_append.push((("b", 1.into()), Diff::ONE));
        assert_noop(&mut s);

        // Knowing that this is the only data we'll get for that timestamp is
        // still not enough.
        s.to_append_progress = Antichain::from_elem(2.into());
        assert_noop(&mut s);

        // Even knowing that we got input at that time is not quite enough yet
        // (we could be getting these out of order).
        s.append_times.insert(1.into());
        assert_noop(&mut s);

        // Indeed, it did come out of order. Also note that this checks the ==
        // case for time vs progress.
        s.append_times.insert(0.into());
        assert_noop(&mut s);

        // Okay, now we know that we've seen all the times we got input up to 3.
        // This is enough to allow the empty write of `[0,1)`.
        s.append_times_progress = Antichain::from_elem(3.into());
        assert_write(&mut s, 1, &[]);

        // That succeeded, now we can write the data at 1.
        s.output_progress = Antichain::from_elem(1.into());
        assert_write(&mut s, 2, &["a", "b"]);

        // That succeeded, now we know about some empty time.
        s.output_progress = Antichain::from_elem(2.into());
        assert_write(&mut s, 3, &[]);

        // That succeeded, now nothing to do.
        s.output_progress = Antichain::from_elem(3.into());
        assert_noop(&mut s);

        // Find out about a new time to write at. Even without the data, we can
        // do an empty write up to that time.
        s.append_times.insert(5.into());
        s.append_times_progress = Antichain::from_elem(6.into());
        assert_write(&mut s, 5, &[]);

        // That succeeded, now nothing to do again.
        s.output_progress = Antichain::from_elem(5.into());

        // Retract one of the things currently in the collection and add a new
        // thing, to verify the consolidate.
        s.to_append.push((("a", 5.into()), Diff::MINUS_ONE));
        s.to_append.push((("c", 5.into()), Diff::ONE));
        s.to_append_progress = Antichain::from_elem(6.into());
        assert_write(&mut s, 6, &["b", "c"]);
    }
}