mz_adapter/coord/
introspection.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Support for unified compute introspection.
11//!
12//! Unified compute introspection is the process of collecting introspection data exported by
13//! individual replicas through their logging indexes and then writing that data, tagged with the
14//! respective replica ID, to "unified" storage collections. These storage collections then allow
15//! querying introspection data across all replicas and regardless of the health of individual
16//! replicas.
17//!
18//! # Lifecycle of Introspection Subscribes
19//!
20//! * After a new replica was created, the coordinator calls `install_introspection_subscribes` to
21//!   install all defined introspection subscribes on the new replica.
22//! * The coordinator calls `handle_introspection_subscribe_batch` for each response it receives
23//!   from an introspection subscribe, to write received updates to their corresponding
24//!   storage-managed collection.
25//! * Before a replica is dropped, the coordinator calls `drop_introspection_subscribes` to drop
26//!   all introspection subscribes previously installed on the replica.
27//! * When a replica disconnects without being dropped (e.g. because of a crash or network
//!   failure), `handle_introspection_subscribe_batch` reacts to the corresponding error responses
29//!   by reinstalling the failed introspection subscribes.
30
31use anyhow::bail;
32use derivative::Derivative;
33use mz_adapter_types::dyncfgs::ENABLE_INTROSPECTION_SUBSCRIBES;
34use mz_cluster_client::ReplicaId;
35use mz_compute_client::controller::error::ERROR_TARGET_REPLICA_FAILED;
36use mz_compute_client::protocol::response::SubscribeBatch;
37use mz_controller_types::ClusterId;
38use mz_ore::collections::CollectionExt;
39use mz_ore::soft_panic_or_log;
40use mz_repr::optimize::OverrideFrom;
41use mz_repr::{Datum, GlobalId, Row};
42use mz_sql::catalog::SessionCatalog;
43use mz_sql::plan::{Params, Plan, SubscribePlan};
44use mz_sql::session::user::{MZ_SYSTEM_ROLE_ID, RoleMetadata};
45use mz_storage_client::controller::{IntrospectionType, StorageWriteOp};
46use tracing::{Span, info};
47
48use crate::coord::{
49    Coordinator, IntrospectionSubscribeFinish, IntrospectionSubscribeOptimizeMir,
50    IntrospectionSubscribeStage, IntrospectionSubscribeTimestampOptimizeLir, Message, PlanValidity,
51    StageResult, Staged,
52};
53use crate::optimize::Optimize;
54use crate::{AdapterError, ExecuteResponse, optimize};
55
56// State tracked about an active introspection subscribe.
57#[derive(Derivative)]
58#[derivative(Debug)]
59pub(super) struct IntrospectionSubscribe {
60    /// The ID of the targeted cluster.
61    cluster_id: ClusterId,
62    /// The ID of the targeted replica.
63    replica_id: ReplicaId,
64    /// The spec from which this subscribe was created.
65    spec: &'static SubscribeSpec,
66    /// A storage write to be applied the next time the introspection subscribe produces any
67    /// output.
68    ///
69    /// This mechanism exists to delay the deletion of previous subscribe results from the target
70    /// storage collection when an introspection subscribe is reinstalled. After reinstallation it
71    /// can take a while for the new subscribe dataflow to produce its snapshot and keeping the old
72    /// introspection data around in the meantime makes for a better UX than removing it.
73    #[derivative(Debug = "ignore")]
74    deferred_write: Option<StorageWriteOp>,
75}
76
77impl IntrospectionSubscribe {
78    /// Returns a `StorageWriteOp` that instructs the deletion of all data previously written by
79    /// this subscribe.
80    fn delete_write_op(&self) -> StorageWriteOp {
81        let target_replica = self.replica_id.to_string();
82        let filter = Box::new(move |row: &Row| {
83            let replica_id = row.unpack_first();
84            replica_id == Datum::String(&target_replica)
85        });
86        StorageWriteOp::Delete { filter }
87    }
88}
89
90impl Coordinator {
91    /// Installs introspection subscribes on all existing replicas.
92    ///
93    /// Meant to be invoked during coordinator bootstrapping.
94    pub(super) async fn bootstrap_introspection_subscribes(&mut self) {
95        let mut cluster_replicas = Vec::new();
96        for cluster in self.catalog.clusters() {
97            for replica in cluster.replicas() {
98                cluster_replicas.push((cluster.id, replica.replica_id));
99            }
100        }
101
102        for (cluster_id, replica_id) in cluster_replicas {
103            self.install_introspection_subscribes(cluster_id, replica_id)
104                .await;
105        }
106    }
107
108    /// Installs introspection subscribes on the given replica.
109    pub(super) async fn install_introspection_subscribes(
110        &mut self,
111        cluster_id: ClusterId,
112        replica_id: ReplicaId,
113    ) {
114        let dyncfgs = self.catalog().system_config().dyncfgs();
115        if !ENABLE_INTROSPECTION_SUBSCRIBES.get(dyncfgs) {
116            return;
117        }
118
119        for spec in SUBSCRIBES {
120            self.install_introspection_subscribe(cluster_id, replica_id, spec)
121                .await;
122        }
123    }
124
125    async fn install_introspection_subscribe(
126        &mut self,
127        cluster_id: ClusterId,
128        replica_id: ReplicaId,
129        spec: &'static SubscribeSpec,
130    ) {
131        let (_, id) = self.allocate_transient_id();
132        info!(
133            %id,
134            %replica_id,
135            type_ = ?spec.introspection_type,
136            "installing introspection subscribe",
137        );
138
139        // Sequencing is performed asynchronously, and the target replica may be dropped before it
140        // completes. To ensure the subscribe does not leak in this case, we need to already add it
141        // to the coordinator state here, rather than at the end of sequencing.
142        let subscribe = IntrospectionSubscribe {
143            cluster_id,
144            replica_id,
145            spec,
146            deferred_write: None,
147        };
148        self.introspection_subscribes.insert(id, subscribe);
149
150        self.sequence_introspection_subscribe(id, spec, cluster_id, replica_id)
151            .await;
152    }
153
154    async fn sequence_introspection_subscribe(
155        &mut self,
156        subscribe_id: GlobalId,
157        spec: &'static SubscribeSpec,
158        cluster_id: ClusterId,
159        replica_id: ReplicaId,
160    ) {
161        let catalog = self.catalog().for_system_session();
162        let plan = spec.to_plan(&catalog).expect("valid spec");
163
164        let role_metadata = RoleMetadata::new(MZ_SYSTEM_ROLE_ID);
165        let dependencies = plan
166            .from
167            .depends_on()
168            .iter()
169            .map(|id| self.catalog().resolve_item_id(id))
170            .collect();
171        let validity = PlanValidity::new(
172            self.catalog.transient_revision(),
173            dependencies,
174            Some(cluster_id),
175            Some(replica_id),
176            role_metadata,
177        );
178
179        let stage = IntrospectionSubscribeStage::OptimizeMir(IntrospectionSubscribeOptimizeMir {
180            validity,
181            plan,
182            subscribe_id,
183            cluster_id,
184            replica_id,
185        });
186        self.sequence_staged((), Span::current(), stage).await;
187    }
188
189    fn sequence_introspection_subscribe_optimize_mir(
190        &self,
191        stage: IntrospectionSubscribeOptimizeMir,
192    ) -> Result<StageResult<Box<IntrospectionSubscribeStage>>, AdapterError> {
193        let IntrospectionSubscribeOptimizeMir {
194            mut validity,
195            plan,
196            subscribe_id,
197            cluster_id,
198            replica_id,
199        } = stage;
200
201        let compute_instance = self.instance_snapshot(cluster_id).expect("must exist");
202        let (_, view_id) = self.allocate_transient_id();
203
204        let vars = self.catalog().system_config();
205        let overrides = self.catalog.get_cluster(cluster_id).config.features();
206        let optimizer_config = optimize::OptimizerConfig::from(vars).override_from(&overrides);
207
208        let mut optimizer = optimize::subscribe::Optimizer::new(
209            self.owned_catalog(),
210            compute_instance,
211            view_id,
212            subscribe_id,
213            None,
214            plan.with_snapshot,
215            None,
216            format!("introspection-subscribe-{subscribe_id}"),
217            optimizer_config,
218            self.optimizer_metrics(),
219        );
220        let catalog = self.owned_catalog();
221
222        let span = Span::current();
223        Ok(StageResult::Handle(mz_ore::task::spawn_blocking(
224            || "optimize introspection subscribe (mir)",
225            move || {
226                span.in_scope(|| {
227                    // MIR ⇒ MIR optimization (global)
228                    let global_mir_plan = optimizer.catch_unwind_optimize(plan.from)?;
229                    // Add introduced indexes as validity dependencies.
230                    let id_bundle = global_mir_plan.id_bundle(cluster_id);
231                    let item_ids = id_bundle.iter().map(|id| catalog.resolve_item_id(&id));
232                    validity.extend_dependencies(item_ids);
233
234                    let stage = IntrospectionSubscribeStage::TimestampOptimizeLir(
235                        IntrospectionSubscribeTimestampOptimizeLir {
236                            validity,
237                            optimizer,
238                            global_mir_plan,
239                            cluster_id,
240                            replica_id,
241                        },
242                    );
243                    Ok(Box::new(stage))
244                })
245            },
246        )))
247    }
248
249    fn sequence_introspection_subscribe_timestamp_optimize_lir(
250        &self,
251        stage: IntrospectionSubscribeTimestampOptimizeLir,
252    ) -> Result<StageResult<Box<IntrospectionSubscribeStage>>, AdapterError> {
253        let IntrospectionSubscribeTimestampOptimizeLir {
254            validity,
255            mut optimizer,
256            global_mir_plan,
257            cluster_id,
258            replica_id,
259        } = stage;
260
261        // Timestamp selection.
262        let id_bundle = global_mir_plan.id_bundle(cluster_id);
263        let read_holds = self.acquire_read_holds(&id_bundle);
264        let as_of = read_holds.least_valid_read();
265
266        let global_mir_plan = global_mir_plan.resolve(as_of);
267
268        let span = Span::current();
269        Ok(StageResult::Handle(mz_ore::task::spawn_blocking(
270            || "optimize introspection subscribe (lir)",
271            move || {
272                span.in_scope(|| {
273                    // MIR ⇒ LIR lowering and LIR ⇒ LIR optimization (global)
274                    let global_lir_plan =
275                        optimizer.catch_unwind_optimize(global_mir_plan.clone())?;
276
277                    let stage = IntrospectionSubscribeStage::Finish(IntrospectionSubscribeFinish {
278                        validity,
279                        global_lir_plan,
280                        read_holds,
281                        cluster_id,
282                        replica_id,
283                    });
284                    Ok(Box::new(stage))
285                })
286            },
287        )))
288    }
289
290    async fn sequence_introspection_subscribe_finish(
291        &mut self,
292        stage: IntrospectionSubscribeFinish,
293    ) -> Result<StageResult<Box<IntrospectionSubscribeStage>>, AdapterError> {
294        let IntrospectionSubscribeFinish {
295            validity: _,
296            global_lir_plan,
297            read_holds,
298            cluster_id,
299            replica_id,
300        } = stage;
301
302        let subscribe_id = global_lir_plan.sink_id();
303
304        // The subscribe may already have been dropped, in which case we must not install a
305        // dataflow for it.
306        let response = if self.introspection_subscribes.contains_key(&subscribe_id) {
307            let (df_desc, _df_meta) = global_lir_plan.unapply();
308            self.ship_dataflow(df_desc, cluster_id, Some(replica_id))
309                .await;
310
311            Ok(StageResult::Response(
312                ExecuteResponse::CreatedIntrospectionSubscribe,
313            ))
314        } else {
315            Err(AdapterError::internal(
316                "introspection",
317                "introspection subscribe has already been dropped",
318            ))
319        };
320
321        drop(read_holds);
322        response
323    }
324
325    /// Drops the introspection subscribes installed on the given replica.
326    ///
327    /// Dropping an introspection subscribe entails:
328    ///  * removing it from [`Coordinator::introspection_subscribes`]
329    ///  * dropping its compute collection
330    ///  * retracting any rows previously omitted by it from its corresponding storage-managed
331    ///    collection
332    pub(super) fn drop_introspection_subscribes(&mut self, replica_id: ReplicaId) {
333        let to_drop: Vec<_> = self
334            .introspection_subscribes
335            .iter()
336            .filter(|(_, s)| s.replica_id == replica_id)
337            .map(|(id, _)| *id)
338            .collect();
339
340        for id in to_drop {
341            self.drop_introspection_subscribe(id);
342        }
343    }
344
345    fn drop_introspection_subscribe(&mut self, id: GlobalId) {
346        let Some(subscribe) = self.introspection_subscribes.remove(&id) else {
347            soft_panic_or_log!("attempt to remove unknown introspection subscribe (id={id})");
348            return;
349        };
350
351        info!(
352            %id,
353            replica_id = %subscribe.replica_id,
354            type_ = ?subscribe.spec.introspection_type,
355            "dropping introspection subscribe",
356        );
357
358        // This can fail if the sequencing hasn't finished yet for the subscribe. In this case,
359        // `sequence_introspection_subscribe_finish` will skip installing the compute collection in
360        // the first place.
361        let _ = self
362            .controller
363            .compute
364            .drop_collections(subscribe.cluster_id, vec![id]);
365
366        self.controller.storage.update_introspection_collection(
367            subscribe.spec.introspection_type,
368            subscribe.delete_write_op(),
369        );
370    }
371
372    async fn reinstall_introspection_subscribe(&mut self, id: GlobalId) {
373        let Some(mut subscribe) = self.introspection_subscribes.remove(&id) else {
374            soft_panic_or_log!("attempt to reinstall unknown introspection subscribe (id={id})");
375            return;
376        };
377
378        // Note that we don't simply call `drop_introspection_subscribe` here because that would
379        // cause an immediate deletion of all data previously reported by the subscribe from its
380        // target storage collection. We'd like to not present empty introspection data while the
381        // replica reconnects, so we want to delay the `StorageWriteOp::Delete` until then.
382
383        let IntrospectionSubscribe {
384            cluster_id,
385            replica_id,
386            spec,
387            ..
388        } = subscribe;
389        let old_id = id;
390        let (_, new_id) = self.allocate_transient_id();
391
392        info!(
393            %old_id, %new_id, %replica_id,
394            type_ = ?subscribe.spec.introspection_type,
395            "reinstalling introspection subscribe",
396        );
397
398        if let Err(error) = self
399            .controller
400            .compute
401            .drop_collections(cluster_id, vec![old_id])
402        {
403            soft_panic_or_log!(
404                "error dropping compute collection for introspection subscribe: {error} \
405                 (id={old_id}, cluster_id={cluster_id})"
406            );
407        }
408
409        // Ensure that the contents of the target storage collection are cleaned when the new
410        // subscribe starts reporting data.
411        subscribe.deferred_write = Some(subscribe.delete_write_op());
412
413        self.introspection_subscribes.insert(new_id, subscribe);
414        self.sequence_introspection_subscribe(new_id, spec, cluster_id, replica_id)
415            .await;
416    }
417
418    /// Processes a batch returned by an introspection subscribe.
419    ///
420    /// Depending on the contents of the batch, this either appends received updates to the
421    /// corresponding storage-managed collection, or reinstalls a disconnected subscribe.
422    pub(super) async fn handle_introspection_subscribe_batch(
423        &mut self,
424        id: GlobalId,
425        batch: SubscribeBatch,
426    ) {
427        let Some(subscribe) = self.introspection_subscribes.get_mut(&id) else {
428            soft_panic_or_log!("updates for unknown introspection subscribe (id={id})");
429            return;
430        };
431
432        let updates = match batch.updates {
433            Ok(updates) if updates.is_empty() => return,
434            Ok(updates) => updates,
435            Err(error) if error == ERROR_TARGET_REPLICA_FAILED => {
436                // The target replica disconnected, reinstall the subscribe.
437                self.reinstall_introspection_subscribe(id).await;
438                return;
439            }
440            Err(error) => {
441                soft_panic_or_log!(
442                    "introspection subscribe produced an error: {error} \
443                     (id={id}, subscribe={subscribe:?})",
444                );
445                return;
446            }
447        };
448
449        // Prepend the `replica_id` to each row.
450        let replica_id = subscribe.replica_id.to_string();
451        let mut new_updates = Vec::with_capacity(updates.len());
452        let mut new_row = Row::default();
453        for (_time, row, diff) in updates {
454            let mut packer = new_row.packer();
455            packer.push(Datum::String(&replica_id));
456            packer.extend_by_row(&row);
457            new_updates.push((new_row.clone(), diff));
458        }
459
460        // If we have a pending deferred write, we need to apply it _before_ the append of the new
461        // rows.
462        if let Some(op) = subscribe.deferred_write.take() {
463            self.controller
464                .storage
465                .update_introspection_collection(subscribe.spec.introspection_type, op);
466        }
467
468        self.controller.storage.update_introspection_collection(
469            subscribe.spec.introspection_type,
470            StorageWriteOp::Append {
471                updates: new_updates,
472            },
473        );
474    }
475}
476
477impl Staged for IntrospectionSubscribeStage {
478    type Ctx = ();
479
480    fn validity(&mut self) -> &mut PlanValidity {
481        match self {
482            Self::OptimizeMir(stage) => &mut stage.validity,
483            Self::TimestampOptimizeLir(stage) => &mut stage.validity,
484            Self::Finish(stage) => &mut stage.validity,
485        }
486    }
487
488    async fn stage(
489        self,
490        coord: &mut Coordinator,
491        _ctx: &mut (),
492    ) -> Result<StageResult<Box<Self>>, AdapterError> {
493        match self {
494            Self::OptimizeMir(stage) => coord.sequence_introspection_subscribe_optimize_mir(stage),
495            Self::TimestampOptimizeLir(stage) => {
496                coord.sequence_introspection_subscribe_timestamp_optimize_lir(stage)
497            }
498            Self::Finish(stage) => coord.sequence_introspection_subscribe_finish(stage).await,
499        }
500    }
501
502    fn message(self, _ctx: (), span: Span) -> super::Message {
503        Message::IntrospectionSubscribeStageReady { span, stage: self }
504    }
505
506    fn cancel_enabled(&self) -> bool {
507        false
508    }
509}
510
511/// The specification for an introspection subscribe.
512#[derive(Debug)]
513pub(super) struct SubscribeSpec {
514    /// An [`IntrospectionType`] identifying the storage-managed collection to which updates
515    /// received from subscribes instantiated from this spec are written.
516    introspection_type: IntrospectionType,
517    /// The SQL definition of the subscribe.
518    sql: &'static str,
519}
520
521impl SubscribeSpec {
522    fn to_plan(&self, catalog: &dyn SessionCatalog) -> Result<SubscribePlan, anyhow::Error> {
523        let parsed = mz_sql::parse::parse(self.sql)?.into_element();
524        let (stmt, resolved_ids) = mz_sql::names::resolve(catalog, parsed.ast)?;
525        let plan = mz_sql::plan::plan(None, catalog, stmt, &Params::empty(), &resolved_ids)?;
526        match plan {
527            Plan::Subscribe(plan) => Ok(plan),
528            _ => bail!("unexpected plan type: {plan:?}"),
529        }
530    }
531}
532
/// The introspection subscribes installed on each replica (when the
/// `ENABLE_INTROSPECTION_SUBSCRIBES` dyncfg is enabled).
///
/// Each result row is prefixed with the replica ID and appended to the storage-managed
/// collection identified by the entry's `introspection_type`.
const SUBSCRIBES: &[SubscribeSpec] = &[
    // Per-export error counts: `sum(count)` over `mz_compute_error_counts_raw`, grouped by
    // `export_id`.
    SubscribeSpec {
        introspection_type: IntrospectionType::ComputeErrorCounts,
        sql: "SUBSCRIBE (
            SELECT export_id, sum(count)
            FROM mz_introspection.mz_compute_error_counts_raw
            GROUP BY export_id
        )",
    },
    // Per-export hydration times: the maximum per-worker `time_ns`, or NULL if any per-worker
    // row for the export still has a NULL `time_ns` (the `count(*) = count(time_ns)` check).
    // Exports whose IDs start with `t` are filtered out (presumably transient IDs — TODO
    // confirm).
    SubscribeSpec {
        introspection_type: IntrospectionType::ComputeHydrationTimes,
        sql: "SUBSCRIBE (
            SELECT
                export_id,
                CASE count(*) = count(time_ns)
                    WHEN true THEN max(time_ns)
                    ELSE NULL
                END AS time_ns
            FROM mz_introspection.mz_compute_hydration_times_per_worker
            WHERE export_id NOT LIKE 't%'
            GROUP BY export_id
            OPTIONS (AGGREGATE INPUT GROUP SIZE = 1)
        )",
    },
];