mz_compute/render/continual_task.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! A continual task presents as something like a `TRIGGER`: it watches some
//! _input_ and whenever it changes at time `T`, executes a SQL txn, writing to
//! some _output_ at the same time `T`. It can also read anything in materialize
//! as a _reference_, most notably including the output.
//!
//! Only reacting to new inputs (and not the full history) makes a CT's
//! rehydration time independent of the size of the inputs (NB this is not true
//! for references), enabling things like writing UPSERT on top of an
//! append-only shard in SQL (ignore the obvious bug with my upsert impl):
//!
//! ```sql
//! CREATE CONTINUAL TASK upsert (key INT, val INT) ON INPUT append_only AS (
//!     DELETE FROM upsert WHERE key IN (SELECT key FROM append_only);
//!     INSERT INTO upsert SELECT key, max(val) FROM append_only GROUP BY key;
//! )
//! ```
//!
//! Unlike a materialized view, the continual task does not update outputs if
//! references later change. This enables things like auditing:
//!
//! ```sql
//! CREATE CONTINUAL TASK audit_log (count INT8) ON INPUT anomalies AS (
//!     INSERT INTO audit_log SELECT * FROM anomalies;
//! )
//! ```
//!
//! Rough implementation overview:
//! - A CT is created and starts at some `start_ts`; it is optionally later
//!   dropped and stopped at some `end_ts`.
//! - A CT takes one or more _input_s. These must be persist shards (i.e. TABLE,
//!   SOURCE, MV, but not VIEW).
//! - A CT has one or more _output_s. The outputs are (initially) owned by the
//!   task and cannot be written to by other parts of the system.
//! - The task is run for each time one of the inputs changes starting at
//!   `start_ts`.
//! - It is given the changes in its inputs at time `T` as diffs.
//!   - These are presented as two SQL relations with just the inserts/deletes.
//!   - NB: A full collection for the input can always be recovered by also
//!     using the input as a "reference" (see below) and applying the diffs.
//! - The task logic is expressed as a SQL transaction that does all reads at
//!   `T` and commits all writes at `T`.
//!   - The notable exception to this is self-referential reads of the CT
//!     output. See below for how that works.
//! - This logic can _reference_ any nameable object in the system, not just the
//!   inputs.
//!   - However, the logic/transaction can mutate only the outputs.
//! - Summary of differences between inputs and references:
//!   - The task receives snapshot + changes for references (like regular
//!     dataflow inputs today) but only changes for inputs.
//!   - The task only produces output in response to changes in the inputs but
//!     not in response to changes in the references.
//! - Instead of re-evaluating the task logic from scratch for each input time,
//!   we maintain the collection representing desired writes to the output(s) as
//!   a dataflow.
//! - The task dataflow is tied to a `CLUSTER` and runs on each `REPLICA`.
//!   - HA strategy: multi-replica clusters race to commit and the losers throw
//!     away the result.
//!
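//! As a concrete (hypothetical) illustration of the input/reference
//! distinction, using the same syntax as the examples above with made-up
//! `events` and `dims` relations: `events`, as the input, contributes only its
//! new diffs at each `T`, while `dims`, as a reference, is read as a full
//! collection:
//!
//! ```sql
//! CREATE CONTINUAL TASK enriched (key INT, name TEXT) ON INPUT events AS (
//!     INSERT INTO enriched
//!     SELECT e.key, d.name FROM events e JOIN dims d ON e.key = d.key;
//! )
//! ```
//!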
//! ## Self-References
//!
//! Self-references must be handled differently from other reads. When computing
//! the proposed write to some output at `T`, we can only know the contents of
//! it through `T-1` (the exclusive upper is `T`).
//!
//! We address this by initially assuming that the output contains no changes at
//! `T`, then evaluating each of the statements in order, allowing them to see
//! the proposed output changes made by the previous statements. By default,
//! this is stopped after one iteration and proposed output diffs are committed
//! if possible. (We could also add options for iterating to a fixpoint,
//! stop/error after N iters, etc.) Then to compute the changes at `T+1`, we
//! read in what was actually written to the output at `T` (maybe some other
//! replica wrote something different) and begin again.
//!
//! The above is very similar to how timely/differential dataflow iteration
//! works, except that our feedback loop goes through persist and the loop
//! timestamp is already `mz_repr::Timestamp`.
//!
//! This is implemented as follows:
//! - `let I = persist_source(self-reference)`
//! - Transform `I` such that the contents at `T-1` are presented at `T` (i.e.
//!   initially assume `T` is unchanged from `T-1`).
//! - TODO(ct3): Actually implement the following.
//! - In an iteration sub-scope:
//!   - Bring `I` into the sub-scope and `let proposed = Variable`.
//!   - We need a collection that at `(T, 0)` is always the contents of `I` at
//!     `T`, but at `(T, 1...)` contains the proposed diffs by the CT logic. We
//!     can construct it by concatenating `I` with `proposed` except that we
//!     also need to retract everything in `proposed` for the next `(T+1, 0)`
//!     (because `I` is the source of truth for what actually committed).
//!   - `let R = retract_at_next_outer_ts(proposed)`
//!   - `let result = logic(concat(I, proposed, R))`
//!   - `proposed.set(result)`
//! - Then we return `proposed.leave()` for attempted write to persist.
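//!
//! As a concrete (hypothetical) trace of the scheme above: suppose the output
//! through `T-1` is `{a}` and one pass of the logic proposes inserting `b`:
//!
//! ```text
//! (T, 0):   I = {a}           // output as of T-1, stepped forward to T
//! (T, 1):   proposed = {b}    // diffs produced by one pass of the logic
//! commit:   attempt to write {b} to persist at T (losing replicas discard)
//! (T+1, 0): I = whatever actually committed at T; R retracts the old
//!           contents of `proposed`, so `I` remains the source of truth
//! ```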
103//!
//! ## As Ofs and Output Uppers
//!
//! - A continual task is first created with an initial as_of `I`. It is
//!   initially rendered at as_of `I==A` but as it makes progress, it may be
//!   rendered at later as_ofs `I<A`.
//! - It is required that the output collection springs into existence at `I`
//!   (i.e. receives the initial contents at `I`).
//!   - For a snapshot CT, the full contents of the input at `I` are run through
//!     the CT logic and written at `I`.
//!   - For a non-snapshot CT, the collection is defined to be empty at `I`
//!     (i.e. if the input happened to be written exactly at `I`, we'd ignore
//!     it) and then start writing at `I+1`.
//! - As documented in [DataflowDescription::as_of], `A` is the time we render
//!   the inputs.
//!   - An MV with an as_of of `A` both has its inputs rendered at `A` and can
//!     first write at `A`.
//!   - A CT is the same on the initial render (`I==A`), but on renders after it
//!     has made progress (`I<A`) the first time that it could potentially
//!     write is `A+1`. This is because a persist_source started with
//!     SnapshotMode::Exclude can only start emitting diffs at `as_of+1`.
//!   - As a result, we hold back the since on inputs to be strictly less than
//!     the upper of the output. (This is only necessary for CTs, but we also do
//!     it for MVs to avoid the special case.)
//!   - For CT "inputs" (which are disallowed from being the output), we render
//!     the persist_source with as_of `A`.
//!     - When `I==A` we include the snapshot iff the snapshot option is used.
//!     - When `I<A` we always exclude the snapshot. It would be unnecessary and
//!       this is an absolutely critical performance optimization to make CT
//!       rehydration times independent of input size.
//!   - For CT "references", we render the persist_source with as_of `A` and
//!     always include the snapshot.
//!     - There is one subtlety: self-references on the initial render. We need
//!       the contents to be available at `A-1`, so that we can do the
//!       step_forward described above to get it at `A`. However, the collection
//!       springs into existence at `I`, so when `I==A`, we're not allowed to
//!       read it as_of `A-1` (the since of the shard may have advanced past
//!       that). We address this by rendering the persist_source as normal at
//!       `A`. On startup, persist_source immediately downgrades its frontier to
//!       `A` (making `A-1` readable). Combined with step_forward, this is
//!       enough to unblock the CT self-reference. We do however have to tweak
//!       the `suppress_early_progress` operator to use `A-1` instead of `A` for
//!       this case.
//!     - On subsequent renders, self-references work as normal.

use std::any::Any;
use std::cell::RefCell;
use std::collections::BTreeSet;
use std::rc::Rc;
use std::sync::Arc;

use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
use differential_dataflow::difference::Semigroup;
use differential_dataflow::lattice::Lattice;
use differential_dataflow::{AsCollection, Hashable, VecCollection};
use futures::{Future, FutureExt, StreamExt};
use mz_compute_types::dataflows::DataflowDescription;
use mz_compute_types::sinks::{ComputeSinkConnection, ComputeSinkDesc, ContinualTaskConnection};
use mz_ore::cast::CastFrom;
use mz_ore::collections::HashMap;
use mz_persist_client::Diagnostics;
use mz_persist_client::error::UpperMismatch;
use mz_persist_client::operators::shard_source::SnapshotMode;
use mz_persist_client::write::WriteHandle;
use mz_persist_types::codec_impls::UnitSchema;
use mz_repr::{Diff, GlobalId, Row, Timestamp};
use mz_storage_types::StorageDiff;
use mz_storage_types::controller::CollectionMetadata;
use mz_storage_types::errors::DataflowError;
use mz_storage_types::sources::SourceData;
use mz_timely_util::builder_async::{Button, Event, OperatorBuilder as AsyncOperatorBuilder};
use mz_timely_util::operator::CollectionExt;
use mz_timely_util::probe;
use mz_timely_util::probe::ProbeNotify;
use timely::PartialOrder;
use timely::dataflow::channels::pact::{Exchange, Pipeline};
use timely::dataflow::operators::generic::OutputBuilder;
use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
use timely::dataflow::operators::vec::{Filter, Map};
use timely::dataflow::operators::{FrontierNotificator, Operator};
use timely::dataflow::{ProbeHandle, Scope};
use timely::progress::frontier::AntichainRef;
use timely::progress::operate::FrontierInterest;
use timely::progress::{Antichain, Timestamp as _};
use tracing::debug;

use crate::compute_state::ComputeState;
use crate::render::StartSignal;
use crate::render::sinks::SinkRender;
use crate::sink::ConsolidatingVec;

pub(crate) struct ContinualTaskCtx<G: Scope<Timestamp = Timestamp>> {
    name: Option<String>,
    dataflow_as_of: Option<Antichain<Timestamp>>,
    inputs_with_snapshot: Option<bool>,
    ct_inputs: BTreeSet<GlobalId>,
    ct_outputs: BTreeSet<GlobalId>,
    pub ct_times: Vec<VecCollection<G, (), Diff>>,
}

/// An encapsulation of the transformation logic necessary on data coming into a
/// continual task.
///
/// NB: In continual task jargon, an "input" contains diffs and a "reference" is
/// a normal source/collection.
pub(crate) enum ContinualTaskSourceTransformer {
    /// A collection containing, at each time T, exactly the inserts at time T
    /// in the transformed collection.
    ///
    /// For example:
    /// - Input: {} at 0, {1} at 1, {1} at 2, ...
    /// - Output: {} at 0, {1} at 1, {} at 2, ...
    ///
    /// We'll presumably have the same for deletes eventually, but it's not
    /// exposed in the SQL frontend yet.
    InsertsInput {
        source_id: GlobalId,
        with_snapshot: bool,
    },
    /// A self-reference to the continual task's output. This is essentially a
    /// timely feedback loop via the persist shard. See module rustdoc for how
    /// this works.
    SelfReference { source_id: GlobalId },
    /// A normal collection (no-op transformation).
    NormalReference,
}

impl ContinualTaskSourceTransformer {
    /// The persist_source `SnapshotMode` to use when reading this source.
    pub fn snapshot_mode(&self) -> SnapshotMode {
        use ContinualTaskSourceTransformer::*;
        match self {
            InsertsInput {
                with_snapshot: false,
                ..
            } => SnapshotMode::Exclude,
            InsertsInput {
                with_snapshot: true,
                ..
            }
            | SelfReference { .. }
            | NormalReference => SnapshotMode::Include,
        }
    }

    /// Returns the as_of to use with the suppress_early_progress operator for
    /// this source. See the module rustdoc for context.
    pub fn suppress_early_progress_as_of(
        &self,
        as_of: Antichain<Timestamp>,
    ) -> Antichain<Timestamp> {
        use ContinualTaskSourceTransformer::*;
        match self {
            InsertsInput { .. } => as_of,
            SelfReference { .. } => as_of
                .iter()
                .map(|x| x.step_back().unwrap_or_else(Timestamp::minimum))
                .collect(),
            NormalReference => as_of,
        }
    }

    /// Performs the necessary transformation on the source collection.
    ///
    /// Returns the transformed "oks" and "errs" collections. Also returns the
    /// appropriate `ct_times` collection used to inform the sink which times
    /// were changed in the inputs.
    pub fn transform<S: Scope<Timestamp = Timestamp>>(
        &self,
        oks: VecCollection<S, Row, Diff>,
        errs: VecCollection<S, DataflowError, Diff>,
    ) -> (
        VecCollection<S, Row, Diff>,
        VecCollection<S, DataflowError, Diff>,
        VecCollection<S, (), Diff>,
    ) {
        use ContinualTaskSourceTransformer::*;
        match self {
            // Make a collection s.t., for each time T in the input, the output
            // contains the inserts at T.
            InsertsInput { source_id, .. } => {
                let name = source_id.to_string();
                // Keep only the inserts.
                let oks = oks.inner.filter(|(_, _, diff)| diff.is_positive());
                // Grab the original times for use in the sink operator.
                let (oks, times) = oks.as_collection().times_extract(&name);
                // Then retract everything at the next timestamp.
                let oks = oks.inner.flat_map(|(row, ts, diff)| {
                    let retract_ts = ts.step_forward();
                    let negation = -diff;
                    [(row.clone(), ts, diff), (row, retract_ts, negation)]
                });
                (oks.as_collection(), errs, times)
            }
            NormalReference => {
                let times = VecCollection::empty(&oks.scope());
                (oks, errs, times)
            }
            // When computing a self-referential output at `T`, start by
            // assuming there are no changes from the contents at `T-1`. See the
            // module rustdoc for how this fits into the larger picture.
            SelfReference { source_id } => {
                let name = source_id.to_string();
                let times = VecCollection::empty(&oks.scope());
                // step_forward will panic at runtime if it receives a data or
                // capability with a time that cannot be stepped forward (i.e.
                // because it is already the max). We're safe here because this
                // is stepping `T-1` forward to `T`.
                let oks = oks.step_forward(&name);
                let errs = errs.step_forward(&name);
                (oks, errs, times)
            }
        }
    }
}

impl<G: Scope<Timestamp = Timestamp>> ContinualTaskCtx<G> {
    pub fn new<P, S>(dataflow: &DataflowDescription<P, S, Timestamp>) -> Self {
        let mut name = None;
        let mut ct_inputs = BTreeSet::new();
        let mut ct_outputs = BTreeSet::new();
        let mut inputs_with_snapshot = None;
        for (sink_id, sink) in &dataflow.sink_exports {
            match &sink.connection {
                ComputeSinkConnection::ContinualTask(ContinualTaskConnection {
                    input_id, ..
                }) => {
                    ct_outputs.insert(*sink_id);
                    ct_inputs.insert(*input_id);
                    // There's only one CT sink per dataflow at this point.
                    assert_eq!(name, None);
                    name = Some(sink_id.to_string());
                    assert_eq!(inputs_with_snapshot, None);
                    match (
                        sink.with_snapshot,
                        dataflow.as_of.as_ref(),
                        dataflow.initial_storage_as_of.as_ref(),
                    ) {
                        // User specified no snapshot when creating the CT.
                        (false, _, _) => inputs_with_snapshot = Some(false),
                        // User specified a snapshot but we're past the initial
                        // as_of.
                        (true, Some(as_of), Some(initial_as_of))
                            if PartialOrder::less_than(initial_as_of, as_of) =>
                        {
                            inputs_with_snapshot = Some(false)
                        }
                        // User specified a snapshot and we're either at the
                        // initial creation, or we don't know (builtin CTs). If
                        // we don't know, it's always safe to fall back to
                        // snapshotting, at worst it's wasted work and will get
                        // filtered.
                        (true, _, _) => inputs_with_snapshot = Some(true),
                    }
                }
                _ => continue,
            }
        }
        let mut ret = ContinualTaskCtx {
            name,
            dataflow_as_of: None,
            inputs_with_snapshot,
            ct_inputs,
            ct_outputs,
            ct_times: Vec::new(),
        };
        // Only clone the as_of if we're in a CT dataflow.
        if ret.is_ct_dataflow() {
            ret.dataflow_as_of = dataflow.as_of.clone();
            // Sanity check that we have a name if we're in a CT dataflow.
            assert!(ret.name.is_some());
        }
        ret
    }

    pub fn is_ct_dataflow(&self) -> bool {
        // Inputs are non-empty iff outputs are non-empty.
        assert_eq!(self.ct_inputs.is_empty(), self.ct_outputs.is_empty());
        !self.ct_outputs.is_empty()
    }

    pub fn get_ct_source_transformer(
        &self,
        source_id: GlobalId,
    ) -> Option<ContinualTaskSourceTransformer> {
        let Some(inputs_with_snapshot) = self.inputs_with_snapshot else {
            return None;
        };
        let transformer = match (
            self.ct_inputs.contains(&source_id),
            self.ct_outputs.contains(&source_id),
        ) {
            (false, false) => ContinualTaskSourceTransformer::NormalReference,
            (false, true) => ContinualTaskSourceTransformer::SelfReference { source_id },
            (true, false) => ContinualTaskSourceTransformer::InsertsInput {
                source_id,
                with_snapshot: inputs_with_snapshot,
            },
            (true, true) => panic!("ct output is not allowed to be an input"),
        };
        Some(transformer)
    }

    pub fn input_times(&self, scope: &G) -> Option<VecCollection<G, (), Diff>> {
        // We have a name iff this is a CT dataflow.
        assert_eq!(self.is_ct_dataflow(), self.name.is_some());
        let Some(name) = self.name.as_ref() else {
            return None;
        };
        // Note that self.ct_times might be empty (if the user didn't reference
        // the input), but this still does the correct, though maybe useless,
        // thing: no diffs coming into the input means no times to write at.
        let ct_times = differential_dataflow::collection::concatenate(
            &mut scope.clone(),
            self.ct_times.iter().cloned(),
        );
        // Reduce this down to one update per-time-per-worker before exchanging
        // it, so we don't waste work on unnecessarily high data volumes.
        let ct_times = ct_times.times_reduce(name);
        Some(ct_times)
    }
}

impl<G> SinkRender<G> for ContinualTaskConnection<CollectionMetadata>
where
    G: Scope<Timestamp = Timestamp>,
{
    fn render_sink(
        &self,
        compute_state: &mut ComputeState,
        _sink: &ComputeSinkDesc<CollectionMetadata>,
        sink_id: GlobalId,
        as_of: Antichain<Timestamp>,
        start_signal: StartSignal,
        oks: VecCollection<G, Row, Diff>,
        errs: VecCollection<G, DataflowError, Diff>,
        append_times: Option<VecCollection<G, (), Diff>>,
        flow_control_probe: &probe::Handle<Timestamp>,
    ) -> Option<Rc<dyn Any>> {
        let name = sink_id.to_string();

        let to_append = oks
            .map(|x| SourceData(Ok(x)))
            .concat(errs.map(|x| SourceData(Err(x))));
        let append_times = append_times.expect("should be provided by ContinualTaskCtx");

        let write_handle = {
            let clients = Arc::clone(&compute_state.persist_clients);
            let metadata = self.storage_metadata.clone();
            let handle_purpose = format!("ct_sink({})", name);
            async move {
                let client = clients
                    .open(metadata.persist_location)
                    .await
                    .expect("valid location");
                client
                    .open_writer(
                        metadata.data_shard,
                        metadata.relation_desc.into(),
                        UnitSchema.into(),
                        Diagnostics {
                            shard_name: sink_id.to_string(),
                            handle_purpose,
                        },
                    )
                    .await
                    .expect("codecs should match")
            }
        };

        let collection = compute_state.expect_collection_mut(sink_id);
        let probe = ProbeHandle::default();
        let to_append = to_append
            .probe_with(&probe)
            .inner
            .probe_notify_with(vec![flow_control_probe.clone()])
            .as_collection();
        collection.compute_probe = Some(probe);
        let sink_write_frontier = Rc::new(RefCell::new(Antichain::from_elem(Timestamp::minimum())));
        collection.sink_write_frontier = Some(Rc::clone(&sink_write_frontier));

        // TODO(ct1): Obey `compute_state.read_only_rx`
        //
        // Seemingly, the read-only env needs to tail the output shard and keep
        // historical updates around until it sees that the output frontier
        // advances beyond their times.
        let sink_button = continual_task_sink(
            &name,
            to_append,
            append_times,
            as_of,
            write_handle,
            start_signal,
            sink_write_frontier,
        );
        Some(Rc::new(sink_button.press_on_drop()))
    }
}

fn continual_task_sink<G: Scope<Timestamp = Timestamp>>(
    name: &str,
    to_append: VecCollection<G, SourceData, Diff>,
    append_times: VecCollection<G, (), Diff>,
    as_of: Antichain<Timestamp>,
    write_handle: impl Future<Output = WriteHandle<SourceData, (), Timestamp, StorageDiff>>
    + Send
    + 'static,
    start_signal: StartSignal,
    output_frontier: Rc<RefCell<Antichain<Timestamp>>>,
) -> Button {
    let scope = to_append.scope();
    let mut op = AsyncOperatorBuilder::new(format!("ct_sink({})", name), scope.clone());

    // TODO(ct2): This all works perfectly well data parallel (assuming we
    // broadcast the append_times). We just need to hook it up to the
    // multi-worker persist-sink, but that requires some refactoring. This would
    // also remove the need for this to be an async timely operator.
    let active_worker = name.hashed();
    let to_append_input =
        op.new_input_for_many(to_append.inner, Exchange::new(move |_| active_worker), []);
    let append_times_input = op.new_input_for_many(
        append_times.inner,
        Exchange::new(move |_| active_worker),
        [],
    );

    let active_worker = usize::cast_from(active_worker) % scope.peers() == scope.index();
    let button = op.build(move |_capabilities| async move {
        if !active_worker {
            output_frontier.borrow_mut().clear();
            return;
        }

        // SUBTLE: The start_signal below may not be unblocked by the compute
        // controller until it thinks the inputs are "ready" (i.e. readable at
        // the as_of), but if the CT is self-referential, one of the inputs will
        // be the output (which starts at `T::minimum()`, not the as_of). To
        // break this cycle, before we even get the start signal, go ahead and
        // advance the output's (exclusive) upper to the first time that this CT
        // might write: `as_of+1`. Because we don't want this to happen on
        // restarts, only do it if the upper is `T::minimum()`.
        let mut write_handle = write_handle.await;
        {
            let res = write_handle
                .compare_and_append_batch(
                    &mut [],
                    Antichain::from_elem(Timestamp::minimum()),
                    as_of.clone(),
                    true,
                )
                .await
                .expect("usage was valid");
            match res {
                // We advanced the upper.
                Ok(()) => {}
                // Someone else advanced the upper.
                Err(UpperMismatch { .. }) => {}
            }
        }

        let () = start_signal.await;

        #[derive(Debug)]
        enum OpEvent<C> {
            ToAppend(Event<Timestamp, C, Vec<(SourceData, Timestamp, Diff)>>),
            AppendTimes(Event<Timestamp, C, Vec<((), Timestamp, Diff)>>),
        }

        impl<C: std::fmt::Debug> OpEvent<C> {
            fn apply(self, state: &mut SinkState<SourceData, Timestamp>) {
                debug!("ct_sink event {:?}", self);
                match self {
                    OpEvent::ToAppend(Event::Data(_cap, x)) => {
                        for (k, t, d) in x {
                            state.to_append.push(((k, t), d));
                        }
                    }
                    OpEvent::ToAppend(Event::Progress(x)) => state.to_append_progress = x,
                    OpEvent::AppendTimes(Event::Data(_cap, x)) => state
                        .append_times
                        .extend(x.into_iter().map(|((), t, _d)| t)),
                    OpEvent::AppendTimes(Event::Progress(x)) => state.append_times_progress = x,
                }
            }
        }

        let to_insert_input = to_append_input.map(OpEvent::ToAppend);
        let append_times_input = append_times_input.map(OpEvent::AppendTimes);
        let mut op_inputs = futures::stream::select(to_insert_input, append_times_input);

        let mut state = SinkState::new();
        loop {
            // Loop until we've processed all the work we can.
            loop {
                if PartialOrder::less_than(&*output_frontier.borrow(), &state.output_progress) {
                    output_frontier.borrow_mut().clear();
                    output_frontier
                        .borrow_mut()
                        .extend(state.output_progress.iter().cloned());
                }

                debug!("ct_sink about to process {:?}", state);
                let Some((new_upper, to_append)) = state.process() else {
                    break;
                };
                debug!("ct_sink got write {:?}: {:?}", new_upper, to_append);
                state.output_progress =
                    truncating_compare_and_append(&mut write_handle, to_append, new_upper).await;
            }

            // Then try to generate some more work by reading inputs.
            let Some(event) = op_inputs.next().await else {
                // Inputs exhausted, shutting down.
                output_frontier.borrow_mut().clear();
                return;
            };
            event.apply(&mut state);
            // Also drain any other events that may be ready.
            while let Some(Some(event)) = op_inputs.next().now_or_never() {
                event.apply(&mut state);
            }
        }
    });

    button
}

/// Writes the given data to the shard, truncating it as necessary.
///
/// Returns the latest known upper for the shard.
async fn truncating_compare_and_append(
    write_handle: &mut WriteHandle<SourceData, (), Timestamp, StorageDiff>,
    to_append: Vec<((&SourceData, &()), &Timestamp, StorageDiff)>,
    new_upper: Antichain<Timestamp>,
) -> Antichain<Timestamp> {
    let mut expected_upper = write_handle.shared_upper();
    loop {
        if !PartialOrder::less_than(&expected_upper, &new_upper) {
            debug!("ct_sink skipping {:?}", new_upper.elements());
            return expected_upper;
        }
        let res = write_handle
            .compare_and_append(&to_append, expected_upper.clone(), new_upper.clone())
            .await
            .expect("usage was valid");
        debug!(
            "ct_sink write res {:?}-{:?}: {:?}",
            expected_upper.elements(),
            new_upper.elements(),
            res
        );
        match res {
            Ok(()) => return new_upper,
            Err(err) => {
                expected_upper = err.current;
                continue;
            }
        }
    }
}

#[derive(Debug)]
struct SinkState<D, T> {
    /// The known times at which we're going to write data to the output. This
    /// is guaranteed to include all times < append_times_progress, except that
    /// those < output_progress may have been truncated.
    append_times: BTreeSet<T>,
    append_times_progress: Antichain<T>,

    /// The data we've collected to append to the output. This is often
    /// compacted to advancing times and is expected to be ~empty in the steady
    /// state.
    to_append: ConsolidatingVec<(D, T)>,
    to_append_progress: Antichain<T>,

    /// A lower bound on the upper of the output.
    output_progress: Antichain<T>,
}

impl<D: Ord> SinkState<D, Timestamp> {
    fn new() -> Self {
        SinkState {
            append_times: BTreeSet::new(),
            append_times_progress: Antichain::from_elem(Timestamp::minimum()),
            to_append: ConsolidatingVec::new(128, 0),
            to_append_progress: Antichain::from_elem(Timestamp::minimum()),
            output_progress: Antichain::from_elem(Timestamp::minimum()),
        }
    }

    /// Returns data to write to the output, if any, and the new upper to use.
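    ///
    /// An illustrative sketch (not compiled; the driver shape and the
    /// `write_handle` name here are hypothetical) of how a caller might drain
    /// this state into the output shard:
    ///
    /// ```ignore
    /// while let Some((new_upper, to_append)) = sink_state.process() {
    ///     // Append the (possibly empty) data and learn the resulting upper.
    ///     let upper = append_to_output(&mut write_handle, to_append, new_upper).await;
    ///     sink_state.output_progress = upper;
    /// }
    /// ```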
    fn process(
        &mut self,
    ) -> Option<(
        Antichain<Timestamp>,
        Vec<((&D, &()), &Timestamp, StorageDiff)>,
    )> {
        // We can only append at times >= the output_progress, so pop off
        // anything unnecessary.
        while let Some(x) = self.append_times.first() {
            if self.output_progress.less_equal(x) {
                break;
            }
            self.append_times.pop_first();
        }

        // Find the smallest append_time before append_times_progress. This is
        // the next time we might need to write data at. Note that we can only
        // act on append_times once the progress has passed them, because they
        // could come out of order.
        let write_ts = match self.append_times.first() {
            Some(x) if !self.append_times_progress.less_equal(x) => x,
            Some(_) | None => {
                // The CT sink's contract is that it only writes data at times
                // at which we received an input diff. There are none in
                // `[output_progress, append_times_progress)`, so we can go
                // ahead and advance the upper of the output, if it's not
                // already there.
                //
                // We could instead ensure liveness by basing this off of
                // to_append, but for any CTs reading the output (expected to be
                // a common case) we'd end up looping each timestamp through
                // persist one-by-one.
                if PartialOrder::less_than(&self.output_progress, &self.append_times_progress) {
                    return Some((self.append_times_progress.clone(), Vec::new()));
                }
                // Otherwise, nothing to do!
                return None;
            }
        };

        if self.to_append_progress.less_equal(write_ts) {
            // Don't have all the necessary data yet.
            if self.output_progress.less_than(write_ts) {
                // We can advance the output upper up to the write_ts. For
                // self-referential CTs this might be necessary to ensure
                // dataflow progress.
                return Some((Antichain::from_elem(write_ts.clone()), Vec::new()));
            }
            return None;
        }

        // Time to write some data! Produce the collection as of write_ts by
        // advancing timestamps, consolidating, and filtering out anything at
        // future timestamps.
        let as_of = std::slice::from_ref(write_ts);
        for ((_, t), _) in self.to_append.iter_mut() {
            t.advance_by(AntichainRef::new(as_of))
        }
        // TODO(ct2): Metrics for vec len and cap.
        self.to_append.consolidate();

        let append_data = self
            .to_append
            .iter()
            .filter_map(|((k, t), d)| (t <= write_ts).then_some(((k, &()), t, d.into_inner())))
            .collect();
        Some((Antichain::from_elem(write_ts.step_forward()), append_data))
    }
}

trait StepForward<G: Scope, D, R> {
    /// Translates a collection one timestamp "forward" (i.e. `T` -> `T+1` as
    /// defined by `TimestampManipulation::step_forward`).
    ///
    /// This includes:
    /// - The differential timestamps in each data.
    /// - The capabilities paired with that data.
    /// - (As a consequence of the previous) the output frontier is one step
    ///   ahead of the input frontier.
    ///
    /// The caller is responsible for ensuring that all data and capabilities given
    /// to this operator can be stepped forward without panicking, otherwise the
    /// operator will panic at runtime.
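    ///
    /// As a sketch (illustrative data only), stepping a collection forward
    /// turns updates like these:
    ///
    /// ```ignore
    /// // input:  [("a", Timestamp::new(0), 1), ("b", Timestamp::new(1), 1)]
    /// // output: [("a", Timestamp::new(1), 1), ("b", Timestamp::new(2), 1)]
    /// let stepped = collection.step_forward("example");
    /// ```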
    fn step_forward(self, name: &str) -> VecCollection<G, D, R>;
}

impl<G, D, R> StepForward<G, D, R> for VecCollection<G, D, R>
where
    G: Scope<Timestamp = Timestamp>,
    D: Clone + 'static,
    R: Semigroup + 'static,
{
    fn step_forward(self, name: &str) -> VecCollection<G, D, R> {
        let name = format!("ct_step_forward({})", name);
        let mut builder = OperatorBuilder::new(name, self.scope());
        let (output, output_stream) = builder.new_output();
        let mut output = OutputBuilder::from(output);

        // We step forward (by one) each data timestamp and capability. As a
        // result the output's frontier is guaranteed to be one past the input
        // frontier, so make this promise to timely.
        let step_forward_summary = Timestamp::from(1);
        let mut input = builder.new_input_connection(
            self.inner,
            Pipeline,
            [(0, Antichain::from_elem(step_forward_summary))],
        );
        builder.set_notify_for(0, FrontierInterest::Never);
        builder.build(move |_caps| {
            move |_frontiers| {
                let mut output = output.activate();
                input.for_each(|cap, data| {
                    for (_, ts, _) in data.iter_mut() {
                        *ts = ts.step_forward();
                    }
                    let cap = cap.delayed(&cap.time().step_forward(), 0);
                    output.session(&cap).give_container(data);
                });
            }
        });

        output_stream.as_collection()
    }
}

trait TimesExtract<G: Scope, D, R> {
    /// Returns a collection of the times at which the input collection changed.
    ///
    /// This works by mapping the data piece of the differential tuple to `()`.
    /// It is essentially the same as the following, but without cloning
    /// everything in the input.
    ///
    /// ```ignore
    /// input.map(|(_data, ts, diff)| ((), ts, diff))
    /// ```
    ///
    /// The output may be partially consolidated, but no consolidation
    /// guarantees are made.
    fn times_extract(self, name: &str) -> (VecCollection<G, D, R>, VecCollection<G, (), R>);
}

impl<G, D, R> TimesExtract<G, D, R> for VecCollection<G, D, R>
where
    G: Scope<Timestamp = Timestamp>,
    D: Clone + 'static,
    R: Semigroup + 'static + std::fmt::Debug,
{
    fn times_extract(self, name: &str) -> (VecCollection<G, D, R>, VecCollection<G, (), R>) {
        let name = format!("ct_times_extract({})", name);
        let mut builder = OperatorBuilder::new(name, self.scope());
        let (passthrough, passthrough_stream) = builder.new_output();
        let mut passthrough = OutputBuilder::from(passthrough);
        let (times, times_stream) = builder.new_output();
        let mut times = OutputBuilder::<_, ConsolidatingContainerBuilder<_>>::from(times);
        let mut input = builder.new_input(self.inner, Pipeline);
        builder.set_notify_for(0, FrontierInterest::Never);
        builder.build(|_caps| {
            move |_frontiers| {
                let mut passthrough = passthrough.activate();
                let mut times = times.activate();
                input.for_each_time(|time, data| {
                    let mut times_session = times.session_with_builder(&time);
                    let mut passthrough_session = passthrough.session(&time);
                    for data in data {
                        let times_iter =
                            data.iter().map(|(_data, ts, diff)| ((), *ts, diff.clone()));
                        times_session.give_iterator(times_iter);
                        passthrough_session.give_container(data);
                    }
                });
            }
        });
        (
            passthrough_stream.as_collection(),
            times_stream.as_collection(),
        )
    }
}

trait TimesReduce<G: Scope, R> {
    /// This is essentially a specialized impl of `consolidate`, with a
    /// `HashMap` instead of the `Trace`.
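    ///
    /// For example (illustrative data only), diffs are summed per timestamp
    /// and each time is emitted once the input frontier passes it:
    ///
    /// ```ignore
    /// // input:  [((), Timestamp::new(2), 1), ((), Timestamp::new(1), 1), ((), Timestamp::new(2), 1)]
    /// // output: [((), Timestamp::new(1), 1), ((), Timestamp::new(2), 2)]
    /// ```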
    fn times_reduce(self, name: &str) -> VecCollection<G, (), R>;
}

impl<G, R> TimesReduce<G, R> for VecCollection<G, (), R>
where
    G: Scope<Timestamp = Timestamp>,
    R: Semigroup + 'static + std::fmt::Debug,
{
    fn times_reduce(self, name: &str) -> VecCollection<G, (), R> {
        let name = format!("ct_times_reduce({})", name);
        self.inner
            .unary_frontier(Pipeline, &name, |_caps, _info| {
                let mut notificator = FrontierNotificator::default();
                let mut stash = HashMap::<_, R>::new();
                move |(input, frontier), output| {
                    input.for_each(|cap, data| {
                        for ((), ts, diff) in data.drain(..) {
                            notificator.notify_at(cap.delayed(&ts, 0));
                            if let Some(sum) = stash.get_mut(&ts) {
                                sum.plus_equals(&diff);
                            } else {
                                stash.insert(ts, diff);
                            }
                        }
                    });
                    notificator.for_each(&[frontier], |cap, _not| {
                        if let Some(diff) = stash.remove(cap.time()) {
                            output.session(&cap).give(((), cap.time().clone(), diff));
                        }
                    });
                }
            })
            .as_collection()
    }
}

#[cfg(test)]
mod tests {
    use differential_dataflow::AsCollection;
    use mz_repr::Timestamp;
    use timely::Config;
    use timely::dataflow::ProbeHandle;
    use timely::dataflow::operators::capture::Extract;
    use timely::dataflow::operators::{Capture, Input, ToStream};
    use timely::progress::Antichain;

    use super::*;

    #[mz_ore::test]
    fn step_forward() {
        timely::execute(Config::thread(), |worker| {
            let (mut input, probe, output) = worker.dataflow(|scope| {
                let (handle, input) = scope.new_input();
                let probe = ProbeHandle::<Timestamp>::new();
                let output = input
                    .as_collection()
                    .step_forward("test")
                    .probe_with(&probe)
                    .inner
                    .capture();
                (handle, probe, output)
            });

            let mut expected = Vec::new();
            for i in 0u64..10 {
                let in_ts = Timestamp::new(i);
                let out_ts = in_ts.step_forward();
                input.send((i, in_ts, 1));
                input.advance_to(in_ts.step_forward());

                // We should get the data out advanced by `step_forward` and
                // also, crucially, the output frontier should do the same (i.e.
                // this is why we can't simply use `VecCollection::delay`).
                worker.step_while(|| probe.less_than(&out_ts.step_forward()));
                expected.push((i, out_ts, 1));
            }
            // Closing the input should allow the output to advance and the
            // dataflow to shut down.
            input.close();
            while worker.step() {}

            let actual = output
                .extract()
                .into_iter()
                .flat_map(|x| x.1)
                .collect::<Vec<_>>();
            assert_eq!(actual, expected);
        })
        .unwrap();
    }

    #[mz_ore::test]
    fn times_extract() {
        struct PanicOnClone;

        impl Clone for PanicOnClone {
            fn clone(&self) -> Self {
                panic!("boom")
            }
        }

        let output = timely::execute_directly(|worker| {
            worker.dataflow(|scope| {
                let input = [
                    (PanicOnClone, Timestamp::new(0), 0),
                    (PanicOnClone, Timestamp::new(1), 1),
                    (PanicOnClone, Timestamp::new(1), 1),
                    (PanicOnClone, Timestamp::new(2), -2),
                    (PanicOnClone, Timestamp::new(2), 1),
                ]
                .to_stream(scope)
                .as_collection();
                let (_passthrough, times) = input.times_extract("test");
                times.inner.capture()
            })
        });
        let expected = vec![((), Timestamp::new(1), 2), ((), Timestamp::new(2), -1)];
        let actual = output
            .extract()
            .into_iter()
            .flat_map(|x| x.1)
            .collect::<Vec<_>>();
        assert_eq!(actual, expected);
    }

    #[mz_ore::test]
    fn times_reduce() {
        let output = timely::execute_directly(|worker| {
            worker.dataflow(|scope| {
                let input = [
                    ((), Timestamp::new(3), 1),
                    ((), Timestamp::new(2), 1),
                    ((), Timestamp::new(1), 1),
                    ((), Timestamp::new(2), 1),
                    ((), Timestamp::new(3), 1),
                    ((), Timestamp::new(3), 1),
                ]
                .to_stream(scope)
                .as_collection();
                input.times_reduce("test").inner.capture()
            })
        });
        let expected = vec![
            ((), Timestamp::new(1), 1),
            ((), Timestamp::new(2), 2),
            ((), Timestamp::new(3), 3),
        ];
        let actual = output
            .extract()
            .into_iter()
            .flat_map(|x| x.1)
            .collect::<Vec<_>>();
        assert_eq!(actual, expected);
    }

    #[mz_ore::test]
    fn ct_sink_state() {
        #[track_caller]
        fn assert_noop(state: &mut super::SinkState<&'static str, Timestamp>) {
            if let Some(ret) = state.process() {
                panic!("should be nothing to write: {:?}", ret);
            }
        }

        #[track_caller]
        fn assert_write(
            state: &mut super::SinkState<&'static str, Timestamp>,
            expected_upper: u64,
            expected_append: &[&str],
        ) {
            let (new_upper, to_append) = state.process().expect("should be something to write");
            assert_eq!(
                new_upper,
                Antichain::from_elem(Timestamp::new(expected_upper))
            );
            let to_append = to_append
                .into_iter()
                .map(|((k, ()), _ts, _diff)| *k)
                .collect::<Vec<_>>();
            assert_eq!(to_append, expected_append);
        }

        let mut s = super::SinkState::new();

        // Nothing to do at the initial state.
        assert_noop(&mut s);

        // Getting data to append is not enough to do anything yet.
        s.to_append.push((("a", 1.into()), Diff::ONE));
        s.to_append.push((("b", 1.into()), Diff::ONE));
        assert_noop(&mut s);

        // Knowing that this is the only data we'll get for that timestamp is
        // still not enough.
        s.to_append_progress = Antichain::from_elem(2.into());
        assert_noop(&mut s);

        // Even knowing that we got input at that time is not quite enough yet
        // (we could be getting these out of order).
        s.append_times.insert(1.into());
        assert_noop(&mut s);

        // Indeed, it did come out of order. Also note that this checks the ==
        // case for time vs progress.
        s.append_times.insert(0.into());
        assert_noop(&mut s);

        // Okay, now we know that we've seen all the times we got input up to 3.
        // This is enough to allow the empty write of `[0,1)`.
        s.append_times_progress = Antichain::from_elem(3.into());
        assert_write(&mut s, 1, &[]);

        // That succeeded, now we can write the data at 1.
        s.output_progress = Antichain::from_elem(1.into());
        assert_write(&mut s, 2, &["a", "b"]);

        // That succeeded, now we know about some empty time.
        s.output_progress = Antichain::from_elem(2.into());
        assert_write(&mut s, 3, &[]);

        // That succeeded, now nothing to do.
        s.output_progress = Antichain::from_elem(3.into());
        assert_noop(&mut s);

        // Find out about a new time to write at. Even without the data, we can
        // do an empty write up to that time.
        s.append_times.insert(5.into());
        s.append_times_progress = Antichain::from_elem(6.into());
        assert_write(&mut s, 5, &[]);

        // That succeeded, now nothing to do again.
        s.output_progress = Antichain::from_elem(5.into());

        // Retract one of the things currently in the collection and add a new
        // thing, to verify the consolidate.
        s.to_append.push((("a", 5.into()), Diff::MINUS_ONE));
        s.to_append.push((("c", 5.into()), Diff::ONE));
        s.to_append_progress = Antichain::from_elem(6.into());
        assert_write(&mut s, 6, &["b", "c"]);
    }
}