mz_adapter/coord/
introspection.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Support for unified compute introspection.
11//!
12//! Unified compute introspection is the process of collecting introspection data exported by
13//! individual replicas through their logging indexes and then writing that data, tagged with the
14//! respective replica ID, to "unified" storage collections. These storage collections then allow
15//! querying introspection data across all replicas and regardless of the health of individual
16//! replicas.
17//!
18//! # Lifecycle of Introspection Subscribes
19//!
20//! * After a new replica was created, the coordinator calls `install_introspection_subscribes` to
21//!   install all defined introspection subscribes on the new replica.
22//! * The coordinator calls `handle_introspection_subscribe_batch` for each response it receives
23//!   from an introspection subscribe, to write received updates to their corresponding
24//!   storage-managed collection.
25//! * Before a replica is dropped, the coordinator calls `drop_introspection_subscribes` to drop
26//!   all introspection subscribes previously installed on the replica.
27//! * When a replica disconnects without being dropped (e.g. because of a crash or network
//!   failure), `handle_introspection_subscribe_batch` reacts to the corresponding error responses
29//!   by reinstalling the failed introspection subscribes.
30
31use anyhow::bail;
32use derivative::Derivative;
33use mz_adapter_types::dyncfgs::ENABLE_INTROSPECTION_SUBSCRIBES;
34use mz_cluster_client::ReplicaId;
35use mz_compute_client::controller::error::ERROR_TARGET_REPLICA_FAILED;
36use mz_compute_client::protocol::response::SubscribeBatch;
37use mz_controller_types::ClusterId;
38use mz_ore::collections::CollectionExt;
39use mz_ore::soft_panic_or_log;
40use mz_repr::optimize::OverrideFrom;
41use mz_repr::{Datum, GlobalId, Row};
42use mz_sql::catalog::SessionCatalog;
43use mz_sql::plan::{Params, Plan, SubscribePlan};
44use mz_sql::session::user::{MZ_SYSTEM_ROLE_ID, RoleMetadata};
45use mz_storage_client::controller::{IntrospectionType, StorageWriteOp};
46use tracing::{Span, info};
47
48use crate::coord::{
49    Coordinator, IntrospectionSubscribeFinish, IntrospectionSubscribeOptimizeMir,
50    IntrospectionSubscribeStage, IntrospectionSubscribeTimestampOptimizeLir, Message, PlanValidity,
51    StageResult, Staged,
52};
53use crate::optimize::Optimize;
54use crate::{AdapterError, ExecuteResponse, optimize};
55
56// State tracked about an active introspection subscribe.
57#[derive(Derivative)]
58#[derivative(Debug)]
59pub(super) struct IntrospectionSubscribe {
60    /// The ID of the targeted cluster.
61    cluster_id: ClusterId,
62    /// The ID of the targeted replica.
63    replica_id: ReplicaId,
64    /// The spec from which this subscribe was created.
65    spec: &'static SubscribeSpec,
66    /// A storage write to be applied the next time the introspection subscribe produces any
67    /// output.
68    ///
69    /// This mechanism exists to delay the deletion of previous subscribe results from the target
70    /// storage collection when an introspection subscribe is reinstalled. After reinstallation it
71    /// can take a while for the new subscribe dataflow to produce its snapshot and keeping the old
72    /// introspection data around in the meantime makes for a better UX than removing it.
73    #[derivative(Debug = "ignore")]
74    deferred_write: Option<StorageWriteOp>,
75}
76
77impl IntrospectionSubscribe {
78    /// Returns a `StorageWriteOp` that instructs the deletion of all data previously written by
79    /// this subscribe.
80    fn delete_write_op(&self) -> StorageWriteOp {
81        let target_replica = self.replica_id.to_string();
82        let filter = Box::new(move |row: &Row| {
83            let replica_id = row.unpack_first();
84            replica_id == Datum::String(&target_replica)
85        });
86        StorageWriteOp::Delete { filter }
87    }
88}
89
90impl Coordinator {
91    /// Installs introspection subscribes on all existing replicas.
92    ///
93    /// Meant to be invoked during coordinator bootstrapping.
94    pub(super) async fn bootstrap_introspection_subscribes(&mut self) {
95        let mut cluster_replicas = Vec::new();
96        for cluster in self.catalog.clusters() {
97            for replica in cluster.replicas() {
98                cluster_replicas.push((cluster.id, replica.replica_id));
99            }
100        }
101
102        for (cluster_id, replica_id) in cluster_replicas {
103            self.install_introspection_subscribes(cluster_id, replica_id)
104                .await;
105        }
106    }
107
108    /// Installs introspection subscribes on the given replica.
109    pub(super) async fn install_introspection_subscribes(
110        &mut self,
111        cluster_id: ClusterId,
112        replica_id: ReplicaId,
113    ) {
114        let dyncfgs = self.catalog().system_config().dyncfgs();
115        if !ENABLE_INTROSPECTION_SUBSCRIBES.get(dyncfgs) {
116            return;
117        }
118
119        for spec in SUBSCRIBES {
120            self.install_introspection_subscribe(cluster_id, replica_id, spec)
121                .await;
122        }
123    }
124
125    async fn install_introspection_subscribe(
126        &mut self,
127        cluster_id: ClusterId,
128        replica_id: ReplicaId,
129        spec: &'static SubscribeSpec,
130    ) {
131        let (_, id) = self.allocate_transient_id();
132        info!(
133            %id,
134            %replica_id,
135            type_ = ?spec.introspection_type,
136            "installing introspection subscribe",
137        );
138
139        // Sequencing is performed asynchronously, and the target replica may be dropped before it
140        // completes. To ensure the subscribe does not leak in this case, we need to already add it
141        // to the coordinator state here, rather than at the end of sequencing.
142        let subscribe = IntrospectionSubscribe {
143            cluster_id,
144            replica_id,
145            spec,
146            deferred_write: None,
147        };
148        self.introspection_subscribes.insert(id, subscribe);
149
150        self.sequence_introspection_subscribe(id, spec, cluster_id, replica_id)
151            .await;
152    }
153
154    async fn sequence_introspection_subscribe(
155        &mut self,
156        subscribe_id: GlobalId,
157        spec: &'static SubscribeSpec,
158        cluster_id: ClusterId,
159        replica_id: ReplicaId,
160    ) {
161        let catalog = self.catalog().for_system_session();
162        let plan = spec.to_plan(&catalog).expect("valid spec");
163
164        let role_metadata = RoleMetadata::new(MZ_SYSTEM_ROLE_ID);
165        let dependencies = plan
166            .from
167            .depends_on()
168            .iter()
169            .map(|id| self.catalog().resolve_item_id(id))
170            .collect();
171        let validity = PlanValidity::new(
172            self.catalog.transient_revision(),
173            dependencies,
174            Some(cluster_id),
175            Some(replica_id),
176            role_metadata,
177        );
178
179        let stage = IntrospectionSubscribeStage::OptimizeMir(IntrospectionSubscribeOptimizeMir {
180            validity,
181            plan,
182            subscribe_id,
183            cluster_id,
184            replica_id,
185        });
186        self.sequence_staged((), Span::current(), stage).await;
187    }
188
189    fn sequence_introspection_subscribe_optimize_mir(
190        &self,
191        stage: IntrospectionSubscribeOptimizeMir,
192    ) -> Result<StageResult<Box<IntrospectionSubscribeStage>>, AdapterError> {
193        let IntrospectionSubscribeOptimizeMir {
194            mut validity,
195            plan,
196            subscribe_id,
197            cluster_id,
198            replica_id,
199        } = stage;
200
201        let compute_instance = self.instance_snapshot(cluster_id).expect("must exist");
202        let (_, view_id) = self.allocate_transient_id();
203
204        let vars = self.catalog().system_config();
205        let overrides = self.catalog.get_cluster(cluster_id).config.features();
206        let optimizer_config = optimize::OptimizerConfig::from(vars).override_from(&overrides);
207
208        let mut optimizer = optimize::subscribe::Optimizer::new(
209            self.owned_catalog(),
210            compute_instance,
211            view_id,
212            subscribe_id,
213            plan.with_snapshot,
214            None,
215            format!("introspection-subscribe-{subscribe_id}"),
216            optimizer_config,
217            self.optimizer_metrics(),
218        );
219        let catalog = self.owned_catalog();
220
221        let span = Span::current();
222        Ok(StageResult::Handle(mz_ore::task::spawn_blocking(
223            || "optimize introspection subscribe (mir)",
224            move || {
225                span.in_scope(|| {
226                    // MIR ⇒ MIR optimization (global)
227                    let global_mir_plan = optimizer.catch_unwind_optimize(plan.from)?;
228                    // Add introduced indexes as validity dependencies.
229                    let id_bundle = global_mir_plan.id_bundle(cluster_id);
230                    let item_ids = id_bundle.iter().map(|id| catalog.resolve_item_id(&id));
231                    validity.extend_dependencies(item_ids);
232
233                    let stage = IntrospectionSubscribeStage::TimestampOptimizeLir(
234                        IntrospectionSubscribeTimestampOptimizeLir {
235                            validity,
236                            optimizer,
237                            global_mir_plan,
238                            cluster_id,
239                            replica_id,
240                        },
241                    );
242                    Ok(Box::new(stage))
243                })
244            },
245        )))
246    }
247
248    fn sequence_introspection_subscribe_timestamp_optimize_lir(
249        &self,
250        stage: IntrospectionSubscribeTimestampOptimizeLir,
251    ) -> Result<StageResult<Box<IntrospectionSubscribeStage>>, AdapterError> {
252        let IntrospectionSubscribeTimestampOptimizeLir {
253            validity,
254            mut optimizer,
255            global_mir_plan,
256            cluster_id,
257            replica_id,
258        } = stage;
259
260        // Timestamp selection.
261        let id_bundle = global_mir_plan.id_bundle(cluster_id);
262        let read_holds = self.acquire_read_holds(&id_bundle);
263        let as_of = read_holds.least_valid_read();
264
265        let global_mir_plan = global_mir_plan.resolve(as_of);
266
267        let span = Span::current();
268        Ok(StageResult::Handle(mz_ore::task::spawn_blocking(
269            || "optimize introspection subscribe (lir)",
270            move || {
271                span.in_scope(|| {
272                    // MIR ⇒ LIR lowering and LIR ⇒ LIR optimization (global)
273                    let global_lir_plan =
274                        optimizer.catch_unwind_optimize(global_mir_plan.clone())?;
275
276                    let stage = IntrospectionSubscribeStage::Finish(IntrospectionSubscribeFinish {
277                        validity,
278                        global_lir_plan,
279                        read_holds,
280                        cluster_id,
281                        replica_id,
282                    });
283                    Ok(Box::new(stage))
284                })
285            },
286        )))
287    }
288
289    async fn sequence_introspection_subscribe_finish(
290        &mut self,
291        stage: IntrospectionSubscribeFinish,
292    ) -> Result<StageResult<Box<IntrospectionSubscribeStage>>, AdapterError> {
293        let IntrospectionSubscribeFinish {
294            validity: _,
295            global_lir_plan,
296            read_holds,
297            cluster_id,
298            replica_id,
299        } = stage;
300
301        let subscribe_id = global_lir_plan.sink_id();
302
303        // The subscribe may already have been dropped, in which case we must not install a
304        // dataflow for it.
305        let response = if self.introspection_subscribes.contains_key(&subscribe_id) {
306            let (df_desc, _df_meta) = global_lir_plan.unapply();
307            self.ship_dataflow(df_desc, cluster_id, Some(replica_id))
308                .await;
309
310            Ok(StageResult::Response(
311                ExecuteResponse::CreatedIntrospectionSubscribe,
312            ))
313        } else {
314            Err(AdapterError::internal(
315                "introspection",
316                "introspection subscribe has already been dropped",
317            ))
318        };
319
320        drop(read_holds);
321        response
322    }
323
324    /// Drops the introspection subscribes installed on the given replica.
325    ///
326    /// Dropping an introspection subscribe entails:
327    ///  * removing it from [`Coordinator::introspection_subscribes`]
328    ///  * dropping its compute collection
329    ///  * retracting any rows previously omitted by it from its corresponding storage-managed
330    ///    collection
331    pub(super) fn drop_introspection_subscribes(&mut self, replica_id: ReplicaId) {
332        let to_drop: Vec<_> = self
333            .introspection_subscribes
334            .iter()
335            .filter(|(_, s)| s.replica_id == replica_id)
336            .map(|(id, _)| *id)
337            .collect();
338
339        for id in to_drop {
340            self.drop_introspection_subscribe(id);
341        }
342    }
343
344    fn drop_introspection_subscribe(&mut self, id: GlobalId) {
345        let Some(subscribe) = self.introspection_subscribes.remove(&id) else {
346            soft_panic_or_log!("attempt to remove unknown introspection subscribe (id={id})");
347            return;
348        };
349
350        info!(
351            %id,
352            replica_id = %subscribe.replica_id,
353            type_ = ?subscribe.spec.introspection_type,
354            "dropping introspection subscribe",
355        );
356
357        // This can fail if the sequencing hasn't finished yet for the subscribe. In this case,
358        // `sequence_introspection_subscribe_finish` will skip installing the compute collection in
359        // the first place.
360        let _ = self
361            .controller
362            .compute
363            .drop_collections(subscribe.cluster_id, vec![id]);
364
365        self.controller.storage.update_introspection_collection(
366            subscribe.spec.introspection_type,
367            subscribe.delete_write_op(),
368        );
369    }
370
371    async fn reinstall_introspection_subscribe(&mut self, id: GlobalId) {
372        let Some(mut subscribe) = self.introspection_subscribes.remove(&id) else {
373            soft_panic_or_log!("attempt to reinstall unknown introspection subscribe (id={id})");
374            return;
375        };
376
377        // Note that we don't simply call `drop_introspection_subscribe` here because that would
378        // cause an immediate deletion of all data previously reported by the subscribe from its
379        // target storage collection. We'd like to not present empty introspection data while the
380        // replica reconnects, so we want to delay the `StorageWriteOp::Delete` until then.
381
382        let IntrospectionSubscribe {
383            cluster_id,
384            replica_id,
385            spec,
386            ..
387        } = subscribe;
388        let old_id = id;
389        let (_, new_id) = self.allocate_transient_id();
390
391        info!(
392            %old_id, %new_id, %replica_id,
393            type_ = ?subscribe.spec.introspection_type,
394            "reinstalling introspection subscribe",
395        );
396
397        if let Err(error) = self
398            .controller
399            .compute
400            .drop_collections(cluster_id, vec![old_id])
401        {
402            soft_panic_or_log!(
403                "error dropping compute collection for introspection subscribe: {error} \
404                 (id={old_id}, cluster_id={cluster_id})"
405            );
406        }
407
408        // Ensure that the contents of the target storage collection are cleaned when the new
409        // subscribe starts reporting data.
410        subscribe.deferred_write = Some(subscribe.delete_write_op());
411
412        self.introspection_subscribes.insert(new_id, subscribe);
413        self.sequence_introspection_subscribe(new_id, spec, cluster_id, replica_id)
414            .await;
415    }
416
417    /// Processes a batch returned by an introspection subscribe.
418    ///
419    /// Depending on the contents of the batch, this either appends received updates to the
420    /// corresponding storage-managed collection, or reinstalls a disconnected subscribe.
421    pub(super) async fn handle_introspection_subscribe_batch(
422        &mut self,
423        id: GlobalId,
424        batch: SubscribeBatch,
425    ) {
426        let Some(subscribe) = self.introspection_subscribes.get_mut(&id) else {
427            soft_panic_or_log!("updates for unknown introspection subscribe (id={id})");
428            return;
429        };
430
431        let updates = match batch.updates {
432            Ok(updates) if updates.is_empty() => return,
433            Ok(updates) => updates,
434            Err(error) if error == ERROR_TARGET_REPLICA_FAILED => {
435                // The target replica disconnected, reinstall the subscribe.
436                self.reinstall_introspection_subscribe(id).await;
437                return;
438            }
439            Err(error) => {
440                soft_panic_or_log!(
441                    "introspection subscribe produced an error: {error} \
442                     (id={id}, subscribe={subscribe:?})",
443                );
444                return;
445            }
446        };
447
448        // Prepend the `replica_id` to each row.
449        let replica_id = subscribe.replica_id.to_string();
450        let mut new_updates = Vec::with_capacity(updates.len());
451        let mut new_row = Row::default();
452        for (_time, row, diff) in updates {
453            let mut packer = new_row.packer();
454            packer.push(Datum::String(&replica_id));
455            packer.extend_by_row(&row);
456            new_updates.push((new_row.clone(), diff));
457        }
458
459        // If we have a pending deferred write, we need to apply it _before_ the append of the new
460        // rows.
461        if let Some(op) = subscribe.deferred_write.take() {
462            self.controller
463                .storage
464                .update_introspection_collection(subscribe.spec.introspection_type, op);
465        }
466
467        self.controller.storage.update_introspection_collection(
468            subscribe.spec.introspection_type,
469            StorageWriteOp::Append {
470                updates: new_updates,
471            },
472        );
473    }
474}
475
476impl Staged for IntrospectionSubscribeStage {
477    type Ctx = ();
478
479    fn validity(&mut self) -> &mut PlanValidity {
480        match self {
481            Self::OptimizeMir(stage) => &mut stage.validity,
482            Self::TimestampOptimizeLir(stage) => &mut stage.validity,
483            Self::Finish(stage) => &mut stage.validity,
484        }
485    }
486
487    async fn stage(
488        self,
489        coord: &mut Coordinator,
490        _ctx: &mut (),
491    ) -> Result<StageResult<Box<Self>>, AdapterError> {
492        match self {
493            Self::OptimizeMir(stage) => coord.sequence_introspection_subscribe_optimize_mir(stage),
494            Self::TimestampOptimizeLir(stage) => {
495                coord.sequence_introspection_subscribe_timestamp_optimize_lir(stage)
496            }
497            Self::Finish(stage) => coord.sequence_introspection_subscribe_finish(stage).await,
498        }
499    }
500
501    fn message(self, _ctx: (), span: Span) -> super::Message {
502        Message::IntrospectionSubscribeStageReady { span, stage: self }
503    }
504
505    fn cancel_enabled(&self) -> bool {
506        false
507    }
508}
509
/// The specification for an introspection subscribe.
///
/// A spec pairs a SQL `SUBSCRIBE` statement with the storage-managed collection its results are
/// written to. When introspection subscribes are enabled, one subscribe per spec is installed on
/// each replica.
#[derive(Debug)]
pub(super) struct SubscribeSpec {
    /// An [`IntrospectionType`] identifying the storage-managed collection to which updates
    /// received from subscribes instantiated from this spec are written.
    introspection_type: IntrospectionType,
    /// The SQL definition of the subscribe.
    ///
    /// It is planned against the system catalog and must plan to a `SUBSCRIBE`.
    sql: &'static str,
}
519
520impl SubscribeSpec {
521    fn to_plan(&self, catalog: &dyn SessionCatalog) -> Result<SubscribePlan, anyhow::Error> {
522        let parsed = mz_sql::parse::parse(self.sql)?.into_element();
523        let (stmt, resolved_ids) = mz_sql::names::resolve(catalog, parsed.ast)?;
524        let plan = mz_sql::plan::plan(None, catalog, stmt, &Params::empty(), &resolved_ids)?;
525        match plan {
526            Plan::Subscribe(plan) => Ok(plan),
527            _ => bail!("unexpected plan type: {plan:?}"),
528        }
529    }
530}
531
/// The introspection subscribes installed on each replica when introspection subscribes are
/// enabled.
///
/// Before a result row is written to the target collection, the replica ID is prepended to it as
/// the first column.
const SUBSCRIBES: &[SubscribeSpec] = &[
    // Total error count per export: the sum of `count` over all raw error-count rows of that
    // export.
    SubscribeSpec {
        introspection_type: IntrospectionType::ComputeErrorCounts,
        sql: "SUBSCRIBE (
            SELECT export_id, sum(count)
            FROM mz_introspection.mz_compute_error_counts_raw
            GROUP BY export_id
        )",
    },
    // Hydration time per export: the maximum `time_ns` across per-worker rows, or NULL if any
    // per-worker row has a NULL `time_ns`. Exports whose IDs start with `t` are excluded
    // (NOTE(review): presumably transient exports).
    SubscribeSpec {
        introspection_type: IntrospectionType::ComputeHydrationTimes,
        sql: "SUBSCRIBE (
            SELECT
                export_id,
                CASE count(*) = count(time_ns)
                    WHEN true THEN max(time_ns)
                    ELSE NULL
                END AS time_ns
            FROM mz_introspection.mz_compute_hydration_times_per_worker
            WHERE export_id NOT LIKE 't%'
            GROUP BY export_id
            OPTIONS (AGGREGATE INPUT GROUP SIZE = 1)
        )",
    },
    // Hydration status per (export, LIR operator): hydrated only if every per-worker row reports
    // it as hydrated.
    SubscribeSpec {
        introspection_type: IntrospectionType::ComputeOperatorHydrationStatus,
        sql: "SUBSCRIBE (
            SELECT
                export_id,
                lir_id,
                bool_and(hydrated) AS hydrated
            FROM mz_introspection.mz_compute_operator_hydration_statuses_per_worker
            GROUP BY export_id, lir_id
        )",
    },
];