mz_persist_client/internal/
maintenance.rs1use std::fmt::Debug;
15use std::mem;
16use std::sync::Arc;
17
18use differential_dataflow::difference::Monoid;
19use differential_dataflow::lattice::Lattice;
20use futures_util::FutureExt;
21use futures_util::future::BoxFuture;
22use mz_persist::location::SeqNo;
23use mz_persist_types::{Codec, Codec64};
24use timely::progress::Timestamp;
25
26use crate::internal::compact::CompactReq;
27use crate::internal::gc::GcReq;
28use crate::{Compactor, GarbageCollector, Machine};
29
30#[must_use]
41#[derive(Debug, Default, PartialEq)]
42pub struct RoutineMaintenance {
43 pub(crate) garbage_collection: Option<GcReq>,
44 pub(crate) write_rollup: Option<SeqNo>,
45}
46
47impl RoutineMaintenance {
48 pub(crate) fn is_empty(&self) -> bool {
49 self == &RoutineMaintenance::default()
50 }
51
52 pub(crate) fn start_performing<K, V, T, D>(
54 self,
55 machine: &Machine<K, V, T, D>,
56 gc: &GarbageCollector<K, V, T, D>,
57 ) where
58 K: Debug + Codec,
59 V: Debug + Codec,
60 T: Timestamp + Lattice + Codec64 + Sync,
61 D: Monoid + Codec64 + Send + Sync,
62 {
63 let _ = self.perform_in_background(machine, gc);
64 }
65
66 pub(crate) async fn perform<K, V, T, D>(
72 self,
73 machine: &Machine<K, V, T, D>,
74 gc: &GarbageCollector<K, V, T, D>,
75 ) where
76 K: Debug + Codec,
77 V: Debug + Codec,
78 T: Timestamp + Lattice + Codec64 + Sync,
79 D: Monoid + Codec64 + Send + Sync,
80 {
81 let mut more_maintenance = RoutineMaintenance::default();
82 for future in self.perform_in_background(machine, gc) {
83 more_maintenance.merge(future.await);
84 }
85
86 while !more_maintenance.is_empty() {
87 let maintenance = mem::take(&mut more_maintenance);
88 for future in maintenance.perform_in_background(machine, gc) {
89 more_maintenance.merge(future.await);
90 }
91 }
92 }
93
94 fn perform_in_background<K, V, T, D>(
99 self,
100 machine: &Machine<K, V, T, D>,
101 gc: &GarbageCollector<K, V, T, D>,
102 ) -> Vec<BoxFuture<'static, RoutineMaintenance>>
103 where
104 K: Debug + Codec,
105 V: Debug + Codec,
106 T: Timestamp + Lattice + Codec64 + Sync,
107 D: Monoid + Codec64 + Send + Sync,
108 {
109 let mut futures = vec![];
110 if let Some(gc_req) = self.garbage_collection {
111 if let Some(recv) = gc.gc_and_truncate_background(gc_req) {
112 futures.push(recv.map(Result::unwrap_or_default).boxed());
115 }
116 }
117
118 if let Some(rollup_seqno) = self.write_rollup {
119 let machine = machine.clone();
120 let isolated_runtime = Arc::clone(&machine.isolated_runtime);
121 futures.push(
122 isolated_runtime
123 .spawn_named(|| "persist::write_rollup", async move {
124 machine
125 .applier
126 .fetch_and_update_state(Some(rollup_seqno))
127 .await;
128 assert!(
131 machine.seqno() >= rollup_seqno,
132 "{} vs {}",
133 machine.seqno(),
134 rollup_seqno
135 );
136 machine.add_rollup_for_current_seqno().await
137 })
138 .boxed(),
139 );
140 }
141
142 futures
143 }
144
145 pub fn merge(&mut self, other: RoutineMaintenance) {
150 let RoutineMaintenance {
153 garbage_collection,
154 write_rollup,
155 } = other;
156 if let Some(garbage_collection) = garbage_collection {
157 self.garbage_collection = Some(garbage_collection);
158 }
159 if let Some(write_rollup) = write_rollup {
160 self.write_rollup = Some(write_rollup);
161 }
162 }
163}
164
165#[must_use]
169#[derive(Debug)]
170pub struct WriterMaintenance<T> {
171 pub(crate) routine: RoutineMaintenance,
172 pub(crate) compaction: Vec<CompactReq<T>>,
173}
174
175impl<T> Default for WriterMaintenance<T> {
176 fn default() -> Self {
177 Self {
178 routine: RoutineMaintenance::default(),
179 compaction: Vec::default(),
180 }
181 }
182}
183
184impl<T> WriterMaintenance<T>
185where
186 T: Timestamp + Lattice + Codec64 + Sync,
187{
188 pub(crate) fn start_performing<K, V, D>(
190 self,
191 machine: &Machine<K, V, T, D>,
192 gc: &GarbageCollector<K, V, T, D>,
193 compactor: Option<&Compactor<K, V, T, D>>,
194 ) where
195 K: Debug + Codec,
196 V: Debug + Codec,
197 D: Monoid + Ord + Codec64 + Send + Sync,
198 {
199 let machine = machine.clone();
200 let gc = gc.clone();
201 let compactor = compactor.cloned();
202 mz_ore::task::spawn(|| "writer-maintenance", async move {
203 self.perform(&machine, &gc, compactor.as_ref()).await
204 });
205 }
206
207 pub(crate) async fn perform<K, V, D>(
210 self,
211 machine: &Machine<K, V, T, D>,
212 gc: &GarbageCollector<K, V, T, D>,
213 compactor: Option<&Compactor<K, V, T, D>>,
214 ) where
215 K: Debug + Codec,
216 V: Debug + Codec,
217 D: Monoid + Ord + Codec64 + Send + Sync,
218 {
219 let Self {
220 routine,
221 compaction,
222 } = self;
223 let mut more_maintenance = RoutineMaintenance::default();
224 for future in routine.perform_in_background(machine, gc) {
225 more_maintenance.merge(future.await);
226 }
227
228 if let Some(compactor) = compactor {
229 for req in compaction {
230 if let Some(receiver) = compactor.compact_and_apply_background(req, machine) {
231 let _ = receiver.await;
234 }
235 }
236 }
237
238 while !more_maintenance.is_empty() {
239 let maintenance = mem::take(&mut more_maintenance);
240 for future in maintenance.perform_in_background(machine, gc) {
241 more_maintenance.merge(future.await);
242 }
243 }
244 }
245}