mz_compute_client/controller/
introspection.rs1use mz_repr::{Diff, Row};
11use mz_storage_client::client::AppendOnlyUpdate;
12use mz_storage_client::controller::{IntrospectionType, StorageController, StorageWriteOp};
13use mz_storage_types::controller::StorageError;
14use tokio::sync::{mpsc, oneshot};
15use tracing::info;
16
/// A batch of introspection updates: the introspection type the batch belongs to, paired with
/// the `(Row, Diff)` updates for that type.
pub type IntrospectionUpdates = (IntrospectionType, Vec<(Row, Diff)>);
18
19pub fn spawn_introspection_sink(
21 mut rx: mpsc::UnboundedReceiver<IntrospectionUpdates>,
22 storage_controller: &dyn StorageController,
23) {
24 let sink = IntrospectionSink::new(storage_controller);
25
26 mz_ore::task::spawn(|| "compute-introspection-sink", async move {
27 info!("running introspection sink task");
28
29 while let Some((type_, updates)) = rx.recv().await {
30 sink.send(type_, updates);
31 }
32
33 info!("introspection sink task shutting down");
34 });
35}
36
/// One-shot sender attached to every write handed to an introspection channel.
///
/// NOTE(review): presumably the receiving side of the introspection channel uses it to report
/// the outcome of the write — confirm against the storage controller.
type Notifier = oneshot::Sender<Result<(), StorageError>>;
/// Channel sender for append-only introspection writes: a batch of [`AppendOnlyUpdate`]s plus a
/// [`Notifier`].
type AppendOnlySender = mpsc::UnboundedSender<(Vec<AppendOnlyUpdate>, Notifier)>;
/// Channel sender for differential introspection writes: a [`StorageWriteOp`] plus a
/// [`Notifier`].
type DifferentialSender = mpsc::UnboundedSender<(StorageWriteOp, Notifier)>;
40
/// Holds one channel sender per introspection type that this sink knows how to forward.
///
/// Each sender is obtained from the storage controller in `IntrospectionSink::new`.
#[derive(Debug)]
struct IntrospectionSink {
    /// Sender for `IntrospectionType::Frontiers` updates (differential).
    frontiers_tx: DifferentialSender,
    /// Sender for `IntrospectionType::ReplicaFrontiers` updates (differential).
    replica_frontiers_tx: DifferentialSender,
    /// Sender for `IntrospectionType::ComputeDependencies` updates (differential).
    compute_dependencies_tx: DifferentialSender,
    /// Sender for `IntrospectionType::ComputeMaterializedViewRefreshes` updates (differential).
    compute_materialized_view_refreshes_tx: DifferentialSender,
    /// Sender for `IntrospectionType::WallclockLagHistory` updates (append-only).
    wallclock_lag_history_tx: AppendOnlySender,
    /// Sender for `IntrospectionType::WallclockLagHistogram` updates (append-only).
    wallclock_lag_histogram_tx: AppendOnlySender,
}
60
61impl IntrospectionSink {
62 pub fn new(storage_controller: &dyn StorageController) -> Self {
64 use IntrospectionType::*;
65 Self {
66 frontiers_tx: storage_controller.differential_introspection_tx(Frontiers),
67 replica_frontiers_tx: storage_controller
68 .differential_introspection_tx(ReplicaFrontiers),
69 compute_dependencies_tx: storage_controller
70 .differential_introspection_tx(ComputeDependencies),
71 compute_materialized_view_refreshes_tx: storage_controller
72 .differential_introspection_tx(ComputeMaterializedViewRefreshes),
73 wallclock_lag_history_tx: storage_controller
74 .append_only_introspection_tx(WallclockLagHistory),
75 wallclock_lag_histogram_tx: storage_controller
76 .append_only_introspection_tx(WallclockLagHistogram),
77 }
78 }
79
80 pub fn send(&self, type_: IntrospectionType, updates: Vec<(Row, Diff)>) {
82 let send_append_only = |tx: &AppendOnlySender, updates: Vec<_>| {
83 let updates = updates.into_iter().map(AppendOnlyUpdate::Row).collect();
84 let (notifier, _) = oneshot::channel();
85 let _ = tx.send((updates, notifier));
86 };
87 let send_differential = |tx: &DifferentialSender, updates: Vec<_>| {
88 let op = StorageWriteOp::Append { updates };
89 let (notifier, _) = oneshot::channel();
90 let _ = tx.send((op, notifier));
91 };
92
93 use IntrospectionType::*;
94 match type_ {
95 Frontiers => send_differential(&self.frontiers_tx, updates),
96 ReplicaFrontiers => send_differential(&self.replica_frontiers_tx, updates),
97 ComputeDependencies => send_differential(&self.compute_dependencies_tx, updates),
98 ComputeMaterializedViewRefreshes => {
99 send_differential(&self.compute_materialized_view_refreshes_tx, updates);
100 }
101 WallclockLagHistory => send_append_only(&self.wallclock_lag_history_tx, updates),
102 WallclockLagHistogram => send_append_only(&self.wallclock_lag_histogram_tx, updates),
103 _ => panic!("unexpected introspection type: {type_:?}"),
104 }
105 }
106}