Skip to main content

mz_clusterd_test_driver/
driver.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! The headless `Driver`: the mechanism's top-level API. Use cases call it.
11
12use std::time::Duration;
13
14use mz_compute_client::protocol::command::{
15    ComputeCommand, ComputeParameters, InstanceConfig, Peek, PeekTarget,
16};
17use mz_compute_client::protocol::response::{FrontiersResponse, PeekResponse};
18use mz_compute_types::dataflows::DataflowDescription;
19use mz_compute_types::dyncfgs::ENABLE_PEEK_RESPONSE_STASH;
20use mz_compute_types::plan::render_plan::RenderPlan;
21use mz_dyncfg::ConfigUpdates;
22use mz_expr::{MapFilterProject, RowSetFinishing};
23use mz_ore::tracing::OpenTelemetryContext;
24use mz_repr::{GlobalId, IntoRowIterator, RelationDesc, Row, RowIterator, Timestamp};
25use mz_storage_types::controller::CollectionMetadata;
26use timely::progress::Antichain;
27
28use crate::ctp::connect_and_hello;
29use crate::persist_host::PersistHost;
30use crate::responses::{ComputeSender, Responses};
31
/// Headless frontend to a clusterd replica.
///
/// Built by [`Driver::connect`]. Commands go out through `sender`; everything
/// the replica reports back (frontiers, peek results, subscribe batches) is
/// read through `responses`.
pub struct Driver {
    /// Host handed to [`Driver::connect`]; it provides persist + pubsub, and its
    /// location is what `create_instance` passes as the peek-stash location.
    pub host: PersistHost,
    /// Address given to [`Driver::connect`], kept so `reconnect` can dial it again.
    compute_addr: String,
    /// Command half of the current connection; replaced on `reconnect`.
    sender: ComputeSender,
    /// Response half of the current connection; replaced on `reconnect`.
    responses: Responses,
}
39
40impl Driver {
41    /// Connects to `compute_addr`, sends `Hello`, and starts the response pump.
42    /// `host` provides persist + pubsub. The controller handshake proper
43    /// (`create_instance`, `update_configuration`, `InitializationComplete`) is
44    /// driven by the caller — see the script commands of the same name.
45    pub async fn connect(host: PersistHost, compute_addr: &str) -> anyhow::Result<Self> {
46        let client = connect_and_hello(compute_addr).await?;
47        let (responses, sender) = Responses::spawn(client);
48        Ok(Driver {
49            host,
50            compute_addr: compute_addr.to_string(),
51            sender,
52            responses,
53        })
54    }
55
56    /// Drops the current connection and opens a new one, sending only `Hello`, so
57    /// the caller can re-`create_instance`, replay the dataflows it expects the
58    /// replica to be running, and then send `InitializationComplete` to close the
59    /// reconciliation window.
60    ///
61    /// Replacing `sender` drops the previous [`ComputeSender`]; with no other
62    /// clones, the old pump task's command channel closes and the pump exits,
63    /// dropping the old CTP client and closing the old connection.
64    pub async fn reconnect(&mut self) -> anyhow::Result<()> {
65        let client = connect_and_hello(&self.compute_addr).await?;
66        let (responses, sender) = Responses::spawn(client);
67        self.responses = responses;
68        self.sender = sender;
69        Ok(())
70    }
71
72    /// Sends `CreateInstance`, opening the compute instance (and the reconciliation
73    /// window).
74    ///
75    /// `expiration_offset` and `arrangement_dictionary_compression` are the
76    /// caller-settable [`InstanceConfig`] knobs; `logging` is left at its default
77    /// (introspection logging off — enabling it safely needs `index_logs` wiring)
78    /// and `peek_stash_persist_location` is necessarily the host's, since the driver
79    /// hosts persist.
80    ///
81    /// This also force-disables `ENABLE_PEEK_RESPONSE_STASH`: the driver reads peek
82    /// results inline, so a stashed peek would break [`Self::peek`]/`count`. It is
83    /// patched here rather than exposed as a configuration knob, so a script's
84    /// `update-configuration` cannot turn it back on.
85    pub fn create_instance(
86        &self,
87        expiration_offset: Option<Duration>,
88        arrangement_dictionary_compression: bool,
89    ) -> anyhow::Result<()> {
90        self.send(ComputeCommand::CreateInstance(Box::new(InstanceConfig {
91            logging: Default::default(),
92            expiration_offset,
93            peek_stash_persist_location: self.host.location().clone(),
94            arrangement_dictionary_compression,
95        })))?;
96        let mut dyncfg_updates = ConfigUpdates::default();
97        dyncfg_updates.add(&ENABLE_PEEK_RESPONSE_STASH, false);
98        self.send(ComputeCommand::UpdateConfiguration(Box::new(
99            ComputeParameters {
100                dyncfg_updates,
101                ..Default::default()
102            },
103        )))
104    }
105
106    /// Sends `UpdateConfiguration` with a set of dyncfg updates assembled by the
107    /// caller. Generic over any configuration; the peek-response stash is not among
108    /// them — it is force-disabled in [`Self::create_instance`].
109    pub fn update_configuration(&self, dyncfg_updates: ConfigUpdates) -> anyhow::Result<()> {
110        self.send(ComputeCommand::UpdateConfiguration(Box::new(
111            ComputeParameters {
112                dyncfg_updates,
113                ..Default::default()
114            },
115        )))
116    }
117
118    /// Sends a raw `ComputeCommand`. The primitive behind every interaction;
119    /// use cases drive side effects (`AllowCompaction`, `CancelPeek`, ...) through
120    /// this without the mechanism interpreting them.
121    pub fn send(&self, cmd: ComputeCommand) -> anyhow::Result<()> {
122        self.sender.send(cmd)
123    }
124
125    /// Submits a dataflow. Does NOT schedule it — the caller decides when to
126    /// `schedule`, so side-effect timing stays under test control.
127    pub fn submit_dataflow(
128        &self,
129        df: DataflowDescription<RenderPlan, CollectionMetadata>,
130    ) -> anyhow::Result<()> {
131        self.send(ComputeCommand::CreateDataflow(Box::new(df)))
132    }
133
134    /// Schedules a previously-submitted collection, allowing it to make progress.
135    pub fn schedule(&self, id: GlobalId) -> anyhow::Result<()> {
136        self.send(ComputeCommand::Schedule(id))
137    }
138
139    /// A receiver for an id's full merged frontiers, for use cases that need
140    /// write/input frontiers rather than just output.
141    pub fn frontiers(&self, id: GlobalId) -> tokio::sync::watch::Receiver<FrontiersResponse> {
142        self.responses.frontier(id)
143    }
144
145    /// Waits until `id`'s output frontier reaches at least `target`, or fails.
146    pub async fn expect_frontier(
147        &self,
148        id: GlobalId,
149        target: Timestamp,
150        timeout: Duration,
151    ) -> anyhow::Result<()> {
152        let mut rx = self.responses.frontier(id);
153        let want = Antichain::from_elem(target);
154        tokio::time::timeout(timeout, async {
155            loop {
156                let reached = rx
157                    .borrow_and_update()
158                    .output_frontier
159                    .as_ref()
160                    .is_some_and(|of| timely::PartialOrder::less_equal(&want, of));
161                if reached {
162                    return;
163                }
164                if rx.changed().await.is_err() {
165                    // The watch sender is gone (pump exited, e.g. clusterd
166                    // disconnected). The frontier can no longer advance, so
167                    // park and let the outer timeout fire with its message.
168                    futures::future::pending::<()>().await;
169                }
170            }
171        })
172        .await
173        .map_err(|_| anyhow::anyhow!("frontier for {id} did not reach {target:?} in time"))
174    }
175
176    /// Peeks `target` at `ts`, returning the decoded rows. The target is an index
177    /// (served from the replica's arrangement) or a persist collection — notably a
178    /// materialized-view sink's output shard, which is how `SELECT * FROM mv` reads.
179    /// A persist peek blocks (async-friendly) until the shard seals through `ts`, so
180    /// it doubles as a wait for the writing sink to catch up.
181    pub async fn peek(
182        &self,
183        target: PeekTarget,
184        result_desc: RelationDesc,
185        ts: Timestamp,
186    ) -> anyhow::Result<Vec<Row>> {
187        let uuid = uuid::Uuid::new_v4();
188        let rx = self.responses.register_peek(uuid);
189        let arity = result_desc.arity();
190        // Build an identity MFP: no maps, no filters, project all columns.
191        let map_filter_project = MapFilterProject::new(arity)
192            .into_plan()
193            .map_err(|e| anyhow::anyhow!("failed to plan MFP: {e}"))?
194            .into_nontemporal()
195            .map_err(|_| anyhow::anyhow!("unexpected temporal MFP for identity plan"))?;
196        let peek = Peek {
197            target,
198            result_desc: result_desc.clone(),
199            literal_constraints: None,
200            uuid,
201            timestamp: ts,
202            finishing: RowSetFinishing::trivial(arity),
203            map_filter_project,
204            otel_ctx: OpenTelemetryContext::empty(),
205        };
206        self.send(ComputeCommand::Peek(Box::new(peek)))?;
207        match rx.await? {
208            PeekResponse::Rows(collections) => {
209                let mut rows = Vec::new();
210                for collection in collections {
211                    let mut iter = collection.into_row_iter();
212                    while let Some(row_ref) = iter.next() {
213                        rows.push(row_ref.to_owned());
214                    }
215                }
216                Ok(rows)
217            }
218            PeekResponse::Error(e) => anyhow::bail!("peek error: {e}"),
219            PeekResponse::Canceled => anyhow::bail!("peek canceled"),
220            PeekResponse::Stashed(_) => anyhow::bail!("unexpected stashed peek result"),
221        }
222    }
223
224    /// Convenience: total row count from a peek.
225    pub async fn peek_count(
226        &self,
227        target: PeekTarget,
228        result_desc: RelationDesc,
229        ts: Timestamp,
230    ) -> anyhow::Result<usize> {
231        Ok(self.peek(target, result_desc, ts).await?.len())
232    }
233
234    /// Registers a subscribe-sink buffer for `id`, so the response pump accumulates
235    /// its batches. Call this before scheduling the sink.
236    pub fn register_subscribe(&self, id: GlobalId) {
237        let _ = self.responses.ensure_subscribe(id);
238    }
239
240    /// Waits until subscribe `id`'s upper frontier reaches at least `up_to`, then
241    /// drains and returns its buffered `(row, time, diff)` updates. Fails on timeout
242    /// or if the replica reported a subscribe error.
243    pub async fn await_subscribe(
244        &self,
245        id: GlobalId,
246        up_to: Timestamp,
247        timeout: Duration,
248    ) -> anyhow::Result<Vec<(Row, Timestamp, i64)>> {
249        let mut rx = self.responses.ensure_subscribe(id);
250        let want = Antichain::from_elem(up_to);
251        tokio::time::timeout(timeout, async {
252            loop {
253                // An empty upper (the subscribe was dropped / completed) is past any
254                // finite target, so `less_equal` against it also unblocks.
255                let reached = timely::PartialOrder::less_equal(&want, &*rx.borrow_and_update());
256                if reached {
257                    return;
258                }
259                if rx.changed().await.is_err() {
260                    // The pump exited (clusterd disconnected); the upper can no longer
261                    // advance, so park and let the outer timeout fire.
262                    futures::future::pending::<()>().await;
263                }
264            }
265        })
266        .await
267        .map_err(|_| anyhow::anyhow!("subscribe {id} did not reach {up_to:?} in time"))?;
268        self.responses.drain_subscribe(id)
269    }
270}