use std::collections::{BTreeMap, BTreeSet};
use std::fmt::Debug;

use differential_dataflow::lattice::Lattice;
use itertools::Itertools;
use mz_adapter_types::compaction::CompactionWindow;
use mz_compute_types::ComputeInstanceId;
use mz_ore::instrument;
use mz_repr::{CatalogItemId, GlobalId, Timestamp};
use mz_sql::session::metadata::SessionMetadata;
use mz_storage_types::read_holds::ReadHold;
use mz_storage_types::read_policy::ReadPolicy;
use timely::progress::Antichain;
use timely::progress::Timestamp as TimelyTimestamp;

use crate::coord::id_bundle::CollectionIdBundle;
use crate::coord::timeline::{TimelineContext, TimelineState};
use crate::session::Session;
use crate::util::ResultExt;

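/// Read holds on storage and compute collections, used to prevent the sinces
/// of the held collections from advancing past the hold frontiers.
///
/// Illustrative flow (a sketch, not a doctest; `coord`, `session`, `id_bundle`,
/// and `read_ts` stand in for values available at the call site):
///
/// ```ignore
/// // Acquire holds at the collections' current sinces ...
/// let mut holds = coord.acquire_read_holds(&id_bundle);
/// // ... move them forward to the chosen read timestamp ...
/// holds.downgrade(read_ts);
/// // ... and stash them for the duration of the transaction.
/// coord.store_transaction_read_holds(session, holds);
/// ```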
#[derive(Debug)]
pub struct ReadHolds<T: TimelyTimestamp> {
    pub storage_holds: BTreeMap<GlobalId, ReadHold<T>>,
    pub compute_holds: BTreeMap<(ComputeInstanceId, GlobalId), ReadHold<T>>,
}

impl<T: TimelyTimestamp> ReadHolds<T> {
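    /// Returns a [`ReadHolds`] with no holds.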
    pub fn new() -> Self {
        ReadHolds {
            storage_holds: BTreeMap::new(),
            compute_holds: BTreeMap::new(),
        }
    }

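    /// Returns whether this `ReadHolds` contains any holds.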
    pub fn is_empty(&self) -> bool {
        self.storage_holds.is_empty() && self.compute_holds.is_empty()
    }

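    /// Returns the IDs of the storage collections held by this `ReadHolds`.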
    pub fn storage_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
        self.storage_holds.keys().copied()
    }

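    /// Returns the IDs of the compute collections held by this `ReadHolds`,
    /// together with the instances they live on.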
    pub fn compute_ids(&self) -> impl Iterator<Item = (ComputeInstanceId, GlobalId)> + '_ {
        self.compute_holds.keys().copied()
    }

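    /// Returns a [`CollectionIdBundle`] containing all the IDs in this
    /// `ReadHolds`.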
    pub fn id_bundle(&self) -> CollectionIdBundle {
        let mut res = CollectionIdBundle::default();
        for id in self.storage_ids() {
            res.storage_ids.insert(id);
        }
        for (instance_id, id) in self.compute_ids() {
            res.compute_ids.entry(instance_id).or_default().insert(id);
        }

        res
    }

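    /// Downgrades all contained holds to the given time. Holds that are
    /// already beyond that time are left unchanged.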
    pub fn downgrade(&mut self, time: T) {
        let frontier = Antichain::from_elem(time);
        for hold in self.storage_holds.values_mut() {
            let _ = hold.try_downgrade(frontier.clone());
        }
        for hold in self.compute_holds.values_mut() {
            let _ = hold.try_downgrade(frontier.clone());
        }
    }

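    /// Releases the hold on the given storage collection, if there is one.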
    pub fn remove_storage_collection(&mut self, id: GlobalId) {
        self.storage_holds.remove(&id);
    }

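    /// Releases the hold on the given compute collection, if there is one.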
    pub fn remove_compute_collection(&mut self, instance_id: ComputeInstanceId, id: GlobalId) {
        self.compute_holds.remove(&(instance_id, id));
    }
}

impl<T: TimelyTimestamp + Lattice> ReadHolds<T> {
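    /// Returns the earliest time at which all held collections are readable,
    /// i.e. the join of all hold frontiers.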
    pub fn least_valid_read(&self) -> Antichain<T> {
        let mut since = Antichain::from_elem(T::minimum());
        for hold in self.storage_holds.values() {
            since.join_assign(hold.since());
        }

        for hold in self.compute_holds.values() {
            since.join_assign(hold.since());
        }

        since
    }

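    /// Returns the frontier at which `desired_id` is held, considering all
    /// storage and compute holds on that collection. Returns the empty
    /// frontier if this `ReadHolds` contains no holds on the collection.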
    pub fn since(&self, desired_id: &GlobalId) -> Antichain<T> {
        let mut since = Antichain::new();

        if let Some(hold) = self.storage_holds.get(desired_id) {
            since.extend(hold.since().iter().cloned());
        }

        for ((_instance, id), hold) in self.compute_holds.iter() {
            if id != desired_id {
                continue;
            }
            since.extend(hold.since().iter().cloned());
        }

        since
    }

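    /// Merges `other` into `self`, combining holds on the same collection.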
    fn merge(&mut self, other: Self) {
        use std::collections::btree_map::Entry;

        for (id, other_hold) in other.storage_holds {
            match self.storage_holds.entry(id) {
                Entry::Occupied(mut o) => {
                    o.get_mut().merge_assign(other_hold);
                }
                Entry::Vacant(v) => {
                    v.insert(other_hold);
                }
            }
        }
        for (id, other_hold) in other.compute_holds {
            match self.compute_holds.entry(id) {
                Entry::Occupied(mut o) => {
                    o.get_mut().merge_assign(other_hold);
                }
                Entry::Vacant(v) => {
                    v.insert(other_hold);
                }
            }
        }
    }

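    /// Extends `self` with the holds from `other`.
    ///
    /// In contrast to [`ReadHolds::merge`], this panics if `other` contains a
    /// hold on a collection that `self` already holds.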
    fn extend(&mut self, other: Self) {
        for (id, other_hold) in other.storage_holds {
            let prev = self.storage_holds.insert(id, other_hold);
            assert!(prev.is_none(), "duplicate storage read hold: {id}");
        }
        for (id, other_hold) in other.compute_holds {
            let prev = self.compute_holds.insert(id, other_hold);
            assert!(prev.is_none(), "duplicate compute read hold: {id:?}");
        }
    }
}

impl<T: TimelyTimestamp> Default for ReadHolds<T> {
    fn default() -> Self {
        ReadHolds::new()
    }
}

impl crate::coord::Coordinator {
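    /// Initializes read policies for the storage collections backing the
    /// given catalog items, using the given compaction window.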
    pub(crate) async fn initialize_storage_read_policies(
        &mut self,
        ids: BTreeSet<CatalogItemId>,
        compaction_window: CompactionWindow,
    ) {
        let gids = ids
            .into_iter()
            .flat_map(|item_id| self.catalog().get_entry(&item_id).global_ids())
            .collect();
        self.initialize_read_policies(
            &CollectionIdBundle {
                storage_ids: gids,
                compute_ids: BTreeMap::new(),
            },
            compaction_window,
        )
        .await;
    }

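    /// Initializes read policies for the given compute collections on the
    /// given compute instance, using the given compaction window.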
    pub(crate) async fn initialize_compute_read_policies(
        &mut self,
        ids: Vec<GlobalId>,
        instance: ComputeInstanceId,
        compaction_window: CompactionWindow,
    ) {
        let mut compute_ids: BTreeMap<_, BTreeSet<_>> = BTreeMap::new();
        compute_ids.insert(instance, ids.into_iter().collect());
        self.initialize_read_policies(
            &CollectionIdBundle {
                storage_ids: BTreeSet::new(),
                compute_ids,
            },
            compaction_window,
        )
        .await;
    }

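    /// Initializes read policies for the given collections.
    ///
    /// For collections that belong to a timeline, this also acquires read
    /// holds, downgrades them to the timeline's current read timestamp, and
    /// stores them in the timeline state. All collections in the bundle then
    /// get the given compaction window installed as their read policy.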
    #[instrument(name = "coord::initialize_read_policies")]
    pub(crate) async fn initialize_read_policies(
        &mut self,
        id_bundle: &CollectionIdBundle,
        compaction_window: CompactionWindow,
    ) {
        // For timeline-dependent collections, acquire read holds and
        // downgrade them to the timeline's current read timestamp.
        for (timeline_context, id_bundle) in
            self.catalog().partition_ids_by_timeline_context(id_bundle)
        {
            if let TimelineContext::TimelineDependent(timeline) = timeline_context {
                let TimelineState { oracle, .. } = self.ensure_timeline_state(&timeline).await;
                let read_ts = oracle.read_ts().await;

                let mut new_read_holds = self.acquire_read_holds(&id_bundle);
                new_read_holds.downgrade(read_ts);

                let TimelineState { read_holds, .. } = self.ensure_timeline_state(&timeline).await;
                read_holds.extend(new_read_holds);
            }
        }

        // Install the compaction window as the read policy on all collections
        // in the bundle.
        let read_policy = ReadPolicy::from(compaction_window);

        let storage_policies = id_bundle
            .storage_ids
            .iter()
            .map(|id| (*id, read_policy.clone()))
            .collect();
        self.controller
            .storage_collections
            .set_read_policies(storage_policies);

        for (instance_id, collection_ids) in &id_bundle.compute_ids {
            let compute_policies = collection_ids
                .iter()
                .map(|id| (*id, read_policy.clone()))
                .collect();
            self.controller
                .compute
                .set_read_policy(*instance_id, compute_policies)
                .expect("cannot fail to set read policy");
        }
    }

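    /// Updates the read policies of the storage collections backing the given
    /// catalog items.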
    pub(crate) fn update_storage_read_policies(
        &self,
        policies: Vec<(CatalogItemId, ReadPolicy<Timestamp>)>,
    ) {
        let policies = policies
            .into_iter()
            .flat_map(|(item_id, policy)| {
                self.catalog()
                    .get_entry(&item_id)
                    .global_ids()
                    .map(move |gid| (gid, policy.clone()))
            })
            .collect();
        self.controller
            .storage_collections
            .set_read_policies(policies);
    }

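    /// Updates the read policies of the compute collections backing the given
    /// catalog items, grouped by compute instance.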
    pub(crate) fn update_compute_read_policies(
        &self,
        mut policies: Vec<(ComputeInstanceId, CatalogItemId, ReadPolicy<Timestamp>)>,
    ) {
        policies.sort_by_key(|&(cluster_id, _, _)| cluster_id);
        for (cluster_id, group) in &policies
            .into_iter()
            .chunk_by(|&(cluster_id, _, _)| cluster_id)
        {
            let group = group
                .flat_map(|(_, item_id, policy)| {
                    self.catalog()
                        .get_entry(&item_id)
                        .global_ids()
                        .map(move |gid| (gid, policy.clone()))
                })
                .collect();
            self.controller
                .compute
                .set_read_policy(cluster_id, group)
                .unwrap_or_terminate("cannot fail to set read policy");
        }
    }

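    /// Updates the read policy of a single compute collection.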
    pub(crate) fn update_compute_read_policy(
        &self,
        compute_instance: ComputeInstanceId,
        item_id: CatalogItemId,
        base_policy: ReadPolicy<Timestamp>,
    ) {
        self.update_compute_read_policies(vec![(compute_instance, item_id, base_policy)])
    }

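    /// Acquires read holds on all collections in `id_bundle`, at the
    /// collections' current sinces.
    ///
    /// # Panics
    ///
    /// Panics if any of the collections in `id_bundle` don't exist.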
    pub(crate) fn acquire_read_holds(
        &self,
        id_bundle: &CollectionIdBundle,
    ) -> ReadHolds<Timestamp> {
        let mut read_holds = ReadHolds::new();

        let desired_storage_holds = id_bundle.storage_ids.iter().copied().collect_vec();
        let storage_read_holds = self
            .controller
            .storage_collections
            .acquire_read_holds(desired_storage_holds)
            .expect("missing storage collections");
        read_holds.storage_holds = storage_read_holds
            .into_iter()
            .map(|hold| (hold.id(), hold))
            .collect();

        for (&instance_id, collection_ids) in &id_bundle.compute_ids {
            for &id in collection_ids {
                let hold = self
                    .controller
                    .compute
                    .acquire_read_hold(instance_id, id)
                    .expect("missing compute collection");

                let prev = read_holds.compute_holds.insert((instance_id, id), hold);
                assert!(
                    prev.is_none(),
                    "duplicate compute ID in id_bundle {id_bundle:?}"
                );
            }
        }

        tracing::debug!(?read_holds, "acquire_read_holds");
        read_holds
    }

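    /// Stashes the given read holds for the duration of the session's current
    /// transaction, merging them with any holds already stored for that
    /// connection.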
    pub(crate) fn store_transaction_read_holds(
        &mut self,
        session: &Session,
        read_holds: ReadHolds<Timestamp>,
    ) {
        use std::collections::btree_map::Entry;

        let conn_id = session.conn_id().clone();
        match self.txn_read_holds.entry(conn_id) {
            Entry::Vacant(v) => {
                v.insert(read_holds);
            }
            Entry::Occupied(mut o) => {
                o.get_mut().merge(read_holds);
            }
        }
    }
}