mz_compute/sink/refresh.rs
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
use differential_dataflow::{AsCollection, Collection, Data};
use mz_ore::soft_panic_or_log;
use mz_repr::refresh_schedule::RefreshSchedule;
use mz_repr::{Diff, Timestamp};
use timely::dataflow::Scope;
use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;

/// This is for REFRESH options on materialized views. It adds an operator that rounds up the
/// timestamps of data and frontiers to the time of the next refresh. See
/// `doc/developer/design/20231027_refresh_every_mv.md`.
///
/// Note that this currently only works with 1-dim timestamps. (This is not an issue for WMR,
/// because iteration numbers should disappear by the time the data gets to the Persist sink.)
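///
/// A minimal call-site sketch (hypothetical; `coll` and `schedule` stand in for values
/// produced by the surrounding rendering code):
///
/// ```ignore
/// let refreshed = apply_refresh(coll, schedule);
/// // `refreshed` carries the same updates, with timestamps rounded up to refresh times.
/// ```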
pub(crate) fn apply_refresh<G, D>(
    coll: Collection<G, D, Diff>,
    refresh_schedule: RefreshSchedule,
) -> Collection<G, D, Diff>
where
    G: Scope<Timestamp = Timestamp>,
    D: Data,
{
    // We need to disconnect the reachability graph and manage capabilities manually, because we'd
    // like to round up frontiers as well as data: as soon as our input frontier passes a refresh
    // time, we'll round it up to the next refresh time.
    let mut builder = OperatorBuilder::new("apply_refresh".to_string(), coll.scope());
    let (mut output_buf, output_stream) = builder.new_output::<ConsolidatingContainerBuilder<_>>();
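    // The empty connection summary (`[]`) is what disconnects this input from the output in
    // the reachability graph, so the output frontier is driven solely by the capability we
    // manage below.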
    let mut input = builder.new_input_connection(&coll.inner, Pipeline, []);
    builder.build(move |capabilities| {
        // This capability directly controls this operator's output frontier (because we have
        // disconnected the input above). We wrap it in an Option so we can drop it to advance to
        // the empty output frontier when the last refresh is done. (We must be careful that we
        // only ever emit output updates at times that are at or beyond this capability.)
        let mut capability = capabilities.into_iter().next(); // (We have 1 output.)
        move |frontiers| {
            let mut output_handle_core = output_buf.activate();
            input.for_each(|input_cap, data| {
                // Note that we can't use `input_cap` to get an output session, because we might
                // already have advanced our output frontier beyond the frontier of this
                // capability.

                // `capability` will be None if we are past the last refresh. We have made sure
                // not to receive any data after the last refresh by setting the dataflow's
                // `until` to the last refresh.
                let Some(capability) = capability.as_mut() else {
                    soft_panic_or_log!(
                        "should have a capability if we received data. input_cap: {:?}, frontier: {:?}",
                        input_cap.time(),
                        frontiers[0].frontier()
                    );
                    return;
                };
                let mut output_buf = output_handle_core.session_with_builder(&capability);

                let mut cached_ts: Option<Timestamp> = None;
                let mut cached_rounded_up_data_ts = None;
                for (d, ts, r) in data.drain(..) {
                    let rounded_up_data_ts = {
                        // We cache the rounded-up timestamp of the last seen timestamp,
                        // because rounding up has a non-negligible cost. Caching just the
                        // single last timestamp already helps, because in some common cases
                        // we see long runs of identical timestamps, e.g., during rehydration,
                        // or when there are many more than 1000 records within a single
                        // second.
                        if cached_ts != Some(ts) {
                            cached_ts = Some(ts);
                            cached_rounded_up_data_ts = refresh_schedule.round_up_timestamp(ts);
                        }
                        cached_rounded_up_data_ts
                    };
                    match rounded_up_data_ts {
                        Some(rounded_up_ts) => {
                            output_buf.give((d, rounded_up_ts, r));
                        }
                        None => {
                            // This record is after the last refresh, which is not possible,
                            // because we set the dataflow `until` to the last refresh.
                            soft_panic_or_log!("Received data after the last refresh");
                        }
                    }
                }
            });

            // Round up the frontier.
            //
            // Note that `round_up_timestamp` is monotone. This is needed to ensure that the
            // timestamp (t) of any received update that is at or beyond the original
            // frontier (f) gets rounded up to a time that is at or beyond the rounded-up
            // frontier. In other words, monotonicity ensures that if `t >= f`, then
            // `round_up_timestamp(t) >= round_up_timestamp(f)`.
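            // For example, with a hypothetical hourly schedule, an update at 10:34 and a
            // frontier element at 10:20 both round up to 11:00, so the rounded-up update
            // is still at or beyond the rounded-up frontier.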
            // (We have only 1 input, so only 1 frontier.)
            match frontiers[0].frontier().as_option() {
                Some(ts) => {
                    match refresh_schedule.round_up_timestamp(*ts) {
                        Some(rounded_up_ts) => {
                            capability
                                .as_mut()
                                .expect("capability must exist if frontier is <= last refresh")
                                .downgrade(&rounded_up_ts);
                        }
                        None => {
                            // We are past the last refresh. Drop the capability to signal
                            // that we are done.
                            capability = None;
                            // We can only get here if the frontier advances to a non-empty
                            // time past the last refresh. This is ok: even though we set
                            // the `until` to the last refresh, frontier advancements can
                            // still happen past the `until`.
                        }
                    }
                }
                None => {
                    capability = None;
                }
            }
        }
    });

    output_stream.as_collection()
}
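
// A hedged sketch (not part of the original file) of the `round_up_timestamp` behavior that
// both the data path and the frontier path above rely on: every timestamp rounds up to the
// next refresh at or after it, and there is nothing to round up to past the last refresh.
// Building the schedule via a struct literal with `everies`/`ats` fields is an assumption
// about `RefreshSchedule`; adjust if its constructor differs.
#[cfg(test)]
mod refresh_sketch_tests {
    use super::*;

    #[test]
    fn round_up_timestamp_rounds_to_next_refresh() {
        // Two one-off refreshes, at times 100 and 200 (field names are assumptions).
        let schedule = RefreshSchedule {
            everies: Vec::new(),
            ats: vec![Timestamp::from(100u64), Timestamp::from(200u64)],
        };
        // Times before a refresh round up to the next refresh...
        assert_eq!(schedule.round_up_timestamp(42u64.into()), Some(100u64.into()));
        assert_eq!(schedule.round_up_timestamp(137u64.into()), Some(200u64.into()));
        // ...and times past the last refresh yield `None`.
        assert_eq!(schedule.round_up_timestamp(201u64.into()), None);
    }
}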