mz_compute/sink/refresh.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
11use differential_dataflow::{AsCollection, Data, VecCollection};
12use mz_ore::soft_panic_or_log;
13use mz_repr::refresh_schedule::RefreshSchedule;
14use mz_repr::{Diff, Timestamp};
15use timely::dataflow::Scope;
16use timely::dataflow::channels::pact::Pipeline;
17use timely::dataflow::operators::generic::OutputBuilder;
18use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
19
20/// This is for REFRESH options on materialized views. It adds an operator that rounds up the
21/// timestamps of data and frontiers to the time of the next refresh. See
22/// `doc/developer/design/20231027_refresh_every_mv.md`.
23///
24/// Note that this currently only works with 1-dim timestamps. (This is not an issue for WMR,
25/// because iteration numbers should disappear by the time the data gets to the Persist sink.)
/// This is for REFRESH options on materialized views. It adds an operator that rounds up the
/// timestamps of data and frontiers to the time of the next refresh. See
/// `doc/developer/design/20231027_refresh_every_mv.md`.
///
/// Note that this currently only works with 1-dim timestamps. (This is not an issue for WMR,
/// because iteration numbers should disappear by the time the data gets to the Persist sink.)
pub(crate) fn apply_refresh<G, D>(
    coll: VecCollection<G, D, Diff>,
    refresh_schedule: RefreshSchedule,
) -> VecCollection<G, D, Diff>
where
    G: Scope<Timestamp = Timestamp>,
    D: Data,
{
    // We need to disconnect the reachability graph and manage capabilities manually, because we'd
    // like to round up frontiers as well as data: as soon as our input frontier passes a refresh
    // time, we'll round it up to the next refresh time.
    let mut builder = OperatorBuilder::new("apply_refresh".to_string(), coll.scope());
    let (output_buf, output_stream) = builder.new_output();
    // Wrap the raw output in a consolidating container builder, so that updates buffered for the
    // same time are consolidated before being shipped downstream.
    let mut output_buf = OutputBuilder::<_, ConsolidatingContainerBuilder<_>>::from(output_buf);

    // The empty connection summary (`[]`) disconnects this input from the output in timely's
    // reachability graph: our output frontier is then governed solely by the capability we hold
    // below, not by the input frontier. This is what lets us advance the output frontier past
    // the input frontier (rounding it up to the next refresh time).
    let mut input = builder.new_input_connection(&coll.inner, Pipeline, []);
    builder.build(move |capabilities| {
        // This capability directly controls this operator's output frontier (because we have
        // disconnected the input above). We wrap it in an Option so we can drop it to advance to
        // the empty output frontier when the last refresh is done. (We must be careful that we only
        // ever emit output updates at times that are at or beyond this capability.)
        let mut capability = capabilities.into_iter().next(); // (We have exactly 1 output, so exactly 1 initial capability.)
        move |frontiers| {
            let mut output_handle_core = output_buf.activate();
            input.for_each(|input_cap, data| {
                // Note that we can't use `input_cap` to get an output session because we might have
                // advanced our output frontier already beyond the frontier of this capability.

                // `capability` will be None if we are past the last refresh. We have made sure to
                // not receive any data that is after the last refresh by setting the `until` of the
                // dataflow to the last refresh.
                let Some(capability) = capability.as_mut() else {
                    soft_panic_or_log!(
                        "should have a capability if we received data. input_cap: {:?}, frontier: {:?}",
                        input_cap.time(),
                        frontiers[0].frontier()
                    );
                    return;
                };
                // Open the session at our held (possibly already rounded-up) capability rather
                // than at `input_cap`; emitting below this capability would be a progress
                // violation.
                let mut output_buf = output_handle_core.session_with_builder(&capability);

                // 1-entry cache for the round-up computation, keyed on the most recently seen
                // input timestamp.
                let mut cached_ts: Option<Timestamp> = None;
                let mut cached_rounded_up_data_ts = None;
                for (d, ts, r) in data.drain(..) {
                    let rounded_up_data_ts = {
                        // We cache the rounded up timestamp for the last seen timestamp,
                        // because the rounding up has a non-negligible cost. Caching for
                        // just the 1 last timestamp helps already, because in some common
                        // cases, we'll be seeing a lot of identical timestamps, e.g.,
                        // during a rehydration, or when we have much more than 1000 records
                        // during a single second.
                        if cached_ts != Some(ts) {
                            cached_ts = Some(ts);
                            cached_rounded_up_data_ts = refresh_schedule.round_up_timestamp(ts);
                        }
                        cached_rounded_up_data_ts
                    };
                    match rounded_up_data_ts {
                        Some(rounded_up_ts) => {
                            // Forward the record with its timestamp advanced to the next
                            // refresh time.
                            output_buf.give((d, rounded_up_ts, r));
                        }
                        None => {
                            // This record is after the last refresh, which is not possible because
                            // we set the dataflow `until` to the last refresh.
                            soft_panic_or_log!("Received data after the last refresh");
                        }
                    }
                }
            });

            // Round up the frontier.
            // Note that `round_up_timestamp` is monotonic. This is needed to ensure that the
            // timestamp (t) of any received data that has a larger timestamp than the original
            // frontier (f) will get rounded up to a time that is at least at the rounded up
            // frontier. In other words, monotonicity ensures that
            // when `t >= f` then `round_up_timestamp(t) >= round_up_timestamp(f)`.
            match frontiers[0].frontier().as_option() { // (We have only 1 input, so only 1 frontier.)
                Some(ts) => {
                    match refresh_schedule.round_up_timestamp(*ts) {
                        Some(rounded_up_ts) => {
                            // Advance our output frontier to the rounded-up time. `downgrade`
                            // never moves a capability backwards, so repeated calls with the
                            // same rounded-up time are harmless.
                            capability
                                .as_mut()
                                .expect("capability must exist if frontier is <= last refresh")
                                .downgrade(&rounded_up_ts);
                        }
                        None => {
                            // We are past the last refresh. Drop the capability to signal that we
                            // are done.
                            capability = None;
                            // We can only get here if we see the frontier advancing to a time after the last refresh,
                            // but not empty. This is ok, because even though we set the `until` to the last refresh,
                            // frontier advancements might still happen past the `until`.
                        }
                    }
                }
                None => {
                    // The input is complete (empty frontier). Release the capability so our
                    // output frontier also becomes empty and downstream can shut down.
                    capability = None;
                }
            }
        }
    });

    output_stream.as_collection()
}