// mz_persist_client/internal/maintenance.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! All machines need maintenance
//!
//! Maintenance operations for persist, shared among active handles

14use std::fmt::Debug;
15use std::mem;
16use std::sync::Arc;
17
18use differential_dataflow::difference::Semigroup;
19use differential_dataflow::lattice::Lattice;
20use futures_util::FutureExt;
21use futures_util::future::BoxFuture;
22use mz_persist::location::SeqNo;
23use mz_persist_types::{Codec, Codec64};
24use timely::progress::Timestamp;
25
26use crate::internal::compact::CompactReq;
27use crate::internal::gc::GcReq;
28use crate::{Compactor, GarbageCollector, Machine};
29
30/// Every handle to this shard may be occasionally asked to perform
31/// routine maintenance after a successful compare_and_set operation.
32///
33/// For one-shot operations (like registering a reader) handles are
34/// allowed to skip routine maintenance if necessary, as the same
35/// maintenance operations will be recomputed by the next successful
36/// compare_and_set of any handle.
37///
38/// Operations that run regularly once a handle is registered, such
39/// as heartbeats, are expected to always perform maintenance.
40#[must_use]
41#[derive(Debug, Default, PartialEq)]
42pub struct RoutineMaintenance {
43    pub(crate) garbage_collection: Option<GcReq>,
44    pub(crate) write_rollup: Option<SeqNo>,
45}
46
47impl RoutineMaintenance {
48    pub(crate) fn is_empty(&self) -> bool {
49        self == &RoutineMaintenance::default()
50    }
51
52    /// Initiates any routine maintenance necessary in background tasks
53    pub(crate) fn start_performing<K, V, T, D>(
54        self,
55        machine: &Machine<K, V, T, D>,
56        gc: &GarbageCollector<K, V, T, D>,
57    ) where
58        K: Debug + Codec,
59        V: Debug + Codec,
60        T: Timestamp + Lattice + Codec64 + Sync,
61        D: Semigroup + Codec64 + Send + Sync,
62    {
63        let _ = self.perform_in_background(machine, gc);
64    }
65
66    /// Performs any routine maintenance necessary. Returns when all background
67    /// tasks have completed and the maintenance is done.
68    ///
69    /// Generally, clients should prefer `start_performing` since it's typically not critical
70    /// that maintenance is performed by any particular deadline.
71    pub(crate) async fn perform<K, V, T, D>(
72        self,
73        machine: &Machine<K, V, T, D>,
74        gc: &GarbageCollector<K, V, T, D>,
75    ) where
76        K: Debug + Codec,
77        V: Debug + Codec,
78        T: Timestamp + Lattice + Codec64 + Sync,
79        D: Semigroup + Codec64 + Send + Sync,
80    {
81        let mut more_maintenance = RoutineMaintenance::default();
82        for future in self.perform_in_background(machine, gc) {
83            more_maintenance.merge(future.await);
84        }
85
86        while !more_maintenance.is_empty() {
87            let maintenance = mem::take(&mut more_maintenance);
88            for future in maintenance.perform_in_background(machine, gc) {
89                more_maintenance.merge(future.await);
90            }
91        }
92    }
93
94    /// Initiates maintenance work in the background, either through spawned tasks
95    /// or by sending messages to existing tasks. The returned futures may be
96    /// awaited to know when the work is completed, but do not need to be polled
97    /// to drive the work to completion.
98    fn perform_in_background<K, V, T, D>(
99        self,
100        machine: &Machine<K, V, T, D>,
101        gc: &GarbageCollector<K, V, T, D>,
102    ) -> Vec<BoxFuture<'static, RoutineMaintenance>>
103    where
104        K: Debug + Codec,
105        V: Debug + Codec,
106        T: Timestamp + Lattice + Codec64 + Sync,
107        D: Semigroup + Codec64 + Send + Sync,
108    {
109        let mut futures = vec![];
110        if let Some(gc_req) = self.garbage_collection {
111            if let Some(recv) = gc.gc_and_truncate_background(gc_req) {
112                // it's safe to ignore errors on the receiver. in the
113                // case of shutdown, the sender may have been dropped
114                futures.push(recv.map(Result::unwrap_or_default).boxed());
115            }
116        }
117
118        if let Some(rollup_seqno) = self.write_rollup {
119            let machine = machine.clone();
120            let isolated_runtime = Arc::clone(&machine.isolated_runtime);
121            futures.push(
122                isolated_runtime
123                    .spawn_named(|| "persist::write_rollup", async move {
124                        machine
125                            .applier
126                            .fetch_and_update_state(Some(rollup_seqno))
127                            .await;
128                        // We don't have to write at exactly rollup_seqno, just need
129                        // something recent.
130                        assert!(
131                            machine.seqno() >= rollup_seqno,
132                            "{} vs {}",
133                            machine.seqno(),
134                            rollup_seqno
135                        );
136                        machine.add_rollup_for_current_seqno().await
137                    })
138                    .map(Result::unwrap_or_default)
139                    .boxed(),
140            );
141        }
142
143        futures
144    }
145
146    /// Merges another maintenance request into this one.
147    ///
148    /// `other` is expected to come "later", and so its maintenance might
149    /// override `self`'s maintenance.
150    pub fn merge(&mut self, other: RoutineMaintenance) {
151        // Deconstruct other so we get a compile failure if new fields are
152        // added.
153        let RoutineMaintenance {
154            garbage_collection,
155            write_rollup,
156        } = other;
157        if let Some(garbage_collection) = garbage_collection {
158            self.garbage_collection = Some(garbage_collection);
159        }
160        if let Some(write_rollup) = write_rollup {
161            self.write_rollup = Some(write_rollup);
162        }
163    }
164}
165
166/// Writers may be asked to perform additional tasks beyond the
167/// routine maintenance common to all handles. It is expected that
168/// writers always perform maintenance.
169#[must_use]
170#[derive(Debug)]
171pub struct WriterMaintenance<T> {
172    pub(crate) routine: RoutineMaintenance,
173    pub(crate) compaction: Vec<CompactReq<T>>,
174}
175
176impl<T> Default for WriterMaintenance<T> {
177    fn default() -> Self {
178        Self {
179            routine: RoutineMaintenance::default(),
180            compaction: Vec::default(),
181        }
182    }
183}
184
185impl<T> WriterMaintenance<T>
186where
187    T: Timestamp + Lattice + Codec64 + Sync,
188{
189    /// Initiates any writer maintenance necessary in background tasks
190    pub(crate) fn start_performing<K, V, D>(
191        self,
192        machine: &Machine<K, V, T, D>,
193        gc: &GarbageCollector<K, V, T, D>,
194        compactor: Option<&Compactor<K, V, T, D>>,
195    ) where
196        K: Debug + Codec,
197        V: Debug + Codec,
198        D: Semigroup + Ord + Codec64 + Send + Sync,
199    {
200        let machine = machine.clone();
201        let gc = gc.clone();
202        let compactor = compactor.cloned();
203        mz_ore::task::spawn(|| "writer-maintenance", async move {
204            self.perform(&machine, &gc, compactor.as_ref()).await
205        });
206    }
207
208    /// Performs any writer maintenance necessary. Returns when all background
209    /// tasks have completed and the maintenance is done.
210    pub(crate) async fn perform<K, V, D>(
211        self,
212        machine: &Machine<K, V, T, D>,
213        gc: &GarbageCollector<K, V, T, D>,
214        compactor: Option<&Compactor<K, V, T, D>>,
215    ) where
216        K: Debug + Codec,
217        V: Debug + Codec,
218        D: Semigroup + Ord + Codec64 + Send + Sync,
219    {
220        let Self {
221            routine,
222            compaction,
223        } = self;
224        let mut more_maintenance = RoutineMaintenance::default();
225        for future in routine.perform_in_background(machine, gc) {
226            more_maintenance.merge(future.await);
227        }
228
229        if let Some(compactor) = compactor {
230            for req in compaction {
231                if let Some(receiver) = compactor.compact_and_apply_background(req, machine) {
232                    // it's safe to ignore errors on the receiver. in the
233                    // case of shutdown, the sender may have been dropped
234                    let _ = receiver.await;
235                }
236            }
237        }
238
239        while !more_maintenance.is_empty() {
240            let maintenance = mem::take(&mut more_maintenance);
241            for future in maintenance.perform_in_background(machine, gc) {
242                more_maintenance.merge(future.await);
243            }
244        }
245    }
246}