Skip to main content

mz_compute_client/controller/
introspection.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use mz_repr::{Diff, Row};
11use mz_storage_client::client::AppendOnlyUpdate;
12use mz_storage_client::controller::{IntrospectionType, StorageController, StorageWriteOp};
13use mz_storage_types::controller::StorageError;
14use tokio::sync::{mpsc, oneshot};
15use tracing::info;
16
17pub type IntrospectionUpdates = (IntrospectionType, Vec<(Row, Diff)>);
18
19/// Spawn a task sinking introspection updates produced by the compute controller to storage.
20pub fn spawn_introspection_sink(
21    mut rx: mpsc::UnboundedReceiver<IntrospectionUpdates>,
22    storage_controller: &dyn StorageController,
23) {
24    let sink = IntrospectionSink::new(storage_controller);
25
26    mz_ore::task::spawn(|| "compute-introspection-sink", async move {
27        info!("running introspection sink task");
28
29        while let Some((type_, updates)) = rx.recv().await {
30            sink.send(type_, updates);
31        }
32
33        info!("introspection sink task shutting down");
34    });
35}
36
37type Notifier = oneshot::Sender<Result<(), StorageError>>;
38type AppendOnlySender = mpsc::UnboundedSender<(Vec<AppendOnlyUpdate>, Notifier)>;
39type DifferentialSender = mpsc::UnboundedSender<(StorageWriteOp, Notifier)>;
40
41/// A sink for introspection updates produced by the compute controller.
42///
43/// The sender is connected to the storage controller's CollectionManager, which writes received
44/// updates to persist.
45#[derive(Debug)]
46struct IntrospectionSink {
47    /// Sender for [`IntrospectionType::Frontiers`] updates.
48    frontiers_tx: DifferentialSender,
49    /// Sender for [`IntrospectionType::ReplicaFrontiers`] updates.
50    replica_frontiers_tx: DifferentialSender,
51    /// Sender for [`IntrospectionType::ComputeDependencies`] updates.
52    compute_dependencies_tx: DifferentialSender,
53    /// Sender for [`IntrospectionType::ComputeMaterializedViewRefreshes`] updates.
54    compute_materialized_view_refreshes_tx: DifferentialSender,
55    /// Sender for [`IntrospectionType::WallclockLagHistory`] updates.
56    wallclock_lag_history_tx: AppendOnlySender,
57    /// Sender for [`IntrospectionType::WallclockLagHistogram`] updates.
58    wallclock_lag_histogram_tx: AppendOnlySender,
59}
60
61impl IntrospectionSink {
62    /// Create a new `IntrospectionSink`.
63    pub fn new(storage_controller: &dyn StorageController) -> Self {
64        use IntrospectionType::*;
65        Self {
66            frontiers_tx: storage_controller.differential_introspection_tx(Frontiers),
67            replica_frontiers_tx: storage_controller
68                .differential_introspection_tx(ReplicaFrontiers),
69            compute_dependencies_tx: storage_controller
70                .differential_introspection_tx(ComputeDependencies),
71            compute_materialized_view_refreshes_tx: storage_controller
72                .differential_introspection_tx(ComputeMaterializedViewRefreshes),
73            wallclock_lag_history_tx: storage_controller
74                .append_only_introspection_tx(WallclockLagHistory),
75            wallclock_lag_histogram_tx: storage_controller
76                .append_only_introspection_tx(WallclockLagHistogram),
77        }
78    }
79
80    /// Send a batch of updates of the given introspection type.
81    pub fn send(&self, type_: IntrospectionType, updates: Vec<(Row, Diff)>) {
82        let send_append_only = |tx: &AppendOnlySender, updates: Vec<_>| {
83            let updates = updates.into_iter().map(AppendOnlyUpdate::Row).collect();
84            let (notifier, _) = oneshot::channel();
85            let _ = tx.send((updates, notifier));
86        };
87        let send_differential = |tx: &DifferentialSender, updates: Vec<_>| {
88            let op = StorageWriteOp::Append { updates };
89            let (notifier, _) = oneshot::channel();
90            let _ = tx.send((op, notifier));
91        };
92
93        use IntrospectionType::*;
94        match type_ {
95            Frontiers => send_differential(&self.frontiers_tx, updates),
96            ReplicaFrontiers => send_differential(&self.replica_frontiers_tx, updates),
97            ComputeDependencies => send_differential(&self.compute_dependencies_tx, updates),
98            ComputeMaterializedViewRefreshes => {
99                send_differential(&self.compute_materialized_view_refreshes_tx, updates);
100            }
101            WallclockLagHistory => send_append_only(&self.wallclock_lag_history_tx, updates),
102            WallclockLagHistogram => send_append_only(&self.wallclock_lag_histogram_tx, updates),
103            _ => panic!("unexpected introspection type: {type_:?}"),
104        }
105    }
106}