mz_adapter/coord/
read_policy.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Types and methods related to initializing, updating, and removing read policies
//! on collections.
//!
//! This module contains the API for read holds on collections. A "read hold" prevents
//! the controller from compacting the associated collections, and ensures that they
//! remain "readable" at a specific time, as long as the hold is held.
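//!
//! # Example (illustrative)
//!
//! A sketch of the intended lifecycle, assuming a `coord` and an `id_bundle`
//! are in scope (the variable names here are stand-ins, not real APIs):
//!
//! ```ignore
//! // Acquire holds at the earliest available time; the collections in
//! // `id_bundle` can no longer be compacted past the holds' sinces.
//! let mut holds = coord.acquire_read_holds(&id_bundle);
//! // Move the holds forward to a chosen read timestamp.
//! holds.downgrade(read_ts);
//! // Dropping the holds relinquishes the read capabilities, allowing
//! // the controller to compact the collections again.
//! drop(holds);
//! ```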

use std::collections::{BTreeMap, BTreeSet};
use std::fmt::Debug;

use differential_dataflow::lattice::Lattice;
use itertools::Itertools;
use mz_adapter_types::compaction::CompactionWindow;
use mz_adapter_types::connection::ConnectionId;
use mz_compute_types::ComputeInstanceId;
use mz_ore::instrument;
use mz_repr::{CatalogItemId, GlobalId, Timestamp};
use mz_storage_types::read_holds::ReadHold;
use mz_storage_types::read_policy::ReadPolicy;
use timely::progress::Antichain;
use timely::progress::Timestamp as TimelyTimestamp;

use crate::coord::id_bundle::CollectionIdBundle;
use crate::coord::timeline::{TimelineContext, TimelineState};
use crate::util::ResultExt;

/// Read holds kept to ensure a set of collections remains readable at some
/// time.
///
/// This is a collection of [`ReadHold`] objects, which act as tokens ensuring
/// that read frontiers cannot advance past the held time as long as they exist.
/// Dropping a [`ReadHolds`] also drops the [`ReadHold`] tokens within and
/// relinquishes the associated read capabilities.
#[derive(Debug, Clone)]
pub struct ReadHolds<T: TimelyTimestamp> {
    /// Read holds on storage collections, keyed by collection ID.
    pub storage_holds: BTreeMap<GlobalId, ReadHold<T>>,
    /// Read holds on compute collections, keyed by compute instance and
    /// collection ID.
    pub compute_holds: BTreeMap<(ComputeInstanceId, GlobalId), ReadHold<T>>,
}

impl<T: TimelyTimestamp> ReadHolds<T> {
    /// Return empty `ReadHolds`.
    pub fn new() -> Self {
        ReadHolds {
            storage_holds: BTreeMap::new(),
            compute_holds: BTreeMap::new(),
        }
    }

    /// Return whether this `ReadHolds` contains no read holds.
    pub fn is_empty(&self) -> bool {
        self.storage_holds.is_empty() && self.compute_holds.is_empty()
    }

    /// Return the IDs of the contained storage collections.
    pub fn storage_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
        self.storage_holds.keys().copied()
    }

    /// Return the IDs of the contained compute collections.
    pub fn compute_ids(&self) -> impl Iterator<Item = (ComputeInstanceId, GlobalId)> + '_ {
        self.compute_holds.keys().copied()
    }

    /// Return a `CollectionIdBundle` containing all the IDs in the
    /// [`ReadHolds`].
    pub fn id_bundle(&self) -> CollectionIdBundle {
        let mut res = CollectionIdBundle::default();
        for id in self.storage_ids() {
            res.storage_ids.insert(id);
        }
        for (instance_id, id) in self.compute_ids() {
            res.compute_ids.entry(instance_id).or_default().insert(id);
        }

        res
    }

    /// Downgrade the contained [`ReadHold`]s to the given time.
    pub fn downgrade(&mut self, time: T) {
        let frontier = Antichain::from_elem(time);
        for hold in self.storage_holds.values_mut() {
            // Failed downgrades (e.g., attempts to move a hold backward in
            // time) are intentionally ignored.
            let _ = hold.try_downgrade(frontier.clone());
        }
        for hold in self.compute_holds.values_mut() {
            let _ = hold.try_downgrade(frontier.clone());
        }
    }

    /// Release the read hold on the given storage collection, if one is
    /// contained.
    pub fn remove_storage_collection(&mut self, id: GlobalId) {
        self.storage_holds.remove(&id);
    }

    /// Release the read hold on the given compute collection, if one is
    /// contained.
    pub fn remove_compute_collection(&mut self, instance_id: ComputeInstanceId, id: GlobalId) {
        self.compute_holds.remove(&(instance_id, id));
    }

    /// Returns a new [`ReadHolds`] containing only the holds for collections in `id_bundle`.
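    ///
    /// Illustrative (not a compiled doctest; `all_holds` and
    /// `query_id_bundle` are stand-ins):
    ///
    /// ```ignore
    /// // Keep only the holds relevant to the collections a query reads.
    /// let query_holds = all_holds.subset(&query_id_bundle);
    /// ```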
    pub fn subset(&self, id_bundle: &CollectionIdBundle) -> ReadHolds<T> {
        let mut result = ReadHolds::new();

        for id in &id_bundle.storage_ids {
            if let Some(hold) = self.storage_holds.get(id) {
                result.storage_holds.insert(*id, hold.clone());
            }
        }

        for (instance_id, ids) in &id_bundle.compute_ids {
            for id in ids {
                if let Some(hold) = self.compute_holds.get(&(*instance_id, *id)) {
                    result
                        .compute_holds
                        .insert((*instance_id, *id), hold.clone());
                }
            }
        }

        result
    }
}

impl<T: TimelyTimestamp + Lattice> ReadHolds<T> {
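    /// Return the earliest frontier at which all contained holds permit
    /// reads: the lattice join of every hold's `since`.
    ///
    /// An illustrative sketch of the join semantics on totally ordered
    /// timestamps (not a compiled doctest):
    ///
    /// ```ignore
    /// // With one hold whose since is {3} and another whose since is {5},
    /// // the join is {5}: only times >= 5 are readable via all holds.
    /// let since = read_holds.least_valid_read();
    /// assert_eq!(since, Antichain::from_elem(5));
    /// ```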
    pub fn least_valid_read(&self) -> Antichain<T> {
        let mut since = Antichain::from_elem(T::minimum());
        for hold in self.storage_holds.values() {
            since.join_assign(hold.since());
        }

        for hold in self.compute_holds.values() {
            since.join_assign(hold.since());
        }

        since
    }

    /// Returns the frontier at which this [`ReadHolds`] is holding back the
    /// since of the collection identified by `desired_id`. Note that the
    /// collection's overall since may differ: we only report that it is
    /// _at least_ held back to the returned frontier by these read holds.
    ///
    /// This method is not meant to be fast; use wisely!
    pub fn since(&self, desired_id: &GlobalId) -> Antichain<T> {
        let mut since = Antichain::new();

        if let Some(hold) = self.storage_holds.get(desired_id) {
            since.extend(hold.since().iter().cloned());
        }

        for ((_instance, id), hold) in self.compute_holds.iter() {
            if id != desired_id {
                continue;
            }
            since.extend(hold.since().iter().cloned());
        }

        since
    }

    /// Merge the read holds in `other` into the contained read holds.
    ///
    /// If both sides contain a hold for the same collection, the two holds
    /// are merged into one.
    fn merge(&mut self, other: Self) {
        use std::collections::btree_map::Entry;

        for (id, other_hold) in other.storage_holds {
            match self.storage_holds.entry(id) {
                Entry::Occupied(mut o) => {
                    o.get_mut().merge_assign(other_hold);
                }
                Entry::Vacant(v) => {
                    v.insert(other_hold);
                }
            }
        }
        for (id, other_hold) in other.compute_holds {
            match self.compute_holds.entry(id) {
                Entry::Occupied(mut o) => {
                    o.get_mut().merge_assign(other_hold);
                }
                Entry::Vacant(v) => {
                    v.insert(other_hold);
                }
            }
        }
    }

    /// Extend the contained read holds with those in `other`.
    ///
    /// # Panics
    ///
    /// In contrast to [`ReadHolds::merge`], this method expects the collection
    /// IDs in `self` and `other` to be distinct and panics otherwise.
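    ///
    /// An illustrative sketch (not a compiled doctest):
    ///
    /// ```ignore
    /// // OK: `a` and `b` hold disjoint sets of collections.
    /// a.extend(b);
    /// // Would panic if `a` and `c` overlapped; `merge` handles overlap by
    /// // combining the holds instead.
    /// a.merge(c);
    /// ```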
    fn extend(&mut self, other: Self) {
        for (id, other_hold) in other.storage_holds {
            let prev = self.storage_holds.insert(id, other_hold);
            assert!(prev.is_none(), "duplicate storage read hold: {id}");
        }
        for (id, other_hold) in other.compute_holds {
            let prev = self.compute_holds.insert(id, other_hold);
            assert!(prev.is_none(), "duplicate compute read hold: {id:?}");
        }
    }
}

impl<T: TimelyTimestamp> Default for ReadHolds<T> {
    fn default() -> Self {
        ReadHolds::new()
    }
}

impl crate::coord::Coordinator {
    /// Initialize the storage read policies.
    ///
    /// This should be called only after a storage collection is created, and
    /// ideally very soon afterwards. The collection is otherwise initialized
    /// with a read policy that allows no compaction.
    pub(crate) async fn initialize_storage_read_policies(
        &mut self,
        ids: BTreeSet<CatalogItemId>,
        compaction_window: CompactionWindow,
    ) {
        let gids = ids
            .into_iter()
            .flat_map(|item_id| self.catalog().get_entry(&item_id).global_ids())
            .collect();
        self.initialize_read_policies(
            &CollectionIdBundle {
                storage_ids: gids,
                compute_ids: BTreeMap::new(),
            },
            compaction_window,
        )
        .await;
    }

    /// Initialize the compute read policies.
    ///
    /// This should be called only after a compute collection is created, and
    /// ideally very soon afterwards. The collection is otherwise initialized
    /// with a read policy that allows no compaction.
    pub(crate) async fn initialize_compute_read_policies(
        &mut self,
        ids: Vec<GlobalId>,
        instance: ComputeInstanceId,
        compaction_window: CompactionWindow,
    ) {
        let mut compute_ids: BTreeMap<_, BTreeSet<_>> = BTreeMap::new();
        compute_ids.insert(instance, ids.into_iter().collect());
        self.initialize_read_policies(
            &CollectionIdBundle {
                storage_ids: BTreeSet::new(),
                compute_ids,
            },
            compaction_window,
        )
        .await;
    }

    /// Initialize the storage and compute read policies.
    ///
    /// This should be called only after a collection is created, and
    /// ideally very soon afterwards. The collection is otherwise initialized
    /// with a read policy that allows no compaction.
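    ///
    /// A sketch of the policy that gets installed (illustrative; the exact
    /// semantics live in [`ReadPolicy`] and [`CompactionWindow`]):
    ///
    /// ```ignore
    /// // The compaction window converts into a `ReadPolicy` under which the
    /// // read frontier trails the write frontier by the window's duration.
    /// let read_policy = ReadPolicy::from(compaction_window);
    /// ```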
    #[instrument(name = "coord::initialize_read_policies")]
    pub(crate) async fn initialize_read_policies(
        &mut self,
        id_bundle: &CollectionIdBundle,
        compaction_window: CompactionWindow,
    ) {
        // Install read holds in the Coordinator's timeline state.
        for (timeline_context, id_bundle) in
            self.catalog().partition_ids_by_timeline_context(id_bundle)
        {
            if let TimelineContext::TimelineDependent(timeline) = timeline_context {
                let TimelineState { oracle, .. } = self.ensure_timeline_state(&timeline).await;
                let read_ts = oracle.read_ts().await;

                let mut new_read_holds = self.acquire_read_holds(&id_bundle);
                new_read_holds.downgrade(read_ts);

                let TimelineState { read_holds, .. } = self.ensure_timeline_state(&timeline).await;
                read_holds.extend(new_read_holds);
            }
        }

        // Install read policies.
        let read_policy = ReadPolicy::from(compaction_window);

        let storage_policies = id_bundle
            .storage_ids
            .iter()
            .map(|id| (*id, read_policy.clone()))
            .collect();
        self.controller
            .storage_collections
            .set_read_policies(storage_policies);

        for (instance_id, collection_ids) in &id_bundle.compute_ids {
            let compute_policies = collection_ids
                .iter()
                .map(|id| (*id, read_policy.clone()))
                .collect();
            self.controller
                .compute
                .set_read_policy(*instance_id, compute_policies)
                .expect("cannot fail to set read policy");
        }
    }

    pub(crate) fn update_storage_read_policies(
        &self,
        policies: Vec<(CatalogItemId, ReadPolicy<Timestamp>)>,
    ) {
        let policies = policies
            .into_iter()
            .flat_map(|(item_id, policy)| {
                // Set the read policy for all GlobalIds associated with an item.
                self.catalog()
                    .get_entry(&item_id)
                    .global_ids()
                    .map(move |gid| (gid, policy.clone()))
            })
            .collect();
        self.controller
            .storage_collections
            .set_read_policies(policies);
    }

    pub(crate) fn update_compute_read_policies(
        &self,
        mut policies: Vec<(ComputeInstanceId, CatalogItemId, ReadPolicy<Timestamp>)>,
    ) {
        // Group the policies by cluster so we issue a single `set_read_policy`
        // call per compute instance. `chunk_by` only groups consecutive
        // elements, hence the sort.
        policies.sort_by_key(|&(cluster_id, _, _)| cluster_id);
        for (cluster_id, group) in &policies
            .into_iter()
            .chunk_by(|&(cluster_id, _, _)| cluster_id)
        {
            let group = group
                .flat_map(|(_, item_id, policy)| {
                    // Set the read policy for all GlobalIds associated with an item.
                    self.catalog()
                        .get_entry(&item_id)
                        .global_ids()
                        .map(move |gid| (gid, policy.clone()))
                })
                .collect();
            self.controller
                .compute
                .set_read_policy(cluster_id, group)
                .unwrap_or_terminate("cannot fail to set read policy");
        }
    }

    pub(crate) fn update_compute_read_policy(
        &self,
        compute_instance: ComputeInstanceId,
        item_id: CatalogItemId,
        base_policy: ReadPolicy<Timestamp>,
    ) {
        self.update_compute_read_policies(vec![(compute_instance, item_id, base_policy)])
    }

    /// Attempt to acquire read holds on the indicated collections at the
    /// earliest available time.
    ///
    /// # Panics
    ///
    /// Will panic if any of the referenced collections in `id_bundle` don't
    /// exist.
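    ///
    /// Illustrative usage (names are stand-ins, not a compiled doctest):
    ///
    /// ```ignore
    /// let holds = coord.acquire_read_holds(&id_bundle);
    /// // All collections in the bundle are now readable at (and beyond)
    /// // the holds' common frontier.
    /// let read_frontier = holds.least_valid_read();
    /// ```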
    pub(crate) fn acquire_read_holds(
        &self,
        id_bundle: &CollectionIdBundle,
    ) -> ReadHolds<Timestamp> {
        let mut read_holds = ReadHolds::new();

        let desired_storage_holds = id_bundle.storage_ids.iter().copied().collect_vec();
        let storage_read_holds = self
            .controller
            .storage_collections
            .acquire_read_holds(desired_storage_holds)
            .expect("missing storage collections");
        read_holds.storage_holds = storage_read_holds
            .into_iter()
            .map(|hold| (hold.id(), hold))
            .collect();

        for (&instance_id, collection_ids) in &id_bundle.compute_ids {
            for &id in collection_ids {
                let hold = self
                    .controller
                    .compute
                    .acquire_read_hold(instance_id, id)
                    .expect("missing compute collection");

                let prev = read_holds.compute_holds.insert((instance_id, id), hold);
                assert!(
                    prev.is_none(),
                    "duplicate compute ID in id_bundle {id_bundle:?}"
                );
            }
        }

        tracing::debug!(?read_holds, "acquire_read_holds");
        read_holds
    }

    /// Stash transaction read holds. They will be released when the
    /// transaction is cleaned up.
    pub(crate) fn store_transaction_read_holds(
        &mut self,
        conn_id: ConnectionId,
        read_holds: ReadHolds<Timestamp>,
    ) {
        use std::collections::btree_map::Entry;

        match self.txn_read_holds.entry(conn_id) {
            Entry::Vacant(v) => {
                v.insert(read_holds);
            }
            Entry::Occupied(mut o) => {
                o.get_mut().merge(read_holds);
            }
        }
    }
}