mz_persist_client/internal/
maintenance.rs
1use std::fmt::Debug;
15use std::mem;
16use std::sync::Arc;
17
18use differential_dataflow::difference::Semigroup;
19use differential_dataflow::lattice::Lattice;
20use futures_util::FutureExt;
21use futures_util::future::BoxFuture;
22use mz_persist::location::SeqNo;
23use mz_persist_types::{Codec, Codec64};
24use timely::progress::Timestamp;
25
26use crate::internal::compact::CompactReq;
27use crate::internal::gc::GcReq;
28use crate::{Compactor, GarbageCollector, Machine};
29
30#[must_use]
41#[derive(Debug, Default, PartialEq)]
42pub struct RoutineMaintenance {
43 pub(crate) garbage_collection: Option<GcReq>,
44 pub(crate) write_rollup: Option<SeqNo>,
45}
46
47impl RoutineMaintenance {
48 pub(crate) fn is_empty(&self) -> bool {
49 self == &RoutineMaintenance::default()
50 }
51
52 pub(crate) fn start_performing<K, V, T, D>(
54 self,
55 machine: &Machine<K, V, T, D>,
56 gc: &GarbageCollector<K, V, T, D>,
57 ) where
58 K: Debug + Codec,
59 V: Debug + Codec,
60 T: Timestamp + Lattice + Codec64 + Sync,
61 D: Semigroup + Codec64 + Send + Sync,
62 {
63 let _ = self.perform_in_background(machine, gc);
64 }
65
66 pub(crate) async fn perform<K, V, T, D>(
72 self,
73 machine: &Machine<K, V, T, D>,
74 gc: &GarbageCollector<K, V, T, D>,
75 ) where
76 K: Debug + Codec,
77 V: Debug + Codec,
78 T: Timestamp + Lattice + Codec64 + Sync,
79 D: Semigroup + Codec64 + Send + Sync,
80 {
81 let mut more_maintenance = RoutineMaintenance::default();
82 for future in self.perform_in_background(machine, gc) {
83 more_maintenance.merge(future.await);
84 }
85
86 while !more_maintenance.is_empty() {
87 let maintenance = mem::take(&mut more_maintenance);
88 for future in maintenance.perform_in_background(machine, gc) {
89 more_maintenance.merge(future.await);
90 }
91 }
92 }
93
94 fn perform_in_background<K, V, T, D>(
99 self,
100 machine: &Machine<K, V, T, D>,
101 gc: &GarbageCollector<K, V, T, D>,
102 ) -> Vec<BoxFuture<'static, RoutineMaintenance>>
103 where
104 K: Debug + Codec,
105 V: Debug + Codec,
106 T: Timestamp + Lattice + Codec64 + Sync,
107 D: Semigroup + Codec64 + Send + Sync,
108 {
109 let mut futures = vec![];
110 if let Some(gc_req) = self.garbage_collection {
111 if let Some(recv) = gc.gc_and_truncate_background(gc_req) {
112 futures.push(recv.map(Result::unwrap_or_default).boxed());
115 }
116 }
117
118 if let Some(rollup_seqno) = self.write_rollup {
119 let machine = machine.clone();
120 let isolated_runtime = Arc::clone(&machine.isolated_runtime);
121 futures.push(
122 isolated_runtime
123 .spawn_named(|| "persist::write_rollup", async move {
124 machine
125 .applier
126 .fetch_and_update_state(Some(rollup_seqno))
127 .await;
128 assert!(
131 machine.seqno() >= rollup_seqno,
132 "{} vs {}",
133 machine.seqno(),
134 rollup_seqno
135 );
136 machine.add_rollup_for_current_seqno().await
137 })
138 .map(Result::unwrap_or_default)
139 .boxed(),
140 );
141 }
142
143 futures
144 }
145
146 pub fn merge(&mut self, other: RoutineMaintenance) {
151 let RoutineMaintenance {
154 garbage_collection,
155 write_rollup,
156 } = other;
157 if let Some(garbage_collection) = garbage_collection {
158 self.garbage_collection = Some(garbage_collection);
159 }
160 if let Some(write_rollup) = write_rollup {
161 self.write_rollup = Some(write_rollup);
162 }
163 }
164}
165
166#[must_use]
170#[derive(Debug)]
171pub struct WriterMaintenance<T> {
172 pub(crate) routine: RoutineMaintenance,
173 pub(crate) compaction: Vec<CompactReq<T>>,
174}
175
176impl<T> Default for WriterMaintenance<T> {
177 fn default() -> Self {
178 Self {
179 routine: RoutineMaintenance::default(),
180 compaction: Vec::default(),
181 }
182 }
183}
184
185impl<T> WriterMaintenance<T>
186where
187 T: Timestamp + Lattice + Codec64 + Sync,
188{
189 pub(crate) fn start_performing<K, V, D>(
191 self,
192 machine: &Machine<K, V, T, D>,
193 gc: &GarbageCollector<K, V, T, D>,
194 compactor: Option<&Compactor<K, V, T, D>>,
195 ) where
196 K: Debug + Codec,
197 V: Debug + Codec,
198 D: Semigroup + Ord + Codec64 + Send + Sync,
199 {
200 let machine = machine.clone();
201 let gc = gc.clone();
202 let compactor = compactor.cloned();
203 mz_ore::task::spawn(|| "writer-maintenance", async move {
204 self.perform(&machine, &gc, compactor.as_ref()).await
205 });
206 }
207
208 pub(crate) async fn perform<K, V, D>(
211 self,
212 machine: &Machine<K, V, T, D>,
213 gc: &GarbageCollector<K, V, T, D>,
214 compactor: Option<&Compactor<K, V, T, D>>,
215 ) where
216 K: Debug + Codec,
217 V: Debug + Codec,
218 D: Semigroup + Ord + Codec64 + Send + Sync,
219 {
220 let Self {
221 routine,
222 compaction,
223 } = self;
224 let mut more_maintenance = RoutineMaintenance::default();
225 for future in routine.perform_in_background(machine, gc) {
226 more_maintenance.merge(future.await);
227 }
228
229 if let Some(compactor) = compactor {
230 for req in compaction {
231 if let Some(receiver) = compactor.compact_and_apply_background(req, machine) {
232 let _ = receiver.await;
235 }
236 }
237 }
238
239 while !more_maintenance.is_empty() {
240 let maintenance = mem::take(&mut more_maintenance);
241 for future in maintenance.perform_in_background(machine, gc) {
242 more_maintenance.merge(future.await);
243 }
244 }
245 }
246}