mz_compute_client/controller/
introspection.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use mz_repr::{Diff, Row};
11use mz_storage_client::client::AppendOnlyUpdate;
12use mz_storage_client::controller::{IntrospectionType, StorageController, StorageWriteOp};
13use mz_storage_types::controller::StorageError;
14use timely::progress::Timestamp;
15use tokio::sync::{mpsc, oneshot};
16use tracing::info;
17
/// A batch of introspection updates: the [`IntrospectionType`] the batch belongs to, paired with
/// its `(Row, Diff)` updates.
pub type IntrospectionUpdates = (IntrospectionType, Vec<(Row, Diff)>);
19
20/// Spawn a task sinking introspection updates produced by the compute controller to storage.
21pub fn spawn_introspection_sink<T: Timestamp>(
22    mut rx: mpsc::UnboundedReceiver<IntrospectionUpdates>,
23    storage_controller: &dyn StorageController<Timestamp = T>,
24) {
25    let sink = IntrospectionSink::new(storage_controller);
26
27    mz_ore::task::spawn(|| "compute-introspection-sink", async move {
28        info!("running introspection sink task");
29
30        while let Some((type_, updates)) = rx.recv().await {
31            sink.send(type_, updates);
32        }
33
34        info!("introspection sink task shutting down");
35    });
36}
37
/// One-shot sender carrying a `Result<(), StorageError<T>>`; one accompanies every message sent
/// on the channels below.
// NOTE(review): presumably used by the receiving side to report the write's outcome — confirm
// against the storage controller.
type Notifier<T> = oneshot::Sender<Result<(), StorageError<T>>>;
/// Unbounded channel sender for batches of [`AppendOnlyUpdate`]s, each paired with a
/// [`Notifier`].
type AppendOnlySender<T> = mpsc::UnboundedSender<(Vec<AppendOnlyUpdate>, Notifier<T>)>;
/// Unbounded channel sender for [`StorageWriteOp`]s, each paired with a [`Notifier`].
type DifferentialSender<T> = mpsc::UnboundedSender<(StorageWriteOp, Notifier<T>)>;
41
/// A sink for introspection updates produced by the compute controller.
///
/// Holds one sender per introspection type that [`IntrospectionSink::send`] can route. The
/// senders are obtained from the storage controller in [`IntrospectionSink::new`]; they are
/// connected to the storage controller's CollectionManager, which writes received updates to
/// persist.
///
/// Two kinds of sender are used: [`DifferentialSender`]s carry [`StorageWriteOp`]s, while
/// [`AppendOnlySender`]s carry batches of [`AppendOnlyUpdate`]s.
#[derive(Debug)]
struct IntrospectionSink<T> {
    /// Sender for [`IntrospectionType::Frontiers`] updates.
    frontiers_tx: DifferentialSender<T>,
    /// Sender for [`IntrospectionType::ReplicaFrontiers`] updates.
    replica_frontiers_tx: DifferentialSender<T>,
    /// Sender for [`IntrospectionType::ComputeDependencies`] updates.
    compute_dependencies_tx: DifferentialSender<T>,
    /// Sender for [`IntrospectionType::ComputeOperatorHydrationStatus`] updates.
    compute_operator_hydration_status_tx: DifferentialSender<T>,
    /// Sender for [`IntrospectionType::ComputeMaterializedViewRefreshes`] updates.
    compute_materialized_view_refreshes_tx: DifferentialSender<T>,
    /// Sender for [`IntrospectionType::WallclockLagHistory`] updates.
    wallclock_lag_history_tx: AppendOnlySender<T>,
    /// Sender for [`IntrospectionType::WallclockLagHistogram`] updates.
    wallclock_lag_histogram_tx: AppendOnlySender<T>,
}
63
64impl<T: Timestamp> IntrospectionSink<T> {
65    /// Create a new `IntrospectionSink`.
66    pub fn new(storage_controller: &dyn StorageController<Timestamp = T>) -> Self {
67        use IntrospectionType::*;
68        Self {
69            frontiers_tx: storage_controller.differential_introspection_tx(Frontiers),
70            replica_frontiers_tx: storage_controller
71                .differential_introspection_tx(ReplicaFrontiers),
72            compute_dependencies_tx: storage_controller
73                .differential_introspection_tx(ComputeDependencies),
74            compute_operator_hydration_status_tx: storage_controller
75                .differential_introspection_tx(ComputeOperatorHydrationStatus),
76            compute_materialized_view_refreshes_tx: storage_controller
77                .differential_introspection_tx(ComputeMaterializedViewRefreshes),
78            wallclock_lag_history_tx: storage_controller
79                .append_only_introspection_tx(WallclockLagHistory),
80            wallclock_lag_histogram_tx: storage_controller
81                .append_only_introspection_tx(WallclockLagHistogram),
82        }
83    }
84
85    /// Send a batch of updates of the given introspection type.
86    pub fn send(&self, type_: IntrospectionType, updates: Vec<(Row, Diff)>) {
87        let send_append_only = |tx: &AppendOnlySender<_>, updates: Vec<_>| {
88            let updates = updates.into_iter().map(AppendOnlyUpdate::Row).collect();
89            let (notifier, _) = oneshot::channel();
90            let _ = tx.send((updates, notifier));
91        };
92        let send_differential = |tx: &DifferentialSender<_>, updates: Vec<_>| {
93            let op = StorageWriteOp::Append { updates };
94            let (notifier, _) = oneshot::channel();
95            let _ = tx.send((op, notifier));
96        };
97
98        use IntrospectionType::*;
99        match type_ {
100            Frontiers => send_differential(&self.frontiers_tx, updates),
101            ReplicaFrontiers => send_differential(&self.replica_frontiers_tx, updates),
102            ComputeDependencies => send_differential(&self.compute_dependencies_tx, updates),
103            ComputeOperatorHydrationStatus => {
104                send_differential(&self.compute_operator_hydration_status_tx, updates);
105            }
106            ComputeMaterializedViewRefreshes => {
107                send_differential(&self.compute_materialized_view_refreshes_tx, updates);
108            }
109            WallclockLagHistory => send_append_only(&self.wallclock_lag_history_tx, updates),
110            WallclockLagHistogram => send_append_only(&self.wallclock_lag_histogram_tx, updates),
111            _ => panic!("unexpected introspection type: {type_:?}"),
112        }
113    }
114}