// mz_compute/sink/refresh.rs
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
use differential_dataflow::{AsCollection, Data, VecCollection};
use mz_ore::soft_panic_or_log;
use mz_repr::refresh_schedule::RefreshSchedule;
use mz_repr::{Diff, Timestamp};
use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::operators::generic::OutputBuilder;
use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;

/// This is for REFRESH options on materialized views. It adds an operator that rounds up the
/// timestamps of data and frontiers to the time of the next refresh. See
/// `doc/developer/design/20231027_refresh_every_mv.md`.
///
/// Note that this currently only works with 1-dim timestamps. (This is not an issue for WMR,
/// because iteration numbers should disappear by the time the data gets to the Persist sink.)
pub(crate) fn apply_refresh<'scope, D>(
    coll: VecCollection<'scope, Timestamp, D, Diff>,
    refresh_schedule: RefreshSchedule,
) -> VecCollection<'scope, Timestamp, D, Diff>
where
    D: Data,
{
    // We need to disconnect the reachability graph and manage capabilities manually, because we'd
    // like to round up frontiers as well as data: as soon as our input frontier passes a refresh
    // time, we'll round it up to the next refresh time.
    let mut builder = OperatorBuilder::new("apply_refresh".to_string(), coll.scope());
    let (output_buf, output_stream) = builder.new_output();
    // Wrap the raw output in a consolidating container builder: many input updates collapse onto
    // the same rounded-up timestamp, so consolidating before sending reduces downstream volume.
    let mut output_buf = OutputBuilder::<_, ConsolidatingContainerBuilder<_>>::from(output_buf);

    // The empty connection summary (`[]`) disconnects the input from the output in the
    // reachability graph, so our output frontier is governed solely by the capability below.
    let mut input = builder.new_input_connection(coll.inner, Pipeline, []);
    builder.build(move |capabilities| {
        // This capability directly controls this operator's output frontier (because we have
        // disconnected the input above). We wrap it in an Option so we can drop it to advance to
        // the empty output frontier when the last refresh is done. (We must be careful that we only
        // ever emit output updates at times that are at or beyond this capability.)
        let mut capability = capabilities.into_iter().next(); // (We have 1 output, so 1 capability.)
        move |frontiers| {
            let mut output_handle_core = output_buf.activate();
            input.for_each(|input_cap, data| {
                // Note that we can't use `input_cap` to get an output session because we might have
                // advanced our output frontier already beyond the frontier of this capability.

                // `capability` will be None if we are past the last refresh. We have made sure to
                // not receive any data that is after the last refresh by setting the `until` of the
                // dataflow to the last refresh.
                let Some(capability) = capability.as_mut() else {
                    soft_panic_or_log!(
                        "should have a capability if we received data. input_cap: {:?}, frontier: {:?}",
                        input_cap.time(),
                        frontiers[0].frontier()
                    );
                    return;
                };
                let mut output_buf = output_handle_core.session_with_builder(&capability);

                // One-element cache for the rounded-up timestamp of the most recently seen input
                // timestamp (see the why-comment inside the loop).
                let mut cached_ts: Option<Timestamp> = None;
                let mut cached_rounded_up_data_ts = None;
                for (d, ts, r) in data.drain(..) {
                    let rounded_up_data_ts = {
                        // We cache the rounded up timestamp for the last seen timestamp,
                        // because the rounding up has a non-negligible cost. Caching for
                        // just the 1 last timestamp helps already, because in some common
                        // cases, we'll be seeing a lot of identical timestamps, e.g.,
                        // during a rehydration, or when we have much more than 1000 records
                        // during a single second.
                        if cached_ts != Some(ts) {
                            cached_ts = Some(ts);
                            cached_rounded_up_data_ts = refresh_schedule.round_up_timestamp(ts);
                        }
                        cached_rounded_up_data_ts
                    };
                    match rounded_up_data_ts {
                        Some(rounded_up_ts) => {
                            // Re-timestamp the update to the next refresh time.
                            output_buf.give((d, rounded_up_ts, r));
                        }
                        None => {
                            // This record is after the last refresh, which is not possible because
                            // we set the dataflow `until` to the last refresh.
                            soft_panic_or_log!("Received data after the last refresh");
                        }
                    }
                }
            });

            // Round up the frontier.
            // Note that `round_up_timestamp` is monotonic. This is needed to ensure that the
            // timestamp (t) of any received data that has a larger timestamp than the original
            // frontier (f) will get rounded up to a time that is at least at the rounded up
            // frontier. In other words, monotonicity ensures that
            // when `t >= f` then `round_up_timestamp(t) >= round_up_timestamp(f)`.
            match frontiers[0].frontier().as_option() { // (We have only 1 input, so only 1 frontier.)
                Some(ts) => {
                    match refresh_schedule.round_up_timestamp(*ts) {
                        Some(rounded_up_ts) => {
                            capability
                                .as_mut()
                                .expect("capability must exist if frontier is <= last refresh")
                                .downgrade(&rounded_up_ts);
                        }
                        None => {
                            // We are past the last refresh. Drop the capability to signal that we
                            // are done.
                            capability = None;
                            // We can only get here if we see the frontier advancing to a time after the last refresh,
                            // but not empty. This is ok, because even though we set the `until` to the last refresh,
                            // frontier advancements might still happen past the `until`.
                        }
                    }
                }
                None => {
                    // Empty input frontier: no more input will ever arrive, so drop our
                    // capability to advance the output frontier to empty as well.
                    capability = None;
                }
            }
        }
    });

    output_stream.as_collection()
}