mz_clusterd_test_driver/
driver.rs1use std::time::Duration;
13
14use mz_compute_client::protocol::command::{
15 ComputeCommand, ComputeParameters, InstanceConfig, Peek, PeekTarget,
16};
17use mz_compute_client::protocol::response::{FrontiersResponse, PeekResponse};
18use mz_compute_types::dataflows::DataflowDescription;
19use mz_compute_types::dyncfgs::ENABLE_PEEK_RESPONSE_STASH;
20use mz_compute_types::plan::render_plan::RenderPlan;
21use mz_dyncfg::ConfigUpdates;
22use mz_expr::{MapFilterProject, RowSetFinishing};
23use mz_ore::tracing::OpenTelemetryContext;
24use mz_repr::{GlobalId, IntoRowIterator, RelationDesc, Row, RowIterator, Timestamp};
25use mz_storage_types::controller::CollectionMetadata;
26use timely::progress::Antichain;
27
28use crate::ctp::connect_and_hello;
29use crate::persist_host::PersistHost;
30use crate::responses::{ComputeSender, Responses};
31
32pub struct Driver {
34 pub host: PersistHost,
35 compute_addr: String,
36 sender: ComputeSender,
37 responses: Responses,
38}
39
40impl Driver {
41 pub async fn connect(host: PersistHost, compute_addr: &str) -> anyhow::Result<Self> {
46 let client = connect_and_hello(compute_addr).await?;
47 let (responses, sender) = Responses::spawn(client);
48 Ok(Driver {
49 host,
50 compute_addr: compute_addr.to_string(),
51 sender,
52 responses,
53 })
54 }
55
56 pub async fn reconnect(&mut self) -> anyhow::Result<()> {
65 let client = connect_and_hello(&self.compute_addr).await?;
66 let (responses, sender) = Responses::spawn(client);
67 self.responses = responses;
68 self.sender = sender;
69 Ok(())
70 }
71
72 pub fn create_instance(
86 &self,
87 expiration_offset: Option<Duration>,
88 arrangement_dictionary_compression: bool,
89 ) -> anyhow::Result<()> {
90 self.send(ComputeCommand::CreateInstance(Box::new(InstanceConfig {
91 logging: Default::default(),
92 expiration_offset,
93 peek_stash_persist_location: self.host.location().clone(),
94 arrangement_dictionary_compression,
95 })))?;
96 let mut dyncfg_updates = ConfigUpdates::default();
97 dyncfg_updates.add(&ENABLE_PEEK_RESPONSE_STASH, false);
98 self.send(ComputeCommand::UpdateConfiguration(Box::new(
99 ComputeParameters {
100 dyncfg_updates,
101 ..Default::default()
102 },
103 )))
104 }
105
106 pub fn update_configuration(&self, dyncfg_updates: ConfigUpdates) -> anyhow::Result<()> {
110 self.send(ComputeCommand::UpdateConfiguration(Box::new(
111 ComputeParameters {
112 dyncfg_updates,
113 ..Default::default()
114 },
115 )))
116 }
117
118 pub fn send(&self, cmd: ComputeCommand) -> anyhow::Result<()> {
122 self.sender.send(cmd)
123 }
124
125 pub fn submit_dataflow(
128 &self,
129 df: DataflowDescription<RenderPlan, CollectionMetadata>,
130 ) -> anyhow::Result<()> {
131 self.send(ComputeCommand::CreateDataflow(Box::new(df)))
132 }
133
134 pub fn schedule(&self, id: GlobalId) -> anyhow::Result<()> {
136 self.send(ComputeCommand::Schedule(id))
137 }
138
139 pub fn frontiers(&self, id: GlobalId) -> tokio::sync::watch::Receiver<FrontiersResponse> {
142 self.responses.frontier(id)
143 }
144
145 pub async fn expect_frontier(
147 &self,
148 id: GlobalId,
149 target: Timestamp,
150 timeout: Duration,
151 ) -> anyhow::Result<()> {
152 let mut rx = self.responses.frontier(id);
153 let want = Antichain::from_elem(target);
154 tokio::time::timeout(timeout, async {
155 loop {
156 let reached = rx
157 .borrow_and_update()
158 .output_frontier
159 .as_ref()
160 .is_some_and(|of| timely::PartialOrder::less_equal(&want, of));
161 if reached {
162 return;
163 }
164 if rx.changed().await.is_err() {
165 futures::future::pending::<()>().await;
169 }
170 }
171 })
172 .await
173 .map_err(|_| anyhow::anyhow!("frontier for {id} did not reach {target:?} in time"))
174 }
175
176 pub async fn peek(
182 &self,
183 target: PeekTarget,
184 result_desc: RelationDesc,
185 ts: Timestamp,
186 ) -> anyhow::Result<Vec<Row>> {
187 let uuid = uuid::Uuid::new_v4();
188 let rx = self.responses.register_peek(uuid);
189 let arity = result_desc.arity();
190 let map_filter_project = MapFilterProject::new(arity)
192 .into_plan()
193 .map_err(|e| anyhow::anyhow!("failed to plan MFP: {e}"))?
194 .into_nontemporal()
195 .map_err(|_| anyhow::anyhow!("unexpected temporal MFP for identity plan"))?;
196 let peek = Peek {
197 target,
198 result_desc: result_desc.clone(),
199 literal_constraints: None,
200 uuid,
201 timestamp: ts,
202 finishing: RowSetFinishing::trivial(arity),
203 map_filter_project,
204 otel_ctx: OpenTelemetryContext::empty(),
205 };
206 self.send(ComputeCommand::Peek(Box::new(peek)))?;
207 match rx.await? {
208 PeekResponse::Rows(collections) => {
209 let mut rows = Vec::new();
210 for collection in collections {
211 let mut iter = collection.into_row_iter();
212 while let Some(row_ref) = iter.next() {
213 rows.push(row_ref.to_owned());
214 }
215 }
216 Ok(rows)
217 }
218 PeekResponse::Error(e) => anyhow::bail!("peek error: {e}"),
219 PeekResponse::Canceled => anyhow::bail!("peek canceled"),
220 PeekResponse::Stashed(_) => anyhow::bail!("unexpected stashed peek result"),
221 }
222 }
223
224 pub async fn peek_count(
226 &self,
227 target: PeekTarget,
228 result_desc: RelationDesc,
229 ts: Timestamp,
230 ) -> anyhow::Result<usize> {
231 Ok(self.peek(target, result_desc, ts).await?.len())
232 }
233
234 pub fn register_subscribe(&self, id: GlobalId) {
237 let _ = self.responses.ensure_subscribe(id);
238 }
239
240 pub async fn await_subscribe(
244 &self,
245 id: GlobalId,
246 up_to: Timestamp,
247 timeout: Duration,
248 ) -> anyhow::Result<Vec<(Row, Timestamp, i64)>> {
249 let mut rx = self.responses.ensure_subscribe(id);
250 let want = Antichain::from_elem(up_to);
251 tokio::time::timeout(timeout, async {
252 loop {
253 let reached = timely::PartialOrder::less_equal(&want, &*rx.borrow_and_update());
256 if reached {
257 return;
258 }
259 if rx.changed().await.is_err() {
260 futures::future::pending::<()>().await;
263 }
264 }
265 })
266 .await
267 .map_err(|_| anyhow::anyhow!("subscribe {id} did not reach {up_to:?} in time"))?;
268 self.responses.drain_subscribe(id)
269 }
270}