mz_adapter/coord/read_policy.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Types and methods related to initializing, updating, and removing read policies
//! on collections.
//!
//! This module contains the API for read holds on collections. A "read hold" prevents
//! the controller from compacting the associated collections, and ensures that they
//! remain "readable" at a specific time, for as long as the hold exists.
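//!
//! A sketch of the intended flow (hypothetical `coord`, `id_bundle`, and
//! `read_ts` values; not a runnable doctest):
//!
//! ```ignore
//! // Acquire holds at the earliest available time for every collection
//! // in the bundle; this panics if any collection does not exist.
//! let mut read_holds = coord.acquire_read_holds(&id_bundle);
//! // Allow compaction up to `read_ts`; the collections stay readable
//! // at `read_ts` for as long as the holds exist.
//! read_holds.downgrade(read_ts);
//! // Dropping the holds releases the underlying read capabilities.
//! drop(read_holds);
//! ```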

use std::collections::{BTreeMap, BTreeSet};
use std::fmt::Debug;

use differential_dataflow::lattice::Lattice;
use itertools::Itertools;
use mz_adapter_types::compaction::CompactionWindow;
use mz_compute_types::ComputeInstanceId;
use mz_ore::instrument;
use mz_repr::{CatalogItemId, GlobalId, Timestamp};
use mz_sql::session::metadata::SessionMetadata;
use mz_storage_types::read_holds::ReadHold;
use mz_storage_types::read_policy::ReadPolicy;
use timely::progress::Antichain;
use timely::progress::Timestamp as TimelyTimestamp;

use crate::coord::id_bundle::CollectionIdBundle;
use crate::coord::timeline::{TimelineContext, TimelineState};
use crate::session::Session;
use crate::util::ResultExt;

/// Read holds kept to ensure a set of collections remains readable at some
/// time.
///
/// This is a collection of [`ReadHold`] objects, which act as tokens ensuring
/// that read frontiers cannot advance past the held time as long as they exist.
/// Dropping a [`ReadHolds`] also drops the [`ReadHold`] tokens within and
/// relinquishes the associated read capabilities.
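///
/// A sketch of direct use (hypothetical `hold` obtained from the controller;
/// not a runnable doctest):
///
/// ```ignore
/// let mut holds = ReadHolds::new();
/// holds.storage_holds.insert(hold.id(), hold);
/// assert!(!holds.is_empty());
/// // Dropping the `ReadHolds` releases the contained read capabilities.
/// drop(holds);
/// ```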
#[derive(Debug)]
pub struct ReadHolds<T: TimelyTimestamp> {
    pub storage_holds: BTreeMap<GlobalId, ReadHold<T>>,
    pub compute_holds: BTreeMap<(ComputeInstanceId, GlobalId), ReadHold<T>>,
}

impl<T: TimelyTimestamp> ReadHolds<T> {
    /// Return empty `ReadHolds`.
    pub fn new() -> Self {
        ReadHolds {
            storage_holds: BTreeMap::new(),
            compute_holds: BTreeMap::new(),
        }
    }

    /// Return whether this [`ReadHolds`] contains no holds.
    pub fn is_empty(&self) -> bool {
        self.storage_holds.is_empty() && self.compute_holds.is_empty()
    }

    /// Return the IDs of the contained storage collections.
    pub fn storage_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
        self.storage_holds.keys().copied()
    }

    /// Return the IDs of the contained compute collections.
    pub fn compute_ids(&self) -> impl Iterator<Item = (ComputeInstanceId, GlobalId)> + '_ {
        self.compute_holds.keys().copied()
    }

    /// Return a `CollectionIdBundle` containing all the IDs in the
    /// [`ReadHolds`].
    pub fn id_bundle(&self) -> CollectionIdBundle {
        let mut res = CollectionIdBundle::default();
        for id in self.storage_ids() {
            res.storage_ids.insert(id);
        }
        for (instance_id, id) in self.compute_ids() {
            res.compute_ids.entry(instance_id).or_default().insert(id);
        }

        res
    }

    /// Downgrade the contained [`ReadHold`]s to the given time.
    pub fn downgrade(&mut self, time: T) {
        let frontier = Antichain::from_elem(time);
        for hold in self.storage_holds.values_mut() {
            let _ = hold.try_downgrade(frontier.clone());
        }
        for hold in self.compute_holds.values_mut() {
            let _ = hold.try_downgrade(frontier.clone());
        }
    }

    /// Release the read hold on the given storage collection, if one is
    /// contained.
    pub fn remove_storage_collection(&mut self, id: GlobalId) {
        self.storage_holds.remove(&id);
    }

    /// Release the read hold on the given compute collection, if one is
    /// contained.
    pub fn remove_compute_collection(&mut self, instance_id: ComputeInstanceId, id: GlobalId) {
        self.compute_holds.remove(&(instance_id, id));
    }
}

impl<T: TimelyTimestamp + Lattice> ReadHolds<T> {
    /// Return the earliest frontier at which reads on all collections
    /// covered by these holds are valid, i.e. the join of the sinces of all
    /// contained holds.
    pub fn least_valid_read(&self) -> Antichain<T> {
        let mut since = Antichain::from_elem(T::minimum());
        for hold in self.storage_holds.values() {
            since.join_assign(hold.since());
        }

        for hold in self.compute_holds.values() {
            since.join_assign(hold.since());
        }

        since
    }

    /// Returns the frontier at which this [`ReadHolds`] is holding back the
    /// since of the collection identified by `desired_id`. Note that this is
    /// not necessarily the overall since of the collection: the collection is
    /// merely held back to _at least_ the reported frontier by this
    /// [`ReadHolds`].
    ///
    /// This method is not meant to be fast; use it wisely!
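    ///
    /// A worked sketch (hypothetical holds; not a runnable doctest): if this
    /// [`ReadHolds`] contains a storage hold on `desired_id` at time 3 and a
    /// compute hold on the same collection at time 5, the reported frontier
    /// is `{3}`, since the storage hold alone already prevents the
    /// collection's since from advancing past time 3.
    ///
    /// ```ignore
    /// assert_eq!(holds.since(&desired_id), Antichain::from_elem(3));
    /// ```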
    pub fn since(&self, desired_id: &GlobalId) -> Antichain<T> {
        let mut since = Antichain::new();

        if let Some(hold) = self.storage_holds.get(desired_id) {
            since.extend(hold.since().iter().cloned());
        }

        for ((_instance, id), hold) in self.compute_holds.iter() {
            if id != desired_id {
                continue;
            }
            since.extend(hold.since().iter().cloned());
        }

        since
    }

    /// Merge the read holds in `other` into the contained read holds.
    fn merge(&mut self, other: Self) {
        use std::collections::btree_map::Entry;

        for (id, other_hold) in other.storage_holds {
            match self.storage_holds.entry(id) {
                Entry::Occupied(mut o) => {
                    o.get_mut().merge_assign(other_hold);
                }
                Entry::Vacant(v) => {
                    v.insert(other_hold);
                }
            }
        }
        for (id, other_hold) in other.compute_holds {
            match self.compute_holds.entry(id) {
                Entry::Occupied(mut o) => {
                    o.get_mut().merge_assign(other_hold);
                }
                Entry::Vacant(v) => {
                    v.insert(other_hold);
                }
            }
        }
    }

    /// Extend the contained read holds with those in `other`.
    ///
    /// # Panics
    ///
    /// In contrast to [`ReadHolds::merge`], this method expects the collection
    /// IDs in `self` and `other` to be distinct and panics otherwise.
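    ///
    /// A sketch of the difference (hypothetical `a`, `b`, and `c` holding
    /// overlapping collections; not a runnable doctest):
    ///
    /// ```ignore
    /// a.merge(b);  // fine: holds on the same collection are merged
    /// a.extend(c); // panics if `a` and `c` hold the same collection
    /// ```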
    fn extend(&mut self, other: Self) {
        for (id, other_hold) in other.storage_holds {
            let prev = self.storage_holds.insert(id, other_hold);
            assert!(prev.is_none(), "duplicate storage read hold: {id}");
        }
        for (id, other_hold) in other.compute_holds {
            let prev = self.compute_holds.insert(id, other_hold);
            assert!(prev.is_none(), "duplicate compute read hold: {id:?}");
        }
    }
}

impl<T: TimelyTimestamp> Default for ReadHolds<T> {
    fn default() -> Self {
        ReadHolds::new()
    }
}

impl crate::coord::Coordinator {
    /// Initialize the storage read policies.
    ///
    /// This should be called only after a storage collection is created, and
    /// ideally very soon afterwards. The collection is otherwise initialized
    /// with a read policy that allows no compaction.
    pub(crate) async fn initialize_storage_read_policies(
        &mut self,
        ids: BTreeSet<CatalogItemId>,
        compaction_window: CompactionWindow,
    ) {
        let gids = ids
            .into_iter()
            .flat_map(|item_id| self.catalog().get_entry(&item_id).global_ids())
            .collect();
        self.initialize_read_policies(
            &CollectionIdBundle {
                storage_ids: gids,
                compute_ids: BTreeMap::new(),
            },
            compaction_window,
        )
        .await;
    }

    /// Initialize the compute read policies.
    ///
    /// This should be called only after a compute collection is created, and
    /// ideally very soon afterwards. The collection is otherwise initialized
    /// with a read policy that allows no compaction.
    pub(crate) async fn initialize_compute_read_policies(
        &mut self,
        ids: Vec<GlobalId>,
        instance: ComputeInstanceId,
        compaction_window: CompactionWindow,
    ) {
        let mut compute_ids: BTreeMap<_, BTreeSet<_>> = BTreeMap::new();
        compute_ids.insert(instance, ids.into_iter().collect());
        self.initialize_read_policies(
            &CollectionIdBundle {
                storage_ids: BTreeSet::new(),
                compute_ids,
            },
            compaction_window,
        )
        .await;
    }

    /// Initialize the storage and compute read policies.
    ///
    /// This should be called only after a collection is created, and
    /// ideally very soon afterwards. The collection is otherwise initialized
    /// with a read policy that allows no compaction.
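    ///
    /// A sketch of a typical call (hypothetical `id_bundle` and
    /// `compaction_window` values; not a runnable doctest):
    ///
    /// ```ignore
    /// coord
    ///     .initialize_read_policies(&id_bundle, compaction_window)
    ///     .await;
    /// ```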
    #[instrument(name = "coord::initialize_read_policies")]
    pub(crate) async fn initialize_read_policies(
        &mut self,
        id_bundle: &CollectionIdBundle,
        compaction_window: CompactionWindow,
    ) {
        // Install read holds in the Coordinator's timeline state.
        for (timeline_context, id_bundle) in self.partition_ids_by_timeline_context(id_bundle) {
            if let TimelineContext::TimelineDependent(timeline) = timeline_context {
                let TimelineState { oracle, .. } = self.ensure_timeline_state(&timeline).await;
                let read_ts = oracle.read_ts().await;

                let mut new_read_holds = self.acquire_read_holds(&id_bundle);
                new_read_holds.downgrade(read_ts);

                let TimelineState { read_holds, .. } = self.ensure_timeline_state(&timeline).await;
                read_holds.extend(new_read_holds);
            }
        }

        // Install read policies.
        let read_policy = ReadPolicy::from(compaction_window);

        let storage_policies = id_bundle
            .storage_ids
            .iter()
            .map(|id| (*id, read_policy.clone()))
            .collect();
        self.controller
            .storage_collections
            .set_read_policies(storage_policies);

        for (instance_id, collection_ids) in &id_bundle.compute_ids {
            let compute_policies = collection_ids
                .iter()
                .map(|id| (*id, read_policy.clone()))
                .collect();
            self.controller
                .compute
                .set_read_policy(*instance_id, compute_policies)
                .expect("cannot fail to set read policy");
        }
    }

    /// Update the read policies of the given storage collections.
    pub(crate) fn update_storage_read_policies(
        &self,
        policies: Vec<(CatalogItemId, ReadPolicy<Timestamp>)>,
    ) {
        let policies = policies
            .into_iter()
            .flat_map(|(item_id, policy)| {
                // Set the read policy for all GlobalIds associated with an item.
                self.catalog()
                    .get_entry(&item_id)
                    .global_ids()
                    .map(move |gid| (gid, policy.clone()))
            })
            .collect();
        self.controller
            .storage_collections
            .set_read_policies(policies);
    }

    /// Update the read policies of the given compute collections, grouped by
    /// compute instance.
    pub(crate) fn update_compute_read_policies(
        &self,
        mut policies: Vec<(ComputeInstanceId, CatalogItemId, ReadPolicy<Timestamp>)>,
    ) {
        policies.sort_by_key(|&(cluster_id, _, _)| cluster_id);
        for (cluster_id, group) in &policies
            .into_iter()
            .group_by(|&(cluster_id, _, _)| cluster_id)
        {
            let group = group
                .flat_map(|(_, item_id, policy)| {
                    // Set the read policy for all GlobalIds associated with an item.
                    self.catalog()
                        .get_entry(&item_id)
                        .global_ids()
                        .map(move |gid| (gid, policy.clone()))
                })
                .collect();
            self.controller
                .compute
                .set_read_policy(cluster_id, group)
                .unwrap_or_terminate("cannot fail to set read policy");
        }
    }

    /// Update the read policy of a single compute collection.
    pub(crate) fn update_compute_read_policy(
        &self,
        compute_instance: ComputeInstanceId,
        item_id: CatalogItemId,
        base_policy: ReadPolicy<Timestamp>,
    ) {
        self.update_compute_read_policies(vec![(compute_instance, item_id, base_policy)])
    }

    /// Attempt to acquire read holds on the indicated collections at the
    /// earliest available time.
    ///
    /// # Panics
    ///
    /// Will panic if any of the referenced collections in `id_bundle` don't
    /// exist.
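    ///
    /// A sketch of typical usage (hypothetical `id_bundle`; not a runnable
    /// doctest):
    ///
    /// ```ignore
    /// // Holds are taken at each collection's current since, i.e. the
    /// // earliest time at which it is still readable.
    /// let read_holds = coord.acquire_read_holds(&id_bundle);
    /// // The collections cannot compact past the held times until
    /// // `read_holds` is downgraded or dropped.
    /// ```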
    pub(crate) fn acquire_read_holds(
        &self,
        id_bundle: &CollectionIdBundle,
    ) -> ReadHolds<Timestamp> {
        let mut read_holds = ReadHolds::new();

        let desired_storage_holds = id_bundle.storage_ids.iter().copied().collect_vec();
        let storage_read_holds = self
            .controller
            .storage_collections
            .acquire_read_holds(desired_storage_holds)
            .expect("missing storage collections");
        read_holds.storage_holds = storage_read_holds
            .into_iter()
            .map(|hold| (hold.id(), hold))
            .collect();

        for (&instance_id, collection_ids) in &id_bundle.compute_ids {
            for &id in collection_ids {
                let hold = self
                    .controller
                    .compute
                    .acquire_read_hold(instance_id, id)
                    .expect("missing compute collection");

                let prev = read_holds.compute_holds.insert((instance_id, id), hold);
                assert!(
                    prev.is_none(),
                    "duplicate compute ID in id_bundle {id_bundle:?}"
                );
            }
        }

        tracing::debug!(?read_holds, "acquire_read_holds");
        read_holds
    }

    /// Stash transaction read holds. They will be released when the
    /// transaction is cleaned up.
    pub(crate) fn store_transaction_read_holds(
        &mut self,
        session: &Session,
        read_holds: ReadHolds<Timestamp>,
    ) {
        use std::collections::btree_map::Entry;

        let conn_id = session.conn_id().clone();
        match self.txn_read_holds.entry(conn_id) {
            Entry::Vacant(v) => {
                v.insert(read_holds);
            }
            Entry::Occupied(mut o) => {
                o.get_mut().merge(read_holds);
            }
        }
    }
}