Skip to main content

mz_adapter/coord/
introspection.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Support for unified compute introspection.
11//!
12//! Unified compute introspection is the process of collecting introspection data exported by
13//! individual replicas through their logging indexes and then writing that data, tagged with the
14//! respective replica ID, to "unified" storage collections. These storage collections then allow
15//! querying introspection data across all replicas and regardless of the health of individual
16//! replicas.
17//!
18//! # Lifecycle of Introspection Subscribes
19//!
20//! * After a new replica was created, the coordinator calls `install_introspection_subscribes` to
21//!   install all defined introspection subscribes on the new replica.
22//! * The coordinator calls `handle_introspection_subscribe_batch` for each response it receives
23//!   from an introspection subscribe, to write received updates to their corresponding
24//!   storage-managed collection.
25//! * Before a replica is dropped, the coordinator calls `drop_introspection_subscribes` to drop
26//!   all introspection subscribes previously installed on the replica.
27//! * When a replica disconnects without being dropped (e.g. because of a crash or network
28//!   failure), `handle_introspection_subscribe_batch` reacts on the corresponding error responses
29//!   by reinstalling the failed introspection subscribes.
30
31use anyhow::bail;
32use derivative::Derivative;
33use mz_adapter_types::dyncfgs::ENABLE_INTROSPECTION_SUBSCRIBES;
34use mz_cluster_client::ReplicaId;
35use mz_compute_client::controller::error::ERROR_TARGET_REPLICA_FAILED;
36use mz_compute_client::protocol::response::SubscribeBatch;
37use mz_controller_types::ClusterId;
38use mz_ore::collections::CollectionExt;
39use mz_ore::soft_panic_or_log;
40use mz_repr::optimize::OverrideFrom;
41use mz_repr::{Datum, GlobalId, Row};
42use mz_sql::catalog::SessionCatalog;
43use mz_sql::plan::{Params, Plan, SubscribePlan};
44use mz_sql::session::user::{MZ_SYSTEM_ROLE_ID, RoleMetadata};
45use mz_storage_client::controller::{IntrospectionType, StorageWriteOp};
46use tracing::{Span, info};
47
48use crate::coord::{
49    Coordinator, IntrospectionSubscribeFinish, IntrospectionSubscribeOptimizeMir,
50    IntrospectionSubscribeStage, IntrospectionSubscribeTimestampOptimizeLir, Message, PlanValidity,
51    StageResult, Staged,
52};
53use crate::optimize::Optimize;
54use crate::{AdapterError, ExecuteResponse, optimize};
55
/// State tracked about an active introspection subscribe.
///
/// The coordinator keeps one of these per installed subscribe, keyed by the subscribe's
/// transient `GlobalId`.
#[derive(Derivative)]
#[derivative(Debug)]
pub(super) struct IntrospectionSubscribe {
    /// The ID of the targeted cluster.
    cluster_id: ClusterId,
    /// The ID of the targeted replica.
    replica_id: ReplicaId,
    /// The spec from which this subscribe was created.
    spec: &'static SubscribeSpec,
    /// A storage write to be applied the next time the introspection subscribe produces any
    /// output.
    ///
    /// This mechanism exists to delay the deletion of previous subscribe results from the target
    /// storage collection when an introspection subscribe is reinstalled. After reinstallation it
    /// can take a while for the new subscribe dataflow to produce its snapshot and keeping the old
    /// introspection data around in the meantime makes for a better UX than removing it.
    // Excluded from the derived `Debug` output. NOTE(review): presumably because
    // `StorageWriteOp` can carry a boxed filter closure and thus has no `Debug` impl -- confirm.
    #[derivative(Debug = "ignore")]
    deferred_write: Option<StorageWriteOp>,
}
76
77impl IntrospectionSubscribe {
78    /// Returns a `StorageWriteOp` that instructs the deletion of all data previously written by
79    /// this subscribe.
80    fn delete_write_op(&self) -> StorageWriteOp {
81        let target_replica = self.replica_id.to_string();
82        let filter = Box::new(move |row: &Row| {
83            let replica_id = row.unpack_first();
84            replica_id == Datum::String(&target_replica)
85        });
86        StorageWriteOp::Delete { filter }
87    }
88}
89
90impl Coordinator {
91    /// Installs introspection subscribes on all existing replicas.
92    ///
93    /// Meant to be invoked during coordinator bootstrapping.
94    pub(super) async fn bootstrap_introspection_subscribes(&mut self) {
95        let mut cluster_replicas = Vec::new();
96        for cluster in self.catalog.clusters() {
97            for replica in cluster.replicas() {
98                cluster_replicas.push((cluster.id, replica.replica_id));
99            }
100        }
101
102        for (cluster_id, replica_id) in cluster_replicas {
103            self.install_introspection_subscribes(cluster_id, replica_id)
104                .await;
105        }
106    }
107
108    /// Installs introspection subscribes on the given replica.
109    pub(super) async fn install_introspection_subscribes(
110        &mut self,
111        cluster_id: ClusterId,
112        replica_id: ReplicaId,
113    ) {
114        let dyncfgs = self.catalog().system_config().dyncfgs();
115        if !ENABLE_INTROSPECTION_SUBSCRIBES.get(dyncfgs) {
116            return;
117        }
118
119        for spec in SUBSCRIBES {
120            self.install_introspection_subscribe(cluster_id, replica_id, spec)
121                .await;
122        }
123    }
124
125    async fn install_introspection_subscribe(
126        &mut self,
127        cluster_id: ClusterId,
128        replica_id: ReplicaId,
129        spec: &'static SubscribeSpec,
130    ) {
131        let (_, id) = self.allocate_transient_id();
132        info!(
133            %id,
134            %replica_id,
135            type_ = ?spec.introspection_type,
136            "installing introspection subscribe",
137        );
138
139        // Sequencing is performed asynchronously, and the target replica may be dropped before it
140        // completes. To ensure the subscribe does not leak in this case, we need to already add it
141        // to the coordinator state here, rather than at the end of sequencing.
142        let subscribe = IntrospectionSubscribe {
143            cluster_id,
144            replica_id,
145            spec,
146            deferred_write: None,
147        };
148        self.introspection_subscribes.insert(id, subscribe);
149
150        self.sequence_introspection_subscribe(id, spec, cluster_id, replica_id)
151            .await;
152    }
153
154    async fn sequence_introspection_subscribe(
155        &mut self,
156        subscribe_id: GlobalId,
157        spec: &'static SubscribeSpec,
158        cluster_id: ClusterId,
159        replica_id: ReplicaId,
160    ) {
161        let catalog = self.catalog().for_system_session();
162        let plan = spec.to_plan(&catalog).expect("valid spec");
163
164        let role_metadata = RoleMetadata::new(MZ_SYSTEM_ROLE_ID);
165        let dependencies = plan
166            .from
167            .depends_on()
168            .iter()
169            .map(|id| self.catalog().resolve_item_id(id))
170            .collect();
171        let validity = PlanValidity::new(
172            self.catalog.transient_revision(),
173            dependencies,
174            Some(cluster_id),
175            Some(replica_id),
176            role_metadata,
177        );
178
179        let stage = IntrospectionSubscribeStage::OptimizeMir(IntrospectionSubscribeOptimizeMir {
180            validity,
181            plan,
182            subscribe_id,
183            cluster_id,
184            replica_id,
185        });
186        self.sequence_staged((), Span::current(), stage).await;
187    }
188
189    fn sequence_introspection_subscribe_optimize_mir(
190        &self,
191        stage: IntrospectionSubscribeOptimizeMir,
192    ) -> Result<StageResult<Box<IntrospectionSubscribeStage>>, AdapterError> {
193        let IntrospectionSubscribeOptimizeMir {
194            mut validity,
195            plan,
196            subscribe_id,
197            cluster_id,
198            replica_id,
199        } = stage;
200
201        let compute_instance = self.instance_snapshot(cluster_id).expect("must exist");
202        let (_, view_id) = self.allocate_transient_id();
203
204        let vars = self.catalog().system_config();
205        let overrides = self.catalog.get_cluster(cluster_id).config.features();
206        let optimizer_config = optimize::OptimizerConfig::from(vars)
207            .override_from(&overrides)
208            .override_from(&self.cluster_scoped_optimizer_overrides(cluster_id));
209
210        let mut optimizer = optimize::subscribe::Optimizer::new(
211            self.owned_catalog(),
212            compute_instance,
213            view_id,
214            subscribe_id,
215            plan.with_snapshot,
216            None,
217            format!("introspection-subscribe-{subscribe_id}"),
218            optimizer_config,
219            self.optimizer_metrics(),
220        );
221        let catalog = self.owned_catalog();
222
223        let span = Span::current();
224        Ok(StageResult::Handle(mz_ore::task::spawn_blocking(
225            || "optimize introspection subscribe (mir)",
226            move || {
227                span.in_scope(|| {
228                    // MIR ⇒ MIR optimization (global)
229                    let global_mir_plan = optimizer.catch_unwind_optimize(plan)?;
230                    // Add introduced indexes as validity dependencies.
231                    let id_bundle = global_mir_plan.id_bundle(cluster_id);
232                    let item_ids = id_bundle.iter().map(|id| catalog.resolve_item_id(&id));
233                    validity.extend_dependencies(item_ids);
234
235                    let stage = IntrospectionSubscribeStage::TimestampOptimizeLir(
236                        IntrospectionSubscribeTimestampOptimizeLir {
237                            validity,
238                            optimizer,
239                            global_mir_plan,
240                            cluster_id,
241                            replica_id,
242                        },
243                    );
244                    Ok(Box::new(stage))
245                })
246            },
247        )))
248    }
249
250    fn sequence_introspection_subscribe_timestamp_optimize_lir(
251        &self,
252        stage: IntrospectionSubscribeTimestampOptimizeLir,
253    ) -> Result<StageResult<Box<IntrospectionSubscribeStage>>, AdapterError> {
254        let IntrospectionSubscribeTimestampOptimizeLir {
255            validity,
256            mut optimizer,
257            global_mir_plan,
258            cluster_id,
259            replica_id,
260        } = stage;
261
262        // Timestamp selection.
263        let id_bundle = global_mir_plan.id_bundle(cluster_id);
264        let read_holds = self.acquire_read_holds(&id_bundle);
265        let as_of = read_holds.least_valid_read();
266
267        let global_mir_plan = global_mir_plan.resolve(as_of);
268
269        let span = Span::current();
270        Ok(StageResult::Handle(mz_ore::task::spawn_blocking(
271            || "optimize introspection subscribe (lir)",
272            move || {
273                span.in_scope(|| {
274                    // MIR ⇒ LIR lowering and LIR ⇒ LIR optimization (global)
275                    let global_lir_plan =
276                        optimizer.catch_unwind_optimize(global_mir_plan.clone())?;
277
278                    let stage = IntrospectionSubscribeStage::Finish(IntrospectionSubscribeFinish {
279                        validity,
280                        global_lir_plan,
281                        read_holds,
282                        cluster_id,
283                        replica_id,
284                    });
285                    Ok(Box::new(stage))
286                })
287            },
288        )))
289    }
290
291    async fn sequence_introspection_subscribe_finish(
292        &mut self,
293        stage: IntrospectionSubscribeFinish,
294    ) -> Result<StageResult<Box<IntrospectionSubscribeStage>>, AdapterError> {
295        let IntrospectionSubscribeFinish {
296            validity: _,
297            global_lir_plan,
298            read_holds,
299            cluster_id,
300            replica_id,
301        } = stage;
302
303        let subscribe_id = global_lir_plan.sink_id();
304
305        // The subscribe may already have been dropped, in which case we must not install a
306        // dataflow for it.
307        let response = if self.introspection_subscribes.contains_key(&subscribe_id) {
308            let (df_desc, _df_meta) = global_lir_plan.unapply();
309            self.ship_dataflow(df_desc, cluster_id, Some(replica_id))
310                .await;
311
312            Ok(StageResult::Response(
313                ExecuteResponse::CreatedIntrospectionSubscribe,
314            ))
315        } else {
316            Err(AdapterError::internal(
317                "introspection",
318                "introspection subscribe has already been dropped",
319            ))
320        };
321
322        drop(read_holds);
323        response
324    }
325
326    /// Drops the introspection subscribes installed on the given replica.
327    ///
328    /// Dropping an introspection subscribe entails:
329    ///  * removing it from [`Coordinator::introspection_subscribes`]
330    ///  * dropping its compute collection
331    ///  * retracting any rows previously omitted by it from its corresponding storage-managed
332    ///    collection
333    pub(super) fn drop_introspection_subscribes(&mut self, replica_id: ReplicaId) {
334        let to_drop: Vec<_> = self
335            .introspection_subscribes
336            .iter()
337            .filter(|(_, s)| s.replica_id == replica_id)
338            .map(|(id, _)| *id)
339            .collect();
340
341        for id in to_drop {
342            self.drop_introspection_subscribe(id);
343        }
344    }
345
346    fn drop_introspection_subscribe(&mut self, id: GlobalId) {
347        let Some(subscribe) = self.introspection_subscribes.remove(&id) else {
348            soft_panic_or_log!("attempt to remove unknown introspection subscribe (id={id})");
349            return;
350        };
351
352        info!(
353            %id,
354            replica_id = %subscribe.replica_id,
355            type_ = ?subscribe.spec.introspection_type,
356            "dropping introspection subscribe",
357        );
358
359        // This can fail if the sequencing hasn't finished yet for the subscribe. In this case,
360        // `sequence_introspection_subscribe_finish` will skip installing the compute collection in
361        // the first place.
362        let _ = self
363            .controller
364            .compute
365            .drop_collections(subscribe.cluster_id, vec![id]);
366
367        self.controller.storage.update_introspection_collection(
368            subscribe.spec.introspection_type,
369            subscribe.delete_write_op(),
370        );
371    }
372
373    async fn reinstall_introspection_subscribe(&mut self, id: GlobalId) {
374        let Some(mut subscribe) = self.introspection_subscribes.remove(&id) else {
375            soft_panic_or_log!("attempt to reinstall unknown introspection subscribe (id={id})");
376            return;
377        };
378
379        // Note that we don't simply call `drop_introspection_subscribe` here because that would
380        // cause an immediate deletion of all data previously reported by the subscribe from its
381        // target storage collection. We'd like to not present empty introspection data while the
382        // replica reconnects, so we want to delay the `StorageWriteOp::Delete` until then.
383
384        let IntrospectionSubscribe {
385            cluster_id,
386            replica_id,
387            spec,
388            ..
389        } = subscribe;
390        let old_id = id;
391        let (_, new_id) = self.allocate_transient_id();
392
393        info!(
394            %old_id, %new_id, %replica_id,
395            type_ = ?subscribe.spec.introspection_type,
396            "reinstalling introspection subscribe",
397        );
398
399        if let Err(error) = self
400            .controller
401            .compute
402            .drop_collections(cluster_id, vec![old_id])
403        {
404            soft_panic_or_log!(
405                "error dropping compute collection for introspection subscribe: {error} \
406                 (id={old_id}, cluster_id={cluster_id})"
407            );
408        }
409
410        // Ensure that the contents of the target storage collection are cleaned when the new
411        // subscribe starts reporting data.
412        subscribe.deferred_write = Some(subscribe.delete_write_op());
413
414        self.introspection_subscribes.insert(new_id, subscribe);
415        self.sequence_introspection_subscribe(new_id, spec, cluster_id, replica_id)
416            .await;
417    }
418
419    /// Processes a batch returned by an introspection subscribe.
420    ///
421    /// Depending on the contents of the batch, this either appends received updates to the
422    /// corresponding storage-managed collection, or reinstalls a disconnected subscribe.
423    pub(super) async fn handle_introspection_subscribe_batch(
424        &mut self,
425        id: GlobalId,
426        batch: SubscribeBatch,
427    ) {
428        let Some(subscribe) = self.introspection_subscribes.get_mut(&id) else {
429            soft_panic_or_log!("updates for unknown introspection subscribe (id={id})");
430            return;
431        };
432
433        let updates = match batch.updates {
434            Ok(updates) if updates.is_empty() => return,
435            Ok(updates) => updates,
436            Err(error) if error == ERROR_TARGET_REPLICA_FAILED => {
437                // The target replica disconnected, reinstall the subscribe.
438                self.reinstall_introspection_subscribe(id).await;
439                return;
440            }
441            Err(error) => {
442                soft_panic_or_log!(
443                    "introspection subscribe produced an error: {error} \
444                     (id={id}, subscribe={subscribe:?})",
445                );
446                return;
447            }
448        };
449
450        // Prepend the `replica_id` to each row.
451        let replica_id = subscribe.replica_id.to_string();
452        let mut new_updates = Vec::with_capacity(updates.len());
453        let mut new_row = Row::default();
454        for collection in updates {
455            for (row, _time, diff) in collection.iter() {
456                let mut packer = new_row.packer();
457                packer.push(Datum::String(&replica_id));
458                packer.extend_by_row_ref(row);
459                new_updates.push((new_row.clone(), diff));
460            }
461        }
462
463        // If we have a pending deferred write, we need to apply it _before_ the append of the new
464        // rows.
465        if let Some(op) = subscribe.deferred_write.take() {
466            self.controller
467                .storage
468                .update_introspection_collection(subscribe.spec.introspection_type, op);
469        }
470
471        self.controller.storage.update_introspection_collection(
472            subscribe.spec.introspection_type,
473            StorageWriteOp::Append {
474                updates: new_updates,
475            },
476        );
477    }
478}
479
480impl Staged for IntrospectionSubscribeStage {
481    type Ctx = ();
482
483    fn validity(&mut self) -> &mut PlanValidity {
484        match self {
485            Self::OptimizeMir(stage) => &mut stage.validity,
486            Self::TimestampOptimizeLir(stage) => &mut stage.validity,
487            Self::Finish(stage) => &mut stage.validity,
488        }
489    }
490
491    async fn stage(
492        self,
493        coord: &mut Coordinator,
494        _ctx: &mut (),
495    ) -> Result<StageResult<Box<Self>>, AdapterError> {
496        match self {
497            Self::OptimizeMir(stage) => coord.sequence_introspection_subscribe_optimize_mir(stage),
498            Self::TimestampOptimizeLir(stage) => {
499                coord.sequence_introspection_subscribe_timestamp_optimize_lir(stage)
500            }
501            Self::Finish(stage) => coord.sequence_introspection_subscribe_finish(stage).await,
502        }
503    }
504
505    fn message(self, _ctx: (), span: Span) -> super::Message {
506        Message::IntrospectionSubscribeStageReady { span, stage: self }
507    }
508
509    fn cancel_enabled(&self) -> bool {
510        false
511    }
512}
513
/// The specification for an introspection subscribe.
///
/// Specs are referenced through `&'static` borrows, so they are expected to live in static
/// data.
#[derive(Debug)]
pub(super) struct SubscribeSpec {
    /// An [`IntrospectionType`] identifying the storage-managed collection to which updates
    /// received from subscribes instantiated from this spec are written.
    introspection_type: IntrospectionType,
    /// The SQL definition of the subscribe.
    ///
    /// Must parse to a single statement that plans to a `Plan::Subscribe`; planning the spec
    /// fails otherwise.
    sql: &'static str,
}
523
524impl SubscribeSpec {
525    fn to_plan(&self, catalog: &dyn SessionCatalog) -> Result<SubscribePlan, anyhow::Error> {
526        let parsed = mz_sql::parse::parse(self.sql)?.into_element();
527        let (stmt, resolved_ids) = mz_sql::names::resolve(catalog, parsed.ast)?;
528        let (plan, _sql_impl_ids) =
529            mz_sql::plan::plan(None, catalog, stmt, &Params::empty(), &resolved_ids)?;
530        match plan {
531            Plan::Subscribe(plan) => Ok(plan),
532            _ => bail!("unexpected plan type: {plan:?}"),
533        }
534    }
535}
536
/// The introspection subscribes that get installed on every replica.
///
/// Each entry pairs the SQL of a subscribe with the [`IntrospectionType`] of the
/// storage-managed collection that receives its output.
const SUBSCRIBES: &[SubscribeSpec] = &[
    // Error counts, summed per `export_id`.
    SubscribeSpec {
        introspection_type: IntrospectionType::ComputeErrorCounts,
        sql: "SUBSCRIBE (
            SELECT export_id, sum(count)
            FROM mz_introspection.mz_compute_error_counts_raw
            GROUP BY export_id
        )",
    },
    // Hydration time per `export_id`: the maximum `time_ns` over all input rows of the group,
    // or NULL while any of those rows still has a NULL `time_ns` (`count(time_ns)` only counts
    // non-NULL values). Export IDs starting with `t` are excluded.
    SubscribeSpec {
        introspection_type: IntrospectionType::ComputeHydrationTimes,
        sql: "SUBSCRIBE (
            SELECT
                export_id,
                CASE count(*) = count(time_ns)
                    WHEN true THEN max(time_ns)
                    ELSE NULL
                END AS time_ns
            FROM mz_introspection.mz_compute_hydration_times_per_worker
            WHERE export_id NOT LIKE 't%'
            GROUP BY export_id
            OPTIONS (AGGREGATE INPUT GROUP SIZE = 1)
        )",
    },
    // Hydration status per `(export_id, lir_id)`: hydrated only if every input row of the group
    // reports `hydrated`.
    SubscribeSpec {
        introspection_type: IntrospectionType::ComputeOperatorHydrationStatus,
        sql: "SUBSCRIBE (
            SELECT
                export_id,
                lir_id,
                bool_and(hydrated) AS hydrated
            FROM mz_introspection.mz_compute_operator_hydration_statuses_per_worker
            GROUP BY export_id, lir_id
        )",
    },
    // Per-object arrangement sizes, one row per `(object_id, replica)`,
    // populating `mz_object_arrangement_sizes`.
    //
    // `mz_arrangement_heap_size_raw` and `mz_arrangement_batcher_size_raw` are
    // differential logs where each `+1` row represents one byte of heap delta;
    // after consolidation, `COUNT(*)` is the current arrangement size in bytes.
    //
    // The `HAVING` floor drops objects below 10 MiB. Below that threshold the
    // heap-size collection wiggles by a few bytes per second from ordinary
    // allocator activity, and emitting exact bytes would push a downstream
    // update on every wiggle. Quantizing to the nearest 10 MiB keeps the
    // emitted size stable across in-bucket wiggle.
    //
    // `mz_dataflow_addresses.address[1]` is the root of each operator's address
    // tree, which equals the owning `dataflow_id` — so we can go addresses →
    // operator → dataflow without joining `mz_dataflow_operator_dataflows`.
    //
    // Joining on `ce.dataflow_id` assumes one dataflow exports a single object;
    // if that stops holding, the same arrangement bytes would be attributed
    // to multiple `export_id`s and we'd need to revisit the granularity.
    //
    // Transient export IDs (`t*`) are ephemeral dataflows (peeks, subscribes,
    // including this one); we drop them to avoid self-feedback churn.
    SubscribeSpec {
        introspection_type: IntrospectionType::ComputeObjectArrangementSizes,
        sql: "SUBSCRIBE (
            SELECT
                ce.export_id AS object_id,
                ((COUNT(*) + 5242880) / 10485760 * 10485760)::int8 AS size
            FROM mz_introspection.mz_compute_exports AS ce
            JOIN (
                SELECT addrs.address[1] AS dataflow_id, addrs.id AS operator_id
                FROM mz_introspection.mz_dataflow_addresses addrs
            ) AS od ON od.dataflow_id = ce.dataflow_id
            JOIN (
                SELECT operator_id FROM mz_introspection.mz_arrangement_heap_size_raw
                UNION ALL
                SELECT operator_id FROM mz_introspection.mz_arrangement_batcher_size_raw
            ) AS rs ON rs.operator_id = od.operator_id
            WHERE ce.export_id NOT LIKE 't%'
            GROUP BY ce.export_id
            HAVING COUNT(*) >= 10485760
        )",
    },
];