1use std::collections::{BTreeMap, BTreeSet};
18use std::fmt::Debug;
19
20use differential_dataflow::lattice::Lattice;
21use itertools::Itertools;
22use mz_adapter_types::compaction::CompactionWindow;
23use mz_adapter_types::connection::ConnectionId;
24use mz_compute_types::ComputeInstanceId;
25use mz_ore::instrument;
26use mz_repr::{CatalogItemId, GlobalId, Timestamp};
27use mz_storage_types::read_holds::ReadHold;
28use mz_storage_types::read_policy::ReadPolicy;
29use timely::progress::Antichain;
30use timely::progress::Timestamp as _;
31
32use crate::coord::id_bundle::CollectionIdBundle;
33use crate::coord::timeline::{TimelineContext, TimelineState};
34use crate::util::ResultExt;
35
/// A collection of read holds on storage and compute collections.
///
/// Each hold pins the `since` frontier of the covered collection; the maps are
/// keyed by the collection's [`GlobalId`] (plus the [`ComputeInstanceId`] for
/// compute collections, since the same collection may exist on several
/// instances).
#[derive(Debug, Default, Clone)]
pub struct ReadHolds {
    /// Read holds on storage collections, keyed by collection ID.
    pub storage_holds: BTreeMap<GlobalId, ReadHold>,
    /// Read holds on compute collections, keyed by (instance, collection) ID.
    pub compute_holds: BTreeMap<(ComputeInstanceId, GlobalId), ReadHold>,
}
48
49impl ReadHolds {
50 pub fn new() -> Self {
52 ReadHolds {
53 storage_holds: BTreeMap::new(),
54 compute_holds: BTreeMap::new(),
55 }
56 }
57
58 pub fn is_empty(&self) -> bool {
59 self.storage_holds.is_empty() && self.compute_holds.is_empty()
60 }
61
62 pub fn storage_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
64 self.storage_holds.keys().copied()
65 }
66
67 pub fn compute_ids(&self) -> impl Iterator<Item = (ComputeInstanceId, GlobalId)> + '_ {
69 self.compute_holds.keys().copied()
70 }
71
72 pub fn id_bundle(&self) -> CollectionIdBundle {
75 let mut res = CollectionIdBundle::default();
76 for id in self.storage_ids() {
77 res.storage_ids.insert(id);
78 }
79 for (instance_id, id) in self.compute_ids() {
80 res.compute_ids.entry(instance_id).or_default().insert(id);
81 }
82
83 res
84 }
85
86 pub fn downgrade(&mut self, time: Timestamp) {
88 let frontier = Antichain::from_elem(time);
89 for hold in self.storage_holds.values_mut() {
90 let _ = hold.try_downgrade(frontier.clone());
91 }
92 for hold in self.compute_holds.values_mut() {
93 let _ = hold.try_downgrade(frontier.clone());
94 }
95 }
96
97 pub fn remove_storage_collection(&mut self, id: GlobalId) {
98 self.storage_holds.remove(&id);
99 }
100
101 pub fn remove_compute_collection(&mut self, instance_id: ComputeInstanceId, id: GlobalId) {
102 self.compute_holds.remove(&(instance_id, id));
103 }
104
105 pub fn subset(&self, id_bundle: &CollectionIdBundle) -> ReadHolds {
107 let mut result = ReadHolds::new();
108
109 for id in &id_bundle.storage_ids {
110 if let Some(hold) = self.storage_holds.get(id) {
111 result.storage_holds.insert(*id, hold.clone());
112 }
113 }
114
115 for (instance_id, ids) in &id_bundle.compute_ids {
116 for id in ids {
117 if let Some(hold) = self.compute_holds.get(&(*instance_id, *id)) {
118 result
119 .compute_holds
120 .insert((*instance_id, *id), hold.clone());
121 }
122 }
123 }
124
125 result
126 }
127}
128
impl ReadHolds {
    /// Returns the earliest frontier at which all held collections are
    /// readable: the lattice join of every contained hold's `since` frontier.
    ///
    /// Returns the minimum frontier when no holds are contained.
    pub fn least_valid_read(&self) -> Antichain<Timestamp> {
        let mut since = Antichain::from_elem(Timestamp::minimum());
        for hold in self.storage_holds.values() {
            since.join_assign(hold.since());
        }

        for hold in self.compute_holds.values() {
            since.join_assign(hold.since());
        }

        since
    }

    /// Returns the combined `since` frontier of all holds on `desired_id`,
    /// across the storage hold (if any) and the holds on every compute
    /// instance.
    ///
    /// Returns the empty antichain when we hold no hold on `desired_id`.
    pub fn since(&self, desired_id: &GlobalId) -> Antichain<Timestamp> {
        let mut since = Antichain::new();

        if let Some(hold) = self.storage_holds.get(desired_id) {
            since.extend(hold.since().iter().cloned());
        }

        // The same collection may be held on multiple compute instances; fold
        // in the `since` of every matching hold.
        for ((_instance, id), hold) in self.compute_holds.iter() {
            if id != desired_id {
                continue;
            }
            since.extend(hold.since().iter().cloned());
        }

        since
    }

    /// Merges all holds from `other` into `self`, combining holds on the same
    /// collection via `ReadHold::merge_assign`.
    fn merge(&mut self, other: Self) {
        use std::collections::btree_map::Entry;

        for (id, other_hold) in other.storage_holds {
            match self.storage_holds.entry(id) {
                Entry::Occupied(mut o) => {
                    o.get_mut().merge_assign(other_hold);
                }
                Entry::Vacant(v) => {
                    v.insert(other_hold);
                }
            }
        }
        for (id, other_hold) in other.compute_holds {
            match self.compute_holds.entry(id) {
                Entry::Occupied(mut o) => {
                    o.get_mut().merge_assign(other_hold);
                }
                Entry::Vacant(v) => {
                    v.insert(other_hold);
                }
            }
        }
    }

    /// Moves all holds from `other` into `self`.
    ///
    /// # Panics
    ///
    /// Panics if `self` and `other` both hold a hold on the same collection.
    /// Use [`ReadHolds::merge`] when overlap is possible.
    fn extend(&mut self, other: Self) {
        for (id, other_hold) in other.storage_holds {
            let prev = self.storage_holds.insert(id, other_hold);
            assert!(prev.is_none(), "duplicate storage read hold: {id}");
        }
        for (id, other_hold) in other.compute_holds {
            let prev = self.compute_holds.insert(id, other_hold);
            assert!(prev.is_none(), "duplicate compute read hold: {id:?}");
        }
    }
}
210
211impl crate::coord::Coordinator {
212 pub(crate) async fn initialize_storage_read_policies(
218 &mut self,
219 ids: BTreeSet<CatalogItemId>,
220 compaction_window: CompactionWindow,
221 ) {
222 let gids = ids
223 .into_iter()
224 .map(|item_id| self.catalog().get_entry(&item_id).global_ids())
225 .flatten()
226 .collect();
227 self.initialize_read_policies(
228 &CollectionIdBundle {
229 storage_ids: gids,
230 compute_ids: BTreeMap::new(),
231 },
232 compaction_window,
233 )
234 .await;
235 }
236
237 pub(crate) async fn initialize_compute_read_policies(
243 &mut self,
244 ids: Vec<GlobalId>,
245 instance: ComputeInstanceId,
246 compaction_window: CompactionWindow,
247 ) {
248 let mut compute_ids: BTreeMap<_, BTreeSet<_>> = BTreeMap::new();
249 compute_ids.insert(instance, ids.into_iter().collect());
250 self.initialize_read_policies(
251 &CollectionIdBundle {
252 storage_ids: BTreeSet::new(),
253 compute_ids,
254 },
255 compaction_window,
256 )
257 .await;
258 }
259
260 #[instrument(name = "coord::initialize_read_policies")]
266 pub(crate) async fn initialize_read_policies(
267 &mut self,
268 id_bundle: &CollectionIdBundle,
269 compaction_window: CompactionWindow,
270 ) {
271 for (timeline_context, id_bundle) in
273 self.catalog().partition_ids_by_timeline_context(id_bundle)
274 {
275 if let TimelineContext::TimelineDependent(timeline) = timeline_context {
276 let TimelineState { oracle, .. } = self.ensure_timeline_state(&timeline).await;
277 let read_ts = oracle.read_ts().await;
278
279 let mut new_read_holds = self.acquire_read_holds(&id_bundle);
280 new_read_holds.downgrade(read_ts);
281
282 let TimelineState { read_holds, .. } = self.ensure_timeline_state(&timeline).await;
283 read_holds.extend(new_read_holds);
284 }
285 }
286
287 let read_policy = ReadPolicy::from(compaction_window);
289
290 let storage_policies = id_bundle
291 .storage_ids
292 .iter()
293 .map(|id| (*id, read_policy.clone()))
294 .collect();
295 self.controller
296 .storage_collections
297 .set_read_policies(storage_policies);
298
299 for (instance_id, collection_ids) in &id_bundle.compute_ids {
300 let compute_policies = collection_ids
301 .iter()
302 .map(|id| (*id, read_policy.clone()))
303 .collect();
304 self.controller
305 .compute
306 .set_read_policy(*instance_id, compute_policies)
307 .expect("cannot fail to set read policy");
308 }
309 }
310
311 pub(crate) fn update_storage_read_policies(&self, policies: Vec<(CatalogItemId, ReadPolicy)>) {
312 let policies = policies
313 .into_iter()
314 .map(|(item_id, policy)| {
315 self.catalog()
317 .get_entry(&item_id)
318 .global_ids()
319 .map(move |gid| (gid, policy.clone()))
320 })
321 .flatten()
322 .collect();
323 self.controller
324 .storage_collections
325 .set_read_policies(policies);
326 }
327
328 pub(crate) fn update_compute_read_policies(
329 &self,
330 mut policies: Vec<(ComputeInstanceId, CatalogItemId, ReadPolicy)>,
331 ) {
332 policies.sort_by_key(|&(cluster_id, _, _)| cluster_id);
333 for (cluster_id, group) in &policies
334 .into_iter()
335 .chunk_by(|&(cluster_id, _, _)| cluster_id)
336 {
337 let group = group
338 .flat_map(|(_, item_id, policy)| {
339 self.catalog()
341 .get_entry(&item_id)
342 .global_ids()
343 .map(move |gid| (gid, policy.clone()))
344 })
345 .collect();
346 self.controller
347 .compute
348 .set_read_policy(cluster_id, group)
349 .unwrap_or_terminate("cannot fail to set read policy");
350 }
351 }
352
353 pub(crate) fn update_compute_read_policy(
354 &self,
355 compute_instance: ComputeInstanceId,
356 item_id: CatalogItemId,
357 base_policy: ReadPolicy,
358 ) {
359 self.update_compute_read_policies(vec![(compute_instance, item_id, base_policy)])
360 }
361
362 pub(crate) fn acquire_read_holds(&self, id_bundle: &CollectionIdBundle) -> ReadHolds {
370 let mut read_holds = ReadHolds::new();
371
372 let desired_storage_holds = id_bundle.storage_ids.iter().map(|id| *id).collect_vec();
373 let storage_read_holds = self
374 .controller
375 .storage_collections
376 .acquire_read_holds(desired_storage_holds)
377 .expect("missing storage collections");
378 read_holds.storage_holds = storage_read_holds
379 .into_iter()
380 .map(|hold| (hold.id(), hold))
381 .collect();
382
383 for (&instance_id, collection_ids) in &id_bundle.compute_ids {
384 for &id in collection_ids {
385 let hold = self
386 .controller
387 .compute
388 .acquire_read_hold(instance_id, id)
389 .expect("missing compute collection");
390
391 let prev = read_holds.compute_holds.insert((instance_id, id), hold);
392 assert!(
393 prev.is_none(),
394 "duplicate compute ID in id_bundle {id_bundle:?}"
395 );
396 }
397 }
398
399 tracing::debug!(?read_holds, "acquire_read_holds");
400 read_holds
401 }
402
403 pub(crate) fn store_transaction_read_holds(
406 &mut self,
407 conn_id: ConnectionId,
408 read_holds: ReadHolds,
409 ) {
410 use std::collections::btree_map::Entry;
411
412 match self.txn_read_holds.entry(conn_id) {
413 Entry::Vacant(v) => {
414 v.insert(read_holds);
415 }
416 Entry::Occupied(mut o) => {
417 o.get_mut().merge(read_holds);
418 }
419 }
420 }
421}