// mz_storage/source/source_reader_pipeline.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
//! Types related to the creation of dataflow raw sources.
//!
//! Raw sources are differential dataflow collections of data directly produced by the
//! upstream service. The main export of this module is [`create_raw_source`],
//! which turns [`RawSourceCreationConfig`]s into the aforementioned streams.
//!
//! The full source, which is the _differential_ stream that represents the actual object
//! created by a `CREATE SOURCE` statement, is created by composing
//! [`create_raw_source`] with
//! decoding, `SourceEnvelope` rendering, and more.
//!
22// https://github.com/tokio-rs/prost/issues/237
23#![allow(missing_docs)]
24#![allow(clippy::needless_borrow)]
25
26use std::cell::RefCell;
27use std::collections::{BTreeMap, VecDeque};
28use std::hash::{Hash, Hasher};
29use std::rc::Rc;
30use std::sync::Arc;
31use std::time::Duration;
32
33use differential_dataflow::lattice::Lattice;
34use differential_dataflow::{AsCollection, Hashable, VecCollection};
35use futures::stream::StreamExt;
36use mz_ore::cast::CastFrom;
37use mz_ore::collections::CollectionExt;
38use mz_ore::now::NowFn;
39use mz_persist_client::cache::PersistClientCache;
40use mz_repr::{Diff, GlobalId, RelationDesc, Row};
41use mz_storage_types::configuration::StorageConfiguration;
42use mz_storage_types::controller::CollectionMetadata;
43use mz_storage_types::errors::DataflowError;
44use mz_storage_types::sources::{SourceConnection, SourceExport, SourceTimestamp};
45use mz_timely_util::antichain::AntichainExt;
46use mz_timely_util::builder_async::{OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton};
47use mz_timely_util::capture::PusherCapture;
48use mz_timely_util::operator::ConcatenateFlatten;
49use mz_timely_util::reclock::reclock;
50use timely::PartialOrder;
51use timely::container::CapacityContainerBuilder;
52use timely::dataflow::channels::pact::Pipeline;
53use timely::dataflow::operators::capture::capture::Capture;
54use timely::dataflow::operators::core::Map as _;
55use timely::dataflow::operators::generic::OutputBuilder;
56use timely::dataflow::operators::generic::builder_rc::OperatorBuilder as OperatorBuilderRc;
57use timely::dataflow::operators::vec::Broadcast;
58use timely::dataflow::operators::{CapabilitySet, Inspect, Leave};
59use timely::dataflow::{Scope, StreamVec};
60use timely::order::TotalOrder;
61use timely::progress::frontier::MutableAntichain;
62use timely::progress::{Antichain, Timestamp};
63use tokio::sync::{Semaphore, watch};
64use tokio_stream::wrappers::WatchStream;
65use tracing::trace;
66
67use crate::healthcheck::{HealthStatusMessage, HealthStatusUpdate};
68use crate::metrics::StorageMetrics;
69use crate::metrics::source::SourceMetrics;
70use crate::source::reclock::ReclockOperator;
71use crate::source::types::{Probe, SourceMessage, SourceOutput, SourceRender, StackedCollection};
72use crate::statistics::SourceStatistics;
73
/// Shared configuration information for all source types. This is used in the
/// `create_raw_source` functions, which produce raw sources.
#[derive(Clone)]
pub struct RawSourceCreationConfig {
    /// The name to attach to the underlying timely operator.
    pub name: String,
    /// The ID of this instantiation of this source.
    pub id: GlobalId,
    /// The details of the outputs from this ingestion.
    pub source_exports: BTreeMap<GlobalId, SourceExport<CollectionMetadata>>,
    /// The ID of the worker on which this operator is executing
    pub worker_id: usize,
    /// The total count of workers
    pub worker_count: usize,
    /// Granularity with which timestamps should be closed (and capabilities
    /// downgraded).
    pub timestamp_interval: Duration,
    /// The function to return a now time.
    pub now_fn: NowFn,
    /// The metrics & registry that each source instantiates.
    pub metrics: StorageMetrics,
    /// The upper frontier this source should resume ingestion at
    pub as_of: Antichain<mz_repr::Timestamp>,
    /// For each source export, the upper frontier this source should resume ingestion at in the
    /// system time domain.
    pub resume_uppers: BTreeMap<GlobalId, Antichain<mz_repr::Timestamp>>,
    /// For each source export, the upper frontier this source should resume ingestion at in the
    /// source time domain.
    ///
    /// Since every source has a different timestamp type we carry the timestamps of this frontier
    /// in an encoded `Vec<Row>` form which will get decoded once we reach the connection
    /// specialized functions.
    pub source_resume_uppers: BTreeMap<GlobalId, Vec<Row>>,
    /// A handle to the persist client cache
    pub persist_clients: Arc<PersistClientCache>,
    /// Collection of `SourceStatistics` for source and exports to share updates.
    pub statistics: BTreeMap<GlobalId, SourceStatistics>,
    /// Enables reporting the remap operator's write frontier.
    pub shared_remap_upper: Rc<RefCell<Antichain<mz_repr::Timestamp>>>,
    /// Configuration parameters, possibly from LaunchDarkly
    pub config: StorageConfiguration,
    /// The ID of this source remap/progress collection.
    pub remap_collection_id: GlobalId,
    /// The storage metadata for the remap/progress collection
    pub remap_metadata: CollectionMetadata,
    /// A semaphore that should be acquired by async operators in order to signal that upstream
    /// operators should slow down.
    pub busy_signal: Arc<Semaphore>,
}
123
/// Reduced version of [`RawSourceCreationConfig`] that is used when rendering
/// each export.
#[derive(Clone)]
pub struct SourceExportCreationConfig {
    /// The ID of this instantiation of this source.
    pub id: GlobalId,
    /// The ID of the worker on which this operator is executing
    pub worker_id: usize,
    /// The metrics & registry that each source instantiates.
    pub metrics: StorageMetrics,
    /// Place to share statistics updates with storage state. Unlike the
    /// per-ingestion map in [`RawSourceCreationConfig`], this is the single
    /// entry for this one export.
    pub source_statistics: SourceStatistics,
}
137
138impl RawSourceCreationConfig {
139    /// Returns the worker id responsible for handling the given partition.
140    pub fn responsible_worker<P: Hash>(&self, partition: P) -> usize {
141        let mut h = std::hash::DefaultHasher::default();
142        (self.id, partition).hash(&mut h);
143        let key = usize::cast_from(h.finish());
144        key % self.worker_count
145    }
146
147    /// Returns true if this worker is responsible for handling the given partition.
148    pub fn responsible_for<P: Hash>(&self, partition: P) -> bool {
149        self.responsible_worker(partition) == self.worker_id
150    }
151}
152
/// Creates a source dataflow operator graph from a source connection. The type of SourceConnection
/// determines the type of connection that _should_ be created.
///
/// This is also the place where _reclocking_
/// (<https://github.com/MaterializeInc/materialize/blob/main/doc/developer/design/20210714_reclocking.md>)
/// happens.
///
/// See the [`source` module docs](crate::source) for more details about how raw
/// sources are used.
///
/// The `committed_upper` parameter will contain frontier updates whenever times are durably
/// recorded which allows the ingestion to release upstream resources.
///
/// Returns one reclocked (system-time) collection per source export, a health status stream in
/// the root scope, and the tokens that keep the rendered operators alive.
pub fn create_raw_source<'scope, 'root, C>(
    scope: Scope<'scope, mz_repr::Timestamp>,
    root_scope: Scope<'root, ()>,
    storage_state: &crate::storage_state::StorageState,
    committed_upper: StreamVec<'scope, mz_repr::Timestamp, ()>,
    config: &RawSourceCreationConfig,
    source_connection: C,
    start_signal: impl std::future::Future<Output = ()> + 'static,
) -> (
    BTreeMap<
        GlobalId,
        VecCollection<
            'scope,
            mz_repr::Timestamp,
            Result<SourceOutput<C::Time>, DataflowError>,
            Diff,
        >,
    >,
    StreamVec<'root, (), HealthStatusMessage>,
    Vec<PressOnDropButton>,
)
where
    C: SourceConnection + SourceRender + Clone + 'static,
{
    let worker_id = config.worker_id;
    let id = config.id;

    // Tokens that, when dropped, shut down the operators rendered below.
    let mut tokens = vec![];

    // Channel over which the source-side operators report probed upstream frontiers to the
    // remap operator, which mints new bindings based on them.
    let (probed_upper_tx, probed_upper_rx) = watch::channel(None);

    let source_metrics = Arc::new(config.metrics.get_source_metrics(id, worker_id));

    let timestamp_desc = source_connection.timestamp_desc();

    let (remap_collection, remap_token) = remap_operator(
        scope,
        storage_state,
        config.clone(),
        probed_upper_rx,
        timestamp_desc,
    );
    // Need to broadcast the remap changes to all workers.
    let remap_collection = remap_collection.inner.broadcast().as_collection();
    tokens.push(remap_token);

    // Translate the committed system-time upper into a source-time frontier, which is fed back
    // to the source implementation (as its resume-uppers stream) so it can release upstream
    // resources.
    let committed_upper = reclock_committed_upper(
        remap_collection.clone(),
        config.as_of.clone(),
        committed_upper,
        id,
        Arc::clone(&source_metrics),
    );

    let mut reclocked_exports = BTreeMap::new();

    // Mutable borrow so the closure below can fill the map while this function retains
    // ownership of it for the return value.
    let reclocked_exports2 = &mut reclocked_exports;
    // The raw source operators run in a nested scope timestamped with the source-specific time
    // type `C::Time`; each export is then captured into a reclock pusher that produces the
    // corresponding system-time collection in the outer scope.
    let (health, source_tokens) = root_scope.scoped("SourceTimeDomain", move |scope| {
        let (exports, health_stream, source_tokens) = source_render_operator(
            scope,
            config,
            source_connection,
            probed_upper_tx,
            committed_upper,
            start_signal,
        );

        for (id, export) in exports {
            let (reclock_pusher, reclocked) =
                reclock(remap_collection.clone(), config.as_of.clone());
            export
                .inner
                // Repackage each message (or error) into a `SourceOutput` keyed by its
                // source-time timestamp before handing it to the reclock operator.
                .map(move |(result, from_time, diff)| {
                    let result = match result {
                        Ok(msg) => Ok(SourceOutput {
                            key: msg.key.clone(),
                            value: msg.value.clone(),
                            metadata: msg.metadata.clone(),
                            from_time: from_time.clone(),
                        }),
                        Err(err) => Err(err.clone()),
                    };
                    (result, from_time.clone(), *diff)
                })
                .capture_into(PusherCapture(reclock_pusher));
            reclocked_exports2.insert(id, reclocked);
        }

        (health_stream.leave(root_scope), source_tokens)
    });

    tokens.extend(source_tokens);

    (reclocked_exports, health, tokens)
}
260
261/// Renders the source dataflow fragment from the given [SourceConnection]. This returns a
262/// collection timestamped with the source specific timestamp type.
263fn source_render_operator<'scope, C>(
264    scope: Scope<'scope, C::Time>,
265    config: &RawSourceCreationConfig,
266    source_connection: C,
267    probed_upper_tx: watch::Sender<Option<Probe<C::Time>>>,
268    resume_uppers: impl futures::Stream<Item = Antichain<C::Time>> + 'static,
269    start_signal: impl std::future::Future<Output = ()> + 'static,
270) -> (
271    BTreeMap<GlobalId, StackedCollection<'scope, C::Time, Result<SourceMessage, DataflowError>>>,
272    StreamVec<'scope, C::Time, HealthStatusMessage>,
273    Vec<PressOnDropButton>,
274)
275where
276    C: SourceRender + 'static,
277{
278    let source_id = config.id;
279    let worker_id = config.worker_id;
280
281    let resume_uppers = resume_uppers.inspect(move |upper| {
282        let upper = upper.pretty();
283        trace!(%upper, "timely-{worker_id} source({source_id}) received resume upper");
284    });
285
286    let (exports, health, probe_stream, tokens) =
287        source_connection.render(scope, config, resume_uppers, start_signal);
288
289    let mut export_collections = BTreeMap::new();
290
291    let source_metrics = config.metrics.get_source_metrics(config.id, worker_id);
292
293    // Compute the overall resume upper to report for the ingestion
294    let resume_upper = Antichain::from_iter(
295        config
296            .resume_uppers
297            .values()
298            .flat_map(|f| f.iter().cloned()),
299    );
300    source_metrics
301        .resume_upper
302        .set(mz_persist_client::metrics::encode_ts_metric(&resume_upper));
303
304    let mut health_streams = vec![];
305
306    for (id, export) in exports {
307        let name = format!("SourceGenericStats({})", id);
308        let mut builder = OperatorBuilderRc::new(name, scope.clone());
309
310        let (health_output, derived_health) = builder.new_output();
311        let mut health_output =
312            OutputBuilder::<_, CapacityContainerBuilder<_>>::from(health_output);
313        health_streams.push(derived_health);
314
315        let (output, new_export) = builder.new_output();
316        let mut output = OutputBuilder::<_, CapacityContainerBuilder<_>>::from(output);
317
318        let mut input = builder.new_input(export.inner, Pipeline);
319        export_collections.insert(id, new_export.as_collection());
320
321        let bytes_read_counter = config.metrics.source_defs.bytes_read.clone();
322        let source_statistics = config
323            .statistics
324            .get(&id)
325            .expect("statistics initialized")
326            .clone();
327
328        builder.build(move |mut caps| {
329            let mut health_cap = Some(caps.remove(0));
330
331            move |frontiers| {
332                let mut last_status = None;
333                let mut health_output = health_output.activate();
334
335                if frontiers[0].is_empty() {
336                    health_cap = None;
337                    return;
338                }
339                let health_cap = health_cap.as_mut().unwrap();
340
341                input.for_each(|cap, data| {
342                    for (message, _, _) in data.iter() {
343                        match message {
344                            Ok(message) => {
345                                source_statistics.inc_messages_received_by(1);
346                                let key_len = u64::cast_from(message.key.byte_len());
347                                let value_len = u64::cast_from(message.value.byte_len());
348                                bytes_read_counter.inc_by(key_len + value_len);
349                                source_statistics.inc_bytes_received_by(key_len + value_len);
350                            }
351                            Err(error) => {
352                                // All errors coming into the data stream are definite.
353                                // Downstream consumers of this data will preserve this
354                                // status.
355                                let update = HealthStatusUpdate::stalled(
356                                    error.to_string(),
357                                    Some(
358                                        "retracting the errored value may resume the source"
359                                            .to_string(),
360                                    ),
361                                );
362                                let status = HealthStatusMessage {
363                                    id: Some(id),
364                                    namespace: C::STATUS_NAMESPACE.clone(),
365                                    update,
366                                };
367                                if last_status.as_ref() != Some(&status) {
368                                    last_status = Some(status.clone());
369                                    health_output.session(&health_cap).give(status);
370                                }
371                            }
372                        }
373                    }
374                    let mut output = output.activate();
375                    output.session(&cap).give_container(data);
376                });
377            }
378        });
379    }
380
381    // Broadcasting does more work than necessary, which would be to exchange the probes to the
382    // worker that will be the one minting the bindings but we'd have to thread this information
383    // through and couple the two functions enough that it's not worth the optimization (I think).
384    probe_stream.broadcast().inspect(move |probe| {
385        // We don't care if the receiver is gone
386        let _ = probed_upper_tx.send(Some(probe.clone()));
387    });
388
389    (
390        export_collections,
391        health.concatenate_flatten::<_, CapacityContainerBuilder<_>>(health_streams),
392        tokens,
393    )
394}
395
/// Mints new contents for the remap shard based on summaries about the source
/// upper it receives from the raw reader operators.
///
/// Only one worker will be active and write to the remap shard. All source
/// upper summaries will be exchanged to it.
///
/// Returns the collection of remap bindings (broadcast to other workers by the caller) and a
/// button that shuts the operator down when dropped.
fn remap_operator<'scope, FromTime>(
    scope: Scope<'scope, mz_repr::Timestamp>,
    storage_state: &crate::storage_state::StorageState,
    config: RawSourceCreationConfig,
    mut probed_upper: watch::Receiver<Option<Probe<FromTime>>>,
    remap_relation_desc: RelationDesc,
) -> (
    VecCollection<'scope, mz_repr::Timestamp, FromTime, Diff>,
    PressOnDropButton,
)
where
    FromTime: SourceTimestamp,
{
    let RawSourceCreationConfig {
        name,
        id,
        source_exports: _,
        worker_id,
        worker_count,
        timestamp_interval: _,
        remap_metadata,
        as_of,
        resume_uppers: _,
        source_resume_uppers: _,
        metrics: _,
        now_fn,
        persist_clients,
        statistics: _,
        shared_remap_upper,
        config: _,
        remap_collection_id,
        busy_signal: _,
    } = config;

    let read_only_rx = storage_state.read_only_rx.clone();
    let error_handler = storage_state.error_handler("remap_operator", id);

    // Deterministically elect a single writer worker by hashing the source id, so every worker
    // agrees on who is active without coordination.
    let chosen_worker = usize::cast_from(id.hashed() % u64::cast_from(worker_count));
    let active_worker = chosen_worker == worker_id;

    let operator_name = format!("remap({})", id);
    let mut remap_op = AsyncOperatorBuilder::new(operator_name, scope.clone());
    let (remap_output, remap_stream) = remap_op.new_output::<CapacityContainerBuilder<_>>();

    let button = remap_op.build(move |capabilities| async move {
        if !active_worker {
            // This worker is not writing, so make sure it's "taken out" of the
            // calculation by advancing to the empty frontier.
            shared_remap_upper.borrow_mut().clear();
            return;
        }

        let mut cap_set = CapabilitySet::from_elem(capabilities.into_element());

        let remap_handle = crate::source::reclock::compat::PersistHandle::<FromTime, _>::new(
            Arc::clone(&persist_clients),
            read_only_rx,
            remap_metadata.clone(),
            as_of.clone(),
            shared_remap_upper,
            id,
            "remap",
            worker_id,
            worker_count,
            remap_relation_desc,
            remap_collection_id,
        )
        .await;

        let remap_handle = match remap_handle {
            Ok(handle) => handle,
            Err(e) => {
                // `report_and_stop` does not return; the operator ends here on handle errors.
                error_handler
                    .report_and_stop(
                        e.context(format!("Failed to create remap handle for source {name}")),
                    )
                    .await
            }
        };

        let (mut timestamper, mut initial_batch) = ReclockOperator::new(remap_handle).await;

        // Emit initial snapshot of the remap_shard, bootstrapping
        // downstream reclock operators.
        trace!(
            "timely-{worker_id} remap({id}) emitting remap snapshot: trace_updates={:?}",
            &initial_batch.updates
        );

        let cap = cap_set.delayed(cap_set.first().unwrap());
        remap_output.give_container(&cap, &mut initial_batch.updates);
        drop(cap);
        cap_set.downgrade(initial_batch.upper);

        // The probe timestamp of the most recently minted binding; used to only react to
        // strictly newer probes.
        let mut prev_probe_ts: Option<mz_repr::Timestamp> = None;

        // Mint bindings until the capability set is empty, i.e. until the remap shard's upper
        // advances to the empty frontier.
        while !cap_set.is_empty() {
            // We only mint bindings after a successful probe.
            let new_probe = probed_upper
                .wait_for(|new_probe| match (prev_probe_ts, new_probe) {
                    (None, Some(_)) => true,
                    (Some(prev_ts), Some(new)) => prev_ts < new.probe_ts,
                    _ => false,
                })
                .await
                .map(|probe| (*probe).clone())
                // The sender side is gone, i.e. the source operators have shut down. Synthesize
                // a probe with an empty upstream frontier so minting can wind down.
                .unwrap_or_else(|_| {
                    Some(Probe {
                        probe_ts: now_fn().into(),
                        upstream_frontier: Antichain::new(),
                    })
                });

            let probe = new_probe.expect("known to be Some");
            prev_probe_ts = Some(probe.probe_ts);

            let binding_ts = probe.probe_ts;
            let cur_source_upper = probe.upstream_frontier;

            let new_into_upper = Antichain::from_elem(binding_ts.step_forward());

            let mut remap_trace_batch = timestamper
                .mint(binding_ts, new_into_upper, cur_source_upper.borrow())
                .await;

            trace!(
                "timely-{worker_id} remap({id}) minted new bindings: \
                updates={:?} \
                source_upper={} \
                trace_upper={}",
                &remap_trace_batch.updates,
                cur_source_upper.pretty(),
                remap_trace_batch.upper.pretty()
            );

            let cap = cap_set.delayed(cap_set.first().unwrap());
            remap_output.give_container(&cap, &mut remap_trace_batch.updates);
            cap_set.downgrade(remap_trace_batch.upper);
        }
    });

    (remap_stream.as_collection(), button.press_on_drop())
}
544
/// Reclocks an `IntoTime` frontier stream into a `FromTime` frontier stream. This is used for the
/// virtual (through persist) feedback edge so that we convert the `IntoTime` resumption frontier
/// into the `FromTime` frontier that is used with the source's `OffsetCommitter`.
fn reclock_committed_upper<'scope, T, FromTime>(
    bindings: VecCollection<'scope, T, FromTime, Diff>,
    as_of: Antichain<T>,
    committed_upper: StreamVec<'scope, T, ()>,
    id: GlobalId,
    metrics: Arc<SourceMetrics>,
) -> impl futures::stream::Stream<Item = Antichain<FromTime>> + 'static
where
    T: Timestamp + Lattice + TotalOrder,
    FromTime: SourceTimestamp,
{
    // The computed FromTime frontier is published through this watch channel so it can be
    // consumed as an async stream outside the dataflow.
    let (tx, rx) = watch::channel(Antichain::from_elem(FromTime::minimum()));
    let scope = bindings.scope().clone();

    let name = format!("ReclockCommitUpper({id})");
    let mut builder = OperatorBuilderRc::new(name, scope);

    let mut bindings = builder.new_input(bindings.inner.clone(), Pipeline);
    // The committed upper input carries no data; only its frontier is inspected below.
    let _ = builder.new_input(committed_upper.clone(), Pipeline);

    builder.build(move |_| {
        // Remap bindings beyond the upper
        use timely::progress::ChangeBatch;
        let mut accepted_times: ChangeBatch<(T, FromTime)> = ChangeBatch::new();
        // The upper frontier of the bindings
        let mut upper = Antichain::from_elem(Timestamp::minimum());
        // Remap bindings not beyond upper
        let mut ready_times = VecDeque::new();
        // Accumulates the FromTime updates of drained ready bindings; its frontier is the
        // reclocked committed upper.
        let mut source_upper = MutableAntichain::new();

        move |frontiers| {
            // Accept new bindings
            bindings.for_each(|_, data| {
                accepted_times.extend(data.drain(..).map(|(from, mut into, diff)| {
                    // Times before the as_of accumulate at the as_of.
                    into.advance_by(as_of.borrow());
                    ((into, from), diff.into_inner())
                }));
            });
            // Extract ready bindings
            let new_upper = frontiers[0].frontier();
            if PartialOrder::less_than(&upper.borrow(), &new_upper) {
                upper = new_upper.to_owned();
                // Drain consolidated accepted times not greater or equal to `upper` into `ready_times`.
                // Retain accepted times greater or equal to `upper` in `accepted_times`.
                let mut pending_times = std::mem::take(&mut accepted_times).into_inner();
                // These should already be sorted, as part of `.into_inner()`, but sort defensively in case.
                pending_times.sort_unstable_by(|a, b| a.0.cmp(&b.0));
                for ((into, from), diff) in pending_times.drain(..) {
                    if !upper.less_equal(&into) {
                        ready_times.push_back((from, into, diff));
                    } else {
                        accepted_times.update((into, from), diff);
                    }
                }
            }

            // The received times only accumulate correctly for times beyond the as_of.
            if as_of.iter().all(|t| !upper.less_equal(t)) {
                let committed_upper = frontiers[1].frontier();
                if as_of.iter().all(|t| !committed_upper.less_equal(t)) {
                    // We have committed this source up until `committed_upper`. Because we have
                    // required that IntoTime is a total order this will be either a singleton set
                    // or the empty set.
                    //
                    // * Case 1: committed_upper is the empty set {}
                    //
                    // There won't be any future IntoTime timestamps that we will produce so we can
                    // provide feedback to the source that it can forget about everything.
                    //
                    // * Case 2: committed_upper is a singleton set {t_next}
                    //
                    // We know that t_next cannot be the minimum timestamp because we have required
                    // that all times of the as_of frontier are not beyond some time of
                    // committed_upper. Therefore t_next has a predecessor timestamp t_prev.
                    //
                    // We don't know what remap[t_next] is yet, but we do know that we will have to
                    // emit all source updates `u: remap[t_prev] <= time(u) <= remap[t_next]`.
                    // Since `t_next` is the minimum undetermined timestamp and we know that t1 <=
                    // t2 => remap[t1] <= remap[t2] we know that we will never need any source
                    // updates `u: !(remap[t_prev] <= time(u))`.
                    //
                    // Therefore we can provide feedback to the source that it can forget about any
                    // updates that are not beyond remap[t_prev].
                    //
                    // Important: We are *NOT* saying that the source can *compact* its data using
                    // remap[t_prev] as the compaction frontier. If the source were to compact its
                    // collection to remap[t_prev] we would lose the distinction between updates
                    // that happened *at* t_prev versus updates that happened earlier and were
                    // advanced to t_prev. If the source needs to communicate a compaction frontier
                    // upstream then the specific source implementation needs to further adjust the
                    // reclocked committed_upper and calculate a suitable compaction frontier in
                    // the same way we adjust uppers of collections in the controller with the
                    // LagWriteFrontier read policy.
                    //
                    // == What about IntoTime times that are general lattices?
                    //
                    // Reversing the upper for a general lattice is much more involved but it boils
                    // down to computing the meet of all the times in `committed_upper` and then
                    // treating that as `t_next` (I think). Until we need to deal with that though
                    // we can just assume TotalOrder.
                    let reclocked_upper = match committed_upper.as_option() {
                        Some(t_next) => {
                            let idx = ready_times.partition_point(|(_, t, _)| t < t_next);
                            let updates = ready_times
                                .drain(0..idx)
                                .map(|(from_time, _, diff)| (from_time, diff));
                            source_upper.update_iter(updates);
                            // At this point source_upper contains all updates that are less than
                            // t_next, which is equal to remap[t_prev]
                            source_upper.frontier().to_owned()
                        }
                        None => Antichain::new(),
                    };
                    tx.send_replace(reclocked_upper);
                }
            }

            // Export queue depths so operators can monitor memory usage of this operator.
            metrics
                .commit_upper_accepted_times
                .set(u64::cast_from(accepted_times.len()));
            metrics
                .commit_upper_ready_times
                .set(u64::cast_from(ready_times.len()));
        }
    });

    WatchStream::from_changes(rx)
}
675}