use std::collections::{BTreeMap, BTreeSet};
use std::fmt::Debug;

use differential_dataflow::lattice::Lattice;
use itertools::Itertools;
use mz_adapter_types::compaction::CompactionWindow;
use mz_adapter_types::connection::ConnectionId;
use mz_compute_types::ComputeInstanceId;
use mz_ore::instrument;
use mz_repr::{CatalogItemId, GlobalId, Timestamp};
use mz_storage_types::read_holds::ReadHold;
use mz_storage_types::read_policy::ReadPolicy;
use timely::progress::Antichain;
use timely::progress::Timestamp as TimelyTimestamp;

use crate::coord::id_bundle::CollectionIdBundle;
use crate::coord::timeline::{TimelineContext, TimelineState};
use crate::util::ResultExt;

/// A set of read holds on storage and compute collections. The coordinator
/// uses these to pin collections so they stay readable at (or before) some
/// time of interest, e.g. for the duration of a transaction.
#[derive(Debug, Clone)]
pub struct ReadHolds<T: TimelyTimestamp> {
    pub storage_holds: BTreeMap<GlobalId, ReadHold<T>>,
    pub compute_holds: BTreeMap<(ComputeInstanceId, GlobalId), ReadHold<T>>,
}

impl<T: TimelyTimestamp> ReadHolds<T> {
    /// Returns a new, empty set of read holds.
    pub fn new() -> Self {
        ReadHolds {
            storage_holds: BTreeMap::new(),
            compute_holds: BTreeMap::new(),
        }
    }

    /// Returns whether this set contains no read holds at all.
    pub fn is_empty(&self) -> bool {
        self.storage_holds.is_empty() && self.compute_holds.is_empty()
    }

    /// Returns the IDs of the storage collections on which holds are held.
    pub fn storage_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
        self.storage_holds.keys().copied()
    }

    /// Returns the (instance, collection) IDs of the compute collections on
    /// which holds are held.
    pub fn compute_ids(&self) -> impl Iterator<Item = (ComputeInstanceId, GlobalId)> + '_ {
        self.compute_holds.keys().copied()
    }

    /// Returns a [`CollectionIdBundle`] naming every collection on which
    /// this set holds a read hold.
    pub fn id_bundle(&self) -> CollectionIdBundle {
        let mut res = CollectionIdBundle::default();
        for id in self.storage_ids() {
            res.storage_ids.insert(id);
        }
        for (instance_id, id) in self.compute_ids() {
            res.compute_ids.entry(instance_id).or_default().insert(id);
        }

        res
    }

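    /// Downgrades all contained holds to the given time. Holds that cannot
    /// be downgraded, because they are already beyond `time`, are left
    /// unchanged: `try_downgrade` errors are deliberately ignored.
    ///
    /// A sketch of the intended use, mirroring what
    /// `initialize_read_policies` below does (`coord`, `id_bundle`, and
    /// `read_ts` are assumed to be in scope):
    ///
    /// ```ignore
    /// // Pin all inputs at their current sinces, then relax the holds
    /// // forward to the chosen read timestamp.
    /// let mut holds = coord.acquire_read_holds(&id_bundle);
    /// holds.downgrade(read_ts);
    /// ```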
    pub fn downgrade(&mut self, time: T) {
        let frontier = Antichain::from_elem(time);
        for hold in self.storage_holds.values_mut() {
            let _ = hold.try_downgrade(frontier.clone());
        }
        for hold in self.compute_holds.values_mut() {
            let _ = hold.try_downgrade(frontier.clone());
        }
    }

    /// Removes the read hold on the given storage collection, if one exists.
    pub fn remove_storage_collection(&mut self, id: GlobalId) {
        self.storage_holds.remove(&id);
    }

    /// Removes the read hold on the given compute collection, if one exists.
    pub fn remove_compute_collection(&mut self, instance_id: ComputeInstanceId, id: GlobalId) {
        self.compute_holds.remove(&(instance_id, id));
    }

    /// Returns a new [`ReadHolds`] containing clones of the holds in `self`
    /// for exactly the collections named in `id_bundle`. Collections for
    /// which `self` has no hold are silently skipped.
    pub fn subset(&self, id_bundle: &CollectionIdBundle) -> ReadHolds<T> {
        let mut result = ReadHolds::new();

        for id in &id_bundle.storage_ids {
            if let Some(hold) = self.storage_holds.get(id) {
                result.storage_holds.insert(*id, hold.clone());
            }
        }

        for (instance_id, ids) in &id_bundle.compute_ids {
            for id in ids {
                if let Some(hold) = self.compute_holds.get(&(*instance_id, *id)) {
                    result
                        .compute_holds
                        .insert((*instance_id, *id), hold.clone());
                }
            }
        }

        result
    }
}

impl<T: TimelyTimestamp + Lattice> ReadHolds<T> {
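    /// Returns the earliest frontier at which _all_ held collections are
    /// readable: the join of the sinces of all contained holds. For totally
    /// ordered timestamps this is the maximum; e.g., with one hold at time 3
    /// and another at time 5, everything is readable only from 5 on.
    ///
    /// An illustrative sketch (`holds` assumed to be in scope):
    ///
    /// ```ignore
    /// let frontier = holds.least_valid_read();
    /// // Any timestamp at or beyond `frontier` is safe to read at.
    /// ```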
    pub fn least_valid_read(&self) -> Antichain<T> {
        let mut since = Antichain::from_elem(T::minimum());
        for hold in self.storage_holds.values() {
            since.join_assign(hold.since());
        }

        for hold in self.compute_holds.values() {
            since.join_assign(hold.since());
        }

        since
    }

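    /// Returns the combined since of all holds on the collection identified
    /// by `desired_id`, across both storage and compute. In contrast to
    /// [`Self::least_valid_read`] this is a meet, not a join: for totally
    /// ordered timestamps it is the _earliest_ since among the matching
    /// holds. If there is no hold on `desired_id`, the result is the empty
    /// antichain.
    ///
    /// An illustrative sketch, assuming `holds` has a storage hold at 3 and
    /// a compute hold at 5 on the collection `id`:
    ///
    /// ```ignore
    /// assert_eq!(holds.since(&id), Antichain::from_elem(3));
    /// ```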
    pub fn since(&self, desired_id: &GlobalId) -> Antichain<T> {
        let mut since = Antichain::new();

        if let Some(hold) = self.storage_holds.get(desired_id) {
            since.extend(hold.since().iter().cloned());
        }

        for ((_instance, id), hold) in self.compute_holds.iter() {
            if id != desired_id {
                continue;
            }
            since.extend(hold.since().iter().cloned());
        }

        since
    }

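    /// Merges `other` into `self`. Holds on collections present in both are
    /// combined via [`ReadHold::merge_assign`], so overlap is allowed; this
    /// is the tolerant counterpart to [`Self::extend`], which asserts
    /// disjointness.
    ///
    /// A sketch of the difference (`coord` and `bundle` assumed in scope):
    ///
    /// ```ignore
    /// let mut a = coord.acquire_read_holds(&bundle);
    /// let b = coord.acquire_read_holds(&bundle);
    /// a.merge(b); // fine: overlapping holds are merged
    /// // a.extend(b); // would panic: duplicate read holds
    /// ```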
    fn merge(&mut self, other: Self) {
        use std::collections::btree_map::Entry;

        for (id, other_hold) in other.storage_holds {
            match self.storage_holds.entry(id) {
                Entry::Occupied(mut o) => {
                    o.get_mut().merge_assign(other_hold);
                }
                Entry::Vacant(v) => {
                    v.insert(other_hold);
                }
            }
        }
        for (id, other_hold) in other.compute_holds {
            match self.compute_holds.entry(id) {
                Entry::Occupied(mut o) => {
                    o.get_mut().merge_assign(other_hold);
                }
                Entry::Vacant(v) => {
                    v.insert(other_hold);
                }
            }
        }
    }

    /// Extends `self` with the holds from `other`, asserting that the two
    /// sets are disjoint: a duplicate hold on any collection panics.
    fn extend(&mut self, other: Self) {
        for (id, other_hold) in other.storage_holds {
            let prev = self.storage_holds.insert(id, other_hold);
            assert!(prev.is_none(), "duplicate storage read hold: {id}");
        }
        for (id, other_hold) in other.compute_holds {
            let prev = self.compute_holds.insert(id, other_hold);
            assert!(prev.is_none(), "duplicate compute read hold: {id:?}");
        }
    }
}

impl<T: TimelyTimestamp> Default for ReadHolds<T> {
    fn default() -> Self {
        ReadHolds::new()
    }
}

impl crate::coord::Coordinator {
    /// Initializes the read policies for the storage collections backing the
    /// given catalog items, using the given compaction window.
    pub(crate) async fn initialize_storage_read_policies(
        &mut self,
        ids: BTreeSet<CatalogItemId>,
        compaction_window: CompactionWindow,
    ) {
        let gids = ids
            .into_iter()
            .flat_map(|item_id| self.catalog().get_entry(&item_id).global_ids())
            .collect();
        self.initialize_read_policies(
            &CollectionIdBundle {
                storage_ids: gids,
                compute_ids: BTreeMap::new(),
            },
            compaction_window,
        )
        .await;
    }

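    /// Initializes the read policies for the given compute collections on
    /// the given compute instance, using the given compaction window.
    ///
    /// A sketch of a typical call, e.g. for a newly created index or
    /// materialized view (`coord`, `gid`, `instance`, and
    /// `compaction_window` are assumed to be in scope):
    ///
    /// ```ignore
    /// coord
    ///     .initialize_compute_read_policies(vec![gid], instance, compaction_window)
    ///     .await;
    /// ```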
    pub(crate) async fn initialize_compute_read_policies(
        &mut self,
        ids: Vec<GlobalId>,
        instance: ComputeInstanceId,
        compaction_window: CompactionWindow,
    ) {
        let mut compute_ids: BTreeMap<_, BTreeSet<_>> = BTreeMap::new();
        compute_ids.insert(instance, ids.into_iter().collect());
        self.initialize_read_policies(
            &CollectionIdBundle {
                storage_ids: BTreeSet::new(),
                compute_ids,
            },
            compaction_window,
        )
        .await;
    }

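    /// Initializes the read policies for the given collections to the given
    /// compaction window. For collections that belong to a timeline, this
    /// also installs read holds at the timeline's current read timestamp in
    /// the timeline's [`TimelineState`], keeping the collections readable at
    /// that time.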
    #[instrument(name = "coord::initialize_read_policies")]
    pub(crate) async fn initialize_read_policies(
        &mut self,
        id_bundle: &CollectionIdBundle,
        compaction_window: CompactionWindow,
    ) {
        // Install read holds in the timeline state for all collections that
        // live in a timeline, downgraded to the timeline's current read
        // timestamp.
        for (timeline_context, id_bundle) in
            self.catalog().partition_ids_by_timeline_context(id_bundle)
        {
            if let TimelineContext::TimelineDependent(timeline) = timeline_context {
                let TimelineState { oracle, .. } = self.ensure_timeline_state(&timeline).await;
                let read_ts = oracle.read_ts().await;

                let mut new_read_holds = self.acquire_read_holds(&id_bundle);
                new_read_holds.downgrade(read_ts);

                let TimelineState { read_holds, .. } = self.ensure_timeline_state(&timeline).await;
                read_holds.extend(new_read_holds);
            }
        }

        // Apply the compaction window to all collections, regardless of
        // timeline.
        let read_policy = ReadPolicy::from(compaction_window);

        let storage_policies = id_bundle
            .storage_ids
            .iter()
            .map(|id| (*id, read_policy.clone()))
            .collect();
        self.controller
            .storage_collections
            .set_read_policies(storage_policies);

        for (instance_id, collection_ids) in &id_bundle.compute_ids {
            let compute_policies = collection_ids
                .iter()
                .map(|id| (*id, read_policy.clone()))
                .collect();
            self.controller
                .compute
                .set_read_policy(*instance_id, compute_policies)
                .expect("cannot fail to set read policy");
        }
    }

    /// Updates the read policies of the storage collections backing the
    /// given catalog items.
    pub(crate) fn update_storage_read_policies(
        &self,
        policies: Vec<(CatalogItemId, ReadPolicy<Timestamp>)>,
    ) {
        let policies = policies
            .into_iter()
            .flat_map(|(item_id, policy)| {
                self.catalog()
                    .get_entry(&item_id)
                    .global_ids()
                    .map(move |gid| (gid, policy.clone()))
            })
            .collect();
        self.controller
            .storage_collections
            .set_read_policies(policies);
    }

    /// Updates the read policies of the compute collections backing the
    /// given catalog items, on the given compute instances.
    pub(crate) fn update_compute_read_policies(
        &self,
        mut policies: Vec<(ComputeInstanceId, CatalogItemId, ReadPolicy<Timestamp>)>,
    ) {
        // Group the updates by cluster, so we issue only one controller call
        // per compute instance.
        policies.sort_by_key(|&(cluster_id, _, _)| cluster_id);
        for (cluster_id, group) in &policies
            .into_iter()
            .chunk_by(|&(cluster_id, _, _)| cluster_id)
        {
            let group = group
                .flat_map(|(_, item_id, policy)| {
                    self.catalog()
                        .get_entry(&item_id)
                        .global_ids()
                        .map(move |gid| (gid, policy.clone()))
                })
                .collect();
            self.controller
                .compute
                .set_read_policy(cluster_id, group)
                .unwrap_or_terminate("cannot fail to set read policy");
        }
    }

    /// Updates the read policy of the compute collections backing a single
    /// catalog item, on the given compute instance.
    pub(crate) fn update_compute_read_policy(
        &self,
        compute_instance: ComputeInstanceId,
        item_id: CatalogItemId,
        base_policy: ReadPolicy<Timestamp>,
    ) {
        self.update_compute_read_policies(vec![(compute_instance, item_id, base_policy)])
    }

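    /// Acquires read holds on the given collections at the earliest possible
    /// times, i.e. each collection's current since.
    ///
    /// # Panics
    ///
    /// Panics if any of the collections in `id_bundle` is unknown to the
    /// storage or compute controller.
    ///
    /// An illustrative sketch (`coord` and `id_bundle` assumed in scope):
    ///
    /// ```ignore
    /// let holds = coord.acquire_read_holds(&id_bundle);
    /// // Every collection in `id_bundle` now stays readable at (or after)
    /// // this frontier until the holds are dropped or downgraded.
    /// let frontier = holds.least_valid_read();
    /// ```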
    pub(crate) fn acquire_read_holds(
        &self,
        id_bundle: &CollectionIdBundle,
    ) -> ReadHolds<Timestamp> {
        let mut read_holds = ReadHolds::new();

        let desired_storage_holds = id_bundle.storage_ids.iter().copied().collect_vec();
        let storage_read_holds = self
            .controller
            .storage_collections
            .acquire_read_holds(desired_storage_holds)
            .expect("missing storage collections");
        read_holds.storage_holds = storage_read_holds
            .into_iter()
            .map(|hold| (hold.id(), hold))
            .collect();

        for (&instance_id, collection_ids) in &id_bundle.compute_ids {
            for &id in collection_ids {
                let hold = self
                    .controller
                    .compute
                    .acquire_read_hold(instance_id, id)
                    .expect("missing compute collection");

                let prev = read_holds.compute_holds.insert((instance_id, id), hold);
                assert!(
                    prev.is_none(),
                    "duplicate compute ID in id_bundle {id_bundle:?}"
                );
            }
        }

        tracing::debug!(?read_holds, "acquire_read_holds");
        read_holds
    }

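    /// Stashes the given read holds for the transaction associated with
    /// `conn_id`, merging them with any holds already stashed for that
    /// connection.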
    pub(crate) fn store_transaction_read_holds(
        &mut self,
        conn_id: ConnectionId,
        read_holds: ReadHolds<Timestamp>,
    ) {
        use std::collections::btree_map::Entry;

        match self.txn_read_holds.entry(conn_id) {
            Entry::Vacant(v) => {
                v.insert(read_holds);
            }
            Entry::Occupied(mut o) => {
                o.get_mut().merge(read_holds);
            }
        }
    }
}