mz_storage/source/
source_reader_pipeline.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Types related to the creation of dataflow raw sources.
//!
//! Raw sources are differential dataflow collections of data directly produced by the
//! upstream service. The main export of this module is [`create_raw_source`],
//! which turns [`RawSourceCreationConfig`]s into the aforementioned collections.
//!
//! The full source, which is the _differential_ collection that represents the actual object
//! created by a `CREATE SOURCE` statement, is created by composing
//! [`create_raw_source`] with decoding, `SourceEnvelope` rendering, and more.
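//!
//! A rough sketch of the full pipeline (stage names are illustrative, not the exact operator
//! names used in this crate):
//!
//! ```text
//! upstream service
//!       |
//! [source reader]    raw data, timestamped in the source's own time domain
//!       |
//! [reclock]          source time -> mz_repr::Timestamp, via the remap collection
//!       |
//! [decode]           raw bytes -> rows
//!       |
//! [envelope render]  `SourceEnvelope` handling (e.g. upsert)
//! ```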

// https://github.com/tokio-rs/prost/issues/237
#![allow(missing_docs)]
#![allow(clippy::needless_borrow)]

use std::cell::RefCell;
use std::collections::{BTreeMap, VecDeque};
use std::convert::Infallible;
use std::hash::{Hash, Hasher};
use std::rc::Rc;
use std::sync::Arc;
use std::time::Duration;

use differential_dataflow::lattice::Lattice;
use differential_dataflow::{AsCollection, Hashable, VecCollection};
use futures::stream::StreamExt;
use mz_ore::cast::CastFrom;
use mz_ore::collections::CollectionExt;
use mz_ore::now::NowFn;
use mz_persist_client::cache::PersistClientCache;
use mz_repr::{Diff, GlobalId, RelationDesc, Row};
use mz_storage_types::configuration::StorageConfiguration;
use mz_storage_types::controller::CollectionMetadata;
use mz_storage_types::dyncfgs;
use mz_storage_types::errors::DataflowError;
use mz_storage_types::sources::{SourceConnection, SourceExport, SourceTimestamp};
use mz_timely_util::antichain::AntichainExt;
use mz_timely_util::builder_async::{
    Event as AsyncEvent, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
};
use mz_timely_util::capture::PusherCapture;
use mz_timely_util::operator::ConcatenateFlatten;
use mz_timely_util::reclock::reclock;
use timely::PartialOrder;
use timely::container::CapacityContainerBuilder;
use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::operators::capture::capture::Capture;
use timely::dataflow::operators::capture::{Event, EventPusher};
use timely::dataflow::operators::core::Map as _;
use timely::dataflow::operators::generic::OutputBuilder;
use timely::dataflow::operators::generic::builder_rc::OperatorBuilder as OperatorBuilderRc;
use timely::dataflow::operators::{Broadcast, CapabilitySet, Inspect, Leave};
use timely::dataflow::scopes::Child;
use timely::dataflow::{Scope, Stream};
use timely::order::TotalOrder;
use timely::progress::frontier::MutableAntichain;
use timely::progress::{Antichain, Timestamp};
use tokio::sync::{Semaphore, watch};
use tokio_stream::wrappers::WatchStream;
use tracing::trace;

use crate::healthcheck::{HealthStatusMessage, HealthStatusUpdate};
use crate::metrics::StorageMetrics;
use crate::metrics::source::SourceMetrics;
use crate::source::probe;
use crate::source::reclock::ReclockOperator;
use crate::source::types::{Probe, SourceMessage, SourceOutput, SourceRender, StackedCollection};
use crate::statistics::SourceStatistics;

/// Shared configuration information for all source types. This is used in the
/// `create_raw_source` functions, which produce raw sources.
#[derive(Clone)]
pub struct RawSourceCreationConfig {
    /// The name to attach to the underlying timely operator.
    pub name: String,
    /// The ID of this instantiation of this source.
    pub id: GlobalId,
    /// The details of the outputs from this ingestion.
    pub source_exports: BTreeMap<GlobalId, SourceExport<CollectionMetadata>>,
    /// The ID of the worker on which this operator is executing
    pub worker_id: usize,
    /// The total count of workers
    pub worker_count: usize,
    /// Granularity with which timestamps should be closed (and capabilities
    /// downgraded).
    pub timestamp_interval: Duration,
    /// The function used to get the current time.
    pub now_fn: NowFn,
    /// The metrics & registry that each source instantiates.
    pub metrics: StorageMetrics,
    /// The as-of frontier for this ingestion.
    pub as_of: Antichain<mz_repr::Timestamp>,
    /// For each source export, the upper frontier this source should resume ingestion at in the
    /// system time domain.
    pub resume_uppers: BTreeMap<GlobalId, Antichain<mz_repr::Timestamp>>,
    /// For each source export, the upper frontier this source should resume ingestion at in the
    /// source time domain.
    ///
    /// Since every source has a different timestamp type we carry the timestamps of this frontier
    /// in an encoded `Vec<Row>` form which will get decoded once we reach the connection
    /// specialized functions.
    pub source_resume_uppers: BTreeMap<GlobalId, Vec<Row>>,
    /// A handle to the persist client cache
    pub persist_clients: Arc<PersistClientCache>,
    /// Collection of `SourceStatistics` for the source and its exports to share updates.
    pub statistics: BTreeMap<GlobalId, SourceStatistics>,
    /// Enables reporting the remap operator's write frontier.
    pub shared_remap_upper: Rc<RefCell<Antichain<mz_repr::Timestamp>>>,
    /// Configuration parameters, possibly from LaunchDarkly
    pub config: StorageConfiguration,
    /// The ID of this source's remap/progress collection.
    pub remap_collection_id: GlobalId,
    /// The storage metadata for the remap/progress collection
    pub remap_metadata: CollectionMetadata,
    /// A semaphore that should be acquired by async operators in order to signal that upstream
    /// operators should slow down.
    pub busy_signal: Arc<Semaphore>,
}

/// Reduced version of [`RawSourceCreationConfig`] that is used when rendering
/// each export.
#[derive(Clone)]
pub struct SourceExportCreationConfig {
    /// The ID of this instantiation of this source.
    pub id: GlobalId,
    /// The ID of the worker on which this operator is executing
    pub worker_id: usize,
    /// The metrics & registry that each source instantiates.
    pub metrics: StorageMetrics,
    /// Place to share statistics updates with storage state.
    pub source_statistics: SourceStatistics,
}

impl RawSourceCreationConfig {
    /// Returns the worker id responsible for handling the given partition.
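    ///
    /// The assignment is deterministic: `(self.id, partition)` is hashed and reduced modulo
    /// `worker_count`, so every worker computes the same owner for a given partition without
    /// coordination.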
    pub fn responsible_worker<P: Hash>(&self, partition: P) -> usize {
        let mut h = std::hash::DefaultHasher::default();
        (self.id, partition).hash(&mut h);
        let key = usize::cast_from(h.finish());
        key % self.worker_count
    }

    /// Returns true if this worker is responsible for handling the given partition.
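    ///
    /// A hypothetical sketch (not from this codebase) of how a partitioned source might use
    /// this to decide what to read on this worker; `config` and `all_partitions` are assumed
    /// to be in scope:
    ///
    /// ```ignore
    /// for partition_id in all_partitions {
    ///     if config.responsible_for(partition_id) {
    ///         // Only this worker ingests data for `partition_id`.
    ///     }
    /// }
    /// ```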
    pub fn responsible_for<P: Hash>(&self, partition: P) -> bool {
        self.responsible_worker(partition) == self.worker_id
    }
}

/// Creates a source dataflow operator graph from a source connection. The type of
/// `SourceConnection` determines the type of source that is created.
///
/// This is also the place where _reclocking_
/// (<https://github.com/MaterializeInc/materialize/blob/main/doc/developer/design/20210714_reclocking.md>)
/// happens.
///
/// See the [`source` module docs](crate::source) for more details about how raw
/// sources are used.
///
/// The `committed_upper` stream contains frontier updates whenever times are durably
/// recorded, which allows the ingestion to release upstream resources.
pub fn create_raw_source<'g, G: Scope<Timestamp = ()>, C>(
    scope: &mut Child<'g, G, mz_repr::Timestamp>,
    storage_state: &crate::storage_state::StorageState,
    committed_upper: &Stream<Child<'g, G, mz_repr::Timestamp>, ()>,
    config: &RawSourceCreationConfig,
    source_connection: C,
    start_signal: impl std::future::Future<Output = ()> + 'static,
) -> (
    BTreeMap<
        GlobalId,
        VecCollection<
            Child<'g, G, mz_repr::Timestamp>,
            Result<SourceOutput<C::Time>, DataflowError>,
            Diff,
        >,
    >,
    Stream<G, HealthStatusMessage>,
    Vec<PressOnDropButton>,
)
where
    C: SourceConnection + SourceRender + Clone + 'static,
{
    let worker_id = config.worker_id;
    let id = config.id;

    let mut tokens = vec![];

    let (ingested_upper_tx, ingested_upper_rx) =
        watch::channel(MutableAntichain::new_bottom(C::Time::minimum()));
    let (probed_upper_tx, probed_upper_rx) = watch::channel(None);

    let source_metrics = Arc::new(config.metrics.get_source_metrics(id, worker_id));

    let timestamp_desc = source_connection.timestamp_desc();

    let (remap_collection, remap_token) = remap_operator(
        scope,
        storage_state,
        config.clone(),
        probed_upper_rx,
        ingested_upper_rx,
        timestamp_desc,
    );
    // Need to broadcast the remap changes to all workers.
    let remap_collection = remap_collection.inner.broadcast().as_collection();
    tokens.push(remap_token);

    let committed_upper = reclock_committed_upper(
        &remap_collection,
        config.as_of.clone(),
        committed_upper,
        id,
        Arc::clone(&source_metrics),
    );

    let mut reclocked_exports = BTreeMap::new();

    let reclocked_exports2 = &mut reclocked_exports;
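    // Render the source in a nested scope that is timestamped with the source's own time
    // domain. Raw data is captured out of that scope and reclocked into `mz_repr::Timestamp`s
    // via the reclock pushers; only the health stream leaves the scope directly.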
    let (health, source_tokens) = scope.parent.scoped("SourceTimeDomain", move |scope| {
        let (exports, source_upper, health_stream, source_tokens) = source_render_operator(
            scope,
            config,
            source_connection,
            probed_upper_tx,
            committed_upper,
            start_signal,
        );

        for (id, export) in exports {
            let (reclock_pusher, reclocked) = reclock(&remap_collection, config.as_of.clone());
            export
                .inner
                .map(move |(result, from_time, diff)| {
                    let result = match result {
                        Ok(msg) => Ok(SourceOutput {
                            key: msg.key.clone(),
                            value: msg.value.clone(),
                            metadata: msg.metadata.clone(),
                            from_time: from_time.clone(),
                        }),
                        Err(err) => Err(err.clone()),
                    };
                    (result, from_time.clone(), *diff)
                })
                .capture_into(PusherCapture(reclock_pusher));
            reclocked_exports2.insert(id, reclocked);
        }

        source_upper.capture_into(FrontierCapture(ingested_upper_tx));

        (health_stream.leave(), source_tokens)
    });

    tokens.extend(source_tokens);

    (reclocked_exports, health, tokens)
}

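/// An [`EventPusher`] that forwards the progress of a captured, data-less stream into a `watch`
/// channel holding a [`MutableAntichain`], allowing async consumers to observe the stream's
/// frontier.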
pub struct FrontierCapture<T>(watch::Sender<MutableAntichain<T>>);

impl<T: Timestamp> EventPusher<T, Vec<Infallible>> for FrontierCapture<T> {
    fn push(&mut self, event: Event<T, Vec<Infallible>>) {
        match event {
            Event::Progress(changes) => self.0.send_modify(|frontier| {
                frontier.update_iter(changes);
            }),
            Event::Messages(_, _) => unreachable!(),
        }
    }
}

/// Renders the source dataflow fragment from the given [`SourceConnection`]. This returns a
/// collection timestamped with the source-specific timestamp type. Also returns a second stream
/// that can be used to learn about the `source_upper` that all the source reader instances know
/// about. This second stream will be used by the `remap_operator` to mint new timestamp bindings
/// into the remap shard.
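///
/// The returned progress stream carries no data (its item type is [`Infallible`]); it exists
/// purely so that downstream operators can observe its frontier.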
fn source_render_operator<G, C>(
    scope: &mut G,
    config: &RawSourceCreationConfig,
    source_connection: C,
    probed_upper_tx: watch::Sender<Option<Probe<C::Time>>>,
    resume_uppers: impl futures::Stream<Item = Antichain<C::Time>> + 'static,
    start_signal: impl std::future::Future<Output = ()> + 'static,
) -> (
    BTreeMap<GlobalId, StackedCollection<G, Result<SourceMessage, DataflowError>>>,
    Stream<G, Infallible>,
    Stream<G, HealthStatusMessage>,
    Vec<PressOnDropButton>,
)
where
    G: Scope<Timestamp = C::Time>,
    C: SourceRender + 'static,
{
    let source_id = config.id;
    let worker_id = config.worker_id;
    let now_fn = config.now_fn.clone();
    let timestamp_interval = config.timestamp_interval;

    let resume_uppers = resume_uppers.inspect(move |upper| {
        let upper = upper.pretty();
        trace!(%upper, "timely-{worker_id} source({source_id}) received resume upper");
    });

    let (exports, progress, health, probes, tokens) =
        source_connection.render(scope, config, resume_uppers, start_signal);

    let mut export_collections = BTreeMap::new();

    let source_metrics = config.metrics.get_source_metrics(config.id, worker_id);

    // Compute the overall resume upper to report for the ingestion
    let resume_upper = Antichain::from_iter(
        config
            .resume_uppers
            .values()
            .flat_map(|f| f.iter().cloned()),
    );
    source_metrics
        .resume_upper
        .set(mz_persist_client::metrics::encode_ts_metric(&resume_upper));

    let mut health_streams = vec![];

    for (id, export) in exports {
        let name = format!("SourceGenericStats({})", id);
        let mut builder = OperatorBuilderRc::new(name, scope.clone());

        let (health_output, derived_health) = builder.new_output();
        let mut health_output =
            OutputBuilder::<_, CapacityContainerBuilder<_>>::from(health_output);
        health_streams.push(derived_health);

        let (output, new_export) = builder.new_output();
        let mut output = OutputBuilder::<_, CapacityContainerBuilder<_>>::from(output);

        let mut input = builder.new_input(&export.inner, Pipeline);
        export_collections.insert(id, new_export.as_collection());

        let bytes_read_counter = config.metrics.source_defs.bytes_read.clone();
        let source_statistics = config
            .statistics
            .get(&id)
            .expect("statistics initialized")
            .clone();

        builder.build(move |mut caps| {
            let mut health_cap = Some(caps.remove(0));

            move |frontiers| {
                let mut last_status = None;
                let mut health_output = health_output.activate();

                if frontiers[0].is_empty() {
                    health_cap = None;
                    return;
                }
                let health_cap = health_cap.as_mut().unwrap();

                while let Some((cap, data)) = input.next() {
                    for (message, _, _) in data.iter() {
                        match message {
                            Ok(message) => {
                                source_statistics.inc_messages_received_by(1);
                                let key_len = u64::cast_from(message.key.byte_len());
                                let value_len = u64::cast_from(message.value.byte_len());
                                bytes_read_counter.inc_by(key_len + value_len);
                                source_statistics.inc_bytes_received_by(key_len + value_len);
                            }
                            Err(error) => {
                                // All errors coming into the data stream are definite.
                                // Downstream consumers of this data will preserve this
                                // status.
                                let update = HealthStatusUpdate::stalled(
                                    error.to_string(),
                                    Some(
                                        "retracting the errored value may resume the source"
                                            .to_string(),
                                    ),
                                );
                                let status = HealthStatusMessage {
                                    id: Some(id),
                                    namespace: C::STATUS_NAMESPACE.clone(),
                                    update,
                                };
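                                // Only emit the status if it differs from the
                                // last one emitted in this activation, to avoid
                                // flooding the health stream with identical
                                // error updates.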
                                if last_status.as_ref() != Some(&status) {
                                    last_status = Some(status.clone());
                                    health_output.session(&health_cap).give(status);
                                }
                            }
                        }
                    }
                    let mut output = output.activate();
                    output.session(&cap).give_container(data);
                }
            }
        });
    }

    let probe_stream = match probes {
        Some(stream) => stream,
        None => synthesize_probes(source_id, &progress, timestamp_interval, now_fn),
    };

    // Broadcasting does more work than strictly necessary: it would suffice to exchange the
    // probes to the single worker that mints the bindings, but we'd have to thread that
    // information through and couple the two functions enough that the optimization doesn't
    // seem worth it (I think).
    probe_stream.broadcast().inspect(move |probe| {
        // We don't care if the receiver is gone
        let _ = probed_upper_tx.send(Some(probe.clone()));
    });

    (
        export_collections,
        progress,
        health.concatenate_flatten::<_, CapacityContainerBuilder<_>>(health_streams),
        tokens,
    )
}

/// Mints new contents for the remap shard based on summaries about the source
/// upper it receives from the raw reader operators.
///
/// Only one worker will be active and write to the remap shard. All source
/// upper summaries will be exchanged to it.
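///
/// The produced collection contains `FromTime` values timestamped with `mz_repr::Timestamp`s.
/// As an illustration (with made-up values), a minted binding for a partitioned source might
/// record that at `mz_repr::Timestamp` 1000 the source frontier was
/// `{(partition 0, offset 42), (partition 1, offset 17)}`; updates at source times not beyond
/// that frontier are assigned timestamps at or before 1000 by the downstream reclock operators.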
fn remap_operator<G, FromTime>(
    scope: &G,
    storage_state: &crate::storage_state::StorageState,
    config: RawSourceCreationConfig,
    mut probed_upper: watch::Receiver<Option<Probe<FromTime>>>,
    mut ingested_upper: watch::Receiver<MutableAntichain<FromTime>>,
    remap_relation_desc: RelationDesc,
) -> (VecCollection<G, FromTime, Diff>, PressOnDropButton)
where
    G: Scope<Timestamp = mz_repr::Timestamp>,
    FromTime: SourceTimestamp,
{
    let RawSourceCreationConfig {
        name,
        id,
        source_exports: _,
        worker_id,
        worker_count,
        timestamp_interval,
        remap_metadata,
        as_of,
        resume_uppers: _,
        source_resume_uppers: _,
        metrics: _,
        now_fn,
        persist_clients,
        statistics: _,
        shared_remap_upper,
        // Bind the storage configuration so the dyncfg lookup below can use it.
        config,
        remap_collection_id,
        busy_signal: _,
    } = config;

    let read_only_rx = storage_state.read_only_rx.clone();
    let error_handler = storage_state.error_handler("remap_operator", id);

    let chosen_worker = usize::cast_from(id.hashed() % u64::cast_from(worker_count));
    let active_worker = chosen_worker == worker_id;

    let operator_name = format!("remap({})", id);
    let mut remap_op = AsyncOperatorBuilder::new(operator_name, scope.clone());
    let (remap_output, remap_stream) = remap_op.new_output::<CapacityContainerBuilder<_>>();

    let button = remap_op.build(move |capabilities| async move {
        if !active_worker {
            // This worker is not writing, so make sure it's "taken out" of the
            // calculation by advancing to the empty frontier.
            shared_remap_upper.borrow_mut().clear();
            return;
        }

        let mut cap_set = CapabilitySet::from_elem(capabilities.into_element());

        let remap_handle = crate::source::reclock::compat::PersistHandle::<FromTime, _>::new(
            Arc::clone(&persist_clients),
            read_only_rx,
            remap_metadata.clone(),
            as_of.clone(),
            shared_remap_upper,
            id,
            "remap",
            worker_id,
            worker_count,
            remap_relation_desc,
            remap_collection_id,
        )
        .await;

        let remap_handle = match remap_handle {
            Ok(handle) => handle,
            Err(e) => {
                error_handler
                    .report_and_stop(
                        e.context(format!("Failed to create remap handle for source {name}")),
                    )
                    .await
            }
        };

        let (mut timestamper, mut initial_batch) = ReclockOperator::new(remap_handle).await;

        // Emit initial snapshot of the remap_shard, bootstrapping
        // downstream reclock operators.
        trace!(
            "timely-{worker_id} remap({id}) emitting remap snapshot: trace_updates={:?}",
            &initial_batch.updates
        );

        let cap = cap_set.delayed(cap_set.first().unwrap());
        remap_output.give_container(&cap, &mut initial_batch.updates);
        drop(cap);
        cap_set.downgrade(initial_batch.upper);

        let mut ticker = tokio::time::interval(timestamp_interval);
        ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

        let mut prev_probe_ts: Option<mz_repr::Timestamp> = None;
        let timestamp_interval_ms: u64 = timestamp_interval
            .as_millis()
            .try_into()
            .expect("huge duration");

        while !cap_set.is_empty() {
            // Check the reclocking strategy in every iteration, to make it possible to change it
            // without restarting the source pipeline.
            let reclock_to_latest =
                dyncfgs::STORAGE_RECLOCK_TO_LATEST.get(config.config_set());

            // If we are reclocking to the latest offset then we only mint bindings after a
            // successful probe. Otherwise we fall back to the earlier behavior where we just
            // record the ingested frontier.
            let mut new_probe = None;
            if reclock_to_latest {
                new_probe = probed_upper
                    .wait_for(|new_probe| match (prev_probe_ts, new_probe) {
                        (None, Some(_)) => true,
                        (Some(prev_ts), Some(new)) => prev_ts < new.probe_ts,
                        _ => false,
                    })
                    .await
                    .map(|probe| (*probe).clone())
                    .unwrap_or_else(|_| {
                        Some(Probe {
                            probe_ts: now_fn().into(),
                            upstream_frontier: Antichain::new(),
                        })
                    });
            } else {
                while prev_probe_ts >= new_probe.as_ref().map(|p| p.probe_ts) {
                    ticker.tick().await;
                    // We only proceed if the source upper frontier is not the minimum frontier. This
                    // makes it so the first binding corresponds to the snapshot of the source, and
                    // because the first binding always maps to the minimum *target* frontier we
                    // guarantee that the source will never appear empty.
                    let upstream_frontier = ingested_upper
                        .wait_for(|f| *f.frontier() != [FromTime::minimum()])
                        .await
                        .unwrap()
                        .frontier()
                        .to_owned();

                    let now = (now_fn)();
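                    // Round `now` up to the next multiple of the timestamp interval so that
                    // probe timestamps (and therefore minted bindings) land on interval
                    // boundaries.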
                    let mut probe_ts = now - now % timestamp_interval_ms;
                    if (now % timestamp_interval_ms) != 0 {
                        probe_ts += timestamp_interval_ms;
                    }
                    new_probe = Some(Probe {
                        probe_ts: probe_ts.into(),
                        upstream_frontier,
                    });
                }
            };

            let probe = new_probe.expect("known to be Some");
            prev_probe_ts = Some(probe.probe_ts);

            let binding_ts = probe.probe_ts;
            let cur_source_upper = probe.upstream_frontier;

            let new_into_upper = Antichain::from_elem(binding_ts.step_forward());

            let mut remap_trace_batch = timestamper
                .mint(binding_ts, new_into_upper, cur_source_upper.borrow())
                .await;

            trace!(
                "timely-{worker_id} remap({id}) minted new bindings: \
                updates={:?} \
                source_upper={} \
                trace_upper={}",
                &remap_trace_batch.updates,
                cur_source_upper.pretty(),
                remap_trace_batch.upper.pretty()
            );

            let cap = cap_set.delayed(cap_set.first().unwrap());
            remap_output.give_container(&cap, &mut remap_trace_batch.updates);
            cap_set.downgrade(remap_trace_batch.upper);
        }
    });

    (remap_stream.as_collection(), button.press_on_drop())
}

/// Reclocks an `IntoTime` frontier stream into a `FromTime` frontier stream. This is used for the
/// virtual (through persist) feedback edge so that we convert the `IntoTime` resumption frontier
/// into the `FromTime` frontier that is used with the source's `OffsetCommitter`.
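///
/// The returned async stream yields a new `FromTime` frontier whenever the reclocked value of
/// the committed upper changes.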
fn reclock_committed_upper<G, FromTime>(
    bindings: &VecCollection<G, FromTime, Diff>,
    as_of: Antichain<G::Timestamp>,
    committed_upper: &Stream<G, ()>,
    id: GlobalId,
    metrics: Arc<SourceMetrics>,
) -> impl futures::stream::Stream<Item = Antichain<FromTime>> + 'static
where
    G: Scope,
    G::Timestamp: Lattice + TotalOrder,
    FromTime: SourceTimestamp,
{
    let (tx, rx) = watch::channel(Antichain::from_elem(FromTime::minimum()));
    let scope = bindings.scope().clone();

    let name = format!("ReclockCommitUpper({id})");
    let mut builder = OperatorBuilderRc::new(name, scope);

    let mut bindings = builder.new_input(&bindings.inner, Pipeline);
    let _ = builder.new_input(committed_upper, Pipeline);

    builder.build(move |_| {
        // Remap bindings beyond the upper
        use timely::progress::ChangeBatch;
        let mut accepted_times: ChangeBatch<(G::Timestamp, FromTime)> = ChangeBatch::new();
        // The upper frontier of the bindings
        let mut upper = Antichain::from_elem(Timestamp::minimum());
        // Remap bindings not beyond upper
        let mut ready_times = VecDeque::new();
        let mut source_upper = MutableAntichain::new();

        move |frontiers| {
            // Accept new bindings
            while let Some((_, data)) = bindings.next() {
                accepted_times.extend(data.drain(..).map(|(from, mut into, diff)| {
                    into.advance_by(as_of.borrow());
                    ((into, from), diff.into_inner())
                }));
            }
            // Extract ready bindings
            let new_upper = frontiers[0].frontier();
            if PartialOrder::less_than(&upper.borrow(), &new_upper) {
                upper = new_upper.to_owned();
                // Drain consolidated accepted times not greater or equal to `upper` into
                // `ready_times`, and retain times greater or equal to `upper` in
                // `accepted_times`.
                let mut pending_times = std::mem::take(&mut accepted_times).into_inner();
                // These should already be sorted, as part of `.into_inner()`, but sort
                // defensively just in case.
                pending_times.sort_unstable_by(|a, b| a.0.cmp(&b.0));
                for ((into, from), diff) in pending_times.drain(..) {
                    if !upper.less_equal(&into) {
                        ready_times.push_back((from, into, diff));
                    } else {
                        accepted_times.update((into, from), diff);
                    }
                }
            }

            // The received times only accumulate correctly for times beyond the as_of.
            if as_of.iter().all(|t| !upper.less_equal(t)) {
                let committed_upper = frontiers[1].frontier();
                if as_of.iter().all(|t| !committed_upper.less_equal(t)) {
                    // We have committed this source up until `committed_upper`. Because we have
                    // required that IntoTime is a total order this will be either a singleton set
                    // or the empty set.
                    //
                    // * Case 1: committed_upper is the empty set {}
                    //
                    // There won't be any future IntoTime timestamps that we will produce so we can
                    // provide feedback to the source that it can forget about everything.
                    //
                    // * Case 2: committed_upper is a singleton set {t_next}
                    //
                    // We know that t_next cannot be the minimum timestamp because we have required
                    // that all times of the as_of frontier are not beyond some time of
                    // committed_upper. Therefore t_next has a predecessor timestamp t_prev.
                    //
                    // We don't know what remap[t_next] is yet, but we do know that we will have to
                    // emit all source updates `u: remap[t_prev] <= time(u) <= remap[t_next]`.
                    // Since `t_next` is the minimum undetermined timestamp and we know that t1 <=
                    // t2 => remap[t1] <= remap[t2] we know that we will never need any source
                    // updates `u: !(remap[t_prev] <= time(u))`.
                    //
                    // Therefore we can provide feedback to the source that it can forget about any
                    // updates that are not beyond remap[t_prev].
                    //
                    // Important: We are *NOT* saying that the source can *compact* its data using
                    // remap[t_prev] as the compaction frontier. If the source were to compact its
                    // collection to remap[t_prev] we would lose the distinction between updates
                    // that happened *at* t_prev versus updates that happened earlier and were
                    // advanced to t_prev. If the source needs to communicate a compaction frontier
                    // upstream then the specific source implementation needs to further adjust the
                    // reclocked committed_upper and calculate a suitable compaction frontier in
                    // the same way we adjust uppers of collections in the controller with the
                    // LagWriteFrontier read policy.
                    //
                    // == What about IntoTime times that are general lattices?
                    //
                    // Reversing the upper for a general lattice is much more involved but it boils
                    // down to computing the meet of all the times in `committed_upper` and then
                    // treating that as `t_next` (I think). Until we need to deal with that though
                    // we can just assume TotalOrder.
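                    // Worked example (illustrative values): if `ready_times` holds
                    // [(f1, 3, +1), (f2, 5, +1)] and `committed_upper` is {5}, then `t_next` is
                    // 5, the partition point drains only (f1, 3, +1) into `source_upper`, and
                    // the frontier sent downstream is the one formed by `f1`, i.e. remap[4]
                    // where 4 is the predecessor of `t_next`.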
                    let reclocked_upper = match committed_upper.as_option() {
                        Some(t_next) => {
                            let idx = ready_times.partition_point(|(_, t, _)| t < t_next);
                            let updates = ready_times
                                .drain(0..idx)
                                .map(|(from_time, _, diff)| (from_time, diff));
                            source_upper.update_iter(updates);
                            // At this point source_upper contains all updates that are less than
                            // t_next, which is equal to remap[t_prev]
                            source_upper.frontier().to_owned()
                        }
                        None => Antichain::new(),
                    };
                    tx.send_replace(reclocked_upper);
                }
            }

            metrics
                .commit_upper_accepted_times
                .set(u64::cast_from(accepted_times.len()));
            metrics
                .commit_upper_ready_times
                .set(u64::cast_from(ready_times.len()));
        }
    });

    WatchStream::from_changes(rx)
}

/// Synthesizes a probe stream that produces the frontier of the given progress stream at the given
/// interval.
///
/// This is used as a fallback for sources that don't support probing the frontier of the upstream
/// system.
fn synthesize_probes<G>(
    source_id: GlobalId,
    progress: &Stream<G, Infallible>,
    interval: Duration,
    now_fn: NowFn,
) -> Stream<G, Probe<G::Timestamp>>
where
    G: Scope,
{
    let scope = progress.scope();

    let active_worker = usize::cast_from(source_id.hashed()) % scope.peers();
    let is_active_worker = active_worker == scope.index();

    let mut op = AsyncOperatorBuilder::new("synthesize_probes".into(), scope);
    let (output, output_stream) = op.new_output::<CapacityContainerBuilder<_>>();
    let mut input = op.new_input_for(progress, Pipeline, &output);

    op.build(|caps| async move {
        if !is_active_worker {
            return;
        }

        let [cap] = caps.try_into().expect("one capability per output");

        let mut ticker = probe::Ticker::new(move || interval, now_fn.clone());

        let minimum_frontier = Antichain::from_elem(Timestamp::minimum());
        let mut frontier = minimum_frontier.clone();
        loop {
            tokio::select! {
                event = input.next() => match event {
                    Some(AsyncEvent::Progress(progress)) => frontier = progress,
                    Some(AsyncEvent::Data(..)) => unreachable!(),
                    None => break,
                },
                // We only report a probe if the source upper frontier is not the minimum frontier.
                // This makes it so the first remap binding corresponds to the snapshot of the
                // source, and because the first binding always maps to the minimum *target*
                // frontier we guarantee that the source will never appear empty.
                probe_ts = ticker.tick(), if frontier != minimum_frontier => {
                    let probe = Probe {
                        probe_ts,
                        upstream_frontier: frontier.clone(),
                    };
                    output.give(&cap, probe);
                }
            }
        }

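        // The progress input is exhausted, so emit one final probe carrying the empty upstream
        // frontier to signal that the source will never produce data again.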
        let probe = Probe {
            probe_ts: now_fn().into(),
            upstream_frontier: Antichain::new(),
        };
        output.give(&cap, probe);
    });

    output_stream
}