mz_clusterd_test_driver/ctp.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Compute CTP connection and the `Hello` step of the controller handshake.
11//! Generic: it sends any `ComputeCommand` and receives any `ComputeResponse`.
12//!
13//! Only `Hello` (the transport/version step) happens here; the controller
14//! handshake proper — `CreateInstance`, `UpdateConfiguration`,
15//! `InitializationComplete` — is driven by explicit caller commands, so the
16//! caller controls the instance config, the peek-stash setting, and exactly when
17//! the reconciliation window opens and closes.
18
19use std::time::Duration;
20
21use mz_compute_client::protocol::command::ComputeCommand;
22use mz_compute_client::protocol::response::ComputeResponse;
23use mz_service::client::GenericClient;
24use mz_service::transport::{Client, NoopMetrics};
25use uuid::Uuid;
26
/// Transport client specialized to the compute protocol: it sends
/// [`ComputeCommand`]s and receives [`ComputeResponse`]s.
pub type ComputeCtpClient = Client<ComputeCommand, ComputeResponse>;
28
29/// Connects to a clusterd compute controller address and sends `Hello`, leaving
30/// the controller handshake (`CreateInstance` onward) to the caller.
31///
32/// A reconnect re-runs exactly this: a fresh transport connection plus `Hello`.
33/// The reconciliation window then opens when the script sends `CreateInstance`
34/// and closes when it sends `InitializationComplete`; in between, the replica
35/// reconciles the replayed dataflows against its live ones rather than
36/// rehydrating.
37pub async fn connect_and_hello(compute_addr: &str) -> anyhow::Result<ComputeCtpClient> {
38 // Use persist-client's BUILD_INFO: it is release-versioned (synced by
39 // bin/bump-version), so it matches the clusterd we connect to. Our own
40 // crate is `0.0.0`, which would fail the handshake's version check.
41 let version = mz_persist_client::BUILD_INFO.semver_version();
42 let mut client = Client::<ComputeCommand, ComputeResponse>::connect(
43 compute_addr,
44 version,
45 Duration::from_secs(30),
46 Duration::from_secs(60),
47 NoopMetrics,
48 )
49 .await?;
50
51 client
52 .send(ComputeCommand::Hello {
53 nonce: Uuid::new_v4(),
54 })
55 .await?;
56
57 Ok(client)
58}
59
#[cfg(test)]
mod tests {
    use super::*;
    use crate::target;

    /// End-to-end check that the connection is still usable right after
    /// `Hello`: a `recv` within 500 ms must either time out (nothing sent yet)
    /// or succeed, but must not report an error.
    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn hello_holds_connection() {
        // This needs a live clusterd; it is a no-op unless e2e runs are enabled.
        if !target::e2e_enabled() {
            return;
        }
        let mut client = connect_and_hello(&target::compute_addr())
            .await
            .expect("hello");

        match tokio::time::timeout(Duration::from_millis(500), client.recv()).await {
            // Either nothing arrived before the deadline or `recv` succeeded.
            // NOTE(review): any `Ok` from `recv` is accepted here, exactly as
            // in the previous assertion. If `recv` can signal a closed
            // connection through an `Ok` value, this test would not catch
            // it — confirm against the client's `recv` contract.
            Err(_elapsed) | Ok(Ok(_)) => {}
            // Surface the underlying error instead of a bare assertion failure.
            Ok(Err(err)) => panic!("recv failed after Hello: {err:?}"),
        }
    }
}
77}