mz_compute/sink/refresh.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
use differential_dataflow::{AsCollection, Collection, Data};
use mz_ore::soft_panic_or_log;
use mz_repr::refresh_schedule::RefreshSchedule;
use mz_repr::{Diff, Timestamp};
use timely::dataflow::Scope;
use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;

/// Implements the REFRESH options of materialized views. It adds an operator that rounds up the
/// timestamps of both data and frontiers to the time of the next refresh. See
/// `doc/developer/design/20231027_refresh_every_mv.md`.
///
/// Note that this currently works only with 1-dim timestamps. (This is not an issue for WMR,
/// because iteration numbers should disappear by the time the data gets to the Persist sink.)
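///
/// As an illustration (hypothetical values, not taken from the design doc): with a schedule
/// equivalent to `REFRESH EVERY '1 hour'` aligned to the epoch, `round_up_timestamp` behaves
/// roughly as follows, and this operator applies it to both data timestamps and frontiers:
/// ```text
/// round_up_timestamp(00:23:11) -> Some(01:00:00)  // rounded up to the next refresh
/// round_up_timestamp(01:00:00) -> Some(01:00:00)  // a refresh time maps to itself
/// round_up_timestamp(t)        -> None            // t is past the last refresh (REFRESH AT)
/// ```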
pub(crate) fn apply_refresh<G, D>(
    coll: Collection<G, D, Diff>,
    refresh_schedule: RefreshSchedule,
) -> Collection<G, D, Diff>
where
    G: Scope<Timestamp = Timestamp>,
    D: Data,
{
    // We need to disconnect the reachability graph and manage capabilities manually, because we'd
    // like to round up frontiers as well as data: as soon as our input frontier passes a refresh
    // time, we'll round it up to the next refresh time.
    let mut builder = OperatorBuilder::new("apply_refresh".to_string(), coll.scope());
    let (mut output_buf, output_stream) = builder.new_output::<ConsolidatingContainerBuilder<_>>();
    let mut input = builder.new_input_connection(&coll.inner, Pipeline, []);
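    // Note: the empty `connection` argument above is what actually disconnects this input from
    // the output in timely's reachability graph; without it, timely would hold our output
    // frontier back to the (un-rounded) input frontier.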
    builder.build(move |capabilities| {
        // This capability directly controls this operator's output frontier (because we have
        // disconnected the input above). We wrap it in an Option so we can drop it to advance to
        // the empty output frontier when the last refresh is done. (We must be careful that we only
        // ever emit output updates at times that are at or beyond this capability.)
        let mut capability = capabilities.into_iter().next(); // (We have 1 input.)
        move |frontiers| {
            let mut output_handle_core = output_buf.activate();
            input.for_each(|input_cap, data| {
                // Note that we can't use `input_cap` to get an output session, because we might
                // have already advanced our output frontier beyond the frontier of this capability.

                // `capability` will be None if we are past the last refresh. We have made sure to
                // not receive any data that is after the last refresh by setting the `until` of the
                // dataflow to the last refresh.
                let Some(capability) = capability.as_mut() else {
                    soft_panic_or_log!(
                        "should have a capability if we received data. input_cap: {:?}, frontier: {:?}",
                        input_cap.time(),
                        frontiers[0].frontier()
                    );
                    return;
                };
                let mut output_buf = output_handle_core.session_with_builder(&capability);

                let mut cached_ts: Option<Timestamp> = None;
                let mut cached_rounded_up_data_ts = None;
                for (d, ts, r) in data.drain(..) {
                    let rounded_up_data_ts = {
                        // We cache the rounded-up timestamp for the last seen timestamp,
                        // because the rounding up has a non-negligible cost. Caching just
                        // the last timestamp already helps, because in some common cases
                        // we'll see long runs of identical timestamps, e.g., during
                        // rehydration, or when we have far more than 1000 records within
                        // a single second.
                        if cached_ts != Some(ts) {
                            cached_ts = Some(ts);
                            cached_rounded_up_data_ts = refresh_schedule.round_up_timestamp(ts);
                        }
                        cached_rounded_up_data_ts
                    };
                    match rounded_up_data_ts {
                        Some(rounded_up_ts) => {
                            output_buf.give((d, rounded_up_ts, r));
                        }
                        None => {
                            // This record is after the last refresh, which is not possible because
                            // we set the dataflow `until` to the last refresh.
                            soft_panic_or_log!("Received data after the last refresh");
                        }
                    }
                }
            });

            // Round up the frontier.
            // Note that `round_up_timestamp` is monotonic. This is needed to ensure that the
            // timestamp (t) of any received data that has a larger timestamp than the original
            // frontier (f) will get rounded up to a time that is at least at the rounded up
            // frontier. In other words, monotonicity ensures that
            // when `t >= f` then `round_up_timestamp(t) >= round_up_timestamp(f)`.
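            // A concrete instance (illustrative numbers, not from the code): with hourly
            // refreshes, if the frontier f = 00:30 and a late update arrives at t = 00:45,
            // both round up to 01:00, so the update is still emitted at or beyond the
            // capability we downgrade below.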
            match frontiers[0].frontier().as_option() { // (We have only 1 input, so only 1 frontier.)
                Some(ts) => {
                    match refresh_schedule.round_up_timestamp(*ts) {
                        Some(rounded_up_ts) => {
                            capability
                                .as_mut()
                                .expect("capability must exist if frontier is <= last refresh")
                                .downgrade(&rounded_up_ts);
                        }
                        None => {
                            // We are past the last refresh. Drop the capability to signal that we
                            // are done.
                            capability = None;
                            // We can only get here if we see the frontier advancing to a time after
                            // the last refresh, but not to the empty frontier. This is ok, because
                            // even though we set the `until` to the last refresh, frontier
                            // advancements might still happen past the `until`.
                        }
                    }
                }
                None => {
                    capability = None;
                }
            }
        }
    });

    output_stream.as_collection()
}
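
// A minimal usage sketch (hypothetical call site; `sink_collection` and `refresh_schedule`
// are illustrative names, not the actual sink code): callers would apply this operator to
// the collection feeding the persist sink whenever the materialized view has a REFRESH
// schedule, and pass the collection through unchanged otherwise.
//
//     let sink_collection = match refresh_schedule {
//         Some(schedule) => apply_refresh(sink_collection, schedule),
//         None => sink_collection,
//     };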