mz_compute_client/controller/
introspection.rs
1use mz_repr::{Diff, Row};
11use mz_storage_client::client::AppendOnlyUpdate;
12use mz_storage_client::controller::{IntrospectionType, StorageController, StorageWriteOp};
13use mz_storage_types::controller::StorageError;
14use timely::progress::Timestamp;
15use tokio::sync::{mpsc, oneshot};
16use tracing::info;
17
18pub type IntrospectionUpdates = (IntrospectionType, Vec<(Row, Diff)>);
19
20pub fn spawn_introspection_sink<T: Timestamp>(
22 mut rx: mpsc::UnboundedReceiver<IntrospectionUpdates>,
23 storage_controller: &dyn StorageController<Timestamp = T>,
24) {
25 let sink = IntrospectionSink::new(storage_controller);
26
27 mz_ore::task::spawn(|| "compute-introspection-sink", async move {
28 info!("running introspection sink task");
29
30 while let Some((type_, updates)) = rx.recv().await {
31 sink.send(type_, updates);
32 }
33
34 info!("introspection sink task shutting down");
35 });
36}
37
38type Notifier<T> = oneshot::Sender<Result<(), StorageError<T>>>;
39type AppendOnlySender<T> = mpsc::UnboundedSender<(Vec<AppendOnlyUpdate>, Notifier<T>)>;
40type DifferentialSender<T> = mpsc::UnboundedSender<(StorageWriteOp, Notifier<T>)>;
41
42#[derive(Debug)]
47struct IntrospectionSink<T> {
48 frontiers_tx: DifferentialSender<T>,
50 replica_frontiers_tx: DifferentialSender<T>,
52 compute_dependencies_tx: DifferentialSender<T>,
54 compute_operator_hydration_status_tx: DifferentialSender<T>,
56 compute_materialized_view_refreshes_tx: DifferentialSender<T>,
58 wallclock_lag_history_tx: AppendOnlySender<T>,
60 wallclock_lag_histogram_tx: AppendOnlySender<T>,
62}
63
64impl<T: Timestamp> IntrospectionSink<T> {
65 pub fn new(storage_controller: &dyn StorageController<Timestamp = T>) -> Self {
67 use IntrospectionType::*;
68 Self {
69 frontiers_tx: storage_controller.differential_introspection_tx(Frontiers),
70 replica_frontiers_tx: storage_controller
71 .differential_introspection_tx(ReplicaFrontiers),
72 compute_dependencies_tx: storage_controller
73 .differential_introspection_tx(ComputeDependencies),
74 compute_operator_hydration_status_tx: storage_controller
75 .differential_introspection_tx(ComputeOperatorHydrationStatus),
76 compute_materialized_view_refreshes_tx: storage_controller
77 .differential_introspection_tx(ComputeMaterializedViewRefreshes),
78 wallclock_lag_history_tx: storage_controller
79 .append_only_introspection_tx(WallclockLagHistory),
80 wallclock_lag_histogram_tx: storage_controller
81 .append_only_introspection_tx(WallclockLagHistogram),
82 }
83 }
84
85 pub fn send(&self, type_: IntrospectionType, updates: Vec<(Row, Diff)>) {
87 let send_append_only = |tx: &AppendOnlySender<_>, updates: Vec<_>| {
88 let updates = updates.into_iter().map(AppendOnlyUpdate::Row).collect();
89 let (notifier, _) = oneshot::channel();
90 let _ = tx.send((updates, notifier));
91 };
92 let send_differential = |tx: &DifferentialSender<_>, updates: Vec<_>| {
93 let op = StorageWriteOp::Append { updates };
94 let (notifier, _) = oneshot::channel();
95 let _ = tx.send((op, notifier));
96 };
97
98 use IntrospectionType::*;
99 match type_ {
100 Frontiers => send_differential(&self.frontiers_tx, updates),
101 ReplicaFrontiers => send_differential(&self.replica_frontiers_tx, updates),
102 ComputeDependencies => send_differential(&self.compute_dependencies_tx, updates),
103 ComputeOperatorHydrationStatus => {
104 send_differential(&self.compute_operator_hydration_status_tx, updates);
105 }
106 ComputeMaterializedViewRefreshes => {
107 send_differential(&self.compute_materialized_view_refreshes_tx, updates);
108 }
109 WallclockLagHistory => send_append_only(&self.wallclock_lag_history_tx, updates),
110 WallclockLagHistogram => send_append_only(&self.wallclock_lag_histogram_tx, updates),
111 _ => panic!("unexpected introspection type: {type_:?}"),
112 }
113 }
114}