mz_persist_client/internal/maintenance.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! All machines need maintenance
11//!
12//! Maintenance operations for persist, shared among active handles
13
14use std::fmt::Debug;
15use std::mem;
16use std::sync::Arc;
17
18use differential_dataflow::difference::Monoid;
19use differential_dataflow::lattice::Lattice;
20use futures_util::FutureExt;
21use futures_util::future::BoxFuture;
22use mz_persist::location::SeqNo;
23use mz_persist_types::{Codec, Codec64};
24use timely::progress::Timestamp;
25
26use crate::internal::compact::CompactReq;
27use crate::internal::gc::GcReq;
28use crate::{Compactor, GarbageCollector, Machine};
29
30/// Every handle to this shard may be occasionally asked to perform
31/// routine maintenance after a successful compare_and_set operation.
32///
33/// For one-shot operations (like registering a reader) handles are
34/// allowed to skip routine maintenance if necessary, as the same
35/// maintenance operations will be recomputed by the next successful
36/// compare_and_set of any handle.
37///
38/// Operations that run regularly once a handle is registered, such
39/// as heartbeats, are expected to always perform maintenance.
40#[must_use]
41#[derive(Debug, Default, PartialEq)]
42pub struct RoutineMaintenance {
43    pub(crate) garbage_collection: Option<GcReq>,
44    pub(crate) write_rollup: Option<SeqNo>,
45}
46
47impl RoutineMaintenance {
48    pub(crate) fn is_empty(&self) -> bool {
49        self == &RoutineMaintenance::default()
50    }
51
52    /// Initiates any routine maintenance necessary in background tasks
53    pub(crate) fn start_performing<K, V, T, D>(
54        self,
55        machine: &Machine<K, V, T, D>,
56        gc: &GarbageCollector<K, V, T, D>,
57    ) where
58        K: Debug + Codec,
59        V: Debug + Codec,
60        T: Timestamp + Lattice + Codec64 + Sync,
61        D: Monoid + Codec64 + Send + Sync,
62    {
63        let _ = self.perform_in_background(machine, gc);
64    }
65
66    /// Performs any routine maintenance necessary. Returns when all background
67    /// tasks have completed and the maintenance is done.
68    ///
69    /// Generally, clients should prefer `start_performing` since it's typically not critical
70    /// that maintenance is performed by any particular deadline.
71    pub(crate) async fn perform<K, V, T, D>(
72        self,
73        machine: &Machine<K, V, T, D>,
74        gc: &GarbageCollector<K, V, T, D>,
75    ) where
76        K: Debug + Codec,
77        V: Debug + Codec,
78        T: Timestamp + Lattice + Codec64 + Sync,
79        D: Monoid + Codec64 + Send + Sync,
80    {
81        let mut more_maintenance = RoutineMaintenance::default();
82        for future in self.perform_in_background(machine, gc) {
83            more_maintenance.merge(future.await);
84        }
85
86        while !more_maintenance.is_empty() {
87            let maintenance = mem::take(&mut more_maintenance);
88            for future in maintenance.perform_in_background(machine, gc) {
89                more_maintenance.merge(future.await);
90            }
91        }
92    }
93
94    /// Initiates maintenance work in the background, either through spawned tasks
95    /// or by sending messages to existing tasks. The returned futures may be
96    /// awaited to know when the work is completed, but do not need to be polled
97    /// to drive the work to completion.
98    fn perform_in_background<K, V, T, D>(
99        self,
100        machine: &Machine<K, V, T, D>,
101        gc: &GarbageCollector<K, V, T, D>,
102    ) -> Vec<BoxFuture<'static, RoutineMaintenance>>
103    where
104        K: Debug + Codec,
105        V: Debug + Codec,
106        T: Timestamp + Lattice + Codec64 + Sync,
107        D: Monoid + Codec64 + Send + Sync,
108    {
109        let mut futures = vec![];
110        if let Some(gc_req) = self.garbage_collection {
111            if let Some(recv) = gc.gc_and_truncate_background(gc_req) {
112                // it's safe to ignore errors on the receiver. in the
113                // case of shutdown, the sender may have been dropped
114                futures.push(recv.map(Result::unwrap_or_default).boxed());
115            }
116        }
117
118        if let Some(rollup_seqno) = self.write_rollup {
119            let machine = machine.clone();
120            let isolated_runtime = Arc::clone(&machine.isolated_runtime);
121            futures.push(
122                isolated_runtime
123                    .spawn_named(|| "persist::write_rollup", async move {
124                        machine
125                            .applier
126                            .fetch_and_update_state(Some(rollup_seqno))
127                            .await;
128                        // We don't have to write at exactly rollup_seqno, just need
129                        // something recent.
130                        assert!(
131                            machine.seqno() >= rollup_seqno,
132                            "{} vs {}",
133                            machine.seqno(),
134                            rollup_seqno
135                        );
136                        machine.add_rollup_for_current_seqno().await
137                    })
138                    .boxed(),
139            );
140        }
141
142        futures
143    }
144
145    /// Merges another maintenance request into this one.
146    ///
147    /// `other` is expected to come "later", and so its maintenance might
148    /// override `self`'s maintenance.
149    pub fn merge(&mut self, other: RoutineMaintenance) {
150        // Deconstruct other so we get a compile failure if new fields are
151        // added.
152        let RoutineMaintenance {
153            garbage_collection,
154            write_rollup,
155        } = other;
156        if let Some(garbage_collection) = garbage_collection {
157            self.garbage_collection = Some(garbage_collection);
158        }
159        if let Some(write_rollup) = write_rollup {
160            self.write_rollup = Some(write_rollup);
161        }
162    }
163}
164
165/// Writers may be asked to perform additional tasks beyond the
166/// routine maintenance common to all handles. It is expected that
167/// writers always perform maintenance.
168#[must_use]
169#[derive(Debug)]
170pub struct WriterMaintenance<T> {
171    pub(crate) routine: RoutineMaintenance,
172    pub(crate) compaction: Vec<CompactReq<T>>,
173}
174
175impl<T> Default for WriterMaintenance<T> {
176    fn default() -> Self {
177        Self {
178            routine: RoutineMaintenance::default(),
179            compaction: Vec::default(),
180        }
181    }
182}
183
184impl<T> WriterMaintenance<T>
185where
186    T: Timestamp + Lattice + Codec64 + Sync,
187{
188    /// Initiates any writer maintenance necessary in background tasks
189    pub(crate) fn start_performing<K, V, D>(
190        self,
191        machine: &Machine<K, V, T, D>,
192        gc: &GarbageCollector<K, V, T, D>,
193        compactor: Option<&Compactor<K, V, T, D>>,
194    ) where
195        K: Debug + Codec,
196        V: Debug + Codec,
197        D: Monoid + Ord + Codec64 + Send + Sync,
198    {
199        let machine = machine.clone();
200        let gc = gc.clone();
201        let compactor = compactor.cloned();
202        mz_ore::task::spawn(|| "writer-maintenance", async move {
203            self.perform(&machine, &gc, compactor.as_ref()).await
204        });
205    }
206
207    /// Performs any writer maintenance necessary. Returns when all background
208    /// tasks have completed and the maintenance is done.
209    pub(crate) async fn perform<K, V, D>(
210        self,
211        machine: &Machine<K, V, T, D>,
212        gc: &GarbageCollector<K, V, T, D>,
213        compactor: Option<&Compactor<K, V, T, D>>,
214    ) where
215        K: Debug + Codec,
216        V: Debug + Codec,
217        D: Monoid + Ord + Codec64 + Send + Sync,
218    {
219        let Self {
220            routine,
221            compaction,
222        } = self;
223        let mut more_maintenance = RoutineMaintenance::default();
224        for future in routine.perform_in_background(machine, gc) {
225            more_maintenance.merge(future.await);
226        }
227
228        if let Some(compactor) = compactor {
229            for req in compaction {
230                if let Some(receiver) = compactor.compact_and_apply_background(req, machine) {
231                    // it's safe to ignore errors on the receiver. in the
232                    // case of shutdown, the sender may have been dropped
233                    let _ = receiver.await;
234                }
235            }
236        }
237
238        while !more_maintenance.is_empty() {
239            let maintenance = mem::take(&mut more_maintenance);
240            for future in maintenance.perform_in_background(machine, gc) {
241                more_maintenance.merge(future.await);
242            }
243        }
244    }
245}